Informers: GetMetric() -> GetMetrics()

Support returning multiple metrics per informer.
This commit is contained in:
Hector Sanjuan 2021-09-13 14:12:36 +02:00
parent b6a46cd8a4
commit ea5e18078c
11 changed files with 92 additions and 55 deletions

View File

@ -46,7 +46,7 @@ func TestSortNumeric(t *testing.T) {
}
metrics[4].Expire = 0 // manually expire
sorted := SortNumeric(metrics, false)
sorted := SortNumeric(metrics)
if len(sorted) != 3 {
t.Fatal("sorter did not remove invalid metrics:")
}
@ -56,7 +56,7 @@ func TestSortNumeric(t *testing.T) {
t.Error("not sorted properly")
}
sortedRev := SortNumeric(metrics, true)
sortedRev := SortNumericReverse(metrics)
if len(sortedRev) != 3 {
t.Fatal("sorted did not remove invalid metrics")
}

View File

@ -16,6 +16,7 @@ import (
"github.com/ipfs/ipfs-cluster/rpcutil"
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/version"
"go.uber.org/multierr"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
@ -282,28 +283,48 @@ func (c *Cluster) watchPinset() {
}
}
func (c *Cluster) sendInformerMetric(ctx context.Context, informer Informer) (*api.Metric, error) {
func (c *Cluster) sendInformerMetrics(ctx context.Context, informer Informer) (time.Duration, error) {
ctx, span := trace.StartSpan(ctx, "cluster/sendInformerMetric")
defer span.End()
metric := informer.GetMetric(ctx)
metric.Peer = c.id
return metric, c.monitor.PublishMetric(ctx, metric)
var minTTL time.Duration
var errors error
metrics := informer.GetMetrics(ctx)
if len(metrics) == 0 {
logger.Errorf("informer %s produced no metrics", informer.Name())
return minTTL, nil
}
func (c *Cluster) sendInformersMetrics(ctx context.Context) ([]*api.Metric, error) {
for _, metric := range metrics {
metric.Peer = c.id
ttl := metric.GetTTL()
if minTTL == 0 {
minTTL = metric.GetTTL()
}
if ttl < minTTL && ttl > 0 {
minTTL = ttl
}
err := c.monitor.PublishMetric(ctx, metric)
if multierr.AppendInto(&errors, err) {
logger.Warnf("error sending metric %s: %s", metric.Name, err)
}
}
return minTTL, nil
}
func (c *Cluster) sendInformersMetrics(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "cluster/sendInformersMetrics")
defer span.End()
var metrics []*api.Metric
var errors error
for _, informer := range c.informers {
m, err := c.sendInformerMetric(ctx, informer)
if err != nil {
return nil, err
_, err := c.sendInformerMetrics(ctx, informer)
if multierr.AppendInto(&errors, err) {
logger.Warnf("informer %s did not send all metrics", informer.Name())
}
metrics = append(metrics, m)
}
return metrics, nil
return errors
}
// pushInformerMetrics loops and publishes informers metrics using the
@ -331,20 +352,24 @@ func (c *Cluster) pushInformerMetrics(ctx context.Context, informer Informer) {
// wait
}
metric, err := c.sendInformerMetric(ctx, informer)
minTTL, err := c.sendInformerMetrics(ctx, informer)
if minTTL == 0 {
logger.Errorf("informer %s reported metric ttl 0. This must be a bug. Aborting this informer", informer.Name())
return
}
if err != nil {
if (retries % retryWarnMod) == 0 {
logger.Errorf("error broadcasting metric: %s", err)
retries++
}
// retry sooner
timer.Reset(metric.GetTTL() / 4)
timer.Reset(minTTL / 4)
continue
}
retries = 0
// send metric again in TTL/2
timer.Reset(metric.GetTTL() / 2)
timer.Reset(minTTL / 2)
}
}
@ -964,7 +989,7 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
}
// Broadcast our metrics to the world
_, err = c.sendInformersMetrics(ctx)
err = c.sendInformersMetrics(ctx)
if err != nil {
logger.Warn(err)
}

