Fix #24: Auto-join and auto-leave operations for Cluster

This is the third implementation attempt. This time, rather than
broadcasting PeerAdd/Join requests to the whole cluster, we use the
consensus log to broadcast new peers joining.

This makes it easier to recover from errors and to know exactly who
is a member of the cluster and who is not. The consensus is, after all,
meant to agree on things, and the list of cluster peers is something
everyone has to agree on.

Raft itself uses a special log operation to maintain the peer set.
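
To make the idea concrete, here is a minimal, self-contained sketch (toy
names, not the actual ipfs-cluster types) of a peer set maintained through
ordered log operations, the same way pins are:

```go
package main

import "fmt"

// Toy types only; the real implementation in this commit uses clusterLogOp
// with a string Arg and LogOpAddPeer/LogOpRmPeer operation types.
type peerOpType int

const (
	opAddPeer peerOpType = iota + 1
	opRmPeer
)

type peerOp struct {
	Peer string // peer ID (the real code logs a multiaddress for adds)
	Type peerOpType
}

// peerSet stands in for the part of the replicated state tracking membership.
type peerSet map[string]bool

// apply runs on every peer, in log order, when an entry is committed,
// so all peers converge on the same membership.
func (s peerSet) apply(op peerOp) {
	switch op.Type {
	case opAddPeer:
		s[op.Peer] = true
	case opRmPeer:
		delete(s, op.Peer)
	}
}

func main() {
	s := peerSet{}
	log := []peerOp{
		{Peer: "QmPeerA", Type: opAddPeer},
		{Peer: "QmPeerB", Type: opAddPeer},
		{Peer: "QmPeerA", Type: opRmPeer},
	}
	for _, op := range log {
		s.apply(op)
	}
	fmt.Println(s) // map[QmPeerB:true]
}
```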

The tests are almost unchanged from the previous attempts, so coverage
should be equivalent, except that it no longer seems possible to bootstrap
a group of nodes at the same time using different bootstrap nodes; it
works when they all use the same one. I'm not sure this worked before
either, but the code is simpler than recursively contacting peers, and it
scales better for larger clusters.

Nodes have to be careful about joining clusters while keeping the state
from a different cluster (disjoint logs). This may cause problems with
Raft.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2017-02-02 23:52:06 +01:00
parent 4e0407ff5c
commit 34fdc329fc
20 changed files with 1256 additions and 609 deletions

README.md

@ -81,28 +81,37 @@ $ ipfs-cluster-service -init
The configuration will be placed in `~/.ipfs-cluster/service.json` by default.
You can add the multiaddresses for the other cluster peers in the `cluster_peers` variable. For example, here is a valid configuration for a cluster of 4 peers:
You can add the multiaddresses for the other cluster peers in the `bootstrap_multiaddresses` variable. For example, here is a valid configuration for a single-peer cluster:
```json
{
"id": "QmSGCzHkz8gC9fNndMtaCZdf9RFtwtbTEEsGo4zkVfcykD",
"private_key" : "<redacted>",
"cluster_peers" : [
"/ip4/192.168.1.2/tcp/9096/ipfs/QmcQ5XvrSQ4DouNkQyQtEoLczbMr6D9bSenGy6WQUCQUBt",
"/ip4/192.168.1.3/tcp/9096/ipfs/QmdFBMf9HMDH3eCWrc1U11YCPenC3Uvy9mZQ2BedTyKTDf",
"/ip4/192.168.1.4/tcp/9096/ipfs/QmYY1ggjoew5eFrvkenTR3F4uWqtkBkmgfJk8g9Qqcwy51"
],
"id": "QmXMhZ53zAoes8TYbKGn3rnm5nfWs5Wdu41Fhhfw9XmM5A",
"private_key": "<redacted>",
"cluster_peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"cluster_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"api_listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"ipfs_proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"ipfs_node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"consensus_data_folder": "/home/user/.ipfs-cluster/data"
"consensus_data_folder": "/home/hector/go/src/github.com/ipfs/ipfs-cluster/ipfs-cluster-service/d1/data",
"state_sync_seconds": 60
}
```
The configuration file should probably be identical among all cluster peers, except for the `id` and `private_key` fields. To facilitate configuration, `cluster_peers` may include its own address, but it will be removed on boot. For additional information about the configuration format, see the [JSONConfig documentation](https://godoc.org/github.com/ipfs/ipfs-cluster#JSONConfig).
The configuration file should probably be identical among all cluster peers, except for the `id` and `private_key` fields. Once every cluster peer has the configuration in place, you can run `ipfs-cluster-service` to start the cluster.
#### Clusters using `cluster_peers`
The `cluster_peers` configuration variable holds a list of current cluster members. If you know the members of the cluster in advance, or you want to start a cluster fully in parallel, set `cluster_peers` in all configurations so that every peer knows the rest upon boot. Leave `bootstrap` empty (although it will be ignored anyway).
#### Clusters using `bootstrap`
When the `cluster_peers` variable is empty, the multiaddresses in `bootstrap` can be used to have a peer join an existing cluster. The peer will contact those addresses (in order) until one of them succeeds in joining it to the cluster. When the peer is shut down, it will save the current cluster peers in the `cluster_peers` configuration variable for future use.
Bootstrap is a convenient method, but more prone to errors than `cluster_peers`. It can also be used with `ipfs-cluster-service --bootstrap <multiaddress>`. Note that bootstrapping nodes whose state is old or diverging from the one running in the cluster may lead to problems with the consensus, so usually you will want to bootstrap blank nodes.
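For illustration, a peer that is meant to join an existing cluster on boot only needs to differ from the full example above in these fields (the multiaddress below is the example one used elsewhere in this document):
```json
{
  "cluster_peers": [],
  "bootstrap": [
    "/ip4/192.168.1.2/tcp/9096/ipfs/QmPSoSaPXpyunaBwHs1rZBKYSqRV4bLRk32VGYLuvdrypL"
  ]
}
```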
Once every cluster peer has the configuration in place, you can run `ipfs-cluster-service` to start the cluster.
#### Debugging
@ -153,17 +162,19 @@ Then run cluster:
```
node0> ipfs-cluster-service
12:38:34.470 INFO cluster: IPFS Cluster v0.0.1 listening on: cluster.go:59
12:38:34.472 INFO cluster: /ip4/127.0.0.1/tcp/9096/ipfs/QmWM4knzVuWU5utXqkD2JeQ9zYT82f4s9s196bJ8PekStF cluster.go:61
12:38:34.472 INFO cluster: /ip4/192.168.1.58/tcp/9096/ipfs/QmWM4knzVuWU5utXqkD2JeQ9zYT82f4s9s196bJ8PekStF cluster.go:61
12:38:34.472 INFO cluster: This is a single-node cluster peer_manager.go:141
12:38:34.569 INFO cluster: starting Consensus and waiting for a leader... consensus.go:124
12:38:34.591 INFO cluster: REST API: /ip4/127.0.0.1/tcp/9094 rest_api.go:309
12:38:34.591 INFO cluster: IPFS Proxy: /ip4/127.0.0.1/tcp/9095 -> /ip4/127.0.0.1/tcp/5001 ipfs_http_connector.go:168
12:38:34.592 INFO cluster: PinTracker ready map_pin_tracker.go:71
12:38:36.092 INFO cluster: Raft Leader elected: QmWM4knzVuWU5utXqkD2JeQ9zYT82f4s9s196bJ8PekStF raft.go:146
12:38:36.092 INFO cluster: Consensus state is up to date consensus.go:170
12:38:36.092 INFO cluster: IPFS Cluster is ready cluster.go:526
13:33:34.044 INFO cluster: IPFS Cluster v0.0.1 listening on: cluster.go:59
13:33:34.044 INFO cluster: /ip4/127.0.0.1/tcp/9096/ipfs/QmQHKLBXfS7hf8o2acj7FGADoJDLat3UazucbHrgxqisim cluster.go:61
13:33:34.044 INFO cluster: /ip4/192.168.1.103/tcp/9096/ipfs/QmQHKLBXfS7hf8o2acj7FGADoJDLat3UazucbHrgxqisim cluster.go:61
13:33:34.044 INFO cluster: starting Consensus and waiting for a leader... consensus.go:163
13:33:34.047 INFO cluster: PinTracker ready map_pin_tracker.go:71
13:33:34.047 INFO cluster: waiting for leader raft.go:118
13:33:34.047 INFO cluster: REST API: /ip4/127.0.0.1/tcp/9094 rest_api.go:309
13:33:34.047 INFO cluster: IPFS Proxy: /ip4/127.0.0.1/tcp/9095 -> /ip4/127.0.0.1/tcp/5001 ipfs_http_connector.go:168
13:33:35.420 INFO cluster: Raft Leader elected: QmQHKLBXfS7hf8o2acj7FGADoJDLat3UazucbHrgxqisim raft.go:145
13:33:35.921 INFO cluster: Consensus state is up to date consensus.go:214
13:33:35.921 INFO cluster: IPFS Cluster is ready cluster.go:191
13:33:35.921 INFO cluster: Cluster Peers (not including ourselves): cluster.go:192
13:33:35.921 INFO cluster: - No other peers cluster.go:195
```
#### Step 1: Add new members to the cluster
@ -174,41 +185,43 @@ Initialize and run cluster in a different node(s):
node1> ipfs-cluster-service init
ipfs-cluster-service configuration written to /home/user/.ipfs-cluster/service.json
node1> ipfs-cluster-service
12:39:24.818 INFO cluster: IPFS Cluster v0.0.1 listening on: cluster.go:59
12:39:24.819 INFO cluster: /ip4/127.0.0.1/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK cluster.go:61
12:39:24.820 INFO cluster: /ip4/192.168.1.57/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK cluster.go:61
12:39:24.820 INFO cluster: This is a single-node cluster peer_manager.go:141
12:39:24.850 INFO cluster: starting Consensus and waiting for a leader... consensus.go:124
12:39:24.876 INFO cluster: IPFS Proxy: /ip4/127.0.0.1/tcp/9095 -> /ip4/127.0.0.1/tcp/5001 ipfs_http_connector.go:168
12:39:24.876 INFO cluster: REST API: /ip4/127.0.0.1/tcp/9094 rest_api.go:309
12:39:24.876 INFO cluster: PinTracker ready map_pin_tracker.go:71
12:39:26.877 INFO cluster: Raft Leader elected: QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK raft.go:146
12:39:26.877 INFO cluster: Consensus state is up to date consensus.go:170
12:39:26.878 INFO service: IPFS Cluster is ready main.go:184
13:36:19.313 INFO cluster: IPFS Cluster v0.0.1 listening on: cluster.go:59
13:36:19.313 INFO cluster: /ip4/127.0.0.1/tcp/7096/ipfs/QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85 cluster.go:61
13:36:19.313 INFO cluster: /ip4/192.168.1.103/tcp/7096/ipfs/QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85 cluster.go:61
13:36:19.313 INFO cluster: starting Consensus and waiting for a leader... consensus.go:163
13:36:19.316 INFO cluster: REST API: /ip4/127.0.0.1/tcp/7094 rest_api.go:309
13:36:19.316 INFO cluster: IPFS Proxy: /ip4/127.0.0.1/tcp/7095 -> /ip4/127.0.0.1/tcp/5001 ipfs_http_connector.go:168
13:36:19.316 INFO cluster: waiting for leader raft.go:118
13:36:19.316 INFO cluster: PinTracker ready map_pin_tracker.go:71
13:36:20.834 INFO cluster: Raft Leader elected: QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85 raft.go:145
13:36:21.334 INFO cluster: Consensus state is up to date consensus.go:214
13:36:21.334 INFO cluster: IPFS Cluster is ready cluster.go:191
13:36:21.334 INFO cluster: Cluster Peers (not including ourselves): cluster.go:192
13:36:21.334 INFO cluster: - No other peers cluster.go:195
```
Add them to the original cluster with `ipfs-cluster-ctl peers add <multiaddr>`. The command will return the ID information of the newly added member:
```
node0> ipfs-cluster-ctl peers add /ip4/192.168.1.57/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK
node0> ipfs-cluster-ctl peers add /ip4/192.168.1.103/tcp/7096/ipfs/QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85
{
"id": "QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK",
"public_key": "CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDQUN0iWAdbYEfQFAYcORsd0XnBvR9dk1QrJbzyqwDEHebP/wYqjeK73cyzBrpTYzxyd205ZSrpImL1GvVl+iLONMlz0CHsQ2YL0zzYHy55Y+1LhGGZY5R14MqvrjSq8pWo8U9nF8aenXSxhNvVeErnE5voVUU7YTjXSaXYmsK0cT7erKHZooJ16dzL/UmRTYlirMuGcOv/4WmgYX5fikH1mtw1Ln2xew76qxL5MeCu7v7NNugbsachJFiC/0DewxPClS03Nv6TvW2FsN4iis961EoBH7qTI3E1gUS89s7xp2njfgD/hsyk6YUbEEbOJUNihPFJ3Wlx6ogbis3cdX3tAgMBAAE=",
"id": "QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85",
"public_key": "CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDtjpvI+XKVGT5toXTimtWceONYsf/1bbRMxLt/fCSYJoSeJqj0HUtttCD3dcBv1M2rElIMXDhyLUpkET+AN6otr9lQnbgi0ZaKrtzphR0w6g/0EQZZaxI2scxF4NcwkwUfe5ceEmPFwax1+C00nd2BF+YEEp+VHNyWgXhCxncOGO74p0YdXBrvXkyfTiy/567L3PPX9F9x+HiutBL39CWhx9INmtvdPB2HwshodF6QbfeljdAYCekgIrCQC8mXOVeePmlWgTwoge9yQbuViZwPiKwwo1AplANXFmSv8gagfjKL7Kc0YOqcHwxBsoUskbJjfheDZJzl19iDs9EvUGk5AgMBAAE=",
"addresses": [
"/ip4/127.0.0.1/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK",
"/ip4/192.168.1.57/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK"
"/ip4/127.0.0.1/tcp/7096/ipfs/QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85",
"/ip4/192.168.1.103/tcp/7096/ipfs/QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85"
],
"cluster_peers": [
"/ip4/192.168.1.58/tcp/9096/ipfs/QmWM4knzVuWU5utXqkD2JeQ9zYT82f4s9s196bJ8PekStF"
"/ip4/192.168.123.103/tcp/7096/ipfs/QmU7JJftGsP1zM8H37XwvfA7dwdepsB7xVnJA6sKpozc85"
],
"version": "0.0.1",
"commit": "",
"commit": "83baa5c859b9b17b2deec4f782d1210590025c80",
"rpc_protocol_version": "/ipfscluster/0.0.1/rpc",
"ipfs": {
"id": "QmRbn14eEDGEDf9d6mW64W6KsinkmMXqZaToWVbRANT8gY",
"id": "QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewur2n",
"addresses": [
"/ip4/127.0.0.1/tcp/4001/ipfs/QmRbn14eEDGEDf9d6mW64W6KsinkmMXqZaToWVbRANT8gY",
"/ip4/192.168.1.57/tcp/4001/ipfs/QmRbn14eEDGEDf9d6mW64W6KsinkmMXqZaToWVbRANT8gY"
"/ip4/127.0.0.1/tcp/4001/ipfs/QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewur2n",
"/ip4/192.168.1.103/tcp/4001/ipfs/QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewur2n"
]
}
}
@ -223,18 +236,19 @@ shutdown. They can be restarted manually and re-added to the Cluster any time:
```
node0> ipfs-cluster-ctl peers rm QmbGFbZVTF3UAEPK9pBVdwHGdDAYkHYufQwSh4k1i8bbbb
OK
Request succeeded
```
The `node1` is then disconnected and shuts down:
```
12:41:13.693 WARNI cluster: this peer has been removed from the Cluster and will shutdown itself peer_manager.go:80
12:41:13.695 INFO cluster: stopping Consensus component consensus.go:231
12:41:14.695 INFO cluster: shutting down IPFS Cluster cluster.go:135
12:41:14.696 INFO cluster: stopping Cluster API rest_api.go:327
12:41:14.696 INFO cluster: stopping IPFS Proxy ipfs_http_connector.go:332
12:41:14.697 INFO cluster: stopping MapPinTracker map_pin_tracker.go:87
13:42:50.828 WARNI cluster: this peer has been removed from the Cluster and will shutdown itself in 5 seconds peer_manager.go:48
13:42:51.828 INFO cluster: stopping Consensus component consensus.go:257
13:42:55.836 INFO cluster: shutting down IPFS Cluster cluster.go:235
13:42:55.836 INFO cluster: Saving configuration config.go:283
13:42:55.837 INFO cluster: stopping Cluster API rest_api.go:327
13:42:55.837 INFO cluster: stopping IPFS Proxy ipfs_http_connector.go:332
13:42:55.837 INFO cluster: stopping MapPinTracker map_pin_tracker.go:87
```
### Go


@ -70,33 +70,41 @@ This section explains how some things work in Cluster.
* `NewCluster` triggers the Cluster bootstrap. The `IPFSConnector`, `State` and `PinTracker` components are provided. These components are up but possibly waiting for a `SetClient(RPCClient)` call. Without an RPCClient, they are unable to communicate with the other components.
* The first step of bootstrapping is to create the libp2p `Host`. It uses the configuration keys to set up the public and private keys, the swarm network etc.
* The `peerManager` is initialized. Peers from the configuration and ourselves are added to the peer list.
* The `peerManager` is initialized. It allows listing known cluster peers and keeps components in the loop about changes.
* The `RPCServer` (`go-libp2p-gorpc`) is set up, along with an `RPCClient`. The client can communicate with any RPC server using libp2p, or with the local one (shortcutting). The `RPCAPI` object is registered: it implements all the RPC operations supported by cluster and is used for inter-component/inter-peer communication.
* The `Consensus` component is bootstrapped. It:
* Sets up a new Raft node from scratch, including snapshots, stable datastore (boltDB), log store etc...
* Sets up a new Raft node (with the `cluster_peers` from the configuration) from scratch, including snapshots, stable datastore (boltDB), log store etc...
* Initializes `go-libp2p-consensus` components (`Actor` and `LogOpConsensus`) using `go-libp2p-raft`
* Returns a new `Consensus` object while asynchronously waiting for a Raft leader and then also for the current Raft peer to catch up with the latest index of the log.
* Returns a new `Consensus` object while asynchronously waiting for a Raft leader and then also for the current Raft peer to catch up with the latest index of the log when there is anything to catch up with.
* Waits for an RPCClient to be set and, when that happens, triggers an asynchronous RPC request (`StateSync`) to the local `Cluster` component and reports `Ready()`.
* The `StateSync` operation (from the main `Cluster` component) makes a diff between the local `MapPinTracker` state and the consensus-maintained state. It triggers asynchronous local RPC requests (`Track` and `Untrack`) to the `MapPinTracker` (a simplified sketch of this diff follows this list).
* The `MapPinTracker` receives the `Track` requests and checks, pins or unpins items, as well as updating the local status of a pin (see the Pinning section below)
* While the consensus is being bootstrapped, `SetClient(RPCClient)` is called on all components (tracker, ipfs connector, api and consensus)
* The new `Cluster` object is returned.
* Asynchronously, a thread waits for the `Consensus` component to report `Ready()`. When this happens, `Cluster` reports itself `Ready()`. At this moment, all components are up, consensus is working and cluster is ready to perform any operations. The consensus state may still be syncing, or mostly the `MapPinTracker` may still be verifying that pins are there against the daemon, but this does not causes any problems to use cluster.
* Asynchronously, a thread triggers the bootstrap process and then waits for the `Consensus` component to report `Ready()`.
* The bootstrap process uses the configured multiaddresses from the Bootstrap section of the configuration to perform a remote RPC `PeerAdd` request (it tries each of them in order until one succeeds).
* The remote node then performs a consensus log operation logging the multiaddress and peer ID of the new cluster member. If the log operation is successful, the new member is added to `Raft` (which internally logs it as well). The new node is contacted by Raft and its internal peerset is updated. It receives the state from the cluster, including pins (which are synced to the `PinTracker`) and other peers (which are handled by the `peerManager`).
* If the bootstrap was successful or not necessary, Cluster reports itself `Ready()`. At this moment, all components are up, consensus is working and cluster is ready to perform any operations. The consensus state may still be syncing, or, more likely, the `MapPinTracker` may still be verifying pins against the ipfs daemon, but this does not cause any problems when using cluster.
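A simplified, self-contained sketch of that `StateSync` diff (plain string CIDs and maps stand in for the real `State` and `MapPinTracker`):
```go
package main

import "fmt"

// stateSync returns which CIDs should start being tracked (present in the
// shared state but unknown to the tracker) and which should be untracked
// (tracked locally but no longer in the shared state).
func stateSync(shared, tracked map[string]bool) (toTrack, toUntrack []string) {
	for c := range shared {
		if !tracked[c] {
			toTrack = append(toTrack, c)
		}
	}
	for c := range tracked {
		if !shared[c] {
			toUntrack = append(toUntrack, c)
		}
	}
	return toTrack, toUntrack
}

func main() {
	shared := map[string]bool{"QmA": true, "QmB": true}  // consensus state
	tracked := map[string]bool{"QmB": true, "QmC": true} // local tracker
	track, untrack := stateSync(shared, tracked)
	fmt.Println(track, untrack) // [QmA] [QmC]
}
```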
### Adding a peer
* The `RESTAPI` component receives a `PeerAdd` request on the respective endpoint. It makes a `PeerAdd` RPC request.
* The local RPC server receives it and calls `PeerAdd` method in the main `Cluster` component.
* A libp2p connection is opened to the new peer's multiaddress. It should be reachable. We note down the local multiaddress used to reach the new peer.
* A broadcast `PeerManagerAddPeer` request is sent to all peers in the current cluster. It is received and the RPC server calls `peerManager.addPeer`. The `peerManager` is an annex to the main Cluster Component around the peer management functionality (add/remove).
* The `peerManager` adds the new peer to libp2p's peerstore, asks Raft to make it part of its peerset and adds it to the `ClusterPeers` section
of the configuration (which is then saved).
* If the broadcast requests fails somewhere, the operation is aborted, and a `PeerRemove` operation for the new peer is triggered to undo any changes. Otherwise, on success, the local list of ClusterPeers from the configuration, along with the local multiaddress from the connection we noted down are
sent to the new peer (via the RPC `PeerManagerAddFromMultiaddrs` method).
* The new peer's `peerManager` updates the current list of peers, the Raft's peer set and the configuration and has become part of the new cluster.
* The `PeerAdd` method in `Cluster` makes a remote RPC request to the new peer's `ID` method. The received ID is used as the response to the call.
* The RPC server takes the response and sends it to the `RESTAPI` component, which in turn converts it and responds to the request.
* If there is a connection from the given PID, we use it to determine the true multiaddress (with the right remote IP), in case it was not provided correctly (usually when using `Join`, since it cannot be determined there; see below). This allows peers to correctly join from across NATs.
* The peer is added to the `peerManager` so we can perform RPC requests to it.
* A remote RPC `RemoteMultiaddressForPeer` is performed on the new peer, so it reports how our multiaddress looks from its side.
* The address of the new peer is then committed to the consensus state. Since this is a new peer, the Raft peerstore is also updated. Raft starts sending updates to the new peer, among them prompting it to update its own peerstore with the list of peers in this cluster.
* The list of cluster peers (plus ourselves with our real multiaddress) is sent to the new peer in a remote `PeerManagerAddFromMultiaddrs` RPC request.
* A remote `ID` RPC request is performed and the information about the new node is returned.
### Joining a cluster
* Joining means calling `PeerAdd` on the cluster we want to join.
* This is also used for bootstrapping.
* Join is disallowed if we are not a single-peer cluster.
### Pinning


@ -2,9 +2,7 @@ package ipfscluster
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"
@ -23,6 +21,7 @@ import (
type Cluster struct {
ctx context.Context
id peer.ID
config *Config
host host.Host
rpcServer *rpc.Server
@ -41,6 +40,8 @@ type Cluster struct {
doneCh chan struct{}
readyCh chan struct{}
wg sync.WaitGroup
paMux sync.Mutex
}
// NewCluster builds a new IPFS Cluster peer. It initializes a LibP2P host,
@ -61,8 +62,9 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P
logger.Infof(" %s/ipfs/%s", addr, host.ID().Pretty())
}
cluster := &Cluster{
c := &Cluster{
ctx: ctx,
id: host.ID(),
config: cfg,
host: host,
api: api,
@ -74,71 +76,195 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P
readyCh: make(chan struct{}, 1),
}
// Setup peer manager
pm := newPeerManager(cluster)
cluster.peerManager = pm
err = pm.addFromConfig(cfg)
c.setupPeerManager()
err = c.setupRPC()
if err != nil {
cluster.Shutdown()
c.Shutdown()
return nil, err
}
// Workaround for https://github.com/libp2p/go-libp2p-swarm/issues/15
cluster.openConns()
// Setup RPC
rpcServer := rpc.NewServer(host, RPCProtocol)
err = rpcServer.RegisterName("Cluster", &RPCAPI{cluster: cluster})
err = c.setupConsensus()
if err != nil {
cluster.Shutdown()
c.Shutdown()
return nil, err
}
cluster.rpcServer = rpcServer
c.setupRPCClients()
c.run()
return c, nil
}
// Setup RPC client that components from this peer will use
rpcClient := rpc.NewClientWithServer(host, RPCProtocol, rpcServer)
cluster.rpcClient = rpcClient
func (c *Cluster) setupPeerManager() {
pm := newPeerManager(c)
c.peerManager = pm
if len(c.config.ClusterPeers) > 0 {
c.peerManager.addFromMultiaddrs(c.config.ClusterPeers)
} else {
c.peerManager.addFromMultiaddrs(c.config.Bootstrap)
}
// Setup Consensus
consensus, err := NewConsensus(pm.peers(), host, cfg.ConsensusDataFolder, state)
}
func (c *Cluster) setupRPC() error {
rpcServer := rpc.NewServer(c.host, RPCProtocol)
err := rpcServer.RegisterName("Cluster", &RPCAPI{cluster: c})
if err != nil {
return err
}
c.rpcServer = rpcServer
rpcClient := rpc.NewClientWithServer(c.host, RPCProtocol, rpcServer)
c.rpcClient = rpcClient
return nil
}
func (c *Cluster) setupConsensus() error {
var startPeers []peer.ID
if len(c.config.ClusterPeers) > 0 {
startPeers = peersFromMultiaddrs(c.config.ClusterPeers)
} else {
startPeers = peersFromMultiaddrs(c.config.Bootstrap)
}
consensus, err := NewConsensus(
append(startPeers, c.host.ID()),
c.host,
c.config.ConsensusDataFolder,
c.state)
if err != nil {
logger.Errorf("error creating consensus: %s", err)
cluster.Shutdown()
return nil, err
return err
}
cluster.consensus = consensus
c.consensus = consensus
return nil
}
tracker.SetClient(rpcClient)
ipfs.SetClient(rpcClient)
api.SetClient(rpcClient)
consensus.SetClient(rpcClient)
func (c *Cluster) setupRPCClients() {
c.tracker.SetClient(c.rpcClient)
c.ipfs.SetClient(c.rpcClient)
c.api.SetClient(c.rpcClient)
c.consensus.SetClient(c.rpcClient)
}
cluster.run()
return cluster, nil
func (c *Cluster) stateSyncWatcher() {
stateSyncTicker := time.NewTicker(
time.Duration(c.config.StateSyncSeconds) * time.Second)
for {
select {
case <-stateSyncTicker.C:
c.StateSync()
case <-c.ctx.Done():
stateSyncTicker.Stop()
return
}
}
}
// run provides a cancellable context and launches some goroutines
// before signaling readyCh
func (c *Cluster) run() {
c.wg.Add(1)
// cancellable context
go func() {
defer c.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c.ctx = ctx
go c.stateSyncWatcher()
go c.bootstrapAndReady()
<-c.shutdownCh
}()
}
func (c *Cluster) bootstrapAndReady() {
ok := c.bootstrap()
if !ok {
logger.Error("Bootstrap unsuccessful")
c.Shutdown()
return
}
// We bootstrapped first because with dirty state consensus
// may have a peerset and not find a leader so we cannot wait
// for it.
timer := time.NewTimer(30 * time.Second)
select {
case <-timer.C:
logger.Error("consensus start timed out")
c.Shutdown()
return
case <-c.consensus.Ready():
case <-c.ctx.Done():
return
}
// Cluster is ready.
c.readyCh <- struct{}{}
logger.Info("IPFS Cluster is ready")
logger.Info("Cluster Peers (not including ourselves):")
peers := c.peerManager.peersAddrs()
if len(peers) == 0 {
logger.Info(" - No other peers")
}
for _, a := range c.peerManager.peersAddrs() {
logger.Infof(" - %s", a)
}
}
func (c *Cluster) bootstrap() bool {
// Cases in which we do not bootstrap
if len(c.config.Bootstrap) == 0 || len(c.config.ClusterPeers) > 0 {
return true
}
for _, b := range c.config.Bootstrap {
logger.Infof("Bootstrapping to %s", b)
err := c.Join(b)
if err == nil {
return true
}
logger.Error(err)
}
return false
}
// Ready returns a channel which signals when this peer is
// fully initialized (including consensus).
func (c *Cluster) Ready() <-chan struct{} {
return c.consensus.readyCh
return c.readyCh
}
// Shutdown stops the IPFS cluster components
func (c *Cluster) Shutdown() error {
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
if c.shutdown {
logger.Warning("Cluster is already shutdown")
return nil
}
logger.Info("shutting down IPFS Cluster")
if c.config.LeaveOnShutdown {
// best effort
logger.Warning("Attempting to leave Cluster. This may take some seconds")
err := c.consensus.LogRmPeer(c.host.ID())
if err != nil {
logger.Error("leaving cluster: " + err.Error())
} else {
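// presumably gives the LogRmPeer entry time to reach the rest of the peers before consensus shuts down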
time.Sleep(2 * time.Second)
}
c.peerManager.resetPeers()
}
if con := c.consensus; con != nil {
if err := con.Shutdown(); err != nil {
logger.Errorf("error stopping consensus: %s", err)
return err
}
}
c.peerManager.savePeers()
if err := c.api.Shutdown(); err != nil {
logger.Errorf("error stopping API: %s", err)
return err
@ -172,15 +298,14 @@ func (c *Cluster) ID() ID {
ipfsID, _ := c.ipfs.ID()
var addrs []ma.Multiaddr
for _, addr := range c.host.Addrs() {
ipfsAddr, _ := ma.NewMultiaddr("/ipfs/" + c.host.ID().Pretty())
addrs = append(addrs, addr.Encapsulate(ipfsAddr))
addrs = append(addrs, multiaddrJoin(addr, c.host.ID()))
}
return ID{
ID: c.host.ID(),
PublicKey: c.host.Peerstore().PubKey(c.host.ID()),
Addresses: addrs,
ClusterPeers: c.config.ClusterPeers,
ClusterPeers: c.peerManager.peersAddrs(),
Version: Version,
Commit: Commit,
RPCProtocolVersion: RPCProtocol,
@ -190,18 +315,18 @@ func (c *Cluster) ID() ID {
// PeerAdd adds a new peer to this Cluster.
//
// The current peer will first attempt to contact the provided
// peer at the given multiaddress. If the connection is successful,
// the new peer, with the given multiaddress will be added to the
// cluster_peers and the configuration saved with the updated set.
// All other Cluster peers will be asked to do the same.
//
// Finally, the list of cluster peers is sent to the new
// peer, which will update its configuration and join the cluster.
//
// PeerAdd will fail if any of the peers is not reachable.
// The new peer must be reachable. It will be added to the
// consensus and will receive the shared state (including the
// list of peers). The new peer should be a single-peer cluster,
// preferable without any relevant state.
func (c *Cluster) PeerAdd(addr ma.Multiaddr) (ID, error) {
p, decapAddr, err := multiaddrSplit(addr)
// starting 10 nodes on the same box for testing
// causes deadlock and a global lock here
// seems to help.
c.paMux.Lock()
defer c.paMux.Unlock()
logger.Debugf("peerAdd called with %s", addr)
pid, decapAddr, err := multiaddrSplit(addr)
if err != nil {
id := ID{
Error: err.Error(),
@ -209,107 +334,131 @@ func (c *Cluster) PeerAdd(addr ma.Multiaddr) (ID, error) {
return id, err
}
// only add reachable nodes
err = c.host.Connect(c.ctx, peerstore.PeerInfo{
ID: p,
Addrs: []ma.Multiaddr{decapAddr},
})
if err != nil {
err = fmt.Errorf("Peer unreachable. Aborting operation: %s", err)
id := ID{
ID: p,
Error: err.Error(),
}
logger.Error(err)
return id, err
}
// Figure out its real address if we have one
remoteAddr := getRemoteMultiaddr(c.host, pid, decapAddr)
// Find which local address we use to connect
conns := c.host.Network().ConnsToPeer(p)
if len(conns) == 0 {
err := errors.New("No connections to peer available")
logger.Error(err)
id := ID{
ID: p,
Error: err.Error(),
}
return id, err
}
pidMAddr, _ := ma.NewMultiaddr("/ipfs/" + c.host.ID().Pretty())
localMAddr := conns[0].LocalMultiaddr().Encapsulate(pidMAddr)
// Let all peer managers know they need to add this peer
peers := c.peerManager.peers()
replies := make([]peer.ID, len(peers), len(peers))
errs := c.multiRPC(peers, "Cluster", "PeerManagerAddPeer",
MultiaddrToSerial(addr), copyPIDsToIfaces(replies))
errorMsgs := ""
for i, err := range errs {
err = c.peerManager.addPeer(remoteAddr)
if err != nil {
logger.Error(err)
errorMsgs += fmt.Sprintf("%s: %s\n",
peers[i].Pretty(),
err.Error())
}
}
if errorMsgs != "" {
logger.Error("There were errors adding peer. Trying to rollback the operation")
c.PeerRemove(p)
id := ID{
ID: p,
Error: "Error adding peer: " + errorMsgs,
}
return id, errors.New(errorMsgs)
}
// Inform the peer of the current cluster peers
clusterPeers := MultiaddrsToSerial(c.config.ClusterPeers)
clusterPeers = append(clusterPeers, MultiaddrToSerial(localMAddr))
err = c.rpcClient.Call(
p, "Cluster", "PeerManagerAddFromMultiaddrs",
clusterPeers, &struct{}{})
if err != nil {
logger.Errorf("Error sending back the list of peers: %s")
id := ID{
ID: p,
Error: err.Error(),
}
id := ID{ID: pid, Error: err.Error()}
return id, err
}
idSerial := ID{
ID: p,
}.ToSerial()
err = c.rpcClient.Call(
p, "Cluster", "ID", struct{}{}, &idSerial)
logger.Infof("peer %s has been added to the Cluster", addr)
return idSerial.ToID(), err
// Figure out our address to that peer. This also
// ensures that it is reachable
var addrSerial MultiaddrSerial
err = c.rpcClient.Call(pid, "Cluster",
"RemoteMultiaddrForPeer", c.host.ID(), &addrSerial)
if err != nil {
logger.Error(err)
id := ID{ID: pid, Error: err.Error()}
c.peerManager.rmPeer(pid, false)
return id, err
}
// Log the new peer in the log so everyone gets it.
err = c.consensus.LogAddPeer(remoteAddr)
if err != nil {
logger.Error(err)
id := ID{ID: pid, Error: err.Error()}
c.peerManager.rmPeer(pid, false)
return id, err
}
// Send cluster peers to the new peer.
clusterPeers := append(c.peerManager.peersAddrs(),
addrSerial.ToMultiaddr())
err = c.rpcClient.Call(pid,
"Cluster",
"PeerManagerAddFromMultiaddrs",
MultiaddrsToSerial(clusterPeers),
&struct{}{})
if err != nil {
logger.Error(err)
}
id, err := c.getIDForPeer(pid)
return id, nil
}
// PeerRemove removes a peer from this Cluster.
//
// The peer will be removed from the consensus peer set,
// remove all cluster peers from its configuration and
// shut itself down.
func (c *Cluster) PeerRemove(p peer.ID) error {
peers := c.peerManager.peers()
replies := make([]struct{}, len(peers), len(peers))
errs := c.multiRPC(peers, "Cluster", "PeerManagerRmPeer",
p, copyEmptyStructToIfaces(replies))
errorMsgs := ""
for i, err := range errs {
if err != nil && peers[i] != p {
// it will be shut down after this happens.
func (c *Cluster) PeerRemove(pid peer.ID) error {
if !c.peerManager.isPeer(pid) {
return fmt.Errorf("%s is not a peer", pid.Pretty())
}
err := c.consensus.LogRmPeer(pid)
if err != nil {
logger.Error(err)
errorMsgs += fmt.Sprintf("%s: %s\n",
peers[i].Pretty(),
err.Error())
return err
}
// This is a best effort. It may fail
// if that peer is down
err = c.rpcClient.Call(pid,
"Cluster",
"PeerManagerRmPeerShutdown",
pid,
&struct{}{})
if err != nil {
logger.Error(err)
}
if errorMsgs != "" {
return errors.New(errorMsgs)
return nil
}
// Join adds this peer to an existing cluster. The calling peer should
// be a single-peer cluster node. This is almost equivalent to calling
// PeerAdd on the destination cluster.
func (c *Cluster) Join(addr ma.Multiaddr) error {
logger.Debugf("Join(%s)", addr)
//if len(c.peerManager.peers()) > 1 {
// logger.Error(c.peerManager.peers())
// return errors.New("only single-node clusters can be joined")
//}
pid, _, err := multiaddrSplit(addr)
if err != nil {
logger.Error(err)
return err
}
logger.Infof("peer %s has been removed from the Cluster", p.Pretty())
// Bootstrap to myself
if pid == c.host.ID() {
return nil
}
// Add peer to peerstore so we can talk to it
c.peerManager.addPeer(addr)
// Note that PeerAdd() on the remote peer will
// figure out what our real address is (obviously not
// ClusterAddr).
var myID IDSerial
err = c.rpcClient.Call(pid,
"Cluster",
"PeerAdd",
MultiaddrToSerial(multiaddrJoin(c.config.ClusterAddr, c.host.ID())),
&myID)
if err != nil {
logger.Error(err)
return err
}
// wait for leader and for state to catch up
// then sync
err = c.consensus.WaitForSync()
if err != nil {
logger.Error(err)
return err
}
c.StateSync()
logger.Infof("joined %s's cluster", addr)
return nil
}
@ -326,20 +475,16 @@ func (c *Cluster) StateSync() ([]PinInfo, error) {
clusterPins := cState.ListPins()
var changed []*cid.Cid
// For the moment we run everything in parallel.
// The PinTracker should probably decide if it can
// pin in parallel or queues everything and does it
// one by one
// Track items which are not tracked
for _, h := range clusterPins {
if c.tracker.Status(h).Status == TrackerStatusUnpinned {
changed = append(changed, h)
err := c.rpcClient.Go("",
"Cluster",
"Track",
NewCidArg(h),
&struct{}{},
nil)
if err != nil {
return []PinInfo{}, err
}
go c.tracker.Track(h)
}
}
@ -348,15 +493,7 @@ func (c *Cluster) StateSync() ([]PinInfo, error) {
h, _ := cid.Decode(p.CidStr)
if !cState.HasPin(h) {
changed = append(changed, h)
err := c.rpcClient.Go("",
"Cluster",
"Track",
&CidArg{p.CidStr},
&struct{}{},
nil)
if err != nil {
return []PinInfo{}, err
}
go c.tracker.Untrack(h)
}
}
@ -504,31 +641,6 @@ func (c *Cluster) Peers() []ID {
return peers
}
// run provides a cancellable context and waits for all components to be ready
// before signaling readyCh
func (c *Cluster) run() {
c.wg.Add(1)
// Currently we do nothing other than waiting to
// cancel our context.
go func() {
defer c.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c.ctx = ctx
for {
select {
case <-c.shutdownCh:
return
case <-c.consensus.Ready():
close(c.readyCh)
logger.Info("IPFS Cluster is ready")
}
}
}()
}
// makeHost makes a libp2p-host
func makeHost(ctx context.Context, cfg *Config) (host.Host, error) {
ps := peerstore.NewPeerstore()
@ -672,24 +784,14 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) {
return infos, nil
}
// openConns is a workaround for
// https://github.com/libp2p/go-libp2p-swarm/issues/15
// which break our tests.
// It runs when consensus is initialized so we can assume
// that the cluster is more or less up.
// It should open connections for peers where they haven't
// yet been opened. By randomly sleeping we reduce the
// chance that peers will open 2 connections simultaneously.
func (c *Cluster) openConns() {
rand.Seed(time.Now().UnixNano())
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
peers := c.host.Peerstore().Peers()
for _, p := range peers {
peerInfo := c.host.Peerstore().PeerInfo(p)
if p == c.host.ID() {
continue // do not connect to ourselves
}
// ignore any errors here
c.host.Connect(c.ctx, peerInfo)
func (c *Cluster) getIDForPeer(pid peer.ID) (ID, error) {
idSerial := ID{ID: pid}.ToSerial()
err := c.rpcClient.Call(
pid, "Cluster", "ID", struct{}{}, &idSerial)
id := idSerial.ToID()
if err != nil {
logger.Error(err)
id.Error = err.Error()
}
return id, err
}

config.go

@ -20,6 +20,7 @@ const (
DefaultIPFSProxyAddr = "/ip4/127.0.0.1/tcp/9095"
DefaultIPFSNodeAddr = "/ip4/127.0.0.1/tcp/5001"
DefaultClusterAddr = "/ip4/0.0.0.0/tcp/9096"
DefaultStateSyncSeconds = 60
)
// Config represents an ipfs-cluster configuration. It is used by
@ -31,9 +32,21 @@ type Config struct {
ID peer.ID
PrivateKey crypto.PrivKey
// List of multiaddresses of the peers of this cluster.
// ClusterPeers is the list of peers in the Cluster. They are used
// as the initial peers in the consensus. When bootstrapping a peer,
// ClusterPeers will be filled in automatically for the next run upon
// shutdown.
ClusterPeers []ma.Multiaddr
pMux sync.Mutex
// Bootstrap peers multiaddresses. This peer will attempt to
// join the clusters of the peers in this list after booting.
// Leave empty for a single-peer-cluster.
Bootstrap []ma.Multiaddr
// Leave Cluster on shutdown. Politely informs other peers
// of the departure and removes itself from the consensus
// peer set. The Cluster size will be reduced by one.
LeaveOnShutdown bool
// Listen parameters for the Cluster libp2p Host. Used by
// the RPC and Consensus components.
@ -53,9 +66,14 @@ type Config struct {
// the Consensus component.
ConsensusDataFolder string
// Number of seconds between StateSync() operations
StateSyncSeconds int
// if a config has been loaded from disk, track the path
// so it can be saved to the same place.
path string
saveMux sync.Mutex
}
// JSONConfig represents a Cluster configuration as it will look when it is
@ -67,10 +85,22 @@ type JSONConfig struct {
ID string `json:"id"`
PrivateKey string `json:"private_key"`
// List of multiaddresses of the peers of this cluster. This list may
// include the multiaddress of this node.
// ClusterPeers is the list of peers' multiaddresses in the Cluster.
// They are used as the initial peers in the consensus. When
// bootstrapping a peer, ClusterPeers will be filled in automatically.
ClusterPeers []string `json:"cluster_peers"`
// Bootstrap peers multiaddresses. This peer will attempt to
// join the clusters of the peers in the list. ONLY when ClusterPeers
// is empty. Otherwise it is ignored. Leave empty for a single-peer
// cluster.
Bootstrap []string `json:"bootstrap"`
// Leave Cluster on shutdown. Politely informs other peers
// of the departure and removes itself from the consensus
// peer set. The Cluster size will be reduced by one.
LeaveOnShutdown bool `json:"leave_on_shutdown"`
// Listen address for the Cluster libp2p host. This is used for
// interal RPC and Consensus communications between cluster peers.
ClusterListenMultiaddress string `json:"cluster_multiaddress"`
@ -90,6 +120,11 @@ type JSONConfig struct {
// Storage folder for snapshots, log store etc. Used by
// the Consensus component.
ConsensusDataFolder string `json:"consensus_data_folder"`
// Number of seconds between syncs of the consensus state to the
// tracker state. Normally states are synced anyway, but this helps
// when new nodes are joining the cluster
StateSyncSeconds int `json:"state_sync_seconds"`
}
// ToJSONConfig converts a Config object to its JSON representation which
@ -107,22 +142,28 @@ func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) {
}
pKey := base64.StdEncoding.EncodeToString(pkeyBytes)
cfg.pMux.Lock()
clusterPeers := make([]string, len(cfg.ClusterPeers), len(cfg.ClusterPeers))
for i := 0; i < len(cfg.ClusterPeers); i++ {
clusterPeers[i] = cfg.ClusterPeers[i].String()
}
cfg.pMux.Unlock()
bootstrap := make([]string, len(cfg.Bootstrap), len(cfg.Bootstrap))
for i := 0; i < len(cfg.Bootstrap); i++ {
bootstrap[i] = cfg.Bootstrap[i].String()
}
j = &JSONConfig{
ID: cfg.ID.Pretty(),
PrivateKey: pKey,
ClusterPeers: clusterPeers,
Bootstrap: bootstrap,
LeaveOnShutdown: cfg.LeaveOnShutdown,
ClusterListenMultiaddress: cfg.ClusterAddr.String(),
APIListenMultiaddress: cfg.APIAddr.String(),
IPFSProxyListenMultiaddress: cfg.IPFSProxyAddr.String(),
IPFSNodeMultiaddress: cfg.IPFSNodeAddr.String(),
ConsensusDataFolder: cfg.ConsensusDataFolder,
StateSyncSeconds: cfg.StateSyncSeconds,
}
return
}
@ -158,6 +199,17 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) {
clusterPeers[i] = maddr
}
bootstrap := make([]ma.Multiaddr, len(jcfg.Bootstrap))
for i := 0; i < len(jcfg.Bootstrap); i++ {
maddr, err := ma.NewMultiaddr(jcfg.Bootstrap[i])
if err != nil {
err = fmt.Errorf("error parsing multiaddress for peer %s: %s",
jcfg.Bootstrap[i], err)
return nil, err
}
bootstrap[i] = maddr
}
clusterAddr, err := ma.NewMultiaddr(jcfg.ClusterListenMultiaddress)
if err != nil {
err = fmt.Errorf("error parsing cluster_listen_multiaddress: %s", err)
@ -180,15 +232,22 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) {
return
}
if jcfg.StateSyncSeconds <= 0 {
jcfg.StateSyncSeconds = DefaultStateSyncSeconds
}
c = &Config{
ID: id,
PrivateKey: pKey,
ClusterPeers: clusterPeers,
Bootstrap: bootstrap,
LeaveOnShutdown: jcfg.LeaveOnShutdown,
ClusterAddr: clusterAddr,
APIAddr: apiAddr,
IPFSProxyAddr: ipfsProxyAddr,
IPFSNodeAddr: ipfsNodeAddr,
ConsensusDataFolder: jcfg.ConsensusDataFolder,
StateSyncSeconds: jcfg.StateSyncSeconds,
}
return
}
@ -217,7 +276,17 @@ func LoadConfig(path string) (*Config, error) {
}
// Save stores a configuration as a JSON file in the given path.
// If no path is provided, it uses the path the configuration was
// loaded from.
func (cfg *Config) Save(path string) error {
cfg.saveMux.Lock()
defer cfg.saveMux.Unlock()
if path == "" {
path = cfg.path
}
logger.Info("Saving configuration")
jcfg, err := cfg.ToJSONConfig()
if err != nil {
logger.Error("error generating JSON config")
@ -254,52 +323,13 @@ func NewDefaultConfig() (*Config, error) {
ID: pid,
PrivateKey: priv,
ClusterPeers: []ma.Multiaddr{},
Bootstrap: []ma.Multiaddr{},
LeaveOnShutdown: false,
ClusterAddr: clusterAddr,
APIAddr: apiAddr,
IPFSProxyAddr: ipfsProxyAddr,
IPFSNodeAddr: ipfsNodeAddr,
ConsensusDataFolder: "ipfscluster-data",
StateSyncSeconds: DefaultStateSyncSeconds,
}, nil
}
func (cfg *Config) addPeer(addr ma.Multiaddr) {
cfg.pMux.Lock()
defer cfg.pMux.Unlock()
found := false
for _, cpeer := range cfg.ClusterPeers {
if cpeer.Equal(addr) {
found = true
}
}
if !found {
cfg.ClusterPeers = append(cfg.ClusterPeers, addr)
}
logger.Debugf("add: cluster peers are now: %s", cfg.ClusterPeers)
}
func (cfg *Config) rmPeer(p peer.ID) {
cfg.pMux.Lock()
defer cfg.pMux.Unlock()
foundPos := -1
for i, addr := range cfg.ClusterPeers {
cp, _, _ := multiaddrSplit(addr)
if cp == p {
foundPos = i
}
}
if foundPos < 0 {
return
}
// Delete preserving order
copy(cfg.ClusterPeers[foundPos:], cfg.ClusterPeers[foundPos+1:])
cfg.ClusterPeers[len(cfg.ClusterPeers)-1] = nil // or the zero value of T
cfg.ClusterPeers = cfg.ClusterPeers[:len(cfg.ClusterPeers)-1]
logger.Debugf("rm: cluster peers are now: %s", cfg.ClusterPeers)
}
func (cfg *Config) emptyPeers() {
cfg.pMux.Lock()
defer cfg.pMux.Unlock()
cfg.ClusterPeers = []ma.Multiaddr{}
}


@ -11,6 +11,7 @@ func testingConfig() *Config {
APIListenMultiaddress: "/ip4/127.0.0.1/tcp/10002",
IPFSProxyListenMultiaddress: "/ip4/127.0.0.1/tcp/10001",
ConsensusDataFolder: "./raftFolderFromTests",
LeaveOnShutdown: true,
}
cfg, _ := jcfg.ToConfig()
@ -85,9 +86,9 @@ func TestConfigToConfig(t *testing.T) {
}
j, _ = cfg.ToJSONConfig()
j.ClusterPeers = []string{"abc"}
j.Bootstrap = []string{"abc"}
_, err = j.ToConfig()
if err == nil {
t.Error("expected error parsing cluster_peers")
t.Error("expected error parsing Bootstrap")
}
}


@ -12,24 +12,31 @@ import (
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
libp2praft "github.com/libp2p/go-libp2p-raft"
ma "github.com/multiformats/go-multiaddr"
)
// Type of pin operation
const (
LogOpPin = iota + 1
LogOpUnpin
LogOpAddPeer
LogOpRmPeer
)
// LeaderTimeout specifies how long to wait during initialization
// before failing for not having a leader.
var LeaderTimeout = 120 * time.Second
// LeaderTimeout specifies how long to wait before failing an operation
// because there is no leader
var LeaderTimeout = 15 * time.Second
// CommitRetries specifies how many times we retry a failed commit until
// we give up
var CommitRetries = 2
type clusterLogOpType int
// clusterLogOp represents an operation for the OpLogConsensus system.
// It implements the consensus.Op interface.
type clusterLogOp struct {
Cid string
Arg string
Type clusterLogOpType
ctx context.Context
rpcClient *rpc.Client
@ -44,15 +51,13 @@ func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error)
panic("received unexpected state type")
}
c, err := cid.Decode(op.Cid)
if err != nil {
// Should never be here
panic("could not decode a CID we ourselves encoded")
}
switch op.Type {
case LogOpPin:
err := state.AddPin(c)
c, err := cid.Decode(op.Arg)
if err != nil {
panic("could not decode a CID we ourselves encoded")
}
err = state.AddPin(c)
if err != nil {
goto ROLLBACK
}
@ -64,7 +69,11 @@ func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error)
&struct{}{},
nil)
case LogOpUnpin:
err := state.RmPin(c)
c, err := cid.Decode(op.Arg)
if err != nil {
panic("could not decode a CID we ourselves encoded")
}
err = state.RmPin(c)
if err != nil {
goto ROLLBACK
}
@ -75,6 +84,28 @@ func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error)
NewCidArg(c),
&struct{}{},
nil)
case LogOpAddPeer:
addr, err := ma.NewMultiaddr(op.Arg)
if err != nil {
panic("could not decode a multiaddress we ourselves encoded")
}
op.rpcClient.Call("",
"Cluster",
"PeerManagerAddPeer",
MultiaddrToSerial(addr),
&struct{}{})
// TODO rebalance ops
case LogOpRmPeer:
pid, err := peer.IDB58Decode(op.Arg)
if err != nil {
panic("could not decode a PID we ourselves encoded")
}
op.rpcClient.Call("",
"Cluster",
"PeerManagerRmPeer",
pid,
&struct{}{})
// TODO rebalance ops
default:
logger.Error("unknown clusterLogOp type. Ignoring")
}
@ -155,35 +186,52 @@ func (cc *Consensus) run() {
cc.ctx = ctx
cc.baseOp.ctx = ctx
go func() {
cc.finishBootstrap()
}()
go cc.finishBootstrap()
<-cc.shutdownCh
}()
}
// WaitForSync waits for a leader and for the state to be up to date, then returns.
func (cc *Consensus) WaitForSync() error {
leaderCtx, cancel := context.WithTimeout(cc.ctx, LeaderTimeout)
defer cancel()
err := cc.raft.WaitForLeader(leaderCtx)
if err != nil {
return errors.New("error waiting for leader: " + err.Error())
}
err = cc.raft.WaitForUpdates(cc.ctx)
if err != nil {
return errors.New("error waiting for consensus updates: " + err.Error())
}
return nil
}
// waits until there is a consensus leader and syncs the state
// to the tracker
func (cc *Consensus) finishBootstrap() {
cc.raft.WaitForLeader(cc.ctx)
cc.raft.WaitForUpdates(cc.ctx)
err := cc.WaitForSync()
if err != nil {
return
}
logger.Info("Consensus state is up to date")
// While rpc is not ready we cannot perform a sync
if cc.rpcClient == nil {
select {
case <-cc.ctx.Done():
return
case <-cc.rpcReady:
}
}
var pInfo []PinInfo
_, err := cc.State()
st, err := cc.State()
_ = st
// only check sync if we have a state
// avoid error on new running clusters
if err != nil {
logger.Debug("skipping state sync: ", err)
} else {
var pInfo []PinInfo
cc.rpcClient.Go(
"",
"Cluster",
@ -193,29 +241,9 @@ func (cc *Consensus) finishBootstrap() {
nil)
}
cc.readyCh <- struct{}{}
logger.Debug("consensus ready") // not accurate if we are shutting down
logger.Debug("consensus ready")
}
// // raft stores peer add/rm operations. This is how to force a peer set.
// func (cc *Consensus) setPeers() {
// logger.Debug("forcefully setting Raft peers to known set")
// var peersStr []string
// var peers []peer.ID
// err := cc.rpcClient.Call("",
// "Cluster",
// "PeerManagerPeers",
// struct{}{},
// &peers)
// if err != nil {
// logger.Error(err)
// return
// }
// for _, p := range peers {
// peersStr = append(peersStr, p.Pretty())
// }
// cc.p2pRaft.raft.SetPeers(peersStr)
// }
// Shutdown stops the component so it will not process any
// more updates. The underlying consensus is permanently
// shutdown, along with the libp2p transport.
@ -267,9 +295,20 @@ func (cc *Consensus) Ready() <-chan struct{} {
return cc.readyCh
}
func (cc *Consensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp {
func (cc *Consensus) op(argi interface{}, t clusterLogOpType) *clusterLogOp {
var arg string
switch argi.(type) {
case *cid.Cid:
arg = argi.(*cid.Cid).String()
case peer.ID:
arg = peer.IDB58Encode(argi.(peer.ID))
case ma.Multiaddr:
arg = argi.(ma.Multiaddr).String()
default:
panic("bad type")
}
return &clusterLogOp{
Cid: c.String(),
Arg: arg,
Type: t,
}
}
@ -277,9 +316,14 @@ func (cc *Consensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp {
// returns true if the operation was redirected to the leader
func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, error) {
leader, err := cc.Leader()
if err != nil {
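// no leader known yet: wait for one to be elected (up to LeaderTimeout) before deciding whether to redirect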
rctx, cancel := context.WithTimeout(cc.ctx, LeaderTimeout)
defer cancel()
err := cc.raft.WaitForLeader(rctx)
if err != nil {
return false, err
}
}
if leader == cc.host.ID() {
return false, nil
}
@ -293,66 +337,146 @@ func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, err
return true, err
}
// LogPin submits a Cid to the shared state of the cluster. It will forward
// the operation to the leader if this is not it.
func (cc *Consensus) LogPin(c *cid.Cid) error {
redirected, err := cc.redirectToLeader("ConsensusLogPin", NewCidArg(c))
if err != nil || redirected {
return err
func (cc *Consensus) logOpCid(rpcOp string, opType clusterLogOpType, c *cid.Cid) error {
var finalErr error
for i := 0; i < CommitRetries; i++ {
logger.Debugf("Try %d", i)
redirected, err := cc.redirectToLeader(rpcOp, NewCidArg(c))
if err != nil {
finalErr = err
continue
}
if redirected {
return nil
}
// It seems WE are the leader.
// Create pin operation for the log
op := cc.op(c, LogOpPin)
op := cc.op(c, opType)
_, err = cc.consensus.CommitOp(op)
if err != nil {
// This means the op did not make it to the log
return err
finalErr = err
time.Sleep(200 * time.Millisecond)
continue
}
finalErr = nil
break
}
if finalErr != nil {
return finalErr
}
switch opType {
case LogOpPin:
logger.Infof("pin committed to global state: %s", c)
case LogOpUnpin:
logger.Infof("unpin committed to global state: %s", c)
}
return nil
}
// LogPin submits a Cid to the shared state of the cluster. It will forward
// the operation to the leader if this is not it.
func (cc *Consensus) LogPin(c *cid.Cid) error {
return cc.logOpCid("ConsensusLogPin", LogOpPin, c)
}
// LogUnpin removes a Cid from the shared state of the cluster.
func (cc *Consensus) LogUnpin(c *cid.Cid) error {
redirected, err := cc.redirectToLeader("ConsensusLogUnpin", NewCidArg(c))
if err != nil || redirected {
return cc.logOpCid("ConsensusLogUnpin", LogOpUnpin, c)
}
// LogAddPeer submits a new peer to the shared state of the cluster. It will
// forward the operation to the leader if this is not it.
func (cc *Consensus) LogAddPeer(addr ma.Multiaddr) error {
var finalErr error
for i := 0; i < CommitRetries; i++ {
logger.Debugf("Try %d", i)
redirected, err := cc.redirectToLeader("ConsensusLogAddPeer", MultiaddrToSerial(addr))
if err != nil {
finalErr = err
continue
}
if redirected {
return nil
}
// It seems WE are the leader.
pid, _, err := multiaddrSplit(addr)
if err != nil {
return err
}
// Create pin operation for the log
op := cc.op(addr, LogOpAddPeer)
_, err = cc.consensus.CommitOp(op)
if err != nil {
// This means the op did not make it to the log
finalErr = err
time.Sleep(200 * time.Millisecond)
continue
}
err = cc.raft.AddPeer(peer.IDB58Encode(pid))
if err != nil {
finalErr = err
continue
}
finalErr = nil
break
}
if finalErr != nil {
return finalErr
}
logger.Infof("peer committed to global state: %s", addr)
return nil
}
// LogRmPeer removes a peer from the shared state of the cluster. It will
// forward the operation to the leader if this is not it.
func (cc *Consensus) LogRmPeer(pid peer.ID) error {
var finalErr error
for i := 0; i < CommitRetries; i++ {
logger.Debugf("Try %d", i)
redirected, err := cc.redirectToLeader("ConsensusLogRmPeer", pid)
if err != nil {
finalErr = err
continue
}
if redirected {
return nil
}
// It seems WE are the leader.
// Create unpin operation for the log
op := cc.op(c, LogOpUnpin)
// Create pin operation for the log
op := cc.op(pid, LogOpRmPeer)
_, err = cc.consensus.CommitOp(op)
if err != nil {
return err
// This means the op did not make it to the log
finalErr = err
continue
}
logger.Infof("unpin committed to global state: %s", c)
err = cc.raft.RemovePeer(peer.IDB58Encode(pid))
if err != nil {
finalErr = err
time.Sleep(200 * time.Millisecond)
continue
}
finalErr = nil
break
}
if finalErr != nil {
return finalErr
}
logger.Infof("peer removed from global state: %s", pid)
return nil
}
// AddPeer attempts to add a peer to the consensus.
func (cc *Consensus) AddPeer(p peer.ID) error {
//redirected, err := cc.redirectToLeader("ConsensusAddPeer", p)
//if err != nil || redirected {
// return err
// }
return cc.raft.AddPeer(peer.IDB58Encode(p))
}
// RemovePeer attempts to remove a peer from the consensus.
func (cc *Consensus) RemovePeer(p peer.ID) error {
//redirected, err := cc.redirectToLeader("ConsensusRemovePeer", p)
//if err != nil || redirected {
// return err
//}
return cc.raft.RemovePeer(peer.IDB58Encode(p))
}
// State retrieves the current consensus State. It may error
// if no State has been agreed upon or the state is not
// consistent. The returned State is the last agreed-upon

View File

@ -12,7 +12,7 @@ import (
func TestApplyToPin(t *testing.T) {
op := &clusterLogOp{
Cid: testCid,
Arg: testCid,
Type: LogOpPin,
ctx: context.Background(),
rpcClient: mockRPCClient(t),
@ -28,7 +28,7 @@ func TestApplyToPin(t *testing.T) {
func TestApplyToUnpin(t *testing.T) {
op := &clusterLogOp{
Cid: testCid,
Arg: testCid,
Type: LogOpUnpin,
ctx: context.Background(),
rpcClient: mockRPCClient(t),
@ -52,7 +52,7 @@ func TestApplyToBadState(t *testing.T) {
}()
op := &clusterLogOp{
Cid: testCid,
Arg: testCid,
Type: LogOpUnpin,
ctx: context.Background(),
rpcClient: mockRPCClient(t),
@ -70,7 +70,7 @@ func TestApplyToBadCid(t *testing.T) {
}()
op := &clusterLogOp{
Cid: "agadfaegf",
Arg: "agadfaegf",
Type: LogOpPin,
ctx: context.Background(),
rpcClient: mockRPCClient(t),


@ -3,5 +3,10 @@
package ipfscluster
func init() {
SetLogLevel("DEBUG")
l := "DEBUG"
SetFacilityLogLevel("cluster", l)
//SetFacilityLogLevel("raft", l)
//SetFacilityLogLevel("p2p-gorpc", l)
//SetFacilityLogLevel("swarm2", l)
//SetFacilityLogLevel("libp2p-raft", l)
}


@ -1,6 +1,7 @@
package main
import (
"errors"
"fmt"
"os"
"os/signal"
@ -8,6 +9,7 @@ import (
"path/filepath"
logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
"github.com/urfave/cli"
ipfscluster "github.com/ipfs/ipfs-cluster"
@ -59,6 +61,21 @@ initialized with "init" and its default location is
For feedback, bug reports or any additional information, visit
https://github.com/ipfs/ipfs-cluster.
EXAMPLES
Initial configuration:
$ ipfs-cluster-service init
Launch a cluster:
$ ipfs-cluster-service
Launch a peer and join existing cluster:
$ ipfs-cluster-service --bootstrap /ip4/192.168.1.2/tcp/9096/ipfs/QmPSoSaPXpyunaBwHs1rZBKYSqRV4bLRk32VGYLuvdrypL
`,
programName,
programName,
@ -110,7 +127,8 @@ func main() {
app := cli.NewApp()
app.Name = programName
app.Usage = "IPFS Cluster node"
app.UsageText = Description
app.Description = Description
//app.Copyright = "© Protocol Labs, Inc."
app.Version = ipfscluster.Version
app.Flags = []cli.Flag{
cli.BoolFlag{
@ -126,16 +144,25 @@ func main() {
},
cli.BoolFlag{
Name: "force, f",
Usage: "force configuration overwrite when running 'init'",
Usage: "forcefully proceed with some actions. i.e. overwriting configuration",
},
cli.StringFlag{
Name: "bootstrap, j",
Usage: "join a cluster providing an existing peer's `multiaddress`. Overrides the \"bootstrap\" values from the configuration",
},
cli.BoolFlag{
Name: "leave, x",
Usage: "remove peer from cluster on exit. Overrides \"leave_on_shutdown\"",
Hidden: true,
},
cli.BoolFlag{
Name: "debug, d",
Usage: "enable full debug logging",
Usage: "enable full debug logging (very verbose)",
},
cli.StringFlag{
Name: "loglevel, l",
Value: "info",
Usage: "set the loglevel [critical, error, warning, info, debug]",
Usage: "set the loglevel for cluster only [critical, error, warning, info, debug]",
},
}
@ -185,6 +212,22 @@ func run(c *cli.Context) error {
cfg, err := loadConfig()
checkErr("loading configuration", err)
if a := c.String("bootstrap"); a != "" {
if len(cfg.ClusterPeers) > 0 && !c.Bool("force") {
return errors.New("The configuration provides ClusterPeers. Use -f to ignore and proceed bootstrapping")
}
joinAddr, err := ma.NewMultiaddr(a)
if err != nil {
return fmt.Errorf("error parsing multiaddress: %s", err)
}
cfg.Bootstrap = []ma.Multiaddr{joinAddr}
cfg.ClusterPeers = []ma.Multiaddr{}
}
if c.Bool("leave") {
cfg.LeaveOnShutdown = true
}
api, err := ipfscluster.NewRESTAPI(cfg)
checkErr("creating REST API component", err)
@ -201,35 +244,33 @@ func run(c *cli.Context) error {
tracker)
checkErr("starting cluster", err)
signalChan := make(chan os.Signal)
signalChan := make(chan os.Signal, 20)
signal.Notify(signalChan, os.Interrupt)
for {
select {
case <-signalChan:
err = cluster.Shutdown()
checkErr("shutting down cluster", err)
return nil
case <-cluster.Done():
return nil
case <-cluster.Ready():
logger.Info("IPFS Cluster is ready")
}
}
}
func setupLogging(lvl string) {
logging.SetLogLevel("service", lvl)
logging.SetLogLevel("cluster", lvl)
//logging.SetLogLevel("raft", lvl)
ipfscluster.SetFacilityLogLevel("service", lvl)
ipfscluster.SetFacilityLogLevel("cluster", lvl)
//ipfscluster.SetFacilityLogLevel("raft", lvl)
}
func setupDebug() {
logging.SetLogLevel("cluster", "debug")
//logging.SetLogLevel("libp2p-raft", "debug")
logging.SetLogLevel("p2p-gorpc", "debug")
//logging.SetLogLevel("swarm2", "debug")
logging.SetLogLevel("raft", "debug")
l := "DEBUG"
ipfscluster.SetFacilityLogLevel("cluster", l)
ipfscluster.SetFacilityLogLevel("raft", l)
ipfscluster.SetFacilityLogLevel("p2p-gorpc", l)
//SetFacilityLogLevel("swarm2", l)
//SetFacilityLogLevel("libp2p-raft", l)
}
func initConfig(force bool) {

View File

@ -155,6 +155,7 @@ type State interface {
ListPins() []*cid.Cid
// HasPin returns true if the state is holding a Cid
HasPin(*cid.Cid) bool
// AddPeer adds a peer to the shared state
}
// PinTracker represents a component which tracks the status of

View File

@ -26,7 +26,7 @@ var (
//TestClusters*
var (
// number of clusters to create
nClusters = 3
nClusters = 6
// number of pins to pin/unpin/check
nPins = 500
@ -70,12 +70,13 @@ func createComponents(t *testing.T, i int) (*Config, *RESTAPI, *IPFSHTTPConnecto
cfg, _ := NewDefaultConfig()
cfg.ID = pid
cfg.PrivateKey = priv
cfg.ClusterPeers = []ma.Multiaddr{}
cfg.Bootstrap = []ma.Multiaddr{}
cfg.ClusterAddr = clusterAddr
cfg.APIAddr = apiAddr
cfg.IPFSProxyAddr = proxyAddr
cfg.IPFSNodeAddr = nodeAddr
cfg.ConsensusDataFolder = "./e2eTestRaft/" + pid.Pretty()
cfg.LeaveOnShutdown = false
api, err := NewRESTAPI(cfg)
checkErr(t, err)
@ -124,6 +125,8 @@ func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) {
cfg.ID.Pretty()))
clusterPeers[i] = addr
}
// Set up the cluster using ClusterPeers
for i := 0; i < nClusters; i++ {
cfgs[i].ClusterPeers = make([]ma.Multiaddr, nClusters, nClusters)
for j := 0; j < nClusters; j++ {
@ -131,6 +134,16 @@ func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) {
}
}
// Alternative way of starting using bootstrap
// for i := 1; i < nClusters; i++ {
// addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ipfs/%s",
// clusterPort,
// cfgs[0].ID.Pretty()))
// // Use previous cluster for bootstrapping
// cfgs[i].Bootstrap = []ma.Multiaddr{addr}
// }
var wg sync.WaitGroup
for i := 0; i < nClusters; i++ {
wg.Add(1)
@ -140,6 +153,12 @@ func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) {
}(i)
}
wg.Wait()
// Yet an alternative way using PeerAdd
// for i := 1; i < nClusters; i++ {
// clusters[0].PeerAdd(clusterAddr(clusters[i]))
// }
delay()
return clusters, ipfsMocks
}
@ -168,7 +187,16 @@ func runF(t *testing.T, clusters []*Cluster, f func(*testing.T, *Cluster)) {
}
func delay() {
time.Sleep(time.Duration(nClusters) * time.Second)
var d int
if nClusters > 10 {
d = 8
} else if nClusters > 5 {
d = 5
} else {
d = nClusters
}
time.Sleep(time.Duration(d) * time.Second)
}
func TestClustersVersion(t *testing.T) {

View File

@ -14,8 +14,8 @@ var logger = logging.Logger("cluster")
var raftStdLogger = makeRaftLogger()
var raftLogger = logging.Logger("raft")
// SetLogLevel sets the level in the logs
func SetLogLevel(l string) {
// SetFacilityLogLevel sets the log level for a given module
func SetFacilityLogLevel(f, l string) {
/*
CRITICAL Level = iota
ERROR
@ -24,11 +24,7 @@ func SetLogLevel(l string) {
INFO
DEBUG
*/
logging.SetLogLevel("cluster", l)
//logging.SetLogLevel("raft", l)
//logging.SetLogLevel("p2p-gorpc", l)
//logging.SetLogLevel("swarm2", l)
//logging.SetLogLevel("libp2p-raft", l)
logging.SetLogLevel(f, l)
}
// This redirects Raft output to our logger
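
`SetLogLevel` becomes `SetFacilityLogLevel`, a thin wrapper over go-log that takes the facility name explicitly. Assuming only the exported helper shown above, an embedding program could tune each subsystem individually; the facility names below are the ones used elsewhere in this diff.

```go
package main

import ipfscluster "github.com/ipfs/ipfs-cluster"

func main() {
    // Keep the cluster facility verbose while quieting the consensus internals.
    ipfscluster.SetFacilityLogLevel("cluster", "DEBUG")
    ipfscluster.SetFacilityLogLevel("raft", "ERROR")
    ipfscluster.SetFacilityLogLevel("p2p-gorpc", "ERROR")
    ipfscluster.SetFacilityLogLevel("libp2p-raft", "CRITICAL")
}
```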

View File

@ -9,21 +9,24 @@ import (
// MapState is a very simple database to store the state of the system
// using a Go map. It is thread safe. It implements the State interface.
type MapState struct {
mux sync.RWMutex
pinMux sync.RWMutex
PinMap map[string]struct{}
peerMux sync.RWMutex
PeerMap map[string]string
}
// NewMapState initializes the internal map and returns a new MapState object.
func NewMapState() *MapState {
return &MapState{
PinMap: make(map[string]struct{}),
PeerMap: make(map[string]string),
}
}
// AddPin adds a Cid to the internal map.
func (st *MapState) AddPin(c *cid.Cid) error {
st.mux.Lock()
defer st.mux.Unlock()
st.pinMux.Lock()
defer st.pinMux.Unlock()
var a struct{}
st.PinMap[c.String()] = a
return nil
@ -31,24 +34,24 @@ func (st *MapState) AddPin(c *cid.Cid) error {
// RmPin removes a Cid from the internal map.
func (st *MapState) RmPin(c *cid.Cid) error {
st.mux.Lock()
defer st.mux.Unlock()
st.pinMux.Lock()
defer st.pinMux.Unlock()
delete(st.PinMap, c.String())
return nil
}
// HasPin returns true if the Cid belongs to the State.
func (st *MapState) HasPin(c *cid.Cid) bool {
st.mux.RLock()
defer st.mux.RUnlock()
st.pinMux.RLock()
defer st.pinMux.RUnlock()
_, ok := st.PinMap[c.String()]
return ok
}
// ListPins provides a list of Cids in the State.
func (st *MapState) ListPins() []*cid.Cid {
st.mux.RLock()
defer st.mux.RUnlock()
st.pinMux.RLock()
defer st.pinMux.RUnlock()
cids := make([]*cid.Cid, 0, len(st.PinMap))
for k := range st.PinMap {
c, _ := cid.Decode(k)
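
MapState now carries two independent mutexes so pin operations and the new peer bookkeeping never contend for the same lock. The sketch below shows the same two-mutex layout in isolation; the names are illustrative, not the real type.

```go
package main

import (
    "fmt"
    "sync"
)

// mapState is a hypothetical stand-in for MapState: one lock per
// independent map, so pin traffic never blocks peer bookkeeping
// and vice versa.
type mapState struct {
    pinMux  sync.RWMutex
    pinMap  map[string]struct{}
    peerMux sync.RWMutex
    peerMap map[string]string
}

func newMapState() *mapState {
    return &mapState{
        pinMap:  make(map[string]struct{}),
        peerMap: make(map[string]string),
    }
}

func (st *mapState) addPin(c string) {
    st.pinMux.Lock()
    defer st.pinMux.Unlock()
    st.pinMap[c] = struct{}{}
}

func (st *mapState) hasPin(c string) bool {
    st.pinMux.RLock()
    defer st.pinMux.RUnlock()
    _, ok := st.pinMap[c]
    return ok
}

func main() {
    st := newMapState()
    st.addPin("QmExampleCid") // placeholder CID string
    fmt.Println(st.hasPin("QmExampleCid")) // true
}
```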

View File

@ -2,6 +2,11 @@
package ipfscluster
// This is our default logs levels
func init() {
SetLogLevel("INFO")
SetFacilityLogLevel("cluster", "INFO")
SetFacilityLogLevel("raft", "ERROR")
SetFacilityLogLevel("p2p-gorpc", "ERROR")
//SetFacilityLogLevel("swarm2", l)
SetFacilityLogLevel("libp2p-raft", "CRITICAL")
}

View File

@ -1,8 +1,6 @@
package ipfscluster
import (
"os"
"path/filepath"
"sync"
"time"
@ -11,140 +9,131 @@ import (
ma "github.com/multiformats/go-multiaddr"
)
// peerManager is our own local peerstore
type peerManager struct {
cluster *Cluster
ps peerstore.Peerstore
self peer.ID
peerSetMux sync.RWMutex
peerSet map[peer.ID]struct{}
peermap map[peer.ID]ma.Multiaddr
m sync.RWMutex
}
func newPeerManager(c *Cluster) *peerManager {
pm := &peerManager{
cluster: c,
ps: c.host.Peerstore(),
self: c.host.ID(),
}
pm.resetPeerSet()
pm.resetPeers()
return pm
}
func (pm *peerManager) addPeer(addr ma.Multiaddr) (peer.ID, error) {
func (pm *peerManager) addPeer(addr ma.Multiaddr) error {
logger.Debugf("adding peer %s", addr)
peerID, decapAddr, err := multiaddrSplit(addr)
if err != nil {
return peerID, err
}
pm.peerSetMux.RLock()
_, ok := pm.peerSet[peerID]
pm.peerSetMux.RUnlock()
if ok {
logger.Debugf("%s is already a peer", peerID)
return peerID, nil
}
pm.peerSetMux.Lock()
pm.peerSet[peerID] = struct{}{}
pm.peerSetMux.Unlock()
pm.cluster.host.Peerstore().AddAddr(peerID, decapAddr, peerstore.PermanentAddrTTL)
pm.cluster.config.addPeer(addr)
if con := pm.cluster.consensus; con != nil {
pm.cluster.consensus.AddPeer(peerID)
}
if path := pm.cluster.config.path; path != "" {
err := pm.cluster.config.Save(path)
if err != nil {
logger.Error(err)
}
}
return peerID, nil
}
func (pm *peerManager) rmPeer(p peer.ID) error {
logger.Debugf("removing peer %s", p.Pretty())
pm.peerSetMux.RLock()
_, ok := pm.peerSet[p]
pm.peerSetMux.RUnlock()
if !ok {
return nil
}
pm.peerSetMux.Lock()
delete(pm.peerSet, p)
pm.peerSetMux.Unlock()
pm.cluster.host.Peerstore().ClearAddrs(p)
pm.cluster.config.rmPeer(p)
pm.cluster.consensus.RemovePeer(p)
// It's ourselves. This is not very graceful
if p == pm.cluster.host.ID() {
logger.Warning("this peer has been removed from the Cluster and will shutdown itself")
pm.cluster.config.emptyPeers()
defer func() {
go func() {
time.Sleep(time.Second)
pm.cluster.consensus.Shutdown()
pm.selfShutdown()
}()
}()
}
if path := pm.cluster.config.path; path != "" {
pm.cluster.config.Save(path)
}
return nil
}
func (pm *peerManager) selfShutdown() {
err := pm.cluster.Shutdown()
if err == nil {
// If the shutdown worked correctly
// (including snapshot) we can remove the Raft
// database (which traces peers additions
// and removals). It makes re-start of the peer
// way less confusing for Raft while the state
// kept in the snapshot.
os.Remove(filepath.Join(pm.cluster.config.ConsensusDataFolder, "raft.db"))
}
}
// empty the peerset and add ourselves only
func (pm *peerManager) resetPeerSet() {
pm.peerSetMux.Lock()
defer pm.peerSetMux.Unlock()
pm.peerSet = make(map[peer.ID]struct{})
pm.peerSet[pm.cluster.host.ID()] = struct{}{}
}
func (pm *peerManager) peers() []peer.ID {
pm.peerSetMux.RLock()
defer pm.peerSetMux.RUnlock()
var pList []peer.ID
for k := range pm.peerSet {
pList = append(pList, k)
}
return pList
}
func (pm *peerManager) addFromConfig(cfg *Config) error {
return pm.addFromMultiaddrs(cfg.ClusterPeers)
}
func (pm *peerManager) addFromMultiaddrs(mAddrIDs []ma.Multiaddr) error {
pm.resetPeerSet()
pm.cluster.config.emptyPeers()
if len(mAddrIDs) > 0 {
logger.Info("adding Cluster peers:")
} else {
logger.Info("This is a single-node cluster")
}
for _, m := range mAddrIDs {
_, err := pm.addPeer(m)
pid, decapAddr, err := multiaddrSplit(addr)
if err != nil {
return err
}
logger.Infof(" - %s", m.String())
pm.ps.AddAddr(pid, decapAddr, peerstore.PermanentAddrTTL)
if !pm.isPeer(pid) {
logger.Infof("new Cluster peer %s", addr.String())
}
pm.m.Lock()
pm.peermap[pid] = addr
pm.m.Unlock()
return nil
}
func (pm *peerManager) rmPeer(pid peer.ID, selfShutdown bool) error {
logger.Debugf("removing peer %s", pid.Pretty())
if pm.isPeer(pid) {
logger.Infof("removing Cluster peer %s", pid.Pretty())
}
pm.m.Lock()
delete(pm.peermap, pid)
pm.m.Unlock()
// It's ourselves. This is not very graceful
if pid == pm.self && selfShutdown {
logger.Warning("this peer has been removed from the Cluster and will shutdown itself in 5 seconds")
defer func() {
go func() {
time.Sleep(1 * time.Second)
pm.cluster.consensus.Shutdown()
pm.resetPeers()
time.Sleep(4 * time.Second)
pm.cluster.Shutdown()
}()
}()
}
return nil
}
func (pm *peerManager) savePeers() {
pm.cluster.config.ClusterPeers = pm.peersAddrs()
pm.cluster.config.Save("")
}
func (pm *peerManager) resetPeers() {
pm.m.Lock()
pm.peermap = make(map[peer.ID]ma.Multiaddr)
pm.peermap[pm.self] = pm.cluster.config.ClusterAddr
pm.m.Unlock()
}
func (pm *peerManager) isPeer(p peer.ID) bool {
if p == pm.self {
return true
}
pm.m.RLock()
_, ok := pm.peermap[p]
pm.m.RUnlock()
return ok
}
// peers including ourselves
func (pm *peerManager) peers() []peer.ID {
pm.m.RLock()
defer pm.m.RUnlock()
var peers []peer.ID
for k := range pm.peermap {
peers = append(peers, k)
}
return peers
}
// cluster peer addresses (NOT including ourselves)
func (pm *peerManager) peersAddrs() []ma.Multiaddr {
pm.m.RLock()
defer pm.m.RUnlock()
var addrs []ma.Multiaddr
for k, addr := range pm.peermap {
if k != pm.self {
addrs = append(addrs, addr)
}
}
return addrs
}
// func (pm *peerManager) addFromConfig(cfg *Config) error {
// return pm.addFromMultiaddrs(cfg.ClusterPeers)
// }
func (pm *peerManager) addFromMultiaddrs(addrs []ma.Multiaddr) error {
for _, m := range addrs {
err := pm.addPeer(m)
if err != nil {
logger.Error(err)
return err
}
}
return nil
}
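
The rewritten peerManager keeps a single pid-to-multiaddress map behind one RWMutex, and `peersAddrs()` deliberately leaves out the local peer because that slice is what `savePeers()` writes back as `cluster_peers`. A standalone sketch of that shape, with hypothetical names (the peer ID and address are taken from examples earlier in this diff):

```go
package main

import (
    "fmt"
    "sync"

    peer "github.com/libp2p/go-libp2p-peer"
    ma "github.com/multiformats/go-multiaddr"
)

// peerBook is a hypothetical stand-in for peerManager's peermap:
// a pid -> multiaddress map guarded by a single RWMutex.
type peerBook struct {
    self peer.ID
    m    sync.RWMutex
    pmap map[peer.ID]ma.Multiaddr
}

// addrs returns every peer address except our own, which is the shape
// expected by the cluster_peers entry of the configuration.
func (pb *peerBook) addrs() []ma.Multiaddr {
    pb.m.RLock()
    defer pb.m.RUnlock()
    var out []ma.Multiaddr
    for p, a := range pb.pmap {
        if p != pb.self {
            out = append(out, a)
        }
    }
    return out
}

func main() {
    self, err := peer.IDB58Decode("QmPSoSaPXpyunaBwHs1rZBKYSqRV4bLRk32VGYLuvdrypL")
    if err != nil {
        panic(err)
    }
    addr, _ := ma.NewMultiaddr("/ip4/192.168.1.2/tcp/9096")
    pb := &peerBook{self: self, pmap: map[peer.ID]ma.Multiaddr{self: addr}}
    fmt.Println(len(pb.addrs())) // 0: our own address is never persisted
}
```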

View File

@ -1,8 +1,10 @@
package ipfscluster
import (
"math/rand"
"sync"
"testing"
"time"
cid "github.com/ipfs/go-cid"
ma "github.com/multiformats/go-multiaddr"
@ -26,9 +28,7 @@ func peerManagerClusters(t *testing.T) ([]*Cluster, []*ipfsMock) {
}
func clusterAddr(c *Cluster) ma.Multiaddr {
addr := c.config.ClusterAddr
pidAddr, _ := ma.NewMultiaddr("/ipfs/" + c.ID().ID.Pretty())
return addr.Encapsulate(pidAddr)
return multiaddrJoin(c.config.ClusterAddr, c.ID().ID)
}
func TestClustersPeerAdd(t *testing.T) {
@ -36,7 +36,7 @@ func TestClustersPeerAdd(t *testing.T) {
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 2 {
t.Fatal("need at least 2 nodes for this test")
t.Skip("need at least 2 nodes for this test")
}
for i := 1; i < len(clusters); i++ {
@ -45,6 +45,7 @@ func TestClustersPeerAdd(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if len(id.ClusterPeers) != i {
// ClusterPeers is originally empty and contains nodes as we add them
t.Log(id.ClusterPeers)
@ -64,6 +65,7 @@ func TestClustersPeerAdd(t *testing.T) {
// check they are tracked by the peer manager
if len(ids) != nClusters {
//t.Log(ids)
t.Error("added clusters are not part of clusters")
}
@ -74,16 +76,21 @@ func TestClustersPeerAdd(t *testing.T) {
t.Error("expected 1 pin everywhere")
}
// check that its part of the configuration
if len(c.config.ClusterPeers) != nClusters-1 {
t.Error("expected different cluster peers in the configuration")
if len(c.ID().ClusterPeers) != nClusters-1 {
t.Log(c.ID().ClusterPeers)
t.Error("By now cluster peers should reflect all peers")
}
for _, peer := range c.config.ClusterPeers {
if peer == nil {
t.Error("something went wrong adding peer to config")
}
}
// // check that its part of the configuration
// if len(c.config.ClusterPeers) != nClusters-1 {
// t.Error("expected different cluster peers in the configuration")
// }
// for _, peer := range c.config.ClusterPeers {
// if peer == nil {
// t.Error("something went wrong adding peer to config")
// }
// }
}
runF(t, clusters, f)
}
@ -93,7 +100,7 @@ func TestClustersPeerAddBadPeer(t *testing.T) {
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 2 {
t.Fatal("need at least 2 nodes for this test")
t.Skip("need at least 2 nodes for this test")
}
// We add a cluster that has been shutdown
@ -114,7 +121,7 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) {
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 3 {
t.Fatal("need at least 3 nodes for this test")
t.Skip("need at least 3 nodes for this test")
}
_, err := clusters[0].PeerAdd(clusterAddr(clusters[1]))
@ -139,10 +146,15 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) {
}
func TestClustersPeerRemove(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
clusters, mocks := createClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 2 {
t.Skip("test needs at least 2 clusters")
}
p := clusters[1].ID().ID
//t.Logf("remove %s from %s", p.Pretty(), clusters[0].config.ClusterPeers)
err := clusters[0].PeerRemove(p)
if err != nil {
t.Error(err)
@ -154,19 +166,143 @@ func TestClustersPeerRemove(t *testing.T) {
if ok {
t.Error("removed peer should have exited")
}
if len(c.config.ClusterPeers) != 0 {
t.Error("cluster peers should be empty")
}
// if len(c.config.ClusterPeers) != 0 {
// t.Error("cluster peers should be empty")
// }
} else {
ids := c.Peers()
if len(ids) != nClusters-1 {
t.Error("should have removed 1 peer")
}
if len(c.config.ClusterPeers) != nClusters-2 {
t.Error("should have removed peer from config")
}
// if len(c.config.ClusterPeers) != nClusters-1 {
// t.Log(c.config.ClusterPeers)
// t.Error("should have removed peer from config")
// }
}
}
runF(t, clusters, f)
}
func TestClusterPeerRemoveSelf(t *testing.T) {
clusters, mocks := createClusters(t)
defer shutdownClusters(t, clusters, mocks)
for i := 0; i < len(clusters); i++ {
err := clusters[i].PeerRemove(clusters[i].ID().ID)
if err != nil {
t.Error(err)
}
time.Sleep(time.Second)
_, more := <-clusters[i].Done()
if more {
t.Error("should be done")
}
}
}
func TestClustersPeerJoin(t *testing.T) {
clusters, mocks := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 3 {
t.Skip("test needs at least 3 clusters")
}
for i := 1; i < len(clusters); i++ {
err := clusters[i].Join(clusterAddr(clusters[0]))
if err != nil {
t.Fatal(err)
}
}
hash, _ := cid.Decode(testCid)
clusters[0].Pin(hash)
delay()
f := func(t *testing.T, c *Cluster) {
peers := c.Peers()
if len(peers) != nClusters {
t.Error("all peers should be connected")
}
pins := c.Pins()
if len(pins) != 1 || !pins[0].Equals(hash) {
t.Error("all peers should have pinned the cid")
}
}
runF(t, clusters, f)
}
func TestClustersPeerJoinAllAtOnce(t *testing.T) {
clusters, mocks := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 2 {
t.Skip("test needs at least 2 clusters")
}
f := func(t *testing.T, c *Cluster) {
err := c.Join(clusterAddr(clusters[0]))
if err != nil {
t.Fatal(err)
}
}
runF(t, clusters[1:], f)
hash, _ := cid.Decode(testCid)
clusters[0].Pin(hash)
delay()
f2 := func(t *testing.T, c *Cluster) {
peers := c.Peers()
if len(peers) != nClusters {
t.Error("all peers should be connected")
}
pins := c.Pins()
if len(pins) != 1 || !pins[0].Equals(hash) {
t.Error("all peers should have pinned the cid")
}
}
runF(t, clusters, f2)
}
func TestClustersPeerJoinAllAtOnceWithRandomBootstrap(t *testing.T) {
clusters, mocks := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 3 {
t.Skip("test needs at least 3 clusters")
}
// We have a 2-node cluster and the rest of the nodes join
// one of the two seeds randomly.
err := clusters[1].Join(clusterAddr(clusters[0]))
if err != nil {
t.Fatal(err)
}
f := func(t *testing.T, c *Cluster) {
j := rand.Intn(2)
err := c.Join(clusterAddr(clusters[j]))
if err != nil {
t.Fatal(err)
}
}
runF(t, clusters[2:], f)
hash, _ := cid.Decode(testCid)
clusters[0].Pin(hash)
delay()
f2 := func(t *testing.T, c *Cluster) {
peers := c.Peers()
if len(peers) != nClusters {
t.Error("all peers should be connected")
}
pins := c.Pins()
if len(pins) != 1 || !pins[0].Equals(hash) {
t.Error("all peers should have pinned the cid")
}
}
runF(t, clusters, f2)
}

raft.go
View File

@ -37,6 +37,7 @@ type Raft struct {
stableStore hashiraft.StableStore
peerstore *libp2praft.Peerstore
boltdb *raftboltdb.BoltStore
dataFolder string
}
func defaultRaftConfig() *hashiraft.Config {
@ -87,14 +88,15 @@ func NewRaft(peers []peer.ID, host host.Host, dataFolder string, fsm hashiraft.F
return nil, err
}
cfg := defaultRaftConfig()
logger.Debug("creating Raft")
r, err := hashiraft.NewRaft(defaultRaftConfig(), fsm, logStore, logStore, snapshots, pstore, transport)
r, err := hashiraft.NewRaft(cfg, fsm, logStore, logStore, snapshots, pstore, transport)
if err != nil {
logger.Error("initializing raft: ", err)
return nil, err
}
return &Raft{
raft := &Raft{
raft: r,
transport: transport,
snapshotStore: snapshots,
@ -102,22 +104,26 @@ func NewRaft(peers []peer.ID, host host.Host, dataFolder string, fsm hashiraft.F
stableStore: logStore,
peerstore: pstore,
boltdb: logStore,
}, nil
dataFolder: dataFolder,
}
return raft, nil
}
// WaitForLeader holds until Raft says we have a leader
func (r *Raft) WaitForLeader(ctx context.Context) {
// WaitForLeader holds until Raft says we have a leader.
// It returns an error if the context expires before that happens.
func (r *Raft) WaitForLeader(ctx context.Context) error {
// Using Raft observers panics on non-64-bit architectures.
// This is a workaround.
logger.Info("waiting for leader")
if sixtyfour {
r.waitForLeader(ctx)
} else {
r.waitForLeaderLegacy(ctx)
return r.waitForLeader(ctx)
}
return r.waitForLeaderLegacy(ctx)
}
func (r *Raft) waitForLeader(ctx context.Context) {
obsCh := make(chan hashiraft.Observation)
func (r *Raft) waitForLeader(ctx context.Context) error {
obsCh := make(chan hashiraft.Observation, 1)
filter := func(o *hashiraft.Observation) bool {
switch o.Data.(type) {
case hashiraft.LeaderObservation:
@ -126,29 +132,43 @@ func (r *Raft) waitForLeader(ctx context.Context) {
return false
}
}
observer := hashiraft.NewObserver(obsCh, true, filter)
observer := hashiraft.NewObserver(obsCh, false, filter)
r.raft.RegisterObserver(observer)
defer r.raft.DeregisterObserver(observer)
ticker := time.NewTicker(time.Second)
for {
select {
case obs := <-obsCh:
switch obs.Data.(type) {
case hashiraft.LeaderObservation:
leaderObs := obs.Data.(hashiraft.LeaderObservation)
logger.Infof("Raft Leader elected: %s", leaderObs.Leader)
return nil
}
case <-ticker.C:
if l := r.raft.Leader(); l != "" { // we missed the observation or there was no election
logger.Debug("waitForleaderTimer")
logger.Infof("Raft Leader elected: %s", l)
ticker.Stop()
return nil
}
case <-ctx.Done():
return
return ctx.Err()
}
}
}
func (r *Raft) waitForLeaderLegacy(ctx context.Context) {
// 32-bit systems should use this.
func (r *Raft) waitForLeaderLegacy(ctx context.Context) error {
for {
leader := r.raft.Leader()
if leader != "" {
logger.Infof("Raft Leader elected: %s", leader)
return
return nil
}
select {
case <-ctx.Done():
return
return ctx.Err()
default:
time.Sleep(500 * time.Millisecond)
}
@ -156,23 +176,22 @@ func (r *Raft) waitForLeaderLegacy(ctx context.Context) {
}
// WaitForUpdates holds until Raft has synced to the last index in the log
func (r *Raft) WaitForUpdates(ctx context.Context) {
func (r *Raft) WaitForUpdates(ctx context.Context) error {
logger.Debug("Raft state is catching up")
for {
select {
case <-ctx.Done():
return
return ctx.Err()
default:
lai := r.raft.AppliedIndex()
li := r.raft.LastIndex()
logger.Debugf("current Raft index: %d/%d",
lai, li)
if lai == li {
return
return nil
}
time.Sleep(500 * time.Millisecond)
}
}
}
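
Both waiters now surface context errors instead of returning silently, so cluster startup can fail fast when no leader appears or the log never catches up. The sketch below isolates the underlying pattern (poll a condition on a ticker until it holds or the context expires); the names and the fake condition are illustrative only.

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// waitFor polls cond on a ticker until it returns true or ctx expires.
// This mirrors the shape of WaitForLeader/WaitForUpdates above.
func waitFor(ctx context.Context, period time.Duration, cond func() bool) error {
    ticker := time.NewTicker(period)
    defer ticker.Stop()
    for {
        if cond() {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    start := time.Now()
    err := waitFor(ctx, 100*time.Millisecond, func() bool {
        // Stand-in for `raft.Leader() != ""`: pretend a leader shows up
        // after half a second.
        return time.Since(start) > 500*time.Millisecond
    })
    if err != nil {
        fmt.Println("no leader before the deadline:", err)
        return
    }
    fmt.Println("leader elected")
}
```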
@ -199,28 +218,81 @@ func (r *Raft) Shutdown() error {
if err != nil {
errMsgs += "could not close boltdb: " + err.Error()
}
if errMsgs != "" {
return errors.New(errMsgs)
}
// If the shutdown worked correctly
// (including snapshot) we can remove the Raft
// database (which traces peer additions
// and removals). It makes restarting the peer
// way less confusing for Raft while the state
// can be restored from the snapshot.
//os.Remove(filepath.Join(r.dataFolder, "raft.db"))
return nil
}
// AddPeer adds a peer to Raft
func (r *Raft) AddPeer(peer string) error {
if r.hasPeer(peer) {
logger.Debug("skipping raft add as already in peer set")
return nil
}
future := r.raft.AddPeer(peer)
err := future.Error()
if err != nil {
logger.Error("raft cannot add peer: ", err)
return err
}
peers, _ := r.peerstore.Peers()
logger.Debugf("raft peerstore: %s", peers)
return err
}
// RemovePeer removes a peer from Raft
func (r *Raft) RemovePeer(peer string) error {
if !r.hasPeer(peer) {
return nil
}
future := r.raft.RemovePeer(peer)
err := future.Error()
if err != nil {
logger.Error("raft cannot remove peer: ", err)
return err
}
peers, _ := r.peerstore.Peers()
logger.Debugf("raft peerstore: %s", peers)
return err
}
// func (r *Raft) SetPeers(peers []string) error {
// logger.Debugf("SetPeers(): %s", peers)
// future := r.raft.SetPeers(peers)
// err := future.Error()
// if err != nil {
// logger.Error(err)
// }
// return err
// }
// Leader returns Raft's leader. It may be an empty string if
// there is no leader or it is unknown.
func (r *Raft) Leader() string {
return r.raft.Leader()
}
func (r *Raft) hasPeer(peer string) bool {
found := false
peers, _ := r.peerstore.Peers()
for _, p := range peers {
if p == peer {
found = true
break
}
}
return found
}
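
AddPeer and RemovePeer consult `hasPeer` first, so replaying peer-set operations from the consensus log is harmless. A toy sketch of that idempotence guard follows; the `peerSet` map and function name are illustrative, not the real Raft peerstore API.

```go
package main

import "fmt"

// addPeerIdempotent mirrors the hasPeer guard above: adding a peer that is
// already in the set is a no-op, so replaying a peer-addition entry from the
// consensus log cannot fail.
func addPeerIdempotent(peerSet map[string]struct{}, p string) bool {
    if _, ok := peerSet[p]; ok {
        return false // already a member: nothing to do
    }
    peerSet[p] = struct{}{}
    return true
}

func main() {
    set := make(map[string]struct{})
    p := "QmPSoSaPXpyunaBwHs1rZBKYSqRV4bLRk32VGYLuvdrypL"
    fmt.Println(addPeerIdempotent(set, p)) // true: first addition
    fmt.Println(addPeerIdempotent(set, p)) // false: replay is a no-op
}
```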

View File

@ -1,8 +1,9 @@
package ipfscluster
import (
cid "github.com/ipfs/go-cid"
"errors"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
)
@ -111,6 +112,13 @@ func (api *RPCAPI) PeerRemove(in peer.ID, out *struct{}) error {
return api.cluster.PeerRemove(in)
}
// Join runs Cluster.Join().
func (api *RPCAPI) Join(in MultiaddrSerial, out *struct{}) error {
addr := in.ToMultiaddr()
err := api.cluster.Join(addr)
return err
}
// StatusAll runs Cluster.StatusAll().
func (api *RPCAPI) StatusAll(in struct{}, out *[]GlobalPinInfo) error {
pinfo, err := api.cluster.StatusAll()
@ -295,32 +303,58 @@ func (api *RPCAPI) ConsensusLogUnpin(in *CidArg, out *struct{}) error {
return api.cluster.consensus.LogUnpin(c)
}
// ConsensusLogAddPeer runs Consensus.LogAddPeer().
func (api *RPCAPI) ConsensusLogAddPeer(in MultiaddrSerial, out *struct{}) error {
addr := in.ToMultiaddr()
return api.cluster.consensus.LogAddPeer(addr)
}
// ConsensusLogRmPeer runs Consensus.LogRmPeer().
func (api *RPCAPI) ConsensusLogRmPeer(in peer.ID, out *struct{}) error {
return api.cluster.consensus.LogRmPeer(in)
}
/*
Peer Manager methods
*/
// PeerManagerAddPeer runs peerManager.addPeer().
func (api *RPCAPI) PeerManagerAddPeer(in MultiaddrSerial, out *peer.ID) error {
mAddr := in.ToMultiaddr()
p, err := api.cluster.peerManager.addPeer(mAddr)
*out = p
func (api *RPCAPI) PeerManagerAddPeer(in MultiaddrSerial, out *struct{}) error {
addr := in.ToMultiaddr()
err := api.cluster.peerManager.addPeer(addr)
return err
}
// PeerManagerRmPeer runs peerManager.rmPeer().
func (api *RPCAPI) PeerManagerRmPeer(in peer.ID, out *struct{}) error {
return api.cluster.peerManager.rmPeer(in)
}
// PeerManagerAddFromMultiaddrs runs peerManager.addFromMultiaddrs().
func (api *RPCAPI) PeerManagerAddFromMultiaddrs(in MultiaddrsSerial, out *struct{}) error {
api.cluster.peerManager.addFromMultiaddrs(in.ToMultiaddrs())
return nil
addrs := in.ToMultiaddrs()
err := api.cluster.peerManager.addFromMultiaddrs(addrs)
return err
}
// PeerManagerPeers runs peerManager.peers().
func (api *RPCAPI) PeerManagerPeers(in struct{}, out *[]peer.ID) error {
peers := api.cluster.peerManager.peers()
*out = peers
// PeerManagerRmPeerShutdown runs peerManager.rmPeer().
func (api *RPCAPI) PeerManagerRmPeerShutdown(in peer.ID, out *struct{}) error {
return api.cluster.peerManager.rmPeer(in, true)
}
// PeerManagerRmPeer runs peerManager.rmPeer().
func (api *RPCAPI) PeerManagerRmPeer(in peer.ID, out *struct{}) error {
return api.cluster.peerManager.rmPeer(in, false)
}
/*
Other
*/
// RemoteMultiaddrForPeer returns the multiaddr of a peer as seen by this peer.
// This is necessary for a peer to figure out which of its multiaddresses the
// peers are seeing (also when crossing NATs). It should be called by
// the peer indicated by the IN parameter.
func (api *RPCAPI) RemoteMultiaddrForPeer(in peer.ID, out *MultiaddrSerial) error {
conns := api.cluster.host.Network().ConnsToPeer(in)
if len(conns) == 0 {
return errors.New("no connections to: " + in.Pretty())
}
*out = MultiaddrToSerial(multiaddrJoin(conns[0].RemoteMultiaddr(), in))
return nil
}

View File

@ -3,5 +3,10 @@
package ipfscluster
func init() {
SetLogLevel("CRITICAL")
l := "CRITICAL"
SetFacilityLogLevel("cluster", l)
SetFacilityLogLevel("raft", l)
SetFacilityLogLevel("p2p-gorpc", l)
SetFacilityLogLevel("swarm2", l)
SetFacilityLogLevel("libp2p-raft", l)
}

util.go
View File

@ -3,18 +3,20 @@ package ipfscluster
import (
"fmt"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)
// The copy functions below are used in calls to Cluster.multiRPC()
func copyPIDsToIfaces(in []peer.ID) []interface{} {
ifaces := make([]interface{}, len(in), len(in))
for i := range in {
ifaces[i] = &in[i]
}
return ifaces
}
// func copyPIDsToIfaces(in []peer.ID) []interface{} {
// ifaces := make([]interface{}, len(in), len(in))
// for i := range in {
// ifaces[i] = &in[i]
// }
// return ifaces
// }
func copyIDSerialsToIfaces(in []IDSerial) []interface{} {
ifaces := make([]interface{}, len(in), len(in))
@ -51,7 +53,7 @@ func copyEmptyStructToIfaces(in []struct{}) []interface{} {
func multiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) {
pid, err := addr.ValueForProtocol(ma.P_IPFS)
if err != nil {
err = fmt.Errorf("Invalid peer multiaddress: %s: %s", addr, err)
err = fmt.Errorf("invalid peer multiaddress: %s: %s", addr, err)
logger.Error(err)
return "", nil, err
}
@ -61,9 +63,60 @@ func multiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) {
peerID, err := peer.IDB58Decode(pid)
if err != nil {
err = fmt.Errorf("Invalid peer ID in multiaddress: %s: %s", pid)
err = fmt.Errorf("invalid peer ID in multiaddress: %s: %s", pid, err)
logger.Error(err)
return "", nil, err
}
return peerID, decapAddr, nil
}
func multiaddrJoin(addr ma.Multiaddr, p peer.ID) ma.Multiaddr {
pidAddr, err := ma.NewMultiaddr("/ipfs/" + peer.IDB58Encode(p))
// let this break badly
if err != nil {
panic("called multiaddrJoin with bad peer!")
}
return addr.Encapsulate(pidAddr)
}
func peersFromMultiaddrs(addrs []ma.Multiaddr) []peer.ID {
var pids []peer.ID
for _, addr := range addrs {
pid, _, err := multiaddrSplit(addr)
if err != nil {
continue
}
pids = append(pids, pid)
}
return pids
}
// // connect to a peer ID.
// func connectToPeer(ctx context.Context, h host.Host, id peer.ID, addr ma.Multiaddr) error {
// err := h.Connect(ctx, peerstore.PeerInfo{
// ID: id,
// Addrs: []ma.Multiaddr{addr},
// })
// return err
// }
// // return the local multiaddresses used to communicate to a peer.
// func localMultiaddrsTo(h host.Host, pid peer.ID) []ma.Multiaddr {
// var addrs []ma.Multiaddr
// conns := h.Network().ConnsToPeer(pid)
// logger.Debugf("conns to %s are: %s", pid, conns)
// for _, conn := range conns {
// addrs = append(addrs, multiaddrJoin(conn.LocalMultiaddr(), h.ID()))
// }
// return addrs
// }
// If we have connections open to that PID and they are using a different addr
// then we return the one we are using, otherwise the one provided
func getRemoteMultiaddr(h host.Host, pid peer.ID, addr ma.Multiaddr) ma.Multiaddr {
conns := h.Network().ConnsToPeer(pid)
if len(conns) > 0 {
return multiaddrJoin(conns[0].RemoteMultiaddr(), pid)
}
return multiaddrJoin(addr, pid)
}
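
multiaddrJoin and multiaddrSplit are just encapsulation and decapsulation of the `/ipfs/<peerID>` component. A small round-trip example using the same go-multiaddr and go-libp2p-peer calls that appear above (the address and peer ID are taken from the bootstrap example earlier in this diff):

```go
package main

import (
    "fmt"

    peer "github.com/libp2p/go-libp2p-peer"
    ma "github.com/multiformats/go-multiaddr"
)

func main() {
    // Join: encapsulate /ipfs/<peerID> onto a transport address,
    // as multiaddrJoin does above.
    addr, _ := ma.NewMultiaddr("/ip4/192.168.1.2/tcp/9096")
    pidStr := "QmPSoSaPXpyunaBwHs1rZBKYSqRV4bLRk32VGYLuvdrypL"
    pidAddr, _ := ma.NewMultiaddr("/ipfs/" + pidStr)
    full := addr.Encapsulate(pidAddr)
    fmt.Println(full) // /ip4/192.168.1.2/tcp/9096/ipfs/QmPSoS...

    // Split: recover the peer ID and the bare transport address,
    // as multiaddrSplit does above.
    pid, err := full.ValueForProtocol(ma.P_IPFS)
    if err != nil {
        panic(err)
    }
    peerID, err := peer.IDB58Decode(pid)
    if err != nil {
        panic(err)
    }
    ipfsPart, _ := ma.NewMultiaddr("/ipfs/" + pid)
    bare := full.Decapsulate(ipfsPart)
    fmt.Println(peerID.Pretty(), bare) // QmPSoS... /ip4/192.168.1.2/tcp/9096
}
```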