Merge pull request #350 from ipfs/feat/339-faster-tests

Feat #339: faster tests
Hector Sanjuan 2018-04-05 18:16:41 +02:00 committed by GitHub
commit da0915a098
19 changed files with 347 additions and 207 deletions

View File

@ -6,21 +6,37 @@ go:
 - '1.9'
 services:
 - docker
-before_install:
-- docker pull ipfs/go-ipfs
-- sudo apt-get update
-- sudo apt-get install -y jq curl
-install:
-- go get golang.org/x/tools/cmd/cover
-- go get github.com/mattn/goveralls
-- go get github.com/golang/lint/golint
-- make deps
-script:
-- make docker
-- make check
-- make service && make ctl && ./coverage.sh
-- make install
-- make test_sharness && make clean_sharness
+cache:
+  directories:
+  - $GOPATH/src/gx
+
+install: true
+
+jobs:
+  include:
+    - stage: "build and test (1: tests | 2: checks | 3: docker | 4: sharness)"
+      script:
+      - go get -u github.com/mattn/goveralls
+      - go get -u golang.org/x/tools/cmd/cover
+      - make deps
+      - ./coverage.sh
+    - script:
+      - go get -u github.com/golang/lint/golint
+      - make deps
+      - make check
+      - make service
+      - make ctl
+    - script:
+      - make docker
+    - script:
+      - sudo apt-get update
+      - sudo apt-get install -y jq curl
+      - make deps
+      - make install
+      - docker pull ipfs/go-ipfs
+      - make test_sharness && make clean_sharness
 env:
   global:
   - secure: M3K3y9+D933tCda7+blW3qqVV8fA6PBDRdJoQvmQc1f0XYbWinJ+bAziFp6diKkF8sMQ+cPwLMONYJuaNT2h7/PkG+sIwF0PuUo5VVCbhGmSDrn2qOjmSnfawNs8wW31f44FQA8ICka1EFZcihohoIMf0e5xZ0tXA9jqw+ngPJiRnv4zyzC3r6t4JMAZcbS9w4KTYpIev5Yj72eCvk6lGjadSVCDVXo2sVs27tNt+BSgtMXiH6Sv8GLOnN2kFspGITgivHgB/jtU6QVtFXB+cbBJJAs3lUYnzmQZ5INecbjweYll07ilwFiCVNCX67+L15gpymKGJbQggloIGyTWrAOa2TMaB/bvblzwwQZ8wE5P3Rss5L0TFkUAcdU+3BUHM+TwV4e8F9x10v1PjgWNBRJQzd1sjKKgGUBCeyCY7VeYDKn9AXI5llISgY/AAfCZwm2cbckMHZZJciMjm+U3Q1FCF+rfhlvUcMG1VEj8r9cGpmWIRjFYVm0NmpUDDNjlC3/lUfTCOOJJyM254EUw63XxabbK6EtDN1yQe8kYRcXH//2rtEwgtMBgqHVY+OOkekzGz8Ra3EBkh6jXrAQL3zKu/GwRlK7/a1OU5MQ7dWcTjbx1AQ6Zfyjg5bZ+idqPgMbqM9Zn2+OaSby8HEEXS0QeZVooDVf/6wdYO4MQ/0A=
@ -35,6 +51,3 @@ deploy:
   script: docker run -v $(pwd):$(pwd) -t snapcore/snapcraft sh -c "apt update -qq
     && cd $(pwd) && snapcraft && snapcraft push *.snap --release edge"
   skip_cleanup: true
-cache:
-  directories:
-  - $GOPATH/src/gx

View File

@ -68,7 +68,7 @@ check:
     golint -set_exit_status -min_confidence 0.3 ./...

 test: deps
-    go test -timeout 20m -loglevel "CRITICAL" -v ./...
+    go test -loglevel "CRITICAL" -v ./...

 test_sharness: $(sharness)
     @sh sharness/run-sharness-tests.sh

View File

@ -59,11 +59,12 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I
     priorityMetrics := make(map[peer.ID]api.Metric)

     // Divide metrics between current and candidates.
+    // All metrics in metrics are valid (at least the
+    // moment they were compiled by the monitor)
     for _, m := range metrics {
         switch {
-        case m.Discard() || containsPeer(blacklist, m.Peer):
-            // discard peers with invalid metrics and
-            // those in the blacklist
+        case containsPeer(blacklist, m.Peer):
+            // discard blacklisted peers
             continue
         case containsPeer(currentAllocs, m.Peer):
             currentMetrics[m.Peer] = m

View File

@ -24,7 +24,7 @@ var (
     testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
 )

-var inAMinute = time.Now().Add(time.Minute).Format(time.RFC3339Nano)
+var inAMinute = time.Now().Add(time.Minute).UnixNano()

 var testCases = []testcase{
     { // regular sort

View File

@ -24,7 +24,7 @@ var (
     testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
 )

-var inAMinute = time.Now().Add(time.Minute).Format(time.RFC3339Nano)
+var inAMinute = time.Now().Add(time.Minute).UnixNano()

 var testCases = []testcase{
     { // regular sort

View File

@ -602,8 +602,8 @@ type Metric struct {
     Name   string
     Peer   peer.ID // filled-in by Cluster.
     Value  string
-    Expire string // RFC3339Nano
+    Expire int64  // UnixNano
     Valid  bool   // if the metric is not valid it will be discarded
 }

 // SetTTL sets Metric to expire after the given seconds
@ -615,31 +615,19 @@ func (m *Metric) SetTTL(seconds int) {
 // SetTTLDuration sets Metric to expire after the given time.Duration
 func (m *Metric) SetTTLDuration(d time.Duration) {
     exp := time.Now().Add(d)
-    m.Expire = exp.UTC().Format(time.RFC3339Nano)
+    m.Expire = exp.UnixNano()
 }

 // GetTTL returns the time left before the Metric expires
 func (m *Metric) GetTTL() time.Duration {
-    if m.Expire == "" {
-        return 0
-    }
-    exp, err := time.Parse(time.RFC3339Nano, m.Expire)
-    if err != nil {
-        panic(err)
-    }
-    return exp.Sub(time.Now())
+    expDate := time.Unix(0, m.Expire)
+    return expDate.Sub(time.Now())
 }

 // Expired returns if the Metric has expired
 func (m *Metric) Expired() bool {
-    if m.Expire == "" {
-        return true
-    }
-    exp, err := time.Parse(time.RFC3339Nano, m.Expire)
-    if err != nil {
-        panic(err)
-    }
-    return time.Now().After(exp)
+    expDate := time.Unix(0, m.Expire)
+    return time.Now().After(expDate)
 }

 // Discard returns if the metric not valid or has expired
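Taken together, these Metric changes replace RFC3339Nano string timestamps with Unix-nanosecond integers, which removes string parsing and the panic paths. A minimal, self-contained sketch of the resulting expiry scheme (the local Metric type below is only a stand-in for api.Metric, not the actual package):

package main

import (
    "fmt"
    "time"
)

// Metric mirrors only the api.Metric fields that matter for expiry.
type Metric struct {
    Name   string
    Value  string
    Expire int64 // UnixNano
    Valid  bool
}

// SetTTLDuration stores the absolute expiry time as Unix nanoseconds.
func (m *Metric) SetTTLDuration(d time.Duration) {
    m.Expire = time.Now().Add(d).UnixNano()
}

// Expired reports whether the stored expiry time has passed.
func (m *Metric) Expired() bool {
    return time.Now().After(time.Unix(0, m.Expire))
}

func main() {
    m := Metric{Name: "freespace", Value: "1000", Valid: true}
    m.SetTTLDuration(150 * time.Millisecond) // same order of magnitude as the testing configs
    fmt.Println(m.Expired())                 // false right after setting the TTL
    time.Sleep(200 * time.Millisecond)
    fmt.Println(m.Expired())                 // true once the TTL has elapsed
}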

ci/Jenkinsfile (vendored): 2 changed lines
View File

@ -1,2 +1,2 @@
-golang([test: "go test -v -timeout 20m ./..."])
+golang([test: "go test -v ./..."])

View File

@ -272,7 +272,6 @@ func (c *Cluster) pushInformerMetrics() {
     // The following control how often to make and log
     // a retry
     retries := 0
-    retryDelay := 500 * time.Millisecond
     retryWarnMod := 60
     for {
         select {
@ -293,7 +292,7 @@ func (c *Cluster) pushInformerMetrics() {
                 retries++
             }
             // retry in retryDelay
-            timer.Reset(retryDelay)
+            timer.Reset(metric.GetTTL() / 4)
             continue
         }
@ -345,8 +344,7 @@ func (c *Cluster) alertsHandler() {
 // detects any changes in the peerset and saves the configuration. When it
 // detects that we have been removed from the peerset, it shuts down this peer.
 func (c *Cluster) watchPeers() {
-    // TODO: Config option?
-    ticker := time.NewTicker(5 * time.Second)
+    ticker := time.NewTicker(c.config.PeerWatchInterval)
     lastPeers := PeersFromMultiaddrs(c.config.Peers)

     for {
@ -462,11 +460,13 @@ This might be due to one or several causes:
     if len(peers) == 1 {
         logger.Info(" - No other peers")
     }
+
     for _, p := range peers {
         if p != c.id {
             logger.Infof(" - %s", p.Pretty())
         }
     }
+
     close(c.readyCh)
     c.readyB = true
     logger.Info("** IPFS Cluster is READY **")

View File

@ -28,6 +28,7 @@ const (
     DefaultStateSyncInterval   = 60 * time.Second
     DefaultIPFSSyncInterval    = 130 * time.Second
     DefaultMonitorPingInterval = 15 * time.Second
+    DefaultPeerWatchInterval   = 5 * time.Second
     DefaultReplicationFactor   = -1
     DefaultLeaveOnShutdown     = false
 )
@ -104,10 +105,16 @@ type Config struct {
     // possible.
     ReplicationFactorMin int

-    // MonitorPingInterval is frequency by which a cluster peer pings the
-    // monitoring component. The ping metric has a TTL set to the double
+    // MonitorPingInterval is the frequency with which a cluster peer pings
+    // the monitoring component. The ping metric has a TTL set to the double
     // of this value.
     MonitorPingInterval time.Duration
+
+    // PeerWatchInterval is the frequency that we use to watch for changes
+    // in the consensus peerset and save new peers to the configuration
+    // file. This also affects how soon we realize that we have
+    // been removed from a cluster.
+    PeerWatchInterval time.Duration
 }

 // configJSON represents a Cluster configuration as it will look when it is
@ -128,6 +135,7 @@ type configJSON struct {
     ReplicationFactorMin int    `json:"replication_factor_min"`
     ReplicationFactorMax int    `json:"replication_factor_max"`
     MonitorPingInterval  string `json:"monitor_ping_interval"`
+    PeerWatchInterval    string `json:"peer_watch_interval"`
 }

 // ConfigKey returns a human-readable string to identify
@ -207,6 +215,10 @@ func (cfg *Config) Validate() error {
         return errors.New("cluster.monitoring_interval is invalid")
     }

+    if cfg.PeerWatchInterval <= 0 {
+        return errors.New("cluster.peer_watch_interval is invalid")
+    }
+
     rfMax := cfg.ReplicationFactorMax
     rfMin := cfg.ReplicationFactorMin
@ -256,6 +268,7 @@ func (cfg *Config) setDefaults() {
     cfg.ReplicationFactorMin = DefaultReplicationFactor
     cfg.ReplicationFactorMax = DefaultReplicationFactor
     cfg.MonitorPingInterval = DefaultMonitorPingInterval
+    cfg.PeerWatchInterval = DefaultPeerWatchInterval
 }

 // LoadJSON receives a raw json-formatted configuration and
@ -353,10 +366,12 @@ func (cfg *Config) LoadJSON(raw []byte) error {
     stateSyncInterval := parseDuration(jcfg.StateSyncInterval)
     ipfsSyncInterval := parseDuration(jcfg.IPFSSyncInterval)
     monitorPingInterval := parseDuration(jcfg.MonitorPingInterval)
+    peerWatchInterval := parseDuration(jcfg.PeerWatchInterval)

     config.SetIfNotDefault(stateSyncInterval, &cfg.StateSyncInterval)
     config.SetIfNotDefault(ipfsSyncInterval, &cfg.IPFSSyncInterval)
     config.SetIfNotDefault(monitorPingInterval, &cfg.MonitorPingInterval)
+    config.SetIfNotDefault(peerWatchInterval, &cfg.PeerWatchInterval)

     cfg.LeaveOnShutdown = jcfg.LeaveOnShutdown
@ -407,6 +422,7 @@ func (cfg *Config) ToJSON() (raw []byte, err error) {
     jcfg.StateSyncInterval = cfg.StateSyncInterval.String()
     jcfg.IPFSSyncInterval = cfg.IPFSSyncInterval.String()
     jcfg.MonitorPingInterval = cfg.MonitorPingInterval.String()
+    jcfg.PeerWatchInterval = cfg.PeerWatchInterval.String()

     raw, err = json.MarshalIndent(jcfg, "", " ")
     return
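For illustration, a cluster section of the JSON configuration using the new key could look like the fragment below. The key names come from the configJSON struct tags above, and the duration values are simply the package defaults declared in the constants (15s ping, 5s peer watch, -1 replication factor); the surrounding keys of a real service.json are omitted:

{
  "replication_factor_min": -1,
  "replication_factor_max": -1,
  "monitor_ping_interval": "15s",
  "peer_watch_interval": "5s"
}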

View File

@ -315,7 +315,7 @@ func TestClusterRecoverAllLocal(t *testing.T) {
         t.Fatal("pin should have worked:", err)
     }

-    time.Sleep(time.Second)
+    pinDelay()

     recov, err := cl.RecoverAllLocal()
     if err != nil {

View File

@ -15,6 +15,10 @@ import (

 var logger = logging.Logger("config")

+// ConfigSaveInterval specifies how often to save the configuration file if
+// it needs saving.
+var ConfigSaveInterval = time.Second
+
 // The ComponentConfig interface allows components to define configurations
 // which can be managed as part of the ipfs-cluster configuration file by the
 // Manager.
@ -116,7 +120,7 @@ func (cfg *Manager) watchSave(save <-chan struct{}) {
     defer cfg.wg.Done()

     // Save once per second mostly
-    ticker := time.NewTicker(time.Second)
+    ticker := time.NewTicker(ConfigSaveInterval)
     defer ticker.Stop()

     thingsToSave := false

View File

@ -17,29 +17,30 @@ var testingClusterCfg = []byte(`{
     "secret": "2588b80d5cb05374fa142aed6cbb047d1f4ef8ef15e37eba68c65b9d30df67ed",
     "peers": [],
     "bootstrap": [],
-    "leave_on_shutdown": true,
+    "leave_on_shutdown": false,
     "listen_multiaddress": "/ip4/127.0.0.1/tcp/10000",
     "state_sync_interval": "1m0s",
     "ipfs_sync_interval": "2m10s",
     "replication_factor": -1,
-    "monitor_ping_interval": "1s"
+    "monitor_ping_interval": "150ms",
+    "peer_watch_interval": "100ms"
 }
 `)

 var testingRaftCfg = []byte(`{
     "data_folder": "raftFolderFromTests",
-    "wait_for_leader_timeout": "30s",
-    "commit_retries": 1,
-    "commit_retry_delay": "1s",
-    "network_timeout": "20s",
-    "heartbeat_timeout": "1s",
-    "election_timeout": "1s",
+    "wait_for_leader_timeout": "10s",
+    "commit_retries": 2,
+    "commit_retry_delay": "50ms",
+    "network_timeout": "5s",
+    "heartbeat_timeout": "100ms",
+    "election_timeout": "100ms",
     "commit_timeout": "50ms",
-    "max_append_entries": 64,
+    "max_append_entries": 256,
     "trailing_logs": 10240,
     "snapshot_interval": "2m0s",
     "snapshot_threshold": 8192,
-    "leader_lease_timeout": "500ms"
+    "leader_lease_timeout": "80ms"
 }`)

 var testingAPICfg = []byte(`{
@ -71,11 +72,11 @@ var testingTrackerCfg = []byte(`
 `)

 var testingMonCfg = []byte(`{
-    "check_interval": "1s"
+    "check_interval": "300ms"
 }`)

 var testingDiskInfCfg = []byte(`{
-    "metric_ttl": "1s",
+    "metric_ttl": "150ms",
     "metric_type": "freespace"
 }`)

View File

@ -50,6 +50,7 @@ func makeTestingHost(t *testing.T) host.Host {
 }

 func testingConsensus(t *testing.T, idn int) *Consensus {
+    cleanRaft(idn)
     h := makeTestingHost(t)
     st := mapstate.NewMapState()
@ -72,6 +73,7 @@ func TestShutdownConsensus(t *testing.T) {
     // Bring it up twice to make sure shutdown cleans up properly
     // but also to make sure raft comes up ok when re-initialized
     cc := testingConsensus(t, 1)
+    defer cleanRaft(1)
     err := cc.Shutdown()
     if err != nil {
         t.Fatal("Consensus cannot shutdown:", err)

View File

@ -7,9 +7,9 @@ for dir in $dirs;
 do
     if ls "$dir"/*.go &> /dev/null;
     then
-        cmdflags="-timeout 20m -v -coverprofile=profile.out -covermode=count $dir"
+        cmdflags="-v -coverprofile=profile.out -covermode=count $dir"
         if [ "$dir" == "." ]; then
-            cmdflags="-timeout 20m -v -coverprofile=profile.out -covermode=count -loglevel CRITICAL ."
+            cmdflags="-v -coverprofile=profile.out -covermode=count -loglevel CRITICAL ."
         fi
         echo go test $cmdflags
         go test $cmdflags

View File

@ -1,6 +1,7 @@
 package ipfscluster

 import (
+    "context"
     "flag"
     "fmt"
     "math/rand"
@ -25,17 +26,18 @@ import (
     cid "github.com/ipfs/go-cid"
     crypto "github.com/libp2p/go-libp2p-crypto"
+    host "github.com/libp2p/go-libp2p-host"
     peer "github.com/libp2p/go-libp2p-peer"
+    peerstore "github.com/libp2p/go-libp2p-peerstore"
     ma "github.com/multiformats/go-multiaddr"
 )

-//TestClusters*
 var (
     // number of clusters to create
-    nClusters = 6
+    nClusters = 5

     // number of pins to pin/unpin/check
-    nPins = 500
+    nPins = 100

     logLevel = "CRITICAL"
@ -124,8 +126,8 @@ func createComponents(t *testing.T, i int, clusterSecret []byte) (*Config, *raft
     return clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, mock
 }

-func createCluster(t *testing.T, clusterCfg *Config, consensusCfg *raft.Config, api API, ipfs IPFSConnector, state state.State, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer) *Cluster {
-    cl, err := NewCluster(nil, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf)
+func createCluster(t *testing.T, host host.Host, clusterCfg *Config, consensusCfg *raft.Config, api API, ipfs IPFSConnector, state state.State, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer) *Cluster {
+    cl, err := NewCluster(host, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf)
     checkErr(t, err)
     <-cl.Ready()
     return cl
@ -133,7 +135,7 @@ func createCluster(t *testing.T, clusterCfg *Config, consensusCfg *raft.Config,
 func createOnePeerCluster(t *testing.T, nth int, clusterSecret []byte) (*Cluster, *test.IpfsMock) {
     clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, mock := createComponents(t, nth, clusterSecret)
-    cl := createCluster(t, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf)
+    cl := createCluster(t, nil, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf)
     return cl, mock
 }
@ -149,6 +151,8 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
     allocs := make([]PinAllocator, nClusters, nClusters)
     infs := make([]Informer, nClusters, nClusters)
     ipfsMocks := make([]*test.IpfsMock, nClusters, nClusters)
+    hosts := make([]host.Host, nClusters, nClusters)
+
     clusters := make([]*Cluster, nClusters, nClusters)

     // Uncomment when testing with fixed ports
@ -197,25 +201,45 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
     // ----------------------------------------------
     // Alternative way of starting using bootstrap

+    // Create hosts
+    var err error
+    for i := 0; i < nClusters; i++ {
+        hosts[i], err = NewClusterHost(context.Background(), cfgs[i])
+        if err != nil {
+            t.Fatal(err)
+        }
+    }
+
+    // open connections among all hosts
+    for _, h := range hosts {
+        for _, h2 := range hosts {
+            if h.ID() != h2.ID() {
+                h.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
+                _, err := h.Network().DialPeer(context.Background(), h2.ID())
+                if err != nil {
+                    t.Fatal(err)
+                }
+            }
+        }
+    }
+
     // Start first node
-    clusters[0] = createCluster(t, cfgs[0], concfgs[0], apis[0], ipfss[0], states[0], trackers[0], mons[0], allocs[0], infs[0])
+    clusters[0] = createCluster(t, hosts[0], cfgs[0], concfgs[0], apis[0], ipfss[0], states[0], trackers[0], mons[0], allocs[0], infs[0])

     // Find out where it binded
     bootstrapAddr, _ := ma.NewMultiaddr(fmt.Sprintf("%s/ipfs/%s", clusters[0].host.Addrs()[0], clusters[0].id.Pretty()))

     // Use first node to bootstrap
     for i := 1; i < nClusters; i++ {
         cfgs[i].Bootstrap = []ma.Multiaddr{bootstrapAddr}
     }
+    waitForLeader(t, clusters[0:1])

     // Start the rest
-    var wg sync.WaitGroup
+    // We don't do this in parallel because it causes libp2p dial backoffs
     for i := 1; i < nClusters; i++ {
-        wg.Add(1)
-        go func(i int) {
-            clusters[i] = createCluster(t, cfgs[i], concfgs[i], apis[i], ipfss[i], states[i], trackers[i], mons[i], allocs[i], infs[i])
-            wg.Done()
-        }(i)
+        clusters[i] = createCluster(t, hosts[i], cfgs[i], concfgs[i], apis[i], ipfss[i], states[i], trackers[i], mons[i], allocs[i], infs[i])
     }
-    wg.Wait()
+    waitForLeader(t, clusters)

     // ---------------------------------------------
@ -223,8 +247,7 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
     // for i := 1; i < nClusters; i++ {
     //     clusters[0].PeerAdd(clusterAddr(clusters[i]))
     // }
-    delay()
     delay()

     return clusters, ipfsMocks
 }
@ -252,29 +275,53 @@ func runF(t *testing.T, clusters []*Cluster, f func(*testing.T, *Cluster)) {
     wg.Wait()
 }

+//////////////////////////////////////
+// Delay and wait functions
+//
+// Delays are used in tests to wait for certain events to happen:
+//   * ttlDelay() waits for metrics to arrive. If you pin something
+//     and your next operation depends on updated metrics, you need to wait
+//   * pinDelay() accounts for the time necessary to pin something and for the new
+//     log entry to be visible in all cluster peers
+//   * delay just sleeps a second or two.
+//   * waitForLeader functions make sure there is a raft leader, for example,
+//     after killing the leader.
+//
+// The values for delays are a result of testing and adjusting so tests pass
+// in travis, jenkins etc., taking into account the values used in the
+// testing configuration (config_test.go).
+
 func delay() {
     var d int
     if nClusters > 10 {
-        d = 8
-    } else if nClusters > 5 {
-        d = 5
+        d = 2000
     } else {
-        d = nClusters
+        d = 1000
     }
-    time.Sleep(time.Duration(d) * time.Second)
+    time.Sleep(time.Duration(d) * time.Millisecond)
 }

+func pinDelay() {
+    time.Sleep(400 * time.Millisecond)
+}
+
+func ttlDelay() {
+    diskInfCfg := &disk.Config{}
+    diskInfCfg.LoadJSON(testingDiskInfCfg)
+    time.Sleep(diskInfCfg.MetricTTL * 3)
+}
+
+// Like waitForLeader but letting metrics expire before waiting, and
+// waiting for new metrics to arrive afterwards.
+func waitForLeaderAndMetrics(t *testing.T, clusters []*Cluster) {
+    ttlDelay()
+    waitForLeader(t, clusters)
+    ttlDelay()
+}
+
+// Makes sure there is a leader and everyone knows about it.
 func waitForLeader(t *testing.T, clusters []*Cluster) {
     timer := time.NewTimer(time.Minute)
-    ticker := time.NewTicker(time.Second)
-    // Wait for consensus to pick a new leader in case we shut it down
-    // Make sure we don't check on a shutdown cluster
-    j := rand.Intn(len(clusters))
-    for clusters[j].shutdownB {
-        j = rand.Intn(len(clusters))
-    }
+    ticker := time.NewTicker(100 * time.Millisecond)

 loop:
     for {
@ -282,14 +329,22 @@ loop:
         case <-timer.C:
             t.Fatal("timed out waiting for a leader")
         case <-ticker.C:
-            _, err := clusters[j].consensus.Leader()
-            if err == nil {
-                break loop
+            for _, cl := range clusters {
+                if cl.shutdownB {
+                    continue // skip shutdown clusters
+                }
+                _, err := cl.consensus.Leader()
+                if err != nil {
+                    continue loop
+                }
             }
+            break loop
         }
     }
 }
/////////////////////////////////////////
func TestClustersVersion(t *testing.T) { func TestClustersVersion(t *testing.T) {
clusters, mock := createClusters(t) clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock) defer shutdownClusters(t, clusters, mock)
@ -305,7 +360,6 @@ func TestClustersVersion(t *testing.T) {
func TestClustersPeers(t *testing.T) { func TestClustersPeers(t *testing.T) {
clusters, mock := createClusters(t) clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock) defer shutdownClusters(t, clusters, mock)
delay()
j := rand.Intn(nClusters) // choose a random cluster peer j := rand.Intn(nClusters) // choose a random cluster peer
peers := clusters[j].Peers() peers := clusters[j].Peers()
@ -345,6 +399,9 @@ func TestClustersPin(t *testing.T) {
defer shutdownClusters(t, clusters, mock) defer shutdownClusters(t, clusters, mock)
exampleCid, _ := cid.Decode(test.TestCid1) exampleCid, _ := cid.Decode(test.TestCid1)
prefix := exampleCid.Prefix() prefix := exampleCid.Prefix()
ttlDelay()
for i := 0; i < nPins; i++ { for i := 0; i < nPins; i++ {
j := rand.Intn(nClusters) // choose a random cluster peer j := rand.Intn(nClusters) // choose a random cluster peer
h, err := prefix.Sum(randomBytes()) // create random cid h, err := prefix.Sum(randomBytes()) // create random cid
@ -360,6 +417,7 @@ func TestClustersPin(t *testing.T) {
} }
} }
delay() delay()
delay()
fpinned := func(t *testing.T, c *Cluster) { fpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.StatusAll() status := c.tracker.StatusAll()
for _, v := range status { for _, v := range status {
@ -378,7 +436,7 @@ func TestClustersPin(t *testing.T) {
// Unpin everything // Unpin everything
pinList := clusters[0].Pins() pinList := clusters[0].Pins()
for i := 0; i < nPins; i++ { for i := 0; i < len(pinList); i++ {
j := rand.Intn(nClusters) // choose a random cluster peer j := rand.Intn(nClusters) // choose a random cluster peer
err := clusters[j].Unpin(pinList[i].Cid) err := clusters[j].Unpin(pinList[i].Cid)
if err != nil { if err != nil {
@ -392,6 +450,7 @@ func TestClustersPin(t *testing.T) {
} }
delay() delay()
delay()
funpinned := func(t *testing.T, c *Cluster) { funpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.StatusAll() status := c.tracker.StatusAll()
@ -408,7 +467,7 @@ func TestClustersStatusAll(t *testing.T) {
defer shutdownClusters(t, clusters, mock) defer shutdownClusters(t, clusters, mock)
h, _ := cid.Decode(test.TestCid1) h, _ := cid.Decode(test.TestCid1)
clusters[0].Pin(api.PinCid(h)) clusters[0].Pin(api.PinCid(h))
delay() pinDelay()
// Global status // Global status
f := func(t *testing.T, c *Cluster) { f := func(t *testing.T, c *Cluster) {
statuses, err := c.StatusAll() statuses, err := c.StatusAll()
@ -452,10 +511,11 @@ func TestClustersStatusAllWithErrors(t *testing.T) {
defer shutdownClusters(t, clusters, mock) defer shutdownClusters(t, clusters, mock)
h, _ := cid.Decode(test.TestCid1) h, _ := cid.Decode(test.TestCid1)
clusters[0].Pin(api.PinCid(h)) clusters[0].Pin(api.PinCid(h))
delay() pinDelay()
// shutdown 1 cluster peer // shutdown 1 cluster peer
clusters[1].Shutdown() clusters[1].Shutdown()
delay()
f := func(t *testing.T, c *Cluster) { f := func(t *testing.T, c *Cluster) {
// skip if it's the shutdown peer // skip if it's the shutdown peer
@ -513,7 +573,9 @@ func TestClustersSyncAllLocal(t *testing.T) {
h2, _ := cid.Decode(test.TestCid2) h2, _ := cid.Decode(test.TestCid2)
clusters[0].Pin(api.PinCid(h)) clusters[0].Pin(api.PinCid(h))
clusters[0].Pin(api.PinCid(h2)) clusters[0].Pin(api.PinCid(h2))
delay() pinDelay()
pinDelay()
f := func(t *testing.T, c *Cluster) { f := func(t *testing.T, c *Cluster) {
// Sync bad ID // Sync bad ID
infos, err := c.SyncAllLocal() infos, err := c.SyncAllLocal()
@ -541,7 +603,8 @@ func TestClustersSyncLocal(t *testing.T) {
h2, _ := cid.Decode(test.TestCid2) h2, _ := cid.Decode(test.TestCid2)
clusters[0].Pin(api.PinCid(h)) clusters[0].Pin(api.PinCid(h))
clusters[0].Pin(api.PinCid(h2)) clusters[0].Pin(api.PinCid(h2))
delay() pinDelay()
pinDelay()
f := func(t *testing.T, c *Cluster) { f := func(t *testing.T, c *Cluster) {
info, err := c.SyncLocal(h) info, err := c.SyncLocal(h)
@ -572,7 +635,8 @@ func TestClustersSyncAll(t *testing.T) {
h2, _ := cid.Decode(test.TestCid2) h2, _ := cid.Decode(test.TestCid2)
clusters[0].Pin(api.PinCid(h)) clusters[0].Pin(api.PinCid(h))
clusters[0].Pin(api.PinCid(h2)) clusters[0].Pin(api.PinCid(h2))
delay() pinDelay()
pinDelay()
j := rand.Intn(nClusters) // choose a random cluster peer j := rand.Intn(nClusters) // choose a random cluster peer
ginfos, err := clusters[j].SyncAll() ginfos, err := clusters[j].SyncAll()
@ -603,7 +667,8 @@ func TestClustersSync(t *testing.T) {
h2, _ := cid.Decode(test.TestCid2) h2, _ := cid.Decode(test.TestCid2)
clusters[0].Pin(api.PinCid(h)) clusters[0].Pin(api.PinCid(h))
clusters[0].Pin(api.PinCid(h2)) clusters[0].Pin(api.PinCid(h2))
delay() pinDelay()
pinDelay()
j := rand.Intn(nClusters) j := rand.Intn(nClusters)
ginfo, err := clusters[j].Sync(h) ginfo, err := clusters[j].Sync(h)
@ -662,10 +727,13 @@ func TestClustersRecoverLocal(t *testing.T) {
defer shutdownClusters(t, clusters, mock) defer shutdownClusters(t, clusters, mock)
h, _ := cid.Decode(test.ErrorCid) // This cid always fails h, _ := cid.Decode(test.ErrorCid) // This cid always fails
h2, _ := cid.Decode(test.TestCid2) h2, _ := cid.Decode(test.TestCid2)
ttlDelay()
clusters[0].Pin(api.PinCid(h)) clusters[0].Pin(api.PinCid(h))
clusters[0].Pin(api.PinCid(h2)) clusters[0].Pin(api.PinCid(h2))
pinDelay()
delay() pinDelay()
f := func(t *testing.T, c *Cluster) { f := func(t *testing.T, c *Cluster) {
info, err := c.RecoverLocal(h) info, err := c.RecoverLocal(h)
@ -694,10 +762,14 @@ func TestClustersRecover(t *testing.T) {
defer shutdownClusters(t, clusters, mock) defer shutdownClusters(t, clusters, mock)
h, _ := cid.Decode(test.ErrorCid) // This cid always fails h, _ := cid.Decode(test.ErrorCid) // This cid always fails
h2, _ := cid.Decode(test.TestCid2) h2, _ := cid.Decode(test.TestCid2)
ttlDelay()
clusters[0].Pin(api.PinCid(h)) clusters[0].Pin(api.PinCid(h))
clusters[0].Pin(api.PinCid(h2)) clusters[0].Pin(api.PinCid(h2))
delay() pinDelay()
pinDelay()
j := rand.Intn(nClusters) j := rand.Intn(nClusters)
ginfo, err := clusters[j].Recover(h) ginfo, err := clusters[j].Recover(h)
@ -771,6 +843,8 @@ func TestClustersReplication(t *testing.T) {
c.config.ReplicationFactorMax = nClusters - 1 c.config.ReplicationFactorMax = nClusters - 1
} }
ttlDelay()
// Why is replication factor nClusters - 1? // Why is replication factor nClusters - 1?
// Because that way we know that pinning nCluster // Because that way we know that pinning nCluster
// pins with an strategy like numpins/disk // pins with an strategy like numpins/disk
@ -789,7 +863,7 @@ func TestClustersReplication(t *testing.T) {
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
time.Sleep(time.Second) pinDelay()
// check that it is held by exactly nClusters -1 peers // check that it is held by exactly nClusters -1 peers
gpi, err := clusters[j].Status(h) gpi, err := clusters[j].Status(h)
@ -814,7 +888,7 @@ func TestClustersReplication(t *testing.T) {
if numRemote != 1 { if numRemote != 1 {
t.Errorf("We wanted 1 peer track as remote but %d do", numRemote) t.Errorf("We wanted 1 peer track as remote but %d do", numRemote)
} }
time.Sleep(time.Second) // this is for metric to be up to date ttlDelay()
} }
f := func(t *testing.T, c *Cluster) { f := func(t *testing.T, c *Cluster) {
@ -875,13 +949,15 @@ func TestClustersReplicationFactorMax(t *testing.T) {
c.config.ReplicationFactorMax = nClusters - 1 c.config.ReplicationFactorMax = nClusters - 1
} }
ttlDelay()
h, _ := cid.Decode(test.TestCid1) h, _ := cid.Decode(test.TestCid1)
err := clusters[0].Pin(api.PinCid(h)) err := clusters[0].Pin(api.PinCid(h))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
delay() pinDelay()
f := func(t *testing.T, c *Cluster) { f := func(t *testing.T, c *Cluster) {
p, err := c.PinGet(h) p, err := c.PinGet(h)
@ -918,13 +994,15 @@ func TestClustersReplicationFactorMaxLower(t *testing.T) {
c.config.ReplicationFactorMax = nClusters c.config.ReplicationFactorMax = nClusters
} }
ttlDelay() // make sure we have places to pin
h, _ := cid.Decode(test.TestCid1) h, _ := cid.Decode(test.TestCid1)
err := clusters[0].Pin(api.PinCid(h)) err := clusters[0].Pin(api.PinCid(h))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
delay() pinDelay()
p1, err := clusters[0].PinGet(h) p1, err := clusters[0].PinGet(h)
if err != nil { if err != nil {
@ -944,7 +1022,7 @@ func TestClustersReplicationFactorMaxLower(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
delay() pinDelay()
p2, err := clusters[0].PinGet(h) p2, err := clusters[0].PinGet(h)
if err != nil { if err != nil {
@ -970,16 +1048,13 @@ func TestClustersReplicationFactorInBetween(t *testing.T) {
c.config.ReplicationFactorMax = nClusters c.config.ReplicationFactorMax = nClusters
} }
ttlDelay()
// Shutdown two peers // Shutdown two peers
clusters[nClusters-1].Shutdown() clusters[nClusters-1].Shutdown()
clusters[nClusters-2].Shutdown() clusters[nClusters-2].Shutdown()
-    time.Sleep(time.Second) // let metric expire
-    waitForLeader(t, clusters)
-    // allow metrics to arrive to new leader
-    delay()
+    waitForLeaderAndMetrics(t, clusters)
h, _ := cid.Decode(test.TestCid1) h, _ := cid.Decode(test.TestCid1)
err := clusters[0].Pin(api.PinCid(h)) err := clusters[0].Pin(api.PinCid(h))
@ -987,7 +1062,7 @@ func TestClustersReplicationFactorInBetween(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
delay() pinDelay()
f := func(t *testing.T, c *Cluster) { f := func(t *testing.T, c *Cluster) {
if c == clusters[nClusters-1] || c == clusters[nClusters-2] { if c == clusters[nClusters-1] || c == clusters[nClusters-2] {
@ -1029,14 +1104,9 @@ func TestClustersReplicationFactorMin(t *testing.T) {
// Shutdown two peers // Shutdown two peers
     clusters[nClusters-1].Shutdown()
+    waitForLeaderAndMetrics(t, clusters)
     clusters[nClusters-2].Shutdown()
+    waitForLeaderAndMetrics(t, clusters)
-    time.Sleep(time.Second) // let metric expire
-    waitForLeader(t, clusters)
-    // allow metrics to arrive to new leader
-    delay()
h, _ := cid.Decode(test.TestCid1) h, _ := cid.Decode(test.TestCid1)
err := clusters[0].Pin(api.PinCid(h)) err := clusters[0].Pin(api.PinCid(h))
@ -1063,28 +1133,29 @@ func TestClustersReplicationMinMaxNoRealloc(t *testing.T) {
c.config.ReplicationFactorMax = nClusters c.config.ReplicationFactorMax = nClusters
} }
ttlDelay()
h, _ := cid.Decode(test.TestCid1) h, _ := cid.Decode(test.TestCid1)
err := clusters[0].Pin(api.PinCid(h)) err := clusters[0].Pin(api.PinCid(h))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
pinDelay()
// Shutdown two peers // Shutdown two peers
     clusters[nClusters-1].Shutdown()
+    waitForLeaderAndMetrics(t, clusters)
     clusters[nClusters-2].Shutdown()
+    waitForLeaderAndMetrics(t, clusters)
-    time.Sleep(time.Second) // let metric expire
-    waitForLeader(t, clusters)
-    // allow metrics to arrive to new leader
-    delay()
err = clusters[0].Pin(api.PinCid(h)) err = clusters[0].Pin(api.PinCid(h))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
pinDelay()
p, err := clusters[0].PinGet(h) p, err := clusters[0].PinGet(h)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1114,13 +1185,15 @@ func TestClustersReplicationMinMaxRealloc(t *testing.T) {
c.config.ReplicationFactorMax = 4 c.config.ReplicationFactorMax = 4
} }
ttlDelay() // make sure metrics are in
h, _ := cid.Decode(test.TestCid1) h, _ := cid.Decode(test.TestCid1)
err := clusters[0].Pin(api.PinCid(h)) err := clusters[0].Pin(api.PinCid(h))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
delay() pinDelay()
p, err := clusters[0].PinGet(h) p, err := clusters[0].PinGet(h)
if err != nil { if err != nil {
@ -1142,12 +1215,7 @@ func TestClustersReplicationMinMaxRealloc(t *testing.T) {
alloc1.Shutdown() alloc1.Shutdown()
alloc2.Shutdown() alloc2.Shutdown()
-    time.Sleep(time.Second) // let metric expire
-    waitForLeader(t, clusters)
-    // allow metrics to arrive to new leader
-    delay()
+    waitForLeaderAndMetrics(t, clusters)
// Repin - (although this might have been taken of if there was an alert // Repin - (although this might have been taken of if there was an alert
err = safePeer.Pin(api.PinCid(h)) err = safePeer.Pin(api.PinCid(h))
@ -1155,6 +1223,8 @@ func TestClustersReplicationMinMaxRealloc(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
pinDelay()
p, err = safePeer.PinGet(h) p, err = safePeer.PinGet(h)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1176,7 +1246,7 @@ func TestClustersReplicationMinMaxRealloc(t *testing.T) {
lenSA := len(secondAllocations) lenSA := len(secondAllocations)
expected := minInt(nClusters-2, 4) expected := minInt(nClusters-2, 4)
if lenSA != expected { if lenSA != expected {
t.Errorf("Inssufficient reallocation, could have allocated to %d peers but instead only allocated to %d peers", expected, lenSA) t.Errorf("Insufficient reallocation, could have allocated to %d peers but instead only allocated to %d peers", expected, lenSA)
} }
if lenSA < 3 { if lenSA < 3 {
@ -1194,6 +1264,8 @@ func TestClustersReplicationRealloc(t *testing.T) {
c.config.ReplicationFactorMax = nClusters - 1 c.config.ReplicationFactorMax = nClusters - 1
} }
ttlDelay()
j := rand.Intn(nClusters) j := rand.Intn(nClusters)
h, _ := cid.Decode(test.TestCid1) h, _ := cid.Decode(test.TestCid1)
err := clusters[j].Pin(api.PinCid(h)) err := clusters[j].Pin(api.PinCid(h))
@ -1202,7 +1274,7 @@ func TestClustersReplicationRealloc(t *testing.T) {
} }
// Let the pin arrive // Let the pin arrive
time.Sleep(time.Second / 2) pinDelay()
pin := clusters[j].Pins()[0] pin := clusters[j].Pins()[0]
pinSerial := pin.ToSerial() pinSerial := pin.ToSerial()
@ -1217,7 +1289,7 @@ func TestClustersReplicationRealloc(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(time.Second / 2) pinDelay()
pin2 := clusters[j].Pins()[0] pin2 := clusters[j].Pins()[0]
pinSerial2 := pin2.ToSerial() pinSerial2 := pin2.ToSerial()
@ -1245,10 +1317,7 @@ func TestClustersReplicationRealloc(t *testing.T) {
// let metrics expire and give time for the cluster to // let metrics expire and give time for the cluster to
// see if they have lost the leader // see if they have lost the leader
-    time.Sleep(4 * time.Second)
-    waitForLeader(t, clusters)
-    // wait for new metrics to arrive
-    time.Sleep(2 * time.Second)
+    waitForLeaderAndMetrics(t, clusters)
// Make sure we haven't killed our randomly // Make sure we haven't killed our randomly
// selected cluster // selected cluster
@ -1262,7 +1331,7 @@ func TestClustersReplicationRealloc(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(time.Second / 2) pinDelay()
numPinned := 0 numPinned := 0
for i, c := range clusters { for i, c := range clusters {
@ -1303,13 +1372,12 @@ func TestClustersReplicationNotEnoughPeers(t *testing.T) {
} }
// Let the pin arrive // Let the pin arrive
time.Sleep(time.Second / 2) pinDelay()
clusters[0].Shutdown() clusters[0].Shutdown()
clusters[1].Shutdown() clusters[1].Shutdown()
-    delay()
-    waitForLeader(t, clusters)
+    waitForLeaderAndMetrics(t, clusters)
err = clusters[2].Pin(api.PinCid(h)) err = clusters[2].Pin(api.PinCid(h))
if err == nil { if err == nil {
@ -1337,7 +1405,7 @@ func TestClustersRebalanceOnPeerDown(t *testing.T) {
// pin something // pin something
h, _ := cid.Decode(test.TestCid1) h, _ := cid.Decode(test.TestCid1)
clusters[0].Pin(api.PinCid(h)) clusters[0].Pin(api.PinCid(h))
time.Sleep(time.Second * 2) // let the pin arrive pinDelay()
pinLocal := 0 pinLocal := 0
pinRemote := 0 pinRemote := 0
var localPinner peer.ID var localPinner peer.ID
@ -1361,7 +1429,7 @@ func TestClustersRebalanceOnPeerDown(t *testing.T) {
t.Fatal("Not pinned as expected") t.Fatal("Not pinned as expected")
} }
// find a kill the local pinner // kill the local pinner
for _, c := range clusters { for _, c := range clusters {
if c.id == localPinner { if c.id == localPinner {
c.Shutdown() c.Shutdown()
@ -1370,8 +1438,8 @@ func TestClustersRebalanceOnPeerDown(t *testing.T) {
} }
} }
-    // Sleep a monitoring interval
-    time.Sleep(6 * time.Second)
+    delay()
+    waitForLeaderAndMetrics(t, clusters) // in case we killed the leader
// It should be now pinned in the remote pinner // It should be now pinned in the remote pinner
if s := remotePinnerCluster.tracker.Status(h).Status; s != api.TrackerStatusPinned { if s := remotePinnerCluster.tracker.Status(h).Status; s != api.TrackerStatusPinned {
@ -1452,8 +1520,6 @@ func validateClusterGraph(t *testing.T, graph api.ConnectGraph, clusterIDs map[p
func TestClustersGraphConnected(t *testing.T) { func TestClustersGraphConnected(t *testing.T) {
clusters, mock := createClusters(t) clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock) defer shutdownClusters(t, clusters, mock)
delay()
delay()
j := rand.Intn(nClusters) // choose a random cluster peer to query j := rand.Intn(nClusters) // choose a random cluster peer to query
graph, err := clusters[j].ConnectGraph() graph, err := clusters[j].ConnectGraph()
@ -1496,9 +1562,8 @@ func TestClustersGraphUnhealthy(t *testing.T) {
clusters[discon1].Shutdown() clusters[discon1].Shutdown()
clusters[discon2].Shutdown() clusters[discon2].Shutdown()
-    delay()
-    waitForLeader(t, clusters)
-    delay()
+    waitForLeaderAndMetrics(t, clusters)
graph, err := clusters[j].ConnectGraph() graph, err := clusters[j].ConnectGraph()
if err != nil { if err != nil {

View File

@ -22,7 +22,7 @@ var logger = logging.Logger("monitor")
 var AlertChannelCap = 256

 // WindowCap specifies how many metrics to keep for given host and metric type
-var WindowCap = 10
+var WindowCap = 100

 // peerMetrics is just a circular queue
 type peerMetrics struct {
@ -55,6 +55,7 @@ func (pmets *peerMetrics) latest() (api.Metric, error) {
     // pmets.mux.RLock()
     // defer pmets.mux.RUnlock()
     if len(pmets.window) == 0 {
+        logger.Warning("no metrics")
         return api.Metric{}, errors.New("no metrics")
     }
     return pmets.window[pmets.last], nil
@ -183,7 +184,7 @@ func (mon *Monitor) LogMetric(m api.Metric) {
         mbyp[peer] = pmets
     }

-    logger.Debugf("logged '%s' metric from '%s'. Expires on %s", name, peer, m.Expire)
+    logger.Debugf("logged '%s' metric from '%s'. Expires on %d", name, peer, m.Expire)
     pmets.add(m)
 }
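The WindowCap bump above only enlarges the per-peer metric window; since peerMetrics is described as a circular queue, here is a minimal, self-contained sketch of that idea (field names and the int payload are illustrative, not the monitor's actual implementation):

package main

import "fmt"

// window is a fixed-capacity circular buffer of metric values.
type window struct {
    items []int
    last  int // index of the most recently added item
}

func newWindow(capacity int) *window {
    return &window{items: make([]int, 0, capacity)}
}

// add appends a value, overwriting the oldest one once capacity is reached.
func (w *window) add(v int) {
    if len(w.items) < cap(w.items) {
        w.items = append(w.items, v)
        w.last = len(w.items) - 1
        return
    }
    w.last = (w.last + 1) % cap(w.items)
    w.items[w.last] = v
}

// latest returns the most recently added value, if any.
func (w *window) latest() (int, bool) {
    if len(w.items) == 0 {
        return 0, false
    }
    return w.items[w.last], true
}

func main() {
    w := newWindow(3)
    for i := 1; i <= 5; i++ {
        w.add(i)
    }
    v, _ := w.latest()
    fmt.Println(v) // 5: older values were overwritten in place
}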

View File

@ -2,6 +2,8 @@ package basic
import ( import (
"fmt" "fmt"
"strconv"
"sync"
"testing" "testing"
"time" "time"
@ -51,6 +53,58 @@ func TestPeerMonitorShutdown(t *testing.T) {
} }
} }
func TestLogMetricConcurrent(t *testing.T) {
pm := testPeerMonitor(t)
defer pm.Shutdown()
var wg sync.WaitGroup
wg.Add(3)
f := func() {
defer wg.Done()
for i := 0; i < 25; i++ {
mt := api.Metric{
Name: "test",
Peer: test.TestPeerID1,
Value: fmt.Sprintf("%d", time.Now().UnixNano()),
Valid: true,
}
mt.SetTTLDuration(150 * time.Millisecond)
pm.LogMetric(mt)
time.Sleep(75 * time.Millisecond)
}
}
go f()
go f()
go f()
time.Sleep(150 * time.Millisecond)
last := time.Now().Add(-500 * time.Millisecond)
for i := 0; i <= 20; i++ {
lastMtrcs := pm.LastMetrics("test")
if len(lastMtrcs) != 1 {
t.Error("no valid metrics", len(lastMtrcs), i)
time.Sleep(75 * time.Millisecond)
continue
}
n, err := strconv.Atoi(lastMtrcs[0].Value)
if err != nil {
t.Fatal(err)
}
current := time.Unix(0, int64(n))
if current.Before(last) {
t.Errorf("expected newer metric: Current: %s, Last: %s", current, last)
}
last = current
time.Sleep(75 * time.Millisecond)
}
wg.Wait()
}
func TestPeerMonitorLogMetric(t *testing.T) { func TestPeerMonitorLogMetric(t *testing.T) {
pm := testPeerMonitor(t) pm := testPeerMonitor(t)
defer pm.Shutdown() defer pm.Shutdown()

View File

@ -29,7 +29,6 @@ func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
}(i) }(i)
} }
wg.Wait() wg.Wait()
delay()
return cls, mocks return cls, mocks
} }
@ -65,7 +64,7 @@ func TestClustersPeerAdd(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
delay() pinDelay()
f := func(t *testing.T, c *Cluster) { f := func(t *testing.T, c *Cluster) {
ids := c.Peers() ids := c.Peers()
@ -88,8 +87,6 @@ func TestClustersPeerAdd(t *testing.T) {
t.Error("By now cluster peers should reflect all peers") t.Error("By now cluster peers should reflect all peers")
} }
time.Sleep(2 * time.Second)
// check that they are part of the configuration // check that they are part of the configuration
// This only works because each peer only has one multiaddress // This only works because each peer only has one multiaddress
// (localhost) // (localhost)
@ -214,6 +211,7 @@ func TestClustersPeerRemoveSelf(t *testing.T) {
defer shutdownClusters(t, clusters, mocks) defer shutdownClusters(t, clusters, mocks)
for i := 0; i < len(clusters); i++ { for i := 0; i < len(clusters); i++ {
waitForLeaderAndMetrics(t, clusters)
peers := clusters[i].Peers() peers := clusters[i].Peers()
t.Logf("Current cluster size: %d", len(peers)) t.Logf("Current cluster size: %d", len(peers))
if len(peers) != (len(clusters) - i) { if len(peers) != (len(clusters) - i) {
@ -250,7 +248,7 @@ func TestClustersPeerRemoveLeader(t *testing.T) {
var l peer.ID var l peer.ID
for _, c := range clusters { for _, c := range clusters {
if !c.shutdownB { if !c.shutdownB {
waitForLeader(t, clusters) waitForLeaderAndMetrics(t, clusters)
l, _ = c.consensus.Leader() l, _ = c.consensus.Leader()
} }
} }
@ -286,7 +284,7 @@ func TestClustersPeerRemoveLeader(t *testing.T) {
if more { if more {
t.Error("should be done") t.Error("should be done")
} }
time.Sleep(time.Second) time.Sleep(time.Second / 2)
} }
} }
@ -341,10 +339,10 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
checkErr(t, err) checkErr(t, err)
err = leader.Pin(api.PinCid(h)) err = leader.Pin(api.PinCid(h))
checkErr(t, err) checkErr(t, err)
time.Sleep(time.Second) // time to update the metrics ttlDelay()
} }
delay() pinDelay()
// At this point, all peers must have 1 pin associated to them. // At this point, all peers must have 1 pin associated to them.
// Find out which pin is associated to leader. // Find out which pin is associated to leader.
@ -373,9 +371,9 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
t.Fatal("error removing peer:", err) t.Fatal("error removing peer:", err)
} }
-    time.Sleep(2 * time.Second)
-    waitForLeader(t, clusters)
     delay()
+    waitForLeaderAndMetrics(t, clusters)
+    delay() // this seems to fail when not waiting enough...
for _, icid := range interestingCids { for _, icid := range interestingCids {
// Now check that the allocations are new. // Now check that the allocations are new.
@ -405,7 +403,7 @@ func TestClustersPeerJoin(t *testing.T) {
} }
hash, _ := cid.Decode(test.TestCid1) hash, _ := cid.Decode(test.TestCid1)
clusters[0].Pin(api.PinCid(hash)) clusters[0].Pin(api.PinCid(hash))
delay() pinDelay()
f := func(t *testing.T, c *Cluster) { f := func(t *testing.T, c *Cluster) {
peers := c.Peers() peers := c.Peers()
@ -438,7 +436,7 @@ func TestClustersPeerJoinAllAtOnce(t *testing.T) {
hash, _ := cid.Decode(test.TestCid1) hash, _ := cid.Decode(test.TestCid1)
clusters[0].Pin(api.PinCid(hash)) clusters[0].Pin(api.PinCid(hash))
delay() pinDelay()
f2 := func(t *testing.T, c *Cluster) { f2 := func(t *testing.T, c *Cluster) {
peers := c.Peers() peers := c.Peers()
@ -555,7 +553,7 @@ func TestClustersPeerRejoin(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
delay() pinDelay()
// Rejoin c0 // Rejoin c0
c0, m0 := createOnePeerCluster(t, 0, testingClusterSecret) c0, m0 := createOnePeerCluster(t, 0, testingClusterSecret)

View File

@ -1,10 +1,6 @@
 package ipfscluster

-import (
-    "testing"
-
-    pnet "github.com/libp2p/go-libp2p-pnet"
-)
+import "testing"

 func TestClusterSecretFormat(t *testing.T) {
     goodSecret := "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
@ -57,28 +53,29 @@ func TestSimplePNet(t *testing.T) {
     }
 }

-func TestClusterSecretRequired(t *testing.T) {
-    cl1Secret, err := pnet.GenerateV1Bytes()
-    if err != nil {
-        t.Fatal("Unable to generate cluster secret.")
-    }
-    cl1, _ := createOnePeerCluster(t, 1, (*cl1Secret)[:])
-    cl2, _ := createOnePeerCluster(t, 2, testingClusterSecret)
-    defer cleanRaft()
-    defer cl1.Shutdown()
-    defer cl2.Shutdown()
-    peers1 := cl1.Peers()
-    peers2 := cl2.Peers()
-
-    _, err = cl1.PeerAdd(clusterAddr(cl2))
-    if err == nil {
-        t.Fatal("Peer entered private cluster without key.")
-    }
-
-    if len(peers1) != len(peers2) {
-        t.Fatal("Expected same number of peers")
-    }
-    if len(peers1) != 1 {
-        t.Fatal("Expected no peers other than self")
-    }
-}
+// // Adds one minute to tests. Disabled for the moment.
+// func TestClusterSecretRequired(t *testing.T) {
+//     cl1Secret, err := pnet.GenerateV1Bytes()
+//     if err != nil {
+//         t.Fatal("Unable to generate cluster secret.")
+//     }
+//     cl1, _ := createOnePeerCluster(t, 1, (*cl1Secret)[:])
+//     cl2, _ := createOnePeerCluster(t, 2, testingClusterSecret)
+//     defer cleanRaft()
+//     defer cl1.Shutdown()
+//     defer cl2.Shutdown()
+//     peers1 := cl1.Peers()
+//     peers2 := cl2.Peers()
+//
+//     _, err = cl1.PeerAdd(clusterAddr(cl2))
+//     if err == nil {
+//         t.Fatal("Peer entered private cluster without key.")
+//     }
+//
+//     if len(peers1) != len(peers2) {
+//         t.Fatal("Expected same number of peers")
+//     }
+//     if len(peers1) != 1 {
+//         t.Fatal("Expected no peers other than self")
+//     }
+// }