ipfs-cluster/adder/util.go
Hector Sanjuan 5452b59a2e
Dependency upgrades (#1755)
* Update go-libp2p to v0.22.0

* Testing with go1.19

* build(deps): bump github.com/multiformats/go-multicodec

Bumps [github.com/multiformats/go-multicodec](https://github.com/multiformats/go-multicodec) from 0.5.0 to 0.6.0.
- [Release notes](https://github.com/multiformats/go-multicodec/releases)
- [Commits](https://github.com/multiformats/go-multicodec/compare/v0.5.0...v0.6.0)

---
updated-dependencies:
- dependency-name: github.com/multiformats/go-multicodec
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* build(deps): bump github.com/ipld/go-car from 0.4.0 to 0.5.0

Bumps [github.com/ipld/go-car](https://github.com/ipld/go-car) from 0.4.0 to 0.5.0.
- [Release notes](https://github.com/ipld/go-car/releases)
- [Commits](https://github.com/ipld/go-car/compare/v0.4.0...v0.5.0)

---
updated-dependencies:
- dependency-name: github.com/ipld/go-car
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* build(deps): bump github.com/prometheus/client_golang

Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.12.2 to 1.13.0.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/main/CHANGELOG.md)
- [Commits](https://github.com/prometheus/client_golang/compare/v1.12.2...v1.13.0)

---
updated-dependencies:
- dependency-name: github.com/prometheus/client_golang
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* build(deps): bump github.com/hashicorp/go-hclog from 1.2.1 to 1.3.0

Bumps [github.com/hashicorp/go-hclog](https://github.com/hashicorp/go-hclog) from 1.2.1 to 1.3.0.
- [Release notes](https://github.com/hashicorp/go-hclog/releases)
- [Commits](https://github.com/hashicorp/go-hclog/compare/v1.2.1...v1.3.0)

---
updated-dependencies:
- dependency-name: github.com/hashicorp/go-hclog
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* build(deps): bump github.com/ipfs/go-ds-crdt from 0.3.6 to 0.3.7

Bumps [github.com/ipfs/go-ds-crdt](https://github.com/ipfs/go-ds-crdt) from 0.3.6 to 0.3.7.
- [Release notes](https://github.com/ipfs/go-ds-crdt/releases)
- [Commits](https://github.com/ipfs/go-ds-crdt/compare/v0.3.6...v0.3.7)

---
updated-dependencies:
- dependency-name: github.com/ipfs/go-ds-crdt
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* build(deps): bump github.com/urfave/cli/v2 from 2.10.2 to 2.14.1

Bumps [github.com/urfave/cli/v2](https://github.com/urfave/cli) from 2.10.2 to 2.14.1.
- [Release notes](https://github.com/urfave/cli/releases)
- [Changelog](https://github.com/urfave/cli/blob/main/docs/CHANGELOG.md)
- [Commits](https://github.com/urfave/cli/compare/v2.10.2...v2.14.1)

---
updated-dependencies:
- dependency-name: github.com/urfave/cli/v2
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* build(deps): bump github.com/libp2p/go-libp2p-http from 0.3.0 to 0.4.0

Bumps [github.com/libp2p/go-libp2p-http](https://github.com/libp2p/go-libp2p-http) from 0.3.0 to 0.4.0.
- [Release notes](https://github.com/libp2p/go-libp2p-http/releases)
- [Commits](https://github.com/libp2p/go-libp2p-http/compare/v0.3.0...v0.4.0)

---
updated-dependencies:
- dependency-name: github.com/libp2p/go-libp2p-http
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* build(deps): bump github.com/libp2p/go-libp2p-gorpc from 0.4.0 to 0.5.0

Bumps [github.com/libp2p/go-libp2p-gorpc](https://github.com/libp2p/go-libp2p-gorpc) from 0.4.0 to 0.5.0.
- [Release notes](https://github.com/libp2p/go-libp2p-gorpc/releases)
- [Commits](https://github.com/libp2p/go-libp2p-gorpc/compare/v0.4.0...v0.5.0)

---
updated-dependencies:
- dependency-name: github.com/libp2p/go-libp2p-gorpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* build(deps): bump contrib.go.opencensus.io/exporter/prometheus

Bumps [contrib.go.opencensus.io/exporter/prometheus](https://github.com/census-ecosystem/opencensus-go-exporter-prometheus) from 0.4.1 to 0.4.2.
- [Release notes](https://github.com/census-ecosystem/opencensus-go-exporter-prometheus/releases)
- [Commits](https://github.com/census-ecosystem/opencensus-go-exporter-prometheus/compare/v0.4.1...v0.4.2)

---
updated-dependencies:
- dependency-name: contrib.go.opencensus.io/exporter/prometheus
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* build(deps): bump github.com/libp2p/go-libp2p-raft from 0.1.8 to 0.2.0

Bumps [github.com/libp2p/go-libp2p-raft](https://github.com/libp2p/go-libp2p-raft) from 0.1.8 to 0.2.0.
- [Release notes](https://github.com/libp2p/go-libp2p-raft/releases)
- [Commits](https://github.com/libp2p/go-libp2p-raft/compare/v0.1.8...v0.2.0)

---
updated-dependencies:
- dependency-name: github.com/libp2p/go-libp2p-raft
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* build(deps): bump github.com/urfave/cli from 1.22.9 to 1.22.10

Bumps [github.com/urfave/cli](https://github.com/urfave/cli) from 1.22.9 to 1.22.10.
- [Release notes](https://github.com/urfave/cli/releases)
- [Changelog](https://github.com/urfave/cli/blob/main/docs/CHANGELOG.md)
- [Commits](https://github.com/urfave/cli/compare/v1.22.9...v1.22.10)

---
updated-dependencies:
- dependency-name: github.com/urfave/cli
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* Fix checker/linter/staticcheck warnings

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2022-09-06 16:57:17 +02:00

181 lines
4.4 KiB
Go

package adder
import (
"context"
"errors"
"sync"
"github.com/ipfs-cluster/ipfs-cluster/api"
"go.uber.org/multierr"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
peer "github.com/libp2p/go-libp2p/core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
// ErrBlockAdder is returned when adding a block to multiple destinations
// fails on all of them.
var ErrBlockAdder = errors.New("failed to put block on all destinations")
// BlockStreamer helps streaming nodes to multiple destinations, as long as
// one of them is still working.
//
// A BlockStreamer is created with NewBlockStreamer, which starts the
// streaming in a background goroutine. Done signals when it has finished and
// Err reports the recorded error, if any.
type BlockStreamer struct {
// dests are the peers to which the blocks are streamed.
dests []peer.ID
// rpcClient is used to perform the streaming RPC call to dests.
rpcClient *rpc.Client
// blocks is the channel from which the blocks to stream are received.
blocks <-chan api.NodeWithMeta
// ctx is derived from the context given to NewBlockStreamer; its Done
// channel is what Done returns.
ctx context.Context
// cancel cancels ctx. It is called when streaming finishes.
cancel context.CancelFunc
// errMu guards err.
errMu sync.Mutex
// err is set to ErrBlockAdder when as many errors as destinations were
// collected while streaming. It is nil otherwise.
err error
}
// NewBlockStreamer builds a BlockStreamer that uses rpcClient to stream the
// nodes received on blocks to every peer in dests.
//
// Streaming starts right away in a background goroutine. The streamer works
// on a cancellable child of ctx, so cancelling ctx also terminates it. Use
// Done to wait for completion and Err to inspect the result.
func NewBlockStreamer(ctx context.Context, rpcClient *rpc.Client, dests []peer.ID, blocks <-chan api.NodeWithMeta) *BlockStreamer {
	streamCtx, stop := context.WithCancel(ctx)

	// The err field is left at its zero value (nil) until streamBlocks
	// records a failure.
	streamer := &BlockStreamer{
		rpcClient: rpcClient,
		dests:     dests,
		blocks:    blocks,
		ctx:       streamCtx,
		cancel:    stop,
	}

	go streamer.streamBlocks()
	return streamer
}
// Done returns a channel which gets closed when the BlockStreamer has
// finished. It is the Done channel of the streamer's internal context, which
// is derived from the context given to NewBlockStreamer, so it is also
// closed when that parent context is cancelled.
func (bs *BlockStreamer) Done() <-chan struct{} {
return bs.ctx.Done()
}
// setErr stores err as the result of the streaming operation. The write is
// done while holding errMu so that it does not race with readers using Err.
func (bs *BlockStreamer) setErr(err error) {
	bs.errMu.Lock()
	defer bs.errMu.Unlock()

	bs.err = err
}
// Err returns the error recorded by the BlockStreamer, or nil when none has
// been recorded. The recorded error is ErrBlockAdder, which is set when the
// blocks could not be put on any of the destinations.
func (bs *BlockStreamer) Err() error {
	bs.errMu.Lock()
	recorded := bs.err
	bs.errMu.Unlock()

	return recorded
}
// streamBlocks streams everything received on bs.blocks to all of bs.dests
// with a single MultiStream call to IPFSConnector.BlockStream, and then
// evaluates the collected errors:
//
//   - as many errors as destinations: the errors are logged and
//     ErrBlockAdder is recorded via setErr.
//   - fewer errors, but at least one: the errors are logged as warnings
//     and no error is recorded.
//
// The streamer's context is cancelled on return, which closes the channel
// returned by Done.
func (bs *BlockStreamer) streamBlocks() {
	defer bs.cancel()

	// No replies are expected on this channel, but it is drained anyway
	// so that an unexpected reply cannot block the stream.
	// NOTE(review): the draining goroutine only exits once the channel is
	// closed; presumably MultiStream closes it when done — confirm.
	replies := make(chan struct{})
	go func() {
		for range replies {
		}
	}()

	streamErrs := bs.rpcClient.MultiStream(
		bs.ctx,
		bs.dests,
		"IPFSConnector",
		"BlockStream",
		bs.blocks,
		replies,
	)

	failure := multierr.Combine(streamErrs...)
	failedCount := len(multierr.Errors(failure))

	// FIXME: replicate everywhere.
	switch {
	case failedCount == len(bs.dests):
		logger.Error(failure)
		bs.setErr(ErrBlockAdder)
	case failure != nil:
		logger.Warning("there were errors streaming blocks, but at least one destination succeeded")
		logger.Warning(failure)
	}
}
// IpldNodeToNodeWithMeta wraps an ipld.Node into an api.NodeWithMeta carrying
// the node's CID, its raw data and the size reported by n.Size().
//
// A failure to obtain the size is not fatal: it is logged as a warning and
// the conversion continues with whatever size value Size() returned.
func IpldNodeToNodeWithMeta(n ipld.Node) api.NodeWithMeta {
	cumSize, sizeErr := n.Size()
	if sizeErr != nil {
		logger.Warn(sizeErr)
	}

	var meta api.NodeWithMeta
	meta.Cid = api.NewCid(n.Cid())
	meta.Data = n.RawData()
	meta.CumSize = cumSize
	return meta
}
// BlockAllocate asks the Cluster RPC service (method "BlockAllocate") which
// peers should receive the blocks of the content being added, based on the
// given pin options.
//
// The request is sent with an empty destination and a pin that carries
// pinOpts together with an undefined CID. It returns the peers written into
// the RPC response and the error from the call.
func BlockAllocate(ctx context.Context, rpc *rpc.Client, pinOpts api.PinOptions) ([]peer.ID, error) {
	var allocations []peer.ID

	// Find where to allocate this file.
	request := api.PinWithOpts(api.CidUndef, pinOpts)
	err := rpc.CallContext(ctx, "", "Cluster", "BlockAllocate", request, &allocations)

	return allocations, err
}
// Pin submits pin to the Cluster RPC service (method "Pin") and returns the
// error from that call. The response pin is discarded.
//
// When pin.ReplicationFactorMin is negative, the pin's allocations are
// replaced by an empty list before the request is sent.
// NOTE(review): presumably a negative factor means "pin everywhere", which
// would make explicit allocations meaningless — confirm.
func Pin(ctx context.Context, rpc *rpc.Client, pin api.Pin) error {
	if pin.ReplicationFactorMin < 0 {
		pin.Allocations = []peer.ID{}
	}

	logger.Debugf("adder pinning %+v", pin)

	// The empty destination makes the request go to ourselves.
	var reply api.Pin
	err := rpc.CallContext(ctx, "", "Cluster", "Pin", pin, &reply)
	return err
}
// ErrDAGNotFound is returned whenever we try to get a block from the
// DAGService. BaseDAGService.Get returns it directly and
// BaseDAGService.GetMany emits it on its output channel.
var ErrDAGNotFound = errors.New("dagservice: a Get operation was attempted while cluster-adding (this is likely a bug)")
// BaseDAGService partially implements an ipld.DAGService.
// It provides the methods which are not needed by ClusterDAGServices
// (Get*, Remove*) so that they can save adding this code.
//
// Get and GetMany always fail with ErrDAGNotFound. Remove and RemoveMany do
// nothing and return nil.
type BaseDAGService struct {
}
// Get always returns a nil node and ErrDAGNotFound. The ctx and key
// arguments are ignored.
func (dag BaseDAGService) Get(ctx context.Context, key cid.Cid) (ipld.Node, error) {
return nil, ErrDAGNotFound
}
// GetMany returns a channel that delivers a single ipld.NodeOption whose Err
// is ErrDAGNotFound and is then closed. The ctx and keys arguments are
// ignored.
func (dag BaseDAGService) GetMany(ctx context.Context, keys []cid.Cid) <-chan *ipld.NodeOption {
	// A buffer of one lets the only value be queued without a receiver,
	// so the channel can be filled and closed before it is handed out.
	results := make(chan *ipld.NodeOption, 1)
	defer close(results)

	results <- &ipld.NodeOption{Err: ErrDAGNotFound}
	return results
}
// Remove is a no-op: it ignores ctx and key and always returns nil.
func (dag BaseDAGService) Remove(ctx context.Context, key cid.Cid) error {
return nil
}
// RemoveMany is a no-op: it ignores ctx and keys and always returns nil.
func (dag BaseDAGService) RemoveMany(ctx context.Context, keys []cid.Cid) error {
return nil
}