ipfs-cluster/adder/sharding/shard.go
Kishan Mohanbhai Sagathiya 70e429f925 Fix #852: Improve error handling on add
Keep sending blocks while adding as long as one destination works.
2019-08-13 16:06:20 +02:00

134 lines
3.7 KiB
Go

package sharding
import (
"context"
"fmt"
"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
humanize "github.com/dustin/go-humanize"
)
// a shard represents a set of blocks (or bucket) which have been assigned
// a peer to be block-put and will be part of the same shard in the
// cluster DAG.
type shard struct {
	// rpc is the client given to the BlockAdder on creation and used to
	// pin the shard when it is flushed.
	rpc *rpc.Client
	// allocations are the peers obtained from adder.BlockAllocate when
	// the shard was created. They are also set on the shard pin.
	allocations []peer.ID
	// pinOptions are the options the shard was created with; the shard
	// pin is built from them.
	pinOptions api.PinOptions
	// ba is the BlockAdder created for the allocations. Flush uses it to
	// add the nodes of the shard DAG.
	ba *adder.BlockAdder
	// dagNode represents a node with links and will be converted
	// to Cbor. Keys are link names (the decimal insertion index, see
	// AddLink) and values are the linked Cids.
	dagNode map[string]cid.Cid
	// currentSize is the sum of the sizes passed to AddLink so far.
	currentSize uint64
	// sizeLimit is the shard size limit, taken from the ShardSize pin
	// option on creation.
	sizeLimit uint64
}
// newShard obtains allocations for a new shard and returns an empty shard
// bound to them.
//
// Allocations are requested with adder.BlockAllocate using opts; an error
// from that call is returned unchanged. The returned shard holds a
// BlockAdder created for the allocated peers, has no links, and takes its
// size limit from opts.ShardSize.
//
// newShard panics when opts.ReplicationFactorMin is positive but the
// allocation call returned no peers and no error, as that is considered a
// broken invariant rather than a recoverable condition.
func newShard(ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard, error) {
	allocs, err := adder.BlockAllocate(ctx, rpc, opts)
	if err != nil {
		return nil, err
	}

	// len() of a nil slice is 0, so this also covers allocs == nil.
	if opts.ReplicationFactorMin > 0 && len(allocs) == 0 {
		// This would mean that the empty cid is part of the shared state somehow.
		panic("allocations for new shard cannot be empty without error")
	}

	if opts.ReplicationFactorMin < 0 {
		logger.Warning("Shard is set to replicate everywhere ,which doesn't make sense for sharding")
	}

	// TODO (hector): get latest metrics for allocations, adjust sizeLimit
	// to minimum. This can be done later.

	return &shard{
		rpc:         rpc,
		allocations: allocs,
		pinOptions:  opts,
		ba:          adder.NewBlockAdder(rpc, allocs),
		dagNode:     make(map[string]cid.Cid),
		currentSize: 0,
		sizeLimit:   opts.ShardSize,
	}, nil
}
// AddLink records the block c as a new link of this shard and adds s to
// the shard's current size. Links are named after their insertion index
// ("0", "1", "2", ...). The size limit is not checked here and nothing is
// returned; ctx is currently unused.
func (sh *shard) AddLink(ctx context.Context, c cid.Cid, s uint64) {
	// The number of links already stored is the index of the new one.
	name := fmt.Sprintf("%d", len(sh.dagNode))
	logger.Debugf("shard: add link: %s", name)

	sh.dagNode[name] = c
	sh.currentSize += s
}
// Allocations returns the IDs of the peers on which the blocks of this
// shard are put. The returned slice is the shard's own slice, not a copy.
func (sh *shard) Allocations() []peer.ID {
	return sh.allocations
}
// Flush finalizes this shard: it builds the shard DAG from the collected
// links with makeDAG, adds the resulting nodes through the shard's
// BlockAdder and finally pins the root node in cluster as a shard pin.
//
// shardN is the number of this shard and is used in the pin name and in
// log messages. prev is set as the Reference of the shard pin.
//
// Flush returns the Cid of the shard's root node. When building or adding
// the nodes fails, cid.Undef is returned along with the error. An error
// from the final pin request is returned together with the root Cid.
func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid, error) {
	logger.Debugf("shard %d: flush", shardN)

	dagNodes, err := makeDAG(ctx, sh.dagNode)
	if err != nil {
		return cid.Undef, err
	}

	if err = sh.ba.AddMany(ctx, dagNodes); err != nil {
		return cid.Undef, err
	}

	linkCount := len(sh.dagNode)
	totalSize := sh.Size()
	// NOTE(review): assumes makeDAG returns at least one node on success,
	// with the root first - confirm against makeDAG.
	root := dagNodes[0].Cid()

	pin := api.PinWithOpts(root, sh.pinOptions)
	pin.Name = fmt.Sprintf("%s-shard-%d", sh.pinOptions.Name, shardN)
	pin.Type = api.ShardType
	pin.Reference = &prev
	pin.ShardSize = totalSize // use current size, not the limit
	// this sets allocations as priority allocation
	pin.Allocations = sh.allocations

	if len(dagNodes) > linkCount+1 {
		// More nodes than links plus a root: using an indirect graph.
		pin.MaxDepth = 2
	} else {
		pin.MaxDepth = 1
	}

	logger.Infof("shard #%d (%s) completed. Total size: %s. Links: %d",
		shardN,
		root,
		humanize.Bytes(totalSize),
		linkCount,
	)

	return root, adder.Pin(ctx, sh.rpc, pin)
}
// Size returns the current size of this shard, that is, the sum of the
// sizes of all links added to it so far.
func (sh *shard) Size() uint64 {
	return sh.currentSize
}
// Limit returns the size limit this shard was created with.
func (sh *shard) Limit() uint64 {
	return sh.sizeLimit
}
// LastLink returns the last added link. When finishing sharding,
// the last link of the last shard is the data root for the
// full sharded DAG (the CID that would have resulted from
// adding the content to a single IPFS daemon).
//
// Links are looked up by their insertion index, so when no link has
// been added yet the lookup misses and the zero-value Cid is returned.
func (sh *shard) LastLink() cid.Cid {
	lastIdx := len(sh.dagNode) - 1
	return sh.dagNode[fmt.Sprintf("%d", lastIdx)]
}