fix check failed

License: MIT
Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
This commit is contained in:
Adrian Lanzafame 2019-06-17 19:37:58 +10:00 committed by Hector Sanjuan
parent 5e09da9d63
commit 27295c10ac
3 changed files with 62 additions and 43 deletions

View File

@ -1635,11 +1635,6 @@ func TestClustersRebalanceOnPeerDown(t *testing.T) {
t.Skip("Need at least 5 peers")
}
if consensus == "crdt" {
t.Log("FIXME when re-alloc changes come through")
return
}
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
for _, c := range clusters {

View File

@ -3,7 +3,6 @@ package metrics
import (
"context"
"errors"
"fmt"
"sync"
"time"
@ -60,7 +59,7 @@ func NewChecker(ctx context.Context, metrics *Store, threshold float64) *Checker
// when they have expired and no alert has been sent before.
func (mc *Checker) CheckPeers(peers []peer.ID) error {
for _, peer := range peers {
for _, metric := range mc.metrics.PeerMetrics(peer) {
for _, metric := range mc.metrics.PeerMetricAll("ping", peer) {
if mc.FailedMetric(metric.Name, peer) {
err := mc.alert(peer, metric.Name)
if err != nil {
@ -76,7 +75,7 @@ func (mc *Checker) CheckPeers(peers []peer.ID) error {
// and no alert has been sent before.
func (mc *Checker) CheckAll() error {
for _, metric := range mc.metrics.AllMetrics() {
if mc.FailedMetric(metric.Name, metric.Peer) {
if metric.Name == "ping" && mc.Failed(metric.Peer) {
err := mc.alert(metric.Peer, metric.Name)
if err != nil {
return err
@ -105,8 +104,7 @@ func (mc *Checker) alert(pid peer.ID, metricName string) error {
mc.failedPeersMu.Lock()
defer mc.failedPeersMu.Unlock()
_, ok := mc.failedPeers[pid]
if !ok {
if _, ok := mc.failedPeers[pid]; !ok {
mc.failedPeers[pid] = make(map[string]int)
}
failedMetrics := mc.failedPeers[pid]
@ -174,14 +172,14 @@ func (mc *Checker) Watch(ctx context.Context, peersF func(context.Context) ([]pe
// Peers that are not present in the metrics store will return
// as failed.
func (mc *Checker) Failed(pid peer.ID) bool {
_, _, _, result := mc.failed("ping", pid, false)
_, _, _, result := mc.failed("ping", pid)
return result
}
// FailedMetric is the same as Failed but can use any metric type,
// not just ping.
func (mc *Checker) FailedMetric(metric string, pid peer.ID) bool {
_, _, _, result := mc.failed(metric, pid, false)
_, _, _, result := mc.failed(metric, pid)
return result
}
@ -189,7 +187,11 @@ func (mc *Checker) FailedMetric(metric string, pid peer.ID) bool {
// as to whether a peer has failed or not. The debugging parameter
// enables a more computation heavy path of the function but
// allows for insight into the return phi value.
func (mc *Checker) failed(metric string, pid peer.ID, debugging bool) (float64, []float64, float64, bool) {
func (mc *Checker) failed(metric string, pid peer.ID) (float64, []float64, float64, bool) {
// accrualMetricsNum represents the number metrics required for
// accrual to function appropriately, and under which we use
// TTL to determine whether a peer may have failed.
accrualMetricsNum := 6
latest := mc.metrics.PeerLatest(metric, pid)
if latest == nil {
return 0.0, nil, 0.0, true
@ -199,8 +201,7 @@ func (mc *Checker) failed(metric string, pid peer.ID, debugging bool) (float64,
// where multiple metrics closer together skew the distribution
// to be less than that of the TTL value of the metrics
pmtrs := mc.metrics.PeerMetricAll(metric, pid)
var withinTTL bool
if len(pmtrs) == 1 {
if len(pmtrs) < accrualMetricsNum {
// one metric isn't enough to consider a peer failed
// unless it is expired
if pmtrs[0].Expired() {
@ -208,31 +209,11 @@ func (mc *Checker) failed(metric string, pid peer.ID, debugging bool) (float64,
}
return 0.0, nil, 0.0, false
}
if len(pmtrs) >= 2 {
currMetricExpiry := time.Unix(0, pmtrs[1].Expire)
prevMetricReceived := time.Unix(0, pmtrs[0].ReceivedAt)
// accrual failure detection should only kick if the
// the ttl has expired
withinTTL = prevMetricReceived.Before(currMetricExpiry)
if debugging {
fmt.Printf("validTTL: %v\texp: %v,\tra: %v\n", withinTTL, currMetricExpiry, prevMetricReceived)
}
// shortcut the function if not debugging
if withinTTL && !debugging {
return 0.0, nil, 0.0, false
}
}
v := time.Now().UnixNano() - latest.ReceivedAt
dv := mc.metrics.Distribution(metric, pid)
// one metric isn't enough to calculate a distribution
// alerting/failure detection will fallback to the metric-expiring
// method
switch {
case withinTTL && debugging:
phiv := phi(float64(v), dv)
return float64(v), dv, phiv, false
case len(dv) < 5 && !latest.Expired():
case len(dv) < accrualMetricsNum-1 && !latest.Expired():
return float64(v), dv, 0.0, false
default:
phiv := phi(float64(v), dv)

View File

@ -18,7 +18,7 @@ import (
"gonum.org/v1/plot/vg"
)
func TestCheckPeers(t *testing.T) {
func TestChecker_CheckPeers(t *testing.T) {
t.Run("check with single metric", func(t *testing.T) {
metrics := NewStore()
checker := NewChecker(context.Background(), metrics, 2.0)
@ -61,6 +61,49 @@ func TestCheckPeers(t *testing.T) {
})
}
func TestChecker_CheckAll(t *testing.T) {
t.Run("checkall with single metric", func(t *testing.T) {
metrics := NewStore()
checker := NewChecker(context.Background(), metrics, 2.0)
metr := &api.Metric{
Name: "ping",
Peer: test.PeerID1,
Value: "1",
Valid: true,
}
metr.SetTTL(2 * time.Second)
metrics.Add(metr)
checker.CheckAll()
select {
case <-checker.Alerts():
t.Error("there should not be an alert yet")
default:
}
time.Sleep(3 * time.Second)
err := checker.CheckAll()
if err != nil {
t.Fatal(err)
}
select {
case <-checker.Alerts():
default:
t.Error("an alert should have been triggered")
}
checker.CheckAll()
select {
case <-checker.Alerts():
t.Error("there should not be alerts for different peer")
default:
}
})
}
func TestChecker_Watch(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
@ -69,7 +112,7 @@ func TestChecker_Watch(t *testing.T) {
checker := NewChecker(context.Background(), metrics, 2.0)
metr := &api.Metric{
Name: "test",
Name: "ping",
Peer: test.PeerID1,
Value: "1",
Valid: true,
@ -127,7 +170,7 @@ func TestChecker_Failed(t *testing.T) {
for i, j := 0, 100; i < 7; i, j = i+1, j*2 {
time.Sleep(time.Duration(j) * time.Millisecond)
metrics.Add(makePeerMetric(test.PeerID1, "1", 5000*time.Millisecond))
v, _, phiv, got := checker.failed("ping", test.PeerID1, false)
v, _, phiv, got := checker.failed("ping", test.PeerID1)
t.Logf("i: %d: j: %d v: %f, phiv: %f, got: %v\n", i, j, v, phiv, got)
if i > 5 && !got {
t.Fatal("threshold should have been reached by now")
@ -145,7 +188,7 @@ func TestChecker_alert(t *testing.T) {
checker := NewChecker(ctx, metrics, 2.0)
metr := &api.Metric{
Name: "test",
Name: "ping",
Peer: test.PeerID1,
Value: "1",
Valid: true,
@ -203,7 +246,7 @@ func TestThresholdValues(t *testing.T) {
output := false
check := func(i int) bool {
inputv, dist, phiv, got := checker.failed("ping", test.PeerID1, true)
inputv, dist, phiv, got := checker.failed("ping", test.PeerID1)
if output {
fmt.Println(i)
fmt.Printf("phiv: %f\n", phiv)
@ -263,7 +306,7 @@ func TestThresholdValues(t *testing.T) {
output := false
check := func(i int) bool {
inputv, dist, phiv, got := checker.failed("ping", test.PeerID1, true)
inputv, dist, phiv, got := checker.failed("ping", test.PeerID1)
if output {
fmt.Println(i)
fmt.Printf("phiv: %f\n", phiv)
@ -324,7 +367,7 @@ func TestThresholdValues(t *testing.T) {
output := false
check := func(i int) bool {
inputv, dist, phiv, got := checker.failed("ping", test.PeerID1, true)
inputv, dist, phiv, got := checker.failed("ping", test.PeerID1)
if output {
fmt.Println(i)
fmt.Printf("phiv: %f\n", phiv)