diff --git a/monitor/pubsubmon/pubsubmon.go b/monitor/pubsubmon/pubsubmon.go index 65ef192f..77f83fd1 100644 --- a/monitor/pubsubmon/pubsubmon.go +++ b/monitor/pubsubmon/pubsubmon.go @@ -117,6 +117,11 @@ func (mon *Monitor) logFromPubsub() { ctx, span := trace.StartSpan(mon.ctx, "monitor/pubsub/logFromPubsub") defer span.End() + decodeWarningPrinted := false + // Previous versions use multicodec with the following header, which + // we need to remove. + multicodecPrefix := append([]byte{byte(9)}, []byte("/msgpack\n")...) + for { select { case <-ctx.Done(): @@ -133,9 +138,25 @@ func (mon *Monitor) logFromPubsub() { metric := api.Metric{} err = dec.Decode(&metric) if err != nil { - logger.Error(err) - continue + if bytes.HasPrefix(data, multicodecPrefix) { + buf := bytes.NewBuffer(data[len(multicodecPrefix):]) + dec := gocodec.NewDecoder(buf, msgpackHandle) + err = dec.Decode(&metric) + if err != nil { + logger.Error(err) + continue + } + // managed to decode an older version metric. Warn about it once. + if !decodeWarningPrinted { + logger.Warning("Peers in versions <= v0.13.3 detected. These peers will not receive metrics from this or other newer peers. Please upgrade them.") + decodeWarningPrinted = true + } + } else { + logger.Error(err) + continue + } } + logger.Debugf( "received pubsub metric '%s' from '%s'", metric.Name,