From 0069c0062f892ac0325928cdc8950237001fca2a Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 5 Apr 2018 16:09:41 +0200 Subject: [PATCH] Fix metric expire type. Do not discard metrics in Allocate(). License: MIT Signed-off-by: Hector Sanjuan --- allocate.go | 7 +-- allocator/ascendalloc/ascendalloc_test.go | 2 +- allocator/descendalloc/descendalloc_test.go | 2 +- api/types.go | 26 +++------- config_test.go | 2 +- ipfscluster_test.go | 2 +- monitor/basic/peer_monitor.go | 2 +- monitor/basic/peer_monitor_test.go | 54 +++++++++++++++++++++ peer_manager_test.go | 2 + 9 files changed, 72 insertions(+), 27 deletions(-) diff --git a/allocate.go b/allocate.go index 34c1efc5..3c4b0f12 100644 --- a/allocate.go +++ b/allocate.go @@ -59,11 +59,12 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I priorityMetrics := make(map[peer.ID]api.Metric) // Divide metrics between current and candidates. + // All metrics in metrics are valid (at least the + // moment they were compiled by the monitor) for _, m := range metrics { switch { - case m.Discard() || containsPeer(blacklist, m.Peer): - // discard peers with invalid metrics and - // those in the blacklist + case containsPeer(blacklist, m.Peer): + // discard blacklisted peers continue case containsPeer(currentAllocs, m.Peer): currentMetrics[m.Peer] = m diff --git a/allocator/ascendalloc/ascendalloc_test.go b/allocator/ascendalloc/ascendalloc_test.go index 379f88e9..5d0fd646 100644 --- a/allocator/ascendalloc/ascendalloc_test.go +++ b/allocator/ascendalloc/ascendalloc_test.go @@ -24,7 +24,7 @@ var ( testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") ) -var inAMinute = time.Now().Add(time.Minute).Format(time.RFC3339Nano) +var inAMinute = time.Now().Add(time.Minute).UnixNano() var testCases = []testcase{ { // regular sort diff --git a/allocator/descendalloc/descendalloc_test.go b/allocator/descendalloc/descendalloc_test.go index b79b4699..2552d1f6 100644 --- a/allocator/descendalloc/descendalloc_test.go +++ b/allocator/descendalloc/descendalloc_test.go @@ -24,7 +24,7 @@ var ( testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") ) -var inAMinute = time.Now().Add(time.Minute).Format(time.RFC3339Nano) +var inAMinute = time.Now().Add(time.Minute).UnixNano() var testCases = []testcase{ { // regular sort diff --git a/api/types.go b/api/types.go index c428f19d..49edd39f 100644 --- a/api/types.go +++ b/api/types.go @@ -602,8 +602,8 @@ type Metric struct { Name string Peer peer.ID // filled-in by Cluster. Value string - Expire string // RFC3339Nano - Valid bool // if the metric is not valid it will be discarded + Expire int64 // UnixNano + Valid bool // if the metric is not valid it will be discarded } // SetTTL sets Metric to expire after the given seconds @@ -615,31 +615,19 @@ func (m *Metric) SetTTL(seconds int) { // SetTTLDuration sets Metric to expire after the given time.Duration func (m *Metric) SetTTLDuration(d time.Duration) { exp := time.Now().Add(d) - m.Expire = exp.UTC().Format(time.RFC3339Nano) + m.Expire = exp.UnixNano() } // GetTTL returns the time left before the Metric expires func (m *Metric) GetTTL() time.Duration { - if m.Expire == "" { - return 0 - } - exp, err := time.Parse(time.RFC3339Nano, m.Expire) - if err != nil { - panic(err) - } - return exp.Sub(time.Now()) + expDate := time.Unix(0, m.Expire) + return expDate.Sub(time.Now()) } // Expired returns if the Metric has expired func (m *Metric) Expired() bool { - if m.Expire == "" { - return true - } - exp, err := time.Parse(time.RFC3339Nano, m.Expire) - if err != nil { - panic(err) - } - return time.Now().After(exp) + expDate := time.Unix(0, m.Expire) + return time.Now().After(expDate) } // Discard returns if the metric not valid or has expired diff --git a/config_test.go b/config_test.go index 29cd9b5c..aad03f56 100644 --- a/config_test.go +++ b/config_test.go @@ -72,7 +72,7 @@ var testingTrackerCfg = []byte(` `) var testingMonCfg = []byte(`{ - "check_interval": "400ms" + "check_interval": "300ms" }`) var testingDiskInfCfg = []byte(`{ diff --git a/ipfscluster_test.go b/ipfscluster_test.go index cce85ecd..6d2e99ce 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -23,12 +23,12 @@ import ( "github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/state/mapstate" "github.com/ipfs/ipfs-cluster/test" - peerstore "github.com/libp2p/go-libp2p-peerstore" cid "github.com/ipfs/go-cid" crypto "github.com/libp2p/go-libp2p-crypto" host "github.com/libp2p/go-libp2p-host" peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" ma "github.com/multiformats/go-multiaddr" ) diff --git a/monitor/basic/peer_monitor.go b/monitor/basic/peer_monitor.go index 341c7659..b76f66c8 100644 --- a/monitor/basic/peer_monitor.go +++ b/monitor/basic/peer_monitor.go @@ -184,7 +184,7 @@ func (mon *Monitor) LogMetric(m api.Metric) { mbyp[peer] = pmets } - logger.Debugf("logged '%s' metric from '%s'. Expires on %s", name, peer, m.Expire) + logger.Debugf("logged '%s' metric from '%s'. Expires on %d", name, peer, m.Expire) pmets.add(m) } diff --git a/monitor/basic/peer_monitor_test.go b/monitor/basic/peer_monitor_test.go index 4f357a7f..87aeb120 100644 --- a/monitor/basic/peer_monitor_test.go +++ b/monitor/basic/peer_monitor_test.go @@ -2,6 +2,8 @@ package basic import ( "fmt" + "strconv" + "sync" "testing" "time" @@ -51,6 +53,58 @@ func TestPeerMonitorShutdown(t *testing.T) { } } +func TestLogMetricConcurrent(t *testing.T) { + pm := testPeerMonitor(t) + defer pm.Shutdown() + + var wg sync.WaitGroup + wg.Add(3) + + f := func() { + defer wg.Done() + for i := 0; i < 25; i++ { + mt := api.Metric{ + Name: "test", + Peer: test.TestPeerID1, + Value: fmt.Sprintf("%d", time.Now().UnixNano()), + Valid: true, + } + mt.SetTTLDuration(150 * time.Millisecond) + pm.LogMetric(mt) + time.Sleep(75 * time.Millisecond) + } + } + go f() + go f() + go f() + + time.Sleep(150 * time.Millisecond) + last := time.Now().Add(-500 * time.Millisecond) + + for i := 0; i <= 20; i++ { + lastMtrcs := pm.LastMetrics("test") + + if len(lastMtrcs) != 1 { + t.Error("no valid metrics", len(lastMtrcs), i) + time.Sleep(75 * time.Millisecond) + continue + } + + n, err := strconv.Atoi(lastMtrcs[0].Value) + if err != nil { + t.Fatal(err) + } + current := time.Unix(0, int64(n)) + if current.Before(last) { + t.Errorf("expected newer metric: Current: %s, Last: %s", current, last) + } + last = current + time.Sleep(75 * time.Millisecond) + } + + wg.Wait() +} + func TestPeerMonitorLogMetric(t *testing.T) { pm := testPeerMonitor(t) defer pm.Shutdown() diff --git a/peer_manager_test.go b/peer_manager_test.go index 5a9e3f2f..4b623b47 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -371,7 +371,9 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) { t.Fatal("error removing peer:", err) } + delay() waitForLeaderAndMetrics(t, clusters) + delay() // this seems to fail when not waiting enough... for _, icid := range interestingCids { // Now check that the allocations are new.