diff --git a/config_test.go b/config_test.go index 7c592bdf..2d5f79bc 100644 --- a/config_test.go +++ b/config_test.go @@ -112,7 +112,8 @@ var testingIpfsCfg = []byte(`{ "node_multiaddress": "/ip4/127.0.0.1/tcp/5001", "connect_swarms_delay": "7s", "pin_timeout": "30s", - "unpin_timeout": "15s" + "unpin_timeout": "15s", + "informer_trigger_interval": 10 }`) var testingTrackerCfg = []byte(` diff --git a/ipfsconn/ipfshttp/config.go b/ipfsconn/ipfshttp/config.go index 76aa78a7..757722c5 100644 --- a/ipfsconn/ipfshttp/config.go +++ b/ipfsconn/ipfshttp/config.go @@ -18,13 +18,14 @@ const envConfigKey = "cluster_ipfshttp" // Default values for Config. const ( - DefaultNodeAddr = "/ip4/127.0.0.1/tcp/5001" - DefaultConnectSwarmsDelay = 30 * time.Second - DefaultIPFSRequestTimeout = 5 * time.Minute - DefaultPinTimeout = 2 * time.Minute - DefaultUnpinTimeout = 3 * time.Hour - DefaultRepoGCTimeout = 24 * time.Hour - DefaultUnpinDisable = false + DefaultNodeAddr = "/ip4/127.0.0.1/tcp/5001" + DefaultConnectSwarmsDelay = 30 * time.Second + DefaultIPFSRequestTimeout = 5 * time.Minute + DefaultPinTimeout = 2 * time.Minute + DefaultUnpinTimeout = 3 * time.Hour + DefaultRepoGCTimeout = 24 * time.Hour + DefaultInformerTriggerInterval = 0 // disabled + DefaultUnpinDisable = false ) // Config is used to initialize a Connector and allows to customize @@ -51,6 +52,11 @@ type Config struct { // RepoGC Operation timeout RepoGCTimeout time.Duration + + // How many pin and block/put operations need to happen before we do a + // special broadcast informer metrics to the network. 0 to disable. + InformerTriggerInterval int + // Disables the unpin operation and returns an error. UnpinDisable bool @@ -59,13 +65,14 @@ type Config struct { } type jsonConfig struct { - NodeMultiaddress string `json:"node_multiaddress"` - ConnectSwarmsDelay string `json:"connect_swarms_delay"` - IPFSRequestTimeout string `json:"ipfs_request_timeout"` - PinTimeout string `json:"pin_timeout"` - UnpinTimeout string `json:"unpin_timeout"` - RepoGCTimeout string `json:"repogc_timeout"` - UnpinDisable bool `json:"unpin_disable,omitempty"` + NodeMultiaddress string `json:"node_multiaddress"` + ConnectSwarmsDelay string `json:"connect_swarms_delay"` + IPFSRequestTimeout string `json:"ipfs_request_timeout"` + PinTimeout string `json:"pin_timeout"` + UnpinTimeout string `json:"unpin_timeout"` + RepoGCTimeout string `json:"repogc_timeout"` + InformerTriggerInterval int `json:"informer_trigger_interval"` + UnpinDisable bool `json:"unpin_disable,omitempty"` } // ConfigKey provides a human-friendly identifier for this type of Config. @@ -82,6 +89,7 @@ func (cfg *Config) Default() error { cfg.PinTimeout = DefaultPinTimeout cfg.UnpinTimeout = DefaultUnpinTimeout cfg.RepoGCTimeout = DefaultRepoGCTimeout + cfg.InformerTriggerInterval = DefaultInformerTriggerInterval cfg.UnpinDisable = DefaultUnpinDisable return nil @@ -130,6 +138,9 @@ func (cfg *Config) Validate() error { if cfg.RepoGCTimeout < 0 { err = errors.New("ipfshttp.repogc_timeout invalid") } + if cfg.InformerTriggerInterval < 0 { + err = errors.New("ipfshttp.update_metrics_after") + } return err @@ -157,6 +168,7 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error { cfg.NodeAddr = nodeAddr cfg.UnpinDisable = jcfg.UnpinDisable + cfg.InformerTriggerInterval = jcfg.InformerTriggerInterval err = config.ParseDurations( "ipfshttp", @@ -201,6 +213,7 @@ func (cfg *Config) toJSONConfig() (jcfg *jsonConfig, err error) { jcfg.PinTimeout = cfg.PinTimeout.String() jcfg.UnpinTimeout = cfg.UnpinTimeout.String() jcfg.RepoGCTimeout = cfg.RepoGCTimeout.String() + jcfg.InformerTriggerInterval = cfg.InformerTriggerInterval jcfg.UnpinDisable = cfg.UnpinDisable return diff --git a/ipfsconn/ipfshttp/config_test.go b/ipfsconn/ipfshttp/config_test.go index 8e1038dc..87e1258a 100644 --- a/ipfsconn/ipfshttp/config_test.go +++ b/ipfsconn/ipfshttp/config_test.go @@ -14,7 +14,8 @@ var cfgJSON = []byte(` "ipfs_request_timeout": "5m0s", "pin_timeout": "2m", "unpin_timeout": "3h", - "repogc_timeout": "24h" + "repogc_timeout": "24h", + "informer_trigger_interval": 10 } `) @@ -27,6 +28,11 @@ func TestLoadJSON(t *testing.T) { j := &jsonConfig{} json.Unmarshal(cfgJSON, j) + + if cfg.InformerTriggerInterval != 10 { + t.Error("missing value") + } + j.NodeMultiaddress = "abc" tst, _ := json.Marshal(j) err = cfg.LoadJSON(tst) diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 817f5a57..53a44b8e 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -14,6 +14,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/ipfs/ipfs-cluster/api" @@ -41,11 +42,6 @@ var DNSTimeout = 5 * time.Second var logger = logging.Logger("ipfshttp") -// updateMetricsMod only makes updates to informer metrics -// on the nth occasion. So, for example, for every BlockPut, -// only the 10th will trigger a SendInformerMetrics call. -var updateMetricMod = 10 - // Connector implements the IPFSConnector interface // and provides a component which is used to perform // on-demand requests against the configured IPFS daemom @@ -62,8 +58,7 @@ type Connector struct { client *http.Client // client to ipfs daemon - updateMetricMutex sync.Mutex - updateMetricCount int + updateMetricCount uint64 shutdownLock sync.Mutex shutdown bool @@ -970,11 +965,12 @@ func (ipfs *Connector) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) // Returns true every updateMetricsMod-th time that we // call this function. func (ipfs *Connector) shouldUpdateMetric() bool { - ipfs.updateMetricMutex.Lock() - defer ipfs.updateMetricMutex.Unlock() - ipfs.updateMetricCount++ - if ipfs.updateMetricCount%updateMetricMod == 0 { - ipfs.updateMetricCount = 0 + if ipfs.config.InformerTriggerInterval <= 0 { + return false + } + curCount := atomic.AddUint64(&ipfs.updateMetricCount, 1) + if curCount%uint64(ipfs.config.InformerTriggerInterval) == 0 { + atomic.StoreUint64(&ipfs.updateMetricCount, 0) return true } return false diff --git a/ipfsconn/ipfshttp/ipfshttp_test.go b/ipfsconn/ipfshttp/ipfshttp_test.go index f6a52317..21bcba57 100644 --- a/ipfsconn/ipfshttp/ipfshttp_test.go +++ b/ipfsconn/ipfshttp/ipfshttp_test.go @@ -27,6 +27,7 @@ func testIPFSConnector(t *testing.T) (*Connector, *test.IpfsMock) { cfg.Default() cfg.NodeAddr = nodeMAddr cfg.ConnectSwarmsDelay = 0 + cfg.InformerTriggerInterval = 10 ipfs, err := NewConnector(cfg) if err != nil { diff --git a/rpc_api.go b/rpc_api.go index ff880398..6a78922a 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -407,11 +407,7 @@ func (rpcapi *ClusterRPCAPI) RepoGCLocal(ctx context.Context, in struct{}, out * // SendInformerMetric runs Cluster.sendInformerMetric(). func (rpcapi *ClusterRPCAPI) SendInformerMetrics(ctx context.Context, in struct{}, out *struct{}) error { - _, err := rpcapi.c.sendInformerMetrics(ctx, rpcapi.c.informers[0]) - if err != nil { - return err - } - return nil + return rpcapi.c.sendInformersMetrics(ctx) } // SendInformersMetrics runs Cluster.sendInformerMetric() on all informers.