Improve startup messages and information
License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
parent
e932b2f3f6
commit
afa8a5c33f
|
@ -93,7 +93,7 @@ You can add the multiaddresses for the other members of the cluster in the `clus
|
||||||
"api_listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
|
"api_listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
|
||||||
"ipfs_proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
|
"ipfs_proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
|
||||||
"ipfs_node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
|
"ipfs_node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
|
||||||
"consensus_data_folder": "/home/hector/go/src/github.com/ipfs/ipfs-cluster/ipfs-cluster-service/data",
|
"consensus_data_folder": "/home/user/.ipfs-cluster/data",
|
||||||
"raft_config": {
|
"raft_config": {
|
||||||
"SnapshotIntervalSeconds": 120,
|
"SnapshotIntervalSeconds": 120,
|
||||||
"EnableSingleNode": true
|
"EnableSingleNode": true
|
||||||
|
|
|
@ -51,6 +51,8 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P
|
||||||
rpcServer := rpc.NewServer(host, RPCProtocol)
|
rpcServer := rpc.NewServer(host, RPCProtocol)
|
||||||
rpcClient := rpc.NewClientWithServer(host, RPCProtocol, rpcServer)
|
rpcClient := rpc.NewClientWithServer(host, RPCProtocol, rpcServer)
|
||||||
|
|
||||||
|
logger.Infof("IPFS Cluster v%s - %s/ipfs/%s", Version, cfg.ClusterAddr, host.ID().Pretty())
|
||||||
|
|
||||||
consensus, err := NewConsensus(cfg, host, state)
|
consensus, err := NewConsensus(cfg, host, state)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorf("error creating consensus: %s", err)
|
logger.Errorf("error creating consensus: %s", err)
|
||||||
|
@ -88,8 +90,6 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P
|
||||||
consensus.SetClient(rpcClient)
|
consensus.SetClient(rpcClient)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
logger.Infof("starting IPFS Cluster v%s", Version)
|
|
||||||
|
|
||||||
cluster.run()
|
cluster.run()
|
||||||
return cluster, nil
|
return cluster, nil
|
||||||
}
|
}
|
||||||
|
@ -136,7 +136,7 @@ func (c *Cluster) StateSync() ([]PinInfo, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("syncing state to tracker")
|
logger.Debug("syncing state to tracker")
|
||||||
clusterPins := cState.ListPins()
|
clusterPins := cState.ListPins()
|
||||||
var changed []*cid.Cid
|
var changed []*cid.Cid
|
||||||
|
|
||||||
|
|
10
consensus.go
10
consensus.go
|
@ -136,8 +136,7 @@ func NewConsensus(cfg *Config, host host.Host, state State) (*Consensus, error)
|
||||||
rpcReady: make(chan struct{}, 1),
|
rpcReady: make(chan struct{}, 1),
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("starting Consensus component")
|
logger.Infof("starting Consensus: waiting %d seconds for leader...", LeaderTimeout/time.Second)
|
||||||
logger.Infof("waiting %d seconds for leader", LeaderTimeout/time.Second)
|
|
||||||
con, actor, wrapper, err := makeLibp2pRaft(cc.cfg,
|
con, actor, wrapper, err := makeLibp2pRaft(cc.cfg,
|
||||||
cc.host, state, cc.baseOp)
|
cc.host, state, cc.baseOp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -162,7 +161,7 @@ func NewConsensus(cfg *Config, host host.Host, state State) (*Consensus, error)
|
||||||
return nil, errors.New("no leader was found after timeout")
|
return nil, errors.New("no leader was found after timeout")
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debugf("raft leader is %s", leader)
|
logger.Infof("Consensus leader found (%s). Syncing state...", leader.Pretty())
|
||||||
cc.run(state)
|
cc.run(state)
|
||||||
return cc, nil
|
return cc, nil
|
||||||
}
|
}
|
||||||
|
@ -178,7 +177,7 @@ func (cc *Consensus) run(state State) {
|
||||||
|
|
||||||
upToDate := make(chan struct{})
|
upToDate := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
logger.Info("consensus state is catching up")
|
logger.Debug("consensus state is catching up")
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
for {
|
for {
|
||||||
lai := cc.p2pRaft.raft.AppliedIndex()
|
lai := cc.p2pRaft.raft.AppliedIndex()
|
||||||
|
@ -194,7 +193,7 @@ func (cc *Consensus) run(state State) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
<-upToDate
|
<-upToDate
|
||||||
logger.Info("consensus state is up to date")
|
logger.Info("Consensus state is up to date")
|
||||||
|
|
||||||
// While rpc is not ready we cannot perform a sync
|
// While rpc is not ready we cannot perform a sync
|
||||||
<-cc.rpcReady
|
<-cc.rpcReady
|
||||||
|
@ -208,6 +207,7 @@ func (cc *Consensus) run(state State) {
|
||||||
&pInfo,
|
&pInfo,
|
||||||
nil)
|
nil)
|
||||||
|
|
||||||
|
logger.Infof("IPFS Cluster is running")
|
||||||
<-cc.shutdownCh
|
<-cc.shutdownCh
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,8 @@ var (
|
||||||
// against the configured IPFS daemom (such as a pin request).
|
// against the configured IPFS daemom (such as a pin request).
|
||||||
type IPFSHTTPConnector struct {
|
type IPFSHTTPConnector struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
nodeAddr ma.Multiaddr
|
||||||
|
proxyAddr ma.Multiaddr
|
||||||
destHost string
|
destHost string
|
||||||
destPort int
|
destPort int
|
||||||
listenAddr string
|
listenAddr string
|
||||||
|
@ -108,7 +110,10 @@ func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) {
|
||||||
s.SetKeepAlivesEnabled(true) // A reminder that this can be changed
|
s.SetKeepAlivesEnabled(true) // A reminder that this can be changed
|
||||||
|
|
||||||
ipfs := &IPFSHTTPConnector{
|
ipfs := &IPFSHTTPConnector{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
nodeAddr: cfg.IPFSProxyAddr,
|
||||||
|
proxyAddr: cfg.IPFSNodeAddr,
|
||||||
|
|
||||||
destHost: destHost,
|
destHost: destHost,
|
||||||
destPort: destPort,
|
destPort: destPort,
|
||||||
listenAddr: listenAddr,
|
listenAddr: listenAddr,
|
||||||
|
@ -121,7 +126,6 @@ func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) {
|
||||||
|
|
||||||
smux.HandleFunc("/", ipfs.handle)
|
smux.HandleFunc("/", ipfs.handle)
|
||||||
|
|
||||||
logger.Infof("starting IPFS Proxy on %s:%d", ipfs.listenAddr, ipfs.listenPort)
|
|
||||||
ipfs.run()
|
ipfs.run()
|
||||||
return ipfs, nil
|
return ipfs, nil
|
||||||
}
|
}
|
||||||
|
@ -179,6 +183,9 @@ func (ipfs *IPFSHTTPConnector) run() {
|
||||||
|
|
||||||
<-ipfs.rpcReady
|
<-ipfs.rpcReady
|
||||||
|
|
||||||
|
logger.Infof("IPFS Proxy: %s -> %s",
|
||||||
|
ipfs.proxyAddr,
|
||||||
|
ipfs.nodeAddr)
|
||||||
err := ipfs.server.Serve(ipfs.listener)
|
err := ipfs.server.Serve(ipfs.listener)
|
||||||
if err != nil && !strings.Contains(err.Error(), "closed network connection") {
|
if err != nil && !strings.Contains(err.Error(), "closed network connection") {
|
||||||
logger.Error(err)
|
logger.Error(err)
|
||||||
|
@ -230,7 +237,7 @@ func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error {
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
logger.Info("IPFS object is already pinned: ", hash)
|
logger.Debug("IPFS object is already pinned: ", hash)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -250,7 +257,7 @@ func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("IPFS object is already unpinned: ", hash)
|
logger.Debug("IPFS object is already unpinned: ", hash)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,6 @@ func NewMapPinTracker(cfg *Config) *MapPinTracker {
|
||||||
peerID: cfg.ID,
|
peerID: cfg.ID,
|
||||||
shutdownCh: make(chan struct{}),
|
shutdownCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
logger.Info("starting MapPinTracker")
|
|
||||||
mpt.run()
|
mpt.run()
|
||||||
return mpt
|
return mpt
|
||||||
}
|
}
|
||||||
|
@ -60,7 +59,8 @@ func (mpt *MapPinTracker) run() {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
mpt.ctx = ctx
|
mpt.ctx = ctx
|
||||||
//<-mpt.rpcReady
|
<-mpt.rpcReady
|
||||||
|
logger.Info("PinTracker ready")
|
||||||
<-mpt.shutdownCh
|
<-mpt.shutdownCh
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ var (
|
||||||
// a RESTful HTTP API for Cluster.
|
// a RESTful HTTP API for Cluster.
|
||||||
type RESTAPI struct {
|
type RESTAPI struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
apiAddr ma.Multiaddr
|
||||||
listenAddr string
|
listenAddr string
|
||||||
listenPort int
|
listenPort int
|
||||||
rpcClient *rpc.Client
|
rpcClient *rpc.Client
|
||||||
|
@ -122,6 +123,7 @@ func NewRESTAPI(cfg *Config) (*RESTAPI, error) {
|
||||||
|
|
||||||
api := &RESTAPI{
|
api := &RESTAPI{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
apiAddr: cfg.APIAddr,
|
||||||
listenAddr: listenAddr,
|
listenAddr: listenAddr,
|
||||||
listenPort: listenPort,
|
listenPort: listenPort,
|
||||||
listener: l,
|
listener: l,
|
||||||
|
@ -138,7 +140,6 @@ func NewRESTAPI(cfg *Config) (*RESTAPI, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
api.router = router
|
api.router = router
|
||||||
logger.Infof("starting Cluster API on %s:%d", api.listenAddr, api.listenPort)
|
|
||||||
api.run()
|
api.run()
|
||||||
return api, nil
|
return api, nil
|
||||||
}
|
}
|
||||||
|
@ -212,6 +213,7 @@ func (api *RESTAPI) run() {
|
||||||
|
|
||||||
<-api.rpcReady
|
<-api.rpcReady
|
||||||
|
|
||||||
|
logger.Infof("REST API: %s", api.apiAddr)
|
||||||
err := api.server.Serve(api.listener)
|
err := api.server.Serve(api.listener)
|
||||||
if err != nil && !strings.Contains(err.Error(), "closed network connection") {
|
if err != nil && !strings.Contains(err.Error(), "closed network connection") {
|
||||||
logger.Error(err)
|
logger.Error(err)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user