cluster: add mDNS service discovery
I always thought the libp2p node would do this, but it is not the case. With this, CRDT peers are able to autodiscover on local networks.
This commit is contained in:
parent
f9889e712f
commit
dc3170b1d2
|
@ -3,19 +3,29 @@ package ipfscluster
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/ipfs-cluster/config"
|
"github.com/ipfs/ipfs-cluster/config"
|
||||||
|
"github.com/ipfs/ipfs-cluster/pstoremgr"
|
||||||
libp2p "github.com/libp2p/go-libp2p"
|
libp2p "github.com/libp2p/go-libp2p"
|
||||||
connmgr "github.com/libp2p/go-libp2p-connmgr"
|
connmgr "github.com/libp2p/go-libp2p-connmgr"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||||
crypto "github.com/libp2p/go-libp2p-crypto"
|
crypto "github.com/libp2p/go-libp2p-crypto"
|
||||||
host "github.com/libp2p/go-libp2p-host"
|
host "github.com/libp2p/go-libp2p-host"
|
||||||
ipnet "github.com/libp2p/go-libp2p-interface-pnet"
|
ipnet "github.com/libp2p/go-libp2p-interface-pnet"
|
||||||
dht "github.com/libp2p/go-libp2p-kad-dht"
|
dht "github.com/libp2p/go-libp2p-kad-dht"
|
||||||
pnet "github.com/libp2p/go-libp2p-pnet"
|
pnet "github.com/libp2p/go-libp2p-pnet"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
|
discovery "github.com/libp2p/go-libp2p/p2p/discovery"
|
||||||
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
|
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
mdnsServiceTag = "_ipfs-cluster-discovery._udp"
|
||||||
|
mdnsInterval = 30 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
// NewClusterHost creates a libp2p Host with the options from the provided
|
// NewClusterHost creates a libp2p Host with the options from the provided
|
||||||
// cluster configuration. Using that host, it creates pubsub and a DHT
|
// cluster configuration. Using that host, it creates pubsub and a DHT
|
||||||
// instances, for shared use by all cluster components. The returned host uses
|
// instances, for shared use by all cluster components. The returned host uses
|
||||||
|
@ -24,7 +34,7 @@ func NewClusterHost(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
ident *config.Identity,
|
ident *config.Identity,
|
||||||
cfg *Config,
|
cfg *Config,
|
||||||
) (host.Host, *pubsub.PubSub, *dht.IpfsDHT, error) {
|
) (host.Host, *pubsub.PubSub, *dht.IpfsDHT, discovery.Service, error) {
|
||||||
|
|
||||||
connman := connmgr.NewConnManager(cfg.ConnMgr.LowWater, cfg.ConnMgr.HighWater, cfg.ConnMgr.GracePeriod)
|
connman := connmgr.NewConnManager(cfg.ConnMgr.LowWater, cfg.ConnMgr.HighWater, cfg.ConnMgr.GracePeriod)
|
||||||
|
|
||||||
|
@ -37,22 +47,29 @@ func NewClusterHost(
|
||||||
libp2p.ConnectionManager(connman),
|
libp2p.ConnectionManager(connman),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
psub, err := newPubSub(ctx, h)
|
psub, err := newPubSub(ctx, h)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.Close()
|
h.Close()
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
idht, err := newDHT(ctx, h)
|
idht, err := newDHT(ctx, h)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.Close()
|
h.Close()
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return routedHost(h, idht), psub, idht, nil
|
mdns, err := discovery.NewMdnsService(ctx, h, mdnsInterval, mdnsServiceTag)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
mdns.RegisterNotifee(&mdnsNotifee{mgr: pstoremgr.New(ctx, h, "")})
|
||||||
|
|
||||||
|
return routedHost(h, idht), psub, idht, mdns, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHost(ctx context.Context, secret []byte, priv crypto.PrivKey, opts ...libp2p.Option) (host.Host, error) {
|
func newHost(ctx context.Context, secret []byte, priv crypto.PrivKey, opts ...libp2p.Option) (host.Host, error) {
|
||||||
|
@ -98,6 +115,26 @@ func routedHost(h host.Host, d *dht.IpfsDHT) host.Host {
|
||||||
return routedhost.Wrap(h, d)
|
return routedhost.Wrap(h, d)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type mdnsNotifee struct {
|
||||||
|
mgr *pstoremgr.Manager
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mdnsNot *mdnsNotifee) HandlePeerFound(p peer.AddrInfo) {
|
||||||
|
addrs, err := peer.AddrInfoToP2pAddrs(&p)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// actually mdns returns a single address but let's do things
|
||||||
|
// as if there were several
|
||||||
|
for _, a := range addrs {
|
||||||
|
_, err = mdnsNot.mgr.ImportPeer(a, true, peerstore.ConnectedAddrTTL)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// EncodeProtectorKey converts a byte slice to its hex string representation.
|
// EncodeProtectorKey converts a byte slice to its hex string representation.
|
||||||
func EncodeProtectorKey(secretBytes []byte) string {
|
func EncodeProtectorKey(secretBytes []byte) string {
|
||||||
return hex.EncodeToString(secretBytes)
|
return hex.EncodeToString(secretBytes)
|
||||||
|
|
|
@ -29,6 +29,7 @@ import (
|
||||||
peer "github.com/libp2p/go-libp2p-core/peer"
|
peer "github.com/libp2p/go-libp2p-core/peer"
|
||||||
dht "github.com/libp2p/go-libp2p-kad-dht"
|
dht "github.com/libp2p/go-libp2p-kad-dht"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
|
discovery "github.com/libp2p/go-libp2p/p2p/discovery"
|
||||||
|
|
||||||
ma "github.com/multiformats/go-multiaddr"
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
|
|
||||||
|
@ -51,7 +52,7 @@ func daemon(c *cli.Context) error {
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
var bootstraps []ma.Multiaddr
|
var bootstraps []ma.Multiaddr
|
||||||
if bootStr :=c.String("bootstrap"); bootStr != "" {
|
if bootStr := c.String("bootstrap"); bootStr != "" {
|
||||||
bootstraps = parseBootstraps(strings.Split(bootStr, ","))
|
bootstraps = parseBootstraps(strings.Split(bootStr, ","))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,7 +91,7 @@ func daemon(c *cli.Context) error {
|
||||||
cfgs.Cluster.LeaveOnShutdown = true
|
cfgs.Cluster.LeaveOnShutdown = true
|
||||||
}
|
}
|
||||||
|
|
||||||
host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgHelper.Identity(), cfgs.Cluster)
|
host, pubsub, dht, mdns, err := ipfscluster.NewClusterHost(ctx, cfgHelper.Identity(), cfgs.Cluster)
|
||||||
checkErr("creating libp2p host", err)
|
checkErr("creating libp2p host", err)
|
||||||
|
|
||||||
cluster, err := createCluster(ctx, c, cfgHelper, host, pubsub, dht, raftStaging)
|
cluster, err := createCluster(ctx, c, cfgHelper, host, pubsub, dht, raftStaging)
|
||||||
|
@ -103,7 +104,7 @@ func daemon(c *cli.Context) error {
|
||||||
// will realize).
|
// will realize).
|
||||||
go bootstrap(ctx, cluster, bootstraps)
|
go bootstrap(ctx, cluster, bootstraps)
|
||||||
|
|
||||||
return handleSignals(ctx, cancel, cluster, host, dht)
|
return handleSignals(ctx, cancel, cluster, host, dht, mdns)
|
||||||
}
|
}
|
||||||
|
|
||||||
// createCluster creates all the necessary things to produce the cluster
|
// createCluster creates all the necessary things to produce the cluster
|
||||||
|
@ -223,6 +224,7 @@ func handleSignals(
|
||||||
cluster *ipfscluster.Cluster,
|
cluster *ipfscluster.Cluster,
|
||||||
host host.Host,
|
host host.Host,
|
||||||
dht *dht.IpfsDHT,
|
dht *dht.IpfsDHT,
|
||||||
|
mdns discovery.Service,
|
||||||
) error {
|
) error {
|
||||||
signalChan := make(chan os.Signal, 20)
|
signalChan := make(chan os.Signal, 20)
|
||||||
signal.Notify(
|
signal.Notify(
|
||||||
|
@ -241,6 +243,7 @@ func handleSignals(
|
||||||
case <-cluster.Done():
|
case <-cluster.Done():
|
||||||
cancel()
|
cancel()
|
||||||
dht.Close()
|
dht.Close()
|
||||||
|
mdns.Close()
|
||||||
host.Close()
|
host.Close()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -1,5 +1,6 @@
|
||||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||||
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||||
|
cloud.google.com/go v0.38.0 h1:ROfEUZz+Gh5pa62DJWXSaonyu3StP6EA6lPEXPI6mCo=
|
||||||
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
|
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
|
||||||
contrib.go.opencensus.io/exporter/jaeger v0.1.0 h1:WNc9HbA38xEQmsI40Tjd/MNU/g8byN2Of7lwIjv0Jdc=
|
contrib.go.opencensus.io/exporter/jaeger v0.1.0 h1:WNc9HbA38xEQmsI40Tjd/MNU/g8byN2Of7lwIjv0Jdc=
|
||||||
contrib.go.opencensus.io/exporter/jaeger v0.1.0/go.mod h1:VYianECmuFPwU37O699Vc1GOcy+y8kOsfaxHRImmjbA=
|
contrib.go.opencensus.io/exporter/jaeger v0.1.0/go.mod h1:VYianECmuFPwU37O699Vc1GOcy+y8kOsfaxHRImmjbA=
|
||||||
|
@ -555,6 +556,7 @@ github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE
|
||||||
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
|
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||||
|
github.com/miekg/dns v1.1.12 h1:WMhc1ik4LNkTg8U9l3hI1LvxKmIL+f1+WV/SZtCbDDA=
|
||||||
github.com/miekg/dns v1.1.12/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
github.com/miekg/dns v1.1.12/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||||
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
|
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
|
||||||
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
|
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
|
||||||
|
@ -718,6 +720,7 @@ github.com/whyrusleeping/go-notifier v0.0.0-20170827234753-097c5d47330f h1:M/lL3
|
||||||
github.com/whyrusleeping/go-notifier v0.0.0-20170827234753-097c5d47330f/go.mod h1:cZNvX9cFybI01GriPRMXDtczuvUhgbcYr9iCGaNlRv8=
|
github.com/whyrusleeping/go-notifier v0.0.0-20170827234753-097c5d47330f/go.mod h1:cZNvX9cFybI01GriPRMXDtczuvUhgbcYr9iCGaNlRv8=
|
||||||
github.com/whyrusleeping/mafmt v1.2.8 h1:TCghSl5kkwEE0j+sU/gudyhVMRlpBin8fMBBHg59EbA=
|
github.com/whyrusleeping/mafmt v1.2.8 h1:TCghSl5kkwEE0j+sU/gudyhVMRlpBin8fMBBHg59EbA=
|
||||||
github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvXwGvqIpk20FA=
|
github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvXwGvqIpk20FA=
|
||||||
|
github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30 h1:nMCC9Pwz1pxfC1Y6mYncdk+kq8d5aLx0Q+/gyZGE44M=
|
||||||
github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
|
github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
|
||||||
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
|
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
|
||||||
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
|
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
|
||||||
|
@ -803,6 +806,7 @@ golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smto
|
||||||
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||||
|
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0=
|
||||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
|
|
@ -45,10 +45,11 @@ func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock, host.Host)
|
||||||
cfg.Secret = testingClusterSecret
|
cfg.Secret = testingClusterSecret
|
||||||
|
|
||||||
// Create a bootstrapping libp2p host
|
// Create a bootstrapping libp2p host
|
||||||
h, _, dht, err := NewClusterHost(context.Background(), ident, cfg)
|
h, _, dht, mdns, err := NewClusterHost(context.Background(), ident, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
mdns.Close()
|
||||||
|
|
||||||
// Connect all peers to that host. This will allow that they
|
// Connect all peers to that host. This will allow that they
|
||||||
// can discover each others via DHT.
|
// can discover each others via DHT.
|
||||||
|
|
Loading…
Reference in New Issue
Block a user