Renames of configuration keys and more docs
License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
parent
ba31e35c1a
commit
4ca39ae9b3
|
@ -435,7 +435,7 @@ func makeHost(ctx context.Context, cfg *Config) (host.Host, error) {
|
||||||
publicKey := privateKey.GetPublic()
|
publicKey := privateKey.GetPublic()
|
||||||
|
|
||||||
addr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d",
|
addr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d",
|
||||||
cfg.ConsensusListenAddr, cfg.ConsensusListenPort))
|
cfg.ClusterAddr, cfg.ClusterPort))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
41
config.go
41
config.go
|
@ -6,18 +6,35 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
IPFSHost string `json:"ipfs_host"`
|
// Libp2p ID and private key for Cluster communication (including)
|
||||||
IPFSPort int `json:"ipfs_port"`
|
// the Consensus component.
|
||||||
APIListenAddr string `json:"cluster_api_listen_addr"`
|
ID string `json:"id"`
|
||||||
APIListenPort int `json:"cluster_api_listen_port"`
|
PrivateKey string `json:"private_key"`
|
||||||
IPFSAPIListenAddr string `json:"ipfs_api_listen_addr"`
|
|
||||||
IPFSAPIListenPort int `json:"ipfs_api_listen_port"`
|
// List of multiaddresses of the peers of this cluster.
|
||||||
ConsensusListenAddr string `json:"consensus_listen_addr"`
|
ClusterPeers []string `json:"cluster_peers"`
|
||||||
ConsensusListenPort int `json:"consensus_listen_port"`
|
|
||||||
ClusterPeers []string `json:"cluster_peers"`
|
// Listen parameters for the Cluster libp2p Host. Used by
|
||||||
ID string `json:"id"`
|
// the Remote RPC and Consensus components.
|
||||||
PrivateKey string `json:"private_key"`
|
ClusterAddr string `json:"cluster_addr"`
|
||||||
RaftFolder string `json:"raft_folder"`
|
ClusterPort int `json:"cluster_port"`
|
||||||
|
|
||||||
|
// Storage folder for snapshots, log store etc. Used by
|
||||||
|
// the Consensus component.
|
||||||
|
ConsensusDataFolder string `json:"consensus_data_folder"`
|
||||||
|
|
||||||
|
// Listen parameters for the the Cluster HTTP API component.
|
||||||
|
APIAddr string `json:"api_addr"`
|
||||||
|
APIPort int `json:"api_port"`
|
||||||
|
|
||||||
|
// Listen parameters for the IPFS Proxy. Used by the IPFS
|
||||||
|
// connector component.
|
||||||
|
IPFSAPIAddr string `json:"ipfs_api_addr"`
|
||||||
|
IPFSAPIPort int `json:"ipfs_api_port"`
|
||||||
|
|
||||||
|
// Host/Port for the IPFS daemon.
|
||||||
|
IPFSAddr string `json:"ipfs_addr"`
|
||||||
|
IPFSPort int `json:"ipfs_port"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadConfig(path string) (*Config, error) {
|
func LoadConfig(path string) (*Config, error) {
|
||||||
|
|
|
@ -40,7 +40,7 @@ func main() {
|
||||||
|
|
||||||
state := ipfscluster.NewMapState()
|
state := ipfscluster.NewMapState()
|
||||||
tracker := ipfscluster.NewMapPinTracker()
|
tracker := ipfscluster.NewMapPinTracker()
|
||||||
remote := NewLibp2pRemote()
|
remote := ipfscluster.NewLibp2pRemote()
|
||||||
|
|
||||||
cluster, err := ipfscluster.NewCluster(clusterCfg, api, proxy, state, tracker, remote)
|
cluster, err := ipfscluster.NewCluster(clusterCfg, api, proxy, state, tracker, remote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -49,7 +49,7 @@ type ipfsError struct {
|
||||||
func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) {
|
func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
|
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
|
||||||
cfg.IPFSAPIListenAddr, cfg.IPFSAPIListenPort))
|
cfg.IPFSAPIAddr, cfg.IPFSAPIPort))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -62,10 +62,10 @@ func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) {
|
||||||
|
|
||||||
ipfs := &IPFSHTTPConnector{
|
ipfs := &IPFSHTTPConnector{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
destHost: cfg.IPFSHost,
|
destHost: cfg.IPFSAddr,
|
||||||
destPort: cfg.IPFSPort,
|
destPort: cfg.IPFSPort,
|
||||||
listenAddr: cfg.IPFSAPIListenAddr,
|
listenAddr: cfg.IPFSAPIAddr,
|
||||||
listenPort: cfg.IPFSAPIListenPort,
|
listenPort: cfg.IPFSAPIPort,
|
||||||
handlers: make(map[string]func(http.ResponseWriter, *http.Request)),
|
handlers: make(map[string]func(http.ResponseWriter, *http.Request)),
|
||||||
rpcCh: make(chan RPC, RPCMaxQueue),
|
rpcCh: make(chan RPC, RPCMaxQueue),
|
||||||
listener: l,
|
listener: l,
|
|
@ -5,16 +5,20 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
cid "github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
)
|
)
|
||||||
|
|
||||||
// This is an ipfs daemon mock which should sustain the functionality used by
|
// This is an ipfs daemon mock which should sustain the functionality used by
|
||||||
// used by ipfscluster.
|
// ipfscluster.
|
||||||
|
|
||||||
type ipfsMock struct {
|
type ipfsMock struct {
|
||||||
server *httptest.Server
|
server *httptest.Server
|
||||||
|
addr string
|
||||||
|
port int
|
||||||
pinMap *MapState
|
pinMap *MapState
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,6 +46,13 @@ func newIpfsMock() *ipfsMock {
|
||||||
}
|
}
|
||||||
ts := httptest.NewServer(http.HandlerFunc(m.handler))
|
ts := httptest.NewServer(http.HandlerFunc(m.handler))
|
||||||
m.server = ts
|
m.server = ts
|
||||||
|
|
||||||
|
url, _ := url.Parse(mock.server.URL)
|
||||||
|
h := strings.Split(url.Host, ":")
|
||||||
|
i, _ := strconv.Atoi(h[1])
|
||||||
|
|
||||||
|
m.port = i
|
||||||
|
m.addr = h[0]
|
||||||
return m
|
return m
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
4
raft.go
4
raft.go
|
@ -56,14 +56,14 @@ func makeLibp2pRaft(cfg *Config, host host.Host, state State, op *clusterLogOp)
|
||||||
raftCfg.Logger = nil
|
raftCfg.Logger = nil
|
||||||
}
|
}
|
||||||
logger.Debug("creating file snapshot store")
|
logger.Debug("creating file snapshot store")
|
||||||
snapshots, err := hashiraft.NewFileSnapshotStore(cfg.RaftFolder, maxSnapshots, nil)
|
snapshots, err := hashiraft.NewFileSnapshotStore(cfg.ConsensusDataFolder, maxSnapshots, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("creating file snapshot store: ", err)
|
logger.Error("creating file snapshot store: ", err)
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debug("creating BoltDB log store")
|
logger.Debug("creating BoltDB log store")
|
||||||
logStore, err := raftboltdb.NewBoltStore(filepath.Join(cfg.RaftFolder, "raft.db"))
|
logStore, err := raftboltdb.NewBoltStore(filepath.Join(cfg.ConsensusDataFolder, "raft.db"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("creating bolt store: ", err)
|
logger.Error("creating bolt store: ", err)
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
|
|
|
@ -16,9 +16,9 @@ import (
|
||||||
mux "github.com/gorilla/mux"
|
mux "github.com/gorilla/mux"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ClusterHTTPAPI implements a API and aims to provides
|
// RESTAPI implements an API and aims to provides
|
||||||
// a RESTful HTTP API for Cluster.
|
// a RESTful HTTP API for Cluster.
|
||||||
type ClusterHTTPAPI struct {
|
type RESTAPI struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
listenAddr string
|
listenAddr string
|
||||||
listenPort int
|
listenPort int
|
||||||
|
@ -70,11 +70,11 @@ type statusResp []statusCidResp
|
||||||
|
|
||||||
// NewHTTPAPI creates a new object which is ready to be
|
// NewHTTPAPI creates a new object which is ready to be
|
||||||
// started.
|
// started.
|
||||||
func NewHTTPAPI(cfg *Config) (*ClusterHTTPAPI, error) {
|
func NewHTTPAPI(cfg *Config) (*RESTAPI, error) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
|
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
|
||||||
cfg.APIListenAddr,
|
cfg.APIAddr,
|
||||||
cfg.APIListenPort))
|
cfg.APIPort))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -83,10 +83,10 @@ func NewHTTPAPI(cfg *Config) (*ClusterHTTPAPI, error) {
|
||||||
s := &http.Server{Handler: router}
|
s := &http.Server{Handler: router}
|
||||||
s.SetKeepAlivesEnabled(true) // A reminder that this can be changed
|
s.SetKeepAlivesEnabled(true) // A reminder that this can be changed
|
||||||
|
|
||||||
api := &ClusterHTTPAPI{
|
api := &RESTAPI{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
listenAddr: cfg.APIListenAddr,
|
listenAddr: cfg.APIAddr,
|
||||||
listenPort: cfg.APIListenPort,
|
listenPort: cfg.APIPort,
|
||||||
listener: l,
|
listener: l,
|
||||||
server: s,
|
server: s,
|
||||||
rpcCh: make(chan RPC, RPCMaxQueue),
|
rpcCh: make(chan RPC, RPCMaxQueue),
|
||||||
|
@ -106,7 +106,7 @@ func NewHTTPAPI(cfg *Config) (*ClusterHTTPAPI, error) {
|
||||||
return api, nil
|
return api, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ClusterHTTPAPI) routes() []route {
|
func (api *RESTAPI) routes() []route {
|
||||||
return []route{
|
return []route{
|
||||||
route{
|
route{
|
||||||
"Members",
|
"Members",
|
||||||
|
@ -165,7 +165,7 @@ func (api *ClusterHTTPAPI) routes() []route {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ClusterHTTPAPI) run() {
|
func (api *RESTAPI) run() {
|
||||||
api.wg.Add(1)
|
api.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer api.wg.Done()
|
defer api.wg.Done()
|
||||||
|
@ -180,7 +180,7 @@ func (api *ClusterHTTPAPI) run() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown stops any API listeners.
|
// Shutdown stops any API listeners.
|
||||||
func (api *ClusterHTTPAPI) Shutdown() error {
|
func (api *RESTAPI) Shutdown() error {
|
||||||
api.shutdownLock.Lock()
|
api.shutdownLock.Lock()
|
||||||
defer api.shutdownLock.Unlock()
|
defer api.shutdownLock.Unlock()
|
||||||
|
|
||||||
|
@ -202,11 +202,11 @@ func (api *ClusterHTTPAPI) Shutdown() error {
|
||||||
|
|
||||||
// RpcChan can be used by Cluster to read any
|
// RpcChan can be used by Cluster to read any
|
||||||
// requests from this component
|
// requests from this component
|
||||||
func (api *ClusterHTTPAPI) RpcChan() <-chan RPC {
|
func (api *RESTAPI) RpcChan() <-chan RPC {
|
||||||
return api.rpcCh
|
return api.rpcCh
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ClusterHTTPAPI) versionHandler(w http.ResponseWriter, r *http.Request) {
|
func (api *RESTAPI) versionHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, cancel := context.WithCancel(api.ctx)
|
ctx, cancel := context.WithCancel(api.ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
rRpc := NewRPC(VersionRPC, nil)
|
rRpc := NewRPC(VersionRPC, nil)
|
||||||
|
@ -217,7 +217,7 @@ func (api *ClusterHTTPAPI) versionHandler(w http.ResponseWriter, r *http.Request
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ClusterHTTPAPI) memberListHandler(w http.ResponseWriter, r *http.Request) {
|
func (api *RESTAPI) memberListHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, cancel := context.WithCancel(api.ctx)
|
ctx, cancel := context.WithCancel(api.ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
rRpc := NewRPC(MemberListRPC, nil)
|
rRpc := NewRPC(MemberListRPC, nil)
|
||||||
|
@ -232,7 +232,7 @@ func (api *ClusterHTTPAPI) memberListHandler(w http.ResponseWriter, r *http.Requ
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ClusterHTTPAPI) pinHandler(w http.ResponseWriter, r *http.Request) {
|
func (api *RESTAPI) pinHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, cancel := context.WithCancel(api.ctx)
|
ctx, cancel := context.WithCancel(api.ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
@ -245,7 +245,7 @@ func (api *ClusterHTTPAPI) pinHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ClusterHTTPAPI) unpinHandler(w http.ResponseWriter, r *http.Request) {
|
func (api *RESTAPI) unpinHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, cancel := context.WithCancel(api.ctx)
|
ctx, cancel := context.WithCancel(api.ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
@ -258,7 +258,7 @@ func (api *ClusterHTTPAPI) unpinHandler(w http.ResponseWriter, r *http.Request)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ClusterHTTPAPI) pinListHandler(w http.ResponseWriter, r *http.Request) {
|
func (api *RESTAPI) pinListHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, cancel := context.WithCancel(api.ctx)
|
ctx, cancel := context.WithCancel(api.ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
rRpc := NewRPC(PinListRPC, nil)
|
rRpc := NewRPC(PinListRPC, nil)
|
||||||
|
@ -270,7 +270,7 @@ func (api *ClusterHTTPAPI) pinListHandler(w http.ResponseWriter, r *http.Request
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ClusterHTTPAPI) statusHandler(w http.ResponseWriter, r *http.Request) {
|
func (api *RESTAPI) statusHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, cancel := context.WithCancel(api.ctx)
|
ctx, cancel := context.WithCancel(api.ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
rRpc := NewRPC(StatusRPC, nil)
|
rRpc := NewRPC(StatusRPC, nil)
|
||||||
|
@ -280,7 +280,7 @@ func (api *ClusterHTTPAPI) statusHandler(w http.ResponseWriter, r *http.Request)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ClusterHTTPAPI) statusCidHandler(w http.ResponseWriter, r *http.Request) {
|
func (api *RESTAPI) statusCidHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, cancel := context.WithCancel(api.ctx)
|
ctx, cancel := context.WithCancel(api.ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
@ -293,7 +293,7 @@ func (api *ClusterHTTPAPI) statusCidHandler(w http.ResponseWriter, r *http.Reque
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ClusterHTTPAPI) syncHandler(w http.ResponseWriter, r *http.Request) {
|
func (api *RESTAPI) syncHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, cancel := context.WithCancel(api.ctx)
|
ctx, cancel := context.WithCancel(api.ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
rRpc := NewRPC(LocalSyncRPC, nil)
|
rRpc := NewRPC(LocalSyncRPC, nil)
|
||||||
|
@ -303,7 +303,7 @@ func (api *ClusterHTTPAPI) syncHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ClusterHTTPAPI) syncCidHandler(w http.ResponseWriter, r *http.Request) {
|
func (api *RESTAPI) syncCidHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, cancel := context.WithCancel(api.ctx)
|
ctx, cancel := context.WithCancel(api.ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user