Feat: Enable DHT-based peer discovery and routing for cluster peers
This uses go-libp2p-kad-dht as routing provider for the Cluster Peers. This means that: * A cluster peer can discover other Cluster peers even if they are not in their peerstore file. * We remove a bunch of code sending and receiving peers multiaddresses when a new peer was added to the Cluster. * PeerAdd now takes an ID and not a multiaddress. We do not need to ask the new peer which is our external multiaddress nor broadcast the new multiaddress to everyone. This will fix problems when bootstrapping a new peer to the Cluster while not all the other peers are online. * Adding a new peer does not mean to open connections to all peers anymore. The number of connections will be made according to the DHT parameters (this is good to have for future work) The that detecting a peer addition in the watchPeers() function does no longer mean that we have connected to it or that we know its multiaddresses. Therefore it's no point to save the peerstore in these events anymore. Here a question opens, should we save the peerstore at all, and should we save multiaddresses only for cluster peers, or for everyone known? Currently, the peerstore is only updated on clean shutdown, and it is updated with all the multiaddresses known, and not limited to peer IDs in the cluster, (because, why not). License: MIT Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
parent
99aea7ba79
commit
d3d1f960f5
|
@ -8,11 +8,10 @@ import (
|
|||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/ipfs-cluster/api"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
|
||||
"github.com/ipfs/ipfs-cluster/api"
|
||||
)
|
||||
|
||||
// ID returns information about the cluster Peer.
|
||||
|
@ -34,13 +33,13 @@ func (c *Client) Peers() ([]api.ID, error) {
|
|||
}
|
||||
|
||||
type peerAddBody struct {
|
||||
Addr string `json:"peer_multiaddress"`
|
||||
PeerID string `json:"peer_id"`
|
||||
}
|
||||
|
||||
// PeerAdd adds a new peer to the cluster.
|
||||
func (c *Client) PeerAdd(addr ma.Multiaddr) (api.ID, error) {
|
||||
addrStr := addr.String()
|
||||
body := peerAddBody{addrStr}
|
||||
func (c *Client) PeerAdd(pid peer.ID) (api.ID, error) {
|
||||
pidStr := peer.IDB58Encode(pid)
|
||||
body := peerAddBody{pidStr}
|
||||
|
||||
var buf bytes.Buffer
|
||||
enc := json.NewEncoder(&buf)
|
||||
|
|
|
@ -104,8 +104,7 @@ func TestPeerAdd(t *testing.T) {
|
|||
defer shutdown(api)
|
||||
|
||||
testF := func(t *testing.T, c *Client) {
|
||||
addr, _ := ma.NewMultiaddr("/ip4/1.2.3.4/tcp/1234/ipfs/" + test.TestPeerID1.Pretty())
|
||||
id, err := c.PeerAdd(addr)
|
||||
id, err := c.PeerAdd(test.TestPeerID1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ type route struct {
|
|||
}
|
||||
|
||||
type peerAddBody struct {
|
||||
PeerMultiaddr string `json:"peer_multiaddress"`
|
||||
PeerID string `json:"peer_id"`
|
||||
}
|
||||
|
||||
// NewAPI creates a new REST API component with the given configuration.
|
||||
|
@ -506,9 +506,9 @@ func (api *API) peerAddHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
mAddr, err := ma.NewMultiaddr(addInfo.PeerMultiaddr)
|
||||
_, err = peer.IDB58Decode(addInfo.PeerID)
|
||||
if err != nil {
|
||||
sendErrorResponse(w, 400, "error decoding peer_multiaddress")
|
||||
sendErrorResponse(w, 400, "error decoding peer_id")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -516,7 +516,7 @@ func (api *API) peerAddHandler(w http.ResponseWriter, r *http.Request) {
|
|||
err = api.rpcClient.Call("",
|
||||
"Cluster",
|
||||
"PeerAdd",
|
||||
types.MultiaddrToSerial(mAddr),
|
||||
addInfo.PeerID,
|
||||
&ids)
|
||||
sendResponse(w, err, ids)
|
||||
}
|
||||
|
|
|
@ -275,7 +275,7 @@ func TestAPIPeerAddEndpoint(t *testing.T) {
|
|||
tf := func(t *testing.T, url urlF) {
|
||||
id := api.IDSerial{}
|
||||
// post with valid body
|
||||
body := fmt.Sprintf("{\"peer_multiaddress\":\"/ip4/1.2.3.4/tcp/1234/ipfs/%s\"}", test.TestPeerID1.Pretty())
|
||||
body := fmt.Sprintf("{\"peer_id\":\"%s\"}", test.TestPeerID1.Pretty())
|
||||
t.Log(body)
|
||||
makePost(t, rest, url(rest)+"/peers", []byte(body), &id)
|
||||
|
||||
|
@ -292,10 +292,10 @@ func TestAPIPeerAddEndpoint(t *testing.T) {
|
|||
if errResp.Code != 400 {
|
||||
t.Error("expected error with bad body")
|
||||
}
|
||||
// Send invalid multiaddr
|
||||
makePost(t, rest, url(rest)+"/peers", []byte("{\"peer_multiaddress\": \"ab\"}"), &errResp)
|
||||
// Send invalid peer id
|
||||
makePost(t, rest, url(rest)+"/peers", []byte("{\"peer_id\": \"ab\"}"), &errResp)
|
||||
if errResp.Code != 400 {
|
||||
t.Error("expected error with bad multiaddress")
|
||||
t.Error("expected error with bad peer_id")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
192
cluster.go
192
cluster.go
|
@ -15,7 +15,9 @@ import (
|
|||
rpc "github.com/hsanjuan/go-libp2p-gorpc"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
host "github.com/libp2p/go-libp2p-host"
|
||||
dht "github.com/libp2p/go-libp2p-kad-dht"
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
|
@ -34,6 +36,7 @@ type Cluster struct {
|
|||
id peer.ID
|
||||
config *Config
|
||||
host host.Host
|
||||
dht *dht.IpfsDHT
|
||||
rpcServer *rpc.Server
|
||||
rpcClient *rpc.Client
|
||||
peerManager *pstoremgr.Manager
|
||||
|
@ -86,6 +89,8 @@ func NewCluster(
|
|||
return nil, errors.New("cluster host is nil")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
listenAddrs := ""
|
||||
for _, addr := range host.Addrs() {
|
||||
listenAddrs += fmt.Sprintf(" %s/ipfs/%s\n", addr, host.ID().Pretty())
|
||||
|
@ -97,15 +102,32 @@ func NewCluster(
|
|||
logger.Infof("IPFS Cluster v%s listening on:\n%s\n", Version, listenAddrs)
|
||||
}
|
||||
|
||||
// Note, we already loaded peers from peerstore into the host
|
||||
// in daemon.go.
|
||||
peerManager := pstoremgr.New(host, cfg.GetPeerstorePath())
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
idht, err := dht.New(ctx, host)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Let the DHT be maintained regularly
|
||||
err = idht.Bootstrap(ctx)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rHost := routedhost.Wrap(host, idht)
|
||||
|
||||
c := &Cluster{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
id: host.ID(),
|
||||
config: cfg,
|
||||
host: host,
|
||||
host: rHost,
|
||||
dht: idht,
|
||||
consensus: consensus,
|
||||
api: api,
|
||||
ipfs: ipfs,
|
||||
|
@ -127,7 +149,6 @@ func NewCluster(
|
|||
c.Shutdown()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.setupRPCClients()
|
||||
go func() {
|
||||
c.ready(ReadyTimeout)
|
||||
|
@ -264,7 +285,6 @@ func (c *Cluster) alertsHandler() {
|
|||
// detects that we have been removed from the peerset, it shuts down this peer.
|
||||
func (c *Cluster) watchPeers() {
|
||||
ticker := time.NewTicker(c.config.PeerWatchInterval)
|
||||
lastPeers := PeersFromMultiaddrs(c.peerManager.LoadPeerstore())
|
||||
|
||||
for {
|
||||
select {
|
||||
|
@ -272,7 +292,6 @@ func (c *Cluster) watchPeers() {
|
|||
return
|
||||
case <-ticker.C:
|
||||
logger.Debugf("%s watching peers", c.id)
|
||||
save := false
|
||||
hasMe := false
|
||||
peers, err := c.consensus.Peers()
|
||||
if err != nil {
|
||||
|
@ -286,28 +305,12 @@ func (c *Cluster) watchPeers() {
|
|||
}
|
||||
}
|
||||
|
||||
if len(peers) != len(lastPeers) {
|
||||
save = true
|
||||
} else {
|
||||
added, removed := diffPeers(lastPeers, peers)
|
||||
if len(added) != 0 || len(removed) != 0 {
|
||||
save = true
|
||||
}
|
||||
}
|
||||
|
||||
lastPeers = peers
|
||||
|
||||
if !hasMe {
|
||||
logger.Infof("%s: removed from raft. Initiating shutdown", c.id.Pretty())
|
||||
c.removed = true
|
||||
go c.Shutdown()
|
||||
return
|
||||
}
|
||||
|
||||
if save {
|
||||
logger.Info("peerset change detected. Saving peers addresses")
|
||||
c.peerManager.SavePeerstoreForPeers(peers)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -419,6 +422,12 @@ func (c *Cluster) Shutdown() error {
|
|||
|
||||
logger.Info("shutting down Cluster")
|
||||
|
||||
// Try to store peerset file for all known peers whatsoever
|
||||
// if we got ready (otherwise, don't overwrite anything)
|
||||
if c.readyB {
|
||||
c.peerManager.SavePeerstoreForPeers(c.host.Peerstore().Peers())
|
||||
}
|
||||
|
||||
// Only attempt to leave if:
|
||||
// - consensus is initialized
|
||||
// - cluster was ready (no bootstrapping error)
|
||||
|
@ -443,13 +452,6 @@ func (c *Cluster) Shutdown() error {
|
|||
}
|
||||
}
|
||||
|
||||
// Do not save anything if we were not ready
|
||||
// if c.readyB {
|
||||
// // peers are saved usually on addPeer/rmPeer
|
||||
// // c.peerManager.savePeers()
|
||||
// c.config.BackupState(c.state)
|
||||
//}
|
||||
|
||||
// We left the cluster or were removed. Destroy the Raft state.
|
||||
if c.removed && c.readyB {
|
||||
err := c.consensus.Clean()
|
||||
|
@ -529,86 +531,24 @@ func (c *Cluster) ID() api.ID {
|
|||
|
||||
// PeerAdd adds a new peer to this Cluster.
|
||||
//
|
||||
// The new peer must be reachable. It will be added to the
|
||||
// consensus and will receive the shared state (including the
|
||||
// list of peers). The new peer should be a single-peer cluster,
|
||||
// preferable without any relevant state.
|
||||
func (c *Cluster) PeerAdd(addr ma.Multiaddr) (api.ID, error) {
|
||||
// For it to work well, the new peer should be discoverable
|
||||
// (part of our peerstore or connected to one of the existing peers)
|
||||
// and reachable. Since PeerAdd allows to add peers which are
|
||||
// not running, or reachable, it is recommended to call Join() from the
|
||||
// new peer instead.
|
||||
//
|
||||
// The new peer ID will be passed to the consensus
|
||||
// component to be added to the peerset.
|
||||
func (c *Cluster) PeerAdd(pid peer.ID) (api.ID, error) {
|
||||
// starting 10 nodes on the same box for testing
|
||||
// causes deadlock and a global lock here
|
||||
// seems to help.
|
||||
c.paMux.Lock()
|
||||
defer c.paMux.Unlock()
|
||||
logger.Debugf("peerAdd called with %s", addr)
|
||||
pid, decapAddr, err := api.Libp2pMultiaddrSplit(addr)
|
||||
if err != nil {
|
||||
id := api.ID{
|
||||
Error: err.Error(),
|
||||
}
|
||||
return id, err
|
||||
}
|
||||
|
||||
// Figure out its real address if we have one
|
||||
remoteAddr := getRemoteMultiaddr(c.host, pid, decapAddr)
|
||||
|
||||
// whisper address to everyone, including ourselves
|
||||
peers, err := c.consensus.Peers()
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
return api.ID{Error: err.Error()}, err
|
||||
}
|
||||
|
||||
ctxs, cancels := rpcutil.CtxsWithCancel(c.ctx, len(peers))
|
||||
defer rpcutil.MultiCancel(cancels)
|
||||
|
||||
errs := c.rpcClient.MultiCall(
|
||||
ctxs,
|
||||
peers,
|
||||
"Cluster",
|
||||
"PeerManagerAddPeer",
|
||||
api.MultiaddrToSerial(remoteAddr),
|
||||
rpcutil.RPCDiscardReplies(len(peers)),
|
||||
)
|
||||
|
||||
brk := false
|
||||
for i, e := range errs {
|
||||
if e != nil {
|
||||
brk = true
|
||||
logger.Errorf("%s: %s", peers[i].Pretty(), e)
|
||||
}
|
||||
}
|
||||
if brk {
|
||||
msg := "error broadcasting new peer's address: all cluster members need to be healthy for this operation to succeed. Try removing any unhealthy peers. Check the logs for more information about the error."
|
||||
logger.Error(msg)
|
||||
id := api.ID{ID: pid, Error: "error broadcasting new peer's address"}
|
||||
return id, errors.New(msg)
|
||||
}
|
||||
|
||||
// Figure out our address to that peer. This also
|
||||
// ensures that it is reachable
|
||||
var addrSerial api.MultiaddrSerial
|
||||
err = c.rpcClient.Call(pid, "Cluster",
|
||||
"RemoteMultiaddrForPeer", c.id, &addrSerial)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
id := api.ID{ID: pid, Error: err.Error()}
|
||||
return id, err
|
||||
}
|
||||
|
||||
// Send cluster peers to the new peer.
|
||||
clusterPeers := append(c.peerManager.PeersAddresses(peers),
|
||||
addrSerial.ToMultiaddr())
|
||||
err = c.rpcClient.Call(pid,
|
||||
"Cluster",
|
||||
"PeerManagerImportAddresses",
|
||||
api.MultiaddrsToSerial(clusterPeers),
|
||||
&struct{}{})
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
}
|
||||
logger.Debugf("peerAdd called with %s", pid.Pretty())
|
||||
|
||||
// Log the new peer in the log so everyone gets it.
|
||||
err = c.consensus.AddPeer(pid)
|
||||
err := c.consensus.AddPeer(pid)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
id := api.ID{ID: pid, Error: err.Error()}
|
||||
|
@ -639,20 +579,21 @@ func (c *Cluster) PeerAdd(addr ma.Multiaddr) (api.ID, error) {
|
|||
}
|
||||
newNodePeers := id.ClusterPeers
|
||||
added, removed := diffPeers(ownPeers, newNodePeers)
|
||||
if len(added) == 0 && len(removed) == 0 {
|
||||
if len(added) == 0 && len(removed) == 0 && containsPeer(ownPeers, pid) {
|
||||
break // the new peer has fully joined
|
||||
}
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
logger.Debugf("%s addPeer: retrying to get ID from %s",
|
||||
c.id.Pretty(), pid.Pretty())
|
||||
}
|
||||
logger.Info("Peer added ", pid.Pretty())
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// PeerRemove removes a peer from this Cluster.
|
||||
//
|
||||
// The peer will be removed from the consensus peerset, all it's content
|
||||
// will be re-pinned and the peer it will shut itself down.
|
||||
// The peer will be removed from the consensus peerset.
|
||||
// This may first trigger repinnings for all content if not disabled.
|
||||
func (c *Cluster) PeerRemove(pid peer.ID) error {
|
||||
// We need to repin before removing the peer, otherwise, it won't
|
||||
// be able to submit the pins.
|
||||
|
@ -664,13 +605,14 @@ func (c *Cluster) PeerRemove(pid peer.ID) error {
|
|||
logger.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Info("Peer removed ", pid.Pretty())
|
||||
return nil
|
||||
}
|
||||
|
||||
// Join adds this peer to an existing cluster. The calling peer should
|
||||
// be a single-peer cluster node. This is almost equivalent to calling
|
||||
// PeerAdd on the destination cluster.
|
||||
// Join adds this peer to an existing cluster by bootstrapping to a
|
||||
// given multiaddress. It works by calling PeerAdd on the destination
|
||||
// cluster and making sure that the new peer is ready to discover and contact
|
||||
// the rest.
|
||||
func (c *Cluster) Join(addr ma.Multiaddr) error {
|
||||
logger.Debugf("Join(%s)", addr)
|
||||
|
||||
|
@ -695,14 +637,35 @@ func (c *Cluster) Join(addr ma.Multiaddr) error {
|
|||
err = c.rpcClient.Call(pid,
|
||||
"Cluster",
|
||||
"PeerAdd",
|
||||
api.MultiaddrToSerial(
|
||||
api.MustLibp2pMultiaddrJoin(c.config.ListenAddr, c.id)),
|
||||
peer.IDB58Encode(c.id),
|
||||
&myID)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// We need to trigger a DHT bootstrap asap for this peer to not be
|
||||
// lost if the peer it bootstrapped to goes down. We do this manually
|
||||
// by triggering 1 round of bootstrap in the background.
|
||||
// Note that our regular bootstrap process is still running in the
|
||||
// background since we created the cluster.
|
||||
go func() {
|
||||
ch := make(chan time.Time)
|
||||
bstCfg := dht.DefaultBootstrapConfig
|
||||
dhtBstCtx, cancel := context.WithTimeout(c.ctx, bstCfg.Timeout*2)
|
||||
defer cancel()
|
||||
proc, err := c.dht.BootstrapOnSignal(bstCfg, ch)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
}
|
||||
ch <- time.Now() // boostrap
|
||||
defer close(ch)
|
||||
select {
|
||||
case <-dhtBstCtx.Done(): // shut down the process
|
||||
proc.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
// wait for leader and for state to catch up
|
||||
// then sync
|
||||
err = c.consensus.WaitForSync()
|
||||
|
@ -711,15 +674,6 @@ func (c *Cluster) Join(addr ma.Multiaddr) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Since we might call this while not ready (bootstrap), we need to save
|
||||
// peers or we won't notice.
|
||||
peers, err := c.consensus.Peers()
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
} else {
|
||||
c.peerManager.SavePeerstoreForPeers(peers)
|
||||
}
|
||||
|
||||
c.StateSync()
|
||||
|
||||
logger.Infof("%s: joined %s's cluster", c.id.Pretty(), pid.Pretty())
|
||||
|
|
|
@ -87,9 +87,14 @@
|
|||
},
|
||||
{
|
||||
"author": "whyrusleeping",
|
||||
"hash": "QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB",
|
||||
"hash": "QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP",
|
||||
"name": "go-cid",
|
||||
"version": "0.7.21"
|
||||
"version": "0.7.22"
|
||||
},
|
||||
{
|
||||
"hash": "QmQYwRL1T9dJtdCScoeRQwwvScbJTcWqnXhq4dYQ6Cu5vX",
|
||||
"name": "go-libp2p-kad-dht",
|
||||
"version": "4.2.6"
|
||||
}
|
||||
],
|
||||
"gxVersion": "0.11.0",
|
||||
|
|
|
@ -1,17 +1,18 @@
|
|||
package ipfscluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
|
||||
"github.com/ipfs/ipfs-cluster/api"
|
||||
"github.com/ipfs/ipfs-cluster/test"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
peerstore "github.com/libp2p/go-libp2p-peerstore"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
|
@ -29,6 +30,22 @@ func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
|
|||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// This allows discovery
|
||||
// PeerAdd won't work without this.
|
||||
for i := 1; i < nClusters; i++ {
|
||||
err := cls[i].host.Connect(
|
||||
context.Background(),
|
||||
peerstore.PeerInfo{
|
||||
ID: cls[0].id,
|
||||
Addrs: cls[0].host.Addrs(),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
return cls, mocks
|
||||
}
|
||||
|
||||
|
@ -46,8 +63,7 @@ func TestClustersPeerAdd(t *testing.T) {
|
|||
}
|
||||
|
||||
for i := 1; i < len(clusters); i++ {
|
||||
addr := clusterAddr(clusters[i])
|
||||
id, err := clusters[0].PeerAdd(addr)
|
||||
id, err := clusters[0].PeerAdd(clusters[i].id)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -86,7 +102,14 @@ func TestClustersPeerAdd(t *testing.T) {
|
|||
t.Log(c.ID().ClusterPeers)
|
||||
t.Error("By now cluster peers should reflect all peers")
|
||||
}
|
||||
}
|
||||
runF(t, clusters, f)
|
||||
|
||||
for _, c := range clusters {
|
||||
c.Shutdown()
|
||||
}
|
||||
|
||||
f2 := func(t *testing.T, c *Cluster) {
|
||||
// check that all peers are part of the peerstore
|
||||
// (except ourselves)
|
||||
addrs := c.peerManager.LoadPeerstore()
|
||||
|
@ -99,15 +122,15 @@ func TestClustersPeerAdd(t *testing.T) {
|
|||
peerMap[pid] = struct{}{}
|
||||
}
|
||||
|
||||
if len(peerMap) != nClusters-1 {
|
||||
t.Error(c.peerManager.LoadPeerstore())
|
||||
t.Errorf("%s: expected different cluster peers in the peerstore", c.id)
|
||||
if len(peerMap) == 0 {
|
||||
t.Errorf("%s: peerstore to store at least 1 peer", c.id)
|
||||
}
|
||||
|
||||
}
|
||||
runF(t, clusters, f)
|
||||
runF(t, clusters, f2)
|
||||
}
|
||||
|
||||
func TestClustersPeerAddBadPeer(t *testing.T) {
|
||||
func TestClustersJoinBadPeer(t *testing.T) {
|
||||
clusters, mocks := peerManagerClusters(t)
|
||||
defer shutdownClusters(t, clusters, mocks)
|
||||
|
||||
|
@ -115,7 +138,7 @@ func TestClustersPeerAddBadPeer(t *testing.T) {
|
|||
t.Skip("need at least 2 nodes for this test")
|
||||
}
|
||||
|
||||
badClusterAddr := clusterAddr(clusters[1])
|
||||
addr := clusterAddr(clusters[1])
|
||||
|
||||
// We add a cluster that has been shutdown
|
||||
// (closed transports)
|
||||
|
@ -125,7 +148,7 @@ func TestClustersPeerAddBadPeer(t *testing.T) {
|
|||
// Sometimes we hang otherwise.
|
||||
delay()
|
||||
|
||||
_, err := clusters[0].PeerAdd(badClusterAddr)
|
||||
err := clusters[0].Join(addr)
|
||||
if err == nil {
|
||||
t.Error("expected an error")
|
||||
}
|
||||
|
@ -143,19 +166,21 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) {
|
|||
t.Skip("need at least 3 nodes for this test")
|
||||
}
|
||||
|
||||
_, err := clusters[0].PeerAdd(clusterAddr(clusters[1]))
|
||||
_, err := clusters[0].PeerAdd(clusters[1].id)
|
||||
ids := clusters[1].Peers()
|
||||
if len(ids) != 2 {
|
||||
t.Error("expected 2 peers")
|
||||
}
|
||||
|
||||
// Now we shutdown one member of the running cluster
|
||||
// Now we shutdown the one member of the running cluster
|
||||
// and try to add someone else.
|
||||
err = clusters[1].Shutdown()
|
||||
if err != nil {
|
||||
t.Error("Shutdown should be clean: ", err)
|
||||
}
|
||||
_, err = clusters[0].PeerAdd(clusterAddr(clusters[2]))
|
||||
delay() // This makes sure the leader realizes
|
||||
//that it's not leader anymore. Otherwise it commits fine.
|
||||
_, err = clusters[0].PeerAdd(clusters[2].id)
|
||||
|
||||
if err == nil {
|
||||
t.Error("expected an error")
|
||||
|
@ -510,8 +535,7 @@ func TestClustersPeerRejoin(t *testing.T) {
|
|||
|
||||
// add all clusters
|
||||
for i := 1; i < len(clusters); i++ {
|
||||
addr := clusterAddr(clusters[i])
|
||||
_, err := clusters[0].PeerAdd(addr)
|
||||
_, err := clusters[0].PeerAdd(clusters[i].id)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -555,8 +579,7 @@ func TestClustersPeerRejoin(t *testing.T) {
|
|||
c0, m0 := createOnePeerCluster(t, 0, testingClusterSecret)
|
||||
clusters[0] = c0
|
||||
mocks[0] = m0
|
||||
addr := clusterAddr(c0)
|
||||
_, err = clusters[1].PeerAdd(addr)
|
||||
err = c0.Join(clusterAddr(clusters[1]))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package ipfscluster
|
||||
|
||||
import "testing"
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestClusterSecretFormat(t *testing.T) {
|
||||
goodSecret := "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
|
@ -40,7 +42,7 @@ func TestSimplePNet(t *testing.T) {
|
|||
t.Skip("need at least 2 nodes for this test")
|
||||
}
|
||||
|
||||
_, err := clusters[0].PeerAdd(clusterAddr(clusters[1]))
|
||||
_, err := clusters[0].PeerAdd(clusters[1].id)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
42
rpc_api.go
42
rpc_api.go
|
@ -2,7 +2,6 @@ package ipfscluster
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
|
||||
|
@ -83,9 +82,9 @@ func (rpcapi *RPCAPI) Peers(ctx context.Context, in struct{}, out *[]api.IDSeria
|
|||
}
|
||||
|
||||
// PeerAdd runs Cluster.PeerAdd().
|
||||
func (rpcapi *RPCAPI) PeerAdd(ctx context.Context, in api.MultiaddrSerial, out *api.IDSerial) error {
|
||||
addr := in.ToMultiaddr()
|
||||
id, err := rpcapi.c.PeerAdd(addr)
|
||||
func (rpcapi *RPCAPI) PeerAdd(ctx context.Context, in string, out *api.IDSerial) error {
|
||||
pid, _ := peer.IDB58Decode(in)
|
||||
id, err := rpcapi.c.PeerAdd(pid)
|
||||
*out = id.ToSerial()
|
||||
return err
|
||||
}
|
||||
|
@ -335,24 +334,6 @@ func (rpcapi *RPCAPI) ConsensusPeers(ctx context.Context, in struct{}, out *[]pe
|
|||
return err
|
||||
}
|
||||
|
||||
/*
|
||||
Peer Manager methods
|
||||
*/
|
||||
|
||||
// PeerManagerAddPeer runs peerManager.addPeer().
|
||||
func (rpcapi *RPCAPI) PeerManagerAddPeer(ctx context.Context, in api.MultiaddrSerial, out *struct{}) error {
|
||||
addr := in.ToMultiaddr()
|
||||
err := rpcapi.c.peerManager.ImportPeer(addr, false)
|
||||
return err
|
||||
}
|
||||
|
||||
// PeerManagerImportAddresses runs peerManager.importAddresses().
|
||||
func (rpcapi *RPCAPI) PeerManagerImportAddresses(ctx context.Context, in api.MultiaddrsSerial, out *struct{}) error {
|
||||
addrs := in.ToMultiaddrs()
|
||||
err := rpcapi.c.peerManager.ImportPeers(addrs, false)
|
||||
return err
|
||||
}
|
||||
|
||||
/*
|
||||
PeerMonitor
|
||||
*/
|
||||
|
@ -368,20 +349,3 @@ func (rpcapi *RPCAPI) PeerMonitorLatestMetrics(ctx context.Context, in string, o
|
|||
*out = rpcapi.c.monitor.LatestMetrics(in)
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
Other
|
||||
*/
|
||||
|
||||
// RemoteMultiaddrForPeer returns the multiaddr of a peer as seen by this peer.
|
||||
// This is necessary for a peer to figure out which of its multiaddresses the
|
||||
// peers are seeing (also when crossing NATs). It should be called from
|
||||
// the peer the IN parameter indicates.
|
||||
func (rpcapi *RPCAPI) RemoteMultiaddrForPeer(ctx context.Context, in peer.ID, out *api.MultiaddrSerial) error {
|
||||
conns := rpcapi.c.host.Network().ConnsToPeer(in)
|
||||
if len(conns) == 0 {
|
||||
return errors.New("no connections to: " + in.Pretty())
|
||||
}
|
||||
*out = api.MultiaddrToSerial(api.MustLibp2pMultiaddrJoin(conns[0].RemoteMultiaddr(), in))
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -108,7 +108,7 @@ func (mock *mockService) Peers(ctx context.Context, in struct{}, out *[]api.IDSe
|
|||
return nil
|
||||
}
|
||||
|
||||
func (mock *mockService) PeerAdd(ctx context.Context, in api.MultiaddrSerial, out *api.IDSerial) error {
|
||||
func (mock *mockService) PeerAdd(ctx context.Context, in string, out *api.IDSerial) error {
|
||||
id := api.IDSerial{}
|
||||
mock.ID(ctx, struct{}{}, &id)
|
||||
*out = id
|
||||
|
@ -300,12 +300,6 @@ func (mock *mockService) TrackerRecover(ctx context.Context, in api.PinSerial, o
|
|||
return nil
|
||||
}
|
||||
|
||||
/* PeerManager methods */
|
||||
|
||||
func (mock *mockService) PeerManagerAddPeer(ctx context.Context, in api.MultiaddrSerial, out *struct{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
/* PeerMonitor methods */
|
||||
|
||||
// PeerMonitorLogMetric runs PeerMonitor.LogMetric().
|
||||
|
|
Loading…
Reference in New Issue
Block a user