Add with option --local

This commit introduces `--local` option for `ctl add` which would add
content only the local ipfs peer and then pin it according to pin
options (fetching from the local peer)
For achieving this, a new local dag service is introduced
This commit is contained in:
Kishan Mohanbhai Sagathiya 2019-08-28 18:39:30 +05:30
parent 4febf87d79
commit a684c9567d
6 changed files with 37 additions and 170 deletions

View File

@ -9,6 +9,7 @@ import (
"sync"
"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/adder/local"
"github.com/ipfs/ipfs-cluster/adder/sharding"
"github.com/ipfs/ipfs-cluster/adder/single"
"github.com/ipfs/ipfs-cluster/api"
@ -35,7 +36,9 @@ func AddMultipartHTTPHandler(
var dags adder.ClusterDAGService
output := make(chan *api.AddedOutput, 200)
if params.Shard {
if params.Local {
dags = local.New(rpc, params.PinOptions)
} else if params.Shard {
dags = sharding.New(rpc, params.PinOptions, output)
} else {
dags = single.New(rpc, params.PinOptions)

View File

@ -1,10 +1,9 @@
// Package local implements a ClusterDAGService that chunks and adds content
// to a local peer, before pinning it.
// to the local peer, before pinning it.
package local
import (
"context"
"errors"
adder "github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/api"
@ -12,12 +11,9 @@ import (
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
var errNotFound = errors.New("dagservice: block not found")
var logger = logging.Logger("localdags")
// DAGService is an implementation of an adder.ClusterDAGService which
@ -28,10 +24,7 @@ type DAGService struct {
rpcClient *rpc.Client
dests []peer.ID
pinOpts api.PinOptions
ba *adder.BlockAdder
}
// New returns a new Adder with the given rpc Client. The client is used
@ -39,42 +32,18 @@ type DAGService struct {
func New(rpc *rpc.Client, opts api.PinOptions) *DAGService {
return &DAGService{
rpcClient: rpc,
dests: nil,
pinOpts: opts,
}
}
// Add puts the given node in the destination peers.
func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
if dgs.dests == nil {
dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.pinOpts)
if err != nil {
return err
}
dgs.dests = dests
dgs.ba = adder.NewBlockAdder(dgs.rpcClient, dests)
}
return dgs.ba.Add(ctx, node)
return adder.AddNodeLocal(ctx, dgs.rpcClient, node)
}
// Finalize pins the last Cid added to this DAGService.
func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) {
// Cluster pin the result
rootPin := api.PinWithOpts(root, dgs.pinOpts)
rootPin.Allocations = dgs.dests
dgs.dests = nil
var pinResp api.Pin
return root, dgs.rpcClient.CallContext(
ctx,
"",
"Cluster",
"Pin",
rootPin,
&pinResp,
)
return root, adder.Pin(ctx, dgs.rpcClient, api.PinWithOpts(root, dgs.pinOpts))
}
// AddMany calls Add for every given node.

View File

@ -1,135 +1 @@
package local
import (
"context"
"errors"
"mime/multipart"
"sync"
"testing"
adder "github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
type testIPFSRPC struct {
blocks sync.Map
}
type testClusterRPC struct {
pins sync.Map
}
func (rpcs *testIPFSRPC) BlockPut(ctx context.Context, in *api.NodeWithMeta, out *struct{}) error {
rpcs.blocks.Store(in.Cid.String(), in)
return nil
}
func (rpcs *testClusterRPC) Pin(ctx context.Context, in *api.Pin, out *api.Pin) error {
rpcs.pins.Store(in.Cid.String(), in)
*out = *in
return nil
}
func (rpcs *testClusterRPC) BlockAllocate(ctx context.Context, in *api.Pin, out *[]peer.ID) error {
if in.ReplicationFactorMin > 1 {
return errors.New("we can only replicate to 1 peer")
}
// it does not matter since we use host == nil for RPC, so it uses the
// local one in all cases.
*out = []peer.ID{test.PeerID1}
return nil
}
func TestAdd(t *testing.T) {
t.Run("balanced", func(t *testing.T) {
clusterRPC := &testClusterRPC{}
ipfsRPC := &testIPFSRPC{}
server := rpc.NewServer(nil, "mock")
err := server.RegisterName("Cluster", clusterRPC)
if err != nil {
t.Fatal(err)
}
err = server.RegisterName("IPFSConnector", ipfsRPC)
if err != nil {
t.Fatal(err)
}
client := rpc.NewClientWithServer(nil, "mock", server)
params := api.DefaultAddParams()
params.Wrap = true
dags := New(client, params.PinOptions)
add := adder.New(dags, params, nil)
sth := test.NewShardingTestHelper()
defer sth.Clean(t)
mr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mr, mr.Boundary())
rootCid, err := add.FromMultipart(context.Background(), r)
if err != nil {
t.Fatal(err)
}
if rootCid.String() != test.ShardingDirBalancedRootCIDWrapped {
t.Fatal("bad root cid: ", rootCid)
}
expected := test.ShardingDirCids[:]
for _, c := range expected {
_, ok := ipfsRPC.blocks.Load(c)
if !ok {
t.Error("no IPFS.BlockPut for block", c)
}
}
_, ok := clusterRPC.pins.Load(test.ShardingDirBalancedRootCIDWrapped)
if !ok {
t.Error("the tree wasn't pinned")
}
})
t.Run("trickle", func(t *testing.T) {
clusterRPC := &testClusterRPC{}
ipfsRPC := &testIPFSRPC{}
server := rpc.NewServer(nil, "mock")
err := server.RegisterName("Cluster", clusterRPC)
if err != nil {
t.Fatal(err)
}
err = server.RegisterName("IPFSConnector", ipfsRPC)
if err != nil {
t.Fatal(err)
}
client := rpc.NewClientWithServer(nil, "mock", server)
params := api.DefaultAddParams()
params.Layout = "trickle"
dags := New(client, params.PinOptions)
add := adder.New(dags, params, nil)
sth := test.NewShardingTestHelper()
defer sth.Clean(t)
mr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mr, mr.Boundary())
rootCid, err := add.FromMultipart(context.Background(), r)
if err != nil {
t.Fatal(err)
}
if rootCid.String() != test.ShardingDirTrickleRootCID {
t.Fatal("bad root cid")
}
_, ok := clusterRPC.pins.Load(test.ShardingDirTrickleRootCID)
if !ok {
t.Error("the tree wasn't pinned")
}
})
}

View File

@ -142,6 +142,21 @@ func Pin(ctx context.Context, rpc *rpc.Client, pin *api.Pin) error {
)
}
// AddNodeLocal puts the given node to the local peer.
func AddNodeLocal(ctx context.Context, rpc *rpc.Client, node ipld.Node) error {
nodeSerial := ipldNodeToNodeWithMeta(node)
logger.Debugf("block put %s to local peer", nodeSerial.Cid)
return rpc.CallContext(
ctx,
"", // use ourself to pin
"IPFSConnector",
"BlockPut",
nodeSerial,
&struct{}{},
)
}
// ErrDAGNotFound is returned whenever we try to get a block from the DAGService.
var ErrDAGNotFound = errors.New("dagservice: block not found")

View File

@ -26,6 +26,7 @@ type AddedOutput struct {
type AddParams struct {
PinOptions
Local bool
Recursive bool
Layout string
Chunker string
@ -43,6 +44,7 @@ type AddParams struct {
// DefaultAddParams returns a AddParams object with standard defaults
func DefaultAddParams() *AddParams {
return &AddParams{
Local: false,
Recursive: false,
Layout: "", // corresponds to balanced layout
Chunker: "size-262144",
@ -110,7 +112,12 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
params.HashFun = hashF
}
err := parseBoolParam(query, "recursive", &params.Recursive)
err := parseBoolParam(query, "local", &params.Local)
if err != nil {
return nil, err
}
err = parseBoolParam(query, "recursive", &params.Recursive)
if err != nil {
return nil, err
}
@ -180,6 +187,7 @@ func (p *AddParams) ToQueryString() string {
query.Set("name", p.Name)
query.Set("shard", fmt.Sprintf("%t", p.Shard))
query.Set("shard-size", fmt.Sprintf("%d", p.ShardSize))
query.Set("local", fmt.Sprintf("%t", p.Local))
query.Set("recursive", fmt.Sprintf("%t", p.Recursive))
query.Set("layout", p.Layout)
query.Set("chunker", p.Chunker)
@ -199,6 +207,7 @@ func (p *AddParams) Equals(p2 *AddParams) bool {
return p.ReplicationFactorMin == p2.ReplicationFactorMin &&
p.ReplicationFactorMax == p2.ReplicationFactorMax &&
p.Name == p2.Name &&
p.Local == p2.Local &&
p.Recursive == p2.Recursive &&
p.Shard == p2.Shard &&
p.ShardSize == p2.ShardSize &&

View File

@ -334,6 +334,10 @@ cluster "pin add".
Usage: "Hash function to use. Implies cid-version=1.",
Value: defaultAddParams.HashFun,
},
cli.BoolFlag{
Name: "local",
Usage: "Puts blocks only in local IPFS peer and then pin as per PinOptions",
},
cli.StringFlag{
Name: "name, n",
Value: defaultAddParams.Name,
@ -401,6 +405,7 @@ cluster "pin add".
//p.ShardSize = c.Uint64("shard-size")
p.Shard = false
p.Recursive = c.Bool("recursive")
p.Local = c.Bool("local")
p.Layout = c.String("layout")
p.Chunker = c.String("chunker")
p.RawLeaves = c.Bool("raw-leaves")