From 33d9cdd3c402c6d307945db5c5d9d1fc97d6f59d Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Sun, 29 Apr 2018 00:22:23 +0200 Subject: [PATCH] Feat: emancipate Consensus from the Cluster component This commit promotes the Consensus component (and Raft) to become a fully independent thing like other components, passed to NewCluster during initialization. Cluster (main component) no longer creates the consensus layer internally. This has triggered a number of breaking changes that I will explain below. Motivation: Future work will require the possibility of running Cluster with a consensus layer that is not Raft. The "consensus" layer is in charge of maintaining two things: * The current cluster peerset, as required by the implementation * The current cluster pinset (shared state) While the pinset maintenance has always been in the consensus layer, the peerset maintenance was handled by the main component (starting by the "peers" key in the configuration) AND the Raft component (internally) and this generated lots of confusion: if the user edited the peers in the configuration they would be greeted with an error. The bootstrap process (adding a peer to an existing cluster) and configuration key also complicated many things, since the main component did it, but only when the consensus was initialized and in single peer mode. In all this we also mixed the peerstore (list of peer addresses in the libp2p host) with the peerset, when they need not to be linked. By initializing the consensus layer before calling NewCluster, all the difficulties in maintaining the current implementation in the same way have come to light. Thus, the following changes have been introduced: * Remove "peers" and "bootstrap" keys from the configuration: we no longer edit or save the configuration files. This was a very bad practice, requiring write permissions by the process to the file containing the private key and additionally made things like Puppet deployments of cluster difficult as configuration would mutate from its initial version. Needless to say all the maintenance associated to making sure peers and bootstrap had correct values when peers are bootstrapped or removed. A loud and detailed error message has been added when staring cluster with an old config, along with instructions on how to move forward. * Introduce a PeerstoreFile ("peerstore") which stores peer addresses: in ipfs, the peerstore is not persisted because it can be re-built from the network bootstrappers and the DHT. Cluster should probably also allow discoverability of peers addresses (when not bootstrapping, as in that case we have it), but in the meantime, we will read and persist the peerstore addresses for cluster peers in this file, different from the configuration. Note that dns multiaddresses are now fully supported and no IPs are saved when we have DNS multiaddresses for a peer. * The former "peer_manager" code is now a pstoremgr module, providing utilities to parse, add, list and generally maintain the libp2p host peerstore, including operations on the PeerstoreFile. This "pstoremgr" can now also be extended to perform address autodiscovery and other things indepedently from Cluster. * Create and initialize Raft outside of the main Cluster component: since we can now launch Raft independently from Cluster, we have more degrees of freedom. A new "staging" option when creating the object allows a raft peer to be launched in Staging mode, waiting to be added to a running consensus, and thus, not electing itself as leader or doing anything like we were doing before. This additionally allows us to track when the peer has become a Voter, which only happens when it's caught up with the state, something that was wonky previously. * The raft configuration now includes an InitPeerset key, which allows to provide a peerset for new peers and which is ignored when staging==true. The whole Raft initialization code is way cleaner and stronger now. * Cluster peer bootsrapping is now an ipfs-cluster-service feature. The --bootstrap flag works as before (additionally allowing comma-separated-list of entries). What bootstrap does, is to initialize Raft with staging == true, and then call Join in the main cluster component. Only when the Raft peer transitions to Voter, consensus becomes ready, and cluster becomes Ready. This is cleaner, works better and is less complex than before (supporting both flags and config values). We also backup and clean the state whenever we are boostrapping, automatically * ipfs-cluster-service no longer runs the daemon. Starting cluster needs now "ipfs-cluster-service daemon". The daemon specific flags (bootstrap, alloc) are now flags for the daemon subcommand. Here we mimic ipfs ("ipfs" does not start the daemon but print help) and pave the path for merging both service and ctl in the future. While this brings some breaking changes, it significantly reduces the complexity of the configuration, the code and most importantly, the documentation. It should be easier now to explain the user what is the right way to launch a cluster peer, and more difficult to make mistakes. As a side effect, the PR also: * Fixes #381 - peers with dynamic addresses * Fixes #371 - peers should be Raft configuration option * Fixes #378 - waitForUpdates may return before state fully synced * Fixes #235 - config option shadowing (no cfg saves, no need to shadow) License: MIT Signed-off-by: Hector Sanjuan --- .gitignore | 1 + Dockerfile-test | 2 +- Makefile | 20 +- api/rest/restapi.go | 8 +- cluster.go | 154 ++------- cluster_config.go | 118 +++---- cluster_config_test.go | 28 -- cluster_test.go | 15 +- config_test.go | 3 +- consensus/raft/config.go | 77 +++-- consensus/raft/config_test.go | 15 +- consensus/raft/consensus.go | 76 +++-- consensus/raft/consensus_test.go | 4 +- consensus/raft/data_helper.go | 33 +- consensus/raft/data_helper_test.go | 12 +- consensus/raft/raft.go | 353 +++++++++++--------- ipfs-cluster-service/configs.go | 61 ++++ ipfs-cluster-service/daemon.go | 226 +++++++++++++ ipfs-cluster-service/main.go | 261 ++------------- ipfs-cluster-service/state.go | 13 +- ipfscluster.go | 4 +- ipfscluster_test.go | 96 ++---- peer_manager.go | 86 ----- peer_manager_test.go | 33 +- pstoremgr/pstoremgr.go | 231 +++++++++++++ pstoremgr/pstoremgr_test.go | 105 ++++++ rpc_api.go | 4 +- sharness/config/basic_auth/service.json | 2 - sharness/config/ssl-basic_auth/service.json | 2 - sharness/config/ssl/service.json | 2 - sharness/lib/test-lib.sh | 2 +- 31 files changed, 1182 insertions(+), 865 deletions(-) create mode 100644 ipfs-cluster-service/configs.go create mode 100644 ipfs-cluster-service/daemon.go delete mode 100644 peer_manager.go create mode 100644 pstoremgr/pstoremgr.go create mode 100644 pstoremgr/pstoremgr_test.go diff --git a/.gitignore b/.gitignore index 1d146428..c1adab9b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ sharness/test-results sharness/trash* raftFolderFromTest* +peerstore # Compiled Object files, Static and Dynamic libs (Shared Objects) *.o diff --git a/Dockerfile-test b/Dockerfile-test index d0402a09..9c355fe7 100644 --- a/Dockerfile-test +++ b/Dockerfile-test @@ -54,4 +54,4 @@ VOLUME $IPFS_CLUSTER_PATH ENTRYPOINT ["/usr/local/bin/start-daemons.sh"] # Defaults would go here -CMD [] +CMD ["daemon", "--upgrade"] diff --git a/Makefile b/Makefile index 15190758..837d7684 100644 --- a/Makefile +++ b/Makefile @@ -96,15 +96,15 @@ publish: rwundo $(gx_bin) publish docker: - @docker build -t cluster-image -f Dockerfile . - @docker run --name tmp-make-cluster -d cluster-image && sleep 8 - @docker exec tmp-make-cluster sh -c "ipfs-cluster-ctl version" - @docker exec tmp-make-cluster sh -c "ipfs-cluster-service -v" - @docker stop tmp-make-cluster && docker rm tmp-make-cluster - @docker build -t cluster-image -f Dockerfile-test . - @docker run --name tmp-make-cluster -d cluster-image && sleep 8 - @docker exec tmp-make-cluster sh -c "ipfs-cluster-ctl version" - @docker exec tmp-make-cluster sh -c "ipfs-cluster-service -v" - @docker stop tmp-make-cluster && docker rm tmp-make-cluster + docker build -t cluster-image -f Dockerfile . + docker run --name tmp-make-cluster -d --rm cluster-image && sleep 4 + docker exec tmp-make-cluster sh -c "ipfs-cluster-ctl version" + docker exec tmp-make-cluster sh -c "ipfs-cluster-service -v" + docker kill tmp-make-cluster + docker build -t cluster-image-test -f Dockerfile-test . + docker run --name tmp-make-cluster-test -d --rm cluster-image && sleep 8 + docker exec tmp-make-cluster-test sh -c "ipfs-cluster-ctl version" + docker exec tmp-make-cluster-test sh -c "ipfs-cluster-service -v" + docker kill tmp-make-cluster-test .PHONY: all gx deps test test_sharness clean_sharness rw rwundo publish service ctl install clean gx-clean docker diff --git a/api/rest/restapi.go b/api/rest/restapi.go index 80d480b8..c26057c1 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -12,6 +12,7 @@ import ( "crypto/tls" "encoding/json" "errors" + "fmt" "net" "net/http" "strconv" @@ -393,11 +394,14 @@ func (api *API) runHTTPServer() { func (api *API) runLibp2pServer() { defer api.wg.Done() <-api.rpcReady - logger.Info("REST API (libp2p-http): ENABLED") + + listenMsg := "" for _, a := range api.host.Addrs() { - logger.Infof(" - %s/ipfs/%s", a, api.host.ID().Pretty()) + listenMsg += fmt.Sprintf(" %s/ipfs/%s\n", a, api.host.ID().Pretty()) } + logger.Infof("REST API (libp2p-http): ENABLED. Listening on:\n%s\n", listenMsg) + err := api.server.Serve(api.libp2pListener) if err != nil && !strings.Contains(err.Error(), "context canceled") { logger.Error(err) diff --git a/cluster.go b/cluster.go index 71fa6bee..5f495d18 100644 --- a/cluster.go +++ b/cluster.go @@ -3,11 +3,12 @@ package ipfscluster import ( "context" "errors" + "fmt" "sync" "time" "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/consensus/raft" + "github.com/ipfs/ipfs-cluster/pstoremgr" "github.com/ipfs/ipfs-cluster/state" rpc "github.com/hsanjuan/go-libp2p-gorpc" @@ -17,10 +18,11 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -// Common errors -var ( - ErrBootstrap = errors.New("bootstrap unsuccessful") -) +// ReadyTimeout specifies the time before giving up +// during startup (waiting for consensus to be ready) +// It may need adjustment according to timeouts in the +// consensus layer. +var ReadyTimeout = 30 * time.Second // Cluster is the main IPFS cluster component. It provides // the go-API for it and orchestrates the components that make up the system. @@ -33,7 +35,7 @@ type Cluster struct { host host.Host rpcServer *rpc.Server rpcClient *rpc.Client - peerManager *peerManager + peerManager *pstoremgr.Manager consensus Consensus api API @@ -64,7 +66,7 @@ type Cluster struct { func NewCluster( host host.Host, cfg *Config, - consensusCfg *raft.Config, + consensus Consensus, api API, ipfs IPFSConnector, st state.State, @@ -78,35 +80,31 @@ func NewCluster( return nil, err } - ctx, cancel := context.WithCancel(context.Background()) - if host == nil { - host, err = NewClusterHost(ctx, cfg) - if err != nil { - cancel() - return nil, err - } + return nil, errors.New("cluster host is nil") + } + + listenAddrs := "" + for _, addr := range host.Addrs() { + listenAddrs += fmt.Sprintf(" %s/ipfs/%s\n", addr, host.ID().Pretty()) } if c := Commit; len(c) >= 8 { - logger.Infof("IPFS Cluster v%s-%s listening on:", Version, Commit[0:8]) + logger.Infof("IPFS Cluster v%s-%s listening on:\n%s\n", Version, Commit[0:8], listenAddrs) } else { - logger.Infof("IPFS Cluster v%s listening on:", Version) - } - for _, addr := range host.Addrs() { - logger.Infof(" %s/ipfs/%s", addr, host.ID().Pretty()) + logger.Infof("IPFS Cluster v%s listening on:\n%s\n", Version, listenAddrs) } - peerManager := newPeerManager(host) - peerManager.importAddresses(cfg.Peers, false) - peerManager.importAddresses(cfg.Bootstrap, false) + peerManager := pstoremgr.New(host, cfg.GetPeerstorePath()) + ctx, cancel := context.WithCancel(context.Background()) c := &Cluster{ ctx: ctx, cancel: cancel, id: host.ID(), config: cfg, host: host, + consensus: consensus, api: api, ipfs: ipfs, state: st, @@ -128,20 +126,9 @@ func NewCluster( return nil, err } - err = c.setupConsensus(consensusCfg) - if err != nil { - c.Shutdown() - return nil, err - } c.setupRPCClients() - ok := c.bootstrap() - if !ok { - logger.Error(ErrBootstrap) - c.Shutdown() - return nil, ErrBootstrap - } go func() { - c.ready(consensusCfg.WaitForLeaderTimeout * 2) + c.ready(ReadyTimeout) c.run() }() return c, nil @@ -159,30 +146,6 @@ func (c *Cluster) setupRPC() error { return nil } -func (c *Cluster) setupConsensus(consensuscfg *raft.Config) error { - var startPeers []peer.ID - - if len(c.config.Peers) > 0 { - startPeers = PeersFromMultiaddrs(c.config.Peers) - } else { - // start as single cluster before being added - // to the bootstrapper peers' cluster. - startPeers = []peer.ID{} - } - - consensus, err := raft.NewConsensus( - append(startPeers, c.id), - c.host, - consensuscfg, - c.state) - if err != nil { - logger.Errorf("error creating consensus: %s", err) - return err - } - c.consensus = consensus - return nil -} - func (c *Cluster) setupRPCClients() { c.tracker.SetClient(c.rpcClient) c.ipfs.SetClient(c.rpcClient) @@ -237,7 +200,7 @@ func (c *Cluster) broadcastMetric(m api.Metric) error { if leader == c.id { // Leader needs to broadcast its metric to everyone // in case it goes down (new leader will have to detect this node went down) - logger.Debugf("Leader %s about to broadcast metric %s to %s. Expires: %s", c.id, m.Name, peers, m.Expire) + logger.Debugf("Leader %s about to broadcast metric %s to %s. Expires: %d", c.id, m.Name, peers, m.Expire) errs := c.multiRPC(peers, "Cluster", "PeerMonitorLogMetric", @@ -345,7 +308,7 @@ func (c *Cluster) alertsHandler() { // detects that we have been removed from the peerset, it shuts down this peer. func (c *Cluster) watchPeers() { ticker := time.NewTicker(c.config.PeerWatchInterval) - lastPeers := PeersFromMultiaddrs(c.config.Peers) + lastPeers := PeersFromMultiaddrs(c.peerManager.LoadPeerstore()) for { select { @@ -381,15 +344,13 @@ func (c *Cluster) watchPeers() { if !hasMe { logger.Infof("%s: removed from raft. Initiating shutdown", c.id.Pretty()) c.removed = true - c.config.Bootstrap = c.peerManager.addresses(peers) - c.config.savePeers([]ma.Multiaddr{}) go c.Shutdown() return } if save { - logger.Info("peerset change detected") - c.config.savePeers(c.peerManager.addresses(peers)) + logger.Info("peerset change detected. Saving peers addresses") + c.peerManager.SavePeerstoreForPeers(peers) } } } @@ -439,18 +400,23 @@ func (c *Cluster) ready(timeout time.Duration) { ************************************************** This peer was not able to become part of the cluster. This might be due to one or several causes: + - Check the logs above this message for errors - Check that there is connectivity to the "peers" multiaddresses - Check that all cluster peers are using the same "secret" - Check that this peer is reachable on its "listen_multiaddress" by all peers - Check that the current cluster is healthy (has a leader). Otherwise make sure to start enough peers so that a leader election can happen. - - Check that the peer you are trying to connect to is running the + - Check that the peer(s) you are trying to connect to is running the same version of IPFS-cluster. ************************************************** `) c.Shutdown() return case <-c.consensus.Ready(): + // Consensus ready means the state is up to date so we can sync + // it to the tracker. We ignore errors (normal when state + // doesn't exist in new peers). + c.StateSync() case <-c.ctx.Done(): return } @@ -479,44 +445,6 @@ This might be due to one or several causes: logger.Info("** IPFS Cluster is READY **") } -func (c *Cluster) bootstrap() bool { - // Cases in which we do not bootstrap - if len(c.config.Bootstrap) == 0 || len(c.config.Peers) > 0 { - return true - } - - var err error - for _, b := range c.config.Bootstrap { - logger.Infof("Bootstrapping to %s", b) - err = c.Join(b) - if err == nil { - return true - } - logger.Error(err) - } - - logger.Error("***** ipfs-cluster bootstrap failed (tips below) *****") - logger.Errorf(` -************************************************** -This peer was not able to become part of the cluster. The bootstrap process -failed for all bootstrap peers. The last error was: - -%s - -There are some common reasons for failed bootstraps: - - Check that there is connectivity to the "bootstrap" multiaddresses - - Check that the cluster "secret" is the same for all peers - - Check that this peer is reachable on its "listen_multiaddress" by all peers - - Check that all the peers in the current cluster are healthy, otherwise - remove unhealthy ones first and re-add them later - - Check that the peer you are trying to connect to is running the - same version of IPFS-cluster. -************************************************** -`, err) - - return false -} - // Ready returns a channel which signals when this peer is // fully initialized (including consensus). func (c *Cluster) Ready() <-chan struct{} { @@ -538,10 +466,10 @@ func (c *Cluster) Shutdown() error { // Only attempt to leave if: // - consensus is initialized // - cluster was ready (no bootstrapping error) - // - We are not removed already (means watchPeers() called uss) + // - We are not removed already (means watchPeers() called us) if c.consensus != nil && c.config.LeaveOnShutdown && c.readyB && !c.removed { c.removed = true - peers, err := c.consensus.Peers() + _, err := c.consensus.Peers() if err == nil { // best effort logger.Warning("attempting to leave the cluster. This may take some seconds") @@ -549,9 +477,6 @@ func (c *Cluster) Shutdown() error { if err != nil { logger.Error("leaving cluster: " + err.Error()) } - // save peers as bootstrappers - c.config.Bootstrap = c.peerManager.addresses(peers) - c.config.savePeers([]ma.Multiaddr{}) } } @@ -637,7 +562,7 @@ func (c *Cluster) ID() api.ID { //PublicKey: c.host.Peerstore().PubKey(c.id), Addresses: addrs, ClusterPeers: peers, - ClusterPeersAddresses: c.peerManager.addresses(peers), + ClusterPeersAddresses: c.peerManager.PeersAddresses(peers), Version: Version, Commit: Commit, RPCProtocolVersion: RPCProtocol, @@ -708,7 +633,7 @@ func (c *Cluster) PeerAdd(addr ma.Multiaddr) (api.ID, error) { } // Send cluster peers to the new peer. - clusterPeers := append(c.peerManager.addresses(peers), + clusterPeers := append(c.peerManager.PeersAddresses(peers), addrSerial.ToMultiaddr()) err = c.rpcClient.Call(pid, "Cluster", @@ -786,11 +711,6 @@ func (c *Cluster) PeerRemove(pid peer.ID) error { func (c *Cluster) Join(addr ma.Multiaddr) error { logger.Debugf("Join(%s)", addr) - //if len(c.peerManager.peers()) > 1 { - // logger.Error(c.peerManager.peers()) - // return errors.New("only single-node clusters can be joined") - //} - pid, _, err := api.Libp2pMultiaddrSplit(addr) if err != nil { logger.Error(err) @@ -803,7 +723,7 @@ func (c *Cluster) Join(addr ma.Multiaddr) error { } // Add peer to peerstore so we can talk to it - c.peerManager.addPeer(addr, true) + c.peerManager.ImportPeer(addr, true) // Note that PeerAdd() on the remote peer will // figure out what our real address is (obviously not @@ -834,7 +754,7 @@ func (c *Cluster) Join(addr ma.Multiaddr) error { if err != nil { logger.Error(err) } else { - c.config.savePeers(c.peerManager.addresses(peers)) + c.peerManager.SavePeerstoreForPeers(peers) } c.StateSync() diff --git a/cluster_config.go b/cluster_config.go index 04fee8eb..a4caf82f 100644 --- a/cluster_config.go +++ b/cluster_config.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "os" + "path/filepath" "sync" "time" @@ -32,6 +33,7 @@ const ( DefaultReplicationFactor = -1 DefaultLeaveOnShutdown = false DefaultDisableRepinning = false + DefaultPeerstoreFile = "peerstore" ) // Config is the configuration object containing customizable variables to @@ -39,7 +41,8 @@ const ( // config.ComponentConfig interface. type Config struct { config.Saver - lock sync.Mutex + lock sync.Mutex + peerstoreLock sync.Mutex // Libp2p ID and private key for Cluster communication (including) // the Consensus component. @@ -54,17 +57,6 @@ type Config struct { // 64 characters and contain only hexadecimal characters (`[0-9a-f]`). Secret []byte - // Peers is the list of peers in the Cluster. They are used - // as the initial peers in the consensus. When bootstrapping a peer, - // Peers will be filled in automatically for the next run upon - // shutdown. - Peers []ma.Multiaddr - - // Bootstrap peers multiaddresses. This peer will attempt to - // join the clusters of the peers in this list after booting. - // Leave empty for a single-peer-cluster. - Bootstrap []ma.Multiaddr - // Leave Cluster on shutdown. Politely informs other peers // of the departure and removes itself from the consensus // peer set. The Cluster size will be reduced by one. @@ -122,6 +114,10 @@ type Config struct { // This is useful when doing certain types of maintainance, or simply // when not wanting to rely on the monitoring system which needs a revamp. DisableRepinning bool + + // Peerstore file specifies the file on which we persist the + // libp2p host peerstore addresses. This file is regularly saved. + PeerstoreFile string } // configJSON represents a Cluster configuration as it will look when it is @@ -132,8 +128,8 @@ type configJSON struct { Peername string `json:"peername"` PrivateKey string `json:"private_key"` Secret string `json:"secret"` - Peers []string `json:"peers"` - Bootstrap []string `json:"bootstrap"` + Peers []string `json:"peers,omitempty"` // DEPRECATED + Bootstrap []string `json:"bootstrap,omitempty"` // DEPRECATED LeaveOnShutdown bool `json:"leave_on_shutdown"` ListenMultiaddress string `json:"listen_multiaddress"` StateSyncInterval string `json:"state_sync_interval"` @@ -144,6 +140,7 @@ type configJSON struct { MonitorPingInterval string `json:"monitor_ping_interval"` PeerWatchInterval string `json:"peer_watch_interval"` DisableRepinning bool `json:"disable_repinning"` + PeerstoreFile string `json:"peerstore_file,omitempty"` } // ConfigKey returns a human-readable string to identify @@ -199,14 +196,6 @@ func (cfg *Config) Validate() error { return errors.New("cluster.ID does not match the private_key") } - if cfg.Peers == nil { - return errors.New("cluster.peers is undefined") - } - - if cfg.Bootstrap == nil { - return errors.New("cluster.bootstrap is undefined") - } - if cfg.ListenAddr == nil { return errors.New("cluster.listen_addr is indefined") } @@ -268,8 +257,6 @@ func (cfg *Config) setDefaults() { addr, _ := ma.NewMultiaddr(DefaultListenAddr) cfg.ListenAddr = addr - cfg.Peers = []ma.Multiaddr{} - cfg.Bootstrap = []ma.Multiaddr{} cfg.LeaveOnShutdown = DefaultLeaveOnShutdown cfg.StateSyncInterval = DefaultStateSyncInterval cfg.IPFSSyncInterval = DefaultIPFSSyncInterval @@ -278,6 +265,7 @@ func (cfg *Config) setDefaults() { cfg.MonitorPingInterval = DefaultMonitorPingInterval cfg.PeerWatchInterval = DefaultPeerWatchInterval cfg.DisableRepinning = DefaultDisableRepinning + cfg.PeerstoreFile = "" // empty so it gets ommited. } // LoadJSON receives a raw json-formatted configuration and @@ -293,6 +281,27 @@ func (cfg *Config) LoadJSON(raw []byte) error { // Make sure all non-defined keys have good values. cfg.setDefaults() + config.SetIfNotDefault(jcfg.PeerstoreFile, &cfg.PeerstoreFile) + + if jcfg.Peers != nil || jcfg.Bootstrap != nil { + logger.Error(` +Your configuration is using cluster.Peers and/or cluster.Bootstrap +keys. Starting at version 0.4.0 these keys have been deprecated and replaced by +the Peerstore file and the consensus.raft.InitialPeers key. + +Bootstrap keeps working but only as a flag: + +"ipfs-cluster-service daemon --bootstrap " + +If you want to upgrade the existing peers that belong to a cluster: + +* Write your peers multiaddresses in the peerstore file (1 per line): ~/.ipfs-cluster/peerstore +* Remove Peers and Bootstrap from your configuration + +Please check the docs (https://cluster.ipfs.io/documentation/configuration/) +for more information.`) + return errors.New("cluster.Peers and cluster.Bootstrap keys have been deprecated") + } parseDuration := func(txt string) time.Duration { d, _ := time.ParseDuration(txt) @@ -330,32 +339,6 @@ func (cfg *Config) LoadJSON(raw []byte) error { } cfg.Secret = clusterSecret - parseMultiaddrs := func(strs []string) ([]ma.Multiaddr, error) { - addrs := make([]ma.Multiaddr, len(strs)) - for i, p := range strs { - maddr, err := ma.NewMultiaddr(p) - if err != nil { - m := "error parsing multiaddress for peer %s: %s" - err = fmt.Errorf(m, p, err) - return nil, err - } - addrs[i] = maddr - } - return addrs, nil - } - - clusterPeers, err := parseMultiaddrs(jcfg.Peers) - if err != nil { - return err - } - cfg.Peers = clusterPeers - - bootstrap, err := parseMultiaddrs(jcfg.Bootstrap) - if err != nil { - return err - } - cfg.Bootstrap = bootstrap - clusterAddr, err := ma.NewMultiaddr(jcfg.ListenMultiaddress) if err != nil { err = fmt.Errorf("error parsing cluster_listen_multiaddress: %s", err) @@ -406,25 +389,11 @@ func (cfg *Config) ToJSON() (raw []byte, err error) { } pKey := base64.StdEncoding.EncodeToString(pkeyBytes) - // Peers - clusterPeers := make([]string, len(cfg.Peers), len(cfg.Peers)) - for i := 0; i < len(cfg.Peers); i++ { - clusterPeers[i] = cfg.Peers[i].String() - } - - // Bootstrap peers - bootstrap := make([]string, len(cfg.Bootstrap), len(cfg.Bootstrap)) - for i := 0; i < len(cfg.Bootstrap); i++ { - bootstrap[i] = cfg.Bootstrap[i].String() - } - // Set all configuration fields jcfg.ID = cfg.ID.Pretty() jcfg.Peername = cfg.Peername jcfg.PrivateKey = pKey jcfg.Secret = EncodeProtectorKey(cfg.Secret) - jcfg.Peers = clusterPeers - jcfg.Bootstrap = bootstrap jcfg.ReplicationFactorMin = cfg.ReplicationFactorMin jcfg.ReplicationFactorMax = cfg.ReplicationFactorMax jcfg.LeaveOnShutdown = cfg.LeaveOnShutdown @@ -434,16 +403,27 @@ func (cfg *Config) ToJSON() (raw []byte, err error) { jcfg.MonitorPingInterval = cfg.MonitorPingInterval.String() jcfg.PeerWatchInterval = cfg.PeerWatchInterval.String() jcfg.DisableRepinning = cfg.DisableRepinning + jcfg.PeerstoreFile = cfg.PeerstoreFile raw, err = json.MarshalIndent(jcfg, "", " ") return } -func (cfg *Config) savePeers(addrs []ma.Multiaddr) { - cfg.lock.Lock() - cfg.Peers = addrs - cfg.lock.Unlock() - cfg.NotifySave() +// GetPeerstorePath returns the full path of the +// PeerstoreFile, obtained by concatenating that value +// with BaseDir of the configuration, if set. +// An empty string is returned when BaseDir is not set. +func (cfg *Config) GetPeerstorePath() string { + if cfg.BaseDir == "" { + return "" + } + + filename := DefaultPeerstoreFile + if cfg.PeerstoreFile != "" { + filename = cfg.PeerstoreFile + } + + return filepath.Join(cfg.BaseDir, filename) } // DecodeClusterSecret parses a hex-encoded string, checks that it is exactly diff --git a/cluster_config_test.go b/cluster_config_test.go index 9dac7659..004be665 100644 --- a/cluster_config_test.go +++ b/cluster_config_test.go @@ -11,12 +11,6 @@ var ccfgTestJSON = []byte(` "peername": "testpeer", "private_key": "CAASqAkwggSkAgEAAoIBAQDpT16IRF6bb9tHsCbQ7M+nb2aI8sz8xyt8PoAWM42ki+SNoESIxKb4UhFxixKvtEdGxNE6aUUVc8kFk6wTStJ/X3IGiMetwkXiFiUxabUF/8A6SyvnSVDm+wFuavugpVrZikjLcfrf2xOVgnG3deQQvd/qbAv14jTwMFl+T+8d/cXBo8Mn/leLZCQun/EJEnkXP5MjgNI8XcWUE4NnH3E0ESSm6Pkm8MhMDZ2fmzNgqEyJ0GVinNgSml3Pyha3PBSj5LRczLip/ie4QkKx5OHvX2L3sNv/JIUHse5HSbjZ1c/4oGCYMVTYCykWiczrxBUOlcr8RwnZLOm4n2bCt5ZhAgMBAAECggEAVkePwfzmr7zR7tTpxeGNeXHtDUAdJm3RWwUSASPXgb5qKyXVsm5nAPX4lXDE3E1i/nzSkzNS5PgIoxNVU10cMxZs6JW0okFx7oYaAwgAddN6lxQtjD7EuGaixN6zZ1k/G6vT98iS6i3uNCAlRZ9HVBmjsOF8GtYolZqLvfZ5izEVFlLVq/BCs7Y5OrDrbGmn3XupfitVWYExV0BrHpobDjsx2fYdTZkmPpSSvXNcm4Iq2AXVQzoqAfGo7+qsuLCZtVlyTfVKQjMvE2ffzN1dQunxixOvev/fz4WSjGnRpC6QLn6Oqps9+VxQKqKuXXqUJC+U45DuvA94Of9MvZfAAQKBgQD7xmXueXRBMr2+0WftybAV024ap0cXFrCAu+KWC1SUddCfkiV7e5w+kRJx6RH1cg4cyyCL8yhHZ99Z5V0Mxa/b/usuHMadXPyX5szVI7dOGgIC9q8IijN7B7GMFAXc8+qC7kivehJzjQghpRRAqvRzjDls4gmbNPhbH1jUiU124QKBgQDtOaW5/fOEtOq0yWbDLkLdjImct6oKMLhENL6yeIKjMYgifzHb2adk7rWG3qcMrdgaFtDVfqv8UmMEkzk7bSkovMVj3SkLzMz84ii1SkSfyaCXgt/UOzDkqAUYB0cXMppYA7jxHa2OY8oEHdBgmyJXdLdzJxCp851AoTlRUSePgQKBgQCQgKgUHOUaXnMEx88sbOuBO14gMg3dNIqM+Ejt8QbURmI8k3arzqA4UK8Tbb9+7b0nzXWanS5q/TT1tWyYXgW28DIuvxlHTA01aaP6WItmagrphIelERzG6f1+9ib/T4czKmvROvDIHROjq8lZ7ERs5Pg4g+sbh2VbdzxWj49EQQKBgFEna36ZVfmMOs7mJ3WWGeHY9ira2hzqVd9fe+1qNKbHhx7mDJR9fTqWPxuIh/Vac5dZPtAKqaOEO8OQ6f9edLou+ggT3LrgsS/B3tNGOPvA6mNqrk/Yf/15TWTO+I8DDLIXc+lokbsogC+wU1z5NWJd13RZZOX/JUi63vTmonYBAoGBAIpglLCH2sPXfmguO6p8QcQcv4RjAU1c0GP4P5PNN3Wzo0ItydVd2LHJb6MdmL6ypeiwNklzPFwTeRlKTPmVxJ+QPg1ct/3tAURN/D40GYw9ojDhqmdSl4HW4d6gHS2lYzSFeU5jkG49y5nirOOoEgHy95wghkh6BfpwHujYJGw4", "secret": "2588b80d5cb05374fa142aed6cbb047d1f4ef8ef15e37eba68c65b9d30df67ed", - "peers": [ - "/ip4/1.2.3.4/tcp/10000/ipfs/QmUfSFm12eYCaRdypg48m8RqkXfLW7A2ZeGZb2skeHH123" - ], - "bootstrap": [ - "/ip4/1.2.3.4/tcp/10000/ipfs/QmUfSFm12eYCaRdypg48m8RqkXfLW7A2ZeGZb2skeHH125" - ], "leave_on_shutdown": true, "listen_multiaddress": "/ip4/127.0.0.1/tcp/10000", "state_sync_interval": "1m0s", @@ -39,10 +33,6 @@ func TestLoadJSON(t *testing.T) { t.Error("expected peername 'testpeer'") } - if len(cfg.Peers) != 1 || len(cfg.Bootstrap) != 1 { - t.Error("expected 1 peer and 1 bootstrap") - } - if cfg.ReplicationFactorMin != 5 { t.Error("expected replication factor min == 5") } @@ -97,24 +87,6 @@ func TestLoadJSON(t *testing.T) { t.Error("expected error decoding secret") } - j = &configJSON{} - json.Unmarshal(ccfgTestJSON, j) - j.Bootstrap = []string{"abc"} - tst, _ = json.Marshal(j) - err = cfg.LoadJSON(tst) - if err == nil { - t.Error("expected error decoding bootstrap address") - } - - j = &configJSON{} - json.Unmarshal(ccfgTestJSON, j) - j.Peers = []string{"abc"} - tst, _ = json.Marshal(j) - err = cfg.LoadJSON(tst) - if err == nil { - t.Error("expected error decoding bootstrap address") - } - j = &configJSON{} json.Unmarshal(ccfgTestJSON, j) j.ReplicationFactor = 0 diff --git a/cluster_test.go b/cluster_test.go index cdb99e40..5051aa25 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1,6 +1,7 @@ package ipfscluster import ( + "context" "errors" "os" "path/filepath" @@ -9,6 +10,7 @@ import ( "github.com/ipfs/ipfs-cluster/allocator/ascendalloc" "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/consensus/raft" "github.com/ipfs/ipfs-cluster/informer/numpin" "github.com/ipfs/ipfs-cluster/monitor/basic" "github.com/ipfs/ipfs-cluster/pintracker/maptracker" @@ -92,21 +94,30 @@ func (ipfs *mockConnector) RepoSize() (uint64, error) { retu func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate.MapState, *maptracker.MapPinTracker) { clusterCfg, _, _, consensusCfg, trackerCfg, monCfg, _ := testingConfigs() + host, err := NewClusterHost(context.Background(), clusterCfg) + if err != nil { + t.Fatal(err) + } + api := &mockAPI{} ipfs := &mockConnector{} st := mapstate.NewMapState() tracker := maptracker.NewMapPinTracker(trackerCfg, clusterCfg.ID) monCfg.CheckInterval = 2 * time.Second + + raftcon, _ := raft.NewConsensus(host, consensusCfg, st, false) mon, _ := basic.NewMonitor(monCfg) alloc := ascendalloc.NewAllocator() numpinCfg := &numpin.Config{} numpinCfg.Default() inf, _ := numpin.NewInformer(numpinCfg) + ReadyTimeout = consensusCfg.WaitForLeaderTimeout + 1*time.Second + cl, err := NewCluster( - nil, + host, clusterCfg, - consensusCfg, + raftcon, api, ipfs, st, diff --git a/config_test.go b/config_test.go index e547a4ff..f2e39d14 100644 --- a/config_test.go +++ b/config_test.go @@ -15,8 +15,6 @@ var testingClusterCfg = []byte(`{ "id": "QmUfSFm12eYCaRdypg48m8RqkXfLW7A2ZeGZb2skeHHDGA", "private_key": "CAASqAkwggSkAgEAAoIBAQDpT16IRF6bb9tHsCbQ7M+nb2aI8sz8xyt8PoAWM42ki+SNoESIxKb4UhFxixKvtEdGxNE6aUUVc8kFk6wTStJ/X3IGiMetwkXiFiUxabUF/8A6SyvnSVDm+wFuavugpVrZikjLcfrf2xOVgnG3deQQvd/qbAv14jTwMFl+T+8d/cXBo8Mn/leLZCQun/EJEnkXP5MjgNI8XcWUE4NnH3E0ESSm6Pkm8MhMDZ2fmzNgqEyJ0GVinNgSml3Pyha3PBSj5LRczLip/ie4QkKx5OHvX2L3sNv/JIUHse5HSbjZ1c/4oGCYMVTYCykWiczrxBUOlcr8RwnZLOm4n2bCt5ZhAgMBAAECggEAVkePwfzmr7zR7tTpxeGNeXHtDUAdJm3RWwUSASPXgb5qKyXVsm5nAPX4lXDE3E1i/nzSkzNS5PgIoxNVU10cMxZs6JW0okFx7oYaAwgAddN6lxQtjD7EuGaixN6zZ1k/G6vT98iS6i3uNCAlRZ9HVBmjsOF8GtYolZqLvfZ5izEVFlLVq/BCs7Y5OrDrbGmn3XupfitVWYExV0BrHpobDjsx2fYdTZkmPpSSvXNcm4Iq2AXVQzoqAfGo7+qsuLCZtVlyTfVKQjMvE2ffzN1dQunxixOvev/fz4WSjGnRpC6QLn6Oqps9+VxQKqKuXXqUJC+U45DuvA94Of9MvZfAAQKBgQD7xmXueXRBMr2+0WftybAV024ap0cXFrCAu+KWC1SUddCfkiV7e5w+kRJx6RH1cg4cyyCL8yhHZ99Z5V0Mxa/b/usuHMadXPyX5szVI7dOGgIC9q8IijN7B7GMFAXc8+qC7kivehJzjQghpRRAqvRzjDls4gmbNPhbH1jUiU124QKBgQDtOaW5/fOEtOq0yWbDLkLdjImct6oKMLhENL6yeIKjMYgifzHb2adk7rWG3qcMrdgaFtDVfqv8UmMEkzk7bSkovMVj3SkLzMz84ii1SkSfyaCXgt/UOzDkqAUYB0cXMppYA7jxHa2OY8oEHdBgmyJXdLdzJxCp851AoTlRUSePgQKBgQCQgKgUHOUaXnMEx88sbOuBO14gMg3dNIqM+Ejt8QbURmI8k3arzqA4UK8Tbb9+7b0nzXWanS5q/TT1tWyYXgW28DIuvxlHTA01aaP6WItmagrphIelERzG6f1+9ib/T4czKmvROvDIHROjq8lZ7ERs5Pg4g+sbh2VbdzxWj49EQQKBgFEna36ZVfmMOs7mJ3WWGeHY9ira2hzqVd9fe+1qNKbHhx7mDJR9fTqWPxuIh/Vac5dZPtAKqaOEO8OQ6f9edLou+ggT3LrgsS/B3tNGOPvA6mNqrk/Yf/15TWTO+I8DDLIXc+lokbsogC+wU1z5NWJd13RZZOX/JUi63vTmonYBAoGBAIpglLCH2sPXfmguO6p8QcQcv4RjAU1c0GP4P5PNN3Wzo0ItydVd2LHJb6MdmL6ypeiwNklzPFwTeRlKTPmVxJ+QPg1ct/3tAURN/D40GYw9ojDhqmdSl4HW4d6gHS2lYzSFeU5jkG49y5nirOOoEgHy95wghkh6BfpwHujYJGw4", "secret": "2588b80d5cb05374fa142aed6cbb047d1f4ef8ef15e37eba68c65b9d30df67ed", - "peers": [], - "bootstrap": [], "leave_on_shutdown": false, "listen_multiaddress": "/ip4/127.0.0.1/tcp/10000", "state_sync_interval": "1m0s", @@ -33,6 +31,7 @@ var testingRaftCfg = []byte(`{ "wait_for_leader_timeout": "10s", "commit_retries": 2, "commit_retry_delay": "50ms", + "backups_rotate": 2, "network_timeout": "5s", "heartbeat_timeout": "100ms", "election_timeout": "100ms", diff --git a/consensus/raft/config.go b/consensus/raft/config.go index c8ba418e..40bb3d36 100644 --- a/consensus/raft/config.go +++ b/consensus/raft/config.go @@ -4,11 +4,14 @@ import ( "encoding/json" "errors" "io/ioutil" + "path/filepath" "time" + "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/config" hraft "github.com/hashicorp/raft" + peer "github.com/libp2p/go-libp2p-peer" ) // ConfigKey is the default configuration key for holding this component's @@ -22,6 +25,7 @@ var ( DefaultCommitRetries = 1 DefaultNetworkTimeout = 10 * time.Second DefaultCommitRetryDelay = 200 * time.Millisecond + DefaultBackupsRotate = 6 ) // Config allows to configure the Raft Consensus component for ipfs-cluster. @@ -33,10 +37,13 @@ type Config struct { // will shutdown libp2p host on shutdown. Useful for testing hostShutdown bool - // A Hashicorp Raft's configuration object. - RaftConfig *hraft.Config // A folder to store Raft's data. DataFolder string + + // InitPeerset provides the list of initial cluster peers for new Raft + // peers (with no prior state). It is ignored when Raft was already + // initialized or when starting in staging mode. + InitPeerset []peer.ID // LeaderTimeout specifies how long to wait for a leader before // failing an operation. WaitForLeaderTimeout time.Duration @@ -48,6 +55,12 @@ type Config struct { CommitRetries int // How long to wait between retries CommitRetryDelay time.Duration + // BackupsRotate specifies the maximum number of Raft's DataFolder + // copies that we keep as backups (renaming) after cleanup. + BackupsRotate int + + // A Hashicorp Raft's configuration object. + RaftConfig *hraft.Config } // ConfigJSON represents a human-friendly Config @@ -61,6 +74,11 @@ type jsonConfig struct { // the Raft. DataFolder string `json:"data_folder,omitempty"` + // InitPeerset provides the list of initial cluster peers for new Raft + // peers (with no prior state). It is ignored when Raft was already + // initialized or when starting in staging mode. + InitPeerset []string `json:"init_peerset"` + // How long to wait for a leader before failing WaitForLeaderTimeout string `json:"wait_for_leader_timeout"` @@ -73,6 +91,10 @@ type jsonConfig struct { // How long to wait between commit retries CommitRetryDelay string `json:"commit_retry_delay"` + // BackupsRotate specifies the maximum number of Raft's DataFolder + // copies that we keep as backups (renaming) after cleanup. + BackupsRotate int `json:"backups_rotate"` + // HeartbeatTimeout specifies the time in follower state without // a leader before we attempt an election. HeartbeatTimeout string `json:"heartbeat_timeout,omitempty"` @@ -106,10 +128,6 @@ type jsonConfig struct { // step down as leader. LeaderLeaseTimeout string `json:"leader_lease_timeout,omitempty"` - // StartAsLeader forces Raft to start in the leader state. This should - // never be used except for testing purposes, as it can cause a split-brain. - StartAsLeader bool `json:"start_as_leader,omitempty"` - // The unique ID for this server across all time. When running with // ProtocolVersion < 3, you must set this to be the same as the network // address of your transport. @@ -143,6 +161,10 @@ func (cfg *Config) Validate() error { return errors.New("commit_retry_delay is invalid") } + if cfg.BackupsRotate <= 0 { + return errors.New("backups_rotate should be larger than 0") + } + return hraft.ValidateConfig(cfg.RaftConfig) } @@ -186,6 +208,7 @@ func (cfg *Config) LoadJSON(raw []byte) error { config.SetIfNotDefault(networkTimeout, &cfg.NetworkTimeout) cfg.CommitRetries = jcfg.CommitRetries config.SetIfNotDefault(commitRetryDelay, &cfg.CommitRetryDelay) + config.SetIfNotDefault(jcfg.BackupsRotate, &cfg.BackupsRotate) // Raft values config.SetIfNotDefault(heartbeatTimeout, &cfg.RaftConfig.HeartbeatTimeout) @@ -197,25 +220,29 @@ func (cfg *Config) LoadJSON(raw []byte) error { config.SetIfNotDefault(jcfg.SnapshotThreshold, &cfg.RaftConfig.SnapshotThreshold) config.SetIfNotDefault(leaderLeaseTimeout, &cfg.RaftConfig.LeaderLeaseTimeout) + cfg.InitPeerset = api.StringsToPeers(jcfg.InitPeerset) return cfg.Validate() } // ToJSON returns the pretty JSON representation of a Config. func (cfg *Config) ToJSON() ([]byte, error) { - jcfg := &jsonConfig{} - jcfg.DataFolder = cfg.DataFolder - jcfg.WaitForLeaderTimeout = cfg.WaitForLeaderTimeout.String() - jcfg.NetworkTimeout = cfg.NetworkTimeout.String() - jcfg.CommitRetries = cfg.CommitRetries - jcfg.CommitRetryDelay = cfg.CommitRetryDelay.String() - jcfg.HeartbeatTimeout = cfg.RaftConfig.HeartbeatTimeout.String() - jcfg.ElectionTimeout = cfg.RaftConfig.ElectionTimeout.String() - jcfg.CommitTimeout = cfg.RaftConfig.CommitTimeout.String() - jcfg.MaxAppendEntries = cfg.RaftConfig.MaxAppendEntries - jcfg.TrailingLogs = cfg.RaftConfig.TrailingLogs - jcfg.SnapshotInterval = cfg.RaftConfig.SnapshotInterval.String() - jcfg.SnapshotThreshold = cfg.RaftConfig.SnapshotThreshold - jcfg.LeaderLeaseTimeout = cfg.RaftConfig.LeaderLeaseTimeout.String() + jcfg := &jsonConfig{ + DataFolder: cfg.DataFolder, + InitPeerset: api.PeersToStrings(cfg.InitPeerset), + WaitForLeaderTimeout: cfg.WaitForLeaderTimeout.String(), + NetworkTimeout: cfg.NetworkTimeout.String(), + CommitRetries: cfg.CommitRetries, + CommitRetryDelay: cfg.CommitRetryDelay.String(), + BackupsRotate: cfg.BackupsRotate, + HeartbeatTimeout: cfg.RaftConfig.HeartbeatTimeout.String(), + ElectionTimeout: cfg.RaftConfig.ElectionTimeout.String(), + CommitTimeout: cfg.RaftConfig.CommitTimeout.String(), + MaxAppendEntries: cfg.RaftConfig.MaxAppendEntries, + TrailingLogs: cfg.RaftConfig.TrailingLogs, + SnapshotInterval: cfg.RaftConfig.SnapshotInterval.String(), + SnapshotThreshold: cfg.RaftConfig.SnapshotThreshold, + LeaderLeaseTimeout: cfg.RaftConfig.LeaderLeaseTimeout.String(), + } return config.DefaultJSONMarshal(jcfg) } @@ -223,10 +250,12 @@ func (cfg *Config) ToJSON() ([]byte, error) { // Default initializes this configuration with working defaults. func (cfg *Config) Default() error { cfg.DataFolder = "" // empty so it gets omitted + cfg.InitPeerset = []peer.ID{} cfg.WaitForLeaderTimeout = DefaultWaitForLeaderTimeout cfg.NetworkTimeout = DefaultNetworkTimeout cfg.CommitRetries = DefaultCommitRetries cfg.CommitRetryDelay = DefaultCommitRetryDelay + cfg.BackupsRotate = DefaultBackupsRotate cfg.RaftConfig = hraft.DefaultConfig() // These options are imposed over any Default Raft Config. @@ -238,3 +267,11 @@ func (cfg *Config) Default() error { cfg.RaftConfig.Logger = raftStdLogger // see logging.go return nil } + +// GetDataFolder returns the Raft data folder that we are using. +func (cfg *Config) GetDataFolder() string { + if cfg.DataFolder == "" { + return filepath.Join(cfg.BaseDir, DefaultDataSubFolder) + } + return cfg.DataFolder +} diff --git a/consensus/raft/config_test.go b/consensus/raft/config_test.go index 286bcf47..b826a2e2 100644 --- a/consensus/raft/config_test.go +++ b/consensus/raft/config_test.go @@ -9,9 +9,13 @@ import ( var cfgJSON = []byte(` { - "heartbeat_timeout": "1s", - "commit_retries": 1, + "init_peerset": [], "wait_for_leader_timeout": "15s", + "network_timeout": "1s", + "commit_retries": 1, + "commit_retry_delay": "200ms", + "backups_rotate": 5, + "heartbeat_timeout": "1s", "election_timeout": "1s", "commit_timeout": "50ms", "max_append_entries": 64, @@ -94,4 +98,11 @@ func TestDefault(t *testing.T) { if cfg.Validate() == nil { t.Fatal("expected error validating") } + + cfg.Default() + cfg.BackupsRotate = 0 + + if cfg.Validate() == nil { + t.Fatal("expected error validating") + } } diff --git a/consensus/raft/consensus.go b/consensus/raft/consensus.go index 5ea9af86..9bf46497 100644 --- a/consensus/raft/consensus.go +++ b/consensus/raft/consensus.go @@ -47,10 +47,17 @@ type Consensus struct { shutdown bool } -// NewConsensus builds a new ClusterConsensus component. The state -// is used to initialize the Consensus system, so any information in it -// is discarded. -func NewConsensus(clusterPeers []peer.ID, host host.Host, cfg *Config, state state.State) (*Consensus, error) { +// NewConsensus builds a new ClusterConsensus component using Raft. The state +// is used to initialize the Consensus system, so any information +// in it is discarded once the raft state is loaded. +// The singlePeer parameter controls whether this Raft peer is be expected to +// join a cluster or it should run on its own. +func NewConsensus( + host host.Host, + cfg *Config, + state state.State, + staging bool, // this peer must not be bootstrapped if no state exists +) (*Consensus, error) { err := cfg.Validate() if err != nil { return nil, err @@ -60,7 +67,7 @@ func NewConsensus(clusterPeers []peer.ID, host host.Host, cfg *Config, state sta logger.Debug("starting Consensus and waiting for a leader...") consensus := libp2praft.NewOpLog(state, baseOp) - raft, err := newRaftWrapper(clusterPeers, host, cfg, consensus.FSM()) + raft, err := newRaftWrapper(host, cfg, consensus.FSM(), staging) if err != nil { logger.Error("error creating raft: ", err) return nil, err @@ -95,10 +102,31 @@ func (cc *Consensus) WaitForSync() error { cc.ctx, cc.config.WaitForLeaderTimeout) defer cancel() + + // 1 - wait for leader + // 2 - wait until we are a Voter + // 3 - wait until last index is applied + + // From raft docs: + + // once a staging server receives enough log entries to be sufficiently + // caught up to the leader's log, the leader will invoke a membership + // change to change the Staging server to a Voter + + // Thus, waiting to be a Voter is a guarantee that we have a reasonable + // up to date state. Otherwise, we might return too early (see + // https://github.com/ipfs/ipfs-cluster/issues/378) + _, err := cc.raft.WaitForLeader(leaderCtx) if err != nil { return errors.New("error waiting for leader: " + err.Error()) } + + err = cc.raft.WaitForVoter(cc.ctx) + if err != nil { + return errors.New("error waiting to become a Voter: " + err.Error()) + } + err = cc.raft.WaitForUpdates(cc.ctx) if err != nil { return errors.New("error waiting for consensus updates: " + err.Error()) @@ -107,15 +135,10 @@ func (cc *Consensus) WaitForSync() error { } // waits until there is a consensus leader and syncs the state -// to the tracker +// to the tracker. If errors happen, this will return and never +// signal the component as Ready. func (cc *Consensus) finishBootstrap() { - err := cc.WaitForSync() - if err != nil { - return - } - logger.Debug("Raft state is now up to date") - - // While rpc is not ready we cannot perform a sync + // wait until we have RPC to perform any actions. if cc.rpcClient == nil { select { case <-cc.ctx.Done(): @@ -124,24 +147,20 @@ func (cc *Consensus) finishBootstrap() { } } - st, err := cc.State() - _ = st - // only check sync if we have a state - // avoid error on new running clusters + // Sometimes bootstrap is a no-op. It only applies when + // no state exists and staging=false. + _, err := cc.raft.Bootstrap() if err != nil { - logger.Debug("skipping state sync: ", err) - } else { - var pInfoSerial []api.PinInfoSerial - cc.rpcClient.Go( - "", - "Cluster", - "StateSync", - struct{}{}, - &pInfoSerial, - nil) + return } - cc.readyCh <- struct{}{} + + err = cc.WaitForSync() + if err != nil { + return + } + logger.Debug("Raft state is now up to date") logger.Debug("consensus ready") + cc.readyCh <- struct{}{} } // Shutdown stops the component so it will not process any @@ -403,7 +422,6 @@ func (cc *Consensus) Clean() error { if err != nil { return err } - logger.Info("consensus data cleaned") return nil } diff --git a/consensus/raft/consensus_test.go b/consensus/raft/consensus_test.go index a832f403..eb80c2ae 100644 --- a/consensus/raft/consensus_test.go +++ b/consensus/raft/consensus_test.go @@ -15,7 +15,6 @@ import ( logging "github.com/ipfs/go-log" libp2p "github.com/libp2p/go-libp2p" host "github.com/libp2p/go-libp2p-host" - peer "github.com/libp2p/go-libp2p-peer" peerstore "github.com/libp2p/go-libp2p-peerstore" ma "github.com/multiformats/go-multiaddr" ) @@ -59,13 +58,12 @@ func testingConsensus(t *testing.T, idn int) *Consensus { cfg.DataFolder = fmt.Sprintf("raftFolderFromTests-%d", idn) cfg.hostShutdown = true - cc, err := NewConsensus([]peer.ID{h.ID()}, h, cfg, st) + cc, err := NewConsensus(h, cfg, st, false) if err != nil { t.Fatal("cannot create Consensus:", err) } cc.SetClient(test.NewMockRPCClientWithHost(t, h)) <-cc.Ready() - time.Sleep(2 * time.Second) return cc } diff --git a/consensus/raft/data_helper.go b/consensus/raft/data_helper.go index 483bbfbb..e7fa66cb 100644 --- a/consensus/raft/data_helper.go +++ b/consensus/raft/data_helper.go @@ -6,24 +6,22 @@ import ( "path/filepath" ) -// RaftDataBackupKeep indicates the number of data folders we keep around -// after consensus.Clean() has been called. -var RaftDataBackupKeep = 5 - // dataBackupHelper helps making and rotating backups from a folder. // it will name them .old.0, .old.1... and so on. // when a new backup is made, the old.0 is renamed to old.1 and so on. -// when the RaftDataBackupKeep number is reached, the last is always +// when the "keep" number is reached, the oldest is always // discarded. type dataBackupHelper struct { baseDir string folderName string + keep int } -func newDataBackupHelper(dataFolder string) *dataBackupHelper { +func newDataBackupHelper(dataFolder string, keep int) *dataBackupHelper { return &dataBackupHelper{ baseDir: filepath.Dir(dataFolder), folderName: filepath.Base(dataFolder), + keep: keep, } } @@ -33,7 +31,7 @@ func (dbh *dataBackupHelper) makeName(i int) string { func (dbh *dataBackupHelper) listBackups() []string { backups := []string{} - for i := 0; i < RaftDataBackupKeep; i++ { + for i := 0; i < dbh.keep; i++ { name := dbh.makeName(i) if _, err := os.Stat(name); os.IsNotExist(err) { return backups @@ -44,19 +42,32 @@ func (dbh *dataBackupHelper) listBackups() []string { } func (dbh *dataBackupHelper) makeBackup() error { + folder := filepath.Join(dbh.baseDir, dbh.folderName) + if _, err := os.Stat(folder); os.IsNotExist(err) { + // nothing to backup + logger.Debug("nothing to backup") + return nil + } + + // make sure config folder exists err := os.MkdirAll(dbh.baseDir, 0700) if err != nil { return err } + + // list all backups in it backups := dbh.listBackups() - // remove last / oldest - if len(backups) >= RaftDataBackupKeep { + // remove last / oldest. Ex. if max is five, remove name.old.4 + if len(backups) >= dbh.keep { os.RemoveAll(backups[len(backups)-1]) - } else { + } else { // append new backup folder. Ex, if 2 exist: add name.old.2 backups = append(backups, dbh.makeName(len(backups))) } - // increase number for all backups folders + // increase number for all backups folders. + // If there are 3: 1->2, 0->1. + // Note in all cases the last backup in the list does not exist + // (either removed or not created, just added to this list) for i := len(backups) - 1; i > 0; i-- { err := os.Rename(backups[i-1], backups[i]) if err != nil { diff --git a/consensus/raft/data_helper_test.go b/consensus/raft/data_helper_test.go index aacbff46..c06ca9f1 100644 --- a/consensus/raft/data_helper_test.go +++ b/consensus/raft/data_helper_test.go @@ -7,9 +7,11 @@ import ( ) func TestDataBackupHelper(t *testing.T) { + keep := 5 + cleanup := func() { os.RemoveAll("data_helper_testing") - for i := 0; i < 2*RaftDataBackupKeep; i++ { + for i := 0; i < 2*keep; i++ { os.RemoveAll(fmt.Sprintf("data_helper_testing.old.%d", i)) } } @@ -17,15 +19,15 @@ func TestDataBackupHelper(t *testing.T) { defer cleanup() os.MkdirAll("data_helper_testing", 0700) - helper := newDataBackupHelper("data_helper_testing") - for i := 0; i < 2*RaftDataBackupKeep; i++ { + helper := newDataBackupHelper("data_helper_testing", keep) + for i := 0; i < 2*keep; i++ { err := helper.makeBackup() if err != nil { t.Fatal(err) } backups := helper.listBackups() - if (i < RaftDataBackupKeep && len(backups) != i+1) || - (i >= RaftDataBackupKeep && len(backups) != RaftDataBackupKeep) { + if (i < keep && len(backups) != i+1) || + (i >= keep && len(backups) != keep) { t.Fatal("incorrect number of backups saved") } os.MkdirAll("data_helper_testing", 0700) diff --git a/consensus/raft/raft.go b/consensus/raft/raft.go index 0c63dba0..53282b0a 100644 --- a/consensus/raft/raft.go +++ b/consensus/raft/raft.go @@ -3,6 +3,7 @@ package raft import ( "context" "errors" + "fmt" "io" "os" "path/filepath" @@ -47,156 +48,194 @@ var waitForUpdatesInterval = 100 * time.Millisecond // How many times to retry snapshotting when shutting down var maxShutdownSnapshotRetries = 5 -// raftWrapper performs all Raft-specific operations which are needed by -// Cluster but are not fulfilled by the consensus interface. It should contain -// most of the Raft-related stuff so it can be easily replaced in the future, -// if need be. +// raftWrapper wraps the hraft.Raft object and related things like the +// different stores used or the hraft.Configuration. +// Its methods provide functionality for working with Raft. type raftWrapper struct { raft *hraft.Raft - dataFolder string - srvConfig hraft.Configuration + config *Config + host host.Host + serverConfig hraft.Configuration transport *hraft.NetworkTransport snapshotStore hraft.SnapshotStore logStore hraft.LogStore stableStore hraft.StableStore boltdb *raftboltdb.BoltStore + staging bool } -// newRaft launches a go-libp2p-raft consensus peer. -func newRaftWrapper(peers []peer.ID, host host.Host, cfg *Config, fsm hraft.FSM) (*raftWrapper, error) { +// newRaftWrapper creates a Raft instance and initializes +// everything leaving it ready to use. Note, that Bootstrap() should be called +// to make sure the raft instance is usable. +func newRaftWrapper( + host host.Host, + cfg *Config, + fsm hraft.FSM, + staging bool, +) (*raftWrapper, error) { + + raftW := &raftWrapper{} + raftW.config = cfg + raftW.host = host + raftW.staging = staging // Set correct LocalID cfg.RaftConfig.LocalID = hraft.ServerID(peer.IDB58Encode(host.ID())) - // Prepare data folder - dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder) - if err != nil { - return nil, err - } - srvCfg := makeServerConf(peers) - - logger.Debug("creating libp2p Raft transport") - transport, err := p2praft.NewLibp2pTransport(host, cfg.NetworkTimeout) + df := cfg.GetDataFolder() + err := makeDataFolder(df) if err != nil { return nil, err } - var log hraft.LogStore - var stable hraft.StableStore - var snap hraft.SnapshotStore + raftW.makeServerConfig() - logger.Debug("creating raft snapshot store") - snapstore, err := hraft.NewFileSnapshotStoreWithLogger( - dataFolder, RaftMaxSnapshots, raftStdLogger) + err = raftW.makeTransport() if err != nil { return nil, err } - logger.Debug("creating BoltDB store") - store, err := raftboltdb.NewBoltStore( - filepath.Join(dataFolder, "raft.db")) + err = raftW.makeStores() if err != nil { return nil, err } - // wraps the store in a LogCache to improve performance. - // See consul/agent/consul/serger.go - cacheStore, err := hraft.NewLogCache(RaftLogCacheSize, store) - if err != nil { - return nil, err - } - - stable = store - log = cacheStore - snap = snapstore - - logger.Debug("checking for existing raft states") - hasState, err := hraft.HasExistingState(log, stable, snap) - if err != nil { - return nil, err - } - if !hasState { - logger.Info("initializing raft cluster") - err := hraft.BootstrapCluster(cfg.RaftConfig, - log, stable, snap, transport, srvCfg) - if err != nil { - logger.Error("bootstrapping cluster: ", err) - return nil, err - } - } else { - logger.Debug("raft cluster is already initialized") - } - logger.Debug("creating Raft") - r, err := hraft.NewRaft(cfg.RaftConfig, - fsm, log, stable, snap, transport) + raftW.raft, err = hraft.NewRaft( + cfg.RaftConfig, + fsm, + raftW.logStore, + raftW.stableStore, + raftW.snapshotStore, + raftW.transport, + ) if err != nil { logger.Error("initializing raft: ", err) return nil, err } - raftW := &raftWrapper{ - raft: r, - dataFolder: dataFolder, - srvConfig: srvCfg, - transport: transport, - snapshotStore: snap, - logStore: log, - stableStore: stable, - boltdb: store, - } - - // Handle existing, different configuration - if hasState { - cf := r.GetConfiguration() - if err := cf.Error(); err != nil { - return nil, err - } - currentCfg := cf.Configuration() - added, removed := diffConfigurations(srvCfg, currentCfg) - if len(added)+len(removed) > 0 { - raftW.Shutdown() - logger.Error("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") - logger.Error("Raft peers do not match cluster peers from the configuration.") - logger.Error("This likely indicates that this peer has left the cluster and/or") - logger.Error("has a dirty state. Clean the raft state for this peer") - logger.Errorf("(%s)", dataFolder) - logger.Error("bootstrap it to a working cluster.") - logger.Error("Raft peers:") - for _, s := range currentCfg.Servers { - logger.Errorf(" - %s", s.ID) - } - logger.Error("Cluster configuration peers:") - for _, s := range srvCfg.Servers { - logger.Errorf(" - %s", s.ID) - } - logger.Errorf("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") - return nil, errBadRaftState - //return nil, errors.New("Bad cluster peers") - } - } - return raftW, nil } -// returns the folder path after creating it. -// if folder is empty, it uses baseDir+Default. -func makeDataFolder(baseDir, folder string) (string, error) { - if folder == "" { - folder = filepath.Join(baseDir, DefaultDataSubFolder) - } - +// makeDataFolder creates the folder that is meant +// to store Raft data. +func makeDataFolder(folder string) error { err := os.MkdirAll(folder, 0700) if err != nil { - return "", err + return err } - return folder, nil + return nil } -// create Raft servers configuration +func (rw *raftWrapper) makeTransport() (err error) { + logger.Debug("creating libp2p Raft transport") + rw.transport, err = p2praft.NewLibp2pTransport( + rw.host, + rw.config.NetworkTimeout, + ) + return err +} + +func (rw *raftWrapper) makeStores() error { + logger.Debug("creating BoltDB store") + df := rw.config.GetDataFolder() + store, err := raftboltdb.NewBoltStore(filepath.Join(df, "raft.db")) + if err != nil { + return err + } + + // wraps the store in a LogCache to improve performance. + // See consul/agent/consul/server.go + cacheStore, err := hraft.NewLogCache(RaftLogCacheSize, store) + if err != nil { + return err + } + + logger.Debug("creating raft snapshot store") + snapstore, err := hraft.NewFileSnapshotStoreWithLogger( + df, + RaftMaxSnapshots, + raftStdLogger, + ) + if err != nil { + return err + } + + rw.logStore = cacheStore + rw.stableStore = store + rw.snapshotStore = snapstore + rw.boltdb = store + return nil +} + +// Bootstrap calls BootstrapCluster on the Raft instance with a valid +// Configuration (generated from InitPeerset) when Raft has no state +// and we are not setting up a staging peer. It returns if Raft +// was boostrapped (true) and an error. +func (rw *raftWrapper) Bootstrap() (bool, error) { + logger.Debug("checking for existing raft states") + hasState, err := hraft.HasExistingState( + rw.logStore, + rw.stableStore, + rw.snapshotStore, + ) + if err != nil { + return false, err + } + + if hasState { + logger.Debug("raft cluster is already initialized") + + // Inform the user that we are working with a pre-existing peerset + logger.Info("existing Raft state found! raft.InitPeerset will be ignored") + cf := rw.raft.GetConfiguration() + if err := cf.Error(); err != nil { + logger.Debug(err) + return false, err + } + currentCfg := cf.Configuration() + srvs := "" + for _, s := range currentCfg.Servers { + srvs += fmt.Sprintf(" %s\n", s.ID) + } + + logger.Debugf("Current Raft Peerset:\n%s\n", srvs) + return false, nil + } + + if rw.staging { + logger.Debug("staging servers do not need initialization") + logger.Info("peer is ready to join a cluster") + return false, nil + } + + voters := "" + for _, s := range rw.serverConfig.Servers { + voters += fmt.Sprintf(" %s\n", s.ID) + } + + logger.Infof("initializing raft cluster with the following voters:\n%s\n", voters) + + future := rw.raft.BootstrapCluster(rw.serverConfig) + if err := future.Error(); err != nil { + logger.Error("bootstrapping cluster: ", err) + return true, err + } + return true, nil +} + +// create Raft servers configuration. The result is used +// by Bootstrap() when it proceeds to Bootstrap. +func (rw *raftWrapper) makeServerConfig() { + rw.serverConfig = makeServerConf(append(rw.config.InitPeerset, rw.host.ID())) +} + +// creates a server configuration with all peers as Voters. func makeServerConf(peers []peer.ID) hraft.Configuration { sm := make(map[string]struct{}) servers := make([]hraft.Server, 0) + + // Servers are peers + self. We avoid duplicate entries below for _, pid := range peers { p := peer.IDB58Encode(pid) _, ok := sm[p] @@ -209,38 +248,7 @@ func makeServerConf(peers []peer.ID) hraft.Configuration { }) } } - return hraft.Configuration{ - Servers: servers, - } -} - -// diffConfigurations returns the serverIDs added and removed from -// c2 in relation to c1. -func diffConfigurations( - c1, c2 hraft.Configuration) (added, removed []hraft.ServerID) { - m1 := make(map[hraft.ServerID]struct{}) - m2 := make(map[hraft.ServerID]struct{}) - added = make([]hraft.ServerID, 0) - removed = make([]hraft.ServerID, 0) - for _, s := range c1.Servers { - m1[s.ID] = struct{}{} - } - for _, s := range c2.Servers { - m2[s.ID] = struct{}{} - } - for k := range m1 { - _, ok := m2[k] - if !ok { - removed = append(removed, k) - } - } - for k := range m2 { - _, ok := m1[k] - if !ok { - added = append(added, k) - } - } - return + return hraft.Configuration{Servers: servers} } // WaitForLeader holds until Raft says we have a leader. @@ -278,6 +286,38 @@ func (rw *raftWrapper) WaitForLeader(ctx context.Context) (string, error) { } } +func (rw *raftWrapper) WaitForVoter(ctx context.Context) error { + logger.Debug("waiting until we are promoted to a voter") + + pid := hraft.ServerID(peer.IDB58Encode(rw.host.ID())) + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + configFuture := rw.raft.GetConfiguration() + if err := configFuture.Error(); err != nil { + return err + } + + if isVoter(pid, configFuture.Configuration()) { + return nil + } + + time.Sleep(waitForUpdatesInterval) + } + } +} + +func isVoter(srvID hraft.ServerID, cfg hraft.Configuration) bool { + for _, server := range cfg.Servers { + if server.ID == srvID && server.Suffrage == hraft.Voter { + return true + } + } + return false +} + // WaitForUpdates holds until Raft has synced to the last index in the log func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error { logger.Debug("Raft state is catching up to the latest known version. Please wait...") @@ -481,8 +521,8 @@ func (rw *raftWrapper) Peers() ([]string, error) { } // latestSnapshot looks for the most recent raft snapshot stored at the -// provided basedir. It returns a boolean indicating if any snapshot is -// readable, the snapshot's metadata, and a reader to the snapshot's bytes +// provided basedir. It returns the snapshot's metadata, and a reader +// to the snapshot's bytes func latestSnapshot(raftDataFolder string) (*hraft.SnapshotMeta, io.ReadCloser, error) { store, err := hraft.NewFileSnapshotStore(raftDataFolder, RaftMaxSnapshots, nil) if err != nil { @@ -506,10 +546,12 @@ func latestSnapshot(raftDataFolder string) (*hraft.SnapshotMeta, io.ReadCloser, // and a flag indicating whether any snapshot was found. func LastStateRaw(cfg *Config) (io.Reader, bool, error) { // Read most recent snapshot - dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder) - if err != nil { - return nil, false, err + dataFolder := cfg.GetDataFolder() + if _, err := os.Stat(dataFolder); os.IsNotExist(err) { + // nothing to read + return nil, false, nil } + meta, r, err := latestSnapshot(dataFolder) if err != nil { return nil, false, err @@ -530,7 +572,8 @@ func SnapshotSave(cfg *Config, newState state.State, pids []peer.ID) error { if err != nil { return err } - dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder) + dataFolder := cfg.GetDataFolder() + err = makeDataFolder(dataFolder) if err != nil { return err } @@ -550,7 +593,7 @@ func SnapshotSave(cfg *Config, newState state.State, pids []peer.ID) error { raftIndex = meta.Index raftTerm = meta.Term srvCfg = meta.Configuration - CleanupRaft(dataFolder) + CleanupRaft(dataFolder, cfg.BackupsRotate) } else { // Begin the log after the index of a fresh start so that // the snapshot's state propagate's during bootstrap @@ -583,9 +626,19 @@ func SnapshotSave(cfg *Config, newState state.State, pids []peer.ID) error { } // CleanupRaft moves the current data folder to a backup location -func CleanupRaft(dataFolder string) error { - dbh := newDataBackupHelper(dataFolder) - err := dbh.makeBackup() +func CleanupRaft(dataFolder string, keep int) error { + meta, _, err := latestSnapshot(dataFolder) + if meta == nil && err == nil { + // no snapshots at all. Avoid creating backups + // from empty state folders. + logger.Infof("cleaning empty Raft data folder (%s)", dataFolder) + os.RemoveAll(dataFolder) + return nil + } + + logger.Infof("cleaning and backing up Raft data folder (%s)", dataFolder) + dbh := newDataBackupHelper(dataFolder, keep) + err = dbh.makeBackup() if err != nil { logger.Warning(err) logger.Warning("the state could not be cleaned properly") @@ -596,7 +649,7 @@ func CleanupRaft(dataFolder string) error { // only call when Raft is shutdown func (rw *raftWrapper) Clean() error { - return CleanupRaft(rw.dataFolder) + return CleanupRaft(rw.config.GetDataFolder(), rw.config.BackupsRotate) } func find(s []string, elem string) bool { diff --git a/ipfs-cluster-service/configs.go b/ipfs-cluster-service/configs.go new file mode 100644 index 00000000..e9cb3e9f --- /dev/null +++ b/ipfs-cluster-service/configs.go @@ -0,0 +1,61 @@ +package main + +import ( + "fmt" + "os" + "path/filepath" + + ipfscluster "github.com/ipfs/ipfs-cluster" + "github.com/ipfs/ipfs-cluster/api/rest" + "github.com/ipfs/ipfs-cluster/config" + "github.com/ipfs/ipfs-cluster/consensus/raft" + "github.com/ipfs/ipfs-cluster/informer/disk" + "github.com/ipfs/ipfs-cluster/informer/numpin" + "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" + "github.com/ipfs/ipfs-cluster/monitor/basic" + "github.com/ipfs/ipfs-cluster/pintracker/maptracker" +) + +type cfgs struct { + clusterCfg *ipfscluster.Config + apiCfg *rest.Config + ipfshttpCfg *ipfshttp.Config + consensusCfg *raft.Config + trackerCfg *maptracker.Config + monCfg *basic.Config + diskInfCfg *disk.Config + numpinInfCfg *numpin.Config +} + +func makeConfigs() (*config.Manager, *cfgs) { + cfg := config.NewManager() + clusterCfg := &ipfscluster.Config{} + apiCfg := &rest.Config{} + ipfshttpCfg := &ipfshttp.Config{} + consensusCfg := &raft.Config{} + trackerCfg := &maptracker.Config{} + monCfg := &basic.Config{} + diskInfCfg := &disk.Config{} + numpinInfCfg := &numpin.Config{} + cfg.RegisterComponent(config.Cluster, clusterCfg) + cfg.RegisterComponent(config.API, apiCfg) + cfg.RegisterComponent(config.IPFSConn, ipfshttpCfg) + cfg.RegisterComponent(config.Consensus, consensusCfg) + cfg.RegisterComponent(config.PinTracker, trackerCfg) + cfg.RegisterComponent(config.Monitor, monCfg) + cfg.RegisterComponent(config.Informer, diskInfCfg) + cfg.RegisterComponent(config.Informer, numpinInfCfg) + return cfg, &cfgs{clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg, numpinInfCfg} +} + +func saveConfig(cfg *config.Manager, force bool) { + if _, err := os.Stat(configPath); err == nil && !force { + err := fmt.Errorf("%s exists. Try running: %s -f init", configPath, programName) + checkErr("", err) + } + + err := os.MkdirAll(filepath.Dir(configPath), 0700) + err = cfg.SaveJSON(configPath) + checkErr("saving new configuration", err) + out("%s configuration written to %s\n", programName, configPath) +} diff --git a/ipfs-cluster-service/daemon.go b/ipfs-cluster-service/daemon.go new file mode 100644 index 00000000..4596f9c5 --- /dev/null +++ b/ipfs-cluster-service/daemon.go @@ -0,0 +1,226 @@ +package main + +import ( + "context" + "errors" + "os" + "os/signal" + "syscall" + "time" + + "github.com/urfave/cli" + + ipfscluster "github.com/ipfs/ipfs-cluster" + "github.com/ipfs/ipfs-cluster/allocator/ascendalloc" + "github.com/ipfs/ipfs-cluster/allocator/descendalloc" + "github.com/ipfs/ipfs-cluster/api/rest" + "github.com/ipfs/ipfs-cluster/consensus/raft" + "github.com/ipfs/ipfs-cluster/informer/disk" + "github.com/ipfs/ipfs-cluster/informer/numpin" + "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" + "github.com/ipfs/ipfs-cluster/monitor/basic" + "github.com/ipfs/ipfs-cluster/pintracker/maptracker" + "github.com/ipfs/ipfs-cluster/pstoremgr" + "github.com/ipfs/ipfs-cluster/state/mapstate" + + ma "github.com/multiformats/go-multiaddr" +) + +func parseBootstraps(flagVal []string) (bootstraps []ma.Multiaddr) { + for _, a := range flagVal { + bAddr, err := ma.NewMultiaddr(a) + checkErr("error parsing bootstrap multiaddress (%s)", err, a) + bootstraps = append(bootstraps, bAddr) + } + return +} + +// Runs the cluster peer +func daemon(c *cli.Context) error { + logger.Info("Initializing. For verbose output run with \"-l debug\". Please wait...") + + // Load all the configurations + cfgMgr, cfgs := makeConfigs() + + // Run any migrations + if c.Bool("upgrade") { + err := upgrade() + if err != errNoSnapshot { + checkErr("upgrading state", err) + } // otherwise continue + } + + bootstraps := parseBootstraps(c.StringSlice("bootstrap")) + + // Execution lock + err := locker.lock() + checkErr("acquiring execution lock", err) + defer locker.tryUnlock() + + // Load all the configurations + // always wait for configuration to be saved + defer cfgMgr.Shutdown() + + err = cfgMgr.LoadJSONFromFile(configPath) + checkErr("loading configuration", err) + + // Cleanup state if bootstrapping + raftStaging := false + if len(bootstraps) > 0 { + cleanupState(cfgs.consensusCfg) + raftStaging = true + } + + if c.Bool("leave") { + cfgs.clusterCfg.LeaveOnShutdown = true + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cluster, err := createCluster(ctx, c, cfgs, raftStaging) + checkErr("starting cluster", err) + + // noop if no bootstraps + // if bootstrapping fails, consensus will never be ready + // and timeout. So this can happen in background and we + // avoid worrying about error handling here (since Cluster + // will realize). + go bootstrap(cluster, bootstraps) + + return handleSignals(cluster) +} + +func createCluster( + ctx context.Context, + c *cli.Context, + cfgs *cfgs, + raftStaging bool, +) (*ipfscluster.Cluster, error) { + + host, err := ipfscluster.NewClusterHost(ctx, cfgs.clusterCfg) + checkErr("creating libP2P Host", err) + + peerstoreMgr := pstoremgr.New(host, cfgs.clusterCfg.GetPeerstorePath()) + peerstoreMgr.ImportPeersFromPeerstore(false) + + api, err := rest.NewAPIWithHost(cfgs.apiCfg, host) + checkErr("creating REST API component", err) + + proxy, err := ipfshttp.NewConnector(cfgs.ipfshttpCfg) + checkErr("creating IPFS Connector component", err) + + state := mapstate.NewMapState() + + err = validateVersion(cfgs.clusterCfg, cfgs.consensusCfg) + checkErr("validating version", err) + + raftcon, err := raft.NewConsensus( + host, + cfgs.consensusCfg, + state, + raftStaging, + ) + checkErr("creating consensus component", err) + + tracker := maptracker.NewMapPinTracker(cfgs.trackerCfg, cfgs.clusterCfg.ID) + mon, err := basic.NewMonitor(cfgs.monCfg) + checkErr("creating Monitor component", err) + informer, alloc := setupAllocation(c.String("alloc"), cfgs.diskInfCfg, cfgs.numpinInfCfg) + + ipfscluster.ReadyTimeout = cfgs.consensusCfg.WaitForLeaderTimeout + 5*time.Second + + return ipfscluster.NewCluster( + host, + cfgs.clusterCfg, + raftcon, + api, + proxy, + state, + tracker, + mon, + alloc, + informer, + ) +} + +// bootstrap will bootstrap this peer to one of the bootstrap addresses +// if there are any. +func bootstrap(cluster *ipfscluster.Cluster, bootstraps []ma.Multiaddr) { + for _, bstrap := range bootstraps { + logger.Infof("Bootstrapping to %s", bstrap) + err := cluster.Join(bstrap) + if err != nil { + logger.Errorf("bootstrap to %s failed: %s", bstrap, err) + } + } +} + +func handleSignals(cluster *ipfscluster.Cluster) error { + signalChan := make(chan os.Signal, 20) + signal.Notify( + signalChan, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGHUP, + ) + + var ctrlcCount int + for { + select { + case <-signalChan: + ctrlcCount++ + handleCtrlC(cluster, ctrlcCount) + case <-cluster.Done(): + return nil + } + } +} + +func handleCtrlC(cluster *ipfscluster.Cluster, ctrlcCount int) { + switch ctrlcCount { + case 1: + go func() { + err := cluster.Shutdown() + checkErr("shutting down cluster", err) + }() + case 2: + out(` + + +!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +Shutdown is taking too long! Press Ctrl-c again to manually kill cluster. +Note that this may corrupt the local cluster state. +!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + + +`) + case 3: + out("exiting cluster NOW") + os.Exit(-1) + } +} + +func setupAllocation(name string, + diskInfCfg *disk.Config, + numpinInfCfg *numpin.Config, +) (ipfscluster.Informer, ipfscluster.PinAllocator) { + switch name { + case "disk", "disk-freespace": + informer, err := disk.NewInformer(diskInfCfg) + checkErr("creating informer", err) + return informer, descendalloc.NewAllocator() + case "disk-reposize": + informer, err := disk.NewInformer(diskInfCfg) + checkErr("creating informer", err) + return informer, ascendalloc.NewAllocator() + case "numpin", "pincount": + informer, err := numpin.NewInformer(numpinInfCfg) + checkErr("creating informer", err) + return informer, ascendalloc.NewAllocator() + default: + err := errors.New("unknown allocation strategy") + checkErr("", err) + return nil, nil + } +} diff --git a/ipfs-cluster-service/main.go b/ipfs-cluster-service/main.go index 8eae60f6..1cbdbdab 100644 --- a/ipfs-cluster-service/main.go +++ b/ipfs-cluster-service/main.go @@ -2,39 +2,30 @@ package main import ( "bufio" - "context" - "errors" "fmt" "io" "os" - "os/signal" "os/user" "path/filepath" - "syscall" // _ "net/http/pprof" logging "github.com/ipfs/go-log" - ma "github.com/multiformats/go-multiaddr" cli "github.com/urfave/cli" ipfscluster "github.com/ipfs/ipfs-cluster" - "github.com/ipfs/ipfs-cluster/allocator/ascendalloc" - "github.com/ipfs/ipfs-cluster/allocator/descendalloc" - "github.com/ipfs/ipfs-cluster/api/rest" - "github.com/ipfs/ipfs-cluster/config" - "github.com/ipfs/ipfs-cluster/consensus/raft" - "github.com/ipfs/ipfs-cluster/informer/disk" - "github.com/ipfs/ipfs-cluster/informer/numpin" - "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" - "github.com/ipfs/ipfs-cluster/monitor/basic" - "github.com/ipfs/ipfs-cluster/pintracker/maptracker" "github.com/ipfs/ipfs-cluster/state/mapstate" ) // ProgramName of this application const programName = `ipfs-cluster-service` +// flag defaults +const ( + defaultAllocation = "disk-freespace" + defaultLogLevel = "info" +) + // We store a commit id here var commit string @@ -88,11 +79,11 @@ $ ipfs-cluster-service init Launch a cluster: -$ ipfs-cluster-service +$ ipfs-cluster-service daemon Launch a peer and join existing cluster: -$ ipfs-cluster-service --bootstrap /ip4/192.168.1.2/tcp/9096/ipfs/QmPSoSaPXpyunaBwHs1rZBKYSqRV4bLRk32VGYLuvdrypL +$ ipfs-cluster-service daemon --bootstrap /ip4/192.168.1.2/tcp/9096/ipfs/QmPSoSaPXpyunaBwHs1rZBKYSqRV4bLRk32VGYLuvdrypL `, programName, programName, @@ -174,29 +165,15 @@ func main() { Name: "force, f", Usage: "forcefully proceed with some actions. i.e. overwriting configuration", }, - cli.StringFlag{ - Name: "bootstrap, j", - Usage: "join a cluster providing an existing peer's `multiaddress`. Overrides the \"bootstrap\" values from the configuration", - }, - cli.BoolFlag{ - Name: "leave, x", - Usage: "remove peer from cluster on exit. Overrides \"leave_on_shutdown\"", - Hidden: true, - }, cli.BoolFlag{ Name: "debug, d", Usage: "enable full debug logging (very verbose)", }, cli.StringFlag{ Name: "loglevel, l", - Value: "info", + Value: defaultLogLevel, Usage: "set the loglevel for cluster components only [critical, error, warning, info, debug]", }, - cli.StringFlag{ - Name: "alloc, a", - Value: "disk-freespace", - Usage: "allocation strategy to use [disk-freespace,disk-reposize,numpin].", - }, } app.Commands = []cli.Command{ @@ -251,6 +228,20 @@ configuration. Name: "upgrade, u", Usage: "run necessary state migrations before starting cluster service", }, + cli.StringSliceFlag{ + Name: "bootstrap, j", + Usage: "join a cluster providing an existing peers multiaddress(es)", + }, + cli.BoolFlag{ + Name: "leave, x", + Usage: "remove peer from cluster on exit. Overrides \"leave_on_shutdown\"", + Hidden: true, + }, + cli.StringFlag{ + Name: "alloc, a", + Value: defaultAllocation, + Usage: "allocation strategy to use [disk-freespace,disk-reposize,numpin].", + }, }, Action: daemon, }, @@ -384,13 +375,11 @@ the mth data folder (m currently defaults to 5) cfgMgr, cfgs := makeConfigs() err = cfgMgr.LoadJSONFromFile(configPath) - checkErr("initializing configs", err) + checkErr("reading configuration", err) - dataFolder := filepath.Join(cfgs.consensusCfg.BaseDir, raft.DefaultDataSubFolder) - err = raft.CleanupRaft(dataFolder) + err = cleanupState(cfgs.consensusCfg) checkErr("Cleaning up consensus data", err) - logger.Warningf("the %s folder has been rotated. Next start will use an empty state", dataFolder) - + logger.Warningf("the %s folder has been rotated. Next start will use an empty state", cfgs.consensusCfg.GetDataFolder()) return nil }, }, @@ -436,122 +425,22 @@ the mth data folder (m currently defaults to 5) // run daemon() by default, or error. func run(c *cli.Context) error { - if len(c.Args()) > 0 { - return fmt.Errorf("unknown subcommand. Run \"%s help\" for more info", programName) - } - return daemon(c) -} - -func daemon(c *cli.Context) error { - logger.Info("Initializing. For verbose output run with \"-l debug\". Please wait...") - - // Load all the configurations - cfgMgr, cfgs := makeConfigs() - - // Run any migrations - if c.Bool("upgrade") { - err := upgrade() - if err != errNoSnapshot { - checkErr("upgrading state", err) - } // otherwise continue - } - - // Execution lock - err := locker.lock() - checkErr("acquiring execution lock", err) - defer locker.tryUnlock() - - // Load all the configurations - // always wait for configuration to be saved - defer cfgMgr.Shutdown() - - err = cfgMgr.LoadJSONFromFile(configPath) - checkErr("loading configuration", err) - - if a := c.String("bootstrap"); a != "" { - if len(cfgs.clusterCfg.Peers) > 0 && !c.Bool("force") { - return errors.New("the configuration provides cluster.Peers. Use -f to ignore and proceed bootstrapping") - } - joinAddr, err := ma.NewMultiaddr(a) - checkErr("error parsing multiaddress: %s", err) - cfgs.clusterCfg.Bootstrap = []ma.Multiaddr{joinAddr} - cfgs.clusterCfg.Peers = []ma.Multiaddr{} - } - - if c.Bool("leave") { - cfgs.clusterCfg.LeaveOnShutdown = true - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - cluster, err := initializeCluster(ctx, c, cfgs) - checkErr("starting cluster", err) - - signalChan := make(chan os.Signal, 20) - signal.Notify( - signalChan, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGHUP, - ) - - var ctrlcCount int - for { - select { - case <-signalChan: - ctrlcCount++ - handleCtrlC(cluster, ctrlcCount) - case <-cluster.Done(): - return nil - } - } + cli.ShowAppHelp(c) + os.Exit(1) + return nil } func setupLogLevel(lvl string) { for f := range ipfscluster.LoggingFacilities { ipfscluster.SetFacilityLogLevel(f, lvl) } + ipfscluster.SetFacilityLogLevel("service", lvl) } func setupDebug() { ipfscluster.SetFacilityLogLevel("*", "DEBUG") } -func setupAllocation(name string, diskInfCfg *disk.Config, numpinInfCfg *numpin.Config) (ipfscluster.Informer, ipfscluster.PinAllocator) { - switch name { - case "disk", "disk-freespace": - informer, err := disk.NewInformer(diskInfCfg) - checkErr("creating informer", err) - return informer, descendalloc.NewAllocator() - case "disk-reposize": - informer, err := disk.NewInformer(diskInfCfg) - checkErr("creating informer", err) - return informer, ascendalloc.NewAllocator() - case "numpin", "pincount": - informer, err := numpin.NewInformer(numpinInfCfg) - checkErr("creating informer", err) - return informer, ascendalloc.NewAllocator() - default: - err := errors.New("unknown allocation strategy") - checkErr("", err) - return nil, nil - } -} - -func saveConfig(cfg *config.Manager, force bool) { - if _, err := os.Stat(configPath); err == nil && !force { - err := fmt.Errorf("%s exists. Try running: %s -f init", configPath, programName) - checkErr("", err) - } - - err := os.MkdirAll(filepath.Dir(configPath), 0700) - err = cfg.SaveJSON(configPath) - checkErr("saving new configuration", err) - out("%s configuration written to %s\n", - programName, configPath) -} - func userProvidedSecret(enterSecret bool) ([]byte, bool) { var secret string if enterSecret { @@ -592,93 +481,3 @@ func yesNoPrompt(prompt string) bool { } return false } - -func makeConfigs() (*config.Manager, *cfgs) { - cfg := config.NewManager() - clusterCfg := &ipfscluster.Config{} - apiCfg := &rest.Config{} - ipfshttpCfg := &ipfshttp.Config{} - consensusCfg := &raft.Config{} - trackerCfg := &maptracker.Config{} - monCfg := &basic.Config{} - diskInfCfg := &disk.Config{} - numpinInfCfg := &numpin.Config{} - cfg.RegisterComponent(config.Cluster, clusterCfg) - cfg.RegisterComponent(config.API, apiCfg) - cfg.RegisterComponent(config.IPFSConn, ipfshttpCfg) - cfg.RegisterComponent(config.Consensus, consensusCfg) - cfg.RegisterComponent(config.PinTracker, trackerCfg) - cfg.RegisterComponent(config.Monitor, monCfg) - cfg.RegisterComponent(config.Informer, diskInfCfg) - cfg.RegisterComponent(config.Informer, numpinInfCfg) - return cfg, &cfgs{clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg, numpinInfCfg} -} - -type cfgs struct { - clusterCfg *ipfscluster.Config - apiCfg *rest.Config - ipfshttpCfg *ipfshttp.Config - consensusCfg *raft.Config - trackerCfg *maptracker.Config - monCfg *basic.Config - diskInfCfg *disk.Config - numpinInfCfg *numpin.Config -} - -func initializeCluster(ctx context.Context, c *cli.Context, cfgs *cfgs) (*ipfscluster.Cluster, error) { - host, err := ipfscluster.NewClusterHost(ctx, cfgs.clusterCfg) - checkErr("creating libP2P Host", err) - - api, err := rest.NewAPIWithHost(cfgs.apiCfg, host) - checkErr("creating REST API component", err) - - proxy, err := ipfshttp.NewConnector(cfgs.ipfshttpCfg) - checkErr("creating IPFS Connector component", err) - - state := mapstate.NewMapState() - - err = validateVersion(cfgs.clusterCfg, cfgs.consensusCfg) - checkErr("validating version", err) - - tracker := maptracker.NewMapPinTracker(cfgs.trackerCfg, cfgs.clusterCfg.ID) - mon, err := basic.NewMonitor(cfgs.monCfg) - checkErr("creating Monitor component", err) - informer, alloc := setupAllocation(c.GlobalString("alloc"), cfgs.diskInfCfg, cfgs.numpinInfCfg) - - return ipfscluster.NewCluster( - host, - cfgs.clusterCfg, - cfgs.consensusCfg, - api, - proxy, - state, - tracker, - mon, - alloc, - informer, - ) -} - -func handleCtrlC(cluster *ipfscluster.Cluster, ctrlcCount int) { - switch ctrlcCount { - case 1: - go func() { - err := cluster.Shutdown() - checkErr("shutting down cluster", err) - }() - case 2: - out(` - - -!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -Shutdown is taking too long! Press Ctrl-c again to manually kill cluster. -Note that this may corrupt the local cluster state. -!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! - - -`) - case 3: - out("exiting cluster NOW") - os.Exit(-1) - } -} diff --git a/ipfs-cluster-service/state.go b/ipfs-cluster-service/state.go index 3af6df75..3e43cfb3 100644 --- a/ipfs-cluster-service/state.go +++ b/ipfs-cluster-service/state.go @@ -10,6 +10,7 @@ import ( ipfscluster "github.com/ipfs/ipfs-cluster" "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/consensus/raft" + "github.com/ipfs/ipfs-cluster/pstoremgr" "github.com/ipfs/ipfs-cluster/state/mapstate" ) @@ -33,7 +34,8 @@ func upgrade() error { return err } - raftPeers := append(ipfscluster.PeersFromMultiaddrs(cfgs.clusterCfg.Peers), cfgs.clusterCfg.ID) + pm := pstoremgr.New(nil, cfgs.clusterCfg.GetPeerstorePath()) + raftPeers := append(ipfscluster.PeersFromMultiaddrs(pm.LoadPeerstore()), cfgs.clusterCfg.ID) return raft.SnapshotSave(cfgs.consensusCfg, newState, raftPeers) } @@ -111,7 +113,9 @@ func stateImport(r io.Reader) error { return err } } - raftPeers := append(ipfscluster.PeersFromMultiaddrs(cfgs.clusterCfg.Peers), cfgs.clusterCfg.ID) + + pm := pstoremgr.New(nil, cfgs.clusterCfg.GetPeerstorePath()) + raftPeers := append(ipfscluster.PeersFromMultiaddrs(pm.LoadPeerstore()), cfgs.clusterCfg.ID) return raft.SnapshotSave(cfgs.consensusCfg, stateToImport, raftPeers) } @@ -159,3 +163,8 @@ func exportState(state *mapstate.MapState, w io.Writer) error { enc.SetIndent("", " ") return enc.Encode(pinSerials) } + +// CleanupState cleans the state +func cleanupState(cCfg *raft.Config) error { + return raft.CleanupRaft(cCfg.GetDataFolder(), cCfg.BackupsRotate) +} diff --git a/ipfscluster.go b/ipfscluster.go index 32f5cde7..2eab8baa 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -37,8 +37,8 @@ type Component interface { // the Cluster main component. type Consensus interface { Component - // Returns a channel to signal that the consensus - // algorithm is ready + // Returns a channel to signal that the consensus layer is ready + // allowing the main component to wait for it during start. Ready() <-chan struct{} // Logs a pin operation LogPin(c api.Pin) error diff --git a/ipfscluster_test.go b/ipfscluster_test.go index eac868d5..b391d297 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -79,7 +79,7 @@ func randomBytes() []byte { return bs } -func createComponents(t *testing.T, i int, clusterSecret []byte) (*Config, *raft.Config, API, IPFSConnector, state.State, PinTracker, PeerMonitor, PinAllocator, Informer, *test.IpfsMock) { +func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) (host.Host, *Config, *raft.Consensus, API, IPFSConnector, state.State, PinTracker, PeerMonitor, PinAllocator, Informer, *test.IpfsMock) { mock := test.NewIpfsMock() // //clusterAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", clusterPort+i)) @@ -106,6 +106,13 @@ func createComponents(t *testing.T, i int, clusterSecret []byte) (*Config, *raft clusterCfg.Secret = clusterSecret clusterCfg.ListenAddr = clusterAddr clusterCfg.LeaveOnShutdown = false + clusterCfg.SetBaseDir("./e2eTestRaft/" + pid.Pretty()) + + ReadyTimeout = consensusCfg.WaitForLeaderTimeout + 1*time.Second + + host, err := NewClusterHost(context.Background(), clusterCfg) + checkErr(t, err) + apiCfg.HTTPListenAddr = apiAddr ipfshttpCfg.ProxyAddr = proxyAddr ipfshttpCfg.NodeAddr = nodeAddr @@ -122,27 +129,29 @@ func createComponents(t *testing.T, i int, clusterSecret []byte) (*Config, *raft alloc := descendalloc.NewAllocator() inf, err := disk.NewInformer(diskInfCfg) checkErr(t, err) + raftCon, err := raft.NewConsensus(host, consensusCfg, state, staging) + checkErr(t, err) - return clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, mock + return host, clusterCfg, raftCon, api, ipfs, state, tracker, mon, alloc, inf, mock } -func createCluster(t *testing.T, host host.Host, clusterCfg *Config, consensusCfg *raft.Config, api API, ipfs IPFSConnector, state state.State, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer) *Cluster { - cl, err := NewCluster(host, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf) +func createCluster(t *testing.T, host host.Host, clusterCfg *Config, raftCons *raft.Consensus, api API, ipfs IPFSConnector, state state.State, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer) *Cluster { + cl, err := NewCluster(host, clusterCfg, raftCons, api, ipfs, state, tracker, mon, alloc, inf) checkErr(t, err) - <-cl.Ready() return cl } func createOnePeerCluster(t *testing.T, nth int, clusterSecret []byte) (*Cluster, *test.IpfsMock) { - clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, mock := createComponents(t, nth, clusterSecret) - cl := createCluster(t, nil, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf) + host, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, mock := createComponents(t, nth, clusterSecret, false) + cl := createCluster(t, host, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf) + <-cl.Ready() return cl, mock } func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) { os.RemoveAll("./e2eTestRaft") cfgs := make([]*Config, nClusters, nClusters) - concfgs := make([]*raft.Config, nClusters, nClusters) + raftCons := make([]*raft.Consensus, nClusters, nClusters) apis := make([]API, nClusters, nClusters) ipfss := make([]IPFSConnector, nClusters, nClusters) states := make([]state.State, nClusters, nClusters) @@ -159,9 +168,11 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) { // clusterPeers := make([]ma.Multiaddr, nClusters, nClusters) for i := 0; i < nClusters; i++ { - clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, mock := createComponents(t, i, testingClusterSecret) + // staging = true for all except first (i==0) + host, clusterCfg, raftCon, api, ipfs, state, tracker, mon, alloc, inf, mock := createComponents(t, i, testingClusterSecret, i != 0) + hosts[i] = host cfgs[i] = clusterCfg - concfgs[i] = consensusCfg + raftCons[i] = raftCon apis[i] = api ipfss[i] = ipfs states[i] = state @@ -170,44 +181,6 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) { allocs[i] = alloc infs[i] = inf ipfsMocks[i] = mock - - // Uncomment with testing with fixed ports and ClusterPeers - // addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ipfs/%s", - // clusterPort+i, - // clusterCfg.ID.Pretty())) - // clusterPeers[i] = addr - } - - // ---------------------------------------------------------- - - // // Set up the cluster using ClusterPeers - // for i := 0; i < nClusters; i++ { - // cfgs[i].Peers = make([]ma.Multiaddr, nClusters, nClusters) - // for j := 0; j < nClusters; j++ { - // cfgs[i].Peers[j] = clusterPeers[j] - // } - // } - - // var wg sync.WaitGroup - // for i := 0; i < nClusters; i++ { - // wg.Add(1) - // go func(i int) { - // clusters[i] = createCluster(t, cfgs[i], concfgs[i], apis[i], ipfss[i], states[i], trackers[i], mons[i], allocs[i], infs[i]) - // wg.Done() - // }(i) - // } - // wg.Wait() - - // ---------------------------------------------- - - // Alternative way of starting using bootstrap - // Create hosts - var err error - for i := 0; i < nClusters; i++ { - hosts[i], err = NewClusterHost(context.Background(), cfgs[i]) - if err != nil { - t.Fatal(err) - } } // open connections among all hosts @@ -225,29 +198,22 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) { } // Start first node - clusters[0] = createCluster(t, hosts[0], cfgs[0], concfgs[0], apis[0], ipfss[0], states[0], trackers[0], mons[0], allocs[0], infs[0]) - // Find out where it binded + clusters[0] = createCluster(t, hosts[0], cfgs[0], raftCons[0], apis[0], ipfss[0], states[0], trackers[0], mons[0], allocs[0], infs[0]) + <-clusters[0].Ready() bootstrapAddr, _ := ma.NewMultiaddr(fmt.Sprintf("%s/ipfs/%s", clusters[0].host.Addrs()[0], clusters[0].id.Pretty())) - // Use first node to bootstrap - for i := 1; i < nClusters; i++ { - cfgs[i].Bootstrap = []ma.Multiaddr{bootstrapAddr} - } - waitForLeader(t, clusters[0:1]) - // Start the rest - // We don't do this in parallel because it causes libp2p dial backoffs + // Start the rest and join for i := 1; i < nClusters; i++ { - clusters[i] = createCluster(t, hosts[i], cfgs[i], concfgs[i], apis[i], ipfss[i], states[i], trackers[i], mons[i], allocs[i], infs[i]) + clusters[i] = createCluster(t, hosts[i], cfgs[i], raftCons[i], apis[i], ipfss[i], states[i], trackers[i], mons[i], allocs[i], infs[i]) + err := clusters[i].Join(bootstrapAddr) + if err != nil { + logger.Error(err) + t.Fatal(err) + } + <-clusters[i].Ready() } waitForLeader(t, clusters) - // --------------------------------------------- - - // Yet an alternative way using PeerAdd - // for i := 1; i < nClusters; i++ { - // clusters[0].PeerAdd(clusterAddr(clusters[i])) - // } - return clusters, ipfsMocks } diff --git a/peer_manager.go b/peer_manager.go deleted file mode 100644 index bc92acbf..00000000 --- a/peer_manager.go +++ /dev/null @@ -1,86 +0,0 @@ -package ipfscluster - -import ( - "context" - "fmt" - "time" - - "github.com/ipfs/ipfs-cluster/api" - - host "github.com/libp2p/go-libp2p-host" - peer "github.com/libp2p/go-libp2p-peer" - peerstore "github.com/libp2p/go-libp2p-peerstore" - ma "github.com/multiformats/go-multiaddr" - madns "github.com/multiformats/go-multiaddr-dns" -) - -// peerManager provides wrappers peerset control -type peerManager struct { - host host.Host - ctx context.Context -} - -func newPeerManager(h host.Host) *peerManager { - return &peerManager{ - ctx: context.Background(), - host: h, - } -} - -func (pm *peerManager) addPeer(addr ma.Multiaddr, connect bool) error { - logger.Debugf("adding peer address %s", addr) - pid, decapAddr, err := api.Libp2pMultiaddrSplit(addr) - if err != nil { - return err - } - pm.host.Peerstore().AddAddr(pid, decapAddr, peerstore.PermanentAddrTTL) - - // dns multiaddresses need to be resolved because libp2p only does that - // on explicit bhost.Connect(). - if madns.Matches(addr) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) - defer cancel() - resolvedAddrs, err := madns.Resolve(ctx, addr) - if err != nil { - logger.Error(err) - return err - } - pm.importAddresses(resolvedAddrs, connect) - } - if connect { - pm.host.Network().DialPeer(pm.ctx, pid) - } - return nil -} - -func (pm *peerManager) rmPeer(pid peer.ID) error { - logger.Debugf("forgetting peer %s", pid.Pretty()) - pm.host.Peerstore().ClearAddrs(pid) - return nil -} - -// cluster peer addresses (NOT including ourselves) -func (pm *peerManager) addresses(peers []peer.ID) []ma.Multiaddr { - addrs := []ma.Multiaddr{} - if peers == nil { - return addrs - } - - for _, p := range peers { - if p == pm.host.ID() { - continue - } - peerAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(p))) - for _, a := range pm.host.Peerstore().Addrs(p) { - addrs = append(addrs, a.Encapsulate(peerAddr)) - } - } - return addrs -} - -func (pm *peerManager) importAddresses(addrs []ma.Multiaddr, connect bool) error { - for _, a := range addrs { - pm.addPeer(a, connect) - } - return nil -} diff --git a/peer_manager_test.go b/peer_manager_test.go index 4b623b47..cfa7e263 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -87,18 +87,21 @@ func TestClustersPeerAdd(t *testing.T) { t.Error("By now cluster peers should reflect all peers") } - // check that they are part of the configuration - // This only works because each peer only has one multiaddress - // (localhost) - if len(PeersFromMultiaddrs(c.config.Peers)) != nClusters-1 { - t.Error(c.config.Peers) - t.Errorf("%s: expected different cluster peers in the configuration", c.id) + // check that all peers are part of the peerstore + // (except ourselves) + addrs := c.peerManager.LoadPeerstore() + peerMap := make(map[peer.ID]struct{}) + for _, a := range addrs { + pid, _, err := api.Libp2pMultiaddrSplit(a) + if err != nil { + t.Fatal(err) + } + peerMap[pid] = struct{}{} } - for _, peer := range c.config.Peers { - if peer == nil { - t.Error("something went wrong adding peer to config") - } + if len(peerMap) != nClusters-1 { + t.Error(c.peerManager.LoadPeerstore()) + t.Errorf("%s: expected different cluster peers in the peerstore", c.id) } } runF(t, clusters, f) @@ -173,7 +176,6 @@ func TestClustersPeerRemove(t *testing.T) { } p := clusters[1].ID().ID - //t.Logf("remove %s from %s", p.Pretty(), clusters[0].config.ClusterPeers) err := clusters[0].PeerRemove(p) if err != nil { t.Error(err) @@ -187,18 +189,11 @@ func TestClustersPeerRemove(t *testing.T) { if ok { t.Error("removed peer should have exited") } - // if len(c.config.ClusterPeers) != 0 { - // t.Error("cluster peers should be empty") - // } } else { ids := c.Peers() if len(ids) != nClusters-1 { t.Error("should have removed 1 peer") } - // if len(c.config.ClusterPeers) != nClusters-1 { - // t.Log(c.config.ClusterPeers) - // t.Error("should have removed peer from config") - // } } } @@ -542,7 +537,7 @@ func TestClustersPeerRejoin(t *testing.T) { // Forget peer so we can re-add one in same address/port f := func(t *testing.T, c *Cluster) { - c.peerManager.rmPeer(clusters[0].id) + c.peerManager.RmPeer(clusters[0].id) } runF(t, clusters[1:], f) diff --git a/pstoremgr/pstoremgr.go b/pstoremgr/pstoremgr.go new file mode 100644 index 00000000..7fca948a --- /dev/null +++ b/pstoremgr/pstoremgr.go @@ -0,0 +1,231 @@ +// Package pstoremgr provides a Manager that simplifies handling +// addition, listing and removal of cluster peer multiaddresses from +// the libp2p Host. This includes resolving DNS addresses, decapsulating +// and encapsulating the /p2p/ (/ipfs/) protocol as needed, listing, saving +// and loading addresses. +package pstoremgr + +import ( + "bufio" + "context" + "fmt" + "os" + "sync" + "time" + + "github.com/ipfs/ipfs-cluster/api" + + logging "github.com/ipfs/go-log" + host "github.com/libp2p/go-libp2p-host" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" + madns "github.com/multiformats/go-multiaddr-dns" +) + +var logger = logging.Logger("pstoremgr") + +// Timeouts for network operations triggered by the Manager +var ( + DNSTimeout = 2 * time.Second + ConnectTimeout = 10 * time.Second +) + +// Manager provides utilities for handling cluster peer addresses +// and storing them in a libp2p Host peerstore. +type Manager struct { + ctx context.Context + host host.Host + peerstoreLock sync.Mutex + peerstorePath string +} + +// New creates a Manager with the given libp2p Host and peerstorePath. +// The path indicates the place to persist and read peer addresses from. +// If empty, these operations (LoadPeerstore, SavePeerstore) will no-op. +func New(h host.Host, peerstorePath string) *Manager { + return &Manager{ + ctx: context.Background(), + host: h, + peerstorePath: peerstorePath, + } +} + +// ImportPeer adds a new peer address to the host's peerstore, optionally +// dialing to it. It will resolve any DNS multiaddresses before adding them. +// The address is expected to include the /ipfs/ protocol part. +func (pm *Manager) ImportPeer(addr ma.Multiaddr, connect bool) error { + if pm.host == nil { + return nil + } + + logger.Debugf("adding peer address %s", addr) + pid, decapAddr, err := api.Libp2pMultiaddrSplit(addr) + if err != nil { + return err + } + pm.host.Peerstore().AddAddr(pid, decapAddr, peerstore.PermanentAddrTTL) + + // dns multiaddresses need to be resolved because libp2p only does that + // on explicit bhost.Connect(). + if madns.Matches(addr) { + ctx, cancel := context.WithTimeout(pm.ctx, DNSTimeout) + defer cancel() + resolvedAddrs, err := madns.Resolve(ctx, addr) + if err != nil { + logger.Error(err) + return err + } + pm.ImportPeers(resolvedAddrs, connect) + } + if connect { + ctx, cancel := context.WithTimeout(pm.ctx, ConnectTimeout) + defer cancel() + pm.host.Network().DialPeer(ctx, pid) + } + return nil +} + +// RmPeer clear all addresses for a given peer ID from the host's peerstore. +func (pm *Manager) RmPeer(pid peer.ID) error { + if pm.host == nil { + return nil + } + + logger.Debugf("forgetting peer %s", pid.Pretty()) + pm.host.Peerstore().ClearAddrs(pid) + return nil +} + +// if the peer has dns addresses, return only those, otherwise +// return all. In all cases, encapsulate the peer ID. +func (pm *Manager) filteredPeerAddrs(p peer.ID) []ma.Multiaddr { + all := pm.host.Peerstore().Addrs(p) + peerAddrs := []ma.Multiaddr{} + peerDNSAddrs := []ma.Multiaddr{} + peerPart, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(p))) + + for _, a := range all { + encAddr := a.Encapsulate(peerPart) + if madns.Matches(encAddr) { + peerDNSAddrs = append(peerDNSAddrs, encAddr) + } else { + peerAddrs = append(peerAddrs, encAddr) + } + } + + if len(peerDNSAddrs) > 0 { + return peerDNSAddrs + } + + return peerAddrs +} + +// PeersAddresses returns the list of multiaddresses (encapsulating the +// /ipfs/ part) for the given set of peers. For peers for which +// we know DNS multiaddresses, we only return those. Otherwise, we return +// all the multiaddresses known for that peer. +func (pm *Manager) PeersAddresses(peers []peer.ID) []ma.Multiaddr { + if pm.host == nil { + return nil + } + + if peers == nil { + return nil + } + + var addrs []ma.Multiaddr + for _, p := range peers { + if p == pm.host.ID() { + continue + } + addrs = append(addrs, pm.filteredPeerAddrs(p)...) + } + return addrs +} + +// ImportPeers calls ImportPeer for every address in the given slice, using the +// given connect parameter. +func (pm *Manager) ImportPeers(addrs []ma.Multiaddr, connect bool) error { + for _, a := range addrs { + pm.ImportPeer(a, connect) + } + return nil +} + +// ImportPeersFromPeerstore reads the peerstore file and calls ImportPeers with +// the addresses obtained from it. +func (pm *Manager) ImportPeersFromPeerstore(connect bool) error { + return pm.ImportPeers(pm.LoadPeerstore(), connect) +} + +// LoadPeerstore parses the peerstore file and returns the list +// of addresses read from it. +func (pm *Manager) LoadPeerstore() (addrs []ma.Multiaddr) { + if pm.peerstorePath == "" { + return + } + pm.peerstoreLock.Lock() + defer pm.peerstoreLock.Unlock() + + f, err := os.Open(pm.peerstorePath) + if err != nil { + return // nothing to load + } + + defer f.Close() + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + addrStr := scanner.Text() + if addrStr[0] != '/' { + // skip anything that is not going to be a multiaddress + continue + } + addr, err := ma.NewMultiaddr(addrStr) + if err != nil { + logger.Error( + "error parsing multiaddress from %s: %s", + pm.peerstorePath, + err, + ) + } + addrs = append(addrs, addr) + } + if err := scanner.Err(); err != nil { + logger.Errorf("reading %s: %s", pm.peerstorePath, err) + } + return addrs +} + +// SavePeerstore stores a slice of multiaddresses in the peerstore file, one +// per line. +func (pm *Manager) SavePeerstore(addrs []ma.Multiaddr) { + if pm.peerstorePath == "" { + return + } + + pm.peerstoreLock.Lock() + defer pm.peerstoreLock.Unlock() + + f, err := os.Create(pm.peerstorePath) + if err != nil { + logger.Errorf( + "could not save peer addresses to %s: %s", + pm.peerstorePath, + err, + ) + return + } + defer f.Close() + + for _, a := range addrs { + f.Write([]byte(fmt.Sprintf("%s\n", a.String()))) + } +} + +// SavePeerstoreForPeers calls PeersAddresses and then saves the peerstore +// file using the result. +func (pm *Manager) SavePeerstoreForPeers(peers []peer.ID) { + pm.SavePeerstore(pm.PeersAddresses(peers)) +} diff --git a/pstoremgr/pstoremgr_test.go b/pstoremgr/pstoremgr_test.go new file mode 100644 index 00000000..40bd764b --- /dev/null +++ b/pstoremgr/pstoremgr_test.go @@ -0,0 +1,105 @@ +package pstoremgr + +import ( + "context" + "os" + "testing" + + "github.com/ipfs/ipfs-cluster/api" + + libp2p "github.com/libp2p/go-libp2p" + ma "github.com/multiformats/go-multiaddr" +) + +var pid = "QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc" + +func makeMgr(t *testing.T) *Manager { + h, err := libp2p.New(context.Background()) + if err != nil { + t.Fatal(err) + } + return New(h, "peerstore") +} + +func clean(pm *Manager) { + if path := pm.peerstorePath; path != "" { + os.RemoveAll(path) + } +} + +func TestManager(t *testing.T) { + pm := makeMgr(t) + defer clean(pm) + + testPeer, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234/ipfs/" + pid) + + err := pm.ImportPeer(testPeer, false) + if err != nil { + t.Fatal(err) + } + + peers := api.StringsToPeers([]string{pid, pm.host.ID().Pretty()}) + addrs := pm.PeersAddresses(peers) + if len(addrs) != 1 { + t.Fatal("expected 1 address") + } + + if !addrs[0].Equal(testPeer) { + t.Error("expected same address as added") + } + + pm.RmPeer(peers[0]) + addrs = pm.PeersAddresses(peers) + if len(addrs) != 0 { + t.Fatal("expected 0 addresses") + } +} + +func TestManagerDNS(t *testing.T) { + pm := makeMgr(t) + defer clean(pm) + + testPeer, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234/ipfs/" + pid) + testPeer2, _ := ma.NewMultiaddr("/dns4/localhost/tcp/1235/ipfs/" + pid) + + err := pm.ImportPeers([]ma.Multiaddr{testPeer, testPeer2}, false) + if err != nil { + t.Fatal(err) + } + + addrs := pm.PeersAddresses(api.StringsToPeers([]string{pid})) + if len(addrs) != 1 { + t.Fatal("expected 1 address") + } + + if !addrs[0].Equal(testPeer2) { + t.Error("expected only the dns address") + } +} + +func TestPeerstore(t *testing.T) { + pm := makeMgr(t) + defer clean(pm) + + testPeer, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234/ipfs/" + pid) + testPeer2, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235/ipfs/" + pid) + + err := pm.ImportPeers([]ma.Multiaddr{testPeer, testPeer2}, false) + if err != nil { + t.Fatal(err) + } + + pm.SavePeerstoreForPeers(api.StringsToPeers([]string{pid})) + + pm2 := makeMgr(t) + defer clean(pm2) + + err = pm2.ImportPeersFromPeerstore(false) + if err != nil { + t.Fatal(err) + } + + if len(pm2.PeersAddresses(api.StringsToPeers([]string{pid}))) != 2 { + t.Error("expected 2 addresses from the peerstore") + } +} diff --git a/rpc_api.go b/rpc_api.go index 4ad32b93..a8120c12 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -349,14 +349,14 @@ func (rpcapi *RPCAPI) ConsensusPeers(ctx context.Context, in struct{}, out *[]pe // PeerManagerAddPeer runs peerManager.addPeer(). func (rpcapi *RPCAPI) PeerManagerAddPeer(ctx context.Context, in api.MultiaddrSerial, out *struct{}) error { addr := in.ToMultiaddr() - err := rpcapi.c.peerManager.addPeer(addr, false) + err := rpcapi.c.peerManager.ImportPeer(addr, false) return err } // PeerManagerImportAddresses runs peerManager.importAddresses(). func (rpcapi *RPCAPI) PeerManagerImportAddresses(ctx context.Context, in api.MultiaddrsSerial, out *struct{}) error { addrs := in.ToMultiaddrs() - err := rpcapi.c.peerManager.importAddresses(addrs, false) + err := rpcapi.c.peerManager.ImportPeers(addrs, false) return err } diff --git a/sharness/config/basic_auth/service.json b/sharness/config/basic_auth/service.json index 3d52d392..2f64d716 100644 --- a/sharness/config/basic_auth/service.json +++ b/sharness/config/basic_auth/service.json @@ -4,8 +4,6 @@ "peername": "testname", "private_key": "CAASqAkwggSkAgEAAoIBAQC/ZmfWDbwyI0nJdRxgHcTdEaBFQo8sky9E+OOvtwZa5WKoLdHyHOLWxCAdpIHUBbhxz5rkMEWLwPI6ykqLIJToMPO8lJbKVzphOjv4JwpiAPdmeSiYMKLjx5V8MpqU2rwj/Uf3sRL8Gg9/Tei3PZ8cftxN1rkQQeeaOtk0CBxUFZSHEsyut1fbgIeL7TAY+4vCmXW0DBr4wh9fnoES/YivOvSiN9rScgWg6N65LfkI78hzaOJ4Nok2S4vYFCxjTAI9NWFUbhP5eJIFzTU+bZuQZxOn2qsoyw8pNZwuF+JClA/RcgBcCvVZcDH2ueVq/zT++bGCN+EWsAEdvJqJ5bsjAgMBAAECggEAaGDUZ6t94mnUJ4UyQEh7v4OJP7wYkFqEAL0qjfzl/lPyBX1XbQ3Ltwul6AR6uMGV4JszARZCFwDWGLGRDWZrTmTDxyfRQ+9l6vfzFFVWGDQmtz+Dn9uGOWnyX5TJMDxJNec+hBmRHOKpaOd37dYxGz0jr19V9UO7piRJp1J1AHUCypUGv5x1IekioSCu5fEyc7dyWwnmITHBjD08st+bCcjrIUFeXSdJKC8SymYeXdaVE3xH3zVEISKnrfT7bhuKZY1iibZIlXbVLNpyX36LkYJOiCqsMum3u70LH0VvTypkqiDbD4S6qfJ4vvUakpmKpOPutikiP7jkSP+AkaO0AQKBgQDkTuhnDK6+Y0a/HgpHJisji0coO+g2gsIszargHk8nNY2AB8t+EUn7C+Qu8cmrem5V8EXcdxS6z7iAXpJmY1Xepnsz+JP7Q91Lgt3OoqK5EybzUXXKkmNCD65n70Xxn2fEFzm6+GJP3c/HymlDKU2KBCYIyuUeaREjT0Fu3v6tgQKBgQDWnXppJwn4LJHhzFOCeO4zomDJDbLTZCabdKZoFP9r+vtEHAnclDDKx4AYbomSqgERe+DX6HR/tPHRVizP63RYPf7al2mJmPzt1nTkoc1/q5hQoD+oE154dADsW1pUp7AQjwCtys4iq5S0qAwIDpuY8M8bOHwZ+QmBvHYAigJCowKBgQC3HH6TX/2rH463bE2MARXqXSPGJj45sigwrQfW1xhe9zm1LQtN4mn2mvP5nt1D1l82OA6gIzYSGtX8x10eF5/ggqAf78goZ6bOkHh76b8fNzgvQO97eGt5qYAVRjhP8azU/lfEGMEpE1s5/6LrRe41utwSg0C+YkBnlIKDfQDAgQKBgDoBTCF5hK9H1JHzuKpt5uubuo78ndWWnvyrNYKyEirsJddNwLiWcO2NqChyT8qNGkbQdX/Fex89F5KduPTlTYfAEc6g18xxxgK+UM+uj60vArbf6PSTb5gculcnha2VuPdwvx050Cb8uu9s7/uJfzKB+2f/B0O51ID1H+ubYWsDAoGBAKrwGKHyqFTHSPg3XuRA1FgDAoOsfzP9ZJvMEXUWyu/VxjNt+0mRlyGeZ5qb9UZG+K/In4FbC/ux2P/PucCUIbgy/XGPtPXVavMwNbx0MquAcU0FihKXP0CUpi8zwiYc42MF7n/SztQnismxigBMSuJEDurcXXazjfcSRTypduNn", "secret": "84399cd0be811c2ca372d6ca473ffd73c09034f991c5e306fe9ada6c5fcfb641", - "peers": [], - "bootstrap": [], "leave_on_shutdown": false, "listen_multiaddress": "/ip4/0.0.0.0/tcp/9096", "state_sync_interval": "1m0s", diff --git a/sharness/config/ssl-basic_auth/service.json b/sharness/config/ssl-basic_auth/service.json index f4a8ae0d..bb51a93f 100644 --- a/sharness/config/ssl-basic_auth/service.json +++ b/sharness/config/ssl-basic_auth/service.json @@ -4,8 +4,6 @@ "peername": "testname", "private_key": "CAASqAkwggSkAgEAAoIBAQC/ZmfWDbwyI0nJdRxgHcTdEaBFQo8sky9E+OOvtwZa5WKoLdHyHOLWxCAdpIHUBbhxz5rkMEWLwPI6ykqLIJToMPO8lJbKVzphOjv4JwpiAPdmeSiYMKLjx5V8MpqU2rwj/Uf3sRL8Gg9/Tei3PZ8cftxN1rkQQeeaOtk0CBxUFZSHEsyut1fbgIeL7TAY+4vCmXW0DBr4wh9fnoES/YivOvSiN9rScgWg6N65LfkI78hzaOJ4Nok2S4vYFCxjTAI9NWFUbhP5eJIFzTU+bZuQZxOn2qsoyw8pNZwuF+JClA/RcgBcCvVZcDH2ueVq/zT++bGCN+EWsAEdvJqJ5bsjAgMBAAECggEAaGDUZ6t94mnUJ4UyQEh7v4OJP7wYkFqEAL0qjfzl/lPyBX1XbQ3Ltwul6AR6uMGV4JszARZCFwDWGLGRDWZrTmTDxyfRQ+9l6vfzFFVWGDQmtz+Dn9uGOWnyX5TJMDxJNec+hBmRHOKpaOd37dYxGz0jr19V9UO7piRJp1J1AHUCypUGv5x1IekioSCu5fEyc7dyWwnmITHBjD08st+bCcjrIUFeXSdJKC8SymYeXdaVE3xH3zVEISKnrfT7bhuKZY1iibZIlXbVLNpyX36LkYJOiCqsMum3u70LH0VvTypkqiDbD4S6qfJ4vvUakpmKpOPutikiP7jkSP+AkaO0AQKBgQDkTuhnDK6+Y0a/HgpHJisji0coO+g2gsIszargHk8nNY2AB8t+EUn7C+Qu8cmrem5V8EXcdxS6z7iAXpJmY1Xepnsz+JP7Q91Lgt3OoqK5EybzUXXKkmNCD65n70Xxn2fEFzm6+GJP3c/HymlDKU2KBCYIyuUeaREjT0Fu3v6tgQKBgQDWnXppJwn4LJHhzFOCeO4zomDJDbLTZCabdKZoFP9r+vtEHAnclDDKx4AYbomSqgERe+DX6HR/tPHRVizP63RYPf7al2mJmPzt1nTkoc1/q5hQoD+oE154dADsW1pUp7AQjwCtys4iq5S0qAwIDpuY8M8bOHwZ+QmBvHYAigJCowKBgQC3HH6TX/2rH463bE2MARXqXSPGJj45sigwrQfW1xhe9zm1LQtN4mn2mvP5nt1D1l82OA6gIzYSGtX8x10eF5/ggqAf78goZ6bOkHh76b8fNzgvQO97eGt5qYAVRjhP8azU/lfEGMEpE1s5/6LrRe41utwSg0C+YkBnlIKDfQDAgQKBgDoBTCF5hK9H1JHzuKpt5uubuo78ndWWnvyrNYKyEirsJddNwLiWcO2NqChyT8qNGkbQdX/Fex89F5KduPTlTYfAEc6g18xxxgK+UM+uj60vArbf6PSTb5gculcnha2VuPdwvx050Cb8uu9s7/uJfzKB+2f/B0O51ID1H+ubYWsDAoGBAKrwGKHyqFTHSPg3XuRA1FgDAoOsfzP9ZJvMEXUWyu/VxjNt+0mRlyGeZ5qb9UZG+K/In4FbC/ux2P/PucCUIbgy/XGPtPXVavMwNbx0MquAcU0FihKXP0CUpi8zwiYc42MF7n/SztQnismxigBMSuJEDurcXXazjfcSRTypduNn", "secret": "84399cd0be811c2ca372d6ca473ffd73c09034f991c5e306fe9ada6c5fcfb641", - "peers": [], - "bootstrap": [], "leave_on_shutdown": false, "listen_multiaddress": "/ip4/0.0.0.0/tcp/9096", "state_sync_interval": "1m0s", diff --git a/sharness/config/ssl/service.json b/sharness/config/ssl/service.json index 933e54a6..4e2902b3 100644 --- a/sharness/config/ssl/service.json +++ b/sharness/config/ssl/service.json @@ -4,8 +4,6 @@ "peername": "testname", "private_key": "CAASqAkwggSkAgEAAoIBAQC/ZmfWDbwyI0nJdRxgHcTdEaBFQo8sky9E+OOvtwZa5WKoLdHyHOLWxCAdpIHUBbhxz5rkMEWLwPI6ykqLIJToMPO8lJbKVzphOjv4JwpiAPdmeSiYMKLjx5V8MpqU2rwj/Uf3sRL8Gg9/Tei3PZ8cftxN1rkQQeeaOtk0CBxUFZSHEsyut1fbgIeL7TAY+4vCmXW0DBr4wh9fnoES/YivOvSiN9rScgWg6N65LfkI78hzaOJ4Nok2S4vYFCxjTAI9NWFUbhP5eJIFzTU+bZuQZxOn2qsoyw8pNZwuF+JClA/RcgBcCvVZcDH2ueVq/zT++bGCN+EWsAEdvJqJ5bsjAgMBAAECggEAaGDUZ6t94mnUJ4UyQEh7v4OJP7wYkFqEAL0qjfzl/lPyBX1XbQ3Ltwul6AR6uMGV4JszARZCFwDWGLGRDWZrTmTDxyfRQ+9l6vfzFFVWGDQmtz+Dn9uGOWnyX5TJMDxJNec+hBmRHOKpaOd37dYxGz0jr19V9UO7piRJp1J1AHUCypUGv5x1IekioSCu5fEyc7dyWwnmITHBjD08st+bCcjrIUFeXSdJKC8SymYeXdaVE3xH3zVEISKnrfT7bhuKZY1iibZIlXbVLNpyX36LkYJOiCqsMum3u70LH0VvTypkqiDbD4S6qfJ4vvUakpmKpOPutikiP7jkSP+AkaO0AQKBgQDkTuhnDK6+Y0a/HgpHJisji0coO+g2gsIszargHk8nNY2AB8t+EUn7C+Qu8cmrem5V8EXcdxS6z7iAXpJmY1Xepnsz+JP7Q91Lgt3OoqK5EybzUXXKkmNCD65n70Xxn2fEFzm6+GJP3c/HymlDKU2KBCYIyuUeaREjT0Fu3v6tgQKBgQDWnXppJwn4LJHhzFOCeO4zomDJDbLTZCabdKZoFP9r+vtEHAnclDDKx4AYbomSqgERe+DX6HR/tPHRVizP63RYPf7al2mJmPzt1nTkoc1/q5hQoD+oE154dADsW1pUp7AQjwCtys4iq5S0qAwIDpuY8M8bOHwZ+QmBvHYAigJCowKBgQC3HH6TX/2rH463bE2MARXqXSPGJj45sigwrQfW1xhe9zm1LQtN4mn2mvP5nt1D1l82OA6gIzYSGtX8x10eF5/ggqAf78goZ6bOkHh76b8fNzgvQO97eGt5qYAVRjhP8azU/lfEGMEpE1s5/6LrRe41utwSg0C+YkBnlIKDfQDAgQKBgDoBTCF5hK9H1JHzuKpt5uubuo78ndWWnvyrNYKyEirsJddNwLiWcO2NqChyT8qNGkbQdX/Fex89F5KduPTlTYfAEc6g18xxxgK+UM+uj60vArbf6PSTb5gculcnha2VuPdwvx050Cb8uu9s7/uJfzKB+2f/B0O51ID1H+ubYWsDAoGBAKrwGKHyqFTHSPg3XuRA1FgDAoOsfzP9ZJvMEXUWyu/VxjNt+0mRlyGeZ5qb9UZG+K/In4FbC/ux2P/PucCUIbgy/XGPtPXVavMwNbx0MquAcU0FihKXP0CUpi8zwiYc42MF7n/SztQnismxigBMSuJEDurcXXazjfcSRTypduNn", "secret": "84399cd0be811c2ca372d6ca473ffd73c09034f991c5e306fe9ada6c5fcfb641", - "peers": [], - "bootstrap": [], "leave_on_shutdown": false, "listen_multiaddress": "/ip4/0.0.0.0/tcp/9096", "state_sync_interval": "1m0s", diff --git a/sharness/lib/test-lib.sh b/sharness/lib/test-lib.sh index 8c4f18e9..89d99075 100755 --- a/sharness/lib/test-lib.sh +++ b/sharness/lib/test-lib.sh @@ -116,7 +116,7 @@ cluster_kill(){ } cluster_start(){ - ipfs-cluster-service --config "test-config" >"$IPFS_OUTPUT" 2>&1 & + ipfs-cluster-service --config "test-config" daemon >"$IPFS_OUTPUT" 2>&1 & export CLUSTER_D_PID=$! while ! curl -s 'localhost:9095/api/v0/version' >/dev/null; do sleep 0.2