ipfs-cluster/monitor/pubsubmon/pubsubmon.go
Hector Sanjuan adb15feb6e
Dependency upgrades (#1395)
* build(deps): bump github.com/multiformats/go-multiaddr-dns

Bumps [github.com/multiformats/go-multiaddr-dns](https://github.com/multiformats/go-multiaddr-dns) from 0.2.0 to 0.3.1.
- [Release notes](https://github.com/multiformats/go-multiaddr-dns/releases)
- [Commits](https://github.com/multiformats/go-multiaddr-dns/compare/v0.2.0...v0.3.1)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>

* build(deps): bump github.com/hashicorp/go-hclog from 0.15.0 to 0.16.0

Bumps [github.com/hashicorp/go-hclog](https://github.com/hashicorp/go-hclog) from 0.15.0 to 0.16.0.
- [Release notes](https://github.com/hashicorp/go-hclog/releases)
- [Commits](https://github.com/hashicorp/go-hclog/compare/v0.15.0...v0.16.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>

* build(deps): bump github.com/ipfs/go-unixfs from 0.2.4 to 0.2.5

Bumps [github.com/ipfs/go-unixfs](https://github.com/ipfs/go-unixfs) from 0.2.4 to 0.2.5.
- [Release notes](https://github.com/ipfs/go-unixfs/releases)
- [Commits](https://github.com/ipfs/go-unixfs/compare/v0.2.4...v0.2.5)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>

* build(deps): bump github.com/libp2p/go-libp2p-peerstore

Bumps [github.com/libp2p/go-libp2p-peerstore](https://github.com/libp2p/go-libp2p-peerstore) from 0.2.6 to 0.2.7.
- [Release notes](https://github.com/libp2p/go-libp2p-peerstore/releases)
- [Commits](https://github.com/libp2p/go-libp2p-peerstore/compare/v0.2.6...v0.2.7)

Signed-off-by: dependabot[bot] <support@github.com>

* build(deps): bump go.uber.org/multierr from 1.6.0 to 1.7.0

Bumps [go.uber.org/multierr](https://github.com/uber-go/multierr) from 1.6.0 to 1.7.0.
- [Release notes](https://github.com/uber-go/multierr/releases)
- [Changelog](https://github.com/uber-go/multierr/blob/master/CHANGELOG.md)
- [Commits](https://github.com/uber-go/multierr/compare/v1.6.0...v1.7.0)

Signed-off-by: dependabot[bot] <support@github.com>

* Chore: update deps

* Update changelog

* Update to go1.16. Downgrade unixfs.

* go mod tidy

* travis: use go install

* golint no more

* Update configuration for dependabot

* Fix wrong dependabot config

* dependabot

* Revert update of go-unixfs

* Dependency upgrades

* Bump github.com/libp2p/go-libp2p-gorpc from 0.1.2 to 0.1.3

Bumps [github.com/libp2p/go-libp2p-gorpc](https://github.com/libp2p/go-libp2p-gorpc) from 0.1.2 to 0.1.3.
- [Release notes](https://github.com/libp2p/go-libp2p-gorpc/releases)
- [Commits](https://github.com/libp2p/go-libp2p-gorpc/compare/v0.1.2...v0.1.3)

---
updated-dependencies:
- dependency-name: github.com/libp2p/go-libp2p-gorpc
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* Fix deprecated objects with prometheus

* chore: update dependencies

* monitor: remove dependency to go-multicodec

go-multicodec has been deprecated and it was just a wrapper.

This switches directly to ugorji/go/codec's msgpack for cluster metrics
serialization.

* Upgrade mfs so it works with latest go-unixfs

* Bump github.com/ugorji/go/codec from 1.2.5 to 1.2.6 (#1391)

Bumps [github.com/ugorji/go/codec](https://github.com/ugorji/go) from 1.2.5 to 1.2.6.
- [Release notes](https://github.com/ugorji/go/releases)
- [Commits](https://github.com/ugorji/go/compare/v1.2.5...v1.2.6)

---
updated-dependencies:
- dependency-name: github.com/ugorji/go/codec
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump github.com/hashicorp/go-hclog from 0.16.0 to 0.16.1 (#1392)

Bumps [github.com/hashicorp/go-hclog](https://github.com/hashicorp/go-hclog) from 0.16.0 to 0.16.1.
- [Release notes](https://github.com/hashicorp/go-hclog/releases)
- [Commits](https://github.com/hashicorp/go-hclog/compare/v0.16.0...v0.16.1)

---
updated-dependencies:
- dependency-name: github.com/hashicorp/go-hclog
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

Co-authored-by: dependabot-preview[bot] <27856297+dependabot-preview[bot]@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2021-07-06 16:47:04 +02:00

263 lines
5.8 KiB
Go

// Package pubsubmon implements a PeerMonitor component for IPFS Cluster that
// uses PubSub to send and receive metrics.
package pubsubmon
import (
"bytes"
"context"
"sync"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/monitor/metrics"
logging "github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
pubsub "github.com/libp2p/go-libp2p-pubsub"
gocodec "github.com/ugorji/go/codec"
"go.opencensus.io/trace"
)
var (
	logger = logging.Logger("monitor")

	// PubsubTopic specifies the topic used to publish Cluster metrics.
	PubsubTopic = "monitor.metrics"

	// msgpackHandle configures the msgpack codec shared by the encoder in
	// PublishMetric and the decoder in logFromPubsub.
	msgpackHandle = &gocodec.MsgpackHandle{}
)
// Monitor is a component in charge of monitoring peers, logging
// metrics and detecting failures.
type Monitor struct {
	// ctx bounds the monitor's background work; cancel is invoked by
	// Shutdown.
	ctx    context.Context
	cancel func()

	// rpcClient is stored by SetClient, which also signals rpcReady so
	// that run() can start the background goroutines.
	rpcClient *rpc.Client
	rpcReady  chan struct{}

	// PubSub handles used to publish metrics and to receive them.
	pubsub       *pubsub.PubSub
	topic        *pubsub.Topic
	subscription *pubsub.Subscription

	// peers optionally restricts returned metrics to the current peerset;
	// it may be nil.
	peers   PeersFunc
	metrics *metrics.Store
	checker *metrics.Checker
	config  *Config

	// shutdownLock guards shutdown; wg is waited on by Shutdown.
	shutdownLock sync.Mutex
	shutdown     bool
	wg           sync.WaitGroup
}
// PeersFunc returns the current peerset. The Monitor uses it to filter and
// discard metrics coming from peers that do not belong to that peerset.
type PeersFunc func(context.Context) ([]peer.ID, error)
// New creates a new PubSub monitor, using the given config, PubSub instance
// and PeersFunc. The PeersFunc can be nil. In this case, no metric filtering
// is done based on peers (any peer is considered part of the peerset).
//
// New joins and subscribes to PubsubTopic right away. Incoming metrics are
// only consumed, and peers only checked for failures, after SetClient has
// been called.
func New(
	ctx context.Context,
	cfg *Config,
	psub *pubsub.PubSub,
	peers PeersFunc,
) (*Monitor, error) {
	if err := cfg.Validate(); err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(ctx)

	mtrs := metrics.NewStore()
	checker := metrics.NewChecker(ctx, mtrs, cfg.FailureThreshold)

	topic, err := psub.Join(PubsubTopic)
	if err != nil {
		cancel()
		return nil, err
	}

	subscription, err := topic.Subscribe()
	if err != nil {
		// Release the joined topic. PubSub allows a single Topic handle per
		// topic, so leaking it would make any later Join of PubsubTopic on
		// this PubSub instance fail.
		if cerr := topic.Close(); cerr != nil {
			logger.Error(cerr)
		}
		cancel()
		return nil, err
	}

	mon := &Monitor{
		ctx:          ctx,
		cancel:       cancel,
		rpcReady:     make(chan struct{}, 1),
		pubsub:       psub,
		topic:        topic,
		subscription: subscription,
		peers:        peers,
		metrics:      mtrs,
		checker:      checker,
		config:       cfg,
	}

	// run blocks until SetClient signals rpcReady (or ctx is cancelled).
	go mon.run()
	return mon, nil
}
// run waits until SetClient signals that the RPC client is ready, or until
// the monitor context is cancelled. Once ready, it starts the goroutines that
// consume metrics from pubsub and watch for peer failures.
func (mon *Monitor) run() {
	select {
	case _, ok := <-mon.rpcReady:
		if !ok {
			// Shutdown closes rpcReady before cancelling the context. A
			// receive from the closed channel is not a readiness signal,
			// so do not start background work on a monitor being shut down.
			return
		}
		go mon.logFromPubsub()
		go mon.checker.Watch(mon.ctx, mon.peers, mon.config.CheckInterval)
	case <-mon.ctx.Done():
	}
}
// logFromPubsub consumes the metrics received on the subscribed topic. Each
// message is msgpack-decoded into an api.Metric and stored with LogMetric.
// Messages that fail to decode or to be stored are logged and skipped.
// The loop ends when the monitor context is cancelled or when the
// subscription can no longer deliver messages.
func (mon *Monitor) logFromPubsub() {
	ctx, span := trace.StartSpan(mon.ctx, "monitor/pubsub/logFromPubsub")
	defer span.End()

	for ctx.Err() == nil {
		msg, err := mon.subscription.Next(ctx)
		if err != nil {
			if ctx.Err() == nil {
				// Outside context cancellation, Next fails when the
				// subscription's channel has been closed (e.g. the
				// subscription was cancelled). Every later call would fail
				// immediately too, so retrying would just busy-loop.
				logger.Errorf("error reading from pubsub subscription: %s", err)
			}
			return
		}

		buf := bytes.NewBuffer(msg.GetData())
		dec := gocodec.NewDecoder(buf, msgpackHandle)
		metric := api.Metric{}
		if err := dec.Decode(&metric); err != nil {
			logger.Error(err)
			continue
		}

		logger.Debugf(
			"received pubsub metric '%s' from '%s'",
			metric.Name,
			metric.Peer,
		)

		if err := mon.LogMetric(ctx, &metric); err != nil {
			logger.Error(err)
		}
	}
}
// SetClient saves the given rpc.Client for later use and signals the monitor
// that it can start processing metrics. Calls made after Shutdown are
// ignored.
func (mon *Monitor) SetClient(c *rpc.Client) {
	mon.shutdownLock.Lock()
	defer mon.shutdownLock.Unlock()

	if mon.shutdown {
		// Shutdown has closed rpcReady: sending on it would panic.
		return
	}
	mon.rpcClient = c

	// Non-blocking send: a single pending signal is all run() needs, and
	// repeated calls must not block forever once the buffer is full.
	select {
	case mon.rpcReady <- struct{}{}:
	default:
	}
}
// Shutdown stops the peer monitor. In particular, it will not deliver any
// alerts. It also cancels the pubsub subscription and closes the topic handle
// so that they are released from the PubSub instance. Calling Shutdown more
// than once only logs a warning.
func (mon *Monitor) Shutdown(ctx context.Context) error {
	_, span := trace.StartSpan(ctx, "monitor/pubsub/Shutdown")
	defer span.End()

	mon.shutdownLock.Lock()
	defer mon.shutdownLock.Unlock()

	if mon.shutdown {
		logger.Warn("Monitor already shut down")
		return nil
	}

	logger.Info("stopping Monitor")
	close(mon.rpcReady)
	mon.cancel()

	// Release pubsub resources. This happens after mon.cancel() so that the
	// consuming goroutine observes the cancelled context. The subscription
	// is cancelled first because a topic with active subscriptions cannot
	// be closed.
	mon.subscription.Cancel()
	if err := mon.topic.Close(); err != nil {
		logger.Error(err)
	}

	mon.wg.Wait()
	mon.shutdown = true
	return nil
}
// LogMetric records m in the monitor's metric store, from which it can later
// be read back (for instance by LatestMetrics). It always returns nil.
func (mon *Monitor) LogMetric(ctx context.Context, m *api.Metric) error {
	_, sp := trace.StartSpan(ctx, "monitor/pubsub/LogMetric")
	defer sp.End()

	mon.metrics.Add(m)
	logger.Debugf(
		"pubsub mon logged '%s' metric from '%s'. Expires on %d",
		m.Name,
		m.Peer,
		m.Expire,
	)
	return nil
}
// PublishMetric broadcasts a metric to all current cluster peers.
func (mon *Monitor) PublishMetric(ctx context.Context, m *api.Metric) error {
ctx, span := trace.StartSpan(ctx, "monitor/pubsub/PublishMetric")
defer span.End()
if m.Discard() {
logger.Warnf("discarding invalid metric: %+v", m)
return nil
}
var b bytes.Buffer
enc := gocodec.NewEncoder(&b, msgpackHandle)
err := enc.Encode(m)
if err != nil {
logger.Error(err)
return err
}
logger.Debugf(
"publishing metric %s to pubsub. Expires: %d",
m.Name,
m.Expire,
)
err = mon.topic.Publish(ctx, b.Bytes())
if err != nil {
logger.Error(err)
return err
}
return nil
}
// LatestMetrics returns last known VALID metrics of a given type. A metric
// is only valid if it has not expired and belongs to a current cluster peer.
// When a PeersFunc was provided but fails, no metrics are returned (the
// error is logged).
func (mon *Monitor) LatestMetrics(ctx context.Context, name string) []*api.Metric {
	ctx, span := trace.StartSpan(ctx, "monitor/pubsub/LatestMetrics")
	defer span.End()

	latest := mon.metrics.LatestValid(name)

	if mon.peers == nil {
		return latest
	}

	// Make sure we only return metrics in the current peerset if we have
	// a peerset provider.
	peers, err := mon.peers(ctx)
	if err != nil {
		// Without the peerset we cannot tell which metrics are current,
		// so return none, but do not hide why.
		logger.Warnf("LatestMetrics: error obtaining current peerset: %s", err)
		return []*api.Metric{}
	}
	return metrics.PeersetFilter(latest, peers)
}
// Alerts exposes the failure checker's alert channel: an alert is delivered
// on it whenever the monitor detects a failure.
func (mon *Monitor) Alerts() <-chan *api.Alert {
	alerts := mon.checker.Alerts()
	return alerts
}
// MetricNames returns the names of all metric types held in the monitor's
// metric store.
func (mon *Monitor) MetricNames(ctx context.Context) []string {
	_, sp := trace.StartSpan(ctx, "monitor/pubsub/MetricNames")
	defer sp.End()

	names := mon.metrics.MetricNames()
	return names
}