health/alerts endpoint: brush up old PR

Hector Sanjuan 2021-01-13 22:09:21 +01:00
parent 4bcb91ee2b
commit 90208b45f9
14 changed files with 91 additions and 66 deletions

View File

@ -96,8 +96,9 @@ type Client interface {
// Otherwise, it happens everywhere.
RecoverAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error)
// Alerts returns things that are wrong with cluster.
Alerts(ctx context.Context) (map[string]api.Alert, error)
// Alerts returns information about health events in the cluster (expired
// metrics etc.).
Alerts(ctx context.Context) ([]*api.Alert, error)
// Version returns the ipfs-cluster peer's version.
Version(context.Context) (*api.Version, error)
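For reference, a minimal sketch of calling the updated method through the Go client. The default-client setup below is an assumption for illustration and not part of this change:

package main

import (
	"context"
	"fmt"

	"github.com/ipfs/ipfs-cluster/api/rest/client"
)

func main() {
	ctx := context.Background()

	// Assumed setup: a default client pointed at a local REST API.
	cl, err := client.NewDefaultClient(&client.Config{})
	if err != nil {
		panic(err)
	}

	// With the new signature the result is a slice of alerts rather
	// than a map keyed by peer ID.
	alerts, err := cl.Alerts(ctx)
	if err != nil {
		panic(err)
	}
	for _, a := range alerts {
		// Name and Peer are promoted from the api.Metric embedded in
		// each alert.
		fmt.Printf("%s: metric %q expired\n", a.Peer.Pretty(), a.Name)
	}
}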
@ -169,7 +170,7 @@ type Config struct {
}
// AsTemplateFor creates client configs from resolved multiaddresses
func (c *Config) AsTemplateFor(addrs []ma.Multiaddr) ([]*Config) {
func (c *Config) AsTemplateFor(addrs []ma.Multiaddr) []*Config {
var cfgs []*Config
for _, addr := range addrs {
cfg := *c
@ -391,7 +392,7 @@ func resolveAddr(ctx context.Context, addr ma.Multiaddr) ([]ma.Multiaddr, error)
if err != nil {
return nil, err
}
if len(resolved) == 0 {
return nil, fmt.Errorf("resolving %s returned 0 results", addr)
}

View File

@ -301,8 +301,8 @@ func (lc *loadBalancingClient) RecoverAll(ctx context.Context, local bool) ([]*a
}
// Alerts returns things that are wrong with cluster.
func (lc *loadBalancingClient) Alerts(ctx context.Context) (map[string]api.Alert, error) {
var alerts map[string]api.Alert
func (lc *loadBalancingClient) Alerts(ctx context.Context) ([]*api.Alert, error) {
var alerts []*api.Alert
call := func(c Client) error {
var err error
alerts, err = c.Alerts(ctx)

View File

@ -274,13 +274,14 @@ func (c *defaultClient) RecoverAll(ctx context.Context, local bool) ([]*api.Glob
return gpis, err
}
// Alerts returns things that are wrong with cluster.
func (c *defaultClient) Alerts(ctx context.Context) (map[string]api.Alert, error) {
// Alerts returns information about health events in the cluster (expired metrics
// etc.).
func (c *defaultClient) Alerts(ctx context.Context) ([]*api.Alert, error) {
ctx, span := trace.StartSpan(ctx, "client/Alert")
defer span.End()
var alerts map[string]api.Alert
err := c.do(ctx, "GET", "/alerts", nil, nil, &alerts)
var alerts []*api.Alert
err := c.do(ctx, "GET", "/health/alerts", nil, nil, &alerts)
return alerts, err
}

View File

@ -427,9 +427,8 @@ func TestAlerts(t *testing.T) {
if len(alerts) != 1 {
t.Fatal("expected 1 alert")
}
pID2 := peer.IDB58Encode(test.PeerID2)
_, ok := alerts[pID2]
if !ok {
pID2 := peer.Encode(test.PeerID2)
if alerts[0].Peer != test.PeerID2 {
t.Errorf("expected an alert from %s", pID2)
}
}

View File

@ -466,6 +466,12 @@ func (api *API) routes() []route {
"/health/graph",
api.graphHandler,
},
{
"Alerts",
"GET",
"/health/alerts",
api.alertsHandler,
},
{
"Metrics",
"GET",
@ -478,12 +484,6 @@ func (api *API) routes() []route {
"/monitor/metrics",
api.metricNamesHandler,
},
{
"Alerts",
"GET",
"/alerts",
api.alertsHandler,
},
}
}
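Since the route moved, a minimal sketch of querying the new path over plain HTTP; the listen address below is the usual default and an assumption here (basic auth or SSL may apply depending on configuration):

package main

import (
	"encoding/json"
	"fmt"
	"net/http"

	"github.com/ipfs/ipfs-cluster/api"
)

func main() {
	// The route now lives under /health/alerts instead of /alerts.
	resp, err := http.Get("http://127.0.0.1:9094/health/alerts")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var alerts []api.Alert
	if err := json.NewDecoder(resp.Body).Decode(&alerts); err != nil {
		panic(err)
	}
	for _, a := range alerts {
		fmt.Println(a.Peer.Pretty(), a.Name)
	}
}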
@ -666,7 +666,7 @@ func (api *API) metricNamesHandler(w http.ResponseWriter, r *http.Request) {
}
func (api *API) alertsHandler(w http.ResponseWriter, r *http.Request) {
var alerts map[string]types.Alert
var alerts []types.Alert
err := api.rpcClient.CallContext(
r.Context(),
"",

View File

@ -855,8 +855,8 @@ func TestAPIAlertsEndpoint(t *testing.T) {
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url urlF) {
var resp map[string]api.Alert
makeGet(t, rest, url(rest)+"/alerts", &resp)
var resp []api.Alert
makeGet(t, rest, url(rest)+"/health/alerts", &resp)
if len(resp) != 1 {
t.Error("expected one alert")
}

View File

@ -1069,12 +1069,10 @@ func (es MetricSlice) Less(i, j int) bool {
return es[i].Peer < es[j].Peer
}
// Alert carries alerting information about a peer. WIP.
// Alert carries alerting information about a peer.
type Alert struct {
Peer peer.ID `json:"peer" codec:"p"`
MetricName string `json:"metric_name" codec:"m"`
Expiry int64 `json:"expiry" codec:"e"`
Value string `json:"value" codec:"v"`
Metric
TriggeredAt time.Time `json:"triggered_at" codec:"r,omitempty"`
}
// Error can be used by APIs to return errors.
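A small sketch, with made-up values, of how the reworked struct is built and what it serializes to:

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/ipfs/ipfs-cluster/api"
)

func main() {
	// An Alert now wraps the full metric that expired, rather than
	// copying selected fields, and records when it was raised.
	a := api.Alert{
		Metric: api.Metric{
			Name:   "ping",
			Expire: time.Now().Add(-30 * time.Second).UnixNano(),
			Valid:  true,
		},
		TriggeredAt: time.Now(),
	}
	out, err := json.Marshal(a)
	if err != nil {
		panic(err)
	}
	// Prints the embedded metric fields plus "triggered_at".
	fmt.Println(string(out))
}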

View File

@ -42,6 +42,7 @@ const (
bootstrapCount = 3
reBootstrapInterval = 30 * time.Second
mdnsServiceTag = "_ipfs-cluster-discovery._udp"
maxAlerts = 1000
)
var (
@ -74,7 +75,7 @@ type Cluster struct {
informers []Informer
tracer Tracer
alerts map[string]api.Alert
alerts []api.Alert
alertsMux sync.Mutex
doneCh chan struct{}
@ -163,7 +164,7 @@ func NewCluster(
allocator: allocator,
informers: informers,
tracer: tracer,
alerts: make(map[string]api.Alert),
alerts: []api.Alert{},
peerManager: peerManager,
shutdownB: false,
removed: false,
@ -388,16 +389,16 @@ func (c *Cluster) pushPingMetrics(ctx context.Context) {
}
}
// Alerts returns things that are wrong with the cluster.
func (c *Cluster) Alerts() map[string]api.Alert {
alerts := make(map[string]api.Alert)
// Alerts returns the last alerts recorded by this cluster peer with the most
// recent first.
func (c *Cluster) Alerts() []api.Alert {
alerts := make([]api.Alert, len(c.alerts))
c.alertsMux.Lock()
{
for i, alert := range c.alerts {
if time.Now().Before(time.Unix(0, alert.Expiry)) {
alerts[i] = alert
}
total := len(alerts)
for i, a := range c.alerts {
alerts[total-1-i] = a
}
}
c.alertsMux.Unlock()
@ -418,17 +419,18 @@ func (c *Cluster) alertsHandler() {
continue
}
logger.Warnf("metric alert for %s: Peer: %s.", alrt.MetricName, alrt.Peer)
logger.Warnf("metric alert for %s: Peer: %s.", alrt.Name, alrt.Peer)
c.alertsMux.Lock()
for pID, alert := range c.alerts {
if time.Now().After(time.Unix(0, alert.Expiry)) {
delete(c.alerts, pID)
{
if len(c.alerts) > maxAlerts {
c.alerts = c.alerts[:0]
}
c.alerts = append(c.alerts, *alrt)
}
c.alerts[peer.IDB58Encode(alrt.Peer)] = *alrt
c.alertsMux.Unlock()
if alrt.MetricName != pingMetricName {
if alrt.Name != pingMetricName {
continue // only handle ping alerts
}
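The bookkeeping above amounts to a bounded log that is reversed on read. A standalone sketch of the same pattern, independent of the Cluster type (names are illustrative):

package main

import (
	"fmt"
	"sync"
)

const maxAlerts = 1000

// alertLog mirrors the bookkeeping above: append under a mutex, reset
// the slice once it grows past maxAlerts, and copy it newest-first on
// read.
type alertLog struct {
	mu    sync.Mutex
	items []string
}

func (l *alertLog) add(a string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if len(l.items) > maxAlerts {
		// As above, the whole history is dropped once the cap is
		// exceeded rather than trimming the oldest entry.
		l.items = l.items[:0]
	}
	l.items = append(l.items, a)
}

func (l *alertLog) latest() []string {
	l.mu.Lock()
	defer l.mu.Unlock()
	out := make([]string, len(l.items))
	for i, a := range l.items {
		out[len(out)-1-i] = a // most recent first
	}
	return out
}

func main() {
	var l alertLog
	l.add("first")
	l.add("second")
	fmt.Println(l.latest()) // [second first]
}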

View File

@ -66,6 +66,8 @@ func textFormatObject(resp interface{}) {
textFormatPrintError(r)
case *api.Metric:
textFormatPrintMetric(r)
case *api.Alert:
textFormatPrintAlert(r)
case []*api.ID:
for _, item := range r {
textFormatObject(item)
@ -96,9 +98,9 @@ func textFormatObject(resp interface{}) {
for _, item := range r {
textFormatObject(item)
}
case map[string]api.Alert:
for i := range resp.(map[string]api.Alert) {
fmt.Printf("peer is down: %s\n", i)
case []*api.Alert:
for _, item := range r {
textFormatObject(item)
}
default:
checkErr("", errors.New("unsupported type returned"))
@ -244,6 +246,15 @@ func textFormatPrintMetric(obj *api.Metric) {
fmt.Printf("%s | %s | Expires in: %s\n", peer.Encode(obj.Peer), obj.Name, humanize.Time(time.Unix(0, obj.Expire)))
}
func textFormatPrintAlert(obj *api.Alert) {
fmt.Printf("%s: %s. Expired at: %s. Triggered at: %s\n",
obj.Peer,
obj.Name,
humanize.Time(time.Unix(0, obj.Expire)),
humanize.Time(obj.TriggeredAt),
)
}
func textFormatPrintGlobalRepoGC(obj *api.GlobalRepoGC) {
peers := make(sort.StringSlice, 0, len(obj.PeerMap))
for peer := range obj.PeerMap {

View File

@ -970,16 +970,24 @@ but usually are:
return nil
},
},
},
},
{
Name: "alerts",
Usage: "Show things which are wrong with this cluster",
Description: "Show things which are wrong with this cluster",
Action: func(c *cli.Context) error {
resp, cerr := globalClient.Alerts(ctx)
formatResponse(c, resp, cerr)
return nil
{
Name: "alerts",
Usage: "List the latest expired metric alerts",
Description: `
This command provides a list of "alerts" that the cluster has seen.
An alert is triggered when one of the metrics seen for a peer expires, and no
new metrics have been received.
Different alerts may be handled in different ways, e.g. ping alerts may
trigger automatic repinnings if configured.
`,
Action: func(c *cli.Context) error {
resp, cerr := globalClient.Alerts(ctx)
formatResponse(c, resp, cerr)
return nil
},
},
},
},
{

View File

@ -112,11 +112,11 @@ func (mc *Checker) alert(pid peer.ID, metricName string) error {
}
failedMetrics[metricName]++
lastMetric := mc.metrics.PeerLatest(metricName, pid)
alrt := &api.Alert{
Peer: pid,
MetricName: metricName,
Expiry: time.Now().Add(30 * time.Second).UnixNano(),
Metric: *lastMetric,
TriggeredAt: time.Now(),
}
select {
case mc.alertCh <- alrt:
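The hunk is truncated here, but the send is presumably guarded so that a full alerts channel does not block the checker. A standalone sketch of that non-blocking pattern (buffer size and messages are made up):

package main

import "fmt"

func main() {
	alertCh := make(chan string, 2) // tiny buffer, purely for illustration

	// Non-blocking send: if the buffer is full the alert is dropped
	// instead of stalling the sender.
	send := func(a string) {
		select {
		case alertCh <- a:
		default:
			fmt.Println("alert channel full, dropping:", a)
		}
	}

	send("ping expired for peer A")
	send("ping expired for peer B")
	send("ping expired for peer C") // dropped: the buffer already holds two

	close(alertCh)
	for a := range alertCh {
		fmt.Println("received:", a)
	}
}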

View File

@ -303,7 +303,7 @@ func TestPeerMonitorAlerts(t *testing.T) {
case <-timeout.C:
t.Fatal("should have thrown an alert by now")
case alrt := <-pm.Alerts():
if alrt.MetricName != "test" {
if alrt.Name != "test" {
t.Error("Alert should be for test")
}
if alrt.Peer != test.PeerID1 {

View File

@ -427,7 +427,7 @@ func (rpcapi *ClusterRPCAPI) SendInformersMetrics(ctx context.Context, in struct
}
// Alerts runs Cluster.Alerts().
func (rpcapi *ClusterRPCAPI) Alerts(ctx context.Context, in struct{}, out *map[string]api.Alert) error {
func (rpcapi *ClusterRPCAPI) Alerts(ctx context.Context, in struct{}, out *[]api.Alert) error {
alerts := rpcapi.c.Alerts()
*out = alerts
return nil

View File

@ -341,12 +341,17 @@ func (mock *mockCluster) SendInformerMetric(ctx context.Context, in struct{}, ou
return nil
}
func (mock *mockCluster) Alerts(ctx context.Context, in struct{}, out *map[string]api.Alert) error {
*out = map[string]api.Alert{
peer.IDB58Encode(PeerID2): api.Alert{
Peer: PeerID2,
MetricName: "ping",
Expiry: time.Now().Add(30 * time.Second).UnixNano(),
func (mock *mockCluster) Alerts(ctx context.Context, in struct{}, out *[]api.Alert) error {
*out = []api.Alert{
api.Alert{
Metric: api.Metric{
Name: "ping",
Peer: PeerID2,
Expire: time.Now().Add(-30 * time.Second).UnixNano(),
Valid: true,
ReceivedAt: time.Now().Add(-60 * time.Second).UnixNano(),
},
TriggeredAt: time.Now(),
},
}
return nil