d553227755
* build(deps): bump github.com/hsanjuan/ipfs-lite from 1.5.0 to 1.6.0 Bumps [github.com/hsanjuan/ipfs-lite](https://github.com/hsanjuan/ipfs-lite) from 1.5.0 to 1.6.0. - [Release notes](https://github.com/hsanjuan/ipfs-lite/releases) - [Commits](https://github.com/hsanjuan/ipfs-lite/compare/v1.5.0...v1.6.0) --- updated-dependencies: - dependency-name: github.com/hsanjuan/ipfs-lite dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/golang-jwt/jwt/v4 from 4.4.3 to 4.5.0 Bumps [github.com/golang-jwt/jwt/v4](https://github.com/golang-jwt/jwt) from 4.4.3 to 4.5.0. - [Release notes](https://github.com/golang-jwt/jwt/releases) - [Changelog](https://github.com/golang-jwt/jwt/blob/main/VERSION_HISTORY.md) - [Commits](https://github.com/golang-jwt/jwt/compare/v4.4.3...v4.5.0) --- updated-dependencies: - dependency-name: github.com/golang-jwt/jwt/v4 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * Update pubsub too * Fix compatiblity with latest libp2p, libipfs * build(deps): bump github.com/libp2p/go-libp2p-raft from 0.3.0 to 0.4.0 Bumps [github.com/libp2p/go-libp2p-raft](https://github.com/libp2p/go-libp2p-raft) from 0.3.0 to 0.4.0. - [Release notes](https://github.com/libp2p/go-libp2p-raft/releases) - [Commits](https://github.com/libp2p/go-libp2p-raft/compare/v0.3.0...v0.4.0) --- updated-dependencies: - dependency-name: github.com/libp2p/go-libp2p-raft dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/multiformats/go-multicodec Bumps [github.com/multiformats/go-multicodec](https://github.com/multiformats/go-multicodec) from 0.7.0 to 0.8.1. - [Release notes](https://github.com/multiformats/go-multicodec/releases) - [Commits](https://github.com/multiformats/go-multicodec/compare/v0.7.0...v0.8.1) --- updated-dependencies: - dependency-name: github.com/multiformats/go-multicodec dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ipld/go-car from 0.5.0 to 0.6.0 Bumps [github.com/ipld/go-car](https://github.com/ipld/go-car) from 0.5.0 to 0.6.0. - [Release notes](https://github.com/ipld/go-car/releases) - [Changelog](https://github.com/ipld/go-car/blob/master/.goreleaser.yaml) - [Commits](https://github.com/ipld/go-car/compare/v0.5.0...v0.6.0) --- updated-dependencies: - dependency-name: github.com/ipld/go-car dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump golang.org/x/crypto from 0.5.0 to 0.6.0 Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.5.0 to 0.6.0. - [Release notes](https://github.com/golang/crypto/releases) - [Commits](https://github.com/golang/crypto/compare/v0.5.0...v0.6.0) --- updated-dependencies: - dependency-name: golang.org/x/crypto dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ipfs/go-ds-pebble from 0.1.0 to 0.2.2 Bumps [github.com/ipfs/go-ds-pebble](https://github.com/ipfs/go-ds-pebble) from 0.1.0 to 0.2.2. - [Release notes](https://github.com/ipfs/go-ds-pebble/releases) - [Commits](https://github.com/ipfs/go-ds-pebble/compare/v0.1.0...v0.2.2) --- updated-dependencies: - dependency-name: github.com/ipfs/go-ds-pebble dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ugorji/go/codec from 1.2.8 to 1.2.10 Bumps [github.com/ugorji/go/codec](https://github.com/ugorji/go) from 1.2.8 to 1.2.10. - [Release notes](https://github.com/ugorji/go/releases) - [Commits](https://github.com/ugorji/go/compare/v1.2.8...v1.2.10) --- updated-dependencies: - dependency-name: github.com/ugorji/go/codec dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/libp2p/go-libp2p-http from 0.4.0 to 0.5.0 Bumps [github.com/libp2p/go-libp2p-http](https://github.com/libp2p/go-libp2p-http) from 0.4.0 to 0.5.0. - [Release notes](https://github.com/libp2p/go-libp2p-http/releases) - [Commits](https://github.com/libp2p/go-libp2p-http/compare/v0.4.0...v0.5.0) --- updated-dependencies: - dependency-name: github.com/libp2p/go-libp2p-http dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ipfs/go-ipfs-pinner from 0.2.1 to 0.3.0 Bumps [github.com/ipfs/go-ipfs-pinner](https://github.com/ipfs/go-ipfs-pinner) from 0.2.1 to 0.3.0. - [Release notes](https://github.com/ipfs/go-ipfs-pinner/releases) - [Commits](https://github.com/ipfs/go-ipfs-pinner/compare/v0.2.1...v0.3.0) --- updated-dependencies: - dependency-name: github.com/ipfs/go-ipfs-pinner dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ipfs/go-unixfs from 0.4.3 to 0.4.4 Bumps [github.com/ipfs/go-unixfs](https://github.com/ipfs/go-unixfs) from 0.4.3 to 0.4.4. - [Release notes](https://github.com/ipfs/go-unixfs/releases) - [Commits](https://github.com/ipfs/go-unixfs/compare/v0.4.3...v0.4.4) --- updated-dependencies: - dependency-name: github.com/ipfs/go-unixfs dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ipfs/go-path from 0.3.0 to 0.3.1 Bumps [github.com/ipfs/go-path](https://github.com/ipfs/go-path) from 0.3.0 to 0.3.1. - [Release notes](https://github.com/ipfs/go-path/releases) - [Commits](https://github.com/ipfs/go-path/compare/v0.3.0...v0.3.1) --- updated-dependencies: - dependency-name: github.com/ipfs/go-path dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ipfs/go-ds-crdt from 0.3.9 to 0.3.10 Bumps [github.com/ipfs/go-ds-crdt](https://github.com/ipfs/go-ds-crdt) from 0.3.9 to 0.3.10. - [Release notes](https://github.com/ipfs/go-ds-crdt/releases) - [Commits](https://github.com/ipfs/go-ds-crdt/compare/v0.3.9...v0.3.10) --- updated-dependencies: - dependency-name: github.com/ipfs/go-ds-crdt dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump golang.org/x/crypto from 0.6.0 to 0.7.0 Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.6.0 to 0.7.0. - [Release notes](https://github.com/golang/crypto/releases) - [Commits](https://github.com/golang/crypto/compare/v0.6.0...v0.7.0) --- updated-dependencies: - dependency-name: golang.org/x/crypto dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ipfs/go-ds-pebble from 0.2.2 to 0.2.3 Bumps [github.com/ipfs/go-ds-pebble](https://github.com/ipfs/go-ds-pebble) from 0.2.2 to 0.2.3. - [Release notes](https://github.com/ipfs/go-ds-pebble/releases) - [Commits](https://github.com/ipfs/go-ds-pebble/compare/v0.2.2...v0.2.3) --- updated-dependencies: - dependency-name: github.com/ipfs/go-ds-pebble dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
243 lines
7.5 KiB
Go
243 lines
7.5 KiB
Go
package ipfscluster
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
|
|
config "github.com/ipfs-cluster/ipfs-cluster/config"
|
|
ds "github.com/ipfs/go-datastore"
|
|
namespace "github.com/ipfs/go-datastore/namespace"
|
|
ipns "github.com/ipfs/go-ipns"
|
|
libp2p "github.com/libp2p/go-libp2p"
|
|
dht "github.com/libp2p/go-libp2p-kad-dht"
|
|
dual "github.com/libp2p/go-libp2p-kad-dht/dual"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
record "github.com/libp2p/go-libp2p-record"
|
|
crypto "github.com/libp2p/go-libp2p/core/crypto"
|
|
host "github.com/libp2p/go-libp2p/core/host"
|
|
network "github.com/libp2p/go-libp2p/core/network"
|
|
peer "github.com/libp2p/go-libp2p/core/peer"
|
|
corepnet "github.com/libp2p/go-libp2p/core/pnet"
|
|
routing "github.com/libp2p/go-libp2p/core/routing"
|
|
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
|
|
connmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
|
|
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
|
|
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
|
|
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
|
|
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
|
|
tcp "github.com/libp2p/go-libp2p/p2p/transport/tcp"
|
|
websocket "github.com/libp2p/go-libp2p/p2p/transport/websocket"
|
|
)
|
|
|
|
const dhtNamespace = "dht"
|
|
|
|
var _ = libp2pquic.NewTransport
|
|
|
|
func init() {
|
|
// Cluster peers should advertise their public IPs as soon as they
|
|
// learn about them. Default for this is 4, which prevents clusters
|
|
// with less than 4 peers to advertise an external address they know
|
|
// of, therefore they cannot be remembered by other peers asap. This
|
|
// affects dockerized setups mostly. This may announce non-dialable
|
|
// NATed addresses too eagerly, but they should progressively be
|
|
// cleaned up.
|
|
identify.ActivationThresh = 1
|
|
}
|
|
|
|
// NewClusterHost creates a fully-featured libp2p Host with the options from
|
|
// the provided cluster configuration. Using that host, it creates pubsub and
|
|
// a DHT instances (persisting to the given datastore), for shared use by all
|
|
// cluster components. The returned host uses the DHT for routing. Relay and
|
|
// NATService are additionally setup for this host.
|
|
func NewClusterHost(
|
|
ctx context.Context,
|
|
ident *config.Identity,
|
|
cfg *Config,
|
|
ds ds.Datastore,
|
|
) (host.Host, *pubsub.PubSub, *dual.DHT, error) {
|
|
|
|
// Set the default dial timeout for all libp2p connections. It is not
|
|
// very good to touch this global variable here, but the alternative
|
|
// is to used a modify context everywhere, even if the user supplies
|
|
// it.
|
|
network.DialPeerTimeout = cfg.DialPeerTimeout
|
|
|
|
connman, err := connmgr.NewConnManager(cfg.ConnMgr.LowWater, cfg.ConnMgr.HighWater, connmgr.WithGracePeriod(cfg.ConnMgr.GracePeriod))
|
|
if err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
var h host.Host
|
|
var idht *dual.DHT
|
|
// a channel to wait until these variables have been set
|
|
// (or left unset on errors). Mostly to avoid reading while writing.
|
|
hostAndDHTReady := make(chan struct{})
|
|
defer close(hostAndDHTReady)
|
|
|
|
hostGetter := func() host.Host {
|
|
<-hostAndDHTReady // closed when we finish NewClusterHost
|
|
return h
|
|
}
|
|
|
|
dhtGetter := func() *dual.DHT {
|
|
<-hostAndDHTReady // closed when we finish NewClusterHost
|
|
return idht
|
|
}
|
|
|
|
opts := []libp2p.Option{
|
|
libp2p.ListenAddrs(cfg.ListenAddr...),
|
|
libp2p.NATPortMap(),
|
|
libp2p.ConnectionManager(connman),
|
|
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
|
|
idht, err = newDHT(ctx, h, ds)
|
|
return idht, err
|
|
}),
|
|
libp2p.EnableNATService(),
|
|
libp2p.EnableRelay(),
|
|
libp2p.EnableAutoRelayWithPeerSource(newPeerSource(hostGetter, dhtGetter)),
|
|
libp2p.EnableHolePunching(),
|
|
}
|
|
|
|
if cfg.EnableRelayHop {
|
|
opts = append(opts, libp2p.EnableRelayService())
|
|
}
|
|
|
|
h, err = newHost(
|
|
ctx,
|
|
cfg.Secret,
|
|
ident.PrivateKey,
|
|
opts...,
|
|
)
|
|
if err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
psub, err := newPubSub(ctx, h)
|
|
if err != nil {
|
|
h.Close()
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
return h, psub, idht, nil
|
|
}
|
|
|
|
// newHost creates a base cluster host without dht, pubsub, relay or nat etc.
|
|
// mostly used for testing.
|
|
func newHost(ctx context.Context, psk corepnet.PSK, priv crypto.PrivKey, opts ...libp2p.Option) (host.Host, error) {
|
|
finalOpts := []libp2p.Option{
|
|
libp2p.Identity(priv),
|
|
}
|
|
finalOpts = append(finalOpts, baseOpts(psk)...)
|
|
finalOpts = append(finalOpts, opts...)
|
|
|
|
h, err := libp2p.New(
|
|
finalOpts...,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return h, nil
|
|
}
|
|
|
|
func baseOpts(psk corepnet.PSK) []libp2p.Option {
|
|
return []libp2p.Option{
|
|
libp2p.PrivateNetwork(psk),
|
|
libp2p.EnableNATService(),
|
|
libp2p.Security(noise.ID, noise.New),
|
|
libp2p.Security(libp2ptls.ID, libp2ptls.New),
|
|
// TODO: quic does not support private networks
|
|
// libp2p.DefaultTransports,
|
|
libp2p.NoTransports,
|
|
libp2p.Transport(tcp.NewTCPTransport),
|
|
libp2p.Transport(websocket.New),
|
|
}
|
|
}
|
|
|
|
func newDHT(ctx context.Context, h host.Host, store ds.Datastore, extraopts ...dual.Option) (*dual.DHT, error) {
|
|
opts := []dual.Option{
|
|
dual.DHTOption(dht.NamespacedValidator("pk", record.PublicKeyValidator{})),
|
|
dual.DHTOption(dht.NamespacedValidator("ipns", ipns.Validator{KeyBook: h.Peerstore()})),
|
|
dual.DHTOption(dht.Concurrency(10)),
|
|
}
|
|
|
|
opts = append(opts, extraopts...)
|
|
|
|
if batchingDs, ok := store.(ds.Batching); ok {
|
|
dhtDatastore := namespace.Wrap(batchingDs, ds.NewKey(dhtNamespace))
|
|
opts = append(opts, dual.DHTOption(dht.Datastore(dhtDatastore)))
|
|
logger.Debug("enabling DHT record persistence to datastore")
|
|
}
|
|
|
|
return dual.New(ctx, h, opts...)
|
|
}
|
|
|
|
func newPubSub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
|
|
return pubsub.NewGossipSub(
|
|
ctx,
|
|
h,
|
|
pubsub.WithMessageSigning(true),
|
|
pubsub.WithStrictSignatureVerification(true),
|
|
)
|
|
}
|
|
|
|
// Inspired in Kubo's
|
|
// https://github.com/ipfs/go-ipfs/blob/9327ee64ce96ca6da29bb2a099e0e0930b0d9e09/core/node/libp2p/relay.go#L79-L103
|
|
// and https://github.com/ipfs/go-ipfs/blob/9327ee64ce96ca6da29bb2a099e0e0930b0d9e09/core/node/libp2p/routing.go#L242-L317
|
|
// but simplified and adapted:
|
|
// - Everytime we need peers for relays we do a DHT lookup.
|
|
// - We return the peers from that lookup.
|
|
// - No need to do it async, since we have to wait for the full lookup to
|
|
// return anyways. We put them on a buffered channel and be done.
|
|
func newPeerSource(hostGetter func() host.Host, dhtGetter func() *dual.DHT) autorelay.PeerSource {
|
|
return func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
|
|
// make a channel to return, and put items from numPeers on
|
|
// that channel up to numPeers. Then close it.
|
|
r := make(chan peer.AddrInfo, numPeers)
|
|
defer close(r)
|
|
|
|
// Because the Host, DHT are initialized after relay, we need to
|
|
// obtain them indirectly this way.
|
|
h := hostGetter()
|
|
if h == nil { // context canceled etc.
|
|
return r
|
|
}
|
|
idht := dhtGetter()
|
|
if idht == nil { // context canceled etc.
|
|
return r
|
|
}
|
|
|
|
// length of closest peers is K.
|
|
closestPeers, err := idht.WAN.GetClosestPeers(ctx, h.ID().String())
|
|
if err != nil { // Bail out. Usually a "no peers found".
|
|
return r
|
|
}
|
|
|
|
logger.Debug("peerSource: %d closestPeers for %d requested", len(closestPeers), numPeers)
|
|
|
|
for _, p := range closestPeers {
|
|
addrs := h.Peerstore().Addrs(p)
|
|
if len(addrs) == 0 {
|
|
continue
|
|
}
|
|
dhtPeer := peer.AddrInfo{ID: p, Addrs: addrs}
|
|
// Attempt to put peers on r if we have space,
|
|
// otherwise return (we reached numPeers)
|
|
select {
|
|
case r <- dhtPeer:
|
|
case <-ctx.Done():
|
|
return r
|
|
default:
|
|
return r
|
|
}
|
|
}
|
|
// We are here if numPeers > closestPeers
|
|
return r
|
|
}
|
|
}
|
|
|
|
// EncodeProtectorKey converts a byte slice to its hex string representation.
|
|
func EncodeProtectorKey(secretBytes []byte) string {
|
|
return hex.EncodeToString(secretBytes)
|
|
}
|