diff --git a/adder/adder.go b/adder/adder.go index d45c50dd..44e5481d 100644 --- a/adder/adder.go +++ b/adder/adder.go @@ -58,7 +58,7 @@ type Adder struct { dgs ClusterDAGService - params *api.AddParams + params api.AddParams // AddedOutput updates are placed on this channel // whenever a block is processed. They contain information @@ -71,7 +71,7 @@ type Adder struct { // channel to send updates during the adding process. // // An Adder may only be used once. -func New(ds ClusterDAGService, p *api.AddParams, out chan *api.AddedOutput) *Adder { +func New(ds ClusterDAGService, p api.AddParams, out chan *api.AddedOutput) *Adder { // Discard all progress update output as the caller has not provided // a channel for them to listen on. if out == nil { @@ -184,7 +184,7 @@ type ipfsAdder struct { *ipfsadd.Adder } -func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params *api.AddParams, out chan *api.AddedOutput) (*ipfsAdder, error) { +func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan *api.AddedOutput) (*ipfsAdder, error) { iadder, err := ipfsadd.NewAdder(ctx, dgs) if err != nil { logger.Error(err) @@ -248,11 +248,11 @@ func (ia *ipfsAdder) Add(name string, f files.Node) (cid.Cid, error) { type carAdder struct { ctx context.Context dgs ClusterDAGService - params *api.AddParams + params api.AddParams output chan *api.AddedOutput } -func newCarAdder(ctx context.Context, dgs ClusterDAGService, params *api.AddParams, out chan *api.AddedOutput) (*carAdder, error) { +func newCarAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan *api.AddedOutput) (*carAdder, error) { return &carAdder{ ctx: ctx, dgs: dgs, diff --git a/adder/adderutils/adderutils.go b/adder/adderutils/adderutils.go index 6a1212ae..1452be09 100644 --- a/adder/adderutils/adderutils.go +++ b/adder/adderutils/adderutils.go @@ -27,7 +27,7 @@ var logger = logging.Logger("adder") func AddMultipartHTTPHandler( ctx context.Context, rpc *rpc.Client, - params *api.AddParams, + params api.AddParams, reader *multipart.Reader, w http.ResponseWriter, outputTransform func(*api.AddedOutput) interface{}, @@ -36,9 +36,9 @@ func AddMultipartHTTPHandler( output := make(chan *api.AddedOutput, 200) if params.Shard { - dags = sharding.New(rpc, params.PinOptions, output) + dags = sharding.New(rpc, params, output) } else { - dags = single.New(rpc, params.PinOptions, params.Local) + dags = single.New(rpc, params, params.Local) } if outputTransform == nil { diff --git a/adder/sharding/dag_service.go b/adder/sharding/dag_service.go index a84455a2..6b52bdc4 100644 --- a/adder/sharding/dag_service.go +++ b/adder/sharding/dag_service.go @@ -32,8 +32,8 @@ type DAGService struct { rpcClient *rpc.Client - pinOpts api.PinOptions - output chan<- *api.AddedOutput + addParams api.AddParams + output chan<- *api.AddedOutput addedSet *cid.Set @@ -51,12 +51,12 @@ type DAGService struct { // New returns a new ClusterDAGService, which uses the given rpc client to perform // Allocate, IPFSBlockPut and Pin requests to other cluster components. -func New(rpc *rpc.Client, opts api.PinOptions, out chan<- *api.AddedOutput) *DAGService { +func New(rpc *rpc.Client, opts api.AddParams, out chan<- *api.AddedOutput) *DAGService { // use a default value for this regardless of what is provided. opts.Mode = api.PinModeRecursive return &DAGService{ rpcClient: rpc, - pinOpts: opts, + addParams: opts, output: out, addedSet: cid.NewSet(), shards: make(map[string]cid.Cid), @@ -101,17 +101,17 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid, clusterDAG := clusterDAGNodes[0].Cid() dgs.sendOutput(&api.AddedOutput{ - Name: fmt.Sprintf("%s-clusterDAG", dgs.pinOpts.Name), + Name: fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name), Cid: clusterDAG, Size: dgs.totalSize, }) // Pin the ClusterDAG - clusterDAGPin := api.PinWithOpts(clusterDAG, dgs.pinOpts) + clusterDAGPin := api.PinWithOpts(clusterDAG, dgs.addParams.PinOptions) clusterDAGPin.ReplicationFactorMin = -1 clusterDAGPin.ReplicationFactorMax = -1 clusterDAGPin.MaxDepth = 0 // pin direct - clusterDAGPin.Name = fmt.Sprintf("%s-clusterDAG", dgs.pinOpts.Name) + clusterDAGPin.Name = fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name) clusterDAGPin.Type = api.ClusterDAGType clusterDAGPin.Reference = &dataRoot err = adder.Pin(ctx, dgs.rpcClient, clusterDAGPin) @@ -120,7 +120,7 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid, } // Pin the META pin - metaPin := api.PinWithOpts(dataRoot, dgs.pinOpts) + metaPin := api.PinWithOpts(dataRoot, dgs.addParams.PinOptions) metaPin.Type = api.MetaType metaPin.Reference = &clusterDAG metaPin.MaxDepth = 0 // irrelevant. Meta-pins are not pinned @@ -138,7 +138,7 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid, // shardParents := cid.NewSet() // shardParents.Add(clusterDAG) // for shardN, shard := range dgs.shardNodes { - // pin := api.PinWithOpts(shard, dgs.pinOpts) + // pin := api.PinWithOpts(shard, dgs.addParams) // pin.Name := fmt.Sprintf("%s-shard-%s", pin.Name, shardN) // pin.Type = api.ShardType // pin.Parents = shardParents @@ -160,16 +160,16 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error { // if we have no currentShard, create one if shard == nil { - logger.Infof("new shard for '%s': #%d", dgs.pinOpts.Name, len(dgs.shards)) + logger.Infof("new shard for '%s': #%d", dgs.addParams.Name, len(dgs.shards)) var err error - shard, err = newShard(ctx, dgs.rpcClient, dgs.pinOpts) + shard, err = newShard(ctx, dgs.rpcClient, dgs.addParams.PinOptions) if err != nil { return err } dgs.currentShard = shard } - logger.Debugf("ingesting block %s in shard %d (%s)", n.Cid(), len(dgs.shards), dgs.pinOpts.Name) + logger.Debugf("ingesting block %s in shard %d (%s)", n.Cid(), len(dgs.shards), dgs.addParams.Name) // this is not same as n.Size() size := uint64(len(n.RawData())) diff --git a/adder/sharding/dag_service_test.go b/adder/sharding/dag_service_test.go index d0de0e77..50afb060 100644 --- a/adder/sharding/dag_service_test.go +++ b/adder/sharding/dag_service_test.go @@ -64,7 +64,7 @@ func (rpcs *testRPC) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) { return bI.([]byte), nil } -func makeAdder(t *testing.T, params *api.AddParams) (*adder.Adder, *testRPC) { +func makeAdder(t *testing.T, params api.AddParams) (*adder.Adder, *testRPC) { rpcObj := &testRPC{} server := rpc.NewServer(nil, "mock") err := server.RegisterName("Cluster", rpcObj) @@ -79,7 +79,7 @@ func makeAdder(t *testing.T, params *api.AddParams) (*adder.Adder, *testRPC) { out := make(chan *api.AddedOutput, 1) - dags := New(client, params.PinOptions, out) + dags := New(client, params, out) add := adder.New(dags, params, out) go func() { @@ -190,13 +190,13 @@ func TestFromMultipart(t *testing.T) { func TestFromMultipart_Errors(t *testing.T) { type testcase struct { name string - params *api.AddParams + params api.AddParams } tcs := []*testcase{ { name: "bad chunker", - params: &api.AddParams{ + params: api.AddParams{ Format: "", IPFSAddParams: api.IPFSAddParams{ Chunker: "aweee", @@ -214,7 +214,7 @@ func TestFromMultipart_Errors(t *testing.T) { }, { name: "shard size too small", - params: &api.AddParams{ + params: api.AddParams{ Format: "", IPFSAddParams: api.IPFSAddParams{ Chunker: "", @@ -232,7 +232,7 @@ func TestFromMultipart_Errors(t *testing.T) { }, { name: "replication too high", - params: &api.AddParams{ + params: api.AddParams{ Format: "", IPFSAddParams: api.IPFSAddParams{ Chunker: "", diff --git a/adder/single/dag_service.go b/adder/single/dag_service.go index f51c8fd9..29154782 100644 --- a/adder/single/dag_service.go +++ b/adder/single/dag_service.go @@ -26,22 +26,22 @@ type DAGService struct { rpcClient *rpc.Client - dests []peer.ID - pinOpts api.PinOptions - local bool + dests []peer.ID + addParams api.AddParams + local bool ba *adder.BlockAdder } // New returns a new Adder with the given rpc Client. The client is used // to perform calls to IPFS.BlockPut and Pin content on Cluster. -func New(rpc *rpc.Client, opts api.PinOptions, local bool) *DAGService { +func New(rpc *rpc.Client, opts api.AddParams, local bool) *DAGService { // ensure don't Add something and pin it in direct mode. opts.Mode = api.PinModeRecursive return &DAGService{ rpcClient: rpc, dests: nil, - pinOpts: opts, + addParams: opts, local: local, } } @@ -49,7 +49,7 @@ func New(rpc *rpc.Client, opts api.PinOptions, local bool) *DAGService { // Add puts the given node in the destination peers. func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { if dgs.dests == nil { - dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.pinOpts) + dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.addParams.PinOptions) if err != nil { return err } @@ -57,8 +57,9 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { dgs.dests = dests if dgs.local { - // If this is a local pin, make sure that the local peer is - // among the allocations. + // If this is a local pin, make sure that the local + // peer is among the allocations.. + // UNLESS user-allocations are defined! localPid := dgs.rpcClient.ID() hasLocal := false for _, d := range dests { @@ -67,7 +68,10 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { break } } - if !hasLocal && localPid != "" { + + if !hasLocal && + localPid != "" && + len(dgs.addParams.UserAllocations) == 0 { // replace last allocation with local peer dgs.dests[len(dgs.dests)-1] = localPid } @@ -83,8 +87,16 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { // Finalize pins the last Cid added to this DAGService. func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) { + // Do not pin, just block put. + // Why? Because some people are uploading CAR files with partial DAGs + // and ideally they should be pinning only when the last partial CAR + // is uploaded. This gives them that option. + if dgs.addParams.NoPin { + return root, nil + } + // Cluster pin the result - rootPin := api.PinWithOpts(root, dgs.pinOpts) + rootPin := api.PinWithOpts(root, dgs.addParams.PinOptions) rootPin.Allocations = dgs.dests dgs.dests = nil diff --git a/adder/single/dag_service_test.go b/adder/single/dag_service_test.go index 3dc86a7b..3785f23a 100644 --- a/adder/single/dag_service_test.go +++ b/adder/single/dag_service_test.go @@ -61,7 +61,7 @@ func TestAdd(t *testing.T) { params := api.DefaultAddParams() params.Wrap = true - dags := New(client, params.PinOptions, false) + dags := New(client, params, false) add := adder.New(dags, params, nil) sth := test.NewShardingTestHelper() @@ -109,7 +109,7 @@ func TestAdd(t *testing.T) { params := api.DefaultAddParams() params.Layout = "trickle" - dags := New(client, params.PinOptions, false) + dags := New(client, params, false) add := adder.New(dags, params, nil) sth := test.NewShardingTestHelper() diff --git a/api/add.go b/api/add.go index cf6e9db7..9ea5fb51 100644 --- a/api/add.go +++ b/api/add.go @@ -45,13 +45,14 @@ type AddParams struct { Shard bool StreamChannels bool Format string // selects with adder + NoPin bool IPFSAddParams } // DefaultAddParams returns a AddParams object with standard defaults -func DefaultAddParams() *AddParams { - return &AddParams{ +func DefaultAddParams() AddParams { + return AddParams{ Local: false, Recursive: false, @@ -62,6 +63,7 @@ func DefaultAddParams() *AddParams { StreamChannels: true, Format: "unixfs", + NoPin: false, PinOptions: PinOptions{ ReplicationFactorMin: 0, ReplicationFactorMax: 0, @@ -107,13 +109,13 @@ func parseIntParam(q url.Values, name string, dest *int) error { // AddParamsFromQuery parses the AddParams object from // a URL.Query(). -func AddParamsFromQuery(query url.Values) (*AddParams, error) { +func AddParamsFromQuery(query url.Values) (AddParams, error) { params := DefaultAddParams() opts := &PinOptions{} err := opts.FromQuery(query) if err != nil { - return nil, err + return params, err } params.PinOptions = *opts params.PinUpdate = cid.Undef // hardcode as does not make sense for adding @@ -123,7 +125,7 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) { case "trickle", "balanced", "": // nothing default: - return nil, errors.New("layout parameter is invalid") + return params, errors.New("layout parameter is invalid") } params.Layout = layout @@ -141,41 +143,41 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) { switch format { case "car", "unixfs", "": default: - return nil, errors.New("format parameter is invalid") + return params, errors.New("format parameter is invalid") } params.Format = format err = parseBoolParam(query, "local", ¶ms.Local) if err != nil { - return nil, err + return params, err } err = parseBoolParam(query, "recursive", ¶ms.Recursive) if err != nil { - return nil, err + return params, err } err = parseBoolParam(query, "hidden", ¶ms.Hidden) if err != nil { - return nil, err + return params, err } err = parseBoolParam(query, "wrap-with-directory", ¶ms.Wrap) if err != nil { - return nil, err + return params, err } err = parseBoolParam(query, "shard", ¶ms.Shard) if err != nil { - return nil, err + return params, err } err = parseBoolParam(query, "progress", ¶ms.Progress) if err != nil { - return nil, err + return params, err } err = parseIntParam(query, "cid-version", ¶ms.CidVersion) if err != nil { - return nil, err + return params, err } // This mimics go-ipfs behaviour. @@ -188,24 +190,29 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) { // CidVersion). Otherwise, it will be explicitly set. err = parseBoolParam(query, "raw-leaves", ¶ms.RawLeaves) if err != nil { - return nil, err + return params, err } err = parseBoolParam(query, "stream-channels", ¶ms.StreamChannels) if err != nil { - return nil, err + return params, err } err = parseBoolParam(query, "nocopy", ¶ms.NoCopy) if err != nil { - return nil, err + return params, err + } + + err = parseBoolParam(query, "no-pin", ¶ms.NoPin) + if err != nil { + return params, err } return params, nil } // ToQueryString returns a url query string (key=value&key2=value2&...) -func (p *AddParams) ToQueryString() (string, error) { +func (p AddParams) ToQueryString() (string, error) { pinOptsQuery, err := p.PinOptions.ToQuery() if err != nil { return "", err @@ -228,11 +235,12 @@ func (p *AddParams) ToQueryString() (string, error) { query.Set("stream-channels", fmt.Sprintf("%t", p.StreamChannels)) query.Set("nocopy", fmt.Sprintf("%t", p.NoCopy)) query.Set("format", p.Format) + query.Set("no-pin", fmt.Sprintf("%t", p.NoPin)) return query.Encode(), nil } // Equals checks if p equals p2. -func (p *AddParams) Equals(p2 *AddParams) bool { +func (p AddParams) Equals(p2 AddParams) bool { return p.PinOptions.Equals(&p2.PinOptions) && p.Local == p2.Local && p.Recursive == p2.Recursive && @@ -246,5 +254,6 @@ func (p *AddParams) Equals(p2 *AddParams) bool { p.HashFun == p2.HashFun && p.StreamChannels == p2.StreamChannels && p.NoCopy == p2.NoCopy && - p.Format == p2.Format + p.Format == p2.Format && + p.NoPin == p2.NoPin } diff --git a/api/rest/client/client.go b/api/rest/client/client.go index 34abcc0c..d1feaa86 100644 --- a/api/rest/client/client.go +++ b/api/rest/client/client.go @@ -58,9 +58,9 @@ type Client interface { PeerRm(ctx context.Context, pid peer.ID) error // Add imports files to the cluster from the given paths. - Add(ctx context.Context, paths []string, params *api.AddParams, out chan<- *api.AddedOutput) error + Add(ctx context.Context, paths []string, params api.AddParams, out chan<- *api.AddedOutput) error // AddMultiFile imports new files from a MultiFileReader. - AddMultiFile(ctx context.Context, multiFileR *files.MultiFileReader, params *api.AddParams, out chan<- *api.AddedOutput) error + AddMultiFile(ctx context.Context, multiFileR *files.MultiFileReader, params api.AddParams, out chan<- *api.AddedOutput) error // Pin tracks a Cid with the given replication factor and a name for // human-friendliness. diff --git a/api/rest/client/lbclient.go b/api/rest/client/lbclient.go index 38e5a7bf..b35800fc 100644 --- a/api/rest/client/lbclient.go +++ b/api/rest/client/lbclient.go @@ -393,7 +393,7 @@ func (lc *loadBalancingClient) RepoGC(ctx context.Context, local bool) (*api.Glo func (lc *loadBalancingClient) Add( ctx context.Context, paths []string, - params *api.AddParams, + params api.AddParams, out chan<- *api.AddedOutput, ) error { call := func(c Client) error { @@ -407,7 +407,7 @@ func (lc *loadBalancingClient) Add( func (lc *loadBalancingClient) AddMultiFile( ctx context.Context, multiFileR *files.MultiFileReader, - params *api.AddParams, + params api.AddParams, out chan<- *api.AddedOutput, ) error { call := func(c Client) error { diff --git a/api/rest/client/methods.go b/api/rest/client/methods.go index aa0795be..e91b5c40 100644 --- a/api/rest/client/methods.go +++ b/api/rest/client/methods.go @@ -513,7 +513,7 @@ func statusReached(target api.TrackerStatus, gblPinInfo *api.GlobalPinInfo, limi } // logic drawn from go-ipfs-cmds/cli/parse.go: appendFile -func makeSerialFile(fpath string, params *api.AddParams) (string, files.Node, error) { +func makeSerialFile(fpath string, params api.AddParams) (string, files.Node, error) { if fpath == "." { cwd, err := os.Getwd() if err != nil { @@ -553,7 +553,7 @@ func makeSerialFile(fpath string, params *api.AddParams) (string, files.Node, er func (c *defaultClient) Add( ctx context.Context, paths []string, - params *api.AddParams, + params api.AddParams, out chan<- *api.AddedOutput, ) error { ctx, span := trace.StartSpan(ctx, "client/Add") @@ -596,7 +596,7 @@ func (c *defaultClient) Add( func (c *defaultClient) AddMultiFile( ctx context.Context, multiFileR *files.MultiFileReader, - params *api.AddParams, + params api.AddParams, out chan<- *api.AddedOutput, ) error { ctx, span := trace.StartSpan(ctx, "client/AddMultiFile") diff --git a/api/rest/client/methods_test.go b/api/rest/client/methods_test.go index 0c39e553..122a2e3b 100644 --- a/api/rest/client/methods_test.go +++ b/api/rest/client/methods_test.go @@ -749,7 +749,7 @@ func TestAddMultiFile(t *testing.T) { mfr, closer := sth.GetTreeMultiReader(t) defer closer.Close() - p := &types.AddParams{ + p := types.AddParams{ PinOptions: types.PinOptions{ ReplicationFactorMin: -1, ReplicationFactorMax: -1, diff --git a/cluster.go b/cluster.go index 4dc447c2..e1e7e764 100644 --- a/cluster.go +++ b/cluster.go @@ -1654,14 +1654,14 @@ func (c *Cluster) UnpinPath(ctx context.Context, path string) (*api.Pin, error) // pipeline is used to DAGify the file. Depending on input parameters this // DAG can be added locally to the calling cluster peer's ipfs repo, or // sharded across the entire cluster. -func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (cid.Cid, error) { +func (c *Cluster) AddFile(reader *multipart.Reader, params api.AddParams) (cid.Cid, error) { // TODO: add context param and tracing var dags adder.ClusterDAGService if params.Shard { - dags = sharding.New(c.rpcClient, params.PinOptions, nil) + dags = sharding.New(c.rpcClient, params, nil) } else { - dags = single.New(c.rpcClient, params.PinOptions, params.Local) + dags = single.New(c.rpcClient, params, params.Local) } add := adder.New(dags, params, nil) return add.FromMultipart(c.ctx, reader) diff --git a/rpc_api.go b/rpc_api.go index bd5e3c7e..8be72184 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -344,6 +344,8 @@ func (rpcapi *ClusterRPCAPI) BlockAllocate(ctx context.Context, in *api.Pin, out return errFollowerMode } + // Allocating for a existing pin. Usually the adder calls this with + // cid.Undef. existing, err := rpcapi.c.PinGet(ctx, in.Cid) if err != nil && err != state.ErrNotFound { return err