From d1731ebd2827336c1eee42fef0e7d4b4897fbef8 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 23 Jan 2017 18:38:59 +0100 Subject: [PATCH 1/8] Use multiaddresses in the configuration and rename JSON entries for clarity License: MIT Signed-off-by: Hector Sanjuan --- cluster.go | 64 +++------- cluster_test.go | 2 +- config.go | 234 +++++++++++++++++++++++++++-------- config_test.go | 17 +-- consensus_test.go | 2 +- ipfs-cluster-service/main.go | 7 +- ipfs_http_connector.go | 38 +++++- ipfs_http_connector_test.go | 13 +- ipfscluster_test.go | 44 ++++--- map_pin_tracker.go | 7 +- raft.go | 7 +- rest_api.go | 23 +++- 12 files changed, 299 insertions(+), 159 deletions(-) diff --git a/cluster.go b/cluster.go index 5d0495fa..fe15d677 100644 --- a/cluster.go +++ b/cluster.go @@ -2,15 +2,11 @@ package ipfscluster import ( "context" - "encoding/base64" "errors" - "fmt" - "strings" "sync" rpc "github.com/hsanjuan/go-libp2p-rpc" cid "github.com/ipfs/go-cid" - crypto "github.com/libp2p/go-libp2p-crypto" host "github.com/libp2p/go-libp2p-host" peer "github.com/libp2p/go-libp2p-peer" peerstore "github.com/libp2p/go-libp2p-peerstore" @@ -59,11 +55,6 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P return nil, err } - tracker.SetClient(rpcClient) - ipfs.SetClient(rpcClient) - api.SetClient(rpcClient) - consensus.SetClient(rpcClient) - cluster := &Cluster{ ctx: ctx, config: cfg, @@ -85,6 +76,13 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P return nil, err } + defer func() { + tracker.SetClient(rpcClient) + ipfs.SetClient(rpcClient) + api.SetClient(rpcClient) + consensus.SetClient(rpcClient) + }() + logger.Infof("starting IPFS Cluster v%s", Version) cluster.run() @@ -345,57 +343,25 @@ func (c *Cluster) run() { // makeHost makes a libp2p-host func makeHost(ctx context.Context, cfg *Config) (host.Host, error) { ps := peerstore.NewPeerstore() - peerID, err := peer.IDB58Decode(cfg.ID) - if err != nil { - logger.Error("decoding ID: ", err) - return nil, err - } - - pkb, err := base64.StdEncoding.DecodeString(cfg.PrivateKey) - if err != nil { - logger.Error("decoding private key base64: ", err) - return nil, err - } - - privateKey, err := crypto.UnmarshalPrivateKey(pkb) - if err != nil { - logger.Error("unmarshaling private key", err) - return nil, err - } - + privateKey := cfg.PrivateKey publicKey := privateKey.GetPublic() - addr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", - cfg.ClusterAddr, cfg.ClusterPort)) - if err != nil { + if err := ps.AddPubKey(cfg.ID, publicKey); err != nil { return nil, err } - if err := ps.AddPubKey(peerID, publicKey); err != nil { + if err := ps.AddPrivKey(cfg.ID, privateKey); err != nil { return nil, err } - if err := ps.AddPrivKey(peerID, privateKey); err != nil { - return nil, err - } - - for _, cpeer := range cfg.ClusterPeers { - addr, err := multiaddr.NewMultiaddr(cpeer) - if err != nil { - logger.Errorf("parsing cluster peer multiaddress %s: %s", cpeer, err) - return nil, err - } - + for _, addr := range cfg.ClusterPeers { pid, err := addr.ValueForProtocol(multiaddr.P_IPFS) if err != nil { return nil, err } - strAddr := strings.Split(addr.String(), "/ipfs/")[0] - maddr, err := multiaddr.NewMultiaddr(strAddr) - if err != nil { - return nil, err - } + ipfs, _ := multiaddr.NewMultiaddr("/ipfs/" + pid) + maddr := addr.Decapsulate(ipfs) peerID, err := peer.IDB58Decode(pid) if err != nil { @@ -410,8 +376,8 @@ func makeHost(ctx context.Context, cfg *Config) (host.Host, error) { network, err := swarm.NewNetwork( ctx, - []multiaddr.Multiaddr{addr}, - peerID, + []multiaddr.Multiaddr{cfg.ClusterAddr}, + cfg.ID, ps, nil, ) diff --git a/cluster_test.go b/cluster_test.go index 84ca2a85..53c347bc 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -160,7 +160,7 @@ func TestClusterMembers(t *testing.T) { defer cl.Shutdown() m := cl.Members() id := testingConfig().ID - if len(m) != 1 || m[0].Pretty() != id { + if len(m) != 1 || m[0] != id { t.Error("bad Members()") } } diff --git a/config.go b/config.go index 685e5fc5..43053e64 100644 --- a/config.go +++ b/config.go @@ -3,30 +3,62 @@ package ipfscluster import ( "encoding/base64" "encoding/json" + "fmt" "io/ioutil" crypto "github.com/libp2p/go-libp2p-crypto" peer "github.com/libp2p/go-libp2p-peer" + ma "github.com/multiformats/go-multiaddr" + + hashiraft "github.com/hashicorp/raft" ) // Default parameters for the configuration const ( DefaultConfigCrypto = crypto.RSA DefaultConfigKeyLength = 2048 - DefaultAPIAddr = "127.0.0.1" - DefaultAPIPort = 9094 - DefaultIPFSAPIAddr = "127.0.0.1" - DefaultIPFSAPIPort = 9095 - DefaultIPFSAddr = "127.0.0.1" - DefaultIPFSPort = 5001 - DefaultClusterAddr = "0.0.0.0" - DefaultClusterPort = 9096 + DefaultAPIAddr = "/ip4/127.0.0.1/tcp/9094" + DefaultIPFSProxyAddr = "/ip4/127.0.0.1/tcp/9095" + DefaultIPFSNodeAddr = "/ip4/127.0.0.1/tcp/5001" + DefaultClusterAddr = "/ip4/0.0.0.0/tcp/9096" ) // Config represents an ipfs-cluster configuration which can be // saved and loaded from disk. Currently it holds configuration // values used by all components. type Config struct { + // Libp2p ID and private key for Cluster communication (including) + // the Consensus component. + ID peer.ID + PrivateKey crypto.PrivKey + + // List of multiaddresses of the peers of this cluster. + ClusterPeers []ma.Multiaddr + + // Listen parameters for the Cluster libp2p Host. Used by + // the RPC and Consensus components. + ClusterAddr ma.Multiaddr + + // Listen parameters for the the Cluster HTTP API component. + APIAddr ma.Multiaddr + + // Listen parameters for the IPFS Proxy. Used by the IPFS + // connector component. + IPFSProxyAddr ma.Multiaddr + + // Host/Port for the IPFS daemon. + IPFSNodeAddr ma.Multiaddr + + // Storage folder for snapshots, log store etc. Used by + // the Consensus component. + ConsensusDataFolder string + + // Hashicorp's Raft configuration + RaftConfig *hashiraft.Config +} + +// This is how a config actually look in JSON +type JSONConfig struct { // Libp2p ID and private key for Cluster communication (including) // the Consensus component. ID string `json:"id"` @@ -36,38 +68,150 @@ type Config struct { ClusterPeers []string `json:"cluster_peers"` // Listen parameters for the Cluster libp2p Host. Used by - // the Remote RPC and Consensus components. - ClusterAddr string `json:"cluster_addr"` - ClusterPort int `json:"cluster_port"` + // the RPC and Consensus components. + ClusterListenMultiaddress string `json:"cluster_multiaddress"` + + // Listen parameters for the the Cluster HTTP API component. + APIListenMultiaddress string `json:"api_multiaddress` + + // Listen parameters for the IPFS Proxy. Used by the IPFS + // connector component. + IPFSProxyListenMultiaddress string `json:ipfs_proxy_api_multiaddress` + + // Host/Port for the IPFS daemon. + IPFSNodeMultiaddress string `json:"ipfs_node_multiaddress"` // Storage folder for snapshots, log store etc. Used by // the Consensus component. ConsensusDataFolder string `json:"consensus_data_folder"` - // Listen parameters for the the Cluster HTTP API component. - APIAddr string `json:"api_addr"` - APIPort int `json:"api_port"` + // Raft configuration + RaftConfig *hashiraft.Config `json:"raft_config"` +} - // Listen parameters for the IPFS Proxy. Used by the IPFS - // connector component. - IPFSAPIAddr string `json:"ipfs_api_addr"` - IPFSAPIPort int `json:"ipfs_api_port"` +func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) { + // Multiaddress String() may panic + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("%s", r) + } + }() + pkeyBytes, err := cfg.PrivateKey.Bytes() + if err != nil { + return + } + pKey := base64.StdEncoding.EncodeToString(pkeyBytes) - // Host/Port for the IPFS daemon. - IPFSAddr string `json:"ipfs_addr"` - IPFSPort int `json:"ipfs_port"` + clusterPeers := make([]string, len(cfg.ClusterPeers), len(cfg.ClusterPeers)) + for i := 0; i < len(cfg.ClusterPeers); i++ { + clusterPeers[i] = cfg.ClusterPeers[i].String() + } + + j = &JSONConfig{ + ID: cfg.ID.Pretty(), + PrivateKey: pKey, + ClusterPeers: clusterPeers, + ClusterListenMultiaddress: cfg.ClusterAddr.String(), + APIListenMultiaddress: cfg.APIAddr.String(), + IPFSProxyListenMultiaddress: cfg.IPFSProxyAddr.String(), + IPFSNodeMultiaddress: cfg.IPFSNodeAddr.String(), + ConsensusDataFolder: cfg.ConsensusDataFolder, + RaftConfig: cfg.RaftConfig, + } + return +} + +func (jcfg *JSONConfig) ToConfig() (c *Config, err error) { + id, err := peer.IDB58Decode(jcfg.ID) + if err != nil { + return + } + + pkb, err := base64.StdEncoding.DecodeString(jcfg.PrivateKey) + if err != nil { + return + } + pKey, err := crypto.UnmarshalPrivateKey(pkb) + if err != nil { + return + } + + clusterPeers := make([]ma.Multiaddr, len(jcfg.ClusterPeers)) + for i := 0; i < len(jcfg.ClusterPeers); i++ { + maddr, err := ma.NewMultiaddr(jcfg.ClusterPeers[i]) + if err != nil { + return nil, err + } + clusterPeers[i] = maddr + } + + clusterAddr, err := ma.NewMultiaddr(jcfg.ClusterListenMultiaddress) + if err != nil { + return + } + + apiAddr, err := ma.NewMultiaddr(jcfg.APIListenMultiaddress) + if err != nil { + return + } + ipfsProxyAddr, err := ma.NewMultiaddr(jcfg.IPFSProxyListenMultiaddress) + if err != nil { + return + } + ipfsNodeAddr, err := ma.NewMultiaddr(jcfg.IPFSNodeMultiaddress) + if err != nil { + return + } + + c = &Config{ + ID: id, + PrivateKey: pKey, + ClusterPeers: clusterPeers, + ClusterAddr: clusterAddr, + APIAddr: apiAddr, + IPFSProxyAddr: ipfsProxyAddr, + IPFSNodeAddr: ipfsNodeAddr, + RaftConfig: jcfg.RaftConfig, + ConsensusDataFolder: jcfg.ConsensusDataFolder, + } + return } // LoadConfig reads a JSON configuration file from the given path, // parses it and returns a new Config object. func LoadConfig(path string) (*Config, error) { - config := &Config{} + jcfg := &JSONConfig{} file, err := ioutil.ReadFile(path) if err != nil { + logger.Error("error reading the configuration file: ", err) return nil, err } - json.Unmarshal(file, config) - return config, nil + err = json.Unmarshal(file, jcfg) + if err != nil { + logger.Error("error parsing JSON: ", err) + return nil, err + } + cfg, err := jcfg.ToConfig() + if err != nil { + logger.Error("error parsing configuration: ", err) + return nil, err + } + return cfg, nil +} + +// Save stores a configuration as a JSON file in the given path. +func (c *Config) Save(path string) error { + jcfg, err := c.ToJSONConfig() + if err != nil { + logger.Error("error generating JSON config") + return err + } + json, err := json.MarshalIndent(jcfg, "", " ") + if err != nil { + return err + } + err = ioutil.WriteFile(path, json, 0600) + return err } // NewDefaultConfig returns a default configuration object with a randomly @@ -83,34 +227,24 @@ func NewDefaultConfig() (*Config, error) { if err != nil { return nil, err } - privBytes, err := priv.Bytes() - if err != nil { - return nil, err - } - b64priv := base64.StdEncoding.EncodeToString(privBytes) + + raftCfg := hashiraft.DefaultConfig() + raftCfg.EnableSingleNode = true + + clusterAddr, _ := ma.NewMultiaddr(DefaultClusterAddr) + apiAddr, _ := ma.NewMultiaddr(DefaultAPIAddr) + ipfsProxyAddr, _ := ma.NewMultiaddr(DefaultIPFSProxyAddr) + ipfsNodeAddr, _ := ma.NewMultiaddr(DefaultIPFSNodeAddr) return &Config{ - ID: peer.IDB58Encode(pid), - PrivateKey: b64priv, - ClusterPeers: []string{}, - ClusterAddr: DefaultClusterAddr, - ClusterPort: DefaultClusterPort, + ID: pid, + PrivateKey: priv, + ClusterPeers: []ma.Multiaddr{}, + ClusterAddr: clusterAddr, + APIAddr: apiAddr, + IPFSProxyAddr: ipfsProxyAddr, + IPFSNodeAddr: ipfsNodeAddr, ConsensusDataFolder: "ipfscluster-data", - APIAddr: DefaultAPIAddr, - APIPort: DefaultAPIPort, - IPFSAPIAddr: DefaultIPFSAPIAddr, - IPFSAPIPort: DefaultIPFSAPIPort, - IPFSAddr: DefaultIPFSAddr, - IPFSPort: DefaultIPFSPort, + RaftConfig: raftCfg, }, nil } - -// Save stores a configuration as a JSON file in the given path. -func (c *Config) Save(path string) error { - json, err := json.MarshalIndent(c, "", " ") - if err != nil { - return err - } - err = ioutil.WriteFile(path, json, 0600) - return err -} diff --git a/config_test.go b/config_test.go index 21e34a15..00b1dac9 100644 --- a/config_test.go +++ b/config_test.go @@ -1,21 +1,16 @@ package ipfscluster func testingConfig() *Config { - cfg := &Config{ + jcfg := &JSONConfig{ ID: "QmUfSFm12eYCaRdypg48m8RqkXfLW7A2ZeGZb2skeHHDGA", PrivateKey: "CAASqAkwggSkAgEAAoIBAQDpT16IRF6bb9tHsCbQ7M+nb2aI8sz8xyt8PoAWM42ki+SNoESIxKb4UhFxixKvtEdGxNE6aUUVc8kFk6wTStJ/X3IGiMetwkXiFiUxabUF/8A6SyvnSVDm+wFuavugpVrZikjLcfrf2xOVgnG3deQQvd/qbAv14jTwMFl+T+8d/cXBo8Mn/leLZCQun/EJEnkXP5MjgNI8XcWUE4NnH3E0ESSm6Pkm8MhMDZ2fmzNgqEyJ0GVinNgSml3Pyha3PBSj5LRczLip/ie4QkKx5OHvX2L3sNv/JIUHse5HSbjZ1c/4oGCYMVTYCykWiczrxBUOlcr8RwnZLOm4n2bCt5ZhAgMBAAECggEAVkePwfzmr7zR7tTpxeGNeXHtDUAdJm3RWwUSASPXgb5qKyXVsm5nAPX4lXDE3E1i/nzSkzNS5PgIoxNVU10cMxZs6JW0okFx7oYaAwgAddN6lxQtjD7EuGaixN6zZ1k/G6vT98iS6i3uNCAlRZ9HVBmjsOF8GtYolZqLvfZ5izEVFlLVq/BCs7Y5OrDrbGmn3XupfitVWYExV0BrHpobDjsx2fYdTZkmPpSSvXNcm4Iq2AXVQzoqAfGo7+qsuLCZtVlyTfVKQjMvE2ffzN1dQunxixOvev/fz4WSjGnRpC6QLn6Oqps9+VxQKqKuXXqUJC+U45DuvA94Of9MvZfAAQKBgQD7xmXueXRBMr2+0WftybAV024ap0cXFrCAu+KWC1SUddCfkiV7e5w+kRJx6RH1cg4cyyCL8yhHZ99Z5V0Mxa/b/usuHMadXPyX5szVI7dOGgIC9q8IijN7B7GMFAXc8+qC7kivehJzjQghpRRAqvRzjDls4gmbNPhbH1jUiU124QKBgQDtOaW5/fOEtOq0yWbDLkLdjImct6oKMLhENL6yeIKjMYgifzHb2adk7rWG3qcMrdgaFtDVfqv8UmMEkzk7bSkovMVj3SkLzMz84ii1SkSfyaCXgt/UOzDkqAUYB0cXMppYA7jxHa2OY8oEHdBgmyJXdLdzJxCp851AoTlRUSePgQKBgQCQgKgUHOUaXnMEx88sbOuBO14gMg3dNIqM+Ejt8QbURmI8k3arzqA4UK8Tbb9+7b0nzXWanS5q/TT1tWyYXgW28DIuvxlHTA01aaP6WItmagrphIelERzG6f1+9ib/T4czKmvROvDIHROjq8lZ7ERs5Pg4g+sbh2VbdzxWj49EQQKBgFEna36ZVfmMOs7mJ3WWGeHY9ira2hzqVd9fe+1qNKbHhx7mDJR9fTqWPxuIh/Vac5dZPtAKqaOEO8OQ6f9edLou+ggT3LrgsS/B3tNGOPvA6mNqrk/Yf/15TWTO+I8DDLIXc+lokbsogC+wU1z5NWJd13RZZOX/JUi63vTmonYBAoGBAIpglLCH2sPXfmguO6p8QcQcv4RjAU1c0GP4P5PNN3Wzo0ItydVd2LHJb6MdmL6ypeiwNklzPFwTeRlKTPmVxJ+QPg1ct/3tAURN/D40GYw9ojDhqmdSl4HW4d6gHS2lYzSFeU5jkG49y5nirOOoEgHy95wghkh6BfpwHujYJGw4", - ClusterAddr: "127.0.0.1", - ClusterPort: 10000, - - ConsensusDataFolder: "./raftFolderFromTests", - - APIAddr: "127.0.0.1", - APIPort: 10002, - - IPFSAPIAddr: "127.0.0.1", - IPFSAPIPort: 10001, + ClusterListenMultiaddress: "/ip4/127.0.0.1/tcp/10000", + APIListenMultiaddress: "/ip4/127.0.0.1/tcp/10002", + IPFSProxyListenMultiaddress: "/ip4/127.0.0.1/tcp/10001", + ConsensusDataFolder: "./raftFolderFromTests", } + cfg, _ := jcfg.ToConfig() return cfg } diff --git a/consensus_test.go b/consensus_test.go index 72a59920..2bda4fce 100644 --- a/consensus_test.go +++ b/consensus_test.go @@ -165,7 +165,7 @@ func TestConsensusLeader(t *testing.T) { t.Fatal("No leader:", err) } - if l.Pretty() != pID { + if l != pID { t.Errorf("expected %s but the leader appears as %s", pID, l) } } diff --git a/ipfs-cluster-service/main.go b/ipfs-cluster-service/main.go index defd7d32..5398eff9 100644 --- a/ipfs-cluster-service/main.go +++ b/ipfs-cluster-service/main.go @@ -63,6 +63,7 @@ var ( var ( initFlag bool configFlag string + forceFlag bool debugFlag bool logLevelFlag string versionFlag bool @@ -94,6 +95,8 @@ func init() { "create a default configuration and exit") flag.StringVar(&configFlag, "config", configPath, "path to the ipfs-cluster-service configuration file") + flag.BoolVar(&forceFlag, "f", false, + "force configuration overwrite when running -init") flag.BoolVar(&debugFlag, "debug", false, "enable full debug logs of ipfs cluster and consensus layers") flag.StringVar(&logLevelFlag, "loglevel", "info", @@ -108,7 +111,7 @@ func init() { if versionFlag { fmt.Println(ipfscluster.Version) } - if initFlag { + if initFlag || flag.Arg(0) == "init" { err := initConfig() checkErr("creating configuration", err) os.Exit(0) @@ -166,7 +169,7 @@ func setupDebug() { func initConfig() error { if _, err := os.Stat(configPath); err == nil { - return fmt.Errorf("%s exists. Try deleting it first", configPath) + return fmt.Errorf("%s exists. Try running with -f", configPath) } cfg, err := ipfscluster.NewDefaultConfig() if err != nil { diff --git a/ipfs_http_connector.go b/ipfs_http_connector.go index 36b3006e..74d0479f 100644 --- a/ipfs_http_connector.go +++ b/ipfs_http_connector.go @@ -9,12 +9,14 @@ import ( "io/ioutil" "net" "net/http" + "strconv" "strings" "sync" "time" rpc "github.com/hsanjuan/go-libp2p-rpc" cid "github.com/ipfs/go-cid" + ma "github.com/multiformats/go-multiaddr" ) // IPFS Proxy settings @@ -64,8 +66,34 @@ type ipfsError struct { // NewIPFSHTTPConnector creates the component and leaves it ready to be started func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) { ctx := context.Background() + destHost, err := cfg.IPFSNodeAddr.ValueForProtocol(ma.P_IP4) + if err != nil { + return nil, err + } + destPortStr, err := cfg.IPFSNodeAddr.ValueForProtocol(ma.P_TCP) + if err != nil { + return nil, err + } + destPort, err := strconv.Atoi(destPortStr) + if err != nil { + return nil, err + } + + listenAddr, err := cfg.IPFSProxyAddr.ValueForProtocol(ma.P_IP4) + if err != nil { + return nil, err + } + listenPortStr, err := cfg.IPFSProxyAddr.ValueForProtocol(ma.P_TCP) + if err != nil { + return nil, err + } + listenPort, err := strconv.Atoi(listenPortStr) + if err != nil { + return nil, err + } + l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", - cfg.IPFSAPIAddr, cfg.IPFSAPIPort)) + listenAddr, listenPort)) if err != nil { return nil, err } @@ -81,10 +109,10 @@ func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) { ipfs := &IPFSHTTPConnector{ ctx: ctx, - destHost: cfg.IPFSAddr, - destPort: cfg.IPFSPort, - listenAddr: cfg.IPFSAPIAddr, - listenPort: cfg.IPFSAPIPort, + destHost: destHost, + destPort: destPort, + listenAddr: listenAddr, + listenPort: listenPort, handlers: make(map[string]func(http.ResponseWriter, *http.Request)), rpcReady: make(chan struct{}, 1), listener: l, diff --git a/ipfs_http_connector_test.go b/ipfs_http_connector_test.go index 94b27bcb..798aed85 100644 --- a/ipfs_http_connector_test.go +++ b/ipfs_http_connector_test.go @@ -6,12 +6,13 @@ import ( "testing" cid "github.com/ipfs/go-cid" + ma "github.com/multiformats/go-multiaddr" ) func testIPFSConnectorConfig(mock *ipfsMock) *Config { cfg := testingConfig() - cfg.IPFSAddr = mock.addr - cfg.IPFSPort = mock.port + addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", mock.addr, mock.port)) + cfg.IPFSNodeAddr = addr return cfg } @@ -98,9 +99,11 @@ func TestIPFSProxy(t *testing.T) { defer ipfs.Shutdown() cfg := testingConfig() - res, err := http.Get(fmt.Sprintf("http://%s:%d/api/v0/add?arg=%s", - cfg.IPFSAPIAddr, - cfg.IPFSAPIPort, + host, _ := cfg.IPFSProxyAddr.ValueForProtocol(ma.P_IP4) + port, _ := cfg.IPFSProxyAddr.ValueForProtocol(ma.P_TCP) + res, err := http.Get(fmt.Sprintf("http://%s:%s/api/v0/add?arg=%s", + host, + port, testCid)) if err != nil { t.Fatal("should forward requests to ipfs host: ", err) diff --git a/ipfscluster_test.go b/ipfscluster_test.go index 17a61695..882865d2 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -1,7 +1,6 @@ package ipfscluster import ( - "encoding/base64" "fmt" "math/rand" "os" @@ -12,6 +11,7 @@ import ( cid "github.com/ipfs/go-cid" crypto "github.com/libp2p/go-libp2p-crypto" peer "github.com/libp2p/go-libp2p-peer" + ma "github.com/multiformats/go-multiaddr" ) var ( @@ -32,9 +32,9 @@ var ( nPins = 500 // ports - clusterPort = 20000 - apiPort = 20500 - ipfsAPIPort = 21000 + clusterPort = 20000 + apiPort = 20500 + ipfsProxyPort = 21000 ) func init() { @@ -63,11 +63,11 @@ func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) { cfgs := make([]*Config, 0, nClusters) type peerInfo struct { - pid string - priv string + pid peer.ID + priv crypto.PrivKey } peers := make([]peerInfo, 0, nClusters) - clusterpeers := make([]string, 0, nClusters) + clusterpeers := make([]ma.Multiaddr, 0, nClusters) // Generate keys and ids for i := 0; i < nClusters; i++ { @@ -75,14 +75,11 @@ func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) { checkErr(t, err) pid, err := peer.IDFromPublicKey(pub) checkErr(t, err) - privBytes, err := priv.Bytes() - checkErr(t, err) - b64priv := base64.StdEncoding.EncodeToString(privBytes) - ma := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ipfs/%s", + maddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ipfs/%s", clusterPort+i, - pid.Pretty()) - peers = append(peers, peerInfo{pid.Pretty(), b64priv}) - clusterpeers = append(clusterpeers, ma) + pid.Pretty())) + peers = append(peers, peerInfo{pid, priv}) + clusterpeers = append(clusterpeers, maddr) //t.Log(ma) } @@ -90,19 +87,20 @@ func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) { for i := 0; i < nClusters; i++ { mock := newIpfsMock() ipfsMocks = append(ipfsMocks, mock) + clusterAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", clusterPort+i)) + apiAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", apiPort+i)) + proxyAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", ipfsProxyPort+i)) + nodeAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", mock.addr, mock.port)) + cfgs = append(cfgs, &Config{ ID: peers[i].pid, PrivateKey: peers[i].priv, ClusterPeers: clusterpeers, - ClusterAddr: "127.0.0.1", - ClusterPort: clusterPort + i, - ConsensusDataFolder: "./e2eTestRaft/" + peers[i].pid, - APIAddr: "127.0.0.1", - APIPort: apiPort + i, - IPFSAPIAddr: "127.0.0.1", - IPFSAPIPort: ipfsAPIPort + i, - IPFSAddr: mock.addr, - IPFSPort: mock.port, + ClusterAddr: clusterAddr, + APIAddr: apiAddr, + IPFSProxyAddr: proxyAddr, + IPFSNodeAddr: nodeAddr, + ConsensusDataFolder: "./e2eTestRaft/" + peers[i].pid.Pretty(), }) } diff --git a/map_pin_tracker.go b/map_pin_tracker.go index 9e593949..f39b538e 100644 --- a/map_pin_tracker.go +++ b/map_pin_tracker.go @@ -40,16 +40,11 @@ type MapPinTracker struct { func NewMapPinTracker(cfg *Config) *MapPinTracker { ctx := context.Background() - pID, err := peer.IDB58Decode(cfg.ID) - if err != nil { - panic(err) - } - mpt := &MapPinTracker{ ctx: ctx, status: make(map[string]PinInfo), rpcReady: make(chan struct{}, 1), - peerID: pID, + peerID: cfg.ID, shutdownCh: make(chan struct{}), } logger.Info("starting MapPinTracker") diff --git a/raft.go b/raft.go index 2fa41e0f..dfcf96a5 100644 --- a/raft.go +++ b/raft.go @@ -49,8 +49,11 @@ func makeLibp2pRaft(cfg *Config, host host.Host, state State, op *clusterLogOp) logger.Debug("creating OpLog") cons := libp2praft.NewOpLog(state, op) - raftCfg := hashiraft.DefaultConfig() - raftCfg.EnableSingleNode = raftSingleMode + raftCfg := cfg.RaftConfig + if raftCfg == nil { + raftCfg = hashiraft.DefaultConfig() + raftCfg.EnableSingleNode = raftSingleMode + } if SilentRaft { raftCfg.LogOutput = ioutil.Discard raftCfg.Logger = nil diff --git a/rest_api.go b/rest_api.go index 2c4b664d..2ecf3f0f 100644 --- a/rest_api.go +++ b/rest_api.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "net/http" + "strconv" "strings" "sync" "time" @@ -13,6 +14,7 @@ import ( rpc "github.com/hsanjuan/go-libp2p-rpc" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-peer" + ma "github.com/multiformats/go-multiaddr" mux "github.com/gorilla/mux" ) @@ -89,9 +91,22 @@ type statusResp []statusCidResp // started. func NewRESTAPI(cfg *Config) (*RESTAPI, error) { ctx := context.Background() + + listenAddr, err := cfg.APIAddr.ValueForProtocol(ma.P_IP4) + if err != nil { + return nil, err + } + listenPortStr, err := cfg.APIAddr.ValueForProtocol(ma.P_TCP) + if err != nil { + return nil, err + } + listenPort, err := strconv.Atoi(listenPortStr) + if err != nil { + return nil, err + } + l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", - cfg.APIAddr, - cfg.APIPort)) + listenAddr, listenPort)) if err != nil { return nil, err } @@ -107,8 +122,8 @@ func NewRESTAPI(cfg *Config) (*RESTAPI, error) { api := &RESTAPI{ ctx: ctx, - listenAddr: cfg.APIAddr, - listenPort: cfg.APIPort, + listenAddr: listenAddr, + listenPort: listenPort, listener: l, server: s, rpcReady: make(chan struct{}, 1), From 74c494eb1edda7db569fa5da8aaef3b3d7c49acc Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 23 Jan 2017 19:06:00 +0100 Subject: [PATCH 2/8] Fix configuration generation. Add a custom RaftConfig section which is limited to whatever values we want to leave to the user. Make sure the consensus data is, by default, next to the service.json file. License: MIT Signed-off-by: Hector Sanjuan --- config.go | 23 ++++++++++++++++++----- ipfs-cluster-service/main.go | 16 ++++++++++------ 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/config.go b/config.go index 43053e64..4dc9acab 100644 --- a/config.go +++ b/config.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "time" crypto "github.com/libp2p/go-libp2p-crypto" peer "github.com/libp2p/go-libp2p-peer" @@ -72,11 +73,11 @@ type JSONConfig struct { ClusterListenMultiaddress string `json:"cluster_multiaddress"` // Listen parameters for the the Cluster HTTP API component. - APIListenMultiaddress string `json:"api_multiaddress` + APIListenMultiaddress string `json:"api_listen_multiaddress"` // Listen parameters for the IPFS Proxy. Used by the IPFS // connector component. - IPFSProxyListenMultiaddress string `json:ipfs_proxy_api_multiaddress` + IPFSProxyListenMultiaddress string `json:"ipfs_proxy_listen_multiaddress"` // Host/Port for the IPFS daemon. IPFSNodeMultiaddress string `json:"ipfs_node_multiaddress"` @@ -86,7 +87,12 @@ type JSONConfig struct { ConsensusDataFolder string `json:"consensus_data_folder"` // Raft configuration - RaftConfig *hashiraft.Config `json:"raft_config"` + RaftConfig *RaftConfig `json:"raft_config"` +} + +type RaftConfig struct { + SnapshotIntervalSeconds int `json:snapshot_interval_seconds` + EnableSingleNode bool `json:enable_single_node` } func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) { @@ -116,7 +122,10 @@ func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) { IPFSProxyListenMultiaddress: cfg.IPFSProxyAddr.String(), IPFSNodeMultiaddress: cfg.IPFSNodeAddr.String(), ConsensusDataFolder: cfg.ConsensusDataFolder, - RaftConfig: cfg.RaftConfig, + RaftConfig: &RaftConfig{ + SnapshotIntervalSeconds: int(cfg.RaftConfig.SnapshotInterval / time.Second), + EnableSingleNode: cfg.RaftConfig.EnableSingleNode, + }, } return } @@ -163,6 +172,10 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) { return } + raftCfg := hashiraft.DefaultConfig() + raftCfg.SnapshotInterval = time.Duration(jcfg.RaftConfig.SnapshotIntervalSeconds) * time.Second + raftCfg.EnableSingleNode = jcfg.RaftConfig.EnableSingleNode + c = &Config{ ID: id, PrivateKey: pKey, @@ -171,7 +184,7 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) { APIAddr: apiAddr, IPFSProxyAddr: ipfsProxyAddr, IPFSNodeAddr: ipfsNodeAddr, - RaftConfig: jcfg.RaftConfig, + RaftConfig: raftCfg, ConsensusDataFolder: jcfg.ConsensusDataFolder, } return diff --git a/ipfs-cluster-service/main.go b/ipfs-cluster-service/main.go index 5398eff9..895b5119 100644 --- a/ipfs-cluster-service/main.go +++ b/ipfs-cluster-service/main.go @@ -81,8 +81,6 @@ func init() { usr.HomeDir, ".ipfs-cluster") } - configPath = filepath.Join(DefaultPath, DefaultConfigFile) - dataPath = filepath.Join(DefaultPath, DefaultDataFolder) flag.Usage = func() { out("Usage: %s [options]\n", programName) @@ -93,8 +91,8 @@ func init() { } flag.BoolVar(&initFlag, "init", false, "create a default configuration and exit") - flag.StringVar(&configFlag, "config", configPath, - "path to the ipfs-cluster-service configuration file") + flag.StringVar(&configFlag, "config", DefaultPath, + "path to the ipfs-cluster-service configuration and data folder") flag.BoolVar(&forceFlag, "f", false, "force configuration overwrite when running -init") flag.BoolVar(&debugFlag, "debug", false, @@ -104,7 +102,13 @@ func init() { flag.BoolVar(&versionFlag, "version", false, fmt.Sprintf("display %s version", programName)) flag.Parse() - configPath = configFlag + + absPath, err := filepath.Abs(configFlag) + if err != nil { + panic("error expading " + configFlag) + } + configPath = filepath.Join(absPath, DefaultConfigFile) + dataPath = filepath.Join(absPath, DefaultDataFolder) setupLogging() setupDebug() @@ -168,7 +172,7 @@ func setupDebug() { } func initConfig() error { - if _, err := os.Stat(configPath); err == nil { + if _, err := os.Stat(configPath); err == nil && !forceFlag { return fmt.Errorf("%s exists. Try running with -f", configPath) } cfg, err := ipfscluster.NewDefaultConfig() From 84a7fa663ded100ec33b431a5afce577b92e15c8 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 23 Jan 2017 20:29:05 +0100 Subject: [PATCH 3/8] Workaround tests failing randomly Tracked down reason to: https://github.com/libp2p/go-libp2p-swarm/issues/15 License: MIT Signed-off-by: Hector Sanjuan --- cluster.go | 30 ++++++++++++++++++++++++++++-- ipfscluster.go | 3 +++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/cluster.go b/cluster.go index fe15d677..62e955c1 100644 --- a/cluster.go +++ b/cluster.go @@ -3,7 +3,9 @@ package ipfscluster import ( "context" "errors" + "math/rand" "sync" + "time" rpc "github.com/hsanjuan/go-libp2p-rpc" cid "github.com/ipfs/go-cid" @@ -76,6 +78,9 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P return nil, err } + // Workaround for https://github.com/libp2p/go-libp2p-swarm/issues/15 + cluster.openConns() + defer func() { tracker.SetClient(rpcClient) ipfs.SetClient(rpcClient) @@ -434,7 +439,7 @@ func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (GlobalPinInfo, er var errorMsgs string for i, r := range replies { if e := errs[i]; e != nil { - logger.Error(e) + logger.Errorf("%s: error in broadcast response from %s: %s ", c.host.ID(), members[i], e) errorMsgs += e.Error() + "\n" } pin.Status[r.Peer] = r @@ -479,7 +484,7 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) { var errorMsgs string for i, r := range replies { if e := errs[i]; e != nil { - logger.Error("error in broadcast response: ", e) + logger.Errorf("%s: error in broadcast response from %s: %s ", c.host.ID(), members[i], e) errorMsgs += e.Error() + "\n" } mergePins(r) @@ -494,3 +499,24 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) { } return infos, errors.New(errorMsgs) } + +// openConns is a workaround for +// https://github.com/libp2p/go-libp2p-swarm/issues/15 +// It runs when consensus is initialized so we can assume +// that the cluster is more or less up. +// It should open connections for peers where they haven't +// yet been opened. By randomly sleeping we reduce the +// chance that members will open 2 connections simultaneously. +func (c *Cluster) openConns() { + rand.Seed(time.Now().UnixNano()) + time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) + peers := c.host.Peerstore().Peers() + for _, p := range peers { + peerInfo := c.host.Peerstore().PeerInfo(p) + if p == c.host.ID() { + continue // do not connect to ourselves + } + // ignore any errors here + c.host.Connect(c.ctx, peerInfo) + } +} diff --git a/ipfscluster.go b/ipfscluster.go index d9c0469f..5c422fce 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -41,6 +41,9 @@ func SetLogLevel(l string) { DEBUG */ logging.SetLogLevel("cluster", l) + //logging.SetLogLevel("libp2p-rpc", l) + //logging.SetLogLevel("swarm2", l) + //logging.SetLogLevel("libp2p-raft", l) } // IPFSStatus values From 79545568487dcbade4e489f8b3466a5f5bdaa169 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 23 Jan 2017 20:39:09 +0100 Subject: [PATCH 4/8] Improve error messages with fishy configurations License: MIT Signed-off-by: Hector Sanjuan --- config.go | 9 +++++++++ ipfs-cluster-service/main.go | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/config.go b/config.go index 4dc9acab..b5e6b5a5 100644 --- a/config.go +++ b/config.go @@ -133,15 +133,18 @@ func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) { func (jcfg *JSONConfig) ToConfig() (c *Config, err error) { id, err := peer.IDB58Decode(jcfg.ID) if err != nil { + err = fmt.Errorf("error decoding cluster ID: %s", err) return } pkb, err := base64.StdEncoding.DecodeString(jcfg.PrivateKey) if err != nil { + err = fmt.Errorf("error decoding private_key: %s", err) return } pKey, err := crypto.UnmarshalPrivateKey(pkb) if err != nil { + err = fmt.Errorf("error parsing private_key ID: %s", err) return } @@ -149,6 +152,8 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) { for i := 0; i < len(jcfg.ClusterPeers); i++ { maddr, err := ma.NewMultiaddr(jcfg.ClusterPeers[i]) if err != nil { + err = fmt.Errorf("error parsing multiaddress for peer %s: %s", + jcfg.ClusterPeers[i], err) return nil, err } clusterPeers[i] = maddr @@ -156,19 +161,23 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) { clusterAddr, err := ma.NewMultiaddr(jcfg.ClusterListenMultiaddress) if err != nil { + err = fmt.Errorf("error parsing cluster_listen_multiaddress: %s", err) return } apiAddr, err := ma.NewMultiaddr(jcfg.APIListenMultiaddress) if err != nil { + err = fmt.Errorf("error parsing api_listen_multiaddress: %s", err) return } ipfsProxyAddr, err := ma.NewMultiaddr(jcfg.IPFSProxyListenMultiaddress) if err != nil { + err = fmt.Errorf("error parsing ipfs_proxy_listen_multiaddress: %s", err) return } ipfsNodeAddr, err := ma.NewMultiaddr(jcfg.IPFSNodeMultiaddress) if err != nil { + err = fmt.Errorf("error parsing ipfs_node_multiaddress: %s", err) return } diff --git a/ipfs-cluster-service/main.go b/ipfs-cluster-service/main.go index 895b5119..0bbd3442 100644 --- a/ipfs-cluster-service/main.go +++ b/ipfs-cluster-service/main.go @@ -132,7 +132,7 @@ func main() { signal.Notify(signalChan, os.Interrupt) cfg, err := loadConfig() - checkErr("loading configuration", err) + checkErr("error loading configuration", err) api, err := ipfscluster.NewRESTAPI(cfg) checkErr("creating REST API component", err) proxy, err := ipfscluster.NewIPFSHTTPConnector(cfg) From d987073201ff858282a8ac2f57b96624637f1c1b Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 23 Jan 2017 20:56:35 +0100 Subject: [PATCH 5/8] Improve docs for the new configuration formats License: MIT Signed-off-by: Hector Sanjuan --- README.md | 22 +++++++++++----------- config.go | 43 ++++++++++++++++++++++++++++--------------- 2 files changed, 39 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 8a9e7e73..fc044ac3 100644 --- a/README.md +++ b/README.md @@ -88,20 +88,20 @@ You can add the multiaddresses for the other members of the cluster in the `clus "/ip4/192.168.1.3/tcp/9096/ipfs/QmdFBMf9HMDH3eCWrc1U11YCPenC3Uvy9mZQ2BedTyKTDf", "/ip4/192.168.1.4/tcp/9096/ipfs/QmYY1ggjoew5eFrvkenTR3F4uWqtkBkmgfJk8g9Qqcwy51", "/ip4/192.168.1.5/tcp/9096/ipfs/QmSGCzHkz8gC9fNndMtaCZdf9RFtwtbTEEsGo4zkVfcykD" - ], - "cluster_addr": "0.0.0.0", - "cluster_port": 9096, - "consensus_data_folder": "/home/user/.ipfs-cluster/data", - "api_addr": "0.0.0.0", - "api_port": 9094, - "ipfs_api_addr": "0.0.0.0", - "ipfs_api_port": 9095, - "ipfs_addr": "127.0.0.1", - "ipfs_port": 5001 + ], + "cluster_multiaddress": "/ip4/0.0.0.0/tcp/9096", + "api_listen_multiaddress": "/ip4/127.0.0.1/tcp/9094", + "ipfs_proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095", + "ipfs_node_multiaddress": "/ip4/127.0.0.1/tcp/5001", + "consensus_data_folder": "/home/hector/go/src/github.com/ipfs/ipfs-cluster/ipfs-cluster-service/data", + "raft_config": { + "SnapshotIntervalSeconds": 120, + "EnableSingleNode": true + } } ``` -The configuration file should probably be identical among all cluster members, except for the `id` and `private_key` fields. To facilitate configuration, `cluster_peers` includes its own address, but it does not have to. +The configuration file should probably be identical among all cluster members, except for the `id` and `private_key` fields. To facilitate configuration, `cluster_peers` may include its own address, but it does not have to. For additional information about the configuration format, see the [JSONConfig documentation](https://godoc.org/github.com/ipfs/ipfs-cluster#JSONConfig). Once every cluster member has the configuration in place, you can run `ipfs-cluster-service` to start the cluster. diff --git a/config.go b/config.go index b5e6b5a5..0675bde3 100644 --- a/config.go +++ b/config.go @@ -24,9 +24,9 @@ const ( DefaultClusterAddr = "/ip4/0.0.0.0/tcp/9096" ) -// Config represents an ipfs-cluster configuration which can be -// saved and loaded from disk. Currently it holds configuration -// values used by all components. +// Config represents an ipfs-cluster configuration. It is used by +// Cluster components. An initialized version of it can be obtained with +// NewDefaultConfig(). type Config struct { // Libp2p ID and private key for Cluster communication (including) // the Consensus component. @@ -58,28 +58,33 @@ type Config struct { RaftConfig *hashiraft.Config } -// This is how a config actually look in JSON +// JSONConfig represents a Cluster configuration as it will look when it is +// saved using JSON. Most configuration keys are converted into simple types +// like strings, and key names aim to be self-explanatory for the user. type JSONConfig struct { // Libp2p ID and private key for Cluster communication (including) // the Consensus component. ID string `json:"id"` PrivateKey string `json:"private_key"` - // List of multiaddresses of the peers of this cluster. + // List of multiaddresses of the peers of this cluster. This list may + // include the multiaddress of this node. ClusterPeers []string `json:"cluster_peers"` - // Listen parameters for the Cluster libp2p Host. Used by - // the RPC and Consensus components. + // Listen address for the Cluster libp2p host. This is used for + // interal RPC and Consensus communications between cluster members. ClusterListenMultiaddress string `json:"cluster_multiaddress"` - // Listen parameters for the the Cluster HTTP API component. + // Listen address for the the Cluster HTTP API component. + // Tools like ipfs-cluster-ctl will connect to his endpoint to + // manage cluster. APIListenMultiaddress string `json:"api_listen_multiaddress"` - // Listen parameters for the IPFS Proxy. Used by the IPFS - // connector component. + // Listen address for the IPFS Proxy, which forwards requests to + // an IPFS daemon. IPFSProxyListenMultiaddress string `json:"ipfs_proxy_listen_multiaddress"` - // Host/Port for the IPFS daemon. + // API address for the IPFS daemon. IPFSNodeMultiaddress string `json:"ipfs_node_multiaddress"` // Storage folder for snapshots, log store etc. Used by @@ -90,11 +95,17 @@ type JSONConfig struct { RaftConfig *RaftConfig `json:"raft_config"` } +// RaftConfig is a configuration section which affects the behaviour of +// the Raft component. See https://godoc.org/github.com/hashicorp/raft#Config +// for more information. Only the options below are customizable, the rest will +// take the default values from raft.DefaultConfig(). type RaftConfig struct { - SnapshotIntervalSeconds int `json:snapshot_interval_seconds` - EnableSingleNode bool `json:enable_single_node` + SnapshotIntervalSeconds int `json:"snapshot_interval_seconds"` + EnableSingleNode bool `json:"enable_single_node"` } +// ToJSONConfig converts a Config object to its JSON representation which +// is focused on user presentation and easy understanding. func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) { // Multiaddress String() may panic defer func() { @@ -130,6 +141,8 @@ func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) { return } +// ToConfig converts a JSONConfig to its internal Config representation, +// where options are parsed into their native types. func (jcfg *JSONConfig) ToConfig() (c *Config, err error) { id, err := peer.IDB58Decode(jcfg.ID) if err != nil { @@ -222,8 +235,8 @@ func LoadConfig(path string) (*Config, error) { } // Save stores a configuration as a JSON file in the given path. -func (c *Config) Save(path string) error { - jcfg, err := c.ToJSONConfig() +func (cfg *Config) Save(path string) error { + jcfg, err := cfg.ToJSONConfig() if err != nil { logger.Error("error generating JSON config") return err From e932b2f3f6ba32373cdc3a422e93f63eb4f68845 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 23 Jan 2017 21:07:16 +0100 Subject: [PATCH 6/8] Fix tests with raftCfg config section and do not panic if its not there License: MIT Signed-off-by: Hector Sanjuan --- config.go | 6 ++++-- config_test.go | 4 ++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/config.go b/config.go index 0675bde3..cce4f6d3 100644 --- a/config.go +++ b/config.go @@ -195,8 +195,10 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) { } raftCfg := hashiraft.DefaultConfig() - raftCfg.SnapshotInterval = time.Duration(jcfg.RaftConfig.SnapshotIntervalSeconds) * time.Second - raftCfg.EnableSingleNode = jcfg.RaftConfig.EnableSingleNode + if jcfg.RaftConfig != nil { + raftCfg.SnapshotInterval = time.Duration(jcfg.RaftConfig.SnapshotIntervalSeconds) * time.Second + raftCfg.EnableSingleNode = jcfg.RaftConfig.EnableSingleNode + } c = &Config{ ID: id, diff --git a/config_test.go b/config_test.go index 00b1dac9..de7a1201 100644 --- a/config_test.go +++ b/config_test.go @@ -9,6 +9,10 @@ func testingConfig() *Config { APIListenMultiaddress: "/ip4/127.0.0.1/tcp/10002", IPFSProxyListenMultiaddress: "/ip4/127.0.0.1/tcp/10001", ConsensusDataFolder: "./raftFolderFromTests", + RaftConfig: &RaftConfig{ + EnableSingleNode: true, + SnapshotIntervalSeconds: 120, + }, } cfg, _ := jcfg.ToConfig() From afa8a5c33faad76f6a178f76ee71858c319c85e5 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 23 Jan 2017 23:58:04 +0100 Subject: [PATCH 7/8] Improve startup messages and information License: MIT Signed-off-by: Hector Sanjuan --- README.md | 2 +- cluster.go | 6 +++--- consensus.go | 10 +++++----- ipfs_http_connector.go | 15 +++++++++++---- map_pin_tracker.go | 4 ++-- rest_api.go | 4 +++- 6 files changed, 25 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index fc044ac3..dd8a84e7 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ You can add the multiaddresses for the other members of the cluster in the `clus "api_listen_multiaddress": "/ip4/127.0.0.1/tcp/9094", "ipfs_proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095", "ipfs_node_multiaddress": "/ip4/127.0.0.1/tcp/5001", - "consensus_data_folder": "/home/hector/go/src/github.com/ipfs/ipfs-cluster/ipfs-cluster-service/data", + "consensus_data_folder": "/home/user/.ipfs-cluster/data", "raft_config": { "SnapshotIntervalSeconds": 120, "EnableSingleNode": true diff --git a/cluster.go b/cluster.go index 62e955c1..3ac491c3 100644 --- a/cluster.go +++ b/cluster.go @@ -51,6 +51,8 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P rpcServer := rpc.NewServer(host, RPCProtocol) rpcClient := rpc.NewClientWithServer(host, RPCProtocol, rpcServer) + logger.Infof("IPFS Cluster v%s - %s/ipfs/%s", Version, cfg.ClusterAddr, host.ID().Pretty()) + consensus, err := NewConsensus(cfg, host, state) if err != nil { logger.Errorf("error creating consensus: %s", err) @@ -88,8 +90,6 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P consensus.SetClient(rpcClient) }() - logger.Infof("starting IPFS Cluster v%s", Version) - cluster.run() return cluster, nil } @@ -136,7 +136,7 @@ func (c *Cluster) StateSync() ([]PinInfo, error) { return nil, err } - logger.Info("syncing state to tracker") + logger.Debug("syncing state to tracker") clusterPins := cState.ListPins() var changed []*cid.Cid diff --git a/consensus.go b/consensus.go index 5e90f12b..f0220cce 100644 --- a/consensus.go +++ b/consensus.go @@ -136,8 +136,7 @@ func NewConsensus(cfg *Config, host host.Host, state State) (*Consensus, error) rpcReady: make(chan struct{}, 1), } - logger.Info("starting Consensus component") - logger.Infof("waiting %d seconds for leader", LeaderTimeout/time.Second) + logger.Infof("starting Consensus: waiting %d seconds for leader...", LeaderTimeout/time.Second) con, actor, wrapper, err := makeLibp2pRaft(cc.cfg, cc.host, state, cc.baseOp) if err != nil { @@ -162,7 +161,7 @@ func NewConsensus(cfg *Config, host host.Host, state State) (*Consensus, error) return nil, errors.New("no leader was found after timeout") } - logger.Debugf("raft leader is %s", leader) + logger.Infof("Consensus leader found (%s). Syncing state...", leader.Pretty()) cc.run(state) return cc, nil } @@ -178,7 +177,7 @@ func (cc *Consensus) run(state State) { upToDate := make(chan struct{}) go func() { - logger.Info("consensus state is catching up") + logger.Debug("consensus state is catching up") time.Sleep(time.Second) for { lai := cc.p2pRaft.raft.AppliedIndex() @@ -194,7 +193,7 @@ func (cc *Consensus) run(state State) { }() <-upToDate - logger.Info("consensus state is up to date") + logger.Info("Consensus state is up to date") // While rpc is not ready we cannot perform a sync <-cc.rpcReady @@ -208,6 +207,7 @@ func (cc *Consensus) run(state State) { &pInfo, nil) + logger.Infof("IPFS Cluster is running") <-cc.shutdownCh }() } diff --git a/ipfs_http_connector.go b/ipfs_http_connector.go index 74d0479f..1b126ac0 100644 --- a/ipfs_http_connector.go +++ b/ipfs_http_connector.go @@ -41,6 +41,8 @@ var ( // against the configured IPFS daemom (such as a pin request). type IPFSHTTPConnector struct { ctx context.Context + nodeAddr ma.Multiaddr + proxyAddr ma.Multiaddr destHost string destPort int listenAddr string @@ -108,7 +110,10 @@ func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) { s.SetKeepAlivesEnabled(true) // A reminder that this can be changed ipfs := &IPFSHTTPConnector{ - ctx: ctx, + ctx: ctx, + nodeAddr: cfg.IPFSProxyAddr, + proxyAddr: cfg.IPFSNodeAddr, + destHost: destHost, destPort: destPort, listenAddr: listenAddr, @@ -121,7 +126,6 @@ func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) { smux.HandleFunc("/", ipfs.handle) - logger.Infof("starting IPFS Proxy on %s:%d", ipfs.listenAddr, ipfs.listenPort) ipfs.run() return ipfs, nil } @@ -179,6 +183,9 @@ func (ipfs *IPFSHTTPConnector) run() { <-ipfs.rpcReady + logger.Infof("IPFS Proxy: %s -> %s", + ipfs.proxyAddr, + ipfs.nodeAddr) err := ipfs.server.Serve(ipfs.listener) if err != nil && !strings.Contains(err.Error(), "closed network connection") { logger.Error(err) @@ -230,7 +237,7 @@ func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error { } return err } - logger.Info("IPFS object is already pinned: ", hash) + logger.Debug("IPFS object is already pinned: ", hash) return nil } @@ -250,7 +257,7 @@ func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error { return err } - logger.Info("IPFS object is already unpinned: ", hash) + logger.Debug("IPFS object is already unpinned: ", hash) return nil } diff --git a/map_pin_tracker.go b/map_pin_tracker.go index f39b538e..73177151 100644 --- a/map_pin_tracker.go +++ b/map_pin_tracker.go @@ -47,7 +47,6 @@ func NewMapPinTracker(cfg *Config) *MapPinTracker { peerID: cfg.ID, shutdownCh: make(chan struct{}), } - logger.Info("starting MapPinTracker") mpt.run() return mpt } @@ -60,7 +59,8 @@ func (mpt *MapPinTracker) run() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() mpt.ctx = ctx - //<-mpt.rpcReady + <-mpt.rpcReady + logger.Info("PinTracker ready") <-mpt.shutdownCh }() } diff --git a/rest_api.go b/rest_api.go index 2ecf3f0f..b21aa1d8 100644 --- a/rest_api.go +++ b/rest_api.go @@ -34,6 +34,7 @@ var ( // a RESTful HTTP API for Cluster. type RESTAPI struct { ctx context.Context + apiAddr ma.Multiaddr listenAddr string listenPort int rpcClient *rpc.Client @@ -122,6 +123,7 @@ func NewRESTAPI(cfg *Config) (*RESTAPI, error) { api := &RESTAPI{ ctx: ctx, + apiAddr: cfg.APIAddr, listenAddr: listenAddr, listenPort: listenPort, listener: l, @@ -138,7 +140,6 @@ func NewRESTAPI(cfg *Config) (*RESTAPI, error) { } api.router = router - logger.Infof("starting Cluster API on %s:%d", api.listenAddr, api.listenPort) api.run() return api, nil } @@ -212,6 +213,7 @@ func (api *RESTAPI) run() { <-api.rpcReady + logger.Infof("REST API: %s", api.apiAddr) err := api.server.Serve(api.listener) if err != nil && !strings.Contains(err.Error(), "closed network connection") { logger.Error(err) From 24c8253c9ed639f8c6a304f267ee65852a80606d Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Tue, 24 Jan 2017 00:46:21 +0100 Subject: [PATCH 8/8] Fix config example License: MIT Signed-off-by: Hector Sanjuan --- README.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index dd8a84e7..33fbcbd6 100644 --- a/README.md +++ b/README.md @@ -95,10 +95,9 @@ You can add the multiaddresses for the other members of the cluster in the `clus "ipfs_node_multiaddress": "/ip4/127.0.0.1/tcp/5001", "consensus_data_folder": "/home/user/.ipfs-cluster/data", "raft_config": { - "SnapshotIntervalSeconds": 120, - "EnableSingleNode": true + "snapshot_interval_seconds": 120, + "enable_single_node": true } -} ``` The configuration file should probably be identical among all cluster members, except for the `id` and `private_key` fields. To facilitate configuration, `cluster_peers` may include its own address, but it does not have to. For additional information about the configuration format, see the [JSONConfig documentation](https://godoc.org/github.com/ipfs/ipfs-cluster#JSONConfig).