diff --git a/api.go b/api.go index 59d20a92..4750a6f0 100644 --- a/api.go +++ b/api.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "net" "net/http" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" @@ -22,6 +23,11 @@ type ClusterHTTPAPI struct { listenPort int rpcCh chan ClusterRPC router *mux.Router + + listener net.Listener + + doneCh chan bool + shutdownCh chan bool } type route struct { @@ -63,12 +69,22 @@ type pinListResp []pinElemResp // started. func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) { ctx, cancel := context.WithCancel(context.Background()) + l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", + cfg.ClusterAPIListenAddr, + cfg.ClusterAPIListenPort)) + if err != nil { + return nil, err + } + api := &ClusterHTTPAPI{ ctx: ctx, cancel: cancel, listenAddr: cfg.ClusterAPIListenAddr, listenPort: cfg.ClusterAPIListenPort, + listener: l, rpcCh: make(chan ClusterRPC, RPCMaxQueue), + doneCh: make(chan bool), + shutdownCh: make(chan bool), } router := mux.NewRouter().StrictSlash(true) @@ -123,20 +139,14 @@ func (api *ClusterHTTPAPI) routes() []route { func (api *ClusterHTTPAPI) run() { go func() { - // FIXME: make this with closable net listener - err := http.ListenAndServe( - fmt.Sprintf("%s:%d", api.listenAddr, api.listenPort), - api.router) - if err != nil { - logger.Error("starting ClusterHTTPAPI server:", err) - return - } - }() - // FIXME - go func() { + err := http.Serve(api.listener, api.router) select { - case <-api.ctx.Done(): - return + case <-api.shutdownCh: + close(api.doneCh) + default: + if err != nil { + logger.Error(err) + } } }() } @@ -145,6 +155,9 @@ func (api *ClusterHTTPAPI) run() { func (api *ClusterHTTPAPI) Shutdown() error { logger.Info("Stopping Cluster API") api.cancel() + close(api.shutdownCh) + api.listener.Close() + <-api.doneCh return nil } diff --git a/ipfs_connector.go b/ipfs_connector.go index 5f438d11..df0db9e1 100644 --- a/ipfs_connector.go +++ b/ipfs_connector.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "io/ioutil" + "net" "net/http" "strings" @@ -31,6 +32,10 @@ type IPFSHTTPConnector struct { listenPort int handlers map[string]func(http.ResponseWriter, *http.Request) rpcCh chan ClusterRPC + listener net.Listener + + shutdownCh chan bool + doneCh chan bool } type ipfsError struct { @@ -40,6 +45,11 @@ type ipfsError struct { // NewIPFSHTTPConnector creates the component and leaves it ready to be started func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) { ctx, cancel := context.WithCancel(context.Background()) + l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", + cfg.IPFSAPIListenAddr, cfg.IPFSAPIListenPort)) + if err != nil { + return nil, err + } ipfs := &IPFSHTTPConnector{ ctx: ctx, cancel: cancel, @@ -49,6 +59,9 @@ func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) { listenPort: cfg.IPFSAPIListenPort, handlers: make(map[string]func(http.ResponseWriter, *http.Request)), rpcCh: make(chan ClusterRPC, RPCMaxQueue), + listener: l, + shutdownCh: make(chan bool), + doneCh: make(chan bool), } logger.Infof("Starting IPFS Proxy on %s:%d", ipfs.listenAddr, ipfs.listenPort) @@ -104,19 +117,14 @@ func (ipfs *IPFSHTTPConnector) run() { smux := http.NewServeMux() smux.HandleFunc("/", ipfs.handle) // Fixme: make this with closable net listener - err := http.ListenAndServe( - fmt.Sprintf("%s:%d", ipfs.listenAddr, ipfs.listenPort), - smux) - if err != nil { - logger.Error(err) - return - } - }() - - go func() { + err := http.Serve(ipfs.listener, smux) select { - case <-ipfs.ctx.Done(): - return + case <-ipfs.shutdownCh: + close(ipfs.doneCh) + default: + if err != nil { + logger.Error(err) + } } }() } @@ -131,6 +139,9 @@ func (ipfs *IPFSHTTPConnector) RpcChan() <-chan ClusterRPC { // any requests. func (ipfs *IPFSHTTPConnector) Shutdown() error { logger.Info("Stopping IPFS Proxy") + close(ipfs.shutdownCh) + ipfs.listener.Close() + <-ipfs.doneCh return nil }