diff --git a/api.go b/api.go
index 9274a4f7..69efe2c7 100644
--- a/api.go
+++ b/api.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"net"
 	"net/http"
+	"strings"
+	"sync"
 
 	peer "github.com/libp2p/go-libp2p-peer"
 
@@ -26,8 +28,9 @@ type ClusterHTTPAPI struct {
 	listener net.Listener
 	server   *http.Server
 
-	doneCh     chan struct{}
-	shutdownCh chan struct{}
+	shutdownLock sync.Mutex
+	shutdown     bool
+	wg           sync.WaitGroup
 }
 
 type route struct {
@@ -87,8 +90,6 @@ func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) {
 		listener:   l,
 		server:     s,
 		rpcCh:      make(chan ClusterRPC, RPCMaxQueue),
-		doneCh:     make(chan struct{}),
-		shutdownCh: make(chan struct{}),
 	}
 
 	for _, route := range api.routes() {
@@ -101,7 +102,7 @@ func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) {
 	api.router = router
 
 	logger.Infof("Starting Cluster API on %s:%d", api.listenAddr, api.listenPort)
-	go api.run()
+	api.run()
 	return api, nil
 }
 
@@ -141,29 +142,37 @@ func (api *ClusterHTTPAPI) routes() []route {
 }
 
 func (api *ClusterHTTPAPI) run() {
+	api.wg.Add(1)
 	go func() {
+		defer api.wg.Done()
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()
 		api.ctx = ctx
 		err := api.server.Serve(api.listener)
-		select {
-		case <-api.shutdownCh:
-			close(api.doneCh)
-		default:
-			if err != nil {
-				logger.Error(err)
-			}
+		if err != nil && !strings.Contains(err.Error(), "closed network connection") {
+			logger.Error(err)
 		}
 	}()
}
 
 // Shutdown stops any API listeners.
 func (api *ClusterHTTPAPI) Shutdown() error {
+	api.shutdownLock.Lock()
+	defer api.shutdownLock.Unlock()
+
+	if api.shutdown {
+		logger.Debug("already shutdown")
+		return nil
+	}
+
 	logger.Info("Stopping Cluster API")
-	close(api.shutdownCh)
+
+	// Cancel any outstanding ops
 	api.server.SetKeepAlivesEnabled(false)
 	api.listener.Close()
-	<-api.doneCh
+
+	api.wg.Wait()
+	api.shutdown = true
 	return nil
 }
diff --git a/api_test.go b/api_test.go
index c0716990..e74b1790 100644
--- a/api_test.go
+++ b/api_test.go
@@ -75,6 +75,7 @@ func TestAPIShutdown(t *testing.T) {
 	if err != nil {
 		t.Error("should shutdown cleanly: ", err)
 	}
+	api.Shutdown()
 }
 
 func TestVersionEndpoint(t *testing.T) {
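
Note on the api.go change above: the shutdownCh/doneCh handshake is replaced by a mutex-guarded shutdown flag plus a sync.WaitGroup, which makes Shutdown() idempotent and lets it block until the serving goroutine has actually returned. Below is a minimal, self-contained sketch of that pattern for a plain net/http server; miniAPI and newMiniAPI are illustrative names, not cluster code. The substring match on "closed network connection" mirrors the patch: it filters out the error that Serve returns once its own listener is closed on purpose (later Go versions expose http.ErrServerClosed for graceful shutdown instead).

    // Illustrative sketch only; miniAPI is not a cluster type.
    package main

    import (
        "log"
        "net"
        "net/http"
        "strings"
        "sync"
    )

    type miniAPI struct {
        listener net.Listener
        server   *http.Server

        shutdownLock sync.Mutex // guards the shutdown flag
        shutdown     bool       // true once Shutdown() has completed
        wg           sync.WaitGroup
    }

    func newMiniAPI(addr string) (*miniAPI, error) {
        l, err := net.Listen("tcp", addr)
        if err != nil {
            return nil, err
        }
        a := &miniAPI{listener: l, server: &http.Server{}}
        a.wg.Add(1)
        go func() {
            defer a.wg.Done()
            // Serve always returns a non-nil error; the one produced by
            // closing our own listener during Shutdown() is expected.
            err := a.server.Serve(a.listener)
            if err != nil && !strings.Contains(err.Error(), "closed network connection") {
                log.Println(err)
            }
        }()
        return a, nil
    }

    func (a *miniAPI) Shutdown() error {
        a.shutdownLock.Lock()
        defer a.shutdownLock.Unlock()
        if a.shutdown { // repeated calls are no-ops
            return nil
        }
        a.server.SetKeepAlivesEnabled(false)
        a.listener.Close()
        a.wg.Wait() // block until the serving goroutine has returned
        a.shutdown = true
        return nil
    }

    func main() {
        a, err := newMiniAPI("127.0.0.1:0")
        if err != nil {
            log.Fatal(err)
        }
        a.Shutdown()
        a.Shutdown() // safe: returns nil immediately
    }
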
diff --git a/cluster.go b/cluster.go
index 6d94738e..77e0855f 100644
--- a/cluster.go
+++ b/cluster.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
 
 	crypto "github.com/libp2p/go-libp2p-crypto"
 	host "github.com/libp2p/go-libp2p-host"
@@ -31,6 +32,11 @@ type Cluster struct {
 	ipfs      IPFSConnector
 	state     ClusterState
 	tracker   PinTracker
+
+	shutdownLock sync.Mutex
+	shutdown     bool
+	shutdownCh   chan struct{}
+	wg           sync.WaitGroup
 }
 
 // NewCluster builds a ready-to-start IPFS Cluster. It takes a ClusterAPI,
@@ -50,19 +56,20 @@ func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state Cl
 	}
 
 	cluster := &Cluster{
-		ctx:       ctx,
-		config:    cfg,
-		host:      host,
-		consensus: consensus,
-		api:       api,
-		ipfs:      ipfs,
-		state:     state,
-		tracker:   tracker,
+		ctx:        ctx,
+		config:     cfg,
+		host:       host,
+		consensus:  consensus,
+		api:        api,
+		ipfs:       ipfs,
+		state:      state,
+		tracker:    tracker,
+		shutdownCh: make(chan struct{}),
 	}
 
 	logger.Info("Starting IPFS Cluster")
-	go cluster.run()
+	cluster.run()
 
 	logger.Info("Performing State synchronization")
 	cluster.Sync()
 	return cluster, nil
@@ -70,6 +77,13 @@ func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state Cl
 
 // Shutdown stops the IPFS cluster components
 func (c *Cluster) Shutdown() error {
+	c.shutdownLock.Lock()
+	defer c.shutdownLock.Unlock()
+	if c.shutdown {
+		logger.Warning("Cluster is already shutdown")
+		return nil
+	}
+
 	logger.Info("Shutting down IPFS Cluster")
 	if err := c.consensus.Shutdown(); err != nil {
 		logger.Errorf("Error stopping consensus: %s", err)
@@ -88,6 +102,8 @@ func (c *Cluster) Shutdown() error {
 		logger.Errorf("Error stopping PinTracker: %s", err)
 		return err
 	}
+	c.shutdownCh <- struct{}{}
+	c.wg.Wait()
 	return nil
 }
 
@@ -155,42 +171,44 @@ func (c *Cluster) Members() []peer.ID {
 // run reads from the RPC channels of the different components and launches
 // short-lived go-routines to handle any requests.
 func (c *Cluster) run() {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	c.ctx = ctx
-	ipfsCh := c.ipfs.RpcChan()
-	consensusCh := c.consensus.RpcChan()
-	apiCh := c.api.RpcChan()
-	trackerCh := c.tracker.RpcChan()
+	c.wg.Add(1)
+	go func() {
+		defer c.wg.Done()
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		c.ctx = ctx
+		ipfsCh := c.ipfs.RpcChan()
+		consensusCh := c.consensus.RpcChan()
+		apiCh := c.api.RpcChan()
+		trackerCh := c.tracker.RpcChan()
 
-	var op ClusterRPC
-	for {
-		select {
-		case op = <-ipfsCh:
-			goto HANDLEOP
-		case op = <-consensusCh:
-			goto HANDLEOP
-		case op = <-apiCh:
-			goto HANDLEOP
-		case op = <-trackerCh:
-			goto HANDLEOP
-		case <-c.ctx.Done():
-			logger.Debug("Cluster is Done()")
-			return
+		var op ClusterRPC
+		for {
+			select {
+			case op = <-ipfsCh:
+				goto HANDLEOP
+			case op = <-consensusCh:
+				goto HANDLEOP
+			case op = <-apiCh:
+				goto HANDLEOP
+			case op = <-trackerCh:
+				goto HANDLEOP
+			case <-c.shutdownCh:
+				return
+			}
+		HANDLEOP:
+			switch op.(type) {
+			case *CidClusterRPC:
+				crpc := op.(*CidClusterRPC)
+				go c.handleCidRPC(crpc)
+			case *GenericClusterRPC:
+				grpc := op.(*GenericClusterRPC)
+				go c.handleGenericRPC(grpc)
+			default:
+				logger.Error("unknown ClusterRPC type")
+			}
 		}
-
-	HANDLEOP:
-		switch op.(type) {
-		case *CidClusterRPC:
-			crpc := op.(*CidClusterRPC)
-			go c.handleCidRPC(crpc)
-		case *GenericClusterRPC:
-			grpc := op.(*GenericClusterRPC)
-			go c.handleGenericRPC(grpc)
-		default:
-			logger.Error("unknown ClusterRPC type")
-		}
-	}
+	}()
 }
 
 func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) {
diff --git a/cluster_test.go b/cluster_test.go
index 6e94d64e..97bd4516 100644
--- a/cluster_test.go
+++ b/cluster_test.go
@@ -79,6 +79,7 @@ func testClusterShutdown(t *testing.T) {
 	if err != nil {
 		t.Error("cluster shutdown failed:", err)
 	}
+	cl.Shutdown()
 	cl, _, _, _, _ = testingCluster(t)
 	err = cl.Shutdown()
 	if err != nil {
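
The rewritten cluster.run() keeps the fan-in select over the component RPC channels but swaps ctx.Done() for an unbuffered shutdownCh. Because the channel is unbuffered, the send in Shutdown() doubles as a rendezvous: it cannot complete until the loop receives it, so the following wg.Wait() only runs once the loop is already on its way out. A reduced sketch of that fan-in plus rendezvous shape, with illustrative names (worker, src1, src2), not cluster code:

    package main

    import (
        "fmt"
        "sync"
    )

    type worker struct {
        src1, src2 chan string
        shutdownCh chan struct{}
        wg         sync.WaitGroup
    }

    func (w *worker) run() {
        w.wg.Add(1)
        go func() {
            defer w.wg.Done()
            for {
                select {
                case m := <-w.src1:
                    fmt.Println("src1:", m)
                case m := <-w.src2:
                    fmt.Println("src2:", m)
                case <-w.shutdownCh:
                    return
                }
            }
        }()
    }

    func (w *worker) Shutdown() {
        w.shutdownCh <- struct{}{} // rendezvous: completes only when the loop receives
        w.wg.Wait()                // loop is guaranteed to be exiting by now
    }

    func main() {
        w := &worker{
            src1:       make(chan string),
            src2:       make(chan string),
            shutdownCh: make(chan struct{}),
        }
        w.run()
        w.src1 <- "hello"
        w.Shutdown()
    }
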
"sync" "time" consensus "github.com/libp2p/go-libp2p-consensus" @@ -103,6 +105,11 @@ type ClusterConsensus struct { rpcCh chan ClusterRPC p2pRaft *libp2pRaftWrap + + shutdownLock sync.Mutex + shutdown bool + shutdownCh chan struct{} + wg sync.WaitGroup } // NewClusterConsensus builds a new ClusterConsensus component. The state @@ -124,14 +131,17 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState) con.SetActor(actor) cc := &ClusterConsensus{ - ctx: ctx, - consensus: con, - baseOp: op, - actor: actor, - rpcCh: rpcCh, - p2pRaft: wrapper, + ctx: ctx, + consensus: con, + baseOp: op, + actor: actor, + rpcCh: rpcCh, + p2pRaft: wrapper, + shutdownCh: make(chan struct{}), } + cc.run() + // FIXME: this is broken. logger.Info("Waiting for Consensus state to catch up") time.Sleep(1 * time.Second) @@ -149,24 +159,64 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState) return cc, nil } +func (cc *ClusterConsensus) run() { + cc.wg.Add(1) + go func() { + defer cc.wg.Done() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cc.ctx = ctx + cc.baseOp.ctx = ctx + <-cc.shutdownCh + }() +} + // Shutdown stops the component so it will not process any // more updates. The underlying consensus is permanently // shutdown, along with the libp2p transport. func (cc *ClusterConsensus) Shutdown() error { - logger.Info("Stopping Consensus component") - defer cc.p2pRaft.transport.Close() - defer cc.p2pRaft.boltdb.Close() // important! - // When we take snapshot, we make sure that - // we re-start from the previous state, and that - // we don't replay the log. This includes - // pin and pin certain stuff. - f := cc.p2pRaft.raft.Snapshot() - _ = f.Error() - f = cc.p2pRaft.raft.Shutdown() - err := f.Error() - if err != nil { - return err + cc.shutdownLock.Lock() + defer cc.shutdownLock.Unlock() + + if cc.shutdown { + logger.Debug("already shutdown") + return nil } + + logger.Info("Stopping Consensus component") + + // Cancel any outstanding makeRPCs + cc.shutdownCh <- struct{}{} + + // Raft shutdown + errMsgs := "" + + f := cc.p2pRaft.raft.Snapshot() + err := f.Error() + if err != nil && !strings.Contains(err.Error(), "Nothing new to snapshot") { + errMsgs += "could not take snapshot: " + err.Error() + ".\n" + } + f = cc.p2pRaft.raft.Shutdown() + err = f.Error() + if err != nil { + errMsgs += "could not shutdown raft: " + err.Error() + ".\n" + } + err = cc.p2pRaft.transport.Close() + if err != nil { + errMsgs += "could not close libp2p transport: " + err.Error() + ".\n" + } + err = cc.p2pRaft.boltdb.Close() // important! 
diff --git a/consensus_test.go b/consensus_test.go
index 6e1433da..fb6d51b0 100644
--- a/consensus_test.go
+++ b/consensus_test.go
@@ -110,6 +110,7 @@ func TestShutdownClusterConsensus(t *testing.T) {
 	if err != nil {
 		t.Fatal("ClusterConsensus cannot shutdown:", err)
 	}
+	cc.Shutdown()
 	cc = testingClusterConsensus(t)
 	err = cc.Shutdown()
 	if err != nil {
diff --git a/ipfs_connector.go b/ipfs_connector.go
index ea06159f..8d051afe 100644
--- a/ipfs_connector.go
+++ b/ipfs_connector.go
@@ -10,6 +10,7 @@ import (
 	"net"
 	"net/http"
 	"strings"
+	"sync"
 
 	cid "github.com/ipfs/go-cid"
 )
@@ -35,8 +36,9 @@ type IPFSHTTPConnector struct {
 	listener net.Listener
 	server   *http.Server
 
-	shutdownCh chan struct{}
-	doneCh     chan struct{}
+	shutdownLock sync.Mutex
+	shutdown     bool
+	wg           sync.WaitGroup
 }
 
 type ipfsError struct {
@@ -68,14 +70,12 @@ func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) {
 		rpcCh:      make(chan ClusterRPC, RPCMaxQueue),
 		listener:   l,
 		server:     s,
-		shutdownCh: make(chan struct{}),
-		doneCh:     make(chan struct{}),
 	}
 
 	smux.HandleFunc("/", ipfs.handle)
 
 	logger.Infof("Starting IPFS Proxy on %s:%d", ipfs.listenAddr, ipfs.listenPort)
-	go ipfs.run()
+	ipfs.run()
 	return ipfs, nil
 }
 
@@ -123,18 +123,15 @@ func (ipfs *IPFSHTTPConnector) defaultHandler(w http.ResponseWriter, r *http.Req
 
 func (ipfs *IPFSHTTPConnector) run() {
 	// This launches the proxy
+	ipfs.wg.Add(1)
 	go func() {
+		defer ipfs.wg.Done()
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()
 		ipfs.ctx = ctx
 		err := ipfs.server.Serve(ipfs.listener)
-		select {
-		case <-ipfs.shutdownCh:
-			close(ipfs.doneCh)
-		default:
-			if err != nil {
-				logger.Error(err)
-			}
+		if err != nil && !strings.Contains(err.Error(), "closed network connection") {
+			logger.Error(err)
 		}
 	}()
 }
@@ -148,11 +145,21 @@ func (ipfs *IPFSHTTPConnector) RpcChan() <-chan ClusterRPC {
 
 // Shutdown stops any listeners and stops the component from taking
 // any requests.
 func (ipfs *IPFSHTTPConnector) Shutdown() error {
+	ipfs.shutdownLock.Lock()
+	defer ipfs.shutdownLock.Unlock()
+
+	if ipfs.shutdown {
+		logger.Debug("already shutdown")
+		return nil
+	}
+
 	logger.Info("Stopping IPFS Proxy")
-	close(ipfs.shutdownCh)
+
 	ipfs.server.SetKeepAlivesEnabled(false)
 	ipfs.listener.Close()
-	<-ipfs.doneCh
+
+	ipfs.wg.Wait()
+	ipfs.shutdown = true
 	return nil
 }
 
@@ -266,13 +273,13 @@ func (ipfs *IPFSHTTPConnector) get(path string) ([]byte, error) {
 	if resp.StatusCode != http.StatusOK {
 		var msg string
 		if decodeErr == nil {
-			msg = fmt.Sprintf("IPFS error: %d: %s",
+			msg = fmt.Sprintf("IPFS unsuccessful: %d: %s",
 				resp.StatusCode, ipfsErr.Message)
 		} else {
-			msg = fmt.Sprintf("IPFS error: %d: %s",
+			msg = fmt.Sprintf("IPFS-get unsuccessful: %d: %s",
 				resp.StatusCode, body)
 		}
-		logger.Error(msg)
+		logger.Warning(msg)
 		return body, errors.New(msg)
 	}
 	return body, nil
diff --git a/ipfs_connector_test.go b/ipfs_connector_test.go
index f262aaa2..5d38ebbe 100644
--- a/ipfs_connector_test.go
+++ b/ipfs_connector_test.go
@@ -76,7 +76,7 @@ func TestNewIPFSHTTPConnector(t *testing.T) {
 	}
 }
 
-func TestPin(t *testing.T) {
+func TestIPFSPin(t *testing.T) {
 	ipfs, ts := ipfsConnector(t)
 	defer ts.Close()
 	defer ipfs.Shutdown()
@@ -92,7 +92,7 @@ func TestPin(t *testing.T) {
 	}
 }
 
-func TestUnpin(t *testing.T) {
+func TestIPFSUnpin(t *testing.T) {
 	ipfs, ts := ipfsConnector(t)
 	defer ts.Close()
 	defer ipfs.Shutdown()
@@ -148,10 +148,11 @@ func TestProxy(t *testing.T) {
 	}
 }
 
-func TestShutdown(t *testing.T) {
+func TestIPFSShutdown(t *testing.T) {
 	ipfs, ts := ipfsConnector(t)
 	defer ts.Close()
 	if err := ipfs.Shutdown(); err != nil {
 		t.Error("expected a clean shutdown")
 	}
+	ipfs.Shutdown()
 }
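
The doubled Shutdown() calls in the tests above are deliberate: the second call must be a quiet no-op, not a panic on a closed channel or a hang on a WaitGroup. A generic sketch of that double-shutdown check, as it might look in a hypothetical _test.go helper (shutdowner and testDoubleShutdown are illustrative names, not part of the repo):

    package ipfscluster

    import "testing"

    type shutdowner interface {
        Shutdown() error
    }

    func testDoubleShutdown(t *testing.T, s shutdowner) {
        if err := s.Shutdown(); err != nil {
            t.Error("expected a clean shutdown:", err)
        }
        // The second call must return promptly with no error, not panic
        // on a closed channel or block forever on a WaitGroup.
        if err := s.Shutdown(); err != nil {
            t.Error("expected repeated shutdown to be a no-op:", err)
        }
    }
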
diff --git a/state.go b/state.go
index ae12b962..8becd773 100644
--- a/state.go
+++ b/state.go
@@ -9,9 +9,10 @@ import (
 // MapState is a very simple database to store
 // the state of the system.
 type MapState struct {
+	mux    sync.RWMutex
 	PinMap map[string]struct{}
-	rpcCh  chan ClusterRPC
-	mux    sync.Mutex
+
+	rpcCh chan ClusterRPC
 }
 
 func NewMapState() *MapState {
@@ -37,8 +38,8 @@ func (st *MapState) RmPin(c *cid.Cid) error {
 }
 
 func (st *MapState) ListPins() []*cid.Cid {
-	st.mux.Lock()
-	defer st.mux.Unlock()
+	st.mux.RLock()
+	defer st.mux.RUnlock()
 	cids := make([]*cid.Cid, 0, len(st.PinMap))
 	for k, _ := range st.PinMap {
 		c, _ := cid.Decode(k)
diff --git a/tracker.go b/tracker.go
index 7d4795c0..b32a4b1f 100644
--- a/tracker.go
+++ b/tracker.go
@@ -32,12 +32,14 @@ type PinStatus int
 type MapPinTracker struct {
 	mux    sync.Mutex
 	status map[string]Pin
-	rpcCh  chan ClusterRPC
 
-	shutdownCh chan struct{}
-	doneCh     chan struct{}
+	ctx   context.Context
+	rpcCh chan ClusterRPC
 
-	ctx context.Context
+	shutdownLock sync.Mutex
+	shutdown     bool
+	shutdownCh   chan struct{}
+	wg           sync.WaitGroup
 }
 
 func NewMapPinTracker() *MapPinTracker {
@@ -45,26 +47,38 @@ func NewMapPinTracker() *MapPinTracker {
 	mpt := &MapPinTracker{
 		status:     make(map[string]Pin),
 		rpcCh:      make(chan ClusterRPC, RPCMaxQueue),
-		shutdownCh: make(chan struct{}),
-		doneCh:     make(chan struct{}),
 		ctx:        ctx,
+		shutdownCh: make(chan struct{}),
 	}
-	go mpt.run()
+	mpt.run()
 	return mpt
 }
 
 func (mpt *MapPinTracker) run() {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	mpt.ctx = ctx
-	for {
-		select {
-		case <-mpt.shutdownCh:
-			close(mpt.doneCh)
-			return
-		}
+	mpt.wg.Add(1)
+	go func() {
+		defer mpt.wg.Done()
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		mpt.ctx = ctx
+		<-mpt.shutdownCh
+	}()
+}
+
+func (mpt *MapPinTracker) Shutdown() error {
+	mpt.shutdownLock.Lock()
+	defer mpt.shutdownLock.Unlock()
+
+	if mpt.shutdown {
+		logger.Debug("already shutdown")
+		return nil
 	}
-	// Great plans for this thread
+
+	logger.Info("Stopping MapPinTracker")
+	mpt.shutdownCh <- struct{}{}
+	mpt.wg.Wait()
+	mpt.shutdown = true
+	return nil
 }
 
 func (mpt *MapPinTracker) set(c *cid.Cid, s PinStatus) error {
@@ -278,13 +292,6 @@ func (mpt *MapPinTracker) SyncState(cState ClusterState) []Pin {
 	return changed
 }
 
-func (mpt *MapPinTracker) Shutdown() error {
-	logger.Info("Stopping MapPinTracker")
-	close(mpt.shutdownCh)
-	<-mpt.doneCh
-	return nil
-}
-
 func (mpt *MapPinTracker) RpcChan() <-chan ClusterRPC {
 	return mpt.rpcCh
 }
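
Finally, the state.go hunk promotes MapState's lock to a sync.RWMutex so that ListPins readers no longer serialize behind each other, while mutating methods keep the exclusive lock. A minimal sketch of that read/write split, assuming an illustrative pinSet type rather than the cluster's MapState:

    package main

    import (
        "fmt"
        "sync"
    )

    type pinSet struct {
        mux    sync.RWMutex
        pinMap map[string]struct{}
    }

    func (p *pinSet) Add(k string) {
        p.mux.Lock() // exclusive: mutates the map
        defer p.mux.Unlock()
        p.pinMap[k] = struct{}{}
    }

    func (p *pinSet) List() []string {
        p.mux.RLock() // shared: many readers may hold this at once
        defer p.mux.RUnlock()
        keys := make([]string, 0, len(p.pinMap))
        for k := range p.pinMap {
            keys = append(keys, k)
        }
        return keys
    }

    func main() {
        p := &pinSet{pinMap: make(map[string]struct{})}
        p.Add("some-cid")
        fmt.Println(p.List())
    }
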