2016-12-02 18:33:39 +00:00
package ipfscluster
import (
"context"
2017-02-13 15:46:53 +00:00
"errors"
2017-06-30 00:40:06 +00:00
"strings"
2016-12-15 13:07:19 +00:00
"sync"
2017-01-23 19:29:05 +00:00
"time"
2016-12-02 18:33:39 +00:00
2017-07-20 11:18:46 +00:00
pnet "github.com/libp2p/go-libp2p-pnet"
2017-02-08 17:04:08 +00:00
"github.com/ipfs/ipfs-cluster/api"
2017-03-10 16:24:25 +00:00
"github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/state"
2017-02-08 17:04:08 +00:00
2017-01-25 11:14:39 +00:00
rpc "github.com/hsanjuan/go-libp2p-gorpc"
2016-12-16 21:00:08 +00:00
cid "github.com/ipfs/go-cid"
2016-12-16 11:40:28 +00:00
host "github.com/libp2p/go-libp2p-host"
2017-06-30 00:40:06 +00:00
ipnet "github.com/libp2p/go-libp2p-interface-pnet"
2016-12-16 11:40:28 +00:00
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
2017-01-24 15:19:23 +00:00
ma "github.com/multiformats/go-multiaddr"
2016-12-02 18:33:39 +00:00
)
// Cluster is the main IPFS cluster component. It provides
2017-01-24 11:39:08 +00:00
// the go-API for it and orchestrates the components that make up the system.
2016-12-02 18:33:39 +00:00
type Cluster struct {
2017-02-09 15:29:17 +00:00
ctx context . Context
cancel func ( )
2016-12-02 18:33:39 +00:00
2017-02-02 22:52:06 +00:00
id peer . ID
2017-01-30 12:12:25 +00:00
config * Config
host host . Host
rpcServer * rpc . Server
rpcClient * rpc . Client
peerManager * peerManager
2016-12-02 18:33:39 +00:00
2017-03-14 15:37:29 +00:00
consensus Consensus
2016-12-15 18:08:46 +00:00
api API
2016-12-02 18:33:39 +00:00
ipfs IPFSConnector
2017-03-10 16:24:25 +00:00
state state . State
2016-12-06 21:29:59 +00:00
tracker PinTracker
2017-02-13 15:46:53 +00:00
monitor PeerMonitor
allocator PinAllocator
informer Informer
2016-12-15 13:07:19 +00:00
shutdownLock sync . Mutex
2017-10-31 10:20:14 +00:00
shutdownB bool
2017-11-01 12:25:28 +00:00
removed bool
2017-01-30 12:12:25 +00:00
doneCh chan struct { }
readyCh chan struct { }
2017-10-30 11:17:39 +00:00
readyB bool
2016-12-15 13:07:19 +00:00
wg sync . WaitGroup
2017-02-02 22:52:06 +00:00
2017-11-08 19:04:04 +00:00
// paMux sync.Mutex
2016-12-02 18:33:39 +00:00
}
2017-01-30 12:12:25 +00:00
// NewCluster builds a new IPFS Cluster peer. It initializes a LibP2P host,
// creates and RPC Server and client and sets up all components.
//
// The new cluster peer may still be performing initialization tasks when
// this call returns (consensus may still be bootstrapping). Use Cluster.Ready()
// if you need to wait until the peer is fully up.
2017-02-13 15:46:53 +00:00
func NewCluster (
cfg * Config ,
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
consensusCfg * raft . Config ,
2017-02-13 15:46:53 +00:00
api API ,
ipfs IPFSConnector ,
2017-03-10 16:24:25 +00:00
st state . State ,
2017-02-13 15:46:53 +00:00
tracker PinTracker ,
monitor PeerMonitor ,
allocator PinAllocator ,
informer Informer ) ( * Cluster , error ) {
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
err := cfg . Validate ( )
if err != nil {
return nil , err
}
2017-02-09 15:29:17 +00:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
2016-12-02 18:33:39 +00:00
host , err := makeHost ( ctx , cfg )
if err != nil {
2017-02-15 14:16:34 +00:00
cancel ( )
2016-12-02 18:33:39 +00:00
return nil , err
}
2017-12-12 16:47:21 +00:00
logger . Infof ( "IPFS Cluster v%s-%s listening on:" , Version , Commit [ 0 : 8 ] )
2017-01-30 12:12:25 +00:00
for _ , addr := range host . Addrs ( ) {
logger . Infof ( " %s/ipfs/%s" , addr , host . ID ( ) . Pretty ( ) )
2016-12-02 18:33:39 +00:00
}
2017-11-08 19:04:04 +00:00
peerManager := newPeerManager ( host )
peerManager . importAddresses ( cfg . Peers )
peerManager . importAddresses ( cfg . Bootstrap )
2017-02-02 22:52:06 +00:00
c := & Cluster {
2017-11-08 19:04:04 +00:00
ctx : ctx ,
cancel : cancel ,
id : host . ID ( ) ,
config : cfg ,
host : host ,
api : api ,
ipfs : ipfs ,
state : st ,
tracker : tracker ,
monitor : monitor ,
allocator : allocator ,
informer : informer ,
peerManager : peerManager ,
shutdownB : false ,
removed : false ,
doneCh : make ( chan struct { } ) ,
readyCh : make ( chan struct { } ) ,
readyB : false ,
}
2017-02-02 22:52:06 +00:00
err = c . setupRPC ( )
2016-12-23 18:35:37 +00:00
if err != nil {
2017-02-02 22:52:06 +00:00
c . Shutdown ( )
2016-12-23 18:35:37 +00:00
return nil , err
}
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
err = c . setupConsensus ( consensusCfg )
2017-01-30 12:12:25 +00:00
if err != nil {
2017-02-02 22:52:06 +00:00
c . Shutdown ( )
2017-01-30 12:12:25 +00:00
return nil , err
}
2017-02-02 22:52:06 +00:00
c . setupRPCClients ( )
2017-02-13 15:46:53 +00:00
ok := c . bootstrap ( )
if ! ok {
logger . Error ( "Bootstrap unsuccessful" )
c . Shutdown ( )
return nil , errors . New ( "bootstrap unsuccessful" )
}
go func ( ) {
c . ready ( )
c . run ( )
} ( )
2017-02-02 22:52:06 +00:00
return c , nil
}
2017-01-30 12:12:25 +00:00
2017-02-02 22:52:06 +00:00
func ( c * Cluster ) setupRPC ( ) error {
rpcServer := rpc . NewServer ( c . host , RPCProtocol )
2017-02-08 17:04:08 +00:00
err := rpcServer . RegisterName ( "Cluster" , & RPCAPI { c } )
2017-02-02 22:52:06 +00:00
if err != nil {
return err
}
c . rpcServer = rpcServer
rpcClient := rpc . NewClientWithServer ( c . host , RPCProtocol , rpcServer )
c . rpcClient = rpcClient
return nil
}
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
func ( c * Cluster ) setupConsensus ( consensuscfg * raft . Config ) error {
2017-02-02 22:52:06 +00:00
var startPeers [ ] peer . ID
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
if len ( c . config . Peers ) > 0 {
startPeers = peersFromMultiaddrs ( c . config . Peers )
2017-02-02 22:52:06 +00:00
} else {
2017-10-23 11:46:37 +00:00
// start as single cluster before being added
// to the bootstrapper peers' cluster.
startPeers = [ ] peer . ID { }
2017-02-02 22:52:06 +00:00
}
2017-03-10 16:24:25 +00:00
consensus , err := raft . NewConsensus (
2017-02-15 12:40:08 +00:00
append ( startPeers , c . id ) ,
2017-02-02 22:52:06 +00:00
c . host ,
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
consensuscfg ,
2017-02-02 22:52:06 +00:00
c . state )
2017-01-30 12:12:25 +00:00
if err != nil {
logger . Errorf ( "error creating consensus: %s" , err )
2017-02-02 22:52:06 +00:00
return err
2017-01-30 12:12:25 +00:00
}
2017-02-02 22:52:06 +00:00
c . consensus = consensus
return nil
}
2017-01-30 12:12:25 +00:00
2017-02-02 22:52:06 +00:00
func ( c * Cluster ) setupRPCClients ( ) {
c . tracker . SetClient ( c . rpcClient )
c . ipfs . SetClient ( c . rpcClient )
c . api . SetClient ( c . rpcClient )
c . consensus . SetClient ( c . rpcClient )
2017-02-13 15:46:53 +00:00
c . monitor . SetClient ( c . rpcClient )
c . allocator . SetClient ( c . rpcClient )
c . informer . SetClient ( c . rpcClient )
2017-02-02 22:52:06 +00:00
}
2016-12-09 19:54:46 +00:00
2017-04-05 21:29:22 +00:00
// syncWatcher loops and triggers StateSync and SyncAllLocal from time to time
func ( c * Cluster ) syncWatcher ( ) {
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
stateSyncTicker := time . NewTicker ( c . config . StateSyncInterval )
syncTicker := time . NewTicker ( c . config . IPFSSyncInterval )
2017-04-05 21:29:22 +00:00
2017-02-02 22:52:06 +00:00
for {
select {
case <- stateSyncTicker . C :
2017-04-05 21:29:22 +00:00
logger . Debug ( "auto-triggering StateSync()" )
2017-02-02 22:52:06 +00:00
c . StateSync ( )
2017-04-05 21:29:22 +00:00
case <- syncTicker . C :
logger . Debug ( "auto-triggering SyncAllLocal()" )
c . SyncAllLocal ( )
2017-02-02 22:52:06 +00:00
case <- c . ctx . Done ( ) :
stateSyncTicker . Stop ( )
return
}
}
}
2017-02-28 15:01:26 +00:00
func ( c * Cluster ) broadcastMetric ( m api . Metric ) error {
2017-11-08 19:04:04 +00:00
peers , err := c . consensus . Peers ( )
if err != nil {
logger . Error ( err )
return err
}
2017-03-13 14:55:52 +00:00
leader , err := c . consensus . Leader ( )
if err != nil {
return err
}
2017-11-08 19:04:04 +00:00
if m . Discard ( ) {
logger . Warningf ( "discarding invalid metric: %+v" , m )
return nil
}
2017-07-20 11:18:46 +00:00
// If a peer is down, the rpc call will get locked. Therefore,
// we need to do it async. This way we keep broadcasting
// even if someone is down. Eventually those requests will
// timeout in libp2p and the errors logged.
go func ( ) {
if leader == c . id {
// Leader needs to broadcast its metric to everyone
// in case it goes down (new leader will have to detect this node went down)
logger . Debugf ( "Leader %s about to broadcast metric %s to %s. Expires: %s" , c . id , m . Name , peers , m . Expire )
errs := c . multiRPC ( peers ,
"Cluster" ,
"PeerMonitorLogMetric" ,
m ,
copyEmptyStructToIfaces ( make ( [ ] struct { } , len ( peers ) , len ( peers ) ) ) )
for i , e := range errs {
if e != nil {
logger . Errorf ( "error pushing metric to %s: %s" , peers [ i ] . Pretty ( ) , e )
}
2017-03-13 14:55:52 +00:00
}
2017-07-20 11:18:46 +00:00
logger . Debugf ( "Leader %s broadcasted metric %s to %s. Expires: %s" , c . id , m . Name , peers , m . Expire )
} else {
// non-leaders just need to forward their metrics to the leader
logger . Debugf ( "Peer %s about to send metric %s to %s. Expires: %s" , c . id , m . Name , leader , m . Expire )
err := c . rpcClient . Call ( leader ,
"Cluster" , "PeerMonitorLogMetric" ,
m , & struct { } { } )
if err != nil {
logger . Error ( err )
}
logger . Debugf ( "Peer %s sent metric %s to %s. Expires: %s" , c . id , m . Name , leader , m . Expire )
2017-02-28 15:01:26 +00:00
2017-07-20 11:18:46 +00:00
}
} ( )
2017-02-28 15:01:26 +00:00
return nil
}
2017-02-13 15:46:53 +00:00
// push metrics loops and pushes metrics to the leader's monitor
func ( c * Cluster ) pushInformerMetrics ( ) {
timer := time . NewTimer ( 0 ) // fire immediately first
2017-10-23 11:46:37 +00:00
// The following control how often to make and log
// a retry
retries := 0
retryDelay := 500 * time . Millisecond
retryWarnMod := 60
2017-02-13 15:46:53 +00:00
for {
select {
case <- c . ctx . Done ( ) :
return
case <- timer . C :
// wait
}
2017-02-28 15:01:26 +00:00
metric := c . informer . GetMetric ( )
metric . Peer = c . id
err := c . broadcastMetric ( metric )
2017-02-13 15:46:53 +00:00
if err != nil {
2017-10-23 11:46:37 +00:00
if ( retries % retryWarnMod ) == 0 {
logger . Errorf ( "error broadcasting metric: %s" , err )
retries ++
}
// retry in retryDelay
timer . Reset ( retryDelay )
2017-02-13 15:46:53 +00:00
continue
}
2017-10-23 11:46:37 +00:00
retries = 0
// send metric again in TTL/2
2017-02-28 15:01:26 +00:00
timer . Reset ( metric . GetTTL ( ) / 2 )
}
}
2017-02-13 15:46:53 +00:00
2017-02-28 15:01:26 +00:00
func ( c * Cluster ) pushPingMetrics ( ) {
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
ticker := time . NewTicker ( c . config . MonitorPingInterval )
2017-02-28 15:01:26 +00:00
for {
metric := api . Metric {
Name : "ping" ,
Peer : c . id ,
Valid : true ,
2017-02-13 15:46:53 +00:00
}
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
metric . SetTTLDuration ( c . config . MonitorPingInterval * 2 )
2017-02-28 15:01:26 +00:00
c . broadcastMetric ( metric )
2017-02-13 15:46:53 +00:00
2017-02-28 15:01:26 +00:00
select {
case <- c . ctx . Done ( ) :
return
case <- ticker . C :
}
}
}
2017-02-13 15:46:53 +00:00
2017-02-28 15:01:26 +00:00
// read the alerts channel from the monitor and triggers repins
func ( c * Cluster ) alertsHandler ( ) {
for {
select {
case <- c . ctx . Done ( ) :
return
case alrt := <- c . monitor . Alerts ( ) :
2017-03-13 14:55:52 +00:00
// only the leader handles alerts
leader , err := c . consensus . Leader ( )
if err == nil && leader == c . id {
2017-07-20 11:18:46 +00:00
logger . Warningf ( "Peer %s received alert for %s in %s" , c . id , alrt . MetricName , alrt . Peer . Pretty ( ) )
2017-02-28 15:01:26 +00:00
switch alrt . MetricName {
case "ping" :
c . repinFromPeer ( alrt . Peer )
}
}
}
}
}
2017-11-08 19:04:04 +00:00
// detects any changes in the peerset and saves the configuration. When it
// detects that we have been removed from the peerset, it shuts down this peer.
func ( c * Cluster ) watchPeers ( ) {
// TODO: Config option?
ticker := time . NewTicker ( 5 * time . Second )
2017-11-15 16:52:27 +00:00
lastPeers := peersFromMultiaddrs ( c . config . Peers )
2017-11-08 19:04:04 +00:00
for {
select {
case <- c . ctx . Done ( ) :
return
case <- ticker . C :
logger . Debugf ( "%s watching peers" , c . id )
save := false
hasMe := false
peers , err := c . consensus . Peers ( )
if err != nil {
logger . Error ( err )
continue
}
for _ , p := range peers {
if p == c . id {
hasMe = true
break
}
}
if len ( peers ) != len ( lastPeers ) {
save = true
} else {
2017-11-14 22:54:23 +00:00
added , removed := diffPeers ( lastPeers , peers )
if len ( added ) != 0 || len ( removed ) != 0 {
save = true
2017-11-08 19:04:04 +00:00
}
}
lastPeers = peers
if ! hasMe {
2017-11-15 01:33:46 +00:00
logger . Infof ( "%s: removed from raft. Initiating shutdown" , c . id . Pretty ( ) )
2017-11-08 19:04:04 +00:00
c . removed = true
c . config . Bootstrap = c . peerManager . addresses ( peers )
c . config . savePeers ( [ ] ma . Multiaddr { } )
go c . Shutdown ( )
return
}
if save {
logger . Info ( "peerset change detected" )
c . config . savePeers ( c . peerManager . addresses ( peers ) )
}
}
}
}
2017-07-03 15:45:22 +00:00
// find all Cids pinned to a given peer and triggers re-pins on them.
2017-02-28 15:01:26 +00:00
func ( c * Cluster ) repinFromPeer ( p peer . ID ) {
cState , err := c . consensus . State ( )
if err != nil {
logger . Warning ( err )
return
}
list := cState . List ( )
2017-03-08 17:28:43 +00:00
for _ , pin := range list {
2017-07-05 14:28:52 +00:00
if containsPeer ( pin . Allocations , p ) {
logger . Infof ( "repinning %s out of %s" , pin . Cid , p . Pretty ( ) )
c . pin ( pin , [ ] peer . ID { p } ) // pin blacklisting this peer
2017-02-28 15:01:26 +00:00
}
2017-02-13 15:46:53 +00:00
}
}
2017-11-08 19:04:04 +00:00
// run launches some go-routines which live throughout the cluster's life
2017-02-02 22:52:06 +00:00
func ( c * Cluster ) run ( ) {
2017-04-05 21:29:22 +00:00
go c . syncWatcher ( )
2017-02-28 15:01:26 +00:00
go c . pushPingMetrics ( )
2017-02-13 15:46:53 +00:00
go c . pushInformerMetrics ( )
2017-11-08 19:04:04 +00:00
go c . watchPeers ( )
2017-02-28 15:01:26 +00:00
go c . alertsHandler ( )
2017-02-02 22:52:06 +00:00
}
2017-02-13 15:46:53 +00:00
func ( c * Cluster ) ready ( ) {
2017-02-02 22:52:06 +00:00
// We bootstrapped first because with dirty state consensus
// may have a peerset and not find a leader so we cannot wait
// for it.
timer := time . NewTimer ( 30 * time . Second )
select {
case <- timer . C :
logger . Error ( "consensus start timed out" )
c . Shutdown ( )
return
case <- c . consensus . Ready ( ) :
case <- c . ctx . Done ( ) :
return
}
// Cluster is ready.
2017-11-08 19:04:04 +00:00
peers , err := c . consensus . Peers ( )
if err != nil {
logger . Error ( err )
c . Shutdown ( )
return
}
logger . Info ( "Cluster Peers (without including ourselves):" )
if len ( peers ) == 1 {
2017-02-02 22:52:06 +00:00
logger . Info ( " - No other peers" )
}
2017-11-08 19:04:04 +00:00
for _ , p := range peers {
if p != c . id {
logger . Infof ( " - %s" , p . Pretty ( ) )
}
2017-02-02 22:52:06 +00:00
}
2017-02-13 15:46:53 +00:00
close ( c . readyCh )
2017-10-30 11:17:39 +00:00
c . readyB = true
2017-12-12 16:47:21 +00:00
logger . Info ( "** IPFS Cluster is READY **" )
2017-02-02 22:52:06 +00:00
}
func ( c * Cluster ) bootstrap ( ) bool {
// Cases in which we do not bootstrap
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
if len ( c . config . Bootstrap ) == 0 || len ( c . config . Peers ) > 0 {
2017-02-02 22:52:06 +00:00
return true
}
for _ , b := range c . config . Bootstrap {
logger . Infof ( "Bootstrapping to %s" , b )
err := c . Join ( b )
if err == nil {
return true
}
logger . Error ( err )
}
2017-11-10 15:22:31 +00:00
2017-02-02 22:52:06 +00:00
return false
2016-12-02 18:33:39 +00:00
}
2017-01-30 12:12:25 +00:00
// Ready returns a channel which signals when this peer is
// fully initialized (including consensus).
func ( c * Cluster ) Ready ( ) <- chan struct { } {
2017-02-02 22:52:06 +00:00
return c . readyCh
2017-01-30 12:12:25 +00:00
}
2016-12-02 18:33:39 +00:00
// Shutdown stops the IPFS cluster components
func ( c * Cluster ) Shutdown ( ) error {
2016-12-15 13:07:19 +00:00
c . shutdownLock . Lock ( )
defer c . shutdownLock . Unlock ( )
2017-02-02 22:52:06 +00:00
2017-10-31 10:20:14 +00:00
if c . shutdownB {
logger . Debug ( "Cluster is already shutdown" )
2016-12-15 13:07:19 +00:00
return nil
}
2017-10-31 10:20:14 +00:00
logger . Info ( "shutting down Cluster" )
2017-02-02 22:52:06 +00:00
2017-11-01 12:41:55 +00:00
// Only attempt to leave if:
// - consensus is initialized
// - cluster was ready (no bootstrapping error)
2017-11-08 19:04:04 +00:00
// - We are not removed already (means watchPeers() called uss)
2017-11-01 12:41:55 +00:00
if c . consensus != nil && c . config . LeaveOnShutdown && c . readyB && ! c . removed {
c . removed = true
2017-11-08 19:04:04 +00:00
peers , err := c . consensus . Peers ( )
if err == nil {
// best effort
logger . Warning ( "attempting to leave the cluster. This may take some seconds" )
err := c . consensus . RmPeer ( c . id )
if err != nil {
logger . Error ( "leaving cluster: " + err . Error ( ) )
}
// save peers as bootstrappers
c . config . Bootstrap = c . peerManager . addresses ( peers )
c . config . savePeers ( [ ] ma . Multiaddr { } )
2017-02-02 22:52:06 +00:00
}
}
2017-01-30 12:12:25 +00:00
if con := c . consensus ; con != nil {
if err := con . Shutdown ( ) ; err != nil {
logger . Errorf ( "error stopping consensus: %s" , err )
return err
}
2016-12-02 18:33:39 +00:00
}
2017-02-02 22:52:06 +00:00
2017-10-30 12:04:30 +00:00
// Do not save anything if we were not ready
2017-11-28 22:45:10 +00:00
// if c.readyB {
// // peers are saved usually on addPeer/rmPeer
// // c.peerManager.savePeers()
// c.config.BackupState(c.state)
//}
2017-02-02 22:52:06 +00:00
2017-11-01 12:25:28 +00:00
// We left the cluster or were removed. Destroy the Raft state.
if c . removed && c . readyB {
err := c . consensus . Clean ( )
if err != nil {
logger . Error ( "cleaning consensus: " , err )
}
}
2017-02-28 15:01:26 +00:00
if err := c . monitor . Shutdown ( ) ; err != nil {
logger . Errorf ( "error stopping monitor: %s" , err )
return err
}
2016-12-02 18:33:39 +00:00
if err := c . api . Shutdown ( ) ; err != nil {
2016-12-15 13:19:41 +00:00
logger . Errorf ( "error stopping API: %s" , err )
2016-12-02 18:33:39 +00:00
return err
}
if err := c . ipfs . Shutdown ( ) ; err != nil {
2016-12-15 13:19:41 +00:00
logger . Errorf ( "error stopping IPFS Connector: %s" , err )
2016-12-02 18:33:39 +00:00
return err
}
2016-12-07 16:21:29 +00:00
if err := c . tracker . Shutdown ( ) ; err != nil {
2016-12-15 13:19:41 +00:00
logger . Errorf ( "error stopping PinTracker: %s" , err )
2016-12-07 16:21:29 +00:00
return err
}
2017-11-15 01:33:46 +00:00
// Cancel contexts - **NOTE**: This kills the context in the
// libp2p HOST too!
c . cancel ( )
2016-12-23 18:35:37 +00:00
c . host . Close ( ) // Shutdown all network services
2017-11-15 01:33:46 +00:00
c . wg . Wait ( )
2017-10-31 10:20:14 +00:00
c . shutdownB = true
2017-01-30 12:12:25 +00:00
close ( c . doneCh )
2016-12-09 19:54:46 +00:00
return nil
}
2017-01-30 12:12:25 +00:00
// Done provides a way to learn if the Peer has been shutdown
// (for example, because it has been removed from the Cluster)
func ( c * Cluster ) Done ( ) <- chan struct { } {
return c . doneCh
}
2017-01-25 17:07:19 +00:00
// ID returns information about the Cluster peer
2017-02-08 17:04:08 +00:00
func ( c * Cluster ) ID ( ) api . ID {
2017-01-26 18:59:31 +00:00
// ignore error since it is included in response object
ipfsID , _ := c . ipfs . ID ( )
var addrs [ ] ma . Multiaddr
2017-02-15 12:51:43 +00:00
addrsSet := make ( map [ string ] struct { } ) // to filter dups
2017-01-26 18:59:31 +00:00
for _ , addr := range c . host . Addrs ( ) {
2017-02-15 12:51:43 +00:00
addrsSet [ addr . String ( ) ] = struct { } { }
}
for k := range addrsSet {
addr , _ := ma . NewMultiaddr ( k )
2017-02-15 12:40:08 +00:00
addrs = append ( addrs , multiaddrJoin ( addr , c . id ) )
2017-01-26 18:59:31 +00:00
}
2017-11-15 15:23:22 +00:00
peers := [ ] peer . ID { }
// This method might get called very early by a remote peer
// and might catch us when consensus is not set
if c . consensus != nil {
peers , _ = c . consensus . Peers ( )
}
2017-11-08 19:04:04 +00:00
2017-02-08 17:04:08 +00:00
return api . ID {
2017-02-15 12:40:08 +00:00
ID : c . id ,
//PublicKey: c.host.Peerstore().PubKey(c.id),
2017-11-10 15:09:45 +00:00
Addresses : addrs ,
ClusterPeers : peers ,
ClusterPeersAddresses : c . peerManager . addresses ( peers ) ,
Version : Version ,
Commit : Commit ,
RPCProtocolVersion : RPCProtocol ,
IPFS : ipfsID ,
2017-12-01 18:50:13 +00:00
Peername : c . config . Peername ,
2017-01-24 15:19:23 +00:00
}
}
2017-01-30 12:12:25 +00:00
// PeerAdd adds a new peer to this Cluster.
//
2017-02-02 22:52:06 +00:00
// The new peer must be reachable. It will be added to the
// consensus and will receive the shared state (including the
// list of peers). The new peer should be a single-peer cluster,
// preferable without any relevant state.
2017-02-08 17:04:08 +00:00
func ( c * Cluster ) PeerAdd ( addr ma . Multiaddr ) ( api . ID , error ) {
2017-02-02 22:52:06 +00:00
// starting 10 nodes on the same box for testing
// causes deadlock and a global lock here
// seems to help.
2017-11-08 19:04:04 +00:00
// c.paMux.Lock()
// defer c.paMux.Unlock()
2017-02-02 22:52:06 +00:00
logger . Debugf ( "peerAdd called with %s" , addr )
pid , decapAddr , err := multiaddrSplit ( addr )
2017-01-30 12:12:25 +00:00
if err != nil {
2017-02-08 17:04:08 +00:00
id := api . ID {
2017-01-30 12:12:25 +00:00
Error : err . Error ( ) ,
}
return id , err
}
2017-02-02 22:52:06 +00:00
// Figure out its real address if we have one
remoteAddr := getRemoteMultiaddr ( c . host , pid , decapAddr )
2017-11-08 19:04:04 +00:00
// whisper address to everyone, including ourselves
peers , err := c . consensus . Peers ( )
2017-01-30 12:12:25 +00:00
if err != nil {
logger . Error ( err )
2017-11-08 19:04:04 +00:00
return api . ID { Error : err . Error ( ) } , err
}
errs := c . multiRPC ( peers , "Cluster" ,
"PeerManagerAddPeer" ,
api . MultiaddrToSerial ( remoteAddr ) ,
copyEmptyStructToIfaces ( make ( [ ] struct { } , len ( peers ) , len ( peers ) ) ) )
brk := false
for i , e := range errs {
if e != nil {
brk = true
logger . Errorf ( "%s: %s" , peers [ i ] . Pretty ( ) , e )
}
}
if brk {
msg := "error broadcasting new peer's address: all cluster members need to be healthy for this operation to succeed. Try removing any unhealthy peers. Check the logs for more information about the error."
logger . Error ( msg )
id := api . ID { ID : pid , Error : "error broadcasting new peer's address" }
return id , errors . New ( msg )
2017-01-30 12:12:25 +00:00
}
2017-02-02 22:52:06 +00:00
// Figure out our address to that peer. This also
// ensures that it is reachable
2017-02-08 17:04:08 +00:00
var addrSerial api . MultiaddrSerial
2017-02-02 22:52:06 +00:00
err = c . rpcClient . Call ( pid , "Cluster" ,
2017-02-15 12:40:08 +00:00
"RemoteMultiaddrForPeer" , c . id , & addrSerial )
2017-02-02 22:52:06 +00:00
if err != nil {
2017-01-30 12:12:25 +00:00
logger . Error ( err )
2017-02-08 17:04:08 +00:00
id := api . ID { ID : pid , Error : err . Error ( ) }
2017-01-30 12:12:25 +00:00
return id , err
}
2017-02-02 22:52:06 +00:00
// Send cluster peers to the new peer.
2017-11-08 19:04:04 +00:00
clusterPeers := append ( c . peerManager . addresses ( peers ) ,
2017-02-02 22:52:06 +00:00
addrSerial . ToMultiaddr ( ) )
err = c . rpcClient . Call ( pid ,
"Cluster" ,
2017-11-08 19:04:04 +00:00
"PeerManagerImportAddresses" ,
2017-02-08 17:04:08 +00:00
api . MultiaddrsToSerial ( clusterPeers ) ,
2017-02-02 22:52:06 +00:00
& struct { } { } )
2017-01-30 12:12:25 +00:00
if err != nil {
2017-02-02 22:52:06 +00:00
logger . Error ( err )
2017-01-30 12:12:25 +00:00
}
2017-11-14 20:04:33 +00:00
// Log the new peer in the log so everyone gets it.
err = c . consensus . AddPeer ( pid )
if err != nil {
logger . Error ( err )
id := api . ID { ID : pid , Error : err . Error ( ) }
return id , err
}
2017-03-23 18:34:33 +00:00
// Ask the new peer to connect its IPFS daemon to the rest
err = c . rpcClient . Call ( pid ,
"Cluster" ,
"IPFSConnectSwarms" ,
struct { } { } ,
& struct { } { } )
if err != nil {
logger . Error ( err )
}
2017-11-14 22:54:23 +00:00
id := api . ID { }
// wait up to 2 seconds for new peer to catch up
// and return an up to date api.ID object.
// otherwise it might not contain the current cluster peers
// as it should.
for i := 0 ; i < 20 ; i ++ {
id , _ = c . getIDForPeer ( pid )
ownPeers , err := c . consensus . Peers ( )
if err != nil {
break
}
newNodePeers := id . ClusterPeers
added , removed := diffPeers ( ownPeers , newNodePeers )
if len ( added ) == 0 && len ( removed ) == 0 {
break // the new peer has fully joined
}
time . Sleep ( 200 * time . Millisecond )
logger . Debugf ( "%s addPeer: retrying to get ID from %s" ,
c . id . Pretty ( ) , pid . Pretty ( ) )
}
2017-02-02 22:52:06 +00:00
return id , nil
2017-01-30 12:12:25 +00:00
}
// PeerRemove removes a peer from this Cluster.
//
2017-11-08 19:04:04 +00:00
// The peer will be removed from the consensus peerset, all it's content
// will be re-pinned and the peer it will shut itself down.
2017-02-02 22:52:06 +00:00
func ( c * Cluster ) PeerRemove ( pid peer . ID ) error {
2017-10-31 10:20:14 +00:00
// We need to repin before removing the peer, otherwise, it won't
// be able to submit the pins.
logger . Infof ( "re-allocating all CIDs directly associated to %s" , pid )
2017-07-03 15:45:22 +00:00
c . repinFromPeer ( pid )
2017-11-08 19:04:04 +00:00
err := c . consensus . RmPeer ( pid )
2017-02-02 22:52:06 +00:00
if err != nil {
logger . Error ( err )
return err
}
return nil
}
// Join adds this peer to an existing cluster. The calling peer should
// be a single-peer cluster node. This is almost equivalent to calling
// PeerAdd on the destination cluster.
func ( c * Cluster ) Join ( addr ma . Multiaddr ) error {
logger . Debugf ( "Join(%s)" , addr )
//if len(c.peerManager.peers()) > 1 {
// logger.Error(c.peerManager.peers())
// return errors.New("only single-node clusters can be joined")
//}
pid , _ , err := multiaddrSplit ( addr )
if err != nil {
logger . Error ( err )
return err
}
// Bootstrap to myself
2017-02-15 12:40:08 +00:00
if pid == c . id {
2017-02-02 22:52:06 +00:00
return nil
2017-01-30 12:12:25 +00:00
}
2017-02-02 22:52:06 +00:00
// Add peer to peerstore so we can talk to it
2017-11-08 19:04:04 +00:00
c . peerManager . addPeer ( addr )
2017-02-02 22:52:06 +00:00
// Note that PeerAdd() on the remote peer will
// figure out what our real address is (obviously not
2017-10-30 11:17:39 +00:00
// ListenAddr).
2017-02-08 17:04:08 +00:00
var myID api . IDSerial
2017-02-02 22:52:06 +00:00
err = c . rpcClient . Call ( pid ,
"Cluster" ,
"PeerAdd" ,
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
api . MultiaddrToSerial (
multiaddrJoin ( c . config . ListenAddr , c . id ) ) ,
2017-02-02 22:52:06 +00:00
& myID )
if err != nil {
logger . Error ( err )
return err
2017-01-30 12:12:25 +00:00
}
2017-02-02 22:52:06 +00:00
// wait for leader and for state to catch up
// then sync
err = c . consensus . WaitForSync ( )
if err != nil {
logger . Error ( err )
return err
}
2017-11-10 15:22:31 +00:00
// Since we might call this while not ready (bootstrap), we need to save
// peers or we won't notice.
peers , err := c . consensus . Peers ( )
if err != nil {
logger . Error ( err )
} else {
c . config . savePeers ( c . peerManager . addresses ( peers ) )
}
2017-02-02 22:52:06 +00:00
c . StateSync ( )
2017-11-08 19:04:04 +00:00
logger . Infof ( "%s: joined %s's cluster" , c . id . Pretty ( ) , pid . Pretty ( ) )
2017-01-30 12:12:25 +00:00
return nil
}
2016-12-20 18:51:13 +00:00
// StateSync syncs the consensus state to the Pin Tracker, ensuring
// that every Cid that should be tracked is tracked. It returns
// PinInfo for Cids which were added or deleted.
2017-02-08 17:04:08 +00:00
func ( c * Cluster ) StateSync ( ) ( [ ] api . PinInfo , error ) {
2016-12-09 19:54:46 +00:00
cState , err := c . consensus . State ( )
if err != nil {
2016-12-15 18:08:46 +00:00
return nil , err
2016-12-09 19:54:46 +00:00
}
2016-12-20 18:51:13 +00:00
2017-01-23 22:58:04 +00:00
logger . Debug ( "syncing state to tracker" )
2017-02-13 15:46:53 +00:00
clusterPins := cState . List ( )
2016-12-20 18:51:13 +00:00
var changed [ ] * cid . Cid
// Track items which are not tracked
2017-03-08 15:57:27 +00:00
for _ , pin := range clusterPins {
if c . tracker . Status ( pin . Cid ) . Status == api . TrackerStatusUnpinned {
changed = append ( changed , pin . Cid )
go c . tracker . Track ( pin )
2016-12-20 18:51:13 +00:00
}
}
// Untrack items which should not be tracked
2017-01-25 18:38:23 +00:00
for _ , p := range c . tracker . StatusAll ( ) {
2017-02-13 15:46:53 +00:00
if ! cState . Has ( p . Cid ) {
2017-02-08 17:04:08 +00:00
changed = append ( changed , p . Cid )
go c . tracker . Untrack ( p . Cid )
2016-12-20 18:51:13 +00:00
}
}
2017-02-08 17:04:08 +00:00
var infos [ ] api . PinInfo
2016-12-19 17:35:24 +00:00
for _ , h := range changed {
2017-01-25 18:38:23 +00:00
infos = append ( infos , c . tracker . Status ( h ) )
2016-12-20 18:51:13 +00:00
}
return infos , nil
}
2017-12-01 11:56:26 +00:00
// StatusAll returns the GlobalPinInfo for all tracked Cids in all peers.
// If an error happens, the slice will contain as much information as
// could be fetched from other peers.
2017-02-08 17:04:08 +00:00
func ( c * Cluster ) StatusAll ( ) ( [ ] api . GlobalPinInfo , error ) {
2017-01-25 18:38:23 +00:00
return c . globalPinInfoSlice ( "TrackerStatusAll" )
2016-12-23 18:35:37 +00:00
}
2017-12-01 11:56:26 +00:00
// StatusAllLocal returns the PinInfo for all the tracked Cids in this peer.
func ( c * Cluster ) StatusAllLocal ( ) [ ] api . PinInfo {
return c . tracker . StatusAll ( )
}
// Status returns the GlobalPinInfo for a given Cid as fetched from all
// current peers. If an error happens, the GlobalPinInfo should contain
// as much information as could be fetched from the other peers.
2017-02-08 17:04:08 +00:00
func ( c * Cluster ) Status ( h * cid . Cid ) ( api . GlobalPinInfo , error ) {
2017-01-25 18:38:23 +00:00
return c . globalPinInfoCid ( "TrackerStatus" , h )
2016-12-23 18:35:37 +00:00
}
2017-12-01 11:56:26 +00:00
// StatusLocal returns this peer's PinInfo for a given Cid.
func ( c * Cluster ) StatusLocal ( h * cid . Cid ) api . PinInfo {
return c . tracker . Status ( h )
}
// SyncAll triggers SyncAllLocal() operations in all cluster peers, making sure
// that the state of tracked items matches the state reported by the IPFS daemon
// and returning the results as GlobalPinInfo. If an error happens, the slice
// will contain as much information as could be fetched from the peers.
func ( c * Cluster ) SyncAll ( ) ( [ ] api . GlobalPinInfo , error ) {
return c . globalPinInfoSlice ( "SyncAllLocal" )
}
2017-01-25 18:38:23 +00:00
// SyncAllLocal makes sure that the current state for all tracked items
2017-12-01 11:56:26 +00:00
// in this peer matches the state reported by the IPFS daemon.
2016-12-20 18:51:13 +00:00
//
2017-01-25 18:38:23 +00:00
// SyncAllLocal returns the list of PinInfo that where updated because of
// the operation, along with those in error states.
2017-02-08 17:04:08 +00:00
func ( c * Cluster ) SyncAllLocal ( ) ( [ ] api . PinInfo , error ) {
2017-01-25 18:38:23 +00:00
syncedItems , err := c . tracker . SyncAll ( )
// Despite errors, tracker provides synced items that we can provide.
// They encapsulate the error.
2017-01-25 17:07:19 +00:00
if err != nil {
logger . Error ( "tracker.Sync() returned with error: " , err )
logger . Error ( "Is the ipfs daemon running?" )
2016-12-09 19:54:46 +00:00
}
2017-01-25 18:38:23 +00:00
return syncedItems , err
2016-12-15 18:08:46 +00:00
}
2017-12-04 12:59:48 +00:00
// Sync triggers a SyncLocal() operation for a given Cid.
2017-12-01 11:56:26 +00:00
// in all cluster peers.
func ( c * Cluster ) Sync ( h * cid . Cid ) ( api . GlobalPinInfo , error ) {
return c . globalPinInfoCid ( "SyncLocal" , h )
}
2017-01-25 18:38:23 +00:00
// SyncLocal performs a local sync operation for the given Cid. This will
// tell the tracker to verify the status of the Cid against the IPFS daemon.
// It returns the updated PinInfo for the Cid.
2017-02-08 17:04:08 +00:00
func ( c * Cluster ) SyncLocal ( h * cid . Cid ) ( api . PinInfo , error ) {
2016-12-20 18:51:13 +00:00
var err error
2017-01-25 18:38:23 +00:00
pInfo , err := c . tracker . Sync ( h )
2017-01-25 17:07:19 +00:00
// Despite errors, trackers provides an updated PinInfo so
// we just log it.
if err != nil {
logger . Error ( "tracker.SyncCid() returned with error: " , err )
logger . Error ( "Is the ipfs daemon running?" )
2016-12-15 18:08:46 +00:00
}
2017-01-25 18:38:23 +00:00
return pInfo , err
2016-12-15 18:08:46 +00:00
}
2017-12-01 11:56:26 +00:00
// RecoverAllLocal triggers a RecoverLocal operation for all Cids tracked
2017-11-30 00:53:31 +00:00
// by this peer.
func ( c * Cluster ) RecoverAllLocal ( ) ( [ ] api . PinInfo , error ) {
return c . tracker . RecoverAll ( )
}
2017-01-25 18:38:23 +00:00
// Recover triggers a recover operation for a given Cid in all
2017-01-26 18:59:31 +00:00
// cluster peers.
2017-02-08 17:04:08 +00:00
func ( c * Cluster ) Recover ( h * cid . Cid ) ( api . GlobalPinInfo , error ) {
2017-01-25 18:38:23 +00:00
return c . globalPinInfoCid ( "TrackerRecover" , h )
2016-12-15 18:08:46 +00:00
}
2017-12-01 11:56:26 +00:00
// RecoverLocal triggers a recover operation for a given Cid in this peer only.
// It returns the updated PinInfo, after recovery.
func ( c * Cluster ) RecoverLocal ( h * cid . Cid ) ( api . PinInfo , error ) {
return c . tracker . Recover ( h )
}
2016-12-15 18:08:46 +00:00
// Pins returns the list of Cids managed by Cluster and which are part
// of the current global state. This is the source of truth as to which
2017-04-06 02:27:02 +00:00
// pins are managed and their allocation, but does not indicate if
// the item is successfully pinned. For that, use StatusAll().
2017-03-08 15:57:27 +00:00
func ( c * Cluster ) Pins ( ) [ ] api . Pin {
2016-12-15 18:08:46 +00:00
cState , err := c . consensus . State ( )
if err != nil {
2017-02-01 17:16:09 +00:00
logger . Error ( err )
2017-03-08 15:57:27 +00:00
return [ ] api . Pin { }
2016-12-15 18:08:46 +00:00
}
2017-02-13 15:46:53 +00:00
return cState . List ( )
2016-12-02 18:33:39 +00:00
}
2017-04-06 02:27:02 +00:00
// PinGet returns information for a single Cid managed by Cluster.
// The information is obtained from the current global state. The
// returned api.Pin provides information about the allocations
// assigned for the requested Cid, but does not provide indicate if
// the item is successfully pinned. For that, use Status(). PinGet
// returns an error if the given Cid is not part of the global state.
func ( c * Cluster ) PinGet ( h * cid . Cid ) ( api . Pin , error ) {
2018-01-16 10:19:39 +00:00
pin := c . getCurrentPin ( h )
if pin . ReplicationFactorMin == 0 && pin . ReplicationFactorMax == 0 {
2017-12-06 12:45:35 +00:00
return pin , errors . New ( "cid is not part of the global state" )
2017-04-06 02:27:02 +00:00
}
return pin , nil
}
2016-12-02 18:33:39 +00:00
// Pin makes the cluster Pin a Cid. This implies adding the Cid
// to the IPFS Cluster peers shared-state. Depending on the cluster
2016-12-16 11:40:28 +00:00
// pinning strategy, the PinTracker may then request the IPFS daemon
2017-01-23 13:01:49 +00:00
// to pin the Cid.
2016-12-02 18:33:39 +00:00
//
// Pin returns an error if the operation could not be persisted
// to the global state. Pin does not reflect the success or failure
// of underlying IPFS daemon pinning operations.
2017-03-08 17:28:43 +00:00
func ( c * Cluster ) Pin ( pin api . Pin ) error {
2017-07-03 15:45:22 +00:00
return c . pin ( pin , [ ] peer . ID { } )
}
// pin performs the actual pinning and supports a blacklist to be
// able to evacuate a node.
func ( c * Cluster ) pin ( pin api . Pin , blacklist [ ] peer . ID ) error {
2018-01-12 17:04:46 +00:00
rplMin := pin . ReplicationFactorMin
rplMax := pin . ReplicationFactorMax
if rplMin == 0 {
rplMin = c . config . ReplicationFactorMin
pin . ReplicationFactorMin = rplMin
2017-02-13 15:46:53 +00:00
}
2018-01-12 17:04:46 +00:00
if rplMax == 0 {
rplMax = c . config . ReplicationFactorMax
pin . ReplicationFactorMax = rplMax
}
if err := isReplicationFactorValid ( rplMin , rplMax ) ; err != nil {
return err
}
2017-02-13 15:46:53 +00:00
switch {
2018-01-12 17:04:46 +00:00
case rplMin == - 1 && rplMax == - 1 :
2017-03-29 20:52:13 +00:00
pin . Allocations = [ ] peer . ID { }
2017-03-08 17:28:43 +00:00
logger . Infof ( "IPFS cluster pinning %s everywhere:" , pin . Cid )
2018-01-12 17:04:46 +00:00
default :
allocs , err := c . allocate ( pin . Cid , rplMin , rplMax , blacklist )
2017-02-13 15:46:53 +00:00
if err != nil {
return err
}
2018-01-12 17:04:46 +00:00
if allocs == nil {
logger . Infof ( "Skipping repinning of %s. Replication factor is within thresholds" , pin . Cid )
return nil
}
2017-03-08 15:57:27 +00:00
pin . Allocations = allocs
2017-03-08 17:28:43 +00:00
logger . Infof ( "IPFS cluster pinning %s on %s:" , pin . Cid , pin . Allocations )
2017-02-13 15:46:53 +00:00
}
2017-03-08 15:57:27 +00:00
err := c . consensus . LogPin ( pin )
2016-12-23 18:35:37 +00:00
if err != nil {
return err
2016-12-02 18:33:39 +00:00
}
return nil
}
// Unpin makes the cluster Unpin a Cid. This implies adding the Cid
2017-01-23 13:01:49 +00:00
// to the IPFS Cluster peers shared-state.
2016-12-02 18:33:39 +00:00
//
// Unpin returns an error if the operation could not be persisted
// to the global state. Unpin does not reflect the success or failure
// of underlying IPFS daemon unpinning operations.
func ( c * Cluster ) Unpin ( h * cid . Cid ) error {
2017-02-28 15:01:26 +00:00
logger . Info ( "IPFS cluster unpinning:" , h )
2017-02-13 15:46:53 +00:00
2017-03-08 15:57:27 +00:00
pin := api . Pin {
2017-02-13 15:46:53 +00:00
Cid : h ,
}
2017-03-08 15:57:27 +00:00
err := c . consensus . LogUnpin ( pin )
2016-12-23 18:35:37 +00:00
if err != nil {
return err
2016-12-02 18:33:39 +00:00
}
return nil
}
2017-11-08 19:04:04 +00:00
// Version returns the current IPFS Cluster version.
2016-12-02 18:33:39 +00:00
func ( c * Cluster ) Version ( ) string {
return Version
}
2017-11-08 19:04:04 +00:00
// Peers returns the IDs of the members of this Cluster.
2017-02-08 17:04:08 +00:00
func ( c * Cluster ) Peers ( ) [ ] api . ID {
2017-11-08 19:04:04 +00:00
members , err := c . consensus . Peers ( )
if err != nil {
logger . Error ( err )
logger . Error ( "an empty list of peers will be returned" )
return [ ] api . ID { }
}
2017-02-08 17:04:08 +00:00
peersSerial := make ( [ ] api . IDSerial , len ( members ) , len ( members ) )
peers := make ( [ ] api . ID , len ( members ) , len ( members ) )
2017-01-26 18:59:31 +00:00
2017-01-30 12:12:25 +00:00
errs := c . multiRPC ( members , "Cluster" , "ID" , struct { } { } ,
copyIDSerialsToIfaces ( peersSerial ) )
2017-01-26 18:59:31 +00:00
for i , err := range errs {
if err != nil {
peersSerial [ i ] . ID = peer . IDB58Encode ( members [ i ] )
peersSerial [ i ] . Error = err . Error ( )
}
}
for i , ps := range peersSerial {
peers [ i ] = ps . ToID ( )
}
return peers
}
2017-11-08 19:04:04 +00:00
// makeHost makes a libp2p-host.
2016-12-15 18:08:46 +00:00
func makeHost ( ctx context . Context , cfg * Config ) ( host . Host , error ) {
2016-12-02 18:33:39 +00:00
ps := peerstore . NewPeerstore ( )
2017-01-23 17:38:59 +00:00
privateKey := cfg . PrivateKey
2016-12-02 18:33:39 +00:00
publicKey := privateKey . GetPublic ( )
2017-06-22 17:21:05 +00:00
var protec ipnet . Protector
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
if len ( cfg . Secret ) != 0 {
2017-06-30 00:40:06 +00:00
var err error
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
clusterKey , err := clusterSecretToKey ( cfg . Secret )
2017-07-03 20:00:01 +00:00
if err != nil {
return nil , err
}
2017-07-04 20:39:10 +00:00
protec , err = pnet . NewProtector ( strings . NewReader ( clusterKey ) )
2017-06-22 17:21:05 +00:00
if err != nil {
return nil , err
}
// this is in go-ipfs, not sure whether we want something like it here
/ * go func ( ) {
t := time . NewTicker ( 30 * time . Second )
<- t . C // swallow one tick
for {
select {
case <- t . C :
if ph := cfg . Host ; ph != nil {
if len ( ph . Network ( ) . Peers ( ) ) == 0 {
log . Warning ( "We are in a private network and have no peers." )
log . Warning ( "This might be a configuration mistake." )
}
}
case <- n . Process ( ) . Closing :
t . Stop ( )
return
}
}
} ( ) * /
}
2017-01-23 17:38:59 +00:00
if err := ps . AddPubKey ( cfg . ID , publicKey ) ; err != nil {
2016-12-02 18:33:39 +00:00
return nil , err
}
2017-01-23 17:38:59 +00:00
if err := ps . AddPrivKey ( cfg . ID , privateKey ) ; err != nil {
2016-12-02 18:33:39 +00:00
return nil , err
}
2017-11-29 16:49:03 +00:00
ps . AddAddr ( cfg . ID , cfg . ListenAddr , peerstore . PermanentAddrTTL )
2017-06-22 17:21:05 +00:00
network , err := swarm . NewNetworkWithProtector (
2016-12-02 18:33:39 +00:00
ctx ,
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
[ ] ma . Multiaddr { cfg . ListenAddr } ,
2017-01-23 17:38:59 +00:00
cfg . ID ,
2016-12-02 18:33:39 +00:00
ps ,
2017-06-22 17:21:05 +00:00
protec ,
2016-12-02 18:33:39 +00:00
nil ,
)
if err != nil {
return nil , err
}
bhost := basichost . New ( network )
return bhost , nil
}
2016-12-20 18:51:13 +00:00
2017-01-30 12:12:25 +00:00
// Perform an RPC request to multiple destinations
2016-12-23 18:35:37 +00:00
func ( c * Cluster ) multiRPC ( dests [ ] peer . ID , svcName , svcMethod string , args interface { } , reply [ ] interface { } ) [ ] error {
if len ( dests ) != len ( reply ) {
2017-01-23 13:21:26 +00:00
panic ( "must have matching dests and replies" )
2016-12-23 18:35:37 +00:00
}
var wg sync . WaitGroup
errs := make ( [ ] error , len ( dests ) , len ( dests ) )
2016-12-28 15:25:24 +00:00
for i := range dests {
2016-12-23 18:35:37 +00:00
wg . Add ( 1 )
go func ( i int ) {
defer wg . Done ( )
err := c . rpcClient . Call (
dests [ i ] ,
svcName ,
svcMethod ,
args ,
reply [ i ] )
errs [ i ] = err
} ( i )
}
wg . Wait ( )
return errs
2016-12-20 18:51:13 +00:00
2016-12-23 18:35:37 +00:00
}
2017-02-08 17:04:08 +00:00
func ( c * Cluster ) globalPinInfoCid ( method string , h * cid . Cid ) ( api . GlobalPinInfo , error ) {
pin := api . GlobalPinInfo {
2017-01-25 17:07:19 +00:00
Cid : h ,
2017-02-08 17:04:08 +00:00
PeerMap : make ( map [ peer . ID ] api . PinInfo ) ,
2016-12-20 18:51:13 +00:00
}
2017-11-08 19:04:04 +00:00
members , err := c . consensus . Peers ( )
if err != nil {
logger . Error ( err )
return api . GlobalPinInfo { } , err
}
2017-02-08 17:04:08 +00:00
replies := make ( [ ] api . PinInfoSerial , len ( members ) , len ( members ) )
2017-03-08 15:57:27 +00:00
arg := api . Pin {
2017-02-08 17:04:08 +00:00
Cid : h ,
}
errs := c . multiRPC ( members ,
"Cluster" ,
method , arg . ToSerial ( ) ,
copyPinInfoSerialToIfaces ( replies ) )
for i , rserial := range replies {
2017-06-21 12:16:28 +00:00
e := errs [ i ]
// Potentially rserial is empty. But ToPinInfo ignores all
// errors from underlying libraries. In that case .Status
// will be TrackerStatusBug (0)
2017-02-08 17:04:08 +00:00
r := rserial . ToPinInfo ( )
2017-06-21 12:16:28 +00:00
// No error. Parse and continue
if e == nil {
pin . PeerMap [ members [ i ] ] = r
continue
}
// Deal with error cases (err != nil): wrap errors in PinInfo
// In this case, we had no answer at all. The contacted peer
// must be offline or unreachable.
if r . Status == api . TrackerStatusBug {
logger . Errorf ( "%s: error in broadcast response from %s: %s " , c . id , members [ i ] , e )
pin . PeerMap [ members [ i ] ] = api . PinInfo {
Cid : h ,
Peer : members [ i ] ,
Status : api . TrackerStatusClusterError ,
TS : time . Now ( ) ,
Error : e . Error ( ) ,
2017-01-23 23:52:42 +00:00
}
2017-06-21 12:16:28 +00:00
} else { // there was an rpc error, but got a valid response :S
r . Error = e . Error ( )
pin . PeerMap [ members [ i ] ] = r
// unlikely to come down this path
2016-12-20 18:51:13 +00:00
}
}
2016-12-28 15:25:24 +00:00
2017-01-23 23:52:42 +00:00
return pin , nil
2016-12-20 18:51:13 +00:00
}
2017-02-08 17:04:08 +00:00
func ( c * Cluster ) globalPinInfoSlice ( method string ) ( [ ] api . GlobalPinInfo , error ) {
var infos [ ] api . GlobalPinInfo
fullMap := make ( map [ string ] api . GlobalPinInfo )
2016-12-20 18:51:13 +00:00
2017-11-08 19:04:04 +00:00
members , err := c . consensus . Peers ( )
if err != nil {
logger . Error ( err )
return [ ] api . GlobalPinInfo { } , err
}
2017-02-08 17:04:08 +00:00
replies := make ( [ ] [ ] api . PinInfoSerial , len ( members ) , len ( members ) )
errs := c . multiRPC ( members ,
"Cluster" ,
method , struct { } { } ,
copyPinInfoSerialSliceToIfaces ( replies ) )
2016-12-20 18:51:13 +00:00
2017-02-08 17:04:08 +00:00
mergePins := func ( pins [ ] api . PinInfoSerial ) {
for _ , pserial := range pins {
p := pserial . ToPinInfo ( )
item , ok := fullMap [ pserial . Cid ]
2016-12-20 18:51:13 +00:00
if ! ok {
2017-02-08 17:04:08 +00:00
fullMap [ pserial . Cid ] = api . GlobalPinInfo {
Cid : p . Cid ,
PeerMap : map [ peer . ID ] api . PinInfo {
2016-12-20 18:51:13 +00:00
p . Peer : p ,
} ,
}
} else {
2017-01-25 17:07:19 +00:00
item . PeerMap [ p . Peer ] = p
2016-12-20 18:51:13 +00:00
}
}
}
2017-01-24 00:09:27 +00:00
erroredPeers := make ( map [ peer . ID ] string )
2016-12-23 18:35:37 +00:00
for i , r := range replies {
2017-01-25 17:07:19 +00:00
if e := errs [ i ] ; e != nil { // This error must come from not being able to contact that cluster member
2017-02-15 12:40:08 +00:00
logger . Errorf ( "%s: error in broadcast response from %s: %s " , c . id , members [ i ] , e )
2017-01-24 00:09:27 +00:00
erroredPeers [ members [ i ] ] = e . Error ( )
2017-01-23 23:52:42 +00:00
} else {
mergePins ( r )
2016-12-20 18:51:13 +00:00
}
}
2017-01-24 00:09:27 +00:00
// Merge any errors
for p , msg := range erroredPeers {
2017-02-08 17:04:08 +00:00
for cidStr := range fullMap {
c , _ := cid . Decode ( cidStr )
fullMap [ cidStr ] . PeerMap [ p ] = api . PinInfo {
Cid : c ,
2017-01-24 00:09:27 +00:00
Peer : p ,
2017-02-08 17:04:08 +00:00
Status : api . TrackerStatusClusterError ,
2017-01-24 00:09:27 +00:00
TS : time . Now ( ) ,
Error : msg ,
}
}
}
2016-12-20 18:51:13 +00:00
for _ , v := range fullMap {
infos = append ( infos , v )
}
2017-01-23 23:52:42 +00:00
return infos , nil
2016-12-20 18:51:13 +00:00
}
2017-01-23 19:29:05 +00:00
2017-02-08 17:04:08 +00:00
func ( c * Cluster ) getIDForPeer ( pid peer . ID ) ( api . ID , error ) {
idSerial := api . ID { ID : pid } . ToSerial ( )
2017-02-02 22:52:06 +00:00
err := c . rpcClient . Call (
pid , "Cluster" , "ID" , struct { } { } , & idSerial )
id := idSerial . ToID ( )
if err != nil {
logger . Error ( err )
id . Error = err . Error ( )
2017-01-23 19:29:05 +00:00
}
2017-02-02 22:52:06 +00:00
return id , err
2017-01-23 19:29:05 +00:00
}
2017-02-13 15:46:53 +00:00
2017-11-14 22:54:23 +00:00
// diffPeers returns the peerIDs added and removed from peers2 in relation to
// peers1
func diffPeers ( peers1 , peers2 [ ] peer . ID ) ( added , removed [ ] peer . ID ) {
m1 := make ( map [ peer . ID ] struct { } )
m2 := make ( map [ peer . ID ] struct { } )
added = make ( [ ] peer . ID , 0 )
removed = make ( [ ] peer . ID , 0 )
if peers1 == nil && peers2 == nil {
return
}
if peers1 == nil {
added = peers2
return
}
if peers2 == nil {
removed = peers1
return
}
for _ , p := range peers1 {
m1 [ p ] = struct { } { }
}
for _ , p := range peers2 {
m2 [ p ] = struct { } { }
}
2017-12-06 12:45:35 +00:00
for k := range m1 {
2017-11-14 22:54:23 +00:00
_ , ok := m2 [ k ]
if ! ok {
removed = append ( removed , k )
}
}
2017-12-06 12:45:35 +00:00
for k := range m2 {
2017-11-14 22:54:23 +00:00
_ , ok := m1 [ k ]
if ! ok {
added = append ( added , k )
}
}
return
}