Tests: multiple fixes

This fixes multiple issues in and around tests while
increasing ttls and delays in 100ms. Multiple issues, including
races, tests not running with consensus-crdt missing log messages
and better initialization have been fixed.

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
Hector Sanjuan 2018-08-15 12:30:00 +02:00
parent 9692e368f3
commit 8e6eefb714
12 changed files with 125 additions and 98 deletions

View File

@ -25,15 +25,15 @@ jobs:
- stage: "Testing stage" - stage: "Testing stage"
name: "Tests (all modules) + Coverage" name: "Tests (all modules) + Coverage"
script: script:
- go test -v -failfast -coverprofile=coverage.txt -covermode=atomic ./... - go test -v -failfast -timeout 15m -coverprofile=coverage.txt -covermode=atomic .
after_success: after_success:
- bash <(curl -s https://codecov.io/bash) - bash <(curl -s https://codecov.io/bash)
- name: "Main Tests with crdt consensus" - name: "Main Tests with crdt consensus"
script: script:
- go test -v -failfast . -consensus crdt - go test -v -failfast -consensus crdt .
- name: "Main Tests with stateless tracker" - name: "Main Tests with stateless tracker"
script: script:
- go test -v -failfast . -tracker stateless - go test -v -failfast -tracker stateless .
- name: "Golint and go vet" - name: "Golint and go vet"
script: script:
- go get -u golang.org/x/lint/golint - go get -u golang.org/x/lint/golint

View File

@ -71,7 +71,7 @@ test_sharness: $(sharness)
@sh sharness/run-sharness-tests.sh @sh sharness/run-sharness-tests.sh
test_problem: gx-deps test_problem: gx-deps
go test -timeout 20m -loglevel "DEBUG" -v -run $(problematic_test) go test -loglevel "DEBUG" -v -run $(problematic_test)
$(sharness): $(sharness):
@echo "Downloading sharness" @echo "Downloading sharness"

View File

@ -41,7 +41,7 @@ func TestAdd(t *testing.T) {
t.Error(pin.Error) t.Error(pin.Error)
} }
if pin.Status != api.TrackerStatusPinned { if pin.Status != api.TrackerStatusPinned {
t.Error("item should be pinned") t.Error("item should be pinned and is", pin.Status)
} }
} }

View File

