Issue #340: Fix some data races

Unfortunately, there are still some data races in yamux
https://github.com/libp2p/go-libp2p/issues/396 so we can't
enable this by default.

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
Hector Sanjuan 2018-08-15 12:27:01 +02:00
parent e86c2e6c59
commit 5bbc699bb4
5 changed files with 27 additions and 16 deletions

View File

@@ -406,7 +406,9 @@ This might be due to one or several causes:
}
close(c.readyCh)
c.shutdownLock.Lock()
c.readyB = true
c.shutdownLock.Unlock()
logger.Info("** IPFS Cluster is READY **")
}

View File

@@ -43,7 +43,7 @@ type Consensus struct {
rpcReady chan struct{}
readyCh chan struct{}
shutdownLock sync.Mutex
shutdownLock sync.RWMutex
shutdown bool
}
@@ -139,12 +139,10 @@ func (cc *Consensus) WaitForSync() error {
// signal the component as Ready.
func (cc *Consensus) finishBootstrap() {
// wait until we have RPC to perform any actions.
if cc.rpcClient == nil {
select {
case <-cc.ctx.Done():
return
case <-cc.rpcReady:
}
select {
case <-cc.ctx.Done():
return
case <-cc.rpcReady:
}
// Sometimes bootstrap is a no-op. It only applies when
@@ -291,9 +289,9 @@ func (cc *Consensus) commit(op *LogOp, rpcOp string, redirectArg interface{}) er
// Being here means we are the LEADER. We can commit.
// now commit the changes to our state
cc.shutdownLock.Lock() // do not shut down while committing
cc.shutdownLock.RLock() // do not shut down while committing
_, finalErr = cc.consensus.CommitOp(op)
cc.shutdownLock.Unlock()
cc.shutdownLock.RUnlock()
if finalErr != nil {
goto RETRY
}
@@ -347,9 +345,9 @@ func (cc *Consensus) AddPeer(pid peer.ID) error {
return err
}
// Being here means we are the leader and can commit
cc.shutdownLock.Lock() // do not shutdown while committing
cc.shutdownLock.RLock() // do not shutdown while committing
finalErr = cc.raft.AddPeer(peer.IDB58Encode(pid))
cc.shutdownLock.Unlock()
cc.shutdownLock.RUnlock()
if finalErr != nil {
time.Sleep(cc.config.CommitRetryDelay)
continue
@@ -374,9 +372,9 @@ func (cc *Consensus) RmPeer(pid peer.ID) error {
return err
}
// Being here means we are the leader and can commit
cc.shutdownLock.Lock() // do not shutdown while committing
cc.shutdownLock.RLock() // do not shutdown while committing
finalErr = cc.raft.RemovePeer(peer.IDB58Encode(pid))
cc.shutdownLock.Unlock()
cc.shutdownLock.RUnlock()
if finalErr != nil {
time.Sleep(cc.config.CommitRetryDelay)
continue
@@ -414,6 +412,8 @@ func (cc *Consensus) Leader() (peer.ID, error) {
// Clean removes all raft data from disk. Next time
// a full new peer will be bootstrapped.
func (cc *Consensus) Clean() error {
cc.shutdownLock.RLock()
defer cc.shutdownLock.RUnlock()
if !cc.shutdown {
return errors.New("consensus component is not shutdown")
}
@@ -438,6 +438,9 @@ func (cc *Consensus) Rollback(state state.State) error {
// Peers return the current list of peers in the consensus.
// The list will be sorted alphabetically.
func (cc *Consensus) Peers() ([]peer.ID, error) {
cc.shutdownLock.RLock() // prevent shutdown while here
defer cc.shutdownLock.RUnlock()
if cc.shutdown { // things hang a lot in this case
return nil, errors.New("consensus is shutdown")
}

View File

@@ -103,6 +103,7 @@ func init() {
continue
}
}
ReadyTimeout = 11 * time.Second
}
func checkErr(t *testing.T, err error) {
@@ -149,8 +150,6 @@ func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) (
clusterCfg.LeaveOnShutdown = false
clusterCfg.SetBaseDir("./e2eTestRaft/" + pid.Pretty())
ReadyTimeout = consensusCfg.WaitForLeaderTimeout + 1*time.Second
host, err := NewClusterHost(context.Background(), clusterCfg)
checkErr(t, err)

View File

@@ -136,6 +136,11 @@
"hash": "QmZAsayEQakfFbHyakgHRKHwBTWrwuSBTfaMyxJZUG97VC",
"name": "go-libp2p-kad-dht",
"version": "4.3.1"
},
{
"hash": "Qmbq7kGxgcpALGLPaWDyTa6KUq5kBUKdEvkvPZcBkJoLex",
"name": "go-log",
"version": "1.5.6"
}
],
"gxVersion": "0.11.0",
@@ -145,4 +150,3 @@
"releaseCmd": "git commit -S -a -m \"gx publish $VERSION\"",
"version": "0.4.0"
}

View File

@@ -211,6 +211,9 @@ func TestTrackUntrackWithNoCancel(t *testing.T) {
t.Fatal(err)
}
// Otherwise fails when running with -race
time.Sleep(300 * time.Millisecond)
err = spt.Track(fastPin)
if err != nil {
t.Fatal(err)