ipfs-cluster/pintracker/optracker/operation.go
Hector Sanjuan a97ed10d0b Adopt api.Cid type - replaces cid.Cid everywhere.
This commit introduces an api.Cid type and replaces the usage of cid.Cid
everywhere.

The main motivation here is to override MarshalJSON so that Cids are
JSON-ified as '"Qm...."' instead of '{ "/": "Qm....." }', as this "ipld"
representation of IDs is horrible to work with, and our APIs are not issuing
IPLD objects to start with.

Unfortunately, there is no way to do this cleanly, and the best way is to just
switch everything to our own type.
2022-04-07 14:27:39 +02:00

312 lines
7.2 KiB
Go

package optracker
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/ipfs/ipfs-cluster/api"
"go.opencensus.io/trace"
)
//go:generate stringer -type=OperationType
// OperationType represents the kinds of operations that the PinTracker
// performs and the operationTracker tracks the status of.
// It is an integer enumeration; the go:generate directive above runs
// stringer for this type.
type OperationType int
// Known values of OperationType. They are numbered with iota, so
// OperationUnknown is the zero value.
const (
// OperationUnknown represents an unknown operation.
OperationUnknown OperationType = iota
// OperationPin represents a pin operation.
OperationPin
// OperationUnpin represents an unpin operation.
OperationUnpin
// OperationRemote represents a no-op operation. ToTrackerStatus
// reports it as api.TrackerStatusRemote.
OperationRemote
// OperationShard represents a meta pin. We don't
// pin these. ToTrackerStatus reports it as api.TrackerStatusSharded.
OperationShard
)
//go:generate stringer -type=Phase
// Phase represents the multiple phases that an operation can be in.
// It is an integer enumeration; the go:generate directive above runs
// stringer for this type.
type Phase int
// Known values of Phase. They are numbered with iota, so PhaseError is
// the zero value.
const (
// PhaseError represents an error state.
PhaseError Phase = iota
// PhaseQueued represents the queued phase of an operation.
PhaseQueued
// PhaseInProgress represents the operation as in progress.
PhaseInProgress
// PhaseDone represents the operation once finished.
PhaseDone
)
// Operation represents an ongoing operation involving a
// particular Cid. It provides the type and phase of operation
// and a way to mark the operation finished (also used to cancel).
//
// The read-only fields are assigned in NewOperation and only read by the
// methods in this file. The read-write fields are accessed by those
// methods while holding mu.
type Operation struct {
// ctx is the operation's own context and cancel is the function that
// cancels it; NewOperation obtains both from context.WithCancel.
ctx context.Context
cancel func()
// RO fields
// opType is the kind of operation and pin is the pin it applies to.
opType OperationType
pin api.Pin
// RW fields
// mu guards all the fields declared below it.
mu sync.RWMutex
// phase is the current phase; SetPhase and SetError change it.
phase Phase
// attemptCount is incremented by IncAttempt.
attemptCount int
// priority is the flag read by PriorityPin and written by SetPriorityPin.
priority bool
// error is the message stored by SetError; NewOperation leaves it empty.
error string
// ts is set to the current time by NewOperation, SetPhase and SetError.
ts time.Time
}
// NewOperation builds an Operation for the given pin with the given type
// and initial phase. The operation gets its own cancellable context
// derived from ctx and its timestamp is set to the current time. The
// attempt count starts at zero, the priority flag is unset and no error
// message is attached.
func NewOperation(ctx context.Context, pin api.Pin, typ OperationType, ph Phase) *Operation {
	spanCtx, span := trace.StartSpan(ctx, "optracker/NewOperation")
	defer span.End()

	opCtx, cancelFn := context.WithCancel(spanCtx)

	// attemptCount, priority and error keep their zero values.
	op := new(Operation)
	op.ctx = opCtx
	op.cancel = cancelFn
	op.opType = typ
	op.pin = pin
	op.phase = ph
	op.ts = time.Now()
	return op
}
// String renders the operation as multi-line text: its type, the pin
// (one tab-indented line per line of the pin's own string form), the
// phase, the attempt count, the error message and the timestamp.
func (op *Operation) String() string {
	var sb strings.Builder

	sb.WriteString("type: " + op.Type().String() + "\n")

	sb.WriteString("pin:\n")
	for _, line := range strings.Split(op.Pin().String(), "\n") {
		sb.WriteString("\t" + line + "\n")
	}

	sb.WriteString("phase: " + op.Phase().String() + "\n")
	fmt.Fprintf(&sb, "attemptCount: %d\n", op.AttemptCount())
	sb.WriteString("error: " + op.Error() + "\n")
	sb.WriteString("timestamp: " + op.Timestamp().String() + "\n")

	return sb.String()
}
// Cid reports the Cid of the pin that this operation applies to.
func (op *Operation) Cid() api.Cid {
	pinCid := op.pin.Cid
	return pinCid
}
// Context gives access to the operation's own context, the one that
// Cancel cancels.
func (op *Operation) Context() context.Context {
	opCtx := op.ctx
	return opCtx
}
// Cancel cancels the operation's context. The call is wrapped in an
// "optracker/Cancel" trace span.
func (op *Operation) Cancel() {
	_, span := trace.StartSpan(op.ctx, "optracker/Cancel")
	defer span.End()

	op.cancel()
}
// Phase reports the current phase of the operation, read under the
// operation's read lock.
func (op *Operation) Phase() Phase {
	op.mu.RLock()
	defer op.mu.RUnlock()

	return op.phase
}
// SetPhase moves the operation to the given phase and refreshes its
// timestamp, both under the write lock. The call is wrapped in an
// "optracker/SetPhase" trace span.
func (op *Operation) SetPhase(ph Phase) {
	_, span := trace.StartSpan(op.ctx, "optracker/SetPhase")
	defer span.End()

	op.mu.Lock()
	defer op.mu.Unlock()

	op.phase = ph
	op.ts = time.Now()
}
// AttemptCount reports how many attempts have been recorded for this
// operation through IncAttempt.
func (op *Operation) AttemptCount() int {
	op.mu.RLock()
	defer op.mu.RUnlock()

	return op.attemptCount
}
// IncAttempt adds one to the attempt counter under the write lock.
func (op *Operation) IncAttempt() {
	op.mu.Lock()
	defer op.mu.Unlock()

	op.attemptCount += 1
}
// PriorityPin reports whether the operation has been flagged as a
// priority pin via SetPriorityPin.
func (op *Operation) PriorityPin() bool {
	op.mu.RLock()
	defer op.mu.RUnlock()

	return op.priority
}
// SetPriorityPin sets or clears the priority flag of the operation
// according to p. It does not return anything; use PriorityPin to read
// the flag back.
func (op *Operation) SetPriorityPin(p bool) {
	op.mu.Lock()
	defer op.mu.Unlock()

	op.priority = p
}
// Error reports the error message stored in the operation. It is the
// empty string when no message has been set.
func (op *Operation) Error() string {
	op.mu.RLock()
	defer op.mu.RUnlock()

	return op.error
}
// SetError moves the operation to PhaseError, attaches the message of
// err to it and refreshes the timestamp. The call is wrapped in an
// "optracker/SetError" trace span.
//
// A nil err is tolerated: the operation still moves to PhaseError, with
// an empty error message.
func (op *Operation) SetError(err error) {
	_, span := trace.StartSpan(op.ctx, "optracker/SetError")
	defer span.End()

	// Resolve the message before taking the lock. err.Error() is
	// caller-provided code, and calling it on a nil error inside the
	// critical section would panic with op.mu still held.
	var msg string
	if err != nil {
		msg = err.Error()
	}

	op.mu.Lock()
	defer op.mu.Unlock()

	op.phase = PhaseError
	op.error = msg
	op.ts = time.Now()
}
// Type reports which kind of operation this is. The type is fixed at
// creation, so no locking is involved.
func (op *Operation) Type() OperationType {
	typ := op.opType
	return typ
}
// Pin gives back the api.Pin that this operation was created for.
func (op *Operation) Pin() api.Pin {
	p := op.pin
	return p
}
// Timestamp reports when the operation was last modified, that is, when
// it was created or when its phase or error was last set.
func (op *Operation) Timestamp() time.Time {
	op.mu.RLock()
	defer op.mu.RUnlock()

	return op.ts
}
// Cancelled reports whether the operation's context is already done.
// It never blocks: the Done channel is polled with a non-blocking
// select. The call is wrapped in an "optracker/Cancelled" trace span.
func (op *Operation) Cancelled() bool {
	_, span := trace.StartSpan(op.ctx, "optracker/Cancelled")
	defer span.End()

	done := false
	select {
	case <-op.ctx.Done():
		done = true
	default:
		// Context still live; fall through with done == false.
	}
	return done
}
// ToTrackerStatus translates the operation's current type and phase
// into the matching api.TrackerStatus.
//
// Pin and unpin operations map to one status per phase. Remote and shard
// operations map to a single status whatever the phase. Any other
// combination yields api.TrackerStatusUndefined.
func (op *Operation) ToTrackerStatus() api.TrackerStatus {
	opType, phase := op.Type(), op.Phase()

	switch {
	// Pin operations.
	case opType == OperationPin && phase == PhaseError:
		return api.TrackerStatusPinError
	case opType == OperationPin && phase == PhaseQueued:
		return api.TrackerStatusPinQueued
	case opType == OperationPin && phase == PhaseInProgress:
		return api.TrackerStatusPinning
	case opType == OperationPin && phase == PhaseDone:
		return api.TrackerStatusPinned

	// Unpin operations.
	case opType == OperationUnpin && phase == PhaseError:
		return api.TrackerStatusUnpinError
	case opType == OperationUnpin && phase == PhaseQueued:
		return api.TrackerStatusUnpinQueued
	case opType == OperationUnpin && phase == PhaseInProgress:
		return api.TrackerStatusUnpinning
	case opType == OperationUnpin && phase == PhaseDone:
		return api.TrackerStatusUnpinned

	// Phase-independent operations.
	case opType == OperationRemote:
		return api.TrackerStatusRemote
	case opType == OperationShard:
		return api.TrackerStatusSharded
	}

	// Unknown type, or a pin/unpin operation in an unknown phase.
	return api.TrackerStatusUndefined
}
// TrackerStatusToOperationPhase splits an api.TrackerStatus into the
// OperationType and Phase that it stands for. Remote and sharded
// statuses are reported as PhaseDone. A status with no known
// translation yields (OperationUnknown, PhaseError).
func TrackerStatusToOperationPhase(status api.TrackerStatus) (OperationType, Phase) {
	var (
		opType OperationType
		phase  Phase
	)

	// First work out which kind of operation the status belongs to.
	switch status {
	case api.TrackerStatusPinError, api.TrackerStatusPinQueued,
		api.TrackerStatusPinning, api.TrackerStatusPinned:
		opType = OperationPin
	case api.TrackerStatusUnpinError, api.TrackerStatusUnpinQueued,
		api.TrackerStatusUnpinning, api.TrackerStatusUnpinned:
		opType = OperationUnpin
	case api.TrackerStatusRemote:
		opType = OperationRemote
	case api.TrackerStatusSharded:
		opType = OperationShard
	default:
		return OperationUnknown, PhaseError
	}

	// Then work out the phase. Every status reaching this point is
	// covered by exactly one of the cases below.
	switch status {
	case api.TrackerStatusPinError, api.TrackerStatusUnpinError:
		phase = PhaseError
	case api.TrackerStatusPinQueued, api.TrackerStatusUnpinQueued:
		phase = PhaseQueued
	case api.TrackerStatusPinning, api.TrackerStatusUnpinning:
		phase = PhaseInProgress
	case api.TrackerStatusPinned, api.TrackerStatusUnpinned,
		api.TrackerStatusRemote, api.TrackerStatusSharded:
		phase = PhaseDone
	}

	return opType, phase
}