diff --git a/monitor/basic/peer_monitor.go b/monitor/basic/peer_monitor.go
index b76f66c8..e1434830 100644
--- a/monitor/basic/peer_monitor.go
+++ b/monitor/basic/peer_monitor.go
@@ -5,7 +5,6 @@ package basic
 
 import (
 	"context"
-	"errors"
 	"sync"
 	"time"
 
@@ -14,6 +13,7 @@ import (
 	peer "github.com/libp2p/go-libp2p-peer"
 
 	"github.com/ipfs/ipfs-cluster/api"
+	"github.com/ipfs/ipfs-cluster/monitor/util"
 )
 
 var logger = logging.Logger("monitor")
@@ -21,65 +21,10 @@ var logger = logging.Logger("monitor")
 // AlertChannelCap specifies how much buffer the alerts channel has.
 var AlertChannelCap = 256
 
-// WindowCap specifies how many metrics to keep for given host and metric type
-var WindowCap = 100
+// DefaultWindowCap sets the amount of metrics to store per peer.
+var DefaultWindowCap = 25
 
-// peerMetrics is just a circular queue
-type peerMetrics struct {
-	last   int
-	window []api.Metric
-	// mux sync.RWMutex
-}
-
-func newPeerMetrics(windowCap int) *peerMetrics {
-	w := make([]api.Metric, 0, windowCap)
-	return &peerMetrics{0, w}
-}
-
-func (pmets *peerMetrics) add(m api.Metric) {
-	// pmets.mux.Lock()
-	// defer pmets.mux.Unlock()
-	if len(pmets.window) < cap(pmets.window) {
-		pmets.window = append(pmets.window, m)
-		pmets.last = len(pmets.window) - 1
-		return
-	}
-
-	// len == cap
-	pmets.last = (pmets.last + 1) % cap(pmets.window)
-	pmets.window[pmets.last] = m
-	return
-}
-
-func (pmets *peerMetrics) latest() (api.Metric, error) {
-	// pmets.mux.RLock()
-	// defer pmets.mux.RUnlock()
-	if len(pmets.window) == 0 {
-		logger.Warning("no metrics")
-		return api.Metric{}, errors.New("no metrics")
-	}
-	return pmets.window[pmets.last], nil
-}
-
-// ordered from newest to oldest
-func (pmets *peerMetrics) all() []api.Metric {
-	// pmets.mux.RLock()
-	// pmets.mux.RUnlock()
-	wlen := len(pmets.window)
-	res := make([]api.Metric, 0, wlen)
-	if wlen == 0 {
-		return res
-	}
-	for i := pmets.last; i >= 0; i-- {
-		res = append(res, pmets.window[i])
-	}
-	for i := wlen; i > pmets.last; i-- {
-		res = append(res, pmets.window[i])
-	}
-	return res
-}
-
-type metricsByPeer map[peer.ID]*peerMetrics
+type metricsByPeer map[peer.ID]*util.MetricsWindow
 
 // Monitor is a component in charge of monitoring peers, logging
 // metrics and detecting failures
@@ -112,7 +57,7 @@ func NewMonitor(cfg *Config) (*Monitor, error) {
 		return nil, err
 	}
 
-	if WindowCap <= 0 {
+	if DefaultWindowCap <= 0 {
 		panic("windowCap too small")
 	}
 
@@ -124,7 +69,7 @@ func NewMonitor(cfg *Config) (*Monitor, error) {
 		rpcReady: make(chan struct{}, 1),
 
 		metrics:   make(map[string]metricsByPeer),
-		windowCap: WindowCap,
+		windowCap: DefaultWindowCap,
 		alerts:    make(chan api.Alert, AlertChannelCap),
 
 		config: cfg,
@@ -178,14 +123,16 @@ func (mon *Monitor) LogMetric(m api.Metric) {
 		mbyp = make(metricsByPeer)
 		mon.metrics[name] = mbyp
 	}
-	pmets, ok := mbyp[peer]
+	window, ok := mbyp[peer]
 	if !ok {
-		pmets = newPeerMetrics(mon.windowCap)
-		mbyp[peer] = pmets
+		// We always lock the outer map, so we can use unsafe
+		// MetricsWindow.
+		window = util.NewMetricsWindow(mon.windowCap, false)
+		mbyp[peer] = window
 	}
 
 	logger.Debugf("logged '%s' metric from '%s'. Expires on %d", name, peer, m.Expire)
-	pmets.add(m)
+	window.Add(m)
 }
 
 // func (mon *Monitor) getLastMetric(name string, p peer.ID) api.Metric {
@@ -203,11 +150,11 @@ func (mon *Monitor) LogMetric(m api.Metric) {
 // 		return emptyMetric
 // 	}
 
-// 	pmets, ok := mbyp[p]
+// 	window, ok := mbyp[p]
 // 	if !ok {
 // 		return emptyMetric
 // 	}
-// 	metric, err := pmets.latest()
+// 	metric, err := window.Latest()
 // 	if err != nil {
 // 		return emptyMetric
 // 	}
@@ -242,11 +189,11 @@ func (mon *Monitor) LastMetrics(name string) []api.Metric {
 
 	// only show metrics for current set of peers
 	for _, peer := range peers {
-		peerMetrics, ok := mbyp[peer]
+		window, ok := mbyp[peer]
 		if !ok {
 			continue
 		}
-		last, err := peerMetrics.latest()
+		last, err := window.Latest()
 		if err != nil || last.Discard() {
 			logger.Warningf("no valid last metric for peer: %+v", last)
 			continue
@@ -308,11 +255,11 @@ func (mon *Monitor) checkMetrics(peers []peer.ID, metricName string) {
 	// for each of the given current peers
 	for _, p := range peers {
 		// get metrics for that peer
-		pMetrics, ok := metricsByPeer[p]
+		window, ok := metricsByPeer[p]
 		if !ok { // no metrics from this peer
 			continue
 		}
-		last, err := pMetrics.latest()
+		last, err := window.Latest()
 		if err != nil { // no metrics for this peer
 			continue
 		}
diff --git a/monitor/util/metrics_window.go b/monitor/util/metrics_window.go
new file mode 100644
index 00000000..4b2906bf
--- /dev/null
+++ b/monitor/util/metrics_window.go
@@ -0,0 +1,90 @@
+// Package util provides common functionality for monitoring components.
+package util
+
+import (
+	"errors"
+	"sync"
+
+	"github.com/ipfs/ipfs-cluster/api"
+)
+
+// ErrNoMetrics is returned by Latest when the window holds no metrics.
+var ErrNoMetrics = errors.New("no metrics have been added to this window")
+
+// MetricsWindow implements a circular queue to store metrics.
+type MetricsWindow struct {
+	last int
+
+	safe       bool
+	windowLock sync.RWMutex
+	window     []api.Metric
+}
+
+// NewMetricsWindow creates an instance with the given
+// window capacity. The safe flag indicates whether we use a lock
+// for concurrent operations. windowCap must be greater than 0:
+// Add divides by the capacity once the window is full.
+func NewMetricsWindow(windowCap int, safe bool) *MetricsWindow {
+	w := make([]api.Metric, 0, windowCap)
+	return &MetricsWindow{
+		last:   0,
+		safe:   safe,
+		window: w,
+	}
+}
+
+// Add adds a new metric to the window. If the window capacity
+// has been reached, the oldest metric (by the time it was added),
+// will be discarded.
+func (mw *MetricsWindow) Add(m api.Metric) {
+	if mw.safe {
+		mw.windowLock.Lock()
+		defer mw.windowLock.Unlock()
+	}
+	if len(mw.window) < cap(mw.window) {
+		mw.window = append(mw.window, m)
+		mw.last = len(mw.window) - 1
+		return
+	}
+
+	// len == cap
+	mw.last = (mw.last + 1) % cap(mw.window)
+	mw.window[mw.last] = m
+}
+
+// Latest returns the last metric added. It returns an error
+// if no metrics were added.
+func (mw *MetricsWindow) Latest() (api.Metric, error) {
+	if mw.safe {
+		// Read-only access: a shared lock is enough.
+		mw.windowLock.RLock()
+		defer mw.windowLock.RUnlock()
+	}
+	if len(mw.window) == 0 {
+		return api.Metric{}, ErrNoMetrics
+	}
+	return mw.window[mw.last], nil
+}
+
+// All returns all the metrics in the window, in the inverse order
+// they were Added. That is, result[0] will be the last added
+// metric.
+func (mw *MetricsWindow) All() []api.Metric {
+	if mw.safe {
+		// Read-only access: a shared lock is enough.
+		mw.windowLock.RLock()
+		defer mw.windowLock.RUnlock()
+	}
+	wlen := len(mw.window)
+	res := make([]api.Metric, 0, wlen)
+	if wlen == 0 {
+		return res
+	}
+	for i := mw.last; i >= 0; i-- {
+		res = append(res, mw.window[i])
+	}
+	for i := wlen - 1; i > mw.last; i-- {
+		res = append(res, mw.window[i])
+	}
+	return res
+}
diff --git a/monitor/util/metrics_window_test.go b/monitor/util/metrics_window_test.go
new file mode 100644
index 00000000..9f3f5305
--- /dev/null
+++ b/monitor/util/metrics_window_test.go
@@ -0,0 +1,71 @@
+package util
+
+import (
+	"testing"
+
+	"github.com/ipfs/ipfs-cluster/api"
+)
+
+func TestMetricsWindow(t *testing.T) {
+	mw := NewMetricsWindow(4, true)
+
+	_, err := mw.Latest()
+	if err != ErrNoMetrics {
+		t.Error("expected ErrNoMetrics")
+	}
+
+	if len(mw.All()) != 0 {
+		t.Error("expected 0 metrics")
+	}
+
+	metr := api.Metric{
+		Name:  "test",
+		Peer:  "peer1",
+		Value: "1",
+		Valid: true,
+	}
+	metr.SetTTL(5)
+
+	mw.Add(metr)
+
+	metr2, err := mw.Latest()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if metr2.Value != "1" {
+		t.Error("expected different value")
+	}
+
+	metr.Value = "2"
+	mw.Add(metr)
+	metr.Value = "3"
+	mw.Add(metr)
+
+	all := mw.All()
+	if len(all) != 3 {
+		t.Fatal("should only be storing 3 metrics")
+	}
+
+	if all[0].Value != "3" {
+		t.Error("newest metric should be first")
+	}
+
+	if all[1].Value != "2" {
+		t.Error("older metric should be second")
+	}
+
+	metr.Value = "4"
+	mw.Add(metr)
+	metr.Value = "5"
+	mw.Add(metr)
+
+	all = mw.All()
+	if len(all) != 4 {
+		t.Fatal("should only be storing 4 metrics")
+	}
+
+	if all[len(all)-1].Value != "2" {
+		t.Error("oldest metric should be 2")
+	}
+}