5452b59a2e
* Update go-libp2p to v0.22.0 * Testing with go1.19 * build(deps): bump github.com/multiformats/go-multicodec Bumps [github.com/multiformats/go-multicodec](https://github.com/multiformats/go-multicodec) from 0.5.0 to 0.6.0. - [Release notes](https://github.com/multiformats/go-multicodec/releases) - [Commits](https://github.com/multiformats/go-multicodec/compare/v0.5.0...v0.6.0) --- updated-dependencies: - dependency-name: github.com/multiformats/go-multicodec dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ipld/go-car from 0.4.0 to 0.5.0 Bumps [github.com/ipld/go-car](https://github.com/ipld/go-car) from 0.4.0 to 0.5.0. - [Release notes](https://github.com/ipld/go-car/releases) - [Commits](https://github.com/ipld/go-car/compare/v0.4.0...v0.5.0) --- updated-dependencies: - dependency-name: github.com/ipld/go-car dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/prometheus/client_golang Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.12.2 to 1.13.0. - [Release notes](https://github.com/prometheus/client_golang/releases) - [Changelog](https://github.com/prometheus/client_golang/blob/main/CHANGELOG.md) - [Commits](https://github.com/prometheus/client_golang/compare/v1.12.2...v1.13.0) --- updated-dependencies: - dependency-name: github.com/prometheus/client_golang dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/hashicorp/go-hclog from 1.2.1 to 1.3.0 Bumps [github.com/hashicorp/go-hclog](https://github.com/hashicorp/go-hclog) from 1.2.1 to 1.3.0. - [Release notes](https://github.com/hashicorp/go-hclog/releases) - [Commits](https://github.com/hashicorp/go-hclog/compare/v1.2.1...v1.3.0) --- updated-dependencies: - dependency-name: github.com/hashicorp/go-hclog dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ipfs/go-ds-crdt from 0.3.6 to 0.3.7 Bumps [github.com/ipfs/go-ds-crdt](https://github.com/ipfs/go-ds-crdt) from 0.3.6 to 0.3.7. - [Release notes](https://github.com/ipfs/go-ds-crdt/releases) - [Commits](https://github.com/ipfs/go-ds-crdt/compare/v0.3.6...v0.3.7) --- updated-dependencies: - dependency-name: github.com/ipfs/go-ds-crdt dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/urfave/cli/v2 from 2.10.2 to 2.14.1 Bumps [github.com/urfave/cli/v2](https://github.com/urfave/cli) from 2.10.2 to 2.14.1. - [Release notes](https://github.com/urfave/cli/releases) - [Changelog](https://github.com/urfave/cli/blob/main/docs/CHANGELOG.md) - [Commits](https://github.com/urfave/cli/compare/v2.10.2...v2.14.1) --- updated-dependencies: - dependency-name: github.com/urfave/cli/v2 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/libp2p/go-libp2p-http from 0.3.0 to 0.4.0 Bumps [github.com/libp2p/go-libp2p-http](https://github.com/libp2p/go-libp2p-http) from 0.3.0 to 0.4.0. - [Release notes](https://github.com/libp2p/go-libp2p-http/releases) - [Commits](https://github.com/libp2p/go-libp2p-http/compare/v0.3.0...v0.4.0) --- updated-dependencies: - dependency-name: github.com/libp2p/go-libp2p-http dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/libp2p/go-libp2p-gorpc from 0.4.0 to 0.5.0 Bumps [github.com/libp2p/go-libp2p-gorpc](https://github.com/libp2p/go-libp2p-gorpc) from 0.4.0 to 0.5.0. - [Release notes](https://github.com/libp2p/go-libp2p-gorpc/releases) - [Commits](https://github.com/libp2p/go-libp2p-gorpc/compare/v0.4.0...v0.5.0) --- updated-dependencies: - dependency-name: github.com/libp2p/go-libp2p-gorpc dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump contrib.go.opencensus.io/exporter/prometheus Bumps [contrib.go.opencensus.io/exporter/prometheus](https://github.com/census-ecosystem/opencensus-go-exporter-prometheus) from 0.4.1 to 0.4.2. - [Release notes](https://github.com/census-ecosystem/opencensus-go-exporter-prometheus/releases) - [Commits](https://github.com/census-ecosystem/opencensus-go-exporter-prometheus/compare/v0.4.1...v0.4.2) --- updated-dependencies: - dependency-name: contrib.go.opencensus.io/exporter/prometheus dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/libp2p/go-libp2p-raft from 0.1.8 to 0.2.0 Bumps [github.com/libp2p/go-libp2p-raft](https://github.com/libp2p/go-libp2p-raft) from 0.1.8 to 0.2.0. - [Release notes](https://github.com/libp2p/go-libp2p-raft/releases) - [Commits](https://github.com/libp2p/go-libp2p-raft/compare/v0.1.8...v0.2.0) --- updated-dependencies: - dependency-name: github.com/libp2p/go-libp2p-raft dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/urfave/cli from 1.22.9 to 1.22.10 Bumps [github.com/urfave/cli](https://github.com/urfave/cli) from 1.22.9 to 1.22.10. - [Release notes](https://github.com/urfave/cli/releases) - [Changelog](https://github.com/urfave/cli/blob/main/docs/CHANGELOG.md) - [Commits](https://github.com/urfave/cli/compare/v1.22.9...v1.22.10) --- updated-dependencies: - dependency-name: github.com/urfave/cli dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> * Fix checker/linter/staticcheck warnings Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
314 lines
6.5 KiB
Go
314 lines
6.5 KiB
Go
package pubsubmon
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"strconv"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/ipfs-cluster/ipfs-cluster/api"
|
|
"github.com/ipfs-cluster/ipfs-cluster/test"
|
|
|
|
libp2p "github.com/libp2p/go-libp2p"
|
|
host "github.com/libp2p/go-libp2p/core/host"
|
|
peer "github.com/libp2p/go-libp2p/core/peer"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
)
|
|
|
|
func init() {
|
|
// GossipSub needs to heartbeat to discover newly connected hosts
|
|
// This speeds things up a little.
|
|
pubsub.GossipSubHeartbeatInterval = 50 * time.Millisecond
|
|
}
|
|
|
|
type metricFactory struct {
|
|
l sync.Mutex
|
|
counter int
|
|
}
|
|
|
|
func newMetricFactory() *metricFactory {
|
|
return &metricFactory{
|
|
counter: 0,
|
|
}
|
|
}
|
|
|
|
func (mf *metricFactory) newMetric(n string, p peer.ID) api.Metric {
|
|
mf.l.Lock()
|
|
defer mf.l.Unlock()
|
|
m := api.Metric{
|
|
Name: n,
|
|
Peer: p,
|
|
Value: fmt.Sprintf("%d", mf.counter),
|
|
Valid: true,
|
|
}
|
|
m.SetTTL(5 * time.Second)
|
|
mf.counter++
|
|
return m
|
|
}
|
|
|
|
func (mf *metricFactory) count() int {
|
|
mf.l.Lock()
|
|
defer mf.l.Unlock()
|
|
return mf.counter
|
|
}
|
|
|
|
func peers(ctx context.Context) ([]peer.ID, error) {
|
|
return []peer.ID{test.PeerID1, test.PeerID2, test.PeerID3}, nil
|
|
}
|
|
|
|
func testPeerMonitor(t *testing.T) (*Monitor, host.Host, func()) {
|
|
ctx := context.Background()
|
|
h, err := libp2p.New(
|
|
libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"),
|
|
)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
psub, err := pubsub.NewGossipSub(
|
|
ctx,
|
|
h,
|
|
pubsub.WithMessageSigning(true),
|
|
pubsub.WithStrictSignatureVerification(true),
|
|
)
|
|
if err != nil {
|
|
h.Close()
|
|
t.Fatal(err)
|
|
}
|
|
|
|
mock := test.NewMockRPCClientWithHost(t, h)
|
|
cfg := &Config{}
|
|
cfg.Default()
|
|
cfg.CheckInterval = 2 * time.Second
|
|
mon, err := New(ctx, cfg, psub, peers)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
mon.SetClient(mock)
|
|
|
|
shutdownF := func() {
|
|
mon.Shutdown(ctx)
|
|
h.Close()
|
|
}
|
|
|
|
return mon, h, shutdownF
|
|
}
|
|
|
|
func TestPeerMonitorShutdown(t *testing.T) {
|
|
ctx := context.Background()
|
|
pm, _, shutdown := testPeerMonitor(t)
|
|
defer shutdown()
|
|
|
|
err := pm.Shutdown(ctx)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
|
|
err = pm.Shutdown(ctx)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
}
|
|
|
|
func TestLogMetricConcurrent(t *testing.T) {
|
|
ctx := context.Background()
|
|
pm, _, shutdown := testPeerMonitor(t)
|
|
defer shutdown()
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(3)
|
|
|
|
// Insert 25 metrics
|
|
f := func() {
|
|
defer wg.Done()
|
|
for i := 0; i < 25; i++ {
|
|
mt := api.Metric{
|
|
Name: "test",
|
|
Peer: test.PeerID1,
|
|
Value: fmt.Sprintf("%d", time.Now().UnixNano()),
|
|
Valid: true,
|
|
}
|
|
mt.SetTTL(150 * time.Millisecond)
|
|
pm.LogMetric(ctx, mt)
|
|
time.Sleep(75 * time.Millisecond)
|
|
}
|
|
}
|
|
go f()
|
|
go f()
|
|
go f()
|
|
|
|
// Wait for at least two metrics to be inserted
|
|
time.Sleep(200 * time.Millisecond)
|
|
last := time.Now().Add(-500 * time.Millisecond)
|
|
|
|
for i := 0; i <= 20; i++ {
|
|
lastMtrcs := pm.LatestMetrics(ctx, "test")
|
|
|
|
// There should always 1 valid LatestMetric "test"
|
|
if len(lastMtrcs) != 1 {
|
|
t.Error("no valid metrics", len(lastMtrcs), i)
|
|
time.Sleep(75 * time.Millisecond)
|
|
continue
|
|
}
|
|
|
|
n, err := strconv.Atoi(lastMtrcs[0].Value)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// The timestamp of the metric cannot be older than
|
|
// the timestamp from the last
|
|
current := time.Unix(0, int64(n))
|
|
if current.Before(last) {
|
|
t.Errorf("expected newer metric: Current: %s, Last: %s", current, last)
|
|
}
|
|
last = current
|
|
time.Sleep(75 * time.Millisecond)
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestPeerMonitorLogMetric(t *testing.T) {
|
|
ctx := context.Background()
|
|
pm, _, shutdown := testPeerMonitor(t)
|
|
defer shutdown()
|
|
mf := newMetricFactory()
|
|
|
|
// dont fill window
|
|
pm.LogMetric(ctx, mf.newMetric("test", test.PeerID1))
|
|
pm.LogMetric(ctx, mf.newMetric("test", test.PeerID2))
|
|
pm.LogMetric(ctx, mf.newMetric("test", test.PeerID3))
|
|
|
|
// fill window
|
|
pm.LogMetric(ctx, mf.newMetric("test2", test.PeerID3))
|
|
pm.LogMetric(ctx, mf.newMetric("test2", test.PeerID3))
|
|
pm.LogMetric(ctx, mf.newMetric("test2", test.PeerID3))
|
|
pm.LogMetric(ctx, mf.newMetric("test2", test.PeerID3))
|
|
|
|
latestMetrics := pm.LatestMetrics(ctx, "testbad")
|
|
if len(latestMetrics) != 0 {
|
|
t.Logf("%+v", latestMetrics)
|
|
t.Error("metrics should be empty")
|
|
}
|
|
|
|
latestMetrics = pm.LatestMetrics(ctx, "test")
|
|
if len(latestMetrics) != 3 {
|
|
t.Error("metrics should correspond to 3 hosts")
|
|
}
|
|
|
|
for _, v := range latestMetrics {
|
|
switch v.Peer {
|
|
case test.PeerID1:
|
|
if v.Value != "0" {
|
|
t.Error("bad metric value")
|
|
}
|
|
case test.PeerID2:
|
|
if v.Value != "1" {
|
|
t.Error("bad metric value")
|
|
}
|
|
case test.PeerID3:
|
|
if v.Value != "2" {
|
|
t.Error("bad metric value")
|
|
}
|
|
default:
|
|
t.Error("bad peer")
|
|
}
|
|
}
|
|
|
|
latestMetrics = pm.LatestMetrics(ctx, "test2")
|
|
if len(latestMetrics) != 1 {
|
|
t.Fatal("should only be one metric")
|
|
}
|
|
if latestMetrics[0].Value != fmt.Sprintf("%d", mf.count()-1) {
|
|
t.Error("metric is not last")
|
|
}
|
|
}
|
|
|
|
func TestPeerMonitorPublishMetric(t *testing.T) {
|
|
ctx := context.Background()
|
|
pm, host, shutdown := testPeerMonitor(t)
|
|
defer shutdown()
|
|
|
|
pm2, host2, shutdown2 := testPeerMonitor(t)
|
|
defer shutdown2()
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
err := host.Connect(
|
|
context.Background(),
|
|
peer.AddrInfo{
|
|
ID: host2.ID(),
|
|
Addrs: host2.Addrs(),
|
|
},
|
|
)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
mf := newMetricFactory()
|
|
|
|
metric := mf.newMetric("test", test.PeerID1)
|
|
err = pm.PublishMetric(ctx, metric)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
checkMetric := func(t *testing.T, pm *Monitor) {
|
|
latestMetrics := pm.LatestMetrics(ctx, "test")
|
|
if len(latestMetrics) != 1 {
|
|
t.Fatal(host.ID(), "expected 1 published metric")
|
|
}
|
|
t.Log(host.ID(), "received metric")
|
|
|
|
receivedMetric := latestMetrics[0]
|
|
if receivedMetric.Peer != metric.Peer ||
|
|
receivedMetric.Expire != metric.Expire ||
|
|
receivedMetric.Value != metric.Value ||
|
|
receivedMetric.Valid != metric.Valid ||
|
|
receivedMetric.Name != metric.Name {
|
|
t.Fatal("it should be exactly the same metric we published")
|
|
}
|
|
}
|
|
|
|
t.Log("pm1")
|
|
checkMetric(t, pm)
|
|
t.Log("pm2")
|
|
checkMetric(t, pm2)
|
|
}
|
|
|
|
func TestPeerMonitorAlerts(t *testing.T) {
|
|
ctx := context.Background()
|
|
pm, _, shutdown := testPeerMonitor(t)
|
|
defer shutdown()
|
|
mf := newMetricFactory()
|
|
|
|
mtr := mf.newMetric("test", test.PeerID1)
|
|
mtr.SetTTL(0)
|
|
pm.LogMetric(ctx, mtr)
|
|
time.Sleep(time.Second)
|
|
timeout := time.NewTimer(time.Second * 5)
|
|
|
|
// it should alert once.
|
|
for i := 0; i < 1; i++ {
|
|
select {
|
|
case <-timeout.C:
|
|
t.Fatal("should have thrown an alert by now")
|
|
case alrt := <-pm.Alerts():
|
|
if alrt.Name != "test" {
|
|
t.Error("Alert should be for test")
|
|
}
|
|
if alrt.Peer != test.PeerID1 {
|
|
t.Error("Peer should be TestPeerID1")
|
|
}
|
|
}
|
|
}
|
|
}
|