View File

@ -92,9 +92,9 @@ func (disk *Informer) Shutdown(ctx context.Context) error {
return nil
}
// GetMetric returns the metric obtained by this
// Informer.
func (disk *Informer) GetMetric(ctx context.Context) *api.Metric {
// GetMetrics returns the metric obtained by this Informer. It must always
// return at least one metric.
func (disk *Informer) GetMetrics(ctx context.Context) []*api.Metric {
ctx, span := trace.StartSpan(ctx, "informer/disk/GetMetric")
defer span.End()
@ -103,10 +103,10 @@ func (disk *Informer) GetMetric(ctx context.Context) *api.Metric {
disk.mu.Unlock()
if rpcClient == nil {
return &api.Metric{
return []*api.Metric{&api.Metric{
Name: disk.Name(),
Valid: false,
}
}}
}
var repoStat api.IPFSRepoStat
@ -147,5 +147,5 @@ func (disk *Informer) GetMetric(ctx context.Context) *api.Metric {
}
m.SetTTL(disk.config.MetricTTL)
return m
return []*api.Metric{m}
}

View File

@ -28,6 +28,16 @@ func (mock *badRPCService) RepoStat(ctx context.Context, in struct{}, out *api.I
return errors.New("fake error")
}
// Returns the first metric
func getMetrics(t *testing.T, inf *Informer) *api.Metric {
t.Helper()
metrics := inf.GetMetrics(context.Background())
if len(metrics) != 1 {
t.Fatal("expected 1 metric")
}
return metrics[0]
}
func Test(t *testing.T) {
ctx := context.Background()
cfg := &Config{}
@ -37,12 +47,12 @@ func Test(t *testing.T) {
t.Fatal(err)
}
defer inf.Shutdown(ctx)
m := inf.GetMetric(ctx)
m := getMetrics(t, inf)
if m.Valid {
t.Error("metric should be invalid")
}
inf.SetClient(test.NewMockRPCClient(t))
m = inf.GetMetric(ctx)
m = getMetrics(t, inf)
if !m.Valid {
t.Error("metric should be valid")
}
@ -59,12 +69,12 @@ func TestFreeSpace(t *testing.T) {
t.Fatal(err)
}
defer inf.Shutdown(ctx)
m := inf.GetMetric(ctx)
m := getMetrics(t, inf)
if m.Valid {
t.Error("metric should be invalid")
}
inf.SetClient(test.NewMockRPCClient(t))
m = inf.GetMetric(ctx)
m = getMetrics(t, inf)
if !m.Valid {
t.Error("metric should be valid")
}
@ -85,12 +95,12 @@ func TestRepoSize(t *testing.T) {
t.Fatal(err)
}
defer inf.Shutdown(ctx)
m := inf.GetMetric(ctx)
m := getMetrics(t, inf)
if m.Valid {
t.Error("metric should be invalid")
}
inf.SetClient(test.NewMockRPCClient(t))
m = inf.GetMetric(ctx)
m = getMetrics(t, inf)
if !m.Valid {
t.Error("metric should be valid")
}
@ -110,7 +120,7 @@ func TestWithErrors(t *testing.T) {
}
defer inf.Shutdown(ctx)
inf.SetClient(badRPCClient(t))
m := inf.GetMetric(ctx)
m := getMetrics(t, inf)
if m.Valid {
t.Errorf("metric should be invalid")
}

View File

@ -62,17 +62,17 @@ func (npi *Informer) Name() string {
return MetricName
}
// GetMetric contacts the IPFSConnector component and
// requests the `pin ls` command. We return the number
// of pins in IPFS.
func (npi *Informer) GetMetric(ctx context.Context) *api.Metric {
// GetMetrics contacts the IPFSConnector component and requests the `pin ls`
// command. We return the number of pins in IPFS. It must always return at
// least one metric.
func (npi *Informer) GetMetrics(ctx context.Context) []*api.Metric {
ctx, span := trace.StartSpan(ctx, "informer/numpin/GetMetric")
defer span.End()
if npi.rpcClient == nil {
return &api.Metric{
return []*api.Metric{&api.Metric{
Valid: false,
}
}}
}
pinMap := make(map[string]api.IPFSPinStatus)
@ -97,5 +97,5 @@ func (npi *Informer) GetMetric(ctx context.Context) *api.Metric {
}
m.SetTTL(npi.config.MetricTTL)
return m
return []*api.Metric{m}
}

View File

@ -37,12 +37,22 @@ func Test(t *testing.T) {
if err != nil {
t.Fatal(err)
}
m := inf.GetMetric(ctx)
metrics := inf.GetMetrics(ctx)
if len(metrics) != 1 {
t.Fatal("expected 1 metric")
}
m := metrics[0]
if m.Valid {
t.Error("metric should be invalid")
}
inf.SetClient(mockRPCClient(t))
m = inf.GetMetric(ctx)
metrics = inf.GetMetrics(ctx)
if len(metrics) != 1 {
t.Fatal("expected 1 metric")
}
m = metrics[0]
if !m.Valid {
t.Error("metric should be valid")
}

View File

@ -137,7 +137,7 @@ type PinTracker interface {
type Informer interface {
Component
Name() string
GetMetric(context.Context) *api.Metric
GetMetrics(context.Context) []*api.Metric
}
// PinAllocator decides where to pin certain content. In order to make such

View File

@ -990,14 +990,13 @@ func (ipfs *Connector) updateInformerMetric(ctx context.Context) error {
return nil
}
var metrics []*api.Metric
err := ipfs.rpcClient.GoContext(
ctx,
"",
"Cluster",
"SendInformersMetrics",
struct{}{},
&metrics,
&struct{}{},
nil,
)
if err != nil {

View File

@ -406,24 +406,17 @@ func (rpcapi *ClusterRPCAPI) RepoGCLocal(ctx context.Context, in struct{}, out *
}
// SendInformerMetric runs Cluster.sendInformerMetric().
func (rpcapi *ClusterRPCAPI) SendInformerMetric(ctx context.Context, in struct{}, out *api.Metric) error {
m, err := rpcapi.c.sendInformerMetric(ctx, rpcapi.c.informers[0])
func (rpcapi *ClusterRPCAPI) SendInformerMetrics(ctx context.Context, in struct{}, out *struct{}) error {
_, err := rpcapi.c.sendInformerMetrics(ctx, rpcapi.c.informers[0])
if err != nil {
return err
}
*out = *m
return nil
}
// SendInformersMetrics runs Cluster.sendInformerMetric() on all informers.
func (rpcapi *ClusterRPCAPI) SendInformersMetrics(ctx context.Context, in struct{}, out *[]*api.Metric) error {
var metrics []*api.Metric
metrics, err := rpcapi.c.sendInformersMetrics(ctx)
if err != nil {
return err
}
*out = metrics
return nil
func (rpcapi *ClusterRPCAPI) SendInformersMetrics(ctx context.Context, in struct{}, out *struct{}) error {
return rpcapi.c.sendInformersMetrics(ctx)
}
// Alerts runs Cluster.Alerts().

View File

@ -24,7 +24,7 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{
"Cluster.RecoverLocal": RPCTrusted,
"Cluster.RepoGC": RPCClosed,
"Cluster.RepoGCLocal": RPCTrusted,
"Cluster.SendInformerMetric": RPCClosed,
"Cluster.SendInformerMetrics": RPCClosed,
"Cluster.SendInformersMetrics": RPCClosed,
"Cluster.Alerts": RPCClosed,
"Cluster.Status": RPCClosed,

View File

@ -355,7 +355,7 @@ func (mock *mockCluster) RepoGCLocal(ctx context.Context, in struct{}, out *api.
return nil
}
func (mock *mockCluster) SendInformerMetric(ctx context.Context, in struct{}, out *api.Metric) error {
func (mock *mockCluster) SendInformerMetrics(ctx context.Context, in struct{}, out *struct{}) error {
return nil
}