pintracker: support a priority channel for pinning

This commit is contained in:
Hector Sanjuan 2021-10-27 14:04:59 +02:00
parent 29c277b67f
commit 0a146dae76

View File

@ -49,6 +49,7 @@ type Tracker struct {
rpcClient *rpc.Client rpcClient *rpc.Client
rpcReady chan struct{} rpcReady chan struct{}
priorityPinCh chan *optracker.Operation
pinCh chan *optracker.Operation pinCh chan *optracker.Operation
unpinCh chan *optracker.Operation unpinCh chan *optracker.Operation
@ -70,40 +71,59 @@ func New(cfg *Config, pid peer.ID, peerName string, getState func(ctx context.Co
getState: getState, getState: getState,
optracker: optracker.NewOperationTracker(ctx, pid, peerName), optracker: optracker.NewOperationTracker(ctx, pid, peerName),
rpcReady: make(chan struct{}, 1), rpcReady: make(chan struct{}, 1),
priorityPinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize),
pinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize), pinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize),
unpinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize), unpinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize),
} }
for i := 0; i < spt.config.ConcurrentPins; i++ { for i := 0; i < spt.config.ConcurrentPins; i++ {
go spt.opWorker(spt.pin, spt.pinCh) go spt.opWorker(spt.pin, spt.priorityPinCh, spt.pinCh)
} }
go spt.opWorker(spt.unpin, spt.unpinCh) go spt.opWorker(spt.unpin, spt.unpinCh, nil)
return spt return spt
} }
// receives a pin Function (pin or unpin) and a channel. // receives a pin Function (pin or unpin) and channels. Used for both pinning
// Used for both pinning and unpinning // and unpinning.
func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, opChan chan *optracker.Operation) { func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, prioCh, normalCh chan *optracker.Operation) {
var op *optracker.Operation
for { for {
// Process the priority channel first.
select { select {
case op := <-opChan: case op = <-prioCh:
if cont := applyPinF(pinF, op); cont { goto APPLY_OP
continue case <-spt.ctx.Done():
return
default:
} }
spt.optracker.Clean(op.Context(), op) // Then process things on the other channels.
// Block if there are no things to process.
select {
case op = <-prioCh:
goto APPLY_OP
case op = <-normalCh:
goto APPLY_OP
case <-spt.ctx.Done(): case <-spt.ctx.Done():
return return
} }
// apply operations that came from some channel
APPLY_OP:
if clean := applyPinF(pinF, op); clean {
spt.optracker.Clean(op.Context(), op)
}
} }
} }
// applyPinF returns true if caller should call `continue` inside calling loop. // applyPinF returns true if the operation can be considered "DONE".
func applyPinF(pinF func(*optracker.Operation) error, op *optracker.Operation) bool { func applyPinF(pinF func(*optracker.Operation) error, op *optracker.Operation) bool {
if op.Cancelled() { if op.Cancelled() {
// operation was cancelled. Move on. // operation was cancelled. Move on.
// This saves some time, but not 100% needed. // This saves some time, but not 100% needed.
return true return false
} }
op.SetPhase(optracker.PhaseInProgress) op.SetPhase(optracker.PhaseInProgress)
op.IncRetry() op.IncRetry()
@ -112,15 +132,15 @@ func applyPinF(pinF func(*optracker.Operation) error, op *optracker.Operation) b
if op.Cancelled() { if op.Cancelled() {
// there was an error because // there was an error because
// we were cancelled. Move on. // we were cancelled. Move on.
return true return false
} }
op.SetError(err) op.SetError(err)
op.Cancel() op.Cancel()
return true return false
} }
op.SetPhase(optracker.PhaseDone) op.SetPhase(optracker.PhaseDone)
op.Cancel() op.Cancel()
return false // this tells the opWorker to clean the operation from the tracker. return true // this tells the opWorker to clean the operation from the tracker.
} }
func (spt *Tracker) pin(op *optracker.Operation) error { func (spt *Tracker) pin(op *optracker.Operation) error {