From d19c7facffb9e445088c3a7babc7463ba2bdae62 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Fri, 8 Jul 2022 16:15:49 +0200 Subject: [PATCH] Fix: leaking goroutines on aborted /add requests It has been observed that some peers have a growing number of goroutines, usually stuck in go-libp2p-gorpc.MultiStream() function, which is waiting to read items from the arguments channel. We suspect this is due to aborted /add requests. In situations when the add request is aborted or fails, Finalize() is never called and the blocks channel stays open, so MultiStream() can never exit, and the BlockStreamer can never stop streaming etc. As a fix, we added the requirement to call Close() when we stop using a ClusterDAGService (error or not). This should ensure that the blocks channel is always closed and not just on Finalize(). --- adder/adder.go | 3 +++ adder/adder_test.go | 6 ++++++ adder/adderutils/adderutils.go | 1 + adder/sharding/dag_service.go | 9 +++++++++ adder/sharding/shard.go | 28 ++++++++++++++++++++-------- adder/single/dag_service.go | 20 ++++++++++++++++---- cluster.go | 1 + test/sharding.go | 5 +++++ 8 files changed, 61 insertions(+), 12 deletions(-) diff --git a/adder/adder.go b/adder/adder.go index 87581111..0f32e03e 100644 --- a/adder/adder.go +++ b/adder/adder.go @@ -43,6 +43,9 @@ type ClusterDAGService interface { // Finalize receives the IPFS content root CID as // returned by the ipfs adder. Finalize(ctx context.Context, ipfsRoot api.Cid) (api.Cid, error) + // Close performs any necessary cleanups and should be called + // whenever the DAGService is not going to be used anymore. + Close() error // Allocations returns the allocations made by the cluster DAG service // for the added content. Allocations() []peer.ID diff --git a/adder/adder_test.go b/adder/adder_test.go index 4839850f..d65b36da 100644 --- a/adder/adder_test.go +++ b/adder/adder_test.go @@ -55,6 +55,7 @@ func TestAdder(t *testing.T) { expectedCids := test.ShardingDirCids[:] dags := newMockCDAGServ() + defer dags.Close() adder := New(dags, p, nil) @@ -88,6 +89,7 @@ func TestAdder_DoubleStart(t *testing.T) { p := api.DefaultAddParams() dags := newMockCDAGServ() + defer dags.Close() adder := New(dags, p, nil) _, err := adder.FromFiles(context.Background(), f) @@ -124,6 +126,7 @@ func TestAdder_ContextCancelled(t *testing.T) { p := api.DefaultAddParams() dags := newMockCDAGServ() + defer dags.Close() ctx, cancel := context.WithCancel(context.Background()) adder := New(dags, p, nil) @@ -176,6 +179,8 @@ func TestAdder_CAR(t *testing.T) { // Add the car, discarding old dags. dags = newMockCDAGServ() + defer dags.Close() + p.Format = "car" adder = New(dags, p, nil) root2, err := adder.FromMultipart(ctx, carMr) @@ -217,6 +222,7 @@ func TestAdder_LargeFolder(t *testing.T) { p.Wrap = true dags := newMockCDAGServ() + defer dags.Close() adder := New(dags, p, nil) _, err := adder.FromFiles(context.Background(), slf) diff --git a/adder/adderutils/adderutils.go b/adder/adderutils/adderutils.go index a35f5dcc..e204e87d 100644 --- a/adder/adderutils/adderutils.go +++ b/adder/adderutils/adderutils.go @@ -39,6 +39,7 @@ func AddMultipartHTTPHandler( } else { dags = single.New(ctx, rpc, params, params.Local) } + defer dags.Close() if outputTransform == nil { outputTransform = func(in api.AddedOutput) interface{} { return in } diff --git a/adder/sharding/dag_service.go b/adder/sharding/dag_service.go index 4fea1a2a..021a40a7 100644 --- a/adder/sharding/dag_service.go +++ b/adder/sharding/dag_service.go @@ -77,6 +77,15 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { return dgs.ingestBlock(ctx, node) } +// Close performs cleanup and should be called when the DAGService is not +// going to be used anymore. +func (dgs *DAGService) Close() error { + if dgs.currentShard != nil { + dgs.currentShard.Close() + } + return nil +} + // Finalize finishes sharding, creates the cluster DAG and pins it along // with the meta pin for the root node of the content. func (dgs *DAGService) Finalize(ctx context.Context, dataRoot api.Cid) (api.Cid, error) { diff --git a/adder/sharding/shard.go b/adder/sharding/shard.go index 35d28843..bbf5ff67 100644 --- a/adder/sharding/shard.go +++ b/adder/sharding/shard.go @@ -3,10 +3,11 @@ package sharding import ( "context" "fmt" + "sync" - ipld "github.com/ipfs/go-ipld-format" "github.com/ipfs-cluster/ipfs-cluster/adder" "github.com/ipfs-cluster/ipfs-cluster/api" + ipld "github.com/ipfs/go-ipld-format" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" @@ -19,12 +20,13 @@ import ( // a peer to be block-put and will be part of the same shard in the // cluster DAG. type shard struct { - ctx context.Context - rpc *rpc.Client - allocations []peer.ID - pinOptions api.PinOptions - bs *adder.BlockStreamer - blocks chan api.NodeWithMeta + ctx context.Context + rpc *rpc.Client + allocations []peer.ID + pinOptions api.PinOptions + bs *adder.BlockStreamer + blocks chan api.NodeWithMeta + closeBlocksOnce sync.Once // dagNode represents a node with links and will be converted // to Cbor. dagNode map[string]cid.Cid @@ -93,6 +95,14 @@ func (sh *shard) sendBlock(ctx context.Context, n ipld.Node) error { } } +// Close stops any ongoing block streaming. +func (sh *shard) Close() error { + sh.closeBlocksOnce.Do(func() { + close(sh.blocks) + }) + return nil +} + // Flush completes the allocation of this shard by building a CBOR node // and adding it to IPFS, then pinning it in cluster. It returns the Cid of the // shard. @@ -110,7 +120,9 @@ func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid, return cid.Undef, err } } - close(sh.blocks) + + sh.Close() + select { case <-ctx.Done(): return cid.Undef, ctx.Err() diff --git a/adder/single/dag_service.go b/adder/single/dag_service.go index 246712e0..9f61abef 100644 --- a/adder/single/dag_service.go +++ b/adder/single/dag_service.go @@ -4,6 +4,7 @@ package single import ( "context" + "sync" adder "github.com/ipfs-cluster/ipfs-cluster/adder" "github.com/ipfs-cluster/ipfs-cluster/api" @@ -31,9 +32,10 @@ type DAGService struct { addParams api.AddParams local bool - bs *adder.BlockStreamer - blocks chan api.NodeWithMeta - recentBlocks *recentBlocks + bs *adder.BlockStreamer + blocks chan api.NodeWithMeta + closeBlocksOnce sync.Once + recentBlocks *recentBlocks } // New returns a new Adder with the given rpc Client. The client is used @@ -109,10 +111,20 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { } } +// Close cleans up the DAGService. +func (dgs *DAGService) Close() error { + dgs.closeBlocksOnce.Do(func() { + close(dgs.blocks) + }) + return nil +} + // Finalize pins the last Cid added to this DAGService. func (dgs *DAGService) Finalize(ctx context.Context, root api.Cid) (api.Cid, error) { - close(dgs.blocks) + // Close the blocks channel + dgs.Close() + // Wait for the BlockStreamer to finish. select { case <-dgs.ctx.Done(): return root, ctx.Err() diff --git a/cluster.go b/cluster.go index cb3af382..a5da477f 100644 --- a/cluster.go +++ b/cluster.go @@ -1744,6 +1744,7 @@ func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params } else { dags = single.New(ctx, c.rpcClient, params, params.Local) } + defer dags.Close() add := adder.New(dags, params, nil) return add.FromMultipart(ctx, reader) } diff --git a/test/sharding.go b/test/sharding.go index 0b157ce7..5377d98a 100644 --- a/test/sharding.go +++ b/test/sharding.go @@ -300,6 +300,11 @@ func NewMockDAGService(writeOnly bool) *MockDAGService { } } +// Close closes the DAGService. +func (d *MockDAGService) Close() error { + return nil +} + // Get reads a node. func (d *MockDAGService) Get(ctx context.Context, cid cid.Cid) (format.Node, error) { if d.writeOnly {