diff --git a/.travis.yml b/.travis.yml
index e411dc8b..b3d07a16 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -12,6 +12,7 @@ install:
 - go get github.com/mattn/goveralls
 - make deps
 script:
+#- make test_problem
 - make service && make ctl && ./coverage.sh
 - make install
 - make test_sharness && make clean_sharness
diff --git a/Makefile b/Makefile
index 08c48c44..528dc613 100644
--- a/Makefile
+++ b/Makefile
@@ -10,6 +10,9 @@ gx-go_bin=$(deptools)/$(gx-go)
 bin_env=$(shell go env GOHOSTOS)-$(shell go env GOHOSTARCH)
 sharness = sharness/lib/sharness
 
+# For debugging
+problematic_test = TestClustersReplicationRealloc
+
 export PATH := $(deptools):$(PATH)
 
 all: service ctl
@@ -63,6 +66,9 @@ test: deps
 test_sharness: $(sharness)
 	@sh sharness/run-sharness-tests.sh
 
+test_problem: deps
+	go test -tags debug -v -run $(problematic_test)
+
 $(sharness):
 	@echo "Downloading sharness"
 	@wget -q -O sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/master.tar.gz
diff --git a/cluster.go b/cluster.go
index d033e712..9c50c980 100644
--- a/cluster.go
+++ b/cluster.go
@@ -10,10 +10,11 @@ import (
 	"sync"
 	"time"
 
+	pnet "github.com/libp2p/go-libp2p-pnet"
+
 	"github.com/ipfs/ipfs-cluster/api"
 	"github.com/ipfs/ipfs-cluster/consensus/raft"
 	"github.com/ipfs/ipfs-cluster/state"
-	pnet "github.com/libp2p/go-libp2p-pnet"
 
 	rpc "github.com/hsanjuan/go-libp2p-gorpc"
 	cid "github.com/ipfs/go-cid"
@@ -217,31 +218,41 @@ func (c *Cluster) broadcastMetric(m api.Metric) error {
 		return err
 	}
 
-	if leader == c.id {
-		// Leader needs to broadcast its metric to everyone
-		// in case it goes down (new leader will have to detect this node went down)
-		errs := c.multiRPC(peers,
-			"Cluster",
-			"PeerMonitorLogMetric",
-			m,
-			copyEmptyStructToIfaces(make([]struct{}, len(peers), len(peers))))
-		for i, e := range errs {
-			if e != nil {
-				logger.Errorf("error pushing metric to %s: %s", peers[i].Pretty(), e)
-			}
-		}
-	} else {
-		// non-leaders just need to forward their metrics to the leader
-		err := c.rpcClient.Call(leader,
-			"Cluster", "PeerMonitorLogMetric",
-			m, &struct{}{})
-		if err != nil {
-			logger.Error(err)
-			return err
-		}
-	}
+	// If a peer is down, the rpc call will get locked. Therefore,
+	// we need to do it async. This way we keep broadcasting
+	// even if someone is down. Eventually those requests will
+	// timeout in libp2p and the errors logged.
+	go func() {
+		if leader == c.id {
+			// Leader needs to broadcast its metric to everyone
+			// in case it goes down (new leader will have to detect this node went down)
+			logger.Debugf("Leader %s about to broadcast metric %s to %s. Expires: %s", c.id, m.Name, peers, m.Expire)
 
-	logger.Debugf("sent metric %s", m.Name)
+			errs := c.multiRPC(peers,
+				"Cluster",
+				"PeerMonitorLogMetric",
+				m,
+				copyEmptyStructToIfaces(make([]struct{}, len(peers), len(peers))))
+			for i, e := range errs {
+				if e != nil {
+					logger.Errorf("error pushing metric to %s: %s", peers[i].Pretty(), e)
+				}
+			}
+			logger.Debugf("Leader %s broadcasted metric %s to %s. Expires: %s", c.id, m.Name, peers, m.Expire)
+		} else {
+			// non-leaders just need to forward their metrics to the leader
+			logger.Debugf("Peer %s about to send metric %s to %s. Expires: %s", c.id, m.Name, leader, m.Expire)
+
+			err := c.rpcClient.Call(leader,
+				"Cluster", "PeerMonitorLogMetric",
+				m, &struct{}{})
+			if err != nil {
+				logger.Error(err)
+			}
+			logger.Debugf("Peer %s sent metric %s to %s. Expires: %s", c.id, m.Name, leader, m.Expire)
+
+		}
+	}()
 
 	return nil
 }
@@ -272,6 +283,7 @@ func (c *Cluster) pushInformerMetrics() {
 		timer.Stop() // no need to drain C if we are here
 		timer.Reset(metric.GetTTL() / 2)
 	}
+	logger.Debugf("Peer %s. Finished pushInformerMetrics", c.id)
 }
 
 func (c *Cluster) pushPingMetrics() {
@@ -291,6 +303,7 @@
 		case <-ticker.C:
 		}
 	}
+	logger.Debugf("Peer %s. Finished pushPingMetrics", c.id)
 }
 
 // read the alerts channel from the monitor and triggers repins
@@ -303,7 +316,7 @@ func (c *Cluster) alertsHandler() {
 		// only the leader handles alerts
 		leader, err := c.consensus.Leader()
 		if err == nil && leader == c.id {
-			logger.Warningf("Received alert for %s in %s", alrt.MetricName, alrt.Peer.Pretty())
+			logger.Warningf("Peer %s received alert for %s in %s", c.id, alrt.MetricName, alrt.Peer.Pretty())
 			switch alrt.MetricName {
 			case "ping":
 				c.repinFromPeer(alrt.Peer)
diff --git a/ipfscluster_test.go b/ipfscluster_test.go
index 6c42b3c2..dbdd10a7 100644
--- a/ipfscluster_test.go
+++ b/ipfscluster_test.go
@@ -865,6 +865,7 @@ func TestClustersReplicationRealloc(t *testing.T) {
 		if pinfo.Status == api.TrackerStatusPinned {
 			//t.Logf("Killing %s", c.id.Pretty())
 			killedClusterIndex = i
+			t.Logf("Shutting down %s", c.ID().ID)
 			c.Shutdown()
 			break
 		}
@@ -873,8 +874,9 @@ func TestClustersReplicationRealloc(t *testing.T) {
 	// let metrics expire and give time for the cluster to
 	// see if they have lost the leader
 	time.Sleep(4 * time.Second)
-
 	waitForLeader(t, clusters)
+	// wait for new metrics to arrive
+	delay()
 
 	// Make sure we haven't killed our randomly
 	// selected cluster
diff --git a/monitor/basic/peer_monitor.go b/monitor/basic/peer_monitor.go
index 5362f544..fc7462a0 100644
--- a/monitor/basic/peer_monitor.go
+++ b/monitor/basic/peer_monitor.go
@@ -178,7 +178,7 @@ func (mon *StdPeerMonitor) LogMetric(m api.Metric) {
 		mbyp[peer] = pmets
 	}
 
-	logger.Debugf("logged '%s' metric from '%s'", name, peer)
+	logger.Debugf("logged '%s' metric from '%s'. Expires on %s", name, peer, m.Expire)
 
 	pmets.add(m)
 }
@@ -308,6 +308,7 @@ func (mon *StdPeerMonitor) checkMetrics(peers []peer.ID, metricName string) {
 		}
 		// send alert if metric is expired (but was valid at some point)
 		if last.Valid && last.Expired() {
+			logger.Debugf("Metric %s from peer %s expired at %s", metricName, p, last.Expire)
 			mon.sendAlert(p, metricName)
 		}
 	}