diff --git a/adder/adder.go b/adder/adder.go index 87581111..0f32e03e 100644 --- a/adder/adder.go +++ b/adder/adder.go @@ -43,6 +43,9 @@ type ClusterDAGService interface { // Finalize receives the IPFS content root CID as // returned by the ipfs adder. Finalize(ctx context.Context, ipfsRoot api.Cid) (api.Cid, error) + // Close performs any necessary cleanups and should be called + // whenever the DAGService is not going to be used anymore. + Close() error // Allocations returns the allocations made by the cluster DAG service // for the added content. Allocations() []peer.ID diff --git a/adder/adder_test.go b/adder/adder_test.go index 4839850f..d65b36da 100644 --- a/adder/adder_test.go +++ b/adder/adder_test.go @@ -55,6 +55,7 @@ func TestAdder(t *testing.T) { expectedCids := test.ShardingDirCids[:] dags := newMockCDAGServ() + defer dags.Close() adder := New(dags, p, nil) @@ -88,6 +89,7 @@ func TestAdder_DoubleStart(t *testing.T) { p := api.DefaultAddParams() dags := newMockCDAGServ() + defer dags.Close() adder := New(dags, p, nil) _, err := adder.FromFiles(context.Background(), f) @@ -124,6 +126,7 @@ func TestAdder_ContextCancelled(t *testing.T) { p := api.DefaultAddParams() dags := newMockCDAGServ() + defer dags.Close() ctx, cancel := context.WithCancel(context.Background()) adder := New(dags, p, nil) @@ -176,6 +179,8 @@ func TestAdder_CAR(t *testing.T) { // Add the car, discarding old dags. dags = newMockCDAGServ() + defer dags.Close() + p.Format = "car" adder = New(dags, p, nil) root2, err := adder.FromMultipart(ctx, carMr) @@ -217,6 +222,7 @@ func TestAdder_LargeFolder(t *testing.T) { p.Wrap = true dags := newMockCDAGServ() + defer dags.Close() adder := New(dags, p, nil) _, err := adder.FromFiles(context.Background(), slf) diff --git a/adder/adderutils/adderutils.go b/adder/adderutils/adderutils.go index a35f5dcc..e204e87d 100644 --- a/adder/adderutils/adderutils.go +++ b/adder/adderutils/adderutils.go @@ -39,6 +39,7 @@ func AddMultipartHTTPHandler( } else { dags = single.New(ctx, rpc, params, params.Local) } + defer dags.Close() if outputTransform == nil { outputTransform = func(in api.AddedOutput) interface{} { return in } diff --git a/adder/sharding/dag_service.go b/adder/sharding/dag_service.go index 4fea1a2a..021a40a7 100644 --- a/adder/sharding/dag_service.go +++ b/adder/sharding/dag_service.go @@ -77,6 +77,15 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { return dgs.ingestBlock(ctx, node) } +// Close performs cleanup and should be called when the DAGService is not +// going to be used anymore. +func (dgs *DAGService) Close() error { + if dgs.currentShard != nil { + dgs.currentShard.Close() + } + return nil +} + // Finalize finishes sharding, creates the cluster DAG and pins it along // with the meta pin for the root node of the content. func (dgs *DAGService) Finalize(ctx context.Context, dataRoot api.Cid) (api.Cid, error) { diff --git a/adder/sharding/shard.go b/adder/sharding/shard.go index 35d28843..bbf5ff67 100644 --- a/adder/sharding/shard.go +++ b/adder/sharding/shard.go @@ -3,10 +3,11 @@ package sharding import ( "context" "fmt" + "sync" - ipld "github.com/ipfs/go-ipld-format" "github.com/ipfs-cluster/ipfs-cluster/adder" "github.com/ipfs-cluster/ipfs-cluster/api" + ipld "github.com/ipfs/go-ipld-format" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" @@ -19,12 +20,13 @@ import ( // a peer to be block-put and will be part of the same shard in the // cluster DAG. type shard struct { - ctx context.Context - rpc *rpc.Client - allocations []peer.ID - pinOptions api.PinOptions - bs *adder.BlockStreamer - blocks chan api.NodeWithMeta + ctx context.Context + rpc *rpc.Client + allocations []peer.ID + pinOptions api.PinOptions + bs *adder.BlockStreamer + blocks chan api.NodeWithMeta + closeBlocksOnce sync.Once // dagNode represents a node with links and will be converted // to Cbor. dagNode map[string]cid.Cid @@ -93,6 +95,14 @@ func (sh *shard) sendBlock(ctx context.Context, n ipld.Node) error { } } +// Close stops any ongoing block streaming. +func (sh *shard) Close() error { + sh.closeBlocksOnce.Do(func() { + close(sh.blocks) + }) + return nil +} + // Flush completes the allocation of this shard by building a CBOR node // and adding it to IPFS, then pinning it in cluster. It returns the Cid of the // shard. @@ -110,7 +120,9 @@ func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid, return cid.Undef, err } } - close(sh.blocks) + + sh.Close() + select { case <-ctx.Done(): return cid.Undef, ctx.Err() diff --git a/adder/single/dag_service.go b/adder/single/dag_service.go index 246712e0..9f61abef 100644 --- a/adder/single/dag_service.go +++ b/adder/single/dag_service.go @@ -4,6 +4,7 @@ package single import ( "context" + "sync" adder "github.com/ipfs-cluster/ipfs-cluster/adder" "github.com/ipfs-cluster/ipfs-cluster/api" @@ -31,9 +32,10 @@ type DAGService struct { addParams api.AddParams local bool - bs *adder.BlockStreamer - blocks chan api.NodeWithMeta - recentBlocks *recentBlocks + bs *adder.BlockStreamer + blocks chan api.NodeWithMeta + closeBlocksOnce sync.Once + recentBlocks *recentBlocks } // New returns a new Adder with the given rpc Client. The client is used @@ -109,10 +111,20 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { } } +// Close cleans up the DAGService. +func (dgs *DAGService) Close() error { + dgs.closeBlocksOnce.Do(func() { + close(dgs.blocks) + }) + return nil +} + // Finalize pins the last Cid added to this DAGService. func (dgs *DAGService) Finalize(ctx context.Context, root api.Cid) (api.Cid, error) { - close(dgs.blocks) + // Close the blocks channel + dgs.Close() + // Wait for the BlockStreamer to finish. select { case <-dgs.ctx.Done(): return root, ctx.Err() diff --git a/cluster.go b/cluster.go index cb3af382..a5da477f 100644 --- a/cluster.go +++ b/cluster.go @@ -1744,6 +1744,7 @@ func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params } else { dags = single.New(ctx, c.rpcClient, params, params.Local) } + defer dags.Close() add := adder.New(dags, params, nil) return add.FromMultipart(ctx, reader) } diff --git a/test/sharding.go b/test/sharding.go index 0b157ce7..5377d98a 100644 --- a/test/sharding.go +++ b/test/sharding.go @@ -300,6 +300,11 @@ func NewMockDAGService(writeOnly bool) *MockDAGService { } } +// Close closes the DAGService. +func (d *MockDAGService) Close() error { + return nil +} + // Get reads a node. func (d *MockDAGService) Get(ctx context.Context, cid cid.Cid) (format.Node, error) { if d.writeOnly {