// ipfs-cluster/monitor/metrics/checker.go

package metrics

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/observations"

	"go.opencensus.io/stats"
	"go.opencensus.io/tag"

	peer "github.com/libp2p/go-libp2p-peer"
)

// AlertChannelCap specifies how much buffer the alerts channel has.
var AlertChannelCap = 256

// MaxAlertThreshold specifies how many alerts may be triggered for a peer
// before it is removed from the list of monitored peers.
var MaxAlertThreshold = 1

// ErrAlertChannelFull is returned if the alert channel is full.
var ErrAlertChannelFull = errors.New("alert channel is full")
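
// Illustrative sketch: since these are exported package-level variables,
// a caller may tune them before constructing any Checker. The values
// below are arbitrary examples, not recommended settings.
//
//	metrics.MaxAlertThreshold = 3  // tolerate a couple more alerts per peer
//	metrics.AlertChannelCap = 1024 // buffer more alerts before dropping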

// Checker provides utilities to find expired metrics
// for a given peerset and to send alerts when peers appear to have failed.
type Checker struct {
	ctx            context.Context
	alertCh        chan *api.Alert
	metrics        *Store
	threshold      float64
	alertThreshold int

	failedPeersMu sync.Mutex
	failedPeers   map[peer.ID]int
}

// NewChecker creates a Checker using the given
// MetricsStore. The threshold value indicates when a
// monitored component should be considered to have failed.
// The greater the threshold value, the more leniency is granted.
//
// A value between 2.0 and 4.0 is suggested for the threshold.
func NewChecker(ctx context.Context, metrics *Store, threshold float64) *Checker {
	return &Checker{
		ctx:         ctx,
		alertCh:     make(chan *api.Alert, AlertChannelCap),
		metrics:     metrics,
		threshold:   threshold,
		failedPeers: make(map[peer.ID]int),
	}
}
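
// Illustrative usage sketch from a calling package (assumes the package's
// NewStore constructor and an existing context); the 3.0 threshold is just
// a value within the suggested 2.0-4.0 range.
//
//	store := metrics.NewStore()
//	checker := metrics.NewChecker(ctx, store, 3.0)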

// CheckPeers will trigger alerts based on the latest metrics from the given peerset
// when they have expired and no alert has been sent before.
func (mc *Checker) CheckPeers(peers []peer.ID) error {
	for _, peer := range peers {
		for _, metric := range mc.metrics.PeerMetrics(peer) {
			if mc.FailedMetric(metric.Name, peer) {
				err := mc.alert(peer, metric.Name)
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}
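
// Illustrative sketch of a one-off check, where peerIDs is assumed to be a
// []peer.ID obtained elsewhere (for example from the cluster's consensus
// or monitoring layer). The only error returned by the code above is
// ErrAlertChannelFull, i.e. alerts are produced faster than Alerts() is
// being drained.
//
//	if err := checker.CheckPeers(peerIDs); err != nil {
//		log.Println(err)
//	}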

// CheckAll will trigger alerts for all latest metrics when they have expired
// and no alert has been sent before.
func (mc *Checker) CheckAll() error {
	for _, metric := range mc.metrics.AllMetrics() {
		if mc.FailedMetric(metric.Name, metric.Peer) {
			err := mc.alert(metric.Peer, metric.Name)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

func (mc *Checker) alertIfExpired(metric *api.Metric) error {
	if !metric.Expired() {
		return nil
	}
	err := mc.alert(metric.Peer, metric.Name)
	if err != nil {
		return err
	}
	metric.Valid = false
	mc.metrics.Add(metric) // invalidate so we don't alert again
	return nil
}

func (mc *Checker) alert(pid peer.ID, metricName string) error {
	mc.failedPeersMu.Lock()
	defer mc.failedPeersMu.Unlock()

	// Once a peer has reached MaxAlertThreshold alerts, stop alerting
	// about it: forget its metrics and its failure count.
	if mc.failedPeers[pid] >= MaxAlertThreshold {
		mc.metrics.RemovePeer(pid)
		delete(mc.failedPeers, pid)
		return nil
	}
	mc.failedPeers[pid]++

	alrt := &api.Alert{
		Peer:       pid,
		MetricName: metricName,
	}
	select {
	case mc.alertCh <- alrt:
		stats.RecordWithTags(
			mc.ctx,
			[]tag.Mutator{tag.Upsert(observations.RemotePeerKey, pid.Pretty())},
			observations.Alerts.M(1),
		)
	default:
		// Do not block callers when nobody is reading alerts.
		return ErrAlertChannelFull
	}
	return nil
}

// Alerts returns a channel which receives the alerts triggered by
// CheckPeers and CheckAll.
func (mc *Checker) Alerts() <-chan *api.Alert {
	return mc.alertCh
}
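
// Illustrative sketch: alerts are delivered asynchronously, so a consumer
// typically drains the channel from its own goroutine. handleFailedPeer is
// a hypothetical helper, not part of this package.
//
//	go func() {
//		for alrt := range checker.Alerts() {
//			handleFailedPeer(alrt.Peer, alrt.MetricName)
//		}
//	}()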

// Watch will trigger regular CheckPeers on the given interval. It will call
// peersF to obtain a peerset (or CheckAll when peersF is nil). It can be
// stopped by cancelling the context.
// Usually you want to launch this in a goroutine.
func (mc *Checker) Watch(ctx context.Context, peersF func(context.Context) ([]peer.ID, error), interval time.Duration) {
	ticker := time.NewTicker(interval)
	for {
		select {
		case <-ticker.C:
			if peersF != nil {
				peers, err := peersF(ctx)
				if err != nil {
					continue
				}
				mc.CheckPeers(peers)
			} else {
				mc.CheckAll()
			}
		case <-ctx.Done():
			ticker.Stop()
			return
		}
	}
}
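
// Illustrative sketch: Watch blocks, so it is launched in its own
// goroutine and stopped by cancelling ctx. peersFromConsensus is a
// hypothetical callback returning the current peerset, and the 15-second
// interval is an arbitrary example.
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	go checker.Watch(ctx, peersFromConsensus, 15*time.Second)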

// Failed returns true if a peer has potentially failed.
// Peers that are not present in the metrics store are reported as failed.
func (mc *Checker) Failed(pid peer.ID) bool {
	_, _, _, result := mc.failed("ping", pid)
	return result
}

// FailedMetric is the same as Failed but can use any metric type,
// not just ping.
func (mc *Checker) FailedMetric(metric string, pid peer.ID) bool {
	_, _, _, result := mc.failed(metric, pid)
	return result
}

// failed returns all the values involved in making the decision
// as to whether a peer has failed or not. This is mainly for
// debugging purposes.
func (mc *Checker) failed(metric string, pid peer.ID) (float64, []float64, float64, bool) {
	latest := mc.metrics.PeerLatest(metric, pid)
	if latest == nil {
		return 0.0, nil, 0.0, true
	}
	v := time.Now().UnixNano() - latest.ReceivedAt
	dv := mc.metrics.Distribution(metric, pid)

	// One metric isn't enough to calculate a distribution, so
	// alerting/failure detection falls back to the metric-expiry
	// method.
	switch {
	case len(dv) < 5 && !latest.Expired():
		return float64(v), dv, 0.0, false
	default:
		phiv := phi(float64(v), dv)
		return float64(v), dv, phiv, phiv >= mc.threshold
	}
}
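
// Illustrative sketch of how the values above drive a caller's decision,
// where pid is assumed to be a known peer.ID:
//
//	if checker.FailedMetric("ping", pid) {
//		// True when no "ping" metric is stored for pid at all, or
//		// when the phi value computed from its arrival times has
//		// reached the Checker's configured threshold.
//	}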