clusterhost: place dht records in persistened DHT
With this cluster peers will start remembering provider records accross re-starts.
This commit is contained in:
parent
b95745e0eb
commit
ab74987bc8
|
@ -4,6 +4,8 @@ import (
|
|||
"context"
|
||||
"encoding/hex"
|
||||
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
"github.com/ipfs/ipfs-cluster/config"
|
||||
libp2p "github.com/libp2p/go-libp2p"
|
||||
relay "github.com/libp2p/go-libp2p-circuit"
|
||||
|
@ -13,6 +15,7 @@ import (
|
|||
crypto "github.com/libp2p/go-libp2p-crypto"
|
||||
host "github.com/libp2p/go-libp2p-host"
|
||||
dht "github.com/libp2p/go-libp2p-kad-dht"
|
||||
dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
secio "github.com/libp2p/go-libp2p-secio"
|
||||
libp2ptls "github.com/libp2p/go-libp2p-tls"
|
||||
|
@ -20,6 +23,8 @@ import (
|
|||
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
|
||||
)
|
||||
|
||||
const dhtNamespace = "dht"
|
||||
|
||||
func init() {
|
||||
// Cluster peers should advertise their public IPs as soon as they
|
||||
// learn about them. Default for this is 4, which prevents clusters
|
||||
|
@ -33,13 +38,14 @@ func init() {
|
|||
|
||||
// NewClusterHost creates a fully-featured libp2p Host with the options from
|
||||
// the provided cluster configuration. Using that host, it creates pubsub and
|
||||
// a DHT instances, for shared use by all cluster components. The returned
|
||||
// host uses the DHT for routing. Relay and NATService are additionally
|
||||
// setup for this host.
|
||||
// a DHT instances (persisting to the given datastore), for shared use by all
|
||||
// cluster components. The returned host uses the DHT for routing. Relay and
|
||||
// NATService are additionally setup for this host.
|
||||
func NewClusterHost(
|
||||
ctx context.Context,
|
||||
ident *config.Identity,
|
||||
cfg *Config,
|
||||
ds datastore.Datastore,
|
||||
) (host.Host, *pubsub.PubSub, *dht.IpfsDHT, error) {
|
||||
|
||||
connman := connmgr.NewConnManager(cfg.ConnMgr.LowWater, cfg.ConnMgr.HighWater, cfg.ConnMgr.GracePeriod)
|
||||
|
@ -57,7 +63,7 @@ func NewClusterHost(
|
|||
libp2p.EnableNATService(),
|
||||
libp2p.ConnectionManager(connman),
|
||||
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
|
||||
idht, err = newDHT(ctx, h)
|
||||
idht, err = newDHT(ctx, h, ds)
|
||||
return idht, err
|
||||
}),
|
||||
libp2p.EnableRelay(relayOpts...),
|
||||
|
@ -114,8 +120,16 @@ func baseOpts(psk corepnet.PSK) []libp2p.Option {
|
|||
}
|
||||
}
|
||||
|
||||
func newDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
|
||||
return dht.New(ctx, h)
|
||||
func newDHT(ctx context.Context, h host.Host, store datastore.Datastore) (*dht.IpfsDHT, error) {
|
||||
opts := []dhtopts.Option{}
|
||||
|
||||
if batchingDs, ok := store.(datastore.Batching); ok {
|
||||
dhtDatastore := namespace.Wrap(batchingDs, datastore.NewKey(dhtNamespace))
|
||||
opts = append(opts, dhtopts.Datastore(dhtDatastore))
|
||||
logger.Debug("enabling DHT record persistance to datastore")
|
||||
}
|
||||
|
||||
return dht.New(ctx, h, opts...)
|
||||
}
|
||||
|
||||
func newPubSub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
|
||||
|
|
|
@ -281,9 +281,19 @@ func runCmd(c *cli.Context) error {
|
|||
cfgHelper.Manager().Shutdown()
|
||||
cfgs := cfgHelper.Configs()
|
||||
|
||||
stmgr, err := cmdutils.NewStateManager(cfgHelper.GetConsensus(), cfgHelper.Identity(), cfgs)
|
||||
if err != nil {
|
||||
return cli.Exit(errors.Wrap(err, "creating state manager"), 1)
|
||||
}
|
||||
|
||||
store, err := stmgr.GetStore()
|
||||
if err != nil {
|
||||
return cli.Exit(errors.Wrap(err, "creating datastore"), 1)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgHelper.Identity(), cfgs.Cluster)
|
||||
host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgHelper.Identity(), cfgs.Cluster, store)
|
||||
if err != nil {
|
||||
return cli.Exit(errors.Wrap(err, "error creating libp2p components"), 1)
|
||||
}
|
||||
|
@ -325,16 +335,6 @@ func runCmd(c *cli.Context) error {
|
|||
}
|
||||
alloc := descendalloc.NewAllocator()
|
||||
|
||||
stmgr, err := cmdutils.NewStateManager(cfgHelper.GetConsensus(), cfgHelper.Identity(), cfgs)
|
||||
if err != nil {
|
||||
return cli.Exit(errors.Wrap(err, "creating state manager"), 1)
|
||||
}
|
||||
|
||||
store, err := stmgr.GetStore()
|
||||
if err != nil {
|
||||
return cli.Exit(errors.Wrap(err, "creating datastore"), 1)
|
||||
}
|
||||
|
||||
crdtcons, err := crdt.New(
|
||||
host,
|
||||
dht,
|
||||
|
|
|
@ -87,10 +87,12 @@ func daemon(c *cli.Context) error {
|
|||
cfgs.Cluster.LeaveOnShutdown = true
|
||||
}
|
||||
|
||||
host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgHelper.Identity(), cfgs.Cluster)
|
||||
store := setupDatastore(cfgHelper)
|
||||
|
||||
host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgHelper.Identity(), cfgs.Cluster, store)
|
||||
checkErr("creating libp2p host", err)
|
||||
|
||||
cluster, err := createCluster(ctx, c, cfgHelper, host, pubsub, dht, raftStaging)
|
||||
cluster, err := createCluster(ctx, c, cfgHelper, host, pubsub, dht, store, raftStaging)
|
||||
checkErr("starting cluster", err)
|
||||
|
||||
// noop if no bootstraps
|
||||
|
@ -113,6 +115,7 @@ func createCluster(
|
|||
host host.Host,
|
||||
pubsub *pubsub.PubSub,
|
||||
dht *dht.IpfsDHT,
|
||||
store ds.Datastore,
|
||||
raftStaging bool,
|
||||
) (*ipfscluster.Cluster, error) {
|
||||
|
||||
|
@ -161,8 +164,6 @@ func createCluster(
|
|||
tracer, err := observations.SetupTracing(cfgs.Tracing)
|
||||
checkErr("setting up Tracing", err)
|
||||
|
||||
store := setupDatastore(cfgHelper)
|
||||
|
||||
cons, err := setupConsensus(
|
||||
cfgHelper,
|
||||
host,
|
||||
|
|
Loading…
Reference in New Issue
Block a user