Sync between tracker and cluster state. go vet. tests.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2016-12-09 20:54:46 +01:00
parent 8a60d49c82
commit 34b2b6cbd1
11 changed files with 427 additions and 73 deletions

Makefile

@ -2,7 +2,12 @@ all: deps
gx:
go get github.com/whyrusleeping/gx
go get github.com/whyrusleeping/gx-go
deps: gx
deps: gx
go get github.com/gorilla/mux
go get github.com/hashicorp/raft
go get github.com/hashicorp/raft-boltdb
go get github.com/ugorji/go/codec
gx --verbose install --global
gx-go rewrite
test: deps

api.go

@ -18,16 +18,16 @@ import (
// a RESTful HTTP API for Cluster.
type ClusterHTTPAPI struct {
ctx context.Context
cancel context.CancelFunc
listenAddr string
listenPort int
rpcCh chan ClusterRPC
router *mux.Router
listener net.Listener
server *http.Server
doneCh chan bool
shutdownCh chan bool
doneCh chan struct{}
shutdownCh chan struct{}
}
type route struct {
@ -68,7 +68,7 @@ type pinListResp []pinElemResp
// NewHTTPClusterAPI creates a new object which is ready to be
// started.
func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) {
ctx, cancel := context.WithCancel(context.Background())
ctx := context.Background()
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
cfg.ClusterAPIListenAddr,
cfg.ClusterAPIListenPort))
@ -76,18 +76,21 @@ func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) {
return nil, err
}
router := mux.NewRouter().StrictSlash(true)
s := &http.Server{Handler: router}
s.SetKeepAlivesEnabled(true) // A reminder that this can be changed
api := &ClusterHTTPAPI{
ctx: ctx,
cancel: cancel,
listenAddr: cfg.ClusterAPIListenAddr,
listenPort: cfg.ClusterAPIListenPort,
listener: l,
server: s,
rpcCh: make(chan ClusterRPC, RPCMaxQueue),
doneCh: make(chan bool),
shutdownCh: make(chan bool),
doneCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
}
router := mux.NewRouter().StrictSlash(true)
for _, route := range api.routes() {
router.
Methods(route.Method).
@ -139,7 +142,10 @@ func (api *ClusterHTTPAPI) routes() []route {
func (api *ClusterHTTPAPI) run() {
go func() {
err := http.Serve(api.listener, api.router)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
api.ctx = ctx
err := api.server.Serve(api.listener)
select {
case <-api.shutdownCh:
close(api.doneCh)
@ -154,8 +160,8 @@ func (api *ClusterHTTPAPI) run() {
// Shutdown stops any API listeners.
func (api *ClusterHTTPAPI) Shutdown() error {
logger.Info("Stopping Cluster API")
api.cancel()
close(api.shutdownCh)
api.server.SetKeepAlivesEnabled(false)
api.listener.Close()
<-api.doneCh
return nil
@ -200,7 +206,8 @@ func (api *ClusterHTTPAPI) pinHandler(w http.ResponseWriter, r *http.Request) {
hash := vars["hash"]
c, err := cid.Decode(hash)
if err != nil {
sendErrorResponse(w, 400, err.Error())
sendErrorResponse(w, 400, "error decoding Cid: "+err.Error())
return
}
rRpc := RPC(PinRPC, c)
@ -218,7 +225,8 @@ func (api *ClusterHTTPAPI) unpinHandler(w http.ResponseWriter, r *http.Request)
hash := vars["hash"]
c, err := cid.Decode(hash)
if err != nil {
sendErrorResponse(w, 400, err.Error())
sendErrorResponse(w, 400, "error decoding Cid: "+err.Error())
return
}
rRpc := RPC(UnpinRPC, c)
@ -288,7 +296,8 @@ func checkResponse(w http.ResponseWriter, op RPCOp, resp RPCResponse) bool {
ok = false
}
if !ok {
logger.Error("unexpected RPC Response format")
logger.Errorf("unexpected RPC Response format for %d:", op)
logger.Errorf("%+v", resp.Data)
sendErrorResponse(w, 500, "Unexpected RPC Response format")
return false
}
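
For context, a minimal self-contained sketch of the shutdown pattern this api.go change adopts: keep the *http.Server and net.Listener on the struct, signal with empty-struct channels, and unblock Serve() by closing the listener. The demoServer type and names below are illustrative stand-ins, not the cluster's own code.

package main

import (
	"fmt"
	"net"
	"net/http"
)

type demoServer struct {
	listener   net.Listener
	server     *http.Server
	shutdownCh chan struct{} // closed by Shutdown() to mark an expected stop
	doneCh     chan struct{} // closed by run() once Serve() has returned
}

func newDemoServer(addr string) (*demoServer, error) {
	l, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	})
	d := &demoServer{
		listener:   l,
		server:     &http.Server{Handler: mux},
		shutdownCh: make(chan struct{}),
		doneCh:     make(chan struct{}),
	}
	go d.run()
	return d, nil
}

func (d *demoServer) run() {
	err := d.server.Serve(d.listener) // returns once the listener is closed
	select {
	case <-d.shutdownCh: // expected: Shutdown() closed the listener on purpose
	default:
		fmt.Println("unexpected serve error:", err)
	}
	close(d.doneCh)
}

func (d *demoServer) Shutdown() {
	close(d.shutdownCh)
	d.server.SetKeepAlivesEnabled(false) // stop reusing idle connections
	d.listener.Close()                   // makes Serve() return
	<-d.doneCh
}

func main() {
	d, err := newDemoServer("127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	resp, err := http.Get("http://" + d.listener.Addr().String() + "/")
	if err == nil {
		resp.Body.Close()
		fmt.Println("got", resp.Status)
	}
	d.Shutdown()
}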

api_test.go

@ -0,0 +1,215 @@
package ipfscluster
import (
"bytes"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"testing"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
var (
apiHost = "http://127.0.0.1:5000"
)
func testClusterApi(t *testing.T) *ClusterHTTPAPI {
//logging.SetDebugLogging()
cfg := &ClusterConfig{
ClusterAPIListenAddr: "127.0.0.1",
ClusterAPIListenPort: 5000,
}
api, err := NewHTTPClusterAPI(cfg)
// No keep alive! Otherwise tests hang with
// connections re-used from previous tests
api.server.SetKeepAlivesEnabled(false)
if err != nil {
t.Fatal("should be able to create a new Api: ", err)
}
if api.RpcChan() == nil {
t.Fatal("should create the Rpc channel")
}
return api
}
func simulateAnswer(ch <-chan ClusterRPC, answer interface{}, err error) {
go func() {
req := <-ch
req.ResponseCh() <- RPCResponse{
Data: answer,
Error: err,
}
}()
}
func processResp(t *testing.T, httpResp *http.Response, err error, resp interface{}) {
if err != nil {
t.Fatal("error making get request: ", err)
}
body, err := ioutil.ReadAll(httpResp.Body)
defer httpResp.Body.Close()
if err != nil {
t.Fatal("error reading body: ", err)
}
if len(body) != 0 {
err = json.Unmarshal(body, resp)
if err != nil {
t.Error(string(body))
t.Fatal("error parsing json: ", err)
}
}
}
func makeGet(t *testing.T, path string, resp interface{}) {
httpResp, err := http.Get(apiHost + path)
processResp(t, httpResp, err, resp)
}
func makePost(t *testing.T, path string, resp interface{}) {
httpResp, err := http.Post(apiHost+path, "application/json", bytes.NewReader([]byte{}))
processResp(t, httpResp, err, resp)
}
func makeDelete(t *testing.T, path string, resp interface{}) {
req, _ := http.NewRequest("DELETE", apiHost+path, bytes.NewReader([]byte{}))
c := &http.Client{}
httpResp, err := c.Do(req)
processResp(t, httpResp, err, resp)
}
func TestAPIShutdown(t *testing.T) {
api := testClusterApi(t)
err := api.Shutdown()
if err != nil {
t.Error("should shutdown cleanly: ", err)
}
}
func TestVersionEndpoint(t *testing.T) {
api := testClusterApi(t)
api.server.SetKeepAlivesEnabled(false)
defer api.Shutdown()
simulateAnswer(api.RpcChan(), "v", nil)
ver := versionResp{}
makeGet(t, "/version", &ver)
if ver.Version != "v" {
t.Error("expected correct version")
}
simulateAnswer(api.RpcChan(), nil, errors.New("an error"))
errResp := errorResp{}
makeGet(t, "/version", &errResp)
if errResp.Message != "an error" {
t.Error("expected different error")
}
}
func TestMemberListEndpoint(t *testing.T) {
api := testClusterApi(t)
api.server.SetKeepAlivesEnabled(false)
defer api.Shutdown()
pList := []peer.ID{
testPeerID,
}
simulateAnswer(api.RpcChan(), pList, nil)
var list []string
makeGet(t, "/members", &list)
if len(list) != 1 || list[0] != testPeerID.Pretty() {
t.Error("expected a peer id list: ", list)
}
simulateAnswer(api.RpcChan(), nil, errors.New("an error"))
errResp := errorResp{}
makeGet(t, "/members", &errResp)
if errResp.Message != "an error" {
t.Error("expected different error")
}
}
func TestPinEndpoint(t *testing.T) {
api := testClusterApi(t)
defer api.Shutdown()
simulateAnswer(api.RpcChan(), nil, nil)
var i interface{} = nil
makePost(t, "/pins/"+testCid, &i)
if i != nil {
t.Error("pin should have returned an empty response")
}
simulateAnswer(api.RpcChan(), nil, errors.New("an error"))
errResp := errorResp{}
makePost(t, "/pins/"+testCid2, &errResp)
if errResp.Message != "an error" {
t.Error("expected different error")
}
makePost(t, "/pins/abcd", &errResp)
if errResp.Code != 400 {
t.Error("should fail with wrong Cid")
}
}
func TestUnpinEndpoint(t *testing.T) {
api := testClusterApi(t)
defer api.Shutdown()
simulateAnswer(api.RpcChan(), nil, nil)
var i interface{} = nil
makeDelete(t, "/pins/"+testCid, &i)
if i != nil {
t.Error("pin should have returned an empty response")
}
simulateAnswer(api.RpcChan(), nil, errors.New("an error"))
errResp := errorResp{}
makeDelete(t, "/pins/"+testCid2, &errResp)
if errResp.Message != "an error" {
t.Error("expected different error")
}
makeDelete(t, "/pins/abcd", &errResp)
if errResp.Code != 400 {
t.Error("should fail with wrong Cid")
}
}
func TestPinListEndpoint(t *testing.T) {
c, _ := cid.Decode(testCid)
c2, _ := cid.Decode(testCid2)
c3, _ := cid.Decode(testCid3)
api := testClusterApi(t)
defer api.Shutdown()
pList := []Pin{
Pin{
Status: PinError,
Cid: c,
},
Pin{
Status: UnpinError,
Cid: c,
},
Pin{
Status: Pinned,
Cid: c3,
},
Pin{
Status: Pinning,
Cid: c,
},
Pin{
Status: Unpinning,
Cid: c2,
},
}
simulateAnswer(api.RpcChan(), pList, nil)
var resp pinListResp
makeGet(t, "/pins", &resp)
if len(resp) != 5 {
t.Error("unexpected pinListResp: ", resp)
}
}
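
The test helpers above lean on the channel-based RPC pattern used throughout the codebase: the component under test pushes a request onto its RPC channel and blocks on a per-request response channel, while simulateAnswer replies from a goroutine. A stripped-down sketch of that pattern, using made-up fakeRPC/fakeResponse types rather than the real ClusterRPC/RPCResponse:

package main

import "fmt"

type fakeResponse struct {
	Data  interface{}
	Error error
}

type fakeRPC struct {
	Op     string
	respCh chan fakeResponse
}

// simulateAnswer mirrors the test helper: answer the next request on ch.
func simulateAnswer(ch <-chan fakeRPC, data interface{}, err error) {
	go func() {
		req := <-ch
		req.respCh <- fakeResponse{Data: data, Error: err}
	}()
}

func main() {
	rpcCh := make(chan fakeRPC, 1)
	simulateAnswer(rpcCh, "v0.0.1", nil)

	// The "component": send a request and wait for the simulated answer.
	req := fakeRPC{Op: "version", respCh: make(chan fakeResponse)}
	rpcCh <- req
	resp := <-req.respCh
	fmt.Println(resp.Data) // v0.0.1
}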


@ -21,8 +21,7 @@ import (
// Cluster is the main IPFS cluster component. It provides
// the go-API for it and orchestrates the components that make up the system.
type Cluster struct {
ctx context.Context
cancel context.CancelFunc
ctx context.Context
config *ClusterConfig
host host.Host
@ -38,7 +37,7 @@ type Cluster struct {
// an IPFSConnector and a ClusterState as parameters, allowing the user
// to provide custom implementations of these components.
func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state ClusterState, tracker PinTracker) (*Cluster, error) {
ctx, cancel := context.WithCancel(context.Background())
ctx := context.Background()
host, err := makeHost(ctx, cfg)
if err != nil {
return nil, err
@ -52,7 +51,6 @@ func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state Cl
cluster := &Cluster{
ctx: ctx,
cancel: cancel,
config: cfg,
host: host,
consensus: consensus,
@ -63,6 +61,9 @@ func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state Cl
}
logger.Info("Starting IPFS Cluster")
logger.Info("Performing State synchronization")
cluster.Sync()
go cluster.run()
return cluster, nil
}
@ -87,7 +88,19 @@ func (c *Cluster) Shutdown() error {
logger.Errorf("Error stopping PinTracker: %s", err)
return err
}
c.cancel()
return nil
}
func (c *Cluster) Sync() error {
cState, err := c.consensus.State()
if err != nil {
return err
}
changed := c.tracker.SyncState(cState)
for _, p := range changed {
logger.Debugf("Recovering %s", p.Cid)
c.tracker.Recover(p.Cid)
}
return nil
}
@ -142,6 +155,9 @@ func (c *Cluster) Members() []peer.ID {
// run reads from the RPC channels of the different components and launches
// short-lived go-routines to handle any requests.
func (c *Cluster) run() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c.ctx = ctx
ipfsCh := c.ipfs.RpcChan()
consensusCh := c.consensus.RpcChan()
apiCh := c.api.RpcChan()
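
The new Cluster.Sync() boils down to: fetch the shared state from consensus, let the tracker reconcile itself against it, and try to Recover whatever the reconciliation flagged. A small self-contained sketch of that flow, using trimmed-down stand-ins for ClusterState and PinTracker (not the cluster's own interfaces):

package main

import "fmt"

type pin struct{ Cid string }

// Trimmed-down stand-ins for the cluster's ClusterState and PinTracker.
type state interface{ ListPins() []string }

type tracker interface {
	SyncState(state) []pin // reconcile tracked pins, return the ones that changed
	Recover(cid string) error
}

// syncAndRecover mirrors the flow of Cluster.Sync(): reconcile, then retry
// whatever the reconciliation left in an error state.
func syncAndRecover(st state, tr tracker) {
	for _, p := range tr.SyncState(st) {
		fmt.Printf("recovering %s\n", p.Cid)
		if err := tr.Recover(p.Cid); err != nil {
			fmt.Printf("recover failed for %s: %v\n", p.Cid, err)
		}
	}
}

// Minimal in-memory implementations so the sketch runs on its own.
type memState struct{ pins []string }

func (s *memState) ListPins() []string { return s.pins }

type memTracker struct{ known map[string]bool }

func (t *memTracker) SyncState(st state) []pin {
	var changed []pin
	for _, c := range st.ListPins() {
		if !t.known[c] {
			t.known[c] = true
			changed = append(changed, pin{Cid: c})
		}
	}
	return changed
}

func (t *memTracker) Recover(cid string) error { return nil }

func main() {
	st := &memState{pins: []string{"QmA", "QmB"}}
	tr := &memTracker{known: map[string]bool{"QmA": true}}
	syncAndRecover(st, tr)
}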


@ -3,6 +3,7 @@ package ipfscluster
import (
"context"
"errors"
"time"
host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host"
consensus "gx/ipfs/QmZ88KbrvZMJpXaNwAGffswcYKz8EbeafzAFGMCA6MEZKt/go-libp2p-consensus"
@ -25,6 +26,10 @@ const (
type clusterLogOpType int
// MaxStartupDelay is the maximum amount of time we will wait
// for the consensus state to catch up on startup.
var MaxStartupDelay = 10 * time.Second
// clusterLogOp represents an operation for the OpLogConsensus system.
// It implements the consensus.Op interface.
type clusterLogOp struct {
@ -90,8 +95,7 @@ ROLLBACK:
// the members of an IPFS Cluster, as well as modifying that state and
// applying any updates in a thread-safe manner.
type ClusterConsensus struct {
ctx context.Context
cancel context.CancelFunc
ctx context.Context
consensus consensus.OpLogConsensus
actor consensus.Actor
@ -106,7 +110,7 @@ type ClusterConsensus struct {
// is discarded.
func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState) (*ClusterConsensus, error) {
logger.Info("Starting Consensus component")
ctx, cancel := context.WithCancel(context.Background())
ctx := context.Background()
rpcCh := make(chan ClusterRPC, RPCMaxQueue)
op := clusterLogOp{
ctx: ctx,
@ -121,12 +125,25 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState)
cc := &ClusterConsensus{
ctx: ctx,
cancel: cancel,
consensus: con,
actor: actor,
rpcCh: rpcCh,
p2pRaft: wrapper,
}
logger.Info("Waiting for Consensus state to catch up")
time.Sleep(1 * time.Second)
start := time.Now()
for {
time.Sleep(500 * time.Millisecond)
li := wrapper.raft.LastIndex()
lai := wrapper.raft.AppliedIndex()
if lai == li || time.Since(start) > MaxStartupDelay {
break
}
logger.Debugf("Waiting for Raft index: %d/%d", lai, li)
}
return cc, nil
}
@ -135,7 +152,6 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState)
// shutdown, along with the libp2p transport.
func (cc *ClusterConsensus) Shutdown() error {
logger.Info("Stopping Consensus component")
cc.cancel()
// When we take a snapshot, we make sure that
// we re-start from the previous state, and that
@ -190,6 +206,18 @@ func (cc *ClusterConsensus) RmPin(c *cid.Cid) error {
return nil
}
func (cc *ClusterConsensus) State() (ClusterState, error) {
st, err := cc.consensus.GetLogHead()
if err != nil {
return nil, err
}
state, ok := st.(ClusterState)
if !ok {
return nil, errors.New("Wrong state type")
}
return state, nil
}
// Leader() returns the peerID of the Leader of the
// cluster.
func (cc *ClusterConsensus) Leader() peer.ID {
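
The startup wait added to NewClusterConsensus polls Raft's last and applied log indexes until they agree or MaxStartupDelay expires. A sketch of that loop against a stand-in interface covering only the two methods used (LastIndex and AppliedIndex, which hashicorp/raft exposes on *raft.Raft); the fakeRaft type exists only so the example runs:

package main

import (
	"fmt"
	"time"
)

type indexProvider interface {
	LastIndex() uint64
	AppliedIndex() uint64
}

// waitForUpdates blocks until the applied index has caught up with the last
// index, or until maxWait has elapsed, whichever comes first.
func waitForUpdates(r indexProvider, maxWait time.Duration) {
	start := time.Now()
	for {
		time.Sleep(100 * time.Millisecond)
		li, lai := r.LastIndex(), r.AppliedIndex()
		if lai == li || time.Since(start) > maxWait {
			return
		}
		fmt.Printf("waiting for Raft index: %d/%d\n", lai, li)
	}
}

// fakeRaft pretends to apply one log entry per poll, so the loop terminates.
type fakeRaft struct{ applied, last uint64 }

func (f *fakeRaft) LastIndex() uint64 { return f.last }
func (f *fakeRaft) AppliedIndex() uint64 {
	if f.applied < f.last {
		f.applied++
	}
	return f.applied
}

func main() {
	waitForUpdates(&fakeRaft{applied: 0, last: 3}, 10*time.Second)
}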


@ -73,6 +73,8 @@ type ClusterState interface {
AddPin(*cid.Cid) error
// RmPin removes a pin from the ClusterState
RmPin(*cid.Cid) error
// ListPins lists all the pins in the state
ListPins() []*cid.Cid
}
// PinTracker represents a component which tracks the status of
@ -104,6 +106,9 @@ type PinTracker interface {
Recover(*cid.Cid) error
// SyncAll runs Sync() on every known Pin. It returns a list of changed Pins
SyncAll() []Pin
// SyncState makes sure that the tracked Pins match those in the
// cluster state and runs SyncAll(). It returns a list of changed Pins.
SyncState(ClusterState) []Pin
}
// MakeRPC sends a ClusterRPC object over a channel and optionally waits for a
@ -132,6 +137,7 @@ func MakeRPC(ctx context.Context, rpcCh chan ClusterRPC, r ClusterRPC, waitForRe
}
}
if !waitForResponse {
logger.Debug("Not waiting for response. Returning directly")
return RPCResponse{}
}


@ -5,12 +5,15 @@ import (
"strings"
"testing"
"time"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
var (
testCid = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq"
testCid2 = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmma"
testCid3 = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmb"
testCid = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq"
testCid2 = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmma"
testCid3 = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmb"
testPeerID, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
)
func TestMakeRPC(t *testing.T) {


@ -25,17 +25,18 @@ import (
// against the configured IPFS daemon (such as a pin request).
type IPFSHTTPConnector struct {
ctx context.Context
cancel context.CancelFunc
destHost string
destPort int
listenAddr string
listenPort int
handlers map[string]func(http.ResponseWriter, *http.Request)
rpcCh chan ClusterRPC
listener net.Listener
shutdownCh chan bool
doneCh chan bool
listener net.Listener
server *http.Server
shutdownCh chan struct{}
doneCh chan struct{}
}
type ipfsError struct {
@ -44,15 +45,21 @@ type ipfsError struct {
// NewIPFSHTTPConnector creates the component and leaves it ready to be started
func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) {
ctx, cancel := context.WithCancel(context.Background())
ctx := context.Background()
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
cfg.IPFSAPIListenAddr, cfg.IPFSAPIListenPort))
if err != nil {
return nil, err
}
smux := http.NewServeMux()
s := &http.Server{
Handler: smux,
}
s.SetKeepAlivesEnabled(true) // A reminder that this can be changed
ipfs := &IPFSHTTPConnector{
ctx: ctx,
cancel: cancel,
destHost: cfg.IPFSHost,
destPort: cfg.IPFSPort,
listenAddr: cfg.IPFSAPIListenAddr,
@ -60,10 +67,13 @@ func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) {
handlers: make(map[string]func(http.ResponseWriter, *http.Request)),
rpcCh: make(chan ClusterRPC, RPCMaxQueue),
listener: l,
shutdownCh: make(chan bool),
doneCh: make(chan bool),
server: s,
shutdownCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
smux.HandleFunc("/", ipfs.handle)
logger.Infof("Starting IPFS Proxy on %s:%d", ipfs.listenAddr, ipfs.listenPort)
go ipfs.run()
return ipfs, nil
@ -114,10 +124,10 @@ func (ipfs *IPFSHTTPConnector) defaultHandler(w http.ResponseWriter, r *http.Req
func (ipfs *IPFSHTTPConnector) run() {
// This launches the proxy
go func() {
smux := http.NewServeMux()
smux.HandleFunc("/", ipfs.handle)
// Fixme: make this with closable net listener
err := http.Serve(ipfs.listener, smux)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ipfs.ctx = ctx
err := ipfs.server.Serve(ipfs.listener)
select {
case <-ipfs.shutdownCh:
close(ipfs.doneCh)
@ -140,6 +150,7 @@ func (ipfs *IPFSHTTPConnector) RpcChan() <-chan ClusterRPC {
func (ipfs *IPFSHTTPConnector) Shutdown() error {
logger.Info("Stopping IPFS Proxy")
close(ipfs.shutdownCh)
ipfs.server.SetKeepAlivesEnabled(false)
ipfs.listener.Close()
<-ipfs.doneCh
return nil


@ -40,7 +40,6 @@ func testServer(t *testing.T) *httptest.Server {
w.WriteHeader(http.StatusNotFound)
}
}))
t.Log("test server url: ", ts.URL)
return ts
}


@ -8,31 +8,41 @@ import (
// MapState is a very simple database to store
// the state of the system.
// PinMap is public because it is serialized
// and maintained by Raft.
type MapState struct {
PinMap map[string]bool
PinMap map[string]struct{}
rpcCh chan ClusterRPC
mux sync.Mutex
}
func NewMapState() MapState {
return MapState{
PinMap: make(map[string]bool),
func NewMapState() *MapState {
return &MapState{
PinMap: make(map[string]struct{}),
rpcCh: make(chan ClusterRPC),
}
}
func (st MapState) AddPin(c *cid.Cid) error {
func (st *MapState) AddPin(c *cid.Cid) error {
st.mux.Lock()
defer st.mux.Unlock()
st.PinMap[c.String()] = true
var a struct{}
st.PinMap[c.String()] = a
return nil
}
func (st MapState) RmPin(c *cid.Cid) error {
func (st *MapState) RmPin(c *cid.Cid) error {
st.mux.Lock()
defer st.mux.Unlock()
delete(st.PinMap, c.String())
return nil
}
func (st *MapState) ListPins() []*cid.Cid {
st.mux.Lock()
defer st.mux.Unlock()
cids := make([]*cid.Cid, 0, len(st.PinMap))
for k, _ := range st.PinMap {
c, _ := cid.Decode(k)
cids = append(cids, c)
}
return cids
}
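
MapState now stores its pins in a map[string]struct{} used as a set (the empty struct occupies no space, so the map holds only keys) and switches its methods to pointer receivers, so the embedded sync.Mutex is no longer copied on each call. A tiny illustrative example of the set idiom, separate from the cluster code:

package main

import "fmt"

type cidSet struct {
	m map[string]struct{}
}

func newCidSet() *cidSet { return &cidSet{m: make(map[string]struct{})} }

func (s *cidSet) Add(c string)      { s.m[c] = struct{}{} }
func (s *cidSet) Remove(c string)   { delete(s.m, c) }
func (s *cidSet) Has(c string) bool { _, ok := s.m[c]; return ok }

func main() {
	s := newCidSet()
	s.Add("QmA")
	fmt.Println(s.Has("QmA"), s.Has("QmB")) // true false
}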


@ -33,41 +33,39 @@ type MapPinTracker struct {
status map[string]Pin
rpcCh chan ClusterRPC
mux sync.Mutex
doneCh chan bool
mux sync.Mutex
ctx context.Context
cancel context.CancelFunc
shutdownCh chan struct{}
doneCh chan struct{}
ctx context.Context
}
func NewMapPinTracker() *MapPinTracker {
ctx, cancel := context.WithCancel(context.Background())
ctx := context.Background()
mpt := &MapPinTracker{
status: make(map[string]Pin),
rpcCh: make(chan ClusterRPC),
doneCh: make(chan bool),
ctx: ctx,
cancel: cancel,
status: make(map[string]Pin),
rpcCh: make(chan ClusterRPC, RPCMaxQueue),
shutdownCh: make(chan struct{}),
doneCh: make(chan struct{}),
ctx: ctx,
}
go mpt.run()
return mpt
}
func (mpt *MapPinTracker) run() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mpt.ctx = ctx
for {
select {
case <-mpt.shutdownCh:
close(mpt.doneCh)
return
}
}
// Great plans for this thread
// The first time we run, we sync all
// and try to recover any errors
changed := mpt.SyncAll()
for _, p := range changed {
logger.Debugf("Recovering %s", p.Cid)
mpt.Recover(p.Cid)
}
select {
case <-mpt.ctx.Done():
close(mpt.doneCh)
}
}
func (mpt *MapPinTracker) set(c *cid.Cid, s PinStatus) error {
@ -226,9 +224,63 @@ func (mpt *MapPinTracker) SyncAll() []Pin {
return changedPins
}
func (mpt *MapPinTracker) SyncState(cState ClusterState) []Pin {
clusterPins := cState.ListPins()
clusterMap := make(map[string]struct{})
// Make a map for faster lookup
for _, c := range clusterPins {
var a struct{}
clusterMap[c.String()] = a
}
var toRemove []*cid.Cid
var toAdd []*cid.Cid
var changed []Pin
mpt.mux.Lock()
// Collect items in the ClusterState not in the tracker
for _, c := range clusterPins {
_, ok := mpt.status[c.String()]
if !ok {
toAdd = append(toAdd, c)
}
}
// Collect items in the tracker not in the ClusterState
for _, p := range mpt.status {
_, ok := clusterMap[p.Cid.String()]
if !ok {
toRemove = append(toRemove, p.Cid)
}
}
// Update new items and mark them as pinning error
for _, c := range toAdd {
p := Pin{
Cid: c,
PinMode: pinEverywhere,
Status: PinError,
}
mpt.status[c.String()] = p
changed = append(changed, p)
}
// Mark items that need to be removed as unpin error
for _, c := range toRemove {
p := Pin{
Cid: c,
PinMode: pinEverywhere,
Status: UnpinError,
}
mpt.status[c.String()] = p
changed = append(changed, p)
}
mpt.mux.Unlock()
return changed
}
func (mpt *MapPinTracker) Shutdown() error {
logger.Info("Stopping MapPinTracker")
mpt.cancel()
close(mpt.shutdownCh)
<-mpt.doneCh
return nil
}
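
SyncState above is a two-way set difference: pins present in the shared cluster state but unknown to the tracker are marked PinError, and tracked pins absent from the state are marked UnpinError, so that the Recover pass in Cluster.Sync() can retry them. A compact self-contained illustration of that reconciliation, with simplified stand-in types and status names rather than the tracker's own:

package main

import "fmt"

type status string

const (
	needsPin   status = "pin-error"   // in state, not tracked
	needsUnpin status = "unpin-error" // tracked, not in state
)

type change struct {
	cid string
	st  status
}

// reconcile returns one change per cid that the two sets disagree on.
func reconcile(statePins []string, tracked map[string]bool) []change {
	inState := make(map[string]struct{}, len(statePins))
	for _, c := range statePins {
		inState[c] = struct{}{}
	}
	var out []change
	for _, c := range statePins {
		if !tracked[c] {
			out = append(out, change{cid: c, st: needsPin})
		}
	}
	for c := range tracked {
		if _, ok := inState[c]; !ok {
			out = append(out, change{cid: c, st: needsUnpin})
		}
	}
	return out
}

func main() {
	state := []string{"QmA", "QmB"}
	tracked := map[string]bool{"QmB": true, "QmC": true}
	for _, ch := range reconcile(state, tracked) {
		fmt.Printf("%s -> %s\n", ch.cid, ch.st) // QmA -> pin-error, QmC -> unpin-error
	}
}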