alerting for peers stops after one alert

License: MIT
Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
This commit is contained in:
Adrian Lanzafame 2019-06-10 14:41:39 +10:00 committed by Hector Sanjuan
parent 6d7daee875
commit 7459917275
4 changed files with 90 additions and 4 deletions

View File

@ -3,6 +3,7 @@ package metrics
import (
"context"
"errors"
"sync"
"time"
"github.com/ipfs/ipfs-cluster/api"
@ -26,6 +27,9 @@ type Checker struct {
alertCh chan *api.Alert
metrics *Store
threshold float64
failedPeersMu sync.Mutex
failedPeers map[peer.ID]bool
}
// NewChecker creates a Checker using the given
@ -36,10 +40,11 @@ type Checker struct {
// A value between 2.0 and 4.0 is suggested for the threshold.
func NewChecker(ctx context.Context, metrics *Store, threshold float64) *Checker {
	// Every field is set exactly once: repeating a field name in a struct
	// literal does not compile. failedPeers has to be allocated here because
	// alert() writes to it and writing to a nil map panics.
	return &Checker{
		ctx:         ctx,
		alertCh:     make(chan *api.Alert, AlertChannelCap),
		metrics:     metrics,
		threshold:   threshold,
		failedPeers: make(map[peer.ID]bool),
	}
}
@ -89,6 +94,15 @@ func (mc *Checker) alertIfExpired(metric *api.Metric) error {
}
// alert is called for a peer whose metric metricName needs alerting.
//
// The first call for a given pid marks it in failedPeers and carries on to
// build the alert. A later call for a pid that is already marked removes the
// peer's metrics from the store, clears the mark, and returns nil without
// producing another alert.
func (mc *Checker) alert(pid peer.ID, metricName string) error {
	mc.failedPeersMu.Lock()
	if mc.failedPeers[pid] {
		mc.metrics.RemovePeer(pid)
		delete(mc.failedPeers, pid)
		// Release the lock on this path too; returning while still holding
		// it would make every subsequent alert() call block forever.
		mc.failedPeersMu.Unlock()
		return nil
	}
	mc.failedPeers[pid] = true
	// Unlock explicitly rather than with defer so the mutex is not held
	// while the alert is built and delivered further down.
	mc.failedPeersMu.Unlock()
alrt := &api.Alert{
Peer: pid,
MetricName: metricName,

View File

@ -112,6 +112,48 @@ func TestChecker_Failed(t *testing.T) {
})
}
// TestChecker_alert adds a single short-lived metric for one peer, runs the
// checker's Watch loop until the context times out, and verifies that exactly
// one alert is delivered on the Alerts channel in that window.
func TestChecker_alert(t *testing.T) {
	t.Run("remove peer from store after alert", func(t *testing.T) {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()

		store := NewStore()
		mc := NewChecker(ctx, store, 2.0)

		// The metric's TTL is shorter than the watch interval below.
		m := &api.Metric{
			Name:  "test",
			Peer:  test.PeerID1,
			Value: "1",
			Valid: true,
		}
		m.SetTTL(100 * time.Millisecond)
		store.Add(m)

		listPeers := func(context.Context) ([]peer.ID, error) {
			return []peer.ID{test.PeerID1}, nil
		}
		go mc.Watch(ctx, listPeers, 200*time.Millisecond)

		received := 0
		for {
			select {
			case <-ctx.Done():
				// Timed out: by now one alert must have been seen.
				if received == 0 {
					t.Fatal("should have received an alert")
				}
				return
			case alrt := <-mc.Alerts():
				t.Log("received alert:", alrt)
				received++
				if received >= 2 {
					t.Fatal("there should no more than one alert")
				}
			}
		}
	})
}
//////////////////
// HELPER TESTS //
//////////////////

View File

@ -47,6 +47,15 @@ func (mtrs *Store) Add(m *api.Metric) {
window.Add(m)
}
// RemovePeer drops the entry for pid from every per-name metrics map held in
// the Store. The store mutex is held for the duration of the call.
func (mtrs *Store) RemovePeer(pid peer.ID) {
	mtrs.mux.Lock()
	defer mtrs.mux.Unlock()

	for name := range mtrs.byName {
		delete(mtrs.byName[name], pid)
	}
}
// LatestValid returns all the last known valid metrics of a given type. A metric
// is valid if it has not expired.
func (mtrs *Store) LatestValid(name string) []*api.Metric {

View File

@ -32,3 +32,24 @@ func TestStoreLatest(t *testing.T) {
t.Error("expected no metrics")
}
}
// TestRemovePeer stores one metric for a peer, checks that PeerMetrics
// returns it, then calls RemovePeer and checks that PeerMetrics returns
// nothing for that peer.
func TestRemovePeer(t *testing.T) {
	st := NewStore()

	m := &api.Metric{
		Name:  "test",
		Peer:  test.PeerID1,
		Value: "1",
		Valid: true,
	}
	m.SetTTL(200 * time.Millisecond)
	st.Add(m)

	before := st.PeerMetrics(test.PeerID1)
	if len(before) == 0 {
		t.Errorf("there should be one peer metric; got: %v", before)
	}

	st.RemovePeer(test.PeerID1)

	after := st.PeerMetrics(test.PeerID1)
	if len(after) != 0 {
		t.Errorf("there should be no peer metrics; got: %v", after)
	}
}