Feat #1008: Support pin-batching with CRDT consensus.

This adds batching support to crdt-consensus per #1008. The crdt component can
now take advantage of the BatchingState, which uses the batching CRDT
datastore. In batching mode, the crdt datastore groups multiple Add and Delete
operations into a single delta (instead of one operation per delta, as it does
by default).
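
For illustration, a rough sketch of the batching flow (it reuses the
state.BatchingState calls that appear in the diff below; the helper and
variable names are made up):

    // Sketch only: every Add queues one operation in the current batch and
    // Commit publishes all of them as a single CRDT delta.
    func exampleBatch(ctx context.Context, bstate state.BatchingState, pins []*api.Pin) error {
        for _, pin := range pins {
            if err := bstate.Add(ctx, pin); err != nil {
                return err
            }
        }
        // One delta for the whole batch, instead of one delta per operation.
        return bstate.Commit(ctx)
    }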

Batching is enabled in the crdt configuration section by setting MaxBatchSize
**and** MaxBatchAge. These two settings control when a batch is committed,
either by reaching a maximum number of pin/unpin operations, or by reaching a
maximum age.
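
As a hedged sketch (the real configuration code may differ; the field names
simply follow the identifiers used in the diff below, e.g.
cfg.Batching.MaxBatchSize):

    // Hypothetical shape of the batching options referenced by this commit.
    type BatchingConfig struct {
        MaxBatchSize int           // commit after this many queued pin/unpin operations
        MaxBatchAge  time.Duration // commit once the first queued operation reaches this age
        MaxQueueSize int           // hidden: cap on operations waiting to enter a batch
    }

    // Batching only kicks in when both limits are set.
    func (cfg BatchingConfig) batchingEnabled() bool {
        return cfg.MaxBatchSize > 0 && cfg.MaxBatchAge > 0
    }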

Batching unlocks large pin-ingestion scalability for clusters, but it should be
tuned according to the expected workload. An additional, hidden MaxQueueSize
parameter provides a way to apply backpressure to Pin/Unpin requests: when more
than MaxQueueSize pin/unpins are waiting to be included in a batch, the
LogPin/LogUnpin operations will fail. If this happens, it means the cluster
cannot commit batches as fast as pins are arriving. In that case, MaxQueueSize
should be increased (to accommodate bursts), or the batch size increased (to
perform fewer commits and hopefully handle the requests faster).
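
As an example, a hypothetical caller could treat this error as a signal to back
off and retry (sketch only, not part of this commit):

    // Hypothetical caller-side sketch: back off briefly and retry once when
    // the batching queue is full, instead of dropping the request.
    func pinWithBackoff(ctx context.Context, css *Consensus, pin *api.Pin) error {
        err := css.LogPin(ctx, pin)
        if !errors.Is(err, ErrMaxQueueSizeReached) {
            return err // success, or an unrelated failure
        }
        // The queue is full: batches are not committing as fast as pins arrive.
        time.Sleep(100 * time.Millisecond)
        return css.LogPin(ctx, pin)
    }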

Note that the underlying CRDT library will auto-commit when batch deltas reach
1MB in size.
Hector Sanjuan 2021-04-29 00:51:01 +02:00
parent 75cf1b32c7
commit b5cc68a321


@@ -3,6 +3,7 @@ package crdt
import (
"context"
"errors"
"fmt"
"sort"
"sync"
"time"
@@ -40,10 +41,18 @@ var (
// Common variables for the module.
var (
ErrNoLeader = errors.New("crdt consensus component does not provide a leader")
ErrRmPeer = errors.New("crdt consensus component cannot remove peers")
ErrNoLeader = errors.New("crdt consensus component does not provide a leader")
ErrRmPeer = errors.New("crdt consensus component cannot remove peers")
ErrMaxQueueSizeReached = errors.New("batching max_queue_size reached. Too many operations are waiting to be batched. Try increasing the max_queue_size or adjusting the batching options")
)
// batchItem wraps pins so that they can be batched.
type batchItem struct {
ctx context.Context
isPin bool // pin or unpin
pin *api.Pin
}
// Consensus implements ipfscluster.Consensus and provides the facility to add
// and remove pins from the Cluster shared state. It uses a CRDT-backed
// implementation of go-datastore (go-ds-crdt).
@@ -61,17 +70,19 @@ type Consensus struct {
store ds.Datastore
namespace ds.Key
state state.State
crdt *crdt.Datastore
ipfs *ipfslite.Peer
state state.State
batchingState state.BatchingState
crdt *crdt.Datastore
ipfs *ipfslite.Peer
dht routing.Routing
pubsub *pubsub.PubSub
rpcClient *rpc.Client
rpcReady chan struct{}
stateReady chan struct{}
readyCh chan struct{}
rpcClient *rpc.Client
rpcReady chan struct{}
stateReady chan struct{}
readyCh chan struct{}
batchItemCh chan batchItem
shutdownLock sync.RWMutex
shutdown bool
@@ -127,6 +138,7 @@ func New(
rpcReady: make(chan struct{}, 1),
readyCh: make(chan struct{}, 1),
stateReady: make(chan struct{}, 1),
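// Buffered up to MaxQueueSize: when the buffer is full, LogPin/LogUnpin
// fail with ErrMaxQueueSizeReached (backpressure).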
batchItemCh: make(chan batchItem, cfg.Batching.MaxQueueSize),
}
go css.setup()
@@ -273,10 +285,30 @@ func (css *Consensus) setup() {
}
css.state = clusterState
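// Wrap the CRDT datastore in a batching state; the batch worker uses it
// to group pin/unpin operations and commit them together.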
batchingState, err := dsstate.NewBatching(
css.crdt,
"",
dsstate.DefaultHandle(),
)
if err != nil {
logger.Errorf("error creating cluster state batching datastore: %s", err)
return
}
css.batchingState = batchingState
if css.config.TrustAll {
logger.Info("'trust all' mode enabled. Any peer in the cluster can modify the pinset.")
}
// launch batching workers
if css.config.batchingEnabled() {
logger.Infof("'crdt batching' enabled: %d items / %s",
css.config.Batching.MaxBatchSize,
css.config.Batching.MaxBatchAge.String(),
)
go css.batchWorker()
}
// notifies State() it is safe to return
close(css.stateReady)
css.readyCh <- struct{}{}
@@ -375,6 +407,19 @@ func (css *Consensus) LogPin(ctx context.Context, pin *api.Pin) error {
ctx, span := trace.StartSpan(ctx, "consensus/LogPin")
defer span.End()
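// In batching mode, enqueue the pin for the batch worker instead of
// writing it to the state directly. A full queue signals backpressure.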
if css.config.batchingEnabled() {
select {
case css.batchItemCh <- batchItem{
ctx: ctx,
isPin: true,
pin: pin,
}:
return nil
default:
return fmt.Errorf("error unpinning: %w", ErrMaxQueueSizeReached)
}
}
return css.state.Add(ctx, pin)
}
@@ -383,9 +428,89 @@ func (css *Consensus) LogUnpin(ctx context.Context, pin *api.Pin) error {
ctx, span := trace.StartSpan(ctx, "consensus/LogUnpin")
defer span.End()
if css.config.batchingEnabled() {
select {
case css.batchItemCh <- batchItem{
ctx: ctx,
isPin: false,
pin: pin,
}:
return nil
default:
return fmt.Errorf("error unpinning: %w", ErrMaxQueueSizeReached)
}
}
return css.state.Rm(ctx, pin.Cid)
}
// Launched in setup as a goroutine.
func (css *Consensus) batchWorker() {
maxSize := css.config.Batching.MaxBatchSize
maxAge := css.config.Batching.MaxBatchAge
batchCurSize := 0
// Create the timer but stop it. It will reset when
// items start arriving.
batchTimer := time.NewTimer(maxAge)
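// Drain the channel in case the timer already fired, so a later Reset
// is safe.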
if !batchTimer.Stop() {
<-batchTimer.C
}
for {
select {
case <-css.ctx.Done():
return
case batchItem := <-css.batchItemCh:
// First item in batch. Start the timer
if batchCurSize == 0 {
batchTimer.Reset(maxAge)
}
// Add/Rm from state
var err error
if batchItem.isPin {
err = css.batchingState.Add(batchItem.ctx, batchItem.pin)
} else {
err = css.batchingState.Rm(batchItem.ctx, batchItem.pin.Cid)
}
if err != nil {
logger.Errorf("error batching: %s (%s, isPin: %s)", err, batchItem.pin.Cid, batchItem.isPin)
continue
}
batchCurSize++
if batchCurSize < maxSize {
continue
}
if err := css.batchingState.Commit(css.ctx); err != nil {
logger.Errorf("error commiting batch after reaching max size: %s", err)
continue
}
logger.Debugf("batch commit (size): %d items", maxSize)
// The batch was committed: stop the timer and leave it ready to be
// reset by the next item.
if !batchTimer.Stop() {
<-batchTimer.C
}
batchCurSize = 0
case <-batchTimer.C:
// Commit
if err := css.batchingState.Commit(css.ctx); err != nil {
logger.Errorf("error commiting batch after reaching max age: %s", err)
continue
}
logger.Debugf("batch commit (max age): %d items", batchCurSize)
// The timer has expired at this point; it will be reset when the
// next item arrives.
batchCurSize = 0
}
}
}
// Peers returns the current known peerset. It uses
// the monitor component and considers every peer with
// valid known metrics a member.