crdt: Commit batches on shutdown

This attempt to commit any pending batches when the crdt component is being
shutudown. A commit should succeed if the new DAG node is created, heads are
replaced and broadcast.

The latest version of CRDT ensure that the datastore does not unnecessarily
gets marked as dirty when a broadcasted head cannot be fetched/processed, so
the side effect of publishing a head before shutting down should be under
control at least.
This commit is contained in:
Hector Sanjuan 2022-06-21 15:58:12 +02:00
parent d6166d802b
commit a393ebd8d8
4 changed files with 47 additions and 27 deletions

View File

@ -778,6 +778,14 @@ func (c *Cluster) Shutdown(ctx context.Context) error {
logger.Info("shutting down Cluster")
// Shutdown APIs first, avoids more requests coming through.
for _, api := range c.apis {
if err := api.Shutdown(ctx); err != nil {
logger.Errorf("error stopping API: %s", err)
return err
}
}
// Cancel discovery service (this shutdowns announcing). Handling
// entries is canceled along with the context below.
if c.discovery != nil {
@ -829,13 +837,6 @@ func (c *Cluster) Shutdown(ctx context.Context) error {
return err
}
for _, api := range c.apis {
if err := api.Shutdown(ctx); err != nil {
logger.Errorf("error stopping API: %s", err)
return err
}
}
if err := c.ipfs.Shutdown(ctx); err != nil {
logger.Errorf("error stopping IPFS Connector: %s", err)
return err

View File

@ -58,8 +58,10 @@ type batchItem struct {
// and remove pins from the Cluster shared state. It uses a CRDT-backed
// implementation of go-datastore (go-ds-crdt).
type Consensus struct {
ctx context.Context
cancel context.CancelFunc
ctx context.Context
cancel context.CancelFunc
batchingCtx context.Context
batchingCancel context.CancelFunc
config *Config
@ -105,6 +107,7 @@ func New(
}
ctx, cancel := context.WithCancel(context.Background())
batchingCtx, batchingCancel := context.WithCancel(ctx)
var blocksDatastore ds.Batching
ns := ds.NewKey(cfg.DatastoreNamespace)
@ -122,24 +125,27 @@ func New(
if err != nil {
logger.Errorf("error creating ipfs-lite: %s", err)
cancel()
batchingCancel()
return nil, err
}
css := &Consensus{
ctx: ctx,
cancel: cancel,
config: cfg,
host: host,
peerManager: pstoremgr.New(ctx, host, ""),
dht: dht,
store: store,
ipfs: ipfs,
namespace: ns,
pubsub: pubsub,
rpcReady: make(chan struct{}, 1),
readyCh: make(chan struct{}, 1),
stateReady: make(chan struct{}, 1),
batchItemCh: make(chan batchItem, cfg.Batching.MaxQueueSize),
ctx: ctx,
cancel: cancel,
batchingCtx: batchingCtx,
batchingCancel: batchingCancel,
config: cfg,
host: host,
peerManager: pstoremgr.New(ctx, host, ""),
dht: dht,
store: store,
ipfs: ipfs,
namespace: ns,
pubsub: pubsub,
rpcReady: make(chan struct{}, 1),
readyCh: make(chan struct{}, 1),
stateReady: make(chan struct{}, 1),
batchItemCh: make(chan batchItem, cfg.Batching.MaxQueueSize),
}
go css.setup()
@ -330,9 +336,19 @@ func (css *Consensus) Shutdown(ctx context.Context) error {
logger.Debug("already shutdown")
return nil
}
css.shutdown = true
logger.Info("stopping Consensus component")
// Cancel
css.batchingCancel()
if css.config.batchingEnabled() {
logger.Info("committing pending batches")
if err := css.batchingState.Commit(css.ctx); err != nil {
logger.Errorf("error committing batch before shutdown: %w", err)
}
}
css.cancel()
// Only close crdt after canceling the context, otherwise
@ -464,7 +480,10 @@ func (css *Consensus) batchWorker() {
for {
select {
case <-css.ctx.Done():
case <-css.batchingCtx.Done():
if !batchTimer.Stop() {
<-batchTimer.C
}
return
case batchItem := <-css.batchItemCh:
// First item in batch. Start the timer

2
go.mod
View File

@ -19,7 +19,7 @@ require (
github.com/ipfs/go-cid v0.2.0
github.com/ipfs/go-datastore v0.5.1
github.com/ipfs/go-ds-badger v0.3.0
github.com/ipfs/go-ds-crdt v0.3.5
github.com/ipfs/go-ds-crdt v0.3.6
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-fs-lock v0.0.7
github.com/ipfs/go-ipfs-api v0.3.0

4
go.sum
View File

@ -520,8 +520,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
github.com/ipfs/go-ds-badger v0.2.7/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA=
github.com/ipfs/go-ds-badger v0.3.0 h1:xREL3V0EH9S219kFFueOYJJTcjgNSZ2HY1iSvN7U1Ro=
github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupVCGm4QUIek=
github.com/ipfs/go-ds-crdt v0.3.5 h1:lr2AiMrc1VjAgWXIwkwb7e+duRIgUA5Su3LtugQWp+M=
github.com/ipfs/go-ds-crdt v0.3.5/go.mod h1:w6ktorLWKqddUP2Xb7zIrbOnsjX/oGmoOb2THFi6KCM=
github.com/ipfs/go-ds-crdt v0.3.6 h1:GI/rD7+dpmskToF7GPQ60w8Xr8Z7Xw/r7rhkp42ItQE=
github.com/ipfs/go-ds-crdt v0.3.6/go.mod h1:w6ktorLWKqddUP2Xb7zIrbOnsjX/oGmoOb2THFi6KCM=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=