diff --git a/adder/adderutils/adderutils.go b/adder/adderutils/adderutils.go index e6bdb524..e792557f 100644 --- a/adder/adderutils/adderutils.go +++ b/adder/adderutils/adderutils.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/ipfs/ipfs-cluster/adder" + "github.com/ipfs/ipfs-cluster/adder/local" "github.com/ipfs/ipfs-cluster/adder/sharding" "github.com/ipfs/ipfs-cluster/adder/single" "github.com/ipfs/ipfs-cluster/api" @@ -35,7 +36,9 @@ func AddMultipartHTTPHandler( var dags adder.ClusterDAGService output := make(chan *api.AddedOutput, 200) - if params.Shard { + if params.Local { + dags = local.New(rpc, params.PinOptions) + } else if params.Shard { dags = sharding.New(rpc, params.PinOptions, output) } else { dags = single.New(rpc, params.PinOptions) diff --git a/adder/local/dag_service.go b/adder/local/dag_service.go index 3f9250b5..35f845e7 100644 --- a/adder/local/dag_service.go +++ b/adder/local/dag_service.go @@ -1,10 +1,9 @@ // Package local implements a ClusterDAGService that chunks and adds content -// to a local peer, before pinning it. +// to the local peer, before pinning it. package local import ( "context" - "errors" adder "github.com/ipfs/ipfs-cluster/adder" "github.com/ipfs/ipfs-cluster/api" @@ -12,12 +11,9 @@ import ( cid "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" - peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" ) -var errNotFound = errors.New("dagservice: block not found") - var logger = logging.Logger("localdags") // DAGService is an implementation of an adder.ClusterDAGService which @@ -28,10 +24,7 @@ type DAGService struct { rpcClient *rpc.Client - dests []peer.ID pinOpts api.PinOptions - - ba *adder.BlockAdder } // New returns a new Adder with the given rpc Client. The client is used @@ -39,42 +32,18 @@ type DAGService struct { func New(rpc *rpc.Client, opts api.PinOptions) *DAGService { return &DAGService{ rpcClient: rpc, - dests: nil, pinOpts: opts, } } // Add puts the given node in the destination peers. func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { - if dgs.dests == nil { - dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.pinOpts) - if err != nil { - return err - } - dgs.dests = dests - dgs.ba = adder.NewBlockAdder(dgs.rpcClient, dests) - } - - return dgs.ba.Add(ctx, node) + return adder.AddNodeLocal(ctx, dgs.rpcClient, node) } // Finalize pins the last Cid added to this DAGService. func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) { - // Cluster pin the result - rootPin := api.PinWithOpts(root, dgs.pinOpts) - rootPin.Allocations = dgs.dests - - dgs.dests = nil - - var pinResp api.Pin - return root, dgs.rpcClient.CallContext( - ctx, - "", - "Cluster", - "Pin", - rootPin, - &pinResp, - ) + return root, adder.Pin(ctx, dgs.rpcClient, api.PinWithOpts(root, dgs.pinOpts)) } // AddMany calls Add for every given node. diff --git a/adder/local/dag_service_test.go b/adder/local/dag_service_test.go index 43c292f7..469c3dc0 100644 --- a/adder/local/dag_service_test.go +++ b/adder/local/dag_service_test.go @@ -1,135 +1 @@ package local - -import ( - "context" - "errors" - "mime/multipart" - "sync" - "testing" - - adder "github.com/ipfs/ipfs-cluster/adder" - "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/test" - - peer "github.com/libp2p/go-libp2p-core/peer" - rpc "github.com/libp2p/go-libp2p-gorpc" -) - -type testIPFSRPC struct { - blocks sync.Map -} - -type testClusterRPC struct { - pins sync.Map -} - -func (rpcs *testIPFSRPC) BlockPut(ctx context.Context, in *api.NodeWithMeta, out *struct{}) error { - rpcs.blocks.Store(in.Cid.String(), in) - return nil -} - -func (rpcs *testClusterRPC) Pin(ctx context.Context, in *api.Pin, out *api.Pin) error { - rpcs.pins.Store(in.Cid.String(), in) - *out = *in - return nil -} - -func (rpcs *testClusterRPC) BlockAllocate(ctx context.Context, in *api.Pin, out *[]peer.ID) error { - if in.ReplicationFactorMin > 1 { - return errors.New("we can only replicate to 1 peer") - } - // it does not matter since we use host == nil for RPC, so it uses the - // local one in all cases. - *out = []peer.ID{test.PeerID1} - return nil -} - -func TestAdd(t *testing.T) { - t.Run("balanced", func(t *testing.T) { - clusterRPC := &testClusterRPC{} - ipfsRPC := &testIPFSRPC{} - server := rpc.NewServer(nil, "mock") - err := server.RegisterName("Cluster", clusterRPC) - if err != nil { - t.Fatal(err) - } - err = server.RegisterName("IPFSConnector", ipfsRPC) - if err != nil { - t.Fatal(err) - } - client := rpc.NewClientWithServer(nil, "mock", server) - params := api.DefaultAddParams() - params.Wrap = true - - dags := New(client, params.PinOptions) - add := adder.New(dags, params, nil) - - sth := test.NewShardingTestHelper() - defer sth.Clean(t) - mr, closer := sth.GetTreeMultiReader(t) - defer closer.Close() - r := multipart.NewReader(mr, mr.Boundary()) - - rootCid, err := add.FromMultipart(context.Background(), r) - if err != nil { - t.Fatal(err) - } - - if rootCid.String() != test.ShardingDirBalancedRootCIDWrapped { - t.Fatal("bad root cid: ", rootCid) - } - - expected := test.ShardingDirCids[:] - for _, c := range expected { - _, ok := ipfsRPC.blocks.Load(c) - if !ok { - t.Error("no IPFS.BlockPut for block", c) - } - } - - _, ok := clusterRPC.pins.Load(test.ShardingDirBalancedRootCIDWrapped) - if !ok { - t.Error("the tree wasn't pinned") - } - }) - - t.Run("trickle", func(t *testing.T) { - clusterRPC := &testClusterRPC{} - ipfsRPC := &testIPFSRPC{} - server := rpc.NewServer(nil, "mock") - err := server.RegisterName("Cluster", clusterRPC) - if err != nil { - t.Fatal(err) - } - err = server.RegisterName("IPFSConnector", ipfsRPC) - if err != nil { - t.Fatal(err) - } - client := rpc.NewClientWithServer(nil, "mock", server) - params := api.DefaultAddParams() - params.Layout = "trickle" - - dags := New(client, params.PinOptions) - add := adder.New(dags, params, nil) - - sth := test.NewShardingTestHelper() - defer sth.Clean(t) - mr, closer := sth.GetTreeMultiReader(t) - defer closer.Close() - r := multipart.NewReader(mr, mr.Boundary()) - - rootCid, err := add.FromMultipart(context.Background(), r) - if err != nil { - t.Fatal(err) - } - - if rootCid.String() != test.ShardingDirTrickleRootCID { - t.Fatal("bad root cid") - } - - _, ok := clusterRPC.pins.Load(test.ShardingDirTrickleRootCID) - if !ok { - t.Error("the tree wasn't pinned") - } - }) -} diff --git a/adder/util.go b/adder/util.go index 6ee578cf..592a4ded 100644 --- a/adder/util.go +++ b/adder/util.go @@ -142,6 +142,21 @@ func Pin(ctx context.Context, rpc *rpc.Client, pin *api.Pin) error { ) } +// AddNodeLocal puts the given node to the local peer. +func AddNodeLocal(ctx context.Context, rpc *rpc.Client, node ipld.Node) error { + nodeSerial := ipldNodeToNodeWithMeta(node) + logger.Debugf("block put %s to local peer", nodeSerial.Cid) + + return rpc.CallContext( + ctx, + "", // use ourself to pin + "IPFSConnector", + "BlockPut", + nodeSerial, + &struct{}{}, + ) +} + // ErrDAGNotFound is returned whenever we try to get a block from the DAGService. var ErrDAGNotFound = errors.New("dagservice: block not found") diff --git a/api/add.go b/api/add.go index 79320c29..2953e60b 100644 --- a/api/add.go +++ b/api/add.go @@ -26,6 +26,7 @@ type AddedOutput struct { type AddParams struct { PinOptions + Local bool Recursive bool Layout string Chunker string @@ -43,6 +44,7 @@ type AddParams struct { // DefaultAddParams returns a AddParams object with standard defaults func DefaultAddParams() *AddParams { return &AddParams{ + Local: false, Recursive: false, Layout: "", // corresponds to balanced layout Chunker: "size-262144", @@ -110,7 +112,12 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) { params.HashFun = hashF } - err := parseBoolParam(query, "recursive", ¶ms.Recursive) + err := parseBoolParam(query, "local", ¶ms.Local) + if err != nil { + return nil, err + } + + err = parseBoolParam(query, "recursive", ¶ms.Recursive) if err != nil { return nil, err } @@ -180,6 +187,7 @@ func (p *AddParams) ToQueryString() string { query.Set("name", p.Name) query.Set("shard", fmt.Sprintf("%t", p.Shard)) query.Set("shard-size", fmt.Sprintf("%d", p.ShardSize)) + query.Set("local", fmt.Sprintf("%t", p.Local)) query.Set("recursive", fmt.Sprintf("%t", p.Recursive)) query.Set("layout", p.Layout) query.Set("chunker", p.Chunker) @@ -199,6 +207,7 @@ func (p *AddParams) Equals(p2 *AddParams) bool { return p.ReplicationFactorMin == p2.ReplicationFactorMin && p.ReplicationFactorMax == p2.ReplicationFactorMax && p.Name == p2.Name && + p.Local == p2.Local && p.Recursive == p2.Recursive && p.Shard == p2.Shard && p.ShardSize == p2.ShardSize && diff --git a/cmd/ipfs-cluster-ctl/main.go b/cmd/ipfs-cluster-ctl/main.go index 7c0ca109..1ee6d6d4 100644 --- a/cmd/ipfs-cluster-ctl/main.go +++ b/cmd/ipfs-cluster-ctl/main.go @@ -334,6 +334,10 @@ cluster "pin add". Usage: "Hash function to use. Implies cid-version=1.", Value: defaultAddParams.HashFun, }, + cli.BoolFlag{ + Name: "local", + Usage: "Puts blocks only in local IPFS peer and then pin as per PinOptions", + }, cli.StringFlag{ Name: "name, n", Value: defaultAddParams.Name, @@ -401,6 +405,7 @@ cluster "pin add". //p.ShardSize = c.Uint64("shard-size") p.Shard = false p.Recursive = c.Bool("recursive") + p.Local = c.Bool("local") p.Layout = c.String("layout") p.Chunker = c.String("chunker") p.RawLeaves = c.Bool("raw-leaves")