Merge pull request #1559 from ipfs/fix/repo-stat-hammering
Fix: repo/stat gets hammered on busy cluster peers
This commit is contained in:
commit
2c204968b8
|
@ -112,7 +112,8 @@ var testingIpfsCfg = []byte(`{
|
|||
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
|
||||
"connect_swarms_delay": "7s",
|
||||
"pin_timeout": "30s",
|
||||
"unpin_timeout": "15s"
|
||||
"unpin_timeout": "15s",
|
||||
"informer_trigger_interval": 10
|
||||
}`)
|
||||
|
||||
var testingTrackerCfg = []byte(`
|
||||
|
|
|
@ -18,13 +18,14 @@ const envConfigKey = "cluster_ipfshttp"
|
|||
|
||||
// Default values for Config.
|
||||
const (
|
||||
DefaultNodeAddr = "/ip4/127.0.0.1/tcp/5001"
|
||||
DefaultConnectSwarmsDelay = 30 * time.Second
|
||||
DefaultIPFSRequestTimeout = 5 * time.Minute
|
||||
DefaultPinTimeout = 2 * time.Minute
|
||||
DefaultUnpinTimeout = 3 * time.Hour
|
||||
DefaultRepoGCTimeout = 24 * time.Hour
|
||||
DefaultUnpinDisable = false
|
||||
DefaultNodeAddr = "/ip4/127.0.0.1/tcp/5001"
|
||||
DefaultConnectSwarmsDelay = 30 * time.Second
|
||||
DefaultIPFSRequestTimeout = 5 * time.Minute
|
||||
DefaultPinTimeout = 2 * time.Minute
|
||||
DefaultUnpinTimeout = 3 * time.Hour
|
||||
DefaultRepoGCTimeout = 24 * time.Hour
|
||||
DefaultInformerTriggerInterval = 0 // disabled
|
||||
DefaultUnpinDisable = false
|
||||
)
|
||||
|
||||
// Config is used to initialize a Connector and allows to customize
|
||||
|
@ -51,6 +52,11 @@ type Config struct {
|
|||
|
||||
// RepoGC Operation timeout
|
||||
RepoGCTimeout time.Duration
|
||||
|
||||
// How many pin and block/put operations need to happen before we do a
|
||||
// special broadcast of informer metrics to the network. 0 to disable.
|
||||
InformerTriggerInterval int
|
||||
|
||||
// Disables the unpin operation and returns an error.
|
||||
UnpinDisable bool
|
||||
|
||||
|
@ -59,13 +65,14 @@ type Config struct {
|
|||
}
|
||||
|
||||
type jsonConfig struct {
|
||||
NodeMultiaddress string `json:"node_multiaddress"`
|
||||
ConnectSwarmsDelay string `json:"connect_swarms_delay"`
|
||||
IPFSRequestTimeout string `json:"ipfs_request_timeout"`
|
||||
PinTimeout string `json:"pin_timeout"`
|
||||
UnpinTimeout string `json:"unpin_timeout"`
|
||||
RepoGCTimeout string `json:"repogc_timeout"`
|
||||
UnpinDisable bool `json:"unpin_disable,omitempty"`
|
||||
NodeMultiaddress string `json:"node_multiaddress"`
|
||||
ConnectSwarmsDelay string `json:"connect_swarms_delay"`
|
||||
IPFSRequestTimeout string `json:"ipfs_request_timeout"`
|
||||
PinTimeout string `json:"pin_timeout"`
|
||||
UnpinTimeout string `json:"unpin_timeout"`
|
||||
RepoGCTimeout string `json:"repogc_timeout"`
|
||||
InformerTriggerInterval int `json:"informer_trigger_interval"`
|
||||
UnpinDisable bool `json:"unpin_disable,omitempty"`
|
||||
}
|
||||
|
||||
// ConfigKey provides a human-friendly identifier for this type of Config.
|
||||
|
@ -82,6 +89,7 @@ func (cfg *Config) Default() error {
|
|||
cfg.PinTimeout = DefaultPinTimeout
|
||||
cfg.UnpinTimeout = DefaultUnpinTimeout
|
||||
cfg.RepoGCTimeout = DefaultRepoGCTimeout
|
||||
cfg.InformerTriggerInterval = DefaultInformerTriggerInterval
|
||||
cfg.UnpinDisable = DefaultUnpinDisable
|
||||
|
||||
return nil
|
||||
|
@ -130,6 +138,9 @@ func (cfg *Config) Validate() error {
|
|||
if cfg.RepoGCTimeout < 0 {
|
||||
err = errors.New("ipfshttp.repogc_timeout invalid")
|
||||
}
|
||||
if cfg.InformerTriggerInterval < 0 {
|
||||
err = errors.New("ipfshttp.update_metrics_after")
|
||||
}
|
||||
|
||||
return err
|
||||
|
||||
|
@ -157,6 +168,7 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
|
|||
|
||||
cfg.NodeAddr = nodeAddr
|
||||
cfg.UnpinDisable = jcfg.UnpinDisable
|
||||
cfg.InformerTriggerInterval = jcfg.InformerTriggerInterval
|
||||
|
||||
err = config.ParseDurations(
|
||||
"ipfshttp",
|
||||
|
@ -201,6 +213,7 @@ func (cfg *Config) toJSONConfig() (jcfg *jsonConfig, err error) {
|
|||
jcfg.PinTimeout = cfg.PinTimeout.String()
|
||||
jcfg.UnpinTimeout = cfg.UnpinTimeout.String()
|
||||
jcfg.RepoGCTimeout = cfg.RepoGCTimeout.String()
|
||||
jcfg.InformerTriggerInterval = cfg.InformerTriggerInterval
|
||||
jcfg.UnpinDisable = cfg.UnpinDisable
|
||||
|
||||
return
|
||||
|
|
|
@ -14,7 +14,8 @@ var cfgJSON = []byte(`
|
|||
"ipfs_request_timeout": "5m0s",
|
||||
"pin_timeout": "2m",
|
||||
"unpin_timeout": "3h",
|
||||
"repogc_timeout": "24h"
|
||||
"repogc_timeout": "24h",
|
||||
"informer_trigger_interval": 10
|
||||
}
|
||||
`)
|
||||
|
||||
|
@ -27,6 +28,11 @@ func TestLoadJSON(t *testing.T) {
|
|||
|
||||
j := &jsonConfig{}
|
||||
json.Unmarshal(cfgJSON, j)
|
||||
|
||||
if cfg.InformerTriggerInterval != 10 {
|
||||
t.Error("missing value")
|
||||
}
|
||||
|
||||
j.NodeMultiaddress = "abc"
|
||||
tst, _ := json.Marshal(j)
|
||||
err = cfg.LoadJSON(tst)
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/ipfs-cluster/api"
|
||||
|
@ -41,11 +42,6 @@ var DNSTimeout = 5 * time.Second
|
|||
|
||||
var logger = logging.Logger("ipfshttp")
|
||||
|
||||
// updateMetricsMod only makes updates to informer metrics
|
||||
// on the nth occasion. So, for example, for every BlockPut,
|
||||
// only the 10th will trigger a SendInformerMetrics call.
|
||||
var updateMetricMod = 10
|
||||
|
||||
// Connector implements the IPFSConnector interface
|
||||
// and provides a component which is used to perform
|
||||
// on-demand requests against the configured IPFS daemon
|
||||
|
@ -62,8 +58,7 @@ type Connector struct {
|
|||
|
||||
client *http.Client // client to ipfs daemon
|
||||
|
||||
updateMetricMutex sync.Mutex
|
||||
updateMetricCount int
|
||||
updateMetricCount uint64
|
||||
|
||||
shutdownLock sync.Mutex
|
||||
shutdown bool
|
||||
|
@ -970,11 +965,12 @@ func (ipfs *Connector) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error)
|
|||
// Returns true every InformerTriggerInterval-th time that we
|
||||
// call this function.
|
||||
func (ipfs *Connector) shouldUpdateMetric() bool {
|
||||
ipfs.updateMetricMutex.Lock()
|
||||
defer ipfs.updateMetricMutex.Unlock()
|
||||
ipfs.updateMetricCount++
|
||||
if ipfs.updateMetricCount%updateMetricMod == 0 {
|
||||
ipfs.updateMetricCount = 0
|
||||
if ipfs.config.InformerTriggerInterval <= 0 {
|
||||
return false
|
||||
}
|
||||
curCount := atomic.AddUint64(&ipfs.updateMetricCount, 1)
|
||||
if curCount%uint64(ipfs.config.InformerTriggerInterval) == 0 {
|
||||
atomic.StoreUint64(&ipfs.updateMetricCount, 0)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
|
|
@ -27,6 +27,7 @@ func testIPFSConnector(t *testing.T) (*Connector, *test.IpfsMock) {
|
|||
cfg.Default()
|
||||
cfg.NodeAddr = nodeMAddr
|
||||
cfg.ConnectSwarmsDelay = 0
|
||||
cfg.InformerTriggerInterval = 10
|
||||
|
||||
ipfs, err := NewConnector(cfg)
|
||||
if err != nil {
|
||||
|
|
|
@ -408,11 +408,7 @@ func (rpcapi *ClusterRPCAPI) RepoGCLocal(ctx context.Context, in struct{}, out *
|
|||
|
||||
// SendInformerMetrics runs Cluster.sendInformersMetrics().
|
||||
func (rpcapi *ClusterRPCAPI) SendInformerMetrics(ctx context.Context, in struct{}, out *struct{}) error {
|
||||
_, err := rpcapi.c.sendInformerMetrics(ctx, rpcapi.c.informers[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return rpcapi.c.sendInformersMetrics(ctx)
|
||||
}
|
||||
|
||||
// SendInformersMetrics runs Cluster.sendInformerMetric() on all informers.
|
||||
|
|
Loading…
Reference in New Issue
Block a user