License: MIT
Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
This commit is contained in:
Adrian Lanzafame 2019-04-23 20:30:26 +10:00
parent 6d593799ba
commit d5ecd9ef6a
No known key found for this signature in database
GPG Key ID: 87E40C5D62EAE192
2 changed files with 37 additions and 20 deletions

View File

@ -38,19 +38,17 @@ func NewChecker(metrics *Store, threshold float64) *Checker {
} }
} }
// CheckPeers will trigger alerts all latest metrics from the given peerset // CheckPeers will trigger alerts based on the latest metrics from the given peerset
// when they have expired and no alert has been sent before. // when they have expired and no alert has been sent before.
func (mc *Checker) CheckPeers(peers []peer.ID) error { func (mc *Checker) CheckPeers(peers []peer.ID) error {
for _, peer := range peers { for _, peer := range peers {
// shortcut checking all metrics based on heartbeat
// failure detection
if mc.Failed(peer) {
err := mc.alert(peer, "ping")
if err != nil {
return err
}
}
for _, metric := range mc.metrics.PeerMetrics(peer) { for _, metric := range mc.metrics.PeerMetrics(peer) {
if mc.FailedMetric(metric.Name, peer) {
err := mc.alert(peer, metric.Name)
if err != nil {
return err
}
}
err := mc.alertIfExpired(metric) err := mc.alertIfExpired(metric)
if err != nil { if err != nil {
return err return err
@ -133,20 +131,35 @@ func (mc *Checker) Watch(ctx context.Context, peersF func(context.Context) ([]pe
// Peers that are not present in the metrics store will return // Peers that are not present in the metrics store will return
// as failed. // as failed.
func (mc *Checker) Failed(pid peer.ID) bool { func (mc *Checker) Failed(pid peer.ID) bool {
_, _, _, result := mc.failed(pid) _, _, _, result := mc.failed("ping", pid)
return result
}
// FailedMetric is the same as Failed but can use any metric type,
// not just ping.
func (mc *Checker) FailedMetric(metric string, pid peer.ID) bool {
_, _, _, result := mc.failed(metric, pid)
return result return result
} }
// failed returns all the values involved in making the decision // failed returns all the values involved in making the decision
// as to whether a peer has failed or not. This is mainly for debugging // as to whether a peer has failed or not. This is mainly for debugging
// purposes. // purposes.
func (mc *Checker) failed(pid peer.ID) (float64, []float64, float64, bool) { func (mc *Checker) failed(metric string, pid peer.ID) (float64, []float64, float64, bool) {
latest := mc.metrics.PeerLatest("ping", pid) latest := mc.metrics.PeerLatest(metric, pid)
if latest == nil { if latest == nil {
return 0.0, nil, 0.0, true return 0.0, nil, 0.0, true
} }
v := time.Now().UnixNano() - latest.ReceivedAt v := time.Now().UnixNano() - latest.ReceivedAt
dv := mc.metrics.Distribution("ping", pid) dv := mc.metrics.Distribution(metric, pid)
phiv := phi(float64(v), dv) // one metric isn't enough to calculate a distribution
return float64(v), dv, phiv, phiv >= mc.threshold // alerting/failure detection will fall back to the metric-expiring
// method
switch {
case len(dv) < 2 && !latest.Expired():
return float64(v), dv, 0.0, false
default:
phiv := phi(float64(v), dv)
return float64(v), dv, phiv, phiv >= mc.threshold
}
} }

View File

@ -17,12 +17,12 @@ import (
"github.com/ipfs/ipfs-cluster/test" "github.com/ipfs/ipfs-cluster/test"
) )
func TestChecker(t *testing.T) { func TestCheckPeers(t *testing.T) {
metrics := NewStore() metrics := NewStore()
checker := NewChecker(metrics, 2.0) checker := NewChecker(metrics, 2.0)
metr := &api.Metric{ metr := &api.Metric{
Name: "test", Name: "ping",
Peer: test.PeerID1, Peer: test.PeerID1,
Value: "1", Value: "1",
Valid: true, Valid: true,
@ -112,6 +112,10 @@ func TestChecker_Failed(t *testing.T) {
}) })
} }
//////////////////
// HELPER TESTS //
//////////////////
func TestThresholdValues(t *testing.T) { func TestThresholdValues(t *testing.T) {
t.Log("TestThresholdValues is useful for testing out different threshold values") t.Log("TestThresholdValues is useful for testing out different threshold values")
t.Log("It doesn't actually perform any 'tests', so it is skipped by default") t.Log("It doesn't actually perform any 'tests', so it is skipped by default")
@ -133,7 +137,7 @@ func TestThresholdValues(t *testing.T) {
output := false output := false
check := func(i int) bool { check := func(i int) bool {
inputv, dist, phiv, got := checker.failed(test.PeerID1) inputv, dist, phiv, got := checker.failed("ping", test.PeerID1)
if output { if output {
fmt.Println(i) fmt.Println(i)
fmt.Printf("phiv: %f\n", phiv) fmt.Printf("phiv: %f\n", phiv)
@ -193,7 +197,7 @@ func TestThresholdValues(t *testing.T) {
output := false output := false
check := func(i int) bool { check := func(i int) bool {
inputv, dist, phiv, got := checker.failed(test.PeerID1) inputv, dist, phiv, got := checker.failed("ping", test.PeerID1)
if output { if output {
fmt.Println(i) fmt.Println(i)
fmt.Printf("phiv: %f\n", phiv) fmt.Printf("phiv: %f\n", phiv)
@ -254,7 +258,7 @@ func TestThresholdValues(t *testing.T) {
output := false output := false
check := func(i int) bool { check := func(i int) bool {
inputv, dist, phiv, got := checker.failed(test.PeerID1) inputv, dist, phiv, got := checker.failed("ping", test.PeerID1)
if output { if output {
fmt.Println(i) fmt.Println(i)
fmt.Printf("phiv: %f\n", phiv) fmt.Printf("phiv: %f\n", phiv)