3575f05753
This was a long FIXME/TODO. Handling adding output and reporting to the client of the progress of the adding process. This attempts to do it. It is not sure that it works correctly (response body being written while the multipart request is still being read) License: MIT Signed-off-by: Hector Sanjuan <code@hector.link>
444 lines
9.1 KiB
Go
444 lines
9.1 KiB
Go
// Package ipfsadd is a simplified copy of go-ipfs/core/coreunix/add.go
|
|
package ipfsadd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
gopath "path"
|
|
"strconv"
|
|
|
|
"github.com/ipfs/ipfs-cluster/api"
|
|
|
|
cid "github.com/ipfs/go-cid"
|
|
"github.com/ipfs/go-ipfs-chunker"
|
|
files "github.com/ipfs/go-ipfs-cmdkit/files"
|
|
posinfo "github.com/ipfs/go-ipfs-posinfo"
|
|
balanced "github.com/ipfs/go-ipfs/importer/balanced"
|
|
ihelper "github.com/ipfs/go-ipfs/importer/helpers"
|
|
trickle "github.com/ipfs/go-ipfs/importer/trickle"
|
|
dag "github.com/ipfs/go-ipfs/merkledag"
|
|
mfs "github.com/ipfs/go-ipfs/mfs"
|
|
unixfs "github.com/ipfs/go-ipfs/unixfs"
|
|
ipld "github.com/ipfs/go-ipld-format"
|
|
logging "github.com/ipfs/go-log"
|
|
)
|
|
|
|
var log = logging.Logger("coreunix")
|
|
|
|
// how many bytes of progress to wait before sending a progress update message
|
|
const progressReaderIncrement = 1024 * 256
|
|
|
|
var liveCacheSize = uint64(256 << 10)
|
|
|
|
type hiddenFileError struct {
|
|
fileName string
|
|
}
|
|
|
|
func (e *hiddenFileError) Error() string {
|
|
return fmt.Sprintf("%s is a hidden file", e.fileName)
|
|
}
|
|
|
|
type ignoreFileError struct {
|
|
fileName string
|
|
}
|
|
|
|
func (e *ignoreFileError) Error() string {
|
|
return fmt.Sprintf("%s is an ignored file", e.fileName)
|
|
}
|
|
|
|
// NewAdder Returns a new Adder used for a file add operation.
|
|
func NewAdder(ctx context.Context, ds ipld.DAGService) (*Adder, error) {
|
|
return &Adder{
|
|
ctx: ctx,
|
|
dagService: ds,
|
|
Progress: false,
|
|
Hidden: true,
|
|
Trickle: false,
|
|
Wrap: false,
|
|
Chunker: "",
|
|
}, nil
|
|
}
|
|
|
|
// Adder holds the switches passed to the `add` command.
|
|
type Adder struct {
|
|
ctx context.Context
|
|
dagService ipld.DAGService
|
|
Out chan *api.AddedOutput
|
|
Progress bool
|
|
Hidden bool
|
|
Trickle bool
|
|
RawLeaves bool
|
|
Silent bool
|
|
Wrap bool
|
|
NoCopy bool
|
|
Chunker string
|
|
root ipld.Node
|
|
mroot *mfs.Root
|
|
tempRoot *cid.Cid
|
|
Prefix *cid.Prefix
|
|
liveNodes uint64
|
|
}
|
|
|
|
func (adder *Adder) mfsRoot() (*mfs.Root, error) {
|
|
if adder.mroot != nil {
|
|
return adder.mroot, nil
|
|
}
|
|
rnode := unixfs.EmptyDirNode()
|
|
rnode.SetPrefix(adder.Prefix)
|
|
mr, err := mfs.NewRoot(adder.ctx, adder.dagService, rnode, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
adder.mroot = mr
|
|
return adder.mroot, nil
|
|
}
|
|
|
|
// SetMfsRoot sets `r` as the root for Adder.
|
|
func (adder *Adder) SetMfsRoot(r *mfs.Root) {
|
|
adder.mroot = r
|
|
}
|
|
|
|
// Constructs a node from reader's data, and adds it. Doesn't pin.
|
|
func (adder *Adder) add(reader io.Reader) (ipld.Node, error) {
|
|
chnk, err := chunk.FromString(reader, adder.Chunker)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
params := ihelper.DagBuilderParams{
|
|
Dagserv: adder.dagService,
|
|
RawLeaves: adder.RawLeaves,
|
|
Maxlinks: ihelper.DefaultLinksPerBlock,
|
|
NoCopy: adder.NoCopy,
|
|
Prefix: adder.Prefix,
|
|
}
|
|
|
|
if adder.Trickle {
|
|
return trickle.Layout(params.New(chnk))
|
|
}
|
|
|
|
return balanced.Layout(params.New(chnk))
|
|
}
|
|
|
|
// RootNode returns the root node of the Added.
|
|
func (adder *Adder) RootNode() (ipld.Node, error) {
|
|
// for memoizing
|
|
if adder.root != nil {
|
|
return adder.root, nil
|
|
}
|
|
|
|
mr, err := adder.mfsRoot()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
root, err := mr.GetValue().GetNode()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// if not wrapping, AND one root file, use that hash as root.
|
|
if !adder.Wrap && len(root.Links()) == 1 {
|
|
nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
root = nd
|
|
}
|
|
|
|
adder.root = root
|
|
return root, err
|
|
}
|
|
|
|
// Finalize flushes the mfs root directory and returns the mfs root node.
|
|
func (adder *Adder) Finalize() (ipld.Node, error) {
|
|
mr, err := adder.mfsRoot()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
root := mr.GetValue()
|
|
|
|
err = root.Flush()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var name string
|
|
if !adder.Wrap {
|
|
children, err := root.(*mfs.Directory).ListNames(adder.ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(children) == 0 {
|
|
return nil, fmt.Errorf("expected at least one child dir, got none")
|
|
}
|
|
|
|
name = children[0]
|
|
|
|
mr, err := adder.mfsRoot()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dir, ok := mr.GetValue().(*mfs.Directory)
|
|
if !ok {
|
|
return nil, fmt.Errorf("root is not a directory")
|
|
}
|
|
|
|
root, err = dir.Child(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
err = adder.outputDirs(name, root)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = mr.Close()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return root.GetNode()
|
|
}
|
|
|
|
func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
|
|
switch fsn := fsn.(type) {
|
|
case *mfs.File:
|
|
return nil
|
|
case *mfs.Directory:
|
|
names, err := fsn.ListNames(adder.ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, name := range names {
|
|
child, err := fsn.Child(name)
|
|
if err != nil {
|
|
// This fails when Child is of type *mfs.File
|
|
// because it tries to get them from the DAG
|
|
// service (does not implement this and returns
|
|
// a "not found" error)
|
|
// *mfs.Files are ignored in the recursive call
|
|
// anyway.
|
|
// For Cluster, we just ignore errors here.
|
|
continue
|
|
}
|
|
|
|
childpath := gopath.Join(path, name)
|
|
err = adder.outputDirs(childpath, child)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
fsn.Uncache(name)
|
|
}
|
|
nd, err := fsn.GetNode()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return outputDagnode(adder.ctx, adder.Out, path, nd)
|
|
default:
|
|
return fmt.Errorf("unrecognized fsn type: %#v", fsn)
|
|
}
|
|
}
|
|
|
|
func (adder *Adder) addNode(node ipld.Node, path string) error {
|
|
// patch it into the root
|
|
if path == "" {
|
|
path = node.Cid().String()
|
|
}
|
|
|
|
if pi, ok := node.(*posinfo.FilestoreNode); ok {
|
|
node = pi.Node
|
|
}
|
|
|
|
mr, err := adder.mfsRoot()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
dir := gopath.Dir(path)
|
|
if dir != "." {
|
|
opts := mfs.MkdirOpts{
|
|
Mkparents: true,
|
|
Flush: false,
|
|
Prefix: adder.Prefix,
|
|
}
|
|
if err := mfs.Mkdir(mr, dir, opts); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := mfs.PutNode(mr, path, node); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !adder.Silent {
|
|
return outputDagnode(adder.ctx, adder.Out, path, node)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AddFile adds a file to the dagservice and outputs through the outchan
|
|
func (adder *Adder) AddFile(file files.File) error {
|
|
if adder.liveNodes >= liveCacheSize {
|
|
// TODO: A smarter cache that uses some sort of lru cache with an eviction handler
|
|
mr, err := adder.mfsRoot()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := mr.FlushMemFree(adder.ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
adder.liveNodes = 0
|
|
}
|
|
adder.liveNodes++
|
|
|
|
if file.IsDirectory() {
|
|
return adder.addDir(file)
|
|
}
|
|
|
|
// case for symlink
|
|
if s, ok := file.(*files.Symlink); ok {
|
|
sdata, err := unixfs.SymlinkData(s.Target)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dagnode := dag.NodeWithData(sdata)
|
|
dagnode.SetPrefix(adder.Prefix)
|
|
err = adder.dagService.Add(adder.ctx, dagnode)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return adder.addNode(dagnode, s.FileName())
|
|
}
|
|
|
|
// case for regular file
|
|
// if the progress flag was specified, wrap the file so that we can send
|
|
// progress updates to the client (over the output channel)
|
|
var reader io.Reader = file
|
|
if adder.Progress {
|
|
rdr := &progressReader{file: file, out: adder.Out}
|
|
if fi, ok := file.(files.FileInfo); ok {
|
|
reader = &progressReader2{rdr, fi}
|
|
} else {
|
|
reader = rdr
|
|
}
|
|
}
|
|
|
|
dagnode, err := adder.add(reader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// patch it into the root
|
|
return adder.addNode(dagnode, file.FileName())
|
|
}
|
|
|
|
func (adder *Adder) addDir(dir files.File) error {
|
|
log.Infof("adding directory: %s", dir.FileName())
|
|
|
|
mr, err := adder.mfsRoot()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = mfs.Mkdir(mr, dir.FileName(), mfs.MkdirOpts{
|
|
Mkparents: true,
|
|
Flush: false,
|
|
Prefix: adder.Prefix,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
file, err := dir.NextFile()
|
|
if err != nil && err != io.EOF {
|
|
return err
|
|
}
|
|
if file == nil {
|
|
break
|
|
}
|
|
|
|
// Skip hidden files when adding recursively, unless Hidden is enabled.
|
|
if files.IsHidden(file) && !adder.Hidden {
|
|
log.Infof("%s is hidden, skipping", file.FileName())
|
|
continue
|
|
}
|
|
err = adder.AddFile(file)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// outputDagnode sends dagnode info over the output channel
|
|
func outputDagnode(ctx context.Context, out chan *api.AddedOutput, name string, dn ipld.Node) error {
|
|
// Ugly,but I don't want my program to crash because
|
|
// this is trying to write to close channels.
|
|
defer func() {
|
|
recover()
|
|
}()
|
|
|
|
if out == nil {
|
|
return nil
|
|
}
|
|
|
|
c := dn.Cid()
|
|
s, err := dn.Size()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// This thing keeps trying to print
|
|
// even when importing is cancelled.
|
|
// Panics on closed channel.
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
out <- &api.AddedOutput{
|
|
Hash: c.String(),
|
|
Name: name,
|
|
Size: strconv.FormatUint(s, 10),
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type progressReader struct {
|
|
file files.File
|
|
out chan *api.AddedOutput
|
|
bytes int64
|
|
lastProgress int64
|
|
}
|
|
|
|
func (i *progressReader) Read(p []byte) (int, error) {
|
|
n, err := i.file.Read(p)
|
|
|
|
i.bytes += int64(n)
|
|
if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
|
|
i.lastProgress = i.bytes
|
|
i.out <- &api.AddedOutput{
|
|
Name: i.file.FileName(),
|
|
Bytes: i.bytes,
|
|
}
|
|
}
|
|
|
|
return n, err
|
|
}
|
|
|
|
type progressReader2 struct {
|
|
*progressReader
|
|
files.FileInfo
|
|
}
|