diff --git a/go.mod b/go.mod index 0e8c9ea9..008f2af3 100644 --- a/go.mod +++ b/go.mod @@ -87,5 +87,6 @@ require ( golang.org/x/sync v0.0.0-20190412183630-56d357773e84 // indirect golang.org/x/sys v0.0.0-20190416152802-12500544f89f // indirect golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373 // indirect + gonum.org/v1/gonum v0.0.0-20190321072728-ca4d35bc590a google.golang.org/grpc v1.19.1 // indirect ) diff --git a/go.sum b/go.sum index a2f537f9..f5bc3fba 100644 --- a/go.sum +++ b/go.sum @@ -568,6 +568,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190417170229-92d88b081a49 h1:mE9V9RMa141kxdQR3pfZM3mkg0MPyw+FOPpnciBXkbE= golang.org/x/crypto v0.0.0-20190417170229-92d88b081a49/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2 h1:y102fOLFqhV41b+4GPiJoa0k/x+pJcEi2/HB1Y5T6fU= +golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181217174547-8f45f776aaf1/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -625,6 +627,7 @@ golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181219222714-6e267b5cc78e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c h1:vamGzbGri8IKo20MQncCuljcQ5uAO6kaCeawQPVblAI= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= @@ -633,6 +636,10 @@ golang.org/x/xerrors v0.0.0-20190212162355-a5947ffaace3 h1:P6iTFmrTQqWrqLZPX1VMz golang.org/x/xerrors v0.0.0-20190212162355-a5947ffaace3/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373 h1:PPwnA7z1Pjf7XYaBP9GL1VAMZmcIWyFz7QCMSIIa3Bg= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.0.0-20190321072728-ca4d35bc590a h1:XffIu/i+IJIC+M8WoBEmJm8N/YYbA8Pvh748YgzU7kI= +gonum.org/v1/gonum v0.0.0-20190321072728-ca4d35bc590a/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0= +gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0 h1:OE9mWmgKkjJyEmDAAtGMPjXu+YNeGvK9VTSHY6+Qihc= +gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= google.golang.org/api v0.0.0-20181220000619-583d854617af h1:iQMS7JKv/0w/iiWf1M49Cg3dmOkBoBZT5KheqPDpaac= google.golang.org/api v0.0.0-20181220000619-583d854617af/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.3.1 h1:oJra/lMfmtm13/rgY/8i3MzjFWYXvQIAKjQ3HqofMk8= diff --git a/monitor/metrics/checker_test.go b/monitor/metrics/checker_test.go index bbf8c162..cb62147d 100644 --- a/monitor/metrics/checker_test.go +++ b/monitor/metrics/checker_test.go @@ -83,21 +83,24 @@ func TestChecker_Watch(t *testing.T) { } func TestChecker_Failed(t *testing.T) { - metrics := NewStore() - checker := NewChecker(metrics, 2.0) + t.Run("standard failure check", func(t *testing.T) { + metrics := NewStore() + checker := NewChecker(metrics, 2.0) - for i := 0; i < 10; i++ { - metrics.Add(makePeerMetric(test.PeerID1, "1")) - time.Sleep(time.Duration(2) * time.Second) - } - for i := 0; i < 10; i++ { - metrics.Add(makePeerMetric(test.PeerID1, "1")) - time.Sleep(time.Duration(500*i) * time.Millisecond) - got := checker.Failed(test.PeerID1) - if i >= 17 && !got { - t.Fatal("threshold should have been passed by now") + for i := 0; i < 10; i++ { + metrics.Add(makePeerMetric(test.PeerID1, "1")) + time.Sleep(time.Duration(2) * time.Second) } - } + for i := 0; i < 10; i++ { + metrics.Add(makePeerMetric(test.PeerID1, "1")) + time.Sleep(time.Duration(500*i) * time.Millisecond) + got := checker.Failed(test.PeerID1) + // TODO(lanzafame): explain magic number 17 + if i >= 17 && !got { + t.Fatal("threshold should have been passed by now") + } + } + }) } func makePeerMetric(pid peer.ID, value string) *api.Metric { diff --git a/monitor/metrics/prob.go b/monitor/metrics/prob.go index 49cb9f91..8368bb7c 100644 --- a/monitor/metrics/prob.go +++ b/monitor/metrics/prob.go @@ -24,16 +24,33 @@ package metrics import ( "math" + + "gonum.org/v1/gonum/floats" ) // Phi returns the φ-failure for the given value and distribution. -func phi(v float64, d []int64) float64 { - u := mean(d) - o := standardDeviation(d) - if phi := -math.Log10(1 - cdf(u, o, v)); !math.IsInf(phi, 1) { - return phi +// Two edge cases that are dealt with in phi: +// 1. phi == math.+Inf +// 2. phi == math.NaN +// +// Edge case 1. is most certainly a failure, the value of v is is so large +// in comparison to the distribution that the cdf function returns a 1, +// which equates to a math.Log10(0) which is one of its special cases, i.e +// returns -Inf. In this case, phi() will return the math.+Inf value, as it +// will be a valid comparison against the threshold value in checker.Failed. +// +// Edge case 2. could be a failure but may not be. phi() will return NaN +// when the standard deviation of the distribution is 0, i.e. the entire +// distribution is the same number, {1,1,1,1,1}. Considering that we are +// using UnixNano timestamps this would be highly unlikely, but just in case +// phi() will return a -1 value, indicating that the caller should retry. +func phi(v float64, d []float64) float64 { + u, o := meanStdDev(d) + phi := -math.Log10(1 - cdf(u, o, v)) + if math.IsNaN(phi) { + return -1 } - return 0 + return phi } // CDF returns the cumulative distribution function if the given @@ -42,34 +59,20 @@ func cdf(u, o, v float64) float64 { return ((1.0 / 2.0) * (1 + math.Erf((v-u)/(o*math.Sqrt2)))) } -// Mean returns the mean of the given sample. -func mean(values []int64) float64 { +func meanStdDev(v []float64) (m, sd float64) { + var variance float64 + m, variance = meanVariance(v) + sd = math.Sqrt(variance) + return +} + +func meanVariance(values []float64) (m, v float64) { if len(values) == 0 { - return 0.0 + return 0.0, 0.0 } - var sum int64 - for _, v := range values { - sum += v - } - - return float64(sum) / float64(len(values)) -} - -// StandardDeviation returns standard deviation of the given sample. -func standardDeviation(v []int64) float64 { - return math.Sqrt(variance(v)) -} - -// Variance returns variance if the given sample. -func variance(values []int64) float64 { - if len(values) == 0 { - return 0.0 - } - m := mean(values) - var sum float64 - for _, v := range values { - d := float64(v) - m - sum += d * d - } - return sum / float64(len(values)) + m = floats.Sum(values) / float64(len(values)) + floats.AddConst(-m, values) + floats.Mul(values, values) + v = floats.Sum(values) / float64(len(values)) + return } diff --git a/monitor/metrics/prob_test.go b/monitor/metrics/prob_test.go index bc2b838f..38c45838 100644 --- a/monitor/metrics/prob_test.go +++ b/monitor/metrics/prob_test.go @@ -21,7 +21,7 @@ import ( func Test_phi(t *testing.T) { type args struct { v float64 - d []int64 + d []float64 } tests := []struct { name string @@ -29,15 +29,23 @@ func Test_phi(t *testing.T) { want float64 }{ { - "zero values", - args{0, []int64{0}}, - math.NaN(), // won't actually be used in comparison; see math.IsNaN() def + "infinity edge case", + args{ + 10, + []float64{0, 0, 0, 0, 1, 1, 1, 1, 2, 2}, + }, + math.Inf(1), + }, + { + "NaN edge case", + args{10000001, []float64{10000001, 10000001}}, + -1, // phi() replaces math.NaN with -1 so it is still comparable }, { "increasing values", args{ 4, - []int64{2, 4, 4, 4, 5, 5, 7, 9}, + []float64{2, 4, 4, 4, 5, 5, 7, 9}, }, 0.160231392277849, }, @@ -45,7 +53,7 @@ func Test_phi(t *testing.T) { "decreasing values", args{ -4, - []int64{-2, -4, -4, -4, -5, -5, -7, -9}, + []float64{-2, -4, -4, -4, -5, -5, -7, -9}, }, 0.5106919892652407, }, @@ -62,7 +70,7 @@ func Test_phi(t *testing.T) { func Test_cdf(t *testing.T) { type args struct { - values []int64 + values []float64 v float64 } tests := []struct { @@ -72,13 +80,13 @@ func Test_cdf(t *testing.T) { }{ { "zero values", - args{[]int64{0}, 0}, + args{[]float64{0}, 0}, math.NaN(), }, { "increasing values", args{ - []int64{2, 4, 4, 4, 5, 5, 7, 9}, + []float64{2, 4, 4, 4, 5, 5, 7, 9}, 4, }, 0.3085375387259869, @@ -86,7 +94,7 @@ func Test_cdf(t *testing.T) { { "decreasing values", args{ - []int64{-2, -4, -4, -4, -5, -5, -7, -9}, + []float64{-2, -4, -4, -4, -5, -5, -7, -9}, -4, }, 0.6914624612740131, @@ -94,8 +102,7 @@ func Test_cdf(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - m := mean(tt.args.values) - sd := standardDeviation(tt.args.values) + m, sd := meanStdDev(tt.args.values) got := cdf(m, sd, tt.args.v) if got != tt.want && !math.IsNaN(got) { t.Errorf("cdf() = %v, want %v", got, tt.want) @@ -104,43 +111,51 @@ func Test_cdf(t *testing.T) { } } -func Test_mean(t *testing.T) { +func Test_meanVariance(t *testing.T) { type args struct { - values []int64 + values []float64 } tests := []struct { - name string - args args - want float64 + name string + args args + wantMean float64 + wantVariance float64 }{ { "zero values", - args{[]int64{}}, + args{[]float64{}}, + 0, 0, }, { "increasing values", - args{[]int64{2, 4, 4, 4, 5, 5, 7, 9}}, + args{[]float64{2, 4, 4, 4, 5, 5, 7, 9}}, 5, + 4, }, { "decreasing values", - args{[]int64{-2, -4, -4, -4, -5, -5, -7, -9}}, + args{[]float64{-2, -4, -4, -4, -5, -5, -7, -9}}, -5, + 4, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := mean(tt.args.values); got != tt.want { - t.Errorf("mean() = %v, want %v", got, tt.want) + m, v := meanVariance(tt.args.values) + if m != tt.wantMean { + t.Errorf("mean() = %v, want %v", m, tt.wantMean) + } + if v != tt.wantVariance { + t.Errorf("variance() = %v, want %v", v, tt.wantVariance) } }) } } -func Test_standardDeviation(t *testing.T) { +func Test_meanStdDev(t *testing.T) { type args struct { - v []int64 + v []float64 } tests := []struct { name string @@ -149,63 +164,31 @@ func Test_standardDeviation(t *testing.T) { }{ { "zero values", - args{[]int64{}}, + args{[]float64{}}, 0, }, { "increasing values", - args{[]int64{2, 4, 4, 4, 5, 5, 7, 9}}, + args{[]float64{2, 4, 4, 4, 5, 5, 7, 9}}, 2, }, { "decreasing values", - args{[]int64{-2, -4, -4, -4, -5, -5, -7, -9}}, + args{[]float64{-2, -4, -4, -4, -5, -5, -7, -9}}, 2, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := standardDeviation(tt.args.v); got != tt.want { + // ignore mean value as it was tested in Test_meanVariance + // it is the same underlying implementation. + if _, got := meanStdDev(tt.args.v); got != tt.want { t.Errorf("standardDeviation() = %v, want %v", got, tt.want) } }) } } -func Test_variance(t *testing.T) { - type args struct { - values []int64 - } - tests := []struct { - name string - args args - want float64 - }{ - { - "zero values", - args{[]int64{}}, - 0, - }, - { - "increasing values", - args{[]int64{2, 4, 4, 4, 5, 5, 7, 9}}, - 4, - }, - { - "decreasing values", - args{[]int64{-2, -4, -4, -4, -5, -5, -7, -9}}, - 4, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := variance(tt.args.values); got != tt.want { - t.Errorf("variance() = %.5f, want %v", got, tt.want) - } - }) - } -} - func Benchmark_prob_phi(b *testing.B) { r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -242,8 +225,7 @@ func Benchmark_prob_cdf(b *testing.B) { b.Run("distribution size 10", func(b *testing.B) { d := makeRandSlice(10) - u := mean(d) - o := standardDeviation(d) + u, o := meanStdDev(d) v := float64(r.Int63n(25)) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -253,8 +235,7 @@ func Benchmark_prob_cdf(b *testing.B) { b.Run("distribution size 50", func(b *testing.B) { d := makeRandSlice(50) - u := mean(d) - o := standardDeviation(d) + u, o := meanStdDev(d) v := float64(r.Int63n(25)) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -264,8 +245,7 @@ func Benchmark_prob_cdf(b *testing.B) { b.Run("distribution size 1000", func(b *testing.B) { d := makeRandSlice(1000) - u := mean(d) - o := standardDeviation(d) + u, o := meanStdDev(d) v := float64(r.Int63n(25)) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -274,12 +254,12 @@ func Benchmark_prob_cdf(b *testing.B) { }) } -func Benchmark_prob_mean(b *testing.B) { +func Benchmark_prob_meanVariance(b *testing.B) { b.Run("distribution size 10", func(b *testing.B) { d := makeRandSlice(10) b.ResetTimer() for i := 0; i < b.N; i++ { - mean(d) + meanVariance(d) } }) @@ -287,7 +267,7 @@ func Benchmark_prob_mean(b *testing.B) { d := makeRandSlice(50) b.ResetTimer() for i := 0; i < b.N; i++ { - mean(d) + meanVariance(d) } }) @@ -295,17 +275,17 @@ func Benchmark_prob_mean(b *testing.B) { d := makeRandSlice(1000) b.ResetTimer() for i := 0; i < b.N; i++ { - mean(d) + meanVariance(d) } }) } -func Benchmark_prob_standardDeviation(b *testing.B) { +func Benchmark_prob_meanStdDev(b *testing.B) { b.Run("distribution size 10", func(b *testing.B) { d := makeRandSlice(10) b.ResetTimer() for i := 0; i < b.N; i++ { - standardDeviation(d) + meanStdDev(d) } }) @@ -313,7 +293,7 @@ func Benchmark_prob_standardDeviation(b *testing.B) { d := makeRandSlice(50) b.ResetTimer() for i := 0; i < b.N; i++ { - standardDeviation(d) + meanStdDev(d) } }) @@ -321,48 +301,12 @@ func Benchmark_prob_standardDeviation(b *testing.B) { d := makeRandSlice(1000) b.ResetTimer() for i := 0; i < b.N; i++ { - standardDeviation(d) + meanStdDev(d) } }) } -func Benchmark_prob_variance(b *testing.B) { - b.Run("distribution size 10", func(b *testing.B) { - d := makeRandSlice(10) - b.ResetTimer() - for i := 0; i < b.N; i++ { - variance(d) - } - }) - - b.Run("distribution size 50", func(b *testing.B) { - d := makeRandSlice(50) - b.ResetTimer() - for i := 0; i < b.N; i++ { - variance(d) - } - }) - - b.Run("distribution size 1000", func(b *testing.B) { - d := makeRandSlice(1000) - b.ResetTimer() - for i := 0; i < b.N; i++ { - variance(d) - } - }) -} - -func makeRandSlice(size int) []int64 { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - s := make([]int64, size, size) - - for i := 0; i < size-1; i++ { - s[i] = r.Int63n(25) - } - return s -} - -func makeRandSliceFloat64(size int) []float64 { +func makeRandSlice(size int) []float64 { r := rand.New(rand.NewSource(time.Now().UnixNano())) s := make([]float64, size, size) diff --git a/monitor/metrics/store.go b/monitor/metrics/store.go index 849a5340..0a92711b 100644 --- a/monitor/metrics/store.go +++ b/monitor/metrics/store.go @@ -137,7 +137,7 @@ func (mtrs *Store) PeerLatest(name string, pid peer.ID) *api.Metric { // Distribution returns the distribution of a particular metrics // for a particular peer. -func (mtrs *Store) Distribution(name string, pid peer.ID) []int64 { +func (mtrs *Store) Distribution(name string, pid peer.ID) []float64 { mtrs.mux.RLock() defer mtrs.mux.RUnlock() diff --git a/monitor/metrics/window.go b/monitor/metrics/window.go index 8d8f7e71..35735e47 100644 --- a/monitor/metrics/window.go +++ b/monitor/metrics/window.go @@ -99,14 +99,14 @@ func (mw *Window) All() []*api.Metric { // values contained in the current window. This will // only return values if the api.Metric.Type() is "ping", // which are used for accural failure detection. -func (mw *Window) Distribution() []int64 { +func (mw *Window) Distribution() []float64 { ms := mw.All() - dist := make([]int64, 0, len(ms)-1) + dist := make([]float64, 0, len(ms)-1) // the last value can't be used to calculate a delta for i, v := range ms[:len(ms)-1] { // All() provides an order slice, where ms[i] is younger than ms[i+1] delta := v.ReceivedAt - ms[i+1].ReceivedAt - dist = append(dist, delta) + dist = append(dist, float64(delta)) } return dist diff --git a/monitor/metrics/window_test.go b/monitor/metrics/window_test.go index 14b8390c..0d9bdaca 100644 --- a/monitor/metrics/window_test.go +++ b/monitor/metrics/window_test.go @@ -354,30 +354,30 @@ func BenchmarkWindow_All(b *testing.B) { func TestWindow_Distribution(t *testing.T) { var tests = []struct { name string - heartbeats []int64 - want []int64 + heartbeats []float64 + want []float64 }{ { "even 1 sec distribution", - []int64{1, 1, 1, 1}, - []int64{1, 1, 1, 1}, + []float64{1, 1, 1, 1}, + []float64{1, 1, 1, 1}, }, { "increasing latency distribution", - []int64{1, 1, 2, 2, 3, 3, 4}, - []int64{4, 3, 3, 2, 2, 1, 1}, + []float64{1, 1, 2, 2, 3, 3, 4}, + []float64{4, 3, 3, 2, 2, 1, 1}, }, { "random latency distribution", - []int64{4, 1, 3, 9, 7, 8, 11, 18}, - []int64{18, 11, 8, 7, 9, 3, 1, 4}, + []float64{4, 1, 3, 9, 7, 8, 11, 18}, + []float64{18, 11, 8, 7, 9, 3, 1, 4}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mw := NewWindow(len(tt.heartbeats) + 1) for i, v := range tt.heartbeats { - mw.Add(makeMetric(string(v * 10))) + mw.Add(makeMetric(string(int64(v * 10)))) // time.Sleep on the 1s of milliseconds level is // susceptible to scheduler variance. Hence we // multiple the input by 10 and this combined with @@ -396,10 +396,11 @@ func TestWindow_Distribution(t *testing.T) { t.Errorf("want len: %v, got len: %v", len(tt.want), len(got)) } - var gotseconds []int64 + var gotseconds []float64 for _, v := range got { // truncate nanoseconds to seconds for testing purposes - gotseconds = append(gotseconds, v/10000000) + // also truncate decimal places by converting to int and then back + gotseconds = append(gotseconds, float64(int64(v/10000000))) } for i, s := range gotseconds {