allocator: rework the whole allocator system

The new "metrics" allocator is about to partition metrics and distribe
allocations among the partitions.

For example: given a region, an availability zone and free space on disk, the
allocator is able to choose allocations by distributing them among regions
and availability zones as much as possible and, for peers in the same
region/AZ, by selecting those with the most free space first (see the sketch below).
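For illustration, a minimal sketch of how such a policy could be wired up with the packages added in this commit. The "region" and "az" informers are assumptions made for the example (only disk/numpin informers exist in the tree); `metrics.New`, `metrics.RegisterInformer` and the `sorter` functions are the ones introduced below:

    // Sketch only: "region" and "az" are hypothetical informer metrics.
    package example

    import (
        "github.com/ipfs/ipfs-cluster/allocator/metrics"
        "github.com/ipfs/ipfs-cluster/allocator/sorter"
    )

    func newBalancedAllocator() (*metrics.Metrics, error) {
        // Register how each metric is sorted and whether peers with equal
        // values may share a partition.
        metrics.RegisterInformer("region", sorter.SortText, true)
        metrics.RegisterInformer("az", sorter.SortText, true)
        metrics.RegisterInformer("freespace", sorter.SortNumericReverse, false)

        // Spread across regions first, then across AZs, then prefer the
        // peers with the most free space.
        return metrics.New(&metrics.Config{
            AllocateBy: []string{"region", "az", "freespace"},
        })
    }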

This requires a major overhaul of the allocator component.
Hector Sanjuan 2021-09-13 11:23:30 +02:00
parent 4060f4196b
commit b6a46cd8a4
23 changed files with 977 additions and 480 deletions

View File

@ -47,7 +47,7 @@ import (
// into account if the given CID was previously in a "pin everywhere" mode,
// and will consider such Pins as currently unallocated ones, providing
// new allocations as available.
func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin *api.Pin, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) {
func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin *api.Pin, rplMin, rplMax int, blacklist []peer.ID, priorityList []peer.ID) ([]peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "cluster/allocate")
defer span.End()
@ -64,37 +64,32 @@ func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin *api.Pi
if currentPin != nil {
currentAllocs = currentPin.Allocations
}
metrics := c.monitor.LatestMetrics(ctx, c.informers[0].Name())
currentMetrics := make(map[peer.ID]*api.Metric)
candidatesMetrics := make(map[peer.ID]*api.Metric)
priorityMetrics := make(map[peer.ID]*api.Metric)
// Divide metrics between current and candidates.
// All metrics in metrics are valid (at least the
// moment they were compiled by the monitor)
for _, m := range metrics {
switch {
case containsPeer(blacklist, m.Peer):
// discard blacklisted peers
continue
case containsPeer(currentAllocs, m.Peer):
currentMetrics[m.Peer] = m
case containsPeer(prioritylist, m.Peer):
priorityMetrics[m.Peer] = m
default:
candidatesMetrics[m.Peer] = m
}
mSet := make(api.MetricsSet)
for _, informer := range c.informers {
metricType := informer.Name()
mSet[metricType] = c.monitor.LatestMetrics(ctx, metricType)
}
curSet, curPeers, candSet, candPeers, prioSet, prioPeers := filterMetrics(
mSet,
len(c.informers),
currentAllocs,
priorityList,
blacklist,
)
newAllocs, err := c.obtainAllocations(
ctx,
hash,
rplMin,
rplMax,
currentMetrics,
candidatesMetrics,
priorityMetrics,
curSet,
candSet,
prioSet,
curPeers,
candPeers,
prioPeers,
)
if err != nil {
return newAllocs, err
@ -105,6 +100,67 @@ func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin *api.Pi
return newAllocs, nil
}
// filterMetrics takes the metrics from all informers and splits them into
// three MetricsSets:
// - those corresponding to currently allocated peers
// - those corresponding to priority allocations
// - those corresponding to "candidate" allocations
// It also returns a slice with the peers in each group.
//
// For a peer/metric to be included in a group, the peer must have metrics
// from all informers.
func filterMetrics(mSet api.MetricsSet, numInformers int, currentAllocs, priorityList, blacklist []peer.ID) (
curSet api.MetricsSet,
curPeers []peer.ID,
candSet api.MetricsSet,
candPeers []peer.ID,
prioSet api.MetricsSet,
prioPeers []peer.ID,
) {
curPeersMap := make(map[peer.ID][]*api.Metric)
candPeersMap := make(map[peer.ID][]*api.Metric)
prioPeersMap := make(map[peer.ID][]*api.Metric)
// Divide the metrics into current/candidate/priority groups, per peer
for _, metrics := range mSet {
for _, m := range metrics {
switch {
case containsPeer(blacklist, m.Peer):
// discard blacklisted peers
continue
case containsPeer(currentAllocs, m.Peer):
curPeersMap[m.Peer] = append(curPeersMap[m.Peer], m)
case containsPeer(priorityList, m.Peer):
prioPeersMap[m.Peer] = append(prioPeersMap[m.Peer], m)
default:
candPeersMap[m.Peer] = append(candPeersMap[m.Peer], m)
}
}
}
fillMetricsSet := func(peersMap map[peer.ID][]*api.Metric) (api.MetricsSet, []peer.ID) {
mSet := make(api.MetricsSet)
peers := make([]peer.ID, 0, len(peersMap))
// Put the metrics in their sets if peers have metrics for all informers
// Record peers
for p, metrics := range peersMap {
if len(metrics) == numInformers {
for _, m := range metrics {
mSet[m.Name] = append(mSet[m.Name], m)
}
peers = append(peers, p)
} // otherwise this peer will be ignored.
}
return mSet, peers
}
curSet, curPeers = fillMetricsSet(curPeersMap)
candSet, candPeers = fillMetricsSet(candPeersMap)
prioSet, prioPeers = fillMetricsSet(prioPeersMap)
return
}
// allocationError logs an allocation error
func allocationError(hash cid.Cid, needed, wanted int, candidatesValid []peer.ID) error {
logger.Errorf("Not enough candidates to allocate %s:", hash)
@ -126,26 +182,20 @@ func (c *Cluster) obtainAllocations(
ctx context.Context,
hash cid.Cid,
rplMin, rplMax int,
currentValidMetrics map[peer.ID]*api.Metric,
candidatesMetrics map[peer.ID]*api.Metric,
priorityMetrics map[peer.ID]*api.Metric,
currentValidMetrics, candidatesMetrics, priorityMetrics api.MetricsSet,
currentPeers, candidatePeers, priorityPeers []peer.ID,
) ([]peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "cluster/obtainAllocations")
defer span.End()
// The list of peers in current
validAllocations := make([]peer.ID, 0, len(currentValidMetrics))
for k := range currentValidMetrics {
validAllocations = append(validAllocations, k)
}
nCurrentValid := len(validAllocations)
nCandidatesValid := len(candidatesMetrics) + len(priorityMetrics)
nCurrentValid := len(currentPeers)
nCandidatesValid := len(candidatePeers) + len(priorityPeers)
needed := rplMin - nCurrentValid // The minimum we need
wanted := rplMax - nCurrentValid // The maximum we want
logger.Debugf("obtainAllocations: current valid: %d", nCurrentValid)
logger.Debugf("obtainAllocations: candidates valid: %d", nCandidatesValid)
logger.Debugf("obtainAllocations: current: %d", nCurrentValid)
logger.Debugf("obtainAllocations: candidates: %d", nCandidatesValid)
logger.Debugf("obtainAllocations: priority: %d", len(priorityPeers))
logger.Debugf("obtainAllocations: Needed: %d", needed)
logger.Debugf("obtainAllocations: Wanted: %d", wanted)
@ -155,7 +205,7 @@ func (c *Cluster) obtainAllocations(
// This could be done more intelligently by dropping them
// according to the allocator order (i.e. free-ing peers
// with most used space first).
return validAllocations[0 : len(validAllocations)+wanted], nil
return currentPeers[0 : len(currentPeers)+wanted], nil
}
if needed <= 0 { // allocations are above minimal threshold
@ -164,14 +214,7 @@ func (c *Cluster) obtainAllocations(
}
if nCandidatesValid < needed { // not enough candidates
candidatesValid := []peer.ID{}
for k := range priorityMetrics {
candidatesValid = append(candidatesValid, k)
}
for k := range candidatesMetrics {
candidatesValid = append(candidatesValid, k)
}
return nil, allocationError(hash, needed, wanted, candidatesValid)
return nil, allocationError(hash, needed, wanted, append(priorityPeers, candidatePeers...))
}
// We can allocate from this point. Use the allocator to decide
@ -201,5 +244,5 @@ func (c *Cluster) obtainAllocations(
// the final result is the currently valid allocations
// along with the ones provided by the allocator
return append(validAllocations, finalAllocs[0:allocationsToUse]...), nil
return append(currentPeers, finalAllocs[0:allocationsToUse]...), nil
}
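The key constraint in the new flow is that a peer is only grouped if it has a metric from every informer. A minimal sketch of that behaviour, assuming it runs inside this package where filterMetrics is defined (the peer IDs and the "tag" metric are placeholders for illustration):

    peerA := peer.ID("peerA") // placeholder IDs
    peerB := peer.ID("peerB")
    mSet := api.MetricsSet{
        "freespace": []*api.Metric{
            {Name: "freespace", Value: "100", Peer: peerA, Valid: true},
        },
        "tag": []*api.Metric{
            {Name: "tag", Value: "x", Peer: peerA, Valid: true},
            {Name: "tag", Value: "y", Peer: peerB, Valid: true},
        },
    }
    // Two informers are expected, so peerB (which only reports "tag") is
    // dropped; only peerA ends up in the candidate group.
    _, _, candSet, candPeers, _, _ := filterMetrics(mSet, 2, nil, nil, nil)
    // candPeers == [peerA]; candSet holds only peerA's two metrics.
    _, _ = candSet, candPeers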

View File

@ -1,45 +0,0 @@
// Package ascendalloc implements an ipfscluster.PinAllocator, which returns
// allocations based on sorting the metrics in ascending order. Thus, peers with
// smallest metrics are first in the list. This allocator can be used with a
// number of informers, as long as they provide a numeric metric value.
package ascendalloc
import (
"context"
"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
// AscendAllocator extends the SimpleAllocator
type AscendAllocator struct{}
// NewAllocator returns an initialized AscendAllocator
func NewAllocator() AscendAllocator {
return AscendAllocator{}
}
// SetClient does nothing in this allocator
func (alloc AscendAllocator) SetClient(c *rpc.Client) {}
// Shutdown does nothing in this allocator
func (alloc AscendAllocator) Shutdown(_ context.Context) error { return nil }
// Allocate returns where to allocate a pin request based on metrics which
// carry a numeric value such as "used disk". We do not pay attention to
// the metrics of the currently allocated peers and we just sort the
// candidates based on their metric values (smallest to largest).
func (alloc AscendAllocator) Allocate(
ctx context.Context,
c cid.Cid,
current, candidates, priority map[peer.ID]*api.Metric,
) ([]peer.ID, error) {
// sort our metrics
first := util.SortNumeric(priority, false)
last := util.SortNumeric(candidates, false)
return append(first, last...), nil
}

View File

@ -1,117 +0,0 @@
package ascendalloc
import (
"context"
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
)
type testcase struct {
candidates map[peer.ID]*api.Metric
current map[peer.ID]*api.Metric
expected []peer.ID
}
var (
peer0 = peer.ID("QmUQ6Nsejt1SuZAu8yL8WgqQZHHAYreLVYYa4VPsLUCed7")
peer1 = peer.ID("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6")
peer2 = peer.ID("QmPrSBATWGAN56fiiEWEhKX3L1F3mTghEQR7vQwaeo7zHi")
peer3 = peer.ID("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
)
var inAMinute = time.Now().Add(time.Minute).UnixNano()
var testCases = []testcase{
{ // regular sort
candidates: map[peer.ID]*api.Metric{
peer0: {
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
peer1: {
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: true,
},
peer2: {
Name: "some-metric",
Value: "3",
Expire: inAMinute,
Valid: true,
},
peer3: {
Name: "some-metric",
Value: "2",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]*api.Metric{},
expected: []peer.ID{peer1, peer3, peer2, peer0},
},
{ // filter invalid
candidates: map[peer.ID]*api.Metric{
peer0: {
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: false,
},
peer1: {
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]*api.Metric{},
expected: []peer.ID{peer1},
},
{ // filter bad value
candidates: map[peer.ID]*api.Metric{
peer0: {
Name: "some-metric",
Value: "abc",
Expire: inAMinute,
Valid: true,
},
peer1: {
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]*api.Metric{},
expected: []peer.ID{peer1},
},
}
func Test(t *testing.T) {
ctx := context.Background()
alloc := &AscendAllocator{}
for i, tc := range testCases {
t.Logf("Test case %d", i)
res, err := alloc.Allocate(ctx, testCid, tc.current, tc.candidates, nil)
if err != nil {
t.Fatal(err)
}
if len(res) == 0 {
t.Fatal("0 allocations")
}
for i, r := range res {
if e := tc.expected[i]; r != e {
t.Errorf("Expect r[%d]=%s but got %s", i, r, e)
}
}
}
}

View File

@ -1,41 +0,0 @@
// Package descendalloc implements an ipfscluster.PinAllocator returns
// allocations based on sorting the metrics in descending order. Thus, peers
// with largest metrics are first in the list. This allocator can be used with a
// number of informers, as long as they provide a numeric metric value.
package descendalloc
import (
"context"
"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
// DescendAllocator extends the SimpleAllocator
type DescendAllocator struct{}
// NewAllocator returns an initialized DescendAllocator
func NewAllocator() DescendAllocator {
return DescendAllocator{}
}
// SetClient does nothing in this allocator
func (alloc DescendAllocator) SetClient(c *rpc.Client) {}
// Shutdown does nothing in this allocator
func (alloc DescendAllocator) Shutdown(_ context.Context) error { return nil }
// Allocate returns where to allocate a pin request based on metrics which
// carry a numeric value such as "used disk". We do not pay attention to
// the metrics of the currently allocated peers and we just sort the
// candidates based on their metric values (largest to smallest).
func (alloc DescendAllocator) Allocate(ctx context.Context, c cid.Cid, current, candidates, priority map[peer.ID]*api.Metric) ([]peer.ID, error) {
// sort our metrics
first := util.SortNumeric(priority, true)
last := util.SortNumeric(candidates, true)
return append(first, last...), nil
}

View File

@ -1,117 +0,0 @@
package descendalloc
import (
"context"
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
)
type testcase struct {
candidates map[peer.ID]*api.Metric
current map[peer.ID]*api.Metric
expected []peer.ID
}
var (
peer0 = peer.ID("QmUQ6Nsejt1SuZAu8yL8WgqQZHHAYreLVYYa4VPsLUCed7")
peer1 = peer.ID("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6")
peer2 = peer.ID("QmPrSBATWGAN56fiiEWEhKX3L1F3mTghEQR7vQwaeo7zHi")
peer3 = peer.ID("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
)
var inAMinute = time.Now().Add(time.Minute).UnixNano()
var testCases = []testcase{
{ // regular sort
candidates: map[peer.ID]*api.Metric{
peer0: {
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
peer1: {
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: true,
},
peer2: {
Name: "some-metric",
Value: "3",
Expire: inAMinute,
Valid: true,
},
peer3: {
Name: "some-metric",
Value: "2",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]*api.Metric{},
expected: []peer.ID{peer1, peer3, peer2, peer0},
},
{ // filter invalid
candidates: map[peer.ID]*api.Metric{
peer0: {
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: false,
},
peer1: {
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]*api.Metric{},
expected: []peer.ID{peer1},
},
{ // filter bad value
candidates: map[peer.ID]*api.Metric{
peer0: {
Name: "some-metric",
Value: "abc",
Expire: inAMinute,
Valid: true,
},
peer1: {
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]*api.Metric{},
expected: []peer.ID{peer1},
},
}
func Test(t *testing.T) {
ctx := context.Background()
alloc := &DescendAllocator{}
for i, tc := range testCases {
t.Logf("Test case %d", i)
res, err := alloc.Allocate(ctx, testCid, tc.current, tc.candidates, nil)
if err != nil {
t.Fatal(err)
}
if len(res) == 0 {
t.Fatal("0 allocations")
}
for i, r := range res {
if e := tc.expected[len(res)-i-1]; r != e {
t.Errorf("Expect r[%d]=%s but got %s", i, r, e)
}
}
}
}

allocator/metrics/config.go (new file, 103 additions)
View File

@ -0,0 +1,103 @@
package metrics
import (
"encoding/json"
"errors"
"github.com/ipfs/ipfs-cluster/config"
"github.com/kelseyhightower/envconfig"
)
const configKey = "metricsalloc"
const envConfigKey = "cluster_metricsalloc"
// These are the default values for a Config.
var (
DefaultAllocateBy = []string{"freespace"}
)
// Config allows initializing the Allocator.
type Config struct {
config.Saver
AllocateBy []string
}
type jsonConfig struct {
AllocateBy []string `json:"allocate_by"`
}
// ConfigKey returns a human-friendly identifier for this
// Config's type.
func (cfg *Config) ConfigKey() string {
return configKey
}
// Default initializes this Config with sensible values.
func (cfg *Config) Default() error {
cfg.AllocateBy = DefaultAllocateBy
return nil
}
// ApplyEnvVars fills in any Config fields found
// as environment variables.
func (cfg *Config) ApplyEnvVars() error {
jcfg := cfg.toJSONConfig()
err := envconfig.Process(envConfigKey, jcfg)
if err != nil {
return err
}
return cfg.applyJSONConfig(jcfg)
}
// Validate checks that the fields of this configuration have
// sensible values.
func (cfg *Config) Validate() error {
if len(cfg.AllocateBy) <= 0 {
return errors.New("metricalloc.allocate_by is invalid")
}
return nil
}
// LoadJSON parses a raw JSON byte-slice as generated by ToJSON().
func (cfg *Config) LoadJSON(raw []byte) error {
jcfg := &jsonConfig{}
err := json.Unmarshal(raw, jcfg)
if err != nil {
return err
}
cfg.Default()
return cfg.applyJSONConfig(jcfg)
}
func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
// When unset - use default
if len(jcfg.AllocateBy) > 0 {
cfg.AllocateBy = jcfg.AllocateBy
}
return cfg.Validate()
}
// ToJSON generates a human-friendly JSON representation of this Config.
func (cfg *Config) ToJSON() ([]byte, error) {
jcfg := cfg.toJSONConfig()
return config.DefaultJSONMarshal(jcfg)
}
func (cfg *Config) toJSONConfig() *jsonConfig {
return &jsonConfig{
AllocateBy: cfg.AllocateBy,
}
}
// ToDisplayJSON returns JSON config as a string.
func (cfg *Config) ToDisplayJSON() ([]byte, error) {
return config.DisplayJSON(cfg.toJSONConfig())
}

View File

@ -0,0 +1,61 @@
package metrics
import (
"os"
"testing"
)
var cfgJSON = []byte(`
{
"allocate_by": ["tag", "disk"]
}
`)
func TestLoadJSON(t *testing.T) {
cfg := &Config{}
err := cfg.LoadJSON(cfgJSON)
if err != nil {
t.Fatal(err)
}
}
func TestToJSON(t *testing.T) {
cfg := &Config{}
cfg.LoadJSON(cfgJSON)
newjson, err := cfg.ToJSON()
if err != nil {
t.Fatal(err)
}
cfg = &Config{}
err = cfg.LoadJSON(newjson)
if err != nil {
t.Fatal(err)
}
if len(cfg.AllocateBy) != 2 {
t.Error("configuration was lost in serialization/deserialization")
}
}
func TestDefault(t *testing.T) {
cfg := &Config{}
cfg.Default()
if cfg.Validate() != nil {
t.Fatal("error validating")
}
cfg.AllocateBy = nil
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
}
func TestApplyEnvVars(t *testing.T) {
os.Setenv("CLUSTER_METRICSALLOC_ALLOCATEBY", "a,b,c")
cfg := &Config{}
cfg.ApplyEnvVars()
if len(cfg.AllocateBy) != 3 {
t.Fatal("failed to override allocate_by with env var")
}
}

View File

@ -0,0 +1,266 @@
// Package metrics implements an allocator that can sort allocations
// based on multiple metrics, where metrics may be an arbitrary way to
// partition a set of peers.
//
// For example, allocating by [tags, disk] will first order candidate peers
// by the tag metric and then by the disk metric. The final list picks
// allocations from each tag group in turn, following the given order of
// metrics.
package metrics
import (
"context"
"fmt"
"github.com/ipfs/go-cid"
"github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
"go.opencensus.io/trace"
)
// Metrics is an allocator that partitions metrics and orders the final
// list of allocations by selecting from each partition.
type Metrics struct {
config *Config
rpcClient *rpc.Client
}
// New returns an initialized Allocator.
func New(cfg *Config) (*Metrics, error) {
err := cfg.Validate()
if err != nil {
return nil, err
}
return &Metrics{
config: cfg,
}, nil
}
// SetClient provides us with an rpc.Client which allows
// contacting other components in the cluster.
func (m *Metrics) SetClient(c *rpc.Client) {
m.rpcClient = c
}
// Shutdown is called on cluster shutdown. We just invalidate
// any metrics from this point.
func (m *Metrics) Shutdown(ctx context.Context) error {
_, span := trace.StartSpan(ctx, "allocator/metrics/Shutdown")
defer span.End()
m.rpcClient = nil
return nil
}
type partitionedMetric struct {
metricName string
curChoosingIndex int
noMore bool
partitions []*partition // they are in order of their values
}
// sortedPeers returns a list of peers, never choosing twice from the same
// partition while there is some other partition to choose from.
func (pnedm *partitionedMetric) sortedPeers() []peer.ID {
peers := []peer.ID{}
for {
peer := pnedm.chooseNext()
if peer == "" {
break
}
peers = append(peers, peer)
}
return peers
}
func (pnedm *partitionedMetric) chooseNext() peer.ID {
lenp := len(pnedm.partitions)
if lenp == 0 {
return ""
}
if pnedm.noMore {
return ""
}
var peer peer.ID
curPartition := pnedm.partitions[pnedm.curChoosingIndex]
done := 0
for {
if curPartition.sub != nil {
// Choose something from the sub-partitionedMetric
peer = curPartition.sub.chooseNext()
} else {
// We are a bottom-partition. Choose one of our peers
for pid, used := range curPartition.peers {
if !used {
peer = pid
curPartition.peers[pid] = true // mark as used
break
}
}
}
// look in next partition next time
pnedm.curChoosingIndex = (pnedm.curChoosingIndex + 1) % lenp
curPartition = pnedm.partitions[pnedm.curChoosingIndex]
done++
if peer != "" {
break
}
// no peer and we have looked in as many partitions as we have
if done == lenp {
pnedm.noMore = true
break
}
}
return peer
}
type partition struct {
value string
peers map[peer.ID]bool
sub *partitionedMetric // all peers in sub-partitions will have the same value for this metric
}
func partitionMetrics(sortedSet api.MetricsSet, by []string) *partitionedMetric {
rootMetric := by[0]
informer := informers[rootMetric]
pnedMetric := &partitionedMetric{
metricName: rootMetric,
partitions: partitionValues(sortedSet[rootMetric], informer),
}
if len(by) == 1 { // we are done
return pnedMetric
}
// process sub-partitions
for _, partition := range pnedMetric.partitions {
filteredSet := make(api.MetricsSet)
for k, v := range sortedSet {
if k == rootMetric { // not needed anymore
continue
}
for _, m := range v {
// only leave metrics for peers in current partition
if _, ok := partition.peers[m.Peer]; ok {
filteredSet[k] = append(filteredSet[k], m)
}
}
}
partition.sub = partitionMetrics(filteredSet, by[1:])
}
return pnedMetric
}
func partitionValues(sortedMetrics []*api.Metric, inf informer) []*partition {
partitions := []*partition{}
if len(sortedMetrics) <= 0 {
return partitions
}
// For non-partitionable metrics we create one partition per metric,
// even if two values are the same.
groupable := inf.partitionable
curPartition := &partition{
value: sortedMetrics[0].Value,
peers: map[peer.ID]bool{
sortedMetrics[0].Peer: false,
},
}
partitions = append(partitions, curPartition)
for _, m := range sortedMetrics[1:] {
if groupable && m.Value == curPartition.value {
curPartition.peers[m.Peer] = false
} else {
curPartition = &partition{
value: m.Value,
peers: map[peer.ID]bool{
m.Peer: false,
},
}
partitions = append(partitions, curPartition)
}
}
return partitions
}
// Allocate produces a sorted list of cluster peer IDs based on the
// different metrics provided for those peer IDs.
// It works as follows:
//
// - First, it buckets each peer's metrics based on the AllocateBy list.
// Metrics whose name is not in AllocateBy are ignored.
// - Second, based on the AllocateBy order, it sorts the first bucket and
// groups peers by sorted value.
// - Third, it takes the metrics in the second bucket for the peers in the
// first group and sorts them, then does the same for the peers in the
// second group, and so on.
// - It repeats the process until there are no more buckets to sort.
// - Finally, it returns peers by walking the resulting partitions
// round-robin, so that allocations are spread across the partitions of the
// earlier metrics as much as possible.
func (m *Metrics) Allocate(
ctx context.Context,
c cid.Cid,
current, candidates, priority api.MetricsSet,
) ([]peer.ID, error) {
// sort all metrics. TODO: it should not remove invalids.
for _, arg := range []api.MetricsSet{current, candidates, priority} {
if arg == nil {
continue
}
for _, by := range m.config.AllocateBy {
sorter := informers[by].sorter
if sorter == nil {
return nil, fmt.Errorf("allocate_by contains an unknown metric name: %s", by)
}
arg[by] = sorter(arg[by])
}
}
// For the allocation to work well, there have to be metrics of all
// the types for all the peers. There cannot be a metric of one type
// for a peer that does not appear in the other types.
//
// Removing such occurrences is done in allocate.go, before the
// allocator is called.
//
// Otherwise, the sorting might produce odd results.
candidatePartition := partitionMetrics(candidates, m.config.AllocateBy)
priorityPartition := partitionMetrics(priority, m.config.AllocateBy)
//fmt.Println("---")
//printPartition(candidatePartition)
first := priorityPartition.sortedPeers()
last := candidatePartition.sortedPeers()
return append(first, last...), nil
}
// func printPartition(p *partitionedMetric) {
// fmt.Println(p.metricName)
// for _, p := range p.partitions {
// fmt.Printf("%s - [", p.value)
// for p, u := range p.peers {
// fmt.Printf("%s|%t, ", p, u)
// }
// fmt.Println("]")
// if p.sub != nil {
// printPartition(p.sub)
// }
// }
// }
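To make the partition tree concrete, here is a small sketch of what partitionMetrics builds and how sortedPeers walks it. This is illustrative only: it would have to live in this package, it assumes the api, peer, sorter and time packages are imported, and p1/p2/p3 are placeholder peer IDs:

    mk := func(name, value string, p peer.ID) *api.Metric {
        return &api.Metric{
            Name: name, Value: value, Peer: p, Valid: true,
            Expire: time.Now().Add(time.Minute).UnixNano(),
        }
    }
    p1, p2, p3 := peer.ID("p1"), peer.ID("p2"), peer.ID("p3")

    RegisterInformer("region", sorter.SortText, true)
    RegisterInformer("freespace", sorter.SortNumericReverse, false)

    // partitionMetrics expects already-sorted metric slices.
    set := api.MetricsSet{
        "region": sorter.SortText([]*api.Metric{
            mk("region", "eu", p1), mk("region", "eu", p2), mk("region", "us", p3),
        }),
        "freespace": sorter.SortNumericReverse([]*api.Metric{
            mk("freespace", "10", p1), mk("freespace", "90", p2), mk("freespace", "50", p3),
        }),
    }
    root := partitionMetrics(set, []string{"region", "freespace"})
    // The tree is: region=eu -> {p2(90), p1(10)}, region=us -> {p3(50)}.
    // root.sortedPeers() yields [p2, p3, p1]: the second "eu" peer is only
    // picked after every region has contributed one peer.
    _ = root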

View File

@ -0,0 +1,149 @@
package metrics
import (
"context"
"testing"
"time"
"github.com/ipfs/ipfs-cluster/allocator/sorter"
api "github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
peer "github.com/libp2p/go-libp2p-core/peer"
)
func makeMetric(name, value string, peer peer.ID) *api.Metric {
return &api.Metric{
Name: name,
Value: value,
Peer: peer,
Valid: true,
Expire: time.Now().Add(time.Minute).UnixNano(),
}
}
func TestAllocate(t *testing.T) {
RegisterInformer("region", sorter.SortText, true)
RegisterInformer("az", sorter.SortText, true)
RegisterInformer("freespace", sorter.SortNumericReverse, false)
alloc, err := New(&Config{
AllocateBy: []string{
"region",
"az",
"freespace",
},
})
if err != nil {
t.Fatal(err)
}
candidates := api.MetricsSet{
"abc": []*api.Metric{ // don't want anything in results
makeMetric("abc", "a", test.PeerID1),
makeMetric("abc", "b", test.PeerID2),
},
"region": []*api.Metric{
makeMetric("region", "a-us", test.PeerID1),
makeMetric("region", "a-us", test.PeerID2),
makeMetric("region", "b-eu", test.PeerID3),
makeMetric("region", "b-eu", test.PeerID4),
makeMetric("region", "b-eu", test.PeerID5),
makeMetric("region", "c-au", test.PeerID6),
makeMetric("region", "c-au", test.PeerID7),
makeMetric("region", "c-au", test.PeerID8), // I don't want to see this in results
},
"az": []*api.Metric{
makeMetric("az", "us1", test.PeerID1),
makeMetric("az", "us2", test.PeerID2),
makeMetric("az", "eu1", test.PeerID3),
makeMetric("az", "eu1", test.PeerID4),
makeMetric("az", "eu2", test.PeerID5),
makeMetric("az", "au1", test.PeerID6),
makeMetric("az", "au1", test.PeerID7),
},
"freespace": []*api.Metric{
makeMetric("freespace", "100", test.PeerID1),
makeMetric("freespace", "500", test.PeerID2),
makeMetric("freespace", "200", test.PeerID3),
makeMetric("freespace", "400", test.PeerID4),
makeMetric("freespace", "10", test.PeerID5),
makeMetric("freespace", "50", test.PeerID6),
makeMetric("freespace", "600", test.PeerID7),
makeMetric("freespace", "10000", test.PeerID8),
},
}
// Based on the algorithm it should choose:
//
// - For region a-us, az us1, it should select the peer with the most
// freespace in that az: ID1
// - Then it switches to the next region: ID4
// - And so on:
// - us-us1-100 ID1 - only peer in az
// - eu-eu1-400 ID4 - over ID3
// - au-au1-600 ID7 - over ID6
// - us-us2-500 ID2 - only peer in az
// - eu-eu2-10 ID5 - only peer in az
// - au-au1-50 ID6 - id7 already used
// - // no more in us
// - eu-eu1-ID3 - ID4 already used
// - // no more peers in au
// - // no more peers
peers, err := alloc.Allocate(context.Background(),
test.Cid1,
nil,
candidates,
nil,
)
if err != nil {
t.Fatal(err)
}
if len(peers) < 7 {
t.Fatalf("not enough peers: %s", peers)
}
for i, p := range peers {
t.Logf("%d - %s", i, p)
switch i {
case 0:
if p != test.PeerID1 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 1:
if p != test.PeerID4 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 2:
if p != test.PeerID7 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 3:
if p != test.PeerID2 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 4:
if p != test.PeerID5 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 5:
if p != test.PeerID6 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 6:
if p != test.PeerID3 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
default:
t.Error("too many peers")
}
}
}

View File

@ -0,0 +1,26 @@
package metrics
import "github.com/ipfs/ipfs-cluster/api"
type informer struct {
sorter func([]*api.Metric) []*api.Metric
partitionable bool
}
var informers map[string]informer
// RegisterInformer registers an informer with this allocator so that we
// know how to sort and use the metrics it produces.
func RegisterInformer(
name string,
sorter func([]*api.Metric) []*api.Metric,
partitionable bool,
) {
if informers == nil {
informers = make(map[string]informer)
}
informers[name] = informer{
sorter: sorter,
partitionable: partitionable,
}
}
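For reference, a sketch of how an informer package registers itself with this allocator. The disk and numpin informers updated later in this commit do exactly this from their init functions; the "tag" informer below is hypothetical:

    package tag // hypothetical informer

    import (
        "github.com/ipfs/ipfs-cluster/allocator/metrics"
        "github.com/ipfs/ipfs-cluster/allocator/sorter"
    )

    func init() {
        // Tag values are plain text and peers sharing a tag value can be
        // grouped into the same partition.
        metrics.RegisterInformer("tag", sorter.SortText, true)
    }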

View File

@ -0,0 +1,137 @@
// Package sorter is a utility package used by the allocator
// implementations. It provides sorting functions for metrics of
// different value types.
package sorter
import (
"sort"
"strconv"
"github.com/ipfs/ipfs-cluster/api"
)
// SortNumeric sorts a list of metrics of Uint values from smallest to
// largest. Invalid or non-numeric metrics are discarded.
func SortNumeric(metrics []*api.Metric) []*api.Metric {
return sortNumeric(metrics, false)
}
// SortNumericReverse sorts a list of metrics of Uint values from largest to
// smallest. Invalid or non-numeric metrics are discarded.
func SortNumericReverse(metrics []*api.Metric) []*api.Metric {
return sortNumeric(metrics, true)
}
func sortNumeric(metrics []*api.Metric, reverse bool) []*api.Metric {
filteredMetrics := make([]*api.Metric, 0, len(metrics))
values := make([]uint64, 0, len(metrics))
// Parse and discard
for _, m := range metrics {
if m.Discard() {
continue
}
val, err := strconv.ParseUint(m.Value, 10, 64)
if err != nil {
continue
}
filteredMetrics = append(filteredMetrics, m)
values = append(values, val)
}
sorter := &numericSorter{
values: values,
metrics: filteredMetrics,
reverse: reverse,
}
sort.Sort(sorter)
return sorter.metrics
}
// SortText sorts a list of metrics of string values.
// Invalid metrics are discarded.
func SortText(metrics []*api.Metric) []*api.Metric {
filteredMetrics := make([]*api.Metric, 0, len(metrics))
// Parse and discard
for _, m := range metrics {
if m.Discard() {
continue
}
filteredMetrics = append(filteredMetrics, m)
}
sorter := &textSorter{
metrics: filteredMetrics,
reverse: false,
}
sort.Sort(sorter)
return sorter.metrics
}
// numericSorter implements the sort.Sort interface
type numericSorter struct {
values []uint64
metrics []*api.Metric
reverse bool
}
func (s numericSorter) Len() int {
return len(s.values)
}
func (s numericSorter) Swap(i, j int) {
tempVal := s.values[i]
s.values[i] = s.values[j]
s.values[j] = tempVal
tempMetric := s.metrics[i]
s.metrics[i] = s.metrics[j]
s.metrics[j] = tempMetric
}
// We make it a strict order by ordering by peer ID.
func (s numericSorter) Less(i, j int) bool {
if s.values[i] == s.values[j] {
if s.reverse {
return string(s.metrics[i].Peer) > string(s.metrics[j].Peer)
}
return string(s.metrics[i].Peer) < string(s.metrics[j].Peer)
}
if s.reverse {
return s.values[i] > s.values[j]
}
return s.values[i] < s.values[j]
}
// textSorter implements the sort.Sort interface
type textSorter struct {
metrics []*api.Metric
reverse bool
}
func (s textSorter) Len() int {
return len(s.metrics)
}
func (s textSorter) Swap(i, j int) {
tempMetric := s.metrics[i]
s.metrics[i] = s.metrics[j]
s.metrics[j] = tempMetric
}
// We make it a strict order by ordering by peer ID.
func (s textSorter) Less(i, j int) bool {
if s.metrics[i].Value == s.metrics[j].Value {
if s.reverse {
return string(s.metrics[i].Peer) > string(s.metrics[j].Peer)
}
return string(s.metrics[i].Peer) < string(s.metrics[j].Peer)
}
if s.reverse {
return s.metrics[i].Value > s.metrics[j].Value
}
return s.metrics[i].Value < s.metrics[j].Value
}

View File

@ -0,0 +1,70 @@
package sorter
import (
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
)
func TestSortNumeric(t *testing.T) {
metrics := []*api.Metric{
&api.Metric{
Name: "num",
Value: "5",
Valid: true,
},
&api.Metric{
Name: "num",
Value: "0",
Valid: true,
},
&api.Metric{
Name: "num",
Value: "-1",
Valid: true,
},
&api.Metric{
Name: "num3",
Value: "abc",
Valid: true,
},
&api.Metric{
Name: "num2",
Value: "10",
Valid: true,
},
&api.Metric{
Name: "num2",
Value: "4",
Valid: true,
},
}
for _, m := range metrics {
m.SetTTL(time.Minute)
}
metrics[4].Expire = 0 // manually expire
sorted := SortNumeric(metrics)
if len(sorted) != 3 {
t.Fatal("sorter did not remove invalid metrics")
}
if sorted[0].Value != "0" ||
sorted[1].Value != "4" ||
sorted[2].Value != "5" {
t.Error("not sorted properly")
}
sortedRev := SortNumericReverse(metrics)
if len(sortedRev) != 3 {
t.Fatal("sorter did not remove invalid metrics")
}
if sortedRev[0].Value != "5" ||
sortedRev[1].Value != "4" ||
sortedRev[2].Value != "0" {
t.Error("not sorted properly")
}
}

View File

@ -1,75 +0,0 @@
// Package util is a utility package used by the allocator
// implementations. This package provides the SortNumeric function, which may be
// used by an allocator to sort peers by their metric values (ascending or
// descending).
package util
import (
"sort"
"strconv"
"github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// SortNumeric returns a list of peers sorted by their metric values. If reverse
// is false (true), peers will be sorted from smallest to largest (largest to
// smallest) metric
func SortNumeric(candidates map[peer.ID]*api.Metric, reverse bool) []peer.ID {
vMap := make(map[peer.ID]uint64)
peers := make([]peer.ID, 0, len(candidates))
for k, v := range candidates {
if v.Discard() {
continue
}
val, err := strconv.ParseUint(v.Value, 10, 64)
if err != nil {
continue
}
peers = append(peers, k)
vMap[k] = val
}
sorter := &metricSorter{
m: vMap,
peers: peers,
reverse: reverse,
}
sort.Sort(sorter)
return sorter.peers
}
// metricSorter implements the sort.Sort interface
type metricSorter struct {
peers []peer.ID
m map[peer.ID]uint64
reverse bool
}
// Len returns the number of metrics
func (s metricSorter) Len() int {
return len(s.peers)
}
// Swap Swaps the elements in positions i and j
func (s metricSorter) Swap(i, j int) {
temp := s.peers[i]
s.peers[i] = s.peers[j]
s.peers[j] = temp
}
// Less reports if the element in position i is Less than the element in j
// (important to override this)
func (s metricSorter) Less(i, j int) bool {
peeri := s.peers[i]
peerj := s.peers[j]
x := s.m[peeri]
y := s.m[peerj]
if s.reverse {
return x > y
}
return x < y
}

View File

@ -1100,6 +1100,9 @@ func (n *NodeWithMeta) Size() uint64 {
return uint64(len(n.Data))
}
// MetricsSet is a map used to carry slices of metrics indexed by type.
type MetricsSet map[string][]*Metric
// Metric transports information about a peer.ID. It is used to decide
// pin allocations by a PinAllocator. IPFS cluster is agnostic to
// the Value, which should be interpreted by the PinAllocator.
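For reference, a minimal sketch of how a caller populates the new type (peerA is a placeholder peer ID):

    set := api.MetricsSet{
        "freespace": []*api.Metric{
            {Name: "freespace", Value: "1000", Peer: peerA, Valid: true},
        },
        "numpin": []*api.Metric{
            {Name: "numpin", Value: "3", Peer: peerA, Valid: true},
        },
    }
    _ = set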

View File

@ -12,7 +12,7 @@ import (
"time"
"github.com/ipfs/ipfs-cluster/adder/sharding"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/config"
"github.com/ipfs/ipfs-cluster/informer/numpin"
@ -148,7 +148,7 @@ type mockTracer struct {
}
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracker) {
ident, clusterCfg, _, _, _, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs()
ident, clusterCfg, _, _, _, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, _, _, _ := testingConfigs()
ctx := context.Background()
host, pubsub, dht := createHost(t, ident.PrivateKey, clusterCfg.Secret, clusterCfg.ListenAddr)
@ -180,7 +180,12 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke
t.Fatal(err)
}
alloc := ascendalloc.NewAllocator()
alloc, err := metrics.New(&metrics.Config{
AllocateBy: []string{"numpin"},
})
if err != nil {
t.Fatal(err)
}
numpinCfg := &numpin.Config{}
numpinCfg.Default()
inf, _ := numpin.NewInformer(numpinCfg)

View File

@ -1,6 +1,7 @@
package ipfscluster
import (
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/api/ipfsproxy"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/config"
@ -126,9 +127,12 @@ var testingMonCfg = []byte(`{
"failure_threshold": 6
}`)
var testingAllocMetricsCfg = []byte(`{
"allocate_by": ["freespace"]
}`)
var testingDiskInfCfg = []byte(`{
"metric_ttl": "900ms",
"metric_type": "freespace"
"metric_ttl": "900ms"
}`)
var testingTracerCfg = []byte(`{
@ -138,8 +142,8 @@ var testingTracerCfg = []byte(`{
"service_name": "cluster-daemon"
}`)
func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *leveldb.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {
identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg := testingEmptyConfigs()
func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *leveldb.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *metrics.Config, *disk.Config, *observations.TracingConfig) {
identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, allocMetricsCfg, diskInfCfg, tracingCfg := testingEmptyConfigs()
identity.LoadJSON(testingIdentity)
clusterCfg.LoadJSON(testingClusterCfg)
apiCfg.LoadJSON(testingAPICfg)
@ -151,13 +155,14 @@ func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Confi
crdtCfg.LoadJSON(testingCrdtCfg)
statelesstrkrCfg.LoadJSON(testingTrackerCfg)
pubsubmonCfg.LoadJSON(testingMonCfg)
allocMetricsCfg.LoadJSON(testingAllocMetricsCfg)
diskInfCfg.LoadJSON(testingDiskInfCfg)
tracingCfg.LoadJSON(testingTracerCfg)
return identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg
return identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, allocMetricsCfg, diskInfCfg, tracingCfg
}
func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *leveldb.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {
func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *leveldb.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *metrics.Config, *disk.Config, *observations.TracingConfig) {
identity := &config.Identity{}
clusterCfg := &Config{}
apiCfg := &rest.Config{}
@ -169,9 +174,10 @@ func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.
crdtCfg := &crdt.Config{}
statelessCfg := &stateless.Config{}
pubsubmonCfg := &pubsubmon.Config{}
allocMetricsCfg := &metrics.Config{}
diskInfCfg := &disk.Config{}
tracingCfg := &observations.TracingConfig{}
return identity, clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelessCfg, pubsubmonCfg, diskInfCfg, tracingCfg
return identity, clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelessCfg, pubsubmonCfg, allocMetricsCfg, diskInfCfg, tracingCfg
}
// func TestConfigDefault(t *testing.T) {

View File

@ -18,17 +18,6 @@ const (
DefaultMetricType = MetricFreeSpace
)
// String returns a string representation for MetricType.
func (t MetricType) String() string {
switch t {
case MetricFreeSpace:
return "freespace"
case MetricRepoSize:
return "reposize"
}
return ""
}
// Config is used to initialize an Informer and customize
// the type and parameters of the metric it produces.
type Config struct {

View File

@ -7,6 +7,8 @@ import (
"fmt"
"sync"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/allocator/sorter"
"github.com/ipfs/ipfs-cluster/api"
logging "github.com/ipfs/go-log/v2"
@ -20,13 +22,29 @@ type MetricType int
const (
// MetricFreeSpace provides the available space reported by IPFS
MetricFreeSpace = iota
MetricFreeSpace MetricType = iota
// MetricRepoSize provides the used space reported by IPFS
MetricRepoSize
)
// String returns a string representation for MetricType.
func (t MetricType) String() string {
switch t {
case MetricFreeSpace:
return "freespace"
case MetricRepoSize:
return "reposize"
}
return ""
}
var logger = logging.Logger("diskinfo")
func init() {
metrics.RegisterInformer(MetricFreeSpace.String(), sorter.SortNumericReverse, false)
metrics.RegisterInformer(MetricRepoSize.String(), sorter.SortNumeric, false)
}
// Informer is a simple object to implement the ipfscluster.Informer
// and Component interfaces.
type Informer struct {
@ -48,7 +66,7 @@ func NewInformer(cfg *Config) (*Informer, error) {
}, nil
}
// Name returns the user-facing name of this informer.
// Name returns the name of the metric issued by this informer.
func (disk *Informer) Name() string {
return disk.config.MetricType.String()
}

View File

@ -6,6 +6,8 @@ import (
"context"
"fmt"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/allocator/sorter"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/libp2p/go-libp2p-gorpc"
@ -16,6 +18,10 @@ import (
// MetricName specifies the name of our metric
var MetricName = "numpin"
func init() {
metrics.RegisterInformer(MetricName, sorter.SortNumeric, false)
}
// Informer is a simple object to implement the ipfscluster.Informer
// and Component interfaces
type Informer struct {

View File

@ -152,7 +152,7 @@ type PinAllocator interface {
// which are currently pinning the content. The candidates map
// contains the metrics for all peers which are eligible for pinning
// the content.
Allocate(ctx context.Context, c cid.Cid, current, candidates, priority map[peer.ID]*api.Metric) ([]peer.ID, error)
Allocate(ctx context.Context, c cid.Cid, current, candidates, priority api.MetricsSet) ([]peer.ID, error)
}
// PeerMonitor is a component in charge of publishing a peer's metrics and
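A minimal sketch of an allocator satisfying the updated signature. It is illustrative only: it ignores metric values and simply returns priority peers before candidates, and it assumes the component also keeps the usual SetClient/Shutdown methods and imports for context, cid, peer, rpc and api:

    type trivialAllocator struct{}

    func (a *trivialAllocator) SetClient(c *rpc.Client)            {}
    func (a *trivialAllocator) Shutdown(ctx context.Context) error { return nil }

    func (a *trivialAllocator) Allocate(
        ctx context.Context,
        c cid.Cid,
        current, candidates, priority api.MetricsSet,
    ) ([]peer.ID, error) {
        seen := make(map[peer.ID]struct{})
        var out []peer.ID
        // Walk priority metrics first, then candidates, deduplicating peers.
        for _, set := range []api.MetricsSet{priority, candidates} {
            for _, ms := range set {
                for _, m := range ms {
                    if _, ok := seen[m.Peer]; ok {
                        continue
                    }
                    seen[m.Peer] = struct{}{}
                    out = append(out, m.Peer)
                }
            }
        }
        return out, nil
    }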

View File

@ -17,7 +17,7 @@ import (
"testing"
"time"
"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/consensus/crdt"
@ -176,7 +176,7 @@ func createComponents(
peername := fmt.Sprintf("peer_%d", i)
ident, clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, diskInfCfg, tracingCfg := testingConfigs()
ident, clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, allocMetricsCfg, diskInfCfg, tracingCfg := testingConfigs()
ident.ID = host.ID()
ident.PrivateKey = host.Peerstore().PrivKey(host.ID())
@ -211,7 +211,10 @@ func createComponents(
t.Fatal(err)
}
alloc := descendalloc.NewAllocator()
alloc, err := metrics.New(allocMetricsCfg)
if err != nil {
t.Fatal(err)
}
inf, err := disk.NewInformer(diskInfCfg)
if err != nil {
t.Fatal(err)

View File

@ -5,6 +5,7 @@ package pubsubmon
import (
"bytes"
"context"
"time"
"sync"
@ -157,11 +158,7 @@ func (mon *Monitor) logFromPubsub() {
}
}
logger.Debugf(
"received pubsub metric '%s' from '%s'",
metric.Name,
metric.Peer,
)
debug("recieved", &metric)
err = mon.LogMetric(ctx, &metric)
if err != nil {
@ -208,7 +205,7 @@ func (mon *Monitor) LogMetric(ctx context.Context, m *api.Metric) error {
defer span.End()
mon.metrics.Add(m)
logger.Debugf("pubsub mon logged '%s' metric from '%s'. Expires on %d", m.Name, m.Peer, m.Expire)
debug("logged", m)
return nil
}
@ -231,11 +228,7 @@ func (mon *Monitor) PublishMetric(ctx context.Context, m *api.Metric) error {
return err
}
logger.Debugf(
"publishing metric %s to pubsub. Expires: %d",
m.Name,
m.Expire,
)
debug("publish", m)
err = mon.topic.Publish(ctx, b.Bytes())
if err != nil {
@ -281,3 +274,14 @@ func (mon *Monitor) MetricNames(ctx context.Context) []string {
return mon.metrics.MetricNames()
}
func debug(event string, m *api.Metric) {
logger.Debugf(
"%s metric: '%s' - '%s' - '%s' - '%s'",
event,
m.Peer,
m.Name,
m.Value,
time.Unix(0, m.Expire),
)
}

View File

@ -31,6 +31,9 @@ var (
PeerID4, _ = peer.Decode("QmZ8naDy5mEz4GLuQwjWt9MPYqHTBbsm8tQBrNSjiq6zBc")
PeerID5, _ = peer.Decode("QmZVAo3wd8s5eTTy2kPYs34J9PvfxpKPuYsePPYGjgRRjg")
PeerID6, _ = peer.Decode("QmR8Vu6kZk7JvAN2rWVWgiduHatgBq2bb15Yyq8RRhYSbx")
PeerID7, _ = peer.Decode("12D3KooWGHTKzeT4KaLGLrbKKyT8zKrBPXAUBRzCAN6ZMDMo4M6M")
PeerID8, _ = peer.Decode("12D3KooWFBFCDQzAkQSwPZLV883pKdsmb6urQ3sMjfJHUxn5GCVv")
PeerID9, _ = peer.Decode("12D3KooWKuJ8LPTyHbyX4nt4C7uWmUobzFsiceTVoFw7HpmoNakM")
PeerName1 = "TestPeer1"
PeerName2 = "TestPeer2"