diff --git a/adder/adder.go b/adder/adder.go index 259be86f..9f1c2b50 100644 --- a/adder/adder.go +++ b/adder/adder.go @@ -4,15 +4,19 @@ package adder import ( "context" + "errors" "fmt" + "io" "mime/multipart" "strings" "github.com/ipfs/ipfs-cluster/adder/ipfsadd" "github.com/ipfs/ipfs-cluster/api" + "github.com/ipld/go-car" cid "github.com/ipfs/go-cid" files "github.com/ipfs/go-ipfs-files" + cbor "github.com/ipfs/go-ipld-cbor" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" merkledag "github.com/ipfs/go-merkledag" @@ -21,6 +25,14 @@ import ( var logger = logging.Logger("adder") +// go-merkledag does this, but it may be moved. +// We include for explicitness. +func init() { + ipld.Register(cid.DagProtobuf, merkledag.DecodeProtobufBlock) + ipld.Register(cid.Raw, merkledag.DecodeRawBlock) + ipld.Register(cid.DagCBOR, cbor.DecodeBlock) +} + // ClusterDAGService is an implementation of ipld.DAGService plus a Finalize // method. ClusterDAGServices can be used to provide Adders with a different // add implementation. @@ -31,6 +43,12 @@ type ClusterDAGService interface { Finalize(ctx context.Context, ipfsRoot cid.Cid) (cid.Cid, error) } +// A dagFormatter can create dags from files.Node. It can keep state +// to add several files to the same dag. +type dagFormatter interface { + Add(name string, f files.Node) (cid.Cid, error) +} + // Adder is used to add content to IPFS Cluster using an implementation of // ClusterDAGService. type Adder struct { @@ -104,33 +122,21 @@ func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, erro defer a.cancel() defer close(a.output) - ipfsAdder, err := ipfsadd.NewAdder(a.ctx, a.dgs) + var dagFmtr dagFormatter + var err error + switch a.params.Format { + case "", "unixfs": + dagFmtr, err = newIpfsAdder(ctx, a.dgs, a.params, a.output) + + case "car": + dagFmtr, err = newCarAdder(ctx, a.dgs, a.params, a.output) + default: + err = errors.New("bad dag formatter option") + } if err != nil { - logger.Error(err) return cid.Undef, err } - ipfsAdder.Trickle = a.params.Layout == "trickle" - ipfsAdder.RawLeaves = a.params.RawLeaves - ipfsAdder.Chunker = a.params.Chunker - ipfsAdder.Out = a.output - ipfsAdder.Progress = a.params.Progress - ipfsAdder.NoCopy = a.params.NoCopy - - // Set up prefix - prefix, err := merkledag.PrefixForCidVersion(a.params.CidVersion) - if err != nil { - return cid.Undef, fmt.Errorf("bad CID Version: %s", err) - } - - hashFunCode, ok := multihash.Names[strings.ToLower(a.params.HashFun)] - if !ok { - return cid.Undef, fmt.Errorf("unrecognized hash function: %s", a.params.HashFun) - } - prefix.MhType = hashFunCode - prefix.MhLength = -1 - ipfsAdder.CidBuilder = &prefix - // setup wrapping if a.params.Wrap { f = files.NewSliceDirectory( @@ -139,42 +145,31 @@ func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, erro } it := f.Entries() - var adderRoot ipld.Node + var adderRoot cid.Cid for it.Next() { - // In order to set the AddedOutput names right, we use - // OutputPrefix: - // - // When adding a folder, this is the root folder name which is - // prepended to the addedpaths. When adding a single file, - // this is the name of the file which overrides the empty - // AddedOutput name. - // - // After coreunix/add.go was refactored in go-ipfs and we - // followed suit, it no longer receives the name of the - // file/folder being added and does not emit AddedOutput - // events with the right names. We addressed this by adding - // OutputPrefix to our version. go-ipfs modifies emitted - // events before sending to user). - ipfsAdder.OutputPrefix = it.Name() - select { case <-a.ctx.Done(): return cid.Undef, a.ctx.Err() default: logger.Debugf("ipfsAdder AddFile(%s)", it.Name()) - adderRoot, err = ipfsAdder.AddAllAndPin(it.Node()) + adderRoot, err = dagFmtr.Add(it.Name(), it.Node()) if err != nil { logger.Error("error adding to cluster: ", err) return cid.Undef, err } } + // TODO (hector): We can only add a single CAR file for the + // moment. + if a.params.Format == "car" { + break + } } if it.Err() != nil { return cid.Undef, it.Err() } - clusterRoot, err := a.dgs.Finalize(a.ctx, adderRoot.Cid()) + clusterRoot, err := a.dgs.Finalize(a.ctx, adderRoot) if err != nil { logger.Error("error finalizing adder:", err) return cid.Undef, err @@ -182,3 +177,139 @@ func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, erro logger.Infof("%s successfully added to cluster", clusterRoot) return clusterRoot, nil } + +// A wrapper around the ipfsadd.Adder to satisfy the dagFormatter interface. +type ipfsAdder struct { + *ipfsadd.Adder +} + +func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params *api.AddParams, out chan *api.AddedOutput) (*ipfsAdder, error) { + iadder, err := ipfsadd.NewAdder(ctx, dgs) + if err != nil { + logger.Error(err) + return nil, err + } + + iadder.Trickle = params.Layout == "trickle" + iadder.RawLeaves = params.RawLeaves + iadder.Chunker = params.Chunker + iadder.Out = out + iadder.Progress = params.Progress + iadder.NoCopy = params.NoCopy + + // Set up prefi + prefix, err := merkledag.PrefixForCidVersion(params.CidVersion) + if err != nil { + return nil, fmt.Errorf("bad CID Version: %s", err) + } + + hashFunCode, ok := multihash.Names[strings.ToLower(params.HashFun)] + if !ok { + return nil, fmt.Errorf("unrecognized hash function: %s", params.HashFun) + } + prefix.MhType = hashFunCode + prefix.MhLength = -1 + iadder.CidBuilder = &prefix + return &ipfsAdder{ + Adder: iadder, + }, nil +} + +func (ia *ipfsAdder) Add(name string, f files.Node) (cid.Cid, error) { + // In order to set the AddedOutput names right, we use + // OutputPrefix: + // + // When adding a folder, this is the root folder name which is + // prepended to the addedpaths. When adding a single file, + // this is the name of the file which overrides the empty + // AddedOutput name. + // + // After coreunix/add.go was refactored in go-ipfs and we + // followed suit, it no longer receives the name of the + // file/folder being added and does not emit AddedOutput + // events with the right names. We addressed this by adding + // OutputPrefix to our version. go-ipfs modifies emitted + // events before sending to user). + ia.OutputPrefix = name + + nd, err := ia.AddAllAndPin(f) + if err != nil { + return cid.Undef, err + } + return nd.Cid(), nil +} + +// An adder to add CAR files. It is at the moment very basic, and can +// add a single CAR file with a single root. Ideally, it should be able to +// add more complex, or several CARs by wrapping them with a single root. +// But for that we would need to keep state and track an MFS root similarly to +// what the ipfsadder does. +type carAdder struct { + ctx context.Context + dgs ClusterDAGService + params *api.AddParams + output chan *api.AddedOutput +} + +func newCarAdder(ctx context.Context, dgs ClusterDAGService, params *api.AddParams, out chan *api.AddedOutput) (*carAdder, error) { + return &carAdder{ + ctx: ctx, + dgs: dgs, + params: params, + output: out, + }, nil +} + +// Add takes a node which should be a CAR file and nothing else and +// adds its blocks using the ClusterDAGService. +func (ca *carAdder) Add(name string, fn files.Node) (cid.Cid, error) { + if ca.params.Wrap { + return cid.Undef, errors.New("cannot wrap a CAR file upload") + } + + f, ok := fn.(files.File) + if !ok { + return cid.Undef, errors.New("expected CAR file is not of type file") + } + carReader, err := car.NewCarReader(f) + if err != nil { + return cid.Undef, err + } + + if len(carReader.Header.Roots) != 1 { + return cid.Undef, errors.New("only CAR files with a single root are supported") + } + + root := carReader.Header.Roots[0] + bytes := uint64(0) + + for { + block, err := carReader.Next() + if err != nil && err != io.EOF { + return cid.Undef, err + } else if block == nil { + break + } + + bytes += uint64(len(block.RawData())) + + nd, err := ipld.Decode(block) + if err != nil { + return cid.Undef, err + } + + err = ca.dgs.Add(ca.ctx, nd) + if err != nil { + return cid.Undef, err + } + } + + ca.output <- &api.AddedOutput{ + Name: name, + Cid: root, + Bytes: bytes, + Size: 0, + } + + return root, nil +} diff --git a/adder/adder_test.go b/adder/adder_test.go index 0cfb1d20..a662266b 100644 --- a/adder/adder_test.go +++ b/adder/adder_test.go @@ -1,6 +1,7 @@ package adder import ( + "bytes" "context" "mime/multipart" "sync" @@ -9,32 +10,23 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/test" + "github.com/ipld/go-car" cid "github.com/ipfs/go-cid" files "github.com/ipfs/go-ipfs-files" - ipld "github.com/ipfs/go-ipld-format" ) type mockCDAGServ struct { - BaseDAGService - resultCids map[string]struct{} + *test.MockDAGService } -func (dag *mockCDAGServ) Add(ctx context.Context, node ipld.Node) error { - dag.resultCids[node.Cid().String()] = struct{}{} - return nil -} - -func (dag *mockCDAGServ) AddMany(ctx context.Context, nodes []ipld.Node) error { - for _, node := range nodes { - err := dag.Add(ctx, node) - if err != nil { - return err - } +func newMockCDAGServ() *mockCDAGServ { + return &mockCDAGServ{ + MockDAGService: test.NewMockDAGService(), } - return nil } +// noop func (dag *mockCDAGServ) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) { return root, nil } @@ -49,9 +41,7 @@ func TestAdder(t *testing.T) { p := api.DefaultAddParams() expectedCids := test.ShardingDirCids[:] - dags := &mockCDAGServ{ - resultCids: make(map[string]struct{}), - } + dags := newMockCDAGServ() adder := New(dags, p, nil) @@ -64,12 +54,13 @@ func TestAdder(t *testing.T) { t.Error("expected the right content root") } - if len(expectedCids) != len(dags.resultCids) { + if len(expectedCids) != len(dags.Nodes) { t.Fatal("unexpected number of blocks imported") } for _, c := range expectedCids { - _, ok := dags.resultCids[c] + ci, _ := cid.Decode(c) + _, ok := dags.Nodes[ci] if !ok { t.Fatal("unexpected block emitted:", c) } @@ -83,9 +74,7 @@ func TestAdder_DoubleStart(t *testing.T) { f := sth.GetTreeSerialFile(t) p := api.DefaultAddParams() - dags := &mockCDAGServ{ - resultCids: make(map[string]struct{}), - } + dags := newMockCDAGServ() adder := New(dags, p, nil) _, err := adder.FromFiles(context.Background(), f) @@ -121,9 +110,7 @@ func TestAdder_ContextCancelled(t *testing.T) { p := api.DefaultAddParams() - dags := &mockCDAGServ{ - resultCids: make(map[string]struct{}), - } + dags := newMockCDAGServ() ctx, cancel := context.WithCancel(context.Background()) adder := New(dags, p, nil) @@ -142,3 +129,57 @@ func TestAdder_ContextCancelled(t *testing.T) { cancel() wg.Wait() } + +func TestAdder_CAR(t *testing.T) { + // prepare a CAR file + ctx := context.Background() + sth := test.NewShardingTestHelper() + defer sth.Clean(t) + + mr, closer := sth.GetTreeMultiReader(t) + defer closer.Close() + r := multipart.NewReader(mr, mr.Boundary()) + p := api.DefaultAddParams() + dags := newMockCDAGServ() + adder := New(dags, p, nil) + root, err := adder.FromMultipart(ctx, r) + if err != nil { + t.Fatal(err) + } + var carBuf bytes.Buffer + err = car.WriteCar(ctx, dags, []cid.Cid{root}, &carBuf) + if err != nil { + t.Fatal(err) + } + + // Make the CAR look like a multipart. + carFile := files.NewReaderFile(&carBuf) + carDir := files.NewMapDirectory( + map[string]files.Node{"": carFile}, + ) + carMf := files.NewMultiFileReader(carDir, true) + carMr := multipart.NewReader(carMf, carMf.Boundary()) + + // Add the car, discarding old dags. + dags = newMockCDAGServ() + p.Format = "car" + adder = New(dags, p, nil) + root2, err := adder.FromMultipart(ctx, carMr) + if err != nil { + t.Fatal(err) + } + + if !root.Equals(root2) { + t.Error("Imported CAR file does not have expected root") + } + + expectedCids := test.ShardingDirCids[:] + for _, c := range expectedCids { + ci, _ := cid.Decode(c) + _, ok := dags.Nodes[ci] + if !ok { + t.Fatal("unexpected block extracted from CAR:", c) + } + } + +} diff --git a/adder/sharding/dag.go b/adder/sharding/dag.go index f265d1ff..bd5bdae5 100644 --- a/adder/sharding/dag.go +++ b/adder/sharding/dag.go @@ -28,10 +28,12 @@ import ( mh "github.com/multiformats/go-multihash" ) +// go-merkledag does this, but it may be moved. +// We include for explicitness. func init() { ipld.Register(cid.DagProtobuf, dag.DecodeProtobufBlock) ipld.Register(cid.Raw, dag.DecodeRawBlock) - ipld.Register(cid.DagCBOR, cbor.DecodeBlock) // need to decode CBOR + ipld.Register(cid.DagCBOR, cbor.DecodeBlock) } // MaxLinks is the max number of links that, when serialized fit into a block diff --git a/adder/sharding/dag_service_test.go b/adder/sharding/dag_service_test.go index b60d3461..d0de0e77 100644 --- a/adder/sharding/dag_service_test.go +++ b/adder/sharding/dag_service_test.go @@ -197,11 +197,13 @@ func TestFromMultipart_Errors(t *testing.T) { { name: "bad chunker", params: &api.AddParams{ - Layout: "", - Chunker: "aweee", - RawLeaves: false, - Hidden: false, - Shard: true, + Format: "", + IPFSAddParams: api.IPFSAddParams{ + Chunker: "aweee", + RawLeaves: false, + }, + Hidden: false, + Shard: true, PinOptions: api.PinOptions{ ReplicationFactorMin: -1, ReplicationFactorMax: -1, @@ -213,11 +215,13 @@ func TestFromMultipart_Errors(t *testing.T) { { name: "shard size too small", params: &api.AddParams{ - Layout: "", - Chunker: "", - RawLeaves: false, - Hidden: false, - Shard: true, + Format: "", + IPFSAddParams: api.IPFSAddParams{ + Chunker: "", + RawLeaves: false, + }, + Hidden: false, + Shard: true, PinOptions: api.PinOptions{ ReplicationFactorMin: -1, ReplicationFactorMax: -1, @@ -229,11 +233,13 @@ func TestFromMultipart_Errors(t *testing.T) { { name: "replication too high", params: &api.AddParams{ - Layout: "", - Chunker: "", - RawLeaves: false, - Hidden: false, - Shard: true, + Format: "", + IPFSAddParams: api.IPFSAddParams{ + Chunker: "", + RawLeaves: false, + }, + Hidden: false, + Shard: true, PinOptions: api.PinOptions{ ReplicationFactorMin: 2, ReplicationFactorMax: 3, diff --git a/api/add.go b/api/add.go index ab34cc29..4653d964 100644 --- a/api/add.go +++ b/api/add.go @@ -21,6 +21,18 @@ type AddedOutput struct { Size uint64 `json:"size,omitempty" codec:"s,omitempty"` } +// IPFSAddParams groups options specific to the ipfs-adder, which builds +// UnixFS dags with the input files. This struct is embedded in AddParams. +type IPFSAddParams struct { + Layout string + Chunker string + RawLeaves bool + Progress bool + CidVersion int + HashFun string + NoCopy bool +} + // AddParams contains all of the configurable parameters needed to specify the // importing process of a file being added to an ipfs-cluster type AddParams struct { @@ -28,35 +40,28 @@ type AddParams struct { Local bool Recursive bool - Layout string - Chunker string - RawLeaves bool Hidden bool Wrap bool Shard bool - Progress bool - CidVersion int - HashFun string StreamChannels bool - NoCopy bool + Format string // selects with adder + + IPFSAddParams } // DefaultAddParams returns a AddParams object with standard defaults func DefaultAddParams() *AddParams { return &AddParams{ - Local: false, - Recursive: false, - Layout: "", // corresponds to balanced layout - Chunker: "size-262144", - RawLeaves: false, - Hidden: false, - Wrap: false, - Shard: false, - Progress: false, - CidVersion: 0, - HashFun: "sha2-256", + Local: false, + Recursive: false, + + Hidden: false, + Wrap: false, + Shard: false, + StreamChannels: true, - NoCopy: false, + + Format: "unixfs", PinOptions: PinOptions{ ReplicationFactorMin: 0, ReplicationFactorMax: 0, @@ -65,6 +70,15 @@ func DefaultAddParams() *AddParams { ShardSize: DefaultShardSize, Metadata: make(map[string]string), }, + IPFSAddParams: IPFSAddParams{ + Layout: "", // corresponds to balanced layout + Chunker: "size-262144", + RawLeaves: false, + Progress: false, + CidVersion: 0, + HashFun: "sha2-256", + NoCopy: false, + }, } } @@ -108,7 +122,7 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) { case "trickle", "balanced", "": // nothing default: - return nil, errors.New("layout parameter invalid") + return nil, errors.New("layout parameter is invalid") } params.Layout = layout @@ -122,6 +136,14 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) { params.HashFun = hashF } + format := query.Get("format") + switch format { + case "car", "unixfs", "": + default: + return nil, errors.New("format parameter is invalid") + } + params.Format = format + err = parseBoolParam(query, "local", ¶ms.Local) if err != nil { return nil, err @@ -195,6 +217,7 @@ func (p *AddParams) ToQueryString() (string, error) { query.Set("hash", p.HashFun) query.Set("stream-channels", fmt.Sprintf("%t", p.StreamChannels)) query.Set("nocopy", fmt.Sprintf("%t", p.NoCopy)) + query.Set("format", p.Format) return query.Encode(), nil } @@ -212,5 +235,6 @@ func (p *AddParams) Equals(p2 *AddParams) bool { p.CidVersion == p2.CidVersion && p.HashFun == p2.HashFun && p.StreamChannels == p2.StreamChannels && - p.NoCopy == p2.NoCopy + p.NoCopy == p2.NoCopy && + p.Format == p2.Format } diff --git a/api/rest/client/methods_test.go b/api/rest/client/methods_test.go index 00fae750..9405db94 100644 --- a/api/rest/client/methods_test.go +++ b/api/rest/client/methods_test.go @@ -618,10 +618,12 @@ func TestAddMultiFile(t *testing.T) { Name: "test something", ShardSize: 1024, }, - Shard: false, - Layout: "", - Chunker: "", - RawLeaves: false, + Shard: false, + Format: "", + IPFSAddParams: types.IPFSAddParams{ + Chunker: "", + RawLeaves: false, + }, Hidden: false, StreamChannels: true, } diff --git a/cmd/ipfs-cluster-ctl/main.go b/cmd/ipfs-cluster-ctl/main.go index ca1a6734..5de104f1 100644 --- a/cmd/ipfs-cluster-ctl/main.go +++ b/cmd/ipfs-cluster-ctl/main.go @@ -274,18 +274,19 @@ a Cluster Pin operation on success. It takes elements from local paths as well as from web URLs (accessed with a GET request). Providing several arguments will automatically set --wrap-in-directory. -Cluster Add is equivalent to "ipfs add" in terms of DAG building, and supports -the same options for adjusting the chunker, the DAG layout etc. However, -it will allocate the content and send it directly to the allocated peers (among -which may not necessarily be the local ipfs daemon). +Cluster "add" works, by default, just like "ipfs add" and has similar options +in terms of DAG layout, chunker, hash function etc. It also supports adding +CAR files directly (--format car), as long as they have a single root. When +adding CAR files, all the options related to dag-building are ignored. -Once the adding process is finished, the content is fully added to all -allocations and pinned in them. This makes cluster add slower than a local -ipfs add, but the result is a fully replicated CID on completion. -If you prefer faster adding, use the --local flag to add directly to the local -IPFS node and pin in the destinations after that. Note that the local IPFS -node may not be among the destinations, which will leave the unpinned content -in it. +Added content will be allocated and sent block by block to the peers that +should pin it (among which may not necessarily be the local ipfs daemon). +Once all the blocks have arrived, they will be "cluster-pinned". This makes +cluster add slower than a local ipfs add, but the result is a fully replicated +on completion. If you prefer faster adding, use the --local flag to add +directly to the local IPFS node and pin in the destinations after that. +Note that the local IPFS node may not be among the destinations, which will +leave the unpinned content in it. Optional replication-min and replication-max factors can be provided: -1 means "pin everywhere" and 0 means use cluster's default setting (i.e., replication @@ -318,38 +319,6 @@ content. Name: "no-stream", Usage: "Buffer output locally. Produces a valid JSON array with --enc=json.", }, - cli.StringFlag{ - Name: "layout", - Value: defaultAddParams.Layout, - Usage: "Dag layout to use for dag generation: balanced or trickle", - }, - cli.BoolFlag{ - Name: "wrap-with-directory, w", - Usage: "Wrap a with a directory object", - }, - cli.BoolFlag{ - Name: "hidden, H", - Usage: "Include files that are hidden. Only takes effect on recursive add", - }, - cli.StringFlag{ - Name: "chunker, s", - Usage: "'size-' or 'rabin---'", - Value: defaultAddParams.Chunker, - }, - cli.BoolFlag{ - Name: "raw-leaves", - Usage: "Use raw blocks for leaves (experimental)", - }, - cli.IntFlag{ - Name: "cid-version", - Usage: "CID version. Non default implies raw-leaves", - Value: defaultAddParams.CidVersion, - }, - cli.StringFlag{ - Name: "hash", - Usage: "Hash function to use. Implies cid-version=1", - Value: defaultAddParams.HashFun, - }, cli.BoolFlag{ Name: "local", Usage: "Add to local peer but pin normally", @@ -381,10 +350,6 @@ content. Name: "allocations, allocs", Usage: "Optional comma-separated list of peer IDs", }, - cli.BoolFlag{ - Name: "nocopy", - Usage: "Add the URL using filestore. Implies raw-leaves. (experimental)", - }, cli.BoolFlag{ Name: "wait", Usage: "Wait for all nodes to report a status of pinned before returning", @@ -394,6 +359,51 @@ content. Value: 0, Usage: "How long to --wait (in seconds), default is indefinitely", }, + + cli.BoolFlag{ + Name: "wrap-with-directory, w", + Usage: "Wrap a with a directory object", + }, + + cli.StringFlag{ + Name: "format", + Value: defaultAddParams.Format, + Usage: "'unixfs' (add as unixfs DAG), 'car' (import CAR file)", + }, + + cli.StringFlag{ + Name: "layout", + Value: defaultAddParams.Layout, + Usage: "Dag layout to use for dag generation: balanced or trickle", + }, + cli.BoolFlag{ + Name: "hidden, H", + Usage: "Include files that are hidden. Only takes effect on recursive add", + }, + cli.StringFlag{ + Name: "chunker, s", + Usage: "'size-' or 'rabin---'", + Value: defaultAddParams.Chunker, + }, + cli.BoolFlag{ + Name: "raw-leaves", + Usage: "Use raw blocks for leaves (experimental)", + }, + cli.IntFlag{ + Name: "cid-version", + Usage: "CID version. Non default implies raw-leaves", + Value: defaultAddParams.CidVersion, + }, + cli.StringFlag{ + Name: "hash", + Usage: "Hash function to use. Implies cid-version=1", + Value: defaultAddParams.HashFun, + }, + cli.BoolFlag{ + Name: "nocopy", + Usage: "Add the URL using filestore. Implies raw-leaves. (experimental)", + }, + // TODO: Uncomment when sharding is supported. // cli.BoolFlag{ // Name: "shard", @@ -448,6 +458,7 @@ content. if c.String("allocations") != "" { p.UserAllocations = api.StringsToPeers(strings.Split(c.String("allocations"), ",")) } + p.Format = c.String("format") //p.Shard = shard //p.ShardSize = c.Uint64("shard-size") p.Shard = false @@ -471,6 +482,11 @@ content. p.RawLeaves = true } + // Prevent footgun + if p.Wrap && p.Format == "car" { + checkErr("", errors.New("only a single CAR file can be added and wrap-with-directory is not supported")) + } + out := make(chan *api.AddedOutput, 1) var wg sync.WaitGroup wg.Add(1) diff --git a/go.mod b/go.mod index b89c5387..f3a4a1dc 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/ipfs/go-mfs v0.1.2 github.com/ipfs/go-path v0.0.9 github.com/ipfs/go-unixfs v0.2.4 + github.com/ipld/go-car v0.2.2 github.com/kelseyhightower/envconfig v1.4.0 github.com/kishansagathiya/go-dot v0.1.0 github.com/lanzafame/go-libp2p-ocgorpc v0.1.1 diff --git a/go.sum b/go.sum index 52e1ca87..b91d13c3 100644 --- a/go.sum +++ b/go.sum @@ -392,6 +392,7 @@ github.com/ipfs/go-blockservice v0.0.7/go.mod h1:EOfb9k/Y878ZTRY/CH0x5+ATtaipfbR github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M= github.com/ipfs/go-blockservice v0.1.1/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I= github.com/ipfs/go-blockservice v0.1.2/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I= +github.com/ipfs/go-blockservice v0.1.3/go.mod h1:OTZhFpkgY48kNzbgyvcexW9cHrpjBYIjSR0KoDOFOLU= github.com/ipfs/go-blockservice v0.1.4 h1:Vq+MlsH8000KbbUciRyYMEw/NNP8UAGmcqKi4uWmFGA= github.com/ipfs/go-blockservice v0.1.4/go.mod h1:OTZhFpkgY48kNzbgyvcexW9cHrpjBYIjSR0KoDOFOLU= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= @@ -507,6 +508,7 @@ github.com/ipfs/go-merkledag v0.0.6/go.mod h1:QYPdnlvkOg7GnQRofu9XZimC5ZW5Wi3bKy github.com/ipfs/go-merkledag v0.1.0/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk= github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk= github.com/ipfs/go-merkledag v0.3.0/go.mod h1:4pymaZLhSLNVuiCITYrpViD6vmfZ/Ws4n/L9tfNv3S4= +github.com/ipfs/go-merkledag v0.3.1/go.mod h1:fvkZNNZixVW6cKSZ/JfLlON5OlgTXNdRLz0p6QG/I2M= github.com/ipfs/go-merkledag v0.3.2 h1:MRqj40QkrWkvPswXs4EfSslhZ4RVPRbxwX11js0t1xY= github.com/ipfs/go-merkledag v0.3.2/go.mod h1:fvkZNNZixVW6cKSZ/JfLlON5OlgTXNdRLz0p6QG/I2M= github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= @@ -529,6 +531,12 @@ github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2 github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0= github.com/ipfs/interface-go-ipfs-core v0.4.0 h1:+mUiamyHIwedqP8ZgbCIwpy40oX7QcXUbo4CZOeJVJg= github.com/ipfs/interface-go-ipfs-core v0.4.0/go.mod h1:UJBcU6iNennuI05amq3FQ7g0JHUkibHFAfhfUIy927o= +github.com/ipld/go-car v0.2.2 h1:Dq0Kl0XMaNMCNxATbYYu/EIgqWjo2w2W+Miu6npd20g= +github.com/ipld/go-car v0.2.2/go.mod h1:pPb7hzVBHBoRqU3GkPy1d6FZKQoGQ56yd3MyPGzZ0Xs= +github.com/ipld/go-ipld-prime v0.7.0 h1:eigF1ZpaL1prbsKYVMqPLoPJqD/pzkQOe2j1uzvVg7w= +github.com/ipld/go-ipld-prime v0.7.0/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM= +github.com/ipld/go-ipld-prime-proto v0.1.1 h1:EX4yWYaIqSLwtVE30nxEcZDcvsWDtx1vImSG+XCJebY= +github.com/ipld/go-ipld-prime-proto v0.1.1/go.mod h1:cI9NwYAUKCLUwqufoUjChISxuTEkaY2yvNYCRCuhRck= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= @@ -536,6 +544,7 @@ github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+ github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec/go.mod h1:rGaEvXB4uRSZMmzKNLoXvTu1sfx+1kv/DojUlPrSZGs= github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= +github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c/go.mod h1:sdx1xVM9UuLw1tXnhJWN3piypTUO3vCIHYmG15KE/dU= github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2/go.mod h1:8GXXJV31xl8whumTzdZsTt3RnUIiPqzkyf7mxToRCMs= github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk= github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= @@ -1236,6 +1245,7 @@ github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMI github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 h1:8kxMKmKzXXL4Ru1nyhvdms/JjWt+3YLpvRb/bAjO/y0= github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= +github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158 h1:WXhVOwj2USAXB5oMDwRl3piOux2XMV9TANaYxXHdkoE= github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= @@ -1317,6 +1327,7 @@ golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200117160349-530e935923ad/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/test/sharding.go b/test/sharding.go index c243e15d..5f85b0a4 100644 --- a/test/sharding.go +++ b/test/sharding.go @@ -1,14 +1,17 @@ package test import ( + "context" "encoding/hex" "io" "math/rand" "os" "path/filepath" + "sync" "testing" files "github.com/ipfs/go-ipfs-files" + format "github.com/ipfs/go-ipld-format" cid "github.com/ipfs/go-cid" ) @@ -277,3 +280,77 @@ func (sth *ShardingTestHelper) makeRandFile(t *testing.T, kbs int) os.FileInfo { return st } + +// MockDAGService implements an in-memory DAGService. The stored nodes are +// inspectable via the Nodes map. +type MockDAGService struct { + mu sync.Mutex + Nodes map[cid.Cid]format.Node +} + +// NewMockDAGService returns an in-memory DAG Service. +func NewMockDAGService() *MockDAGService { + return &MockDAGService{Nodes: make(map[cid.Cid]format.Node)} +} + +// Get reads a node. +func (d *MockDAGService) Get(ctx context.Context, cid cid.Cid) (format.Node, error) { + d.mu.Lock() + defer d.mu.Unlock() + if n, ok := d.Nodes[cid]; ok { + return n, nil + } + return nil, format.ErrNotFound +} + +// GetMany reads many nodes. +func (d *MockDAGService) GetMany(ctx context.Context, cids []cid.Cid) <-chan *format.NodeOption { + d.mu.Lock() + defer d.mu.Unlock() + out := make(chan *format.NodeOption, len(cids)) + for _, c := range cids { + if n, ok := d.Nodes[c]; ok { + out <- &format.NodeOption{Node: n} + } else { + out <- &format.NodeOption{Err: format.ErrNotFound} + } + } + close(out) + return out +} + +// Add adds a node. +func (d *MockDAGService) Add(ctx context.Context, node format.Node) error { + d.mu.Lock() + defer d.mu.Unlock() + d.Nodes[node.Cid()] = node + return nil +} + +// AddMany adds many nodes. +func (d *MockDAGService) AddMany(ctx context.Context, nodes []format.Node) error { + d.mu.Lock() + defer d.mu.Unlock() + for _, n := range nodes { + d.Nodes[n.Cid()] = n + } + return nil +} + +// Remove deletes a node. +func (d *MockDAGService) Remove(ctx context.Context, c cid.Cid) error { + d.mu.Lock() + defer d.mu.Unlock() + delete(d.Nodes, c) + return nil +} + +// RemoveMany removes many nodes. +func (d *MockDAGService) RemoveMany(ctx context.Context, cids []cid.Cid) error { + d.mu.Lock() + defer d.mu.Unlock() + for _, c := range cids { + delete(d.Nodes, c) + } + return nil +}