ipfs-cluster/consensus/raft/consensus_test.go
Wyatt 47b744f1c0 ipfs-cluster-service state upgrade cli command
ipfs-cluster-service now has a migration subcommand that upgrades
    persistent state snapshots with an out-of-date format version to the
    newest version of the raft state. If all cluster members shut down with
    consistent state, upgrade ipfs-cluster, and run the state upgrade command,
    the new version of cluster will be compatible with the persistent storage.
    ipfs-cluster now validates its persistent state upon loading it and exits
    with a clear error if the state format version is not up to date.

    Raft snapshotting is enforced on all shutdowns and the json backup is no
    longer run. This commit makes use of recent changes to libp2p-raft
    allowing raft states to implement their own marshaling strategies. In the
    interest of supporting various potential upgrade formats, the state
    serialization begins with a varint (currently one byte) describing the
    version (sketched below the commit metadata).

    Some Go tests are modified and a Go test is added to cover the new
    ipfs-cluster raft snapshot reading functions. Sharness tests are added
    to cover the state upgrade command.
2017-11-28 22:35:48 -05:00
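
The varint-version-prefixed serialization mentioned in the commit message lives in mapstate, not in this test file. As a rough sketch only, assuming a JSON-encoded map state and using hypothetical names such as versionedState and currentVersion, the idea looks roughly like this:

// Illustrative sketch, not the actual mapstate implementation: a state whose
// serialized form starts with a varint format version, so that loaders can
// detect out-of-date snapshots and ask for an upgrade.
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
)

const currentVersion uint64 = 1

type versionedState struct {
	Pins map[string]int `json:"pins"`
}

// Marshal writes a varint version followed by the JSON-encoded state.
func (s *versionedState) Marshal(w io.Writer) error {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, currentVersion)
	if _, err := w.Write(buf[:n]); err != nil {
		return err
	}
	return json.NewEncoder(w).Encode(s)
}

// Unmarshal reads the varint version first and refuses formats it does not
// understand, mirroring the "clear error on old versions" behaviour.
func (s *versionedState) Unmarshal(r io.Reader) error {
	br := bufio.NewReader(r)
	v, err := binary.ReadUvarint(br)
	if err != nil {
		return err
	}
	if v != currentVersion {
		return fmt.Errorf("state format version %d is not up to date (want %d): run the state upgrade command", v, currentVersion)
	}
	return json.NewDecoder(br).Decode(s)
}

func main() {
	var buf bytes.Buffer
	in := &versionedState{Pins: map[string]int{"QmExampleCid": -1}}
	if err := in.Marshal(&buf); err != nil {
		panic(err)
	}
	out := &versionedState{}
	if err := out.Unmarshal(&buf); err != nil {
		panic(err)
	}
	fmt.Println(out.Pins)
}

Prefixing with a varint rather than a fixed-width integer keeps the common case to a single byte while leaving room for larger format numbers later. The actual test file follows.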

package raft

import (
	"context"
	"fmt"
	"os"
	"testing"
	"time"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/state/mapstate"
	"github.com/ipfs/ipfs-cluster/test"

	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	crypto "github.com/libp2p/go-libp2p-crypto"
	host "github.com/libp2p/go-libp2p-host"
	peer "github.com/libp2p/go-libp2p-peer"
	peerstore "github.com/libp2p/go-libp2p-peerstore"
	swarm "github.com/libp2p/go-libp2p-swarm"
	basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
	ma "github.com/multiformats/go-multiaddr"
)
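
// Ports for the localhost libp2p hosts created by the tests below.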
var p2pPort = 10000
var p2pPortAlt = 11000

func cleanRaft(port int) {
	os.RemoveAll(fmt.Sprintf("raftFolderFromTests%d", port))
}

func init() {
	_ = logging.LevelDebug
	//logging.SetLogLevel("consensus", "DEBUG")
}
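
// makeTestingHost builds a minimal libp2p host for tests: it generates an
// RSA keypair, registers it in a fresh peerstore and listens on the given
// TCP port on localhost.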
func makeTestingHost(t *testing.T, port int) host.Host {
	priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048)
	pid, _ := peer.IDFromPublicKey(pub)
	maddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))
	ps := peerstore.NewPeerstore()
	ps.AddPubKey(pid, pub)
	ps.AddPrivKey(pid, priv)
	n, _ := swarm.NewNetwork(
		context.Background(),
		[]ma.Multiaddr{maddr},
		pid, ps, nil)
	return basichost.New(n)
}
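
// testingConsensus creates a single-peer Consensus backed by a fresh
// MapState, wires a mock RPC client to it and waits until it is ready.
// Raft data goes to a per-port folder that cleanRaft can remove later.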
func testingConsensus(t *testing.T, port int) *Consensus {
	h := makeTestingHost(t, port)
	st := mapstate.NewMapState()
	cfg := &Config{}
	cfg.Default()
	cfg.DataFolder = fmt.Sprintf("raftFolderFromTests%d", port)
	cfg.hostShutdown = true

	cc, err := NewConsensus([]peer.ID{h.ID()}, h, cfg, st)
	if err != nil {
		t.Fatal("cannot create Consensus:", err)
	}
	cc.SetClient(test.NewMockRPCClientWithHost(t, h))
	<-cc.Ready()
	return cc
}

func TestShutdownConsensus(t *testing.T) {
	// Bring it up twice to make sure shutdown cleans up properly
	// but also to make sure raft comes up ok when re-initialized.
	cc := testingConsensus(t, p2pPort)
	err := cc.Shutdown()
	if err != nil {
		t.Fatal("Consensus cannot shutdown:", err)
	}
	err = cc.Shutdown() // should be fine to shutdown twice
	if err != nil {
		t.Fatal("Consensus should be able to shutdown several times")
	}
	cleanRaft(p2pPort)

	cc = testingConsensus(t, p2pPort)
	err = cc.Shutdown()
	if err != nil {
		t.Fatal("Consensus cannot shutdown:", err)
	}
	cleanRaft(p2pPort)
}
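
// TestConsensusPin logs a pin and verifies that it ends up in the
// consensus state shortly afterwards.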
func TestConsensusPin(t *testing.T) {
	cc := testingConsensus(t, p2pPort)
	defer cleanRaft(p2pPort) // Remember defer runs in LIFO order
	defer cc.Shutdown()

	c, _ := cid.Decode(test.TestCid1)
	err := cc.LogPin(api.Pin{Cid: c, ReplicationFactor: -1})
	if err != nil {
		t.Error("the operation did not make it to the log:", err)
	}

	time.Sleep(250 * time.Millisecond)
	st, err := cc.State()
	if err != nil {
		t.Fatal("error getting state:", err)
	}

	pins := st.List()
	if len(pins) != 1 || pins[0].Cid.String() != test.TestCid1 {
		t.Error("the added pin should be in the state")
	}
}

func TestConsensusUnpin(t *testing.T) {
	cc := testingConsensus(t, p2pPort)
	defer cleanRaft(p2pPort)
	defer cc.Shutdown()

	c, _ := cid.Decode(test.TestCid2)
	err := cc.LogUnpin(api.PinCid(c))
	if err != nil {
		t.Error("the operation did not make it to the log:", err)
	}
}

func TestConsensusAddPeer(t *testing.T) {
	cc := testingConsensus(t, p2pPort)
	cc2 := testingConsensus(t, p2pPortAlt)
	t.Log(cc.host.ID().Pretty())
	t.Log(cc2.host.ID().Pretty())
	defer cleanRaft(p2pPort)
	defer cleanRaft(p2pPortAlt)
	defer cc.Shutdown()
	defer cc2.Shutdown()

	addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p2pPortAlt))
	cc.host.Peerstore().AddAddr(cc2.host.ID(), addr, peerstore.TempAddrTTL)
	err := cc.AddPeer(cc2.host.ID())
	if err != nil {
		t.Error("the operation did not make it to the log:", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	err = cc2.raft.WaitForPeer(ctx, cc.host.ID().Pretty(), false)
	if err != nil {
		t.Fatal(err)
	}

	peers, err := cc2.raft.Peers()
	if err != nil {
		t.Fatal(err)
	}

	if len(peers) != 2 {
		t.Error("peer was not added")
	}
}

func TestConsensusRmPeer(t *testing.T) {
	cc := testingConsensus(t, p2pPort)
	cc2 := testingConsensus(t, p2pPortAlt)
	defer cleanRaft(p2pPort)
	defer cleanRaft(p2pPortAlt)
	defer cc.Shutdown()
	defer cc2.Shutdown()

	addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p2pPortAlt))
	cc.host.Peerstore().AddAddr(cc2.host.ID(), addr, peerstore.TempAddrTTL)
	err := cc.AddPeer(cc2.host.ID())
	if err != nil {
		t.Error("could not add peer:", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	err = cc.raft.WaitForPeer(ctx, cc2.host.ID().Pretty(), false)
	if err != nil {
		t.Fatal(err)
	}
	cc.raft.WaitForLeader(ctx)

	c, _ := cid.Decode(test.TestCid1)
	err = cc.LogPin(api.Pin{Cid: c, ReplicationFactor: -1})
	if err != nil {
		t.Error("could not pin after adding peer:", err)
	}

	time.Sleep(2 * time.Second)

	// Remove a nonexistent peer
	err = cc.RmPeer(test.TestPeerID1)
	if err != nil {
		t.Error("the operation did not make it to the log:", err)
	}

	// Remove a real peer. At least the leader should succeed.
	err = cc2.RmPeer(cc.host.ID())
	err2 := cc.RmPeer(cc2.host.ID())
	if err != nil && err2 != nil {
		t.Error("could not remove peer:", err, err2)
	}

	err = cc.raft.WaitForPeer(ctx, cc2.host.ID().Pretty(), true)
	if err != nil {
		t.Fatal(err)
	}
}

func TestConsensusLeader(t *testing.T) {
	cc := testingConsensus(t, p2pPort)
	pID := cc.host.ID()
	defer cleanRaft(p2pPort)
	defer cc.Shutdown()

	l, err := cc.Leader()
	if err != nil {
		t.Fatal("No leader:", err)
	}

	if l != pID {
		t.Errorf("expected %s but the leader appears as %s", pID, l)
	}
}
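
// TestRaftLatestSnapshot logs three pins, forcing a raft snapshot after each
// one, and then checks that LastStateRaw returns snapshot bytes that restore
// a state containing all three pins.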
func TestRaftLatestSnapshot(t *testing.T) {
	cc := testingConsensus(t, p2pPort)
	defer cleanRaft(p2pPort)
	defer cc.Shutdown()

	// Make pin 1
	c1, _ := cid.Decode(test.TestCid1)
	err := cc.LogPin(api.Pin{Cid: c1, ReplicationFactor: -1})
	if err != nil {
		t.Error("the first pin did not make it to the log:", err)
	}
	time.Sleep(250 * time.Millisecond)
	err = cc.raft.Snapshot()
	if err != nil {
		t.Error("the first snapshot was not taken successfully")
	}

	// Make pin 2
	c2, _ := cid.Decode(test.TestCid2)
	err = cc.LogPin(api.Pin{Cid: c2, ReplicationFactor: -1})
	if err != nil {
		t.Error("the second pin did not make it to the log:", err)
	}
	time.Sleep(250 * time.Millisecond)
	err = cc.raft.Snapshot()
	if err != nil {
		t.Error("the second snapshot was not taken successfully")
	}

	// Make pin 3
	c3, _ := cid.Decode(test.TestCid3)
	err = cc.LogPin(api.Pin{Cid: c3, ReplicationFactor: -1})
	if err != nil {
		t.Error("the third pin did not make it to the log:", err)
	}
	time.Sleep(250 * time.Millisecond)
	err = cc.raft.Snapshot()
	if err != nil {
		t.Error("the third snapshot was not taken successfully")
	}

	// Call raft.LastStateRaw and ensure we get the correct state
	snapState := mapstate.NewMapState()
	r, snapExists, err := LastStateRaw(cc.config)
	if !snapExists {
		t.Fatal("No snapshot found by LastStateRaw")
	}
	if err != nil {
		t.Fatal("Error while reading the last state:", err)
	}
	err = snapState.Restore(r)
	if err != nil {
		t.Fatal("Snapshot bytes returned could not restore to state")
	}

	pins := snapState.List()
	if len(pins) != 3 {
		t.Fatal("Latest snapshot not read")
	}
}