Feat: regularly trigger "recover" automatically

Interval controlled by a pin_recover_interval option in the config.
This commit is contained in:
Hector Sanjuan 2019-07-08 16:55:05 +02:00
parent 53f1a99feb
commit 997208a82f
3 changed files with 32 additions and 1 deletions

View File

@ -229,6 +229,7 @@ func (c *Cluster) syncWatcher() {
stateSyncTicker := time.NewTicker(c.config.StateSyncInterval)
syncTicker := time.NewTicker(c.config.IPFSSyncInterval)
recoverTicker := time.NewTicker(c.config.PinRecoverInterval)
for {
select {
@ -238,6 +239,9 @@ func (c *Cluster) syncWatcher() {
case <-syncTicker.C:
logger.Debug("auto-triggering SyncAllLocal()")
c.SyncAllLocal(ctx)
case <-recoverTicker.C:
logger.Debug("auto-triggering RecoverAllLocal()")
c.RecoverAllLocal(ctx)
case <-c.ctx.Done():
stateSyncTicker.Stop()
return

View File

@ -26,6 +26,7 @@ const (
DefaultListenAddr = "/ip4/0.0.0.0/tcp/9096"
DefaultStateSyncInterval = 600 * time.Second
DefaultIPFSSyncInterval = 130 * time.Second
DefaultPinRecoverInterval = 1 * time.Hour
DefaultMonitorPingInterval = 15 * time.Second
DefaultPeerWatchInterval = 5 * time.Second
DefaultReplicationFactor = -1
@ -82,7 +83,7 @@ type Config struct {
// consistency, increase with larger states.
StateSyncInterval time.Duration
// Number of seconds between syncs of the local state and
// Time between syncs of the local state and
// the state of the ipfs daemon. This ensures that cluster
// provides the right status for tracked items (for example
// to detect that a pin has been removed. Reduce for faster
@ -90,6 +91,10 @@ type Config struct {
// large.
IPFSSyncInterval time.Duration
// Time between automatic runs of the "recover" operation
// which will retry to pin/unpin items in error state.
PinRecoverInterval time.Duration
// ReplicationFactorMax indicates the target number of nodes
// that should pin content. For exampe, a replication_factor of
// 3 will have cluster allocate each pinned hash to 3 peers if
@ -146,6 +151,7 @@ type configJSON struct {
ConnectionManager *connMgrConfigJSON `json:"connection_manager"`
StateSyncInterval string `json:"state_sync_interval"`
IPFSSyncInterval string `json:"ipfs_sync_interval"`
PinRecoverInterval string `json:"pin_recover_interval"`
ReplicationFactorMin int `json:"replication_factor_min"`
ReplicationFactorMax int `json:"replication_factor_max"`
MonitorPingInterval string `json:"monitor_ping_interval"`
@ -231,6 +237,10 @@ func (cfg *Config) Validate() error {
return errors.New("cluster.ipfs_sync_interval is invalid")
}
if cfg.PinRecoverInterval <= 0 {
return errors.New("cluster.pin_recover_interval is invalid")
}
if cfg.MonitorPingInterval <= 0 {
return errors.New("cluster.monitoring_interval is invalid")
}
@ -319,6 +329,7 @@ func (cfg *Config) setDefaults() {
cfg.LeaveOnShutdown = DefaultLeaveOnShutdown
cfg.StateSyncInterval = DefaultStateSyncInterval
cfg.IPFSSyncInterval = DefaultIPFSSyncInterval
cfg.PinRecoverInterval = DefaultPinRecoverInterval
cfg.ReplicationFactorMin = DefaultReplicationFactor
cfg.ReplicationFactorMax = DefaultReplicationFactor
cfg.MonitorPingInterval = DefaultMonitorPingInterval
@ -384,6 +395,7 @@ func (cfg *Config) applyConfigJSON(jcfg *configJSON) error {
err = config.ParseDurations("cluster",
&config.DurationOpt{Duration: jcfg.StateSyncInterval, Dst: &cfg.StateSyncInterval, Name: "state_sync_interval"},
&config.DurationOpt{Duration: jcfg.IPFSSyncInterval, Dst: &cfg.IPFSSyncInterval, Name: "ipfs_sync_interval"},
&config.DurationOpt{Duration: jcfg.PinRecoverInterval, Dst: &cfg.PinRecoverInterval, Name: "pin_recover_interval"},
&config.DurationOpt{Duration: jcfg.MonitorPingInterval, Dst: &cfg.MonitorPingInterval, Name: "monitor_ping_interval"},
&config.DurationOpt{Duration: jcfg.PeerWatchInterval, Dst: &cfg.PeerWatchInterval, Name: "peer_watch_interval"},
)
@ -432,6 +444,7 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) {
}
jcfg.StateSyncInterval = cfg.StateSyncInterval.String()
jcfg.IPFSSyncInterval = cfg.IPFSSyncInterval.String()
jcfg.PinRecoverInterval = cfg.PinRecoverInterval.String()
jcfg.MonitorPingInterval = cfg.MonitorPingInterval.String()
jcfg.PeerWatchInterval = cfg.PeerWatchInterval.String()
jcfg.DisableRepinning = cfg.DisableRepinning

View File

@ -20,6 +20,7 @@ var ccfgTestJSON = []byte(`
"listen_multiaddress": "/ip4/127.0.0.1/tcp/10000",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"pin_recover_interval": "1m",
"replication_factor_min": 5,
"replication_factor_max": 5,
"monitor_ping_interval": "2s",
@ -66,6 +67,13 @@ func TestLoadJSON(t *testing.T) {
}
})
t.Run("expected pin_recover_interval", func(t *testing.T) {
cfg := loadJSON(t)
if cfg.PinRecoverInterval != time.Minute {
t.Error("expected pin_recover_interval of 1m")
}
})
t.Run("expected connection_manager", func(t *testing.T) {
cfg := loadJSON(t)
if cfg.ConnMgr.LowWater != 500 {
@ -246,4 +254,10 @@ func TestValidate(t *testing.T) {
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
cfg.Default()
cfg.PinRecoverInterval = 0
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
}