Merge pull request #815 from ipfs/fix/false-positive-alerts
fix how accrual fd treats ttls
This commit is contained in:
commit
80cf0ed695
|
@ -1635,11 +1635,6 @@ func TestClustersRebalanceOnPeerDown(t *testing.T) {
|
|||
t.Skip("Need at least 5 peers")
|
||||
}
|
||||
|
||||
if consensus == "crdt" {
|
||||
t.Log("FIXME when re-alloc changes come through")
|
||||
return
|
||||
}
|
||||
|
||||
clusters, mock := createClusters(t)
|
||||
defer shutdownClusters(t, clusters, mock)
|
||||
for _, c := range clusters {
|
||||
|
|
|
@ -25,6 +25,11 @@ var MaxAlertThreshold = 1
|
|||
// ErrAlertChannelFull is returned if the alert channel is full.
|
||||
var ErrAlertChannelFull = errors.New("alert channel is full")
|
||||
|
||||
// accrualMetricsNum represents the number of metrics required for
|
||||
// accrual to function appropriately, and under which we use
|
||||
// TTL to determine whether a peer may have failed.
|
||||
var accrualMetricsNum = 6
|
||||
|
||||
// Checker provides utilities to find expired metrics
|
||||
// for a given peerset and send alerts if it proceeds to do so.
|
||||
type Checker struct {
|
||||
|
@ -58,12 +63,14 @@ func NewChecker(ctx context.Context, metrics *Store, threshold float64) *Checker
|
|||
// CheckPeers will trigger alerts based on the latest metrics from the given peerset
|
||||
// when they have expired and no alert has been sent before.
|
||||
func (mc *Checker) CheckPeers(peers []peer.ID) error {
|
||||
for _, peer := range peers {
|
||||
for _, metric := range mc.metrics.PeerMetrics(peer) {
|
||||
if mc.FailedMetric(metric.Name, peer) {
|
||||
err := mc.alert(peer, metric.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
for _, name := range mc.metrics.MetricNames() {
|
||||
for _, peer := range peers {
|
||||
for _, metric := range mc.metrics.PeerMetricAll(name, peer) {
|
||||
if mc.FailedMetric(metric.Name, peer) {
|
||||
err := mc.alert(peer, metric.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -104,8 +111,7 @@ func (mc *Checker) alert(pid peer.ID, metricName string) error {
|
|||
mc.failedPeersMu.Lock()
|
||||
defer mc.failedPeersMu.Unlock()
|
||||
|
||||
_, ok := mc.failedPeers[pid]
|
||||
if !ok {
|
||||
if _, ok := mc.failedPeers[pid]; !ok {
|
||||
mc.failedPeers[pid] = make(map[string]int)
|
||||
}
|
||||
failedMetrics := mc.failedPeers[pid]
|
||||
|
@ -169,39 +175,38 @@ func (mc *Checker) Watch(ctx context.Context, peersF func(context.Context) ([]pe
|
|||
}
|
||||
}
|
||||
|
||||
// Failed returns true if a peer has potentially failed.
|
||||
// Peers that are not present in the metrics store will return
|
||||
// as failed.
|
||||
func (mc *Checker) Failed(pid peer.ID) bool {
|
||||
_, _, _, result := mc.failed("ping", pid)
|
||||
return result
|
||||
}
|
||||
|
||||
// FailedMetric is the same as Failed but can use any metric type,
|
||||
// not just ping.
|
||||
// FailedMetric returns if a peer is marked as failed for a particular metric.
|
||||
func (mc *Checker) FailedMetric(metric string, pid peer.ID) bool {
|
||||
_, _, _, result := mc.failed(metric, pid)
|
||||
return result
|
||||
}
|
||||
|
||||
// failed returns all the values involved in making the decision
|
||||
// as to whether a peer has failed or not. This mainly for debugging
|
||||
// purposes.
|
||||
// as to whether a peer has failed or not. The debugging parameter
|
||||
// enables a more computation heavy path of the function but
|
||||
// allows for insight into the return phi value.
|
||||
func (mc *Checker) failed(metric string, pid peer.ID) (float64, []float64, float64, bool) {
|
||||
latest := mc.metrics.PeerLatest(metric, pid)
|
||||
if latest == nil {
|
||||
return 0.0, nil, 0.0, true
|
||||
}
|
||||
|
||||
// A peer is never failed if the latest metric from is has
|
||||
// not expired or we do not have enough number of metrics
|
||||
// for accrual detection
|
||||
if !latest.Expired() {
|
||||
return 0.0, nil, 0.0, false
|
||||
}
|
||||
// The latest metric has expired
|
||||
|
||||
pmtrs := mc.metrics.PeerMetricAll(metric, pid)
|
||||
// Not enough values for accrual and metric expired. Peer failed.
|
||||
if len(pmtrs) < accrualMetricsNum {
|
||||
return 0.0, nil, 0.0, true
|
||||
}
|
||||
|
||||
v := time.Now().UnixNano() - latest.ReceivedAt
|
||||
dv := mc.metrics.Distribution(metric, pid)
|
||||
// one metric isn't enough to calculate a distribution
|
||||
// alerting/failure detection will fallback to the metric-expiring
|
||||
// method
|
||||
switch {
|
||||
case len(dv) < 5 && !latest.Expired():
|
||||
return float64(v), dv, 0.0, false
|
||||
default:
|
||||
phiv := phi(float64(v), dv)
|
||||
return float64(v), dv, phiv, phiv >= mc.threshold
|
||||
}
|
||||
phiv := phi(float64(v), dv)
|
||||
return float64(v), dv, phiv, phiv >= mc.threshold
|
||||
}
|
||||
|
|
|
@ -18,45 +18,90 @@ import (
|
|||
"gonum.org/v1/plot/vg"
|
||||
)
|
||||
|
||||
func TestCheckPeers(t *testing.T) {
|
||||
metrics := NewStore()
|
||||
checker := NewChecker(context.Background(), metrics, 2.0)
|
||||
func TestChecker_CheckPeers(t *testing.T) {
|
||||
t.Run("check with single metric", func(t *testing.T) {
|
||||
metrics := NewStore()
|
||||
checker := NewChecker(context.Background(), metrics, 2.0)
|
||||
|
||||
metr := &api.Metric{
|
||||
Name: "ping",
|
||||
Peer: test.PeerID1,
|
||||
Value: "1",
|
||||
Valid: true,
|
||||
}
|
||||
metr.SetTTL(2 * time.Second)
|
||||
metr := &api.Metric{
|
||||
Name: "ping",
|
||||
Peer: test.PeerID1,
|
||||
Value: "1",
|
||||
Valid: true,
|
||||
}
|
||||
metr.SetTTL(2 * time.Second)
|
||||
|
||||
metrics.Add(metr)
|
||||
metrics.Add(metr)
|
||||
|
||||
checker.CheckPeers([]peer.ID{test.PeerID1})
|
||||
select {
|
||||
case <-checker.Alerts():
|
||||
t.Error("there should not be an alert yet")
|
||||
default:
|
||||
}
|
||||
checker.CheckPeers([]peer.ID{test.PeerID1})
|
||||
select {
|
||||
case <-checker.Alerts():
|
||||
t.Error("there should not be an alert yet")
|
||||
default:
|
||||
}
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
err := checker.CheckPeers([]peer.ID{test.PeerID1})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
err := checker.CheckPeers([]peer.ID{test.PeerID1})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-checker.Alerts():
|
||||
default:
|
||||
t.Error("an alert should have been triggered")
|
||||
}
|
||||
select {
|
||||
case <-checker.Alerts():
|
||||
default:
|
||||
t.Error("an alert should have been triggered")
|
||||
}
|
||||
|
||||
checker.CheckPeers([]peer.ID{test.PeerID2})
|
||||
select {
|
||||
case <-checker.Alerts():
|
||||
t.Error("there should not be alerts for different peer")
|
||||
default:
|
||||
}
|
||||
checker.CheckPeers([]peer.ID{test.PeerID2})
|
||||
select {
|
||||
case <-checker.Alerts():
|
||||
t.Error("there should not be alerts for different peer")
|
||||
default:
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestChecker_CheckAll(t *testing.T) {
|
||||
t.Run("checkall with single metric", func(t *testing.T) {
|
||||
metrics := NewStore()
|
||||
checker := NewChecker(context.Background(), metrics, 2.0)
|
||||
|
||||
metr := &api.Metric{
|
||||
Name: "ping",
|
||||
Peer: test.PeerID1,
|
||||
Value: "1",
|
||||
Valid: true,
|
||||
}
|
||||
metr.SetTTL(2 * time.Second)
|
||||
|
||||
metrics.Add(metr)
|
||||
|
||||
checker.CheckAll()
|
||||
select {
|
||||
case <-checker.Alerts():
|
||||
t.Error("there should not be an alert yet")
|
||||
default:
|
||||
}
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
err := checker.CheckAll()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-checker.Alerts():
|
||||
default:
|
||||
t.Error("an alert should have been triggered")
|
||||
}
|
||||
|
||||
checker.CheckAll()
|
||||
select {
|
||||
case <-checker.Alerts():
|
||||
t.Error("there should not be alerts for different peer")
|
||||
default:
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestChecker_Watch(t *testing.T) {
|
||||
|
@ -67,7 +112,7 @@ func TestChecker_Watch(t *testing.T) {
|
|||
checker := NewChecker(context.Background(), metrics, 2.0)
|
||||
|
||||
metr := &api.Metric{
|
||||
Name: "test",
|
||||
Name: "ping",
|
||||
Peer: test.PeerID1,
|
||||
Value: "1",
|
||||
Valid: true,
|
||||
|
@ -95,12 +140,12 @@ func TestChecker_Failed(t *testing.T) {
|
|||
checker := NewChecker(context.Background(), metrics, 2.0)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1"))
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1", 3*time.Millisecond))
|
||||
time.Sleep(time.Duration(2) * time.Millisecond)
|
||||
}
|
||||
for i := 0; i < 10; i++ {
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1"))
|
||||
got := checker.Failed(test.PeerID1)
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1", 3*time.Millisecond))
|
||||
got := checker.FailedMetric("ping", test.PeerID1)
|
||||
// the magic number 17 represents the point at which
|
||||
// the time between metrics addition has gotten
|
||||
// so large that the probability that the service
|
||||
|
@ -111,6 +156,28 @@ func TestChecker_Failed(t *testing.T) {
|
|||
time.Sleep(time.Duration(i) * time.Millisecond)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ttl must expire before phiv causes failure", func(t *testing.T) {
|
||||
metrics := NewStore()
|
||||
checker := NewChecker(context.Background(), metrics, 0.05)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1", 10*time.Millisecond))
|
||||
time.Sleep(time.Duration(200) * time.Millisecond)
|
||||
}
|
||||
for i, j := 0, 10; i < 8; i, j = i+1, j*2 {
|
||||
time.Sleep(time.Duration(j) * time.Millisecond)
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1", 10*time.Millisecond))
|
||||
v, _, phiv, got := checker.failed("ping", test.PeerID1)
|
||||
t.Logf("i: %d: j: %d v: %f, phiv: %f, got: %v\n", i, j, v, phiv, got)
|
||||
if i < 7 && got {
|
||||
t.Fatal("threshold should not have been reached already")
|
||||
}
|
||||
if i >= 10 && !got {
|
||||
t.Fatal("threshold should have been reached by now")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestChecker_alert(t *testing.T) {
|
||||
|
@ -122,7 +189,7 @@ func TestChecker_alert(t *testing.T) {
|
|||
checker := NewChecker(ctx, metrics, 2.0)
|
||||
|
||||
metr := &api.Metric{
|
||||
Name: "test",
|
||||
Name: "ping",
|
||||
Peer: test.PeerID1,
|
||||
Value: "1",
|
||||
Valid: true,
|
||||
|
@ -200,13 +267,13 @@ func TestThresholdValues(t *testing.T) {
|
|||
for i := 0; i < 10; i++ {
|
||||
check(i)
|
||||
distTS.record(float64(10))
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1"))
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1", 1*time.Second))
|
||||
time.Sleep(time.Duration(10) * time.Millisecond)
|
||||
}
|
||||
// start linearly increasing the interval values
|
||||
for i := 10; i < 100 && !check(i); i++ {
|
||||
distTS.record(float64(i))
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1"))
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1", 1*time.Second))
|
||||
time.Sleep(time.Duration(i) * time.Millisecond)
|
||||
}
|
||||
|
||||
|
@ -261,13 +328,13 @@ func TestThresholdValues(t *testing.T) {
|
|||
for i := 0; i < 10; i++ {
|
||||
check(i)
|
||||
distTS.record(float64(8))
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1"))
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1", 1*time.Second))
|
||||
time.Sleep(time.Duration(8) * time.Millisecond)
|
||||
}
|
||||
for i := 2; !check(i) && i < 20; i++ {
|
||||
diff := math.Pow(float64(i), 3)
|
||||
distTS.record(diff)
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1"))
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1", 1*time.Second))
|
||||
time.Sleep(time.Duration(diff) * time.Millisecond)
|
||||
}
|
||||
|
||||
|
@ -321,13 +388,13 @@ func TestThresholdValues(t *testing.T) {
|
|||
for i := 0; i < 10; i++ {
|
||||
check(i)
|
||||
distTS.record(float64(i))
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1"))
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1", 1*time.Second))
|
||||
time.Sleep(time.Duration(i) * time.Millisecond)
|
||||
}
|
||||
for i := 10; !check(i) && i < 30; i++ {
|
||||
diff := i * 50
|
||||
distTS.record(float64(diff))
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1"))
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1", 1*time.Second))
|
||||
time.Sleep(time.Duration(diff) * time.Millisecond)
|
||||
}
|
||||
|
||||
|
@ -348,13 +415,14 @@ func TestThresholdValues(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func makePeerMetric(pid peer.ID, value string) *api.Metric {
|
||||
func makePeerMetric(pid peer.ID, value string, ttl time.Duration) *api.Metric {
|
||||
metr := &api.Metric{
|
||||
Name: "ping",
|
||||
Peer: pid,
|
||||
Value: value,
|
||||
Valid: true,
|
||||
}
|
||||
metr.SetTTL(ttl)
|
||||
return metr
|
||||
}
|
||||
|
||||
|
|
|
@ -129,6 +129,25 @@ func (mtrs *Store) PeerMetrics(pid peer.ID) []*api.Metric {
|
|||
return result
|
||||
}
|
||||
|
||||
// PeerMetricAll returns all of a particular metrics for a
|
||||
// particular peer.
|
||||
func (mtrs *Store) PeerMetricAll(name string, pid peer.ID) []*api.Metric {
|
||||
mtrs.mux.RLock()
|
||||
defer mtrs.mux.RUnlock()
|
||||
|
||||
byPeer, ok := mtrs.byName[name]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
window, ok := byPeer[pid]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
ms := window.All()
|
||||
return ms
|
||||
}
|
||||
|
||||
// PeerLatest returns the latest of a particular metric for a
|
||||
// particular peer. It may return an expired metric.
|
||||
func (mtrs *Store) PeerLatest(name string, pid peer.ID) *api.Metric {
|
||||
|
@ -170,3 +189,15 @@ func (mtrs *Store) Distribution(name string, pid peer.ID) []float64 {
|
|||
|
||||
return window.Distribution()
|
||||
}
|
||||
|
||||
// MetricNames returns all the known metric names
|
||||
func (mtrs *Store) MetricNames() []string {
|
||||
mtrs.mux.RLock()
|
||||
defer mtrs.mux.RUnlock()
|
||||
|
||||
list := make([]string, 0, len(mtrs.byName))
|
||||
for k := range mtrs.byName {
|
||||
list = append(list, k)
|
||||
}
|
||||
return list
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user