cluster: new dial_peer_timeout option (3 seconds)

This commit allows setting the default libp2p DialPeerTimeout.

By default it is one minute, which means that a broadcast operation to a
non-responsive peer may take that long, blocking until the dial finally
fails.

Fixes #1350.
This commit is contained in:
Hector Sanjuan 2021-05-03 17:16:35 +02:00
parent adff9a744e
commit 8cd9e7928d
3 changed files with 24 additions and 5 deletions

View File

@ -42,6 +42,7 @@ const (
DefaultConnMgrHighWater = 400
DefaultConnMgrLowWater = 100
DefaultConnMgrGracePeriod = 2 * time.Minute
DefaultDialPeerTimeout = 3 * time.Second
DefaultFollowerMode = false
DefaultMDNSInterval = 10 * time.Second
)
@ -88,6 +89,10 @@ type Config struct {
// FIXME: This only applies to ipfs-cluster-service.
ConnMgr ConnMgrConfig
// Sets the default dial timeout for libp2p connections to other
// peers.
DialPeerTimeout time.Duration
// Time between syncs of the consensus state to the
// tracker state. Normally states are synced anyway, but this helps
// when new nodes are joining the cluster. Reduce for faster
@ -168,6 +173,7 @@ type configJSON struct {
ListenMultiaddress ipfsconfig.Strings `json:"listen_multiaddress"`
EnableRelayHop bool `json:"enable_relay_hop"`
ConnectionManager *connMgrConfigJSON `json:"connection_manager"`
DialPeerTimeout string `json:"dial_peer_timeout"`
StateSyncInterval string `json:"state_sync_interval"`
PinRecoverInterval string `json:"pin_recover_interval"`
ReplicationFactorMin int `json:"replication_factor_min"`
@ -256,6 +262,10 @@ func (cfg *Config) Validate() error {
return errors.New("cluster.connection_manager.grace_period is invalid")
}
if cfg.DialPeerTimeout <= 0 {
return errors.New("cluster.dial_peer_timeout is invalid")
}
if cfg.StateSyncInterval <= 0 {
return errors.New("cluster.state_sync_interval is invalid")
}
@ -354,6 +364,7 @@ func (cfg *Config) setDefaults() {
LowWater: DefaultConnMgrLowWater,
GracePeriod: DefaultConnMgrGracePeriod,
}
cfg.DialPeerTimeout = DefaultDialPeerTimeout
cfg.LeaveOnShutdown = DefaultLeaveOnShutdown
cfg.StateSyncInterval = DefaultStateSyncInterval
cfg.PinRecoverInterval = DefaultPinRecoverInterval
@ -428,6 +439,7 @@ func (cfg *Config) applyConfigJSON(jcfg *configJSON) error {
config.SetIfNotDefault(rplMax, &cfg.ReplicationFactorMax)
err = config.ParseDurations("cluster",
&config.DurationOpt{Duration: jcfg.DialPeerTimeout, Dst: &cfg.DialPeerTimeout, Name: "dial_peer_timeout"},
&config.DurationOpt{Duration: jcfg.StateSyncInterval, Dst: &cfg.StateSyncInterval, Name: "state_sync_interval"},
&config.DurationOpt{Duration: jcfg.PinRecoverInterval, Dst: &cfg.PinRecoverInterval, Name: "pin_recover_interval"},
&config.DurationOpt{Duration: jcfg.MonitorPingInterval, Dst: &cfg.MonitorPingInterval, Name: "monitor_ping_interval"},
@ -494,6 +506,7 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) {
LowWater: cfg.ConnMgr.LowWater,
GracePeriod: cfg.ConnMgr.GracePeriod.String(),
}
jcfg.DialPeerTimeout = cfg.DialPeerTimeout.String()
jcfg.StateSyncInterval = cfg.StateSyncInterval.String()
jcfg.PinRecoverInterval = cfg.PinRecoverInterval.String()
jcfg.MonitorPingInterval = cfg.MonitorPingInterval.String()

View File

@ -4,15 +4,16 @@ import (
"context"
"encoding/hex"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
datastore "github.com/ipfs/go-datastore"
namespace "github.com/ipfs/go-datastore/namespace"
ipns "github.com/ipfs/go-ipns"
"github.com/ipfs/ipfs-cluster/config"
config "github.com/ipfs/ipfs-cluster/config"
libp2p "github.com/libp2p/go-libp2p"
relay "github.com/libp2p/go-libp2p-circuit"
connmgr "github.com/libp2p/go-libp2p-connmgr"
crypto "github.com/libp2p/go-libp2p-core/crypto"
host "github.com/libp2p/go-libp2p-core/host"
network "github.com/libp2p/go-libp2p-core/network"
corepnet "github.com/libp2p/go-libp2p-core/pnet"
routing "github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
@ -52,6 +53,12 @@ func NewClusterHost(
ds datastore.Datastore,
) (host.Host, *pubsub.PubSub, *dual.DHT, error) {
// Set the default dial timeout for all libp2p connections. It is not
// very good to touch this global variable here, but the alternative
// is to use a modified context everywhere, even when the user supplies
// it.
network.DialPeerTimeout = cfg.DialPeerTimeout
connman := connmgr.NewConnManager(cfg.ConnMgr.LowWater, cfg.ConnMgr.HighWater, cfg.ConnMgr.GracePeriod)
relayOpts := []relay.RelayOpt{}

View File

@ -104,8 +104,7 @@ func (pm *Manager) ImportPeer(addr ma.Multiaddr, connect bool, ttl time.Duration
if connect {
go func() {
ctx, cancel := context.WithTimeout(pm.ctx, ConnectTimeout)
defer cancel()
ctx := net.WithDialPeerTimeout(pm.ctx, ConnectTimeout)
pm.host.Connect(ctx, *pinfo)
}()
}