diff --git a/README.md b/README.md index 6324323e..645a821b 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,9 @@ Current functionality only allows pinning in all cluster peers, but more strateg - [Maintainers and roadmap](#maintainers-and-roadmap) - [Install](#install) - [Usage](#usage) + - [`ipfs-cluster-service`](#ipfs-cluster-service) + - [`ipfs-cluster-ctl`](#ipfs-cluster-ctl) + - [Quick start: Building and updating an IPFS Cluster](#quick-start-building-and-updating-an-ipfs-cluster) - [API](#api) - [Contribute](#contribute) - [License](#license) @@ -87,21 +90,17 @@ You can add the multiaddresses for the other cluster peers the `cluster_peers` v "cluster_peers" : [ "/ip4/192.168.1.2/tcp/9096/ipfs/QmcQ5XvrSQ4DouNkQyQtEoLczbMr6D9bSenGy6WQUCQUBt", "/ip4/192.168.1.3/tcp/9096/ipfs/QmdFBMf9HMDH3eCWrc1U11YCPenC3Uvy9mZQ2BedTyKTDf", - "/ip4/192.168.1.4/tcp/9096/ipfs/QmYY1ggjoew5eFrvkenTR3F4uWqtkBkmgfJk8g9Qqcwy51", - "/ip4/192.168.1.5/tcp/9096/ipfs/QmSGCzHkz8gC9fNndMtaCZdf9RFtwtbTEEsGo4zkVfcykD" + "/ip4/192.168.1.4/tcp/9096/ipfs/QmYY1ggjoew5eFrvkenTR3F4uWqtkBkmgfJk8g9Qqcwy51" ], "cluster_multiaddress": "/ip4/0.0.0.0/tcp/9096", "api_listen_multiaddress": "/ip4/127.0.0.1/tcp/9094", "ipfs_proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095", "ipfs_node_multiaddress": "/ip4/127.0.0.1/tcp/5001", - "consensus_data_folder": "/home/user/.ipfs-cluster/data", - "raft_config": { - "snapshot_interval_seconds": 120, - "enable_single_node": true - } + "consensus_data_folder": "/home/user/.ipfs-cluster/data" +} ``` -The configuration file should probably be identical among all cluster peers, except for the `id` and `private_key` fields. To facilitate configuration, `cluster_peers` may include its own address, but it does not have to. For additional information about the configuration format, see the [JSONConfig documentation](https://godoc.org/github.com/ipfs/ipfs-cluster#JSONConfig). +The configuration file should probably be identical among all cluster peers, except for the `id` and `private_key` fields. To facilitate configuration, `cluster_peers` may include its own address, but it will be removed on boot. For additional information about the configuration format, see the [JSONConfig documentation](https://godoc.org/github.com/ipfs/ipfs-cluster#JSONConfig). Once every cluster peer has the configuration in place, you can run `ipfs-cluster-service` to start the cluster. @@ -124,6 +123,8 @@ In summary, it works as follows: ``` $ ipfs-cluster-ctl id # show cluster peer and ipfs daemon information $ ipfs-cluster-ctl peers ls # list cluster peers +$ ipfs-cluster-ctl peers add /ip4/1.2.3.4/tcp/1234/ # add a new cluster peer +$ ipfs-cluster-ctl peers rm # remove a cluster peer $ ipfs-cluster-ctl pin add Qma4Lid2T1F68E3Xa3CpE6vVJDLwxXLD8RfiB9g1Tmqp58 # pins a CID in the cluster $ ipfs-cluster-ctl pin rm Qma4Lid2T1F68E3Xa3CpE6vVJDLwxXLD8RfiB9g1Tmqp58 # unpins a CID from the cluster $ ipfs-cluster-ctl status # display tracked CIDs information @@ -135,6 +136,107 @@ $ ipfs-cluster-ctl recover Qma4Lid2T1F68E3Xa3CpE6vVJDLwxXLD8RfiB9g1Tmqp58 # at `ipfs-cluster-ctl` provides a `--debug` flag which allows to inspect request paths and raw response bodies. +### Quick start: Building and updating an IPFS Cluster + +#### Step 0: Run your first cluster node + +This step creates a single-node IPFS Cluster. 
+ +First initialize the configuration: + +``` +node0 $ ipfs-cluster-service init +ipfs-cluster-service configuration written to /home/user/.ipfs-cluster/service.json +``` + +Then run cluster: + +``` +node0> ipfs-cluster-service +12:38:34.470 INFO cluster: IPFS Cluster v0.0.1 listening on: cluster.go:59 +12:38:34.472 INFO cluster: /ip4/127.0.0.1/tcp/9096/ipfs/QmWM4knzVuWU5utXqkD2JeQ9zYT82f4s9s196bJ8PekStF cluster.go:61 +12:38:34.472 INFO cluster: /ip4/192.168.1.58/tcp/9096/ipfs/QmWM4knzVuWU5utXqkD2JeQ9zYT82f4s9s196bJ8PekStF cluster.go:61 +12:38:34.472 INFO cluster: This is a single-node cluster peer_manager.go:141 +12:38:34.569 INFO cluster: starting Consensus and waiting for a leader... consensus.go:124 +12:38:34.591 INFO cluster: REST API: /ip4/127.0.0.1/tcp/9094 rest_api.go:309 +12:38:34.591 INFO cluster: IPFS Proxy: /ip4/127.0.0.1/tcp/9095 -> /ip4/127.0.0.1/tcp/5001 ipfs_http_connector.go:168 +12:38:34.592 INFO cluster: PinTracker ready map_pin_tracker.go:71 +12:38:36.092 INFO cluster: Raft Leader elected: QmWM4knzVuWU5utXqkD2JeQ9zYT82f4s9s196bJ8PekStF raft.go:146 +12:38:36.092 INFO cluster: Consensus state is up to date consensus.go:170 +12:38:36.092 INFO cluster: IPFS Cluster is ready cluster.go:526 +``` + +#### Step 1: Add new members to the cluster + +Initialize and run cluster in a different node(s): + +``` +node1> ipfs-cluster-service init +ipfs-cluster-service configuration written to /home/user/.ipfs-cluster/service.json +node1> ipfs-cluster-service +12:39:24.818 INFO cluster: IPFS Cluster v0.0.1 listening on: cluster.go:59 +12:39:24.819 INFO cluster: /ip4/127.0.0.1/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK cluster.go:61 +12:39:24.820 INFO cluster: /ip4/192.168.1.57/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK cluster.go:61 +12:39:24.820 INFO cluster: This is a single-node cluster peer_manager.go:141 +12:39:24.850 INFO cluster: starting Consensus and waiting for a leader... consensus.go:124 +12:39:24.876 INFO cluster: IPFS Proxy: /ip4/127.0.0.1/tcp/9095 -> /ip4/127.0.0.1/tcp/5001 ipfs_http_connector.go:168 +12:39:24.876 INFO cluster: REST API: /ip4/127.0.0.1/tcp/9094 rest_api.go:309 +12:39:24.876 INFO cluster: PinTracker ready map_pin_tracker.go:71 +12:39:26.877 INFO cluster: Raft Leader elected: QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK raft.go:146 +12:39:26.877 INFO cluster: Consensus state is up to date consensus.go:170 +12:39:26.878 INFO service: IPFS Cluster is ready main.go:184 +``` + +Add them to the original cluster with `ipfs-cluster-ctl peers add `. 
The command will return the ID information of the newly added member: + +``` +node0> ipfs-cluster-ctl peers add /ip4/192.168.1.57/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK +{ + "id": "QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK", + "public_key": "CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDQUN0iWAdbYEfQFAYcORsd0XnBvR9dk1QrJbzyqwDEHebP/wYqjeK73cyzBrpTYzxyd205ZSrpImL1GvVl+iLONMlz0CHsQ2YL0zzYHy55Y+1LhGGZY5R14MqvrjSq8pWo8U9nF8aenXSxhNvVeErnE5voVUU7YTjXSaXYmsK0cT7erKHZooJ16dzL/UmRTYlirMuGcOv/4WmgYX5fikH1mtw1Ln2xew76qxL5MeCu7v7NNugbsachJFiC/0DewxPClS03Nv6TvW2FsN4iis961EoBH7qTI3E1gUS89s7xp2njfgD/hsyk6YUbEEbOJUNihPFJ3Wlx6ogbis3cdX3tAgMBAAE=", + "addresses": [ + "/ip4/127.0.0.1/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK", + "/ip4/192.168.1.57/tcp/9096/ipfs/QmQn6aaWJNvyZnLh4soqjXQXiXGZM8VTuZW4B6AGaWxeNK" + ], + "cluster_peers": [ + "/ip4/192.168.1.58/tcp/9096/ipfs/QmWM4knzVuWU5utXqkD2JeQ9zYT82f4s9s196bJ8PekStF" + ], + "version": "0.0.1", + "commit": "", + "rpc_protocol_version": "/ipfscluster/0.0.1/rpc", + "ipfs": { + "id": "QmRbn14eEDGEDf9d6mW64W6KsinkmMXqZaToWVbRANT8gY", + "addresses": [ + "/ip4/127.0.0.1/tcp/4001/ipfs/QmRbn14eEDGEDf9d6mW64W6KsinkmMXqZaToWVbRANT8gY", + "/ip4/192.168.1.57/tcp/4001/ipfs/QmRbn14eEDGEDf9d6mW64W6KsinkmMXqZaToWVbRANT8gY" + ] + } +} +``` + +You can repeat the process with any other nodes. + +#### Step 3: Remove no longer needed nodes + +You can use `ipfs-cluster-ctl peers rm ` to remove and disconnect any nodes from your cluster. The nodes will be automatically +shutdown. They can be restarted manually and re-added to the Cluster any time: + +``` +node0> ipfs-cluster-ctl peers rm QmbGFbZVTF3UAEPK9pBVdwHGdDAYkHYufQwSh4k1i8bbbb +OK +``` + +The `node1` is then disconnected and shuts down: + +``` +12:41:13.693 WARNI cluster: this peer has been removed from the Cluster and will shutdown itself peer_manager.go:80 +12:41:13.695 INFO cluster: stopping Consensus component consensus.go:231 +12:41:14.695 INFO cluster: shutting down IPFS Cluster cluster.go:135 +12:41:14.696 INFO cluster: stopping Cluster API rest_api.go:327 +12:41:14.696 INFO cluster: stopping IPFS Proxy ipfs_http_connector.go:332 +12:41:14.697 INFO cluster: stopping MapPinTracker map_pin_tracker.go:87 +``` + ### Go IPFS Cluster nodes can be launched directly from Go. The `Cluster` object provides methods to interact with the cluster and perform actions. @@ -152,6 +254,8 @@ This is a quick summary of API endpoints offered by the Rest API component (thes |GET |/id |Cluster peer information| |GET |/version |Cluster version| |GET |/peers |Cluster peers| +|POST |/peers |Add new peer| +|DELETE|/peers/{peerID} |Remove a peer| |GET |/pinlist |List of pins in the consensus state| |GET |/pins |Status of all tracked CIDs| |POST |/pins/sync |Sync all| diff --git a/architecture.md b/architecture.md index 2373ce0d..2a496f6f 100644 --- a/architecture.md +++ b/architecture.md @@ -32,7 +32,7 @@ These definitions are still evolving and may change: * **IPFSConnector**: a component which talks to the IPFS daemon and provides a proxy to it. Default: `IPFSHTTPConnector` * **API**: a component which offers a public facing API. Default: `RESTAPI` * **State**: a component which keeps a list of Pins (maintained by the Consensus component) - * The **Consensus** component. This component separate but internal to Cluster in the sense that it cannot be provided arbitrarily during initialization. The consensus component uses `go-libp2p-raft` via `go-libp2p-consensus`. 
While it is attempted to be agnostic from the underlying consensus implementation, it is not possible in all places. These places are however well marked.

 Components perform a number of functions and need to be able to communicate with eachothers: i.e.:

@@ -62,6 +62,54 @@ This is the service application of IPFS Cluster. It brings up a cluster, connect

 This is the client/control application of IPFS Cluster. It is a command line interface which uses the REST API to communicate with Cluster.

+## Code paths
+
+This section explains how some things work in Cluster.
+
+### Startup
+
+* `NewCluster` triggers the Cluster bootstrap. The `IPFSConnector`, `State` and `PinTracker` components are provided. These components are up but possibly waiting for a `SetClient(RPCClient)` call. Without an RPCClient, they are unable to communicate with the other components.
+* The first bootstrapping step is to create the libp2p `Host`. It uses the configuration keys to set up the public and private keys, the swarm network, etc.
+* The `peerManager` is initialized. Peers from the configuration and ourselves are added to the peer list.
+* The `RPCServer` (`go-libp2p-gorpc`) is set up, along with an `RPCClient`. The client can communicate with any RPC server using libp2p, or with the local one (shortcutting). The `RPCAPI` object is registered: it implements all the RPC operations supported by cluster and is used for inter-component/inter-peer communication.
+* The `Consensus` component is bootstrapped. It:
+  * Sets up a new Raft node from scratch, including snapshots, stable datastore (boltDB), log store, etc.
+  * Initializes the `go-libp2p-consensus` components (`Actor` and `LogOpConsensus`) using `go-libp2p-raft`.
+  * Returns a new `Consensus` object while asynchronously waiting for a Raft leader and then for the current Raft peer to catch up with the latest index of the log.
+  * Waits for an RPCClient to be set and, when that happens, triggers an asynchronous RPC request (`StateSync`) to the local `Cluster` component and reports `Ready()`.
+  * The `StateSync` operation (from the main `Cluster` component) makes a diff between the local `MapPinTracker` state and the consensus-maintained state. It triggers asynchronous local RPC requests (`Track` and `Untrack`) to the `MapPinTracker`.
+  * The `MapPinTracker` receives the `Track` requests and checks, pins or unpins items, as well as updating the local status of a pin (see the Pinning section below).
+* While the consensus is being bootstrapped, `SetClient(RPCClient)` is called on all components (tracker, ipfs connector, api and consensus).
+* The new `Cluster` object is returned.
+* Asynchronously, a thread waits for the `Consensus` component to report `Ready()`. When this happens, `Cluster` reports itself `Ready()`. At this moment, all components are up, consensus is working and cluster is ready to perform any operations. The consensus state may still be syncing, or, more likely, the `MapPinTracker` may still be verifying pins against the IPFS daemon, but this does not cause any problems when using cluster (see the sketch right after this list).
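For illustration, this is a minimal sketch of driving that bootstrap from Go code embedding a cluster peer, mirroring `ipfs-cluster-service` and the tests touched in this changeset; the configuration path is only an example and error handling is kept short:

```go
package main

import (
	"log"

	ipfscluster "github.com/ipfs/ipfs-cluster"
)

func main() {
	// Load a configuration previously created with "ipfs-cluster-service init".
	// The path below is an example; adjust it to your setup.
	cfg, err := ipfscluster.LoadConfig("/home/user/.ipfs-cluster/service.json")
	if err != nil {
		log.Fatal(err)
	}

	// Build the swappable components described above.
	api, err := ipfscluster.NewRESTAPI(cfg)
	if err != nil {
		log.Fatal(err)
	}
	ipfs, err := ipfscluster.NewIPFSHTTPConnector(cfg)
	if err != nil {
		log.Fatal(err)
	}
	state := ipfscluster.NewMapState()
	tracker := ipfscluster.NewMapPinTracker(cfg)

	// NewCluster wires everything together (host, RPC, consensus, peer manager).
	cluster, err := ipfscluster.NewCluster(cfg, api, ipfs, state, tracker)
	if err != nil {
		log.Fatal(err)
	}
	defer cluster.Shutdown()

	// Block until consensus has bootstrapped and the peer is fully usable.
	<-cluster.Ready()
	log.Println("cluster peer is ready")
}
```

`Shutdown()` is safe to call more than once (see the new `TestClustersShutdown`), and `Done()` signals when the peer has been shut down, for example after being removed from the cluster.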
+
+
+### Adding a peer
+
+* The `RESTAPI` component receives a `PeerAdd` request on the respective endpoint. It makes a `PeerAdd` RPC request.
+* The local RPC server receives it and calls the `PeerAdd` method in the main `Cluster` component.
+* A libp2p connection is opened to the new peer's multiaddress. It should be reachable. We note down the local multiaddress used to reach the new peer.
+* A broadcast `PeerManagerAddPeer` request is sent to all peers in the current cluster. It is received and the RPC server calls `peerManager.addPeer`. The `peerManager` is an annex to the main Cluster component which handles the peer management functionality (add/remove).
+* The `peerManager` adds the new peer to libp2p's peerstore, asks Raft to make it part of its peer set and adds it to the `ClusterPeers` section of the configuration (which is then saved).
+* If the broadcast request fails somewhere, the operation is aborted, and a `PeerRemove` operation for the new peer is triggered to undo any changes. Otherwise, on success, the local list of ClusterPeers from the configuration, along with the local multiaddress from the connection we noted down, is sent to the new peer (via the RPC `PeerManagerAddFromMultiaddrs` method).
+* The new peer's `peerManager` updates its current list of peers, Raft's peer set and the configuration, and the peer becomes part of the new cluster.
+* The `PeerAdd` method in `Cluster` makes a remote RPC request to the new peer's `ID` method. The received ID is used as the response to the call.
+* The RPC server takes the response and sends it to the `RESTAPI` component, which in turn converts it and responds to the request.
+
+### Pinning
+
+* The `RESTAPI` component receives a `Pin` request on the respective endpoint. This triggers a `Pin` RPC request.
+* The local RPC server receives it and calls the `Pin` method in the main `Cluster` component.
+* The main cluster component calls `LogPin` in the `Consensus` component.
+* The Consensus component checks that it is the Raft leader, or alternatively makes a `ConsensusLogPin` RPC request to whoever is the leader.
+* The Consensus component of the current Raft leader builds a pin `clusterLogOp` operation and performs a `consensus.Commit(op)`. This is handled by `go-libp2p-raft` and, when the operation makes it into the Raft log, it triggers `clusterLogOp.ApplyTo(state)` in every peer.
+* `ApplyTo` modifies the local cluster state by adding the pin and triggers an asynchronous `Track` local RPC request. It returns without waiting for the result. While `ApplyTo` is running, no other state updates may be applied, so it is a critical code path.
+* The RPC server routes the request to the `MapPinTracker` component and its `Track` method. It marks the local state of the pin as `pinning` and makes a local RPC request to the `IPFSHTTPConnector` component (`IPFSPin`) to pin it.
+* The `IPFSHTTPConnector` component receives the request and uses the configured IPFS daemon API to send a pin request with the given hash. It waits until the call comes back and returns success or failure.
+* The `MapPinTracker` component receives the result from the RPC request to the `IPFSHTTPConnector` and updates the local status of the pin to `pinned` or `pin_error` (an example of triggering this path from outside follows below).
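For reference, the whole chain above can be triggered from outside the peer with a plain HTTP request to the REST API; a small sketch in Go, assuming the default `api_listen_multiaddress` (`/ip4/127.0.0.1/tcp/9094`) and reusing the example CID from the README:

```go
package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Example CID from the README; the address assumes the default
	// api_listen_multiaddress of /ip4/127.0.0.1/tcp/9094.
	cid := "Qma4Lid2T1F68E3Xa3CpE6vVJDLwxXLD8RfiB9g1Tmqp58"

	// POST /pins/<cid> kicks off the code path above: the RESTAPI component
	// turns it into a Pin RPC, which ends up committed to the Raft log.
	resp, err := http.Post("http://127.0.0.1:9094/pins/"+cid, "", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	fmt.Println("pin request:", resp.Status)

	// Pinning happens asynchronously; GET /pins/<cid> reports whether the
	// item is "pinning", "pinned" or "pin_error" on each peer.
	st, err := http.Get("http://127.0.0.1:9094/pins/" + cid)
	if err != nil {
		log.Fatal(err)
	}
	defer st.Body.Close()
	fmt.Println("status query:", st.Status)
}
```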
+ ## Legacy illustrations See: https://ipfs.io/ipfs/QmWhbV7KX9toZbi4ycj6J9GVbTVvzGx5ERffc6ymYLT5HS diff --git a/cluster.go b/cluster.go index fcfe787c..68bdb05a 100644 --- a/cluster.go +++ b/cluster.go @@ -2,6 +2,8 @@ package ipfscluster import ( "context" + "errors" + "fmt" "math/rand" "sync" "time" @@ -21,10 +23,11 @@ import ( type Cluster struct { ctx context.Context - config *Config - host host.Host - rpcServer *rpc.Server - rpcClient *rpc.Client + config *Config + host host.Host + rpcServer *rpc.Server + rpcClient *rpc.Client + peerManager *peerManager consensus *Consensus api API @@ -35,11 +38,17 @@ type Cluster struct { shutdownLock sync.Mutex shutdown bool shutdownCh chan struct{} + doneCh chan struct{} + readyCh chan struct{} wg sync.WaitGroup } -// NewCluster builds a new IPFS Cluster. It initializes a LibP2P host, creates -// and RPC Server and client and sets up all components. +// NewCluster builds a new IPFS Cluster peer. It initializes a LibP2P host, +// creates and RPC Server and client and sets up all components. +// +// The new cluster peer may still be performing initialization tasks when +// this call returns (consensus may still be bootstrapping). Use Cluster.Ready() +// if you need to wait until the peer is fully up. func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker PinTracker) (*Cluster, error) { ctx := context.Background() host, err := makeHost(ctx, cfg) @@ -47,52 +56,73 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P return nil, err } - rpcServer := rpc.NewServer(host, RPCProtocol) - rpcClient := rpc.NewClientWithServer(host, RPCProtocol, rpcServer) - - logger.Infof("IPFS Cluster v%s - %s/ipfs/%s", Version, cfg.ClusterAddr, host.ID().Pretty()) - - consensus, err := NewConsensus(cfg, host, state) - if err != nil { - logger.Errorf("error creating consensus: %s", err) - return nil, err + logger.Infof("IPFS Cluster v%s listening on:", Version) + for _, addr := range host.Addrs() { + logger.Infof(" %s/ipfs/%s", addr, host.ID().Pretty()) } cluster := &Cluster{ ctx: ctx, config: cfg, host: host, - rpcServer: rpcServer, - rpcClient: rpcClient, - consensus: consensus, api: api, ipfs: ipfs, state: state, tracker: tracker, - shutdownCh: make(chan struct{}), + shutdownCh: make(chan struct{}, 1), + doneCh: make(chan struct{}, 1), + readyCh: make(chan struct{}, 1), } - err = rpcServer.RegisterName( - "Cluster", - &RPCAPI{cluster: cluster}) + // Setup peer manager + pm := newPeerManager(cluster) + cluster.peerManager = pm + err = pm.addFromConfig(cfg) if err != nil { + cluster.Shutdown() return nil, err } // Workaround for https://github.com/libp2p/go-libp2p-swarm/issues/15 cluster.openConns() - defer func() { - tracker.SetClient(rpcClient) - ipfs.SetClient(rpcClient) - api.SetClient(rpcClient) - consensus.SetClient(rpcClient) - }() + // Setup RPC + rpcServer := rpc.NewServer(host, RPCProtocol) + err = rpcServer.RegisterName("Cluster", &RPCAPI{cluster: cluster}) + if err != nil { + cluster.Shutdown() + return nil, err + } + cluster.rpcServer = rpcServer + + // Setup RPC client that components from this peer will use + rpcClient := rpc.NewClientWithServer(host, RPCProtocol, rpcServer) + cluster.rpcClient = rpcClient + + // Setup Consensus + consensus, err := NewConsensus(cfg, host, state) + if err != nil { + logger.Errorf("error creating consensus: %s", err) + cluster.Shutdown() + return nil, err + } + cluster.consensus = consensus + + tracker.SetClient(rpcClient) + ipfs.SetClient(rpcClient) + 
api.SetClient(rpcClient) + consensus.SetClient(rpcClient) cluster.run() return cluster, nil } +// Ready returns a channel which signals when this peer is +// fully initialized (including consensus). +func (c *Cluster) Ready() <-chan struct{} { + return c.consensus.readyCh +} + // Shutdown stops the IPFS cluster components func (c *Cluster) Shutdown() error { c.shutdownLock.Lock() @@ -103,9 +133,11 @@ func (c *Cluster) Shutdown() error { } logger.Info("shutting down IPFS Cluster") - if err := c.consensus.Shutdown(); err != nil { - logger.Errorf("error stopping consensus: %s", err) - return err + if con := c.consensus; con != nil { + if err := con.Shutdown(); err != nil { + logger.Errorf("error stopping consensus: %s", err) + return err + } } if err := c.api.Shutdown(); err != nil { logger.Errorf("error stopping API: %s", err) @@ -123,9 +155,17 @@ func (c *Cluster) Shutdown() error { c.shutdownCh <- struct{}{} c.wg.Wait() c.host.Close() // Shutdown all network services + c.shutdown = true + close(c.doneCh) return nil } +// Done provides a way to learn if the Peer has been shutdown +// (for example, because it has been removed from the Cluster) +func (c *Cluster) Done() <-chan struct{} { + return c.doneCh +} + // ID returns information about the Cluster peer func (c *Cluster) ID() ID { // ignore error since it is included in response object @@ -140,6 +180,7 @@ func (c *Cluster) ID() ID { ID: c.host.ID(), PublicKey: c.host.Peerstore().PubKey(c.host.ID()), Addresses: addrs, + ClusterPeers: c.config.ClusterPeers, Version: Version, Commit: Commit, RPCProtocolVersion: RPCProtocol, @@ -147,6 +188,131 @@ func (c *Cluster) ID() ID { } } +// PeerAdd adds a new peer to this Cluster. +// +// The current peer will first attempt to contact the provided +// peer at the given multiaddress. If the connection is successful, +// the new peer, with the given multiaddress will be added to the +// cluster_peers and the configuration saved with the updated set. +// All other Cluster peers will be asked to do the same. +// +// Finally, the list of cluster peers is sent to the new +// peer, which will update its configuration and join the cluster. +// +// PeerAdd will fail if any of the peers is not reachable. +func (c *Cluster) PeerAdd(addr ma.Multiaddr) (ID, error) { + p, decapAddr, err := multiaddrSplit(addr) + if err != nil { + id := ID{ + Error: err.Error(), + } + return id, err + } + + // only add reachable nodes + err = c.host.Connect(c.ctx, peerstore.PeerInfo{ + ID: p, + Addrs: []ma.Multiaddr{decapAddr}, + }) + if err != nil { + err = fmt.Errorf("Peer unreachable. 
Aborting operation: %s", err) + id := ID{ + ID: p, + Error: err.Error(), + } + logger.Error(err) + return id, err + } + + // Find which local address we use to connect + conns := c.host.Network().ConnsToPeer(p) + if len(conns) == 0 { + err := errors.New("No connections to peer available") + logger.Error(err) + id := ID{ + ID: p, + Error: err.Error(), + } + + return id, err + } + pidMAddr, _ := ma.NewMultiaddr("/ipfs/" + c.host.ID().Pretty()) + localMAddr := conns[0].LocalMultiaddr().Encapsulate(pidMAddr) + + // Let all peer managers know they need to add this peer + peers := c.peerManager.peers() + replies := make([]peer.ID, len(peers), len(peers)) + errs := c.multiRPC(peers, "Cluster", "PeerManagerAddPeer", + MultiaddrToSerial(addr), copyPIDsToIfaces(replies)) + errorMsgs := "" + for i, err := range errs { + if err != nil { + logger.Error(err) + errorMsgs += fmt.Sprintf("%s: %s\n", + peers[i].Pretty(), + err.Error()) + } + } + if errorMsgs != "" { + logger.Error("There were errors adding peer. Trying to rollback the operation") + c.PeerRemove(p) + id := ID{ + ID: p, + Error: "Error adding peer: " + errorMsgs, + } + return id, errors.New(errorMsgs) + } + + // Inform the peer of the current cluster peers + clusterPeers := MultiaddrsToSerial(c.config.ClusterPeers) + clusterPeers = append(clusterPeers, MultiaddrToSerial(localMAddr)) + err = c.rpcClient.Call( + p, "Cluster", "PeerManagerAddFromMultiaddrs", + clusterPeers, &struct{}{}) + if err != nil { + logger.Errorf("Error sending back the list of peers: %s") + id := ID{ + ID: p, + Error: err.Error(), + } + return id, err + } + idSerial := ID{ + ID: p, + }.ToSerial() + err = c.rpcClient.Call( + p, "Cluster", "ID", struct{}{}, &idSerial) + + logger.Infof("peer %s has been added to the Cluster", addr) + return idSerial.ToID(), err +} + +// PeerRemove removes a peer from this Cluster. +// +// The peer will be removed from the consensus peer set, +// remove all cluster peers from its configuration and +// shut itself down. +func (c *Cluster) PeerRemove(p peer.ID) error { + peers := c.peerManager.peers() + replies := make([]struct{}, len(peers), len(peers)) + errs := c.multiRPC(peers, "Cluster", "PeerManagerRmPeer", + p, copyEmptyStructToIfaces(replies)) + errorMsgs := "" + for i, err := range errs { + if err != nil && peers[i] != p { + logger.Error(err) + errorMsgs += fmt.Sprintf("%s: %s\n", + peers[i].Pretty(), + err.Error()) + } + } + if errorMsgs != "" { + return errors.New(errorMsgs) + } + logger.Infof("peer %s has been removed from the Cluster", p.Pretty()) + return nil +} + // StateSync syncs the consensus state to the Pin Tracker, ensuring // that every Cid that should be tracked is tracked. It returns // PinInfo for Cids which were added or deleted. 
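As a usage sketch (not part of this patch), the new methods can also be called directly when embedding a `Cluster` in Go; `addAndRemovePeer` below is a hypothetical helper, and the multiaddress is expected to be of the `/ip4/.../tcp/.../ipfs/<peerID>` form used throughout this change:

```go
package example

import (
	ipfscluster "github.com/ipfs/ipfs-cluster"
	ma "github.com/multiformats/go-multiaddr"
)

// addAndRemovePeer adds the peer reachable at addrStr to a running cluster
// peer "c" and then removes it again.
func addAndRemovePeer(c *ipfscluster.Cluster, addrStr string) error {
	// PeerAdd dials the new peer first, so it must be online and reachable;
	// if the broadcast fails, the operation is rolled back with a PeerRemove.
	addr, err := ma.NewMultiaddr(addrStr)
	if err != nil {
		return err
	}

	id, err := c.PeerAdd(addr)
	if err != nil {
		return err
	}

	// The removed peer is expected to shut itself down after this call.
	return c.PeerRemove(id.ID)
}
```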
@@ -317,16 +483,13 @@ func (c *Cluster) Version() string { // Peers returns the IDs of the members of this Cluster func (c *Cluster) Peers() []ID { - members := c.peerList() + members := c.peerManager.peers() peersSerial := make([]IDSerial, len(members), len(members)) peers := make([]ID, len(members), len(members)) - ifaceReplies := make([]interface{}, len(members), len(members)) - for i := range peersSerial { - ifaceReplies[i] = &peersSerial[i] - } + errs := c.multiRPC(members, "Cluster", "ID", struct{}{}, + copyIDSerialsToIfaces(peersSerial)) - errs := c.multiRPC(members, "Cluster", "ID", struct{}{}, ifaceReplies) for i, err := range errs { if err != nil { peersSerial[i].ID = peer.IDB58Encode(members[i]) @@ -340,12 +503,8 @@ func (c *Cluster) Peers() []ID { return peers } -func (c *Cluster) peerList() []peer.ID { - return c.host.Peerstore().Peers() -} - -// run reads from the RPC channels of the different components and launches -// short-lived go-routines to handle any requests. +// run provides a cancellable context and waits for all components to be ready +// before signaling readyCh func (c *Cluster) run() { c.wg.Add(1) @@ -356,7 +515,16 @@ func (c *Cluster) run() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() c.ctx = ctx - <-c.shutdownCh + + for { + select { + case <-c.shutdownCh: + return + case <-c.consensus.Ready(): + close(c.readyCh) + logger.Info("IPFS Cluster is ready") + } + } }() } @@ -374,26 +542,6 @@ func makeHost(ctx context.Context, cfg *Config) (host.Host, error) { return nil, err } - for _, addr := range cfg.ClusterPeers { - pid, err := addr.ValueForProtocol(ma.P_IPFS) - if err != nil { - return nil, err - } - - ipfs, _ := ma.NewMultiaddr("/ipfs/" + pid) - maddr := addr.Decapsulate(ipfs) - - peerID, err := peer.IDB58Decode(pid) - if err != nil { - return nil, err - } - - ps.AddAddrs( - peerID, - []ma.Multiaddr{maddr}, - peerstore.PermanentAddrTTL) - } - network, err := swarm.NewNetwork( ctx, []ma.Multiaddr{cfg.ClusterAddr}, @@ -410,7 +558,7 @@ func makeHost(ctx context.Context, cfg *Config) (host.Host, error) { return bhost, nil } -// Perform a sync rpc request to multiple destinations +// Perform an RPC request to multiple destinations func (c *Cluster) multiRPC(dests []peer.ID, svcName, svcMethod string, args interface{}, reply []interface{}) []error { if len(dests) != len(reply) { panic("must have matching dests and replies") @@ -442,14 +590,10 @@ func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (GlobalPinInfo, er PeerMap: make(map[peer.ID]PinInfo), } - members := c.peerList() + members := c.peerManager.peers() replies := make([]PinInfo, len(members), len(members)) - ifaceReplies := make([]interface{}, len(members), len(members)) - for i := range replies { - ifaceReplies[i] = &replies[i] - } args := NewCidArg(h) - errs := c.multiRPC(members, "Cluster", method, args, ifaceReplies) + errs := c.multiRPC(members, "Cluster", method, args, copyPinInfoToIfaces(replies)) for i, r := range replies { if e := errs[i]; e != nil { // This error must come from not being able to contact that cluster member @@ -476,13 +620,9 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) { var infos []GlobalPinInfo fullMap := make(map[string]GlobalPinInfo) - members := c.peerList() + members := c.peerManager.peers() replies := make([][]PinInfo, len(members), len(members)) - ifaceReplies := make([]interface{}, len(members), len(members)) - for i := range replies { - ifaceReplies[i] = &replies[i] - } - errs := c.multiRPC(members, 
"Cluster", method, struct{}{}, ifaceReplies) + errs := c.multiRPC(members, "Cluster", method, struct{}{}, copyPinInfoSliceToIfaces(replies)) mergePins := func(pins []PinInfo) { for _, p := range pins { diff --git a/cluster_test.go b/cluster_test.go index 929e2d9b..a662766c 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -85,6 +85,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *MapState if err != nil { t.Fatal("cannot create cluster:", err) } + <-cl.Ready() return cl, api, ipfs, st, tracker } diff --git a/config.go b/config.go index 4dad9d27..006b34cd 100644 --- a/config.go +++ b/config.go @@ -5,13 +5,13 @@ import ( "encoding/json" "fmt" "io/ioutil" + "sync" "time" + hashiraft "github.com/hashicorp/raft" crypto "github.com/libp2p/go-libp2p-crypto" peer "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" - - hashiraft "github.com/hashicorp/raft" ) // Default parameters for the configuration @@ -35,6 +35,7 @@ type Config struct { // List of multiaddresses of the peers of this cluster. ClusterPeers []ma.Multiaddr + pMux sync.Mutex // Listen parameters for the Cluster libp2p Host. Used by // the RPC and Consensus components. @@ -56,6 +57,10 @@ type Config struct { // Hashicorp's Raft configuration RaftConfig *hashiraft.Config + + // if a config has been loaded from disk, track the path + // so it can be saved to the same place. + path string } // JSONConfig represents a Cluster configuration as it will look when it is @@ -119,10 +124,12 @@ func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) { } pKey := base64.StdEncoding.EncodeToString(pkeyBytes) + cfg.pMux.Lock() clusterPeers := make([]string, len(cfg.ClusterPeers), len(cfg.ClusterPeers)) for i := 0; i < len(cfg.ClusterPeers); i++ { clusterPeers[i] = cfg.ClusterPeers[i].String() } + cfg.pMux.Unlock() j = &JSONConfig{ ID: cfg.ID.Pretty(), @@ -195,6 +202,8 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) { } raftCfg := hashiraft.DefaultConfig() + raftCfg.DisableBootstrapAfterElect = false + raftCfg.ShutdownOnRemove = false if jcfg.RaftConfig != nil { raftCfg.SnapshotInterval = time.Duration(jcfg.RaftConfig.SnapshotIntervalSeconds) * time.Second raftCfg.EnableSingleNode = jcfg.RaftConfig.EnableSingleNode @@ -233,6 +242,7 @@ func LoadConfig(path string) (*Config, error) { logger.Error("error parsing configuration: ", err) return nil, err } + cfg.path = path return cfg, nil } @@ -266,7 +276,9 @@ func NewDefaultConfig() (*Config, error) { } raftCfg := hashiraft.DefaultConfig() + raftCfg.DisableBootstrapAfterElect = false raftCfg.EnableSingleNode = true + raftCfg.ShutdownOnRemove = false clusterAddr, _ := ma.NewMultiaddr(DefaultClusterAddr) apiAddr, _ := ma.NewMultiaddr(DefaultAPIAddr) @@ -285,3 +297,45 @@ func NewDefaultConfig() (*Config, error) { RaftConfig: raftCfg, }, nil } + +func (cfg *Config) addPeer(addr ma.Multiaddr) { + cfg.pMux.Lock() + defer cfg.pMux.Unlock() + found := false + for _, cpeer := range cfg.ClusterPeers { + if cpeer.Equal(addr) { + found = true + } + } + if !found { + cfg.ClusterPeers = append(cfg.ClusterPeers, addr) + } + logger.Debugf("add: cluster peers are now: %s", cfg.ClusterPeers) +} + +func (cfg *Config) rmPeer(p peer.ID) { + cfg.pMux.Lock() + defer cfg.pMux.Unlock() + foundPos := -1 + for i, addr := range cfg.ClusterPeers { + cp, _, _ := multiaddrSplit(addr) + if cp == p { + foundPos = i + } + } + if foundPos < 0 { + return + } + + // Delete preserving order + copy(cfg.ClusterPeers[foundPos:], cfg.ClusterPeers[foundPos+1:]) + 
cfg.ClusterPeers[len(cfg.ClusterPeers)-1] = nil // or the zero value of T + cfg.ClusterPeers = cfg.ClusterPeers[:len(cfg.ClusterPeers)-1] + logger.Debugf("rm: cluster peers are now: %s", cfg.ClusterPeers) +} + +func (cfg *Config) emptyPeers() { + cfg.pMux.Lock() + defer cfg.pMux.Unlock() + cfg.ClusterPeers = []ma.Multiaddr{} +} diff --git a/config_test.go b/config_test.go index de7a1201..eec83cd9 100644 --- a/config_test.go +++ b/config_test.go @@ -1,5 +1,7 @@ package ipfscluster +import "testing" + func testingConfig() *Config { jcfg := &JSONConfig{ ID: "QmUfSFm12eYCaRdypg48m8RqkXfLW7A2ZeGZb2skeHHDGA", @@ -18,3 +20,78 @@ func testingConfig() *Config { cfg, _ := jcfg.ToConfig() return cfg } + +func TestDefaultConfig(t *testing.T) { + _, err := NewDefaultConfig() + if err != nil { + t.Fatal(err) + } +} + +func TestConfigToJSON(t *testing.T) { + cfg, err := NewDefaultConfig() + if err != nil { + t.Fatal(err) + } + _, err = cfg.ToJSONConfig() + if err != nil { + t.Error(err) + } +} + +func TestConfigToConfig(t *testing.T) { + cfg, _ := NewDefaultConfig() + j, _ := cfg.ToJSONConfig() + _, err := j.ToConfig() + if err != nil { + t.Error(err) + } + + j.ID = "abc" + _, err = j.ToConfig() + if err == nil { + t.Error("expected error decoding ID") + } + + j, _ = cfg.ToJSONConfig() + j.PrivateKey = "abc" + _, err = j.ToConfig() + if err == nil { + t.Error("expected error parsing private key") + } + + j, _ = cfg.ToJSONConfig() + j.ClusterListenMultiaddress = "abc" + _, err = j.ToConfig() + if err == nil { + t.Error("expected error parsing cluster_listen_multiaddress") + } + + j, _ = cfg.ToJSONConfig() + j.APIListenMultiaddress = "abc" + _, err = j.ToConfig() + if err == nil { + t.Error("expected error parsing api_listen_multiaddress") + } + + j, _ = cfg.ToJSONConfig() + j.IPFSProxyListenMultiaddress = "abc" + _, err = j.ToConfig() + if err == nil { + t.Error("expected error parsing ipfs_proxy_listen_multiaddress") + } + + j, _ = cfg.ToJSONConfig() + j.IPFSNodeMultiaddress = "abc" + _, err = j.ToConfig() + if err == nil { + t.Error("expected error parsing ipfs_node_multiaddress") + } + + j, _ = cfg.ToJSONConfig() + j.ClusterPeers = []string{"abc"} + _, err = j.ToConfig() + if err == nil { + t.Error("expected error parsing cluster_peers") + } +} diff --git a/consensus.go b/consensus.go index 418c3b30..979bfaa0 100644 --- a/consensus.go +++ b/consensus.go @@ -111,6 +111,7 @@ type Consensus struct { rpcClient *rpc.Client rpcReady chan struct{} + readyCh chan struct{} shutdownLock sync.Mutex shutdown bool @@ -132,11 +133,12 @@ func NewConsensus(cfg *Config, host host.Host, state State) (*Consensus, error) cfg: cfg, host: host, baseOp: op, - shutdownCh: make(chan struct{}), + shutdownCh: make(chan struct{}, 1), rpcReady: make(chan struct{}, 1), + readyCh: make(chan struct{}, 1), } - logger.Infof("starting Consensus: waiting %d seconds for leader...", LeaderTimeout/time.Second) + logger.Infof("starting Consensus and waiting leader...") con, actor, wrapper, err := makeLibp2pRaft(cc.cfg, cc.host, state, cc.baseOp) if err != nil { @@ -147,26 +149,11 @@ func NewConsensus(cfg *Config, host host.Host, state State) (*Consensus, error) cc.consensus = con cc.p2pRaft = wrapper - // Wait for a leader - start := time.Now() - leader := peer.ID("") - for time.Since(start) < LeaderTimeout { - time.Sleep(500 * time.Millisecond) - leader, err = cc.Leader() - if err == nil { - break - } - } - if leader == "" { - return nil, errors.New("no leader was found after timeout") - } - - logger.Infof("Consensus leader found (%s). 
Syncing state...", leader.Pretty()) - cc.run(state) + cc.run() return cc, nil } -func (cc *Consensus) run(state State) { +func (cc *Consensus) run() { cc.wg.Add(1) go func() { defer cc.wg.Done() @@ -175,24 +162,13 @@ func (cc *Consensus) run(state State) { cc.ctx = ctx cc.baseOp.ctx = ctx - upToDate := make(chan struct{}) - go func() { - logger.Debug("consensus state is catching up") - time.Sleep(time.Second) - for { - lai := cc.p2pRaft.raft.AppliedIndex() - li := cc.p2pRaft.raft.LastIndex() - logger.Debugf("current Raft index: %d/%d", - lai, li) - if lai == li { - upToDate <- struct{}{} - break - } - time.Sleep(500 * time.Millisecond) - } - }() + leader, err := cc.waitForLeader() + if err != nil { + return + } + logger.Infof("Consensus leader found (%s). Syncing state...", leader.Pretty()) - <-upToDate + cc.waitForUpdates() logger.Info("Consensus state is up to date") // While rpc is not ready we cannot perform a sync @@ -200,7 +176,7 @@ func (cc *Consensus) run(state State) { var pInfo []PinInfo - _, err := cc.State() + _, err = cc.State() // only check sync if we have a state // avoid error on new running clusters if err != nil { @@ -214,11 +190,80 @@ func (cc *Consensus) run(state State) { &pInfo, nil) } - logger.Infof("IPFS Cluster is running") + cc.readyCh <- struct{}{} + logger.Debug("consensus ready") <-cc.shutdownCh }() } +// waits until there is a raft leader +func (cc *Consensus) waitForLeader() (peer.ID, error) { + // Wait for a leader + leader := peer.ID("") + var err error + rounds := 0 + for { + select { + case <-cc.ctx.Done(): + return "", errors.New("shutdown") + default: + if rounds%20 == 0 { //every 10 secs + logger.Info("Consensus is waiting for a leader...") + } + rounds++ + time.Sleep(500 * time.Millisecond) + leader, err = cc.Leader() + if err == nil && leader != "" { + return leader, nil + } + } + } + return leader, nil +} + +// waits until the appliedIndex is the same as the lastIndex +func (cc *Consensus) waitForUpdates() { + // Wait for state catch up + logger.Debug("consensus state is catching up") + time.Sleep(time.Second) + for { + select { + case <-cc.ctx.Done(): + return + default: + lai := cc.p2pRaft.raft.AppliedIndex() + li := cc.p2pRaft.raft.LastIndex() + logger.Debugf("current Raft index: %d/%d", + lai, li) + if lai == li { + return + } + time.Sleep(500 * time.Millisecond) + } + + } +} + +// raft stores peer add/rm operations. This is how to force a peer set. +func (cc *Consensus) setPeers() { + logger.Debug("forcefully setting Raft peers to known set") + var peersStr []string + var peers []peer.ID + err := cc.rpcClient.Call("", + "Cluster", + "PeerManagerPeers", + struct{}{}, + &peers) + if err != nil { + logger.Error(err) + return + } + for _, p := range peers { + peersStr = append(peersStr, p.Pretty()) + } + cc.p2pRaft.raft.SetPeers(peersStr) +} + // Shutdown stops the component so it will not process any // more updates. The underlying consensus is permanently // shutdown, along with the libp2p transport. 
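A note on how the new `Ready()` channel is meant to be consumed: callers select on it, optionally alongside shutdown and a deadline of their own. The sketch below is illustrative; `waitForCluster` and the timeout are not part of this patch:

```go
package example

import (
	"errors"
	"time"

	ipfscluster "github.com/ipfs/ipfs-cluster"
)

// waitForCluster blocks until the given peer reports readiness, is shut
// down, or the deadline passes.
func waitForCluster(c *ipfscluster.Cluster, timeout time.Duration) error {
	select {
	case <-c.Ready():
		// Consensus found a leader and caught up with the Raft log.
		return nil
	case <-c.Done():
		return errors.New("cluster peer was shut down before becoming ready")
	case <-time.After(timeout):
		return errors.New("timed out waiting for the cluster peer")
	}
}
```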
@@ -244,16 +289,13 @@ func (cc *Consensus) Shutdown() error { if err != nil && !strings.Contains(err.Error(), "Nothing new to snapshot") { errMsgs += "could not take snapshot: " + err.Error() + ".\n" } + f = cc.p2pRaft.raft.Shutdown() err = f.Error() if err != nil { errMsgs += "could not shutdown raft: " + err.Error() + ".\n" } - // BUG(hector): See go-libp2p-raft#16 - // err = cc.p2pRaft.transport.Close() - // if err != nil { - // errMsgs += "could not close libp2p transport: " + err.Error() + ".\n" - // } + err = cc.p2pRaft.boltdb.Close() // important! if err != nil { errMsgs += "could not close boltdb: " + err.Error() + ".\n" @@ -276,6 +318,12 @@ func (cc *Consensus) SetClient(c *rpc.Client) { cc.rpcReady <- struct{}{} } +// Ready returns a channel which is signaled when the Consensus +// algorithm has finished bootstrapping and is ready to use +func (cc *Consensus) Ready() <-chan struct{} { + return cc.readyCh +} + func (cc *Consensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp { return &clusterLogOp{ Cid: c.String(), @@ -284,27 +332,28 @@ func (cc *Consensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp { } // returns true if the operation was redirected to the leader -func (cc *Consensus) redirectToLeader(method string, c *cid.Cid) (bool, error) { +func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, error) { leader, err := cc.Leader() if err != nil { return false, err } - if leader != cc.host.ID() { - err = cc.rpcClient.Call( - leader, - "Cluster", - method, - NewCidArg(c), - &struct{}{}) - return true, err + if leader == cc.host.ID() { + return false, nil } - return false, nil + + err = cc.rpcClient.Call( + leader, + "Cluster", + method, + arg, + &struct{}{}) + return true, err } // LogPin submits a Cid to the shared state of the cluster. It will forward // the operation to the leader if this is not it. func (cc *Consensus) LogPin(c *cid.Cid) error { - redirected, err := cc.redirectToLeader("ConsensusLogPin", c) + redirected, err := cc.redirectToLeader("ConsensusLogPin", NewCidArg(c)) if err != nil || redirected { return err } @@ -324,7 +373,7 @@ func (cc *Consensus) LogPin(c *cid.Cid) error { // LogUnpin removes a Cid from the shared state of the cluster. func (cc *Consensus) LogUnpin(c *cid.Cid) error { - redirected, err := cc.redirectToLeader("ConsensusLogUnpin", c) + redirected, err := cc.redirectToLeader("ConsensusLogUnpin", NewCidArg(c)) if err != nil || redirected { return err } @@ -341,6 +390,30 @@ func (cc *Consensus) LogUnpin(c *cid.Cid) error { return nil } +// AddPeer attempts to add a peer to the consensus. +func (cc *Consensus) AddPeer(p peer.ID) error { + //redirected, err := cc.redirectToLeader("ConsensusAddPeer", p) + //if err != nil || redirected { + // return err + // } + // We are the leader + future := cc.p2pRaft.raft.AddPeer(peer.IDB58Encode(p)) + err := future.Error() + return err +} + +// RemovePeer attempts to remove a peer from the consensus. +func (cc *Consensus) RemovePeer(p peer.ID) error { + //redirected, err := cc.redirectToLeader("ConsensusRmPeer", p) + //if err != nil || redirected { + // return err + //} + + future := cc.p2pRaft.raft.RemovePeer(peer.IDB58Encode(p)) + err := future.Error() + return err +} + // State retrieves the current consensus State. It may error // if no State has been agreed upon or the state is not // consistent. 
The returned State is the last agreed-upon diff --git a/consensus_test.go b/consensus_test.go index 7ff81dcd..664c3eb9 100644 --- a/consensus_test.go +++ b/consensus_test.go @@ -97,6 +97,7 @@ func testingConsensus(t *testing.T) *Consensus { t.Fatal("cannot create Consensus:", err) } cc.SetClient(mockRPCClient(t)) + <-cc.Ready() return cc } diff --git a/ipfs-cluster-ctl/main.go b/ipfs-cluster-ctl/main.go index b6197f4b..9f87bbc1 100644 --- a/ipfs-cluster-ctl/main.go +++ b/ipfs-cluster-ctl/main.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "os" @@ -13,6 +14,8 @@ import ( cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" + peer "github.com/libp2p/go-libp2p-peer" + ma "github.com/multiformats/go-multiaddr" cli "github.com/urfave/cli" ) @@ -56,6 +59,10 @@ type errorResp struct { Message string `json:"message"` } +type peerAddBody struct { + Addr string `json:"peer_multiaddress"` +} + func out(m string, a ...interface{}) { fmt.Fprintf(os.Stderr, m, a...) } @@ -114,7 +121,7 @@ func main() { This command will print out information about the cluster peer used `, Action: func(c *cli.Context) error { - resp := request("GET", "/id") + resp := request("GET", "/id", nil) formatResponse(resp) return nil }, @@ -129,8 +136,55 @@ This command can be used to list and manage IPFS Cluster peers. { Name: "ls", Usage: "list the nodes participating in the IPFS Cluster", + UsageText: ` +This commands provides a list of the ID information of all the peers in the Cluster. +`, Action: func(c *cli.Context) error { - resp := request("GET", "/peers") + resp := request("GET", "/peers", nil) + formatResponse(resp) + return nil + }, + }, + { + Name: "add", + Usage: "add a peer to the Cluster", + UsageText: ` +This command adds a new peer to the cluster. In order for the operation to +succeed, the new peer needs to be reachable and any other member of the cluster +should be online. The operation returns the ID information for the new peer. +`, + ArgsUsage: "", + Action: func(c *cli.Context) error { + addr := c.Args().First() + if addr == "" { + return cli.NewExitError("Error: a multiaddress argument is needed", 1) + } + _, err := ma.NewMultiaddr(addr) + checkErr("parsing multiaddress", err) + addBody := peerAddBody{addr} + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + enc.Encode(addBody) + resp := request("POST", "/peers", &buf) + formatResponse(resp) + return nil + }, + }, + { + Name: "rm", + Usage: "remove a peer from the Cluster", + UsageText: ` +This command removes a peer from the cluster. If the peer is online, it will +automatically shut down. All other cluster peers should be online for the +operation to succeed, otherwise some nodes may be left with an outdated list of +cluster peers. +`, + ArgsUsage: "", + Action: func(c *cli.Context) error { + pid := c.Args().First() + _, err := peer.IDB58Decode(pid) + checkErr("parsing peer ID", err) + resp := request("DELETE", "/peers/"+pid, nil) formatResponse(resp) return nil }, @@ -161,9 +215,9 @@ in the cluster and should be part of the list offered by "pin ls". cidStr := c.Args().First() _, err := cid.Decode(cidStr) checkErr("parsing cid", err) - request("POST", "/pins/"+cidStr) + request("POST", "/pins/"+cidStr, nil) time.Sleep(500 * time.Millisecond) - resp := request("GET", "/pins/"+cidStr) + resp := request("GET", "/pins/"+cidStr, nil) formatResponse(resp) return nil }, @@ -184,9 +238,9 @@ although unpinning operations in the cluster may take longer or fail. 
cidStr := c.Args().First() _, err := cid.Decode(cidStr) checkErr("parsing cid", err) - request("DELETE", "/pins/"+cidStr) + request("DELETE", "/pins/"+cidStr, nil) time.Sleep(500 * time.Millisecond) - resp := request("GET", "/pins/"+cidStr) + resp := request("GET", "/pins/"+cidStr, nil) formatResponse(resp) return nil }, @@ -201,7 +255,7 @@ merely represents the list of pins which are part of the global state of the cluster. For specific information, use "status". `, Action: func(c *cli.Context) error { - resp := request("GET", "/pinlist") + resp := request("GET", "/pinlist", nil) formatResponse(resp) return nil }, @@ -227,7 +281,7 @@ with "sync". _, err := cid.Decode(cidStr) checkErr("parsing cid", err) } - resp := request("GET", "/pins/"+cidStr) + resp := request("GET", "/pins/"+cidStr, nil) formatResponse(resp) return nil }, @@ -254,9 +308,9 @@ CIDs in error state may be manually recovered with "recover". if cidStr != "" { _, err := cid.Decode(cidStr) checkErr("parsing cid", err) - resp = request("POST", "/pins/"+cidStr+"/sync") + resp = request("POST", "/pins/"+cidStr+"/sync", nil) } else { - resp = request("POST", "/pins/sync") + resp = request("POST", "/pins/sync", nil) } formatResponse(resp) return nil @@ -279,7 +333,7 @@ of the item upon completion. if cidStr != "" { _, err := cid.Decode(cidStr) checkErr("parsing cid", err) - resp = request("POST", "/pins/"+cidStr+"/recover") + resp = request("POST", "/pins/"+cidStr+"/recover", nil) formatResponse(resp) } else { @@ -296,7 +350,7 @@ This command retrieves the IPFS Cluster version and can be used to check that it matches the CLI version (shown by -v). `, Action: func(c *cli.Context) error { - resp := request("GET", "/version") + resp := request("GET", "/version", nil) formatResponse(resp) return nil }, @@ -306,7 +360,7 @@ to check that it matches the CLI version (shown by -v). app.Run(os.Args) } -func request(method, path string, args ...string) *http.Response { +func request(method, path string, body io.Reader, args ...string) *http.Response { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(defaultTimeout)*time.Second) defer cancel() @@ -321,7 +375,7 @@ func request(method, path string, args ...string) *http.Response { logger.Debugf("%s: %s", method, u) - r, err := http.NewRequest(method, u, nil) + r, err := http.NewRequest(method, u, body) checkErr("creating request", err) r.WithContext(ctx) @@ -345,6 +399,8 @@ func formatResponse(r *http.Response) { out("Error %d: %s", e.Code, e.Message) } else if r.StatusCode == http.StatusAccepted { out("%s", "request accepted") + } else if r.StatusCode == http.StatusNoContent { + out("%s", "Request succeeded\n") } else { var resp interface{} err = json.Unmarshal(body, &resp) diff --git a/ipfs-cluster-service/main.go b/ipfs-cluster-service/main.go index 8524b9b8..bb54eeb6 100644 --- a/ipfs-cluster-service/main.go +++ b/ipfs-cluster-service/main.go @@ -33,7 +33,7 @@ using LibP2P. %s needs a valid configuration to run. This configuration is independent from IPFS and includes its own LibP2P key-pair. It can be -initialized with --init and its default location is +initialized with "init" and its default location is ~/%s/%s. For feedback, bug reports or any additional information, visit @@ -44,6 +44,8 @@ https://github.com/ipfs/ipfs-cluster. 
DefaultPath, DefaultConfigFile) +var logger = logging.Logger("service") + // Default location for the configurations and data var ( // DefaultPath is initialized to something like ~/.ipfs-cluster/service.json @@ -84,10 +86,6 @@ func checkErr(doing string, err error) { } func main() { - // Catch SIGINT as a way to exit - signalChan := make(chan os.Signal) - signal.Notify(signalChan, os.Interrupt) - app := cli.NewApp() app.Name = programName app.Usage = "IPFS Cluster node" @@ -95,8 +93,9 @@ func main() { app.Version = ipfscluster.Version app.Flags = []cli.Flag{ cli.BoolFlag{ - Name: "init", - Usage: "create a default configuration and exit", + Name: "init", + Usage: "create a default configuration and exit", + Hidden: true, }, cli.StringFlag{ Name: "config, c", @@ -106,7 +105,7 @@ func main() { }, cli.BoolFlag{ Name: "force, f", - Usage: "force configuration overwrite when running --init", + Usage: "force configuration overwrite when running 'init'", }, cli.BoolFlag{ Name: "debug, d", @@ -171,26 +170,38 @@ func main() { tracker) checkErr("starting cluster", err) - // Wait until we are told to exit by a signal - <-signalChan - err = cluster.Shutdown() - checkErr("shutting down cluster", err) - return nil + signalChan := make(chan os.Signal) + signal.Notify(signalChan, os.Interrupt) + + for { + select { + case <-signalChan: + err = cluster.Shutdown() + checkErr("shutting down cluster", err) + return nil + case <-cluster.Done(): + return nil + case <-cluster.Ready(): + logger.Info("IPFS Cluster is ready") + } + } } app.Run(os.Args) } func setupLogging(lvl string) { + logging.SetLogLevel("service", lvl) logging.SetLogLevel("cluster", lvl) + //logging.SetLogLevel("raft", lvl) } func setupDebug() { logging.SetLogLevel("cluster", "debug") - logging.SetLogLevel("libp2p-raft", "debug") + //logging.SetLogLevel("libp2p-raft", "debug") logging.SetLogLevel("p2p-gorpc", "debug") //logging.SetLogLevel("swarm2", "debug") - ipfscluster.SilentRaft = false + logging.SetLogLevel("raft", "debug") } func initConfig(force bool) { @@ -201,7 +212,7 @@ func initConfig(force bool) { cfg, err := ipfscluster.NewDefaultConfig() checkErr("creating default configuration", err) cfg.ConsensusDataFolder = dataPath - err = os.MkdirAll(DefaultPath, 0700) + err = os.MkdirAll(filepath.Dir(configPath), 0700) err = cfg.Save(configPath) checkErr("saving new configuration", err) out("%s configuration written to %s\n", diff --git a/ipfscluster.go b/ipfscluster.go index f8263de0..725c35b6 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -11,40 +11,17 @@ package ipfscluster import ( "time" - crypto "github.com/libp2p/go-libp2p-crypto" - rpc "github.com/hsanjuan/go-libp2p-gorpc" cid "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" + crypto "github.com/libp2p/go-libp2p-crypto" peer "github.com/libp2p/go-libp2p-peer" protocol "github.com/libp2p/go-libp2p-protocol" ma "github.com/multiformats/go-multiaddr" ) -var logger = logging.Logger("cluster") - // RPCProtocol is used to send libp2p messages between cluster peers var RPCProtocol = protocol.ID("/ipfscluster/" + Version + "/rpc") -// SilentRaft controls whether all Raft log messages are discarded. 
-var SilentRaft = true - -// SetLogLevel sets the level in the logs -func SetLogLevel(l string) { - /* - CRITICAL Level = iota - ERROR - WARNING - NOTICE - INFO - DEBUG - */ - logging.SetLogLevel("cluster", l) - //logging.SetLogLevel("p2p-gorpc", l) - //logging.SetLogLevel("swarm2", l) - //logging.SetLogLevel("libp2p-raft", l) -} - // TrackerStatus values const ( // IPFSStatus should never take this value @@ -163,7 +140,7 @@ type IPFSConnector interface { type Peered interface { AddPeer(p peer.ID) RmPeer(p peer.ID) - SetPeers(peers []peer.ID) + //SetPeers(peers []peer.ID) } // State represents the shared state of the cluster and it @@ -215,19 +192,15 @@ type IPFSID struct { // IPFSIDSerial is the serializable IPFSID for RPC requests type IPFSIDSerial struct { ID string - Addresses [][]byte + Addresses MultiaddrsSerial Error string } // ToSerial converts IPFSID to a go serializable object func (id *IPFSID) ToSerial() IPFSIDSerial { - mAddrsB := make([][]byte, len(id.Addresses), len(id.Addresses)) - for i, a := range id.Addresses { - mAddrsB[i] = a.Bytes() - } return IPFSIDSerial{ ID: peer.IDB58Encode(id.ID), - Addresses: mAddrsB, + Addresses: MultiaddrsToSerial(id.Addresses), Error: id.Error, } } @@ -239,12 +212,7 @@ func (ids *IPFSIDSerial) ToID() IPFSID { if pID, err := peer.IDB58Decode(ids.ID); err == nil { id.ID = pID } - id.Addresses = make([]ma.Multiaddr, len(ids.Addresses), len(ids.Addresses)) - for i, mAddrB := range ids.Addresses { - if mAddr, err := ma.NewMultiaddrBytes(mAddrB); err == nil { - id.Addresses[i] = mAddr - } - } + id.Addresses = ids.Addresses.ToMultiaddrs() id.Error = ids.Error return id } @@ -254,6 +222,7 @@ type ID struct { ID peer.ID PublicKey crypto.PubKey Addresses []ma.Multiaddr + ClusterPeers []ma.Multiaddr Version string Commit string RPCProtocolVersion protocol.ID @@ -265,7 +234,8 @@ type ID struct { type IDSerial struct { ID string PublicKey []byte - Addresses [][]byte + Addresses MultiaddrsSerial + ClusterPeers MultiaddrsSerial Version string Commit string RPCProtocolVersion string @@ -275,15 +245,16 @@ type IDSerial struct { // ToSerial converts an ID to its Go-serializable version func (id ID) ToSerial() IDSerial { - mAddrsB := make([][]byte, len(id.Addresses), len(id.Addresses)) - for i, a := range id.Addresses { - mAddrsB[i] = a.Bytes() + var pkey []byte + if id.PublicKey != nil { + pkey, _ = id.PublicKey.Bytes() } - pkey, _ := id.PublicKey.Bytes() + return IDSerial{ ID: peer.IDB58Encode(id.ID), PublicKey: pkey, - Addresses: mAddrsB, + Addresses: MultiaddrsToSerial(id.Addresses), + ClusterPeers: MultiaddrsToSerial(id.ClusterPeers), Version: id.Version, Commit: id.Commit, RPCProtocolVersion: string(id.RPCProtocolVersion), @@ -302,12 +273,9 @@ func (ids IDSerial) ToID() ID { if pkey, err := crypto.UnmarshalPublicKey(ids.PublicKey); err == nil { id.PublicKey = pkey } - id.Addresses = make([]ma.Multiaddr, len(ids.Addresses), len(ids.Addresses)) - for i, mAddrB := range ids.Addresses { - if mAddr, err := ma.NewMultiaddrBytes(mAddrB); err == nil { - id.Addresses[i] = mAddr - } - } + + id.Addresses = ids.Addresses.ToMultiaddrs() + id.ClusterPeers = ids.ClusterPeers.ToMultiaddrs() id.Version = ids.Version id.Commit = ids.Commit id.RPCProtocolVersion = protocol.ID(ids.RPCProtocolVersion) @@ -315,3 +283,40 @@ func (ids IDSerial) ToID() ID { id.IPFS = ids.IPFS.ToID() return id } + +// MultiaddrSerial is a Multiaddress in a serializable form +type MultiaddrSerial []byte + +// MultiaddrsSerial is an array of Multiaddresses in serializable form +type MultiaddrsSerial 
[]MultiaddrSerial + +// MultiaddrToSerial converts a Multiaddress to its serializable form +func MultiaddrToSerial(addr ma.Multiaddr) MultiaddrSerial { + return addr.Bytes() +} + +// ToMultiaddr converts a serializable Multiaddress to its original type. +// All errors are ignored. +func (addrS MultiaddrSerial) ToMultiaddr() ma.Multiaddr { + a, _ := ma.NewMultiaddrBytes(addrS) + return a +} + +// MultiaddrsToSerial converts a slice of Multiaddresses to its +// serializable form. +func MultiaddrsToSerial(addrs []ma.Multiaddr) MultiaddrsSerial { + addrsS := make([]MultiaddrSerial, len(addrs), len(addrs)) + for i, a := range addrs { + addrsS[i] = MultiaddrToSerial(a) + } + return addrsS +} + +// ToMultiaddrs converts MultiaddrsSerial back to a slice of Multiaddresses +func (addrsS MultiaddrsSerial) ToMultiaddrs() []ma.Multiaddr { + addrs := make([]ma.Multiaddr, len(addrsS), len(addrsS)) + for i, addrS := range addrsS { + addrs[i] = addrS.ToMultiaddr() + } + return addrs +} diff --git a/ipfscluster_test.go b/ipfscluster_test.go index d19cdd04..e0da2c31 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -56,68 +56,86 @@ func randomBytes() []byte { return bs } +func createComponents(t *testing.T, i int) (*Config, *RESTAPI, *IPFSHTTPConnector, *MapState, *MapPinTracker, *ipfsMock) { + mock := newIpfsMock() + clusterAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", clusterPort+i)) + apiAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", apiPort+i)) + proxyAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", ipfsProxyPort+i)) + nodeAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", mock.addr, mock.port)) + priv, pub, err := crypto.GenerateKeyPair(crypto.RSA, 2048) + checkErr(t, err) + pid, err := peer.IDFromPublicKey(pub) + checkErr(t, err) + + cfg, _ := NewDefaultConfig() + cfg.ID = pid + cfg.PrivateKey = priv + cfg.ClusterPeers = []ma.Multiaddr{} + cfg.ClusterAddr = clusterAddr + cfg.APIAddr = apiAddr + cfg.IPFSProxyAddr = proxyAddr + cfg.IPFSNodeAddr = nodeAddr + cfg.ConsensusDataFolder = "./e2eTestRaft/" + pid.Pretty() + + api, err := NewRESTAPI(cfg) + checkErr(t, err) + ipfs, err := NewIPFSHTTPConnector(cfg) + checkErr(t, err) + state := NewMapState() + tracker := NewMapPinTracker(cfg) + + return cfg, api, ipfs, state, tracker, mock +} + +func createCluster(t *testing.T, cfg *Config, api *RESTAPI, ipfs *IPFSHTTPConnector, state *MapState, tracker *MapPinTracker) *Cluster { + cl, err := NewCluster(cfg, api, ipfs, state, tracker) + checkErr(t, err) + <-cl.Ready() + return cl +} + +func createOnePeerCluster(t *testing.T, nth int) (*Cluster, *ipfsMock) { + cfg, api, ipfs, state, tracker, mock := createComponents(t, nth) + cl := createCluster(t, cfg, api, ipfs, state, tracker) + return cl, mock +} + func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) { os.RemoveAll("./e2eTestRaft") - ipfsMocks := make([]*ipfsMock, 0, nClusters) - clusters := make([]*Cluster, 0, nClusters) - cfgs := make([]*Config, 0, nClusters) + cfgs := make([]*Config, nClusters, nClusters) + apis := make([]*RESTAPI, nClusters, nClusters) + ipfss := make([]*IPFSHTTPConnector, nClusters, nClusters) + states := make([]*MapState, nClusters, nClusters) + trackers := make([]*MapPinTracker, nClusters, nClusters) + ipfsMocks := make([]*ipfsMock, nClusters, nClusters) + clusters := make([]*Cluster, nClusters, nClusters) - type peerInfo struct { - pid peer.ID - priv crypto.PrivKey - } - peers := make([]peerInfo, 0, nClusters) - clusterpeers := make([]ma.Multiaddr, 0, nClusters) - 
- // Generate keys and ids + clusterPeers := make([]ma.Multiaddr, nClusters, nClusters) for i := 0; i < nClusters; i++ { - priv, pub, err := crypto.GenerateKeyPair(crypto.RSA, 2048) - checkErr(t, err) - pid, err := peer.IDFromPublicKey(pub) - checkErr(t, err) - maddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ipfs/%s", + cfg, api, ipfs, state, tracker, mock := createComponents(t, i) + cfgs[i] = cfg + apis[i] = api + ipfss[i] = ipfs + states[i] = state + trackers[i] = tracker + ipfsMocks[i] = mock + addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ipfs/%s", clusterPort+i, - pid.Pretty())) - peers = append(peers, peerInfo{pid, priv}) - clusterpeers = append(clusterpeers, maddr) - //t.Log(ma) + cfg.ID.Pretty())) + clusterPeers[i] = addr } - - // Generate nClusters configs for i := 0; i < nClusters; i++ { - mock := newIpfsMock() - ipfsMocks = append(ipfsMocks, mock) - clusterAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", clusterPort+i)) - apiAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", apiPort+i)) - proxyAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", ipfsProxyPort+i)) - nodeAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", mock.addr, mock.port)) - - cfgs = append(cfgs, &Config{ - ID: peers[i].pid, - PrivateKey: peers[i].priv, - ClusterPeers: clusterpeers, - ClusterAddr: clusterAddr, - APIAddr: apiAddr, - IPFSProxyAddr: proxyAddr, - IPFSNodeAddr: nodeAddr, - ConsensusDataFolder: "./e2eTestRaft/" + peers[i].pid.Pretty(), - }) + cfgs[i].ClusterPeers = make([]ma.Multiaddr, nClusters, nClusters) + for j := 0; j < nClusters; j++ { + cfgs[i].ClusterPeers[j] = clusterPeers[j] + } } var wg sync.WaitGroup for i := 0; i < nClusters; i++ { wg.Add(1) go func(i int) { - api, err := NewRESTAPI(cfgs[i]) - checkErr(t, err) - ipfs, err := NewIPFSHTTPConnector(cfgs[i]) - checkErr(t, err) - state := NewMapState() - tracker := NewMapPinTracker(cfgs[i]) - - cl, err := NewCluster(cfgs[i], api, ipfs, state, tracker) - checkErr(t, err) - clusters = append(clusters, cl) + clusters[i] = createCluster(t, cfgs[i], apis[i], ipfss[i], states[i], trackers[i]) wg.Done() }(i) } @@ -549,3 +567,19 @@ func TestClustersRecover(t *testing.T) { } } } + +func TestClustersShutdown(t *testing.T) { + clusters, mock := createClusters(t) + defer shutdownClusters(t, clusters, mock) + + f := func(t *testing.T, c *Cluster) { + err := c.Shutdown() + if err != nil { + t.Error("should be able to shutdown cleanly") + } + } + // Shutdown 3 times + runF(t, clusters, f) + runF(t, clusters, f) + runF(t, clusters, f) +} diff --git a/logging.go b/logging.go new file mode 100644 index 00000000..063f04f9 --- /dev/null +++ b/logging.go @@ -0,0 +1,62 @@ +package ipfscluster + +import ( + "bufio" + "bytes" + "log" + "strings" + "time" + + logging "github.com/ipfs/go-log" +) + +var logger = logging.Logger("cluster") +var raftStdLogger = makeRaftLogger() +var raftLogger = logging.Logger("raft") + +// SetLogLevel sets the level in the logs +func SetLogLevel(l string) { + /* + CRITICAL Level = iota + ERROR + WARNING + NOTICE + INFO + DEBUG + */ + logging.SetLogLevel("cluster", l) + //logging.SetLogLevel("p2p-gorpc", l) + //logging.SetLogLevel("swarm2", l) + //logging.SetLogLevel("libp2p-raft", l) +} + +// This redirects Raft output to our logger +func makeRaftLogger() *log.Logger { + var buf bytes.Buffer + rLogger := log.New(&buf, "", 0) + reader := bufio.NewReader(&buf) + go func() { + for { + t, err := reader.ReadString('\n') + if err != nil { + time.Sleep(time.Second) + 
continue + } + t = strings.TrimSuffix(t, "\n") + + switch { + case strings.Contains(t, "[DEBUG]"): + raftLogger.Debug(strings.TrimPrefix(t, "[DEBUG] raft: ")) + case strings.Contains(t, "[WARN]"): + raftLogger.Warning(strings.TrimPrefix(t, "[WARN] raft: ")) + case strings.Contains(t, "[ERR]"): + raftLogger.Error(strings.TrimPrefix(t, "[ERR] raft: ")) + case strings.Contains(t, "[INFO]"): + raftLogger.Info(strings.TrimPrefix(t, "[INFO] raft: ")) + default: + raftLogger.Debug(t) + } + } + }() + return rLogger +} diff --git a/map_pin_tracker.go b/map_pin_tracker.go index 4584a714..d103f00c 100644 --- a/map_pin_tracker.go +++ b/map_pin_tracker.go @@ -53,7 +53,7 @@ func NewMapPinTracker(cfg *Config) *MapPinTracker { status: make(map[string]PinInfo), rpcReady: make(chan struct{}, 1), peerID: cfg.ID, - shutdownCh: make(chan struct{}), + shutdownCh: make(chan struct{}, 1), } mpt.run() return mpt @@ -85,6 +85,7 @@ func (mpt *MapPinTracker) Shutdown() error { } logger.Info("stopping MapPinTracker") + close(mpt.rpcReady) mpt.shutdownCh <- struct{}{} mpt.wg.Wait() mpt.shutdown = true diff --git a/peer_manager.go b/peer_manager.go new file mode 100644 index 00000000..e2ced91b --- /dev/null +++ b/peer_manager.go @@ -0,0 +1,150 @@ +package ipfscluster + +import ( + "os" + "path/filepath" + "sync" + "time" + + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" +) + +type peerManager struct { + cluster *Cluster + + peerSetMux sync.RWMutex + peerSet map[peer.ID]struct{} +} + +func newPeerManager(c *Cluster) *peerManager { + pm := &peerManager{ + cluster: c, + } + pm.resetPeerSet() + return pm +} + +func (pm *peerManager) addPeer(addr ma.Multiaddr) (peer.ID, error) { + logger.Debugf("adding peer %s", addr) + + peerID, decapAddr, err := multiaddrSplit(addr) + if err != nil { + return peerID, err + } + + pm.peerSetMux.RLock() + _, ok := pm.peerSet[peerID] + pm.peerSetMux.RUnlock() + + if ok { + logger.Debugf("%s is already a peer", peerID) + return peerID, nil + } + + pm.peerSetMux.Lock() + pm.peerSet[peerID] = struct{}{} + pm.peerSetMux.Unlock() + pm.cluster.host.Peerstore().AddAddr(peerID, decapAddr, peerstore.PermanentAddrTTL) + pm.cluster.config.addPeer(addr) + if con := pm.cluster.consensus; con != nil { + pm.cluster.consensus.AddPeer(peerID) + } + if path := pm.cluster.config.path; path != "" { + err := pm.cluster.config.Save(path) + if err != nil { + logger.Error(err) + } + } + return peerID, nil +} + +func (pm *peerManager) rmPeer(p peer.ID) error { + logger.Debugf("removing peer %s", p.Pretty()) + pm.peerSetMux.RLock() + _, ok := pm.peerSet[p] + pm.peerSetMux.RUnlock() + if !ok { + return nil + } + pm.peerSetMux.Lock() + delete(pm.peerSet, p) + pm.peerSetMux.Unlock() + pm.cluster.host.Peerstore().ClearAddrs(p) + pm.cluster.config.rmPeer(p) + pm.cluster.consensus.RemovePeer(p) + + // It's ourselves. 
This is not very graceful + if p == pm.cluster.host.ID() { + logger.Warning("this peer has been removed from the Cluster and will shutdown itself") + pm.cluster.config.emptyPeers() + defer func() { + go func() { + time.Sleep(time.Second) + pm.cluster.consensus.Shutdown() + pm.selfShutdown() + }() + }() + } + + if path := pm.cluster.config.path; path != "" { + pm.cluster.config.Save(path) + } + + return nil +} + +func (pm *peerManager) selfShutdown() { + err := pm.cluster.Shutdown() + if err == nil { + // If the shutdown worked correctly + // (including snapshot) we can remove the Raft + // database (which traces peers additions + // and removals). It makes re-start of the peer + // way less confusing for Raft while the state + // kept in the snapshot. + os.Remove(filepath.Join(pm.cluster.config.ConsensusDataFolder, "raft.db")) + } +} + +// empty the peerset and add ourselves only +func (pm *peerManager) resetPeerSet() { + pm.peerSetMux.Lock() + defer pm.peerSetMux.Unlock() + pm.peerSet = make(map[peer.ID]struct{}) + pm.peerSet[pm.cluster.host.ID()] = struct{}{} +} + +func (pm *peerManager) peers() []peer.ID { + pm.peerSetMux.RLock() + defer pm.peerSetMux.RUnlock() + var pList []peer.ID + for k := range pm.peerSet { + pList = append(pList, k) + } + return pList +} + +func (pm *peerManager) addFromConfig(cfg *Config) error { + return pm.addFromMultiaddrs(cfg.ClusterPeers) +} + +func (pm *peerManager) addFromMultiaddrs(mAddrIDs []ma.Multiaddr) error { + pm.resetPeerSet() + pm.cluster.config.emptyPeers() + if len(mAddrIDs) > 0 { + logger.Info("adding Cluster peers:") + } else { + logger.Info("This is a single-node cluster") + } + + for _, m := range mAddrIDs { + _, err := pm.addPeer(m) + if err != nil { + return err + } + logger.Infof(" - %s", m.String()) + } + return nil +} diff --git a/peer_manager_test.go b/peer_manager_test.go new file mode 100644 index 00000000..5bef4e27 --- /dev/null +++ b/peer_manager_test.go @@ -0,0 +1,168 @@ +package ipfscluster + +import ( + "sync" + "testing" + + cid "github.com/ipfs/go-cid" + ma "github.com/multiformats/go-multiaddr" +) + +func peerManagerClusters(t *testing.T) ([]*Cluster, []*ipfsMock) { + cls := make([]*Cluster, nClusters, nClusters) + mocks := make([]*ipfsMock, nClusters, nClusters) + var wg sync.WaitGroup + for i := 0; i < nClusters; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + cl, m := createOnePeerCluster(t, i) + cls[i] = cl + mocks[i] = m + }(i) + } + wg.Wait() + return cls, mocks +} + +func clusterAddr(c *Cluster) ma.Multiaddr { + addr := c.config.ClusterAddr + pidAddr, _ := ma.NewMultiaddr("/ipfs/" + c.ID().ID.Pretty()) + return addr.Encapsulate(pidAddr) +} + +func TestClustersPeerAdd(t *testing.T) { + clusters, mocks := peerManagerClusters(t) + defer shutdownClusters(t, clusters, mocks) + + if len(clusters) < 2 { + t.Fatal("need at least 2 nodes for this test") + } + + for i := 1; i < len(clusters); i++ { + addr := clusterAddr(clusters[i]) + id, err := clusters[0].PeerAdd(addr) + if err != nil { + t.Fatal(err) + } + if len(id.ClusterPeers) != i { + // ClusterPeers is originally empty and contains nodes as we add them + t.Log(id.ClusterPeers) + t.Fatal("cluster peers should be up to date with the cluster") + } + } + + h, _ := cid.Decode(testCid) + clusters[1].Pin(h) + delay() + + f := func(t *testing.T, c *Cluster) { + ids := c.Peers() + + // check they are tracked by the peer manager + if len(ids) != nClusters { + t.Error("added clusters are not part of clusters") + } + + // Check that they are part of the consensus + pins := 
c.Pins() + if len(pins) != 1 { + t.Error("expected 1 pin everywhere") + } + + // check that its part of the configuration + if len(c.config.ClusterPeers) != nClusters-1 { + t.Error("expected different cluster peers in the configuration") + } + + for _, peer := range c.config.ClusterPeers { + if peer == nil { + t.Error("something went wrong adding peer to config") + } + } + } + runF(t, clusters, f) +} + +func TestClustersPeerAddBadPeer(t *testing.T) { + clusters, mocks := peerManagerClusters(t) + defer shutdownClusters(t, clusters, mocks) + + if len(clusters) < 2 { + t.Fatal("need at least 2 nodes for this test") + } + + // We add a cluster that has been shutdown + // (closed transports) + clusters[1].Shutdown() + _, err := clusters[0].PeerAdd(clusterAddr(clusters[1])) + if err == nil { + t.Error("expected an error") + } + ids := clusters[0].Peers() + if len(ids) != 1 { + t.Error("cluster should have only one member") + } +} + +func TestClustersPeerAddInUnhealthyCluster(t *testing.T) { + clusters, mocks := peerManagerClusters(t) + defer shutdownClusters(t, clusters, mocks) + + if len(clusters) < 3 { + t.Fatal("need at least 3 nodes for this test") + } + + _, err := clusters[0].PeerAdd(clusterAddr(clusters[1])) + ids := clusters[1].Peers() + if len(ids) != 2 { + t.Error("expected 2 peers") + } + + // Now we shutdown one member of the running cluster + // and try to add someone else. + clusters[1].Shutdown() + _, err = clusters[0].PeerAdd(clusterAddr(clusters[2])) + + if err == nil { + t.Error("expected an error") + } + + ids = clusters[0].Peers() + if len(ids) != 2 { + t.Error("cluster should still have 2 peers") + } +} + +func TestClustersPeerRemove(t *testing.T) { + clusters, mock := createClusters(t) + defer shutdownClusters(t, clusters, mock) + + p := clusters[1].ID().ID + err := clusters[0].PeerRemove(p) + if err != nil { + t.Error(err) + } + + f := func(t *testing.T, c *Cluster) { + if c.ID().ID == p { //This is the removed cluster + _, ok := <-c.Done() + if ok { + t.Error("removed peer should have exited") + } + if len(c.config.ClusterPeers) != 0 { + t.Error("cluster peers should be empty") + } + } else { + ids := c.Peers() + if len(ids) != nClusters-1 { + t.Error("should have removed 1 peer") + } + if len(c.config.ClusterPeers) != nClusters-2 { + t.Error("should have removed peer from config") + } + } + } + + runF(t, clusters, f) +} diff --git a/raft.go b/raft.go index 197acdd4..327085bc 100644 --- a/raft.go +++ b/raft.go @@ -37,10 +37,13 @@ func makeLibp2pRaft(cfg *Config, host host.Host, state State, op *clusterLogOp) //transport.OpenConns() pstore := &libp2praft.Peerstore{} - hPeers := host.Peerstore().Peers() - strPeers := make([]string, 0, len(hPeers)) - for _, p := range hPeers { - strPeers = append(strPeers, peer.IDB58Encode(p)) + strPeers := []string{peer.IDB58Encode(host.ID())} + for _, addr := range cfg.ClusterPeers { + p, _, err := multiaddrSplit(addr) + if err != nil { + return nil, nil, nil, err + } + strPeers = append(strPeers, p.Pretty()) } pstore.SetPeers(strPeers) @@ -52,12 +55,11 @@ func makeLibp2pRaft(cfg *Config, host host.Host, state State, op *clusterLogOp) raftCfg = hashiraft.DefaultConfig() raftCfg.EnableSingleNode = raftSingleMode } - if SilentRaft { - raftCfg.LogOutput = ioutil.Discard - raftCfg.Logger = nil - } + raftCfg.LogOutput = ioutil.Discard + raftCfg.Logger = raftStdLogger + logger.Debug("creating file snapshot store") - snapshots, err := hashiraft.NewFileSnapshotStore(cfg.ConsensusDataFolder, maxSnapshots, nil) + snapshots, err := 
hashiraft.NewFileSnapshotStoreWithLogger(cfg.ConsensusDataFolder, maxSnapshots, raftStdLogger) if err != nil { logger.Error("creating file snapshot store: ", err) return nil, nil, nil, err diff --git a/rest_api.go b/rest_api.go index cc0f6ce4..3ca4da22 100644 --- a/rest_api.go +++ b/rest_api.go @@ -12,11 +12,11 @@ import ( "sync" "time" + mux "github.com/gorilla/mux" rpc "github.com/hsanjuan/go-libp2p-gorpc" cid "github.com/ipfs/go-cid" + peer "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" - - mux "github.com/gorilla/mux" ) // Server settings @@ -56,6 +56,10 @@ type route struct { HandlerFunc http.HandlerFunc } +type peerAddBody struct { + PeerMultiaddr string `json:"peer_multiaddress"` +} + type errorResp struct { Code int `json:"code"` Message string `json:"message"` @@ -110,6 +114,7 @@ type restIDResp struct { ID string `json:"id"` PublicKey string `json:"public_key"` Addresses []string `json:"addresses"` + ClusterPeers []string `json:"cluster_peers"` Version string `json:"version"` Commit string `json:"commit"` RPCProtocolVersion string `json:"rpc_protocol_version"` @@ -129,10 +134,15 @@ func newRestIDResp(id ID) *restIDResp { for i, a := range id.Addresses { addrs[i] = a.String() } + peers := make([]string, len(id.ClusterPeers), len(id.ClusterPeers)) + for i, a := range id.ClusterPeers { + peers[i] = a.String() + } return &restIDResp{ ID: id.ID.Pretty(), PublicKey: pubKey, Addresses: addrs, + ClusterPeers: peers, Version: id.Version, Commit: id.Commit, RPCProtocolVersion: string(id.RPCProtocolVersion), @@ -221,6 +231,18 @@ func (api *RESTAPI) routes() []route { "/peers", api.peerListHandler, }, + { + "PeerAdd", + "POST", + "/peers", + api.peerAddHandler, + }, + { + "PeerRemove", + "DELETE", + "/peers/{peer}", + api.peerRemoveHandler, + }, { "Pins", @@ -329,8 +351,7 @@ func (api *RESTAPI) idHandler(w http.ResponseWriter, r *http.Request) { struct{}{}, &idSerial) if checkRPCErr(w, err) { - id := idSerial.ToID() - resp := newRestIDResp(id) + resp := newRestIDResp(idSerial.ToID()) sendJSONResponse(w, 200, resp) } } @@ -366,6 +387,48 @@ func (api *RESTAPI) peerListHandler(w http.ResponseWriter, r *http.Request) { } } +func (api *RESTAPI) peerAddHandler(w http.ResponseWriter, r *http.Request) { + dec := json.NewDecoder(r.Body) + defer r.Body.Close() + + var addInfo peerAddBody + err := dec.Decode(&addInfo) + if err != nil { + sendErrorResponse(w, 400, "error decoding request body") + return + } + + mAddr, err := ma.NewMultiaddr(addInfo.PeerMultiaddr) + if err != nil { + sendErrorResponse(w, 400, "error decoding peer_multiaddress") + return + } + + var ids IDSerial + err = api.rpcClient.Call("", + "Cluster", + "PeerAdd", + MultiaddrToSerial(mAddr), + &ids) + if checkRPCErr(w, err) { + resp := newRestIDResp(ids.ToID()) + sendJSONResponse(w, 200, resp) + } +} + +func (api *RESTAPI) peerRemoveHandler(w http.ResponseWriter, r *http.Request) { + if p := parsePidOrError(w, r); p != "" { + err := api.rpcClient.Call("", + "Cluster", + "PeerRemove", + p, + &struct{}{}) + if checkRPCErr(w, err) { + sendEmptyResponse(w) + } + } +} + func (api *RESTAPI) pinHandler(w http.ResponseWriter, r *http.Request) { if c := parseCidOrError(w, r); c != nil { err := api.rpcClient.Call("", @@ -482,6 +545,17 @@ func parseCidOrError(w http.ResponseWriter, r *http.Request) *CidArg { return &CidArg{hash} } +func parsePidOrError(w http.ResponseWriter, r *http.Request) peer.ID { + vars := mux.Vars(r) + idStr := vars["peer"] + pid, err := peer.IDB58Decode(idStr) + if err != nil { + 
sendErrorResponse(w, 400, "error decoding Peer ID: "+err.Error()) + return "" + } + return pid +} + // checkRPCErr takes care of returning standard error responses if we // pass an error to it. It returns true when everythings OK (no error // was handled), or false otherwise. diff --git a/rest_api_test.go b/rest_api_test.go index f68d296b..d95da6bf 100644 --- a/rest_api_test.go +++ b/rest_api_test.go @@ -3,6 +3,7 @@ package ipfscluster import ( "bytes" "encoding/json" + "fmt" "io/ioutil" "net/http" "testing" @@ -51,8 +52,8 @@ func makeGet(t *testing.T, path string, resp interface{}) { processResp(t, httpResp, err, resp) } -func makePost(t *testing.T, path string, resp interface{}) { - httpResp, err := http.Post(apiHost+path, "application/json", bytes.NewReader([]byte{})) +func makePost(t *testing.T, path string, body []byte, resp interface{}) { + httpResp, err := http.Post(apiHost+path, "application/json", bytes.NewReader(body)) processResp(t, httpResp, err, resp) } @@ -107,20 +108,57 @@ func TestRESTAPIPeerstEndpoint(t *testing.T) { } } +func TestRESTAPIPeerAddEndpoint(t *testing.T) { + api := testRESTAPI(t) + defer api.Shutdown() + + id := restIDResp{} + // post with valid body + body := fmt.Sprintf("{\"peer_multiaddress\":\"/ip4/1.2.3.4/tcp/1234/ipfs/%s\"}", testPeerID.Pretty()) + t.Log(body) + makePost(t, "/peers", []byte(body), &id) + + if id.ID != testPeerID.Pretty() { + t.Error("expected correct ID") + } + if id.Error != "" { + t.Error("did not expect an error") + } + + // Send invalid body + errResp := errorResp{} + makePost(t, "/peers", []byte("oeoeoeoe"), &errResp) + if errResp.Code != 400 { + t.Error("expected error with bad body") + } + // Send invalid multiaddr + makePost(t, "/peers", []byte("{\"peer_multiaddress\": \"ab\"}"), &errResp) + if errResp.Code != 400 { + t.Error("expected error with bad multiaddress") + } +} + +func TestRESTAPIPeerRemoveEndpoint(t *testing.T) { + api := testRESTAPI(t) + defer api.Shutdown() + + makeDelete(t, "/peers/"+testPeerID.Pretty(), &struct{}{}) +} + func TestRESTAPIPinEndpoint(t *testing.T) { api := testRESTAPI(t) defer api.Shutdown() // test regular post - makePost(t, "/pins/"+testCid, &struct{}{}) + makePost(t, "/pins/"+testCid, []byte{}, &struct{}{}) errResp := errorResp{} - makePost(t, "/pins/"+errorCid, &errResp) + makePost(t, "/pins/"+errorCid, []byte{}, &errResp) if errResp.Message != errBadCid.Error() { t.Error("expected different error: ", errResp.Message) } - makePost(t, "/pins/abcd", &errResp) + makePost(t, "/pins/abcd", []byte{}, &errResp) if errResp.Code != 400 { t.Error("should fail with bad Cid") } @@ -195,7 +233,7 @@ func TestRESTAPISyncAllEndpoint(t *testing.T) { defer api.Shutdown() var resp statusResp - makePost(t, "/pins/sync", &resp) + makePost(t, "/pins/sync", []byte{}, &resp) if len(resp) != 3 || resp[0].Cid != testCid1 || @@ -209,7 +247,7 @@ func TestRESTAPISyncEndpoint(t *testing.T) { defer api.Shutdown() var resp statusCidResp - makePost(t, "/pins/"+testCid+"/sync", &resp) + makePost(t, "/pins/"+testCid+"/sync", []byte{}, &resp) if resp.Cid != testCid { t.Error("expected the same cid") @@ -228,7 +266,7 @@ func TestRESTAPIRecoverEndpoint(t *testing.T) { defer api.Shutdown() var resp statusCidResp - makePost(t, "/pins/"+testCid+"/recover", &resp) + makePost(t, "/pins/"+testCid+"/recover", []byte{}, &resp) if resp.Cid != testCid { t.Error("expected the same cid") diff --git a/rpc_api.go b/rpc_api.go index 6cb498b1..548722f3 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -1,6 +1,10 @@ package ipfscluster -import cid 
"github.com/ipfs/go-cid" +import ( + cid "github.com/ipfs/go-cid" + + peer "github.com/libp2p/go-libp2p-peer" +) // RPCAPI is a go-libp2p-gorpc service which provides the internal ipfs-cluster // API, which enables components and cluster peers to communicate and @@ -94,6 +98,19 @@ func (api *RPCAPI) Peers(in struct{}, out *[]IDSerial) error { return nil } +// PeerAdd runs Cluster.PeerAdd(). +func (api *RPCAPI) PeerAdd(in MultiaddrSerial, out *IDSerial) error { + addr := in.ToMultiaddr() + id, err := api.cluster.PeerAdd(addr) + *out = id.ToSerial() + return err +} + +// PeerRemove runs Cluster.PeerRm(). +func (api *RPCAPI) PeerRemove(in peer.ID, out *struct{}) error { + return api.cluster.PeerRemove(in) +} + // StatusAll runs Cluster.StatusAll(). func (api *RPCAPI) StatusAll(in struct{}, out *[]GlobalPinInfo) error { pinfo, err := api.cluster.StatusAll() @@ -277,3 +294,34 @@ func (api *RPCAPI) ConsensusLogUnpin(in *CidArg, out *struct{}) error { } return api.cluster.consensus.LogUnpin(c) } + +/* + Peer Manager methods + +*/ + +// PeerManagerAddPeer runs peerManager.addPeer(). +func (api *RPCAPI) PeerManagerAddPeer(in MultiaddrSerial, out *peer.ID) error { + mAddr := in.ToMultiaddr() + p, err := api.cluster.peerManager.addPeer(mAddr) + *out = p + return err +} + +// PeerManagerRmPeer runs peerManager.rmPeer(). +func (api *RPCAPI) PeerManagerRmPeer(in peer.ID, out *struct{}) error { + return api.cluster.peerManager.rmPeer(in) +} + +// PeerManagerAddFromMultiaddrs runs peerManager.addFromMultiaddrs(). +func (api *RPCAPI) PeerManagerAddFromMultiaddrs(in MultiaddrsSerial, out *struct{}) error { + api.cluster.peerManager.addFromMultiaddrs(in.ToMultiaddrs()) + return nil +} + +// PeerManagerPeers runs peerManager.peers(). +func (api *RPCAPI) PeerManagerPeers(in struct{}, out *[]peer.ID) error { + peers := api.cluster.peerManager.peers() + *out = peers + return nil +} diff --git a/rpc_api_test.go b/rpc_api_test.go index 5d771cc9..483758b0 100644 --- a/rpc_api_test.go +++ b/rpc_api_test.go @@ -72,6 +72,17 @@ func (mock *mockService) Peers(in struct{}, out *[]IDSerial) error { return nil } +func (mock *mockService) PeerAdd(in MultiaddrSerial, out *IDSerial) error { + id := IDSerial{} + mock.ID(struct{}{}, &id) + *out = id + return nil +} + +func (mock *mockService) PeerRemove(in peer.ID, out *struct{}) error { + return nil +} + func (mock *mockService) StatusAll(in struct{}, out *[]GlobalPinInfo) error { c1, _ := cid.Decode(testCid1) c2, _ := cid.Decode(testCid2) diff --git a/util.go b/util.go new file mode 100644 index 00000000..c6ea5d34 --- /dev/null +++ b/util.go @@ -0,0 +1,69 @@ +package ipfscluster + +import ( + "fmt" + + peer "github.com/libp2p/go-libp2p-peer" + ma "github.com/multiformats/go-multiaddr" +) + +// The copy functions below are used in calls to Cluste.multiRPC() +func copyPIDsToIfaces(in []peer.ID) []interface{} { + ifaces := make([]interface{}, len(in), len(in)) + for i := range in { + ifaces[i] = &in[i] + } + return ifaces +} + +func copyIDSerialsToIfaces(in []IDSerial) []interface{} { + ifaces := make([]interface{}, len(in), len(in)) + for i := range in { + ifaces[i] = &in[i] + } + return ifaces +} + +func copyPinInfoToIfaces(in []PinInfo) []interface{} { + ifaces := make([]interface{}, len(in), len(in)) + for i := range in { + ifaces[i] = &in[i] + } + return ifaces +} + +func copyPinInfoSliceToIfaces(in [][]PinInfo) []interface{} { + ifaces := make([]interface{}, len(in), len(in)) + for i := range in { + ifaces[i] = &in[i] + } + return ifaces +} + +func 
copyEmptyStructToIfaces(in []struct{}) []interface{} {
+	ifaces := make([]interface{}, len(in), len(in))
+	for i := range in {
+		ifaces[i] = &in[i]
+	}
+	return ifaces
+}
+
+func multiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) {
+	pid, err := addr.ValueForProtocol(ma.P_IPFS)
+	if err != nil {
+		err = fmt.Errorf("Invalid peer multiaddress: %s: %s", addr, err)
+		logger.Error(err)
+		return "", nil, err
+	}
+
+	ipfs, _ := ma.NewMultiaddr("/ipfs/" + pid)
+	decapAddr := addr.Decapsulate(ipfs)
+
+	peerID, err := peer.IDB58Decode(pid)
+	if err != nil {
+		err = fmt.Errorf("Invalid peer ID in multiaddress: %s: %s", pid, err)
+		logger.Error(err)
+		return "", nil, err
+	}
+	return peerID, decapAddr, nil
+}
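
The changes above add `POST /peers` and `DELETE /peers/{peer}` routes to the REST API, handled by `peerAddHandler` and `peerRemoveHandler`. Below is a minimal client sketch of how those endpoints might be exercised over plain HTTP. It assumes the REST API is listening on the default `/ip4/127.0.0.1/tcp/9094` address; the peer multiaddress and peer ID are placeholders and must be replaced with real values ending in `/ipfs/<peer ID>`.

```
package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
)

const api = "http://127.0.0.1:9094" // default api_listen_multiaddress

func main() {
	// Add a peer: the body matches the peerAddBody struct decoded by
	// peerAddHandler. The multiaddress here is a placeholder.
	body := []byte(`{"peer_multiaddress":"/ip4/1.2.3.4/tcp/9096/ipfs/<peer ID>"}`)
	resp, err := http.Post(api+"/peers", "application/json", bytes.NewReader(body))
	if err != nil {
		fmt.Println("peer add request failed:", err)
		return
	}
	raw, _ := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	// On success the handler replies with the added peer's ID,
	// including its cluster_peers list.
	fmt.Println(resp.StatusCode, string(raw))

	// Remove a peer: DELETE /peers/{peer} takes the peer ID in the path.
	req, _ := http.NewRequest("DELETE", api+"/peers/<peer ID>", nil)
	delResp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Println("peer remove request failed:", err)
		return
	}
	delResp.Body.Close()
	fmt.Println(delResp.StatusCode)
}
```

Errors from either call come back as the JSON `errorResp` type with a 400 or 500 code, as produced by `sendErrorResponse` and `checkRPCErr` in `rest_api.go`.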