Raft: add cachestore for the log store

Just like consul does it

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2017-10-30 14:00:02 +01:00
parent 107ad3bdfc
commit 74ed634653
2 changed files with 30 additions and 22 deletions
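For context, a minimal sketch of the pattern this commit adopts from Consul, assuming hashicorp/raft and hashicorp/raft-boltdb (the folder handling, the buildStores helper name and the cache size of 512 are illustrative, not the cluster's own code): NewLogCache wraps the BoltDB-backed store so that reads of recently appended log entries are served from memory, while writes still go through to BoltDB.

package main

import (
    "log"
    "os"
    "path/filepath"

    hraft "github.com/hashicorp/raft"
    raftboltdb "github.com/hashicorp/raft-boltdb"
)

// buildStores is a hypothetical helper returning the cached log store and the
// plain stable store, mirroring what the diff below does inside newRaftWrapper.
func buildStores(dataFolder string) (hraft.LogStore, hraft.StableStore, error) {
    // BoltDB acts as both the durable log store and the stable store.
    store, err := raftboltdb.NewBoltStore(filepath.Join(dataFolder, "raft.db"))
    if err != nil {
        return nil, nil, err
    }
    // Wrap only the LogStore role in an in-memory cache of recent entries;
    // durability is unchanged because appends still reach BoltDB.
    cached, err := hraft.NewLogCache(512, store)
    if err != nil {
        return nil, nil, err
    }
    return cached, store, nil
}

func main() {
    dir, err := os.MkdirTemp("", "raft-example")
    if err != nil {
        log.Fatal(err)
    }
    defer os.RemoveAll(dir)

    logStore, stableStore, err := buildStores(dir)
    if err != nil {
        log.Fatal(err)
    }
    // These would be handed to hraft.NewRaft together with a snapshot store
    // and a transport, as the diff below does.
    _ = logStore
    _ = stableStore
}

Note that only the LogStore role is cached; the StableStore keeps using BoltDB directly, which is why the diff tracks them in separate log and stable variables.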


@@ -160,7 +160,6 @@ func (cc *Consensus) Shutdown() error {
err := cc.raft.Shutdown()
if err != nil {
logger.Error(err)
- return err
}
cc.shutdown = true
cc.cancel()


@@ -20,6 +20,10 @@ import (
// this anyways.
var RaftMaxSnapshots = 5
+ // RaftLogCacheSize is the maximum number of logs to cache in-memory.
+ // This is used to reduce disk I/O for the recently committed entries.
+ var RaftLogCacheSize = 512
// Are we compiled on a 64-bit architecture?
// https://groups.google.com/forum/#!topic/golang-nuts/vAckmhUMAdQ
// This is used below because raft Observers panic on 32-bit.
@@ -57,29 +61,44 @@ func newRaftWrapper(peers []peer.ID, host host.Host, cfg *Config, fsm hraft.FSM)
return nil, err
}
+ var log hraft.LogStore
+ var stable hraft.StableStore
+ var snap hraft.SnapshotStore
logger.Debug("creating raft snapshot store")
- snapshots, err := hraft.NewFileSnapshotStoreWithLogger(
+ snapstore, err := hraft.NewFileSnapshotStoreWithLogger(
dataFolder, RaftMaxSnapshots, raftStdLogger)
if err != nil {
return nil, err
}
logger.Debug("creating BoltDB log store")
logStore, err := raftboltdb.NewBoltStore(
logger.Debug("creating BoltDB store")
store, err := raftboltdb.NewBoltStore(
filepath.Join(dataFolder, "raft.db"))
if err != nil {
return nil, err
}
+ // wraps the store in a LogCache to improve performance.
+ // See consul/agent/consul/server.go
+ cacheStore, err := hraft.NewLogCache(RaftLogCacheSize, store)
+ if err != nil {
+ return nil, err
+ }
+ stable = store
+ log = cacheStore
+ snap = snapstore
logger.Debug("checking for existing raft states")
- hasState, err := hraft.HasExistingState(logStore, logStore, snapshots)
+ hasState, err := hraft.HasExistingState(log, stable, snap)
if err != nil {
return nil, err
}
if !hasState {
logger.Info("bootstrapping raft cluster")
err := hraft.BootstrapCluster(cfg.RaftConfig,
- logStore, logStore, snapshots, transport, srvCfg)
+ log, stable, snap, transport, srvCfg)
if err != nil {
logger.Error("bootstrapping cluster: ", err)
return nil, err
@@ -90,7 +109,7 @@ func newRaftWrapper(peers []peer.ID, host host.Host, cfg *Config, fsm hraft.FSM)
logger.Debug("creating Raft")
r, err := hraft.NewRaft(cfg.RaftConfig,
- fsm, logStore, logStore, snapshots, transport)
+ fsm, log, stable, snap, transport)
if err != nil {
logger.Error("initializing raft: ", err)
return nil, err
@@ -100,10 +119,10 @@ func newRaftWrapper(peers []peer.ID, host host.Host, cfg *Config, fsm hraft.FSM)
raft: r,
srvConfig: srvCfg,
transport: transport,
- snapshotStore: snapshots,
- logStore: logStore,
- stableStore: logStore,
- boltdb: logStore,
+ snapshotStore: snap,
+ logStore: log,
+ stableStore: stable,
+ boltdb: store,
}
// Handle existing, different configuration
@@ -119,7 +138,7 @@ func newRaftWrapper(peers []peer.ID, host host.Host, cfg *Config, fsm hraft.FSM)
logger.Warning("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logger.Warning("Raft peers do not match cluster peers from the configuration.")
logger.Warning("If problems arise, clean this peer and bootstrap it to a working cluster.")
logger.Warning("Raft peers peers:")
logger.Warning("Raft peers:")
for _, s := range currentCfg.Servers {
logger.Warningf(" - %s", s.ID)
}
@@ -338,16 +357,6 @@ func (rw *raftWrapper) RemovePeer(peer string) error {
return err
}
- // make sure change is applied everywhere before continuing
- // this makes sure that a leaving node gets the memo
- // before we shut it down.
- bFuture := rw.raft.Barrier(10 * time.Second)
- err = bFuture.Error()
- if err != nil {
- logger.Error(err)
- return err
- }
return nil
}