From 28c24931b63ebbbd37ead827b26d440457328d27 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 20 Jun 2022 22:03:03 +0200 Subject: [PATCH] pintracker: fix some races resulting in wrong metric counts I believe this fixes the issue with some metrics like pinning going into negative numbers occasionally. Fixes #1702. --- pintracker/optracker/operation.go | 14 +++++++---- pintracker/optracker/operationtracker.go | 30 +++++++++++++++++------- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/pintracker/optracker/operation.go b/pintracker/optracker/operation.go index 50224d04..5a0d5322 100644 --- a/pintracker/optracker/operation.go +++ b/pintracker/optracker/operation.go @@ -75,7 +75,7 @@ func newOperation(ctx context.Context, pin api.Pin, typ OperationType, ph Phase, defer span.End() ctx, cancel := context.WithCancel(ctx) - return &Operation{ + op := &Operation{ ctx: ctx, cancel: cancel, @@ -89,6 +89,8 @@ func newOperation(ctx context.Context, pin api.Pin, typ OperationType, ph Phase, ts: time.Now(), error: "", } + tracker.recordMetricUnsafe(op, 1) + return op } // String returns a string representation of an Operation. @@ -124,6 +126,7 @@ func (op *Operation) Context() context.Context { func (op *Operation) Cancel() { _, span := trace.StartSpan(op.ctx, "optracker/Cancel") op.cancel() + op.tracker.recordMetric(op, -1) span.End() } @@ -141,14 +144,15 @@ func (op *Operation) Phase() Phase { // SetPhase changes the Phase and updates the timestamp. func (op *Operation) SetPhase(ph Phase) { _, span := trace.StartSpan(op.ctx, "optracker/SetPhase") - op.tracker.recordMetric(op, -1) op.mu.Lock() { + op.tracker.recordMetricUnsafe(op, -1) op.phase = ph op.ts = time.Now() + op.tracker.recordMetricUnsafe(op, 1) } op.mu.Unlock() - op.tracker.recordMetric(op, 1) + span.End() } @@ -200,15 +204,15 @@ func (op *Operation) Error() string { // an error message. It updates the timestamp. func (op *Operation) SetError(err error) { _, span := trace.StartSpan(op.ctx, "optracker/SetError") - op.tracker.recordMetric(op, -1) op.mu.Lock() { + op.tracker.recordMetricUnsafe(op, -1) op.phase = PhaseError op.error = err.Error() op.ts = time.Now() + op.tracker.recordMetricUnsafe(op, 1) } op.mu.Unlock() - op.tracker.recordMetric(op, 1) span.End() } diff --git a/pintracker/optracker/operationtracker.go b/pintracker/optracker/operationtracker.go index ffca373c..663d32a5 100644 --- a/pintracker/optracker/operationtracker.go +++ b/pintracker/optracker/operationtracker.go @@ -83,11 +83,14 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin, defer opt.mu.Unlock() op, ok := opt.operations[pin.Cid] - if ok { // operation exists + if ok { // operation exists for the CID if op.Type() == typ && op.Phase() != PhaseError && op.Phase() != PhaseDone { - return nil // an ongoing operation of the same sign exists + // an ongoing operation of the same + // type. i.e. pinning, or queued. + return nil } - opt.recordMetric(op, -1) + // i.e. operations in error phase + // i.e. pin operations that need to be canceled for unpinning op.Cancel() // cancel ongoing operation and replace it } @@ -99,7 +102,6 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin, } logger.Debugf("'%s' on cid '%s' has been created with phase '%s'", typ, pin.Cid, ph) opt.operations[pin.Cid] = op2 - opt.recordMetric(op2, 1) return op2 } @@ -355,12 +357,13 @@ func initializeMetrics(ctx context.Context) { stats.Record(ctx, observations.PinsPinning.M(0)) } -func (opt *OperationTracker) recordMetric(op *Operation, val int64) { - if opt == nil { +func (opt *OperationTracker) recordMetricUnsafe(op *Operation, val int64) { + if opt == nil || op == nil { return } - if op.Type() == OperationPin { - switch op.Phase() { + + if op.opType == OperationPin { + switch op.phase { case PhaseError: pinErrors := atomic.AddInt64(&opt.pinErrorCount, val) stats.Record(op.Context(), observations.PinsPinError.M(pinErrors)) @@ -376,6 +379,17 @@ func (opt *OperationTracker) recordMetric(op *Operation, val int64) { } } +func (opt *OperationTracker) recordMetric(op *Operation, val int64) { + if op == nil { + return + } + op.mu.RLock() + { + opt.recordMetricUnsafe(op, val) + } + op.mu.RUnlock() +} + // PinQueueSize returns the current number of items queued to pin. func (opt *OperationTracker) PinQueueSize() int64 { return atomic.LoadInt64(&opt.pinQueuedCount)