From 4c1e0068f5796b9b995f53cc4e4b1b5b9f8d086e Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 26 Jan 2017 19:59:31 +0100 Subject: [PATCH] Fix #15: Peers() provides lots of information now I have renamed "members" to "peers". Added IPFS daemon ID and addresses to the ID object and have Peers() return the collection of ID() objects from the cluster. License: MIT Signed-off-by: Hector Sanjuan --- README.md | 14 ++--- ROADMAP.md | 4 +- architecture.md | 8 +-- captain.log.md | 2 +- cluster.go | 64 ++++++++++++------- cluster_test.go | 21 +++++-- config.go | 2 +- consensus.go | 23 ++++--- ipfs-cluster-ctl/main.go | 8 +-- ipfs_http_connector.go | 47 ++++++++++++++ ipfs_http_connector_test.go | 26 ++++++++ ipfscluster.go | 121 +++++++++++++++++++++++++++++++++++- ipfscluster_test.go | 53 +++++++++++++--- ipfsmock_test.go | 9 +++ rest_api.go | 110 ++++++++++++++++++++------------ rest_api_test.go | 13 ++-- rpc_api.go | 23 ++++--- rpc_api_test.go | 14 +++-- 18 files changed, 438 insertions(+), 124 deletions(-) diff --git a/README.md b/README.md index 58ebfbf4..8c194029 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ Additionally, cluster nodes act as a proxy/wrapper to the IPFS API, so they can IPFS Cluster provides a cluster-node application (`ipfs-cluster-service`), a Go API, a HTTP API and a command-line tool (`ipfs-cluster-ctl`). -Current functionality only allows pinning in all cluster members, but more strategies (like setting a replication factor for each pin) will be developed. +Current functionality only allows pinning in all cluster peers, but more strategies (like setting a replication factor for each pin) will be developed. ## Table of Contents @@ -41,7 +41,7 @@ Current functionality only allows pinning in all cluster members, but more strat Since the start of IPFS it was clear that a tool to coordinate a number of different nodes (and the content they are supposed to store) would add a great value to the IPFS ecosystem. 
Naïve approaches are possible, but they present some weaknesses, specially at dealing with error handling, recovery and implementation of advanced pinning strategies. -`ipfs-cluster` aims to address this issues by providing a IPFS node wrapper which coordinates multiple cluster members via a consensus algorithm. This ensures that the desired state of the system is always agreed upon and can be easily maintained by the members of the cluster. Thus, every cluster member knows which content is tracked, can decide whether asking IPFS to pin it and can react to any contingencies like node reboots. +`ipfs-cluster` aims to address these issues by providing an IPFS node wrapper which coordinates multiple cluster peers via a consensus algorithm. This ensures that the desired state of the system is always agreed upon and can be easily maintained by the cluster peers. Thus, every cluster node knows which content is tracked, can decide whether asking IPFS to pin it and can react to any contingencies like node reboots. ## Captain @@ -63,7 +63,7 @@ This will install `ipfs-cluster-service` and `ipfs-cluster-ctl` in your `$GOPATH ### `ipfs-cluster-service` -`ipfs-cluster-service` runs a member node for the cluster. Usage information can be obtained running: +`ipfs-cluster-service` runs a cluster peer. Usage information can be obtained running: ``` $ ipfs-cluster-service -h @@ -78,7 +78,7 @@ $ ipfs-cluster-service -init The configuration will be placed in `~/.ipfs-cluster/service.json` by default. -You can add the multiaddresses for the other members of the cluster in the `cluster_peers` variable. For example, here is a valid configuration for a cluster of 4 members: +You can add the multiaddresses for the other cluster peers in the `cluster_peers` variable. 
For example, here is a valid configuration for a cluster of 4 peers: ```json { @@ -101,9 +101,9 @@ You can add the multiaddresses for the other members of the cluster in the `clus } ``` -The configuration file should probably be identical among all cluster members, except for the `id` and `private_key` fields. To facilitate configuration, `cluster_peers` may include its own address, but it does not have to. For additional information about the configuration format, see the [JSONConfig documentation](https://godoc.org/github.com/ipfs/ipfs-cluster#JSONConfig). +The configuration file should probably be identical among all cluster peers, except for the `id` and `private_key` fields. To facilitate configuration, `cluster_peers` may include its own address, but it does not have to. For additional information about the configuration format, see the [JSONConfig documentation](https://godoc.org/github.com/ipfs/ipfs-cluster#JSONConfig). -Once every cluster member has the configuration in place, you can run `ipfs-cluster-service` to start the cluster. +Once every cluster peer has the configuration in place, you can run `ipfs-cluster-service` to start the cluster. ### `ipfs-cluster-ctl` @@ -116,7 +116,7 @@ information about supported commands. 
In summary, it works as follows: ``` -$ ipfs-cluster-ctl member ls # list cluster members +$ ipfs-cluster-ctl peers ls # list cluster peers $ ipfs-cluster-ctl pin add Qma4Lid2T1F68E3Xa3CpE6vVJDLwxXLD8RfiB9g1Tmqp58 # pins a Cid in the cluster $ ipfs-cluster-ctl pin rm Qma4Lid2T1F68E3Xa3CpE6vVJDLwxXLD8RfiB9g1Tmqp58 # unpins a Cid from the cluster $ ipfs-cluster-ctl status # display tracked Cids information diff --git a/ROADMAP.md b/ROADMAP.md index 2f6156c0..64cd8863 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -14,9 +14,9 @@ This quarter is going to be focused on bringing ipfs-cluster to life as a usable On these lines, there are several endeavours which stand out for themselves and are officially part of the general IPFS Roadmaps: -* Dynamically add and remove cluster members in an easy fashion (https://github.com/ipfs/pm/issues/353) +* Dynamically add and remove cluster peers in an easy fashion (https://github.com/ipfs/pm/issues/353) -This involves easily adding a member (or removing) from a running cluster. `ipfs-cluster-service member add ` should work and should update the peer set of all components of all members, along with their configurations. +This involves easily adding a peer (or removing) from a running cluster. `ipfs-cluster-service peer add ` should work and should update the peer set of all components of all peers, along with their configurations. * Replication-factor-based pinning strategy (https://github.com/ipfs/pm/issues/353) diff --git a/architecture.md b/architecture.md index d681b3b3..2373ce0d 100644 --- a/architecture.md +++ b/architecture.md @@ -4,12 +4,12 @@ These definitions are still evolving and may change: - * Member: an ipfs cluster server node which is member of the consensus. Alternatively: node, peer + * Peer: an ipfs cluster service node which is member of the consensus. Alternatively: node, member. 
* ipfs-cluster: the IPFS cluster software * ipfs-cluster-ctl: the IPFS cluster client command line application * ipfs-cluster-service: the IPFS cluster node application * API: the REST-ish API implemented by the RESTAPI component and used by the clients. - * RPC API: the internal API that cluster members and components use. + * RPC API: the internal API that cluster peers and components use. * Go API: the public interface offered by the Cluster object in Go. * Component: an ipfs-cluster module which performs specific functions and uses the RPC API to communicate with other parts of the system. Implements the Component interface. @@ -38,7 +38,7 @@ Components perform a number of functions and need to be able to communicate with * the API needs to use functionality provided by the main component * the PinTracker needs to use functionality provided by the IPFSConnector - * the main component needs to use functionality provided by the main component of a different member (the leader) + * the main component needs to use functionality provided by the main component of a different peer (the leader) ### RPC API @@ -56,7 +56,7 @@ Currently, all components live in the same `ipfscluster` Go module, but they sha ### `ipfs-cluster-service` -This is the service application of IPFS Cluster. It brings up a cluster, connects to other members, gets the latest consensus state and participates in cluster. +This is the service application of IPFS Cluster. It brings up a cluster, connects to other peers, gets the latest consensus state and participates in cluster. ### `ipfs-cluster-ctl` diff --git a/captain.log.md b/captain.log.md index 77f43f6e..6db4665e 100644 --- a/captain.log.md +++ b/captain.log.md @@ -6,7 +6,7 @@ I have just merged the initial cluster version into master. 
There are many rough The rest of the quarter will be focused on 4 main issues: -* Simplify the process of adding and removing members of a cluster +* Simplify the process of adding and removing cluster peers * Implement a replication-factor-based pinning strategy * Generate real end to end tests * Make ipfs-cluster stable diff --git a/cluster.go b/cluster.go index 08274eef..e73432cb 100644 --- a/cluster.go +++ b/cluster.go @@ -8,11 +8,9 @@ import ( rpc "github.com/hsanjuan/go-libp2p-gorpc" cid "github.com/ipfs/go-cid" - crypto "github.com/libp2p/go-libp2p-crypto" host "github.com/libp2p/go-libp2p-host" peer "github.com/libp2p/go-libp2p-peer" peerstore "github.com/libp2p/go-libp2p-peerstore" - protocol "github.com/libp2p/go-libp2p-protocol" swarm "github.com/libp2p/go-libp2p-swarm" basichost "github.com/libp2p/go-libp2p/p2p/host/basic" ma "github.com/multiformats/go-multiaddr" @@ -40,16 +38,6 @@ type Cluster struct { wg sync.WaitGroup } -// ID holds information about the Cluster peer -type ID struct { - ID peer.ID - PublicKey crypto.PubKey - Addresses []ma.Multiaddr - Version string - Commit string - RPCProtocolVersion protocol.ID -} - // NewCluster builds a new IPFS Cluster. It initializes a LibP2P host, creates // and RPC Server and client and sets up all components. 
func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker PinTracker) (*Cluster, error) { @@ -140,13 +128,22 @@ func (c *Cluster) Shutdown() error { // ID returns information about the Cluster peer func (c *Cluster) ID() ID { + // ignore error since it is included in response object + ipfsID, _ := c.ipfs.ID() + var addrs []ma.Multiaddr + for _, addr := range c.host.Addrs() { + ipfsAddr, _ := ma.NewMultiaddr("/ipfs/" + c.host.ID().Pretty()) + addrs = append(addrs, addr.Encapsulate(ipfsAddr)) + } + return ID{ ID: c.host.ID(), PublicKey: c.host.Peerstore().PubKey(c.host.ID()), - Addresses: c.host.Addrs(), + Addresses: addrs, Version: Version, Commit: Commit, RPCProtocolVersion: RPCProtocol, + IPFS: ipfsID, } } @@ -248,13 +245,13 @@ func (c *Cluster) SyncLocal(h *cid.Cid) (PinInfo, error) { return pInfo, err } -// SyncAll triggers LocalSync() operations in all members of the Cluster. +// SyncAll triggers LocalSync() operations in all cluster peers. func (c *Cluster) SyncAll() ([]GlobalPinInfo, error) { return c.globalPinInfoSlice("SyncAllLocal") } // Sync triggers a LocalSyncCid() operation for a given Cid -// in all members of the Cluster. +// in all cluster peers. func (c *Cluster) Sync(h *cid.Cid) (GlobalPinInfo, error) { return c.globalPinInfoCid("SyncLocal", h) } @@ -265,7 +262,7 @@ func (c *Cluster) RecoverLocal(h *cid.Cid) (PinInfo, error) { } // Recover triggers a recover operation for a given Cid in all -// members of the Cluster. +// cluster peers. 
func (c *Cluster) Recover(h *cid.Cid) (GlobalPinInfo, error) { return c.globalPinInfoCid("TrackerRecover", h) } @@ -318,8 +315,32 @@ func (c *Cluster) Version() string { return Version } -// Members returns the IDs of the members of this Cluster -func (c *Cluster) Members() []peer.ID { +// Peers returns the IDs of the members of this Cluster +func (c *Cluster) Peers() []ID { + members := c.peerList() + peersSerial := make([]IDSerial, len(members), len(members)) + peers := make([]ID, len(members), len(members)) + + ifaceReplies := make([]interface{}, len(members), len(members)) + for i := range peersSerial { + ifaceReplies[i] = &peersSerial[i] + } + + errs := c.multiRPC(members, "Cluster", "ID", struct{}{}, ifaceReplies) + for i, err := range errs { + if err != nil { + peersSerial[i].ID = peer.IDB58Encode(members[i]) + peersSerial[i].Error = err.Error() + } + } + + for i, ps := range peersSerial { + peers[i] = ps.ToID() + } + return peers +} + +func (c *Cluster) peerList() []peer.ID { return c.host.Peerstore().Peers() } @@ -421,7 +442,7 @@ func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (GlobalPinInfo, er PeerMap: make(map[peer.ID]PinInfo), } - members := c.Members() + members := c.peerList() replies := make([]PinInfo, len(members), len(members)) ifaceReplies := make([]interface{}, len(members), len(members)) for i := range replies { @@ -456,7 +477,7 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) { var infos []GlobalPinInfo fullMap := make(map[string]GlobalPinInfo) - members := c.Members() + members := c.peerList() replies := make([][]PinInfo, len(members), len(members)) ifaceReplies := make([]interface{}, len(members), len(members)) for i := range replies { @@ -513,11 +534,12 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) { // openConns is a workaround for // https://github.com/libp2p/go-libp2p-swarm/issues/15 +// which break our tests. 
// It runs when consensus is initialized so we can assume // that the cluster is more or less up. // It should open connections for peers where they haven't // yet been opened. By randomly sleeping we reduce the -// chance that members will open 2 connections simultaneously. +// chance that peers will open 2 connections simultaneously. func (c *Cluster) openConns() { rand.Seed(time.Now().UnixNano()) time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) diff --git a/cluster_test.go b/cluster_test.go index d940f877..929e2d9b 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -30,6 +30,15 @@ type mockConnector struct { mockComponent } +func (ipfs *mockConnector) ID() (IPFSID, error) { + if ipfs.returnError { + return IPFSID{}, errors.New("") + } + return IPFSID{ + ID: testPeerID, + }, nil +} + func (ipfs *mockConnector) Pin(c *cid.Cid) error { if ipfs.returnError { return errors.New("") @@ -179,14 +188,16 @@ func TestClusterUnpin(t *testing.T) { } } -func TestClusterMembers(t *testing.T) { +func TestClusterPeers(t *testing.T) { cl, _, _, _, _ := testingCluster(t) defer cleanRaft() defer cl.Shutdown() - m := cl.Members() - id := testingConfig().ID - if len(m) != 1 || m[0] != id { - t.Error("bad Members()") + peers := cl.Peers() + if len(peers) != 1 { + t.Fatal("expected 1 peer") + } + if peers[0].ID != testingConfig().ID { + t.Error("bad member") } } diff --git a/config.go b/config.go index cce4f6d3..4dad9d27 100644 --- a/config.go +++ b/config.go @@ -72,7 +72,7 @@ type JSONConfig struct { ClusterPeers []string `json:"cluster_peers"` // Listen address for the Cluster libp2p host. This is used for - // interal RPC and Consensus communications between cluster members. + // interal RPC and Consensus communications between cluster peers. ClusterListenMultiaddress string `json:"cluster_multiaddress"` // Listen address for the the Cluster HTTP API component. 
diff --git a/consensus.go b/consensus.go index bbaea9e1..418c3b30 100644 --- a/consensus.go +++ b/consensus.go @@ -96,7 +96,7 @@ ROLLBACK: } // Consensus handles the work of keeping a shared-state between -// the members of an IPFS Cluster, as well as modifying that state and +// the peers of an IPFS Cluster, as well as modifying that state and // applying any updates in a thread-safe manner. type Consensus struct { ctx context.Context @@ -199,14 +199,21 @@ func (cc *Consensus) run(state State) { <-cc.rpcReady var pInfo []PinInfo - cc.rpcClient.Go( - "", - "Cluster", - "StateSync", - struct{}{}, - &pInfo, - nil) + _, err := cc.State() + // only check sync if we have a state + // avoid error on new running clusters + if err != nil { + logger.Debug("skipping state sync: ", err) + } else { + cc.rpcClient.Go( + "", + "Cluster", + "StateSync", + struct{}{}, + &pInfo, + nil) + } logger.Infof("IPFS Cluster is running") <-cc.shutdownCh }() diff --git a/ipfs-cluster-ctl/main.go b/ipfs-cluster-ctl/main.go index 36952ff4..b6197f4b 100644 --- a/ipfs-cluster-ctl/main.go +++ b/ipfs-cluster-ctl/main.go @@ -120,17 +120,17 @@ This command will print out information about the cluster peer used }, }, { - Name: "member", - Usage: "list and manage IPFS Cluster members", + Name: "peers", + Usage: "list and manage IPFS Cluster peers", UsageText: ` -This command can be used to list and manage IPFS Cluster members. +This command can be used to list and manage IPFS Cluster peers. 
`, Subcommands: []cli.Command{ { Name: "ls", Usage: "list the nodes participating in the IPFS Cluster", Action: func(c *cli.Context) error { - resp := request("GET", "/members") + resp := request("GET", "/peers") formatResponse(resp) return nil }, diff --git a/ipfs_http_connector.go b/ipfs_http_connector.go index 3f0d161a..a7d00928 100644 --- a/ipfs_http_connector.go +++ b/ipfs_http_connector.go @@ -16,6 +16,7 @@ import ( rpc "github.com/hsanjuan/go-libp2p-gorpc" cid "github.com/ipfs/go-cid" + peer "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" ) @@ -71,6 +72,11 @@ type pinLsResp struct { } } +type ipfsIDResp struct { + ID string + Addresses []string +} + // NewIPFSHTTPConnector creates the component and leaves it ready to be started func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) { ctx := context.Background() @@ -228,6 +234,47 @@ func (ipfs *IPFSHTTPConnector) Shutdown() error { return nil } +// ID performs an ID request against the configured +// IPFS daemon. It returns the fetched information. +// If the request fails, or the parsing fails, it +// returns an error and an empty IPFSID which also +// contains the error message. +func (ipfs *IPFSHTTPConnector) ID() (IPFSID, error) { + id := IPFSID{} + body, err := ipfs.get("id") + if err != nil { + id.Error = err.Error() + return id, err + } + + var resp ipfsIDResp + err = json.Unmarshal(body, &resp) + if err != nil { + id.Error = err.Error() + return id, err + } + + pID, err := peer.IDB58Decode(resp.ID) + if err != nil { + id.Error = err.Error() + return id, err + } + id.ID = pID + + mAddrs := make([]ma.Multiaddr, len(resp.Addresses), len(resp.Addresses)) + for i, strAddr := range resp.Addresses { + mAddr, err := ma.NewMultiaddr(strAddr) + if err != nil { + id.Error = err.Error() + return id, err + } + mAddrs[i] = mAddr + } + id.Addresses = mAddrs + + return id, nil +} + // Pin performs a pin request against the configured IPFS // daemon. 
func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error { diff --git a/ipfs_http_connector_test.go b/ipfs_http_connector_test.go index 91ba29b1..c539b3b1 100644 --- a/ipfs_http_connector_test.go +++ b/ipfs_http_connector_test.go @@ -34,6 +34,32 @@ func TestNewIPFSHTTPConnector(t *testing.T) { defer ipfs.Shutdown() } +func TestIPFSID(t *testing.T) { + ipfs, mock := testIPFSConnector(t) + defer ipfs.Shutdown() + id, err := ipfs.ID() + if err != nil { + t.Fatal(err) + } + if id.ID != testPeerID { + t.Error("expected testPeerID") + } + if len(id.Addresses) != 1 { + t.Error("expected 1 address") + } + if id.Error != "" { + t.Error("expected no error") + } + mock.Close() + id, err = ipfs.ID() + if err == nil { + t.Error("expected an error") + } + if id.Error != err.Error() { + t.Error("error messages should match") + } +} + func TestIPFSPin(t *testing.T) { ipfs, mock := testIPFSConnector(t) defer mock.Close() diff --git a/ipfscluster.go b/ipfscluster.go index a6d6d2df..f49562e3 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -2,7 +2,7 @@ // allows to orchestrate pinning operations among several IPFS nodes. // // IPFS Cluster uses a go-libp2p-raft to keep a shared state between -// the different members of the cluster. It also uses LibP2P to enable +// the different cluster peers. It also uses LibP2P to enable // communication between its different components, which perform different // tasks like managing the underlying IPFS daemons, or providing APIs for // external control. 
@@ -11,16 +11,19 @@ package ipfscluster import ( "time" + crypto "github.com/libp2p/go-libp2p-crypto" + rpc "github.com/hsanjuan/go-libp2p-gorpc" cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" peer "github.com/libp2p/go-libp2p-peer" protocol "github.com/libp2p/go-libp2p-protocol" + ma "github.com/multiformats/go-multiaddr" ) var logger = logging.Logger("cluster") -// RPCProtocol is used to send libp2p messages between cluster members +// RPCProtocol is used to send libp2p messages between cluster peers var RPCProtocol = protocol.ID("/ipfscluster/" + Version + "/rpc") // SilentRaft controls whether all Raft log messages are discarded. @@ -86,7 +89,7 @@ func (ips IPFSPinStatus) IsPinned() bool { } // GlobalPinInfo contains cluster-wide status information about a tracked Cid, -// indexed by cluster member. +// indexed by cluster peer. type GlobalPinInfo struct { Cid *cid.Cid PeerMap map[peer.ID]PinInfo @@ -148,6 +151,7 @@ type API interface { // an IPFS daemon. This is a base component. type IPFSConnector interface { Component + ID() (IPFSID, error) Pin(*cid.Cid) error Unpin(*cid.Cid) error PinLsCid(*cid.Cid) (IPFSPinStatus, error) @@ -200,3 +204,114 @@ type PinTracker interface { // Recover retriggers a Pin/Unpin operation in Cids with error status. 
Recover(*cid.Cid) (PinInfo, error) } + +// IPFSID is used to store information about the underlying IPFS daemon +type IPFSID struct { + ID peer.ID + Addresses []ma.Multiaddr + Error string +} + +// IPFSIDSerial is the serializable IPFSID for RPC requests +type IPFSIDSerial struct { + ID string + Addresses [][]byte + Error string +} + +// ToSerial converts IPFSID to a go serializable object +func (id *IPFSID) ToSerial() IPFSIDSerial { + mAddrsB := make([][]byte, len(id.Addresses), len(id.Addresses)) + for i, a := range id.Addresses { + mAddrsB[i] = a.Bytes() + } + return IPFSIDSerial{ + ID: peer.IDB58Encode(id.ID), + Addresses: mAddrsB, + Error: id.Error, + } +} + +// ToID converts an IPFSIDSerial to IPFSID +// It will ignore any errors when parsing the fields. +func (ids *IPFSIDSerial) ToID() IPFSID { + id := IPFSID{} + if pID, err := peer.IDB58Decode(ids.ID); err == nil { + id.ID = pID + } + id.Addresses = make([]ma.Multiaddr, len(ids.Addresses), len(ids.Addresses)) + for i, mAddrB := range ids.Addresses { + if mAddr, err := ma.NewMultiaddrBytes(mAddrB); err == nil { + id.Addresses[i] = mAddr + } + } + id.Error = ids.Error + return id +} + +// ID holds information about the Cluster peer +type ID struct { + ID peer.ID + PublicKey crypto.PubKey + Addresses []ma.Multiaddr + Version string + Commit string + RPCProtocolVersion protocol.ID + Error string + IPFS IPFSID +} + +// IDSerial is the serializable ID counterpart for RPC requests +type IDSerial struct { + ID string + PublicKey []byte + Addresses [][]byte + Version string + Commit string + RPCProtocolVersion string + Error string + IPFS IPFSIDSerial +} + +// ToSerial converts an ID to its Go-serializable version +func (id ID) ToSerial() IDSerial { + mAddrsB := make([][]byte, len(id.Addresses), len(id.Addresses)) + for i, a := range id.Addresses { + mAddrsB[i] = a.Bytes() + } + pkey, _ := id.PublicKey.Bytes() + return IDSerial{ + ID: peer.IDB58Encode(id.ID), + PublicKey: pkey, + Addresses: mAddrsB, + Version: 
id.Version, + Commit: id.Commit, + RPCProtocolVersion: string(id.RPCProtocolVersion), + Error: id.Error, + IPFS: id.IPFS.ToSerial(), + } +} + +// ToID converts an IDSerial object to ID. +// It will ignore any errors when parsing the fields. +func (ids IDSerial) ToID() ID { + id := ID{} + if pID, err := peer.IDB58Decode(ids.ID); err == nil { + id.ID = pID + } + if pkey, err := crypto.UnmarshalPublicKey(ids.PublicKey); err == nil { + id.PublicKey = pkey + } + id.Addresses = make([]ma.Multiaddr, len(ids.Addresses), len(ids.Addresses)) + for i, mAddrB := range ids.Addresses { + if mAddr, err := ma.NewMultiaddrBytes(mAddrB); err == nil { + id.Addresses[i] = mAddr + } + } + id.Version = ids.Version + id.Commit = ids.Commit + id.RPCProtocolVersion = protocol.ID(ids.RPCProtocolVersion) + id.Error = ids.Error + id.IPFS = ids.IPFS.ToID() + return id +} diff --git a/ipfscluster_test.go b/ipfscluster_test.go index 137e8e40..d19cdd04 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -165,13 +165,50 @@ func TestClustersVersion(t *testing.T) { runF(t, clusters, f) } +func TestClustersPeers(t *testing.T) { + clusters, mock := createClusters(t) + defer shutdownClusters(t, clusters, mock) + delay() + + j := rand.Intn(nClusters) // choose a random cluster peer + peers := clusters[j].Peers() + if len(peers) != nClusters { + t.Fatal("expected as many peers as clusters") + } + + clusterIDMap := make(map[peer.ID]ID) + peerIDMap := make(map[peer.ID]ID) + + for _, c := range clusters { + id := c.ID() + clusterIDMap[id.ID] = id + } + + for _, p := range peers { + peerIDMap[p.ID] = p + } + + for k, id := range clusterIDMap { + id2, ok := peerIDMap[k] + if !ok { + t.Fatal("expected id in both maps") + } + if !crypto.KeyEqual(id.PublicKey, id2.PublicKey) { + t.Error("expected same public key") + } + if id.IPFS.ID != id2.IPFS.ID { + t.Error("expected same ipfs daemon ID") + } + } +} + func TestClustersPin(t *testing.T) { clusters, mock := createClusters(t) defer shutdownClusters(t, 
clusters, mock) exampleCid, _ := cid.Decode(testCid) prefix := exampleCid.Prefix() for i := 0; i < nPins; i++ { - j := rand.Intn(nClusters) // choose a random cluster member + j := rand.Intn(nClusters) // choose a random cluster peer h, err := prefix.Sum(randomBytes()) // create random cid checkErr(t, err) err = clusters[j].Pin(h) @@ -204,7 +241,7 @@ func TestClustersPin(t *testing.T) { pinList := clusters[0].Pins() for i := 0; i < nPins; i++ { - j := rand.Intn(nClusters) // choose a random cluster member + j := rand.Intn(nClusters) // choose a random cluster peer err := clusters[j].Unpin(pinList[i]) if err != nil { t.Errorf("error unpinning %s: %s", pinList[i], err) @@ -340,7 +377,7 @@ func TestClustersSyncAll(t *testing.T) { clusters[0].Pin(h2) delay() - j := rand.Intn(nClusters) // choose a random cluster member + j := rand.Intn(nClusters) // choose a random cluster peer ginfos, err := clusters[j].SyncAll() if err != nil { t.Fatal(err) @@ -357,7 +394,7 @@ func TestClustersSyncAll(t *testing.T) { t.Fatal("GlobalPinInfo should have this cluster") } if inf.Status != TrackerStatusPinError && inf.Status != TrackerStatusPinning { - t.Error("should be PinError in all members") + t.Error("should be PinError in all peers") } } } @@ -398,7 +435,7 @@ func TestClustersSync(t *testing.T) { } if inf.Status != TrackerStatusPinError && inf.Status != TrackerStatusPinning { - t.Error("should be PinError or Pinning in all members") + t.Error("should be PinError or Pinning in all peers") } } @@ -418,7 +455,7 @@ func TestClustersSync(t *testing.T) { t.Fatal("GlobalPinInfo should have this cluster") } if inf.Status != TrackerStatusPinned { - t.Error("the GlobalPinInfo should show Pinned in all members") + t.Error("the GlobalPinInfo should show Pinned in all peers") } } } @@ -488,7 +525,7 @@ func TestClustersRecover(t *testing.T) { } if inf.Status != TrackerStatusPinError { - t.Error("should be PinError in all members") + t.Error("should be PinError in all peers") } } @@ -508,7 +545,7 
@@ func TestClustersRecover(t *testing.T) { t.Fatal("GlobalPinInfo should have this cluster") } if inf.Status != TrackerStatusPinned { - t.Error("the GlobalPinInfo should show Pinned in all members") + t.Error("the GlobalPinInfo should show Pinned in all peers") } } } diff --git a/ipfsmock_test.go b/ipfsmock_test.go index 8f0df984..c6f74898 100644 --- a/ipfsmock_test.go +++ b/ipfsmock_test.go @@ -63,6 +63,15 @@ func (m *ipfsMock) handler(w http.ResponseWriter, r *http.Request) { endp := strings.TrimPrefix(p, "/api/v0/") var cidStr string switch endp { + case "id": + resp := ipfsIDResp{ + ID: testPeerID.Pretty(), + Addresses: []string{ + "/ip4/0.0.0.0/tcp/1234", + }, + } + j, _ := json.Marshal(resp) + w.Write(j) case "pin/add": query := r.URL.Query() arg, ok := query["arg"] diff --git a/rest_api.go b/rest_api.go index b38671f4..cc0f6ce4 100644 --- a/rest_api.go +++ b/rest_api.go @@ -14,7 +14,6 @@ import ( rpc "github.com/hsanjuan/go-libp2p-gorpc" cid "github.com/ipfs/go-cid" - peer "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" mux "github.com/gorilla/mux" @@ -88,13 +87,58 @@ type statusCidResp struct { PeerMap map[string]statusInfo `json:"peer_map"` } -type idResp struct { - ID string `json:"id"` - PublicKey string `json:"public_key"` - Addresses []string `json:"addresses"` - Version string `json:"version"` - Commit string `json:"commit"` - RPCProtocolVersion string `json:"rpc_protocol_version"` +type restIPFSIDResp struct { + ID string `json:"id"` + Addresses []string `json:"addresses"` + Error string `json:"error,omitempty"` +} + +func newRestIPFSIDResp(id IPFSID) *restIPFSIDResp { + addrs := make([]string, len(id.Addresses), len(id.Addresses)) + for i, a := range id.Addresses { + addrs[i] = a.String() + } + + return &restIPFSIDResp{ + ID: id.ID.Pretty(), + Addresses: addrs, + Error: id.Error, + } +} + +type restIDResp struct { + ID string `json:"id"` + PublicKey string `json:"public_key"` + Addresses []string `json:"addresses"` + 
Version string `json:"version"` + Commit string `json:"commit"` + RPCProtocolVersion string `json:"rpc_protocol_version"` + Error string `json:"error,omitempty"` + IPFS *restIPFSIDResp `json:"ipfs"` +} + +func newRestIDResp(id ID) *restIDResp { + pubKey := "" + if id.PublicKey != nil { + keyBytes, err := id.PublicKey.Bytes() + if err == nil { + pubKey = base64.StdEncoding.EncodeToString(keyBytes) + } + } + addrs := make([]string, len(id.Addresses), len(id.Addresses)) + for i, a := range id.Addresses { + addrs[i] = a.String() + } + return &restIDResp{ + ID: id.ID.Pretty(), + PublicKey: pubKey, + Addresses: addrs, + Version: id.Version, + Commit: id.Commit, + RPCProtocolVersion: string(id.RPCProtocolVersion), + Error: id.Error, + IPFS: newRestIPFSIDResp(id.IPFS), + } } type statusResp []statusCidResp @@ -172,10 +216,10 @@ func (api *RESTAPI) routes() []route { }, { - "Members", + "Peers", "GET", - "/members", - api.memberListHandler, + "/peers", + api.peerListHandler, }, { @@ -278,33 +322,16 @@ func (api *RESTAPI) SetClient(c *rpc.Client) { } func (api *RESTAPI) idHandler(w http.ResponseWriter, r *http.Request) { - idObj := ID{} + idSerial := IDSerial{} err := api.rpcClient.Call("", "Cluster", "ID", struct{}{}, - &idObj) + &idSerial) if checkRPCErr(w, err) { - pubKey := "" - if idObj.PublicKey != nil { - keyBytes, err := idObj.PublicKey.Bytes() - if err == nil { - pubKey = base64.StdEncoding.EncodeToString(keyBytes) - } - } - addrs := make([]string, len(idObj.Addresses), len(idObj.Addresses)) - for i, a := range idObj.Addresses { - addrs[i] = a.String() - } - idResponse := idResp{ - ID: idObj.ID.Pretty(), - PublicKey: pubKey, - Addresses: addrs, - Version: idObj.Version, - Commit: idObj.Commit, - RPCProtocolVersion: string(idObj.RPCProtocolVersion), - } - sendJSONResponse(w, 200, idResponse) + id := idSerial.ToID() + resp := newRestIDResp(id) + sendJSONResponse(w, 200, resp) } } @@ -321,20 +348,21 @@ func (api *RESTAPI) versionHandler(w http.ResponseWriter, r 
*http.Request) { } } -func (api *RESTAPI) memberListHandler(w http.ResponseWriter, r *http.Request) { - var peers []peer.ID +func (api *RESTAPI) peerListHandler(w http.ResponseWriter, r *http.Request) { + var peersSerial []IDSerial err := api.rpcClient.Call("", "Cluster", - "MemberList", + "Peers", struct{}{}, - &peers) + &peersSerial) if checkRPCErr(w, err) { - var strPeers []string - for _, p := range peers { - strPeers = append(strPeers, p.Pretty()) + var resp []*restIDResp + for _, pS := range peersSerial { + p := pS.ToID() + resp = append(resp, newRestIDResp(p)) } - sendJSONResponse(w, 200, strPeers) + sendJSONResponse(w, 200, resp) } } diff --git a/rest_api_test.go b/rest_api_test.go index 2f1aae9d..f68d296b 100644 --- a/rest_api_test.go +++ b/rest_api_test.go @@ -76,7 +76,7 @@ func TestRESTAPIShutdown(t *testing.T) { func TestRestAPIIDEndpoint(t *testing.T) { api := testRESTAPI(t) defer api.Shutdown() - id := idResp{} + id := restIDResp{} makeGet(t, "/id", &id) if id.ID != testPeerID.Pretty() { t.Error("expected correct id") @@ -93,13 +93,16 @@ func TestRESTAPIVersionEndpoint(t *testing.T) { } } -func TestRESTAPIMemberListEndpoint(t *testing.T) { +func TestRESTAPIPeerstEndpoint(t *testing.T) { api := testRESTAPI(t) defer api.Shutdown() - var list []string - makeGet(t, "/members", &list) - if len(list) != 1 || list[0] != testPeerID.Pretty() { + var list []restIDResp + makeGet(t, "/peers", &list) + if len(list) != 1 { + t.Fatal("expected 1 element") + } + if list[0].ID != testPeerID.Pretty() { t.Error("expected a different peer id list: ", list) } } diff --git a/rpc_api.go b/rpc_api.go index 88e7af49..6cb498b1 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -1,12 +1,9 @@ package ipfscluster -import ( - cid "github.com/ipfs/go-cid" - peer "github.com/libp2p/go-libp2p-peer" -) +import cid "github.com/ipfs/go-cid" // RPCAPI is a go-libp2p-gorpc service which provides the internal ipfs-cluster -// API, which enables components and members of the cluster to communicate 
and +// API, which enables components and cluster peers to communicate and // request actions from each other. // // The RPC API methods are usually redirects to the actual methods in @@ -45,8 +42,9 @@ func (arg *CidArg) CID() (*cid.Cid, error) { */ // ID runs Cluster.ID() -func (api *RPCAPI) ID(in struct{}, out *ID) error { - *out = api.cluster.ID() +func (api *RPCAPI) ID(in struct{}, out *IDSerial) error { + id := api.cluster.ID().ToSerial() + *out = id return nil } @@ -85,9 +83,14 @@ func (api *RPCAPI) Version(in struct{}, out *string) error { return nil } -// MemberList runs Cluster.Members(). -func (api *RPCAPI) MemberList(in struct{}, out *[]peer.ID) error { - *out = api.cluster.Members() +// Peers runs Cluster.Peers(). +func (api *RPCAPI) Peers(in struct{}, out *[]IDSerial) error { + peers := api.cluster.Peers() + var sPeers []IDSerial + for _, p := range peers { + sPeers = append(sPeers, p.ToSerial()) + } + *out = sPeers return nil } diff --git a/rpc_api_test.go b/rpc_api_test.go index fca39788..5d771cc9 100644 --- a/rpc_api_test.go +++ b/rpc_api_test.go @@ -44,7 +44,7 @@ func (mock *mockService) PinList(in struct{}, out *[]string) error { return nil } -func (mock *mockService) ID(in struct{}, out *ID) error { +func (mock *mockService) ID(in struct{}, out *IDSerial) error { _, pubkey, _ := crypto.GenerateKeyPair( DefaultConfigCrypto, DefaultConfigKeyLength) @@ -52,7 +52,10 @@ func (mock *mockService) ID(in struct{}, out *ID) error { ID: testPeerID, PublicKey: pubkey, Version: "0.0.mock", - } + IPFS: IPFSID{ + ID: testPeerID, + }, + }.ToSerial() return nil } @@ -61,8 +64,11 @@ func (mock *mockService) Version(in struct{}, out *string) error { return nil } -func (mock *mockService) MemberList(in struct{}, out *[]peer.ID) error { - *out = []peer.ID{testPeerID} +func (mock *mockService) Peers(in struct{}, out *[]IDSerial) error { + id := IDSerial{} + mock.ID(in, &id) + + *out = []IDSerial{id} return nil }