fc237b21d4
This enables support for testing in Jenkins. Several minor adjustments have been made to improve the probability that the tests pass, but there are still intermittent problems with libp2p connections not becoming available or stopping working (similar to Travis, but perhaps more often). The macOS and Windows builds are broken in worse ways (those issues will need to be addressed in the future). Thanks to @zenground0 and @victorbjelkholm for support! License: MIT Signed-off-by: Hector Sanjuan <code@hector.link>
package ipfscluster

import (
	"math/rand"
	"sync"
	"testing"
	"time"

	peer "github.com/libp2p/go-libp2p-peer"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/test"

	cid "github.com/ipfs/go-cid"
	ma "github.com/multiformats/go-multiaddr"
)

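// peerManagerClusters starts nClusters single-peer clusters in parallel,
// each backed by its own IPFS mock. The peers are not joined to each other,
// so the tests below can exercise PeerAdd, Join and PeerRemove explicitly.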
func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
	cls := make([]*Cluster, nClusters, nClusters)
	mocks := make([]*test.IpfsMock, nClusters, nClusters)
	var wg sync.WaitGroup
	for i := 0; i < nClusters; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			cl, m := createOnePeerCluster(t, i, testingClusterSecret)
			cls[i] = cl
			mocks[i] = m
		}(i)
	}
	wg.Wait()
	delay()
	return cls, mocks
}

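// clusterAddr returns the multiaddress (listen address plus peer ID)
// at which the given cluster peer can be reached.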
func clusterAddr(c *Cluster) ma.Multiaddr {
	return multiaddrJoin(c.config.ListenAddr, c.ID().ID)
}

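// TestClustersPeerAdd adds every other peer to clusters[0] one by one and
// verifies that the reported ClusterPeers grows accordingly, that a pin
// reaches every peer, and that the new peers show up in each configuration.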
func TestClustersPeerAdd(t *testing.T) {
	clusters, mocks := peerManagerClusters(t)
	defer shutdownClusters(t, clusters, mocks)

	if len(clusters) < 2 {
		t.Skip("need at least 2 nodes for this test")
	}

	for i := 1; i < len(clusters); i++ {
		addr := clusterAddr(clusters[i])
		id, err := clusters[0].PeerAdd(addr)
		if err != nil {
			t.Fatal(err)
		}

		if len(id.ClusterPeers) != i+1 {
			// ClusterPeers is originally empty and contains nodes as we add them
			t.Log(i, id.ClusterPeers)
			t.Fatal("cluster peers should be up to date with the cluster")
		}
	}

	h, _ := cid.Decode(test.TestCid1)
	err := clusters[1].Pin(api.PinCid(h))
	if err != nil {
		t.Fatal(err)
	}
	delay()

	f := func(t *testing.T, c *Cluster) {
		ids := c.Peers()

		// check they are tracked by the peer manager
		if len(ids) != nClusters {
			//t.Log(ids)
			t.Error("added clusters are not part of clusters")
		}

		// Check that they are part of the consensus
		pins := c.Pins()
		if len(pins) != 1 {
			t.Log(pins)
			t.Error("expected 1 pin everywhere")
		}

		if len(c.ID().ClusterPeers) != nClusters {
			t.Log(c.ID().ClusterPeers)
			t.Error("By now cluster peers should reflect all peers")
		}

		time.Sleep(2 * time.Second)

		// check that they are part of the configuration
		// This only works because each peer only has one multiaddress
		// (localhost)
		if len(peersFromMultiaddrs(c.config.Peers)) != nClusters-1 {
			t.Error(c.config.Peers)
			t.Errorf("%s: expected different cluster peers in the configuration", c.id)
		}

		for _, p := range c.config.Peers {
			if p == nil {
				t.Error("something went wrong adding peer to config")
			}
		}
	}
	runF(t, clusters, f)
}

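// TestClustersPeerAddBadPeer verifies that adding a peer which has been
// shut down fails and leaves the peer set untouched.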
func TestClustersPeerAddBadPeer(t *testing.T) {
	clusters, mocks := peerManagerClusters(t)
	defer shutdownClusters(t, clusters, mocks)

	if len(clusters) < 2 {
		t.Skip("need at least 2 nodes for this test")
	}

	// We add a cluster that has been shut down
	// (closed transports)
	clusters[1].Shutdown()
	_, err := clusters[0].PeerAdd(clusterAddr(clusters[1]))
	if err == nil {
		t.Error("expected an error")
	}
	ids := clusters[0].Peers()
	if len(ids) != 1 {
		t.Error("cluster should have only one member")
	}
}

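// TestClustersPeerAddInUnhealthyCluster verifies that a new peer cannot be
// added once an existing member has been shut down and the cluster can no
// longer commit the change.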
func TestClustersPeerAddInUnhealthyCluster(t *testing.T) {
	clusters, mocks := peerManagerClusters(t)
	defer shutdownClusters(t, clusters, mocks)

	if len(clusters) < 3 {
		t.Skip("need at least 3 nodes for this test")
	}

	_, err := clusters[0].PeerAdd(clusterAddr(clusters[1]))
	if err != nil {
		t.Fatal(err)
	}
	ids := clusters[1].Peers()
	if len(ids) != 2 {
		t.Error("expected 2 peers")
	}

	// Now we shut down one member of the running cluster
	// and try to add someone else.
	err = clusters[1].Shutdown()
	if err != nil {
		t.Error("Shutdown should be clean: ", err)
	}
	_, err = clusters[0].PeerAdd(clusterAddr(clusters[2]))

	if err == nil {
		t.Error("expected an error")
	}

	ids = clusters[0].Peers()
	if len(ids) != 2 {
		t.Error("cluster should still have 2 peers")
	}
}

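// TestClustersPeerRemove removes one peer from a formed cluster and checks
// that the removed peer shuts itself down while the rest update their
// peer sets.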
func TestClustersPeerRemove(t *testing.T) {
	clusters, mocks := createClusters(t)
	defer shutdownClusters(t, clusters, mocks)

	if len(clusters) < 2 {
		t.Skip("test needs at least 2 clusters")
	}

	p := clusters[1].ID().ID
	//t.Logf("remove %s from %s", p.Pretty(), clusters[0].config.ClusterPeers)
	err := clusters[0].PeerRemove(p)
	if err != nil {
		t.Error(err)
	}

	delay()

	f := func(t *testing.T, c *Cluster) {
		if c.ID().ID == p { //This is the removed cluster
			_, ok := <-c.Done()
			if ok {
				t.Error("removed peer should have exited")
			}
			// if len(c.config.ClusterPeers) != 0 {
			// 	t.Error("cluster peers should be empty")
			// }
		} else {
			ids := c.Peers()
			if len(ids) != nClusters-1 {
				t.Error("should have removed 1 peer")
			}
			// if len(c.config.ClusterPeers) != nClusters-1 {
			// 	t.Log(c.config.ClusterPeers)
			// 	t.Error("should have removed peer from config")
			// }
		}
	}

	runF(t, clusters, f)
}

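// TestClustersPeerRemoveSelf makes each peer remove itself in turn. The
// last remaining peer cannot remove itself and is shut down instead.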
func TestClustersPeerRemoveSelf(t *testing.T) {
	// this test hangs sometimes if there are problems
	clusters, mocks := createClusters(t)
	defer shutdownClusters(t, clusters, mocks)

	for i := 0; i < len(clusters); i++ {
		peers := clusters[i].Peers()
		t.Logf("Current cluster size: %d", len(peers))
		if len(peers) != (len(clusters) - i) {
			t.Fatal("Previous peers not removed correctly")
		}
		err := clusters[i].PeerRemove(clusters[i].ID().ID)
		// Last peer member won't be able to remove itself
		// In this case, we shut it down.
		if err != nil {
			if i != len(clusters)-1 { //not last
				t.Error(err)
			} else {
				err := clusters[i].Shutdown()
				if err != nil {
					t.Fatal(err)
				}
			}
		}
		_, more := <-clusters[i].Done()
		if more {
			t.Error("should be done")
		}
	}
}

func TestClustersPeerRemoveLeader(t *testing.T) {
	// this test is like the one above, except it always
	// removes the current leader.
	// this test hangs sometimes if there are problems
	clusters, mocks := createClusters(t)
	defer shutdownClusters(t, clusters, mocks)

	findLeader := func() *Cluster {
		var l peer.ID
		for _, c := range clusters {
			if !c.shutdownB {
				waitForLeader(t, clusters)
				l, _ = c.consensus.Leader()
			}
		}
		for _, c := range clusters {
			if c.id == l {
				return c
			}
		}
		return nil
	}

	for i := 0; i < len(clusters); i++ {
		leader := findLeader()
		peers := leader.Peers()
		t.Logf("Current cluster size: %d", len(peers))
		if len(peers) != (len(clusters) - i) {
			t.Fatal("Previous peers not removed correctly")
		}
		err := leader.PeerRemove(leader.id)
		// Last peer member won't be able to remove itself
		// In this case, we shut it down.
		if err != nil {
			if i != len(clusters)-1 { //not last
				t.Error(err)
			} else {
				err := leader.Shutdown()
				if err != nil {
					t.Fatal(err)
				}
			}
		}
		_, more := <-leader.Done()
		if more {
			t.Error("should be done")
		}
		time.Sleep(time.Second)
	}
}

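// TestClustersPeerRemoveReallocsPins checks that, with a replication factor
// of nClusters-1, the pins allocated to a peer are re-allocated to the
// remaining peers after that peer removes itself.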
func TestClustersPeerRemoveReallocsPins(t *testing.T) {
	clusters, mocks := createClusters(t)
	defer shutdownClusters(t, clusters, mocks)

	if len(clusters) < 3 {
		t.Skip("test needs at least 3 clusters")
	}

	// Adjust the replication factor for re-allocation
	for _, c := range clusters {
		c.config.ReplicationFactor = nClusters - 1
	}

	// We choose to remove the leader, to make things even more interesting
	leaderID, err := clusters[0].consensus.Leader()
	if err != nil {
		t.Fatal(err)
	}

	var leader *Cluster
	var leaderi int
	for i, cl := range clusters {
		if id := cl.ID().ID; id == leaderID {
			leader = cl
			leaderi = i
			break
		}
	}
	if leader == nil {
		t.Fatal("did not find a leader?")
	}

	leaderMock := mocks[leaderi]

	// Remove leader from set
	clusters = append(clusters[:leaderi], clusters[leaderi+1:]...)
	mocks = append(mocks[:leaderi], mocks[leaderi+1:]...)
	defer leader.Shutdown()
	defer leaderMock.Close()

	tmpCid, _ := cid.Decode(test.TestCid1)
	prefix := tmpCid.Prefix()

	// Pin nClusters random pins. This ensures each peer will
	// pin the same number of Cids.
	for i := 0; i < nClusters; i++ {
		h, err := prefix.Sum(randomBytes())
		checkErr(t, err)
		err = leader.Pin(api.PinCid(h))
		checkErr(t, err)
		time.Sleep(time.Second) // time to update the metrics
	}

	delay()

	// At this point, every peer should have nClusters-1 pins
	// allocated to it. Find out which pins are allocated to the leader.
	interestingCids := []*cid.Cid{}

	pins := leader.Pins()
	if len(pins) != nClusters {
		t.Fatal("expected number of tracked pins to be nClusters")
	}
	for _, p := range pins {
		if containsPeer(p.Allocations, leaderID) {
			//t.Logf("%s pins %s", leaderID, p.Cid)
			interestingCids = append(interestingCids, p.Cid)
		}
	}

	if len(interestingCids) != nClusters-1 {
		t.Fatalf("Expected %d allocated CIDs but got %d", nClusters-1,
			len(interestingCids))
	}

	// Now the leader removes itself
	err = leader.PeerRemove(leaderID)
	if err != nil {
		t.Fatal("error removing peer:", err)
	}

	time.Sleep(2 * time.Second)
	waitForLeader(t, clusters)
	delay()

	for _, icid := range interestingCids {
		// Now check that the allocations are new.
		newPin, err := clusters[1].PinGet(icid)
		if err != nil {
			t.Fatal("error getting the new allocations for", icid)
		}
		if containsPeer(newPin.Allocations, leaderID) {
			t.Fatal("pin should not be allocated to the removed peer")
		}
	}
}

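// TestClustersPeerJoin makes every peer Join clusters[0] sequentially and
// checks that all of them end up with the full peer set and the pinned CID.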
func TestClustersPeerJoin(t *testing.T) {
	clusters, mocks := peerManagerClusters(t)
	defer shutdownClusters(t, clusters, mocks)

	if len(clusters) < 3 {
		t.Skip("test needs at least 3 clusters")
	}

	for i := 1; i < len(clusters); i++ {
		err := clusters[i].Join(clusterAddr(clusters[0]))
		if err != nil {
			t.Fatal(err)
		}
	}
	hash, _ := cid.Decode(test.TestCid1)
	clusters[0].Pin(api.PinCid(hash))
	delay()

	f := func(t *testing.T, c *Cluster) {
		peers := c.Peers()
		if len(peers) != nClusters {
			t.Error("all peers should be connected")
		}
		pins := c.Pins()
		if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
			t.Error("all peers should have pinned the cid")
		}
	}
	runF(t, clusters, f)
}

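// TestClustersPeerJoinAllAtOnce is like TestClustersPeerJoin, except that
// all the joining peers perform the Join at once through runF.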
func TestClustersPeerJoinAllAtOnce(t *testing.T) {
	clusters, mocks := peerManagerClusters(t)
	defer shutdownClusters(t, clusters, mocks)

	if len(clusters) < 2 {
		t.Skip("test needs at least 2 clusters")
	}

	f := func(t *testing.T, c *Cluster) {
		err := c.Join(clusterAddr(clusters[0]))
		if err != nil {
			t.Fatal(err)
		}
	}
	runF(t, clusters[1:], f)

	hash, _ := cid.Decode(test.TestCid1)
	clusters[0].Pin(api.PinCid(hash))
	delay()

	f2 := func(t *testing.T, c *Cluster) {
		peers := c.Peers()
		if len(peers) != nClusters {
			t.Error("all peers should be connected")
		}
		pins := c.Pins()
		if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
			t.Error("all peers should have pinned the cid")
		}
	}
	runF(t, clusters, f2)
}

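// TestClustersPeerJoinAllAtOnceWithRandomBootstrap forms a 2-peer cluster
// first and then has the remaining peers join one of the two seeds at random.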
func TestClustersPeerJoinAllAtOnceWithRandomBootstrap(t *testing.T) {
	clusters, mocks := peerManagerClusters(t)
	defer shutdownClusters(t, clusters, mocks)

	if len(clusters) < 3 {
		t.Skip("test needs at least 3 clusters")
	}

	// We start with a 2-node cluster and the rest of the nodes join
	// one of the two seeds at random

	err := clusters[1].Join(clusterAddr(clusters[0]))
	if err != nil {
		t.Fatal(err)
	}

	f := func(t *testing.T, c *Cluster) {
		j := rand.Intn(2)
		err := c.Join(clusterAddr(clusters[j]))
		if err != nil {
			t.Fatal(err)
		}
	}
	runF(t, clusters[2:], f)

	hash, _ := cid.Decode(test.TestCid1)
	clusters[0].Pin(api.PinCid(hash))
	delay()

	f2 := func(t *testing.T, c *Cluster) {
		peers := c.Peers()
		if len(peers) != nClusters {
			peersIds := []peer.ID{}
			for _, p := range peers {
				peersIds = append(peersIds, p.ID)
			}
			t.Errorf("%s sees %d peers: %s", c.id, len(peers), peersIds)
		}
		pins := c.Pins()
		if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
			t.Error("all peers should have pinned the cid")
		}
	}
	runF(t, clusters, f2)
}

// Tests that a peer catches up on the state correctly after rejoining
func TestClustersPeerRejoin(t *testing.T) {
	clusters, mocks := peerManagerClusters(t)
	defer shutdownClusters(t, clusters, mocks)

	// pin something in c0
	pin1, _ := cid.Decode(test.TestCid1)
	err := clusters[0].Pin(api.PinCid(pin1))
	if err != nil {
		t.Fatal(err)
	}

	// add all clusters
	for i := 1; i < len(clusters); i++ {
		addr := clusterAddr(clusters[i])
		_, err := clusters[0].PeerAdd(addr)
		if err != nil {
			t.Fatal(err)
		}
	}

	delay()

	// all added peers should have the content
	for i := 1; i < len(clusters); i++ {
		pinfo := clusters[i].tracker.Status(pin1)
		if pinfo.Status != api.TrackerStatusPinned {
			t.Error("Added peers should pin the content")
		}
	}

	clusters[0].config.LeaveOnShutdown = true
	err = clusters[0].Shutdown()
	if err != nil {
		t.Fatal(err)
	}
	mocks[0].Close()

	delay()

	// Forget peer so we can re-add one in same address/port
	f := func(t *testing.T, c *Cluster) {
		c.peerManager.rmPeer(clusters[0].id)
	}
	runF(t, clusters[1:], f)

	// Pin something on the rest
	pin2, _ := cid.Decode(test.TestCid2)
	err = clusters[1].Pin(api.PinCid(pin2))
	if err != nil {
		t.Fatal(err)
	}

	delay()

	// Rejoin c0
	c0, m0 := createOnePeerCluster(t, 0, testingClusterSecret)
	clusters[0] = c0
	mocks[0] = m0
	addr := clusterAddr(c0)
	_, err = clusters[1].PeerAdd(addr)
	if err != nil {
		t.Fatal(err)
	}

	delay()

	pinfo := clusters[0].tracker.Status(pin2)
	if pinfo.Status != api.TrackerStatusPinned {
		t.Error("re-joined cluster should have caught up")
	}

	pinfo = clusters[0].tracker.Status(pin1)
	if pinfo.Status != api.TrackerStatusPinned {
		t.Error("re-joined cluster should have original pin")
	}
}