Pintracker: add and track retry counts in the operation manager.

Report retry count in the PinStatus
This commit is contained in:
Hector Sanjuan 2021-10-27 11:04:47 +02:00
parent be18c645fa
commit 29c277b67f
5 changed files with 65 additions and 31 deletions

View File

@ -289,6 +289,7 @@ type PinInfoShort struct {
Status TrackerStatus `json:"status" codec:"st,omitempty"`
TS time.Time `json:"timestamp" codec:"ts,omitempty"`
Error string `json:"error" codec:"e,omitempty"`
RetryCount int `json:"retry_count" codec:"r,omitempty"`
}
// PinInfo holds information about local pins. This is used by the Pin

View File

@ -172,7 +172,12 @@ func textFormatPrintGPInfo(obj *api.GlobalPinInfo) {
fmt.Fprintf(&b, ": %s", v.Error)
}
txt, _ := v.TS.MarshalText()
fmt.Fprintf(&b, " | %s\n", txt)
fmt.Fprintf(&b, " | %s", txt)
if retries := v.RetryCount; retries > 0 {
fmt.Fprintf(&b, " | Retries: %d", retries)
}
fmt.Fprintf(&b, "\n")
}
fmt.Print(b.String())
}

View File

@ -63,6 +63,7 @@ type Operation struct {
// RW fields
mu sync.RWMutex
phase Phase
retryCount int
error string
ts time.Time
}
@ -80,6 +81,7 @@ func NewOperation(ctx context.Context, pin *api.Pin, typ OperationType, ph Phase
pin: pin,
opType: typ,
phase: ph,
retryCount: 0,
ts: time.Now(),
error: "",
}
@ -97,6 +99,7 @@ func (op *Operation) String() string {
fmt.Fprintf(&b, "\t%s\n", s)
}
fmt.Fprintf(&b, "phase: %s\n", op.Phase().String())
fmt.Fprintf(&b, "retryCount: %d\n", op.RetryCount())
fmt.Fprintf(&b, "error: %s\n", op.Error())
fmt.Fprintf(&b, "timestamp: %s\n", op.Timestamp().String())
@ -147,6 +150,25 @@ func (op *Operation) SetPhase(ph Phase) {
span.End()
}
// RetryCount returns the number of times that this operation has been in
// progress.
func (op *Operation) RetryCount() int {
var retries int
op.mu.RLock()
retries = op.retryCount
op.mu.RUnlock()
return retries
}
// IncRetry does a plus-one on the RetryCount.
func (op *Operation) IncRetry() {
op.mu.Lock()
op.retryCount++
op.mu.Unlock()
}
// Error returns any error message attached to the operation.
func (op *Operation) Error() string {
var err string

View File

@ -144,6 +144,7 @@ func (opt *OperationTracker) unsafePinInfo(ctx context.Context, op *Operation) a
PeerName: opt.peerName,
Status: api.TrackerStatusUnpinned,
TS: time.Now(),
RetryCount: 0,
Error: "",
},
}
@ -155,6 +156,7 @@ func (opt *OperationTracker) unsafePinInfo(ctx context.Context, op *Operation) a
PeerName: opt.peerName,
Status: op.ToTrackerStatus(),
TS: op.Timestamp(),
RetryCount: op.RetryCount(),
Error: op.Error(),
},
}

View File

@ -106,6 +106,7 @@ func applyPinF(pinF func(*optracker.Operation) error, op *optracker.Operation) b
return true
}
op.SetPhase(optracker.PhaseInProgress)
op.IncRetry()
err := pinF(op) // call pin/unpin
if err != nil {
if op.Cancelled() {
@ -119,7 +120,7 @@ func applyPinF(pinF func(*optracker.Operation) error, op *optracker.Operation) b
}
op.SetPhase(optracker.PhaseDone)
op.Cancel()
return false
return false // this tells the opWorker to clean the operation from the tracker.
}
func (spt *Tracker) pin(op *optracker.Operation) error {
@ -168,7 +169,7 @@ func (spt *Tracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.Opera
logger.Debugf("entering enqueue: pin: %+v", c)
op := spt.optracker.TrackNewOperation(ctx, c, typ, optracker.PhaseQueued)
if op == nil {
return nil // ongoing operation.
return nil // the operation exists and must be queued already.
}
var ch chan *optracker.Operation
@ -317,6 +318,7 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
PinInfoShort: api.PinInfoShort{
PeerName: spt.peerName,
TS: time.Now(),
RetryCount: 0,
},
}
@ -466,6 +468,7 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[cid.Cid]*api.PinInfo
PeerName: spt.peerName,
Status: ips.ToTrackerStatus(),
TS: time.Now(), // to be set later
RetryCount: 0,
},
}
pins[c] = p
@ -526,6 +529,7 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool, filter api.T
PinInfoShort: api.PinInfoShort{
PeerName: spt.peerName,
TS: p.Timestamp,
RetryCount: 0,
},
}