From 6f84b3bb010f714366663fe9987586470f9305ad Mon Sep 17 00:00:00 2001
From: Hector Sanjuan
Date: Mon, 7 May 2018 14:33:16 +0200
Subject: [PATCH] Add new pubsubmon: A monitor that uses pubsub to send and receive metrics

License: MIT
Signed-off-by: Hector Sanjuan
---
 monitor/pubsubmon/config.go         |  72 +++++++
 monitor/pubsubmon/config_test.go    |  57 +++++
 monitor/pubsubmon/pubsubmon.go      | 320 ++++++++++++++++++++++++++++
 monitor/pubsubmon/pubsubmon_test.go | 273 ++++++++++++++++++++++++
 4 files changed, 722 insertions(+)
 create mode 100644 monitor/pubsubmon/config.go
 create mode 100644 monitor/pubsubmon/config_test.go
 create mode 100644 monitor/pubsubmon/pubsubmon.go
 create mode 100644 monitor/pubsubmon/pubsubmon_test.go

diff --git a/monitor/pubsubmon/config.go b/monitor/pubsubmon/config.go
new file mode 100644
index 00000000..1a21e633
--- /dev/null
+++ b/monitor/pubsubmon/config.go
@@ -0,0 +1,72 @@
+package pubsubmon
+
+import (
+	"encoding/json"
+	"errors"
+	"time"
+
+	"github.com/ipfs/ipfs-cluster/config"
+)
+
+const configKey = "pubsubmon"
+
+// Default values for this Config.
+const (
+	DefaultCheckInterval = 15 * time.Second
+)
+
+// Config allows initializing a Monitor and customizing some parameters.
+type Config struct {
+	config.Saver
+
+	CheckInterval time.Duration
+}
+
+type jsonConfig struct {
+	CheckInterval string `json:"check_interval"`
+}
+
+// ConfigKey provides a human-friendly identifier for this type of Config.
+func (cfg *Config) ConfigKey() string {
+	return configKey
+}
+
+// Default sets the fields of this Config to sensible values.
+func (cfg *Config) Default() error {
+	cfg.CheckInterval = DefaultCheckInterval
+	return nil
+}
+
+// Validate checks that the fields of this Config have working values,
+// at least in appearance.
+func (cfg *Config) Validate() error {
+	if cfg.CheckInterval <= 0 {
+		return errors.New("pubsubmon.check_interval too low")
+	}
+	return nil
+}
+
+// LoadJSON sets the fields of this Config to the values defined by the JSON
+// representation of it, as generated by ToJSON.
+func (cfg *Config) LoadJSON(raw []byte) error {
+	jcfg := &jsonConfig{}
+	err := json.Unmarshal(raw, jcfg)
+	if err != nil {
+		logger.Error("Error unmarshaling pubsubmon config")
+		return err
+	}
+
+	interval, _ := time.ParseDuration(jcfg.CheckInterval)
+	cfg.CheckInterval = interval
+
+	return cfg.Validate()
+}
+
+// ToJSON generates a human-friendly JSON representation of this Config.
+func (cfg *Config) ToJSON() ([]byte, error) { + jcfg := &jsonConfig{} + + jcfg.CheckInterval = cfg.CheckInterval.String() + + return json.MarshalIndent(jcfg, "", " ") +} diff --git a/monitor/pubsubmon/config_test.go b/monitor/pubsubmon/config_test.go new file mode 100644 index 00000000..e883800f --- /dev/null +++ b/monitor/pubsubmon/config_test.go @@ -0,0 +1,57 @@ +package pubsubmon + +import ( + "encoding/json" + "testing" +) + +var cfgJSON = []byte(` +{ + "check_interval": "15s" +} +`) + +func TestLoadJSON(t *testing.T) { + cfg := &Config{} + err := cfg.LoadJSON(cfgJSON) + if err != nil { + t.Fatal(err) + } + + j := &jsonConfig{} + + json.Unmarshal(cfgJSON, j) + j.CheckInterval = "-10" + tst, _ := json.Marshal(j) + err = cfg.LoadJSON(tst) + if err == nil { + t.Error("expected error decoding check_interval") + } +} + +func TestToJSON(t *testing.T) { + cfg := &Config{} + cfg.LoadJSON(cfgJSON) + newjson, err := cfg.ToJSON() + if err != nil { + t.Fatal(err) + } + cfg = &Config{} + err = cfg.LoadJSON(newjson) + if err != nil { + t.Fatal(err) + } +} + +func TestDefault(t *testing.T) { + cfg := &Config{} + cfg.Default() + if cfg.Validate() != nil { + t.Fatal("error validating") + } + + cfg.CheckInterval = 0 + if cfg.Validate() == nil { + t.Fatal("expected error validating") + } +} diff --git a/monitor/pubsubmon/pubsubmon.go b/monitor/pubsubmon/pubsubmon.go new file mode 100644 index 00000000..8bc277a4 --- /dev/null +++ b/monitor/pubsubmon/pubsubmon.go @@ -0,0 +1,320 @@ +// Package pubsubmon implements a PeerMonitor component for IPFS Cluster that +// uses PubSub to send and receive metrics. +package pubsubmon + +import ( + "bytes" + "context" + "sync" + "time" + + "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/monitor/util" + + rpc "github.com/hsanjuan/go-libp2p-gorpc" + logging "github.com/ipfs/go-log" + floodsub "github.com/libp2p/go-floodsub" + host "github.com/libp2p/go-libp2p-host" + peer "github.com/libp2p/go-libp2p-peer" + msgpack "github.com/multiformats/go-multicodec/msgpack" +) + +var logger = logging.Logger("monitor") + +// PubsubTopic specifies the topic used to publish Cluster metrics. +var PubsubTopic = "pubsubmon" + +// AlertChannelCap specifies how much buffer the alerts channel has. +var AlertChannelCap = 256 + +// DefaultWindowCap sets the amount of metrics to store per peer. +var DefaultWindowCap = 25 + +var msgpackHandle = msgpack.DefaultMsgpackHandle() + +// Monitor is a component in charge of monitoring peers, logging +// metrics and detecting failures +type Monitor struct { + ctx context.Context + cancel func() + rpcClient *rpc.Client + rpcReady chan struct{} + + host host.Host + pubsub *floodsub.PubSub + subscription *floodsub.Subscription + + metrics util.Metrics + metricsMux sync.RWMutex + windowCap int + + checker *util.MetricsChecker + alerts chan api.Alert + + config *Config + + shutdownLock sync.Mutex + shutdown bool + wg sync.WaitGroup +} + +// New creates a new monitor. 
It subscribes the given host
+// to the metrics pubsub topic and keeps a window of DefaultWindowCap
+// metrics per peer and metric type. Alerts are produced by checks
+// which run every cfg.CheckInterval.
+func New(h host.Host, cfg *Config) (*Monitor, error) {
+	err := cfg.Validate()
+	if err != nil {
+		return nil, err
+	}
+
+	if DefaultWindowCap <= 0 {
+		panic("windowCap too small")
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	alertCh := make(chan api.Alert, AlertChannelCap)
+	metrics := make(util.Metrics)
+
+	pubsub, err := floodsub.NewFloodSub(ctx, h)
+	if err != nil {
+		cancel()
+		return nil, err
+	}
+
+	subscription, err := pubsub.Subscribe(PubsubTopic)
+	if err != nil {
+		cancel()
+		return nil, err
+	}
+
+	mon := &Monitor{
+		ctx:      ctx,
+		cancel:   cancel,
+		rpcReady: make(chan struct{}, 1),
+
+		host:         h,
+		pubsub:       pubsub,
+		subscription: subscription,
+
+		metrics:   metrics,
+		windowCap: DefaultWindowCap,
+		checker:   util.NewMetricsChecker(metrics, alertCh),
+		alerts:    alertCh,
+		config:    cfg,
+	}
+
+	go mon.run()
+	return mon, nil
+}
+
+func (mon *Monitor) run() {
+	select {
+	case <-mon.rpcReady:
+		go mon.monitor()
+		go mon.logFromPubsub()
+	case <-mon.ctx.Done():
+	}
+}
+
+// logFromPubsub logs metrics received in the subscribed topic.
+func (mon *Monitor) logFromPubsub() {
+	for {
+		select {
+		case <-mon.ctx.Done():
+			return
+		default:
+			msg, err := mon.subscription.Next(mon.ctx)
+			if err != nil { // context cancelled enters here
+				continue
+			}
+
+			data := msg.GetData()
+			buf := bytes.NewBuffer(data)
+			dec := msgpack.Multicodec(msgpackHandle).Decoder(buf)
+			metric := api.Metric{}
+			err = dec.Decode(&metric)
+			if err != nil {
+				logger.Error(err)
+				continue
+			}
+			logger.Debugf(
+				"received pubsub metric '%s' from '%s'",
+				metric.Name,
+				metric.Peer,
+			)
+
+			err = mon.LogMetric(metric)
+			if err != nil {
+				logger.Error(err)
+				continue
+			}
+		}
+	}
+}
+
+// SetClient saves the given rpc.Client for later use
+func (mon *Monitor) SetClient(c *rpc.Client) {
+	mon.rpcClient = c
+	mon.rpcReady <- struct{}{}
+}
+
+// Shutdown stops the peer monitor. In particular, it will
+// not deliver any alerts.
+func (mon *Monitor) Shutdown() error {
+	mon.shutdownLock.Lock()
+	defer mon.shutdownLock.Unlock()
+
+	if mon.shutdown {
+		logger.Warning("Monitor already shut down")
+		return nil
+	}
+
+	logger.Info("stopping Monitor")
+	close(mon.rpcReady)
+
+	// not necessary as this just removes subscription
+	// mon.subscription.Cancel()
+	mon.cancel()
+
+	mon.wg.Wait()
+	mon.shutdown = true
+	return nil
+}
+
+// LogMetric stores a metric so it can later be retrieved.
+func (mon *Monitor) LogMetric(m api.Metric) error {
+	mon.metricsMux.Lock()
+	defer mon.metricsMux.Unlock()
+	name := m.Name
+	peer := m.Peer
+	mbyp, ok := mon.metrics[name]
+	if !ok {
+		mbyp = make(util.PeerMetrics)
+		mon.metrics[name] = mbyp
+	}
+	window, ok := mbyp[peer]
+	if !ok {
+		// We always lock the outer map, so we can use unsafe
+		// MetricsWindow.
+		window = util.NewMetricsWindow(mon.windowCap, false)
+		mbyp[peer] = window
+	}
+
+	logger.Debugf("logged '%s' metric from '%s'. Expires on %d", name, peer, m.Expire)
+	window.Add(m)
+	return nil
+}
+
+// PublishMetric broadcasts a metric to all current cluster peers.
+func (mon *Monitor) PublishMetric(m api.Metric) error {
+	if m.Discard() {
+		logger.Warningf("discarding invalid metric: %+v", m)
+		return nil
+	}
+
+	var b bytes.Buffer
+
+	enc := msgpack.Multicodec(msgpackHandle).Encoder(&b)
+	err := enc.Encode(m)
+	if err != nil {
+		logger.Error(err)
+		return err
+	}
+
+	logger.Debugf(
+		"publishing metric %s to pubsub. Expires: %d",
+		m.Name,
+		m.Expire,
+	)
+
+	err = mon.pubsub.Publish(PubsubTopic, b.Bytes())
+	if err != nil {
+		logger.Error(err)
+		return err
+	}
+
+	return nil
+}
+
+// getPeers gets the current list of peers from the consensus component
+func (mon *Monitor) getPeers() ([]peer.ID, error) {
+	// Get current list of peers
+	var peers []peer.ID
+	err := mon.rpcClient.Call(
+		"",
+		"Cluster",
+		"ConsensusPeers",
+		struct{}{},
+		&peers,
+	)
+	return peers, err
+}
+
+// LastMetrics returns last known VALID metrics of a given type. A metric
+// is only valid if it has not expired and belongs to one of the current cluster peers.
+func (mon *Monitor) LastMetrics(name string) []api.Metric {
+	peers, err := mon.getPeers()
+	if err != nil {
+		logger.Errorf("LastMetrics could not list peers: %s", err)
+		return []api.Metric{}
+	}
+
+	mon.metricsMux.RLock()
+	defer mon.metricsMux.RUnlock()
+
+	mbyp, ok := mon.metrics[name]
+	if !ok {
+		logger.Warningf("LastMetrics: No %s metrics", name)
+		return []api.Metric{}
+	}
+
+	metrics := make([]api.Metric, 0, len(mbyp))
+
+	// only show metrics for the current set of peers
+	for _, peer := range peers {
+		window, ok := mbyp[peer]
+		if !ok {
+			continue
+		}
+		last, err := window.Latest()
+		if err != nil || last.Discard() {
+			logger.Warningf("no valid last metric for peer: %+v", last)
+			continue
+		}
+		metrics = append(metrics, last)
+
+	}
+	return metrics
+}
+
+// Alerts returns a channel on which alerts are sent when the
+// monitor detects a failure.
+func (mon *Monitor) Alerts() <-chan api.Alert {
+	return mon.alerts
+}
+
+// monitor creates a ticker which fetches current
+// cluster peers and checks that the last metric for a peer
+// has not expired.
+func (mon *Monitor) monitor() { + ticker := time.NewTicker(mon.config.CheckInterval) + for { + select { + case <-ticker.C: + logger.Debug("monitoring tick") + peers, err := mon.getPeers() + if err != nil { + logger.Error(err) + break + } + mon.metricsMux.RLock() + mon.checker.CheckMetrics(peers) + mon.metricsMux.RUnlock() + case <-mon.ctx.Done(): + ticker.Stop() + return + } + } +} diff --git a/monitor/pubsubmon/pubsubmon_test.go b/monitor/pubsubmon/pubsubmon_test.go new file mode 100644 index 00000000..cd4dcfc0 --- /dev/null +++ b/monitor/pubsubmon/pubsubmon_test.go @@ -0,0 +1,273 @@ +package pubsubmon + +import ( + "context" + "fmt" + "strconv" + "sync" + "testing" + "time" + + libp2p "github.com/libp2p/go-libp2p" + peer "github.com/libp2p/go-libp2p-peer" + + peerstore "github.com/libp2p/go-libp2p-peerstore" + + "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/test" +) + +type metricFactory struct { + l sync.Mutex + counter int +} + +func newMetricFactory() *metricFactory { + return &metricFactory{ + counter: 0, + } +} + +func (mf *metricFactory) newMetric(n string, p peer.ID) api.Metric { + mf.l.Lock() + defer mf.l.Unlock() + m := api.Metric{ + Name: n, + Peer: p, + Value: fmt.Sprintf("%d", mf.counter), + Valid: true, + } + m.SetTTL(5 * time.Second) + mf.counter++ + return m +} + +func (mf *metricFactory) count() int { + mf.l.Lock() + defer mf.l.Unlock() + return mf.counter +} + +func testPeerMonitor(t *testing.T) (*Monitor, func()) { + h, err := libp2p.New( + context.Background(), + libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), + ) + if err != nil { + t.Fatal(err) + } + + mock := test.NewMockRPCClientWithHost(t, h) + cfg := &Config{} + cfg.Default() + cfg.CheckInterval = 2 * time.Second + mon, err := New(h, cfg) + if err != nil { + t.Fatal(err) + } + mon.SetClient(mock) + + shutdownF := func() { + mon.Shutdown() + h.Close() + } + + return mon, shutdownF +} + +func TestPeerMonitorShutdown(t *testing.T) { + pm, shutdown := testPeerMonitor(t) + defer shutdown() + + err := pm.Shutdown() + if err != nil { + t.Error(err) + } + + err = pm.Shutdown() + if err != nil { + t.Error(err) + } +} + +func TestLogMetricConcurrent(t *testing.T) { + pm, shutdown := testPeerMonitor(t) + defer shutdown() + + var wg sync.WaitGroup + wg.Add(3) + + f := func() { + defer wg.Done() + for i := 0; i < 25; i++ { + mt := api.Metric{ + Name: "test", + Peer: test.TestPeerID1, + Value: fmt.Sprintf("%d", time.Now().UnixNano()), + Valid: true, + } + mt.SetTTL(150 * time.Millisecond) + pm.LogMetric(mt) + time.Sleep(75 * time.Millisecond) + } + } + go f() + go f() + go f() + + time.Sleep(150 * time.Millisecond) + last := time.Now().Add(-500 * time.Millisecond) + + for i := 0; i <= 20; i++ { + lastMtrcs := pm.LastMetrics("test") + + if len(lastMtrcs) != 1 { + t.Error("no valid metrics", len(lastMtrcs), i) + time.Sleep(75 * time.Millisecond) + continue + } + + n, err := strconv.Atoi(lastMtrcs[0].Value) + if err != nil { + t.Fatal(err) + } + current := time.Unix(0, int64(n)) + if current.Before(last) { + t.Errorf("expected newer metric: Current: %s, Last: %s", current, last) + } + last = current + time.Sleep(75 * time.Millisecond) + } + + wg.Wait() +} + +func TestPeerMonitorLogMetric(t *testing.T) { + pm, shutdown := testPeerMonitor(t) + defer shutdown() + mf := newMetricFactory() + + // dont fill window + pm.LogMetric(mf.newMetric("test", test.TestPeerID1)) + pm.LogMetric(mf.newMetric("test", test.TestPeerID2)) + pm.LogMetric(mf.newMetric("test", test.TestPeerID3)) + + // fill window + 
pm.LogMetric(mf.newMetric("test2", test.TestPeerID3)) + pm.LogMetric(mf.newMetric("test2", test.TestPeerID3)) + pm.LogMetric(mf.newMetric("test2", test.TestPeerID3)) + pm.LogMetric(mf.newMetric("test2", test.TestPeerID3)) + + lastMetrics := pm.LastMetrics("testbad") + if len(lastMetrics) != 0 { + t.Logf("%+v", lastMetrics) + t.Error("metrics should be empty") + } + + lastMetrics = pm.LastMetrics("test") + if len(lastMetrics) != 3 { + t.Error("metrics should correspond to 3 hosts") + } + + for _, v := range lastMetrics { + switch v.Peer { + case test.TestPeerID1: + if v.Value != "0" { + t.Error("bad metric value") + } + case test.TestPeerID2: + if v.Value != "1" { + t.Error("bad metric value") + } + case test.TestPeerID3: + if v.Value != "2" { + t.Error("bad metric value") + } + default: + t.Error("bad peer") + } + } + + lastMetrics = pm.LastMetrics("test2") + if len(lastMetrics) != 1 { + t.Fatal("should only be one metric") + } + if lastMetrics[0].Value != fmt.Sprintf("%d", mf.count()-1) { + t.Error("metric is not last") + } +} + +func TestPeerMonitorPublishMetric(t *testing.T) { + pm, shutdown := testPeerMonitor(t) + defer shutdown() + + pm2, shutdown2 := testPeerMonitor(t) + defer shutdown2() + + pm.host.Connect( + context.Background(), + peerstore.PeerInfo{ + ID: pm2.host.ID(), + Addrs: pm2.host.Addrs(), + }, + ) + + mf := newMetricFactory() + + metric := mf.newMetric("test", test.TestPeerID1) + err := pm.PublishMetric(metric) + if err != nil { + t.Fatal(err) + } + + time.Sleep(200 * time.Millisecond) + + checkMetric := func(t *testing.T, pm *Monitor) { + lastMetrics := pm.LastMetrics("test") + if len(lastMetrics) != 1 { + t.Fatal(pm.host.ID(), "expected 1 published metric") + } + t.Log(pm.host.ID(), "received metric") + + receivedMetric := lastMetrics[0] + if receivedMetric.Peer != metric.Peer || + receivedMetric.Expire != metric.Expire || + receivedMetric.Value != metric.Value || + receivedMetric.Valid != metric.Valid || + receivedMetric.Name != metric.Name { + t.Fatal("it should be exactly the same metric we published") + } + } + + t.Log("pm1") + checkMetric(t, pm) + t.Log("pm2") + checkMetric(t, pm2) +} + +func TestPeerMonitorAlerts(t *testing.T) { + pm, shutdown := testPeerMonitor(t) + defer shutdown() + mf := newMetricFactory() + + mtr := mf.newMetric("test", test.TestPeerID1) + mtr.SetTTL(0) + pm.LogMetric(mtr) + time.Sleep(time.Second) + timeout := time.NewTimer(time.Second * 5) + + // it should alert twice at least. Alert re-occurrs. + for i := 0; i < 2; i++ { + select { + case <-timeout.C: + t.Fatal("should have thrown an alert by now") + case alrt := <-pm.Alerts(): + if alrt.MetricName != "test" { + t.Error("Alert should be for test") + } + if alrt.Peer != test.TestPeerID1 { + t.Error("Peer should be TestPeerID1") + } + } + } +}
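
The configuration added by this patch serializes to a single duration field. In ipfs-cluster configuration files, component configs live under a section named after their ConfigKey, so a pubsubmon entry would typically look like the sketch below; the exact nesting depends on how the component is registered with the config manager, so this is illustrative rather than canonical:

    "monitor": {
      "pubsubmon": {
        "check_interval": "15s"
      }
    }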
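As a rough usage sketch (not part of the patch): the monitor is built from a libp2p host and a Config, given an RPC client so it can resolve the current peerset via Cluster.ConsensusPeers, fed metrics with PublishMetric, and watched through Alerts(). The bare go-libp2p-gorpc client and the "/example/rpc" protocol ID below are placeholders so the example compiles; in a real cluster the surrounding wiring supplies the RPC client.

    package main

    import (
    	"context"
    	"fmt"
    	"time"

    	rpc "github.com/hsanjuan/go-libp2p-gorpc"
    	libp2p "github.com/libp2p/go-libp2p"
    	protocol "github.com/libp2p/go-libp2p-protocol"

    	"github.com/ipfs/ipfs-cluster/api"
    	"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
    )

    func main() {
    	ctx := context.Background()

    	// Local host listening on a random TCP port.
    	h, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
    	if err != nil {
    		panic(err)
    	}
    	defer h.Close()

    	cfg := &pubsubmon.Config{}
    	cfg.Default()

    	mon, err := pubsubmon.New(h, cfg)
    	if err != nil {
    		panic(err)
    	}
    	defer mon.Shutdown()

    	// The monitor needs an RPC client to fetch the current peerset
    	// (Cluster.ConsensusPeers). A bare client is used here only so the
    	// example compiles; it resolves no peers.
    	proto := protocol.ID("/example/rpc")
    	mon.SetClient(rpc.NewClientWithServer(h, proto, rpc.NewServer(h, proto)))

    	// Consume alerts produced when a peer's last metric expires.
    	go func() {
    		for alrt := range mon.Alerts() {
    			fmt.Println("alert:", alrt.MetricName, "from", alrt.Peer.Pretty())
    		}
    	}()

    	// Keep a "ping" metric alive by re-publishing it before its TTL
    	// expires. Every subscribed peer (including this one) logs it.
    	ttl := 10 * time.Second
    	for i := 0; i < 3; i++ {
    		m := api.Metric{
    			Name:  "ping",
    			Peer:  h.ID(),
    			Value: "1",
    			Valid: true,
    		}
    		m.SetTTL(ttl)
    		if err := mon.PublishMetric(m); err != nil {
    			fmt.Println("publish error:", err)
    		}
    		time.Sleep(ttl / 2)
    	}
    }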