@ -304,6 +304,11 @@ func (c *Cluster) alertsHandler() {
case <-c.ctx.Done(): case <-c.ctx.Done():
return return
case alrt := <-c.monitor.Alerts(): case alrt := <-c.monitor.Alerts():
logger.Warningf("metric alert for %s: Peer: %s.", alrt.MetricName, alrt.Peer)
if alrt.MetricName != pingMetricName {
continue // only handle ping alerts
}
cState, err := c.consensus.State(c.ctx) cState, err := c.consensus.State(c.ctx)
if err != nil { if err != nil {
logger.Warning(err) logger.Warning(err)
@ -374,7 +379,7 @@ func (c *Cluster) watchPeers() {
if !hasMe { if !hasMe {
c.shutdownLock.Lock() c.shutdownLock.Lock()
defer c.shutdownLock.Unlock() defer c.shutdownLock.Unlock()
logger.Infof("%s: removed from raft. Initiating shutdown", c.id.Pretty()) logger.Info("peer no longer in peerset. Initiating shutdown")
c.removed = true c.removed = true
go c.Shutdown(c.ctx) go c.Shutdown(c.ctx)
return return

View File

@ -182,7 +182,7 @@ func (cfg *Config) ApplyEnvVars() error {
// seem to be working ones. // seem to be working ones.
func (cfg *Config) Validate() error { func (cfg *Config) Validate() error {
if cfg.ListenAddr == nil { if cfg.ListenAddr == nil {
return errors.New("cluster.listen_addr is indefined") return errors.New("cluster.listen_multiaddress is undefined")
} }
if cfg.StateSyncInterval <= 0 { if cfg.StateSyncInterval <= 0 {

View File

@ -15,8 +15,6 @@ import (
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc" "github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
"github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/config" "github.com/ipfs/ipfs-cluster/config"
"github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/datastore/inmem"
"github.com/ipfs/ipfs-cluster/informer/numpin" "github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/state"
@ -141,13 +139,16 @@ type mockTracer struct {
} }
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracker) { func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracker) {
ident, clusterCfg, _, _, _, _, raftCfg, _, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs() ident, clusterCfg, _, _, _, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs()
ctx := context.Background() ctx := context.Background()
host, pubsub, dht, err := NewClusterHost(ctx, ident, clusterCfg) host, pubsub, dht := createHost(t, ident.PrivateKey, clusterCfg.Secret, clusterCfg.ListenAddr)
if err != nil {
t.Fatal(err) folder := filepath.Join(testsFolder, host.ID().Pretty())
} cleanState()
clusterCfg.SetBaseDir(folder)
raftCfg.DataFolder = folder
badgerCfg.Folder = filepath.Join(folder, "badger")
api := &mockAPI{} api := &mockAPI{}
proxy := &mockProxy{} proxy := &mockProxy{}
@ -155,11 +156,15 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke
tracker := makePinTracker(t, ident.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername) tracker := makePinTracker(t, ident.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername)
tracer := &mockTracer{} tracer := &mockTracer{}
store := inmem.New() store := makeStore(t, badgerCfg)
raftcon, _ := raft.NewConsensus(host, raftCfg, store, false) cons := makeConsensus(t, store, host, pubsub, dht, raftCfg, false, crdtCfg)
var peersF func(context.Context) ([]peer.ID, error)
if consensus == "raft" {
peersF = cons.Peers
}
psmonCfg.CheckInterval = 2 * time.Second psmonCfg.CheckInterval = 2 * time.Second
mon, err := pubsubmon.New(ctx, psmonCfg, pubsub, raftcon.Peers) mon, err := pubsubmon.New(ctx, psmonCfg, pubsub, peersF)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -177,7 +182,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke
dht, dht,
clusterCfg, clusterCfg,
store, store,
raftcon, cons,
[]API{api, proxy}, []API{api, proxy},
ipfs, ipfs,
tracker, tracker,
@ -193,11 +198,8 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke
return cl, api, ipfs, tracker return cl, api, ipfs, tracker
} }
func cleanRaft() { func cleanState() {
raftDirs, _ := filepath.Glob("raftFolderFromTests*") os.RemoveAll(testsFolder)
for _, dir := range raftDirs {
os.RemoveAll(dir)
}
} }
func testClusterShutdown(t *testing.T) { func testClusterShutdown(t *testing.T) {
@ -217,9 +219,9 @@ func testClusterShutdown(t *testing.T) {
func TestClusterStateSync(t *testing.T) { func TestClusterStateSync(t *testing.T) {
ctx := context.Background() ctx := context.Background()
cleanRaft() cleanState()
cl, _, _, _ := testingCluster(t) cl, _, _, _ := testingCluster(t)
defer cleanRaft() defer cleanState()
defer cl.Shutdown(ctx) defer cl.Shutdown(ctx)
c := test.Cid1 c := test.Cid1
@ -249,7 +251,7 @@ func TestClusterStateSync(t *testing.T) {
func TestClusterID(t *testing.T) { func TestClusterID(t *testing.T) {
ctx := context.Background() ctx := context.Background()
cl, _, _, _ := testingCluster(t) cl, _, _, _ := testingCluster(t)
defer cleanRaft() defer cleanState()
defer cl.Shutdown(ctx) defer cl.Shutdown(ctx)
id := cl.ID(ctx) id := cl.ID(ctx)
if len(id.Addresses) == 0 { if len(id.Addresses) == 0 {
@ -269,7 +271,7 @@ func TestClusterID(t *testing.T) {
func TestClusterPin(t *testing.T) { func TestClusterPin(t *testing.T) {
ctx := context.Background() ctx := context.Background()
cl, _, _, _ := testingCluster(t) cl, _, _, _ := testingCluster(t)
defer cleanRaft() defer cleanState()
defer cl.Shutdown(ctx) defer cl.Shutdown(ctx)
c := test.Cid1 c := test.Cid1
@ -278,21 +280,26 @@ func TestClusterPin(t *testing.T) {
t.Fatal("pin should have worked:", err) t.Fatal("pin should have worked:", err)
} }
// test an error case switch consensus {
cl.consensus.Shutdown(ctx) case "crdt":
pin := api.PinCid(c) return
pin.ReplicationFactorMax = 1 case "raft":
pin.ReplicationFactorMin = 1 // test an error case
err = cl.Pin(ctx, pin) cl.consensus.Shutdown(ctx)
if err == nil { pin := api.PinCid(c)
t.Error("expected an error but things worked") pin.ReplicationFactorMax = 1
pin.ReplicationFactorMin = 1
err = cl.Pin(ctx, pin)
if err == nil {
t.Error("expected an error but things worked")
}
} }
} }
func TestClusterPinPath(t *testing.T) { func TestClusterPinPath(t *testing.T) {
ctx := context.Background() ctx := context.Background()
cl, _, _, _ := testingCluster(t) cl, _, _, _ := testingCluster(t)
defer cleanRaft() defer cleanState()
defer cl.Shutdown(ctx) defer cl.Shutdown(ctx)
pin, err := cl.PinPath(ctx, &api.PinPath{Path: test.PathIPFS2}) pin, err := cl.PinPath(ctx, &api.PinPath{Path: test.PathIPFS2})
@ -313,7 +320,7 @@ func TestClusterPinPath(t *testing.T) {
func TestAddFile(t *testing.T) { func TestAddFile(t *testing.T) {
ctx := context.Background() ctx := context.Background()
cl, _, _, _ := testingCluster(t) cl, _, _, _ := testingCluster(t)
defer cleanRaft() defer cleanState()
defer cl.Shutdown(ctx) defer cl.Shutdown(ctx)
sth := test.NewShardingTestHelper() sth := test.NewShardingTestHelper()
defer sth.Clean(t) defer sth.Clean(t)
@ -373,7 +380,7 @@ func TestAddFile(t *testing.T) {
func TestUnpinShard(t *testing.T) { func TestUnpinShard(t *testing.T) {
ctx := context.Background() ctx := context.Background()
cl, _, _, _ := testingCluster(t) cl, _, _, _ := testingCluster(t)
defer cleanRaft() defer cleanState()
defer cl.Shutdown(ctx) defer cl.Shutdown(ctx)
sth := test.NewShardingTestHelper() sth := test.NewShardingTestHelper()
defer sth.Clean(t) defer sth.Clean(t)
@ -500,7 +507,7 @@ func TestUnpinShard(t *testing.T) {
// func TestClusterPinMeta(t *testing.T) { // func TestClusterPinMeta(t *testing.T) {
// cl, _, _, _ := testingCluster(t) // cl, _, _, _ := testingCluster(t)
// defer cleanRaft() // defer cleanState()
// defer cl.Shutdown() // defer cl.Shutdown()
// singleShardedPin(t, cl) // singleShardedPin(t, cl)
@ -508,7 +515,7 @@ func TestUnpinShard(t *testing.T) {
// func TestClusterUnpinShardFail(t *testing.T) { // func TestClusterUnpinShardFail(t *testing.T) {
// cl, _, _, _ := testingCluster(t) // cl, _, _, _ := testingCluster(t)
// defer cleanRaft() // defer cleanState()
// defer cl.Shutdown() // defer cl.Shutdown()
// singleShardedPin(t, cl) // singleShardedPin(t, cl)
@ -532,7 +539,7 @@ func TestUnpinShard(t *testing.T) {
// func TestClusterUnpinMeta(t *testing.T) { // func TestClusterUnpinMeta(t *testing.T) {
// cl, _, _, _ := testingCluster(t) // cl, _, _, _ := testingCluster(t)
// defer cleanRaft() // defer cleanState()
// defer cl.Shutdown() // defer cl.Shutdown()
// singleShardedPin(t, cl) // singleShardedPin(t, cl)
@ -577,7 +584,7 @@ func TestUnpinShard(t *testing.T) {
// func TestClusterPinShardTwoParents(t *testing.T) { // func TestClusterPinShardTwoParents(t *testing.T) {
// cl, _, _, _ := testingCluster(t) // cl, _, _, _ := testingCluster(t)
// defer cleanRaft() // defer cleanState()
// defer cl.Shutdown() // defer cl.Shutdown()
// pinTwoParentsOneShard(t, cl) // pinTwoParentsOneShard(t, cl)
@ -594,7 +601,7 @@ func TestUnpinShard(t *testing.T) {
// func TestClusterUnpinShardSecondParent(t *testing.T) { // func TestClusterUnpinShardSecondParent(t *testing.T) {
// cl, _, _, _ := testingCluster(t) // cl, _, _, _ := testingCluster(t)
// defer cleanRaft() // defer cleanState()
// defer cl.Shutdown() // defer cl.Shutdown()
// pinTwoParentsOneShard(t, cl) // pinTwoParentsOneShard(t, cl)
@ -627,7 +634,7 @@ func TestUnpinShard(t *testing.T) {
// func TestClusterUnpinShardFirstParent(t *testing.T) { // func TestClusterUnpinShardFirstParent(t *testing.T) {
// cl, _, _, _ := testingCluster(t) // cl, _, _, _ := testingCluster(t)
// defer cleanRaft() // defer cleanState()
// defer cl.Shutdown() // defer cl.Shutdown()
// pinTwoParentsOneShard(t, cl) // pinTwoParentsOneShard(t, cl)
@ -663,7 +670,7 @@ func TestUnpinShard(t *testing.T) {
// func TestClusterPinTwoMethodsFail(t *testing.T) { // func TestClusterPinTwoMethodsFail(t *testing.T) {
// cl, _, _, _ := testingCluster(t) // cl, _, _, _ := testingCluster(t)
// defer cleanRaft() // defer cleanState()
// defer cl.Shutdown() // defer cl.Shutdown()
// // First pin normally then sharding pin fails // // First pin normally then sharding pin fails
@ -699,7 +706,7 @@ func TestUnpinShard(t *testing.T) {
// func TestClusterRePinShard(t *testing.T) { // func TestClusterRePinShard(t *testing.T) {
// cl, _, _, _ := testingCluster(t) // cl, _, _, _ := testingCluster(t)
// defer cleanRaft() // defer cleanState()
// defer cl.Shutdown() // defer cl.Shutdown()
// cCdag, _ := cid.Decode(test.CdagCid) // cCdag, _ := cid.Decode(test.CdagCid)
@ -735,7 +742,7 @@ func TestUnpinShard(t *testing.T) {
func TestClusterPins(t *testing.T) { func TestClusterPins(t *testing.T) {
ctx := context.Background() ctx := context.Background()
cl, _, _, _ := testingCluster(t) cl, _, _, _ := testingCluster(t)
defer cleanRaft() defer cleanState()
defer cl.Shutdown(ctx) defer cl.Shutdown(ctx)
c := test.Cid1 c := test.Cid1
@ -744,6 +751,8 @@ func TestClusterPins(t *testing.T) {
t.Fatal("pin should have worked:", err) t.Fatal("pin should have worked:", err)
} }
pinDelay()
pins, err := cl.Pins(ctx) pins, err := cl.Pins(ctx)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -759,7 +768,7 @@ func TestClusterPins(t *testing.T) {
func TestClusterPinGet(t *testing.T) { func TestClusterPinGet(t *testing.T) {
ctx := context.Background() ctx := context.Background()
cl, _, _, _ := testingCluster(t) cl, _, _, _ := testingCluster(t)
defer cleanRaft() defer cleanState()
defer cl.Shutdown(ctx) defer cl.Shutdown(ctx)
c := test.Cid1 c := test.Cid1
@ -785,7 +794,7 @@ func TestClusterPinGet(t *testing.T) {
func TestClusterUnpin(t *testing.T) { func TestClusterUnpin(t *testing.T) {
ctx := context.Background() ctx := context.Background()
cl, _, _, _ := testingCluster(t) cl, _, _, _ := testingCluster(t)
defer cleanRaft() defer cleanState()
defer cl.Shutdown(ctx) defer cl.Shutdown(ctx)
c := test.Cid1 c := test.Cid1
@ -816,7 +825,7 @@ func TestClusterUnpin(t *testing.T) {
func TestClusterUnpinPath(t *testing.T) { func TestClusterUnpinPath(t *testing.T) {
ctx := context.Background() ctx := context.Background()
cl, _, _, _ := testingCluster(t) cl, _, _, _ := testingCluster(t)
defer cleanRaft() defer cleanState()
defer cl.Shutdown(ctx) defer cl.Shutdown(ctx)
// Unpin should error without pin being committed to state // Unpin should error without pin being committed to state
@ -846,7 +855,7 @@ func TestClusterUnpinPath(t *testing.T) {
func TestClusterPeers(t *testing.T) { func TestClusterPeers(t *testing.T) {
ctx := context.Background() ctx := context.Background()
cl, _, _, _ := testingCluster(t) cl, _, _, _ := testingCluster(t)
defer cleanRaft() defer cleanState()
defer cl.Shutdown(ctx) defer cl.Shutdown(ctx)
peers := cl.Peers(ctx) peers := cl.Peers(ctx)
if len(peers) != 1 { if len(peers) != 1 {
@ -869,7 +878,7 @@ func TestClusterPeers(t *testing.T) {
func TestVersion(t *testing.T) { func TestVersion(t *testing.T) {
ctx := context.Background() ctx := context.Background()
cl, _, _, _ := testingCluster(t) cl, _, _, _ := testingCluster(t)
defer cleanRaft() defer cleanState()
defer cl.Shutdown(ctx) defer cl.Shutdown(ctx)
if cl.Version() != version.Version.String() { if cl.Version() != version.Version.String() {
t.Error("bad Version()") t.Error("bad Version()")
@ -879,7 +888,7 @@ func TestVersion(t *testing.T) {
func TestClusterRecoverAllLocal(t *testing.T) { func TestClusterRecoverAllLocal(t *testing.T) {
ctx := context.Background() ctx := context.Background()
cl, _, _, _ := testingCluster(t) cl, _, _, _ := testingCluster(t)
defer cleanRaft() defer cleanState()
defer cl.Shutdown(ctx) defer cl.Shutdown(ctx)
err := cl.Pin(ctx, api.PinCid(test.Cid1)) err := cl.Pin(ctx, api.PinCid(test.Cid1))

View File

@ -13,7 +13,6 @@ import (
pnet "github.com/libp2p/go-libp2p-pnet" pnet "github.com/libp2p/go-libp2p-pnet"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed" routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
ma "github.com/multiformats/go-multiaddr"
) )
// NewClusterHost creates a libp2p Host with the options from the provided // NewClusterHost creates a libp2p Host with the options from the provided
@ -26,7 +25,13 @@ func NewClusterHost(
cfg *Config, cfg *Config,
) (host.Host, *pubsub.PubSub, *dht.IpfsDHT, error) { ) (host.Host, *pubsub.PubSub, *dht.IpfsDHT, error) {
h, err := newHost(ctx, cfg.Secret, ident.PrivateKey, []ma.Multiaddr{cfg.ListenAddr}) h, err := newHost(
ctx,
cfg.Secret,
ident.PrivateKey,
libp2p.ListenAddrs(cfg.ListenAddr),
libp2p.NATPortMap(),
)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
@ -46,7 +51,7 @@ func NewClusterHost(
return routedHost(h, idht), psub, idht, nil return routedHost(h, idht), psub, idht, nil
} }
func newHost(ctx context.Context, secret []byte, priv crypto.PrivKey, listenAddrs []ma.Multiaddr) (host.Host, error) { func newHost(ctx context.Context, secret []byte, priv crypto.PrivKey, opts ...libp2p.Option) (host.Host, error) {
var prot ipnet.Protector var prot ipnet.Protector
var err error var err error
@ -60,12 +65,15 @@ func newHost(ctx context.Context, secret []byte, priv crypto.PrivKey, listenAddr
} }
} }
finalOpts := []libp2p.Option{
libp2p.Identity(priv),
libp2p.PrivateNetwork(prot),
}
finalOpts = append(finalOpts, opts...)
return libp2p.New( return libp2p.New(
ctx, ctx,
libp2p.Identity(priv), finalOpts...,
libp2p.ListenAddrs(listenAddrs...),
libp2p.PrivateNetwork(prot),
libp2p.NATPortMap(),
) )
} }

View File

@ -18,8 +18,8 @@ import (
var testingClusterSecret, _ = DecodeClusterSecret("2588b80d5cb05374fa142aed6cbb047d1f4ef8ef15e37eba68c65b9d30df67ed") var testingClusterSecret, _ = DecodeClusterSecret("2588b80d5cb05374fa142aed6cbb047d1f4ef8ef15e37eba68c65b9d30df67ed")
var testingIdentity = []byte(`{ var testingIdentity = []byte(`{
"id": "QmUfSFm12eYCaRdypg48m8RqkXfLW7A2ZeGZb2skeHHDGA", "id": "12D3KooWQiK1sYbGNnD9XtWF1sP95cawwwNy3d2WUwtP71McwUfZ",
"private_key": "CAASqAkwggSkAgEAAoIBAQDpT16IRF6bb9tHsCbQ7M+nb2aI8sz8xyt8PoAWM42ki+SNoESIxKb4UhFxixKvtEdGxNE6aUUVc8kFk6wTStJ/X3IGiMetwkXiFiUxabUF/8A6SyvnSVDm+wFuavugpVrZikjLcfrf2xOVgnG3deQQvd/qbAv14jTwMFl+T+8d/cXBo8Mn/leLZCQun/EJEnkXP5MjgNI8XcWUE4NnH3E0ESSm6Pkm8MhMDZ2fmzNgqEyJ0GVinNgSml3Pyha3PBSj5LRczLip/ie4QkKx5OHvX2L3sNv/JIUHse5HSbjZ1c/4oGCYMVTYCykWiczrxBUOlcr8RwnZLOm4n2bCt5ZhAgMBAAECggEAVkePwfzmr7zR7tTpxeGNeXHtDUAdJm3RWwUSASPXgb5qKyXVsm5nAPX4lXDE3E1i/nzSkzNS5PgIoxNVU10cMxZs6JW0okFx7oYaAwgAddN6lxQtjD7EuGaixN6zZ1k/G6vT98iS6i3uNCAlRZ9HVBmjsOF8GtYolZqLvfZ5izEVFlLVq/BCs7Y5OrDrbGmn3XupfitVWYExV0BrHpobDjsx2fYdTZkmPpSSvXNcm4Iq2AXVQzoqAfGo7+qsuLCZtVlyTfVKQjMvE2ffzN1dQunxixOvev/fz4WSjGnRpC6QLn6Oqps9+VxQKqKuXXqUJC+U45DuvA94Of9MvZfAAQKBgQD7xmXueXRBMr2+0WftybAV024ap0cXFrCAu+KWC1SUddCfkiV7e5w+kRJx6RH1cg4cyyCL8yhHZ99Z5V0Mxa/b/usuHMadXPyX5szVI7dOGgIC9q8IijN7B7GMFAXc8+qC7kivehJzjQghpRRAqvRzjDls4gmbNPhbH1jUiU124QKBgQDtOaW5/fOEtOq0yWbDLkLdjImct6oKMLhENL6yeIKjMYgifzHb2adk7rWG3qcMrdgaFtDVfqv8UmMEkzk7bSkovMVj3SkLzMz84ii1SkSfyaCXgt/UOzDkqAUYB0cXMppYA7jxHa2OY8oEHdBgmyJXdLdzJxCp851AoTlRUSePgQKBgQCQgKgUHOUaXnMEx88sbOuBO14gMg3dNIqM+Ejt8QbURmI8k3arzqA4UK8Tbb9+7b0nzXWanS5q/TT1tWyYXgW28DIuvxlHTA01aaP6WItmagrphIelERzG6f1+9ib/T4czKmvROvDIHROjq8lZ7ERs5Pg4g+sbh2VbdzxWj49EQQKBgFEna36ZVfmMOs7mJ3WWGeHY9ira2hzqVd9fe+1qNKbHhx7mDJR9fTqWPxuIh/Vac5dZPtAKqaOEO8OQ6f9edLou+ggT3LrgsS/B3tNGOPvA6mNqrk/Yf/15TWTO+I8DDLIXc+lokbsogC+wU1z5NWJd13RZZOX/JUi63vTmonYBAoGBAIpglLCH2sPXfmguO6p8QcQcv4RjAU1c0GP4P5PNN3Wzo0ItydVd2LHJb6MdmL6ypeiwNklzPFwTeRlKTPmVxJ+QPg1ct/3tAURN/D40GYw9ojDhqmdSl4HW4d6gHS2lYzSFeU5jkG49y5nirOOoEgHy95wghkh6BfpwHujYJGw4" "private_key": "CAESQJZ0wHQyoWGizG7eSATrDtTVlyyr99O8726jIu1lf2D+3VJBBAu6HXPRkbdNINBWlPMn+PK3bO6EgGGuaou8bKg="
}`) }`)
var testingClusterCfg = []byte(`{ var testingClusterCfg = []byte(`{
@ -29,11 +29,10 @@ var testingClusterCfg = []byte(`{
"state_sync_interval": "1m0s", "state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s", "ipfs_sync_interval": "2m10s",
"replication_factor": -1, "replication_factor": -1,
"monitor_ping_interval": "150ms", "monitor_ping_interval": "250ms",
"peer_watch_interval": "100ms", "peer_watch_interval": "100ms",
"disable_repinning": false "disable_repinning": false
} }`)
`)
var testingRaftCfg = []byte(`{ var testingRaftCfg = []byte(`{
"data_folder": "raftFolderFromTests", "data_folder": "raftFolderFromTests",
@ -107,12 +106,12 @@ var testingTrackerCfg = []byte(`
`) `)
var testingMonCfg = []byte(`{ var testingMonCfg = []byte(`{
"check_interval": "300ms", "check_interval": "400ms",
"failure_threshold": 5 "failure_threshold": 6
}`) }`)
var testingDiskInfCfg = []byte(`{ var testingDiskInfCfg = []byte(`{
"metric_ttl": "150ms", "metric_ttl": "250ms",
"metric_type": "freespace" "metric_type": "freespace"
}`) }`)

View File

@ -188,6 +188,7 @@ func (css *Consensus) setup() {
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
} }
logger.Infof("new pin added: %s", pin.Cid)
} }
opts.DeleteHook = func(k ds.Key) { opts.DeleteHook = func(k ds.Key) {
c, err := dshelp.DsKeyToCid(k) c, err := dshelp.DsKeyToCid(k)
@ -208,6 +209,7 @@ func (css *Consensus) setup() {
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
} }
logger.Infof("pin removed: %s", c)
} }
crdt, err := crdt.New( crdt, err := crdt.New(

View File

@ -14,12 +14,6 @@ import (
"testing" "testing"
"time" "time"
crypto "github.com/libp2p/go-libp2p-crypto"
peerstore "github.com/libp2p/go-libp2p-peerstore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/ipfs/ipfs-cluster/allocator/descendalloc" "github.com/ipfs/ipfs-cluster/allocator/descendalloc"
"github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/api/rest" "github.com/ipfs/ipfs-cluster/api/rest"
@ -37,8 +31,13 @@ import (
"github.com/ipfs/ipfs-cluster/version" "github.com/ipfs/ipfs-cluster/version"
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
libp2p "github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host" host "github.com/libp2p/go-libp2p-host"
dht "github.com/libp2p/go-libp2p-kad-dht"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
@ -274,34 +273,37 @@ func createHosts(t *testing.T, clusterSecret []byte, nClusters int) ([]host.Host
pubsubs := make([]*pubsub.PubSub, nClusters, nClusters) pubsubs := make([]*pubsub.PubSub, nClusters, nClusters)
dhts := make([]*dht.IpfsDHT, nClusters, nClusters) dhts := make([]*dht.IpfsDHT, nClusters, nClusters)
ctx := context.Background() listen, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
clusterAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
for i := range hosts { for i := range hosts {
priv, _, err := crypto.GenerateKeyPair(crypto.RSA, 2048) priv, _, err := crypto.GenerateKeyPair(crypto.RSA, 2048)
checkErr(t, err) checkErr(t, err)
h, err := newHost(ctx, clusterSecret, priv, []ma.Multiaddr{clusterAddr})
checkErr(t, err)
// DHT needs to be created BEFORE connecting the peers, but h, p, d := createHost(t, priv, clusterSecret, listen)
// bootstrapped AFTER hosts[i] = h
d, err := newDHT(ctx, h)
checkErr(t, err)
dhts[i] = d dhts[i] = d
pubsubs[i] = p
hosts[i] = routedHost(h, d)
// Pubsub needs to be created BEFORE connecting the peers,
// otherwise they are not picked up.
psub, err := newPubSub(ctx, hosts[i])
checkErr(t, err)
pubsubs[i] = psub
} }
return hosts, pubsubs, dhts return hosts, pubsubs, dhts
} }
func createHost(t *testing.T, priv crypto.PrivKey, clusterSecret []byte, listen ma.Multiaddr) (host.Host, *pubsub.PubSub, *dht.IpfsDHT) {
ctx := context.Background()
h, err := newHost(ctx, clusterSecret, priv, libp2p.ListenAddrs(listen))
checkErr(t, err)
// DHT needs to be created BEFORE connecting the peers, but
// bootstrapped AFTER
d, err := newDHT(ctx, h)
checkErr(t, err)
// Pubsub needs to be created BEFORE connecting the peers,
// otherwise they are not picked up.
psub, err := newPubSub(ctx, h)
checkErr(t, err)
return routedHost(h, d), psub, d
}
func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) { func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
ctx := context.Background() ctx := context.Background()
os.RemoveAll(testsFolder) os.RemoveAll(testsFolder)
@ -378,6 +380,7 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
} }
waitForLeader(t, clusters) waitForLeader(t, clusters)
waitForClustersHealthy(t, clusters)
return clusters, ipfsMocks return clusters, ipfsMocks
} }
@ -433,7 +436,7 @@ func delay() {
} }
func pinDelay() { func pinDelay() {
time.Sleep(500 * time.Millisecond) time.Sleep(800 * time.Millisecond)
} }
func ttlDelay() { func ttlDelay() {
@ -1063,9 +1066,6 @@ func TestClustersReplicationOverall(t *testing.T) {
c.config.ReplicationFactorMax = nClusters - 1 c.config.ReplicationFactorMax = nClusters - 1
} }
// wait for clusters to stablise
waitForClustersHealthy(t, clusters)
// Why is replication factor nClusters - 1? // Why is replication factor nClusters - 1?
// Because that way we know that pinning nCluster // Because that way we know that pinning nCluster
// pins with an strategy like numpins/disk // pins with an strategy like numpins/disk

View File

@ -332,6 +332,7 @@ func TestClustersPeerRemoveSelf(t *testing.T) {
} }
} }
} }
// potential hanging place
_, more := <-clusters[i].Done() _, more := <-clusters[i].Done()
if more { if more {
t.Error("should be done") t.Error("should be done")
@ -726,6 +727,9 @@ func TestClustersPeerRejoin(t *testing.T) {
c0, m0 := createOnePeerCluster(t, 0, testingClusterSecret) c0, m0 := createOnePeerCluster(t, 0, testingClusterSecret)
clusters[0] = c0 clusters[0] = c0
mocks[0] = m0 mocks[0] = m0
delay()
c0.consensus.Trust(ctx, clusters[1].id) c0.consensus.Trust(ctx, clusters[1].id)
err = c0.Join(ctx, clusterAddr(clusters[1])) err = c0.Join(ctx, clusterAddr(clusters[1]))
if err != nil { if err != nil {

View File

@ -37,7 +37,7 @@ func TestClusterSecretFormat(t *testing.T) {
func TestSimplePNet(t *testing.T) { func TestSimplePNet(t *testing.T) {
ctx := context.Background() ctx := context.Background()
clusters, mocks, boot := peerManagerClusters(t) clusters, mocks, boot := peerManagerClusters(t)
defer cleanRaft() defer cleanState()
defer shutdownClusters(t, clusters, mocks) defer shutdownClusters(t, clusters, mocks)
defer boot.Close() defer boot.Close()
@ -67,7 +67,7 @@ func TestSimplePNet(t *testing.T) {
// } // }
// cl1, _ := createOnePeerCluster(t, 1, (*cl1Secret)[:]) // cl1, _ := createOnePeerCluster(t, 1, (*cl1Secret)[:])
// cl2, _ := createOnePeerCluster(t, 2, testingClusterSecret) // cl2, _ := createOnePeerCluster(t, 2, testingClusterSecret)
// defer cleanRaft() // defer cleanState()
// defer cl1.Shutdown() // defer cl1.Shutdown()
// defer cl2.Shutdown() // defer cl2.Shutdown()
// peers1 := cl1.Peers() // peers1 := cl1.Peers()