Adders: stream blocks to destinations

This commit fixes #810 and adds block streaming to the final destinations
when adding. This should bring major performance gains when adding data to
clusters.

Before, every time the cluster issued a block, it was broadcast individually
to all destinations (a new libp2p stream per block), where it was block/put
to IPFS (one block/put HTTP round trip per block).

Now, blocks are streamed all the way from the adder module to the IPFS
daemon: every block becomes, as it arrives, a single part in a multipart
block/put request.
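
As a rough illustration of the idea (not the actual cluster code: the block
type and writeMultipart helper below are made up, and the real implementation
streams the body through a files.MultiFileReader instead of buffering it),
every block received on a channel becomes one part of a single multipart
body, so one request carries arbitrarily many blocks:

    package main

    import (
        "bytes"
        "fmt"
        "mime/multipart"
    )

    // block is a stand-in for api.NodeWithMeta: a CID plus raw block data.
    type block struct {
        cid  string
        data []byte
    }

    // writeMultipart turns every block received on the channel into one part
    // of a single multipart body, instead of one HTTP request per block.
    func writeMultipart(blocks <-chan block) (*bytes.Buffer, string, error) {
        var body bytes.Buffer
        w := multipart.NewWriter(&body)
        for b := range blocks {
            part, err := w.CreateFormFile("file", b.cid) // one part per block
            if err != nil {
                return nil, "", err
            }
            if _, err := part.Write(b.data); err != nil {
                return nil, "", err
            }
        }
        return &body, w.FormDataContentType(), w.Close()
    }

    func main() {
        blocks := make(chan block, 2)
        blocks <- block{cid: "block-1", data: []byte("hello")}
        blocks <- block{cid: "block-2", data: []byte("world")}
        close(blocks)

        body, contentType, err := writeMultipart(blocks)
        fmt.Println(contentType, body.Len(), err)
    }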

Before, the block broadcast had to wait for all destinations to finish
before processing the next block. Now, buffers allow some destinations to be
faster than others while sending and receiving blocks.
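
The buffering can be pictured with a plain channel fan-out (a hypothetical
helper, not the cluster API, which streams api.NodeWithMeta over libp2p):
each destination reads from its own buffered channel, so a slow peer only
stalls the producer once its buffer fills, rather than forcing per-block
lock-step:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // fanOut copies every incoming block to one buffered channel per
    // destination. A slow destination only stalls the producer once its own
    // buffer is full.
    func fanOut(in <-chan []byte, dests, buffer int) []chan []byte {
        outs := make([]chan []byte, dests)
        for i := range outs {
            outs[i] = make(chan []byte, buffer)
        }
        go func() {
            for blk := range in {
                for _, out := range outs {
                    out <- blk
                }
            }
            for _, out := range outs {
                close(out)
            }
        }()
        return outs
    }

    func main() {
        in := make(chan []byte)
        outs := fanOut(in, 2, 4) // 2 destinations, 4-block buffers

        var wg sync.WaitGroup
        for i, out := range outs {
            wg.Add(1)
            go func(i int, out <-chan []byte) {
                defer wg.Done()
                for range out {
                    // destination 1 is artificially slower than destination 0
                    time.Sleep(time.Duration(i*5) * time.Millisecond)
                }
            }(i, out)
        }

        for b := 0; b < 8; b++ {
            in <- []byte{byte(b)}
        }
        close(in)
        wg.Wait()
        fmt.Println("all destinations drained")
    }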

Before, if a block/put request failed to be broadcast everywhere, the error
surfaced at that moment.

Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.

Errors while block/putting to IPFS do not abort streams. Instead, subsequent
blocks are retried with a new request, though the method still returns an
error when the stream finishes if errors occurred at any point.
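
The resulting error policy looks roughly like this simplified sketch
(made-up names; the real code combines errors with go.uber.org/multierr
inside the new BlockStreamer): per-destination errors are collected, partial
failures only produce a warning, and the operation fails only when every
destination failed:

    package main

    import (
        "errors"
        "fmt"
    )

    // errAllFailed mirrors the "failed on all destinations" condition; the
    // name and helper below are illustrative, not the cluster implementation.
    var errAllFailed = errors.New("failed to put blocks on all destinations")

    // streamToAll keeps going on per-destination failures, collects the
    // errors, and only fails when every destination failed.
    func streamToAll(dests []string, send func(dest string) error) error {
        var errs []error
        for _, d := range dests {
            if err := send(d); err != nil {
                errs = append(errs, fmt.Errorf("%s: %w", d, err))
            }
        }
        if len(errs) == len(dests) {
            return errors.Join(append(errs, errAllFailed)...)
        }
        if len(errs) > 0 {
            // some destinations failed, but at least one succeeded: warn and move on
            fmt.Println("warning:", errors.Join(errs...))
        }
        return nil
    }

    func main() {
        err := streamToAll([]string{"peerA", "peerB"}, func(d string) error {
            if d == "peerB" {
                return errors.New("block/put failed")
            }
            return nil
        })
        fmt.Println("final error:", err) // nil: one destination succeeded
    }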

Commit 1d98538411 (parent 400869efc0)
Hector Sanjuan, 2022-03-24 02:17:10 +01:00
22 changed files with 511 additions and 242 deletions

View File

@ -32,7 +32,7 @@ func TestAdd(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
ci, err := clusters[0].AddFile(r, params)
ci, err := clusters[0].AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}
@ -67,7 +67,7 @@ func TestAdd(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
ci, err := clusters[2].AddFile(r, params)
ci, err := clusters[2].AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}
@ -119,7 +119,7 @@ func TestAddWithUserAllocations(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
ci, err := clusters[0].AddFile(r, params)
ci, err := clusters[0].AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}
@ -167,7 +167,7 @@ func TestAddPeerDown(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
ci, err := clusters[1].AddFile(r, params)
ci, err := clusters[1].AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}
@ -218,7 +218,7 @@ func TestAddOnePeerFails(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_, err := clusters[0].AddFile(r, params)
_, err := clusters[0].AddFile(context.Background(), r, params)
if err != nil {
t.Error(err)
}
@ -276,7 +276,7 @@ func TestAddAllPeersFail(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_, err := clusters[0].AddFile(r, params)
_, err := clusters[0].AddFile(context.Background(), r, params)
if err != adder.ErrBlockAdder {
t.Error("expected ErrBlockAdder. Got: ", err)
}

View File

@ -68,18 +68,18 @@ type Adder struct {
// whenever a block is processed. They contain information
// about the block, the CID, the Name etc. and are mostly
// meant to be streamed back to the user.
output chan *api.AddedOutput
output chan api.AddedOutput
}
// New returns a new Adder with the given ClusterDAGService, add options and a
// channel to send updates during the adding process.
//
// An Adder may only be used once.
func New(ds ClusterDAGService, p api.AddParams, out chan *api.AddedOutput) *Adder {
func New(ds ClusterDAGService, p api.AddParams, out chan api.AddedOutput) *Adder {
// Discard all progress update output as the caller has not provided
// a channel for them to listen on.
if out == nil {
out = make(chan *api.AddedOutput, 100)
out = make(chan api.AddedOutput, 100)
go func() {
for range out {
}
@ -188,7 +188,7 @@ type ipfsAdder struct {
*ipfsadd.Adder
}
func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan *api.AddedOutput) (*ipfsAdder, error) {
func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan api.AddedOutput) (*ipfsAdder, error) {
iadder, err := ipfsadd.NewAdder(ctx, dgs, dgs.Allocations)
if err != nil {
logger.Error(err)
@ -253,10 +253,10 @@ type carAdder struct {
ctx context.Context
dgs ClusterDAGService
params api.AddParams
output chan *api.AddedOutput
output chan api.AddedOutput
}
func newCarAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan *api.AddedOutput) (*carAdder, error) {
func newCarAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan api.AddedOutput) (*carAdder, error) {
return &carAdder{
ctx: ctx,
dgs: dgs,
@ -319,7 +319,7 @@ func (ca *carAdder) Add(name string, fn files.Node) (cid.Cid, error) {
}
}
ca.output <- &api.AddedOutput{
ca.output <- api.AddedOutput{
Name: name,
Cid: root,
Bytes: bytes,

View File

@ -30,19 +30,19 @@ func AddMultipartHTTPHandler(
params api.AddParams,
reader *multipart.Reader,
w http.ResponseWriter,
outputTransform func(*api.AddedOutput) interface{},
outputTransform func(api.AddedOutput) interface{},
) (cid.Cid, error) {
var dags adder.ClusterDAGService
output := make(chan *api.AddedOutput, 200)
output := make(chan api.AddedOutput, 200)
if params.Shard {
dags = sharding.New(rpc, params, output)
dags = sharding.New(ctx, rpc, params, output)
} else {
dags = single.New(rpc, params, params.Local)
dags = single.New(ctx, rpc, params, params.Local)
}
if outputTransform == nil {
outputTransform = func(in *api.AddedOutput) interface{} { return in }
outputTransform = func(in api.AddedOutput) interface{} { return in }
}
// This must be application/json otherwise go-ipfs client
@ -112,7 +112,7 @@ func AddMultipartHTTPHandler(
return root, err
}
func streamOutput(w http.ResponseWriter, output chan *api.AddedOutput, transform func(*api.AddedOutput) interface{}) {
func streamOutput(w http.ResponseWriter, output chan api.AddedOutput, transform func(api.AddedOutput) interface{}) {
flusher, flush := w.(http.Flusher)
enc := json.NewEncoder(w)
for v := range output {
@ -127,7 +127,7 @@ func streamOutput(w http.ResponseWriter, output chan *api.AddedOutput, transform
}
}
func buildOutput(output chan *api.AddedOutput, transform func(*api.AddedOutput) interface{}) []interface{} {
func buildOutput(output chan api.AddedOutput, transform func(api.AddedOutput) interface{}) []interface{} {
var finalOutput []interface{}
for v := range output {
finalOutput = append(finalOutput, transform(v))

View File

@ -51,7 +51,7 @@ type Adder struct {
ctx context.Context
dagService ipld.DAGService
allocsFun func() []peer.ID
Out chan *api.AddedOutput
Out chan api.AddedOutput
Progress bool
Trickle bool
RawLeaves bool
@ -425,9 +425,9 @@ func (adder *Adder) addDir(path string, dir files.Directory, toplevel bool) erro
}
// outputDagnode sends dagnode info over the output channel.
// Cluster: we use *api.AddedOutput instead of coreiface events
// Cluster: we use api.AddedOutput instead of coreiface events
// and make this an adder method to be able to prefix.
func (adder *Adder) outputDagnode(out chan *api.AddedOutput, name string, dn ipld.Node) error {
func (adder *Adder) outputDagnode(out chan api.AddedOutput, name string, dn ipld.Node) error {
if out == nil {
return nil
}
@ -445,7 +445,7 @@ func (adder *Adder) outputDagnode(out chan *api.AddedOutput, name string, dn ipl
// account for this here.
name = filepath.Join(adder.OutputPrefix, name)
out <- &api.AddedOutput{
out <- api.AddedOutput{
Cid: dn.Cid(),
Name: name,
Size: s,
@ -458,7 +458,7 @@ func (adder *Adder) outputDagnode(out chan *api.AddedOutput, name string, dn ipl
type progressReader struct {
file io.Reader
path string
out chan *api.AddedOutput
out chan api.AddedOutput
bytes int64
lastProgress int64
}
@ -469,7 +469,7 @@ func (i *progressReader) Read(p []byte) (int, error) {
i.bytes += int64(n)
if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
i.lastProgress = i.bytes
i.out <- &api.AddedOutput{
i.out <- api.AddedOutput{
Name: i.path,
Bytes: uint64(i.bytes),
}

View File

@ -30,10 +30,11 @@ var logger = logging.Logger("shardingdags")
type DAGService struct {
adder.BaseDAGService
ctx context.Context
rpcClient *rpc.Client
addParams api.AddParams
output chan<- *api.AddedOutput
output chan<- api.AddedOutput
addedSet *cid.Set
@ -50,11 +51,12 @@ type DAGService struct {
}
// New returns a new ClusterDAGService, which uses the given rpc client to perform
// Allocate, IPFSBlockPut and Pin requests to other cluster components.
func New(rpc *rpc.Client, opts api.AddParams, out chan<- *api.AddedOutput) *DAGService {
// Allocate, IPFSStream and Pin requests to other cluster components.
func New(ctx context.Context, rpc *rpc.Client, opts api.AddParams, out chan<- api.AddedOutput) *DAGService {
// use a default value for this regardless of what is provided.
opts.Mode = api.PinModeRecursive
return &DAGService{
ctx: ctx,
rpcClient: rpc,
addParams: opts,
output: out,
@ -93,14 +95,34 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid,
}
// PutDAG to ourselves
err = adder.NewBlockAdder(dgs.rpcClient, []peer.ID{""}).AddMany(ctx, clusterDAGNodes)
if err != nil {
blocks := make(chan api.NodeWithMeta, 256)
go func() {
defer close(blocks)
for _, n := range clusterDAGNodes {
select {
case <-ctx.Done():
logger.Error(ctx.Err())
return //abort
case blocks <- adder.IpldNodeToNodeWithMeta(n):
}
}
}()
// Stream these blocks and wait until we are done.
bs := adder.NewBlockStreamer(ctx, dgs.rpcClient, []peer.ID{""}, blocks)
select {
case <-ctx.Done():
return dataRoot, ctx.Err()
case <-bs.Done():
}
if err := bs.Err(); err != nil {
return dataRoot, err
}
clusterDAG := clusterDAGNodes[0].Cid()
dgs.sendOutput(&api.AddedOutput{
dgs.sendOutput(api.AddedOutput{
Name: fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name),
Cid: clusterDAG,
Size: dgs.totalSize,
@ -174,7 +196,8 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error {
if shard == nil {
logger.Infof("new shard for '%s': #%d", dgs.addParams.Name, len(dgs.shards))
var err error
shard, err = newShard(ctx, dgs.rpcClient, dgs.addParams.PinOptions)
// important: shards use the DAGService context.
shard, err = newShard(dgs.ctx, ctx, dgs.rpcClient, dgs.addParams.PinOptions)
if err != nil {
return err
}
@ -189,7 +212,7 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error {
// add the block to it if it fits and return
if shard.Size()+size < shard.Limit() {
shard.AddLink(ctx, n.Cid(), size)
return dgs.currentShard.ba.Add(ctx, n)
return dgs.currentShard.sendBlock(ctx, n)
}
logger.Debugf("shard %d full: block: %d. shard: %d. limit: %d",
@ -246,7 +269,7 @@ Ingest Rate: %s/s
}
func (dgs *DAGService) sendOutput(ao *api.AddedOutput) {
func (dgs *DAGService) sendOutput(ao api.AddedOutput) {
if dgs.output != nil {
dgs.output <- ao
}
@ -269,7 +292,7 @@ func (dgs *DAGService) flushCurrentShard(ctx context.Context) (cid.Cid, error) {
dgs.shards[fmt.Sprintf("%d", lens)] = shardCid
dgs.previousShard = shardCid
dgs.currentShard = nil
dgs.sendOutput(&api.AddedOutput{
dgs.sendOutput(api.AddedOutput{
Name: fmt.Sprintf("shard-%d", lens),
Cid: shardCid,
Size: shard.Size(),

View File

@ -27,8 +27,11 @@ type testRPC struct {
pins sync.Map
}
func (rpcs *testRPC) BlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error {
rpcs.blocks.Store(in.Cid.String(), in.Data)
func (rpcs *testRPC) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta, out chan<- struct{}) error {
defer close(out)
for n := range in {
rpcs.blocks.Store(n.Cid.String(), n.Data)
}
return nil
}
@ -77,9 +80,9 @@ func makeAdder(t *testing.T, params api.AddParams) (*adder.Adder, *testRPC) {
}
client := rpc.NewClientWithServer(nil, "mock", server)
out := make(chan *api.AddedOutput, 1)
out := make(chan api.AddedOutput, 1)
dags := New(client, params, out)
dags := New(context.Background(), client, params, out)
add := adder.New(dags, params, out)
go func() {

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/api"
@ -18,10 +19,12 @@ import (
// a peer to be block-put and will be part of the same shard in the
// cluster DAG.
type shard struct {
ctx context.Context
rpc *rpc.Client
allocations []peer.ID
pinOptions api.PinOptions
ba *adder.BlockAdder
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
// dagNode represents a node with links and will be converted
// to Cbor.
dagNode map[string]cid.Cid
@ -29,7 +32,7 @@ type shard struct {
sizeLimit uint64
}
func newShard(ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard, error) {
func newShard(globalCtx context.Context, ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard, error) {
allocs, err := adder.BlockAllocate(ctx, rpc, opts)
if err != nil {
return nil, err
@ -47,11 +50,15 @@ func newShard(ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard
// TODO (hector): get latest metrics for allocations, adjust sizeLimit
// to minimum. This can be done later.
blocks := make(chan api.NodeWithMeta, 256)
return &shard{
ctx: globalCtx,
rpc: rpc,
allocations: allocs,
pinOptions: opts,
ba: adder.NewBlockAdder(rpc, allocs),
bs: adder.NewBlockStreamer(globalCtx, rpc, allocs, blocks),
blocks: blocks,
dagNode: make(map[string]cid.Cid),
currentSize: 0,
sizeLimit: opts.ShardSize,
@ -77,6 +84,15 @@ func (sh *shard) Allocations() []peer.ID {
return sh.allocations
}
func (sh *shard) sendBlock(ctx context.Context, n ipld.Node) error {
select {
case <-ctx.Done():
return ctx.Err()
case sh.blocks <- adder.IpldNodeToNodeWithMeta(n):
return nil
}
}
// Flush completes the allocation of this shard by building a CBOR node
// and adding it to IPFS, then pinning it in cluster. It returns the Cid of the
// shard.
@ -87,8 +103,21 @@ func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid,
return cid.Undef, err
}
err = sh.ba.AddMany(ctx, nodes)
if err != nil {
for _, n := range nodes {
err = sh.sendBlock(ctx, n)
if err != nil {
close(sh.blocks)
return cid.Undef, err
}
}
close(sh.blocks)
select {
case <-ctx.Done():
return cid.Undef, ctx.Err()
case <-sh.bs.Done():
}
if err := sh.bs.Err(); err != nil {
return cid.Undef, err
}

View File

@ -24,30 +24,38 @@ var _ = logger // otherwise unused
type DAGService struct {
adder.BaseDAGService
ctx context.Context
rpcClient *rpc.Client
dests []peer.ID
addParams api.AddParams
local bool
ba *adder.BlockAdder
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
}
// New returns a new Adder with the given rpc Client. The client is used
// to perform calls to IPFS.BlockPut and Pin content on Cluster.
func New(rpc *rpc.Client, opts api.AddParams, local bool) *DAGService {
// to perform calls to IPFS.BlockStream and Pin content on Cluster.
func New(ctx context.Context, rpc *rpc.Client, opts api.AddParams, local bool) *DAGService {
// ensure we don't Add something and pin it in direct mode.
opts.Mode = api.PinModeRecursive
return &DAGService{
ctx: ctx,
rpcClient: rpc,
dests: nil,
addParams: opts,
local: local,
blocks: make(chan api.NodeWithMeta, 256),
}
}
// Add puts the given node in the destination peers.
func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
// FIXME: can't this happen on initialization? Perhaps the point here
// is the adder only allocates and starts streaming when the first
// block arrives and not on creation.
if dgs.dests == nil {
dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.addParams.PinOptions)
if err != nil {
@ -76,17 +84,39 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
dgs.dests[len(dgs.dests)-1] = localPid
}
dgs.ba = adder.NewBlockAdder(dgs.rpcClient, []peer.ID{localPid})
dgs.bs = adder.NewBlockStreamer(dgs.ctx, dgs.rpcClient, []peer.ID{localPid}, dgs.blocks)
} else {
dgs.ba = adder.NewBlockAdder(dgs.rpcClient, dgs.dests)
dgs.bs = adder.NewBlockStreamer(dgs.ctx, dgs.rpcClient, dgs.dests, dgs.blocks)
}
}
return dgs.ba.Add(ctx, node)
select {
case <-ctx.Done():
return ctx.Err()
case <-dgs.ctx.Done():
return dgs.ctx.Err()
case dgs.blocks <- adder.IpldNodeToNodeWithMeta(node):
return nil
}
}
// Finalize pins the last Cid added to this DAGService.
func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) {
close(dgs.blocks)
select {
case <-dgs.ctx.Done():
return root, dgs.ctx.Err()
case <-ctx.Done():
return root, ctx.Err()
case <-dgs.bs.Done():
}
// If the streamer failed to put blocks.
if err := dgs.bs.Err(); err != nil {
return root, err
}
// Do not pin, just block put.
// Why? Because some people are uploading CAR files with partial DAGs
// and ideally they should be pinning only when the last partial CAR

View File

@ -23,8 +23,11 @@ type testClusterRPC struct {
pins sync.Map
}
func (rpcs *testIPFSRPC) BlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error {
rpcs.blocks.Store(in.Cid.String(), in)
func (rpcs *testIPFSRPC) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta, out chan<- struct{}) error {
defer close(out)
for n := range in {
rpcs.blocks.Store(n.Cid.String(), n)
}
return nil
}
@ -61,7 +64,7 @@ func TestAdd(t *testing.T) {
params := api.DefaultAddParams()
params.Wrap = true
dags := New(client, params, false)
dags := New(context.Background(), client, params, false)
add := adder.New(dags, params, nil)
sth := test.NewShardingTestHelper()
@ -83,7 +86,7 @@ func TestAdd(t *testing.T) {
for _, c := range expected {
_, ok := ipfsRPC.blocks.Load(c)
if !ok {
t.Error("no IPFS.BlockPut for block", c)
t.Error("block was not added to IPFS", c)
}
}
@ -109,7 +112,7 @@ func TestAdd(t *testing.T) {
params := api.DefaultAddParams()
params.Layout = "trickle"
dags := New(client, params, false)
dags := New(context.Background(), client, params, false)
add := adder.New(dags, params, nil)
sth := test.NewShardingTestHelper()

View File

@ -4,9 +4,10 @@ import (
"context"
"errors"
"fmt"
"sync"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"
"go.uber.org/multierr"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
@ -18,82 +19,90 @@ import (
// block fails on all of them.
var ErrBlockAdder = errors.New("failed to put block on all destinations")
// BlockAdder implements "github.com/ipfs/go-ipld-format".NodeAdder.
// It helps sending nodes to multiple destinations, as long as one of
// them is still working.
type BlockAdder struct {
// BlockStreamer helps streaming nodes to multiple destinations, as long as
// one of them is still working.
type BlockStreamer struct {
dests []peer.ID
rpcClient *rpc.Client
blocks <-chan api.NodeWithMeta
ctx context.Context
cancel context.CancelFunc
errMu sync.Mutex
err error
}
// NewBlockAdder creates a BlockAdder given an rpc client and allocated peers.
func NewBlockAdder(rpcClient *rpc.Client, dests []peer.ID) *BlockAdder {
return &BlockAdder{
// NewBlockStreamer creates a BlockStreamer given an rpc client, allocated
// peers and a channel on which the blocks to stream are received.
func NewBlockStreamer(ctx context.Context, rpcClient *rpc.Client, dests []peer.ID, blocks <-chan api.NodeWithMeta) *BlockStreamer {
bsCtx, cancel := context.WithCancel(ctx)
bs := BlockStreamer{
ctx: bsCtx,
cancel: cancel,
dests: dests,
rpcClient: rpcClient,
blocks: blocks,
err: nil,
}
go bs.streamBlocks()
return &bs
}
// Add puts an ipld node to the allocated destinations.
func (ba *BlockAdder) Add(ctx context.Context, node ipld.Node) error {
nodeSerial := ipldNodeToNodeWithMeta(node)
// Done returns a channel which gets closed when the BlockStreamer has
// finished.
func (bs *BlockStreamer) Done() <-chan struct{} {
return bs.ctx.Done()
}
ctxs, cancels := rpcutil.CtxsWithCancel(ctx, len(ba.dests))
defer rpcutil.MultiCancel(cancels)
func (bs *BlockStreamer) setErr(err error) {
bs.errMu.Lock()
bs.err = err
bs.errMu.Unlock()
}
logger.Debugf("block put %s to %s", nodeSerial.Cid, ba.dests)
errs := ba.rpcClient.MultiCall(
ctxs,
ba.dests,
// Err returns any error that happened during the operation of the
// BlockStreamer, for example when blocks could not be put to all nodes.
func (bs *BlockStreamer) Err() error {
bs.errMu.Lock()
defer bs.errMu.Unlock()
return bs.err
}
func (bs *BlockStreamer) streamBlocks() {
defer bs.cancel()
// Nothing should be sent on out.
// We drain though
out := make(chan struct{})
go func() {
for range out {
}
}()
errs := bs.rpcClient.MultiStream(
bs.ctx,
bs.dests,
"IPFSConnector",
"BlockPut",
nodeSerial,
rpcutil.RPCDiscardReplies(len(ba.dests)),
"BlockStream",
bs.blocks,
out,
)
var successfulDests []peer.ID
numErrs := 0
for i, e := range errs {
if e != nil {
logger.Errorf("BlockPut on %s: %s", ba.dests[i], e)
numErrs++
}
// RPCErrors include server errors (wrong RPC methods), client
// errors (creating, writing or reading streams) and
// authorization errors, but not IPFS errors from a failed blockput
// for example.
if rpc.IsRPCError(e) {
continue
}
successfulDests = append(successfulDests, ba.dests[i])
// this eliminates any nil errors.
combinedErrors := multierr.Combine(errs...)
if len(multierr.Errors(combinedErrors)) == len(bs.dests) {
logger.Error(combinedErrors)
bs.setErr(ErrBlockAdder)
} else {
logger.Warning("there were errors streaming blocks, but at least one destination succeeded")
logger.Warning(combinedErrors)
}
// If all requests resulted in errors, fail.
// Successful dests will have members when no errors happened
// or when an error happened but it was not an RPC error.
// As long as BlockPut worked in 1 destination, we move on.
if numErrs == len(ba.dests) || len(successfulDests) == 0 {
return ErrBlockAdder
}
ba.dests = successfulDests
return nil
}
// AddMany puts multiple ipld nodes to allocated destinations.
func (ba *BlockAdder) AddMany(ctx context.Context, nodes []ipld.Node) error {
for _, node := range nodes {
err := ba.Add(ctx, node)
if err != nil {
return err
}
}
return nil
}
// ipldNodeToNodeSerial converts an ipld.Node to NodeWithMeta.
func ipldNodeToNodeWithMeta(n ipld.Node) api.NodeWithMeta {
// IpldNodeToNodeWithMeta converts an ipld.Node to api.NodeWithMeta.
func IpldNodeToNodeWithMeta(n ipld.Node) api.NodeWithMeta {
size, err := n.Size()
if err != nil {
logger.Warn(err)

View File

@ -615,7 +615,7 @@ func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
logger.Warnf("Proxy/add does not support all IPFS params. Current options: %+v", params)
outputTransform := func(in *api.AddedOutput) interface{} {
outputTransform := func(in api.AddedOutput) interface{} {
cidStr := ""
if in.Cid.Defined() {
cidStr = in.Cid.String()

View File

@ -1728,17 +1728,17 @@ func (c *Cluster) UnpinPath(ctx context.Context, path string) (api.Pin, error) {
// pipeline is used to DAGify the file. Depending on input parameters this
// DAG can be added locally to the calling cluster peer's ipfs repo, or
// sharded across the entire cluster.
func (c *Cluster) AddFile(reader *multipart.Reader, params api.AddParams) (cid.Cid, error) {
func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params api.AddParams) (cid.Cid, error) {
// TODO: add context param and tracing
var dags adder.ClusterDAGService
if params.Shard {
dags = sharding.New(c.rpcClient, params, nil)
dags = sharding.New(ctx, c.rpcClient, params, nil)
} else {
dags = single.New(c.rpcClient, params, params.Local)
dags = single.New(ctx, c.rpcClient, params, params.Local)
}
add := adder.New(dags, params, nil)
return add.FromMultipart(c.ctx, reader)
return add.FromMultipart(ctx, reader)
}
// Version returns the current IPFS Cluster version.

View File

@ -134,8 +134,10 @@ func (ipfs *mockConnector) Resolve(ctx context.Context, path string) (cid.Cid, e
func (ipfs *mockConnector) ConnectSwarms(ctx context.Context) error { return nil }
func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil }
func (ipfs *mockConnector) BlockPut(ctx context.Context, nwm api.NodeWithMeta) error {
ipfs.blocks.Store(nwm.Cid.String(), nwm.Data)
func (ipfs *mockConnector) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta) error {
for n := range in {
ipfs.blocks.Store(n.Cid.String(), n.Data)
}
return nil
}
@ -372,7 +374,7 @@ func TestAddFile(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
c, err := cl.AddFile(r, params)
c, err := cl.AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}
@ -401,7 +403,7 @@ func TestAddFile(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
c, err := cl.AddFile(r, params)
c, err := cl.AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}
@ -431,7 +433,7 @@ func TestUnpinShard(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
root, err := cl.AddFile(r, params)
root, err := cl.AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}

View File

@ -95,8 +95,8 @@ type IPFSConnector interface {
RepoGC(context.Context) (api.RepoGC, error)
// Resolve returns a cid given a path.
Resolve(context.Context, string) (cid.Cid, error)
// BlockPut directly adds a block of data to the IPFS repo.
BlockPut(context.Context, api.NodeWithMeta) error
// BlockStream adds a stream of blocks to IPFS.
BlockStream(context.Context, <-chan api.NodeWithMeta) error
// BlockGet retrieves the raw data of an IPFS block.
BlockGet(context.Context, cid.Cid) ([]byte, error)
}

View File

@ -2150,7 +2150,7 @@ func TestClustersFollowerMode(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
_, err = clusters[1].AddFile(r, params)
_, err = clusters[1].AddFile(ctx, r, params)
if err != errFollowerMode {
t.Error("expected follower mode error")
}

View File

@ -19,6 +19,7 @@ import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/observations"
"go.uber.org/multierr"
cid "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
@ -118,7 +119,7 @@ type ipfsSwarmPeersResp struct {
}
type ipfsBlockPutResp struct {
Key string
Key api.Cid
Size int
}
@ -903,35 +904,128 @@ func (ipfs *Connector) SwarmPeers(ctx context.Context) ([]peer.ID, error) {
return swarm, nil
}
// BlockPut triggers an ipfs block put on the given data, inserting the block
// into the ipfs daemon's repo.
func (ipfs *Connector) BlockPut(ctx context.Context, b api.NodeWithMeta) error {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockPut")
defer span.End()
// chanDirectory implements the files.Directory interface.
type chanDirectory struct {
iterator files.DirIterator
}
logger.Debugf("putting block to IPFS: %s", b.Cid)
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
defer ipfs.updateInformerMetric(ctx)
// Close is a no-op and it is not used.
func (cd *chanDirectory) Close() error {
return nil
}
mapDir := files.NewMapDirectory(
map[string]files.Node{ // IPFS reqs require a wrapping directory
"": files.NewBytesFile(b.Data),
},
)
// Size is not implemented; it should not be needed for multipart.
func (cd *chanDirectory) Size() (int64, error) {
return 0, nil
}
multiFileR := files.NewMultiFileReader(mapDir, true)
func (cd *chanDirectory) Entries() files.DirIterator {
return cd.iterator
}
// chanIterator implements the files.DirIterator interface.
type chanIterator struct {
ctx context.Context
blocks <-chan api.NodeWithMeta
current api.NodeWithMeta
peeked api.NodeWithMeta
done bool
err error
seenMu sync.Mutex
seen *cid.Set
}
func (ci *chanIterator) Name() string {
if !ci.current.Cid.Defined() {
return ""
}
return ci.current.Cid.String()
}
// Node returns the current block wrapped in a BytesFile.
func (ci *chanIterator) Node() files.Node {
if !ci.current.Cid.Defined() {
return nil
}
ci.seenMu.Lock()
if ci.seen.Visit(ci.current.Cid) {
logger.Debugf("block %s", ci.current.Cid)
}
ci.seenMu.Unlock()
return files.NewBytesFile(ci.current.Data)
}
func (ci *chanIterator) Seen(c api.Cid) bool {
ci.seenMu.Lock()
has := ci.seen.Has(cid.Cid(c))
ci.seen.Remove(cid.Cid(c))
ci.seenMu.Unlock()
return has
}
func (ci *chanIterator) Done() bool {
return ci.done
}
// Peek reads one block from the channel but saves it so that Next also
// returns it.
func (ci *chanIterator) Peek() (api.NodeWithMeta, bool) {
if ci.done {
return api.NodeWithMeta{}, false
}
select {
case <-ci.ctx.Done():
return api.NodeWithMeta{}, false
case next, ok := <-ci.blocks:
if !ok {
return api.NodeWithMeta{}, false
}
ci.peeked = next
return next, true
}
}
func (ci *chanIterator) Next() bool {
if ci.done {
return false
}
if ci.peeked.Cid.Defined() {
ci.current = ci.peeked
ci.peeked = api.NodeWithMeta{}
return true
}
select {
case <-ci.ctx.Done():
ci.done = true
ci.err = ci.ctx.Err()
return false
case next, ok := <-ci.blocks:
if !ok {
ci.done = true
return false
}
ci.current = next
return true
}
}
func (ci *chanIterator) Err() error {
return ci.err
}
func blockPutQuery(prefix cid.Prefix) (url.Values, error) {
q := make(url.Values, 3)
prefix := b.Cid.Prefix()
format, ok := cid.CodecToStr[prefix.Codec]
if !ok {
return fmt.Errorf("cannot find name for the blocks' CID codec: %x", prefix.Codec)
return q, fmt.Errorf("cannot find name for the blocks' CID codec: %x", prefix.Codec)
}
mhType, ok := multihash.Codes[prefix.MhType]
if !ok {
return fmt.Errorf("cannot find name for the blocks' Multihash type: %x", prefix.MhType)
return q, fmt.Errorf("cannot find name for the blocks' Multihash type: %x", prefix.MhType)
}
// IPFS behaves differently when using v0 or protobuf which are
@ -944,35 +1038,76 @@ func (ipfs *Connector) BlockPut(ctx context.Context, b api.NodeWithMeta) error {
q.Set("mhtype", mhType)
q.Set("mhlen", strconv.Itoa(prefix.MhLength))
return q, nil
}
// BlockStream performs a multipart request to block/put with the blocks
// received on the channel.
func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWithMeta) error {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockStream")
defer span.End()
logger.Debug("streaming blocks to IPFS")
defer ipfs.updateInformerMetric(ctx)
var errs error
it := &chanIterator{
ctx: ctx,
blocks: blocks,
seen: cid.NewSet(),
}
dir := &chanDirectory{
iterator: it,
}
// We need to peek into the first block to know which Cid prefix we
// are writing blocks with, so that ipfs calculates the expected
// multihash (we select the function used). This means that all blocks
// in a stream should use the same prefix.
peek, ok := it.Peek()
if !ok {
return errors.New("BlockStream: no blocks to peek in blocks channel")
}
q, err := blockPutQuery(peek.Cid.Prefix())
if err != nil {
return err
}
url := "block/put?" + q.Encode()
contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
body, err := ipfs.postCtx(ctx, url, contentType, multiFileR)
if err != nil {
return err
// We essentially keep going on any request errors and keep putting
// blocks until we are done. We will, however, return a final error if
// there were errors along the way, but we do not abort the blocks
// stream because we could not block/put.
for !it.Done() {
multiFileR := files.NewMultiFileReader(dir, true)
contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
body, err := ipfs.postCtxStreamResponse(ctx, url, contentType, multiFileR)
if err != nil {
errs = multierr.Append(errs, err)
continue
}
dec := json.NewDecoder(body)
for {
var res ipfsBlockPutResp
err := dec.Decode(&res)
if err == io.EOF {
break
}
if err != nil {
logger.Error(err)
errs = multierr.Append(errs, err)
break
}
if !it.Seen(res.Key) {
logger.Warnf("blockPut response CID (%s) does not match any blocks sent", res.Key)
}
}
// continue until it.Done()
}
var res ipfsBlockPutResp
err = json.Unmarshal(body, &res)
if err != nil {
return err
}
logger.Debug("block/put response CID", res.Key)
respCid, err := cid.Decode(res.Key)
if err != nil {
logger.Error("cannot parse CID from BlockPut response")
return err
}
// IPFS is too brittle here. CIDv0 != CIDv1. Sending "protobuf" format
// returns CidV1. Sending "v0" format (which maps to protobuf)
// returns CidV0. Leaving this as warning.
if !respCid.Equals(b.Cid) {
logger.Warnf("blockPut response CID (%s) does not match the block sent (%s)", respCid, b.Cid)
}
return nil
return errs
}
// BlockGet retrieves an ipfs block with the given cid

View File

@ -17,7 +17,7 @@ import (
func init() {
_ = logging.Logger
//logging.SetLogLevel("*", "DEBUG")
logging.SetLogLevel("*", "DEBUG")
}
func testIPFSConnector(t *testing.T) (*Connector, *test.IpfsMock) {
@ -315,26 +315,40 @@ func TestSwarmPeers(t *testing.T) {
}
}
func TestBlockPut(t *testing.T) {
func TestBlockStream(t *testing.T) {
ctx := context.Background()
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown(ctx)
// CidV1
err := ipfs.BlockPut(ctx, api.NodeWithMeta{
blocks := make(chan api.NodeWithMeta, 10)
blocks <- api.NodeWithMeta{
Data: []byte(test.Cid4Data),
Cid: test.Cid4,
})
}
// Because this has a different prefix,
// it will produce a warning.
blocks <- api.NodeWithMeta{
Data: []byte(test.Cid5Data),
Cid: test.Cid5,
}
close(blocks)
err := ipfs.BlockStream(ctx, blocks)
if err != nil {
t.Error(err)
}
// CidV0
err = ipfs.BlockPut(ctx, api.NodeWithMeta{
// Try only adding v0 cid now
blocks2 := make(chan api.NodeWithMeta, 1)
blocks2 <- api.NodeWithMeta{
Data: []byte(test.Cid5Data),
Cid: test.Cid5,
})
}
close(blocks2)
err = ipfs.BlockStream(ctx, blocks2)
if err != nil {
t.Error(err)
}
@ -353,11 +367,13 @@ func TestBlockGet(t *testing.T) {
t.Fatal("expected to fail getting unput block")
}
// Put and then successfully get
err = ipfs.BlockPut(ctx, api.NodeWithMeta{
blocks := make(chan api.NodeWithMeta, 1)
blocks <- api.NodeWithMeta{
Data: test.ShardData,
Cid: test.ShardCid,
})
}
close(blocks)
err = ipfs.BlockStream(ctx, blocks)
if err != nil {
t.Fatal(err)
}

View File

@ -570,9 +570,10 @@ func (rpcapi *IPFSConnectorRPCAPI) SwarmPeers(ctx context.Context, in struct{},
return nil
}
// BlockPut runs IPFSConnector.BlockPut().
func (rpcapi *IPFSConnectorRPCAPI) BlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error {
return rpcapi.ipfs.BlockPut(ctx, in)
// BlockStream runs IPFSConnector.BlockStream().
func (rpcapi *IPFSConnectorRPCAPI) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta, out chan<- struct{}) error {
close(out)
return rpcapi.ipfs.BlockStream(ctx, in)
}
// BlockGet runs IPFSConnector.BlockGet().

View File

@ -47,16 +47,17 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{
"PinTracker.Untrack": RPCClosed,
// IPFSConnector methods
"IPFSConnector.BlockGet": RPCClosed,
"IPFSConnector.BlockPut": RPCTrusted, // Called from Add()
"IPFSConnector.ConfigKey": RPCClosed,
"IPFSConnector.Pin": RPCClosed,
"IPFSConnector.PinLs": RPCClosed,
"IPFSConnector.PinLsCid": RPCClosed,
"IPFSConnector.RepoStat": RPCTrusted, // Called in broadcast from proxy/repo/stat
"IPFSConnector.Resolve": RPCClosed,
"IPFSConnector.SwarmPeers": RPCTrusted, // Called in ConnectGraph
"IPFSConnector.Unpin": RPCClosed,
"IPFSConnector.BlockGet": RPCClosed,
"IPFSConnector.BlockPut": RPCClosed, // Not used - replaced by BlockStream
"IPFSConnector.BlockStream": RPCTrusted, // Called by adders
"IPFSConnector.ConfigKey": RPCClosed,
"IPFSConnector.Pin": RPCClosed,
"IPFSConnector.PinLs": RPCClosed,
"IPFSConnector.PinLsCid": RPCClosed,
"IPFSConnector.RepoStat": RPCTrusted, // Called in broadcast from proxy/repo/stat
"IPFSConnector.Resolve": RPCClosed,
"IPFSConnector.SwarmPeers": RPCTrusted, // Called in ConnectGraph
"IPFSConnector.Unpin": RPCClosed,
// Consensus methods
"Consensus.AddPeer": RPCTrusted, // Called by Raft/redirect to leader

View File

@ -24,20 +24,21 @@ func rpcTypeStr(t cluster.RPCEndpointType) string {
}
var comments = map[string]string{
"Cluster.PeerAdd": "Used by Join()",
"Cluster.Peers": "Used by ConnectGraph()",
"Cluster.Pins": "Used in stateless tracker, ipfsproxy, restapi",
"PinTracker.Recover": "Called in broadcast from Recover()",
"PinTracker.RecoverAll": "Broadcast in RecoverAll unimplemented",
"Pintracker.Status": "Called in broadcast from Status()",
"Pintracker.StatusAll": "Called in broadcast from StatusAll()",
"IPFSConnector.BlockPut": "Called from Add()",
"IPFSConnector.RepoStat": "Called in broadcast from proxy/repo/stat",
"IPFSConnector.SwarmPeers": "Called in ConnectGraph",
"Consensus.AddPeer": "Called by Raft/redirect to leader",
"Consensus.LogPin": "Called by Raft/redirect to leader",
"Consensus.LogUnpin": "Called by Raft/redirect to leader",
"Consensus.RmPeer": "Called by Raft/redirect to leader",
"Cluster.PeerAdd": "Used by Join()",
"Cluster.Peers": "Used by ConnectGraph()",
"Cluster.Pins": "Used in stateless tracker, ipfsproxy, restapi",
"PinTracker.Recover": "Called in broadcast from Recover()",
"PinTracker.RecoverAll": "Broadcast in RecoverAll unimplemented",
"Pintracker.Status": "Called in broadcast from Status()",
"Pintracker.StatusAll": "Called in broadcast from StatusAll()",
"IPFSConnector.BlockPut": "Not used - replaced by BlockStream",
"IPFSConnector.BlockStream": "Called by adders",
"IPFSConnector.RepoStat": "Called in broadcast from proxy/repo/stat",
"IPFSConnector.SwarmPeers": "Called in ConnectGraph",
"Consensus.AddPeer": "Called by Raft/redirect to leader",
"Consensus.LogPin": "Called by Raft/redirect to leader",
"Consensus.LogUnpin": "Called by Raft/redirect to leader",
"Consensus.RmPeer": "Called by Raft/redirect to leader",
}
func main() {

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
@ -367,48 +368,62 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
j, _ := json.Marshal(resp)
w.Write(j)
case "block/put":
// Get the data and return the hash
mpr, err := r.MultipartReader()
if err != nil {
goto ERROR
}
part, err := mpr.NextPart()
if err != nil {
goto ERROR
}
data, err := ioutil.ReadAll(part)
if err != nil {
goto ERROR
}
// Parse cid from data and format and add to mock block-store
w.Header().Set("Trailer", "X-Stream-Error")
query := r.URL.Query()
formatStr := query.Get("format")
format := cid.Codecs[formatStr]
mhType := multihash.Names[query.Get("mhtype")]
mhLen, _ := strconv.Atoi(query.Get("mhLen"))
var builder cid.Builder
if formatStr == "v0" && mhType == multihash.SHA2_256 {
builder = cid.V0Builder{}
} else {
builder = cid.V1Builder{
Codec: format,
MhType: mhType,
MhLength: mhLen,
}
}
c, err := builder.Sum(data)
// Get the data and return the hash
mpr, err := r.MultipartReader()
if err != nil {
goto ERROR
}
m.BlockStore[c.String()] = data
resp := mockBlockPutResp{
Key: c.String(),
w.WriteHeader(http.StatusOK)
for {
part, err := mpr.NextPart()
if err == io.EOF {
return
}
if err != nil {
w.Header().Set("X-Stream-Error", err.Error())
return
}
data, err := ioutil.ReadAll(part)
if err != nil {
w.Header().Set("X-Stream-Error", err.Error())
return
}
// Parse cid from data and format and add to mock block-store
var builder cid.Builder
if formatStr == "v0" && mhType == multihash.SHA2_256 {
builder = cid.V0Builder{}
} else {
builder = cid.V1Builder{
Codec: format,
MhType: mhType,
MhLength: mhLen,
}
}
c, err := builder.Sum(data)
if err != nil {
w.Header().Set("X-Stream-Error", err.Error())
return
}
m.BlockStore[c.String()] = data
resp := mockBlockPutResp{
Key: c.String(),
}
j, _ := json.Marshal(resp)
w.Write(j)
}
j, _ := json.Marshal(resp)
w.Write(j)
case "block/get":
query := r.URL.Query()
arg, ok := query["arg"]

View File

@ -584,7 +584,8 @@ func (mock *mockIPFSConnector) RepoStat(ctx context.Context, in struct{}, out *a
return nil
}
func (mock *mockIPFSConnector) BlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error {
func (mock *mockIPFSConnector) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta, out chan<- struct{}) error {
close(out)
return nil
}