crdt: add RepairInterval option

This commit is contained in:
Hector Sanjuan 2022-02-01 23:29:40 +01:00
parent 881266a1a0
commit a64a1c2b6e
3 changed files with 26 additions and 1 deletions

View File

@ -26,6 +26,7 @@ var (
DefaultTrustedPeers = []peer.ID{}
DefaultTrustAll = true
DefaultBatchingMaxQueueSize = 50000
DefaultRepairInterval = time.Hour
)
// BatchingConfig configures parameters for batching multiple pins in a single
@ -78,6 +79,10 @@ type Config struct {
// All keys written to the datastore will be namespaced with this prefix
DatastoreNamespace string
// How often the underlying crdt store triggers a repair when the
// datastore is marked dirty.
RepairInterval time.Duration
// Tracing enables propagation of contexts across binary boundaries.
Tracing bool
}
@ -92,6 +97,7 @@ type jsonConfig struct {
ClusterName string `json:"cluster_name"`
TrustedPeers []string `json:"trusted_peers"`
Batching batchingConfigJSON `json:"batching"`
RepairInterval string `json:"repair_interval"`
RebroadcastInterval string `json:"rebroadcast_interval,omitempty"`
PeersetMetric string `json:"peerset_metric,omitempty"`
@ -120,6 +126,10 @@ func (cfg *Config) Validate() error {
if cfg.Batching.MaxQueueSize <= 0 {
return errors.New("crdt.batching.max_queue_size is invalid")
}
if cfg.RepairInterval < 0 {
return errors.New("crdt.repair_interval is invalid")
}
return nil
}
@ -165,6 +175,7 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
"crdt",
&config.DurationOpt{Duration: jcfg.RebroadcastInterval, Dst: &cfg.RebroadcastInterval, Name: "rebroadcast_interval"},
&config.DurationOpt{Duration: jcfg.Batching.MaxBatchAge, Dst: &cfg.Batching.MaxBatchAge, Name: "max_batch_age"},
&config.DurationOpt{Duration: jcfg.RepairInterval, Dst: &cfg.RepairInterval, Name: "repair_interval"},
)
return cfg.Validate()
}
@ -210,6 +221,8 @@ func (cfg *Config) toJSONConfig() *jsonConfig {
jcfg.RebroadcastInterval = cfg.RebroadcastInterval.String()
}
jcfg.RepairInterval = cfg.RepairInterval.String()
return jcfg
}
@ -226,6 +239,7 @@ func (cfg *Config) Default() error {
MaxBatchAge: 0,
MaxQueueSize: DefaultBatchingMaxQueueSize,
}
cfg.RepairInterval = DefaultRepairInterval
return nil
}

View File

@ -14,7 +14,8 @@ var cfgJSON = []byte(`
"max_batch_size": 30,
"max_batch_age": "5s",
"max_queue_size": 150
}
},
"repair_interval": "1m"
}
`)
@ -33,6 +34,9 @@ func TestLoadJSON(t *testing.T) {
cfg.Batching.MaxQueueSize != 150 {
t.Error("Batching options were not parsed correctly")
}
if cfg.RepairInterval != time.Minute {
t.Error("repair interval not set")
}
cfg = &Config{}
err = cfg.LoadJSON([]byte(`
@ -121,6 +125,12 @@ func TestDefault(t *testing.T) {
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
cfg.Default()
cfg.RepairInterval = -3
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
}
func TestApplyEnvVars(t *testing.T) {

View File

@ -201,6 +201,7 @@ func (css *Consensus) setup() {
opts.RebroadcastInterval = css.config.RebroadcastInterval
opts.DAGSyncerTimeout = 2 * time.Minute
opts.Logger = logger
opts.RepairInterval = css.config.RepairInterval
opts.PutHook = func(k ds.Key, v []byte) {
ctx, span := trace.StartSpan(css.ctx, "crdt/PutHook")
defer span.End()