No need to use separate dag service for local add
This commit is contained in:
parent
a684c9567d
commit
c9e6cd25cd
|
@ -9,7 +9,6 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/ipfs/ipfs-cluster/adder"
|
"github.com/ipfs/ipfs-cluster/adder"
|
||||||
"github.com/ipfs/ipfs-cluster/adder/local"
|
|
||||||
"github.com/ipfs/ipfs-cluster/adder/sharding"
|
"github.com/ipfs/ipfs-cluster/adder/sharding"
|
||||||
"github.com/ipfs/ipfs-cluster/adder/single"
|
"github.com/ipfs/ipfs-cluster/adder/single"
|
||||||
"github.com/ipfs/ipfs-cluster/api"
|
"github.com/ipfs/ipfs-cluster/api"
|
||||||
|
@ -36,12 +35,10 @@ func AddMultipartHTTPHandler(
|
||||||
var dags adder.ClusterDAGService
|
var dags adder.ClusterDAGService
|
||||||
output := make(chan *api.AddedOutput, 200)
|
output := make(chan *api.AddedOutput, 200)
|
||||||
|
|
||||||
if params.Local {
|
if params.Shard {
|
||||||
dags = local.New(rpc, params.PinOptions)
|
|
||||||
} else if params.Shard {
|
|
||||||
dags = sharding.New(rpc, params.PinOptions, output)
|
dags = sharding.New(rpc, params.PinOptions, output)
|
||||||
} else {
|
} else {
|
||||||
dags = single.New(rpc, params.PinOptions)
|
dags = single.New(rpc, params.PinOptions, params.Local)
|
||||||
}
|
}
|
||||||
|
|
||||||
if outputTransform == nil {
|
if outputTransform == nil {
|
||||||
|
|
|
@ -1,58 +0,0 @@
|
||||||
// Package local implements a ClusterDAGService that chunks and adds content
|
|
||||||
// to the local peer, before pinning it.
|
|
||||||
package local
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
adder "github.com/ipfs/ipfs-cluster/adder"
|
|
||||||
"github.com/ipfs/ipfs-cluster/api"
|
|
||||||
|
|
||||||
cid "github.com/ipfs/go-cid"
|
|
||||||
ipld "github.com/ipfs/go-ipld-format"
|
|
||||||
logging "github.com/ipfs/go-log"
|
|
||||||
rpc "github.com/libp2p/go-libp2p-gorpc"
|
|
||||||
)
|
|
||||||
|
|
||||||
var logger = logging.Logger("localdags")
|
|
||||||
|
|
||||||
// DAGService is an implementation of an adder.ClusterDAGService which
|
|
||||||
// puts the added blocks directly in the peers allocated to them (without
|
|
||||||
// sharding).
|
|
||||||
type DAGService struct {
|
|
||||||
adder.BaseDAGService
|
|
||||||
|
|
||||||
rpcClient *rpc.Client
|
|
||||||
|
|
||||||
pinOpts api.PinOptions
|
|
||||||
}
|
|
||||||
|
|
||||||
// New returns a new Adder with the given rpc Client. The client is used
|
|
||||||
// to perform calls to IPFS.BlockPut and Pin content on Cluster.
|
|
||||||
func New(rpc *rpc.Client, opts api.PinOptions) *DAGService {
|
|
||||||
return &DAGService{
|
|
||||||
rpcClient: rpc,
|
|
||||||
pinOpts: opts,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add puts the given node in the destination peers.
|
|
||||||
func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
|
|
||||||
return adder.AddNodeLocal(ctx, dgs.rpcClient, node)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Finalize pins the last Cid added to this DAGService.
|
|
||||||
func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) {
|
|
||||||
return root, adder.Pin(ctx, dgs.rpcClient, api.PinWithOpts(root, dgs.pinOpts))
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddMany calls Add for every given node.
|
|
||||||
func (dgs *DAGService) AddMany(ctx context.Context, nodes []ipld.Node) error {
|
|
||||||
for _, node := range nodes {
|
|
||||||
err := dgs.Add(ctx, node)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1 +0,0 @@
|
||||||
package local
|
|
|
@ -30,22 +30,28 @@ type DAGService struct {
|
||||||
|
|
||||||
dests []peer.ID
|
dests []peer.ID
|
||||||
pinOpts api.PinOptions
|
pinOpts api.PinOptions
|
||||||
|
local bool
|
||||||
|
|
||||||
ba *adder.BlockAdder
|
ba *adder.BlockAdder
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new Adder with the given rpc Client. The client is used
|
// New returns a new Adder with the given rpc Client. The client is used
|
||||||
// to perform calls to IPFS.BlockPut and Pin content on Cluster.
|
// to perform calls to IPFS.BlockPut and Pin content on Cluster.
|
||||||
func New(rpc *rpc.Client, opts api.PinOptions) *DAGService {
|
func New(rpc *rpc.Client, opts api.PinOptions, local bool) *DAGService {
|
||||||
return &DAGService{
|
return &DAGService{
|
||||||
rpcClient: rpc,
|
rpcClient: rpc,
|
||||||
dests: nil,
|
dests: nil,
|
||||||
pinOpts: opts,
|
pinOpts: opts,
|
||||||
|
local: local,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add puts the given node in the destination peers.
|
// Add puts the given node in the destination peers.
|
||||||
func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
|
func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
|
||||||
|
if dgs.local {
|
||||||
|
return adder.AddNodeLocal(ctx, dgs.rpcClient, node)
|
||||||
|
}
|
||||||
|
|
||||||
if dgs.dests == nil {
|
if dgs.dests == nil {
|
||||||
dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.pinOpts)
|
dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.pinOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -62,8 +68,11 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
|
||||||
func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) {
|
func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) {
|
||||||
// Cluster pin the result
|
// Cluster pin the result
|
||||||
rootPin := api.PinWithOpts(root, dgs.pinOpts)
|
rootPin := api.PinWithOpts(root, dgs.pinOpts)
|
||||||
rootPin.Allocations = dgs.dests
|
if dgs.local {
|
||||||
|
return root, adder.Pin(ctx, dgs.rpcClient, rootPin)
|
||||||
|
}
|
||||||
|
|
||||||
|
rootPin.Allocations = dgs.dests
|
||||||
dgs.dests = nil
|
dgs.dests = nil
|
||||||
|
|
||||||
return root, adder.Pin(ctx, dgs.rpcClient, rootPin)
|
return root, adder.Pin(ctx, dgs.rpcClient, rootPin)
|
||||||
|
|
|
@ -61,7 +61,7 @@ func TestAdd(t *testing.T) {
|
||||||
params := api.DefaultAddParams()
|
params := api.DefaultAddParams()
|
||||||
params.Wrap = true
|
params.Wrap = true
|
||||||
|
|
||||||
dags := New(client, params.PinOptions)
|
dags := New(client, params.PinOptions, false)
|
||||||
add := adder.New(dags, params, nil)
|
add := adder.New(dags, params, nil)
|
||||||
|
|
||||||
sth := test.NewShardingTestHelper()
|
sth := test.NewShardingTestHelper()
|
||||||
|
@ -109,7 +109,7 @@ func TestAdd(t *testing.T) {
|
||||||
params := api.DefaultAddParams()
|
params := api.DefaultAddParams()
|
||||||
params.Layout = "trickle"
|
params.Layout = "trickle"
|
||||||
|
|
||||||
dags := New(client, params.PinOptions)
|
dags := New(client, params.PinOptions, false)
|
||||||
add := adder.New(dags, params, nil)
|
add := adder.New(dags, params, nil)
|
||||||
|
|
||||||
sth := test.NewShardingTestHelper()
|
sth := test.NewShardingTestHelper()
|
||||||
|
|
|
@ -1484,7 +1484,7 @@ func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (cid.
|
||||||
if params.Shard {
|
if params.Shard {
|
||||||
dags = sharding.New(c.rpcClient, params.PinOptions, nil)
|
dags = sharding.New(c.rpcClient, params.PinOptions, nil)
|
||||||
} else {
|
} else {
|
||||||
dags = single.New(c.rpcClient, params.PinOptions)
|
dags = single.New(c.rpcClient, params.PinOptions, false)
|
||||||
}
|
}
|
||||||
add := adder.New(dags, params, nil)
|
add := adder.New(dags, params, nil)
|
||||||
return add.FromMultipart(c.ctx, reader)
|
return add.FromMultipart(c.ctx, reader)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user