From 5bbc699bb427c319733a4a625adf9a7c03bbaa7f Mon Sep 17 00:00:00 2001
From: Hector Sanjuan
Date: Wed, 15 Aug 2018 12:27:01 +0200
Subject: [PATCH] Issue #340: Fix some data races

Unfortunately, there are still some data races in yamux
https://github.com/libp2p/go-libp2p/issues/396 so we can't enable this
by default.

License: MIT
Signed-off-by: Hector Sanjuan
---
 cluster.go                             |  2 ++
 consensus/raft/consensus.go            | 29 ++++++++++++++------------
 ipfscluster_test.go                    |  3 +--
 package.json                           |  6 +++++-
 pintracker/stateless/stateless_test.go |  3 +++
 5 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/cluster.go b/cluster.go
index 340b689b..57708202 100644
--- a/cluster.go
+++ b/cluster.go
@@ -406,7 +406,9 @@ This might be due to one or several causes:
 	}
 
 	close(c.readyCh)
+	c.shutdownLock.Lock()
 	c.readyB = true
+	c.shutdownLock.Unlock()
 	logger.Info("** IPFS Cluster is READY **")
 }
 
diff --git a/consensus/raft/consensus.go b/consensus/raft/consensus.go
index 9bf46497..ee95a96b 100644
--- a/consensus/raft/consensus.go
+++ b/consensus/raft/consensus.go
@@ -43,7 +43,7 @@ type Consensus struct {
 	rpcReady  chan struct{}
 	readyCh   chan struct{}
 
-	shutdownLock sync.Mutex
+	shutdownLock sync.RWMutex
 	shutdown     bool
 }
 
@@ -139,12 +139,10 @@ func (cc *Consensus) WaitForSync() error {
 // signal the component as Ready.
 func (cc *Consensus) finishBootstrap() {
 	// wait until we have RPC to perform any actions.
-	if cc.rpcClient == nil {
-		select {
-		case <-cc.ctx.Done():
-			return
-		case <-cc.rpcReady:
-		}
+	select {
+	case <-cc.ctx.Done():
+		return
+	case <-cc.rpcReady:
 	}
 
 	// Sometimes bootstrap is a no-op. It only applies when
@@ -291,9 +289,9 @@ func (cc *Consensus) commit(op *LogOp, rpcOp string, redirectArg interface{}) er
 	// Being here means we are the LEADER. We can commit.
 
 	// now commit the changes to our state
-	cc.shutdownLock.Lock() // do not shut down while committing
+	cc.shutdownLock.RLock() // do not shut down while committing
 	_, finalErr = cc.consensus.CommitOp(op)
-	cc.shutdownLock.Unlock()
+	cc.shutdownLock.RUnlock()
 	if finalErr != nil {
 		goto RETRY
 	}
@@ -347,9 +345,9 @@ func (cc *Consensus) AddPeer(pid peer.ID) error {
 			return err
 		}
 		// Being here means we are the leader and can commit
-		cc.shutdownLock.Lock() // do not shutdown while committing
+		cc.shutdownLock.RLock() // do not shutdown while committing
 		finalErr = cc.raft.AddPeer(peer.IDB58Encode(pid))
-		cc.shutdownLock.Unlock()
+		cc.shutdownLock.RUnlock()
 		if finalErr != nil {
 			time.Sleep(cc.config.CommitRetryDelay)
 			continue
@@ -374,9 +372,9 @@ func (cc *Consensus) RmPeer(pid peer.ID) error {
 			return err
 		}
 		// Being here means we are the leader and can commit
-		cc.shutdownLock.Lock() // do not shutdown while committing
+		cc.shutdownLock.RLock() // do not shutdown while committing
 		finalErr = cc.raft.RemovePeer(peer.IDB58Encode(pid))
-		cc.shutdownLock.Unlock()
+		cc.shutdownLock.RUnlock()
 		if finalErr != nil {
 			time.Sleep(cc.config.CommitRetryDelay)
 			continue
@@ -414,6 +412,8 @@ func (cc *Consensus) Leader() (peer.ID, error) {
 // Clean removes all raft data from disk. Next time
 // a full new peer will be bootstrapped.
 func (cc *Consensus) Clean() error {
+	cc.shutdownLock.RLock()
+	defer cc.shutdownLock.RUnlock()
 	if !cc.shutdown {
 		return errors.New("consensus component is not shutdown")
 	}
@@ -438,6 +438,9 @@ func (cc *Consensus) Rollback(state state.State) error {
 // Peers return the current list of peers in the consensus.
 // The list will be sorted alphabetically.
 func (cc *Consensus) Peers() ([]peer.ID, error) {
+	cc.shutdownLock.RLock() // prevent shutdown while here
+	defer cc.shutdownLock.RUnlock()
+
 	if cc.shutdown { // things hang a lot in this case
 		return nil, errors.New("consensus is shutdown")
 	}
diff --git a/ipfscluster_test.go b/ipfscluster_test.go
index 31317139..c5d9adfb 100644
--- a/ipfscluster_test.go
+++ b/ipfscluster_test.go
@@ -103,6 +103,7 @@ func init() {
 			continue
 		}
 	}
+	ReadyTimeout = 11 * time.Second
 }
 
 func checkErr(t *testing.T, err error) {
@@ -149,8 +150,6 @@ func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) (
 	clusterCfg.LeaveOnShutdown = false
 	clusterCfg.SetBaseDir("./e2eTestRaft/" + pid.Pretty())
 
-	ReadyTimeout = consensusCfg.WaitForLeaderTimeout + 1*time.Second
-
 	host, err := NewClusterHost(context.Background(), clusterCfg)
 	checkErr(t, err)
 
diff --git a/package.json b/package.json
index d14d1e00..a0a3bdf4 100644
--- a/package.json
+++ b/package.json
@@ -136,6 +136,11 @@
       "hash": "QmZAsayEQakfFbHyakgHRKHwBTWrwuSBTfaMyxJZUG97VC",
       "name": "go-libp2p-kad-dht",
       "version": "4.3.1"
+    },
+    {
+      "hash": "Qmbq7kGxgcpALGLPaWDyTa6KUq5kBUKdEvkvPZcBkJoLex",
+      "name": "go-log",
+      "version": "1.5.6"
     }
   ],
   "gxVersion": "0.11.0",
@@ -145,4 +150,3 @@
   "releaseCmd": "git commit -S -a -m \"gx publish $VERSION\"",
   "version": "0.4.0"
 }
-
diff --git a/pintracker/stateless/stateless_test.go b/pintracker/stateless/stateless_test.go
index 7096bebd..ad0d6bd1 100644
--- a/pintracker/stateless/stateless_test.go
+++ b/pintracker/stateless/stateless_test.go
@@ -211,6 +211,9 @@ func TestTrackUntrackWithNoCancel(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	// Otherwise fails when running with -race
+	time.Sleep(300 * time.Millisecond)
+
 	err = spt.Track(fastPin)
 	if err != nil {
 		t.Fatal(err)
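
The pattern this patch applies throughout consensus.go is a read/write
lock used as a shutdown guard: commit-style operations take the read
lock, so they can run concurrently with one another, while a shutdown
path takes the write lock and therefore waits for any in-flight commit
to drain before flipping the shutdown flag. Below is a minimal,
self-contained sketch of that pattern; the names (guardedService,
doCommit) are illustrative only and do not appear in this patch.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// guardedService mirrors the locking scheme above: operations that
// must not overlap with shutdown take the read lock; Shutdown takes
// the write lock and so waits for in-flight operations to finish.
type guardedService struct {
	shutdownLock sync.RWMutex
	shutdown     bool
}

// doCommit stands in for commit()/AddPeer()/RmPeer() in the patch.
func (s *guardedService) doCommit(i int) error {
	s.shutdownLock.RLock() // do not shut down while committing
	defer s.shutdownLock.RUnlock()
	if s.shutdown {
		return errors.New("service is shutdown")
	}
	fmt.Println("committed op", i)
	return nil
}

// Shutdown blocks until all read locks are released, so it never
// interleaves with a running doCommit, and later calls see the flag.
func (s *guardedService) Shutdown() {
	s.shutdownLock.Lock()
	defer s.shutdownLock.Unlock()
	s.shutdown = true
}

func main() {
	var s guardedService
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = s.doCommit(i) // may run before or after Shutdown, never during
		}(i)
	}
	s.Shutdown()
	wg.Wait()
	if err := s.doCommit(99); err != nil {
		fmt.Println("after shutdown:", err)
	}
}

Switching shutdownLock from sync.Mutex to sync.RWMutex is what lets
several commits hold the guard at once; with a plain Mutex every commit
would serialize against every other commit, not just against shutdown.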