Monitor: more refactoring. Rename util to metrics

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
Hector Sanjuan 2018-05-08 11:38:12 +02:00
parent 8c8487d74b
commit 954ede931f
14 changed files with 418 additions and 406 deletions

View File

@ -167,9 +167,9 @@ type PeerMonitor interface {
// PublishMetric sends a metric to the rest of the peers.
// How to send it, and to who, is to be decided by the implementation.
PublishMetric(api.Metric) error
// LastMetrics returns a map with the latest metrics of matching name
// LatestMetrics returns the latest metrics of matching name
// for the current cluster peers.
LastMetrics(name string) []api.Metric
LatestMetrics(name string) []api.Metric
// Alerts delivers alerts generated when this peer monitor detects
// a problem (i.e. metrics not arriving as expected). Alerts can be used
// to trigger self-healing measures or re-pinnings of content.
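For illustration only (this helper is not part of the commit; the package, type and metric name are placeholders), a minimal sketch of how a consumer of the renamed interface method might read the latest metrics:

package example

import (
	"fmt"

	"github.com/ipfs/ipfs-cluster/api"
)

// latestMetricsProvider mirrors the part of PeerMonitor used in this sketch.
type latestMetricsProvider interface {
	LatestMetrics(name string) []api.Metric
}

// printLatest prints the latest valid metric of the given name for every peer.
func printLatest(mon latestMetricsProvider, name string) {
	for _, m := range mon.LatestMetrics(name) {
		fmt.Printf("peer %s reports %s=%s (expires %d)\n", m.Peer.Pretty(), m.Name, m.Value, m.Expire)
	}
}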

View File

@ -9,10 +9,9 @@ import (
"fmt"
"strings"
"sync"
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/monitor/util"
"github.com/ipfs/ipfs-cluster/monitor/metrics"
"github.com/ipfs/ipfs-cluster/rpcutil"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
@ -22,12 +21,6 @@ import (
var logger = logging.Logger("monitor")
// AlertChannelCap specifies how much buffer the alerts channel has.
var AlertChannelCap = 256
// DefaultWindowCap sets the amount of metrics to store per peer.
var DefaultWindowCap = 25
// Monitor is a component in charge of monitoring peers, logging
// metrics and detecting failures
type Monitor struct {
@ -36,12 +29,8 @@ type Monitor struct {
rpcClient *rpc.Client
rpcReady chan struct{}
metrics util.Metrics
metricsMux sync.RWMutex
windowCap int
checker *util.MetricsChecker
alerts chan api.Alert
metrics *metrics.Store
checker *metrics.Checker
config *Config
@ -50,35 +39,26 @@ type Monitor struct {
wg sync.WaitGroup
}
// NewMonitor creates a new monitor. It receives the window capacity
// (how many metrics to keep for each peer and type of metric) and the
// monitoringInterval (interval between the checks that produce alerts)
// as parameters
// NewMonitor creates a new monitor using the given config.
func NewMonitor(cfg *Config) (*Monitor, error) {
err := cfg.Validate()
if err != nil {
return nil, err
}
if DefaultWindowCap <= 0 {
panic("windowCap too small")
}
ctx, cancel := context.WithCancel(context.Background())
alertCh := make(chan api.Alert, AlertChannelCap)
metrics := make(util.Metrics)
mtrs := metrics.NewStore()
checker := metrics.NewChecker(mtrs)
mon := &Monitor{
ctx: ctx,
cancel: cancel,
rpcReady: make(chan struct{}, 1),
metrics: metrics,
windowCap: DefaultWindowCap,
checker: util.NewMetricsChecker(metrics, alertCh),
alerts: alertCh,
config: cfg,
metrics: mtrs,
checker: checker,
config: cfg,
}
go mon.run()
@ -88,7 +68,7 @@ func NewMonitor(cfg *Config) (*Monitor, error) {
func (mon *Monitor) run() {
select {
case <-mon.rpcReady:
go mon.monitor()
go mon.checker.Watch(mon.ctx, mon.getPeers, mon.config.CheckInterval)
case <-mon.ctx.Done():
}
}
@ -120,25 +100,8 @@ func (mon *Monitor) Shutdown() error {
// LogMetric stores a metric so it can later be retrieved.
func (mon *Monitor) LogMetric(m api.Metric) error {
mon.metricsMux.Lock()
defer mon.metricsMux.Unlock()
name := m.Name
peer := m.Peer
mbyp, ok := mon.metrics[name]
if !ok {
mbyp = make(util.PeerMetrics)
mon.metrics[name] = mbyp
}
window, ok := mbyp[peer]
if !ok {
// We always lock the outer map, so we can use unsafe
// MetricsWindow.
window = util.NewMetricsWindow(mon.windowCap, false)
mbyp[peer] = window
}
logger.Debugf("logged '%s' metric from '%s'. Expires on %d", name, peer, m.Expire)
window.Add(m)
mon.metrics.Add(m)
logger.Debugf("basic monitor logged '%s' metric from '%s'. Expires on %d", m.Name, m.Peer, m.Expire)
return nil
}
@ -151,7 +114,7 @@ func (mon *Monitor) PublishMetric(m api.Metric) error {
peers, err := mon.getPeers()
if err != nil {
logger.Error("PublishPeers could not list peers:", err)
return err
}
ctxs, cancels := rpcutil.CtxsWithTimeout(mon.ctx, len(peers), m.GetTTL()/2)
@ -204,7 +167,6 @@ func (mon *Monitor) PublishMetric(m api.Metric) error {
// getPeers gets the current list of peers from the consensus component
func (mon *Monitor) getPeers() ([]peer.ID, error) {
// Ger current list of peers
var peers []peer.ID
err := mon.rpcClient.Call(
"",
@ -213,98 +175,28 @@ func (mon *Monitor) getPeers() ([]peer.ID, error) {
struct{}{},
&peers,
)
if err != nil {
logger.Error(err)
}
return peers, err
}
// func (mon *Monitor) getLastMetric(name string, p peer.ID) api.Metric {
// mon.metricsMux.RLock()
// defer mon.metricsMux.RUnlock()
// LatestMetrics returns last known VALID metrics of a given type. A metric
// is only valid if it has not expired and belongs to a current cluster peer.
func (mon *Monitor) LatestMetrics(name string) []api.Metric {
latest := mon.metrics.Latest(name)
// emptyMetric := api.Metric{
// Name: name,
// Peer: p,
// Valid: false,
// }
// mbyp, ok := mon.metrics[name]
// if !ok {
// return emptyMetric
// }
// window, ok := mbyp[p]
// if !ok {
// return emptyMetric
// }
// metric, err := window.Latest()
// if err != nil {
// return emptyMetric
// }
// return metric
// }
// LastMetrics returns last known VALID metrics of a given type. A metric
// is only valid if it has not expired and belongs to a current cluster peer.
func (mon *Monitor) LastMetrics(name string) []api.Metric {
// Make sure we only return metrics in the current peerset
peers, err := mon.getPeers()
if err != nil {
logger.Errorf("LastMetrics could not list peers: %s", err)
return []api.Metric{}
}
mon.metricsMux.RLock()
defer mon.metricsMux.RUnlock()
mbyp, ok := mon.metrics[name]
if !ok {
logger.Warningf("LastMetrics: No %s metrics", name)
return []api.Metric{}
}
metrics := make([]api.Metric, 0, len(mbyp))
// only show metrics for current set of peers
for _, peer := range peers {
window, ok := mbyp[peer]
if !ok {
continue
}
last, err := window.Latest()
if err != nil || last.Discard() {
logger.Warningf("no valid last metric for peer: %+v", last)
continue
}
metrics = append(metrics, last)
}
return metrics
return metrics.PeersetFilter(latest, peers)
}
// Alerts returns a channel on which alerts are sent when the
// monitor detects a failure.
func (mon *Monitor) Alerts() <-chan api.Alert {
return mon.alerts
}
// monitor creates a ticker which fetches current
// cluster peers and checks that the last metric for a peer
// has not expired.
func (mon *Monitor) monitor() {
ticker := time.NewTicker(mon.config.CheckInterval)
for {
select {
case <-ticker.C:
logger.Debug("monitoring tick")
peers, err := mon.getPeers()
if err != nil {
logger.Error(err)
break
}
mon.metricsMux.RLock()
mon.checker.CheckMetrics(peers)
mon.metricsMux.RUnlock()
case <-mon.ctx.Done():
ticker.Stop()
return
}
}
return mon.checker.Alerts()
}
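To summarize how the refactored pieces compose, an illustrative sketch (not code from this commit; the miniMonitor type and names are placeholders): LogMetric delegates to Store.Add, LatestMetrics combines Store.Latest with PeersetFilter, and alerts come from the Checker.

package example

import (
	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/monitor/metrics"
	peer "github.com/libp2p/go-libp2p-peer"
)

// miniMonitor is a stripped-down stand-in for the basic Monitor above.
type miniMonitor struct {
	store   *metrics.Store
	checker *metrics.Checker
}

func newMiniMonitor() *miniMonitor {
	store := metrics.NewStore()
	return &miniMonitor{store: store, checker: metrics.NewChecker(store)}
}

// LogMetric stores the metric.
func (m *miniMonitor) LogMetric(mtr api.Metric) error {
	m.store.Add(mtr)
	return nil
}

// LatestMetrics returns the stored metrics restricted to the given peerset.
func (m *miniMonitor) LatestMetrics(name string, peers []peer.ID) []api.Metric {
	return metrics.PeersetFilter(m.store.Latest(name), peers)
}

// Alerts exposes the checker's alert channel.
func (m *miniMonitor) Alerts() <-chan api.Alert {
	return m.checker.Alerts()
}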

View File

@ -107,7 +107,7 @@ func TestLogMetricConcurrent(t *testing.T) {
last := time.Now().Add(-500 * time.Millisecond)
for i := 0; i <= 20; i++ {
lastMtrcs := pm.LastMetrics("test")
lastMtrcs := pm.LatestMetrics("test")
if len(lastMtrcs) != 1 {
t.Error("no valid metrics", len(lastMtrcs), i)
@ -146,18 +146,18 @@ func TestPeerMonitorLogMetric(t *testing.T) {
pm.LogMetric(mf.newMetric("test2", test.TestPeerID3))
pm.LogMetric(mf.newMetric("test2", test.TestPeerID3))
lastMetrics := pm.LastMetrics("testbad")
if len(lastMetrics) != 0 {
t.Logf("%+v", lastMetrics)
latestMetrics := pm.LatestMetrics("testbad")
if len(latestMetrics) != 0 {
t.Logf("%+v", latestMetrics)
t.Error("metrics should be empty")
}
lastMetrics = pm.LastMetrics("test")
if len(lastMetrics) != 3 {
latestMetrics = pm.LatestMetrics("test")
if len(latestMetrics) != 3 {
t.Error("metrics should correspond to 3 hosts")
}
for _, v := range lastMetrics {
for _, v := range latestMetrics {
switch v.Peer {
case test.TestPeerID1:
if v.Value != "0" {
@ -176,11 +176,11 @@ func TestPeerMonitorLogMetric(t *testing.T) {
}
}
lastMetrics = pm.LastMetrics("test2")
if len(lastMetrics) != 1 {
latestMetrics = pm.LatestMetrics("test2")
if len(latestMetrics) != 1 {
t.Fatal("should only be one metric")
}
if lastMetrics[0].Value != fmt.Sprintf("%d", mf.count()-1) {
if latestMetrics[0].Value != fmt.Sprintf("%d", mf.count()-1) {
t.Error("metric is not last")
}
}

View File

@ -0,0 +1,87 @@
package metrics
import (
"context"
"errors"
"time"
"github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-peer"
)
// AlertChannelCap specifies how much buffer the alerts channel has.
var AlertChannelCap = 256
// ErrAlertChannelFull is returned if the alert channel is full.
var ErrAlertChannelFull = errors.New("alert channel is full")
// Checker provides utilities to find expired metrics
// for a given peerset and to send alerts when they are found.
type Checker struct {
alertCh chan api.Alert
metrics *Store
}
// NewChecker creates a Checker using the given
// metrics Store.
func NewChecker(metrics *Store) *Checker {
return &Checker{
alertCh: make(chan api.Alert, AlertChannelCap),
metrics: metrics,
}
}
// CheckPeers will trigger alerts for expired metrics belonging to the
// given peerset.
func (mc *Checker) CheckPeers(peers []peer.ID) error {
for _, peer := range peers {
for _, metric := range mc.metrics.PeerMetrics(peer) {
if metric.Valid && metric.Expired() {
err := mc.alert(metric.Peer, metric.Name)
if err != nil {
return err
}
}
}
}
return nil
}
func (mc *Checker) alert(pid peer.ID, metricName string) error {
alrt := api.Alert{
Peer: pid,
MetricName: metricName,
}
select {
case mc.alertCh <- alrt:
default:
return ErrAlertChannelFull
}
return nil
}
// Alerts returns the channel on which CheckPeers sends alerts.
func (mc *Checker) Alerts() <-chan api.Alert {
return mc.alertCh
}
// Watch will trigger regular CheckPeers on the given interval. It will call
// peersF to obtain a peerset. It can be stopped by cancelling the context.
// Usually you want to launch this in a goroutine.
func (mc *Checker) Watch(ctx context.Context, peersF func() ([]peer.ID, error), interval time.Duration) {
ticker := time.NewTicker(interval)
for {
select {
case <-ticker.C:
peers, err := peersF()
if err != nil {
continue
}
mc.CheckPeers(peers)
case <-ctx.Done():
ticker.Stop()
return
}
}
}
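A sketch of how a caller might run Watch and drain the Alerts channel (the function name, the peerset callback and the 15-second interval are placeholders, not values from this commit):

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/ipfs/ipfs-cluster/monitor/metrics"
	peer "github.com/libp2p/go-libp2p-peer"
)

// watchAndReport runs the Checker in the background and prints every alert
// until the context is cancelled.
func watchAndReport(ctx context.Context, checker *metrics.Checker, peersF func() ([]peer.ID, error)) {
	go checker.Watch(ctx, peersF, 15*time.Second)
	for {
		select {
		case alrt := <-checker.Alerts():
			fmt.Printf("metric '%s' expired for peer %s\n", alrt.MetricName, alrt.Peer.Pretty())
		case <-ctx.Done():
			return
		}
	}
}

Keeping the channel drained matters because CheckPeers drops an alert and returns ErrAlertChannelFull once the buffer fills up.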

View File

@ -0,0 +1,83 @@
package metrics
import (
"context"
"testing"
"time"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
)
func TestChecker(t *testing.T) {
metrics := NewStore()
checker := NewChecker(metrics)
metr := api.Metric{
Name: "test",
Peer: test.TestPeerID1,
Value: "1",
Valid: true,
}
metr.SetTTL(2 * time.Second)
metrics.Add(metr)
checker.CheckPeers([]peer.ID{test.TestPeerID1})
select {
case <-checker.Alerts():
t.Error("there should not be an alert yet")
default:
}
time.Sleep(3 * time.Second)
err := checker.CheckPeers([]peer.ID{test.TestPeerID1})
if err != nil {
t.Fatal(err)
}
select {
case <-checker.Alerts():
default:
t.Error("an alert should have been triggered")
}
checker.CheckPeers([]peer.ID{test.TestPeerID2})
select {
case <-checker.Alerts():
t.Error("there should not be alerts for different peer")
default:
}
}
func TestCheckerWatch(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
metrics := NewStore()
checker := NewChecker(metrics)
metr := api.Metric{
Name: "test",
Peer: test.TestPeerID1,
Value: "1",
Valid: true,
}
metr.SetTTL(100 * time.Millisecond)
metrics.Add(metr)
peersF := func() ([]peer.ID, error) {
return []peer.ID{test.TestPeerID1}, nil
}
go checker.Watch(ctx, peersF, 200*time.Millisecond)
select {
case a := <-checker.Alerts():
t.Log("received alert:", a)
case <-ctx.Done():
t.Fatal("should have received an alert")
}
}

monitor/metrics/store.go (new file, +92)
View File

@ -0,0 +1,92 @@
package metrics
import (
"sync"
"github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-peer"
)
// PeerMetrics maps peer IDs to a metrics window.
type PeerMetrics map[peer.ID]*Window
// Store can be used to store and access metrics.
type Store struct {
mux sync.RWMutex
byName map[string]PeerMetrics
}
// NewStore can be used to create a Store.
func NewStore() *Store {
return &Store{
byName: make(map[string]PeerMetrics),
}
}
// Add inserts a new metric into the Store.
func (mtrs *Store) Add(m api.Metric) {
mtrs.mux.Lock()
defer mtrs.mux.Unlock()
name := m.Name
peer := m.Peer
mbyp, ok := mtrs.byName[name]
if !ok {
mbyp = make(PeerMetrics)
mtrs.byName[name] = mbyp
}
window, ok := mbyp[peer]
if !ok {
// We always lock the outer map, so we can use unsafe
// Window.
window = NewWindow(DefaultWindowCap, false)
mbyp[peer] = window
}
window.Add(m)
}
// Latest returns the last known valid metrics of the given name, one per
// peer. A metric is valid if it has not expired.
func (mtrs *Store) Latest(name string) []api.Metric {
mtrs.mux.RLock()
defer mtrs.mux.RUnlock()
byPeer, ok := mtrs.byName[name]
if !ok {
return []api.Metric{}
}
metrics := make([]api.Metric, 0, len(byPeer))
for _, window := range byPeer {
m, err := window.Latest()
if err != nil || m.Discard() {
continue
}
metrics = append(metrics, m)
}
return metrics
}
// PeerMetrics returns the latest metrics for a given peer ID for
// all known metric types. It may return expired metrics.
func (mtrs *Store) PeerMetrics(peer peer.ID) []api.Metric {
mtrs.mux.RLock()
defer mtrs.mux.RUnlock()
result := make([]api.Metric, 0)
for _, byPeer := range mtrs.byName {
window, ok := byPeer[peer]
if !ok {
continue
}
metric, err := window.Latest()
if err != nil {
continue
}
result = append(result, metric)
}
return result
}

View File

@ -0,0 +1,34 @@
package metrics
import (
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
)
func TestStoreLatest(t *testing.T) {
store := NewStore()
metr := api.Metric{
Name: "test",
Peer: test.TestPeerID1,
Value: "1",
Valid: true,
}
metr.SetTTL(200 * time.Millisecond)
store.Add(metr)
latest := store.Latest("test")
if len(latest) != 1 {
t.Error("expected 1 metric")
}
time.Sleep(220 * time.Millisecond)
latest = store.Latest("test")
if len(latest) != 0 {
t.Error("expected no metrics")
}
}

monitor/metrics/util.go (new file, +28)
View File

@ -0,0 +1,28 @@
package metrics
import (
"github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-peer"
)
// PeersetFilter removes all metrics not belonging to the given
// peerset.
func PeersetFilter(metrics []api.Metric, peerset []peer.ID) []api.Metric {
peerMap := make(map[peer.ID]struct{})
for _, peer := range peerset {
peerMap[peer] = struct{}{}
}
filtered := make([]api.Metric, 0, len(metrics))
for _, metric := range metrics {
_, ok := peerMap[metric.Peer]
if !ok {
continue
}
filtered = append(filtered, metric)
}
return filtered
}
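A quick illustrative use of the filter (the peer IDs come from the repo's test fixtures; the function name is hypothetical):

package example

import (
	"fmt"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/monitor/metrics"
	"github.com/ipfs/ipfs-cluster/test"
	peer "github.com/libp2p/go-libp2p-peer"
)

func filterExample() {
	all := []api.Metric{
		{Name: "test", Peer: test.TestPeerID1, Valid: true},
		{Name: "test", Peer: test.TestPeerID2, Valid: true},
	}
	// Only TestPeerID1 is in the peerset, so a single metric survives.
	kept := metrics.PeersetFilter(all, []peer.ID{test.TestPeerID1})
	fmt.Println(len(kept)) // 1
}

This is how the monitors' LatestMetrics methods narrow Store.Latest results down to the current cluster peerset.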

View File

@ -1,5 +1,7 @@
// Package util provides common functionality for monitoring components.
package util
// Package metrics provides common functionality for working with metrics,
// particularly useful for monitoring components. It includes types to store,
// check and filter metrics.
package metrics
import (
"errors"
@ -8,11 +10,14 @@ import (
"github.com/ipfs/ipfs-cluster/api"
)
// ErrNoMetrics is returned when there are no metrics in a MetricsWindow.
// DefaultWindowCap sets the amount of metrics to store per peer.
var DefaultWindowCap = 25
// ErrNoMetrics is returned when there are no metrics in a Window.
var ErrNoMetrics = errors.New("no metrics have been added to this window")
// MetricsWindow implements a circular queue to store metrics.
type MetricsWindow struct {
// Window implements a circular queue to store metrics.
type Window struct {
last int
safe bool
@ -20,12 +25,16 @@ type MetricsWindow struct {
window []api.Metric
}
// NewMetricsWindow creates an instance with the given
// NewWindow creates an instance with the given
// window capacity. The safe parameter indicates whether to use a lock
// for concurrent operations.
func NewMetricsWindow(windowCap int, safe bool) *MetricsWindow {
func NewWindow(windowCap int, safe bool) *Window {
if windowCap <= 0 {
panic("invalid windowCap")
}
w := make([]api.Metric, 0, windowCap)
return &MetricsWindow{
return &Window{
last: 0,
safe: safe,
window: w,
@ -35,7 +44,7 @@ func NewMetricsWindow(windowCap int, safe bool) *MetricsWindow {
// Add adds a new metric to the window. If the window capacity
// has been reached, the oldest metric (by the time it was added)
// will be discarded.
func (mw *MetricsWindow) Add(m api.Metric) {
func (mw *Window) Add(m api.Metric) {
if mw.safe {
mw.windowLock.Lock()
defer mw.windowLock.Unlock()
@ -54,7 +63,7 @@ func (mw *MetricsWindow) Add(m api.Metric) {
// Latest returns the last metric added. It returns an error
// if no metrics were added.
func (mw *MetricsWindow) Latest() (api.Metric, error) {
func (mw *Window) Latest() (api.Metric, error) {
if mw.safe {
mw.windowLock.Lock()
defer mw.windowLock.Unlock()
@ -68,7 +77,7 @@ func (mw *MetricsWindow) Latest() (api.Metric, error) {
// All returns all the metrics in the window, in the inverse order
// they were Added. That is, result[0] will be the last added
// metric.
func (mw *MetricsWindow) All() []api.Metric {
func (mw *Window) All() []api.Metric {
if mw.safe {
mw.windowLock.Lock()
defer mw.windowLock.Unlock()

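An illustrative sketch of the circular-queue behaviour (capacity and values are arbitrary): once the window is full, Add discards the oldest entry, while Latest and All keep returning the newest metrics first.

package example

import (
	"fmt"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/monitor/metrics"
)

func windowExample() {
	w := metrics.NewWindow(2, true) // keep only the 2 newest metrics, with locking
	w.Add(api.Metric{Name: "test", Value: "1", Valid: true})
	w.Add(api.Metric{Name: "test", Value: "2", Valid: true})
	w.Add(api.Metric{Name: "test", Value: "3", Valid: true}) // discards "1"

	last, _ := w.Latest()
	fmt.Println(last.Value)   // "3"
	fmt.Println(len(w.All())) // 2: "3" then "2"
}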
View File

@ -1,4 +1,4 @@
package util
package metrics
import (
"testing"
@ -8,7 +8,7 @@ import (
)
func TestMetricsWindow(t *testing.T) {
mw := NewMetricsWindow(4, true)
mw := NewWindow(4, true)
_, err := mw.Latest()
if err != ErrNoMetrics {

View File

@ -6,10 +6,9 @@ import (
"bytes"
"context"
"sync"
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/monitor/util"
"github.com/ipfs/ipfs-cluster/monitor/metrics"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
logging "github.com/ipfs/go-log"
@ -22,13 +21,7 @@ import (
var logger = logging.Logger("monitor")
// PubsubTopic specifies the topic used to publish Cluster metrics.
var PubsubTopic = "pubsubmon"
// AlertChannelCap specifies how much buffer the alerts channel has.
var AlertChannelCap = 256
// DefaultWindowCap sets the amount of metrics to store per peer.
var DefaultWindowCap = 25
var PubsubTopic = "monitor.metrics"
var msgpackHandle = msgpack.DefaultMsgpackHandle()
@ -44,12 +37,8 @@ type Monitor struct {
pubsub *floodsub.PubSub
subscription *floodsub.Subscription
metrics util.Metrics
metricsMux sync.RWMutex
windowCap int
checker *util.MetricsChecker
alerts chan api.Alert
metrics *metrics.Store
checker *metrics.Checker
config *Config
@ -58,24 +47,17 @@ type Monitor struct {
wg sync.WaitGroup
}
// New creates a new monitor. It receives the window capacity
// (how many metrics to keep for each peer and type of metric) and the
// monitoringInterval (interval between the checks that produce alerts)
// as parameters
// New creates a new PubSub monitor, using the given host and config.
func New(h host.Host, cfg *Config) (*Monitor, error) {
err := cfg.Validate()
if err != nil {
return nil, err
}
if DefaultWindowCap <= 0 {
panic("windowCap too small")
}
ctx, cancel := context.WithCancel(context.Background())
alertCh := make(chan api.Alert, AlertChannelCap)
metrics := make(util.Metrics)
mtrs := metrics.NewStore()
checker := metrics.NewChecker(mtrs)
pubsub, err := floodsub.NewFloodSub(ctx, h)
if err != nil {
@ -98,11 +80,9 @@ func New(h host.Host, cfg *Config) (*Monitor, error) {
pubsub: pubsub,
subscription: subscription,
metrics: metrics,
windowCap: DefaultWindowCap,
checker: util.NewMetricsChecker(metrics, alertCh),
alerts: alertCh,
config: cfg,
metrics: mtrs,
checker: checker,
config: cfg,
}
go mon.run()
@ -112,8 +92,8 @@ func New(h host.Host, cfg *Config) (*Monitor, error) {
func (mon *Monitor) run() {
select {
case <-mon.rpcReady:
go mon.monitor()
go mon.logFromPubsub()
go mon.checker.Watch(mon.ctx, mon.getPeers, mon.config.CheckInterval)
case <-mon.ctx.Done():
}
}
@ -174,8 +154,6 @@ func (mon *Monitor) Shutdown() error {
logger.Info("stopping Monitor")
close(mon.rpcReady)
// not necessary as this just removes subscription
// mon.subscription.Cancel()
mon.cancel()
mon.wg.Wait()
@ -185,25 +163,8 @@ func (mon *Monitor) Shutdown() error {
// LogMetric stores a metric so it can later be retrieved.
func (mon *Monitor) LogMetric(m api.Metric) error {
mon.metricsMux.Lock()
defer mon.metricsMux.Unlock()
name := m.Name
peer := m.Peer
mbyp, ok := mon.metrics[name]
if !ok {
mbyp = make(util.PeerMetrics)
mon.metrics[name] = mbyp
}
window, ok := mbyp[peer]
if !ok {
// We always lock the outer map, so we can use unsafe
// MetricsWindow.
window = util.NewMetricsWindow(mon.windowCap, false)
mbyp[peer] = window
}
logger.Debugf("logged '%s' metric from '%s'. Expires on %d", name, peer, m.Expire)
window.Add(m)
mon.metrics.Add(m)
logger.Debugf("pubsub mon logged '%s' metric from '%s'. Expires on %d", m.Name, m.Peer, m.Expire)
return nil
}
@ -240,7 +201,6 @@ func (mon *Monitor) PublishMetric(m api.Metric) error {
// getPeers gets the current list of peers from the consensus component
func (mon *Monitor) getPeers() ([]peer.ID, error) {
// Ger current list of peers
var peers []peer.ID
err := mon.rpcClient.Call(
"",
@ -249,72 +209,28 @@ func (mon *Monitor) getPeers() ([]peer.ID, error) {
struct{}{},
&peers,
)
if err != nil {
logger.Error(err)
}
return peers, err
}
// LastMetrics returns last known VALID metrics of a given type. A metric
// LatestMetrics returns last known VALID metrics of a given type. A metric
// is only valid if it has not expired and belongs to a current cluster peer.
func (mon *Monitor) LastMetrics(name string) []api.Metric {
func (mon *Monitor) LatestMetrics(name string) []api.Metric {
latest := mon.metrics.Latest(name)
// Make sure we only return metrics in the current peerset
peers, err := mon.getPeers()
if err != nil {
logger.Errorf("LastMetrics could not list peers: %s", err)
return []api.Metric{}
}
mon.metricsMux.RLock()
defer mon.metricsMux.RUnlock()
mbyp, ok := mon.metrics[name]
if !ok {
logger.Warningf("LastMetrics: No %s metrics", name)
return []api.Metric{}
}
metrics := make([]api.Metric, 0, len(mbyp))
// only show metrics for current set of peers
for _, peer := range peers {
window, ok := mbyp[peer]
if !ok {
continue
}
last, err := window.Latest()
if err != nil || last.Discard() {
logger.Warningf("no valid last metric for peer: %+v", last)
continue
}
metrics = append(metrics, last)
}
return metrics
return metrics.PeersetFilter(latest, peers)
}
// Alerts returns a channel on which alerts are sent when the
// monitor detects a failure.
func (mon *Monitor) Alerts() <-chan api.Alert {
return mon.alerts
}
// monitor creates a ticker which fetches current
// cluster peers and checks that the last metric for a peer
// has not expired.
func (mon *Monitor) monitor() {
ticker := time.NewTicker(mon.config.CheckInterval)
for {
select {
case <-ticker.C:
logger.Debug("monitoring tick")
peers, err := mon.getPeers()
if err != nil {
logger.Error(err)
break
}
mon.metricsMux.RLock()
mon.checker.CheckMetrics(peers)
mon.metricsMux.RUnlock()
case <-mon.ctx.Done():
ticker.Stop()
return
}
}
return mon.checker.Alerts()
}

View File

@ -119,7 +119,7 @@ func TestLogMetricConcurrent(t *testing.T) {
last := time.Now().Add(-500 * time.Millisecond)
for i := 0; i <= 20; i++ {
lastMtrcs := pm.LastMetrics("test")
lastMtrcs := pm.LatestMetrics("test")
if len(lastMtrcs) != 1 {
t.Error("no valid metrics", len(lastMtrcs), i)
@ -158,18 +158,18 @@ func TestPeerMonitorLogMetric(t *testing.T) {
pm.LogMetric(mf.newMetric("test2", test.TestPeerID3))
pm.LogMetric(mf.newMetric("test2", test.TestPeerID3))
lastMetrics := pm.LastMetrics("testbad")
if len(lastMetrics) != 0 {
t.Logf("%+v", lastMetrics)
latestMetrics := pm.LatestMetrics("testbad")
if len(latestMetrics) != 0 {
t.Logf("%+v", latestMetrics)
t.Error("metrics should be empty")
}
lastMetrics = pm.LastMetrics("test")
if len(lastMetrics) != 3 {
latestMetrics = pm.LatestMetrics("test")
if len(latestMetrics) != 3 {
t.Error("metrics should correspond to 3 hosts")
}
for _, v := range lastMetrics {
for _, v := range latestMetrics {
switch v.Peer {
case test.TestPeerID1:
if v.Value != "0" {
@ -188,11 +188,11 @@ func TestPeerMonitorLogMetric(t *testing.T) {
}
}
lastMetrics = pm.LastMetrics("test2")
if len(lastMetrics) != 1 {
latestMetrics = pm.LatestMetrics("test2")
if len(latestMetrics) != 1 {
t.Fatal("should only be one metric")
}
if lastMetrics[0].Value != fmt.Sprintf("%d", mf.count()-1) {
if latestMetrics[0].Value != fmt.Sprintf("%d", mf.count()-1) {
t.Error("metric is not last")
}
}
@ -220,16 +220,16 @@ func TestPeerMonitorPublishMetric(t *testing.T) {
t.Fatal(err)
}
time.Sleep(200 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
checkMetric := func(t *testing.T, pm *Monitor) {
lastMetrics := pm.LastMetrics("test")
if len(lastMetrics) != 1 {
latestMetrics := pm.LatestMetrics("test")
if len(latestMetrics) != 1 {
t.Fatal(pm.host.ID(), "expected 1 published metric")
}
t.Log(pm.host.ID(), "received metric")
receivedMetric := lastMetrics[0]
receivedMetric := latestMetrics[0]
if receivedMetric.Peer != metric.Peer ||
receivedMetric.Expire != metric.Expire ||
receivedMetric.Value != metric.Value ||

View File

@ -1,76 +0,0 @@
package util
import (
"errors"
"github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-peer"
)
// ErrAlertChannelFull is returned if the alert channel is full.
var ErrAlertChannelFull = errors.New("alert channel is full")
// Metrics maps metric names to PeerMetrics
type Metrics map[string]PeerMetrics
// PeerMetrics maps a peer IDs to a metric window.
type PeerMetrics map[peer.ID]*MetricsWindow
// MetricsChecker provides utilities to find expired metrics
// for a given peerset and send alerts if it proceeds to do so.
type MetricsChecker struct {
alertCh chan api.Alert
metrics Metrics
}
// NewMetricsChecker creates a MetricsChecker using the given
// Metrics and alert channel. MetricsChecker assumes non-concurrent
// access to the Metrics map. It's the caller's responsability
// to it lock otherwise while calling CheckMetrics().
func NewMetricsChecker(metrics Metrics, alertCh chan api.Alert) *MetricsChecker {
return &MetricsChecker{
alertCh: alertCh,
metrics: metrics,
}
}
// CheckMetrics triggers Check() on all metrics known for the given peerset.
func (mc *MetricsChecker) CheckMetrics(peers []peer.ID) {
for name, peerMetrics := range mc.metrics {
for _, pid := range peers {
window, ok := peerMetrics[pid]
if !ok { // no metrics for this peer
continue
}
mc.Check(pid, name, window)
}
}
}
// Check sends an alert on the alert channel for the given peer and metric name
// if the last metric in the window was valid but expired.
func (mc *MetricsChecker) Check(pid peer.ID, metricName string, mw *MetricsWindow) error {
last, err := mw.Latest()
if err != nil { // no metrics
return nil
}
if last.Valid && last.Expired() {
return mc.alert(pid, metricName)
}
return nil
}
func (mc *MetricsChecker) alert(pid peer.ID, metricName string) error {
alrt := api.Alert{
Peer: pid,
MetricName: metricName,
}
select {
case mc.alertCh <- alrt:
default:
return ErrAlertChannelFull
}
return nil
}

View File

@ -1,53 +0,0 @@
package util
import (
"testing"
"time"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
)
func TestMetricsChecker(t *testing.T) {
metrics := make(Metrics)
alerts := make(chan api.Alert, 1)
checker := NewMetricsChecker(metrics, alerts)
metr := api.Metric{
Name: "test",
Peer: test.TestPeerID1,
Value: "1",
Valid: true,
}
metr.SetTTL(2 * time.Second)
metrics["test"] = make(PeerMetrics)
metrics["test"][test.TestPeerID1] = NewMetricsWindow(5, true)
metrics["test"][test.TestPeerID1].Add(metr)
checker.CheckMetrics([]peer.ID{test.TestPeerID1})
select {
case <-alerts:
t.Error("there should not be an alert yet")
default:
}
time.Sleep(3 * time.Second)
checker.CheckMetrics([]peer.ID{test.TestPeerID1})
select {
case <-alerts:
default:
t.Error("an alert should have been triggered")
}
checker.CheckMetrics([]peer.ID{test.TestPeerID2})
select {
case <-alerts:
t.Error("there should not be alerts for different peer")
default:
}
}