Avoid shutting down consensus in the middle of a commit
I think this will prevent some random test failures that happen when we realize that we are no longer in the peerset and trigger a shutdown, but Raft has not fully finished committing the operation, which then triggers an error and a retry. But the contexts are cancelled in the retry, so it won't find a leader and will finally error with that message. License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
parent
0525403f8c
commit
417f30c9ea
|
@ -272,7 +272,9 @@ func (cc *Consensus) commit(op *LogOp, rpcOp string, redirectArg interface{}) er
|
|||
// Being here means we are the LEADER. We can commit.
|
||||
|
||||
// now commit the changes to our state
|
||||
cc.shutdownLock.Lock() // do not shut down while committing
|
||||
_, finalErr = cc.consensus.CommitOp(op)
|
||||
cc.shutdownLock.Unlock()
|
||||
if finalErr != nil {
|
||||
goto RETRY
|
||||
}
|
||||
|
@ -326,7 +328,9 @@ func (cc *Consensus) AddPeer(pid peer.ID) error {
|
|||
return err
|
||||
}
|
||||
// Being here means we are the leader and can commit
|
||||
cc.shutdownLock.Lock() // do not shutdown while committing
|
||||
finalErr = cc.raft.AddPeer(peer.IDB58Encode(pid))
|
||||
cc.shutdownLock.Unlock()
|
||||
if finalErr != nil {
|
||||
time.Sleep(cc.config.CommitRetryDelay)
|
||||
continue
|
||||
|
@ -344,14 +348,16 @@ func (cc *Consensus) RmPeer(pid peer.ID) error {
|
|||
for i := 0; i <= cc.config.CommitRetries; i++ {
|
||||
logger.Debugf("attempt #%d: RmPeer %s", i, pid.Pretty())
|
||||
if finalErr != nil {
|
||||
logger.Errorf("retrying to add peer. Attempt #%d failed: %s", i, finalErr)
|
||||
logger.Errorf("retrying to remove peer. Attempt #%d failed: %s", i, finalErr)
|
||||
}
|
||||
ok, err := cc.redirectToLeader("ConsensusRmPeer", pid)
|
||||
if err != nil || ok {
|
||||
return err
|
||||
}
|
||||
// Being here means we are the leader and can commit
|
||||
cc.shutdownLock.Lock() // do not shutdown while committing
|
||||
finalErr = cc.raft.RemovePeer(peer.IDB58Encode(pid))
|
||||
cc.shutdownLock.Unlock()
|
||||
if finalErr != nil {
|
||||
time.Sleep(cc.config.CommitRetryDelay)
|
||||
continue
|
||||
|
|
2
debug.go
2
debug.go
|
@ -12,7 +12,7 @@ func init() {
|
|||
//SetFacilityLogLevel("consensus", l)
|
||||
//SetFacilityLogLevel("monitor", "INFO")
|
||||
//SetFacilityLogLevel("raft", l)
|
||||
SetFacilityLogLevel("p2p-gorpc", l)
|
||||
//SetFacilityLogLevel("p2p-gorpc", l)
|
||||
//SetFacilityLogLevel("swarm2", l)
|
||||
//SetFacilityLogLevel("libp2p-raft", l)
|
||||
}
|
||||
|
|
|
@ -230,6 +230,57 @@ func TestClustersPeerRemoveSelf(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestClustersPeerRemoveLeader(t *testing.T) {
|
||||
// this test is like the one above, except it always
|
||||
// removes the current leader.
|
||||
// this test hangs sometimes if there are problems
|
||||
clusters, mocks := createClusters(t)
|
||||
defer shutdownClusters(t, clusters, mocks)
|
||||
|
||||
findLeader := func() *Cluster {
|
||||
var l peer.ID
|
||||
for _, c := range clusters {
|
||||
if !c.shutdownB {
|
||||
waitForLeader(t, clusters)
|
||||
l, _ = c.consensus.Leader()
|
||||
}
|
||||
}
|
||||
for _, c := range clusters {
|
||||
if c.id == l {
|
||||
return c
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := 0; i < len(clusters); i++ {
|
||||
leader := findLeader()
|
||||
peers := leader.Peers()
|
||||
t.Logf("Current cluster size: %d", len(peers))
|
||||
if len(peers) != (len(clusters) - i) {
|
||||
t.Fatal("Previous peers not removed correctly")
|
||||
}
|
||||
err := leader.PeerRemove(leader.id)
|
||||
// Last peer member won't be able to remove itself
|
||||
// In this case, we shut it down.
|
||||
if err != nil {
|
||||
if i != len(clusters)-1 { //not last
|
||||
t.Error(err)
|
||||
} else {
|
||||
err := leader.Shutdown()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
_, more := <-leader.Done()
|
||||
if more {
|
||||
t.Error("should be done")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestClustersPeerRemoveReallocsPins(t *testing.T) {
|
||||
clusters, mocks := createClusters(t)
|
||||
defer shutdownClusters(t, clusters, mocks)
|
||||
|
|
Loading…
Reference in New Issue
Block a user