2018-07-24 12:21:29 +00:00
|
|
|
// Package ipfsadd is a simplified copy of go-ipfs/core/coreunix/add.go
|
2018-07-04 16:30:24 +00:00
|
|
|
package ipfsadd
|
2018-02-28 22:49:03 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2018-12-01 04:58:55 +00:00
|
|
|
"errors"
|
2018-02-28 22:49:03 +00:00
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
gopath "path"
|
2019-12-17 22:01:25 +00:00
|
|
|
"path/filepath"
|
2018-02-28 22:49:03 +00:00
|
|
|
|
2022-06-15 09:19:17 +00:00
|
|
|
"github.com/ipfs-cluster/ipfs-cluster/api"
|
2018-02-28 22:49:03 +00:00
|
|
|
|
2023-03-27 19:49:08 +00:00
|
|
|
chunker "github.com/ipfs/boxo/chunker"
|
|
|
|
files "github.com/ipfs/boxo/files"
|
|
|
|
posinfo "github.com/ipfs/boxo/filestore/posinfo"
|
|
|
|
dag "github.com/ipfs/boxo/ipld/merkledag"
|
|
|
|
unixfs "github.com/ipfs/boxo/ipld/unixfs"
|
|
|
|
balanced "github.com/ipfs/boxo/ipld/unixfs/importer/balanced"
|
|
|
|
ihelper "github.com/ipfs/boxo/ipld/unixfs/importer/helpers"
|
|
|
|
trickle "github.com/ipfs/boxo/ipld/unixfs/importer/trickle"
|
|
|
|
mfs "github.com/ipfs/boxo/mfs"
|
2018-12-06 18:59:05 +00:00
|
|
|
cid "github.com/ipfs/go-cid"
|
|
|
|
ipld "github.com/ipfs/go-ipld-format"
|
2020-03-13 20:40:02 +00:00
|
|
|
logging "github.com/ipfs/go-log/v2"
|
2022-09-06 14:57:17 +00:00
|
|
|
peer "github.com/libp2p/go-libp2p/core/peer"
|
2018-02-28 22:49:03 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// log is the package logger ("coreunix" to match the upstream file this
// package was copied from).
var log = logging.Logger("coreunix")

// how many bytes of progress to wait before sending a progress update message
const progressReaderIncrement = 1024 * 256

