Add new pubsubmon: A monitor that uses pubsub to send and receive metrics
License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
parent 73b962f799
commit 6f84b3bb01
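
For context before the diff: metrics travel between peers as msgpack-encoded api.Metric values on a floodsub topic. Below is a minimal sketch of that wire round-trip, using the same codec calls as PublishMetric and logFromPubsub in the code that follows; the package and helper names are illustrative, not part of this commit.

package example

import (
	"bytes"

	"github.com/ipfs/ipfs-cluster/api"
	msgpack "github.com/multiformats/go-multicodec/msgpack"
)

var handle = msgpack.DefaultMsgpackHandle()

// roundTrip mirrors what the sender (PublishMetric) and the receiver
// (logFromPubsub) do with a metric on either side of the pubsub topic.
func roundTrip(m api.Metric) (api.Metric, error) {
	// sender: msgpack-encode the metric into the pubsub payload
	var b bytes.Buffer
	enc := msgpack.Multicodec(handle).Encoder(&b)
	if err := enc.Encode(m); err != nil {
		return api.Metric{}, err
	}

	// receiver: decode the payload back into an api.Metric
	var out api.Metric
	dec := msgpack.Multicodec(handle).Decoder(bytes.NewBuffer(b.Bytes()))
	err := dec.Decode(&out)
	return out, err
}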

monitor/pubsubmon/config.go (new file, 72 lines)

@@ -0,0 +1,72 @@
package pubsubmon

import (
	"encoding/json"
	"errors"
	"time"

	"github.com/ipfs/ipfs-cluster/config"
)

const configKey = "pubsubmon"

// Default values for this Config.
const (
	DefaultCheckInterval = 15 * time.Second
)

// Config allows initializing a Monitor and customizing some parameters.
type Config struct {
	config.Saver

	CheckInterval time.Duration
}

type jsonConfig struct {
	CheckInterval string `json:"check_interval"`
}

// ConfigKey provides a human-friendly identifier for this type of Config.
func (cfg *Config) ConfigKey() string {
	return configKey
}

// Default sets the fields of this Config to sensible values.
func (cfg *Config) Default() error {
	cfg.CheckInterval = DefaultCheckInterval
	return nil
}

// Validate checks that the fields of this Config have working values,
// at least in appearance.
func (cfg *Config) Validate() error {
	if cfg.CheckInterval <= 0 {
		return errors.New("pubsubmon.check_interval too low")
	}
	return nil
}

// LoadJSON sets the fields of this Config to the values defined by the JSON
// representation of it, as generated by ToJSON.
func (cfg *Config) LoadJSON(raw []byte) error {
	jcfg := &jsonConfig{}
	err := json.Unmarshal(raw, jcfg)
	if err != nil {
		logger.Error("Error unmarshaling pubsubmon config")
		return err
	}

	// A parse failure leaves the interval at 0, which Validate rejects.
	interval, _ := time.ParseDuration(jcfg.CheckInterval)
	cfg.CheckInterval = interval

	return cfg.Validate()
}

// ToJSON generates a human-friendly JSON representation of this Config.
func (cfg *Config) ToJSON() ([]byte, error) {
	jcfg := &jsonConfig{}

	jcfg.CheckInterval = cfg.CheckInterval.String()

	return json.MarshalIndent(jcfg, "", " ")
}
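
For illustration, a minimal sketch of how a caller might round-trip this Config through its JSON form (assuming the import path of the package added above):

package main

import (
	"fmt"

	"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
)

func main() {
	cfg := &pubsubmon.Config{}
	cfg.Default() // CheckInterval = 15s

	raw, err := cfg.ToJSON() // {"check_interval": "15s"}
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))

	// LoadJSON parses the duration back and validates it.
	if err := cfg.LoadJSON(raw); err != nil {
		panic(err)
	}
}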

monitor/pubsubmon/config_test.go (new file, 57 lines)

@@ -0,0 +1,57 @@
package pubsubmon

import (
	"encoding/json"
	"testing"
)

var cfgJSON = []byte(`
{
	"check_interval": "15s"
}
`)

func TestLoadJSON(t *testing.T) {
	cfg := &Config{}
	err := cfg.LoadJSON(cfgJSON)
	if err != nil {
		t.Fatal(err)
	}

	j := &jsonConfig{}

	json.Unmarshal(cfgJSON, j)
	j.CheckInterval = "-10"
	tst, _ := json.Marshal(j)
	err = cfg.LoadJSON(tst)
	if err == nil {
		t.Error("expected error decoding check_interval")
	}
}

func TestToJSON(t *testing.T) {
	cfg := &Config{}
	cfg.LoadJSON(cfgJSON)
	newjson, err := cfg.ToJSON()
	if err != nil {
		t.Fatal(err)
	}
	cfg = &Config{}
	err = cfg.LoadJSON(newjson)
	if err != nil {
		t.Fatal(err)
	}
}

func TestDefault(t *testing.T) {
	cfg := &Config{}
	cfg.Default()
	if cfg.Validate() != nil {
		t.Fatal("error validating")
	}

	cfg.CheckInterval = 0
	if cfg.Validate() == nil {
		t.Fatal("expected error validating")
	}
}

monitor/pubsubmon/pubsubmon.go (new file, 320 lines)

@@ -0,0 +1,320 @@
// Package pubsubmon implements a PeerMonitor component for IPFS Cluster that
// uses PubSub to send and receive metrics.
package pubsubmon

import (
	"bytes"
	"context"
	"sync"
	"time"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/monitor/util"

	rpc "github.com/hsanjuan/go-libp2p-gorpc"
	logging "github.com/ipfs/go-log"
	floodsub "github.com/libp2p/go-floodsub"
	host "github.com/libp2p/go-libp2p-host"
	peer "github.com/libp2p/go-libp2p-peer"
	msgpack "github.com/multiformats/go-multicodec/msgpack"
)

var logger = logging.Logger("monitor")

// PubsubTopic specifies the topic used to publish Cluster metrics.
var PubsubTopic = "pubsubmon"

// AlertChannelCap specifies how much buffer the alerts channel has.
var AlertChannelCap = 256

// DefaultWindowCap sets the amount of metrics to store per peer.
var DefaultWindowCap = 25

var msgpackHandle = msgpack.DefaultMsgpackHandle()

// Monitor is a component in charge of monitoring peers, logging
// metrics and detecting failures.
type Monitor struct {
	ctx       context.Context
	cancel    func()
	rpcClient *rpc.Client
	rpcReady  chan struct{}

	host         host.Host
	pubsub       *floodsub.PubSub
	subscription *floodsub.Subscription

	metrics    util.Metrics
	metricsMux sync.RWMutex
	windowCap  int

	checker *util.MetricsChecker
	alerts  chan api.Alert

	config *Config

	shutdownLock sync.Mutex
	shutdown     bool
	wg           sync.WaitGroup
}

// New creates a new monitor. It keeps a window of metrics for each
// peer and type of metric (DefaultWindowCap entries) and checks them
// every cfg.CheckInterval, producing alerts when the latest metric
// for a peer has expired.
func New(h host.Host, cfg *Config) (*Monitor, error) {
	err := cfg.Validate()
	if err != nil {
		return nil, err
	}

	if DefaultWindowCap <= 0 {
		panic("windowCap too small")
	}

	ctx, cancel := context.WithCancel(context.Background())

	alertCh := make(chan api.Alert, AlertChannelCap)
	metrics := make(util.Metrics)

	pubsub, err := floodsub.NewFloodSub(ctx, h)
	if err != nil {
		cancel()
		return nil, err
	}

	subscription, err := pubsub.Subscribe(PubsubTopic)
	if err != nil {
		cancel()
		return nil, err
	}

	mon := &Monitor{
		ctx:      ctx,
		cancel:   cancel,
		rpcReady: make(chan struct{}, 1),

		host:         h,
		pubsub:       pubsub,
		subscription: subscription,

		metrics:   metrics,
		windowCap: DefaultWindowCap,
		checker:   util.NewMetricsChecker(metrics, alertCh),
		alerts:    alertCh,
		config:    cfg,
	}

	go mon.run()
	return mon, nil
}

func (mon *Monitor) run() {
	select {
	case <-mon.rpcReady:
		go mon.monitor()
		go mon.logFromPubsub()
	case <-mon.ctx.Done():
	}
}

// logFromPubsub logs metrics received in the subscribed topic.
func (mon *Monitor) logFromPubsub() {
	for {
		select {
		case <-mon.ctx.Done():
			return
		default:
			msg, err := mon.subscription.Next(mon.ctx)
			if err != nil { // context cancelled enters here
				continue
			}

			data := msg.GetData()
			buf := bytes.NewBuffer(data)
			dec := msgpack.Multicodec(msgpackHandle).Decoder(buf)
			metric := api.Metric{}
			err = dec.Decode(&metric)
			if err != nil {
				logger.Error(err)
				continue
			}
			logger.Debugf(
				"received pubsub metric '%s' from '%s'",
				metric.Name,
				metric.Peer,
			)

			err = mon.LogMetric(metric)
			if err != nil {
				logger.Error(err)
				continue
			}
		}
	}
}

// SetClient saves the given rpc.Client for later use.
func (mon *Monitor) SetClient(c *rpc.Client) {
	mon.rpcClient = c
	mon.rpcReady <- struct{}{}
}

// Shutdown stops the peer monitor. In particular, it will
// no longer deliver any alerts.
func (mon *Monitor) Shutdown() error {
	mon.shutdownLock.Lock()
	defer mon.shutdownLock.Unlock()

	if mon.shutdown {
		logger.Warning("Monitor already shut down")
		return nil
	}

	logger.Info("stopping Monitor")
	close(mon.rpcReady)

	// not necessary as this just removes the subscription
	// mon.subscription.Cancel()
	mon.cancel()

	mon.wg.Wait()
	mon.shutdown = true
	return nil
}

// LogMetric stores a metric so it can later be retrieved.
func (mon *Monitor) LogMetric(m api.Metric) error {
	mon.metricsMux.Lock()
	defer mon.metricsMux.Unlock()
	name := m.Name
	peer := m.Peer
	mbyp, ok := mon.metrics[name]
	if !ok {
		mbyp = make(util.PeerMetrics)
		mon.metrics[name] = mbyp
	}
	window, ok := mbyp[peer]
	if !ok {
		// We always lock the outer map, so we can use the unsafe
		// MetricsWindow.
		window = util.NewMetricsWindow(mon.windowCap, false)
		mbyp[peer] = window
	}

	logger.Debugf("logged '%s' metric from '%s'. Expires on %d", name, peer, m.Expire)
	window.Add(m)
	return nil
}

// PublishMetric broadcasts a metric to all current cluster peers.
func (mon *Monitor) PublishMetric(m api.Metric) error {
	if m.Discard() {
		logger.Warningf("discarding invalid metric: %+v", m)
		return nil
	}

	var b bytes.Buffer

	enc := msgpack.Multicodec(msgpackHandle).Encoder(&b)
	err := enc.Encode(m)
	if err != nil {
		logger.Error(err)
		return err
	}

	logger.Debugf(
		"publishing metric %s to pubsub. Expires: %d",
		m.Name,
		m.Expire,
	)

	err = mon.pubsub.Publish(PubsubTopic, b.Bytes())
	if err != nil {
		logger.Error(err)
		return err
	}

	return nil
}

// getPeers gets the current list of peers from the consensus component.
func (mon *Monitor) getPeers() ([]peer.ID, error) {
	var peers []peer.ID
	err := mon.rpcClient.Call(
		"",
		"Cluster",
		"ConsensusPeers",
		struct{}{},
		&peers,
	)
	return peers, err
}

// LastMetrics returns the last known VALID metrics of a given type. A metric
// is only valid if it has not expired and belongs to a current cluster peer.
func (mon *Monitor) LastMetrics(name string) []api.Metric {
	peers, err := mon.getPeers()
	if err != nil {
		logger.Errorf("LastMetrics could not list peers: %s", err)
		return []api.Metric{}
	}

	mon.metricsMux.RLock()
	defer mon.metricsMux.RUnlock()

	mbyp, ok := mon.metrics[name]
	if !ok {
		logger.Warningf("LastMetrics: No %s metrics", name)
		return []api.Metric{}
	}

	metrics := make([]api.Metric, 0, len(mbyp))

	// only show metrics for the current set of peers
	for _, peer := range peers {
		window, ok := mbyp[peer]
		if !ok {
			continue
		}
		last, err := window.Latest()
		if err != nil || last.Discard() {
			logger.Warningf("no valid last metric for peer: %+v", last)
			continue
		}
		metrics = append(metrics, last)
	}
	return metrics
}

// Alerts returns a channel on which alerts are sent when the
// monitor detects a failure.
func (mon *Monitor) Alerts() <-chan api.Alert {
	return mon.alerts
}

// monitor creates a ticker which fetches the current
// cluster peers and checks that the last metric for each peer
// has not expired.
func (mon *Monitor) monitor() {
	ticker := time.NewTicker(mon.config.CheckInterval)
	for {
		select {
		case <-ticker.C:
			logger.Debug("monitoring tick")
			peers, err := mon.getPeers()
			if err != nil {
				logger.Error(err)
				break
			}
			mon.metricsMux.RLock()
			mon.checker.CheckMetrics(peers)
			mon.metricsMux.RUnlock()
		case <-mon.ctx.Done():
			ticker.Stop()
			return
		}
	}
}
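
As a usage sketch (not part of the commit), this is roughly how a cluster component might wire the monitor up. The startMonitor helper is hypothetical; in Cluster, the rpc.Client is supplied by the main component once its RPC server is up.

package example

import (
	"context"
	"fmt"

	rpc "github.com/hsanjuan/go-libp2p-gorpc"
	libp2p "github.com/libp2p/go-libp2p"

	"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
)

// startMonitor creates a host and a monitor, hands the monitor its
// rpc.Client, and consumes the alerts it produces.
func startMonitor(client *rpc.Client) (*pubsubmon.Monitor, error) {
	h, err := libp2p.New(context.Background())
	if err != nil {
		return nil, err
	}

	cfg := &pubsubmon.Config{}
	cfg.Default()

	mon, err := pubsubmon.New(h, cfg)
	if err != nil {
		return nil, err
	}
	mon.SetClient(client) // unblocks run(): starts monitor() and logFromPubsub()

	// Alerts arrive when a peer's latest metric expires; they re-occur
	// on every check interval until the peer publishes again.
	go func() {
		for alert := range mon.Alerts() {
			fmt.Printf("peer %s stopped renewing metric %s\n",
				alert.Peer.Pretty(), alert.MetricName)
		}
	}()
	return mon, nil
}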

monitor/pubsubmon/pubsubmon_test.go (new file, 273 lines)

@@ -0,0 +1,273 @@
package pubsubmon

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"testing"
	"time"

	libp2p "github.com/libp2p/go-libp2p"
	peer "github.com/libp2p/go-libp2p-peer"

	peerstore "github.com/libp2p/go-libp2p-peerstore"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/test"
)

type metricFactory struct {
	l       sync.Mutex
	counter int
}

func newMetricFactory() *metricFactory {
	return &metricFactory{
		counter: 0,
	}
}

func (mf *metricFactory) newMetric(n string, p peer.ID) api.Metric {
	mf.l.Lock()
	defer mf.l.Unlock()
	m := api.Metric{
		Name:  n,
		Peer:  p,
		Value: fmt.Sprintf("%d", mf.counter),
		Valid: true,
	}
	m.SetTTL(5 * time.Second)
	mf.counter++
	return m
}

func (mf *metricFactory) count() int {
	mf.l.Lock()
	defer mf.l.Unlock()
	return mf.counter
}

func testPeerMonitor(t *testing.T) (*Monitor, func()) {
	h, err := libp2p.New(
		context.Background(),
		libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"),
	)
	if err != nil {
		t.Fatal(err)
	}

	mock := test.NewMockRPCClientWithHost(t, h)
	cfg := &Config{}
	cfg.Default()
	cfg.CheckInterval = 2 * time.Second
	mon, err := New(h, cfg)
	if err != nil {
		t.Fatal(err)
	}
	mon.SetClient(mock)

	shutdownF := func() {
		mon.Shutdown()
		h.Close()
	}

	return mon, shutdownF
}

func TestPeerMonitorShutdown(t *testing.T) {
	pm, shutdown := testPeerMonitor(t)
	defer shutdown()

	err := pm.Shutdown()
	if err != nil {
		t.Error(err)
	}

	err = pm.Shutdown()
	if err != nil {
		t.Error(err)
	}
}

func TestLogMetricConcurrent(t *testing.T) {
	pm, shutdown := testPeerMonitor(t)
	defer shutdown()

	var wg sync.WaitGroup
	wg.Add(3)

	f := func() {
		defer wg.Done()
		for i := 0; i < 25; i++ {
			mt := api.Metric{
				Name:  "test",
				Peer:  test.TestPeerID1,
				Value: fmt.Sprintf("%d", time.Now().UnixNano()),
				Valid: true,
			}
			mt.SetTTL(150 * time.Millisecond)
			pm.LogMetric(mt)
			time.Sleep(75 * time.Millisecond)
		}
	}
	go f()
	go f()
	go f()

	time.Sleep(150 * time.Millisecond)
	last := time.Now().Add(-500 * time.Millisecond)

	for i := 0; i <= 20; i++ {
		lastMtrcs := pm.LastMetrics("test")

		if len(lastMtrcs) != 1 {
			t.Error("no valid metrics", len(lastMtrcs), i)
			time.Sleep(75 * time.Millisecond)
			continue
		}

		n, err := strconv.Atoi(lastMtrcs[0].Value)
		if err != nil {
			t.Fatal(err)
		}
		current := time.Unix(0, int64(n))
		if current.Before(last) {
			t.Errorf("expected newer metric: Current: %s, Last: %s", current, last)
		}
		last = current
		time.Sleep(75 * time.Millisecond)
	}

	wg.Wait()
}

func TestPeerMonitorLogMetric(t *testing.T) {
	pm, shutdown := testPeerMonitor(t)
	defer shutdown()
	mf := newMetricFactory()

	// don't fill the window
	pm.LogMetric(mf.newMetric("test", test.TestPeerID1))
	pm.LogMetric(mf.newMetric("test", test.TestPeerID2))
	pm.LogMetric(mf.newMetric("test", test.TestPeerID3))

	// fill the window
	pm.LogMetric(mf.newMetric("test2", test.TestPeerID3))
	pm.LogMetric(mf.newMetric("test2", test.TestPeerID3))
	pm.LogMetric(mf.newMetric("test2", test.TestPeerID3))
	pm.LogMetric(mf.newMetric("test2", test.TestPeerID3))

	lastMetrics := pm.LastMetrics("testbad")
	if len(lastMetrics) != 0 {
		t.Logf("%+v", lastMetrics)
		t.Error("metrics should be empty")
	}

	lastMetrics = pm.LastMetrics("test")
	if len(lastMetrics) != 3 {
		t.Error("metrics should correspond to 3 hosts")
	}

	for _, v := range lastMetrics {
		switch v.Peer {
		case test.TestPeerID1:
			if v.Value != "0" {
				t.Error("bad metric value")
			}
		case test.TestPeerID2:
			if v.Value != "1" {
				t.Error("bad metric value")
			}
		case test.TestPeerID3:
			if v.Value != "2" {
				t.Error("bad metric value")
			}
		default:
			t.Error("bad peer")
		}
	}

	lastMetrics = pm.LastMetrics("test2")
	if len(lastMetrics) != 1 {
		t.Fatal("should only be one metric")
	}
	if lastMetrics[0].Value != fmt.Sprintf("%d", mf.count()-1) {
		t.Error("metric is not the last one")
	}
}

func TestPeerMonitorPublishMetric(t *testing.T) {
	pm, shutdown := testPeerMonitor(t)
	defer shutdown()

	pm2, shutdown2 := testPeerMonitor(t)
	defer shutdown2()

	pm.host.Connect(
		context.Background(),
		peerstore.PeerInfo{
			ID:    pm2.host.ID(),
			Addrs: pm2.host.Addrs(),
		},
	)

	mf := newMetricFactory()

	metric := mf.newMetric("test", test.TestPeerID1)
	err := pm.PublishMetric(metric)
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(200 * time.Millisecond)

	checkMetric := func(t *testing.T, pm *Monitor) {
		lastMetrics := pm.LastMetrics("test")
		if len(lastMetrics) != 1 {
			t.Fatal(pm.host.ID(), "expected 1 published metric")
		}
		t.Log(pm.host.ID(), "received metric")

		receivedMetric := lastMetrics[0]
		if receivedMetric.Peer != metric.Peer ||
			receivedMetric.Expire != metric.Expire ||
			receivedMetric.Value != metric.Value ||
			receivedMetric.Valid != metric.Valid ||
			receivedMetric.Name != metric.Name {
			t.Fatal("it should be exactly the same metric we published")
		}
	}

	t.Log("pm1")
	checkMetric(t, pm)
	t.Log("pm2")
	checkMetric(t, pm2)
}

func TestPeerMonitorAlerts(t *testing.T) {
	pm, shutdown := testPeerMonitor(t)
	defer shutdown()
	mf := newMetricFactory()

	mtr := mf.newMetric("test", test.TestPeerID1)
	mtr.SetTTL(0)
	pm.LogMetric(mtr)
	time.Sleep(time.Second)
	timeout := time.NewTimer(time.Second * 5)

	// it should alert at least twice: alerts re-occur.
	for i := 0; i < 2; i++ {
		select {
		case <-timeout.C:
			t.Fatal("should have thrown an alert by now")
		case alrt := <-pm.Alerts():
			if alrt.MetricName != "test" {
				t.Error("Alert should be for test")
			}
			if alrt.Peer != test.TestPeerID1 {
				t.Error("Peer should be TestPeerID1")
			}
		}
	}
}