// ipfs-cluster/peer_manager_test.go
package ipfscluster

import (
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/test"

	cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
	ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
)
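
// peerManagerClusters creates nClusters independent single-peer clusters in
// parallel, each backed by its own IPFS mock, and returns the clusters along
// with their mocks.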
func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
	cls := make([]*Cluster, nClusters, nClusters)
	mocks := make([]*test.IpfsMock, nClusters, nClusters)
	var wg sync.WaitGroup
	for i := 0; i < nClusters; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			cl, m := createOnePeerCluster(t, i)
			cls[i] = cl
			mocks[i] = m
		}(i)
	}
	wg.Wait()
	return cls, mocks
}
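
// clusterAddr returns the cluster listen multiaddress of c with its peer ID
// appended, suitable for passing to PeerAdd or Join.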
func clusterAddr(c *Cluster) ma.Multiaddr {
	return multiaddrJoin(c.config.ClusterAddr, c.ID().ID)
}
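
// TestClustersPeerAdd adds every other cluster to clusters[0] one by one and
// then verifies that peer lists, pins and the reported ClusterPeers converge
// on all members.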
func TestClustersPeerAdd(t *testing.T) {
	clusters, mocks := peerManagerClusters(t)
	defer shutdownClusters(t, clusters, mocks)
	if len(clusters) < 2 {
		t.Skip("need at least 2 nodes for this test")
	}

	for i := 1; i < len(clusters); i++ {
		addr := clusterAddr(clusters[i])
		id, err := clusters[0].PeerAdd(addr)
		if err != nil {
			t.Fatal(err)
		}
		if len(id.ClusterPeers) != i {
			// ClusterPeers starts empty and grows as we add peers
			t.Log(id.ClusterPeers)
			t.Fatal("cluster peers should be up to date with the cluster")
		}
	}

	h, _ := cid.Decode(test.TestCid1)
	err := clusters[1].Pin(api.PinCid(h))
	if err != nil {
		t.Fatal(err)
	}
	delay()

	f := func(t *testing.T, c *Cluster) {
		ids := c.Peers()

		// check that all added peers are tracked by the peer manager
		if len(ids) != nClusters {
			//t.Log(ids)
			t.Error("added peers are not part of the cluster")
		}

		// check that the pin reached everyone via consensus
		pins := c.Pins()
		if len(pins) != 1 {
			t.Log(pins)
			t.Error("expected 1 pin everywhere")
		}

		if len(c.ID().ClusterPeers) != nClusters-1 {
			t.Log(c.ID().ClusterPeers)
			t.Error("by now cluster peers should reflect all peers")
		}

		// // check that it is part of the configuration
		// if len(c.config.ClusterPeers) != nClusters-1 {
		// 	t.Error("expected different cluster peers in the configuration")
		// }
		// for _, peer := range c.config.ClusterPeers {
		// 	if peer == nil {
		// 		t.Error("something went wrong adding peer to config")
		// 	}
		// }
	}
	runF(t, clusters, f)
}
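
// TestClustersPeerAddBadPeer checks that adding a peer whose cluster has
// already been shut down fails and leaves the peer set untouched.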
func TestClustersPeerAddBadPeer(t *testing.T) {
	clusters, mocks := peerManagerClusters(t)
	defer shutdownClusters(t, clusters, mocks)
	if len(clusters) < 2 {
		t.Skip("need at least 2 nodes for this test")
	}

	// We add a cluster that has been shut down
	// (closed transports).
	clusters[1].Shutdown()
	_, err := clusters[0].PeerAdd(clusterAddr(clusters[1]))
	if err == nil {
		t.Error("expected an error")
	}
	ids := clusters[0].Peers()
	if len(ids) != 1 {
		t.Error("cluster should have only one member")
	}
}
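
// TestClustersPeerAddInUnhealthyCluster checks that PeerAdd fails once an
// existing member of the cluster has been shut down, and that the peer set
// does not grow as a result.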
func TestClustersPeerAddInUnhealthyCluster(t *testing.T) {
	clusters, mocks := peerManagerClusters(t)
	defer shutdownClusters(t, clusters, mocks)
	if len(clusters) < 3 {
		t.Skip("need at least 3 nodes for this test")
	}

	_, err := clusters[0].PeerAdd(clusterAddr(clusters[1]))
	if err != nil {
		t.Fatal(err)
	}
	ids := clusters[1].Peers()
	if len(ids) != 2 {
		t.Error("expected 2 peers")
	}

	// Now we shut down one member of the running cluster
	// and try to add someone else.
	clusters[1].Shutdown()
	_, err = clusters[0].PeerAdd(clusterAddr(clusters[2]))
	if err == nil {
		t.Error("expected an error")
	}
	ids = clusters[0].Peers()
	if len(ids) != 2 {
		t.Error("cluster should still have 2 peers")
	}
}
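
// TestClustersPeerRemove removes one peer from a running cluster and checks
// that the removed peer shuts itself down while the remaining peers update
// their peer sets.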
func TestClustersPeerRemove(t *testing.T) {
	clusters, mocks := createClusters(t)
	defer shutdownClusters(t, clusters, mocks)
	if len(clusters) < 2 {
		t.Skip("test needs at least 2 clusters")
	}

	p := clusters[1].ID().ID
	//t.Logf("remove %s from %s", p.Pretty(), clusters[0].config.ClusterPeers)
	err := clusters[0].PeerRemove(p)
	if err != nil {
		t.Error(err)
	}
	delay()

	f := func(t *testing.T, c *Cluster) {
		if c.ID().ID == p { // this is the removed cluster
			_, ok := <-c.Done()
			if ok {
				t.Error("removed peer should have exited")
			}
			// if len(c.config.ClusterPeers) != 0 {
			// 	t.Error("cluster peers should be empty")
			// }
		} else {
			ids := c.Peers()
			if len(ids) != nClusters-1 {
				t.Error("should have removed 1 peer")
			}
			// if len(c.config.ClusterPeers) != nClusters-1 {
			// 	t.Log(c.config.ClusterPeers)
			// 	t.Error("should have removed peer from config")
			// }
		}
	}
	runF(t, clusters, f)
}
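
// TestClusterPeerRemoveSelf has each peer remove itself and verifies that the
// peer shuts down afterwards.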
func TestClusterPeerRemoveSelf(t *testing.T) {
	clusters, mocks := createClusters(t)
	defer shutdownClusters(t, clusters, mocks)
	for i := 0; i < len(clusters); i++ {
		err := clusters[i].PeerRemove(clusters[i].ID().ID)
		if err != nil {
			t.Error(err)
		}
		time.Sleep(time.Second)
		_, more := <-clusters[i].Done()
		if more {
			t.Error("should be done")
		}
	}
}
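
// TestClustersPeerJoin makes every peer join clusters[0] sequentially and
// checks that the resulting cluster shares the peer set and a pin.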
func TestClustersPeerJoin(t *testing.T) {
	clusters, mocks := peerManagerClusters(t)
	defer shutdownClusters(t, clusters, mocks)
	if len(clusters) < 3 {
		t.Skip("test needs at least 3 clusters")
	}

	for i := 1; i < len(clusters); i++ {
		err := clusters[i].Join(clusterAddr(clusters[0]))
		if err != nil {
			t.Fatal(err)
		}
	}
	hash, _ := cid.Decode(test.TestCid1)
	err := clusters[0].Pin(api.PinCid(hash))
	if err != nil {
		t.Fatal(err)
	}
	delay()

	f := func(t *testing.T, c *Cluster) {
		peers := c.Peers()
		if len(peers) != nClusters {
			t.Error("all peers should be connected")
		}
		pins := c.Pins()
		if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
			t.Error("all peers should have pinned the cid")
		}
	}
	runF(t, clusters, f)
}
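
// TestClustersPeerJoinAllAtOnce has all remaining peers join clusters[0]
// concurrently and checks that peer sets and pins still converge.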
func TestClustersPeerJoinAllAtOnce(t *testing.T) {
	clusters, mocks := peerManagerClusters(t)
	defer shutdownClusters(t, clusters, mocks)
	if len(clusters) < 2 {
		t.Skip("test needs at least 2 clusters")
	}

	f := func(t *testing.T, c *Cluster) {
		err := c.Join(clusterAddr(clusters[0]))
		if err != nil {
			t.Fatal(err)
		}
	}
	runF(t, clusters[1:], f)

	hash, _ := cid.Decode(test.TestCid1)
	err := clusters[0].Pin(api.PinCid(hash))
	if err != nil {
		t.Fatal(err)
	}
	delay()

	f2 := func(t *testing.T, c *Cluster) {
		peers := c.Peers()
		if len(peers) != nClusters {
			t.Error("all peers should be connected")
		}
		pins := c.Pins()
		if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
			t.Error("all peers should have pinned the cid")
		}
	}
	runF(t, clusters, f2)
}
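
// TestClustersPeerJoinAllAtOnceWithRandomBootstrap starts from a two-peer
// cluster and has the remaining peers join through one of the two seeds
// picked at random, then checks that peer sets and pins converge.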
func TestClustersPeerJoinAllAtOnceWithRandomBootstrap(t *testing.T) {
	clusters, mocks := peerManagerClusters(t)
	defer shutdownClusters(t, clusters, mocks)
	if len(clusters) < 3 {
		t.Skip("test needs at least 3 clusters")
	}

	// We have a 2-peer cluster and the rest of the peers join
	// one of the two seeds at random.
	err := clusters[1].Join(clusterAddr(clusters[0]))
	if err != nil {
		t.Fatal(err)
	}
	f := func(t *testing.T, c *Cluster) {
		j := rand.Intn(2)
		err := c.Join(clusterAddr(clusters[j]))
		if err != nil {
			t.Fatal(err)
		}
	}
	runF(t, clusters[2:], f)

	hash, _ := cid.Decode(test.TestCid1)
	err = clusters[0].Pin(api.PinCid(hash))
	if err != nil {
		t.Fatal(err)
	}
	delay()

	f2 := func(t *testing.T, c *Cluster) {
		peers := c.Peers()
		if len(peers) != nClusters {
			t.Error("all peers should be connected")
		}
		pins := c.Pins()
		if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
			t.Error("all peers should have pinned the cid")
		}
	}
	runF(t, clusters, f2)
}