// liveCacheSize is the threshold of in-memory MFS nodes (Adder.liveNodes)
// after which addFileNode flushes the MFS root (FlushMemFree) and resets
// the counter.
var liveCacheSize = uint64(256 << 10)
|
|
|
|
|
|
|
|
// NewAdder Returns a new Adder used for a file add operation.
|
2022-03-14 14:45:51 +00:00
|
|
|
func NewAdder(ctx context.Context, ds ipld.DAGService, allocs func() []peer.ID) (*Adder, error) {
|
2019-09-27 16:54:49 +00:00
|
|
|
// Cluster: we don't use pinner nor GCLocker.
|
2018-02-28 22:49:03 +00:00
|
|
|
return &Adder{
|
|
|
|
ctx: ctx,
|
|
|
|
dagService: ds,
|
2022-03-14 14:45:51 +00:00
|
|
|
allocsFun: allocs,
|
2018-02-28 22:49:03 +00:00
|
|
|
Progress: false,
|
|
|
|
Trickle: false,
|
|
|
|
Chunker: "",
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Adder holds the switches passed to the `add` command.
type Adder struct {
	ctx        context.Context
	dagService ipld.DAGService
	// allocsFun supplies the allocations (peers) reported in every
	// AddedOutput event.
	allocsFun func() []peer.ID
	// Out receives AddedOutput events for added nodes and for
	// byte-progress updates.
	Out chan api.AddedOutput
	// Progress enables byte-progress events while reading files.
	Progress bool
	// Trickle selects the trickle DAG layout instead of balanced.
	Trickle bool
	// RawLeaves is forwarded to the DAG builder params.
	RawLeaves bool
	// Silent suppresses per-node output events in addNode.
	Silent bool
	// NoCopy is forwarded to the DAG builder params (filestore use).
	NoCopy bool
	// Chunker is the chunker spec parsed by chunker.FromString.
	Chunker string
	// mroot is the MFS root, created lazily by mfsRoot() or set via
	// SetMfsRoot.
	mroot *mfs.Root
	// tempRoot is overwritten in PinRoot when already defined.
	// NOTE(review): nothing in this file sets it initially — confirm
	// the condition in PinRoot is intended.
	tempRoot cid.Cid
	// CidBuilder is applied to every node this Adder creates.
	CidBuilder cid.Builder
	// liveNodes counts in-memory MFS nodes since the last flush
	// (compared against liveCacheSize in addFileNode).
	liveNodes uint64
	// lastFile caches the last file added (set in addNode); used as a
	// fallback root in AddAllAndPin.
	lastFile mfs.FSNode
	// Cluster: ipfs does a hack in commands/add.go to set the filenames
	// in emitted events correctly. We carry a root folder name (or a
	// filename in the case of single files here and emit those events
	// correctly from the beginning).
	OutputPrefix string
}
|
|
|
|
|
|
|
|
func (adder *Adder) mfsRoot() (*mfs.Root, error) {
|
|
|
|
if adder.mroot != nil {
|
|
|
|
return adder.mroot, nil
|
|
|
|
}
|
|
|
|
rnode := unixfs.EmptyDirNode()
|
2018-08-20 14:25:26 +00:00
|
|
|
rnode.SetCidBuilder(adder.CidBuilder)
|
2018-02-28 22:49:03 +00:00
|
|
|
mr, err := mfs.NewRoot(adder.ctx, adder.dagService, rnode, nil)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
adder.mroot = mr
|
|
|
|
return adder.mroot, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// SetMfsRoot sets `r` as the root for Adder. It overrides any root
// lazily created by mfsRoot().
func (adder *Adder) SetMfsRoot(r *mfs.Root) {
	adder.mroot = r
}
|
|
|
|
|
|
|
|
// Constructs a node from reader's data, and adds it. Doesn't pin.
|
|
|
|
func (adder *Adder) add(reader io.Reader) (ipld.Node, error) {
|
2018-08-10 12:39:34 +00:00
|
|
|
chnk, err := chunker.FromString(reader, adder.Chunker)
|
2018-02-28 22:49:03 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2019-09-27 16:54:49 +00:00
|
|
|
// Cluster: we don't do batching/use BufferedDS.
|
2018-10-29 11:50:36 +00:00
|
|
|
|
2018-02-28 22:49:03 +00:00
|
|
|
params := ihelper.DagBuilderParams{
|
2018-08-20 14:25:26 +00:00
|
|
|
Dagserv: adder.dagService,
|
|
|
|
RawLeaves: adder.RawLeaves,
|
|
|
|
Maxlinks: ihelper.DefaultLinksPerBlock,
|
|
|
|
NoCopy: adder.NoCopy,
|
|
|
|
CidBuilder: adder.CidBuilder,
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
|
|
|
|
2019-09-27 16:54:49 +00:00
|
|
|
db, err := params.New(chnk)
|
2019-02-15 12:40:53 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2019-09-27 16:54:49 +00:00
|
|
|
var nd ipld.Node
|
2018-02-28 22:49:03 +00:00
|
|
|
if adder.Trickle {
|
2019-09-27 16:54:49 +00:00
|
|
|
nd, err = trickle.Layout(db)
|
|
|
|
} else {
|
|
|
|
nd, err = balanced.Layout(db)
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
|
|
|
|
2019-09-27 16:54:49 +00:00
|
|
|
return nd, nil
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
|
|
|
|
2020-04-14 17:58:00 +00:00
|
|
|
// Cluster: commented as it is unused
|
|
|
|
// // RootNode returns the mfs root node
|
|
|
|
// func (adder *Adder) curRootNode() (ipld.Node, error) {
|
|
|
|
// mr, err := adder.mfsRoot()
|
|
|
|
// if err != nil {
|
|
|
|
// return nil, err
|
|
|
|
// }
|
|
|
|
// root, err := mr.GetDirectory().GetNode()
|
|
|
|
// if err != nil {
|
|
|
|
// return nil, err
|
|
|
|
// }
|
|
|
|
|
|
|
|
// // if one root file, use that hash as root.
|
|
|
|
// if len(root.Links()) == 1 {
|
|
|
|
// nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService)
|
|
|
|
// if err != nil {
|
|
|
|
// return nil, err
|
|
|
|
// }
|
|
|
|
|
|
|
|
// root = nd
|
|
|
|
// }
|
|
|
|
|
|
|
|
// return root, err
|
|
|
|
// }
|
2018-02-28 22:49:03 +00:00
|
|
|
|
2019-10-01 11:59:54 +00:00
|
|
|
// PinRoot recursively pins the root node of Adder and
// writes the pin state to the backing datastore.
// Cluster: we don't pin. Former Finalize().
func (adder *Adder) PinRoot(root ipld.Node) error {
	rnk := root.Cid()

	// Ensure the root node itself is in the DAG service.
	err := adder.dagService.Add(adder.ctx, root)
	if err != nil {
		return err
	}

	// NOTE(review): tempRoot is only overwritten here when it is
	// already defined, and nothing in this file sets it beforehand —
	// confirm this condition is intended (upstream also unpinned the
	// previous tempRoot at this point).
	if adder.tempRoot.Defined() {
		adder.tempRoot = rnk
	}

	return nil
}
|
|
|
|
|
|
|
|
func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
|
|
|
|
switch fsn := fsn.(type) {
|
|
|
|
case *mfs.File:
|
|
|
|
return nil
|
|
|
|
case *mfs.Directory:
|
|
|
|
names, err := fsn.ListNames(adder.ctx)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, name := range names {
|
|
|
|
child, err := fsn.Child(name)
|
|
|
|
if err != nil {
|
2018-07-19 13:17:27 +00:00
|
|
|
// This fails when Child is of type *mfs.File
|
|
|
|
// because it tries to get them from the DAG
|
|
|
|
// service (does not implement this and returns
|
|
|
|
// a "not found" error)
|
|
|
|
// *mfs.Files are ignored in the recursive call
|
|
|
|
// anyway.
|
|
|
|
// For Cluster, we just ignore errors here.
|
2018-07-04 16:30:24 +00:00
|
|
|
continue
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
childpath := gopath.Join(path, name)
|
|
|
|
err = adder.outputDirs(childpath, child)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
fsn.Uncache(name)
|
|
|
|
}
|
|
|
|
nd, err := fsn.GetNode()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-09-27 16:54:49 +00:00
|
|
|
|
2019-12-17 22:01:25 +00:00
|
|
|
return adder.outputDagnode(adder.Out, path, nd)
|
2018-02-28 22:49:03 +00:00
|
|
|
default:
|
|
|
|
return fmt.Errorf("unrecognized fsn type: %#v", fsn)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (adder *Adder) addNode(node ipld.Node, path string) error {
|
|
|
|
// patch it into the root
|
2019-12-17 23:24:16 +00:00
|
|
|
outputName := path
|
2018-02-28 22:49:03 +00:00
|
|
|
if path == "" {
|
|
|
|
path = node.Cid().String()
|
2019-12-17 23:24:16 +00:00
|
|
|
outputName = ""
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if pi, ok := node.(*posinfo.FilestoreNode); ok {
|
|
|
|
node = pi.Node
|
|
|
|
}
|
|
|
|
|
|
|
|
mr, err := adder.mfsRoot()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
dir := gopath.Dir(path)
|
|
|
|
if dir != "." {
|
|
|
|
opts := mfs.MkdirOpts{
|
2018-08-20 14:25:26 +00:00
|
|
|
Mkparents: true,
|
|
|
|
Flush: false,
|
|
|
|
CidBuilder: adder.CidBuilder,
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
|
|
|
if err := mfs.Mkdir(mr, dir, opts); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := mfs.PutNode(mr, path, node); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-08-08 23:16:30 +00:00
|
|
|
// Cluster: cache the last file added.
|
|
|
|
// This avoids using the DAGService to get the first children
|
|
|
|
// if the MFS root when not wrapping.
|
|
|
|
lastFile, err := mfs.NewFile(path, node, nil, adder.dagService)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
adder.lastFile = lastFile
|
|
|
|
|
2018-02-28 22:49:03 +00:00
|
|
|
if !adder.Silent {
|
2019-12-17 23:24:16 +00:00
|
|
|
return adder.outputDagnode(adder.Out, outputName, node)
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-09-27 16:54:49 +00:00
|
|
|
// AddAllAndPin adds the given request's files and pin them.
|
|
|
|
// Cluster: we don'pin. Former AddFiles.
|
|
|
|
func (adder *Adder) AddAllAndPin(file files.Node) (ipld.Node, error) {
|
|
|
|
if err := adder.addFileNode("", file, true); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// get root
|
|
|
|
mr, err := adder.mfsRoot()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
var root mfs.FSNode
|
|
|
|
rootdir := mr.GetDirectory()
|
|
|
|
root = rootdir
|
|
|
|
|
|
|
|
err = root.Flush()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// if adding a file without wrapping, swap the root to it (when adding a
|
|
|
|
// directory, mfs root is the directory)
|
|
|
|
_, dir := file.(files.Directory)
|
|
|
|
var name string
|
|
|
|
if !dir {
|
|
|
|
children, err := rootdir.ListNames(adder.ctx)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(children) == 0 {
|
|
|
|
return nil, fmt.Errorf("expected at least one child dir, got none")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Replace root with the first child
|
|
|
|
name = children[0]
|
|
|
|
root, err = rootdir.Child(name)
|
|
|
|
if err != nil {
|
|
|
|
// Cluster: use the last file we added
|
|
|
|
// if we have one.
|
|
|
|
if adder.lastFile == nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
root = adder.lastFile
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
err = mr.Close()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
nd, err := root.GetNode()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// output directory events
|
|
|
|
err = adder.outputDirs(name, root)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Cluster: call PinRoot which adds the root cid to the DAGService.
|
|
|
|
// Unsure if this a bug in IPFS when not pinning. Or it would get added
|
|
|
|
// twice.
|
|
|
|
return nd, adder.PinRoot(nd)
|
2018-08-10 12:39:34 +00:00
|
|
|
}
|
|
|
|
|
2019-09-27 16:54:49 +00:00
|
|
|
// Cluster: we don't Pause for GC
|
|
|
|
func (adder *Adder) addFileNode(path string, file files.Node, toplevel bool) error {
|
|
|
|
defer file.Close()
|
|
|
|
|
2018-02-28 22:49:03 +00:00
|
|
|
if adder.liveNodes >= liveCacheSize {
|
|
|
|
// TODO: A smarter cache that uses some sort of lru cache with an eviction handler
|
|
|
|
mr, err := adder.mfsRoot()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := mr.FlushMemFree(adder.ctx); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
adder.liveNodes = 0
|
|
|
|
}
|
|
|
|
adder.liveNodes++
|
|
|
|
|
2019-09-27 16:54:49 +00:00
|
|
|
switch f := file.(type) {
|
|
|
|
case files.Directory:
|
|
|
|
return adder.addDir(path, f, toplevel)
|
|
|
|
case *files.Symlink:
|
|
|
|
return adder.addSymlink(path, f)
|
|
|
|
case files.File:
|
|
|
|
return adder.addFile(path, f)
|
|
|
|
default:
|
|
|
|
return errors.New("unknown file type")
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
2019-09-27 16:54:49 +00:00
|
|
|
}
|
2018-02-28 22:49:03 +00:00
|
|
|
|
2019-09-27 16:54:49 +00:00
|
|
|
func (adder *Adder) addSymlink(path string, l *files.Symlink) error {
|
|
|
|
sdata, err := unixfs.SymlinkData(l.Target)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2018-12-01 04:58:55 +00:00
|
|
|
}
|
2019-09-27 16:54:49 +00:00
|
|
|
|
|
|
|
dagnode := dag.NodeWithData(sdata)
|
|
|
|
dagnode.SetCidBuilder(adder.CidBuilder)
|
|
|
|
err = adder.dagService.Add(adder.ctx, dagnode)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
|
|
|
|
2019-09-27 16:54:49 +00:00
|
|
|
return adder.addNode(dagnode, path)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (adder *Adder) addFile(path string, file files.File) error {
|
2018-02-28 22:49:03 +00:00
|
|
|
// if the progress flag was specified, wrap the file so that we can send
|
|
|
|
// progress updates to the client (over the output channel)
|
|
|
|
var reader io.Reader = file
|
|
|
|
if adder.Progress {
|
2019-09-27 16:54:49 +00:00
|
|
|
rdr := &progressReader{file: reader, path: path, out: adder.Out}
|
2018-02-28 22:49:03 +00:00
|
|
|
if fi, ok := file.(files.FileInfo); ok {
|
|
|
|
reader = &progressReader2{rdr, fi}
|
|
|
|
} else {
|
|
|
|
reader = rdr
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
dagnode, err := adder.add(reader)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// patch it into the root
|
2018-12-01 04:58:55 +00:00
|
|
|
return adder.addNode(dagnode, path)
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
|
|
|
|
2019-09-27 16:54:49 +00:00
|
|
|
func (adder *Adder) addDir(path string, dir files.Directory, toplevel bool) error {
|
2018-12-01 04:58:55 +00:00
|
|
|
log.Infof("adding directory: %s", path)
|
2018-02-28 22:49:03 +00:00
|
|
|
|
2019-09-27 16:54:49 +00:00
|
|
|
if !(toplevel && path == "") {
|
|
|
|
mr, err := adder.mfsRoot()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
err = mfs.Mkdir(mr, path, mfs.MkdirOpts{
|
|
|
|
Mkparents: true,
|
|
|
|
Flush: false,
|
|
|
|
CidBuilder: adder.CidBuilder,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
|
|
|
|
2018-12-01 04:58:55 +00:00
|
|
|
it := dir.Entries()
|
|
|
|
for it.Next() {
|
|
|
|
fpath := gopath.Join(path, it.Name())
|
2019-09-27 16:54:49 +00:00
|
|
|
err := adder.addFileNode(fpath, it.Node(), false)
|
|
|
|
if err != nil {
|
2018-02-28 22:49:03 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2019-09-27 16:54:49 +00:00
|
|
|
|
2018-12-01 04:58:55 +00:00
|
|
|
return it.Err()
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
|
|
|
|
2019-09-27 16:54:49 +00:00
|
|
|
// outputDagnode sends dagnode info over the output channel.
|
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
|
|
|
// Cluster: we use api.AddedOutput instead of coreiface events
|
2019-12-17 22:01:25 +00:00
|
|
|
// and make this an adder method to be be able to prefix.
|
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
|
|
|
func (adder *Adder) outputDagnode(out chan api.AddedOutput, name string, dn ipld.Node) error {
|
2018-02-28 22:49:03 +00:00
|
|
|
if out == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
s, err := dn.Size()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2019-12-17 23:24:16 +00:00
|
|
|
// When adding things in a folder: "OutputPrefix/name"
|
|
|
|
// When adding a single file: "OutputPrefix" (name is unset)
|
|
|
|
// When adding a single thing with no name: ""
|
|
|
|
// Note: ipfs sets the name of files received on stdin to the CID,
|
|
|
|
// but cluster does not support stdin-adding so we do not
|
|
|
|
// account for this here.
|
|
|
|
name = filepath.Join(adder.OutputPrefix, name)
|
|
|
|
|
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
|
|
|
out <- api.AddedOutput{
|
2022-04-07 11:53:30 +00:00
|
|
|
Cid: api.NewCid(dn.Cid()),
|
2022-03-14 14:45:51 +00:00
|
|
|
Name: name,
|
|
|
|
Size: s,
|
|
|
|
Allocations: adder.allocsFun(),
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// progressReader wraps a file reader and emits AddedOutput progress
// events on out every progressReaderIncrement bytes (and at EOF).
type progressReader struct {
	// file is the underlying reader being tracked.
	file io.Reader
	// path is reported as the Name of every progress event.
	path string
	// out receives the progress events.
	out chan api.AddedOutput
	// bytes is the total number of bytes read so far.
	bytes int64
	// lastProgress is the bytes value at the last emitted event.
	lastProgress int64
}
|
|
|
|
|
|
|
|
func (i *progressReader) Read(p []byte) (int, error) {
|
|
|
|
n, err := i.file.Read(p)
|
|
|
|
|
|
|
|
i.bytes += int64(n)
|
|
|
|
if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
|
|
|
|
i.lastProgress = i.bytes
|
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
|
|
|
i.out <- api.AddedOutput{
|
2018-12-01 04:58:55 +00:00
|
|
|
Name: i.path,
|
2018-10-03 21:03:30 +00:00
|
|
|
Bytes: uint64(i.bytes),
|
2018-02-28 22:49:03 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return n, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// progressReader2 combines a progressReader with the original file's
// files.FileInfo so callers can still see the FileInfo interface while
// reads go through the progress tracker (see Read below).
type progressReader2 struct {
	*progressReader
	files.FileInfo
}
|
2018-12-01 04:58:55 +00:00
|
|
|
|
|
|
|
// Read forwards explicitly to the embedded progressReader's Read.
// NOTE(review): presumably this explicit forwarder exists because the
// embedded files.FileInfo may also provide Read, which would make the
// promoted method ambiguous — confirm against the files package.
func (i *progressReader2) Read(p []byte) (int, error) {
	return i.progressReader.Read(p)
}
|