Lowercase error messages
License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
parent
a655288fd6
commit
09cc7e9265
8
api.go
8
api.go
|
@ -101,7 +101,7 @@ func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) {
|
|||
}
|
||||
|
||||
api.router = router
|
||||
logger.Infof("Starting Cluster API on %s:%d", api.listenAddr, api.listenPort)
|
||||
logger.Infof("starting Cluster API on %s:%d", api.listenAddr, api.listenPort)
|
||||
api.run()
|
||||
return api, nil
|
||||
}
|
||||
|
@ -165,7 +165,7 @@ func (api *ClusterHTTPAPI) Shutdown() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
logger.Info("Stopping Cluster API")
|
||||
logger.Info("stopping Cluster API")
|
||||
|
||||
// Cancel any outstanding ops
|
||||
api.server.SetKeepAlivesEnabled(false)
|
||||
|
@ -307,7 +307,7 @@ func checkResponse(w http.ResponseWriter, op RPCOp, resp RPCResponse) bool {
|
|||
if !ok {
|
||||
logger.Errorf("unexpected RPC Response format for %d:", op)
|
||||
logger.Errorf("%+v", resp.Data)
|
||||
sendErrorResponse(w, 500, "Unexpected RPC Response format")
|
||||
sendErrorResponse(w, 500, "unexpected RPC Response format")
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -331,6 +331,6 @@ func sendJSONResponse(w http.ResponseWriter, code int, resp interface{}) {
|
|||
|
||||
func sendErrorResponse(w http.ResponseWriter, code int, msg string) {
|
||||
errorResp := errorResp{code, msg}
|
||||
logger.Errorf("Sending error response: %d: %s", code, msg)
|
||||
logger.Errorf("sending error response: %d: %s", code, msg)
|
||||
sendJSONResponse(w, code, errorResp)
|
||||
}
|
||||
|
|
28
cluster.go
28
cluster.go
|
@ -51,7 +51,7 @@ func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state Cl
|
|||
|
||||
consensus, err := NewClusterConsensus(cfg, host, state)
|
||||
if err != nil {
|
||||
logger.Errorf("Error creating consensus: %s", err)
|
||||
logger.Errorf("error creating consensus: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -67,10 +67,10 @@ func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state Cl
|
|||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
logger.Info("Starting IPFS Cluster")
|
||||
logger.Info("starting IPFS Cluster")
|
||||
|
||||
cluster.run()
|
||||
logger.Info("Performing State synchronization")
|
||||
logger.Info("performing State synchronization")
|
||||
cluster.Sync()
|
||||
return cluster, nil
|
||||
}
|
||||
|
@ -84,22 +84,22 @@ func (c *Cluster) Shutdown() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
logger.Info("Shutting down IPFS Cluster")
|
||||
logger.Info("shutting down IPFS Cluster")
|
||||
if err := c.consensus.Shutdown(); err != nil {
|
||||
logger.Errorf("Error stopping consensus: %s", err)
|
||||
logger.Errorf("error stopping consensus: %s", err)
|
||||
return err
|
||||
}
|
||||
if err := c.api.Shutdown(); err != nil {
|
||||
logger.Errorf("Error stopping API: %s", err)
|
||||
logger.Errorf("error stopping API: %s", err)
|
||||
return err
|
||||
}
|
||||
if err := c.ipfs.Shutdown(); err != nil {
|
||||
logger.Errorf("Error stopping IPFS Connector: %s", err)
|
||||
logger.Errorf("error stopping IPFS Connector: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.tracker.Shutdown(); err != nil {
|
||||
logger.Errorf("Error stopping PinTracker: %s", err)
|
||||
logger.Errorf("error stopping PinTracker: %s", err)
|
||||
return err
|
||||
}
|
||||
c.shutdownCh <- struct{}{}
|
||||
|
@ -114,7 +114,7 @@ func (c *Cluster) Sync() error {
|
|||
}
|
||||
changed := c.tracker.SyncState(cState)
|
||||
for _, p := range changed {
|
||||
logger.Debugf("Recovering %s", p.Cid)
|
||||
logger.Debugf("recovering %s", p.Cid)
|
||||
c.tracker.Recover(p.Cid)
|
||||
}
|
||||
return nil
|
||||
|
@ -130,7 +130,7 @@ func (c *Cluster) Sync() error {
|
|||
// of underlying IPFS daemon pinning operations.
|
||||
func (c *Cluster) Pin(h *cid.Cid) error {
|
||||
// TODO: Check this hash makes any sense
|
||||
logger.Info("Pinning:", h)
|
||||
logger.Info("pinning:", h)
|
||||
err := c.consensus.AddPin(h)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
|
@ -149,7 +149,7 @@ func (c *Cluster) Pin(h *cid.Cid) error {
|
|||
// of underlying IPFS daemon unpinning operations.
|
||||
func (c *Cluster) Unpin(h *cid.Cid) error {
|
||||
// TODO: Check this hash makes any sense
|
||||
logger.Info("Unpinning:", h)
|
||||
logger.Info("unpinning:", h)
|
||||
err := c.consensus.RmPin(h)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
|
@ -224,7 +224,7 @@ func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) {
|
|||
case RollbackRPC:
|
||||
state, ok := grpc.Argument.(ClusterState)
|
||||
if !ok {
|
||||
err = errors.New("Bad RollbackRPC type")
|
||||
err = errors.New("bad RollbackRPC type")
|
||||
break
|
||||
}
|
||||
err = c.consensus.Rollback(state)
|
||||
|
@ -233,7 +233,7 @@ func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) {
|
|||
// by the Consensus Leader. Arguments is a wrapped RPC.
|
||||
rpc, ok := grpc.Argument.(*ClusterRPC)
|
||||
if !ok {
|
||||
err = errors.New("Bad LeaderRPC type")
|
||||
err = errors.New("bad LeaderRPC type")
|
||||
}
|
||||
data, err = c.leaderRPC(rpc)
|
||||
default:
|
||||
|
@ -290,7 +290,7 @@ func (c *Cluster) handleCidRPC(crpc *CidClusterRPC) {
|
|||
|
||||
// This uses libp2p to contact the cluster leader and ask him to do something
|
||||
func (c *Cluster) leaderRPC(rpc *ClusterRPC) (interface{}, error) {
|
||||
return nil, errors.New("Not implemented yet")
|
||||
return nil, errors.New("not implemented yet")
|
||||
}
|
||||
|
||||
// makeHost makes a libp2p-host
|
||||
|
|
|
@ -201,7 +201,7 @@ func TestClusterRun(t *testing.T) {
|
|||
select {
|
||||
case <-rpc.ResponseCh():
|
||||
case <-timer.C:
|
||||
t.Errorf("Generic RPC %d was not handled correctly by Cluster", i)
|
||||
t.Errorf("GenericRPC %d was not handled correctly by Cluster", i)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -223,7 +223,7 @@ func TestClusterRun(t *testing.T) {
|
|||
select {
|
||||
case <-rpc.ResponseCh():
|
||||
case <-timer.C:
|
||||
t.Errorf("Cid RPC %d was not handled correctly by Cluster", i)
|
||||
t.Errorf("CidRPC %d was not handled correctly by Cluster", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
18
consensus.go
18
consensus.go
|
@ -47,13 +47,13 @@ func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error)
|
|||
var err error
|
||||
if !ok {
|
||||
// Should never be here
|
||||
panic("Received unexpected state type")
|
||||
panic("received unexpected state type")
|
||||
}
|
||||
|
||||
c, err := cid.Decode(op.Cid)
|
||||
if err != nil {
|
||||
// Should never be here
|
||||
panic("Could not decode a CID we ourselves encoded")
|
||||
panic("could not decode a CID we ourselves encoded")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(op.ctx)
|
||||
|
@ -116,7 +116,7 @@ type ClusterConsensus struct {
|
|||
// is used to initialize the Consensus system, so any information in it
|
||||
// is discarded.
|
||||
func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState) (*ClusterConsensus, error) {
|
||||
logger.Info("Starting Consensus component")
|
||||
logger.Info("starting Consensus component")
|
||||
ctx := context.Background()
|
||||
rpcCh := make(chan ClusterRPC, RPCMaxQueue)
|
||||
op := &clusterLogOp{
|
||||
|
@ -143,7 +143,7 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState)
|
|||
cc.run()
|
||||
|
||||
// FIXME: this is broken.
|
||||
logger.Info("Waiting for Consensus state to catch up")
|
||||
logger.Info("waiting for Consensus state to catch up")
|
||||
time.Sleep(1 * time.Second)
|
||||
start := time.Now()
|
||||
for {
|
||||
|
@ -153,7 +153,7 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState)
|
|||
if lai == li || time.Since(start) > MaxStartupDelay {
|
||||
break
|
||||
}
|
||||
logger.Debugf("Waiting for Raft index: %d/%d", lai, li)
|
||||
logger.Debugf("waiting for Raft index: %d/%d", lai, li)
|
||||
}
|
||||
|
||||
return cc, nil
|
||||
|
@ -183,7 +183,7 @@ func (cc *ClusterConsensus) Shutdown() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
logger.Info("Stopping Consensus component")
|
||||
logger.Info("stopping Consensus component")
|
||||
|
||||
// Cancel any outstanding makeRPCs
|
||||
cc.shutdownCh <- struct{}{}
|
||||
|
@ -242,7 +242,7 @@ func (cc *ClusterConsensus) AddPin(c *cid.Cid) error {
|
|||
// This means the op did not make it to the log
|
||||
return err
|
||||
}
|
||||
logger.Infof("Pin commited to global state: %s", c)
|
||||
logger.Infof("pin commited to global state: %s", c)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -254,7 +254,7 @@ func (cc *ClusterConsensus) RmPin(c *cid.Cid) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Infof("Unpin commited to global state: %s", c)
|
||||
logger.Infof("unpin commited to global state: %s", c)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -265,7 +265,7 @@ func (cc *ClusterConsensus) State() (ClusterState, error) {
|
|||
}
|
||||
state, ok := st.(ClusterState)
|
||||
if !ok {
|
||||
return nil, errors.New("Wrong state type")
|
||||
return nil, errors.New("wrong state type")
|
||||
}
|
||||
return state, nil
|
||||
}
|
||||
|
|
|
@ -118,17 +118,17 @@ type PinTracker interface {
|
|||
// If the message cannot be placed in the ClusterRPC channel, retries will be
|
||||
// issued every MakeRPCRetryInterval.
|
||||
func MakeRPC(ctx context.Context, rpcCh chan ClusterRPC, r ClusterRPC, waitForResponse bool) RPCResponse {
|
||||
logger.Debugf("Sending RPC %d", r.Op())
|
||||
logger.Debugf("sending RPC %d", r.Op())
|
||||
exitLoop := false
|
||||
for !exitLoop {
|
||||
select {
|
||||
case rpcCh <- r:
|
||||
exitLoop = true
|
||||
case <-ctx.Done():
|
||||
logger.Debug("Cancelling sending RPC")
|
||||
logger.Debug("cancelling sending RPC")
|
||||
return RPCResponse{
|
||||
Data: nil,
|
||||
Error: errors.New("Operation timed out while sending RPC"),
|
||||
Error: errors.New("operation timed out while sending RPC"),
|
||||
}
|
||||
default:
|
||||
logger.Error("RPC channel is full. Will retry.")
|
||||
|
@ -136,16 +136,16 @@ func MakeRPC(ctx context.Context, rpcCh chan ClusterRPC, r ClusterRPC, waitForRe
|
|||
}
|
||||
}
|
||||
if !waitForResponse {
|
||||
logger.Debug("Not waiting for response. Returning directly")
|
||||
logger.Debug("not waiting for response. Returning directly")
|
||||
return RPCResponse{}
|
||||
}
|
||||
|
||||
logger.Debug("Waiting for response")
|
||||
logger.Debug("waiting for response")
|
||||
select {
|
||||
case resp, ok := <-r.ResponseCh():
|
||||
logger.Debug("Response received")
|
||||
logger.Debug("response received")
|
||||
if !ok {
|
||||
logger.Warning("Response channel closed. Ignoring")
|
||||
logger.Warning("response channel closed. Ignoring")
|
||||
return RPCResponse{
|
||||
Data: nil,
|
||||
Error: nil,
|
||||
|
@ -153,10 +153,10 @@ func MakeRPC(ctx context.Context, rpcCh chan ClusterRPC, r ClusterRPC, waitForRe
|
|||
}
|
||||
return resp
|
||||
case <-ctx.Done():
|
||||
logger.Debug("Cancelling waiting for RPC Response")
|
||||
logger.Debug("cancelling waiting for RPC Response")
|
||||
return RPCResponse{
|
||||
Data: nil,
|
||||
Error: errors.New("Operation timed out while waiting for response"),
|
||||
Error: errors.New("operation timed out while waiting for response"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,7 +74,7 @@ func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) {
|
|||
|
||||
smux.HandleFunc("/", ipfs.handle)
|
||||
|
||||
logger.Infof("Starting IPFS Proxy on %s:%d", ipfs.listenAddr, ipfs.listenPort)
|
||||
logger.Infof("starting IPFS Proxy on %s:%d", ipfs.listenAddr, ipfs.listenPort)
|
||||
ipfs.run()
|
||||
return ipfs, nil
|
||||
}
|
||||
|
@ -153,7 +153,7 @@ func (ipfs *IPFSHTTPConnector) Shutdown() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
logger.Info("Stopping IPFS Proxy")
|
||||
logger.Info("stopping IPFS Proxy")
|
||||
|
||||
ipfs.server.SetKeepAlivesEnabled(false)
|
||||
ipfs.listener.Close()
|
||||
|
@ -250,20 +250,20 @@ func (ipfs *IPFSHTTPConnector) pinType(hash *cid.Cid) (string, error) {
|
|||
// get performs the heavy lifting of a get request against
|
||||
// the IPFS daemon.
|
||||
func (ipfs *IPFSHTTPConnector) get(path string) ([]byte, error) {
|
||||
logger.Debugf("Getting %s", path)
|
||||
logger.Debugf("getting %s", path)
|
||||
url := fmt.Sprintf("%s/%s",
|
||||
ipfs.apiURL(),
|
||||
path)
|
||||
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
logger.Error("Error getting:", err)
|
||||
logger.Error("error getting:", err)
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
logger.Errorf("Error reading response body: %s", err)
|
||||
logger.Errorf("error reading response body: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
|
@ -50,6 +50,7 @@ func NewMapPinTracker() *MapPinTracker {
|
|||
ctx: ctx,
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
logger.Info("starting MapPinTracker")
|
||||
mpt.run()
|
||||
return mpt
|
||||
}
|
||||
|
@ -74,7 +75,7 @@ func (mpt *MapPinTracker) Shutdown() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
logger.Info("Stopping MapPinTracker")
|
||||
logger.Info("stopping MapPinTracker")
|
||||
mpt.shutdownCh <- struct{}{}
|
||||
mpt.wg.Wait()
|
||||
mpt.shutdown = true
|
||||
|
|
Loading…
Reference in New Issue
Block a user