3b3f786d68
This commit adds support for OpenCensus tracing and metrics collection. This required support for context.Context propogation throughout the cluster codebase, and in particular, the ipfscluster component interfaces. The tracing propogates across RPC and HTTP boundaries. The current default tracing backend is Jaeger. The metrics currently exports the metrics exposed by the opencensus http plugin as well as the pprof metrics to a prometheus endpoint for scraping. The current default metrics backend is Prometheus. Metrics are currently exposed by default due to low overhead, can be turned off if desired, whereas tracing is off by default as it has a much higher performance overhead, though the extent of the performance hit can be adjusted with smaller sampling rates. License: MIT Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
247 lines
5.8 KiB
Go
247 lines
5.8 KiB
Go
package optracker
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
cid "github.com/ipfs/go-cid"
|
|
|
|
"github.com/ipfs/ipfs-cluster/api"
|
|
"go.opencensus.io/trace"
|
|
)
|
|
|
|
//go:generate stringer -type=OperationType
|
|
|
|
// OperationType represents the kinds of operations that the PinTracker
|
|
// performs and the operationTracker tracks the status of.
|
|
type OperationType int
|
|
|
|
const (
|
|
// OperationUnknown represents an unknown operation.
|
|
OperationUnknown OperationType = iota
|
|
// OperationPin represents a pin operation.
|
|
OperationPin
|
|
// OperationUnpin represents an unpin operation.
|
|
OperationUnpin
|
|
// OperationRemote represents an noop operation
|
|
OperationRemote
|
|
// OperationShard represents a meta pin. We don't
|
|
// pin these.
|
|
OperationShard
|
|
)
|
|
|
|
//go:generate stringer -type=Phase
|
|
|
|
// Phase represents the multiple phase that an operation can be in.
|
|
type Phase int
|
|
|
|
const (
|
|
// PhaseError represents an error state.
|
|
PhaseError Phase = iota
|
|
// PhaseQueued represents the queued phase of an operation.
|
|
PhaseQueued
|
|
// PhaseInProgress represents the operation as in progress.
|
|
PhaseInProgress
|
|
// PhaseDone represents the operation once finished.
|
|
PhaseDone
|
|
)
|
|
|
|
// Operation represents an ongoing operation involving a
|
|
// particular Cid. It provides the type and phase of operation
|
|
// and a way to mark the operation finished (also used to cancel).
|
|
type Operation struct {
|
|
ctx context.Context
|
|
cancel func()
|
|
|
|
// RO fields
|
|
opType OperationType
|
|
pin api.Pin
|
|
|
|
// RW fields
|
|
mu sync.RWMutex
|
|
phase Phase
|
|
error string
|
|
ts time.Time
|
|
}
|
|
|
|
// NewOperation creates a new Operation.
|
|
func NewOperation(ctx context.Context, pin api.Pin, typ OperationType, ph Phase) *Operation {
|
|
ctx, span := trace.StartSpan(ctx, "optracker/NewOperation")
|
|
defer span.End()
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
return &Operation{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
|
|
pin: pin,
|
|
opType: typ,
|
|
phase: ph,
|
|
ts: time.Now(),
|
|
error: "",
|
|
}
|
|
}
|
|
|
|
// Cid returns the Cid associated to this operation.
|
|
func (op *Operation) Cid() cid.Cid {
|
|
op.mu.RLock()
|
|
defer op.mu.RUnlock()
|
|
return op.pin.Cid
|
|
}
|
|
|
|
// Context returns the context associated to this operation.
|
|
func (op *Operation) Context() context.Context {
|
|
return op.ctx
|
|
}
|
|
|
|
// Cancel will cancel the context associated to this operation.
|
|
func (op *Operation) Cancel() {
|
|
ctx, span := trace.StartSpan(op.ctx, "optracker/Cancel")
|
|
_ = ctx
|
|
defer span.End()
|
|
op.cancel()
|
|
}
|
|
|
|
// Phase returns the Phase.
|
|
func (op *Operation) Phase() Phase {
|
|
op.mu.RLock()
|
|
defer op.mu.RUnlock()
|
|
return op.phase
|
|
}
|
|
|
|
// SetPhase changes the Phase and updates the timestamp.
|
|
func (op *Operation) SetPhase(ph Phase) {
|
|
ctx, span := trace.StartSpan(op.ctx, "optracker/SetPhase")
|
|
_ = ctx
|
|
defer span.End()
|
|
op.mu.Lock()
|
|
defer op.mu.Unlock()
|
|
op.phase = ph
|
|
op.ts = time.Now()
|
|
}
|
|
|
|
// Error returns any error message attached to the operation.
|
|
func (op *Operation) Error() string {
|
|
op.mu.RLock()
|
|
defer op.mu.RUnlock()
|
|
return op.error
|
|
}
|
|
|
|
// SetError sets the phase to PhaseError along with
|
|
// an error message. It updates the timestamp.
|
|
func (op *Operation) SetError(err error) {
|
|
ctx, span := trace.StartSpan(op.ctx, "optracker/SetError")
|
|
_ = ctx
|
|
defer span.End()
|
|
op.mu.Lock()
|
|
defer op.mu.Unlock()
|
|
op.phase = PhaseError
|
|
op.error = err.Error()
|
|
op.ts = time.Now()
|
|
}
|
|
|
|
// Type returns the operation Type.
|
|
func (op *Operation) Type() OperationType {
|
|
return op.opType
|
|
}
|
|
|
|
// Pin returns the Pin object associated to the operation.
|
|
func (op *Operation) Pin() api.Pin {
|
|
return op.pin
|
|
}
|
|
|
|
// Timestamp returns the time when this operation was
|
|
// last modified (phase changed, error was set...).
|
|
func (op *Operation) Timestamp() time.Time {
|
|
op.mu.RLock()
|
|
defer op.mu.RUnlock()
|
|
return op.ts
|
|
}
|
|
|
|
// Cancelled returns whether the context for this
|
|
// operation has been cancelled.
|
|
func (op *Operation) Cancelled() bool {
|
|
ctx, span := trace.StartSpan(op.ctx, "optracker/Cancelled")
|
|
_ = ctx
|
|
defer span.End()
|
|
select {
|
|
case <-op.ctx.Done():
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// ToTrackerStatus returns an api.TrackerStatus reflecting
|
|
// the current status of this operation. It's a translation
|
|
// from the Type and the Phase.
|
|
func (op *Operation) ToTrackerStatus() api.TrackerStatus {
|
|
typ := op.Type()
|
|
ph := op.Phase()
|
|
switch typ {
|
|
case OperationPin:
|
|
switch ph {
|
|
case PhaseError:
|
|
return api.TrackerStatusPinError
|
|
case PhaseQueued:
|
|
return api.TrackerStatusPinQueued
|
|
case PhaseInProgress:
|
|
return api.TrackerStatusPinning
|
|
case PhaseDone:
|
|
return api.TrackerStatusPinned
|
|
default:
|
|
return api.TrackerStatusUndefined
|
|
}
|
|
case OperationUnpin:
|
|
switch ph {
|
|
case PhaseError:
|
|
return api.TrackerStatusUnpinError
|
|
case PhaseQueued:
|
|
return api.TrackerStatusUnpinQueued
|
|
case PhaseInProgress:
|
|
return api.TrackerStatusUnpinning
|
|
case PhaseDone:
|
|
return api.TrackerStatusUnpinned
|
|
default:
|
|
return api.TrackerStatusUndefined
|
|
}
|
|
case OperationRemote:
|
|
return api.TrackerStatusRemote
|
|
case OperationShard:
|
|
return api.TrackerStatusSharded
|
|
default:
|
|
return api.TrackerStatusUndefined
|
|
}
|
|
|
|
}
|
|
|
|
// TrackerStatusToOperationPhase takes an api.TrackerStatus and
|
|
// converts it to an OpType and Phase.
|
|
func TrackerStatusToOperationPhase(status api.TrackerStatus) (OperationType, Phase) {
|
|
switch status {
|
|
case api.TrackerStatusPinError:
|
|
return OperationPin, PhaseError
|
|
case api.TrackerStatusPinQueued:
|
|
return OperationPin, PhaseQueued
|
|
case api.TrackerStatusPinning:
|
|
return OperationPin, PhaseInProgress
|
|
case api.TrackerStatusPinned:
|
|
return OperationPin, PhaseDone
|
|
case api.TrackerStatusUnpinError:
|
|
return OperationUnpin, PhaseError
|
|
case api.TrackerStatusUnpinQueued:
|
|
return OperationUnpin, PhaseQueued
|
|
case api.TrackerStatusUnpinning:
|
|
return OperationUnpin, PhaseInProgress
|
|
case api.TrackerStatusUnpinned:
|
|
return OperationUnpin, PhaseDone
|
|
case api.TrackerStatusRemote:
|
|
return OperationRemote, PhaseDone
|
|
case api.TrackerStatusSharded:
|
|
return OperationShard, PhaseDone
|
|
default:
|
|
return OperationUnknown, PhaseError
|
|
}
|
|
}
|