Feat: introduce a ConnectionManager for the libp2p host

As follow up to #787, this uses the default libp2p connection manager for the
cluster libp2p host. The connection manager settings can be set in the main
configuration section (but it should be compatible with previous
configurations which have it unset).

This PR is just introducing the connection manager. Peer connection
protection etc will come in additional PRs.
This commit is contained in:
Hector Sanjuan 2019-05-23 00:34:47 +02:00
parent e523215ee2
commit ba5e423f58
6 changed files with 135 additions and 31 deletions

View File

@ -32,8 +32,18 @@ const (
DefaultLeaveOnShutdown = false
DefaultDisableRepinning = false
DefaultPeerstoreFile = "peerstore"
DefaultConnMgrHighWater = 400
DefaultConnMgrLowWater = 100
DefaultConnMgrGracePeriod = 2 * time.Minute
)
// ConnMgrConfig configures the libp2p host connection manager.
type ConnMgrConfig struct {
HighWater int
LowWater int
GracePeriod time.Duration
}
// Config is the configuration object containing customizable variables to
// initialize the main ipfs-cluster component. It implements the
// config.ComponentConfig interface.
@ -62,6 +72,10 @@ type Config struct {
// the RPC and Consensus components.
ListenAddr ma.Multiaddr
// ConnMgr holds configuration values for the connection manager
// for the libp2p host.
ConnMgr ConnMgrConfig
// Time between syncs of the consensus state to the
// tracker state. Normally states are synced anyway, but this helps
// when new nodes are joining the cluster. Reduce for faster
@ -123,20 +137,28 @@ type Config struct {
// saved using JSON. Most configuration keys are converted into simple types
// like strings, and key names aim to be self-explanatory for the user.
type configJSON struct {
ID string `json:"id,omitempty"`
Peername string `json:"peername"`
PrivateKey string `json:"private_key,omitempty"`
Secret string `json:"secret"`
LeaveOnShutdown bool `json:"leave_on_shutdown"`
ListenMultiaddress string `json:"listen_multiaddress"`
StateSyncInterval string `json:"state_sync_interval"`
IPFSSyncInterval string `json:"ipfs_sync_interval"`
ReplicationFactorMin int `json:"replication_factor_min"`
ReplicationFactorMax int `json:"replication_factor_max"`
MonitorPingInterval string `json:"monitor_ping_interval"`
PeerWatchInterval string `json:"peer_watch_interval"`
DisableRepinning bool `json:"disable_repinning"`
PeerstoreFile string `json:"peerstore_file,omitempty"`
ID string `json:"id,omitempty"`
Peername string `json:"peername"`
PrivateKey string `json:"private_key,omitempty"`
Secret string `json:"secret"`
LeaveOnShutdown bool `json:"leave_on_shutdown"`
ListenMultiaddress string `json:"listen_multiaddress"`
ConnectionManager *connMgrConfigJSON `json:"connection_manager"`
StateSyncInterval string `json:"state_sync_interval"`
IPFSSyncInterval string `json:"ipfs_sync_interval"`
ReplicationFactorMin int `json:"replication_factor_min"`
ReplicationFactorMax int `json:"replication_factor_max"`
MonitorPingInterval string `json:"monitor_ping_interval"`
PeerWatchInterval string `json:"peer_watch_interval"`
DisableRepinning bool `json:"disable_repinning"`
PeerstoreFile string `json:"peerstore_file,omitempty"`
}
// connMgrConfigJSON configures the libp2p host connection manager.
type connMgrConfigJSON struct {
HighWater int `json:"high_water"`
LowWater int `json:"low_water"`
GracePeriod string `json:"grace_period"`
}
// ConfigKey returns a human-readable string to identify
@ -185,6 +207,22 @@ func (cfg *Config) Validate() error {
return errors.New("cluster.listen_multiaddress is undefined")
}
if cfg.ConnMgr.LowWater <= 0 {
return errors.New("cluster.connection_manager.low_water is invalid")
}
if cfg.ConnMgr.HighWater <= 0 {
return errors.New("cluster.connection_manager.high_water is invalid")
}
if cfg.ConnMgr.LowWater > cfg.ConnMgr.HighWater {
return errors.New("cluster.connection_manager.low_water is greater than high_water")
}
if cfg.ConnMgr.GracePeriod == 0 {
return errors.New("cluster.connection_manager.grace_period is invalid")
}
if cfg.StateSyncInterval <= 0 {
return errors.New("cluster.state_sync_interval is invalid")
}
@ -273,6 +311,11 @@ func (cfg *Config) setDefaults() {
addr, _ := ma.NewMultiaddr(DefaultListenAddr)
cfg.ListenAddr = addr
cfg.ConnMgr = ConnMgrConfig{
HighWater: DefaultConnMgrHighWater,
LowWater: DefaultConnMgrLowWater,
GracePeriod: DefaultConnMgrGracePeriod,
}
cfg.LeaveOnShutdown = DefaultLeaveOnShutdown
cfg.StateSyncInterval = DefaultStateSyncInterval
cfg.IPFSSyncInterval = DefaultIPFSSyncInterval
@ -320,6 +363,19 @@ func (cfg *Config) applyConfigJSON(jcfg *configJSON) error {
}
cfg.ListenAddr = clusterAddr
if conman := jcfg.ConnectionManager; conman != nil {
cfg.ConnMgr = ConnMgrConfig{
HighWater: jcfg.ConnectionManager.HighWater,
LowWater: jcfg.ConnectionManager.LowWater,
}
err = config.ParseDurations("cluster",
&config.DurationOpt{Duration: jcfg.ConnectionManager.GracePeriod, Dst: &cfg.ConnMgr.GracePeriod, Name: "connection_manager.grace_period"},
)
if err != nil {
return err
}
}
rplMin := jcfg.ReplicationFactorMin
rplMax := jcfg.ReplicationFactorMax
config.SetIfNotDefault(rplMin, &cfg.ReplicationFactorMin)
@ -369,6 +425,11 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) {
jcfg.ReplicationFactorMax = cfg.ReplicationFactorMax
jcfg.LeaveOnShutdown = cfg.LeaveOnShutdown
jcfg.ListenMultiaddress = cfg.ListenAddr.String()
jcfg.ConnectionManager = &connMgrConfigJSON{
HighWater: cfg.ConnMgr.HighWater,
LowWater: cfg.ConnMgr.LowWater,
GracePeriod: cfg.ConnMgr.GracePeriod.String(),
}
jcfg.StateSyncInterval = cfg.StateSyncInterval.String()
jcfg.IPFSSyncInterval = cfg.IPFSSyncInterval.String()
jcfg.MonitorPingInterval = cfg.MonitorPingInterval.String()

View File

@ -4,15 +4,19 @@ import (
"encoding/json"
"os"
"testing"
"time"
)
var ccfgTestJSON = []byte(`
{
"id": "QmUfSFm12eYCaRdypg48m8RqkXfLW7A2ZeGZb2skeHHDGA",
"peername": "testpeer",
"private_key": "CAASqAkwggSkAgEAAoIBAQDpT16IRF6bb9tHsCbQ7M+nb2aI8sz8xyt8PoAWM42ki+SNoESIxKb4UhFxixKvtEdGxNE6aUUVc8kFk6wTStJ/X3IGiMetwkXiFiUxabUF/8A6SyvnSVDm+wFuavugpVrZikjLcfrf2xOVgnG3deQQvd/qbAv14jTwMFl+T+8d/cXBo8Mn/leLZCQun/EJEnkXP5MjgNI8XcWUE4NnH3E0ESSm6Pkm8MhMDZ2fmzNgqEyJ0GVinNgSml3Pyha3PBSj5LRczLip/ie4QkKx5OHvX2L3sNv/JIUHse5HSbjZ1c/4oGCYMVTYCykWiczrxBUOlcr8RwnZLOm4n2bCt5ZhAgMBAAECggEAVkePwfzmr7zR7tTpxeGNeXHtDUAdJm3RWwUSASPXgb5qKyXVsm5nAPX4lXDE3E1i/nzSkzNS5PgIoxNVU10cMxZs6JW0okFx7oYaAwgAddN6lxQtjD7EuGaixN6zZ1k/G6vT98iS6i3uNCAlRZ9HVBmjsOF8GtYolZqLvfZ5izEVFlLVq/BCs7Y5OrDrbGmn3XupfitVWYExV0BrHpobDjsx2fYdTZkmPpSSvXNcm4Iq2AXVQzoqAfGo7+qsuLCZtVlyTfVKQjMvE2ffzN1dQunxixOvev/fz4WSjGnRpC6QLn6Oqps9+VxQKqKuXXqUJC+U45DuvA94Of9MvZfAAQKBgQD7xmXueXRBMr2+0WftybAV024ap0cXFrCAu+KWC1SUddCfkiV7e5w+kRJx6RH1cg4cyyCL8yhHZ99Z5V0Mxa/b/usuHMadXPyX5szVI7dOGgIC9q8IijN7B7GMFAXc8+qC7kivehJzjQghpRRAqvRzjDls4gmbNPhbH1jUiU124QKBgQDtOaW5/fOEtOq0yWbDLkLdjImct6oKMLhENL6yeIKjMYgifzHb2adk7rWG3qcMrdgaFtDVfqv8UmMEkzk7bSkovMVj3SkLzMz84ii1SkSfyaCXgt/UOzDkqAUYB0cXMppYA7jxHa2OY8oEHdBgmyJXdLdzJxCp851AoTlRUSePgQKBgQCQgKgUHOUaXnMEx88sbOuBO14gMg3dNIqM+Ejt8QbURmI8k3arzqA4UK8Tbb9+7b0nzXWanS5q/TT1tWyYXgW28DIuvxlHTA01aaP6WItmagrphIelERzG6f1+9ib/T4czKmvROvDIHROjq8lZ7ERs5Pg4g+sbh2VbdzxWj49EQQKBgFEna36ZVfmMOs7mJ3WWGeHY9ira2hzqVd9fe+1qNKbHhx7mDJR9fTqWPxuIh/Vac5dZPtAKqaOEO8OQ6f9edLou+ggT3LrgsS/B3tNGOPvA6mNqrk/Yf/15TWTO+I8DDLIXc+lokbsogC+wU1z5NWJd13RZZOX/JUi63vTmonYBAoGBAIpglLCH2sPXfmguO6p8QcQcv4RjAU1c0GP4P5PNN3Wzo0ItydVd2LHJb6MdmL6ypeiwNklzPFwTeRlKTPmVxJ+QPg1ct/3tAURN/D40GYw9ojDhqmdSl4HW4d6gHS2lYzSFeU5jkG49y5nirOOoEgHy95wghkh6BfpwHujYJGw4",
"secret": "2588b80d5cb05374fa142aed6cbb047d1f4ef8ef15e37eba68c65b9d30df67ed",
"leave_on_shutdown": true,
"connection_manager": {
"high_water": 501,
"low_water": 500,
"grace_period": "100m0s"
},
"listen_multiaddress": "/ip4/127.0.0.1/tcp/10000",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
@ -24,13 +28,13 @@ var ccfgTestJSON = []byte(`
`)
func TestLoadJSON(t *testing.T) {
loadJSON := func(t *testing.T) (*Config, error) {
loadJSON := func(t *testing.T) *Config {
cfg := &Config{}
err := cfg.LoadJSON(ccfgTestJSON)
if err != nil {
return cfg, err
t.Fatal(err)
}
return cfg, nil
return cfg
}
t.Run("basic", func(t *testing.T) {
@ -42,35 +46,39 @@ func TestLoadJSON(t *testing.T) {
})
t.Run("peername", func(t *testing.T) {
cfg, err := loadJSON(t)
if err != nil {
t.Error(err)
}
cfg := loadJSON(t)
if cfg.Peername != "testpeer" {
t.Error("expected peername 'testpeer'")
}
})
t.Run("expected replication factor", func(t *testing.T) {
cfg, err := loadJSON(t)
if err != nil {
t.Error(err)
}
cfg := loadJSON(t)
if cfg.ReplicationFactorMin != 5 {
t.Error("expected replication factor min == 5")
}
})
t.Run("expected disable_repinning", func(t *testing.T) {
cfg, err := loadJSON(t)
if err != nil {
t.Error(err)
}
cfg := loadJSON(t)
if !cfg.DisableRepinning {
t.Error("expected disable_repinning to be true")
}
})
t.Run("expected connection_manager", func(t *testing.T) {
cfg := loadJSON(t)
if cfg.ConnMgr.LowWater != 500 {
t.Error("expected low_water to be 500")
}
if cfg.ConnMgr.HighWater != 501 {
t.Error("expected high_water to be 501")
}
if cfg.ConnMgr.GracePeriod != 100*time.Minute {
t.Error("expected grace_period to be 100m")
}
})
loadJSON2 := func(t *testing.T, f func(j *configJSON)) (*Config, error) {
cfg := &Config{}
j := &configJSON{}
@ -162,6 +170,21 @@ func TestLoadJSON(t *testing.T) {
t.Error("expected default replication factors")
}
})
t.Run("conn manager default", func(t *testing.T) {
cfg, err := loadJSON2(
t,
func(j *configJSON) {
j.ConnectionManager = nil
},
)
if err != nil {
t.Fatal(err)
}
if cfg.ConnMgr.LowWater != DefaultConnMgrLowWater {
t.Error("default conn manager values not set")
}
})
}
func TestToJSON(t *testing.T) {
@ -217,4 +240,10 @@ func TestValidate(t *testing.T) {
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
cfg.Default()
cfg.ConnMgr.GracePeriod = 0
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
}

View File

@ -6,6 +6,7 @@ import (
"github.com/ipfs/ipfs-cluster/config"
libp2p "github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
ipnet "github.com/libp2p/go-libp2p-interface-pnet"
@ -25,12 +26,15 @@ func NewClusterHost(
cfg *Config,
) (host.Host, *pubsub.PubSub, *dht.IpfsDHT, error) {
connman := connmgr.NewConnManager(cfg.ConnMgr.LowWater, cfg.ConnMgr.HighWater, cfg.ConnMgr.GracePeriod)
h, err := newHost(
ctx,
cfg.Secret,
ident.PrivateKey,
libp2p.ListenAddrs(cfg.ListenAddr),
libp2p.NATPortMap(),
libp2p.ConnectionManager(connman),
)
if err != nil {
return nil, nil, nil, err

View File

@ -26,6 +26,11 @@ var testingClusterCfg = []byte(`{
"secret": "2588b80d5cb05374fa142aed6cbb047d1f4ef8ef15e37eba68c65b9d30df67ed",
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/127.0.0.1/tcp/10000",
"connection_manager": {
"high_water": 400,
"low_water": 200,
"grace_period": "2m0s"
},
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,

1
go.mod
View File

@ -41,6 +41,7 @@ require (
github.com/kelseyhightower/envconfig v1.3.0
github.com/lanzafame/go-libp2p-ocgorpc v0.0.3
github.com/libp2p/go-libp2p v0.0.25
github.com/libp2p/go-libp2p-connmgr v0.0.5
github.com/libp2p/go-libp2p-consensus v0.0.1
github.com/libp2p/go-libp2p-crypto v0.0.2
github.com/libp2p/go-libp2p-gorpc v0.0.3

4
go.sum
View File

@ -342,6 +342,8 @@ github.com/libp2p/go-libp2p-blankhost v0.0.1/go.mod h1:Ibpbw/7cPPYwFb7PACIWdvxxv
github.com/libp2p/go-libp2p-circuit v0.0.1/go.mod h1:Dqm0s/BiV63j8EEAs8hr1H5HudqvCAeXxDyic59lCwE=
github.com/libp2p/go-libp2p-circuit v0.0.6 h1:egD2CKFVdqnHgIHzPkM6J7m3MKZpFqoTPDfxBqQ7kRQ=
github.com/libp2p/go-libp2p-circuit v0.0.6/go.mod h1:W34ISBRpoCPUeOR26xzTbLo+s3hDO9153hJCfvHzBlg=
github.com/libp2p/go-libp2p-connmgr v0.0.5 h1:EzgFolZ1RHUiuRj22zZRcGu8TJuBkfjeuH9TazbcmP4=
github.com/libp2p/go-libp2p-connmgr v0.0.5/go.mod h1:uwDfgdgqB5248sQYib1xo603cSsMg9PgAKu0Z+Y65Qk=
github.com/libp2p/go-libp2p-consensus v0.0.1 h1:jcVbHRZLwTXU9iT/mPi+Lx4/OrIzq3bU1TbZNhYFCV8=
github.com/libp2p/go-libp2p-consensus v0.0.1/go.mod h1:+9Wrfhc5QOqWB0gXI0m6ARlkHfdJpcFXmRU0WoHz4Mo=
github.com/libp2p/go-libp2p-crypto v0.0.1 h1:JNQd8CmoGTohO/akqrH16ewsqZpci2CbgYH/LmYl8gw=
@ -364,6 +366,8 @@ github.com/libp2p/go-libp2p-interface-connmgr v0.0.1 h1:Q9EkNSLAOF+u90L88qmE9z/f
github.com/libp2p/go-libp2p-interface-connmgr v0.0.1/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.4 h1:/LngXETpII5qOD7YjAcQiIxhVtdAk/NQe5t9sC6BR0E=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.4/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.5 h1:KG/KNYL2tYzXAfMvQN5K1aAGTYSYUMJ1prgYa2/JI1E=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.5/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-interface-pnet v0.0.1 h1:7GnzRrBTJHEsofi1ahFdPN9Si6skwXQE9UqR2S+Pkh8=
github.com/libp2p/go-libp2p-interface-pnet v0.0.1/go.mod h1:el9jHpQAXK5dnTpKA4yfCNBZXvrzdOU75zz+C6ryp3k=
github.com/libp2p/go-libp2p-kad-dht v0.0.11 h1:3s6Me8i0vmuQM++HmVgRb9dC6y33/jmcxKPMExx7oJg=