Raft/PeerRm: attempt more orderly peer removal
Wait until FSM has applied the operation. License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
parent
7540e7b056
commit
199dbb944a
|
@ -54,12 +54,10 @@ func NewConsensus(clusterPeers []peer.ID, host host.Host, cfg *Config, state sta
|
|||
return nil, err
|
||||
}
|
||||
|
||||
op := &LogOp{
|
||||
ctx: context.Background(),
|
||||
}
|
||||
baseOp := &LogOp{}
|
||||
|
||||
logger.Infof("starting Consensus and waiting for a leader...")
|
||||
consensus := libp2praft.NewOpLog(state, op)
|
||||
consensus := libp2praft.NewOpLog(state, baseOp)
|
||||
raft, err := newRaftWrapper(clusterPeers, host, cfg, consensus.FSM())
|
||||
if err != nil {
|
||||
logger.Error("error creating raft: ", err)
|
||||
|
@ -69,7 +67,6 @@ func NewConsensus(clusterPeers []peer.ID, host host.Host, cfg *Config, state sta
|
|||
consensus.SetActor(actor)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
op.ctx = ctx
|
||||
|
||||
cc := &Consensus{
|
||||
ctx: ctx,
|
||||
|
@ -78,12 +75,14 @@ func NewConsensus(clusterPeers []peer.ID, host host.Host, cfg *Config, state sta
|
|||
host: host,
|
||||
consensus: consensus,
|
||||
actor: actor,
|
||||
baseOp: op,
|
||||
baseOp: baseOp,
|
||||
raft: raft,
|
||||
rpcReady: make(chan struct{}, 1),
|
||||
readyCh: make(chan struct{}, 1),
|
||||
}
|
||||
|
||||
baseOp.consensus = cc
|
||||
|
||||
go cc.finishBootstrap()
|
||||
return cc, nil
|
||||
}
|
||||
|
@ -172,7 +171,6 @@ func (cc *Consensus) Shutdown() error {
|
|||
// SetClient makes the component ready to perform RPC requets
|
||||
func (cc *Consensus) SetClient(c *rpc.Client) {
|
||||
cc.rpcClient = c
|
||||
cc.baseOp.rpcClient = c
|
||||
cc.rpcReady <- struct{}{}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,13 +1,11 @@
|
|||
package raft
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/ipfs/ipfs-cluster/api"
|
||||
"github.com/ipfs/ipfs-cluster/state"
|
||||
|
||||
rpc "github.com/hsanjuan/go-libp2p-gorpc"
|
||||
consensus "github.com/libp2p/go-libp2p-consensus"
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
)
|
||||
|
@ -30,8 +28,7 @@ type LogOp struct {
|
|||
Cid api.PinSerial
|
||||
Peer api.MultiaddrSerial
|
||||
Type LogOpType
|
||||
ctx context.Context
|
||||
rpcClient *rpc.Client
|
||||
consensus *Consensus
|
||||
}
|
||||
|
||||
// ApplyTo applies the operation to the State
|
||||
|
@ -50,7 +47,7 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
|
|||
goto ROLLBACK
|
||||
}
|
||||
// Async, we let the PinTracker take care of any problems
|
||||
op.rpcClient.Go("",
|
||||
op.consensus.rpcClient.Go("",
|
||||
"Cluster",
|
||||
"Track",
|
||||
op.Cid,
|
||||
|
@ -62,14 +59,14 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
|
|||
goto ROLLBACK
|
||||
}
|
||||
// Async, we let the PinTracker take care of any problems
|
||||
op.rpcClient.Go("",
|
||||
op.consensus.rpcClient.Go("",
|
||||
"Cluster",
|
||||
"Untrack",
|
||||
op.Cid,
|
||||
&struct{}{},
|
||||
nil)
|
||||
case LogOpAddPeer:
|
||||
op.rpcClient.Call("",
|
||||
op.consensus.rpcClient.Call("",
|
||||
"Cluster",
|
||||
"PeerManagerAddPeer",
|
||||
op.Peer,
|
||||
|
@ -80,7 +77,7 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
|
|||
if err != nil {
|
||||
panic("could not decode a PID we ourselves encoded")
|
||||
}
|
||||
op.rpcClient.Call("",
|
||||
op.consensus.rpcClient.Call("",
|
||||
"Cluster",
|
||||
"PeerManagerRmPeer",
|
||||
pid,
|
||||
|
|
|
@ -328,17 +328,29 @@ func (rw *raftWrapper) RemovePeer(peer string) error {
|
|||
return errors.New("cannot remove ourselves from a 1-peer cluster")
|
||||
}
|
||||
|
||||
future := rw.raft.RemoveServer(
|
||||
rmFuture := rw.raft.RemoveServer(
|
||||
hraft.ServerID(peer),
|
||||
0,
|
||||
0) // TODO: Extra cfg value?
|
||||
err = future.Error()
|
||||
err = rmFuture.Error()
|
||||
if err != nil {
|
||||
logger.Error("raft cannot remove peer: ", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// make sure change is applied everywhere before continuing
|
||||
// this makes sure that a leaving node gets the memo
|
||||
// before we shut it down.
|
||||
bFuture := rw.raft.Barrier(10 * time.Second)
|
||||
err = bFuture.Error()
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Leader returns Raft's leader. It may be an empty string if
|
||||
// there is no leader or it is unknown.
|
||||
func (rw *raftWrapper) Leader() string {
|
||||
|
|
Loading…
Reference in New Issue
Block a user