service/follow: Enable new metrics allocator

Hector Sanjuan 2021-09-13 14:14:50 +02:00
parent 1c0abde8a5
commit cf4fb74993
7 changed files with 70 additions and 31 deletions


@@ -66,14 +66,14 @@ func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin *api.Pi
}
mSet := make(api.MetricsSet)
for _, informer := range c.informers {
metricType := informer.Name()
mSet[metricType] = c.monitor.LatestMetrics(ctx, metricType)
metrics := c.allocator.Metrics()
for _, metricName := range metrics {
mSet[metricName] = c.monitor.LatestMetrics(ctx, metricName)
}
curSet, curPeers, candSet, candPeers, prioSet, prioPeers := filterMetrics(
mSet,
len(c.informers),
len(metrics),
currentAllocs,
priorityList,
blacklist,
@@ -108,7 +108,7 @@ func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin *api.Pi
//
// For a metric/peer to be included in a group, it is necessary that it has
// metrics for all informers.
func filterMetrics(mSet api.MetricsSet, numInformers int, currentAllocs, priorityList, blacklist []peer.ID) (
func filterMetrics(mSet api.MetricsSet, numMetrics int, currentAllocs, priorityList, blacklist []peer.ID) (
curSet api.MetricsSet,
curPeers []peer.ID,
candSet api.MetricsSet,
@@ -144,7 +144,7 @@ func filterMetrics(mSet api.MetricsSet, numInformers int, currentAllocs, priorit
// Put the metrics in their sets if peers have metrics for all informers
// Record peers
for p, metrics := range peersMap {
if len(metrics) == numInformers {
if len(metrics) == numMetrics {
for _, m := range metrics {
mSet[m.Name] = append(mSet[m.Name], m)
}
@@ -166,14 +166,14 @@ func allocationError(hash cid.Cid, needed, wanted int, candidatesValid []peer.ID
logger.Errorf("Not enough candidates to allocate %s:", hash)
logger.Errorf(" Needed: %d", needed)
logger.Errorf(" Wanted: %d", wanted)
logger.Errorf(" Valid candidates: %d:", len(candidatesValid))
logger.Errorf(" Available candidates: %d:", len(candidatesValid))
for _, c := range candidatesValid {
logger.Errorf(" - %s", c.Pretty())
}
errorMsg := "not enough peers to allocate CID. "
errorMsg += fmt.Sprintf("Needed at least: %d. ", needed)
errorMsg += fmt.Sprintf("Wanted at most: %d. ", wanted)
errorMsg += fmt.Sprintf("Valid candidates: %d. ", len(candidatesValid))
errorMsg += fmt.Sprintf("Available candidates: %d. ", len(candidatesValid))
errorMsg += "See logs for more info."
return errors.New(errorMsg)
}
@@ -189,12 +189,13 @@ func (c *Cluster) obtainAllocations(
defer span.End()
nCurrentValid := len(currentPeers)
nCandidatesValid := len(candidatePeers) + len(priorityPeers)
nAvailableValid := len(candidatePeers) + len(priorityPeers)
needed := rplMin - nCurrentValid // The minimum we need
wanted := rplMax - nCurrentValid // The maximum we want
logger.Debugf("obtainAllocations: current: %d", nCurrentValid)
logger.Debugf("obtainAllocations: candidates: %d", nCandidatesValid)
logger.Debugf("obtainAllocations: available: %d", nAvailableValid)
logger.Debugf("obtainAllocations: candidates: %d", len(candidatePeers))
logger.Debugf("obtainAllocations: priority: %d", len(priorityPeers))
logger.Debugf("obtainAllocations: Needed: %d", needed)
logger.Debugf("obtainAllocations: Wanted: %d", wanted)
@@ -213,7 +214,7 @@ func (c *Cluster) obtainAllocations(
return nil, nil
}
if nCandidatesValid < needed { // not enough candidates
if nAvailableValid < needed { // not enough candidates
return nil, allocationError(hash, needed, wanted, append(priorityPeers, candidatePeers...))
}
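
With this change, allocate() no longer assumes one metric per informer: it asks the allocator which metric names it needs, fetches the latest value of each from the monitor, and filterMetrics() keeps only peers that reported all of them. A simplified, self-contained sketch of that flow (the stand-in type, peer IDs and metric names below are illustrative, not the project's):

package main

import "fmt"

// latestMetrics stands in for the monitor: metric name -> peers reporting it.
type latestMetrics map[string][]string

func main() {
	mon := latestMetrics{
		"freespace": {"peerA", "peerB", "peerC"},
		"tag:group": {"peerA", "peerC"},
	}
	// What the allocator reports via its new Metrics() method,
	// i.e. its allocate_by configuration.
	wanted := []string{"tag:group", "freespace"}

	// Collect the requested metrics and count how many each peer has.
	counts := make(map[string]int)
	for _, name := range wanted {
		for _, p := range mon[name] {
			counts[p]++
		}
	}

	// As in filterMetrics(): a peer stays a candidate only if it has
	// metrics for every requested name (len(metrics) == numMetrics).
	for p, n := range counts {
		if n == len(wanted) {
			fmt.Println("candidate:", p) // peerA and peerC; peerB lacks tag:group
		}
	}
}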


@@ -20,38 +20,38 @@ import (
"go.opencensus.io/trace"
)
// Metrics is an allocator that partitions metrics and orders
// Allocator is an allocator that partitions metrics and orders
// the final list of allocations by selecting for each partition.
type Metrics struct {
type Allocator struct {
config *Config
rpcClient *rpc.Client
}
// New returns an initialized Allocator.
func New(cfg *Config) (*Metrics, error) {
func New(cfg *Config) (*Allocator, error) {
err := cfg.Validate()
if err != nil {
return nil, err
}
return &Metrics{
return &Allocator{
config: cfg,
}, nil
}
// SetClient provides us with an rpc.Client which allows
// contacting other components in the cluster.
func (m *Metrics) SetClient(c *rpc.Client) {
m.rpcClient = c
func (a *Allocator) SetClient(c *rpc.Client) {
a.rpcClient = c
}
// Shutdown is called on cluster shutdown. We just invalidate
// any metrics from this point.
func (m *Metrics) Shutdown(ctx context.Context) error {
func (a *Allocator) Shutdown(ctx context.Context) error {
_, span := trace.StartSpan(ctx, "allocator/metrics/Shutdown")
defer span.End()
m.rpcClient = nil
a.rpcClient = nil
return nil
}
@@ -209,8 +209,7 @@ func partitionValues(sortedMetrics []*api.Metric, inf informer) []*partition {
// - It repeats the process until there are no more buckets to sort.
// - Finally, it returns the first peer of the first
// - Third, based on the AllocateBy order, it selects the first metric
func (m *Metrics) Allocate(
func (a *Allocator) Allocate(
ctx context.Context,
c cid.Cid,
current, candidates, priority api.MetricsSet,
@@ -221,7 +220,7 @@ func (m *Metrics) Allocate(
if arg == nil {
continue
}
for _, by := range m.config.AllocateBy {
for _, by := range a.config.AllocateBy {
sorter := informers[by].sorter
if sorter == nil {
return nil, fmt.Errorf("allocate_by contains an unknown metric name: %s", by)
@@ -239,8 +238,8 @@ func (m *Metrics) Allocate(
//
// Otherwise, the sorting might be funny.
candidatePartition := partitionMetrics(candidates, m.config.AllocateBy)
priorityPartition := partitionMetrics(priority, m.config.AllocateBy)
candidatePartition := partitionMetrics(candidates, a.config.AllocateBy)
priorityPartition := partitionMetrics(priority, a.config.AllocateBy)
//fmt.Println("---")
//printPartition(candidatePartition)
@@ -251,6 +250,12 @@ func (m *Metrics) Allocate(
return append(first, last...), nil
}
// Metrics returns the names of the metrics that have been registered
// with this allocator.
func (a *Allocator) Metrics() []string {
return a.config.AllocateBy
}
// func printPartition(p *partitionedMetric) {
// fmt.Println(p.metricName)
// for _, p := range p.partitions {
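
A hedged usage sketch of the renamed Allocator: only the AllocateBy field is visible in this diff, so the Config literal below, and whether Validate() accepts it without further fields, is an assumption; the metric names are illustrative.

package main

import (
	"fmt"

	"github.com/ipfs/ipfs-cluster/allocator/metrics"
)

func main() {
	// Sketch only: a config listing the metrics to allocate by.
	cfg := &metrics.Config{
		AllocateBy: []string{"tag:group", "freespace"},
	}
	alloc, err := metrics.New(cfg)
	if err != nil {
		panic(err) // cfg.Validate() rejected the configuration
	}
	// The new accessor: the metric names this allocator needs, which
	// Cluster.allocate() now fetches from the monitor.
	fmt.Println(alloc.Metrics()) // [tag:group freespace]
}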


@@ -11,7 +11,7 @@ import (
"github.com/ipfs/go-cid"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/cmdutils"
@@ -342,7 +342,10 @@ func runCmd(c *cli.Context) error {
if err != nil {
return cli.Exit(errors.Wrap(err, "creating disk informer"), 1)
}
alloc := descendalloc.NewAllocator()
alloc, err := metrics.New(cfgs.MetricsAlloc)
if err != nil {
return cli.Exit(errors.Wrap(err, "creating metrics allocator"), 1)
}
crdtcons, err := crdt.New(
host,


@@ -6,7 +6,7 @@ import (
"time"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/api/ipfsproxy"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/cmdutils"
@@ -14,6 +14,7 @@ import (
"github.com/ipfs/ipfs-cluster/consensus/crdt"
"github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/informer/disk"
"github.com/ipfs/ipfs-cluster/informer/tags"
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/observations"
@@ -155,9 +156,20 @@ func createCluster(
connector, err := ipfshttp.NewConnector(cfgs.Ipfshttp)
checkErr("creating IPFS Connector component", err)
informer, err := disk.NewInformer(cfgs.Diskinf)
var informers []ipfscluster.Informer
if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.Diskinf.ConfigKey()) {
diskinf, err := disk.NewInformer(cfgs.Diskinf)
checkErr("creating disk informer", err)
alloc := descendalloc.NewAllocator()
informers = append(informers, diskinf)
}
if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.Tagsinf.ConfigKey()) {
tagsinf, err := tags.New(cfgs.Tagsinf)
checkErr("creating numpin informer", err)
informers = append(informers, tagsinf)
}
alloc, err := metrics.New(cfgs.MetricsAlloc)
checkErr("creating allocator", err)
ipfscluster.ReadyTimeout = cfgs.Raft.WaitForLeaderTimeout + 5*time.Second
@@ -206,7 +218,7 @@ func createCluster(
tracker,
mon,
alloc,
[]ipfscluster.Informer{informer},
informers,
tracer,
)
}
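
The informer list is now assembled from whichever informer config sections were actually present in the loaded JSON, and the same gating pattern would extend to other informers. A hedged sketch, assuming the numpin constructor mirrors the others (its exact signature is not shown in this diff):

// Hypothetical extension of the pattern above (not part of this commit):
// only instantiate an informer whose section appears in the JSON config.
if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.Numpininf.ConfigKey()) {
	numpininf, err := numpin.NewInformer(cfgs.Numpininf) // assumed constructor
	checkErr("creating numpin informer", err)
	informers = append(informers, numpininf)
}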


@@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/api/ipfsproxy"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/config"
@@ -17,6 +18,7 @@ import (
"github.com/ipfs/ipfs-cluster/datastore/leveldb"
"github.com/ipfs/ipfs-cluster/informer/disk"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/informer/tags"
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/observations"
@@ -33,8 +35,10 @@ type Configs struct {
Crdt *crdt.Config
Statelesstracker *stateless.Config
Pubsubmon *pubsubmon.Config
MetricsAlloc *metrics.Config
Diskinf *disk.Config
Numpininf *numpin.Config
Tagsinf *tags.Config
Metrics *observations.MetricsConfig
Tracing *observations.TracingConfig
Badger *badger.Config
@@ -220,7 +224,10 @@ func (ch *ConfigHelper) init() {
Crdt: &crdt.Config{},
Statelesstracker: &stateless.Config{},
Pubsubmon: &pubsubmon.Config{},
MetricsAlloc: &metrics.Config{},
Diskinf: &disk.Config{},
Numpininf: &numpin.Config{},
Tagsinf: &tags.Config{},
Metrics: &observations.MetricsConfig{},
Tracing: &observations.TracingConfig{},
Badger: &badger.Config{},
@@ -232,7 +239,10 @@ func (ch *ConfigHelper) init() {
man.RegisterComponent(config.IPFSConn, cfgs.Ipfshttp)
man.RegisterComponent(config.PinTracker, cfgs.Statelesstracker)
man.RegisterComponent(config.Monitor, cfgs.Pubsubmon)
man.RegisterComponent(config.Allocator, cfgs.MetricsAlloc)
man.RegisterComponent(config.Informer, cfgs.Diskinf)
// man.RegisterComponent(config.Informer, cfgs.Numpininf)
man.RegisterComponent(config.Informer, cfgs.Tagsinf)
man.RegisterComponent(config.Observations, cfgs.Metrics)
man.RegisterComponent(config.Observations, cfgs.Tracing)


@@ -6,6 +6,8 @@ import (
"context"
"sync"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/allocator/sorter"
"github.com/ipfs/ipfs-cluster/api"
logging "github.com/ipfs/go-log/v2"
@@ -35,6 +37,10 @@ func New(cfg *Config) (*Informer, error) {
return nil, err
}
for k := range cfg.Tags {
metrics.RegisterInformer("tag:"+k, sorter.SortText, true)
}
return &Informer{
config: cfg,
}, nil
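
For illustration, assuming Tags is a map keyed by tag name (as ranging over its keys suggests) and holds hypothetical entries "region" and "group", the loop above effectively runs:

// What the registration loop expands to for those (hypothetical) tags:
metrics.RegisterInformer("tag:region", sorter.SortText, true)
metrics.RegisterInformer("tag:group", sorter.SortText, true)
// From then on "tag:region" and "tag:group" can be listed in the
// allocator's allocate_by and are partitioned with text sorting.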


@@ -153,6 +153,8 @@ type PinAllocator interface {
// contains the metrics for all peers which are eligible for pinning
// the content.
Allocate(ctx context.Context, c cid.Cid, current, candidates, priority api.MetricsSet) ([]peer.ID, error)
// Metrics returns the list of metrics that the allocator needs.
Metrics() []string
}
// PeerMonitor is a component in charge of publishing a peer's metrics and
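
PinAllocator implementations must now also report which metric names they consume. A minimal sketch of an illustrative allocator showing only the two methods visible in this hunk (the full interface may require more, such as SetClient and Shutdown; the peer import path is assumed for this era of the codebase):

package myalloc

import (
	"context"

	cid "github.com/ipfs/go-cid"
	"github.com/ipfs/ipfs-cluster/api"
	peer "github.com/libp2p/go-libp2p-core/peer" // assumed import path
)

// staticAlloc is an illustrative allocator with a fixed metric list.
type staticAlloc struct {
	names []string // e.g. []string{"tag:group", "freespace"}
}

// Metrics tells the cluster which latest metrics to collect before
// it calls Allocate.
func (s *staticAlloc) Metrics() []string { return s.names }

// Allocate would order candidate peers here; this sketch returns nothing.
func (s *staticAlloc) Allocate(ctx context.Context, c cid.Cid, current, candidates, priority api.MetricsSet) ([]peer.ID, error) {
	return nil, nil
}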