Adder: Add "no-pin" option.

This does 3 things:

- Add a NoPin option to the adder. When set to true, the adding process does not
send a pin in the end.

- When user-allocations are set and local=true happens, we do not overwrite
  the allocations returned by the allocator to include the local peer
  anymore, as this could alter user-allocations.

- Some code improvement (remove pointers).
This commit is contained in:
Hector Sanjuan 2022-02-28 19:44:04 +01:00
parent 9e35b7bc9b
commit 00dffe23b8
13 changed files with 91 additions and 68 deletions

View File

@ -58,7 +58,7 @@ type Adder struct {
dgs ClusterDAGService
params *api.AddParams
params api.AddParams
// AddedOutput updates are placed on this channel
// whenever a block is processed. They contain information
@ -71,7 +71,7 @@ type Adder struct {
// channel to send updates during the adding process.
//
// An Adder may only be used once.
func New(ds ClusterDAGService, p *api.AddParams, out chan *api.AddedOutput) *Adder {
func New(ds ClusterDAGService, p api.AddParams, out chan *api.AddedOutput) *Adder {
// Discard all progress update output as the caller has not provided
// a channel for them to listen on.
if out == nil {
@ -184,7 +184,7 @@ type ipfsAdder struct {
*ipfsadd.Adder
}
func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params *api.AddParams, out chan *api.AddedOutput) (*ipfsAdder, error) {
func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan *api.AddedOutput) (*ipfsAdder, error) {
iadder, err := ipfsadd.NewAdder(ctx, dgs)
if err != nil {
logger.Error(err)
@ -248,11 +248,11 @@ func (ia *ipfsAdder) Add(name string, f files.Node) (cid.Cid, error) {
type carAdder struct {
ctx context.Context
dgs ClusterDAGService
params *api.AddParams
params api.AddParams
output chan *api.AddedOutput
}
func newCarAdder(ctx context.Context, dgs ClusterDAGService, params *api.AddParams, out chan *api.AddedOutput) (*carAdder, error) {
func newCarAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan *api.AddedOutput) (*carAdder, error) {
return &carAdder{
ctx: ctx,
dgs: dgs,

View File

@ -27,7 +27,7 @@ var logger = logging.Logger("adder")
func AddMultipartHTTPHandler(
ctx context.Context,
rpc *rpc.Client,
params *api.AddParams,
params api.AddParams,
reader *multipart.Reader,
w http.ResponseWriter,
outputTransform func(*api.AddedOutput) interface{},
@ -36,9 +36,9 @@ func AddMultipartHTTPHandler(
output := make(chan *api.AddedOutput, 200)
if params.Shard {
dags = sharding.New(rpc, params.PinOptions, output)
dags = sharding.New(rpc, params, output)
} else {
dags = single.New(rpc, params.PinOptions, params.Local)
dags = single.New(rpc, params, params.Local)
}
if outputTransform == nil {

View File

@ -32,8 +32,8 @@ type DAGService struct {
rpcClient *rpc.Client
pinOpts api.PinOptions
output chan<- *api.AddedOutput
addParams api.AddParams
output chan<- *api.AddedOutput
addedSet *cid.Set
@ -51,12 +51,12 @@ type DAGService struct {
// New returns a new ClusterDAGService, which uses the given rpc client to perform
// Allocate, IPFSBlockPut and Pin requests to other cluster components.
func New(rpc *rpc.Client, opts api.PinOptions, out chan<- *api.AddedOutput) *DAGService {
func New(rpc *rpc.Client, opts api.AddParams, out chan<- *api.AddedOutput) *DAGService {
// use a default value for this regardless of what is provided.
opts.Mode = api.PinModeRecursive
return &DAGService{
rpcClient: rpc,
pinOpts: opts,
addParams: opts,
output: out,
addedSet: cid.NewSet(),
shards: make(map[string]cid.Cid),
@ -101,17 +101,17 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid,
clusterDAG := clusterDAGNodes[0].Cid()
dgs.sendOutput(&api.AddedOutput{
Name: fmt.Sprintf("%s-clusterDAG", dgs.pinOpts.Name),
Name: fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name),
Cid: clusterDAG,
Size: dgs.totalSize,
})
// Pin the ClusterDAG
clusterDAGPin := api.PinWithOpts(clusterDAG, dgs.pinOpts)
clusterDAGPin := api.PinWithOpts(clusterDAG, dgs.addParams.PinOptions)
clusterDAGPin.ReplicationFactorMin = -1
clusterDAGPin.ReplicationFactorMax = -1
clusterDAGPin.MaxDepth = 0 // pin direct
clusterDAGPin.Name = fmt.Sprintf("%s-clusterDAG", dgs.pinOpts.Name)
clusterDAGPin.Name = fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name)
clusterDAGPin.Type = api.ClusterDAGType
clusterDAGPin.Reference = &dataRoot
err = adder.Pin(ctx, dgs.rpcClient, clusterDAGPin)
@ -120,7 +120,7 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid,
}
// Pin the META pin
metaPin := api.PinWithOpts(dataRoot, dgs.pinOpts)
metaPin := api.PinWithOpts(dataRoot, dgs.addParams.PinOptions)
metaPin.Type = api.MetaType
metaPin.Reference = &clusterDAG
metaPin.MaxDepth = 0 // irrelevant. Meta-pins are not pinned
@ -138,7 +138,7 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid,
// shardParents := cid.NewSet()
// shardParents.Add(clusterDAG)
// for shardN, shard := range dgs.shardNodes {
// pin := api.PinWithOpts(shard, dgs.pinOpts)
// pin := api.PinWithOpts(shard, dgs.addParams)
// pin.Name := fmt.Sprintf("%s-shard-%s", pin.Name, shardN)
// pin.Type = api.ShardType
// pin.Parents = shardParents
@ -160,16 +160,16 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error {
// if we have no currentShard, create one
if shard == nil {
logger.Infof("new shard for '%s': #%d", dgs.pinOpts.Name, len(dgs.shards))
logger.Infof("new shard for '%s': #%d", dgs.addParams.Name, len(dgs.shards))
var err error
shard, err = newShard(ctx, dgs.rpcClient, dgs.pinOpts)
shard, err = newShard(ctx, dgs.rpcClient, dgs.addParams.PinOptions)
if err != nil {
return err
}
dgs.currentShard = shard
}
logger.Debugf("ingesting block %s in shard %d (%s)", n.Cid(), len(dgs.shards), dgs.pinOpts.Name)
logger.Debugf("ingesting block %s in shard %d (%s)", n.Cid(), len(dgs.shards), dgs.addParams.Name)
// this is not same as n.Size()
size := uint64(len(n.RawData()))

View File

@ -64,7 +64,7 @@ func (rpcs *testRPC) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) {
return bI.([]byte), nil
}
func makeAdder(t *testing.T, params *api.AddParams) (*adder.Adder, *testRPC) {
func makeAdder(t *testing.T, params api.AddParams) (*adder.Adder, *testRPC) {
rpcObj := &testRPC{}
server := rpc.NewServer(nil, "mock")
err := server.RegisterName("Cluster", rpcObj)
@ -79,7 +79,7 @@ func makeAdder(t *testing.T, params *api.AddParams) (*adder.Adder, *testRPC) {
out := make(chan *api.AddedOutput, 1)
dags := New(client, params.PinOptions, out)
dags := New(client, params, out)
add := adder.New(dags, params, out)
go func() {
@ -190,13 +190,13 @@ func TestFromMultipart(t *testing.T) {
func TestFromMultipart_Errors(t *testing.T) {
type testcase struct {
name string
params *api.AddParams
params api.AddParams
}
tcs := []*testcase{
{
name: "bad chunker",
params: &api.AddParams{
params: api.AddParams{
Format: "",
IPFSAddParams: api.IPFSAddParams{
Chunker: "aweee",
@ -214,7 +214,7 @@ func TestFromMultipart_Errors(t *testing.T) {
},
{
name: "shard size too small",
params: &api.AddParams{
params: api.AddParams{
Format: "",
IPFSAddParams: api.IPFSAddParams{
Chunker: "",
@ -232,7 +232,7 @@ func TestFromMultipart_Errors(t *testing.T) {
},
{
name: "replication too high",
params: &api.AddParams{
params: api.AddParams{
Format: "",
IPFSAddParams: api.IPFSAddParams{
Chunker: "",

View File

@ -26,22 +26,22 @@ type DAGService struct {
rpcClient *rpc.Client
dests []peer.ID
pinOpts api.PinOptions
local bool
dests []peer.ID
addParams api.AddParams
local bool
ba *adder.BlockAdder
}
// New returns a new Adder with the given rpc Client. The client is used
// to perform calls to IPFS.BlockPut and Pin content on Cluster.
func New(rpc *rpc.Client, opts api.PinOptions, local bool) *DAGService {
func New(rpc *rpc.Client, opts api.AddParams, local bool) *DAGService {
// ensure don't Add something and pin it in direct mode.
opts.Mode = api.PinModeRecursive
return &DAGService{
rpcClient: rpc,
dests: nil,
pinOpts: opts,
addParams: opts,
local: local,
}
}
@ -49,7 +49,7 @@ func New(rpc *rpc.Client, opts api.PinOptions, local bool) *DAGService {
// Add puts the given node in the destination peers.
func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
if dgs.dests == nil {
dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.pinOpts)
dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.addParams.PinOptions)
if err != nil {
return err
}
@ -57,8 +57,9 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
dgs.dests = dests
if dgs.local {
// If this is a local pin, make sure that the local peer is
// among the allocations.
// If this is a local pin, make sure that the local
// peer is among the allocations..
// UNLESS user-allocations are defined!
localPid := dgs.rpcClient.ID()
hasLocal := false
for _, d := range dests {
@ -67,7 +68,10 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
break
}
}
if !hasLocal && localPid != "" {
if !hasLocal &&
localPid != "" &&
len(dgs.addParams.UserAllocations) == 0 {
// replace last allocation with local peer
dgs.dests[len(dgs.dests)-1] = localPid
}
@ -83,8 +87,16 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
// Finalize pins the last Cid added to this DAGService.
func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) {
// Do not pin, just block put.
// Why? Because some people are uploading CAR files with partial DAGs
// and ideally they should be pinning only when the last partial CAR
// is uploaded. This gives them that option.
if dgs.addParams.NoPin {
return root, nil
}
// Cluster pin the result
rootPin := api.PinWithOpts(root, dgs.pinOpts)
rootPin := api.PinWithOpts(root, dgs.addParams.PinOptions)
rootPin.Allocations = dgs.dests
dgs.dests = nil

View File

@ -61,7 +61,7 @@ func TestAdd(t *testing.T) {
params := api.DefaultAddParams()
params.Wrap = true
dags := New(client, params.PinOptions, false)
dags := New(client, params, false)
add := adder.New(dags, params, nil)
sth := test.NewShardingTestHelper()
@ -109,7 +109,7 @@ func TestAdd(t *testing.T) {
params := api.DefaultAddParams()
params.Layout = "trickle"
dags := New(client, params.PinOptions, false)
dags := New(client, params, false)
add := adder.New(dags, params, nil)
sth := test.NewShardingTestHelper()

View File

@ -45,13 +45,14 @@ type AddParams struct {
Shard bool
StreamChannels bool
Format string // selects with adder
NoPin bool
IPFSAddParams
}
// DefaultAddParams returns a AddParams object with standard defaults
func DefaultAddParams() *AddParams {
return &AddParams{
func DefaultAddParams() AddParams {
return AddParams{
Local: false,
Recursive: false,
@ -62,6 +63,7 @@ func DefaultAddParams() *AddParams {
StreamChannels: true,
Format: "unixfs",
NoPin: false,
PinOptions: PinOptions{
ReplicationFactorMin: 0,
ReplicationFactorMax: 0,
@ -107,13 +109,13 @@ func parseIntParam(q url.Values, name string, dest *int) error {
// AddParamsFromQuery parses the AddParams object from
// a URL.Query().
func AddParamsFromQuery(query url.Values) (*AddParams, error) {
func AddParamsFromQuery(query url.Values) (AddParams, error) {
params := DefaultAddParams()
opts := &PinOptions{}
err := opts.FromQuery(query)
if err != nil {
return nil, err
return params, err
}
params.PinOptions = *opts
params.PinUpdate = cid.Undef // hardcode as does not make sense for adding
@ -123,7 +125,7 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
case "trickle", "balanced", "":
// nothing
default:
return nil, errors.New("layout parameter is invalid")
return params, errors.New("layout parameter is invalid")
}
params.Layout = layout
@ -141,41 +143,41 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
switch format {
case "car", "unixfs", "":
default:
return nil, errors.New("format parameter is invalid")
return params, errors.New("format parameter is invalid")
}
params.Format = format
err = parseBoolParam(query, "local", &params.Local)
if err != nil {
return nil, err
return params, err
}
err = parseBoolParam(query, "recursive", &params.Recursive)
if err != nil {
return nil, err
return params, err
}
err = parseBoolParam(query, "hidden", &params.Hidden)
if err != nil {
return nil, err
return params, err
}
err = parseBoolParam(query, "wrap-with-directory", &params.Wrap)
if err != nil {
return nil, err
return params, err
}
err = parseBoolParam(query, "shard", &params.Shard)
if err != nil {
return nil, err
return params, err
}
err = parseBoolParam(query, "progress", &params.Progress)
if err != nil {
return nil, err
return params, err
}
err = parseIntParam(query, "cid-version", &params.CidVersion)
if err != nil {
return nil, err
return params, err
}
// This mimics go-ipfs behaviour.
@ -188,24 +190,29 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
// CidVersion). Otherwise, it will be explicitly set.
err = parseBoolParam(query, "raw-leaves", &params.RawLeaves)
if err != nil {
return nil, err
return params, err
}
err = parseBoolParam(query, "stream-channels", &params.StreamChannels)
if err != nil {
return nil, err
return params, err
}
err = parseBoolParam(query, "nocopy", &params.NoCopy)
if err != nil {
return nil, err
return params, err
}
err = parseBoolParam(query, "no-pin", &params.NoPin)
if err != nil {
return params, err
}
return params, nil
}
// ToQueryString returns a url query string (key=value&key2=value2&...)
func (p *AddParams) ToQueryString() (string, error) {
func (p AddParams) ToQueryString() (string, error) {
pinOptsQuery, err := p.PinOptions.ToQuery()
if err != nil {
return "", err
@ -228,11 +235,12 @@ func (p *AddParams) ToQueryString() (string, error) {
query.Set("stream-channels", fmt.Sprintf("%t", p.StreamChannels))
query.Set("nocopy", fmt.Sprintf("%t", p.NoCopy))
query.Set("format", p.Format)
query.Set("no-pin", fmt.Sprintf("%t", p.NoPin))
return query.Encode(), nil
}
// Equals checks if p equals p2.
func (p *AddParams) Equals(p2 *AddParams) bool {
func (p AddParams) Equals(p2 AddParams) bool {
return p.PinOptions.Equals(&p2.PinOptions) &&
p.Local == p2.Local &&
p.Recursive == p2.Recursive &&
@ -246,5 +254,6 @@ func (p *AddParams) Equals(p2 *AddParams) bool {
p.HashFun == p2.HashFun &&
p.StreamChannels == p2.StreamChannels &&
p.NoCopy == p2.NoCopy &&
p.Format == p2.Format
p.Format == p2.Format &&
p.NoPin == p2.NoPin
}

View File

@ -58,9 +58,9 @@ type Client interface {
PeerRm(ctx context.Context, pid peer.ID) error
// Add imports files to the cluster from the given paths.
Add(ctx context.Context, paths []string, params *api.AddParams, out chan<- *api.AddedOutput) error
Add(ctx context.Context, paths []string, params api.AddParams, out chan<- *api.AddedOutput) error
// AddMultiFile imports new files from a MultiFileReader.
AddMultiFile(ctx context.Context, multiFileR *files.MultiFileReader, params *api.AddParams, out chan<- *api.AddedOutput) error
AddMultiFile(ctx context.Context, multiFileR *files.MultiFileReader, params api.AddParams, out chan<- *api.AddedOutput) error
// Pin tracks a Cid with the given replication factor and a name for
// human-friendliness.

View File

@ -393,7 +393,7 @@ func (lc *loadBalancingClient) RepoGC(ctx context.Context, local bool) (*api.Glo
func (lc *loadBalancingClient) Add(
ctx context.Context,
paths []string,
params *api.AddParams,
params api.AddParams,
out chan<- *api.AddedOutput,
) error {
call := func(c Client) error {
@ -407,7 +407,7 @@ func (lc *loadBalancingClient) Add(
func (lc *loadBalancingClient) AddMultiFile(
ctx context.Context,
multiFileR *files.MultiFileReader,
params *api.AddParams,
params api.AddParams,
out chan<- *api.AddedOutput,
) error {
call := func(c Client) error {

View File

@ -513,7 +513,7 @@ func statusReached(target api.TrackerStatus, gblPinInfo *api.GlobalPinInfo, limi
}
// logic drawn from go-ipfs-cmds/cli/parse.go: appendFile
func makeSerialFile(fpath string, params *api.AddParams) (string, files.Node, error) {
func makeSerialFile(fpath string, params api.AddParams) (string, files.Node, error) {
if fpath == "." {
cwd, err := os.Getwd()
if err != nil {
@ -553,7 +553,7 @@ func makeSerialFile(fpath string, params *api.AddParams) (string, files.Node, er
func (c *defaultClient) Add(
ctx context.Context,
paths []string,
params *api.AddParams,
params api.AddParams,
out chan<- *api.AddedOutput,
) error {
ctx, span := trace.StartSpan(ctx, "client/Add")
@ -596,7 +596,7 @@ func (c *defaultClient) Add(
func (c *defaultClient) AddMultiFile(
ctx context.Context,
multiFileR *files.MultiFileReader,
params *api.AddParams,
params api.AddParams,
out chan<- *api.AddedOutput,
) error {
ctx, span := trace.StartSpan(ctx, "client/AddMultiFile")

View File

@ -749,7 +749,7 @@ func TestAddMultiFile(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
p := &types.AddParams{
p := types.AddParams{
PinOptions: types.PinOptions{
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,

View File

@ -1654,14 +1654,14 @@ func (c *Cluster) UnpinPath(ctx context.Context, path string) (*api.Pin, error)
// pipeline is used to DAGify the file. Depending on input parameters this
// DAG can be added locally to the calling cluster peer's ipfs repo, or
// sharded across the entire cluster.
func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (cid.Cid, error) {
func (c *Cluster) AddFile(reader *multipart.Reader, params api.AddParams) (cid.Cid, error) {
// TODO: add context param and tracing
var dags adder.ClusterDAGService
if params.Shard {
dags = sharding.New(c.rpcClient, params.PinOptions, nil)
dags = sharding.New(c.rpcClient, params, nil)
} else {
dags = single.New(c.rpcClient, params.PinOptions, params.Local)
dags = single.New(c.rpcClient, params, params.Local)
}
add := adder.New(dags, params, nil)
return add.FromMultipart(c.ctx, reader)

View File

@ -344,6 +344,8 @@ func (rpcapi *ClusterRPCAPI) BlockAllocate(ctx context.Context, in *api.Pin, out
return errFollowerMode
}
// Allocating for a existing pin. Usually the adder calls this with
// cid.Undef.
existing, err := rpcapi.c.PinGet(ctx, in.Cid)
if err != nil && err != state.ErrNotFound {
return err