ipfs-cluster/adder/adder.go
Hector Sanjuan d19c7facff Fix: leaking goroutines on aborted /add requests
It has been observed that some peers have a growing number of goroutines,
usually stuck in go-libp2p-gorpc.MultiStream() function, which is waiting to
read items from the arguments channel.

We suspect this is due to aborted /add requests. In situations when the add
request is aborted or fails, Finalize() is never called and the blocks channel
stays open, so MultiStream() can never exit, and the BlockStreamer can never
stop streaming etc.

As a fix, we added the requirement to call Close() when we stop using a
ClusterDAGService (error or not). This should ensure that the blocks channel
is always closed and not just on Finalize().
2022-07-08 17:39:59 +02:00

335 lines
8.8 KiB
Go

// Package adder implements functionality to add content to IPFS daemons
// managed by the Cluster.
package adder
import (
"context"
"errors"
"fmt"
"io"
"mime/multipart"
"strings"
"github.com/ipfs-cluster/ipfs-cluster/adder/ipfsadd"
"github.com/ipfs-cluster/ipfs-cluster/api"
"github.com/ipfs/go-unixfs"
"github.com/ipld/go-car"
peer "github.com/libp2p/go-libp2p-core/peer"
cid "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
cbor "github.com/ipfs/go-ipld-cbor"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
merkledag "github.com/ipfs/go-merkledag"
multihash "github.com/multiformats/go-multihash"
)
var logger = logging.Logger("adder")
// go-merkledag does this, but it may be moved.
// We include for explicitness.
func init() {
ipld.Register(cid.DagProtobuf, merkledag.DecodeProtobufBlock)
ipld.Register(cid.Raw, merkledag.DecodeRawBlock)
ipld.Register(cid.DagCBOR, cbor.DecodeBlock)
}
// ClusterDAGService is an implementation of ipld.DAGService plus a Finalize
// method. ClusterDAGServices can be used to provide Adders with a different
// add implementation.
type ClusterDAGService interface {
ipld.DAGService
// Finalize receives the IPFS content root CID as
// returned by the ipfs adder.
Finalize(ctx context.Context, ipfsRoot api.Cid) (api.Cid, error)
// Close performs any necessary cleanups and should be called
// whenever the DAGService is not going to be used anymore.
Close() error
// Allocations returns the allocations made by the cluster DAG service
// for the added content.
Allocations() []peer.ID
}
// A dagFormatter can create dags from files.Node. It can keep state
// to add several files to the same dag.
type dagFormatter interface {
Add(name string, f files.Node) (api.Cid, error)
}
// Adder is used to add content to IPFS Cluster using an implementation of
// ClusterDAGService.
type Adder struct {
ctx context.Context
cancel context.CancelFunc
dgs ClusterDAGService
params api.AddParams
// AddedOutput updates are placed on this channel
// whenever a block is processed. They contain information
// about the block, the CID, the Name etc. and are mostly
// meant to be streamed back to the user.
output chan api.AddedOutput
}
// New returns a new Adder with the given ClusterDAGService, add options and a
// channel to send updates during the adding process.
//
// An Adder may only be used once.
func New(ds ClusterDAGService, p api.AddParams, out chan api.AddedOutput) *Adder {
// Discard all progress update output as the caller has not provided
// a channel for them to listen on.
if out == nil {
out = make(chan api.AddedOutput, 100)
go func() {
for range out {
}
}()
}
return &Adder{
dgs: ds,
params: p,
output: out,
}
}
func (a *Adder) setContext(ctx context.Context) {
if a.ctx == nil { // only allows first context
ctxc, cancel := context.WithCancel(ctx)
a.ctx = ctxc
a.cancel = cancel
}
}
// FromMultipart adds content from a multipart.Reader. The adder will
// no longer be usable after calling this method.
func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (api.Cid, error) {
logger.Debugf("adding from multipart with params: %+v", a.params)
f, err := files.NewFileFromPartReader(r, "multipart/form-data")
if err != nil {
return api.CidUndef, err
}
defer f.Close()
return a.FromFiles(ctx, f)
}
// FromFiles adds content from a files.Directory. The adder will no longer
// be usable after calling this method.
func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (api.Cid, error) {
logger.Debug("adding from files")
a.setContext(ctx)
if a.ctx.Err() != nil { // don't allow running twice
return api.CidUndef, a.ctx.Err()
}
defer a.cancel()
defer close(a.output)
var dagFmtr dagFormatter
var err error
switch a.params.Format {
case "", "unixfs":
dagFmtr, err = newIpfsAdder(ctx, a.dgs, a.params, a.output)
case "car":
dagFmtr, err = newCarAdder(ctx, a.dgs, a.params, a.output)
default:
err = errors.New("bad dag formatter option")
}
if err != nil {
return api.CidUndef, err
}
// setup wrapping
if a.params.Wrap {
f = files.NewSliceDirectory(
[]files.DirEntry{files.FileEntry("", f)},
)
}
it := f.Entries()
var adderRoot api.Cid
for it.Next() {
select {
case <-a.ctx.Done():
return api.CidUndef, a.ctx.Err()
default:
logger.Debugf("ipfsAdder AddFile(%s)", it.Name())
adderRoot, err = dagFmtr.Add(it.Name(), it.Node())
if err != nil {
logger.Error("error adding to cluster: ", err)
return api.CidUndef, err
}
}
// TODO (hector): We can only add a single CAR file for the
// moment.
if a.params.Format == "car" {
break
}
}
if it.Err() != nil {
return api.CidUndef, it.Err()
}
clusterRoot, err := a.dgs.Finalize(a.ctx, adderRoot)
if err != nil {
logger.Error("error finalizing adder:", err)
return api.CidUndef, err
}
logger.Infof("%s successfully added to cluster", clusterRoot)
return clusterRoot, nil
}
// A wrapper around the ipfsadd.Adder to satisfy the dagFormatter interface.
type ipfsAdder struct {
*ipfsadd.Adder
}
func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan api.AddedOutput) (*ipfsAdder, error) {
iadder, err := ipfsadd.NewAdder(ctx, dgs, dgs.Allocations)
if err != nil {
logger.Error(err)
return nil, err
}
iadder.Trickle = params.Layout == "trickle"
iadder.RawLeaves = params.RawLeaves
iadder.Chunker = params.Chunker
iadder.Out = out
iadder.Progress = params.Progress
iadder.NoCopy = params.NoCopy
// Set up prefi
prefix, err := merkledag.PrefixForCidVersion(params.CidVersion)
if err != nil {
return nil, fmt.Errorf("bad CID Version: %s", err)
}
hashFunCode, ok := multihash.Names[strings.ToLower(params.HashFun)]
if !ok {
return nil, errors.New("hash function name not known")
}
prefix.MhType = hashFunCode
prefix.MhLength = -1
iadder.CidBuilder = &prefix
return &ipfsAdder{
Adder: iadder,
}, nil
}
func (ia *ipfsAdder) Add(name string, f files.Node) (api.Cid, error) {
// In order to set the AddedOutput names right, we use
// OutputPrefix:
//
// When adding a folder, this is the root folder name which is
// prepended to the addedpaths. When adding a single file,
// this is the name of the file which overrides the empty
// AddedOutput name.
//
// After coreunix/add.go was refactored in go-ipfs and we
// followed suit, it no longer receives the name of the
// file/folder being added and does not emit AddedOutput
// events with the right names. We addressed this by adding
// OutputPrefix to our version. go-ipfs modifies emitted
// events before sending to user).
ia.OutputPrefix = name
nd, err := ia.AddAllAndPin(f)
if err != nil {
return api.CidUndef, err
}
return api.NewCid(nd.Cid()), nil
}
// An adder to add CAR files. It is at the moment very basic, and can
// add a single CAR file with a single root. Ideally, it should be able to
// add more complex, or several CARs by wrapping them with a single root.
// But for that we would need to keep state and track an MFS root similarly to
// what the ipfsadder does.
type carAdder struct {
ctx context.Context
dgs ClusterDAGService
params api.AddParams
output chan api.AddedOutput
}
func newCarAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan api.AddedOutput) (*carAdder, error) {
return &carAdder{
ctx: ctx,
dgs: dgs,
params: params,
output: out,
}, nil
}
// Add takes a node which should be a CAR file and nothing else and
// adds its blocks using the ClusterDAGService.
func (ca *carAdder) Add(name string, fn files.Node) (api.Cid, error) {
if ca.params.Wrap {
return api.CidUndef, errors.New("cannot wrap a CAR file upload")
}
f, ok := fn.(files.File)
if !ok {
return api.CidUndef, errors.New("expected CAR file is not of type file")
}
carReader, err := car.NewCarReader(f)
if err != nil {
return api.CidUndef, err
}
if len(carReader.Header.Roots) != 1 {
return api.CidUndef, errors.New("only CAR files with a single root are supported")
}
root := carReader.Header.Roots[0]
bytes := uint64(0)
size := uint64(0)
for {
block, err := carReader.Next()
if err != nil && err != io.EOF {
return api.CidUndef, err
} else if block == nil {
break
}
bytes += uint64(len(block.RawData()))
nd, err := ipld.Decode(block)
if err != nil {
return api.CidUndef, err
}
// If the root is in the CAR and the root is a UnixFS
// node, then set the size in the output object.
if nd.Cid().Equals(root) {
ufs, err := unixfs.ExtractFSNode(nd)
if err == nil {
size = ufs.FileSize()
}
}
err = ca.dgs.Add(ca.ctx, nd)
if err != nil {
return api.CidUndef, err
}
}
ca.output <- api.AddedOutput{
Name: name,
Cid: api.NewCid(root),
Bytes: bytes,
Size: size,
Allocations: ca.dgs.Allocations(),
}
return api.NewCid(root), nil
}