From 1d985384114dd50adc0e1351e58c01ae98696dbb Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 24 Mar 2022 02:17:10 +0100 Subject: [PATCH] Adders: stream blocks to destinations This commit fixes #810 and adds block streaming to the final destinations when adding. This should add major performance gains when adding data to clusters. Before, everytime cluster issued a block, it was broadcasted individually to all destinations (new libp2p stream), where it was block/put to IPFS (a single block/put http roundtrip per block). Now, blocks are streamed all the way from the adder module to the ipfs daemon, by making every block as it arrives a single part in a multipart block/put request. Before, block-broadcast needed to wait for all destinations to finish in order to process the next block. Now, buffers allow some destinations to be faster than others while sending and receiving blocks. Before, if a block put request failed to be broadcasted everywhere, an error would happen at that moment. Now, we keep streaming until the end and only then report any errors. The operation succeeds as long as at least one stream finished successfully. Errors block/putting to IPFS will not abort streams. Instead, subsequent blocks are retried with a new request, although the method will return an error when the stream finishes if there were errors at any point. --- add_test.go | 12 +- adder/adder.go | 14 +- adder/adderutils/adderutils.go | 14 +- adder/ipfsadd/add.go | 12 +- adder/sharding/dag_service.go | 43 ++++-- adder/sharding/dag_service_test.go | 11 +- adder/sharding/shard.go | 39 ++++- adder/single/dag_service.go | 42 +++++- adder/single/dag_service_test.go | 13 +- adder/util.go | 129 +++++++++-------- api/ipfsproxy/ipfsproxy.go | 2 +- cluster.go | 8 +- cluster_test.go | 12 +- ipfscluster.go | 4 +- ipfscluster_test.go | 2 +- ipfsconn/ipfshttp/ipfshttp.go | 221 +++++++++++++++++++++++------ ipfsconn/ipfshttp/ipfshttp_test.go | 38 +++-- rpc_api.go | 7 +- rpc_policy.go | 21 +-- rpcutil/policygen/policygen.go | 29 ++-- test/ipfs_mock.go | 77 ++++++---- test/rpc_api_mock.go | 3 +- 22 files changed, 511 insertions(+), 242 deletions(-) diff --git a/add_test.go b/add_test.go index dad2ea22..e79bc7f2 100644 --- a/add_test.go +++ b/add_test.go @@ -32,7 +32,7 @@ func TestAdd(t *testing.T) { mfr, closer := sth.GetTreeMultiReader(t) defer closer.Close() r := multipart.NewReader(mfr, mfr.Boundary()) - ci, err := clusters[0].AddFile(r, params) + ci, err := clusters[0].AddFile(context.Background(), r, params) if err != nil { t.Fatal(err) } @@ -67,7 +67,7 @@ func TestAdd(t *testing.T) { mfr, closer := sth.GetTreeMultiReader(t) defer closer.Close() r := multipart.NewReader(mfr, mfr.Boundary()) - ci, err := clusters[2].AddFile(r, params) + ci, err := clusters[2].AddFile(context.Background(), r, params) if err != nil { t.Fatal(err) } @@ -119,7 +119,7 @@ func TestAddWithUserAllocations(t *testing.T) { mfr, closer := sth.GetTreeMultiReader(t) defer closer.Close() r := multipart.NewReader(mfr, mfr.Boundary()) - ci, err := clusters[0].AddFile(r, params) + ci, err := clusters[0].AddFile(context.Background(), r, params) if err != nil { t.Fatal(err) } @@ -167,7 +167,7 @@ func TestAddPeerDown(t *testing.T) { mfr, closer := sth.GetTreeMultiReader(t) defer closer.Close() r := multipart.NewReader(mfr, mfr.Boundary()) - ci, err := clusters[1].AddFile(r, params) + ci, err := clusters[1].AddFile(context.Background(), r, params) if err != nil { t.Fatal(err) } @@ -218,7 +218,7 @@ func TestAddOnePeerFails(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, err := clusters[0].AddFile(r, params) + _, err := clusters[0].AddFile(context.Background(), r, params) if err != nil { t.Error(err) } @@ -276,7 +276,7 @@ func TestAddAllPeersFail(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, err := clusters[0].AddFile(r, params) + _, err := clusters[0].AddFile(context.Background(), r, params) if err != adder.ErrBlockAdder { t.Error("expected ErrBlockAdder. Got: ", err) } diff --git a/adder/adder.go b/adder/adder.go index 346d2600..1d5cc637 100644 --- a/adder/adder.go +++ b/adder/adder.go @@ -68,18 +68,18 @@ type Adder struct { // whenever a block is processed. They contain information // about the block, the CID, the Name etc. and are mostly // meant to be streamed back to the user. - output chan *api.AddedOutput + output chan api.AddedOutput } // New returns a new Adder with the given ClusterDAGService, add options and a // channel to send updates during the adding process. // // An Adder may only be used once. -func New(ds ClusterDAGService, p api.AddParams, out chan *api.AddedOutput) *Adder { +func New(ds ClusterDAGService, p api.AddParams, out chan api.AddedOutput) *Adder { // Discard all progress update output as the caller has not provided // a channel for them to listen on. if out == nil { - out = make(chan *api.AddedOutput, 100) + out = make(chan api.AddedOutput, 100) go func() { for range out { } @@ -188,7 +188,7 @@ type ipfsAdder struct { *ipfsadd.Adder } -func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan *api.AddedOutput) (*ipfsAdder, error) { +func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan api.AddedOutput) (*ipfsAdder, error) { iadder, err := ipfsadd.NewAdder(ctx, dgs, dgs.Allocations) if err != nil { logger.Error(err) @@ -253,10 +253,10 @@ type carAdder struct { ctx context.Context dgs ClusterDAGService params api.AddParams - output chan *api.AddedOutput + output chan api.AddedOutput } -func newCarAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan *api.AddedOutput) (*carAdder, error) { +func newCarAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan api.AddedOutput) (*carAdder, error) { return &carAdder{ ctx: ctx, dgs: dgs, @@ -319,7 +319,7 @@ func (ca *carAdder) Add(name string, fn files.Node) (cid.Cid, error) { } } - ca.output <- &api.AddedOutput{ + ca.output <- api.AddedOutput{ Name: name, Cid: root, Bytes: bytes, diff --git a/adder/adderutils/adderutils.go b/adder/adderutils/adderutils.go index 1452be09..94c98d06 100644 --- a/adder/adderutils/adderutils.go +++ b/adder/adderutils/adderutils.go @@ -30,19 +30,19 @@ func AddMultipartHTTPHandler( params api.AddParams, reader *multipart.Reader, w http.ResponseWriter, - outputTransform func(*api.AddedOutput) interface{}, + outputTransform func(api.AddedOutput) interface{}, ) (cid.Cid, error) { var dags adder.ClusterDAGService - output := make(chan *api.AddedOutput, 200) + output := make(chan api.AddedOutput, 200) if params.Shard { - dags = sharding.New(rpc, params, output) + dags = sharding.New(ctx, rpc, params, output) } else { - dags = single.New(rpc, params, params.Local) + dags = single.New(ctx, rpc, params, params.Local) } if outputTransform == nil { - outputTransform = func(in *api.AddedOutput) interface{} { return in } + outputTransform = func(in api.AddedOutput) interface{} { return in } } // This must be application/json otherwise go-ipfs client @@ -112,7 +112,7 @@ func AddMultipartHTTPHandler( return root, err } -func streamOutput(w http.ResponseWriter, output chan *api.AddedOutput, transform func(*api.AddedOutput) interface{}) { +func streamOutput(w http.ResponseWriter, output chan api.AddedOutput, transform func(api.AddedOutput) interface{}) { flusher, flush := w.(http.Flusher) enc := json.NewEncoder(w) for v := range output { @@ -127,7 +127,7 @@ func streamOutput(w http.ResponseWriter, output chan *api.AddedOutput, transform } } -func buildOutput(output chan *api.AddedOutput, transform func(*api.AddedOutput) interface{}) []interface{} { +func buildOutput(output chan api.AddedOutput, transform func(api.AddedOutput) interface{}) []interface{} { var finalOutput []interface{} for v := range output { finalOutput = append(finalOutput, transform(v)) diff --git a/adder/ipfsadd/add.go b/adder/ipfsadd/add.go index 57f6d0d0..31e617de 100644 --- a/adder/ipfsadd/add.go +++ b/adder/ipfsadd/add.go @@ -51,7 +51,7 @@ type Adder struct { ctx context.Context dagService ipld.DAGService allocsFun func() []peer.ID - Out chan *api.AddedOutput + Out chan api.AddedOutput Progress bool Trickle bool RawLeaves bool @@ -425,9 +425,9 @@ func (adder *Adder) addDir(path string, dir files.Directory, toplevel bool) erro } // outputDagnode sends dagnode info over the output channel. -// Cluster: we use *api.AddedOutput instead of coreiface events +// Cluster: we use api.AddedOutput instead of coreiface events // and make this an adder method to be be able to prefix. -func (adder *Adder) outputDagnode(out chan *api.AddedOutput, name string, dn ipld.Node) error { +func (adder *Adder) outputDagnode(out chan api.AddedOutput, name string, dn ipld.Node) error { if out == nil { return nil } @@ -445,7 +445,7 @@ func (adder *Adder) outputDagnode(out chan *api.AddedOutput, name string, dn ipl // account for this here. name = filepath.Join(adder.OutputPrefix, name) - out <- &api.AddedOutput{ + out <- api.AddedOutput{ Cid: dn.Cid(), Name: name, Size: s, @@ -458,7 +458,7 @@ func (adder *Adder) outputDagnode(out chan *api.AddedOutput, name string, dn ipl type progressReader struct { file io.Reader path string - out chan *api.AddedOutput + out chan api.AddedOutput bytes int64 lastProgress int64 } @@ -469,7 +469,7 @@ func (i *progressReader) Read(p []byte) (int, error) { i.bytes += int64(n) if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF { i.lastProgress = i.bytes - i.out <- &api.AddedOutput{ + i.out <- api.AddedOutput{ Name: i.path, Bytes: uint64(i.bytes), } diff --git a/adder/sharding/dag_service.go b/adder/sharding/dag_service.go index a317f31d..0ffe971d 100644 --- a/adder/sharding/dag_service.go +++ b/adder/sharding/dag_service.go @@ -30,10 +30,11 @@ var logger = logging.Logger("shardingdags") type DAGService struct { adder.BaseDAGService + ctx context.Context rpcClient *rpc.Client addParams api.AddParams - output chan<- *api.AddedOutput + output chan<- api.AddedOutput addedSet *cid.Set @@ -50,11 +51,12 @@ type DAGService struct { } // New returns a new ClusterDAGService, which uses the given rpc client to perform -// Allocate, IPFSBlockPut and Pin requests to other cluster components. -func New(rpc *rpc.Client, opts api.AddParams, out chan<- *api.AddedOutput) *DAGService { +// Allocate, IPFSStream and Pin requests to other cluster components. +func New(ctx context.Context, rpc *rpc.Client, opts api.AddParams, out chan<- api.AddedOutput) *DAGService { // use a default value for this regardless of what is provided. opts.Mode = api.PinModeRecursive return &DAGService{ + ctx: ctx, rpcClient: rpc, addParams: opts, output: out, @@ -93,14 +95,34 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid, } // PutDAG to ourselves - err = adder.NewBlockAdder(dgs.rpcClient, []peer.ID{""}).AddMany(ctx, clusterDAGNodes) - if err != nil { + blocks := make(chan api.NodeWithMeta, 256) + go func() { + defer close(blocks) + for _, n := range clusterDAGNodes { + select { + case <-ctx.Done(): + logger.Error(ctx.Err()) + return //abort + case blocks <- adder.IpldNodeToNodeWithMeta(n): + } + } + }() + + // Stream these blocks and wait until we are done. + bs := adder.NewBlockStreamer(ctx, dgs.rpcClient, []peer.ID{""}, blocks) + select { + case <-ctx.Done(): + return dataRoot, ctx.Err() + case <-bs.Done(): + } + + if err := bs.Err(); err != nil { return dataRoot, err } clusterDAG := clusterDAGNodes[0].Cid() - dgs.sendOutput(&api.AddedOutput{ + dgs.sendOutput(api.AddedOutput{ Name: fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name), Cid: clusterDAG, Size: dgs.totalSize, @@ -174,7 +196,8 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error { if shard == nil { logger.Infof("new shard for '%s': #%d", dgs.addParams.Name, len(dgs.shards)) var err error - shard, err = newShard(ctx, dgs.rpcClient, dgs.addParams.PinOptions) + // important: shards use the DAGService context. + shard, err = newShard(dgs.ctx, ctx, dgs.rpcClient, dgs.addParams.PinOptions) if err != nil { return err } @@ -189,7 +212,7 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error { // add the block to it if it fits and return if shard.Size()+size < shard.Limit() { shard.AddLink(ctx, n.Cid(), size) - return dgs.currentShard.ba.Add(ctx, n) + return dgs.currentShard.sendBlock(ctx, n) } logger.Debugf("shard %d full: block: %d. shard: %d. limit: %d", @@ -246,7 +269,7 @@ Ingest Rate: %s/s } -func (dgs *DAGService) sendOutput(ao *api.AddedOutput) { +func (dgs *DAGService) sendOutput(ao api.AddedOutput) { if dgs.output != nil { dgs.output <- ao } @@ -269,7 +292,7 @@ func (dgs *DAGService) flushCurrentShard(ctx context.Context) (cid.Cid, error) { dgs.shards[fmt.Sprintf("%d", lens)] = shardCid dgs.previousShard = shardCid dgs.currentShard = nil - dgs.sendOutput(&api.AddedOutput{ + dgs.sendOutput(api.AddedOutput{ Name: fmt.Sprintf("shard-%d", lens), Cid: shardCid, Size: shard.Size(), diff --git a/adder/sharding/dag_service_test.go b/adder/sharding/dag_service_test.go index f5bee9f8..fcb52293 100644 --- a/adder/sharding/dag_service_test.go +++ b/adder/sharding/dag_service_test.go @@ -27,8 +27,11 @@ type testRPC struct { pins sync.Map } -func (rpcs *testRPC) BlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error { - rpcs.blocks.Store(in.Cid.String(), in.Data) +func (rpcs *testRPC) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta, out chan<- struct{}) error { + defer close(out) + for n := range in { + rpcs.blocks.Store(n.Cid.String(), n.Data) + } return nil } @@ -77,9 +80,9 @@ func makeAdder(t *testing.T, params api.AddParams) (*adder.Adder, *testRPC) { } client := rpc.NewClientWithServer(nil, "mock", server) - out := make(chan *api.AddedOutput, 1) + out := make(chan api.AddedOutput, 1) - dags := New(client, params, out) + dags := New(context.Background(), client, params, out) add := adder.New(dags, params, out) go func() { diff --git a/adder/sharding/shard.go b/adder/sharding/shard.go index 668d037e..cd3d5549 100644 --- a/adder/sharding/shard.go +++ b/adder/sharding/shard.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + ipld "github.com/ipfs/go-ipld-format" "github.com/ipfs/ipfs-cluster/adder" "github.com/ipfs/ipfs-cluster/api" @@ -18,10 +19,12 @@ import ( // a peer to be block-put and will be part of the same shard in the // cluster DAG. type shard struct { + ctx context.Context rpc *rpc.Client allocations []peer.ID pinOptions api.PinOptions - ba *adder.BlockAdder + bs *adder.BlockStreamer + blocks chan api.NodeWithMeta // dagNode represents a node with links and will be converted // to Cbor. dagNode map[string]cid.Cid @@ -29,7 +32,7 @@ type shard struct { sizeLimit uint64 } -func newShard(ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard, error) { +func newShard(globalCtx context.Context, ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard, error) { allocs, err := adder.BlockAllocate(ctx, rpc, opts) if err != nil { return nil, err @@ -47,11 +50,15 @@ func newShard(ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard // TODO (hector): get latest metrics for allocations, adjust sizeLimit // to minimum. This can be done later. + blocks := make(chan api.NodeWithMeta, 256) + return &shard{ + ctx: globalCtx, rpc: rpc, allocations: allocs, pinOptions: opts, - ba: adder.NewBlockAdder(rpc, allocs), + bs: adder.NewBlockStreamer(globalCtx, rpc, allocs, blocks), + blocks: blocks, dagNode: make(map[string]cid.Cid), currentSize: 0, sizeLimit: opts.ShardSize, @@ -77,6 +84,15 @@ func (sh *shard) Allocations() []peer.ID { return sh.allocations } +func (sh *shard) sendBlock(ctx context.Context, n ipld.Node) error { + select { + case <-ctx.Done(): + return ctx.Err() + case sh.blocks <- adder.IpldNodeToNodeWithMeta(n): + return nil + } +} + // Flush completes the allocation of this shard by building a CBOR node // and adding it to IPFS, then pinning it in cluster. It returns the Cid of the // shard. @@ -87,8 +103,21 @@ func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid, return cid.Undef, err } - err = sh.ba.AddMany(ctx, nodes) - if err != nil { + for _, n := range nodes { + err = sh.sendBlock(ctx, n) + if err != nil { + close(sh.blocks) + return cid.Undef, err + } + } + close(sh.blocks) + select { + case <-ctx.Done(): + return cid.Undef, ctx.Err() + case <-sh.bs.Done(): + } + + if err := sh.bs.Err(); err != nil { return cid.Undef, err } diff --git a/adder/single/dag_service.go b/adder/single/dag_service.go index 57502c0d..865bfc26 100644 --- a/adder/single/dag_service.go +++ b/adder/single/dag_service.go @@ -24,30 +24,38 @@ var _ = logger // otherwise unused type DAGService struct { adder.BaseDAGService + ctx context.Context rpcClient *rpc.Client dests []peer.ID addParams api.AddParams local bool - ba *adder.BlockAdder + bs *adder.BlockStreamer + blocks chan api.NodeWithMeta } // New returns a new Adder with the given rpc Client. The client is used -// to perform calls to IPFS.BlockPut and Pin content on Cluster. -func New(rpc *rpc.Client, opts api.AddParams, local bool) *DAGService { +// to perform calls to IPFS.BlockStream and Pin content on Cluster. +func New(ctx context.Context, rpc *rpc.Client, opts api.AddParams, local bool) *DAGService { // ensure don't Add something and pin it in direct mode. opts.Mode = api.PinModeRecursive return &DAGService{ + ctx: ctx, rpcClient: rpc, dests: nil, addParams: opts, local: local, + blocks: make(chan api.NodeWithMeta, 256), } } // Add puts the given node in the destination peers. func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { + + // FIXME: can't this happen on initialization? Perhaps the point here + // is the adder only allocates and starts streaming when the first + // block arrives and not on creation. if dgs.dests == nil { dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.addParams.PinOptions) if err != nil { @@ -76,17 +84,39 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { dgs.dests[len(dgs.dests)-1] = localPid } - dgs.ba = adder.NewBlockAdder(dgs.rpcClient, []peer.ID{localPid}) + dgs.bs = adder.NewBlockStreamer(dgs.ctx, dgs.rpcClient, []peer.ID{localPid}, dgs.blocks) } else { - dgs.ba = adder.NewBlockAdder(dgs.rpcClient, dgs.dests) + dgs.bs = adder.NewBlockStreamer(dgs.ctx, dgs.rpcClient, dgs.dests, dgs.blocks) } } - return dgs.ba.Add(ctx, node) + select { + case <-ctx.Done(): + return ctx.Err() + case <-dgs.ctx.Done(): + return ctx.Err() + case dgs.blocks <- adder.IpldNodeToNodeWithMeta(node): + return nil + } } // Finalize pins the last Cid added to this DAGService. func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) { + close(dgs.blocks) + + select { + case <-dgs.ctx.Done(): + return root, ctx.Err() + case <-ctx.Done(): + return root, ctx.Err() + case <-dgs.bs.Done(): + } + + // If the streamer failed to put blocks. + if err := dgs.bs.Err(); err != nil { + return root, err + } + // Do not pin, just block put. // Why? Because some people are uploading CAR files with partial DAGs // and ideally they should be pinning only when the last partial CAR diff --git a/adder/single/dag_service_test.go b/adder/single/dag_service_test.go index 227c6a0c..1dda2805 100644 --- a/adder/single/dag_service_test.go +++ b/adder/single/dag_service_test.go @@ -23,8 +23,11 @@ type testClusterRPC struct { pins sync.Map } -func (rpcs *testIPFSRPC) BlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error { - rpcs.blocks.Store(in.Cid.String(), in) +func (rpcs *testIPFSRPC) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta, out chan<- struct{}) error { + defer close(out) + for n := range in { + rpcs.blocks.Store(n.Cid.String(), n) + } return nil } @@ -61,7 +64,7 @@ func TestAdd(t *testing.T) { params := api.DefaultAddParams() params.Wrap = true - dags := New(client, params, false) + dags := New(context.Background(), client, params, false) add := adder.New(dags, params, nil) sth := test.NewShardingTestHelper() @@ -83,7 +86,7 @@ func TestAdd(t *testing.T) { for _, c := range expected { _, ok := ipfsRPC.blocks.Load(c) if !ok { - t.Error("no IPFS.BlockPut for block", c) + t.Error("block was not added to IPFS", c) } } @@ -109,7 +112,7 @@ func TestAdd(t *testing.T) { params := api.DefaultAddParams() params.Layout = "trickle" - dags := New(client, params, false) + dags := New(context.Background(), client, params, false) add := adder.New(dags, params, nil) sth := test.NewShardingTestHelper() diff --git a/adder/util.go b/adder/util.go index 8310131e..5cf8914a 100644 --- a/adder/util.go +++ b/adder/util.go @@ -4,9 +4,10 @@ import ( "context" "errors" "fmt" + "sync" "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/rpcutil" + "go.uber.org/multierr" cid "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" @@ -18,82 +19,90 @@ import ( // block fails on all of them. var ErrBlockAdder = errors.New("failed to put block on all destinations") -// BlockAdder implements "github.com/ipfs/go-ipld-format".NodeAdder. -// It helps sending nodes to multiple destinations, as long as one of -// them is still working. -type BlockAdder struct { +// BlockStreamer helps streaming nodes to multiple destinations, as long as +// one of them is still working. +type BlockStreamer struct { dests []peer.ID rpcClient *rpc.Client + blocks <-chan api.NodeWithMeta + + ctx context.Context + cancel context.CancelFunc + errMu sync.Mutex + err error } -// NewBlockAdder creates a BlockAdder given an rpc client and allocated peers. -func NewBlockAdder(rpcClient *rpc.Client, dests []peer.ID) *BlockAdder { - return &BlockAdder{ +// NewBlockStreamer creates a BlockStreamer given an rpc client, allocated +// peers and a channel on which the blocks to stream are received. +func NewBlockStreamer(ctx context.Context, rpcClient *rpc.Client, dests []peer.ID, blocks <-chan api.NodeWithMeta) *BlockStreamer { + bsCtx, cancel := context.WithCancel(ctx) + + bs := BlockStreamer{ + ctx: bsCtx, + cancel: cancel, dests: dests, rpcClient: rpcClient, + blocks: blocks, + err: nil, } + + go bs.streamBlocks() + return &bs } -// Add puts an ipld node to the allocated destinations. -func (ba *BlockAdder) Add(ctx context.Context, node ipld.Node) error { - nodeSerial := ipldNodeToNodeWithMeta(node) +// Done returns a channel which gets closed when the BlockStreamer has +// finished. +func (bs *BlockStreamer) Done() <-chan struct{} { + return bs.ctx.Done() +} - ctxs, cancels := rpcutil.CtxsWithCancel(ctx, len(ba.dests)) - defer rpcutil.MultiCancel(cancels) +func (bs *BlockStreamer) setErr(err error) { + bs.errMu.Lock() + bs.err = err + bs.errMu.Unlock() +} - logger.Debugf("block put %s to %s", nodeSerial.Cid, ba.dests) - errs := ba.rpcClient.MultiCall( - ctxs, - ba.dests, +// Err returns any errors that happened after the operation of the +// BlockStreamer, for example when blocks could not be put to all nodes. +func (bs *BlockStreamer) Err() error { + bs.errMu.Lock() + defer bs.errMu.Unlock() + return bs.err +} + +func (bs *BlockStreamer) streamBlocks() { + defer bs.cancel() + + // Nothing should be sent on out. + // We drain though + out := make(chan struct{}) + go func() { + for range out { + } + }() + + errs := bs.rpcClient.MultiStream( + bs.ctx, + bs.dests, "IPFSConnector", - "BlockPut", - nodeSerial, - rpcutil.RPCDiscardReplies(len(ba.dests)), + "BlockStream", + bs.blocks, + out, ) - var successfulDests []peer.ID - numErrs := 0 - for i, e := range errs { - if e != nil { - logger.Errorf("BlockPut on %s: %s", ba.dests[i], e) - numErrs++ - } - - // RPCErrors include server errors (wrong RPC methods), client - // errors (creating, writing or reading streams) and - // authorization errors, but not IPFS errors from a failed blockput - // for example. - if rpc.IsRPCError(e) { - continue - } - successfulDests = append(successfulDests, ba.dests[i]) + // this eliminates any nil errors. + combinedErrors := multierr.Combine(errs...) + if len(multierr.Errors(combinedErrors)) == len(bs.dests) { + logger.Error(combinedErrors) + bs.setErr(ErrBlockAdder) + } else { + logger.Warning("there were errors streaming blocks, but at least one destination succeeded") + logger.Warning(combinedErrors) } - - // If all requests resulted in errors, fail. - // Successful dests will have members when no errors happened - // or when an error happened but it was not an RPC error. - // As long as BlockPut worked in 1 destination, we move on. - if numErrs == len(ba.dests) || len(successfulDests) == 0 { - return ErrBlockAdder - } - - ba.dests = successfulDests - return nil } -// AddMany puts multiple ipld nodes to allocated destinations. -func (ba *BlockAdder) AddMany(ctx context.Context, nodes []ipld.Node) error { - for _, node := range nodes { - err := ba.Add(ctx, node) - if err != nil { - return err - } - } - return nil -} - -// ipldNodeToNodeSerial converts an ipld.Node to NodeWithMeta. -func ipldNodeToNodeWithMeta(n ipld.Node) api.NodeWithMeta { +// IpldNodeToNodeWithMeta converts an ipld.Node to api.NodeWithMeta. +func IpldNodeToNodeWithMeta(n ipld.Node) api.NodeWithMeta { size, err := n.Size() if err != nil { logger.Warn(err) diff --git a/api/ipfsproxy/ipfsproxy.go b/api/ipfsproxy/ipfsproxy.go index 5b1a5832..c5de02ae 100644 --- a/api/ipfsproxy/ipfsproxy.go +++ b/api/ipfsproxy/ipfsproxy.go @@ -615,7 +615,7 @@ func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) { logger.Warnf("Proxy/add does not support all IPFS params. Current options: %+v", params) - outputTransform := func(in *api.AddedOutput) interface{} { + outputTransform := func(in api.AddedOutput) interface{} { cidStr := "" if in.Cid.Defined() { cidStr = in.Cid.String() diff --git a/cluster.go b/cluster.go index 918ea489..39e40311 100644 --- a/cluster.go +++ b/cluster.go @@ -1728,17 +1728,17 @@ func (c *Cluster) UnpinPath(ctx context.Context, path string) (api.Pin, error) { // pipeline is used to DAGify the file. Depending on input parameters this // DAG can be added locally to the calling cluster peer's ipfs repo, or // sharded across the entire cluster. -func (c *Cluster) AddFile(reader *multipart.Reader, params api.AddParams) (cid.Cid, error) { +func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params api.AddParams) (cid.Cid, error) { // TODO: add context param and tracing var dags adder.ClusterDAGService if params.Shard { - dags = sharding.New(c.rpcClient, params, nil) + dags = sharding.New(ctx, c.rpcClient, params, nil) } else { - dags = single.New(c.rpcClient, params, params.Local) + dags = single.New(ctx, c.rpcClient, params, params.Local) } add := adder.New(dags, params, nil) - return add.FromMultipart(c.ctx, reader) + return add.FromMultipart(ctx, reader) } // Version returns the current IPFS Cluster version. diff --git a/cluster_test.go b/cluster_test.go index b90a2ac4..51e23438 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -134,8 +134,10 @@ func (ipfs *mockConnector) Resolve(ctx context.Context, path string) (cid.Cid, e func (ipfs *mockConnector) ConnectSwarms(ctx context.Context) error { return nil } func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil } -func (ipfs *mockConnector) BlockPut(ctx context.Context, nwm api.NodeWithMeta) error { - ipfs.blocks.Store(nwm.Cid.String(), nwm.Data) +func (ipfs *mockConnector) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta) error { + for n := range in { + ipfs.blocks.Store(n.Cid.String(), n.Data) + } return nil } @@ -372,7 +374,7 @@ func TestAddFile(t *testing.T) { mfr, closer := sth.GetTreeMultiReader(t) defer closer.Close() r := multipart.NewReader(mfr, mfr.Boundary()) - c, err := cl.AddFile(r, params) + c, err := cl.AddFile(context.Background(), r, params) if err != nil { t.Fatal(err) } @@ -401,7 +403,7 @@ func TestAddFile(t *testing.T) { mfr, closer := sth.GetTreeMultiReader(t) defer closer.Close() r := multipart.NewReader(mfr, mfr.Boundary()) - c, err := cl.AddFile(r, params) + c, err := cl.AddFile(context.Background(), r, params) if err != nil { t.Fatal(err) } @@ -431,7 +433,7 @@ func TestUnpinShard(t *testing.T) { mfr, closer := sth.GetTreeMultiReader(t) defer closer.Close() r := multipart.NewReader(mfr, mfr.Boundary()) - root, err := cl.AddFile(r, params) + root, err := cl.AddFile(context.Background(), r, params) if err != nil { t.Fatal(err) } diff --git a/ipfscluster.go b/ipfscluster.go index b23fd5be..d21e0a63 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -95,8 +95,8 @@ type IPFSConnector interface { RepoGC(context.Context) (api.RepoGC, error) // Resolve returns a cid given a path. Resolve(context.Context, string) (cid.Cid, error) - // BlockPut directly adds a block of data to the IPFS repo. - BlockPut(context.Context, api.NodeWithMeta) error + // BlockStream adds a stream of blocks to IPFS. + BlockStream(context.Context, <-chan api.NodeWithMeta) error // BlockGet retrieves the raw data of an IPFS block. BlockGet(context.Context, cid.Cid) ([]byte, error) } diff --git a/ipfscluster_test.go b/ipfscluster_test.go index b932eb35..1807fe97 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -2150,7 +2150,7 @@ func TestClustersFollowerMode(t *testing.T) { mfr, closer := sth.GetTreeMultiReader(t) defer closer.Close() r := multipart.NewReader(mfr, mfr.Boundary()) - _, err = clusters[1].AddFile(r, params) + _, err = clusters[1].AddFile(ctx, r, params) if err != errFollowerMode { t.Error("expected follower mode error") } diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 1c87121a..544d95dd 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -19,6 +19,7 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/observations" + "go.uber.org/multierr" cid "github.com/ipfs/go-cid" files "github.com/ipfs/go-ipfs-files" @@ -118,7 +119,7 @@ type ipfsSwarmPeersResp struct { } type ipfsBlockPutResp struct { - Key string + Key api.Cid Size int } @@ -903,35 +904,128 @@ func (ipfs *Connector) SwarmPeers(ctx context.Context) ([]peer.ID, error) { return swarm, nil } -// BlockPut triggers an ipfs block put on the given data, inserting the block -// into the ipfs daemon's repo. -func (ipfs *Connector) BlockPut(ctx context.Context, b api.NodeWithMeta) error { - ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockPut") - defer span.End() +// chanDirectory implementes the files.Directory interace +type chanDirectory struct { + iterator files.DirIterator +} - logger.Debugf("putting block to IPFS: %s", b.Cid) - ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout) - defer cancel() - defer ipfs.updateInformerMetric(ctx) +// Close is a no-op and it is not used. +func (cd *chanDirectory) Close() error { + return nil +} - mapDir := files.NewMapDirectory( - map[string]files.Node{ // IPFS reqs require a wrapping directory - "": files.NewBytesFile(b.Data), - }, - ) +// not implemented, I think not needed for multipart. +func (cd *chanDirectory) Size() (int64, error) { + return 0, nil +} - multiFileR := files.NewMultiFileReader(mapDir, true) +func (cd *chanDirectory) Entries() files.DirIterator { + return cd.iterator +} +// chanIterator implements the files.DirIterator interface. +type chanIterator struct { + ctx context.Context + blocks <-chan api.NodeWithMeta + + current api.NodeWithMeta + peeked api.NodeWithMeta + done bool + err error + + seenMu sync.Mutex + seen *cid.Set +} + +func (ci *chanIterator) Name() string { + if !ci.current.Cid.Defined() { + return "" + } + return ci.current.Cid.String() +} + +// return NewBytesFile. +func (ci *chanIterator) Node() files.Node { + if !ci.current.Cid.Defined() { + return nil + } + ci.seenMu.Lock() + if ci.seen.Visit(ci.current.Cid) { + logger.Debugf("block %s", ci.current.Cid) + } + ci.seenMu.Unlock() + return files.NewBytesFile(ci.current.Data) +} + +func (ci *chanIterator) Seen(c api.Cid) bool { + ci.seenMu.Lock() + has := ci.seen.Has(cid.Cid(c)) + ci.seen.Remove(cid.Cid(c)) + ci.seenMu.Unlock() + return has +} + +func (ci *chanIterator) Done() bool { + return ci.done +} + +// Peek reads one block from the channel but saves it so that Next also +// returns it. +func (ci *chanIterator) Peek() (api.NodeWithMeta, bool) { + if ci.done { + return api.NodeWithMeta{}, false + } + + select { + case <-ci.ctx.Done(): + return api.NodeWithMeta{}, false + case next, ok := <-ci.blocks: + if !ok { + return api.NodeWithMeta{}, false + } + ci.peeked = next + return next, true + } +} + +func (ci *chanIterator) Next() bool { + if ci.done { + return false + } + if ci.peeked.Cid.Defined() { + ci.current = ci.peeked + ci.peeked = api.NodeWithMeta{} + return true + } + select { + case <-ci.ctx.Done(): + ci.done = true + ci.err = ci.ctx.Err() + return false + case next, ok := <-ci.blocks: + if !ok { + ci.done = true + return false + } + ci.current = next + return true + } +} + +func (ci *chanIterator) Err() error { + return ci.err +} + +func blockPutQuery(prefix cid.Prefix) (url.Values, error) { q := make(url.Values, 3) - prefix := b.Cid.Prefix() format, ok := cid.CodecToStr[prefix.Codec] if !ok { - return fmt.Errorf("cannot find name for the blocks' CID codec: %x", prefix.Codec) + return q, fmt.Errorf("cannot find name for the blocks' CID codec: %x", prefix.Codec) } mhType, ok := multihash.Codes[prefix.MhType] if !ok { - return fmt.Errorf("cannot find name for the blocks' Multihash type: %x", prefix.MhType) + return q, fmt.Errorf("cannot find name for the blocks' Multihash type: %x", prefix.MhType) } // IPFS behaves differently when using v0 or protobuf which are @@ -944,35 +1038,76 @@ func (ipfs *Connector) BlockPut(ctx context.Context, b api.NodeWithMeta) error { q.Set("mhtype", mhType) q.Set("mhlen", strconv.Itoa(prefix.MhLength)) + return q, nil +} +// BlockStream performs a multipart request to block/put with the blocks +// received on the channel. +func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWithMeta) error { + ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockStream") + defer span.End() + + logger.Debug("streaming blocks to IPFS") + defer ipfs.updateInformerMetric(ctx) + + var errs error + + it := &chanIterator{ + ctx: ctx, + blocks: blocks, + seen: cid.NewSet(), + } + dir := &chanDirectory{ + iterator: it, + } + + // We need to pick into the first block to know which Cid prefix we + // are writing blocks with, so that ipfs calculates the expected + // multihash (we select the function used). This means that all blocks + // in a stream should use the same. + peek, ok := it.Peek() + if !ok { + return errors.New("BlockStream: no blocks to peek in blocks channel") + } + + q, err := blockPutQuery(peek.Cid.Prefix()) + if err != nil { + return err + } url := "block/put?" + q.Encode() - contentType := "multipart/form-data; boundary=" + multiFileR.Boundary() - body, err := ipfs.postCtx(ctx, url, contentType, multiFileR) - if err != nil { - return err + // We essentially keep going on any request errors and keep putting + // blocks until we are done. We will, however, return a final error if + // there were errors along the way, but we do not abort the blocks + // stream because we could not block/put. + for !it.Done() { + multiFileR := files.NewMultiFileReader(dir, true) + contentType := "multipart/form-data; boundary=" + multiFileR.Boundary() + body, err := ipfs.postCtxStreamResponse(ctx, url, contentType, multiFileR) + if err != nil { + errs = multierr.Append(errs, err) + continue + } + dec := json.NewDecoder(body) + for { + var res ipfsBlockPutResp + err := dec.Decode(&res) + if err == io.EOF { + break + } + if err != nil { + logger.Error(err) + errs = multierr.Append(errs, err) + break + } + if !it.Seen(res.Key) { + logger.Warnf("blockPut response CID (%s) does not match any blocks sent", res.Key) + } + } + // continue until it.Done() } - var res ipfsBlockPutResp - err = json.Unmarshal(body, &res) - if err != nil { - return err - } - - logger.Debug("block/put response CID", res.Key) - respCid, err := cid.Decode(res.Key) - if err != nil { - logger.Error("cannot parse CID from BlockPut response") - return err - } - - // IPFS is too brittle here. CIDv0 != CIDv1. Sending "protobuf" format - // returns CidV1. Sending "v0" format (which maps to protobuf) - // returns CidV0. Leaving this as warning. - if !respCid.Equals(b.Cid) { - logger.Warnf("blockPut response CID (%s) does not match the block sent (%s)", respCid, b.Cid) - } - return nil + return errs } // BlockGet retrieves an ipfs block with the given cid diff --git a/ipfsconn/ipfshttp/ipfshttp_test.go b/ipfsconn/ipfshttp/ipfshttp_test.go index e6d844e6..c06dc8a3 100644 --- a/ipfsconn/ipfshttp/ipfshttp_test.go +++ b/ipfsconn/ipfshttp/ipfshttp_test.go @@ -17,7 +17,7 @@ import ( func init() { _ = logging.Logger - //logging.SetLogLevel("*", "DEBUG") + logging.SetLogLevel("*", "DEBUG") } func testIPFSConnector(t *testing.T) (*Connector, *test.IpfsMock) { @@ -315,26 +315,40 @@ func TestSwarmPeers(t *testing.T) { } } -func TestBlockPut(t *testing.T) { +func TestBlockStream(t *testing.T) { ctx := context.Background() ipfs, mock := testIPFSConnector(t) defer mock.Close() defer ipfs.Shutdown(ctx) - // CidV1 - err := ipfs.BlockPut(ctx, api.NodeWithMeta{ + blocks := make(chan api.NodeWithMeta, 10) + blocks <- api.NodeWithMeta{ Data: []byte(test.Cid4Data), Cid: test.Cid4, - }) + } + + // Because this has a different prefix, + // it will produce a warning. + blocks <- api.NodeWithMeta{ + Data: []byte(test.Cid5Data), + Cid: test.Cid5, + } + close(blocks) + + err := ipfs.BlockStream(ctx, blocks) if err != nil { t.Error(err) } - // CidV0 - err = ipfs.BlockPut(ctx, api.NodeWithMeta{ + // Try only adding v0 cid now + blocks2 := make(chan api.NodeWithMeta, 1) + blocks2 <- api.NodeWithMeta{ Data: []byte(test.Cid5Data), Cid: test.Cid5, - }) + } + close(blocks2) + + err = ipfs.BlockStream(ctx, blocks2) if err != nil { t.Error(err) } @@ -353,11 +367,13 @@ func TestBlockGet(t *testing.T) { t.Fatal("expected to fail getting unput block") } - // Put and then successfully get - err = ipfs.BlockPut(ctx, api.NodeWithMeta{ + blocks := make(chan api.NodeWithMeta, 1) + blocks <- api.NodeWithMeta{ Data: test.ShardData, Cid: test.ShardCid, - }) + } + close(blocks) + err = ipfs.BlockStream(ctx, blocks) if err != nil { t.Fatal(err) } diff --git a/rpc_api.go b/rpc_api.go index 479a4bc2..d24df6b2 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -570,9 +570,10 @@ func (rpcapi *IPFSConnectorRPCAPI) SwarmPeers(ctx context.Context, in struct{}, return nil } -// BlockPut runs IPFSConnector.BlockPut(). -func (rpcapi *IPFSConnectorRPCAPI) BlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error { - return rpcapi.ipfs.BlockPut(ctx, in) +// BlockStream runs IPFSConnector.BlockStream(). +func (rpcapi *IPFSConnectorRPCAPI) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta, out chan<- struct{}) error { + close(out) + return rpcapi.ipfs.BlockStream(ctx, in) } // BlockGet runs IPFSConnector.BlockGet(). diff --git a/rpc_policy.go b/rpc_policy.go index 1d7eadf8..26e92069 100644 --- a/rpc_policy.go +++ b/rpc_policy.go @@ -47,16 +47,17 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{ "PinTracker.Untrack": RPCClosed, // IPFSConnector methods - "IPFSConnector.BlockGet": RPCClosed, - "IPFSConnector.BlockPut": RPCTrusted, // Called from Add() - "IPFSConnector.ConfigKey": RPCClosed, - "IPFSConnector.Pin": RPCClosed, - "IPFSConnector.PinLs": RPCClosed, - "IPFSConnector.PinLsCid": RPCClosed, - "IPFSConnector.RepoStat": RPCTrusted, // Called in broadcast from proxy/repo/stat - "IPFSConnector.Resolve": RPCClosed, - "IPFSConnector.SwarmPeers": RPCTrusted, // Called in ConnectGraph - "IPFSConnector.Unpin": RPCClosed, + "IPFSConnector.BlockGet": RPCClosed, + "IPFSConnector.BlockPut": RPCClosed, // Not used - replaced by BlockStream + "IPFSConnector.BlockStream": RPCTrusted, // Called by adders + "IPFSConnector.ConfigKey": RPCClosed, + "IPFSConnector.Pin": RPCClosed, + "IPFSConnector.PinLs": RPCClosed, + "IPFSConnector.PinLsCid": RPCClosed, + "IPFSConnector.RepoStat": RPCTrusted, // Called in broadcast from proxy/repo/stat + "IPFSConnector.Resolve": RPCClosed, + "IPFSConnector.SwarmPeers": RPCTrusted, // Called in ConnectGraph + "IPFSConnector.Unpin": RPCClosed, // Consensus methods "Consensus.AddPeer": RPCTrusted, // Called by Raft/redirect to leader diff --git a/rpcutil/policygen/policygen.go b/rpcutil/policygen/policygen.go index 95d74c0e..c2fdb34d 100644 --- a/rpcutil/policygen/policygen.go +++ b/rpcutil/policygen/policygen.go @@ -24,20 +24,21 @@ func rpcTypeStr(t cluster.RPCEndpointType) string { } var comments = map[string]string{ - "Cluster.PeerAdd": "Used by Join()", - "Cluster.Peers": "Used by ConnectGraph()", - "Cluster.Pins": "Used in stateless tracker, ipfsproxy, restapi", - "PinTracker.Recover": "Called in broadcast from Recover()", - "PinTracker.RecoverAll": "Broadcast in RecoverAll unimplemented", - "Pintracker.Status": "Called in broadcast from Status()", - "Pintracker.StatusAll": "Called in broadcast from StatusAll()", - "IPFSConnector.BlockPut": "Called from Add()", - "IPFSConnector.RepoStat": "Called in broadcast from proxy/repo/stat", - "IPFSConnector.SwarmPeers": "Called in ConnectGraph", - "Consensus.AddPeer": "Called by Raft/redirect to leader", - "Consensus.LogPin": "Called by Raft/redirect to leader", - "Consensus.LogUnpin": "Called by Raft/redirect to leader", - "Consensus.RmPeer": "Called by Raft/redirect to leader", + "Cluster.PeerAdd": "Used by Join()", + "Cluster.Peers": "Used by ConnectGraph()", + "Cluster.Pins": "Used in stateless tracker, ipfsproxy, restapi", + "PinTracker.Recover": "Called in broadcast from Recover()", + "PinTracker.RecoverAll": "Broadcast in RecoverAll unimplemented", + "Pintracker.Status": "Called in broadcast from Status()", + "Pintracker.StatusAll": "Called in broadcast from StatusAll()", + "IPFSConnector.BlockPut": "Not used - replaced by BlockStream", + "IPFSConnector.BlockStream": "Called by adders", + "IPFSConnector.RepoStat": "Called in broadcast from proxy/repo/stat", + "IPFSConnector.SwarmPeers": "Called in ConnectGraph", + "Consensus.AddPeer": "Called by Raft/redirect to leader", + "Consensus.LogPin": "Called by Raft/redirect to leader", + "Consensus.LogUnpin": "Called by Raft/redirect to leader", + "Consensus.RmPeer": "Called by Raft/redirect to leader", } func main() { diff --git a/test/ipfs_mock.go b/test/ipfs_mock.go index 1b8edf3e..f4fa00fc 100644 --- a/test/ipfs_mock.go +++ b/test/ipfs_mock.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/http/httptest" @@ -367,48 +368,62 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { j, _ := json.Marshal(resp) w.Write(j) case "block/put": - // Get the data and retun the hash - mpr, err := r.MultipartReader() - if err != nil { - goto ERROR - } - part, err := mpr.NextPart() - if err != nil { - goto ERROR - } - data, err := ioutil.ReadAll(part) - if err != nil { - goto ERROR - } - // Parse cid from data and format and add to mock block-store + w.Header().Set("Trailer", "X-Stream-Error") + query := r.URL.Query() formatStr := query.Get("format") format := cid.Codecs[formatStr] mhType := multihash.Names[query.Get("mhtype")] mhLen, _ := strconv.Atoi(query.Get("mhLen")) - var builder cid.Builder - if formatStr == "v0" && mhType == multihash.SHA2_256 { - builder = cid.V0Builder{} - } else { - builder = cid.V1Builder{ - Codec: format, - MhType: mhType, - MhLength: mhLen, - } - } - - c, err := builder.Sum(data) + // Get the data and retun the hash + mpr, err := r.MultipartReader() if err != nil { goto ERROR } - m.BlockStore[c.String()] = data - resp := mockBlockPutResp{ - Key: c.String(), + w.WriteHeader(http.StatusOK) + + for { + part, err := mpr.NextPart() + if err == io.EOF { + return + } + if err != nil { + w.Header().Set("X-Stream-Error", err.Error()) + return + } + data, err := ioutil.ReadAll(part) + if err != nil { + w.Header().Set("X-Stream-Error", err.Error()) + return + } + // Parse cid from data and format and add to mock block-store + + var builder cid.Builder + if formatStr == "v0" && mhType == multihash.SHA2_256 { + builder = cid.V0Builder{} + } else { + builder = cid.V1Builder{ + Codec: format, + MhType: mhType, + MhLength: mhLen, + } + } + + c, err := builder.Sum(data) + if err != nil { + w.Header().Set("X-Stream-Error", err.Error()) + return + } + m.BlockStore[c.String()] = data + + resp := mockBlockPutResp{ + Key: c.String(), + } + j, _ := json.Marshal(resp) + w.Write(j) } - j, _ := json.Marshal(resp) - w.Write(j) case "block/get": query := r.URL.Query() arg, ok := query["arg"] diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index 0acd7c0c..894be139 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -584,7 +584,8 @@ func (mock *mockIPFSConnector) RepoStat(ctx context.Context, in struct{}, out *a return nil } -func (mock *mockIPFSConnector) BlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error { +func (mock *mockIPFSConnector) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta, out chan<- struct{}) error { + close(out) return nil }