ipfs-cluster/adder/ipfsadd/add.go

// Package ipfsadd is a simplified copy of go-ipfs/core/coreunix/add.go
package ipfsadd

import (
	"context"
	"errors"
	"fmt"
	"io"
	gopath "path"
	"path/filepath"

	"github.com/ipfs/ipfs-cluster/api"

	cid "github.com/ipfs/go-cid"
	chunker "github.com/ipfs/go-ipfs-chunker"
	files "github.com/ipfs/go-ipfs-files"
	posinfo "github.com/ipfs/go-ipfs-posinfo"
	ipld "github.com/ipfs/go-ipld-format"
	logging "github.com/ipfs/go-log/v2"
	dag "github.com/ipfs/go-merkledag"
	mfs "github.com/ipfs/go-mfs"
	unixfs "github.com/ipfs/go-unixfs"
	balanced "github.com/ipfs/go-unixfs/importer/balanced"
	ihelper "github.com/ipfs/go-unixfs/importer/helpers"
	trickle "github.com/ipfs/go-unixfs/importer/trickle"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

var log = logging.Logger("coreunix")

// how many bytes of progress to wait before sending a progress update message
const progressReaderIncrement = 1024 * 256
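
// liveCacheSize bounds how many nodes are added through MFS before the
// in-memory cache is flushed (see addFileNode).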
var liveCacheSize = uint64(256 << 10)

// NewAdder returns a new Adder used for a file add operation.
func NewAdder(ctx context.Context, ds ipld.DAGService, allocs func() []peer.ID) (*Adder, error) {
	// Cluster: we don't use pinner nor GCLocker.
	return &Adder{
		ctx:        ctx,
		dagService: ds,
		allocsFun:  allocs,
		Progress:   false,
		Trickle:    false,
		Chunker:    "",
	}, nil
}
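
// A minimal usage sketch (the context, DAG service, allocations callback and
// file node are assumed to be provided by the caller, and something should
// drain Out while the add runs; see the sketch next to outputDagnode):
//
//	out := make(chan *api.AddedOutput, 16)
//	adder, _ := NewAdder(ctx, dagService, func() []peer.ID { return nil })
//	adder.Out = out
//	adder.Chunker = "size-262144"
//	root, err := adder.AddAllAndPin(fileNode)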

// Adder holds the switches passed to the `add` command.
type Adder struct {
	ctx        context.Context
	dagService ipld.DAGService
	allocsFun  func() []peer.ID
	Out        chan *api.AddedOutput // channel on which AddedOutput events are sent
	Progress   bool                  // emit intermediate progress events on Out
	Trickle    bool                  // use the trickle-dag layout instead of balanced
	RawLeaves  bool                  // store file leaves as raw blocks
	Silent     bool                  // suppress per-file output events (see addNode)
	NoCopy     bool                  // passed to the DAG builder for filestore (no-copy) adds
	Chunker    string                // chunker specification, e.g. "size-262144"
	mroot      *mfs.Root
	tempRoot   cid.Cid
	CidBuilder cid.Builder
	liveNodes  uint64
	lastFile   mfs.FSNode
	// Cluster: ipfs does a hack in commands/add.go to set the filenames
	// in emitted events correctly. We carry a root folder name (or a
	// filename in the case of single files) here and emit those events
	// correctly from the beginning.
	OutputPrefix string
}

func (adder *Adder) mfsRoot() (*mfs.Root, error) {
	if adder.mroot != nil {
		return adder.mroot, nil
	}
	rnode := unixfs.EmptyDirNode()
	rnode.SetCidBuilder(adder.CidBuilder)
	mr, err := mfs.NewRoot(adder.ctx, adder.dagService, rnode, nil)
	if err != nil {
		return nil, err
	}
	adder.mroot = mr
	return adder.mroot, nil
}

// SetMfsRoot sets `r` as the root for Adder.
func (adder *Adder) SetMfsRoot(r *mfs.Root) {
	adder.mroot = r
}

// Constructs a node from reader's data, and adds it. Doesn't pin.
func (adder *Adder) add(reader io.Reader) (ipld.Node, error) {
	chnk, err := chunker.FromString(reader, adder.Chunker)
	if err != nil {
		return nil, err
	}

	// Cluster: we don't do batching/use BufferedDS.

	params := ihelper.DagBuilderParams{
		Dagserv:    adder.dagService,
		RawLeaves:  adder.RawLeaves,
		Maxlinks:   ihelper.DefaultLinksPerBlock,
		NoCopy:     adder.NoCopy,
		CidBuilder: adder.CidBuilder,
	}

	db, err := params.New(chnk)
	if err != nil {
		return nil, err
	}

	var nd ipld.Node
	if adder.Trickle {
		nd, err = trickle.Layout(db)
	} else {
		nd, err = balanced.Layout(db)
	}
	if err != nil {
		return nil, err
	}

	return nd, nil
}

// Cluster: commented as it is unused
// // RootNode returns the mfs root node
// func (adder *Adder) curRootNode() (ipld.Node, error) {
// 	mr, err := adder.mfsRoot()
// 	if err != nil {
// 		return nil, err
// 	}
// 	root, err := mr.GetDirectory().GetNode()
// 	if err != nil {
// 		return nil, err
// 	}
// 	// if one root file, use that hash as root.
// 	if len(root.Links()) == 1 {
// 		nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService)
// 		if err != nil {
// 			return nil, err
// 		}
// 		root = nd
// 	}
// 	return root, err
// }

// PinRoot recursively pins the root node of Adder and
// writes the pin state to the backing datastore.
// Cluster: we don't pin. Former Finalize().
func (adder *Adder) PinRoot(root ipld.Node) error {
	rnk := root.Cid()

	err := adder.dagService.Add(adder.ctx, root)
	if err != nil {
		return err
	}

	if adder.tempRoot.Defined() {
		adder.tempRoot = rnk
	}

	return nil
}

func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
	switch fsn := fsn.(type) {
	case *mfs.File:
		return nil
	case *mfs.Directory:
		names, err := fsn.ListNames(adder.ctx)
		if err != nil {
			return err
		}

		for _, name := range names {
			child, err := fsn.Child(name)
			if err != nil {
				// This fails when the child is an *mfs.File,
				// because mfs tries to fetch it from the DAG
				// service (which does not implement this and
				// returns a "not found" error).
				// *mfs.File children are ignored in the
				// recursive call anyway, so for Cluster we
				// just ignore errors here.
				continue
			}

			childpath := gopath.Join(path, name)
			err = adder.outputDirs(childpath, child)
			if err != nil {
				return err
			}

			fsn.Uncache(name)
		}

		nd, err := fsn.GetNode()
		if err != nil {
			return err
		}

		return adder.outputDagnode(adder.Out, path, nd)
	default:
		return fmt.Errorf("unrecognized fsn type: %#v", fsn)
	}
}

func (adder *Adder) addNode(node ipld.Node, path string) error {
	// patch it into the root
	outputName := path
	if path == "" {
		path = node.Cid().String()
		outputName = ""
	}

	if pi, ok := node.(*posinfo.FilestoreNode); ok {
		node = pi.Node
	}

	mr, err := adder.mfsRoot()
	if err != nil {
		return err
	}

	dir := gopath.Dir(path)
	if dir != "." {
		opts := mfs.MkdirOpts{
			Mkparents:  true,
			Flush:      false,
			CidBuilder: adder.CidBuilder,
		}
		if err := mfs.Mkdir(mr, dir, opts); err != nil {
			return err
		}
	}

	if err := mfs.PutNode(mr, path, node); err != nil {
		return err
	}

	// Cluster: cache the last file added.
	// This avoids using the DAGService to get the first child
	// of the MFS root when not wrapping.
	lastFile, err := mfs.NewFile(path, node, nil, adder.dagService)
	if err != nil {
		return err
	}
	adder.lastFile = lastFile

	if !adder.Silent {
		return adder.outputDagnode(adder.Out, outputName, node)
	}
	return nil
}

// AddAllAndPin adds the given request's files and pins them.
// Cluster: we don't pin. Former AddFiles.
func (adder *Adder) AddAllAndPin(file files.Node) (ipld.Node, error) {
	if err := adder.addFileNode("", file, true); err != nil {
		return nil, err
	}

	// get root
	mr, err := adder.mfsRoot()
	if err != nil {
		return nil, err
	}
	var root mfs.FSNode
	rootdir := mr.GetDirectory()
	root = rootdir

	err = root.Flush()
	if err != nil {
		return nil, err
	}

	// if adding a file without wrapping, swap the root to it (when adding a
	// directory, mfs root is the directory)
	_, dir := file.(files.Directory)
	var name string
	if !dir {
		children, err := rootdir.ListNames(adder.ctx)
		if err != nil {
			return nil, err
		}

		if len(children) == 0 {
			return nil, fmt.Errorf("expected at least one child dir, got none")
		}

		// Replace root with the first child
		name = children[0]
		root, err = rootdir.Child(name)
		if err != nil {
			// Cluster: use the last file we added
			// if we have one.
			if adder.lastFile == nil {
				return nil, err
			}
			root = adder.lastFile
		}
	}

	err = mr.Close()
	if err != nil {
		return nil, err
	}

	nd, err := root.GetNode()
	if err != nil {
		return nil, err
	}

	// output directory events
	err = adder.outputDirs(name, root)
	if err != nil {
		return nil, err
	}

	// Cluster: call PinRoot, which adds the root CID to the DAGService.
	// Unsure if this is a bug in IPFS when not pinning, or if it would get
	// added twice.
	return nd, adder.PinRoot(nd)
}

// Cluster: we don't Pause for GC
func (adder *Adder) addFileNode(path string, file files.Node, toplevel bool) error {
	defer file.Close()

	if adder.liveNodes >= liveCacheSize {
		// TODO: A smarter cache that uses some sort of lru cache with an eviction handler
		mr, err := adder.mfsRoot()
		if err != nil {
			return err
		}
		if err := mr.FlushMemFree(adder.ctx); err != nil {
			return err
		}

		adder.liveNodes = 0
	}
	adder.liveNodes++

	switch f := file.(type) {
	case files.Directory:
		return adder.addDir(path, f, toplevel)
	case *files.Symlink:
		return adder.addSymlink(path, f)
	case files.File:
		return adder.addFile(path, f)
	default:
		return errors.New("unknown file type")
	}
}

func (adder *Adder) addSymlink(path string, l *files.Symlink) error {
	sdata, err := unixfs.SymlinkData(l.Target)
	if err != nil {
		return err
	}

	dagnode := dag.NodeWithData(sdata)
	dagnode.SetCidBuilder(adder.CidBuilder)
	err = adder.dagService.Add(adder.ctx, dagnode)
	if err != nil {
		return err
	}

	return adder.addNode(dagnode, path)
}

func (adder *Adder) addFile(path string, file files.File) error {
	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
	if adder.Progress {
		rdr := &progressReader{file: reader, path: path, out: adder.Out}
		if fi, ok := file.(files.FileInfo); ok {
			reader = &progressReader2{rdr, fi}
		} else {
			reader = rdr
		}
	}

	dagnode, err := adder.add(reader)
	if err != nil {
		return err
	}

	// patch it into the root
	return adder.addNode(dagnode, path)
}

func (adder *Adder) addDir(path string, dir files.Directory, toplevel bool) error {
	log.Infof("adding directory: %s", path)

	if !(toplevel && path == "") {
		mr, err := adder.mfsRoot()
		if err != nil {
			return err
		}
		err = mfs.Mkdir(mr, path, mfs.MkdirOpts{
			Mkparents:  true,
			Flush:      false,
			CidBuilder: adder.CidBuilder,
		})
		if err != nil {
			return err
		}
	}

	it := dir.Entries()
	for it.Next() {
		fpath := gopath.Join(path, it.Name())
		err := adder.addFileNode(fpath, it.Node(), false)
		if err != nil {
			return err
		}
	}

	return it.Err()
}

// outputDagnode sends dagnode info over the output channel.
// Cluster: we use *api.AddedOutput instead of coreiface events
// and make this an adder method to be able to prefix.
func (adder *Adder) outputDagnode(out chan *api.AddedOutput, name string, dn ipld.Node) error {
	if out == nil {
		return nil
	}

	s, err := dn.Size()
	if err != nil {
		return err
	}

	// When adding things in a folder: "OutputPrefix/name"
	// When adding a single file: "OutputPrefix" (name is unset)
	// When adding a single thing with no name: ""
	// Note: ipfs sets the name of files received on stdin to the CID,
	// but cluster does not support stdin-adding so we do not
	// account for this here.
	name = filepath.Join(adder.OutputPrefix, name)

	out <- &api.AddedOutput{
		Cid:         dn.Cid(),
		Name:        name,
		Size:        s,
		Allocations: adder.allocsFun(),
	}

	return nil
}
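
// A sketch of how a caller might drain the output channel while an add is
// running (buffering and error handling are left to the caller):
//
//	go func() {
//		for o := range out {
//			fmt.Printf("added %s as %q, allocated to %v\n", o.Cid, o.Name, o.Allocations)
//		}
//	}()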

type progressReader struct {
	file         io.Reader
	path         string
	out          chan *api.AddedOutput
	bytes        int64
	lastProgress int64
}

func (i *progressReader) Read(p []byte) (int, error) {
	n, err := i.file.Read(p)
	i.bytes += int64(n)
	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
		i.lastProgress = i.bytes
		i.out <- &api.AddedOutput{
			Name:  i.path,
			Bytes: uint64(i.bytes),
		}
	}

	return n, err
}
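
// With Progress enabled, the consumer of Out also receives intermediate
// events that carry only Name and Bytes. For a file of about 600 KiB and the
// 256 KiB increment above, the stream might look like (sketch):
//
//	{Name: "dir/file.bin", Bytes: 262144}
//	{Name: "dir/file.bin", Bytes: 524288}
//	{Name: "dir/file.bin", Bytes: 614400} // final read returns io.EOF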

type progressReader2 struct {
	*progressReader
	files.FileInfo
}

func (i *progressReader2) Read(p []byte) (int, error) {
	return i.progressReader.Read(p)
}