2018-08-07 18:01:02 +00:00
|
|
|
// Package local implements a ClusterDAGService that chunks and adds content
|
|
|
|
// to a local peer, before pinning it.
|
|
|
|
package local
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
|
|
|
|
adder "github.com/ipfs/ipfs-cluster/adder"
|
|
|
|
"github.com/ipfs/ipfs-cluster/api"
|
|
|
|
|
|
|
|
cid "github.com/ipfs/go-cid"
|
|
|
|
ipld "github.com/ipfs/go-ipld-format"
|
|
|
|
logging "github.com/ipfs/go-log"
|
2018-10-17 13:28:03 +00:00
|
|
|
rpc "github.com/libp2p/go-libp2p-gorpc"
|
2018-08-07 18:01:02 +00:00
|
|
|
peer "github.com/libp2p/go-libp2p-peer"
|
|
|
|
)
|
|
|
|
|
|
|
|
var errNotFound = errors.New("dagservice: block not found")
|
|
|
|
|
|
|
|
var logger = logging.Logger("localdags")
|
|
|
|
|
|
|
|
// DAGService is an implementation of an adder.ClusterDAGService which
|
|
|
|
// puts the added blocks directly in the peers allocated to them (without
|
|
|
|
// sharding).
|
|
|
|
type DAGService struct {
|
|
|
|
adder.BaseDAGService
|
|
|
|
|
|
|
|
rpcClient *rpc.Client
|
|
|
|
|
|
|
|
dests []peer.ID
|
|
|
|
pinOpts api.PinOptions
|
|
|
|
}
|
|
|
|
|
|
|
|
// New returns a new Adder with the given rpc Client. The client is used
|
|
|
|
// to perform calls to IPFSBlockPut and Pin content on Cluster.
|
|
|
|
func New(rpc *rpc.Client, opts api.PinOptions) *DAGService {
|
|
|
|
return &DAGService{
|
|
|
|
rpcClient: rpc,
|
|
|
|
dests: nil,
|
|
|
|
pinOpts: opts,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add puts the given node in the destination peers.
|
2018-08-09 11:22:47 +00:00
|
|
|
func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
|
|
|
|
if dgs.dests == nil {
|
|
|
|
dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.pinOpts)
|
2018-08-07 18:01:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-08-09 11:22:47 +00:00
|
|
|
dgs.dests = dests
|
2018-08-07 18:01:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
size, err := node.Size()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
nodeSerial := &api.NodeWithMeta{
|
2019-02-27 17:04:35 +00:00
|
|
|
Cid: node.Cid(),
|
2018-08-07 18:01:02 +00:00
|
|
|
Data: node.RawData(),
|
|
|
|
CumSize: size,
|
|
|
|
}
|
|
|
|
|
2018-08-09 11:22:47 +00:00
|
|
|
return adder.PutBlock(ctx, dgs.rpcClient, nodeSerial, dgs.dests)
|
2018-08-07 18:01:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Finalize pins the last Cid added to this DAGService.
|
2018-09-22 01:00:10 +00:00
|
|
|
func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) {
|
2018-08-07 18:01:02 +00:00
|
|
|
// Cluster pin the result
|
2018-08-10 12:39:34 +00:00
|
|
|
rootPin := api.PinWithOpts(root, dgs.pinOpts)
|
|
|
|
rootPin.Allocations = dgs.dests
|
2018-08-07 18:01:02 +00:00
|
|
|
|
2018-08-09 11:22:47 +00:00
|
|
|
dgs.dests = nil
|
2018-08-07 18:01:02 +00:00
|
|
|
|
2018-08-09 11:22:47 +00:00
|
|
|
return root, dgs.rpcClient.CallContext(
|
2018-08-07 18:01:02 +00:00
|
|
|
ctx,
|
|
|
|
"",
|
|
|
|
"Cluster",
|
|
|
|
"Pin",
|
2019-02-27 17:04:35 +00:00
|
|
|
rootPin,
|
2018-08-07 18:01:02 +00:00
|
|
|
&struct{}{},
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
// AddMany calls Add for every given node.
|
2018-08-09 11:22:47 +00:00
|
|
|
func (dgs *DAGService) AddMany(ctx context.Context, nodes []ipld.Node) error {
|
2018-08-07 18:01:02 +00:00
|
|
|
for _, node := range nodes {
|
2018-08-09 11:22:47 +00:00
|
|
|
err := dgs.Add(ctx, node)
|
2018-08-07 18:01:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|