Encapsulate Raft functions better and simplify the Consensus component

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2017-02-01 18:16:09 +01:00
parent 6c18c02106
commit 89ecc1ce89
9 changed files with 254 additions and 215 deletions
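In short (the two signatures introduced by this diff):

func NewConsensus(clusterPeers []peer.ID, host host.Host, dataFolder string, state State) (*Consensus, error)
func NewRaft(peers []peer.ID, host host.Host, dataFolder string, fsm hashiraft.FSM) (*Raft, error)

Consensus no longer receives the whole Config, and the Config no longer carries a hashicorp raft.Config section: Raft defaults are applied in raft.go and can be tweaked through the package-level DefaultRaftConfig variable (a usage sketch follows the raft.go listing below).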

View File

@ -100,7 +100,7 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P
cluster.rpcClient = rpcClient
// Setup Consensus
consensus, err := NewConsensus(cfg, host, state)
consensus, err := NewConsensus(pm.peers(), host, cfg.ConsensusDataFolder, state)
if err != nil {
logger.Errorf("error creating consensus: %s", err)
cluster.Shutdown()
@ -439,6 +439,7 @@ func (c *Cluster) Recover(h *cid.Cid) (GlobalPinInfo, error) {
func (c *Cluster) Pins() []*cid.Cid {
cState, err := c.consensus.State()
if err != nil {
logger.Error(err)
return []*cid.Cid{}
}
return cState.ListPins()

View File

@ -6,9 +6,7 @@ import (
"fmt"
"io/ioutil"
"sync"
"time"
hashiraft "github.com/hashicorp/raft"
crypto "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
@ -55,9 +53,6 @@ type Config struct {
// the Consensus component.
ConsensusDataFolder string
// Hashicorp's Raft configuration
RaftConfig *hashiraft.Config
// if a config has been loaded from disk, track the path
// so it can be saved to the same place.
path string
@ -95,18 +90,6 @@ type JSONConfig struct {
// Storage folder for snapshots, log store etc. Used by
// the Consensus component.
ConsensusDataFolder string `json:"consensus_data_folder"`
// Raft configuration
RaftConfig *RaftConfig `json:"raft_config"`
}
// RaftConfig is a configuration section which affects the behaviour of
// the Raft component. See https://godoc.org/github.com/hashicorp/raft#Config
// for more information. Only the options below are customizable, the rest will
// take the default values from raft.DefaultConfig().
type RaftConfig struct {
SnapshotIntervalSeconds int `json:"snapshot_interval_seconds"`
EnableSingleNode bool `json:"enable_single_node"`
}
// ToJSONConfig converts a Config object to its JSON representation which
@ -140,10 +123,6 @@ func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) {
IPFSProxyListenMultiaddress: cfg.IPFSProxyAddr.String(),
IPFSNodeMultiaddress: cfg.IPFSNodeAddr.String(),
ConsensusDataFolder: cfg.ConsensusDataFolder,
RaftConfig: &RaftConfig{
SnapshotIntervalSeconds: int(cfg.RaftConfig.SnapshotInterval / time.Second),
EnableSingleNode: cfg.RaftConfig.EnableSingleNode,
},
}
return
}
@ -201,14 +180,6 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) {
return
}
raftCfg := hashiraft.DefaultConfig()
raftCfg.DisableBootstrapAfterElect = false
raftCfg.ShutdownOnRemove = false
if jcfg.RaftConfig != nil {
raftCfg.SnapshotInterval = time.Duration(jcfg.RaftConfig.SnapshotIntervalSeconds) * time.Second
raftCfg.EnableSingleNode = jcfg.RaftConfig.EnableSingleNode
}
c = &Config{
ID: id,
PrivateKey: pKey,
@ -217,7 +188,6 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) {
APIAddr: apiAddr,
IPFSProxyAddr: ipfsProxyAddr,
IPFSNodeAddr: ipfsNodeAddr,
RaftConfig: raftCfg,
ConsensusDataFolder: jcfg.ConsensusDataFolder,
}
return
@ -275,11 +245,6 @@ func NewDefaultConfig() (*Config, error) {
return nil, err
}
raftCfg := hashiraft.DefaultConfig()
raftCfg.DisableBootstrapAfterElect = false
raftCfg.EnableSingleNode = true
raftCfg.ShutdownOnRemove = false
clusterAddr, _ := ma.NewMultiaddr(DefaultClusterAddr)
apiAddr, _ := ma.NewMultiaddr(DefaultAPIAddr)
ipfsProxyAddr, _ := ma.NewMultiaddr(DefaultIPFSProxyAddr)
@ -294,7 +259,6 @@ func NewDefaultConfig() (*Config, error) {
IPFSProxyAddr: ipfsProxyAddr,
IPFSNodeAddr: ipfsNodeAddr,
ConsensusDataFolder: "ipfscluster-data",
RaftConfig: raftCfg,
}, nil
}

View File

@ -11,10 +11,6 @@ func testingConfig() *Config {
APIListenMultiaddress: "/ip4/127.0.0.1/tcp/10002",
IPFSProxyListenMultiaddress: "/ip4/127.0.0.1/tcp/10001",
ConsensusDataFolder: "./raftFolderFromTests",
RaftConfig: &RaftConfig{
EnableSingleNode: true,
SnapshotIntervalSeconds: 120,
},
}
cfg, _ := jcfg.ToConfig()

View File

@ -3,7 +3,6 @@ package ipfscluster
import (
"context"
"errors"
"strings"
"sync"
"time"
@ -15,11 +14,6 @@ import (
libp2praft "github.com/libp2p/go-libp2p-raft"
)
const (
maxSnapshots = 5
raftSingleMode = true
)
// Type of pin operation
const (
LogOpPin = iota + 1
@ -101,13 +95,12 @@ ROLLBACK:
type Consensus struct {
ctx context.Context
cfg *Config
host host.Host
consensus consensus.OpLogConsensus
actor consensus.Actor
baseOp *clusterLogOp
p2pRaft *libp2pRaftWrap
raft *Raft
rpcClient *rpc.Client
rpcReady chan struct{}
@ -122,33 +115,33 @@ type Consensus struct {
// NewConsensus builds a new ClusterConsensus component. The state
// is used to initialize the Consensus system, so any information in it
// is discarded.
func NewConsensus(cfg *Config, host host.Host, state State) (*Consensus, error) {
func NewConsensus(clusterPeers []peer.ID, host host.Host, dataFolder string, state State) (*Consensus, error) {
ctx := context.Background()
op := &clusterLogOp{
ctx: context.Background(),
}
logger.Infof("starting Consensus and waiting for a leader...")
consensus := libp2praft.NewOpLog(state, op)
raft, err := NewRaft(clusterPeers, host, dataFolder, consensus.FSM())
if err != nil {
return nil, err
}
actor := libp2praft.NewActor(raft.raft)
consensus.SetActor(actor)
cc := &Consensus{
ctx: ctx,
cfg: cfg,
host: host,
consensus: consensus,
actor: actor,
baseOp: op,
raft: raft,
shutdownCh: make(chan struct{}, 1),
rpcReady: make(chan struct{}, 1),
readyCh: make(chan struct{}, 1),
}
logger.Infof("starting Consensus and waiting leader...")
con, actor, wrapper, err := makeLibp2pRaft(cc.cfg,
cc.host, state, cc.baseOp)
if err != nil {
return nil, err
}
con.SetActor(actor)
cc.actor = actor
cc.consensus = con
cc.p2pRaft = wrapper
cc.run()
return cc, nil
}
@ -162,108 +155,67 @@ func (cc *Consensus) run() {
cc.ctx = ctx
cc.baseOp.ctx = ctx
leader, err := cc.waitForLeader()
if err != nil {
return
}
logger.Infof("Consensus leader found (%s). Syncing state...", leader.Pretty())
cc.waitForUpdates()
logger.Info("Consensus state is up to date")
// While rpc is not ready we cannot perform a sync
<-cc.rpcReady
var pInfo []PinInfo
_, err = cc.State()
// only check sync if we have a state
// avoid error on new running clusters
if err != nil {
logger.Debug("skipping state sync: ", err)
} else {
cc.rpcClient.Go(
"",
"Cluster",
"StateSync",
struct{}{},
&pInfo,
nil)
}
cc.readyCh <- struct{}{}
logger.Debug("consensus ready")
go func() {
cc.finishBootstrap()
}()
<-cc.shutdownCh
}()
}
// waits until there is a raft leader
func (cc *Consensus) waitForLeader() (peer.ID, error) {
// Wait for a leader
leader := peer.ID("")
var err error
rounds := 0
for {
select {
case <-cc.ctx.Done():
return "", errors.New("shutdown")
default:
if rounds%20 == 0 { //every 10 secs
logger.Info("Consensus is waiting for a leader...")
}
rounds++
time.Sleep(500 * time.Millisecond)
leader, err = cc.Leader()
if err == nil && leader != "" {
return leader, nil
}
}
}
return leader, nil
}
// waits until there is a consensus leader and syncs the state
// to the tracker
func (cc *Consensus) finishBootstrap() {
cc.raft.WaitForLeader(cc.ctx)
cc.raft.WaitForUpdates(cc.ctx)
logger.Info("Consensus state is up to date")
// waits until the appliedIndex is the same as the lastIndex
func (cc *Consensus) waitForUpdates() {
// Wait for state catch up
logger.Debug("consensus state is catching up")
time.Sleep(time.Second)
for {
select {
case <-cc.ctx.Done():
return
default:
lai := cc.p2pRaft.raft.AppliedIndex()
li := cc.p2pRaft.raft.LastIndex()
logger.Debugf("current Raft index: %d/%d",
lai, li)
if lai == li {
return
}
time.Sleep(500 * time.Millisecond)
}
}
}
// raft stores peer add/rm operations. This is how to force a peer set.
func (cc *Consensus) setPeers() {
logger.Debug("forcefully setting Raft peers to known set")
var peersStr []string
var peers []peer.ID
err := cc.rpcClient.Call("",
"Cluster",
"PeerManagerPeers",
struct{}{},
&peers)
if err != nil {
logger.Error(err)
// While rpc is not ready we cannot perform a sync
select {
case <-cc.ctx.Done():
return
case <-cc.rpcReady:
}
for _, p := range peers {
peersStr = append(peersStr, p.Pretty())
var pInfo []PinInfo
_, err := cc.State()
// only check sync if we have a state
// avoid error on new running clusters
if err != nil {
logger.Debug("skipping state sync: ", err)
} else {
cc.rpcClient.Go(
"",
"Cluster",
"StateSync",
struct{}{},
&pInfo,
nil)
}
cc.p2pRaft.raft.SetPeers(peersStr)
cc.readyCh <- struct{}{}
logger.Debug("consensus ready") // not accurate if we are shutting down
}
// // raft stores peer add/rm operations. This is how to force a peer set.
// func (cc *Consensus) setPeers() {
// logger.Debug("forcefully setting Raft peers to known set")
// var peersStr []string
// var peers []peer.ID
// err := cc.rpcClient.Call("",
// "Cluster",
// "PeerManagerPeers",
// struct{}{},
// &peers)
// if err != nil {
// logger.Error(err)
// return
// }
// for _, p := range peers {
// peersStr = append(peersStr, p.Pretty())
// }
// cc.p2pRaft.raft.SetPeers(peersStr)
// }
// Shutdown stops the component so it will not process any
// more updates. The underlying consensus is permanently
// shutdown, along with the libp2p transport.
@ -283,22 +235,13 @@ func (cc *Consensus) Shutdown() error {
// Raft shutdown
errMsgs := ""
f := cc.p2pRaft.raft.Snapshot()
err := f.Error()
if err != nil && !strings.Contains(err.Error(), "Nothing new to snapshot") {
errMsgs += "could not take snapshot: " + err.Error() + ".\n"
}
f = cc.p2pRaft.raft.Shutdown()
err = f.Error()
err := cc.raft.Snapshot()
if err != nil {
errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
errMsgs += err.Error()
}
err = cc.p2pRaft.boltdb.Close() // important!
err = cc.raft.Shutdown()
if err != nil {
errMsgs += "could not close boltdb: " + err.Error() + ".\n"
errMsgs += err.Error()
}
if errMsgs != "" {
@ -396,22 +339,18 @@ func (cc *Consensus) AddPeer(p peer.ID) error {
//if err != nil || redirected {
// return err
// }
// We are the leader
future := cc.p2pRaft.raft.AddPeer(peer.IDB58Encode(p))
err := future.Error()
return err
return cc.raft.AddPeer(peer.IDB58Encode(p))
}
// RemovePeer attempts to remove a peer from the consensus.
func (cc *Consensus) RemovePeer(p peer.ID) error {
//redirected, err := cc.redirectToLeader("ConsensusRmPeer", p)
//redirected, err := cc.redirectToLeader("ConsensusRemovePeer", p)
//if err != nil || redirected {
// return err
//}
future := cc.p2pRaft.raft.RemovePeer(peer.IDB58Encode(p))
err := future.Error()
return err
return cc.raft.RemovePeer(peer.IDB58Encode(p))
}
// State retrieves the current consensus State. It may error
@ -433,7 +372,6 @@ func (cc *Consensus) State() (State, error) {
// Leader returns the peerID of the Leader of the
// cluster. It returns an error when there is no leader.
func (cc *Consensus) Leader() (peer.ID, error) {
// FIXME: Hashicorp Raft specific
raftactor := cc.actor.(*libp2praft.Actor)
return raftactor.Leader()
}

View File

@ -7,6 +7,7 @@ import (
"time"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
)
func TestApplyToPin(t *testing.T) {
@ -92,7 +93,7 @@ func testingConsensus(t *testing.T) *Consensus {
t.Fatal("cannot create host:", err)
}
st := NewMapState()
cc, err := NewConsensus(cfg, h, st)
cc, err := NewConsensus([]peer.ID{cfg.ID}, h, cfg.ConsensusDataFolder, st)
if err != nil {
t.Fatal("cannot create Consensus:", err)
}

View File

@ -25,6 +25,7 @@ func SetLogLevel(l string) {
DEBUG
*/
logging.SetLogLevel("cluster", l)
//logging.SetLogLevel("raft", l)
//logging.SetLogLevel("p2p-gorpc", l)
//logging.SetLogLevel("swarm2", l)
//logging.SetLogLevel("libp2p-raft", l)

View File

@ -53,7 +53,10 @@ func TestClustersPeerAdd(t *testing.T) {
}
h, _ := cid.Decode(testCid)
clusters[1].Pin(h)
err := clusters[1].Pin(h)
if err != nil {
t.Fatal(err)
}
delay()
f := func(t *testing.T, c *Cluster) {
@ -67,6 +70,7 @@ func TestClustersPeerAdd(t *testing.T) {
// Check that they are part of the consensus
pins := c.Pins()
if len(pins) != 1 {
t.Log(pins)
t.Error("expected 1 pin everywhere")
}

raft.go (207 changes)
View File

@ -1,8 +1,12 @@
package ipfscluster
import (
"context"
"errors"
"io/ioutil"
"path/filepath"
"strings"
"time"
hashiraft "github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
@ -11,9 +15,21 @@ import (
libp2praft "github.com/libp2p/go-libp2p-raft"
)
// libp2pRaftWrap wraps the stuff that we need to run
// hashicorp raft. We carry it around for convenience
type libp2pRaftWrap struct {
// DefaultRaftConfig allows tweaking the Raft configuration used by Cluster
// from the outside.
var DefaultRaftConfig = hashiraft.DefaultConfig()
// RaftMaxSnapshots indicates how many snapshots to keep in the consensus data
// folder.
var RaftMaxSnapshots = 5
// sixtyfour reports whether we are running on a 64-bit architecture
// (on 32-bit platforms ^uint(0) converts to 2^32-1, which differs from ^uint64(0)).
// See https://groups.google.com/forum/#!topic/golang-nuts/vAckmhUMAdQ
const sixtyfour = uint64(^uint(0)) == ^uint64(0)
// Raft performs all Raft-specific operations which are needed by Cluster but
// are not fulfilled by the consensus interface. It should contain most of the
// Raft-related stuff so it can be easily replaced in the future, if need be.
type Raft struct {
raft *hashiraft.Raft
transport *libp2praft.Libp2pTransport
snapshotStore hashiraft.SnapshotStore
@ -23,63 +39,62 @@ type libp2pRaftWrap struct {
boltdb *raftboltdb.BoltStore
}
// This function does all heavy the work which is specifically related to
// hashicorp's Raft. Other places should just rely on the Consensus interface.
func makeLibp2pRaft(cfg *Config, host host.Host, state State, op *clusterLogOp) (*libp2praft.Consensus, *libp2praft.Actor, *libp2pRaftWrap, error) {
func defaultRaftConfig() *hashiraft.Config {
// These options are imposed over any DefaultRaftConfig.
// Changing them causes difficult-to-understand behaviours in
// cluster peers, usually around adding and removing peers, and
// will leave users wondering why something does not work as expected.
// For example, ShutdownOnRemove would prevent a snapshot from being
// taken when shutting down a peer after removing it from the cluster.
DefaultRaftConfig.DisableBootstrapAfterElect = false
DefaultRaftConfig.EnableSingleNode = true
DefaultRaftConfig.ShutdownOnRemove = false
// Set up logging
DefaultRaftConfig.LogOutput = ioutil.Discard
DefaultRaftConfig.Logger = raftStdLogger // see logging.go
return DefaultRaftConfig
}
// NewRaft launches a go-libp2p-raft consensus peer.
func NewRaft(peers []peer.ID, host host.Host, dataFolder string, fsm hashiraft.FSM) (*Raft, error) {
logger.Debug("creating libp2p Raft transport")
transport, err := libp2praft.NewLibp2pTransportWithHost(host)
if err != nil {
logger.Error("creating libp2p-raft transport: ", err)
return nil, nil, nil, err
return nil, err
}
//logger.Debug("opening connections")
//transport.OpenConns()
pstore := &libp2praft.Peerstore{}
strPeers := []string{peer.IDB58Encode(host.ID())}
for _, addr := range cfg.ClusterPeers {
p, _, err := multiaddrSplit(addr)
if err != nil {
return nil, nil, nil, err
}
strPeers = append(strPeers, p.Pretty())
peersStr := make([]string, len(peers))
for i, p := range peers {
peersStr[i] = peer.IDB58Encode(p)
}
pstore.SetPeers(strPeers)
logger.Debug("creating OpLog")
cons := libp2praft.NewOpLog(state, op)
raftCfg := cfg.RaftConfig
if raftCfg == nil {
raftCfg = hashiraft.DefaultConfig()
raftCfg.EnableSingleNode = raftSingleMode
}
raftCfg.LogOutput = ioutil.Discard
raftCfg.Logger = raftStdLogger
pstore.SetPeers(peersStr)
logger.Debug("creating file snapshot store")
snapshots, err := hashiraft.NewFileSnapshotStoreWithLogger(cfg.ConsensusDataFolder, maxSnapshots, raftStdLogger)
snapshots, err := hashiraft.NewFileSnapshotStoreWithLogger(dataFolder, RaftMaxSnapshots, raftStdLogger)
if err != nil {
logger.Error("creating file snapshot store: ", err)
return nil, nil, nil, err
return nil, err
}
logger.Debug("creating BoltDB log store")
logStore, err := raftboltdb.NewBoltStore(filepath.Join(cfg.ConsensusDataFolder, "raft.db"))
logStore, err := raftboltdb.NewBoltStore(filepath.Join(dataFolder, "raft.db"))
if err != nil {
logger.Error("creating bolt store: ", err)
return nil, nil, nil, err
return nil, err
}
logger.Debug("creating Raft")
r, err := hashiraft.NewRaft(raftCfg, cons.FSM(), logStore, logStore, snapshots, pstore, transport)
r, err := hashiraft.NewRaft(defaultRaftConfig(), fsm, logStore, logStore, snapshots, pstore, transport)
if err != nil {
logger.Error("initializing raft: ", err)
return nil, nil, nil, err
return nil, err
}
return cons, libp2praft.NewActor(r), &libp2pRaftWrap{
return &Raft{
raft: r,
transport: transport,
snapshotStore: snapshots,
@ -89,3 +104,123 @@ func makeLibp2pRaft(cfg *Config, host host.Host, state State, op *clusterLogOp)
boltdb: logStore,
}, nil
}
// WaitForLeader holds until Raft says we have a leader
func (r *Raft) WaitForLeader(ctx context.Context) {
// Using Raft observers panics on non-64-bit architectures.
// This is a workaround.
if sixtyfour {
r.waitForLeader(ctx)
} else {
r.waitForLeaderLegacy(ctx)
}
}
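// waitForLeader blocks until Raft reports a leader through a
// LeaderObservation, or until the given context is cancelled.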
func (r *Raft) waitForLeader(ctx context.Context) {
obsCh := make(chan hashiraft.Observation)
filter := func(o *hashiraft.Observation) bool {
switch o.Data.(type) {
case hashiraft.LeaderObservation:
return true
default:
return false
}
}
observer := hashiraft.NewObserver(obsCh, true, filter)
r.raft.RegisterObserver(observer)
defer r.raft.DeregisterObserver(observer)
select {
case obs := <-obsCh:
leaderObs := obs.Data.(hashiraft.LeaderObservation)
logger.Infof("Raft Leader elected: %s", leaderObs.Leader)
case <-ctx.Done():
return
}
}
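// waitForLeaderLegacy polls raft.Leader() every 500ms. It is used on
// non-64-bit architectures, where registering Raft observers panics.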
func (r *Raft) waitForLeaderLegacy(ctx context.Context) {
for {
leader := r.raft.Leader()
if leader != "" {
logger.Infof("Raft Leader elected: %s", leader)
return
}
select {
case <-ctx.Done():
return
default:
time.Sleep(500 * time.Millisecond)
}
}
}
// WaitForUpdates holds until Raft has synced to the last index in the log
func (r *Raft) WaitForUpdates(ctx context.Context) {
logger.Debug("Raft state is catching up")
for {
select {
case <-ctx.Done():
return
default:
lai := r.raft.AppliedIndex()
li := r.raft.LastIndex()
logger.Debugf("current Raft index: %d/%d",
lai, li)
if lai == li {
return
}
time.Sleep(500 * time.Millisecond)
}
}
}
// Snapshot tells Raft to take a snapshot.
func (r *Raft) Snapshot() error {
future := r.raft.Snapshot()
err := future.Error()
if err != nil && !strings.Contains(err.Error(), "Nothing new to snapshot") {
return errors.New("could not take snapshot: " + err.Error())
}
return nil
}
// Shutdown shuts down Raft and closes the BoltDB store.
func (r *Raft) Shutdown() error {
future := r.raft.Shutdown()
err := future.Error()
errMsgs := ""
if err != nil {
errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
}
err = r.boltdb.Close() // important!
if err != nil {
errMsgs += "could not close boltdb: " + err.Error()
}
if errMsgs != "" {
return errors.New(errMsgs)
}
return nil
}
// AddPeer adds a peer to Raft
func (r *Raft) AddPeer(peer string) error {
future := r.raft.AddPeer(peer)
err := future.Error()
return err
}
// RemovePeer removes a peer from Raft
func (r *Raft) RemovePeer(peer string) error {
future := r.raft.RemovePeer(peer)
err := future.Error()
return err
}
// Leader returns Raft's leader. It may be an empty string if
// there is no leader or it is unknown.
func (r *Raft) Leader() string {
return r.raft.Leader()
}
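For reference, a minimal lifecycle sketch of the wrapper above. The exampleRaftLifecycle helper, the data folder name, the SnapshotInterval tweak and the error handling are illustrative and not part of this commit; it assumes the imports already used by this package plus "time" and "context".

func exampleRaftLifecycle(ctx context.Context, h host.Host, state State) error {
	// Build the op-log exactly as NewConsensus does above.
	op := &clusterLogOp{ctx: ctx}
	oplog := libp2praft.NewOpLog(state, op)

	// Raft behaviour can be tweaked from the outside before starting.
	DefaultRaftConfig.SnapshotInterval = 120 * time.Second

	r, err := NewRaft([]peer.ID{h.ID()}, h, "ipfscluster-data", oplog.FSM())
	if err != nil {
		return err
	}
	r.WaitForLeader(ctx)  // returns once a leader is known (or ctx is done)
	r.WaitForUpdates(ctx) // returns once the applied index catches up with the log

	if err := r.Snapshot(); err != nil {
		logger.Error(err)
	}
	return r.Shutdown() // also closes the BoltDB store
}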

View File

@ -297,7 +297,6 @@ func (api *RPCAPI) ConsensusLogUnpin(in *CidArg, out *struct{}) error {
/*
Peer Manager methods
*/
// PeerManagerAddPeer runs peerManager.addPeer().