From 5258a4d428600976ebae1b14be9205dfacdca920 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Fri, 13 Dec 2019 01:52:54 +0530 Subject: [PATCH] Remove map pintracker (#944) This removes mappintracker and sets stateless tracker as the default (and only) pintracker component. Because the stateless tracker matches the cluster state with only ongoing operations being kept on memory, and additional information provided by ipfs-pin-ls, syncing operations are not necessary. Therefore the Sync/SyncAll operations are removed cluster-wide. --- .travis.yml | 3 - api/rest/client/client.go | 11 - api/rest/client/lbclient.go | 31 - api/rest/client/methods.go | 32 - api/rest/client/methods_test.go | 37 -- api/rest/restapi.go | 72 --- api/rest/restapi_test.go | 66 --- cluster.go | 170 +----- cluster_config.go | 17 - cluster_config_test.go | 1 - cluster_test.go | 6 +- cmd/ipfs-cluster-ctl/main.go | 39 -- cmd/ipfs-cluster-service/daemon.go | 35 +- cmd/ipfs-cluster-service/main.go | 11 +- cmdutils/configs.go | 4 - config_test.go | 14 +- consensus/crdt/consensus.go | 2 +- ipfscluster.go | 6 - ipfscluster_test.go | 193 +----- pintracker/maptracker/config.go | 115 ---- pintracker/maptracker/config_test.go | 72 --- pintracker/maptracker/maptracker.go | 452 -------------- pintracker/maptracker/maptracker_test.go | 591 ------------------- pintracker/optracker/operation.go | 59 +- pintracker/optracker/operationtracker.go | 18 - pintracker/pintracker_test.go | 505 ++-------------- pintracker/stateless/stateless.go | 329 ++++------- pintracker/stateless/stateless_test.go | 154 +---- pintracker/util/pin.go | 29 - rpc_api.go | 40 -- rpc_policy.go | 4 - rpcutil/policygen/policygen.go | 2 - sharness/config/basic_auth/service.json | 5 - sharness/config/ssl-basic_auth/service.json | 5 - sharness/config/ssl/service.json | 5 - sharness/t0010-ctl-basic-commands.sh | 1 - sharness/t0025-ctl-status-report-commands.sh | 4 - test/cids.go | 1 + test/rpc_api_mock.go | 16 - 39 files changed, 270 insertions(+), 2887 deletions(-) delete mode 100644 pintracker/maptracker/config.go delete mode 100644 pintracker/maptracker/config_test.go delete mode 100644 pintracker/maptracker/maptracker.go delete mode 100644 pintracker/maptracker/maptracker_test.go delete mode 100644 pintracker/util/pin.go diff --git a/.travis.yml b/.travis.yml index 04db3c85..9e37f22b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -30,9 +30,6 @@ jobs: - name: "Main Tests with raft consensus" script: - travis_wait go test -v -timeout 15m -failfast -consensus raft . - - name: "Main Tests with stateless tracker" - script: - - travis_wait go test -v -timeout 15m -failfast -tracker stateless . - name: "Golint and go vet" script: - go get -u golang.org/x/lint/golint diff --git a/api/rest/client/client.go b/api/rest/client/client.go index 4b121fe7..0fb221b7 100644 --- a/api/rest/client/client.go +++ b/api/rest/client/client.go @@ -86,17 +86,6 @@ type Client interface { // StatusAll gathers Status() for all tracked items. StatusAll(ctx context.Context, filter api.TrackerStatus, local bool) ([]*api.GlobalPinInfo, error) - // Sync makes sure the state of a Cid corresponds to the state reported - // by the ipfs daemon, and returns it. If local is true, this operation - // only happens on the current peer, otherwise it happens on every - // cluster peer. - Sync(ctx context.Context, ci cid.Cid, local bool) (*api.GlobalPinInfo, error) - // SyncAll triggers Sync() operations for all tracked items. It only - // returns informations for items that were de-synced or have an error - // state. If local is true, the operation is limited to the current - // peer. Otherwise it happens on every cluster peer. - SyncAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error) - // Recover retriggers pin or unpin ipfs operations for a Cid in error // state. If local is true, the operation is limited to the current // peer, otherwise it happens on every cluster peer. diff --git a/api/rest/client/lbclient.go b/api/rest/client/lbclient.go index 629cb472..031264a9 100644 --- a/api/rest/client/lbclient.go +++ b/api/rest/client/lbclient.go @@ -270,37 +270,6 @@ func (lc *loadBalancingClient) StatusAll(ctx context.Context, filter api.Tracker return pinInfos, err } -// Sync makes sure the state of a Cid corresponds to the state reported by -// the ipfs daemon, and returns it. If local is true, this operation only -// happens on the current peer, otherwise it happens on every cluster peer. -func (lc *loadBalancingClient) Sync(ctx context.Context, ci cid.Cid, local bool) (*api.GlobalPinInfo, error) { - var pinInfo *api.GlobalPinInfo - call := func(c Client) error { - var err error - pinInfo, err = c.Sync(ctx, ci, local) - return err - } - - err := lc.retry(0, call) - return pinInfo, err -} - -// SyncAll triggers Sync() operations for all tracked items. It only returns -// informations for items that were de-synced or have an error state. If -// local is true, the operation is limited to the current peer. Otherwise -// it happens on every cluster peer. -func (lc *loadBalancingClient) SyncAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error) { - var pinInfos []*api.GlobalPinInfo - call := func(c Client) error { - var err error - pinInfos, err = c.SyncAll(ctx, local) - return err - } - - err := lc.retry(0, call) - return pinInfos, err -} - // Recover retriggers pin or unpin ipfs operations for a Cid in error state. // If local is true, the operation is limited to the current peer, otherwise // it happens on every cluster peer. diff --git a/api/rest/client/methods.go b/api/rest/client/methods.go index 532aba8b..5b91ac55 100644 --- a/api/rest/client/methods.go +++ b/api/rest/client/methods.go @@ -250,38 +250,6 @@ func (c *defaultClient) StatusAll(ctx context.Context, filter api.TrackerStatus, return gpis, err } -// Sync makes sure the state of a Cid corresponds to the state reported by -// the ipfs daemon, and returns it. If local is true, this operation only -// happens on the current peer, otherwise it happens on every cluster peer. -func (c *defaultClient) Sync(ctx context.Context, ci cid.Cid, local bool) (*api.GlobalPinInfo, error) { - ctx, span := trace.StartSpan(ctx, "client/Sync") - defer span.End() - - var gpi api.GlobalPinInfo - err := c.do( - ctx, - "POST", - fmt.Sprintf("/pins/%s/sync?local=%t", ci.String(), local), - nil, - nil, - &gpi, - ) - return &gpi, err -} - -// SyncAll triggers Sync() operations for all tracked items. It only returns -// informations for items that were de-synced or have an error state. If -// local is true, the operation is limited to the current peer. Otherwise -// it happens on every cluster peer. -func (c *defaultClient) SyncAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error) { - ctx, span := trace.StartSpan(ctx, "client/SyncAll") - defer span.End() - - var gpis []*api.GlobalPinInfo - err := c.do(ctx, "POST", fmt.Sprintf("/pins/sync?local=%t", local), nil, nil, &gpis) - return gpis, err -} - // Recover retriggers pin or unpin ipfs operations for a Cid in error state. // If local is true, the operation is limited to the current peer, otherwise // it happens on every cluster peer. diff --git a/api/rest/client/methods_test.go b/api/rest/client/methods_test.go index 561f24e8..d2b39e87 100644 --- a/api/rest/client/methods_test.go +++ b/api/rest/client/methods_test.go @@ -376,43 +376,6 @@ func TestStatusAll(t *testing.T) { testClients(t, api, testF) } -func TestSync(t *testing.T) { - ctx := context.Background() - api := testAPI(t) - defer shutdown(api) - - testF := func(t *testing.T, c Client) { - pin, err := c.Sync(ctx, test.Cid1, false) - if err != nil { - t.Fatal(err) - } - if !pin.Cid.Equals(test.Cid1) { - t.Error("should be same pin") - } - } - - testClients(t, api, testF) -} - -func TestSyncAll(t *testing.T) { - ctx := context.Background() - api := testAPI(t) - defer shutdown(api) - - testF := func(t *testing.T, c Client) { - pins, err := c.SyncAll(ctx, false) - if err != nil { - t.Fatal(err) - } - - if len(pins) == 0 { - t.Error("there should be some pins") - } - } - - testClients(t, api, testF) -} - func TestRecover(t *testing.T) { ctx := context.Background() api := testAPI(t) diff --git a/api/rest/restapi.go b/api/rest/restapi.go index 62beafc9..52e95aba 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -409,18 +409,6 @@ func (api *API) routes() []route { "/pins", api.statusAllHandler, }, - { - "Sync", - "POST", - "/pins/{hash}/sync", - api.syncHandler, - }, - { - "SyncAll", - "POST", - "/pins/sync", - api.syncAllHandler, - }, { "Recover", "POST", @@ -978,66 +966,6 @@ func (api *API) statusHandler(w http.ResponseWriter, r *http.Request) { } } -func (api *API) syncAllHandler(w http.ResponseWriter, r *http.Request) { - queryValues := r.URL.Query() - local := queryValues.Get("local") - - if local == "true" { - var pinInfos []*types.PinInfo - err := api.rpcClient.CallContext( - r.Context(), - "", - "Cluster", - "SyncAllLocal", - struct{}{}, - &pinInfos, - ) - api.sendResponse(w, autoStatus, err, pinInfosToGlobal(pinInfos)) - } else { - var pinInfos []*types.GlobalPinInfo - err := api.rpcClient.CallContext( - r.Context(), - "", - "Cluster", - "SyncAll", - struct{}{}, - &pinInfos, - ) - api.sendResponse(w, autoStatus, err, pinInfos) - } -} - -func (api *API) syncHandler(w http.ResponseWriter, r *http.Request) { - queryValues := r.URL.Query() - local := queryValues.Get("local") - - if pin := api.parseCidOrError(w, r); pin != nil { - if local == "true" { - var pinInfo types.PinInfo - err := api.rpcClient.CallContext( - r.Context(), - "", - "Cluster", - "SyncLocal", - pin.Cid, - &pinInfo, - ) - api.sendResponse(w, autoStatus, err, pinInfoToGlobal(&pinInfo)) - } else { - var pinInfo types.GlobalPinInfo - err := api.rpcClient.CallContext( - r.Context(), - "", - "Cluster", - "Sync", - pin.Cid, - &pinInfo, - ) - api.sendResponse(w, autoStatus, err, pinInfo) - } - } -} - func (api *API) recoverAllHandler(w http.ResponseWriter, r *http.Request) { queryValues := r.URL.Query() local := queryValues.Get("local") diff --git a/api/rest/restapi_test.go b/api/rest/restapi_test.go index 39a27c58..496da90a 100644 --- a/api/rest/restapi_test.go +++ b/api/rest/restapi_test.go @@ -951,72 +951,6 @@ func TestAPIStatusEndpoint(t *testing.T) { testBothEndpoints(t, tf) } -func TestAPISyncAllEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url urlF) { - var resp []*api.GlobalPinInfo - makePost(t, rest, url(rest)+"/pins/sync", []byte{}, &resp) - - if len(resp) != 3 || - !resp[0].Cid.Equals(test.Cid1) || - resp[1].PeerMap[peer.IDB58Encode(test.PeerID1)].Status.String() != "pinning" { - t.Errorf("unexpected syncAll resp:\n %+v", resp) - } - - // Test local=true - var resp2 []*api.GlobalPinInfo - makePost(t, rest, url(rest)+"/pins/sync?local=true", []byte{}, &resp2) - - if len(resp2) != 2 { - t.Errorf("unexpected syncAll+local resp:\n %+v", resp2) - } - } - - testBothEndpoints(t, tf) -} - -func TestAPISyncEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url urlF) { - var resp api.GlobalPinInfo - makePost(t, rest, url(rest)+"/pins/"+test.Cid1.String()+"/sync", []byte{}, &resp) - - if !resp.Cid.Equals(test.Cid1) { - t.Error("expected the same cid") - } - info, ok := resp.PeerMap[peer.IDB58Encode(test.PeerID1)] - if !ok { - t.Fatal("expected info for test.PeerID1") - } - if info.Status.String() != "pinned" { - t.Error("expected different status") - } - - // Test local=true - var resp2 api.GlobalPinInfo - makePost(t, rest, url(rest)+"/pins/"+test.Cid1.String()+"/sync?local=true", []byte{}, &resp2) - - if !resp2.Cid.Equals(test.Cid1) { - t.Error("expected the same cid") - } - info, ok = resp2.PeerMap[peer.IDB58Encode(test.PeerID2)] - if !ok { - t.Fatal("expected info for test.PeerID2") - } - if info.Status.String() != "pinned" { - t.Error("expected different status") - } - } - - testBothEndpoints(t, tf) -} - func TestAPIRecoverEndpoint(t *testing.T) { ctx := context.Background() rest := testAPI(t) diff --git a/cluster.go b/cluster.go index e76ada25..5b02c475 100644 --- a/cluster.go +++ b/cluster.go @@ -247,13 +247,12 @@ func (c *Cluster) setupRPCClients() { } } -// syncWatcher loops and triggers StateSync and SyncAllLocal from time to time -func (c *Cluster) syncWatcher() { - ctx, span := trace.StartSpan(c.ctx, "cluster/syncWatcher") +// watchPinset triggers recurrent operations that loop on the pinset. +func (c *Cluster) watchPinset() { + ctx, span := trace.StartSpan(c.ctx, "cluster/watchPinset") defer span.End() stateSyncTicker := time.NewTicker(c.config.StateSyncInterval) - syncTicker := time.NewTicker(c.config.IPFSSyncInterval) recoverTicker := time.NewTicker(c.config.PinRecoverInterval) for { @@ -261,14 +260,12 @@ func (c *Cluster) syncWatcher() { case <-stateSyncTicker.C: logger.Debug("auto-triggering StateSync()") c.StateSync(ctx) - case <-syncTicker.C: - logger.Debug("auto-triggering SyncAllLocal()") - c.SyncAllLocal(ctx) case <-recoverTicker.C: logger.Debug("auto-triggering RecoverAllLocal()") c.RecoverAllLocal(ctx) case <-c.ctx.Done(): stateSyncTicker.Stop() + recoverTicker.Stop() return } } @@ -553,7 +550,7 @@ func (c *Cluster) run() { c.wg.Add(1) go func() { defer c.wg.Done() - c.syncWatcher() + c.watchPinset() }() c.wg.Add(1) @@ -975,113 +972,59 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error { return nil } -// StateSync syncs the consensus state to the Pin Tracker, ensuring -// that every Cid in the shared state is tracked and that the Pin Tracker -// is not tracking more Cids than it should. +// StateSync performs maintenance tasks on the global state that require +// looping through all the items. It is triggered automatically on +// StateSyncInterval. Currently it: +// * Sends unpin for expired items for which this peer is "closest" +// (skipped for follower peers) func (c *Cluster) StateSync(ctx context.Context) error { _, span := trace.StartSpan(ctx, "cluster/StateSync") defer span.End() ctx = trace.NewContext(c.ctx, span) + if c.config.FollowerMode { + return nil + } + cState, err := c.consensus.State(ctx) if err != nil { return err } - logger.Debug("syncing state to tracker") timeNow := time.Now() clusterPins, err := cState.List(ctx) if err != nil { return err } - trackedPins := c.tracker.StatusAll(ctx) - trackedPinsMap := make(map[string]struct{}) - for _, tpin := range trackedPins { - trackedPinsMap[tpin.Cid.String()] = struct{}{} + // Only trigger pin operations if we are the closest with respect to + // other trusted peers. We cannot know if our peer ID is trusted by + // other peers in the Cluster. This assumes yes. Setting FollowerMode + // is a way to assume the opposite and skip this completely. + trustedPeers, err := c.getTrustedPeers(ctx) + if err != nil { + return nil } - // Track items which are not tracked - for _, pin := range clusterPins { - _, tracked := trackedPinsMap[pin.Cid.String()] - if !tracked { - logger.Debugf("StateSync: tracking %s, part of the shared state", pin.Cid) - err = c.tracker.Track(ctx, pin) - if err != nil { - return err - } - } + checker := distanceChecker{ + local: c.id, + otherPeers: trustedPeers, + cache: make(map[peer.ID]distance, len(trustedPeers)+1), } - isClosest := func(cid.Cid) bool { - return false - } - - if !c.config.FollowerMode { - trustedPeers, err := c.getTrustedPeers(ctx) - if err != nil { - return nil - } - checker := distanceChecker{ - local: c.id, - otherPeers: trustedPeers, - cache: make(map[peer.ID]distance, len(trustedPeers)+1), - } - isClosest = func(c cid.Cid) bool { - return checker.isClosest(c) - } - } - - // a. Untrack items which should not be tracked - // b. Unpin items which have expired - // c. Track items which should not be remote as local - // d. Track items which should not be local as remote - for _, p := range trackedPins { - pCid := p.Cid - currentPin, err := cState.Get(ctx, pCid) - if err != nil && err != state.ErrNotFound { - return err - } - - if err == state.ErrNotFound { - logger.Debugf("StateSync: untracking %s: not part of shared state", pCid) - c.tracker.Untrack(ctx, pCid) - continue - } - - if currentPin.ExpiredAt(timeNow) && isClosest(pCid) { - logger.Infof("Unpinning %s: pin expired at %s", pCid, currentPin.ExpireAt) - if _, err := c.Unpin(ctx, pCid); err != nil { + // Unpin expired items when we are the closest peer to them. + for _, p := range clusterPins { + if p.ExpiredAt(timeNow) && checker.isClosest(p.Cid) { + logger.Infof("Unpinning %s: pin expired at %s", p.Cid, p.ExpireAt) + if _, err := c.Unpin(ctx, p.Cid); err != nil { logger.Error(err) } - continue - } - - err = c.updateRemotePins(ctx, currentPin, p) - if err != nil { - return err } } return nil } -func (c *Cluster) updateRemotePins(ctx context.Context, pin *api.Pin, p *api.PinInfo) error { - var err error - allocatedHere := pin.ReplicationFactorMin == -1 || containsPeer(pin.Allocations, c.id) - - switch { - case p.Status == api.TrackerStatusRemote && allocatedHere: - logger.Debugf("StateSync: Tracking %s locally (currently remote)", p.Cid) - err = c.tracker.Track(ctx, pin) - case p.Status == api.TrackerStatusPinned && !allocatedHere: - logger.Debugf("StateSync: Tracking %s as remote (currently local)", p.Cid) - err = c.tracker.Track(ctx, pin) - } - - return err -} - // StatusAll returns the GlobalPinInfo for all tracked Cids in all peers. // If an error happens, the slice will contain as much information as // could be fetched from other peers. @@ -1122,48 +1065,6 @@ func (c *Cluster) StatusLocal(ctx context.Context, h cid.Cid) *api.PinInfo { return c.tracker.Status(ctx, h) } -// SyncAll triggers SyncAllLocal() operations in all cluster peers, making sure -// that the state of tracked items matches the state reported by the IPFS daemon -// and returning the results as GlobalPinInfo. If an error happens, the slice -// will contain as much information as could be fetched from the peers. -func (c *Cluster) SyncAll(ctx context.Context) ([]*api.GlobalPinInfo, error) { - _, span := trace.StartSpan(ctx, "cluster/SyncAll") - defer span.End() - ctx = trace.NewContext(c.ctx, span) - - return c.globalPinInfoSlice(ctx, "Cluster", "SyncAllLocal") -} - -// SyncAllLocal makes sure that the current state for all tracked items -// in this peer matches the state reported by the IPFS daemon. -// -// SyncAllLocal returns the list of PinInfo that where updated because of -// the operation, along with those in error states. -func (c *Cluster) SyncAllLocal(ctx context.Context) ([]*api.PinInfo, error) { - _, span := trace.StartSpan(ctx, "cluster/SyncAllLocal") - defer span.End() - ctx = trace.NewContext(c.ctx, span) - - syncedItems, err := c.tracker.SyncAll(ctx) - // Despite errors, tracker provides synced items that we can provide. - // They encapsulate the error. - if err != nil { - logger.Error("tracker.Sync() returned with error: ", err) - logger.Error("Is the ipfs daemon running?") - } - return syncedItems, err -} - -// Sync triggers a SyncLocal() operation for a given Cid. -// in all cluster peers. -func (c *Cluster) Sync(ctx context.Context, h cid.Cid) (*api.GlobalPinInfo, error) { - _, span := trace.StartSpan(ctx, "cluster/Sync") - defer span.End() - ctx = trace.NewContext(c.ctx, span) - - return c.globalPinInfoCid(ctx, "Cluster", "SyncLocal", h) -} - // used for RecoverLocal and SyncLocal. func (c *Cluster) localPinInfoOp( ctx context.Context, @@ -1191,17 +1092,6 @@ func (c *Cluster) localPinInfoOp( } -// SyncLocal performs a local sync operation for the given Cid. This will -// tell the tracker to verify the status of the Cid against the IPFS daemon. -// It returns the updated PinInfo for the Cid. -func (c *Cluster) SyncLocal(ctx context.Context, h cid.Cid) (pInfo *api.PinInfo, err error) { - _, span := trace.StartSpan(ctx, "cluster/SyncLocal") - defer span.End() - ctx = trace.NewContext(c.ctx, span) - - return c.localPinInfoOp(ctx, h, c.tracker.Sync) -} - // RecoverAll triggers a RecoverAllLocal operation on all peer. func (c *Cluster) RecoverAll(ctx context.Context) ([]*api.GlobalPinInfo, error) { _, span := trace.StartSpan(ctx, "cluster/RecoverAll") diff --git a/cluster_config.go b/cluster_config.go index 53b63b34..c1057663 100644 --- a/cluster_config.go +++ b/cluster_config.go @@ -29,7 +29,6 @@ var DefaultListenAddrs = []string{"/ip4/0.0.0.0/tcp/9096", "/ip4/0.0.0.0/udp/909 const ( DefaultEnableRelayHop = true DefaultStateSyncInterval = 600 * time.Second - DefaultIPFSSyncInterval = 130 * time.Second DefaultPinRecoverInterval = 1 * time.Hour DefaultMonitorPingInterval = 15 * time.Second DefaultPeerWatchInterval = 5 * time.Second @@ -94,14 +93,6 @@ type Config struct { // consistency, increase with larger states. StateSyncInterval time.Duration - // Time between syncs of the local state and - // the state of the ipfs daemon. This ensures that cluster - // provides the right status for tracked items (for example - // to detect that a pin has been removed. Reduce for faster - // consistency, increase when the number of pinned items is very - // large. - IPFSSyncInterval time.Duration - // Time between automatic runs of the "recover" operation // which will retry to pin/unpin items in error state. PinRecoverInterval time.Duration @@ -177,7 +168,6 @@ type configJSON struct { EnableRelayHop bool `json:"enable_relay_hop"` ConnectionManager *connMgrConfigJSON `json:"connection_manager"` StateSyncInterval string `json:"state_sync_interval"` - IPFSSyncInterval string `json:"ipfs_sync_interval"` PinRecoverInterval string `json:"pin_recover_interval"` ReplicationFactorMin int `json:"replication_factor_min"` ReplicationFactorMax int `json:"replication_factor_max"` @@ -267,10 +257,6 @@ func (cfg *Config) Validate() error { return errors.New("cluster.state_sync_interval is invalid") } - if cfg.IPFSSyncInterval <= 0 { - return errors.New("cluster.ipfs_sync_interval is invalid") - } - if cfg.PinRecoverInterval <= 0 { return errors.New("cluster.pin_recover_interval is invalid") } @@ -367,7 +353,6 @@ func (cfg *Config) setDefaults() { } cfg.LeaveOnShutdown = DefaultLeaveOnShutdown cfg.StateSyncInterval = DefaultStateSyncInterval - cfg.IPFSSyncInterval = DefaultIPFSSyncInterval cfg.PinRecoverInterval = DefaultPinRecoverInterval cfg.ReplicationFactorMin = DefaultReplicationFactor cfg.ReplicationFactorMax = DefaultReplicationFactor @@ -441,7 +426,6 @@ func (cfg *Config) applyConfigJSON(jcfg *configJSON) error { err = config.ParseDurations("cluster", &config.DurationOpt{Duration: jcfg.StateSyncInterval, Dst: &cfg.StateSyncInterval, Name: "state_sync_interval"}, - &config.DurationOpt{Duration: jcfg.IPFSSyncInterval, Dst: &cfg.IPFSSyncInterval, Name: "ipfs_sync_interval"}, &config.DurationOpt{Duration: jcfg.PinRecoverInterval, Dst: &cfg.PinRecoverInterval, Name: "pin_recover_interval"}, &config.DurationOpt{Duration: jcfg.MonitorPingInterval, Dst: &cfg.MonitorPingInterval, Name: "monitor_ping_interval"}, &config.DurationOpt{Duration: jcfg.PeerWatchInterval, Dst: &cfg.PeerWatchInterval, Name: "peer_watch_interval"}, @@ -507,7 +491,6 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) { GracePeriod: cfg.ConnMgr.GracePeriod.String(), } jcfg.StateSyncInterval = cfg.StateSyncInterval.String() - jcfg.IPFSSyncInterval = cfg.IPFSSyncInterval.String() jcfg.PinRecoverInterval = cfg.PinRecoverInterval.String() jcfg.MonitorPingInterval = cfg.MonitorPingInterval.String() jcfg.PeerWatchInterval = cfg.PeerWatchInterval.String() diff --git a/cluster_config_test.go b/cluster_config_test.go index e30798f9..5e03576d 100644 --- a/cluster_config_test.go +++ b/cluster_config_test.go @@ -24,7 +24,6 @@ var ccfgTestJSON = []byte(` "/ip4/127.0.0.1/udp/10000/quic" ], "state_sync_interval": "1m0s", - "ipfs_sync_interval": "2m10s", "pin_recover_interval": "1m", "replication_factor_min": 5, "replication_factor_max": 5, diff --git a/cluster_test.go b/cluster_test.go index 7fdafd2a..16c6ee91 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -17,6 +17,7 @@ import ( "github.com/ipfs/ipfs-cluster/config" "github.com/ipfs/ipfs-cluster/informer/numpin" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" + "github.com/ipfs/ipfs-cluster/pintracker/stateless" "github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/test" "github.com/ipfs/ipfs-cluster/version" @@ -148,7 +149,7 @@ type mockTracer struct { } func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracker) { - ident, clusterCfg, _, _, _, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs() + ident, clusterCfg, _, _, _, badgerCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs() ctx := context.Background() host, pubsub, dht := createHost(t, ident.PrivateKey, clusterCfg.Secret, clusterCfg.ListenAddr) @@ -162,11 +163,12 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke api := &mockAPI{} proxy := &mockProxy{} ipfs := &mockConnector{} - tracker := makePinTracker(t, ident.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername) + tracer := &mockTracer{} store := makeStore(t, badgerCfg) cons := makeConsensus(t, store, host, pubsub, dht, raftCfg, false, crdtCfg) + tracker := stateless.New(statelesstrackerCfg, ident.ID, clusterCfg.Peername, cons.State) var peersF func(context.Context) ([]peer.ID, error) if consensus == "raft" { diff --git a/cmd/ipfs-cluster-ctl/main.go b/cmd/ipfs-cluster-ctl/main.go index f9e86a5f..52a81d9b 100644 --- a/cmd/ipfs-cluster-ctl/main.go +++ b/cmd/ipfs-cluster-ctl/main.go @@ -779,9 +779,6 @@ Cluster, including which member is pinning them and any errors. If a CID is provided, the status will be only fetched for a single item. Metadata CIDs are included in the status response -The status of a CID may not be accurate. A manual sync can be triggered -with "sync". - When the --local flag is passed, it will only fetch the status from the contacted cluster peer. By default, status will be fetched from all peers. @@ -817,42 +814,6 @@ separated list). The following are valid status values: return nil }, }, - { - Name: "sync", - Usage: "Sync status of tracked items", - Description: ` -This command asks Cluster peers to verify that the current status of tracked -CIDs is accurate by triggering queries to the IPFS daemons that pin them. -If a CID is provided, the sync and recover operations will be limited to -that single item. - -Unless providing a specific CID, the command will output only items which -have changed status because of the sync or are in error state in some node, -therefore, the output should be empty if no operations were performed. - -CIDs in error state may be manually recovered with "recover". - -When the --local flag is passed, it will only trigger sync -operations on the contacted peer. By default, all peers will sync. -`, - ArgsUsage: "[CID]", - Flags: []cli.Flag{ - localFlag(), - }, - Action: func(c *cli.Context) error { - cidStr := c.Args().First() - if cidStr != "" { - ci, err := cid.Decode(cidStr) - checkErr("parsing cid", err) - resp, cerr := globalClient.Sync(ctx, ci, c.Bool("local")) - formatResponse(c, resp, cerr) - } else { - resp, cerr := globalClient.SyncAll(ctx, c.Bool("local")) - formatResponse(c, resp, cerr) - } - return nil - }, - }, { Name: "recover", Usage: "Recover tracked items in error state", diff --git a/cmd/ipfs-cluster-service/daemon.go b/cmd/ipfs-cluster-service/daemon.go index fbedbbbe..df343951 100644 --- a/cmd/ipfs-cluster-service/daemon.go +++ b/cmd/ipfs-cluster-service/daemon.go @@ -17,7 +17,6 @@ import ( "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/observations" - "github.com/ipfs/ipfs-cluster/pintracker/maptracker" "github.com/ipfs/ipfs-cluster/pintracker/stateless" "go.opencensus.io/tag" @@ -150,14 +149,6 @@ func createCluster( connector, err := ipfshttp.NewConnector(cfgs.Ipfshttp) checkErr("creating IPFS Connector component", err) - tracker := setupPinTracker( - c.String("pintracker"), - host, - cfgs.Maptracker, - cfgs.Statelesstracker, - cfgs.Cluster.Peername, - ) - informer, err := disk.NewInformer(cfgs.Diskinf) checkErr("creating disk informer", err) alloc := descendalloc.NewAllocator() @@ -190,6 +181,9 @@ func createCluster( peersF = cons.Peers } + tracker := stateless.New(cfgs.Statelesstracker, host.ID(), cfgs.Cluster.Peername, cons.State) + logger.Debug("stateless pintracker loaded") + mon, err := pubsubmon.New(ctx, cfgs.Pubsubmon, pubsub, peersF) if err != nil { store.Close() @@ -225,29 +219,6 @@ func bootstrap(ctx context.Context, cluster *ipfscluster.Cluster, bootstraps []m } } -func setupPinTracker( - name string, - h host.Host, - mapCfg *maptracker.Config, - statelessCfg *stateless.Config, - peerName string, -) ipfscluster.PinTracker { - switch name { - case "map": - ptrk := maptracker.NewMapPinTracker(mapCfg, h.ID(), peerName) - logger.Debug("map pintracker loaded") - return ptrk - case "stateless": - ptrk := stateless.New(statelessCfg, h.ID(), peerName) - logger.Debug("stateless pintracker loaded") - return ptrk - default: - err := errors.New("unknown pintracker type") - checkErr("", err) - return nil - } -} - func setupDatastore(cfgHelper *cmdutils.ConfigHelper) ds.Datastore { stmgr, err := cmdutils.NewStateManager(cfgHelper.GetConsensus(), cfgHelper.Identity(), cfgHelper.Configs()) checkErr("creating state manager", err) diff --git a/cmd/ipfs-cluster-service/main.go b/cmd/ipfs-cluster-service/main.go index 68c9cbad..8dd3ee2e 100644 --- a/cmd/ipfs-cluster-service/main.go +++ b/cmd/ipfs-cluster-service/main.go @@ -29,9 +29,8 @@ const programName = "ipfs-cluster-service" // flag defaults const ( - defaultPinTracker = "map" - defaultLogLevel = "info" - defaultConsensus = "crdt" + defaultLogLevel = "info" + defaultConsensus = "crdt" ) const ( @@ -413,12 +412,6 @@ the peer IDs in the given multiaddresses. Usage: "remove peer from cluster on exit. Overrides \"leave_on_shutdown\"", Hidden: true, }, - cli.StringFlag{ - Name: "pintracker", - Value: defaultPinTracker, - Hidden: true, - Usage: "pintracker to use [map,stateless].", - }, cli.BoolFlag{ Name: "stats", Usage: "enable stats collection", diff --git a/cmdutils/configs.go b/cmdutils/configs.go index a6483ed7..b802b8c1 100644 --- a/cmdutils/configs.go +++ b/cmdutils/configs.go @@ -19,7 +19,6 @@ import ( "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/observations" - "github.com/ipfs/ipfs-cluster/pintracker/maptracker" "github.com/ipfs/ipfs-cluster/pintracker/stateless" ) @@ -31,7 +30,6 @@ type Configs struct { Ipfshttp *ipfshttp.Config Raft *raft.Config Crdt *crdt.Config - Maptracker *maptracker.Config Statelesstracker *stateless.Config Pubsubmon *pubsubmon.Config Diskinf *disk.Config @@ -183,7 +181,6 @@ func (ch *ConfigHelper) init() { Ipfshttp: &ipfshttp.Config{}, Raft: &raft.Config{}, Crdt: &crdt.Config{}, - Maptracker: &maptracker.Config{}, Statelesstracker: &stateless.Config{}, Pubsubmon: &pubsubmon.Config{}, Diskinf: &disk.Config{}, @@ -195,7 +192,6 @@ func (ch *ConfigHelper) init() { man.RegisterComponent(config.API, cfgs.Restapi) man.RegisterComponent(config.API, cfgs.Ipfsproxy) man.RegisterComponent(config.IPFSConn, cfgs.Ipfshttp) - man.RegisterComponent(config.PinTracker, cfgs.Maptracker) man.RegisterComponent(config.PinTracker, cfgs.Statelesstracker) man.RegisterComponent(config.Monitor, cfgs.Pubsubmon) man.RegisterComponent(config.Informer, cfgs.Diskinf) diff --git a/config_test.go b/config_test.go index c6746b7d..baf3d5c3 100644 --- a/config_test.go +++ b/config_test.go @@ -11,7 +11,6 @@ import ( "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/observations" - "github.com/ipfs/ipfs-cluster/pintracker/maptracker" "github.com/ipfs/ipfs-cluster/pintracker/stateless" ) @@ -32,7 +31,6 @@ var testingClusterCfg = []byte(`{ "grace_period": "2m0s" }, "state_sync_interval": "1m0s", - "ipfs_sync_interval": "2m10s", "replication_factor": -1, "monitor_ping_interval": "1s", "peer_watch_interval": "1s", @@ -130,8 +128,8 @@ var testingTracerCfg = []byte(`{ "service_name": "cluster-daemon" }`) -func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *maptracker.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) { - identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg := testingEmptyConfigs() +func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) { + identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg := testingEmptyConfigs() identity.LoadJSON(testingIdentity) clusterCfg.LoadJSON(testingClusterCfg) apiCfg.LoadJSON(testingAPICfg) @@ -140,16 +138,15 @@ func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Confi badgerCfg.LoadJSON(testingBadgerCfg) raftCfg.LoadJSON(testingRaftCfg) crdtCfg.LoadJSON(testingCrdtCfg) - maptrackerCfg.LoadJSON(testingTrackerCfg) statelesstrkrCfg.LoadJSON(testingTrackerCfg) pubsubmonCfg.LoadJSON(testingMonCfg) diskInfCfg.LoadJSON(testingDiskInfCfg) tracingCfg.LoadJSON(testingTracerCfg) - return identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg + return identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg } -func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *maptracker.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) { +func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) { identity := &config.Identity{} clusterCfg := &Config{} apiCfg := &rest.Config{} @@ -158,12 +155,11 @@ func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy. badgerCfg := &badger.Config{} raftCfg := &raft.Config{} crdtCfg := &crdt.Config{} - maptrackerCfg := &maptracker.Config{} statelessCfg := &stateless.Config{} pubsubmonCfg := &pubsubmon.Config{} diskInfCfg := &disk.Config{} tracingCfg := &observations.TracingConfig{} - return identity, clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelessCfg, pubsubmonCfg, diskInfCfg, tracingCfg + return identity, clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, badgerCfg, raftCfg, crdtCfg, statelessCfg, pubsubmonCfg, diskInfCfg, tracingCfg } // func TestConfigDefault(t *testing.T) { diff --git a/consensus/crdt/consensus.go b/consensus/crdt/consensus.go index f12b0a91..498d48b7 100644 --- a/consensus/crdt/consensus.go +++ b/consensus/crdt/consensus.go @@ -475,7 +475,7 @@ func (css *Consensus) Leader(ctx context.Context) (peer.ID, error) { func OfflineState(cfg *Config, store ds.Datastore) (state.BatchingState, error) { batching, ok := store.(ds.Batching) if !ok { - return nil, errors.New("must provide a Bathing datastore") + return nil, errors.New("must provide a Batching datastore") } opts := crdt.DefaultOptions() opts.Logger = logger diff --git a/ipfscluster.go b/ipfscluster.go index fa01b3fc..a779c945 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -123,12 +123,6 @@ type PinTracker interface { StatusAll(context.Context) []*api.PinInfo // Status returns the local status of a given Cid. Status(context.Context, cid.Cid) *api.PinInfo - // SyncAll makes sure that all tracked Cids reflect the real IPFS status. - // It returns the list of pins which were updated by the call. - SyncAll(context.Context) ([]*api.PinInfo, error) - // Sync makes sure that the Cid status reflect the real IPFS status. - // It returns the local status of the Cid. - Sync(context.Context, cid.Cid) (*api.PinInfo, error) // RecoverAll calls Recover() for all pins tracked. RecoverAll(context.Context) ([]*api.PinInfo, error) // Recover retriggers a Pin/Unpin operation in a Cids with error status. diff --git a/ipfscluster_test.go b/ipfscluster_test.go index 82432253..79d3208c 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -28,7 +28,6 @@ import ( "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/observations" - "github.com/ipfs/ipfs-cluster/pintracker/maptracker" "github.com/ipfs/ipfs-cluster/pintracker/stateless" "github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/test" @@ -174,7 +173,7 @@ func createComponents( peername := fmt.Sprintf("peer_%d", i) - ident, clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrackerCfg, psmonCfg, diskInfCfg, tracingCfg := testingConfigs() + ident, clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, badgerCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, diskInfCfg, tracingCfg := testingConfigs() ident.ID = host.ID() ident.PrivateKey = host.Peerstore().PrivKey(host.ID()) @@ -208,8 +207,6 @@ func createComponents( t.Fatal(err) } - tracker := makePinTracker(t, ident.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername) - alloc := descendalloc.NewAllocator() inf, err := disk.NewInformer(diskInfCfg) if err != nil { @@ -218,6 +215,7 @@ func createComponents( store := makeStore(t, badgerCfg) cons := makeConsensus(t, store, host, pubsub, dht, raftCfg, staging, crdtCfg) + tracker := stateless.New(statelesstrackerCfg, ident.ID, clusterCfg.Peername, cons.State) var peersF func(context.Context) ([]peer.ID, error) if consensus == "raft" { @@ -268,19 +266,6 @@ func makeConsensus(t *testing.T, store ds.Datastore, h host.Host, psub *pubsub.P } } -func makePinTracker(t *testing.T, pid peer.ID, mptCfg *maptracker.Config, sptCfg *stateless.Config, peerName string) PinTracker { - var ptrkr PinTracker - switch ptracker { - case "map": - ptrkr = maptracker.NewMapPinTracker(mptCfg, pid, peerName) - case "stateless": - ptrkr = stateless.New(sptCfg, pid, peerName) - default: - panic("bad pintracker") - } - return ptrkr -} - func createCluster(t *testing.T, host host.Host, dht *dht.IpfsDHT, clusterCfg *Config, store ds.Datastore, consensus Consensus, apis []API, ipfs IPFSConnector, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer, tracer Tracer) *Cluster { cl, err := NewCluster(context.Background(), host, dht, clusterCfg, store, consensus, apis, ipfs, tracker, mon, alloc, []Informer{inf}, tracer) if err != nil { @@ -914,162 +899,6 @@ func TestClustersStatusAllWithErrors(t *testing.T) { runF(t, clusters, f) } -func TestClustersSyncAllLocal(t *testing.T) { - ctx := context.Background() - clusters, mock := createClusters(t) - defer shutdownClusters(t, clusters, mock) - clusters[0].Pin(ctx, test.ErrorCid, api.PinOptions{}) // This cid always fails - clusters[0].Pin(ctx, test.Cid2, api.PinOptions{}) - pinDelay() - pinDelay() - - f := func(t *testing.T, c *Cluster) { - // Sync bad ID - infos, err := c.SyncAllLocal(ctx) - if err != nil { - // LocalSync() is asynchronous and should not show an - // error even if Recover() fails. - t.Error(err) - } - if len(infos) != 1 { - t.Fatalf("expected 1 elem slice, got = %d", len(infos)) - } - // Last-known state may still be pinning - if infos[0].Status != api.TrackerStatusPinError && infos[0].Status != api.TrackerStatusPinning { - t.Errorf("element should be in Pinning or PinError state, got = %v", infos[0].Status) - } - } - // Test Local syncs - runF(t, clusters, f) -} - -func TestClustersSyncLocal(t *testing.T) { - ctx := context.Background() - clusters, mock := createClusters(t) - defer shutdownClusters(t, clusters, mock) - h := test.ErrorCid - h2 := test.Cid2 - clusters[0].Pin(ctx, h, api.PinOptions{}) - clusters[0].Pin(ctx, h2, api.PinOptions{}) - pinDelay() - pinDelay() - - f := func(t *testing.T, c *Cluster) { - info, err := c.SyncLocal(ctx, h) - if err != nil { - t.Error(err) - } - if info.Status != api.TrackerStatusPinError && info.Status != api.TrackerStatusPinning { - t.Errorf("element is %s and not PinError", info.Status) - } - - // Sync good ID - info, err = c.SyncLocal(ctx, h2) - if err != nil { - t.Error(err) - } - if info.Status != api.TrackerStatusPinned { - t.Error("element should be in Pinned state") - } - } - // Test Local syncs - runF(t, clusters, f) -} - -func TestClustersSyncAll(t *testing.T) { - ctx := context.Background() - clusters, mock := createClusters(t) - defer shutdownClusters(t, clusters, mock) - clusters[0].Pin(ctx, test.ErrorCid, api.PinOptions{}) - clusters[0].Pin(ctx, test.Cid2, api.PinOptions{}) - pinDelay() - pinDelay() - - j := rand.Intn(nClusters) // choose a random cluster peer - ginfos, err := clusters[j].SyncAll(ctx) - if err != nil { - t.Fatal(err) - } - if len(ginfos) != 1 { - t.Fatalf("expected globalsync to have 1 elements, got = %d", len(ginfos)) - } - if !ginfos[0].Cid.Equals(test.ErrorCid) { - t.Error("expected globalsync to have problems with test.ErrorCid") - } - for _, c := range clusters { - inf, ok := ginfos[0].PeerMap[peer.IDB58Encode(c.host.ID())] - if !ok { - t.Fatal("GlobalPinInfo should have this cluster") - } - if inf.Status != api.TrackerStatusPinError && inf.Status != api.TrackerStatusPinning { - t.Error("should be PinError in all peers") - } - } -} - -func TestClustersSync(t *testing.T) { - ctx := context.Background() - clusters, mock := createClusters(t) - defer shutdownClusters(t, clusters, mock) - h := test.ErrorCid // This cid always fails - h2 := test.Cid2 - clusters[0].Pin(ctx, h, api.PinOptions{}) - clusters[0].Pin(ctx, h2, api.PinOptions{}) - pinDelay() - pinDelay() - - j := rand.Intn(nClusters) - ginfo, err := clusters[j].Sync(ctx, h) - if err != nil { - // we always attempt to return a valid response - // with errors contained in GlobalPinInfo - t.Fatal("did not expect an error") - } - pinfo, ok := ginfo.PeerMap[peer.IDB58Encode(clusters[j].host.ID())] - if !ok { - t.Fatal("should have info for this host") - } - if pinfo.Error == "" { - t.Error("pinInfo error should not be empty") - } - - if !ginfo.Cid.Equals(h) { - t.Error("GlobalPinInfo should be for test.ErrorCid") - } - - for _, c := range clusters { - inf, ok := ginfo.PeerMap[peer.IDB58Encode(c.host.ID())] - if !ok { - t.Logf("%+v", ginfo) - t.Fatal("GlobalPinInfo should not be empty for this host") - } - - if inf.Status != api.TrackerStatusPinError && inf.Status != api.TrackerStatusPinning { - t.Error("should be PinError or Pinning in all peers") - } - } - - // Test with a good Cid - j = rand.Intn(nClusters) - ginfo, err = clusters[j].Sync(ctx, h2) - if err != nil { - t.Fatal(err) - } - if !ginfo.Cid.Equals(h2) { - t.Error("GlobalPinInfo should be for testrCid2") - } - - for _, c := range clusters { - inf, ok := ginfo.PeerMap[peer.IDB58Encode(c.host.ID())] - if !ok { - t.Fatal("GlobalPinInfo should have this cluster") - } - if inf.Status != api.TrackerStatusPinned { - t.Error("the GlobalPinInfo should show Pinned in all peers") - } - } -} - func TestClustersRecoverLocal(t *testing.T) { ctx := context.Background() clusters, mock := createClusters(t) @@ -1098,10 +927,7 @@ func TestClustersRecoverLocal(t *testing.T) { } // Recover good ID - info, err = c.SyncLocal(ctx, h2) - if err != nil { - t.Error(err) - } + info, err = c.RecoverLocal(ctx, h2) if info.Status != api.TrackerStatusPinned { t.Error("element should be in Pinned state") } @@ -2175,19 +2001,6 @@ func TestClustersFollowerMode(t *testing.T) { } }) - t.Run("follower syncs itself", func(t *testing.T) { - gpis, err := clusters[1].SyncAll(ctx) - if err != nil { - t.Error("sync should work") - } - if len(gpis) != 1 { - t.Fatal("globalPinInfo should have 1 pins (in error)") - } - if len(gpis[0].PeerMap) != 1 { - t.Fatal("globalPinInfo[0] should only have one peer") - } - }) - t.Run("follower status itself only", func(t *testing.T) { gpi, err := clusters[1].Status(ctx, test.Cid1) if err != nil { diff --git a/pintracker/maptracker/config.go b/pintracker/maptracker/config.go deleted file mode 100644 index 8430e341..00000000 --- a/pintracker/maptracker/config.go +++ /dev/null @@ -1,115 +0,0 @@ -package maptracker - -import ( - "encoding/json" - "errors" - - "github.com/kelseyhightower/envconfig" - - "github.com/ipfs/ipfs-cluster/config" -) - -const configKey = "maptracker" -const envConfigKey = "cluster_maptracker" - -// Default values for this Config. -const ( - DefaultMaxPinQueueSize = 1000000 - DefaultConcurrentPins = 10 -) - -// Config allows to initialize a Monitor and customize some parameters. -type Config struct { - config.Saver - - // If higher, they will automatically marked with an error. - MaxPinQueueSize int - // ConcurrentPins specifies how many pin requests can be sent to the ipfs - // daemon in parallel. If the pinning method is "refs", it might increase - // speed. Unpin requests are always processed one by one. - ConcurrentPins int -} - -type jsonConfig struct { - MaxPinQueueSize int `json:"max_pin_queue_size,omitempty"` - ConcurrentPins int `json:"concurrent_pins"` -} - -// ConfigKey provides a human-friendly identifier for this type of Config. -func (cfg *Config) ConfigKey() string { - return configKey -} - -// Default sets the fields of this Config to sensible values. -func (cfg *Config) Default() error { - cfg.MaxPinQueueSize = DefaultMaxPinQueueSize - cfg.ConcurrentPins = DefaultConcurrentPins - return nil -} - -// ApplyEnvVars fills in any Config fields found -// as environment variables. -func (cfg *Config) ApplyEnvVars() error { - jcfg := cfg.toJSONConfig() - - err := envconfig.Process(envConfigKey, jcfg) - if err != nil { - return err - } - - return cfg.applyJSONConfig(jcfg) -} - -// Validate checks that the fields of this Config have working values, -// at least in appearance. -func (cfg *Config) Validate() error { - if cfg.MaxPinQueueSize <= 0 { - return errors.New("maptracker.max_pin_queue_size too low") - } - - if cfg.ConcurrentPins <= 0 { - return errors.New("maptracker.concurrent_pins is too low") - } - - return nil -} - -// LoadJSON sets the fields of this Config to the values defined by the JSON -// representation of it, as generated by ToJSON. -func (cfg *Config) LoadJSON(raw []byte) error { - jcfg := &jsonConfig{} - err := json.Unmarshal(raw, jcfg) - if err != nil { - logger.Error("Error unmarshaling maptracker config") - return err - } - - cfg.Default() - - return cfg.applyJSONConfig(jcfg) -} - -func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error { - config.SetIfNotDefault(jcfg.MaxPinQueueSize, &cfg.MaxPinQueueSize) - config.SetIfNotDefault(jcfg.ConcurrentPins, &cfg.ConcurrentPins) - - return cfg.Validate() -} - -// ToJSON generates a human-friendly JSON representation of this Config. -func (cfg *Config) ToJSON() ([]byte, error) { - jcfg := cfg.toJSONConfig() - - return config.DefaultJSONMarshal(jcfg) -} - -func (cfg *Config) toJSONConfig() *jsonConfig { - jCfg := &jsonConfig{ - ConcurrentPins: cfg.ConcurrentPins, - } - if cfg.MaxPinQueueSize != DefaultMaxPinQueueSize { - jCfg.MaxPinQueueSize = cfg.MaxPinQueueSize - } - - return jCfg -} diff --git a/pintracker/maptracker/config_test.go b/pintracker/maptracker/config_test.go deleted file mode 100644 index 9a918318..00000000 --- a/pintracker/maptracker/config_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package maptracker - -import ( - "encoding/json" - "os" - "testing" -) - -var cfgJSON = []byte(` -{ - "max_pin_queue_size": 4092, - "concurrent_pins": 2 -} -`) - -func TestLoadJSON(t *testing.T) { - cfg := &Config{} - err := cfg.LoadJSON(cfgJSON) - if err != nil { - t.Fatal(err) - } - - j := &jsonConfig{} - - json.Unmarshal(cfgJSON, j) - j.ConcurrentPins = 10 - tst, _ := json.Marshal(j) - err = cfg.LoadJSON(tst) - if err != nil { - t.Error("did not expect an error") - } - if cfg.ConcurrentPins != 10 { - t.Error("expected 10 concurrent pins") - } -} - -func TestToJSON(t *testing.T) { - cfg := &Config{} - cfg.LoadJSON(cfgJSON) - newjson, err := cfg.ToJSON() - if err != nil { - t.Fatal(err) - } - cfg = &Config{} - err = cfg.LoadJSON(newjson) - if err != nil { - t.Fatal(err) - } -} - -func TestDefault(t *testing.T) { - cfg := &Config{} - cfg.Default() - if cfg.Validate() != nil { - t.Fatal("error validating") - } - - cfg.ConcurrentPins = -2 - if cfg.Validate() == nil { - t.Fatal("expected error validating") - } -} - -func TestApplyEnvVars(t *testing.T) { - os.Setenv("CLUSTER_MAPTRACKER_CONCURRENTPINS", "22") - cfg := &Config{} - cfg.ApplyEnvVars() - - if cfg.ConcurrentPins != 22 { - t.Fatal("failed to override concurrent_pins with env var") - } -} diff --git a/pintracker/maptracker/maptracker.go b/pintracker/maptracker/maptracker.go deleted file mode 100644 index 5523cc37..00000000 --- a/pintracker/maptracker/maptracker.go +++ /dev/null @@ -1,452 +0,0 @@ -// Package maptracker implements a PinTracker component for IPFS Cluster. It -// uses a map to keep track of the state of tracked pins. -package maptracker - -import ( - "context" - "errors" - "sync" - - "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/pintracker/optracker" - "github.com/ipfs/ipfs-cluster/pintracker/util" - - cid "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" - peer "github.com/libp2p/go-libp2p-core/peer" - rpc "github.com/libp2p/go-libp2p-gorpc" - - "go.opencensus.io/trace" -) - -var logger = logging.Logger("pintracker") - -var ( - errUnpinned = errors.New("the item is unexpectedly not pinned on IPFS") -) - -// MapPinTracker is a PinTracker implementation which uses a Go map -// to store the status of the tracked Cids. This component is thread-safe. -type MapPinTracker struct { - config *Config - - optracker *optracker.OperationTracker - - ctx context.Context - cancel func() - - rpcClient *rpc.Client - rpcReady chan struct{} - - peerID peer.ID - pinCh chan *optracker.Operation - unpinCh chan *optracker.Operation - - shutdownLock sync.Mutex - shutdown bool - wg sync.WaitGroup -} - -// NewMapPinTracker returns a new object which has been correctly -// initialized with the given configuration. -func NewMapPinTracker(cfg *Config, pid peer.ID, peerName string) *MapPinTracker { - ctx, cancel := context.WithCancel(context.Background()) - - mpt := &MapPinTracker{ - ctx: ctx, - cancel: cancel, - config: cfg, - optracker: optracker.NewOperationTracker(ctx, pid, peerName), - rpcReady: make(chan struct{}, 1), - peerID: pid, - pinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize), - unpinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize), - } - - for i := 0; i < mpt.config.ConcurrentPins; i++ { - go mpt.opWorker(ctx, mpt.pin, mpt.pinCh) - } - go mpt.opWorker(ctx, mpt.unpin, mpt.unpinCh) - return mpt -} - -// receives a pin Function (pin or unpin) and a channel. -// Used for both pinning and unpinning -func (mpt *MapPinTracker) opWorker(ctx context.Context, pinF func(*optracker.Operation) error, opChan chan *optracker.Operation) { - for { - select { - case op := <-opChan: - if op.Cancelled() { - // operation was cancelled. Move on. - // This saves some time, but not 100% needed. - continue - } - op.SetPhase(optracker.PhaseInProgress) - err := pinF(op) // call pin/unpin - if err != nil { - if op.Cancelled() { - // there was an error because - // we were cancelled. Move on. - continue - } - op.SetError(err) - op.Cancel() - continue - } - op.SetPhase(optracker.PhaseDone) - op.Cancel() - - // We keep all pinned things in the tracker, - // only clean unpinned things. - if op.Type() == optracker.OperationUnpin { - mpt.optracker.Clean(ctx, op) - } - case <-mpt.ctx.Done(): - return - } - } -} - -// Shutdown finishes the services provided by the MapPinTracker and cancels -// any active context. -func (mpt *MapPinTracker) Shutdown(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "tracker/map/Shutdown") - defer span.End() - - mpt.shutdownLock.Lock() - defer mpt.shutdownLock.Unlock() - - if mpt.shutdown { - logger.Debug("already shutdown") - return nil - } - - logger.Info("stopping MapPinTracker") - mpt.cancel() - close(mpt.rpcReady) - mpt.wg.Wait() - mpt.shutdown = true - return nil -} - -func (mpt *MapPinTracker) pin(op *optracker.Operation) error { - ctx, span := trace.StartSpan(op.Context(), "tracker/map/pin") - defer span.End() - - logger.Debugf("issuing pin call for %s", op.Cid()) - err := mpt.rpcClient.CallContext( - ctx, - "", - "IPFSConnector", - "Pin", - op.Pin(), - &struct{}{}, - ) - if err != nil { - return err - } - return nil -} - -func (mpt *MapPinTracker) unpin(op *optracker.Operation) error { - ctx, span := trace.StartSpan(op.Context(), "tracker/map/unpin") - defer span.End() - - logger.Debugf("issuing unpin call for %s", op.Cid()) - err := mpt.rpcClient.CallContext( - ctx, - "", - "IPFSConnector", - "Unpin", - op.Pin(), - &struct{}{}, - ) - if err != nil { - return err - } - return nil -} - -// puts a new operation on the queue, unless ongoing exists -func (mpt *MapPinTracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.OperationType, ch chan *optracker.Operation) error { - ctx, span := trace.StartSpan(ctx, "tracker/map/enqueue") - defer span.End() - - op := mpt.optracker.TrackNewOperation(ctx, c, typ, optracker.PhaseQueued) - if op == nil { - return nil // ongoing pin operation. - } - - select { - case ch <- op: - default: - err := util.ErrFullQueue - op.SetError(err) - op.Cancel() - logger.Error(err.Error()) - return err - } - return nil -} - -// Track tells the MapPinTracker to start managing a Cid, -// possibly triggering Pin operations on the IPFS daemon. -func (mpt *MapPinTracker) Track(ctx context.Context, c *api.Pin) error { - ctx, span := trace.StartSpan(ctx, "tracker/map/Track") - defer span.End() - - logger.Debugf("tracking %s", c.Cid) - - // Sharded pins are never pinned. A sharded pin cannot turn into - // something else or viceversa like it happens with Remote pins so - // we just track them. - if c.Type == api.MetaType { - mpt.optracker.TrackNewOperation(ctx, c, optracker.OperationShard, optracker.PhaseDone) - return nil - } - - // Trigger unpin whenever something remote is tracked - // Note, IPFSConn checks with pin/ls before triggering - // pin/rm, so this actually does not always trigger unpin - // to ipfs. - if util.IsRemotePin(c, mpt.peerID) { - op := mpt.optracker.TrackNewOperation(ctx, c, optracker.OperationRemote, optracker.PhaseInProgress) - if op == nil { - return nil // Ongoing operationRemote / PhaseInProgress - } - err := mpt.unpin(op) // unpin all the time, even if not pinned - op.Cancel() - if err != nil { - op.SetError(err) - } else { - op.SetPhase(optracker.PhaseDone) - } - return nil - } - - return mpt.enqueue(ctx, c, optracker.OperationPin, mpt.pinCh) -} - -// Untrack tells the MapPinTracker to stop managing a Cid. -// If the Cid is pinned locally, it will be unpinned. -func (mpt *MapPinTracker) Untrack(ctx context.Context, c cid.Cid) error { - ctx, span := trace.StartSpan(ctx, "tracker/map/Untrack") - defer span.End() - - logger.Infof("untracking %s", c) - return mpt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin, mpt.unpinCh) -} - -// Status returns information for a Cid tracked by this -// MapPinTracker. -func (mpt *MapPinTracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo { - ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/Status") - defer span.End() - - return mpt.optracker.Get(ctx, c) -} - -// StatusAll returns information for all Cids tracked by this -// MapPinTracker. -func (mpt *MapPinTracker) StatusAll(ctx context.Context) []*api.PinInfo { - ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/StatusAll") - defer span.End() - - return mpt.optracker.GetAll(ctx) -} - -// Sync verifies that the status of a Cid matches that of -// the IPFS daemon. If not, it will be transitioned -// to PinError or UnpinError. -// -// Sync returns the updated local status for the given Cid. -// Pins in error states can be recovered with Recover(). -// An error is returned if we are unable to contact -// the IPFS daemon. -func (mpt *MapPinTracker) Sync(ctx context.Context, c cid.Cid) (*api.PinInfo, error) { - ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/Sync") - defer span.End() - - var ips api.IPFSPinStatus - err := mpt.rpcClient.Call( - "", - "IPFSConnector", - "PinLsCid", - c, - &ips, - ) - - if err != nil { - mpt.optracker.SetError(ctx, c, err) - return mpt.optracker.Get(ctx, c), nil - } - - return mpt.syncStatus(ctx, c, ips), nil -} - -// SyncAll verifies that the statuses of all tracked Cids match the -// one reported by the IPFS daemon. If not, they will be transitioned -// to PinError or UnpinError. -// -// SyncAll returns the list of local status for all tracked Cids which -// were updated or have errors. Cids in error states can be recovered -// with Recover(). -// An error is returned if we are unable to contact the IPFS daemon. -func (mpt *MapPinTracker) SyncAll(ctx context.Context) ([]*api.PinInfo, error) { - ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/SyncAll") - defer span.End() - - var ipsMap map[string]api.IPFSPinStatus - var results []*api.PinInfo - err := mpt.rpcClient.Call( - "", - "IPFSConnector", - "PinLs", - "recursive", - &ipsMap, - ) - if err != nil { - // set pinning or unpinning ops to error, since we can't - // verify them - pInfos := mpt.optracker.GetAll(ctx) - for _, pInfo := range pInfos { - op, _ := optracker.TrackerStatusToOperationPhase(pInfo.Status) - if op == optracker.OperationPin || op == optracker.OperationUnpin { - mpt.optracker.SetError(ctx, pInfo.Cid, err) - results = append(results, mpt.optracker.Get(ctx, pInfo.Cid)) - } else { - results = append(results, pInfo) - } - } - return results, nil - } - - status := mpt.StatusAll(ctx) - for _, pInfoOrig := range status { - var pInfoNew *api.PinInfo - c := pInfoOrig.Cid - ips, ok := ipsMap[c.String()] - if !ok { - pInfoNew = mpt.syncStatus(ctx, c, api.IPFSPinStatusUnpinned) - } else { - pInfoNew = mpt.syncStatus(ctx, c, ips) - } - - if pInfoOrig.Status != pInfoNew.Status || - pInfoNew.Status == api.TrackerStatusUnpinError || - pInfoNew.Status == api.TrackerStatusPinError { - results = append(results, pInfoNew) - } - } - return results, nil -} - -func (mpt *MapPinTracker) syncStatus(ctx context.Context, c cid.Cid, ips api.IPFSPinStatus) *api.PinInfo { - status, ok := mpt.optracker.Status(ctx, c) - if !ok { - status = api.TrackerStatusUnpinned - } - - // TODO(hector): for sharding, we may need to check that a shard - // is pinned to the right depth. For now, we assumed that if it's pinned - // in some way, then it must be right (including direct). - pinned := func(i api.IPFSPinStatus) bool { - switch i { - case api.IPFSPinStatusRecursive: - return i.IsPinned(-1) - case api.IPFSPinStatusDirect: - return i.IsPinned(0) - default: - return i.IsPinned(1) // Pinned with depth 1 or more. - } - } - - if pinned(ips) { - switch status { - case api.TrackerStatusPinError: - // If an item that we wanted to pin is pinned, we mark it so - mpt.optracker.TrackNewOperation( - ctx, - api.PinCid(c), - optracker.OperationPin, - optracker.PhaseDone, - ) - default: - // 1. Unpinning phases - // 2. Pinned in ipfs but we are not tracking - // -> do nothing - } - } else { - switch status { - case api.TrackerStatusUnpinError: - // clean - op := mpt.optracker.TrackNewOperation( - ctx, - api.PinCid(c), - optracker.OperationUnpin, - optracker.PhaseDone, - ) - if op != nil { - mpt.optracker.Clean(ctx, op) - } - - case api.TrackerStatusPinned: - // not pinned in IPFS but we think it should be: mark as error - mpt.optracker.SetError(ctx, c, errUnpinned) - default: - // 1. Pinning phases - // -> do nothing - } - } - return mpt.optracker.Get(ctx, c) -} - -// Recover will re-queue a Cid in error state for the failed operation, -// possibly retriggering an IPFS pinning operation. -func (mpt *MapPinTracker) Recover(ctx context.Context, c cid.Cid) (*api.PinInfo, error) { - ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/Recover") - defer span.End() - - pInfo := mpt.optracker.Get(ctx, c) - var err error - - switch pInfo.Status { - case api.TrackerStatusPinError: - logger.Infof("Restarting pin operation for %s", c) - err = mpt.enqueue(ctx, api.PinCid(c), optracker.OperationPin, mpt.pinCh) - case api.TrackerStatusUnpinError: - logger.Infof("Restarting unpin operation for %s", c) - err = mpt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin, mpt.unpinCh) - } - return mpt.optracker.Get(ctx, c), err -} - -// RecoverAll attempts to recover all items tracked by this peer. -func (mpt *MapPinTracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) { - ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/RecoverAll") - defer span.End() - - pInfos := mpt.optracker.GetAll(ctx) - var results []*api.PinInfo - for _, pInfo := range pInfos { - res, err := mpt.Recover(ctx, pInfo.Cid) - results = append(results, res) - if err != nil { - return results, err - } - } - return results, nil -} - -// SetClient makes the MapPinTracker ready to perform RPC requests to -// other components. -func (mpt *MapPinTracker) SetClient(c *rpc.Client) { - mpt.rpcClient = c - mpt.rpcReady <- struct{}{} -} - -// OpContext exports the internal optracker's OpContext method. -// For testing purposes only. -func (mpt *MapPinTracker) OpContext(ctx context.Context, c cid.Cid) context.Context { - return mpt.optracker.OpContext(ctx, c) -} diff --git a/pintracker/maptracker/maptracker_test.go b/pintracker/maptracker/maptracker_test.go deleted file mode 100644 index 60f7f669..00000000 --- a/pintracker/maptracker/maptracker_test.go +++ /dev/null @@ -1,591 +0,0 @@ -package maptracker - -import ( - "context" - "errors" - "math/rand" - "sync" - "testing" - "time" - - "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/test" - - cid "github.com/ipfs/go-cid" - peer "github.com/libp2p/go-libp2p-core/peer" - rpc "github.com/libp2p/go-libp2p-gorpc" -) - -var ( - pinCancelCid = test.Cid3 - unpinCancelCid = test.Cid2 - ErrPinCancelCid = errors.New("should not have received rpc.IPFSPin operation") - ErrUnpinCancelCid = errors.New("should not have received rpc.IPFSUnpin operation") -) - -type mockIPFS struct{} - -func mockRPCClient(t *testing.T) *rpc.Client { - s := rpc.NewServer(nil, "mock") - c := rpc.NewClientWithServer(nil, "mock", s) - err := s.RegisterName("IPFSConnector", &mockIPFS{}) - if err != nil { - t.Fatal(err) - } - return c -} - -func (mock *mockIPFS) Pin(ctx context.Context, in *api.Pin, out *struct{}) error { - switch in.Cid.String() { - case test.SlowCid1.String(): - time.Sleep(2 * time.Second) - case pinCancelCid.String(): - return ErrPinCancelCid - } - return nil -} - -func (mock *mockIPFS) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error { - switch in.Cid.String() { - case test.SlowCid1.String(): - time.Sleep(2 * time.Second) - case unpinCancelCid.String(): - return ErrUnpinCancelCid - } - return nil -} - -func testPin(c cid.Cid, min, max int, allocs ...peer.ID) *api.Pin { - pin := api.PinCid(c) - pin.ReplicationFactorMin = min - pin.ReplicationFactorMax = max - pin.Allocations = allocs - return pin -} - -func testSlowMapPinTracker(t *testing.T) *MapPinTracker { - cfg := &Config{} - cfg.Default() - cfg.ConcurrentPins = 1 - mpt := NewMapPinTracker(cfg, test.PeerID1, test.PeerName1) - mpt.SetClient(mockRPCClient(t)) - return mpt -} - -func testMapPinTracker(t *testing.T) *MapPinTracker { - cfg := &Config{} - cfg.Default() - cfg.ConcurrentPins = 1 - mpt := NewMapPinTracker(cfg, test.PeerID1, test.PeerName1) - mpt.SetClient(test.NewMockRPCClient(t)) - return mpt -} - -func TestNew(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) -} - -func TestShutdown(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - err := mpt.Shutdown(ctx) - if err != nil { - t.Fatal(err) - } - err = mpt.Shutdown(ctx) - if err != nil { - t.Fatal(err) - } -} - -func TestTrack(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h := test.Cid1 - - // Let's tart with a local pin - c := testPin(h, -1, -1) - - err := mpt.Track(context.Background(), c) - if err != nil { - t.Fatal(err) - } - - time.Sleep(200 * time.Millisecond) // let it be pinned - - st := mpt.Status(context.Background(), h) - if st.Status != api.TrackerStatusPinned { - t.Fatalf("cid should be pinned and is %s", st.Status) - } - - // Unpin and set remote - c = testPin(h, 1, 1, test.PeerID2) - err = mpt.Track(context.Background(), c) - if err != nil { - t.Fatal(err) - } - - time.Sleep(200 * time.Millisecond) // let it be unpinned - - st = mpt.Status(context.Background(), h) - if st.Status != api.TrackerStatusRemote { - t.Fatalf("cid should be pinned and is %s", st.Status) - } -} - -func TestUntrack(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h1 := test.Cid1 - h2 := test.Cid2 - - // LocalPin - c := testPin(h1, -1, -1) - - err := mpt.Track(context.Background(), c) - if err != nil { - t.Fatal(err) - } - - // Remote pin - c = testPin(h2, 1, 1, test.PeerID2) - err = mpt.Track(context.Background(), c) - if err != nil { - t.Fatal(err) - } - - time.Sleep(time.Second / 2) - - err = mpt.Untrack(context.Background(), h2) - if err != nil { - t.Fatal(err) - } - err = mpt.Untrack(context.Background(), h1) - if err != nil { - t.Fatal(err) - } - err = mpt.Untrack(context.Background(), h1) - if err != nil { - t.Fatal(err) - } - - time.Sleep(time.Second / 2) - - st := mpt.Status(context.Background(), h1) - if st.Status != api.TrackerStatusUnpinned { - t.Fatalf("cid should be unpinned and is %s", st.Status) - } - - st = mpt.Status(context.Background(), h2) - if st.Status != api.TrackerStatusUnpinned { - t.Fatalf("cid should be unpinned and is %s", st.Status) - } -} - -func TestStatusAll(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h1 := test.Cid1 - h2 := test.Cid2 - - // LocalPin - c := testPin(h1, -1, -1) - mpt.Track(context.Background(), c) - c = testPin(h2, 1, 1) - mpt.Track(context.Background(), c) - - time.Sleep(200 * time.Millisecond) - - stAll := mpt.StatusAll(context.Background()) - if len(stAll) != 2 { - t.Logf("%+v", stAll) - t.Fatal("expected 2 pins") - } - - for _, st := range stAll { - if st.Cid.Equals(h1) && st.Status != api.TrackerStatusPinned { - t.Fatal("expected pinned") - } - if st.Cid.Equals(h2) && st.Status != api.TrackerStatusRemote { - t.Fatal("expected remote") - } - } -} - -func TestSyncAndRecover(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h1 := test.Cid1 - h2 := test.Cid2 - - c := testPin(h1, -1, -1) - mpt.Track(context.Background(), c) - c = testPin(h2, -1, -1) - mpt.Track(context.Background(), c) - - time.Sleep(100 * time.Millisecond) - - // IPFSPinLS RPC returns unpinned for anything != Cid1 or Cid3 - info, err := mpt.Sync(context.Background(), h2) - if err != nil { - t.Fatal(err) - } - if info.Status != api.TrackerStatusPinError { - t.Error("expected pin_error") - } - - info, err = mpt.Sync(context.Background(), h1) - if err != nil { - t.Fatal(err) - } - if info.Status != api.TrackerStatusPinned { - t.Error("expected pinned") - } - - info, err = mpt.Recover(context.Background(), h1) - if err != nil { - t.Fatal(err) - } - if info.Status != api.TrackerStatusPinned { - t.Error("expected pinned") - } - - _, err = mpt.Recover(context.Background(), h2) - if err != nil { - t.Fatal(err) - } - - time.Sleep(100 * time.Millisecond) - - info = mpt.Status(context.Background(), h2) - if info.Status != api.TrackerStatusPinned { - t.Error("expected pinned") - } -} - -func TestRecoverAll(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h1 := test.Cid1 - - c := testPin(h1, -1, -1) - mpt.Track(context.Background(), c) - time.Sleep(100 * time.Millisecond) - mpt.optracker.SetError(context.Background(), h1, errors.New("fakeerror")) - pins, err := mpt.RecoverAll(context.Background()) - if err != nil { - t.Fatal(err) - } - if len(pins) != 1 { - t.Fatal("there should be only one pin") - } - - time.Sleep(100 * time.Millisecond) - info := mpt.Status(context.Background(), h1) - - if info.Status != api.TrackerStatusPinned { - t.Error("the pin should have been recovered") - } -} - -func TestSyncAll(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - synced, err := mpt.SyncAll(context.Background()) - if err != nil { - t.Fatal(err) - } - // This relies on the rpc mock implementation - - if len(synced) != 0 { - t.Fatal("should not have synced anything when it tracks nothing") - } - - h1 := test.Cid1 - h2 := test.Cid2 - - c := testPin(h1, -1, -1) - mpt.Track(context.Background(), c) - c = testPin(h2, -1, -1) - mpt.Track(context.Background(), c) - - time.Sleep(100 * time.Millisecond) - - synced, err = mpt.SyncAll(context.Background()) - if err != nil { - t.Fatal(err) - } - if len(synced) != 1 || synced[0].Status != api.TrackerStatusPinError { - t.Logf("%+v", synced) - t.Fatal("should have synced h2") - } -} - -func TestUntrackTrack(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h1 := test.Cid1 - - // LocalPin - c := testPin(h1, -1, -1) - err := mpt.Track(context.Background(), c) - if err != nil { - t.Fatal(err) - } - - time.Sleep(time.Second / 2) - - err = mpt.Untrack(context.Background(), h1) - if err != nil { - t.Fatal(err) - } -} - -func TestTrackUntrackWithCancel(t *testing.T) { - ctx := context.Background() - mpt := testSlowMapPinTracker(t) - defer mpt.Shutdown(ctx) - - slowPinCid := test.SlowCid1 - - // LocalPin - slowPin := testPin(slowPinCid, -1, -1) - - err := mpt.Track(context.Background(), slowPin) - if err != nil { - t.Fatal(err) - } - - time.Sleep(100 * time.Millisecond) // let pinning start - - pInfo := mpt.Status(context.Background(), slowPin.Cid) - if pInfo.Status == api.TrackerStatusUnpinned { - t.Fatal("slowPin should be tracked") - } - - if pInfo.Status == api.TrackerStatusPinning { - go func() { - err = mpt.Untrack(context.Background(), slowPinCid) - if err != nil { - t.Fatal(err) - } - }() - select { - case <-mpt.optracker.OpContext(context.Background(), slowPinCid).Done(): - return - case <-time.Tick(100 * time.Millisecond): - t.Errorf("operation context should have been cancelled by now") - } - } else { - t.Error("slowPin should be pinning and is:", pInfo.Status) - } -} - -func TestTrackUntrackWithNoCancel(t *testing.T) { - ctx := context.Background() - mpt := testSlowMapPinTracker(t) - defer mpt.Shutdown(ctx) - - slowPinCid := test.SlowCid1 - fastPinCid := pinCancelCid - - // SlowLocalPin - slowPin := testPin(slowPinCid, -1, -1) - - // LocalPin - fastPin := testPin(fastPinCid, -1, -1) - - err := mpt.Track(context.Background(), slowPin) - if err != nil { - t.Fatal(err) - } - - err = mpt.Track(context.Background(), fastPin) - if err != nil { - t.Fatal(err) - } - - // fastPin should be queued because slow pin is pinning - pInfo := mpt.Status(context.Background(), fastPinCid) - if pInfo.Status == api.TrackerStatusPinQueued { - err = mpt.Untrack(context.Background(), fastPinCid) - if err != nil { - t.Fatal(err) - } - pi := mpt.Status(context.Background(), fastPinCid) - if pi.Error == ErrPinCancelCid.Error() { - t.Fatal(ErrPinCancelCid) - } - } else { - t.Error("fastPin should be queued to pin:", pInfo.Status) - } - - time.Sleep(100 * time.Millisecond) - pInfo = mpt.Status(context.Background(), fastPinCid) - if pInfo.Status != api.TrackerStatusUnpinned { - t.Error("fastPin should have been removed from tracker:", pInfo.Status) - } -} - -func TestUntrackTrackWithCancel(t *testing.T) { - ctx := context.Background() - mpt := testSlowMapPinTracker(t) - defer mpt.Shutdown(ctx) - - slowPinCid := test.SlowCid1 - - // LocalPin - slowPin := testPin(slowPinCid, -1, -1) - - err := mpt.Track(context.Background(), slowPin) - if err != nil { - t.Fatal(err) - } - - time.Sleep(time.Second / 2) - - // Untrack should cancel the ongoing request - // and unpin right away - err = mpt.Untrack(context.Background(), slowPinCid) - if err != nil { - t.Fatal(err) - } - - time.Sleep(100 * time.Millisecond) - - pInfo := mpt.Status(context.Background(), slowPinCid) - if pInfo.Status == api.TrackerStatusUnpinned { - t.Fatal("expected slowPin to be tracked") - } - - if pInfo.Status == api.TrackerStatusUnpinning { - go func() { - err = mpt.Track(context.Background(), slowPin) - if err != nil { - t.Fatal(err) - } - }() - select { - case <-mpt.optracker.OpContext(context.Background(), slowPinCid).Done(): - return - case <-time.Tick(100 * time.Millisecond): - t.Errorf("operation context should have been cancelled by now") - } - } else { - t.Error("slowPin should be in unpinning") - } - -} - -func TestUntrackTrackWithNoCancel(t *testing.T) { - ctx := context.Background() - mpt := testSlowMapPinTracker(t) - defer mpt.Shutdown(ctx) - - slowPinCid := test.SlowCid1 - fastPinCid := unpinCancelCid - - // SlowLocalPin - slowPin := testPin(slowPinCid, -1, -1) - - // LocalPin - fastPin := testPin(fastPinCid, -1, -1) - - err := mpt.Track(context.Background(), slowPin) - if err != nil { - t.Fatal(err) - } - - err = mpt.Track(context.Background(), fastPin) - if err != nil { - t.Fatal(err) - } - - time.Sleep(3 * time.Second) - - err = mpt.Untrack(context.Background(), slowPin.Cid) - if err != nil { - t.Fatal(err) - } - - err = mpt.Untrack(context.Background(), fastPin.Cid) - if err != nil { - t.Fatal(err) - } - - pInfo := mpt.Status(context.Background(), fastPinCid) - if pInfo.Status == api.TrackerStatusUnpinned { - t.Fatal("c untrack operation should be tracked") - } - - if pInfo.Status == api.TrackerStatusUnpinQueued { - err = mpt.Track(context.Background(), fastPin) - if err != nil { - t.Fatal(err) - } - - pi := mpt.Status(context.Background(), fastPinCid) - if pi.Error == ErrUnpinCancelCid.Error() { - t.Fatal(ErrUnpinCancelCid) - } - } else { - t.Error("c should be queued to unpin") - } -} - -func TestTrackUntrackConcurrent(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h1 := test.Cid1 - - // LocalPin - c := testPin(h1, -1, -1) - - var wg sync.WaitGroup - - for i := 0; i < 50; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for j := 0; j < 50; j++ { - var err error - op := rand.Intn(2) - if op == 1 { - err = mpt.Track(context.Background(), c) - } else { - err = mpt.Untrack(context.Background(), c.Cid) - } - if err != nil { - t.Error(err) - } - } - }() - } - - wg.Wait() - - time.Sleep(200 * time.Millisecond) - st := mpt.Status(context.Background(), h1) - t.Log(st.Status) - if st.Status != api.TrackerStatusUnpinned && st.Status != api.TrackerStatusPinned { - t.Fatal("should be pinned or unpinned") - } -} diff --git a/pintracker/optracker/operation.go b/pintracker/optracker/operation.go index 12b5cb38..bb67829b 100644 --- a/pintracker/optracker/operation.go +++ b/pintracker/optracker/operation.go @@ -105,9 +105,11 @@ func (op *Operation) String() string { // Cid returns the Cid associated to this operation. func (op *Operation) Cid() cid.Cid { + var c cid.Cid op.mu.RLock() - defer op.mu.RUnlock() - return op.pin.Cid + c = op.pin.Cid + op.mu.RUnlock() + return c } // Context returns the context associated to this operation. @@ -117,48 +119,55 @@ func (op *Operation) Context() context.Context { // Cancel will cancel the context associated to this operation. func (op *Operation) Cancel() { - ctx, span := trace.StartSpan(op.ctx, "optracker/Cancel") - _ = ctx - defer span.End() + _, span := trace.StartSpan(op.ctx, "optracker/Cancel") op.cancel() + span.End() } // Phase returns the Phase. func (op *Operation) Phase() Phase { + var ph Phase + op.mu.RLock() - defer op.mu.RUnlock() - return op.phase + ph = op.phase + op.mu.RUnlock() + + return ph } // SetPhase changes the Phase and updates the timestamp. func (op *Operation) SetPhase(ph Phase) { - ctx, span := trace.StartSpan(op.ctx, "optracker/SetPhase") - _ = ctx - defer span.End() + _, span := trace.StartSpan(op.ctx, "optracker/SetPhase") op.mu.Lock() - defer op.mu.Unlock() - op.phase = ph - op.ts = time.Now() + { + op.phase = ph + op.ts = time.Now() + } + op.mu.Unlock() + span.End() } // Error returns any error message attached to the operation. func (op *Operation) Error() string { + var err string op.mu.RLock() - defer op.mu.RUnlock() - return op.error + err = op.error + op.mu.RUnlock() + return err } // SetError sets the phase to PhaseError along with // an error message. It updates the timestamp. func (op *Operation) SetError(err error) { - ctx, span := trace.StartSpan(op.ctx, "optracker/SetError") - _ = ctx - defer span.End() + _, span := trace.StartSpan(op.ctx, "optracker/SetError") op.mu.Lock() - defer op.mu.Unlock() - op.phase = PhaseError - op.error = err.Error() - op.ts = time.Now() + { + op.phase = PhaseError + op.error = err.Error() + op.ts = time.Now() + } + op.mu.Unlock() + span.End() } // Type returns the operation Type. @@ -174,9 +183,11 @@ func (op *Operation) Pin() *api.Pin { // Timestamp returns the time when this operation was // last modified (phase changed, error was set...). func (op *Operation) Timestamp() time.Time { + var ts time.Time op.mu.RLock() - defer op.mu.RUnlock() - return op.ts + ts = op.ts + op.mu.RUnlock() + return ts } // Cancelled returns whether the context for this diff --git a/pintracker/optracker/operationtracker.go b/pintracker/optracker/operationtracker.go index bcec88f0..50adce85 100644 --- a/pintracker/optracker/operationtracker.go +++ b/pintracker/optracker/operationtracker.go @@ -206,24 +206,6 @@ func (opt *OperationTracker) GetAll(ctx context.Context) []*api.PinInfo { return pinfos } -// CleanError removes the associated Operation, if it is -// in PhaseError. -func (opt *OperationTracker) CleanError(ctx context.Context, c cid.Cid) { - opt.mu.RLock() - defer opt.mu.RUnlock() - errop, ok := opt.operations[c.String()] - if !ok { - return - } - - if errop.Phase() != PhaseError { - return - } - - opt.Clean(ctx, errop) - return -} - // CleanAllDone deletes any operation from the tracker that is in PhaseDone. func (opt *OperationTracker) CleanAllDone(ctx context.Context) { opt.mu.Lock() diff --git a/pintracker/pintracker_test.go b/pintracker/pintracker_test.go index 352fed9e..a7a6d43c 100644 --- a/pintracker/pintracker_test.go +++ b/pintracker/pintracker_test.go @@ -11,8 +11,10 @@ import ( ipfscluster "github.com/ipfs/ipfs-cluster" "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/pintracker/maptracker" + "github.com/ipfs/ipfs-cluster/datastore/inmem" "github.com/ipfs/ipfs-cluster/pintracker/stateless" + "github.com/ipfs/ipfs-cluster/state" + "github.com/ipfs/ipfs-cluster/state/dsstate" "github.com/ipfs/ipfs-cluster/test" cid "github.com/ipfs/go-cid" @@ -31,17 +33,12 @@ var ( } ) -type mockCluster struct{} type mockIPFS struct{} func mockRPCClient(t testing.TB) *rpc.Client { s := rpc.NewServer(nil, "mock") c := rpc.NewClientWithServer(nil, "mock", s) - err := s.RegisterName("Cluster", &mockCluster{}) - if err != nil { - t.Fatal(err) - } - err = s.RegisterName("IPFSConnector", &mockIPFS{}) + err := s.RegisterName("IPFSConnector", &mockIPFS{}) if err != nil { t.Fatal(err) } @@ -90,64 +87,58 @@ func (mock *mockIPFS) PinLs(ctx context.Context, in string, out *map[string]api. return nil } -func (mock *mockCluster) Pins(ctx context.Context, in struct{}, out *[]*api.Pin) error { - *out = []*api.Pin{ - api.PinWithOpts(test.Cid1, pinOpts), - api.PinWithOpts(test.Cid3, pinOpts), - } - return nil -} - -func (mock *mockCluster) PinGet(ctx context.Context, in cid.Cid, out *api.Pin) error { - switch in.String() { - case test.ErrorCid.String(): - return errors.New("expected error when using ErrorCid") - case test.Cid1.String(), test.Cid2.String(): - pin := api.PinWithOpts(in, pinOpts) - *out = *pin - return nil - } - pin := api.PinCid(in) - *out = *pin - return nil -} - var sortPinInfoByCid = func(p []*api.PinInfo) { sort.Slice(p, func(i, j int) bool { return p[i].Cid.String() < p[j].Cid.String() }) } -func testSlowMapPinTracker(t testing.TB) *maptracker.MapPinTracker { - cfg := &maptracker.Config{} - cfg.Default() - cfg.ConcurrentPins = 1 - mpt := maptracker.NewMapPinTracker(cfg, test.PeerID1, test.PeerName1) - mpt.SetClient(mockRPCClient(t)) - return mpt -} +// prefilledState return a state instance with some pins. +func prefilledState(context.Context) (state.ReadOnly, error) { + st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle()) + if err != nil { + return nil, err + } -func testMapPinTracker(t testing.TB) *maptracker.MapPinTracker { - cfg := &maptracker.Config{} - cfg.Default() - cfg.ConcurrentPins = 1 - mpt := maptracker.NewMapPinTracker(cfg, test.PeerID1, test.PeerName1) - mpt.SetClient(test.NewMockRPCClient(t)) - return mpt + remote := api.PinWithOpts(test.Cid4, api.PinOptions{ + ReplicationFactorMax: 1, + ReplicationFactorMin: 1, + }) + remote.Allocations = []peer.ID{test.PeerID2} + + pins := []*api.Pin{ + api.PinWithOpts(test.Cid1, pinOpts), + api.PinCid(test.Cid2), + api.PinWithOpts(test.Cid3, pinOpts), + remote, + } + + ctx := context.Background() + for _, pin := range pins { + err = st.Add(ctx, pin) + if err != nil { + return nil, err + } + } + return st, nil } func testSlowStatelessPinTracker(t testing.TB) *stateless.Tracker { + t.Helper() + cfg := &stateless.Config{} cfg.Default() - mpt := stateless.New(cfg, test.PeerID1, test.PeerName1) - mpt.SetClient(mockRPCClient(t)) - return mpt + spt := stateless.New(cfg, test.PeerID1, test.PeerName1, prefilledState) + spt.SetClient(mockRPCClient(t)) + return spt } func testStatelessPinTracker(t testing.TB) *stateless.Tracker { + t.Helper() + cfg := &stateless.Config{} cfg.Default() - spt := stateless.New(cfg, test.PeerID1, test.PeerName1) + spt := stateless.New(cfg, test.PeerID1, test.PeerName1, prefilledState) spt.SetClient(test.NewMockRPCClient(t)) return spt } @@ -170,14 +161,6 @@ func TestPinTracker_Track(t *testing.T) { }, false, }, - { - "basic map track", - args{ - api.PinWithOpts(test.Cid1, pinOpts), - testMapPinTracker(t), - }, - false, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -204,13 +187,6 @@ func BenchmarkPinTracker_Track(b *testing.B) { testStatelessPinTracker(b), }, }, - { - "basic map track", - args{ - api.PinWithOpts(test.Cid1, pinOpts), - testMapPinTracker(b), - }, - }, } for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { @@ -242,14 +218,6 @@ func TestPinTracker_Untrack(t *testing.T) { }, false, }, - { - "basic map untrack", - args{ - test.Cid1, - testMapPinTracker(t), - }, - false, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -289,18 +257,9 @@ func TestPinTracker_StatusAll(t *testing.T) { Cid: test.Cid3, Status: api.TrackerStatusPinned, }, - }, - }, - { - "basic map statusall", - args{ - api.PinWithOpts(test.Cid1, pinOpts), - testMapPinTracker(t), - }, - []*api.PinInfo{ { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, + Cid: test.Cid4, + Status: api.TrackerStatusRemote, }, }, }, @@ -315,18 +274,13 @@ func TestPinTracker_StatusAll(t *testing.T) { Cid: test.Cid1, Status: api.TrackerStatusPinned, }, - }, - }, - { - "slow map statusall", - args{ - api.PinWithOpts(test.Cid1, pinOpts), - testSlowMapPinTracker(t), - }, - []*api.PinInfo{ { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, + Cid: test.Cid2, + Status: api.TrackerStatusRemote, + }, + { + Cid: test.Cid4, + Status: api.TrackerStatusRemote, }, }, }, @@ -375,12 +329,6 @@ func BenchmarkPinTracker_StatusAll(b *testing.B) { testStatelessPinTracker(b), }, }, - { - "basic map track", - args{ - testMapPinTracker(b), - }, - }, } for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { @@ -413,40 +361,17 @@ func TestPinTracker_Status(t *testing.T) { Status: api.TrackerStatusPinned, }, }, - { - "basic map status", - args{ - test.Cid1, - testMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - }, { "basic stateless status/unpinned", args{ - test.Cid4, + test.Cid5, testStatelessPinTracker(t), }, api.PinInfo{ - Cid: test.Cid4, + Cid: test.Cid5, Status: api.TrackerStatusUnpinned, }, }, - { - "basic map status/unpinned", - args{ - test.Cid4, - testMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid4, - Status: api.TrackerStatusUnpinned, - }, - }, - { "slow stateless status", args{ @@ -458,31 +383,9 @@ func TestPinTracker_Status(t *testing.T) { Status: api.TrackerStatusPinned, }, }, - { - "slow map status", - args{ - test.Cid1, - testSlowMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - switch tt.args.tracker.(type) { - case *maptracker.MapPinTracker: - // the Track preps the internal map of the MapPinTracker - // not required by the Stateless impl - pin := api.PinWithOpts(test.Cid1, pinOpts) - if err := tt.args.tracker.Track(context.Background(), pin); err != nil { - t.Errorf("PinTracker.Track() error = %v", err) - } - time.Sleep(1 * time.Second) - } - got := tt.args.tracker.Status(context.Background(), tt.args.c) if got.Cid.String() != tt.want.Cid.String() { @@ -496,231 +399,9 @@ func TestPinTracker_Status(t *testing.T) { } } -func TestPinTracker_SyncAll(t *testing.T) { - type args struct { - cs []cid.Cid - tracker ipfscluster.PinTracker - } - tests := []struct { - name string - args args - want []*api.PinInfo - wantErr bool - }{ - { - "basic stateless syncall", - args{ - []cid.Cid{ - test.Cid1, - test.Cid2, - }, - testStatelessPinTracker(t), - }, - []*api.PinInfo{ - { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - { - Cid: test.Cid2, - Status: api.TrackerStatusPinned, - }, - }, - false, - }, - { - "basic map syncall", - args{ - []cid.Cid{ - test.Cid1, - test.Cid2, - }, - testMapPinTracker(t), - }, - []*api.PinInfo{ - { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - { - Cid: test.Cid2, - Status: api.TrackerStatusPinned, - }, - }, - false, - }, - { - "slow stateless syncall", - args{ - []cid.Cid{ - test.Cid1, - test.Cid2, - }, - testSlowStatelessPinTracker(t), - }, - []*api.PinInfo{ - { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - { - Cid: test.Cid2, - Status: api.TrackerStatusPinned, - }, - }, - false, - }, - { - "slow map syncall", - args{ - []cid.Cid{ - test.Cid1, - test.Cid2, - }, - testSlowMapPinTracker(t), - }, - []*api.PinInfo{ - { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - { - Cid: test.Cid2, - Status: api.TrackerStatusPinned, - }, - }, - false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := tt.args.tracker.SyncAll(context.Background()) - if (err != nil) != tt.wantErr { - t.Errorf("PinTracker.SyncAll() error = %v, wantErr %v", err, tt.wantErr) - return - } - - if len(got) != 0 { - t.Fatalf("should not have synced anything when it tracks nothing") - } - - for _, c := range tt.args.cs { - err := tt.args.tracker.Track(context.Background(), api.PinWithOpts(c, pinOpts)) - if err != nil { - t.Fatal(err) - } - } - - sortPinInfoByCid(got) - sortPinInfoByCid(tt.want) - - for i := range got { - if got[i].Cid.String() != tt.want[i].Cid.String() { - t.Errorf("PinTracker.SyncAll() = %v, want %v", got, tt.want) - } - - if got[i].Status != tt.want[i].Status { - t.Errorf("PinTracker.SyncAll() = %v, want %v", got, tt.want) - } - } - }) - } -} - -func TestPinTracker_Sync(t *testing.T) { - type args struct { - c cid.Cid - tracker ipfscluster.PinTracker - } - tests := []struct { - name string - args args - want api.PinInfo - wantErr bool - }{ - { - "basic stateless sync", - args{ - test.Cid1, - testStatelessPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - false, - }, - { - "basic map sync", - args{ - test.Cid1, - testMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - false, - }, - { - "slow stateless sync", - args{ - test.Cid1, - testSlowStatelessPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - false, - }, - { - "slow map sync", - args{ - test.Cid1, - testSlowMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - switch tt.args.tracker.(type) { - case *maptracker.MapPinTracker: - // the Track preps the internal map of the MapPinTracker; not required by the Stateless impl - pin := api.PinWithOpts(test.Cid1, pinOpts) - if err := tt.args.tracker.Track(context.Background(), pin); err != nil { - t.Errorf("PinTracker.Track() error = %v", err) - } - time.Sleep(1 * time.Second) - } - - got, err := tt.args.tracker.Sync(context.Background(), tt.args.c) - if (err != nil) != tt.wantErr { - t.Errorf("PinTracker.Sync() error = %v, wantErr %v", err, tt.wantErr) - return - } - - t.Logf("got: %+v\n", got) - if got.Cid.String() != tt.want.Cid.String() { - t.Errorf("PinTracker.Sync() = %v, want %v", got.Cid.String(), tt.want.Cid.String()) - } - - if got.Status != tt.want.Status { - t.Errorf("PinTracker.Sync() = %v, want %v", got.Status, tt.want.Status) - } - }) - } -} - func TestPinTracker_RecoverAll(t *testing.T) { type args struct { tracker ipfscluster.PinTracker - pin *api.Pin // only used by maptracker } tests := []struct { name string @@ -732,7 +413,6 @@ func TestPinTracker_RecoverAll(t *testing.T) { "basic stateless recoverall", args{ testStatelessPinTracker(t), - &api.Pin{}, }, []*api.PinInfo{ { @@ -747,19 +427,9 @@ func TestPinTracker_RecoverAll(t *testing.T) { Cid: test.Cid3, Status: api.TrackerStatusPinned, }, - }, - false, - }, - { - "basic map recoverall", - args{ - testMapPinTracker(t), - api.PinWithOpts(test.Cid1, pinOpts), - }, - []*api.PinInfo{ { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, + Cid: test.Cid4, + Status: api.TrackerStatusRemote, }, }, false, @@ -767,15 +437,6 @@ func TestPinTracker_RecoverAll(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - switch tt.args.tracker.(type) { - case *maptracker.MapPinTracker: - // the Track preps the internal map of the MapPinTracker; not required by the Stateless impl - if err := tt.args.tracker.Track(context.Background(), tt.args.pin); err != nil { - t.Errorf("PinTracker.Track() error = %v", err) - } - time.Sleep(1 * time.Second) - } - got, err := tt.args.tracker.RecoverAll(context.Background()) if (err != nil) != tt.wantErr { t.Errorf("PinTracker.RecoverAll() error = %v, wantErr %v", err, tt.wantErr) @@ -829,18 +490,6 @@ func TestPinTracker_Recover(t *testing.T) { }, false, }, - { - "basic map recover", - args{ - test.Cid1, - testMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - false, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -880,18 +529,6 @@ func TestUntrackTrack(t *testing.T) { }, false, }, - { - "basic map untrack track", - args{ - test.Cid1, - testMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - false, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -933,18 +570,6 @@ func TestTrackUntrackWithCancel(t *testing.T) { }, false, }, - { - "slow map tracker untrack w/ cancel", - args{ - test.SlowCid1, - testSlowMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.SlowCid1, - Status: api.TrackerStatusPinned, - }, - false, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -970,15 +595,13 @@ func TestTrackUntrackWithCancel(t *testing.T) { }() var ctx context.Context switch trkr := tt.args.tracker.(type) { - case *maptracker.MapPinTracker: - ctx = trkr.OpContext(context.Background(), tt.args.c) case *stateless.Tracker: ctx = trkr.OpContext(context.Background(), tt.args.c) } select { case <-ctx.Done(): return - case <-time.Tick(100 * time.Millisecond): + case <-time.Tick(150 * time.Millisecond): t.Errorf("operation context should have been cancelled by now") } } else { @@ -1003,34 +626,14 @@ func TestPinTracker_RemoteIgnoresError(t *testing.T) { t.Fatal(err) } - // Sync triggers IPFSPinLs which will return an error - // (see mock) - pi, err := pt.Sync(ctx, remoteCid) - if err != nil { - t.Fatal(err) - } - - if pi.Status != api.TrackerStatusRemote || pi.Error != "" { - t.Error("Remote pin should not be in error") - } - - pi = pt.Status(ctx, remoteCid) - if err != nil { - t.Fatal(err) - } - + pi := pt.Status(ctx, remoteCid) if pi.Status != api.TrackerStatusRemote || pi.Error != "" { t.Error("Remote pin should not be in error") } } - t.Run("basic pintracker", func(t *testing.T) { - pt := testMapPinTracker(t) - testF(t, pt) - }) - t.Run("stateless pintracker", func(t *testing.T) { - pt := testStatelessPinTracker(t) + pt := testSlowStatelessPinTracker(t) testF(t, pt) }) } diff --git a/pintracker/stateless/stateless.go b/pintracker/stateless/stateless.go index dba6dc8b..879341f4 100644 --- a/pintracker/stateless/stateless.go +++ b/pintracker/stateless/stateless.go @@ -11,7 +11,7 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/pintracker/optracker" - "github.com/ipfs/ipfs-cluster/pintracker/util" + "github.com/ipfs/ipfs-cluster/state" cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" @@ -23,6 +23,11 @@ import ( var logger = logging.Logger("pintracker") +var ( + // ErrFullQueue is the error used when pin or unpin operation channel is full. + ErrFullQueue = errors.New("pin/unpin operation queue is full. Try increasing max_pin_queue_size") +) + // Tracker uses the optracker.OperationTracker to manage // transitioning shared ipfs-cluster state (Pins) to the local IPFS node. type Tracker struct { @@ -30,11 +35,14 @@ type Tracker struct { optracker *optracker.OperationTracker - peerID peer.ID + peerID peer.ID + peerName string ctx context.Context cancel func() + getState func(ctx context.Context) (state.ReadOnly, error) + rpcClient *rpc.Client rpcReady chan struct{} @@ -47,14 +55,16 @@ type Tracker struct { } // New creates a new StatelessPinTracker. -func New(cfg *Config, pid peer.ID, peerName string) *Tracker { +func New(cfg *Config, pid peer.ID, peerName string, getState func(ctx context.Context) (state.ReadOnly, error)) *Tracker { ctx, cancel := context.WithCancel(context.Background()) spt := &Tracker{ config: cfg, peerID: pid, + peerName: peerName, ctx: ctx, cancel: cancel, + getState: getState, optracker: optracker.NewOperationTracker(ctx, pid, peerName), rpcReady: make(chan struct{}, 1), pinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize), @@ -71,13 +81,8 @@ func New(cfg *Config, pid peer.ID, peerName string) *Tracker { // receives a pin Function (pin or unpin) and a channel. // Used for both pinning and unpinning func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, opChan chan *optracker.Operation) { - logger.Debug("entering opworker") - ticker := time.NewTicker(10 * time.Second) //TODO(ajl): make config var for { select { - case <-ticker.C: - // every tick, clear out all Done operations - spt.optracker.CleanAllDone(spt.ctx) case op := <-opChan: if cont := applyPinF(pinF, op); cont { continue @@ -160,7 +165,7 @@ func (spt *Tracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.Opera logger.Debugf("entering enqueue: pin: %+v", c) op := spt.optracker.TrackNewOperation(ctx, c, typ, optracker.PhaseQueued) if op == nil { - return nil // ongoing pin operation. + return nil // ongoing operation. } var ch chan *optracker.Operation @@ -170,14 +175,12 @@ func (spt *Tracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.Opera ch = spt.pinCh case optracker.OperationUnpin: ch = spt.unpinCh - default: - return errors.New("operation doesn't have a associated channel") } select { case ch <- op: default: - err := util.ErrFullQueue + err := ErrFullQueue op.SetError(err) op.Cancel() logger.Error(err.Error()) @@ -226,9 +229,8 @@ func (spt *Tracker) Track(ctx context.Context, c *api.Pin) error { // Sharded pins are never pinned. A sharded pin cannot turn into // something else or viceversa like it happens with Remote pins so - // we just track them. + // we just ignore them. if c.Type == api.MetaType { - spt.optracker.TrackNewOperation(ctx, c, optracker.OperationShard, optracker.PhaseDone) return nil } @@ -244,9 +246,11 @@ func (spt *Tracker) Track(ctx context.Context, c *api.Pin) error { op.Cancel() if err != nil { op.SetError(err) - } else { - op.SetPhase(optracker.PhaseDone) + return nil } + + op.SetPhase(optracker.PhaseDone) + spt.optracker.Clean(ctx, op) return nil } @@ -270,13 +274,11 @@ func (spt *Tracker) StatusAll(ctx context.Context) []*api.PinInfo { pininfos, err := spt.localStatus(ctx, true) if err != nil { - logger.Error(err) return nil } - // get all inflight operations from optracker and - // put them into the map, deduplicating any already 'pinned' items with - // their inflight operation + // get all inflight operations from optracker and put them into the + // map, deduplicating any existing items with their inflight operation. for _, infop := range spt.optracker.GetAll(ctx) { pininfos[infop.Cid.String()] = infop } @@ -299,59 +301,50 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo { return oppi } + pinInfo := &api.PinInfo{ + Cid: c, + Peer: spt.peerID, + PeerName: spt.peerName, + TS: time.Now(), + } + // check global state to see if cluster should even be caring about // the provided cid - var gpin api.Pin - err := spt.rpcClient.Call( - "", - "Cluster", - "PinGet", - c, - &gpin, - ) + gpin := &api.Pin{} + st, err := spt.getState(ctx) if err != nil { - if rpc.IsRPCError(err) { - logger.Error(err) - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusClusterError, - Error: err.Error(), - TS: time.Now(), - } - } - // not part of global state. we should not care about - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusUnpinned, - TS: time.Now(), - } + logger.Error(err) + addError(pinInfo, err) + return pinInfo + } + + gpin, err = st.Get(ctx, c) + if err == state.ErrNotFound { + pinInfo.Status = api.TrackerStatusUnpinned + return pinInfo + } + if err != nil { + logger.Error(err) + addError(pinInfo, err) + return pinInfo } // check if pin is a meta pin if gpin.Type == api.MetaType { - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusSharded, - TS: time.Now(), - } + pinInfo.Status = api.TrackerStatusSharded + return pinInfo } // check if pin is a remote pin if gpin.IsRemotePin(spt.peerID) { - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusRemote, - TS: time.Now(), - } + pinInfo.Status = api.TrackerStatusRemote + return pinInfo } // else attempt to get status from ipfs node var ips api.IPFSPinStatus - err = spt.rpcClient.Call( + err = spt.rpcClient.CallContext( + ctx, "", "IPFSConnector", "PinLsCid", @@ -360,137 +353,12 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo { ) if err != nil { logger.Error(err) - return nil + addError(pinInfo, err) + return pinInfo } - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: ips.ToTrackerStatus(), - TS: time.Now(), - } -} - -// SyncAll verifies that the statuses of all tracked Cids (from the shared state) -// match the one reported by the IPFS daemon. If not, they will be transitioned -// to PinError or UnpinError. -// -// SyncAll returns the list of local status for all tracked Cids which -// were updated or have errors. Cids in error states can be recovered -// with Recover(). -// An error is returned if we are unable to contact the IPFS daemon. -func (spt *Tracker) SyncAll(ctx context.Context) ([]*api.PinInfo, error) { - ctx, span := trace.StartSpan(ctx, "tracker/stateless/SyncAll") - defer span.End() - - // get ipfs status for all - localpis, err := spt.localStatus(ctx, false) - if err != nil { - logger.Error(err) - return nil, err - } - - for _, p := range spt.optracker.Filter(ctx, optracker.OperationPin, optracker.PhaseError) { - if _, ok := localpis[p.Cid.String()]; ok { - spt.optracker.CleanError(ctx, p.Cid) - } - } - - for _, p := range spt.optracker.Filter(ctx, optracker.OperationUnpin, optracker.PhaseError) { - if _, ok := localpis[p.Cid.String()]; !ok { - spt.optracker.CleanError(ctx, p.Cid) - } - } - - return spt.getErrorsAll(ctx), nil -} - -// Sync returns the updated local status for the given Cid. -func (spt *Tracker) Sync(ctx context.Context, c cid.Cid) (*api.PinInfo, error) { - ctx, span := trace.StartSpan(ctx, "tracker/stateless/Sync") - defer span.End() - - oppi, ok := spt.optracker.GetExists(ctx, c) - if !ok { - return spt.Status(ctx, c), nil - } - - if oppi.Status == api.TrackerStatusUnpinError { - // check global state to see if cluster should even be caring about - // the provided cid - var gpin api.Pin - err := spt.rpcClient.Call( - "", - "Cluster", - "PinGet", - c, - &gpin, - ) - if err != nil { - if rpc.IsRPCError(err) { - logger.Error(err) - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusClusterError, - Error: err.Error(), - TS: time.Now(), - }, err - } - // it isn't in the global state - spt.optracker.CleanError(ctx, c) - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusUnpinned, - TS: time.Now(), - }, nil - } - // check if pin is a remote pin - if gpin.IsRemotePin(spt.peerID) { - spt.optracker.CleanError(ctx, c) - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusRemote, - TS: time.Now(), - }, nil - } - } - - if oppi.Status == api.TrackerStatusPinError { - // else attempt to get status from ipfs node - var ips api.IPFSPinStatus - err := spt.rpcClient.Call( - "", - "IPFSConnector", - "PinLsCid", - c, - &ips, - ) - if err != nil { - logger.Error(err) - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusPinError, - TS: time.Now(), - Error: err.Error(), - }, err - } - if ips.ToTrackerStatus() == api.TrackerStatusPinned { - spt.optracker.CleanError(ctx, c) - pi := &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: ips.ToTrackerStatus(), - TS: time.Now(), - } - return pi, nil - } - } - - return spt.optracker.Get(ctx, c), nil + pinInfo.Status = ips.ToTrackerStatus() + return pinInfo } // RecoverAll attempts to recover all items tracked by this peer. @@ -563,10 +431,11 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[string]*api.PinInfo, continue } p := &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: ips.ToTrackerStatus(), - TS: time.Now(), + Cid: c, + Peer: spt.peerID, + PeerName: spt.peerName, + Status: ips.ToTrackerStatus(), + TS: time.Now(), } pins[cidstr] = p } @@ -575,23 +444,21 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[string]*api.PinInfo, // localStatus returns a joint set of consensusState and ipfsStatus // marking pins which should be meta or remote and leaving any ipfs pins that -// aren't in the consensusState out. +// aren't in the consensusState out. If incExtra is true, Remote and Sharded +// pins will be added to the status slice. func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string]*api.PinInfo, error) { ctx, span := trace.StartSpan(ctx, "tracker/stateless/localStatus") defer span.End() - pininfos := make(map[string]*api.PinInfo) - // get shared state - var statePins []*api.Pin - err := spt.rpcClient.CallContext( - ctx, - "", - "Cluster", - "Pins", - struct{}{}, - &statePins, - ) + statePins := []*api.Pin{} + st, err := spt.getState(ctx) + if err != nil { + logger.Error(err) + return nil, err + } + + statePins, err = st.List(ctx) if err != nil { logger.Error(err) return nil, err @@ -604,32 +471,41 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string] return nil, err } + pininfos := make(map[string]*api.PinInfo, len(statePins)) for _, p := range statePins { pCid := p.Cid.String() - if p.Type == api.MetaType && incExtra { - // add pin to pininfos with sharded status - pininfos[pCid] = &api.PinInfo{ - Cid: p.Cid, - Peer: spt.peerID, - Status: api.TrackerStatusSharded, - TS: time.Now(), - } - continue + ipfsInfo, pinnedInIpfs := localpis[pCid] + // base pinInfo object - status to be filled. + pinInfo := &api.PinInfo{ + Cid: p.Cid, + Peer: spt.peerID, + PeerName: spt.peerName, + TS: time.Now(), } - if p.IsRemotePin(spt.peerID) && incExtra { - // add pin to pininfos with a status of remote - pininfos[pCid] = &api.PinInfo{ - Cid: p.Cid, - Peer: spt.peerID, - Status: api.TrackerStatusRemote, - TS: time.Now(), + switch { + case p.Type == api.MetaType: + pinInfo.Status = api.TrackerStatusSharded + if incExtra { + pininfos[pCid] = pinInfo } - continue - } - // lookup p in localpis - if lp, ok := localpis[pCid]; ok { - pininfos[pCid] = lp + case p.IsRemotePin(spt.peerID): + pinInfo.Status = api.TrackerStatusRemote + if incExtra { + pininfos[pCid] = pinInfo + } + case pinnedInIpfs: + pininfos[pCid] = ipfsInfo + default: + // report as undefined for this peer. this will be + // overwritten if the operation tracker has more info + // for this. Otherwise, this is a problem: a pin in + // the state that should be pinned by this peer but + // which no operation is handling. + + // TODO (hector): Consider a pinError so it can be + // recovered? + pinInfo.Status = api.TrackerStatusUndefined } } return pininfos, nil @@ -644,3 +520,8 @@ func (spt *Tracker) getErrorsAll(ctx context.Context) []*api.PinInfo { func (spt *Tracker) OpContext(ctx context.Context, c cid.Cid) context.Context { return spt.optracker.OpContext(ctx, c) } + +func addError(pinInfo *api.PinInfo, err error) { + pinInfo.Error = err.Error() + pinInfo.Status = api.TrackerStatusClusterError +} diff --git a/pintracker/stateless/stateless_test.go b/pintracker/stateless/stateless_test.go index f6b186fa..a8123c61 100644 --- a/pintracker/stateless/stateless_test.go +++ b/pintracker/stateless/stateless_test.go @@ -8,6 +8,9 @@ import ( "time" "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/datastore/inmem" + "github.com/ipfs/ipfs-cluster/state" + "github.com/ipfs/ipfs-cluster/state/dsstate" "github.com/ipfs/ipfs-cluster/test" cid "github.com/ipfs/go-cid" @@ -25,18 +28,13 @@ var ( } ) -type mockCluster struct{} - type mockIPFS struct{} func mockRPCClient(t *testing.T) *rpc.Client { s := rpc.NewServer(nil, "mock") c := rpc.NewClientWithServer(nil, "mock", s) - err := s.RegisterName("Cluster", &mockCluster{}) - if err != nil { - t.Fatal(err) - } - err = s.RegisterName("IPFSConnector", &mockIPFS{}) + + err := s.RegisterName("IPFSConnector", &mockIPFS{}) if err != nil { t.Fatal(err) } @@ -81,40 +79,38 @@ func (mock *mockIPFS) PinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPin return nil } -func (mock *mockCluster) Pins(ctx context.Context, in struct{}, out *[]*api.Pin) error { - *out = []*api.Pin{ - api.PinWithOpts(test.Cid1, pinOpts), - api.PinWithOpts(test.Cid3, pinOpts), - } - return nil -} - -func (mock *mockCluster) PinGet(ctx context.Context, in cid.Cid, out *api.Pin) error { - switch in.String() { - case test.ErrorCid.String(): - return errors.New("expected error when using ErrorCid") - case test.Cid1.String(), test.Cid2.String(): - *out = *api.PinWithOpts(in, pinOpts) - return nil - default: - return errors.New("not found") - } -} - func testSlowStatelessPinTracker(t *testing.T) *Tracker { + t.Helper() + cfg := &Config{} cfg.Default() cfg.ConcurrentPins = 1 - mpt := New(cfg, test.PeerID1, test.PeerName1) - mpt.SetClient(mockRPCClient(t)) - return mpt + st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle()) + if err != nil { + t.Fatal(err) + } + getState := func(ctx context.Context) (state.ReadOnly, error) { + return st, nil + } + spt := New(cfg, test.PeerID1, test.PeerName1, getState) + spt.SetClient(mockRPCClient(t)) + return spt } func testStatelessPinTracker(t testing.TB) *Tracker { + t.Helper() + cfg := &Config{} cfg.Default() cfg.ConcurrentPins = 1 - spt := New(cfg, test.PeerID1, test.PeerName1) + st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle()) + if err != nil { + t.Fatal(err) + } + getState := func(ctx context.Context) (state.ReadOnly, error) { + return st, nil + } + spt := New(cfg, test.PeerID1, test.PeerName1, getState) spt.SetClient(test.NewMockRPCClient(t)) return spt } @@ -369,102 +365,6 @@ var sortPinInfoByCid = func(p []*api.PinInfo) { }) } -func TestStatelessTracker_SyncAll(t *testing.T) { - type args struct { - cs []cid.Cid - tracker *Tracker - } - tests := []struct { - name string - args args - want []*api.PinInfo - wantErr bool - }{ - { - "basic stateless syncall", - args{ - []cid.Cid{ - test.Cid1, - test.Cid2, - }, - testStatelessPinTracker(t), - }, - []*api.PinInfo{ - { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - { - Cid: test.Cid2, - Status: api.TrackerStatusPinned, - }, - }, - false, - }, - { - "slow stateless syncall", - args{ - []cid.Cid{ - test.Cid1, - test.Cid2, - }, - testSlowStatelessPinTracker(t), - }, - []*api.PinInfo{ - { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - { - Cid: test.Cid2, - Status: api.TrackerStatusPinned, - }, - }, - false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := tt.args.tracker.SyncAll(context.Background()) - if (err != nil) != tt.wantErr { - t.Errorf("PinTracker.SyncAll() error = %v, wantErr %v", err, tt.wantErr) - return - } - - if len(got) != 0 { - t.Fatalf("should not have synced anything when it tracks nothing") - } - - for _, c := range tt.args.cs { - err := tt.args.tracker.Track(context.Background(), api.PinWithOpts(c, pinOpts)) - if err != nil { - t.Fatal(err) - } - tt.args.tracker.optracker.SetError(context.Background(), c, errors.New("test error")) - } - - got, err = tt.args.tracker.SyncAll(context.Background()) - if (err != nil) != tt.wantErr { - t.Errorf("PinTracker.SyncAll() error = %v, wantErr %v", err, tt.wantErr) - return - } - - sortPinInfoByCid(got) - sortPinInfoByCid(tt.want) - - for i := range got { - if got[i].Cid.String() != tt.want[i].Cid.String() { - t.Errorf("got: %v\n want %v", got[i].Cid.String(), tt.want[i].Cid.String()) - } - - if got[i].Status != tt.want[i].Status { - t.Errorf("got: %v\n want %v", got[i].Status, tt.want[i].Status) - } - } - }) - } -} - func BenchmarkTracker_localStatus(b *testing.B) { tracker := testStatelessPinTracker(b) b.ResetTimer() diff --git a/pintracker/util/pin.go b/pintracker/util/pin.go deleted file mode 100644 index c8ed2cfe..00000000 --- a/pintracker/util/pin.go +++ /dev/null @@ -1,29 +0,0 @@ -package util - -import ( - "errors" - - "github.com/ipfs/ipfs-cluster/api" - - peer "github.com/libp2p/go-libp2p-core/peer" -) - -var ( - // ErrFullQueue is the error used when pin or unpin operation channel is full. - ErrFullQueue = errors.New("pin/unpin operation queue is full (too many operations), increasing max_pin_queue_size would help") -) - -// IsRemotePin determines whether a Pin's ReplicationFactor has -// been met, so as to either pin or unpin it from the peer. -func IsRemotePin(c *api.Pin, pid peer.ID) bool { - if c.ReplicationFactorMax < 0 { - return false - } - - for _, p := range c.Allocations { - if p == pid { - return false - } - } - return true -} diff --git a/rpc_api.go b/rpc_api.go index f6254478..dfaeb5f0 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -295,46 +295,6 @@ func (rpcapi *ClusterRPCAPI) StatusLocal(ctx context.Context, in cid.Cid, out *a return nil } -// SyncAll runs Cluster.SyncAll(). -func (rpcapi *ClusterRPCAPI) SyncAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { - pinfos, err := rpcapi.c.SyncAll(ctx) - if err != nil { - return err - } - *out = pinfos - return nil -} - -// SyncAllLocal runs Cluster.SyncAllLocal(). -func (rpcapi *ClusterRPCAPI) SyncAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error { - pinfos, err := rpcapi.c.SyncAllLocal(ctx) - if err != nil { - return err - } - *out = pinfos - return nil -} - -// Sync runs Cluster.Sync(). -func (rpcapi *ClusterRPCAPI) Sync(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { - pinfo, err := rpcapi.c.Sync(ctx, in) - if err != nil { - return err - } - *out = *pinfo - return nil -} - -// SyncLocal runs Cluster.SyncLocal(). -func (rpcapi *ClusterRPCAPI) SyncLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { - pinfo, err := rpcapi.c.SyncLocal(ctx, in) - if err != nil { - return err - } - *out = *pinfo - return nil -} - // RecoverAll runs Cluster.RecoverAll(). func (rpcapi *ClusterRPCAPI) RecoverAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { pinfos, err := rpcapi.c.RecoverAll(ctx) diff --git a/rpc_policy.go b/rpc_policy.go index e784d3c8..f641930d 100644 --- a/rpc_policy.go +++ b/rpc_policy.go @@ -30,10 +30,6 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{ "Cluster.StatusAll": RPCClosed, "Cluster.StatusAllLocal": RPCClosed, "Cluster.StatusLocal": RPCClosed, - "Cluster.Sync": RPCClosed, - "Cluster.SyncAll": RPCClosed, - "Cluster.SyncAllLocal": RPCTrusted, // Called in broadcast from SyncAll() - "Cluster.SyncLocal": RPCTrusted, // Called in broadcast from Sync() "Cluster.Unpin": RPCClosed, "Cluster.UnpinPath": RPCClosed, "Cluster.Version": RPCOpen, diff --git a/rpcutil/policygen/policygen.go b/rpcutil/policygen/policygen.go index b0443efb..95d74c0e 100644 --- a/rpcutil/policygen/policygen.go +++ b/rpcutil/policygen/policygen.go @@ -27,8 +27,6 @@ var comments = map[string]string{ "Cluster.PeerAdd": "Used by Join()", "Cluster.Peers": "Used by ConnectGraph()", "Cluster.Pins": "Used in stateless tracker, ipfsproxy, restapi", - "Cluster.SyncAllLocal": "Called in broadcast from SyncAll()", - "Cluster.SyncLocal": "Called in broadcast from Sync()", "PinTracker.Recover": "Called in broadcast from Recover()", "PinTracker.RecoverAll": "Broadcast in RecoverAll unimplemented", "Pintracker.Status": "Called in broadcast from Status()", diff --git a/sharness/config/basic_auth/service.json b/sharness/config/basic_auth/service.json index 5945cbdc..09c9632e 100644 --- a/sharness/config/basic_auth/service.json +++ b/sharness/config/basic_auth/service.json @@ -5,7 +5,6 @@ "leave_on_shutdown": false, "listen_multiaddress": "/ip4/0.0.0.0/tcp/9096", "state_sync_interval": "1m0s", - "ipfs_sync_interval": "2m10s", "replication_factor": -1, "monitor_ping_interval": "15s" }, @@ -69,10 +68,6 @@ } }, "pin_tracker": { - "maptracker": { - "max_pin_queue_size": 50000, - "concurrent_pins": 10 - }, "stateless": { "max_pin_queue_size": 50000, "concurrent_pins": 10 diff --git a/sharness/config/ssl-basic_auth/service.json b/sharness/config/ssl-basic_auth/service.json index 4fb90df0..6013ec97 100644 --- a/sharness/config/ssl-basic_auth/service.json +++ b/sharness/config/ssl-basic_auth/service.json @@ -5,7 +5,6 @@ "leave_on_shutdown": false, "listen_multiaddress": "/ip4/0.0.0.0/tcp/9096", "state_sync_interval": "1m0s", - "ipfs_sync_interval": "2m10s", "replication_factor": -1, "monitor_ping_interval": "15s" }, @@ -69,10 +68,6 @@ } }, "pin_tracker": { - "maptracker": { - "max_pin_queue_size": 50000, - "concurrent_pins": 10 - }, "stateless": { "max_pin_queue_size": 50000, "concurrent_pins": 10 diff --git a/sharness/config/ssl/service.json b/sharness/config/ssl/service.json index 6dca5856..1484ceb6 100644 --- a/sharness/config/ssl/service.json +++ b/sharness/config/ssl/service.json @@ -7,7 +7,6 @@ "leave_on_shutdown": false, "listen_multiaddress": "/ip4/0.0.0.0/tcp/9096", "state_sync_interval": "1m0s", - "ipfs_sync_interval": "2m10s", "replication_factor": -1, "monitor_ping_interval": "15s" }, @@ -68,10 +67,6 @@ } }, "pin_tracker": { - "maptracker": { - "max_pin_queue_size": 50000, - "concurrent_pins": 10 - }, "stateless": { "max_pin_queue_size": 50000, "concurrent_pins": 10 diff --git a/sharness/t0010-ctl-basic-commands.sh b/sharness/t0010-ctl-basic-commands.sh index 7241b7c4..d4aeea6f 100755 --- a/sharness/t0010-ctl-basic-commands.sh +++ b/sharness/t0010-ctl-basic-commands.sh @@ -39,7 +39,6 @@ test_expect_success "cluster-ctl commands output looks good" ' egrep -q "ipfs-cluster-ctl peers" commands.txt && egrep -q "ipfs-cluster-ctl pin" commands.txt && egrep -q "ipfs-cluster-ctl status" commands.txt && - egrep -q "ipfs-cluster-ctl sync" commands.txt && egrep -q "ipfs-cluster-ctl recover" commands.txt && egrep -q "ipfs-cluster-ctl version" commands.txt && egrep -q "ipfs-cluster-ctl commands" commands.txt diff --git a/sharness/t0025-ctl-status-report-commands.sh b/sharness/t0025-ctl-status-report-commands.sh index 63edf90a..4b2b978b 100755 --- a/sharness/t0025-ctl-status-report-commands.sh +++ b/sharness/t0025-ctl-status-report-commands.sh @@ -41,10 +41,6 @@ test_expect_success IPFS,CLUSTER "invalid CID status" ' test_must_fail ipfs-cluster-ctl status XXXinvalid-CIDXXX ' -test_expect_success IPFS,CLUSTER "empty cluster-ctl sync succeeds" ' - ipfs-cluster-ctl sync -' - test_expect_success IPFS,CLUSTER "empty cluster_ctl recover should not fail" ' ipfs-cluster-ctl recover ' diff --git a/test/cids.go b/test/cids.go index 06e321a8..079d9c96 100644 --- a/test/cids.go +++ b/test/cids.go @@ -11,6 +11,7 @@ var ( Cid2, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmma") Cid3, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmb") Cid4, _ = cid.Decode("zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuc57") + Cid5, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd") Cid4Data = "Cid4Data" // Cid resulting from block put NOT ipfs add SlowCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd") CidResolved, _ = cid.Decode("zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuabc") diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index 0c563890..7aa31db0 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -285,22 +285,6 @@ func (mock *mockCluster) StatusLocal(ctx context.Context, in cid.Cid, out *api.P return (&mockPinTracker{}).Status(ctx, in, out) } -func (mock *mockCluster) SyncAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { - return mock.StatusAll(ctx, in, out) -} - -func (mock *mockCluster) SyncAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error { - return mock.StatusAllLocal(ctx, in, out) -} - -func (mock *mockCluster) Sync(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { - return mock.Status(ctx, in, out) -} - -func (mock *mockCluster) SyncLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { - return mock.StatusLocal(ctx, in, out) -} - func (mock *mockCluster) RecoverAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { return mock.StatusAll(ctx, in, out) }