Rename allocator/metrics to allocator/balanced

This commit is contained in:
Hector Sanjuan 2021-10-06 11:26:38 +02:00
parent 6b31f44351
commit 26e229df94
19 changed files with 197 additions and 35 deletions

View File

@ -1,4 +1,4 @@
// Package metrics implements an allocator that can sort allocations
// Package balanced implements an allocator that can sort allocations
// based on multiple metrics, where metrics may be an arbitrary way to
// partition a set of peers.
//
@ -6,7 +6,7 @@
// first order candidate peers by tag metric, and then by "disk" metric.
// The final list will pick up allocations from each tag metric group.
// based on the given order of metrics.
package metrics
package balanced
import (
"context"

View File

@ -1,4 +1,4 @@
package metrics
package balanced
import (
"context"

View File

@ -1,4 +1,4 @@
package metrics
package balanced
import (
"encoding/json"

View File

@ -1,4 +1,4 @@
package metrics
package balanced
import (
"os"

View File

@ -1,4 +1,4 @@
package metrics
package balanced
import "github.com/ipfs/ipfs-cluster/api"

View File

@ -12,7 +12,7 @@ import (
"time"
"github.com/ipfs/ipfs-cluster/adder/sharding"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/config"
"github.com/ipfs/ipfs-cluster/informer/numpin"
@ -180,7 +180,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke
t.Fatal(err)
}
alloc, err := metrics.New(&metrics.Config{
alloc, err := balanced.New(&balanced.Config{
AllocateBy: []string{"numpin"},
})
if err != nil {

View File

@ -11,7 +11,7 @@ import (
"github.com/ipfs/go-cid"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/cmdutils"
@ -342,7 +342,7 @@ func runCmd(c *cli.Context) error {
if err != nil {
return cli.Exit(errors.Wrap(err, "creating disk informer"), 1)
}
alloc, err := metrics.New(cfgs.MetricsAlloc)
alloc, err := balanced.New(cfgs.BalancedAlloc)
if err != nil {
return cli.Exit(errors.Wrap(err, "creating metrics allocator"), 1)
}

View File

@ -6,7 +6,7 @@ import (
"time"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/api/ipfsproxy"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/cmdutils"
@ -168,7 +168,7 @@ func createCluster(
informers = append(informers, tagsinf)
}
alloc, err := metrics.New(cfgs.MetricsAlloc)
alloc, err := balanced.New(cfgs.BalancedAlloc)
checkErr("creating allocator", err)
ipfscluster.ReadyTimeout = cfgs.Raft.WaitForLeaderTimeout + 5*time.Second

View File

@ -0,0 +1 @@
{"replication_factor_min":-1,"replication_factor_max":-1,"name":"","mode":"direct","shard_size":0,"user_allocations":null,"expire_at":"0001-01-01T00:00:00Z","metadata":null,"pin_update":null,"cid":{"/":"QmUaFyXjZUNaUwYF8rBtbJc7fEJ46aJXvgV8z2HHs6jvmJ"},"type":2,"allocations":[],"max_depth":0,"reference":null}

Binary file not shown.

View File

@ -0,0 +1,4 @@
{
"id": "12D3KooWFMMrT3ChFWoTz9L7QFmiLnnMXC7D51KwrCjJdiuYPJqQ",
"private_key": "CAESQPzB/FNh30DDOW2B9JBnHwhM8d7tB/hz5i7NWxzxqiStUjsVQ2mDYmNikFNVkKhibowhTjDzYnSbEXo4SLwSZns="
}

View File

@ -0,0 +1,157 @@
{
"cluster": {
"peername": "carbon",
"secret": "a5da3e684ec4bb93cd94327f965035d0db5001a2983dd94a164e5070aade53bd",
"leave_on_shutdown": false,
"listen_multiaddress": [
"/ip4/0.0.0.0/tcp/9096",
"/ip4/0.0.0.0/udp/9096/quic"
],
"enable_relay_hop": true,
"connection_manager": {
"high_water": 400,
"low_water": 100,
"grace_period": "2m0s"
},
"dial_peer_timeout": "3s",
"state_sync_interval": "5m0s",
"pin_recover_interval": "12m0s",
"replication_factor_min": -1,
"replication_factor_max": -1,
"monitor_ping_interval": "15s",
"peer_watch_interval": "5s",
"mdns_interval": "10s",
"disable_repinning": true,
"peer_addresses": []
},
"consensus": {
"crdt": {
"cluster_name": "ipfs-cluster",
"trusted_peers": [
"*"
],
"batching": {
"max_batch_size": 0,
"max_batch_age": "0s"
}
}
},
"api": {
"ipfsproxy": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"log_file": "",
"read_timeout": "0s",
"read_header_timeout": "5s",
"write_timeout": "0s",
"idle_timeout": "1m0s",
"max_header_bytes": 4096
},
"restapi": {
"http_listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "0s",
"read_header_timeout": "5s",
"write_timeout": "0s",
"idle_timeout": "2m0s",
"max_header_bytes": 4096,
"basic_auth_credentials": null,
"http_log_file": "",
"headers": {},
"cors_allowed_origins": [
"*"
],
"cors_allowed_methods": [
"GET"
],
"cors_allowed_headers": [],
"cors_exposed_headers": [
"Content-Type",
"X-Stream-Output",
"X-Chunked-Output",
"X-Content-Length"
],
"cors_allow_credentials": true,
"cors_max_age": "0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "30s",
"ipfs_request_timeout": "5m0s",
"pin_timeout": "2m0s",
"unpin_timeout": "3h0m0s",
"repogc_timeout": "24h0m0s"
}
},
"pin_tracker": {
"stateless": {
"concurrent_pins": 10
}
},
"monitor": {
"pubsubmon": {
"check_interval": "15s",
"failure_threshold": 3
}
},
"allocator": {
"metricsalloc": {
"allocate_by": [
"freespace"
]
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"tags": {
"metric_ttl": "5s",
"tags": {}
}
},
"observations": {
"metrics": {
"enable_stats": false,
"prometheus_endpoint": "/ip4/127.0.0.1/tcp/8888",
"reporting_interval": "2s"
},
"tracing": {
"enable_tracing": false,
"jaeger_agent_endpoint": "/ip4/0.0.0.0/udp/6831",
"sampling_prob": 0.3,
"service_name": "cluster-daemon"
}
},
"datastore": {
"badger": {
"gc_discard_ratio": 0.2,
"gc_interval": "15m0s",
"gc_sleep": "10s",
"badger_options": {
"dir": "",
"value_dir": "",
"sync_writes": true,
"table_loading_mode": 2,
"value_log_loading_mode": 0,
"num_versions_to_keep": 1,
"max_table_size": 16777216,
"level_size_multiplier": 10,
"max_levels": 7,
"value_threshold": 32,
"num_memtables": 5,
"num_level_zero_tables": 5,
"num_level_zero_tables_stall": 10,
"level_one_size": 268435456,
"value_log_file_size": 1073741823,
"value_log_max_entries": 1000000,
"num_compactors": 2,
"compact_l_0_on_close": false,
"read_only": false,
"truncate": true
}
}
}
}

View File

@ -8,7 +8,7 @@ import (
"github.com/pkg/errors"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/api/ipfsproxy"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/config"
@ -35,7 +35,7 @@ type Configs struct {
Crdt *crdt.Config
Statelesstracker *stateless.Config
Pubsubmon *pubsubmon.Config
MetricsAlloc *metrics.Config
BalancedAlloc *balanced.Config
Diskinf *disk.Config
Numpininf *numpin.Config
Tagsinf *tags.Config
@ -224,7 +224,7 @@ func (ch *ConfigHelper) init() {
Crdt: &crdt.Config{},
Statelesstracker: &stateless.Config{},
Pubsubmon: &pubsubmon.Config{},
MetricsAlloc: &metrics.Config{},
BalancedAlloc: &balanced.Config{},
Diskinf: &disk.Config{},
Numpininf: &numpin.Config{},
Tagsinf: &tags.Config{},
@ -239,7 +239,7 @@ func (ch *ConfigHelper) init() {
man.RegisterComponent(config.IPFSConn, cfgs.Ipfshttp)
man.RegisterComponent(config.PinTracker, cfgs.Statelesstracker)
man.RegisterComponent(config.Monitor, cfgs.Pubsubmon)
man.RegisterComponent(config.Allocator, cfgs.MetricsAlloc)
man.RegisterComponent(config.Allocator, cfgs.BalancedAlloc)
man.RegisterComponent(config.Informer, cfgs.Diskinf)
// man.RegisterComponent(config.Informer, cfgs.Numpininf)
man.RegisterComponent(config.Informer, cfgs.Tagsinf)

View File

@ -1,7 +1,7 @@
package ipfscluster
import (
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/api/ipfsproxy"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/config"
@ -127,7 +127,7 @@ var testingMonCfg = []byte(`{
"failure_threshold": 6
}`)
var testingAllocMetricsCfg = []byte(`{
var testingAllocBalancedCfg = []byte(`{
"allocate_by": ["freespace"]
}`)
@ -142,8 +142,8 @@ var testingTracerCfg = []byte(`{
"service_name": "cluster-daemon"
}`)
func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *leveldb.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *metrics.Config, *disk.Config, *observations.TracingConfig) {
identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, allocMetricsCfg, diskInfCfg, tracingCfg := testingEmptyConfigs()
func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *leveldb.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *balanced.Config, *disk.Config, *observations.TracingConfig) {
identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, allocBalancedCfg, diskInfCfg, tracingCfg := testingEmptyConfigs()
identity.LoadJSON(testingIdentity)
clusterCfg.LoadJSON(testingClusterCfg)
apiCfg.LoadJSON(testingAPICfg)
@ -155,14 +155,14 @@ func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Confi
crdtCfg.LoadJSON(testingCrdtCfg)
statelesstrkrCfg.LoadJSON(testingTrackerCfg)
pubsubmonCfg.LoadJSON(testingMonCfg)
allocMetricsCfg.LoadJSON(testingAllocMetricsCfg)
allocBalancedCfg.LoadJSON(testingAllocBalancedCfg)
diskInfCfg.LoadJSON(testingDiskInfCfg)
tracingCfg.LoadJSON(testingTracerCfg)
return identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, allocMetricsCfg, diskInfCfg, tracingCfg
return identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, allocBalancedCfg, diskInfCfg, tracingCfg
}
func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *leveldb.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *metrics.Config, *disk.Config, *observations.TracingConfig) {
func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *leveldb.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *balanced.Config, *disk.Config, *observations.TracingConfig) {
identity := &config.Identity{}
clusterCfg := &Config{}
apiCfg := &rest.Config{}
@ -174,10 +174,10 @@ func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.
crdtCfg := &crdt.Config{}
statelessCfg := &stateless.Config{}
pubsubmonCfg := &pubsubmon.Config{}
allocMetricsCfg := &metrics.Config{}
allocBalancedCfg := &balanced.Config{}
diskInfCfg := &disk.Config{}
tracingCfg := &observations.TracingConfig{}
return identity, clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelessCfg, pubsubmonCfg, allocMetricsCfg, diskInfCfg, tracingCfg
return identity, clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelessCfg, pubsubmonCfg, allocBalancedCfg, diskInfCfg, tracingCfg
}
// func TestConfigDefault(t *testing.T) {

View File

@ -7,7 +7,7 @@ import (
"fmt"
"sync"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/allocator/sorter"
"github.com/ipfs/ipfs-cluster/api"
@ -41,8 +41,8 @@ func (t MetricType) String() string {
var logger = logging.Logger("diskinfo")
func init() {
metrics.RegisterInformer(MetricFreeSpace.String(), sorter.SortNumericReverse, false)
metrics.RegisterInformer(MetricRepoSize.String(), sorter.SortNumeric, false)
balanced.RegisterInformer(MetricFreeSpace.String(), sorter.SortNumericReverse, false)
balanced.RegisterInformer(MetricRepoSize.String(), sorter.SortNumeric, false)
}
// Informer is a simple object to implement the ipfscluster.Informer

View File

@ -6,7 +6,7 @@ import (
"context"
"fmt"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/allocator/sorter"
"github.com/ipfs/ipfs-cluster/api"
@ -19,7 +19,7 @@ import (
var MetricName = "numpin"
func init() {
metrics.RegisterInformer(MetricName, sorter.SortNumeric, false)
balanced.RegisterInformer(MetricName, sorter.SortNumeric, false)
}
// Informer is a simple object to implement the ipfscluster.Informer

View File

@ -6,7 +6,7 @@ import (
"context"
"sync"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/allocator/sorter"
"github.com/ipfs/ipfs-cluster/api"
@ -36,7 +36,7 @@ func New(cfg *Config) (*Informer, error) {
}
for k := range cfg.Tags {
metrics.RegisterInformer("tag:"+k, sorter.SortText, true)
balanced.RegisterInformer("tag:"+k, sorter.SortText, true)
}
return &Informer{

View File

@ -17,7 +17,7 @@ import (
"testing"
"time"
"github.com/ipfs/ipfs-cluster/allocator/metrics"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/consensus/crdt"
@ -176,7 +176,7 @@ func createComponents(
peername := fmt.Sprintf("peer_%d", i)
ident, clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, allocMetricsCfg, diskInfCfg, tracingCfg := testingConfigs()
ident, clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, badgerCfg, levelDBCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, allocBalancedCfg, diskInfCfg, tracingCfg := testingConfigs()
ident.ID = host.ID()
ident.PrivateKey = host.Peerstore().PrivKey(host.ID())
@ -211,7 +211,7 @@ func createComponents(
t.Fatal(err)
}
alloc, err := metrics.New(allocMetricsCfg)
alloc, err := balanced.New(allocBalancedCfg)
if err != nil {
t.Fatal(err)
}