diff --git a/pintracker/stateless/stateless.go b/pintracker/stateless/stateless.go index 0744dee3..7bd9e363 100644 --- a/pintracker/stateless/stateless.go +++ b/pintracker/stateless/stateless.go @@ -49,8 +49,9 @@ type Tracker struct { rpcClient *rpc.Client rpcReady chan struct{} - pinCh chan *optracker.Operation - unpinCh chan *optracker.Operation + priorityPinCh chan *optracker.Operation + pinCh chan *optracker.Operation + unpinCh chan *optracker.Operation shutdownMu sync.Mutex shutdown bool @@ -62,48 +63,67 @@ func New(cfg *Config, pid peer.ID, peerName string, getState func(ctx context.Co ctx, cancel := context.WithCancel(context.Background()) spt := &Tracker{ - config: cfg, - peerID: pid, - peerName: peerName, - ctx: ctx, - cancel: cancel, - getState: getState, - optracker: optracker.NewOperationTracker(ctx, pid, peerName), - rpcReady: make(chan struct{}, 1), - pinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize), - unpinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize), + config: cfg, + peerID: pid, + peerName: peerName, + ctx: ctx, + cancel: cancel, + getState: getState, + optracker: optracker.NewOperationTracker(ctx, pid, peerName), + rpcReady: make(chan struct{}, 1), + priorityPinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize), + pinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize), + unpinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize), } for i := 0; i < spt.config.ConcurrentPins; i++ { - go spt.opWorker(spt.pin, spt.pinCh) + go spt.opWorker(spt.pin, spt.priorityPinCh, spt.pinCh) } - go spt.opWorker(spt.unpin, spt.unpinCh) + go spt.opWorker(spt.unpin, spt.unpinCh, nil) return spt } -// receives a pin Function (pin or unpin) and a channel. -// Used for both pinning and unpinning -func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, opChan chan *optracker.Operation) { - for { - select { - case op := <-opChan: - if cont := applyPinF(pinF, op); cont { - continue - } +// receives a pin Function (pin or unpin) and channels. Used for both pinning +// and unpinning. +func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, prioCh, normalCh chan *optracker.Operation) { - spt.optracker.Clean(op.Context(), op) + var op *optracker.Operation + + for { + // Process the priority channel first. + select { + case op = <-prioCh: + goto APPLY_OP case <-spt.ctx.Done(): return + default: + } + + // Then process things on the other channels. + // Block if there are no things to process. + select { + case op = <-prioCh: + goto APPLY_OP + case op = <-normalCh: + goto APPLY_OP + case <-spt.ctx.Done(): + return + } + + // apply operations that came from some channel + APPLY_OP: + if clean := applyPinF(pinF, op); clean { + spt.optracker.Clean(op.Context(), op) } } } -// applyPinF returns true if caller should call `continue` inside calling loop. +// applyPinF returns true if the operation can be considered "DONE". func applyPinF(pinF func(*optracker.Operation) error, op *optracker.Operation) bool { if op.Cancelled() { // operation was cancelled. Move on. // This saves some time, but not 100% needed. - return true + return false } op.SetPhase(optracker.PhaseInProgress) op.IncRetry() @@ -112,15 +132,15 @@ func applyPinF(pinF func(*optracker.Operation) error, op *optracker.Operation) b if op.Cancelled() { // there was an error because // we were cancelled. Move on. - return true + return false } op.SetError(err) op.Cancel() - return true + return false } op.SetPhase(optracker.PhaseDone) op.Cancel() - return false // this tells the opWorker to clean the operation from the tracker. + return true // this tells the opWorker to clean the operation from the tracker. } func (spt *Tracker) pin(op *optracker.Operation) error {