Use closable listeners for listening so we can have an orderly shutdown.

This commit is contained in:
Hector Sanjuan 2016-12-05 16:24:41 +01:00 committed by Hector Sanjuan
parent 2285f8d1a8
commit e746ccecb9
2 changed files with 49 additions and 25 deletions

39
api.go
View File

@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net"
"net/http" "net/http"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
@ -22,6 +23,11 @@ type ClusterHTTPAPI struct {
listenPort int listenPort int
rpcCh chan ClusterRPC rpcCh chan ClusterRPC
router *mux.Router router *mux.Router
listener net.Listener
doneCh chan bool
shutdownCh chan bool
} }
type route struct { type route struct {
@ -63,12 +69,22 @@ type pinListResp []pinElemResp
// started. // started.
func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) { func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
cfg.ClusterAPIListenAddr,
cfg.ClusterAPIListenPort))
if err != nil {
return nil, err
}
api := &ClusterHTTPAPI{ api := &ClusterHTTPAPI{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
listenAddr: cfg.ClusterAPIListenAddr, listenAddr: cfg.ClusterAPIListenAddr,
listenPort: cfg.ClusterAPIListenPort, listenPort: cfg.ClusterAPIListenPort,
listener: l,
rpcCh: make(chan ClusterRPC, RPCMaxQueue), rpcCh: make(chan ClusterRPC, RPCMaxQueue),
doneCh: make(chan bool),
shutdownCh: make(chan bool),
} }
router := mux.NewRouter().StrictSlash(true) router := mux.NewRouter().StrictSlash(true)
@ -123,20 +139,14 @@ func (api *ClusterHTTPAPI) routes() []route {
func (api *ClusterHTTPAPI) run() { func (api *ClusterHTTPAPI) run() {
go func() { go func() {
// FIXME: make this with closable net listener err := http.Serve(api.listener, api.router)
err := http.ListenAndServe(
fmt.Sprintf("%s:%d", api.listenAddr, api.listenPort),
api.router)
if err != nil {
logger.Error("starting ClusterHTTPAPI server:", err)
return
}
}()
// FIXME
go func() {
select { select {
case <-api.ctx.Done(): case <-api.shutdownCh:
return close(api.doneCh)
default:
if err != nil {
logger.Error(err)
}
} }
}() }()
} }
@ -145,6 +155,9 @@ func (api *ClusterHTTPAPI) run() {
func (api *ClusterHTTPAPI) Shutdown() error { func (api *ClusterHTTPAPI) Shutdown() error {
logger.Info("Stopping Cluster API") logger.Info("Stopping Cluster API")
api.cancel() api.cancel()
close(api.shutdownCh)
api.listener.Close()
<-api.doneCh
return nil return nil
} }

View File

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"strings" "strings"
@ -31,6 +32,10 @@ type IPFSHTTPConnector struct {
listenPort int listenPort int
handlers map[string]func(http.ResponseWriter, *http.Request) handlers map[string]func(http.ResponseWriter, *http.Request)
rpcCh chan ClusterRPC rpcCh chan ClusterRPC
listener net.Listener
shutdownCh chan bool
doneCh chan bool
} }
type ipfsError struct { type ipfsError struct {
@ -40,6 +45,11 @@ type ipfsError struct {
// NewIPFSHTTPConnector creates the component and leaves it ready to be started // NewIPFSHTTPConnector creates the component and leaves it ready to be started
func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) { func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
cfg.IPFSAPIListenAddr, cfg.IPFSAPIListenPort))
if err != nil {
return nil, err
}
ipfs := &IPFSHTTPConnector{ ipfs := &IPFSHTTPConnector{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
@ -49,6 +59,9 @@ func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) {
listenPort: cfg.IPFSAPIListenPort, listenPort: cfg.IPFSAPIListenPort,
handlers: make(map[string]func(http.ResponseWriter, *http.Request)), handlers: make(map[string]func(http.ResponseWriter, *http.Request)),
rpcCh: make(chan ClusterRPC, RPCMaxQueue), rpcCh: make(chan ClusterRPC, RPCMaxQueue),
listener: l,
shutdownCh: make(chan bool),
doneCh: make(chan bool),
} }
logger.Infof("Starting IPFS Proxy on %s:%d", ipfs.listenAddr, ipfs.listenPort) logger.Infof("Starting IPFS Proxy on %s:%d", ipfs.listenAddr, ipfs.listenPort)
@ -104,19 +117,14 @@ func (ipfs *IPFSHTTPConnector) run() {
smux := http.NewServeMux() smux := http.NewServeMux()
smux.HandleFunc("/", ipfs.handle) smux.HandleFunc("/", ipfs.handle)
// Fixme: make this with closable net listener // Fixme: make this with closable net listener
err := http.ListenAndServe( err := http.Serve(ipfs.listener, smux)
fmt.Sprintf("%s:%d", ipfs.listenAddr, ipfs.listenPort),
smux)
if err != nil {
logger.Error(err)
return
}
}()
go func() {
select { select {
case <-ipfs.ctx.Done(): case <-ipfs.shutdownCh:
return close(ipfs.doneCh)
default:
if err != nil {
logger.Error(err)
}
} }
}() }()
} }
@ -131,6 +139,9 @@ func (ipfs *IPFSHTTPConnector) RpcChan() <-chan ClusterRPC {
// any requests. // any requests.
func (ipfs *IPFSHTTPConnector) Shutdown() error { func (ipfs *IPFSHTTPConnector) Shutdown() error {
logger.Info("Stopping IPFS Proxy") logger.Info("Stopping IPFS Proxy")
close(ipfs.shutdownCh)
ipfs.listener.Close()
<-ipfs.doneCh
return nil return nil
} }