Fix: repo/stat gets hammered on busy cluster peers
Given that every pin and block/put writes something to IPFS and thus increases the repo size, a while ago we added a check to let the IPFS connector directly trigger the sending of metrics every 10 of such requests. This was meant to update the metrics more often so that balancing happened more granularly (particularly the freespace one). In practice, on a cluster that receives several hundreds of pin/adds operations in a few seconds, this is just bad. So: * We disable by default the whole thing. * We add a new InformerTriggerInterval configuration option to enable the thing. * Fix a bug that made this always call the first informer, which may not have been the freespace one).
This commit is contained in:
parent
ed348f29c1
commit
acde3f16d0
|
@ -112,7 +112,8 @@ var testingIpfsCfg = []byte(`{
|
||||||
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
|
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
|
||||||
"connect_swarms_delay": "7s",
|
"connect_swarms_delay": "7s",
|
||||||
"pin_timeout": "30s",
|
"pin_timeout": "30s",
|
||||||
"unpin_timeout": "15s"
|
"unpin_timeout": "15s",
|
||||||
|
"informer_trigger_interval": 10
|
||||||
}`)
|
}`)
|
||||||
|
|
||||||
var testingTrackerCfg = []byte(`
|
var testingTrackerCfg = []byte(`
|
||||||
|
|
|
@ -18,13 +18,14 @@ const envConfigKey = "cluster_ipfshttp"
|
||||||
|
|
||||||
// Default values for Config.
|
// Default values for Config.
|
||||||
const (
|
const (
|
||||||
DefaultNodeAddr = "/ip4/127.0.0.1/tcp/5001"
|
DefaultNodeAddr = "/ip4/127.0.0.1/tcp/5001"
|
||||||
DefaultConnectSwarmsDelay = 30 * time.Second
|
DefaultConnectSwarmsDelay = 30 * time.Second
|
||||||
DefaultIPFSRequestTimeout = 5 * time.Minute
|
DefaultIPFSRequestTimeout = 5 * time.Minute
|
||||||
DefaultPinTimeout = 2 * time.Minute
|
DefaultPinTimeout = 2 * time.Minute
|
||||||
DefaultUnpinTimeout = 3 * time.Hour
|
DefaultUnpinTimeout = 3 * time.Hour
|
||||||
DefaultRepoGCTimeout = 24 * time.Hour
|
DefaultRepoGCTimeout = 24 * time.Hour
|
||||||
DefaultUnpinDisable = false
|
DefaultInformerTriggerInterval = 0 // disabled
|
||||||
|
DefaultUnpinDisable = false
|
||||||
)
|
)
|
||||||
|
|
||||||
// Config is used to initialize a Connector and allows to customize
|
// Config is used to initialize a Connector and allows to customize
|
||||||
|
@ -51,6 +52,11 @@ type Config struct {
|
||||||
|
|
||||||
// RepoGC Operation timeout
|
// RepoGC Operation timeout
|
||||||
RepoGCTimeout time.Duration
|
RepoGCTimeout time.Duration
|
||||||
|
|
||||||
|
// How many pin and block/put operations need to happen before we do a
|
||||||
|
// special broadcast informer metrics to the network. 0 to disable.
|
||||||
|
InformerTriggerInterval int
|
||||||
|
|
||||||
// Disables the unpin operation and returns an error.
|
// Disables the unpin operation and returns an error.
|
||||||
UnpinDisable bool
|
UnpinDisable bool
|
||||||
|
|
||||||
|
@ -59,13 +65,14 @@ type Config struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type jsonConfig struct {
|
type jsonConfig struct {
|
||||||
NodeMultiaddress string `json:"node_multiaddress"`
|
NodeMultiaddress string `json:"node_multiaddress"`
|
||||||
ConnectSwarmsDelay string `json:"connect_swarms_delay"`
|
ConnectSwarmsDelay string `json:"connect_swarms_delay"`
|
||||||
IPFSRequestTimeout string `json:"ipfs_request_timeout"`
|
IPFSRequestTimeout string `json:"ipfs_request_timeout"`
|
||||||
PinTimeout string `json:"pin_timeout"`
|
PinTimeout string `json:"pin_timeout"`
|
||||||
UnpinTimeout string `json:"unpin_timeout"`
|
UnpinTimeout string `json:"unpin_timeout"`
|
||||||
RepoGCTimeout string `json:"repogc_timeout"`
|
RepoGCTimeout string `json:"repogc_timeout"`
|
||||||
UnpinDisable bool `json:"unpin_disable,omitempty"`
|
InformerTriggerInterval int `json:"informer_trigger_interval"`
|
||||||
|
UnpinDisable bool `json:"unpin_disable,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConfigKey provides a human-friendly identifier for this type of Config.
|
// ConfigKey provides a human-friendly identifier for this type of Config.
|
||||||
|
@ -82,6 +89,7 @@ func (cfg *Config) Default() error {
|
||||||
cfg.PinTimeout = DefaultPinTimeout
|
cfg.PinTimeout = DefaultPinTimeout
|
||||||
cfg.UnpinTimeout = DefaultUnpinTimeout
|
cfg.UnpinTimeout = DefaultUnpinTimeout
|
||||||
cfg.RepoGCTimeout = DefaultRepoGCTimeout
|
cfg.RepoGCTimeout = DefaultRepoGCTimeout
|
||||||
|
cfg.InformerTriggerInterval = DefaultInformerTriggerInterval
|
||||||
cfg.UnpinDisable = DefaultUnpinDisable
|
cfg.UnpinDisable = DefaultUnpinDisable
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -130,6 +138,9 @@ func (cfg *Config) Validate() error {
|
||||||
if cfg.RepoGCTimeout < 0 {
|
if cfg.RepoGCTimeout < 0 {
|
||||||
err = errors.New("ipfshttp.repogc_timeout invalid")
|
err = errors.New("ipfshttp.repogc_timeout invalid")
|
||||||
}
|
}
|
||||||
|
if cfg.InformerTriggerInterval < 0 {
|
||||||
|
err = errors.New("ipfshttp.update_metrics_after")
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
|
||||||
|
@ -157,6 +168,7 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
|
||||||
|
|
||||||
cfg.NodeAddr = nodeAddr
|
cfg.NodeAddr = nodeAddr
|
||||||
cfg.UnpinDisable = jcfg.UnpinDisable
|
cfg.UnpinDisable = jcfg.UnpinDisable
|
||||||
|
cfg.InformerTriggerInterval = jcfg.InformerTriggerInterval
|
||||||
|
|
||||||
err = config.ParseDurations(
|
err = config.ParseDurations(
|
||||||
"ipfshttp",
|
"ipfshttp",
|
||||||
|
@ -201,6 +213,7 @@ func (cfg *Config) toJSONConfig() (jcfg *jsonConfig, err error) {
|
||||||
jcfg.PinTimeout = cfg.PinTimeout.String()
|
jcfg.PinTimeout = cfg.PinTimeout.String()
|
||||||
jcfg.UnpinTimeout = cfg.UnpinTimeout.String()
|
jcfg.UnpinTimeout = cfg.UnpinTimeout.String()
|
||||||
jcfg.RepoGCTimeout = cfg.RepoGCTimeout.String()
|
jcfg.RepoGCTimeout = cfg.RepoGCTimeout.String()
|
||||||
|
jcfg.InformerTriggerInterval = cfg.InformerTriggerInterval
|
||||||
jcfg.UnpinDisable = cfg.UnpinDisable
|
jcfg.UnpinDisable = cfg.UnpinDisable
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
|
@ -14,7 +14,8 @@ var cfgJSON = []byte(`
|
||||||
"ipfs_request_timeout": "5m0s",
|
"ipfs_request_timeout": "5m0s",
|
||||||
"pin_timeout": "2m",
|
"pin_timeout": "2m",
|
||||||
"unpin_timeout": "3h",
|
"unpin_timeout": "3h",
|
||||||
"repogc_timeout": "24h"
|
"repogc_timeout": "24h",
|
||||||
|
"informer_trigger_interval": 10
|
||||||
}
|
}
|
||||||
`)
|
`)
|
||||||
|
|
||||||
|
@ -27,6 +28,11 @@ func TestLoadJSON(t *testing.T) {
|
||||||
|
|
||||||
j := &jsonConfig{}
|
j := &jsonConfig{}
|
||||||
json.Unmarshal(cfgJSON, j)
|
json.Unmarshal(cfgJSON, j)
|
||||||
|
|
||||||
|
if cfg.InformerTriggerInterval != 10 {
|
||||||
|
t.Error("missing value")
|
||||||
|
}
|
||||||
|
|
||||||
j.NodeMultiaddress = "abc"
|
j.NodeMultiaddress = "abc"
|
||||||
tst, _ := json.Marshal(j)
|
tst, _ := json.Marshal(j)
|
||||||
err = cfg.LoadJSON(tst)
|
err = cfg.LoadJSON(tst)
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/ipfs-cluster/api"
|
"github.com/ipfs/ipfs-cluster/api"
|
||||||
|
@ -41,11 +42,6 @@ var DNSTimeout = 5 * time.Second
|
||||||
|
|
||||||
var logger = logging.Logger("ipfshttp")
|
var logger = logging.Logger("ipfshttp")
|
||||||
|
|
||||||
// updateMetricsMod only makes updates to informer metrics
|
|
||||||
// on the nth occasion. So, for example, for every BlockPut,
|
|
||||||
// only the 10th will trigger a SendInformerMetrics call.
|
|
||||||
var updateMetricMod = 10
|
|
||||||
|
|
||||||
// Connector implements the IPFSConnector interface
|
// Connector implements the IPFSConnector interface
|
||||||
// and provides a component which is used to perform
|
// and provides a component which is used to perform
|
||||||
// on-demand requests against the configured IPFS daemom
|
// on-demand requests against the configured IPFS daemom
|
||||||
|
@ -62,8 +58,7 @@ type Connector struct {
|
||||||
|
|
||||||
client *http.Client // client to ipfs daemon
|
client *http.Client // client to ipfs daemon
|
||||||
|
|
||||||
updateMetricMutex sync.Mutex
|
updateMetricCount uint64
|
||||||
updateMetricCount int
|
|
||||||
|
|
||||||
shutdownLock sync.Mutex
|
shutdownLock sync.Mutex
|
||||||
shutdown bool
|
shutdown bool
|
||||||
|
@ -970,11 +965,12 @@ func (ipfs *Connector) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error)
|
||||||
// Returns true every updateMetricsMod-th time that we
|
// Returns true every updateMetricsMod-th time that we
|
||||||
// call this function.
|
// call this function.
|
||||||
func (ipfs *Connector) shouldUpdateMetric() bool {
|
func (ipfs *Connector) shouldUpdateMetric() bool {
|
||||||
ipfs.updateMetricMutex.Lock()
|
if ipfs.config.InformerTriggerInterval <= 0 {
|
||||||
defer ipfs.updateMetricMutex.Unlock()
|
return false
|
||||||
ipfs.updateMetricCount++
|
}
|
||||||
if ipfs.updateMetricCount%updateMetricMod == 0 {
|
curCount := atomic.AddUint64(&ipfs.updateMetricCount, 1)
|
||||||
ipfs.updateMetricCount = 0
|
if curCount%uint64(ipfs.config.InformerTriggerInterval) == 0 {
|
||||||
|
atomic.StoreUint64(&ipfs.updateMetricCount, 0)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -27,6 +27,7 @@ func testIPFSConnector(t *testing.T) (*Connector, *test.IpfsMock) {
|
||||||
cfg.Default()
|
cfg.Default()
|
||||||
cfg.NodeAddr = nodeMAddr
|
cfg.NodeAddr = nodeMAddr
|
||||||
cfg.ConnectSwarmsDelay = 0
|
cfg.ConnectSwarmsDelay = 0
|
||||||
|
cfg.InformerTriggerInterval = 10
|
||||||
|
|
||||||
ipfs, err := NewConnector(cfg)
|
ipfs, err := NewConnector(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -407,11 +407,7 @@ func (rpcapi *ClusterRPCAPI) RepoGCLocal(ctx context.Context, in struct{}, out *
|
||||||
|
|
||||||
// SendInformerMetric runs Cluster.sendInformerMetric().
|
// SendInformerMetric runs Cluster.sendInformerMetric().
|
||||||
func (rpcapi *ClusterRPCAPI) SendInformerMetrics(ctx context.Context, in struct{}, out *struct{}) error {
|
func (rpcapi *ClusterRPCAPI) SendInformerMetrics(ctx context.Context, in struct{}, out *struct{}) error {
|
||||||
_, err := rpcapi.c.sendInformerMetrics(ctx, rpcapi.c.informers[0])
|
return rpcapi.c.sendInformersMetrics(ctx)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendInformersMetrics runs Cluster.sendInformerMetric() on all informers.
|
// SendInformersMetrics runs Cluster.sendInformerMetric() on all informers.
|
||||||
|
|
Loading…
Reference in New Issue
Block a user