Fix #787: Connectivity fixes

Currently, unless doing Join() (--bootstrap), we do not connect to any peers on startup.

However, we loaded the peerstore file into the host, and Raft would automatically
connect to the known peers to figure out who the leader is, etc. DHT bootstrap,
once Raft was working, did the rest.

For CRDTs we need to connect to peers on a normal boot, as otherwise (unless
bootstrapping) no connections happen at all, even if the peerstore file contains
known peers.

This introduces a number of changes:

* Move peerstore file management back inside the Cluster component, which was
already in charge of saving the peerstore file.
* We keep saving all "known addresses", but we load them with a non-permanent
TTL, so that addresses of peers we have not connected to in a while are
eventually cleaned up.
* "Bootstrap" (connect) to a small number of peers during Cluster component
creation (see the sketch after this list).
* Bootstrap the DHT as soon as possible after that, so that other cluster
components can initialize with a working peer discovery mechanism.
* CRDT Trust() method will now:
  * Protect the trusted Peer ID in the conn manager
  * Give top priority in the PeerManager to that Peer (see below)
  * Mark addresses as permanent in the Peerstore
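
Taken together, the startup changes in NewCluster() boil down to the short
sequence sketched below. This is a condensed, hypothetical helper (bootSequence
does not exist in the code), written only against the new pstoremgr and DHT
calls introduced by this commit:

// Assumed imports:
//   "context"
//   "github.com/ipfs/ipfs-cluster/pstoremgr"
//   dht "github.com/libp2p/go-libp2p-kad-dht"
//   peerstore "github.com/libp2p/go-libp2p-peerstore"

// bootSequence sketches the startup order this change establishes.
func bootSequence(ctx context.Context, pm *pstoremgr.Manager, d *dht.IpfsDHT) error {
	// Load the saved "known addresses" with a non-permanent TTL so
	// that addresses of peers we never reconnect to eventually expire.
	if err := pm.ImportPeersFromPeerstore(false, peerstore.AddressTTL); err != nil {
		return err
	}

	// Best effort: ending up with 0 connections here is normal when
	// the peer is going to Join() a cluster later.
	_ = pm.Bootstrap(3) // bootstrapCount is 3 in this commit

	// Bootstrap the DHT right away so that later components start
	// with a working peer discovery mechanism.
	return d.Bootstrap(ctx)
}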

The PeerManager now attaches priorities to peers when importing them and is
able to order them according to that priority. The result is that peers with
high priority are saved first in the peerstore file. When we load the peerstore
file, the first entries in it are given the highest priority.
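
For illustration, a priority is just an integer stored in the libp2p peerstore
under PriorityTag, assigned on import and overridden by Trust(). A minimal
sketch of that flow using the new Manager API (savePrioritized is a
hypothetical helper, not part of this commit):

// Assumed imports:
//   "github.com/ipfs/ipfs-cluster/pstoremgr"
//   peer "github.com/libp2p/go-libp2p-peer"
//   peerstore "github.com/libp2p/go-libp2p-peerstore"

// savePrioritized shows how priorities are assigned and how they end
// up ordering the peerstore file.
func savePrioritized(mgr *pstoremgr.Manager, trusted peer.ID, clusterPeers []peer.ID) {
	// On import, a peer's priority is its position in the list, so
	// entries near the top of the peerstore file rank highest.
	for i, addr := range mgr.LoadPeerstore() {
		pid, err := mgr.ImportPeer(addr, false, peerstore.AddressTTL)
		if err != nil {
			continue
		}
		mgr.SetPriority(pid, i)
	}

	// crdt's Trust() bumps a trusted peer to the top (0 = highest).
	mgr.SetPriority(trusted, 0)

	// PeerInfos() sorts by priority, so SavePeerstoreForPeers writes
	// the trusted peer's addresses first.
	mgr.SavePeerstoreForPeers(append(clusterPeers, trusted))
}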

This means that during startup we will connect to "trusted peers" first
(because they have been tagged with priority in the previous run and saved at
the top of the list). Once connected to a small number of peers, we let the
DHT bootstrap process in the background do the rest and discover the network.

All this makes the peerstore file a "bootstrap" list for CRDTs: we will keep
attempting connections to peers on that list until some of them succeed, as in
the simplified sketch below.
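
The connection attempts boil down to the loop below, a simplified and
hypothetical version of the new pstoremgr Bootstrap() (the real one also
demotes peers that fail to connect):

// Assumed imports:
//   "context"
//   "time"
//   host "github.com/libp2p/go-libp2p-host"
//   peerstore "github.com/libp2p/go-libp2p-peerstore"

// connectFromList walks peer infos in priority order and stops once
// "count" connections have succeeded.
func connectFromList(ctx context.Context, h host.Host, pinfos []peerstore.PeerInfo, count int) int {
	connected := 0
	for _, pi := range pinfos {
		if connected >= count {
			break
		}
		cctx, cancel := context.WithTimeout(ctx, 5*time.Second)
		err := h.Connect(cctx, pi)
		cancel()
		if err != nil {
			continue // move on to the next entry in the list
		}
		connected++
	}
	return connected
}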
Hector Sanjuan 2019-05-23 18:41:33 +02:00
parent 501ee7b41c
commit 196aa23f34
10 changed files with 346 additions and 157 deletions


@ -8,11 +8,10 @@ import (
"net/http"
"time"
"github.com/ipfs/ipfs-cluster/api"
p2phttp "github.com/hsanjuan/go-libp2p-http"
libp2p "github.com/libp2p/go-libp2p"
ipnet "github.com/libp2p/go-libp2p-interface-pnet"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
pnet "github.com/libp2p/go-libp2p-pnet"
madns "github.com/multiformats/go-multiaddr-dns"
@ -41,11 +40,15 @@ func (c *defaultClient) defaultTransport() {
func (c *defaultClient) enableLibp2p() error {
c.defaultTransport()
pid, addr, err := api.Libp2pMultiaddrSplit(c.config.APIAddr)
pinfo, err := peerstore.InfoFromP2pAddr(c.config.APIAddr)
if err != nil {
return err
}
if len(pinfo.Addrs) == 0 {
return errors.New("APIAddr only includes a Peer ID")
}
var prot ipnet.Protector
if c.config.ProtectorKey != nil && len(c.config.ProtectorKey) > 0 {
if len(c.config.ProtectorKey) != 32 {
@ -67,16 +70,16 @@ func (c *defaultClient) enableLibp2p() error {
ctx, cancel := context.WithTimeout(c.ctx, ResolveTimeout)
defer cancel()
resolvedAddrs, err := madns.Resolve(ctx, addr)
resolvedAddrs, err := madns.Resolve(ctx, pinfo.Addrs[0])
if err != nil {
return err
}
h.Peerstore().AddAddrs(pid, resolvedAddrs, peerstore.PermanentAddrTTL)
h.Peerstore().AddAddrs(pinfo.ID, resolvedAddrs, peerstore.PermanentAddrTTL)
c.transport.RegisterProtocol("libp2p", p2phttp.NewTransport(h))
c.net = "libp2p"
c.p2p = h
c.hostname = pid.Pretty()
c.hostname = peer.IDB58Encode(pinfo.ID)
return nil
}


@ -1,8 +1,6 @@
package api
import (
"fmt"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)
@ -32,29 +30,6 @@ func StringsToPeers(strs []string) []peer.ID {
return peers
}
// Libp2pMultiaddrSplit takes a LibP2P multiaddress (/<multiaddr>/ipfs/<peerID>)
// and decapsulates it, parsing the peer ID. Returns an error if there is
// any problem (for example, the provided address not being a Libp2p one).
func Libp2pMultiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) {
pid, err := addr.ValueForProtocol(ma.P_IPFS)
if err != nil {
err = fmt.Errorf("invalid peer multiaddress: %s: %s", addr, err)
logger.Error(err)
return "", nil, err
}
ipfs, _ := ma.NewMultiaddr("/ipfs/" + pid)
decapAddr := addr.Decapsulate(ipfs)
peerID, err := peer.IDB58Decode(pid)
if err != nil {
err = fmt.Errorf("invalid peer ID in multiaddress: %s: %s", pid, err)
logger.Error(err)
return "", nil, err
}
return peerID, decapAddr, nil
}
// MustLibp2pMultiaddrJoin takes a LibP2P multiaddress and a peer ID and
// encapsulates a new /ipfs/<peerID> address. It will panic if the given
// peer ID is bad.


@ -24,6 +24,7 @@ import (
host "github.com/libp2p/go-libp2p-host"
dht "github.com/libp2p/go-libp2p-kad-dht"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
ocgorpc "github.com/lanzafame/go-libp2p-ocgorpc"
@ -36,7 +37,10 @@ import (
// consensus layer.
var ReadyTimeout = 30 * time.Second
var pingMetricName = "ping"
const (
pingMetricName = "ping"
bootstrapCount = 3
)
// Cluster is the main IPFS cluster component. It provides
// the go-API for it and orchestrates the components that make up the system.
@ -116,9 +120,7 @@ func NewCluster(
logger.Infof("IPFS Cluster v%s listening on:\n%s\n", version.Version, listenAddrs)
// Note, we already loaded peers from peerstore into the host
// in daemon.go.
peerManager := pstoremgr.New(host, cfg.GetPeerstorePath())
peerManager := pstoremgr.New(ctx, host, cfg.GetPeerstorePath())
c := &Cluster{
ctx: ctx,
@ -144,6 +146,18 @@ func NewCluster(
readyB: false,
}
// Import known cluster peers from peerstore file. Set
// a non-permanent TTL.
c.peerManager.ImportPeersFromPeerstore(false, peerstore.AddressTTL)
// Attempt to connect to some peers (up to bootstrapCount)
actualCount := c.peerManager.Bootstrap(bootstrapCount)
// We cannot warn about this, as it is normal when the peer is going to Join() later
logger.Debugf("bootstrap count %d", actualCount)
// Bootstrap the DHT now that we possibly have some connections
c.dht.Bootstrap(c.ctx)
// After setupRPC components can do their tasks with a fully operative
// routed libp2p host with some connections and a working DHT (hopefully).
err = c.setupRPC()
if err != nil {
c.Shutdown(ctx)
@ -465,9 +479,6 @@ This might be due to one or several causes:
// Cluster is ready.
// Bootstrap the DHT now that we possibly have some connections
c.dht.Bootstrap(c.ctx)
peers, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
@ -632,12 +643,24 @@ func (c *Cluster) ID(ctx context.Context) *api.ID {
peers, _ = c.consensus.Peers(ctx)
}
clusterPeerInfos := c.peerManager.PeerInfos(peers)
addresses := []api.Multiaddr{}
for _, pinfo := range clusterPeerInfos {
addrs, err := peerstore.InfoToP2pAddrs(&pinfo)
if err != nil {
continue
}
for _, a := range addrs {
addresses = append(addresses, api.NewMultiaddrWithValue(a))
}
}
return &api.ID{
ID: c.id,
//PublicKey: c.host.Peerstore().PubKey(c.id),
Addresses: addrs,
ClusterPeers: peers,
ClusterPeersAddresses: c.peerManager.PeersAddresses(peers),
ClusterPeersAddresses: addresses,
Version: version.Version.String(),
RPCProtocolVersion: version.RPCProtocol,
IPFS: ipfsID,
@ -720,20 +743,15 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
logger.Debugf("Join(%s)", addr)
pid, _, err := api.Libp2pMultiaddrSplit(addr)
// Add peer to peerstore so we can talk to it (and connect)
pid, err := c.peerManager.ImportPeer(addr, true, peerstore.PermanentAddrTTL)
if err != nil {
logger.Error(err)
return err
}
// Bootstrap to myself
if pid == c.id {
return nil
}
// Add peer to peerstore so we can talk to it (and connect)
c.peerManager.ImportPeer(addr, true)
// Note that PeerAdd() on the remote peer will
// figure out what our real address is (obviously not
// ListenAddr).


@ -23,7 +23,6 @@ import (
"github.com/ipfs/ipfs-cluster/observations"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
"github.com/ipfs/ipfs-cluster/pstoremgr"
"go.opencensus.io/tag"
ds "github.com/ipfs/go-datastore"
@ -113,13 +112,6 @@ func createCluster(
ctx, err := tag.New(ctx, tag.Upsert(observations.HostKey, host.ID().Pretty()))
checkErr("tag context with host id", err)
peerstoreMgr := pstoremgr.New(host, cfgs.clusterCfg.GetPeerstorePath())
// Import peers but do not connect. We cannot connect to peers until
// everything has been created (dht, pubsub, bitswap). Otherwise things
// fail.
// Connections will happen as needed during bootstrap, rpc etc.
peerstoreMgr.ImportPeersFromPeerstore(false)
api, err := rest.NewAPIWithHost(ctx, cfgs.apiCfg, host)
checkErr("creating REST API component", err)


@ -73,7 +73,7 @@ func (raftsm *raftStateManager) ImportState(r io.Reader) error {
if err != nil {
return err
}
pm := pstoremgr.New(nil, raftsm.cfgs.clusterCfg.GetPeerstorePath())
pm := pstoremgr.New(context.Background(), nil, raftsm.cfgs.clusterCfg.GetPeerstorePath())
raftPeers := append(
ipfscluster.PeersFromMultiaddrs(pm.LoadPeerstore()),
raftsm.ident.ID,


@ -9,6 +9,7 @@ import (
ipfslite "github.com/hsanjuan/ipfs-lite"
dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/pstoremgr"
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/state/dsstate"
multihash "github.com/multiformats/go-multihash"
@ -22,6 +23,7 @@ import (
host "github.com/libp2p/go-libp2p-host"
dht "github.com/libp2p/go-libp2p-kad-dht"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
@ -29,6 +31,7 @@ var logger = logging.Logger("crdt")
var (
blocksNs = "b" // blockstore namespace
connMgrTag = "crdt"
)
// Common variables for the module.
@ -49,6 +52,7 @@ type Consensus struct {
trustedPeers sync.Map
host host.Host
peerManager *pstoremgr.Manager
store ds.Datastore
namespace ds.Key
@ -89,6 +93,7 @@ func New(
cancel: cancel,
config: cfg,
host: host,
peerManager: pstoremgr.New(ctx, host, ""),
dht: dht,
store: store,
namespace: ds.NewKey(cfg.DatastoreNamespace),
@ -98,6 +103,7 @@ func New(
}
// Set up a fast-lookup trusted peers cache.
// Protect these peers in the ConnMgr
for _, p := range css.config.TrustedPeers {
css.Trust(ctx, p)
}
@ -296,9 +302,18 @@ func (css *Consensus) IsTrustedPeer(ctx context.Context, pid peer.ID) bool {
return ok
}
// Trust marks a peer as "trusted".
// Trust marks a peer as "trusted". It makes sure it is trusted as issuer
// for pubsub updates, it is protected in the connection manager, it
// has the highest priority when the peerstore is saved, and its addresses
// are always remembered.
func (css *Consensus) Trust(ctx context.Context, pid peer.ID) error {
css.trustedPeers.Store(pid, struct{}{})
if conman := css.host.ConnManager(); conman != nil {
conman.Protect(pid, connMgrTag)
}
css.peerManager.SetPriority(pid, 0)
addrs := css.host.Peerstore().Addrs(pid)
css.host.Peerstore().SetAddrs(pid, addrs, peerstore.PermanentAddrTTL)
return nil
}


@ -155,11 +155,11 @@ func TestClustersPeerAdd(t *testing.T) {
addrs := c.peerManager.LoadPeerstore()
peerMap := make(map[peer.ID]struct{})
for _, a := range addrs {
pid, _, err := api.Libp2pMultiaddrSplit(a)
pinfo, err := peerstore.InfoFromP2pAddr(a)
if err != nil {
t.Fatal(err)
}
peerMap[pid] = struct{}{}
peerMap[pinfo.ID] = struct{}{}
}
if len(peerMap) == 0 {


@ -10,11 +10,10 @@ import (
"context"
"fmt"
"os"
"sort"
"sync"
"time"
"github.com/ipfs/ipfs-cluster/api"
logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
@ -25,10 +24,14 @@ import (
var logger = logging.Logger("pstoremgr")
// Timeouts for network operations triggered by the Manager
// PriorityTag is used to attach metadata to peers in the peerstore
// so they can be sorted.
var PriorityTag = "cluster"
// Timeouts for network operations triggered by the Manager.
var (
DNSTimeout = 5 * time.Second
ConnectTimeout = 10 * time.Second
ConnectTimeout = 5 * time.Second
)
// Manager provides utilities for handling cluster peer addresses
@ -43,9 +46,9 @@ type Manager struct {
// New creates a Manager with the given libp2p Host and peerstorePath.
// The path indicates the place to persist and read peer addresses from.
// If empty, these operations (LoadPeerstore, SavePeerstore) will no-op.
func New(h host.Host, peerstorePath string) *Manager {
func New(ctx context.Context, h host.Host, peerstorePath string) *Manager {
return &Manager{
ctx: context.Background(),
ctx: ctx,
host: h,
peerstorePath: peerstorePath,
}
@ -54,38 +57,33 @@ func New(h host.Host, peerstorePath string) *Manager {
// ImportPeer adds a new peer address to the host's peerstore, optionally
// dialing to it. It will resolve any DNS multiaddresses before adding them.
// The address is expected to include the /ipfs/<peerID> protocol part.
func (pm *Manager) ImportPeer(addr ma.Multiaddr, connect bool) error {
// Peers are added with the given ttl
func (pm *Manager) ImportPeer(addr ma.Multiaddr, connect bool, ttl time.Duration) (peer.ID, error) {
if pm.host == nil {
return nil
return "", nil
}
logger.Debugf("adding peer address %s", addr)
pid, decapAddr, err := api.Libp2pMultiaddrSplit(addr)
pinfo, err := peerstore.InfoFromP2pAddr(addr)
if err != nil {
return err
return "", err
}
pm.host.Peerstore().AddAddr(pid, decapAddr, peerstore.PermanentAddrTTL)
// dns multiaddresses need to be resolved because libp2p only does that
// on explicit bhost.Connect().
if madns.Matches(addr) {
ctx, cancel := context.WithTimeout(pm.ctx, DNSTimeout)
defer cancel()
resolvedAddrs, err := madns.Resolve(ctx, addr)
if err != nil {
logger.Error(err)
return err
}
pm.ImportPeers(resolvedAddrs, connect)
// Do not add ourselves
if pinfo.ID == pm.host.ID() {
return pinfo.ID, nil
}
pm.host.Peerstore().AddAddrs(pinfo.ID, pinfo.Addrs, ttl)
if connect {
go func() {
ctx, cancel := context.WithTimeout(pm.ctx, ConnectTimeout)
defer cancel()
pm.host.Network().DialPeer(ctx, pid)
pm.host.Connect(ctx, *pinfo)
}()
}
return nil
return pinfo.ID, nil
}
// RmPeer clears all addresses for a given peer ID from the host's peerstore.
@ -100,19 +98,17 @@ func (pm *Manager) RmPeer(pid peer.ID) error {
}
// if the peer has dns addresses, return only those, otherwise
// return all. In all cases, encapsulate the peer ID.
func (pm *Manager) filteredPeerAddrs(p peer.ID) []api.Multiaddr {
// return all.
func (pm *Manager) filteredPeerAddrs(p peer.ID) []ma.Multiaddr {
all := pm.host.Peerstore().Addrs(p)
peerAddrs := []api.Multiaddr{}
peerDNSAddrs := []api.Multiaddr{}
peerPart, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(p)))
peerAddrs := []ma.Multiaddr{}
peerDNSAddrs := []ma.Multiaddr{}
for _, a := range all {
encAddr := a.Encapsulate(peerPart)
if madns.Matches(encAddr) {
peerDNSAddrs = append(peerDNSAddrs, api.NewMultiaddrWithValue(encAddr))
if madns.Matches(a) {
peerDNSAddrs = append(peerDNSAddrs, a)
} else {
peerAddrs = append(peerAddrs, api.NewMultiaddrWithValue(encAddr))
peerAddrs = append(peerAddrs, a)
}
}
@ -123,11 +119,12 @@ func (pm *Manager) filteredPeerAddrs(p peer.ID) []api.Multiaddr {
return peerAddrs
}
// PeersAddresses returns the list of multiaddresses (encapsulating the
// /ipfs/<peerID> part) for the given set of peers. For peers for which
// we know DNS multiaddresses, we only return those. Otherwise, we return
// all the multiaddresses known for that peer.
func (pm *Manager) PeersAddresses(peers []peer.ID) []api.Multiaddr {
// PeerInfos returns a slice of peerinfos for the given set of peers in order
// of priority. For peers for which we know DNS
// multiaddresses, we only include those. Otherwise, the PeerInfo includes all
// the multiaddresses known for that peer. Peers without addresses are not
// included.
func (pm *Manager) PeerInfos(peers []peer.ID) []peerstore.PeerInfo {
if pm.host == nil {
return nil
}
@ -136,29 +133,47 @@ func (pm *Manager) PeersAddresses(peers []peer.ID) []api.Multiaddr {
return nil
}
var addrs []api.Multiaddr
var pinfos []peerstore.PeerInfo
for _, p := range peers {
if p == pm.host.ID() {
continue
}
addrs = append(addrs, pm.filteredPeerAddrs(p)...)
pinfo := peerstore.PeerInfo{
ID: p,
Addrs: pm.filteredPeerAddrs(p),
}
return addrs
if len(pinfo.Addrs) > 0 {
pinfos = append(pinfos, pinfo)
}
}
toSort := &peerSort{
pinfos: pinfos,
pstore: pm.host.Peerstore(),
}
// Sort from highest to lowest priority
sort.Sort(toSort)
return toSort.pinfos
}
// ImportPeers calls ImportPeer for every address in the given slice, using the
// given connect parameter.
func (pm *Manager) ImportPeers(addrs []ma.Multiaddr, connect bool) error {
for _, a := range addrs {
pm.ImportPeer(a, connect)
// given connect parameter. Peers are tagged with priority as given
// by their position in the list.
func (pm *Manager) ImportPeers(addrs []ma.Multiaddr, connect bool, ttl time.Duration) error {
for i, a := range addrs {
pid, err := pm.ImportPeer(a, connect, ttl)
if err == nil {
pm.SetPriority(pid, i)
}
}
return nil
}
// ImportPeersFromPeerstore reads the peerstore file and calls ImportPeers with
// the addresses obtained from it.
func (pm *Manager) ImportPeersFromPeerstore(connect bool) error {
return pm.ImportPeers(pm.LoadPeerstore(), connect)
func (pm *Manager) ImportPeersFromPeerstore(connect bool, ttl time.Duration) error {
return pm.ImportPeers(pm.LoadPeerstore(), connect, ttl)
}
// LoadPeerstore parses the peerstore file and returns the list
@ -202,7 +217,7 @@ func (pm *Manager) LoadPeerstore() (addrs []ma.Multiaddr) {
// SavePeerstore stores a slice of multiaddresses in the peerstore file, one
// per line.
func (pm *Manager) SavePeerstore(addrs []api.Multiaddr) {
func (pm *Manager) SavePeerstore(pinfos []peerstore.PeerInfo) {
if pm.peerstorePath == "" {
return
}
@ -221,13 +236,95 @@ func (pm *Manager) SavePeerstore(addrs []api.Multiaddr) {
}
defer f.Close()
for _, pinfo := range pinfos {
addrs, err := peerstore.InfoToP2pAddrs(&pinfo)
if err != nil {
logger.Warning(err)
continue
}
for _, a := range addrs {
f.Write([]byte(fmt.Sprintf("%s\n", a.Value().String())))
f.Write([]byte(fmt.Sprintf("%s\n", a.String())))
}
}
}
// SavePeerstoreForPeers calls PeersAddresses and then saves the peerstore
// SavePeerstoreForPeers calls PeerInfos and then saves the peerstore
// file using the result.
func (pm *Manager) SavePeerstoreForPeers(peers []peer.ID) {
pm.SavePeerstore(pm.PeersAddresses(peers))
pm.SavePeerstore(pm.PeerInfos(peers))
}
// Bootstrap attempts to get up to "count" connected peers by trying those
// in the libp2p host peerstore, in order of priority. It returns the number
// of peers it successfully connected to.
func (pm *Manager) Bootstrap(count int) int {
knownPeers := pm.host.Peerstore().PeersWithAddrs()
toSort := &peerSort{
pinfos: peerstore.PeerInfos(pm.host.Peerstore(), knownPeers),
pstore: pm.host.Peerstore(),
}
// Sort from highest to lowest priority
sort.Sort(toSort)
pinfos := toSort.pinfos
lenKnown := len(pinfos)
totalConns := 0
// keep connecting while we have peers in the store
// and we have not reached count.
for i := 0; i < lenKnown && totalConns < count; i++ {
pinfo := pinfos[i]
ctx, cancel := context.WithTimeout(pm.ctx, ConnectTimeout)
defer cancel()
logger.Infof("connecting to %s", pinfo.ID)
err := pm.host.Connect(ctx, pinfo)
if err != nil {
logger.Warning(err)
pm.SetPriority(pinfo.ID, 9999)
continue
}
totalConns++
}
return totalConns
}
// SetPriority attaches a priority to a peer. Lower values mean higher
// priority: 0 ranks above 1, 1 ranks above 2, and so on.
func (pm *Manager) SetPriority(pid peer.ID, prio int) error {
return pm.host.Peerstore().Put(pid, PriorityTag, prio)
}
type peerSort struct {
pinfos []peerstore.PeerInfo
pstore peerstore.Peerstore
}
func (ps *peerSort) Len() int {
return len(ps.pinfos)
}
func (ps *peerSort) Less(i, j int) bool {
pinfo1 := ps.pinfos[i]
pinfo2 := ps.pinfos[j]
var prio1, prio2 int
prio1iface, err := ps.pstore.Get(pinfo1.ID, PriorityTag)
if err == nil {
prio1 = prio1iface.(int)
}
prio2iface, err := ps.pstore.Get(pinfo2.ID, PriorityTag)
if err == nil {
prio2 = prio2iface.(int)
}
return prio1 < prio2
}
func (ps *peerSort) Swap(i, j int) {
pinfo1 := ps.pinfos[i]
pinfo2 := ps.pinfos[j]
ps.pinfos[i] = pinfo2
ps.pinfos[j] = pinfo1
}


@ -4,21 +4,21 @@ import (
"context"
"os"
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
libp2p "github.com/libp2p/go-libp2p"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)
var pid = "QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc"
func makeMgr(t *testing.T) *Manager {
h, err := libp2p.New(context.Background())
if err != nil {
t.Fatal(err)
}
return New(h, "peerstore")
return New(context.Background(), h, "peerstore")
}
func clean(pm *Manager) {
@ -27,31 +27,45 @@ func clean(pm *Manager) {
}
}
func testAddr(loc string, pid peer.ID) ma.Multiaddr {
m, _ := ma.NewMultiaddr(loc + "/ipfs/" + peer.IDB58Encode(pid))
return m
}
func TestManager(t *testing.T) {
pm := makeMgr(t)
defer clean(pm)
testPeer, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234/ipfs/" + pid)
loc := "/ip4/127.0.0.1/tcp/1234"
testAddr := testAddr(loc, test.PeerID1)
err := pm.ImportPeer(testPeer, false)
_, err := pm.ImportPeer(testAddr, false, time.Minute)
if err != nil {
t.Fatal(err)
}
peers := api.StringsToPeers([]string{pid, pm.host.ID().Pretty()})
addrs := pm.PeersAddresses(peers)
if len(addrs) != 1 {
t.Fatal("expected 1 address")
peers := []peer.ID{test.PeerID1, pm.host.ID()}
pinfos := pm.PeerInfos(peers)
if len(pinfos) != 1 {
t.Fatal("expected 1 peerinfo")
}
if !addrs[0].Equal(testPeer) {
if pinfos[0].ID != test.PeerID1 {
t.Error("expected same peer as added")
}
if len(pinfos[0].Addrs) != 1 {
t.Fatal("expected an address")
}
if pinfos[0].Addrs[0].String() != loc {
t.Error("expected same address as added")
}
pm.RmPeer(peers[0])
addrs = pm.PeersAddresses(peers)
if len(addrs) != 0 {
t.Fatal("expected 0 addresses")
pinfos = pm.PeerInfos(peers)
if len(pinfos) != 0 {
t.Fatal("expected 0 pinfos")
}
}
@ -59,21 +73,27 @@ func TestManagerDNS(t *testing.T) {
pm := makeMgr(t)
defer clean(pm)
testPeer, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234/ipfs/" + pid)
testPeer2, _ := ma.NewMultiaddr("/dns4/localhost/tcp/1235/ipfs/" + pid)
loc1 := "/ip4/127.0.0.1/tcp/1234"
testAddr1 := testAddr(loc1, test.PeerID1)
loc2 := "/dns4/localhost/tcp/1235"
testAddr2 := testAddr(loc2, test.PeerID1)
err := pm.ImportPeers([]ma.Multiaddr{testPeer, testPeer2}, false)
err := pm.ImportPeers([]ma.Multiaddr{testAddr1, testAddr2}, false, time.Minute)
if err != nil {
t.Fatal(err)
}
addrs := pm.PeersAddresses(api.StringsToPeers([]string{pid}))
if len(addrs) != 1 {
t.Fatal("expected 1 address")
pinfos := pm.PeerInfos([]peer.ID{test.PeerID1})
if len(pinfos) != 1 {
t.Fatal("expected 1 pinfo")
}
if !addrs[0].Equal(testPeer2) {
t.Error("expected only the dns address")
if len(pinfos[0].Addrs) != 1 {
t.Error("expected a single address")
}
if pinfos[0].Addrs[0].String() != "/dns4/localhost/tcp/1235" {
t.Error("expected the dns address")
}
}
@ -81,25 +101,95 @@ func TestPeerstore(t *testing.T) {
pm := makeMgr(t)
defer clean(pm)
testPeer, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234/ipfs/" + pid)
testPeer2, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235/ipfs/" + pid)
loc1 := "/ip4/127.0.0.1/tcp/1234"
testAddr1 := testAddr(loc1, test.PeerID1)
loc2 := "/ip4/127.0.0.1/tcp/1235"
testAddr2 := testAddr(loc2, test.PeerID1)
err := pm.ImportPeers([]ma.Multiaddr{testPeer, testPeer2}, false)
err := pm.ImportPeers([]ma.Multiaddr{testAddr1, testAddr2}, false, time.Minute)
if err != nil {
t.Fatal(err)
}
pm.SavePeerstoreForPeers(api.StringsToPeers([]string{pid}))
pm.SavePeerstoreForPeers([]peer.ID{test.PeerID1})
pm2 := makeMgr(t)
defer clean(pm2)
err = pm2.ImportPeersFromPeerstore(false)
err = pm2.ImportPeersFromPeerstore(false, time.Minute)
if err != nil {
t.Fatal(err)
}
if len(pm2.PeersAddresses(api.StringsToPeers([]string{pid}))) != 2 {
t.Error("expected 2 addresses from the peerstore")
pinfos := pm2.PeerInfos([]peer.ID{test.PeerID1})
if len(pinfos) != 1 {
t.Fatal("expected 1 peer in the peerstore")
}
if len(pinfos[0].Addrs) != 2 {
t.Error("expected 2 addresses")
}
}
func TestPriority(t *testing.T) {
pm := makeMgr(t)
defer clean(pm)
loc1 := "/ip4/127.0.0.1/tcp/1234"
testAddr1 := testAddr(loc1, test.PeerID1)
loc2 := "/ip4/127.0.0.2/tcp/1235"
testAddr2 := testAddr(loc2, test.PeerID2)
loc3 := "/ip4/127.0.0.3/tcp/1234"
testAddr3 := testAddr(loc3, test.PeerID3)
loc4 := "/ip4/127.0.0.4/tcp/1235"
testAddr4 := testAddr(loc4, test.PeerID4)
err := pm.ImportPeers([]ma.Multiaddr{testAddr1, testAddr2, testAddr3, testAddr4}, false, time.Minute)
if err != nil {
t.Fatal(err)
}
pinfos := pm.PeerInfos([]peer.ID{test.PeerID4, test.PeerID2, test.PeerID3, test.PeerID1})
if len(pinfos) != 4 {
t.Fatal("expected 4 pinfos")
}
if pinfos[0].ID != test.PeerID1 ||
pinfos[1].ID != test.PeerID2 ||
pinfos[2].ID != test.PeerID3 ||
pinfos[3].ID != test.PeerID4 {
t.Error("wrong order of peerinfos")
}
pm.SetPriority(test.PeerID1, 100)
pinfos = pm.PeerInfos([]peer.ID{test.PeerID4, test.PeerID2, test.PeerID3, test.PeerID1})
if len(pinfos) != 4 {
t.Fatal("expected 4 pinfos")
}
if pinfos[3].ID != test.PeerID1 {
t.Fatal("PeerID1 should be last in the list")
}
pm.SavePeerstoreForPeers([]peer.ID{test.PeerID4, test.PeerID2, test.PeerID3, test.PeerID1})
pm2 := makeMgr(t)
defer clean(pm2)
err = pm2.ImportPeersFromPeerstore(false, time.Minute)
if err != nil {
t.Fatal(err)
}
pinfos = pm2.PeerInfos([]peer.ID{test.PeerID4, test.PeerID2, test.PeerID3, test.PeerID1})
if len(pinfos) != 4 {
t.Fatal("expected 4 pinfos")
}
if pinfos[0].ID != test.PeerID2 ||
pinfos[1].ID != test.PeerID3 ||
pinfos[2].ID != test.PeerID4 ||
pinfos[3].ID != test.PeerID1 {
t.Error("wrong order of peerinfos")
}
}

util.go

@ -4,10 +4,9 @@ import (
"errors"
"fmt"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
)
@ -18,14 +17,14 @@ func PeersFromMultiaddrs(addrs []ma.Multiaddr) []peer.ID {
var pids []peer.ID
pm := make(map[peer.ID]struct{})
for _, addr := range addrs {
pid, _, err := api.Libp2pMultiaddrSplit(addr)
pinfo, err := peerstore.InfoFromP2pAddr(addr)
if err != nil {
continue
}
_, ok := pm[pid]
_, ok := pm[pinfo.ID]
if !ok {
pm[pid] = struct{}{}
pids = append(pids, pid)
pm[pinfo.ID] = struct{}{}
pids = append(pids, pinfo.ID)
}
}
return pids