Introduction of ctl alerts

- basic version, just alerts if peers are down
This commit is contained in:
Kishan Mohanbhai Sagathiya 2019-12-13 00:21:28 +05:30
parent 2da2dc8c86
commit 04069b8c81
10 changed files with 92 additions and 2 deletions

View File

@ -106,6 +106,9 @@ type Client interface {
// Otherwise, it happens everywhere.
RecoverAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error)
// Alerts returns things that are wrong with the cluster.
Alerts(ctx context.Context) (map[string]api.Alert, error)
// Version returns the ipfs-cluster peer's version.
Version(context.Context) (*api.Version, error)

View File

@ -331,6 +331,19 @@ func (lc *loadBalancingClient) RecoverAll(ctx context.Context, local bool) ([]*a
return pinInfos, err
}
// Alerts returns things that are wrong with the cluster.
//
// The request is delegated to an underlying Client through lc.retry; the
// map handed back is whatever the last attempted Client returned, together
// with the error reported by lc.retry.
func (lc *loadBalancingClient) Alerts(ctx context.Context) (map[string]api.Alert, error) {
	var result map[string]api.Alert

	fetch := func(c Client) error {
		got, callErr := c.Alerts(ctx)
		result = got
		return callErr
	}

	// Run retry to completion before reading result: the closure above
	// writes to it as a side effect.
	err := lc.retry(0, fetch)
	return result, err
}
// Version returns the ipfs-cluster peer's version.
func (lc *loadBalancingClient) Version(ctx context.Context) (*api.Version, error) {
var v *api.Version

View File

@ -306,6 +306,16 @@ func (c *defaultClient) RecoverAll(ctx context.Context, local bool) ([]*api.Glob
return gpis, err
}
// Alerts returns things that are wrong with the cluster.
//
// It performs a GET request against the "/alerts" endpoint, passing a
// pointer to the result map as the destination for the response. The
// returned map is only meaningful when the returned error is nil.
func (c *defaultClient) Alerts(ctx context.Context) (map[string]api.Alert, error) {
	// The span is named after this method so that traces are attributed to
	// Alerts rather than to another client call.
	ctx, span := trace.StartSpan(ctx, "client/Alerts")
	defer span.End()

	var alerts map[string]api.Alert
	err := c.do(ctx, "GET", "/alerts", nil, nil, &alerts)
	return alerts, err
}
// Version returns the ipfs-cluster peer's version.
func (c *defaultClient) Version(ctx context.Context) (*api.Version, error) {
ctx, span := trace.StartSpan(ctx, "client/Version")

View File

@ -487,6 +487,12 @@ func (api *API) routes() []route {
"/monitor/metrics",
api.metricNamesHandler,
},
{
"Alerts",
"GET",
"/alerts",
api.alertsHandler,
},
}
}
@ -659,6 +665,19 @@ func (api *API) metricNamesHandler(w http.ResponseWriter, r *http.Request) {
api.sendResponse(w, autoStatus, err, metricNames)
}
// alertsHandler serves the "/alerts" route. It invokes the "Alerts" method
// of the "Cluster" RPC service using the request's context and hands the
// resulting map, along with any RPC error, to sendResponse.
func (api *API) alertsHandler(w http.ResponseWriter, r *http.Request) {
	var (
		found map[string]types.Alert
		ctx   = r.Context()
	)

	rpcErr := api.rpcClient.CallContext(ctx, "", "Cluster", "Alerts", struct{}{}, &found)
	api.sendResponse(w, autoStatus, rpcErr, found)
}
func (api *API) addHandler(w http.ResponseWriter, r *http.Request) {
reader, err := r.MultipartReader()
if err != nil {

View File

@ -943,8 +943,10 @@ func (es MetricSlice) Less(i, j int) bool {
// Alert carries alerting information about a peer. WIP.
//
// Peer identifies the peer the alert refers to and MetricName names the
// metric the alert was raised for.
// NOTE(review): Expiry and Value presumably mirror the expiry and value of
// the metric behind the alert — confirm against the monitor code that
// produces alerts.
type Alert struct {
Peer peer.ID
MetricName string
Peer peer.ID `json:"peer" codec:"p"`
MetricName string `json:"metric_name" codec:"m"`
Expiry int64 `json:"expiry" codec:"e"`
Value string `json:"value" codec:"v"`
}
// Error can be used by APIs to return errors.

View File

@ -75,6 +75,9 @@ type Cluster struct {
informers []Informer
tracer Tracer
alerts map[string]api.Alert
alertsMux sync.Mutex
doneCh chan struct{}
readyCh chan struct{}
readyB bool
@ -161,6 +164,7 @@ func NewCluster(
allocator: allocator,
informers: informers,
tracer: tracer,
alerts: make(map[string]api.Alert),
peerManager: peerManager,
shutdownB: false,
removed: false,
@ -391,6 +395,19 @@ func (c *Cluster) pushPingMetrics(ctx context.Context) {
}
}
// Alerts returns things that are wrong with the cluster.
//
// The result is a fresh map populated from c.alerts while holding
// c.alertsMux, so callers receive a copy rather than the internal map.
func (c *Cluster) Alerts() map[string]api.Alert {
	c.alertsMux.Lock()
	defer c.alertsMux.Unlock()

	snapshot := make(map[string]api.Alert, len(c.alerts))
	for key, entry := range c.alerts {
		snapshot[key] = entry
	}
	return snapshot
}
// reads the alerts channel from the monitor and triggers repins
func (c *Cluster) alertsHandler() {
for {
@ -399,6 +416,10 @@ func (c *Cluster) alertsHandler() {
return
case alrt := <-c.monitor.Alerts():
logger.Warningf("metric alert for %s: Peer: %s.", alrt.MetricName, alrt.Peer)
c.alertsMux.Lock()
c.alerts[peer.IDB58Encode(alrt.Peer)] = *alrt
c.alertsMux.Unlock()
if alrt.MetricName != pingMetricName {
continue // only handle ping alerts
}

View File

@ -97,6 +97,10 @@ func textFormatObject(resp interface{}) {
for _, item := range resp.([]string) {
textFormatObject(item)
}
case map[string]api.Alert:
for i := range resp.(map[string]api.Alert) {
fmt.Printf("peer is down: %s\n", i)
}
default:
checkErr("", errors.New("unsupported type returned"))
}

View File

@ -977,6 +977,16 @@ but usually are:
},
},
},
{
Name: "alerts",
Usage: "Show things which are wrong with this cluster",
Description: "Show things which are wrong with this cluster",
Action: func(c *cli.Context) error {
resp, cerr := globalClient.Alerts(ctx)
formatResponse(c, resp, cerr)
return nil
},
},
{
Name: "ipfs",
Usage: "Manage IPFS daemon",

View File

@ -459,6 +459,13 @@ func (rpcapi *ClusterRPCAPI) SendInformersMetrics(ctx context.Context, in struct
return nil
}
// Alerts runs Cluster.Alerts() and stores the returned map in out.
// The in argument is unused and the method always returns a nil error.
func (rpcapi *ClusterRPCAPI) Alerts(ctx context.Context, in struct{}, out *map[string]api.Alert) error {
	*out = rpcapi.c.Alerts()
	return nil
}
/*
Tracker component methods
*/

View File

@ -26,6 +26,7 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{
"Cluster.RepoGCLocal": RPCTrusted,
"Cluster.SendInformerMetric": RPCClosed,
"Cluster.SendInformersMetrics": RPCClosed,
"Cluster.Alerts": RPCClosed,
"Cluster.Status": RPCClosed,
"Cluster.StatusAll": RPCClosed,
"Cluster.StatusAllLocal": RPCClosed,