Informer impl refactored; SortNumeric added for allocators.

This commit is contained in:
dgrisham 2017-08-22 11:13:53 -06:00
parent 2d2a9da793
commit 407fd9f68a
10 changed files with 110 additions and 93 deletions

View File

@ -5,8 +5,6 @@
package ascendalloc
import (
"sort"
"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"
@ -38,7 +36,5 @@ func (alloc AscendAllocator) Shutdown() error { return nil }
// candidates based on their metric values (smallest to largest).
func (alloc AscendAllocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) {
// sort our metrics
sortable := util.NewMetricSorter(candidates, false)
sort.Sort(sortable)
return sortable.Peers, nil
return util.SortNumeric(candidates, false), nil
}

View File

@ -5,7 +5,6 @@ import (
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/informer/numpin"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
@ -31,25 +30,25 @@ var testCases = []testcase{
{ // regular sort
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: true,
},
peer2: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "3",
Expire: inAMinute,
Valid: true,
},
peer3: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "2",
Expire: inAMinute,
Valid: true,
@ -61,13 +60,13 @@ var testCases = []testcase{
{ // filter invalid
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: false,
},
peer1: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
@ -79,13 +78,13 @@ var testCases = []testcase{
{ // filter bad value
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "abc",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,

View File

@ -5,8 +5,6 @@
package descendalloc
import (
"sort"
"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"
@ -38,7 +36,5 @@ func (alloc DescendAllocator) Shutdown() error { return nil }
// candidates based on their metric values (largest to smallest).
func (alloc DescendAllocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) {
// sort our metrics
sortable := util.NewMetricSorter(candidates, true)
sort.Sort(sortable)
return sortable.Peers, nil
return util.SortNumeric(candidates, true), nil
}

View File

@ -5,7 +5,6 @@ import (
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/informer/numpin"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
@ -31,25 +30,25 @@ var testCases = []testcase{
{ // regular sort
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: true,
},
peer2: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "3",
Expire: inAMinute,
Valid: true,
},
peer3: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "2",
Expire: inAMinute,
Valid: true,
@ -61,13 +60,13 @@ var testCases = []testcase{
{ // filter invalid
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: false,
},
peer1: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
@ -79,13 +78,13 @@ var testCases = []testcase{
{ // filter bad value
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "abc",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,

View File

@ -1,6 +1,11 @@
// Package allocator.util is a utility package used by the allocator
// implementations. This package provides the SortNumeric function, which may be
// used by an allocator to sort peers by their metric values (ascending or
// descending).
package util
import (
"sort"
"strconv"
"github.com/ipfs/ipfs-cluster/api"
@ -8,16 +13,13 @@ import (
peer "github.com/libp2p/go-libp2p-peer"
)
type MetricSorter struct {
Peers []peer.ID
M map[peer.ID]int
Reverse bool
}
func NewMetricSorter(m map[peer.ID]api.Metric, reverse bool) *MetricSorter {
// SortNumeric returns a list of peers sorted by their metric values. If reverse
// is false (true), peers will be sorted from smallest to largest (largest to
// smallest) metric
func SortNumeric(candidates map[peer.ID]api.Metric, reverse bool) []peer.ID {
vMap := make(map[peer.ID]int)
peers := make([]peer.ID, 0, len(m))
for k, v := range m {
peers := make([]peer.ID, 0, len(candidates))
for k, v := range candidates {
if v.Discard() {
continue
}
@ -29,36 +31,44 @@ func NewMetricSorter(m map[peer.ID]api.Metric, reverse bool) *MetricSorter {
vMap[k] = val
}
sorter := &MetricSorter{
M: vMap,
Peers: peers,
Reverse: reverse,
sorter := &metricSorter{
m: vMap,
peers: peers,
reverse: reverse,
}
return sorter
sort.Sort(sorter)
return sorter.peers
}
// metricSorter implements the sort.Sort interface
type metricSorter struct {
peers []peer.ID
m map[peer.ID]int
reverse bool
}
// Len returns the number of metrics
func (s MetricSorter) Len() int {
return len(s.Peers)
func (s metricSorter) Len() int {
return len(s.peers)
}
// Swap swaps the elements in positions i and j
func (s MetricSorter) Swap(i, j int) {
temp := s.Peers[i]
s.Peers[i] = s.Peers[j]
s.Peers[j] = temp
// Swap Swaps the elements in positions i and j
func (s metricSorter) Swap(i, j int) {
temp := s.peers[i]
s.peers[i] = s.peers[j]
s.peers[j] = temp
}
// Less reports if the element in position i is less than the element in j
// Less reports if the element in position i is Less than the element in j
// (important to override this)
func (s MetricSorter) Less(i, j int) bool {
peeri := s.Peers[i]
peerj := s.Peers[j]
func (s metricSorter) Less(i, j int) bool {
peeri := s.peers[i]
peerj := s.peers[j]
x := s.M[peeri]
y := s.M[peerj]
x := s.m[peeri]
y := s.m[peerj]
if s.Reverse {
if s.reverse {
return x > y
}
return x < y

View File

@ -79,7 +79,7 @@ func (ipfs *mockConnector) PinLs(filter string) (map[string]api.IPFSPinStatus, e
func (ipfs *mockConnector) ConnectSwarms() error { return nil }
func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil }
func (ipfs *mockConnector) FreeSpace() (int, error) { return 0, nil }
func (ipfs *mockConnector) FreeSpace() (int, error) { return 100, nil }
func (ipfs *mockConnector) RepoSize() (int, error) { return 0, nil }
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate.MapState, *maptracker.MapPinTracker) {

View File

@ -1,6 +1,6 @@
// Package disk implements an ipfs-cluster informer which uses a metric (e.g.
// RepoSize or FreeSpace of the IPFS daemon datastore) and returns it as an
// api.Metric. The supported metrics are listed as the keys in the nameToRPC
// api.Metric. The supported metrics are listed as the keys in the metricToRPC
// map below.
package disk
@ -13,46 +13,63 @@ import (
"github.com/ipfs/ipfs-cluster/api"
)
const DefaultMetric = "disk-freespace"
type MetricType int
const (
MetricFreeSpace = iota
MetricRepoSize
)
const DefaultMetric = MetricFreeSpace
var logger = logging.Logger("diskinfo")
// MetricTTL specifies how long our reported metric is valid in seconds.
var MetricTTL = 30
// nameToRPC maps from a specified metric name to the corrresponding RPC call
var nameToRPC = map[string]string{
"disk-freespace": "IPFSFreeSpace",
"disk-reposize": "IPFSRepoSize",
// metricToRPC maps from a specified metric name to the corrresponding RPC call
var metricToRPC = map[MetricType]string{
MetricFreeSpace: "IPFSFreeSpace",
MetricRepoSize: "IPFSRepoSize",
}
// Informer is a simple object to implement the ipfscluster.Informer
// and Component interfaces.
type Informer struct {
metricName string
rpcClient *rpc.Client
Type MetricType
name string
rpcClient *rpc.Client
rpcName string
}
// NewInformer returns an initialized Informer that uses the DefaultMetric
func NewInformer() *Informer {
// NewInformer returns an initialized Informer that uses the DefaultMetric. The
// name argument is meant as a user-facing identifier for the Informer and can
// be anything.
func NewInformer(name string) *Informer {
return &Informer{
metricName: DefaultMetric,
name: name,
rpcName: metricToRPC[DefaultMetric],
}
}
// NewInformerWithMetric returns an initialized Informer with a specified metric
// name, if that name is valid, or nil. A metric name is valid if it is a key in
// the nameToRPC map.
func NewInformerWithMetric(metric string) *Informer {
// NewInformerWithMetric returns an Informer that uses the input MetricType. The
// name argument has the same purpose as in NewInformer.
func NewInformerWithMetric(metric MetricType, name string) *Informer {
// check whether specified metric is supported
if _, valid := nameToRPC[metric]; valid {
if rpc, valid := metricToRPC[metric]; valid {
return &Informer{
metricName: metric,
name: name,
rpcName: rpc,
}
}
return nil
}
// Name returns the user-facing name of this informer.
func (disk *Informer) Name() string {
return disk.name
}
// SetClient provides us with an rpc.Client which allows
// contacting other components in the cluster.
func (disk *Informer) SetClient(c *rpc.Client) {
@ -66,11 +83,6 @@ func (disk *Informer) Shutdown() error {
return nil
}
// Name returns the name of this informer.
func (disk *Informer) Name() string {
return disk.metricName
}
func (disk *Informer) GetMetric() api.Metric {
if disk.rpcClient == nil {
return api.Metric{
@ -82,7 +94,7 @@ func (disk *Informer) GetMetric() api.Metric {
valid := true
err := disk.rpcClient.Call("",
"Cluster",
nameToRPC[disk.metricName],
disk.rpcName,
struct{}{},
&metric)
if err != nil {
@ -91,7 +103,7 @@ func (disk *Informer) GetMetric() api.Metric {
}
m := api.Metric{
Name: disk.metricName,
Name: disk.name,
Value: fmt.Sprintf("%d", metric),
Valid: valid,
}

View File

@ -56,9 +56,9 @@ func (mock *badRPCService) IPFSFreeSpace(in struct{}, out *int) error {
}
func Test(t *testing.T) {
inf := NewInformer()
inf := NewInformer("name")
defer inf.Shutdown()
if inf.Name() != DefaultMetric {
if inf.Type != DefaultMetric {
t.Error("careful when changing the name of an informer")
}
m := inf.GetMetric()
@ -73,7 +73,7 @@ func Test(t *testing.T) {
}
func TestFreeSpace(t *testing.T) {
inf := NewInformerWithMetric("disk-freespace")
inf := NewInformerWithMetric(MetricFreeSpace, "disk-freespace")
if inf == nil {
t.Error("informer not initialized properly")
}
@ -94,7 +94,7 @@ func TestFreeSpace(t *testing.T) {
}
func TestRepoSize(t *testing.T) {
inf := NewInformerWithMetric("disk-reposize")
inf := NewInformerWithMetric(MetricRepoSize, "disk-reposize")
if inf == nil {
t.Error("informer not initialized properly")
}
@ -115,7 +115,7 @@ func TestRepoSize(t *testing.T) {
}
func TestWithErrors(t *testing.T) {
inf := NewInformer()
inf := NewInformer("name")
defer inf.Shutdown()
inf.SetClient(badRPCClient(t))
m := inf.GetMetric()

View File

@ -342,16 +342,21 @@ func setupLogging(lvl string) {
}
}
func setupAllocation(strategy string) (ipfscluster.Informer, ipfscluster.PinAllocator) {
switch strategy {
case "disk", "disk-freespace":
informer := disk.NewInformer()
func setupAllocation(name string) (ipfscluster.Informer, ipfscluster.PinAllocator) {
switch name {
case "disk":
// set strategy to default for disk, continue through cases
name = "disk-freespace"
fallthrough
case "disk-freespace":
informer := disk.NewInformerWithMetric(disk.MetricFreeSpace, name)
return informer, descendalloc.NewAllocator()
case "disk-reposize":
informer := disk.NewInformerWithMetric(strategy)
informer := disk.NewInformerWithMetric(disk.MetricRepoSize, name)
return informer, ascendalloc.NewAllocator()
case "numpin", "pincount":
return numpin.NewInformer(), ascendalloc.NewAllocator()
informer := numpin.NewInformer()
return informer, ascendalloc.NewAllocator()
default:
err := errors.New("unknown allocation strategy")
checkErr("", err)

View File

@ -96,7 +96,7 @@ func createComponents(t *testing.T, i int, clusterSecret []byte) (*Config, API,
mon := basic.NewStdPeerMonitor(cfg.MonitoringIntervalSeconds)
alloc := ascendalloc.NewAllocator()
disk.MetricTTL = 1 // second
inf := disk.NewInformer()
inf := disk.NewInformer("name")
return cfg, api, ipfs, state, tracker, mon, alloc, inf, mock
}