// Package basic implements a basic PeerMonitor component for IPFS Cluster. This
// component is in charge of logging metrics and triggering alerts when a peer
// goes down.
package basic

import (
	"context"
	"errors"
	"sync"
	"time"

	rpc "github.com/hsanjuan/go-libp2p-gorpc"
	logging "github.com/ipfs/go-log"
	peer "github.com/libp2p/go-libp2p-peer"

	"github.com/ipfs/ipfs-cluster/api"
)

var logger = logging.Logger("monitor")

// AlertChannelCap specifies how much buffer the alerts channel has.
var AlertChannelCap = 256

// WindowCap specifies how many metrics to keep for a given host and
// metric type.
var WindowCap = 10

// peerMetrics is a circular queue: it keeps the last windowCap metrics
// received for a single peer and metric type.
type peerMetrics struct {
	last   int
	window []api.Metric
	// mux sync.RWMutex
}

func newPeerMetrics(windowCap int) *peerMetrics {
	w := make([]api.Metric, 0, windowCap)
	return &peerMetrics{0, w}
}

// add appends a metric to the window, overwriting the oldest entry once
// the window is full.
func (pmets *peerMetrics) add(m api.Metric) {
	// pmets.mux.Lock()
	// defer pmets.mux.Unlock()
	if len(pmets.window) < cap(pmets.window) {
		pmets.window = append(pmets.window, m)
		pmets.last = len(pmets.window) - 1
		return
	}

	// len == cap: overwrite the oldest slot
	pmets.last = (pmets.last + 1) % cap(pmets.window)
	pmets.window[pmets.last] = m
}
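
// A worked example of the circular window above (illustrative only, not
// part of the original code): with windowCap = 3 and metrics m0..m4 added
// in order, the window evolves as
//
//	add(m0): window=[m0]        last=0
//	add(m1): window=[m0 m1]     last=1
//	add(m2): window=[m0 m1 m2]  last=2
//	add(m3): window=[m3 m1 m2]  last=0  (m0 overwritten)
//	add(m4): window=[m3 m4 m2]  last=1  (m1 overwritten)
//
// so all() (defined below) returns [m4 m3 m2], newest to oldest.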

func (pmets *peerMetrics) latest() (api.Metric, error) {
	// pmets.mux.RLock()
	// defer pmets.mux.RUnlock()
	if len(pmets.window) == 0 {
		return api.Metric{}, errors.New("no metrics")
	}
	return pmets.window[pmets.last], nil
}

// all returns the metrics in the window, ordered from newest to oldest.
func (pmets *peerMetrics) all() []api.Metric {
	// pmets.mux.RLock()
	// defer pmets.mux.RUnlock()
	wlen := len(pmets.window)
	res := make([]api.Metric, 0, wlen)
	if wlen == 0 {
		return res
	}
	for i := pmets.last; i >= 0; i-- {
		res = append(res, pmets.window[i])
	}
	for i := wlen - 1; i > pmets.last; i-- {
		res = append(res, pmets.window[i])
	}
	return res
}

// metricsByPeer maps a peer ID to its circular metrics window.
type metricsByPeer map[peer.ID]*peerMetrics

// StdPeerMonitor is a component in charge of monitoring peers, logging
// metrics and detecting failures.
type StdPeerMonitor struct {
	ctx       context.Context
	cancel    func()
	rpcClient *rpc.Client
	rpcReady  chan struct{}

	metrics    map[string]metricsByPeer
	metricsMux sync.RWMutex
	windowCap  int

	alerts chan api.Alert

	monitoringInterval int

	shutdownLock sync.Mutex
	shutdown     bool
	wg           sync.WaitGroup
}

// NewStdPeerMonitor creates a new monitor. It receives the
// monitoringInterval (in seconds), which is the interval between the
// checks that produce alerts. The window capacity (how many metrics are
// kept for each peer and type of metric) is taken from the WindowCap
// package variable.
func NewStdPeerMonitor(monIntervalSecs int) *StdPeerMonitor {
	if WindowCap <= 0 {
		panic("WindowCap too small")
	}

	ctx, cancel := context.WithCancel(context.Background())

	mon := &StdPeerMonitor{
		ctx:      ctx,
		cancel:   cancel,
		rpcReady: make(chan struct{}, 1),

		metrics:   make(map[string]metricsByPeer),
		windowCap: WindowCap,
		alerts:    make(chan api.Alert, AlertChannelCap),

		monitoringInterval: monIntervalSecs,
	}

	go mon.run()
	return mon
}
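
// A minimal usage sketch (hypothetical caller code; rpcClient stands in
// for an *rpc.Client set up elsewhere in the cluster):
//
//	mon := NewStdPeerMonitor(5) // check metrics every 5 seconds
//	mon.SetClient(rpcClient)    // unblocks run() and starts monitoring
//	defer mon.Shutdown()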

// run waits until the RPC client is ready and then launches the monitoring
// loop. It returns early if the monitor is shut down first.
func (mon *StdPeerMonitor) run() {
	select {
	case <-mon.rpcReady:
		go mon.monitor()
	case <-mon.ctx.Done():
	}
}

// SetClient saves the given rpc.Client for later use.
func (mon *StdPeerMonitor) SetClient(c *rpc.Client) {
	mon.rpcClient = c
	mon.rpcReady <- struct{}{}
}

// Shutdown stops the peer monitor. In particular, it will
// no longer deliver any alerts.
func (mon *StdPeerMonitor) Shutdown() error {
	mon.shutdownLock.Lock()
	defer mon.shutdownLock.Unlock()

	if mon.shutdown {
		logger.Warning("StdPeerMonitor already shut down")
		return nil
	}

	logger.Info("stopping StdPeerMonitor")
	close(mon.rpcReady)
	mon.cancel()
	mon.wg.Wait()
	mon.shutdown = true
	return nil
}

// LogMetric stores a metric so it can later be retrieved.
func (mon *StdPeerMonitor) LogMetric(m api.Metric) {
	mon.metricsMux.Lock()
	defer mon.metricsMux.Unlock()
	name := m.Name
	peer := m.Peer
	mbyp, ok := mon.metrics[name]
	if !ok {
		mbyp = make(metricsByPeer)
		mon.metrics[name] = mbyp
	}
	pmets, ok := mbyp[peer]
	if !ok {
		pmets = newPeerMetrics(mon.windowCap)
		mbyp[peer] = pmets
	}

	logger.Debugf("logged '%s' metric from '%s'", name, peer)
	pmets.add(m)
}

// func (mon *StdPeerMonitor) getLastMetric(name string, p peer.ID) api.Metric {
// mon.metricsMux.RLock()
// defer mon.metricsMux.RUnlock()

// emptyMetric := api.Metric{
// Name: name,
// Peer: p,
// Valid: false,
// }

// mbyp, ok := mon.metrics[name]
// if !ok {
// return emptyMetric
// }

// pmets, ok := mbyp[p]
// if !ok {
// return emptyMetric
// }

// metric, err := pmets.latest()
// if err != nil {
// return emptyMetric
// }
// return metric
// }

// LastMetrics returns, for each peer, the last known valid metric of the
// given type.
func (mon *StdPeerMonitor) LastMetrics(name string) []api.Metric {
	mon.metricsMux.RLock()
	defer mon.metricsMux.RUnlock()

	mbyp, ok := mon.metrics[name]
	if !ok {
		logger.Warningf("LastMetrics: No %s metrics", name)
		return []api.Metric{}
	}

	metrics := make([]api.Metric, 0, len(mbyp))

	for _, peerMetrics := range mbyp {
		last, err := peerMetrics.latest()
		if err != nil || last.Discard() {
			continue
		}
		metrics = append(metrics, last)
	}
	return metrics
}

// Alerts returns a channel on which alerts are sent when the
// monitor detects a failure.
func (mon *StdPeerMonitor) Alerts() <-chan api.Alert {
	return mon.alerts
}
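
// A minimal consumer sketch (hypothetical caller code): drain Alerts() in
// its own goroutine so the buffered channel never fills up and alerts are
// not silently dropped by sendAlert.
//
//	go func() {
//		for alrt := range mon.Alerts() {
//			logger.Warningf("peer %s failed metric %s", alrt.Peer, alrt.MetricName)
//		}
//	}()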

// monitor runs the periodic checks: on every tick it asks the cluster for
// its current peer set and verifies each peer's metrics.
func (mon *StdPeerMonitor) monitor() {
	ticker := time.NewTicker(time.Second * time.Duration(mon.monitoringInterval))
	for {
		select {
		case <-ticker.C:
			logger.Debug("monitoring tick")
			// Get current peers
			var peers []peer.ID
			err := mon.rpcClient.Call("",
				"Cluster",
				"PeerManagerPeers",
				struct{}{},
				&peers)
			if err != nil {
				logger.Error(err)
				break
			}

			for k := range mon.metrics {
				logger.Debug("check metrics ", k)
				mon.checkMetrics(peers, k)
			}
		case <-mon.ctx.Done():
			ticker.Stop()
			return
		}
	}
}

// checkMetrics checks that every given peer has a valid, unexpired metric
// of the given type, and sends an alert for each peer whose last metric
// was valid but has expired. This is probably the place to implement more
// advanced ways of detecting down peers.
func (mon *StdPeerMonitor) checkMetrics(peers []peer.ID, metricName string) {
	mon.metricsMux.RLock()
	defer mon.metricsMux.RUnlock()

	// get metric windows for peers
	metricsByPeer := mon.metrics[metricName]

	// for each of the given current peers
	for _, p := range peers {
		// get metrics for that peer
		pMetrics, ok := metricsByPeer[p]
		if !ok { // no metrics from this peer
			continue
		}
		last, err := pMetrics.latest()
		if err != nil { // no metrics for this peer
			continue
		}
		// send alert if metric is expired (but was valid at some point)
		if last.Valid && last.Expired() {
			mon.sendAlert(p, metricName)
		}
	}
}

// sendAlert performs a non-blocking send on the alerts channel so that a
// slow or absent consumer cannot stall monitoring.
func (mon *StdPeerMonitor) sendAlert(p peer.ID, metricName string) {
	alrt := api.Alert{
		Peer:       p,
		MetricName: metricName,
	}
	select {
	case mon.alerts <- alrt:
	default:
		logger.Error("alert channel is full")
	}
}