diff --git a/cluster.go b/cluster.go
index 59703ab9..cb3af382 100644
--- a/cluster.go
+++ b/cluster.go
@@ -778,6 +778,14 @@ func (c *Cluster) Shutdown(ctx context.Context) error {
 
 	logger.Info("shutting down Cluster")
 
+	// Shutdown APIs first, avoids more requests coming through.
+	for _, api := range c.apis {
+		if err := api.Shutdown(ctx); err != nil {
+			logger.Errorf("error stopping API: %s", err)
+			return err
+		}
+	}
+
 	// Cancel discovery service (this shutdowns announcing). Handling
 	// entries is canceled along with the context below.
 	if c.discovery != nil {
@@ -829,13 +837,6 @@ func (c *Cluster) Shutdown(ctx context.Context) error {
 		return err
 	}
 
-	for _, api := range c.apis {
-		if err := api.Shutdown(ctx); err != nil {
-			logger.Errorf("error stopping API: %s", err)
-			return err
-		}
-	}
-
 	if err := c.ipfs.Shutdown(ctx); err != nil {
 		logger.Errorf("error stopping IPFS Connector: %s", err)
 		return err
diff --git a/consensus/crdt/consensus.go b/consensus/crdt/consensus.go
index 2db624d9..462b6c3e 100644
--- a/consensus/crdt/consensus.go
+++ b/consensus/crdt/consensus.go
@@ -49,17 +49,20 @@ var (
 
 // wraps pins so that they can be batched.
 type batchItem struct {
-	ctx   context.Context
-	isPin bool // pin or unpin
-	pin   api.Pin
+	ctx     context.Context
+	isPin   bool // pin or unpin
+	pin     api.Pin
+	batched chan error // notify if item was sent for batching
 }
 
 // Consensus implement ipfscluster.Consensus and provides the facility to add
 // and remove pins from the Cluster shared state. It uses a CRDT-backed
 // implementation of go-datastore (go-ds-crdt).
 type Consensus struct {
-	ctx    context.Context
-	cancel context.CancelFunc
+	ctx            context.Context
+	cancel         context.CancelFunc
+	batchingCtx    context.Context
+	batchingCancel context.CancelFunc
 
 	config *Config
 
@@ -79,11 +82,14 @@ type Consensus struct {
 	dht    routing.Routing
 	pubsub *pubsub.PubSub
 
-	rpcClient   *rpc.Client
-	rpcReady    chan struct{}
-	stateReady  chan struct{}
-	readyCh     chan struct{}
-	batchItemCh chan batchItem
+	rpcClient  *rpc.Client
+	rpcReady   chan struct{}
+	stateReady chan struct{}
+	readyCh    chan struct{}
+
+	sendToBatchCh chan batchItem
+	batchItemCh   chan batchItem
+	batchingDone  chan struct{}
 
 	shutdownLock sync.RWMutex
 	shutdown     bool
@@ -105,6 +111,7 @@ func New(
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
+	batchingCtx, batchingCancel := context.WithCancel(ctx)
 
 	var blocksDatastore ds.Batching
 	ns := ds.NewKey(cfg.DatastoreNamespace)
@@ -122,24 +129,29 @@ func New(
 	if err != nil {
 		logger.Errorf("error creating ipfs-lite: %s", err)
 		cancel()
+		batchingCancel()
 		return nil, err
 	}
 
 	css := &Consensus{
-		ctx:         ctx,
-		cancel:      cancel,
-		config:      cfg,
-		host:        host,
-		peerManager: pstoremgr.New(ctx, host, ""),
-		dht:         dht,
-		store:       store,
-		ipfs:        ipfs,
-		namespace:   ns,
-		pubsub:      pubsub,
-		rpcReady:    make(chan struct{}, 1),
-		readyCh:     make(chan struct{}, 1),
-		stateReady:  make(chan struct{}, 1),
-		batchItemCh: make(chan batchItem, cfg.Batching.MaxQueueSize),
+		ctx:            ctx,
+		cancel:         cancel,
+		batchingCtx:    batchingCtx,
+		batchingCancel: batchingCancel,
+		config:         cfg,
+		host:           host,
+		peerManager:    pstoremgr.New(ctx, host, ""),
+		dht:            dht,
+		store:          store,
+		ipfs:           ipfs,
+		namespace:      ns,
+		pubsub:         pubsub,
+		rpcReady:       make(chan struct{}, 1),
+		readyCh:        make(chan struct{}, 1),
+		stateReady:     make(chan struct{}, 1),
+		sendToBatchCh:  make(chan batchItem),
+		batchItemCh:    make(chan batchItem, cfg.Batching.MaxQueueSize),
+		batchingDone:   make(chan struct{}),
 	}
 
 	go css.setup()
@@ -312,6 +324,7 @@ func (css *Consensus) setup() {
 			css.config.Batching.MaxBatchSize,
 			css.config.Batching.MaxBatchAge.String(),
 		)
+		go css.sendToBatchWorker()
 		go css.batchWorker()
 	}
 
@@ -330,9 +343,16 @@ func (css *Consensus) Shutdown(ctx context.Context) error {
 		logger.Debug("already shutdown")
 		return nil
 	}
+	css.shutdown = true
 
 	logger.Info("stopping Consensus component")
 
+	// Cancel the batching code
+	css.batchingCancel()
+	if css.config.batchingEnabled() {
+		<-css.batchingDone
+	}
+
 	css.cancel()
 
 	// Only close crdt after canceling the context, otherwise
@@ -414,16 +434,14 @@ func (css *Consensus) LogPin(ctx context.Context, pin api.Pin) error {
 	defer span.End()
 
 	if css.config.batchingEnabled() {
-		select {
-		case css.batchItemCh <- batchItem{
-			ctx:   ctx,
-			isPin: true,
-			pin:   pin,
-		}:
-			return nil
-		default:
-			return fmt.Errorf("error pinning: %w", ErrMaxQueueSizeReached)
+		batched := make(chan error)
+		css.sendToBatchCh <- batchItem{
+			ctx:     ctx,
+			isPin:   true,
+			pin:     pin,
+			batched: batched,
 		}
+		return <-batched
 	}
 
 	return css.state.Add(ctx, pin)
@@ -435,23 +453,54 @@ func (css *Consensus) LogUnpin(ctx context.Context, pin api.Pin) error {
 	defer span.End()
 
 	if css.config.batchingEnabled() {
-		select {
-		case css.batchItemCh <- batchItem{
-			ctx:   ctx,
-			isPin: false,
-			pin:   pin,
-		}:
-			return nil
-		default:
-			return fmt.Errorf("error unpinning: %w", ErrMaxQueueSizeReached)
+		batched := make(chan error)
+		css.sendToBatchCh <- batchItem{
+			ctx:     ctx,
+			isPin:   false,
+			pin:     pin,
+			batched: batched,
 		}
+		return <-batched
 	}
 
 	return css.state.Rm(ctx, pin.Cid)
 }
 
+// sendToBatchWorker moves items received on sendToBatchCh into batchItemCh
+// and reports the outcome on each item's batched channel. Once batchingCtx
+// is done, it closes batchItemCh and answers every later item with an error.
+// Launched in setup as a goroutine.
+func (css *Consensus) sendToBatchWorker() {
+	for {
+		select {
+		case <-css.batchingCtx.Done():
+			close(css.batchItemCh)
+			// This will stay here forever to catch any pins sent
+			// while shutting down.
+			for bi := range css.sendToBatchCh {
+				bi.batched <- errors.New("shutting down. Pin could not be batched")
+				close(bi.batched)
+			}
+
+			return
+		case bi := <-css.sendToBatchCh:
+			select {
+			case css.batchItemCh <- bi:
+				close(bi.batched) // no error
+			default: // queue is full
+				err := fmt.Errorf("error batching item: %w", ErrMaxQueueSizeReached)
+				logger.Error(err)
+				bi.batched <- err
+				close(bi.batched)
+			}
+		}
+	}
+}
+
 // Launched in setup as a goroutine.
 func (css *Consensus) batchWorker() {
+	defer close(css.batchingDone)
+
 	maxSize := css.config.Batching.MaxBatchSize
 	maxAge := css.config.Batching.MaxBatchAge
 	batchCurSize := 0
@@ -462,9 +511,36 @@ func (css *Consensus) batchWorker() {
 		<-batchTimer.C
 	}
 
+	// Add/Rm from state
+	addToBatch := func(bi batchItem) error {
+		var err error
+		if bi.isPin {
+			err = css.batchingState.Add(bi.ctx, bi.pin)
+		} else {
+			err = css.batchingState.Rm(bi.ctx, bi.pin.Cid)
+		}
+		if err != nil {
+			logger.Errorf("error batching: %s (%s, isPin: %t)", err, bi.pin.Cid, bi.isPin)
+		}
+		return err
+	}
+
 	for {
 		select {
-		case <-css.ctx.Done():
+		case <-css.batchingCtx.Done():
+			// Drain batchItemCh for missing things to be batched
+			for batchItem := range css.batchItemCh {
+				err := addToBatch(batchItem)
+				if err != nil {
+					continue
+				}
+				batchCurSize++
+			}
+			if err := css.batchingState.Commit(css.ctx); err != nil {
+				logger.Errorf("error committing batch during shutdown: %s", err)
+			}
+			logger.Infof("batch commit (shutdown): %d items", batchCurSize)
+
 			return
 		case batchItem := <-css.batchItemCh:
 			// First item in batch. Start the timer
@@ -472,15 +548,8 @@ func (css *Consensus) batchWorker() {
 				batchTimer.Reset(maxAge)
 			}
 
-			// Add/Rm from state
-			var err error
-			if batchItem.isPin {
-				err = css.batchingState.Add(batchItem.ctx, batchItem.pin)
-			} else {
-				err = css.batchingState.Rm(batchItem.ctx, batchItem.pin.Cid)
-			}
+			err := addToBatch(batchItem)
 			if err != nil {
-				logger.Errorf("error batching: %s (%s, isPin: %s)", err, batchItem.pin.Cid, batchItem.isPin)
 				continue
 			}
 
diff --git a/go.mod b/go.mod
index f9d1ad05..339d7810 100644
--- a/go.mod
+++ b/go.mod
@@ -19,7 +19,7 @@ require (
 	github.com/ipfs/go-cid v0.2.0
 	github.com/ipfs/go-datastore v0.5.1
 	github.com/ipfs/go-ds-badger v0.3.0
-	github.com/ipfs/go-ds-crdt v0.3.5
+	github.com/ipfs/go-ds-crdt v0.3.6
 	github.com/ipfs/go-ds-leveldb v0.5.0
 	github.com/ipfs/go-fs-lock v0.0.7
 	github.com/ipfs/go-ipfs-api v0.3.0
diff --git a/go.sum b/go.sum
index 478e5572..54f047b1 100644
--- a/go.sum
+++ b/go.sum
@@ -520,8 +520,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
 github.com/ipfs/go-ds-badger v0.2.7/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA=
 github.com/ipfs/go-ds-badger v0.3.0 h1:xREL3V0EH9S219kFFueOYJJTcjgNSZ2HY1iSvN7U1Ro=
 github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupVCGm4QUIek=
-github.com/ipfs/go-ds-crdt v0.3.5 h1:lr2AiMrc1VjAgWXIwkwb7e+duRIgUA5Su3LtugQWp+M=
-github.com/ipfs/go-ds-crdt v0.3.5/go.mod h1:w6ktorLWKqddUP2Xb7zIrbOnsjX/oGmoOb2THFi6KCM=
+github.com/ipfs/go-ds-crdt v0.3.6 h1:GI/rD7+dpmskToF7GPQ60w8Xr8Z7Xw/r7rhkp42ItQE=
+github.com/ipfs/go-ds-crdt v0.3.6/go.mod h1:w6ktorLWKqddUP2Xb7zIrbOnsjX/oGmoOb2THFi6KCM=
 github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
 github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8=
 github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=