Rename local dagservice to single dagservice

The local dagservice is not really local, as it adds to other peers as well.
It is a dagservice that does not perform sharding. Since we are going to
have a truly local dagservice (one that adds only to the local peer), rename
this one to the `single` dagservice.
Kishan Mohanbhai Sagathiya 2019-08-28 15:20:55 +05:30
parent dc2e73dfc2
commit 4febf87d79
4 changed files with 220 additions and 4 deletions
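
For context, the hunks below all amount to the same caller-side selection; a minimal sketch of it, assuming `rpc`, `params` and `output` are whatever the caller already holds:

	// Sharded adds keep using sharding.New; plain (non-sharded) adds now use
	// single.New, which was previously called local.New.
	var dags adder.ClusterDAGService
	if params.Shard {
		dags = sharding.New(rpc, params.PinOptions, output)
	} else {
		dags = single.New(rpc, params.PinOptions)
	}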

View File

@@ -9,8 +9,8 @@ import (
 	"sync"
 
 	"github.com/ipfs/ipfs-cluster/adder"
-	"github.com/ipfs/ipfs-cluster/adder/local"
 	"github.com/ipfs/ipfs-cluster/adder/sharding"
+	"github.com/ipfs/ipfs-cluster/adder/single"
 	"github.com/ipfs/ipfs-cluster/api"
 
 	cid "github.com/ipfs/go-cid"
@@ -38,7 +38,7 @@ func AddMultipartHTTPHandler(
 	if params.Shard {
 		dags = sharding.New(rpc, params.PinOptions, output)
 	} else {
-		dags = local.New(rpc, params.PinOptions)
+		dags = single.New(rpc, params.PinOptions)
 	}
 
 	if outputTransform == nil {

View File

@@ -0,0 +1,81 @@
// Package single implements a ClusterDAGService that chunks and adds content
// to cluster without sharding, before pinning it.
package single

import (
	"context"
	"errors"

	adder "github.com/ipfs/ipfs-cluster/adder"
	"github.com/ipfs/ipfs-cluster/api"

	cid "github.com/ipfs/go-cid"
	ipld "github.com/ipfs/go-ipld-format"
	logging "github.com/ipfs/go-log"
	peer "github.com/libp2p/go-libp2p-core/peer"
	rpc "github.com/libp2p/go-libp2p-gorpc"
)

var errNotFound = errors.New("dagservice: block not found")

var logger = logging.Logger("singledags")

// DAGService is an implementation of an adder.ClusterDAGService which
// puts the added blocks directly in the peers allocated to them (without
// sharding).
type DAGService struct {
	adder.BaseDAGService

	rpcClient *rpc.Client

	dests   []peer.ID
	pinOpts api.PinOptions

	ba *adder.BlockAdder
}

// New returns a new Adder with the given rpc Client. The client is used
// to perform calls to IPFS.BlockPut and Pin content on Cluster.
func New(rpc *rpc.Client, opts api.PinOptions) *DAGService {
	return &DAGService{
		rpcClient: rpc,
		dests:     nil,
		pinOpts:   opts,
	}
}

// Add puts the given node in the destination peers.
func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
	if dgs.dests == nil {
		dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.pinOpts)
		if err != nil {
			return err
		}
		dgs.dests = dests
		dgs.ba = adder.NewBlockAdder(dgs.rpcClient, dests)
	}

	return dgs.ba.Add(ctx, node)
}

// Finalize pins the last Cid added to this DAGService.
func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) {
	// Cluster pin the result
	rootPin := api.PinWithOpts(root, dgs.pinOpts)
	rootPin.Allocations = dgs.dests
	dgs.dests = nil

	return root, adder.Pin(ctx, dgs.rpcClient, rootPin)
}

// AddMany calls Add for every given node.
func (dgs *DAGService) AddMany(ctx context.Context, nodes []ipld.Node) error {
	for _, node := range nodes {
		err := dgs.Add(ctx, node)
		if err != nil {
			return err
		}
	}

	return nil
}
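
Putting it together, a minimal usage sketch under the same assumptions callers in this commit make; addOneFile, rpcClient and reader are hypothetical placeholders, not part of this file:

	// addOneFile shows the intended call sequence: build the non-sharding
	// DAGService, hand it to the adder, and ingest a multipart payload.
	func addOneFile(ctx context.Context, rpcClient *rpc.Client, reader *multipart.Reader) (cid.Cid, error) {
		params := api.DefaultAddParams()
		dags := New(rpcClient, params.PinOptions) // this package's DAGService
		add := adder.New(dags, params, nil)       // nil output, as in Cluster.AddFile
		return add.FromMultipart(ctx, reader)     // root is cluster-pinned via Finalize
	}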

View File

@@ -0,0 +1,135 @@
package single

import (
	"context"
	"errors"
	"mime/multipart"
	"sync"
	"testing"

	adder "github.com/ipfs/ipfs-cluster/adder"
	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/test"

	peer "github.com/libp2p/go-libp2p-core/peer"
	rpc "github.com/libp2p/go-libp2p-gorpc"
)

type testIPFSRPC struct {
	blocks sync.Map
}

type testClusterRPC struct {
	pins sync.Map
}

func (rpcs *testIPFSRPC) BlockPut(ctx context.Context, in *api.NodeWithMeta, out *struct{}) error {
	rpcs.blocks.Store(in.Cid.String(), in)
	return nil
}

func (rpcs *testClusterRPC) Pin(ctx context.Context, in *api.Pin, out *api.Pin) error {
	rpcs.pins.Store(in.Cid.String(), in)
	*out = *in
	return nil
}

func (rpcs *testClusterRPC) BlockAllocate(ctx context.Context, in *api.Pin, out *[]peer.ID) error {
	if in.ReplicationFactorMin > 1 {
		return errors.New("we can only replicate to 1 peer")
	}
	// it does not matter since we use host == nil for RPC, so it uses the
	// local one in all cases.
	*out = []peer.ID{test.PeerID1}
	return nil
}

func TestAdd(t *testing.T) {
	t.Run("balanced", func(t *testing.T) {
		clusterRPC := &testClusterRPC{}
		ipfsRPC := &testIPFSRPC{}
		server := rpc.NewServer(nil, "mock")
		err := server.RegisterName("Cluster", clusterRPC)
		if err != nil {
			t.Fatal(err)
		}
		err = server.RegisterName("IPFSConnector", ipfsRPC)
		if err != nil {
			t.Fatal(err)
		}
		client := rpc.NewClientWithServer(nil, "mock", server)

		params := api.DefaultAddParams()
		params.Wrap = true

		dags := New(client, params.PinOptions)
		add := adder.New(dags, params, nil)

		sth := test.NewShardingTestHelper()
		defer sth.Clean(t)
		mr, closer := sth.GetTreeMultiReader(t)
		defer closer.Close()
		r := multipart.NewReader(mr, mr.Boundary())

		rootCid, err := add.FromMultipart(context.Background(), r)
		if err != nil {
			t.Fatal(err)
		}

		if rootCid.String() != test.ShardingDirBalancedRootCIDWrapped {
			t.Fatal("bad root cid: ", rootCid)
		}

		expected := test.ShardingDirCids[:]
		for _, c := range expected {
			_, ok := ipfsRPC.blocks.Load(c)
			if !ok {
				t.Error("no IPFS.BlockPut for block", c)
			}
		}

		_, ok := clusterRPC.pins.Load(test.ShardingDirBalancedRootCIDWrapped)
		if !ok {
			t.Error("the tree wasn't pinned")
		}
	})

	t.Run("trickle", func(t *testing.T) {
		clusterRPC := &testClusterRPC{}
		ipfsRPC := &testIPFSRPC{}
		server := rpc.NewServer(nil, "mock")
		err := server.RegisterName("Cluster", clusterRPC)
		if err != nil {
			t.Fatal(err)
		}
		err = server.RegisterName("IPFSConnector", ipfsRPC)
		if err != nil {
			t.Fatal(err)
		}
		client := rpc.NewClientWithServer(nil, "mock", server)

		params := api.DefaultAddParams()
		params.Layout = "trickle"

		dags := New(client, params.PinOptions)
		add := adder.New(dags, params, nil)

		sth := test.NewShardingTestHelper()
		defer sth.Clean(t)
		mr, closer := sth.GetTreeMultiReader(t)
		defer closer.Close()
		r := multipart.NewReader(mr, mr.Boundary())

		rootCid, err := add.FromMultipart(context.Background(), r)
		if err != nil {
			t.Fatal(err)
		}

		if rootCid.String() != test.ShardingDirTrickleRootCID {
			t.Fatal("bad root cid")
		}

		_, ok := clusterRPC.pins.Load(test.ShardingDirTrickleRootCID)
		if !ok {
			t.Error("the tree wasn't pinned")
		}
	})
}

View File

@@ -10,8 +10,8 @@ import (
 	"time"
 
 	"github.com/ipfs/ipfs-cluster/adder"
-	"github.com/ipfs/ipfs-cluster/adder/local"
 	"github.com/ipfs/ipfs-cluster/adder/sharding"
+	"github.com/ipfs/ipfs-cluster/adder/single"
 	"github.com/ipfs/ipfs-cluster/api"
 	"github.com/ipfs/ipfs-cluster/pstoremgr"
 	"github.com/ipfs/ipfs-cluster/rpcutil"
@@ -1484,7 +1484,7 @@ func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (cid.
 	if params.Shard {
 		dags = sharding.New(c.rpcClient, params.PinOptions, nil)
 	} else {
-		dags = local.New(c.rpcClient, params.PinOptions)
+		dags = single.New(c.rpcClient, params.PinOptions)
 	}
 	add := adder.New(dags, params, nil)
 	return add.FromMultipart(c.ctx, reader)