ipfs-cluster/monitor/pubsubmon/pubsubmon.go
Hector Sanjuan 7f60cb318c Pubsubmon: Gossipsub
License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
2018-10-10 16:00:37 +02:00

237 lines
5.0 KiB
Go

// Package pubsubmon implements a PeerMonitor component for IPFS Cluster that
// uses PubSub to send and receive metrics.
package pubsubmon
import (
"bytes"
"context"
"sync"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/monitor/metrics"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
logging "github.com/ipfs/go-log"
floodsub "github.com/libp2p/go-floodsub"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
msgpack "github.com/multiformats/go-multicodec/msgpack"
)
// Package-level values used by the monitor.
var (
	// logger is the logging facility for this component, registered
	// under the "monitor" name.
	logger = logging.Logger("monitor")

	// PubsubTopic specifies the topic used to publish Cluster metrics.
	PubsubTopic = "monitor.metrics"

	// msgpackHandle is the msgpack handle used to build the encoders
	// and decoders that (de)serialize metrics.
	msgpackHandle = msgpack.DefaultMsgpackHandle()
)
// Monitor is a component in charge of monitoring peers, logging
// metrics and detecting failures.
type Monitor struct {
// ctx is created in New and cancelled (through cancel) by Shutdown.
// It is passed to the pubsub subscription reads and to the checker.
ctx context.Context
cancel func()
// rpcClient is stored by SetClient and used by getPeers.
rpcClient *rpc.Client
// rpcReady receives a value from SetClient and is closed by Shutdown.
// run waits on it before starting the background workers.
rpcReady chan struct{}
// host is the libp2p host given to New.
host host.Host
// pubsub is the PubSub instance created in New; subscription is its
// subscription to PubsubTopic, read by logFromPubsub.
pubsub *floodsub.PubSub
subscription *floodsub.Subscription
// metrics stores logged metrics; checker is built on top of it and
// provides the Alerts channel.
metrics *metrics.Store
checker *metrics.Checker
// config is the validated configuration given to New.
config *Config
// shutdownLock guards shutdown, which is set once Shutdown completes.
shutdownLock sync.Mutex
shutdown bool
// wg is waited on by Shutdown.
wg sync.WaitGroup
}
// New creates a new PubSub monitor, using the given host and config.
//
// The configuration is validated first and any validation error is
// returned as is. A GossipSub instance is then created on h and subscribed
// to PubsubTopic; if either step fails, the monitor's context is cancelled
// and the error is returned. On success the monitor's run goroutine is
// started before returning; it waits for SetClient (or Shutdown) before
// doing any work.
func New(h host.Host, cfg *Config) (*Monitor, error) {
	if err := cfg.Validate(); err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())

	// abort releases the context before reporting a construction error.
	abort := func(err error) (*Monitor, error) {
		cancel()
		return nil, err
	}

	store := metrics.NewStore()
	chk := metrics.NewChecker(store)

	ps, err := floodsub.NewGossipSub(ctx, h)
	if err != nil {
		return abort(err)
	}

	sub, err := ps.Subscribe(PubsubTopic)
	if err != nil {
		return abort(err)
	}

	m := &Monitor{
		ctx:          ctx,
		cancel:       cancel,
		rpcReady:     make(chan struct{}, 1),
		host:         h,
		pubsub:       ps,
		subscription: sub,
		metrics:      store,
		checker:      chk,
		config:       cfg,
	}

	go m.run()
	return m, nil
}
// run blocks until either the RPC client has been signalled as ready or the
// monitor's context is done. In the first case it starts the two background
// workers: the pubsub metric reader and the checker's Watch loop, which is
// given getPeers and the configured CheckInterval. In the second case it
// returns without starting anything.
//
// NOTE(review): a receive on rpcReady also succeeds once the channel is
// closed, which Shutdown does right before cancelling the context, so the
// workers may be launched during shutdown. Both are handed mon.ctx.
func (mon *Monitor) run() {
	select {
	case <-mon.ctx.Done():
		return
	case <-mon.rpcReady:
	}

	go mon.logFromPubsub()
	go mon.checker.Watch(mon.ctx, mon.getPeers, mon.config.CheckInterval)
}
// logFromPubsub logs metrics received in the subscribed topic.
//
// It loops until the monitor's context is done. Each message read from the
// subscription is decoded from msgpack into an api.Metric and handed to
// LogMetric. Decoding and logging errors are reported through the logger
// and the message is dropped; errors from reading the subscription are not
// reported and the read is simply retried.
func (mon *Monitor) logFromPubsub() {
	for {
		// Non-blocking check so that a cancelled context ends the loop.
		select {
		case <-mon.ctx.Done():
			return
		default:
		}

		msg, err := mon.subscription.Next(mon.ctx)
		if err != nil {
			// A cancelled context ends up here; the check at the
			// top of the loop then terminates it.
			continue
		}

		payload := bytes.NewBuffer(msg.GetData())
		decoder := msgpack.Multicodec(msgpackHandle).Decoder(payload)

		var metric api.Metric
		if err := decoder.Decode(&metric); err != nil {
			logger.Error(err)
			continue
		}

		logger.Debugf("received pubsub metric '%s' from '%s'", metric.Name, metric.Peer)

		if err := mon.LogMetric(metric); err != nil {
			logger.Error(err)
		}
	}
}
// SetClient saves the given rpc.Client for later use and signals, through
// rpcReady, that the client is available. The run goroutine waits for that
// signal before starting the background workers.
//
// rpcReady is created in New with a buffer of one, so the send does not
// block while that slot is free.
// NOTE(review): Shutdown closes rpcReady; calling SetClient afterwards
// would send on a closed channel and panic.
func (mon *Monitor) SetClient(c *rpc.Client) {
mon.rpcClient = c
mon.rpcReady <- struct{}{}
}
// Shutdown stops the peer monitor. In particular, it will
// not deliver any alerts.
//
// Calls are serialized by shutdownLock. A call made after a completed
// shutdown only logs a warning. Shutdown always returns nil.
func (mon *Monitor) Shutdown() error {
mon.shutdownLock.Lock()
defer mon.shutdownLock.Unlock()
if mon.shutdown {
logger.Warning("Monitor already shut down")
return nil
}
logger.Info("stopping Monitor")
// Closing rpcReady unblocks a run goroutine that is still waiting for
// the client. NOTE(review): a later SetClient call would send on the
// closed channel and panic.
close(mon.rpcReady)
// Cancelling the context stops the goroutines that were given mon.ctx.
mon.cancel()
// NOTE(review): nothing in the visible code calls mon.wg.Add, so this
// wait appears to return immediately; confirm before relying on it.
mon.wg.Wait()
mon.shutdown = true
return nil
}
// LogMetric stores a metric so it can later be retrieved.
// The metric is added to the monitor's metrics store and a debug line with
// its name, peer and expiry is logged. LogMetric always returns nil.
func (mon *Monitor) LogMetric(m api.Metric) error {
mon.metrics.Add(m)
logger.Debugf("pubsub mon logged '%s' metric from '%s'. Expires on %d", m.Name, m.Peer, m.Expire)
return nil
}
// PublishMetric encodes the given metric with msgpack and publishes it on
// PubsubTopic so that subscribed peers receive it.
//
// A metric for which m.Discard() reports true is not published: a warning
// is logged and nil is returned. Encoding and publishing errors are logged
// and returned to the caller.
func (mon *Monitor) PublishMetric(m api.Metric) error {
	if m.Discard() {
		logger.Warningf("discarding invalid metric: %+v", m)
		return nil
	}

	payload := new(bytes.Buffer)
	if err := msgpack.Multicodec(msgpackHandle).Encoder(payload).Encode(m); err != nil {
		logger.Error(err)
		return err
	}

	logger.Debugf("publishing metric %s to pubsub. Expires: %d", m.Name, m.Expire)

	if err := mon.pubsub.Publish(PubsubTopic, payload.Bytes()); err != nil {
		logger.Error(err)
		return err
	}
	return nil
}
// getPeers gets the current list of peers from the consensus component by
// calling the "Cluster.ConsensusPeers" RPC method through the client saved
// by SetClient.
//
// A failed call is logged and its error returned along with whatever the
// call left in the result slice.
// NOTE(review): the empty destination peer presumably addresses the local
// peer; confirm against the RPC library.
func (mon *Monitor) getPeers() ([]peer.ID, error) {
	var ids []peer.ID

	if err := mon.rpcClient.Call("", "Cluster", "ConsensusPeers", struct{}{}, &ids); err != nil {
		logger.Error(err)
		return ids, err
	}
	return ids, nil
}
// LatestMetrics returns the latest metrics of the given name known to the
// metrics store, restricted to those belonging to peers in the current
// cluster peerset.
//
// When the peerset cannot be obtained, an empty (non-nil) slice is
// returned. Expiry handling, if any, is left to the metrics store.
func (mon *Monitor) LatestMetrics(name string) []api.Metric {
	candidates := mon.metrics.Latest(name)

	// Only metrics from members of the current peerset are returned.
	if current, err := mon.getPeers(); err == nil {
		return metrics.PeersetFilter(candidates, current)
	}

	// The peerset is unknown, so no metric can be attributed to it.
	return []api.Metric{}
}
// Alerts returns a channel on which alerts are sent when the
// monitor detects a failure. The channel is the one exposed by the
// monitor's metrics checker.
func (mon *Monitor) Alerts() <-chan api.Alert {
return mon.checker.Alerts()
}