2018-05-08 09:38:12 +00:00
|
|
|
package metrics
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
2019-06-10 04:41:39 +00:00
|
|
|
"sync"
|
2018-05-08 09:38:12 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/ipfs/ipfs-cluster/api"
|
2019-04-26 06:33:01 +00:00
|
|
|
"github.com/ipfs/ipfs-cluster/observations"
|
2019-06-14 10:41:11 +00:00
|
|
|
|
|
|
|
peer "github.com/libp2p/go-libp2p-core/peer"
|
|
|
|
|
2019-04-26 06:33:01 +00:00
|
|
|
"go.opencensus.io/stats"
|
2019-04-29 07:58:28 +00:00
|
|
|
"go.opencensus.io/tag"
|
2018-05-08 09:38:12 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// AlertChannelCap specifies how much buffer the alerts channel has.
// When the buffer is full, alert() returns ErrAlertChannelFull instead
// of blocking.
var AlertChannelCap = 256
|
|
|
|
|
// MaxAlertThreshold specifies how many alerts may be triggered per peer
// for a given metric before that metric is removed from the store and
// the peer stops being monitored (and alerted about) for it.
var MaxAlertThreshold = 1
|
2019-06-10 11:54:48 +00:00
|
|
|
|
// ErrAlertChannelFull is returned if the alert channel is full.
var ErrAlertChannelFull = errors.New("alert channel is full")
|
|
|
|
|
// accrualMetricsNum represents the number of metrics required for
// accrual failure detection to function appropriately; under this
// count we fall back to TTL expiry to determine whether a peer may
// have failed.
var accrualMetricsNum = 6
|
|
|
|
|
// Checker provides utilities to find expired metrics
// for a given peerset and send alerts if it proceeds to do so.
type Checker struct {
	ctx       context.Context
	alertCh   chan *api.Alert // buffered with AlertChannelCap; drained via Alerts()
	metrics   *Store
	threshold float64 // phi threshold for accrual failure detection

	// failedPeersMu guards failedPeers, which counts, per peer and per
	// metric name, how many alerts have been triggered so far.
	failedPeersMu sync.Mutex
	failedPeers   map[peer.ID]map[string]int
}
|
|
|
|
|
|
|
|
// NewChecker creates a Checker using the given
|
2019-04-17 06:39:00 +00:00
|
|
|
// MetricsStore. The threshold value indicates when a
|
|
|
|
// monitored component should be considered to have failed.
|
|
|
|
// The greater the threshold value the more leniency is granted.
|
|
|
|
//
|
|
|
|
// A value between 2.0 and 4.0 is suggested for the threshold.
|
2019-04-26 06:33:01 +00:00
|
|
|
func NewChecker(ctx context.Context, metrics *Store, threshold float64) *Checker {
|
2018-05-08 09:38:12 +00:00
|
|
|
return &Checker{
|
2019-06-10 04:41:39 +00:00
|
|
|
ctx: ctx,
|
|
|
|
alertCh: make(chan *api.Alert, AlertChannelCap),
|
|
|
|
metrics: metrics,
|
|
|
|
threshold: threshold,
|
2019-06-11 08:53:12 +00:00
|
|
|
failedPeers: make(map[peer.ID]map[string]int),
|
2018-05-08 09:38:12 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-04-23 10:30:26 +00:00
|
|
|
// CheckPeers will trigger alerts based on the latest metrics from the given peerset
|
2019-02-20 14:24:25 +00:00
|
|
|
// when they have expired and no alert has been sent before.
|
2018-05-08 09:38:12 +00:00
|
|
|
func (mc *Checker) CheckPeers(peers []peer.ID) error {
|
2019-06-23 08:05:32 +00:00
|
|
|
for _, name := range mc.metrics.MetricNames() {
|
|
|
|
for _, peer := range peers {
|
|
|
|
for _, metric := range mc.metrics.PeerMetricAll(name, peer) {
|
|
|
|
if mc.FailedMetric(metric.Name, peer) {
|
|
|
|
err := mc.alert(peer, metric.Name)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-04-23 10:30:26 +00:00
|
|
|
}
|
|
|
|
}
|
2018-05-08 09:38:12 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-02-20 14:24:25 +00:00
|
|
|
// CheckAll will trigger alerts for all latest metrics when they have expired
|
|
|
|
// and no alert has been sent before.
|
2019-06-10 11:54:48 +00:00
|
|
|
func (mc *Checker) CheckAll() error {
|
2019-02-20 14:24:25 +00:00
|
|
|
for _, metric := range mc.metrics.AllMetrics() {
|
2019-06-23 08:05:32 +00:00
|
|
|
if mc.FailedMetric(metric.Name, metric.Peer) {
|
2019-04-30 02:06:01 +00:00
|
|
|
err := mc.alert(metric.Peer, metric.Name)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-02-20 14:24:25 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
// alert records a failure for the given peer/metric pair and pushes an
// api.Alert to the alerts channel. Once the failure count for the pair
// reaches MaxAlertThreshold, the peer's metrics for that name are removed
// from the store and the counter is cleared instead, so no further alerts
// are produced for it. Returns ErrAlertChannelFull when the (buffered)
// alerts channel cannot accept more alerts.
func (mc *Checker) alert(pid peer.ID, metricName string) error {
	mc.failedPeersMu.Lock()
	defer mc.failedPeersMu.Unlock()

	// Lazily initialize the per-metric failure counters for this peer.
	if _, ok := mc.failedPeers[pid]; !ok {
		mc.failedPeers[pid] = make(map[string]int)
	}
	failedMetrics := mc.failedPeers[pid]

	// If above threshold, remove all metrics for that peer
	// and clean up failedPeers when no failed metrics are left.
	if failedMetrics[metricName] >= MaxAlertThreshold {
		mc.metrics.RemovePeerMetrics(pid, metricName)
		delete(failedMetrics, metricName)
		if len(mc.failedPeers[pid]) == 0 {
			delete(mc.failedPeers, pid)
		}
		return nil
	}

	failedMetrics[metricName]++

	alrt := &api.Alert{
		Peer:       pid,
		MetricName: metricName,
		Expiry:     time.Now().Add(30 * time.Second).UnixNano(),
	}
	// Non-blocking send: report a full channel rather than stalling
	// the checker when nobody is draining Alerts().
	select {
	case mc.alertCh <- alrt:
		// Record the alert for observability, tagged with the peer.
		stats.RecordWithTags(
			mc.ctx,
			[]tag.Mutator{tag.Upsert(observations.RemotePeerKey, pid.Pretty())},
			observations.Alerts.M(1),
		)
	default:
		return ErrAlertChannelFull
	}
	return nil
}
|
|
|
|
|
|
|
|
// Alerts returns a channel which gets notified by CheckPeers.
// The channel is buffered with AlertChannelCap; if readers fall
// behind, alert() returns ErrAlertChannelFull rather than blocking.
func (mc *Checker) Alerts() <-chan *api.Alert {
	return mc.alertCh
}
|
|
|
|
|
|
|
|
// Watch will trigger regular CheckPeers on the given interval. It will call
|
|
|
|
// peersF to obtain a peerset. It can be stopped by cancelling the context.
|
|
|
|
// Usually you want to launch this in a goroutine.
|
2018-06-27 04:03:15 +00:00
|
|
|
func (mc *Checker) Watch(ctx context.Context, peersF func(context.Context) ([]peer.ID, error), interval time.Duration) {
|
2018-05-08 09:38:12 +00:00
|
|
|
ticker := time.NewTicker(interval)
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ticker.C:
|
2019-02-20 14:24:25 +00:00
|
|
|
if peersF != nil {
|
|
|
|
peers, err := peersF(ctx)
|
|
|
|
if err != nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
mc.CheckPeers(peers)
|
|
|
|
} else {
|
|
|
|
mc.CheckAll()
|
2018-05-08 09:38:12 +00:00
|
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
|
|
ticker.Stop()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-03-11 09:00:21 +00:00
|
|
|
|
// FailedMetric returns if a peer is marked as failed for a particular metric.
func (mc *Checker) FailedMetric(metric string, pid peer.ID) bool {
	// Only the final verdict matters here; the remaining return values
	// of failed() exist for inspection/debugging.
	_, _, _, result := mc.failed(metric, pid)
	return result
}
|
|
|
|
|
|
|
|
// failed returns all the values involved in making the decision as to
// whether a peer has failed or not: the elapsed nanoseconds since the
// latest metric was received, the distribution used for accrual
// detection, the computed phi value, and the final verdict.
func (mc *Checker) failed(metric string, pid peer.ID) (float64, []float64, float64, bool) {
	latest := mc.metrics.PeerLatest(metric, pid)
	// No metric recorded at all for this peer: treated as failed.
	if latest == nil {
		return 0.0, nil, 0.0, true
	}

	// A peer is never failed if the latest metric from it has
	// not expired or we do not have enough number of metrics
	// for accrual detection
	if !latest.Expired() {
		return 0.0, nil, 0.0, false
	}
	// The latest metric has expired

	pmtrs := mc.metrics.PeerMetricAll(metric, pid)
	// Not enough values for accrual and metric expired. Peer failed.
	if len(pmtrs) < accrualMetricsNum {
		return 0.0, nil, 0.0, true
	}

	// Accrual failure detection: compare phi (computed from the time
	// since the last metric and the stored distribution) against the
	// configured threshold.
	v := time.Now().UnixNano() - latest.ReceivedAt
	dv := mc.metrics.Distribution(metric, pid)
	phiv := phi(float64(v), dv)
	return float64(v), dv, phiv, phiv >= mc.threshold
}
|