add Alerts measure
License: MIT Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
This commit is contained in:
parent
b0dbcbaa8d
commit
3c09ebcc71
|
@ -156,7 +156,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke
|
|||
raftcon, _ := raft.NewConsensus(host, raftCfg, store, false)
|
||||
|
||||
psmonCfg.CheckInterval = 2 * time.Second
|
||||
mon, err := pubsubmon.New(psmonCfg, pubsub, raftcon.Peers)
|
||||
mon, err := pubsubmon.New(context.Background(), psmonCfg, pubsub, raftcon.Peers)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
|
||||
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
|
||||
"github.com/ipfs/ipfs-cluster/pstoremgr"
|
||||
"go.opencensus.io/tag"
|
||||
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
host "github.com/libp2p/go-libp2p-host"
|
||||
|
@ -103,6 +104,8 @@ func createCluster(
|
|||
host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgs.clusterCfg)
|
||||
checkErr("creating libP2P Host", err)
|
||||
|
||||
ctx = tag.New(ctx, tag.Upsert(observations.HostKey, host.ID().Pretty()))
|
||||
|
||||
peerstoreMgr := pstoremgr.New(host, cfgs.clusterCfg.GetPeerstorePath())
|
||||
// Import peers but do not connect. We cannot connect to peers until
|
||||
// everything has been created (dht, pubsub, bitswap). Otherwise things
|
||||
|
|
|
@ -187,7 +187,7 @@ func createComponents(t *testing.T, host host.Host, pubsub *pubsub.PubSub, dht *
|
|||
if consensus == "raft" {
|
||||
peersF = cons.Peers
|
||||
}
|
||||
mon, err := pubsubmon.New(psmonCfg, pubsub, peersF)
|
||||
mon, err := pubsubmon.New(ctx, psmonCfg, pubsub, peersF)
|
||||
checkErr(t, err)
|
||||
|
||||
tracer, err := observations.SetupTracing(tracingCfg)
|
||||
|
|
|
@ -297,7 +297,7 @@ func (ipfs *Connector) Pin(ctx context.Context, hash cid.Cid, maxDepth int) erro
|
|||
return err
|
||||
}
|
||||
logger.Debugf("Refs for %s sucessfully fetched", hash)
|
||||
stats.Record(ctx, observations.PinCountMetric.M(1))
|
||||
stats.Record(ctx, observations.Pins.M(1))
|
||||
}
|
||||
|
||||
path := fmt.Sprintf("pin/add?arg=%s&%s", hash, pinArgs)
|
||||
|
@ -329,7 +329,7 @@ func (ipfs *Connector) Unpin(ctx context.Context, hash cid.Cid) error {
|
|||
return err
|
||||
}
|
||||
logger.Info("IPFS Unpin request succeeded:", hash)
|
||||
stats.Record(ctx, observations.PinCountMetric.M(-1))
|
||||
stats.Record(ctx, observations.Pins.M(-1))
|
||||
}
|
||||
|
||||
logger.Debug("IPFS object is already unpinned: ", hash)
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/ipfs/ipfs-cluster/api"
|
||||
"github.com/ipfs/ipfs-cluster/observations"
|
||||
"go.opencensus.io/stats"
|
||||
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
)
|
||||
|
@ -19,6 +21,7 @@ var ErrAlertChannelFull = errors.New("alert channel is full")
|
|||
// Checker provides utilities to find expired metrics
|
||||
// for a given peerset and send alerts if it proceeds to do so.
|
||||
type Checker struct {
|
||||
ctx context.Context
|
||||
alertCh chan *api.Alert
|
||||
metrics *Store
|
||||
threshold float64
|
||||
|
@ -30,8 +33,9 @@ type Checker struct {
|
|||
// The greater the threshold value the more leniency is granted.
|
||||
//
|
||||
// A value between 2.0 and 4.0 is suggested for the threshold.
|
||||
func NewChecker(metrics *Store, threshold float64) *Checker {
|
||||
func NewChecker(ctx context.Context, metrics *Store, threshold float64) *Checker {
|
||||
return &Checker{
|
||||
ctx: ctx,
|
||||
alertCh: make(chan *api.Alert, AlertChannelCap),
|
||||
metrics: metrics,
|
||||
threshold: threshold,
|
||||
|
@ -92,6 +96,7 @@ func (mc *Checker) alert(pid peer.ID, metricName string) error {
|
|||
}
|
||||
select {
|
||||
case mc.alertCh <- alrt:
|
||||
stats.Record(mc.ctx, observations.Alerts.M(1))
|
||||
default:
|
||||
return ErrAlertChannelFull
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ import (
|
|||
|
||||
func TestCheckPeers(t *testing.T) {
|
||||
metrics := NewStore()
|
||||
checker := NewChecker(metrics, 2.0)
|
||||
checker := NewChecker(context.Background(), metrics, 2.0)
|
||||
|
||||
metr := &api.Metric{
|
||||
Name: "ping",
|
||||
|
@ -63,7 +63,7 @@ func TestChecker_Watch(t *testing.T) {
|
|||
defer cancel()
|
||||
|
||||
metrics := NewStore()
|
||||
checker := NewChecker(metrics, 2.0)
|
||||
checker := NewChecker(context.Background(), metrics, 2.0)
|
||||
|
||||
metr := &api.Metric{
|
||||
Name: "test",
|
||||
|
@ -91,7 +91,7 @@ func TestChecker_Watch(t *testing.T) {
|
|||
func TestChecker_Failed(t *testing.T) {
|
||||
t.Run("standard failure check", func(t *testing.T) {
|
||||
metrics := NewStore()
|
||||
checker := NewChecker(metrics, 2.0)
|
||||
checker := NewChecker(context.Background(), metrics, 2.0)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
metrics.Add(makePeerMetric(test.PeerID1, "1"))
|
||||
|
@ -128,7 +128,7 @@ func TestThresholdValues(t *testing.T) {
|
|||
phivs := make([]timeseries, 0)
|
||||
for _, v := range thresholds {
|
||||
metrics := NewStore()
|
||||
checker := NewChecker(metrics, v)
|
||||
checker := NewChecker(context.Background(), metrics, v)
|
||||
tsName := fmt.Sprintf("%f", v)
|
||||
distTS := newTS(tsName)
|
||||
phivTS := newTS(tsName)
|
||||
|
@ -188,7 +188,7 @@ func TestThresholdValues(t *testing.T) {
|
|||
phivs := make([]timeseries, 0)
|
||||
for _, v := range thresholds {
|
||||
metrics := NewStore()
|
||||
checker := NewChecker(metrics, v)
|
||||
checker := NewChecker(context.Background(), metrics, v)
|
||||
tsName := fmt.Sprintf("%f", v)
|
||||
distTS := newTS(tsName)
|
||||
phivTS := newTS(tsName)
|
||||
|
@ -249,7 +249,7 @@ func TestThresholdValues(t *testing.T) {
|
|||
phivs := make([]timeseries, 0)
|
||||
for _, v := range thresholds {
|
||||
metrics := NewStore()
|
||||
checker := NewChecker(metrics, v)
|
||||
checker := NewChecker(context.Background(), metrics, v)
|
||||
tsName := fmt.Sprintf("%f", v)
|
||||
distTS := newTS(tsName)
|
||||
phivTS := newTS(tsName)
|
||||
|
|
|
@ -56,6 +56,7 @@ type PeersFunc func(context.Context) ([]peer.ID, error)
|
|||
// PeersFunc. The PeersFunc can be nil. In this case, no metric filtering is
|
||||
// done based on peers (any peer is considered part of the peerset).
|
||||
func New(
|
||||
ctx context.Context,
|
||||
cfg *Config,
|
||||
psub *pubsub.PubSub,
|
||||
peers PeersFunc,
|
||||
|
@ -65,10 +66,10 @@ func New(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
mtrs := metrics.NewStore()
|
||||
checker := metrics.NewChecker(mtrs, cfg.FailureThreshold)
|
||||
checker := metrics.NewChecker(ctx, mtrs, cfg.FailureThreshold)
|
||||
|
||||
subscription, err := psub.Subscribe(PubsubTopic)
|
||||
if err != nil {
|
||||
|
|
|
@ -17,49 +17,58 @@ var (
|
|||
bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576)
|
||||
)
|
||||
|
||||
// opencensus attributes
|
||||
// attributes
|
||||
var (
|
||||
ClientIPAttribute = "http.client.ip"
|
||||
)
|
||||
|
||||
// opencensus keys
|
||||
// keys
|
||||
var (
|
||||
HostKey = makeKey("host")
|
||||
)
|
||||
|
||||
// opencensus metrics
|
||||
// metrics
|
||||
var (
|
||||
// PinCountMetric counts the number of pins ipfs-cluster is tracking.
|
||||
PinCountMetric = stats.Int64("cluster/pin_count", "Number of pins", stats.UnitDimensionless)
|
||||
// TrackerPinCountMetric counts the number of pins the local peer is tracking.
|
||||
TrackerPinCountMetric = stats.Int64("pintracker/pin_count", "Number of pins", stats.UnitDimensionless)
|
||||
// PeerCountMetric counts the number of ipfs-cluster peers are currently in the cluster.
|
||||
PeerCountMetric = stats.Int64("cluster/peer_count", "Number of cluster peers", stats.UnitDimensionless)
|
||||
// Pins counts the number of pins ipfs-cluster is tracking.
|
||||
Pins = stats.Int64("cluster/pin_count", "Number of pins", stats.UnitDimensionless)
|
||||
// TrackerPins counts the number of pins the local peer is tracking.
|
||||
TrackerPins = stats.Int64("pintracker/pin_count", "Number of pins", stats.UnitDimensionless)
|
||||
// Peers counts the number of ipfs-cluster peers are currently in the cluster.
|
||||
Peers = stats.Int64("cluster/peers", "Number of cluster peers", stats.UnitDimensionless)
|
||||
// Alerts is the number of alerts that have been sent due to peers not sending "ping" heartbeats in time.
|
||||
Alerts = stats.Int64("cluster/alerts", "Number of alerts triggered", stats.UnitDimensionless)
|
||||
)
|
||||
|
||||
// opencensus views, which is just the aggregation of the metrics
|
||||
// views, which is just the aggregation of the metrics
|
||||
var (
|
||||
PinCountView = &view.View{
|
||||
Measure: PinCountMetric,
|
||||
Aggregation: view.Sum(),
|
||||
PinsView = &view.View{
|
||||
Measure: Pins,
|
||||
Aggregation: view.LastValue(),
|
||||
}
|
||||
|
||||
TrackerPinCountView = &view.View{
|
||||
Measure: TrackerPinCountMetric,
|
||||
TrackerPinsView = &view.View{
|
||||
Measure: TrackerPins,
|
||||
TagKeys: []tag.Key{HostKey},
|
||||
Aggregation: view.Sum(),
|
||||
Aggregation: view.LastValue(),
|
||||
}
|
||||
|
||||
PeerCountView = &view.View{
|
||||
Measure: PeerCountMetric,
|
||||
PeersView = &view.View{
|
||||
Measure: Peers,
|
||||
TagKeys: []tag.Key{HostKey},
|
||||
Aggregation: view.Count(),
|
||||
Aggregation: view.LastValue(),
|
||||
}
|
||||
|
||||
AlertsView = &view.View{
|
||||
Measure: Alerts,
|
||||
TagKeys: []tag.Key{HostKey},
|
||||
Aggregation: view.Sum(),
|
||||
}
|
||||
|
||||
DefaultViews = []*view.View{
|
||||
PinCountView,
|
||||
TrackerPinCountView,
|
||||
PeerCountView,
|
||||
PinsView,
|
||||
TrackerPinsView,
|
||||
PeersView,
|
||||
AlertsView,
|
||||
}
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user