ipfs-cluster/adder/sharding/shard.go
Hector Sanjuan 1d98538411 Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.

Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).

Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.

Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.

Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.

Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.

Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 17:24:58 +01:00

166 lines
4.3 KiB
Go

package sharding
import (
"context"
"fmt"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
humanize "github.com/dustin/go-humanize"
)
// a shard represents a set of blocks (or bucket) which have been assigned
// a peer to be block-put and will be part of the same shard in the
// cluster DAG.
type shard struct {
ctx context.Context
rpc *rpc.Client
allocations []peer.ID
pinOptions api.PinOptions
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
// dagNode represents a node with links and will be converted
// to Cbor.
dagNode map[string]cid.Cid
currentSize uint64
sizeLimit uint64
}
func newShard(globalCtx context.Context, ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard, error) {
allocs, err := adder.BlockAllocate(ctx, rpc, opts)
if err != nil {
return nil, err
}
if opts.ReplicationFactorMin > 0 && len(allocs) == 0 {
// This would mean that the empty cid is part of the shared state somehow.
panic("allocations for new shard cannot be empty without error")
}
if opts.ReplicationFactorMin < 0 {
logger.Warn("Shard is set to replicate everywhere ,which doesn't make sense for sharding")
}
// TODO (hector): get latest metrics for allocations, adjust sizeLimit
// to minimum. This can be done later.
blocks := make(chan api.NodeWithMeta, 256)
return &shard{
ctx: globalCtx,
rpc: rpc,
allocations: allocs,
pinOptions: opts,
bs: adder.NewBlockStreamer(globalCtx, rpc, allocs, blocks),
blocks: blocks,
dagNode: make(map[string]cid.Cid),
currentSize: 0,
sizeLimit: opts.ShardSize,
}, nil
}
// AddLink tries to add a new block to this shard if it's not full.
// Returns true if the block was added
func (sh *shard) AddLink(ctx context.Context, c cid.Cid, s uint64) {
linkN := len(sh.dagNode)
linkName := fmt.Sprintf("%d", linkN)
logger.Debugf("shard: add link: %s", linkName)
sh.dagNode[linkName] = c
sh.currentSize += s
}
// Allocations returns the peer IDs on which blocks are put for this shard.
func (sh *shard) Allocations() []peer.ID {
if len(sh.allocations) == 1 && sh.allocations[0] == "" {
return nil
}
return sh.allocations
}
func (sh *shard) sendBlock(ctx context.Context, n ipld.Node) error {
select {
case <-ctx.Done():
return ctx.Err()
case sh.blocks <- adder.IpldNodeToNodeWithMeta(n):
return nil
}
}
// Flush completes the allocation of this shard by building a CBOR node
// and adding it to IPFS, then pinning it in cluster. It returns the Cid of the
// shard.
func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid, error) {
logger.Debugf("shard %d: flush", shardN)
nodes, err := makeDAG(ctx, sh.dagNode)
if err != nil {
return cid.Undef, err
}
for _, n := range nodes {
err = sh.sendBlock(ctx, n)
if err != nil {
close(sh.blocks)
return cid.Undef, err
}
}
close(sh.blocks)
select {
case <-ctx.Done():
return cid.Undef, ctx.Err()
case <-sh.bs.Done():
}
if err := sh.bs.Err(); err != nil {
return cid.Undef, err
}
rootCid := nodes[0].Cid()
pin := api.PinWithOpts(rootCid, sh.pinOptions)
pin.Name = fmt.Sprintf("%s-shard-%d", sh.pinOptions.Name, shardN)
// this sets allocations as priority allocation
pin.Allocations = sh.allocations
pin.Type = api.ShardType
pin.Reference = &prev
pin.MaxDepth = 1
pin.ShardSize = sh.Size() // use current size, not the limit
if len(nodes) > len(sh.dagNode)+1 { // using an indirect graph
pin.MaxDepth = 2
}
logger.Infof("shard #%d (%s) completed. Total size: %s. Links: %d",
shardN,
rootCid,
humanize.Bytes(sh.Size()),
len(sh.dagNode),
)
return rootCid, adder.Pin(ctx, sh.rpc, pin)
}
// Size returns this shard's current size.
func (sh *shard) Size() uint64 {
return sh.currentSize
}
// Size returns this shard's size limit.
func (sh *shard) Limit() uint64 {
return sh.sizeLimit
}
// Last returns the last added link. When finishing sharding,
// the last link of the last shard is the data root for the
// full sharded DAG (the CID that would have resulted from
// adding the content to a single IPFS daemon).
func (sh *shard) LastLink() cid.Cid {
l := len(sh.dagNode)
lastLink := fmt.Sprintf("%d", l-1)
return sh.dagNode[lastLink]
}