Merge pull request #978 from ipfs/feat/alerts

ipfs-cluster-ctl alerts
Hector Sanjuan 2021-01-14 00:31:16 +01:00 committed by GitHub
commit f9d67f52e5
17 changed files with 229 additions and 18 deletions

View File

@ -96,6 +96,10 @@ type Client interface {
// Otherwise, it happens everywhere.
RecoverAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error)
// Alerts returns information about health events in the cluster (expired
// metrics etc.).
Alerts(ctx context.Context) ([]*api.Alert, error)
// Version returns the ipfs-cluster peer's version.
Version(context.Context) (*api.Version, error)
@ -166,7 +170,7 @@ type Config struct {
}
// AsTemplateFor creates client configs from resolved multiaddresses
func (c *Config) AsTemplateFor(addrs []ma.Multiaddr) ([]*Config) {
func (c *Config) AsTemplateFor(addrs []ma.Multiaddr) []*Config {
var cfgs []*Config
for _, addr := range addrs {
cfg := *c
@ -388,7 +392,7 @@ func resolveAddr(ctx context.Context, addr ma.Multiaddr) ([]ma.Multiaddr, error)
if err != nil {
return nil, err
}
if len(resolved) == 0 {
return nil, fmt.Errorf("resolving %s returned 0 results", addr)
}
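As a quick illustration of the Alerts method added to the Client interface above, here is a minimal sketch of a caller listing alerts through the REST client. It assumes the package's NewDefaultClient constructor and a default API multiaddress; those values are for the example only, not part of this change.

// Sketch: list alerts via the Client interface added above.
package main

import (
	"context"
	"fmt"

	client "github.com/ipfs/ipfs-cluster/api/rest/client"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	// Assumed default REST API address; adjust for a real deployment.
	addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/9094")
	if err != nil {
		panic(err)
	}
	c, err := client.NewDefaultClient(&client.Config{APIAddr: addr})
	if err != nil {
		panic(err)
	}
	alerts, err := c.Alerts(context.Background())
	if err != nil {
		panic(err)
	}
	for _, a := range alerts {
		fmt.Printf("%s: %s\n", a.Peer, a.Name)
	}
}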

View File

@ -300,6 +300,19 @@ func (lc *loadBalancingClient) RecoverAll(ctx context.Context, local bool) ([]*a
return pinInfos, err
}
// Alerts returns things that are wrong with the cluster.
func (lc *loadBalancingClient) Alerts(ctx context.Context) ([]*api.Alert, error) {
var alerts []*api.Alert
call := func(c Client) error {
var err error
alerts, err = c.Alerts(ctx)
return err
}
err := lc.retry(0, call)
return alerts, err
}
// Version returns the ipfs-cluster peer's version.
func (lc *loadBalancingClient) Version(ctx context.Context) (*api.Version, error) {
var v *api.Version

View File

@ -274,6 +274,17 @@ func (c *defaultClient) RecoverAll(ctx context.Context, local bool) ([]*api.Glob
return gpis, err
}
// Alerts returns information about health events in the cluster (expired
// metrics etc.).
func (c *defaultClient) Alerts(ctx context.Context) ([]*api.Alert, error) {
ctx, span := trace.StartSpan(ctx, "client/Alert")
defer span.End()
var alerts []*api.Alert
err := c.do(ctx, "GET", "/health/alerts", nil, nil, &alerts)
return alerts, err
}
// Version returns the ipfs-cluster peer's version.
func (c *defaultClient) Version(ctx context.Context) (*api.Version, error) {
ctx, span := trace.StartSpan(ctx, "client/Version")

View File

@ -414,6 +414,28 @@ func TestRecoverAll(t *testing.T) {
testClients(t, api, testF)
}
func TestAlerts(t *testing.T) {
ctx := context.Background()
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c Client) {
alerts, err := c.Alerts(ctx)
if err != nil {
t.Fatal(err)
}
if len(alerts) != 1 {
t.Fatal("expected 1 alert")
}
pID2 := peer.Encode(test.PeerID2)
if alerts[0].Peer != test.PeerID2 {
t.Errorf("expected an alert from %s", pID2)
}
}
testClients(t, api, testF)
}
func TestGetConnectGraph(t *testing.T) {
ctx := context.Background()
api := testAPI(t)

View File

@ -466,6 +466,12 @@ func (api *API) routes() []route {
"/health/graph",
api.graphHandler,
},
{
"Alerts",
"GET",
"/health/alerts",
api.alertsHandler,
},
{
"Metrics",
"GET",
@ -659,6 +665,19 @@ func (api *API) metricNamesHandler(w http.ResponseWriter, r *http.Request) {
api.sendResponse(w, autoStatus, err, metricNames)
}
func (api *API) alertsHandler(w http.ResponseWriter, r *http.Request) {
var alerts []types.Alert
err := api.rpcClient.CallContext(
r.Context(),
"",
"Cluster",
"Alerts",
struct{}{},
&alerts,
)
api.sendResponse(w, autoStatus, err, alerts)
}
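For completeness, a small sketch of querying the new endpoint directly over HTTP. The listen address is the usual REST API default and is only an assumption here; the body is decoded generically rather than into api.Alert to keep the example self-contained.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Assumed default REST API address and no authentication configured.
	resp, err := http.Get("http://127.0.0.1:9094/health/alerts")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var alerts []map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&alerts); err != nil {
		panic(err)
	}
	fmt.Printf("received %d alert(s)\n", len(alerts))
}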
func (api *API) addHandler(w http.ResponseWriter, r *http.Request) {
reader, err := r.MultipartReader()
if err != nil {

View File

@ -849,6 +849,22 @@ func TestAPIMetricNamesEndpoint(t *testing.T) {
testBothEndpoints(t, tf)
}
func TestAPIAlertsEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url urlF) {
var resp []api.Alert
makeGet(t, rest, url(rest)+"/health/alerts", &resp)
if len(resp) != 1 {
t.Error("expected one alert")
}
}
testBothEndpoints(t, tf)
}
func TestAPIStatusAllEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)

View File

@ -1069,10 +1069,10 @@ func (es MetricSlice) Less(i, j int) bool {
return es[i].Peer < es[j].Peer
}
// Alert carries alerting information about a peer. WIP.
// Alert carries alerting information about a peer.
type Alert struct {
Peer peer.ID
MetricName string
Metric
TriggeredAt time.Time `json:"triggered_at" codec:"r,omitempty"`
}
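A short sketch of how the reshaped struct gets populated elsewhere in this change (the monitor's checker and the test mock below do essentially this); the helper name is hypothetical and only illustrates the new shape.

import (
	"time"

	"github.com/ipfs/ipfs-cluster/api"
)

// newAlert is a hypothetical helper mirroring how alerts are built after
// this change: the last Metric seen for a peer is embedded, and TriggeredAt
// records when the alert fired.
func newAlert(last api.Metric) api.Alert {
	return api.Alert{
		Metric:      last,
		TriggeredAt: time.Now(),
	}
}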
// Error can be used by APIs to return errors.

View File

@ -42,6 +42,7 @@ const (
bootstrapCount = 3
reBootstrapInterval = 30 * time.Second
mdnsServiceTag = "_ipfs-cluster-discovery._udp"
maxAlerts = 1000
)
var (
@ -74,6 +75,9 @@ type Cluster struct {
informers []Informer
tracer Tracer
alerts []api.Alert
alertsMux sync.Mutex
doneCh chan struct{}
readyCh chan struct{}
readyB bool
@ -160,6 +164,7 @@ func NewCluster(
allocator: allocator,
informers: informers,
tracer: tracer,
alerts: []api.Alert{},
peerManager: peerManager,
shutdownB: false,
removed: false,
@ -384,6 +389,23 @@ func (c *Cluster) pushPingMetrics(ctx context.Context) {
}
}
// Alerts returns the last alerts recorded by this cluster peer with the most
// recent first.
func (c *Cluster) Alerts() []api.Alert {
alerts := make([]api.Alert, len(c.alerts))
c.alertsMux.Lock()
{
total := len(alerts)
for i, a := range c.alerts {
alerts[total-1-i] = a
}
}
c.alertsMux.Unlock()
return alerts
}
// reads the alerts channel from the monitor and triggers repins
func (c *Cluster) alertsHandler() {
for {
@ -397,8 +419,18 @@ func (c *Cluster) alertsHandler() {
continue
}
logger.Warnf("metric alert for %s: Peer: %s.", alrt.MetricName, alrt.Peer)
if alrt.MetricName != pingMetricName {
logger.Warnf("metric alert for %s: Peer: %s.", alrt.Name, alrt.Peer)
c.alertsMux.Lock()
{
if len(c.alerts) > maxAlerts {
c.alerts = c.alerts[:0]
}
c.alerts = append(c.alerts, *alrt)
}
c.alertsMux.Unlock()
if alrt.Name != pingMetricName {
continue // only handle ping alerts
}

View File

@ -66,6 +66,8 @@ func textFormatObject(resp interface{}) {
textFormatPrintError(r)
case *api.Metric:
textFormatPrintMetric(r)
case *api.Alert:
textFormatPrintAlert(r)
case []*api.ID:
for _, item := range r {
textFormatObject(item)
@ -96,6 +98,10 @@ func textFormatObject(resp interface{}) {
for _, item := range r {
textFormatObject(item)
}
case []*api.Alert:
for _, item := range r {
textFormatObject(item)
}
default:
checkErr("", errors.New("unsupported type returned"))
}
@ -240,6 +246,15 @@ func textFormatPrintMetric(obj *api.Metric) {
fmt.Printf("%s | %s | Expires in: %s\n", peer.Encode(obj.Peer), obj.Name, humanize.Time(time.Unix(0, obj.Expire)))
}
func textFormatPrintAlert(obj *api.Alert) {
fmt.Printf("%s: %s. Expired at: %s. Triggered at: %s\n",
obj.Peer,
obj.Name,
humanize.Time(time.Unix(0, obj.Expire)),
humanize.Time(obj.TriggeredAt),
)
}
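With the format string above, a printed alert comes out roughly like the following line (the peer ID and times are invented for illustration):

QmPeer...: ping. Expired at: 30 seconds ago. Triggered at: now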
func textFormatPrintGlobalRepoGC(obj *api.GlobalRepoGC) {
peers := make(sort.StringSlice, 0, len(obj.PeerMap))
for peer := range obj.PeerMap {

View File

@ -970,6 +970,24 @@ but usually are:
return nil
},
},
{
Name: "alerts",
Usage: "List the latest expired metric alerts",
Description: `
This command provides a list of "alerts" that the cluster has seen.
An alert is triggered when one of the metrics seen for a peer expires, and no
new metrics have been received.
Different alerts may be handled in different ways; for example, ping alerts may
trigger automatic repinning if configured.
`,
Action: func(c *cli.Context) error {
resp, cerr := globalClient.Alerts(ctx)
formatResponse(c, resp, cerr)
return nil
},
},
},
},
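For reference, this subcommand surfaces on the command line as "ipfs-cluster-ctl health alerts", the same invocation exercised by the sharness test further down in this diff.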
{

View File

@ -401,19 +401,22 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
}
func shutdownClusters(t *testing.T, clusters []*Cluster, m []*test.IpfsMock) {
ctx := context.Background()
for i, c := range clusters {
err := c.Shutdown(ctx)
if err != nil {
t.Error(err)
}
c.dht.Close()
c.host.Close()
m[i].Close()
shutdownCluster(t, c, m[i])
}
os.RemoveAll(testsFolder)
}
func shutdownCluster(t *testing.T, c *Cluster, m *test.IpfsMock) {
err := c.Shutdown(context.Background())
if err != nil {
t.Error(err)
}
c.dht.Close()
c.host.Close()
m.Close()
}
func runF(t *testing.T, clusters []*Cluster, f func(*testing.T, *Cluster)) {
t.Helper()
var wg sync.WaitGroup
@ -2125,3 +2128,26 @@ func TestClusterPinsWithExpiration(t *testing.T) {
t.Error("pin should not be part of the state")
}
}
func TestClusterAlerts(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
if len(clusters) < 2 {
t.Skip("need at least 2 nodes for this test")
}
ttlDelay()
for _, c := range clusters[1:] {
c.Shutdown(ctx)
}
ttlDelay()
alerts := clusters[0].Alerts()
if len(alerts) == 0 {
t.Error("expected at least one alert")
}
}

View File

@ -99,6 +99,13 @@ func (mc *Checker) alert(pid peer.ID, metricName string) error {
mc.failedPeers[pid] = make(map[string]int)
}
failedMetrics := mc.failedPeers[pid]
lastMetric := mc.metrics.PeerLatest(metricName, pid)
if lastMetric == nil {
lastMetric = &api.Metric{
Name: metricName,
Peer: pid,
}
}
// If above threshold, remove all metrics for that peer
// and clean up failedPeers when no failed metrics are left.
@ -114,8 +121,8 @@ func (mc *Checker) alert(pid peer.ID, metricName string) error {
failedMetrics[metricName]++
alrt := &api.Alert{
Peer: pid,
MetricName: metricName,
Metric: *lastMetric,
TriggeredAt: time.Now(),
}
select {
case mc.alertCh <- alrt:

View File

@ -303,7 +303,7 @@ func TestPeerMonitorAlerts(t *testing.T) {
case <-timeout.C:
t.Fatal("should have thrown an alert by now")
case alrt := <-pm.Alerts():
if alrt.MetricName != "test" {
if alrt.Name != "test" {
t.Error("Alert should be for test")
}
if alrt.Peer != test.PeerID1 {

View File

@ -426,6 +426,13 @@ func (rpcapi *ClusterRPCAPI) SendInformersMetrics(ctx context.Context, in struct
return nil
}
// Alerts runs Cluster.Alerts().
func (rpcapi *ClusterRPCAPI) Alerts(ctx context.Context, in struct{}, out *[]api.Alert) error {
alerts := rpcapi.c.Alerts()
*out = alerts
return nil
}
/*
Tracker component methods
*/

View File

@ -26,6 +26,7 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{
"Cluster.RepoGCLocal": RPCTrusted,
"Cluster.SendInformerMetric": RPCClosed,
"Cluster.SendInformersMetrics": RPCClosed,
"Cluster.Alerts": RPCClosed,
"Cluster.Status": RPCClosed,
"Cluster.StatusAll": RPCClosed,
"Cluster.StatusAllLocal": RPCClosed,

View File

@ -25,6 +25,10 @@ test_expect_success IPFS,CLUSTER "list latest metrics logged by this peer" '
ipfs-cluster-ctl health metrics freespace | grep -q -E "(^$pid \| freespace: [0-9]+ (G|M|K)B \| Expires in: [0-9]+ seconds from now)"
'
test_expect_success IPFS,CLUSTER "alerts must succeed" '
ipfs-cluster-ctl health alerts
'
test_clean_ipfs
test_clean_cluster

View File

@ -341,6 +341,22 @@ func (mock *mockCluster) SendInformerMetric(ctx context.Context, in struct{}, ou
return nil
}
func (mock *mockCluster) Alerts(ctx context.Context, in struct{}, out *[]api.Alert) error {
*out = []api.Alert{
api.Alert{
Metric: api.Metric{
Name: "ping",
Peer: PeerID2,
Expire: time.Now().Add(-30 * time.Second).UnixNano(),
Valid: true,
ReceivedAt: time.Now().Add(-60 * time.Second).UnixNano(),
},
TriggeredAt: time.Now(),
},
}
return nil
}
/* Tracker methods */
func (mock *mockPinTracker) Track(ctx context.Context, in *api.Pin, out *struct{}) error {