From 4daece2b98b6b7456fefecc2ae67f7bee3e4500e Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 5 May 2022 11:19:57 +0200 Subject: [PATCH] Feat: add a new "pinqueue" informer component This new component broadcasts metrics about the current size of the pinqueue, which can in turn be used to inform allocations. It has a weight_bucket_size option that serves to divide the actual size by a given factor. This allows considering peers with similar queue sizes to have the same weight. Additionally, some changes have been made to the balanced allocator so that a combination of tags, pinqueue sizes and free-spaces can be used. When allocating by [, pinqueue, freespace], the allocator will prioritize choosing peers with the smallest pin queue weight first, and of those with the same weight, it will allocate based on freespace. --- allocator/balanced/balanced.go | 45 ++++++--- allocator/balanced/balanced_test.go | 28 ++++-- api/types.go | 13 +-- cmd/ipfs-cluster-follow/commands.go | 2 +- cmd/ipfs-cluster-service/daemon.go | 19 ++-- cmdutils/configs.go | 20 ++-- informer/pinqueue/config.go | 111 +++++++++++++++++++++++ informer/pinqueue/config_test.go | 76 ++++++++++++++++ informer/pinqueue/pinqueue.go | 110 ++++++++++++++++++++++ informer/pinqueue/pinqueue_test.go | 79 ++++++++++++++++ ipfscluster.go | 2 + pintracker/optracker/operationtracker.go | 5 + pintracker/stateless/stateless.go | 5 + rpc_api.go | 7 ++ rpc_policy.go | 14 +-- test/rpc_api_mock.go | 5 + 16 files changed, 488 insertions(+), 53 deletions(-) create mode 100644 informer/pinqueue/config.go create mode 100644 informer/pinqueue/config_test.go create mode 100644 informer/pinqueue/pinqueue.go create mode 100644 informer/pinqueue/pinqueue_test.go diff --git a/allocator/balanced/balanced.go b/allocator/balanced/balanced.go index 83c408a5..4b5999d9 100644 --- a/allocator/balanced/balanced.go +++ b/allocator/balanced/balanced.go @@ -60,10 +60,11 @@ type partitionedMetric struct { } type partition struct { - value string - weight int64 - peers map[peer.ID]bool // the bool tracks whether the peer has been picked already out of the partition when doing the final sort. - sub *partitionedMetric // all peers in sub-partitions will have the same value for this metric + value string + weight int64 + aggregatedWeight int64 + peers map[peer.ID]bool // the bool tracks whether the peer has been picked already out of the partition when doing the final sort. + sub *partitionedMetric // all peers in sub-partitions will have the same value for this metric } // Returns a partitionedMetric which has partitions and subpartitions based @@ -80,9 +81,19 @@ func partitionMetrics(set api.MetricsSet, by []string) *partitionedMetric { lessF := func(i, j int) bool { wi := pnedMetric.partitions[i].weight wj := pnedMetric.partitions[j].weight - // Strict order + + // if weight is equal, sort by aggregated weight of + // all sub-partitions. if wi == wj { - return pnedMetric.partitions[i].value < pnedMetric.partitions[j].value + awi := pnedMetric.partitions[i].aggregatedWeight + awj := pnedMetric.partitions[j].aggregatedWeight + // If subpartitions weight the same, do strict order + // based on value string + if awi == awj { + return pnedMetric.partitions[i].value < pnedMetric.partitions[j].value + } + return awj < awi + } // Descending! return wj < wi @@ -109,9 +120,10 @@ func partitionMetrics(set api.MetricsSet, by []string) *partitionedMetric { } partition.sub = partitionMetrics(filteredSet, by[1:]) - // Add the weight of our subpartitions + + // Add the aggregated weight of the subpartitions for _, subp := range partition.sub.partitions { - partition.weight += subp.weight + partition.aggregatedWeight += subp.aggregatedWeight } } sort.Slice(pnedMetric.partitions, lessF) @@ -144,10 +156,16 @@ func partitionValues(metrics []api.Metric) []*partition { // The informers must set the Partitionable field accordingly // when two metrics with the same value must be grouped in the // same partition. + // + // Note: aggregatedWeight is the same as weight here (sum of + // weight of all metrics in partitions), and gets updated + // later in partitionMetrics with the aggregated weight of + // sub-partitions. if !m.Partitionable { partitions = append(partitions, &partition{ - value: m.Value, - weight: m.GetWeight(), + value: m.Value, + weight: m.GetWeight(), + aggregatedWeight: m.GetWeight(), peers: map[peer.ID]bool{ m.Peer: false, }, @@ -159,10 +177,12 @@ func partitionValues(metrics []api.Metric) []*partition { if p, ok := partitionsByValue[m.Value]; ok { p.peers[m.Peer] = false p.weight += m.GetWeight() + p.aggregatedWeight += m.GetWeight() } else { partitionsByValue[m.Value] = &partition{ - value: m.Value, - weight: m.GetWeight(), + value: m.Value, + weight: m.GetWeight(), + aggregatedWeight: m.GetWeight(), peers: map[peer.ID]bool{ m.Peer: false, }, @@ -270,6 +290,7 @@ func (a *Allocator) Allocate( priorityPartition := partitionMetrics(priority, a.config.AllocateBy) logger.Debugf("Balanced allocator partitions:\n%s\n", printPartition(candidatePartition, 0)) + //fmt.Println(printPartition(candidatePartition, 0)) first := priorityPartition.sortedPeers() last := candidatePartition.sortedPeers() diff --git a/allocator/balanced/balanced_test.go b/allocator/balanced/balanced_test.go index 8e33c732..943dca5a 100644 --- a/allocator/balanced/balanced_test.go +++ b/allocator/balanced/balanced_test.go @@ -27,6 +27,7 @@ func TestAllocate(t *testing.T) { AllocateBy: []string{ "region", "az", + "pinqueue", "freespace", }, }) @@ -62,12 +63,23 @@ func TestAllocate(t *testing.T) { makeMetric("az", "au1", 0, test.PeerID6, true), makeMetric("az", "au1", 0, test.PeerID7, true), }, + "pinqueue": []api.Metric{ + makeMetric("pinqueue", "100", 0, test.PeerID1, true), + makeMetric("pinqueue", "200", 0, test.PeerID2, true), + + makeMetric("pinqueue", "100", 0, test.PeerID3, true), + makeMetric("pinqueue", "200", 0, test.PeerID4, true), + makeMetric("pinqueue", "300", 0, test.PeerID5, true), + + makeMetric("pinqueue", "100", 0, test.PeerID6, true), + makeMetric("pinqueue", "1000", -1, test.PeerID7, true), + }, "freespace": []api.Metric{ makeMetric("freespace", "100", 100, test.PeerID1, false), makeMetric("freespace", "500", 500, test.PeerID2, false), - makeMetric("freespace", "200", 0, test.PeerID3, false), // weight to 0 to test GetWeight() compat - makeMetric("freespace", "400", 0, test.PeerID4, false), // weight to 0 to test GetWeight() compat + makeMetric("freespace", "200", 200, test.PeerID3, false), + makeMetric("freespace", "400", 400, test.PeerID4, false), makeMetric("freespace", "10", 10, test.PeerID5, false), makeMetric("freespace", "50", 50, test.PeerID6, false), @@ -77,15 +89,15 @@ func TestAllocate(t *testing.T) { }, } - // Regions weights: a-us (pids 1,2): 600. b-eu (pids 3,4,5): 610. c-au (pids 6,7): 650 - // Az weights: us1: 100. us2: 500. eu1: 600. eu2: 10. au1: 650 + // Regions weights: a-us (pids 1,2): 600. b-eu (pids 3,4,5): 610. c-au (pids 6,7): 649 + // Az weights: us1: 100. us2: 500. eu1: 600. eu2: 10. au1: 649 // Based on the algorithm it should choose: // - // - c-au (most-weight)->au1->pid7 + // - c-au (most-weight)->au1->pinqueue(0)->pid6 // - b-eu->eu1->pid4 // - a-us->us2->pid2 // - - // - c-au->au1 (nowhere else to choose)->pid6 (region exausted) + // - c-au->au1 (nowhere else to choose)->pid7 (region exausted) // - b-eu->eu2 (already had in eu1)->pid5 // - a-us->us1 (already had in us2)->pid1 // - @@ -109,7 +121,7 @@ func TestAllocate(t *testing.T) { t.Logf("%d - %s", i, p) switch i { case 0: - if p != test.PeerID7 { + if p != test.PeerID6 { t.Errorf("wrong id in pos %d: %s", i, p) } case 1: @@ -121,7 +133,7 @@ func TestAllocate(t *testing.T) { t.Errorf("wrong id in pos %d: %s", i, p) } case 3: - if p != test.PeerID6 { + if p != test.PeerID7 { t.Errorf("wrong id in pos %d: %s", i, p) } case 4: diff --git a/api/types.go b/api/types.go index 9d759ca1..2a7589f1 100644 --- a/api/types.go +++ b/api/types.go @@ -1355,19 +1355,10 @@ func (m Metric) Discard() bool { return !m.Valid || m.Expired() } -// GetWeight returns the weight of the metric. When it is 0, -// it tries to parse the Value and use it as weight. +// GetWeight returns the weight of the metric. // This is for compatiblity. func (m Metric) GetWeight() int64 { - if m.Weight != 0 { - return m.Weight - } - - val, err := strconv.ParseInt(m.Value, 10, 64) - if err != nil { - return 0 - } - return val + return m.Weight } // MetricSlice is a sortable Metric array. diff --git a/cmd/ipfs-cluster-follow/commands.go b/cmd/ipfs-cluster-follow/commands.go index b749b3e7..e102fe7b 100644 --- a/cmd/ipfs-cluster-follow/commands.go +++ b/cmd/ipfs-cluster-follow/commands.go @@ -337,7 +337,7 @@ func runCmd(c *cli.Context) error { return cli.Exit(errors.Wrap(err, "creating IPFS Connector component"), 1) } - informer, err := disk.NewInformer(cfgs.Diskinf) + informer, err := disk.NewInformer(cfgs.DiskInf) if err != nil { return cli.Exit(errors.Wrap(err, "creating disk informer"), 1) } diff --git a/cmd/ipfs-cluster-service/daemon.go b/cmd/ipfs-cluster-service/daemon.go index c2915a2d..887528e2 100644 --- a/cmd/ipfs-cluster-service/daemon.go +++ b/cmd/ipfs-cluster-service/daemon.go @@ -15,6 +15,7 @@ import ( "github.com/ipfs/ipfs-cluster/consensus/crdt" "github.com/ipfs/ipfs-cluster/consensus/raft" "github.com/ipfs/ipfs-cluster/informer/disk" + "github.com/ipfs/ipfs-cluster/informer/pinqueue" "github.com/ipfs/ipfs-cluster/informer/tags" "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" @@ -171,15 +172,21 @@ func createCluster( checkErr("creating IPFS Connector component", err) var informers []ipfscluster.Informer - if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.Diskinf.ConfigKey()) { - diskinf, err := disk.NewInformer(cfgs.Diskinf) + if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.DiskInf.ConfigKey()) { + diskInf, err := disk.NewInformer(cfgs.DiskInf) checkErr("creating disk informer", err) - informers = append(informers, diskinf) + informers = append(informers, diskInf) } - if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.Tagsinf.ConfigKey()) { - tagsinf, err := tags.New(cfgs.Tagsinf) + if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.TagsInf.ConfigKey()) { + tagsInf, err := tags.New(cfgs.TagsInf) checkErr("creating numpin informer", err) - informers = append(informers, tagsinf) + informers = append(informers, tagsInf) + } + + if cfgMgr.IsLoadedFromJSON(config.Informer, cfgs.PinQueueInf.ConfigKey()) { + pinQueueInf, err := pinqueue.New(cfgs.PinQueueInf) + checkErr("creating pinqueue informer", err) + informers = append(informers, pinQueueInf) } // For legacy compatibility we need to make the allocator diff --git a/cmdutils/configs.go b/cmdutils/configs.go index d689e758..2490ef0f 100644 --- a/cmdutils/configs.go +++ b/cmdutils/configs.go @@ -19,6 +19,7 @@ import ( "github.com/ipfs/ipfs-cluster/datastore/leveldb" "github.com/ipfs/ipfs-cluster/informer/disk" "github.com/ipfs/ipfs-cluster/informer/numpin" + "github.com/ipfs/ipfs-cluster/informer/pinqueue" "github.com/ipfs/ipfs-cluster/informer/tags" "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" @@ -38,9 +39,10 @@ type Configs struct { Statelesstracker *stateless.Config Pubsubmon *pubsubmon.Config BalancedAlloc *balanced.Config - Diskinf *disk.Config - Numpininf *numpin.Config - Tagsinf *tags.Config + DiskInf *disk.Config + NumpinInf *numpin.Config + TagsInf *tags.Config + PinQueueInf *pinqueue.Config Metrics *observations.MetricsConfig Tracing *observations.TracingConfig Badger *badger.Config @@ -228,9 +230,10 @@ func (ch *ConfigHelper) init() { Statelesstracker: &stateless.Config{}, Pubsubmon: &pubsubmon.Config{}, BalancedAlloc: &balanced.Config{}, - Diskinf: &disk.Config{}, - Numpininf: &numpin.Config{}, - Tagsinf: &tags.Config{}, + DiskInf: &disk.Config{}, + NumpinInf: &numpin.Config{}, + TagsInf: &tags.Config{}, + PinQueueInf: &pinqueue.Config{}, Metrics: &observations.MetricsConfig{}, Tracing: &observations.TracingConfig{}, Badger: &badger.Config{}, @@ -244,9 +247,10 @@ func (ch *ConfigHelper) init() { man.RegisterComponent(config.PinTracker, cfgs.Statelesstracker) man.RegisterComponent(config.Monitor, cfgs.Pubsubmon) man.RegisterComponent(config.Allocator, cfgs.BalancedAlloc) - man.RegisterComponent(config.Informer, cfgs.Diskinf) + man.RegisterComponent(config.Informer, cfgs.DiskInf) // man.RegisterComponent(config.Informer, cfgs.Numpininf) - man.RegisterComponent(config.Informer, cfgs.Tagsinf) + man.RegisterComponent(config.Informer, cfgs.TagsInf) + man.RegisterComponent(config.Informer, cfgs.PinQueueInf) man.RegisterComponent(config.Observations, cfgs.Metrics) man.RegisterComponent(config.Observations, cfgs.Tracing) diff --git a/informer/pinqueue/config.go b/informer/pinqueue/config.go new file mode 100644 index 00000000..acbadab1 --- /dev/null +++ b/informer/pinqueue/config.go @@ -0,0 +1,111 @@ +package pinqueue + +import ( + "encoding/json" + "errors" + "time" + + "github.com/ipfs/ipfs-cluster/config" + "github.com/kelseyhightower/envconfig" +) + +const configKey = "pinqueue" +const envConfigKey = "cluster_pinqueue" + +// These are the default values for a Config. +const ( + DefaultMetricTTL = 30 * time.Second + DefaultWeightBucketSize = 100000 // 100k pins +) + +// Config allows to initialize an Informer. +type Config struct { + config.Saver + + MetricTTL time.Duration + WeightBucketSize int +} + +type jsonConfig struct { + MetricTTL string `json:"metric_ttl"` + WeightBucketSize int `json:"weight_bucket_size"` +} + +// ConfigKey returns a human-friendly identifier for this +// Config's type. +func (cfg *Config) ConfigKey() string { + return configKey +} + +// Default initializes this Config with sensible values. +func (cfg *Config) Default() error { + cfg.MetricTTL = DefaultMetricTTL + cfg.WeightBucketSize = DefaultWeightBucketSize + return nil +} + +// ApplyEnvVars fills in any Config fields found +// as environment variables. +func (cfg *Config) ApplyEnvVars() error { + jcfg := cfg.toJSONConfig() + + err := envconfig.Process(envConfigKey, jcfg) + if err != nil { + return err + } + + return cfg.applyJSONConfig(jcfg) +} + +// Validate checks that the fields of this configuration have +// sensible values. +func (cfg *Config) Validate() error { + if cfg.MetricTTL <= 0 { + return errors.New("pinqueue.metric_ttl is invalid") + } + if cfg.WeightBucketSize < 0 { + return errors.New("pinqueue.WeightBucketSize is invalid") + } + + return nil +} + +// LoadJSON parses a raw JSON byte-slice as generated by ToJSON(). +func (cfg *Config) LoadJSON(raw []byte) error { + jcfg := &jsonConfig{} + err := json.Unmarshal(raw, jcfg) + if err != nil { + return err + } + + cfg.Default() + + return cfg.applyJSONConfig(jcfg) +} + +func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error { + t, _ := time.ParseDuration(jcfg.MetricTTL) + cfg.MetricTTL = t + cfg.WeightBucketSize = jcfg.WeightBucketSize + + return cfg.Validate() +} + +// ToJSON generates a human-friendly JSON representation of this Config. +func (cfg *Config) ToJSON() ([]byte, error) { + jcfg := cfg.toJSONConfig() + + return config.DefaultJSONMarshal(jcfg) +} + +func (cfg *Config) toJSONConfig() *jsonConfig { + return &jsonConfig{ + MetricTTL: cfg.MetricTTL.String(), + WeightBucketSize: cfg.WeightBucketSize, + } +} + +// ToDisplayJSON returns JSON config as a string. +func (cfg *Config) ToDisplayJSON() ([]byte, error) { + return config.DisplayJSON(cfg.toJSONConfig()) +} diff --git a/informer/pinqueue/config_test.go b/informer/pinqueue/config_test.go new file mode 100644 index 00000000..b72d7476 --- /dev/null +++ b/informer/pinqueue/config_test.go @@ -0,0 +1,76 @@ +package pinqueue + +import ( + "encoding/json" + "os" + "testing" + "time" +) + +var cfgJSON = []byte(` +{ + "metric_ttl": "1s" +} +`) + +func TestLoadJSON(t *testing.T) { + cfg := &Config{} + err := cfg.LoadJSON(cfgJSON) + if err != nil { + t.Fatal(err) + } + + j := &jsonConfig{} + + json.Unmarshal(cfgJSON, j) + j.MetricTTL = "-10" + tst, _ := json.Marshal(j) + err = cfg.LoadJSON(tst) + if err == nil { + t.Error("expected error decoding metric_ttl") + } +} + +func TestToJSON(t *testing.T) { + cfg := &Config{} + cfg.LoadJSON(cfgJSON) + newjson, err := cfg.ToJSON() + if err != nil { + t.Fatal(err) + } + cfg = &Config{} + err = cfg.LoadJSON(newjson) + if err != nil { + t.Fatal(err) + } +} + +func TestDefault(t *testing.T) { + cfg := &Config{} + cfg.Default() + if cfg.Validate() != nil { + t.Fatal("error validating") + } + + cfg.MetricTTL = 0 + if cfg.Validate() == nil { + t.Fatal("expected error validating") + } + + cfg.Default() + cfg.WeightBucketSize = -2 + if cfg.Validate() == nil { + t.Fatal("expected error validating") + } + +} + +func TestApplyEnvVars(t *testing.T) { + os.Setenv("CLUSTER_PINQUEUE_METRICTTL", "22s") + cfg := &Config{} + cfg.ApplyEnvVars() + + if cfg.MetricTTL != 22*time.Second { + t.Fatal("failed to override metric_ttl with env var") + } +} diff --git a/informer/pinqueue/pinqueue.go b/informer/pinqueue/pinqueue.go new file mode 100644 index 00000000..26607962 --- /dev/null +++ b/informer/pinqueue/pinqueue.go @@ -0,0 +1,110 @@ +// Package pinqueue implements an ipfs-cluster informer which issues the +// current size of the pinning queue. +package pinqueue + +import ( + "context" + "fmt" + "sync" + + "github.com/ipfs/ipfs-cluster/api" + + rpc "github.com/libp2p/go-libp2p-gorpc" + + "go.opencensus.io/trace" +) + +// MetricName specifies the name of our metric +var MetricName = "pinqueue" + +// Informer is a simple object to implement the ipfscluster.Informer +// and Component interfaces +type Informer struct { + config *Config + + mu sync.Mutex + rpcClient *rpc.Client +} + +// New returns an initialized Informer. +func New(cfg *Config) (*Informer, error) { + err := cfg.Validate() + if err != nil { + return nil, err + } + + return &Informer{ + config: cfg, + }, nil +} + +// SetClient provides us with an rpc.Client which allows +// contacting other components in the cluster. +func (inf *Informer) SetClient(c *rpc.Client) { + inf.mu.Lock() + inf.rpcClient = c + inf.mu.Unlock() +} + +// Shutdown is called on cluster shutdown. We just invalidate +// any metrics from this point. +func (inf *Informer) Shutdown(ctx context.Context) error { + _, span := trace.StartSpan(ctx, "informer/numpin/Shutdown") + defer span.End() + + inf.mu.Lock() + inf.rpcClient = nil + inf.mu.Unlock() + return nil +} + +// Name returns the name of this informer +func (inf *Informer) Name() string { + return MetricName +} + +// GetMetrics contacts the Pintracker component and requests the number of +// queued items for pinning. +func (inf *Informer) GetMetrics(ctx context.Context) []api.Metric { + ctx, span := trace.StartSpan(ctx, "informer/pinqueue/GetMetric") + defer span.End() + + inf.mu.Lock() + rpcClient := inf.rpcClient + inf.mu.Unlock() + + if rpcClient == nil { + return []api.Metric{ + { + Valid: false, + }, + } + } + + var queued int64 + + err := rpcClient.CallContext( + ctx, + "", + "PinTracker", + "PinQueueSize", + struct{}{}, + &queued, + ) + valid := err == nil + weight := -queued // smaller pin queues have more priority + if div := inf.config.WeightBucketSize; div > 0 { + weight = weight / int64(div) + } + + m := api.Metric{ + Name: MetricName, + Value: fmt.Sprintf("%d", queued), + Valid: valid, + Partitionable: false, + Weight: weight, + } + + m.SetTTL(inf.config.MetricTTL) + return []api.Metric{m} +} diff --git a/informer/pinqueue/pinqueue_test.go b/informer/pinqueue/pinqueue_test.go new file mode 100644 index 00000000..697517e8 --- /dev/null +++ b/informer/pinqueue/pinqueue_test.go @@ -0,0 +1,79 @@ +package pinqueue + +import ( + "context" + "testing" + + rpc "github.com/libp2p/go-libp2p-gorpc" +) + +type mockService struct{} + +func (mock *mockService) PinQueueSize(ctx context.Context, in struct{}, out *int64) error { + *out = 42 + return nil +} + +func mockRPCClient(t *testing.T) *rpc.Client { + s := rpc.NewServer(nil, "mock") + c := rpc.NewClientWithServer(nil, "mock", s) + err := s.RegisterName("PinTracker", &mockService{}) + if err != nil { + t.Fatal(err) + } + return c +} + +func Test(t *testing.T) { + ctx := context.Background() + cfg := &Config{} + cfg.Default() + cfg.WeightBucketSize = 0 + inf, err := New(cfg) + if err != nil { + t.Fatal(err) + } + metrics := inf.GetMetrics(ctx) + if len(metrics) != 1 { + t.Fatal("expected 1 metric") + } + m := metrics[0] + + if m.Valid { + t.Error("metric should be invalid") + } + inf.SetClient(mockRPCClient(t)) + + metrics = inf.GetMetrics(ctx) + if len(metrics) != 1 { + t.Fatal("expected 1 metric") + } + m = metrics[0] + if !m.Valid { + t.Error("metric should be valid") + } + if m.Value != "42" { + t.Error("bad metric value", m.Value) + } + if m.Partitionable { + t.Error("should not be a partitionable metric") + } + if m.Weight != -42 { + t.Error("weight should be -42") + } + + cfg.WeightBucketSize = 5 + inf, err = New(cfg) + if err != nil { + t.Fatal(err) + } + inf.SetClient(mockRPCClient(t)) + metrics = inf.GetMetrics(ctx) + if len(metrics) != 1 { + t.Fatal("expected 1 metric") + } + m = metrics[0] + if m.Weight != -8 { + t.Error("weight should be -8, not", m.Weight) + } +} diff --git a/ipfscluster.go b/ipfscluster.go index ad820222..454ff064 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -128,6 +128,8 @@ type PinTracker interface { RecoverAll(context.Context, chan<- api.PinInfo) error // Recover retriggers a Pin/Unpin operation in a Cids with error status. Recover(context.Context, api.Cid) (api.PinInfo, error) + // PinQueueSize returns the current size of the pinning queue. + PinQueueSize(context.Context) (int64, error) } // Informer provides Metric information from a peer. The metrics produced by diff --git a/pintracker/optracker/operationtracker.go b/pintracker/optracker/operationtracker.go index d8d068bc..86a59e38 100644 --- a/pintracker/optracker/operationtracker.go +++ b/pintracker/optracker/operationtracker.go @@ -368,3 +368,8 @@ func (opt *OperationTracker) recordMetric(op *Operation, val int64) { } } } + +// PinQueueSize returns the current number of items queued to pin. +func (opt *OperationTracker) PinQueueSize() int64 { + return atomic.LoadInt64(&opt.pinQueuedCount) +} diff --git a/pintracker/stateless/stateless.go b/pintracker/stateless/stateless.go index d4592848..fcd87cdf 100644 --- a/pintracker/stateless/stateless.go +++ b/pintracker/stateless/stateless.go @@ -658,6 +658,11 @@ func (spt *Tracker) ipfsPins(ctx context.Context) (<-chan api.IPFSPinInfo, error return out, nil } +// PinQueueSize returns the current size of the pinning queue. +func (spt *Tracker) PinQueueSize(ctx context.Context) (int64, error) { + return spt.optracker.PinQueueSize(), nil +} + // func (spt *Tracker) getErrorsAll(ctx context.Context) []api.PinInfo { // return spt.optracker.Filter(ctx, optracker.PhaseError) // } diff --git a/rpc_api.go b/rpc_api.go index 581dfc09..62307dcc 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -498,6 +498,13 @@ func (rpcapi *PinTrackerRPCAPI) Recover(ctx context.Context, in api.Cid, out *ap return err } +// PinQueueSize runs PinTracker.PinQueueSize(). +func (rpcapi *PinTrackerRPCAPI) PinQueueSize(ctx context.Context, in struct{}, out *int64) error { + size, err := rpcapi.tracker.PinQueueSize(ctx) + *out = size + return err +} + /* IPFS Connector component methods */ diff --git a/rpc_policy.go b/rpc_policy.go index 79388efe..cb935943 100644 --- a/rpc_policy.go +++ b/rpc_policy.go @@ -39,12 +39,13 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{ "Cluster.Version": RPCOpen, // PinTracker methods - "PinTracker.Recover": RPCTrusted, // Called in broadcast from Recover() - "PinTracker.RecoverAll": RPCClosed, // Broadcast in RecoverAll unimplemented - "PinTracker.Status": RPCTrusted, - "PinTracker.StatusAll": RPCTrusted, - "PinTracker.Track": RPCClosed, - "PinTracker.Untrack": RPCClosed, + "PinTracker.PinQueueSize": RPCClosed, + "PinTracker.Recover": RPCTrusted, // Called in broadcast from Recover() + "PinTracker.RecoverAll": RPCClosed, // Broadcast in RecoverAll unimplemented + "PinTracker.Status": RPCTrusted, + "PinTracker.StatusAll": RPCTrusted, + "PinTracker.Track": RPCClosed, + "PinTracker.Untrack": RPCClosed, // IPFSConnector methods "IPFSConnector.BlockGet": RPCClosed, @@ -69,4 +70,3 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{ "PeerMonitor.LatestMetrics": RPCClosed, "PeerMonitor.MetricNames": RPCClosed, } - diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index d24fed91..027d9649 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -502,6 +502,11 @@ func (mock *mockPinTracker) Recover(ctx context.Context, in api.Cid, out *api.Pi return nil } +func (mock *mockPinTracker) PinQueueSize(ctx context.Context, in struct{}, out *int64) error { + *out = 10 + return nil +} + /* PeerMonitor methods */ // LatestMetrics runs PeerMonitor.LatestMetrics().