crdt: Implement proper Batch commit on shutdown

This implements proper committing of batches on shutdown.

The batchWorker now only finishes once nothing else is queued to be
included in the final batch(es).

LogPin/LogUnpin operations will fail while we are shutting down, as they
can no longer be included in the batch.
Commit: 5e7a694cd1 (parent a393ebd8d8)
Author: Hector Sanjuan
Date:   2022-06-22 18:53:00 +02:00

@@ -49,9 +49,10 @@ var (
 
 // wraps pins so that they can be batched.
 type batchItem struct {
 	ctx     context.Context
 	isPin   bool // pin or unpin
 	pin     api.Pin
+	batched chan error // notify if item was sent for batching
 }
 
 // Consensus implement ipfscluster.Consensus and provides the facility to add
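
The new batched field gives every queued item its own reply channel: whoever
ends up handling the item either closes the channel (no error) or sends an
error on it and then closes it. A rough, standalone sketch of that hand-off,
with simplified, hypothetical names rather than the actual ipfs-cluster code:

// Sketch only: a per-item acknowledgement channel, modeled on the batchItem
// pattern above but with hypothetical names.
package main

import (
	"errors"
	"fmt"
)

type workItem struct {
	payload string
	ack     chan error // closed on success, receives an error on failure
}

func worker(intake <-chan workItem, queue chan<- workItem) {
	for it := range intake {
		select {
		case queue <- it:
			close(it.ack) // accepted: the waiting caller sees a nil error
		default:
			it.ack <- errors.New("queue full")
			close(it.ack)
		}
	}
}

func submit(intake chan<- workItem, payload string) error {
	it := workItem{payload: payload, ack: make(chan error)}
	intake <- it
	return <-it.ack // blocks until the worker has answered
}

func main() {
	intake := make(chan workItem)
	queue := make(chan workItem, 1) // tiny buffer to force a "queue full" error
	go worker(intake, queue)

	fmt.Println(submit(intake, "a")) // <nil>
	fmt.Println(submit(intake, "b")) // queue full (buffer already holds "a")
}
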
@@ -81,11 +82,14 @@ type Consensus struct {
 	dht       routing.Routing
 	pubsub    *pubsub.PubSub
 	rpcClient *rpc.Client
 
 	rpcReady   chan struct{}
 	stateReady chan struct{}
 	readyCh    chan struct{}
-	batchItemCh chan batchItem
+
+	sendToBatchCh chan batchItem
+	batchItemCh   chan batchItem
+	batchingDone  chan struct{}
 
 	shutdownLock sync.RWMutex
 	shutdown     bool
@ -145,7 +149,9 @@ func New(
rpcReady: make(chan struct{}, 1), rpcReady: make(chan struct{}, 1),
readyCh: make(chan struct{}, 1), readyCh: make(chan struct{}, 1),
stateReady: make(chan struct{}, 1), stateReady: make(chan struct{}, 1),
sendToBatchCh: make(chan batchItem),
batchItemCh: make(chan batchItem, cfg.Batching.MaxQueueSize), batchItemCh: make(chan batchItem, cfg.Batching.MaxQueueSize),
batchingDone: make(chan struct{}),
} }
go css.setup() go css.setup()
@@ -318,6 +324,7 @@ func (css *Consensus) setup() {
 			css.config.Batching.MaxBatchSize,
 			css.config.Batching.MaxBatchAge.String(),
 		)
+		go css.sendToBatchWorker()
 		go css.batchWorker()
 	}
 
@@ -340,13 +347,10 @@ func (css *Consensus) Shutdown(ctx context.Context) error {
 	logger.Info("stopping Consensus component")
 
-	// Cancel
+	// Cancel the batching code
 	css.batchingCancel()
 	if css.config.batchingEnabled() {
-		logger.Info("committing pending batches")
-		if err := css.batchingState.Commit(css.ctx); err != nil {
-			logger.Errorf("error committing batch before shutdown: %w", err)
-		}
+		<-css.batchingDone
 	}
 
 	css.cancel()
 
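
Shutdown no longer commits the final batch itself: it cancels the batching
context and then blocks on batchingDone, which batchWorker closes (via the
defer added further below) only after running its own final Commit. A minimal
sketch of that synchronization, with hypothetical names:

// Sketch only: shutdown waits for the worker's final flush by blocking on a
// channel that the worker closes when it returns (hypothetical names).
package main

import (
	"context"
	"fmt"
)

func worker(ctx context.Context, done chan<- struct{}) {
	defer close(done) // runs after the final commit below
	<-ctx.Done()
	fmt.Println("committing final batch") // stand-in for batchingState.Commit
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go worker(ctx, done)

	// Shutdown path: cancel the batching context, then wait until the
	// worker has finished committing.
	cancel()
	<-done
	fmt.Println("shutdown complete")
}
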
@@ -430,16 +434,14 @@ func (css *Consensus) LogPin(ctx context.Context, pin api.Pin) error {
 	defer span.End()
 
 	if css.config.batchingEnabled() {
-		select {
-		case css.batchItemCh <- batchItem{
-			ctx:   ctx,
-			isPin: true,
-			pin:   pin,
-		}:
-			return nil
-		default:
-			return fmt.Errorf("error pinning: %w", ErrMaxQueueSizeReached)
-		}
+		batched := make(chan error)
+		css.sendToBatchCh <- batchItem{
+			ctx:     ctx,
+			isPin:   true,
+			pin:     pin,
+			batched: batched,
+		}
+		return <-batched
 	}
 
 	return css.state.Add(ctx, pin)
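
For callers, LogPin now hands the pin to the forwarding worker and blocks
until it answers: nil once the item is queued for batching, a wrapped
ErrMaxQueueSizeReached when the queue is full, or a shutdown error while the
component is stopping. Because the queue-full error is wrapped with %w,
errors.Is still matches the sentinel. A hypothetical caller-side retry sketch
(logPin below is a stand-in, not the real method):

// Sketch only: how a caller might retry on a full batch queue, assuming the
// sentinel error is exposed by the batching code. Hypothetical, not part of
// the commit.
package main

import (
	"errors"
	"fmt"
	"time"
)

var errMaxQueueSizeReached = errors.New("batch queue is full")

// logPin stands in for css.LogPin: it wraps the sentinel with %w, as the
// real code does, so errors.Is still matches after wrapping.
func logPin() error {
	return fmt.Errorf("error batching item: %w", errMaxQueueSizeReached)
}

func pinWithRetry(attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = logPin(); err == nil || !errors.Is(err, errMaxQueueSizeReached) {
			return err // success, or a non-retryable error (e.g. shutting down)
		}
		time.Sleep(100 * time.Millisecond) // queue full: back off and retry
	}
	return err
}

func main() {
	fmt.Println(pinWithRetry(3))
}
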
@@ -451,23 +453,50 @@ func (css *Consensus) LogUnpin(ctx context.Context, pin api.Pin) error {
 	defer span.End()
 
 	if css.config.batchingEnabled() {
-		select {
-		case css.batchItemCh <- batchItem{
-			ctx:   ctx,
-			isPin: false,
-			pin:   pin,
-		}:
-			return nil
-		default:
-			return fmt.Errorf("error unpinning: %w", ErrMaxQueueSizeReached)
-		}
+		batched := make(chan error)
+		css.sendToBatchCh <- batchItem{
+			ctx:     ctx,
+			isPin:   false,
+			pin:     pin,
+			batched: batched,
+		}
+		return <-batched
 	}
 
 	return css.state.Rm(ctx, pin.Cid)
 }
 
+func (css *Consensus) sendToBatchWorker() {
+	for {
+		select {
+		case <-css.batchingCtx.Done():
+			close(css.batchItemCh)
+			// This will stay here forever to catch any pins sent
+			// while shutting down.
+			for bi := range css.sendToBatchCh {
+				bi.batched <- errors.New("shutting down. Pin could not be batched")
+				close(bi.batched)
+			}
+			return
+		case bi := <-css.sendToBatchCh:
+			select {
+			case css.batchItemCh <- bi:
+				close(bi.batched) // no error
+			default: // queue is full
+				err := fmt.Errorf("error batching item: %w", ErrMaxQueueSizeReached)
+				logger.Error(err)
+				bi.batched <- err
+				close(bi.batched)
+			}
+		}
+	}
+}
+
 // Launched in setup as a goroutine.
 func (css *Consensus) batchWorker() {
+	defer close(css.batchingDone)
+
 	maxSize := css.config.Batching.MaxBatchSize
 	maxAge := css.config.Batching.MaxBatchAge
 	batchCurSize := 0
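
A small Go detail makes the success path in sendToBatchWorker work:
close(bi.batched) without sending anything causes the <-batched receive in
LogPin/LogUnpin to return the zero value of error, i.e. nil. A tiny standalone
illustration:

// Sketch only: receiving from a closed error channel returns nil, which is
// how "close without send" doubles as a success signal.
package main

import (
	"errors"
	"fmt"
)

func main() {
	ok := make(chan error)
	go func() { close(ok) }() // success: close without sending
	fmt.Println(<-ok)         // <nil>

	fail := make(chan error)
	go func() { fail <- errors.New("queue full"); close(fail) }()
	fmt.Println(<-fail) // queue full
}
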
@@ -478,12 +507,36 @@ func (css *Consensus) batchWorker() {
 		<-batchTimer.C
 	}
 
+	// Add/Rm from state
+	addToBatch := func(bi batchItem) error {
+		var err error
+		if bi.isPin {
+			err = css.batchingState.Add(bi.ctx, bi.pin)
+		} else {
+			err = css.batchingState.Rm(bi.ctx, bi.pin.Cid)
+		}
+		if err != nil {
+			logger.Errorf("error batching: %s (%s, isPin: %s)", err, bi.pin.Cid, bi.isPin)
+		}
+		return err
+	}
+
 	for {
 		select {
 		case <-css.batchingCtx.Done():
-			if !batchTimer.Stop() {
-				<-batchTimer.C
+			// Drain batchItemCh for missing things to be batched
+			for batchItem := range css.batchItemCh {
+				err := addToBatch(batchItem)
+				if err != nil {
+					continue
+				}
+				batchCurSize++
 			}
+
+			if err := css.batchingState.Commit(css.ctx); err != nil {
+				logger.Errorf("error committing batch during shutdown: %s", err)
+			}
+			logger.Infof("batch commit (shutdown): %d items", batchCurSize)
 			return
 		case batchItem := <-css.batchItemCh:
 			// First item in batch. Start the timer
@@ -491,15 +544,8 @@
 				batchTimer.Reset(maxAge)
 			}
 
-			// Add/Rm from state
-			var err error
-			if batchItem.isPin {
-				err = css.batchingState.Add(batchItem.ctx, batchItem.pin)
-			} else {
-				err = css.batchingState.Rm(batchItem.ctx, batchItem.pin.Cid)
-			}
+			err := addToBatch(batchItem)
 			if err != nil {
-				logger.Errorf("error batching: %s (%s, isPin: %s)", err, batchItem.pin.Cid, batchItem.isPin)
 				continue
 			}
 
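
The shutdown branch of batchWorker relies on sendToBatchWorker having closed
batchItemCh: ranging over a closed buffered channel still yields every item
already queued and then terminates, so the worker can fold those leftovers
into the batch and run one final Commit. A condensed standalone sketch of that
drain-then-commit step, with hypothetical names:

// Sketch only: draining a closed, buffered channel before a final commit.
package main

import "fmt"

func main() {
	queue := make(chan string, 8)
	queue <- "pin A"
	queue <- "unpin B"
	close(queue) // what sendToBatchWorker does once shutdown starts

	var batch []string
	for item := range queue { // yields the buffered items, then stops
		batch = append(batch, item)
	}
	fmt.Printf("final commit: %d items %v\n", len(batch), batch) // stand-in for Commit
}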