Merge pull request #1732 from ipfs-cluster/fix/goroutine-leak-adder

Fix: leaking goroutines on aborted /add requests
This commit is contained in:
Hector Sanjuan 2022-07-08 17:40:24 +02:00 committed by GitHub
commit c4d78d52f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 61 additions and 12 deletions

View File

@ -43,6 +43,9 @@ type ClusterDAGService interface {
// Finalize receives the IPFS content root CID as
// returned by the ipfs adder.
Finalize(ctx context.Context, ipfsRoot api.Cid) (api.Cid, error)
// Close performs any necessary cleanups and should be called
// whenever the DAGService is not going to be used anymore.
Close() error
// Allocations returns the allocations made by the cluster DAG service
// for the added content.
Allocations() []peer.ID

View File

@ -55,6 +55,7 @@ func TestAdder(t *testing.T) {
expectedCids := test.ShardingDirCids[:]
dags := newMockCDAGServ()
defer dags.Close()
adder := New(dags, p, nil)
@ -88,6 +89,7 @@ func TestAdder_DoubleStart(t *testing.T) {
p := api.DefaultAddParams()
dags := newMockCDAGServ()
defer dags.Close()
adder := New(dags, p, nil)
_, err := adder.FromFiles(context.Background(), f)
@ -124,6 +126,7 @@ func TestAdder_ContextCancelled(t *testing.T) {
p := api.DefaultAddParams()
dags := newMockCDAGServ()
defer dags.Close()
ctx, cancel := context.WithCancel(context.Background())
adder := New(dags, p, nil)
@ -176,6 +179,8 @@ func TestAdder_CAR(t *testing.T) {
// Add the car, discarding old dags.
dags = newMockCDAGServ()
defer dags.Close()
p.Format = "car"
adder = New(dags, p, nil)
root2, err := adder.FromMultipart(ctx, carMr)
@ -217,6 +222,7 @@ func TestAdder_LargeFolder(t *testing.T) {
p.Wrap = true
dags := newMockCDAGServ()
defer dags.Close()
adder := New(dags, p, nil)
_, err := adder.FromFiles(context.Background(), slf)

View File

@ -39,6 +39,7 @@ func AddMultipartHTTPHandler(
} else {
dags = single.New(ctx, rpc, params, params.Local)
}
defer dags.Close()
if outputTransform == nil {
outputTransform = func(in api.AddedOutput) interface{} { return in }

View File

@ -77,6 +77,15 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
return dgs.ingestBlock(ctx, node)
}
// Close performs cleanup and should be called when the DAGService is not
// going to be used anymore.
func (dgs *DAGService) Close() error {
if dgs.currentShard != nil {
dgs.currentShard.Close()
}
return nil
}
// Finalize finishes sharding, creates the cluster DAG and pins it along
// with the meta pin for the root node of the content.
func (dgs *DAGService) Finalize(ctx context.Context, dataRoot api.Cid) (api.Cid, error) {

View File

@ -3,10 +3,11 @@ package sharding
import (
"context"
"fmt"
"sync"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs-cluster/ipfs-cluster/adder"
"github.com/ipfs-cluster/ipfs-cluster/api"
ipld "github.com/ipfs/go-ipld-format"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
@ -19,12 +20,13 @@ import (
// a peer to be block-put and will be part of the same shard in the
// cluster DAG.
type shard struct {
ctx context.Context
rpc *rpc.Client
allocations []peer.ID
pinOptions api.PinOptions
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
ctx context.Context
rpc *rpc.Client
allocations []peer.ID
pinOptions api.PinOptions
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
closeBlocksOnce sync.Once
// dagNode represents a node with links and will be converted
// to Cbor.
dagNode map[string]cid.Cid
@ -93,6 +95,14 @@ func (sh *shard) sendBlock(ctx context.Context, n ipld.Node) error {
}
}
// Close stops any ongoing block streaming.
func (sh *shard) Close() error {
sh.closeBlocksOnce.Do(func() {
close(sh.blocks)
})
return nil
}
// Flush completes the allocation of this shard by building a CBOR node
// and adding it to IPFS, then pinning it in cluster. It returns the Cid of the
// shard.
@ -110,7 +120,9 @@ func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid,
return cid.Undef, err
}
}
close(sh.blocks)
sh.Close()
select {
case <-ctx.Done():
return cid.Undef, ctx.Err()

View File

@ -4,6 +4,7 @@ package single
import (
"context"
"sync"
adder "github.com/ipfs-cluster/ipfs-cluster/adder"
"github.com/ipfs-cluster/ipfs-cluster/api"
@ -31,9 +32,10 @@ type DAGService struct {
addParams api.AddParams
local bool
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
recentBlocks *recentBlocks
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
closeBlocksOnce sync.Once
recentBlocks *recentBlocks
}
// New returns a new Adder with the given rpc Client. The client is used
@ -109,10 +111,20 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
}
}
// Close cleans up the DAGService.
func (dgs *DAGService) Close() error {
dgs.closeBlocksOnce.Do(func() {
close(dgs.blocks)
})
return nil
}
// Finalize pins the last Cid added to this DAGService.
func (dgs *DAGService) Finalize(ctx context.Context, root api.Cid) (api.Cid, error) {
close(dgs.blocks)
// Close the blocks channel
dgs.Close()
// Wait for the BlockStreamer to finish.
select {
case <-dgs.ctx.Done():
return root, ctx.Err()

View File

@ -1744,6 +1744,7 @@ func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params
} else {
dags = single.New(ctx, c.rpcClient, params, params.Local)
}
defer dags.Close()
add := adder.New(dags, params, nil)
return add.FromMultipart(ctx, reader)
}

View File

@ -300,6 +300,11 @@ func NewMockDAGService(writeOnly bool) *MockDAGService {
}
}
// Close closes the DAGService.
func (d *MockDAGService) Close() error {
return nil
}
// Get reads a node.
func (d *MockDAGService) Get(ctx context.Context, cid cid.Cid) (format.Node, error) {
if d.writeOnly {