ipfs-cluster/peer_manager.go

package ipfscluster

import (
	"sync"
	"time"

	peer "github.com/libp2p/go-libp2p-peer"
	peerstore "github.com/libp2p/go-libp2p-peerstore"
	ma "github.com/multiformats/go-multiaddr"
)

// peerManager is our own local peerstore: it tracks the multiaddresses
// of the current cluster peers alongside the libp2p host's Peerstore.
type peerManager struct {
	cluster *Cluster
	ps      peerstore.Peerstore
	self    peer.ID

	peermap map[peer.ID]ma.Multiaddr
	m       sync.RWMutex
}
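
// newPeerManager creates a peerManager for the given Cluster, backed by
// the Cluster host's Peerstore, and initializes the local peer map.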
func newPeerManager(c *Cluster) *peerManager {
	pm := &peerManager{
		cluster: c,
		ps:      c.host.Peerstore(),
		self:    c.host.ID(),
	}
	pm.resetPeers()
	return pm
}
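
// addPeer registers a peer multiaddress (carrying the peer ID) both in
// the libp2p peerstore and in our local peer map. When save is true,
// the updated peer set is persisted to the cluster configuration.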
func (pm *peerManager) addPeer(addr ma.Multiaddr, save bool) error {
	logger.Debugf("adding peer %s", addr)
	pid, decapAddr, err := multiaddrSplit(addr)
	if err != nil {
		return err
	}
	pm.ps.AddAddr(pid, decapAddr, peerstore.PermanentAddrTTL)

	// Only log new peers when we are not starting the cluster
	// (rpcClient == nil means we are still starting): otherwise these
	// messages pollute the startup logs redundantly.
	if !pm.isPeer(pid) && pm.cluster.rpcClient != nil {
		logger.Infof("new peer: %s", addr.String())
	}

	pm.m.Lock()
	pm.peermap[pid] = addr
	pm.m.Unlock()

	if save {
		pm.savePeers()
	}
	logger.Debugf("peers after adding %s", pm.peersAddrs())
	return nil
}
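
// rmPeer removes a peer from the local peer map, optionally persisting
// the updated peer set. If the peer being removed is ourselves, the
// remaining peers are saved as bootstrap addresses and this peer shuts
// down.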
func (pm *peerManager) rmPeer(pid peer.ID, save bool) error {
	logger.Debugf("removing peer %s", pid.Pretty())

	// Seeing our own departure during bootstrap. Ignore that.
	if pid == pm.self && !pm.cluster.readyB {
		return nil
	}

	// Remove ourselves, unless:
	// - we are not ready yet (means we are bootstrapping)
	// - we have already been removed (means Shutdown() with the
	//   LeaveOnShutdown flag)
	if pid == pm.self && pm.cluster.readyB && !pm.cluster.removed {
		logger.Info("this peer has been removed and will shutdown")
		pm.cluster.removed = true

		// We are removing ourselves. Therefore we need to:
		// - convert cluster peers to bootstrapping peers
		// - shut ourselves down if we are not already in the process
		//
		// Note that, if we are here, we have already been
		// removed from the raft.

		// save peers as bootstrappers
		pm.cluster.config.Bootstrap = pm.peersAddrs()
		pm.resetPeers()
		pm.savePeers()

		time.Sleep(1 * time.Second)
		// should block and do nothing if already shutting down
		pm.cluster.Shutdown()
		return nil
	}

	// Removing a different peer.
	if pm.isPeer(pid) {
		logger.Infof("removing Cluster peer %s", pid.Pretty())
	}

	pm.m.Lock()
	delete(pm.peermap, pid)
	pm.m.Unlock()

	if save {
		pm.savePeers()
	}
	return nil
}
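
// savePeers copies the current peer addresses into the cluster
// configuration and notifies it so that it gets persisted.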
func (pm *peerManager) savePeers() {
	peers := pm.peersAddrs()
	logger.Debugf("saving peers: %s", peers)
	pm.cluster.config.Peers = peers
	pm.cluster.config.NotifySave()
}
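
// resetPeers empties the peer map, leaving only ourselves in it.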
func (pm *peerManager) resetPeers() {
	pm.m.Lock()
	pm.peermap = make(map[peer.ID]ma.Multiaddr)
	pm.peermap[pm.self] = pm.cluster.config.ListenAddr
	pm.m.Unlock()
}
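
// isPeer returns true when the given peer ID is ourselves or belongs to
// a known cluster peer.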
func (pm *peerManager) isPeer(p peer.ID) bool {
	if p == pm.self {
		return true
	}

	pm.m.RLock()
	_, ok := pm.peermap[p]
	pm.m.RUnlock()
	return ok
}

// peers returns the IDs of all cluster peers, including ourselves.
func (pm *peerManager) peers() []peer.ID {
	pm.m.RLock()
	defer pm.m.RUnlock()
	var peers []peer.ID
	for k := range pm.peermap {
		peers = append(peers, k)
	}
	return peers
}

// peersAddrs returns the multiaddresses of the cluster peers, NOT
// including ourselves.
func (pm *peerManager) peersAddrs() []ma.Multiaddr {
	pm.m.RLock()
	defer pm.m.RUnlock()
	addrs := []ma.Multiaddr{}
	for k, addr := range pm.peermap {
		if k != pm.self {
			addrs = append(addrs, addr)
		}
	}
	return addrs
}

// func (pm *peerManager) addFromConfig(cfg *Config) error {
// 	return pm.setFromMultiaddrs(cfg.ClusterPeers)
// }

// setFromMultiaddrs replaces the current set of peers with the peers in
// the given multiaddresses. Note that this resets the peer map!
func (pm *peerManager) setFromMultiaddrs(addrs []ma.Multiaddr, save bool) error {
	pm.resetPeers()
	for _, m := range addrs {
		err := pm.addPeer(m, false)
		if err != nil {
			logger.Error(err)
			return err
		}
	}

	if save {
		pm.savePeers()
	}
	return nil
}