From 962d249e74def710f29b32a2150772dd7d3c5509 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Thu, 21 Mar 2019 22:48:40 +0530 Subject: [PATCH] Remove basic monitor (#726) Remove basic monitor This commit removes `basic` monitor component, because it is not being used by default since few releases ago pubsub monitor was introduced. Issue #689 --- .travis.yml | 3 - cluster_test.go | 9 +- cmd/ipfs-cluster-service/configs.go | 5 - cmd/ipfs-cluster-service/daemon.go | 30 +-- cmd/ipfs-cluster-service/main.go | 7 - config_test.go | 13 +- ipfscluster_test.go | 21 +- monitor/basic/config.go | 97 -------- monitor/basic/config_test.go | 69 ------ monitor/basic/peer_monitor.go | 219 ----------------- monitor/basic/peer_monitor_test.go | 246 -------------------- monitor/pubsubmon/config.go | 4 +- sharness/config/ssl-basic_auth/service.json | 3 - sharness/config/ssl/service.json | 3 - 14 files changed, 19 insertions(+), 710 deletions(-) delete mode 100644 monitor/basic/config.go delete mode 100644 monitor/basic/config_test.go delete mode 100644 monitor/basic/peer_monitor.go delete mode 100644 monitor/basic/peer_monitor_test.go diff --git a/.travis.yml b/.travis.yml index 80b8358a..bef539e7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,9 +28,6 @@ jobs: - go test -v -coverprofile=coverage.txt -covermode=atomic ./... after_success: - bash <(curl -s https://codecov.io/bash) - - name: "Main Tests with basic monitor" - script: - - go test -v . -monitor basic - name: "Main Tests with stateless tracker" script: - go test -v . -tracker stateless diff --git a/cluster_test.go b/cluster_test.go index ccd2a8d0..1a76a692 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -15,6 +15,7 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/consensus/raft" "github.com/ipfs/ipfs-cluster/informer/numpin" + "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/state/mapstate" "github.com/ipfs/ipfs-cluster/test" @@ -138,7 +139,7 @@ type mockTracer struct { } func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, state.State, PinTracker) { - clusterCfg, _, _, _, consensusCfg, maptrackerCfg, statelesstrackerCfg, bmonCfg, psmonCfg, _, _ := testingConfigs() + clusterCfg, _, _, _, consensusCfg, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs() host, err := NewClusterHost(context.Background(), clusterCfg) if err != nil { @@ -154,9 +155,11 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, state.Sta raftcon, _ := raft.NewConsensus(host, consensusCfg, st, false) - bmonCfg.CheckInterval = 2 * time.Second psmonCfg.CheckInterval = 2 * time.Second - mon := makeMonitor(t, host, bmonCfg, psmonCfg) + mon, err := pubsubmon.New(host, psmonCfg) + if err != nil { + t.Fatal(err) + } alloc := ascendalloc.NewAllocator() numpinCfg := &numpin.Config{} diff --git a/cmd/ipfs-cluster-service/configs.go b/cmd/ipfs-cluster-service/configs.go index 8b1bc221..48890d89 100644 --- a/cmd/ipfs-cluster-service/configs.go +++ b/cmd/ipfs-cluster-service/configs.go @@ -12,7 +12,6 @@ import ( "github.com/ipfs/ipfs-cluster/informer/disk" "github.com/ipfs/ipfs-cluster/informer/numpin" "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" - "github.com/ipfs/ipfs-cluster/monitor/basic" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/observations" "github.com/ipfs/ipfs-cluster/pintracker/maptracker" @@ -27,7 +26,6 @@ type cfgs struct { consensusCfg *raft.Config maptrackerCfg *maptracker.Config statelessTrackerCfg *stateless.Config - monCfg *basic.Config pubsubmonCfg *pubsubmon.Config diskInfCfg *disk.Config numpinInfCfg *numpin.Config @@ -44,7 +42,6 @@ func makeConfigs() (*config.Manager, *cfgs) { consensusCfg := &raft.Config{} maptrackerCfg := &maptracker.Config{} statelessCfg := &stateless.Config{} - monCfg := &basic.Config{} pubsubmonCfg := &pubsubmon.Config{} diskInfCfg := &disk.Config{} numpinInfCfg := &numpin.Config{} @@ -57,7 +54,6 @@ func makeConfigs() (*config.Manager, *cfgs) { cfg.RegisterComponent(config.Consensus, consensusCfg) cfg.RegisterComponent(config.PinTracker, maptrackerCfg) cfg.RegisterComponent(config.PinTracker, statelessCfg) - cfg.RegisterComponent(config.Monitor, monCfg) cfg.RegisterComponent(config.Monitor, pubsubmonCfg) cfg.RegisterComponent(config.Informer, diskInfCfg) cfg.RegisterComponent(config.Informer, numpinInfCfg) @@ -71,7 +67,6 @@ func makeConfigs() (*config.Manager, *cfgs) { consensusCfg, maptrackerCfg, statelessCfg, - monCfg, pubsubmonCfg, diskInfCfg, numpinInfCfg, diff --git a/cmd/ipfs-cluster-service/daemon.go b/cmd/ipfs-cluster-service/daemon.go index 5dab00b0..faeccf74 100644 --- a/cmd/ipfs-cluster-service/daemon.go +++ b/cmd/ipfs-cluster-service/daemon.go @@ -20,7 +20,6 @@ import ( "github.com/ipfs/ipfs-cluster/informer/disk" "github.com/ipfs/ipfs-cluster/informer/numpin" "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" - "github.com/ipfs/ipfs-cluster/monitor/basic" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/observations" "github.com/ipfs/ipfs-cluster/pintracker/maptracker" @@ -145,7 +144,9 @@ func createCluster( checkErr("creating consensus component", err) tracker := setupPinTracker(c.String("pintracker"), host, cfgs.maptrackerCfg, cfgs.statelessTrackerCfg, cfgs.clusterCfg.Peername) - mon := setupMonitor(c.String("monitor"), host, cfgs.monCfg, cfgs.pubsubmonCfg) + mon, err := pubsubmon.New(host, cfgs.pubsubmonCfg) + checkErr("creating monitor", err) + logger.Debug("pubsub monitor loaded") informer, alloc := setupAllocation(c.String("alloc"), cfgs.diskInfCfg, cfgs.numpinInfCfg) ipfscluster.ReadyTimeout = cfgs.consensusCfg.WaitForLeaderTimeout + 5*time.Second @@ -248,31 +249,6 @@ func setupAllocation( } } -func setupMonitor( - name string, - h host.Host, - basicCfg *basic.Config, - pubsubCfg *pubsubmon.Config, -) ipfscluster.PeerMonitor { - switch name { - case "basic": - mon, err := basic.NewMonitor(basicCfg) - checkErr("creating monitor", err) - logger.Debug("basic monitor loaded") - return mon - case "pubsub": - mon, err := pubsubmon.New(h, pubsubCfg) - checkErr("creating monitor", err) - logger.Debug("pubsub monitor loaded") - return mon - default: - err := errors.New("unknown monitor type") - checkErr("", err) - return nil - } - -} - func setupPinTracker( name string, h host.Host, diff --git a/cmd/ipfs-cluster-service/main.go b/cmd/ipfs-cluster-service/main.go index 0d82358c..b8e99012 100644 --- a/cmd/ipfs-cluster-service/main.go +++ b/cmd/ipfs-cluster-service/main.go @@ -25,7 +25,6 @@ const programName = `ipfs-cluster-service` // flag defaults const ( defaultAllocation = "disk-freespace" - defaultMonitor = "pubsub" defaultPinTracker = "map" defaultLogLevel = "info" ) @@ -275,12 +274,6 @@ configuration. Value: defaultAllocation, Usage: "allocation strategy to use [disk-freespace,disk-reposize,numpin].", }, - cli.StringFlag{ - Name: "monitor", - Value: defaultMonitor, - Hidden: true, - Usage: "peer monitor to use [basic,pubsub].", - }, cli.StringFlag{ Name: "pintracker", Value: defaultPinTracker, diff --git a/config_test.go b/config_test.go index 65050bd6..90f0ab20 100644 --- a/config_test.go +++ b/config_test.go @@ -6,7 +6,6 @@ import ( "github.com/ipfs/ipfs-cluster/consensus/raft" "github.com/ipfs/ipfs-cluster/informer/disk" "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" - "github.com/ipfs/ipfs-cluster/monitor/basic" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/observations" "github.com/ipfs/ipfs-cluster/pintracker/maptracker" @@ -104,8 +103,8 @@ var testingTracerCfg = []byte(`{ "enable_tracing": false }`) -func testingConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *stateless.Config, *basic.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) { - clusterCfg, apiCfg, proxyCfg, ipfsCfg, consensusCfg, maptrackerCfg, statelesstrkrCfg, basicmonCfg, pubsubmonCfg, diskInfCfg, tracingCfg := testingEmptyConfigs() +func testingConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) { + clusterCfg, apiCfg, proxyCfg, ipfsCfg, consensusCfg, maptrackerCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg := testingEmptyConfigs() clusterCfg.LoadJSON(testingClusterCfg) apiCfg.LoadJSON(testingAPICfg) proxyCfg.LoadJSON(testingProxyCfg) @@ -113,15 +112,14 @@ func testingConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Confi consensusCfg.LoadJSON(testingRaftCfg) maptrackerCfg.LoadJSON(testingTrackerCfg) statelesstrkrCfg.LoadJSON(testingTrackerCfg) - basicmonCfg.LoadJSON(testingMonCfg) pubsubmonCfg.LoadJSON(testingMonCfg) diskInfCfg.LoadJSON(testingDiskInfCfg) tracingCfg.LoadJSON(testingTracerCfg) - return clusterCfg, apiCfg, proxyCfg, ipfsCfg, consensusCfg, maptrackerCfg, statelesstrkrCfg, basicmonCfg, pubsubmonCfg, diskInfCfg, tracingCfg + return clusterCfg, apiCfg, proxyCfg, ipfsCfg, consensusCfg, maptrackerCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg } -func testingEmptyConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *stateless.Config, *basic.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) { +func testingEmptyConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) { clusterCfg := &Config{} apiCfg := &rest.Config{} proxyCfg := &ipfsproxy.Config{} @@ -129,11 +127,10 @@ func testingEmptyConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp. consensusCfg := &raft.Config{} maptrackerCfg := &maptracker.Config{} statelessCfg := &stateless.Config{} - basicmonCfg := &basic.Config{} pubsubmonCfg := &pubsubmon.Config{} diskInfCfg := &disk.Config{} tracingCfg := &observations.TracingConfig{} - return clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, consensusCfg, maptrackerCfg, statelessCfg, basicmonCfg, pubsubmonCfg, diskInfCfg, tracingCfg + return clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, consensusCfg, maptrackerCfg, statelessCfg, pubsubmonCfg, diskInfCfg, tracingCfg } // func TestConfigDefault(t *testing.T) { diff --git a/ipfscluster_test.go b/ipfscluster_test.go index b777301c..07774820 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -19,7 +19,6 @@ import ( "github.com/ipfs/ipfs-cluster/consensus/raft" "github.com/ipfs/ipfs-cluster/informer/disk" "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" - "github.com/ipfs/ipfs-cluster/monitor/basic" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/observations" "github.com/ipfs/ipfs-cluster/pintracker/maptracker" @@ -142,7 +141,7 @@ func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) ( checkErr(t, err) peername := fmt.Sprintf("peer_%d", i) - clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, consensusCfg, maptrackerCfg, statelesstrackerCfg, bmonCfg, psmonCfg, diskInfCfg, tracingCfg := testingConfigs() + clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, consensusCfg, maptrackerCfg, statelesstrackerCfg, psmonCfg, diskInfCfg, tracingCfg := testingConfigs() clusterCfg.ID = pid clusterCfg.Peername = peername @@ -171,7 +170,8 @@ func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) ( state := mapstate.NewMapState() tracker := makePinTracker(t, clusterCfg.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername) - mon := makeMonitor(t, host, bmonCfg, psmonCfg) + mon, err := pubsubmon.New(host, psmonCfg) + checkErr(t, err) alloc := descendalloc.NewAllocator() inf, err := disk.NewInformer(diskInfCfg) @@ -185,21 +185,6 @@ func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) ( return host, clusterCfg, raftCon, []API{api, ipfsProxy}, ipfs, state, tracker, mon, alloc, inf, tracer, mock } -func makeMonitor(t *testing.T, h host.Host, bmonCfg *basic.Config, psmonCfg *pubsubmon.Config) PeerMonitor { - var mon PeerMonitor - var err error - switch pmonitor { - case "basic": - mon, err = basic.NewMonitor(bmonCfg) - case "pubsub": - mon, err = pubsubmon.New(h, psmonCfg) - default: - panic("bad monitor") - } - checkErr(t, err) - return mon -} - func makePinTracker(t *testing.T, pid peer.ID, mptCfg *maptracker.Config, sptCfg *stateless.Config, peerName string) PinTracker { var ptrkr PinTracker switch ptracker { diff --git a/monitor/basic/config.go b/monitor/basic/config.go deleted file mode 100644 index bd8527a2..00000000 --- a/monitor/basic/config.go +++ /dev/null @@ -1,97 +0,0 @@ -package basic - -import ( - "encoding/json" - "errors" - "time" - - "github.com/ipfs/ipfs-cluster/config" - "github.com/kelseyhightower/envconfig" -) - -const configKey = "monbasic" -const envConfigKey = "cluster_monbasic" - -// Default values for this Config. -const ( - DefaultCheckInterval = 15 * time.Second -) - -// Config allows to initialize a Monitor and customize some parameters. -type Config struct { - config.Saver - - CheckInterval time.Duration -} - -type jsonConfig struct { - CheckInterval string `json:"check_interval"` -} - -// ConfigKey provides a human-friendly identifier for this type of Config. -func (cfg *Config) ConfigKey() string { - return configKey -} - -// Default sets the fields of this Config to sensible values. -func (cfg *Config) Default() error { - cfg.CheckInterval = DefaultCheckInterval - return nil -} - -// ApplyEnvVars fills in any Config fields found -// as environment variables. -func (cfg *Config) ApplyEnvVars() error { - jcfg := cfg.toJSONConfig() - - err := envconfig.Process(envConfigKey, jcfg) - if err != nil { - return err - } - - return cfg.applyJSONConfig(jcfg) -} - -// Validate checks that the fields of this Config have working values, -// at least in appearance. -func (cfg *Config) Validate() error { - if cfg.CheckInterval <= 0 { - return errors.New("basic.check_interval too low") - } - return nil -} - -// LoadJSON sets the fields of this Config to the values defined by the JSON -// representation of it, as generated by ToJSON. -func (cfg *Config) LoadJSON(raw []byte) error { - jcfg := &jsonConfig{} - err := json.Unmarshal(raw, jcfg) - if err != nil { - logger.Error("Error unmarshaling basic monitor config") - return err - } - - cfg.Default() - - return cfg.applyJSONConfig(jcfg) -} - -func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error { - interval, _ := time.ParseDuration(jcfg.CheckInterval) - cfg.CheckInterval = interval - - return cfg.Validate() -} - -// ToJSON generates a human-friendly JSON representation of this Config. -func (cfg *Config) ToJSON() ([]byte, error) { - jcfg := cfg.toJSONConfig() - - return json.MarshalIndent(jcfg, "", " ") -} - -func (cfg *Config) toJSONConfig() *jsonConfig { - return &jsonConfig{ - CheckInterval: cfg.CheckInterval.String(), - } -} diff --git a/monitor/basic/config_test.go b/monitor/basic/config_test.go deleted file mode 100644 index 2857ae37..00000000 --- a/monitor/basic/config_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package basic - -import ( - "encoding/json" - "os" - "testing" - "time" -) - -var cfgJSON = []byte(` -{ - "check_interval": "15s" -} -`) - -func TestLoadJSON(t *testing.T) { - cfg := &Config{} - err := cfg.LoadJSON(cfgJSON) - if err != nil { - t.Fatal(err) - } - - j := &jsonConfig{} - - json.Unmarshal(cfgJSON, j) - j.CheckInterval = "-10" - tst, _ := json.Marshal(j) - err = cfg.LoadJSON(tst) - if err == nil { - t.Error("expected error decoding check_interval") - } -} - -func TestToJSON(t *testing.T) { - cfg := &Config{} - cfg.LoadJSON(cfgJSON) - newjson, err := cfg.ToJSON() - if err != nil { - t.Fatal(err) - } - cfg = &Config{} - err = cfg.LoadJSON(newjson) - if err != nil { - t.Fatal(err) - } -} - -func TestDefault(t *testing.T) { - cfg := &Config{} - cfg.Default() - if cfg.Validate() != nil { - t.Fatal("error validating") - } - - cfg.CheckInterval = 0 - if cfg.Validate() == nil { - t.Fatal("expected error validating") - } -} - -func TestApplyEnvVars(t *testing.T) { - os.Setenv("CLUSTER_MONBASIC_CHECKINTERVAL", "22s") - cfg := &Config{} - cfg.ApplyEnvVars() - - if cfg.CheckInterval != 22*time.Second { - t.Fatal("failed to override check_interval with env var") - } -} diff --git a/monitor/basic/peer_monitor.go b/monitor/basic/peer_monitor.go deleted file mode 100644 index 7997df74..00000000 --- a/monitor/basic/peer_monitor.go +++ /dev/null @@ -1,219 +0,0 @@ -// Package basic implements a basic PeerMonitor component for IPFS Cluster. This -// component is in charge of logging metrics and triggering alerts when a peer -// goes down. -package basic - -import ( - "context" - "errors" - "fmt" - "strings" - "sync" - - "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/monitor/metrics" - "github.com/ipfs/ipfs-cluster/rpcutil" - "go.opencensus.io/trace" - - logging "github.com/ipfs/go-log" - rpc "github.com/libp2p/go-libp2p-gorpc" - peer "github.com/libp2p/go-libp2p-peer" -) - -var logger = logging.Logger("monitor") - -// Monitor is a component in charge of monitoring peers, logging -// metrics and detecting failures -type Monitor struct { - ctx context.Context - cancel func() - rpcClient *rpc.Client - rpcReady chan struct{} - - metrics *metrics.Store - checker *metrics.Checker - - config *Config - - shutdownLock sync.Mutex - shutdown bool - wg sync.WaitGroup -} - -// NewMonitor creates a new monitor using the given config. -func NewMonitor(cfg *Config) (*Monitor, error) { - err := cfg.Validate() - if err != nil { - return nil, err - } - - ctx, cancel := context.WithCancel(context.Background()) - - mtrs := metrics.NewStore() - checker := metrics.NewChecker(mtrs) - - mon := &Monitor{ - ctx: ctx, - cancel: cancel, - rpcReady: make(chan struct{}, 1), - - metrics: mtrs, - checker: checker, - config: cfg, - } - - go mon.run() - return mon, nil -} - -func (mon *Monitor) run() { - select { - case <-mon.rpcReady: - go mon.checker.Watch(mon.ctx, mon.getPeers, mon.config.CheckInterval) - case <-mon.ctx.Done(): - } -} - -// SetClient saves the given rpc.Client for later use -func (mon *Monitor) SetClient(c *rpc.Client) { - mon.rpcClient = c - mon.rpcReady <- struct{}{} -} - -// Shutdown stops the peer monitor. It particular, it will -// not deliver any alerts. -func (mon *Monitor) Shutdown(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "monitor/basic/Shutdown") - defer span.End() - - mon.shutdownLock.Lock() - defer mon.shutdownLock.Unlock() - - if mon.shutdown { - logger.Warning("Monitor already shut down") - return nil - } - - logger.Info("stopping Monitor") - close(mon.rpcReady) - mon.cancel() - mon.wg.Wait() - mon.shutdown = true - return nil -} - -// LogMetric stores a metric so it can later be retrieved. -func (mon *Monitor) LogMetric(ctx context.Context, m *api.Metric) error { - ctx, span := trace.StartSpan(ctx, "monitor/basic/LogMetric") - defer span.End() - - mon.metrics.Add(m) - logger.Debugf("basic monitor logged '%s' metric from '%s'. Expires on %d", m.Name, m.Peer, m.Expire) - return nil -} - -// PublishMetric broadcasts a metric to all current cluster peers. -func (mon *Monitor) PublishMetric(ctx context.Context, m *api.Metric) error { - ctx, span := trace.StartSpan(ctx, "monitor/basic/PublishMetric") - defer span.End() - - if m.Discard() { - logger.Warningf("discarding invalid metric: %+v", m) - return nil - } - - peers, err := mon.getPeers(ctx) - if err != nil { - return err - } - - ctxs, cancels := rpcutil.CtxsWithTimeout(ctx, len(peers), m.GetTTL()/2) - defer rpcutil.MultiCancel(cancels) - - logger.Debugf( - "broadcasting metric %s to %s. Expires: %d", - m.Name, - peers, - m.Expire, - ) - - // This may hang if one of the calls does, but we will return when the - // context expires. - errs := mon.rpcClient.MultiCall( - ctxs, - peers, - "Cluster", - "PeerMonitorLogMetric", - m, - rpcutil.RPCDiscardReplies(len(peers)), - ) - - var errStrs []string - - for i, e := range errs { - if e != nil { - errStr := fmt.Sprintf( - "error pushing metric to %s: %s", - peers[i].Pretty(), - e, - ) - logger.Errorf(errStr) - errStrs = append(errStrs, errStr) - } - } - - if len(errStrs) > 0 { - return errors.New(strings.Join(errStrs, "\n")) - } - - logger.Debugf( - "broadcasted metric %s to [%s]. Expires: %d", - m.Name, - peers, - m.Expire, - ) - return nil -} - -// getPeers gets the current list of peers from the consensus component -func (mon *Monitor) getPeers(ctx context.Context) ([]peer.ID, error) { - ctx, span := trace.StartSpan(ctx, "monitor/basic/getPeers") - defer span.End() - - var peers []peer.ID - err := mon.rpcClient.CallContext( - ctx, - "", - "Cluster", - "ConsensusPeers", - struct{}{}, - &peers, - ) - if err != nil { - logger.Error(err) - } - return peers, err -} - -// LatestMetrics returns last known VALID metrics of a given type. A metric -// is only valid if it has not expired and belongs to a current cluster peers. -func (mon *Monitor) LatestMetrics(ctx context.Context, name string) []*api.Metric { - ctx, span := trace.StartSpan(ctx, "monitor/basic/LatestMetrics") - defer span.End() - - latest := mon.metrics.Latest(name) - - // Make sure we only return metrics in the current peerset - peers, err := mon.getPeers(ctx) - if err != nil { - return []*api.Metric{} - } - - return metrics.PeersetFilter(latest, peers) -} - -// Alerts returns a channel on which alerts are sent when the -// monitor detects a failure. -func (mon *Monitor) Alerts() <-chan *api.Alert { - return mon.checker.Alerts() -} diff --git a/monitor/basic/peer_monitor_test.go b/monitor/basic/peer_monitor_test.go deleted file mode 100644 index 978bdab2..00000000 --- a/monitor/basic/peer_monitor_test.go +++ /dev/null @@ -1,246 +0,0 @@ -package basic - -import ( - "context" - "fmt" - "strconv" - "sync" - "testing" - "time" - - libp2p "github.com/libp2p/go-libp2p" - peer "github.com/libp2p/go-libp2p-peer" - - host "github.com/libp2p/go-libp2p-host" - - "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/test" -) - -type metricFactory struct { - l sync.Mutex - counter int -} - -func newMetricFactory() *metricFactory { - return &metricFactory{ - counter: 0, - } -} - -func (mf *metricFactory) newMetric(n string, p peer.ID) *api.Metric { - mf.l.Lock() - defer mf.l.Unlock() - m := &api.Metric{ - Name: n, - Peer: p, - Value: fmt.Sprintf("%d", mf.counter), - Valid: true, - } - m.SetTTL(5 * time.Second) - mf.counter++ - return m -} - -func (mf *metricFactory) count() int { - mf.l.Lock() - defer mf.l.Unlock() - return mf.counter -} - -func testPeerMonitor(t *testing.T) *Monitor { - return testPeerMonitorWithHost(t, nil) -} - -func testPeerMonitorWithHost(t *testing.T, h host.Host) *Monitor { - mock := test.NewMockRPCClientWithHost(t, h) - cfg := &Config{} - cfg.Default() - cfg.CheckInterval = 2 * time.Second - mon, err := NewMonitor(cfg) - if err != nil { - t.Fatal(err) - } - mon.SetClient(mock) - return mon -} - -func TestPeerMonitorShutdown(t *testing.T) { - ctx := context.Background() - pm := testPeerMonitor(t) - err := pm.Shutdown(ctx) - if err != nil { - t.Error(err) - } - - err = pm.Shutdown(ctx) - if err != nil { - t.Error(err) - } -} - -func TestLogMetricConcurrent(t *testing.T) { - ctx := context.Background() - pm := testPeerMonitor(t) - defer pm.Shutdown(ctx) - - var wg sync.WaitGroup - wg.Add(3) - - // Insert 25 metrics - f := func() { - defer wg.Done() - for i := 0; i < 25; i++ { - mt := &api.Metric{ - Name: "test", - Peer: test.PeerID1, - Value: fmt.Sprintf("%d", time.Now().UnixNano()), - Valid: true, - } - mt.SetTTL(150 * time.Millisecond) - pm.LogMetric(ctx, mt) - time.Sleep(75 * time.Millisecond) - } - } - go f() - go f() - go f() - - // Wait for at least two metrics to be inserted - time.Sleep(200 * time.Millisecond) - last := time.Now().Add(-500 * time.Millisecond) - - for i := 0; i <= 20; i++ { - lastMtrcs := pm.LatestMetrics(ctx, "test") - - // There should always 1 valid LatestMetric "test" - if len(lastMtrcs) != 1 { - t.Error("no valid metrics", len(lastMtrcs), i) - time.Sleep(75 * time.Millisecond) - continue - } - - n, err := strconv.Atoi(lastMtrcs[0].Value) - if err != nil { - t.Fatal(err) - } - - // The timestamp of the metric cannot be older than - // the timestamp from the last - current := time.Unix(0, int64(n)) - if current.Before(last) { - t.Errorf("expected newer metric: Current: %s, Last: %s", current, last) - } - last = current - time.Sleep(75 * time.Millisecond) - } - - wg.Wait() -} - -func TestPeerMonitorLogMetric(t *testing.T) { - ctx := context.Background() - pm := testPeerMonitor(t) - defer pm.Shutdown(ctx) - mf := newMetricFactory() - - // dont fill window - pm.LogMetric(ctx, mf.newMetric("test", test.PeerID1)) - pm.LogMetric(ctx, mf.newMetric("test", test.PeerID2)) - pm.LogMetric(ctx, mf.newMetric("test", test.PeerID3)) - - // fill window - pm.LogMetric(ctx, mf.newMetric("test2", test.PeerID3)) - pm.LogMetric(ctx, mf.newMetric("test2", test.PeerID3)) - pm.LogMetric(ctx, mf.newMetric("test2", test.PeerID3)) - pm.LogMetric(ctx, mf.newMetric("test2", test.PeerID3)) - - latestMetrics := pm.LatestMetrics(ctx, "testbad") - if len(latestMetrics) != 0 { - t.Logf("%+v", latestMetrics) - t.Error("metrics should be empty") - } - - latestMetrics = pm.LatestMetrics(ctx, "test") - if len(latestMetrics) != 3 { - t.Error("metrics should correspond to 3 hosts") - } - - for _, v := range latestMetrics { - switch v.Peer { - case test.PeerID1: - if v.Value != "0" { - t.Error("bad metric value") - } - case test.PeerID2: - if v.Value != "1" { - t.Error("bad metric value") - } - case test.PeerID3: - if v.Value != "2" { - t.Error("bad metric value") - } - default: - t.Error("bad peer") - } - } - - latestMetrics = pm.LatestMetrics(ctx, "test2") - if len(latestMetrics) != 1 { - t.Fatal("should only be one metric") - } - if latestMetrics[0].Value != fmt.Sprintf("%d", mf.count()-1) { - t.Error("metric is not last") - } -} - -func TestPeerMonitorPublishMetric(t *testing.T) { - ctx := context.Background() - h, err := libp2p.New(context.Background()) - if err != nil { - t.Fatal(err) - } - - pm := testPeerMonitorWithHost(t, h) - defer pm.Shutdown(ctx) - defer h.Close() - mf := newMetricFactory() - - metric := mf.newMetric("test", test.PeerID1) - err = pm.PublishMetric(ctx, metric) - - // Note mock rpc returns 3 consensus peers and we cannot - // push to those so an error is in order and indicates - // things work as expected. - if err == nil { - t.Error("expected an error") - } -} - -func TestPeerMonitorAlerts(t *testing.T) { - ctx := context.Background() - pm := testPeerMonitor(t) - defer pm.Shutdown(ctx) - mf := newMetricFactory() - - mtr := mf.newMetric("test", test.PeerID1) - mtr.SetTTL(0) - pm.LogMetric(ctx, mtr) - time.Sleep(time.Second) - timeout := time.NewTimer(time.Second * 5) - - // it should alert twice at least. Alert re-occurrs. - for i := 0; i < 2; i++ { - select { - case <-timeout.C: - t.Fatal("should have thrown an alert by now") - case alrt := <-pm.Alerts(): - if alrt.MetricName != "test" { - t.Error("Alert should be for test") - } - if alrt.Peer != test.PeerID1 { - t.Error("Peer should be TestPeerID1") - } - } - } -} diff --git a/monitor/pubsubmon/config.go b/monitor/pubsubmon/config.go index 37fdc53d..d02626ec 100644 --- a/monitor/pubsubmon/config.go +++ b/monitor/pubsubmon/config.go @@ -56,7 +56,7 @@ func (cfg *Config) ApplyEnvVars() error { // at least in appearance. func (cfg *Config) Validate() error { if cfg.CheckInterval <= 0 { - return errors.New("basic.check_interval too low") + return errors.New("pubsubmon.check_interval too low") } return nil } @@ -67,7 +67,7 @@ func (cfg *Config) LoadJSON(raw []byte) error { jcfg := &jsonConfig{} err := json.Unmarshal(raw, jcfg) if err != nil { - logger.Error("Error unmarshaling basic monitor config") + logger.Error("Error unmarshaling pubsubmon monitor config") return err } diff --git a/sharness/config/ssl-basic_auth/service.json b/sharness/config/ssl-basic_auth/service.json index 6403d671..8ef5f0c2 100644 --- a/sharness/config/ssl-basic_auth/service.json +++ b/sharness/config/ssl-basic_auth/service.json @@ -82,9 +82,6 @@ } }, "monitor": { - "monbasic": { - "check_interval": "15s" - }, "pubsubmon": { "check_interval": "15s" } diff --git a/sharness/config/ssl/service.json b/sharness/config/ssl/service.json index f639810e..0b006764 100644 --- a/sharness/config/ssl/service.json +++ b/sharness/config/ssl/service.json @@ -79,9 +79,6 @@ } }, "monitor": { - "monbasic": { - "check_interval": "15s" - }, "pubsubmon": { "check_interval": "15s" }