Fix metric expire type. Do not discard metrics in Allocate().
License: MIT Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
parent
f5f56f2d11
commit
0069c0062f
|
@ -59,11 +59,12 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I
|
|||
priorityMetrics := make(map[peer.ID]api.Metric)
|
||||
|
||||
// Divide metrics between current and candidates.
|
||||
// All metrics in metrics are valid (at least the
|
||||
// moment they were compiled by the monitor)
|
||||
for _, m := range metrics {
|
||||
switch {
|
||||
case m.Discard() || containsPeer(blacklist, m.Peer):
|
||||
// discard peers with invalid metrics and
|
||||
// those in the blacklist
|
||||
case containsPeer(blacklist, m.Peer):
|
||||
// discard blacklisted peers
|
||||
continue
|
||||
case containsPeer(currentAllocs, m.Peer):
|
||||
currentMetrics[m.Peer] = m
|
||||
|
|
|
@ -24,7 +24,7 @@ var (
|
|||
testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
|
||||
)
|
||||
|
||||
var inAMinute = time.Now().Add(time.Minute).Format(time.RFC3339Nano)
|
||||
var inAMinute = time.Now().Add(time.Minute).UnixNano()
|
||||
|
||||
var testCases = []testcase{
|
||||
{ // regular sort
|
||||
|
|
|
@ -24,7 +24,7 @@ var (
|
|||
testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
|
||||
)
|
||||
|
||||
var inAMinute = time.Now().Add(time.Minute).Format(time.RFC3339Nano)
|
||||
var inAMinute = time.Now().Add(time.Minute).UnixNano()
|
||||
|
||||
var testCases = []testcase{
|
||||
{ // regular sort
|
||||
|
|
26
api/types.go
26
api/types.go
|
@ -602,8 +602,8 @@ type Metric struct {
|
|||
Name string
|
||||
Peer peer.ID // filled-in by Cluster.
|
||||
Value string
|
||||
Expire string // RFC3339Nano
|
||||
Valid bool // if the metric is not valid it will be discarded
|
||||
Expire int64 // UnixNano
|
||||
Valid bool // if the metric is not valid it will be discarded
|
||||
}
|
||||
|
||||
// SetTTL sets Metric to expire after the given seconds
|
||||
|
@ -615,31 +615,19 @@ func (m *Metric) SetTTL(seconds int) {
|
|||
// SetTTLDuration sets Metric to expire after the given time.Duration
|
||||
func (m *Metric) SetTTLDuration(d time.Duration) {
|
||||
exp := time.Now().Add(d)
|
||||
m.Expire = exp.UTC().Format(time.RFC3339Nano)
|
||||
m.Expire = exp.UnixNano()
|
||||
}
|
||||
|
||||
// GetTTL returns the time left before the Metric expires
|
||||
func (m *Metric) GetTTL() time.Duration {
|
||||
if m.Expire == "" {
|
||||
return 0
|
||||
}
|
||||
exp, err := time.Parse(time.RFC3339Nano, m.Expire)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return exp.Sub(time.Now())
|
||||
expDate := time.Unix(0, m.Expire)
|
||||
return expDate.Sub(time.Now())
|
||||
}
|
||||
|
||||
// Expired returns if the Metric has expired
|
||||
func (m *Metric) Expired() bool {
|
||||
if m.Expire == "" {
|
||||
return true
|
||||
}
|
||||
exp, err := time.Parse(time.RFC3339Nano, m.Expire)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return time.Now().After(exp)
|
||||
expDate := time.Unix(0, m.Expire)
|
||||
return time.Now().After(expDate)
|
||||
}
|
||||
|
||||
// Discard returns if the metric not valid or has expired
|
||||
|
|
|
@ -72,7 +72,7 @@ var testingTrackerCfg = []byte(`
|
|||
`)
|
||||
|
||||
var testingMonCfg = []byte(`{
|
||||
"check_interval": "400ms"
|
||||
"check_interval": "300ms"
|
||||
}`)
|
||||
|
||||
var testingDiskInfCfg = []byte(`{
|
||||
|
|
|
@ -23,12 +23,12 @@ import (
|
|||
"github.com/ipfs/ipfs-cluster/state"
|
||||
"github.com/ipfs/ipfs-cluster/state/mapstate"
|
||||
"github.com/ipfs/ipfs-cluster/test"
|
||||
peerstore "github.com/libp2p/go-libp2p-peerstore"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
crypto "github.com/libp2p/go-libp2p-crypto"
|
||||
host "github.com/libp2p/go-libp2p-host"
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
peerstore "github.com/libp2p/go-libp2p-peerstore"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
|
|
|
@ -184,7 +184,7 @@ func (mon *Monitor) LogMetric(m api.Metric) {
|
|||
mbyp[peer] = pmets
|
||||
}
|
||||
|
||||
logger.Debugf("logged '%s' metric from '%s'. Expires on %s", name, peer, m.Expire)
|
||||
logger.Debugf("logged '%s' metric from '%s'. Expires on %d", name, peer, m.Expire)
|
||||
pmets.add(m)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,8 @@ package basic
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -51,6 +53,58 @@ func TestPeerMonitorShutdown(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestLogMetricConcurrent(t *testing.T) {
|
||||
pm := testPeerMonitor(t)
|
||||
defer pm.Shutdown()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(3)
|
||||
|
||||
f := func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 25; i++ {
|
||||
mt := api.Metric{
|
||||
Name: "test",
|
||||
Peer: test.TestPeerID1,
|
||||
Value: fmt.Sprintf("%d", time.Now().UnixNano()),
|
||||
Valid: true,
|
||||
}
|
||||
mt.SetTTLDuration(150 * time.Millisecond)
|
||||
pm.LogMetric(mt)
|
||||
time.Sleep(75 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
go f()
|
||||
go f()
|
||||
go f()
|
||||
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
last := time.Now().Add(-500 * time.Millisecond)
|
||||
|
||||
for i := 0; i <= 20; i++ {
|
||||
lastMtrcs := pm.LastMetrics("test")
|
||||
|
||||
if len(lastMtrcs) != 1 {
|
||||
t.Error("no valid metrics", len(lastMtrcs), i)
|
||||
time.Sleep(75 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
n, err := strconv.Atoi(lastMtrcs[0].Value)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
current := time.Unix(0, int64(n))
|
||||
if current.Before(last) {
|
||||
t.Errorf("expected newer metric: Current: %s, Last: %s", current, last)
|
||||
}
|
||||
last = current
|
||||
time.Sleep(75 * time.Millisecond)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestPeerMonitorLogMetric(t *testing.T) {
|
||||
pm := testPeerMonitor(t)
|
||||
defer pm.Shutdown()
|
||||
|
|
|
@ -371,7 +371,9 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
|
|||
t.Fatal("error removing peer:", err)
|
||||
}
|
||||
|
||||
delay()
|
||||
waitForLeaderAndMetrics(t, clusters)
|
||||
delay() // this seems to fail when not waiting enough...
|
||||
|
||||
for _, icid := range interestingCids {
|
||||
// Now check that the allocations are new.
|
||||
|
|
Loading…
Reference in New Issue
Block a user