Fix #139: Update cluster to Raft 1.0.0

The main difference is that the new version of Raft is stricter about
starting Raft peers which already contain configurations.
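
As an illustration only (not part of this commit; the helper and package
names are made up), the pattern the new newRaftWrapper follows further
down is: bootstrap a peer only when its stores hold no previous Raft
state, and otherwise start it with whatever configuration Raft already
recorded.

package raftutil // hypothetical helper package, for illustration

import (
    hraft "github.com/hashicorp/raft"
)

// bootstrapIfNeeded bootstraps the Raft cluster only when no previous
// state exists. Raft 1.0.0 does not let us re-bootstrap over an
// existing configuration.
func bootstrapIfNeeded(
    cfg *hraft.Config,
    logs hraft.LogStore,
    stable hraft.StableStore,
    snaps hraft.SnapshotStore,
    trans hraft.Transport,
    servers hraft.Configuration,
) error {
    hasState, err := hraft.HasExistingState(logs, stable, snaps)
    if err != nil {
        return err
    }
    if hasState {
        // Start with the configuration Raft already stores.
        return nil
    }
    return hraft.BootstrapCluster(cfg, logs, stable, snaps, trans, servers)
}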

For a start, the cluster will fail to start if the configured cluster
peers are different from the Raft peers. The user will have to manually
clean up Raft (TODO: an ipfs-cluster-service command for it).
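
The check reduces to something like the sketch below (again not part of
this commit and with hypothetical names; the real implementation is in
newRaftWrapper and diffConfigurations further down): compare the servers
stored in Raft's configuration with the cluster's configured peers and
refuse to start on any mismatch.

package raftutil // hypothetical helper package, for illustration

import (
    "errors"

    hraft "github.com/hashicorp/raft"
)

// checkPeersMatch errors out when the server list stored by Raft differs
// from the servers that the cluster configuration expects.
func checkPeersMatch(r *hraft.Raft, expected hraft.Configuration) error {
    cf := r.GetConfiguration()
    if err := cf.Error(); err != nil {
        return err
    }
    current := cf.Configuration()

    want := make(map[hraft.ServerID]struct{}, len(expected.Servers))
    for _, s := range expected.Servers {
        want[s.ID] = struct{}{}
    }

    if len(current.Servers) != len(want) {
        return errors.New("raft peers do not match cluster peers; clean this peer")
    }
    for _, s := range current.Servers {
        if _, ok := want[s.ID]; !ok {
            return errors.New("raft peers do not match cluster peers; clean this peer")
        }
    }
    return nil
}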

Additionally, this commit adds extra options to the consensus/raft
configuration section, adds new tests and improves existing ones, and
cleans up several code sections.
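
For example, a consensus/raft section carrying the new options could be
loaded like this (a sketch based on the testing configuration in this
diff; the values are illustrative and the consensus/raft import path is
assumed):

package main

import (
    "fmt"

    raft "github.com/ipfs/ipfs-cluster/consensus/raft"
)

func main() {
    // New options: wait_for_leader_timeout, network_timeout,
    // commit_retries and commit_retry_delay.
    jsonSection := []byte(`{
        "wait_for_leader_timeout": "15s",
        "network_timeout": "2s",
        "commit_retries": 1,
        "commit_retry_delay": "1s",
        "heartbeat_timeout": "1s",
        "election_timeout": "1s"
    }`)

    cfg := &raft.Config{}
    // LoadJSON applies defaults first, overrides them with the values
    // present in the section, and validates before returning.
    if err := cfg.LoadJSON(jsonSection); err != nil {
        panic(err)
    }
    fmt.Println(cfg.WaitForLeaderTimeout, cfg.CommitRetries, cfg.CommitRetryDelay)
}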

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2017-10-23 13:46:37 +02:00
parent 71a37e8303
commit 848023e381
12 changed files with 753 additions and 478 deletions

View File

@ -165,7 +165,9 @@ func (c *Cluster) setupConsensus(consensuscfg *raft.Config) error {
if len(c.config.Peers) > 0 { if len(c.config.Peers) > 0 {
startPeers = peersFromMultiaddrs(c.config.Peers) startPeers = peersFromMultiaddrs(c.config.Peers)
} else { } else {
startPeers = peersFromMultiaddrs(c.config.Bootstrap) // start as single cluster before being added
// to the bootstrapper peers' cluster.
startPeers = []peer.ID{}
} }
consensus, err := raft.NewConsensus( consensus, err := raft.NewConsensus(
@ -259,6 +261,11 @@ func (c *Cluster) broadcastMetric(m api.Metric) error {
// push metrics loops and pushes metrics to the leader's monitor // push metrics loops and pushes metrics to the leader's monitor
func (c *Cluster) pushInformerMetrics() { func (c *Cluster) pushInformerMetrics() {
timer := time.NewTimer(0) // fire immediately first timer := time.NewTimer(0) // fire immediately first
// The following control how often to make and log
// a retry
retries := 0
retryDelay := 500 * time.Millisecond
retryWarnMod := 60
for { for {
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
@ -273,17 +280,19 @@ func (c *Cluster) pushInformerMetrics() {
err := c.broadcastMetric(metric) err := c.broadcastMetric(metric)
if err != nil { if err != nil {
logger.Errorf("error broadcasting metric: %s", err) if (retries % retryWarnMod) == 0 {
// retry in half second logger.Errorf("error broadcasting metric: %s", err)
timer.Stop() retries++
timer.Reset(500 * time.Millisecond) }
// retry in retryDelay
timer.Reset(retryDelay)
continue continue
} }
timer.Stop() // no need to drain C if we are here retries = 0
// send metric again in TTL/2
timer.Reset(metric.GetTTL() / 2) timer.Reset(metric.GetTTL() / 2)
} }
logger.Debugf("Peer %s. Finished pushInformerMetrics", c.id)
} }
func (c *Cluster) pushPingMetrics() { func (c *Cluster) pushPingMetrics() {

View File

@ -27,6 +27,10 @@ var testingClusterCfg = []byte(`{
var testingRaftCfg = []byte(`{ var testingRaftCfg = []byte(`{
"data_folder": "raftFolderFromTests", "data_folder": "raftFolderFromTests",
"wait_for_leader_timeout": "15s",
"commit_retries": 1,
"commit_retry_delay": "1s",
"network_timeout": "2s",
"heartbeat_timeout": "1s", "heartbeat_timeout": "1s",
"election_timeout": "1s", "election_timeout": "1s",
"commit_timeout": "50ms", "commit_timeout": "50ms",

View File

@ -8,15 +8,21 @@ import (
"github.com/ipfs/ipfs-cluster/config" "github.com/ipfs/ipfs-cluster/config"
hashiraft "github.com/hashicorp/raft" hraft "github.com/hashicorp/raft"
) )
// ConfigKey is the default configuration key for holding this component's // ConfigKey is the default configuration key for holding this component's
// configuration section. // configuration section.
var configKey = "raft" var configKey = "raft"
// DefaultDataSubFolder is the default subfolder in which Raft's data is stored. // Configuration defaults
var DefaultDataSubFolder = "ipfs-cluster-data" var (
DefaultDataSubFolder = "ipfs-cluster-data"
DefaultWaitForLeaderTimeout = 15 * time.Second
DefaultCommitRetries = 1
DefaultNetworkTimeout = 10 * time.Second
DefaultCommitRetryDelay = 200 * time.Millisecond
)
// Config allows to configure the Raft Consensus component for ipfs-cluster. // Config allows to configure the Raft Consensus component for ipfs-cluster.
// The component's configuration section is represented by ConfigJSON. // The component's configuration section is represented by ConfigJSON.
@ -25,9 +31,20 @@ type Config struct {
config.Saver config.Saver
// A Hashicorp Raft's configuration object. // A Hashicorp Raft's configuration object.
HashiraftCfg *hashiraft.Config RaftConfig *hraft.Config
// A folder to store Raft's data. // A folder to store Raft's data.
DataFolder string DataFolder string
// LeaderTimeout specifies how long to wait for a leader before
// failing an operation.
WaitForLeaderTimeout time.Duration
// NetworkTimeout specifies how long before a Raft network
// operation is timed out
NetworkTimeout time.Duration
// CommitRetries specifies how many times we retry a failed commit until
// we give up.
CommitRetries int
// How long to wait between retries
CommitRetryDelay time.Duration
} }
// ConfigJSON represents a human-friendly Config // ConfigJSON represents a human-friendly Config
@ -41,6 +58,18 @@ type jsonConfig struct {
// the Raft. // the Raft.
DataFolder string `json:"data_folder,omitempty"` DataFolder string `json:"data_folder,omitempty"`
// How long to wait for a leader before failing
WaitForLeaderTimeout string `json:"wait_for_leader_timeout"`
// How long to wait before timing out network operations
NetworkTimeout string `json:"network_timeout"`
// How many retries to make upon a failed commit
CommitRetries int `json:"commit_retries"`
// How long to wait between commit retries
CommitRetryDelay string `json:"commit_retry_delay"`
// HeartbeatTimeout specifies the time in follower state without // HeartbeatTimeout specifies the time in follower state without
// a leader before we attempt an election. // a leader before we attempt an election.
HeartbeatTimeout string `json:"heartbeat_timeout,omitempty"` HeartbeatTimeout string `json:"heartbeat_timeout,omitempty"`
@ -57,14 +86,6 @@ type jsonConfig struct {
// to send at once. // to send at once.
MaxAppendEntries int `json:"max_append_entries,omitempty"` MaxAppendEntries int `json:"max_append_entries,omitempty"`
// If we are a member of a cluster, and RemovePeer is invoked for the
// local node, then we forget all peers and transition into the
// follower state.
// If ShutdownOnRemove is is set, we additional shutdown Raft.
// Otherwise, we can become a leader of a cluster containing
// only this node.
ShutdownOnRemove bool `json:"shutdown_on_remove,omitempty"`
// TrailingLogs controls how many logs we leave after a snapshot. // TrailingLogs controls how many logs we leave after a snapshot.
TrailingLogs uint64 `json:"trailing_logs,omitempty"` TrailingLogs uint64 `json:"trailing_logs,omitempty"`
@ -100,11 +121,26 @@ func (cfg *Config) ConfigKey() string {
// Validate checks that this configuration has working values, // Validate checks that this configuration has working values,
// at least in appereance. // at least in appereance.
func (cfg *Config) Validate() error { func (cfg *Config) Validate() error {
if cfg.HashiraftCfg == nil { if cfg.RaftConfig == nil {
return errors.New("No hashicorp/raft.Config") return errors.New("No hashicorp/raft.Config")
} }
if cfg.WaitForLeaderTimeout <= 0 {
return errors.New("wait_for_leader_timeout <= 0")
}
return hashiraft.ValidateConfig(cfg.HashiraftCfg) if cfg.NetworkTimeout <= 0 {
return errors.New("network_timeout <= 0")
}
if cfg.CommitRetries < 0 {
return errors.New("commit_retries is invalid")
}
if cfg.CommitRetryDelay <= 0 {
return errors.New("commit_retry_delay is invalid")
}
return hraft.ValidateConfig(cfg.RaftConfig)
} }
// LoadJSON parses a json-encoded configuration (see jsonConfig). // LoadJSON parses a json-encoded configuration (see jsonConfig).
@ -118,81 +154,77 @@ func (cfg *Config) LoadJSON(raw []byte) error {
return err return err
} }
cfg.setDefaults() cfg.Default()
// Parse durations from strings parseDuration := func(txt string) time.Duration {
heartbeatTimeout, err := time.ParseDuration(jcfg.HeartbeatTimeout) d, _ := time.ParseDuration(txt)
if err != nil && jcfg.HeartbeatTimeout != "" { if txt != "" && d == 0 {
logger.Error("Error parsing heartbeat_timeout") logger.Warningf("%s is not a valid duration. Default will be used", txt)
return err }
return d
} }
electionTimeout, err := time.ParseDuration(jcfg.ElectionTimeout) // Parse durations. We ignore errors as 0 will take Default values.
if err != nil && jcfg.ElectionTimeout != "" { waitForLeaderTimeout := parseDuration(jcfg.WaitForLeaderTimeout)
logger.Error("Error parsing election_timeout") networkTimeout := parseDuration(jcfg.NetworkTimeout)
return err commitRetryDelay := parseDuration(jcfg.CommitRetryDelay)
} heartbeatTimeout := parseDuration(jcfg.HeartbeatTimeout)
electionTimeout := parseDuration(jcfg.ElectionTimeout)
commitTimeout, err := time.ParseDuration(jcfg.CommitTimeout) commitTimeout := parseDuration(jcfg.CommitTimeout)
if err != nil && jcfg.CommitTimeout != "" { snapshotInterval := parseDuration(jcfg.SnapshotInterval)
logger.Error("Error parsing commit_timeout") leaderLeaseTimeout := parseDuration(jcfg.LeaderLeaseTimeout)
return err
}
snapshotInterval, err := time.ParseDuration(jcfg.SnapshotInterval)
if err != nil && jcfg.SnapshotInterval != "" {
logger.Error("Error parsing snapshot_interval")
return err
}
leaderLeaseTimeout, err := time.ParseDuration(jcfg.LeaderLeaseTimeout)
if err != nil && jcfg.LeaderLeaseTimeout != "" {
logger.Error("Error parsing leader_lease_timeout")
return err
}
// Set all values in config. For some, take defaults if they are 0.
// Set values from jcfg if they are not 0 values // Set values from jcfg if they are not 0 values
config.SetIfNotDefault(jcfg.DataFolder, &cfg.DataFolder)
config.SetIfNotDefault(heartbeatTimeout, &cfg.HashiraftCfg.HeartbeatTimeout)
config.SetIfNotDefault(electionTimeout, &cfg.HashiraftCfg.ElectionTimeout)
config.SetIfNotDefault(commitTimeout, &cfg.HashiraftCfg.CommitTimeout)
config.SetIfNotDefault(jcfg.MaxAppendEntries, &cfg.HashiraftCfg.MaxAppendEntries)
config.SetIfNotDefault(jcfg.ShutdownOnRemove, &cfg.HashiraftCfg.ShutdownOnRemove)
config.SetIfNotDefault(jcfg.TrailingLogs, &cfg.HashiraftCfg.TrailingLogs)
config.SetIfNotDefault(snapshotInterval, &cfg.HashiraftCfg.SnapshotInterval)
config.SetIfNotDefault(jcfg.SnapshotThreshold, &cfg.HashiraftCfg.SnapshotThreshold)
config.SetIfNotDefault(leaderLeaseTimeout, &cfg.HashiraftCfg.LeaderLeaseTimeout)
return nil // Own values
config.SetIfNotDefault(jcfg.DataFolder, &cfg.DataFolder)
config.SetIfNotDefault(waitForLeaderTimeout, &cfg.WaitForLeaderTimeout)
config.SetIfNotDefault(networkTimeout, &cfg.NetworkTimeout)
cfg.CommitRetries = jcfg.CommitRetries
config.SetIfNotDefault(commitRetryDelay, &cfg.CommitRetryDelay)
// Raft values
config.SetIfNotDefault(heartbeatTimeout, &cfg.RaftConfig.HeartbeatTimeout)
config.SetIfNotDefault(electionTimeout, &cfg.RaftConfig.ElectionTimeout)
config.SetIfNotDefault(commitTimeout, &cfg.RaftConfig.CommitTimeout)
config.SetIfNotDefault(jcfg.MaxAppendEntries, &cfg.RaftConfig.MaxAppendEntries)
config.SetIfNotDefault(jcfg.TrailingLogs, &cfg.RaftConfig.TrailingLogs)
config.SetIfNotDefault(snapshotInterval, &cfg.RaftConfig.SnapshotInterval)
config.SetIfNotDefault(jcfg.SnapshotThreshold, &cfg.RaftConfig.SnapshotThreshold)
config.SetIfNotDefault(leaderLeaseTimeout, &cfg.RaftConfig.LeaderLeaseTimeout)
return cfg.Validate()
} }
// ToJSON returns the pretty JSON representation of a Config. // ToJSON returns the pretty JSON representation of a Config.
func (cfg *Config) ToJSON() ([]byte, error) { func (cfg *Config) ToJSON() ([]byte, error) {
jcfg := &jsonConfig{} jcfg := &jsonConfig{}
jcfg.DataFolder = cfg.DataFolder jcfg.DataFolder = cfg.DataFolder
jcfg.HeartbeatTimeout = cfg.HashiraftCfg.HeartbeatTimeout.String() jcfg.WaitForLeaderTimeout = cfg.WaitForLeaderTimeout.String()
jcfg.ElectionTimeout = cfg.HashiraftCfg.ElectionTimeout.String() jcfg.NetworkTimeout = cfg.NetworkTimeout.String()
jcfg.CommitTimeout = cfg.HashiraftCfg.CommitTimeout.String() jcfg.CommitRetries = cfg.CommitRetries
jcfg.MaxAppendEntries = cfg.HashiraftCfg.MaxAppendEntries jcfg.CommitRetryDelay = cfg.CommitRetryDelay.String()
jcfg.ShutdownOnRemove = cfg.HashiraftCfg.ShutdownOnRemove jcfg.HeartbeatTimeout = cfg.RaftConfig.HeartbeatTimeout.String()
jcfg.TrailingLogs = cfg.HashiraftCfg.TrailingLogs jcfg.ElectionTimeout = cfg.RaftConfig.ElectionTimeout.String()
jcfg.SnapshotInterval = cfg.HashiraftCfg.SnapshotInterval.String() jcfg.CommitTimeout = cfg.RaftConfig.CommitTimeout.String()
jcfg.SnapshotThreshold = cfg.HashiraftCfg.SnapshotThreshold jcfg.MaxAppendEntries = cfg.RaftConfig.MaxAppendEntries
jcfg.LeaderLeaseTimeout = cfg.HashiraftCfg.LeaderLeaseTimeout.String() jcfg.TrailingLogs = cfg.RaftConfig.TrailingLogs
jcfg.SnapshotInterval = cfg.RaftConfig.SnapshotInterval.String()
jcfg.SnapshotThreshold = cfg.RaftConfig.SnapshotThreshold
jcfg.LeaderLeaseTimeout = cfg.RaftConfig.LeaderLeaseTimeout.String()
return config.DefaultJSONMarshal(jcfg) return config.DefaultJSONMarshal(jcfg)
} }
// Default initializes this configuration with working defaults. // Default initializes this configuration with working defaults.
func (cfg *Config) Default() error { func (cfg *Config) Default() error {
cfg.setDefaults() cfg.DataFolder = "" // empty so it gets ommitted
return nil cfg.WaitForLeaderTimeout = DefaultWaitForLeaderTimeout
} cfg.NetworkTimeout = DefaultNetworkTimeout
cfg.CommitRetries = DefaultCommitRetries
// most defaults come directly from hashiraft.DefaultConfig() cfg.CommitRetryDelay = DefaultCommitRetryDelay
func (cfg *Config) setDefaults() { cfg.RaftConfig = hraft.DefaultConfig()
cfg.DataFolder = ""
cfg.HashiraftCfg = hashiraft.DefaultConfig()
// These options are imposed over any Default Raft Config. // These options are imposed over any Default Raft Config.
// Changing them causes cluster peers to show // Changing them causes cluster peers to show
@ -202,11 +234,11 @@ func (cfg *Config) setDefaults() {
// does not work the way it is expected to. // does not work the way it is expected to.
// i.e. ShutdownOnRemove will cause that no snapshot will be taken // i.e. ShutdownOnRemove will cause that no snapshot will be taken
// when trying to shutdown a peer after removing it from a cluster. // when trying to shutdown a peer after removing it from a cluster.
cfg.HashiraftCfg.DisableBootstrapAfterElect = false cfg.RaftConfig.ShutdownOnRemove = false
cfg.HashiraftCfg.EnableSingleNode = true cfg.RaftConfig.LocalID = "will_be_set_automatically"
cfg.HashiraftCfg.ShutdownOnRemove = false
// Set up logging // Set up logging
cfg.HashiraftCfg.LogOutput = ioutil.Discard cfg.RaftConfig.LogOutput = ioutil.Discard
cfg.HashiraftCfg.Logger = raftStdLogger // see logging.go cfg.RaftConfig.Logger = raftStdLogger // see logging.go
return nil
} }

View File

@ -3,11 +3,15 @@ package raft
import ( import (
"encoding/json" "encoding/json"
"testing" "testing"
hraft "github.com/hashicorp/raft"
) )
var cfgJSON = []byte(` var cfgJSON = []byte(`
{ {
"heartbeat_timeout": "1s", "heartbeat_timeout": "1s",
"commit_retries": 1,
"wait_for_leader_timeout": "15s",
"election_timeout": "1s", "election_timeout": "1s",
"commit_timeout": "50ms", "commit_timeout": "50ms",
"max_append_entries": 64, "max_append_entries": 64,
@ -27,47 +31,23 @@ func TestLoadJSON(t *testing.T) {
j := &jsonConfig{} j := &jsonConfig{}
json.Unmarshal(cfgJSON, j) json.Unmarshal(cfgJSON, j)
j.HeartbeatTimeout = "-1" j.HeartbeatTimeout = "1us"
tst, _ := json.Marshal(j) tst, _ := json.Marshal(j)
err = cfg.LoadJSON(tst) err = cfg.LoadJSON(tst)
if err == nil { if err == nil {
t.Error("expected error decoding heartbeat_timeout") t.Error("expected error decoding heartbeat_timeout")
} }
j = &jsonConfig{}
json.Unmarshal(cfgJSON, j)
j.ElectionTimeout = "abc"
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error in timeout")
}
j = &jsonConfig{}
json.Unmarshal(cfgJSON, j)
j.CommitTimeout = "abc"
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error in timeout")
}
j = &jsonConfig{}
json.Unmarshal(cfgJSON, j) json.Unmarshal(cfgJSON, j)
j.LeaderLeaseTimeout = "abc" j.LeaderLeaseTimeout = "abc"
tst, _ = json.Marshal(j) tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst) err = cfg.LoadJSON(tst)
if err == nil { if err != nil {
t.Error("expected error in timeout") t.Fatal(err)
} }
def := hraft.DefaultConfig()
j = &jsonConfig{} if cfg.RaftConfig.LeaderLeaseTimeout != def.LeaderLeaseTimeout {
json.Unmarshal(cfgJSON, j) t.Error("expected default leader lease")
j.SnapshotInterval = "abc"
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error in snapshot_interval")
} }
} }
@ -92,13 +72,25 @@ func TestDefault(t *testing.T) {
t.Fatal("error validating") t.Fatal("error validating")
} }
cfg.HashiraftCfg.HeartbeatTimeout = 0 cfg.RaftConfig.HeartbeatTimeout = 0
if cfg.Validate() == nil { if cfg.Validate() == nil {
t.Fatal("expected error validating") t.Fatal("expected error validating")
} }
cfg.Default() cfg.Default()
cfg.HashiraftCfg = nil cfg.RaftConfig = nil
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
cfg.Default()
cfg.CommitRetries = -1
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
cfg.Default()
cfg.WaitForLeaderTimeout = 0
if cfg.Validate() == nil { if cfg.Validate() == nil {
t.Fatal("expected error validating") t.Fatal("expected error validating")
} }

View File

@ -22,27 +22,20 @@ import (
var logger = logging.Logger("consensus") var logger = logging.Logger("consensus")
// LeaderTimeout specifies how long to wait before failing an operation
// because there is no leader
var LeaderTimeout = 15 * time.Second
// CommitRetries specifies how many times we retry a failed commit until
// we give up
var CommitRetries = 2
// Consensus handles the work of keeping a shared-state between // Consensus handles the work of keeping a shared-state between
// the peers of an IPFS Cluster, as well as modifying that state and // the peers of an IPFS Cluster, as well as modifying that state and
// applying any updates in a thread-safe manner. // applying any updates in a thread-safe manner.
type Consensus struct { type Consensus struct {
ctx context.Context ctx context.Context
cancel func() cancel func()
config *Config
host host.Host host host.Host
consensus consensus.OpLogConsensus consensus consensus.OpLogConsensus
actor consensus.Actor actor consensus.Actor
baseOp *LogOp baseOp *LogOp
raft *Raft raft *raftWrapper
rpcClient *rpc.Client rpcClient *rpc.Client
rpcReady chan struct{} rpcReady chan struct{}
@ -67,8 +60,9 @@ func NewConsensus(clusterPeers []peer.ID, host host.Host, cfg *Config, state sta
logger.Infof("starting Consensus and waiting for a leader...") logger.Infof("starting Consensus and waiting for a leader...")
consensus := libp2praft.NewOpLog(state, op) consensus := libp2praft.NewOpLog(state, op)
raft, err := NewRaft(clusterPeers, host, cfg, consensus.FSM()) raft, err := newRaftWrapper(clusterPeers, host, cfg, consensus.FSM())
if err != nil { if err != nil {
logger.Error("error creating raft: ", err)
return nil, err return nil, err
} }
actor := libp2praft.NewActor(raft.raft) actor := libp2praft.NewActor(raft.raft)
@ -80,6 +74,7 @@ func NewConsensus(clusterPeers []peer.ID, host host.Host, cfg *Config, state sta
cc := &Consensus{ cc := &Consensus{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
config: cfg,
host: host, host: host,
consensus: consensus, consensus: consensus,
actor: actor, actor: actor,
@ -95,9 +90,11 @@ func NewConsensus(clusterPeers []peer.ID, host host.Host, cfg *Config, state sta
// WaitForSync waits for a leader and for the state to be up to date, then returns. // WaitForSync waits for a leader and for the state to be up to date, then returns.
func (cc *Consensus) WaitForSync() error { func (cc *Consensus) WaitForSync() error {
leaderCtx, cancel := context.WithTimeout(cc.ctx, LeaderTimeout) leaderCtx, cancel := context.WithTimeout(
cc.ctx,
cc.config.WaitForLeaderTimeout)
defer cancel() defer cancel()
err := cc.raft.WaitForLeader(leaderCtx) _, err := cc.raft.WaitForLeader(leaderCtx)
if err != nil { if err != nil {
return errors.New("error waiting for leader: " + err.Error()) return errors.New("error waiting for leader: " + err.Error())
} }
@ -160,26 +157,15 @@ func (cc *Consensus) Shutdown() error {
logger.Info("stopping Consensus component") logger.Info("stopping Consensus component")
cc.cancel() // Raft Shutdown
close(cc.rpcReady) err := cc.raft.Shutdown()
// Raft shutdown
errMsgs := ""
err := cc.raft.Snapshot()
if err != nil { if err != nil {
errMsgs += err.Error() logger.Error(err)
} return err
err = cc.raft.Shutdown()
if err != nil {
errMsgs += err.Error()
}
if errMsgs != "" {
errMsgs += "Consensus shutdown unsuccessful"
logger.Error(errMsgs)
return errors.New(errMsgs)
} }
cc.shutdown = true cc.shutdown = true
cc.cancel()
close(cc.rpcReady)
return nil return nil
} }
@ -214,176 +200,155 @@ func (cc *Consensus) op(argi interface{}, t LogOpType) *LogOp {
} }
// returns true if the operation was redirected to the leader // returns true if the operation was redirected to the leader
// note that if the leader just dissappeared, the rpc call will
// fail because we haven't heard that it's gone.
func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, error) { func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, error) {
leader, err := cc.Leader()
if err != nil {
rctx, cancel := context.WithTimeout(cc.ctx, LeaderTimeout)
defer cancel()
err := cc.raft.WaitForLeader(rctx)
if err != nil {
return false, err
}
}
if leader == cc.host.ID() {
return false, nil
}
err = cc.rpcClient.Call(
leader,
"Cluster",
method,
arg,
&struct{}{})
return true, err
}
func (cc *Consensus) logOpCid(rpcOp string, opType LogOpType, pin api.Pin) error {
var finalErr error var finalErr error
for i := 0; i < CommitRetries; i++ {
logger.Debugf("Try %d", i) // Retry redirects
redirected, err := cc.redirectToLeader( for i := 0; i <= cc.config.CommitRetries; i++ {
rpcOp, pin.ToSerial()) logger.Debugf("redirect try %d", i)
leader, err := cc.Leader()
// No leader, wait for one
if err != nil { if err != nil {
finalErr = err logger.Warningf("there seems to be no leader. Waiting for one")
rctx, cancel := context.WithTimeout(
cc.ctx,
cc.config.WaitForLeaderTimeout)
defer cancel()
pidstr, err := cc.raft.WaitForLeader(rctx)
// means we timed out waiting for a leader
// we don't retry in this case
if err != nil {
return false, errors.New("timed out waiting for leader")
}
leader, err = peer.IDB58Decode(pidstr)
if err != nil {
return false, err
}
}
// We are the leader. Do not redirect
if leader == cc.host.ID() {
return false, nil
}
logger.Debugf("redirecting to leader: %s", leader)
finalErr = cc.rpcClient.Call(
leader,
"Cluster",
method,
arg,
&struct{}{})
if finalErr != nil {
logger.Error(finalErr)
logger.Info("retrying to redirect request to leader")
time.Sleep(2 * cc.config.RaftConfig.HeartbeatTimeout)
continue continue
} }
if redirected {
return nil
}
// It seems WE are the leader.
op := cc.op(pin, opType)
_, err = cc.consensus.CommitOp(op)
if err != nil {
// This means the op did not make it to the log
finalErr = err
time.Sleep(200 * time.Millisecond)
continue
}
finalErr = nil
break break
} }
if finalErr != nil {
return finalErr
}
switch opType { // We tried to redirect, but something happened
case LogOpPin: return true, finalErr
logger.Infof("pin committed to global state: %s", pin.Cid) }
case LogOpUnpin:
logger.Infof("unpin committed to global state: %s", pin.Cid) // commit submits a cc.consensus commit. It retries upon failures.
func (cc *Consensus) commit(op *LogOp, rpcOp string, redirectArg interface{}) error {
var finalErr error
for i := 0; i <= cc.config.CommitRetries; i++ {
logger.Debug("attempt #%d: committing %+v", i, op)
// this means we are retrying
if finalErr != nil {
logger.Error("retrying upon failed commit (retry %d): ",
i, finalErr)
}
// try to send it to the leader
// redirectToLeader has it's own retry loop. If this fails
// we're done here.
ok, err := cc.redirectToLeader(rpcOp, redirectArg)
if err != nil || ok {
return err
}
// Being here means we are the LEADER. We can commit.
// now commit the changes to our state
_, finalErr := cc.consensus.CommitOp(op)
if finalErr != nil {
goto RETRY
}
// addPeer and rmPeer need to apply the change to Raft directly.
switch op.Type {
case LogOpAddPeer:
pidstr := parsePIDFromMultiaddr(op.Peer.ToMultiaddr())
finalErr = cc.raft.AddPeer(pidstr)
if finalErr != nil {
goto RETRY
}
logger.Infof("peer committed to global state: %s", pidstr)
case LogOpRmPeer:
pidstr := parsePIDFromMultiaddr(op.Peer.ToMultiaddr())
finalErr = cc.raft.RemovePeer(pidstr)
if finalErr != nil {
goto RETRY
}
logger.Infof("peer removed from global state: %s", pidstr)
}
break
RETRY:
time.Sleep(cc.config.CommitRetryDelay)
} }
return nil return finalErr
} }
// LogPin submits a Cid to the shared state of the cluster. It will forward // LogPin submits a Cid to the shared state of the cluster. It will forward
// the operation to the leader if this is not it. // the operation to the leader if this is not it.
func (cc *Consensus) LogPin(c api.Pin) error { func (cc *Consensus) LogPin(pin api.Pin) error {
return cc.logOpCid("ConsensusLogPin", LogOpPin, c) op := cc.op(pin, LogOpPin)
err := cc.commit(op, "ConsensusLogPin", pin.ToSerial())
if err != nil {
return err
}
logger.Infof("pin committed to global state: %s", pin.Cid)
return nil
} }
// LogUnpin removes a Cid from the shared state of the cluster. // LogUnpin removes a Cid from the shared state of the cluster.
func (cc *Consensus) LogUnpin(c api.Pin) error { func (cc *Consensus) LogUnpin(pin api.Pin) error {
return cc.logOpCid("ConsensusLogUnpin", LogOpUnpin, c) op := cc.op(pin, LogOpUnpin)
err := cc.commit(op, "ConsensusLogUnpin", pin.ToSerial())
if err != nil {
return err
}
logger.Infof("unpin committed to global state: %s", pin.Cid)
return nil
} }
// LogAddPeer submits a new peer to the shared state of the cluster. It will // LogAddPeer submits a new peer to the shared state of the cluster. It will
// forward the operation to the leader if this is not it. // forward the operation to the leader if this is not it.
func (cc *Consensus) LogAddPeer(addr ma.Multiaddr) error { func (cc *Consensus) LogAddPeer(addr ma.Multiaddr) error {
var finalErr error addrS := api.MultiaddrToSerial(addr)
for i := 0; i < CommitRetries; i++ { op := cc.op(addr, LogOpAddPeer)
logger.Debugf("Try %d", i) return cc.commit(op, "ConsensusLogAddPeer", addrS)
redirected, err := cc.redirectToLeader(
"ConsensusLogAddPeer", api.MultiaddrToSerial(addr))
if err != nil {
finalErr = err
continue
}
if redirected {
return nil
}
// It seems WE are the leader.
pidStr, err := addr.ValueForProtocol(ma.P_IPFS)
if err != nil {
return err
}
pid, err := peer.IDB58Decode(pidStr)
if err != nil {
return err
}
// Create pin operation for the log
op := cc.op(addr, LogOpAddPeer)
_, err = cc.consensus.CommitOp(op)
if err != nil {
// This means the op did not make it to the log
finalErr = err
time.Sleep(200 * time.Millisecond)
continue
}
err = cc.raft.AddPeer(peer.IDB58Encode(pid))
if err != nil {
finalErr = err
continue
}
finalErr = nil
break
}
if finalErr != nil {
return finalErr
}
logger.Infof("peer committed to global state: %s", addr)
return nil
} }
// LogRmPeer removes a peer from the shared state of the cluster. It will // LogRmPeer removes a peer from the shared state of the cluster. It will
// forward the operation to the leader if this is not it. // forward the operation to the leader if this is not it.
func (cc *Consensus) LogRmPeer(pid peer.ID) error { func (cc *Consensus) LogRmPeer(pid peer.ID) error {
var finalErr error // Create rmPeer operation for the log
for i := 0; i < CommitRetries; i++ { addr, err := ma.NewMultiaddr("/ipfs/" + peer.IDB58Encode(pid))
logger.Debugf("Try %d", i) if err != nil {
redirected, err := cc.redirectToLeader("ConsensusLogRmPeer", pid) return err
if err != nil {
finalErr = err
continue
}
if redirected {
return nil
}
// It seems WE are the leader.
// Create pin operation for the log
addr, err := ma.NewMultiaddr("/ipfs/" + peer.IDB58Encode(pid))
if err != nil {
return err
}
op := cc.op(addr, LogOpRmPeer)
_, err = cc.consensus.CommitOp(op)
if err != nil {
// This means the op did not make it to the log
finalErr = err
continue
}
err = cc.raft.RemovePeer(peer.IDB58Encode(pid))
if err != nil {
finalErr = err
time.Sleep(200 * time.Millisecond)
continue
}
finalErr = nil
break
} }
if finalErr != nil { op := cc.op(addr, LogOpRmPeer)
return finalErr return cc.commit(op, "ConsensusLogRmPeer", pid)
}
logger.Infof("peer removed from global state: %s", pid)
return nil
} }
// State retrieves the current consensus State. It may error // State retrieves the current consensus State. It may error
@ -405,6 +370,7 @@ func (cc *Consensus) State() (state.State, error) {
// Leader returns the peerID of the Leader of the // Leader returns the peerID of the Leader of the
// cluster. It returns an error when there is no leader. // cluster. It returns an error when there is no leader.
func (cc *Consensus) Leader() (peer.ID, error) { func (cc *Consensus) Leader() (peer.ID, error) {
// Note the hard-dependency on raft here...
raftactor := cc.actor.(*libp2praft.Actor) raftactor := cc.actor.(*libp2praft.Actor)
return raftactor.Leader() return raftactor.Leader()
} }
@ -413,5 +379,16 @@ func (cc *Consensus) Leader() (peer.ID, error) {
// state with the state provided. Only the consensus leader // state with the state provided. Only the consensus leader
// can perform this operation. // can perform this operation.
func (cc *Consensus) Rollback(state state.State) error { func (cc *Consensus) Rollback(state state.State) error {
// This is unused. It *might* be used for upgrades.
// There is rather untested magic in libp2p-raft's FSM()
// to make this possible.
return cc.consensus.Rollback(state) return cc.consensus.Rollback(state)
} }
func parsePIDFromMultiaddr(addr ma.Multiaddr) string {
pidstr, err := addr.ValueForProtocol(ma.P_IPFS)
if err != nil {
panic("peer badly encoded")
}
return pidstr
}

View File

@ -60,7 +60,7 @@ func testingConsensus(t *testing.T, port int) *Consensus {
if err != nil { if err != nil {
t.Fatal("cannot create Consensus:", err) t.Fatal("cannot create Consensus:", err)
} }
cc.SetClient(test.NewMockRPCClient(t)) cc.SetClient(test.NewMockRPCClientWithHost(t, h))
<-cc.Ready() <-cc.Ready()
return cc return cc
} }
@ -68,18 +68,23 @@ func testingConsensus(t *testing.T, port int) *Consensus {
func TestShutdownConsensus(t *testing.T) { func TestShutdownConsensus(t *testing.T) {
// Bring it up twice to make sure shutdown cleans up properly // Bring it up twice to make sure shutdown cleans up properly
// but also to make sure raft comes up ok when re-initialized // but also to make sure raft comes up ok when re-initialized
defer cleanRaft(p2pPort)
cc := testingConsensus(t, p2pPort) cc := testingConsensus(t, p2pPort)
err := cc.Shutdown() err := cc.Shutdown()
if err != nil { if err != nil {
t.Fatal("Consensus cannot shutdown:", err) t.Fatal("Consensus cannot shutdown:", err)
} }
cc.Shutdown() err = cc.Shutdown() // should be fine to shutdown twice
if err != nil {
t.Fatal("Consensus should be able to shutdown several times")
}
cleanRaft(p2pPort)
cc = testingConsensus(t, p2pPort) cc = testingConsensus(t, p2pPort)
err = cc.Shutdown() err = cc.Shutdown()
if err != nil { if err != nil {
t.Fatal("Consensus cannot shutdown:", err) t.Fatal("Consensus cannot shutdown:", err)
} }
cleanRaft(p2pPort)
} }
func TestConsensusPin(t *testing.T) { func TestConsensusPin(t *testing.T) {
@ -129,6 +134,7 @@ func TestConsensusLogAddPeer(t *testing.T) {
addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p2pPortAlt)) addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p2pPortAlt))
haddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", cc2.host.ID().Pretty())) haddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", cc2.host.ID().Pretty()))
cc.host.Peerstore().AddAddr(cc2.host.ID(), addr, peerstore.TempAddrTTL) cc.host.Peerstore().AddAddr(cc2.host.ID(), addr, peerstore.TempAddrTTL)
err := cc.LogAddPeer(addr.Encapsulate(haddr)) err := cc.LogAddPeer(addr.Encapsulate(haddr))
if err != nil { if err != nil {
@ -138,13 +144,44 @@ func TestConsensusLogAddPeer(t *testing.T) {
func TestConsensusLogRmPeer(t *testing.T) { func TestConsensusLogRmPeer(t *testing.T) {
cc := testingConsensus(t, p2pPort) cc := testingConsensus(t, p2pPort)
cc2 := testingConsensus(t, p2pPortAlt)
defer cleanRaft(p2pPort) defer cleanRaft(p2pPort)
defer cleanRaft(p2pPortAlt)
defer cc.Shutdown() defer cc.Shutdown()
defer cc2.Shutdown()
err := cc.LogRmPeer(test.TestPeerID1) addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p2pPortAlt))
haddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", cc2.host.ID().Pretty()))
cc.host.Peerstore().AddAddr(cc2.host.ID(), addr, peerstore.TempAddrTTL)
err := cc.LogAddPeer(addr.Encapsulate(haddr))
if err != nil {
t.Error("could not add peer:", err)
}
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
cc.raft.WaitForLeader(ctx)
c, _ := cid.Decode(test.TestCid1)
err = cc.LogPin(api.Pin{Cid: c, ReplicationFactor: -1})
if err != nil {
t.Error("could not pin after adding peer:", err)
}
time.Sleep(2 * time.Second)
// Remove unexisting peer
err = cc.LogRmPeer(test.TestPeerID1)
if err != nil { if err != nil {
t.Error("the operation did not make it to the log:", err) t.Error("the operation did not make it to the log:", err)
} }
// Remove real peer. At least the leader can succeed
err = cc2.LogRmPeer(cc.host.ID())
err2 := cc.LogRmPeer(cc2.host.ID())
if err != nil && err2 != nil {
t.Error("could not remove peer:", err, err2)
}
} }
func TestConsensusLeader(t *testing.T) { func TestConsensusLeader(t *testing.T) {

View File

@ -10,7 +10,6 @@ import (
rpc "github.com/hsanjuan/go-libp2p-gorpc" rpc "github.com/hsanjuan/go-libp2p-gorpc"
consensus "github.com/libp2p/go-libp2p-consensus" consensus "github.com/libp2p/go-libp2p-consensus"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
) )
// Type of consensus operation // Type of consensus operation
@ -46,8 +45,7 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
switch op.Type { switch op.Type {
case LogOpPin: case LogOpPin:
arg := op.Cid.ToPin() err = state.Add(op.Cid.ToPin())
err = state.Add(arg)
if err != nil { if err != nil {
goto ROLLBACK goto ROLLBACK
} }
@ -55,12 +53,11 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
op.rpcClient.Go("", op.rpcClient.Go("",
"Cluster", "Cluster",
"Track", "Track",
arg.ToSerial(), op.Cid,
&struct{}{}, &struct{}{},
nil) nil)
case LogOpUnpin: case LogOpUnpin:
arg := op.Cid.ToPin() err = state.Rm(op.Cid.ToPin().Cid)
err = state.Rm(arg.Cid)
if err != nil { if err != nil {
goto ROLLBACK goto ROLLBACK
} }
@ -68,23 +65,18 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
op.rpcClient.Go("", op.rpcClient.Go("",
"Cluster", "Cluster",
"Untrack", "Untrack",
arg.ToSerial(), op.Cid,
&struct{}{}, &struct{}{},
nil) nil)
case LogOpAddPeer: case LogOpAddPeer:
addr := op.Peer.ToMultiaddr()
op.rpcClient.Call("", op.rpcClient.Call("",
"Cluster", "Cluster",
"PeerManagerAddPeer", "PeerManagerAddPeer",
api.MultiaddrToSerial(addr), op.Peer,
&struct{}{}) &struct{}{})
// TODO rebalance ops // TODO rebalance ops
case LogOpRmPeer: case LogOpRmPeer:
addr := op.Peer.ToMultiaddr() pidstr := parsePIDFromMultiaddr(op.Peer.ToMultiaddr())
pidstr, err := addr.ValueForProtocol(ma.P_IPFS)
if err != nil {
panic("peer badly encoded")
}
pid, err := peer.IDB58Decode(pidstr) pid, err := peer.IDB58Decode(pidstr)
if err != nil { if err != nil {
panic("could not decode a PID we ourselves encoded") panic("could not decode a PID we ourselves encoded")

View File

@ -7,9 +7,23 @@ import (
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
) )
const (
debug = iota
info
warn
err
)
// This provides a custom logger for Raft which intercepts Raft log messages // This provides a custom logger for Raft which intercepts Raft log messages
// and rewrites us to our own logger (for "raft" facility). // and rewrites us to our own logger (for "raft" facility).
type logForwarder struct{} type logForwarder struct {
last map[int]*lastMsg
}
type lastMsg struct {
msg string
tipped bool
}
var raftStdLogger = log.New(&logForwarder{}, "", 0) var raftStdLogger = log.New(&logForwarder{}, "", 0)
var raftLogger = logging.Logger("raft") var raftLogger = logging.Logger("raft")
@ -17,19 +31,61 @@ var raftLogger = logging.Logger("raft")
// Write forwards to our go-log logger. // Write forwards to our go-log logger.
// According to https://golang.org/pkg/log/#Logger.Output // According to https://golang.org/pkg/log/#Logger.Output
// it is called per line. // it is called per line.
func (fw *logForwarder) Write(p []byte) (n int, err error) { func (fw *logForwarder) Write(p []byte) (n int, e error) {
t := strings.TrimSuffix(string(p), "\n") t := strings.TrimSuffix(string(p), "\n")
switch { switch {
case strings.Contains(t, "[DEBUG]"): case strings.Contains(t, "[DEBUG]"):
raftLogger.Debug(strings.TrimPrefix(t, "[DEBUG] raft: ")) if !fw.repeated(debug, t) {
fw.log(debug, strings.TrimPrefix(t, "[DEBUG] raft: "))
}
case strings.Contains(t, "[WARN]"): case strings.Contains(t, "[WARN]"):
raftLogger.Warning(strings.TrimPrefix(t, "[WARN] raft: ")) if !fw.repeated(warn, t) {
fw.log(warn, strings.TrimPrefix(t, "[WARN] raft: "))
}
case strings.Contains(t, "[ERR]"): case strings.Contains(t, "[ERR]"):
raftLogger.Error(strings.TrimPrefix(t, "[ERR] raft: ")) if !fw.repeated(err, t) {
fw.log(err, strings.TrimPrefix(t, "[ERR] raft: "))
}
case strings.Contains(t, "[INFO]"): case strings.Contains(t, "[INFO]"):
raftLogger.Info(strings.TrimPrefix(t, "[INFO] raft: ")) if !fw.repeated(info, t) {
fw.log(info, strings.TrimPrefix(t, "[INFO] raft: "))
}
default: default:
raftLogger.Debug(t) fw.log(debug, t)
} }
return len(p), nil return len(p), nil
} }
func (fw *logForwarder) repeated(t int, msg string) bool {
if fw.last == nil {
fw.last = make(map[int]*lastMsg)
}
last, ok := fw.last[t]
if !ok || last.msg != msg {
fw.last[t] = &lastMsg{msg, false}
return false
} else {
if !last.tipped {
fw.log(t, "NOTICE: The last RAFT log message repeats and will only be logged once")
last.tipped = true
}
return true
}
}
func (fw *logForwarder) log(t int, msg string) {
switch t {
case debug:
raftLogger.Debug(msg)
case info:
raftLogger.Info(msg)
case warn:
raftLogger.Warning(msg)
case err:
raftLogger.Error(msg)
default:
raftLogger.Debug(msg)
}
}

View File

@ -5,173 +5,237 @@ import (
"errors" "errors"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"time" "time"
hashiraft "github.com/hashicorp/raft" hraft "github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb" raftboltdb "github.com/hashicorp/raft-boltdb"
host "github.com/libp2p/go-libp2p-host" host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
libp2praft "github.com/libp2p/go-libp2p-raft" p2praft "github.com/libp2p/go-libp2p-raft"
) )
// RaftMaxSnapshots indicates how many snapshots to keep in the consensus data // RaftMaxSnapshots indicates how many snapshots to keep in the consensus data
// folder. // folder.
// TODO: Maybe include this in Config. Not sure how useful it is to touch
// this anyways.
var RaftMaxSnapshots = 5 var RaftMaxSnapshots = 5
// is this running 64 bits arch? https://groups.google.com/forum/#!topic/golang-nuts/vAckmhUMAdQ // Are we compiled on a 64-bit architecture?
// https://groups.google.com/forum/#!topic/golang-nuts/vAckmhUMAdQ
// This is used below because raft Observers panic on 32-bit.
const sixtyfour = uint64(^uint(0)) == ^uint64(0) const sixtyfour = uint64(^uint(0)) == ^uint64(0)
// Raft performs all Raft-specific operations which are needed by Cluster but // raftWrapper performs all Raft-specific operations which are needed by
// are not fulfilled by the consensus interface. It should contain most of the // Cluster but are not fulfilled by the consensus interface. It should contain
// Raft-related stuff so it can be easily replaced in the future, if need be. // most of the Raft-related stuff so it can be easily replaced in the future,
type Raft struct { // if need be.
raft *hashiraft.Raft type raftWrapper struct {
transport *libp2praft.Libp2pTransport raft *hraft.Raft
snapshotStore hashiraft.SnapshotStore srvConfig hraft.Configuration
logStore hashiraft.LogStore transport *hraft.NetworkTransport
stableStore hashiraft.StableStore snapshotStore hraft.SnapshotStore
peerstore *libp2praft.Peerstore logStore hraft.LogStore
stableStore hraft.StableStore
boltdb *raftboltdb.BoltStore boltdb *raftboltdb.BoltStore
dataFolder string
} }
// NewRaft launches a go-libp2p-raft consensus peer. // newRaft launches a go-libp2p-raft consensus peer.
func NewRaft(peers []peer.ID, host host.Host, cfg *Config, fsm hashiraft.FSM) (*Raft, error) { func newRaftWrapper(peers []peer.ID, host host.Host, cfg *Config, fsm hraft.FSM) (*raftWrapper, error) {
// Set correct LocalID
cfg.RaftConfig.LocalID = hraft.ServerID(peer.IDB58Encode(host.ID()))
// Prepare data folder
dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder)
if err != nil {
return nil, err
}
srvCfg := makeServerConf(peers)
logger.Debug("creating libp2p Raft transport") logger.Debug("creating libp2p Raft transport")
transport, err := libp2praft.NewLibp2pTransportWithHost(host) transport, err := p2praft.NewLibp2pTransport(host, cfg.NetworkTimeout)
if err != nil { if err != nil {
logger.Error("creating libp2p-raft transport: ", err)
return nil, err return nil, err
} }
pstore := &libp2praft.Peerstore{} logger.Debug("creating raft snapshot store")
peersStr := make([]string, len(peers), len(peers)) snapshots, err := hraft.NewFileSnapshotStoreWithLogger(
for i, p := range peers { dataFolder, RaftMaxSnapshots, raftStdLogger)
peersStr[i] = peer.IDB58Encode(p)
}
pstore.SetPeers(peersStr)
logger.Debug("creating file snapshot store")
dataFolder := cfg.DataFolder
if dataFolder == "" {
dataFolder = filepath.Join(cfg.BaseDir, DefaultDataSubFolder)
}
err = os.MkdirAll(dataFolder, 0700)
if err != nil { if err != nil {
logger.Errorf("creating cosensus data folder (%s): %s",
dataFolder, err)
return nil, err
}
snapshots, err := hashiraft.NewFileSnapshotStoreWithLogger(dataFolder, RaftMaxSnapshots, raftStdLogger)
if err != nil {
logger.Error("creating file snapshot store: ", err)
return nil, err return nil, err
} }
logger.Debug("creating BoltDB log store") logger.Debug("creating BoltDB log store")
logStore, err := raftboltdb.NewBoltStore(filepath.Join(dataFolder, "raft.db")) logStore, err := raftboltdb.NewBoltStore(
filepath.Join(dataFolder, "raft.db"))
if err != nil { if err != nil {
logger.Error("creating bolt store: ", err)
return nil, err return nil, err
} }
logger.Debug("checking for existing raft states")
hasState, err := hraft.HasExistingState(logStore, logStore, snapshots)
if err != nil {
return nil, err
}
if !hasState {
logger.Info("bootstrapping raft cluster")
err := hraft.BootstrapCluster(cfg.RaftConfig,
logStore, logStore, snapshots, transport, srvCfg)
if err != nil {
logger.Error("bootstrapping cluster: ", err)
return nil, err
}
} else {
logger.Info("raft cluster is already bootstrapped")
}
logger.Debug("creating Raft") logger.Debug("creating Raft")
r, err := hashiraft.NewRaft(cfg.HashiraftCfg, fsm, logStore, logStore, snapshots, pstore, transport) r, err := hraft.NewRaft(cfg.RaftConfig,
fsm, logStore, logStore, snapshots, transport)
if err != nil { if err != nil {
logger.Error("initializing raft: ", err) logger.Error("initializing raft: ", err)
return nil, err return nil, err
} }
raft := &Raft{ raftW := &raftWrapper{
raft: r, raft: r,
srvConfig: srvCfg,
transport: transport, transport: transport,
snapshotStore: snapshots, snapshotStore: snapshots,
logStore: logStore, logStore: logStore,
stableStore: logStore, stableStore: logStore,
peerstore: pstore,
boltdb: logStore, boltdb: logStore,
dataFolder: dataFolder,
} }
return raft, nil // Handle existing, different configuration
if hasState {
logger.Error("has state")
cf := r.GetConfiguration()
if err := cf.Error(); err != nil {
return nil, err
}
currentCfg := cf.Configuration()
added, removed := diffConfigurations(srvCfg, currentCfg)
if len(added)+len(removed) > 0 {
raftW.Shutdown()
logger.Error("Raft peers do not match cluster peers")
logger.Error("Aborting. Please clean this peer")
return nil, errors.New("Bad cluster peers")
}
}
return raftW, nil
}
// returns the folder path after creating it.
// if folder is empty, it uses baseDir+Default.
func makeDataFolder(baseDir, folder string) (string, error) {
if folder == "" {
folder = filepath.Join(baseDir, DefaultDataSubFolder)
}
err := os.MkdirAll(folder, 0700)
if err != nil {
return "", err
}
return folder, nil
}
// create Raft servers configuration
func makeServerConf(peers []peer.ID) hraft.Configuration {
sm := make(map[string]struct{})
servers := make([]hraft.Server, 0)
for _, pid := range peers {
p := peer.IDB58Encode(pid)
_, ok := sm[p]
if !ok { // avoid dups
sm[p] = struct{}{}
servers = append(servers, hraft.Server{
Suffrage: hraft.Voter,
ID: hraft.ServerID(p),
Address: hraft.ServerAddress(p),
})
}
}
return hraft.Configuration{
Servers: servers,
}
}
// diffConfigurations returns the serverIDs added and removed from
// c2 in relation to c1.
func diffConfigurations(
c1, c2 hraft.Configuration) (added, removed []hraft.ServerID) {
m1 := make(map[hraft.ServerID]struct{})
m2 := make(map[hraft.ServerID]struct{})
added = make([]hraft.ServerID, 0)
removed = make([]hraft.ServerID, 0)
for _, s := range c1.Servers {
m1[s.ID] = struct{}{}
}
for _, s := range c2.Servers {
m2[s.ID] = struct{}{}
}
for k, _ := range m1 {
_, ok := m2[k]
if !ok {
removed = append(removed, k)
}
}
for k, _ := range m2 {
_, ok := m1[k]
if !ok {
added = append(added, k)
}
}
return
} }
// WaitForLeader holds until Raft says we have a leader. // WaitForLeader holds until Raft says we have a leader.
// Returns an error if we don't. // Returns uf ctx is cancelled.
func (r *Raft) WaitForLeader(ctx context.Context) error { func (rw *raftWrapper) WaitForLeader(ctx context.Context) (string, error) {
// Using Raft observers panics on non-64 architectures. obsCh := make(chan hraft.Observation, 1)
// This is a work around if sixtyfour { // 32-bit systems don't support observers
if sixtyfour { observer := hraft.NewObserver(obsCh, false, nil)
return r.waitForLeader(ctx) rw.raft.RegisterObserver(observer)
defer rw.raft.DeregisterObserver(observer)
} }
return r.waitForLeaderLegacy(ctx) ticker := time.NewTicker(time.Second / 2)
}
func (r *Raft) waitForLeader(ctx context.Context) error {
obsCh := make(chan hashiraft.Observation, 1)
filter := func(o *hashiraft.Observation) bool {
switch o.Data.(type) {
case hashiraft.LeaderObservation:
return true
default:
return false
}
}
observer := hashiraft.NewObserver(obsCh, false, filter)
r.raft.RegisterObserver(observer)
defer r.raft.DeregisterObserver(observer)
ticker := time.NewTicker(time.Second)
for { for {
select { select {
case obs := <-obsCh: case obs := <-obsCh:
switch obs.Data.(type) { _ = obs
case hashiraft.LeaderObservation: // See https://github.com/hashicorp/raft/issues/254
leaderObs := obs.Data.(hashiraft.LeaderObservation) // switch obs.Data.(type) {
logger.Infof("Raft Leader elected: %s", leaderObs.Leader) // case hraft.LeaderObservation:
return nil // lObs := obs.Data.(hraft.LeaderObservation)
} // logger.Infof("Raft Leader elected: %s",
// lObs.Leader)
// return string(lObs.Leader), nil
// }
case <-ticker.C: case <-ticker.C:
if l := r.raft.Leader(); l != "" { //we missed or there was no election if l := rw.raft.Leader(); l != "" {
logger.Debug("waitForleaderTimer") logger.Debug("waitForleaderTimer")
logger.Infof("Raft Leader elected: %s", l) logger.Infof("Raft Leader elected: %s", l)
ticker.Stop() ticker.Stop()
return nil return string(l), nil
} }
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return "", ctx.Err()
}
}
}
// 32-bit systems should use this.
func (r *Raft) waitForLeaderLegacy(ctx context.Context) error {
for {
leader := r.raft.Leader()
if leader != "" {
logger.Infof("Raft Leader elected: %s", leader)
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
default:
time.Sleep(500 * time.Millisecond)
} }
} }
} }
// WaitForUpdates holds until Raft has synced to the last index in the log // WaitForUpdates holds until Raft has synced to the last index in the log
func (r *Raft) WaitForUpdates(ctx context.Context) error { func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error {
logger.Debug("Raft state is catching up") logger.Info("Raft state is catching up")
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
default: default:
lai := r.raft.AppliedIndex() lai := rw.raft.AppliedIndex()
li := r.raft.LastIndex() li := rw.raft.LastIndex()
logger.Debugf("current Raft index: %d/%d", logger.Debugf("current Raft index: %d/%d",
lai, li) lai, li)
if lai == li { if lai == li {
@ -183,25 +247,25 @@ func (r *Raft) WaitForUpdates(ctx context.Context) error {
} }
// Snapshot tells Raft to take a snapshot. // Snapshot tells Raft to take a snapshot.
func (r *Raft) Snapshot() error { func (rw *raftWrapper) Snapshot() error {
future := r.raft.Snapshot() future := rw.raft.Snapshot()
err := future.Error() err := future.Error()
if err != nil && !strings.Contains(err.Error(), "Nothing new to snapshot") { if err != nil && err.Error() != hraft.ErrNothingNewToSnapshot.Error() {
return errors.New("could not take snapshot: " + err.Error()) return err
} }
return nil return nil
} }
// Shutdown shutdown Raft and closes the BoltDB. // Shutdown shutdown Raft and closes the BoltDB.
func (r *Raft) Shutdown() error { func (rw *raftWrapper) Shutdown() error {
future := r.raft.Shutdown() future := rw.raft.Shutdown()
err := future.Error() err := future.Error()
errMsgs := "" errMsgs := ""
if err != nil { if err != nil {
errMsgs += "could not shutdown raft: " + err.Error() + ".\n" errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
} }
err = r.boltdb.Close() // important! err = rw.boltdb.Close() // important!
if err != nil { if err != nil {
errMsgs += "could not close boltdb: " + err.Error() errMsgs += "could not close boltdb: " + err.Error()
} }
@ -210,76 +274,88 @@ func (r *Raft) Shutdown() error {
return errors.New(errMsgs) return errors.New(errMsgs)
} }
// If the shutdown worked correctly
// (including snapshot) we can remove the Raft
// database (which traces peers additions
// and removals). It makes re-start of the peer
// way less confusing for Raft while the state
// can be restored from the snapshot.
//os.Remove(filepath.Join(r.dataFolder, "raft.db"))
return nil return nil
} }
// AddPeer adds a peer to Raft // AddPeer adds a peer to Raft
func (r *Raft) AddPeer(peer string) error { func (rw *raftWrapper) AddPeer(peer string) error {
if r.hasPeer(peer) { // Check that we don't have it to not waste
logger.Debug("skipping raft add as already in peer set") // log entries if so.
peers, err := rw.Peers()
if err != nil {
return err
}
if find(peers, peer) {
logger.Infof("%s is already a raft peer", peer)
return nil return nil
} }
future := r.raft.AddPeer(peer) future := rw.raft.AddVoter(
err := future.Error() hraft.ServerID(peer),
hraft.ServerAddress(peer),
0,
0) // TODO: Extra cfg value?
err = future.Error()
if err != nil { if err != nil {
logger.Error("raft cannot add peer: ", err) logger.Error("raft cannot add peer: ", err)
return err
} }
peers, _ := r.peerstore.Peers()
logger.Debugf("raft peerstore: %s", peers)
return err return err
} }
// RemovePeer removes a peer from Raft // RemovePeer removes a peer from Raft
func (r *Raft) RemovePeer(peer string) error { func (rw *raftWrapper) RemovePeer(peer string) error {
if !r.hasPeer(peer) { // Check that we have it to not waste
// log entries if we don't.
peers, err := rw.Peers()
if err != nil {
return err
}
if !find(peers, peer) {
logger.Infof("%s is not among raft peers", peer)
return nil return nil
} }
future := r.raft.RemovePeer(peer) if len(peers) == 1 && peers[0] == peer {
err := future.Error() return errors.New("cannot remove ourselves from a 1-peer cluster")
}
future := rw.raft.RemoveServer(
hraft.ServerID(peer),
0,
0) // TODO: Extra cfg value?
err = future.Error()
if err != nil { if err != nil {
logger.Error("raft cannot remove peer: ", err) logger.Error("raft cannot remove peer: ", err)
return err
} }
peers, _ := r.peerstore.Peers()
logger.Debugf("raft peerstore: %s", peers)
return err return err
} }
// func (r *Raft) SetPeers(peers []string) error {
// logger.Debugf("SetPeers(): %s", peers)
// future := r.raft.SetPeers(peers)
// err := future.Error()
// if err != nil {
// logger.Error(err)
// }
// return err
// }
// Leader returns Raft's leader. It may be an empty string if // Leader returns Raft's leader. It may be an empty string if
// there is no leader or it is unknown. // there is no leader or it is unknown.
func (r *Raft) Leader() string { func (rw *raftWrapper) Leader() string {
return r.raft.Leader() return string(rw.raft.Leader())
} }
func (r *Raft) hasPeer(peer string) bool { func (rw *raftWrapper) Peers() ([]string, error) {
found := false ids := make([]string, 0)
peers, _ := r.peerstore.Peers()
for _, p := range peers { configFuture := rw.raft.GetConfiguration()
if p == peer { if err := configFuture.Error(); err != nil {
found = true return nil, err
break
}
} }
return found for _, server := range configFuture.Configuration().Servers {
ids = append(ids, string(server.ID))
}
return ids, nil
}
func find(s []string, elem string) bool {
for _, selem := range s {
if selem == elem {
return true
}
}
return false
} }

View File

@ -15,21 +15,21 @@
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmQA5mdxru8Bh6dpC9PJfSkumqnmHgJX7knxSgBo5Lpime", "hash": "QmefgzMbKZYsmHFkLqxgaTBG9ypeEjrdWRD5WXH4j1cWDL",
"name": "go-libp2p", "name": "go-libp2p",
"version": "4.3.12" "version": "5.0.5"
}, },
{ {
"author": "hsanjuan", "author": "hsanjuan",
"hash": "QmSLvGe32QcBEDjqrC93pnZzB6gh4VZTi2wVLt9EyUTUWX", "hash": "QmcjpDnmS6jGrYrTnuT4RDHUHduF97w2V8JuYs6eynbFg2",
"name": "go-libp2p-raft", "name": "go-libp2p-raft",
"version": "1.0.6" "version": "1.1.0"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD", "hash": "QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ",
"name": "go-cid", "name": "go-cid",
"version": "0.7.7" "version": "0.7.18"
}, },
{ {
"author": "urfave", "author": "urfave",
@ -39,9 +39,9 @@
}, },
{ {
"author": "hashicorp", "author": "hashicorp",
"hash": "QmfNTPUT6FKg2wyPcqgn3tKuGJWnvu6QqwHoo56FgcR4c2", "hash": "QmZa48BnsaEMVNf1hT2HYP2ak97fqyTnadXu6xSu2Y8xui",
"name": "raft-boltdb", "name": "raft-boltdb",
"version": "2017.7.27" "version": "2017.10.24"
}, },
{ {
"author": "gorilla", "author": "gorilla",
@ -51,15 +51,15 @@
}, },
{ {
"author": "hsanjuan", "author": "hsanjuan",
"hash": "QmayPizdYNaSKGyFFxcjKf4ZkZ6kriQePqZkFwZQyvteDp", "hash": "QmW2qYs8uJUSAsdGrWZo1LnzAwTQjPBRKk66X9H16EABMg",
"name": "go-libp2p-gorpc", "name": "go-libp2p-gorpc",
"version": "1.0.2" "version": "1.0.4"
}, },
{ {
"author": "libp2p", "author": "libp2p",
"hash": "QmTJoXQ24GqDf9MqAUwf3vW38HG6ahE9S7GzZoRMEeE8Kc", "hash": "QmcWmYQEQCrezztaQ81nXzMx2jaAEow17wdesDAjjR769r",
"name": "go-libp2p-pnet", "name": "go-libp2p-pnet",
"version": "2.2.4" "version": "2.3.1"
} }
], ],
"gxVersion": "0.11.0", "gxVersion": "0.11.0",

View File

@ -135,7 +135,10 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) {
// Now we shutdown one member of the running cluster // Now we shutdown one member of the running cluster
// and try to add someone else. // and try to add someone else.
clusters[1].Shutdown() err = clusters[1].Shutdown()
if err != nil {
t.Error("Shutdown should be clean: ", err)
}
_, err = clusters[0].PeerAdd(clusterAddr(clusters[2])) _, err = clusters[0].PeerAdd(clusterAddr(clusters[2]))
if err == nil { if err == nil {
@ -194,9 +197,23 @@ func TestClusterPeerRemoveSelf(t *testing.T) {
defer shutdownClusters(t, clusters, mocks) defer shutdownClusters(t, clusters, mocks)
for i := 0; i < len(clusters); i++ { for i := 0; i < len(clusters); i++ {
peers := clusters[i].Peers()
t.Logf("Current cluster size: %d", len(peers))
if len(peers) != (len(clusters) - i) {
t.Fatal("Previous peers not removed correctly")
}
err := clusters[i].PeerRemove(clusters[i].ID().ID) err := clusters[i].PeerRemove(clusters[i].ID().ID)
// Last peer member won't be able to remove itself
// In this case, we shut it down.
if err != nil { if err != nil {
t.Error(err) if i != len(clusters)-1 { //not last
t.Error(err)
} else {
err := clusters[i].Shutdown()
if err != nil {
t.Fatal(err)
}
}
} }
time.Sleep(time.Second) time.Sleep(time.Second)
_, more := <-clusters[i].Done() _, more := <-clusters[i].Done()
@ -383,3 +400,71 @@ func TestClustersPeerJoinAllAtOnceWithRandomBootstrap(t *testing.T) {
} }
runF(t, clusters, f2) runF(t, clusters, f2)
} }
// Tests that a peer catches up on the state correctly after rejoining
func TestClustersPeerRejoin(t *testing.T) {
clusters, mocks := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
// pin something in c0
pin1, _ := cid.Decode(test.TestCid1)
err := clusters[0].Pin(api.PinCid(pin1))
if err != nil {
t.Fatal(err)
}
// add all clusters
for i := 1; i < len(clusters); i++ {
addr := clusterAddr(clusters[i])
_, err := clusters[0].PeerAdd(addr)
if err != nil {
t.Fatal(err)
}
}
delay()
// all added peers should have the content
for i := 1; i < len(clusters); i++ {
pinfo := clusters[i].tracker.Status(pin1)
if pinfo.Status != api.TrackerStatusPinned {
t.Error("Added peers should pin the content")
}
}
clusters[0].Shutdown()
mocks[0].Close()
//delay()
// Pin something on the rest
pin2, _ := cid.Decode(test.TestCid2)
err = clusters[1].Pin(api.PinCid(pin2))
if err != nil {
t.Fatal(err)
}
delay()
// Rejoin c0
c0, m0 := createOnePeerCluster(t, 0, testingClusterSecret)
clusters[0] = c0
mocks[0] = m0
addr := clusterAddr(c0)
_, err = clusters[1].PeerAdd(addr)
if err != nil {
t.Fatal(err)
}
delay()
pinfo := clusters[0].tracker.Status(pin2)
if pinfo.Status != api.TrackerStatusPinned {
t.Error("re-joined cluster should have caught up")
}
pinfo = clusters[0].tracker.Status(pin1)
if pinfo.Status != api.TrackerStatusPinned {
t.Error("re-joined cluster should have original pin")
}
}

View File

@ -9,6 +9,7 @@ import (
rpc "github.com/hsanjuan/go-libp2p-gorpc" rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
) )
@ -21,8 +22,14 @@ type mockService struct{}
// NewMockRPCClient creates a mock ipfs-cluster RPC server and returns // NewMockRPCClient creates a mock ipfs-cluster RPC server and returns
// a client to it. // a client to it.
func NewMockRPCClient(t *testing.T) *rpc.Client { func NewMockRPCClient(t *testing.T) *rpc.Client {
s := rpc.NewServer(nil, "mock") return NewMockRPCClientWithHost(t, nil)
c := rpc.NewClientWithServer(nil, "mock", s) }
// NewMockRPCClientWithHost returns a mock ipfs-cluster RPC server
// initialized with a given host.
func NewMockRPCClientWithHost(t *testing.T, h host.Host) *rpc.Client {
s := rpc.NewServer(h, "mock")
c := rpc.NewClientWithServer(h, "mock", s)
err := s.RegisterName("Cluster", &mockService{}) err := s.RegisterName("Cluster", &mockService{})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -274,3 +281,11 @@ func (mock *mockService) IPFSFreeSpace(in struct{}, out *uint64) error {
*out = 98000 *out = 98000
return nil return nil
} }
func (mock *mockService) ConsensusLogAddPeer(in api.MultiaddrSerial, out *struct{}) error {
return errors.New("mock rpc cannot redirect")
}
func (mock *mockService) ConsensusLogRmPeer(in peer.ID, out *struct{}) error {
return errors.New("mock rpc cannot redirect")
}