Merge pull request #1614 from ipfs/feat/810-streaming-blocks

Adders: stream blocks to destinations
Hector Sanjuan 2022-03-24 18:56:03 +01:00 committed by GitHub
commit 5207cb34ae
22 changed files with 511 additions and 242 deletions
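
The change replaces the one-request-per-block IPFSConnector.BlockPut RPC with a streaming IPFSConnector.BlockStream call: the adders push api.NodeWithMeta blocks into a channel, and a BlockStreamer forwards everything received on it to the allocated peers over a single go-libp2p-gorpc MultiStream request. Below is a minimal sketch of the producer-side pattern, condensed from the sharding and single DAG services changed in this diff; the function name, import paths and the simplified error handling are illustrative and not part of the commit.

package example

import (
	"context"

	"github.com/ipfs/ipfs-cluster/adder"
	"github.com/ipfs/ipfs-cluster/api"

	ipld "github.com/ipfs/go-ipld-format"
	peer "github.com/libp2p/go-libp2p-core/peer"
	rpc "github.com/libp2p/go-libp2p-gorpc"
)

// streamNodes feeds blocks into a channel, lets a BlockStreamer fan them out
// to the destination peers, and then waits for the stream to finish.
func streamNodes(ctx context.Context, rpcClient *rpc.Client, dests []peer.ID, nodes []ipld.Node) error {
	blocks := make(chan api.NodeWithMeta, 256)
	bs := adder.NewBlockStreamer(ctx, rpcClient, dests, blocks)

	go func() {
		// Closing the channel is what ends the stream.
		defer close(blocks)
		for _, n := range nodes {
			select {
			case <-ctx.Done():
				return
			case blocks <- adder.IpldNodeToNodeWithMeta(n):
			}
		}
	}()

	// Wait until the streamer is done and check whether at least one
	// destination received all the blocks.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-bs.Done():
	}
	return bs.Err()
}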

View File

@ -32,7 +32,7 @@ func TestAdd(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
ci, err := clusters[0].AddFile(r, params)
ci, err := clusters[0].AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}
@ -67,7 +67,7 @@ func TestAdd(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
ci, err := clusters[2].AddFile(r, params)
ci, err := clusters[2].AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}
@ -119,7 +119,7 @@ func TestAddWithUserAllocations(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
ci, err := clusters[0].AddFile(r, params)
ci, err := clusters[0].AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}
@ -167,7 +167,7 @@ func TestAddPeerDown(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
ci, err := clusters[1].AddFile(r, params)
ci, err := clusters[1].AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}
@ -218,7 +218,7 @@ func TestAddOnePeerFails(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_, err := clusters[0].AddFile(r, params)
_, err := clusters[0].AddFile(context.Background(), r, params)
if err != nil {
t.Error(err)
}
@ -276,7 +276,7 @@ func TestAddAllPeersFail(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_, err := clusters[0].AddFile(r, params)
_, err := clusters[0].AddFile(context.Background(), r, params)
if err != adder.ErrBlockAdder {
t.Error("expected ErrBlockAdder. Got: ", err)
}

View File

@ -68,18 +68,18 @@ type Adder struct {
// whenever a block is processed. They contain information
// about the block, the CID, the Name etc. and are mostly
// meant to be streamed back to the user.
output chan *api.AddedOutput
output chan api.AddedOutput
}
// New returns a new Adder with the given ClusterDAGService, add options and a
// channel to send updates during the adding process.
//
// An Adder may only be used once.
func New(ds ClusterDAGService, p api.AddParams, out chan *api.AddedOutput) *Adder {
func New(ds ClusterDAGService, p api.AddParams, out chan api.AddedOutput) *Adder {
// Discard all progress update output as the caller has not provided
// a channel for them to listen on.
if out == nil {
out = make(chan *api.AddedOutput, 100)
out = make(chan api.AddedOutput, 100)
go func() {
for range out {
}
@ -188,7 +188,7 @@ type ipfsAdder struct {
*ipfsadd.Adder
}
func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan *api.AddedOutput) (*ipfsAdder, error) {
func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan api.AddedOutput) (*ipfsAdder, error) {
iadder, err := ipfsadd.NewAdder(ctx, dgs, dgs.Allocations)
if err != nil {
logger.Error(err)
@ -253,10 +253,10 @@ type carAdder struct {
ctx context.Context
dgs ClusterDAGService
params api.AddParams
output chan *api.AddedOutput
output chan api.AddedOutput
}
func newCarAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan *api.AddedOutput) (*carAdder, error) {
func newCarAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParams, out chan api.AddedOutput) (*carAdder, error) {
return &carAdder{
ctx: ctx,
dgs: dgs,
@ -319,7 +319,7 @@ func (ca *carAdder) Add(name string, fn files.Node) (cid.Cid, error) {
}
}
ca.output <- &api.AddedOutput{
ca.output <- api.AddedOutput{
Name: name,
Cid: root,
Bytes: bytes,

View File

@ -30,19 +30,19 @@ func AddMultipartHTTPHandler(
params api.AddParams,
reader *multipart.Reader,
w http.ResponseWriter,
outputTransform func(*api.AddedOutput) interface{},
outputTransform func(api.AddedOutput) interface{},
) (cid.Cid, error) {
var dags adder.ClusterDAGService
output := make(chan *api.AddedOutput, 200)
output := make(chan api.AddedOutput, 200)
if params.Shard {
dags = sharding.New(rpc, params, output)
dags = sharding.New(ctx, rpc, params, output)
} else {
dags = single.New(rpc, params, params.Local)
dags = single.New(ctx, rpc, params, params.Local)
}
if outputTransform == nil {
outputTransform = func(in *api.AddedOutput) interface{} { return in }
outputTransform = func(in api.AddedOutput) interface{} { return in }
}
// This must be application/json otherwise go-ipfs client
@ -112,7 +112,7 @@ func AddMultipartHTTPHandler(
return root, err
}
func streamOutput(w http.ResponseWriter, output chan *api.AddedOutput, transform func(*api.AddedOutput) interface{}) {
func streamOutput(w http.ResponseWriter, output chan api.AddedOutput, transform func(api.AddedOutput) interface{}) {
flusher, flush := w.(http.Flusher)
enc := json.NewEncoder(w)
for v := range output {
@ -127,7 +127,7 @@ func streamOutput(w http.ResponseWriter, output chan *api.AddedOutput, transform
}
}
func buildOutput(output chan *api.AddedOutput, transform func(*api.AddedOutput) interface{}) []interface{} {
func buildOutput(output chan api.AddedOutput, transform func(api.AddedOutput) interface{}) []interface{} {
var finalOutput []interface{}
for v := range output {
finalOutput = append(finalOutput, transform(v))

View File

@ -51,7 +51,7 @@ type Adder struct {
ctx context.Context
dagService ipld.DAGService
allocsFun func() []peer.ID
Out chan *api.AddedOutput
Out chan api.AddedOutput
Progress bool
Trickle bool
RawLeaves bool
@ -425,9 +425,9 @@ func (adder *Adder) addDir(path string, dir files.Directory, toplevel bool) erro
}
// outputDagnode sends dagnode info over the output channel.
// Cluster: we use *api.AddedOutput instead of coreiface events
// Cluster: we use api.AddedOutput instead of coreiface events
// and make this an adder method to be able to prefix.
func (adder *Adder) outputDagnode(out chan *api.AddedOutput, name string, dn ipld.Node) error {
func (adder *Adder) outputDagnode(out chan api.AddedOutput, name string, dn ipld.Node) error {
if out == nil {
return nil
}
@ -445,7 +445,7 @@ func (adder *Adder) outputDagnode(out chan *api.AddedOutput, name string, dn ipl
// account for this here.
name = filepath.Join(adder.OutputPrefix, name)
out <- &api.AddedOutput{
out <- api.AddedOutput{
Cid: dn.Cid(),
Name: name,
Size: s,
@ -458,7 +458,7 @@ func (adder *Adder) outputDagnode(out chan *api.AddedOutput, name string, dn ipl
type progressReader struct {
file io.Reader
path string
out chan *api.AddedOutput
out chan api.AddedOutput
bytes int64
lastProgress int64
}
@ -469,7 +469,7 @@ func (i *progressReader) Read(p []byte) (int, error) {
i.bytes += int64(n)
if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
i.lastProgress = i.bytes
i.out <- &api.AddedOutput{
i.out <- api.AddedOutput{
Name: i.path,
Bytes: uint64(i.bytes),
}

View File

@ -30,10 +30,11 @@ var logger = logging.Logger("shardingdags")
type DAGService struct {
adder.BaseDAGService
ctx context.Context
rpcClient *rpc.Client
addParams api.AddParams
output chan<- *api.AddedOutput
output chan<- api.AddedOutput
addedSet *cid.Set
@ -50,11 +51,12 @@ type DAGService struct {
}
// New returns a new ClusterDAGService, which uses the given rpc client to perform
// Allocate, IPFSBlockPut and Pin requests to other cluster components.
func New(rpc *rpc.Client, opts api.AddParams, out chan<- *api.AddedOutput) *DAGService {
// Allocate, IPFSStream and Pin requests to other cluster components.
func New(ctx context.Context, rpc *rpc.Client, opts api.AddParams, out chan<- api.AddedOutput) *DAGService {
// use a default value for this regardless of what is provided.
opts.Mode = api.PinModeRecursive
return &DAGService{
ctx: ctx,
rpcClient: rpc,
addParams: opts,
output: out,
@ -93,14 +95,34 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid,
}
// PutDAG to ourselves
err = adder.NewBlockAdder(dgs.rpcClient, []peer.ID{""}).AddMany(ctx, clusterDAGNodes)
if err != nil {
blocks := make(chan api.NodeWithMeta, 256)
go func() {
defer close(blocks)
for _, n := range clusterDAGNodes {
select {
case <-ctx.Done():
logger.Error(ctx.Err())
return //abort
case blocks <- adder.IpldNodeToNodeWithMeta(n):
}
}
}()
// Stream these blocks and wait until we are done.
bs := adder.NewBlockStreamer(ctx, dgs.rpcClient, []peer.ID{""}, blocks)
select {
case <-ctx.Done():
return dataRoot, ctx.Err()
case <-bs.Done():
}
if err := bs.Err(); err != nil {
return dataRoot, err
}
clusterDAG := clusterDAGNodes[0].Cid()
dgs.sendOutput(&api.AddedOutput{
dgs.sendOutput(api.AddedOutput{
Name: fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name),
Cid: clusterDAG,
Size: dgs.totalSize,
@ -174,7 +196,8 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error {
if shard == nil {
logger.Infof("new shard for '%s': #%d", dgs.addParams.Name, len(dgs.shards))
var err error
shard, err = newShard(ctx, dgs.rpcClient, dgs.addParams.PinOptions)
// important: shards use the DAGService context.
shard, err = newShard(dgs.ctx, ctx, dgs.rpcClient, dgs.addParams.PinOptions)
if err != nil {
return err
}
@ -189,7 +212,7 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error {
// add the block to it if it fits and return
if shard.Size()+size < shard.Limit() {
shard.AddLink(ctx, n.Cid(), size)
return dgs.currentShard.ba.Add(ctx, n)
return dgs.currentShard.sendBlock(ctx, n)
}
logger.Debugf("shard %d full: block: %d. shard: %d. limit: %d",
@ -246,7 +269,7 @@ Ingest Rate: %s/s
}
func (dgs *DAGService) sendOutput(ao *api.AddedOutput) {
func (dgs *DAGService) sendOutput(ao api.AddedOutput) {
if dgs.output != nil {
dgs.output <- ao
}
@ -269,7 +292,7 @@ func (dgs *DAGService) flushCurrentShard(ctx context.Context) (cid.Cid, error) {
dgs.shards[fmt.Sprintf("%d", lens)] = shardCid
dgs.previousShard = shardCid
dgs.currentShard = nil
dgs.sendOutput(&api.AddedOutput{
dgs.sendOutput(api.AddedOutput{
Name: fmt.Sprintf("shard-%d", lens),
Cid: shardCid,
Size: shard.Size(),

View File

@ -27,8 +27,11 @@ type testRPC struct {
pins sync.Map
}
func (rpcs *testRPC) BlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error {
rpcs.blocks.Store(in.Cid.String(), in.Data)
func (rpcs *testRPC) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta, out chan<- struct{}) error {
defer close(out)
for n := range in {
rpcs.blocks.Store(n.Cid.String(), n.Data)
}
return nil
}
@ -77,9 +80,9 @@ func makeAdder(t *testing.T, params api.AddParams) (*adder.Adder, *testRPC) {
}
client := rpc.NewClientWithServer(nil, "mock", server)
out := make(chan *api.AddedOutput, 1)
out := make(chan api.AddedOutput, 1)
dags := New(client, params, out)
dags := New(context.Background(), client, params, out)
add := adder.New(dags, params, out)
go func() {

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/api"
@ -18,10 +19,12 @@ import (
// a peer to be block-put and will be part of the same shard in the
// cluster DAG.
type shard struct {
ctx context.Context
rpc *rpc.Client
allocations []peer.ID
pinOptions api.PinOptions
ba *adder.BlockAdder
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
// dagNode represents a node with links and will be converted
// to Cbor.
dagNode map[string]cid.Cid
@ -29,7 +32,7 @@ type shard struct {
sizeLimit uint64
}
func newShard(ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard, error) {
func newShard(globalCtx context.Context, ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard, error) {
allocs, err := adder.BlockAllocate(ctx, rpc, opts)
if err != nil {
return nil, err
@ -47,11 +50,15 @@ func newShard(ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard
// TODO (hector): get latest metrics for allocations, adjust sizeLimit
// to minimum. This can be done later.
blocks := make(chan api.NodeWithMeta, 256)
return &shard{
ctx: globalCtx,
rpc: rpc,
allocations: allocs,
pinOptions: opts,
ba: adder.NewBlockAdder(rpc, allocs),
bs: adder.NewBlockStreamer(globalCtx, rpc, allocs, blocks),
blocks: blocks,
dagNode: make(map[string]cid.Cid),
currentSize: 0,
sizeLimit: opts.ShardSize,
@ -77,6 +84,15 @@ func (sh *shard) Allocations() []peer.ID {
return sh.allocations
}
func (sh *shard) sendBlock(ctx context.Context, n ipld.Node) error {
select {
case <-ctx.Done():
return ctx.Err()
case sh.blocks <- adder.IpldNodeToNodeWithMeta(n):
return nil
}
}
// Flush completes the allocation of this shard by building a CBOR node
// and adding it to IPFS, then pinning it in cluster. It returns the Cid of the
// shard.
@ -87,8 +103,21 @@ func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid,
return cid.Undef, err
}
err = sh.ba.AddMany(ctx, nodes)
if err != nil {
for _, n := range nodes {
err = sh.sendBlock(ctx, n)
if err != nil {
close(sh.blocks)
return cid.Undef, err
}
}
close(sh.blocks)
select {
case <-ctx.Done():
return cid.Undef, ctx.Err()
case <-sh.bs.Done():
}
if err := sh.bs.Err(); err != nil {
return cid.Undef, err
}

View File

@ -24,30 +24,38 @@ var _ = logger // otherwise unused
type DAGService struct {
adder.BaseDAGService
ctx context.Context
rpcClient *rpc.Client
dests []peer.ID
addParams api.AddParams
local bool
ba *adder.BlockAdder
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
}
// New returns a new Adder with the given rpc Client. The client is used
// to perform calls to IPFS.BlockPut and Pin content on Cluster.
func New(rpc *rpc.Client, opts api.AddParams, local bool) *DAGService {
// to perform calls to IPFS.BlockStream and Pin content on Cluster.
func New(ctx context.Context, rpc *rpc.Client, opts api.AddParams, local bool) *DAGService {
// ensure we don't Add something and pin it in direct mode.
opts.Mode = api.PinModeRecursive
return &DAGService{
ctx: ctx,
rpcClient: rpc,
dests: nil,
addParams: opts,
local: local,
blocks: make(chan api.NodeWithMeta, 256),
}
}
// Add puts the given node in the destination peers.
func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
// FIXME: can't this happen on initialization? Perhaps the point here
// is the adder only allocates and starts streaming when the first
// block arrives and not on creation.
if dgs.dests == nil {
dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.addParams.PinOptions)
if err != nil {
@ -76,17 +84,39 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
dgs.dests[len(dgs.dests)-1] = localPid
}
dgs.ba = adder.NewBlockAdder(dgs.rpcClient, []peer.ID{localPid})
dgs.bs = adder.NewBlockStreamer(dgs.ctx, dgs.rpcClient, []peer.ID{localPid}, dgs.blocks)
} else {
dgs.ba = adder.NewBlockAdder(dgs.rpcClient, dgs.dests)
dgs.bs = adder.NewBlockStreamer(dgs.ctx, dgs.rpcClient, dgs.dests, dgs.blocks)
}
}
return dgs.ba.Add(ctx, node)
select {
case <-ctx.Done():
return ctx.Err()
case <-dgs.ctx.Done():
return ctx.Err()
case dgs.blocks <- adder.IpldNodeToNodeWithMeta(node):
return nil
}
}
// Finalize pins the last Cid added to this DAGService.
func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) {
close(dgs.blocks)
select {
case <-dgs.ctx.Done():
return root, ctx.Err()
case <-ctx.Done():
return root, ctx.Err()
case <-dgs.bs.Done():
}
// If the streamer failed to put blocks.
if err := dgs.bs.Err(); err != nil {
return root, err
}
// Do not pin, just block put.
// Why? Because some people are uploading CAR files with partial DAGs
// and ideally they should be pinning only when the last partial CAR

View File

@ -23,8 +23,11 @@ type testClusterRPC struct {
pins sync.Map
}
func (rpcs *testIPFSRPC) BlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error {
rpcs.blocks.Store(in.Cid.String(), in)
func (rpcs *testIPFSRPC) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta, out chan<- struct{}) error {
defer close(out)
for n := range in {
rpcs.blocks.Store(n.Cid.String(), n)
}
return nil
}
@ -61,7 +64,7 @@ func TestAdd(t *testing.T) {
params := api.DefaultAddParams()
params.Wrap = true
dags := New(client, params, false)
dags := New(context.Background(), client, params, false)
add := adder.New(dags, params, nil)
sth := test.NewShardingTestHelper()
@ -83,7 +86,7 @@ func TestAdd(t *testing.T) {
for _, c := range expected {
_, ok := ipfsRPC.blocks.Load(c)
if !ok {
t.Error("no IPFS.BlockPut for block", c)
t.Error("block was not added to IPFS", c)
}
}
@ -109,7 +112,7 @@ func TestAdd(t *testing.T) {
params := api.DefaultAddParams()
params.Layout = "trickle"
dags := New(client, params, false)
dags := New(context.Background(), client, params, false)
add := adder.New(dags, params, nil)
sth := test.NewShardingTestHelper()

View File

@ -4,9 +4,10 @@ import (
"context"
"errors"
"fmt"
"sync"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"
"go.uber.org/multierr"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
@ -18,82 +19,90 @@ import (
// block fails on all of them.
var ErrBlockAdder = errors.New("failed to put block on all destinations")
// BlockAdder implements "github.com/ipfs/go-ipld-format".NodeAdder.
// It helps sending nodes to multiple destinations, as long as one of
// them is still working.
type BlockAdder struct {
// BlockStreamer helps streaming nodes to multiple destinations, as long as
// one of them is still working.
type BlockStreamer struct {
dests []peer.ID
rpcClient *rpc.Client
blocks <-chan api.NodeWithMeta
ctx context.Context
cancel context.CancelFunc
errMu sync.Mutex
err error
}
// NewBlockAdder creates a BlockAdder given an rpc client and allocated peers.
func NewBlockAdder(rpcClient *rpc.Client, dests []peer.ID) *BlockAdder {
return &BlockAdder{
// NewBlockStreamer creates a BlockStreamer given an rpc client, allocated
// peers and a channel on which the blocks to stream are received.
func NewBlockStreamer(ctx context.Context, rpcClient *rpc.Client, dests []peer.ID, blocks <-chan api.NodeWithMeta) *BlockStreamer {
bsCtx, cancel := context.WithCancel(ctx)
bs := BlockStreamer{
ctx: bsCtx,
cancel: cancel,
dests: dests,
rpcClient: rpcClient,
blocks: blocks,
err: nil,
}
go bs.streamBlocks()
return &bs
}
// Add puts an ipld node to the allocated destinations.
func (ba *BlockAdder) Add(ctx context.Context, node ipld.Node) error {
nodeSerial := ipldNodeToNodeWithMeta(node)
// Done returns a channel which gets closed when the BlockStreamer has
// finished.
func (bs *BlockStreamer) Done() <-chan struct{} {
return bs.ctx.Done()
}
ctxs, cancels := rpcutil.CtxsWithCancel(ctx, len(ba.dests))
defer rpcutil.MultiCancel(cancels)
func (bs *BlockStreamer) setErr(err error) {
bs.errMu.Lock()
bs.err = err
bs.errMu.Unlock()
}
logger.Debugf("block put %s to %s", nodeSerial.Cid, ba.dests)
errs := ba.rpcClient.MultiCall(
ctxs,
ba.dests,
// Err returns any error that happened during the operation of the
// BlockStreamer, for example when blocks could not be put to all nodes.
func (bs *BlockStreamer) Err() error {
bs.errMu.Lock()
defer bs.errMu.Unlock()
return bs.err
}
func (bs *BlockStreamer) streamBlocks() {
defer bs.cancel()
// Nothing should be sent on out. We drain it anyway, just in case.
out := make(chan struct{})
go func() {
for range out {
}
}()
errs := bs.rpcClient.MultiStream(
bs.ctx,
bs.dests,
"IPFSConnector",
"BlockPut",
nodeSerial,
rpcutil.RPCDiscardReplies(len(ba.dests)),
"BlockStream",
bs.blocks,
out,
)
var successfulDests []peer.ID
numErrs := 0
for i, e := range errs {
if e != nil {
logger.Errorf("BlockPut on %s: %s", ba.dests[i], e)
numErrs++
}
// RPCErrors include server errors (wrong RPC methods), client
// errors (creating, writing or reading streams) and
// authorization errors, but not IPFS errors from a failed blockput
// for example.
if rpc.IsRPCError(e) {
continue
}
successfulDests = append(successfulDests, ba.dests[i])
// this eliminates any nil errors.
combinedErrors := multierr.Combine(errs...)
if len(multierr.Errors(combinedErrors)) == len(bs.dests) {
logger.Error(combinedErrors)
bs.setErr(ErrBlockAdder)
} else {
logger.Warning("there were errors streaming blocks, but at least one destination succeeded")
logger.Warning(combinedErrors)
}
// If all requests resulted in errors, fail.
// Successful dests will have members when no errors happened
// or when an error happened but it was not an RPC error.
// As long as BlockPut worked in 1 destination, we move on.
if numErrs == len(ba.dests) || len(successfulDests) == 0 {
return ErrBlockAdder
}
ba.dests = successfulDests
return nil
}
// AddMany puts multiple ipld nodes to allocated destinations.
func (ba *BlockAdder) AddMany(ctx context.Context, nodes []ipld.Node) error {
for _, node := range nodes {
err := ba.Add(ctx, node)
if err != nil {
return err
}
}
return nil
}
// ipldNodeToNodeSerial converts an ipld.Node to NodeWithMeta.
func ipldNodeToNodeWithMeta(n ipld.Node) api.NodeWithMeta {
// IpldNodeToNodeWithMeta converts an ipld.Node to api.NodeWithMeta.
func IpldNodeToNodeWithMeta(n ipld.Node) api.NodeWithMeta {
size, err := n.Size()
if err != nil {
logger.Warn(err)
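
For reference, this is the shape of a streaming RPC endpoint as consumed by the MultiStream call above and as implemented by IPFSConnectorRPCAPI.BlockStream and the test mocks later in this diff: the handler drains the incoming channel and closes the reply channel once it has nothing to send back. A minimal sketch follows, with a hypothetical in-memory store standing in for the real IPFS connector.

package example

import (
	"context"
	"sync"

	"github.com/ipfs/ipfs-cluster/api"
)

// blockService is an illustrative stand-in for the component receiving blocks.
type blockService struct {
	blocks sync.Map
}

// BlockStream drains the incoming block channel and closes the (unused)
// reply channel, which is how a go-libp2p-gorpc streaming handler signals
// that it will send no replies.
func (s *blockService) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta, out chan<- struct{}) error {
	defer close(out)
	for n := range in {
		s.blocks.Store(n.Cid.String(), n.Data)
	}
	return nil
}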

View File

@ -615,7 +615,7 @@ func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
logger.Warnf("Proxy/add does not support all IPFS params. Current options: %+v", params)
outputTransform := func(in *api.AddedOutput) interface{} {
outputTransform := func(in api.AddedOutput) interface{} {
cidStr := ""
if in.Cid.Defined() {
cidStr = in.Cid.String()

View File

@ -1728,17 +1728,17 @@ func (c *Cluster) UnpinPath(ctx context.Context, path string) (api.Pin, error) {
// pipeline is used to DAGify the file. Depending on input parameters this
// DAG can be added locally to the calling cluster peer's ipfs repo, or
// sharded across the entire cluster.
func (c *Cluster) AddFile(reader *multipart.Reader, params api.AddParams) (cid.Cid, error) {
func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params api.AddParams) (cid.Cid, error) {
// TODO: add context param and tracing
var dags adder.ClusterDAGService
if params.Shard {
dags = sharding.New(c.rpcClient, params, nil)
dags = sharding.New(ctx, c.rpcClient, params, nil)
} else {
dags = single.New(c.rpcClient, params, params.Local)
dags = single.New(ctx, c.rpcClient, params, params.Local)
}
add := adder.New(dags, params, nil)
return add.FromMultipart(c.ctx, reader)
return add.FromMultipart(ctx, reader)
}
// Version returns the current IPFS Cluster version.

View File

@ -134,8 +134,10 @@ func (ipfs *mockConnector) Resolve(ctx context.Context, path string) (cid.Cid, e
func (ipfs *mockConnector) ConnectSwarms(ctx context.Context) error { return nil }
func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil }
func (ipfs *mockConnector) BlockPut(ctx context.Context, nwm api.NodeWithMeta) error {
ipfs.blocks.Store(nwm.Cid.String(), nwm.Data)
func (ipfs *mockConnector) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta) error {
for n := range in {
ipfs.blocks.Store(n.Cid.String(), n.Data)
}
return nil
}
@ -372,7 +374,7 @@ func TestAddFile(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
c, err := cl.AddFile(r, params)
c, err := cl.AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}
@ -401,7 +403,7 @@ func TestAddFile(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
c, err := cl.AddFile(r, params)
c, err := cl.AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}
@ -431,7 +433,7 @@ func TestUnpinShard(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
root, err := cl.AddFile(r, params)
root, err := cl.AddFile(context.Background(), r, params)
if err != nil {
t.Fatal(err)
}

View File

@ -95,8 +95,8 @@ type IPFSConnector interface {
RepoGC(context.Context) (api.RepoGC, error)
// Resolve returns a cid given a path.
Resolve(context.Context, string) (cid.Cid, error)
// BlockPut directly adds a block of data to the IPFS repo.
BlockPut(context.Context, api.NodeWithMeta) error
// BlockStream adds a stream of blocks to IPFS.
BlockStream(context.Context, <-chan api.NodeWithMeta) error
// BlockGet retrieves the raw data of an IPFS block.
BlockGet(context.Context, cid.Cid) ([]byte, error)
}

View File

@ -2150,7 +2150,7 @@ func TestClustersFollowerMode(t *testing.T) {
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
_, err = clusters[1].AddFile(r, params)
_, err = clusters[1].AddFile(ctx, r, params)
if err != errFollowerMode {
t.Error("expected follower mode error")
}

View File

@ -19,6 +19,7 @@ import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/observations"
"go.uber.org/multierr"
cid "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
@ -118,7 +119,7 @@ type ipfsSwarmPeersResp struct {
}
type ipfsBlockPutResp struct {
Key string
Key api.Cid
Size int
}
@ -903,35 +904,128 @@ func (ipfs *Connector) SwarmPeers(ctx context.Context) ([]peer.ID, error) {
return swarm, nil
}
// BlockPut triggers an ipfs block put on the given data, inserting the block
// into the ipfs daemon's repo.
func (ipfs *Connector) BlockPut(ctx context.Context, b api.NodeWithMeta) error {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockPut")
defer span.End()
// chanDirectory implements the files.Directory interface.
type chanDirectory struct {
iterator files.DirIterator
}
logger.Debugf("putting block to IPFS: %s", b.Cid)
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
defer ipfs.updateInformerMetric(ctx)
// Close is a no-op and it is not used.
func (cd *chanDirectory) Close() error {
return nil
}
mapDir := files.NewMapDirectory(
map[string]files.Node{ // IPFS reqs require a wrapping directory
"": files.NewBytesFile(b.Data),
},
)
// not implemented, I think not needed for multipart.
func (cd *chanDirectory) Size() (int64, error) {
return 0, nil
}
multiFileR := files.NewMultiFileReader(mapDir, true)
func (cd *chanDirectory) Entries() files.DirIterator {
return cd.iterator
}
// chanIterator implements the files.DirIterator interface.
type chanIterator struct {
ctx context.Context
blocks <-chan api.NodeWithMeta
current api.NodeWithMeta
peeked api.NodeWithMeta
done bool
err error
seenMu sync.Mutex
seen *cid.Set
}
func (ci *chanIterator) Name() string {
if !ci.current.Cid.Defined() {
return ""
}
return ci.current.Cid.String()
}
// Node returns the current block's data wrapped in a BytesFile.
func (ci *chanIterator) Node() files.Node {
if !ci.current.Cid.Defined() {
return nil
}
ci.seenMu.Lock()
if ci.seen.Visit(ci.current.Cid) {
logger.Debugf("block %s", ci.current.Cid)
}
ci.seenMu.Unlock()
return files.NewBytesFile(ci.current.Data)
}
func (ci *chanIterator) Seen(c api.Cid) bool {
ci.seenMu.Lock()
has := ci.seen.Has(cid.Cid(c))
ci.seen.Remove(cid.Cid(c))
ci.seenMu.Unlock()
return has
}
func (ci *chanIterator) Done() bool {
return ci.done
}
// Peek reads one block from the channel but saves it so that Next also
// returns it.
func (ci *chanIterator) Peek() (api.NodeWithMeta, bool) {
if ci.done {
return api.NodeWithMeta{}, false
}
select {
case <-ci.ctx.Done():
return api.NodeWithMeta{}, false
case next, ok := <-ci.blocks:
if !ok {
return api.NodeWithMeta{}, false
}
ci.peeked = next
return next, true
}
}
func (ci *chanIterator) Next() bool {
if ci.done {
return false
}
if ci.peeked.Cid.Defined() {
ci.current = ci.peeked
ci.peeked = api.NodeWithMeta{}
return true
}
select {
case <-ci.ctx.Done():
ci.done = true
ci.err = ci.ctx.Err()
return false
case next, ok := <-ci.blocks:
if !ok {
ci.done = true
return false
}
ci.current = next
return true
}
}
func (ci *chanIterator) Err() error {
return ci.err
}
func blockPutQuery(prefix cid.Prefix) (url.Values, error) {
q := make(url.Values, 3)
prefix := b.Cid.Prefix()
format, ok := cid.CodecToStr[prefix.Codec]
if !ok {
return fmt.Errorf("cannot find name for the blocks' CID codec: %x", prefix.Codec)
return q, fmt.Errorf("cannot find name for the blocks' CID codec: %x", prefix.Codec)
}
mhType, ok := multihash.Codes[prefix.MhType]
if !ok {
return fmt.Errorf("cannot find name for the blocks' Multihash type: %x", prefix.MhType)
return q, fmt.Errorf("cannot find name for the blocks' Multihash type: %x", prefix.MhType)
}
// IPFS behaves differently when using v0 or protobuf which are
@ -944,35 +1038,76 @@ func (ipfs *Connector) BlockPut(ctx context.Context, b api.NodeWithMeta) error {
q.Set("mhtype", mhType)
q.Set("mhlen", strconv.Itoa(prefix.MhLength))
return q, nil
}
// BlockStream performs a multipart request to block/put with the blocks
// received on the channel.
func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWithMeta) error {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockStream")
defer span.End()
logger.Debug("streaming blocks to IPFS")
defer ipfs.updateInformerMetric(ctx)
var errs error
it := &chanIterator{
ctx: ctx,
blocks: blocks,
seen: cid.NewSet(),
}
dir := &chanDirectory{
iterator: it,
}
// We need to peek into the first block to know which Cid prefix we
// are writing blocks with, so that ipfs calculates the expected
// multihash (we select the hash function used). This means that all
// blocks in a stream should use the same prefix.
peek, ok := it.Peek()
if !ok {
return errors.New("BlockStream: no blocks to peek in blocks channel")
}
q, err := blockPutQuery(peek.Cid.Prefix())
if err != nil {
return err
}
url := "block/put?" + q.Encode()
contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
body, err := ipfs.postCtx(ctx, url, contentType, multiFileR)
if err != nil {
return err
// We essentially keep going on any request errors and keep putting
// blocks until we are done. We will, however, return a final error if
// there were errors along the way, but we do not abort the blocks
// stream because we could not block/put.
for !it.Done() {
multiFileR := files.NewMultiFileReader(dir, true)
contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
body, err := ipfs.postCtxStreamResponse(ctx, url, contentType, multiFileR)
if err != nil {
errs = multierr.Append(errs, err)
continue
}
dec := json.NewDecoder(body)
for {
var res ipfsBlockPutResp
err := dec.Decode(&res)
if err == io.EOF {
break
}
if err != nil {
logger.Error(err)
errs = multierr.Append(errs, err)
break
}
if !it.Seen(res.Key) {
logger.Warnf("blockPut response CID (%s) does not match any blocks sent", res.Key)
}
}
// continue until it.Done()
}
var res ipfsBlockPutResp
err = json.Unmarshal(body, &res)
if err != nil {
return err
}
logger.Debug("block/put response CID", res.Key)
respCid, err := cid.Decode(res.Key)
if err != nil {
logger.Error("cannot parse CID from BlockPut response")
return err
}
// IPFS is too brittle here. CIDv0 != CIDv1. Sending "protobuf" format
// returns CidV1. Sending "v0" format (which maps to protobuf)
// returns CidV0. Leaving this as warning.
if !respCid.Equals(b.Cid) {
logger.Warnf("blockPut response CID (%s) does not match the block sent (%s)", respCid, b.Cid)
}
return nil
return errs
}
// BlockGet retrieves an ipfs block with the given cid

View File

@ -17,7 +17,7 @@ import (
func init() {
_ = logging.Logger
//logging.SetLogLevel("*", "DEBUG")
logging.SetLogLevel("*", "DEBUG")
}
func testIPFSConnector(t *testing.T) (*Connector, *test.IpfsMock) {
@ -315,26 +315,40 @@ func TestSwarmPeers(t *testing.T) {
}
}
func TestBlockPut(t *testing.T) {
func TestBlockStream(t *testing.T) {
ctx := context.Background()
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown(ctx)
// CidV1
err := ipfs.BlockPut(ctx, api.NodeWithMeta{
blocks := make(chan api.NodeWithMeta, 10)
blocks <- api.NodeWithMeta{
Data: []byte(test.Cid4Data),
Cid: test.Cid4,
})
}
// Because this has a different prefix,
// it will produce a warning.
blocks <- api.NodeWithMeta{
Data: []byte(test.Cid5Data),
Cid: test.Cid5,
}
close(blocks)
err := ipfs.BlockStream(ctx, blocks)
if err != nil {
t.Error(err)
}
// CidV0
err = ipfs.BlockPut(ctx, api.NodeWithMeta{
// Try only adding v0 cid now
blocks2 := make(chan api.NodeWithMeta, 1)
blocks2 <- api.NodeWithMeta{
Data: []byte(test.Cid5Data),
Cid: test.Cid5,
})
}
close(blocks2)
err = ipfs.BlockStream(ctx, blocks2)
if err != nil {
t.Error(err)
}
@ -353,11 +367,13 @@ func TestBlockGet(t *testing.T) {
t.Fatal("expected to fail getting unput block")
}
// Put and then successfully get
err = ipfs.BlockPut(ctx, api.NodeWithMeta{
blocks := make(chan api.NodeWithMeta, 1)
blocks <- api.NodeWithMeta{
Data: test.ShardData,
Cid: test.ShardCid,
})
}
close(blocks)
err = ipfs.BlockStream(ctx, blocks)
if err != nil {
t.Fatal(err)
}

View File

@ -570,9 +570,10 @@ func (rpcapi *IPFSConnectorRPCAPI) SwarmPeers(ctx context.Context, in struct{},
return nil
}
// BlockPut runs IPFSConnector.BlockPut().
func (rpcapi *IPFSConnectorRPCAPI) BlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error {
return rpcapi.ipfs.BlockPut(ctx, in)
// BlockStream runs IPFSConnector.BlockStream().
func (rpcapi *IPFSConnectorRPCAPI) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta, out chan<- struct{}) error {
close(out)
return rpcapi.ipfs.BlockStream(ctx, in)
}
// BlockGet runs IPFSConnector.BlockGet().

View File

@ -47,16 +47,17 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{
"PinTracker.Untrack": RPCClosed,
// IPFSConnector methods
"IPFSConnector.BlockGet": RPCClosed,
"IPFSConnector.BlockPut": RPCTrusted, // Called from Add()
"IPFSConnector.ConfigKey": RPCClosed,
"IPFSConnector.Pin": RPCClosed,
"IPFSConnector.PinLs": RPCClosed,
"IPFSConnector.PinLsCid": RPCClosed,
"IPFSConnector.RepoStat": RPCTrusted, // Called in broadcast from proxy/repo/stat
"IPFSConnector.Resolve": RPCClosed,
"IPFSConnector.SwarmPeers": RPCTrusted, // Called in ConnectGraph
"IPFSConnector.Unpin": RPCClosed,
"IPFSConnector.BlockGet": RPCClosed,
"IPFSConnector.BlockPut": RPCClosed, // Not used - replaced by BlockStream
"IPFSConnector.BlockStream": RPCTrusted, // Called by adders
"IPFSConnector.ConfigKey": RPCClosed,
"IPFSConnector.Pin": RPCClosed,
"IPFSConnector.PinLs": RPCClosed,
"IPFSConnector.PinLsCid": RPCClosed,
"IPFSConnector.RepoStat": RPCTrusted, // Called in broadcast from proxy/repo/stat
"IPFSConnector.Resolve": RPCClosed,
"IPFSConnector.SwarmPeers": RPCTrusted, // Called in ConnectGraph
"IPFSConnector.Unpin": RPCClosed,
// Consensus methods
"Consensus.AddPeer": RPCTrusted, // Called by Raft/redirect to leader

View File

@ -24,20 +24,21 @@ func rpcTypeStr(t cluster.RPCEndpointType) string {
}
var comments = map[string]string{
"Cluster.PeerAdd": "Used by Join()",
"Cluster.Peers": "Used by ConnectGraph()",
"Cluster.Pins": "Used in stateless tracker, ipfsproxy, restapi",
"PinTracker.Recover": "Called in broadcast from Recover()",
"PinTracker.RecoverAll": "Broadcast in RecoverAll unimplemented",
"Pintracker.Status": "Called in broadcast from Status()",
"Pintracker.StatusAll": "Called in broadcast from StatusAll()",
"IPFSConnector.BlockPut": "Called from Add()",
"IPFSConnector.RepoStat": "Called in broadcast from proxy/repo/stat",
"IPFSConnector.SwarmPeers": "Called in ConnectGraph",
"Consensus.AddPeer": "Called by Raft/redirect to leader",
"Consensus.LogPin": "Called by Raft/redirect to leader",
"Consensus.LogUnpin": "Called by Raft/redirect to leader",
"Consensus.RmPeer": "Called by Raft/redirect to leader",
"Cluster.PeerAdd": "Used by Join()",
"Cluster.Peers": "Used by ConnectGraph()",
"Cluster.Pins": "Used in stateless tracker, ipfsproxy, restapi",
"PinTracker.Recover": "Called in broadcast from Recover()",
"PinTracker.RecoverAll": "Broadcast in RecoverAll unimplemented",
"Pintracker.Status": "Called in broadcast from Status()",
"Pintracker.StatusAll": "Called in broadcast from StatusAll()",
"IPFSConnector.BlockPut": "Not used - replaced by BlockStream",
"IPFSConnector.BlockStream": "Called by adders",
"IPFSConnector.RepoStat": "Called in broadcast from proxy/repo/stat",
"IPFSConnector.SwarmPeers": "Called in ConnectGraph",
"Consensus.AddPeer": "Called by Raft/redirect to leader",
"Consensus.LogPin": "Called by Raft/redirect to leader",
"Consensus.LogUnpin": "Called by Raft/redirect to leader",
"Consensus.RmPeer": "Called by Raft/redirect to leader",
}
func main() {

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
@ -367,48 +368,62 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
j, _ := json.Marshal(resp)
w.Write(j)
case "block/put":
// Get the data and return the hash
mpr, err := r.MultipartReader()
if err != nil {
goto ERROR
}
part, err := mpr.NextPart()
if err != nil {
goto ERROR
}
data, err := ioutil.ReadAll(part)
if err != nil {
goto ERROR
}
// Parse cid from data and format and add to mock block-store
w.Header().Set("Trailer", "X-Stream-Error")
query := r.URL.Query()
formatStr := query.Get("format")
format := cid.Codecs[formatStr]
mhType := multihash.Names[query.Get("mhtype")]
mhLen, _ := strconv.Atoi(query.Get("mhLen"))
var builder cid.Builder
if formatStr == "v0" && mhType == multihash.SHA2_256 {
builder = cid.V0Builder{}
} else {
builder = cid.V1Builder{
Codec: format,
MhType: mhType,
MhLength: mhLen,
}
}
c, err := builder.Sum(data)
// Get the data and return the hash
mpr, err := r.MultipartReader()
if err != nil {
goto ERROR
}
m.BlockStore[c.String()] = data
resp := mockBlockPutResp{
Key: c.String(),
w.WriteHeader(http.StatusOK)
for {
part, err := mpr.NextPart()
if err == io.EOF {
return
}
if err != nil {
w.Header().Set("X-Stream-Error", err.Error())
return
}
data, err := ioutil.ReadAll(part)
if err != nil {
w.Header().Set("X-Stream-Error", err.Error())
return
}
// Parse cid from data and format and add to mock block-store
var builder cid.Builder
if formatStr == "v0" && mhType == multihash.SHA2_256 {
builder = cid.V0Builder{}
} else {
builder = cid.V1Builder{
Codec: format,
MhType: mhType,
MhLength: mhLen,
}
}
c, err := builder.Sum(data)
if err != nil {
w.Header().Set("X-Stream-Error", err.Error())
return
}
m.BlockStore[c.String()] = data
resp := mockBlockPutResp{
Key: c.String(),
}
j, _ := json.Marshal(resp)
w.Write(j)
}
j, _ := json.Marshal(resp)
w.Write(j)
case "block/get":
query := r.URL.Query()
arg, ok := query["arg"]

View File

@ -584,7 +584,8 @@ func (mock *mockIPFSConnector) RepoStat(ctx context.Context, in struct{}, out *a
return nil
}
func (mock *mockIPFSConnector) BlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error {
func (mock *mockIPFSConnector) BlockStream(ctx context.Context, in <-chan api.NodeWithMeta, out chan<- struct{}) error {
close(out)
return nil
}