From 45c31846d1d51c81af4e9c6d42942291ae8ffe08 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 15 Dec 2016 15:08:43 +0100 Subject: [PATCH] Fix startup sync mechanism License: MIT Signed-off-by: Hector Sanjuan --- TODO.md | 4 ++-- cluster.go | 2 ++ consensus.go | 54 ++++++++++++++++++++++++++++++++++------------------ rpc.go | 1 + 4 files changed, 41 insertions(+), 20 deletions(-) diff --git a/TODO.md b/TODO.md index a04db3ca..970351ed 100644 --- a/TODO.md +++ b/TODO.md @@ -2,8 +2,8 @@ Things that need to be done: -* Start up sync fix -* Allow to shutdown multiple times (+ test) +* ~~Start up sync fix~~ +* ~~Allow to shutdown multiple times (+ test)~~ * Efficient SyncAll (use single ipfs pin ls call) * LeaderRPC implementation * /pin/add /pin/rm hijack diff --git a/cluster.go b/cluster.go index 5b0768a8..1672f537 100644 --- a/cluster.go +++ b/cluster.go @@ -221,6 +221,8 @@ func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) { data = c.Members() case PinListRPC: data = c.tracker.ListPins() + case SyncRPC: + err = c.Sync() case RollbackRPC: state, ok := grpc.Argument.(ClusterState) if !ok { diff --git a/consensus.go b/consensus.go index 48a4384a..9985e86e 100644 --- a/consensus.go +++ b/consensus.go @@ -28,9 +28,12 @@ const ( type clusterLogOpType int -// We will wait for the consensus state to be updated up to this -// amount of seconds. -var MaxStartupDelay = 10 * time.Second +// FirstSyncDelay specifies the maximum delay +// before we trigger a Sync operation after starting +// Raft. This is because Raft will need time to sync the global +// state. If not all the ops have been applied after this +// delay, at least the pin tracker will have a partial valid state. +var FirstSyncDelay = 10 * time.Second // clusterLogOp represents an operation for the OpLogConsensus system. // It implements the consensus.Op interface. 
@@ -141,21 +144,6 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState) } cc.run() - - // FIXME: this is broken. - logger.Info("waiting for Consensus state to catch up") - time.Sleep(1 * time.Second) - start := time.Now() - for { - time.Sleep(500 * time.Millisecond) - li := wrapper.raft.LastIndex() - lai := wrapper.raft.AppliedIndex() - if lai == li || time.Since(start) > MaxStartupDelay { - break - } - logger.Debugf("waiting for Raft index: %d/%d", lai, li) - } - return cc, nil } @@ -167,6 +155,36 @@ func (cc *ClusterConsensus) run() { defer cancel() cc.ctx = ctx cc.baseOp.ctx = ctx + + upToDate := make(chan struct{}) + go func() { + logger.Info("consensus state is catching up") + time.Sleep(time.Second) + for { + lai := cc.p2pRaft.raft.AppliedIndex() + li := cc.p2pRaft.raft.LastIndex() + logger.Infof("current Raft index: %d/%d", lai, li) + if lai == li { + upToDate <- struct{}{} + break + } + time.Sleep(500 * time.Millisecond) + } + }() + + logger.Info("consensus state is catching up") + timer := time.NewTimer(FirstSyncDelay) + quitLoop := false + for !quitLoop { + select { + case <-timer.C: // Make a first sync + MakeRPC(ctx, cc.rpcCh, RPC(SyncRPC, nil), false) + case <-upToDate: + MakeRPC(ctx, cc.rpcCh, RPC(SyncRPC, nil), false) + quitLoop = true + } + } + <-cc.shutdownCh }() } diff --git a/rpc.go b/rpc.go index 27a2c240..5d62a51b 100644 --- a/rpc.go +++ b/rpc.go @@ -14,6 +14,7 @@ const ( MemberListRPC RollbackRPC LeaderRPC + SyncRPC NoopRPC )