Feat: add a new "pinqueue" informer component

This new component broadcasts metrics about the current size of the pinqueue,
which can in turn be used to inform allocations.

It has a weight_bucket_size option that divides the actual queue size by a
given factor, so that peers with similar queue sizes end up with the same
weight.
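
As a rough illustration of the bucketing (a standalone sketch, not part of this
diff; the helper name is invented, but the arithmetic mirrors the informer code
included further down):

package main

import "fmt"

// bucketedWeight mirrors the informer's logic: the queue size is negated
// (smaller queues should rank higher) and divided by weight_bucket_size,
// so that similarly sized queues share the same weight.
func bucketedWeight(queued, bucketSize int64) int64 {
    weight := -queued
    if bucketSize > 0 {
        weight = weight / bucketSize
    }
    return weight
}

func main() {
    fmt.Println(bucketedWeight(42, 0))          // -42: bucketing disabled
    fmt.Println(bucketedWeight(123456, 100000)) // -1
    fmt.Println(bucketedWeight(180000, 100000)) // -1: same bucket as 123456
    fmt.Println(bucketedWeight(250000, 100000)) // -2
}

With the default weight_bucket_size of 100000, any peer with fewer than 100k
queued pins gets weight 0, peers with 100k-199k queued pins get -1, and so on.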

Additionally, the balanced allocator has been modified so that a combination of
tags, pinqueue sizes and free space can be used. When allocating by
[<tag>, pinqueue, freespace], the allocator prioritizes peers with the smallest
pin queues first and, among peers whose queues fall into the same weight bucket,
allocates based on freespace.
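
To make the ordering concrete, here is a standalone sketch (not the allocator's
actual code; the peerInfo type and the values are invented for illustration) of
the pinqueue-then-freespace tie-break:

package main

import (
    "fmt"
    "sort"
)

// peerInfo is a simplified, made-up view of a candidate peer, carrying only
// the two metrics relevant to the tie-break described above.
type peerInfo struct {
    name           string
    pinQueueWeight int64 // bucketed pinqueue weight: -queued/bucket, so closer to 0 means a smaller queue
    freeSpace      int64 // freespace weight: bigger is better
}

func main() {
    peers := []peerInfo{
        {"peerA", -2, 500}, // large pin queue, lots of free space
        {"peerB", 0, 100},  // small queue, little free space
        {"peerC", 0, 400},  // small queue, more free space
    }
    sort.Slice(peers, func(i, j int) bool {
        // Highest pinqueue weight first (i.e. smallest queue first)...
        if peers[i].pinQueueWeight != peers[j].pinQueueWeight {
            return peers[i].pinQueueWeight > peers[j].pinQueueWeight
        }
        // ...and among equal weights, the peer with more free space wins.
        return peers[i].freeSpace > peers[j].freeSpace
    })
    fmt.Println(peers) // [{peerC 0 400} {peerB 0 100} {peerA -2 500}]
}

In the allocator itself this ordering applies within each tag partition; as the
test expectations further down show, picks still rotate across regions.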
Hector Sanjuan 2022-05-05 11:19:57 +02:00
parent 8dc18cd3d7
commit 4daece2b98
16 changed files with 488 additions and 53 deletions


@ -62,6 +62,7 @@ type partitionedMetric struct {
type partition struct {
value string
weight int64
aggregatedWeight int64
peers map[peer.ID]bool // the bool tracks whether the peer has been picked already out of the partition when doing the final sort.
sub *partitionedMetric // all peers in sub-partitions will have the same value for this metric
}
@ -80,10 +81,20 @@ func partitionMetrics(set api.MetricsSet, by []string) *partitionedMetric {
lessF := func(i, j int) bool {
wi := pnedMetric.partitions[i].weight
wj := pnedMetric.partitions[j].weight
// Strict order
// if weight is equal, sort by aggregated weight of
// all sub-partitions.
if wi == wj {
awi := pnedMetric.partitions[i].aggregatedWeight
awj := pnedMetric.partitions[j].aggregatedWeight
// If subpartitions weigh the same, do a strict order
// based on the value string
if awi == awj {
return pnedMetric.partitions[i].value < pnedMetric.partitions[j].value
}
return awj < awi
}
// Descending!
return wj < wi
}
@ -109,9 +120,10 @@ func partitionMetrics(set api.MetricsSet, by []string) *partitionedMetric {
}
partition.sub = partitionMetrics(filteredSet, by[1:])
// Add the weight of our subpartitions
// Add the aggregated weight of the subpartitions
for _, subp := range partition.sub.partitions {
partition.weight += subp.weight
partition.aggregatedWeight += subp.aggregatedWeight
}
}
sort.Slice(pnedMetric.partitions, lessF)
@ -144,10 +156,16 @@ func partitionValues(metrics []api.Metric) []*partition {
// The informers must set the Partitionable field accordingly
// when two metrics with the same value must be grouped in the
// same partition.
//
// Note: aggregatedWeight is the same as weight here (sum of
// weight of all metrics in partitions), and gets updated
// later in partitionMetrics with the aggregated weight of
// sub-partitions.
if !m.Partitionable {
partitions = append(partitions, &partition{
value: m.Value,
weight: m.GetWeight(),
aggregatedWeight: m.GetWeight(),
peers: map[peer.ID]bool{
m.Peer: false,
},
@ -159,10 +177,12 @@ func partitionValues(metrics []api.Metric) []*partition {
if p, ok := partitionsByValue[m.Value]; ok {
p.peers[m.Peer] = false
p.weight += m.GetWeight()
p.aggregatedWeight += m.GetWeight()
} else {
partitionsByValue[m.Value] = &partition{
value: m.Value,
weight: m.GetWeight(),
aggregatedWeight: m.GetWeight(),
peers: map[peer.ID]bool{
m.Peer: false,
},
@ -270,6 +290,7 @@ func (a *Allocator) Allocate(
priorityPartition := partitionMetrics(priority, a.config.AllocateBy)
logger.Debugf("Balanced allocator partitions:\n%s\n", printPartition(candidatePartition, 0))
//fmt.Println(printPartition(candidatePartition, 0))
first := priorityPartition.sortedPeers()
last := candidatePartition.sortedPeers()


@ -27,6 +27,7 @@ func TestAllocate(t *testing.T) {
AllocateBy: []string{
"region",
"az",
"pinqueue",
"freespace",
},
})
@ -62,12 +63,23 @@ func TestAllocate(t *testing.T) {
makeMetric("az", "au1", 0, test.PeerID6, true),
makeMetric("az", "au1", 0, test.PeerID7, true),
},
"pinqueue": []api.Metric{
makeMetric("pinqueue", "100", 0, test.PeerID1, true),
makeMetric("pinqueue", "200", 0, test.PeerID2, true),
makeMetric("pinqueue", "100", 0, test.PeerID3, true),
makeMetric("pinqueue", "200", 0, test.PeerID4, true),
makeMetric("pinqueue", "300", 0, test.PeerID5, true),
makeMetric("pinqueue", "100", 0, test.PeerID6, true),
makeMetric("pinqueue", "1000", -1, test.PeerID7, true),
},
"freespace": []api.Metric{
makeMetric("freespace", "100", 100, test.PeerID1, false),
makeMetric("freespace", "500", 500, test.PeerID2, false),
makeMetric("freespace", "200", 0, test.PeerID3, false), // weight to 0 to test GetWeight() compat
makeMetric("freespace", "400", 0, test.PeerID4, false), // weight to 0 to test GetWeight() compat
makeMetric("freespace", "200", 200, test.PeerID3, false),
makeMetric("freespace", "400", 400, test.PeerID4, false),
makeMetric("freespace", "10", 10, test.PeerID5, false),
makeMetric("freespace", "50", 50, test.PeerID6, false),
@ -77,15 +89,15 @@ func TestAllocate(t *testing.T) {
},
}
// Regions weights: a-us (pids 1,2): 600. b-eu (pids 3,4,5): 610. c-au (pids 6,7): 650
// Az weights: us1: 100. us2: 500. eu1: 600. eu2: 10. au1: 650
// Regions weights: a-us (pids 1,2): 600. b-eu (pids 3,4,5): 610. c-au (pids 6,7): 649
// Az weights: us1: 100. us2: 500. eu1: 600. eu2: 10. au1: 649
// Based on the algorithm it should choose:
//
// - c-au (most-weight)->au1->pid7
// - c-au (most-weight)->au1->pinqueue(0)->pid6
// - b-eu->eu1->pid4
// - a-us->us2->pid2
// - <repeat regions>
// - c-au->au1 (nowhere else to choose)->pid6 (region exhausted)
// - c-au->au1 (nowhere else to choose)->pid7 (region exhausted)
// - b-eu->eu2 (already had in eu1)->pid5
// - a-us->us1 (already had in us2)->pid1
// - <repeat regions>
@ -109,7 +121,7 @@ func TestAllocate(t *testing.T) {
t.Logf("%d - %s", i, p)
switch i {
case 0:
if p != test.PeerID7 {
if p != test.PeerID6 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 1:
@ -121,7 +133,7 @@ func TestAllocate(t *testing.T) {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 3:
if p != test.PeerID6 {
if p != test.PeerID7 {
t.Errorf("wrong id in pos %d: %s", i, p)
}
case 4:


@ -1355,21 +1355,12 @@ func (m Metric) Discard() bool {
return !m.Valid || m.Expired()
}
// GetWeight returns the weight of the metric. When it is 0,
// it tries to parse the Value and use it as weight.
// GetWeight returns the weight of the metric.
// This is for compatibility.
func (m Metric) GetWeight() int64 {
if m.Weight != 0 {
return m.Weight
}
val, err := strconv.ParseInt(m.Value, 10, 64)
if err != nil {
return 0
}
return val
}
// MetricSlice is a sortable Metric array.
type MetricSlice []Metric


@ -337,7 +337,7 @@ func runCmd(c *cli.Context) error {
return cli.Exit(errors.Wrap(err, "creating IPFS Connector component"), 1)
}
informer, err := disk.NewInformer(cfgs.Diskinf)
informer, err := disk.NewInformer(cfgs.DiskInf)
if err != nil {
return cli.Exit(errors.Wrap(err, "creating disk informer"), 1)
}


@ -15,6 +15,7 @@ import (
"github.com/ipfs/ipfs-cluster/consensus/crdt"
"github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/informer/disk"
"github.com/ipfs/ipfs-cluster/informer/pinqueue"
"github.com/ipfs/ipfs-cluster/informer/tags"
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
@ -171,15 +172,21 @@ func createCluster(
checkErr("creating IPFS Connector component", err)
var informers []ipfscluster.Informer
if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.Diskinf.ConfigKey()) {
diskinf, err := disk.NewInformer(cfgs.Diskinf)
if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.DiskInf.ConfigKey()) {
diskInf, err := disk.NewInformer(cfgs.DiskInf)
checkErr("creating disk informer", err)
informers = append(informers, diskinf)
informers = append(informers, diskInf)
}
if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.Tagsinf.ConfigKey()) {
tagsinf, err := tags.New(cfgs.Tagsinf)
if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.TagsInf.ConfigKey()) {
tagsInf, err := tags.New(cfgs.TagsInf)
checkErr("creating numpin informer", err)
informers = append(informers, tagsinf)
informers = append(informers, tagsInf)
}
if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.PinQueueInf.ConfigKey()) {
pinQueueInf, err := pinqueue.New(cfgs.PinQueueInf)
checkErr("creating pinqueue informer", err)
informers = append(informers, pinQueueInf)
}
// For legacy compatibility we need to make the allocator


@ -19,6 +19,7 @@ import (
"github.com/ipfs/ipfs-cluster/datastore/leveldb"
"github.com/ipfs/ipfs-cluster/informer/disk"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/informer/pinqueue"
"github.com/ipfs/ipfs-cluster/informer/tags"
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
@ -38,9 +39,10 @@ type Configs struct {
Statelesstracker *stateless.Config
Pubsubmon *pubsubmon.Config
BalancedAlloc *balanced.Config
Diskinf *disk.Config
Numpininf *numpin.Config
Tagsinf *tags.Config
DiskInf *disk.Config
NumpinInf *numpin.Config
TagsInf *tags.Config
PinQueueInf *pinqueue.Config
Metrics *observations.MetricsConfig
Tracing *observations.TracingConfig
Badger *badger.Config
@ -228,9 +230,10 @@ func (ch *ConfigHelper) init() {
Statelesstracker: &stateless.Config{},
Pubsubmon: &pubsubmon.Config{},
BalancedAlloc: &balanced.Config{},
Diskinf: &disk.Config{},
Numpininf: &numpin.Config{},
Tagsinf: &tags.Config{},
DiskInf: &disk.Config{},
NumpinInf: &numpin.Config{},
TagsInf: &tags.Config{},
PinQueueInf: &pinqueue.Config{},
Metrics: &observations.MetricsConfig{},
Tracing: &observations.TracingConfig{},
Badger: &badger.Config{},
@ -244,9 +247,10 @@ func (ch *ConfigHelper) init() {
man.RegisterComponent(config.PinTracker, cfgs.Statelesstracker)
man.RegisterComponent(config.Monitor, cfgs.Pubsubmon)
man.RegisterComponent(config.Allocator, cfgs.BalancedAlloc)
man.RegisterComponent(config.Informer, cfgs.Diskinf)
man.RegisterComponent(config.Informer, cfgs.DiskInf)
// man.RegisterComponent(config.Informer, cfgs.Numpininf)
man.RegisterComponent(config.Informer, cfgs.Tagsinf)
man.RegisterComponent(config.Informer, cfgs.TagsInf)
man.RegisterComponent(config.Informer, cfgs.PinQueueInf)
man.RegisterComponent(config.Observations, cfgs.Metrics)
man.RegisterComponent(config.Observations, cfgs.Tracing)

informer/pinqueue/config.go Normal file

@ -0,0 +1,111 @@
package pinqueue
import (
"encoding/json"
"errors"
"time"
"github.com/ipfs/ipfs-cluster/config"
"github.com/kelseyhightower/envconfig"
)
const configKey = "pinqueue"
const envConfigKey = "cluster_pinqueue"
// These are the default values for a Config.
const (
DefaultMetricTTL = 30 * time.Second
DefaultWeightBucketSize = 100000 // 100k pins
)
// Config allows initializing an Informer.
type Config struct {
config.Saver
MetricTTL time.Duration
WeightBucketSize int
}
type jsonConfig struct {
MetricTTL string `json:"metric_ttl"`
WeightBucketSize int `json:"weight_bucket_size"`
}
// ConfigKey returns a human-friendly identifier for this
// Config's type.
func (cfg *Config) ConfigKey() string {
return configKey
}
// Default initializes this Config with sensible values.
func (cfg *Config) Default() error {
cfg.MetricTTL = DefaultMetricTTL
cfg.WeightBucketSize = DefaultWeightBucketSize
return nil
}
// ApplyEnvVars fills in any Config fields found
// as environment variables.
func (cfg *Config) ApplyEnvVars() error {
jcfg := cfg.toJSONConfig()
err := envconfig.Process(envConfigKey, jcfg)
if err != nil {
return err
}
return cfg.applyJSONConfig(jcfg)
}
// Validate checks that the fields of this configuration have
// sensible values.
func (cfg *Config) Validate() error {
if cfg.MetricTTL <= 0 {
return errors.New("pinqueue.metric_ttl is invalid")
}
if cfg.WeightBucketSize < 0 {
return errors.New("pinqueue.WeightBucketSize is invalid")
}
return nil
}
// LoadJSON parses a raw JSON byte-slice as generated by ToJSON().
func (cfg *Config) LoadJSON(raw []byte) error {
jcfg := &jsonConfig{}
err := json.Unmarshal(raw, jcfg)
if err != nil {
return err
}
cfg.Default()
return cfg.applyJSONConfig(jcfg)
}
func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
t, _ := time.ParseDuration(jcfg.MetricTTL)
cfg.MetricTTL = t
cfg.WeightBucketSize = jcfg.WeightBucketSize
return cfg.Validate()
}
// ToJSON generates a human-friendly JSON representation of this Config.
func (cfg *Config) ToJSON() ([]byte, error) {
jcfg := cfg.toJSONConfig()
return config.DefaultJSONMarshal(jcfg)
}
func (cfg *Config) toJSONConfig() *jsonConfig {
return &jsonConfig{
MetricTTL: cfg.MetricTTL.String(),
WeightBucketSize: cfg.WeightBucketSize,
}
}
// ToDisplayJSON returns JSON config as a string.
func (cfg *Config) ToDisplayJSON() ([]byte, error) {
return config.DisplayJSON(cfg.toJSONConfig())
}


@ -0,0 +1,76 @@
package pinqueue
import (
"encoding/json"
"os"
"testing"
"time"
)
var cfgJSON = []byte(`
{
"metric_ttl": "1s"
}
`)
func TestLoadJSON(t *testing.T) {
cfg := &Config{}
err := cfg.LoadJSON(cfgJSON)
if err != nil {
t.Fatal(err)
}
j := &jsonConfig{}
json.Unmarshal(cfgJSON, j)
j.MetricTTL = "-10"
tst, _ := json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error decoding metric_ttl")
}
}
func TestToJSON(t *testing.T) {
cfg := &Config{}
cfg.LoadJSON(cfgJSON)
newjson, err := cfg.ToJSON()
if err != nil {
t.Fatal(err)
}
cfg = &Config{}
err = cfg.LoadJSON(newjson)
if err != nil {
t.Fatal(err)
}
}
func TestDefault(t *testing.T) {
cfg := &Config{}
cfg.Default()
if cfg.Validate() != nil {
t.Fatal("error validating")
}
cfg.MetricTTL = 0
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
cfg.Default()
cfg.WeightBucketSize = -2
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
}
func TestApplyEnvVars(t *testing.T) {
os.Setenv("CLUSTER_PINQUEUE_METRICTTL", "22s")
cfg := &Config{}
cfg.ApplyEnvVars()
if cfg.MetricTTL != 22*time.Second {
t.Fatal("failed to override metric_ttl with env var")
}
}


@ -0,0 +1,110 @@
// Package pinqueue implements an ipfs-cluster informer which issues the
// current size of the pinning queue.
package pinqueue
import (
"context"
"fmt"
"sync"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/libp2p/go-libp2p-gorpc"
"go.opencensus.io/trace"
)
// MetricName specifies the name of our metric
var MetricName = "pinqueue"
// Informer is a simple object to implement the ipfscluster.Informer
// and Component interfaces
type Informer struct {
config *Config
mu sync.Mutex
rpcClient *rpc.Client
}
// New returns an initialized Informer.
func New(cfg *Config) (*Informer, error) {
err := cfg.Validate()
if err != nil {
return nil, err
}
return &Informer{
config: cfg,
}, nil
}
// SetClient provides us with an rpc.Client which allows
// contacting other components in the cluster.
func (inf *Informer) SetClient(c *rpc.Client) {
inf.mu.Lock()
inf.rpcClient = c
inf.mu.Unlock()
}
// Shutdown is called on cluster shutdown. We just invalidate
// any metrics from this point.
func (inf *Informer) Shutdown(ctx context.Context) error {
_, span := trace.StartSpan(ctx, "informer/numpin/Shutdown")
defer span.End()
inf.mu.Lock()
inf.rpcClient = nil
inf.mu.Unlock()
return nil
}
// Name returns the name of this informer
func (inf *Informer) Name() string {
return MetricName
}
// GetMetrics contacts the Pintracker component and requests the number of
// queued items for pinning.
func (inf *Informer) GetMetrics(ctx context.Context) []api.Metric {
ctx, span := trace.StartSpan(ctx, "informer/pinqueue/GetMetric")
defer span.End()
inf.mu.Lock()
rpcClient := inf.rpcClient
inf.mu.Unlock()
if rpcClient == nil {
return []api.Metric{
{
Valid: false,
},
}
}
var queued int64
err := rpcClient.CallContext(
ctx,
"",
"PinTracker",
"PinQueueSize",
struct{}{},
&queued,
)
valid := err == nil
weight := -queued // smaller pin queues have more priority
if div := inf.config.WeightBucketSize; div > 0 {
weight = weight / int64(div)
}
m := api.Metric{
Name: MetricName,
Value: fmt.Sprintf("%d", queued),
Valid: valid,
Partitionable: false,
Weight: weight,
}
m.SetTTL(inf.config.MetricTTL)
return []api.Metric{m}
}


@ -0,0 +1,79 @@
package pinqueue
import (
"context"
"testing"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
type mockService struct{}
func (mock *mockService) PinQueueSize(ctx context.Context, in struct{}, out *int64) error {
*out = 42
return nil
}
func mockRPCClient(t *testing.T) *rpc.Client {
s := rpc.NewServer(nil, "mock")
c := rpc.NewClientWithServer(nil, "mock", s)
err := s.RegisterName("PinTracker", &mockService{})
if err != nil {
t.Fatal(err)
}
return c
}
func Test(t *testing.T) {
ctx := context.Background()
cfg := &Config{}
cfg.Default()
cfg.WeightBucketSize = 0
inf, err := New(cfg)
if err != nil {
t.Fatal(err)
}
metrics := inf.GetMetrics(ctx)
if len(metrics) != 1 {
t.Fatal("expected 1 metric")
}
m := metrics[0]
if m.Valid {
t.Error("metric should be invalid")
}
inf.SetClient(mockRPCClient(t))
metrics = inf.GetMetrics(ctx)
if len(metrics) != 1 {
t.Fatal("expected 1 metric")
}
m = metrics[0]
if !m.Valid {
t.Error("metric should be valid")
}
if m.Value != "42" {
t.Error("bad metric value", m.Value)
}
if m.Partitionable {
t.Error("should not be a partitionable metric")
}
if m.Weight != -42 {
t.Error("weight should be -42")
}
cfg.WeightBucketSize = 5
inf, err = New(cfg)
if err != nil {
t.Fatal(err)
}
inf.SetClient(mockRPCClient(t))
metrics = inf.GetMetrics(ctx)
if len(metrics) != 1 {
t.Fatal("expected 1 metric")
}
m = metrics[0]
if m.Weight != -8 {
t.Error("weight should be -8, not", m.Weight)
}
}


@ -128,6 +128,8 @@ type PinTracker interface {
RecoverAll(context.Context, chan<- api.PinInfo) error
// Recover retriggers a Pin/Unpin operation in a Cids with error status.
Recover(context.Context, api.Cid) (api.PinInfo, error)
// PinQueueSize returns the current size of the pinning queue.
PinQueueSize(context.Context) (int64, error)
}
// Informer provides Metric information from a peer. The metrics produced by


@ -368,3 +368,8 @@ func (opt *OperationTracker) recordMetric(op *Operation, val int64) {
}
}
}
// PinQueueSize returns the current number of items queued to pin.
func (opt *OperationTracker) PinQueueSize() int64 {
return atomic.LoadInt64(&opt.pinQueuedCount)
}


@ -658,6 +658,11 @@ func (spt *Tracker) ipfsPins(ctx context.Context) (<-chan api.IPFSPinInfo, error
return out, nil
}
// PinQueueSize returns the current size of the pinning queue.
func (spt *Tracker) PinQueueSize(ctx context.Context) (int64, error) {
return spt.optracker.PinQueueSize(), nil
}
// func (spt *Tracker) getErrorsAll(ctx context.Context) []api.PinInfo {
// return spt.optracker.Filter(ctx, optracker.PhaseError)
// }


@ -498,6 +498,13 @@ func (rpcapi *PinTrackerRPCAPI) Recover(ctx context.Context, in api.Cid, out *ap
return err
}
// PinQueueSize runs PinTracker.PinQueueSize().
func (rpcapi *PinTrackerRPCAPI) PinQueueSize(ctx context.Context, in struct{}, out *int64) error {
size, err := rpcapi.tracker.PinQueueSize(ctx)
*out = size
return err
}
/*
IPFS Connector component methods
*/


@ -39,6 +39,7 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{
"Cluster.Version": RPCOpen,
// PinTracker methods
"PinTracker.PinQueueSize": RPCClosed,
"PinTracker.Recover": RPCTrusted, // Called in broadcast from Recover()
"PinTracker.RecoverAll": RPCClosed, // Broadcast in RecoverAll unimplemented
"PinTracker.Status": RPCTrusted,
@ -69,4 +70,3 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{
"PeerMonitor.LatestMetrics": RPCClosed,
"PeerMonitor.MetricNames": RPCClosed,
}


@ -502,6 +502,11 @@ func (mock *mockPinTracker) Recover(ctx context.Context, in api.Cid, out *api.Pi
return nil
}
func (mock *mockPinTracker) PinQueueSize(ctx context.Context, in struct{}, out *int64) error {
*out = 10
return nil
}
/* PeerMonitor methods */
// LatestMetrics runs PeerMonitor.LatestMetrics().