Remove basic monitor (#726)

Remove basic monitor

This commit removes the `basic` monitor component: it has not been used
by default since the pubsub monitor was introduced a few releases ago.

Issue #689
Kishan Sagathiya 2019-03-21 22:48:40 +05:30 committed by GitHub
parent af4ccdc9bb
commit 962d249e74
14 changed files with 19 additions and 710 deletions
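Note: with `setupMonitor` (daemon) and `makeMonitor` (tests) gone, `pubsubmon` is the only PeerMonitor implementation left, and callers construct it directly. For code embedding the cluster libraries, here is a minimal sketch of the remaining construction path; the `newMonitor` helper and package name are illustrative, but the calls it wraps (`NewClusterHost`, `pubsubmon.New`) are exactly the ones used in the hunks below:

package monitorexample

import (
    "context"

    ipfscluster "github.com/ipfs/ipfs-cluster"
    "github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
)

// newMonitor mirrors what the daemon now does: build the cluster's libp2p
// host from the cluster config, then hand it to pubsubmon.New together with
// the monitor config. The returned *pubsubmon.Monitor satisfies the
// ipfscluster.PeerMonitor interface.
func newMonitor(ctx context.Context, clusterCfg *ipfscluster.Config, psmonCfg *pubsubmon.Config) (ipfscluster.PeerMonitor, error) {
    host, err := ipfscluster.NewClusterHost(ctx, clusterCfg)
    if err != nil {
        return nil, err
    }
    return pubsubmon.New(host, psmonCfg)
}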

View File

@@ -28,9 +28,6 @@ jobs:
       - go test -v -coverprofile=coverage.txt -covermode=atomic ./...
     after_success:
       - bash <(curl -s https://codecov.io/bash)
-  - name: "Main Tests with basic monitor"
-    script:
-      - go test -v . -monitor basic
   - name: "Main Tests with stateless tracker"
     script:
       - go test -v . -tracker stateless

View File

@@ -15,6 +15,7 @@ import (
     "github.com/ipfs/ipfs-cluster/api"
     "github.com/ipfs/ipfs-cluster/consensus/raft"
     "github.com/ipfs/ipfs-cluster/informer/numpin"
+    "github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
     "github.com/ipfs/ipfs-cluster/state"
     "github.com/ipfs/ipfs-cluster/state/mapstate"
     "github.com/ipfs/ipfs-cluster/test"
@@ -138,7 +139,7 @@ type mockTracer struct {
 }

 func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, state.State, PinTracker) {
-    clusterCfg, _, _, _, consensusCfg, maptrackerCfg, statelesstrackerCfg, bmonCfg, psmonCfg, _, _ := testingConfigs()
+    clusterCfg, _, _, _, consensusCfg, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs()

     host, err := NewClusterHost(context.Background(), clusterCfg)
     if err != nil {
@@ -154,9 +155,11 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, state.Sta
     raftcon, _ := raft.NewConsensus(host, consensusCfg, st, false)

-    bmonCfg.CheckInterval = 2 * time.Second
     psmonCfg.CheckInterval = 2 * time.Second
-    mon := makeMonitor(t, host, bmonCfg, psmonCfg)
+    mon, err := pubsubmon.New(host, psmonCfg)
+    if err != nil {
+        t.Fatal(err)
+    }
     alloc := ascendalloc.NewAllocator()
     numpinCfg := &numpin.Config{}

View File

@@ -12,7 +12,6 @@ import (
     "github.com/ipfs/ipfs-cluster/informer/disk"
     "github.com/ipfs/ipfs-cluster/informer/numpin"
     "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
-    "github.com/ipfs/ipfs-cluster/monitor/basic"
     "github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
     "github.com/ipfs/ipfs-cluster/observations"
     "github.com/ipfs/ipfs-cluster/pintracker/maptracker"
@@ -27,7 +26,6 @@ type cfgs struct {
     consensusCfg        *raft.Config
     maptrackerCfg       *maptracker.Config
     statelessTrackerCfg *stateless.Config
-    monCfg              *basic.Config
     pubsubmonCfg        *pubsubmon.Config
     diskInfCfg          *disk.Config
     numpinInfCfg        *numpin.Config
@@ -44,7 +42,6 @@ func makeConfigs() (*config.Manager, *cfgs) {
     consensusCfg := &raft.Config{}
     maptrackerCfg := &maptracker.Config{}
     statelessCfg := &stateless.Config{}
-    monCfg := &basic.Config{}
     pubsubmonCfg := &pubsubmon.Config{}
     diskInfCfg := &disk.Config{}
     numpinInfCfg := &numpin.Config{}
@@ -57,7 +54,6 @@ func makeConfigs() (*config.Manager, *cfgs) {
     cfg.RegisterComponent(config.Consensus, consensusCfg)
     cfg.RegisterComponent(config.PinTracker, maptrackerCfg)
     cfg.RegisterComponent(config.PinTracker, statelessCfg)
-    cfg.RegisterComponent(config.Monitor, monCfg)
     cfg.RegisterComponent(config.Monitor, pubsubmonCfg)
     cfg.RegisterComponent(config.Informer, diskInfCfg)
     cfg.RegisterComponent(config.Informer, numpinInfCfg)
@@ -71,7 +67,6 @@ func makeConfigs() (*config.Manager, *cfgs) {
         consensusCfg,
         maptrackerCfg,
         statelessCfg,
-        monCfg,
         pubsubmonCfg,
         diskInfCfg,
         numpinInfCfg,

View File

@@ -20,7 +20,6 @@ import (
     "github.com/ipfs/ipfs-cluster/informer/disk"
     "github.com/ipfs/ipfs-cluster/informer/numpin"
     "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
-    "github.com/ipfs/ipfs-cluster/monitor/basic"
     "github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
     "github.com/ipfs/ipfs-cluster/observations"
     "github.com/ipfs/ipfs-cluster/pintracker/maptracker"
@@ -145,7 +144,9 @@ func createCluster(
     checkErr("creating consensus component", err)

     tracker := setupPinTracker(c.String("pintracker"), host, cfgs.maptrackerCfg, cfgs.statelessTrackerCfg, cfgs.clusterCfg.Peername)
-    mon := setupMonitor(c.String("monitor"), host, cfgs.monCfg, cfgs.pubsubmonCfg)
+    mon, err := pubsubmon.New(host, cfgs.pubsubmonCfg)
+    checkErr("creating monitor", err)
+    logger.Debug("pubsub monitor loaded")
     informer, alloc := setupAllocation(c.String("alloc"), cfgs.diskInfCfg, cfgs.numpinInfCfg)

     ipfscluster.ReadyTimeout = cfgs.consensusCfg.WaitForLeaderTimeout + 5*time.Second
@@ -248,31 +249,6 @@ func setupAllocation(
     }
 }

-func setupMonitor(
-    name string,
-    h host.Host,
-    basicCfg *basic.Config,
-    pubsubCfg *pubsubmon.Config,
-) ipfscluster.PeerMonitor {
-    switch name {
-    case "basic":
-        mon, err := basic.NewMonitor(basicCfg)
-        checkErr("creating monitor", err)
-        logger.Debug("basic monitor loaded")
-        return mon
-    case "pubsub":
-        mon, err := pubsubmon.New(h, pubsubCfg)
-        checkErr("creating monitor", err)
-        logger.Debug("pubsub monitor loaded")
-        return mon
-    default:
-        err := errors.New("unknown monitor type")
-        checkErr("", err)
-        return nil
-    }
-}
-
 func setupPinTracker(
     name string,
     h host.Host,

View File

@@ -25,7 +25,6 @@ const programName = `ipfs-cluster-service`
 // flag defaults
 const (
     defaultAllocation = "disk-freespace"
-    defaultMonitor    = "pubsub"
     defaultPinTracker = "map"
     defaultLogLevel   = "info"
 )
@@ -275,12 +274,6 @@ configuration.
             Value: defaultAllocation,
             Usage: "allocation strategy to use [disk-freespace,disk-reposize,numpin].",
         },
-        cli.StringFlag{
-            Name:   "monitor",
-            Value:  defaultMonitor,
-            Hidden: true,
-            Usage:  "peer monitor to use [basic,pubsub].",
-        },
         cli.StringFlag{
             Name:  "pintracker",
             Value: defaultPinTracker,

View File

@@ -6,7 +6,6 @@ import (
     "github.com/ipfs/ipfs-cluster/consensus/raft"
     "github.com/ipfs/ipfs-cluster/informer/disk"
     "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
-    "github.com/ipfs/ipfs-cluster/monitor/basic"
     "github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
     "github.com/ipfs/ipfs-cluster/observations"
     "github.com/ipfs/ipfs-cluster/pintracker/maptracker"
@@ -104,8 +103,8 @@ var testingTracerCfg = []byte(`{
     "enable_tracing": false
 }`)

-func testingConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *stateless.Config, *basic.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {
-    clusterCfg, apiCfg, proxyCfg, ipfsCfg, consensusCfg, maptrackerCfg, statelesstrkrCfg, basicmonCfg, pubsubmonCfg, diskInfCfg, tracingCfg := testingEmptyConfigs()
+func testingConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {
+    clusterCfg, apiCfg, proxyCfg, ipfsCfg, consensusCfg, maptrackerCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg := testingEmptyConfigs()
     clusterCfg.LoadJSON(testingClusterCfg)
     apiCfg.LoadJSON(testingAPICfg)
     proxyCfg.LoadJSON(testingProxyCfg)
@@ -113,15 +112,14 @@ func testingConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Confi
     consensusCfg.LoadJSON(testingRaftCfg)
     maptrackerCfg.LoadJSON(testingTrackerCfg)
     statelesstrkrCfg.LoadJSON(testingTrackerCfg)
-    basicmonCfg.LoadJSON(testingMonCfg)
     pubsubmonCfg.LoadJSON(testingMonCfg)
     diskInfCfg.LoadJSON(testingDiskInfCfg)
     tracingCfg.LoadJSON(testingTracerCfg)

-    return clusterCfg, apiCfg, proxyCfg, ipfsCfg, consensusCfg, maptrackerCfg, statelesstrkrCfg, basicmonCfg, pubsubmonCfg, diskInfCfg, tracingCfg
+    return clusterCfg, apiCfg, proxyCfg, ipfsCfg, consensusCfg, maptrackerCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg
 }

-func testingEmptyConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *stateless.Config, *basic.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {
+func testingEmptyConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {
     clusterCfg := &Config{}
     apiCfg := &rest.Config{}
     proxyCfg := &ipfsproxy.Config{}
@@ -129,11 +127,10 @@ func testingEmptyConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.
     consensusCfg := &raft.Config{}
     maptrackerCfg := &maptracker.Config{}
     statelessCfg := &stateless.Config{}
-    basicmonCfg := &basic.Config{}
     pubsubmonCfg := &pubsubmon.Config{}
     diskInfCfg := &disk.Config{}
     tracingCfg := &observations.TracingConfig{}

-    return clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, consensusCfg, maptrackerCfg, statelessCfg, basicmonCfg, pubsubmonCfg, diskInfCfg, tracingCfg
+    return clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, consensusCfg, maptrackerCfg, statelessCfg, pubsubmonCfg, diskInfCfg, tracingCfg
 }

 // func TestConfigDefault(t *testing.T) {
// func TestConfigDefault(t *testing.T) { // func TestConfigDefault(t *testing.T) {

View File

@@ -19,7 +19,6 @@ import (
     "github.com/ipfs/ipfs-cluster/consensus/raft"
     "github.com/ipfs/ipfs-cluster/informer/disk"
     "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
-    "github.com/ipfs/ipfs-cluster/monitor/basic"
     "github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
     "github.com/ipfs/ipfs-cluster/observations"
     "github.com/ipfs/ipfs-cluster/pintracker/maptracker"
@@ -142,7 +141,7 @@ func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) (
     checkErr(t, err)
     peername := fmt.Sprintf("peer_%d", i)

-    clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, consensusCfg, maptrackerCfg, statelesstrackerCfg, bmonCfg, psmonCfg, diskInfCfg, tracingCfg := testingConfigs()
+    clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, consensusCfg, maptrackerCfg, statelesstrackerCfg, psmonCfg, diskInfCfg, tracingCfg := testingConfigs()

     clusterCfg.ID = pid
     clusterCfg.Peername = peername
@@ -171,7 +170,8 @@ func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) (
     state := mapstate.NewMapState()
     tracker := makePinTracker(t, clusterCfg.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername)

-    mon := makeMonitor(t, host, bmonCfg, psmonCfg)
+    mon, err := pubsubmon.New(host, psmonCfg)
+    checkErr(t, err)

     alloc := descendalloc.NewAllocator()
     inf, err := disk.NewInformer(diskInfCfg)
@@ -185,21 +185,6 @@ func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) (
     return host, clusterCfg, raftCon, []API{api, ipfsProxy}, ipfs, state, tracker, mon, alloc, inf, tracer, mock
 }

-func makeMonitor(t *testing.T, h host.Host, bmonCfg *basic.Config, psmonCfg *pubsubmon.Config) PeerMonitor {
-    var mon PeerMonitor
-    var err error
-    switch pmonitor {
-    case "basic":
-        mon, err = basic.NewMonitor(bmonCfg)
-    case "pubsub":
-        mon, err = pubsubmon.New(h, psmonCfg)
-    default:
-        panic("bad monitor")
-    }
-    checkErr(t, err)
-    return mon
-}
-
 func makePinTracker(t *testing.T, pid peer.ID, mptCfg *maptracker.Config, sptCfg *stateless.Config, peerName string) PinTracker {
     var ptrkr PinTracker
     switch ptracker {

View File

@@ -1,97 +0,0 @@
-package basic
-
-import (
-    "encoding/json"
-    "errors"
-    "time"
-
-    "github.com/ipfs/ipfs-cluster/config"
-
-    "github.com/kelseyhightower/envconfig"
-)
-
-const configKey = "monbasic"
-const envConfigKey = "cluster_monbasic"
-
-// Default values for this Config.
-const (
-    DefaultCheckInterval = 15 * time.Second
-)
-
-// Config allows initializing a Monitor and customizing some parameters.
-type Config struct {
-    config.Saver
-
-    CheckInterval time.Duration
-}
-
-type jsonConfig struct {
-    CheckInterval string `json:"check_interval"`
-}
-
-// ConfigKey provides a human-friendly identifier for this type of Config.
-func (cfg *Config) ConfigKey() string {
-    return configKey
-}
-
-// Default sets the fields of this Config to sensible values.
-func (cfg *Config) Default() error {
-    cfg.CheckInterval = DefaultCheckInterval
-    return nil
-}
-
-// ApplyEnvVars fills in any Config fields found
-// as environment variables.
-func (cfg *Config) ApplyEnvVars() error {
-    jcfg := cfg.toJSONConfig()
-
-    err := envconfig.Process(envConfigKey, jcfg)
-    if err != nil {
-        return err
-    }
-
-    return cfg.applyJSONConfig(jcfg)
-}
-
-// Validate checks that the fields of this Config have working values,
-// at least in appearance.
-func (cfg *Config) Validate() error {
-    if cfg.CheckInterval <= 0 {
-        return errors.New("basic.check_interval too low")
-    }
-    return nil
-}
-
-// LoadJSON sets the fields of this Config to the values defined by the JSON
-// representation of it, as generated by ToJSON.
-func (cfg *Config) LoadJSON(raw []byte) error {
-    jcfg := &jsonConfig{}
-    err := json.Unmarshal(raw, jcfg)
-    if err != nil {
-        logger.Error("Error unmarshaling basic monitor config")
-        return err
-    }
-
-    cfg.Default()
-
-    return cfg.applyJSONConfig(jcfg)
-}
-
-func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
-    interval, _ := time.ParseDuration(jcfg.CheckInterval)
-    cfg.CheckInterval = interval
-
-    return cfg.Validate()
-}
-
-// ToJSON generates a human-friendly JSON representation of this Config.
-func (cfg *Config) ToJSON() ([]byte, error) {
-    jcfg := cfg.toJSONConfig()
-
-    return json.MarshalIndent(jcfg, "", " ")
-}
-
-func (cfg *Config) toJSONConfig() *jsonConfig {
-    return &jsonConfig{
-        CheckInterval: cfg.CheckInterval.String(),
-    }
-}

View File

@@ -1,69 +0,0 @@
-package basic
-
-import (
-    "encoding/json"
-    "os"
-    "testing"
-    "time"
-)
-
-var cfgJSON = []byte(`
-{
-    "check_interval": "15s"
-}
-`)
-
-func TestLoadJSON(t *testing.T) {
-    cfg := &Config{}
-    err := cfg.LoadJSON(cfgJSON)
-    if err != nil {
-        t.Fatal(err)
-    }
-
-    j := &jsonConfig{}
-    json.Unmarshal(cfgJSON, j)
-    j.CheckInterval = "-10"
-    tst, _ := json.Marshal(j)
-    err = cfg.LoadJSON(tst)
-    if err == nil {
-        t.Error("expected error decoding check_interval")
-    }
-}
-
-func TestToJSON(t *testing.T) {
-    cfg := &Config{}
-    cfg.LoadJSON(cfgJSON)
-    newjson, err := cfg.ToJSON()
-    if err != nil {
-        t.Fatal(err)
-    }
-    cfg = &Config{}
-    err = cfg.LoadJSON(newjson)
-    if err != nil {
-        t.Fatal(err)
-    }
-}
-
-func TestDefault(t *testing.T) {
-    cfg := &Config{}
-    cfg.Default()
-    if cfg.Validate() != nil {
-        t.Fatal("error validating")
-    }
-
-    cfg.CheckInterval = 0
-    if cfg.Validate() == nil {
-        t.Fatal("expected error validating")
-    }
-}
-
-func TestApplyEnvVars(t *testing.T) {
-    os.Setenv("CLUSTER_MONBASIC_CHECKINTERVAL", "22s")
-    cfg := &Config{}
-    cfg.ApplyEnvVars()
-
-    if cfg.CheckInterval != 22*time.Second {
-        t.Fatal("failed to override check_interval with env var")
-    }
-}

View File

@@ -1,219 +0,0 @@
-// Package basic implements a basic PeerMonitor component for IPFS Cluster. This
-// component is in charge of logging metrics and triggering alerts when a peer
-// goes down.
-package basic
-
-import (
-    "context"
-    "errors"
-    "fmt"
-    "strings"
-    "sync"
-
-    "github.com/ipfs/ipfs-cluster/api"
-    "github.com/ipfs/ipfs-cluster/monitor/metrics"
-    "github.com/ipfs/ipfs-cluster/rpcutil"
-
-    "go.opencensus.io/trace"
-
-    logging "github.com/ipfs/go-log"
-    rpc "github.com/libp2p/go-libp2p-gorpc"
-    peer "github.com/libp2p/go-libp2p-peer"
-)
-
-var logger = logging.Logger("monitor")
-
-// Monitor is a component in charge of monitoring peers, logging
-// metrics and detecting failures.
-type Monitor struct {
-    ctx       context.Context
-    cancel    func()
-    rpcClient *rpc.Client
-    rpcReady  chan struct{}
-
-    metrics *metrics.Store
-    checker *metrics.Checker
-
-    config *Config
-
-    shutdownLock sync.Mutex
-    shutdown     bool
-    wg           sync.WaitGroup
-}
-
-// NewMonitor creates a new monitor using the given config.
-func NewMonitor(cfg *Config) (*Monitor, error) {
-    err := cfg.Validate()
-    if err != nil {
-        return nil, err
-    }
-
-    ctx, cancel := context.WithCancel(context.Background())
-
-    mtrs := metrics.NewStore()
-    checker := metrics.NewChecker(mtrs)
-
-    mon := &Monitor{
-        ctx:      ctx,
-        cancel:   cancel,
-        rpcReady: make(chan struct{}, 1),
-
-        metrics: mtrs,
-        checker: checker,
-        config:  cfg,
-    }
-
-    go mon.run()
-    return mon, nil
-}
-
-func (mon *Monitor) run() {
-    select {
-    case <-mon.rpcReady:
-        go mon.checker.Watch(mon.ctx, mon.getPeers, mon.config.CheckInterval)
-    case <-mon.ctx.Done():
-    }
-}
-
-// SetClient saves the given rpc.Client for later use.
-func (mon *Monitor) SetClient(c *rpc.Client) {
-    mon.rpcClient = c
-    mon.rpcReady <- struct{}{}
-}
-
-// Shutdown stops the peer monitor. In particular, it will
-// not deliver any alerts.
-func (mon *Monitor) Shutdown(ctx context.Context) error {
-    ctx, span := trace.StartSpan(ctx, "monitor/basic/Shutdown")
-    defer span.End()
-
-    mon.shutdownLock.Lock()
-    defer mon.shutdownLock.Unlock()
-
-    if mon.shutdown {
-        logger.Warning("Monitor already shut down")
-        return nil
-    }
-
-    logger.Info("stopping Monitor")
-    close(mon.rpcReady)
-    mon.cancel()
-    mon.wg.Wait()
-    mon.shutdown = true
-    return nil
-}
-
-// LogMetric stores a metric so it can later be retrieved.
-func (mon *Monitor) LogMetric(ctx context.Context, m *api.Metric) error {
-    ctx, span := trace.StartSpan(ctx, "monitor/basic/LogMetric")
-    defer span.End()
-
-    mon.metrics.Add(m)
-    logger.Debugf("basic monitor logged '%s' metric from '%s'. Expires on %d", m.Name, m.Peer, m.Expire)
-    return nil
-}
-
-// PublishMetric broadcasts a metric to all current cluster peers.
-func (mon *Monitor) PublishMetric(ctx context.Context, m *api.Metric) error {
-    ctx, span := trace.StartSpan(ctx, "monitor/basic/PublishMetric")
-    defer span.End()
-
-    if m.Discard() {
-        logger.Warningf("discarding invalid metric: %+v", m)
-        return nil
-    }
-
-    peers, err := mon.getPeers(ctx)
-    if err != nil {
-        return err
-    }
-
-    ctxs, cancels := rpcutil.CtxsWithTimeout(ctx, len(peers), m.GetTTL()/2)
-    defer rpcutil.MultiCancel(cancels)
-
-    logger.Debugf(
-        "broadcasting metric %s to %s. Expires: %d",
-        m.Name,
-        peers,
-        m.Expire,
-    )
-
-    // This may hang if one of the calls does, but we will return when the
-    // context expires.
-    errs := mon.rpcClient.MultiCall(
-        ctxs,
-        peers,
-        "Cluster",
-        "PeerMonitorLogMetric",
-        m,
-        rpcutil.RPCDiscardReplies(len(peers)),
-    )
-
-    var errStrs []string
-
-    for i, e := range errs {
-        if e != nil {
-            errStr := fmt.Sprintf(
-                "error pushing metric to %s: %s",
-                peers[i].Pretty(),
-                e,
-            )
-            logger.Errorf(errStr)
-            errStrs = append(errStrs, errStr)
-        }
-    }
-
-    if len(errStrs) > 0 {
-        return errors.New(strings.Join(errStrs, "\n"))
-    }
-
-    logger.Debugf(
-        "broadcasted metric %s to [%s]. Expires: %d",
-        m.Name,
-        peers,
-        m.Expire,
-    )
-    return nil
-}
-
-// getPeers gets the current list of peers from the consensus component.
-func (mon *Monitor) getPeers(ctx context.Context) ([]peer.ID, error) {
-    ctx, span := trace.StartSpan(ctx, "monitor/basic/getPeers")
-    defer span.End()
-
-    var peers []peer.ID
-    err := mon.rpcClient.CallContext(
-        ctx,
-        "",
-        "Cluster",
-        "ConsensusPeers",
-        struct{}{},
-        &peers,
-    )
-    if err != nil {
-        logger.Error(err)
-    }
-    return peers, err
-}
-
-// LatestMetrics returns the last known VALID metrics of a given type. A metric
-// is only valid if it has not expired and belongs to a current cluster peer.
-func (mon *Monitor) LatestMetrics(ctx context.Context, name string) []*api.Metric {
-    ctx, span := trace.StartSpan(ctx, "monitor/basic/LatestMetrics")
-    defer span.End()
-
-    latest := mon.metrics.Latest(name)
-
-    // Make sure we only return metrics in the current peerset.
-    peers, err := mon.getPeers(ctx)
-    if err != nil {
-        return []*api.Metric{}
-    }
-
-    return metrics.PeersetFilter(latest, peers)
-}
-
-// Alerts returns a channel on which alerts are sent when the
-// monitor detects a failure.
-func (mon *Monitor) Alerts() <-chan *api.Alert {
-    return mon.checker.Alerts()
-}

View File

@@ -1,246 +0,0 @@
-package basic
-
-import (
-    "context"
-    "fmt"
-    "strconv"
-    "sync"
-    "testing"
-    "time"
-
-    libp2p "github.com/libp2p/go-libp2p"
-    host "github.com/libp2p/go-libp2p-host"
-    peer "github.com/libp2p/go-libp2p-peer"
-
-    "github.com/ipfs/ipfs-cluster/api"
-    "github.com/ipfs/ipfs-cluster/test"
-)
-
-type metricFactory struct {
-    l       sync.Mutex
-    counter int
-}
-
-func newMetricFactory() *metricFactory {
-    return &metricFactory{
-        counter: 0,
-    }
-}
-
-func (mf *metricFactory) newMetric(n string, p peer.ID) *api.Metric {
-    mf.l.Lock()
-    defer mf.l.Unlock()
-    m := &api.Metric{
-        Name:  n,
-        Peer:  p,
-        Value: fmt.Sprintf("%d", mf.counter),
-        Valid: true,
-    }
-    m.SetTTL(5 * time.Second)
-    mf.counter++
-    return m
-}
-
-func (mf *metricFactory) count() int {
-    mf.l.Lock()
-    defer mf.l.Unlock()
-    return mf.counter
-}
-
-func testPeerMonitor(t *testing.T) *Monitor {
-    return testPeerMonitorWithHost(t, nil)
-}
-
-func testPeerMonitorWithHost(t *testing.T, h host.Host) *Monitor {
-    mock := test.NewMockRPCClientWithHost(t, h)
-    cfg := &Config{}
-    cfg.Default()
-    cfg.CheckInterval = 2 * time.Second
-    mon, err := NewMonitor(cfg)
-    if err != nil {
-        t.Fatal(err)
-    }
-    mon.SetClient(mock)
-    return mon
-}
-
-func TestPeerMonitorShutdown(t *testing.T) {
-    ctx := context.Background()
-    pm := testPeerMonitor(t)
-    err := pm.Shutdown(ctx)
-    if err != nil {
-        t.Error(err)
-    }
-
-    err = pm.Shutdown(ctx)
-    if err != nil {
-        t.Error(err)
-    }
-}
-
-func TestLogMetricConcurrent(t *testing.T) {
-    ctx := context.Background()
-    pm := testPeerMonitor(t)
-    defer pm.Shutdown(ctx)
-
-    var wg sync.WaitGroup
-    wg.Add(3)
-
-    // Insert 25 metrics
-    f := func() {
-        defer wg.Done()
-        for i := 0; i < 25; i++ {
-            mt := &api.Metric{
-                Name:  "test",
-                Peer:  test.PeerID1,
-                Value: fmt.Sprintf("%d", time.Now().UnixNano()),
-                Valid: true,
-            }
-            mt.SetTTL(150 * time.Millisecond)
-            pm.LogMetric(ctx, mt)
-            time.Sleep(75 * time.Millisecond)
-        }
-    }
-    go f()
-    go f()
-    go f()
-
-    // Wait for at least two metrics to be inserted
-    time.Sleep(200 * time.Millisecond)
-    last := time.Now().Add(-500 * time.Millisecond)
-
-    for i := 0; i <= 20; i++ {
-        lastMtrcs := pm.LatestMetrics(ctx, "test")
-
-        // There should always be 1 valid LatestMetric "test"
-        if len(lastMtrcs) != 1 {
-            t.Error("no valid metrics", len(lastMtrcs), i)
-            time.Sleep(75 * time.Millisecond)
-            continue
-        }
-
-        n, err := strconv.Atoi(lastMtrcs[0].Value)
-        if err != nil {
-            t.Fatal(err)
-        }
-
-        // The timestamp of the metric cannot be older than
-        // the timestamp of the previous one.
-        current := time.Unix(0, int64(n))
-        if current.Before(last) {
-            t.Errorf("expected newer metric: Current: %s, Last: %s", current, last)
-        }
-        last = current
-        time.Sleep(75 * time.Millisecond)
-    }
-
-    wg.Wait()
-}
-
-func TestPeerMonitorLogMetric(t *testing.T) {
-    ctx := context.Background()
-    pm := testPeerMonitor(t)
-    defer pm.Shutdown(ctx)
-    mf := newMetricFactory()
-
-    // don't fill the window
-    pm.LogMetric(ctx, mf.newMetric("test", test.PeerID1))
-    pm.LogMetric(ctx, mf.newMetric("test", test.PeerID2))
-    pm.LogMetric(ctx, mf.newMetric("test", test.PeerID3))
-
-    // fill the window
-    pm.LogMetric(ctx, mf.newMetric("test2", test.PeerID3))
-    pm.LogMetric(ctx, mf.newMetric("test2", test.PeerID3))
-    pm.LogMetric(ctx, mf.newMetric("test2", test.PeerID3))
-    pm.LogMetric(ctx, mf.newMetric("test2", test.PeerID3))
-
-    latestMetrics := pm.LatestMetrics(ctx, "testbad")
-    if len(latestMetrics) != 0 {
-        t.Logf("%+v", latestMetrics)
-        t.Error("metrics should be empty")
-    }
-
-    latestMetrics = pm.LatestMetrics(ctx, "test")
-    if len(latestMetrics) != 3 {
-        t.Error("metrics should correspond to 3 hosts")
-    }
-
-    for _, v := range latestMetrics {
-        switch v.Peer {
-        case test.PeerID1:
-            if v.Value != "0" {
-                t.Error("bad metric value")
-            }
-        case test.PeerID2:
-            if v.Value != "1" {
-                t.Error("bad metric value")
-            }
-        case test.PeerID3:
-            if v.Value != "2" {
-                t.Error("bad metric value")
-            }
-        default:
-            t.Error("bad peer")
-        }
-    }
-
-    latestMetrics = pm.LatestMetrics(ctx, "test2")
-    if len(latestMetrics) != 1 {
-        t.Fatal("should only be one metric")
-    }
-    if latestMetrics[0].Value != fmt.Sprintf("%d", mf.count()-1) {
-        t.Error("metric is not last")
-    }
-}
-
-func TestPeerMonitorPublishMetric(t *testing.T) {
-    ctx := context.Background()
-    h, err := libp2p.New(context.Background())
-    if err != nil {
-        t.Fatal(err)
-    }
-
-    pm := testPeerMonitorWithHost(t, h)
-    defer pm.Shutdown(ctx)
-    defer h.Close()
-
-    mf := newMetricFactory()
-    metric := mf.newMetric("test", test.PeerID1)
-    err = pm.PublishMetric(ctx, metric)
-
-    // Note the mock rpc returns 3 consensus peers and we cannot
-    // push to those, so an error is in order and indicates
-    // things work as expected.
-    if err == nil {
-        t.Error("expected an error")
-    }
-}
-
-func TestPeerMonitorAlerts(t *testing.T) {
-    ctx := context.Background()
-    pm := testPeerMonitor(t)
-    defer pm.Shutdown(ctx)
-    mf := newMetricFactory()
-
-    mtr := mf.newMetric("test", test.PeerID1)
-    mtr.SetTTL(0)
-    pm.LogMetric(ctx, mtr)
-    time.Sleep(time.Second)
-    timeout := time.NewTimer(time.Second * 5)
-
-    // it should alert at least twice; alerts re-occur.
-    for i := 0; i < 2; i++ {
-        select {
-        case <-timeout.C:
-            t.Fatal("should have thrown an alert by now")
-        case alrt := <-pm.Alerts():
-            if alrt.MetricName != "test" {
-                t.Error("Alert should be for test")
-            }
-            if alrt.Peer != test.PeerID1 {
-                t.Error("Peer should be TestPeerID1")
-            }
-        }
-    }
-}

View File

@@ -56,7 +56,7 @@ func (cfg *Config) ApplyEnvVars() error {
 // at least in appearance.
 func (cfg *Config) Validate() error {
     if cfg.CheckInterval <= 0 {
-        return errors.New("basic.check_interval too low")
+        return errors.New("pubsubmon.check_interval too low")
     }
     return nil
 }
@@ -67,7 +67,7 @@ func (cfg *Config) LoadJSON(raw []byte) error {
     jcfg := &jsonConfig{}
     err := json.Unmarshal(raw, jcfg)
     if err != nil {
-        logger.Error("Error unmarshaling basic monitor config")
+        logger.Error("Error unmarshaling pubsubmon monitor config")
         return err
     }
View File

@@ -82,9 +82,6 @@
         }
     },
     "monitor": {
-        "monbasic": {
-            "check_interval": "15s"
-        },
         "pubsubmon": {
             "check_interval": "15s"
         }

View File

@@ -79,9 +79,6 @@
         }
     },
     "monitor": {
-        "monbasic": {
-            "check_interval": "15s"
-        },
         "pubsubmon": {
             "check_interval": "15s"
         }