package raft

import (
	"context"
	"errors"
	"io"
	"os"
	"path/filepath"
	"time"

	hraft "github.com/hashicorp/raft"
	raftboltdb "github.com/hashicorp/raft-boltdb"
	host "github.com/libp2p/go-libp2p-host"
	peer "github.com/libp2p/go-libp2p-peer"
	p2praft "github.com/libp2p/go-libp2p-raft"

	"github.com/ipfs/ipfs-cluster/state"
)

// errBadRaftState is returned when the consensus component cannot start
// because the cluster peers do not match the raft peers.
var errBadRaftState = errors.New("cluster peers do not match raft peers")

// errWaitingForSelf is returned when we are waiting for ourselves to depart
// the peer set, which won't happen.
var errWaitingForSelf = errors.New("waiting for ourselves to depart")

// RaftMaxSnapshots indicates how many snapshots to keep in the consensus data
// folder.
// TODO: Maybe include this in Config. Not sure how useful it is to touch
// this anyways.
var RaftMaxSnapshots = 5

// RaftLogCacheSize is the maximum number of logs to cache in-memory.
// This is used to reduce disk I/O for the recently committed entries.
var RaftLogCacheSize = 512

// Are we compiled on a 64-bit architecture?
// https://groups.google.com/forum/#!topic/golang-nuts/vAckmhUMAdQ
// This is used below because raft Observers panic on 32-bit.
const sixtyfour = uint64(^uint(0)) == ^uint64(0)

// How long we wait for updates during shutdown before snapshotting
var waitForUpdatesShutdownTimeout = 5 * time.Second
var waitForUpdatesInterval = 100 * time.Millisecond

// How many times to retry snapshotting when shutting down
var maxShutdownSnapshotRetries = 5

// raftWrapper performs all Raft-specific operations which are needed by
// Cluster but are not fulfilled by the consensus interface. It should contain
// most of the Raft-related stuff so it can be easily replaced in the future,
// if need be.
type raftWrapper struct {
	raft          *hraft.Raft
	dataFolder    string
	srvConfig     hraft.Configuration
	transport     *hraft.NetworkTransport
	snapshotStore hraft.SnapshotStore
	logStore      hraft.LogStore
	stableStore   hraft.StableStore
	boltdb        *raftboltdb.BoltStore
}

// newRaftWrapper launches a go-libp2p-raft consensus peer.
func newRaftWrapper(peers []peer.ID, host host.Host, cfg *Config, fsm hraft.FSM) (*raftWrapper, error) {
	// Set correct LocalID
	cfg.RaftConfig.LocalID = hraft.ServerID(peer.IDB58Encode(host.ID()))

	// Prepare data folder
	dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder)
	if err != nil {
		return nil, err
	}
	srvCfg := makeServerConf(peers)

	logger.Debug("creating libp2p Raft transport")
	transport, err := p2praft.NewLibp2pTransport(host, cfg.NetworkTimeout)
	if err != nil {
		return nil, err
	}

	var log hraft.LogStore
	var stable hraft.StableStore
	var snap hraft.SnapshotStore

	logger.Debug("creating raft snapshot store")
	snapstore, err := hraft.NewFileSnapshotStoreWithLogger(
		dataFolder, RaftMaxSnapshots, raftStdLogger)
	if err != nil {
		return nil, err
	}

	logger.Debug("creating BoltDB store")
	store, err := raftboltdb.NewBoltStore(
		filepath.Join(dataFolder, "raft.db"))
	if err != nil {
		return nil, err
	}

	// wraps the store in a LogCache to improve performance.
	// See consul/agent/consul/server.go
	cacheStore, err := hraft.NewLogCache(RaftLogCacheSize, store)
	if err != nil {
		return nil, err
	}

	stable = store
	log = cacheStore
	snap = snapstore

	logger.Debug("checking for existing raft states")
	hasState, err := hraft.HasExistingState(log, stable, snap)
	if err != nil {
		return nil, err
	}
	if !hasState {
		logger.Info("initializing raft cluster")
		err := hraft.BootstrapCluster(cfg.RaftConfig,
			log, stable, snap, transport, srvCfg)
		if err != nil {
			logger.Error("bootstrapping cluster: ", err)
			return nil, err
		}
	} else {
		logger.Debug("raft cluster is already initialized")
	}

	logger.Debug("creating Raft")
	r, err := hraft.NewRaft(cfg.RaftConfig,
		fsm, log, stable, snap, transport)
	if err != nil {
		logger.Error("initializing raft: ", err)
		return nil, err
	}

	raftW := &raftWrapper{
		raft:          r,
		dataFolder:    dataFolder,
		srvConfig:     srvCfg,
		transport:     transport,
		snapshotStore: snap,
		logStore:      log,
		stableStore:   stable,
		boltdb:        store,
	}

	// Handle existing, different configuration
	if hasState {
		cf := r.GetConfiguration()
		if err := cf.Error(); err != nil {
			return nil, err
		}
		currentCfg := cf.Configuration()
		added, removed := diffConfigurations(srvCfg, currentCfg)
		if len(added)+len(removed) > 0 {
			raftW.Shutdown()
			logger.Error("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
			logger.Error("Raft peers do not match cluster peers from the configuration.")
			logger.Error("This likely indicates that this peer has left the cluster and/or")
			logger.Error("has a dirty state. Clean the raft state for this peer")
			logger.Errorf("(%s)", dataFolder)
			logger.Error("and bootstrap it to a working cluster.")
			logger.Error("Raft peers:")
			for _, s := range currentCfg.Servers {
				logger.Errorf("  - %s", s.ID)
			}
			logger.Error("Cluster configuration peers:")
			for _, s := range srvCfg.Servers {
				logger.Errorf("  - %s", s.ID)
			}
			logger.Errorf("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
			return nil, errBadRaftState
			//return nil, errors.New("Bad cluster peers")
		}
	}

	return raftW, nil
}

// makeDataFolder returns the folder path after creating it.
// If folder is empty, it uses baseDir+Default.
func makeDataFolder(baseDir, folder string) (string, error) {
	if folder == "" {
		folder = filepath.Join(baseDir, DefaultDataSubFolder)
	}

	err := os.MkdirAll(folder, 0700)
	if err != nil {
		return "", err
	}
	return folder, nil
}

// makeServerConf creates a Raft server configuration from a list of peer
// IDs, skipping duplicate entries.
func makeServerConf(peers []peer.ID) hraft.Configuration {
	sm := make(map[string]struct{})

	servers := make([]hraft.Server, 0)
	for _, pid := range peers {
		p := peer.IDB58Encode(pid)
		_, ok := sm[p]
		if !ok { // avoid dups
			sm[p] = struct{}{}
			servers = append(servers, hraft.Server{
				Suffrage: hraft.Voter,
				ID:       hraft.ServerID(p),
				Address:  hraft.ServerAddress(p),
			})
		}
	}
	return hraft.Configuration{
		Servers: servers,
	}
}

// diffConfigurations returns the serverIDs added and removed from
// c2 in relation to c1.
func diffConfigurations(
	c1, c2 hraft.Configuration) (added, removed []hraft.ServerID) {
	m1 := make(map[hraft.ServerID]struct{})
	m2 := make(map[hraft.ServerID]struct{})
	added = make([]hraft.ServerID, 0)
	removed = make([]hraft.ServerID, 0)
	for _, s := range c1.Servers {
		m1[s.ID] = struct{}{}
	}
	for _, s := range c2.Servers {
		m2[s.ID] = struct{}{}
	}
	for k := range m1 {
		_, ok := m2[k]
		if !ok {
			removed = append(removed, k)
		}
	}
	for k := range m2 {
		_, ok := m1[k]
		if !ok {
			added = append(added, k)
		}
	}
	return
}

// WaitForLeader holds until Raft says we have a leader.
// It returns an error if ctx is cancelled.
func (rw *raftWrapper) WaitForLeader(ctx context.Context) (string, error) {
	obsCh := make(chan hraft.Observation, 1)
	if sixtyfour { // 32-bit systems don't support observers
		observer := hraft.NewObserver(obsCh, false, nil)
		rw.raft.RegisterObserver(observer)
		defer rw.raft.DeregisterObserver(observer)
	}
	ticker := time.NewTicker(time.Second / 2)
	for {
		select {
		case obs := <-obsCh:
			_ = obs
			// See https://github.com/hashicorp/raft/issues/254
			// switch obs.Data.(type) {
			// case hraft.LeaderObservation:
			// 	lObs := obs.Data.(hraft.LeaderObservation)
			// 	logger.Infof("Raft Leader elected: %s",
			// 		lObs.Leader)
			// 	return string(lObs.Leader), nil
			// }
		case <-ticker.C:
			if l := rw.raft.Leader(); l != "" {
				logger.Debug("waitForleaderTimer")
				logger.Infof("Current Raft Leader: %s", l)
				ticker.Stop()
				return string(l), nil
			}
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

// WaitForUpdates holds until Raft has synced to the last index in the log.
func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error {
	logger.Debug("Raft state is catching up to the latest known version. Please wait...")
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			lai := rw.raft.AppliedIndex()
			li := rw.raft.LastIndex()
			logger.Debugf("current Raft index: %d/%d",
				lai, li)
			if lai == li {
				return nil
			}
			time.Sleep(waitForUpdatesInterval)
		}
	}
}

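// WaitForPeer holds until the given peer ID has joined (or, when depart is
// true, has left) the Raft peer set, or until ctx is cancelled.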
func (rw *raftWrapper) WaitForPeer(ctx context.Context, pid string, depart bool) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			peers, err := rw.Peers()
			if err != nil {
				return err
			}

			if len(peers) == 1 && pid == peers[0] && depart {
				return errWaitingForSelf
			}

			found := find(peers, pid)

			// departing
			if depart && !found {
				return nil
			}

			// joining
			if !depart && found {
				return nil
			}

			time.Sleep(50 * time.Millisecond)
		}
	}
}

// Snapshot tells Raft to take a snapshot.
func (rw *raftWrapper) Snapshot() error {
	future := rw.raft.Snapshot()
	err := future.Error()
	if err != nil && err.Error() != hraft.ErrNothingNewToSnapshot.Error() {
		return err
	}
	return nil
}

// snapshotOnShutdown attempts to take a snapshot before a shutdown.
// Snapshotting might fail if the raft applied index is not the last index.
// This waits for the updates and tries to take a snapshot when the
// applied index is up to date.
// It will retry if the snapshot still fails, in case more updates have arrived.
// If waiting for updates times out, it will not try anymore, since something
// is wrong. This is a best-effort solution as there is no way to tell Raft
// to stop processing entries because we want to take a snapshot before
// shutting down.
func (rw *raftWrapper) snapshotOnShutdown() error {
	var err error
	for i := 0; i < maxShutdownSnapshotRetries; i++ {
		done := false
		ctx, cancel := context.WithTimeout(context.Background(), waitForUpdatesShutdownTimeout)
		err = rw.WaitForUpdates(ctx)
		cancel()
		if err != nil {
			logger.Warning("timed out waiting for state updates before shutdown. Snapshotting may fail")
			done = true // let's not wait for updates again
		}

		err = rw.Snapshot()
		if err != nil {
			err = errors.New("could not snapshot raft: " + err.Error())
		} else {
			err = nil
			done = true
		}

		if done {
			break
		}
		logger.Warningf("retrying to snapshot (%d/%d)...", i+1, maxShutdownSnapshotRetries)
	}
	return err
}

// Shutdown shuts down Raft and closes the BoltDB.
func (rw *raftWrapper) Shutdown() error {
	errMsgs := ""

	err := rw.snapshotOnShutdown()
	if err != nil {
		errMsgs += err.Error() + ".\n"
	}

	future := rw.raft.Shutdown()
	err = future.Error()
	if err != nil {
		errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
	}

	err = rw.boltdb.Close() // important!
	if err != nil {
		errMsgs += "could not close boltdb: " + err.Error()
	}

	if errMsgs != "" {
		return errors.New(errMsgs)
	}

	return nil
}

// AddPeer adds a peer to Raft
func (rw *raftWrapper) AddPeer(peer string) error {
	// Check that we don't have it already, to avoid
	// wasting log entries if we do.
	peers, err := rw.Peers()
	if err != nil {
		return err
	}
	if find(peers, peer) {
		logger.Infof("%s is already a raft peer", peer)
		return nil
	}

	future := rw.raft.AddVoter(
		hraft.ServerID(peer),
		hraft.ServerAddress(peer),
		0,
		0) // TODO: Extra cfg value?
	err = future.Error()
	if err != nil {
		logger.Error("raft cannot add peer: ", err)
	}
	return err
}

// RemovePeer removes a peer from Raft
func (rw *raftWrapper) RemovePeer(peer string) error {
	// Check that we have it, to avoid wasting
	// log entries if we don't.
	peers, err := rw.Peers()
	if err != nil {
		return err
	}
	if !find(peers, peer) {
		logger.Infof("%s is not among raft peers", peer)
		return nil
	}

	if len(peers) == 1 && peers[0] == peer {
		return errors.New("cannot remove ourselves from a 1-peer cluster")
	}

	rmFuture := rw.raft.RemoveServer(
		hraft.ServerID(peer),
		0,
		0) // TODO: Extra cfg value?
	err = rmFuture.Error()
	if err != nil {
		logger.Error("raft cannot remove peer: ", err)
		return err
	}

	return nil
}

// Leader returns Raft's leader. It may be an empty string if
// there is no leader or it is unknown.
func (rw *raftWrapper) Leader() string {
	return string(rw.raft.Leader())
}

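// Peers returns the IDs of the servers in the current Raft configuration.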
func (rw *raftWrapper) Peers() ([]string, error) {
	ids := make([]string, 0)

	configFuture := rw.raft.GetConfiguration()
	if err := configFuture.Error(); err != nil {
		return nil, err
	}

	for _, server := range configFuture.Configuration().Servers {
		ids = append(ids, string(server.ID))
	}

	return ids, nil
}

// latestSnapshot looks for the most recent raft snapshot stored at the
// provided basedir. It returns the snapshot's metadata and a reader to
// the snapshot's bytes. If no snapshot is readable, both are nil and no
// error is returned.
func latestSnapshot(raftDataFolder string) (*hraft.SnapshotMeta, io.ReadCloser, error) {
	store, err := hraft.NewFileSnapshotStore(raftDataFolder, RaftMaxSnapshots, nil)
	if err != nil {
		return nil, nil, err
	}
	snapMetas, err := store.List()
	if err != nil {
		return nil, nil, err
	}
	if len(snapMetas) == 0 { // no error if snapshot isn't found
		return nil, nil, nil
	}
	meta, r, err := store.Open(snapMetas[0].ID)
	if err != nil {
		return nil, nil, err
	}
	return meta, r, nil
}

// LastStateRaw returns a reader to the bytes of the last snapshot stored
// and a flag indicating whether any snapshot was found.
func LastStateRaw(cfg *Config) (io.Reader, bool, error) {
	// Read most recent snapshot
	dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder)
	if err != nil {
		return nil, false, err
	}
	meta, r, err := latestSnapshot(dataFolder)
	if err != nil {
		return nil, false, err
	}
	if meta == nil { // no snapshots could be read
		return nil, false, nil
	}
	return r, true, nil
}

// SnapshotSave saves the provided state to a snapshot in the
// raft data path. Old raft data is backed up and replaced
// by the new snapshot. pids contains the config-specified
// peer IDs to include in the snapshot metadata if no snapshot exists
// from which to copy the raft metadata.
func SnapshotSave(cfg *Config, newState state.State, pids []peer.ID) error {
	newStateBytes, err := p2praft.EncodeSnapshot(newState)
	if err != nil {
		return err
	}
	dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder)
	if err != nil {
		return err
	}
	meta, _, err := latestSnapshot(dataFolder)
	if err != nil {
		return err
	}

	// make a new raft snapshot
	var raftSnapVersion hraft.SnapshotVersion = 1 // As of hraft v1.0.0 this is always 1
	configIndex := uint64(1)
	var raftIndex uint64
	var raftTerm uint64
	var srvCfg hraft.Configuration
	if meta != nil {
		raftIndex = meta.Index
		raftTerm = meta.Term
		srvCfg = meta.Configuration
		CleanupRaft(dataFolder)
	} else {
		// Begin the log after the index of a fresh start so that
		// the snapshot's state propagates during bootstrap
		raftIndex = uint64(2)
		raftTerm = uint64(1)
		srvCfg = makeServerConf(pids)
	}

	snapshotStore, err := hraft.NewFileSnapshotStoreWithLogger(dataFolder, RaftMaxSnapshots, nil)
	if err != nil {
		return err
	}
	_, dummyTransport := hraft.NewInmemTransport("")

	sink, err := snapshotStore.Create(raftSnapVersion, raftIndex, raftTerm, srvCfg, configIndex, dummyTransport)
	if err != nil {
		return err
	}

	_, err = sink.Write(newStateBytes)
	if err != nil {
		sink.Cancel()
		return err
	}
	err = sink.Close()
	if err != nil {
		return err
	}
	return nil
}

// CleanupRaft moves the current data folder to a backup location
func CleanupRaft(dataFolder string) error {
	dbh := newDataBackupHelper(dataFolder)
	err := dbh.makeBackup()
	if err != nil {
		logger.Warning(err)
		logger.Warning("the state could not be cleaned properly")
		logger.Warning("manual intervention may be needed before starting cluster again")
	}
	return nil
}

// Clean moves the Raft data folder to a backup location.
// Only call this when Raft is shutdown.
func (rw *raftWrapper) Clean() error {
	return CleanupRaft(rw.dataFolder)
}

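// find returns true when elem is present in the string slice s.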
func find(s []string, elem string) bool {
	for _, selem := range s {
		if selem == elem {
			return true
		}
	}
	return false
}