diff --git a/adder/util.go b/adder/util.go index f7675530..6ee578cf 100644 --- a/adder/util.go +++ b/adder/util.go @@ -14,15 +14,19 @@ import ( rpc "github.com/libp2p/go-libp2p-gorpc" ) +// ErrBlockAdder is returned when adding a to multiple destinations +// block fails on all of them. +var ErrBlockAdder = errors.New("failed to put block on all destinations") + // BlockAdder implements "github.com/ipfs/go-ipld-format".NodeAdder. -// It is efficient because it doesn't try failed peers again as long as -// block is stored with at least one peer. +// It helps sending nodes to multiple destinations, as long as one of +// them is still working. type BlockAdder struct { dests []peer.ID rpcClient *rpc.Client } -// NewBlockAdder creates a BlockAdder given an rpc client and allocation peers. +// NewBlockAdder creates a BlockAdder given an rpc client and allocated peers. func NewBlockAdder(rpcClient *rpc.Client, dests []peer.ID) *BlockAdder { return &BlockAdder{ dests: dests, @@ -30,28 +34,10 @@ func NewBlockAdder(rpcClient *rpc.Client, dests []peer.ID) *BlockAdder { } } -// Add puts an ipld node to allocated destinations. +// Add puts an ipld node to the allocated destinations. func (ba *BlockAdder) Add(ctx context.Context, node ipld.Node) error { - size, err := node.Size() - if err != nil { - logger.Warning(err) - } - nodeSerial := &api.NodeWithMeta{ - Cid: node.Cid(), - Data: node.RawData(), - CumSize: size, - } + nodeSerial := ipldNodeToNodeWithMeta(node) - format, ok := cid.CodecToStr[nodeSerial.Cid.Type()] - if !ok { - format = "" - logger.Warning("unsupported cid type, treating as v0") - } - if nodeSerial.Cid.Prefix().Version == 0 { - format = "v0" - } - - nodeSerial.Format = format ctxs, cancels := rpcutil.CtxsWithCancel(ctx, len(ba.dests)) defer rpcutil.MultiCancel(cancels) @@ -65,20 +51,27 @@ func (ba *BlockAdder) Add(ctx context.Context, node ipld.Node) error { rpcutil.RPCDiscardReplies(len(ba.dests)), ) - var actDests []peer.ID + var sucessfulDests []peer.ID for i, e := range errs { - if rpc.IsAuthorizationError(e) || rpc.IsServerError(e) { + if e != nil { + logger.Errorf("BlockPut on %s: %s", ba.dests[i], e) + } + + // RPCErrors include server errors (wrong RPC methods), client + // errors (creating, writing or reading streams) and + // authorization errors, but not IPFS errors from a failed blockput + // for example. + if rpc.IsRPCError(e) { continue } - actDests = append(actDests, ba.dests[i]) + sucessfulDests = append(sucessfulDests, ba.dests[i]) } - if len(actDests) == 0 { - // request to all peers failed - return fmt.Errorf("could not put block on any peer: %s", rpcutil.CheckErrs(errs)) + if len(sucessfulDests) == 0 { + return ErrBlockAdder } - ba.dests = actDests + ba.dests = sucessfulDests return nil } @@ -93,6 +86,30 @@ func (ba *BlockAdder) AddMany(ctx context.Context, nodes []ipld.Node) error { return nil } +// ipldNodeToNodeSerial converts an ipld.Node to NodeWithMeta. +func ipldNodeToNodeWithMeta(n ipld.Node) *api.NodeWithMeta { + size, err := n.Size() + if err != nil { + logger.Warning(err) + } + + format, ok := cid.CodecToStr[n.Cid().Type()] + if !ok { + format = "" + logger.Warning("unsupported cid type, treating as v0") + } + if n.Cid().Prefix().Version == 0 { + format = "v0" + } + + return &api.NodeWithMeta{ + Cid: n.Cid(), + Data: n.RawData(), + CumSize: size, + Format: format, + } +} + // BlockAllocate helps allocating blocks to peers. func BlockAllocate(ctx context.Context, rpc *rpc.Client, pinOpts api.PinOptions) ([]peer.ID, error) { // Find where to allocate this file