FreeSpace metric impl (including descendalloc) refactored and tested.

dgrisham 2017-08-04 13:35:30 -06:00
parent a46ab3dda9
commit 2d2a9da793
9 changed files with 323 additions and 92 deletions

View File

@ -1,13 +1,13 @@
-// Package ascendalloc implements an ipfscluster.Allocator returns allocations
-// based on sorting the metrics in ascending order. Thus, peers with smallest
-// metrics are first in the list. This allocator can be used with a number
-// of informers, as long as they provide a numeric metric value.
+// Package ascendalloc implements an ipfscluster.PinAllocator, which returns
+// allocations based on sorting the metrics in ascending order. Thus, peers with
+// smallest metrics are first in the list. This allocator can be used with a
+// number of informers, as long as they provide a numeric metric value.
package ascendalloc
import (
	"sort"
-	"strconv"
+	"github.com/ipfs/ipfs-cluster/allocator/util"
	"github.com/ipfs/ipfs-cluster/api"
	rpc "github.com/hsanjuan/go-libp2p-gorpc"
@ -18,79 +18,27 @@ import (
var logger = logging.Logger("ascendalloc")
-// Allocator implements ipfscluster.Allocate.
-type Allocator struct{}
+// AscendAllocator implements ipfscluster.PinAllocator.
+type AscendAllocator struct{}
-// NewAllocator returns an initialized AscendAllocator
-func NewAllocator() *Allocator {
-	return &Allocator{}
+// NewAllocator returns an initialized AscendAllocator.
+func NewAllocator() AscendAllocator {
+	return AscendAllocator{}
}
// SetClient does nothing in this allocator
-func (alloc *Allocator) SetClient(c *rpc.Client) {}
+func (alloc AscendAllocator) SetClient(c *rpc.Client) {}
// Shutdown does nothing in this allocator
-func (alloc *Allocator) Shutdown() error { return nil }
+func (alloc AscendAllocator) Shutdown() error { return nil }
// Allocate returns where to allocate a pin request based on metrics which
// carry a numeric value such as "used disk". We do not pay attention to
-// the metrics of the currently allocated peers and we just sort the candidates
-// based on their metric values (from smallest to largest).
-func (alloc *Allocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) {
+// the metrics of the currently allocated peers and we just sort the
+// candidates based on their metric values (smallest to largest).
+func (alloc AscendAllocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) {
// sort our metrics
-	sortable := newMetricsSorter(candidates)
+	sortable := util.NewMetricSorter(candidates, false)
sort.Sort(sortable)
-	return sortable.peers, nil
-}
-// metricsSorter attaches sort.Interface methods to our metrics and sorts
-// a slice of peers in the way that interest us
-type metricsSorter struct {
-	peers []peer.ID
-	m     map[peer.ID]int
-}
-func newMetricsSorter(m map[peer.ID]api.Metric) *metricsSorter {
-	vMap := make(map[peer.ID]int)
-	peers := make([]peer.ID, 0, len(m))
-	for k, v := range m {
-		if v.Discard() {
-			continue
-		}
-		val, err := strconv.Atoi(v.Value)
-		if err != nil {
-			continue
-		}
-		peers = append(peers, k)
-		vMap[k] = val
-	}
-	sorter := &metricsSorter{
-		m:     vMap,
-		peers: peers,
-	}
-	return sorter
-}
-// Len returns the number of metrics
-func (s metricsSorter) Len() int {
-	return len(s.peers)
-}
-// Less reports if the element in position i is less than the element in j
-func (s metricsSorter) Less(i, j int) bool {
-	peeri := s.peers[i]
-	peerj := s.peers[j]
-	x := s.m[peeri]
-	y := s.m[peerj]
-	return x < y
-}
-// Swap swaps the elements in positions i and j
-func (s metricsSorter) Swap(i, j int) {
-	temp := s.peers[i]
-	s.peers[i] = s.peers[j]
-	s.peers[j] = temp
+	return sortable.Peers, nil
}
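
For reference, here is a minimal, self-contained sketch of how the refactored allocator is used. The CID, peer IDs, and metric values are hypothetical (they mirror the package's own tests), and error handling is elided:

package main

import (
	"fmt"
	"time"

	"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
	"github.com/ipfs/ipfs-cluster/api"
	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-peer"
)

func main() {
	expire := time.Now().Add(time.Minute).Format(time.RFC3339Nano)
	candidates := map[peer.ID]api.Metric{
		peer.ID("peerA"): api.Metric{Value: "10", Expire: expire, Valid: true},
		peer.ID("peerB"): api.Metric{Value: "3", Expire: expire, Valid: true},
	}
	c, _ := cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
	alloc := ascendalloc.NewAllocator()
	// Candidates come back sorted by ascending metric value:
	// peerB (3) before peerA (10).
	peers, _ := alloc.Allocate(c, nil, candidates)
	fmt.Println(peers)
}

Delegating the sort to util.NewMetricSorter is what lets ascendalloc and descendalloc share all of this machinery and differ only in the boolean they pass.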

View File

@ -97,7 +97,7 @@ var testCases = []testcase{
}
func Test(t *testing.T) {
-	alloc := &Allocator{}
+	alloc := &AscendAllocator{}
for i, tc := range testCases {
t.Logf("Test case %d", i)
res, err := alloc.Allocate(testCid, tc.current, tc.candidates)

View File

@ -0,0 +1,44 @@
// Package descendalloc implements an ipfscluster.PinAllocator, which returns
// allocations based on sorting the metrics in descending order. Thus, peers
// with the largest metrics are first in the list. This allocator can be used
// with a number of informers, as long as they provide a numeric metric value.
package descendalloc
import (
"sort"
"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
)
var logger = logging.Logger("descendalloc")
// DescendAllocator implements ipfscluster.PinAllocator.
type DescendAllocator struct{}
// NewAllocator returns an initialized DescendAllocator.
func NewAllocator() DescendAllocator {
return DescendAllocator{}
}
// SetClient does nothing in this allocator
func (alloc DescendAllocator) SetClient(c *rpc.Client) {}
// Shutdown does nothing in this allocator
func (alloc DescendAllocator) Shutdown() error { return nil }
// Allocate returns where to allocate a pin request based on metrics which
// carry a numeric value such as "used disk". We do not pay attention to
// the metrics of the currently allocated peers and we just sort the
// candidates based on their metric values (largest to smallest).
func (alloc DescendAllocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) {
// sort our metrics
sortable := util.NewMetricSorter(candidates, true)
sort.Sort(sortable)
return sortable.Peers, nil
}
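
The descending variant is symmetric. Continuing the hypothetical sketch from the ascendalloc section above (same c and candidates), only the constructor changes and the order reverses:

	// Same candidates as above, but largest metric first:
	// peerA (10) before peerB (3).
	alloc := descendalloc.NewAllocator()
	peers, _ := alloc.Allocate(c, nil, candidates)
	fmt.Println(peers)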

View File

@ -0,0 +1,116 @@
package descendalloc
import (
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/informer/numpin"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
)
type testcase struct {
candidates map[peer.ID]api.Metric
current map[peer.ID]api.Metric
expected []peer.ID
}
var (
peer0 = peer.ID("QmUQ6Nsejt1SuZAu8yL8WgqQZHHAYreLVYYa4VPsLUCed7")
peer1 = peer.ID("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6")
peer2 = peer.ID("QmPrSBATWGAN56fiiEWEhKX3L1F3mTghEQR7vQwaeo7zHi")
peer3 = peer.ID("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
)
var inAMinute = time.Now().Add(time.Minute).Format(time.RFC3339Nano)
var testCases = []testcase{
{ // regular sort
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Value: "5",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: numpin.MetricName,
Value: "1",
Expire: inAMinute,
Valid: true,
},
peer2: api.Metric{
Name: numpin.MetricName,
Value: "3",
Expire: inAMinute,
Valid: true,
},
peer3: api.Metric{
Name: numpin.MetricName,
Value: "2",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]api.Metric{},
expected: []peer.ID{peer1, peer3, peer2, peer0},
},
{ // filter invalid
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Value: "1",
Expire: inAMinute,
Valid: false,
},
peer1: api.Metric{
Name: numpin.MetricName,
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]api.Metric{},
expected: []peer.ID{peer1},
},
{ // filter bad value
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Value: "abc",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: numpin.MetricName,
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]api.Metric{},
expected: []peer.ID{peer1},
},
}
func Test(t *testing.T) {
alloc := &DescendAllocator{}
for i, tc := range testCases {
t.Logf("Test case %d", i)
res, err := alloc.Allocate(testCid, tc.current, tc.candidates)
if err != nil {
t.Fatal(err)
}
if len(res) == 0 {
t.Fatal("0 allocations")
}
for i, r := range res {
if e := tc.expected[len(res)-i-1]; r != e {
t.Errorf("Expect r[%d]=%s but got %s", i, r, e)
}
}
}
}

View File

@ -0,0 +1,65 @@
package util
import (
"strconv"
"github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-peer"
)
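// MetricSorter attaches sort.Interface methods to a set of peers so they can
// be ordered by their numeric metric values: ascending by default, descending
// when Reverse is set.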
type MetricSorter struct {
Peers []peer.ID
M map[peer.ID]int
Reverse bool
}
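// NewMetricSorter creates a MetricSorter from the given metrics, skipping
// peers whose metrics are discarded or whose values do not parse as integers.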
func NewMetricSorter(m map[peer.ID]api.Metric, reverse bool) *MetricSorter {
vMap := make(map[peer.ID]int)
peers := make([]peer.ID, 0, len(m))
for k, v := range m {
if v.Discard() {
continue
}
val, err := strconv.Atoi(v.Value)
if err != nil {
continue
}
peers = append(peers, k)
vMap[k] = val
}
sorter := &MetricSorter{
M: vMap,
Peers: peers,
Reverse: reverse,
}
return sorter
}
// Len returns the number of metrics
func (s MetricSorter) Len() int {
return len(s.Peers)
}
// Swap swaps the elements in positions i and j
func (s MetricSorter) Swap(i, j int) {
temp := s.Peers[i]
s.Peers[i] = s.Peers[j]
s.Peers[j] = temp
}
// Less reports whether the element at position i is less than the element at
// position j, honoring the Reverse flag.
func (s MetricSorter) Less(i, j int) bool {
peeri := s.Peers[i]
peerj := s.Peers[j]
x := s.M[peeri]
y := s.M[peerj]
if s.Reverse {
return x > y
}
return x < y
}
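
To make the Reverse flag concrete, here is a small self-contained sketch with made-up metric values; this mirrors what the two allocators do internally:

package main

import (
	"fmt"
	"sort"
	"time"

	"github.com/ipfs/ipfs-cluster/allocator/util"
	"github.com/ipfs/ipfs-cluster/api"
	peer "github.com/libp2p/go-libp2p-peer"
)

func main() {
	expire := time.Now().Add(time.Minute).Format(time.RFC3339Nano)
	metrics := map[peer.ID]api.Metric{
		peer.ID("a"): api.Metric{Value: "5", Expire: expire, Valid: true},
		peer.ID("b"): api.Metric{Value: "1", Expire: expire, Valid: true},
		peer.ID("c"): api.Metric{Value: "3", Expire: expire, Valid: true},
	}

	asc := util.NewMetricSorter(metrics, false)
	sort.Sort(asc)
	fmt.Println(asc.Peers) // [b c a] -> values 1, 3, 5

	desc := util.NewMetricSorter(metrics, true)
	sort.Sort(desc)
	fmt.Println(desc.Peers) // [a c b] -> values 5, 3, 1
}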

View File

@ -1,6 +1,7 @@
-// Package disk implements an ipfs-cluster informer which determines
-// the current RepoSize of the ipfs daemon datastore and returns it as an
-// api.Metric.
+// Package disk implements an ipfs-cluster informer which uses a metric (e.g.
+// RepoSize or FreeSpace of the IPFS daemon datastore) and returns it as an
+// api.Metric. The supported metrics are listed as the keys in the nameToRPC
+// map below.
package disk
import (
@ -12,17 +13,14 @@ import (
"github.com/ipfs/ipfs-cluster/api"
)
-// TODO: switch default to disk-freespace
-const DefaultMetric = "disk-reposize"
+const DefaultMetric = "disk-freespace"
var logger = logging.Logger("diskinfo")
// MetricTTL specifies how long our reported metric is valid in seconds.
var MetricTTL = 30
-// MetricName specifies the name of our metric
-var MetricName string
// nameToRPC maps from a specified metric name to the corresponding RPC call
var nameToRPC = map[string]string{
"disk-freespace": "IPFSFreeSpace",
"disk-reposize": "IPFSRepoSize",
@ -31,20 +29,28 @@ var nameToRPC = map[string]string{
// Informer is a simple object to implement the ipfscluster.Informer
// and Component interfaces.
type Informer struct {
-	rpcClient *rpc.Client
+	metricName string
+	rpcClient  *rpc.Client
}
-// NewInformer returns an initialized Informer.
+// NewInformer returns an initialized Informer that uses the DefaultMetric.
func NewInformer() *Informer {
-	MetricName = DefaultMetric
-	return &Informer{}
+	return &Informer{
+		metricName: DefaultMetric,
+	}
}
-// NewInformer returns an initialized Informer.
+// NewInformerWithMetric returns an Informer that uses the given metric name,
+// or nil if that name is not supported. A metric name is valid if it is a key
+// in the nameToRPC map.
func NewInformerWithMetric(metric string) *Informer {
-	// assume `metric` has been checked already
-	MetricName = metric
-	return &Informer{}
+	// check whether specified metric is supported
+	if _, valid := nameToRPC[metric]; valid {
+		return &Informer{
+			metricName: metric,
+		}
+	}
+	return nil
}
// SetClient provides us with an rpc.Client which allows
@ -62,7 +68,7 @@ func (disk *Informer) Shutdown() error {
// Name returns the name of this informer.
func (disk *Informer) Name() string {
-	return MetricName
+	return disk.metricName
}
func (disk *Informer) GetMetric() api.Metric {
@ -76,7 +82,7 @@ func (disk *Informer) GetMetric() api.Metric {
valid := true
err := disk.rpcClient.Call("",
"Cluster",
-		nameToRPC[MetricName],
+		nameToRPC[disk.metricName],
struct{}{},
&metric)
if err != nil {
@ -85,7 +91,7 @@ func (disk *Informer) GetMetric() api.Metric {
}
m := api.Metric{
-		Name:  MetricName,
+		Name:  disk.metricName,
Value: fmt.Sprintf("%d", metric),
Valid: valid,
}
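
A brief usage sketch of the reworked constructors (the metric names are the real nameToRPC keys from above; "disk-bogus" is a deliberately invalid example):

package main

import (
	"fmt"

	"github.com/ipfs/ipfs-cluster/informer/disk"
)

func main() {
	// The default informer now reports free space.
	inf := disk.NewInformer()
	fmt.Println(inf.Name()) // disk-freespace

	// Valid metric names are the keys of nameToRPC.
	if repo := disk.NewInformerWithMetric("disk-reposize"); repo != nil {
		fmt.Println(repo.Name()) // disk-reposize
	}

	// Anything else yields nil, so callers must check.
	if disk.NewInformerWithMetric("disk-bogus") == nil {
		fmt.Println("unsupported metric name")
	}
}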

View File

@ -49,6 +49,12 @@ func (mock *badRPCService) IPFSRepoSize(in struct{}, out *int) error {
return errors.New("fake error")
}
+func (mock *badRPCService) IPFSFreeSpace(in struct{}, out *int) error {
+	*out = 2
+	mock.nthCall++
+	return errors.New("fake error")
+}
func Test(t *testing.T) {
inf := NewInformer()
defer inf.Shutdown()
@ -64,6 +70,44 @@ func Test(t *testing.T) {
if !m.Valid {
t.Error("metric should be valid")
}
}
+func TestFreeSpace(t *testing.T) {
+	inf := NewInformerWithMetric("disk-freespace")
+	if inf == nil {
+		t.Error("informer not initialized properly")
+	}
+	defer inf.Shutdown()
+	m := inf.GetMetric()
+	if m.Valid {
+		t.Error("metric should be invalid")
+	}
+	inf.SetClient(test.NewMockRPCClient(t))
+	m = inf.GetMetric()
+	if !m.Valid {
+		t.Error("metric should be valid")
+	}
+	// The mock client reports a StorageMax of 100KB and 2 pins of 1KB each,
+	// so FreeSpace = 100000 - 2000 = 98000.
+	if m.Value != "98000" {
+		t.Error("bad metric value")
+	}
+}

+func TestRepoSize(t *testing.T) {
+	inf := NewInformerWithMetric("disk-reposize")
+	if inf == nil {
+		t.Error("informer not initialized properly")
+	}
+	defer inf.Shutdown()
+	m := inf.GetMetric()
+	if m.Valid {
+		t.Error("metric should be invalid")
+	}
+	inf.SetClient(test.NewMockRPCClient(t))
+	m = inf.GetMetric()
+	if !m.Valid {
+		t.Error("metric should be valid")
+	}
+	// The mock client reports 2 pins of 1KB each, so RepoSize is 2000.
+	if m.Value != "2000" {
+		t.Error("bad metric value")
+	}
+}
View File

@ -19,6 +19,7 @@ import (
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
"github.com/ipfs/ipfs-cluster/api/restapi"
"github.com/ipfs/ipfs-cluster/informer/disk"
"github.com/ipfs/ipfs-cluster/informer/numpin"
@ -331,6 +332,7 @@ var facilities = []string{
"consensus",
"pintracker",
"ascendalloc",
"descendalloc",
"diskinfo",
}
@ -342,10 +344,10 @@ func setupLogging(lvl string) {
func setupAllocation(strategy string) (ipfscluster.Informer, ipfscluster.PinAllocator) {
switch strategy {
case "disk":
case "disk", "disk-freespace":
informer := disk.NewInformer()
return informer, ascendalloc.NewAllocator()
case "disk-freespace", "disk-reposize":
return informer, descendalloc.NewAllocator()
case "disk-reposize":
informer := disk.NewInformerWithMetric(strategy)
return informer, ascendalloc.NewAllocator()
case "numpin", "pincount":

View File

@ -268,3 +268,9 @@ func (mock *mockService) IPFSRepoSize(in struct{}, out *int) error {
*out = 2000
return nil
}
+func (mock *mockService) IPFSFreeSpace(in struct{}, out *int) error {
+	// RepoSize is 2KB, StorageMax is 100KB
+	*out = 98000
+	return nil
+}
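
These mock numbers line up with the disk informer tests above: FreeSpace = StorageMax - RepoSize = 100000 - 2000 = 98000 bytes.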