Merge pull request #1468 from ipfs/fix/159-improved-allocators

Add tags informer and enable partition-based peer allocations for intelligent distribution
Hector Sanjuan 2021-10-06 14:35:16 +02:00 committed by GitHub
commit 96db605c50
40 changed files with 1441 additions and 562 deletions


@ -39,6 +39,16 @@ import (
// ReplicationFactorMax is reached. Error if there are less than
// ReplicationFactorMin.
// A wrapper to carry peer metrics that have been classified.
type classifiedMetrics struct {
current api.MetricsSet
currentPeers []peer.ID
candidate api.MetricsSet
candidatePeers []peer.ID
priority api.MetricsSet
priorityPeers []peer.ID
}
// allocate finds peers to allocate a hash using the informer and the monitor
// it should only be used with valid replicationFactors (if rplMin and rplMax
// are > 0, then rplMin <= rplMax).
@ -47,7 +57,7 @@ import (
// into account if the given CID was previously in a "pin everywhere" mode,
// and will consider such Pins as currently unallocated ones, providing
// new allocations as available.
func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin *api.Pin, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) {
func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin *api.Pin, rplMin, rplMax int, blacklist []peer.ID, priorityList []peer.ID) ([]peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "cluster/allocate")
defer span.End()
@ -64,37 +74,30 @@ func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin *api.Pi
if currentPin != nil {
currentAllocs = currentPin.Allocations
}
metrics := c.monitor.LatestMetrics(ctx, c.informers[0].Name())
currentMetrics := make(map[peer.ID]*api.Metric)
candidatesMetrics := make(map[peer.ID]*api.Metric)
priorityMetrics := make(map[peer.ID]*api.Metric)
// Divide metrics between current and candidates.
// All metrics in metrics are valid (at least the
// moment they were compiled by the monitor)
for _, m := range metrics {
switch {
case containsPeer(blacklist, m.Peer):
// discard blacklisted peers
continue
case containsPeer(currentAllocs, m.Peer):
currentMetrics[m.Peer] = m
case containsPeer(prioritylist, m.Peer):
priorityMetrics[m.Peer] = m
default:
candidatesMetrics[m.Peer] = m
}
// Get metrics that the allocator is interested in
mSet := make(api.MetricsSet)
metrics := c.allocator.Metrics()
for _, metricName := range metrics {
mSet[metricName] = c.monitor.LatestMetrics(ctx, metricName)
}
// Filter and divide metrics. The resulting sets only have peers that
// have all the metrics needed and are not blacklisted.
classified := filterMetrics(
mSet,
len(metrics),
currentAllocs,
priorityList,
blacklist,
)
newAllocs, err := c.obtainAllocations(
ctx,
hash,
rplMin,
rplMax,
currentMetrics,
candidatesMetrics,
priorityMetrics,
classified,
)
if err != nil {
return newAllocs, err
@ -105,19 +108,83 @@ func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin *api.Pi
return newAllocs, nil
}
// Given metrics from all informers, split them into three MetricsSets:
// - Those corresponding to currently allocated peers
// - Those corresponding to priority allocations
// - Those corresponding to "candidate" allocations
// It also returns a slice of the peers in each group.
//
// For a metric/peer to be included in a group, it is necessary that it has
// metrics for all informers.
func filterMetrics(mSet api.MetricsSet, numMetrics int, currentAllocs, priorityList, blacklist []peer.ID) classifiedMetrics {
curPeersMap := make(map[peer.ID][]*api.Metric)
candPeersMap := make(map[peer.ID][]*api.Metric)
prioPeersMap := make(map[peer.ID][]*api.Metric)
// Divide the metrics into current/candidate/priority groups, indexed by peer.
for _, metrics := range mSet {
for _, m := range metrics {
switch {
case containsPeer(blacklist, m.Peer):
// discard blacklisted peers
continue
case containsPeer(currentAllocs, m.Peer):
curPeersMap[m.Peer] = append(curPeersMap[m.Peer], m)
case containsPeer(priorityList, m.Peer):
prioPeersMap[m.Peer] = append(prioPeersMap[m.Peer], m)
default:
candPeersMap[m.Peer] = append(candPeersMap[m.Peer], m)
}
}
}
fillMetricsSet := func(peersMap map[peer.ID][]*api.Metric) (api.MetricsSet, []peer.ID) {
mSet := make(api.MetricsSet)
peers := make([]peer.ID, 0, len(peersMap))
// Put the metrics in their sets only when a peer has metrics for
// all informers, and record those peers. This relies on
// LatestMetrics returning exactly one metric per peer, so a peer
// with all the needed metrics has exactly numMetrics entries.
// Otherwise, the peer is ignored.
for p, metrics := range peersMap {
if len(metrics) == numMetrics {
for _, m := range metrics {
mSet[m.Name] = append(mSet[m.Name], m)
}
peers = append(peers, p)
} // otherwise this peer will be ignored.
}
return mSet, peers
}
curSet, curPeers := fillMetricsSet(curPeersMap)
candSet, candPeers := fillMetricsSet(candPeersMap)
prioSet, prioPeers := fillMetricsSet(prioPeersMap)
return classifiedMetrics{
current: curSet,
currentPeers: curPeers,
candidate: candSet,
candidatePeers: candPeers,
priority: prioSet,
priorityPeers: prioPeers,
}
}
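For illustration, a hedged test-style sketch of the classification above (not part of this commit; the peer IDs, metric names and the test helper are invented) showing that a peer missing one informer metric is dropped from every group:

package ipfscluster

import (
	"testing"

	"github.com/ipfs/ipfs-cluster/api"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

func TestFilterMetricsSketch(t *testing.T) {
	pCur := peer.ID("QmCurrent")    // already allocated
	pPrio := peer.ID("QmPriority")  // in the priority list
	pCand := peer.ID("QmCandidate") // regular candidate
	pHalf := peer.ID("QmMissing")   // only has one of the two metrics

	mk := func(name string, p peer.ID) *api.Metric {
		return &api.Metric{Name: name, Peer: p, Value: "1", Valid: true}
	}

	mSet := api.MetricsSet{
		"freespace": []*api.Metric{
			mk("freespace", pCur), mk("freespace", pPrio),
			mk("freespace", pCand), mk("freespace", pHalf),
		},
		"tag:group": []*api.Metric{
			mk("tag:group", pCur), mk("tag:group", pPrio), mk("tag:group", pCand),
		},
	}

	cl := filterMetrics(mSet, 2, []peer.ID{pCur}, []peer.ID{pPrio}, nil)
	// pHalf lacks the "tag:group" metric, so it must not appear anywhere.
	if len(cl.currentPeers) != 1 || len(cl.priorityPeers) != 1 || len(cl.candidatePeers) != 1 {
		t.Errorf("unexpected classification: %+v", cl)
	}
}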
// allocationError logs an allocation error
func allocationError(hash cid.Cid, needed, wanted int, candidatesValid []peer.ID) error {
logger.Errorf("Not enough candidates to allocate %s:", hash)
logger.Errorf(" Needed: %d", needed)
logger.Errorf(" Wanted: %d", wanted)
logger.Errorf(" Valid candidates: %d:", len(candidatesValid))
logger.Errorf(" Available candidates: %d:", len(candidatesValid))
for _, c := range candidatesValid {
logger.Errorf(" - %s", c.Pretty())
}
errorMsg := "not enough peers to allocate CID. "
errorMsg += fmt.Sprintf("Needed at least: %d. ", needed)
errorMsg += fmt.Sprintf("Wanted at most: %d. ", wanted)
errorMsg += fmt.Sprintf("Valid candidates: %d. ", len(candidatesValid))
errorMsg += fmt.Sprintf("Available candidates: %d. ", len(candidatesValid))
errorMsg += "See logs for more info."
return errors.New(errorMsg)
}
@ -126,26 +193,20 @@ func (c *Cluster) obtainAllocations(
ctx context.Context,
hash cid.Cid,
rplMin, rplMax int,
currentValidMetrics map[peer.ID]*api.Metric,
candidatesMetrics map[peer.ID]*api.Metric,
priorityMetrics map[peer.ID]*api.Metric,
metrics classifiedMetrics,
) ([]peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "cluster/obtainAllocations")
defer span.End()
// The list of peers in current
validAllocations := make([]peer.ID, 0, len(currentValidMetrics))
for k := range currentValidMetrics {
validAllocations = append(validAllocations, k)
}
nCurrentValid := len(validAllocations)
nCandidatesValid := len(candidatesMetrics) + len(priorityMetrics)
nCurrentValid := len(metrics.currentPeers)
nAvailableValid := len(metrics.candidatePeers) + len(metrics.priorityPeers)
needed := rplMin - nCurrentValid // The minimum we need
wanted := rplMax - nCurrentValid // The maximum we want
logger.Debugf("obtainAllocations: current valid: %d", nCurrentValid)
logger.Debugf("obtainAllocations: candidates valid: %d", nCandidatesValid)
logger.Debugf("obtainAllocations: current: %d", nCurrentValid)
logger.Debugf("obtainAllocations: available: %d", nAvailableValid)
logger.Debugf("obtainAllocations: candidates: %d", len(metrics.candidatePeers))
logger.Debugf("obtainAllocations: priority: %d", len(metrics.priorityPeers))
logger.Debugf("obtainAllocations: Needed: %d", needed)
logger.Debugf("obtainAllocations: Wanted: %d", wanted)
@ -155,7 +216,7 @@ func (c *Cluster) obtainAllocations(
// This could be done more intelligently by dropping them
// according to the allocator order (i.e. freeing peers
// with most used space first).
return validAllocations[0 : len(validAllocations)+wanted], nil
return metrics.currentPeers[0 : len(metrics.currentPeers)+wanted], nil
}
if needed <= 0 { // allocations are above minimal threshold
@ -163,15 +224,8 @@ func (c *Cluster) obtainAllocations(
return nil, nil
}
if nCandidatesValid < needed { // not enough candidates
candidatesValid := []peer.ID{}
for k := range priorityMetrics {
candidatesValid = append(candidatesValid, k)
}
for k := range candidatesMetrics {
candidatesValid = append(candidatesValid, k)
}
return nil, allocationError(hash, needed, wanted, candidatesValid)
if nAvailableValid < needed { // not enough candidates
return nil, allocationError(hash, needed, wanted, append(metrics.priorityPeers, metrics.candidatePeers...))
}
// We can allocate from this point. Use the allocator to decide
@ -181,9 +235,9 @@ func (c *Cluster) obtainAllocations(
finalAllocs, err := c.allocator.Allocate(
ctx,
hash,
currentValidMetrics,
candidatesMetrics,
priorityMetrics,
metrics.current,
metrics.candidate,
metrics.priority,
)
if err != nil {
return nil, logError(err.Error())
@ -201,5 +255,5 @@ func (c *Cluster) obtainAllocations(
// the final result is the currently valid allocations
// along with the ones provided by the allocator
return append(validAllocations, finalAllocs[0:allocationsToUse]...), nil
return append(metrics.currentPeers, finalAllocs[0:allocationsToUse]...), nil
}
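A worked example with invented numbers: for rplMin=2, rplMax=3 and one currently valid allocation, needed = 2-1 = 1 and wanted = 3-1 = 2, so at least one available peer is required (otherwise allocationError fires) and at most the two best-ranked peers returned by the allocator are appended to the existing allocation.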


@ -1,45 +0,0 @@
// Package ascendalloc implements an ipfscluster.PinAllocator, which returns
// allocations based on sorting the metrics in ascending order. Thus, peers with
// smallest metrics are first in the list. This allocator can be used with a
// number of informers, as long as they provide a numeric metric value.
package ascendalloc
import (
"context"
"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
// AscendAllocator extends the SimpleAllocator
type AscendAllocator struct{}
// NewAllocator returns an initialized AscendAllocator
func NewAllocator() AscendAllocator {
return AscendAllocator{}
}
// SetClient does nothing in this allocator
func (alloc AscendAllocator) SetClient(c *rpc.Client) {}
// Shutdown does nothing in this allocator
func (alloc AscendAllocator) Shutdown(_ context.Context) error { return nil }
// Allocate returns where to allocate a pin request based on metrics which
// carry a numeric value such as "used disk". We do not pay attention to
// the metrics of the currently allocated peers and we just sort the
// candidates based on their metric values (smallest to largest).
func (alloc AscendAllocator) Allocate(
ctx context.Context,
c cid.Cid,
current, candidates, priority map[peer.ID]*api.Metric,
) ([]peer.ID, error) {
// sort our metrics
first := util.SortNumeric(priority, false)
last := util.SortNumeric(candidates, false)
return append(first, last...), nil
}


@ -1,117 +0,0 @@
package ascendalloc
import (
"context"
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
)
type testcase struct {
candidates map[peer.ID]*api.Metric
current map[peer.ID]*api.Metric
expected []peer.ID
}
var (
peer0 = peer.ID("QmUQ6Nsejt1SuZAu8yL8WgqQZHHAYreLVYYa4VPsLUCed7")
peer1 = peer.ID("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6")
peer2 = peer.ID("QmPrSBATWGAN56fiiEWEhKX3L1F3mTghEQR7vQwaeo7zHi")
peer3 = peer.ID("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
)
var inAMinute = time.Now().Add(time.Minute).UnixNano()
var testCases = []testcase{
{ // regular sort
candidates: map[peer.ID]*api.Metric{
peer0: {
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
peer1: {
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: true,
},
peer2: {
Name: "some-metric",
Value: "3",
Expire: inAMinute,
Valid: true,
},
peer3: {
Name: "some-metric",
Value: "2",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]*api.Metric{},
expected: []peer.ID{peer1, peer3, peer2, peer0},
},
{ // filter invalid
candidates: map[peer.ID]*api.Metric{
peer0: {
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: false,
},
peer1: {
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]*api.Metric{},
expected: []peer.ID{peer1},
},
{ // filter bad value
candidates: map[peer.ID]*api.Metric{
peer0: {
Name: "some-metric",
Value: "abc",
Expire: inAMinute,
Valid: true,
},
peer1: {
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]*api.Metric{},
expected: []peer.ID{peer1},
},
}
func Test(t *testing.T) {
ctx := context.Background()
alloc := &AscendAllocator{}
for i, tc := range testCases {
t.Logf("Test case %d", i)
res, err := alloc.Allocate(ctx, testCid, tc.current, tc.candidates, nil)
if err != nil {
t.Fatal(err)
}
if len(res) == 0 {
t.Fatal("0 allocations")
}
for i, r := range res {
if e := tc.expected[i]; r != e {
t.Errorf("Expect r[%d]=%s but got %s", i, r, e)
}
}
}
}


@ -0,0 +1,307 @@
// Package balanced implements an allocator that can sort allocations
// based on multiple metrics, where metrics may be an arbitrary way to
// partition a set of peers.
//
// For example, when allocating by ["tag:region", "disk"], the resulting
// peer candidate order will be balanced between regions and ordered by
// the weight of the disk metric.
package balanced
import (
"context"
"fmt"
"sort"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
api "github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
var logger = logging.Logger("allocator")
// Allocator is an allocator that partitions metrics and orders
// the final list of allocations by selecting from each partition.
type Allocator struct {
config *Config
rpcClient *rpc.Client
}
// New returns an initialized Allocator.
func New(cfg *Config) (*Allocator, error) {
err := cfg.Validate()
if err != nil {
return nil, err
}
return &Allocator{
config: cfg,
}, nil
}
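A minimal usage sketch (illustration only; the helper name and the "tag:region" metric are assumptions based on the package comment and the defaults elsewhere in this diff):

package main

import "github.com/ipfs/ipfs-cluster/allocator/balanced"

// newRegionAllocator is a hypothetical helper showing how a daemon could
// configure the balanced allocator to spread pins across regions first and
// then prefer peers with the most free space.
func newRegionAllocator() (*balanced.Allocator, error) {
	cfg := &balanced.Config{}
	cfg.Default() // AllocateBy defaults to ["tag:group", "freespace"]
	cfg.AllocateBy = []string{"tag:region", "freespace"}
	return balanced.New(cfg) // the result is then wired in as the cluster's allocator
}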
// SetClient provides us with an rpc.Client which allows
// contacting other components in the cluster.
func (a *Allocator) SetClient(c *rpc.Client) {
a.rpcClient = c
}
// Shutdown is called on cluster shutdown. We just invalidate
// any metrics from this point.
func (a *Allocator) Shutdown(ctx context.Context) error {
a.rpcClient = nil
return nil
}
type partitionedMetric struct {
metricName string
curChoosingIndex int
noMore bool
partitions []*partition // they are in order of their values
}
type partition struct {
value string
weight int64
peers map[peer.ID]bool // the bool tracks whether the peer has been picked already out of the partition when doing the final sort.
sub *partitionedMetric // all peers in sub-partitions will have the same value for this metric
}
// Returns a partitionedMetric which has partitions and subpartitions based
// on the metrics and values given by the "by" slice. The partitions
// are ordered based on the cumulative weight.
func partitionMetrics(set api.MetricsSet, by []string) *partitionedMetric {
rootMetric := by[0]
pnedMetric := &partitionedMetric{
metricName: rootMetric,
partitions: partitionValues(set[rootMetric]),
}
// For sorting based on weight (more to less)
lessF := func(i, j int) bool {
wi := pnedMetric.partitions[i].weight
wj := pnedMetric.partitions[j].weight
// Strict order
if wi == wj {
return pnedMetric.partitions[i].value < pnedMetric.partitions[j].value
}
// Descending!
return wj < wi
}
if len(by) == 1 { // we are done
sort.Slice(pnedMetric.partitions, lessF)
return pnedMetric
}
// process sub-partitions
for _, partition := range pnedMetric.partitions {
filteredSet := make(api.MetricsSet)
for k, v := range set {
if k == rootMetric { // not needed anymore
continue
}
for _, m := range v {
// only leave metrics for peers in current partition
if _, ok := partition.peers[m.Peer]; ok {
filteredSet[k] = append(filteredSet[k], m)
}
}
}
partition.sub = partitionMetrics(filteredSet, by[1:])
// Add the weight of our subpartitions
for _, subp := range partition.sub.partitions {
partition.weight += subp.weight
}
}
sort.Slice(pnedMetric.partitions, lessF)
return pnedMetric
}
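For intuition (invented numbers): when partitioning by ["tag:region", "freespace"], a region partition's weight becomes the sum of the freespace weights of its peers, so a region holding peers weighing 100 and 500 (total 600) sorts ahead of a region holding a single peer weighing 400. The region weights worked out in the balanced allocator test below follow the same rule.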
func partitionValues(metrics []*api.Metric) []*partition {
partitions := []*partition{}
if len(metrics) <= 0 {
return partitions
}
// We group peers with the same value in the same partition.
partitionsByValue := make(map[string]*partition)
for _, m := range metrics {
// Sometimes two metrics have the same value / weight, but we
// still want to put them in different partitions. Otherwise
// their weights get added, they form a single bucket, and they
// are therefore not selected in order: 3 peers with
// freespace=100 and one peer with freespace=200 would result
// in one of the peers with freespace 100 being chosen first
// because the partition's weight is 300.
//
// We are going to call these metrics (like free-space),
// non-partitionable metrics. This is going to be the default
// (for backwards compat reasons).
//
// The informers must set the Partitionable field accordingly
// when two metrics with the same value must be grouped in the
// same partition.
if !m.Partitionable {
partitions = append(partitions, &partition{
value: m.Value,
weight: m.GetWeight(),
peers: map[peer.ID]bool{
m.Peer: false,
},
})
continue
}
// Any other case, we partition by value.
if p, ok := partitionsByValue[m.Value]; ok {
p.peers[m.Peer] = false
p.weight += m.GetWeight()
} else {
partitionsByValue[m.Value] = &partition{
value: m.Value,
weight: m.GetWeight(),
peers: map[peer.ID]bool{
m.Peer: false,
},
}
}
}
for _, p := range partitionsByValue {
partitions = append(partitions, p)
}
return partitions
}
// Returns a list of peers ordered such that we never choose twice from the
// same partition while there is some other partition to choose from.
func (pnedm *partitionedMetric) sortedPeers() []peer.ID {
peers := []peer.ID{}
for {
peer := pnedm.chooseNext()
if peer == "" { // This means we are done.
break
}
peers = append(peers, peer)
}
return peers
}
func (pnedm *partitionedMetric) chooseNext() peer.ID {
lenp := len(pnedm.partitions)
if lenp == 0 {
return ""
}
if pnedm.noMore {
return ""
}
var peer peer.ID
curPartition := pnedm.partitions[pnedm.curChoosingIndex]
done := 0
for {
if curPartition.sub != nil {
// Choose something from the sub-partitionedMetric
peer = curPartition.sub.chooseNext()
} else {
// We are a bottom-partition. Choose one of our peers
for pid, used := range curPartition.peers {
if !used {
peer = pid
curPartition.peers[pid] = true // mark as used
break
}
}
}
// look in next partition next time
pnedm.curChoosingIndex = (pnedm.curChoosingIndex + 1) % lenp
curPartition = pnedm.partitions[pnedm.curChoosingIndex]
done++
if peer != "" {
break
}
// no peer and we have looked in as many partitions as we have
if done == lenp {
pnedm.noMore = true
break
}
}
return peer
}
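As a small worked example: with two bottom-level partitions P1 (three unused peers) and P2 (one unused peer), repeated calls to chooseNext yield one of P1's peers, then P2's peer, then the remaining two P1 peers, and finally an empty ID once both partitions are exhausted. Which peer within a partition comes first is unspecified, since peers are kept in a map.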
// Allocate produces a sorted list of cluster peer IDs based on different
// metrics provided for those peer IDs.
// It works as follows:
//
// - First, it buckets each peer's metrics based on the AllocateBy list. The
// metric name must match the bucket name, otherwise they are put at the end.
// - Second, based on the AllocateBy order, it orders the first bucket and
// groups peers by ordered value.
// - Third, it selects the metrics in the second bucket for the highest-priority
// peers of the first bucket and orders their metrics. Then it does the same
// for the peers in second position, and so on.
// - It repeats the process until there are no more buckets to sort.
// - Finally, it returns the peers in that order, never choosing twice from
// the same partition while another partition still has unused peers.
func (a *Allocator) Allocate(
ctx context.Context,
c cid.Cid,
current, candidates, priority api.MetricsSet,
) ([]peer.ID, error) {
// For the allocation to work well, there have to be metrics of all
// the types for all the peers. There cannot be a metric of one type
// for a peer that does not appear in the other types.
//
// Removing such occurrences is done in allocate.go, before the
// allocator is called.
//
// Otherwise, the sorting might be funny.
candidatePartition := partitionMetrics(candidates, a.config.AllocateBy)
priorityPartition := partitionMetrics(priority, a.config.AllocateBy)
logger.Debugf("Balanced allocator partitions:\n%s\n", printPartition(candidatePartition, 0))
first := priorityPartition.sortedPeers()
last := candidatePartition.sortedPeers()
return append(first, last...), nil
}
// Metrics returns the names of the metrics that have been registered
// with this allocator.
func (a *Allocator) Metrics() []string {
return a.config.AllocateBy
}
func printPartition(m *partitionedMetric, ind int) string {
str := ""
indent := func() {
for i := 0; i < ind+2; i++ {
str += " "
}
}
for _, p := range m.partitions {
indent()
str += fmt.Sprintf(" | %s:%s - %d - [", m.metricName, p.value, p.weight)
for p, u := range p.peers {
str += fmt.Sprintf("%s|%t, ", p, u)
}
str += "]\n"
if p.sub != nil {
str += printPartition(p.sub, ind+2)
}
}
return str
}


@ -0,0 +1,143 @@
package balanced
import (
"context"
"testing"
"time"
api "github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
peer "github.com/libp2p/go-libp2p-core/peer"
)
func makeMetric(name, value string, weight int64, peer peer.ID, partitionable bool) *api.Metric {
return &api.Metric{
Name: name,
Value: value,
Weight: weight,
Peer: peer,
Valid: true,
Partitionable: partitionable,
Expire: time.Now().Add(time.Minute).UnixNano(),
}
}
func TestAllocate(t *testing.T) {
alloc, err := New(&Config{
AllocateBy: []string{
"region",
"az",
"freespace",
},
})
if err != nil {
t.Fatal(err)
}
candidates := api.MetricsSet{
"abc": []*api.Metric{ // don't want anything in results
makeMetric("abc", "a", 0, test.PeerID1, true),
makeMetric("abc", "b", 0, test.PeerID2, true),
},
"region": []*api.Metric{
makeMetric("region", "a-us", 0, test.PeerID1, true),
makeMetric("region", "a-us", 0, test.PeerID2, true),
makeMetric("region", "b-eu", 0, test.PeerID3, true),
makeMetric("region", "b-eu", 0, test.PeerID4, true),
makeMetric("region", "b-eu", 0, test.PeerID5, true),
makeMetric("region", "c-au", 0, test.PeerID6, true),
makeMetric("region", "c-au", 0, test.PeerID7, true),
makeMetric("region", "c-au", 0, test.PeerID8, true), // I don't want to see this in results
},
"az": []*api.Metric{
makeMetric("az", "us1", 0, test.PeerID1, true),
makeMetric("az", "us2", 0, test.PeerID2, true),
makeMetric("az", "eu1", 0, test.PeerID3, true),
makeMetric("az", "eu1", 0, test.PeerID4, true),
makeMetric("az", "eu2", 0, test.PeerID5, true),
makeMetric("az", "au1", 0, test.PeerID6, true),
makeMetric("az", "au1", 0, test.PeerID7, true),
},
"freespace": []*api.Metric{
makeMetric("freespace", "100", 100, test.PeerID1, false),
makeMetric("freespace", "500", 500, test.PeerID2, false),
makeMetric("freespace", "200", 0, test.PeerID3, false), // weight to 0 to test GetWeight() compat
makeMetric("freespace", "400", 0, test.PeerID4, false), // weight to 0 to test GetWeight() compat
makeMetric("freespace", "10", 10, test.PeerID5, false),
makeMetric("freespace", "50", 50, test.PeerID6, false),
makeMetric("freespace", "600", 600, test.PeerID7, false),
makeMetric("freespace", "10000", 10000, test.PeerID8, false),
},
}
// Regions weights: a-us (pids 1,2): 600. b-eu (pids 3,4,5): 610. c-au (pids 6,7): 650
// Az weights: us1: 100. us2: 500. eu1: 600. eu2: 10. au1: 650
// Based on the algorithm it should choose:
//
// - c-au (most-weight)->au1->pid7
// - b-eu->eu1->pid4
// - a-us->us2->pid2
// - <repeat regions>
// - c-au->au1 (nowhere else to choose)->pid6 (region exhausted)
// - b-eu->eu2 (already had in eu1)->pid5
// - a-us->us1 (already had in us2)->pid1
// - <repeat regions>
// - b-eu->eu1->pid3 (only peer left)
peers, err := alloc.Allocate(context.Background(),
test.Cid1,
nil,
candidates,
nil,
)
if err != nil {
t.Fatal(err)
}
if len(peers) < 7 {
t.Fatalf("not enough peers: %s", peers)
}
for i, p := range peers {
t.Logf("%d - %s", i, p)
switch i {
case 0:
if p != test.PeerID7 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 1:
if p != test.PeerID4 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 2:
if p != test.PeerID2 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 3:
if p != test.PeerID6 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 4:
if p != test.PeerID5 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 5:
if p != test.PeerID1 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 6:
if p != test.PeerID3 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
default:
t.Error("too many peers")
}
}
}


@ -0,0 +1,103 @@
package balanced
import (
"encoding/json"
"errors"
"github.com/ipfs/ipfs-cluster/config"
"github.com/kelseyhightower/envconfig"
)
const configKey = "metricsalloc"
const envConfigKey = "cluster_metricsalloc"
// These are the default values for a Config.
var (
DefaultAllocateBy = []string{"tag:group", "freespace"}
)
// Config allows initializing the Allocator.
type Config struct {
config.Saver
AllocateBy []string
}
type jsonConfig struct {
AllocateBy []string `json:"allocate_by"`
}
// ConfigKey returns a human-friendly identifier for this
// Config's type.
func (cfg *Config) ConfigKey() string {
return configKey
}
// Default initializes this Config with sensible values.
func (cfg *Config) Default() error {
cfg.AllocateBy = DefaultAllocateBy
return nil
}
// ApplyEnvVars fills in any Config fields found
// as environment variables.
func (cfg *Config) ApplyEnvVars() error {
jcfg := cfg.toJSONConfig()
err := envconfig.Process(envConfigKey, jcfg)
if err != nil {
return err
}
return cfg.applyJSONConfig(jcfg)
}
// Validate checks that the fields of this configuration have
// sensible values.
func (cfg *Config) Validate() error {
if len(cfg.AllocateBy) <= 0 {
return errors.New("metricalloc.allocate_by is invalid")
}
return nil
}
// LoadJSON parses a raw JSON byte-slice as generated by ToJSON().
func (cfg *Config) LoadJSON(raw []byte) error {
jcfg := &jsonConfig{}
err := json.Unmarshal(raw, jcfg)
if err != nil {
return err
}
cfg.Default()
return cfg.applyJSONConfig(jcfg)
}
func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
// When unset, leave default
if len(jcfg.AllocateBy) > 0 {
cfg.AllocateBy = jcfg.AllocateBy
}
return cfg.Validate()
}
// ToJSON generates a human-friendly JSON representation of this Config.
func (cfg *Config) ToJSON() ([]byte, error) {
jcfg := cfg.toJSONConfig()
return config.DefaultJSONMarshal(jcfg)
}
func (cfg *Config) toJSONConfig() *jsonConfig {
return &jsonConfig{
AllocateBy: cfg.AllocateBy,
}
}
// ToDisplayJSON returns JSON config as a string.
func (cfg *Config) ToDisplayJSON() ([]byte, error) {
return config.DisplayJSON(cfg.toJSONConfig())
}


@ -0,0 +1,61 @@
package balanced
import (
"os"
"testing"
)
var cfgJSON = []byte(`
{
"allocate_by": ["tag", "disk"]
}
`)
func TestLoadJSON(t *testing.T) {
cfg := &Config{}
err := cfg.LoadJSON(cfgJSON)
if err != nil {
t.Fatal(err)
}
}
func TestToJSON(t *testing.T) {
cfg := &Config{}
cfg.LoadJSON(cfgJSON)
newjson, err := cfg.ToJSON()
if err != nil {
t.Fatal(err)
}
cfg = &Config{}
err = cfg.LoadJSON(newjson)
if err != nil {
t.Fatal(err)
}
if len(cfg.AllocateBy) != 2 {
t.Error("configuration was lost in serialization/deserialization")
}
}
func TestDefault(t *testing.T) {
cfg := &Config{}
cfg.Default()
if cfg.Validate() != nil {
t.Fatal("error validating")
}
cfg.AllocateBy = nil
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
}
func TestApplyEnvVars(t *testing.T) {
os.Setenv("CLUSTER_METRICSALLOC_ALLOCATEBY", "a,b,c")
cfg := &Config{}
cfg.ApplyEnvVars()
if len(cfg.AllocateBy) != 3 {
t.Fatal("failed to override allocate_by with env var")
}
}


@ -1,41 +0,0 @@
// Package descendalloc implements an ipfscluster.PinAllocator which returns
// allocations based on sorting the metrics in descending order. Thus, peers
// with largest metrics are first in the list. This allocator can be used with a
// number of informers, as long as they provide a numeric metric value.
package descendalloc
import (
"context"
"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
// DescendAllocator extends the SimpleAllocator
type DescendAllocator struct{}
// NewAllocator returns an initialized DescendAllocator
func NewAllocator() DescendAllocator {
return DescendAllocator{}
}
// SetClient does nothing in this allocator
func (alloc DescendAllocator) SetClient(c *rpc.Client) {}
// Shutdown does nothing in this allocator
func (alloc DescendAllocator) Shutdown(_ context.Context) error { return nil }
// Allocate returns where to allocate a pin request based on metrics which
// carry a numeric value such as "used disk". We do not pay attention to
// the metrics of the currently allocated peers and we just sort the
// candidates based on their metric values (largest to smallest).
func (alloc DescendAllocator) Allocate(ctx context.Context, c cid.Cid, current, candidates, priority map[peer.ID]*api.Metric) ([]peer.ID, error) {
// sort our metrics
first := util.SortNumeric(priority, true)
last := util.SortNumeric(candidates, true)
return append(first, last...), nil
}


@ -1,117 +0,0 @@
package descendalloc
import (
"context"
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
)
type testcase struct {
candidates map[peer.ID]*api.Metric
current map[peer.ID]*api.Metric
expected []peer.ID
}
var (
peer0 = peer.ID("QmUQ6Nsejt1SuZAu8yL8WgqQZHHAYreLVYYa4VPsLUCed7")
peer1 = peer.ID("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6")
peer2 = peer.ID("QmPrSBATWGAN56fiiEWEhKX3L1F3mTghEQR7vQwaeo7zHi")
peer3 = peer.ID("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
)
var inAMinute = time.Now().Add(time.Minute).UnixNano()
var testCases = []testcase{
{ // regular sort
candidates: map[peer.ID]*api.Metric{
peer0: {
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
peer1: {
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: true,
},
peer2: {
Name: "some-metric",
Value: "3",
Expire: inAMinute,
Valid: true,
},
peer3: {
Name: "some-metric",
Value: "2",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]*api.Metric{},
expected: []peer.ID{peer1, peer3, peer2, peer0},
},
{ // filter invalid
candidates: map[peer.ID]*api.Metric{
peer0: {
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: false,
},
peer1: {
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]*api.Metric{},
expected: []peer.ID{peer1},
},
{ // filter bad value
candidates: map[peer.ID]*api.Metric{
peer0: {
Name: "some-metric",
Value: "abc",
Expire: inAMinute,
Valid: true,
},
peer1: {
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]*api.Metric{},
expected: []peer.ID{peer1},
},
}
func Test(t *testing.T) {
ctx := context.Background()
alloc := &DescendAllocator{}
for i, tc := range testCases {
t.Logf("Test case %d", i)
res, err := alloc.Allocate(ctx, testCid, tc.current, tc.candidates, nil)
if err != nil {
t.Fatal(err)
}
if len(res) == 0 {
t.Fatal("0 allocations")
}
for i, r := range res {
if e := tc.expected[len(res)-i-1]; r != e {
t.Errorf("Expect r[%d]=%s but got %s", i, r, e)
}
}
}
}


@ -1,75 +0,0 @@
// Package util is a utility package used by the allocator
// implementations. This package provides the SortNumeric function, which may be
// used by an allocator to sort peers by their metric values (ascending or
// descending).
package util
import (
"sort"
"strconv"
"github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// SortNumeric returns a list of peers sorted by their metric values. If reverse
// is false (true), peers will be sorted from smallest to largest (largest to
// smallest) metric
func SortNumeric(candidates map[peer.ID]*api.Metric, reverse bool) []peer.ID {
vMap := make(map[peer.ID]uint64)
peers := make([]peer.ID, 0, len(candidates))
for k, v := range candidates {
if v.Discard() {
continue
}
val, err := strconv.ParseUint(v.Value, 10, 64)
if err != nil {
continue
}
peers = append(peers, k)
vMap[k] = val
}
sorter := &metricSorter{
m: vMap,
peers: peers,
reverse: reverse,
}
sort.Sort(sorter)
return sorter.peers
}
// metricSorter implements the sort.Sort interface
type metricSorter struct {
peers []peer.ID
m map[peer.ID]uint64
reverse bool
}
// Len returns the number of metrics
func (s metricSorter) Len() int {
return len(s.peers)
}
// Swap Swaps the elements in positions i and j
func (s metricSorter) Swap(i, j int) {
temp := s.peers[i]
s.peers[i] = s.peers[j]
s.peers[j] = temp
}
// Less reports if the element in position i is Less than the element in j
// (important to override this)
func (s metricSorter) Less(i, j int) bool {
peeri := s.peers[i]
peerj := s.peers[j]
x := s.m[peeri]
y := s.m[peerj]
if s.reverse {
return x > y
}
return x < y
}


@ -1100,18 +1100,23 @@ func (n *NodeWithMeta) Size() uint64 {
return uint64(len(n.Data))
}
// MetricsSet is a map to carry slices of metrics indexed by type.
type MetricsSet map[string][]*Metric
// Metric transports information about a peer.ID. It is used to decide
// pin allocations by a PinAllocator. IPFS cluster is agnostic to
// the Value, which should be interpreted by the PinAllocator.
// The ReceivedAt value is a timestamp representing when a peer has received
// the metric value.
type Metric struct {
Name string `json:"name" codec:"n,omitempty"`
Peer peer.ID `json:"peer" codec:"p,omitempty"`
Value string `json:"value" codec:"v,omitempty"`
Expire int64 `json:"expire" codec:"e,omitempty"`
Valid bool `json:"valid" codec:"d,omitempty"`
ReceivedAt int64 `json:"received_at" codec:"t,omitempty"` // ReceivedAt contains a UnixNano timestamp
Name string `json:"name" codec:"n,omitempty"`
Peer peer.ID `json:"peer" codec:"p,omitempty"`
Value string `json:"value" codec:"v,omitempty"`
Expire int64 `json:"expire" codec:"e,omitempty"`
Valid bool `json:"valid" codec:"d,omitempty"`
Weight int64 `json:"weight" codec:"w,omitempty"`
Partitionable bool `json:"partitionable" codec:"o,omitempty"`
ReceivedAt int64 `json:"received_at" codec:"t,omitempty"` // ReceivedAt contains a UnixNano timestamp
}
// SetTTL sets Metric to expire after the given time.Duration
@ -1123,7 +1128,11 @@ func (m *Metric) SetTTL(d time.Duration) {
// GetTTL returns the time left before the Metric expires
func (m *Metric) GetTTL() time.Duration {
expDate := time.Unix(0, m.Expire)
return time.Until(expDate)
ttl := time.Until(expDate)
if ttl < 0 {
ttl = 0
}
return ttl
}
// Expired returns if the Metric has expired
@ -1137,6 +1146,21 @@ func (m *Metric) Discard() bool {
return !m.Valid || m.Expired()
}
// GetWeight returns the weight of the metric. When the Weight field is 0,
// it tries to parse the Value and use it as the weight.
// This is for backwards compatibility.
func (m *Metric) GetWeight() int64 {
if m.Weight != 0 {
return m.Weight
}
val, err := strconv.ParseInt(m.Value, 10, 64)
if err != nil {
return 0
}
return val
}
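A short sketch of this fallback, written as a hypothetical Go example test in the api package (values invented):

package api

import "fmt"

// ExampleMetric_GetWeight shows the backwards-compatible fallback: metrics
// sent by peers that do not set Weight still get a usable weight parsed from
// their numeric Value, while an explicit Weight takes precedence.
func ExampleMetric_GetWeight() {
	legacy := &Metric{Name: "freespace", Value: "200"}               // Weight unset (0)
	upgraded := &Metric{Name: "freespace", Value: "200", Weight: 50} // Weight set explicitly
	fmt.Println(legacy.GetWeight(), upgraded.GetWeight())
	// Output: 200 50
}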
// MetricSlice is a sortable Metric array.
type MetricSlice []*Metric


@ -16,6 +16,7 @@ import (
"github.com/ipfs/ipfs-cluster/rpcutil"
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/version"
"go.uber.org/multierr"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
@ -282,28 +283,46 @@ func (c *Cluster) watchPinset() {
}
}
func (c *Cluster) sendInformerMetric(ctx context.Context, informer Informer) (*api.Metric, error) {
// sendInformerMetrics pushes the metrics from a single informer and
// returns the smallest TTL among them.
func (c *Cluster) sendInformerMetrics(ctx context.Context, informer Informer) (time.Duration, error) {
ctx, span := trace.StartSpan(ctx, "cluster/sendInformerMetric")
defer span.End()
metric := informer.GetMetric(ctx)
metric.Peer = c.id
return metric, c.monitor.PublishMetric(ctx, metric)
var minTTL time.Duration
var errors error
metrics := informer.GetMetrics(ctx)
if len(metrics) == 0 {
logger.Errorf("informer %s produced no metrics", informer.Name())
return minTTL, nil
}
for _, metric := range metrics {
metric.Peer = c.id
ttl := metric.GetTTL()
if ttl > 0 && (ttl < minTTL || minTTL == 0) {
minTTL = ttl
}
err := c.monitor.PublishMetric(ctx, metric)
if multierr.AppendInto(&errors, err) {
logger.Warnf("error sending metric %s: %s", metric.Name, err)
}
}
return minTTL, errors
}
func (c *Cluster) sendInformersMetrics(ctx context.Context) ([]*api.Metric, error) {
func (c *Cluster) sendInformersMetrics(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "cluster/sendInformersMetrics")
defer span.End()
var metrics []*api.Metric
var errors error
for _, informer := range c.informers {
m, err := c.sendInformerMetric(ctx, informer)
if err != nil {
return nil, err
_, err := c.sendInformerMetrics(ctx, informer)
if multierr.AppendInto(&errors, err) {
logger.Warnf("informer %s did not send all metrics", informer.Name())
}
metrics = append(metrics, m)
}
return metrics, nil
return errors
}
// pushInformerMetrics loops and publishes informers metrics using the
@ -331,20 +350,24 @@ func (c *Cluster) pushInformerMetrics(ctx context.Context, informer Informer) {
// wait
}
metric, err := c.sendInformerMetric(ctx, informer)
minTTL, err := c.sendInformerMetrics(ctx, informer)
if minTTL == 0 {
minTTL = 30 * time.Second
logger.Warningf("informer %s reported a min metric ttl of 0s.", informer.Name())
}
if err != nil {
if (retries % retryWarnMod) == 0 {
logger.Errorf("error broadcasting metric: %s", err)
retries++
}
// retry sooner
timer.Reset(metric.GetTTL() / 4)
timer.Reset(minTTL / 4)
continue
}
retries = 0
// send metric again in TTL/2
timer.Reset(metric.GetTTL() / 2)
timer.Reset(minTTL / 2)
}
}
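As a concrete illustration of the re-publish cadence: an informer whose metrics carry a 5s TTL (the tags informer in the example service.json included in this diff uses metric_ttl "5s") yields minTTL = 5s, so its metrics are re-published roughly every 2.5 seconds and retried after about 1.25 seconds when publishing fails.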
@ -964,7 +987,7 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
}
// Broadcast our metrics to the world
_, err = c.sendInformersMetrics(ctx)
err = c.sendInformersMetrics(ctx)
if err != nil {
logger.Warn(err)
}


@ -12,7 +12,7 @@ import (
"time"
"github.com/ipfs/ipfs-cluster/adder/sharding"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/config"
"github.com/ipfs/ipfs-cluster/informer/numpin"
@ -148,7 +148,7 @@ type mockTracer struct {
}
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracker) {
ident, clusterCfg, _, _, _, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs()
ident, clusterCfg, _, _, _, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, _, _, _ := testingConfigs()
ctx := context.Background()
host, pubsub, dht := createHost(t, ident.PrivateKey, clusterCfg.Secret, clusterCfg.ListenAddr)
@ -180,7 +180,12 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke
t.Fatal(err)
}
alloc := ascendalloc.NewAllocator()
alloc, err := balanced.New(&balanced.Config{
AllocateBy: []string{"numpin"},
})
if err != nil {
t.Fatal(err)
}
numpinCfg := &numpin.Config{}
numpinCfg.Default()
inf, _ := numpin.NewInformer(numpinCfg)


@ -11,7 +11,7 @@ import (
"github.com/ipfs/go-cid"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/cmdutils"
@ -342,7 +342,10 @@ func runCmd(c *cli.Context) error {
if err != nil {
return cli.Exit(errors.Wrap(err, "creating disk informer"), 1)
}
alloc := descendalloc.NewAllocator()
alloc, err := balanced.New(cfgs.BalancedAlloc)
if err != nil {
return cli.Exit(errors.Wrap(err, "creating metrics allocator"), 1)
}
crdtcons, err := crdt.New(
host,


@ -6,7 +6,7 @@ import (
"time"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/api/ipfsproxy"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/cmdutils"
@ -14,6 +14,7 @@ import (
"github.com/ipfs/ipfs-cluster/consensus/crdt"
"github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/informer/disk"
"github.com/ipfs/ipfs-cluster/informer/tags"
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/observations"
@ -155,9 +156,20 @@ func createCluster(
connector, err := ipfshttp.NewConnector(cfgs.Ipfshttp)
checkErr("creating IPFS Connector component", err)
informer, err := disk.NewInformer(cfgs.Diskinf)
checkErr("creating disk informer", err)
alloc := descendalloc.NewAllocator()
var informers []ipfscluster.Informer
if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.Diskinf.ConfigKey()) {
diskinf, err := disk.NewInformer(cfgs.Diskinf)
checkErr("creating disk informer", err)
informers = append(informers, diskinf)
}
if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.Tagsinf.ConfigKey()) {
tagsinf, err := tags.New(cfgs.Tagsinf)
checkErr("creating numpin informer", err)
informers = append(informers, tagsinf)
}
alloc, err := balanced.New(cfgs.BalancedAlloc)
checkErr("creating allocator", err)
ipfscluster.ReadyTimeout = cfgs.Raft.WaitForLeaderTimeout + 5*time.Second
@ -206,7 +218,7 @@ func createCluster(
tracker,
mon,
alloc,
[]ipfscluster.Informer{informer},
informers,
tracer,
)
}


@ -0,0 +1 @@
{"replication_factor_min":-1,"replication_factor_max":-1,"name":"","mode":"direct","shard_size":0,"user_allocations":null,"expire_at":"0001-01-01T00:00:00Z","metadata":null,"pin_update":null,"cid":{"/":"QmUaFyXjZUNaUwYF8rBtbJc7fEJ46aJXvgV8z2HHs6jvmJ"},"type":2,"allocations":[],"max_depth":0,"reference":null}

Binary file not shown.


@ -0,0 +1,4 @@
{
"id": "12D3KooWFMMrT3ChFWoTz9L7QFmiLnnMXC7D51KwrCjJdiuYPJqQ",
"private_key": "CAESQPzB/FNh30DDOW2B9JBnHwhM8d7tB/hz5i7NWxzxqiStUjsVQ2mDYmNikFNVkKhibowhTjDzYnSbEXo4SLwSZns="
}


@ -0,0 +1,157 @@
{
"cluster": {
"peername": "carbon",
"secret": "a5da3e684ec4bb93cd94327f965035d0db5001a2983dd94a164e5070aade53bd",
"leave_on_shutdown": false,
"listen_multiaddress": [
"/ip4/0.0.0.0/tcp/9096",
"/ip4/0.0.0.0/udp/9096/quic"
],
"enable_relay_hop": true,
"connection_manager": {
"high_water": 400,
"low_water": 100,
"grace_period": "2m0s"
},
"dial_peer_timeout": "3s",
"state_sync_interval": "5m0s",
"pin_recover_interval": "12m0s",
"replication_factor_min": -1,
"replication_factor_max": -1,
"monitor_ping_interval": "15s",
"peer_watch_interval": "5s",
"mdns_interval": "10s",
"disable_repinning": true,
"peer_addresses": []
},
"consensus": {
"crdt": {
"cluster_name": "ipfs-cluster",
"trusted_peers": [
"*"
],
"batching": {
"max_batch_size": 0,
"max_batch_age": "0s"
}
}
},
"api": {
"ipfsproxy": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"log_file": "",
"read_timeout": "0s",
"read_header_timeout": "5s",
"write_timeout": "0s",
"idle_timeout": "1m0s",
"max_header_bytes": 4096
},
"restapi": {
"http_listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "0s",
"read_header_timeout": "5s",
"write_timeout": "0s",
"idle_timeout": "2m0s",
"max_header_bytes": 4096,
"basic_auth_credentials": null,
"http_log_file": "",
"headers": {},
"cors_allowed_origins": [
"*"
],
"cors_allowed_methods": [
"GET"
],
"cors_allowed_headers": [],
"cors_exposed_headers": [
"Content-Type",
"X-Stream-Output",
"X-Chunked-Output",
"X-Content-Length"
],
"cors_allow_credentials": true,
"cors_max_age": "0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "30s",
"ipfs_request_timeout": "5m0s",
"pin_timeout": "2m0s",
"unpin_timeout": "3h0m0s",
"repogc_timeout": "24h0m0s"
}
},
"pin_tracker": {
"stateless": {
"concurrent_pins": 10
}
},
"monitor": {
"pubsubmon": {
"check_interval": "15s",
"failure_threshold": 3
}
},
"allocator": {
"metricsalloc": {
"allocate_by": [
"freespace"
]
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"tags": {
"metric_ttl": "5s",
"tags": {}
}
},
"observations": {
"metrics": {
"enable_stats": false,
"prometheus_endpoint": "/ip4/127.0.0.1/tcp/8888",
"reporting_interval": "2s"
},
"tracing": {
"enable_tracing": false,
"jaeger_agent_endpoint": "/ip4/0.0.0.0/udp/6831",
"sampling_prob": 0.3,
"service_name": "cluster-daemon"
}
},
"datastore": {
"badger": {
"gc_discard_ratio": 0.2,
"gc_interval": "15m0s",
"gc_sleep": "10s",
"badger_options": {
"dir": "",
"value_dir": "",
"sync_writes": true,
"table_loading_mode": 2,
"value_log_loading_mode": 0,
"num_versions_to_keep": 1,
"max_table_size": 16777216,
"level_size_multiplier": 10,
"max_levels": 7,
"value_threshold": 32,
"num_memtables": 5,
"num_level_zero_tables": 5,
"num_level_zero_tables_stall": 10,
"level_one_size": 268435456,
"value_log_file_size": 1073741823,
"value_log_max_entries": 1000000,
"num_compactors": 2,
"compact_l_0_on_close": false,
"read_only": false,
"truncate": true
}
}
}
}


@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/api/ipfsproxy"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/config"
@ -17,6 +18,7 @@ import (
"github.com/ipfs/ipfs-cluster/datastore/leveldb"
"github.com/ipfs/ipfs-cluster/informer/disk"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/informer/tags"
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/observations"
@ -33,8 +35,10 @@ type Configs struct {
Crdt *crdt.Config
Statelesstracker *stateless.Config
Pubsubmon *pubsubmon.Config
BalancedAlloc *balanced.Config
Diskinf *disk.Config
Numpininf *numpin.Config
Tagsinf *tags.Config
Metrics *observations.MetricsConfig
Tracing *observations.TracingConfig
Badger *badger.Config
@ -220,7 +224,10 @@ func (ch *ConfigHelper) init() {
Crdt: &crdt.Config{},
Statelesstracker: &stateless.Config{},
Pubsubmon: &pubsubmon.Config{},
BalancedAlloc: &balanced.Config{},
Diskinf: &disk.Config{},
Numpininf: &numpin.Config{},
Tagsinf: &tags.Config{},
Metrics: &observations.MetricsConfig{},
Tracing: &observations.TracingConfig{},
Badger: &badger.Config{},
@ -232,7 +239,10 @@ func (ch *ConfigHelper) init() {
man.RegisterComponent(config.IPFSConn, cfgs.Ipfshttp)
man.RegisterComponent(config.PinTracker, cfgs.Statelesstracker)
man.RegisterComponent(config.Monitor, cfgs.Pubsubmon)
man.RegisterComponent(config.Allocator, cfgs.BalancedAlloc)
man.RegisterComponent(config.Informer, cfgs.Diskinf)
// man.RegisterComponent(config.Informer, cfgs.Numpininf)
man.RegisterComponent(config.Informer, cfgs.Tagsinf)
man.RegisterComponent(config.Observations, cfgs.Metrics)
man.RegisterComponent(config.Observations, cfgs.Tracing)


@ -1,6 +1,7 @@
package ipfscluster
import (
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/api/ipfsproxy"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/config"
@ -126,9 +127,12 @@ var testingMonCfg = []byte(`{
"failure_threshold": 6
}`)
var testingAllocBalancedCfg = []byte(`{
"allocate_by": ["freespace"]
}`)
var testingDiskInfCfg = []byte(`{
"metric_ttl": "900ms",
"metric_type": "freespace"
"metric_ttl": "900ms"
}`)
var testingTracerCfg = []byte(`{
@ -138,8 +142,8 @@ var testingTracerCfg = []byte(`{
"service_name": "cluster-daemon"
}`)
func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *leveldb.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {
identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg := testingEmptyConfigs()
func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *leveldb.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *balanced.Config, *disk.Config, *observations.TracingConfig) {
identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, allocBalancedCfg, diskInfCfg, tracingCfg := testingEmptyConfigs()
identity.LoadJSON(testingIdentity)
clusterCfg.LoadJSON(testingClusterCfg)
apiCfg.LoadJSON(testingAPICfg)
@ -151,13 +155,14 @@ func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Confi
crdtCfg.LoadJSON(testingCrdtCfg)
statelesstrkrCfg.LoadJSON(testingTrackerCfg)
pubsubmonCfg.LoadJSON(testingMonCfg)
allocBalancedCfg.LoadJSON(testingAllocBalancedCfg)
diskInfCfg.LoadJSON(testingDiskInfCfg)
tracingCfg.LoadJSON(testingTracerCfg)
return identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg
return identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, allocBalancedCfg, diskInfCfg, tracingCfg
}
func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *leveldb.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {
func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *leveldb.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *balanced.Config, *disk.Config, *observations.TracingConfig) {
identity := &config.Identity{}
clusterCfg := &Config{}
apiCfg := rest.NewConfig()
@ -169,9 +174,10 @@ func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.
crdtCfg := &crdt.Config{}
statelessCfg := &stateless.Config{}
pubsubmonCfg := &pubsubmon.Config{}
allocBalancedCfg := &balanced.Config{}
diskInfCfg := &disk.Config{}
tracingCfg := &observations.TracingConfig{}
return identity, clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelessCfg, pubsubmonCfg, diskInfCfg, tracingCfg
return identity, clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelessCfg, pubsubmonCfg, allocBalancedCfg, diskInfCfg, tracingCfg
}
// func TestConfigDefault(t *testing.T) {


@ -18,17 +18,6 @@ const (
DefaultMetricType = MetricFreeSpace
)
// String returns a string representation for MetricType.
func (t MetricType) String() string {
switch t {
case MetricFreeSpace:
return "freespace"
case MetricRepoSize:
return "reposize"
}
return ""
}
// Config is used to initialize an Informer and customize
// the type and parameters of the metric it produces.
type Config struct {


@ -20,11 +20,22 @@ type MetricType int
const (
// MetricFreeSpace provides the available space reported by IPFS
MetricFreeSpace = iota
MetricFreeSpace MetricType = iota
// MetricRepoSize provides the used space reported by IPFS
MetricRepoSize
)
// String returns a string representation for MetricType.
func (t MetricType) String() string {
switch t {
case MetricFreeSpace:
return "freespace"
case MetricRepoSize:
return "reposize"
}
return ""
}
var logger = logging.Logger("diskinfo")
// Informer is a simple object to implement the ipfscluster.Informer
@ -48,7 +59,7 @@ func NewInformer(cfg *Config) (*Informer, error) {
}, nil
}
// Name returns the user-facing name of this informer.
// Name returns the name of the metric issued by this informer.
func (disk *Informer) Name() string {
return disk.config.MetricType.String()
}
@ -74,9 +85,9 @@ func (disk *Informer) Shutdown(ctx context.Context) error {
return nil
}
// GetMetric returns the metric obtained by this
// Informer.
func (disk *Informer) GetMetric(ctx context.Context) *api.Metric {
// GetMetrics returns the metric obtained by this Informer. It must always
// return at least one metric.
func (disk *Informer) GetMetrics(ctx context.Context) []*api.Metric {
ctx, span := trace.StartSpan(ctx, "informer/disk/GetMetric")
defer span.End()
@ -85,10 +96,10 @@ func (disk *Informer) GetMetric(ctx context.Context) *api.Metric {
disk.mu.Unlock()
if rpcClient == nil {
return &api.Metric{
return []*api.Metric{&api.Metric{
Name: disk.Name(),
Valid: false,
}
}}
}
var repoStat api.IPFSRepoStat
@ -123,11 +134,13 @@ func (disk *Informer) GetMetric(ctx context.Context) *api.Metric {
}
m := &api.Metric{
Name: disk.Name(),
Value: fmt.Sprintf("%d", metric),
Valid: valid,
Name: disk.Name(),
Value: fmt.Sprintf("%d", metric),
Valid: valid,
Weight: int64(metric),
Partitionable: false,
}
m.SetTTL(disk.config.MetricTTL)
return m
return []*api.Metric{m}
}


@ -28,6 +28,16 @@ func (mock *badRPCService) RepoStat(ctx context.Context, in struct{}, out *api.I
return errors.New("fake error")
}
// Returns the first metric
func getMetrics(t *testing.T, inf *Informer) *api.Metric {
t.Helper()
metrics := inf.GetMetrics(context.Background())
if len(metrics) != 1 {
t.Fatal("expected 1 metric")
}
return metrics[0]
}
func Test(t *testing.T) {
ctx := context.Background()
cfg := &Config{}
@ -37,12 +47,12 @@ func Test(t *testing.T) {
t.Fatal(err)
}
defer inf.Shutdown(ctx)
m := inf.GetMetric(ctx)
m := getMetrics(t, inf)
if m.Valid {
t.Error("metric should be invalid")
}
inf.SetClient(test.NewMockRPCClient(t))
m = inf.GetMetric(ctx)
m = getMetrics(t, inf)
if !m.Valid {
t.Error("metric should be valid")
}
@ -59,12 +69,12 @@ func TestFreeSpace(t *testing.T) {
t.Fatal(err)
}
defer inf.Shutdown(ctx)
m := inf.GetMetric(ctx)
m := getMetrics(t, inf)
if m.Valid {
t.Error("metric should be invalid")
}
inf.SetClient(test.NewMockRPCClient(t))
m = inf.GetMetric(ctx)
m = getMetrics(t, inf)
if !m.Valid {
t.Error("metric should be valid")
}
@ -85,12 +95,12 @@ func TestRepoSize(t *testing.T) {
t.Fatal(err)
}
defer inf.Shutdown(ctx)
m := inf.GetMetric(ctx)
m := getMetrics(t, inf)
if m.Valid {
t.Error("metric should be invalid")
}
inf.SetClient(test.NewMockRPCClient(t))
m = inf.GetMetric(ctx)
m = getMetrics(t, inf)
if !m.Valid {
t.Error("metric should be valid")
}
@ -110,7 +120,7 @@ func TestWithErrors(t *testing.T) {
}
defer inf.Shutdown(ctx)
inf.SetClient(badRPCClient(t))
m := inf.GetMetric(ctx)
m := getMetrics(t, inf)
if m.Valid {
t.Errorf("metric should be invalid")
}


@ -56,17 +56,17 @@ func (npi *Informer) Name() string {
return MetricName
}
// GetMetric contacts the IPFSConnector component and
// requests the `pin ls` command. We return the number
// of pins in IPFS.
func (npi *Informer) GetMetric(ctx context.Context) *api.Metric {
// GetMetrics contacts the IPFSConnector component and requests the `pin ls`
// command. We return the number of pins in IPFS. It must always return at
// least one metric.
func (npi *Informer) GetMetrics(ctx context.Context) []*api.Metric {
ctx, span := trace.StartSpan(ctx, "informer/numpin/GetMetric")
defer span.End()
if npi.rpcClient == nil {
return &api.Metric{
return []*api.Metric{&api.Metric{
Valid: false,
}
}}
}
pinMap := make(map[string]api.IPFSPinStatus)
@ -85,11 +85,12 @@ func (npi *Informer) GetMetric(ctx context.Context) *api.Metric {
valid := err == nil
m := &api.Metric{
Name: MetricName,
Value: fmt.Sprintf("%d", len(pinMap)),
Valid: valid,
Name: MetricName,
Value: fmt.Sprintf("%d", len(pinMap)),
Valid: valid,
Partitionable: false,
}
m.SetTTL(npi.config.MetricTTL)
return m
return []*api.Metric{m}
}


@ -37,12 +37,22 @@ func Test(t *testing.T) {
if err != nil {
t.Fatal(err)
}
m := inf.GetMetric(ctx)
metrics := inf.GetMetrics(ctx)
if len(metrics) != 1 {
t.Fatal("expected 1 metric")
}
m := metrics[0]
if m.Valid {
t.Error("metric should be invalid")
}
inf.SetClient(mockRPCClient(t))
m = inf.GetMetric(ctx)
metrics = inf.GetMetrics(ctx)
if len(metrics) != 1 {
t.Fatal("expected 1 metric")
}
m = metrics[0]
if !m.Valid {
t.Error("metric should be valid")
}

informer/tags/config.go (new file, 119 lines)

@ -0,0 +1,119 @@
package tags
import (
"encoding/json"
"errors"
"time"
"github.com/ipfs/ipfs-cluster/config"
"github.com/kelseyhightower/envconfig"
)
const configKey = "tags"
const envConfigKey = "cluster_tags"
// Default values for tags Config
const (
DefaultMetricTTL = 30 * time.Second
)
// Default values for tags config
var (
DefaultTags = map[string]string{
"group": "default",
}
)
// Config is used to initialize an Informer and customize
// the type and parameters of the metric it produces.
type Config struct {
config.Saver
MetricTTL time.Duration
Tags map[string]string
}
type jsonConfig struct {
MetricTTL string `json:"metric_ttl"`
Tags map[string]string `json:"tags"`
}
// ConfigKey returns a human-friendly identifier for this type of Config.
func (cfg *Config) ConfigKey() string {
return configKey
}
// Default initializes this Config with sensible values.
func (cfg *Config) Default() error {
cfg.MetricTTL = DefaultMetricTTL
cfg.Tags = DefaultTags
return nil
}
// ApplyEnvVars fills in any Config fields found
// as environment variables.
func (cfg *Config) ApplyEnvVars() error {
jcfg := cfg.toJSONConfig()
err := envconfig.Process(envConfigKey, jcfg)
if err != nil {
return err
}
return cfg.applyJSONConfig(jcfg)
}
// Validate checks that the fields of this Config have working values,
// at least in appearance.
func (cfg *Config) Validate() error {
if cfg.MetricTTL <= 0 {
return errors.New("tags.metric_ttl is invalid")
}
return nil
}
// LoadJSON reads the fields of this Config from a JSON byteslice as
// generated by ToJSON.
func (cfg *Config) LoadJSON(raw []byte) error {
jcfg := &jsonConfig{}
err := json.Unmarshal(raw, jcfg)
if err != nil {
logger.Error("Error unmarshaling disk informer config")
return err
}
cfg.Default()
return cfg.applyJSONConfig(jcfg)
}
func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
t, _ := time.ParseDuration(jcfg.MetricTTL)
cfg.MetricTTL = t
cfg.Tags = jcfg.Tags
return cfg.Validate()
}
// ToJSON generates a JSON-formatted human-friendly representation of this
// Config.
func (cfg *Config) ToJSON() (raw []byte, err error) {
jcfg := cfg.toJSONConfig()
raw, err = config.DefaultJSONMarshal(jcfg)
return
}
func (cfg *Config) toJSONConfig() *jsonConfig {
return &jsonConfig{
MetricTTL: cfg.MetricTTL.String(),
Tags: cfg.Tags,
}
}
// ToDisplayJSON returns JSON config as a string.
func (cfg *Config) ToDisplayJSON() ([]byte, error) {
return config.DisplayJSON(cfg.toJSONConfig())
}
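
A minimal sketch of loading this configuration from JSON, assuming the import path github.com/ipfs/ipfs-cluster/informer/tags; the tag values shown are illustrative:

package main

import (
	"fmt"

	"github.com/ipfs/ipfs-cluster/informer/tags"
)

func main() {
	// JSON keys mirror jsonConfig: "metric_ttl" and "tags".
	raw := []byte(`{
		"metric_ttl": "30s",
		"tags": { "group": "storage", "region": "eu-west" }
	}`)

	cfg := &tags.Config{}
	if err := cfg.LoadJSON(raw); err != nil {
		panic(err)
	}
	fmt.Println(cfg.MetricTTL, cfg.Tags["region"]) // 30s eu-west
}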

View File

@ -0,0 +1,86 @@
package tags
import (
"encoding/json"
"os"
"testing"
"time"
)
var cfgJSON = []byte(`
{
"metric_ttl": "1s",
"tags": { "a": "b" }
}
`)
func TestLoadJSON(t *testing.T) {
cfg := &Config{}
err := cfg.LoadJSON(cfgJSON)
if err != nil {
t.Fatal(err)
}
if cfg.Tags["a"] != "b" {
t.Fatal("tags not parsed")
}
j := &jsonConfig{}
json.Unmarshal(cfgJSON, j)
j.MetricTTL = "-10"
tst, _ := json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error decoding metric_ttl")
}
}
func TestToJSON(t *testing.T) {
cfg := &Config{}
cfg.LoadJSON(cfgJSON)
newjson, err := cfg.ToJSON()
if err != nil {
t.Fatal(err)
}
cfg = &Config{}
err = cfg.LoadJSON(newjson)
if err != nil {
t.Fatal(err)
}
}
func TestDefault(t *testing.T) {
cfg := &Config{}
cfg.Default()
if cfg.Validate() != nil {
t.Fatal("error validating")
}
cfg.MetricTTL = 0
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
cfg.Default()
if cfg.Tags["group"] != "default" {
t.Fatal("Tags default not set")
}
}
func TestApplyEnvVars(t *testing.T) {
os.Setenv("CLUSTER_TAGS_METRICTTL", "22s")
cfg := &Config{}
cfg.ApplyEnvVars()
if cfg.MetricTTL != 22*time.Second {
t.Fatal("failed to override metric_ttl with env var")
}
os.Setenv("CLUSTER_TAGS_TAGS", "z:q,y:w")
cfg = &Config{}
cfg.ApplyEnvVars()
if cfg.Tags["z"] != "q" || cfg.Tags["y"] != "w" {
t.Fatal("could not override tags with env vars")
}
}

informer/tags/tags.go Normal file (98 lines)
View File

@ -0,0 +1,98 @@
// Package tags implements an ipfs-cluster informer which publishes
// user-defined tags as metrics.
package tags
import (
"context"
"sync"
"github.com/ipfs/ipfs-cluster/api"
logging "github.com/ipfs/go-log/v2"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
var logger = logging.Logger("tags")
// MetricName specifies the name of our metric
var MetricName = "tags"
// Informer is a simple object to implement the ipfscluster.Informer
// and Component interfaces.
type Informer struct {
config *Config // set when created, readonly
mu sync.Mutex // guards access to following fields
rpcClient *rpc.Client
}
// New returns an initialized informer using the given InformerConfig.
func New(cfg *Config) (*Informer, error) {
err := cfg.Validate()
if err != nil {
return nil, err
}
return &Informer{
config: cfg,
}, nil
}
// Name returns the name of this informer. Note the informer issues metrics
// with custom names.
func (tags *Informer) Name() string {
return MetricName
}
// SetClient provides us with an rpc.Client which allows
// contacting other components in the cluster.
func (tags *Informer) SetClient(c *rpc.Client) {
tags.mu.Lock()
defer tags.mu.Unlock()
tags.rpcClient = c
}
// Shutdown is called on cluster shutdown. We just invalidate
// any metrics from this point.
func (tags *Informer) Shutdown(ctx context.Context) error {
tags.mu.Lock()
defer tags.mu.Unlock()
tags.rpcClient = nil
return nil
}
// GetMetrics returns one metric for each tag defined in the configuration.
// The metric name is set as "tag:<tag_name>". When no tags are defined,
// a single invalid metric is returned.
func (tags *Informer) GetMetrics(ctx context.Context) []*api.Metric {
// Note we could potentially extend the tag:value syntax to include manual weights,
// i.e. { "region": "us:100", ... }.
// This would allow always giving priority to peers in a certain group.
if len(tags.config.Tags) == 0 {
logger.Debug("no tags defined in tags informer")
m := &api.Metric{
Name: "tag:none",
Value: "",
Valid: false,
Partitionable: true,
}
m.SetTTL(tags.config.MetricTTL)
return []*api.Metric{m}
}
metrics := make([]*api.Metric, 0, len(tags.config.Tags))
for n, v := range tags.config.Tags {
m := &api.Metric{
Name: "tag:" + n,
Value: v,
Valid: true,
Partitionable: true,
}
m.SetTTL(tags.config.MetricTTL)
metrics = append(metrics, m)
}
return metrics
}
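
A hedged usage sketch of the informer above; the extra "region" tag is illustrative, and the metric names follow the "tag:<name>" convention used by GetMetrics:

package main

import (
	"context"
	"fmt"

	"github.com/ipfs/ipfs-cluster/informer/tags"
)

func main() {
	ctx := context.Background()

	cfg := &tags.Config{}
	cfg.Default()                  // Tags: {"group": "default"}
	cfg.Tags["region"] = "eu-west" // illustrative extra tag

	inf, err := tags.New(cfg)
	if err != nil {
		panic(err)
	}
	defer inf.Shutdown(ctx)

	// One partitionable metric per configured tag, e.g.
	// "tag:group"="default" and "tag:region"="eu-west".
	for _, m := range inf.GetMetrics(ctx) {
		fmt.Printf("%s=%s partitionable=%v\n", m.Name, m.Value, m.Partitionable)
	}
}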

View File

@ -0,0 +1,27 @@
package tags
import (
"context"
"testing"
)
func Test(t *testing.T) {
ctx := context.Background()
cfg := &Config{}
cfg.Default()
inf, err := New(cfg)
if err != nil {
t.Fatal(err)
}
defer inf.Shutdown(ctx)
m := inf.GetMetrics(ctx)
if len(m) != 1 || !m[0].Valid {
t.Error("metric should be valid")
}
inf.config.Tags["x"] = "y"
m = inf.GetMetrics(ctx)
if len(m) != 2 {
t.Error("there should be 2 metrics")
}
}

View File

@ -137,7 +137,9 @@ type PinTracker interface {
type Informer interface {
Component
Name() string
GetMetric(context.Context) *api.Metric
// GetMetrics returns the metrics obtained by this Informer. It must
// always return at least one metric.
GetMetrics(context.Context) []*api.Metric
}
// PinAllocator decides where to pin certain content. In order to make such
@ -152,7 +154,9 @@ type PinAllocator interface {
// which are currently pinning the content. The candidates map
// contains the metrics for all peers which are eligible for pinning
// the content.
Allocate(ctx context.Context, c cid.Cid, current, candidates, priority map[peer.ID]*api.Metric) ([]peer.ID, error)
Allocate(ctx context.Context, c cid.Cid, current, candidates, priority api.MetricsSet) ([]peer.ID, error)
// Metrics returns the list of metrics that the allocator needs.
Metrics() []string
}
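
To make the extended PinAllocator contract concrete, here is a minimal no-op allocator sketch. It assumes api.MetricsSet maps metric names to metric slices, omits the Component methods (SetClient/Shutdown), and uses illustrative metric names in Metrics():

package trivialalloc

import (
	"context"

	"github.com/ipfs/ipfs-cluster/api"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// Allocator returns candidates in whatever order the metrics arrive.
type Allocator struct{}

// Metrics declares which metric names the cluster should fetch from the
// monitor before calling Allocate.
func (a Allocator) Metrics() []string {
	return []string{"freespace", "tag:group"} // illustrative names
}

// Allocate flattens the candidate MetricsSet into a peer list.
// Deduplication and any real ordering/partitioning logic are omitted.
func (a Allocator) Allocate(ctx context.Context, c cid.Cid, current, candidates, priority api.MetricsSet) ([]peer.ID, error) {
	var peers []peer.ID
	for _, metrics := range candidates {
		for _, m := range metrics {
			peers = append(peers, m.Peer)
		}
	}
	return peers, nil
}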
// PeerMonitor is a component in charge of publishing a peer's metrics and
@ -170,8 +174,9 @@ type PeerMonitor interface {
// PublishMetric sends a metric to the rest of the peers.
// How to send it, and to who, is to be decided by the implementation.
PublishMetric(context.Context, *api.Metric) error
// LatestMetrics returns a map with the latest metrics of matching name
// for the current cluster peers.
// LatestMetrics returns the latest metrics of matching name for the
// current cluster peers. The result should contain at most one metric
// per peer.
LatestMetrics(ctx context.Context, name string) []*api.Metric
// MetricNames returns a list of metric names.
MetricNames(ctx context.Context) []string

View File

@ -17,7 +17,7 @@ import (
"testing"
"time"
"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/consensus/crdt"
@ -176,7 +176,7 @@ func createComponents(
peername := fmt.Sprintf("peer_%d", i)
ident, clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, diskInfCfg, tracingCfg := testingConfigs()
ident, clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, allocBalancedCfg, diskInfCfg, tracingCfg := testingConfigs()
ident.ID = host.ID()
ident.PrivateKey = host.Peerstore().PrivKey(host.ID())
@ -211,7 +211,10 @@ func createComponents(
t.Fatal(err)
}
alloc := descendalloc.NewAllocator()
alloc, err := balanced.New(allocBalancedCfg)
if err != nil {
t.Fatal(err)
}
inf, err := disk.NewInformer(diskInfCfg)
if err != nil {
t.Fatal(err)

View File

@ -990,14 +990,13 @@ func (ipfs *Connector) updateInformerMetric(ctx context.Context) error {
return nil
}
var metrics []*api.Metric
err := ipfs.rpcClient.GoContext(
ctx,
"",
"Cluster",
"SendInformersMetrics",
struct{}{},
&metrics,
&struct{}{},
nil,
)
if err != nil {

View File

@ -20,8 +20,8 @@ var LoggingFacilities = map[string]string{
"raft": "INFO",
"crdt": "INFO",
"pintracker": "INFO",
"ascendalloc": "INFO",
"diskinfo": "INFO",
"tags": "INFO",
"apitypes": "INFO",
"config": "INFO",
"shardingdags": "INFO",
@ -29,6 +29,7 @@ var LoggingFacilities = map[string]string{
"adder": "INFO",
"optracker": "INFO",
"pstoremgr": "INFO",
"allocator": "INFO",
}
// LoggingFacilitiesExtra provides logging identifiers

View File

@ -5,6 +5,7 @@ package pubsubmon
import (
"bytes"
"context"
"time"
"sync"
@ -157,11 +158,7 @@ func (mon *Monitor) logFromPubsub() {
}
}
logger.Debugf(
"received pubsub metric '%s' from '%s'",
metric.Name,
metric.Peer,
)
debug("recieved", &metric)
err = mon.LogMetric(ctx, &metric)
if err != nil {
@ -208,7 +205,7 @@ func (mon *Monitor) LogMetric(ctx context.Context, m *api.Metric) error {
defer span.End()
mon.metrics.Add(m)
logger.Debugf("pubsub mon logged '%s' metric from '%s'. Expires on %d", m.Name, m.Peer, m.Expire)
debug("logged", m)
return nil
}
@ -231,11 +228,7 @@ func (mon *Monitor) PublishMetric(ctx context.Context, m *api.Metric) error {
return err
}
logger.Debugf(
"publishing metric %s to pubsub. Expires: %d",
m.Name,
m.Expire,
)
debug("publish", m)
err = mon.topic.Publish(ctx, b.Bytes())
if err != nil {
@ -281,3 +274,14 @@ func (mon *Monitor) MetricNames(ctx context.Context) []string {
return mon.metrics.MetricNames()
}
func debug(event string, m *api.Metric) {
logger.Debugf(
"%s metric: '%s' - '%s' - '%s' - '%s'",
event,
m.Peer,
m.Name,
m.Value,
time.Unix(0, m.Expire),
)
}

View File

@ -406,24 +406,17 @@ func (rpcapi *ClusterRPCAPI) RepoGCLocal(ctx context.Context, in struct{}, out *
}
// SendInformerMetric runs Cluster.sendInformerMetric().
func (rpcapi *ClusterRPCAPI) SendInformerMetric(ctx context.Context, in struct{}, out *api.Metric) error {
m, err := rpcapi.c.sendInformerMetric(ctx, rpcapi.c.informers[0])
func (rpcapi *ClusterRPCAPI) SendInformerMetrics(ctx context.Context, in struct{}, out *struct{}) error {
_, err := rpcapi.c.sendInformerMetrics(ctx, rpcapi.c.informers[0])
if err != nil {
return err
}
*out = *m
return nil
}
// SendInformersMetrics runs Cluster.sendInformerMetric() on all informers.
func (rpcapi *ClusterRPCAPI) SendInformersMetrics(ctx context.Context, in struct{}, out *[]*api.Metric) error {
var metrics []*api.Metric
metrics, err := rpcapi.c.sendInformersMetrics(ctx)
if err != nil {
return err
}
*out = metrics
return nil
func (rpcapi *ClusterRPCAPI) SendInformersMetrics(ctx context.Context, in struct{}, out *struct{}) error {
return rpcapi.c.sendInformersMetrics(ctx)
}
// Alerts runs Cluster.Alerts().

View File

@ -24,7 +24,7 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{
"Cluster.RecoverLocal": RPCTrusted,
"Cluster.RepoGC": RPCClosed,
"Cluster.RepoGCLocal": RPCTrusted,
"Cluster.SendInformerMetric": RPCClosed,
"Cluster.SendInformerMetrics": RPCClosed,
"Cluster.SendInformersMetrics": RPCClosed,
"Cluster.Alerts": RPCClosed,
"Cluster.Status": RPCClosed,

View File

@ -31,6 +31,9 @@ var (
PeerID4, _ = peer.Decode("QmZ8naDy5mEz4GLuQwjWt9MPYqHTBbsm8tQBrNSjiq6zBc")
PeerID5, _ = peer.Decode("QmZVAo3wd8s5eTTy2kPYs34J9PvfxpKPuYsePPYGjgRRjg")
PeerID6, _ = peer.Decode("QmR8Vu6kZk7JvAN2rWVWgiduHatgBq2bb15Yyq8RRhYSbx")
PeerID7, _ = peer.Decode("12D3KooWGHTKzeT4KaLGLrbKKyT8zKrBPXAUBRzCAN6ZMDMo4M6M")
PeerID8, _ = peer.Decode("12D3KooWFBFCDQzAkQSwPZLV883pKdsmb6urQ3sMjfJHUxn5GCVv")
PeerID9, _ = peer.Decode("12D3KooWKuJ8LPTyHbyX4nt4C7uWmUobzFsiceTVoFw7HpmoNakM")
PeerName1 = "TestPeer1"
PeerName2 = "TestPeer2"

View File

@ -355,7 +355,7 @@ func (mock *mockCluster) RepoGCLocal(ctx context.Context, in struct{}, out *api.
return nil
}
func (mock *mockCluster) SendInformerMetric(ctx context.Context, in struct{}, out *api.Metric) error {
func (mock *mockCluster) SendInformerMetrics(ctx context.Context, in struct{}, out *struct{}) error {
return nil
}