Fix #1796: Implement basic DHT-peer source for libp2p AutoRelay.

At some point libp2p decided that it would not automatically lookup peers on
the DHT when trying to find relays, and instead silently introduced options
for the EnableAutoRelay option, in a way that it randomly panicked first, or,
in later versions, consistently panics.

This fixes that by providing a PeerSource option for AutoRelay. We auto-relay
decides that it needs peers, we will perform a cluster-DHT lookup and send
those. Hopefully this is similar to the previous behaviour.

Since all cluster peers are relays, that should work, hopefully.  In general,
this means NAT'ed peers should be able to find relays to perform hole-punching
when connecting to other NAT'ed peers etc. In practice this is not a scenario
we see a lot with clusters so it is not very well tested.

At least things are not going to panic.
This commit is contained in:
Hector Sanjuan 2023-01-27 18:01:24 +01:00
parent b29f8b807a
commit ba1ba989ca

View File

@ -3,6 +3,7 @@ package ipfscluster
import ( import (
"context" "context"
"encoding/hex" "encoding/hex"
"time"
config "github.com/ipfs-cluster/ipfs-cluster/config" config "github.com/ipfs-cluster/ipfs-cluster/config"
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
@ -16,8 +17,10 @@ import (
crypto "github.com/libp2p/go-libp2p/core/crypto" crypto "github.com/libp2p/go-libp2p/core/crypto"
host "github.com/libp2p/go-libp2p/core/host" host "github.com/libp2p/go-libp2p/core/host"
network "github.com/libp2p/go-libp2p/core/network" network "github.com/libp2p/go-libp2p/core/network"
peer "github.com/libp2p/go-libp2p/core/peer"
corepnet "github.com/libp2p/go-libp2p/core/pnet" corepnet "github.com/libp2p/go-libp2p/core/pnet"
routing "github.com/libp2p/go-libp2p/core/routing" routing "github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
connmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr" connmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify" identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
noise "github.com/libp2p/go-libp2p/p2p/security/noise" noise "github.com/libp2p/go-libp2p/p2p/security/noise"
@ -65,7 +68,23 @@ func NewClusterHost(
return nil, nil, nil, err return nil, nil, nil, err
} }
var h host.Host
var idht *dual.DHT var idht *dual.DHT
// a channel to wait until these variables have been set
// (or left unset on errors). Mostly to avoid reading while writing.
hostAndDHTReady := make(chan struct{})
defer close(hostAndDHTReady)
hostGetter := func() host.Host {
<-hostAndDHTReady // closed when we finish NewClusterHost
return h
}
dhtGetter := func() *dual.DHT {
<-hostAndDHTReady // closed when we finish NewClusterHost
return idht
}
opts := []libp2p.Option{ opts := []libp2p.Option{
libp2p.ListenAddrs(cfg.ListenAddr...), libp2p.ListenAddrs(cfg.ListenAddr...),
libp2p.NATPortMap(), libp2p.NATPortMap(),
@ -76,9 +95,7 @@ func NewClusterHost(
}), }),
libp2p.EnableNATService(), libp2p.EnableNATService(),
libp2p.EnableRelay(), libp2p.EnableRelay(),
// breaks without passing a way to discover relays. libp2p.EnableAutoRelay(autorelay.WithPeerSource(newPeerSource(hostGetter, dhtGetter), time.Minute)),
// https://github.com/libp2p/go-libp2p/issues/1852
// libp2p.EnableAutoRelay(),
libp2p.EnableHolePunching(), libp2p.EnableHolePunching(),
} }
@ -86,7 +103,7 @@ func NewClusterHost(
opts = append(opts, libp2p.EnableRelayService()) opts = append(opts, libp2p.EnableRelayService())
} }
h, err := newHost( h, err = newHost(
ctx, ctx,
cfg.Secret, cfg.Secret,
ident.PrivateKey, ident.PrivateKey,
@ -165,6 +182,63 @@ func newPubSub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
) )
} }
type peerSourceF func(ctx context.Context, numPeers int) <-chan peer.AddrInfo
// Inspired in Kubo's
// https://github.com/ipfs/go-ipfs/blob/9327ee64ce96ca6da29bb2a099e0e0930b0d9e09/core/node/libp2p/relay.go#L79-L103
// and https://github.com/ipfs/go-ipfs/blob/9327ee64ce96ca6da29bb2a099e0e0930b0d9e09/core/node/libp2p/routing.go#L242-L317
// but simplified and adapted:
// - Everytime we need peers for relays we do a DHT lookup.
// - We return the peers from that lookup.
// - No need to do it async, since we have to wait for the full lookup to
// return anyways. We put them on a buffered channel and be done.
func newPeerSource(hostGetter func() host.Host, dhtGetter func() *dual.DHT) peerSourceF {
return func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
// make a channel to return, and put items from numPeers on
// that channel up to numPeers. Then close it.
r := make(chan peer.AddrInfo, numPeers)
defer close(r)
// Because the Host, DHT are initialized after relay, we need to
// obtain them indirectly this way.
h := hostGetter()
if h == nil { // context canceled etc.
return r
}
idht := dhtGetter()
if idht == nil { // context canceled etc.
return r
}
// length of closest peers is K.
closestPeers, err := idht.WAN.GetClosestPeers(ctx, h.ID().String())
if err != nil { // Bail out. Usually a "no peers found".
return r
}
logger.Debug("peerSource: %d closestPeers for %d requested", len(closestPeers), numPeers)
for _, p := range closestPeers {
addrs := h.Peerstore().Addrs(p)
if len(addrs) == 0 {
continue
}
dhtPeer := peer.AddrInfo{ID: p, Addrs: addrs}
// Attempt to put peers on r if we have space,
// otherwise return (we reached numPeers)
select {
case r <- dhtPeer:
case <-ctx.Done():
return r
default:
return r
}
}
// We are here if numPeers > closestPeers
return r
}
}
// EncodeProtectorKey converts a byte slice to its hex string representation. // EncodeProtectorKey converts a byte slice to its hex string representation.
func EncodeProtectorKey(secretBytes []byte) string { func EncodeProtectorKey(secretBytes []byte) string {
return hex.EncodeToString(secretBytes) return hex.EncodeToString(secretBytes)