diff --git a/consensus/crdt/consensus.go b/consensus/crdt/consensus.go index 4b2cb5d1..462b6c3e 100644 --- a/consensus/crdt/consensus.go +++ b/consensus/crdt/consensus.go @@ -49,9 +49,10 @@ var ( // wraps pins so that they can be batched. type batchItem struct { - ctx context.Context - isPin bool // pin or unpin - pin api.Pin + ctx context.Context + isPin bool // pin or unpin + pin api.Pin + batched chan error // notify if item was sent for batching } // Consensus implement ipfscluster.Consensus and provides the facility to add @@ -81,11 +82,14 @@ type Consensus struct { dht routing.Routing pubsub *pubsub.PubSub - rpcClient *rpc.Client - rpcReady chan struct{} - stateReady chan struct{} - readyCh chan struct{} - batchItemCh chan batchItem + rpcClient *rpc.Client + rpcReady chan struct{} + stateReady chan struct{} + readyCh chan struct{} + + sendToBatchCh chan batchItem + batchItemCh chan batchItem + batchingDone chan struct{} shutdownLock sync.RWMutex shutdown bool @@ -145,7 +149,9 @@ func New( rpcReady: make(chan struct{}, 1), readyCh: make(chan struct{}, 1), stateReady: make(chan struct{}, 1), + sendToBatchCh: make(chan batchItem), batchItemCh: make(chan batchItem, cfg.Batching.MaxQueueSize), + batchingDone: make(chan struct{}), } go css.setup() @@ -318,6 +324,7 @@ func (css *Consensus) setup() { css.config.Batching.MaxBatchSize, css.config.Batching.MaxBatchAge.String(), ) + go css.sendToBatchWorker() go css.batchWorker() } @@ -340,13 +347,10 @@ func (css *Consensus) Shutdown(ctx context.Context) error { logger.Info("stopping Consensus component") - // Cancel + // Cancel the batching code css.batchingCancel() if css.config.batchingEnabled() { - logger.Info("committing pending batches") - if err := css.batchingState.Commit(css.ctx); err != nil { - logger.Errorf("error committing batch before shutdown: %w", err) - } + <-css.batchingDone } css.cancel() @@ -430,16 +434,14 @@ func (css *Consensus) LogPin(ctx context.Context, pin api.Pin) error { defer span.End() if css.config.batchingEnabled() { - select { - case css.batchItemCh <- batchItem{ - ctx: ctx, - isPin: true, - pin: pin, - }: - return nil - default: - return fmt.Errorf("error pinning: %w", ErrMaxQueueSizeReached) + batched := make(chan error) + css.sendToBatchCh <- batchItem{ + ctx: ctx, + isPin: true, + pin: pin, + batched: batched, } + return <-batched } return css.state.Add(ctx, pin) @@ -451,23 +453,50 @@ func (css *Consensus) LogUnpin(ctx context.Context, pin api.Pin) error { defer span.End() if css.config.batchingEnabled() { - select { - case css.batchItemCh <- batchItem{ - ctx: ctx, - isPin: false, - pin: pin, - }: - return nil - default: - return fmt.Errorf("error unpinning: %w", ErrMaxQueueSizeReached) + batched := make(chan error) + css.sendToBatchCh <- batchItem{ + ctx: ctx, + isPin: false, + pin: pin, + batched: batched, } + return <-batched } return css.state.Rm(ctx, pin.Cid) } +func (css *Consensus) sendToBatchWorker() { + for { + select { + case <-css.batchingCtx.Done(): + close(css.batchItemCh) + // This will stay here forever to catch any pins sent + // while shutting down. + for bi := range css.sendToBatchCh { + bi.batched <- errors.New("shutting down. Pin could not be batched") + close(bi.batched) + } + + return + case bi := <-css.sendToBatchCh: + select { + case css.batchItemCh <- bi: + close(bi.batched) // no error + default: // queue is full + err := fmt.Errorf("error batching item: %w", ErrMaxQueueSizeReached) + logger.Error(err) + bi.batched <- err + close(bi.batched) + } + } + } +} + // Launched in setup as a goroutine. func (css *Consensus) batchWorker() { + defer close(css.batchingDone) + maxSize := css.config.Batching.MaxBatchSize maxAge := css.config.Batching.MaxBatchAge batchCurSize := 0 @@ -478,12 +507,36 @@ func (css *Consensus) batchWorker() { <-batchTimer.C } + // Add/Rm from state + addToBatch := func(bi batchItem) error { + var err error + if bi.isPin { + err = css.batchingState.Add(bi.ctx, bi.pin) + } else { + err = css.batchingState.Rm(bi.ctx, bi.pin.Cid) + } + if err != nil { + logger.Errorf("error batching: %s (%s, isPin: %s)", err, bi.pin.Cid, bi.isPin) + } + return err + } + for { select { case <-css.batchingCtx.Done(): - if !batchTimer.Stop() { - <-batchTimer.C + // Drain batchItemCh for missing things to be batched + for batchItem := range css.batchItemCh { + err := addToBatch(batchItem) + if err != nil { + continue + } + batchCurSize++ } + if err := css.batchingState.Commit(css.ctx); err != nil { + logger.Errorf("error committing batch during shutdown: %s", err) + } + logger.Infof("batch commit (shutdown): %d items", batchCurSize) + return case batchItem := <-css.batchItemCh: // First item in batch. Start the timer @@ -491,15 +544,8 @@ func (css *Consensus) batchWorker() { batchTimer.Reset(maxAge) } - // Add/Rm from state - var err error - if batchItem.isPin { - err = css.batchingState.Add(batchItem.ctx, batchItem.pin) - } else { - err = css.batchingState.Rm(batchItem.ctx, batchItem.pin.Cid) - } + err := addToBatch(batchItem) if err != nil { - logger.Errorf("error batching: %s (%s, isPin: %s)", err, batchItem.pin.Cid, batchItem.isPin) continue }