refactor prob to use gonum and pass []float64

License: MIT
Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
This commit is contained in:
Adrian Lanzafame 2019-03-21 20:42:56 +10:00
parent bcbe7b453f
commit 4338ea6905
No known key found for this signature in database
GPG Key ID: 87E40C5D62EAE192
8 changed files with 134 additions and 175 deletions

1
go.mod
View File

@ -87,5 +87,6 @@ require (
golang.org/x/sync v0.0.0-20190412183630-56d357773e84 // indirect
golang.org/x/sys v0.0.0-20190416152802-12500544f89f // indirect
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373 // indirect
gonum.org/v1/gonum v0.0.0-20190321072728-ca4d35bc590a
google.golang.org/grpc v1.19.1 // indirect
)

7
go.sum
View File

@ -568,6 +568,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190417170229-92d88b081a49 h1:mE9V9RMa141kxdQR3pfZM3mkg0MPyw+FOPpnciBXkbE=
golang.org/x/crypto v0.0.0-20190417170229-92d88b081a49/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2 h1:y102fOLFqhV41b+4GPiJoa0k/x+pJcEi2/HB1Y5T6fU=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181217174547-8f45f776aaf1/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@ -625,6 +627,7 @@ golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181219222714-6e267b5cc78e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c h1:vamGzbGri8IKo20MQncCuljcQ5uAO6kaCeawQPVblAI=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
@ -633,6 +636,10 @@ golang.org/x/xerrors v0.0.0-20190212162355-a5947ffaace3 h1:P6iTFmrTQqWrqLZPX1VMz
golang.org/x/xerrors v0.0.0-20190212162355-a5947ffaace3/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373 h1:PPwnA7z1Pjf7XYaBP9GL1VAMZmcIWyFz7QCMSIIa3Bg=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.0.0-20190321072728-ca4d35bc590a h1:XffIu/i+IJIC+M8WoBEmJm8N/YYbA8Pvh748YgzU7kI=
gonum.org/v1/gonum v0.0.0-20190321072728-ca4d35bc590a/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0 h1:OE9mWmgKkjJyEmDAAtGMPjXu+YNeGvK9VTSHY6+Qihc=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
google.golang.org/api v0.0.0-20181220000619-583d854617af h1:iQMS7JKv/0w/iiWf1M49Cg3dmOkBoBZT5KheqPDpaac=
google.golang.org/api v0.0.0-20181220000619-583d854617af/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.3.1 h1:oJra/lMfmtm13/rgY/8i3MzjFWYXvQIAKjQ3HqofMk8=

View File

