Feat #1008: Add batching configuration options and parsing
This commit is contained in:
parent
70096efa1b
commit
75cf1b32c7
|
@ -25,8 +25,26 @@ var (
|
|||
DefaultRebroadcastInterval = time.Minute
|
||||
DefaultTrustedPeers = []peer.ID{}
|
||||
DefaultTrustAll = true
|
||||
DefaultBatchingMaxQueueSize = 1000
|
||||
)
|
||||
|
||||
// BatchingConfig configures parameters for batching multiple pins in a single
|
||||
// CRDT-put operation.
|
||||
//
|
||||
// MaxBatchSize will trigger a commit whenever the number of pins in the batch
|
||||
// reaches the limit.
|
||||
//
|
||||
// MaxBatchAge will trigger a commit when the oldest update in the batch
|
||||
// reaches it. Setting both values to 0 means batching is disabled.
|
||||
//
|
||||
// MaxQueueSize specifies how many items can be waiting to be batched before
|
||||
// the LogPin/Unpin operations block.
|
||||
type BatchingConfig struct {
|
||||
MaxBatchSize int
|
||||
MaxBatchAge time.Duration
|
||||
MaxQueueSize int
|
||||
}
|
||||
|
||||
// Config is the configuration object for Consensus.
|
||||
type Config struct {
|
||||
config.Saver
|
||||
|
@ -45,6 +63,10 @@ type Config struct {
|
|||
// for this peer that are forbidden for other peers.
|
||||
TrustedPeers []peer.ID
|
||||
|
||||
// Specifies whether to batch CRDT updates for increased
|
||||
// performance.
|
||||
Batching BatchingConfig
|
||||
|
||||
// The interval before re-announcing the current state
|
||||
// to the network when no activity is observed.
|
||||
RebroadcastInterval time.Duration
|
||||
|
@ -60,9 +82,16 @@ type Config struct {
|
|||
Tracing bool
|
||||
}
|
||||
|
||||
type batchingConfigJSON struct {
|
||||
MaxBatchSize int `json:"max_batch_size"`
|
||||
MaxBatchAge string `json:"max_batch_age"`
|
||||
MaxQueueSize int `json:"max_queue_size,omitempty"`
|
||||
}
|
||||
|
||||
type jsonConfig struct {
|
||||
ClusterName string `json:"cluster_name"`
|
||||
TrustedPeers []string `json:"trusted_peers"`
|
||||
Batching batchingConfigJSON `json:"batching"`
|
||||
RebroadcastInterval string `json:"rebroadcast_interval,omitempty"`
|
||||
|
||||
PeersetMetric string `json:"peerset_metric,omitempty"`
|
||||
|
@ -87,6 +116,10 @@ func (cfg *Config) Validate() error {
|
|||
if cfg.RebroadcastInterval <= 0 {
|
||||
return errors.New("crdt.rebroadcast_interval is invalid")
|
||||
}
|
||||
|
||||
if cfg.Batching.MaxQueueSize <= 0 {
|
||||
return errors.New("crdt.batching.max_queue_size is invalid")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -123,11 +156,15 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
|
|||
cfg.TrustedPeers = append(cfg.TrustedPeers, pid)
|
||||
}
|
||||
|
||||
cfg.Batching.MaxBatchSize = jcfg.Batching.MaxBatchSize
|
||||
|
||||
config.SetIfNotDefault(jcfg.Batching.MaxQueueSize, &cfg.Batching.MaxQueueSize)
|
||||
config.SetIfNotDefault(jcfg.PeersetMetric, &cfg.PeersetMetric)
|
||||
config.SetIfNotDefault(jcfg.DatastoreNamespace, &cfg.DatastoreNamespace)
|
||||
config.ParseDurations(
|
||||
"crdt",
|
||||
&config.DurationOpt{Duration: jcfg.RebroadcastInterval, Dst: &cfg.RebroadcastInterval, Name: "rebroadcast_interval"},
|
||||
&config.DurationOpt{Duration: jcfg.Batching.MaxBatchAge, Dst: &cfg.Batching.MaxBatchAge, Name: "max_batch_age"},
|
||||
)
|
||||
return cfg.Validate()
|
||||
}
|
||||
|
@ -152,6 +189,13 @@ func (cfg *Config) toJSONConfig() *jsonConfig {
|
|||
jcfg.TrustedPeers = api.PeersToStrings(cfg.TrustedPeers)
|
||||
}
|
||||
|
||||
jcfg.Batching.MaxBatchSize = cfg.Batching.MaxBatchSize
|
||||
jcfg.Batching.MaxBatchAge = cfg.Batching.MaxBatchAge.String()
|
||||
if cfg.Batching.MaxQueueSize != DefaultBatchingMaxQueueSize {
|
||||
jcfg.Batching.MaxQueueSize = cfg.Batching.MaxQueueSize
|
||||
// otherwise leave as 0/hidden
|
||||
}
|
||||
|
||||
if cfg.PeersetMetric != DefaultPeersetMetric {
|
||||
jcfg.PeersetMetric = cfg.PeersetMetric
|
||||
// otherwise leave empty/hidden
|
||||
|
@ -177,6 +221,11 @@ func (cfg *Config) Default() error {
|
|||
cfg.DatastoreNamespace = DefaultDatastoreNamespace
|
||||
cfg.TrustedPeers = DefaultTrustedPeers
|
||||
cfg.TrustAll = DefaultTrustAll
|
||||
cfg.Batching = BatchingConfig{
|
||||
MaxBatchSize: 0,
|
||||
MaxBatchAge: 0,
|
||||
MaxQueueSize: DefaultBatchingMaxQueueSize,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -197,3 +246,8 @@ func (cfg *Config) ApplyEnvVars() error {
|
|||
func (cfg *Config) ToDisplayJSON() ([]byte, error) {
|
||||
return config.DisplayJSON(cfg.toJSONConfig())
|
||||
}
|
||||
|
||||
func (cfg *Config) batchingEnabled() bool {
|
||||
return cfg.Batching.MaxBatchSize > 0 &&
|
||||
cfg.Batching.MaxBatchAge > 0
|
||||
}
|
||||
|
|
|
@ -3,12 +3,18 @@ package crdt
|
|||
import (
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var cfgJSON = []byte(`
|
||||
{
|
||||
"cluster_name": "test",
|
||||
"trusted_peers": ["QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6"]
|
||||
"trusted_peers": ["QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6"],
|
||||
"batching": {
|
||||
"max_batch_size": 30,
|
||||
"max_batch_age": "5s",
|
||||
"max_queue_size": 150
|
||||
}
|
||||
}
|
||||
`)
|
||||
|
||||
|
@ -22,6 +28,12 @@ func TestLoadJSON(t *testing.T) {
|
|||
t.Error("TrustAll should not be enabled when peers in trusted peers")
|
||||
}
|
||||
|
||||
if cfg.Batching.MaxBatchSize != 30 ||
|
||||
cfg.Batching.MaxBatchAge != 5*time.Second ||
|
||||
cfg.Batching.MaxQueueSize != 150 {
|
||||
t.Error("Batching options were not parsed correctly")
|
||||
}
|
||||
|
||||
cfg = &Config{}
|
||||
err = cfg.LoadJSON([]byte(`
|
||||
{
|
||||
|
@ -59,6 +71,10 @@ func TestLoadJSON(t *testing.T) {
|
|||
if !cfg.TrustAll {
|
||||
t.Error("expected TrustAll to be true")
|
||||
}
|
||||
|
||||
if cfg.Batching.MaxQueueSize != DefaultBatchingMaxQueueSize {
|
||||
t.Error("MaxQueueSize should be default when unset")
|
||||
}
|
||||
}
|
||||
|
||||
func TestToJSON(t *testing.T) {
|
||||
|
@ -99,15 +115,32 @@ func TestDefault(t *testing.T) {
|
|||
if cfg.Validate() == nil {
|
||||
t.Fatal("expected error validating")
|
||||
}
|
||||
|
||||
cfg.Default()
|
||||
cfg.Batching.MaxQueueSize = -3
|
||||
if cfg.Validate() == nil {
|
||||
t.Fatal("expected error validating")
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyEnvVars(t *testing.T) {
|
||||
os.Setenv("CLUSTER_CRDT_CLUSTERNAME", "test2")
|
||||
os.Setenv("CLUSTER_CRDT_BATCHING_MAXBATCHSIZE", "5")
|
||||
os.Setenv("CLUSTER_CRDT_BATCHING_MAXBATCHAGE", "10s")
|
||||
|
||||
cfg := &Config{}
|
||||
cfg.Default()
|
||||
cfg.ApplyEnvVars()
|
||||
|
||||
if cfg.ClusterName != "test2" {
|
||||
t.Fatal("failed to override cluster_name with env var")
|
||||
t.Error("failed to override cluster_name with env var")
|
||||
}
|
||||
|
||||
if cfg.Batching.MaxBatchSize != 5 {
|
||||
t.Error("MaxBatchSize as env var does not work")
|
||||
}
|
||||
|
||||
if cfg.Batching.MaxBatchAge != 10*time.Second {
|
||||
t.Error("MaxBatchAge as env var does not work")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user