Merge pull request #1719 from ipfs-cluster/fix/1697-commit-batches

crdt: Commit batches on shutdown
This commit is contained in:
Hector Sanjuan 2022-06-23 11:03:28 +02:00 committed by GitHub
commit 12490c959a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 61 deletions

View File

@ -778,6 +778,14 @@ func (c *Cluster) Shutdown(ctx context.Context) error {
logger.Info("shutting down Cluster")
// Shutdown APIs first, avoids more requests coming through.
for _, api := range c.apis {
if err := api.Shutdown(ctx); err != nil {
logger.Errorf("error stopping API: %s", err)
return err
}
}
// Cancel discovery service (this shutdowns announcing). Handling
// entries is canceled along with the context below.
if c.discovery != nil {
@ -829,13 +837,6 @@ func (c *Cluster) Shutdown(ctx context.Context) error {
return err
}
for _, api := range c.apis {
if err := api.Shutdown(ctx); err != nil {
logger.Errorf("error stopping API: %s", err)
return err
}
}
if err := c.ipfs.Shutdown(ctx); err != nil {
logger.Errorf("error stopping IPFS Connector: %s", err)
return err

View File

@ -49,17 +49,20 @@ var (
// wraps pins so that they can be batched.
type batchItem struct {
ctx context.Context
isPin bool // pin or unpin
pin api.Pin
ctx context.Context
isPin bool // pin or unpin
pin api.Pin
batched chan error // notify if item was sent for batching
}
// Consensus implement ipfscluster.Consensus and provides the facility to add
// and remove pins from the Cluster shared state. It uses a CRDT-backed
// implementation of go-datastore (go-ds-crdt).
type Consensus struct {
ctx context.Context
cancel context.CancelFunc
ctx context.Context
cancel context.CancelFunc
batchingCtx context.Context
batchingCancel context.CancelFunc
config *Config
@ -79,11 +82,14 @@ type Consensus struct {
dht routing.Routing
pubsub *pubsub.PubSub
rpcClient *rpc.Client
rpcReady chan struct{}
stateReady chan struct{}
readyCh chan struct{}
batchItemCh chan batchItem
rpcClient *rpc.Client
rpcReady chan struct{}
stateReady chan struct{}
readyCh chan struct{}
sendToBatchCh chan batchItem
batchItemCh chan batchItem
batchingDone chan struct{}
shutdownLock sync.RWMutex
shutdown bool
@ -105,6 +111,7 @@ func New(
}
ctx, cancel := context.WithCancel(context.Background())
batchingCtx, batchingCancel := context.WithCancel(ctx)
var blocksDatastore ds.Batching
ns := ds.NewKey(cfg.DatastoreNamespace)
@ -122,24 +129,29 @@ func New(
if err != nil {
logger.Errorf("error creating ipfs-lite: %s", err)
cancel()
batchingCancel()
return nil, err
}
css := &Consensus{
ctx: ctx,
cancel: cancel,
config: cfg,
host: host,
peerManager: pstoremgr.New(ctx, host, ""),
dht: dht,
store: store,
ipfs: ipfs,
namespace: ns,
pubsub: pubsub,
rpcReady: make(chan struct{}, 1),
readyCh: make(chan struct{}, 1),
stateReady: make(chan struct{}, 1),
batchItemCh: make(chan batchItem, cfg.Batching.MaxQueueSize),
ctx: ctx,
cancel: cancel,
batchingCtx: batchingCtx,
batchingCancel: batchingCancel,
config: cfg,
host: host,
peerManager: pstoremgr.New(ctx, host, ""),
dht: dht,
store: store,
ipfs: ipfs,
namespace: ns,
pubsub: pubsub,
rpcReady: make(chan struct{}, 1),
readyCh: make(chan struct{}, 1),
stateReady: make(chan struct{}, 1),
sendToBatchCh: make(chan batchItem),
batchItemCh: make(chan batchItem, cfg.Batching.MaxQueueSize),
batchingDone: make(chan struct{}),
}
go css.setup()
@ -312,6 +324,7 @@ func (css *Consensus) setup() {
css.config.Batching.MaxBatchSize,
css.config.Batching.MaxBatchAge.String(),
)
go css.sendToBatchWorker()
go css.batchWorker()
}
@ -330,9 +343,16 @@ func (css *Consensus) Shutdown(ctx context.Context) error {
logger.Debug("already shutdown")
return nil
}
css.shutdown = true
logger.Info("stopping Consensus component")
// Cancel the batching code
css.batchingCancel()
if css.config.batchingEnabled() {
<-css.batchingDone
}
css.cancel()
// Only close crdt after canceling the context, otherwise
@ -414,16 +434,14 @@ func (css *Consensus) LogPin(ctx context.Context, pin api.Pin) error {
defer span.End()
if css.config.batchingEnabled() {
select {
case css.batchItemCh <- batchItem{
ctx: ctx,
isPin: true,
pin: pin,
}:
return nil
default:
return fmt.Errorf("error pinning: %w", ErrMaxQueueSizeReached)
batched := make(chan error)
css.sendToBatchCh <- batchItem{
ctx: ctx,
isPin: true,
pin: pin,
batched: batched,
}
return <-batched
}
return css.state.Add(ctx, pin)
@ -435,23 +453,50 @@ func (css *Consensus) LogUnpin(ctx context.Context, pin api.Pin) error {
defer span.End()
if css.config.batchingEnabled() {
select {
case css.batchItemCh <- batchItem{
ctx: ctx,
isPin: false,
pin: pin,
}:
return nil
default:
return fmt.Errorf("error unpinning: %w", ErrMaxQueueSizeReached)
batched := make(chan error)
css.sendToBatchCh <- batchItem{
ctx: ctx,
isPin: false,
pin: pin,
batched: batched,
}
return <-batched
}
return css.state.Rm(ctx, pin.Cid)
}
func (css *Consensus) sendToBatchWorker() {
for {
select {
case <-css.batchingCtx.Done():
close(css.batchItemCh)
// This will stay here forever to catch any pins sent
// while shutting down.
for bi := range css.sendToBatchCh {
bi.batched <- errors.New("shutting down. Pin could not be batched")
close(bi.batched)
}
return
case bi := <-css.sendToBatchCh:
select {
case css.batchItemCh <- bi:
close(bi.batched) // no error
default: // queue is full
err := fmt.Errorf("error batching item: %w", ErrMaxQueueSizeReached)
logger.Error(err)
bi.batched <- err
close(bi.batched)
}
}
}
}
// Launched in setup as a goroutine.
func (css *Consensus) batchWorker() {
defer close(css.batchingDone)
maxSize := css.config.Batching.MaxBatchSize
maxAge := css.config.Batching.MaxBatchAge
batchCurSize := 0
@ -462,9 +507,36 @@ func (css *Consensus) batchWorker() {
<-batchTimer.C
}
// Add/Rm from state
addToBatch := func(bi batchItem) error {
var err error
if bi.isPin {
err = css.batchingState.Add(bi.ctx, bi.pin)
} else {
err = css.batchingState.Rm(bi.ctx, bi.pin.Cid)
}
if err != nil {
logger.Errorf("error batching: %s (%s, isPin: %s)", err, bi.pin.Cid, bi.isPin)
}
return err
}
for {
select {
case <-css.ctx.Done():
case <-css.batchingCtx.Done():
// Drain batchItemCh for missing things to be batched
for batchItem := range css.batchItemCh {
err := addToBatch(batchItem)
if err != nil {
continue
}
batchCurSize++
}
if err := css.batchingState.Commit(css.ctx); err != nil {
logger.Errorf("error committing batch during shutdown: %s", err)
}
logger.Infof("batch commit (shutdown): %d items", batchCurSize)
return
case batchItem := <-css.batchItemCh:
// First item in batch. Start the timer
@ -472,15 +544,8 @@ func (css *Consensus) batchWorker() {
batchTimer.Reset(maxAge)
}
// Add/Rm from state
var err error
if batchItem.isPin {
err = css.batchingState.Add(batchItem.ctx, batchItem.pin)
} else {
err = css.batchingState.Rm(batchItem.ctx, batchItem.pin.Cid)
}
err := addToBatch(batchItem)
if err != nil {
logger.Errorf("error batching: %s (%s, isPin: %s)", err, batchItem.pin.Cid, batchItem.isPin)
continue
}

2
go.mod
View File

@ -19,7 +19,7 @@ require (
github.com/ipfs/go-cid v0.2.0
github.com/ipfs/go-datastore v0.5.1
github.com/ipfs/go-ds-badger v0.3.0
github.com/ipfs/go-ds-crdt v0.3.5
github.com/ipfs/go-ds-crdt v0.3.6
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-fs-lock v0.0.7
github.com/ipfs/go-ipfs-api v0.3.0

4
go.sum
View File

@ -520,8 +520,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
github.com/ipfs/go-ds-badger v0.2.7/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA=
github.com/ipfs/go-ds-badger v0.3.0 h1:xREL3V0EH9S219kFFueOYJJTcjgNSZ2HY1iSvN7U1Ro=
github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupVCGm4QUIek=
github.com/ipfs/go-ds-crdt v0.3.5 h1:lr2AiMrc1VjAgWXIwkwb7e+duRIgUA5Su3LtugQWp+M=
github.com/ipfs/go-ds-crdt v0.3.5/go.mod h1:w6ktorLWKqddUP2Xb7zIrbOnsjX/oGmoOb2THFi6KCM=
github.com/ipfs/go-ds-crdt v0.3.6 h1:GI/rD7+dpmskToF7GPQ60w8Xr8Z7Xw/r7rhkp42ItQE=
github.com/ipfs/go-ds-crdt v0.3.6/go.mod h1:w6ktorLWKqddUP2Xb7zIrbOnsjX/oGmoOb2THFi6KCM=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=