diff --git a/monitor/metrics/checker.go b/monitor/metrics/checker.go index e8c58973..309c4fb1 100644 --- a/monitor/metrics/checker.go +++ b/monitor/metrics/checker.go @@ -19,16 +19,18 @@ var ErrAlertChannelFull = errors.New("alert channel is full") // Checker provides utilities to find expired metrics // for a given peerset and send alerts if it proceeds to do so. type Checker struct { - alertCh chan *api.Alert - metrics *Store + alertCh chan *api.Alert + metrics *Store + threshold float64 } // NewChecker creates a Checker using the given // MetricsStore. -func NewChecker(metrics *Store) *Checker { +func NewChecker(metrics *Store, threshold float64) *Checker { return &Checker{ - alertCh: make(chan *api.Alert, AlertChannelCap), - metrics: metrics, + alertCh: make(chan *api.Alert, AlertChannelCap), + metrics: metrics, + threshold: threshold, } } @@ -114,3 +116,17 @@ func (mc *Checker) Watch(ctx context.Context, peersF func(context.Context) ([]pe } } } + +// Failed returns if a peer has potentially failed. Peers +// that are not present in the metrics store will return +// as failed. +func (mc *Checker) Failed(pid peer.ID) bool { + latest := mc.metrics.PeerLatest("ping", pid) + if latest == nil { + return true + } + v := time.Now().UnixNano() - latest.TS + dv := mc.metrics.Distribution("ping", pid) + phiv := phi(float64(v), dv) + return phiv >= mc.threshold +} diff --git a/monitor/metrics/checker_test.go b/monitor/metrics/checker_test.go index bc97cd7f..bbf8c162 100644 --- a/monitor/metrics/checker_test.go +++ b/monitor/metrics/checker_test.go @@ -13,7 +13,7 @@ import ( func TestChecker(t *testing.T) { metrics := NewStore() - checker := NewChecker(metrics) + checker := NewChecker(metrics, 2.0) metr := &api.Metric{ Name: "test", @@ -52,12 +52,12 @@ func TestChecker(t *testing.T) { } } -func TestCheckerWatch(t *testing.T) { +func TestChecker_Watch(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() metrics := NewStore() - checker := NewChecker(metrics) + checker := NewChecker(metrics, 2.0) metr := &api.Metric{ Name: "test", @@ -81,3 +81,31 @@ func TestCheckerWatch(t *testing.T) { t.Fatal("should have received an alert") } } + +func TestChecker_Failed(t *testing.T) { + metrics := NewStore() + checker := NewChecker(metrics, 2.0) + + for i := 0; i < 10; i++ { + metrics.Add(makePeerMetric(test.PeerID1, "1")) + time.Sleep(time.Duration(2) * time.Second) + } + for i := 0; i < 10; i++ { + metrics.Add(makePeerMetric(test.PeerID1, "1")) + time.Sleep(time.Duration(500*i) * time.Millisecond) + got := checker.Failed(test.PeerID1) + if i >= 17 && !got { + t.Fatal("threshold should have been passed by now") + } + } +} + +func makePeerMetric(pid peer.ID, value string) *api.Metric { + metr := &api.Metric{ + Name: "ping", + Peer: pid, + Value: value, + Valid: true, + } + return metr +} diff --git a/monitor/metrics/prob.go b/monitor/metrics/prob.go new file mode 100644 index 00000000..ead55b65 --- /dev/null +++ b/monitor/metrics/prob.go @@ -0,0 +1,86 @@ +/* +Copyright (©) 2015 Timothée Peignier + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +package metrics + +import ( + "math" + "math/big" +) + +// Phi returns the φ-failure for the given value and distribution. +func phi(v float64, d []int64) float64 { + u := mean(d) + o := standardDeviation(d) + cdf := cdf(u, o, big.NewFloat(v)) + phi := -math.Log10(1 - cdf) + if math.IsInf(phi, 1) { + phi = 0 + } + return phi +} + +// CDF returns the cumulative distribution function if the given +// normal function, for the given value. +func cdf(u, o, v *big.Float) float64 { + var a, b, c big.Float + c.Quo(b.Sub(v, u), a.Mul(o, big.NewFloat(math.Sqrt2))) + cf, _ := c.Float64() + cdf := ((1.0 / 2.0) * (1 + math.Erf(cf))) + return cdf +} + +// Mean returns the mean of the given sample. +func mean(values []int64) *big.Float { + if len(values) == 0 { + return big.NewFloat(0.0) + } + var sum int64 + for _, v := range values { + sum += v + } + var q big.Float + return q.Quo(big.NewFloat(float64(sum)), big.NewFloat(float64(len(values)))) +} + +// StandardDeviation returns standard deviation of the given sample. +func standardDeviation(v []int64) *big.Float { + var z big.Float + z.Sqrt(variance(v)).Float64() + return &z +} + +// Variance returns variance if the given sample. +func variance(values []int64) *big.Float { + if len(values) == 0 { + return big.NewFloat(0.0) + } + m := mean(values) + var sum, pwr, res big.Float + for _, v := range values { + d := big.NewFloat(float64(v)) + d.Sub(d, m) + pwr.Mul(d, d) + sum.Add(&sum, &pwr) + } + return res.Quo(&sum, big.NewFloat(float64(len(values)))) +} diff --git a/monitor/metrics/store.go b/monitor/metrics/store.go index 5ac04667..849a5340 100644 --- a/monitor/metrics/store.go +++ b/monitor/metrics/store.go @@ -61,6 +61,7 @@ func (mtrs *Store) LatestValid(name string) []*api.Metric { metrics := make([]*api.Metric, 0, len(byPeer)) for _, window := range byPeer { m, err := window.Latest() + // TODO(ajl): for accrual, does it matter if a ping has expired? if err != nil || m.Discard() { continue } @@ -110,3 +111,45 @@ func (mtrs *Store) PeerMetrics(pid peer.ID) []*api.Metric { } return result } + +// PeerLatest returns the latest of a particular metric for a +// particular peer. It may return an expired metric. +func (mtrs *Store) PeerLatest(name string, pid peer.ID) *api.Metric { + mtrs.mux.RLock() + defer mtrs.mux.RUnlock() + + byPeer, ok := mtrs.byName[name] + if !ok { + return nil + } + + window, ok := byPeer[pid] + if !ok { + return nil + } + m, err := window.Latest() + if err != nil { + // ignoring error, as nil metric is indicative enough + return nil + } + return m +} + +// Distribution returns the distribution of a particular metrics +// for a particular peer. +func (mtrs *Store) Distribution(name string, pid peer.ID) []int64 { + mtrs.mux.RLock() + defer mtrs.mux.RUnlock() + + byPeer, ok := mtrs.byName[name] + if !ok { + return nil + } + + window, ok := byPeer[pid] + if !ok { + return nil + } + + return window.Distribution() +}