Merge pull request #458 from ipfs/fix/340-races
Issue #340: fix some data races
This commit is contained in:
commit
0f2a978bcd
|
@ -406,7 +406,9 @@ This might be due to one or several causes:
|
||||||
}
|
}
|
||||||
|
|
||||||
close(c.readyCh)
|
close(c.readyCh)
|
||||||
|
c.shutdownLock.Lock()
|
||||||
c.readyB = true
|
c.readyB = true
|
||||||
|
c.shutdownLock.Unlock()
|
||||||
logger.Info("** IPFS Cluster is READY **")
|
logger.Info("** IPFS Cluster is READY **")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,7 @@ type Consensus struct {
|
||||||
rpcReady chan struct{}
|
rpcReady chan struct{}
|
||||||
readyCh chan struct{}
|
readyCh chan struct{}
|
||||||
|
|
||||||
shutdownLock sync.Mutex
|
shutdownLock sync.RWMutex
|
||||||
shutdown bool
|
shutdown bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,13 +139,11 @@ func (cc *Consensus) WaitForSync() error {
|
||||||
// signal the component as Ready.
|
// signal the component as Ready.
|
||||||
func (cc *Consensus) finishBootstrap() {
|
func (cc *Consensus) finishBootstrap() {
|
||||||
// wait until we have RPC to perform any actions.
|
// wait until we have RPC to perform any actions.
|
||||||
if cc.rpcClient == nil {
|
|
||||||
select {
|
select {
|
||||||
case <-cc.ctx.Done():
|
case <-cc.ctx.Done():
|
||||||
return
|
return
|
||||||
case <-cc.rpcReady:
|
case <-cc.rpcReady:
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Sometimes bootstrap is a no-op. It only applies when
|
// Sometimes bootstrap is a no-op. It only applies when
|
||||||
// no state exists and staging=false.
|
// no state exists and staging=false.
|
||||||
|
@ -291,9 +289,9 @@ func (cc *Consensus) commit(op *LogOp, rpcOp string, redirectArg interface{}) er
|
||||||
// Being here means we are the LEADER. We can commit.
|
// Being here means we are the LEADER. We can commit.
|
||||||
|
|
||||||
// now commit the changes to our state
|
// now commit the changes to our state
|
||||||
cc.shutdownLock.Lock() // do not shut down while committing
|
cc.shutdownLock.RLock() // do not shut down while committing
|
||||||
_, finalErr = cc.consensus.CommitOp(op)
|
_, finalErr = cc.consensus.CommitOp(op)
|
||||||
cc.shutdownLock.Unlock()
|
cc.shutdownLock.RUnlock()
|
||||||
if finalErr != nil {
|
if finalErr != nil {
|
||||||
goto RETRY
|
goto RETRY
|
||||||
}
|
}
|
||||||
|
@ -347,9 +345,9 @@ func (cc *Consensus) AddPeer(pid peer.ID) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Being here means we are the leader and can commit
|
// Being here means we are the leader and can commit
|
||||||
cc.shutdownLock.Lock() // do not shutdown while committing
|
cc.shutdownLock.RLock() // do not shutdown while committing
|
||||||
finalErr = cc.raft.AddPeer(peer.IDB58Encode(pid))
|
finalErr = cc.raft.AddPeer(peer.IDB58Encode(pid))
|
||||||
cc.shutdownLock.Unlock()
|
cc.shutdownLock.RUnlock()
|
||||||
if finalErr != nil {
|
if finalErr != nil {
|
||||||
time.Sleep(cc.config.CommitRetryDelay)
|
time.Sleep(cc.config.CommitRetryDelay)
|
||||||
continue
|
continue
|
||||||
|
@ -374,9 +372,9 @@ func (cc *Consensus) RmPeer(pid peer.ID) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Being here means we are the leader and can commit
|
// Being here means we are the leader and can commit
|
||||||
cc.shutdownLock.Lock() // do not shutdown while committing
|
cc.shutdownLock.RLock() // do not shutdown while committing
|
||||||
finalErr = cc.raft.RemovePeer(peer.IDB58Encode(pid))
|
finalErr = cc.raft.RemovePeer(peer.IDB58Encode(pid))
|
||||||
cc.shutdownLock.Unlock()
|
cc.shutdownLock.RUnlock()
|
||||||
if finalErr != nil {
|
if finalErr != nil {
|
||||||
time.Sleep(cc.config.CommitRetryDelay)
|
time.Sleep(cc.config.CommitRetryDelay)
|
||||||
continue
|
continue
|
||||||
|
@ -414,6 +412,8 @@ func (cc *Consensus) Leader() (peer.ID, error) {
|
||||||
// Clean removes all raft data from disk. Next time
|
// Clean removes all raft data from disk. Next time
|
||||||
// a full new peer will be bootstrapped.
|
// a full new peer will be bootstrapped.
|
||||||
func (cc *Consensus) Clean() error {
|
func (cc *Consensus) Clean() error {
|
||||||
|
cc.shutdownLock.RLock()
|
||||||
|
defer cc.shutdownLock.RUnlock()
|
||||||
if !cc.shutdown {
|
if !cc.shutdown {
|
||||||
return errors.New("consensus component is not shutdown")
|
return errors.New("consensus component is not shutdown")
|
||||||
}
|
}
|
||||||
|
@ -438,6 +438,9 @@ func (cc *Consensus) Rollback(state state.State) error {
|
||||||
// Peers return the current list of peers in the consensus.
|
// Peers return the current list of peers in the consensus.
|
||||||
// The list will be sorted alphabetically.
|
// The list will be sorted alphabetically.
|
||||||
func (cc *Consensus) Peers() ([]peer.ID, error) {
|
func (cc *Consensus) Peers() ([]peer.ID, error) {
|
||||||
|
cc.shutdownLock.RLock() // prevent shutdown while here
|
||||||
|
defer cc.shutdownLock.RUnlock()
|
||||||
|
|
||||||
if cc.shutdown { // things hang a lot in this case
|
if cc.shutdown { // things hang a lot in this case
|
||||||
return nil, errors.New("consensus is shutdown")
|
return nil, errors.New("consensus is shutdown")
|
||||||
}
|
}
|
||||||
|
|
|
@ -103,6 +103,7 @@ func init() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ReadyTimeout = 11 * time.Second
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkErr(t *testing.T, err error) {
|
func checkErr(t *testing.T, err error) {
|
||||||
|
@ -149,8 +150,6 @@ func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) (
|
||||||
clusterCfg.LeaveOnShutdown = false
|
clusterCfg.LeaveOnShutdown = false
|
||||||
clusterCfg.SetBaseDir("./e2eTestRaft/" + pid.Pretty())
|
clusterCfg.SetBaseDir("./e2eTestRaft/" + pid.Pretty())
|
||||||
|
|
||||||
ReadyTimeout = consensusCfg.WaitForLeaderTimeout + 1*time.Second
|
|
||||||
|
|
||||||
host, err := NewClusterHost(context.Background(), clusterCfg)
|
host, err := NewClusterHost(context.Background(), clusterCfg)
|
||||||
checkErr(t, err)
|
checkErr(t, err)
|
||||||
|
|
||||||
|
|
|
@ -136,6 +136,11 @@
|
||||||
"hash": "QmZAsayEQakfFbHyakgHRKHwBTWrwuSBTfaMyxJZUG97VC",
|
"hash": "QmZAsayEQakfFbHyakgHRKHwBTWrwuSBTfaMyxJZUG97VC",
|
||||||
"name": "go-libp2p-kad-dht",
|
"name": "go-libp2p-kad-dht",
|
||||||
"version": "4.3.1"
|
"version": "4.3.1"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"hash": "Qmbq7kGxgcpALGLPaWDyTa6KUq5kBUKdEvkvPZcBkJoLex",
|
||||||
|
"name": "go-log",
|
||||||
|
"version": "1.5.6"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"gxVersion": "0.11.0",
|
"gxVersion": "0.11.0",
|
||||||
|
@ -145,4 +150,3 @@
|
||||||
"releaseCmd": "git commit -S -a -m \"gx publish $VERSION\"",
|
"releaseCmd": "git commit -S -a -m \"gx publish $VERSION\"",
|
||||||
"version": "0.4.0"
|
"version": "0.4.0"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -211,6 +211,9 @@ func TestTrackUntrackWithNoCancel(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Otherwise fails when running with -race
|
||||||
|
time.Sleep(300 * time.Millisecond)
|
||||||
|
|
||||||
err = spt.Track(fastPin)
|
err = spt.Track(fastPin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user