dc3170b1d2
I always thought the libp2p node would do this, but it is not the case. With this, CRDT peers are able to autodiscover on local networks.
347 lines
8.4 KiB
Go
347 lines
8.4 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
ipfscluster "github.com/ipfs/ipfs-cluster"
|
|
"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
|
|
"github.com/ipfs/ipfs-cluster/api/ipfsproxy"
|
|
"github.com/ipfs/ipfs-cluster/api/rest"
|
|
"github.com/ipfs/ipfs-cluster/cmdutils"
|
|
"github.com/ipfs/ipfs-cluster/config"
|
|
"github.com/ipfs/ipfs-cluster/consensus/crdt"
|
|
"github.com/ipfs/ipfs-cluster/consensus/raft"
|
|
"github.com/ipfs/ipfs-cluster/informer/disk"
|
|
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
|
|
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
|
|
"github.com/ipfs/ipfs-cluster/observations"
|
|
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
|
|
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
|
|
"go.opencensus.io/tag"
|
|
|
|
ds "github.com/ipfs/go-datastore"
|
|
host "github.com/libp2p/go-libp2p-core/host"
|
|
peer "github.com/libp2p/go-libp2p-core/peer"
|
|
dht "github.com/libp2p/go-libp2p-kad-dht"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
discovery "github.com/libp2p/go-libp2p/p2p/discovery"
|
|
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
|
|
errors "github.com/pkg/errors"
|
|
cli "github.com/urfave/cli"
|
|
)
|
|
|
|
func parseBootstraps(flagVal []string) (bootstraps []ma.Multiaddr) {
|
|
for _, a := range flagVal {
|
|
bAddr, err := ma.NewMultiaddr(strings.TrimSpace(a))
|
|
checkErr("error parsing bootstrap multiaddress (%s)", err, a)
|
|
bootstraps = append(bootstraps, bAddr)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Runs the cluster peer
|
|
func daemon(c *cli.Context) error {
|
|
logger.Info("Initializing. For verbose output run with \"-l debug\". Please wait...")
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
var bootstraps []ma.Multiaddr
|
|
if bootStr := c.String("bootstrap"); bootStr != "" {
|
|
bootstraps = parseBootstraps(strings.Split(bootStr, ","))
|
|
}
|
|
|
|
// Execution lock
|
|
locker.lock()
|
|
defer locker.tryUnlock()
|
|
|
|
// Load all the configurations and identity
|
|
cfgHelper := loadConfigHelper()
|
|
defer cfgHelper.Manager().Shutdown()
|
|
|
|
cfgs := cfgHelper.Configs()
|
|
|
|
if c.Bool("stats") {
|
|
cfgs.Metrics.EnableStats = true
|
|
}
|
|
cfgHelper.SetupTracing(c.Bool("tracing"))
|
|
|
|
// Setup bootstrapping
|
|
raftStaging := false
|
|
switch cfgHelper.GetConsensus() {
|
|
case cfgs.Raft.ConfigKey():
|
|
if len(bootstraps) > 0 {
|
|
// Cleanup state if bootstrapping
|
|
raft.CleanupRaft(cfgs.Raft)
|
|
raftStaging = true
|
|
}
|
|
case cfgs.Crdt.ConfigKey():
|
|
if !c.Bool("no-trust") {
|
|
crdtCfg := cfgs.Crdt
|
|
crdtCfg.TrustedPeers = append(crdtCfg.TrustedPeers, ipfscluster.PeersFromMultiaddrs(bootstraps)...)
|
|
}
|
|
}
|
|
|
|
if c.Bool("leave") {
|
|
cfgs.Cluster.LeaveOnShutdown = true
|
|
}
|
|
|
|
host, pubsub, dht, mdns, err := ipfscluster.NewClusterHost(ctx, cfgHelper.Identity(), cfgs.Cluster)
|
|
checkErr("creating libp2p host", err)
|
|
|
|
cluster, err := createCluster(ctx, c, cfgHelper, host, pubsub, dht, raftStaging)
|
|
checkErr("starting cluster", err)
|
|
|
|
// noop if no bootstraps
|
|
// if bootstrapping fails, consensus will never be ready
|
|
// and timeout. So this can happen in background and we
|
|
// avoid worrying about error handling here (since Cluster
|
|
// will realize).
|
|
go bootstrap(ctx, cluster, bootstraps)
|
|
|
|
return handleSignals(ctx, cancel, cluster, host, dht, mdns)
|
|
}
|
|
|
|
// createCluster creates all the necessary things to produce the cluster
|
|
// object and returns it along the datastore so the lifecycle can be handled
|
|
// (the datastore needs to be Closed after shutting down the Cluster).
|
|
func createCluster(
|
|
ctx context.Context,
|
|
c *cli.Context,
|
|
cfgHelper *cmdutils.ConfigHelper,
|
|
host host.Host,
|
|
pubsub *pubsub.PubSub,
|
|
dht *dht.IpfsDHT,
|
|
raftStaging bool,
|
|
) (*ipfscluster.Cluster, error) {
|
|
|
|
cfgs := cfgHelper.Configs()
|
|
cfgMgr := cfgHelper.Manager()
|
|
|
|
ctx, err := tag.New(ctx, tag.Upsert(observations.HostKey, host.ID().Pretty()))
|
|
checkErr("tag context with host id", err)
|
|
|
|
var apis []ipfscluster.API
|
|
if cfgMgr.IsLoadedFromJSON(config.API, cfgs.Restapi.ConfigKey()) {
|
|
rest, err := rest.NewAPIWithHost(ctx, cfgs.Restapi, host)
|
|
checkErr("creating REST API component", err)
|
|
apis = append(apis, rest)
|
|
}
|
|
|
|
if cfgMgr.IsLoadedFromJSON(config.API, cfgs.Ipfsproxy.ConfigKey()) {
|
|
proxy, err := ipfsproxy.New(cfgs.Ipfsproxy)
|
|
checkErr("creating IPFS Proxy component", err)
|
|
|
|
apis = append(apis, proxy)
|
|
}
|
|
|
|
connector, err := ipfshttp.NewConnector(cfgs.Ipfshttp)
|
|
checkErr("creating IPFS Connector component", err)
|
|
|
|
tracker := setupPinTracker(
|
|
c.String("pintracker"),
|
|
host,
|
|
cfgs.Maptracker,
|
|
cfgs.Statelesstracker,
|
|
cfgs.Cluster.Peername,
|
|
)
|
|
|
|
informer, err := disk.NewInformer(cfgs.Diskinf)
|
|
checkErr("creating disk informer", err)
|
|
alloc := descendalloc.NewAllocator()
|
|
|
|
ipfscluster.ReadyTimeout = cfgs.Raft.WaitForLeaderTimeout + 5*time.Second
|
|
|
|
err = observations.SetupMetrics(cfgs.Metrics)
|
|
checkErr("setting up Metrics", err)
|
|
|
|
tracer, err := observations.SetupTracing(cfgs.Tracing)
|
|
checkErr("setting up Tracing", err)
|
|
|
|
store := setupDatastore(cfgHelper)
|
|
|
|
cons, err := setupConsensus(
|
|
cfgHelper,
|
|
host,
|
|
dht,
|
|
pubsub,
|
|
store,
|
|
raftStaging,
|
|
)
|
|
if err != nil {
|
|
store.Close()
|
|
checkErr("setting up Consensus", err)
|
|
}
|
|
|
|
var peersF func(context.Context) ([]peer.ID, error)
|
|
if cfgHelper.GetConsensus() == cfgs.Raft.ConfigKey() {
|
|
peersF = cons.Peers
|
|
}
|
|
|
|
mon, err := pubsubmon.New(ctx, cfgs.Pubsubmon, pubsub, peersF)
|
|
if err != nil {
|
|
store.Close()
|
|
checkErr("setting up PeerMonitor", err)
|
|
}
|
|
|
|
return ipfscluster.NewCluster(
|
|
ctx,
|
|
host,
|
|
dht,
|
|
cfgs.Cluster,
|
|
store,
|
|
cons,
|
|
apis,
|
|
connector,
|
|
tracker,
|
|
mon,
|
|
alloc,
|
|
informer,
|
|
tracer,
|
|
)
|
|
}
|
|
|
|
// bootstrap will bootstrap this peer to one of the bootstrap addresses
|
|
// if there are any.
|
|
func bootstrap(ctx context.Context, cluster *ipfscluster.Cluster, bootstraps []ma.Multiaddr) {
|
|
for _, bstrap := range bootstraps {
|
|
logger.Infof("Bootstrapping to %s", bstrap)
|
|
err := cluster.Join(ctx, bstrap)
|
|
if err != nil {
|
|
logger.Errorf("bootstrap to %s failed: %s", bstrap, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func handleSignals(
|
|
ctx context.Context,
|
|
cancel context.CancelFunc,
|
|
cluster *ipfscluster.Cluster,
|
|
host host.Host,
|
|
dht *dht.IpfsDHT,
|
|
mdns discovery.Service,
|
|
) error {
|
|
signalChan := make(chan os.Signal, 20)
|
|
signal.Notify(
|
|
signalChan,
|
|
syscall.SIGINT,
|
|
syscall.SIGTERM,
|
|
syscall.SIGHUP,
|
|
)
|
|
|
|
var ctrlcCount int
|
|
for {
|
|
select {
|
|
case <-signalChan:
|
|
ctrlcCount++
|
|
handleCtrlC(ctx, cluster, ctrlcCount)
|
|
case <-cluster.Done():
|
|
cancel()
|
|
dht.Close()
|
|
mdns.Close()
|
|
host.Close()
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func handleCtrlC(ctx context.Context, cluster *ipfscluster.Cluster, ctrlcCount int) {
|
|
switch ctrlcCount {
|
|
case 1:
|
|
go func() {
|
|
err := cluster.Shutdown(ctx)
|
|
checkErr("shutting down cluster", err)
|
|
}()
|
|
case 2:
|
|
out(`
|
|
|
|
|
|
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
|
Shutdown is taking too long! Press Ctrl-c again to manually kill cluster.
|
|
Note that this may corrupt the local cluster state.
|
|
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
|
|
|
|
|
`)
|
|
case 3:
|
|
out("exiting cluster NOW")
|
|
locker.tryUnlock()
|
|
os.Exit(-1)
|
|
}
|
|
}
|
|
|
|
func setupPinTracker(
|
|
name string,
|
|
h host.Host,
|
|
mapCfg *maptracker.Config,
|
|
statelessCfg *stateless.Config,
|
|
peerName string,
|
|
) ipfscluster.PinTracker {
|
|
switch name {
|
|
case "map":
|
|
ptrk := maptracker.NewMapPinTracker(mapCfg, h.ID(), peerName)
|
|
logger.Debug("map pintracker loaded")
|
|
return ptrk
|
|
case "stateless":
|
|
ptrk := stateless.New(statelessCfg, h.ID(), peerName)
|
|
logger.Debug("stateless pintracker loaded")
|
|
return ptrk
|
|
default:
|
|
err := errors.New("unknown pintracker type")
|
|
checkErr("", err)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func setupDatastore(cfgHelper *cmdutils.ConfigHelper) ds.Datastore {
|
|
stmgr, err := cmdutils.NewStateManager(cfgHelper.GetConsensus(), cfgHelper.Identity(), cfgHelper.Configs())
|
|
checkErr("creating state manager", err)
|
|
store, err := stmgr.GetStore()
|
|
checkErr("creating datastore", err)
|
|
return store
|
|
}
|
|
|
|
func setupConsensus(
|
|
cfgHelper *cmdutils.ConfigHelper,
|
|
h host.Host,
|
|
dht *dht.IpfsDHT,
|
|
pubsub *pubsub.PubSub,
|
|
store ds.Datastore,
|
|
raftStaging bool,
|
|
) (ipfscluster.Consensus, error) {
|
|
|
|
cfgs := cfgHelper.Configs()
|
|
switch cfgHelper.GetConsensus() {
|
|
case cfgs.Raft.ConfigKey():
|
|
rft, err := raft.NewConsensus(
|
|
h,
|
|
cfgHelper.Configs().Raft,
|
|
store,
|
|
raftStaging,
|
|
)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "creating Raft component")
|
|
}
|
|
return rft, nil
|
|
case cfgs.Crdt.ConfigKey():
|
|
convrdt, err := crdt.New(
|
|
h,
|
|
dht,
|
|
pubsub,
|
|
cfgHelper.Configs().Crdt,
|
|
store,
|
|
)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "creating CRDT component")
|
|
}
|
|
return convrdt, nil
|
|
default:
|
|
return nil, errors.New("unknown consensus component")
|
|
}
|
|
}
|