Merge branch 'master' into dependency-upgrades
This commit is contained in:
commit
321c894df2
51
CHANGELOG.md
51
CHANGELOG.md
|
@ -1,5 +1,56 @@
|
||||||
# IPFS Cluster Changelog
|
# IPFS Cluster Changelog
|
||||||
|
|
||||||
|
### v0.13.1 - 2021-01-14
|
||||||
|
|
||||||
|
IPFS Cluster v0.13.1 is a maintenance release with some bugfixes and updated
|
||||||
|
dependencies. It should be fully backwards compatible.
|
||||||
|
|
||||||
|
This release deprecates `secio` (as required by libp2p), but this was already
|
||||||
|
the lowest priority security transport and `tls` would have been used by default.
|
||||||
|
The new `noise` transport becomes the preferred option.
|
||||||
|
|
||||||
|
#### List of changes
|
||||||
|
|
||||||
|
##### Features
|
||||||
|
|
||||||
|
* Support for multiple architectures added to the Docker container | [ipfs/ipfs-cluster#1085](https://github.com/ipfs/ipfs-cluster/issues/1085) | [ipfs/ipfs-cluster#1196](https://github.com/ipfs/ipfs-cluster/issues/1196)
|
||||||
|
* Add `--name` and `--expire` to `ipfs-cluster-ctl pin update` | [ipfs/ipfs-cluster#1184](https://github.com/ipfs/ipfs-cluster/issues/1184) | [ipfs/ipfs-cluster#1195](https://github.com/ipfs/ipfs-cluster/issues/1195)
|
||||||
|
* Failover client integrated in `ipfs-cluster-ctl` | [ipfs/ipfs-cluster#1222](https://github.com/ipfs/ipfs-cluster/issues/1222) | [ipfs/ipfs-cluster#1250](https://github.com/ipfs/ipfs-cluster/issues/1250)
|
||||||
|
* `ipfs-cluster-ctl health alerts` lists the last expired metrics seen by the peer | [ipfs/ipfs-cluster#165](https://github.com/ipfs/ipfs-cluster/issues/165) | [ipfs/ipfs-cluster#978](https://github.com/ipfs/ipfs-cluster/issues/978)
|
||||||
|
|
||||||
|
##### Bug fixes
|
||||||
|
|
||||||
|
* IPFS Proxy: pin progress objects wrongly includes non empty `Hash` key | [ipfs/ipfs-cluster#1286](https://github.com/ipfs/ipfs-cluster/issues/1286) | [ipfs/ipfs-cluster#1287](https://github.com/ipfs/ipfs-cluster/issues/1287)
|
||||||
|
* CRDT: Fix pubsub peer validation check | [ipfs/ipfs-cluster#1288](https://github.com/ipfs/ipfs-cluster/issues/1288)
|
||||||
|
|
||||||
|
##### Other changes
|
||||||
|
|
||||||
|
* Typos | [ipfs/ipfs-cluster#1181](https://github.com/ipfs/ipfs-cluster/issues/1181) | [ipfs/ipfs-cluster#1183](https://github.com/ipfs/ipfs-cluster/issues/1183)
|
||||||
|
* Reduce default pin_timeout to 2 minutes | [ipfs/ipfs-cluster#1160](https://github.com/ipfs/ipfs-cluster/issues/1160)
|
||||||
|
* Dependency upgrades | [ipfs/ipfs-cluster#1125](https://github.com/ipfs/ipfs-cluster/issues/1125) | [ipfs/ipfs-cluster#1238](https://github.com/ipfs/ipfs-cluster/issues/1238)
|
||||||
|
* Remove `secio` security transport | [ipfs/ipfs-cluster#1214](https://github.com/ipfs/ipfs-cluster/issues/1214) | [ipfs/ipfs-cluster#1227](https://github.com/ipfs/ipfs-cluster/issues/1227)
|
||||||
|
|
||||||
|
#### Upgrading notices
|
||||||
|
|
||||||
|
##### Configuration changes
|
||||||
|
|
||||||
|
The new default for `ipfs_http.pin_timeout` is `2m`. This is the time that
|
||||||
|
needs to pass for a pin operation to error and it starts counting from the
|
||||||
|
last block pinned.
|
||||||
|
|
||||||
|
##### REST API
|
||||||
|
|
||||||
|
A new `/health/alerts` endpoint exists to support `ipfs-cluster-ctl health alerts`.
|
||||||
|
|
||||||
|
##### Go APIs
|
||||||
|
|
||||||
|
The definition of `types.Alert` has changed. This type was not exposed to the
|
||||||
|
outside before. RPC endpoints affected are only used locally.
|
||||||
|
|
||||||
|
##### Other
|
||||||
|
|
||||||
|
Nothing.
|
||||||
|
|
||||||
### v0.13.0 - 2020-05-19
|
### v0.13.0 - 2020-05-19
|
||||||
|
|
||||||
IPFS Cluster v0.13.0 provides many improvements and bugfixes on multiple fronts.
|
IPFS Cluster v0.13.0 provides many improvements and bugfixes on multiple fronts.
|
||||||
|
|
|
@ -96,6 +96,10 @@ type Client interface {
|
||||||
// Otherwise, it happens everywhere.
|
// Otherwise, it happens everywhere.
|
||||||
RecoverAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error)
|
RecoverAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error)
|
||||||
|
|
||||||
|
// Alerts returns information health events in the cluster (expired
|
||||||
|
// metrics etc.).
|
||||||
|
Alerts(ctx context.Context) ([]*api.Alert, error)
|
||||||
|
|
||||||
// Version returns the ipfs-cluster peer's version.
|
// Version returns the ipfs-cluster peer's version.
|
||||||
Version(context.Context) (*api.Version, error)
|
Version(context.Context) (*api.Version, error)
|
||||||
|
|
||||||
|
@ -166,7 +170,7 @@ type Config struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// AsTemplateFor creates client configs from resolved multiaddresses
|
// AsTemplateFor creates client configs from resolved multiaddresses
|
||||||
func (c *Config) AsTemplateFor(addrs []ma.Multiaddr) ([]*Config) {
|
func (c *Config) AsTemplateFor(addrs []ma.Multiaddr) []*Config {
|
||||||
var cfgs []*Config
|
var cfgs []*Config
|
||||||
for _, addr := range addrs {
|
for _, addr := range addrs {
|
||||||
cfg := *c
|
cfg := *c
|
||||||
|
@ -388,7 +392,7 @@ func resolveAddr(ctx context.Context, addr ma.Multiaddr) ([]ma.Multiaddr, error)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(resolved) == 0 {
|
if len(resolved) == 0 {
|
||||||
return nil, fmt.Errorf("resolving %s returned 0 results", addr)
|
return nil, fmt.Errorf("resolving %s returned 0 results", addr)
|
||||||
}
|
}
|
||||||
|
|
|
@ -300,6 +300,19 @@ func (lc *loadBalancingClient) RecoverAll(ctx context.Context, local bool) ([]*a
|
||||||
return pinInfos, err
|
return pinInfos, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Alerts returns things that are wrong with cluster.
|
||||||
|
func (lc *loadBalancingClient) Alerts(ctx context.Context) ([]*api.Alert, error) {
|
||||||
|
var alerts []*api.Alert
|
||||||
|
call := func(c Client) error {
|
||||||
|
var err error
|
||||||
|
alerts, err = c.Alerts(ctx)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err := lc.retry(0, call)
|
||||||
|
return alerts, err
|
||||||
|
}
|
||||||
|
|
||||||
// Version returns the ipfs-cluster peer's version.
|
// Version returns the ipfs-cluster peer's version.
|
||||||
func (lc *loadBalancingClient) Version(ctx context.Context) (*api.Version, error) {
|
func (lc *loadBalancingClient) Version(ctx context.Context) (*api.Version, error) {
|
||||||
var v *api.Version
|
var v *api.Version
|
||||||
|
|
|
@ -274,6 +274,17 @@ func (c *defaultClient) RecoverAll(ctx context.Context, local bool) ([]*api.Glob
|
||||||
return gpis, err
|
return gpis, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Alerts returns information health events in the cluster (expired metrics
|
||||||
|
// etc.).
|
||||||
|
func (c *defaultClient) Alerts(ctx context.Context) ([]*api.Alert, error) {
|
||||||
|
ctx, span := trace.StartSpan(ctx, "client/Alert")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
var alerts []*api.Alert
|
||||||
|
err := c.do(ctx, "GET", "/health/alerts", nil, nil, &alerts)
|
||||||
|
return alerts, err
|
||||||
|
}
|
||||||
|
|
||||||
// Version returns the ipfs-cluster peer's version.
|
// Version returns the ipfs-cluster peer's version.
|
||||||
func (c *defaultClient) Version(ctx context.Context) (*api.Version, error) {
|
func (c *defaultClient) Version(ctx context.Context) (*api.Version, error) {
|
||||||
ctx, span := trace.StartSpan(ctx, "client/Version")
|
ctx, span := trace.StartSpan(ctx, "client/Version")
|
||||||
|
|
|
@ -414,6 +414,28 @@ func TestRecoverAll(t *testing.T) {
|
||||||
testClients(t, api, testF)
|
testClients(t, api, testF)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAlerts(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
api := testAPI(t)
|
||||||
|
defer shutdown(api)
|
||||||
|
|
||||||
|
testF := func(t *testing.T, c Client) {
|
||||||
|
alerts, err := c.Alerts(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(alerts) != 1 {
|
||||||
|
t.Fatal("expected 1 alert")
|
||||||
|
}
|
||||||
|
pID2 := peer.Encode(test.PeerID2)
|
||||||
|
if alerts[0].Peer != test.PeerID2 {
|
||||||
|
t.Errorf("expected an alert from %s", pID2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
testClients(t, api, testF)
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetConnectGraph(t *testing.T) {
|
func TestGetConnectGraph(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
api := testAPI(t)
|
api := testAPI(t)
|
||||||
|
|
|
@ -466,6 +466,12 @@ func (api *API) routes() []route {
|
||||||
"/health/graph",
|
"/health/graph",
|
||||||
api.graphHandler,
|
api.graphHandler,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"Alerts",
|
||||||
|
"GET",
|
||||||
|
"/health/alerts",
|
||||||
|
api.alertsHandler,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"Metrics",
|
"Metrics",
|
||||||
"GET",
|
"GET",
|
||||||
|
@ -659,6 +665,19 @@ func (api *API) metricNamesHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
api.sendResponse(w, autoStatus, err, metricNames)
|
api.sendResponse(w, autoStatus, err, metricNames)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (api *API) alertsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var alerts []types.Alert
|
||||||
|
err := api.rpcClient.CallContext(
|
||||||
|
r.Context(),
|
||||||
|
"",
|
||||||
|
"Cluster",
|
||||||
|
"Alerts",
|
||||||
|
struct{}{},
|
||||||
|
&alerts,
|
||||||
|
)
|
||||||
|
api.sendResponse(w, autoStatus, err, alerts)
|
||||||
|
}
|
||||||
|
|
||||||
func (api *API) addHandler(w http.ResponseWriter, r *http.Request) {
|
func (api *API) addHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
reader, err := r.MultipartReader()
|
reader, err := r.MultipartReader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -849,6 +849,22 @@ func TestAPIMetricNamesEndpoint(t *testing.T) {
|
||||||
testBothEndpoints(t, tf)
|
testBothEndpoints(t, tf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAPIAlertsEndpoint(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
rest := testAPI(t)
|
||||||
|
defer rest.Shutdown(ctx)
|
||||||
|
|
||||||
|
tf := func(t *testing.T, url urlF) {
|
||||||
|
var resp []api.Alert
|
||||||
|
makeGet(t, rest, url(rest)+"/health/alerts", &resp)
|
||||||
|
if len(resp) != 1 {
|
||||||
|
t.Error("expected one alert")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
testBothEndpoints(t, tf)
|
||||||
|
}
|
||||||
|
|
||||||
func TestAPIStatusAllEndpoint(t *testing.T) {
|
func TestAPIStatusAllEndpoint(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
rest := testAPI(t)
|
rest := testAPI(t)
|
||||||
|
|
|
@ -1069,10 +1069,10 @@ func (es MetricSlice) Less(i, j int) bool {
|
||||||
return es[i].Peer < es[j].Peer
|
return es[i].Peer < es[j].Peer
|
||||||
}
|
}
|
||||||
|
|
||||||
// Alert carries alerting information about a peer. WIP.
|
// Alert carries alerting information about a peer.
|
||||||
type Alert struct {
|
type Alert struct {
|
||||||
Peer peer.ID
|
Metric
|
||||||
MetricName string
|
TriggeredAt time.Time `json:"triggered_at" codec:"r,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Error can be used by APIs to return errors.
|
// Error can be used by APIs to return errors.
|
||||||
|
|
42
cluster.go
42
cluster.go
|
@ -42,6 +42,7 @@ const (
|
||||||
bootstrapCount = 3
|
bootstrapCount = 3
|
||||||
reBootstrapInterval = 30 * time.Second
|
reBootstrapInterval = 30 * time.Second
|
||||||
mdnsServiceTag = "_ipfs-cluster-discovery._udp"
|
mdnsServiceTag = "_ipfs-cluster-discovery._udp"
|
||||||
|
maxAlerts = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -74,6 +75,9 @@ type Cluster struct {
|
||||||
informers []Informer
|
informers []Informer
|
||||||
tracer Tracer
|
tracer Tracer
|
||||||
|
|
||||||
|
alerts []api.Alert
|
||||||
|
alertsMux sync.Mutex
|
||||||
|
|
||||||
doneCh chan struct{}
|
doneCh chan struct{}
|
||||||
readyCh chan struct{}
|
readyCh chan struct{}
|
||||||
readyB bool
|
readyB bool
|
||||||
|
@ -137,10 +141,10 @@ func NewCluster(
|
||||||
if cfg.MDNSInterval > 0 {
|
if cfg.MDNSInterval > 0 {
|
||||||
mdns, err := discovery.NewMdnsService(ctx, host, cfg.MDNSInterval, mdnsServiceTag)
|
mdns, err := discovery.NewMdnsService(ctx, host, cfg.MDNSInterval, mdnsServiceTag)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
logger.Warnf("mDNS could not be started: %s", err)
|
||||||
return nil, err
|
} else {
|
||||||
|
mdns.RegisterNotifee(peerManager)
|
||||||
}
|
}
|
||||||
mdns.RegisterNotifee(peerManager)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c := &Cluster{
|
c := &Cluster{
|
||||||
|
@ -160,6 +164,7 @@ func NewCluster(
|
||||||
allocator: allocator,
|
allocator: allocator,
|
||||||
informers: informers,
|
informers: informers,
|
||||||
tracer: tracer,
|
tracer: tracer,
|
||||||
|
alerts: []api.Alert{},
|
||||||
peerManager: peerManager,
|
peerManager: peerManager,
|
||||||
shutdownB: false,
|
shutdownB: false,
|
||||||
removed: false,
|
removed: false,
|
||||||
|
@ -384,6 +389,23 @@ func (c *Cluster) pushPingMetrics(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Alerts returns the last alerts recorded by this cluster peer with the most
|
||||||
|
// recent first.
|
||||||
|
func (c *Cluster) Alerts() []api.Alert {
|
||||||
|
alerts := make([]api.Alert, len(c.alerts))
|
||||||
|
|
||||||
|
c.alertsMux.Lock()
|
||||||
|
{
|
||||||
|
total := len(alerts)
|
||||||
|
for i, a := range c.alerts {
|
||||||
|
alerts[total-1-i] = a
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.alertsMux.Unlock()
|
||||||
|
|
||||||
|
return alerts
|
||||||
|
}
|
||||||
|
|
||||||
// read the alerts channel from the monitor and triggers repins
|
// read the alerts channel from the monitor and triggers repins
|
||||||
func (c *Cluster) alertsHandler() {
|
func (c *Cluster) alertsHandler() {
|
||||||
for {
|
for {
|
||||||
|
@ -397,8 +419,18 @@ func (c *Cluster) alertsHandler() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Warnf("metric alert for %s: Peer: %s.", alrt.MetricName, alrt.Peer)
|
logger.Warnf("metric alert for %s: Peer: %s.", alrt.Name, alrt.Peer)
|
||||||
if alrt.MetricName != pingMetricName {
|
c.alertsMux.Lock()
|
||||||
|
{
|
||||||
|
if len(c.alerts) > maxAlerts {
|
||||||
|
c.alerts = c.alerts[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
c.alerts = append(c.alerts, *alrt)
|
||||||
|
}
|
||||||
|
c.alertsMux.Unlock()
|
||||||
|
|
||||||
|
if alrt.Name != pingMetricName {
|
||||||
continue // only handle ping alerts
|
continue // only handle ping alerts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,8 @@ func textFormatObject(resp interface{}) {
|
||||||
textFormatPrintError(r)
|
textFormatPrintError(r)
|
||||||
case *api.Metric:
|
case *api.Metric:
|
||||||
textFormatPrintMetric(r)
|
textFormatPrintMetric(r)
|
||||||
|
case *api.Alert:
|
||||||
|
textFormatPrintAlert(r)
|
||||||
case []*api.ID:
|
case []*api.ID:
|
||||||
for _, item := range r {
|
for _, item := range r {
|
||||||
textFormatObject(item)
|
textFormatObject(item)
|
||||||
|
@ -96,6 +98,10 @@ func textFormatObject(resp interface{}) {
|
||||||
for _, item := range r {
|
for _, item := range r {
|
||||||
textFormatObject(item)
|
textFormatObject(item)
|
||||||
}
|
}
|
||||||
|
case []*api.Alert:
|
||||||
|
for _, item := range r {
|
||||||
|
textFormatObject(item)
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
checkErr("", errors.New("unsupported type returned"))
|
checkErr("", errors.New("unsupported type returned"))
|
||||||
}
|
}
|
||||||
|
@ -240,6 +246,15 @@ func textFormatPrintMetric(obj *api.Metric) {
|
||||||
fmt.Printf("%s | %s | Expires in: %s\n", peer.Encode(obj.Peer), obj.Name, humanize.Time(time.Unix(0, obj.Expire)))
|
fmt.Printf("%s | %s | Expires in: %s\n", peer.Encode(obj.Peer), obj.Name, humanize.Time(time.Unix(0, obj.Expire)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func textFormatPrintAlert(obj *api.Alert) {
|
||||||
|
fmt.Printf("%s: %s. Expired at: %s. Triggered at: %s\n",
|
||||||
|
obj.Peer,
|
||||||
|
obj.Name,
|
||||||
|
humanize.Time(time.Unix(0, obj.Expire)),
|
||||||
|
humanize.Time(obj.TriggeredAt),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
func textFormatPrintGlobalRepoGC(obj *api.GlobalRepoGC) {
|
func textFormatPrintGlobalRepoGC(obj *api.GlobalRepoGC) {
|
||||||
peers := make(sort.StringSlice, 0, len(obj.PeerMap))
|
peers := make(sort.StringSlice, 0, len(obj.PeerMap))
|
||||||
for peer := range obj.PeerMap {
|
for peer := range obj.PeerMap {
|
||||||
|
|
|
@ -28,7 +28,7 @@ const programName = `ipfs-cluster-ctl`
|
||||||
|
|
||||||
// Version is the cluster-ctl tool version. It should match
|
// Version is the cluster-ctl tool version. It should match
|
||||||
// the IPFS cluster's version
|
// the IPFS cluster's version
|
||||||
const Version = "0.13.0-next"
|
const Version = "0.13.1-next"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
defaultHost = "/ip4/127.0.0.1/tcp/9094"
|
defaultHost = "/ip4/127.0.0.1/tcp/9094"
|
||||||
|
@ -385,6 +385,15 @@ content.
|
||||||
Name: "nocopy",
|
Name: "nocopy",
|
||||||
Usage: "Add the URL using filestore. Implies raw-leaves. (experimental)",
|
Usage: "Add the URL using filestore. Implies raw-leaves. (experimental)",
|
||||||
},
|
},
|
||||||
|
cli.BoolFlag{
|
||||||
|
Name: "wait",
|
||||||
|
Usage: "Wait for all nodes to report a status of pinned before returning",
|
||||||
|
},
|
||||||
|
cli.DurationFlag{
|
||||||
|
Name: "wait-timeout, wt",
|
||||||
|
Value: 0,
|
||||||
|
Usage: "How long to --wait (in seconds), default is indefinitely",
|
||||||
|
},
|
||||||
// TODO: Uncomment when sharding is supported.
|
// TODO: Uncomment when sharding is supported.
|
||||||
// cli.BoolFlag{
|
// cli.BoolFlag{
|
||||||
// Name: "shard",
|
// Name: "shard",
|
||||||
|
@ -493,13 +502,15 @@ content.
|
||||||
if bufferResults { // we buffered.
|
if bufferResults { // we buffered.
|
||||||
if qq { // [last elem]
|
if qq { // [last elem]
|
||||||
formatResponse(c, []*addedOutputQuiet{lastBuf}, nil)
|
formatResponse(c, []*addedOutputQuiet{lastBuf}, nil)
|
||||||
return
|
} else { // [all elems]
|
||||||
|
formatResponse(c, buffered, nil)
|
||||||
}
|
}
|
||||||
// [all elems]
|
|
||||||
formatResponse(c, buffered, nil)
|
|
||||||
} else if qq { // we already printed unless Quieter
|
} else if qq { // we already printed unless Quieter
|
||||||
formatResponse(c, lastBuf, nil)
|
formatResponse(c, lastBuf, nil)
|
||||||
return
|
}
|
||||||
|
if c.Bool("wait") {
|
||||||
|
var _, cerr = waitFor(lastBuf.AddedOutput.Cid, api.TrackerStatusPinned, c.Duration("wait-timeout"))
|
||||||
|
checkErr("waiting for pin status", cerr)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -970,6 +981,24 @@ but usually are:
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "alerts",
|
||||||
|
Usage: "List the latest expired metric alerts",
|
||||||
|
Description: `
|
||||||
|
This command provides a list of "alerts" that the cluster has seen.
|
||||||
|
|
||||||
|
An alert is triggered when one of the metrics seen for a peer expires, and no
|
||||||
|
new metrics have been received.
|
||||||
|
|
||||||
|
Different alerts may be handled in different ways. i.e. ping alerts may
|
||||||
|
trigger automatic repinnings if configured.
|
||||||
|
`,
|
||||||
|
Action: func(c *cli.Context) error {
|
||||||
|
resp, cerr := globalClient.Alerts(ctx)
|
||||||
|
formatResponse(c, resp, cerr)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -59,7 +59,6 @@ require (
|
||||||
github.com/libp2p/go-ws-transport v0.4.0
|
github.com/libp2p/go-ws-transport v0.4.0
|
||||||
github.com/multiformats/go-multiaddr v0.3.1
|
github.com/multiformats/go-multiaddr v0.3.1
|
||||||
github.com/multiformats/go-multiaddr-dns v0.2.0
|
github.com/multiformats/go-multiaddr-dns v0.2.0
|
||||||
github.com/multiformats/go-multicodec v0.1.6
|
|
||||||
github.com/multiformats/go-multihash v0.0.14
|
github.com/multiformats/go-multihash v0.0.14
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
github.com/prometheus/client_golang v1.9.0
|
github.com/prometheus/client_golang v1.9.0
|
||||||
|
|
5
go.sum
5
go.sum
|
@ -416,8 +416,7 @@ github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9Dr
|
||||||
github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBRn4FS6UHUk=
|
github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBRn4FS6UHUk=
|
||||||
github.com/ipfs/go-ds-badger v0.2.6 h1:Hy8jw4rifxtRDrqpvC1yh36oIyE37KDzsUzlHUPOFiU=
|
github.com/ipfs/go-ds-badger v0.2.6 h1:Hy8jw4rifxtRDrqpvC1yh36oIyE37KDzsUzlHUPOFiU=
|
||||||
github.com/ipfs/go-ds-badger v0.2.6/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA=
|
github.com/ipfs/go-ds-badger v0.2.6/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA=
|
||||||
github.com/ipfs/go-ds-crdt v0.1.18 h1:pNoo/Q+J/K0L3eGZG2VwvFxeotbDH58u/ldmWsl0N7k=
|
github.com/ipfs/go-ds-crdt v0.1.19 h1:+i1wKzrvtUqqpZHszacVLfApd40o1dXInZb///gHtOo=
|
||||||
github.com/ipfs/go-ds-crdt v0.1.18/go.mod h1:pQsHZM76D03svN5pjnnUroB9IfFFgIQXICExchNw+Xk=
|
|
||||||
github.com/ipfs/go-ds-crdt v0.1.19/go.mod h1:pQsHZM76D03svN5pjnnUroB9IfFFgIQXICExchNw+Xk=
|
github.com/ipfs/go-ds-crdt v0.1.19/go.mod h1:pQsHZM76D03svN5pjnnUroB9IfFFgIQXICExchNw+Xk=
|
||||||
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
|
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
|
||||||
github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8=
|
github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8=
|
||||||
|
@ -979,8 +978,6 @@ github.com/multiformats/go-multiaddr-net v0.2.0/go.mod h1:gGdH3UXny6U3cKKYCvpXI5
|
||||||
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
|
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
|
||||||
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
|
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
|
||||||
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
|
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
|
||||||
github.com/multiformats/go-multicodec v0.1.6 h1:4u6lcjbE4VVVoigU4QJSSVYsGVP4j2jtDkR8lPwOrLE=
|
|
||||||
github.com/multiformats/go-multicodec v0.1.6/go.mod h1:lliaRHbcG8q33yf4Ot9BGD7JqR/Za9HE7HTyVyKwrUQ=
|
|
||||||
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
|
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
|
||||||
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
|
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
|
||||||
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
|
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
|
||||||
|
|
|
@ -401,19 +401,22 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func shutdownClusters(t *testing.T, clusters []*Cluster, m []*test.IpfsMock) {
|
func shutdownClusters(t *testing.T, clusters []*Cluster, m []*test.IpfsMock) {
|
||||||
ctx := context.Background()
|
|
||||||
for i, c := range clusters {
|
for i, c := range clusters {
|
||||||
err := c.Shutdown(ctx)
|
shutdownCluster(t, c, m[i])
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
c.dht.Close()
|
|
||||||
c.host.Close()
|
|
||||||
m[i].Close()
|
|
||||||
}
|
}
|
||||||
os.RemoveAll(testsFolder)
|
os.RemoveAll(testsFolder)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func shutdownCluster(t *testing.T, c *Cluster, m *test.IpfsMock) {
|
||||||
|
err := c.Shutdown(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
c.dht.Close()
|
||||||
|
c.host.Close()
|
||||||
|
m.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func runF(t *testing.T, clusters []*Cluster, f func(*testing.T, *Cluster)) {
|
func runF(t *testing.T, clusters []*Cluster, f func(*testing.T, *Cluster)) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
@ -2125,3 +2128,26 @@ func TestClusterPinsWithExpiration(t *testing.T) {
|
||||||
t.Error("pin should not be part of the state")
|
t.Error("pin should not be part of the state")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClusterAlerts(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
clusters, mock := createClusters(t)
|
||||||
|
defer shutdownClusters(t, clusters, mock)
|
||||||
|
|
||||||
|
if len(clusters) < 2 {
|
||||||
|
t.Skip("need at least 2 nodes for this test")
|
||||||
|
}
|
||||||
|
|
||||||
|
ttlDelay()
|
||||||
|
|
||||||
|
for _, c := range clusters[1:] {
|
||||||
|
c.Shutdown(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
ttlDelay()
|
||||||
|
|
||||||
|
alerts := clusters[0].Alerts()
|
||||||
|
if len(alerts) == 0 {
|
||||||
|
t.Error("expected at least one alert")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -99,6 +99,13 @@ func (mc *Checker) alert(pid peer.ID, metricName string) error {
|
||||||
mc.failedPeers[pid] = make(map[string]int)
|
mc.failedPeers[pid] = make(map[string]int)
|
||||||
}
|
}
|
||||||
failedMetrics := mc.failedPeers[pid]
|
failedMetrics := mc.failedPeers[pid]
|
||||||
|
lastMetric := mc.metrics.PeerLatest(metricName, pid)
|
||||||
|
if lastMetric == nil {
|
||||||
|
lastMetric = &api.Metric{
|
||||||
|
Name: metricName,
|
||||||
|
Peer: pid,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// If above threshold, remove all metrics for that peer
|
// If above threshold, remove all metrics for that peer
|
||||||
// and clean up failedPeers when no failed metrics are left.
|
// and clean up failedPeers when no failed metrics are left.
|
||||||
|
@ -114,8 +121,8 @@ func (mc *Checker) alert(pid peer.ID, metricName string) error {
|
||||||
failedMetrics[metricName]++
|
failedMetrics[metricName]++
|
||||||
|
|
||||||
alrt := &api.Alert{
|
alrt := &api.Alert{
|
||||||
Peer: pid,
|
Metric: *lastMetric,
|
||||||
MetricName: metricName,
|
TriggeredAt: time.Now(),
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case mc.alertCh <- alrt:
|
case mc.alertCh <- alrt:
|
||||||
|
|
|
@ -15,7 +15,7 @@ import (
|
||||||
peer "github.com/libp2p/go-libp2p-core/peer"
|
peer "github.com/libp2p/go-libp2p-core/peer"
|
||||||
rpc "github.com/libp2p/go-libp2p-gorpc"
|
rpc "github.com/libp2p/go-libp2p-gorpc"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
msgpack "github.com/multiformats/go-multicodec/msgpack"
|
gocodec "github.com/ugorji/go/codec"
|
||||||
|
|
||||||
"go.opencensus.io/trace"
|
"go.opencensus.io/trace"
|
||||||
)
|
)
|
||||||
|
@ -25,7 +25,7 @@ var logger = logging.Logger("monitor")
|
||||||
// PubsubTopic specifies the topic used to publish Cluster metrics.
|
// PubsubTopic specifies the topic used to publish Cluster metrics.
|
||||||
var PubsubTopic = "monitor.metrics"
|
var PubsubTopic = "monitor.metrics"
|
||||||
|
|
||||||
var msgpackHandle = msgpack.DefaultMsgpackHandle()
|
var msgpackHandle = &gocodec.MsgpackHandle{}
|
||||||
|
|
||||||
// Monitor is a component in charge of monitoring peers, logging
|
// Monitor is a component in charge of monitoring peers, logging
|
||||||
// metrics and detecting failures
|
// metrics and detecting failures
|
||||||
|
@ -129,7 +129,7 @@ func (mon *Monitor) logFromPubsub() {
|
||||||
|
|
||||||
data := msg.GetData()
|
data := msg.GetData()
|
||||||
buf := bytes.NewBuffer(data)
|
buf := bytes.NewBuffer(data)
|
||||||
dec := msgpack.Multicodec(msgpackHandle).Decoder(buf)
|
dec := gocodec.NewDecoder(buf, msgpackHandle)
|
||||||
metric := api.Metric{}
|
metric := api.Metric{}
|
||||||
err = dec.Decode(&metric)
|
err = dec.Decode(&metric)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -203,7 +203,7 @@ func (mon *Monitor) PublishMetric(ctx context.Context, m *api.Metric) error {
|
||||||
|
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
|
|
||||||
enc := msgpack.Multicodec(msgpackHandle).Encoder(&b)
|
enc := gocodec.NewEncoder(&b, msgpackHandle)
|
||||||
err := enc.Encode(m)
|
err := enc.Encode(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(err)
|
logger.Error(err)
|
||||||
|
|
|
@ -303,7 +303,7 @@ func TestPeerMonitorAlerts(t *testing.T) {
|
||||||
case <-timeout.C:
|
case <-timeout.C:
|
||||||
t.Fatal("should have thrown an alert by now")
|
t.Fatal("should have thrown an alert by now")
|
||||||
case alrt := <-pm.Alerts():
|
case alrt := <-pm.Alerts():
|
||||||
if alrt.MetricName != "test" {
|
if alrt.Name != "test" {
|
||||||
t.Error("Alert should be for test")
|
t.Error("Alert should be for test")
|
||||||
}
|
}
|
||||||
if alrt.Peer != test.PeerID1 {
|
if alrt.Peer != test.PeerID1 {
|
||||||
|
|
|
@ -22,7 +22,7 @@ if [[ "$version" == *"-next" ]]; then
|
||||||
exit 0
|
exit 0
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# RC versions, commit and make a non-annotated tag.
|
# RC versions, commit and make a tag without history.
|
||||||
if [[ "$version" == *"-rc"* ]]; then
|
if [[ "$version" == *"-rc"* ]]; then
|
||||||
git commit -S -m "Release candidate v${version}"
|
git commit -S -m "Release candidate v${version}"
|
||||||
git tag -s "v${version}"
|
git tag -s "v${version}"
|
||||||
|
@ -32,7 +32,7 @@ fi
|
||||||
# Actual releases, commit and make an annotated tag with all the commits
|
# Actual releases, commit and make an annotated tag with all the commits
|
||||||
# since the last.
|
# since the last.
|
||||||
git commit -S -m "Release v${version}"
|
git commit -S -m "Release v${version}"
|
||||||
lastver=`git describe --abrev=0`
|
lastver=`git describe --abbrev=0 --exclude '*rc*'`
|
||||||
echo "Tag for Release ${version}" > tag_annotation
|
echo "Tag for Release ${version}" > tag_annotation
|
||||||
echo >> tag_annotation
|
echo >> tag_annotation
|
||||||
git log --pretty=oneline ${lastver}..HEAD >> tag_annotation
|
git log --pretty=oneline ${lastver}..HEAD >> tag_annotation
|
||||||
|
|
|
@ -426,6 +426,13 @@ func (rpcapi *ClusterRPCAPI) SendInformersMetrics(ctx context.Context, in struct
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Alerts runs Cluster.Alerts().
|
||||||
|
func (rpcapi *ClusterRPCAPI) Alerts(ctx context.Context, in struct{}, out *[]api.Alert) error {
|
||||||
|
alerts := rpcapi.c.Alerts()
|
||||||
|
*out = alerts
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Tracker component methods
|
Tracker component methods
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -26,6 +26,7 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{
|
||||||
"Cluster.RepoGCLocal": RPCTrusted,
|
"Cluster.RepoGCLocal": RPCTrusted,
|
||||||
"Cluster.SendInformerMetric": RPCClosed,
|
"Cluster.SendInformerMetric": RPCClosed,
|
||||||
"Cluster.SendInformersMetrics": RPCClosed,
|
"Cluster.SendInformersMetrics": RPCClosed,
|
||||||
|
"Cluster.Alerts": RPCClosed,
|
||||||
"Cluster.Status": RPCClosed,
|
"Cluster.Status": RPCClosed,
|
||||||
"Cluster.StatusAll": RPCClosed,
|
"Cluster.StatusAll": RPCClosed,
|
||||||
"Cluster.StatusAllLocal": RPCClosed,
|
"Cluster.StatusAllLocal": RPCClosed,
|
||||||
|
|
|
@ -25,6 +25,10 @@ test_expect_success IPFS,CLUSTER "list latest metrics logged by this peer" '
|
||||||
ipfs-cluster-ctl health metrics freespace | grep -q -E "(^$pid \| freespace: [0-9]+ (G|M|K)B \| Expires in: [0-9]+ seconds from now)"
|
ipfs-cluster-ctl health metrics freespace | grep -q -E "(^$pid \| freespace: [0-9]+ (G|M|K)B \| Expires in: [0-9]+ seconds from now)"
|
||||||
'
|
'
|
||||||
|
|
||||||
|
test_expect_success IPFS,CLUSTER "alerts must succeed" '
|
||||||
|
ipfs-cluster-ctl health alerts
|
||||||
|
'
|
||||||
|
|
||||||
test_clean_ipfs
|
test_clean_ipfs
|
||||||
test_clean_cluster
|
test_clean_cluster
|
||||||
|
|
||||||
|
|
|
@ -341,6 +341,22 @@ func (mock *mockCluster) SendInformerMetric(ctx context.Context, in struct{}, ou
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mock *mockCluster) Alerts(ctx context.Context, in struct{}, out *[]api.Alert) error {
|
||||||
|
*out = []api.Alert{
|
||||||
|
api.Alert{
|
||||||
|
Metric: api.Metric{
|
||||||
|
Name: "ping",
|
||||||
|
Peer: PeerID2,
|
||||||
|
Expire: time.Now().Add(-30 * time.Second).UnixNano(),
|
||||||
|
Valid: true,
|
||||||
|
ReceivedAt: time.Now().Add(-60 * time.Second).UnixNano(),
|
||||||
|
},
|
||||||
|
TriggeredAt: time.Now(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
/* Tracker methods */
|
/* Tracker methods */
|
||||||
|
|
||||||
func (mock *mockPinTracker) Track(ctx context.Context, in *api.Pin, out *struct{}) error {
|
func (mock *mockPinTracker) Track(ctx context.Context, in *api.Pin, out *struct{}) error {
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Version is the current cluster version.
|
// Version is the current cluster version.
|
||||||
var Version = semver.MustParse("0.13.0-next")
|
var Version = semver.MustParse("0.13.1-next")
|
||||||
|
|
||||||
// RPCProtocol is protocol handler used to send libp2p-rpc messages between
|
// RPCProtocol is protocol handler used to send libp2p-rpc messages between
|
||||||
// cluster peers. All peers in the cluster need to speak the same protocol
|
// cluster peers. All peers in the cluster need to speak the same protocol
|
||||||
|
|
Loading…
Reference in New Issue
Block a user