Improve shutdown routines

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2016-12-15 14:07:19 +01:00
parent 38d878ee5f
commit a655288fd6
10 changed files with 220 additions and 124 deletions

api.go

@@ -6,6 +6,8 @@ import (
"fmt"
"net"
"net/http"
"strings"
"sync"
peer "github.com/libp2p/go-libp2p-peer"
@@ -26,8 +28,9 @@ type ClusterHTTPAPI struct {
listener net.Listener
server *http.Server
doneCh chan struct{}
shutdownCh chan struct{}
shutdownLock sync.Mutex
shutdown bool
wg sync.WaitGroup
}
type route struct {
@@ -87,8 +90,6 @@ func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) {
listener: l,
server: s,
rpcCh: make(chan ClusterRPC, RPCMaxQueue),
doneCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
}
for _, route := range api.routes() {
@@ -101,7 +102,7 @@ func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) {
api.router = router
logger.Infof("Starting Cluster API on %s:%d", api.listenAddr, api.listenPort)
go api.run()
api.run()
return api, nil
}
@@ -141,29 +142,37 @@ func (api *ClusterHTTPAPI) routes() []route {
}
func (api *ClusterHTTPAPI) run() {
api.wg.Add(1)
go func() {
defer api.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
api.ctx = ctx
err := api.server.Serve(api.listener)
select {
case <-api.shutdownCh:
close(api.doneCh)
default:
if err != nil {
logger.Error(err)
}
if err != nil && !strings.Contains(err.Error(), "closed network connection") {
logger.Error(err)
}
}()
}
// Shutdown stops any API listeners.
func (api *ClusterHTTPAPI) Shutdown() error {
api.shutdownLock.Lock()
defer api.shutdownLock.Unlock()
if api.shutdown {
logger.Debug("already shutdown")
return nil
}
logger.Info("Stopping Cluster API")
close(api.shutdownCh)
// Cancel any outstanding ops
api.server.SetKeepAlivesEnabled(false)
api.listener.Close()
<-api.doneCh
api.wg.Wait()
api.shutdown = true
return nil
}
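
The HTTP-based components in this commit converge on one pattern: the Serve call runs in a goroutine tracked by a sync.WaitGroup, errors caused by closing the listener are recognized and ignored, and Shutdown is made idempotent behind a mutex and a flag. Below is a minimal, self-contained sketch of that pattern; the type and names are illustrative, not the actual ClusterHTTPAPI code.

package shutdownsketch

import (
	"net"
	"net/http"
	"strings"
	"sync"
)

// httpComponent is a hypothetical component wrapping an HTTP server.
type httpComponent struct {
	listener     net.Listener
	server       *http.Server
	wg           sync.WaitGroup
	shutdownLock sync.Mutex
	shutdown     bool
}

func (c *httpComponent) run() {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		err := c.server.Serve(c.listener)
		// Closing the listener makes Serve return a "use of closed network
		// connection" error; that is expected during shutdown and ignored.
		if err != nil && !strings.Contains(err.Error(), "closed network connection") {
			_ = err // only unexpected errors would be logged here
		}
	}()
}

func (c *httpComponent) Shutdown() error {
	c.shutdownLock.Lock()
	defer c.shutdownLock.Unlock()
	if c.shutdown {
		return nil // a second call is a harmless no-op
	}
	c.server.SetKeepAlivesEnabled(false) // stop keeping idle connections open
	c.listener.Close()                   // unblocks Serve
	c.wg.Wait()                          // wait for the Serve goroutine to exit
	c.shutdown = true
	return nil
}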


@@ -75,6 +75,7 @@ func TestAPIShutdown(t *testing.T) {
if err != nil {
t.Error("should shutdown cleanly: ", err)
}
api.Shutdown()
}
func TestVersionEndpoint(t *testing.T) {


@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
@@ -31,6 +32,11 @@ type Cluster struct {
ipfs IPFSConnector
state ClusterState
tracker PinTracker
shutdownLock sync.Mutex
shutdown bool
shutdownCh chan struct{}
wg sync.WaitGroup
}
// NewCluster builds a ready-to-start IPFS Cluster. It takes a ClusterAPI,
@@ -50,19 +56,20 @@ func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state Cl
}
cluster := &Cluster{
ctx: ctx,
config: cfg,
host: host,
consensus: consensus,
api: api,
ipfs: ipfs,
state: state,
tracker: tracker,
ctx: ctx,
config: cfg,
host: host,
consensus: consensus,
api: api,
ipfs: ipfs,
state: state,
tracker: tracker,
shutdownCh: make(chan struct{}),
}
logger.Info("Starting IPFS Cluster")
go cluster.run()
cluster.run()
logger.Info("Performing State synchronization")
cluster.Sync()
return cluster, nil
@@ -70,6 +77,13 @@ func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state Cl
// Shutdown stops the IPFS cluster components
func (c *Cluster) Shutdown() error {
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
if c.shutdown {
logger.Warning("Cluster is already shutdown")
return nil
}
logger.Info("Shutting down IPFS Cluster")
if err := c.consensus.Shutdown(); err != nil {
logger.Errorf("Error stopping consensus: %s", err)
@@ -88,6 +102,8 @@ func (c *Cluster) Shutdown() error {
logger.Errorf("Error stopping PinTracker: %s", err)
return err
}
c.shutdownCh <- struct{}{}
c.wg.Wait()
return nil
}
@@ -155,42 +171,44 @@ func (c *Cluster) Members() []peer.ID {
// run reads from the RPC channels of the different components and launches
// short-lived go-routines to handle any requests.
func (c *Cluster) run() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c.ctx = ctx
ipfsCh := c.ipfs.RpcChan()
consensusCh := c.consensus.RpcChan()
apiCh := c.api.RpcChan()
trackerCh := c.tracker.RpcChan()
c.wg.Add(1)
go func() {
defer c.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c.ctx = ctx
ipfsCh := c.ipfs.RpcChan()
consensusCh := c.consensus.RpcChan()
apiCh := c.api.RpcChan()
trackerCh := c.tracker.RpcChan()
var op ClusterRPC
for {
select {
case op = <-ipfsCh:
goto HANDLEOP
case op = <-consensusCh:
goto HANDLEOP
case op = <-apiCh:
goto HANDLEOP
case op = <-trackerCh:
goto HANDLEOP
case <-c.ctx.Done():
logger.Debug("Cluster is Done()")
return
var op ClusterRPC
for {
select {
case op = <-ipfsCh:
goto HANDLEOP
case op = <-consensusCh:
goto HANDLEOP
case op = <-apiCh:
goto HANDLEOP
case op = <-trackerCh:
goto HANDLEOP
case <-c.shutdownCh:
return
}
HANDLEOP:
switch op.(type) {
case *CidClusterRPC:
crpc := op.(*CidClusterRPC)
go c.handleCidRPC(crpc)
case *GenericClusterRPC:
grpc := op.(*GenericClusterRPC)
go c.handleGenericRPC(grpc)
default:
logger.Error("unknown ClusterRPC type")
}
}
HANDLEOP:
switch op.(type) {
case *CidClusterRPC:
crpc := op.(*CidClusterRPC)
go c.handleCidRPC(crpc)
case *GenericClusterRPC:
grpc := op.(*GenericClusterRPC)
go c.handleGenericRPC(grpc)
default:
logger.Error("unknown ClusterRPC type")
}
}
}()
}
func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) {
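
The run loop above now lives in a WaitGroup-tracked goroutine that multiplexes the RPC channels and returns when it receives on shutdownCh; Shutdown sends on that channel and then waits on the WaitGroup. The following condensed sketch shows the same handshake in isolation; names are illustrative, and sync.Once stands in for the shutdownLock/shutdown-flag guard the real code uses to keep a second Shutdown call from blocking.

package shutdownsketch

import "sync"

// worker is a hypothetical component illustrating the run/Shutdown handshake.
type worker struct {
	workCh       chan int
	shutdownCh   chan struct{}
	wg           sync.WaitGroup
	shutdownOnce sync.Once
}

func newWorker() *worker {
	w := &worker{
		workCh:     make(chan int, 16),
		shutdownCh: make(chan struct{}),
	}
	w.run()
	return w
}

func (w *worker) run() {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for {
			select {
			case job := <-w.workCh:
				go handle(job) // short-lived handlers, like handleCidRPC/handleGenericRPC
			case <-w.shutdownCh:
				return // stop multiplexing once Shutdown has signalled
			}
		}
	}()
}

func (w *worker) Shutdown() {
	w.shutdownOnce.Do(func() {
		// The unbuffered send pairs with the receive in the loop above and
		// returns once the loop has observed the signal.
		w.shutdownCh <- struct{}{}
		w.wg.Wait()
	})
}

func handle(int) {}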


@@ -79,6 +79,7 @@ func testClusterShutdown(t *testing.T) {
if err != nil {
t.Error("cluster shutdown failed:", err)
}
cl.Shutdown()
cl, _, _, _, _ = testingCluster(t)
err = cl.Shutdown()
if err != nil {


@@ -3,6 +3,8 @@ package ipfscluster
import (
"context"
"errors"
"strings"
"sync"
"time"
consensus "github.com/libp2p/go-libp2p-consensus"
@@ -103,6 +105,11 @@ type ClusterConsensus struct {
rpcCh chan ClusterRPC
p2pRaft *libp2pRaftWrap
shutdownLock sync.Mutex
shutdown bool
shutdownCh chan struct{}
wg sync.WaitGroup
}
// NewClusterConsensus builds a new ClusterConsensus component. The state
@@ -124,14 +131,17 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState)
con.SetActor(actor)
cc := &ClusterConsensus{
ctx: ctx,
consensus: con,
baseOp: op,
actor: actor,
rpcCh: rpcCh,
p2pRaft: wrapper,
ctx: ctx,
consensus: con,
baseOp: op,
actor: actor,
rpcCh: rpcCh,
p2pRaft: wrapper,
shutdownCh: make(chan struct{}),
}
cc.run()
// FIXME: this is broken.
logger.Info("Waiting for Consensus state to catch up")
time.Sleep(1 * time.Second)
@@ -149,24 +159,64 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState)
return cc, nil
}
func (cc *ClusterConsensus) run() {
cc.wg.Add(1)
go func() {
defer cc.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cc.ctx = ctx
cc.baseOp.ctx = ctx
<-cc.shutdownCh
}()
}
// Shutdown stops the component so it will not process any
// more updates. The underlying consensus is permanently
// shutdown, along with the libp2p transport.
func (cc *ClusterConsensus) Shutdown() error {
logger.Info("Stopping Consensus component")
defer cc.p2pRaft.transport.Close()
defer cc.p2pRaft.boltdb.Close() // important!
// When we take a snapshot, we make sure that
// we re-start from the previous state, and that
// we don't replay the log. This covers
// pin and unpin operations.
f := cc.p2pRaft.raft.Snapshot()
_ = f.Error()
f = cc.p2pRaft.raft.Shutdown()
err := f.Error()
if err != nil {
return err
cc.shutdownLock.Lock()
defer cc.shutdownLock.Unlock()
if cc.shutdown {
logger.Debug("already shutdown")
return nil
}
logger.Info("Stopping Consensus component")
// Cancel any outstanding makeRPCs
cc.shutdownCh <- struct{}{}
// Raft shutdown
errMsgs := ""
f := cc.p2pRaft.raft.Snapshot()
err := f.Error()
if err != nil && !strings.Contains(err.Error(), "Nothing new to snapshot") {
errMsgs += "could not take snapshot: " + err.Error() + ".\n"
}
f = cc.p2pRaft.raft.Shutdown()
err = f.Error()
if err != nil {
errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
}
err = cc.p2pRaft.transport.Close()
if err != nil {
errMsgs += "could not close libp2p transport: " + err.Error() + ".\n"
}
err = cc.p2pRaft.boltdb.Close() // important!
if err != nil {
errMsgs += "could not close boltdb: " + err.Error() + ".\n"
}
if errMsgs != "" {
errMsgs += "Consensus shutdown unsucessful"
logger.Error(errMsgs)
return errors.New(errMsgs)
}
cc.wg.Wait()
cc.shutdown = true
return nil
}
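
The new consensus Shutdown runs every close step even when an earlier one fails: take a final raft snapshot (a "Nothing new to snapshot" error is benign and skipped), shut down raft, close the libp2p transport, and close the boltdb store, reporting all failures together at the end. A small sketch of that error-collecting sequence, with hypothetical helper names, might look like this:

package shutdownsketch

import (
	"errors"
	"strings"
)

// closeStep pairs a description with a close function.
type closeStep struct {
	what string
	fn   func() error
}

// closeAll mirrors the errMsgs accumulation above: every step runs even if an
// earlier one fails, and the failures are reported as a single error.
func closeAll(steps []closeStep) error {
	var msgs []string
	for _, s := range steps {
		if err := s.fn(); err != nil {
			msgs = append(msgs, "could not "+s.what+": "+err.Error())
		}
	}
	if len(msgs) == 0 {
		return nil
	}
	return errors.New(strings.Join(msgs, ".\n") + ".\nshutdown unsuccessful")
}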


@@ -110,6 +110,7 @@ func TestShutdownClusterConsensus(t *testing.T) {
if err != nil {
t.Fatal("ClusterConsensus cannot shutdown:", err)
}
cc.Shutdown()
cc = testingClusterConsensus(t)
err = cc.Shutdown()
if err != nil {


@@ -10,6 +10,7 @@ import (
"net"
"net/http"
"strings"
"sync"
cid "github.com/ipfs/go-cid"
)
@@ -35,8 +36,9 @@ type IPFSHTTPConnector struct {
listener net.Listener
server *http.Server
shutdownCh chan struct{}
doneCh chan struct{}
shutdownLock sync.Mutex
shutdown bool
wg sync.WaitGroup
}
type ipfsError struct {
@@ -68,14 +70,12 @@ func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) {
rpcCh: make(chan ClusterRPC, RPCMaxQueue),
listener: l,
server: s,
shutdownCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
smux.HandleFunc("/", ipfs.handle)
logger.Infof("Starting IPFS Proxy on %s:%d", ipfs.listenAddr, ipfs.listenPort)
go ipfs.run()
ipfs.run()
return ipfs, nil
}
@@ -123,18 +123,15 @@ func (ipfs *IPFSHTTPConnector) defaultHandler(w http.ResponseWriter, r *http.Req
func (ipfs *IPFSHTTPConnector) run() {
// This launches the proxy
ipfs.wg.Add(1)
go func() {
defer ipfs.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ipfs.ctx = ctx
err := ipfs.server.Serve(ipfs.listener)
select {
case <-ipfs.shutdownCh:
close(ipfs.doneCh)
default:
if err != nil {
logger.Error(err)
}
if err != nil && !strings.Contains(err.Error(), "closed network connection") {
logger.Error(err)
}
}()
}
@@ -148,11 +145,21 @@ func (ipfs *IPFSHTTPConnector) RpcChan() <-chan ClusterRPC {
// Shutdown stops any listeners and stops the component from taking
// any requests.
func (ipfs *IPFSHTTPConnector) Shutdown() error {
ipfs.shutdownLock.Lock()
defer ipfs.shutdownLock.Unlock()
if ipfs.shutdown {
logger.Debug("already shutdown")
return nil
}
logger.Info("Stopping IPFS Proxy")
close(ipfs.shutdownCh)
ipfs.server.SetKeepAlivesEnabled(false)
ipfs.listener.Close()
<-ipfs.doneCh
ipfs.wg.Wait()
ipfs.shutdown = true
return nil
}
@@ -266,13 +273,13 @@ func (ipfs *IPFSHTTPConnector) get(path string) ([]byte, error) {
if resp.StatusCode != http.StatusOK {
var msg string
if decodeErr == nil {
msg = fmt.Sprintf("IPFS error: %d: %s",
msg = fmt.Sprintf("IPFS unsuccessful: %d: %s",
resp.StatusCode, ipfsErr.Message)
} else {
msg = fmt.Sprintf("IPFS error: %d: %s",
msg = fmt.Sprintf("IPFS-get unsuccessful: %d: %s",
resp.StatusCode, body)
}
logger.Error(msg)
logger.Warning(msg)
return body, errors.New(msg)
}
return body, nil


@@ -76,7 +76,7 @@ func TestNewIPFSHTTPConnector(t *testing.T) {
}
}
func TestPin(t *testing.T) {
func TestIPFSPin(t *testing.T) {
ipfs, ts := ipfsConnector(t)
defer ts.Close()
defer ipfs.Shutdown()
@@ -92,7 +92,7 @@ func TestPin(t *testing.T) {
}
}
func TestUnpin(t *testing.T) {
func TestIPFSUnpin(t *testing.T) {
ipfs, ts := ipfsConnector(t)
defer ts.Close()
defer ipfs.Shutdown()
@@ -148,10 +148,11 @@ func TestProxy(t *testing.T) {
}
}
func TestShutdown(t *testing.T) {
func TestIPFSShutdown(t *testing.T) {
ipfs, ts := ipfsConnector(t)
defer ts.Close()
if err := ipfs.Shutdown(); err != nil {
t.Error("expected a clean shutdown")
}
ipfs.Shutdown()
}


@@ -9,9 +9,10 @@ import (
// MapState is a very simple database to store
// the state of the system.
type MapState struct {
mux sync.RWMutex
PinMap map[string]struct{}
rpcCh chan ClusterRPC
mux sync.Mutex
rpcCh chan ClusterRPC
}
func NewMapState() *MapState {
@@ -37,8 +38,8 @@ func (st *MapState) RmPin(c *cid.Cid) error {
}
func (st *MapState) ListPins() []*cid.Cid {
st.mux.Lock()
defer st.mux.Unlock()
st.mux.RLock()
defer st.mux.RUnlock()
cids := make([]*cid.Cid, 0, len(st.PinMap))
for k, _ := range st.PinMap {
c, _ := cid.Decode(k)
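
MapState switches from sync.Mutex to sync.RWMutex so that ListPins, a pure reader, can take the shared lock while AddPin/RmPin keep exclusive access. A tiny illustrative sketch of that locking split (a hypothetical type, not the MapState code itself):

package shutdownsketch

import "sync"

// pinSet is an RWMutex-protected set: writers take the exclusive lock,
// read-only listings take the shared lock, so concurrent reads do not
// serialize each other.
type pinSet struct {
	mux  sync.RWMutex
	pins map[string]struct{}
}

func (s *pinSet) Add(k string) {
	s.mux.Lock()
	defer s.mux.Unlock()
	s.pins[k] = struct{}{}
}

func (s *pinSet) List() []string {
	s.mux.RLock() // shared lock, like ListPins above
	defer s.mux.RUnlock()
	out := make([]string, 0, len(s.pins))
	for k := range s.pins {
		out = append(out, k)
	}
	return out
}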


@@ -32,12 +32,14 @@ type PinStatus int
type MapPinTracker struct {
mux sync.Mutex
status map[string]Pin
rpcCh chan ClusterRPC
shutdownCh chan struct{}
doneCh chan struct{}
ctx context.Context
rpcCh chan ClusterRPC
ctx context.Context
shutdownLock sync.Mutex
shutdown bool
shutdownCh chan struct{}
wg sync.WaitGroup
}
func NewMapPinTracker() *MapPinTracker {
@@ -45,26 +47,38 @@ func NewMapPinTracker() *MapPinTracker {
mpt := &MapPinTracker{
status: make(map[string]Pin),
rpcCh: make(chan ClusterRPC, RPCMaxQueue),
shutdownCh: make(chan struct{}),
doneCh: make(chan struct{}),
ctx: ctx,
shutdownCh: make(chan struct{}),
}
go mpt.run()
mpt.run()
return mpt
}
func (mpt *MapPinTracker) run() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mpt.ctx = ctx
for {
select {
case <-mpt.shutdownCh:
close(mpt.doneCh)
return
}
mpt.wg.Add(1)
go func() {
defer mpt.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mpt.ctx = ctx
<-mpt.shutdownCh
}()
}
func (mpt *MapPinTracker) Shutdown() error {
mpt.shutdownLock.Lock()
defer mpt.shutdownLock.Unlock()
if mpt.shutdown {
logger.Debug("already shutdown")
return nil
}
// Great plans for this thread
logger.Info("Stopping MapPinTracker")
mpt.shutdownCh <- struct{}{}
mpt.wg.Wait()
mpt.shutdown = true
return nil
}
func (mpt *MapPinTracker) set(c *cid.Cid, s PinStatus) error {
@@ -278,13 +292,6 @@ func (mpt *MapPinTracker) SyncState(cState ClusterState) []Pin {
return changed
}
func (mpt *MapPinTracker) Shutdown() error {
logger.Info("Stopping MapPinTracker")
close(mpt.shutdownCh)
<-mpt.doneCh
return nil
}
func (mpt *MapPinTracker) RpcChan() <-chan ClusterRPC {
return mpt.rpcCh
}
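
Each updated test now calls Shutdown a second time to exercise the new idempotence guard. A generic sketch of that check, written as a hypothetical helper rather than the project's actual test code:

package shutdownsketch

import "testing"

// shutdowner is the minimal surface the updated tests rely on.
type shutdowner interface {
	Shutdown() error
}

// checkDoubleShutdown captures the pattern added to the *_test.go files above:
// the first Shutdown must succeed and a second call must be a harmless no-op.
func checkDoubleShutdown(t *testing.T, c shutdowner) {
	if err := c.Shutdown(); err != nil {
		t.Error("should shutdown cleanly: ", err)
	}
	if err := c.Shutdown(); err != nil {
		t.Error("second shutdown should be a no-op: ", err)
	}
}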