metrics: track total pins, queued, pinning, pin error.

This fixes #1470 and #1187.
Author: Hector Sanjuan
Date:   2022-04-22 15:26:40 +02:00
commit 3169fba9d1 (parent 4b351cad47)
13 changed files with 113 additions and 58 deletions

View File

@@ -276,6 +276,7 @@ func (css *Consensus) setup() {
 	css.crdt = crdt
 	clusterState, err := dsstate.New(
+		css.ctx,
 		css.crdt,
 		// unsure if we should set something else but crdt is already
 		// namespaced and this would only namespace the keys, which only
@@ -290,6 +291,7 @@ func (css *Consensus) setup() {
 	css.state = clusterState
 	batchingState, err := dsstate.NewBatching(
+		css.ctx,
 		css.crdt,
 		"",
 		dsstate.DefaultHandle(),
@@ -663,5 +665,5 @@ func OfflineState(cfg *Config, store ds.Datastore) (state.BatchingState, error)
 	if err != nil {
 		return nil, err
 	}
-	return dsstate.NewBatching(crdt, "", dsstate.DefaultHandle())
+	return dsstate.NewBatching(context.Background(), crdt, "", dsstate.DefaultHandle())
 }
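Note: dsstate.New and dsstate.NewBatching now take a context.Context as their first argument, which the state uses to record the initial total-pins metric at construction time. A minimal sketch of the new call shape (the in-memory datastore is just an example, as in the test files further down):

    ctx := context.Background()
    st, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle())
    if err != nil {
        // handle error
    }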

View File

@@ -71,28 +71,30 @@ func NewConsensus(
 	if err != nil {
 		return nil, err
 	}
+	ctx, cancel := context.WithCancel(context.Background())

 	logger.Debug("starting Consensus and waiting for a leader...")
 	baseOp := &LogOp{tracing: cfg.Tracing}
 	state, err := dsstate.New(
+		ctx,
 		store,
 		cfg.DatastoreNamespace,
 		dsstate.DefaultHandle(),
 	)
 	if err != nil {
+		cancel()
 		return nil, err
 	}
 	consensus := libp2praft.NewOpLog(state, baseOp)
 	raft, err := newRaftWrapper(host, cfg, consensus.FSM(), staging)
 	if err != nil {
 		logger.Error("error creating raft: ", err)
+		cancel()
 		return nil, err
 	}
 	actor := libp2praft.NewActor(raft.raft)
 	consensus.SetActor(actor)

-	ctx, cancel := context.WithCancel(context.Background())
-
 	cc := &Consensus{
 		ctx:    ctx,
 		cancel: cancel,
@@ -550,7 +552,7 @@ func OfflineState(cfg *Config, store ds.Datastore) (state.State, error) {
 		return nil, err
 	}

-	st, err := dsstate.New(store, cfg.DatastoreNamespace, dsstate.DefaultHandle())
+	st, err := dsstate.New(context.Background(), store, cfg.DatastoreNamespace, dsstate.DefaultHandle())
 	if err != nil {
 		return nil, err
 	}
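The ctx/cancel pair is now created before dsstate.New so the same context reaches the state constructor, and the added cancel() calls on the error paths release the derived context when NewConsensus bails out early. The pattern in isolation (a sketch, not the full function):

    ctx, cancel := context.WithCancel(context.Background())
    state, err := dsstate.New(ctx, store, namespace, dsstate.DefaultHandle())
    if err != nil {
        cancel() // nothing else will ever cancel ctx on this path
        return nil, err
    }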

View File

@@ -316,7 +316,7 @@ func TestRaftLatestSnapshot(t *testing.T) {
 	}

 	// Call raft.LastState and ensure we get the correct state
-	snapState, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
+	snapState, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle())
 	if err != nil {
 		t.Fatal(err)
 	}

View File

@@ -21,7 +21,7 @@ func TestApplyToPin(t *testing.T) {
 	defer cleanRaft(1)
 	defer cc.Shutdown(ctx)

-	st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
+	st, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -54,7 +54,7 @@ func TestApplyToUnpin(t *testing.T) {
 	defer cleanRaft(1)
 	defer cc.Shutdown(ctx)

-	st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
+	st, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle())
 	if err != nil {
 		t.Fatal(err)
 	}

View File

@@ -7,12 +7,8 @@ import (
 	"time"

 	"github.com/ipfs/ipfs-cluster/api"
-	"github.com/ipfs/ipfs-cluster/observations"

 	peer "github.com/libp2p/go-libp2p-core/peer"
-
-	"go.opencensus.io/stats"
-	"go.opencensus.io/tag"
 )

 // AlertChannelCap specifies how much buffer the alerts channel has.
@@ -135,11 +131,6 @@ func (mc *Checker) alert(pid peer.ID, metricName string) error {
 	}
 	select {
 	case mc.alertCh <- alrt:
-		stats.RecordWithTags(
-			mc.ctx,
-			[]tag.Mutator{tag.Upsert(observations.RemotePeerKey, pid.Pretty())},
-			observations.Alerts.M(1),
-		)
 	default:
 		return ErrAlertChannelFull
 	}

View File

@@ -12,10 +12,10 @@ import (
 var logger = logging.Logger("observations")

 var (
 	// taken from ocgrpc (https://github.com/census-instrumentation/opencensus-go/blob/master/plugin/ocgrpc/stats_common.go)
 	// latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
 	// bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576)
-	messageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
+	// messageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
 )

 // attributes
@@ -31,47 +31,49 @@
 // metrics
 var (
-	// Pins counts the number of pins ipfs-cluster is tracking.
-	Pins = stats.Int64("cluster/pin_count", "Number of pins", stats.UnitDimensionless)
-	// TrackerPins counts the number of pins the local peer is tracking.
-	TrackerPins = stats.Int64("pintracker/pin_count", "Number of pins", stats.UnitDimensionless)
-	// Peers counts the number of ipfs-cluster peers are currently in the cluster.
-	Peers = stats.Int64("cluster/peers", "Number of cluster peers", stats.UnitDimensionless)
-	// Alerts is the number of alerts that have been sent due to peers not sending "ping" heartbeats in time.
-	Alerts = stats.Int64("cluster/alerts", "Number of alerts triggered", stats.UnitDimensionless)
+	// This metric is managed in state/dsstate.
+	Pins = stats.Int64("pins", "Total number of pins", stats.UnitDimensionless)
+
+	// These metrics are managed by the pintracker/optracker module.
+	PinsQueued = stats.Int64("pins/pin_queued", "Number of pins queued for pinning", stats.UnitDimensionless)
+	PinsPinning = stats.Int64("pins/pinning", "Number of pins currently pinning", stats.UnitDimensionless)
+	PinsPinError = stats.Int64("pins/pin_error", "Number of pins in pin_error state", stats.UnitDimensionless)
 )

 // views, which is just the aggregation of the metrics
 var (
 	PinsView = &view.View{
 		Measure:     Pins,
-		TagKeys:     []tag.Key{HostKey},
+		// This would add a tag to the metric if a value for this key
+		// is present in the context when recording the observation.
+		//TagKeys:     []tag.Key{HostKey},
 		Aggregation: view.LastValue(),
 	}
-	TrackerPinsView = &view.View{
-		Measure:     TrackerPins,
-		TagKeys:     []tag.Key{HostKey},
-		Aggregation: view.LastValue(),
-	}
-	PeersView = &view.View{
-		Measure:     Peers,
-		TagKeys:     []tag.Key{HostKey},
-		Aggregation: view.LastValue(),
-	}
-	AlertsView = &view.View{
-		Measure:     Alerts,
-		TagKeys:     []tag.Key{HostKey, RemotePeerKey},
-		Aggregation: messageCountDistribution,
-	}
+	PinsQueuedView = &view.View{
+		Measure:     PinsQueued,
+		//TagKeys:     []tag.Key{HostKey},
+		Aggregation: view.Sum(),
+	}
+	PinsPinningView = &view.View{
+		Measure:     PinsPinning,
+		//TagKeys:     []tag.Key{HostKey},
+		Aggregation: view.Sum(),
+	}
+	PinsPinErrorView = &view.View{
+		Measure:     PinsPinError,
+		//TagKeys:     []tag.Key{HostKey},
+		Aggregation: view.Sum(),
+	}

 	DefaultViews = []*view.View{
 		PinsView,
-		TrackerPinsView,
-		PeersView,
-		AlertsView,
+		PinsQueuedView,
+		PinsPinningView,
+		PinsPinErrorView,
 	}
 )
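The aggregation choice matters here: Pins keeps view.LastValue() because dsstate records the absolute total on every change, while the three per-phase metrics use view.Sum() and are fed +1/-1 deltas, so each view's running sum equals the number of pins currently in that phase. A hypothetical recording sequence, using the measures defined above:

    stats.Record(ctx, observations.PinsQueued.M(1))  // a pin enters the queue
    stats.Record(ctx, observations.PinsQueued.M(-1)) // it leaves the queue...
    stats.Record(ctx, observations.PinsPinning.M(1)) // ...and starts pinning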

View File

@@ -137,12 +137,14 @@ func (op *Operation) Phase() Phase {
 // SetPhase changes the Phase and updates the timestamp.
 func (op *Operation) SetPhase(ph Phase) {
 	_, span := trace.StartSpan(op.ctx, "optracker/SetPhase")
+	recordMetric(op, -1)
 	op.mu.Lock()
 	{
 		op.phase = ph
 		op.ts = time.Now()
 	}
 	op.mu.Unlock()
+	recordMetric(op, 1)
 	span.End()
 }
@@ -194,6 +196,7 @@ func (op *Operation) Error() string {
 // an error message. It updates the timestamp.
 func (op *Operation) SetError(err error) {
 	_, span := trace.StartSpan(op.ctx, "optracker/SetError")
+	recordMetric(op, -1)
 	op.mu.Lock()
 	{
 		op.phase = PhaseError
@@ -201,6 +204,7 @@ func (op *Operation) SetError(err error) {
 		op.ts = time.Now()
 	}
 	op.mu.Unlock()
+	recordMetric(op, 1)
 	span.End()
 }
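recordMetric is called twice around each mutation: with -1 while the operation still reports its old phase, and with +1 once the new phase is set, so every per-phase counter stays in step with the operations actually in that phase. Restated as a comment-level sketch of the ordering:

    recordMetric(op, -1) // op.Phase() still returns the old phase: decrement its counter
    // ...mutate op.phase under the lock...
    recordMetric(op, 1)  // op.Phase() now returns the new phase: increment its counter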

View File

@@ -13,10 +13,12 @@ import (
 	"time"

 	"github.com/ipfs/ipfs-cluster/api"
+	"github.com/ipfs/ipfs-cluster/observations"

 	logging "github.com/ipfs/go-log/v2"
 	peer "github.com/libp2p/go-libp2p-core/peer"

+	"go.opencensus.io/stats"
 	"go.opencensus.io/trace"
 )
@@ -52,6 +54,8 @@ func (opt *OperationTracker) String() string {
 // NewOperationTracker creates a new OperationTracker.
 func NewOperationTracker(ctx context.Context, pid peer.ID, peerName string) *OperationTracker {
+	initializeMetrics(ctx)
+
 	return &OperationTracker{
 		ctx:      ctx,
 		pid:      pid,
@@ -78,6 +82,7 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin,
 		if op.Type() == typ && op.Phase() != PhaseError && op.Phase() != PhaseDone {
 			return nil // an ongoing operation of the same sign exists
 		}
+		recordMetric(op, -1)
 		op.Cancel() // cancel ongoing operation and replace it
 	}
@@ -89,6 +94,7 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin,
 	}
 	logger.Debugf("'%s' on cid '%s' has been created with phase '%s'", typ, pin.Cid, ph)
 	opt.operations[pin.Cid] = op2
+	recordMetric(op2, 1)
 	return op2
 }
@@ -121,6 +127,7 @@ func (opt *OperationTracker) Status(ctx context.Context, c api.Cid) (api.Tracker
 // is PhaseDone. Any other phases are considered in-flight and not touched.
 // For things already in error, the error message is updated.
 // Remote pins are ignored too.
+// Only used in tests right now.
 func (opt *OperationTracker) SetError(ctx context.Context, c api.Cid, err error) {
 	opt.mu.Lock()
 	defer opt.mu.Unlock()
@@ -287,6 +294,7 @@ func (opt *OperationTracker) Filter(ctx context.Context, ipfs api.IPFSID, filter
 // with the matching filter. Note, only supports
 // filters of type OperationType or Phase, any other type
 // will result in a nil slice being returned.
+// Only used in tests right now.
 func (opt *OperationTracker) filterOps(ctx context.Context, filters ...interface{}) []*Operation {
 	var fltops []*Operation
 	opt.mu.RLock()
@@ -328,3 +336,24 @@ func filter(ctx context.Context, in, out map[api.Cid]*Operation, filter interfac
 		}
 	}
 }
+
+func initializeMetrics(ctx context.Context) {
+	stats.Record(ctx, observations.PinsPinError.M(0))
+	stats.Record(ctx, observations.PinsQueued.M(0))
+	stats.Record(ctx, observations.PinsPinning.M(0))
+}
+
+func recordMetric(op *Operation, val int64) {
+	if op.Type() == OperationPin {
+		switch op.Phase() {
+		case PhaseError:
+			stats.Record(op.Context(), observations.PinsPinError.M(val))
+		case PhaseQueued:
+			stats.Record(op.Context(), observations.PinsQueued.M(val))
+		case PhaseInProgress:
+			stats.Record(op.Context(), observations.PinsPinning.M(val))
+		case PhaseDone:
+			// we have no metric to log anything
+		}
+	}
+}
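Note that recordMetric only counts pin operations (unpins have no per-phase metric), and initializeMetrics records zeros so the views report a value before the first real observation. A hypothetical walk-through of how a tracked pin drives the counters, using the functions above:

    opt := NewOperationTracker(ctx, pid, "peer1")                    // all three metrics start at 0
    op := opt.TrackNewOperation(ctx, pin, OperationPin, PhaseQueued) // pin_queued +1
    op.SetPhase(PhaseInProgress)                                     // pin_queued -1, pinning +1
    op.SetPhase(PhaseDone)                                           // pinning -1; done has no metric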

View File

@@ -40,8 +40,8 @@ var sortPinInfoByCid = func(p []api.PinInfo) {
 // - Cid2 - weird / remote // replication factor set to 0, no allocations
 // - Cid3 - remote - this pin is on ipfs
 // - Cid4 - pin everywhere - this pin is not on ipfs
-func prefilledState(context.Context) (state.ReadOnly, error) {
-	st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
+func prefilledState(ctx context.Context) (state.ReadOnly, error) {
+	st, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle())
 	if err != nil {
 		return nil, err
 	}
@@ -59,7 +59,6 @@ func prefilledState(context.Context) (state.ReadOnly, error) {
 		api.PinWithOpts(test.Cid4, pinOpts),
 	}

-	ctx := context.Background()
 	for _, pin := range pins {
 		err = st.Add(ctx, pin)
 		if err != nil {

View File

@@ -121,7 +121,7 @@ func getStateFunc(t testing.TB, items ...api.Pin) func(context.Context) (state.R
 	t.Helper()
 	ctx := context.Background()

-	st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
+	st, err := dsstate.New(ctx, inmem.New(), "", dsstate.DefaultHandle())
 	if err != nil {
 		t.Fatal(err)
 	}

View File

@@ -6,8 +6,10 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"sync/atomic"

 	"github.com/ipfs/ipfs-cluster/api"
+	"github.com/ipfs/ipfs-cluster/observations"
 	"github.com/ipfs/ipfs-cluster/state"

 	ds "github.com/ipfs/go-datastore"
@@ -16,6 +18,7 @@ import (
 	logging "github.com/ipfs/go-log/v2"
 	codec "github.com/ugorji/go/codec"

+	"go.opencensus.io/stats"
 	trace "go.opencensus.io/trace"
 )
@@ -34,6 +37,8 @@ type State struct {
 	codecHandle codec.Handle
 	namespace   ds.Key
 	// version     int
+
+	totalPins int64
 }

 // DefaultHandle returns the codec handler of choice (Msgpack).
@@ -49,7 +54,7 @@ func DefaultHandle() codec.Handle {
 //
 // The Handle controls options for the serialization of the full state
 // (marshaling/unmarshaling).
-func New(dstore ds.Datastore, namespace string, handle codec.Handle) (*State, error) {
+func New(ctx context.Context, dstore ds.Datastore, namespace string, handle codec.Handle) (*State, error) {
 	if handle == nil {
 		handle = DefaultHandle()
 	}
@@ -59,21 +64,34 @@ func New(dstore ds.Datastore, namespace string, handle codec.Handle) (*State, er
 		dsWrite:     dstore,
 		codecHandle: handle,
 		namespace:   ds.NewKey(namespace),
+		totalPins:   0,
 	}
+
+	stats.Record(ctx, observations.Pins.M(0))
+
 	return st, nil
 }

 // Add adds a new Pin or replaces an existing one.
-func (st *State) Add(ctx context.Context, c api.Pin) error {
+func (st *State) Add(ctx context.Context, c api.Pin) (err error) {
 	_, span := trace.StartSpan(ctx, "state/dsstate/Add")
 	defer span.End()

 	ps, err := st.serializePin(c)
 	if err != nil {
-		return err
+		return
 	}
-	return st.dsWrite.Put(ctx, st.key(c.Cid), ps)
+	has, _ := st.Has(ctx, c.Cid)
+	defer func() {
+		if !has && err == nil {
+			total := atomic.AddInt64(&st.totalPins, 1)
+			stats.Record(ctx, observations.Pins.M(total))
+		}
+	}()
+
+	err = st.dsWrite.Put(ctx, st.key(c.Cid), ps)
+	return
 }

 // Rm removes an existing Pin. It is a no-op when the
@@ -86,6 +104,11 @@ func (st *State) Rm(ctx context.Context, c api.Cid) error {
 	if err == ds.ErrNotFound {
 		return nil
 	}
+
+	if err == nil {
+		total := atomic.AddInt64(&st.totalPins, -1)
+		stats.Record(ctx, observations.Pins.M(total))
+	}
 	return err
 }
@@ -140,7 +163,7 @@ func (st *State) List(ctx context.Context, out chan<- api.Pin) error {
 	}
 	defer results.Close()

-	total := 0
+	var total int64
 	for r := range results.Next() {
 		// Abort if we shutdown.
 		select {
@@ -177,7 +200,8 @@
 	if total >= 500000 {
 		logger.Infof("Full pinset listing finished: %d pins", total)
 	}
-
+	atomic.StoreInt64(&st.totalPins, total)
+	stats.Record(ctx, observations.Pins.M(total))
 	return nil
 }
@@ -308,7 +332,7 @@ type BatchingState struct {
 //
 // The Handle controls options for the serialization of the full state
 // (marshaling/unmarshaling).
-func NewBatching(dstore ds.Batching, namespace string, handle codec.Handle) (*BatchingState, error) {
+func NewBatching(ctx context.Context, dstore ds.Batching, namespace string, handle codec.Handle) (*BatchingState, error) {
 	if handle == nil {
 		handle = DefaultHandle()
 	}
@@ -328,6 +352,8 @@ func NewBatching(dstore ds.Batching, namespace string, handle codec.Handle) (*Ba
 	bst := &BatchingState{}
 	bst.State = st
 	bst.batch = batch
+
+	stats.Record(ctx, observations.Pins.M(0))
 	return bst, nil
 }
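The cached totalPins counter is what keeps the Pins gauge cheap: Add checks Has first so only genuinely new pins increment it, Rm decrements only on a successful delete, and a full List overwrites it with the authoritative count via atomic.StoreInt64, correcting any drift. The core pattern, condensed (types as defined above):

    total := atomic.AddInt64(&st.totalPins, 1) // or -1 in Rm
    stats.Record(ctx, observations.Pins.M(total))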

View File

@@ -29,7 +29,7 @@ var c = api.Pin{
 func newState(t *testing.T) *State {
 	store := inmem.New()

-	ds, err := New(store, "", DefaultHandle())
+	ds, err := New(context.Background(), store, "", DefaultHandle())
 	if err != nil {
 		t.Fatal(err)
 	}

View File

@@ -110,7 +110,7 @@ type mockRepoGCResp struct {
 // NewIpfsMock returns a new mock.
 func NewIpfsMock(t *testing.T) *IpfsMock {
 	store := inmem.New()
-	st, err := dsstate.New(store, "", dsstate.DefaultHandle())
+	st, err := dsstate.New(context.Background(), store, "", dsstate.DefaultHandle())
 	if err != nil {
 		t.Fatal(err)
 	}