Monitor: remove accrual detection. Add LatestForPeer method.

Fixes #939
This commit is contained in:
Hector Sanjuan 2022-01-28 16:38:29 +01:00
parent ed348f29c1
commit d4591b8442
8 changed files with 33 additions and 779 deletions

View File

@ -175,10 +175,12 @@ type PeerMonitor interface {
// PublishMetric sends a metric to the rest of the peers. // PublishMetric sends a metric to the rest of the peers.
// How to send it, and to who, is to be decided by the implementation. // How to send it, and to who, is to be decided by the implementation.
PublishMetric(context.Context, *api.Metric) error PublishMetric(context.Context, *api.Metric) error
// LatestMetrics returns a map with the latest metrics of matching // LatestMetrics returns a map with the latest valid metrics of matching
// name for the current cluster peers. The result should only contain // name for the current cluster peers. The result should only contain
// one metric per peer at most. // one metric per peer at most.
LatestMetrics(ctx context.Context, name string) []*api.Metric LatestMetrics(ctx context.Context, name string) []*api.Metric
// LatestForPeer returns the latest metric received from a peer. It may be expired.
LatestForPeer(ctx context.Context, name string, pid peer.ID) *api.Metric
// MetricNames returns a list of metric names. // MetricNames returns a list of metric names.
MetricNames(ctx context.Context) []string MetricNames(ctx context.Context) []string
// Alerts delivers alerts generated when this peer monitor detects // Alerts delivers alerts generated when this peer monitor detects

View File

@ -25,18 +25,12 @@ var MaxAlertThreshold = 1
// ErrAlertChannelFull is returned if the alert channel is full. // ErrAlertChannelFull is returned if the alert channel is full.
var ErrAlertChannelFull = errors.New("alert channel is full") var ErrAlertChannelFull = errors.New("alert channel is full")
// accrualMetricsNum represents the number of metrics required for
// accrual to function appropriately, and under which we use
// TTL to determine whether a peer may have failed.
var accrualMetricsNum = 6
// Checker provides utilities to find expired metrics // Checker provides utilities to find expired metrics
// for a given peerset and send alerts if it proceeds to do so. // for a given peerset and send alerts if it proceeds to do so.
type Checker struct { type Checker struct {
ctx context.Context ctx context.Context
alertCh chan *api.Alert alertCh chan *api.Alert
metrics *Store metrics *Store
threshold float64
failedPeersMu sync.Mutex failedPeersMu sync.Mutex
failedPeers map[peer.ID]map[string]int failedPeers map[peer.ID]map[string]int
@ -48,12 +42,11 @@ type Checker struct {
// The greater the threshold value the more leniency is granted. // The greater the threshold value the more leniency is granted.
// //
// A value between 2.0 and 4.0 is suggested for the threshold. // A value between 2.0 and 4.0 is suggested for the threshold.
func NewChecker(ctx context.Context, metrics *Store, threshold float64) *Checker { func NewChecker(ctx context.Context, metrics *Store) *Checker {
return &Checker{ return &Checker{
ctx: ctx, ctx: ctx,
alertCh: make(chan *api.Alert, AlertChannelCap), alertCh: make(chan *api.Alert, AlertChannelCap),
metrics: metrics, metrics: metrics,
threshold: threshold,
failedPeers: make(map[peer.ID]map[string]int), failedPeers: make(map[peer.ID]map[string]int),
} }
} }
@ -168,36 +161,6 @@ func (mc *Checker) Watch(ctx context.Context, peersF func(context.Context) ([]pe
// FailedMetric returns if a peer is marked as failed for a particular metric. // FailedMetric returns if a peer is marked as failed for a particular metric.
func (mc *Checker) FailedMetric(metric string, pid peer.ID) bool { func (mc *Checker) FailedMetric(metric string, pid peer.ID) bool {
_, _, _, result := mc.failed(metric, pid)
return result
}
// failed returns all the values involved in making the decision
// as to whether a peer has failed or not. The debugging parameter
// enables a more computation heavy path of the function but
// allows for insight into the return phi value.
func (mc *Checker) failed(metric string, pid peer.ID) (float64, []float64, float64, bool) {
latest := mc.metrics.PeerLatest(metric, pid) latest := mc.metrics.PeerLatest(metric, pid)
if latest == nil { return latest.Expired()
return 0.0, nil, 0.0, true
}
// A peer is never failed if the latest metric from it has
// not expired or we do not have enough number of metrics
// for accrual detection
if !latest.Expired() {
return 0.0, nil, 0.0, false
}
// The latest metric has expired
pmtrs := mc.metrics.PeerMetricAll(metric, pid)
// Not enough values for accrual and metric expired. Peer failed.
if len(pmtrs) < accrualMetricsNum {
return 0.0, nil, 0.0, true
}
v := time.Now().UnixNano() - latest.ReceivedAt
dv := mc.metrics.Distribution(metric, pid)
phiv := phi(float64(v), dv)
return float64(v), dv, phiv, phiv >= mc.threshold
} }

View File

@ -2,8 +2,6 @@ package metrics
import ( import (
"context" "context"
"fmt"
"math"
"testing" "testing"
"time" "time"
@ -11,17 +9,12 @@ import (
"github.com/ipfs/ipfs-cluster/test" "github.com/ipfs/ipfs-cluster/test"
peer "github.com/libp2p/go-libp2p-core/peer" peer "github.com/libp2p/go-libp2p-core/peer"
"gonum.org/v1/plot"
"gonum.org/v1/plot/plotter"
"gonum.org/v1/plot/plotutil"
"gonum.org/v1/plot/vg"
) )
func TestChecker_CheckPeers(t *testing.T) { func TestChecker_CheckPeers(t *testing.T) {
t.Run("check with single metric", func(t *testing.T) { t.Run("check with single metric", func(t *testing.T) {
metrics := NewStore() metrics := NewStore()
checker := NewChecker(context.Background(), metrics, 2.0) checker := NewChecker(context.Background(), metrics)
metr := &api.Metric{ metr := &api.Metric{
Name: "ping", Name: "ping",
@ -64,7 +57,7 @@ func TestChecker_CheckPeers(t *testing.T) {
func TestChecker_CheckAll(t *testing.T) { func TestChecker_CheckAll(t *testing.T) {
t.Run("checkall with single metric", func(t *testing.T) { t.Run("checkall with single metric", func(t *testing.T) {
metrics := NewStore() metrics := NewStore()
checker := NewChecker(context.Background(), metrics, 2.0) checker := NewChecker(context.Background(), metrics)
metr := &api.Metric{ metr := &api.Metric{
Name: "ping", Name: "ping",
@ -109,7 +102,7 @@ func TestChecker_Watch(t *testing.T) {
defer cancel() defer cancel()
metrics := NewStore() metrics := NewStore()
checker := NewChecker(context.Background(), metrics, 2.0) checker := NewChecker(context.Background(), metrics)
metr := &api.Metric{ metr := &api.Metric{
Name: "ping", Name: "ping",
@ -137,45 +130,18 @@ func TestChecker_Watch(t *testing.T) {
func TestChecker_Failed(t *testing.T) { func TestChecker_Failed(t *testing.T) {
t.Run("standard failure check", func(t *testing.T) { t.Run("standard failure check", func(t *testing.T) {
metrics := NewStore() metrics := NewStore()
checker := NewChecker(context.Background(), metrics, 2.0) checker := NewChecker(context.Background(), metrics)
for i := 0; i < 10; i++ { metrics.Add(makePeerMetric(test.PeerID1, "1", 100*time.Millisecond))
metrics.Add(makePeerMetric(test.PeerID1, "1", 3*time.Millisecond)) time.Sleep(50 * time.Millisecond)
time.Sleep(time.Duration(2) * time.Millisecond) got := checker.FailedMetric("ping", test.PeerID1)
if got {
t.Error("should not have failed so soon")
} }
for i := 0; i < 10; i++ { time.Sleep(100 * time.Millisecond)
metrics.Add(makePeerMetric(test.PeerID1, "1", 3*time.Millisecond)) got = checker.FailedMetric("ping", test.PeerID1)
got := checker.FailedMetric("ping", test.PeerID1) if !got {
// the magic number 17 represents the point at which t.Error("should have failed")
// the time between metrics addition has gotten
// so large that the probability that the service
// has failed goes over the threshold.
if i >= 17 && !got {
t.Fatal("threshold should have been passed by now")
}
time.Sleep(time.Duration(i) * time.Millisecond)
}
})
t.Run("ttl must expire before phiv causes failure", func(t *testing.T) {
metrics := NewStore()
checker := NewChecker(context.Background(), metrics, 0.05)
for i := 0; i < 10; i++ {
metrics.Add(makePeerMetric(test.PeerID1, "1", 10*time.Millisecond))
time.Sleep(time.Duration(200) * time.Millisecond)
}
for i, j := 0, 10; i < 8; i, j = i+1, j*2 {
time.Sleep(time.Duration(j) * time.Millisecond)
metrics.Add(makePeerMetric(test.PeerID1, "1", 10*time.Millisecond))
v, _, phiv, got := checker.failed("ping", test.PeerID1)
t.Logf("i: %d: j: %d v: %f, phiv: %f, got: %v\n", i, j, v, phiv, got)
if i < 7 && got {
t.Fatal("threshold should not have been reached already")
}
if i >= 10 && !got {
t.Fatal("threshold should have been reached by now")
}
} }
}) })
} }
@ -186,7 +152,7 @@ func TestChecker_alert(t *testing.T) {
defer cancel() defer cancel()
metrics := NewStore() metrics := NewStore()
checker := NewChecker(ctx, metrics, 2.0) checker := NewChecker(ctx, metrics)
metr := &api.Metric{ metr := &api.Metric{
Name: "ping", Name: "ping",
@ -222,199 +188,6 @@ func TestChecker_alert(t *testing.T) {
}) })
} }
//////////////////
// HELPER TESTS //
//////////////////
func TestThresholdValues(t *testing.T) {
t.Log("TestThresholdValues is useful for testing out different threshold values")
t.Log("It doesn't actually perform any 'tests', so it is skipped by default")
t.SkipNow()
thresholds := []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.8, 0.9, 1.3, 1.4, 1.5, 1.8, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0, 20.0}
t.Run("linear threshold test", func(t *testing.T) {
dists := make([]timeseries, 0)
phivs := make([]timeseries, 0)
for _, v := range thresholds {
metrics := NewStore()
checker := NewChecker(context.Background(), metrics, v)
tsName := fmt.Sprintf("%f", v)
distTS := newTS(tsName)
phivTS := newTS(tsName)
var failed bool
output := false
check := func(i int) bool {
inputv, dist, phiv, got := checker.failed("ping", test.PeerID1)
if output {
fmt.Println(i)
fmt.Printf("phiv: %f\n", phiv)
fmt.Printf("threshold: %f\n", v)
fmt.Printf("latest: %f\n", inputv)
fmt.Printf("distribution: %v\n", dist)
}
phivTS.record(phiv)
failed = got
if failed {
distTS.record(float64(i))
}
return got
}
// set baseline static distribution of ping intervals
for i := 0; i < 10; i++ {
check(i)
distTS.record(float64(10))
metrics.Add(makePeerMetric(test.PeerID1, "1", 1*time.Second))
time.Sleep(time.Duration(10) * time.Millisecond)
}
// start linearly increasing the interval values
for i := 10; i < 100 && !check(i); i++ {
distTS.record(float64(i))
metrics.Add(makePeerMetric(test.PeerID1, "1", 1*time.Second))
time.Sleep(time.Duration(i) * time.Millisecond)
}
if failed {
dists = append(dists, *distTS)
phivs = append(phivs, *phivTS)
continue
}
dists = append(dists, *distTS)
phivs = append(phivs, *phivTS)
// if a threshold has made it to this point, all greater
// thresholds will make it here so print the threshold and skip
t.Log("threshold that was not met by linear increase: ", v)
break
}
plotDistsPhivs("linear_", dists, phivs)
})
t.Run("cubic threshold test", func(t *testing.T) {
dists := make([]timeseries, 0)
phivs := make([]timeseries, 0)
for _, v := range thresholds {
metrics := NewStore()
checker := NewChecker(context.Background(), metrics, v)
tsName := fmt.Sprintf("%f", v)
distTS := newTS(tsName)
phivTS := newTS(tsName)
var failed bool
output := false
check := func(i int) bool {
inputv, dist, phiv, got := checker.failed("ping", test.PeerID1)
if output {
fmt.Println(i)
fmt.Printf("phiv: %f\n", phiv)
fmt.Printf("threshold: %f\n", v)
fmt.Printf("latest: %f\n", inputv)
fmt.Printf("distribution: %v\n", dist)
}
phivTS.record(phiv)
failed = got
if failed {
diff := math.Pow(float64(i), float64(i))
distTS.record(diff)
}
return got
}
// set baseline static distribution of ping intervals
for i := 0; i < 10; i++ {
check(i)
distTS.record(float64(8))
metrics.Add(makePeerMetric(test.PeerID1, "1", 1*time.Second))
time.Sleep(time.Duration(8) * time.Millisecond)
}
for i := 2; !check(i) && i < 20; i++ {
diff := math.Pow(float64(i), 3)
distTS.record(diff)
metrics.Add(makePeerMetric(test.PeerID1, "1", 1*time.Second))
time.Sleep(time.Duration(diff) * time.Millisecond)
}
if failed {
dists = append(dists, *distTS)
phivs = append(phivs, *phivTS)
continue
}
dists = append(dists, *distTS)
phivs = append(phivs, *phivTS)
// if a threshold has made it to this point, all greater
// thresholds will make it here so print the threshold and skip
t.Log("threshold that was not met by cubic increase: ", v)
break
}
plotDistsPhivs("cubic_", dists, phivs)
})
t.Run("14x threshold test", func(t *testing.T) {
dists := make([]timeseries, 0)
phivs := make([]timeseries, 0)
for _, v := range thresholds {
metrics := NewStore()
checker := NewChecker(context.Background(), metrics, v)
tsName := fmt.Sprintf("%f", v)
distTS := newTS(tsName)
phivTS := newTS(tsName)
var failed bool
output := false
check := func(i int) bool {
inputv, dist, phiv, got := checker.failed("ping", test.PeerID1)
if output {
fmt.Println(i)
fmt.Printf("phiv: %f\n", phiv)
fmt.Printf("threshold: %f\n", v)
fmt.Printf("latest: %f\n", inputv)
fmt.Printf("distribution: %v\n", dist)
}
phivTS.record(phiv)
failed = got
if failed {
diff := i * 50
distTS.record(float64(diff))
}
return got
}
for i := 0; i < 10; i++ {
check(i)
distTS.record(float64(i))
metrics.Add(makePeerMetric(test.PeerID1, "1", 1*time.Second))
time.Sleep(time.Duration(i) * time.Millisecond)
}
for i := 10; !check(i) && i < 30; i++ {
diff := i * 50
distTS.record(float64(diff))
metrics.Add(makePeerMetric(test.PeerID1, "1", 1*time.Second))
time.Sleep(time.Duration(diff) * time.Millisecond)
}
if failed {
dists = append(dists, *distTS)
phivs = append(phivs, *phivTS)
continue
}
dists = append(dists, *distTS)
phivs = append(phivs, *phivTS)
// if a threshold has made it to this point, all greater
// thresholds will make it here so print the threshold and skip
t.Log("threshold that was not met by 14x increase: ", v)
break
}
plotDistsPhivs("14x_", dists, phivs)
})
}
func makePeerMetric(pid peer.ID, value string, ttl time.Duration) *api.Metric { func makePeerMetric(pid peer.ID, value string, ttl time.Duration) *api.Metric {
metr := &api.Metric{ metr := &api.Metric{
Name: "ping", Name: "ping",
@ -425,75 +198,3 @@ func makePeerMetric(pid peer.ID, value string, ttl time.Duration) *api.Metric {
metr.SetTTL(ttl) metr.SetTTL(ttl)
return metr return metr
} }
type timeseries struct {
name string
values []float64
}
func newTS(name string) *timeseries {
v := make([]float64, 0)
return &timeseries{name: name, values: v}
}
func (ts *timeseries) record(v float64) {
ts.values = append(ts.values, v)
}
func (ts *timeseries) ToXYs() plotter.XYs {
pts := make(plotter.XYs, len(ts.values))
for i, v := range ts.values {
pts[i].X = float64(i)
if math.IsInf(v, 0) {
pts[i].Y = -5
} else {
pts[i].Y = v
}
}
return pts
}
func toPlotLines(ts []timeseries) ([]string, [][]plot.Plotter) {
labels := make([]string, 0)
plotters := make([][]plot.Plotter, 0)
for i, t := range ts {
l, s, err := plotter.NewLinePoints(t.ToXYs())
if err != nil {
panic(err)
}
l.Width = vg.Points(2)
l.Color = plotutil.Color(i)
l.Dashes = plotutil.Dashes(i)
s.Color = plotutil.Color(i)
s.Shape = plotutil.Shape(i)
labels = append(labels, t.name)
plotters = append(plotters, []plot.Plotter{l, s})
}
return labels, plotters
}
func plotDistsPhivs(prefix string, dists, phivs []timeseries) {
plotTS(prefix+"dists", dists)
plotTS(prefix+"phivs", phivs)
}
func plotTS(name string, ts []timeseries) {
p, err := plot.New()
if err != nil {
panic(err)
}
p.Title.Text = name
p.Add(plotter.NewGrid())
labels, plotters := toPlotLines(ts)
for i := range labels {
l, pts := plotters[i][0], plotters[i][1]
p.Add(l, pts)
p.Legend.Add(labels[i], l.(*plotter.Line), pts.(*plotter.Scatter))
}
if err := p.Save(20*vg.Inch, 15*vg.Inch, name+".png"); err != nil {
panic(err)
}
}

View File

@ -1,80 +0,0 @@
/*
Copyright (©) 2015 Timothée Peignier <timothee.peignier@tryphon.org>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
Reference of what was originally copied: https://ipfs.io/ipfs/QmeJDSL6g6u4NSuzUqRxZWhyKPJ6wJAqieQpqX5eXPCjj5
*/
package metrics
import (
"math"
"gonum.org/v1/gonum/floats"
)
// phi returns the φ-failure for the given value and distribution.
// Two edge cases that are dealt with in phi:
// 1. phi == math.+Inf
// 2. phi == math.NaN
//
// Edge case 1. is most certainly a failure, the value of v is so large
// in comparison to the distribution that the cdf function returns a 1,
// which equates to a math.Log10(0) which is one of its special cases, i.e
// returns -Inf. In this case, phi() will return the math.+Inf value, as it
// will be a valid comparison against the threshold value in checker.Failed.
//
// Edge case 2. could be a failure but may not be. phi() will return NaN
// when the standard deviation of the distribution is 0, i.e. the entire
// distribution is the same number, {1,1,1,1,1}. Considering that we are
// using UnixNano timestamps this would be highly unlikely, but just in case
// phi() will return a -1 value, indicating that the caller should retry.
func phi(v float64, d []float64) float64 {
u, o := meanStdDev(d)
phi := -math.Log10(1 - cdf(u, o, v))
if math.IsNaN(phi) {
return -1
}
return phi
}
// cdf returns the cumulative distribution function of the given
// normal distribution, for the given value.
func cdf(u, o, v float64) float64 {
return ((1.0 / 2.0) * (1 + math.Erf((v-u)/(o*math.Sqrt2))))
}
func meanStdDev(v []float64) (m, sd float64) {
var variance float64
m, variance = meanVariance(v)
sd = math.Sqrt(variance)
return
}
func meanVariance(values []float64) (m, v float64) {
if len(values) == 0 {
return 0.0, 0.0
}
m = floats.Sum(values) / float64(len(values))
floats.AddConst(-m, values)
floats.Mul(values, values)
v = floats.Sum(values) / float64(len(values))
return
}

View File

@ -1,317 +0,0 @@
package metrics
import (
"math"
"math/rand"
"testing"
"time"
)
// NOTE: Test_phi and Test_cdf contain float64 want values that are 'precise',
// they look like golden test data, they ARE NOT. They have been calculated
// using Wolfram Alpha. The following three links provide examples of calculating
// the phi value:
// - standardDeviation: https://www.wolframalpha.com/input/?i=population+standard+deviation+-2,+-4,+-4,+-4,+-5,+-5,+-7,+-9
// - mean: https://www.wolframalpha.com/input/?i=mean+-2,+-4,+-4,+-4,+-5,+-5,+-7,+-9
// - cdf: https://www.wolframalpha.com/input/?i=(((1.0+%2F+2.0)+*+(1+%2B+Erf((-4--5)%2F(2*Sqrt2)))))
// - phi: https://www.wolframalpha.com/input/?i=-log10(1+-+0.691462461274013103637704610608337739883602175554577936)
//
// Output from each calculation needs to be copy-pasted over. Look at the phi source code
// to understand where each variable should go in the cdf calculation.
func Test_phi(t *testing.T) {
type args struct {
v float64
d []float64
}
tests := []struct {
name string
args args
want float64
}{
{
"infinity edge case",
args{
10,
[]float64{0, 0, 0, 0, 1, 1, 1, 1, 2, 2},
},
math.Inf(1),
},
{
"NaN edge case",
args{10000001, []float64{10000001, 10000001}},
-1, // phi() replaces math.NaN with -1 so it is still comparable
},
{
"increasing values",
args{
4,
[]float64{2, 4, 4, 4, 5, 5, 7, 9},
},
0.160231392277849,
},
{
"decreasing values",
args{
-4,
[]float64{-2, -4, -4, -4, -5, -5, -7, -9},
},
0.5106919892652407,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := phi(tt.args.v, tt.args.d)
if got != tt.want && !math.IsNaN(got) {
t.Errorf("phi() = %v, want %v", got, tt.want)
}
})
}
}
func Test_cdf(t *testing.T) {
type args struct {
values []float64
v float64
}
tests := []struct {
name string
args args
want float64
}{
{
"zero values",
args{[]float64{0}, 0},
math.NaN(),
},
{
"increasing values",
args{
[]float64{2, 4, 4, 4, 5, 5, 7, 9},
4,
},
0.3085375387259869,
},
{
"decreasing values",
args{
[]float64{-2, -4, -4, -4, -5, -5, -7, -9},
-4,
},
0.6914624612740131,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m, sd := meanStdDev(tt.args.values)
got := cdf(m, sd, tt.args.v)
if got != tt.want && !math.IsNaN(got) {
t.Errorf("cdf() = %v, want %v", got, tt.want)
}
})
}
}
func Test_meanVariance(t *testing.T) {
type args struct {
values []float64
}
tests := []struct {
name string
args args
wantMean float64
wantVariance float64
}{
{
"zero values",
args{[]float64{}},
0,
0,
},
{
"increasing values",
args{[]float64{2, 4, 4, 4, 5, 5, 7, 9}},
5,
4,
},
{
"decreasing values",
args{[]float64{-2, -4, -4, -4, -5, -5, -7, -9}},
-5,
4,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m, v := meanVariance(tt.args.values)
if m != tt.wantMean {
t.Errorf("mean() = %v, want %v", m, tt.wantMean)
}
if v != tt.wantVariance {
t.Errorf("variance() = %v, want %v", v, tt.wantVariance)
}
})
}
}
func Test_meanStdDev(t *testing.T) {
type args struct {
v []float64
}
tests := []struct {
name string
args args
want float64
}{
{
"zero values",
args{[]float64{}},
0,
},
{
"increasing values",
args{[]float64{2, 4, 4, 4, 5, 5, 7, 9}},
2,
},
{
"decreasing values",
args{[]float64{-2, -4, -4, -4, -5, -5, -7, -9}},
2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// ignore mean value as it was tested in Test_meanVariance
// it is the same underlying implementation.
if _, got := meanStdDev(tt.args.v); got != tt.want {
t.Errorf("standardDeviation() = %v, want %v", got, tt.want)
}
})
}
}
func Benchmark_prob_phi(b *testing.B) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
b.Run("distribution size 10", func(b *testing.B) {
d := makeRandSlice(10)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
phi(v, d)
}
})
b.Run("distribution size 50", func(b *testing.B) {
d := makeRandSlice(50)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
phi(v, d)
}
})
b.Run("distribution size 1000", func(b *testing.B) {
d := makeRandSlice(1000)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
phi(v, d)
}
})
}
func Benchmark_prob_cdf(b *testing.B) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
b.Run("distribution size 10", func(b *testing.B) {
d := makeRandSlice(10)
u, o := meanStdDev(d)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
cdf(u, o, v)
}
})
b.Run("distribution size 50", func(b *testing.B) {
d := makeRandSlice(50)
u, o := meanStdDev(d)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
cdf(u, o, v)
}
})
b.Run("distribution size 1000", func(b *testing.B) {
d := makeRandSlice(1000)
u, o := meanStdDev(d)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
cdf(u, o, v)
}
})
}
func Benchmark_prob_meanVariance(b *testing.B) {
b.Run("distribution size 10", func(b *testing.B) {
d := makeRandSlice(10)
b.ResetTimer()
for i := 0; i < b.N; i++ {
meanVariance(d)
}
})
b.Run("distribution size 50", func(b *testing.B) {
d := makeRandSlice(50)
b.ResetTimer()
for i := 0; i < b.N; i++ {
meanVariance(d)
}
})
b.Run("distribution size 1000", func(b *testing.B) {
d := makeRandSlice(1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
meanVariance(d)
}
})
}
func Benchmark_prob_meanStdDev(b *testing.B) {
b.Run("distribution size 10", func(b *testing.B) {
d := makeRandSlice(10)
b.ResetTimer()
for i := 0; i < b.N; i++ {
meanStdDev(d)
}
})
b.Run("distribution size 50", func(b *testing.B) {
d := makeRandSlice(50)
b.ResetTimer()
for i := 0; i < b.N; i++ {
meanStdDev(d)
}
})
b.Run("distribution size 1000", func(b *testing.B) {
d := makeRandSlice(1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
meanStdDev(d)
}
})
}
func makeRandSlice(size int) []float64 {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
s := make([]float64, size)
for i := 0; i < size-1; i++ {
s[i] = float64(r.Int63n(25)) + r.Float64()
}
return s
}

View File

@ -14,8 +14,7 @@ const envConfigKey = "cluster_pubsubmon"
// Default values for this Config. // Default values for this Config.
const ( const (
DefaultCheckInterval = 15 * time.Second DefaultCheckInterval = 15 * time.Second
DefaultFailureThreshold = 3.0
) )
// Config allows to initialize a Monitor and customize some parameters. // Config allows to initialize a Monitor and customize some parameters.
@ -23,15 +22,10 @@ type Config struct {
config.Saver config.Saver
CheckInterval time.Duration CheckInterval time.Duration
// FailureThreshold indicates when a peer should be considered failed.
// The greater the threshold value the more leniency is granted.
// A value between 2.0 and 4.0 is suggested for the threshold.
FailureThreshold float64
} }
type jsonConfig struct { type jsonConfig struct {
CheckInterval string `json:"check_interval"` CheckInterval string `json:"check_interval"`
FailureThreshold *float64 `json:"failure_threshold"`
} }
// ConfigKey provides a human-friendly identifier for this type of Config. // ConfigKey provides a human-friendly identifier for this type of Config.
@ -42,7 +36,6 @@ func (cfg *Config) ConfigKey() string {
// Default sets the fields of this Config to sensible values. // Default sets the fields of this Config to sensible values.
func (cfg *Config) Default() error { func (cfg *Config) Default() error {
cfg.CheckInterval = DefaultCheckInterval cfg.CheckInterval = DefaultCheckInterval
cfg.FailureThreshold = DefaultFailureThreshold
return nil return nil
} }
@ -66,10 +59,6 @@ func (cfg *Config) Validate() error {
return errors.New("pubsubmon.check_interval too low") return errors.New("pubsubmon.check_interval too low")
} }
if cfg.FailureThreshold <= 0 {
return errors.New("pubsubmon.failure_threshold too low")
}
return nil return nil
} }
@ -91,9 +80,6 @@ func (cfg *Config) LoadJSON(raw []byte) error {
func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error { func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
interval, _ := time.ParseDuration(jcfg.CheckInterval) interval, _ := time.ParseDuration(jcfg.CheckInterval)
cfg.CheckInterval = interval cfg.CheckInterval = interval
if jcfg.FailureThreshold != nil {
cfg.FailureThreshold = *jcfg.FailureThreshold
}
return cfg.Validate() return cfg.Validate()
} }
@ -107,8 +93,7 @@ func (cfg *Config) ToJSON() ([]byte, error) {
func (cfg *Config) toJSONConfig() *jsonConfig { func (cfg *Config) toJSONConfig() *jsonConfig {
return &jsonConfig{ return &jsonConfig{
CheckInterval: cfg.CheckInterval.String(), CheckInterval: cfg.CheckInterval.String(),
FailureThreshold: &cfg.FailureThreshold,
} }
} }

View File

@ -9,8 +9,7 @@ import (
var cfgJSON = []byte(` var cfgJSON = []byte(`
{ {
"check_interval": "15s", "check_interval": "15s"
"failure_threshold": 3.0
} }
`) `)
@ -54,7 +53,6 @@ func TestDefault(t *testing.T) {
} }
cfg.CheckInterval = 0 cfg.CheckInterval = 0
cfg.FailureThreshold = -0.1
if cfg.Validate() == nil { if cfg.Validate() == nil {
t.Fatal("expected error validating") t.Fatal("expected error validating")
} }
@ -62,14 +60,10 @@ func TestDefault(t *testing.T) {
func TestApplyEnvVars(t *testing.T) { func TestApplyEnvVars(t *testing.T) {
os.Setenv("CLUSTER_PUBSUBMON_CHECKINTERVAL", "22s") os.Setenv("CLUSTER_PUBSUBMON_CHECKINTERVAL", "22s")
os.Setenv("CLUSTER_PUBSUBMON_FAILURETHRESHOLD", "4.0")
cfg := &Config{} cfg := &Config{}
cfg.ApplyEnvVars() cfg.ApplyEnvVars()
if cfg.CheckInterval != 22*time.Second { if cfg.CheckInterval != 22*time.Second {
t.Fatal("failed to override check_interval with env var") t.Fatal("failed to override check_interval with env var")
} }
if cfg.FailureThreshold != 4.0 {
t.Fatal("failed to override failure_threshold with env var")
}
} }

View File

@ -72,7 +72,7 @@ func New(
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
mtrs := metrics.NewStore() mtrs := metrics.NewStore()
checker := metrics.NewChecker(ctx, mtrs, cfg.FailureThreshold) checker := metrics.NewChecker(ctx, mtrs)
topic, err := psub.Join(PubsubTopic) topic, err := psub.Join(PubsubTopic)
if err != nil { if err != nil {
@ -261,6 +261,12 @@ func (mon *Monitor) LatestMetrics(ctx context.Context, name string) []*api.Metri
return metrics.PeersetFilter(latest, peers) return metrics.PeersetFilter(latest, peers)
} }
// LatestForPeer returns the latest metric received for a peer (it may have
// expired). It returns nil if no metric exists.
func (mon *Monitor) LatestForPeer(ctx context.Context, name string, pid peer.ID) *api.Metric {
return mon.metrics.PeerLatest(name, pid)
}
// Alerts returns a channel on which alerts are sent when the // Alerts returns a channel on which alerts are sent when the
// monitor detects a failure. // monitor detects a failure.
func (mon *Monitor) Alerts() <-chan *api.Alert { func (mon *Monitor) Alerts() <-chan *api.Alert {