diff --git a/cluster.go b/cluster.go index 72e0078c..b76e7f34 100644 --- a/cluster.go +++ b/cluster.go @@ -165,7 +165,9 @@ func (c *Cluster) setupConsensus(consensuscfg *raft.Config) error { if len(c.config.Peers) > 0 { startPeers = peersFromMultiaddrs(c.config.Peers) } else { - startPeers = peersFromMultiaddrs(c.config.Bootstrap) + // start as single cluster before being added + // to the bootstrapper peers' cluster. + startPeers = []peer.ID{} } consensus, err := raft.NewConsensus( @@ -259,6 +261,11 @@ func (c *Cluster) broadcastMetric(m api.Metric) error { // push metrics loops and pushes metrics to the leader's monitor func (c *Cluster) pushInformerMetrics() { timer := time.NewTimer(0) // fire immediately first + // The following control how often to make and log + // a retry + retries := 0 + retryDelay := 500 * time.Millisecond + retryWarnMod := 60 for { select { case <-c.ctx.Done(): @@ -273,17 +280,19 @@ func (c *Cluster) pushInformerMetrics() { err := c.broadcastMetric(metric) if err != nil { - logger.Errorf("error broadcasting metric: %s", err) - // retry in half second - timer.Stop() - timer.Reset(500 * time.Millisecond) + if (retries % retryWarnMod) == 0 { + logger.Errorf("error broadcasting metric: %s", err) + retries++ + } + // retry in retryDelay + timer.Reset(retryDelay) continue } - timer.Stop() // no need to drain C if we are here + retries = 0 + // send metric again in TTL/2 timer.Reset(metric.GetTTL() / 2) } - logger.Debugf("Peer %s. Finished pushInformerMetrics", c.id) } func (c *Cluster) pushPingMetrics() { diff --git a/config_test.go b/config_test.go index 934bd681..2330a851 100644 --- a/config_test.go +++ b/config_test.go @@ -27,6 +27,10 @@ var testingClusterCfg = []byte(`{ var testingRaftCfg = []byte(`{ "data_folder": "raftFolderFromTests", + "wait_for_leader_timeout": "15s", + "commit_retries": 1, + "commit_retry_delay": "1s", + "network_timeout": "2s", "heartbeat_timeout": "1s", "election_timeout": "1s", "commit_timeout": "50ms", diff --git a/consensus/raft/config.go b/consensus/raft/config.go index f6cda3be..7827682f 100644 --- a/consensus/raft/config.go +++ b/consensus/raft/config.go @@ -8,15 +8,21 @@ import ( "github.com/ipfs/ipfs-cluster/config" - hashiraft "github.com/hashicorp/raft" + hraft "github.com/hashicorp/raft" ) // ConfigKey is the default configuration key for holding this component's // configuration section. var configKey = "raft" -// DefaultDataSubFolder is the default subfolder in which Raft's data is stored. -var DefaultDataSubFolder = "ipfs-cluster-data" +// Configuration defaults +var ( + DefaultDataSubFolder = "ipfs-cluster-data" + DefaultWaitForLeaderTimeout = 15 * time.Second + DefaultCommitRetries = 1 + DefaultNetworkTimeout = 10 * time.Second + DefaultCommitRetryDelay = 200 * time.Millisecond +) // Config allows to configure the Raft Consensus component for ipfs-cluster. // The component's configuration section is represented by ConfigJSON. @@ -25,9 +31,20 @@ type Config struct { config.Saver // A Hashicorp Raft's configuration object. - HashiraftCfg *hashiraft.Config + RaftConfig *hraft.Config // A folder to store Raft's data. DataFolder string + // LeaderTimeout specifies how long to wait for a leader before + // failing an operation. + WaitForLeaderTimeout time.Duration + // NetworkTimeout specifies how long before a Raft network + // operation is timed out + NetworkTimeout time.Duration + // CommitRetries specifies how many times we retry a failed commit until + // we give up. 
+ CommitRetries int + // How long to wait between retries + CommitRetryDelay time.Duration } // ConfigJSON represents a human-friendly Config @@ -41,6 +58,18 @@ type jsonConfig struct { // the Raft. DataFolder string `json:"data_folder,omitempty"` + // How long to wait for a leader before failing + WaitForLeaderTimeout string `json:"wait_for_leader_timeout"` + + // How long to wait before timing out network operations + NetworkTimeout string `json:"network_timeout"` + + // How many retries to make upon a failed commit + CommitRetries int `json:"commit_retries"` + + // How long to wait between commit retries + CommitRetryDelay string `json:"commit_retry_delay"` + // HeartbeatTimeout specifies the time in follower state without // a leader before we attempt an election. HeartbeatTimeout string `json:"heartbeat_timeout,omitempty"` @@ -57,14 +86,6 @@ type jsonConfig struct { // to send at once. MaxAppendEntries int `json:"max_append_entries,omitempty"` - // If we are a member of a cluster, and RemovePeer is invoked for the - // local node, then we forget all peers and transition into the - // follower state. - // If ShutdownOnRemove is is set, we additional shutdown Raft. - // Otherwise, we can become a leader of a cluster containing - // only this node. - ShutdownOnRemove bool `json:"shutdown_on_remove,omitempty"` - // TrailingLogs controls how many logs we leave after a snapshot. TrailingLogs uint64 `json:"trailing_logs,omitempty"` @@ -100,11 +121,26 @@ func (cfg *Config) ConfigKey() string { // Validate checks that this configuration has working values, // at least in appereance. func (cfg *Config) Validate() error { - if cfg.HashiraftCfg == nil { + if cfg.RaftConfig == nil { return errors.New("No hashicorp/raft.Config") } + if cfg.WaitForLeaderTimeout <= 0 { + return errors.New("wait_for_leader_timeout <= 0") + } - return hashiraft.ValidateConfig(cfg.HashiraftCfg) + if cfg.NetworkTimeout <= 0 { + return errors.New("network_timeout <= 0") + } + + if cfg.CommitRetries < 0 { + return errors.New("commit_retries is invalid") + } + + if cfg.CommitRetryDelay <= 0 { + return errors.New("commit_retry_delay is invalid") + } + + return hraft.ValidateConfig(cfg.RaftConfig) } // LoadJSON parses a json-encoded configuration (see jsonConfig). @@ -118,81 +154,77 @@ func (cfg *Config) LoadJSON(raw []byte) error { return err } - cfg.setDefaults() + cfg.Default() - // Parse durations from strings - heartbeatTimeout, err := time.ParseDuration(jcfg.HeartbeatTimeout) - if err != nil && jcfg.HeartbeatTimeout != "" { - logger.Error("Error parsing heartbeat_timeout") - return err + parseDuration := func(txt string) time.Duration { + d, _ := time.ParseDuration(txt) + if txt != "" && d == 0 { + logger.Warningf("%s is not a valid duration. 
Default will be used", txt) + } + return d } - electionTimeout, err := time.ParseDuration(jcfg.ElectionTimeout) - if err != nil && jcfg.ElectionTimeout != "" { - logger.Error("Error parsing election_timeout") - return err - } - - commitTimeout, err := time.ParseDuration(jcfg.CommitTimeout) - if err != nil && jcfg.CommitTimeout != "" { - logger.Error("Error parsing commit_timeout") - return err - } - - snapshotInterval, err := time.ParseDuration(jcfg.SnapshotInterval) - if err != nil && jcfg.SnapshotInterval != "" { - logger.Error("Error parsing snapshot_interval") - return err - } - - leaderLeaseTimeout, err := time.ParseDuration(jcfg.LeaderLeaseTimeout) - if err != nil && jcfg.LeaderLeaseTimeout != "" { - logger.Error("Error parsing leader_lease_timeout") - return err - } + // Parse durations. We ignore errors as 0 will take Default values. + waitForLeaderTimeout := parseDuration(jcfg.WaitForLeaderTimeout) + networkTimeout := parseDuration(jcfg.NetworkTimeout) + commitRetryDelay := parseDuration(jcfg.CommitRetryDelay) + heartbeatTimeout := parseDuration(jcfg.HeartbeatTimeout) + electionTimeout := parseDuration(jcfg.ElectionTimeout) + commitTimeout := parseDuration(jcfg.CommitTimeout) + snapshotInterval := parseDuration(jcfg.SnapshotInterval) + leaderLeaseTimeout := parseDuration(jcfg.LeaderLeaseTimeout) + // Set all values in config. For some, take defaults if they are 0. // Set values from jcfg if they are not 0 values - config.SetIfNotDefault(jcfg.DataFolder, &cfg.DataFolder) - config.SetIfNotDefault(heartbeatTimeout, &cfg.HashiraftCfg.HeartbeatTimeout) - config.SetIfNotDefault(electionTimeout, &cfg.HashiraftCfg.ElectionTimeout) - config.SetIfNotDefault(commitTimeout, &cfg.HashiraftCfg.CommitTimeout) - config.SetIfNotDefault(jcfg.MaxAppendEntries, &cfg.HashiraftCfg.MaxAppendEntries) - config.SetIfNotDefault(jcfg.ShutdownOnRemove, &cfg.HashiraftCfg.ShutdownOnRemove) - config.SetIfNotDefault(jcfg.TrailingLogs, &cfg.HashiraftCfg.TrailingLogs) - config.SetIfNotDefault(snapshotInterval, &cfg.HashiraftCfg.SnapshotInterval) - config.SetIfNotDefault(jcfg.SnapshotThreshold, &cfg.HashiraftCfg.SnapshotThreshold) - config.SetIfNotDefault(leaderLeaseTimeout, &cfg.HashiraftCfg.LeaderLeaseTimeout) - return nil + // Own values + config.SetIfNotDefault(jcfg.DataFolder, &cfg.DataFolder) + config.SetIfNotDefault(waitForLeaderTimeout, &cfg.WaitForLeaderTimeout) + config.SetIfNotDefault(networkTimeout, &cfg.NetworkTimeout) + cfg.CommitRetries = jcfg.CommitRetries + config.SetIfNotDefault(commitRetryDelay, &cfg.CommitRetryDelay) + + // Raft values + config.SetIfNotDefault(heartbeatTimeout, &cfg.RaftConfig.HeartbeatTimeout) + config.SetIfNotDefault(electionTimeout, &cfg.RaftConfig.ElectionTimeout) + config.SetIfNotDefault(commitTimeout, &cfg.RaftConfig.CommitTimeout) + config.SetIfNotDefault(jcfg.MaxAppendEntries, &cfg.RaftConfig.MaxAppendEntries) + config.SetIfNotDefault(jcfg.TrailingLogs, &cfg.RaftConfig.TrailingLogs) + config.SetIfNotDefault(snapshotInterval, &cfg.RaftConfig.SnapshotInterval) + config.SetIfNotDefault(jcfg.SnapshotThreshold, &cfg.RaftConfig.SnapshotThreshold) + config.SetIfNotDefault(leaderLeaseTimeout, &cfg.RaftConfig.LeaderLeaseTimeout) + + return cfg.Validate() } // ToJSON returns the pretty JSON representation of a Config. 
func (cfg *Config) ToJSON() ([]byte, error) { jcfg := &jsonConfig{} jcfg.DataFolder = cfg.DataFolder - jcfg.HeartbeatTimeout = cfg.HashiraftCfg.HeartbeatTimeout.String() - jcfg.ElectionTimeout = cfg.HashiraftCfg.ElectionTimeout.String() - jcfg.CommitTimeout = cfg.HashiraftCfg.CommitTimeout.String() - jcfg.MaxAppendEntries = cfg.HashiraftCfg.MaxAppendEntries - jcfg.ShutdownOnRemove = cfg.HashiraftCfg.ShutdownOnRemove - jcfg.TrailingLogs = cfg.HashiraftCfg.TrailingLogs - jcfg.SnapshotInterval = cfg.HashiraftCfg.SnapshotInterval.String() - jcfg.SnapshotThreshold = cfg.HashiraftCfg.SnapshotThreshold - jcfg.LeaderLeaseTimeout = cfg.HashiraftCfg.LeaderLeaseTimeout.String() + jcfg.WaitForLeaderTimeout = cfg.WaitForLeaderTimeout.String() + jcfg.NetworkTimeout = cfg.NetworkTimeout.String() + jcfg.CommitRetries = cfg.CommitRetries + jcfg.CommitRetryDelay = cfg.CommitRetryDelay.String() + jcfg.HeartbeatTimeout = cfg.RaftConfig.HeartbeatTimeout.String() + jcfg.ElectionTimeout = cfg.RaftConfig.ElectionTimeout.String() + jcfg.CommitTimeout = cfg.RaftConfig.CommitTimeout.String() + jcfg.MaxAppendEntries = cfg.RaftConfig.MaxAppendEntries + jcfg.TrailingLogs = cfg.RaftConfig.TrailingLogs + jcfg.SnapshotInterval = cfg.RaftConfig.SnapshotInterval.String() + jcfg.SnapshotThreshold = cfg.RaftConfig.SnapshotThreshold + jcfg.LeaderLeaseTimeout = cfg.RaftConfig.LeaderLeaseTimeout.String() return config.DefaultJSONMarshal(jcfg) } // Default initializes this configuration with working defaults. func (cfg *Config) Default() error { - cfg.setDefaults() - return nil -} - -// most defaults come directly from hashiraft.DefaultConfig() -func (cfg *Config) setDefaults() { - cfg.DataFolder = "" - cfg.HashiraftCfg = hashiraft.DefaultConfig() + cfg.DataFolder = "" // empty so it gets ommitted + cfg.WaitForLeaderTimeout = DefaultWaitForLeaderTimeout + cfg.NetworkTimeout = DefaultNetworkTimeout + cfg.CommitRetries = DefaultCommitRetries + cfg.CommitRetryDelay = DefaultCommitRetryDelay + cfg.RaftConfig = hraft.DefaultConfig() // These options are imposed over any Default Raft Config. // Changing them causes cluster peers to show @@ -202,11 +234,11 @@ func (cfg *Config) setDefaults() { // does not work the way it is expected to. // i.e. ShutdownOnRemove will cause that no snapshot will be taken // when trying to shutdown a peer after removing it from a cluster. 
- cfg.HashiraftCfg.DisableBootstrapAfterElect = false - cfg.HashiraftCfg.EnableSingleNode = true - cfg.HashiraftCfg.ShutdownOnRemove = false + cfg.RaftConfig.ShutdownOnRemove = false + cfg.RaftConfig.LocalID = "will_be_set_automatically" // Set up logging - cfg.HashiraftCfg.LogOutput = ioutil.Discard - cfg.HashiraftCfg.Logger = raftStdLogger // see logging.go + cfg.RaftConfig.LogOutput = ioutil.Discard + cfg.RaftConfig.Logger = raftStdLogger // see logging.go + return nil } diff --git a/consensus/raft/config_test.go b/consensus/raft/config_test.go index cccc56f7..286bcf47 100644 --- a/consensus/raft/config_test.go +++ b/consensus/raft/config_test.go @@ -3,11 +3,15 @@ package raft import ( "encoding/json" "testing" + + hraft "github.com/hashicorp/raft" ) var cfgJSON = []byte(` { "heartbeat_timeout": "1s", + "commit_retries": 1, + "wait_for_leader_timeout": "15s", "election_timeout": "1s", "commit_timeout": "50ms", "max_append_entries": 64, @@ -27,47 +31,23 @@ func TestLoadJSON(t *testing.T) { j := &jsonConfig{} json.Unmarshal(cfgJSON, j) - j.HeartbeatTimeout = "-1" + j.HeartbeatTimeout = "1us" tst, _ := json.Marshal(j) err = cfg.LoadJSON(tst) if err == nil { t.Error("expected error decoding heartbeat_timeout") } - j = &jsonConfig{} - json.Unmarshal(cfgJSON, j) - j.ElectionTimeout = "abc" - tst, _ = json.Marshal(j) - err = cfg.LoadJSON(tst) - if err == nil { - t.Error("expected error in timeout") - } - - j = &jsonConfig{} - json.Unmarshal(cfgJSON, j) - j.CommitTimeout = "abc" - tst, _ = json.Marshal(j) - err = cfg.LoadJSON(tst) - if err == nil { - t.Error("expected error in timeout") - } - - j = &jsonConfig{} json.Unmarshal(cfgJSON, j) j.LeaderLeaseTimeout = "abc" tst, _ = json.Marshal(j) err = cfg.LoadJSON(tst) - if err == nil { - t.Error("expected error in timeout") + if err != nil { + t.Fatal(err) } - - j = &jsonConfig{} - json.Unmarshal(cfgJSON, j) - j.SnapshotInterval = "abc" - tst, _ = json.Marshal(j) - err = cfg.LoadJSON(tst) - if err == nil { - t.Error("expected error in snapshot_interval") + def := hraft.DefaultConfig() + if cfg.RaftConfig.LeaderLeaseTimeout != def.LeaderLeaseTimeout { + t.Error("expected default leader lease") } } @@ -92,13 +72,25 @@ func TestDefault(t *testing.T) { t.Fatal("error validating") } - cfg.HashiraftCfg.HeartbeatTimeout = 0 + cfg.RaftConfig.HeartbeatTimeout = 0 if cfg.Validate() == nil { t.Fatal("expected error validating") } cfg.Default() - cfg.HashiraftCfg = nil + cfg.RaftConfig = nil + if cfg.Validate() == nil { + t.Fatal("expected error validating") + } + + cfg.Default() + cfg.CommitRetries = -1 + if cfg.Validate() == nil { + t.Fatal("expected error validating") + } + + cfg.Default() + cfg.WaitForLeaderTimeout = 0 if cfg.Validate() == nil { t.Fatal("expected error validating") } diff --git a/consensus/raft/consensus.go b/consensus/raft/consensus.go index 3d2f34b4..e14d31eb 100644 --- a/consensus/raft/consensus.go +++ b/consensus/raft/consensus.go @@ -22,27 +22,20 @@ import ( var logger = logging.Logger("consensus") -// LeaderTimeout specifies how long to wait before failing an operation -// because there is no leader -var LeaderTimeout = 15 * time.Second - -// CommitRetries specifies how many times we retry a failed commit until -// we give up -var CommitRetries = 2 - // Consensus handles the work of keeping a shared-state between // the peers of an IPFS Cluster, as well as modifying that state and // applying any updates in a thread-safe manner. 
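Not part of the patch: a sketch of how a caller outside this package might exercise the new configuration knobs introduced above, relying only on the Default and Validate methods shown in config.go. The import alias and the chosen values are illustrative.

```go
package main

import (
	"log"
	"time"

	raft "github.com/ipfs/ipfs-cluster/consensus/raft"
)

func main() {
	cfg := &raft.Config{}
	if err := cfg.Default(); err != nil { // installs the Default* values above
		log.Fatal(err)
	}

	// Override a couple of the new knobs.
	cfg.CommitRetries = 2
	cfg.WaitForLeaderTimeout = 30 * time.Second

	if err := cfg.Validate(); err != nil { // would catch e.g. CommitRetries < 0
		log.Fatal(err)
	}
	log.Printf("data folder: %q (empty means <BaseDir>/ipfs-cluster-data)", cfg.DataFolder)
}
```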
type Consensus struct { ctx context.Context cancel func() + config *Config host host.Host consensus consensus.OpLogConsensus actor consensus.Actor baseOp *LogOp - raft *Raft + raft *raftWrapper rpcClient *rpc.Client rpcReady chan struct{} @@ -67,8 +60,9 @@ func NewConsensus(clusterPeers []peer.ID, host host.Host, cfg *Config, state sta logger.Infof("starting Consensus and waiting for a leader...") consensus := libp2praft.NewOpLog(state, op) - raft, err := NewRaft(clusterPeers, host, cfg, consensus.FSM()) + raft, err := newRaftWrapper(clusterPeers, host, cfg, consensus.FSM()) if err != nil { + logger.Error("error creating raft: ", err) return nil, err } actor := libp2praft.NewActor(raft.raft) @@ -80,6 +74,7 @@ func NewConsensus(clusterPeers []peer.ID, host host.Host, cfg *Config, state sta cc := &Consensus{ ctx: ctx, cancel: cancel, + config: cfg, host: host, consensus: consensus, actor: actor, @@ -95,9 +90,11 @@ func NewConsensus(clusterPeers []peer.ID, host host.Host, cfg *Config, state sta // WaitForSync waits for a leader and for the state to be up to date, then returns. func (cc *Consensus) WaitForSync() error { - leaderCtx, cancel := context.WithTimeout(cc.ctx, LeaderTimeout) + leaderCtx, cancel := context.WithTimeout( + cc.ctx, + cc.config.WaitForLeaderTimeout) defer cancel() - err := cc.raft.WaitForLeader(leaderCtx) + _, err := cc.raft.WaitForLeader(leaderCtx) if err != nil { return errors.New("error waiting for leader: " + err.Error()) } @@ -160,26 +157,15 @@ func (cc *Consensus) Shutdown() error { logger.Info("stopping Consensus component") - cc.cancel() - close(cc.rpcReady) - - // Raft shutdown - errMsgs := "" - err := cc.raft.Snapshot() + // Raft Shutdown + err := cc.raft.Shutdown() if err != nil { - errMsgs += err.Error() - } - err = cc.raft.Shutdown() - if err != nil { - errMsgs += err.Error() - } - - if errMsgs != "" { - errMsgs += "Consensus shutdown unsuccessful" - logger.Error(errMsgs) - return errors.New(errMsgs) + logger.Error(err) + return err } cc.shutdown = true + cc.cancel() + close(cc.rpcReady) return nil } @@ -214,176 +200,155 @@ func (cc *Consensus) op(argi interface{}, t LogOpType) *LogOp { } // returns true if the operation was redirected to the leader +// note that if the leader just dissappeared, the rpc call will +// fail because we haven't heard that it's gone. func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, error) { - leader, err := cc.Leader() - if err != nil { - rctx, cancel := context.WithTimeout(cc.ctx, LeaderTimeout) - defer cancel() - err := cc.raft.WaitForLeader(rctx) - if err != nil { - return false, err - } - } - if leader == cc.host.ID() { - return false, nil - } - - err = cc.rpcClient.Call( - leader, - "Cluster", - method, - arg, - &struct{}{}) - return true, err -} - -func (cc *Consensus) logOpCid(rpcOp string, opType LogOpType, pin api.Pin) error { var finalErr error - for i := 0; i < CommitRetries; i++ { - logger.Debugf("Try %d", i) - redirected, err := cc.redirectToLeader( - rpcOp, pin.ToSerial()) + + // Retry redirects + for i := 0; i <= cc.config.CommitRetries; i++ { + logger.Debugf("redirect try %d", i) + leader, err := cc.Leader() + + // No leader, wait for one if err != nil { - finalErr = err + logger.Warningf("there seems to be no leader. 
Waiting for one") + rctx, cancel := context.WithTimeout( + cc.ctx, + cc.config.WaitForLeaderTimeout) + defer cancel() + pidstr, err := cc.raft.WaitForLeader(rctx) + + // means we timed out waiting for a leader + // we don't retry in this case + if err != nil { + return false, errors.New("timed out waiting for leader") + } + leader, err = peer.IDB58Decode(pidstr) + if err != nil { + return false, err + } + } + + // We are the leader. Do not redirect + if leader == cc.host.ID() { + return false, nil + } + + logger.Debugf("redirecting to leader: %s", leader) + finalErr = cc.rpcClient.Call( + leader, + "Cluster", + method, + arg, + &struct{}{}) + if finalErr != nil { + logger.Error(finalErr) + logger.Info("retrying to redirect request to leader") + time.Sleep(2 * cc.config.RaftConfig.HeartbeatTimeout) continue } - - if redirected { - return nil - } - - // It seems WE are the leader. - - op := cc.op(pin, opType) - _, err = cc.consensus.CommitOp(op) - if err != nil { - // This means the op did not make it to the log - finalErr = err - time.Sleep(200 * time.Millisecond) - continue - } - finalErr = nil break } - if finalErr != nil { - return finalErr - } - switch opType { - case LogOpPin: - logger.Infof("pin committed to global state: %s", pin.Cid) - case LogOpUnpin: - logger.Infof("unpin committed to global state: %s", pin.Cid) + // We tried to redirect, but something happened + return true, finalErr +} + +// commit submits a cc.consensus commit. It retries upon failures. +func (cc *Consensus) commit(op *LogOp, rpcOp string, redirectArg interface{}) error { + var finalErr error + for i := 0; i <= cc.config.CommitRetries; i++ { + logger.Debug("attempt #%d: committing %+v", i, op) + + // this means we are retrying + if finalErr != nil { + logger.Error("retrying upon failed commit (retry %d): ", + i, finalErr) + } + + // try to send it to the leader + // redirectToLeader has it's own retry loop. If this fails + // we're done here. + ok, err := cc.redirectToLeader(rpcOp, redirectArg) + if err != nil || ok { + return err + } + + // Being here means we are the LEADER. We can commit. + + // now commit the changes to our state + _, finalErr := cc.consensus.CommitOp(op) + if finalErr != nil { + goto RETRY + } + + // addPeer and rmPeer need to apply the change to Raft directly. + switch op.Type { + case LogOpAddPeer: + pidstr := parsePIDFromMultiaddr(op.Peer.ToMultiaddr()) + finalErr = cc.raft.AddPeer(pidstr) + if finalErr != nil { + goto RETRY + } + logger.Infof("peer committed to global state: %s", pidstr) + case LogOpRmPeer: + pidstr := parsePIDFromMultiaddr(op.Peer.ToMultiaddr()) + finalErr = cc.raft.RemovePeer(pidstr) + if finalErr != nil { + goto RETRY + } + logger.Infof("peer removed from global state: %s", pidstr) + } + break + + RETRY: + time.Sleep(cc.config.CommitRetryDelay) } - return nil + return finalErr } // LogPin submits a Cid to the shared state of the cluster. It will forward // the operation to the leader if this is not it. -func (cc *Consensus) LogPin(c api.Pin) error { - return cc.logOpCid("ConsensusLogPin", LogOpPin, c) +func (cc *Consensus) LogPin(pin api.Pin) error { + op := cc.op(pin, LogOpPin) + err := cc.commit(op, "ConsensusLogPin", pin.ToSerial()) + if err != nil { + return err + } + logger.Infof("pin committed to global state: %s", pin.Cid) + return nil } // LogUnpin removes a Cid from the shared state of the cluster. 
-func (cc *Consensus) LogUnpin(c api.Pin) error { - return cc.logOpCid("ConsensusLogUnpin", LogOpUnpin, c) +func (cc *Consensus) LogUnpin(pin api.Pin) error { + op := cc.op(pin, LogOpUnpin) + err := cc.commit(op, "ConsensusLogUnpin", pin.ToSerial()) + if err != nil { + return err + } + logger.Infof("unpin committed to global state: %s", pin.Cid) + return nil } // LogAddPeer submits a new peer to the shared state of the cluster. It will // forward the operation to the leader if this is not it. func (cc *Consensus) LogAddPeer(addr ma.Multiaddr) error { - var finalErr error - for i := 0; i < CommitRetries; i++ { - logger.Debugf("Try %d", i) - redirected, err := cc.redirectToLeader( - "ConsensusLogAddPeer", api.MultiaddrToSerial(addr)) - if err != nil { - finalErr = err - continue - } - - if redirected { - return nil - } - - // It seems WE are the leader. - pidStr, err := addr.ValueForProtocol(ma.P_IPFS) - if err != nil { - return err - } - pid, err := peer.IDB58Decode(pidStr) - if err != nil { - return err - } - - // Create pin operation for the log - op := cc.op(addr, LogOpAddPeer) - _, err = cc.consensus.CommitOp(op) - if err != nil { - // This means the op did not make it to the log - finalErr = err - time.Sleep(200 * time.Millisecond) - continue - } - err = cc.raft.AddPeer(peer.IDB58Encode(pid)) - if err != nil { - finalErr = err - continue - } - finalErr = nil - break - } - if finalErr != nil { - return finalErr - } - logger.Infof("peer committed to global state: %s", addr) - return nil + addrS := api.MultiaddrToSerial(addr) + op := cc.op(addr, LogOpAddPeer) + return cc.commit(op, "ConsensusLogAddPeer", addrS) } // LogRmPeer removes a peer from the shared state of the cluster. It will // forward the operation to the leader if this is not it. func (cc *Consensus) LogRmPeer(pid peer.ID) error { - var finalErr error - for i := 0; i < CommitRetries; i++ { - logger.Debugf("Try %d", i) - redirected, err := cc.redirectToLeader("ConsensusLogRmPeer", pid) - if err != nil { - finalErr = err - continue - } - - if redirected { - return nil - } - - // It seems WE are the leader. - - // Create pin operation for the log - addr, err := ma.NewMultiaddr("/ipfs/" + peer.IDB58Encode(pid)) - if err != nil { - return err - } - op := cc.op(addr, LogOpRmPeer) - _, err = cc.consensus.CommitOp(op) - if err != nil { - // This means the op did not make it to the log - finalErr = err - continue - } - err = cc.raft.RemovePeer(peer.IDB58Encode(pid)) - if err != nil { - finalErr = err - time.Sleep(200 * time.Millisecond) - continue - } - finalErr = nil - break + // Create rmPeer operation for the log + addr, err := ma.NewMultiaddr("/ipfs/" + peer.IDB58Encode(pid)) + if err != nil { + return err } - if finalErr != nil { - return finalErr - } - logger.Infof("peer removed from global state: %s", pid) - return nil + op := cc.op(addr, LogOpRmPeer) + return cc.commit(op, "ConsensusLogRmPeer", pid) } // State retrieves the current consensus State. It may error @@ -405,6 +370,7 @@ func (cc *Consensus) State() (state.State, error) { // Leader returns the peerID of the Leader of the // cluster. It returns an error when there is no leader. func (cc *Consensus) Leader() (peer.ID, error) { + // Note the hard-dependency on raft here... raftactor := cc.actor.(*libp2praft.Actor) return raftactor.Leader() } @@ -413,5 +379,16 @@ func (cc *Consensus) Leader() (peer.ID, error) { // state with the state provided. Only the consensus leader // can perform this operation. 
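As a side note (not in the patch), the LogAddPeer/LogRmPeer path above leans on the /ipfs/<peer-id> multiaddr convention: the peer ID is wrapped in a multiaddr before the op is logged, and parsePIDFromMultiaddr (defined further down in consensus.go) unwraps it when the op is applied. A small self-contained round trip using one of the repository's test peer IDs:

```go
package main

import (
	"fmt"

	"github.com/ipfs/ipfs-cluster/test"
	peer "github.com/libp2p/go-libp2p-peer"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	pidstr := peer.IDB58Encode(test.TestPeerID1)

	// LogRmPeer wraps the ID in an /ipfs/ multiaddr before logging the op...
	addr, err := ma.NewMultiaddr("/ipfs/" + pidstr)
	if err != nil {
		panic(err)
	}

	// ...and parsePIDFromMultiaddr recovers it when the op is applied.
	recovered, err := addr.ValueForProtocol(ma.P_IPFS)
	if err != nil {
		panic(err)
	}
	fmt.Println(recovered == pidstr) // true
}
```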
func (cc *Consensus) Rollback(state state.State) error { + // This is unused. It *might* be used for upgrades. + // There is rather untested magic in libp2p-raft's FSM() + // to make this possible. return cc.consensus.Rollback(state) } + +func parsePIDFromMultiaddr(addr ma.Multiaddr) string { + pidstr, err := addr.ValueForProtocol(ma.P_IPFS) + if err != nil { + panic("peer badly encoded") + } + return pidstr +} diff --git a/consensus/raft/consensus_test.go b/consensus/raft/consensus_test.go index fcbd7cf8..800a5fd8 100644 --- a/consensus/raft/consensus_test.go +++ b/consensus/raft/consensus_test.go @@ -60,7 +60,7 @@ func testingConsensus(t *testing.T, port int) *Consensus { if err != nil { t.Fatal("cannot create Consensus:", err) } - cc.SetClient(test.NewMockRPCClient(t)) + cc.SetClient(test.NewMockRPCClientWithHost(t, h)) <-cc.Ready() return cc } @@ -68,18 +68,23 @@ func testingConsensus(t *testing.T, port int) *Consensus { func TestShutdownConsensus(t *testing.T) { // Bring it up twice to make sure shutdown cleans up properly // but also to make sure raft comes up ok when re-initialized - defer cleanRaft(p2pPort) cc := testingConsensus(t, p2pPort) err := cc.Shutdown() if err != nil { t.Fatal("Consensus cannot shutdown:", err) } - cc.Shutdown() + err = cc.Shutdown() // should be fine to shutdown twice + if err != nil { + t.Fatal("Consensus should be able to shutdown several times") + } + cleanRaft(p2pPort) + cc = testingConsensus(t, p2pPort) err = cc.Shutdown() if err != nil { t.Fatal("Consensus cannot shutdown:", err) } + cleanRaft(p2pPort) } func TestConsensusPin(t *testing.T) { @@ -129,6 +134,7 @@ func TestConsensusLogAddPeer(t *testing.T) { addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p2pPortAlt)) haddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", cc2.host.ID().Pretty())) + cc.host.Peerstore().AddAddr(cc2.host.ID(), addr, peerstore.TempAddrTTL) err := cc.LogAddPeer(addr.Encapsulate(haddr)) if err != nil { @@ -138,13 +144,44 @@ func TestConsensusLogAddPeer(t *testing.T) { func TestConsensusLogRmPeer(t *testing.T) { cc := testingConsensus(t, p2pPort) + cc2 := testingConsensus(t, p2pPortAlt) defer cleanRaft(p2pPort) + defer cleanRaft(p2pPortAlt) defer cc.Shutdown() + defer cc2.Shutdown() - err := cc.LogRmPeer(test.TestPeerID1) + addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p2pPortAlt)) + haddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", cc2.host.ID().Pretty())) + cc.host.Peerstore().AddAddr(cc2.host.ID(), addr, peerstore.TempAddrTTL) + + err := cc.LogAddPeer(addr.Encapsulate(haddr)) + if err != nil { + t.Error("could not add peer:", err) + } + + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + cc.raft.WaitForLeader(ctx) + + c, _ := cid.Decode(test.TestCid1) + err = cc.LogPin(api.Pin{Cid: c, ReplicationFactor: -1}) + if err != nil { + t.Error("could not pin after adding peer:", err) + } + + time.Sleep(2 * time.Second) + + // Remove unexisting peer + err = cc.LogRmPeer(test.TestPeerID1) if err != nil { t.Error("the operation did not make it to the log:", err) } + + // Remove real peer. 
At least the leader can succeed + err = cc2.LogRmPeer(cc.host.ID()) + err2 := cc.LogRmPeer(cc2.host.ID()) + if err != nil && err2 != nil { + t.Error("could not remove peer:", err, err2) + } } func TestConsensusLeader(t *testing.T) { diff --git a/consensus/raft/log_op.go b/consensus/raft/log_op.go index a5e7de20..f2b0a967 100644 --- a/consensus/raft/log_op.go +++ b/consensus/raft/log_op.go @@ -10,7 +10,6 @@ import ( rpc "github.com/hsanjuan/go-libp2p-gorpc" consensus "github.com/libp2p/go-libp2p-consensus" peer "github.com/libp2p/go-libp2p-peer" - ma "github.com/multiformats/go-multiaddr" ) // Type of consensus operation @@ -46,8 +45,7 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) { switch op.Type { case LogOpPin: - arg := op.Cid.ToPin() - err = state.Add(arg) + err = state.Add(op.Cid.ToPin()) if err != nil { goto ROLLBACK } @@ -55,12 +53,11 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) { op.rpcClient.Go("", "Cluster", "Track", - arg.ToSerial(), + op.Cid, &struct{}{}, nil) case LogOpUnpin: - arg := op.Cid.ToPin() - err = state.Rm(arg.Cid) + err = state.Rm(op.Cid.ToPin().Cid) if err != nil { goto ROLLBACK } @@ -68,23 +65,18 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) { op.rpcClient.Go("", "Cluster", "Untrack", - arg.ToSerial(), + op.Cid, &struct{}{}, nil) case LogOpAddPeer: - addr := op.Peer.ToMultiaddr() op.rpcClient.Call("", "Cluster", "PeerManagerAddPeer", - api.MultiaddrToSerial(addr), + op.Peer, &struct{}{}) // TODO rebalance ops case LogOpRmPeer: - addr := op.Peer.ToMultiaddr() - pidstr, err := addr.ValueForProtocol(ma.P_IPFS) - if err != nil { - panic("peer badly encoded") - } + pidstr := parsePIDFromMultiaddr(op.Peer.ToMultiaddr()) pid, err := peer.IDB58Decode(pidstr) if err != nil { panic("could not decode a PID we ourselves encoded") diff --git a/consensus/raft/logging.go b/consensus/raft/logging.go index 1c9a9cae..2db83c90 100644 --- a/consensus/raft/logging.go +++ b/consensus/raft/logging.go @@ -7,9 +7,23 @@ import ( logging "github.com/ipfs/go-log" ) +const ( + debug = iota + info + warn + err +) + // This provides a custom logger for Raft which intercepts Raft log messages // and rewrites us to our own logger (for "raft" facility). -type logForwarder struct{} +type logForwarder struct { + last map[int]*lastMsg +} + +type lastMsg struct { + msg string + tipped bool +} var raftStdLogger = log.New(&logForwarder{}, "", 0) var raftLogger = logging.Logger("raft") @@ -17,19 +31,61 @@ var raftLogger = logging.Logger("raft") // Write forwards to our go-log logger. // According to https://golang.org/pkg/log/#Logger.Output // it is called per line. 
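Not part of the patch: a hypothetical test sketch of the fire-and-forget notification style that the ApplyTo changes above keep using — rpcClient.Go with a nil done channel returns immediately, while Call blocks, which is why pin/unpin tracking is asynchronous and peer additions are synchronous. It assumes the mock service registers a Track method (as the tests elsewhere rely on) and would live next to the other consensus/raft tests.

```go
package raft

import (
	"testing"

	cid "github.com/ipfs/go-cid"
	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/test"
)

// TestTrackNotification is hypothetical; it only illustrates the async call.
func TestTrackNotification(t *testing.T) {
	client := test.NewMockRPCClient(t)

	c, _ := cid.Decode(test.TestCid1)
	pin := api.Pin{Cid: c, ReplicationFactor: -1}

	// Fire-and-forget: a nil done channel means we never wait for the reply,
	// mirroring the Track/Untrack notifications in LogOp.ApplyTo.
	client.Go("", "Cluster", "Track", pin.ToSerial(), &struct{}{}, nil)
}
```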
-func (fw *logForwarder) Write(p []byte) (n int, err error) { +func (fw *logForwarder) Write(p []byte) (n int, e error) { t := strings.TrimSuffix(string(p), "\n") + switch { case strings.Contains(t, "[DEBUG]"): - raftLogger.Debug(strings.TrimPrefix(t, "[DEBUG] raft: ")) + if !fw.repeated(debug, t) { + fw.log(debug, strings.TrimPrefix(t, "[DEBUG] raft: ")) + } case strings.Contains(t, "[WARN]"): - raftLogger.Warning(strings.TrimPrefix(t, "[WARN] raft: ")) + if !fw.repeated(warn, t) { + fw.log(warn, strings.TrimPrefix(t, "[WARN] raft: ")) + } case strings.Contains(t, "[ERR]"): - raftLogger.Error(strings.TrimPrefix(t, "[ERR] raft: ")) + if !fw.repeated(err, t) { + fw.log(err, strings.TrimPrefix(t, "[ERR] raft: ")) + } case strings.Contains(t, "[INFO]"): - raftLogger.Info(strings.TrimPrefix(t, "[INFO] raft: ")) + if !fw.repeated(info, t) { + fw.log(info, strings.TrimPrefix(t, "[INFO] raft: ")) + } default: - raftLogger.Debug(t) + fw.log(debug, t) } return len(p), nil } + +func (fw *logForwarder) repeated(t int, msg string) bool { + if fw.last == nil { + fw.last = make(map[int]*lastMsg) + } + + last, ok := fw.last[t] + if !ok || last.msg != msg { + fw.last[t] = &lastMsg{msg, false} + return false + } else { + if !last.tipped { + fw.log(t, "NOTICE: The last RAFT log message repeats and will only be logged once") + last.tipped = true + } + return true + } +} + +func (fw *logForwarder) log(t int, msg string) { + switch t { + case debug: + raftLogger.Debug(msg) + case info: + raftLogger.Info(msg) + case warn: + raftLogger.Warning(msg) + case err: + raftLogger.Error(msg) + default: + raftLogger.Debug(msg) + } +} diff --git a/consensus/raft/raft.go b/consensus/raft/raft.go index f9ef4bbc..9f250eb9 100644 --- a/consensus/raft/raft.go +++ b/consensus/raft/raft.go @@ -5,173 +5,237 @@ import ( "errors" "os" "path/filepath" - "strings" "time" - hashiraft "github.com/hashicorp/raft" + hraft "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" host "github.com/libp2p/go-libp2p-host" peer "github.com/libp2p/go-libp2p-peer" - libp2praft "github.com/libp2p/go-libp2p-raft" + p2praft "github.com/libp2p/go-libp2p-raft" ) // RaftMaxSnapshots indicates how many snapshots to keep in the consensus data // folder. +// TODO: Maybe include this in Config. Not sure how useful it is to touch +// this anyways. var RaftMaxSnapshots = 5 -// is this running 64 bits arch? https://groups.google.com/forum/#!topic/golang-nuts/vAckmhUMAdQ +// Are we compiled on a 64-bit architecture? +// https://groups.google.com/forum/#!topic/golang-nuts/vAckmhUMAdQ +// This is used below because raft Observers panic on 32-bit. const sixtyfour = uint64(^uint(0)) == ^uint64(0) -// Raft performs all Raft-specific operations which are needed by Cluster but -// are not fulfilled by the consensus interface. It should contain most of the -// Raft-related stuff so it can be easily replaced in the future, if need be. -type Raft struct { - raft *hashiraft.Raft - transport *libp2praft.Libp2pTransport - snapshotStore hashiraft.SnapshotStore - logStore hashiraft.LogStore - stableStore hashiraft.StableStore - peerstore *libp2praft.Peerstore +// raftWrapper performs all Raft-specific operations which are needed by +// Cluster but are not fulfilled by the consensus interface. It should contain +// most of the Raft-related stuff so it can be easily replaced in the future, +// if need be. 
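The forwarder above is plugged into hashicorp/raft through the standard library: raftStdLogger is just log.New(&logForwarder{}, "", 0), so every line raft prints lands in Write and can be reclassified (and, with the new last-message bookkeeping, de-duplicated). A minimal standalone version of that io.Writer adapter, with hypothetical names:

```go
package main

import (
	"fmt"
	"log"
	"strings"
)

// forwarder satisfies io.Writer so a *log.Logger can be handed to a library,
// while each written line is forwarded to whatever logger we actually use.
type forwarder struct{}

func (f *forwarder) Write(p []byte) (int, error) {
	line := strings.TrimSuffix(string(p), "\n")
	fmt.Println("[forwarded]", line) // stand-in for raftLogger.Debug & friends
	return len(p), nil
}

func main() {
	l := log.New(&forwarder{}, "", 0) // like raftStdLogger in logging.go
	l.Printf("[INFO] raft: example message")
}
```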
+type raftWrapper struct { + raft *hraft.Raft + srvConfig hraft.Configuration + transport *hraft.NetworkTransport + snapshotStore hraft.SnapshotStore + logStore hraft.LogStore + stableStore hraft.StableStore boltdb *raftboltdb.BoltStore - dataFolder string } -// NewRaft launches a go-libp2p-raft consensus peer. -func NewRaft(peers []peer.ID, host host.Host, cfg *Config, fsm hashiraft.FSM) (*Raft, error) { +// newRaft launches a go-libp2p-raft consensus peer. +func newRaftWrapper(peers []peer.ID, host host.Host, cfg *Config, fsm hraft.FSM) (*raftWrapper, error) { + // Set correct LocalID + cfg.RaftConfig.LocalID = hraft.ServerID(peer.IDB58Encode(host.ID())) + + // Prepare data folder + dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder) + if err != nil { + return nil, err + } + srvCfg := makeServerConf(peers) + logger.Debug("creating libp2p Raft transport") - transport, err := libp2praft.NewLibp2pTransportWithHost(host) + transport, err := p2praft.NewLibp2pTransport(host, cfg.NetworkTimeout) if err != nil { - logger.Error("creating libp2p-raft transport: ", err) return nil, err } - pstore := &libp2praft.Peerstore{} - peersStr := make([]string, len(peers), len(peers)) - for i, p := range peers { - peersStr[i] = peer.IDB58Encode(p) - } - pstore.SetPeers(peersStr) - - logger.Debug("creating file snapshot store") - dataFolder := cfg.DataFolder - if dataFolder == "" { - dataFolder = filepath.Join(cfg.BaseDir, DefaultDataSubFolder) - } - - err = os.MkdirAll(dataFolder, 0700) + logger.Debug("creating raft snapshot store") + snapshots, err := hraft.NewFileSnapshotStoreWithLogger( + dataFolder, RaftMaxSnapshots, raftStdLogger) if err != nil { - logger.Errorf("creating cosensus data folder (%s): %s", - dataFolder, err) - return nil, err - } - snapshots, err := hashiraft.NewFileSnapshotStoreWithLogger(dataFolder, RaftMaxSnapshots, raftStdLogger) - if err != nil { - logger.Error("creating file snapshot store: ", err) return nil, err } logger.Debug("creating BoltDB log store") - logStore, err := raftboltdb.NewBoltStore(filepath.Join(dataFolder, "raft.db")) + logStore, err := raftboltdb.NewBoltStore( + filepath.Join(dataFolder, "raft.db")) if err != nil { - logger.Error("creating bolt store: ", err) return nil, err } + logger.Debug("checking for existing raft states") + hasState, err := hraft.HasExistingState(logStore, logStore, snapshots) + if err != nil { + return nil, err + } + if !hasState { + logger.Info("bootstrapping raft cluster") + err := hraft.BootstrapCluster(cfg.RaftConfig, + logStore, logStore, snapshots, transport, srvCfg) + if err != nil { + logger.Error("bootstrapping cluster: ", err) + return nil, err + } + } else { + logger.Info("raft cluster is already bootstrapped") + } + logger.Debug("creating Raft") - r, err := hashiraft.NewRaft(cfg.HashiraftCfg, fsm, logStore, logStore, snapshots, pstore, transport) + r, err := hraft.NewRaft(cfg.RaftConfig, + fsm, logStore, logStore, snapshots, transport) if err != nil { logger.Error("initializing raft: ", err) return nil, err } - raft := &Raft{ + raftW := &raftWrapper{ raft: r, + srvConfig: srvCfg, transport: transport, snapshotStore: snapshots, logStore: logStore, stableStore: logStore, - peerstore: pstore, boltdb: logStore, - dataFolder: dataFolder, } - return raft, nil + // Handle existing, different configuration + if hasState { + logger.Error("has state") + cf := r.GetConfiguration() + if err := cf.Error(); err != nil { + return nil, err + } + currentCfg := cf.Configuration() + added, removed := diffConfigurations(srvCfg, 
currentCfg) + if len(added)+len(removed) > 0 { + raftW.Shutdown() + logger.Error("Raft peers do not match cluster peers") + logger.Error("Aborting. Please clean this peer") + return nil, errors.New("Bad cluster peers") + } + } + + return raftW, nil +} + +// returns the folder path after creating it. +// if folder is empty, it uses baseDir+Default. +func makeDataFolder(baseDir, folder string) (string, error) { + if folder == "" { + folder = filepath.Join(baseDir, DefaultDataSubFolder) + } + + err := os.MkdirAll(folder, 0700) + if err != nil { + return "", err + } + return folder, nil +} + +// create Raft servers configuration +func makeServerConf(peers []peer.ID) hraft.Configuration { + sm := make(map[string]struct{}) + + servers := make([]hraft.Server, 0) + for _, pid := range peers { + p := peer.IDB58Encode(pid) + _, ok := sm[p] + if !ok { // avoid dups + sm[p] = struct{}{} + servers = append(servers, hraft.Server{ + Suffrage: hraft.Voter, + ID: hraft.ServerID(p), + Address: hraft.ServerAddress(p), + }) + } + } + return hraft.Configuration{ + Servers: servers, + } +} + +// diffConfigurations returns the serverIDs added and removed from +// c2 in relation to c1. +func diffConfigurations( + c1, c2 hraft.Configuration) (added, removed []hraft.ServerID) { + m1 := make(map[hraft.ServerID]struct{}) + m2 := make(map[hraft.ServerID]struct{}) + added = make([]hraft.ServerID, 0) + removed = make([]hraft.ServerID, 0) + for _, s := range c1.Servers { + m1[s.ID] = struct{}{} + } + for _, s := range c2.Servers { + m2[s.ID] = struct{}{} + } + for k, _ := range m1 { + _, ok := m2[k] + if !ok { + removed = append(removed, k) + } + } + for k, _ := range m2 { + _, ok := m1[k] + if !ok { + added = append(added, k) + } + } + return } // WaitForLeader holds until Raft says we have a leader. -// Returns an error if we don't. -func (r *Raft) WaitForLeader(ctx context.Context) error { - // Using Raft observers panics on non-64 architectures. - // This is a work around - if sixtyfour { - return r.waitForLeader(ctx) +// Returns uf ctx is cancelled. 
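For clarity (not part of the patch), a hypothetical test in this package showing how the helpers above interact: makeServerConf deduplicates peer IDs into a raft server configuration, and diffConfigurations reports which server IDs would have to be added to or removed from the first configuration to obtain the second — which is how newRaftWrapper detects a peer-set mismatch on restart. "A", "B", "C" stand in for base58-encoded peer IDs.

```go
package raft

import (
	"testing"

	hraft "github.com/hashicorp/raft"
)

func TestDiffConfigurationsSketch(t *testing.T) {
	mk := func(ids ...string) hraft.Configuration {
		var c hraft.Configuration
		for _, id := range ids {
			c.Servers = append(c.Servers, hraft.Server{
				Suffrage: hraft.Voter,
				ID:       hraft.ServerID(id),
				Address:  hraft.ServerAddress(id),
			})
		}
		return c
	}

	added, removed := diffConfigurations(mk("A", "B"), mk("B", "C"))
	if len(added) != 1 || len(removed) != 1 {
		t.Fatalf("expected C added and A removed, got %v / %v", added, removed)
	}
}
```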
+func (rw *raftWrapper) WaitForLeader(ctx context.Context) (string, error) { + obsCh := make(chan hraft.Observation, 1) + if sixtyfour { // 32-bit systems don't support observers + observer := hraft.NewObserver(obsCh, false, nil) + rw.raft.RegisterObserver(observer) + defer rw.raft.DeregisterObserver(observer) } - return r.waitForLeaderLegacy(ctx) -} - -func (r *Raft) waitForLeader(ctx context.Context) error { - obsCh := make(chan hashiraft.Observation, 1) - filter := func(o *hashiraft.Observation) bool { - switch o.Data.(type) { - case hashiraft.LeaderObservation: - return true - default: - return false - } - } - observer := hashiraft.NewObserver(obsCh, false, filter) - r.raft.RegisterObserver(observer) - defer r.raft.DeregisterObserver(observer) - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(time.Second / 2) for { select { case obs := <-obsCh: - switch obs.Data.(type) { - case hashiraft.LeaderObservation: - leaderObs := obs.Data.(hashiraft.LeaderObservation) - logger.Infof("Raft Leader elected: %s", leaderObs.Leader) - return nil - } + _ = obs + // See https://github.com/hashicorp/raft/issues/254 + // switch obs.Data.(type) { + // case hraft.LeaderObservation: + // lObs := obs.Data.(hraft.LeaderObservation) + // logger.Infof("Raft Leader elected: %s", + // lObs.Leader) + // return string(lObs.Leader), nil + // } case <-ticker.C: - if l := r.raft.Leader(); l != "" { //we missed or there was no election + if l := rw.raft.Leader(); l != "" { logger.Debug("waitForleaderTimer") logger.Infof("Raft Leader elected: %s", l) ticker.Stop() - return nil + return string(l), nil } case <-ctx.Done(): - return ctx.Err() - } - } -} - -// 32-bit systems should use this. -func (r *Raft) waitForLeaderLegacy(ctx context.Context) error { - for { - leader := r.raft.Leader() - if leader != "" { - logger.Infof("Raft Leader elected: %s", leader) - return nil - } - select { - case <-ctx.Done(): - return ctx.Err() - default: - time.Sleep(500 * time.Millisecond) + return "", ctx.Err() } } } // WaitForUpdates holds until Raft has synced to the last index in the log -func (r *Raft) WaitForUpdates(ctx context.Context) error { - logger.Debug("Raft state is catching up") +func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error { + logger.Info("Raft state is catching up") for { select { case <-ctx.Done(): return ctx.Err() default: - lai := r.raft.AppliedIndex() - li := r.raft.LastIndex() + lai := rw.raft.AppliedIndex() + li := rw.raft.LastIndex() logger.Debugf("current Raft index: %d/%d", lai, li) if lai == li { @@ -183,25 +247,25 @@ func (r *Raft) WaitForUpdates(ctx context.Context) error { } // Snapshot tells Raft to take a snapshot. -func (r *Raft) Snapshot() error { - future := r.raft.Snapshot() +func (rw *raftWrapper) Snapshot() error { + future := rw.raft.Snapshot() err := future.Error() - if err != nil && !strings.Contains(err.Error(), "Nothing new to snapshot") { - return errors.New("could not take snapshot: " + err.Error()) + if err != nil && err.Error() != hraft.ErrNothingNewToSnapshot.Error() { + return err } return nil } // Shutdown shutdown Raft and closes the BoltDB. -func (r *Raft) Shutdown() error { - future := r.raft.Shutdown() +func (rw *raftWrapper) Shutdown() error { + future := rw.raft.Shutdown() err := future.Error() errMsgs := "" if err != nil { errMsgs += "could not shutdown raft: " + err.Error() + ".\n" } - err = r.boltdb.Close() // important! + err = rw.boltdb.Close() // important! 
if err != nil { errMsgs += "could not close boltdb: " + err.Error() } @@ -210,76 +274,88 @@ func (r *Raft) Shutdown() error { return errors.New(errMsgs) } - // If the shutdown worked correctly - // (including snapshot) we can remove the Raft - // database (which traces peers additions - // and removals). It makes re-start of the peer - // way less confusing for Raft while the state - // can be restored from the snapshot. - //os.Remove(filepath.Join(r.dataFolder, "raft.db")) return nil } // AddPeer adds a peer to Raft -func (r *Raft) AddPeer(peer string) error { - if r.hasPeer(peer) { - logger.Debug("skipping raft add as already in peer set") +func (rw *raftWrapper) AddPeer(peer string) error { + // Check that we don't have it to not waste + // log entries if so. + peers, err := rw.Peers() + if err != nil { + return err + } + if find(peers, peer) { + logger.Infof("%s is already a raft peer", peer) return nil } - future := r.raft.AddPeer(peer) - err := future.Error() + future := rw.raft.AddVoter( + hraft.ServerID(peer), + hraft.ServerAddress(peer), + 0, + 0) // TODO: Extra cfg value? + err = future.Error() if err != nil { logger.Error("raft cannot add peer: ", err) - return err } - peers, _ := r.peerstore.Peers() - logger.Debugf("raft peerstore: %s", peers) return err } // RemovePeer removes a peer from Raft -func (r *Raft) RemovePeer(peer string) error { - if !r.hasPeer(peer) { +func (rw *raftWrapper) RemovePeer(peer string) error { + // Check that we have it to not waste + // log entries if we don't. + peers, err := rw.Peers() + if err != nil { + return err + } + if !find(peers, peer) { + logger.Infof("%s is not among raft peers", peer) return nil } - future := r.raft.RemovePeer(peer) - err := future.Error() + if len(peers) == 1 && peers[0] == peer { + return errors.New("cannot remove ourselves from a 1-peer cluster") + } + + future := rw.raft.RemoveServer( + hraft.ServerID(peer), + 0, + 0) // TODO: Extra cfg value? + err = future.Error() if err != nil { logger.Error("raft cannot remove peer: ", err) - return err } - peers, _ := r.peerstore.Peers() - logger.Debugf("raft peerstore: %s", peers) return err } -// func (r *Raft) SetPeers(peers []string) error { -// logger.Debugf("SetPeers(): %s", peers) -// future := r.raft.SetPeers(peers) -// err := future.Error() -// if err != nil { -// logger.Error(err) -// } -// return err -// } - // Leader returns Raft's leader. It may be an empty string if // there is no leader or it is unknown. 
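For context (not part of the patch): AddPeer and RemovePeer above use the membership API of current hashicorp/raft, which returns a future whose Error() blocks until the configuration change is committed. The helper below is only a sketch of that calling convention; the trailing zeros are the optional previous-index check and timeout, exactly as passed above.

```go
package raft

import (
	hraft "github.com/hashicorp/raft"
)

// addVoterSketch shows the future-based flow wrapped by raftWrapper.AddPeer.
func addVoterSketch(r *hraft.Raft, id string) error {
	future := r.AddVoter(
		hraft.ServerID(id),      // raft server ID (a base58 peer ID here)
		hraft.ServerAddress(id), // the libp2p transport dials by peer ID
		0,                       // prevIndex: 0 disables the stale-config check
		0)                       // timeout: 0 means no explicit timeout
	return future.Error() // blocks until the change is applied or fails
}
```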
-func (r *Raft) Leader() string { - return r.raft.Leader() +func (rw *raftWrapper) Leader() string { + return string(rw.raft.Leader()) } -func (r *Raft) hasPeer(peer string) bool { - found := false - peers, _ := r.peerstore.Peers() - for _, p := range peers { - if p == peer { - found = true - break - } +func (rw *raftWrapper) Peers() ([]string, error) { + ids := make([]string, 0) + + configFuture := rw.raft.GetConfiguration() + if err := configFuture.Error(); err != nil { + return nil, err } - return found + for _, server := range configFuture.Configuration().Servers { + ids = append(ids, string(server.ID)) + } + + return ids, nil +} + +func find(s []string, elem string) bool { + for _, selem := range s { + if selem == elem { + return true + } + } + return false } diff --git a/package.json b/package.json index 882e83ab..9753b46d 100644 --- a/package.json +++ b/package.json @@ -15,21 +15,21 @@ }, { "author": "whyrusleeping", - "hash": "QmQA5mdxru8Bh6dpC9PJfSkumqnmHgJX7knxSgBo5Lpime", + "hash": "QmefgzMbKZYsmHFkLqxgaTBG9ypeEjrdWRD5WXH4j1cWDL", "name": "go-libp2p", - "version": "4.3.12" + "version": "5.0.5" }, { "author": "hsanjuan", - "hash": "QmSLvGe32QcBEDjqrC93pnZzB6gh4VZTi2wVLt9EyUTUWX", + "hash": "QmcjpDnmS6jGrYrTnuT4RDHUHduF97w2V8JuYs6eynbFg2", "name": "go-libp2p-raft", - "version": "1.0.6" + "version": "1.1.0" }, { "author": "whyrusleeping", - "hash": "QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD", + "hash": "QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ", "name": "go-cid", - "version": "0.7.7" + "version": "0.7.18" }, { "author": "urfave", @@ -39,9 +39,9 @@ }, { "author": "hashicorp", - "hash": "QmfNTPUT6FKg2wyPcqgn3tKuGJWnvu6QqwHoo56FgcR4c2", + "hash": "QmZa48BnsaEMVNf1hT2HYP2ak97fqyTnadXu6xSu2Y8xui", "name": "raft-boltdb", - "version": "2017.7.27" + "version": "2017.10.24" }, { "author": "gorilla", @@ -51,15 +51,15 @@ }, { "author": "hsanjuan", - "hash": "QmayPizdYNaSKGyFFxcjKf4ZkZ6kriQePqZkFwZQyvteDp", + "hash": "QmW2qYs8uJUSAsdGrWZo1LnzAwTQjPBRKk66X9H16EABMg", "name": "go-libp2p-gorpc", - "version": "1.0.2" + "version": "1.0.4" }, { "author": "libp2p", - "hash": "QmTJoXQ24GqDf9MqAUwf3vW38HG6ahE9S7GzZoRMEeE8Kc", + "hash": "QmcWmYQEQCrezztaQ81nXzMx2jaAEow17wdesDAjjR769r", "name": "go-libp2p-pnet", - "version": "2.2.4" + "version": "2.3.1" } ], "gxVersion": "0.11.0", diff --git a/peer_manager_test.go b/peer_manager_test.go index 97bd2cb2..fd78aa6e 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -135,7 +135,10 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) { // Now we shutdown one member of the running cluster // and try to add someone else. - clusters[1].Shutdown() + err = clusters[1].Shutdown() + if err != nil { + t.Error("Shutdown should be clean: ", err) + } _, err = clusters[0].PeerAdd(clusterAddr(clusters[2])) if err == nil { @@ -194,9 +197,23 @@ func TestClusterPeerRemoveSelf(t *testing.T) { defer shutdownClusters(t, clusters, mocks) for i := 0; i < len(clusters); i++ { + peers := clusters[i].Peers() + t.Logf("Current cluster size: %d", len(peers)) + if len(peers) != (len(clusters) - i) { + t.Fatal("Previous peers not removed correctly") + } err := clusters[i].PeerRemove(clusters[i].ID().ID) + // Last peer member won't be able to remove itself + // In this case, we shut it down. 
if err != nil { - t.Error(err) + if i != len(clusters)-1 { //not last + t.Error(err) + } else { + err := clusters[i].Shutdown() + if err != nil { + t.Fatal(err) + } + } } time.Sleep(time.Second) _, more := <-clusters[i].Done() @@ -383,3 +400,71 @@ func TestClustersPeerJoinAllAtOnceWithRandomBootstrap(t *testing.T) { } runF(t, clusters, f2) } + +// Tests that a peer catches up on the state correctly after rejoining +func TestClustersPeerRejoin(t *testing.T) { + clusters, mocks := peerManagerClusters(t) + defer shutdownClusters(t, clusters, mocks) + + // pin something in c0 + pin1, _ := cid.Decode(test.TestCid1) + err := clusters[0].Pin(api.PinCid(pin1)) + if err != nil { + t.Fatal(err) + } + + // add all clusters + for i := 1; i < len(clusters); i++ { + addr := clusterAddr(clusters[i]) + _, err := clusters[0].PeerAdd(addr) + if err != nil { + t.Fatal(err) + } + } + + delay() + + // all added peers should have the content + for i := 1; i < len(clusters); i++ { + pinfo := clusters[i].tracker.Status(pin1) + if pinfo.Status != api.TrackerStatusPinned { + t.Error("Added peers should pin the content") + } + } + + clusters[0].Shutdown() + mocks[0].Close() + + //delay() + + // Pin something on the rest + pin2, _ := cid.Decode(test.TestCid2) + err = clusters[1].Pin(api.PinCid(pin2)) + if err != nil { + t.Fatal(err) + } + + delay() + + // Rejoin c0 + c0, m0 := createOnePeerCluster(t, 0, testingClusterSecret) + clusters[0] = c0 + mocks[0] = m0 + addr := clusterAddr(c0) + _, err = clusters[1].PeerAdd(addr) + if err != nil { + t.Fatal(err) + } + + delay() + + pinfo := clusters[0].tracker.Status(pin2) + if pinfo.Status != api.TrackerStatusPinned { + t.Error("re-joined cluster should have caught up") + } + + pinfo = clusters[0].tracker.Status(pin1) + if pinfo.Status != api.TrackerStatusPinned { + t.Error("re-joined cluster should have original pin") + } +} diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index 8db021d5..8d51c732 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -9,6 +9,7 @@ import ( rpc "github.com/hsanjuan/go-libp2p-gorpc" cid "github.com/ipfs/go-cid" + host "github.com/libp2p/go-libp2p-host" peer "github.com/libp2p/go-libp2p-peer" ) @@ -21,8 +22,14 @@ type mockService struct{} // NewMockRPCClient creates a mock ipfs-cluster RPC server and returns // a client to it. func NewMockRPCClient(t *testing.T) *rpc.Client { - s := rpc.NewServer(nil, "mock") - c := rpc.NewClientWithServer(nil, "mock", s) + return NewMockRPCClientWithHost(t, nil) +} + +// NewMockRPCClientWithHost returns a mock ipfs-cluster RPC server +// initialized with a given host. +func NewMockRPCClientWithHost(t *testing.T, h host.Host) *rpc.Client { + s := rpc.NewServer(h, "mock") + c := rpc.NewClientWithServer(h, "mock", s) err := s.RegisterName("Cluster", &mockService{}) if err != nil { t.Fatal(err) @@ -274,3 +281,11 @@ func (mock *mockService) IPFSFreeSpace(in struct{}, out *uint64) error { *out = 98000 return nil } + +func (mock *mockService) ConsensusLogAddPeer(in api.MultiaddrSerial, out *struct{}) error { + return errors.New("mock rpc cannot redirect") +} + +func (mock *mockService) ConsensusLogRmPeer(in peer.ID, out *struct{}) error { + return errors.New("mock rpc cannot redirect") +}
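Not part of the patch, but for completeness: the two stubs above exist so that redirected consensus operations fail loudly in tests instead of hanging. A consensus test wires them in through the new host-aware constructor, mirroring testingConsensus in consensus_test.go; the helper name below is hypothetical.

```go
package raft

import (
	"testing"

	"github.com/ipfs/ipfs-cluster/test"
	host "github.com/libp2p/go-libp2p-host"
)

// wireMockRPC attaches a mock RPC client that shares the consensus instance's
// libp2p host, so a redirect-to-leader reaches the ConsensusLogAddPeer /
// ConsensusLogRmPeer stubs above and returns a controlled error.
func wireMockRPC(t *testing.T, h host.Host, cc *Consensus) {
	cc.SetClient(test.NewMockRPCClientWithHost(t, h))
	<-cc.Ready() // block until the consensus component reports readiness
}
```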