diff --git a/README.md b/README.md index 645a821b..3033695f 100644 --- a/README.md +++ b/README.md @@ -81,28 +81,37 @@ $ ipfs-cluster-service -init The configuration will be placed in `~/.ipfs-cluster/service.json` by default. -You can add the multiaddresses for the other cluster peers the `cluster_peers` variable. For example, here is a valid configuration for a cluster of 4 peers: +You can add the multiaddresses for the other cluster peers the `bootstrap_multiaddresses` variable. For example, here is a valid configuration for a single-peer cluster: ```json { - "id": "QmSGCzHkz8gC9fNndMtaCZdf9RFtwtbTEEsGo4zkVfcykD", - "private_key" : "", - "cluster_peers" : [ - "/ip4/192.168.1.2/tcp/9096/ipfs/QmcQ5XvrSQ4DouNkQyQtEoLczbMr6D9bSenGy6WQUCQUBt", - "/ip4/192.168.1.3/tcp/9096/ipfs/QmdFBMf9HMDH3eCWrc1U11YCPenC3Uvy9mZQ2BedTyKTDf", - "/ip4/192.168.1.4/tcp/9096/ipfs/QmYY1ggjoew5eFrvkenTR3F4uWqtkBkmgfJk8g9Qqcwy51" - ], + "id": "QmXMhZ53zAoes8TYbKGn3rnm5nfWs5Wdu41Fhhfw9XmM5A", + "private_key": "", + "cluster_peers": [], + "bootstrap": [], + "leave_on_shutdown": false, "cluster_multiaddress": "/ip4/0.0.0.0/tcp/9096", "api_listen_multiaddress": "/ip4/127.0.0.1/tcp/9094", "ipfs_proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095", "ipfs_node_multiaddress": "/ip4/127.0.0.1/tcp/5001", - "consensus_data_folder": "/home/user/.ipfs-cluster/data" + "consensus_data_folder": "/home/hector/go/src/github.com/ipfs/ipfs-cluster/ipfs-cluster-service/d1/data", + "state_sync_seconds": 60 } ``` -The configuration file should probably be identical among all cluster peers, except for the `id` and `private_key` fields. To facilitate configuration, `cluster_peers` may include its own address, but it will be removed on boot. For additional information about the configuration format, see the [JSONConfig documentation](https://godoc.org/github.com/ipfs/ipfs-cluster#JSONConfig). +The configuration file should probably be identical among all cluster peers, except for the `id` and `private_key` fields. Once every cluster peer has the configuration in place, you can run `ipfs-cluster-service` to start the cluster. + +#### Clusters using `cluster_peers` + +The `cluster_peers` configuration variable holds a list of current cluster members. If you know the members of the cluster in advance, or you want to start a cluster fully in parallel, set `cluster_peers` in all configurations so that every peer knows the rest upon boot. Leave `bootstrap` empty (although it will be ignored anyway) + +#### Clusters using `bootstrap` + +When the `cluster_peers` variable is empty, the multiaddresses `bootstrap` can be used to have a peer join an existing cluster. The peer will contact those addresses (in order) until one of them succeeds in joining it to the cluster. When the peer is shut down, it will save the current cluster peers in the `cluster_peers` configuration variable for future use. + +Bootstrap is a convenient method, but more prone to errors than `cluster_peers`. It can be used as well with `ipfs-cluster-service --bootstrap `. Note that bootstrapping nodes with an old state (or diverging state) from the one running in the cluster may lead to problems with +the consensus, so usually you would want to bootstrap blank nodes. -Once every cluster peer has the configuration in place, you can run `ipfs-cluster-service` to start the cluster. #### Debugging @@ -153,17 +162,19 @@ Then run cluster: ``` node0> ipfs-cluster-service -12:38:34.470 INFO cluster: IPFS Cluster v0.0.1 listening on: cluster.go:59 -12:38:34.472 INFO cluster: /ip4/127.0.0.1/tcp/9096/ipfs/QmWM4knzVuWU5utXqkD2JeQ9zYT82f4s9s196bJ8PekStF cluster.go:61 -12:38:34.472 INFO cluster: /ip4/192.168.1.58/tcp/9096/ipfs/QmWM4knzVuWU5utXqkD2JeQ9zYT82f4s9s196bJ8PekStF cluster.go:61 -12:38:34.472 INFO cluster: This is a single-node cluster peer_manager.go:141 -12:38:34.569 INFO cluster: starting Consensus and waiting for a leader... consensus.go:124 -12:38:34.591 INFO cluster: REST API: /ip4/127.0.0.1/tcp/9094 rest_api.go:309 -12:38:34.591 INFO cluster: IPFS Proxy: /ip4/127.0.0.1/tcp/9095 -> /ip4/127.0.0.1/tcp/5001 ipfs_http_connector.go:168 -12:38:34.592 INFO cluster: PinTracker ready map_pin_tracker.go:71 -12:38:36.092 INFO cluster: Raft Leader elected: QmWM4knzVuWU5utXqkD2JeQ9zYT82f4s9s196bJ8PekStF raft.go:146 -12:38:36.092 INFO cluster: Consensus state is up to date consensus.go:170 -12:38:36.092 INFO cluster: IPFS Cluster is ready cluster.go:526 +13:33:34.044 INFO cluster: IPFS Cluster v0.0.1 listening on: cluster.go:59 +13:33:34.044 INFO cluster: /ip4/127.0.0.1/tcp/9096/ipfs/QmQHKLBXfS7hf8o2acj7FGADoJDLat3UazucbHrgxqisim cluster.go:61 +13:33:34.044 INFO cluster: /ip4/192.168.1.103/tcp/9096/ipfs/QmQHKLBXfS7hf8o2acj7FGADoJDLat3UazucbHrgxqisim cluster.go:61 +13:33:34.044 INFO cluster: starting Consensus and waiting for a leader... consensus.go:163 +13:33:34.047 INFO cluster: PinTracker ready map_pin_tracker.go:71 +13:33:34.047 INFO cluster: waiting for leader raft.go:118 +13:33:34.047 INFO cluster: REST API: /ip4/127.0.0.1/tcp/9094 rest_api.go:309 +13:33:34.047 INFO cluster: IPFS Proxy: /ip4/127.0.0.1/tcp/9095 -> /ip4/127.0.0.1/tcp/5001 ipfs_http_connector.go:168 +13:33:35.420 INFO cluster: Raft Leader elected: QmQHKLBXfS7hf8o2acj7FGADoJDLat3UazucbHrgxqisim raft.go:145 +13:33:35.921 INFO cluster: Consensus state is up to date consensus.go:214 +13:33:35.921 INFO cluster: IPFS Cluster is ready cluster.go:191 +13:33:35.921 INFO cluster: Cluster Peers (not including ourselves): cluster.go:192 +13:33:35.921 INFO cluster: - No other peers cluster.go:195 ``` #### Step 1: Add new members to the cluster @@ -174,41 +185,43 @@ Initialize and run cluster in a different node(s): node1> ipfs-cluster-service init ipfs-cluster-service configuration written to /home/user/.ipfs-cluster/service.json node1> ipfs-cluster-service -12:39:24.818 INFO cluster: IPFS Cluster v0.0.1 listening on: cluster.go:59 -12:39:24.819 INFO cluster: /ip4/127.0.0.1/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK cluster.go:61 -12:39:24.820 INFO cluster: /ip4/192.168.1.57/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK cluster.go:61 -12:39:24.820 INFO cluster: This is a single-node cluster peer_manager.go:141 -12:39:24.850 INFO cluster: starting Consensus and waiting for a leader... consensus.go:124 -12:39:24.876 INFO cluster: IPFS Proxy: /ip4/127.0.0.1/tcp/9095 -> /ip4/127.0.0.1/tcp/5001 ipfs_http_connector.go:168 -12:39:24.876 INFO cluster: REST API: /ip4/127.0.0.1/tcp/9094 rest_api.go:309 -12:39:24.876 INFO cluster: PinTracker ready map_pin_tracker.go:71 -12:39:26.877 INFO cluster: Raft Leader elected: QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK raft.go:146 -12:39:26.877 INFO cluster: Consensus state is up to date consensus.go:170 -12:39:26.878 INFO service: IPFS Cluster is ready main.go:184 +13:36:19.313 INFO cluster: IPFS Cluster v0.0.1 listening on: cluster.go:59 +13:36:19.313 INFO cluster: /ip4/127.0.0.1/tcp/7096/ipfs/QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85 cluster.go:61 +13:36:19.313 INFO cluster: /ip4/192.168.1.103/tcp/7096/ipfs/QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85 cluster.go:61 +13:36:19.313 INFO cluster: starting Consensus and waiting for a leader... consensus.go:163 +13:36:19.316 INFO cluster: REST API: /ip4/127.0.0.1/tcp/7094 rest_api.go:309 +13:36:19.316 INFO cluster: IPFS Proxy: /ip4/127.0.0.1/tcp/7095 -> /ip4/127.0.0.1/tcp/5001 ipfs_http_connector.go:168 +13:36:19.316 INFO cluster: waiting for leader raft.go:118 +13:36:19.316 INFO cluster: PinTracker ready map_pin_tracker.go:71 +13:36:20.834 INFO cluster: Raft Leader elected: QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85 raft.go:145 +13:36:21.334 INFO cluster: Consensus state is up to date consensus.go:214 +13:36:21.334 INFO cluster: IPFS Cluster is ready cluster.go:191 +13:36:21.334 INFO cluster: Cluster Peers (not including ourselves): cluster.go:192 +13:36:21.334 INFO cluster: - No other peers cluster.go:195 ``` Add them to the original cluster with `ipfs-cluster-ctl peers add `. The command will return the ID information of the newly added member: ``` -node0> ipfs-cluster-ctl peers add /ip4/192.168.1.57/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK +node0> ipfs-cluster-ctl peers add /ip4/192.168.1.103/tcp/7096/ipfs/QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85 { - "id": "QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK", - "public_key": "CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDQUN0iWAdbYEfQFAYcORsd0XnBvR9dk1QrJbzyqwDEHebP/wYqjeK73cyzBrpTYzxyd205ZSrpImL1GvVl+iLONMlz0CHsQ2YL0zzYHy55Y+1LhGGZY5R14MqvrjSq8pWo8U9nF8aenXSxhNvVeErnE5voVUU7YTjXSaXYmsK0cT7erKHZooJ16dzL/UmRTYlirMuGcOv/4WmgYX5fikH1mtw1Ln2xew76qxL5MeCu7v7NNugbsachJFiC/0DewxPClS03Nv6TvW2FsN4iis961EoBH7qTI3E1gUS89s7xp2njfgD/hsyk6YUbEEbOJUNihPFJ3Wlx6ogbis3cdX3tAgMBAAE=", + "id": "QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85", + "public_key": "CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDtjpvI+XKVGT5toXTimtWceONYsf/1bbRMxLt/fCSYJoSeJqj0HUtttCD3dcBv1M2rElIMXDhyLUpkET+AN6otr9lQnbgi0ZaKrtzphR0w6g/0EQZZaxI2scxF4NcwkwUfe5ceEmPFwax1+C00nd2BF+YEEp+VHNyWgXhCxncOGO74p0YdXBrvXkyfTiy/567L3PPX9F9x+HiutBL39CWhx9INmtvdPB2HwshodF6QbfeljdAYCekgIrCQC8mXOVeePmlWgTwoge9yQbuViZwPiKwwo1AplANXFmSv8gagfjKL7Kc0YOqcHwxBsoUskbJjfheDZJzl19iDs9EvUGk5AgMBAAE=", "addresses": [ - "/ip4/127.0.0.1/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK", - "/ip4/192.168.1.57/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK" + "/ip4/127.0.0.1/tcp/7096/ipfs/QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85", + "/ip4/192.168.1.103/tcp/7096/ipfs/QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85" ], "cluster_peers": [ - "/ip4/192.168.1.58/tcp/9096/ipfs/QmWM4knzVuWU5utXqkD2JeQ9zYT82f4s9s196bJ8PekStF" + "/ip4/192.168.123.103/tcp/7096/ipfs/QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85" ], "version": "0.0.1", - "commit": "", + "commit": "83baa5c859b9b17b2deec4f782d1210590025c80", "rpc_protocol_version": "/ipfscluster/0.0.1/rpc", "ipfs": { - "id": "QmRbn14eEDGEDf9d6mW64W6KsinkmMXqZaToWVbRANT8gY", + "id": "QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewur2n", "addresses": [ - "/ip4/127.0.0.1/tcp/4001/ipfs/QmRbn14eEDGEDf9d6mW64W6KsinkmMXqZaToWVbRANT8gY", - "/ip4/192.168.1.57/tcp/4001/ipfs/QmRbn14eEDGEDf9d6mW64W6KsinkmMXqZaToWVbRANT8gY" + "/ip4/127.0.0.1/tcp/4001/ipfs/QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewur2n", + "/ip4/192.168.1.103/tcp/4001/ipfs/QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewur2n" ] } } @@ -223,18 +236,19 @@ shutdown. They can be restarted manually and re-added to the Cluster any time: ``` node0> ipfs-cluster-ctl peers rm QmbGFbZVTF3UAEPK9pBVdwHGdDAYkHYufQwSh4k1i8bbbb -OK +Request succeeded ``` The `node1` is then disconnected and shuts down: ``` -12:41:13.693 WARNI cluster: this peer has been removed from the Cluster and will shutdown itself peer_manager.go:80 -12:41:13.695 INFO cluster: stopping Consensus component consensus.go:231 -12:41:14.695 INFO cluster: shutting down IPFS Cluster cluster.go:135 -12:41:14.696 INFO cluster: stopping Cluster API rest_api.go:327 -12:41:14.696 INFO cluster: stopping IPFS Proxy ipfs_http_connector.go:332 -12:41:14.697 INFO cluster: stopping MapPinTracker map_pin_tracker.go:87 +13:42:50.828 WARNI cluster: this peer has been removed from the Cluster and will shutdown itself in 5 seconds peer_manager.go:48 +13:42:51.828 INFO cluster: stopping Consensus component consensus.go:257 +13:42:55.836 INFO cluster: shutting down IPFS Cluster cluster.go:235 +13:42:55.836 INFO cluster: Saving configuration config.go:283 +13:42:55.837 INFO cluster: stopping Cluster API rest_api.go:327 +13:42:55.837 INFO cluster: stopping IPFS Proxy ipfs_http_connector.go:332 +13:42:55.837 INFO cluster: stopping MapPinTracker map_pin_tracker.go:87 ``` ### Go diff --git a/architecture.md b/architecture.md index 2a496f6f..657fd285 100644 --- a/architecture.md +++ b/architecture.md @@ -70,33 +70,41 @@ This sections explains how some things work in Cluster. * `NewCluster` triggers the Cluster bootstrap. `IPFSConnector`, `State`, `PinTracker` component are provided. These components are up but possibly waiting for a `SetClient(RPCClient)` call. Without having an RPCClient, they are unable to communicate with the other components. * The first step bootstrapping is to create the libp2p `Host`. It is using configuration keys to set up public, private keys, the swarm network etc. -* The `peerManager` is initialized. Peers from the configuration and ourselves are added to the peer list. +* The `peerManager` is initialized. It allows to list known cluster peers and keep components in the loop about changes. * The `RPCServer` (`go-libp2p-gorpc`) is setup, along with an `RPCClient`. The client can communicate to any RPC server using libp2p, or with the local one (shortcutting). The `RPCAPI` object is registered: it implements all the RPC operations supported by cluster and used for inter-component/inter-peer communication. * The `Consensus` component is bootstrapped. It: - * Sets up a new Raft node from scratch, including snapshots, stable datastore (boltDB), log store etc... + * Sets up a new Raft node (with the `cluster_peers` from the configuration) from scratch, including snapshots, stable datastore (boltDB), log store etc... * Initializes `go-libp2p-consensus` components (`Actor` and `LogOpConsensus`) using `go-libp2p-raft` - * Returns a new `Consensus` object while asynchronously waiting for a Raft leader and then also for the current Raft peer to catch up with the latest index of the log. + * Returns a new `Consensus` object while asynchronously waiting for a Raft leader and then also for the current Raft peer to catch up with the latest index of the log when there is anything to catch up with. * Waits for an RPCClient to be set and when it happens it triggers an asynchronous RPC request (`StateSync`) to the local `Cluster` component and reports `Ready()` * The `StateSync` operation (from the main `Cluster` component) makes a diff between the local `MapPinTracker` state and the consensus-maintained state. It triggers asynchronous local RPC requests (`Track` and `Untrack`) to the `MapPinTracker`. * The `MapPinTracker` receives the `Track` requests and checks, pins or unpins items, as well as updating the local status of a pin (see the Pinning section below) * While the consensus is being bootstrapped, `SetClient(RPCClient` is called on all components (tracker, ipfs connector, api and consensus) * The new `Cluster` object is returned. -* Asynchronously, a thread waits for the `Consensus` component to report `Ready()`. When this happens, `Cluster` reports itself `Ready()`. At this moment, all components are up, consensus is working and cluster is ready to perform any operations. The consensus state may still be syncing, or mostly the `MapPinTracker` may still be verifying that pins are there against the daemon, but this does not causes any problems to use cluster. +* Asynchronously, a thread triggers the bootsrap process and then waits for the `Consensus` component to report `Ready()`. +* The bootstrap process uses the configured multiaddresses from the Bootstrap section of the configuration perform a remote RPC `PeerAdd` request (it tries until it works in one of them). +* The remote node then performs a consensus log operation logging the multiaddress and peer ID of the new cluster member. If the log operation is successful, the new member is added to `Raft` (which internally logs it as well). The new node is contacted by Raft, its internal peerset updated. It receives the state from the cluster, including pins (which are synced to the `PinTracker` and other peers, which are handled by the `peerManager`. +* If the bootstrap was successful or not necessary, Cluster reports itself `Ready()`. At this moment, all components are up, consensus is working and cluster is ready to perform any operations. The consensus state may still be syncing, or mostly the `MapPinTracker` may still be verifying that pins are there against the daemon, but this does not causes any problems to use cluster. ### Adding a peer * The `RESTAPI` component receives a `PeerAdd` request on the respective endpoint. It makes a `PeerAdd` RPC request. * The local RPC server receives it and calls `PeerAdd` method in the main `Cluster` component. -* A libp2p connection is opened to the new peer's multiaddress. It should be reachable. We note down the local multiaddress used to reach the new peer. -* A broadcast `PeerManagerAddPeer` request is sent to all peers in the current cluster. It is received and the RPC server calls `peerManager.addPeer`. The `peerManager` is an annex to the main Cluster Component around the peer management functionality (add/remove). -* The `peerManager` adds the new peer to libp2p's peerstore, asks Raft to make if part of its peerset and adds it to the `ClusterPeers` section -of the configuration (which is then saved). -* If the broadcast requests fails somewhere, the operation is aborted, and a `PeerRemove` operation for the new peer is triggered to undo any changes. Otherwise, on success, the local list of ClusterPeers from the configuration, along with the local multiaddress from the connection we noted down are -sent to the new peer (via the RPC `PeerManagerAddFromMultiaddrs` method). -* The new peer's `peerManager` updates the current list of peers, the Raft's peer set and the configuration and has become part of the new cluster. -* The `PeerAdd` method in `Cluster` makes an remote RPC request to the new peers `ID` method. The received ID is used as response to the call. -* The RPC server takes the response and sends it to the `RESTAPI` component, which in turns converts it and responds to the request. +* If there is a connection from the given PID, we use it to determine the true multiaddress (with the right remote IP), in case it was not +provided correctly (usually when using `Join` since it cannot be determined-see below). This allows to correctly let peers join from accross NATs. +* The peer is added to the `peerManager` so we can perform RPC requests to it. +* A remote RPC `RemoteMultiaddressForPeer` is performed to the new peer, so it reports how our multiaddress looks from their side. +* The address of the new peer is then commited to the consensus state. Since this is a new peer, Raft peerstore is also updated. Raft starts +sending updates to the new peer, among them prompting it to update its own peerstore with the list of peers in this cluster. +* The list of cluster peers (plus ourselves with our real multiaddress) is sent to the new cluster in a remote `PeerManagerAddMultiaddrs` RPC request. +* A remote `ID` RPC request is performed and the information about the new node is returned. + +### Joining a cluster + +* Joining means calling `AddPeer` on the cluster we want to join. +* This is used also for bootstrapping. +* Join is disallowed if we are not a single-peer cluster. ### Pinning diff --git a/cluster.go b/cluster.go index a140e664..aae1861a 100644 --- a/cluster.go +++ b/cluster.go @@ -2,9 +2,7 @@ package ipfscluster import ( "context" - "errors" "fmt" - "math/rand" "sync" "time" @@ -23,6 +21,7 @@ import ( type Cluster struct { ctx context.Context + id peer.ID config *Config host host.Host rpcServer *rpc.Server @@ -41,6 +40,8 @@ type Cluster struct { doneCh chan struct{} readyCh chan struct{} wg sync.WaitGroup + + paMux sync.Mutex } // NewCluster builds a new IPFS Cluster peer. It initializes a LibP2P host, @@ -61,8 +62,9 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P logger.Infof(" %s/ipfs/%s", addr, host.ID().Pretty()) } - cluster := &Cluster{ + c := &Cluster{ ctx: ctx, + id: host.ID(), config: cfg, host: host, api: api, @@ -74,71 +76,195 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P readyCh: make(chan struct{}, 1), } - // Setup peer manager - pm := newPeerManager(cluster) - cluster.peerManager = pm - err = pm.addFromConfig(cfg) + c.setupPeerManager() + err = c.setupRPC() if err != nil { - cluster.Shutdown() + c.Shutdown() return nil, err } - // Workaround for https://github.com/libp2p/go-libp2p-swarm/issues/15 - cluster.openConns() - - // Setup RPC - rpcServer := rpc.NewServer(host, RPCProtocol) - err = rpcServer.RegisterName("Cluster", &RPCAPI{cluster: cluster}) + err = c.setupConsensus() if err != nil { - cluster.Shutdown() + c.Shutdown() return nil, err } - cluster.rpcServer = rpcServer + c.setupRPCClients() + c.run() + return c, nil +} - // Setup RPC client that components from this peer will use - rpcClient := rpc.NewClientWithServer(host, RPCProtocol, rpcServer) - cluster.rpcClient = rpcClient +func (c *Cluster) setupPeerManager() { + pm := newPeerManager(c) + c.peerManager = pm + if len(c.config.ClusterPeers) > 0 { + c.peerManager.addFromMultiaddrs(c.config.ClusterPeers) + } else { + c.peerManager.addFromMultiaddrs(c.config.Bootstrap) + } - // Setup Consensus - consensus, err := NewConsensus(pm.peers(), host, cfg.ConsensusDataFolder, state) +} + +func (c *Cluster) setupRPC() error { + rpcServer := rpc.NewServer(c.host, RPCProtocol) + err := rpcServer.RegisterName("Cluster", &RPCAPI{cluster: c}) + if err != nil { + return err + } + c.rpcServer = rpcServer + rpcClient := rpc.NewClientWithServer(c.host, RPCProtocol, rpcServer) + c.rpcClient = rpcClient + return nil +} + +func (c *Cluster) setupConsensus() error { + var startPeers []peer.ID + if len(c.config.ClusterPeers) > 0 { + startPeers = peersFromMultiaddrs(c.config.ClusterPeers) + } else { + startPeers = peersFromMultiaddrs(c.config.Bootstrap) + } + + consensus, err := NewConsensus( + append(startPeers, c.host.ID()), + c.host, + c.config.ConsensusDataFolder, + c.state) if err != nil { logger.Errorf("error creating consensus: %s", err) - cluster.Shutdown() - return nil, err + return err } - cluster.consensus = consensus + c.consensus = consensus + return nil +} - tracker.SetClient(rpcClient) - ipfs.SetClient(rpcClient) - api.SetClient(rpcClient) - consensus.SetClient(rpcClient) +func (c *Cluster) setupRPCClients() { + c.tracker.SetClient(c.rpcClient) + c.ipfs.SetClient(c.rpcClient) + c.api.SetClient(c.rpcClient) + c.consensus.SetClient(c.rpcClient) +} - cluster.run() - return cluster, nil +func (c *Cluster) stateSyncWatcher() { + stateSyncTicker := time.NewTicker( + time.Duration(c.config.StateSyncSeconds) * time.Second) + for { + select { + case <-stateSyncTicker.C: + c.StateSync() + case <-c.ctx.Done(): + stateSyncTicker.Stop() + return + } + } +} + +// run provides a cancellable context and launches some goroutines +// before signaling readyCh +func (c *Cluster) run() { + c.wg.Add(1) + // cancellable context + go func() { + defer c.wg.Done() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c.ctx = ctx + go c.stateSyncWatcher() + go c.bootstrapAndReady() + <-c.shutdownCh + }() +} + +func (c *Cluster) bootstrapAndReady() { + ok := c.bootstrap() + if !ok { + logger.Error("Bootstrap unsuccessful") + c.Shutdown() + return + } + + // We bootstrapped first because with dirty state consensus + // may have a peerset and not find a leader so we cannot wait + // for it. + timer := time.NewTimer(30 * time.Second) + select { + case <-timer.C: + logger.Error("consensus start timed out") + c.Shutdown() + return + case <-c.consensus.Ready(): + case <-c.ctx.Done(): + return + } + + // Cluster is ready. + c.readyCh <- struct{}{} + logger.Info("IPFS Cluster is ready") + logger.Info("Cluster Peers (not including ourselves):") + peers := c.peerManager.peersAddrs() + if len(peers) == 0 { + logger.Info(" - No other peers") + } + for _, a := range c.peerManager.peersAddrs() { + logger.Infof(" - %s", a) + } +} + +func (c *Cluster) bootstrap() bool { + // Cases in which we do not bootstrap + if len(c.config.Bootstrap) == 0 || len(c.config.ClusterPeers) > 0 { + return true + } + + for _, b := range c.config.Bootstrap { + logger.Infof("Bootstrapping to %s", b) + err := c.Join(b) + if err == nil { + return true + } + logger.Error(err) + } + return false } // Ready returns a channel which signals when this peer is // fully initialized (including consensus). func (c *Cluster) Ready() <-chan struct{} { - return c.consensus.readyCh + return c.readyCh } // Shutdown stops the IPFS cluster components func (c *Cluster) Shutdown() error { c.shutdownLock.Lock() defer c.shutdownLock.Unlock() + if c.shutdown { logger.Warning("Cluster is already shutdown") return nil } logger.Info("shutting down IPFS Cluster") + + if c.config.LeaveOnShutdown { + // best effort + logger.Warning("Attempting to leave Cluster. This may take some seconds") + err := c.consensus.LogRmPeer(c.host.ID()) + if err != nil { + logger.Error("leaving cluster: " + err.Error()) + } else { + time.Sleep(2 * time.Second) + } + c.peerManager.resetPeers() + } + if con := c.consensus; con != nil { if err := con.Shutdown(); err != nil { logger.Errorf("error stopping consensus: %s", err) return err } } + + c.peerManager.savePeers() + if err := c.api.Shutdown(); err != nil { logger.Errorf("error stopping API: %s", err) return err @@ -172,15 +298,14 @@ func (c *Cluster) ID() ID { ipfsID, _ := c.ipfs.ID() var addrs []ma.Multiaddr for _, addr := range c.host.Addrs() { - ipfsAddr, _ := ma.NewMultiaddr("/ipfs/" + c.host.ID().Pretty()) - addrs = append(addrs, addr.Encapsulate(ipfsAddr)) + addrs = append(addrs, multiaddrJoin(addr, c.host.ID())) } return ID{ ID: c.host.ID(), PublicKey: c.host.Peerstore().PubKey(c.host.ID()), Addresses: addrs, - ClusterPeers: c.config.ClusterPeers, + ClusterPeers: c.peerManager.peersAddrs(), Version: Version, Commit: Commit, RPCProtocolVersion: RPCProtocol, @@ -190,18 +315,18 @@ func (c *Cluster) ID() ID { // PeerAdd adds a new peer to this Cluster. // -// The current peer will first attempt to contact the provided -// peer at the given multiaddress. If the connection is successful, -// the new peer, with the given multiaddress will be added to the -// cluster_peers and the configuration saved with the updated set. -// All other Cluster peers will be asked to do the same. -// -// Finally, the list of cluster peers is sent to the new -// peer, which will update its configuration and join the cluster. -// -// PeerAdd will fail if any of the peers is not reachable. +// The new peer must be reachable. It will be added to the +// consensus and will receive the shared state (including the +// list of peers). The new peer should be a single-peer cluster, +// preferable without any relevant state. func (c *Cluster) PeerAdd(addr ma.Multiaddr) (ID, error) { - p, decapAddr, err := multiaddrSplit(addr) + // starting 10 nodes on the same box for testing + // causes deadlock and a global lock here + // seems to help. + c.paMux.Lock() + defer c.paMux.Unlock() + logger.Debugf("peerAdd called with %s", addr) + pid, decapAddr, err := multiaddrSplit(addr) if err != nil { id := ID{ Error: err.Error(), @@ -209,107 +334,131 @@ func (c *Cluster) PeerAdd(addr ma.Multiaddr) (ID, error) { return id, err } - // only add reachable nodes - err = c.host.Connect(c.ctx, peerstore.PeerInfo{ - ID: p, - Addrs: []ma.Multiaddr{decapAddr}, - }) + // Figure out its real address if we have one + remoteAddr := getRemoteMultiaddr(c.host, pid, decapAddr) + + err = c.peerManager.addPeer(remoteAddr) if err != nil { - err = fmt.Errorf("Peer unreachable. Aborting operation: %s", err) - id := ID{ - ID: p, - Error: err.Error(), - } logger.Error(err) + id := ID{ID: pid, Error: err.Error()} return id, err } - // Find which local address we use to connect - conns := c.host.Network().ConnsToPeer(p) - if len(conns) == 0 { - err := errors.New("No connections to peer available") - logger.Error(err) - id := ID{ - ID: p, - Error: err.Error(), - } - - return id, err - } - pidMAddr, _ := ma.NewMultiaddr("/ipfs/" + c.host.ID().Pretty()) - localMAddr := conns[0].LocalMultiaddr().Encapsulate(pidMAddr) - - // Let all peer managers know they need to add this peer - peers := c.peerManager.peers() - replies := make([]peer.ID, len(peers), len(peers)) - errs := c.multiRPC(peers, "Cluster", "PeerManagerAddPeer", - MultiaddrToSerial(addr), copyPIDsToIfaces(replies)) - errorMsgs := "" - for i, err := range errs { - if err != nil { - logger.Error(err) - errorMsgs += fmt.Sprintf("%s: %s\n", - peers[i].Pretty(), - err.Error()) - } - } - if errorMsgs != "" { - logger.Error("There were errors adding peer. Trying to rollback the operation") - c.PeerRemove(p) - id := ID{ - ID: p, - Error: "Error adding peer: " + errorMsgs, - } - return id, errors.New(errorMsgs) - } - - // Inform the peer of the current cluster peers - clusterPeers := MultiaddrsToSerial(c.config.ClusterPeers) - clusterPeers = append(clusterPeers, MultiaddrToSerial(localMAddr)) - err = c.rpcClient.Call( - p, "Cluster", "PeerManagerAddFromMultiaddrs", - clusterPeers, &struct{}{}) + // Figure out our address to that peer. This also + // ensures that it is reachable + var addrSerial MultiaddrSerial + err = c.rpcClient.Call(pid, "Cluster", + "RemoteMultiaddrForPeer", c.host.ID(), &addrSerial) if err != nil { - logger.Errorf("Error sending back the list of peers: %s") - id := ID{ - ID: p, - Error: err.Error(), - } + logger.Error(err) + id := ID{ID: pid, Error: err.Error()} + c.peerManager.rmPeer(pid, false) return id, err } - idSerial := ID{ - ID: p, - }.ToSerial() - err = c.rpcClient.Call( - p, "Cluster", "ID", struct{}{}, &idSerial) - logger.Infof("peer %s has been added to the Cluster", addr) - return idSerial.ToID(), err + // Log the new peer in the log so everyone gets it. + err = c.consensus.LogAddPeer(remoteAddr) + if err != nil { + logger.Error(err) + id := ID{ID: pid, Error: err.Error()} + c.peerManager.rmPeer(pid, false) + return id, err + } + + // Send cluster peers to the new peer. + clusterPeers := append(c.peerManager.peersAddrs(), + addrSerial.ToMultiaddr()) + err = c.rpcClient.Call(pid, + "Cluster", + "PeerManagerAddFromMultiaddrs", + MultiaddrsToSerial(clusterPeers), + &struct{}{}) + if err != nil { + logger.Error(err) + } + + id, err := c.getIDForPeer(pid) + return id, nil } // PeerRemove removes a peer from this Cluster. // // The peer will be removed from the consensus peer set, -// remove all cluster peers from its configuration and -// shut itself down. -func (c *Cluster) PeerRemove(p peer.ID) error { - peers := c.peerManager.peers() - replies := make([]struct{}, len(peers), len(peers)) - errs := c.multiRPC(peers, "Cluster", "PeerManagerRmPeer", - p, copyEmptyStructToIfaces(replies)) - errorMsgs := "" - for i, err := range errs { - if err != nil && peers[i] != p { - logger.Error(err) - errorMsgs += fmt.Sprintf("%s: %s\n", - peers[i].Pretty(), - err.Error()) - } +// it will be shut down after this happens. +func (c *Cluster) PeerRemove(pid peer.ID) error { + if !c.peerManager.isPeer(pid) { + return fmt.Errorf("%s is not a peer", pid.Pretty()) } - if errorMsgs != "" { - return errors.New(errorMsgs) + + err := c.consensus.LogRmPeer(pid) + if err != nil { + logger.Error(err) + return err } - logger.Infof("peer %s has been removed from the Cluster", p.Pretty()) + + // This is a best effort. It may fail + // if that peer is down + err = c.rpcClient.Call(pid, + "Cluster", + "PeerManagerRmPeerShutdown", + pid, + &struct{}{}) + if err != nil { + logger.Error(err) + } + + return nil +} + +// Join adds this peer to an existing cluster. The calling peer should +// be a single-peer cluster node. This is almost equivalent to calling +// PeerAdd on the destination cluster. +func (c *Cluster) Join(addr ma.Multiaddr) error { + logger.Debugf("Join(%s)", addr) + + //if len(c.peerManager.peers()) > 1 { + // logger.Error(c.peerManager.peers()) + // return errors.New("only single-node clusters can be joined") + //} + + pid, _, err := multiaddrSplit(addr) + if err != nil { + logger.Error(err) + return err + } + + // Bootstrap to myself + if pid == c.host.ID() { + return nil + } + + // Add peer to peerstore so we can talk to it + c.peerManager.addPeer(addr) + + // Note that PeerAdd() on the remote peer will + // figure out what our real address is (obviously not + // ClusterAddr). + var myID IDSerial + err = c.rpcClient.Call(pid, + "Cluster", + "PeerAdd", + MultiaddrToSerial(multiaddrJoin(c.config.ClusterAddr, c.host.ID())), + &myID) + if err != nil { + logger.Error(err) + return err + } + + // wait for leader and for state to catch up + // then sync + err = c.consensus.WaitForSync() + if err != nil { + logger.Error(err) + return err + } + c.StateSync() + + logger.Infof("joined %s's cluster", addr) return nil } @@ -326,20 +475,16 @@ func (c *Cluster) StateSync() ([]PinInfo, error) { clusterPins := cState.ListPins() var changed []*cid.Cid + // For the moment we run everything in parallel. + // The PinTracker should probably decide if it can + // pin in parallel or queues everything and does it + // one by one + // Track items which are not tracked for _, h := range clusterPins { if c.tracker.Status(h).Status == TrackerStatusUnpinned { changed = append(changed, h) - err := c.rpcClient.Go("", - "Cluster", - "Track", - NewCidArg(h), - &struct{}{}, - nil) - if err != nil { - return []PinInfo{}, err - } - + go c.tracker.Track(h) } } @@ -348,15 +493,7 @@ func (c *Cluster) StateSync() ([]PinInfo, error) { h, _ := cid.Decode(p.CidStr) if !cState.HasPin(h) { changed = append(changed, h) - err := c.rpcClient.Go("", - "Cluster", - "Track", - &CidArg{p.CidStr}, - &struct{}{}, - nil) - if err != nil { - return []PinInfo{}, err - } + go c.tracker.Untrack(h) } } @@ -504,31 +641,6 @@ func (c *Cluster) Peers() []ID { return peers } -// run provides a cancellable context and waits for all components to be ready -// before signaling readyCh -func (c *Cluster) run() { - c.wg.Add(1) - - // Currently we do nothing other than waiting to - // cancel our context. - go func() { - defer c.wg.Done() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - c.ctx = ctx - - for { - select { - case <-c.shutdownCh: - return - case <-c.consensus.Ready(): - close(c.readyCh) - logger.Info("IPFS Cluster is ready") - } - } - }() -} - // makeHost makes a libp2p-host func makeHost(ctx context.Context, cfg *Config) (host.Host, error) { ps := peerstore.NewPeerstore() @@ -672,24 +784,14 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) { return infos, nil } -// openConns is a workaround for -// https://github.com/libp2p/go-libp2p-swarm/issues/15 -// which break our tests. -// It runs when consensus is initialized so we can assume -// that the cluster is more or less up. -// It should open connections for peers where they haven't -// yet been opened. By randomly sleeping we reduce the -// chance that peers will open 2 connections simultaneously. -func (c *Cluster) openConns() { - rand.Seed(time.Now().UnixNano()) - time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) - peers := c.host.Peerstore().Peers() - for _, p := range peers { - peerInfo := c.host.Peerstore().PeerInfo(p) - if p == c.host.ID() { - continue // do not connect to ourselves - } - // ignore any errors here - c.host.Connect(c.ctx, peerInfo) +func (c *Cluster) getIDForPeer(pid peer.ID) (ID, error) { + idSerial := ID{ID: pid}.ToSerial() + err := c.rpcClient.Call( + pid, "Cluster", "ID", struct{}{}, &idSerial) + id := idSerial.ToID() + if err != nil { + logger.Error(err) + id.Error = err.Error() } + return id, err } diff --git a/config.go b/config.go index af7c1cd3..f506e3eb 100644 --- a/config.go +++ b/config.go @@ -14,12 +14,13 @@ import ( // Default parameters for the configuration const ( - DefaultConfigCrypto = crypto.RSA - DefaultConfigKeyLength = 2048 - DefaultAPIAddr = "/ip4/127.0.0.1/tcp/9094" - DefaultIPFSProxyAddr = "/ip4/127.0.0.1/tcp/9095" - DefaultIPFSNodeAddr = "/ip4/127.0.0.1/tcp/5001" - DefaultClusterAddr = "/ip4/0.0.0.0/tcp/9096" + DefaultConfigCrypto = crypto.RSA + DefaultConfigKeyLength = 2048 + DefaultAPIAddr = "/ip4/127.0.0.1/tcp/9094" + DefaultIPFSProxyAddr = "/ip4/127.0.0.1/tcp/9095" + DefaultIPFSNodeAddr = "/ip4/127.0.0.1/tcp/5001" + DefaultClusterAddr = "/ip4/0.0.0.0/tcp/9096" + DefaultStateSyncSeconds = 60 ) // Config represents an ipfs-cluster configuration. It is used by @@ -31,9 +32,21 @@ type Config struct { ID peer.ID PrivateKey crypto.PrivKey - // List of multiaddresses of the peers of this cluster. + // ClusterPeers is the list of peers in the Cluster. They are used + // as the initial peers in the consensus. When bootstrapping a peer, + // ClusterPeers will be filled in automatically for the next run upon + // shutdown. ClusterPeers []ma.Multiaddr - pMux sync.Mutex + + // Bootstrap peers multiaddresses. This peer will attempt to + // join the clusters of the peers in this list after booting. + // Leave empty for a single-peer-cluster. + Bootstrap []ma.Multiaddr + + // Leave Cluster on shutdown. Politely informs other peers + // of the departure and removes itself from the consensus + // peer set. The Cluster size will be reduced by one. + LeaveOnShutdown bool // Listen parameters for the Cluster libp2p Host. Used by // the RPC and Consensus components. @@ -53,9 +66,14 @@ type Config struct { // the Consensus component. ConsensusDataFolder string + // Number of seconds between StateSync() operations + StateSyncSeconds int + // if a config has been loaded from disk, track the path // so it can be saved to the same place. path string + + saveMux sync.Mutex } // JSONConfig represents a Cluster configuration as it will look when it is @@ -67,10 +85,22 @@ type JSONConfig struct { ID string `json:"id"` PrivateKey string `json:"private_key"` - // List of multiaddresses of the peers of this cluster. This list may - // include the multiaddress of this node. + // ClusterPeers is the list of peers' multiaddresses in the Cluster. + // They are used as the initial peers in the consensus. When + // bootstrapping a peer, ClusterPeers will be filled in automatically. ClusterPeers []string `json:"cluster_peers"` + // Bootstrap peers multiaddresses. This peer will attempt to + // join the clusters of the peers in the list. ONLY when ClusterPeers + // is empty. Otherwise it is ignored. Leave empty for a single-peer + // cluster. + Bootstrap []string `json:"bootstrap"` + + // Leave Cluster on shutdown. Politely informs other peers + // of the departure and removes itself from the consensus + // peer set. The Cluster size will be reduced by one. + LeaveOnShutdown bool `json:"leave_on_shutdown"` + // Listen address for the Cluster libp2p host. This is used for // interal RPC and Consensus communications between cluster peers. ClusterListenMultiaddress string `json:"cluster_multiaddress"` @@ -90,6 +120,11 @@ type JSONConfig struct { // Storage folder for snapshots, log store etc. Used by // the Consensus component. ConsensusDataFolder string `json:"consensus_data_folder"` + + // Number of seconds between syncs of the consensus state to the + // tracker state. Normally states are synced anyway, but this helps + // when new nodes are joining the cluster + StateSyncSeconds int `json:"state_sync_seconds"` } // ToJSONConfig converts a Config object to its JSON representation which @@ -107,22 +142,28 @@ func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) { } pKey := base64.StdEncoding.EncodeToString(pkeyBytes) - cfg.pMux.Lock() clusterPeers := make([]string, len(cfg.ClusterPeers), len(cfg.ClusterPeers)) for i := 0; i < len(cfg.ClusterPeers); i++ { clusterPeers[i] = cfg.ClusterPeers[i].String() } - cfg.pMux.Unlock() + + bootstrap := make([]string, len(cfg.Bootstrap), len(cfg.Bootstrap)) + for i := 0; i < len(cfg.Bootstrap); i++ { + bootstrap[i] = cfg.Bootstrap[i].String() + } j = &JSONConfig{ ID: cfg.ID.Pretty(), PrivateKey: pKey, ClusterPeers: clusterPeers, + Bootstrap: bootstrap, + LeaveOnShutdown: cfg.LeaveOnShutdown, ClusterListenMultiaddress: cfg.ClusterAddr.String(), APIListenMultiaddress: cfg.APIAddr.String(), IPFSProxyListenMultiaddress: cfg.IPFSProxyAddr.String(), IPFSNodeMultiaddress: cfg.IPFSNodeAddr.String(), ConsensusDataFolder: cfg.ConsensusDataFolder, + StateSyncSeconds: cfg.StateSyncSeconds, } return } @@ -158,6 +199,17 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) { clusterPeers[i] = maddr } + bootstrap := make([]ma.Multiaddr, len(jcfg.Bootstrap)) + for i := 0; i < len(jcfg.Bootstrap); i++ { + maddr, err := ma.NewMultiaddr(jcfg.Bootstrap[i]) + if err != nil { + err = fmt.Errorf("error parsing multiaddress for peer %s: %s", + jcfg.Bootstrap[i], err) + return nil, err + } + bootstrap[i] = maddr + } + clusterAddr, err := ma.NewMultiaddr(jcfg.ClusterListenMultiaddress) if err != nil { err = fmt.Errorf("error parsing cluster_listen_multiaddress: %s", err) @@ -180,15 +232,22 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) { return } + if jcfg.StateSyncSeconds <= 0 { + jcfg.StateSyncSeconds = DefaultStateSyncSeconds + } + c = &Config{ ID: id, PrivateKey: pKey, ClusterPeers: clusterPeers, + Bootstrap: bootstrap, + LeaveOnShutdown: jcfg.LeaveOnShutdown, ClusterAddr: clusterAddr, APIAddr: apiAddr, IPFSProxyAddr: ipfsProxyAddr, IPFSNodeAddr: ipfsNodeAddr, ConsensusDataFolder: jcfg.ConsensusDataFolder, + StateSyncSeconds: jcfg.StateSyncSeconds, } return } @@ -217,7 +276,17 @@ func LoadConfig(path string) (*Config, error) { } // Save stores a configuration as a JSON file in the given path. +// If no path is provided, it uses the path the configuration was +// loaded from. func (cfg *Config) Save(path string) error { + cfg.saveMux.Lock() + defer cfg.saveMux.Unlock() + + if path == "" { + path = cfg.path + } + + logger.Info("Saving configuration") jcfg, err := cfg.ToJSONConfig() if err != nil { logger.Error("error generating JSON config") @@ -254,52 +323,13 @@ func NewDefaultConfig() (*Config, error) { ID: pid, PrivateKey: priv, ClusterPeers: []ma.Multiaddr{}, + Bootstrap: []ma.Multiaddr{}, + LeaveOnShutdown: false, ClusterAddr: clusterAddr, APIAddr: apiAddr, IPFSProxyAddr: ipfsProxyAddr, IPFSNodeAddr: ipfsNodeAddr, ConsensusDataFolder: "ipfscluster-data", + StateSyncSeconds: DefaultStateSyncSeconds, }, nil } - -func (cfg *Config) addPeer(addr ma.Multiaddr) { - cfg.pMux.Lock() - defer cfg.pMux.Unlock() - found := false - for _, cpeer := range cfg.ClusterPeers { - if cpeer.Equal(addr) { - found = true - } - } - if !found { - cfg.ClusterPeers = append(cfg.ClusterPeers, addr) - } - logger.Debugf("add: cluster peers are now: %s", cfg.ClusterPeers) -} - -func (cfg *Config) rmPeer(p peer.ID) { - cfg.pMux.Lock() - defer cfg.pMux.Unlock() - foundPos := -1 - for i, addr := range cfg.ClusterPeers { - cp, _, _ := multiaddrSplit(addr) - if cp == p { - foundPos = i - } - } - if foundPos < 0 { - return - } - - // Delete preserving order - copy(cfg.ClusterPeers[foundPos:], cfg.ClusterPeers[foundPos+1:]) - cfg.ClusterPeers[len(cfg.ClusterPeers)-1] = nil // or the zero value of T - cfg.ClusterPeers = cfg.ClusterPeers[:len(cfg.ClusterPeers)-1] - logger.Debugf("rm: cluster peers are now: %s", cfg.ClusterPeers) -} - -func (cfg *Config) emptyPeers() { - cfg.pMux.Lock() - defer cfg.pMux.Unlock() - cfg.ClusterPeers = []ma.Multiaddr{} -} diff --git a/config_test.go b/config_test.go index 8ee7729c..692980f1 100644 --- a/config_test.go +++ b/config_test.go @@ -11,6 +11,7 @@ func testingConfig() *Config { APIListenMultiaddress: "/ip4/127.0.0.1/tcp/10002", IPFSProxyListenMultiaddress: "/ip4/127.0.0.1/tcp/10001", ConsensusDataFolder: "./raftFolderFromTests", + LeaveOnShutdown: true, } cfg, _ := jcfg.ToConfig() @@ -85,9 +86,9 @@ func TestConfigToConfig(t *testing.T) { } j, _ = cfg.ToJSONConfig() - j.ClusterPeers = []string{"abc"} + j.Bootstrap = []string{"abc"} _, err = j.ToConfig() if err == nil { - t.Error("expected error parsing cluster_peers") + t.Error("expected error parsing Bootstrap") } } diff --git a/consensus.go b/consensus.go index 9415997c..79df9fe2 100644 --- a/consensus.go +++ b/consensus.go @@ -12,24 +12,31 @@ import ( host "github.com/libp2p/go-libp2p-host" peer "github.com/libp2p/go-libp2p-peer" libp2praft "github.com/libp2p/go-libp2p-raft" + ma "github.com/multiformats/go-multiaddr" ) // Type of pin operation const ( LogOpPin = iota + 1 LogOpUnpin + LogOpAddPeer + LogOpRmPeer ) -// LeaderTimeout specifies how long to wait during initialization -// before failing for not having a leader. -var LeaderTimeout = 120 * time.Second +// LeaderTimeout specifies how long to wait before failing an operation +// because there is no leader +var LeaderTimeout = 15 * time.Second + +// CommitRetries specifies how many times we retry a failed commit until +// we give up +var CommitRetries = 2 type clusterLogOpType int // clusterLogOp represents an operation for the OpLogConsensus system. // It implements the consensus.Op interface. type clusterLogOp struct { - Cid string + Arg string Type clusterLogOpType ctx context.Context rpcClient *rpc.Client @@ -44,15 +51,13 @@ func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error) panic("received unexpected state type") } - c, err := cid.Decode(op.Cid) - if err != nil { - // Should never be here - panic("could not decode a CID we ourselves encoded") - } - switch op.Type { case LogOpPin: - err := state.AddPin(c) + c, err := cid.Decode(op.Arg) + if err != nil { + panic("could not decode a CID we ourselves encoded") + } + err = state.AddPin(c) if err != nil { goto ROLLBACK } @@ -64,7 +69,11 @@ func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error) &struct{}{}, nil) case LogOpUnpin: - err := state.RmPin(c) + c, err := cid.Decode(op.Arg) + if err != nil { + panic("could not decode a CID we ourselves encoded") + } + err = state.RmPin(c) if err != nil { goto ROLLBACK } @@ -75,6 +84,28 @@ func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error) NewCidArg(c), &struct{}{}, nil) + case LogOpAddPeer: + addr, err := ma.NewMultiaddr(op.Arg) + if err != nil { + panic("could not decode a multiaddress we ourselves encoded") + } + op.rpcClient.Call("", + "Cluster", + "PeerManagerAddPeer", + MultiaddrToSerial(addr), + &struct{}{}) + // TODO rebalance ops + case LogOpRmPeer: + pid, err := peer.IDB58Decode(op.Arg) + if err != nil { + panic("could not decode a PID we ourselves encoded") + } + op.rpcClient.Call("", + "Cluster", + "PeerManagerRmPeer", + pid, + &struct{}{}) + // TODO rebalance ops default: logger.Error("unknown clusterLogOp type. Ignoring") } @@ -155,35 +186,52 @@ func (cc *Consensus) run() { cc.ctx = ctx cc.baseOp.ctx = ctx - go func() { - cc.finishBootstrap() - }() + go cc.finishBootstrap() <-cc.shutdownCh }() } +// WaitForSync waits for a leader and for the state to be up to date, then returns. +func (cc *Consensus) WaitForSync() error { + leaderCtx, cancel := context.WithTimeout(cc.ctx, LeaderTimeout) + defer cancel() + err := cc.raft.WaitForLeader(leaderCtx) + if err != nil { + return errors.New("error waiting for leader: " + err.Error()) + } + err = cc.raft.WaitForUpdates(cc.ctx) + if err != nil { + return errors.New("error waiting for consensus updates: " + err.Error()) + } + return nil +} + // waits until there is a consensus leader and syncs the state // to the tracker func (cc *Consensus) finishBootstrap() { - cc.raft.WaitForLeader(cc.ctx) - cc.raft.WaitForUpdates(cc.ctx) + err := cc.WaitForSync() + if err != nil { + return + } logger.Info("Consensus state is up to date") // While rpc is not ready we cannot perform a sync - select { - case <-cc.ctx.Done(): - return - case <-cc.rpcReady: + if cc.rpcClient == nil { + select { + case <-cc.ctx.Done(): + return + case <-cc.rpcReady: + } } - var pInfo []PinInfo - - _, err := cc.State() + st, err := cc.State() + _ = st // only check sync if we have a state // avoid error on new running clusters if err != nil { logger.Debug("skipping state sync: ", err) } else { + var pInfo []PinInfo cc.rpcClient.Go( "", "Cluster", @@ -193,29 +241,9 @@ func (cc *Consensus) finishBootstrap() { nil) } cc.readyCh <- struct{}{} - logger.Debug("consensus ready") // not accurate if we are shutting down + logger.Debug("consensus ready") } -// // raft stores peer add/rm operations. This is how to force a peer set. -// func (cc *Consensus) setPeers() { -// logger.Debug("forcefully setting Raft peers to known set") -// var peersStr []string -// var peers []peer.ID -// err := cc.rpcClient.Call("", -// "Cluster", -// "PeerManagerPeers", -// struct{}{}, -// &peers) -// if err != nil { -// logger.Error(err) -// return -// } -// for _, p := range peers { -// peersStr = append(peersStr, p.Pretty()) -// } -// cc.p2pRaft.raft.SetPeers(peersStr) -// } - // Shutdown stops the component so it will not process any // more updates. The underlying consensus is permanently // shutdown, along with the libp2p transport. @@ -267,9 +295,20 @@ func (cc *Consensus) Ready() <-chan struct{} { return cc.readyCh } -func (cc *Consensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp { +func (cc *Consensus) op(argi interface{}, t clusterLogOpType) *clusterLogOp { + var arg string + switch argi.(type) { + case *cid.Cid: + arg = argi.(*cid.Cid).String() + case peer.ID: + arg = peer.IDB58Encode(argi.(peer.ID)) + case ma.Multiaddr: + arg = argi.(ma.Multiaddr).String() + default: + panic("bad type") + } return &clusterLogOp{ - Cid: c.String(), + Arg: arg, Type: t, } } @@ -278,7 +317,12 @@ func (cc *Consensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp { func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, error) { leader, err := cc.Leader() if err != nil { - return false, err + rctx, cancel := context.WithTimeout(cc.ctx, LeaderTimeout) + defer cancel() + err := cc.raft.WaitForLeader(rctx) + if err != nil { + return false, err + } } if leader == cc.host.ID() { return false, nil @@ -293,64 +337,144 @@ func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, err return true, err } +func (cc *Consensus) logOpCid(rpcOp string, opType clusterLogOpType, c *cid.Cid) error { + var finalErr error + for i := 0; i < CommitRetries; i++ { + logger.Debugf("Try %d", i) + redirected, err := cc.redirectToLeader(rpcOp, NewCidArg(c)) + if err != nil { + finalErr = err + continue + } + + if redirected { + return nil + } + + // It seems WE are the leader. + + // Create pin operation for the log + op := cc.op(c, opType) + _, err = cc.consensus.CommitOp(op) + if err != nil { + // This means the op did not make it to the log + finalErr = err + time.Sleep(200 * time.Millisecond) + continue + } + finalErr = nil + break + } + if finalErr != nil { + return finalErr + } + + switch opType { + case LogOpPin: + logger.Infof("pin committed to global state: %s", c) + case LogOpUnpin: + logger.Infof("unpin committed to global state: %s", c) + } + return nil +} + // LogPin submits a Cid to the shared state of the cluster. It will forward // the operation to the leader if this is not it. func (cc *Consensus) LogPin(c *cid.Cid) error { - redirected, err := cc.redirectToLeader("ConsensusLogPin", NewCidArg(c)) - if err != nil || redirected { - return err - } - - // It seems WE are the leader. - - // Create pin operation for the log - op := cc.op(c, LogOpPin) - _, err = cc.consensus.CommitOp(op) - if err != nil { - // This means the op did not make it to the log - return err - } - logger.Infof("pin committed to global state: %s", c) - return nil + return cc.logOpCid("ConsensusLogPin", LogOpPin, c) } // LogUnpin removes a Cid from the shared state of the cluster. func (cc *Consensus) LogUnpin(c *cid.Cid) error { - redirected, err := cc.redirectToLeader("ConsensusLogUnpin", NewCidArg(c)) - if err != nil || redirected { - return err - } + return cc.logOpCid("ConsensusLogUnpin", LogOpUnpin, c) +} - // It seems WE are the leader. +// LogAddPeer submits a new peer to the shared state of the cluster. It will +// forward the operation to the leader if this is not it. +func (cc *Consensus) LogAddPeer(addr ma.Multiaddr) error { + var finalErr error + for i := 0; i < CommitRetries; i++ { + logger.Debugf("Try %d", i) + redirected, err := cc.redirectToLeader("ConsensusLogAddPeer", MultiaddrToSerial(addr)) + if err != nil { + finalErr = err + continue + } - // Create unpin operation for the log - op := cc.op(c, LogOpUnpin) - _, err = cc.consensus.CommitOp(op) - if err != nil { - return err + if redirected { + return nil + } + + // It seems WE are the leader. + pid, _, err := multiaddrSplit(addr) + if err != nil { + return err + } + + // Create pin operation for the log + op := cc.op(addr, LogOpAddPeer) + _, err = cc.consensus.CommitOp(op) + if err != nil { + // This means the op did not make it to the log + finalErr = err + time.Sleep(200 * time.Millisecond) + continue + } + err = cc.raft.AddPeer(peer.IDB58Encode(pid)) + if err != nil { + finalErr = err + continue + } + finalErr = nil + break } - logger.Infof("unpin committed to global state: %s", c) + if finalErr != nil { + return finalErr + } + logger.Infof("peer committed to global state: %s", addr) return nil } -// AddPeer attempts to add a peer to the consensus. -func (cc *Consensus) AddPeer(p peer.ID) error { - //redirected, err := cc.redirectToLeader("ConsensusAddPeer", p) - //if err != nil || redirected { - // return err - // } +// LogRmPeer removes a peer from the shared state of the cluster. It will +// forward the operation to the leader if this is not it. +func (cc *Consensus) LogRmPeer(pid peer.ID) error { + var finalErr error + for i := 0; i < CommitRetries; i++ { + logger.Debugf("Try %d", i) + redirected, err := cc.redirectToLeader("ConsensusLogRmPeer", pid) + if err != nil { + finalErr = err + continue + } - return cc.raft.AddPeer(peer.IDB58Encode(p)) -} + if redirected { + return nil + } -// RemovePeer attempts to remove a peer from the consensus. -func (cc *Consensus) RemovePeer(p peer.ID) error { - //redirected, err := cc.redirectToLeader("ConsensusRemovePeer", p) - //if err != nil || redirected { - // return err - //} + // It seems WE are the leader. - return cc.raft.RemovePeer(peer.IDB58Encode(p)) + // Create pin operation for the log + op := cc.op(pid, LogOpRmPeer) + _, err = cc.consensus.CommitOp(op) + if err != nil { + // This means the op did not make it to the log + finalErr = err + continue + } + err = cc.raft.RemovePeer(peer.IDB58Encode(pid)) + if err != nil { + finalErr = err + time.Sleep(200 * time.Millisecond) + continue + } + finalErr = nil + break + } + if finalErr != nil { + return finalErr + } + logger.Infof("peer removed from global state: %s", pid) + return nil } // State retrieves the current consensus State. It may error diff --git a/consensus_test.go b/consensus_test.go index 362abca0..694ac18c 100644 --- a/consensus_test.go +++ b/consensus_test.go @@ -12,7 +12,7 @@ import ( func TestApplyToPin(t *testing.T) { op := &clusterLogOp{ - Cid: testCid, + Arg: testCid, Type: LogOpPin, ctx: context.Background(), rpcClient: mockRPCClient(t), @@ -28,7 +28,7 @@ func TestApplyToPin(t *testing.T) { func TestApplyToUnpin(t *testing.T) { op := &clusterLogOp{ - Cid: testCid, + Arg: testCid, Type: LogOpUnpin, ctx: context.Background(), rpcClient: mockRPCClient(t), @@ -52,7 +52,7 @@ func TestApplyToBadState(t *testing.T) { }() op := &clusterLogOp{ - Cid: testCid, + Arg: testCid, Type: LogOpUnpin, ctx: context.Background(), rpcClient: mockRPCClient(t), @@ -70,7 +70,7 @@ func TestApplyToBadCid(t *testing.T) { }() op := &clusterLogOp{ - Cid: "agadfaegf", + Arg: "agadfaegf", Type: LogOpPin, ctx: context.Background(), rpcClient: mockRPCClient(t), diff --git a/debug.go b/debug.go index bdcd67d3..eff42743 100644 --- a/debug.go +++ b/debug.go @@ -3,5 +3,10 @@ package ipfscluster func init() { - SetLogLevel("DEBUG") + l := "DEBUG" + SetFacilityLogLevel("cluster", l) + //SetFacilityLogLevel("raft", l) + //SetFacilityLogLevel("p2p-gorpc", l) + //SetFacilityLogLevel("swarm2", l) + //SetFacilityLogLevel("libp2p-raft", l) } diff --git a/ipfs-cluster-service/main.go b/ipfs-cluster-service/main.go index 843544dd..0c884a06 100644 --- a/ipfs-cluster-service/main.go +++ b/ipfs-cluster-service/main.go @@ -1,6 +1,7 @@ package main import ( + "errors" "fmt" "os" "os/signal" @@ -8,6 +9,7 @@ import ( "path/filepath" logging "github.com/ipfs/go-log" + ma "github.com/multiformats/go-multiaddr" "github.com/urfave/cli" ipfscluster "github.com/ipfs/ipfs-cluster" @@ -59,6 +61,21 @@ initialized with "init" and its default location is For feedback, bug reports or any additional information, visit https://github.com/ipfs/ipfs-cluster. + + +EXAMPLES + +Initial configuration: + +$ ipfs-cluster-service init + +Launch a cluster: + +$ ipfs-cluster-service + +Launch a peer and join existing cluster: + +$ ipfs-cluster-service --bootstrap /ip4/192.168.1.2/tcp/9096/ipfs/QmPSoSaPXpyunaBwHs1rZBKYSqRV4bLRk32VGYLuvdrypL `, programName, programName, @@ -110,7 +127,8 @@ func main() { app := cli.NewApp() app.Name = programName app.Usage = "IPFS Cluster node" - app.UsageText = Description + app.Description = Description + //app.Copyright = "© Protocol Labs, Inc." app.Version = ipfscluster.Version app.Flags = []cli.Flag{ cli.BoolFlag{ @@ -126,16 +144,25 @@ func main() { }, cli.BoolFlag{ Name: "force, f", - Usage: "force configuration overwrite when running 'init'", + Usage: "forcefully proceed with some actions. i.e. overwriting configuration", + }, + cli.StringFlag{ + Name: "bootstrap, j", + Usage: "join a cluster providing an existing peer's `multiaddress`. Overrides the \"bootstrap\" values from the configuration", + }, + cli.BoolFlag{ + Name: "leave, x", + Usage: "remove peer from cluster on exit. Overrides \"leave_on_shutdown\"", + Hidden: true, }, cli.BoolFlag{ Name: "debug, d", - Usage: "enable full debug logging", + Usage: "enable full debug logging (very verbose)", }, cli.StringFlag{ Name: "loglevel, l", Value: "info", - Usage: "set the loglevel [critical, error, warning, info, debug]", + Usage: "set the loglevel for cluster only [critical, error, warning, info, debug]", }, } @@ -185,6 +212,22 @@ func run(c *cli.Context) error { cfg, err := loadConfig() checkErr("loading configuration", err) + if a := c.String("bootstrap"); a != "" { + if len(cfg.ClusterPeers) > 0 && !c.Bool("force") { + return errors.New("The configuration provides ClusterPeers. Use -f to ignore and proceed bootstrapping") + } + joinAddr, err := ma.NewMultiaddr(a) + if err != nil { + return fmt.Errorf("error parsing multiaddress: %s", err) + } + cfg.Bootstrap = []ma.Multiaddr{joinAddr} + cfg.ClusterPeers = []ma.Multiaddr{} + } + + if c.Bool("leave") { + cfg.LeaveOnShutdown = true + } + api, err := ipfscluster.NewRESTAPI(cfg) checkErr("creating REST API component", err) @@ -201,35 +244,33 @@ func run(c *cli.Context) error { tracker) checkErr("starting cluster", err) - signalChan := make(chan os.Signal) + signalChan := make(chan os.Signal, 20) signal.Notify(signalChan, os.Interrupt) - for { select { case <-signalChan: err = cluster.Shutdown() checkErr("shutting down cluster", err) - return nil case <-cluster.Done(): return nil case <-cluster.Ready(): - logger.Info("IPFS Cluster is ready") } } } func setupLogging(lvl string) { - logging.SetLogLevel("service", lvl) - logging.SetLogLevel("cluster", lvl) - //logging.SetLogLevel("raft", lvl) + ipfscluster.SetFacilityLogLevel("service", lvl) + ipfscluster.SetFacilityLogLevel("cluster", lvl) + //ipfscluster.SetFacilityLogLevel("raft", lvl) } func setupDebug() { - logging.SetLogLevel("cluster", "debug") - //logging.SetLogLevel("libp2p-raft", "debug") - logging.SetLogLevel("p2p-gorpc", "debug") - //logging.SetLogLevel("swarm2", "debug") - logging.SetLogLevel("raft", "debug") + l := "DEBUG" + ipfscluster.SetFacilityLogLevel("cluster", l) + ipfscluster.SetFacilityLogLevel("raft", l) + ipfscluster.SetFacilityLogLevel("p2p-gorpc", l) + //SetFacilityLogLevel("swarm2", l) + //SetFacilityLogLevel("libp2p-raft", l) } func initConfig(force bool) { diff --git a/ipfscluster.go b/ipfscluster.go index 725c35b6..6c8a98ab 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -155,6 +155,7 @@ type State interface { ListPins() []*cid.Cid // HasPin returns true if the state is holding a Cid HasPin(*cid.Cid) bool + // AddPeer adds a peer to the shared state } // PinTracker represents a component which tracks the status of diff --git a/ipfscluster_test.go b/ipfscluster_test.go index e0da2c31..c5268857 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -26,7 +26,7 @@ var ( //TestClusters* var ( // number of clusters to create - nClusters = 3 + nClusters = 6 // number of pins to pin/unpin/check nPins = 500 @@ -70,12 +70,13 @@ func createComponents(t *testing.T, i int) (*Config, *RESTAPI, *IPFSHTTPConnecto cfg, _ := NewDefaultConfig() cfg.ID = pid cfg.PrivateKey = priv - cfg.ClusterPeers = []ma.Multiaddr{} + cfg.Bootstrap = []ma.Multiaddr{} cfg.ClusterAddr = clusterAddr cfg.APIAddr = apiAddr cfg.IPFSProxyAddr = proxyAddr cfg.IPFSNodeAddr = nodeAddr cfg.ConsensusDataFolder = "./e2eTestRaft/" + pid.Pretty() + cfg.LeaveOnShutdown = false api, err := NewRESTAPI(cfg) checkErr(t, err) @@ -124,6 +125,8 @@ func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) { cfg.ID.Pretty())) clusterPeers[i] = addr } + + // Set up the cluster using ClusterPeers for i := 0; i < nClusters; i++ { cfgs[i].ClusterPeers = make([]ma.Multiaddr, nClusters, nClusters) for j := 0; j < nClusters; j++ { @@ -131,6 +134,16 @@ func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) { } } + // Alternative way of starting using bootstrap + // for i := 1; i < nClusters; i++ { + // addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ipfs/%s", + // clusterPort, + // cfgs[0].ID.Pretty())) + + // // Use previous cluster for bootstrapping + // cfgs[i].Bootstrap = []ma.Multiaddr{addr} + // } + var wg sync.WaitGroup for i := 0; i < nClusters; i++ { wg.Add(1) @@ -140,6 +153,12 @@ func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) { }(i) } wg.Wait() + + // Yet an alternative way using PeerAdd + // for i := 1; i < nClusters; i++ { + // clusters[0].PeerAdd(clusterAddr(clusters[i])) + // } + delay() return clusters, ipfsMocks } @@ -168,7 +187,16 @@ func runF(t *testing.T, clusters []*Cluster, f func(*testing.T, *Cluster)) { } func delay() { - time.Sleep(time.Duration(nClusters) * time.Second) + var d int + if nClusters > 10 { + d = 8 + + } else if nClusters > 5 { + d = 5 + } else { + d = nClusters + } + time.Sleep(time.Duration(d) * time.Second) } func TestClustersVersion(t *testing.T) { diff --git a/logging.go b/logging.go index fa8cd5c9..7b5c16e5 100644 --- a/logging.go +++ b/logging.go @@ -14,8 +14,8 @@ var logger = logging.Logger("cluster") var raftStdLogger = makeRaftLogger() var raftLogger = logging.Logger("raft") -// SetLogLevel sets the level in the logs -func SetLogLevel(l string) { +// SetFacilityLogLevel sets the log level for a given module +func SetFacilityLogLevel(f, l string) { /* CRITICAL Level = iota ERROR @@ -24,11 +24,7 @@ func SetLogLevel(l string) { INFO DEBUG */ - logging.SetLogLevel("cluster", l) - //logging.SetLogLevel("raft", l) - //logging.SetLogLevel("p2p-gorpc", l) - //logging.SetLogLevel("swarm2", l) - //logging.SetLogLevel("libp2p-raft", l) + logging.SetLogLevel(f, l) } // This redirects Raft output to our logger diff --git a/map_state.go b/map_state.go index b7687a93..82464c0e 100644 --- a/map_state.go +++ b/map_state.go @@ -9,21 +9,24 @@ import ( // MapState is a very simple database to store the state of the system // using a Go map. It is thread safe. It implements the State interface. type MapState struct { - mux sync.RWMutex - PinMap map[string]struct{} + pinMux sync.RWMutex + PinMap map[string]struct{} + peerMux sync.RWMutex + PeerMap map[string]string } // NewMapState initializes the internal map and returns a new MapState object. func NewMapState() *MapState { return &MapState{ - PinMap: make(map[string]struct{}), + PinMap: make(map[string]struct{}), + PeerMap: make(map[string]string), } } // AddPin adds a Cid to the internal map. func (st *MapState) AddPin(c *cid.Cid) error { - st.mux.Lock() - defer st.mux.Unlock() + st.pinMux.Lock() + defer st.pinMux.Unlock() var a struct{} st.PinMap[c.String()] = a return nil @@ -31,24 +34,24 @@ func (st *MapState) AddPin(c *cid.Cid) error { // RmPin removes a Cid from the internal map. func (st *MapState) RmPin(c *cid.Cid) error { - st.mux.Lock() - defer st.mux.Unlock() + st.pinMux.Lock() + defer st.pinMux.Unlock() delete(st.PinMap, c.String()) return nil } // HasPin returns true if the Cid belongs to the State. func (st *MapState) HasPin(c *cid.Cid) bool { - st.mux.RLock() - defer st.mux.RUnlock() + st.pinMux.RLock() + defer st.pinMux.RUnlock() _, ok := st.PinMap[c.String()] return ok } // ListPins provides a list of Cids in the State. func (st *MapState) ListPins() []*cid.Cid { - st.mux.RLock() - defer st.mux.RUnlock() + st.pinMux.RLock() + defer st.pinMux.RUnlock() cids := make([]*cid.Cid, 0, len(st.PinMap)) for k := range st.PinMap { c, _ := cid.Decode(k) diff --git a/nodebug.go b/nodebug.go index dbf902a6..9918c7dd 100644 --- a/nodebug.go +++ b/nodebug.go @@ -2,6 +2,11 @@ package ipfscluster +// This is our default logs levels func init() { - SetLogLevel("INFO") + SetFacilityLogLevel("cluster", "INFO") + SetFacilityLogLevel("raft", "ERROR") + SetFacilityLogLevel("p2p-gorpc", "ERROR") + //SetFacilityLogLevel("swarm2", l) + SetFacilityLogLevel("libp2p-raft", "CRITICAL") } diff --git a/peer_manager.go b/peer_manager.go index e2ced91b..e7db6895 100644 --- a/peer_manager.go +++ b/peer_manager.go @@ -1,8 +1,6 @@ package ipfscluster import ( - "os" - "path/filepath" "sync" "time" @@ -11,140 +9,131 @@ import ( ma "github.com/multiformats/go-multiaddr" ) +// peerManager is our own local peerstore type peerManager struct { cluster *Cluster + ps peerstore.Peerstore + self peer.ID - peerSetMux sync.RWMutex - peerSet map[peer.ID]struct{} + peermap map[peer.ID]ma.Multiaddr + m sync.RWMutex } func newPeerManager(c *Cluster) *peerManager { pm := &peerManager{ cluster: c, + ps: c.host.Peerstore(), + self: c.host.ID(), } - pm.resetPeerSet() + pm.resetPeers() return pm } -func (pm *peerManager) addPeer(addr ma.Multiaddr) (peer.ID, error) { +func (pm *peerManager) addPeer(addr ma.Multiaddr) error { logger.Debugf("adding peer %s", addr) - - peerID, decapAddr, err := multiaddrSplit(addr) + pid, decapAddr, err := multiaddrSplit(addr) if err != nil { - return peerID, err + return err + } + pm.ps.AddAddr(pid, decapAddr, peerstore.PermanentAddrTTL) + + if !pm.isPeer(pid) { + logger.Infof("new Cluster peer %s", addr.String()) } - pm.peerSetMux.RLock() - _, ok := pm.peerSet[peerID] - pm.peerSetMux.RUnlock() + pm.m.Lock() + pm.peermap[pid] = addr + pm.m.Unlock() - if ok { - logger.Debugf("%s is already a peer", peerID) - return peerID, nil - } - - pm.peerSetMux.Lock() - pm.peerSet[peerID] = struct{}{} - pm.peerSetMux.Unlock() - pm.cluster.host.Peerstore().AddAddr(peerID, decapAddr, peerstore.PermanentAddrTTL) - pm.cluster.config.addPeer(addr) - if con := pm.cluster.consensus; con != nil { - pm.cluster.consensus.AddPeer(peerID) - } - if path := pm.cluster.config.path; path != "" { - err := pm.cluster.config.Save(path) - if err != nil { - logger.Error(err) - } - } - return peerID, nil + return nil } -func (pm *peerManager) rmPeer(p peer.ID) error { - logger.Debugf("removing peer %s", p.Pretty()) - pm.peerSetMux.RLock() - _, ok := pm.peerSet[p] - pm.peerSetMux.RUnlock() - if !ok { - return nil +func (pm *peerManager) rmPeer(pid peer.ID, selfShutdown bool) error { + logger.Debugf("removing peer %s", pid.Pretty()) + + if pm.isPeer(pid) { + logger.Infof("removing Cluster peer %s", pid.Pretty()) } - pm.peerSetMux.Lock() - delete(pm.peerSet, p) - pm.peerSetMux.Unlock() - pm.cluster.host.Peerstore().ClearAddrs(p) - pm.cluster.config.rmPeer(p) - pm.cluster.consensus.RemovePeer(p) + + pm.m.Lock() + delete(pm.peermap, pid) + pm.m.Unlock() // It's ourselves. This is not very graceful - if p == pm.cluster.host.ID() { - logger.Warning("this peer has been removed from the Cluster and will shutdown itself") - pm.cluster.config.emptyPeers() + if pid == pm.self && selfShutdown { + logger.Warning("this peer has been removed from the Cluster and will shutdown itself in 5 seconds") defer func() { go func() { - time.Sleep(time.Second) + time.Sleep(1 * time.Second) pm.cluster.consensus.Shutdown() - pm.selfShutdown() + pm.resetPeers() + time.Sleep(4 * time.Second) + pm.cluster.Shutdown() }() }() } - if path := pm.cluster.config.path; path != "" { - pm.cluster.config.Save(path) - } - return nil } -func (pm *peerManager) selfShutdown() { - err := pm.cluster.Shutdown() - if err == nil { - // If the shutdown worked correctly - // (including snapshot) we can remove the Raft - // database (which traces peers additions - // and removals). It makes re-start of the peer - // way less confusing for Raft while the state - // kept in the snapshot. - os.Remove(filepath.Join(pm.cluster.config.ConsensusDataFolder, "raft.db")) +func (pm *peerManager) savePeers() { + pm.cluster.config.ClusterPeers = pm.peersAddrs() + pm.cluster.config.Save("") +} + +func (pm *peerManager) resetPeers() { + pm.m.Lock() + pm.peermap = make(map[peer.ID]ma.Multiaddr) + pm.peermap[pm.self] = pm.cluster.config.ClusterAddr + pm.m.Unlock() +} + +func (pm *peerManager) isPeer(p peer.ID) bool { + if p == pm.self { + return true } + + pm.m.RLock() + _, ok := pm.peermap[p] + pm.m.RUnlock() + return ok } -// empty the peerset and add ourselves only -func (pm *peerManager) resetPeerSet() { - pm.peerSetMux.Lock() - defer pm.peerSetMux.Unlock() - pm.peerSet = make(map[peer.ID]struct{}) - pm.peerSet[pm.cluster.host.ID()] = struct{}{} -} - +// peers including ourselves func (pm *peerManager) peers() []peer.ID { - pm.peerSetMux.RLock() - defer pm.peerSetMux.RUnlock() - var pList []peer.ID - for k := range pm.peerSet { - pList = append(pList, k) + pm.m.RLock() + defer pm.m.RUnlock() + var peers []peer.ID + for k := range pm.peermap { + peers = append(peers, k) } - return pList + return peers } -func (pm *peerManager) addFromConfig(cfg *Config) error { - return pm.addFromMultiaddrs(cfg.ClusterPeers) +// cluster peer addresses (NOT including ourselves) +func (pm *peerManager) peersAddrs() []ma.Multiaddr { + pm.m.RLock() + defer pm.m.RUnlock() + var addrs []ma.Multiaddr + for k, addr := range pm.peermap { + if k != pm.self { + addrs = append(addrs, addr) + } + } + return addrs } -func (pm *peerManager) addFromMultiaddrs(mAddrIDs []ma.Multiaddr) error { - pm.resetPeerSet() - pm.cluster.config.emptyPeers() - if len(mAddrIDs) > 0 { - logger.Info("adding Cluster peers:") - } else { - logger.Info("This is a single-node cluster") - } +// func (pm *peerManager) addFromConfig(cfg *Config) error { +// return pm.addFromMultiaddrs(cfg.ClusterPeers) +// } - for _, m := range mAddrIDs { - _, err := pm.addPeer(m) +func (pm *peerManager) addFromMultiaddrs(addrs []ma.Multiaddr) error { + for _, m := range addrs { + err := pm.addPeer(m) if err != nil { + logger.Error(err) return err } - logger.Infof(" - %s", m.String()) } return nil } diff --git a/peer_manager_test.go b/peer_manager_test.go index c737e46b..bef4edab 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -1,8 +1,10 @@ package ipfscluster import ( + "math/rand" "sync" "testing" + "time" cid "github.com/ipfs/go-cid" ma "github.com/multiformats/go-multiaddr" @@ -26,9 +28,7 @@ func peerManagerClusters(t *testing.T) ([]*Cluster, []*ipfsMock) { } func clusterAddr(c *Cluster) ma.Multiaddr { - addr := c.config.ClusterAddr - pidAddr, _ := ma.NewMultiaddr("/ipfs/" + c.ID().ID.Pretty()) - return addr.Encapsulate(pidAddr) + return multiaddrJoin(c.config.ClusterAddr, c.ID().ID) } func TestClustersPeerAdd(t *testing.T) { @@ -36,7 +36,7 @@ func TestClustersPeerAdd(t *testing.T) { defer shutdownClusters(t, clusters, mocks) if len(clusters) < 2 { - t.Fatal("need at least 2 nodes for this test") + t.Skip("need at least 2 nodes for this test") } for i := 1; i < len(clusters); i++ { @@ -45,6 +45,7 @@ func TestClustersPeerAdd(t *testing.T) { if err != nil { t.Fatal(err) } + if len(id.ClusterPeers) != i { // ClusterPeers is originally empty and contains nodes as we add them t.Log(id.ClusterPeers) @@ -64,6 +65,7 @@ func TestClustersPeerAdd(t *testing.T) { // check they are tracked by the peer manager if len(ids) != nClusters { + //t.Log(ids) t.Error("added clusters are not part of clusters") } @@ -74,16 +76,21 @@ func TestClustersPeerAdd(t *testing.T) { t.Error("expected 1 pin everywhere") } - // check that its part of the configuration - if len(c.config.ClusterPeers) != nClusters-1 { - t.Error("expected different cluster peers in the configuration") + if len(c.ID().ClusterPeers) != nClusters-1 { + t.Log(c.ID().ClusterPeers) + t.Error("By now cluster peers should reflect all peers") } - for _, peer := range c.config.ClusterPeers { - if peer == nil { - t.Error("something went wrong adding peer to config") - } - } + // // check that its part of the configuration + // if len(c.config.ClusterPeers) != nClusters-1 { + // t.Error("expected different cluster peers in the configuration") + // } + + // for _, peer := range c.config.ClusterPeers { + // if peer == nil { + // t.Error("something went wrong adding peer to config") + // } + // } } runF(t, clusters, f) } @@ -93,7 +100,7 @@ func TestClustersPeerAddBadPeer(t *testing.T) { defer shutdownClusters(t, clusters, mocks) if len(clusters) < 2 { - t.Fatal("need at least 2 nodes for this test") + t.Skip("need at least 2 nodes for this test") } // We add a cluster that has been shutdown @@ -114,7 +121,7 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) { defer shutdownClusters(t, clusters, mocks) if len(clusters) < 3 { - t.Fatal("need at least 3 nodes for this test") + t.Skip("need at least 3 nodes for this test") } _, err := clusters[0].PeerAdd(clusterAddr(clusters[1])) @@ -139,10 +146,15 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) { } func TestClustersPeerRemove(t *testing.T) { - clusters, mock := createClusters(t) - defer shutdownClusters(t, clusters, mock) + clusters, mocks := createClusters(t) + defer shutdownClusters(t, clusters, mocks) + + if len(clusters) < 2 { + t.Skip("test needs at least 2 clusters") + } p := clusters[1].ID().ID + //t.Logf("remove %s from %s", p.Pretty(), clusters[0].config.ClusterPeers) err := clusters[0].PeerRemove(p) if err != nil { t.Error(err) @@ -154,19 +166,143 @@ func TestClustersPeerRemove(t *testing.T) { if ok { t.Error("removed peer should have exited") } - if len(c.config.ClusterPeers) != 0 { - t.Error("cluster peers should be empty") - } + // if len(c.config.ClusterPeers) != 0 { + // t.Error("cluster peers should be empty") + // } } else { ids := c.Peers() if len(ids) != nClusters-1 { t.Error("should have removed 1 peer") } - if len(c.config.ClusterPeers) != nClusters-2 { - t.Error("should have removed peer from config") - } + // if len(c.config.ClusterPeers) != nClusters-1 { + // t.Log(c.config.ClusterPeers) + // t.Error("should have removed peer from config") + // } } } runF(t, clusters, f) } + +func TestClusterPeerRemoveSelf(t *testing.T) { + clusters, mocks := createClusters(t) + defer shutdownClusters(t, clusters, mocks) + + for i := 0; i < len(clusters); i++ { + err := clusters[i].PeerRemove(clusters[i].ID().ID) + if err != nil { + t.Error(err) + } + time.Sleep(time.Second) + _, more := <-clusters[i].Done() + if more { + t.Error("should be done") + } + } +} + +func TestClustersPeerJoin(t *testing.T) { + clusters, mocks := peerManagerClusters(t) + defer shutdownClusters(t, clusters, mocks) + + if len(clusters) < 3 { + t.Skip("test needs at least 3 clusters") + } + + for i := 1; i < len(clusters); i++ { + err := clusters[i].Join(clusterAddr(clusters[0])) + if err != nil { + t.Fatal(err) + } + } + hash, _ := cid.Decode(testCid) + clusters[0].Pin(hash) + delay() + + f := func(t *testing.T, c *Cluster) { + peers := c.Peers() + if len(peers) != nClusters { + t.Error("all peers should be connected") + } + pins := c.Pins() + if len(pins) != 1 || !pins[0].Equals(hash) { + t.Error("all peers should have pinned the cid") + } + } + runF(t, clusters, f) +} + +func TestClustersPeerJoinAllAtOnce(t *testing.T) { + clusters, mocks := peerManagerClusters(t) + defer shutdownClusters(t, clusters, mocks) + + if len(clusters) < 2 { + t.Skip("test needs at least 2 clusters") + } + + f := func(t *testing.T, c *Cluster) { + err := c.Join(clusterAddr(clusters[0])) + if err != nil { + t.Fatal(err) + } + } + runF(t, clusters[1:], f) + + hash, _ := cid.Decode(testCid) + clusters[0].Pin(hash) + delay() + + f2 := func(t *testing.T, c *Cluster) { + peers := c.Peers() + if len(peers) != nClusters { + t.Error("all peers should be connected") + } + pins := c.Pins() + if len(pins) != 1 || !pins[0].Equals(hash) { + t.Error("all peers should have pinned the cid") + } + } + runF(t, clusters, f2) +} + +func TestClustersPeerJoinAllAtOnceWithRandomBootstrap(t *testing.T) { + clusters, mocks := peerManagerClusters(t) + defer shutdownClusters(t, clusters, mocks) + + if len(clusters) < 3 { + t.Skip("test needs at least 3 clusters") + } + + // We have a 2 node cluster and the rest of nodes join + // one of the two seeds randomly + + err := clusters[1].Join(clusterAddr(clusters[0])) + if err != nil { + t.Fatal(err) + } + + f := func(t *testing.T, c *Cluster) { + j := rand.Intn(2) + err := c.Join(clusterAddr(clusters[j])) + if err != nil { + t.Fatal(err) + } + } + runF(t, clusters[2:], f) + + hash, _ := cid.Decode(testCid) + clusters[0].Pin(hash) + delay() + + f2 := func(t *testing.T, c *Cluster) { + peers := c.Peers() + if len(peers) != nClusters { + t.Error("all peers should be connected") + } + pins := c.Pins() + if len(pins) != 1 || !pins[0].Equals(hash) { + t.Error("all peers should have pinned the cid") + } + } + runF(t, clusters, f2) +} diff --git a/raft.go b/raft.go index 1d62ee59..5b650de8 100644 --- a/raft.go +++ b/raft.go @@ -37,6 +37,7 @@ type Raft struct { stableStore hashiraft.StableStore peerstore *libp2praft.Peerstore boltdb *raftboltdb.BoltStore + dataFolder string } func defaultRaftConfig() *hashiraft.Config { @@ -87,14 +88,15 @@ func NewRaft(peers []peer.ID, host host.Host, dataFolder string, fsm hashiraft.F return nil, err } + cfg := defaultRaftConfig() logger.Debug("creating Raft") - r, err := hashiraft.NewRaft(defaultRaftConfig(), fsm, logStore, logStore, snapshots, pstore, transport) + r, err := hashiraft.NewRaft(cfg, fsm, logStore, logStore, snapshots, pstore, transport) if err != nil { logger.Error("initializing raft: ", err) return nil, err } - return &Raft{ + raft := &Raft{ raft: r, transport: transport, snapshotStore: snapshots, @@ -102,22 +104,26 @@ func NewRaft(peers []peer.ID, host host.Host, dataFolder string, fsm hashiraft.F stableStore: logStore, peerstore: pstore, boltdb: logStore, - }, nil + dataFolder: dataFolder, + } + + return raft, nil } -// WaitForLeader holds until Raft says we have a leader -func (r *Raft) WaitForLeader(ctx context.Context) { +// WaitForLeader holds until Raft says we have a leader. +// Returns an error if we don't. +func (r *Raft) WaitForLeader(ctx context.Context) error { // Using Raft observers panics on non-64 architectures. // This is a work around + logger.Info("waiting for leader") if sixtyfour { - r.waitForLeader(ctx) - } else { - r.waitForLeaderLegacy(ctx) + return r.waitForLeader(ctx) } + return r.waitForLeaderLegacy(ctx) } -func (r *Raft) waitForLeader(ctx context.Context) { - obsCh := make(chan hashiraft.Observation) +func (r *Raft) waitForLeader(ctx context.Context) error { + obsCh := make(chan hashiraft.Observation, 1) filter := func(o *hashiraft.Observation) bool { switch o.Data.(type) { case hashiraft.LeaderObservation: @@ -126,29 +132,43 @@ func (r *Raft) waitForLeader(ctx context.Context) { return false } } - observer := hashiraft.NewObserver(obsCh, true, filter) + observer := hashiraft.NewObserver(obsCh, false, filter) r.raft.RegisterObserver(observer) defer r.raft.DeregisterObserver(observer) - select { - case obs := <-obsCh: - leaderObs := obs.Data.(hashiraft.LeaderObservation) - logger.Infof("Raft Leader elected: %s", leaderObs.Leader) - - case <-ctx.Done(): - return + ticker := time.NewTicker(time.Second) + for { + select { + case obs := <-obsCh: + switch obs.Data.(type) { + case hashiraft.LeaderObservation: + leaderObs := obs.Data.(hashiraft.LeaderObservation) + logger.Infof("Raft Leader elected: %s", leaderObs.Leader) + return nil + } + case <-ticker.C: + if l := r.raft.Leader(); l != "" { //we missed or there was no election + logger.Debug("waitForleaderTimer") + logger.Infof("Raft Leader elected: %s", l) + ticker.Stop() + return nil + } + case <-ctx.Done(): + return ctx.Err() + } } } -func (r *Raft) waitForLeaderLegacy(ctx context.Context) { +// 32-bit systems should use this. +func (r *Raft) waitForLeaderLegacy(ctx context.Context) error { for { leader := r.raft.Leader() if leader != "" { logger.Infof("Raft Leader elected: %s", leader) - return + return nil } select { case <-ctx.Done(): - return + return ctx.Err() default: time.Sleep(500 * time.Millisecond) } @@ -156,23 +176,22 @@ func (r *Raft) waitForLeaderLegacy(ctx context.Context) { } // WaitForUpdates holds until Raft has synced to the last index in the log -func (r *Raft) WaitForUpdates(ctx context.Context) { +func (r *Raft) WaitForUpdates(ctx context.Context) error { logger.Debug("Raft state is catching up") for { select { case <-ctx.Done(): - return + return ctx.Err() default: lai := r.raft.AppliedIndex() li := r.raft.LastIndex() logger.Debugf("current Raft index: %d/%d", lai, li) if lai == li { - return + return nil } time.Sleep(500 * time.Millisecond) } - } } @@ -199,28 +218,81 @@ func (r *Raft) Shutdown() error { if err != nil { errMsgs += "could not close boltdb: " + err.Error() } + if errMsgs != "" { return errors.New(errMsgs) } + + // If the shutdown worked correctly + // (including snapshot) we can remove the Raft + // database (which traces peers additions + // and removals). It makes re-start of the peer + // way less confusing for Raft while the state + // can be restored from the snapshot. + //os.Remove(filepath.Join(r.dataFolder, "raft.db")) return nil } // AddPeer adds a peer to Raft func (r *Raft) AddPeer(peer string) error { + if r.hasPeer(peer) { + logger.Debug("skipping raft add as already in peer set") + return nil + } + future := r.raft.AddPeer(peer) err := future.Error() + if err != nil { + logger.Error("raft cannot add peer: ", err) + return err + } + peers, _ := r.peerstore.Peers() + logger.Debugf("raft peerstore: %s", peers) return err } // RemovePeer removes a peer from Raft func (r *Raft) RemovePeer(peer string) error { + if !r.hasPeer(peer) { + return nil + } + future := r.raft.RemovePeer(peer) err := future.Error() + if err != nil { + logger.Error("raft cannot remove peer: ", err) + return err + } + peers, _ := r.peerstore.Peers() + logger.Debugf("raft peerstore: %s", peers) return err } +// func (r *Raft) SetPeers(peers []string) error { +// logger.Debugf("SetPeers(): %s", peers) +// future := r.raft.SetPeers(peers) +// err := future.Error() +// if err != nil { +// logger.Error(err) +// } +// return err +// } + // Leader returns Raft's leader. It may be an empty string if // there is no leader or it is unknown. func (r *Raft) Leader() string { return r.raft.Leader() } + +func (r *Raft) hasPeer(peer string) bool { + found := false + peers, _ := r.peerstore.Peers() + for _, p := range peers { + if p == peer { + found = true + break + } + } + + return found +} diff --git a/rpc_api.go b/rpc_api.go index 0332796e..f23b1867 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -1,8 +1,9 @@ package ipfscluster import ( - cid "github.com/ipfs/go-cid" + "errors" + cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-peer" ) @@ -111,6 +112,13 @@ func (api *RPCAPI) PeerRemove(in peer.ID, out *struct{}) error { return api.cluster.PeerRemove(in) } +// Join runs Cluster.Join(). +func (api *RPCAPI) Join(in MultiaddrSerial, out *struct{}) error { + addr := in.ToMultiaddr() + err := api.cluster.Join(addr) + return err +} + // StatusAll runs Cluster.StatusAll(). func (api *RPCAPI) StatusAll(in struct{}, out *[]GlobalPinInfo) error { pinfo, err := api.cluster.StatusAll() @@ -295,32 +303,58 @@ func (api *RPCAPI) ConsensusLogUnpin(in *CidArg, out *struct{}) error { return api.cluster.consensus.LogUnpin(c) } +// ConsensusLogAddPeer runs Consensus.LogAddPeer(). +func (api *RPCAPI) ConsensusLogAddPeer(in MultiaddrSerial, out *struct{}) error { + addr := in.ToMultiaddr() + return api.cluster.consensus.LogAddPeer(addr) +} + +// ConsensusLogRmPeer runs Consensus.LogRmPeer(). +func (api *RPCAPI) ConsensusLogRmPeer(in peer.ID, out *struct{}) error { + return api.cluster.consensus.LogRmPeer(in) +} + /* Peer Manager methods */ // PeerManagerAddPeer runs peerManager.addPeer(). -func (api *RPCAPI) PeerManagerAddPeer(in MultiaddrSerial, out *peer.ID) error { - mAddr := in.ToMultiaddr() - p, err := api.cluster.peerManager.addPeer(mAddr) - *out = p +func (api *RPCAPI) PeerManagerAddPeer(in MultiaddrSerial, out *struct{}) error { + addr := in.ToMultiaddr() + err := api.cluster.peerManager.addPeer(addr) return err } -// PeerManagerRmPeer runs peerManager.rmPeer(). -func (api *RPCAPI) PeerManagerRmPeer(in peer.ID, out *struct{}) error { - return api.cluster.peerManager.rmPeer(in) -} - // PeerManagerAddFromMultiaddrs runs peerManager.addFromMultiaddrs(). func (api *RPCAPI) PeerManagerAddFromMultiaddrs(in MultiaddrsSerial, out *struct{}) error { - api.cluster.peerManager.addFromMultiaddrs(in.ToMultiaddrs()) - return nil + addrs := in.ToMultiaddrs() + err := api.cluster.peerManager.addFromMultiaddrs(addrs) + return err } -// PeerManagerPeers runs peerManager.peers(). -func (api *RPCAPI) PeerManagerPeers(in struct{}, out *[]peer.ID) error { - peers := api.cluster.peerManager.peers() - *out = peers +// PeerManagerRmPeerShutdown runs peerManager.rmPeer(). +func (api *RPCAPI) PeerManagerRmPeerShutdown(in peer.ID, out *struct{}) error { + return api.cluster.peerManager.rmPeer(in, true) +} + +// PeerManagerRmPeer runs peerManager.rmPeer(). +func (api *RPCAPI) PeerManagerRmPeer(in peer.ID, out *struct{}) error { + return api.cluster.peerManager.rmPeer(in, false) +} + +/* + Other +*/ + +// RemoteMultiaddrForPeer returns the multiaddr of a peer as seen by this peer. +// This is necessary for a peer to figure out which of its multiaddresses the +// peers are seeing (also when crossing NATs). It should be called from +// the peer the IN parameter indicates. +func (api *RPCAPI) RemoteMultiaddrForPeer(in peer.ID, out *MultiaddrSerial) error { + conns := api.cluster.host.Network().ConnsToPeer(in) + if len(conns) == 0 { + return errors.New("no connections to: " + in.Pretty()) + } + *out = MultiaddrToSerial(multiaddrJoin(conns[0].RemoteMultiaddr(), in)) return nil } diff --git a/silent.go b/silent.go index a9ace111..d1fabf36 100644 --- a/silent.go +++ b/silent.go @@ -3,5 +3,10 @@ package ipfscluster func init() { - SetLogLevel("CRITICAL") + l := "CRITICAL" + SetFacilityLogLevel("cluster", l) + SetFacilityLogLevel("raft", l) + SetFacilityLogLevel("p2p-gorpc", l) + SetFacilityLogLevel("swarm2", l) + SetFacilityLogLevel("libp2p-raft", l) } diff --git a/util.go b/util.go index c6ea5d34..0eb2c7d5 100644 --- a/util.go +++ b/util.go @@ -3,18 +3,20 @@ package ipfscluster import ( "fmt" + host "github.com/libp2p/go-libp2p-host" + peer "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" ) // The copy functions below are used in calls to Cluste.multiRPC() -func copyPIDsToIfaces(in []peer.ID) []interface{} { - ifaces := make([]interface{}, len(in), len(in)) - for i := range in { - ifaces[i] = &in[i] - } - return ifaces -} +// func copyPIDsToIfaces(in []peer.ID) []interface{} { +// ifaces := make([]interface{}, len(in), len(in)) +// for i := range in { +// ifaces[i] = &in[i] +// } +// return ifaces +// } func copyIDSerialsToIfaces(in []IDSerial) []interface{} { ifaces := make([]interface{}, len(in), len(in)) @@ -51,7 +53,7 @@ func copyEmptyStructToIfaces(in []struct{}) []interface{} { func multiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) { pid, err := addr.ValueForProtocol(ma.P_IPFS) if err != nil { - err = fmt.Errorf("Invalid peer multiaddress: %s: %s", addr, err) + err = fmt.Errorf("invalid peer multiaddress: %s: %s", addr, err) logger.Error(err) return "", nil, err } @@ -61,9 +63,60 @@ func multiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) { peerID, err := peer.IDB58Decode(pid) if err != nil { - err = fmt.Errorf("Invalid peer ID in multiaddress: %s: %s", pid) + err = fmt.Errorf("invalid peer ID in multiaddress: %s: %s", pid, err) logger.Error(err) return "", nil, err } return peerID, decapAddr, nil } + +func multiaddrJoin(addr ma.Multiaddr, p peer.ID) ma.Multiaddr { + pidAddr, err := ma.NewMultiaddr("/ipfs/" + peer.IDB58Encode(p)) + // let this break badly + if err != nil { + panic("called multiaddrJoin with bad peer!") + } + return addr.Encapsulate(pidAddr) +} + +func peersFromMultiaddrs(addrs []ma.Multiaddr) []peer.ID { + var pids []peer.ID + for _, addr := range addrs { + pid, _, err := multiaddrSplit(addr) + if err != nil { + continue + } + pids = append(pids, pid) + } + return pids +} + +// // connect to a peer ID. +// func connectToPeer(ctx context.Context, h host.Host, id peer.ID, addr ma.Multiaddr) error { +// err := h.Connect(ctx, peerstore.PeerInfo{ +// ID: id, +// Addrs: []ma.Multiaddr{addr}, +// }) +// return err +// } + +// // return the local multiaddresses used to communicate to a peer. +// func localMultiaddrsTo(h host.Host, pid peer.ID) []ma.Multiaddr { +// var addrs []ma.Multiaddr +// conns := h.Network().ConnsToPeer(pid) +// logger.Debugf("conns to %s are: %s", pid, conns) +// for _, conn := range conns { +// addrs = append(addrs, multiaddrJoin(conn.LocalMultiaddr(), h.ID())) +// } +// return addrs +// } + +// If we have connections open to that PID and they are using a different addr +// then we return the one we are using, otherwise the one provided +func getRemoteMultiaddr(h host.Host, pid peer.ID, addr ma.Multiaddr) ma.Multiaddr { + conns := h.Network().ConnsToPeer(pid) + if len(conns) > 0 { + return multiaddrJoin(conns[0].RemoteMultiaddr(), pid) + } + return multiaddrJoin(addr, pid) +}