Use multiaddresses in the configuration and rename JSON entries for clarity

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
Hector Sanjuan 2017-01-23 18:38:59 +01:00
parent 365c549d7c
commit d1731ebd28
12 changed files with 299 additions and 159 deletions

View File

@ -2,15 +2,11 @@ package ipfscluster
import (
"context"
"encoding/base64"
"errors"
"fmt"
"strings"
"sync"
rpc "github.com/hsanjuan/go-libp2p-rpc"
cid "github.com/ipfs/go-cid"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
@ -59,11 +55,6 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P
return nil, err
}
tracker.SetClient(rpcClient)
ipfs.SetClient(rpcClient)
api.SetClient(rpcClient)
consensus.SetClient(rpcClient)
cluster := &Cluster{
ctx: ctx,
config: cfg,
@ -85,6 +76,13 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P
return nil, err
}
defer func() {
tracker.SetClient(rpcClient)
ipfs.SetClient(rpcClient)
api.SetClient(rpcClient)
consensus.SetClient(rpcClient)
}()
logger.Infof("starting IPFS Cluster v%s", Version)
cluster.run()
@ -345,57 +343,25 @@ func (c *Cluster) run() {
// makeHost makes a libp2p-host
func makeHost(ctx context.Context, cfg *Config) (host.Host, error) {
ps := peerstore.NewPeerstore()
peerID, err := peer.IDB58Decode(cfg.ID)
if err != nil {
logger.Error("decoding ID: ", err)
return nil, err
}
pkb, err := base64.StdEncoding.DecodeString(cfg.PrivateKey)
if err != nil {
logger.Error("decoding private key base64: ", err)
return nil, err
}
privateKey, err := crypto.UnmarshalPrivateKey(pkb)
if err != nil {
logger.Error("unmarshaling private key", err)
return nil, err
}
privateKey := cfg.PrivateKey
publicKey := privateKey.GetPublic()
addr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d",
cfg.ClusterAddr, cfg.ClusterPort))
if err != nil {
if err := ps.AddPubKey(cfg.ID, publicKey); err != nil {
return nil, err
}
if err := ps.AddPubKey(peerID, publicKey); err != nil {
if err := ps.AddPrivKey(cfg.ID, privateKey); err != nil {
return nil, err
}
if err := ps.AddPrivKey(peerID, privateKey); err != nil {
return nil, err
}
for _, cpeer := range cfg.ClusterPeers {
addr, err := multiaddr.NewMultiaddr(cpeer)
if err != nil {
logger.Errorf("parsing cluster peer multiaddress %s: %s", cpeer, err)
return nil, err
}
for _, addr := range cfg.ClusterPeers {
pid, err := addr.ValueForProtocol(multiaddr.P_IPFS)
if err != nil {
return nil, err
}
strAddr := strings.Split(addr.String(), "/ipfs/")[0]
maddr, err := multiaddr.NewMultiaddr(strAddr)
if err != nil {
return nil, err
}
ipfs, _ := multiaddr.NewMultiaddr("/ipfs/" + pid)
maddr := addr.Decapsulate(ipfs)
peerID, err := peer.IDB58Decode(pid)
if err != nil {
@ -410,8 +376,8 @@ func makeHost(ctx context.Context, cfg *Config) (host.Host, error) {
network, err := swarm.NewNetwork(
ctx,
[]multiaddr.Multiaddr{addr},
peerID,
[]multiaddr.Multiaddr{cfg.ClusterAddr},
cfg.ID,
ps,
nil,
)

View File

@ -160,7 +160,7 @@ func TestClusterMembers(t *testing.T) {
defer cl.Shutdown()
m := cl.Members()
id := testingConfig().ID
if len(m) != 1 || m[0].Pretty() != id {
if len(m) != 1 || m[0] != id {
t.Error("bad Members()")
}
}

234
config.go
View File

@ -3,30 +3,62 @@ package ipfscluster
import (
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
crypto "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
hashiraft "github.com/hashicorp/raft"
)
// Default parameters for the configuration
const (
DefaultConfigCrypto = crypto.RSA
DefaultConfigKeyLength = 2048
DefaultAPIAddr = "127.0.0.1"
DefaultAPIPort = 9094
DefaultIPFSAPIAddr = "127.0.0.1"
DefaultIPFSAPIPort = 9095
DefaultIPFSAddr = "127.0.0.1"
DefaultIPFSPort = 5001
DefaultClusterAddr = "0.0.0.0"
DefaultClusterPort = 9096
DefaultAPIAddr = "/ip4/127.0.0.1/tcp/9094"
DefaultIPFSProxyAddr = "/ip4/127.0.0.1/tcp/9095"
DefaultIPFSNodeAddr = "/ip4/127.0.0.1/tcp/5001"
DefaultClusterAddr = "/ip4/0.0.0.0/tcp/9096"
)
// Config represents an ipfs-cluster configuration which can be
// saved and loaded from disk. Currently it holds configuration
// values used by all components.
type Config struct {
// Libp2p ID and private key for Cluster communication (including)
// the Consensus component.
ID peer.ID
PrivateKey crypto.PrivKey
// List of multiaddresses of the peers of this cluster.
ClusterPeers []ma.Multiaddr
// Listen parameters for the Cluster libp2p Host. Used by
// the RPC and Consensus components.
ClusterAddr ma.Multiaddr
// Listen parameters for the the Cluster HTTP API component.
APIAddr ma.Multiaddr
// Listen parameters for the IPFS Proxy. Used by the IPFS
// connector component.
IPFSProxyAddr ma.Multiaddr
// Host/Port for the IPFS daemon.
IPFSNodeAddr ma.Multiaddr
// Storage folder for snapshots, log store etc. Used by
// the Consensus component.
ConsensusDataFolder string
// Hashicorp's Raft configuration
RaftConfig *hashiraft.Config
}
// This is how a config actually look in JSON
type JSONConfig struct {
// Libp2p ID and private key for Cluster communication (including)
// the Consensus component.
ID string `json:"id"`
@ -36,38 +68,150 @@ type Config struct {
ClusterPeers []string `json:"cluster_peers"`
// Listen parameters for the Cluster libp2p Host. Used by
// the Remote RPC and Consensus components.
ClusterAddr string `json:"cluster_addr"`
ClusterPort int `json:"cluster_port"`
// the RPC and Consensus components.
ClusterListenMultiaddress string `json:"cluster_multiaddress"`
// Listen parameters for the the Cluster HTTP API component.
APIListenMultiaddress string `json:"api_multiaddress`
// Listen parameters for the IPFS Proxy. Used by the IPFS
// connector component.
IPFSProxyListenMultiaddress string `json:ipfs_proxy_api_multiaddress`
// Host/Port for the IPFS daemon.
IPFSNodeMultiaddress string `json:"ipfs_node_multiaddress"`
// Storage folder for snapshots, log store etc. Used by
// the Consensus component.
ConsensusDataFolder string `json:"consensus_data_folder"`
// Listen parameters for the the Cluster HTTP API component.
APIAddr string `json:"api_addr"`
APIPort int `json:"api_port"`
// Raft configuration
RaftConfig *hashiraft.Config `json:"raft_config"`
}
// Listen parameters for the IPFS Proxy. Used by the IPFS
// connector component.
IPFSAPIAddr string `json:"ipfs_api_addr"`
IPFSAPIPort int `json:"ipfs_api_port"`
func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) {
// Multiaddress String() may panic
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("%s", r)
}
}()
pkeyBytes, err := cfg.PrivateKey.Bytes()
if err != nil {
return
}
pKey := base64.StdEncoding.EncodeToString(pkeyBytes)
// Host/Port for the IPFS daemon.
IPFSAddr string `json:"ipfs_addr"`
IPFSPort int `json:"ipfs_port"`
clusterPeers := make([]string, len(cfg.ClusterPeers), len(cfg.ClusterPeers))
for i := 0; i < len(cfg.ClusterPeers); i++ {
clusterPeers[i] = cfg.ClusterPeers[i].String()
}
j = &JSONConfig{
ID: cfg.ID.Pretty(),
PrivateKey: pKey,
ClusterPeers: clusterPeers,
ClusterListenMultiaddress: cfg.ClusterAddr.String(),
APIListenMultiaddress: cfg.APIAddr.String(),
IPFSProxyListenMultiaddress: cfg.IPFSProxyAddr.String(),
IPFSNodeMultiaddress: cfg.IPFSNodeAddr.String(),
ConsensusDataFolder: cfg.ConsensusDataFolder,
RaftConfig: cfg.RaftConfig,
}
return
}
func (jcfg *JSONConfig) ToConfig() (c *Config, err error) {
id, err := peer.IDB58Decode(jcfg.ID)
if err != nil {
return
}
pkb, err := base64.StdEncoding.DecodeString(jcfg.PrivateKey)
if err != nil {
return
}
pKey, err := crypto.UnmarshalPrivateKey(pkb)
if err != nil {
return
}
clusterPeers := make([]ma.Multiaddr, len(jcfg.ClusterPeers))
for i := 0; i < len(jcfg.ClusterPeers); i++ {
maddr, err := ma.NewMultiaddr(jcfg.ClusterPeers[i])
if err != nil {
return nil, err
}
clusterPeers[i] = maddr
}
clusterAddr, err := ma.NewMultiaddr(jcfg.ClusterListenMultiaddress)
if err != nil {
return
}
apiAddr, err := ma.NewMultiaddr(jcfg.APIListenMultiaddress)
if err != nil {
return
}
ipfsProxyAddr, err := ma.NewMultiaddr(jcfg.IPFSProxyListenMultiaddress)
if err != nil {
return
}
ipfsNodeAddr, err := ma.NewMultiaddr(jcfg.IPFSNodeMultiaddress)
if err != nil {
return
}
c = &Config{
ID: id,
PrivateKey: pKey,
ClusterPeers: clusterPeers,
ClusterAddr: clusterAddr,
APIAddr: apiAddr,
IPFSProxyAddr: ipfsProxyAddr,
IPFSNodeAddr: ipfsNodeAddr,
RaftConfig: jcfg.RaftConfig,
ConsensusDataFolder: jcfg.ConsensusDataFolder,
}
return
}
// LoadConfig reads a JSON configuration file from the given path,
// parses it and returns a new Config object.
func LoadConfig(path string) (*Config, error) {
config := &Config{}
jcfg := &JSONConfig{}
file, err := ioutil.ReadFile(path)
if err != nil {
logger.Error("error reading the configuration file: ", err)
return nil, err
}
json.Unmarshal(file, config)
return config, nil
err = json.Unmarshal(file, jcfg)
if err != nil {
logger.Error("error parsing JSON: ", err)
return nil, err
}
cfg, err := jcfg.ToConfig()
if err != nil {
logger.Error("error parsing configuration: ", err)
return nil, err
}
return cfg, nil
}
// Save stores a configuration as a JSON file in the given path.
func (c *Config) Save(path string) error {
jcfg, err := c.ToJSONConfig()
if err != nil {
logger.Error("error generating JSON config")
return err
}
json, err := json.MarshalIndent(jcfg, "", " ")
if err != nil {
return err
}
err = ioutil.WriteFile(path, json, 0600)
return err
}
// NewDefaultConfig returns a default configuration object with a randomly
@ -83,34 +227,24 @@ func NewDefaultConfig() (*Config, error) {
if err != nil {
return nil, err
}
privBytes, err := priv.Bytes()
if err != nil {
return nil, err
}
b64priv := base64.StdEncoding.EncodeToString(privBytes)
raftCfg := hashiraft.DefaultConfig()
raftCfg.EnableSingleNode = true
clusterAddr, _ := ma.NewMultiaddr(DefaultClusterAddr)
apiAddr, _ := ma.NewMultiaddr(DefaultAPIAddr)
ipfsProxyAddr, _ := ma.NewMultiaddr(DefaultIPFSProxyAddr)
ipfsNodeAddr, _ := ma.NewMultiaddr(DefaultIPFSNodeAddr)
return &Config{
ID: peer.IDB58Encode(pid),
PrivateKey: b64priv,
ClusterPeers: []string{},
ClusterAddr: DefaultClusterAddr,
ClusterPort: DefaultClusterPort,
ID: pid,
PrivateKey: priv,
ClusterPeers: []ma.Multiaddr{},
ClusterAddr: clusterAddr,
APIAddr: apiAddr,
IPFSProxyAddr: ipfsProxyAddr,
IPFSNodeAddr: ipfsNodeAddr,
ConsensusDataFolder: "ipfscluster-data",
APIAddr: DefaultAPIAddr,
APIPort: DefaultAPIPort,
IPFSAPIAddr: DefaultIPFSAPIAddr,
IPFSAPIPort: DefaultIPFSAPIPort,
IPFSAddr: DefaultIPFSAddr,
IPFSPort: DefaultIPFSPort,
RaftConfig: raftCfg,
}, nil
}
// Save stores a configuration as a JSON file in the given path.
func (c *Config) Save(path string) error {
json, err := json.MarshalIndent(c, "", " ")
if err != nil {
return err
}
err = ioutil.WriteFile(path, json, 0600)
return err
}

View File

@ -1,21 +1,16 @@
package ipfscluster
func testingConfig() *Config {
cfg := &Config{
jcfg := &JSONConfig{
ID: "QmUfSFm12eYCaRdypg48m8RqkXfLW7A2ZeGZb2skeHHDGA",
PrivateKey: "CAASqAkwggSkAgEAAoIBAQDpT16IRF6bb9tHsCbQ7M+nb2aI8sz8xyt8PoAWM42ki+SNoESIxKb4UhFxixKvtEdGxNE6aUUVc8kFk6wTStJ/X3IGiMetwkXiFiUxabUF/8A6SyvnSVDm+wFuavugpVrZikjLcfrf2xOVgnG3deQQvd/qbAv14jTwMFl+T+8d/cXBo8Mn/leLZCQun/EJEnkXP5MjgNI8XcWUE4NnH3E0ESSm6Pkm8MhMDZ2fmzNgqEyJ0GVinNgSml3Pyha3PBSj5LRczLip/ie4QkKx5OHvX2L3sNv/JIUHse5HSbjZ1c/4oGCYMVTYCykWiczrxBUOlcr8RwnZLOm4n2bCt5ZhAgMBAAECggEAVkePwfzmr7zR7tTpxeGNeXHtDUAdJm3RWwUSASPXgb5qKyXVsm5nAPX4lXDE3E1i/nzSkzNS5PgIoxNVU10cMxZs6JW0okFx7oYaAwgAddN6lxQtjD7EuGaixN6zZ1k/G6vT98iS6i3uNCAlRZ9HVBmjsOF8GtYolZqLvfZ5izEVFlLVq/BCs7Y5OrDrbGmn3XupfitVWYExV0BrHpobDjsx2fYdTZkmPpSSvXNcm4Iq2AXVQzoqAfGo7+qsuLCZtVlyTfVKQjMvE2ffzN1dQunxixOvev/fz4WSjGnRpC6QLn6Oqps9+VxQKqKuXXqUJC+U45DuvA94Of9MvZfAAQKBgQD7xmXueXRBMr2+0WftybAV024ap0cXFrCAu+KWC1SUddCfkiV7e5w+kRJx6RH1cg4cyyCL8yhHZ99Z5V0Mxa/b/usuHMadXPyX5szVI7dOGgIC9q8IijN7B7GMFAXc8+qC7kivehJzjQghpRRAqvRzjDls4gmbNPhbH1jUiU124QKBgQDtOaW5/fOEtOq0yWbDLkLdjImct6oKMLhENL6yeIKjMYgifzHb2adk7rWG3qcMrdgaFtDVfqv8UmMEkzk7bSkovMVj3SkLzMz84ii1SkSfyaCXgt/UOzDkqAUYB0cXMppYA7jxHa2OY8oEHdBgmyJXdLdzJxCp851AoTlRUSePgQKBgQCQgKgUHOUaXnMEx88sbOuBO14gMg3dNIqM+Ejt8QbURmI8k3arzqA4UK8Tbb9+7b0nzXWanS5q/TT1tWyYXgW28DIuvxlHTA01aaP6WItmagrphIelERzG6f1+9ib/T4czKmvROvDIHROjq8lZ7ERs5Pg4g+sbh2VbdzxWj49EQQKBgFEna36ZVfmMOs7mJ3WWGeHY9ira2hzqVd9fe+1qNKbHhx7mDJR9fTqWPxuIh/Vac5dZPtAKqaOEO8OQ6f9edLou+ggT3LrgsS/B3tNGOPvA6mNqrk/Yf/15TWTO+I8DDLIXc+lokbsogC+wU1z5NWJd13RZZOX/JUi63vTmonYBAoGBAIpglLCH2sPXfmguO6p8QcQcv4RjAU1c0GP4P5PNN3Wzo0ItydVd2LHJb6MdmL6ypeiwNklzPFwTeRlKTPmVxJ+QPg1ct/3tAURN/D40GYw9ojDhqmdSl4HW4d6gHS2lYzSFeU5jkG49y5nirOOoEgHy95wghkh6BfpwHujYJGw4",
ClusterAddr: "127.0.0.1",
ClusterPort: 10000,
ConsensusDataFolder: "./raftFolderFromTests",
APIAddr: "127.0.0.1",
APIPort: 10002,
IPFSAPIAddr: "127.0.0.1",
IPFSAPIPort: 10001,
ClusterListenMultiaddress: "/ip4/127.0.0.1/tcp/10000",
APIListenMultiaddress: "/ip4/127.0.0.1/tcp/10002",
IPFSProxyListenMultiaddress: "/ip4/127.0.0.1/tcp/10001",
ConsensusDataFolder: "./raftFolderFromTests",
}
cfg, _ := jcfg.ToConfig()
return cfg
}

View File

@ -165,7 +165,7 @@ func TestConsensusLeader(t *testing.T) {
t.Fatal("No leader:", err)
}
if l.Pretty() != pID {
if l != pID {
t.Errorf("expected %s but the leader appears as %s", pID, l)
}
}

View File

@ -63,6 +63,7 @@ var (
var (
initFlag bool
configFlag string
forceFlag bool
debugFlag bool
logLevelFlag string
versionFlag bool
@ -94,6 +95,8 @@ func init() {
"create a default configuration and exit")
flag.StringVar(&configFlag, "config", configPath,
"path to the ipfs-cluster-service configuration file")
flag.BoolVar(&forceFlag, "f", false,
"force configuration overwrite when running -init")
flag.BoolVar(&debugFlag, "debug", false,
"enable full debug logs of ipfs cluster and consensus layers")
flag.StringVar(&logLevelFlag, "loglevel", "info",
@ -108,7 +111,7 @@ func init() {
if versionFlag {
fmt.Println(ipfscluster.Version)
}
if initFlag {
if initFlag || flag.Arg(0) == "init" {
err := initConfig()
checkErr("creating configuration", err)
os.Exit(0)
@ -166,7 +169,7 @@ func setupDebug() {
func initConfig() error {
if _, err := os.Stat(configPath); err == nil {
return fmt.Errorf("%s exists. Try deleting it first", configPath)
return fmt.Errorf("%s exists. Try running with -f", configPath)
}
cfg, err := ipfscluster.NewDefaultConfig()
if err != nil {

View File

@ -9,12 +9,14 @@ import (
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
rpc "github.com/hsanjuan/go-libp2p-rpc"
cid "github.com/ipfs/go-cid"
ma "github.com/multiformats/go-multiaddr"
)
// IPFS Proxy settings
@ -64,8 +66,34 @@ type ipfsError struct {
// NewIPFSHTTPConnector creates the component and leaves it ready to be started
func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) {
ctx := context.Background()
destHost, err := cfg.IPFSNodeAddr.ValueForProtocol(ma.P_IP4)
if err != nil {
return nil, err
}
destPortStr, err := cfg.IPFSNodeAddr.ValueForProtocol(ma.P_TCP)
if err != nil {
return nil, err
}
destPort, err := strconv.Atoi(destPortStr)
if err != nil {
return nil, err
}
listenAddr, err := cfg.IPFSProxyAddr.ValueForProtocol(ma.P_IP4)
if err != nil {
return nil, err
}
listenPortStr, err := cfg.IPFSProxyAddr.ValueForProtocol(ma.P_TCP)
if err != nil {
return nil, err
}
listenPort, err := strconv.Atoi(listenPortStr)
if err != nil {
return nil, err
}
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
cfg.IPFSAPIAddr, cfg.IPFSAPIPort))
listenAddr, listenPort))
if err != nil {
return nil, err
}
@ -81,10 +109,10 @@ func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) {
ipfs := &IPFSHTTPConnector{
ctx: ctx,
destHost: cfg.IPFSAddr,
destPort: cfg.IPFSPort,
listenAddr: cfg.IPFSAPIAddr,
listenPort: cfg.IPFSAPIPort,
destHost: destHost,
destPort: destPort,
listenAddr: listenAddr,
listenPort: listenPort,
handlers: make(map[string]func(http.ResponseWriter, *http.Request)),
rpcReady: make(chan struct{}, 1),
listener: l,

View File

@ -6,12 +6,13 @@ import (
"testing"
cid "github.com/ipfs/go-cid"
ma "github.com/multiformats/go-multiaddr"
)
func testIPFSConnectorConfig(mock *ipfsMock) *Config {
cfg := testingConfig()
cfg.IPFSAddr = mock.addr
cfg.IPFSPort = mock.port
addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", mock.addr, mock.port))
cfg.IPFSNodeAddr = addr
return cfg
}
@ -98,9 +99,11 @@ func TestIPFSProxy(t *testing.T) {
defer ipfs.Shutdown()
cfg := testingConfig()
res, err := http.Get(fmt.Sprintf("http://%s:%d/api/v0/add?arg=%s",
cfg.IPFSAPIAddr,
cfg.IPFSAPIPort,
host, _ := cfg.IPFSProxyAddr.ValueForProtocol(ma.P_IP4)
port, _ := cfg.IPFSProxyAddr.ValueForProtocol(ma.P_TCP)
res, err := http.Get(fmt.Sprintf("http://%s:%s/api/v0/add?arg=%s",
host,
port,
testCid))
if err != nil {
t.Fatal("should forward requests to ipfs host: ", err)

View File

@ -1,7 +1,6 @@
package ipfscluster
import (
"encoding/base64"
"fmt"
"math/rand"
"os"
@ -12,6 +11,7 @@ import (
cid "github.com/ipfs/go-cid"
crypto "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)
var (
@ -32,9 +32,9 @@ var (
nPins = 500
// ports
clusterPort = 20000
apiPort = 20500
ipfsAPIPort = 21000
clusterPort = 20000
apiPort = 20500
ipfsProxyPort = 21000
)
func init() {
@ -63,11 +63,11 @@ func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) {
cfgs := make([]*Config, 0, nClusters)
type peerInfo struct {
pid string
priv string
pid peer.ID
priv crypto.PrivKey
}
peers := make([]peerInfo, 0, nClusters)
clusterpeers := make([]string, 0, nClusters)
clusterpeers := make([]ma.Multiaddr, 0, nClusters)
// Generate keys and ids
for i := 0; i < nClusters; i++ {
@ -75,14 +75,11 @@ func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) {
checkErr(t, err)
pid, err := peer.IDFromPublicKey(pub)
checkErr(t, err)
privBytes, err := priv.Bytes()
checkErr(t, err)
b64priv := base64.StdEncoding.EncodeToString(privBytes)
ma := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ipfs/%s",
maddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ipfs/%s",
clusterPort+i,
pid.Pretty())
peers = append(peers, peerInfo{pid.Pretty(), b64priv})
clusterpeers = append(clusterpeers, ma)
pid.Pretty()))
peers = append(peers, peerInfo{pid, priv})
clusterpeers = append(clusterpeers, maddr)
//t.Log(ma)
}
@ -90,19 +87,20 @@ func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) {
for i := 0; i < nClusters; i++ {
mock := newIpfsMock()
ipfsMocks = append(ipfsMocks, mock)
clusterAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", clusterPort+i))
apiAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", apiPort+i))
proxyAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", ipfsProxyPort+i))
nodeAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", mock.addr, mock.port))
cfgs = append(cfgs, &Config{
ID: peers[i].pid,
PrivateKey: peers[i].priv,
ClusterPeers: clusterpeers,
ClusterAddr: "127.0.0.1",
ClusterPort: clusterPort + i,
ConsensusDataFolder: "./e2eTestRaft/" + peers[i].pid,
APIAddr: "127.0.0.1",
APIPort: apiPort + i,
IPFSAPIAddr: "127.0.0.1",
IPFSAPIPort: ipfsAPIPort + i,
IPFSAddr: mock.addr,
IPFSPort: mock.port,
ClusterAddr: clusterAddr,
APIAddr: apiAddr,
IPFSProxyAddr: proxyAddr,
IPFSNodeAddr: nodeAddr,
ConsensusDataFolder: "./e2eTestRaft/" + peers[i].pid.Pretty(),
})
}

View File

@ -40,16 +40,11 @@ type MapPinTracker struct {
func NewMapPinTracker(cfg *Config) *MapPinTracker {
ctx := context.Background()
pID, err := peer.IDB58Decode(cfg.ID)
if err != nil {
panic(err)
}
mpt := &MapPinTracker{
ctx: ctx,
status: make(map[string]PinInfo),
rpcReady: make(chan struct{}, 1),
peerID: pID,
peerID: cfg.ID,
shutdownCh: make(chan struct{}),
}
logger.Info("starting MapPinTracker")

View File

@ -49,8 +49,11 @@ func makeLibp2pRaft(cfg *Config, host host.Host, state State, op *clusterLogOp)
logger.Debug("creating OpLog")
cons := libp2praft.NewOpLog(state, op)
raftCfg := hashiraft.DefaultConfig()
raftCfg.EnableSingleNode = raftSingleMode
raftCfg := cfg.RaftConfig
if raftCfg == nil {
raftCfg = hashiraft.DefaultConfig()
raftCfg.EnableSingleNode = raftSingleMode
}
if SilentRaft {
raftCfg.LogOutput = ioutil.Discard
raftCfg.Logger = nil

View File

@ -6,6 +6,7 @@ import (
"fmt"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
@ -13,6 +14,7 @@ import (
rpc "github.com/hsanjuan/go-libp2p-rpc"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
mux "github.com/gorilla/mux"
)
@ -89,9 +91,22 @@ type statusResp []statusCidResp
// started.
func NewRESTAPI(cfg *Config) (*RESTAPI, error) {
ctx := context.Background()
listenAddr, err := cfg.APIAddr.ValueForProtocol(ma.P_IP4)
if err != nil {
return nil, err
}
listenPortStr, err := cfg.APIAddr.ValueForProtocol(ma.P_TCP)
if err != nil {
return nil, err
}
listenPort, err := strconv.Atoi(listenPortStr)
if err != nil {
return nil, err
}
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
cfg.APIAddr,
cfg.APIPort))
listenAddr, listenPort))
if err != nil {
return nil, err
}
@ -107,8 +122,8 @@ func NewRESTAPI(cfg *Config) (*RESTAPI, error) {
api := &RESTAPI{
ctx: ctx,
listenAddr: cfg.APIAddr,
listenPort: cfg.APIPort,
listenAddr: listenAddr,
listenPort: listenPort,
listener: l,
server: s,
rpcReady: make(chan struct{}, 1),