diff --git a/cluster.go b/cluster.go
index c23c8142..7f5d5b9d 100644
--- a/cluster.go
+++ b/cluster.go
@@ -229,6 +229,8 @@ func (c *Cluster) syncWatcher() {
 
 	stateSyncTicker := time.NewTicker(c.config.StateSyncInterval)
 	syncTicker := time.NewTicker(c.config.IPFSSyncInterval)
+	// recoverTicker drives the periodic automatic "recover" run.
+	recoverTicker := time.NewTicker(c.config.PinRecoverInterval)
 
 	for {
 		select {
@@ -238,6 +240,13 @@ func (c *Cluster) syncWatcher() {
 		case <-syncTicker.C:
 			logger.Debug("auto-triggering SyncAllLocal()")
 			c.SyncAllLocal(ctx)
+		case <-recoverTicker.C:
+			logger.Debug("auto-triggering RecoverAllLocal()")
+			c.RecoverAllLocal(ctx)
 		case <-c.ctx.Done():
+			// Stop every ticker, not only the state-sync one, so
+			// that none of them is left running after the watcher exits.
 			stateSyncTicker.Stop()
+			syncTicker.Stop()
+			recoverTicker.Stop()
 			return
diff --git a/cluster_config.go b/cluster_config.go
index f1a4f70a..113b9658 100644
--- a/cluster_config.go
+++ b/cluster_config.go
@@ -26,6 +26,7 @@ const (
 	DefaultListenAddr          = "/ip4/0.0.0.0/tcp/9096"
 	DefaultStateSyncInterval   = 600 * time.Second
 	DefaultIPFSSyncInterval    = 130 * time.Second
+	DefaultPinRecoverInterval  = 1 * time.Hour
 	DefaultMonitorPingInterval = 15 * time.Second
 	DefaultPeerWatchInterval   = 5 * time.Second
 	DefaultReplicationFactor   = -1
@@ -82,7 +83,7 @@ type Config struct {
 	// consistency, increase with larger states.
 	StateSyncInterval time.Duration
 
-	// Number of seconds between syncs of the local state and
+	// Time between syncs of the local state and
 	// the state of the ipfs daemon. This ensures that cluster
 	// provides the right status for tracked items (for example
 	// to detect that a pin has been removed. Reduce for faster
@@ -90,6 +91,10 @@ type Config struct {
 	// large.
 	IPFSSyncInterval time.Duration
 
+	// Time between automatic runs of the "recover" operation,
+	// which retries pinning/unpinning items in error state.
+	PinRecoverInterval time.Duration
+
 	// ReplicationFactorMax indicates the target number of nodes
 	// that should pin content. For exampe, a replication_factor of
 	// 3 will have cluster allocate each pinned hash to 3 peers if
@@ -146,6 +151,7 @@ type configJSON struct {
 	ConnectionManager    *connMgrConfigJSON `json:"connection_manager"`
 	StateSyncInterval    string             `json:"state_sync_interval"`
 	IPFSSyncInterval     string             `json:"ipfs_sync_interval"`
+	PinRecoverInterval   string             `json:"pin_recover_interval"`
 	ReplicationFactorMin int                `json:"replication_factor_min"`
 	ReplicationFactorMax int                `json:"replication_factor_max"`
 	MonitorPingInterval  string             `json:"monitor_ping_interval"`
@@ -231,6 +237,10 @@ func (cfg *Config) Validate() error {
 		return errors.New("cluster.ipfs_sync_interval is invalid")
 	}
 
+	if cfg.PinRecoverInterval <= 0 {
+		return errors.New("cluster.pin_recover_interval is invalid")
+	}
+
 	if cfg.MonitorPingInterval <= 0 {
 		return errors.New("cluster.monitoring_interval is invalid")
 	}
@@ -319,6 +329,7 @@ func (cfg *Config) setDefaults() {
 	cfg.LeaveOnShutdown = DefaultLeaveOnShutdown
 	cfg.StateSyncInterval = DefaultStateSyncInterval
 	cfg.IPFSSyncInterval = DefaultIPFSSyncInterval
+	cfg.PinRecoverInterval = DefaultPinRecoverInterval
 	cfg.ReplicationFactorMin = DefaultReplicationFactor
 	cfg.ReplicationFactorMax = DefaultReplicationFactor
 	cfg.MonitorPingInterval = DefaultMonitorPingInterval
@@ -384,6 +395,7 @@ func (cfg *Config) applyConfigJSON(jcfg *configJSON) error {
 	err = config.ParseDurations("cluster",
 		&config.DurationOpt{Duration: jcfg.StateSyncInterval, Dst: &cfg.StateSyncInterval, Name: "state_sync_interval"},
 		&config.DurationOpt{Duration: jcfg.IPFSSyncInterval, Dst: &cfg.IPFSSyncInterval, Name: "ipfs_sync_interval"},
+		&config.DurationOpt{Duration: jcfg.PinRecoverInterval, Dst: &cfg.PinRecoverInterval, Name: "pin_recover_interval"},
 		&config.DurationOpt{Duration: jcfg.MonitorPingInterval, Dst: &cfg.MonitorPingInterval, Name: "monitor_ping_interval"},
 		&config.DurationOpt{Duration: jcfg.PeerWatchInterval, Dst: &cfg.PeerWatchInterval, Name: "peer_watch_interval"},
 	)
@@ -432,6 +444,7 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) {
 	}
 	jcfg.StateSyncInterval = cfg.StateSyncInterval.String()
 	jcfg.IPFSSyncInterval = cfg.IPFSSyncInterval.String()
+	jcfg.PinRecoverInterval = cfg.PinRecoverInterval.String()
 	jcfg.MonitorPingInterval = cfg.MonitorPingInterval.String()
 	jcfg.PeerWatchInterval = cfg.PeerWatchInterval.String()
 	jcfg.DisableRepinning = cfg.DisableRepinning
diff --git a/cluster_config_test.go b/cluster_config_test.go
index bbc5747b..98d2cca4 100644
--- a/cluster_config_test.go
+++ b/cluster_config_test.go
@@ -20,6 +20,7 @@ var ccfgTestJSON = []byte(`
         "listen_multiaddress": "/ip4/127.0.0.1/tcp/10000",
         "state_sync_interval": "1m0s",
         "ipfs_sync_interval": "2m10s",
+        "pin_recover_interval": "1m",
         "replication_factor_min": 5,
         "replication_factor_max": 5,
         "monitor_ping_interval": "2s",
@@ -66,6 +67,13 @@ func TestLoadJSON(t *testing.T) {
 		}
 	})
 
+	t.Run("expected pin_recover_interval", func(t *testing.T) {
+		cfg := loadJSON(t)
+		if cfg.PinRecoverInterval != time.Minute {
+			t.Error("expected pin_recover_interval of 1m")
+		}
+	})
+
 	t.Run("expected connection_manager", func(t *testing.T) {
 		cfg := loadJSON(t)
 		if cfg.ConnMgr.LowWater != 500 {
@@ -246,4 +254,10 @@ func TestValidate(t *testing.T) {
 	if cfg.Validate() == nil {
 		t.Fatal("expected error validating")
 	}
+
+	cfg.Default()
+	cfg.PinRecoverInterval = 0
+	if cfg.Validate() == nil {
+		t.Fatal("expected error validating")
+	}
 }