fix passing ctx from daemon to pubsub

License: MIT
Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
Adrian Lanzafame 2019-04-29 17:58:28 +10:00
parent 661de45908
commit 42693eb06d
GPG Key ID: 87E40C5D62EAE192
6 changed files with 19 additions and 10 deletions

View File

@@ -84,6 +84,7 @@ type Cluster struct {
 // this call returns (consensus may still be bootstrapping). Use Cluster.Ready()
 // if you need to wait until the peer is fully up.
 func NewCluster(
+	ctx context.Context,
 	host host.Host,
 	dht *dht.IpfsDHT,
 	cfg *Config,
@@ -106,7 +107,7 @@ func NewCluster(
 		return nil, errors.New("cluster host is nil")
 	}
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx, cancel := context.WithCancel(ctx)
 	listenAddrs := ""
 	for _, addr := range host.Addrs() {
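
The two hunks above are the heart of the change: Cluster now derives its internal, cancellable context from the ctx handed in by the daemon instead of a fresh context.Background(), so cancelling the daemon's context reaches everything the cluster builds on top of it. Below is a minimal, self-contained sketch of that pattern; component and newComponent are hypothetical names for illustration only, not ipfs-cluster API.

package main

import (
	"context"
	"fmt"
	"time"
)

// component mirrors the pattern Cluster adopts here: it derives its own
// cancellable context from the caller's ctx rather than context.Background().
type component struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func newComponent(ctx context.Context) *component {
	ctx, cancel := context.WithCancel(ctx)
	return &component{ctx: ctx, cancel: cancel}
}

func main() {
	daemonCtx, daemonCancel := context.WithCancel(context.Background())
	c := newComponent(daemonCtx)

	daemonCancel() // simulate the daemon shutting down

	select {
	case <-c.ctx.Done():
		fmt.Println("component context cancelled with the daemon")
	case <-time.After(time.Second):
		fmt.Println("component context still alive (the old, pre-fix behaviour)")
	}
}

With the old context.Background() root, the second branch would fire; with the caller's ctx as the root, the first one does.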

View File

@@ -175,6 +175,7 @@ func createCluster(
 	}
 	return ipfscluster.NewCluster(
+		ctx,
 		host,
 		dht,
 		cfgs.clusterCfg,

View File

@@ -8,6 +8,7 @@ import (
 	"github.com/ipfs/ipfs-cluster/api"
 	"github.com/ipfs/ipfs-cluster/observations"
 	"go.opencensus.io/stats"
+	"go.opencensus.io/tag"
 	peer "github.com/libp2p/go-libp2p-peer"
 )
@@ -92,7 +93,11 @@ func (mc *Checker) alert(pid peer.ID, metricName string) error {
 	}
 	select {
 	case mc.alertCh <- alrt:
-		stats.Record(mc.ctx, observations.Alerts.M(1))
+		stats.RecordWithTags(
+			mc.ctx,
+			[]tag.Mutator{tag.Upsert(observations.RemotePeerKey, pid.Pretty())},
+			observations.Alerts.M(1),
+		)
 	default:
 		return ErrAlertChannelFull
 	}
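
The hunk above replaces a bare stats.Record with stats.RecordWithTags, attaching the offending peer ID to the alert count as an OpenCensus tag; combined with the AlertsView changes further down (RemotePeerKey added to TagKeys), this splits the metric into one series per remote peer. A hedged, self-contained sketch of the mechanism; the measure name, key names and peer IDs are stand-ins, not the real observations package.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

func main() {
	// Stand-ins for observations.Alerts, HostKey and RemotePeerKey.
	alerts := stats.Int64("example/alerts", "number of alerts triggered", stats.UnitDimensionless)
	hostKey, _ := tag.NewKey("host")              // errors ignored for brevity;
	remotePeerKey, _ := tag.NewKey("remote_peer") // these names are valid keys

	// The view name defaults to the measure name. Every distinct
	// (host, remote_peer) tag combination becomes its own aggregated row.
	if err := view.Register(&view.View{
		Measure:     alerts,
		TagKeys:     []tag.Key{hostKey, remotePeerKey},
		Aggregation: view.Distribution(1, 2, 4, 8, 16),
	}); err != nil {
		log.Fatal(err)
	}

	for _, p := range []string{"QmPeerA", "QmPeerB", "QmPeerA"} {
		// tag.Upsert sets remote_peer in the tag map used for this recording only.
		_ = stats.RecordWithTags(
			context.Background(),
			[]tag.Mutator{tag.Upsert(hostKey, "host1"), tag.Upsert(remotePeerKey, p)},
			alerts.M(1),
		)
	}

	time.Sleep(100 * time.Millisecond) // give the stats worker time to aggregate
	rows, err := view.RetrieveData("example/alerts")
	if err != nil {
		log.Fatal(err)
	}
	for _, row := range rows {
		fmt.Println(row.Tags, row.Data) // one row per distinct remote_peer value
	}
}

Note that stats.RecordWithTags returns an error (from applying the tag mutators); the hunk above discards it, matching the fire-and-forget style of the original stats.Record call.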

View File

@@ -86,7 +86,7 @@ func testPeerMonitor(t *testing.T) (*Monitor, host.Host, func()) {
 	cfg := &Config{}
 	cfg.Default()
 	cfg.CheckInterval = 2 * time.Second
-	mon, err := New(cfg, psub, peers)
+	mon, err := New(ctx, cfg, psub, peers)
 	if err != nil {
 		t.Fatal(err)
 	}

View File

@@ -13,8 +13,8 @@ var logger = logging.Logger("observations")
 var (
 	// taken from ocgrpc (https://github.com/census-instrumentation/opencensus-go/blob/master/plugin/ocgrpc/stats_common.go)
 	latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
-	bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576)
+	messageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
 )

 // attributes
@@ -25,6 +25,7 @@ var (
 // keys
 var (
 	HostKey = makeKey("host")
+	RemotePeerKey = makeKey("remote_peer")
 )

 // metrics
@@ -43,6 +44,7 @@ var (
 var (
 	PinsView = &view.View{
 		Measure:     Pins,
+		TagKeys:     []tag.Key{HostKey},
 		Aggregation: view.LastValue(),
 	}
@@ -60,8 +62,8 @@ var (
 	AlertsView = &view.View{
 		Measure:     Alerts,
-		TagKeys:     []tag.Key{HostKey},
-		Aggregation: view.Sum(),
+		TagKeys:     []tag.Key{HostKey, RemotePeerKey},
+		Aggregation: messageCountDistribution,
 	}

 	DefaultViews = []*view.View{

View File

@@ -69,7 +69,7 @@ func setupMetrics(cfg *MetricsConfig) error {
 	procCollector := prom.NewProcessCollector(prom.ProcessCollectorOpts{})
 	registry.MustRegister(goCollector, procCollector)
 	pe, err := prometheus.NewExporter(prometheus.Options{
-		Namespace: "cluster",
+		Namespace: "ipfscluster",
 		Registry:  registry,
 	})
 	if err != nil {
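
Because the exporter's Namespace prefixes every OpenCensus view it exports, this rename moves the Prometheus metric names from cluster_* to ipfscluster_*, so existing scrape queries and dashboards need updating. A hedged, self-contained sketch of the exporter wiring this hunk touches; the import paths, port and /metrics mount point are assumptions for illustration, not ipfs-cluster's actual setup code.

package main

import (
	"log"
	"net/http"

	ocprom "contrib.go.opencensus.io/exporter/prometheus"
	prom "github.com/prometheus/client_golang/prometheus"
	"go.opencensus.io/stats/view"
)

func main() {
	// A dedicated registry with the standard Go and process collectors,
	// as in the hunk above.
	registry := prom.NewRegistry()
	registry.MustRegister(prom.NewGoCollector(), prom.NewProcessCollector(prom.ProcessCollectorOpts{}))

	// With Namespace set to "ipfscluster", every registered OpenCensus view
	// is exported as ipfscluster_<view_name> instead of cluster_<view_name>.
	pe, err := ocprom.NewExporter(ocprom.Options{
		Namespace: "ipfscluster",
		Registry:  registry,
	})
	if err != nil {
		log.Fatal(err)
	}
	view.RegisterExporter(pe)

	// The exporter doubles as an http.Handler serving the scrape endpoint.
	http.Handle("/metrics", pe)
	log.Fatal(http.ListenAndServe(":8888", nil))
}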