ipfs-cluster/monitor/pubsubmon/pubsubmon.go
Hector Sanjuan b6a46cd8a4 allocator: rework the whole allocator system
The new "metrics" allocator is about to partition metrics and distribe
allocations among the partitions.

For example: given a region, an availability zone and free space on disk, the
allocator would be able to choose allocations by distributing among regions
and availability zones as much as possible, and for those peers in the same
region/az, selecting those with most free space first.

This requires a major overhaul of the allocator component.
2021-09-13 12:24:00 +02:00

288 lines
6.5 KiB
Go

// Package pubsubmon implements a PeerMonitor component for IPFS Cluster that
// uses PubSub to send and receive metrics.
package pubsubmon
import (
"bytes"
"context"
"time"
"sync"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/monitor/metrics"
logging "github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
pubsub "github.com/libp2p/go-libp2p-pubsub"
gocodec "github.com/ugorji/go/codec"
"go.opencensus.io/trace"
)
var logger = logging.Logger("monitor")
// PubsubTopic specifies the topic used to publish Cluster metrics.
var PubsubTopic = "monitor.metrics"
var msgpackHandle = &gocodec.MsgpackHandle{}
// Monitor is a component in charge of monitoring peers, logging
// metrics and detecting failures
type Monitor struct {
ctx context.Context
cancel func()
rpcClient *rpc.Client
rpcReady chan struct{}
pubsub *pubsub.PubSub
topic *pubsub.Topic
subscription *pubsub.Subscription
peers PeersFunc
metrics *metrics.Store
checker *metrics.Checker
config *Config
shutdownLock sync.Mutex
shutdown bool
wg sync.WaitGroup
}
// PeersFunc allows the Monitor to filter and discard metrics
// that do not belong to a given peerset.
type PeersFunc func(context.Context) ([]peer.ID, error)
// New creates a new PubSub monitor, using the given host, config and
// PeersFunc. The PeersFunc can be nil. In this case, no metric filtering is
// done based on peers (any peer is considered part of the peerset).
func New(
ctx context.Context,
cfg *Config,
psub *pubsub.PubSub,
peers PeersFunc,
) (*Monitor, error) {
err := cfg.Validate()
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
mtrs := metrics.NewStore()
checker := metrics.NewChecker(ctx, mtrs, cfg.FailureThreshold)
topic, err := psub.Join(PubsubTopic)
if err != nil {
cancel()
return nil, err
}
subscription, err := topic.Subscribe()
if err != nil {
cancel()
return nil, err
}
mon := &Monitor{
ctx: ctx,
cancel: cancel,
rpcReady: make(chan struct{}, 1),
pubsub: psub,
topic: topic,
subscription: subscription,
peers: peers,
metrics: mtrs,
checker: checker,
config: cfg,
}
go mon.run()
return mon, nil
}
func (mon *Monitor) run() {
select {
case <-mon.rpcReady:
go mon.logFromPubsub()
go mon.checker.Watch(mon.ctx, mon.peers, mon.config.CheckInterval)
case <-mon.ctx.Done():
}
}
// logFromPubsub logs metrics received in the subscribed topic.
func (mon *Monitor) logFromPubsub() {
ctx, span := trace.StartSpan(mon.ctx, "monitor/pubsub/logFromPubsub")
defer span.End()
decodeWarningPrinted := false
// Previous versions use multicodec with the following header, which
// we need to remove.
multicodecPrefix := append([]byte{byte(9)}, []byte("/msgpack\n")...)
for {
select {
case <-ctx.Done():
return
default:
msg, err := mon.subscription.Next(ctx)
if err != nil { // context cancelled enters here
continue
}
data := msg.GetData()
buf := bytes.NewBuffer(data)
dec := gocodec.NewDecoder(buf, msgpackHandle)
metric := api.Metric{}
err = dec.Decode(&metric)
if err != nil {
if bytes.HasPrefix(data, multicodecPrefix) {
buf := bytes.NewBuffer(data[len(multicodecPrefix):])
dec := gocodec.NewDecoder(buf, msgpackHandle)
err = dec.Decode(&metric)
if err != nil {
logger.Error(err)
continue
}
// managed to decode an older version metric. Warn about it once.
if !decodeWarningPrinted {
logger.Warning("Peers in versions <= v0.13.3 detected. These peers will not receive metrics from this or other newer peers. Please upgrade them.")
decodeWarningPrinted = true
}
} else {
logger.Error(err)
continue
}
}
debug("recieved", &metric)
err = mon.LogMetric(ctx, &metric)
if err != nil {
logger.Error(err)
continue
}
}
}
}
// SetClient saves the given rpc.Client for later use
func (mon *Monitor) SetClient(c *rpc.Client) {
mon.rpcClient = c
mon.rpcReady <- struct{}{}
}
// Shutdown stops the peer monitor. It particular, it will
// not deliver any alerts.
func (mon *Monitor) Shutdown(ctx context.Context) error {
_, span := trace.StartSpan(ctx, "monitor/pubsub/Shutdown")
defer span.End()
mon.shutdownLock.Lock()
defer mon.shutdownLock.Unlock()
if mon.shutdown {
logger.Warn("Monitor already shut down")
return nil
}
logger.Info("stopping Monitor")
close(mon.rpcReady)
mon.cancel()
mon.wg.Wait()
mon.shutdown = true
return nil
}
// LogMetric stores a metric so it can later be retrieved.
func (mon *Monitor) LogMetric(ctx context.Context, m *api.Metric) error {
_, span := trace.StartSpan(ctx, "monitor/pubsub/LogMetric")
defer span.End()
mon.metrics.Add(m)
debug("logged", m)
return nil
}
// PublishMetric broadcasts a metric to all current cluster peers.
func (mon *Monitor) PublishMetric(ctx context.Context, m *api.Metric) error {
ctx, span := trace.StartSpan(ctx, "monitor/pubsub/PublishMetric")
defer span.End()
if m.Discard() {
logger.Warnf("discarding invalid metric: %+v", m)
return nil
}
var b bytes.Buffer
enc := gocodec.NewEncoder(&b, msgpackHandle)
err := enc.Encode(m)
if err != nil {
logger.Error(err)
return err
}
debug("publish", m)
err = mon.topic.Publish(ctx, b.Bytes())
if err != nil {
logger.Error(err)
return err
}
return nil
}
// LatestMetrics returns last known VALID metrics of a given type. A metric
// is only valid if it has not expired and belongs to a current cluster peer.
func (mon *Monitor) LatestMetrics(ctx context.Context, name string) []*api.Metric {
ctx, span := trace.StartSpan(ctx, "monitor/pubsub/LatestMetrics")
defer span.End()
latest := mon.metrics.LatestValid(name)
if mon.peers == nil {
return latest
}
// Make sure we only return metrics in the current peerset if we have
// a peerset provider.
peers, err := mon.peers(ctx)
if err != nil {
return []*api.Metric{}
}
return metrics.PeersetFilter(latest, peers)
}
// Alerts returns a channel on which alerts are sent when the
// monitor detects a failure.
func (mon *Monitor) Alerts() <-chan *api.Alert {
return mon.checker.Alerts()
}
// MetricNames lists all metric names.
func (mon *Monitor) MetricNames(ctx context.Context) []string {
_, span := trace.StartSpan(ctx, "monitor/pubsub/MetricNames")
defer span.End()
return mon.metrics.MetricNames()
}
func debug(event string, m *api.Metric) {
logger.Debugf(
"%s metric: '%s' - '%s' - '%s' - '%s'",
event,
m.Peer,
m.Name,
m.Value,
time.Unix(0, m.Expire),
)
}