Feat pubsubmon: Extract MetricsWindow to util module

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
Hector Sanjuan 2018-05-01 15:39:41 +02:00
parent 029cd77c27
commit 1886782530
3 changed files with 176 additions and 71 deletions

View File

@ -5,7 +5,6 @@ package basic
import (
"context"
"errors"
"sync"
"time"
@ -14,6 +13,7 @@ import (
peer "github.com/libp2p/go-libp2p-peer"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/monitor/util"
)
var logger = logging.Logger("monitor")
@ -21,65 +21,10 @@ var logger = logging.Logger("monitor")
// AlertChannelCap specifies how much buffer the alerts channel has.
var AlertChannelCap = 256
// WindowCap specifies how many metrics to keep for given host and metric type
var WindowCap = 100
// DefaultWindowCap sets the amount of metrics to store per peer.
var DefaultWindowCap = 25
// peerMetrics is just a circular queue
type peerMetrics struct {
	last   int          // index of the newest metric in window
	window []api.Metric // backing storage; grows until it reaches capacity
	// mux sync.RWMutex
}
// newPeerMetrics allocates a peerMetrics able to hold up to
// windowCap entries.
func newPeerMetrics(windowCap int) *peerMetrics {
	return &peerMetrics{
		last:   0,
		window: make([]api.Metric, 0, windowCap),
	}
}
// add records a metric in the circular window. While the window is not
// full yet, metrics are appended; once len == cap, the oldest slot is
// overwritten. last always points at the newest entry.
func (pmets *peerMetrics) add(m api.Metric) {
	// pmets.mux.Lock()
	// defer pmets.mux.Unlock()
	if len(pmets.window) < cap(pmets.window) {
		pmets.window = append(pmets.window, m)
		pmets.last = len(pmets.window) - 1
		return
	}

	// len == cap: advance last circularly and overwrite the oldest slot.
	pmets.last = (pmets.last + 1) % cap(pmets.window)
	pmets.window[pmets.last] = m
}
// latest returns the most recently added metric. It returns an error
// when no metrics have been added yet.
func (pmets *peerMetrics) latest() (api.Metric, error) {
	// pmets.mux.RLock()
	// defer pmets.mux.RUnlock()
	if len(pmets.window) == 0 {
		logger.Warning("no metrics")
		return api.Metric{}, errors.New("no metrics")
	}
	// last indexes the newest entry in the circular window.
	return pmets.window[pmets.last], nil
}
// all returns every stored metric ordered from newest to oldest:
// it walks from last down to 0, then (when the window has wrapped)
// from the end of the slice down to last+1.
func (pmets *peerMetrics) all() []api.Metric {
	// pmets.mux.RLock()
	// pmets.mux.RUnlock()
	wlen := len(pmets.window)
	res := make([]api.Metric, 0, wlen)
	if wlen == 0 {
		return res
	}
	for i := pmets.last; i >= 0; i-- {
		res = append(res, pmets.window[i])
	}
	// BUG FIX: this loop started at i := wlen, which indexed
	// window[wlen] out of range whenever the window had wrapped.
	// The last valid index is wlen-1.
	for i := wlen - 1; i > pmets.last; i-- {
		res = append(res, pmets.window[i])
	}
	return res
}
type metricsByPeer map[peer.ID]*peerMetrics
// metricsByPeer maps a peer ID to the window of metrics logged for it.
type metricsByPeer map[peer.ID]*util.MetricsWindow
// Monitor is a component in charge of monitoring peers, logging
// metrics and detecting failures
@ -112,7 +57,7 @@ func NewMonitor(cfg *Config) (*Monitor, error) {
return nil, err
}
if WindowCap <= 0 {
if DefaultWindowCap <= 0 {
panic("windowCap too small")
}
@ -124,7 +69,7 @@ func NewMonitor(cfg *Config) (*Monitor, error) {
rpcReady: make(chan struct{}, 1),
metrics: make(map[string]metricsByPeer),
windowCap: WindowCap,
windowCap: DefaultWindowCap,
alerts: make(chan api.Alert, AlertChannelCap),
config: cfg,
@ -178,14 +123,16 @@ func (mon *Monitor) LogMetric(m api.Metric) {
mbyp = make(metricsByPeer)
mon.metrics[name] = mbyp
}
pmets, ok := mbyp[peer]
window, ok := mbyp[peer]
if !ok {
pmets = newPeerMetrics(mon.windowCap)
mbyp[peer] = pmets
// We always lock the outer map, so we can use unsafe
// MetricsWindow.
window = util.NewMetricsWindow(mon.windowCap, false)
mbyp[peer] = window
}
logger.Debugf("logged '%s' metric from '%s'. Expires on %d", name, peer, m.Expire)
pmets.add(m)
window.Add(m)
}
// func (mon *Monitor) getLastMetric(name string, p peer.ID) api.Metric {
@ -203,11 +150,11 @@ func (mon *Monitor) LogMetric(m api.Metric) {
// return emptyMetric
// }
// pmets, ok := mbyp[p]
// window, ok := mbyp[p]
// if !ok {
// return emptyMetric
// }
// metric, err := pmets.latest()
// metric, err := window.Latest()
// if err != nil {
// return emptyMetric
// }
@ -242,11 +189,11 @@ func (mon *Monitor) LastMetrics(name string) []api.Metric {
// only show metrics for current set of peers
for _, peer := range peers {
peerMetrics, ok := mbyp[peer]
window, ok := mbyp[peer]
if !ok {
continue
}
last, err := peerMetrics.latest()
last, err := window.Latest()
if err != nil || last.Discard() {
logger.Warningf("no valid last metric for peer: %+v", last)
continue
@ -308,11 +255,11 @@ func (mon *Monitor) checkMetrics(peers []peer.ID, metricName string) {
// for each of the given current peers
for _, p := range peers {
// get metrics for that peer
pMetrics, ok := metricsByPeer[p]
window, ok := metricsByPeer[p]
if !ok { // no metrics from this peer
continue
}
last, err := pMetrics.latest()
last, err := window.Latest()
if err != nil { // no metrics for this peer
continue
}

View File

@ -0,0 +1,87 @@
// Package util provides common functionality for monitoring components.
package util
import (
"errors"
"sync"
"github.com/ipfs/ipfs-cluster/api"
)
// ErrNoMetrics is returned when there are no metrics in a MetricsWindow.
var ErrNoMetrics = errors.New("no metrics have been added to this window")
// MetricsWindow implements a circular queue to store metrics.
type MetricsWindow struct {
	last int  // index of the newest metric in window
	safe bool // when true, methods guard access with windowLock

	windowLock sync.RWMutex
	window     []api.Metric // backing storage; grows until it reaches capacity
}
// NewMetricsWindow creates an instance with the given
// window capacity. The safe indicates whether we use a lock
// for concurrent operations.
func NewMetricsWindow(windowCap int, safe bool) *MetricsWindow {
	return &MetricsWindow{
		last:   0,
		safe:   safe,
		window: make([]api.Metric, 0, windowCap),
	}
}
// Add adds a new metric to the window. If the window capacity
// has been reached, the oldest metric (by the time it was added),
// will be discarded.
func (mw *MetricsWindow) Add(m api.Metric) {
	if mw.safe {
		mw.windowLock.Lock()
		defer mw.windowLock.Unlock()
	}
	if len(mw.window) < cap(mw.window) {
		// Still filling up: append and track the newest index.
		mw.window = append(mw.window, m)
		mw.last = len(mw.window) - 1
		return
	}

	// len == cap: advance last circularly and overwrite the oldest slot.
	mw.last = (mw.last + 1) % cap(mw.window)
	mw.window[mw.last] = m
}
// Latest returns the last metric added. It returns an error
// if no metrics were added.
func (mw *MetricsWindow) Latest() (api.Metric, error) {
	if mw.safe {
		// Read-only access: an RLock suffices and lets concurrent
		// readers proceed in parallel (Lock would serialize them).
		mw.windowLock.RLock()
		defer mw.windowLock.RUnlock()
	}
	if len(mw.window) == 0 {
		return api.Metric{}, ErrNoMetrics
	}
	return mw.window[mw.last], nil
}
// All returns all the metrics in the window, in the inverse order
// they were Added. That is, result[0] will be the last added
// metric.
func (mw *MetricsWindow) All() []api.Metric {
	if mw.safe {
		// Read-only access: an RLock suffices and lets concurrent
		// readers proceed in parallel (Lock would serialize them).
		mw.windowLock.RLock()
		defer mw.windowLock.RUnlock()
	}
	wlen := len(mw.window)
	res := make([]api.Metric, 0, wlen)
	if wlen == 0 {
		return res
	}
	// Walk newest-to-oldest: from last down to 0, then (if the
	// window has wrapped) from the end of the slice down to last+1.
	for i := mw.last; i >= 0; i-- {
		res = append(res, mw.window[i])
	}
	for i := wlen - 1; i > mw.last; i-- {
		res = append(res, mw.window[i])
	}
	return res
}

View File

@ -0,0 +1,71 @@
package util
import (
"testing"
"github.com/ipfs/ipfs-cluster/api"
)
// TestMetricsWindow exercises Latest/Add/All, including the wrap-around
// behavior once the window capacity (4) is exceeded.
func TestMetricsWindow(t *testing.T) {
	// BUG FIX: NewMetricsWindow takes (windowCap int, safe bool);
	// calling it with a single argument does not compile.
	mw := NewMetricsWindow(4, false)

	_, err := mw.Latest()
	if err != ErrNoMetrics {
		t.Error("expected ErrNoMetrics")
	}

	if len(mw.All()) != 0 {
		t.Error("expected 0 metrics")
	}

	metr := api.Metric{
		Name:  "test",
		Peer:  "peer1",
		Value: "1",
		Valid: true,
	}
	metr.SetTTL(5)

	mw.Add(metr)
	metr2, err := mw.Latest()
	if err != nil {
		t.Fatal(err)
	}
	if metr2.Value != "1" {
		t.Error("expected different value")
	}

	metr.Value = "2"
	mw.Add(metr)
	metr.Value = "3"
	mw.Add(metr)

	all := mw.All()
	if len(all) != 3 {
		t.Fatal("should only be storing 3 metrics")
	}
	if all[0].Value != "3" {
		t.Error("newest metric should be first")
	}
	if all[1].Value != "2" {
		t.Error("older metric should be second")
	}

	// Two more Adds overflow the capacity of 4: "1" is evicted and
	// "2" becomes the oldest surviving metric.
	metr.Value = "4"
	mw.Add(metr)
	metr.Value = "5"
	mw.Add(metr)

	all = mw.All()
	if len(all) != 4 {
		t.Fatal("should only be storing 4 metrics")
	}
	if all[len(all)-1].Value != "2" {
		t.Error("oldest metric should be 2")
	}
}