MapPinTracker: support configuration section

This also generates a default configuration section when it
doesn't exist, so it's backwards compatible.

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
Hector Sanjuan 2017-11-29 14:32:26 +01:00
parent 39fb193eaf
commit 11a8926236
10 changed files with 212 additions and 39 deletions

View File

@ -85,12 +85,12 @@ func (ipfs *mockConnector) FreeSpace() (uint64, error) { retu
func (ipfs *mockConnector) RepoSize() (uint64, error) { return 0, nil }
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate.MapState, *maptracker.MapPinTracker) {
clusterCfg, _, _, consensusCfg, monCfg, _ := testingConfigs()
clusterCfg, _, _, consensusCfg, trackerCfg, monCfg, _ := testingConfigs()
api := &mockAPI{}
ipfs := &mockConnector{}
st := mapstate.NewMapState()
tracker := maptracker.NewMapPinTracker(clusterCfg.ID)
tracker := maptracker.NewMapPinTracker(trackerCfg, clusterCfg.ID)
monCfg.CheckInterval = 2 * time.Second
mon, _ := basic.NewMonitor(monCfg)
alloc := ascendalloc.NewAllocator()

View File

@ -283,6 +283,10 @@ func (cfg *Manager) LoadJSON(bs []byte) error {
return err
}
logger.Debugf("%s section configuration loaded", name)
} else {
logger.Warningf("%s section is empty, generating default", name)
component.SetBaseDir(dir)
component.Default()
}
}
return nil

View File

