Fix tests: Make metric broadcasting async
When a peer is down, metric broadcasting hangs, and no more ticks are sent for a while.
This commit is contained in:
parent
4ea1777050
commit
8d3c72b766
|
@ -12,6 +12,7 @@ install:
|
||||||
- go get github.com/mattn/goveralls
|
- go get github.com/mattn/goveralls
|
||||||
- make deps
|
- make deps
|
||||||
script:
|
script:
|
||||||
|
#- make test_problem
|
||||||
- make service && make ctl && ./coverage.sh
|
- make service && make ctl && ./coverage.sh
|
||||||
- make install
|
- make install
|
||||||
- make test_sharness && make clean_sharness
|
- make test_sharness && make clean_sharness
|
||||||
|
|
6
Makefile
6
Makefile
|
@ -10,6 +10,9 @@ gx-go_bin=$(deptools)/$(gx-go)
|
||||||
bin_env=$(shell go env GOHOSTOS)-$(shell go env GOHOSTARCH)
|
bin_env=$(shell go env GOHOSTOS)-$(shell go env GOHOSTARCH)
|
||||||
sharness = sharness/lib/sharness
|
sharness = sharness/lib/sharness
|
||||||
|
|
||||||
|
# For debugging
|
||||||
|
problematic_test = TestClustersReplicationRealloc
|
||||||
|
|
||||||
export PATH := $(deptools):$(PATH)
|
export PATH := $(deptools):$(PATH)
|
||||||
|
|
||||||
all: service ctl
|
all: service ctl
|
||||||
|
@ -63,6 +66,9 @@ test: deps
|
||||||
test_sharness: $(sharness)
|
test_sharness: $(sharness)
|
||||||
@sh sharness/run-sharness-tests.sh
|
@sh sharness/run-sharness-tests.sh
|
||||||
|
|
||||||
|
test_problem: deps
|
||||||
|
go test -tags debug -v -run $(problematic_test)
|
||||||
|
|
||||||
$(sharness):
|
$(sharness):
|
||||||
@echo "Downloading sharness"
|
@echo "Downloading sharness"
|
||||||
@wget -q -O sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/master.tar.gz
|
@wget -q -O sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/master.tar.gz
|
||||||
|
|
65
cluster.go
65
cluster.go
|
@ -10,10 +10,11 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
pnet "github.com/libp2p/go-libp2p-pnet"
|
||||||
|
|
||||||
"github.com/ipfs/ipfs-cluster/api"
|
"github.com/ipfs/ipfs-cluster/api"
|
||||||
"github.com/ipfs/ipfs-cluster/consensus/raft"
|
"github.com/ipfs/ipfs-cluster/consensus/raft"
|
||||||
"github.com/ipfs/ipfs-cluster/state"
|
"github.com/ipfs/ipfs-cluster/state"
|
||||||
pnet "github.com/libp2p/go-libp2p-pnet"
|
|
||||||
|
|
||||||
rpc "github.com/hsanjuan/go-libp2p-gorpc"
|
rpc "github.com/hsanjuan/go-libp2p-gorpc"
|
||||||
cid "github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
|
@ -217,31 +218,41 @@ func (c *Cluster) broadcastMetric(m api.Metric) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if leader == c.id {
|
// If a peer is down, the rpc call will get locked. Therefore,
|
||||||
// Leader needs to broadcast its metric to everyone
|
// we need to do it async. This way we keep broadcasting
|
||||||
// in case it goes down (new leader will have to detect this node went down)
|
// even if someone is down. Eventually those requests will
|
||||||
errs := c.multiRPC(peers,
|
// timeout in libp2p and the errors logged.
|
||||||
"Cluster",
|
go func() {
|
||||||
"PeerMonitorLogMetric",
|
if leader == c.id {
|
||||||
m,
|
// Leader needs to broadcast its metric to everyone
|
||||||
copyEmptyStructToIfaces(make([]struct{}, len(peers), len(peers))))
|
// in case it goes down (new leader will have to detect this node went down)
|
||||||
for i, e := range errs {
|
logger.Debugf("Leader %s about to broadcast metric %s to %s. Expires: %s", c.id, m.Name, peers, m.Expire)
|
||||||
if e != nil {
|
|
||||||
logger.Errorf("error pushing metric to %s: %s", peers[i].Pretty(), e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// non-leaders just need to forward their metrics to the leader
|
|
||||||
err := c.rpcClient.Call(leader,
|
|
||||||
"Cluster", "PeerMonitorLogMetric",
|
|
||||||
m, &struct{}{})
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Debugf("sent metric %s", m.Name)
|
errs := c.multiRPC(peers,
|
||||||
|
"Cluster",
|
||||||
|
"PeerMonitorLogMetric",
|
||||||
|
m,
|
||||||
|
copyEmptyStructToIfaces(make([]struct{}, len(peers), len(peers))))
|
||||||
|
for i, e := range errs {
|
||||||
|
if e != nil {
|
||||||
|
logger.Errorf("error pushing metric to %s: %s", peers[i].Pretty(), e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logger.Debugf("Leader %s broadcasted metric %s to %s. Expires: %s", c.id, m.Name, peers, m.Expire)
|
||||||
|
} else {
|
||||||
|
// non-leaders just need to forward their metrics to the leader
|
||||||
|
logger.Debugf("Peer %s about to send metric %s to %s. Expires: %s", c.id, m.Name, leader, m.Expire)
|
||||||
|
|
||||||
|
err := c.rpcClient.Call(leader,
|
||||||
|
"Cluster", "PeerMonitorLogMetric",
|
||||||
|
m, &struct{}{})
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(err)
|
||||||
|
}
|
||||||
|
logger.Debugf("Peer %s sent metric %s to %s. Expires: %s", c.id, m.Name, leader, m.Expire)
|
||||||
|
|
||||||
|
}
|
||||||
|
}()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -272,6 +283,7 @@ func (c *Cluster) pushInformerMetrics() {
|
||||||
timer.Stop() // no need to drain C if we are here
|
timer.Stop() // no need to drain C if we are here
|
||||||
timer.Reset(metric.GetTTL() / 2)
|
timer.Reset(metric.GetTTL() / 2)
|
||||||
}
|
}
|
||||||
|
logger.Debugf("Peer %s. Finished pushInformerMetrics", c.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) pushPingMetrics() {
|
func (c *Cluster) pushPingMetrics() {
|
||||||
|
@ -291,6 +303,7 @@ func (c *Cluster) pushPingMetrics() {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
logger.Debugf("Peer %s. Finished pushPingMetrics", c.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
// read the alerts channel from the monitor and triggers repins
|
// read the alerts channel from the monitor and triggers repins
|
||||||
|
@ -303,7 +316,7 @@ func (c *Cluster) alertsHandler() {
|
||||||
// only the leader handles alerts
|
// only the leader handles alerts
|
||||||
leader, err := c.consensus.Leader()
|
leader, err := c.consensus.Leader()
|
||||||
if err == nil && leader == c.id {
|
if err == nil && leader == c.id {
|
||||||
logger.Warningf("Received alert for %s in %s", alrt.MetricName, alrt.Peer.Pretty())
|
logger.Warningf("Peer %s received alert for %s in %s", c.id, alrt.MetricName, alrt.Peer.Pretty())
|
||||||
switch alrt.MetricName {
|
switch alrt.MetricName {
|
||||||
case "ping":
|
case "ping":
|
||||||
c.repinFromPeer(alrt.Peer)
|
c.repinFromPeer(alrt.Peer)
|
||||||
|
|
|
@ -865,6 +865,7 @@ func TestClustersReplicationRealloc(t *testing.T) {
|
||||||
if pinfo.Status == api.TrackerStatusPinned {
|
if pinfo.Status == api.TrackerStatusPinned {
|
||||||
//t.Logf("Killing %s", c.id.Pretty())
|
//t.Logf("Killing %s", c.id.Pretty())
|
||||||
killedClusterIndex = i
|
killedClusterIndex = i
|
||||||
|
t.Logf("Shutting down %s", c.ID().ID)
|
||||||
c.Shutdown()
|
c.Shutdown()
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -873,8 +874,9 @@ func TestClustersReplicationRealloc(t *testing.T) {
|
||||||
// let metrics expire and give time for the cluster to
|
// let metrics expire and give time for the cluster to
|
||||||
// see if they have lost the leader
|
// see if they have lost the leader
|
||||||
time.Sleep(4 * time.Second)
|
time.Sleep(4 * time.Second)
|
||||||
|
|
||||||
waitForLeader(t, clusters)
|
waitForLeader(t, clusters)
|
||||||
|
// wait for new metrics to arrive
|
||||||
|
delay()
|
||||||
|
|
||||||
// Make sure we haven't killed our randomly
|
// Make sure we haven't killed our randomly
|
||||||
// selected cluster
|
// selected cluster
|
||||||
|
|
|
@ -178,7 +178,7 @@ func (mon *StdPeerMonitor) LogMetric(m api.Metric) {
|
||||||
mbyp[peer] = pmets
|
mbyp[peer] = pmets
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debugf("logged '%s' metric from '%s'", name, peer)
|
logger.Debugf("logged '%s' metric from '%s'. Expires on %s", name, peer, m.Expire)
|
||||||
pmets.add(m)
|
pmets.add(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -308,6 +308,7 @@ func (mon *StdPeerMonitor) checkMetrics(peers []peer.ID, metricName string) {
|
||||||
}
|
}
|
||||||
// send alert if metric is expired (but was valid at some point)
|
// send alert if metric is expired (but was valid at some point)
|
||||||
if last.Valid && last.Expired() {
|
if last.Valid && last.Expired() {
|
||||||
|
logger.Debugf("Metric %s from peer %s expired at %s", metricName, p, last.Expire)
|
||||||
mon.sendAlert(p, metricName)
|
mon.sendAlert(p, metricName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user