5452b59a2e
* Update go-libp2p to v0.22.0 * Testing with go1.19 * build(deps): bump github.com/multiformats/go-multicodec Bumps [github.com/multiformats/go-multicodec](https://github.com/multiformats/go-multicodec) from 0.5.0 to 0.6.0. - [Release notes](https://github.com/multiformats/go-multicodec/releases) - [Commits](https://github.com/multiformats/go-multicodec/compare/v0.5.0...v0.6.0) --- updated-dependencies: - dependency-name: github.com/multiformats/go-multicodec dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ipld/go-car from 0.4.0 to 0.5.0 Bumps [github.com/ipld/go-car](https://github.com/ipld/go-car) from 0.4.0 to 0.5.0. - [Release notes](https://github.com/ipld/go-car/releases) - [Commits](https://github.com/ipld/go-car/compare/v0.4.0...v0.5.0) --- updated-dependencies: - dependency-name: github.com/ipld/go-car dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/prometheus/client_golang Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.12.2 to 1.13.0. - [Release notes](https://github.com/prometheus/client_golang/releases) - [Changelog](https://github.com/prometheus/client_golang/blob/main/CHANGELOG.md) - [Commits](https://github.com/prometheus/client_golang/compare/v1.12.2...v1.13.0) --- updated-dependencies: - dependency-name: github.com/prometheus/client_golang dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/hashicorp/go-hclog from 1.2.1 to 1.3.0 Bumps [github.com/hashicorp/go-hclog](https://github.com/hashicorp/go-hclog) from 1.2.1 to 1.3.0. 
- [Release notes](https://github.com/hashicorp/go-hclog/releases) - [Commits](https://github.com/hashicorp/go-hclog/compare/v1.2.1...v1.3.0) --- updated-dependencies: - dependency-name: github.com/hashicorp/go-hclog dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ipfs/go-ds-crdt from 0.3.6 to 0.3.7 Bumps [github.com/ipfs/go-ds-crdt](https://github.com/ipfs/go-ds-crdt) from 0.3.6 to 0.3.7. - [Release notes](https://github.com/ipfs/go-ds-crdt/releases) - [Commits](https://github.com/ipfs/go-ds-crdt/compare/v0.3.6...v0.3.7) --- updated-dependencies: - dependency-name: github.com/ipfs/go-ds-crdt dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/urfave/cli/v2 from 2.10.2 to 2.14.1 Bumps [github.com/urfave/cli/v2](https://github.com/urfave/cli) from 2.10.2 to 2.14.1. - [Release notes](https://github.com/urfave/cli/releases) - [Changelog](https://github.com/urfave/cli/blob/main/docs/CHANGELOG.md) - [Commits](https://github.com/urfave/cli/compare/v2.10.2...v2.14.1) --- updated-dependencies: - dependency-name: github.com/urfave/cli/v2 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/libp2p/go-libp2p-http from 0.3.0 to 0.4.0 Bumps [github.com/libp2p/go-libp2p-http](https://github.com/libp2p/go-libp2p-http) from 0.3.0 to 0.4.0. - [Release notes](https://github.com/libp2p/go-libp2p-http/releases) - [Commits](https://github.com/libp2p/go-libp2p-http/compare/v0.3.0...v0.4.0) --- updated-dependencies: - dependency-name: github.com/libp2p/go-libp2p-http dependency-type: direct:production update-type: version-update:semver-minor ... 
Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/libp2p/go-libp2p-gorpc from 0.4.0 to 0.5.0 Bumps [github.com/libp2p/go-libp2p-gorpc](https://github.com/libp2p/go-libp2p-gorpc) from 0.4.0 to 0.5.0. - [Release notes](https://github.com/libp2p/go-libp2p-gorpc/releases) - [Commits](https://github.com/libp2p/go-libp2p-gorpc/compare/v0.4.0...v0.5.0) --- updated-dependencies: - dependency-name: github.com/libp2p/go-libp2p-gorpc dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump contrib.go.opencensus.io/exporter/prometheus Bumps [contrib.go.opencensus.io/exporter/prometheus](https://github.com/census-ecosystem/opencensus-go-exporter-prometheus) from 0.4.1 to 0.4.2. - [Release notes](https://github.com/census-ecosystem/opencensus-go-exporter-prometheus/releases) - [Commits](https://github.com/census-ecosystem/opencensus-go-exporter-prometheus/compare/v0.4.1...v0.4.2) --- updated-dependencies: - dependency-name: contrib.go.opencensus.io/exporter/prometheus dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/libp2p/go-libp2p-raft from 0.1.8 to 0.2.0 Bumps [github.com/libp2p/go-libp2p-raft](https://github.com/libp2p/go-libp2p-raft) from 0.1.8 to 0.2.0. - [Release notes](https://github.com/libp2p/go-libp2p-raft/releases) - [Commits](https://github.com/libp2p/go-libp2p-raft/compare/v0.1.8...v0.2.0) --- updated-dependencies: - dependency-name: github.com/libp2p/go-libp2p-raft dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/urfave/cli from 1.22.9 to 1.22.10 Bumps [github.com/urfave/cli](https://github.com/urfave/cli) from 1.22.9 to 1.22.10. 
- [Release notes](https://github.com/urfave/cli/releases) - [Changelog](https://github.com/urfave/cli/blob/main/docs/CHANGELOG.md) - [Commits](https://github.com/urfave/cli/compare/v1.22.9...v1.22.10) --- updated-dependencies: - dependency-name: github.com/urfave/cli dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> * Fix checker/linter/staticcheck warnings Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
181 lines
4.4 KiB
Go
181 lines
4.4 KiB
Go
package adder
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
|
|
"github.com/ipfs-cluster/ipfs-cluster/api"
|
|
"go.uber.org/multierr"
|
|
|
|
cid "github.com/ipfs/go-cid"
|
|
ipld "github.com/ipfs/go-ipld-format"
|
|
peer "github.com/libp2p/go-libp2p/core/peer"
|
|
rpc "github.com/libp2p/go-libp2p-gorpc"
|
|
)
|
|
|
|
// ErrBlockAdder is returned when adding a block to multiple destinations
// fails on all of them.
|
|
var ErrBlockAdder = errors.New("failed to put block on all destinations")
|
|
|
|
// BlockStreamer helps streaming nodes to multiple destinations, as long as
|
|
// one of them is still working.
|
|
type BlockStreamer struct {
|
|
dests []peer.ID
|
|
rpcClient *rpc.Client
|
|
blocks <-chan api.NodeWithMeta
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
errMu sync.Mutex
|
|
err error
|
|
}
|
|
|
|
// NewBlockStreamer creates a BlockStreamer given an rpc client, allocated
|
|
// peers and a channel on which the blocks to stream are received.
|
|
func NewBlockStreamer(ctx context.Context, rpcClient *rpc.Client, dests []peer.ID, blocks <-chan api.NodeWithMeta) *BlockStreamer {
|
|
bsCtx, cancel := context.WithCancel(ctx)
|
|
|
|
bs := BlockStreamer{
|
|
ctx: bsCtx,
|
|
cancel: cancel,
|
|
dests: dests,
|
|
rpcClient: rpcClient,
|
|
blocks: blocks,
|
|
err: nil,
|
|
}
|
|
|
|
go bs.streamBlocks()
|
|
return &bs
|
|
}
|
|
|
|
// Done returns a channel which gets closed when the BlockStreamer has
|
|
// finished.
|
|
func (bs *BlockStreamer) Done() <-chan struct{} {
|
|
return bs.ctx.Done()
|
|
}
|
|
|
|
func (bs *BlockStreamer) setErr(err error) {
|
|
bs.errMu.Lock()
|
|
bs.err = err
|
|
bs.errMu.Unlock()
|
|
}
|
|
|
|
// Err returns any errors that happened after the operation of the
|
|
// BlockStreamer, for example when blocks could not be put to all nodes.
|
|
func (bs *BlockStreamer) Err() error {
|
|
bs.errMu.Lock()
|
|
defer bs.errMu.Unlock()
|
|
return bs.err
|
|
}
|
|
|
|
func (bs *BlockStreamer) streamBlocks() {
|
|
defer bs.cancel()
|
|
|
|
// Nothing should be sent on out.
|
|
// We drain though
|
|
out := make(chan struct{})
|
|
go func() {
|
|
for range out {
|
|
}
|
|
}()
|
|
|
|
errs := bs.rpcClient.MultiStream(
|
|
bs.ctx,
|
|
bs.dests,
|
|
"IPFSConnector",
|
|
"BlockStream",
|
|
bs.blocks,
|
|
out,
|
|
)
|
|
|
|
combinedErrors := multierr.Combine(errs...)
|
|
|
|
// FIXME: replicate everywhere.
|
|
if len(multierr.Errors(combinedErrors)) == len(bs.dests) {
|
|
logger.Error(combinedErrors)
|
|
bs.setErr(ErrBlockAdder)
|
|
} else if combinedErrors != nil {
|
|
logger.Warning("there were errors streaming blocks, but at least one destination succeeded")
|
|
logger.Warning(combinedErrors)
|
|
}
|
|
}
|
|
|
|
// IpldNodeToNodeWithMeta converts an ipld.Node to api.NodeWithMeta.
|
|
func IpldNodeToNodeWithMeta(n ipld.Node) api.NodeWithMeta {
|
|
size, err := n.Size()
|
|
if err != nil {
|
|
logger.Warn(err)
|
|
}
|
|
|
|
return api.NodeWithMeta{
|
|
Cid: api.NewCid(n.Cid()),
|
|
Data: n.RawData(),
|
|
CumSize: size,
|
|
}
|
|
}
|
|
|
|
// BlockAllocate helps allocating blocks to peers.
|
|
func BlockAllocate(ctx context.Context, rpc *rpc.Client, pinOpts api.PinOptions) ([]peer.ID, error) {
|
|
// Find where to allocate this file
|
|
var allocsStr []peer.ID
|
|
err := rpc.CallContext(
|
|
ctx,
|
|
"",
|
|
"Cluster",
|
|
"BlockAllocate",
|
|
api.PinWithOpts(api.CidUndef, pinOpts),
|
|
&allocsStr,
|
|
)
|
|
return allocsStr, err
|
|
}
|
|
|
|
// Pin helps sending local RPC pin requests.
|
|
func Pin(ctx context.Context, rpc *rpc.Client, pin api.Pin) error {
|
|
if pin.ReplicationFactorMin < 0 {
|
|
pin.Allocations = []peer.ID{}
|
|
}
|
|
logger.Debugf("adder pinning %+v", pin)
|
|
var pinResp api.Pin
|
|
return rpc.CallContext(
|
|
ctx,
|
|
"", // use ourself to pin
|
|
"Cluster",
|
|
"Pin",
|
|
pin,
|
|
&pinResp,
|
|
)
|
|
}
|
|
|
|
// ErrDAGNotFound is returned whenever we try to get a block from the DAGService.
|
|
var ErrDAGNotFound = errors.New("dagservice: a Get operation was attempted while cluster-adding (this is likely a bug)")
|
|
|
|
// BaseDAGService is a partial implementation of ipld.DAGService. It supplies
// the methods that ClusterDAGServices have no use for (Get*, Remove*), so
// that they do not need to carry that code themselves.
type BaseDAGService struct{}
|
|
|
|
// Get always returns errNotFound
|
|
func (dag BaseDAGService) Get(ctx context.Context, key cid.Cid) (ipld.Node, error) {
|
|
return nil, ErrDAGNotFound
|
|
}
|
|
|
|
// GetMany returns an output channel that always emits an error
|
|
func (dag BaseDAGService) GetMany(ctx context.Context, keys []cid.Cid) <-chan *ipld.NodeOption {
|
|
out := make(chan *ipld.NodeOption, 1)
|
|
out <- &ipld.NodeOption{Err: ErrDAGNotFound}
|
|
close(out)
|
|
return out
|
|
}
|
|
|
|
// Remove is a nop
|
|
func (dag BaseDAGService) Remove(ctx context.Context, key cid.Cid) error {
|
|
return nil
|
|
}
|
|
|
|
// RemoveMany is a nop
|
|
func (dag BaseDAGService) RemoveMany(ctx context.Context, keys []cid.Cid) error {
|
|
return nil
|
|
}
|