Merge pull request #1647 from ipfs/fix/tracker-metrics-gauge

pintracker: metrics: convert pinning/queued/error metrics to gauges
This commit is contained in:
Hector Sanjuan 2022-04-29 22:57:04 +02:00 committed by GitHub
commit 585eb1e8e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 30 additions and 17 deletions

View File

@ -54,19 +54,19 @@ var (
PinsQueuedView = &view.View{ PinsQueuedView = &view.View{
Measure: PinsQueued, Measure: PinsQueued,
//TagKeys: []tag.Key{HostKey}, //TagKeys: []tag.Key{HostKey},
Aggregation: view.Sum(), Aggregation: view.LastValue(),
} }
PinsPinningView = &view.View{ PinsPinningView = &view.View{
Measure: PinsPinning, Measure: PinsPinning,
//TagKeys: []tag.Key{HostKey}, //TagKeys: []tag.Key{HostKey},
Aggregation: view.Sum(), Aggregation: view.LastValue(),
} }
PinsPinErrorView = &view.View{ PinsPinErrorView = &view.View{
Measure: PinsPinError, Measure: PinsPinError,
//TagKeys: []tag.Key{HostKey}, //TagKeys: []tag.Key{HostKey},
Aggregation: view.Sum(), Aggregation: view.LastValue(),
} }
DefaultViews = []*view.View{ DefaultViews = []*view.View{

View File

@ -54,6 +54,8 @@ type Operation struct {
ctx context.Context ctx context.Context
cancel func() cancel func()
tracker *OperationTracker
// RO fields // RO fields
opType OperationType opType OperationType
pin api.Pin pin api.Pin
@ -67,8 +69,8 @@ type Operation struct {
ts time.Time ts time.Time
} }
// NewOperation creates a new Operation. // newOperation creates a new Operation.
func NewOperation(ctx context.Context, pin api.Pin, typ OperationType, ph Phase) *Operation { func newOperation(ctx context.Context, pin api.Pin, typ OperationType, ph Phase, tracker *OperationTracker) *Operation {
ctx, span := trace.StartSpan(ctx, "optracker/NewOperation") ctx, span := trace.StartSpan(ctx, "optracker/NewOperation")
defer span.End() defer span.End()
@ -137,14 +139,14 @@ func (op *Operation) Phase() Phase {
// SetPhase changes the Phase and updates the timestamp. // SetPhase changes the Phase and updates the timestamp.
func (op *Operation) SetPhase(ph Phase) { func (op *Operation) SetPhase(ph Phase) {
_, span := trace.StartSpan(op.ctx, "optracker/SetPhase") _, span := trace.StartSpan(op.ctx, "optracker/SetPhase")
recordMetric(op, -1) op.tracker.recordMetric(op, -1)
op.mu.Lock() op.mu.Lock()
{ {
op.phase = ph op.phase = ph
op.ts = time.Now() op.ts = time.Now()
} }
op.mu.Unlock() op.mu.Unlock()
recordMetric(op, 1) op.tracker.recordMetric(op, 1)
span.End() span.End()
} }
@ -196,7 +198,7 @@ func (op *Operation) Error() string {
// an error message. It updates the timestamp. // an error message. It updates the timestamp.
func (op *Operation) SetError(err error) { func (op *Operation) SetError(err error) {
_, span := trace.StartSpan(op.ctx, "optracker/SetError") _, span := trace.StartSpan(op.ctx, "optracker/SetError")
recordMetric(op, -1) op.tracker.recordMetric(op, -1)
op.mu.Lock() op.mu.Lock()
{ {
op.phase = PhaseError op.phase = PhaseError
@ -204,7 +206,7 @@ func (op *Operation) SetError(err error) {
op.ts = time.Now() op.ts = time.Now()
} }
op.mu.Unlock() op.mu.Unlock()
recordMetric(op, 1) op.tracker.recordMetric(op, 1)
span.End() span.End()
} }

View File

@ -12,7 +12,7 @@ import (
func TestOperation(t *testing.T) { func TestOperation(t *testing.T) {
tim := time.Now().Add(-2 * time.Second) tim := time.Now().Add(-2 * time.Second)
op := NewOperation(context.Background(), api.PinCid(test.Cid1), OperationUnpin, PhaseQueued) op := newOperation(context.Background(), api.PinCid(test.Cid1), OperationUnpin, PhaseQueued, nil)
if !op.Cid().Equals(test.Cid1) { if !op.Cid().Equals(test.Cid1) {
t.Error("bad cid") t.Error("bad cid")
} }

View File

@ -10,6 +10,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api"
@ -32,6 +33,10 @@ type OperationTracker struct {
mu sync.RWMutex mu sync.RWMutex
operations map[api.Cid]*Operation operations map[api.Cid]*Operation
pinningCount int64
pinErrorCount int64
pinQueuedCount int64
} }
func (opt *OperationTracker) String() string { func (opt *OperationTracker) String() string {
@ -82,11 +87,11 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin,
if op.Type() == typ && op.Phase() != PhaseError && op.Phase() != PhaseDone { if op.Type() == typ && op.Phase() != PhaseError && op.Phase() != PhaseDone {
return nil // an ongoing operation of the same sign exists return nil // an ongoing operation of the same sign exists
} }
recordMetric(op, -1) opt.recordMetric(op, -1)
op.Cancel() // cancel ongoing operation and replace it op.Cancel() // cancel ongoing operation and replace it
} }
op2 := NewOperation(ctx, pin, typ, ph) op2 := newOperation(ctx, pin, typ, ph, opt)
if ok && op.Type() == typ { if ok && op.Type() == typ {
// Carry over the attempt count when doing an operation of the // Carry over the attempt count when doing an operation of the
// same type. The old operation exists and was cancelled. // same type. The old operation exists and was cancelled.
@ -94,7 +99,7 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin,
} }
logger.Debugf("'%s' on cid '%s' has been created with phase '%s'", typ, pin.Cid, ph) logger.Debugf("'%s' on cid '%s' has been created with phase '%s'", typ, pin.Cid, ph)
opt.operations[pin.Cid] = op2 opt.operations[pin.Cid] = op2
recordMetric(op2, 1) opt.recordMetric(op2, 1)
return op2 return op2
} }
@ -343,15 +348,21 @@ func initializeMetrics(ctx context.Context) {
stats.Record(ctx, observations.PinsPinning.M(0)) stats.Record(ctx, observations.PinsPinning.M(0))
} }
func recordMetric(op *Operation, val int64) { func (opt *OperationTracker) recordMetric(op *Operation, val int64) {
if opt == nil {
return
}
if op.Type() == OperationPin { if op.Type() == OperationPin {
switch op.Phase() { switch op.Phase() {
case PhaseError: case PhaseError:
stats.Record(op.Context(), observations.PinsPinError.M(val)) pinErrors := atomic.AddInt64(&opt.pinErrorCount, val)
stats.Record(op.Context(), observations.PinsPinError.M(pinErrors))
case PhaseQueued: case PhaseQueued:
stats.Record(op.Context(), observations.PinsQueued.M(val)) pinQueued := atomic.AddInt64(&opt.pinQueuedCount, val)
stats.Record(op.Context(), observations.PinsQueued.M(pinQueued))
case PhaseInProgress: case PhaseInProgress:
stats.Record(op.Context(), observations.PinsPinning.M(val)) pinning := atomic.AddInt64(&opt.pinningCount, val)
stats.Record(op.Context(), observations.PinsPinning.M(pinning))
case PhaseDone: case PhaseDone:
// we have no metric to log anything // we have no metric to log anything
} }