Cluster: add libp2p host parameter to constructor.

NewCluster() now takes an optional Host parameter.
The rationale is to allow to re-use an existing libp2p Host
when creating the cluster.

The NewClusterHost method now allows to create a host
with the options used by cluster.

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
Hector Sanjuan 2018-03-13 18:16:15 +01:00
parent 836d552fbb
commit 41b17bf477
6 changed files with 70 additions and 93 deletions

View File

@ -3,12 +3,9 @@ package ipfscluster
import ( import (
"context" "context"
"errors" "errors"
"strings"
"sync" "sync"
"time" "time"
pnet "github.com/libp2p/go-libp2p-pnet"
"github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/consensus/raft" "github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/state"
@ -16,11 +13,7 @@ import (
rpc "github.com/hsanjuan/go-libp2p-gorpc" rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
host "github.com/libp2p/go-libp2p-host" host "github.com/libp2p/go-libp2p-host"
ipnet "github.com/libp2p/go-libp2p-interface-pnet"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
@ -64,6 +57,7 @@ type Cluster struct {
// this call returns (consensus may still be bootstrapping). Use Cluster.Ready() // this call returns (consensus may still be bootstrapping). Use Cluster.Ready()
// if you need to wait until the peer is fully up. // if you need to wait until the peer is fully up.
func NewCluster( func NewCluster(
host host.Host,
cfg *Config, cfg *Config,
consensusCfg *raft.Config, consensusCfg *raft.Config,
api API, api API,
@ -80,10 +74,14 @@ func NewCluster(
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
host, err := makeHost(ctx, cfg)
if err != nil { if host == nil {
cancel() h, err := NewClusterHost(ctx, cfg)
return nil, err if err != nil {
cancel()
return nil, err
}
host = h
} }
if c := Commit; len(c) >= 8 { if c := Commit; len(c) >= 8 {
@ -1097,71 +1095,6 @@ func (c *Cluster) Peers() []api.ID {
return peers return peers
} }
// makeHost makes a libp2p-host.
func makeHost(ctx context.Context, cfg *Config) (host.Host, error) {
ps := peerstore.NewPeerstore()
privateKey := cfg.PrivateKey
publicKey := privateKey.GetPublic()
var protec ipnet.Protector
if len(cfg.Secret) != 0 {
var err error
clusterKey, err := clusterSecretToKey(cfg.Secret)
if err != nil {
return nil, err
}
protec, err = pnet.NewProtector(strings.NewReader(clusterKey))
if err != nil {
return nil, err
}
// this is in go-ipfs, not sure whether we want something like it here
/* go func() {
t := time.NewTicker(30 * time.Second)
<-t.C // swallow one tick
for {
select {
case <-t.C:
if ph := cfg.Host; ph != nil {
if len(ph.Network().Peers()) == 0 {
log.Warning("We are in a private network and have no peers.")
log.Warning("This might be a configuration mistake.")
}
}
case <-n.Process().Closing:
t.Stop()
return
}
}
}()*/
}
if err := ps.AddPubKey(cfg.ID, publicKey); err != nil {
return nil, err
}
if err := ps.AddPrivKey(cfg.ID, privateKey); err != nil {
return nil, err
}
ps.AddAddr(cfg.ID, cfg.ListenAddr, peerstore.PermanentAddrTTL)
network, err := swarm.NewNetworkWithProtector(
ctx,
[]ma.Multiaddr{cfg.ListenAddr},
cfg.ID,
ps,
protec,
nil,
)
if err != nil {
return nil, err
}
bhost := basichost.New(network)
return bhost, nil
}
// Perform an RPC request to multiple destinations // Perform an RPC request to multiple destinations
func (c *Cluster) multiRPC(dests []peer.ID, svcName, svcMethod string, args interface{}, reply []interface{}) []error { func (c *Cluster) multiRPC(dests []peer.ID, svcName, svcMethod string, args interface{}, reply []interface{}) []error {
if len(dests) != len(reply) { if len(dests) != len(reply) {

View File

@ -1,7 +1,6 @@
package ipfscluster package ipfscluster
import ( import (
"bytes"
crand "crypto/rand" crand "crypto/rand"
"encoding/base64" "encoding/base64"
"encoding/hex" "encoding/hex"
@ -394,7 +393,7 @@ func (cfg *Config) ToJSON() (raw []byte, err error) {
jcfg.ID = cfg.ID.Pretty() jcfg.ID = cfg.ID.Pretty()
jcfg.Peername = cfg.Peername jcfg.Peername = cfg.Peername
jcfg.PrivateKey = pKey jcfg.PrivateKey = pKey
jcfg.Secret = EncodeClusterSecret(cfg.Secret) jcfg.Secret = EncodeProtectorKey(cfg.Secret)
jcfg.Peers = clusterPeers jcfg.Peers = clusterPeers
jcfg.Bootstrap = bootstrap jcfg.Bootstrap = bootstrap
jcfg.ReplicationFactorMin = cfg.ReplicationFactorMin jcfg.ReplicationFactorMin = cfg.ReplicationFactorMin
@ -434,11 +433,6 @@ func DecodeClusterSecret(hexSecret string) ([]byte, error) {
} }
} }
// EncodeClusterSecret converts a byte slice to its hex string representation.
func EncodeClusterSecret(secretBytes []byte) string {
return hex.EncodeToString(secretBytes)
}
func generateClusterSecret() ([]byte, error) { func generateClusterSecret() ([]byte, error) {
secretBytes := make([]byte, 32) secretBytes := make([]byte, 32)
_, err := crand.Read(secretBytes) _, err := crand.Read(secretBytes)
@ -447,12 +441,3 @@ func generateClusterSecret() ([]byte, error) {
} }
return secretBytes, nil return secretBytes, nil
} }
func clusterSecretToKey(secret []byte) (string, error) {
var key bytes.Buffer
key.WriteString("/key/swarm/psk/1.0.0/\n")
key.WriteString("/base16/\n")
key.WriteString(EncodeClusterSecret(secret))
return key.String(), nil
}

View File

@ -104,6 +104,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate
inf, _ := numpin.NewInformer(numpinCfg) inf, _ := numpin.NewInformer(numpinCfg)
cl, err := NewCluster( cl, err := NewCluster(
nil,
clusterCfg, clusterCfg,
consensusCfg, consensusCfg,
api, api,

57
clusterhost.go Normal file
View File

@ -0,0 +1,57 @@
package ipfscluster
import (
"bytes"
"context"
"encoding/hex"
"strings"
libp2p "github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
ipnet "github.com/libp2p/go-libp2p-interface-pnet"
pnet "github.com/libp2p/go-libp2p-pnet"
ma "github.com/multiformats/go-multiaddr"
)
// NewClusterHost creates a libp2p Host with the options in the from the
// provided cluster configuration.
func NewClusterHost(ctx context.Context, cfg *Config) (host.Host, error) {
var prot ipnet.Protector
// Create protector if we have a secret.
if cfg.Secret != nil && len(cfg.Secret) > 0 {
protKey, err := SecretToProtectorKey(cfg.Secret)
if err != nil {
return nil, err
}
prot, err = pnet.NewProtector(strings.NewReader(protKey))
if err != nil {
return nil, err
}
}
return libp2p.New(ctx,
libp2p.Identity(cfg.PrivateKey),
libp2p.ListenAddrs([]ma.Multiaddr{cfg.ListenAddr}...),
libp2p.PrivateNetwork(prot),
// FIXME: Enable when libp2p >= 5.0.16
// https://github.com/libp2p/go-libp2p/pull/293
//libp2p.NATPortMap(),
)
}
// SecretToProtectorKey converts a private network secret
// provided as a byte-slice into the format expected by go-libp2p-pnet
func SecretToProtectorKey(secret []byte) (string, error) {
var key bytes.Buffer
key.WriteString("/key/swarm/psk/1.0.0/\n")
key.WriteString("/base16/\n")
key.WriteString(EncodeProtectorKey(secret))
return key.String(), nil
}
// EncodeProtectorKey converts a byte slice to its hex string representation.
func EncodeProtectorKey(secretBytes []byte) string {
return hex.EncodeToString(secretBytes)
}

View File

@ -495,6 +495,7 @@ func daemon(c *cli.Context) error {
informer, alloc := setupAllocation(c.GlobalString("alloc"), diskInfCfg, numpinInfCfg) informer, alloc := setupAllocation(c.GlobalString("alloc"), diskInfCfg, numpinInfCfg)
cluster, err := ipfscluster.NewCluster( cluster, err := ipfscluster.NewCluster(
nil,
clusterCfg, clusterCfg,
consensusCfg, consensusCfg,
api, api,

View File

@ -125,7 +125,7 @@ func createComponents(t *testing.T, i int, clusterSecret []byte) (*Config, *raft
} }
func createCluster(t *testing.T, clusterCfg *Config, consensusCfg *raft.Config, api API, ipfs IPFSConnector, state state.State, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer) *Cluster { func createCluster(t *testing.T, clusterCfg *Config, consensusCfg *raft.Config, api API, ipfs IPFSConnector, state state.State, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer) *Cluster {
cl, err := NewCluster(clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf) cl, err := NewCluster(nil, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf)
checkErr(t, err) checkErr(t, err)
<-cl.Ready() <-cl.Ready()
return cl return cl