From 8e6eefb7141ad7048722b147b85a0e4a13b771b6 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Wed, 15 Aug 2018 12:30:00 +0200 Subject: [PATCH] Tests: multiple fixes This fixes multiple issues in and around tests while increasing ttls and delays in 100ms. Multiple issues, including races, tests not running with consensus-crdt missing log messages and better initialization have been fixed. License: MIT Signed-off-by: Hector Sanjuan --- .travis.yml | 6 +-- Makefile | 2 +- add_test.go | 2 +- cluster.go | 7 ++- cluster_config.go | 2 +- cluster_test.go | 101 ++++++++++++++++++++---------------- clusterhost.go | 22 +++++--- config_test.go | 15 +++--- consensus/crdt/consensus.go | 2 + ipfscluster_test.go | 56 ++++++++++---------- peer_manager_test.go | 4 ++ pnet_test.go | 4 +- 12 files changed, 125 insertions(+), 98 deletions(-) diff --git a/.travis.yml b/.travis.yml index 3b35af31..a50fb727 100644 --- a/.travis.yml +++ b/.travis.yml @@ -25,15 +25,15 @@ jobs: - stage: "Testing stage" name: "Tests (all modules) + Coverage" script: - - go test -v -failfast -coverprofile=coverage.txt -covermode=atomic ./... + - go test -v -failfast -timeout 15m -coverprofile=coverage.txt -covermode=atomic . after_success: - bash <(curl -s https://codecov.io/bash) - name: "Main Tests with crdt consensus" script: - - go test -v -failfast . -consensus crdt + - go test -v -failfast -consensus crdt . - name: "Main Tests with stateless tracker" script: - - go test -v -failfast . -tracker stateless + - go test -v -failfast -tracker stateless . - name: "Golint and go vet" script: - go get -u golang.org/x/lint/golint diff --git a/Makefile b/Makefile index bad8d56c..670d0518 100644 --- a/Makefile +++ b/Makefile @@ -71,7 +71,7 @@ test_sharness: $(sharness) @sh sharness/run-sharness-tests.sh test_problem: gx-deps - go test -timeout 20m -loglevel "DEBUG" -v -run $(problematic_test) + go test -loglevel "DEBUG" -v -run $(problematic_test) $(sharness): @echo "Downloading sharness" diff --git a/add_test.go b/add_test.go index ae47f7a9..900cf647 100644 --- a/add_test.go +++ b/add_test.go @@ -41,7 +41,7 @@ func TestAdd(t *testing.T) { t.Error(pin.Error) } if pin.Status != api.TrackerStatusPinned { - t.Error("item should be pinned") + t.Error("item should be pinned and is", pin.Status) } } diff --git a/cluster.go b/cluster.go index b514900b..4d416592 100644 --- a/cluster.go +++ b/cluster.go @@ -304,6 +304,11 @@ func (c *Cluster) alertsHandler() { case <-c.ctx.Done(): return case alrt := <-c.monitor.Alerts(): + logger.Warningf("metric alert for %s: Peer: %s.", alrt.MetricName, alrt.Peer) + if alrt.MetricName != pingMetricName { + continue // only handle ping alerts + } + cState, err := c.consensus.State(c.ctx) if err != nil { logger.Warning(err) @@ -374,7 +379,7 @@ func (c *Cluster) watchPeers() { if !hasMe { c.shutdownLock.Lock() defer c.shutdownLock.Unlock() - logger.Infof("%s: removed from raft. Initiating shutdown", c.id.Pretty()) + logger.Info("peer no longer in peerset. Initiating shutdown") c.removed = true go c.Shutdown(c.ctx) return diff --git a/cluster_config.go b/cluster_config.go index 520b37e5..b5e7c389 100644 --- a/cluster_config.go +++ b/cluster_config.go @@ -182,7 +182,7 @@ func (cfg *Config) ApplyEnvVars() error { // seem to be working ones. func (cfg *Config) Validate() error { if cfg.ListenAddr == nil { - return errors.New("cluster.listen_addr is indefined") + return errors.New("cluster.listen_multiaddress is undefined") } if cfg.StateSyncInterval <= 0 { diff --git a/cluster_test.go b/cluster_test.go index d1341824..be2a3b74 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -15,8 +15,6 @@ import ( "github.com/ipfs/ipfs-cluster/allocator/ascendalloc" "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/config" - "github.com/ipfs/ipfs-cluster/consensus/raft" - "github.com/ipfs/ipfs-cluster/datastore/inmem" "github.com/ipfs/ipfs-cluster/informer/numpin" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/state" @@ -141,13 +139,16 @@ type mockTracer struct { } func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracker) { - ident, clusterCfg, _, _, _, _, raftCfg, _, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs() + ident, clusterCfg, _, _, _, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs() ctx := context.Background() - host, pubsub, dht, err := NewClusterHost(ctx, ident, clusterCfg) - if err != nil { - t.Fatal(err) - } + host, pubsub, dht := createHost(t, ident.PrivateKey, clusterCfg.Secret, clusterCfg.ListenAddr) + + folder := filepath.Join(testsFolder, host.ID().Pretty()) + cleanState() + clusterCfg.SetBaseDir(folder) + raftCfg.DataFolder = folder + badgerCfg.Folder = filepath.Join(folder, "badger") api := &mockAPI{} proxy := &mockProxy{} @@ -155,11 +156,15 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke tracker := makePinTracker(t, ident.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername) tracer := &mockTracer{} - store := inmem.New() - raftcon, _ := raft.NewConsensus(host, raftCfg, store, false) + store := makeStore(t, badgerCfg) + cons := makeConsensus(t, store, host, pubsub, dht, raftCfg, false, crdtCfg) + var peersF func(context.Context) ([]peer.ID, error) + if consensus == "raft" { + peersF = cons.Peers + } psmonCfg.CheckInterval = 2 * time.Second - mon, err := pubsubmon.New(ctx, psmonCfg, pubsub, raftcon.Peers) + mon, err := pubsubmon.New(ctx, psmonCfg, pubsub, peersF) if err != nil { t.Fatal(err) } @@ -177,7 +182,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke dht, clusterCfg, store, - raftcon, + cons, []API{api, proxy}, ipfs, tracker, @@ -193,11 +198,8 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke return cl, api, ipfs, tracker } -func cleanRaft() { - raftDirs, _ := filepath.Glob("raftFolderFromTests*") - for _, dir := range raftDirs { - os.RemoveAll(dir) - } +func cleanState() { + os.RemoveAll(testsFolder) } func testClusterShutdown(t *testing.T) { @@ -217,9 +219,9 @@ func testClusterShutdown(t *testing.T) { func TestClusterStateSync(t *testing.T) { ctx := context.Background() - cleanRaft() + cleanState() cl, _, _, _ := testingCluster(t) - defer cleanRaft() + defer cleanState() defer cl.Shutdown(ctx) c := test.Cid1 @@ -249,7 +251,7 @@ func TestClusterStateSync(t *testing.T) { func TestClusterID(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) - defer cleanRaft() + defer cleanState() defer cl.Shutdown(ctx) id := cl.ID(ctx) if len(id.Addresses) == 0 { @@ -269,7 +271,7 @@ func TestClusterID(t *testing.T) { func TestClusterPin(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) - defer cleanRaft() + defer cleanState() defer cl.Shutdown(ctx) c := test.Cid1 @@ -278,21 +280,26 @@ func TestClusterPin(t *testing.T) { t.Fatal("pin should have worked:", err) } - // test an error case - cl.consensus.Shutdown(ctx) - pin := api.PinCid(c) - pin.ReplicationFactorMax = 1 - pin.ReplicationFactorMin = 1 - err = cl.Pin(ctx, pin) - if err == nil { - t.Error("expected an error but things worked") + switch consensus { + case "crdt": + return + case "raft": + // test an error case + cl.consensus.Shutdown(ctx) + pin := api.PinCid(c) + pin.ReplicationFactorMax = 1 + pin.ReplicationFactorMin = 1 + err = cl.Pin(ctx, pin) + if err == nil { + t.Error("expected an error but things worked") + } } } func TestClusterPinPath(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) - defer cleanRaft() + defer cleanState() defer cl.Shutdown(ctx) pin, err := cl.PinPath(ctx, &api.PinPath{Path: test.PathIPFS2}) @@ -313,7 +320,7 @@ func TestClusterPinPath(t *testing.T) { func TestAddFile(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) - defer cleanRaft() + defer cleanState() defer cl.Shutdown(ctx) sth := test.NewShardingTestHelper() defer sth.Clean(t) @@ -373,7 +380,7 @@ func TestAddFile(t *testing.T) { func TestUnpinShard(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) - defer cleanRaft() + defer cleanState() defer cl.Shutdown(ctx) sth := test.NewShardingTestHelper() defer sth.Clean(t) @@ -500,7 +507,7 @@ func TestUnpinShard(t *testing.T) { // func TestClusterPinMeta(t *testing.T) { // cl, _, _, _ := testingCluster(t) -// defer cleanRaft() +// defer cleanState() // defer cl.Shutdown() // singleShardedPin(t, cl) @@ -508,7 +515,7 @@ func TestUnpinShard(t *testing.T) { // func TestClusterUnpinShardFail(t *testing.T) { // cl, _, _, _ := testingCluster(t) -// defer cleanRaft() +// defer cleanState() // defer cl.Shutdown() // singleShardedPin(t, cl) @@ -532,7 +539,7 @@ func TestUnpinShard(t *testing.T) { // func TestClusterUnpinMeta(t *testing.T) { // cl, _, _, _ := testingCluster(t) -// defer cleanRaft() +// defer cleanState() // defer cl.Shutdown() // singleShardedPin(t, cl) @@ -577,7 +584,7 @@ func TestUnpinShard(t *testing.T) { // func TestClusterPinShardTwoParents(t *testing.T) { // cl, _, _, _ := testingCluster(t) -// defer cleanRaft() +// defer cleanState() // defer cl.Shutdown() // pinTwoParentsOneShard(t, cl) @@ -594,7 +601,7 @@ func TestUnpinShard(t *testing.T) { // func TestClusterUnpinShardSecondParent(t *testing.T) { // cl, _, _, _ := testingCluster(t) -// defer cleanRaft() +// defer cleanState() // defer cl.Shutdown() // pinTwoParentsOneShard(t, cl) @@ -627,7 +634,7 @@ func TestUnpinShard(t *testing.T) { // func TestClusterUnpinShardFirstParent(t *testing.T) { // cl, _, _, _ := testingCluster(t) -// defer cleanRaft() +// defer cleanState() // defer cl.Shutdown() // pinTwoParentsOneShard(t, cl) @@ -663,7 +670,7 @@ func TestUnpinShard(t *testing.T) { // func TestClusterPinTwoMethodsFail(t *testing.T) { // cl, _, _, _ := testingCluster(t) -// defer cleanRaft() +// defer cleanState() // defer cl.Shutdown() // // First pin normally then sharding pin fails @@ -699,7 +706,7 @@ func TestUnpinShard(t *testing.T) { // func TestClusterRePinShard(t *testing.T) { // cl, _, _, _ := testingCluster(t) -// defer cleanRaft() +// defer cleanState() // defer cl.Shutdown() // cCdag, _ := cid.Decode(test.CdagCid) @@ -735,7 +742,7 @@ func TestUnpinShard(t *testing.T) { func TestClusterPins(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) - defer cleanRaft() + defer cleanState() defer cl.Shutdown(ctx) c := test.Cid1 @@ -744,6 +751,8 @@ func TestClusterPins(t *testing.T) { t.Fatal("pin should have worked:", err) } + pinDelay() + pins, err := cl.Pins(ctx) if err != nil { t.Fatal(err) @@ -759,7 +768,7 @@ func TestClusterPins(t *testing.T) { func TestClusterPinGet(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) - defer cleanRaft() + defer cleanState() defer cl.Shutdown(ctx) c := test.Cid1 @@ -785,7 +794,7 @@ func TestClusterPinGet(t *testing.T) { func TestClusterUnpin(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) - defer cleanRaft() + defer cleanState() defer cl.Shutdown(ctx) c := test.Cid1 @@ -816,7 +825,7 @@ func TestClusterUnpin(t *testing.T) { func TestClusterUnpinPath(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) - defer cleanRaft() + defer cleanState() defer cl.Shutdown(ctx) // Unpin should error without pin being committed to state @@ -846,7 +855,7 @@ func TestClusterUnpinPath(t *testing.T) { func TestClusterPeers(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) - defer cleanRaft() + defer cleanState() defer cl.Shutdown(ctx) peers := cl.Peers(ctx) if len(peers) != 1 { @@ -869,7 +878,7 @@ func TestClusterPeers(t *testing.T) { func TestVersion(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) - defer cleanRaft() + defer cleanState() defer cl.Shutdown(ctx) if cl.Version() != version.Version.String() { t.Error("bad Version()") @@ -879,7 +888,7 @@ func TestVersion(t *testing.T) { func TestClusterRecoverAllLocal(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) - defer cleanRaft() + defer cleanState() defer cl.Shutdown(ctx) err := cl.Pin(ctx, api.PinCid(test.Cid1)) diff --git a/clusterhost.go b/clusterhost.go index 76558ee9..623203bd 100644 --- a/clusterhost.go +++ b/clusterhost.go @@ -13,7 +13,6 @@ import ( pnet "github.com/libp2p/go-libp2p-pnet" pubsub "github.com/libp2p/go-libp2p-pubsub" routedhost "github.com/libp2p/go-libp2p/p2p/host/routed" - ma "github.com/multiformats/go-multiaddr" ) // NewClusterHost creates a libp2p Host with the options from the provided @@ -26,7 +25,13 @@ func NewClusterHost( cfg *Config, ) (host.Host, *pubsub.PubSub, *dht.IpfsDHT, error) { - h, err := newHost(ctx, cfg.Secret, ident.PrivateKey, []ma.Multiaddr{cfg.ListenAddr}) + h, err := newHost( + ctx, + cfg.Secret, + ident.PrivateKey, + libp2p.ListenAddrs(cfg.ListenAddr), + libp2p.NATPortMap(), + ) if err != nil { return nil, nil, nil, err } @@ -46,7 +51,7 @@ func NewClusterHost( return routedHost(h, idht), psub, idht, nil } -func newHost(ctx context.Context, secret []byte, priv crypto.PrivKey, listenAddrs []ma.Multiaddr) (host.Host, error) { +func newHost(ctx context.Context, secret []byte, priv crypto.PrivKey, opts ...libp2p.Option) (host.Host, error) { var prot ipnet.Protector var err error @@ -60,12 +65,15 @@ func newHost(ctx context.Context, secret []byte, priv crypto.PrivKey, listenAddr } } + finalOpts := []libp2p.Option{ + libp2p.Identity(priv), + libp2p.PrivateNetwork(prot), + } + finalOpts = append(finalOpts, opts...) + return libp2p.New( ctx, - libp2p.Identity(priv), - libp2p.ListenAddrs(listenAddrs...), - libp2p.PrivateNetwork(prot), - libp2p.NATPortMap(), + finalOpts..., ) } diff --git a/config_test.go b/config_test.go index 3a2c73e2..95c274b3 100644 --- a/config_test.go +++ b/config_test.go @@ -18,8 +18,8 @@ import ( var testingClusterSecret, _ = DecodeClusterSecret("2588b80d5cb05374fa142aed6cbb047d1f4ef8ef15e37eba68c65b9d30df67ed") var testingIdentity = []byte(`{ - "id": "QmUfSFm12eYCaRdypg48m8RqkXfLW7A2ZeGZb2skeHHDGA", - "private_key": "CAASqAkwggSkAgEAAoIBAQDpT16IRF6bb9tHsCbQ7M+nb2aI8sz8xyt8PoAWM42ki+SNoESIxKb4UhFxixKvtEdGxNE6aUUVc8kFk6wTStJ/X3IGiMetwkXiFiUxabUF/8A6SyvnSVDm+wFuavugpVrZikjLcfrf2xOVgnG3deQQvd/qbAv14jTwMFl+T+8d/cXBo8Mn/leLZCQun/EJEnkXP5MjgNI8XcWUE4NnH3E0ESSm6Pkm8MhMDZ2fmzNgqEyJ0GVinNgSml3Pyha3PBSj5LRczLip/ie4QkKx5OHvX2L3sNv/JIUHse5HSbjZ1c/4oGCYMVTYCykWiczrxBUOlcr8RwnZLOm4n2bCt5ZhAgMBAAECggEAVkePwfzmr7zR7tTpxeGNeXHtDUAdJm3RWwUSASPXgb5qKyXVsm5nAPX4lXDE3E1i/nzSkzNS5PgIoxNVU10cMxZs6JW0okFx7oYaAwgAddN6lxQtjD7EuGaixN6zZ1k/G6vT98iS6i3uNCAlRZ9HVBmjsOF8GtYolZqLvfZ5izEVFlLVq/BCs7Y5OrDrbGmn3XupfitVWYExV0BrHpobDjsx2fYdTZkmPpSSvXNcm4Iq2AXVQzoqAfGo7+qsuLCZtVlyTfVKQjMvE2ffzN1dQunxixOvev/fz4WSjGnRpC6QLn6Oqps9+VxQKqKuXXqUJC+U45DuvA94Of9MvZfAAQKBgQD7xmXueXRBMr2+0WftybAV024ap0cXFrCAu+KWC1SUddCfkiV7e5w+kRJx6RH1cg4cyyCL8yhHZ99Z5V0Mxa/b/usuHMadXPyX5szVI7dOGgIC9q8IijN7B7GMFAXc8+qC7kivehJzjQghpRRAqvRzjDls4gmbNPhbH1jUiU124QKBgQDtOaW5/fOEtOq0yWbDLkLdjImct6oKMLhENL6yeIKjMYgifzHb2adk7rWG3qcMrdgaFtDVfqv8UmMEkzk7bSkovMVj3SkLzMz84ii1SkSfyaCXgt/UOzDkqAUYB0cXMppYA7jxHa2OY8oEHdBgmyJXdLdzJxCp851AoTlRUSePgQKBgQCQgKgUHOUaXnMEx88sbOuBO14gMg3dNIqM+Ejt8QbURmI8k3arzqA4UK8Tbb9+7b0nzXWanS5q/TT1tWyYXgW28DIuvxlHTA01aaP6WItmagrphIelERzG6f1+9ib/T4czKmvROvDIHROjq8lZ7ERs5Pg4g+sbh2VbdzxWj49EQQKBgFEna36ZVfmMOs7mJ3WWGeHY9ira2hzqVd9fe+1qNKbHhx7mDJR9fTqWPxuIh/Vac5dZPtAKqaOEO8OQ6f9edLou+ggT3LrgsS/B3tNGOPvA6mNqrk/Yf/15TWTO+I8DDLIXc+lokbsogC+wU1z5NWJd13RZZOX/JUi63vTmonYBAoGBAIpglLCH2sPXfmguO6p8QcQcv4RjAU1c0GP4P5PNN3Wzo0ItydVd2LHJb6MdmL6ypeiwNklzPFwTeRlKTPmVxJ+QPg1ct/3tAURN/D40GYw9ojDhqmdSl4HW4d6gHS2lYzSFeU5jkG49y5nirOOoEgHy95wghkh6BfpwHujYJGw4" + "id": "12D3KooWQiK1sYbGNnD9XtWF1sP95cawwwNy3d2WUwtP71McwUfZ", + "private_key": "CAESQJZ0wHQyoWGizG7eSATrDtTVlyyr99O8726jIu1lf2D+3VJBBAu6HXPRkbdNINBWlPMn+PK3bO6EgGGuaou8bKg=" }`) var testingClusterCfg = []byte(`{ @@ -29,11 +29,10 @@ var testingClusterCfg = []byte(`{ "state_sync_interval": "1m0s", "ipfs_sync_interval": "2m10s", "replication_factor": -1, - "monitor_ping_interval": "150ms", + "monitor_ping_interval": "250ms", "peer_watch_interval": "100ms", "disable_repinning": false -} -`) +}`) var testingRaftCfg = []byte(`{ "data_folder": "raftFolderFromTests", @@ -107,12 +106,12 @@ var testingTrackerCfg = []byte(` `) var testingMonCfg = []byte(`{ - "check_interval": "300ms", - "failure_threshold": 5 + "check_interval": "400ms", + "failure_threshold": 6 }`) var testingDiskInfCfg = []byte(`{ - "metric_ttl": "150ms", + "metric_ttl": "250ms", "metric_type": "freespace" }`) diff --git a/consensus/crdt/consensus.go b/consensus/crdt/consensus.go index 0ca039e3..4e156b3a 100644 --- a/consensus/crdt/consensus.go +++ b/consensus/crdt/consensus.go @@ -188,6 +188,7 @@ func (css *Consensus) setup() { if err != nil { logger.Error(err) } + logger.Infof("new pin added: %s", pin.Cid) } opts.DeleteHook = func(k ds.Key) { c, err := dshelp.DsKeyToCid(k) @@ -208,6 +209,7 @@ func (css *Consensus) setup() { if err != nil { logger.Error(err) } + logger.Infof("pin removed: %s", c) } crdt, err := crdt.New( diff --git a/ipfscluster_test.go b/ipfscluster_test.go index a8613cdd..65245d0e 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -14,12 +14,6 @@ import ( "testing" "time" - crypto "github.com/libp2p/go-libp2p-crypto" - peerstore "github.com/libp2p/go-libp2p-peerstore" - pubsub "github.com/libp2p/go-libp2p-pubsub" - - dht "github.com/libp2p/go-libp2p-kad-dht" - "github.com/ipfs/ipfs-cluster/allocator/descendalloc" "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api/rest" @@ -37,8 +31,13 @@ import ( "github.com/ipfs/ipfs-cluster/version" ds "github.com/ipfs/go-datastore" + libp2p "github.com/libp2p/go-libp2p" + crypto "github.com/libp2p/go-libp2p-crypto" host "github.com/libp2p/go-libp2p-host" + dht "github.com/libp2p/go-libp2p-kad-dht" peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + pubsub "github.com/libp2p/go-libp2p-pubsub" ma "github.com/multiformats/go-multiaddr" ) @@ -274,34 +273,37 @@ func createHosts(t *testing.T, clusterSecret []byte, nClusters int) ([]host.Host pubsubs := make([]*pubsub.PubSub, nClusters, nClusters) dhts := make([]*dht.IpfsDHT, nClusters, nClusters) - ctx := context.Background() - - clusterAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") - + listen, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") for i := range hosts { priv, _, err := crypto.GenerateKeyPair(crypto.RSA, 2048) checkErr(t, err) - h, err := newHost(ctx, clusterSecret, priv, []ma.Multiaddr{clusterAddr}) - checkErr(t, err) - // DHT needs to be created BEFORE connecting the peers, but - // bootstrapped AFTER - d, err := newDHT(ctx, h) - checkErr(t, err) + h, p, d := createHost(t, priv, clusterSecret, listen) + hosts[i] = h dhts[i] = d - - hosts[i] = routedHost(h, d) - - // Pubsub needs to be created BEFORE connecting the peers, - // otherwise they are not picked up. - psub, err := newPubSub(ctx, hosts[i]) - checkErr(t, err) - pubsubs[i] = psub + pubsubs[i] = p } return hosts, pubsubs, dhts } +func createHost(t *testing.T, priv crypto.PrivKey, clusterSecret []byte, listen ma.Multiaddr) (host.Host, *pubsub.PubSub, *dht.IpfsDHT) { + ctx := context.Background() + h, err := newHost(ctx, clusterSecret, priv, libp2p.ListenAddrs(listen)) + checkErr(t, err) + + // DHT needs to be created BEFORE connecting the peers, but + // bootstrapped AFTER + d, err := newDHT(ctx, h) + checkErr(t, err) + + // Pubsub needs to be created BEFORE connecting the peers, + // otherwise they are not picked up. + psub, err := newPubSub(ctx, h) + checkErr(t, err) + return routedHost(h, d), psub, d +} + func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) { ctx := context.Background() os.RemoveAll(testsFolder) @@ -378,6 +380,7 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) { } waitForLeader(t, clusters) + waitForClustersHealthy(t, clusters) return clusters, ipfsMocks } @@ -433,7 +436,7 @@ func delay() { } func pinDelay() { - time.Sleep(500 * time.Millisecond) + time.Sleep(800 * time.Millisecond) } func ttlDelay() { @@ -1063,9 +1066,6 @@ func TestClustersReplicationOverall(t *testing.T) { c.config.ReplicationFactorMax = nClusters - 1 } - // wait for clusters to stablise - waitForClustersHealthy(t, clusters) - // Why is replication factor nClusters - 1? // Because that way we know that pinning nCluster // pins with an strategy like numpins/disk diff --git a/peer_manager_test.go b/peer_manager_test.go index 1eb14c25..839d2d24 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -332,6 +332,7 @@ func TestClustersPeerRemoveSelf(t *testing.T) { } } } + // potential hanging place _, more := <-clusters[i].Done() if more { t.Error("should be done") @@ -726,6 +727,9 @@ func TestClustersPeerRejoin(t *testing.T) { c0, m0 := createOnePeerCluster(t, 0, testingClusterSecret) clusters[0] = c0 mocks[0] = m0 + + delay() + c0.consensus.Trust(ctx, clusters[1].id) err = c0.Join(ctx, clusterAddr(clusters[1])) if err != nil { diff --git a/pnet_test.go b/pnet_test.go index 8c6c11b7..15a2f8c6 100644 --- a/pnet_test.go +++ b/pnet_test.go @@ -37,7 +37,7 @@ func TestClusterSecretFormat(t *testing.T) { func TestSimplePNet(t *testing.T) { ctx := context.Background() clusters, mocks, boot := peerManagerClusters(t) - defer cleanRaft() + defer cleanState() defer shutdownClusters(t, clusters, mocks) defer boot.Close() @@ -67,7 +67,7 @@ func TestSimplePNet(t *testing.T) { // } // cl1, _ := createOnePeerCluster(t, 1, (*cl1Secret)[:]) // cl2, _ := createOnePeerCluster(t, 2, testingClusterSecret) -// defer cleanRaft() +// defer cleanState() // defer cl1.Shutdown() // defer cl2.Shutdown() // peers1 := cl1.Peers()