Fix #88: Run SyncAllLocal() regularly on peers.

It makes a pin ls requests to ipfs and makes sure the pin tracker is
up to date.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
Hector Sanjuan 2017-04-05 23:29:22 +02:00
parent 4ae6486e26
commit 856252e5c6
3 changed files with 31 additions and 6 deletions

View File

@ -179,14 +179,22 @@ func (c *Cluster) setupRPCClients() {
c.informer.SetClient(c.rpcClient)
}
// stateSyncWatcher loops and triggers StateSync from time to time
func (c *Cluster) stateSyncWatcher() {
// syncWatcher loops and triggers StateSync and SyncAllLocal from time to time
func (c *Cluster) syncWatcher() {
stateSyncTicker := time.NewTicker(
time.Duration(c.config.StateSyncSeconds) * time.Second)
syncTicker := time.NewTicker(
time.Duration(c.config.IPFSSyncSeconds) * time.Second)
for {
select {
case <-stateSyncTicker.C:
logger.Debug("auto-triggering StateSync()")
c.StateSync()
case <-syncTicker.C:
logger.Debug("auto-triggering SyncAllLocal()")
c.SyncAllLocal()
case <-c.ctx.Done():
stateSyncTicker.Stop()
return
@ -318,7 +326,7 @@ func (c *Cluster) repinFromPeer(p peer.ID) {
// run provides a cancellable context and launches some goroutines
// before signaling readyCh
func (c *Cluster) run() {
go c.stateSyncWatcher()
go c.syncWatcher()
go c.pushPingMetrics()
go c.pushInformerMetrics()
go c.alertsHandler()

View File

@ -21,6 +21,7 @@ const (
DefaultIPFSNodeAddr = "/ip4/127.0.0.1/tcp/5001"
DefaultClusterAddr = "/ip4/0.0.0.0/tcp/9096"
DefaultStateSyncSeconds = 60
DefaultIPFSSyncSeconds = 130
DefaultMonitoringIntervalSeconds = 15
)
@ -67,9 +68,12 @@ type Config struct {
// the Consensus component.
ConsensusDataFolder string
// Number of seconds between StateSync() operations
// Number of seconds between automatic calls to StateSync().
StateSyncSeconds int
// Number of seconds between automatic calls to SyncAllLocal().
IPFSSyncSeconds int
// ReplicationFactor is the number of copies we keep for each pin
ReplicationFactor int
@ -140,6 +144,12 @@ type JSONConfig struct {
// when new nodes are joining the cluster
StateSyncSeconds int `json:"state_sync_seconds"`
// Number of seconds between syncs of the local state and
// the state of the ipfs daemon. This ensures that cluster
// provides the right status for tracked items (for example
// to detect that a pin has been removed.
IPFSSyncSeconds int `json:"ipfs_sync_seconds"`
// ReplicationFactor indicates the number of nodes that must pin content.
// For exampe, a replication_factor of 2 will prompt cluster to choose
// two nodes for each pinned hash. A replication_factor -1 will
@ -193,6 +203,7 @@ func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) {
IPFSNodeMultiaddress: cfg.IPFSNodeAddr.String(),
ConsensusDataFolder: cfg.ConsensusDataFolder,
StateSyncSeconds: cfg.StateSyncSeconds,
IPFSSyncSeconds: cfg.IPFSSyncSeconds,
ReplicationFactor: cfg.ReplicationFactor,
MonitoringIntervalSeconds: cfg.MonitoringIntervalSeconds,
AllocationStrategy: cfg.AllocationStrategy,
@ -273,6 +284,10 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) {
jcfg.StateSyncSeconds = DefaultStateSyncSeconds
}
if jcfg.IPFSSyncSeconds <= 0 {
jcfg.IPFSSyncSeconds = DefaultIPFSSyncSeconds
}
if jcfg.MonitoringIntervalSeconds <= 0 {
jcfg.MonitoringIntervalSeconds = DefaultMonitoringIntervalSeconds
}
@ -293,6 +308,7 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) {
IPFSNodeAddr: ipfsNodeAddr,
ConsensusDataFolder: jcfg.ConsensusDataFolder,
StateSyncSeconds: jcfg.StateSyncSeconds,
IPFSSyncSeconds: jcfg.IPFSSyncSeconds,
ReplicationFactor: jcfg.ReplicationFactor,
MonitoringIntervalSeconds: jcfg.MonitoringIntervalSeconds,
AllocationStrategy: jcfg.AllocationStrategy,
@ -409,6 +425,7 @@ func NewDefaultConfig() (*Config, error) {
IPFSNodeAddr: ipfsNodeAddr,
ConsensusDataFolder: "ipfscluster-data",
StateSyncSeconds: DefaultStateSyncSeconds,
IPFSSyncSeconds: DefaultIPFSSyncSeconds,
ReplicationFactor: -1,
MonitoringIntervalSeconds: DefaultMonitoringIntervalSeconds,
AllocationStrategy: "reposize",

View File

@ -22,8 +22,8 @@ var logger = logging.Logger("pintracker")
// if the Cid has stayed in Pinning or Unpinning state
// for longer than these values.
var (
PinningTimeout = 15 * time.Minute
UnpinningTimeout = 10 * time.Second
PinningTimeout = 60 * time.Minute
UnpinningTimeout = 15 * time.Minute
)
// PinQueueSize specifies the maximum amount of pin operations waiting