ipfs-cluster/clusterhost.go
2023-04-19 12:59:07 +02:00

243 lines
7.5 KiB
Go

package ipfscluster
import (
"context"
"encoding/hex"
config "github.com/ipfs-cluster/ipfs-cluster/config"
ipns "github.com/ipfs/boxo/ipns"
ds "github.com/ipfs/go-datastore"
namespace "github.com/ipfs/go-datastore/namespace"
libp2p "github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
dual "github.com/libp2p/go-libp2p-kad-dht/dual"
pubsub "github.com/libp2p/go-libp2p-pubsub"
record "github.com/libp2p/go-libp2p-record"
crypto "github.com/libp2p/go-libp2p/core/crypto"
host "github.com/libp2p/go-libp2p/core/host"
network "github.com/libp2p/go-libp2p/core/network"
peer "github.com/libp2p/go-libp2p/core/peer"
corepnet "github.com/libp2p/go-libp2p/core/pnet"
routing "github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
connmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
tcp "github.com/libp2p/go-libp2p/p2p/transport/tcp"
websocket "github.com/libp2p/go-libp2p/p2p/transport/websocket"
)
// dhtNamespace is the datastore key under which newDHT namespaces the DHT
// records when record persistence is enabled.
const dhtNamespace = "dht"
// Keep the QUIC transport package referenced so that its import remains
// valid: baseOpts currently registers only the TCP and websocket transports.
var _ = libp2pquic.NewTransport
// init lowers libp2p's identify activation threshold for the whole process.
// Note that this mutates a package-level variable of the identify package.
func init() {
// Cluster peers should advertise their public IPs as soon as they
// learn about them. The default threshold is 4, which prevents
// clusters with fewer than 4 peers from advertising an external
// address they know of, so other peers cannot remember it right away.
// This mostly affects dockerized setups. It may announce non-dialable
// NATed addresses too eagerly, but they should progressively be
// cleaned up.
identify.ActivationThresh = 1
}
// NewClusterHost creates a fully-featured libp2p Host with the options from
// the provided cluster configuration. Using that host, it creates a pubsub
// instance and a DHT instance for shared use by all cluster components. The
// DHT persists its records to the given datastore when that datastore
// supports batching (see newDHT). The returned host uses the DHT for
// routing. Relay, AutoRelay, hole punching, NAT port mapping and the
// NATService are additionally set up for this host.
//
// On failure all returned values are nil except the error. If pubsub
// creation fails, the already-created host is closed before returning.
func NewClusterHost(
ctx context.Context,
ident *config.Identity,
cfg *Config,
ds ds.Datastore,
) (host.Host, *pubsub.PubSub, *dual.DHT, error) {
// NOTE(review): the parameter named ds shadows the imported ds package
// inside this function body.
//
// Set the default dial timeout for all libp2p connections. It is not
// very good to touch this global variable here, but the alternative
// is to use a modified context everywhere, even if the user supplies
// it.
network.DialPeerTimeout = cfg.DialPeerTimeout
connman, err := connmgr.NewConnManager(cfg.ConnMgr.LowWater, cfg.ConnMgr.HighWater, connmgr.WithGracePeriod(cfg.ConnMgr.GracePeriod))
if err != nil {
return nil, nil, nil, err
}
// h and idht are assigned further below, but the AutoRelay peer source
// needs access to them and must be configured before they exist.
var h host.Host
var idht *dual.DHT
// a channel to wait until these variables have been set
// (or left unset on errors). Mostly to avoid reading while writing.
// It is closed by the deferred call when NewClusterHost returns, on
// both success and error paths.
hostAndDHTReady := make(chan struct{})
defer close(hostAndDHTReady)
// hostGetter blocks until NewClusterHost has returned and then yields
// h, which is nil if host creation failed.
hostGetter := func() host.Host {
<-hostAndDHTReady // closed when we finish NewClusterHost
return h
}
// dhtGetter blocks until NewClusterHost has returned and then yields
// idht, which is nil if the DHT was never created.
dhtGetter := func() *dual.DHT {
<-hostAndDHTReady // closed when we finish NewClusterHost
return idht
}
opts := []libp2p.Option{
libp2p.ListenAddrs(cfg.ListenAddr...),
libp2p.NATPortMap(),
libp2p.ConnectionManager(connman),
// The routing constructor creates the DHT on the host it is given.
// Its parameter h shadows the outer h; idht and err are the outer
// variables, so the DHT is captured for the return value below.
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
idht, err = newDHT(ctx, h, ds)
return idht, err
}),
libp2p.EnableNATService(),
libp2p.EnableRelay(),
libp2p.EnableAutoRelayWithPeerSource(newPeerSource(hostGetter, dhtGetter)),
libp2p.EnableHolePunching(),
}
// Optionally act as a relay for other peers.
if cfg.EnableRelayHop {
opts = append(opts, libp2p.EnableRelayService())
}
h, err = newHost(
ctx,
cfg.Secret,
ident.PrivateKey,
opts...,
)
if err != nil {
return nil, nil, nil, err
}
psub, err := newPubSub(ctx, h)
if err != nil {
// Do not leak the host when pubsub cannot be created.
h.Close()
return nil, nil, nil, err
}
return h, psub, idht, nil
}
// newHost builds a libp2p host from the given identity key, the cluster base
// options (see baseOpts) and any caller-supplied options, applied in that
// order. On its own it adds no DHT, pubsub or relay; it is mostly used for
// testing.
func newHost(ctx context.Context, psk corepnet.PSK, priv crypto.PrivKey, opts ...libp2p.Option) (host.Host, error) {
	base := baseOpts(psk)

	// Identity first, then the base options, then the caller's options.
	all := make([]libp2p.Option, 0, 1+len(base)+len(opts))
	all = append(all, libp2p.Identity(priv))
	all = append(all, base...)
	all = append(all, opts...)

	created, err := libp2p.New(all...)
	if err != nil {
		return nil, err
	}
	return created, nil
}
// baseOpts returns the libp2p options shared by every cluster host: the
// private network protected by psk, the NAT service, the noise and TLS
// security transports, and the TCP and websocket transports only.
func baseOpts(psk corepnet.PSK) []libp2p.Option {
	security := []libp2p.Option{
		libp2p.Security(noise.ID, noise.New),
		libp2p.Security(libp2ptls.ID, libp2ptls.New),
	}

	// TODO: quic does not support private networks
	// libp2p.DefaultTransports,
	transports := []libp2p.Option{
		libp2p.NoTransports,
		libp2p.Transport(tcp.NewTCPTransport),
		libp2p.Transport(websocket.New),
	}

	result := []libp2p.Option{
		libp2p.PrivateNetwork(psk),
		libp2p.EnableNATService(),
	}
	result = append(result, security...)
	result = append(result, transports...)
	return result
}
// newDHT creates a dual DHT on the given host with "pk" and "ipns" record
// validators and a concurrency of 10. Any extraopts are applied after these
// defaults. When store implements ds.Batching, DHT records are persisted to
// it under the dhtNamespace key.
func newDHT(ctx context.Context, h host.Host, store ds.Datastore, extraopts ...dual.Option) (*dual.DHT, error) {
	dhtOpts := make([]dual.Option, 0, len(extraopts)+4)
	dhtOpts = append(dhtOpts,
		dual.DHTOption(dht.NamespacedValidator("pk", record.PublicKeyValidator{})),
		dual.DHTOption(dht.NamespacedValidator("ipns", ipns.Validator{KeyBook: h.Peerstore()})),
		dual.DHTOption(dht.Concurrency(10)),
	)
	dhtOpts = append(dhtOpts, extraopts...)

	// Persistence is only possible with a batching datastore.
	batching, canPersist := store.(ds.Batching)
	if canPersist {
		wrapped := namespace.Wrap(batching, ds.NewKey(dhtNamespace))
		dhtOpts = append(dhtOpts, dual.DHTOption(dht.Datastore(wrapped)))
		logger.Debug("enabling DHT record persistence to datastore")
	}

	return dual.New(ctx, h, dhtOpts...)
}
// newPubSub creates a GossipSub instance on the given host with message
// signing and strict signature verification both enabled.
func newPubSub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
	signing := []pubsub.Option{
		pubsub.WithMessageSigning(true),
		pubsub.WithStrictSignatureVerification(true),
	}
	return pubsub.NewGossipSub(ctx, h, signing...)
}
// newPeerSource returns an autorelay.PeerSource that provides relay
// candidates obtained from a lookup on the WAN DHT.
//
// Inspired by Kubo's
// https://github.com/ipfs/go-ipfs/blob/9327ee64ce96ca6da29bb2a099e0e0930b0d9e09/core/node/libp2p/relay.go#L79-L103
// and https://github.com/ipfs/go-ipfs/blob/9327ee64ce96ca6da29bb2a099e0e0930b0d9e09/core/node/libp2p/routing.go#L242-L317
// but simplified and adapted:
//   - Every time we need peers for relays we do a DHT lookup.
//   - We return the peers from that lookup.
//   - No need to do it async, since we have to wait for the full lookup to
//     return anyway. We put them on a buffered channel and are done.
//
// hostGetter and dhtGetter supply the Host and the DHT lazily, because both
// are created after the peer source has been configured. Either may return
// nil, in which case the peer source yields no peers.
//
// The returned channel is always closed before it is handed back to the
// caller and holds at most numPeers entries. Peers without known addresses
// in the host's peerstore are skipped.
func newPeerSource(hostGetter func() host.Host, dhtGetter func() *dual.DHT) autorelay.PeerSource {
	return func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
		// make a channel to return, and put items from the lookup on
		// that channel up to numPeers. Then close it.
		r := make(chan peer.AddrInfo, numPeers)
		defer close(r)

		// Because the Host, DHT are initialized after relay, we need to
		// obtain them indirectly this way.
		h := hostGetter()
		if h == nil { // host was never set (e.g. its creation failed).
			return r
		}
		idht := dhtGetter()
		if idht == nil { // DHT was never set (e.g. its creation failed).
			return r
		}

		// length of closest peers is K.
		closestPeers, err := idht.WAN.GetClosestPeers(ctx, h.ID().String())
		if err != nil { // Bail out. Usually a "no peers found".
			return r
		}

		// Debugf, not Debug: the message contains formatting verbs.
		logger.Debugf("peerSource: %d closestPeers for %d requested", len(closestPeers), numPeers)

		for _, p := range closestPeers {
			addrs := h.Peerstore().Addrs(p)
			if len(addrs) == 0 {
				continue
			}
			dhtPeer := peer.AddrInfo{ID: p, Addrs: addrs}
			// Attempt to put peers on r if we have space,
			// otherwise return (we reached numPeers)
			select {
			case r <- dhtPeer:
			case <-ctx.Done():
				return r
			default:
				return r
			}
		}
		// We are here if numPeers > closestPeers
		return r
	}
}
// EncodeProtectorKey returns the lowercase hexadecimal encoding of
// secretBytes. A nil or empty slice yields the empty string.
func EncodeProtectorKey(secretBytes []byte) string {
	encoded := make([]byte, hex.EncodedLen(len(secretBytes)))
	hex.Encode(encoded, secretBytes)
	return string(encoded)
}