diff --git a/ipfscluster.go b/ipfscluster.go
index 7caf7162..e5553edb 100644
--- a/ipfscluster.go
+++ b/ipfscluster.go
@@ -167,9 +167,9 @@ type PeerMonitor interface {
 	// PublishMetric sends a metric to the rest of the peers.
 	// How to send it, and to who, is to be decided by the implementation.
 	PublishMetric(api.Metric) error
-	// LastMetrics returns a map with the latest metrics of matching name
+	// LatestMetrics returns the latest metrics of matching name
 	// for the current cluster peers.
-	LastMetrics(name string) []api.Metric
+	LatestMetrics(name string) []api.Metric
 	// Alerts delivers alerts generated when this peer monitor detects
 	// a problem (i.e. metrics not arriving as expected). Alerts can be used
 	// to trigger self-healing measures or re-pinnings of content.
diff --git a/monitor/basic/peer_monitor.go b/monitor/basic/peer_monitor.go
index 3f3e8bac..a436f089 100644
--- a/monitor/basic/peer_monitor.go
+++ b/monitor/basic/peer_monitor.go
@@ -9,10 +9,9 @@ import (
 	"fmt"
 	"strings"
 	"sync"
-	"time"
 
 	"github.com/ipfs/ipfs-cluster/api"
-	"github.com/ipfs/ipfs-cluster/monitor/util"
+	"github.com/ipfs/ipfs-cluster/monitor/metrics"
 	"github.com/ipfs/ipfs-cluster/rpcutil"
 
 	rpc "github.com/hsanjuan/go-libp2p-gorpc"
@@ -22,12 +21,6 @@ import (
 
 var logger = logging.Logger("monitor")
 
-// AlertChannelCap specifies how much buffer the alerts channel has.
-var AlertChannelCap = 256
-
-// DefaultWindowCap sets the amount of metrics to store per peer.
-var DefaultWindowCap = 25
-
 // Monitor is a component in charge of monitoring peers, logging
 // metrics and detecting failures
 type Monitor struct {
@@ -36,12 +29,8 @@ type Monitor struct {
 	rpcClient *rpc.Client
 	rpcReady  chan struct{}
 
-	metrics    util.Metrics
-	metricsMux sync.RWMutex
-	windowCap  int
-
-	checker *util.MetricsChecker
-	alerts  chan api.Alert
+	metrics *metrics.Store
+	checker *metrics.Checker
 
 	config *Config
 
@@ -50,35 +39,26 @@ type Monitor struct {
 	wg       sync.WaitGroup
 }
 
-// NewMonitor creates a new monitor. It receives the window capacity
-// (how many metrics to keep for each peer and type of metric) and the
-// monitoringInterval (interval between the checks that produce alerts)
-// as parameters
+// NewMonitor creates a new monitor using the given config.
 func NewMonitor(cfg *Config) (*Monitor, error) {
 	err := cfg.Validate()
 	if err != nil {
 		return nil, err
 	}
 
-	if DefaultWindowCap <= 0 {
-		panic("windowCap too small")
-	}
-
 	ctx, cancel := context.WithCancel(context.Background())
 
-	alertCh := make(chan api.Alert, AlertChannelCap)
-	metrics := make(util.Metrics)
+	mtrs := metrics.NewStore()
+	checker := metrics.NewChecker(mtrs)
 
 	mon := &Monitor{
 		ctx:      ctx,
 		cancel:   cancel,
 		rpcReady: make(chan struct{}, 1),
 
-		metrics:    metrics,
-		windowCap:  DefaultWindowCap,
-		checker:    util.NewMetricsChecker(metrics, alertCh),
-		alerts:     alertCh,
-		config:     cfg,
+		metrics: mtrs,
+		checker: checker,
+		config:  cfg,
 	}
 
 	go mon.run()
@@ -88,7 +68,7 @@ func NewMonitor(cfg *Config) (*Monitor, error) {
 func (mon *Monitor) run() {
 	select {
 	case <-mon.rpcReady:
-		go mon.monitor()
+		go mon.checker.Watch(mon.ctx, mon.getPeers, mon.config.CheckInterval)
 	case <-mon.ctx.Done():
 	}
 }
@@ -120,25 +100,8 @@ func (mon *Monitor) Shutdown() error {
 
 // LogMetric stores a metric so it can later be retrieved.
 func (mon *Monitor) LogMetric(m api.Metric) error {
-	mon.metricsMux.Lock()
-	defer mon.metricsMux.Unlock()
-	name := m.Name
-	peer := m.Peer
-	mbyp, ok := mon.metrics[name]
-	if !ok {
-		mbyp = make(util.PeerMetrics)
-		mon.metrics[name] = mbyp
-	}
-	window, ok := mbyp[peer]
-	if !ok {
-		// We always lock the outer map, so we can use unsafe
-		// MetricsWindow.
-		window = util.NewMetricsWindow(mon.windowCap, false)
-		mbyp[peer] = window
-	}
-
-	logger.Debugf("logged '%s' metric from '%s'. Expires on %d", name, peer, m.Expire)
-	window.Add(m)
+	mon.metrics.Add(m)
+	logger.Debugf("basic monitor logged '%s' metric from '%s'. Expires on %d", m.Name, m.Peer, m.Expire)
 	return nil
 }
@@ -151,7 +114,7 @@ func (mon *Monitor) PublishMetric(m api.Metric) error {
 	peers, err := mon.getPeers()
 	if err != nil {
-		logger.Error("PublishPeers could not list peers:", err)
+		return err
 	}
 
 	ctxs, cancels := rpcutil.CtxsWithTimeout(mon.ctx, len(peers), m.GetTTL()/2)
@@ -204,7 +167,6 @@ func (mon *Monitor) PublishMetric(m api.Metric) error {
 
 // getPeers gets the current list of peers from the consensus component
 func (mon *Monitor) getPeers() ([]peer.ID, error) {
-	// Ger current list of peers
 	var peers []peer.ID
 	err := mon.rpcClient.Call(
 		"",
@@ -213,98 +175,28 @@ func (mon *Monitor) getPeers() ([]peer.ID, error) {
 		struct{}{},
 		&peers,
 	)
+	if err != nil {
+		logger.Error(err)
+	}
 	return peers, err
 }
 
-// func (mon *Monitor) getLastMetric(name string, p peer.ID) api.Metric {
-// 	mon.metricsMux.RLock()
-// 	defer mon.metricsMux.RUnlock()
+// LatestMetrics returns last known VALID metrics of a given type. A metric
+// is only valid if it has not expired and belongs to a current cluster peer.
+func (mon *Monitor) LatestMetrics(name string) []api.Metric {
+	latest := mon.metrics.Latest(name)
 
-// 	emptyMetric := api.Metric{
-// 		Name:  name,
-// 		Peer:  p,
-// 		Valid: false,
-// 	}
-
-// 	mbyp, ok := mon.metrics[name]
-// 	if !ok {
-// 		return emptyMetric
-// 	}
-
-// 	window, ok := mbyp[p]
-// 	if !ok {
-// 		return emptyMetric
-// 	}
-// 	metric, err := window.Latest()
-// 	if err != nil {
-// 		return emptyMetric
-// 	}
-// 	return metric
-// }
-
-// LastMetrics returns last known VALID metrics of a given type. A metric
-// is only valid if it has not expired and belongs to a current cluster peer.
-func (mon *Monitor) LastMetrics(name string) []api.Metric {
+	// Make sure we only return metrics in the current peerset
 	peers, err := mon.getPeers()
 	if err != nil {
-		logger.Errorf("LastMetrics could not list peers: %s", err)
 		return []api.Metric{}
 	}
-	mon.metricsMux.RLock()
-	defer mon.metricsMux.RUnlock()
-
-	mbyp, ok := mon.metrics[name]
-	if !ok {
-		logger.Warningf("LastMetrics: No %s metrics", name)
-		return []api.Metric{}
-	}
-
-	metrics := make([]api.Metric, 0, len(mbyp))
-
-	// only show metrics for current set of peers
-	for _, peer := range peers {
-		window, ok := mbyp[peer]
-		if !ok {
-			continue
-		}
-		last, err := window.Latest()
-		if err != nil || last.Discard() {
-			logger.Warningf("no valid last metric for peer: %+v", last)
-			continue
-		}
-		metrics = append(metrics, last)
-
-	}
-	return metrics
+	return metrics.PeersetFilter(latest, peers)
 }
 
 // Alerts returns a channel on which alerts are sent when the
 // monitor detects a failure.
 func (mon *Monitor) Alerts() <-chan api.Alert {
-	return mon.alerts
-}
-
-// monitor creates a ticker which fetches current
-// cluster peers and checks that the last metric for a peer
-// has not expired.
-func (mon *Monitor) monitor() { - ticker := time.NewTicker(mon.config.CheckInterval) - for { - select { - case <-ticker.C: - logger.Debug("monitoring tick") - peers, err := mon.getPeers() - if err != nil { - logger.Error(err) - break - } - mon.metricsMux.RLock() - mon.checker.CheckMetrics(peers) - mon.metricsMux.RUnlock() - case <-mon.ctx.Done(): - ticker.Stop() - return - } - } + return mon.checker.Alerts() } diff --git a/monitor/basic/peer_monitor_test.go b/monitor/basic/peer_monitor_test.go index 47036169..5912eaa1 100644 --- a/monitor/basic/peer_monitor_test.go +++ b/monitor/basic/peer_monitor_test.go @@ -107,7 +107,7 @@ func TestLogMetricConcurrent(t *testing.T) { last := time.Now().Add(-500 * time.Millisecond) for i := 0; i <= 20; i++ { - lastMtrcs := pm.LastMetrics("test") + lastMtrcs := pm.LatestMetrics("test") if len(lastMtrcs) != 1 { t.Error("no valid metrics", len(lastMtrcs), i) @@ -146,18 +146,18 @@ func TestPeerMonitorLogMetric(t *testing.T) { pm.LogMetric(mf.newMetric("test2", test.TestPeerID3)) pm.LogMetric(mf.newMetric("test2", test.TestPeerID3)) - lastMetrics := pm.LastMetrics("testbad") - if len(lastMetrics) != 0 { - t.Logf("%+v", lastMetrics) + latestMetrics := pm.LatestMetrics("testbad") + if len(latestMetrics) != 0 { + t.Logf("%+v", latestMetrics) t.Error("metrics should be empty") } - lastMetrics = pm.LastMetrics("test") - if len(lastMetrics) != 3 { + latestMetrics = pm.LatestMetrics("test") + if len(latestMetrics) != 3 { t.Error("metrics should correspond to 3 hosts") } - for _, v := range lastMetrics { + for _, v := range latestMetrics { switch v.Peer { case test.TestPeerID1: if v.Value != "0" { @@ -176,11 +176,11 @@ func TestPeerMonitorLogMetric(t *testing.T) { } } - lastMetrics = pm.LastMetrics("test2") - if len(lastMetrics) != 1 { + latestMetrics = pm.LatestMetrics("test2") + if len(latestMetrics) != 1 { t.Fatal("should only be one metric") } - if lastMetrics[0].Value != fmt.Sprintf("%d", mf.count()-1) { + if latestMetrics[0].Value != fmt.Sprintf("%d", mf.count()-1) { t.Error("metric is not last") } } diff --git a/monitor/metrics/checker.go b/monitor/metrics/checker.go new file mode 100644 index 00000000..a6e24ecf --- /dev/null +++ b/monitor/metrics/checker.go @@ -0,0 +1,87 @@ +package metrics + +import ( + "context" + "errors" + "time" + + "github.com/ipfs/ipfs-cluster/api" + + peer "github.com/libp2p/go-libp2p-peer" +) + +// AlertChannelCap specifies how much buffer the alerts channel has. +var AlertChannelCap = 256 + +// ErrAlertChannelFull is returned if the alert channel is full. +var ErrAlertChannelFull = errors.New("alert channel is full") + +// Checker provides utilities to find expired metrics +// for a given peerset and send alerts if it proceeds to do so. +type Checker struct { + alertCh chan api.Alert + metrics *Store +} + +// NewChecker creates a Checker using the given +// MetricsStore. +func NewChecker(metrics *Store) *Checker { + return &Checker{ + alertCh: make(chan api.Alert, AlertChannelCap), + metrics: metrics, + } +} + +// CheckPeers will trigger alerts for expired metrics belonging to the +// given peerset. 
+func (mc *Checker) CheckPeers(peers []peer.ID) error { + for _, peer := range peers { + for _, metric := range mc.metrics.PeerMetrics(peer) { + if metric.Valid && metric.Expired() { + err := mc.alert(metric.Peer, metric.Name) + if err != nil { + return err + } + } + } + } + return nil +} + +func (mc *Checker) alert(pid peer.ID, metricName string) error { + alrt := api.Alert{ + Peer: pid, + MetricName: metricName, + } + select { + case mc.alertCh <- alrt: + default: + return ErrAlertChannelFull + } + return nil +} + +// Alerts returns a channel which gets notified by CheckPeers. +func (mc *Checker) Alerts() <-chan api.Alert { + return mc.alertCh +} + +// Watch will trigger regular CheckPeers on the given interval. It will call +// peersF to obtain a peerset. It can be stopped by cancelling the context. +// Usually you want to launch this in a goroutine. +func (mc *Checker) Watch(ctx context.Context, peersF func() ([]peer.ID, error), interval time.Duration) { + ticker := time.NewTicker(interval) + for { + select { + case <-ticker.C: + peers, err := peersF() + if err != nil { + continue + } + mc.CheckPeers(peers) + case <-ctx.Done(): + ticker.Stop() + return + } + } +} diff --git a/monitor/metrics/checker_test.go b/monitor/metrics/checker_test.go new file mode 100644 index 00000000..45e41841 --- /dev/null +++ b/monitor/metrics/checker_test.go @@ -0,0 +1,83 @@ +package metrics + +import ( + "context" + "testing" + "time" + + peer "github.com/libp2p/go-libp2p-peer" + + "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/test" +) + +func TestChecker(t *testing.T) { + metrics := NewStore() + checker := NewChecker(metrics) + + metr := api.Metric{ + Name: "test", + Peer: test.TestPeerID1, + Value: "1", + Valid: true, + } + metr.SetTTL(2 * time.Second) + + metrics.Add(metr) + + checker.CheckPeers([]peer.ID{test.TestPeerID1}) + select { + case <-checker.Alerts(): + t.Error("there should not be an alert yet") + default: + } + + time.Sleep(3 * time.Second) + err := checker.CheckPeers([]peer.ID{test.TestPeerID1}) + if err != nil { + t.Fatal(err) + } + + select { + case <-checker.Alerts(): + default: + t.Error("an alert should have been triggered") + } + + checker.CheckPeers([]peer.ID{test.TestPeerID2}) + select { + case <-checker.Alerts(): + t.Error("there should not be alerts for different peer") + default: + } +} + +func TestCheckerWatch(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + metrics := NewStore() + checker := NewChecker(metrics) + + metr := api.Metric{ + Name: "test", + Peer: test.TestPeerID1, + Value: "1", + Valid: true, + } + metr.SetTTL(100 * time.Millisecond) + metrics.Add(metr) + + peersF := func() ([]peer.ID, error) { + return []peer.ID{test.TestPeerID1}, nil + } + + go checker.Watch(ctx, peersF, 200*time.Millisecond) + + select { + case a := <-checker.Alerts(): + t.Log("received alert:", a) + case <-ctx.Done(): + t.Fatal("should have received an alert") + } +} diff --git a/monitor/metrics/store.go b/monitor/metrics/store.go new file mode 100644 index 00000000..79c2dd95 --- /dev/null +++ b/monitor/metrics/store.go @@ -0,0 +1,92 @@ +package metrics + +import ( + "sync" + + "github.com/ipfs/ipfs-cluster/api" + + peer "github.com/libp2p/go-libp2p-peer" +) + +// PeerMetrics maps a peer IDs to a metrics window. +type PeerMetrics map[peer.ID]*Window + +// Store can be used to store and access metrics. 
+type Store struct { + mux sync.RWMutex + byName map[string]PeerMetrics +} + +// NewStore can be used to create a Store. +func NewStore() *Store { + return &Store{ + byName: make(map[string]PeerMetrics), + } +} + +// Add inserts a new metric in Metrics. +func (mtrs *Store) Add(m api.Metric) { + mtrs.mux.Lock() + defer mtrs.mux.Unlock() + + name := m.Name + peer := m.Peer + mbyp, ok := mtrs.byName[name] + if !ok { + mbyp = make(PeerMetrics) + mtrs.byName[name] = mbyp + } + window, ok := mbyp[peer] + if !ok { + // We always lock the outer map, so we can use unsafe + // Window. + window = NewWindow(DefaultWindowCap, false) + mbyp[peer] = window + } + + window.Add(m) +} + +// Latest returns all the last known valid metrics. A metric is valid +// if it has not expired. +func (mtrs *Store) Latest(name string) []api.Metric { + mtrs.mux.RLock() + defer mtrs.mux.RUnlock() + + byPeer, ok := mtrs.byName[name] + if !ok { + return []api.Metric{} + } + + metrics := make([]api.Metric, 0, len(byPeer)) + for _, window := range byPeer { + m, err := window.Latest() + if err != nil || m.Discard() { + continue + } + metrics = append(metrics, m) + } + return metrics +} + +// PeerMetrics returns the latest metrics for a given peer ID for +// all known metrics types. It may return expired metrics. +func (mtrs *Store) PeerMetrics(peer peer.ID) []api.Metric { + mtrs.mux.RLock() + defer mtrs.mux.RUnlock() + + result := make([]api.Metric, 0) + + for _, byPeer := range mtrs.byName { + window, ok := byPeer[peer] + if !ok { + continue + } + metric, err := window.Latest() + if err != nil { + continue + } + result = append(result, metric) + } + return result +} diff --git a/monitor/metrics/store_test.go b/monitor/metrics/store_test.go new file mode 100644 index 00000000..660240a8 --- /dev/null +++ b/monitor/metrics/store_test.go @@ -0,0 +1,34 @@ +package metrics + +import ( + "testing" + "time" + + "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/test" +) + +func TestStoreLatest(t *testing.T) { + store := NewStore() + + metr := api.Metric{ + Name: "test", + Peer: test.TestPeerID1, + Value: "1", + Valid: true, + } + metr.SetTTL(200 * time.Millisecond) + store.Add(metr) + + latest := store.Latest("test") + if len(latest) != 1 { + t.Error("expected 1 metric") + } + + time.Sleep(220 * time.Millisecond) + + latest = store.Latest("test") + if len(latest) != 0 { + t.Error("expected no metrics") + } +} diff --git a/monitor/metrics/util.go b/monitor/metrics/util.go new file mode 100644 index 00000000..17527027 --- /dev/null +++ b/monitor/metrics/util.go @@ -0,0 +1,28 @@ +package metrics + +import ( + "github.com/ipfs/ipfs-cluster/api" + + peer "github.com/libp2p/go-libp2p-peer" +) + +// PeersetFilter removes all metrics not belonging to the given +// peerset +func PeersetFilter(metrics []api.Metric, peerset []peer.ID) []api.Metric { + peerMap := make(map[peer.ID]struct{}) + for _, peer := range peerset { + peerMap[peer] = struct{}{} + } + + filtered := make([]api.Metric, 0, len(metrics)) + + for _, metric := range metrics { + _, ok := peerMap[metric.Peer] + if !ok { + continue + } + filtered = append(filtered, metric) + } + + return filtered +} diff --git a/monitor/util/metrics_window.go b/monitor/metrics/window.go similarity index 68% rename from monitor/util/metrics_window.go rename to monitor/metrics/window.go index 73014838..7fae1175 100644 --- a/monitor/util/metrics_window.go +++ b/monitor/metrics/window.go @@ -1,5 +1,7 @@ -// Package util provides common functionality for monitoring components. 
-package util
+// Package metrics provides common functionality for working with metrics,
+// particularly useful for monitoring components. It includes types to store,
+// check and filter metrics.
+package metrics
 
 import (
 	"errors"
@@ -8,11 +10,14 @@ import (
 	"github.com/ipfs/ipfs-cluster/api"
 )
 
-// ErrNoMetrics is returned when there are no metrics in a MetricsWindow.
+// DefaultWindowCap sets the amount of metrics to store per peer.
+var DefaultWindowCap = 25
+
+// ErrNoMetrics is returned when there are no metrics in a Window.
 var ErrNoMetrics = errors.New("no metrics have been added to this window")
 
-// MetricsWindow implements a circular queue to store metrics.
-type MetricsWindow struct {
+// Window implements a circular queue to store metrics.
+type Window struct {
 	last int
 
 	safe bool
@@ -20,12 +25,16 @@ type MetricsWindow struct {
 	window []api.Metric
 }
 
-// NewMetricsWindow creates an instance with the given
+// NewWindow creates an instance with the given
 // window capacity. The safe indicates whether we use a lock
 // for concurrent operations.
-func NewMetricsWindow(windowCap int, safe bool) *MetricsWindow {
+func NewWindow(windowCap int, safe bool) *Window {
+	if windowCap <= 0 {
+		panic("invalid windowCap")
+	}
+
 	w := make([]api.Metric, 0, windowCap)
-	return &MetricsWindow{
+	return &Window{
 		last:   0,
 		safe:   safe,
 		window: w,
@@ -35,7 +44,7 @@ func NewMetricsWindow(windowCap int, safe bool) *MetricsWindow {
 // Add adds a new metric to the window. If the window capacity
 // has been reached, the oldest metric (by the time it was added),
 // will be discarded.
-func (mw *MetricsWindow) Add(m api.Metric) {
+func (mw *Window) Add(m api.Metric) {
 	if mw.safe {
 		mw.windowLock.Lock()
 		defer mw.windowLock.Unlock()
@@ -54,7 +63,7 @@ func (mw *MetricsWindow) Add(m api.Metric) {
 
 // Latest returns the last metric added. It returns an error
 // if no metrics were added.
-func (mw *MetricsWindow) Latest() (api.Metric, error) {
+func (mw *Window) Latest() (api.Metric, error) {
 	if mw.safe {
 		mw.windowLock.Lock()
 		defer mw.windowLock.Unlock()
@@ -68,7 +77,7 @@ func (mw *MetricsWindow) Latest() (api.Metric, error) {
 // All returns all the metrics in the window, in the inverse order
 // they were Added. That is, result[0] will be the last added
 // metric.
-func (mw *MetricsWindow) All() []api.Metric {
+func (mw *Window) All() []api.Metric {
 	if mw.safe {
 		mw.windowLock.Lock()
 		defer mw.windowLock.Unlock()
diff --git a/monitor/util/metrics_window_test.go b/monitor/metrics/window_test.go
similarity index 95%
rename from monitor/util/metrics_window_test.go
rename to monitor/metrics/window_test.go
index 6b7c4860..349b60f5 100644
--- a/monitor/util/metrics_window_test.go
+++ b/monitor/metrics/window_test.go
@@ -1,4 +1,4 @@
-package util
+package metrics
 
 import (
 	"testing"
@@ -8,7 +8,7 @@
 )
 
 func TestMetricsWindow(t *testing.T) {
-	mw := NewMetricsWindow(4, true)
+	mw := NewWindow(4, true)
 
 	_, err := mw.Latest()
 	if err != ErrNoMetrics {
diff --git a/monitor/pubsubmon/pubsubmon.go b/monitor/pubsubmon/pubsubmon.go
index 8bc277a4..7a4bcd47 100644
--- a/monitor/pubsubmon/pubsubmon.go
+++ b/monitor/pubsubmon/pubsubmon.go
@@ -6,10 +6,9 @@ import (
 	"bytes"
 	"context"
 	"sync"
-	"time"
 
 	"github.com/ipfs/ipfs-cluster/api"
-	"github.com/ipfs/ipfs-cluster/monitor/util"
+	"github.com/ipfs/ipfs-cluster/monitor/metrics"
 
 	rpc "github.com/hsanjuan/go-libp2p-gorpc"
 	logging "github.com/ipfs/go-log"
@@ -22,13 +21,7 @@ import (
 
 var logger = logging.Logger("monitor")
 
 // PubsubTopic specifies the topic used to publish Cluster metrics.
-var PubsubTopic = "pubsubmon" - -// AlertChannelCap specifies how much buffer the alerts channel has. -var AlertChannelCap = 256 - -// DefaultWindowCap sets the amount of metrics to store per peer. -var DefaultWindowCap = 25 +var PubsubTopic = "monitor.metrics" var msgpackHandle = msgpack.DefaultMsgpackHandle() @@ -44,12 +37,8 @@ type Monitor struct { pubsub *floodsub.PubSub subscription *floodsub.Subscription - metrics util.Metrics - metricsMux sync.RWMutex - windowCap int - - checker *util.MetricsChecker - alerts chan api.Alert + metrics *metrics.Store + checker *metrics.Checker config *Config @@ -58,24 +47,17 @@ type Monitor struct { wg sync.WaitGroup } -// New creates a new monitor. It receives the window capacity -// (how many metrics to keep for each peer and type of metric) and the -// monitoringInterval (interval between the checks that produce alerts) -// as parameters +// New creates a new PubSub monitor, using the given host and config. func New(h host.Host, cfg *Config) (*Monitor, error) { err := cfg.Validate() if err != nil { return nil, err } - if DefaultWindowCap <= 0 { - panic("windowCap too small") - } - ctx, cancel := context.WithCancel(context.Background()) - alertCh := make(chan api.Alert, AlertChannelCap) - metrics := make(util.Metrics) + mtrs := metrics.NewStore() + checker := metrics.NewChecker(mtrs) pubsub, err := floodsub.NewFloodSub(ctx, h) if err != nil { @@ -98,11 +80,9 @@ func New(h host.Host, cfg *Config) (*Monitor, error) { pubsub: pubsub, subscription: subscription, - metrics: metrics, - windowCap: DefaultWindowCap, - checker: util.NewMetricsChecker(metrics, alertCh), - alerts: alertCh, - config: cfg, + metrics: mtrs, + checker: checker, + config: cfg, } go mon.run() @@ -112,8 +92,8 @@ func New(h host.Host, cfg *Config) (*Monitor, error) { func (mon *Monitor) run() { select { case <-mon.rpcReady: - go mon.monitor() go mon.logFromPubsub() + go mon.checker.Watch(mon.ctx, mon.getPeers, mon.config.CheckInterval) case <-mon.ctx.Done(): } } @@ -174,8 +154,6 @@ func (mon *Monitor) Shutdown() error { logger.Info("stopping Monitor") close(mon.rpcReady) - // not necessary as this just removes subscription - // mon.subscription.Cancel() mon.cancel() mon.wg.Wait() @@ -185,25 +163,8 @@ func (mon *Monitor) Shutdown() error { // LogMetric stores a metric so it can later be retrieved. func (mon *Monitor) LogMetric(m api.Metric) error { - mon.metricsMux.Lock() - defer mon.metricsMux.Unlock() - name := m.Name - peer := m.Peer - mbyp, ok := mon.metrics[name] - if !ok { - mbyp = make(util.PeerMetrics) - mon.metrics[name] = mbyp - } - window, ok := mbyp[peer] - if !ok { - // We always lock the outer map, so we can use unsafe - // MetricsWindow. - window = util.NewMetricsWindow(mon.windowCap, false) - mbyp[peer] = window - } - - logger.Debugf("logged '%s' metric from '%s'. Expires on %d", name, peer, m.Expire) - window.Add(m) + mon.metrics.Add(m) + logger.Debugf("pubsub mon logged '%s' metric from '%s'. Expires on %d", m.Name, m.Peer, m.Expire) return nil } @@ -240,7 +201,6 @@ func (mon *Monitor) PublishMetric(m api.Metric) error { // getPeers gets the current list of peers from the consensus component func (mon *Monitor) getPeers() ([]peer.ID, error) { - // Ger current list of peers var peers []peer.ID err := mon.rpcClient.Call( "", @@ -249,72 +209,28 @@ func (mon *Monitor) getPeers() ([]peer.ID, error) { struct{}{}, &peers, ) + if err != nil { + logger.Error(err) + } return peers, err } -// LastMetrics returns last known VALID metrics of a given type. 
A metric
+// LatestMetrics returns last known VALID metrics of a given type. A metric
+// is only valid if it has not expired and belongs to a current cluster peer.
-func (mon *Monitor) LastMetrics(name string) []api.Metric {
+func (mon *Monitor) LatestMetrics(name string) []api.Metric {
+	latest := mon.metrics.Latest(name)
+
+	// Make sure we only return metrics in the current peerset
 	peers, err := mon.getPeers()
 	if err != nil {
-		logger.Errorf("LastMetrics could not list peers: %s", err)
 		return []api.Metric{}
 	}
-	mon.metricsMux.RLock()
-	defer mon.metricsMux.RUnlock()
-
-	mbyp, ok := mon.metrics[name]
-	if !ok {
-		logger.Warningf("LastMetrics: No %s metrics", name)
-		return []api.Metric{}
-	}
-
-	metrics := make([]api.Metric, 0, len(mbyp))
-
-	// only show metrics for current set of peers
-	for _, peer := range peers {
-		window, ok := mbyp[peer]
-		if !ok {
-			continue
-		}
-		last, err := window.Latest()
-		if err != nil || last.Discard() {
-			logger.Warningf("no valid last metric for peer: %+v", last)
-			continue
-		}
-		metrics = append(metrics, last)
-
-	}
-	return metrics
+	return metrics.PeersetFilter(latest, peers)
 }
 
 // Alerts returns a channel on which alerts are sent when the
 // monitor detects a failure.
 func (mon *Monitor) Alerts() <-chan api.Alert {
-	return mon.alerts
-}
-
-// monitor creates a ticker which fetches current
-// cluster peers and checks that the last metric for a peer
-// has not expired.
-func (mon *Monitor) monitor() {
-	ticker := time.NewTicker(mon.config.CheckInterval)
-	for {
-		select {
-		case <-ticker.C:
-			logger.Debug("monitoring tick")
-			peers, err := mon.getPeers()
-			if err != nil {
-				logger.Error(err)
-				break
-			}
-			mon.metricsMux.RLock()
-			mon.checker.CheckMetrics(peers)
-			mon.metricsMux.RUnlock()
-		case <-mon.ctx.Done():
-			ticker.Stop()
-			return
-		}
-	}
+	return mon.checker.Alerts()
 }
diff --git a/monitor/pubsubmon/pubsubmon_test.go b/monitor/pubsubmon/pubsubmon_test.go
index cd4dcfc0..a6565d16 100644
--- a/monitor/pubsubmon/pubsubmon_test.go
+++ b/monitor/pubsubmon/pubsubmon_test.go
@@ -119,7 +119,7 @@ func TestLogMetricConcurrent(t *testing.T) {
 	last := time.Now().Add(-500 * time.Millisecond)
 
 	for i := 0; i <= 20; i++ {
-		lastMtrcs := pm.LastMetrics("test")
+		lastMtrcs := pm.LatestMetrics("test")
 
 		if len(lastMtrcs) != 1 {
 			t.Error("no valid metrics", len(lastMtrcs), i)
@@ -158,18 +158,18 @@ func TestPeerMonitorLogMetric(t *testing.T) {
 	pm.LogMetric(mf.newMetric("test2", test.TestPeerID3))
 	pm.LogMetric(mf.newMetric("test2", test.TestPeerID3))
 
-	lastMetrics := pm.LastMetrics("testbad")
-	if len(lastMetrics) != 0 {
-		t.Logf("%+v", lastMetrics)
+	latestMetrics := pm.LatestMetrics("testbad")
+	if len(latestMetrics) != 0 {
+		t.Logf("%+v", latestMetrics)
 		t.Error("metrics should be empty")
 	}
 
-	lastMetrics = pm.LastMetrics("test")
-	if len(lastMetrics) != 3 {
+	latestMetrics = pm.LatestMetrics("test")
+	if len(latestMetrics) != 3 {
 		t.Error("metrics should correspond to 3 hosts")
 	}
 
-	for _, v := range lastMetrics {
+	for _, v := range latestMetrics {
 		switch v.Peer {
 		case test.TestPeerID1:
 			if v.Value != "0" {
@@ -188,11 +188,11 @@
 		}
 	}
 
-	lastMetrics = pm.LastMetrics("test2")
-	if len(lastMetrics) != 1 {
+	latestMetrics = pm.LatestMetrics("test2")
+	if len(latestMetrics) != 1 {
 		t.Fatal("should only be one metric")
 	}
-	if lastMetrics[0].Value != fmt.Sprintf("%d", mf.count()-1) {
+	if latestMetrics[0].Value != fmt.Sprintf("%d", mf.count()-1) {
 		t.Error("metric is not last")
 	}
 }
@@ -220,16 +220,16 @@ func
TestPeerMonitorPublishMetric(t *testing.T) { t.Fatal(err) } - time.Sleep(200 * time.Millisecond) + time.Sleep(500 * time.Millisecond) checkMetric := func(t *testing.T, pm *Monitor) { - lastMetrics := pm.LastMetrics("test") - if len(lastMetrics) != 1 { + latestMetrics := pm.LatestMetrics("test") + if len(latestMetrics) != 1 { t.Fatal(pm.host.ID(), "expected 1 published metric") } t.Log(pm.host.ID(), "received metric") - receivedMetric := lastMetrics[0] + receivedMetric := latestMetrics[0] if receivedMetric.Peer != metric.Peer || receivedMetric.Expire != metric.Expire || receivedMetric.Value != metric.Value || diff --git a/monitor/util/metrics_checker.go b/monitor/util/metrics_checker.go deleted file mode 100644 index 028e0195..00000000 --- a/monitor/util/metrics_checker.go +++ /dev/null @@ -1,76 +0,0 @@ -package util - -import ( - "errors" - - "github.com/ipfs/ipfs-cluster/api" - - peer "github.com/libp2p/go-libp2p-peer" -) - -// ErrAlertChannelFull is returned if the alert channel is full. -var ErrAlertChannelFull = errors.New("alert channel is full") - -// Metrics maps metric names to PeerMetrics -type Metrics map[string]PeerMetrics - -// PeerMetrics maps a peer IDs to a metric window. -type PeerMetrics map[peer.ID]*MetricsWindow - -// MetricsChecker provides utilities to find expired metrics -// for a given peerset and send alerts if it proceeds to do so. -type MetricsChecker struct { - alertCh chan api.Alert - metrics Metrics -} - -// NewMetricsChecker creates a MetricsChecker using the given -// Metrics and alert channel. MetricsChecker assumes non-concurrent -// access to the Metrics map. It's the caller's responsability -// to it lock otherwise while calling CheckMetrics(). -func NewMetricsChecker(metrics Metrics, alertCh chan api.Alert) *MetricsChecker { - return &MetricsChecker{ - alertCh: alertCh, - metrics: metrics, - } -} - -// CheckMetrics triggers Check() on all metrics known for the given peerset. -func (mc *MetricsChecker) CheckMetrics(peers []peer.ID) { - for name, peerMetrics := range mc.metrics { - for _, pid := range peers { - window, ok := peerMetrics[pid] - if !ok { // no metrics for this peer - continue - } - mc.Check(pid, name, window) - } - } -} - -// Check sends an alert on the alert channel for the given peer and metric name -// if the last metric in the window was valid but expired. 
-func (mc *MetricsChecker) Check(pid peer.ID, metricName string, mw *MetricsWindow) error { - last, err := mw.Latest() - if err != nil { // no metrics - return nil - } - - if last.Valid && last.Expired() { - return mc.alert(pid, metricName) - } - return nil -} - -func (mc *MetricsChecker) alert(pid peer.ID, metricName string) error { - alrt := api.Alert{ - Peer: pid, - MetricName: metricName, - } - select { - case mc.alertCh <- alrt: - default: - return ErrAlertChannelFull - } - return nil -} diff --git a/monitor/util/metrics_checker_test.go b/monitor/util/metrics_checker_test.go deleted file mode 100644 index 21b68fce..00000000 --- a/monitor/util/metrics_checker_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package util - -import ( - "testing" - "time" - - peer "github.com/libp2p/go-libp2p-peer" - - "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/test" -) - -func TestMetricsChecker(t *testing.T) { - metrics := make(Metrics) - alerts := make(chan api.Alert, 1) - - checker := NewMetricsChecker(metrics, alerts) - - metr := api.Metric{ - Name: "test", - Peer: test.TestPeerID1, - Value: "1", - Valid: true, - } - metr.SetTTL(2 * time.Second) - - metrics["test"] = make(PeerMetrics) - metrics["test"][test.TestPeerID1] = NewMetricsWindow(5, true) - metrics["test"][test.TestPeerID1].Add(metr) - - checker.CheckMetrics([]peer.ID{test.TestPeerID1}) - select { - case <-alerts: - t.Error("there should not be an alert yet") - default: - } - - time.Sleep(3 * time.Second) - checker.CheckMetrics([]peer.ID{test.TestPeerID1}) - - select { - case <-alerts: - default: - t.Error("an alert should have been triggered") - } - - checker.CheckMetrics([]peer.ID{test.TestPeerID2}) - select { - case <-alerts: - t.Error("there should not be alerts for different peer") - default: - } -}