Issue #131: Improvements to adding and removing peers

This works on the remove+shutdown procedure and fixes a few small issues.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2017-10-31 11:20:14 +01:00
parent 10c7afbd59
commit 7a5f8f184b
11 changed files with 208 additions and 97 deletions

View File

@@ -50,7 +50,8 @@ type Cluster struct {
informer Informer
shutdownLock sync.Mutex
shutdown bool
shutdownB bool
shutdownOnce sync.Once
doneCh chan struct{}
readyCh chan struct{}
readyB bool
@@ -106,6 +107,7 @@ func NewCluster(
monitor: monitor,
allocator: allocator,
informer: informer,
shutdownB: false,
doneCh: make(chan struct{}),
readyCh: make(chan struct{}),
readyB: false,
@@ -142,9 +144,9 @@ func (c *Cluster) setupPeerManager() {
c.peerManager = pm
if len(c.config.Peers) > 0 {
c.peerManager.addFromMultiaddrs(c.config.Peers)
c.peerManager.addFromMultiaddrs(c.config.Peers, false)
} else {
c.peerManager.addFromMultiaddrs(c.config.Bootstrap)
c.peerManager.addFromMultiaddrs(c.config.Bootstrap, false)
}
}
@@ -419,26 +421,22 @@ func (c *Cluster) Shutdown() error {
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
if c.shutdown {
logger.Warning("Cluster is already shutdown")
if c.shutdownB {
logger.Debug("Cluster is already shutdown")
return nil
}
logger.Info("shutting down IPFS Cluster")
logger.Info("shutting down Cluster")
// Only attempt to leave if consensus is initialized and cluster
// was ready at some point. Otherwise, it would mean bootstrap failed.
if c.config.LeaveOnShutdown && c.consensus != nil && c.readyB {
// best effort
logger.Warning("Attempting to leave Cluster. This may take some seconds")
logger.Warning("attempting to leave the cluster. This may take some seconds")
err := c.consensus.LogRmPeer(c.id)
if err != nil {
logger.Error("leaving cluster: " + err.Error())
} else {
time.Sleep(2 * time.Second)
}
c.config.Bootstrap = c.peerManager.peersAddrs()
c.peerManager.resetPeers()
}
// Cancel contexts
@@ -453,7 +451,8 @@ func (c *Cluster) Shutdown() error {
// Do not save anything if we were not ready
if c.readyB {
c.peerManager.savePeers()
// peers are saved usually on addPeer/rmPeer
// c.peerManager.savePeers()
c.backupState()
}
@@ -477,7 +476,7 @@ func (c *Cluster) Shutdown() error {
}
c.wg.Wait()
c.host.Close() // Shutdown all network services
c.shutdown = true
c.shutdownB = true
close(c.doneCh)
return nil
}
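With the rename to `shutdownB`, `Shutdown()` stays idempotent: the flag is checked and set under `shutdownLock`, so a second call only logs at debug level and returns nil instead of repeating the teardown or closing `doneCh` twice. A minimal sketch of that guard pattern, using a hypothetical `service` type rather than the cluster's own:

```go
// Minimal sketch of the idempotent-shutdown guard; "service" and its
// fields are illustrative stand-ins, not the commit's actual types.
package main

import (
	"log"
	"sync"
)

type service struct {
	shutdownLock sync.Mutex
	shutdownB    bool
	doneCh       chan struct{}
}

func newService() *service {
	return &service{doneCh: make(chan struct{})}
}

// Shutdown may be called any number of times; only the first call runs
// the teardown, later calls return nil immediately.
func (s *service) Shutdown() error {
	s.shutdownLock.Lock()
	defer s.shutdownLock.Unlock()
	if s.shutdownB {
		log.Println("already shut down")
		return nil
	}
	// ... stop subsystems here ...
	s.shutdownB = true
	close(s.doneCh) // signal waiters exactly once
	return nil
}

func main() {
	s := newService()
	_ = s.Shutdown()
	_ = s.Shutdown() // no-op, does not close doneCh twice
	<-s.doneCh
}
```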
@@ -539,7 +538,7 @@ func (c *Cluster) PeerAdd(addr ma.Multiaddr) (api.ID, error) {
// Figure out its real address if we have one
remoteAddr := getRemoteMultiaddr(c.host, pid, decapAddr)
err = c.peerManager.addPeer(remoteAddr)
err = c.peerManager.addPeer(remoteAddr, false)
if err != nil {
logger.Error(err)
id := api.ID{ID: pid, Error: err.Error()}
@@ -559,7 +558,7 @@ func (c *Cluster) PeerAdd(addr ma.Multiaddr) (api.ID, error) {
}
// Log the new peer in the log so everyone gets it.
err = c.consensus.LogAddPeer(remoteAddr)
err = c.consensus.LogAddPeer(remoteAddr) // this will save
if err != nil {
logger.Error(err)
id := api.ID{ID: pid, Error: err.Error()}
@@ -602,9 +601,9 @@ func (c *Cluster) PeerRemove(pid peer.ID) error {
return fmt.Errorf("%s is not a peer", pid.Pretty())
}
// The peer is no longer among the peer set, so we can re-allocate
// any CIDs associated to it.
logger.Infof("Re-allocating all CIDs directly associated to %s", pid)
// We need to repin before removing the peer, otherwise, it won't
// be able to submit the pins.
logger.Infof("re-allocating all CIDs directly associated to %s", pid)
c.repinFromPeer(pid)
err := c.consensus.LogRmPeer(pid)
@@ -613,17 +612,6 @@ func (c *Cluster) PeerRemove(pid peer.ID) error {
return err
}
// This is a best effort. It may fail
// if that peer is down
err = c.rpcClient.Call(pid,
"Cluster",
"PeerManagerRmPeerShutdown",
pid,
&struct{}{})
if err != nil {
logger.Error(err)
}
return nil
}
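The ordering here matters: `repinFromPeer` runs before `LogRmPeer`, because once the peer has left the peer set it can no longer help commit the re-pins, and the direct `PeerManagerRmPeerShutdown` RPC is dropped in favour of letting the removed peer react to the consensus log entry. A rough sketch of that flow, with hypothetical stand-ins for the real `Cluster` components:

```go
// Rough sketch of the PeerRemove ordering above; consensusLog and the
// repin callback are hypothetical stand-ins for the real components.
package main

import "fmt"

type consensusLog interface {
	LogRmPeer(pid string) error
}

type cluster struct {
	consensus consensusLog
	repin     func(pid string) // re-allocates everything pinned to pid
}

func (c *cluster) PeerRemove(pid string) error {
	// Repin first: after the peer leaves the peer set it can no longer
	// take part in committing the re-pin operations.
	c.repin(pid)
	// Then log the removal. The removed peer shuts itself down when it
	// sees this entry; no direct "shutdown" RPC is sent anymore.
	if err := c.consensus.LogRmPeer(pid); err != nil {
		return fmt.Errorf("removing %s: %w", pid, err)
	}
	return nil
}

type fakeLog struct{}

func (fakeLog) LogRmPeer(pid string) error {
	fmt.Println("logged removal of", pid)
	return nil
}

func main() {
	c := &cluster{
		consensus: fakeLog{},
		repin:     func(pid string) { fmt.Println("re-allocating pins of", pid) },
	}
	if err := c.PeerRemove("QmPeer1"); err != nil {
		fmt.Println(err)
	}
}
```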
@@ -650,7 +638,7 @@ func (c *Cluster) Join(addr ma.Multiaddr) error {
}
// Add peer to peerstore so we can talk to it
c.peerManager.addPeer(addr)
c.peerManager.addPeer(addr, false)
// Note that PeerAdd() on the remote peer will
// figure out what our real address is (obviously not
@@ -1217,15 +1205,15 @@ func (c *Cluster) backupState() {
return
}
folder := filepath.Dir(c.config.BaseDir)
err := os.MkdirAll(filepath.Join(folder, "backups"), 0700)
folder := filepath.Join(c.config.BaseDir, "backups")
err := os.MkdirAll(folder, 0700)
if err != nil {
logger.Error(err)
logger.Error("skipping backup")
return
}
fname := time.Now().UTC().Format("20060102_15:04:05")
f, err := os.Create(filepath.Join(folder, "backups", fname))
f, err := os.Create(filepath.Join(folder, fname))
if err != nil {
logger.Error(err)
return
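State backups now land inside the configuration directory (`BaseDir/backups/<UTC timestamp>`) instead of a `backups` folder next to it. A small sketch of the resulting path, assuming `baseDir` plays the role of `c.config.BaseDir`:

```go
// Sketch of the new backup location; baseDir stands in for the
// cluster's configured BaseDir.
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

func backupPath(baseDir string) (string, error) {
	folder := filepath.Join(baseDir, "backups")
	if err := os.MkdirAll(folder, 0700); err != nil {
		return "", err
	}
	// Same timestamp layout as the diff: 20060102_15:04:05 in UTC.
	fname := time.Now().UTC().Format("20060102_15:04:05")
	return filepath.Join(folder, fname), nil
}

func main() {
	p, err := backupPath(os.TempDir())
	if err != nil {
		fmt.Println("skipping backup:", err)
		return
	}
	fmt.Println("state backup would be written to:", p)
}
```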

View File

@@ -5,6 +5,7 @@ package raft
import (
"context"
"errors"
"fmt"
"sync"
"time"
@@ -209,7 +210,7 @@ func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, err
// No leader, wait for one
if err != nil {
logger.Warningf("there seems to be no leader. Waiting for one")
logger.Warning("there seems to be no leader. Waiting for one")
rctx, cancel := context.WithTimeout(
cc.ctx,
cc.config.WaitForLeaderTimeout)
@@ -219,7 +220,7 @@ func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, err
// means we timed out waiting for a leader
// we don't retry in this case
if err != nil {
return false, errors.New("timed out waiting for leader")
return false, fmt.Errorf("timed out waiting for leader: %s", err)
}
leader, err = peer.IDB58Decode(pidstr)
if err != nil {
@@ -232,7 +233,7 @@ func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, err
return false, nil
}
logger.Debugf("redirecting to leader: %s", leader)
logger.Debugf("redirecting to leader: %s", leader.Pretty())
finalErr = cc.rpcClient.Call(
leader,
"Cluster",
@@ -241,7 +242,7 @@ func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, err
&struct{}{})
if finalErr != nil {
logger.Error(finalErr)
logger.Info("retrying to redirect request to leader")
logger.Error("retrying to redirect request to leader")
time.Sleep(2 * cc.config.RaftConfig.HeartbeatTimeout)
continue
}
@@ -260,7 +261,7 @@ func (cc *Consensus) commit(op *LogOp, rpcOp string, redirectArg interface{}) er
// this means we are retrying
if finalErr != nil {
logger.Error("retrying upon failed commit (retry %d): ",
logger.Errorf("retrying upon failed commit (retry %d): %s ",
i, finalErr)
}
@@ -275,7 +276,7 @@ func (cc *Consensus) commit(op *LogOp, rpcOp string, redirectArg interface{}) er
// Being here means we are the LEADER. We can commit.
// now commit the changes to our state
_, finalErr := cc.consensus.CommitOp(op)
_, finalErr = cc.consensus.CommitOp(op)
if finalErr != nil {
goto RETRY
}

View File

@@ -140,6 +140,21 @@ func TestConsensusLogAddPeer(t *testing.T) {
if err != nil {
t.Error("the operation did not make it to the log:", err)
}
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
err = cc2.raft.WaitForPeer(ctx, cc.host.ID().Pretty(), false)
if err != nil {
t.Fatal(err)
}
peers, err := cc2.raft.Peers()
if err != nil {
t.Fatal(err)
}
if len(peers) != 2 {
t.Error("peer was not added")
}
}
func TestConsensusLogRmPeer(t *testing.T) {
@@ -159,7 +174,11 @@ func TestConsensusLogRmPeer(t *testing.T) {
t.Error("could not add peer:", err)
}
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
ctx, _ := context.WithTimeout(context.Background(), 20*time.Second)
err = cc.raft.WaitForPeer(ctx, cc2.host.ID().Pretty(), false)
if err != nil {
t.Fatal(err)
}
cc.raft.WaitForLeader(ctx)
c, _ := cid.Decode(test.TestCid1)
@@ -182,6 +201,11 @@ func TestConsensusLogRmPeer(t *testing.T) {
if err != nil && err2 != nil {
t.Error("could not remove peer:", err, err2)
}
err = cc.raft.WaitForPeer(ctx, cc2.host.ID().Pretty(), true)
if err != nil {
t.Fatal(err)
}
}
func TestConsensusLeader(t *testing.T) {

View File

@@ -1,7 +1,9 @@
package raft
import (
"context"
"errors"
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state"
@@ -66,22 +68,46 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
&struct{}{},
nil)
case LogOpAddPeer:
// pidstr := parsePIDFromMultiaddr(op.Peer.ToMultiaddr())
op.consensus.rpcClient.Call("",
"Cluster",
"PeerManagerAddPeer",
op.Peer,
&struct{}{})
case LogOpRmPeer:
pidstr := parsePIDFromMultiaddr(op.Peer.ToMultiaddr())
pid, err := peer.IDB58Decode(pidstr)
if err != nil {
panic("could not decode a PID we ourselves encoded")
}
op.consensus.rpcClient.Call("",
"Cluster",
"PeerManagerRmPeer",
pid,
&struct{}{})
// Asynchronously wait for peer to be removed from raft
// and remove it from the peerset. Otherwise do nothing
go func() {
ctx, cancel := context.WithTimeout(op.consensus.ctx,
10*time.Second)
defer cancel()
// Do not wait if we are being removed
// as it may just hang waiting for a future.
if pid != op.consensus.host.ID() {
err = op.consensus.raft.WaitForPeer(ctx, pidstr, true)
if err != nil {
if err.Error() != errWaitingForSelf.Error() {
logger.Warningf("Peer has not been removed from raft: %s: %s", pidstr, err)
}
return
}
}
op.consensus.rpcClient.Call("",
"Cluster",
"PeerManagerRmPeer",
pid,
&struct{}{})
}()
default:
logger.Error("unknown LogOp type. Ignoring")
}
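For `LogOpRmPeer` the state-machine callback no longer blocks: it spawns a goroutine that waits up to 10 seconds for Raft to actually drop the peer before telling the peer manager to forget it, and skips the wait entirely when the departing peer is ourselves. A generic sketch of that bounded background wait, with hypothetical `waitGone`/`forget` callbacks standing in for `raft.WaitForPeer` and the `PeerManagerRmPeer` RPC:

```go
// Bounded background wait, sketched with hypothetical callbacks.
package main

import (
	"context"
	"fmt"
	"time"
)

func removeAsync(parent context.Context, pid string,
	waitGone func(context.Context, string) error,
	forget func(string)) {

	go func() {
		ctx, cancel := context.WithTimeout(parent, 10*time.Second)
		defer cancel()
		if err := waitGone(ctx, pid); err != nil {
			fmt.Println("peer not removed from consensus:", err)
			return // leave the peer set alone if raft still has the peer
		}
		forget(pid)
	}()
}

func main() {
	waitGone := func(ctx context.Context, pid string) error {
		select {
		case <-time.After(100 * time.Millisecond): // pretend raft dropped it
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	forget := func(pid string) { fmt.Println("forgot peer", pid) }
	removeAsync(context.Background(), "QmPeer", waitGone, forget)
	time.Sleep(300 * time.Millisecond) // give the goroutine time to finish
}
```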

View File

@@ -1,7 +1,6 @@
package raft
import (
"context"
"testing"
cid "github.com/ipfs/go-cid"
@@ -12,12 +11,14 @@ import (
)
func TestApplyToPin(t *testing.T) {
cc := testingConsensus(t, p2pPort)
op := &LogOp{
Cid: api.PinSerial{Cid: test.TestCid1},
Type: LogOpPin,
ctx: context.Background(),
rpcClient: test.NewMockRPCClient(t),
consensus: cc,
}
defer cleanRaft(p2pPort)
defer cc.Shutdown()
st := mapstate.NewMapState()
op.ApplyTo(st)
@@ -28,12 +29,14 @@ func TestApplyToPin(t *testing.T) {
}
func TestApplyToUnpin(t *testing.T) {
cc := testingConsensus(t, p2pPort)
op := &LogOp{
Cid: api.PinSerial{Cid: test.TestCid1},
Type: LogOpUnpin,
ctx: context.Background(),
rpcClient: test.NewMockRPCClient(t),
consensus: cc,
}
defer cleanRaft(p2pPort)
defer cc.Shutdown()
st := mapstate.NewMapState()
c, _ := cid.Decode(test.TestCid1)
@@ -53,10 +56,8 @@ func TestApplyToBadState(t *testing.T) {
}()
op := &LogOp{
Cid: api.PinSerial{Cid: test.TestCid1},
Type: LogOpUnpin,
ctx: context.Background(),
rpcClient: test.NewMockRPCClient(t),
Cid: api.PinSerial{Cid: test.TestCid1},
Type: LogOpUnpin,
}
var st interface{}

View File

@@ -14,9 +14,13 @@ import (
p2praft "github.com/libp2p/go-libp2p-raft"
)
// ErrBadRaftState is returned when the consensus component cannot start
// errBadRaftState is returned when the consensus component cannot start
// because the cluster peers do not match the raft peers.
var ErrBadRaftState = errors.New("cluster peers do not match raft peers")
var errBadRaftState = errors.New("cluster peers do not match raft peers")
// ErrWaitingForSelf is returned when we are waiting for ourselves to depart
// the peer set, which won't happen
var errWaitingForSelf = errors.New("waiting for ourselves to depart")
// RaftMaxSnapshots indicates how many snapshots to keep in the consensus data
// folder.
@@ -154,7 +158,7 @@ func newRaftWrapper(peers []peer.ID, host host.Host, cfg *Config, fsm hraft.FSM)
logger.Errorf(" - %s", s.ID)
}
logger.Errorf("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
return nil, ErrBadRaftState
return nil, errBadRaftState
//return nil, errors.New("Bad cluster peers")
}
}
@@ -282,6 +286,38 @@ func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error {
}
}
func (rw *raftWrapper) WaitForPeer(ctx context.Context, pid string, depart bool) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
peers, err := rw.Peers()
if err != nil {
return err
}
if len(peers) == 1 && pid == peers[0] && depart {
return errWaitingForSelf
}
found := find(peers, pid)
// departing
if depart && !found {
return nil
}
// joining
if !depart && found {
return nil
}
time.Sleep(50 * time.Millisecond)
}
}
}
// Snapshot tells Raft to take a snapshot.
func (rw *raftWrapper) Snapshot() error {
future := rw.raft.Snapshot()
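The new `WaitForPeer` polls the Raft peer set every 50 ms until the given peer appears (join) or disappears (departure), returns `errWaitingForSelf` when asked to wait for our own departure as the last remaining peer, and otherwise gives up when the context does. It leans on a `find` helper that is not part of this diff; presumably something close to:

```go
package main

import "fmt"

// find reports whether pid is present in peers. The raft wrapper's real
// helper is not shown in this diff; this is an assumed minimal version
// consistent with how WaitForPeer uses it.
func find(peers []string, pid string) bool {
	for _, p := range peers {
		if p == pid {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(find([]string{"QmA", "QmB"}, "QmB")) // true
}
```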

View File

@@ -8,6 +8,9 @@ func init() {
SetFacilityLogLevel(f, l)
}
//SetFacilityLogLevel("cluster", l)
//SetFacilityLogLevel("consensus", l)
//SetFacilityLogLevel("monitor", "INFO")
//SetFacilityLogLevel("raft", l)
//SetFacilityLogLevel("p2p-gorpc", l)
//SetFacilityLogLevel("swarm2", l)

View File

@@ -225,7 +225,7 @@ func waitForLeader(t *testing.T, clusters []*Cluster) {
// Make sure we don't check on a shutdown cluster
j := rand.Intn(len(clusters))
for clusters[j].shutdown {
for clusters[j].shutdownB {
j = rand.Intn(len(clusters))
}

View File

@@ -29,7 +29,7 @@ func newPeerManager(c *Cluster) *peerManager {
return pm
}
func (pm *peerManager) addPeer(addr ma.Multiaddr) error {
func (pm *peerManager) addPeer(addr ma.Multiaddr, save bool) error {
logger.Debugf("adding peer %s", addr)
pid, decapAddr, err := multiaddrSplit(addr)
if err != nil {
@@ -47,10 +47,15 @@ func (pm *peerManager) addPeer(addr ma.Multiaddr) error {
pm.peermap[pid] = addr
pm.m.Unlock()
if save {
pm.savePeers()
}
logger.Debugf("peers after adding %s", pm.peersAddrs())
return nil
}
func (pm *peerManager) rmPeer(pid peer.ID, selfShutdown bool) error {
func (pm *peerManager) rmPeer(pid peer.ID, save bool) error {
logger.Debugf("removing peer %s", pid.Pretty())
if pm.isPeer(pid) {
@@ -61,19 +66,23 @@ func (pm *peerManager) rmPeer(pid peer.ID, selfShutdown bool) error {
delete(pm.peermap, pid)
pm.m.Unlock()
// It's ourselves. This is not very graceful
if pid == pm.self && selfShutdown {
logger.Warning("this peer has been removed from the Cluster and will shutdown itself in 5 seconds")
defer func() {
go func() {
time.Sleep(1 * time.Second)
pm.cluster.consensus.Shutdown()
pm.cluster.config.Bootstrap = pm.peersAddrs()
pm.resetPeers()
time.Sleep(4 * time.Second)
pm.cluster.Shutdown()
}()
}()
if pid == pm.self {
logger.Info("this peer has been removed and will shutdown")
// we are removing ourselves. Therefore we need to:
// - convert cluster peers to bootstrapping peers
// - shut ourselves down if we are not in the process
//
// Note that, if we are here, we have already been
// removed from the raft.
pm.cluster.config.Bootstrap = pm.peersAddrs()
pm.resetPeers()
time.Sleep(1 * time.Second)
// should block and do nothing if already doing it
pm.cluster.Shutdown()
}
if save {
pm.savePeers()
}
return nil
@@ -132,13 +141,18 @@ func (pm *peerManager) peersAddrs() []ma.Multiaddr {
// return pm.addFromMultiaddrs(cfg.ClusterPeers)
// }
func (pm *peerManager) addFromMultiaddrs(addrs []ma.Multiaddr) error {
// this resets peers!
func (pm *peerManager) addFromMultiaddrs(addrs []ma.Multiaddr, save bool) error {
pm.resetPeers()
for _, m := range addrs {
err := pm.addPeer(m)
err := pm.addPeer(m, false)
if err != nil {
logger.Error(err)
return err
}
}
if save {
pm.savePeers()
}
return nil
}
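The peer-manager methods now take an explicit `save bool`, so persistence happens where the caller wants it: RPC-applied changes save immediately, while bootstrap and batch paths add with `save=false` and write the configuration at most once at the end. A sketch of that pattern with a hypothetical `registry` type standing in for `peerManager`:

```go
// Sketch of the save-flag pattern; registry and persist are hypothetical
// stand-ins for the peer manager and its configuration save.
package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu    sync.Mutex
	peers map[string]string // pid -> multiaddr
}

func (r *registry) persist() {
	fmt.Println("saving", len(r.peers), "peers to the configuration")
}

// add records a peer and persists only when the caller asks for it.
func (r *registry) add(pid, addr string, save bool) {
	r.mu.Lock()
	r.peers[pid] = addr
	r.mu.Unlock()
	if save {
		r.persist()
	}
}

// addMany batches several additions and saves at most once.
func (r *registry) addMany(addrs map[string]string, save bool) {
	for pid, addr := range addrs {
		r.add(pid, addr, false) // avoid one save per peer
	}
	if save {
		r.persist()
	}
}

func main() {
	r := &registry{peers: make(map[string]string)}
	r.add("QmA", "/ip4/127.0.0.1/tcp/9096", true)                        // e.g. an RPC-applied change
	r.addMany(map[string]string{"QmB": "/ip4/10.0.0.2/tcp/9096"}, false) // e.g. bootstrap
}
```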

View File

@@ -6,6 +6,8 @@ import (
"testing"
"time"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
@@ -192,7 +194,8 @@ func TestClustersPeerRemove(t *testing.T) {
runF(t, clusters, f)
}
func TestClusterPeerRemoveSelf(t *testing.T) {
func TestClustersPeerRemoveSelf(t *testing.T) {
// this test hangs sometimes if there are problems
clusters, mocks := createClusters(t)
defer shutdownClusters(t, clusters, mocks)
@@ -223,7 +226,7 @@ func TestClusterPeerRemoveSelf(t *testing.T) {
}
}
func TestClusterPeerRemoveReallocsPins(t *testing.T) {
func TestClustersPeerRemoveReallocsPins(t *testing.T) {
clusters, mocks := createClusters(t)
defer shutdownClusters(t, clusters, mocks)
@@ -236,8 +239,22 @@ func TestClusterPeerRemoveReallocsPins(t *testing.T) {
c.config.ReplicationFactor = nClusters - 1
}
cpeer := clusters[0]
clusterID := cpeer.ID().ID
// We choose to remove the leader, to make things even more interesting
leaderID, err := clusters[0].consensus.Leader()
if err != nil {
t.Fatal(err)
}
var leader *Cluster
for _, cl := range clusters {
if id := cl.ID().ID; id == leaderID {
leader = cl
break
}
}
if leader == nil {
t.Fatal("did not find a leader?")
}
tmpCid, _ := cid.Decode(test.TestCid1)
prefix := tmpCid.Prefix()
@@ -247,7 +264,7 @@ func TestClusterPeerRemoveReallocsPins(t *testing.T) {
for i := 0; i < nClusters; i++ {
h, err := prefix.Sum(randomBytes())
checkErr(t, err)
err = cpeer.Pin(api.PinCid(h))
err = leader.Pin(api.PinCid(h))
checkErr(t, err)
time.Sleep(time.Second)
}
@@ -255,16 +272,16 @@ func TestClusterPeerRemoveReallocsPins(t *testing.T) {
delay()
// At this point, all peers must have 1 pin associated to them.
// Find out which pin is associated to cpeer.
// Find out which pin is associated to leader.
interestingCids := []*cid.Cid{}
pins := cpeer.Pins()
pins := leader.Pins()
if len(pins) != nClusters {
t.Fatal("expected number of tracked pins to be nClusters")
}
for _, p := range pins {
if containsPeer(p.Allocations, clusterID) {
//t.Logf("%s pins %s", clusterID, p.Cid)
if containsPeer(p.Allocations, leaderID) {
//t.Logf("%s pins %s", leaderID, p.Cid)
interestingCids = append(interestingCids, p.Cid)
}
}
@@ -275,21 +292,23 @@ func TestClusterPeerRemoveReallocsPins(t *testing.T) {
len(interestingCids))
}
// Now remove cluster peer
err := clusters[0].PeerRemove(clusterID)
// Now the leader removes itself
err = leader.PeerRemove(leaderID)
if err != nil {
t.Fatal("error removing peer:", err)
}
time.Sleep(time.Second)
waitForLeader(t, clusters)
delay()
for _, icid := range interestingCids {
// Now check that the allocations are new.
newPin, err := clusters[0].PinGet(icid)
newPin, err := clusters[1].PinGet(icid)
if err != nil {
t.Fatal("error getting the new allocations for", icid)
}
if containsPeer(newPin.Allocations, clusterID) {
if containsPeer(newPin.Allocations, leaderID) {
t.Fatal("pin should not be allocated to the removed peer")
}
}
@@ -391,7 +410,11 @@ func TestClustersPeerJoinAllAtOnceWithRandomBootstrap(t *testing.T) {
f2 := func(t *testing.T, c *Cluster) {
peers := c.Peers()
if len(peers) != nClusters {
t.Error("all peers should be connected")
peersIds := []peer.ID{}
for _, p := range peers {
peersIds = append(peersIds, p.ID)
}
t.Errorf("%s sees %d peers: %s", c.id, len(peers), peersIds)
}
pins := c.Pins()
if len(pins) != 1 || !pins[0].Cid.Equals(hash) {

View File

@@ -290,25 +290,20 @@ func (rpcapi *RPCAPI) ConsensusLogRmPeer(in peer.ID, out *struct{}) error {
// PeerManagerAddPeer runs peerManager.addPeer().
func (rpcapi *RPCAPI) PeerManagerAddPeer(in api.MultiaddrSerial, out *struct{}) error {
addr := in.ToMultiaddr()
err := rpcapi.c.peerManager.addPeer(addr)
err := rpcapi.c.peerManager.addPeer(addr, true)
return err
}
// PeerManagerAddFromMultiaddrs runs peerManager.addFromMultiaddrs().
func (rpcapi *RPCAPI) PeerManagerAddFromMultiaddrs(in api.MultiaddrsSerial, out *struct{}) error {
addrs := in.ToMultiaddrs()
err := rpcapi.c.peerManager.addFromMultiaddrs(addrs)
err := rpcapi.c.peerManager.addFromMultiaddrs(addrs, true)
return err
}
// PeerManagerRmPeerShutdown runs peerManager.rmPeer().
func (rpcapi *RPCAPI) PeerManagerRmPeerShutdown(in peer.ID, out *struct{}) error {
return rpcapi.c.peerManager.rmPeer(in, true)
}
// PeerManagerRmPeer runs peerManager.rmPeer().
func (rpcapi *RPCAPI) PeerManagerRmPeer(in peer.ID, out *struct{}) error {
return rpcapi.c.peerManager.rmPeer(in, false)
return rpcapi.c.peerManager.rmPeer(in, true)
}
// PeerManagerPeers runs peerManager.peers().