Fix publish cancelling contexts too early.

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
Hector Sanjuan 2018-05-02 09:43:16 +02:00
parent 3c3341e491
commit 72e1d64de2
2 changed files with 56 additions and 35 deletions

View File

@ -177,13 +177,20 @@ func (c *Cluster) syncWatcher() {
}
}
// push metrics loops and pushes metrics to the leader's monitor
// pushInformerMetrics loops and publishes informers metrics using the
// cluster monitor. Metrics are pushed normally at a TTL/2 rate. If an error
// occurs, they are pushed at a TTL/4 rate.
func (c *Cluster) pushInformerMetrics() {
timer := time.NewTimer(0) // fire immediately first
// The following control how often to make and log
// a retry
// retries counts how many retries we have made
retries := 0
retryWarnMod := 60
// retryWarnMod controls how often do we log
// "error broadcasting metric".
// It will do it in the first error, and then on every
// 10th.
retryWarnMod := 10
for {
select {
case <-c.ctx.Done():

View File

@ -5,6 +5,9 @@ package basic
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
@ -149,40 +152,51 @@ func (mon *Monitor) PublishMetric(m api.Metric) error {
logger.Error("PublishPeers could not list peers:", err)
}
ctxs, cancels := rpcutil.CtxsWithTimeout(mon.ctx, len(peers), m.GetTTL())
ctxs, cancels := rpcutil.CtxsWithTimeout(mon.ctx, len(peers), m.GetTTL()/2)
defer rpcutil.Multicancel(cancels)
// If a peer is down, the rpc call will get locked. Therefore,
// we need to do it async. This way we keep broadcasting
// even if someone is down. Eventually those requests will
// timeout in libp2p and the errors logged.
go func() {
logger.Debugf(
"broadcasting metric %s to %s. Expires: %d",
m.Name,
peers,
m.Expire,
)
errs := mon.rpcClient.MultiCall(
ctxs,
peers,
"Cluster",
"PeerMonitorLogMetric",
m,
rpcutil.RPCDiscardReplies(len(peers)),
)
for i, e := range errs {
if e != nil {
logger.Errorf("error pushing metric to %s: %s", peers[i].Pretty(), e)
}
logger.Debugf(
"broadcasting metric %s to %s. Expires: %d",
m.Name,
peers,
m.Expire,
)
// This may hang if one of the calls does, but we will return when the
// context expires.
errs := mon.rpcClient.MultiCall(
ctxs,
peers,
"Cluster",
"PeerMonitorLogMetric",
m,
rpcutil.RPCDiscardReplies(len(peers)),
)
var errStrs []string
for i, e := range errs {
if e != nil {
errStr := fmt.Sprintf(
"error pushing metric to %s: %s",
peers[i].Pretty(),
e,
)
logger.Errorf(errStr)
errStrs = append(errStrs, errStr)
}
logger.Debugf(
"broadcasted metric %s to [%s]. Expires: %d",
m.Name,
peers,
m.Expire,
)
}()
}
if len(errStrs) > 0 {
return errors.New(strings.Join(errStrs, "\n"))
}
logger.Debugf(
"broadcasted metric %s to [%s]. Expires: %d",
m.Name,
peers,
m.Expire,
)
return nil
}