Adder: BlockPutHelper should fail on all RPC errors

It kept trying even when the destination libp2p host was down
because that's a ClientError.
This commit is contained in:
Hector Sanjuan 2019-08-13 16:02:43 +02:00
parent 70e429f925
commit 5c2af68459

View File

@ -14,15 +14,19 @@ import (
rpc "github.com/libp2p/go-libp2p-gorpc"
)
// ErrBlockAdder is returned when adding a to multiple destinations
// block fails on all of them.
var ErrBlockAdder = errors.New("failed to put block on all destinations")
// BlockAdder implements "github.com/ipfs/go-ipld-format".NodeAdder.
// It is efficient because it doesn't try failed peers again as long as
// block is stored with at least one peer.
// It helps sending nodes to multiple destinations, as long as one of
// them is still working.
type BlockAdder struct {
dests []peer.ID
rpcClient *rpc.Client
}
// NewBlockAdder creates a BlockAdder given an rpc client and allocation peers.
// NewBlockAdder creates a BlockAdder given an rpc client and allocated peers.
func NewBlockAdder(rpcClient *rpc.Client, dests []peer.ID) *BlockAdder {
return &BlockAdder{
dests: dests,
@ -30,28 +34,10 @@ func NewBlockAdder(rpcClient *rpc.Client, dests []peer.ID) *BlockAdder {
}
}
// Add puts an ipld node to allocated destinations.
// Add puts an ipld node to the allocated destinations.
func (ba *BlockAdder) Add(ctx context.Context, node ipld.Node) error {
size, err := node.Size()
if err != nil {
logger.Warning(err)
}
nodeSerial := &api.NodeWithMeta{
Cid: node.Cid(),
Data: node.RawData(),
CumSize: size,
}
nodeSerial := ipldNodeToNodeWithMeta(node)
format, ok := cid.CodecToStr[nodeSerial.Cid.Type()]
if !ok {
format = ""
logger.Warning("unsupported cid type, treating as v0")
}
if nodeSerial.Cid.Prefix().Version == 0 {
format = "v0"
}
nodeSerial.Format = format
ctxs, cancels := rpcutil.CtxsWithCancel(ctx, len(ba.dests))
defer rpcutil.MultiCancel(cancels)
@ -65,20 +51,27 @@ func (ba *BlockAdder) Add(ctx context.Context, node ipld.Node) error {
rpcutil.RPCDiscardReplies(len(ba.dests)),
)
var actDests []peer.ID
var sucessfulDests []peer.ID
for i, e := range errs {
if rpc.IsAuthorizationError(e) || rpc.IsServerError(e) {
if e != nil {
logger.Errorf("BlockPut on %s: %s", ba.dests[i], e)
}
// RPCErrors include server errors (wrong RPC methods), client
// errors (creating, writing or reading streams) and
// authorization errors, but not IPFS errors from a failed blockput
// for example.
if rpc.IsRPCError(e) {
continue
}
actDests = append(actDests, ba.dests[i])
sucessfulDests = append(sucessfulDests, ba.dests[i])
}
if len(actDests) == 0 {
// request to all peers failed
return fmt.Errorf("could not put block on any peer: %s", rpcutil.CheckErrs(errs))
if len(sucessfulDests) == 0 {
return ErrBlockAdder
}
ba.dests = actDests
ba.dests = sucessfulDests
return nil
}
@ -93,6 +86,30 @@ func (ba *BlockAdder) AddMany(ctx context.Context, nodes []ipld.Node) error {
return nil
}
// ipldNodeToNodeSerial converts an ipld.Node to NodeWithMeta.
func ipldNodeToNodeWithMeta(n ipld.Node) *api.NodeWithMeta {
size, err := n.Size()
if err != nil {
logger.Warning(err)
}
format, ok := cid.CodecToStr[n.Cid().Type()]
if !ok {
format = ""
logger.Warning("unsupported cid type, treating as v0")
}
if n.Cid().Prefix().Version == 0 {
format = "v0"
}
return &api.NodeWithMeta{
Cid: n.Cid(),
Data: n.RawData(),
CumSize: size,
Format: format,
}
}
// BlockAllocate helps allocating blocks to peers.
func BlockAllocate(ctx context.Context, rpc *rpc.Client, pinOpts api.PinOptions) ([]peer.ID, error) {
// Find where to allocate this file