diff --git a/adder/adderutils/adderutils.go b/adder/adderutils/adderutils.go index 353e2de5..e6bdb524 100644 --- a/adder/adderutils/adderutils.go +++ b/adder/adderutils/adderutils.go @@ -9,8 +9,8 @@ import ( "sync" "github.com/ipfs/ipfs-cluster/adder" - "github.com/ipfs/ipfs-cluster/adder/local" "github.com/ipfs/ipfs-cluster/adder/sharding" + "github.com/ipfs/ipfs-cluster/adder/single" "github.com/ipfs/ipfs-cluster/api" cid "github.com/ipfs/go-cid" @@ -38,7 +38,7 @@ func AddMultipartHTTPHandler( if params.Shard { dags = sharding.New(rpc, params.PinOptions, output) } else { - dags = local.New(rpc, params.PinOptions) + dags = single.New(rpc, params.PinOptions) } if outputTransform == nil { diff --git a/adder/single/dag_service.go b/adder/single/dag_service.go new file mode 100644 index 00000000..6dc29d22 --- /dev/null +++ b/adder/single/dag_service.go @@ -0,0 +1,81 @@ +// Package single implements a ClusterDAGService that chunks and adds content +// to cluster without sharding, before pinning it. +package single + +import ( + "context" + "errors" + + adder "github.com/ipfs/ipfs-cluster/adder" + "github.com/ipfs/ipfs-cluster/api" + + cid "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + logging "github.com/ipfs/go-log" + peer "github.com/libp2p/go-libp2p-core/peer" + rpc "github.com/libp2p/go-libp2p-gorpc" +) + +var errNotFound = errors.New("dagservice: block not found") + +var logger = logging.Logger("singledags") + +// DAGService is an implementation of an adder.ClusterDAGService which +// puts the added blocks directly in the peers allocated to them (without +// sharding). +type DAGService struct { + adder.BaseDAGService + + rpcClient *rpc.Client + + dests []peer.ID + pinOpts api.PinOptions + + ba *adder.BlockAdder +} + +// New returns a new Adder with the given rpc Client. The client is used +// to perform calls to IPFS.BlockPut and Pin content on Cluster. +func New(rpc *rpc.Client, opts api.PinOptions) *DAGService { + return &DAGService{ + rpcClient: rpc, + dests: nil, + pinOpts: opts, + } +} + +// Add puts the given node in the destination peers. +func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { + if dgs.dests == nil { + dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.pinOpts) + if err != nil { + return err + } + dgs.dests = dests + dgs.ba = adder.NewBlockAdder(dgs.rpcClient, dests) + } + + return dgs.ba.Add(ctx, node) +} + +// Finalize pins the last Cid added to this DAGService. +func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) { + // Cluster pin the result + rootPin := api.PinWithOpts(root, dgs.pinOpts) + rootPin.Allocations = dgs.dests + + dgs.dests = nil + + return root, adder.Pin(ctx, dgs.rpcClient, rootPin) +} + +// AddMany calls Add for every given node. +func (dgs *DAGService) AddMany(ctx context.Context, nodes []ipld.Node) error { + for _, node := range nodes { + err := dgs.Add(ctx, node) + if err != nil { + return err + } + } + return nil +} diff --git a/adder/single/dag_service_test.go b/adder/single/dag_service_test.go new file mode 100644 index 00000000..cc9e55c3 --- /dev/null +++ b/adder/single/dag_service_test.go @@ -0,0 +1,135 @@ +package single + +import ( + "context" + "errors" + "mime/multipart" + "sync" + "testing" + + adder "github.com/ipfs/ipfs-cluster/adder" + "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/test" + + peer "github.com/libp2p/go-libp2p-core/peer" + rpc "github.com/libp2p/go-libp2p-gorpc" +) + +type testIPFSRPC struct { + blocks sync.Map +} + +type testClusterRPC struct { + pins sync.Map +} + +func (rpcs *testIPFSRPC) BlockPut(ctx context.Context, in *api.NodeWithMeta, out *struct{}) error { + rpcs.blocks.Store(in.Cid.String(), in) + return nil +} + +func (rpcs *testClusterRPC) Pin(ctx context.Context, in *api.Pin, out *api.Pin) error { + rpcs.pins.Store(in.Cid.String(), in) + *out = *in + return nil +} + +func (rpcs *testClusterRPC) BlockAllocate(ctx context.Context, in *api.Pin, out *[]peer.ID) error { + if in.ReplicationFactorMin > 1 { + return errors.New("we can only replicate to 1 peer") + } + // it does not matter since we use host == nil for RPC, so it uses the + // local one in all cases. + *out = []peer.ID{test.PeerID1} + return nil +} + +func TestAdd(t *testing.T) { + t.Run("balanced", func(t *testing.T) { + clusterRPC := &testClusterRPC{} + ipfsRPC := &testIPFSRPC{} + server := rpc.NewServer(nil, "mock") + err := server.RegisterName("Cluster", clusterRPC) + if err != nil { + t.Fatal(err) + } + err = server.RegisterName("IPFSConnector", ipfsRPC) + if err != nil { + t.Fatal(err) + } + client := rpc.NewClientWithServer(nil, "mock", server) + params := api.DefaultAddParams() + params.Wrap = true + + dags := New(client, params.PinOptions) + add := adder.New(dags, params, nil) + + sth := test.NewShardingTestHelper() + defer sth.Clean(t) + mr, closer := sth.GetTreeMultiReader(t) + defer closer.Close() + r := multipart.NewReader(mr, mr.Boundary()) + + rootCid, err := add.FromMultipart(context.Background(), r) + if err != nil { + t.Fatal(err) + } + + if rootCid.String() != test.ShardingDirBalancedRootCIDWrapped { + t.Fatal("bad root cid: ", rootCid) + } + + expected := test.ShardingDirCids[:] + for _, c := range expected { + _, ok := ipfsRPC.blocks.Load(c) + if !ok { + t.Error("no IPFS.BlockPut for block", c) + } + } + + _, ok := clusterRPC.pins.Load(test.ShardingDirBalancedRootCIDWrapped) + if !ok { + t.Error("the tree wasn't pinned") + } + }) + + t.Run("trickle", func(t *testing.T) { + clusterRPC := &testClusterRPC{} + ipfsRPC := &testIPFSRPC{} + server := rpc.NewServer(nil, "mock") + err := server.RegisterName("Cluster", clusterRPC) + if err != nil { + t.Fatal(err) + } + err = server.RegisterName("IPFSConnector", ipfsRPC) + if err != nil { + t.Fatal(err) + } + client := rpc.NewClientWithServer(nil, "mock", server) + params := api.DefaultAddParams() + params.Layout = "trickle" + + dags := New(client, params.PinOptions) + add := adder.New(dags, params, nil) + + sth := test.NewShardingTestHelper() + defer sth.Clean(t) + mr, closer := sth.GetTreeMultiReader(t) + defer closer.Close() + r := multipart.NewReader(mr, mr.Boundary()) + + rootCid, err := add.FromMultipart(context.Background(), r) + if err != nil { + t.Fatal(err) + } + + if rootCid.String() != test.ShardingDirTrickleRootCID { + t.Fatal("bad root cid") + } + + _, ok := clusterRPC.pins.Load(test.ShardingDirTrickleRootCID) + if !ok { + t.Error("the tree wasn't pinned") + } + }) +} diff --git a/cluster.go b/cluster.go index d16218b0..9f96a3b1 100644 --- a/cluster.go +++ b/cluster.go @@ -10,8 +10,8 @@ import ( "time" "github.com/ipfs/ipfs-cluster/adder" - "github.com/ipfs/ipfs-cluster/adder/local" "github.com/ipfs/ipfs-cluster/adder/sharding" + "github.com/ipfs/ipfs-cluster/adder/single" "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/pstoremgr" "github.com/ipfs/ipfs-cluster/rpcutil" @@ -1484,7 +1484,7 @@ func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (cid. if params.Shard { dags = sharding.New(c.rpcClient, params.PinOptions, nil) } else { - dags = local.New(c.rpcClient, params.PinOptions) + dags = single.New(c.rpcClient, params.PinOptions) } add := adder.New(dags, params, nil) return add.FromMultipart(c.ctx, reader)