Merge pull request #1717 from ipfs-cluster/fix/1702-neg-metrics

pintracker: fix some races resulting in wrong metric counts
This commit is contained in:
Hector Sanjuan 2022-06-20 22:29:51 +02:00 committed by GitHub
commit d6166d802b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 13 deletions

View File

@ -75,7 +75,7 @@ func newOperation(ctx context.Context, pin api.Pin, typ OperationType, ph Phase,
defer span.End() defer span.End()
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
return &Operation{ op := &Operation{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
@ -89,6 +89,8 @@ func newOperation(ctx context.Context, pin api.Pin, typ OperationType, ph Phase,
ts: time.Now(), ts: time.Now(),
error: "", error: "",
} }
tracker.recordMetricUnsafe(op, 1)
return op
} }
// String returns a string representation of an Operation. // String returns a string representation of an Operation.
@ -124,6 +126,7 @@ func (op *Operation) Context() context.Context {
func (op *Operation) Cancel() { func (op *Operation) Cancel() {
_, span := trace.StartSpan(op.ctx, "optracker/Cancel") _, span := trace.StartSpan(op.ctx, "optracker/Cancel")
op.cancel() op.cancel()
op.tracker.recordMetric(op, -1)
span.End() span.End()
} }
@ -141,14 +144,15 @@ func (op *Operation) Phase() Phase {
// SetPhase changes the Phase and updates the timestamp. // SetPhase changes the Phase and updates the timestamp.
func (op *Operation) SetPhase(ph Phase) { func (op *Operation) SetPhase(ph Phase) {
_, span := trace.StartSpan(op.ctx, "optracker/SetPhase") _, span := trace.StartSpan(op.ctx, "optracker/SetPhase")
op.tracker.recordMetric(op, -1)
op.mu.Lock() op.mu.Lock()
{ {
op.tracker.recordMetricUnsafe(op, -1)
op.phase = ph op.phase = ph
op.ts = time.Now() op.ts = time.Now()
op.tracker.recordMetricUnsafe(op, 1)
} }
op.mu.Unlock() op.mu.Unlock()
op.tracker.recordMetric(op, 1)
span.End() span.End()
} }
@ -200,15 +204,15 @@ func (op *Operation) Error() string {
// an error message. It updates the timestamp. // an error message. It updates the timestamp.
func (op *Operation) SetError(err error) { func (op *Operation) SetError(err error) {
_, span := trace.StartSpan(op.ctx, "optracker/SetError") _, span := trace.StartSpan(op.ctx, "optracker/SetError")
op.tracker.recordMetric(op, -1)
op.mu.Lock() op.mu.Lock()
{ {
op.tracker.recordMetricUnsafe(op, -1)
op.phase = PhaseError op.phase = PhaseError
op.error = err.Error() op.error = err.Error()
op.ts = time.Now() op.ts = time.Now()
op.tracker.recordMetricUnsafe(op, 1)
} }
op.mu.Unlock() op.mu.Unlock()
op.tracker.recordMetric(op, 1)
span.End() span.End()
} }

View File

@ -83,11 +83,14 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin,
defer opt.mu.Unlock() defer opt.mu.Unlock()
op, ok := opt.operations[pin.Cid] op, ok := opt.operations[pin.Cid]
if ok { // operation exists if ok { // operation exists for the CID
if op.Type() == typ && op.Phase() != PhaseError && op.Phase() != PhaseDone { if op.Type() == typ && op.Phase() != PhaseError && op.Phase() != PhaseDone {
return nil // an ongoing operation of the same sign exists // an ongoing operation of the same
// type. i.e. pinning, or queued.
return nil
} }
opt.recordMetric(op, -1) // i.e. operations in error phase
// i.e. pin operations that need to be canceled for unpinning
op.Cancel() // cancel ongoing operation and replace it op.Cancel() // cancel ongoing operation and replace it
} }
@ -99,7 +102,6 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin,
} }
logger.Debugf("'%s' on cid '%s' has been created with phase '%s'", typ, pin.Cid, ph) logger.Debugf("'%s' on cid '%s' has been created with phase '%s'", typ, pin.Cid, ph)
opt.operations[pin.Cid] = op2 opt.operations[pin.Cid] = op2
opt.recordMetric(op2, 1)
return op2 return op2
} }
@ -355,12 +357,13 @@ func initializeMetrics(ctx context.Context) {
stats.Record(ctx, observations.PinsPinning.M(0)) stats.Record(ctx, observations.PinsPinning.M(0))
} }
func (opt *OperationTracker) recordMetric(op *Operation, val int64) { func (opt *OperationTracker) recordMetricUnsafe(op *Operation, val int64) {
if opt == nil { if opt == nil || op == nil {
return return
} }
if op.Type() == OperationPin {
switch op.Phase() { if op.opType == OperationPin {
switch op.phase {
case PhaseError: case PhaseError:
pinErrors := atomic.AddInt64(&opt.pinErrorCount, val) pinErrors := atomic.AddInt64(&opt.pinErrorCount, val)
stats.Record(op.Context(), observations.PinsPinError.M(pinErrors)) stats.Record(op.Context(), observations.PinsPinError.M(pinErrors))
@ -376,6 +379,17 @@ func (opt *OperationTracker) recordMetric(op *Operation, val int64) {
} }
} }
// recordMetric is the locking counterpart of recordMetricUnsafe: it holds
// op's read lock while recordMetricUnsafe is called with val, so the
// operation's fields cannot be modified under a write lock in the meantime.
// A nil op is ignored.
//
// Code that already holds op.mu (as SetPhase and SetError do while changing
// the phase) calls recordMetricUnsafe directly instead of this method.
func (opt *OperationTracker) recordMetric(op *Operation, val int64) {
if op == nil {
return
}
op.mu.RLock()
{
opt.recordMetricUnsafe(op, val)
}
op.mu.RUnlock()
}
// PinQueueSize returns the current number of items queued to pin. // PinQueueSize returns the current number of items queued to pin.
func (opt *OperationTracker) PinQueueSize() int64 { func (opt *OperationTracker) PinQueueSize() int64 {
return atomic.LoadInt64(&opt.pinQueuedCount) return atomic.LoadInt64(&opt.pinQueuedCount)