@ -83,21 +83,24 @@ func TestChecker_Watch(t *testing.T) {
}
func TestChecker_Failed(t *testing.T) {
metrics := NewStore()
checker := NewChecker(metrics, 2.0)
t.Run("standard failure check", func(t *testing.T) {
metrics := NewStore()
checker := NewChecker(metrics, 2.0)
for i := 0; i < 10; i++ {
metrics.Add(makePeerMetric(test.PeerID1, "1"))
time.Sleep(time.Duration(2) * time.Second)
}
for i := 0; i < 10; i++ {
metrics.Add(makePeerMetric(test.PeerID1, "1"))
time.Sleep(time.Duration(500*i) * time.Millisecond)
got := checker.Failed(test.PeerID1)
if i >= 17 && !got {
t.Fatal("threshold should have been passed by now")
for i := 0; i < 10; i++ {
metrics.Add(makePeerMetric(test.PeerID1, "1"))
time.Sleep(time.Duration(2) * time.Second)
}
}
for i := 0; i < 10; i++ {
metrics.Add(makePeerMetric(test.PeerID1, "1"))
time.Sleep(time.Duration(500*i) * time.Millisecond)
got := checker.Failed(test.PeerID1)
// TODO(lanzafame): explain magic number 17
if i >= 17 && !got {
t.Fatal("threshold should have been passed by now")
}
}
})
}
func makePeerMetric(pid peer.ID, value string) *api.Metric {

View File

@ -24,16 +24,33 @@ package metrics
import (
"math"
"gonum.org/v1/gonum/floats"
)
// Phi returns the φ-failure for the given value and distribution.
func phi(v float64, d []int64) float64 {
u := mean(d)
o := standardDeviation(d)
if phi := -math.Log10(1 - cdf(u, o, v)); !math.IsInf(phi, 1) {
return phi
// Two edge cases that are dealt with in phi:
// 1. phi == math.+Inf
// 2. phi == math.NaN
//
// Edge case 1. is most certainly a failure, the value of v is is so large
// in comparison to the distribution that the cdf function returns a 1,
// which equates to a math.Log10(0) which is one of its special cases, i.e
// returns -Inf. In this case, phi() will return the math.+Inf value, as it
// will be a valid comparison against the threshold value in checker.Failed.
//
// Edge case 2. could be a failure but may not be. phi() will return NaN
// when the standard deviation of the distribution is 0, i.e. the entire
// distribution is the same number, {1,1,1,1,1}. Considering that we are
// using UnixNano timestamps this would be highly unlikely, but just in case
// phi() will return a -1 value, indicating that the caller should retry.
func phi(v float64, d []float64) float64 {
u, o := meanStdDev(d)
phi := -math.Log10(1 - cdf(u, o, v))
if math.IsNaN(phi) {
return -1
}
return 0
return phi
}
// CDF returns the cumulative distribution function if the given
@ -42,34 +59,20 @@ func cdf(u, o, v float64) float64 {
return ((1.0 / 2.0) * (1 + math.Erf((v-u)/(o*math.Sqrt2))))
}
// Mean returns the mean of the given sample.
func mean(values []int64) float64 {
func meanStdDev(v []float64) (m, sd float64) {
var variance float64
m, variance = meanVariance(v)
sd = math.Sqrt(variance)
return
}
func meanVariance(values []float64) (m, v float64) {
if len(values) == 0 {
return 0.0
return 0.0, 0.0
}
var sum int64
for _, v := range values {
sum += v
}
return float64(sum) / float64(len(values))
}
// StandardDeviation returns standard deviation of the given sample.
func standardDeviation(v []int64) float64 {
return math.Sqrt(variance(v))
}
// Variance returns variance if the given sample.
func variance(values []int64) float64 {
if len(values) == 0 {
return 0.0
}
m := mean(values)
var sum float64
for _, v := range values {
d := float64(v) - m
sum += d * d
}
return sum / float64(len(values))
m = floats.Sum(values) / float64(len(values))
floats.AddConst(-m, values)
floats.Mul(values, values)
v = floats.Sum(values) / float64(len(values))
return
}

View File

@ -21,7 +21,7 @@ import (
func Test_phi(t *testing.T) {
type args struct {
v float64
d []int64
d []float64
}
tests := []struct {
name string
@ -29,15 +29,23 @@ func Test_phi(t *testing.T) {
want float64
}{
{
"zero values",
args{0, []int64{0}},
math.NaN(), // won't actually be used in comparison; see math.IsNaN() def
"infinity edge case",
args{
10,
[]float64{0, 0, 0, 0, 1, 1, 1, 1, 2, 2},
},
math.Inf(1),
},
{
"NaN edge case",
args{10000001, []float64{10000001, 10000001}},
-1, // phi() replaces math.NaN with -1 so it is still comparable
},
{
"increasing values",
args{
4,
[]int64{2, 4, 4, 4, 5, 5, 7, 9},
[]float64{2, 4, 4, 4, 5, 5, 7, 9},
},
0.160231392277849,
},
@ -45,7 +53,7 @@ func Test_phi(t *testing.T) {
"decreasing values",
args{
-4,
[]int64{-2, -4, -4, -4, -5, -5, -7, -9},
[]float64{-2, -4, -4, -4, -5, -5, -7, -9},
},
0.5106919892652407,
},
@ -62,7 +70,7 @@ func Test_phi(t *testing.T) {
func Test_cdf(t *testing.T) {
type args struct {
values []int64
values []float64
v float64
}
tests := []struct {
@ -72,13 +80,13 @@ func Test_cdf(t *testing.T) {
}{
{
"zero values",
args{[]int64{0}, 0},
args{[]float64{0}, 0},
math.NaN(),
},
{
"increasing values",
args{
[]int64{2, 4, 4, 4, 5, 5, 7, 9},
[]float64{2, 4, 4, 4, 5, 5, 7, 9},
4,
},
0.3085375387259869,
@ -86,7 +94,7 @@ func Test_cdf(t *testing.T) {
{
"decreasing values",
args{
[]int64{-2, -4, -4, -4, -5, -5, -7, -9},
[]float64{-2, -4, -4, -4, -5, -5, -7, -9},
-4,
},
0.6914624612740131,
@ -94,8 +102,7 @@ func Test_cdf(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := mean(tt.args.values)
sd := standardDeviation(tt.args.values)
m, sd := meanStdDev(tt.args.values)
got := cdf(m, sd, tt.args.v)
if got != tt.want && !math.IsNaN(got) {
t.Errorf("cdf() = %v, want %v", got, tt.want)
@ -104,43 +111,51 @@ func Test_cdf(t *testing.T) {
}
}
func Test_mean(t *testing.T) {
func Test_meanVariance(t *testing.T) {
type args struct {
values []int64
values []float64
}
tests := []struct {
name string
args args
want float64
name string
args args
wantMean float64
wantVariance float64
}{
{
"zero values",
args{[]int64{}},
args{[]float64{}},
0,
0,
},
{
"increasing values",
args{[]int64{2, 4, 4, 4, 5, 5, 7, 9}},
args{[]float64{2, 4, 4, 4, 5, 5, 7, 9}},
5,
4,
},
{
"decreasing values",
args{[]int64{-2, -4, -4, -4, -5, -5, -7, -9}},
args{[]float64{-2, -4, -4, -4, -5, -5, -7, -9}},
-5,
4,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := mean(tt.args.values); got != tt.want {
t.Errorf("mean() = %v, want %v", got, tt.want)
m, v := meanVariance(tt.args.values)
if m != tt.wantMean {
t.Errorf("mean() = %v, want %v", m, tt.wantMean)
}
if v != tt.wantVariance {
t.Errorf("variance() = %v, want %v", v, tt.wantVariance)
}
})
}
}
func Test_standardDeviation(t *testing.T) {
func Test_meanStdDev(t *testing.T) {
type args struct {
v []int64
v []float64
}
tests := []struct {
name string
@ -149,63 +164,31 @@ func Test_standardDeviation(t *testing.T) {
}{
{
"zero values",
args{[]int64{}},
args{[]float64{}},
0,
},
{
"increasing values",
args{[]int64{2, 4, 4, 4, 5, 5, 7, 9}},
args{[]float64{2, 4, 4, 4, 5, 5, 7, 9}},
2,
},
{
"decreasing values",
args{[]int64{-2, -4, -4, -4, -5, -5, -7, -9}},
args{[]float64{-2, -4, -4, -4, -5, -5, -7, -9}},
2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := standardDeviation(tt.args.v); got != tt.want {
// ignore mean value as it was tested in Test_meanVariance
// it is the same underlying implementation.
if _, got := meanStdDev(tt.args.v); got != tt.want {
t.Errorf("standardDeviation() = %v, want %v", got, tt.want)
}
})
}
}
func Test_variance(t *testing.T) {
type args struct {
values []int64
}
tests := []struct {
name string
args args
want float64
}{
{
"zero values",
args{[]int64{}},
0,
},
{
"increasing values",
args{[]int64{2, 4, 4, 4, 5, 5, 7, 9}},
4,
},
{
"decreasing values",
args{[]int64{-2, -4, -4, -4, -5, -5, -7, -9}},
4,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := variance(tt.args.values); got != tt.want {
t.Errorf("variance() = %.5f, want %v", got, tt.want)
}
})
}
}
func Benchmark_prob_phi(b *testing.B) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
@ -242,8 +225,7 @@ func Benchmark_prob_cdf(b *testing.B) {
b.Run("distribution size 10", func(b *testing.B) {
d := makeRandSlice(10)
u := mean(d)
o := standardDeviation(d)
u, o := meanStdDev(d)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
@ -253,8 +235,7 @@ func Benchmark_prob_cdf(b *testing.B) {
b.Run("distribution size 50", func(b *testing.B) {
d := makeRandSlice(50)
u := mean(d)
o := standardDeviation(d)
u, o := meanStdDev(d)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
@ -264,8 +245,7 @@ func Benchmark_prob_cdf(b *testing.B) {
b.Run("distribution size 1000", func(b *testing.B) {
d := makeRandSlice(1000)
u := mean(d)
o := standardDeviation(d)
u, o := meanStdDev(d)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
@ -274,12 +254,12 @@ func Benchmark_prob_cdf(b *testing.B) {
})
}
func Benchmark_prob_mean(b *testing.B) {
func Benchmark_prob_meanVariance(b *testing.B) {
b.Run("distribution size 10", func(b *testing.B) {
d := makeRandSlice(10)
b.ResetTimer()
for i := 0; i < b.N; i++ {
mean(d)
meanVariance(d)
}
})
@ -287,7 +267,7 @@ func Benchmark_prob_mean(b *testing.B) {
d := makeRandSlice(50)
b.ResetTimer()
for i := 0; i < b.N; i++ {
mean(d)
meanVariance(d)
}
})
@ -295,17 +275,17 @@ func Benchmark_prob_mean(b *testing.B) {
d := makeRandSlice(1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
mean(d)
meanVariance(d)
}
})
}
func Benchmark_prob_standardDeviation(b *testing.B) {
func Benchmark_prob_meanStdDev(b *testing.B) {
b.Run("distribution size 10", func(b *testing.B) {
d := makeRandSlice(10)
b.ResetTimer()
for i := 0; i < b.N; i++ {
standardDeviation(d)
meanStdDev(d)
}
})
@ -313,7 +293,7 @@ func Benchmark_prob_standardDeviation(b *testing.B) {
d := makeRandSlice(50)
b.ResetTimer()
for i := 0; i < b.N; i++ {
standardDeviation(d)
meanStdDev(d)
}
})
@ -321,48 +301,12 @@ func Benchmark_prob_standardDeviation(b *testing.B) {
d := makeRandSlice(1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
standardDeviation(d)
meanStdDev(d)
}
})
}
func Benchmark_prob_variance(b *testing.B) {
b.Run("distribution size 10", func(b *testing.B) {
d := makeRandSlice(10)
b.ResetTimer()
for i := 0; i < b.N; i++ {
variance(d)
}
})
b.Run("distribution size 50", func(b *testing.B) {
d := makeRandSlice(50)
b.ResetTimer()
for i := 0; i < b.N; i++ {
variance(d)
}
})
b.Run("distribution size 1000", func(b *testing.B) {
d := makeRandSlice(1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
variance(d)
}
})
}
func makeRandSlice(size int) []int64 {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
s := make([]int64, size, size)
for i := 0; i < size-1; i++ {
s[i] = r.Int63n(25)
}
return s
}
func makeRandSliceFloat64(size int) []float64 {
func makeRandSlice(size int) []float64 {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
s := make([]float64, size, size)

View File

@ -137,7 +137,7 @@ func (mtrs *Store) PeerLatest(name string, pid peer.ID) *api.Metric {
// Distribution returns the distribution of a particular metrics
// for a particular peer.
func (mtrs *Store) Distribution(name string, pid peer.ID) []int64 {
func (mtrs *Store) Distribution(name string, pid peer.ID) []float64 {
mtrs.mux.RLock()
defer mtrs.mux.RUnlock()

View File

@ -99,14 +99,14 @@ func (mw *Window) All() []*api.Metric {
// values contained in the current window. This will
// only return values if the api.Metric.Type() is "ping",
// which are used for accural failure detection.
func (mw *Window) Distribution() []int64 {
func (mw *Window) Distribution() []float64 {
ms := mw.All()
dist := make([]int64, 0, len(ms)-1)
dist := make([]float64, 0, len(ms)-1)
// the last value can't be used to calculate a delta
for i, v := range ms[:len(ms)-1] {
// All() provides an order slice, where ms[i] is younger than ms[i+1]
delta := v.ReceivedAt - ms[i+1].ReceivedAt
dist = append(dist, delta)
dist = append(dist, float64(delta))
}
return dist

View File

@ -354,30 +354,30 @@ func BenchmarkWindow_All(b *testing.B) {
func TestWindow_Distribution(t *testing.T) {
var tests = []struct {
name string
heartbeats []int64
want []int64
heartbeats []float64
want []float64
}{
{
"even 1 sec distribution",
[]int64{1, 1, 1, 1},
[]int64{1, 1, 1, 1},
[]float64{1, 1, 1, 1},
[]float64{1, 1, 1, 1},
},
{
"increasing latency distribution",
[]int64{1, 1, 2, 2, 3, 3, 4},
[]int64{4, 3, 3, 2, 2, 1, 1},
[]float64{1, 1, 2, 2, 3, 3, 4},
[]float64{4, 3, 3, 2, 2, 1, 1},
},
{
"random latency distribution",
[]int64{4, 1, 3, 9, 7, 8, 11, 18},
[]int64{18, 11, 8, 7, 9, 3, 1, 4},
[]float64{4, 1, 3, 9, 7, 8, 11, 18},
[]float64{18, 11, 8, 7, 9, 3, 1, 4},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mw := NewWindow(len(tt.heartbeats) + 1)
for i, v := range tt.heartbeats {
mw.Add(makeMetric(string(v * 10)))
mw.Add(makeMetric(string(int64(v * 10))))
// time.Sleep on the 1s of milliseconds level is
// susceptible to scheduler variance. Hence we
// multiple the input by 10 and this combined with
@ -396,10 +396,11 @@ func TestWindow_Distribution(t *testing.T) {
t.Errorf("want len: %v, got len: %v", len(tt.want), len(got))
}
var gotseconds []int64
var gotseconds []float64
for _, v := range got {
// truncate nanoseconds to seconds for testing purposes
gotseconds = append(gotseconds, v/10000000)
// also truncate decimal places by converting to int and then back
gotseconds = append(gotseconds, float64(int64(v/10000000)))
}
for i, s := range gotseconds {