From de40b2cd23dfeb9f3f63120d01510bd9b02230f5 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Tue, 26 Apr 2022 15:13:35 +0200 Subject: [PATCH] pintracker: metrics: convert pinning/queued/error metrics to gauges We were currently tracking this metrics as a counter (SUM). The number is good, but conceptually this is more a gauge (LastValue), given it can go down. Thus we switch it by tracking the aggregation numbers directy in the operation tracker. --- observations/metrics.go | 6 +++--- pintracker/optracker/operation.go | 14 +++++++------ pintracker/optracker/operation_test.go | 2 +- pintracker/optracker/operationtracker.go | 25 +++++++++++++++++------- 4 files changed, 30 insertions(+), 17 deletions(-) diff --git a/observations/metrics.go b/observations/metrics.go index b2adbffd..04bfed87 100644 --- a/observations/metrics.go +++ b/observations/metrics.go @@ -54,19 +54,19 @@ var ( PinsQueuedView = &view.View{ Measure: PinsQueued, //TagKeys: []tag.Key{HostKey}, - Aggregation: view.Sum(), + Aggregation: view.LastValue(), } PinsPinningView = &view.View{ Measure: PinsPinning, //TagKeys: []tag.Key{HostKey}, - Aggregation: view.Sum(), + Aggregation: view.LastValue(), } PinsPinErrorView = &view.View{ Measure: PinsPinError, //TagKeys: []tag.Key{HostKey}, - Aggregation: view.Sum(), + Aggregation: view.LastValue(), } DefaultViews = []*view.View{ diff --git a/pintracker/optracker/operation.go b/pintracker/optracker/operation.go index a976460a..0b9d6e3b 100644 --- a/pintracker/optracker/operation.go +++ b/pintracker/optracker/operation.go @@ -54,6 +54,8 @@ type Operation struct { ctx context.Context cancel func() + tracker *OperationTracker + // RO fields opType OperationType pin api.Pin @@ -67,8 +69,8 @@ type Operation struct { ts time.Time } -// NewOperation creates a new Operation. -func NewOperation(ctx context.Context, pin api.Pin, typ OperationType, ph Phase) *Operation { +// newOperation creates a new Operation. +func newOperation(ctx context.Context, pin api.Pin, typ OperationType, ph Phase, tracker *OperationTracker) *Operation { ctx, span := trace.StartSpan(ctx, "optracker/NewOperation") defer span.End() @@ -137,14 +139,14 @@ func (op *Operation) Phase() Phase { // SetPhase changes the Phase and updates the timestamp. func (op *Operation) SetPhase(ph Phase) { _, span := trace.StartSpan(op.ctx, "optracker/SetPhase") - recordMetric(op, -1) + op.tracker.recordMetric(op, -1) op.mu.Lock() { op.phase = ph op.ts = time.Now() } op.mu.Unlock() - recordMetric(op, 1) + op.tracker.recordMetric(op, 1) span.End() } @@ -196,7 +198,7 @@ func (op *Operation) Error() string { // an error message. It updates the timestamp. func (op *Operation) SetError(err error) { _, span := trace.StartSpan(op.ctx, "optracker/SetError") - recordMetric(op, -1) + op.tracker.recordMetric(op, -1) op.mu.Lock() { op.phase = PhaseError @@ -204,7 +206,7 @@ func (op *Operation) SetError(err error) { op.ts = time.Now() } op.mu.Unlock() - recordMetric(op, 1) + op.tracker.recordMetric(op, 1) span.End() } diff --git a/pintracker/optracker/operation_test.go b/pintracker/optracker/operation_test.go index 7d1cdd9f..7c9bd80d 100644 --- a/pintracker/optracker/operation_test.go +++ b/pintracker/optracker/operation_test.go @@ -12,7 +12,7 @@ import ( func TestOperation(t *testing.T) { tim := time.Now().Add(-2 * time.Second) - op := NewOperation(context.Background(), api.PinCid(test.Cid1), OperationUnpin, PhaseQueued) + op := newOperation(context.Background(), api.PinCid(test.Cid1), OperationUnpin, PhaseQueued, nil) if !op.Cid().Equals(test.Cid1) { t.Error("bad cid") } diff --git a/pintracker/optracker/operationtracker.go b/pintracker/optracker/operationtracker.go index 0859f4b5..d8d068bc 100644 --- a/pintracker/optracker/operationtracker.go +++ b/pintracker/optracker/operationtracker.go @@ -10,6 +10,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "time" "github.com/ipfs/ipfs-cluster/api" @@ -32,6 +33,10 @@ type OperationTracker struct { mu sync.RWMutex operations map[api.Cid]*Operation + + pinningCount int64 + pinErrorCount int64 + pinQueuedCount int64 } func (opt *OperationTracker) String() string { @@ -82,11 +87,11 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin, if op.Type() == typ && op.Phase() != PhaseError && op.Phase() != PhaseDone { return nil // an ongoing operation of the same sign exists } - recordMetric(op, -1) + opt.recordMetric(op, -1) op.Cancel() // cancel ongoing operation and replace it } - op2 := NewOperation(ctx, pin, typ, ph) + op2 := newOperation(ctx, pin, typ, ph, opt) if ok && op.Type() == typ { // Carry over the attempt count when doing an operation of the // same type. The old operation exists and was cancelled. @@ -94,7 +99,7 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin, } logger.Debugf("'%s' on cid '%s' has been created with phase '%s'", typ, pin.Cid, ph) opt.operations[pin.Cid] = op2 - recordMetric(op2, 1) + opt.recordMetric(op2, 1) return op2 } @@ -343,15 +348,21 @@ func initializeMetrics(ctx context.Context) { stats.Record(ctx, observations.PinsPinning.M(0)) } -func recordMetric(op *Operation, val int64) { +func (opt *OperationTracker) recordMetric(op *Operation, val int64) { + if opt == nil { + return + } if op.Type() == OperationPin { switch op.Phase() { case PhaseError: - stats.Record(op.Context(), observations.PinsPinError.M(val)) + pinErrors := atomic.AddInt64(&opt.pinErrorCount, val) + stats.Record(op.Context(), observations.PinsPinError.M(pinErrors)) case PhaseQueued: - stats.Record(op.Context(), observations.PinsQueued.M(val)) + pinQueued := atomic.AddInt64(&opt.pinQueuedCount, val) + stats.Record(op.Context(), observations.PinsQueued.M(pinQueued)) case PhaseInProgress: - stats.Record(op.Context(), observations.PinsPinning.M(val)) + pinning := atomic.AddInt64(&opt.pinningCount, val) + stats.Record(op.Context(), observations.PinsPinning.M(pinning)) case PhaseDone: // we have no metric to log anything }