@ -6,6 +6,7 @@ import (
"github.com/ipfs/ipfs-cluster/informer/disk"
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/basic"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
)
var testingClusterSecret, _ = DecodeClusterSecret("2588b80d5cb05374fa142aed6cbb047d1f4ef8ef15e37eba68c65b9d30df67ed")
@ -59,6 +60,14 @@ var testingIpfsCfg = []byte(`{
"proxy_idle_timeout": "1m0s"
}`)
var testingTrackerCfg = []byte(`
{
"pinning_timeout": "30s",
"unpinning_timeout": "15s",
"max_pin_queue_size": 4092
}
`)
var testingMonCfg = []byte(`{
"check_interval": "2s"
}`)
@ -68,26 +77,28 @@ var testingDiskInfCfg = []byte(`{
"metric_type": "freespace"
}`)
func testingConfigs() (*Config, *rest.Config, *ipfshttp.Config, *raft.Config, *basic.Config, *disk.Config) {
clusterCfg, apiCfg, ipfsCfg, consensusCfg, monCfg, diskInfCfg := testingEmptyConfigs()
func testingConfigs() (*Config, *rest.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *basic.Config, *disk.Config) {
clusterCfg, apiCfg, ipfsCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg := testingEmptyConfigs()
clusterCfg.LoadJSON(testingClusterCfg)
apiCfg.LoadJSON(testingAPICfg)
ipfsCfg.LoadJSON(testingIpfsCfg)
consensusCfg.LoadJSON(testingRaftCfg)
trackerCfg.LoadJSON(testingTrackerCfg)
monCfg.LoadJSON(testingMonCfg)
diskInfCfg.LoadJSON(testingDiskInfCfg)
return clusterCfg, apiCfg, ipfsCfg, consensusCfg, monCfg, diskInfCfg
return clusterCfg, apiCfg, ipfsCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg
}
func testingEmptyConfigs() (*Config, *rest.Config, *ipfshttp.Config, *raft.Config, *basic.Config, *disk.Config) {
func testingEmptyConfigs() (*Config, *rest.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *basic.Config, *disk.Config) {
clusterCfg := &Config{}
apiCfg := &rest.Config{}
ipfshttpCfg := &ipfshttp.Config{}
consensusCfg := &raft.Config{}
trackerCfg := &maptracker.Config{}
monCfg := &basic.Config{}
diskInfCfg := &disk.Config{}
return clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, monCfg, diskInfCfg
return clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg
}
// func TestConfigDefault(t *testing.T) {

View File

@ -217,7 +217,7 @@ configuration.
},
Action: func(c *cli.Context) error {
userSecret, userSecretDefined := userProvidedSecret(c.Bool("custom-secret"))
cfg, clustercfg, _, _, _, _, _, _ := makeConfigs()
cfg, clustercfg, _, _, _, _, _, _, _ := makeConfigs()
defer cfg.Shutdown() // wait for saves
// Generate defaults for all registered components
@ -294,7 +294,7 @@ func run(c *cli.Context) error {
func daemon(c *cli.Context) error {
// Load all the configurations
cfg, clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, monCfg, diskInfCfg, numpinInfCfg := makeConfigs()
cfg, clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg, numpinInfCfg := makeConfigs()
// always wait for configuration to be saved
defer cfg.Shutdown()
@ -328,7 +328,7 @@ func daemon(c *cli.Context) error {
err = validateVersion(clusterCfg, consensusCfg)
checkErr("validating version", err)
tracker := maptracker.NewMapPinTracker(clusterCfg.ID)
tracker := maptracker.NewMapPinTracker(trackerCfg, clusterCfg.ID)
mon, err := basic.NewMonitor(monCfg)
checkErr("creating Monitor component", err)
informer, alloc := setupAllocation(c.String("alloc"), diskInfCfg, numpinInfCfg)
@ -453,12 +453,13 @@ func promptUser(msg string) string {
return scanner.Text()
}
func makeConfigs() (*config.Manager, *ipfscluster.Config, *rest.Config, *ipfshttp.Config, *raft.Config, *basic.Config, *disk.Config, *numpin.Config) {
func makeConfigs() (*config.Manager, *ipfscluster.Config, *rest.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *basic.Config, *disk.Config, *numpin.Config) {
cfg := config.NewManager()
clusterCfg := &ipfscluster.Config{}
apiCfg := &rest.Config{}
ipfshttpCfg := &ipfshttp.Config{}
consensusCfg := &raft.Config{}
trackerCfg := &maptracker.Config{}
monCfg := &basic.Config{}
diskInfCfg := &disk.Config{}
numpinInfCfg := &numpin.Config{}
@ -466,8 +467,9 @@ func makeConfigs() (*config.Manager, *ipfscluster.Config, *rest.Config, *ipfshtt
cfg.RegisterComponent(config.API, apiCfg)
cfg.RegisterComponent(config.IPFSConn, ipfshttpCfg)
cfg.RegisterComponent(config.Consensus, consensusCfg)
cfg.RegisterComponent(config.PinTracker, trackerCfg)
cfg.RegisterComponent(config.Monitor, monCfg)
cfg.RegisterComponent(config.Informer, diskInfCfg)
cfg.RegisterComponent(config.Informer, numpinInfCfg)
return cfg, clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, monCfg, diskInfCfg, numpinInfCfg
return cfg, clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg, numpinInfCfg
}

View File

@ -3,16 +3,15 @@ package main
import (
"errors"
"io/ioutil"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/state/mapstate"
)
func upgrade() error {
//Load configs
cfg, clusterCfg, _, _, consensusCfg, _, _, _ := makeConfigs()
//Load configs
cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs()
err := cfg.LoadJSONFromFile(configPath)
if err != nil {
return err
@ -44,7 +43,6 @@ func upgrade() error {
return nil
}
func validateVersion(cfg *ipfscluster.Config, cCfg *raft.Config) error {
state := mapstate.NewMapState()
r, snapExists, err := raft.LastStateRaw(cCfg)

View File

@ -72,7 +72,7 @@ func createComponents(t *testing.T, i int, clusterSecret []byte) (*Config, *raft
pid, err := peer.IDFromPublicKey(pub)
checkErr(t, err)
clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, monCfg, diskInfCfg := testingConfigs()
clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg := testingConfigs()
clusterCfg.ID = pid
clusterCfg.PrivateKey = priv
@ -89,7 +89,7 @@ func createComponents(t *testing.T, i int, clusterSecret []byte) (*Config, *raft
ipfs, err := ipfshttp.NewConnector(ipfshttpCfg)
checkErr(t, err)
state := mapstate.NewMapState()
tracker := maptracker.NewMapPinTracker(clusterCfg.ID)
tracker := maptracker.NewMapPinTracker(trackerCfg, clusterCfg.ID)
mon, err := basic.NewMonitor(monCfg)
checkErr(t, err)
alloc := descendalloc.NewAllocator()

View File

@ -0,0 +1,105 @@
package maptracker
import (
"encoding/json"
"errors"
"time"
"github.com/ipfs/ipfs-cluster/config"
)
const configKey = "maptracker"
// Default values for this Config.
const (
DefaultPinningTimeout = 60 * time.Minute
DefaultUnpinningTimeout = 5 * time.Minute
DefaultMaxPinQueueSize = 4096
)
// Config allows to initialize a Monitor and customize some parameters.
type Config struct {
config.Saver
// PinningTimeout specifies how long to wait before a pinning state becomes a pin error
PinningTimeout time.Duration
// UnpinningTimeout specifies how long to wait before an unpinning state becomes a pin error
UnpinningTimeout time.Duration
// MaxPinQueueSize specifies how many pin or unpin requests we can hold in the queue
// If higher, they will automatically marked with an error.
MaxPinQueueSize int
}
type jsonConfig struct {
PinningTimeout string `json:"pinning_timeout"`
UnpinningTimeout string `json:"unpinning_timeout"`
MaxPinQueueSize int `json:"max_pin_queue_size"`
}
// ConfigKey provides a human-friendly identifier for this type of Config.
func (cfg *Config) ConfigKey() string {
return configKey
}
// Default sets the fields of this Config to sensible values.
func (cfg *Config) Default() error {
cfg.PinningTimeout = DefaultPinningTimeout
cfg.UnpinningTimeout = DefaultUnpinningTimeout
cfg.MaxPinQueueSize = DefaultMaxPinQueueSize
return nil
}
// Validate checks that the fields of this Config have working values,
// at least in appearance.
func (cfg *Config) Validate() error {
if cfg.PinningTimeout <= 0 {
return errors.New("maptracker.pinning_timeout too low")
}
if cfg.UnpinningTimeout <= 0 {
return errors.New("maptracker.unpinning_timeout too low")
}
if cfg.MaxPinQueueSize <= 0 {
return errors.New("maptracker.max_pin_queue_size too low")
}
return nil
}
// LoadJSON sets the fields of this Config to the values defined by the JSON
// representation of it, as generated by ToJSON.
func (cfg *Config) LoadJSON(raw []byte) error {
jcfg := &jsonConfig{}
err := json.Unmarshal(raw, jcfg)
if err != nil {
logger.Error("Error unmarshaling basic monitor config")
return err
}
cfg.Default()
parseDuration := func(txt string) time.Duration {
d, _ := time.ParseDuration(txt)
if txt != "" && d == 0 {
logger.Warningf("%s is not a valid duration. Default will be used", txt)
}
return d
}
pinningTimeo := parseDuration(jcfg.PinningTimeout)
unpinningTimeo := parseDuration(jcfg.UnpinningTimeout)
config.SetIfNotDefault(pinningTimeo, &cfg.PinningTimeout)
config.SetIfNotDefault(unpinningTimeo, &cfg.UnpinningTimeout)
config.SetIfNotDefault(jcfg.MaxPinQueueSize, &cfg.MaxPinQueueSize)
return cfg.Validate()
}
// ToJSON generates a human-friendly JSON representation of this Config.
func (cfg *Config) ToJSON() ([]byte, error) {
jcfg := &jsonConfig{}
jcfg.PinningTimeout = cfg.PinningTimeout.String()
jcfg.UnpinningTimeout = cfg.UnpinningTimeout.String()
jcfg.MaxPinQueueSize = cfg.MaxPinQueueSize
return config.DefaultJSONMarshal(jcfg)
}

View File

@ -0,0 +1,62 @@
package maptracker
import (
"encoding/json"
"testing"
)
var cfgJSON = []byte(`
{
"pinning_timeout": "30s",
"unpinning_timeout": "15s",
"max_pin_queue_size": 4092
}
`)
func TestLoadJSON(t *testing.T) {
cfg := &Config{}
err := cfg.LoadJSON(cfgJSON)
if err != nil {
t.Fatal(err)
}
j := &jsonConfig{}
json.Unmarshal(cfgJSON, j)
j.PinningTimeout = "-10"
tst, _ := json.Marshal(j)
err = cfg.LoadJSON(tst)
if err != nil {
t.Error("did not expect an error")
}
if cfg.PinningTimeout != DefaultPinningTimeout {
t.Error("expected default pinning_timeout")
}
}
func TestToJSON(t *testing.T) {
cfg := &Config{}
cfg.LoadJSON(cfgJSON)
newjson, err := cfg.ToJSON()
if err != nil {
t.Fatal(err)
}
cfg = &Config{}
err = cfg.LoadJSON(newjson)
if err != nil {
t.Fatal(err)
}
}
func TestDefault(t *testing.T) {
cfg := &Config{}
cfg.Default()
if cfg.Validate() != nil {
t.Fatal("error validating")
}
cfg.UnpinningTimeout = 0
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
}

View File

@ -18,19 +18,6 @@ import (
var logger = logging.Logger("pintracker")
// A Pin or Unpin operation will be considered failed
// if the Cid has stayed in Pinning or Unpinning state
// for longer than these values.
var (
PinningTimeout = 60 * time.Minute
UnpinningTimeout = 15 * time.Minute
)
// PinQueueSize specifies the maximum amount of pin operations waiting
// to be performed. If the queue is full, pins/unpins will be set to
// pinError/unpinError.
var PinQueueSize = 1024
var (
errUnpinningTimeout = errors.New("unpinning operation is taking too long")
errPinningTimeout = errors.New("pinning operation is taking too long")
@ -43,6 +30,7 @@ var (
type MapPinTracker struct {
mux sync.RWMutex
status map[string]api.PinInfo
config *Config
ctx context.Context
cancel func()
@ -61,17 +49,18 @@ type MapPinTracker struct {
// NewMapPinTracker returns a new object which has been correcly
// initialized with the given configuration.
func NewMapPinTracker(pid peer.ID) *MapPinTracker {
func NewMapPinTracker(cfg *Config, pid peer.ID) *MapPinTracker {
ctx, cancel := context.WithCancel(context.Background())
mpt := &MapPinTracker{
ctx: ctx,
cancel: cancel,
status: make(map[string]api.PinInfo),
config: cfg,
rpcReady: make(chan struct{}, 1),
peerID: pid,
pinCh: make(chan api.Pin, PinQueueSize),
unpinCh: make(chan api.Pin, PinQueueSize),
pinCh: make(chan api.Pin, cfg.MaxPinQueueSize),
unpinCh: make(chan api.Pin, cfg.MaxPinQueueSize),
}
go mpt.pinWorker()
go mpt.unpinWorker()
@ -374,7 +363,7 @@ func (mpt *MapPinTracker) syncStatus(c *cid.Cid, ips api.IPFSPinStatus) api.PinI
case api.TrackerStatusPinning, api.TrackerStatusPinError:
mpt.set(c, api.TrackerStatusPinned)
case api.TrackerStatusUnpinning:
if time.Since(p.TS) > UnpinningTimeout {
if time.Since(p.TS) > mpt.config.UnpinningTimeout {
mpt.setError(c, errUnpinningTimeout)
}
case api.TrackerStatusUnpinned:
@ -388,7 +377,7 @@ func (mpt *MapPinTracker) syncStatus(c *cid.Cid, ips api.IPFSPinStatus) api.PinI
mpt.setError(c, errUnpinned)
case api.TrackerStatusPinError: // nothing, keep error as it was
case api.TrackerStatusPinning:
if time.Since(p.TS) > PinningTimeout {
if time.Since(p.TS) > mpt.config.PinningTimeout {
mpt.setError(c, errPinningTimeout)
}
case api.TrackerStatusUnpinning, api.TrackerStatusUnpinError:

View File

@ -12,7 +12,9 @@ import (
)
func testMapPinTracker(t *testing.T) *MapPinTracker {
mpt := NewMapPinTracker(test.TestPeerID1)
cfg := &Config{}
cfg.Default()
mpt := NewMapPinTracker(cfg, test.TestPeerID1)
mpt.SetClient(test.NewMockRPCClient(t))
return mpt
}