From 072f0bc7221ac74478c3a5afde4530d15216a36e Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 19 Apr 2021 19:28:03 +0200 Subject: [PATCH] Feat: support adding CAR files This commit adds a new add option: "format". This option specifies how IPFS Cluster is expected to build the DAG when adding content. By default, it takes a "unixfs", which chunks and DAG-ifies as it did before, resulting in a UnixFSv1 DAG. Alternatively, it can be set to "car". In this case, Cluster will directly read blocks from the CAR file and add them. Adding CAR files or doing normal processing is independent from letting cluster do sharding or not. If sharding is ever enabled, Cluster could potentially shard a large CAR file among peers. Currently, importing CAR files is limited to a single CAR file with a single root (the one that is pinned). Future iterations may support multiple CARs and/or multiple roots by transparently wrapping them. --- adder/adder.go | 215 +++++++++++++++++++++++------ adder/adder_test.go | 93 +++++++++---- adder/sharding/dag.go | 4 +- adder/sharding/dag_service_test.go | 36 +++-- api/add.go | 66 ++++++--- api/rest/client/methods_test.go | 10 +- cmd/ipfs-cluster-ctl/main.go | 110 ++++++++------- go.mod | 1 + go.sum | 11 ++ test/sharding.go | 77 +++++++++++ 10 files changed, 467 insertions(+), 156 deletions(-) diff --git a/adder/adder.go b/adder/adder.go index 259be86f..9f1c2b50 100644 --- a/adder/adder.go +++ b/adder/adder.go @@ -4,15 +4,19 @@ package adder import ( "context" + "errors" "fmt" + "io" "mime/multipart" "strings" "github.com/ipfs/ipfs-cluster/adder/ipfsadd" "github.com/ipfs/ipfs-cluster/api" + "github.com/ipld/go-car" cid "github.com/ipfs/go-cid" files "github.com/ipfs/go-ipfs-files" + cbor "github.com/ipfs/go-ipld-cbor" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" merkledag "github.com/ipfs/go-merkledag" @@ -21,6 +25,14 @@ import ( var logger = logging.Logger("adder") +// go-merkledag does this, but it may be moved. 
+// We include for explicitness. +func init() { + ipld.Register(cid.DagProtobuf, merkledag.DecodeProtobufBlock) + ipld.Register(cid.Raw, merkledag.DecodeRawBlock) + ipld.Register(cid.DagCBOR, cbor.DecodeBlock) +} + // ClusterDAGService is an implementation of ipld.DAGService plus a Finalize // method. ClusterDAGServices can be used to provide Adders with a different // add implementation. @@ -31,6 +43,12 @@ type ClusterDAGService interface { Finalize(ctx context.Context, ipfsRoot cid.Cid) (cid.Cid, error) } +// A dagFormatter can create dags from files.Node. It can keep state +// to add several files to the same dag. +type dagFormatter interface { + Add(name string, f files.Node) (cid.Cid, error) +} + // Adder is used to add content to IPFS Cluster using an implementation of // ClusterDAGService. type Adder struct { @@ -104,33 +122,21 @@ func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, erro defer a.cancel() defer close(a.output) - ipfsAdder, err := ipfsadd.NewAdder(a.ctx, a.dgs) + var dagFmtr dagFormatter + var err error + switch a.params.Format { + case "", "unixfs": + dagFmtr, err = newIpfsAdder(ctx, a.dgs, a.params, a.output) + + case "car": + dagFmtr, err = newCarAdder(ctx, a.dgs, a.params, a.output) + default: + err = errors.New("bad dag formatter option") + } if err != nil { - logger.Error(err) return cid.Undef, err } - ipfsAdder.Trickle = a.params.Layout == "trickle" - ipfsAdder.RawLeaves = a.params.RawLeaves - ipfsAdder.Chunker = a.params.Chunker - ipfsAdder.Out = a.output - ipfsAdder.Progress = a.params.Progress - ipfsAdder.NoCopy = a.params.NoCopy - - // Set up prefix - prefix, err := merkledag.PrefixForCidVersion(a.params.CidVersion) - if err != nil { - return cid.Undef, fmt.Errorf("bad CID Version: %s", err) - } - - hashFunCode, ok := multihash.Names[strings.ToLower(a.params.HashFun)] - if !ok { - return cid.Undef, fmt.Errorf("unrecognized hash function: %s", a.params.HashFun) - } - prefix.MhType = hashFunCode - 
prefix.MhLength = -1 - ipfsAdder.CidBuilder = &prefix - // setup wrapping if a.params.Wrap { f = files.NewSliceDirectory( @@ -139,42 +145,31 @@ func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, erro } it := f.Entries() - var adderRoot ipld.Node + var adderRoot cid.Cid for it.Next() { - // In order to set the AddedOutput names right, we use - // OutputPrefix: - // - // When adding a folder, this is the root folder name which is - // prepended to the addedpaths. When adding a single file, - // this is the name of the file which overrides the empty - // AddedOutput name. - // - // After coreunix/add.go was refactored in go-ipfs and we - // followed suit, it no longer receives the name of the - // file/folder being added and does not emit AddedOutput - // events with the right names. We addressed this by adding - // OutputPrefix to our version. go-ipfs modifies emitted - // events before sending to user). - ipfsAdder.OutputPrefix = it.Name() - select { case <-a.ctx.Done(): return cid.Undef, a.ctx.Err() default: logger.Debugf("ipfsAdder AddFile(%s)", it.Name()) - adderRoot, err = ipfsAdder.AddAllAndPin(it.Node()) + adderRoot, err = dagFmtr.Add(it.Name(), it.Node()) if err != nil { logger.Error("error adding to cluster: ", err) return cid.Undef, err } } + // TODO (hector): We can only add a single CAR file for the + // moment. + if a.params.Format == "car" { + break + } } if it.Err() != nil { return cid.Undef, it.Err() } - clusterRoot, err := a.dgs.Finalize(a.ctx, adderRoot.Cid()) + clusterRoot, err := a.dgs.Finalize(a.ctx, adderRoot) if err != nil { logger.Error("error finalizing adder:", err) return cid.Undef, err @@ -182,3 +177,139 @@ func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, erro logger.Infof("%s successfully added to cluster", clusterRoot) return clusterRoot, nil } + +// A wrapper around the ipfsadd.Adder to satisfy the dagFormatter interface. 
+type ipfsAdder struct { + *ipfsadd.Adder +} + +func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params *api.AddParams, out chan *api.AddedOutput) (*ipfsAdder, error) { + iadder, err := ipfsadd.NewAdder(ctx, dgs) + if err != nil { + logger.Error(err) + return nil, err + } + + iadder.Trickle = params.Layout == "trickle" + iadder.RawLeaves = params.RawLeaves + iadder.Chunker = params.Chunker + iadder.Out = out + iadder.Progress = params.Progress + iadder.NoCopy = params.NoCopy + + // Set up prefix + prefix, err := merkledag.PrefixForCidVersion(params.CidVersion) + if err != nil { + return nil, fmt.Errorf("bad CID Version: %s", err) + } + + hashFunCode, ok := multihash.Names[strings.ToLower(params.HashFun)] + if !ok { + return nil, fmt.Errorf("unrecognized hash function: %s", params.HashFun) + } + prefix.MhType = hashFunCode + prefix.MhLength = -1 + iadder.CidBuilder = &prefix + return &ipfsAdder{ + Adder: iadder, + }, nil +} + +func (ia *ipfsAdder) Add(name string, f files.Node) (cid.Cid, error) { + // In order to set the AddedOutput names right, we use + // OutputPrefix: + // + // When adding a folder, this is the root folder name which is + // prepended to the addedpaths. When adding a single file, + // this is the name of the file which overrides the empty + // AddedOutput name. + // + // After coreunix/add.go was refactored in go-ipfs and we + // followed suit, it no longer receives the name of the + // file/folder being added and does not emit AddedOutput + // events with the right names. We addressed this by adding + // OutputPrefix to our version. go-ipfs modifies emitted + // events before sending to user). + ia.OutputPrefix = name + + nd, err := ia.AddAllAndPin(f) + if err != nil { + return cid.Undef, err + } + return nd.Cid(), nil +} + +// An adder to add CAR files. It is at the moment very basic, and can +// add a single CAR file with a single root. 
Ideally, it should be able to +// add more complex, or several CARs by wrapping them with a single root. +// But for that we would need to keep state and track an MFS root similarly to +// what the ipfsadder does. +type carAdder struct { + ctx context.Context + dgs ClusterDAGService + params *api.AddParams + output chan *api.AddedOutput +} + +func newCarAdder(ctx context.Context, dgs ClusterDAGService, params *api.AddParams, out chan *api.AddedOutput) (*carAdder, error) { + return &carAdder{ + ctx: ctx, + dgs: dgs, + params: params, + output: out, + }, nil +} + +// Add takes a node which should be a CAR file and nothing else and +// adds its blocks using the ClusterDAGService. +func (ca *carAdder) Add(name string, fn files.Node) (cid.Cid, error) { + if ca.params.Wrap { + return cid.Undef, errors.New("cannot wrap a CAR file upload") + } + + f, ok := fn.(files.File) + if !ok { + return cid.Undef, errors.New("expected CAR file is not of type file") + } + carReader, err := car.NewCarReader(f) + if err != nil { + return cid.Undef, err + } + + if len(carReader.Header.Roots) != 1 { + return cid.Undef, errors.New("only CAR files with a single root are supported") + } + + root := carReader.Header.Roots[0] + bytes := uint64(0) + + for { + block, err := carReader.Next() + if err != nil && err != io.EOF { + return cid.Undef, err + } else if block == nil { + break + } + + bytes += uint64(len(block.RawData())) + + nd, err := ipld.Decode(block) + if err != nil { + return cid.Undef, err + } + + err = ca.dgs.Add(ca.ctx, nd) + if err != nil { + return cid.Undef, err + } + } + + ca.output <- &api.AddedOutput{ + Name: name, + Cid: root, + Bytes: bytes, + Size: 0, + } + + return root, nil +} diff --git a/adder/adder_test.go b/adder/adder_test.go index 0cfb1d20..a662266b 100644 --- a/adder/adder_test.go +++ b/adder/adder_test.go @@ -1,6 +1,7 @@ package adder import ( + "bytes" "context" "mime/multipart" "sync" @@ -9,32 +10,23 @@ import ( "github.com/ipfs/ipfs-cluster/api" 
"github.com/ipfs/ipfs-cluster/test" + "github.com/ipld/go-car" cid "github.com/ipfs/go-cid" files "github.com/ipfs/go-ipfs-files" - ipld "github.com/ipfs/go-ipld-format" ) type mockCDAGServ struct { - BaseDAGService - resultCids map[string]struct{} + *test.MockDAGService } -func (dag *mockCDAGServ) Add(ctx context.Context, node ipld.Node) error { - dag.resultCids[node.Cid().String()] = struct{}{} - return nil -} - -func (dag *mockCDAGServ) AddMany(ctx context.Context, nodes []ipld.Node) error { - for _, node := range nodes { - err := dag.Add(ctx, node) - if err != nil { - return err - } +func newMockCDAGServ() *mockCDAGServ { + return &mockCDAGServ{ + MockDAGService: test.NewMockDAGService(), } - return nil } +// noop func (dag *mockCDAGServ) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) { return root, nil } @@ -49,9 +41,7 @@ func TestAdder(t *testing.T) { p := api.DefaultAddParams() expectedCids := test.ShardingDirCids[:] - dags := &mockCDAGServ{ - resultCids: make(map[string]struct{}), - } + dags := newMockCDAGServ() adder := New(dags, p, nil) @@ -64,12 +54,13 @@ func TestAdder(t *testing.T) { t.Error("expected the right content root") } - if len(expectedCids) != len(dags.resultCids) { + if len(expectedCids) != len(dags.Nodes) { t.Fatal("unexpected number of blocks imported") } for _, c := range expectedCids { - _, ok := dags.resultCids[c] + ci, _ := cid.Decode(c) + _, ok := dags.Nodes[ci] if !ok { t.Fatal("unexpected block emitted:", c) } @@ -83,9 +74,7 @@ func TestAdder_DoubleStart(t *testing.T) { f := sth.GetTreeSerialFile(t) p := api.DefaultAddParams() - dags := &mockCDAGServ{ - resultCids: make(map[string]struct{}), - } + dags := newMockCDAGServ() adder := New(dags, p, nil) _, err := adder.FromFiles(context.Background(), f) @@ -121,9 +110,7 @@ func TestAdder_ContextCancelled(t *testing.T) { p := api.DefaultAddParams() - dags := &mockCDAGServ{ - resultCids: make(map[string]struct{}), - } + dags := newMockCDAGServ() ctx, cancel := 
context.WithCancel(context.Background()) adder := New(dags, p, nil) @@ -142,3 +129,57 @@ func TestAdder_ContextCancelled(t *testing.T) { cancel() wg.Wait() } + +func TestAdder_CAR(t *testing.T) { + // prepare a CAR file + ctx := context.Background() + sth := test.NewShardingTestHelper() + defer sth.Clean(t) + + mr, closer := sth.GetTreeMultiReader(t) + defer closer.Close() + r := multipart.NewReader(mr, mr.Boundary()) + p := api.DefaultAddParams() + dags := newMockCDAGServ() + adder := New(dags, p, nil) + root, err := adder.FromMultipart(ctx, r) + if err != nil { + t.Fatal(err) + } + var carBuf bytes.Buffer + err = car.WriteCar(ctx, dags, []cid.Cid{root}, &carBuf) + if err != nil { + t.Fatal(err) + } + + // Make the CAR look like a multipart. + carFile := files.NewReaderFile(&carBuf) + carDir := files.NewMapDirectory( + map[string]files.Node{"": carFile}, + ) + carMf := files.NewMultiFileReader(carDir, true) + carMr := multipart.NewReader(carMf, carMf.Boundary()) + + // Add the car, discarding old dags. + dags = newMockCDAGServ() + p.Format = "car" + adder = New(dags, p, nil) + root2, err := adder.FromMultipart(ctx, carMr) + if err != nil { + t.Fatal(err) + } + + if !root.Equals(root2) { + t.Error("Imported CAR file does not have expected root") + } + + expectedCids := test.ShardingDirCids[:] + for _, c := range expectedCids { + ci, _ := cid.Decode(c) + _, ok := dags.Nodes[ci] + if !ok { + t.Fatal("unexpected block extracted from CAR:", c) + } + } + +} diff --git a/adder/sharding/dag.go b/adder/sharding/dag.go index f265d1ff..bd5bdae5 100644 --- a/adder/sharding/dag.go +++ b/adder/sharding/dag.go @@ -28,10 +28,12 @@ import ( mh "github.com/multiformats/go-multihash" ) +// go-merkledag does this, but it may be moved. +// We include for explicitness. 
func init() { ipld.Register(cid.DagProtobuf, dag.DecodeProtobufBlock) ipld.Register(cid.Raw, dag.DecodeRawBlock) - ipld.Register(cid.DagCBOR, cbor.DecodeBlock) // need to decode CBOR + ipld.Register(cid.DagCBOR, cbor.DecodeBlock) } // MaxLinks is the max number of links that, when serialized fit into a block diff --git a/adder/sharding/dag_service_test.go b/adder/sharding/dag_service_test.go index b60d3461..d0de0e77 100644 --- a/adder/sharding/dag_service_test.go +++ b/adder/sharding/dag_service_test.go @@ -197,11 +197,13 @@ func TestFromMultipart_Errors(t *testing.T) { { name: "bad chunker", params: &api.AddParams{ - Layout: "", - Chunker: "aweee", - RawLeaves: false, - Hidden: false, - Shard: true, + Format: "", + IPFSAddParams: api.IPFSAddParams{ + Chunker: "aweee", + RawLeaves: false, + }, + Hidden: false, + Shard: true, PinOptions: api.PinOptions{ ReplicationFactorMin: -1, ReplicationFactorMax: -1, @@ -213,11 +215,13 @@ func TestFromMultipart_Errors(t *testing.T) { { name: "shard size too small", params: &api.AddParams{ - Layout: "", - Chunker: "", - RawLeaves: false, - Hidden: false, - Shard: true, + Format: "", + IPFSAddParams: api.IPFSAddParams{ + Chunker: "", + RawLeaves: false, + }, + Hidden: false, + Shard: true, PinOptions: api.PinOptions{ ReplicationFactorMin: -1, ReplicationFactorMax: -1, @@ -229,11 +233,13 @@ func TestFromMultipart_Errors(t *testing.T) { { name: "replication too high", params: &api.AddParams{ - Layout: "", - Chunker: "", - RawLeaves: false, - Hidden: false, - Shard: true, + Format: "", + IPFSAddParams: api.IPFSAddParams{ + Chunker: "", + RawLeaves: false, + }, + Hidden: false, + Shard: true, PinOptions: api.PinOptions{ ReplicationFactorMin: 2, ReplicationFactorMax: 3, diff --git a/api/add.go b/api/add.go index ab34cc29..4653d964 100644 --- a/api/add.go +++ b/api/add.go @@ -21,6 +21,18 @@ type AddedOutput struct { Size uint64 `json:"size,omitempty" codec:"s,omitempty"` } +// IPFSAddParams groups options specific to the ipfs-adder, 
which builds +// UnixFS dags with the input files. This struct is embedded in AddParams. +type IPFSAddParams struct { + Layout string + Chunker string + RawLeaves bool + Progress bool + CidVersion int + HashFun string + NoCopy bool +} + // AddParams contains all of the configurable parameters needed to specify the // importing process of a file being added to an ipfs-cluster type AddParams struct { @@ -28,35 +40,28 @@ type AddParams struct { Local bool Recursive bool - Layout string - Chunker string - RawLeaves bool Hidden bool Wrap bool Shard bool - Progress bool - CidVersion int - HashFun string StreamChannels bool - NoCopy bool + Format string // selects which adder + + IPFSAddParams } // DefaultAddParams returns a AddParams object with standard defaults func DefaultAddParams() *AddParams { return &AddParams{ - Local: false, - Recursive: false, - Layout: "", // corresponds to balanced layout - Chunker: "size-262144", - RawLeaves: false, - Hidden: false, - Wrap: false, - Shard: false, - Progress: false, - CidVersion: 0, - HashFun: "sha2-256", + Local: false, + Recursive: false, + + Hidden: false, + Wrap: false, + Shard: false, + StreamChannels: true, - NoCopy: false, + + Format: "unixfs", PinOptions: PinOptions{ ReplicationFactorMin: 0, ReplicationFactorMax: 0, @@ -65,6 +70,15 @@ func DefaultAddParams() *AddParams { ShardSize: DefaultShardSize, Metadata: make(map[string]string), }, + IPFSAddParams: IPFSAddParams{ + Layout: "", // corresponds to balanced layout + Chunker: "size-262144", + RawLeaves: false, + Progress: false, + CidVersion: 0, + HashFun: "sha2-256", + NoCopy: false, + }, } } @@ -108,7 +122,7 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) { case "trickle", "balanced", "": // nothing default: - return nil, errors.New("layout parameter invalid") + return nil, errors.New("layout parameter is invalid") } params.Layout = layout @@ -122,6 +136,14 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) { params.HashFun = hashF } + 
format := query.Get("format") + switch format { + case "car", "unixfs", "": + default: + return nil, errors.New("format parameter is invalid") + } + params.Format = format + err = parseBoolParam(query, "local", &params.Local) if err != nil { return nil, err @@ -195,6 +217,7 @@ func (p *AddParams) ToQueryString() (string, error) { query.Set("hash", p.HashFun) query.Set("stream-channels", fmt.Sprintf("%t", p.StreamChannels)) query.Set("nocopy", fmt.Sprintf("%t", p.NoCopy)) + query.Set("format", p.Format) return query.Encode(), nil } @@ -212,5 +235,6 @@ func (p *AddParams) Equals(p2 *AddParams) bool { p.CidVersion == p2.CidVersion && p.HashFun == p2.HashFun && p.StreamChannels == p2.StreamChannels && - p.NoCopy == p2.NoCopy + p.NoCopy == p2.NoCopy && + p.Format == p2.Format } diff --git a/api/rest/client/methods_test.go b/api/rest/client/methods_test.go index 00fae750..9405db94 100644 --- a/api/rest/client/methods_test.go +++ b/api/rest/client/methods_test.go @@ -618,10 +618,12 @@ func TestAddMultiFile(t *testing.T) { Name: "test something", ShardSize: 1024, }, - Shard: false, - Layout: "", - Chunker: "", - RawLeaves: false, + Shard: false, + Format: "", + IPFSAddParams: types.IPFSAddParams{ + Chunker: "", + RawLeaves: false, + }, Hidden: false, StreamChannels: true, } diff --git a/cmd/ipfs-cluster-ctl/main.go b/cmd/ipfs-cluster-ctl/main.go index ca1a6734..5de104f1 100644 --- a/cmd/ipfs-cluster-ctl/main.go +++ b/cmd/ipfs-cluster-ctl/main.go @@ -274,18 +274,19 @@ a Cluster Pin operation on success. It takes elements from local paths as well as from web URLs (accessed with a GET request). Providing several arguments will automatically set --wrap-in-directory. -Cluster Add is equivalent to "ipfs add" in terms of DAG building, and supports -the same options for adjusting the chunker, the DAG layout etc. However, -it will allocate the content and send it directly to the allocated peers (among -which may not necessarily be the local ipfs daemon). 
+Cluster "add" works, by default, just like "ipfs add" and has similar options +in terms of DAG layout, chunker, hash function etc. It also supports adding +CAR files directly (--format car), as long as they have a single root. When +adding CAR files, all the options related to dag-building are ignored. -Once the adding process is finished, the content is fully added to all -allocations and pinned in them. This makes cluster add slower than a local -ipfs add, but the result is a fully replicated CID on completion. -If you prefer faster adding, use the --local flag to add directly to the local -IPFS node and pin in the destinations after that. Note that the local IPFS -node may not be among the destinations, which will leave the unpinned content -in it. +Added content will be allocated and sent block by block to the peers that +should pin it (among which may not necessarily be the local ipfs daemon). +Once all the blocks have arrived, they will be "cluster-pinned". This makes +cluster add slower than a local ipfs add, but the result is a fully replicated +on completion. If you prefer faster adding, use the --local flag to add +directly to the local IPFS node and pin in the destinations after that. +Note that the local IPFS node may not be among the destinations, which will +leave the unpinned content in it. Optional replication-min and replication-max factors can be provided: -1 means "pin everywhere" and 0 means use cluster's default setting (i.e., replication @@ -318,38 +319,6 @@ content. Name: "no-stream", Usage: "Buffer output locally. Produces a valid JSON array with --enc=json.", }, - cli.StringFlag{ - Name: "layout", - Value: defaultAddParams.Layout, - Usage: "Dag layout to use for dag generation: balanced or trickle", - }, - cli.BoolFlag{ - Name: "wrap-with-directory, w", - Usage: "Wrap a with a directory object", - }, - cli.BoolFlag{ - Name: "hidden, H", - Usage: "Include files that are hidden. 
Only takes effect on recursive add", - }, - cli.StringFlag{ - Name: "chunker, s", - Usage: "'size-<size>' or 'rabin-<min>-<avg>-<max>'", - Value: defaultAddParams.Chunker, - }, - cli.BoolFlag{ - Name: "raw-leaves", - Usage: "Use raw blocks for leaves (experimental)", - }, - cli.IntFlag{ - Name: "cid-version", - Usage: "CID version. Non default implies raw-leaves", - Value: defaultAddParams.CidVersion, - }, - cli.StringFlag{ - Name: "hash", - Usage: "Hash function to use. Implies cid-version=1", - Value: defaultAddParams.HashFun, - }, cli.BoolFlag{ Name: "local", Usage: "Add to local peer but pin normally", @@ -381,10 +350,6 @@ content. Name: "allocations, allocs", Usage: "Optional comma-separated list of peer IDs", }, - cli.BoolFlag{ - Name: "nocopy", - Usage: "Add the URL using filestore. Implies raw-leaves. (experimental)", - }, cli.BoolFlag{ Name: "wait", Usage: "Wait for all nodes to report a status of pinned before returning", @@ -394,6 +359,51 @@ content. Value: 0, Usage: "How long to --wait (in seconds), default is indefinitely", }, + + cli.BoolFlag{ + Name: "wrap-with-directory, w", + Usage: "Wrap a with a directory object", + }, + + cli.StringFlag{ + Name: "format", + Value: defaultAddParams.Format, + Usage: "'unixfs' (add as unixfs DAG), 'car' (import CAR file)", + }, + + cli.StringFlag{ + Name: "layout", + Value: defaultAddParams.Layout, + Usage: "Dag layout to use for dag generation: balanced or trickle", + }, + cli.BoolFlag{ + Name: "hidden, H", + Usage: "Include files that are hidden. Only takes effect on recursive add", + }, + cli.StringFlag{ + Name: "chunker, s", + Usage: "'size-<size>' or 'rabin-<min>-<avg>-<max>'", + Value: defaultAddParams.Chunker, + }, + cli.BoolFlag{ + Name: "raw-leaves", + Usage: "Use raw blocks for leaves (experimental)", + }, + cli.IntFlag{ + Name: "cid-version", + Usage: "CID version. Non default implies raw-leaves", + Value: defaultAddParams.CidVersion, + }, + cli.StringFlag{ + Name: "hash", + Usage: "Hash function to use. 
Implies cid-version=1", + Value: defaultAddParams.HashFun, + }, + cli.BoolFlag{ + Name: "nocopy", + Usage: "Add the URL using filestore. Implies raw-leaves. (experimental)", + }, + // TODO: Uncomment when sharding is supported. // cli.BoolFlag{ // Name: "shard", @@ -448,6 +458,7 @@ content. if c.String("allocations") != "" { p.UserAllocations = api.StringsToPeers(strings.Split(c.String("allocations"), ",")) } + p.Format = c.String("format") //p.Shard = shard //p.ShardSize = c.Uint64("shard-size") p.Shard = false @@ -471,6 +482,11 @@ content. p.RawLeaves = true } + // Prevent footgun + if p.Wrap && p.Format == "car" { + checkErr("", errors.New("only a single CAR file can be added and wrap-with-directory is not supported")) + } + out := make(chan *api.AddedOutput, 1) var wg sync.WaitGroup wg.Add(1) diff --git a/go.mod b/go.mod index b89c5387..f3a4a1dc 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/ipfs/go-mfs v0.1.2 github.com/ipfs/go-path v0.0.9 github.com/ipfs/go-unixfs v0.2.4 + github.com/ipld/go-car v0.2.2 github.com/kelseyhightower/envconfig v1.4.0 github.com/kishansagathiya/go-dot v0.1.0 github.com/lanzafame/go-libp2p-ocgorpc v0.1.1 diff --git a/go.sum b/go.sum index 52e1ca87..b91d13c3 100644 --- a/go.sum +++ b/go.sum @@ -392,6 +392,7 @@ github.com/ipfs/go-blockservice v0.0.7/go.mod h1:EOfb9k/Y878ZTRY/CH0x5+ATtaipfbR github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M= github.com/ipfs/go-blockservice v0.1.1/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I= github.com/ipfs/go-blockservice v0.1.2/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I= +github.com/ipfs/go-blockservice v0.1.3/go.mod h1:OTZhFpkgY48kNzbgyvcexW9cHrpjBYIjSR0KoDOFOLU= github.com/ipfs/go-blockservice v0.1.4 h1:Vq+MlsH8000KbbUciRyYMEw/NNP8UAGmcqKi4uWmFGA= github.com/ipfs/go-blockservice v0.1.4/go.mod h1:OTZhFpkgY48kNzbgyvcexW9cHrpjBYIjSR0KoDOFOLU= github.com/ipfs/go-cid v0.0.1/go.mod 
h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= @@ -507,6 +508,7 @@ github.com/ipfs/go-merkledag v0.0.6/go.mod h1:QYPdnlvkOg7GnQRofu9XZimC5ZW5Wi3bKy github.com/ipfs/go-merkledag v0.1.0/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk= github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk= github.com/ipfs/go-merkledag v0.3.0/go.mod h1:4pymaZLhSLNVuiCITYrpViD6vmfZ/Ws4n/L9tfNv3S4= +github.com/ipfs/go-merkledag v0.3.1/go.mod h1:fvkZNNZixVW6cKSZ/JfLlON5OlgTXNdRLz0p6QG/I2M= github.com/ipfs/go-merkledag v0.3.2 h1:MRqj40QkrWkvPswXs4EfSslhZ4RVPRbxwX11js0t1xY= github.com/ipfs/go-merkledag v0.3.2/go.mod h1:fvkZNNZixVW6cKSZ/JfLlON5OlgTXNdRLz0p6QG/I2M= github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= @@ -529,6 +531,12 @@ github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2 github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0= github.com/ipfs/interface-go-ipfs-core v0.4.0 h1:+mUiamyHIwedqP8ZgbCIwpy40oX7QcXUbo4CZOeJVJg= github.com/ipfs/interface-go-ipfs-core v0.4.0/go.mod h1:UJBcU6iNennuI05amq3FQ7g0JHUkibHFAfhfUIy927o= +github.com/ipld/go-car v0.2.2 h1:Dq0Kl0XMaNMCNxATbYYu/EIgqWjo2w2W+Miu6npd20g= +github.com/ipld/go-car v0.2.2/go.mod h1:pPb7hzVBHBoRqU3GkPy1d6FZKQoGQ56yd3MyPGzZ0Xs= +github.com/ipld/go-ipld-prime v0.7.0 h1:eigF1ZpaL1prbsKYVMqPLoPJqD/pzkQOe2j1uzvVg7w= +github.com/ipld/go-ipld-prime v0.7.0/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM= +github.com/ipld/go-ipld-prime-proto v0.1.1 h1:EX4yWYaIqSLwtVE30nxEcZDcvsWDtx1vImSG+XCJebY= +github.com/ipld/go-ipld-prime-proto v0.1.1/go.mod h1:cI9NwYAUKCLUwqufoUjChISxuTEkaY2yvNYCRCuhRck= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= @@ -536,6 +544,7 @@ 
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+ github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec/go.mod h1:rGaEvXB4uRSZMmzKNLoXvTu1sfx+1kv/DojUlPrSZGs= github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= +github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c/go.mod h1:sdx1xVM9UuLw1tXnhJWN3piypTUO3vCIHYmG15KE/dU= github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2/go.mod h1:8GXXJV31xl8whumTzdZsTt3RnUIiPqzkyf7mxToRCMs= github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk= github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= @@ -1236,6 +1245,7 @@ github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMI github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 h1:8kxMKmKzXXL4Ru1nyhvdms/JjWt+3YLpvRb/bAjO/y0= github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= +github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158 h1:WXhVOwj2USAXB5oMDwRl3piOux2XMV9TANaYxXHdkoE= github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= @@ -1317,6 +1327,7 @@ golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto 
v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200117160349-530e935923ad/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/test/sharding.go b/test/sharding.go index c243e15d..5f85b0a4 100644 --- a/test/sharding.go +++ b/test/sharding.go @@ -1,14 +1,17 @@ package test import ( + "context" "encoding/hex" "io" "math/rand" "os" "path/filepath" + "sync" "testing" files "github.com/ipfs/go-ipfs-files" + format "github.com/ipfs/go-ipld-format" cid "github.com/ipfs/go-cid" ) @@ -277,3 +280,77 @@ func (sth *ShardingTestHelper) makeRandFile(t *testing.T, kbs int) os.FileInfo { return st } + +// MockDAGService implements an in-memory DAGService. The stored nodes are +// inspectable via the Nodes map. +type MockDAGService struct { + mu sync.Mutex + Nodes map[cid.Cid]format.Node +} + +// NewMockDAGService returns an in-memory DAG Service. +func NewMockDAGService() *MockDAGService { + return &MockDAGService{Nodes: make(map[cid.Cid]format.Node)} +} + +// Get reads a node. +func (d *MockDAGService) Get(ctx context.Context, cid cid.Cid) (format.Node, error) { + d.mu.Lock() + defer d.mu.Unlock() + if n, ok := d.Nodes[cid]; ok { + return n, nil + } + return nil, format.ErrNotFound +} + +// GetMany reads many nodes. 
+func (d *MockDAGService) GetMany(ctx context.Context, cids []cid.Cid) <-chan *format.NodeOption { + d.mu.Lock() + defer d.mu.Unlock() + out := make(chan *format.NodeOption, len(cids)) + for _, c := range cids { + if n, ok := d.Nodes[c]; ok { + out <- &format.NodeOption{Node: n} + } else { + out <- &format.NodeOption{Err: format.ErrNotFound} + } + } + close(out) + return out +} + +// Add adds a node. +func (d *MockDAGService) Add(ctx context.Context, node format.Node) error { + d.mu.Lock() + defer d.mu.Unlock() + d.Nodes[node.Cid()] = node + return nil +} + +// AddMany adds many nodes. +func (d *MockDAGService) AddMany(ctx context.Context, nodes []format.Node) error { + d.mu.Lock() + defer d.mu.Unlock() + for _, n := range nodes { + d.Nodes[n.Cid()] = n + } + return nil +} + +// Remove deletes a node. +func (d *MockDAGService) Remove(ctx context.Context, c cid.Cid) error { + d.mu.Lock() + defer d.mu.Unlock() + delete(d.Nodes, c) + return nil +} + +// RemoveMany removes many nodes. +func (d *MockDAGService) RemoveMany(ctx context.Context, cids []cid.Cid) error { + d.mu.Lock() + defer d.mu.Unlock() + for _, c := range cids { + delete(d.Nodes, c) + } + return nil +}