From 34b2b6cbd1fb38a4f3110492abab9ea6c999dd12 Mon Sep 17 00:00:00 2001
From: Hector Sanjuan
Date: Fri, 9 Dec 2016 20:54:46 +0100
Subject: [PATCH] Sync between tracker and cluster state. go vet. tests.

License: MIT
Signed-off-by: Hector Sanjuan
---
 Makefile               |   7 +-
 api.go                 |  35 ++++---
 api_test.go            | 215 +++++++++++++++++++++++++++++++++++++++++
 cluster.go             |  26 ++++-
 consensus.go           |  38 +++++++-
 ipfs_cluster.go        |   6 ++
 ipfs_cluster_test.go   |   9 +-
 ipfs_connector.go      |  35 ++++---
 ipfs_connector_test.go |   1 -
 state.go               |  28 ++++--
 tracker.go             | 100 ++++++++++++++-----
 11 files changed, 427 insertions(+), 73 deletions(-)
 create mode 100644 api_test.go

diff --git a/Makefile b/Makefile
index 8d8fe929..fdb0230b 100644
--- a/Makefile
+++ b/Makefile
@@ -2,7 +2,12 @@ all: deps
 gx:
 	go get github.com/whyrusleeping/gx
 	go get github.com/whyrusleeping/gx-go
-deps: gx
+deps: gx
+	go get github.com/gorilla/mux
+	go get github.com/hashicorp/raft
+	go get github.com/hashicorp/raft-boltdb
+	go get github.com/ugorji/go/codec
 	gx --verbose install --global
 	gx-go rewrite
 test: deps
diff --git a/api.go b/api.go
index 538b95a9..bd0246e3 100644
--- a/api.go
+++ b/api.go
@@ -18,16 +18,16 @@ import (
 // a RESTful HTTP API for Cluster.
 type ClusterHTTPAPI struct {
 	ctx        context.Context
-	cancel     context.CancelFunc
 	listenAddr string
 	listenPort int
 	rpcCh      chan ClusterRPC
 	router     *mux.Router
 	listener   net.Listener
+	server     *http.Server
 
-	doneCh     chan bool
-	shutdownCh chan bool
+	doneCh     chan struct{}
+	shutdownCh chan struct{}
 }
 
 type route struct {
@@ -68,7 +68,7 @@ type pinListResp []pinElemResp
 // NewHTTPClusterAPI creates a new object which is ready to be
 // started.
 func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) {
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx := context.Background()
 	l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
 		cfg.ClusterAPIListenAddr,
 		cfg.ClusterAPIListenPort))
@@ -76,18 +76,21 @@ func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) {
 		return nil, err
 	}
 
+	router := mux.NewRouter().StrictSlash(true)
+	s := &http.Server{Handler: router}
+	s.SetKeepAlivesEnabled(true) // A reminder that this can be changed
+
 	api := &ClusterHTTPAPI{
 		ctx:        ctx,
-		cancel:     cancel,
 		listenAddr: cfg.ClusterAPIListenAddr,
 		listenPort: cfg.ClusterAPIListenPort,
 		listener:   l,
+		server:     s,
 		rpcCh:      make(chan ClusterRPC, RPCMaxQueue),
-		doneCh:     make(chan bool),
-		shutdownCh: make(chan bool),
+		doneCh:     make(chan struct{}),
+		shutdownCh: make(chan struct{}),
 	}
 
-	router := mux.NewRouter().StrictSlash(true)
 	for _, route := range api.routes() {
 		router.
 			Methods(route.Method).
@@ -139,7 +142,10 @@ func (api *ClusterHTTPAPI) routes() []route {
 
 func (api *ClusterHTTPAPI) run() {
 	go func() {
-		err := http.Serve(api.listener, api.router)
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		api.ctx = ctx
+		err := api.server.Serve(api.listener)
 		select {
 		case <-api.shutdownCh:
 			close(api.doneCh)
@@ -154,8 +160,8 @@ func (api *ClusterHTTPAPI) run() {
 // Shutdown stops any API listeners.
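The api.go changes above replace the bare http.Serve call with a long-lived *http.Server owned by the component. On Go 1.7 (before http.Server.Shutdown existed) that ownership is what makes a deterministic stop possible: disable keep-alives, close the listener, and wait for Serve to return. A minimal standalone sketch of the same pattern, with illustrative names that are not taken from this codebase:

    // sketch: an HTTP component that owns its server and listener,
    // so another goroutine can stop it deterministically.
    type component struct {
        listener net.Listener
        server   *http.Server
        doneCh   chan struct{}
    }

    func start(addr string) (*component, error) {
        l, err := net.Listen("tcp", addr)
        if err != nil {
            return nil, err
        }
        c := &component{
            listener: l,
            server:   &http.Server{Handler: http.DefaultServeMux},
            doneCh:   make(chan struct{}),
        }
        go func() {
            // Serve returns once the listener is closed.
            if err := c.server.Serve(l); err != nil {
                log.Println("server stopped:", err)
            }
            close(c.doneCh)
        }()
        return c, nil
    }

    func (c *component) stop() {
        // Stop re-using idle connections, stop accepting new ones,
        // then wait for Serve to return.
        c.server.SetKeepAlivesEnabled(false)
        c.listener.Close()
        <-c.doneCh
    }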
 func (api *ClusterHTTPAPI) Shutdown() error {
 	logger.Info("Stopping Cluster API")
-	api.cancel()
 	close(api.shutdownCh)
+	api.server.SetKeepAlivesEnabled(false)
 	api.listener.Close()
 	<-api.doneCh
 	return nil
@@ -200,7 +206,8 @@ func (api *ClusterHTTPAPI) pinHandler(w http.ResponseWriter, r *http.Request) {
 	hash := vars["hash"]
 	c, err := cid.Decode(hash)
 	if err != nil {
-		sendErrorResponse(w, 400, err.Error())
+		sendErrorResponse(w, 400, "error decoding Cid: "+err.Error())
+		return
 	}
 
 	rRpc := RPC(PinRPC, c)
@@ -218,7 +225,8 @@ func (api *ClusterHTTPAPI) unpinHandler(w http.ResponseWriter, r *http.Request) {
 	hash := vars["hash"]
 	c, err := cid.Decode(hash)
 	if err != nil {
-		sendErrorResponse(w, 400, err.Error())
+		sendErrorResponse(w, 400, "error decoding Cid: "+err.Error())
+		return
 	}
 
 	rRpc := RPC(UnpinRPC, c)
@@ -288,7 +296,8 @@ func checkResponse(w http.ResponseWriter, op RPCOp, resp RPCResponse) bool {
 		ok = false
 	}
 	if !ok {
-		logger.Error("unexpected RPC Response format")
+		logger.Errorf("unexpected RPC Response format for %d:", op)
+		logger.Errorf("%+v", resp.Data)
 		sendErrorResponse(w, 500, "Unexpected RPC Response format")
 		return false
 	}
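Both handler fixes above add a `return` after sendErrorResponse. Writing an error response does not abort a Go handler, so without the return the handler would fall through and issue the RPC with a nil Cid. The shape to remember, as a sketch reusing this file's helpers:

    func pinHandlerSketch(w http.ResponseWriter, r *http.Request) {
        c, err := cid.Decode(mux.Vars(r)["hash"])
        if err != nil {
            sendErrorResponse(w, 400, "error decoding Cid: "+err.Error())
            return // without this, c is nil in the code below
        }
        _ = c // proceed with the decoded Cid
    }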
diff --git a/api_test.go b/api_test.go
new file mode 100644
index 00000000..cc4b2837
--- /dev/null
+++ b/api_test.go
@@ -0,0 +1,215 @@
+package ipfscluster
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"io/ioutil"
+	"net/http"
+	"testing"
+
+	cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
+	peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
+)
+
+var (
+	apiHost = "http://127.0.0.1:5000"
+)
+
+func testClusterApi(t *testing.T) *ClusterHTTPAPI {
+	//logging.SetDebugLogging()
+	cfg := &ClusterConfig{
+		ClusterAPIListenAddr: "127.0.0.1",
+		ClusterAPIListenPort: 5000,
+	}
+	api, err := NewHTTPClusterAPI(cfg)
+	if err != nil {
+		t.Fatal("should be able to create a new Api: ", err)
+	}
+	// No keep alive! Otherwise tests hang with
+	// connections re-used from previous tests
+	api.server.SetKeepAlivesEnabled(false)
+
+	if api.RpcChan() == nil {
+		t.Fatal("should create the Rpc channel")
+	}
+	return api
+}
+
+func simulateAnswer(ch <-chan ClusterRPC, answer interface{}, err error) {
+	go func() {
+		req := <-ch
+		req.ResponseCh() <- RPCResponse{
+			Data:  answer,
+			Error: err,
+		}
+	}()
+}
+
+func processResp(t *testing.T, httpResp *http.Response, err error, resp interface{}) {
+	if err != nil {
+		t.Fatal("error making get request: ", err)
+	}
+	body, err := ioutil.ReadAll(httpResp.Body)
+	defer httpResp.Body.Close()
+	if err != nil {
+		t.Fatal("error reading body: ", err)
+	}
+
+	if len(body) != 0 {
+		err = json.Unmarshal(body, resp)
+		if err != nil {
+			t.Error(string(body))
+			t.Fatal("error parsing json: ", err)
+		}
+	}
+}
+
+func makeGet(t *testing.T, path string, resp interface{}) {
+	httpResp, err := http.Get(apiHost + path)
+	processResp(t, httpResp, err, resp)
+}
+
+func makePost(t *testing.T, path string, resp interface{}) {
+	httpResp, err := http.Post(apiHost+path, "application/json", bytes.NewReader([]byte{}))
+	processResp(t, httpResp, err, resp)
+}
+
+func makeDelete(t *testing.T, path string, resp interface{}) {
+	req, _ := http.NewRequest("DELETE", apiHost+path, bytes.NewReader([]byte{}))
+	c := &http.Client{}
+	httpResp, err := c.Do(req)
+	processResp(t, httpResp, err, resp)
+}
+
+func TestAPIShutdown(t *testing.T) {
+	api := testClusterApi(t)
+	err := api.Shutdown()
+	if err != nil {
+		t.Error("should shutdown cleanly: ", err)
+	}
+}
+
+func TestVersionEndpoint(t *testing.T) {
+	api := testClusterApi(t)
+	api.server.SetKeepAlivesEnabled(false)
+	defer api.Shutdown()
+	simulateAnswer(api.RpcChan(), "v", nil)
+	ver := versionResp{}
+	makeGet(t, "/version", &ver)
+	if ver.Version != "v" {
+		t.Error("expected correct version")
+	}
+
+	simulateAnswer(api.RpcChan(), nil, errors.New("an error"))
+	errResp := errorResp{}
+	makeGet(t, "/version", &errResp)
+	if errResp.Message != "an error" {
+		t.Error("expected different error")
+	}
+}
+
+func TestMemberListEndpoint(t *testing.T) {
+	api := testClusterApi(t)
+	api.server.SetKeepAlivesEnabled(false)
+	defer api.Shutdown()
+	pList := []peer.ID{
+		testPeerID,
+	}
+	simulateAnswer(api.RpcChan(), pList, nil)
+	var list []string
+	makeGet(t, "/members", &list)
+	if len(list) != 1 || list[0] != testPeerID.Pretty() {
+		t.Error("expected a peer id list: ", list)
+	}
+
+	simulateAnswer(api.RpcChan(), nil, errors.New("an error"))
+	errResp := errorResp{}
+	makeGet(t, "/members", &errResp)
+	if errResp.Message != "an error" {
+		t.Error("expected different error")
+	}
+}
+
+func TestPinEndpoint(t *testing.T) {
+	api := testClusterApi(t)
+	defer api.Shutdown()
+	simulateAnswer(api.RpcChan(), nil, nil)
+	var i interface{} = nil
+	makePost(t, "/pins/"+testCid, &i)
+	if i != nil {
+		t.Error("pin should have returned an empty response")
+	}
+
+	simulateAnswer(api.RpcChan(), nil, errors.New("an error"))
+	errResp := errorResp{}
+	makePost(t, "/pins/"+testCid2, &errResp)
+	if errResp.Message != "an error" {
+		t.Error("expected different error")
+	}
+
+	makePost(t, "/pins/abcd", &errResp)
+	if errResp.Code != 400 {
+		t.Error("should fail with wrong Cid")
+	}
+}
+
+func TestUnpinEndpoint(t *testing.T) {
+	api := testClusterApi(t)
+	defer api.Shutdown()
+	simulateAnswer(api.RpcChan(), nil, nil)
+	var i interface{} = nil
+	makeDelete(t, "/pins/"+testCid, &i)
+	if i != nil {
+		t.Error("unpin should have returned an empty response")
+	}
+
+	simulateAnswer(api.RpcChan(), nil, errors.New("an error"))
+	errResp := errorResp{}
+	makeDelete(t, "/pins/"+testCid2, &errResp)
+	if errResp.Message != "an error" {
+		t.Error("expected different error")
+	}
+
+	makeDelete(t, "/pins/abcd", &errResp)
+	if errResp.Code != 400 {
+		t.Error("should fail with wrong Cid")
+	}
+}
+
+func TestPinListEndpoint(t *testing.T) {
+	c, _ := cid.Decode(testCid)
+	c2, _ := cid.Decode(testCid2)
+	c3, _ := cid.Decode(testCid3)
+	api := testClusterApi(t)
+	defer api.Shutdown()
+	pList := []Pin{
+		Pin{
+			Status: PinError,
+			Cid:    c,
+		},
+		Pin{
+			Status: UnpinError,
+			Cid:    c,
+		},
+		Pin{
+			Status: Pinned,
+			Cid:    c3,
+		},
+		Pin{
+			Status: Pinning,
+			Cid:    c,
+		},
+		Pin{
+			Status: Unpinning,
+			Cid:    c2,
+		},
+	}
+
+	simulateAnswer(api.RpcChan(), pList, nil)
+	var resp pinListResp
+	makeGet(t, "/pins", &resp)
+	if len(resp) != 5 {
+		t.Error("unexpected pinListResp: ", resp)
+	}
+}
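The new tests never start real cluster components: simulateAnswer impersonates whatever would normally sit on the other side of the RPC channel and answers exactly one request. Reduced to its core, the pattern is a one-shot channel responder (types simplified here for illustration):

    type request struct{ respCh chan response }
    type response struct {
        data interface{}
        err  error
    }

    // answerOnce services a single request from ch with a canned reply,
    // the same way simulateAnswer stubs out the cluster.
    func answerOnce(ch <-chan request, data interface{}, err error) {
        go func() {
            req := <-ch
            req.respCh <- response{data: data, err: err}
        }()
    }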
diff --git a/cluster.go b/cluster.go
index 24e018bf..741d9b39 100644
--- a/cluster.go
+++ b/cluster.go
@@ -21,8 +21,7 @@ import (
 // Cluster is the main IPFS cluster component. It provides
 // the go-API for it and orchestrates the components that make up the system.
 type Cluster struct {
-	ctx    context.Context
-	cancel context.CancelFunc
+	ctx context.Context
 
 	config *ClusterConfig
 	host   host.Host
@@ -38,7 +37,7 @@ type Cluster struct {
 // an IPFSConnector and a ClusterState as parameters, allowing the user
 // to provide custom implementations of these components.
 func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state ClusterState, tracker PinTracker) (*Cluster, error) {
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx := context.Background()
 	host, err := makeHost(ctx, cfg)
 	if err != nil {
 		return nil, err
@@ -52,7 +51,6 @@ func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state Cl
 
 	cluster := &Cluster{
 		ctx:       ctx,
-		cancel:    cancel,
 		config:    cfg,
 		host:      host,
 		consensus: consensus,
@@ -63,6 +61,9 @@ func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state Cl
 	}
 
 	logger.Info("Starting IPFS Cluster")
+
+	logger.Info("Performing State synchronization")
+	cluster.Sync()
 	go cluster.run()
 	return cluster, nil
 }
@@ -87,7 +88,19 @@ func (c *Cluster) Shutdown() error {
 		logger.Errorf("Error stopping PinTracker: %s", err)
 		return err
 	}
-	c.cancel()
+	return nil
+}
+
+func (c *Cluster) Sync() error {
+	cState, err := c.consensus.State()
+	if err != nil {
+		return err
+	}
+	changed := c.tracker.SyncState(cState)
+	for _, p := range changed {
+		logger.Debugf("Recovering %s", p.Cid)
+		c.tracker.Recover(p.Cid)
+	}
 	return nil
 }
@@ -142,6 +155,9 @@ func (c *Cluster) Members() []peer.ID {
 // run reads from the RPC channels of the different components and launches
 // short-lived go-routines to handle any requests.
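Sync is the new glue: it pulls the authoritative pin set from consensus, lets the tracker reconcile against it, and asks the tracker to Recover every Cid whose status changed. This patch only runs it once at startup; a caller could drive the same reconciliation on a timer, along these lines (hypothetical helper, not part of this change):

    func periodicSync(c *Cluster, interval time.Duration, stop <-chan struct{}) {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                // re-run the same state/tracker reconciliation
                if err := c.Sync(); err != nil {
                    logger.Errorf("error syncing: %s", err)
                }
            case <-stop:
                return
            }
        }
    }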
 func (c *Cluster) run() {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	c.ctx = ctx
 	ipfsCh := c.ipfs.RpcChan()
 	consensusCh := c.consensus.RpcChan()
 	apiCh := c.api.RpcChan()
diff --git a/consensus.go b/consensus.go
index ee2e5e18..a5891d08 100644
--- a/consensus.go
+++ b/consensus.go
@@ -3,6 +3,7 @@ package ipfscluster
 import (
 	"context"
 	"errors"
+	"time"
 
 	host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host"
 	consensus "gx/ipfs/QmZ88KbrvZMJpXaNwAGffswcYKz8EbeafzAFGMCA6MEZKt/go-libp2p-consensus"
@@ -25,6 +26,10 @@ const (
 
 type clusterLogOpType int
 
+// MaxStartupDelay is the longest we will wait on startup for
+// the consensus state to catch up with the last Raft index.
+var MaxStartupDelay = 10 * time.Second
+
 // clusterLogOp represents an operation for the OpLogConsensus system.
 // It implements the consensus.Op interface.
 type clusterLogOp struct {
@@ -90,8 +95,7 @@ ROLLBACK:
 // the members of an IPFS Cluster, as well as modifying that state and
 // applying any updates in a thread-safe manner.
 type ClusterConsensus struct {
-	ctx    context.Context
-	cancel context.CancelFunc
+	ctx context.Context
 
 	consensus consensus.OpLogConsensus
 	actor     consensus.Actor
@@ -106,7 +110,7 @@ type ClusterConsensus struct {
 // is discarded.
 func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState) (*ClusterConsensus, error) {
 	logger.Info("Starting Consensus component")
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx := context.Background()
 	rpcCh := make(chan ClusterRPC, RPCMaxQueue)
 	op := clusterLogOp{
 		ctx:   ctx,
@@ -121,12 +125,25 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState)
 
 	cc := &ClusterConsensus{
 		ctx:       ctx,
-		cancel:    cancel,
 		consensus: con,
 		actor:     actor,
 		rpcCh:     rpcCh,
 		p2pRaft:   wrapper,
 	}
+
+	logger.Info("Waiting for Consensus state to catch up")
+	time.Sleep(1 * time.Second)
+	start := time.Now()
+	for {
+		time.Sleep(500 * time.Millisecond)
+		li := wrapper.raft.LastIndex()
+		lai := wrapper.raft.AppliedIndex()
+		if lai == li || time.Since(start) > MaxStartupDelay {
+			break
+		}
+		logger.Debugf("Waiting for Raft index: %d/%d", lai, li)
+	}
+
 	return cc, nil
 }
@@ -135,7 +152,6 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState)
 // shutdown, along with the libp2p transport.
 func (cc *ClusterConsensus) Shutdown() error {
 	logger.Info("Stopping Consensus component")
-	cc.cancel()
 
 	// When we take snapshot, we make sure that
 	// we re-start from the previous state, and that
@@ -190,6 +206,18 @@ func (cc *ClusterConsensus) RmPin(c *cid.Cid) error {
 	return nil
 }
 
+func (cc *ClusterConsensus) State() (ClusterState, error) {
+	st, err := cc.consensus.GetLogHead()
+	if err != nil {
+		return nil, err
+	}
+	state, ok := st.(ClusterState)
+	if !ok {
+		return nil, errors.New("wrong state type")
+	}
+	return state, nil
+}
+
 // Leader() returns the peerID of the Leader of the
 // cluster.
 func (cc *ClusterConsensus) Leader() peer.ID {
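The startup loop polls Raft's LastIndex against AppliedIndex so that State() is not consulted before the log has been replayed, and it gives up after MaxStartupDelay rather than blocking forever. Because MaxStartupDelay is a package-level variable and not a constant, it can be tuned; for instance a test might shorten the worst-case wait (hypothetical test, not part of this change):

    func TestFastStartup(t *testing.T) {
        old := MaxStartupDelay
        MaxStartupDelay = 2 * time.Second
        defer func() { MaxStartupDelay = old }()
        // build a ClusterConsensus here and assert that
        // NewClusterConsensus returns within ~2 seconds
    }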
diff --git a/ipfs_cluster.go b/ipfs_cluster.go
index e320621f..63e27345 100644
--- a/ipfs_cluster.go
+++ b/ipfs_cluster.go
@@ -73,6 +73,8 @@ type ClusterState interface {
 	AddPin(*cid.Cid) error
 	// RmPin removes a pin from the ClusterState
 	RmPin(*cid.Cid) error
+	// ListPins lists all the pins in the state
+	ListPins() []*cid.Cid
 }
 
 // PinTracker represents a component which tracks the status of
@@ -104,6 +106,9 @@ type PinTracker interface {
 	Recover(*cid.Cid) error
 	// SyncAll runs Sync() on every known Pin. It returns a list of changed Pins
 	SyncAll() []Pin
+	// SyncState makes sure that the tracked Pins match those in the
+	// cluster state and runs SyncAll(). It returns a list of changed Pins.
+	SyncState(ClusterState) []Pin
 }
@@ -132,6 +137,7 @@ func MakeRPC(ctx context.Context, rpcCh chan ClusterRPC, r ClusterRPC, waitForRe
 		}
 	}
 	if !waitForResponse {
+		logger.Debug("Not waiting for response. Returning directly")
 		return RPCResponse{}
 	}
diff --git a/ipfs_cluster_test.go b/ipfs_cluster_test.go
index 886d0e8c..35852c1e 100644
--- a/ipfs_cluster_test.go
+++ b/ipfs_cluster_test.go
@@ -5,12 +5,15 @@ import (
 	"strings"
 	"testing"
 	"time"
+
+	peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
 )
 
 var (
-	testCid  = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq"
-	testCid2 = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmma"
-	testCid3 = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmb"
+	testCid       = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq"
+	testCid2      = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmma"
+	testCid3      = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmb"
+	testPeerID, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
 )
 
 func TestMakeRPC(t *testing.T) {
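ListPins completes the ClusterState interface so a PinTracker can diff itself against the shared state without knowing the backing store; SyncState is the method that consumes it. Any state implementation only has to enumerate its Cids. A toy type implementing the three methods shown in this hunk (hypothetical, unrelated to MapState; the full interface may have members not visible here):

    // sliceState keeps pins in a plain slice.
    type sliceState struct {
        pins []*cid.Cid
    }

    func (s *sliceState) AddPin(c *cid.Cid) error {
        s.pins = append(s.pins, c)
        return nil
    }

    func (s *sliceState) RmPin(c *cid.Cid) error {
        for i, p := range s.pins {
            if p.Equals(c) {
                s.pins = append(s.pins[:i], s.pins[i+1:]...)
                break
            }
        }
        return nil
    }

    func (s *sliceState) ListPins() []*cid.Cid {
        return s.pins
    }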
diff --git a/ipfs_connector.go b/ipfs_connector.go
index b9247e5a..1c2f0a60 100644
--- a/ipfs_connector.go
+++ b/ipfs_connector.go
@@ -25,17 +25,18 @@ import (
 // against the configured IPFS daemon (such as a pin request).
 type IPFSHTTPConnector struct {
 	ctx        context.Context
-	cancel     context.CancelFunc
 	destHost   string
 	destPort   int
 	listenAddr string
 	listenPort int
 	handlers   map[string]func(http.ResponseWriter, *http.Request)
 	rpcCh      chan ClusterRPC
-	listener   net.Listener
-	shutdownCh chan bool
-	doneCh     chan bool
+	listener   net.Listener
+	server     *http.Server
+
+	shutdownCh chan struct{}
+	doneCh     chan struct{}
 }
 
 type ipfsError struct {
@@ -44,15 +45,21 @@ type ipfsError struct {
 // NewIPFSHTTPConnector creates the component and leaves it ready to be started
 func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) {
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx := context.Background()
 	l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
 		cfg.IPFSAPIListenAddr, cfg.IPFSAPIListenPort))
 	if err != nil {
 		return nil, err
 	}
+
+	smux := http.NewServeMux()
+	s := &http.Server{
+		Handler: smux,
+	}
+	s.SetKeepAlivesEnabled(true) // A reminder that this can be changed
+
 	ipfs := &IPFSHTTPConnector{
 		ctx:        ctx,
-		cancel:     cancel,
 		destHost:   cfg.IPFSHost,
 		destPort:   cfg.IPFSPort,
 		listenAddr: cfg.IPFSAPIListenAddr,
@@ -60,10 +67,13 @@ func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) {
 		handlers:   make(map[string]func(http.ResponseWriter, *http.Request)),
 		rpcCh:      make(chan ClusterRPC, RPCMaxQueue),
 		listener:   l,
-		shutdownCh: make(chan bool),
-		doneCh:     make(chan bool),
+		server:     s,
+		shutdownCh: make(chan struct{}),
+		doneCh:     make(chan struct{}),
 	}
 
+	smux.HandleFunc("/", ipfs.handle)
+
 	logger.Infof("Starting IPFS Proxy on %s:%d", ipfs.listenAddr, ipfs.listenPort)
 	go ipfs.run()
 	return ipfs, nil
@@ -114,10 +124,10 @@ func (ipfs *IPFSHTTPConnector) defaultHandler(w http.ResponseWriter, r *http.Req
 func (ipfs *IPFSHTTPConnector) run() {
 	// This launches the proxy
 	go func() {
-		smux := http.NewServeMux()
-		smux.HandleFunc("/", ipfs.handle)
-		// Fixme: make this with closable net listener
-		err := http.Serve(ipfs.listener, smux)
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		ipfs.ctx = ctx
+		err := ipfs.server.Serve(ipfs.listener)
 		select {
 		case <-ipfs.shutdownCh:
 			close(ipfs.doneCh)
@@ -140,6 +150,7 @@ func (ipfs *IPFSHTTPConnector) RpcChan() <-chan ClusterRPC {
 func (ipfs *IPFSHTTPConnector) Shutdown() error {
 	logger.Info("Stopping IPFS Proxy")
 	close(ipfs.shutdownCh)
+	ipfs.server.SetKeepAlivesEnabled(false)
 	ipfs.listener.Close()
 	<-ipfs.doneCh
 	return nil
diff --git a/ipfs_connector_test.go b/ipfs_connector_test.go
index 65639b3c..788e9695 100644
--- a/ipfs_connector_test.go
+++ b/ipfs_connector_test.go
@@ -40,7 +40,6 @@ func testServer(t *testing.T) *httptest.Server {
 			w.WriteHeader(http.StatusNotFound)
 		}
 	}))
-	t.Log("test server url: ", ts.URL)
 	return ts
 }
diff --git a/state.go b/state.go
index 715b54b1..547cee3b 100644
--- a/state.go
+++ b/state.go
@@ -8,31 +8,41 @@ import (
 
 // MapState is a very simple database to store
 // the state of the system.
-// PinMap is public because it is serialized
-// and maintained by Raft.
 type MapState struct {
-	PinMap map[string]bool
+	PinMap map[string]struct{}
 	rpcCh  chan ClusterRPC
 	mux    sync.Mutex
 }
 
-func NewMapState() MapState {
-	return MapState{
-		PinMap: make(map[string]bool),
+func NewMapState() *MapState {
+	return &MapState{
+		PinMap: make(map[string]struct{}),
 		rpcCh:  make(chan ClusterRPC),
 	}
 }
 
-func (st MapState) AddPin(c *cid.Cid) error {
+func (st *MapState) AddPin(c *cid.Cid) error {
 	st.mux.Lock()
 	defer st.mux.Unlock()
-	st.PinMap[c.String()] = true
+	var a struct{}
+	st.PinMap[c.String()] = a
 	return nil
 }
 
-func (st MapState) RmPin(c *cid.Cid) error {
+func (st *MapState) RmPin(c *cid.Cid) error {
 	st.mux.Lock()
 	defer st.mux.Unlock()
 	delete(st.PinMap, c.String())
 	return nil
 }
+
+func (st *MapState) ListPins() []*cid.Cid {
+	st.mux.Lock()
+	defer st.mux.Unlock()
+	cids := make([]*cid.Cid, 0, len(st.PinMap))
+	for k := range st.PinMap {
+		c, _ := cid.Decode(k)
+		cids = append(cids, c)
+	}
+	return cids
+}
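Two of the `go vet`-motivated fixes live in state.go. MapState's methods move to pointer receivers: a value receiver copies the struct, including its sync.Mutex, so each call would lock a private copy of the lock (vet's copylocks check flags exactly this). And map[string]bool becomes map[string]struct{}, the zero-byte set idiom. The receiver issue in isolation, with an illustrative type:

    type counter struct {
        mu sync.Mutex
        n  int
    }

    // Buggy: the value receiver copies c, mutex included, so every
    // call locks a fresh mutex and increments a throwaway copy of n.
    func (c counter) incBroken() { c.mu.Lock(); c.n++; c.mu.Unlock() }

    // Correct: the pointer receiver shares one mutex and one n.
    func (c *counter) inc() { c.mu.Lock(); c.n++; c.mu.Unlock() }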
diff --git a/tracker.go b/tracker.go
index 94db38f8..d53b4a24 100644
--- a/tracker.go
+++ b/tracker.go
@@ -33,41 +33,39 @@ type MapPinTracker struct {
 	status map[string]Pin
 	rpcCh  chan ClusterRPC
 
-	mux    sync.Mutex
-	doneCh chan bool
+	mux sync.Mutex
 
-	ctx    context.Context
-	cancel context.CancelFunc
+	shutdownCh chan struct{}
+	doneCh     chan struct{}
+
+	ctx context.Context
 }
 
 func NewMapPinTracker() *MapPinTracker {
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx := context.Background()
 
 	mpt := &MapPinTracker{
-		status: make(map[string]Pin),
-		rpcCh:  make(chan ClusterRPC),
-		doneCh: make(chan bool),
-		ctx:    ctx,
-		cancel: cancel,
+		status:     make(map[string]Pin),
+		rpcCh:      make(chan ClusterRPC, RPCMaxQueue),
+		shutdownCh: make(chan struct{}),
+		doneCh:     make(chan struct{}),
+		ctx:        ctx,
 	}
 	go mpt.run()
 	return mpt
 }
 
 func (mpt *MapPinTracker) run() {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	mpt.ctx = ctx
+
+	for {
+		select {
+		case <-mpt.shutdownCh:
+			close(mpt.doneCh)
+			return
+		}
+	}
 	// Great plans for this thread
-
-	// The first time we run, we sync all
-	// and try to recover any errors
-	changed := mpt.SyncAll()
-	for _, p := range changed {
-		logger.Debugf("Recovering %s", p.Cid)
-		mpt.Recover(p.Cid)
-	}
-
-	select {
-	case <-mpt.ctx.Done():
-		close(mpt.doneCh)
-	}
 }
@@ -226,9 +224,63 @@ func (mpt *MapPinTracker) SyncAll() []Pin {
 	return changedPins
 }
 
+func (mpt *MapPinTracker) SyncState(cState ClusterState) []Pin {
+	clusterPins := cState.ListPins()
+	clusterMap := make(map[string]struct{})
+	// Make a map for faster lookup
+	for _, c := range clusterPins {
+		var a struct{}
+		clusterMap[c.String()] = a
+	}
+	var toRemove []*cid.Cid
+	var toAdd []*cid.Cid
+	var changed []Pin
+	mpt.mux.Lock()
+
+	// Collect items in the ClusterState not in the tracker
+	for _, c := range clusterPins {
+		_, ok := mpt.status[c.String()]
+		if !ok {
+			toAdd = append(toAdd, c)
+		}
+	}
+
+	// Collect items in the tracker not in the ClusterState
+	for _, p := range mpt.status {
+		_, ok := clusterMap[p.Cid.String()]
+		if !ok {
+			toRemove = append(toRemove, p.Cid)
+		}
+	}
+
+	// Update new items and mark them as pinning error
+	for _, c := range toAdd {
+		p := Pin{
+			Cid:     c,
+			PinMode: pinEverywhere,
+			Status:  PinError,
+		}
+		mpt.status[c.String()] = p
+		changed = append(changed, p)
+	}
+
+	// Mark items that need to be removed as unpin error
+	for _, c := range toRemove {
+		p := Pin{
+			Cid:     c,
+			PinMode: pinEverywhere,
+			Status:  UnpinError,
+		}
+		mpt.status[c.String()] = p
+		changed = append(changed, p)
+	}
+	mpt.mux.Unlock()
+	return changed
+}
+
 func (mpt *MapPinTracker) Shutdown() error {
 	logger.Info("Stopping MapPinTracker")
-	mpt.cancel()
+	close(mpt.shutdownCh)
 	<-mpt.doneCh
 	return nil
 }
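The net effect of SyncState, which Cluster.Sync relies on: pins present in the consensus state but unknown to the tracker surface as PinError, tracked pins missing from the state surface as UnpinError, and Recover then pins or unpins accordingly. A condensed usage sketch under those assumptions, using only the API introduced in this patch:

    // After a restart the tracker starts empty, so every pin in the
    // consensus state comes back from SyncState marked PinError and
    // can be re-pinned through Recover.
    st := NewMapState()
    c1, _ := cid.Decode(testCid)
    st.AddPin(c1)

    mpt := NewMapPinTracker()
    for _, p := range mpt.SyncState(st) {
        mpt.Recover(p.Cid) // p.Status == PinError here
    }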