diff --git a/adder/adderutils/adderutils.go b/adder/adderutils/adderutils.go index e792557f..cd10621d 100644 --- a/adder/adderutils/adderutils.go +++ b/adder/adderutils/adderutils.go @@ -9,7 +9,6 @@ import ( "sync" "github.com/ipfs/ipfs-cluster/adder" - "github.com/ipfs/ipfs-cluster/adder/local" "github.com/ipfs/ipfs-cluster/adder/sharding" "github.com/ipfs/ipfs-cluster/adder/single" "github.com/ipfs/ipfs-cluster/api" @@ -36,12 +35,10 @@ func AddMultipartHTTPHandler( var dags adder.ClusterDAGService output := make(chan *api.AddedOutput, 200) - if params.Local { - dags = local.New(rpc, params.PinOptions) - } else if params.Shard { + if params.Shard { dags = sharding.New(rpc, params.PinOptions, output) } else { - dags = single.New(rpc, params.PinOptions) + dags = single.New(rpc, params.PinOptions, params.Local) } if outputTransform == nil { diff --git a/adder/local/dag_service.go b/adder/local/dag_service.go deleted file mode 100644 index 35f845e7..00000000 --- a/adder/local/dag_service.go +++ /dev/null @@ -1,58 +0,0 @@ -// Package local implements a ClusterDAGService that chunks and adds content -// to the local peer, before pinning it. -package local - -import ( - "context" - - adder "github.com/ipfs/ipfs-cluster/adder" - "github.com/ipfs/ipfs-cluster/api" - - cid "github.com/ipfs/go-cid" - ipld "github.com/ipfs/go-ipld-format" - logging "github.com/ipfs/go-log" - rpc "github.com/libp2p/go-libp2p-gorpc" -) - -var logger = logging.Logger("localdags") - -// DAGService is an implementation of an adder.ClusterDAGService which -// puts the added blocks directly in the peers allocated to them (without -// sharding). -type DAGService struct { - adder.BaseDAGService - - rpcClient *rpc.Client - - pinOpts api.PinOptions -} - -// New returns a new Adder with the given rpc Client. The client is used -// to perform calls to IPFS.BlockPut and Pin content on Cluster. -func New(rpc *rpc.Client, opts api.PinOptions) *DAGService { - return &DAGService{ - rpcClient: rpc, - pinOpts: opts, - } -} - -// Add puts the given node in the destination peers. -func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { - return adder.AddNodeLocal(ctx, dgs.rpcClient, node) -} - -// Finalize pins the last Cid added to this DAGService. -func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) { - return root, adder.Pin(ctx, dgs.rpcClient, api.PinWithOpts(root, dgs.pinOpts)) -} - -// AddMany calls Add for every given node. -func (dgs *DAGService) AddMany(ctx context.Context, nodes []ipld.Node) error { - for _, node := range nodes { - err := dgs.Add(ctx, node) - if err != nil { - return err - } - } - return nil -} diff --git a/adder/local/dag_service_test.go b/adder/local/dag_service_test.go deleted file mode 100644 index 469c3dc0..00000000 --- a/adder/local/dag_service_test.go +++ /dev/null @@ -1 +0,0 @@ -package local diff --git a/adder/single/dag_service.go b/adder/single/dag_service.go index 6dc29d22..1a67d7be 100644 --- a/adder/single/dag_service.go +++ b/adder/single/dag_service.go @@ -30,22 +30,28 @@ type DAGService struct { dests []peer.ID pinOpts api.PinOptions + local bool ba *adder.BlockAdder } // New returns a new Adder with the given rpc Client. The client is used // to perform calls to IPFS.BlockPut and Pin content on Cluster. -func New(rpc *rpc.Client, opts api.PinOptions) *DAGService { +func New(rpc *rpc.Client, opts api.PinOptions, local bool) *DAGService { return &DAGService{ rpcClient: rpc, dests: nil, pinOpts: opts, + local: local, } } // Add puts the given node in the destination peers. func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { + if dgs.local { + return adder.AddNodeLocal(ctx, dgs.rpcClient, node) + } + if dgs.dests == nil { dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.pinOpts) if err != nil { @@ -62,8 +68,11 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) { // Cluster pin the result rootPin := api.PinWithOpts(root, dgs.pinOpts) - rootPin.Allocations = dgs.dests + if dgs.local { + return root, adder.Pin(ctx, dgs.rpcClient, rootPin) + } + rootPin.Allocations = dgs.dests dgs.dests = nil return root, adder.Pin(ctx, dgs.rpcClient, rootPin) diff --git a/adder/single/dag_service_test.go b/adder/single/dag_service_test.go index cc9e55c3..3dc86a7b 100644 --- a/adder/single/dag_service_test.go +++ b/adder/single/dag_service_test.go @@ -61,7 +61,7 @@ func TestAdd(t *testing.T) { params := api.DefaultAddParams() params.Wrap = true - dags := New(client, params.PinOptions) + dags := New(client, params.PinOptions, false) add := adder.New(dags, params, nil) sth := test.NewShardingTestHelper() @@ -109,7 +109,7 @@ func TestAdd(t *testing.T) { params := api.DefaultAddParams() params.Layout = "trickle" - dags := New(client, params.PinOptions) + dags := New(client, params.PinOptions, false) add := adder.New(dags, params, nil) sth := test.NewShardingTestHelper() diff --git a/cluster.go b/cluster.go index 9f96a3b1..ccba752d 100644 --- a/cluster.go +++ b/cluster.go @@ -1484,7 +1484,7 @@ func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (cid. if params.Shard { dags = sharding.New(c.rpcClient, params.PinOptions, nil) } else { - dags = single.New(c.rpcClient, params.PinOptions) + dags = single.New(c.rpcClient, params.PinOptions, false) } add := adder.New(dags, params, nil) return add.FromMultipart(c.ctx, reader)