From b5cc68a321d8111c26038730b2ef34f8443c2ebf Mon Sep 17 00:00:00 2001
From: Hector Sanjuan
Date: Thu, 29 Apr 2021 00:51:01 +0200
Subject: [PATCH] Feat #1008: Support pin-batching with CRDT consensus.

This adds batching support to crdt-consensus per #1008. The crdt component
can now take advantage of the BatchingState, which uses the batching-crdt
datastore. In batching mode, the crdt datastore groups multiple Add and
Delete operations into a single delta (instead of one operation per delta,
as it does by default).

Batching is enabled in the crdt configuration section by setting
MaxBatchSize **and** MaxBatchAge. These two settings control when a batch
is committed: either when it reaches a maximum number of pin/unpin
operations, or when it reaches a maximum age. Batching unlocks large-scale
pin ingestion for clusters, but should be tuned to the expected workload.

An additional, hidden MaxQueueSize parameter provides backpressure for
Pin/Unpin requests. When more than MaxQueueSize pin/unpins are waiting to
be included in a batch, the LogPin/LogUnpin operations will fail. If this
happens, it means the cluster cannot commit batches as fast as pins are
arriving. In that case, MaxQueueSize should be increased (to accommodate
bursts), or the batch size increased (to perform fewer commits and
hopefully handle the requests faster).

Note that the underlying CRDT library will auto-commit when batch deltas
reach 1MB in size.
---
 consensus/crdt/consensus.go | 143 +++++++++++++++++++++++++++++++++---
 1 file changed, 134 insertions(+), 9 deletions(-)

diff --git a/consensus/crdt/consensus.go b/consensus/crdt/consensus.go
index 52c69391..41bf94bb 100644
--- a/consensus/crdt/consensus.go
+++ b/consensus/crdt/consensus.go
@@ -3,6 +3,7 @@ package crdt
 import (
 	"context"
 	"errors"
+	"fmt"
 	"sort"
 	"sync"
 	"time"
@@ -40,10 +41,18 @@ var (
 
 // Common variables for the module.
 var (
-	ErrNoLeader = errors.New("crdt consensus component does not provide a leader")
-	ErrRmPeer   = errors.New("crdt consensus component cannot remove peers")
+	ErrNoLeader            = errors.New("crdt consensus component does not provide a leader")
+	ErrRmPeer              = errors.New("crdt consensus component cannot remove peers")
+	ErrMaxQueueSizeReached = errors.New("batching max_queue_size reached. Too many operations are waiting to be batched. Try increasing the max_queue_size or adjusting the batching options")
 )
 
+// batchItem wraps a pin, the operation to perform on it (pin or unpin),
+// and its request context, so that operations can be queued and batched.
+type batchItem struct {
+	ctx   context.Context
+	isPin bool // pin or unpin
+	pin   *api.Pin
+}
+
 // Consensus implements ipfscluster.Consensus and provides the facility to add
 // and remove pins from the Cluster shared state. It uses a CRDT-backed
 // implementation of go-datastore (go-ds-crdt).
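Note: cfg.Batching and css.config.batchingEnabled(), used by the hunks
below, are defined in the crdt configuration rather than in this file, so
they do not appear in this diff. A minimal sketch of the shape they could
take, assuming (per the commit message) that batching requires both
MaxBatchSize and MaxBatchAge; anything beyond those three field names is
illustrative:

package crdt

import "time"

// Sketch only: the real definitions live in the crdt config and may
// differ. Only MaxBatchSize, MaxBatchAge and MaxQueueSize come from the
// commit message; everything else here is illustrative.
type BatchingConfig struct {
	MaxBatchSize int           // commit after this many queued pin/unpin ops...
	MaxBatchAge  time.Duration // ...or once the first queued op is this old
	MaxQueueSize int           // hidden backpressure limit for LogPin/LogUnpin
}

type Config struct {
	Batching BatchingConfig
	// ...remaining crdt config fields elided...
}

// batchingEnabled reflects the commit message: batching is active only
// when both MaxBatchSize and MaxBatchAge are set.
func (cfg *Config) batchingEnabled() bool {
	return cfg.Batching.MaxBatchSize > 0 && cfg.Batching.MaxBatchAge > 0
}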
@@ -61,17 +70,19 @@ type Consensus struct {
 	store     ds.Datastore
 	namespace ds.Key
 
-	state state.State
-	crdt  *crdt.Datastore
-	ipfs  *ipfslite.Peer
+	state         state.State
+	batchingState state.BatchingState
+	crdt          *crdt.Datastore
+	ipfs          *ipfslite.Peer
 
 	dht    routing.Routing
 	pubsub *pubsub.PubSub
 
-	rpcClient  *rpc.Client
-	rpcReady   chan struct{}
-	stateReady chan struct{}
-	readyCh    chan struct{}
+	rpcClient   *rpc.Client
+	rpcReady    chan struct{}
+	stateReady  chan struct{}
+	readyCh     chan struct{}
+	batchItemCh chan batchItem
 
 	shutdownLock sync.RWMutex
 	shutdown     bool
@@ -127,6 +138,7 @@ func New(
 		rpcReady:   make(chan struct{}, 1),
 		readyCh:    make(chan struct{}, 1),
 		stateReady: make(chan struct{}, 1),
+		batchItemCh: make(chan batchItem, cfg.Batching.MaxQueueSize),
 	}
 
 	go css.setup()
@@ -273,10 +285,30 @@ func (css *Consensus) setup() {
 	}
 	css.state = clusterState
 
+	batchingState, err := dsstate.NewBatching(
+		css.crdt,
+		"",
+		dsstate.DefaultHandle(),
+	)
+	if err != nil {
+		logger.Errorf("error creating cluster state batching datastore: %s", err)
+		return
+	}
+	css.batchingState = batchingState
+
 	if css.config.TrustAll {
 		logger.Info("'trust all' mode enabled. Any peer in the cluster can modify the pinset.")
 	}
 
+	// launch the batching worker
+	if css.config.batchingEnabled() {
+		logger.Infof("'crdt batching' enabled: %d items / %s",
+			css.config.Batching.MaxBatchSize,
+			css.config.Batching.MaxBatchAge.String(),
+		)
+		go css.batchWorker()
+	}
+
 	// notifies State() it is safe to return
 	close(css.stateReady)
 	css.readyCh <- struct{}{}
@@ -375,6 +407,19 @@ func (css *Consensus) LogPin(ctx context.Context, pin *api.Pin) error {
 	ctx, span := trace.StartSpan(ctx, "consensus/LogPin")
 	defer span.End()
 
+	if css.config.batchingEnabled() {
+		select {
+		case css.batchItemCh <- batchItem{
+			ctx:   ctx,
+			isPin: true,
+			pin:   pin,
+		}:
+			return nil
+		default:
+			return fmt.Errorf("error pinning: %w", ErrMaxQueueSizeReached)
+		}
+	}
+
 	return css.state.Add(ctx, pin)
 }
 
@@ -383,9 +428,89 @@ func (css *Consensus) LogUnpin(ctx context.Context, pin *api.Pin) error {
 	ctx, span := trace.StartSpan(ctx, "consensus/LogUnpin")
 	defer span.End()
 
+	if css.config.batchingEnabled() {
+		select {
+		case css.batchItemCh <- batchItem{
+			ctx:   ctx,
+			isPin: false,
+			pin:   pin,
+		}:
+			return nil
+		default:
+			return fmt.Errorf("error unpinning: %w", ErrMaxQueueSizeReached)
+		}
+	}
+
 	return css.state.Rm(ctx, pin.Cid)
 }
 
+// batchWorker is launched in setup as a goroutine. It accumulates queued
+// pins/unpins into the batching state and commits the batch when it
+// reaches MaxBatchSize items or MaxBatchAge, whichever happens first.
+func (css *Consensus) batchWorker() {
+	maxSize := css.config.Batching.MaxBatchSize
+	maxAge := css.config.Batching.MaxBatchAge
+	batchCurSize := 0
+	// Create the timer but stop it. It will be reset when
+	// items start arriving.
+	batchTimer := time.NewTimer(maxAge)
+	if !batchTimer.Stop() {
+		<-batchTimer.C
+	}
+
+	for {
+		select {
+		case <-css.ctx.Done():
+			return
+		case batchItem := <-css.batchItemCh:
+			// First item in batch. Start the timer.
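+			// (batchCurSize == 0 means this item opens a new
+			// batch: arming the max-age deadline here guarantees
+			// that a partially-filled batch still commits within
+			// MaxBatchAge of its first item.)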
+			if batchCurSize == 0 {
+				batchTimer.Reset(maxAge)
+			}
+
+			// Add/Rm from the batching state
+			var err error
+			if batchItem.isPin {
+				err = css.batchingState.Add(batchItem.ctx, batchItem.pin)
+			} else {
+				err = css.batchingState.Rm(batchItem.ctx, batchItem.pin.Cid)
+			}
+			if err != nil {
+				logger.Errorf("error batching: %s (%s, isPin: %t)", err, batchItem.pin.Cid, batchItem.isPin)
+				continue
+			}
+
+			batchCurSize++
+
+			if batchCurSize < maxSize {
+				continue
+			}
+
+			if err := css.batchingState.Commit(css.ctx); err != nil {
+				logger.Errorf("error committing batch after reaching max size: %s", err)
+				continue
+			}
+			logger.Debugf("batch commit (size): %d items", maxSize)
+
+			// The batch was committed on size. Stop the timer and
+			// leave it ready to be reset by the next arriving
+			// item.
+			if !batchTimer.Stop() {
+				<-batchTimer.C
+			}
+			batchCurSize = 0
+
+		case <-batchTimer.C:
+			// Commit on max age.
+			if err := css.batchingState.Commit(css.ctx); err != nil {
+				logger.Errorf("error committing batch after reaching max age: %s", err)
+				continue
+			}
+			logger.Debugf("batch commit (max age): %d items", batchCurSize)
+			// The timer has already expired at this point; the
+			// next arriving item will reset it.
+			batchCurSize = 0
+		}
+	}
+}
+
 // Peers returns the current known peerset. It uses
 // the monitor component and considers every peer with
 // valid known metrics a member.
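Note: because LogPin and LogUnpin wrap ErrMaxQueueSizeReached with %w,
callers can detect backpressure with errors.Is and slow down or retry. A
hypothetical helper sketching this, assuming it sits in this package
(where context, errors, time and api are already imported); the retry
count and backoff duration are illustrative:

// logPinWithBackoff is a hypothetical helper, not part of this patch.
// It retries briefly when the batching queue is full, then surfaces the
// error so the caller can slow its pin-ingestion rate.
func logPinWithBackoff(ctx context.Context, css *Consensus, pin *api.Pin) error {
	for i := 0; i < 3; i++ { // retry count is illustrative
		err := css.LogPin(ctx, pin)
		if !errors.Is(err, ErrMaxQueueSizeReached) {
			return err // nil on success, or an unrelated error
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond): // backoff is illustrative
		}
	}
	return ErrMaxQueueSizeReached
}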