Issue #131: Destroy raft data when the peer has been removed
License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
parent
7a5f8f184b
commit
c912cfd205
12
cluster.go
12
cluster.go
|
@ -51,7 +51,7 @@ type Cluster struct {
|
|||
|
||||
shutdownLock sync.Mutex
|
||||
shutdownB bool
|
||||
shutdownOnce sync.Once
|
||||
removed bool
|
||||
doneCh chan struct{}
|
||||
readyCh chan struct{}
|
||||
readyB bool
|
||||
|
@ -108,6 +108,7 @@ func NewCluster(
|
|||
allocator: allocator,
|
||||
informer: informer,
|
||||
shutdownB: false,
|
||||
removed: false,
|
||||
doneCh: make(chan struct{}),
|
||||
readyCh: make(chan struct{}),
|
||||
readyB: false,
|
||||
|
@ -437,6 +438,7 @@ func (c *Cluster) Shutdown() error {
|
|||
if err != nil {
|
||||
logger.Error("leaving cluster: " + err.Error())
|
||||
}
|
||||
c.removed = true
|
||||
}
|
||||
|
||||
// Cancel contexts
|
||||
|
@ -456,6 +458,14 @@ func (c *Cluster) Shutdown() error {
|
|||
c.backupState()
|
||||
}
|
||||
|
||||
// We left the cluster or were removed. Destroy the Raft state.
|
||||
if c.removed && c.readyB {
|
||||
err := c.consensus.Clean()
|
||||
if err != nil {
|
||||
logger.Error("cleaning consensus: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.monitor.Shutdown(); err != nil {
|
||||
logger.Errorf("error stopping monitor: %s", err)
|
||||
return err
|
||||
|
|
|
@ -375,6 +375,21 @@ func (cc *Consensus) Leader() (peer.ID, error) {
|
|||
return raftactor.Leader()
|
||||
}
|
||||
|
||||
// Clean removes all raft data from disk. Next time
|
||||
// a full new peer will be bootstrapped.
|
||||
func (cc *Consensus) Clean() error {
|
||||
if !cc.shutdown {
|
||||
return errors.New("consensus component is not shutdown")
|
||||
}
|
||||
|
||||
err := cc.raft.Clean()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Info("consensus data cleaned")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Rollback replaces the current agreed-upon
|
||||
// state with the state provided. Only the consensus leader
|
||||
// can perform this operation.
|
||||
|
|
|
@ -43,6 +43,7 @@ const sixtyfour = uint64(^uint(0)) == ^uint64(0)
|
|||
// if need be.
|
||||
type raftWrapper struct {
|
||||
raft *hraft.Raft
|
||||
dataFolder string
|
||||
srvConfig hraft.Configuration
|
||||
transport *hraft.NetworkTransport
|
||||
snapshotStore hraft.SnapshotStore
|
||||
|
@ -125,6 +126,7 @@ func newRaftWrapper(peers []peer.ID, host host.Host, cfg *Config, fsm hraft.FSM)
|
|||
|
||||
raftW := &raftWrapper{
|
||||
raft: r,
|
||||
dataFolder: dataFolder,
|
||||
srvConfig: srvCfg,
|
||||
transport: transport,
|
||||
snapshotStore: snap,
|
||||
|
@ -425,6 +427,11 @@ func (rw *raftWrapper) Peers() ([]string, error) {
|
|||
return ids, nil
|
||||
}
|
||||
|
||||
// only call when Raft is shutdown
|
||||
func (rw *raftWrapper) Clean() error {
|
||||
return os.RemoveAll(rw.dataFolder)
|
||||
}
|
||||
|
||||
func find(s []string, elem string) bool {
|
||||
for _, selem := range s {
|
||||
if selem == elem {
|
||||
|
|
|
@ -54,6 +54,8 @@ type Consensus interface {
|
|||
// Only returns when the consensus state has all log
|
||||
// updates applied to it
|
||||
WaitForSync() error
|
||||
// Clean removes all consensus data
|
||||
Clean() error
|
||||
}
|
||||
|
||||
// API is a component which offers an API for Cluster. This is
|
||||
|
|
|
@ -77,6 +77,7 @@ func (pm *peerManager) rmPeer(pid peer.ID, save bool) error {
|
|||
pm.cluster.config.Bootstrap = pm.peersAddrs()
|
||||
pm.resetPeers()
|
||||
time.Sleep(1 * time.Second)
|
||||
pm.cluster.removed = true
|
||||
// should block and do nothing if already doing it
|
||||
pm.cluster.Shutdown()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user