Balanced allocator: weight-based ordering of partitions

This fixes the issue where partitions were not being picked based
on the amount of free space available in them.

It additionally removes the metrics registry; the needed information is now
carried directly in each metric.

Metrics have two additional fields: Weight and Partitionable.

Informers have been updated to make use of these fields. Partitions have
weights equal to the sum of the weights of the metrics under them.

Older cluster versions will not set these fields. Partitionable is false by
default, and GetWeight() derives the weight from the Value when Weight is
unset. This provides backwards compatibility for the freespace metric.
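As an illustration of the new fields (a minimal sketch, not part of this diff; names and values are placeholders), this is roughly how an informer can now populate a metric so that the balanced allocator can order peers by weight:

package main

import (
	"fmt"
	"time"

	api "github.com/ipfs/ipfs-cluster/api"
)

func main() {
	free := uint64(1024) // hypothetical free-space reading

	// The informer sets Weight and Partitionable directly on the metric.
	m := &api.Metric{
		Name:          "freespace",
		Value:         fmt.Sprintf("%d", free),
		Weight:        int64(free), // used by the balanced allocator to order partitions
		Partitionable: false,       // free-space values must not be grouped into one partition
		Valid:         true,
	}
	m.SetTTL(time.Minute)

	fmt.Println(m.Name, m.Weight, m.Partitionable)
}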
Author: Hector Sanjuan
Date:   2021-10-06 14:10:06 +02:00
Parent: 26e229df94
Commit: db00d651bf
10 changed files with 241 additions and 431 deletions

View File

@@ -2,22 +2,25 @@
// based on multiple metrics, where metrics may be an arbitrary way to
// partition a set of peers.
//
// For example, allocating by ["tag:region", "disk"] will
// first order candidate peers by tag metric, and then by "disk" metric.
// The final list will pick up allocations from each tag metric group.
// based on the given order of metrics.
// For example, allocating by ["tag:region", "disk"], the resulting peer
// candidate order will be balanced between regions and, within each region,
// ordered by the weight of the disk metric.
package balanced
import (
"context"
"fmt"
"sort"
"github.com/ipfs/go-cid"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
api "github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
var logger = logging.Logger("allocator")
// Allocator is an allocator that partitions metrics and orders
// the final allocation list by selecting from each partition in turn.
type Allocator struct {
@@ -57,6 +60,123 @@ type partitionedMetric struct {
partitions []*partition // they are in order of their values
}
type partition struct {
value string
weight int64
peers map[peer.ID]bool // the bool tracks whether the peer has been picked already out of the partition when doing the final sort.
sub *partitionedMetric // all peers in sub-partitions will have the same value for this metric
}
// Returns a partitionedMetric which has partitions and subpartitions based
// on the metrics and values given by the "by" slice. The partitions
// are ordered based on the cumulative weight.
func partitionMetrics(set api.MetricsSet, by []string) *partitionedMetric {
rootMetric := by[0]
pnedMetric := &partitionedMetric{
metricName: rootMetric,
partitions: partitionValues(set[rootMetric]),
}
// For sorting based on weight (more to less)
lessF := func(i, j int) bool {
wi := pnedMetric.partitions[i].weight
wj := pnedMetric.partitions[j].weight
// Strict order
if wi == wj {
return pnedMetric.partitions[i].value < pnedMetric.partitions[j].value
}
// Descending!
return wj < wi
}
if len(by) == 1 { // we are done
sort.Slice(pnedMetric.partitions, lessF)
return pnedMetric
}
// process sub-partitions
for _, partition := range pnedMetric.partitions {
filteredSet := make(api.MetricsSet)
for k, v := range set {
if k == rootMetric { // not needed anymore
continue
}
for _, m := range v {
// only leave metrics for peers in current partition
if _, ok := partition.peers[m.Peer]; ok {
filteredSet[k] = append(filteredSet[k], m)
}
}
}
partition.sub = partitionMetrics(filteredSet, by[1:])
// Add the weight of our subpartitions
for _, subp := range partition.sub.partitions {
partition.weight += subp.weight
}
}
sort.Slice(pnedMetric.partitions, lessF)
return pnedMetric
}
func partitionValues(metrics []*api.Metric) []*partition {
partitions := []*partition{}
if len(metrics) <= 0 {
return partitions
}
// We group peers with the same value in the same partition.
partitionsByValue := make(map[string]*partition)
for _, m := range metrics {
// Sometimes two metrics have the same value / weight, but we
// still want to put them in different partitions. Otherwise
// their weights get added, they form a single bucket, and
// therefore they are not selected in order: 3 peers with
// freespace=100 and one peer with freespace=200 would result
// in one of the peers with freespace 100 being chosen first
// because the partition's weight is 300.
//
// We are going to call these metrics (like free-space),
// non-partitionable metrics. This is going to be the default
// (for backwards compat reasons).
//
// The informers must set the Partitionable field accordingly
// when two metrics with the same value must be grouped in the
// same partition.
if !m.Partitionable {
partitions = append(partitions, &partition{
value: m.Value,
weight: m.GetWeight(),
peers: map[peer.ID]bool{
m.Peer: false,
},
})
continue
}
// In any other case, we partition by value.
if p, ok := partitionsByValue[m.Value]; ok {
p.peers[m.Peer] = false
p.weight += m.GetWeight()
} else {
partitionsByValue[m.Value] = &partition{
value: m.Value,
weight: m.GetWeight(),
peers: map[peer.ID]bool{
m.Peer: false,
},
}
}
}
for _, p := range partitionsByValue {
partitions = append(partitions, p)
}
return partitions
}
// Returns a list of peers sorted by never choosing twice from the same
// partition if there is some other partition to choose from.
func (pnedm *partitionedMetric) sortedPeers() []peer.ID {
@@ -118,78 +238,6 @@ func (pnedm *partitionedMetric) chooseNext() peer.ID {
return peer
}
type partition struct {
value string
peers map[peer.ID]bool // the bool tracks whether the peer has been picked already out of the partition when doing the final sort.
sub *partitionedMetric // all peers in sub-partitions will have the same value for this metric
}
func partitionMetrics(sortedSet api.MetricsSet, by []string) *partitionedMetric {
rootMetric := by[0]
informer := informers[rootMetric]
pnedMetric := &partitionedMetric{
metricName: rootMetric,
partitions: partitionValues(sortedSet[rootMetric], informer),
}
if len(by) == 1 { // we are done
return pnedMetric
}
// process sub-partitions
for _, partition := range pnedMetric.partitions {
filteredSet := make(api.MetricsSet)
for k, v := range sortedSet {
if k == rootMetric { // not needed anymore
continue
}
for _, m := range v {
// only leave metrics for peers in current partition
if _, ok := partition.peers[m.Peer]; ok {
filteredSet[k] = append(filteredSet[k], m)
}
}
}
partition.sub = partitionMetrics(filteredSet, by[1:])
}
return pnedMetric
}
func partitionValues(sortedMetrics []*api.Metric, inf informer) []*partition {
partitions := []*partition{}
if len(sortedMetrics) <= 0 {
return partitions
}
// For not partitionable metrics we create one partition
// per value, even if two values are the same.
groupable := inf.partitionable
curPartition := &partition{
value: sortedMetrics[0].Value,
peers: map[peer.ID]bool{
sortedMetrics[0].Peer: false,
},
}
partitions = append(partitions, curPartition)
for _, m := range sortedMetrics[1:] {
if groupable && m.Value == curPartition.value {
curPartition.peers[m.Peer] = false
} else {
curPartition = &partition{
value: m.Value,
peers: map[peer.ID]bool{
m.Peer: false,
},
}
partitions = append(partitions, curPartition)
}
}
return partitions
}
// Allocate produces a sorted list of cluster peer IDs based on different
// metrics provided for those peer IDs.
// It works as follows:
@@ -210,20 +258,6 @@ func (a *Allocator) Allocate(
current, candidates, priority api.MetricsSet,
) ([]peer.ID, error) {
// sort all metrics. TODO: it should not remove invalids.
for _, arg := range []api.MetricsSet{current, candidates, priority} {
if arg == nil {
continue
}
for _, by := range a.config.AllocateBy {
sorter := informers[by].sorter
if sorter == nil {
return nil, fmt.Errorf("allocate_by contains an unknown metric name: %s", by)
}
arg[by] = sorter(arg[by])
}
}
// For the allocation to work well, there have to be metrics of all
// the types for all the peers. There cannot be a metric of one type
// for a peer that does not appear in the other types.
@@ -236,8 +270,7 @@ func (a *Allocator) Allocate(
candidatePartition := partitionMetrics(candidates, a.config.AllocateBy)
priorityPartition := partitionMetrics(priority, a.config.AllocateBy)
//fmt.Println("---")
//printPartition(candidatePartition)
logger.Debugf("Balanced allocator partitions:\n%s\n", printPartition(candidatePartition, 0))
first := priorityPartition.sortedPeers()
last := candidatePartition.sortedPeers()
@@ -251,16 +284,24 @@ func (a *Allocator) Metrics() []string {
return a.config.AllocateBy
}
// func printPartition(p *partitionedMetric) {
// fmt.Println(p.metricName)
// for _, p := range p.partitions {
// fmt.Printf("%s - [", p.value)
// for p, u := range p.peers {
// fmt.Printf("%s|%t, ", p, u)
// }
// fmt.Println("]")
// if p.sub != nil {
// printPartition(p.sub)
// }
// }
// }
func printPartition(m *partitionedMetric, ind int) string {
str := ""
indent := func() {
for i := 0; i < ind+2; i++ {
str += " "
}
}
for _, p := range m.partitions {
indent()
str += fmt.Sprintf(" | %s:%s - %d - [", m.metricName, p.value, p.weight)
for p, u := range p.peers {
str += fmt.Sprintf("%s|%t, ", p, u)
}
str += "]\n"
if p.sub != nil {
str += printPartition(p.sub, ind+2)
}
}
return str
}
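For a more concrete picture of the partitioning above, here is a minimal package-internal sketch (hypothetical, in the style of the package's tests; the peer IDs are placeholder strings and TTL/validity handling is omitted) showing how partitionMetrics groups peers and how sortedPeers interleaves partitions by weight:

package balanced

import (
	"fmt"

	api "github.com/ipfs/ipfs-cluster/api"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

func sketchPartitioning() {
	p1, p2, p3 := peer.ID("p1"), peer.ID("p2"), peer.ID("p3")

	set := api.MetricsSet{
		"tag:region": []*api.Metric{
			{Name: "tag:region", Value: "us", Peer: p1, Valid: true, Partitionable: true},
			{Name: "tag:region", Value: "us", Peer: p2, Valid: true, Partitionable: true},
			{Name: "tag:region", Value: "eu", Peer: p3, Valid: true, Partitionable: true},
		},
		"freespace": []*api.Metric{
			{Name: "freespace", Value: "100", Weight: 100, Peer: p1, Valid: true},
			{Name: "freespace", Value: "500", Weight: 500, Peer: p2, Valid: true},
			{Name: "freespace", Value: "200", Weight: 200, Peer: p3, Valid: true},
		},
	}

	pned := partitionMetrics(set, []string{"tag:region", "freespace"})

	// Region "us" weighs 600 (100+500) and sorts before "eu" (200). sortedPeers
	// never picks twice in a row from the same region while another region still
	// has peers left, so the expected order is: p2 (us, 500), p3 (eu, 200), p1 (us, 100).
	fmt.Println(pned.sortedPeers())
}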

View File

@@ -5,27 +5,24 @@ import (
"testing"
"time"
"github.com/ipfs/ipfs-cluster/allocator/sorter"
api "github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
peer "github.com/libp2p/go-libp2p-core/peer"
)
func makeMetric(name, value string, peer peer.ID) *api.Metric {
func makeMetric(name, value string, weight int64, peer peer.ID, partitionable bool) *api.Metric {
return &api.Metric{
Name: name,
Value: value,
Peer: peer,
Valid: true,
Expire: time.Now().Add(time.Minute).UnixNano(),
Name: name,
Value: value,
Weight: weight,
Peer: peer,
Valid: true,
Partitionable: partitionable,
Expire: time.Now().Add(time.Minute).UnixNano(),
}
}
func TestAllocate(t *testing.T) {
RegisterInformer("region", sorter.SortText, true)
RegisterInformer("az", sorter.SortText, true)
RegisterInformer("freespace", sorter.SortNumericReverse, false)
alloc, err := New(&Config{
AllocateBy: []string{
"region",
@@ -39,63 +36,60 @@ func TestAllocate(t *testing.T) {
candidates := api.MetricsSet{
"abc": []*api.Metric{ // don't want anything in results
makeMetric("abc", "a", test.PeerID1),
makeMetric("abc", "b", test.PeerID2),
makeMetric("abc", "a", 0, test.PeerID1, true),
makeMetric("abc", "b", 0, test.PeerID2, true),
},
"region": []*api.Metric{
makeMetric("region", "a-us", test.PeerID1),
makeMetric("region", "a-us", test.PeerID2),
makeMetric("region", "a-us", 0, test.PeerID1, true),
makeMetric("region", "a-us", 0, test.PeerID2, true),
makeMetric("region", "b-eu", test.PeerID3),
makeMetric("region", "b-eu", test.PeerID4),
makeMetric("region", "b-eu", test.PeerID5),
makeMetric("region", "b-eu", 0, test.PeerID3, true),
makeMetric("region", "b-eu", 0, test.PeerID4, true),
makeMetric("region", "b-eu", 0, test.PeerID5, true),
makeMetric("region", "c-au", test.PeerID6),
makeMetric("region", "c-au", test.PeerID7),
makeMetric("region", "c-au", test.PeerID8), // I don't want to see this in results
makeMetric("region", "c-au", 0, test.PeerID6, true),
makeMetric("region", "c-au", 0, test.PeerID7, true),
makeMetric("region", "c-au", 0, test.PeerID8, true), // I don't want to see this in results
},
"az": []*api.Metric{
makeMetric("az", "us1", test.PeerID1),
makeMetric("az", "us2", test.PeerID2),
makeMetric("az", "us1", 0, test.PeerID1, true),
makeMetric("az", "us2", 0, test.PeerID2, true),
makeMetric("az", "eu1", test.PeerID3),
makeMetric("az", "eu1", test.PeerID4),
makeMetric("az", "eu2", test.PeerID5),
makeMetric("az", "eu1", 0, test.PeerID3, true),
makeMetric("az", "eu1", 0, test.PeerID4, true),
makeMetric("az", "eu2", 0, test.PeerID5, true),
makeMetric("az", "au1", test.PeerID6),
makeMetric("az", "au1", test.PeerID7),
makeMetric("az", "au1", 0, test.PeerID6, true),
makeMetric("az", "au1", 0, test.PeerID7, true),
},
"freespace": []*api.Metric{
makeMetric("freespace", "100", test.PeerID1),
makeMetric("freespace", "500", test.PeerID2),
makeMetric("freespace", "100", 100, test.PeerID1, false),
makeMetric("freespace", "500", 500, test.PeerID2, false),
makeMetric("freespace", "200", test.PeerID3),
makeMetric("freespace", "400", test.PeerID4),
makeMetric("freespace", "10", test.PeerID5),
makeMetric("freespace", "200", 0, test.PeerID3, false), // weight to 0 to test GetWeight() compat
makeMetric("freespace", "400", 0, test.PeerID4, false), // weight to 0 to test GetWeight() compat
makeMetric("freespace", "10", 10, test.PeerID5, false),
makeMetric("freespace", "50", test.PeerID6),
makeMetric("freespace", "600", test.PeerID7),
makeMetric("freespace", "50", 50, test.PeerID6, false),
makeMetric("freespace", "600", 600, test.PeerID7, false),
makeMetric("freespace", "10000", test.PeerID8),
makeMetric("freespace", "10000", 10000, test.PeerID8, false),
},
}
// Region weights: a-us (pids 1,2): 600. b-eu (pids 3,4,5): 610. c-au (pids 6,7): 650
// Az weights: us1: 100. us2: 500. eu1: 600. eu2: 10. au1: 650
// Based on the algorithm it should choose:
//
// - For the region us, az us1, it should select the peer with most
// freespace: ID2
// - Then switch to next region: ID4
// - And so on:
// - us-us1-100 ID1 - only peer in az
// - eu-eu1-400 ID4 - over ID3
// - au-au1-600 ID7 - over ID6
// - us-us2-500 ID2 - only peer in az
// - eu-eu2-10 ID5 - only peer in az
// - au-au1-50 ID6 - id7 already used
// - // no more in us
// - eu-eu1-ID3 - ID4 already used
// - // no more peers in au
// - // no more peers
// - c-au (most-weight)->au1->pid7
// - b-eu->eu1->pid4
// - a-us->us2->pid2
// - <repeat regions>
// - c-au->au1 (nowhere else to choose)->pid6 (region exhausted)
// - b-eu->eu2 (already had in eu1)->pid5
// - a-us->us1 (already had in us2)->pid1
// - <repeat regions>
// - b-eu->eu1->pid3 (only peer left)
peers, err := alloc.Allocate(context.Background(),
test.Cid1,
@@ -115,7 +109,7 @@ func TestAllocate(t *testing.T) {
t.Logf("%d - %s", i, p)
switch i {
case 0:
if p != test.PeerID1 {
if p != test.PeerID7 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 1:
@@ -123,11 +117,11 @@ func TestAllocate(t *testing.T) {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 2:
if p != test.PeerID7 {
if p != test.PeerID2 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 3:
if p != test.PeerID2 {
if p != test.PeerID6 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 4:
@@ -135,7 +129,7 @@ func TestAllocate(t *testing.T) {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 5:
if p != test.PeerID6 {
if p != test.PeerID1 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 6:

View File

@@ -1,26 +0,0 @@
package balanced
import "github.com/ipfs/ipfs-cluster/api"
type informer struct {
sorter func([]*api.Metric) []*api.Metric
partitionable bool
}
var informers map[string]informer
// Registers an informer with this allocator so that we know how to
// sort and use metrics coming from it.
func RegisterInformer(
name string,
sorter func([]*api.Metric) []*api.Metric,
partitionable bool,
) {
if informers == nil {
informers = make(map[string]informer)
}
informers[name] = informer{
sorter: sorter,
partitionable: partitionable,
}
}

View File

@@ -1,137 +0,0 @@
// Package sorter is a utility package used by the allocator
// implementations. This package provides the sort function for metrics of
// different value types.
package sorter
import (
"sort"
"strconv"
"github.com/ipfs/ipfs-cluster/api"
)
// SortNumeric sorts a list of metrics of Uint values from smallest to
// largest. Invalid or non-numeric metrics are discarded.
func SortNumeric(metrics []*api.Metric) []*api.Metric {
return sortNumeric(metrics, false)
}
// SortNumericReverse sorts a list of metrics of Uint values from largest to
// smallest. Invalid or non-numeric metrics are discarded.
func SortNumericReverse(metrics []*api.Metric) []*api.Metric {
return sortNumeric(metrics, true)
}
func sortNumeric(metrics []*api.Metric, reverse bool) []*api.Metric {
filteredMetrics := make([]*api.Metric, 0, len(metrics))
values := make([]uint64, 0, len(metrics))
// Parse and discard
for _, m := range metrics {
if m.Discard() {
continue
}
val, err := strconv.ParseUint(m.Value, 10, 64)
if err != nil {
continue
}
filteredMetrics = append(filteredMetrics, m)
values = append(values, val)
}
sorter := &numericSorter{
values: values,
metrics: filteredMetrics,
reverse: reverse,
}
sort.Sort(sorter)
return sorter.metrics
}
// SortText sorts a list of metrics of string values.
// Invalid metrics are discarded.
func SortText(metrics []*api.Metric) []*api.Metric {
filteredMetrics := make([]*api.Metric, 0, len(metrics))
// Parse and discard
for _, m := range metrics {
if m.Discard() {
continue
}
filteredMetrics = append(filteredMetrics, m)
}
sorter := &textSorter{
metrics: filteredMetrics,
reverse: false,
}
sort.Sort(sorter)
return sorter.metrics
}
// numericSorter implements the sort.Sort interface
type numericSorter struct {
values []uint64
metrics []*api.Metric
reverse bool
}
func (s numericSorter) Len() int {
return len(s.values)
}
func (s numericSorter) Swap(i, j int) {
tempVal := s.values[i]
s.values[i] = s.values[j]
s.values[j] = tempVal
tempMetric := s.metrics[i]
s.metrics[i] = s.metrics[j]
s.metrics[j] = tempMetric
}
// We make it a strict order by ordering by peer ID.
func (s numericSorter) Less(i, j int) bool {
if s.values[i] == s.values[j] {
if s.reverse {
return string(s.metrics[i].Peer) > string(s.metrics[j].Peer)
}
return string(s.metrics[i].Peer) < string(s.metrics[j].Peer)
}
if s.reverse {
return s.values[i] > s.values[j]
}
return s.values[i] < s.values[j]
}
// textSorter implements the sort.Sort interface
type textSorter struct {
metrics []*api.Metric
reverse bool
}
func (s textSorter) Len() int {
return len(s.metrics)
}
func (s textSorter) Swap(i, j int) {
tempMetric := s.metrics[i]
s.metrics[i] = s.metrics[j]
s.metrics[j] = tempMetric
}
// We make it a strict order by ordering by peer ID.
func (s textSorter) Less(i, j int) bool {
if s.metrics[i].Value == s.metrics[j].Value {
if s.reverse {
return string(s.metrics[i].Peer) > string(s.metrics[j].Peer)
}
return string(s.metrics[i].Peer) < string(s.metrics[j].Peer)
}
if s.reverse {
return s.metrics[i].Value > s.metrics[j].Value
}
return s.metrics[i].Value < s.metrics[j].Value
}

View File

@@ -1,70 +0,0 @@
package sorter
import (
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
)
func TestSortNumeric(t *testing.T) {
metrics := []*api.Metric{
&api.Metric{
Name: "num",
Value: "5",
Valid: true,
},
&api.Metric{
Name: "num",
Value: "0",
Valid: true,
},
&api.Metric{
Name: "num",
Value: "-1",
Valid: true,
},
&api.Metric{
Name: "num3",
Value: "abc",
Valid: true,
},
&api.Metric{
Name: "num2",
Value: "10",
Valid: true,
},
&api.Metric{
Name: "num2",
Value: "4",
Valid: true,
},
}
for _, m := range metrics {
m.SetTTL(time.Minute)
}
metrics[4].Expire = 0 // manually expire
sorted := SortNumeric(metrics)
if len(sorted) != 3 {
t.Fatal("sorter did not remove invalid metrics:")
}
if sorted[0].Value != "0" ||
sorted[1].Value != "4" ||
sorted[2].Value != "5" {
t.Error("not sorted properly")
}
sortedRev := SortNumericReverse(metrics)
if len(sortedRev) != 3 {
t.Fatal("sorted did not remove invalid metrics")
}
if sortedRev[0].Value != "5" ||
sortedRev[1].Value != "4" ||
sortedRev[2].Value != "0" {
t.Error("not sorted properly")
}
}

View File

@@ -1109,12 +1109,14 @@ type MetricsSet map[string][]*Metric
// The ReceivedAt value is a timestamp representing when a peer has received
// the metric value.
type Metric struct {
Name string `json:"name" codec:"n,omitempty"`
Peer peer.ID `json:"peer" codec:"p,omitempty"`
Value string `json:"value" codec:"v,omitempty"`
Expire int64 `json:"expire" codec:"e,omitempty"`
Valid bool `json:"valid" codec:"d,omitempty"`
ReceivedAt int64 `json:"received_at" codec:"t,omitempty"` // ReceivedAt contains a UnixNano timestamp
Name string `json:"name" codec:"n,omitempty"`
Peer peer.ID `json:"peer" codec:"p,omitempty"`
Value string `json:"value" codec:"v,omitempty"`
Expire int64 `json:"expire" codec:"e,omitempty"`
Valid bool `json:"valid" codec:"d,omitempty"`
Weight int64 `json:"weight" codec:"w,omitempty"`
Partitionable bool `json:"partitionable" codec:"o,omitempty"`
ReceivedAt int64 `json:"received_at" codec:"t,omitempty"` // ReceivedAt contains a UnixNano timestamp
}
// SetTTL sets Metric to expire after the given time.Duration
@@ -1144,6 +1146,21 @@ func (m *Metric) Discard() bool {
return !m.Valid || m.Expired()
}
// GetWeight returns the weight of the metric. When Weight is 0,
// it tries to parse the Value and use it as the weight.
// This is for backwards compatibility.
func (m *Metric) GetWeight() int64 {
if m.Weight != 0 {
return m.Weight
}
val, err := strconv.ParseInt(m.Value, 10, 64)
if err != nil {
return 0
}
return val
}
// MetricSlice is a sortable Metric array.
type MetricSlice []*Metric
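A small hypothetical snippet (not part of the diff) illustrating the GetWeight() compatibility fallback defined above:

package api

import "fmt"

// sketchGetWeight is a hypothetical illustration of the GetWeight() fallback.
func sketchGetWeight() {
	old := &Metric{Name: "freespace", Value: "200"} // sent by an older peer: Weight unset
	fmt.Println(old.GetWeight()) // 200, parsed from Value

	tag := &Metric{Name: "tag:region", Value: "us"} // non-numeric Value, Weight unset
	fmt.Println(tag.GetWeight()) // 0

	m := &Metric{Name: "freespace", Value: "200", Weight: 300}
	fmt.Println(m.GetWeight()) // 300: a non-zero Weight takes precedence over Value
}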

View File

@@ -7,8 +7,6 @@ import (
"fmt"
"sync"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/allocator/sorter"
"github.com/ipfs/ipfs-cluster/api"
logging "github.com/ipfs/go-log/v2"
@@ -40,11 +38,6 @@ func (t MetricType) String() string {
var logger = logging.Logger("diskinfo")
func init() {
balanced.RegisterInformer(MetricFreeSpace.String(), sorter.SortNumericReverse, false)
balanced.RegisterInformer(MetricRepoSize.String(), sorter.SortNumeric, false)
}
// Informer is a simple object to implement the ipfscluster.Informer
// and Component interfaces.
type Informer struct {
@@ -141,9 +134,11 @@ func (disk *Informer) GetMetrics(ctx context.Context) []*api.Metric {
}
m := &api.Metric{
Name: disk.Name(),
Value: fmt.Sprintf("%d", metric),
Valid: valid,
Name: disk.Name(),
Value: fmt.Sprintf("%d", metric),
Valid: valid,
Weight: int64(metric),
Partitionable: false,
}
m.SetTTL(disk.config.MetricTTL)

View File

@@ -6,8 +6,6 @@ import (
"context"
"fmt"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/allocator/sorter"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/libp2p/go-libp2p-gorpc"
@@ -18,10 +16,6 @@ import (
// MetricName specifies the name of our metric
var MetricName = "numpin"
func init() {
balanced.RegisterInformer(MetricName, sorter.SortNumeric, false)
}
// Informer is a simple object to implement the ipfscluster.Informer
// and Component interfaces
type Informer struct {
@@ -91,9 +85,10 @@ func (npi *Informer) GetMetrics(ctx context.Context) []*api.Metric {
valid := err == nil
m := &api.Metric{
Name: MetricName,
Value: fmt.Sprintf("%d", len(pinMap)),
Valid: valid,
Name: MetricName,
Value: fmt.Sprintf("%d", len(pinMap)),
Valid: valid,
Partitionable: false,
}
m.SetTTL(npi.config.MetricTTL)

View File

@@ -6,8 +6,6 @@ import (
"context"
"sync"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/allocator/sorter"
"github.com/ipfs/ipfs-cluster/api"
logging "github.com/ipfs/go-log/v2"
@@ -35,10 +33,6 @@ func New(cfg *Config) (*Informer, error) {
return nil, err
}
for k := range cfg.Tags {
balanced.RegisterInformer("tag:"+k, sorter.SortText, true)
}
return &Informer{
config: cfg,
}, nil
@@ -72,12 +66,17 @@ func (tags *Informer) Shutdown(ctx context.Context) error {
// The metric name is set as "tag:<tag_name>". When no tags are defined,
// a single invalid metric is returned.
func (tags *Informer) GetMetrics(ctx context.Context) []*api.Metric {
// Note: we could potentially extend the tag:value syntax to include manual weights,
// i.e.: { "region": "us:100", ... }
// This would potentially allow always giving priority to peers of a certain group.
if len(tags.config.Tags) == 0 {
logger.Debug("no tags defined in tags informer")
m := &api.Metric{
Name: "tag:none",
Value: "",
Valid: false,
Name: "tag:none",
Value: "",
Valid: false,
Partitionable: true,
}
m.SetTTL(tags.config.MetricTTL)
return []*api.Metric{m}
@@ -86,9 +85,10 @@ func (tags *Informer) GetMetrics(ctx context.Context) []*api.Metric {
metrics := make([]*api.Metric, 0, len(tags.config.Tags))
for n, v := range tags.config.Tags {
m := &api.Metric{
Name: "tag:" + n,
Value: v,
Valid: true,
Name: "tag:" + n,
Value: v,
Valid: true,
Partitionable: true,
}
m.SetTTL(tags.config.MetricTTL)
metrics = append(metrics, m)

View File

@@ -29,6 +29,7 @@ var LoggingFacilities = map[string]string{
"adder": "INFO",
"optracker": "INFO",
"pstoremgr": "INFO",
"allocator": "INFO",
}
// LoggingFacilitiesExtra provides logging identifiers