dc3170b1d2
I always thought the libp2p node would do this, but it is not the case. With this, CRDT peers are able to autodiscover on local networks.
142 lines
3.5 KiB
Go
142 lines
3.5 KiB
Go
package ipfscluster
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"time"
|
|
|
|
"github.com/ipfs/ipfs-cluster/config"
|
|
"github.com/ipfs/ipfs-cluster/pstoremgr"
|
|
libp2p "github.com/libp2p/go-libp2p"
|
|
connmgr "github.com/libp2p/go-libp2p-connmgr"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
"github.com/libp2p/go-libp2p-core/peerstore"
|
|
crypto "github.com/libp2p/go-libp2p-crypto"
|
|
host "github.com/libp2p/go-libp2p-host"
|
|
ipnet "github.com/libp2p/go-libp2p-interface-pnet"
|
|
dht "github.com/libp2p/go-libp2p-kad-dht"
|
|
pnet "github.com/libp2p/go-libp2p-pnet"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
discovery "github.com/libp2p/go-libp2p/p2p/discovery"
|
|
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
|
|
)
|
|
|
|
// Settings handed to the mDNS discovery service created for the cluster host.
const (
	// mdnsServiceTag is the service tag passed to the mDNS service.
	mdnsServiceTag = "_ipfs-cluster-discovery._udp"
	// mdnsInterval is the interval passed to the mDNS service.
	mdnsInterval = time.Second * 30
)
|
|
|
|
// NewClusterHost creates a libp2p Host with the options from the provided
|
|
// cluster configuration. Using that host, it creates pubsub and a DHT
|
|
// instances, for shared use by all cluster components. The returned host uses
|
|
// the DHT for routing. The resulting DHT is not bootstrapped.
|
|
func NewClusterHost(
|
|
ctx context.Context,
|
|
ident *config.Identity,
|
|
cfg *Config,
|
|
) (host.Host, *pubsub.PubSub, *dht.IpfsDHT, discovery.Service, error) {
|
|
|
|
connman := connmgr.NewConnManager(cfg.ConnMgr.LowWater, cfg.ConnMgr.HighWater, cfg.ConnMgr.GracePeriod)
|
|
|
|
h, err := newHost(
|
|
ctx,
|
|
cfg.Secret,
|
|
ident.PrivateKey,
|
|
libp2p.ListenAddrs(cfg.ListenAddr),
|
|
libp2p.NATPortMap(),
|
|
libp2p.ConnectionManager(connman),
|
|
)
|
|
if err != nil {
|
|
return nil, nil, nil, nil, err
|
|
}
|
|
|
|
psub, err := newPubSub(ctx, h)
|
|
if err != nil {
|
|
h.Close()
|
|
return nil, nil, nil, nil, err
|
|
}
|
|
|
|
idht, err := newDHT(ctx, h)
|
|
if err != nil {
|
|
h.Close()
|
|
return nil, nil, nil, nil, err
|
|
}
|
|
|
|
mdns, err := discovery.NewMdnsService(ctx, h, mdnsInterval, mdnsServiceTag)
|
|
if err != nil {
|
|
return nil, nil, nil, nil, err
|
|
}
|
|
|
|
mdns.RegisterNotifee(&mdnsNotifee{mgr: pstoremgr.New(ctx, h, "")})
|
|
|
|
return routedHost(h, idht), psub, idht, mdns, nil
|
|
}
|
|
|
|
func newHost(ctx context.Context, secret []byte, priv crypto.PrivKey, opts ...libp2p.Option) (host.Host, error) {
|
|
var prot ipnet.Protector
|
|
var err error
|
|
|
|
// Create protector if we have a secret.
|
|
if secret != nil && len(secret) > 0 {
|
|
var key [32]byte
|
|
copy(key[:], secret)
|
|
prot, err = pnet.NewV1ProtectorFromBytes(&key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
finalOpts := []libp2p.Option{
|
|
libp2p.Identity(priv),
|
|
libp2p.PrivateNetwork(prot),
|
|
}
|
|
finalOpts = append(finalOpts, opts...)
|
|
|
|
return libp2p.New(
|
|
ctx,
|
|
finalOpts...,
|
|
)
|
|
}
|
|
|
|
func newDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
|
|
return dht.New(ctx, h)
|
|
}
|
|
|
|
func newPubSub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
|
|
return pubsub.NewGossipSub(
|
|
ctx,
|
|
h,
|
|
pubsub.WithMessageSigning(true),
|
|
pubsub.WithStrictSignatureVerification(true),
|
|
)
|
|
}
|
|
|
|
func routedHost(h host.Host, d *dht.IpfsDHT) host.Host {
|
|
return routedhost.Wrap(h, d)
|
|
}
|
|
|
|
type mdnsNotifee struct {
|
|
mgr *pstoremgr.Manager
|
|
}
|
|
|
|
func (mdnsNot *mdnsNotifee) HandlePeerFound(p peer.AddrInfo) {
|
|
addrs, err := peer.AddrInfoToP2pAddrs(&p)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return
|
|
}
|
|
// actually mdns returns a single address but let's do things
|
|
// as if there were several
|
|
for _, a := range addrs {
|
|
_, err = mdnsNot.mgr.ImportPeer(a, true, peerstore.ConnectedAddrTTL)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// EncodeProtectorKey returns the hexadecimal encoding of secretBytes as a
// string. An empty or nil input yields the empty string.
func EncodeProtectorKey(secretBytes []byte) string {
	encoded := make([]byte, hex.EncodedLen(len(secretBytes)))
	hex.Encode(encoded, secretBytes)
	return string(encoded)
}
|