diff --git a/.travis.yml b/.travis.yml index 04db3c85..9e37f22b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -30,9 +30,6 @@ jobs: - name: "Main Tests with raft consensus" script: - travis_wait go test -v -timeout 15m -failfast -consensus raft . - - name: "Main Tests with stateless tracker" - script: - - travis_wait go test -v -timeout 15m -failfast -tracker stateless . - name: "Golint and go vet" script: - go get -u golang.org/x/lint/golint diff --git a/api/rest/client/client.go b/api/rest/client/client.go index 4b121fe7..0fb221b7 100644 --- a/api/rest/client/client.go +++ b/api/rest/client/client.go @@ -86,17 +86,6 @@ type Client interface { // StatusAll gathers Status() for all tracked items. StatusAll(ctx context.Context, filter api.TrackerStatus, local bool) ([]*api.GlobalPinInfo, error) - // Sync makes sure the state of a Cid corresponds to the state reported - // by the ipfs daemon, and returns it. If local is true, this operation - // only happens on the current peer, otherwise it happens on every - // cluster peer. - Sync(ctx context.Context, ci cid.Cid, local bool) (*api.GlobalPinInfo, error) - // SyncAll triggers Sync() operations for all tracked items. It only - // returns informations for items that were de-synced or have an error - // state. If local is true, the operation is limited to the current - // peer. Otherwise it happens on every cluster peer. - SyncAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error) - // Recover retriggers pin or unpin ipfs operations for a Cid in error // state. If local is true, the operation is limited to the current // peer, otherwise it happens on every cluster peer. diff --git a/api/rest/client/lbclient.go b/api/rest/client/lbclient.go index 629cb472..031264a9 100644 --- a/api/rest/client/lbclient.go +++ b/api/rest/client/lbclient.go @@ -270,37 +270,6 @@ func (lc *loadBalancingClient) StatusAll(ctx context.Context, filter api.Tracker return pinInfos, err } -// Sync makes sure the state of a Cid corresponds to the state reported by -// the ipfs daemon, and returns it. If local is true, this operation only -// happens on the current peer, otherwise it happens on every cluster peer. -func (lc *loadBalancingClient) Sync(ctx context.Context, ci cid.Cid, local bool) (*api.GlobalPinInfo, error) { - var pinInfo *api.GlobalPinInfo - call := func(c Client) error { - var err error - pinInfo, err = c.Sync(ctx, ci, local) - return err - } - - err := lc.retry(0, call) - return pinInfo, err -} - -// SyncAll triggers Sync() operations for all tracked items. It only returns -// informations for items that were de-synced or have an error state. If -// local is true, the operation is limited to the current peer. Otherwise -// it happens on every cluster peer. -func (lc *loadBalancingClient) SyncAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error) { - var pinInfos []*api.GlobalPinInfo - call := func(c Client) error { - var err error - pinInfos, err = c.SyncAll(ctx, local) - return err - } - - err := lc.retry(0, call) - return pinInfos, err -} - // Recover retriggers pin or unpin ipfs operations for a Cid in error state. // If local is true, the operation is limited to the current peer, otherwise // it happens on every cluster peer. diff --git a/api/rest/client/methods.go b/api/rest/client/methods.go index 532aba8b..5b91ac55 100644 --- a/api/rest/client/methods.go +++ b/api/rest/client/methods.go @@ -250,38 +250,6 @@ func (c *defaultClient) StatusAll(ctx context.Context, filter api.TrackerStatus, return gpis, err } -// Sync makes sure the state of a Cid corresponds to the state reported by -// the ipfs daemon, and returns it. If local is true, this operation only -// happens on the current peer, otherwise it happens on every cluster peer. -func (c *defaultClient) Sync(ctx context.Context, ci cid.Cid, local bool) (*api.GlobalPinInfo, error) { - ctx, span := trace.StartSpan(ctx, "client/Sync") - defer span.End() - - var gpi api.GlobalPinInfo - err := c.do( - ctx, - "POST", - fmt.Sprintf("/pins/%s/sync?local=%t", ci.String(), local), - nil, - nil, - &gpi, - ) - return &gpi, err -} - -// SyncAll triggers Sync() operations for all tracked items. It only returns -// informations for items that were de-synced or have an error state. If -// local is true, the operation is limited to the current peer. Otherwise -// it happens on every cluster peer. -func (c *defaultClient) SyncAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error) { - ctx, span := trace.StartSpan(ctx, "client/SyncAll") - defer span.End() - - var gpis []*api.GlobalPinInfo - err := c.do(ctx, "POST", fmt.Sprintf("/pins/sync?local=%t", local), nil, nil, &gpis) - return gpis, err -} - // Recover retriggers pin or unpin ipfs operations for a Cid in error state. // If local is true, the operation is limited to the current peer, otherwise // it happens on every cluster peer. diff --git a/api/rest/client/methods_test.go b/api/rest/client/methods_test.go index 561f24e8..d2b39e87 100644 --- a/api/rest/client/methods_test.go +++ b/api/rest/client/methods_test.go @@ -376,43 +376,6 @@ func TestStatusAll(t *testing.T) { testClients(t, api, testF) } -func TestSync(t *testing.T) { - ctx := context.Background() - api := testAPI(t) - defer shutdown(api) - - testF := func(t *testing.T, c Client) { - pin, err := c.Sync(ctx, test.Cid1, false) - if err != nil { - t.Fatal(err) - } - if !pin.Cid.Equals(test.Cid1) { - t.Error("should be same pin") - } - } - - testClients(t, api, testF) -} - -func TestSyncAll(t *testing.T) { - ctx := context.Background() - api := testAPI(t) - defer shutdown(api) - - testF := func(t *testing.T, c Client) { - pins, err := c.SyncAll(ctx, false) - if err != nil { - t.Fatal(err) - } - - if len(pins) == 0 { - t.Error("there should be some pins") - } - } - - testClients(t, api, testF) -} - func TestRecover(t *testing.T) { ctx := context.Background() api := testAPI(t) diff --git a/api/rest/restapi.go b/api/rest/restapi.go index 62beafc9..52e95aba 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -409,18 +409,6 @@ func (api *API) routes() []route { "/pins", api.statusAllHandler, }, - { - "Sync", - "POST", - "/pins/{hash}/sync", - api.syncHandler, - }, - { - "SyncAll", - "POST", - "/pins/sync", - api.syncAllHandler, - }, { "Recover", "POST", @@ -978,66 +966,6 @@ func (api *API) statusHandler(w http.ResponseWriter, r *http.Request) { } } -func (api *API) syncAllHandler(w http.ResponseWriter, r *http.Request) { - queryValues := r.URL.Query() - local := queryValues.Get("local") - - if local == "true" { - var pinInfos []*types.PinInfo - err := api.rpcClient.CallContext( - r.Context(), - "", - "Cluster", - "SyncAllLocal", - struct{}{}, - &pinInfos, - ) - api.sendResponse(w, autoStatus, err, pinInfosToGlobal(pinInfos)) - } else { - var pinInfos []*types.GlobalPinInfo - err := api.rpcClient.CallContext( - r.Context(), - "", - "Cluster", - "SyncAll", - struct{}{}, - &pinInfos, - ) - api.sendResponse(w, autoStatus, err, pinInfos) - } -} - -func (api *API) syncHandler(w http.ResponseWriter, r *http.Request) { - queryValues := r.URL.Query() - local := queryValues.Get("local") - - if pin := api.parseCidOrError(w, r); pin != nil { - if local == "true" { - var pinInfo types.PinInfo - err := api.rpcClient.CallContext( - r.Context(), - "", - "Cluster", - "SyncLocal", - pin.Cid, - &pinInfo, - ) - api.sendResponse(w, autoStatus, err, pinInfoToGlobal(&pinInfo)) - } else { - var pinInfo types.GlobalPinInfo - err := api.rpcClient.CallContext( - r.Context(), - "", - "Cluster", - "Sync", - pin.Cid, - &pinInfo, - ) - api.sendResponse(w, autoStatus, err, pinInfo) - } - } -} - func (api *API) recoverAllHandler(w http.ResponseWriter, r *http.Request) { queryValues := r.URL.Query() local := queryValues.Get("local") diff --git a/api/rest/restapi_test.go b/api/rest/restapi_test.go index 39a27c58..496da90a 100644 --- a/api/rest/restapi_test.go +++ b/api/rest/restapi_test.go @@ -951,72 +951,6 @@ func TestAPIStatusEndpoint(t *testing.T) { testBothEndpoints(t, tf) } -func TestAPISyncAllEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url urlF) { - var resp []*api.GlobalPinInfo - makePost(t, rest, url(rest)+"/pins/sync", []byte{}, &resp) - - if len(resp) != 3 || - !resp[0].Cid.Equals(test.Cid1) || - resp[1].PeerMap[peer.IDB58Encode(test.PeerID1)].Status.String() != "pinning" { - t.Errorf("unexpected syncAll resp:\n %+v", resp) - } - - // Test local=true - var resp2 []*api.GlobalPinInfo - makePost(t, rest, url(rest)+"/pins/sync?local=true", []byte{}, &resp2) - - if len(resp2) != 2 { - t.Errorf("unexpected syncAll+local resp:\n %+v", resp2) - } - } - - testBothEndpoints(t, tf) -} - -func TestAPISyncEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url urlF) { - var resp api.GlobalPinInfo - makePost(t, rest, url(rest)+"/pins/"+test.Cid1.String()+"/sync", []byte{}, &resp) - - if !resp.Cid.Equals(test.Cid1) { - t.Error("expected the same cid") - } - info, ok := resp.PeerMap[peer.IDB58Encode(test.PeerID1)] - if !ok { - t.Fatal("expected info for test.PeerID1") - } - if info.Status.String() != "pinned" { - t.Error("expected different status") - } - - // Test local=true - var resp2 api.GlobalPinInfo - makePost(t, rest, url(rest)+"/pins/"+test.Cid1.String()+"/sync?local=true", []byte{}, &resp2) - - if !resp2.Cid.Equals(test.Cid1) { - t.Error("expected the same cid") - } - info, ok = resp2.PeerMap[peer.IDB58Encode(test.PeerID2)] - if !ok { - t.Fatal("expected info for test.PeerID2") - } - if info.Status.String() != "pinned" { - t.Error("expected different status") - } - } - - testBothEndpoints(t, tf) -} - func TestAPIRecoverEndpoint(t *testing.T) { ctx := context.Background() rest := testAPI(t) diff --git a/cluster.go b/cluster.go index e76ada25..5b02c475 100644 --- a/cluster.go +++ b/cluster.go @@ -247,13 +247,12 @@ func (c *Cluster) setupRPCClients() { } } -// syncWatcher loops and triggers StateSync and SyncAllLocal from time to time -func (c *Cluster) syncWatcher() { - ctx, span := trace.StartSpan(c.ctx, "cluster/syncWatcher") +// watchPinset triggers recurrent operations that loop on the pinset. +func (c *Cluster) watchPinset() { + ctx, span := trace.StartSpan(c.ctx, "cluster/watchPinset") defer span.End() stateSyncTicker := time.NewTicker(c.config.StateSyncInterval) - syncTicker := time.NewTicker(c.config.IPFSSyncInterval) recoverTicker := time.NewTicker(c.config.PinRecoverInterval) for { @@ -261,14 +260,12 @@ func (c *Cluster) syncWatcher() { case <-stateSyncTicker.C: logger.Debug("auto-triggering StateSync()") c.StateSync(ctx) - case <-syncTicker.C: - logger.Debug("auto-triggering SyncAllLocal()") - c.SyncAllLocal(ctx) case <-recoverTicker.C: logger.Debug("auto-triggering RecoverAllLocal()") c.RecoverAllLocal(ctx) case <-c.ctx.Done(): stateSyncTicker.Stop() + recoverTicker.Stop() return } } @@ -553,7 +550,7 @@ func (c *Cluster) run() { c.wg.Add(1) go func() { defer c.wg.Done() - c.syncWatcher() + c.watchPinset() }() c.wg.Add(1) @@ -975,113 +972,59 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error { return nil } -// StateSync syncs the consensus state to the Pin Tracker, ensuring -// that every Cid in the shared state is tracked and that the Pin Tracker -// is not tracking more Cids than it should. +// StateSync performs maintenance tasks on the global state that require +// looping through all the items. It is triggered automatically on +// StateSyncInterval. Currently it: +// * Sends unpin for expired items for which this peer is "closest" +// (skipped for follower peers) func (c *Cluster) StateSync(ctx context.Context) error { _, span := trace.StartSpan(ctx, "cluster/StateSync") defer span.End() ctx = trace.NewContext(c.ctx, span) + if c.config.FollowerMode { + return nil + } + cState, err := c.consensus.State(ctx) if err != nil { return err } - logger.Debug("syncing state to tracker") timeNow := time.Now() clusterPins, err := cState.List(ctx) if err != nil { return err } - trackedPins := c.tracker.StatusAll(ctx) - trackedPinsMap := make(map[string]struct{}) - for _, tpin := range trackedPins { - trackedPinsMap[tpin.Cid.String()] = struct{}{} + // Only trigger pin operations if we are the closest with respect to + // other trusted peers. We cannot know if our peer ID is trusted by + // other peers in the Cluster. This assumes yes. Setting FollowerMode + // is a way to assume the opposite and skip this completely. + trustedPeers, err := c.getTrustedPeers(ctx) + if err != nil { + return nil } - // Track items which are not tracked - for _, pin := range clusterPins { - _, tracked := trackedPinsMap[pin.Cid.String()] - if !tracked { - logger.Debugf("StateSync: tracking %s, part of the shared state", pin.Cid) - err = c.tracker.Track(ctx, pin) - if err != nil { - return err - } - } + checker := distanceChecker{ + local: c.id, + otherPeers: trustedPeers, + cache: make(map[peer.ID]distance, len(trustedPeers)+1), } - isClosest := func(cid.Cid) bool { - return false - } - - if !c.config.FollowerMode { - trustedPeers, err := c.getTrustedPeers(ctx) - if err != nil { - return nil - } - checker := distanceChecker{ - local: c.id, - otherPeers: trustedPeers, - cache: make(map[peer.ID]distance, len(trustedPeers)+1), - } - isClosest = func(c cid.Cid) bool { - return checker.isClosest(c) - } - } - - // a. Untrack items which should not be tracked - // b. Unpin items which have expired - // c. Track items which should not be remote as local - // d. Track items which should not be local as remote - for _, p := range trackedPins { - pCid := p.Cid - currentPin, err := cState.Get(ctx, pCid) - if err != nil && err != state.ErrNotFound { - return err - } - - if err == state.ErrNotFound { - logger.Debugf("StateSync: untracking %s: not part of shared state", pCid) - c.tracker.Untrack(ctx, pCid) - continue - } - - if currentPin.ExpiredAt(timeNow) && isClosest(pCid) { - logger.Infof("Unpinning %s: pin expired at %s", pCid, currentPin.ExpireAt) - if _, err := c.Unpin(ctx, pCid); err != nil { + // Unpin expired items when we are the closest peer to them. + for _, p := range clusterPins { + if p.ExpiredAt(timeNow) && checker.isClosest(p.Cid) { + logger.Infof("Unpinning %s: pin expired at %s", p.Cid, p.ExpireAt) + if _, err := c.Unpin(ctx, p.Cid); err != nil { logger.Error(err) } - continue - } - - err = c.updateRemotePins(ctx, currentPin, p) - if err != nil { - return err } } return nil } -func (c *Cluster) updateRemotePins(ctx context.Context, pin *api.Pin, p *api.PinInfo) error { - var err error - allocatedHere := pin.ReplicationFactorMin == -1 || containsPeer(pin.Allocations, c.id) - - switch { - case p.Status == api.TrackerStatusRemote && allocatedHere: - logger.Debugf("StateSync: Tracking %s locally (currently remote)", p.Cid) - err = c.tracker.Track(ctx, pin) - case p.Status == api.TrackerStatusPinned && !allocatedHere: - logger.Debugf("StateSync: Tracking %s as remote (currently local)", p.Cid) - err = c.tracker.Track(ctx, pin) - } - - return err -} - // StatusAll returns the GlobalPinInfo for all tracked Cids in all peers. // If an error happens, the slice will contain as much information as // could be fetched from other peers. @@ -1122,48 +1065,6 @@ func (c *Cluster) StatusLocal(ctx context.Context, h cid.Cid) *api.PinInfo { return c.tracker.Status(ctx, h) } -// SyncAll triggers SyncAllLocal() operations in all cluster peers, making sure -// that the state of tracked items matches the state reported by the IPFS daemon -// and returning the results as GlobalPinInfo. If an error happens, the slice -// will contain as much information as could be fetched from the peers. -func (c *Cluster) SyncAll(ctx context.Context) ([]*api.GlobalPinInfo, error) { - _, span := trace.StartSpan(ctx, "cluster/SyncAll") - defer span.End() - ctx = trace.NewContext(c.ctx, span) - - return c.globalPinInfoSlice(ctx, "Cluster", "SyncAllLocal") -} - -// SyncAllLocal makes sure that the current state for all tracked items -// in this peer matches the state reported by the IPFS daemon. -// -// SyncAllLocal returns the list of PinInfo that where updated because of -// the operation, along with those in error states. -func (c *Cluster) SyncAllLocal(ctx context.Context) ([]*api.PinInfo, error) { - _, span := trace.StartSpan(ctx, "cluster/SyncAllLocal") - defer span.End() - ctx = trace.NewContext(c.ctx, span) - - syncedItems, err := c.tracker.SyncAll(ctx) - // Despite errors, tracker provides synced items that we can provide. - // They encapsulate the error. - if err != nil { - logger.Error("tracker.Sync() returned with error: ", err) - logger.Error("Is the ipfs daemon running?") - } - return syncedItems, err -} - -// Sync triggers a SyncLocal() operation for a given Cid. -// in all cluster peers. -func (c *Cluster) Sync(ctx context.Context, h cid.Cid) (*api.GlobalPinInfo, error) { - _, span := trace.StartSpan(ctx, "cluster/Sync") - defer span.End() - ctx = trace.NewContext(c.ctx, span) - - return c.globalPinInfoCid(ctx, "Cluster", "SyncLocal", h) -} - // used for RecoverLocal and SyncLocal. func (c *Cluster) localPinInfoOp( ctx context.Context, @@ -1191,17 +1092,6 @@ func (c *Cluster) localPinInfoOp( } -// SyncLocal performs a local sync operation for the given Cid. This will -// tell the tracker to verify the status of the Cid against the IPFS daemon. -// It returns the updated PinInfo for the Cid. -func (c *Cluster) SyncLocal(ctx context.Context, h cid.Cid) (pInfo *api.PinInfo, err error) { - _, span := trace.StartSpan(ctx, "cluster/SyncLocal") - defer span.End() - ctx = trace.NewContext(c.ctx, span) - - return c.localPinInfoOp(ctx, h, c.tracker.Sync) -} - // RecoverAll triggers a RecoverAllLocal operation on all peer. func (c *Cluster) RecoverAll(ctx context.Context) ([]*api.GlobalPinInfo, error) { _, span := trace.StartSpan(ctx, "cluster/RecoverAll") diff --git a/cluster_config.go b/cluster_config.go index 53b63b34..c1057663 100644 --- a/cluster_config.go +++ b/cluster_config.go @@ -29,7 +29,6 @@ var DefaultListenAddrs = []string{"/ip4/0.0.0.0/tcp/9096", "/ip4/0.0.0.0/udp/909 const ( DefaultEnableRelayHop = true DefaultStateSyncInterval = 600 * time.Second - DefaultIPFSSyncInterval = 130 * time.Second DefaultPinRecoverInterval = 1 * time.Hour DefaultMonitorPingInterval = 15 * time.Second DefaultPeerWatchInterval = 5 * time.Second @@ -94,14 +93,6 @@ type Config struct { // consistency, increase with larger states. StateSyncInterval time.Duration - // Time between syncs of the local state and - // the state of the ipfs daemon. This ensures that cluster - // provides the right status for tracked items (for example - // to detect that a pin has been removed. Reduce for faster - // consistency, increase when the number of pinned items is very - // large. - IPFSSyncInterval time.Duration - // Time between automatic runs of the "recover" operation // which will retry to pin/unpin items in error state. PinRecoverInterval time.Duration @@ -177,7 +168,6 @@ type configJSON struct { EnableRelayHop bool `json:"enable_relay_hop"` ConnectionManager *connMgrConfigJSON `json:"connection_manager"` StateSyncInterval string `json:"state_sync_interval"` - IPFSSyncInterval string `json:"ipfs_sync_interval"` PinRecoverInterval string `json:"pin_recover_interval"` ReplicationFactorMin int `json:"replication_factor_min"` ReplicationFactorMax int `json:"replication_factor_max"` @@ -267,10 +257,6 @@ func (cfg *Config) Validate() error { return errors.New("cluster.state_sync_interval is invalid") } - if cfg.IPFSSyncInterval <= 0 { - return errors.New("cluster.ipfs_sync_interval is invalid") - } - if cfg.PinRecoverInterval <= 0 { return errors.New("cluster.pin_recover_interval is invalid") } @@ -367,7 +353,6 @@ func (cfg *Config) setDefaults() { } cfg.LeaveOnShutdown = DefaultLeaveOnShutdown cfg.StateSyncInterval = DefaultStateSyncInterval - cfg.IPFSSyncInterval = DefaultIPFSSyncInterval cfg.PinRecoverInterval = DefaultPinRecoverInterval cfg.ReplicationFactorMin = DefaultReplicationFactor cfg.ReplicationFactorMax = DefaultReplicationFactor @@ -441,7 +426,6 @@ func (cfg *Config) applyConfigJSON(jcfg *configJSON) error { err = config.ParseDurations("cluster", &config.DurationOpt{Duration: jcfg.StateSyncInterval, Dst: &cfg.StateSyncInterval, Name: "state_sync_interval"}, - &config.DurationOpt{Duration: jcfg.IPFSSyncInterval, Dst: &cfg.IPFSSyncInterval, Name: "ipfs_sync_interval"}, &config.DurationOpt{Duration: jcfg.PinRecoverInterval, Dst: &cfg.PinRecoverInterval, Name: "pin_recover_interval"}, &config.DurationOpt{Duration: jcfg.MonitorPingInterval, Dst: &cfg.MonitorPingInterval, Name: "monitor_ping_interval"}, &config.DurationOpt{Duration: jcfg.PeerWatchInterval, Dst: &cfg.PeerWatchInterval, Name: "peer_watch_interval"}, @@ -507,7 +491,6 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) { GracePeriod: cfg.ConnMgr.GracePeriod.String(), } jcfg.StateSyncInterval = cfg.StateSyncInterval.String() - jcfg.IPFSSyncInterval = cfg.IPFSSyncInterval.String() jcfg.PinRecoverInterval = cfg.PinRecoverInterval.String() jcfg.MonitorPingInterval = cfg.MonitorPingInterval.String() jcfg.PeerWatchInterval = cfg.PeerWatchInterval.String() diff --git a/cluster_config_test.go b/cluster_config_test.go index e30798f9..5e03576d 100644 --- a/cluster_config_test.go +++ b/cluster_config_test.go @@ -24,7 +24,6 @@ var ccfgTestJSON = []byte(` "/ip4/127.0.0.1/udp/10000/quic" ], "state_sync_interval": "1m0s", - "ipfs_sync_interval": "2m10s", "pin_recover_interval": "1m", "replication_factor_min": 5, "replication_factor_max": 5, diff --git a/cluster_test.go b/cluster_test.go index 7fdafd2a..16c6ee91 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -17,6 +17,7 @@ import ( "github.com/ipfs/ipfs-cluster/config" "github.com/ipfs/ipfs-cluster/informer/numpin" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" + "github.com/ipfs/ipfs-cluster/pintracker/stateless" "github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/test" "github.com/ipfs/ipfs-cluster/version" @@ -148,7 +149,7 @@ type mockTracer struct { } func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracker) { - ident, clusterCfg, _, _, _, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs() + ident, clusterCfg, _, _, _, badgerCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs() ctx := context.Background() host, pubsub, dht := createHost(t, ident.PrivateKey, clusterCfg.Secret, clusterCfg.ListenAddr) @@ -162,11 +163,12 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke api := &mockAPI{} proxy := &mockProxy{} ipfs := &mockConnector{} - tracker := makePinTracker(t, ident.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername) + tracer := &mockTracer{} store := makeStore(t, badgerCfg) cons := makeConsensus(t, store, host, pubsub, dht, raftCfg, false, crdtCfg) + tracker := stateless.New(statelesstrackerCfg, ident.ID, clusterCfg.Peername, cons.State) var peersF func(context.Context) ([]peer.ID, error) if consensus == "raft" { diff --git a/cmd/ipfs-cluster-ctl/main.go b/cmd/ipfs-cluster-ctl/main.go index 091a00e9..b9948bc4 100644 --- a/cmd/ipfs-cluster-ctl/main.go +++ b/cmd/ipfs-cluster-ctl/main.go @@ -779,9 +779,6 @@ Cluster, including which member is pinning them and any errors. If a CID is provided, the status will be only fetched for a single item. Metadata CIDs are included in the status response -The status of a CID may not be accurate. A manual sync can be triggered -with "sync". - When the --local flag is passed, it will only fetch the status from the contacted cluster peer. By default, status will be fetched from all peers. @@ -817,42 +814,6 @@ separated list). The following are valid status values: return nil }, }, - { - Name: "sync", - Usage: "Sync status of tracked items", - Description: ` -This command asks Cluster peers to verify that the current status of tracked -CIDs is accurate by triggering queries to the IPFS daemons that pin them. -If a CID is provided, the sync and recover operations will be limited to -that single item. - -Unless providing a specific CID, the command will output only items which -have changed status because of the sync or are in error state in some node, -therefore, the output should be empty if no operations were performed. - -CIDs in error state may be manually recovered with "recover". - -When the --local flag is passed, it will only trigger sync -operations on the contacted peer. By default, all peers will sync. -`, - ArgsUsage: "[CID]", - Flags: []cli.Flag{ - localFlag(), - }, - Action: func(c *cli.Context) error { - cidStr := c.Args().First() - if cidStr != "" { - ci, err := cid.Decode(cidStr) - checkErr("parsing cid", err) - resp, cerr := globalClient.Sync(ctx, ci, c.Bool("local")) - formatResponse(c, resp, cerr) - } else { - resp, cerr := globalClient.SyncAll(ctx, c.Bool("local")) - formatResponse(c, resp, cerr) - } - return nil - }, - }, { Name: "recover", Usage: "Recover tracked items in error state", diff --git a/cmd/ipfs-cluster-service/daemon.go b/cmd/ipfs-cluster-service/daemon.go index fbedbbbe..df343951 100644 --- a/cmd/ipfs-cluster-service/daemon.go +++ b/cmd/ipfs-cluster-service/daemon.go @@ -17,7 +17,6 @@ import ( "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/observations" - "github.com/ipfs/ipfs-cluster/pintracker/maptracker" "github.com/ipfs/ipfs-cluster/pintracker/stateless" "go.opencensus.io/tag" @@ -150,14 +149,6 @@ func createCluster( connector, err := ipfshttp.NewConnector(cfgs.Ipfshttp) checkErr("creating IPFS Connector component", err) - tracker := setupPinTracker( - c.String("pintracker"), - host, - cfgs.Maptracker, - cfgs.Statelesstracker, - cfgs.Cluster.Peername, - ) - informer, err := disk.NewInformer(cfgs.Diskinf) checkErr("creating disk informer", err) alloc := descendalloc.NewAllocator() @@ -190,6 +181,9 @@ func createCluster( peersF = cons.Peers } + tracker := stateless.New(cfgs.Statelesstracker, host.ID(), cfgs.Cluster.Peername, cons.State) + logger.Debug("stateless pintracker loaded") + mon, err := pubsubmon.New(ctx, cfgs.Pubsubmon, pubsub, peersF) if err != nil { store.Close() @@ -225,29 +219,6 @@ func bootstrap(ctx context.Context, cluster *ipfscluster.Cluster, bootstraps []m } } -func setupPinTracker( - name string, - h host.Host, - mapCfg *maptracker.Config, - statelessCfg *stateless.Config, - peerName string, -) ipfscluster.PinTracker { - switch name { - case "map": - ptrk := maptracker.NewMapPinTracker(mapCfg, h.ID(), peerName) - logger.Debug("map pintracker loaded") - return ptrk - case "stateless": - ptrk := stateless.New(statelessCfg, h.ID(), peerName) - logger.Debug("stateless pintracker loaded") - return ptrk - default: - err := errors.New("unknown pintracker type") - checkErr("", err) - return nil - } -} - func setupDatastore(cfgHelper *cmdutils.ConfigHelper) ds.Datastore { stmgr, err := cmdutils.NewStateManager(cfgHelper.GetConsensus(), cfgHelper.Identity(), cfgHelper.Configs()) checkErr("creating state manager", err) diff --git a/cmd/ipfs-cluster-service/main.go b/cmd/ipfs-cluster-service/main.go index 68c9cbad..8dd3ee2e 100644 --- a/cmd/ipfs-cluster-service/main.go +++ b/cmd/ipfs-cluster-service/main.go @@ -29,9 +29,8 @@ const programName = "ipfs-cluster-service" // flag defaults const ( - defaultPinTracker = "map" - defaultLogLevel = "info" - defaultConsensus = "crdt" + defaultLogLevel = "info" + defaultConsensus = "crdt" ) const ( @@ -413,12 +412,6 @@ the peer IDs in the given multiaddresses. Usage: "remove peer from cluster on exit. Overrides \"leave_on_shutdown\"", Hidden: true, }, - cli.StringFlag{ - Name: "pintracker", - Value: defaultPinTracker, - Hidden: true, - Usage: "pintracker to use [map,stateless].", - }, cli.BoolFlag{ Name: "stats", Usage: "enable stats collection", diff --git a/cmdutils/configs.go b/cmdutils/configs.go index a6483ed7..b802b8c1 100644 --- a/cmdutils/configs.go +++ b/cmdutils/configs.go @@ -19,7 +19,6 @@ import ( "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/observations" - "github.com/ipfs/ipfs-cluster/pintracker/maptracker" "github.com/ipfs/ipfs-cluster/pintracker/stateless" ) @@ -31,7 +30,6 @@ type Configs struct { Ipfshttp *ipfshttp.Config Raft *raft.Config Crdt *crdt.Config - Maptracker *maptracker.Config Statelesstracker *stateless.Config Pubsubmon *pubsubmon.Config Diskinf *disk.Config @@ -183,7 +181,6 @@ func (ch *ConfigHelper) init() { Ipfshttp: &ipfshttp.Config{}, Raft: &raft.Config{}, Crdt: &crdt.Config{}, - Maptracker: &maptracker.Config{}, Statelesstracker: &stateless.Config{}, Pubsubmon: &pubsubmon.Config{}, Diskinf: &disk.Config{}, @@ -195,7 +192,6 @@ func (ch *ConfigHelper) init() { man.RegisterComponent(config.API, cfgs.Restapi) man.RegisterComponent(config.API, cfgs.Ipfsproxy) man.RegisterComponent(config.IPFSConn, cfgs.Ipfshttp) - man.RegisterComponent(config.PinTracker, cfgs.Maptracker) man.RegisterComponent(config.PinTracker, cfgs.Statelesstracker) man.RegisterComponent(config.Monitor, cfgs.Pubsubmon) man.RegisterComponent(config.Informer, cfgs.Diskinf) diff --git a/config_test.go b/config_test.go index c6746b7d..baf3d5c3 100644 --- a/config_test.go +++ b/config_test.go @@ -11,7 +11,6 @@ import ( "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/observations" - "github.com/ipfs/ipfs-cluster/pintracker/maptracker" "github.com/ipfs/ipfs-cluster/pintracker/stateless" ) @@ -32,7 +31,6 @@ var testingClusterCfg = []byte(`{ "grace_period": "2m0s" }, "state_sync_interval": "1m0s", - "ipfs_sync_interval": "2m10s", "replication_factor": -1, "monitor_ping_interval": "1s", "peer_watch_interval": "1s", @@ -130,8 +128,8 @@ var testingTracerCfg = []byte(`{ "service_name": "cluster-daemon" }`) -func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *maptracker.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) { - identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg := testingEmptyConfigs() +func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) { + identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg := testingEmptyConfigs() identity.LoadJSON(testingIdentity) clusterCfg.LoadJSON(testingClusterCfg) apiCfg.LoadJSON(testingAPICfg) @@ -140,16 +138,15 @@ func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Confi badgerCfg.LoadJSON(testingBadgerCfg) raftCfg.LoadJSON(testingRaftCfg) crdtCfg.LoadJSON(testingCrdtCfg) - maptrackerCfg.LoadJSON(testingTrackerCfg) statelesstrkrCfg.LoadJSON(testingTrackerCfg) pubsubmonCfg.LoadJSON(testingMonCfg) diskInfCfg.LoadJSON(testingDiskInfCfg) tracingCfg.LoadJSON(testingTracerCfg) - return identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg + return identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg } -func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *maptracker.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) { +func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) { identity := &config.Identity{} clusterCfg := &Config{} apiCfg := &rest.Config{} @@ -158,12 +155,11 @@ func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy. badgerCfg := &badger.Config{} raftCfg := &raft.Config{} crdtCfg := &crdt.Config{} - maptrackerCfg := &maptracker.Config{} statelessCfg := &stateless.Config{} pubsubmonCfg := &pubsubmon.Config{} diskInfCfg := &disk.Config{} tracingCfg := &observations.TracingConfig{} - return identity, clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelessCfg, pubsubmonCfg, diskInfCfg, tracingCfg + return identity, clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, badgerCfg, raftCfg, crdtCfg, statelessCfg, pubsubmonCfg, diskInfCfg, tracingCfg } // func TestConfigDefault(t *testing.T) { diff --git a/consensus/crdt/consensus.go b/consensus/crdt/consensus.go index f12b0a91..498d48b7 100644 --- a/consensus/crdt/consensus.go +++ b/consensus/crdt/consensus.go @@ -475,7 +475,7 @@ func (css *Consensus) Leader(ctx context.Context) (peer.ID, error) { func OfflineState(cfg *Config, store ds.Datastore) (state.BatchingState, error) { batching, ok := store.(ds.Batching) if !ok { - return nil, errors.New("must provide a Bathing datastore") + return nil, errors.New("must provide a Batching datastore") } opts := crdt.DefaultOptions() opts.Logger = logger diff --git a/ipfscluster.go b/ipfscluster.go index fa01b3fc..a779c945 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -123,12 +123,6 @@ type PinTracker interface { StatusAll(context.Context) []*api.PinInfo // Status returns the local status of a given Cid. Status(context.Context, cid.Cid) *api.PinInfo - // SyncAll makes sure that all tracked Cids reflect the real IPFS status. - // It returns the list of pins which were updated by the call. - SyncAll(context.Context) ([]*api.PinInfo, error) - // Sync makes sure that the Cid status reflect the real IPFS status. - // It returns the local status of the Cid. - Sync(context.Context, cid.Cid) (*api.PinInfo, error) // RecoverAll calls Recover() for all pins tracked. RecoverAll(context.Context) ([]*api.PinInfo, error) // Recover retriggers a Pin/Unpin operation in a Cids with error status. diff --git a/ipfscluster_test.go b/ipfscluster_test.go index f8cc7947..79d3208c 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -28,7 +28,6 @@ import ( "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/observations" - "github.com/ipfs/ipfs-cluster/pintracker/maptracker" "github.com/ipfs/ipfs-cluster/pintracker/stateless" "github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/test" @@ -174,7 +173,7 @@ func createComponents( peername := fmt.Sprintf("peer_%d", i) - ident, clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrackerCfg, psmonCfg, diskInfCfg, tracingCfg := testingConfigs() + ident, clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, badgerCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, diskInfCfg, tracingCfg := testingConfigs() ident.ID = host.ID() ident.PrivateKey = host.Peerstore().PrivKey(host.ID()) @@ -208,8 +207,6 @@ func createComponents( t.Fatal(err) } - tracker := makePinTracker(t, ident.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername) - alloc := descendalloc.NewAllocator() inf, err := disk.NewInformer(diskInfCfg) if err != nil { @@ -218,6 +215,7 @@ func createComponents( store := makeStore(t, badgerCfg) cons := makeConsensus(t, store, host, pubsub, dht, raftCfg, staging, crdtCfg) + tracker := stateless.New(statelesstrackerCfg, ident.ID, clusterCfg.Peername, cons.State) var peersF func(context.Context) ([]peer.ID, error) if consensus == "raft" { @@ -268,19 +266,6 @@ func makeConsensus(t *testing.T, store ds.Datastore, h host.Host, psub *pubsub.P } } -func makePinTracker(t *testing.T, pid peer.ID, mptCfg *maptracker.Config, sptCfg *stateless.Config, peerName string) PinTracker { - var ptrkr PinTracker - switch ptracker { - case "map": - ptrkr = maptracker.NewMapPinTracker(mptCfg, pid, peerName) - case "stateless": - ptrkr = stateless.New(sptCfg, pid, peerName) - default: - panic("bad pintracker") - } - return ptrkr -} - func createCluster(t *testing.T, host host.Host, dht *dht.IpfsDHT, clusterCfg *Config, store ds.Datastore, consensus Consensus, apis []API, ipfs IPFSConnector, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer, tracer Tracer) *Cluster { cl, err := NewCluster(context.Background(), host, dht, clusterCfg, store, consensus, apis, ipfs, tracker, mon, alloc, []Informer{inf}, tracer) if err != nil { @@ -303,15 +288,14 @@ func createHosts(t *testing.T, clusterSecret []byte, nClusters int) ([]host.Host dhts := make([]*dht.IpfsDHT, nClusters, nClusters) tcpaddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") - // Disable quic as it is proving a bit unstable - //quicAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic") + quicAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic") for i := range hosts { priv, _, err := crypto.GenerateKeyPair(crypto.RSA, 2048) if err != nil { t.Fatal(err) } - h, p, d := createHost(t, priv, clusterSecret, []ma.Multiaddr{tcpaddr}) + h, p, d := createHost(t, priv, clusterSecret, []ma.Multiaddr{quicAddr, tcpaddr}) hosts[i] = h dhts[i] = d pubsubs[i] = p @@ -915,162 +899,6 @@ func TestClustersStatusAllWithErrors(t *testing.T) { runF(t, clusters, f) } -func TestClustersSyncAllLocal(t *testing.T) { - ctx := context.Background() - clusters, mock := createClusters(t) - defer shutdownClusters(t, clusters, mock) - clusters[0].Pin(ctx, test.ErrorCid, api.PinOptions{}) // This cid always fails - clusters[0].Pin(ctx, test.Cid2, api.PinOptions{}) - pinDelay() - pinDelay() - - f := func(t *testing.T, c *Cluster) { - // Sync bad ID - infos, err := c.SyncAllLocal(ctx) - if err != nil { - // LocalSync() is asynchronous and should not show an - // error even if Recover() fails. - t.Error(err) - } - if len(infos) != 1 { - t.Fatalf("expected 1 elem slice, got = %d", len(infos)) - } - // Last-known state may still be pinning - if infos[0].Status != api.TrackerStatusPinError && infos[0].Status != api.TrackerStatusPinning { - t.Errorf("element should be in Pinning or PinError state, got = %v", infos[0].Status) - } - } - // Test Local syncs - runF(t, clusters, f) -} - -func TestClustersSyncLocal(t *testing.T) { - ctx := context.Background() - clusters, mock := createClusters(t) - defer shutdownClusters(t, clusters, mock) - h := test.ErrorCid - h2 := test.Cid2 - clusters[0].Pin(ctx, h, api.PinOptions{}) - clusters[0].Pin(ctx, h2, api.PinOptions{}) - pinDelay() - pinDelay() - - f := func(t *testing.T, c *Cluster) { - info, err := c.SyncLocal(ctx, h) - if err != nil { - t.Error(err) - } - if info.Status != api.TrackerStatusPinError && info.Status != api.TrackerStatusPinning { - t.Errorf("element is %s and not PinError", info.Status) - } - - // Sync good ID - info, err = c.SyncLocal(ctx, h2) - if err != nil { - t.Error(err) - } - if info.Status != api.TrackerStatusPinned { - t.Error("element should be in Pinned state") - } - } - // Test Local syncs - runF(t, clusters, f) -} - -func TestClustersSyncAll(t *testing.T) { - ctx := context.Background() - clusters, mock := createClusters(t) - defer shutdownClusters(t, clusters, mock) - clusters[0].Pin(ctx, test.ErrorCid, api.PinOptions{}) - clusters[0].Pin(ctx, test.Cid2, api.PinOptions{}) - pinDelay() - pinDelay() - - j := rand.Intn(nClusters) // choose a random cluster peer - ginfos, err := clusters[j].SyncAll(ctx) - if err != nil { - t.Fatal(err) - } - if len(ginfos) != 1 { - t.Fatalf("expected globalsync to have 1 elements, got = %d", len(ginfos)) - } - if !ginfos[0].Cid.Equals(test.ErrorCid) { - t.Error("expected globalsync to have problems with test.ErrorCid") - } - for _, c := range clusters { - inf, ok := ginfos[0].PeerMap[peer.IDB58Encode(c.host.ID())] - if !ok { - t.Fatal("GlobalPinInfo should have this cluster") - } - if inf.Status != api.TrackerStatusPinError && inf.Status != api.TrackerStatusPinning { - t.Error("should be PinError in all peers") - } - } -} - -func TestClustersSync(t *testing.T) { - ctx := context.Background() - clusters, mock := createClusters(t) - defer shutdownClusters(t, clusters, mock) - h := test.ErrorCid // This cid always fails - h2 := test.Cid2 - clusters[0].Pin(ctx, h, api.PinOptions{}) - clusters[0].Pin(ctx, h2, api.PinOptions{}) - pinDelay() - pinDelay() - - j := rand.Intn(nClusters) - ginfo, err := clusters[j].Sync(ctx, h) - if err != nil { - // we always attempt to return a valid response - // with errors contained in GlobalPinInfo - t.Fatal("did not expect an error") - } - pinfo, ok := ginfo.PeerMap[peer.IDB58Encode(clusters[j].host.ID())] - if !ok { - t.Fatal("should have info for this host") - } - if pinfo.Error == "" { - t.Error("pinInfo error should not be empty") - } - - if !ginfo.Cid.Equals(h) { - t.Error("GlobalPinInfo should be for test.ErrorCid") - } - - for _, c := range clusters { - inf, ok := ginfo.PeerMap[peer.IDB58Encode(c.host.ID())] - if !ok { - t.Logf("%+v", ginfo) - t.Fatal("GlobalPinInfo should not be empty for this host") - } - - if inf.Status != api.TrackerStatusPinError && inf.Status != api.TrackerStatusPinning { - t.Error("should be PinError or Pinning in all peers") - } - } - - // Test with a good Cid - j = rand.Intn(nClusters) - ginfo, err = clusters[j].Sync(ctx, h2) - if err != nil { - t.Fatal(err) - } - if !ginfo.Cid.Equals(h2) { - t.Error("GlobalPinInfo should be for testrCid2") - } - - for _, c := range clusters { - inf, ok := ginfo.PeerMap[peer.IDB58Encode(c.host.ID())] - if !ok { - t.Fatal("GlobalPinInfo should have this cluster") - } - if inf.Status != api.TrackerStatusPinned { - t.Error("the GlobalPinInfo should show Pinned in all peers") - } - } -} - func TestClustersRecoverLocal(t *testing.T) { ctx := context.Background() clusters, mock := createClusters(t) @@ -1099,10 +927,7 @@ func TestClustersRecoverLocal(t *testing.T) { } // Recover good ID - info, err = c.SyncLocal(ctx, h2) - if err != nil { - t.Error(err) - } + info, err = c.RecoverLocal(ctx, h2) if info.Status != api.TrackerStatusPinned { t.Error("element should be in Pinned state") } @@ -2176,19 +2001,6 @@ func TestClustersFollowerMode(t *testing.T) { } }) - t.Run("follower syncs itself", func(t *testing.T) { - gpis, err := clusters[1].SyncAll(ctx) - if err != nil { - t.Error("sync should work") - } - if len(gpis) != 1 { - t.Fatal("globalPinInfo should have 1 pins (in error)") - } - if len(gpis[0].PeerMap) != 1 { - t.Fatal("globalPinInfo[0] should only have one peer") - } - }) - t.Run("follower status itself only", func(t *testing.T) { gpi, err := clusters[1].Status(ctx, test.Cid1) if err != nil { diff --git a/pintracker/maptracker/config.go b/pintracker/maptracker/config.go deleted file mode 100644 index 8430e341..00000000 --- a/pintracker/maptracker/config.go +++ /dev/null @@ -1,115 +0,0 @@ -package maptracker - -import ( - "encoding/json" - "errors" - - "github.com/kelseyhightower/envconfig" - - "github.com/ipfs/ipfs-cluster/config" -) - -const configKey = "maptracker" -const envConfigKey = "cluster_maptracker" - -// Default values for this Config. -const ( - DefaultMaxPinQueueSize = 1000000 - DefaultConcurrentPins = 10 -) - -// Config allows to initialize a Monitor and customize some parameters. -type Config struct { - config.Saver - - // If higher, they will automatically marked with an error. - MaxPinQueueSize int - // ConcurrentPins specifies how many pin requests can be sent to the ipfs - // daemon in parallel. If the pinning method is "refs", it might increase - // speed. Unpin requests are always processed one by one. - ConcurrentPins int -} - -type jsonConfig struct { - MaxPinQueueSize int `json:"max_pin_queue_size,omitempty"` - ConcurrentPins int `json:"concurrent_pins"` -} - -// ConfigKey provides a human-friendly identifier for this type of Config. -func (cfg *Config) ConfigKey() string { - return configKey -} - -// Default sets the fields of this Config to sensible values. -func (cfg *Config) Default() error { - cfg.MaxPinQueueSize = DefaultMaxPinQueueSize - cfg.ConcurrentPins = DefaultConcurrentPins - return nil -} - -// ApplyEnvVars fills in any Config fields found -// as environment variables. -func (cfg *Config) ApplyEnvVars() error { - jcfg := cfg.toJSONConfig() - - err := envconfig.Process(envConfigKey, jcfg) - if err != nil { - return err - } - - return cfg.applyJSONConfig(jcfg) -} - -// Validate checks that the fields of this Config have working values, -// at least in appearance. -func (cfg *Config) Validate() error { - if cfg.MaxPinQueueSize <= 0 { - return errors.New("maptracker.max_pin_queue_size too low") - } - - if cfg.ConcurrentPins <= 0 { - return errors.New("maptracker.concurrent_pins is too low") - } - - return nil -} - -// LoadJSON sets the fields of this Config to the values defined by the JSON -// representation of it, as generated by ToJSON. -func (cfg *Config) LoadJSON(raw []byte) error { - jcfg := &jsonConfig{} - err := json.Unmarshal(raw, jcfg) - if err != nil { - logger.Error("Error unmarshaling maptracker config") - return err - } - - cfg.Default() - - return cfg.applyJSONConfig(jcfg) -} - -func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error { - config.SetIfNotDefault(jcfg.MaxPinQueueSize, &cfg.MaxPinQueueSize) - config.SetIfNotDefault(jcfg.ConcurrentPins, &cfg.ConcurrentPins) - - return cfg.Validate() -} - -// ToJSON generates a human-friendly JSON representation of this Config. -func (cfg *Config) ToJSON() ([]byte, error) { - jcfg := cfg.toJSONConfig() - - return config.DefaultJSONMarshal(jcfg) -} - -func (cfg *Config) toJSONConfig() *jsonConfig { - jCfg := &jsonConfig{ - ConcurrentPins: cfg.ConcurrentPins, - } - if cfg.MaxPinQueueSize != DefaultMaxPinQueueSize { - jCfg.MaxPinQueueSize = cfg.MaxPinQueueSize - } - - return jCfg -} diff --git a/pintracker/maptracker/config_test.go b/pintracker/maptracker/config_test.go deleted file mode 100644 index 9a918318..00000000 --- a/pintracker/maptracker/config_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package maptracker - -import ( - "encoding/json" - "os" - "testing" -) - -var cfgJSON = []byte(` -{ - "max_pin_queue_size": 4092, - "concurrent_pins": 2 -} -`) - -func TestLoadJSON(t *testing.T) { - cfg := &Config{} - err := cfg.LoadJSON(cfgJSON) - if err != nil { - t.Fatal(err) - } - - j := &jsonConfig{} - - json.Unmarshal(cfgJSON, j) - j.ConcurrentPins = 10 - tst, _ := json.Marshal(j) - err = cfg.LoadJSON(tst) - if err != nil { - t.Error("did not expect an error") - } - if cfg.ConcurrentPins != 10 { - t.Error("expected 10 concurrent pins") - } -} - -func TestToJSON(t *testing.T) { - cfg := &Config{} - cfg.LoadJSON(cfgJSON) - newjson, err := cfg.ToJSON() - if err != nil { - t.Fatal(err) - } - cfg = &Config{} - err = cfg.LoadJSON(newjson) - if err != nil { - t.Fatal(err) - } -} - -func TestDefault(t *testing.T) { - cfg := &Config{} - cfg.Default() - if cfg.Validate() != nil { - t.Fatal("error validating") - } - - cfg.ConcurrentPins = -2 - if cfg.Validate() == nil { - t.Fatal("expected error validating") - } -} - -func TestApplyEnvVars(t *testing.T) { - os.Setenv("CLUSTER_MAPTRACKER_CONCURRENTPINS", "22") - cfg := &Config{} - cfg.ApplyEnvVars() - - if cfg.ConcurrentPins != 22 { - t.Fatal("failed to override concurrent_pins with env var") - } -} diff --git a/pintracker/maptracker/maptracker.go b/pintracker/maptracker/maptracker.go deleted file mode 100644 index 5523cc37..00000000 --- a/pintracker/maptracker/maptracker.go +++ /dev/null @@ -1,452 +0,0 @@ -// Package maptracker implements a PinTracker component for IPFS Cluster. It -// uses a map to keep track of the state of tracked pins. -package maptracker - -import ( - "context" - "errors" - "sync" - - "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/pintracker/optracker" - "github.com/ipfs/ipfs-cluster/pintracker/util" - - cid "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" - peer "github.com/libp2p/go-libp2p-core/peer" - rpc "github.com/libp2p/go-libp2p-gorpc" - - "go.opencensus.io/trace" -) - -var logger = logging.Logger("pintracker") - -var ( - errUnpinned = errors.New("the item is unexpectedly not pinned on IPFS") -) - -// MapPinTracker is a PinTracker implementation which uses a Go map -// to store the status of the tracked Cids. This component is thread-safe. -type MapPinTracker struct { - config *Config - - optracker *optracker.OperationTracker - - ctx context.Context - cancel func() - - rpcClient *rpc.Client - rpcReady chan struct{} - - peerID peer.ID - pinCh chan *optracker.Operation - unpinCh chan *optracker.Operation - - shutdownLock sync.Mutex - shutdown bool - wg sync.WaitGroup -} - -// NewMapPinTracker returns a new object which has been correctly -// initialized with the given configuration. -func NewMapPinTracker(cfg *Config, pid peer.ID, peerName string) *MapPinTracker { - ctx, cancel := context.WithCancel(context.Background()) - - mpt := &MapPinTracker{ - ctx: ctx, - cancel: cancel, - config: cfg, - optracker: optracker.NewOperationTracker(ctx, pid, peerName), - rpcReady: make(chan struct{}, 1), - peerID: pid, - pinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize), - unpinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize), - } - - for i := 0; i < mpt.config.ConcurrentPins; i++ { - go mpt.opWorker(ctx, mpt.pin, mpt.pinCh) - } - go mpt.opWorker(ctx, mpt.unpin, mpt.unpinCh) - return mpt -} - -// receives a pin Function (pin or unpin) and a channel. -// Used for both pinning and unpinning -func (mpt *MapPinTracker) opWorker(ctx context.Context, pinF func(*optracker.Operation) error, opChan chan *optracker.Operation) { - for { - select { - case op := <-opChan: - if op.Cancelled() { - // operation was cancelled. Move on. - // This saves some time, but not 100% needed. - continue - } - op.SetPhase(optracker.PhaseInProgress) - err := pinF(op) // call pin/unpin - if err != nil { - if op.Cancelled() { - // there was an error because - // we were cancelled. Move on. - continue - } - op.SetError(err) - op.Cancel() - continue - } - op.SetPhase(optracker.PhaseDone) - op.Cancel() - - // We keep all pinned things in the tracker, - // only clean unpinned things. - if op.Type() == optracker.OperationUnpin { - mpt.optracker.Clean(ctx, op) - } - case <-mpt.ctx.Done(): - return - } - } -} - -// Shutdown finishes the services provided by the MapPinTracker and cancels -// any active context. -func (mpt *MapPinTracker) Shutdown(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "tracker/map/Shutdown") - defer span.End() - - mpt.shutdownLock.Lock() - defer mpt.shutdownLock.Unlock() - - if mpt.shutdown { - logger.Debug("already shutdown") - return nil - } - - logger.Info("stopping MapPinTracker") - mpt.cancel() - close(mpt.rpcReady) - mpt.wg.Wait() - mpt.shutdown = true - return nil -} - -func (mpt *MapPinTracker) pin(op *optracker.Operation) error { - ctx, span := trace.StartSpan(op.Context(), "tracker/map/pin") - defer span.End() - - logger.Debugf("issuing pin call for %s", op.Cid()) - err := mpt.rpcClient.CallContext( - ctx, - "", - "IPFSConnector", - "Pin", - op.Pin(), - &struct{}{}, - ) - if err != nil { - return err - } - return nil -} - -func (mpt *MapPinTracker) unpin(op *optracker.Operation) error { - ctx, span := trace.StartSpan(op.Context(), "tracker/map/unpin") - defer span.End() - - logger.Debugf("issuing unpin call for %s", op.Cid()) - err := mpt.rpcClient.CallContext( - ctx, - "", - "IPFSConnector", - "Unpin", - op.Pin(), - &struct{}{}, - ) - if err != nil { - return err - } - return nil -} - -// puts a new operation on the queue, unless ongoing exists -func (mpt *MapPinTracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.OperationType, ch chan *optracker.Operation) error { - ctx, span := trace.StartSpan(ctx, "tracker/map/enqueue") - defer span.End() - - op := mpt.optracker.TrackNewOperation(ctx, c, typ, optracker.PhaseQueued) - if op == nil { - return nil // ongoing pin operation. - } - - select { - case ch <- op: - default: - err := util.ErrFullQueue - op.SetError(err) - op.Cancel() - logger.Error(err.Error()) - return err - } - return nil -} - -// Track tells the MapPinTracker to start managing a Cid, -// possibly triggering Pin operations on the IPFS daemon. -func (mpt *MapPinTracker) Track(ctx context.Context, c *api.Pin) error { - ctx, span := trace.StartSpan(ctx, "tracker/map/Track") - defer span.End() - - logger.Debugf("tracking %s", c.Cid) - - // Sharded pins are never pinned. A sharded pin cannot turn into - // something else or viceversa like it happens with Remote pins so - // we just track them. - if c.Type == api.MetaType { - mpt.optracker.TrackNewOperation(ctx, c, optracker.OperationShard, optracker.PhaseDone) - return nil - } - - // Trigger unpin whenever something remote is tracked - // Note, IPFSConn checks with pin/ls before triggering - // pin/rm, so this actually does not always trigger unpin - // to ipfs. - if util.IsRemotePin(c, mpt.peerID) { - op := mpt.optracker.TrackNewOperation(ctx, c, optracker.OperationRemote, optracker.PhaseInProgress) - if op == nil { - return nil // Ongoing operationRemote / PhaseInProgress - } - err := mpt.unpin(op) // unpin all the time, even if not pinned - op.Cancel() - if err != nil { - op.SetError(err) - } else { - op.SetPhase(optracker.PhaseDone) - } - return nil - } - - return mpt.enqueue(ctx, c, optracker.OperationPin, mpt.pinCh) -} - -// Untrack tells the MapPinTracker to stop managing a Cid. -// If the Cid is pinned locally, it will be unpinned. -func (mpt *MapPinTracker) Untrack(ctx context.Context, c cid.Cid) error { - ctx, span := trace.StartSpan(ctx, "tracker/map/Untrack") - defer span.End() - - logger.Infof("untracking %s", c) - return mpt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin, mpt.unpinCh) -} - -// Status returns information for a Cid tracked by this -// MapPinTracker. -func (mpt *MapPinTracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo { - ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/Status") - defer span.End() - - return mpt.optracker.Get(ctx, c) -} - -// StatusAll returns information for all Cids tracked by this -// MapPinTracker. -func (mpt *MapPinTracker) StatusAll(ctx context.Context) []*api.PinInfo { - ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/StatusAll") - defer span.End() - - return mpt.optracker.GetAll(ctx) -} - -// Sync verifies that the status of a Cid matches that of -// the IPFS daemon. If not, it will be transitioned -// to PinError or UnpinError. -// -// Sync returns the updated local status for the given Cid. -// Pins in error states can be recovered with Recover(). -// An error is returned if we are unable to contact -// the IPFS daemon. -func (mpt *MapPinTracker) Sync(ctx context.Context, c cid.Cid) (*api.PinInfo, error) { - ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/Sync") - defer span.End() - - var ips api.IPFSPinStatus - err := mpt.rpcClient.Call( - "", - "IPFSConnector", - "PinLsCid", - c, - &ips, - ) - - if err != nil { - mpt.optracker.SetError(ctx, c, err) - return mpt.optracker.Get(ctx, c), nil - } - - return mpt.syncStatus(ctx, c, ips), nil -} - -// SyncAll verifies that the statuses of all tracked Cids match the -// one reported by the IPFS daemon. If not, they will be transitioned -// to PinError or UnpinError. -// -// SyncAll returns the list of local status for all tracked Cids which -// were updated or have errors. Cids in error states can be recovered -// with Recover(). -// An error is returned if we are unable to contact the IPFS daemon. -func (mpt *MapPinTracker) SyncAll(ctx context.Context) ([]*api.PinInfo, error) { - ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/SyncAll") - defer span.End() - - var ipsMap map[string]api.IPFSPinStatus - var results []*api.PinInfo - err := mpt.rpcClient.Call( - "", - "IPFSConnector", - "PinLs", - "recursive", - &ipsMap, - ) - if err != nil { - // set pinning or unpinning ops to error, since we can't - // verify them - pInfos := mpt.optracker.GetAll(ctx) - for _, pInfo := range pInfos { - op, _ := optracker.TrackerStatusToOperationPhase(pInfo.Status) - if op == optracker.OperationPin || op == optracker.OperationUnpin { - mpt.optracker.SetError(ctx, pInfo.Cid, err) - results = append(results, mpt.optracker.Get(ctx, pInfo.Cid)) - } else { - results = append(results, pInfo) - } - } - return results, nil - } - - status := mpt.StatusAll(ctx) - for _, pInfoOrig := range status { - var pInfoNew *api.PinInfo - c := pInfoOrig.Cid - ips, ok := ipsMap[c.String()] - if !ok { - pInfoNew = mpt.syncStatus(ctx, c, api.IPFSPinStatusUnpinned) - } else { - pInfoNew = mpt.syncStatus(ctx, c, ips) - } - - if pInfoOrig.Status != pInfoNew.Status || - pInfoNew.Status == api.TrackerStatusUnpinError || - pInfoNew.Status == api.TrackerStatusPinError { - results = append(results, pInfoNew) - } - } - return results, nil -} - -func (mpt *MapPinTracker) syncStatus(ctx context.Context, c cid.Cid, ips api.IPFSPinStatus) *api.PinInfo { - status, ok := mpt.optracker.Status(ctx, c) - if !ok { - status = api.TrackerStatusUnpinned - } - - // TODO(hector): for sharding, we may need to check that a shard - // is pinned to the right depth. For now, we assumed that if it's pinned - // in some way, then it must be right (including direct). - pinned := func(i api.IPFSPinStatus) bool { - switch i { - case api.IPFSPinStatusRecursive: - return i.IsPinned(-1) - case api.IPFSPinStatusDirect: - return i.IsPinned(0) - default: - return i.IsPinned(1) // Pinned with depth 1 or more. - } - } - - if pinned(ips) { - switch status { - case api.TrackerStatusPinError: - // If an item that we wanted to pin is pinned, we mark it so - mpt.optracker.TrackNewOperation( - ctx, - api.PinCid(c), - optracker.OperationPin, - optracker.PhaseDone, - ) - default: - // 1. Unpinning phases - // 2. Pinned in ipfs but we are not tracking - // -> do nothing - } - } else { - switch status { - case api.TrackerStatusUnpinError: - // clean - op := mpt.optracker.TrackNewOperation( - ctx, - api.PinCid(c), - optracker.OperationUnpin, - optracker.PhaseDone, - ) - if op != nil { - mpt.optracker.Clean(ctx, op) - } - - case api.TrackerStatusPinned: - // not pinned in IPFS but we think it should be: mark as error - mpt.optracker.SetError(ctx, c, errUnpinned) - default: - // 1. Pinning phases - // -> do nothing - } - } - return mpt.optracker.Get(ctx, c) -} - -// Recover will re-queue a Cid in error state for the failed operation, -// possibly retriggering an IPFS pinning operation. -func (mpt *MapPinTracker) Recover(ctx context.Context, c cid.Cid) (*api.PinInfo, error) { - ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/Recover") - defer span.End() - - pInfo := mpt.optracker.Get(ctx, c) - var err error - - switch pInfo.Status { - case api.TrackerStatusPinError: - logger.Infof("Restarting pin operation for %s", c) - err = mpt.enqueue(ctx, api.PinCid(c), optracker.OperationPin, mpt.pinCh) - case api.TrackerStatusUnpinError: - logger.Infof("Restarting unpin operation for %s", c) - err = mpt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin, mpt.unpinCh) - } - return mpt.optracker.Get(ctx, c), err -} - -// RecoverAll attempts to recover all items tracked by this peer. -func (mpt *MapPinTracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) { - ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/RecoverAll") - defer span.End() - - pInfos := mpt.optracker.GetAll(ctx) - var results []*api.PinInfo - for _, pInfo := range pInfos { - res, err := mpt.Recover(ctx, pInfo.Cid) - results = append(results, res) - if err != nil { - return results, err - } - } - return results, nil -} - -// SetClient makes the MapPinTracker ready to perform RPC requests to -// other components. -func (mpt *MapPinTracker) SetClient(c *rpc.Client) { - mpt.rpcClient = c - mpt.rpcReady <- struct{}{} -} - -// OpContext exports the internal optracker's OpContext method. -// For testing purposes only. -func (mpt *MapPinTracker) OpContext(ctx context.Context, c cid.Cid) context.Context { - return mpt.optracker.OpContext(ctx, c) -} diff --git a/pintracker/maptracker/maptracker_test.go b/pintracker/maptracker/maptracker_test.go deleted file mode 100644 index 60f7f669..00000000 --- a/pintracker/maptracker/maptracker_test.go +++ /dev/null @@ -1,591 +0,0 @@ -package maptracker - -import ( - "context" - "errors" - "math/rand" - "sync" - "testing" - "time" - - "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/test" - - cid "github.com/ipfs/go-cid" - peer "github.com/libp2p/go-libp2p-core/peer" - rpc "github.com/libp2p/go-libp2p-gorpc" -) - -var ( - pinCancelCid = test.Cid3 - unpinCancelCid = test.Cid2 - ErrPinCancelCid = errors.New("should not have received rpc.IPFSPin operation") - ErrUnpinCancelCid = errors.New("should not have received rpc.IPFSUnpin operation") -) - -type mockIPFS struct{} - -func mockRPCClient(t *testing.T) *rpc.Client { - s := rpc.NewServer(nil, "mock") - c := rpc.NewClientWithServer(nil, "mock", s) - err := s.RegisterName("IPFSConnector", &mockIPFS{}) - if err != nil { - t.Fatal(err) - } - return c -} - -func (mock *mockIPFS) Pin(ctx context.Context, in *api.Pin, out *struct{}) error { - switch in.Cid.String() { - case test.SlowCid1.String(): - time.Sleep(2 * time.Second) - case pinCancelCid.String(): - return ErrPinCancelCid - } - return nil -} - -func (mock *mockIPFS) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error { - switch in.Cid.String() { - case test.SlowCid1.String(): - time.Sleep(2 * time.Second) - case unpinCancelCid.String(): - return ErrUnpinCancelCid - } - return nil -} - -func testPin(c cid.Cid, min, max int, allocs ...peer.ID) *api.Pin { - pin := api.PinCid(c) - pin.ReplicationFactorMin = min - pin.ReplicationFactorMax = max - pin.Allocations = allocs - return pin -} - -func testSlowMapPinTracker(t *testing.T) *MapPinTracker { - cfg := &Config{} - cfg.Default() - cfg.ConcurrentPins = 1 - mpt := NewMapPinTracker(cfg, test.PeerID1, test.PeerName1) - mpt.SetClient(mockRPCClient(t)) - return mpt -} - -func testMapPinTracker(t *testing.T) *MapPinTracker { - cfg := &Config{} - cfg.Default() - cfg.ConcurrentPins = 1 - mpt := NewMapPinTracker(cfg, test.PeerID1, test.PeerName1) - mpt.SetClient(test.NewMockRPCClient(t)) - return mpt -} - -func TestNew(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) -} - -func TestShutdown(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - err := mpt.Shutdown(ctx) - if err != nil { - t.Fatal(err) - } - err = mpt.Shutdown(ctx) - if err != nil { - t.Fatal(err) - } -} - -func TestTrack(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h := test.Cid1 - - // Let's tart with a local pin - c := testPin(h, -1, -1) - - err := mpt.Track(context.Background(), c) - if err != nil { - t.Fatal(err) - } - - time.Sleep(200 * time.Millisecond) // let it be pinned - - st := mpt.Status(context.Background(), h) - if st.Status != api.TrackerStatusPinned { - t.Fatalf("cid should be pinned and is %s", st.Status) - } - - // Unpin and set remote - c = testPin(h, 1, 1, test.PeerID2) - err = mpt.Track(context.Background(), c) - if err != nil { - t.Fatal(err) - } - - time.Sleep(200 * time.Millisecond) // let it be unpinned - - st = mpt.Status(context.Background(), h) - if st.Status != api.TrackerStatusRemote { - t.Fatalf("cid should be pinned and is %s", st.Status) - } -} - -func TestUntrack(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h1 := test.Cid1 - h2 := test.Cid2 - - // LocalPin - c := testPin(h1, -1, -1) - - err := mpt.Track(context.Background(), c) - if err != nil { - t.Fatal(err) - } - - // Remote pin - c = testPin(h2, 1, 1, test.PeerID2) - err = mpt.Track(context.Background(), c) - if err != nil { - t.Fatal(err) - } - - time.Sleep(time.Second / 2) - - err = mpt.Untrack(context.Background(), h2) - if err != nil { - t.Fatal(err) - } - err = mpt.Untrack(context.Background(), h1) - if err != nil { - t.Fatal(err) - } - err = mpt.Untrack(context.Background(), h1) - if err != nil { - t.Fatal(err) - } - - time.Sleep(time.Second / 2) - - st := mpt.Status(context.Background(), h1) - if st.Status != api.TrackerStatusUnpinned { - t.Fatalf("cid should be unpinned and is %s", st.Status) - } - - st = mpt.Status(context.Background(), h2) - if st.Status != api.TrackerStatusUnpinned { - t.Fatalf("cid should be unpinned and is %s", st.Status) - } -} - -func TestStatusAll(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h1 := test.Cid1 - h2 := test.Cid2 - - // LocalPin - c := testPin(h1, -1, -1) - mpt.Track(context.Background(), c) - c = testPin(h2, 1, 1) - mpt.Track(context.Background(), c) - - time.Sleep(200 * time.Millisecond) - - stAll := mpt.StatusAll(context.Background()) - if len(stAll) != 2 { - t.Logf("%+v", stAll) - t.Fatal("expected 2 pins") - } - - for _, st := range stAll { - if st.Cid.Equals(h1) && st.Status != api.TrackerStatusPinned { - t.Fatal("expected pinned") - } - if st.Cid.Equals(h2) && st.Status != api.TrackerStatusRemote { - t.Fatal("expected remote") - } - } -} - -func TestSyncAndRecover(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h1 := test.Cid1 - h2 := test.Cid2 - - c := testPin(h1, -1, -1) - mpt.Track(context.Background(), c) - c = testPin(h2, -1, -1) - mpt.Track(context.Background(), c) - - time.Sleep(100 * time.Millisecond) - - // IPFSPinLS RPC returns unpinned for anything != Cid1 or Cid3 - info, err := mpt.Sync(context.Background(), h2) - if err != nil { - t.Fatal(err) - } - if info.Status != api.TrackerStatusPinError { - t.Error("expected pin_error") - } - - info, err = mpt.Sync(context.Background(), h1) - if err != nil { - t.Fatal(err) - } - if info.Status != api.TrackerStatusPinned { - t.Error("expected pinned") - } - - info, err = mpt.Recover(context.Background(), h1) - if err != nil { - t.Fatal(err) - } - if info.Status != api.TrackerStatusPinned { - t.Error("expected pinned") - } - - _, err = mpt.Recover(context.Background(), h2) - if err != nil { - t.Fatal(err) - } - - time.Sleep(100 * time.Millisecond) - - info = mpt.Status(context.Background(), h2) - if info.Status != api.TrackerStatusPinned { - t.Error("expected pinned") - } -} - -func TestRecoverAll(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h1 := test.Cid1 - - c := testPin(h1, -1, -1) - mpt.Track(context.Background(), c) - time.Sleep(100 * time.Millisecond) - mpt.optracker.SetError(context.Background(), h1, errors.New("fakeerror")) - pins, err := mpt.RecoverAll(context.Background()) - if err != nil { - t.Fatal(err) - } - if len(pins) != 1 { - t.Fatal("there should be only one pin") - } - - time.Sleep(100 * time.Millisecond) - info := mpt.Status(context.Background(), h1) - - if info.Status != api.TrackerStatusPinned { - t.Error("the pin should have been recovered") - } -} - -func TestSyncAll(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - synced, err := mpt.SyncAll(context.Background()) - if err != nil { - t.Fatal(err) - } - // This relies on the rpc mock implementation - - if len(synced) != 0 { - t.Fatal("should not have synced anything when it tracks nothing") - } - - h1 := test.Cid1 - h2 := test.Cid2 - - c := testPin(h1, -1, -1) - mpt.Track(context.Background(), c) - c = testPin(h2, -1, -1) - mpt.Track(context.Background(), c) - - time.Sleep(100 * time.Millisecond) - - synced, err = mpt.SyncAll(context.Background()) - if err != nil { - t.Fatal(err) - } - if len(synced) != 1 || synced[0].Status != api.TrackerStatusPinError { - t.Logf("%+v", synced) - t.Fatal("should have synced h2") - } -} - -func TestUntrackTrack(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h1 := test.Cid1 - - // LocalPin - c := testPin(h1, -1, -1) - err := mpt.Track(context.Background(), c) - if err != nil { - t.Fatal(err) - } - - time.Sleep(time.Second / 2) - - err = mpt.Untrack(context.Background(), h1) - if err != nil { - t.Fatal(err) - } -} - -func TestTrackUntrackWithCancel(t *testing.T) { - ctx := context.Background() - mpt := testSlowMapPinTracker(t) - defer mpt.Shutdown(ctx) - - slowPinCid := test.SlowCid1 - - // LocalPin - slowPin := testPin(slowPinCid, -1, -1) - - err := mpt.Track(context.Background(), slowPin) - if err != nil { - t.Fatal(err) - } - - time.Sleep(100 * time.Millisecond) // let pinning start - - pInfo := mpt.Status(context.Background(), slowPin.Cid) - if pInfo.Status == api.TrackerStatusUnpinned { - t.Fatal("slowPin should be tracked") - } - - if pInfo.Status == api.TrackerStatusPinning { - go func() { - err = mpt.Untrack(context.Background(), slowPinCid) - if err != nil { - t.Fatal(err) - } - }() - select { - case <-mpt.optracker.OpContext(context.Background(), slowPinCid).Done(): - return - case <-time.Tick(100 * time.Millisecond): - t.Errorf("operation context should have been cancelled by now") - } - } else { - t.Error("slowPin should be pinning and is:", pInfo.Status) - } -} - -func TestTrackUntrackWithNoCancel(t *testing.T) { - ctx := context.Background() - mpt := testSlowMapPinTracker(t) - defer mpt.Shutdown(ctx) - - slowPinCid := test.SlowCid1 - fastPinCid := pinCancelCid - - // SlowLocalPin - slowPin := testPin(slowPinCid, -1, -1) - - // LocalPin - fastPin := testPin(fastPinCid, -1, -1) - - err := mpt.Track(context.Background(), slowPin) - if err != nil { - t.Fatal(err) - } - - err = mpt.Track(context.Background(), fastPin) - if err != nil { - t.Fatal(err) - } - - // fastPin should be queued because slow pin is pinning - pInfo := mpt.Status(context.Background(), fastPinCid) - if pInfo.Status == api.TrackerStatusPinQueued { - err = mpt.Untrack(context.Background(), fastPinCid) - if err != nil { - t.Fatal(err) - } - pi := mpt.Status(context.Background(), fastPinCid) - if pi.Error == ErrPinCancelCid.Error() { - t.Fatal(ErrPinCancelCid) - } - } else { - t.Error("fastPin should be queued to pin:", pInfo.Status) - } - - time.Sleep(100 * time.Millisecond) - pInfo = mpt.Status(context.Background(), fastPinCid) - if pInfo.Status != api.TrackerStatusUnpinned { - t.Error("fastPin should have been removed from tracker:", pInfo.Status) - } -} - -func TestUntrackTrackWithCancel(t *testing.T) { - ctx := context.Background() - mpt := testSlowMapPinTracker(t) - defer mpt.Shutdown(ctx) - - slowPinCid := test.SlowCid1 - - // LocalPin - slowPin := testPin(slowPinCid, -1, -1) - - err := mpt.Track(context.Background(), slowPin) - if err != nil { - t.Fatal(err) - } - - time.Sleep(time.Second / 2) - - // Untrack should cancel the ongoing request - // and unpin right away - err = mpt.Untrack(context.Background(), slowPinCid) - if err != nil { - t.Fatal(err) - } - - time.Sleep(100 * time.Millisecond) - - pInfo := mpt.Status(context.Background(), slowPinCid) - if pInfo.Status == api.TrackerStatusUnpinned { - t.Fatal("expected slowPin to be tracked") - } - - if pInfo.Status == api.TrackerStatusUnpinning { - go func() { - err = mpt.Track(context.Background(), slowPin) - if err != nil { - t.Fatal(err) - } - }() - select { - case <-mpt.optracker.OpContext(context.Background(), slowPinCid).Done(): - return - case <-time.Tick(100 * time.Millisecond): - t.Errorf("operation context should have been cancelled by now") - } - } else { - t.Error("slowPin should be in unpinning") - } - -} - -func TestUntrackTrackWithNoCancel(t *testing.T) { - ctx := context.Background() - mpt := testSlowMapPinTracker(t) - defer mpt.Shutdown(ctx) - - slowPinCid := test.SlowCid1 - fastPinCid := unpinCancelCid - - // SlowLocalPin - slowPin := testPin(slowPinCid, -1, -1) - - // LocalPin - fastPin := testPin(fastPinCid, -1, -1) - - err := mpt.Track(context.Background(), slowPin) - if err != nil { - t.Fatal(err) - } - - err = mpt.Track(context.Background(), fastPin) - if err != nil { - t.Fatal(err) - } - - time.Sleep(3 * time.Second) - - err = mpt.Untrack(context.Background(), slowPin.Cid) - if err != nil { - t.Fatal(err) - } - - err = mpt.Untrack(context.Background(), fastPin.Cid) - if err != nil { - t.Fatal(err) - } - - pInfo := mpt.Status(context.Background(), fastPinCid) - if pInfo.Status == api.TrackerStatusUnpinned { - t.Fatal("c untrack operation should be tracked") - } - - if pInfo.Status == api.TrackerStatusUnpinQueued { - err = mpt.Track(context.Background(), fastPin) - if err != nil { - t.Fatal(err) - } - - pi := mpt.Status(context.Background(), fastPinCid) - if pi.Error == ErrUnpinCancelCid.Error() { - t.Fatal(ErrUnpinCancelCid) - } - } else { - t.Error("c should be queued to unpin") - } -} - -func TestTrackUntrackConcurrent(t *testing.T) { - ctx := context.Background() - mpt := testMapPinTracker(t) - defer mpt.Shutdown(ctx) - - h1 := test.Cid1 - - // LocalPin - c := testPin(h1, -1, -1) - - var wg sync.WaitGroup - - for i := 0; i < 50; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for j := 0; j < 50; j++ { - var err error - op := rand.Intn(2) - if op == 1 { - err = mpt.Track(context.Background(), c) - } else { - err = mpt.Untrack(context.Background(), c.Cid) - } - if err != nil { - t.Error(err) - } - } - }() - } - - wg.Wait() - - time.Sleep(200 * time.Millisecond) - st := mpt.Status(context.Background(), h1) - t.Log(st.Status) - if st.Status != api.TrackerStatusUnpinned && st.Status != api.TrackerStatusPinned { - t.Fatal("should be pinned or unpinned") - } -} diff --git a/pintracker/optracker/operation.go b/pintracker/optracker/operation.go index 12b5cb38..bb67829b 100644 --- a/pintracker/optracker/operation.go +++ b/pintracker/optracker/operation.go @@ -105,9 +105,11 @@ func (op *Operation) String() string { // Cid returns the Cid associated to this operation. func (op *Operation) Cid() cid.Cid { + var c cid.Cid op.mu.RLock() - defer op.mu.RUnlock() - return op.pin.Cid + c = op.pin.Cid + op.mu.RUnlock() + return c } // Context returns the context associated to this operation. @@ -117,48 +119,55 @@ func (op *Operation) Context() context.Context { // Cancel will cancel the context associated to this operation. func (op *Operation) Cancel() { - ctx, span := trace.StartSpan(op.ctx, "optracker/Cancel") - _ = ctx - defer span.End() + _, span := trace.StartSpan(op.ctx, "optracker/Cancel") op.cancel() + span.End() } // Phase returns the Phase. func (op *Operation) Phase() Phase { + var ph Phase + op.mu.RLock() - defer op.mu.RUnlock() - return op.phase + ph = op.phase + op.mu.RUnlock() + + return ph } // SetPhase changes the Phase and updates the timestamp. func (op *Operation) SetPhase(ph Phase) { - ctx, span := trace.StartSpan(op.ctx, "optracker/SetPhase") - _ = ctx - defer span.End() + _, span := trace.StartSpan(op.ctx, "optracker/SetPhase") op.mu.Lock() - defer op.mu.Unlock() - op.phase = ph - op.ts = time.Now() + { + op.phase = ph + op.ts = time.Now() + } + op.mu.Unlock() + span.End() } // Error returns any error message attached to the operation. func (op *Operation) Error() string { + var err string op.mu.RLock() - defer op.mu.RUnlock() - return op.error + err = op.error + op.mu.RUnlock() + return err } // SetError sets the phase to PhaseError along with // an error message. It updates the timestamp. func (op *Operation) SetError(err error) { - ctx, span := trace.StartSpan(op.ctx, "optracker/SetError") - _ = ctx - defer span.End() + _, span := trace.StartSpan(op.ctx, "optracker/SetError") op.mu.Lock() - defer op.mu.Unlock() - op.phase = PhaseError - op.error = err.Error() - op.ts = time.Now() + { + op.phase = PhaseError + op.error = err.Error() + op.ts = time.Now() + } + op.mu.Unlock() + span.End() } // Type returns the operation Type. @@ -174,9 +183,11 @@ func (op *Operation) Pin() *api.Pin { // Timestamp returns the time when this operation was // last modified (phase changed, error was set...). func (op *Operation) Timestamp() time.Time { + var ts time.Time op.mu.RLock() - defer op.mu.RUnlock() - return op.ts + ts = op.ts + op.mu.RUnlock() + return ts } // Cancelled returns whether the context for this diff --git a/pintracker/optracker/operationtracker.go b/pintracker/optracker/operationtracker.go index bcec88f0..50adce85 100644 --- a/pintracker/optracker/operationtracker.go +++ b/pintracker/optracker/operationtracker.go @@ -206,24 +206,6 @@ func (opt *OperationTracker) GetAll(ctx context.Context) []*api.PinInfo { return pinfos } -// CleanError removes the associated Operation, if it is -// in PhaseError. -func (opt *OperationTracker) CleanError(ctx context.Context, c cid.Cid) { - opt.mu.RLock() - defer opt.mu.RUnlock() - errop, ok := opt.operations[c.String()] - if !ok { - return - } - - if errop.Phase() != PhaseError { - return - } - - opt.Clean(ctx, errop) - return -} - // CleanAllDone deletes any operation from the tracker that is in PhaseDone. func (opt *OperationTracker) CleanAllDone(ctx context.Context) { opt.mu.Lock() diff --git a/pintracker/pintracker_test.go b/pintracker/pintracker_test.go index 352fed9e..a7a6d43c 100644 --- a/pintracker/pintracker_test.go +++ b/pintracker/pintracker_test.go @@ -11,8 +11,10 @@ import ( ipfscluster "github.com/ipfs/ipfs-cluster" "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/pintracker/maptracker" + "github.com/ipfs/ipfs-cluster/datastore/inmem" "github.com/ipfs/ipfs-cluster/pintracker/stateless" + "github.com/ipfs/ipfs-cluster/state" + "github.com/ipfs/ipfs-cluster/state/dsstate" "github.com/ipfs/ipfs-cluster/test" cid "github.com/ipfs/go-cid" @@ -31,17 +33,12 @@ var ( } ) -type mockCluster struct{} type mockIPFS struct{} func mockRPCClient(t testing.TB) *rpc.Client { s := rpc.NewServer(nil, "mock") c := rpc.NewClientWithServer(nil, "mock", s) - err := s.RegisterName("Cluster", &mockCluster{}) - if err != nil { - t.Fatal(err) - } - err = s.RegisterName("IPFSConnector", &mockIPFS{}) + err := s.RegisterName("IPFSConnector", &mockIPFS{}) if err != nil { t.Fatal(err) } @@ -90,64 +87,58 @@ func (mock *mockIPFS) PinLs(ctx context.Context, in string, out *map[string]api. return nil } -func (mock *mockCluster) Pins(ctx context.Context, in struct{}, out *[]*api.Pin) error { - *out = []*api.Pin{ - api.PinWithOpts(test.Cid1, pinOpts), - api.PinWithOpts(test.Cid3, pinOpts), - } - return nil -} - -func (mock *mockCluster) PinGet(ctx context.Context, in cid.Cid, out *api.Pin) error { - switch in.String() { - case test.ErrorCid.String(): - return errors.New("expected error when using ErrorCid") - case test.Cid1.String(), test.Cid2.String(): - pin := api.PinWithOpts(in, pinOpts) - *out = *pin - return nil - } - pin := api.PinCid(in) - *out = *pin - return nil -} - var sortPinInfoByCid = func(p []*api.PinInfo) { sort.Slice(p, func(i, j int) bool { return p[i].Cid.String() < p[j].Cid.String() }) } -func testSlowMapPinTracker(t testing.TB) *maptracker.MapPinTracker { - cfg := &maptracker.Config{} - cfg.Default() - cfg.ConcurrentPins = 1 - mpt := maptracker.NewMapPinTracker(cfg, test.PeerID1, test.PeerName1) - mpt.SetClient(mockRPCClient(t)) - return mpt -} +// prefilledState return a state instance with some pins. +func prefilledState(context.Context) (state.ReadOnly, error) { + st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle()) + if err != nil { + return nil, err + } -func testMapPinTracker(t testing.TB) *maptracker.MapPinTracker { - cfg := &maptracker.Config{} - cfg.Default() - cfg.ConcurrentPins = 1 - mpt := maptracker.NewMapPinTracker(cfg, test.PeerID1, test.PeerName1) - mpt.SetClient(test.NewMockRPCClient(t)) - return mpt + remote := api.PinWithOpts(test.Cid4, api.PinOptions{ + ReplicationFactorMax: 1, + ReplicationFactorMin: 1, + }) + remote.Allocations = []peer.ID{test.PeerID2} + + pins := []*api.Pin{ + api.PinWithOpts(test.Cid1, pinOpts), + api.PinCid(test.Cid2), + api.PinWithOpts(test.Cid3, pinOpts), + remote, + } + + ctx := context.Background() + for _, pin := range pins { + err = st.Add(ctx, pin) + if err != nil { + return nil, err + } + } + return st, nil } func testSlowStatelessPinTracker(t testing.TB) *stateless.Tracker { + t.Helper() + cfg := &stateless.Config{} cfg.Default() - mpt := stateless.New(cfg, test.PeerID1, test.PeerName1) - mpt.SetClient(mockRPCClient(t)) - return mpt + spt := stateless.New(cfg, test.PeerID1, test.PeerName1, prefilledState) + spt.SetClient(mockRPCClient(t)) + return spt } func testStatelessPinTracker(t testing.TB) *stateless.Tracker { + t.Helper() + cfg := &stateless.Config{} cfg.Default() - spt := stateless.New(cfg, test.PeerID1, test.PeerName1) + spt := stateless.New(cfg, test.PeerID1, test.PeerName1, prefilledState) spt.SetClient(test.NewMockRPCClient(t)) return spt } @@ -170,14 +161,6 @@ func TestPinTracker_Track(t *testing.T) { }, false, }, - { - "basic map track", - args{ - api.PinWithOpts(test.Cid1, pinOpts), - testMapPinTracker(t), - }, - false, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -204,13 +187,6 @@ func BenchmarkPinTracker_Track(b *testing.B) { testStatelessPinTracker(b), }, }, - { - "basic map track", - args{ - api.PinWithOpts(test.Cid1, pinOpts), - testMapPinTracker(b), - }, - }, } for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { @@ -242,14 +218,6 @@ func TestPinTracker_Untrack(t *testing.T) { }, false, }, - { - "basic map untrack", - args{ - test.Cid1, - testMapPinTracker(t), - }, - false, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -289,18 +257,9 @@ func TestPinTracker_StatusAll(t *testing.T) { Cid: test.Cid3, Status: api.TrackerStatusPinned, }, - }, - }, - { - "basic map statusall", - args{ - api.PinWithOpts(test.Cid1, pinOpts), - testMapPinTracker(t), - }, - []*api.PinInfo{ { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, + Cid: test.Cid4, + Status: api.TrackerStatusRemote, }, }, }, @@ -315,18 +274,13 @@ func TestPinTracker_StatusAll(t *testing.T) { Cid: test.Cid1, Status: api.TrackerStatusPinned, }, - }, - }, - { - "slow map statusall", - args{ - api.PinWithOpts(test.Cid1, pinOpts), - testSlowMapPinTracker(t), - }, - []*api.PinInfo{ { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, + Cid: test.Cid2, + Status: api.TrackerStatusRemote, + }, + { + Cid: test.Cid4, + Status: api.TrackerStatusRemote, }, }, }, @@ -375,12 +329,6 @@ func BenchmarkPinTracker_StatusAll(b *testing.B) { testStatelessPinTracker(b), }, }, - { - "basic map track", - args{ - testMapPinTracker(b), - }, - }, } for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { @@ -413,40 +361,17 @@ func TestPinTracker_Status(t *testing.T) { Status: api.TrackerStatusPinned, }, }, - { - "basic map status", - args{ - test.Cid1, - testMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - }, { "basic stateless status/unpinned", args{ - test.Cid4, + test.Cid5, testStatelessPinTracker(t), }, api.PinInfo{ - Cid: test.Cid4, + Cid: test.Cid5, Status: api.TrackerStatusUnpinned, }, }, - { - "basic map status/unpinned", - args{ - test.Cid4, - testMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid4, - Status: api.TrackerStatusUnpinned, - }, - }, - { "slow stateless status", args{ @@ -458,31 +383,9 @@ func TestPinTracker_Status(t *testing.T) { Status: api.TrackerStatusPinned, }, }, - { - "slow map status", - args{ - test.Cid1, - testSlowMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - switch tt.args.tracker.(type) { - case *maptracker.MapPinTracker: - // the Track preps the internal map of the MapPinTracker - // not required by the Stateless impl - pin := api.PinWithOpts(test.Cid1, pinOpts) - if err := tt.args.tracker.Track(context.Background(), pin); err != nil { - t.Errorf("PinTracker.Track() error = %v", err) - } - time.Sleep(1 * time.Second) - } - got := tt.args.tracker.Status(context.Background(), tt.args.c) if got.Cid.String() != tt.want.Cid.String() { @@ -496,231 +399,9 @@ func TestPinTracker_Status(t *testing.T) { } } -func TestPinTracker_SyncAll(t *testing.T) { - type args struct { - cs []cid.Cid - tracker ipfscluster.PinTracker - } - tests := []struct { - name string - args args - want []*api.PinInfo - wantErr bool - }{ - { - "basic stateless syncall", - args{ - []cid.Cid{ - test.Cid1, - test.Cid2, - }, - testStatelessPinTracker(t), - }, - []*api.PinInfo{ - { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - { - Cid: test.Cid2, - Status: api.TrackerStatusPinned, - }, - }, - false, - }, - { - "basic map syncall", - args{ - []cid.Cid{ - test.Cid1, - test.Cid2, - }, - testMapPinTracker(t), - }, - []*api.PinInfo{ - { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - { - Cid: test.Cid2, - Status: api.TrackerStatusPinned, - }, - }, - false, - }, - { - "slow stateless syncall", - args{ - []cid.Cid{ - test.Cid1, - test.Cid2, - }, - testSlowStatelessPinTracker(t), - }, - []*api.PinInfo{ - { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - { - Cid: test.Cid2, - Status: api.TrackerStatusPinned, - }, - }, - false, - }, - { - "slow map syncall", - args{ - []cid.Cid{ - test.Cid1, - test.Cid2, - }, - testSlowMapPinTracker(t), - }, - []*api.PinInfo{ - { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - { - Cid: test.Cid2, - Status: api.TrackerStatusPinned, - }, - }, - false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := tt.args.tracker.SyncAll(context.Background()) - if (err != nil) != tt.wantErr { - t.Errorf("PinTracker.SyncAll() error = %v, wantErr %v", err, tt.wantErr) - return - } - - if len(got) != 0 { - t.Fatalf("should not have synced anything when it tracks nothing") - } - - for _, c := range tt.args.cs { - err := tt.args.tracker.Track(context.Background(), api.PinWithOpts(c, pinOpts)) - if err != nil { - t.Fatal(err) - } - } - - sortPinInfoByCid(got) - sortPinInfoByCid(tt.want) - - for i := range got { - if got[i].Cid.String() != tt.want[i].Cid.String() { - t.Errorf("PinTracker.SyncAll() = %v, want %v", got, tt.want) - } - - if got[i].Status != tt.want[i].Status { - t.Errorf("PinTracker.SyncAll() = %v, want %v", got, tt.want) - } - } - }) - } -} - -func TestPinTracker_Sync(t *testing.T) { - type args struct { - c cid.Cid - tracker ipfscluster.PinTracker - } - tests := []struct { - name string - args args - want api.PinInfo - wantErr bool - }{ - { - "basic stateless sync", - args{ - test.Cid1, - testStatelessPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - false, - }, - { - "basic map sync", - args{ - test.Cid1, - testMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - false, - }, - { - "slow stateless sync", - args{ - test.Cid1, - testSlowStatelessPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - false, - }, - { - "slow map sync", - args{ - test.Cid1, - testSlowMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - switch tt.args.tracker.(type) { - case *maptracker.MapPinTracker: - // the Track preps the internal map of the MapPinTracker; not required by the Stateless impl - pin := api.PinWithOpts(test.Cid1, pinOpts) - if err := tt.args.tracker.Track(context.Background(), pin); err != nil { - t.Errorf("PinTracker.Track() error = %v", err) - } - time.Sleep(1 * time.Second) - } - - got, err := tt.args.tracker.Sync(context.Background(), tt.args.c) - if (err != nil) != tt.wantErr { - t.Errorf("PinTracker.Sync() error = %v, wantErr %v", err, tt.wantErr) - return - } - - t.Logf("got: %+v\n", got) - if got.Cid.String() != tt.want.Cid.String() { - t.Errorf("PinTracker.Sync() = %v, want %v", got.Cid.String(), tt.want.Cid.String()) - } - - if got.Status != tt.want.Status { - t.Errorf("PinTracker.Sync() = %v, want %v", got.Status, tt.want.Status) - } - }) - } -} - func TestPinTracker_RecoverAll(t *testing.T) { type args struct { tracker ipfscluster.PinTracker - pin *api.Pin // only used by maptracker } tests := []struct { name string @@ -732,7 +413,6 @@ func TestPinTracker_RecoverAll(t *testing.T) { "basic stateless recoverall", args{ testStatelessPinTracker(t), - &api.Pin{}, }, []*api.PinInfo{ { @@ -747,19 +427,9 @@ func TestPinTracker_RecoverAll(t *testing.T) { Cid: test.Cid3, Status: api.TrackerStatusPinned, }, - }, - false, - }, - { - "basic map recoverall", - args{ - testMapPinTracker(t), - api.PinWithOpts(test.Cid1, pinOpts), - }, - []*api.PinInfo{ { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, + Cid: test.Cid4, + Status: api.TrackerStatusRemote, }, }, false, @@ -767,15 +437,6 @@ func TestPinTracker_RecoverAll(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - switch tt.args.tracker.(type) { - case *maptracker.MapPinTracker: - // the Track preps the internal map of the MapPinTracker; not required by the Stateless impl - if err := tt.args.tracker.Track(context.Background(), tt.args.pin); err != nil { - t.Errorf("PinTracker.Track() error = %v", err) - } - time.Sleep(1 * time.Second) - } - got, err := tt.args.tracker.RecoverAll(context.Background()) if (err != nil) != tt.wantErr { t.Errorf("PinTracker.RecoverAll() error = %v, wantErr %v", err, tt.wantErr) @@ -829,18 +490,6 @@ func TestPinTracker_Recover(t *testing.T) { }, false, }, - { - "basic map recover", - args{ - test.Cid1, - testMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - false, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -880,18 +529,6 @@ func TestUntrackTrack(t *testing.T) { }, false, }, - { - "basic map untrack track", - args{ - test.Cid1, - testMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - false, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -933,18 +570,6 @@ func TestTrackUntrackWithCancel(t *testing.T) { }, false, }, - { - "slow map tracker untrack w/ cancel", - args{ - test.SlowCid1, - testSlowMapPinTracker(t), - }, - api.PinInfo{ - Cid: test.SlowCid1, - Status: api.TrackerStatusPinned, - }, - false, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -970,15 +595,13 @@ func TestTrackUntrackWithCancel(t *testing.T) { }() var ctx context.Context switch trkr := tt.args.tracker.(type) { - case *maptracker.MapPinTracker: - ctx = trkr.OpContext(context.Background(), tt.args.c) case *stateless.Tracker: ctx = trkr.OpContext(context.Background(), tt.args.c) } select { case <-ctx.Done(): return - case <-time.Tick(100 * time.Millisecond): + case <-time.Tick(150 * time.Millisecond): t.Errorf("operation context should have been cancelled by now") } } else { @@ -1003,34 +626,14 @@ func TestPinTracker_RemoteIgnoresError(t *testing.T) { t.Fatal(err) } - // Sync triggers IPFSPinLs which will return an error - // (see mock) - pi, err := pt.Sync(ctx, remoteCid) - if err != nil { - t.Fatal(err) - } - - if pi.Status != api.TrackerStatusRemote || pi.Error != "" { - t.Error("Remote pin should not be in error") - } - - pi = pt.Status(ctx, remoteCid) - if err != nil { - t.Fatal(err) - } - + pi := pt.Status(ctx, remoteCid) if pi.Status != api.TrackerStatusRemote || pi.Error != "" { t.Error("Remote pin should not be in error") } } - t.Run("basic pintracker", func(t *testing.T) { - pt := testMapPinTracker(t) - testF(t, pt) - }) - t.Run("stateless pintracker", func(t *testing.T) { - pt := testStatelessPinTracker(t) + pt := testSlowStatelessPinTracker(t) testF(t, pt) }) } diff --git a/pintracker/stateless/stateless.go b/pintracker/stateless/stateless.go index dba6dc8b..879341f4 100644 --- a/pintracker/stateless/stateless.go +++ b/pintracker/stateless/stateless.go @@ -11,7 +11,7 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/pintracker/optracker" - "github.com/ipfs/ipfs-cluster/pintracker/util" + "github.com/ipfs/ipfs-cluster/state" cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" @@ -23,6 +23,11 @@ import ( var logger = logging.Logger("pintracker") +var ( + // ErrFullQueue is the error used when pin or unpin operation channel is full. + ErrFullQueue = errors.New("pin/unpin operation queue is full. Try increasing max_pin_queue_size") +) + // Tracker uses the optracker.OperationTracker to manage // transitioning shared ipfs-cluster state (Pins) to the local IPFS node. type Tracker struct { @@ -30,11 +35,14 @@ type Tracker struct { optracker *optracker.OperationTracker - peerID peer.ID + peerID peer.ID + peerName string ctx context.Context cancel func() + getState func(ctx context.Context) (state.ReadOnly, error) + rpcClient *rpc.Client rpcReady chan struct{} @@ -47,14 +55,16 @@ type Tracker struct { } // New creates a new StatelessPinTracker. -func New(cfg *Config, pid peer.ID, peerName string) *Tracker { +func New(cfg *Config, pid peer.ID, peerName string, getState func(ctx context.Context) (state.ReadOnly, error)) *Tracker { ctx, cancel := context.WithCancel(context.Background()) spt := &Tracker{ config: cfg, peerID: pid, + peerName: peerName, ctx: ctx, cancel: cancel, + getState: getState, optracker: optracker.NewOperationTracker(ctx, pid, peerName), rpcReady: make(chan struct{}, 1), pinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize), @@ -71,13 +81,8 @@ func New(cfg *Config, pid peer.ID, peerName string) *Tracker { // receives a pin Function (pin or unpin) and a channel. // Used for both pinning and unpinning func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, opChan chan *optracker.Operation) { - logger.Debug("entering opworker") - ticker := time.NewTicker(10 * time.Second) //TODO(ajl): make config var for { select { - case <-ticker.C: - // every tick, clear out all Done operations - spt.optracker.CleanAllDone(spt.ctx) case op := <-opChan: if cont := applyPinF(pinF, op); cont { continue @@ -160,7 +165,7 @@ func (spt *Tracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.Opera logger.Debugf("entering enqueue: pin: %+v", c) op := spt.optracker.TrackNewOperation(ctx, c, typ, optracker.PhaseQueued) if op == nil { - return nil // ongoing pin operation. + return nil // ongoing operation. } var ch chan *optracker.Operation @@ -170,14 +175,12 @@ func (spt *Tracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.Opera ch = spt.pinCh case optracker.OperationUnpin: ch = spt.unpinCh - default: - return errors.New("operation doesn't have a associated channel") } select { case ch <- op: default: - err := util.ErrFullQueue + err := ErrFullQueue op.SetError(err) op.Cancel() logger.Error(err.Error()) @@ -226,9 +229,8 @@ func (spt *Tracker) Track(ctx context.Context, c *api.Pin) error { // Sharded pins are never pinned. A sharded pin cannot turn into // something else or viceversa like it happens with Remote pins so - // we just track them. + // we just ignore them. if c.Type == api.MetaType { - spt.optracker.TrackNewOperation(ctx, c, optracker.OperationShard, optracker.PhaseDone) return nil } @@ -244,9 +246,11 @@ func (spt *Tracker) Track(ctx context.Context, c *api.Pin) error { op.Cancel() if err != nil { op.SetError(err) - } else { - op.SetPhase(optracker.PhaseDone) + return nil } + + op.SetPhase(optracker.PhaseDone) + spt.optracker.Clean(ctx, op) return nil } @@ -270,13 +274,11 @@ func (spt *Tracker) StatusAll(ctx context.Context) []*api.PinInfo { pininfos, err := spt.localStatus(ctx, true) if err != nil { - logger.Error(err) return nil } - // get all inflight operations from optracker and - // put them into the map, deduplicating any already 'pinned' items with - // their inflight operation + // get all inflight operations from optracker and put them into the + // map, deduplicating any existing items with their inflight operation. for _, infop := range spt.optracker.GetAll(ctx) { pininfos[infop.Cid.String()] = infop } @@ -299,59 +301,50 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo { return oppi } + pinInfo := &api.PinInfo{ + Cid: c, + Peer: spt.peerID, + PeerName: spt.peerName, + TS: time.Now(), + } + // check global state to see if cluster should even be caring about // the provided cid - var gpin api.Pin - err := spt.rpcClient.Call( - "", - "Cluster", - "PinGet", - c, - &gpin, - ) + gpin := &api.Pin{} + st, err := spt.getState(ctx) if err != nil { - if rpc.IsRPCError(err) { - logger.Error(err) - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusClusterError, - Error: err.Error(), - TS: time.Now(), - } - } - // not part of global state. we should not care about - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusUnpinned, - TS: time.Now(), - } + logger.Error(err) + addError(pinInfo, err) + return pinInfo + } + + gpin, err = st.Get(ctx, c) + if err == state.ErrNotFound { + pinInfo.Status = api.TrackerStatusUnpinned + return pinInfo + } + if err != nil { + logger.Error(err) + addError(pinInfo, err) + return pinInfo } // check if pin is a meta pin if gpin.Type == api.MetaType { - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusSharded, - TS: time.Now(), - } + pinInfo.Status = api.TrackerStatusSharded + return pinInfo } // check if pin is a remote pin if gpin.IsRemotePin(spt.peerID) { - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusRemote, - TS: time.Now(), - } + pinInfo.Status = api.TrackerStatusRemote + return pinInfo } // else attempt to get status from ipfs node var ips api.IPFSPinStatus - err = spt.rpcClient.Call( + err = spt.rpcClient.CallContext( + ctx, "", "IPFSConnector", "PinLsCid", @@ -360,137 +353,12 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo { ) if err != nil { logger.Error(err) - return nil + addError(pinInfo, err) + return pinInfo } - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: ips.ToTrackerStatus(), - TS: time.Now(), - } -} - -// SyncAll verifies that the statuses of all tracked Cids (from the shared state) -// match the one reported by the IPFS daemon. If not, they will be transitioned -// to PinError or UnpinError. -// -// SyncAll returns the list of local status for all tracked Cids which -// were updated or have errors. Cids in error states can be recovered -// with Recover(). -// An error is returned if we are unable to contact the IPFS daemon. -func (spt *Tracker) SyncAll(ctx context.Context) ([]*api.PinInfo, error) { - ctx, span := trace.StartSpan(ctx, "tracker/stateless/SyncAll") - defer span.End() - - // get ipfs status for all - localpis, err := spt.localStatus(ctx, false) - if err != nil { - logger.Error(err) - return nil, err - } - - for _, p := range spt.optracker.Filter(ctx, optracker.OperationPin, optracker.PhaseError) { - if _, ok := localpis[p.Cid.String()]; ok { - spt.optracker.CleanError(ctx, p.Cid) - } - } - - for _, p := range spt.optracker.Filter(ctx, optracker.OperationUnpin, optracker.PhaseError) { - if _, ok := localpis[p.Cid.String()]; !ok { - spt.optracker.CleanError(ctx, p.Cid) - } - } - - return spt.getErrorsAll(ctx), nil -} - -// Sync returns the updated local status for the given Cid. -func (spt *Tracker) Sync(ctx context.Context, c cid.Cid) (*api.PinInfo, error) { - ctx, span := trace.StartSpan(ctx, "tracker/stateless/Sync") - defer span.End() - - oppi, ok := spt.optracker.GetExists(ctx, c) - if !ok { - return spt.Status(ctx, c), nil - } - - if oppi.Status == api.TrackerStatusUnpinError { - // check global state to see if cluster should even be caring about - // the provided cid - var gpin api.Pin - err := spt.rpcClient.Call( - "", - "Cluster", - "PinGet", - c, - &gpin, - ) - if err != nil { - if rpc.IsRPCError(err) { - logger.Error(err) - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusClusterError, - Error: err.Error(), - TS: time.Now(), - }, err - } - // it isn't in the global state - spt.optracker.CleanError(ctx, c) - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusUnpinned, - TS: time.Now(), - }, nil - } - // check if pin is a remote pin - if gpin.IsRemotePin(spt.peerID) { - spt.optracker.CleanError(ctx, c) - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusRemote, - TS: time.Now(), - }, nil - } - } - - if oppi.Status == api.TrackerStatusPinError { - // else attempt to get status from ipfs node - var ips api.IPFSPinStatus - err := spt.rpcClient.Call( - "", - "IPFSConnector", - "PinLsCid", - c, - &ips, - ) - if err != nil { - logger.Error(err) - return &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: api.TrackerStatusPinError, - TS: time.Now(), - Error: err.Error(), - }, err - } - if ips.ToTrackerStatus() == api.TrackerStatusPinned { - spt.optracker.CleanError(ctx, c) - pi := &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: ips.ToTrackerStatus(), - TS: time.Now(), - } - return pi, nil - } - } - - return spt.optracker.Get(ctx, c), nil + pinInfo.Status = ips.ToTrackerStatus() + return pinInfo } // RecoverAll attempts to recover all items tracked by this peer. @@ -563,10 +431,11 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[string]*api.PinInfo, continue } p := &api.PinInfo{ - Cid: c, - Peer: spt.peerID, - Status: ips.ToTrackerStatus(), - TS: time.Now(), + Cid: c, + Peer: spt.peerID, + PeerName: spt.peerName, + Status: ips.ToTrackerStatus(), + TS: time.Now(), } pins[cidstr] = p } @@ -575,23 +444,21 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[string]*api.PinInfo, // localStatus returns a joint set of consensusState and ipfsStatus // marking pins which should be meta or remote and leaving any ipfs pins that -// aren't in the consensusState out. +// aren't in the consensusState out. If incExtra is true, Remote and Sharded +// pins will be added to the status slice. func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string]*api.PinInfo, error) { ctx, span := trace.StartSpan(ctx, "tracker/stateless/localStatus") defer span.End() - pininfos := make(map[string]*api.PinInfo) - // get shared state - var statePins []*api.Pin - err := spt.rpcClient.CallContext( - ctx, - "", - "Cluster", - "Pins", - struct{}{}, - &statePins, - ) + statePins := []*api.Pin{} + st, err := spt.getState(ctx) + if err != nil { + logger.Error(err) + return nil, err + } + + statePins, err = st.List(ctx) if err != nil { logger.Error(err) return nil, err @@ -604,32 +471,41 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string] return nil, err } + pininfos := make(map[string]*api.PinInfo, len(statePins)) for _, p := range statePins { pCid := p.Cid.String() - if p.Type == api.MetaType && incExtra { - // add pin to pininfos with sharded status - pininfos[pCid] = &api.PinInfo{ - Cid: p.Cid, - Peer: spt.peerID, - Status: api.TrackerStatusSharded, - TS: time.Now(), - } - continue + ipfsInfo, pinnedInIpfs := localpis[pCid] + // base pinInfo object - status to be filled. + pinInfo := &api.PinInfo{ + Cid: p.Cid, + Peer: spt.peerID, + PeerName: spt.peerName, + TS: time.Now(), } - if p.IsRemotePin(spt.peerID) && incExtra { - // add pin to pininfos with a status of remote - pininfos[pCid] = &api.PinInfo{ - Cid: p.Cid, - Peer: spt.peerID, - Status: api.TrackerStatusRemote, - TS: time.Now(), + switch { + case p.Type == api.MetaType: + pinInfo.Status = api.TrackerStatusSharded + if incExtra { + pininfos[pCid] = pinInfo } - continue - } - // lookup p in localpis - if lp, ok := localpis[pCid]; ok { - pininfos[pCid] = lp + case p.IsRemotePin(spt.peerID): + pinInfo.Status = api.TrackerStatusRemote + if incExtra { + pininfos[pCid] = pinInfo + } + case pinnedInIpfs: + pininfos[pCid] = ipfsInfo + default: + // report as undefined for this peer. this will be + // overwritten if the operation tracker has more info + // for this. Otherwise, this is a problem: a pin in + // the state that should be pinned by this peer but + // which no operation is handling. + + // TODO (hector): Consider a pinError so it can be + // recovered? + pinInfo.Status = api.TrackerStatusUndefined } } return pininfos, nil @@ -644,3 +520,8 @@ func (spt *Tracker) getErrorsAll(ctx context.Context) []*api.PinInfo { func (spt *Tracker) OpContext(ctx context.Context, c cid.Cid) context.Context { return spt.optracker.OpContext(ctx, c) } + +func addError(pinInfo *api.PinInfo, err error) { + pinInfo.Error = err.Error() + pinInfo.Status = api.TrackerStatusClusterError +} diff --git a/pintracker/stateless/stateless_test.go b/pintracker/stateless/stateless_test.go index f6b186fa..a8123c61 100644 --- a/pintracker/stateless/stateless_test.go +++ b/pintracker/stateless/stateless_test.go @@ -8,6 +8,9 @@ import ( "time" "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/datastore/inmem" + "github.com/ipfs/ipfs-cluster/state" + "github.com/ipfs/ipfs-cluster/state/dsstate" "github.com/ipfs/ipfs-cluster/test" cid "github.com/ipfs/go-cid" @@ -25,18 +28,13 @@ var ( } ) -type mockCluster struct{} - type mockIPFS struct{} func mockRPCClient(t *testing.T) *rpc.Client { s := rpc.NewServer(nil, "mock") c := rpc.NewClientWithServer(nil, "mock", s) - err := s.RegisterName("Cluster", &mockCluster{}) - if err != nil { - t.Fatal(err) - } - err = s.RegisterName("IPFSConnector", &mockIPFS{}) + + err := s.RegisterName("IPFSConnector", &mockIPFS{}) if err != nil { t.Fatal(err) } @@ -81,40 +79,38 @@ func (mock *mockIPFS) PinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPin return nil } -func (mock *mockCluster) Pins(ctx context.Context, in struct{}, out *[]*api.Pin) error { - *out = []*api.Pin{ - api.PinWithOpts(test.Cid1, pinOpts), - api.PinWithOpts(test.Cid3, pinOpts), - } - return nil -} - -func (mock *mockCluster) PinGet(ctx context.Context, in cid.Cid, out *api.Pin) error { - switch in.String() { - case test.ErrorCid.String(): - return errors.New("expected error when using ErrorCid") - case test.Cid1.String(), test.Cid2.String(): - *out = *api.PinWithOpts(in, pinOpts) - return nil - default: - return errors.New("not found") - } -} - func testSlowStatelessPinTracker(t *testing.T) *Tracker { + t.Helper() + cfg := &Config{} cfg.Default() cfg.ConcurrentPins = 1 - mpt := New(cfg, test.PeerID1, test.PeerName1) - mpt.SetClient(mockRPCClient(t)) - return mpt + st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle()) + if err != nil { + t.Fatal(err) + } + getState := func(ctx context.Context) (state.ReadOnly, error) { + return st, nil + } + spt := New(cfg, test.PeerID1, test.PeerName1, getState) + spt.SetClient(mockRPCClient(t)) + return spt } func testStatelessPinTracker(t testing.TB) *Tracker { + t.Helper() + cfg := &Config{} cfg.Default() cfg.ConcurrentPins = 1 - spt := New(cfg, test.PeerID1, test.PeerName1) + st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle()) + if err != nil { + t.Fatal(err) + } + getState := func(ctx context.Context) (state.ReadOnly, error) { + return st, nil + } + spt := New(cfg, test.PeerID1, test.PeerName1, getState) spt.SetClient(test.NewMockRPCClient(t)) return spt } @@ -369,102 +365,6 @@ var sortPinInfoByCid = func(p []*api.PinInfo) { }) } -func TestStatelessTracker_SyncAll(t *testing.T) { - type args struct { - cs []cid.Cid - tracker *Tracker - } - tests := []struct { - name string - args args - want []*api.PinInfo - wantErr bool - }{ - { - "basic stateless syncall", - args{ - []cid.Cid{ - test.Cid1, - test.Cid2, - }, - testStatelessPinTracker(t), - }, - []*api.PinInfo{ - { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - { - Cid: test.Cid2, - Status: api.TrackerStatusPinned, - }, - }, - false, - }, - { - "slow stateless syncall", - args{ - []cid.Cid{ - test.Cid1, - test.Cid2, - }, - testSlowStatelessPinTracker(t), - }, - []*api.PinInfo{ - { - Cid: test.Cid1, - Status: api.TrackerStatusPinned, - }, - { - Cid: test.Cid2, - Status: api.TrackerStatusPinned, - }, - }, - false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := tt.args.tracker.SyncAll(context.Background()) - if (err != nil) != tt.wantErr { - t.Errorf("PinTracker.SyncAll() error = %v, wantErr %v", err, tt.wantErr) - return - } - - if len(got) != 0 { - t.Fatalf("should not have synced anything when it tracks nothing") - } - - for _, c := range tt.args.cs { - err := tt.args.tracker.Track(context.Background(), api.PinWithOpts(c, pinOpts)) - if err != nil { - t.Fatal(err) - } - tt.args.tracker.optracker.SetError(context.Background(), c, errors.New("test error")) - } - - got, err = tt.args.tracker.SyncAll(context.Background()) - if (err != nil) != tt.wantErr { - t.Errorf("PinTracker.SyncAll() error = %v, wantErr %v", err, tt.wantErr) - return - } - - sortPinInfoByCid(got) - sortPinInfoByCid(tt.want) - - for i := range got { - if got[i].Cid.String() != tt.want[i].Cid.String() { - t.Errorf("got: %v\n want %v", got[i].Cid.String(), tt.want[i].Cid.String()) - } - - if got[i].Status != tt.want[i].Status { - t.Errorf("got: %v\n want %v", got[i].Status, tt.want[i].Status) - } - } - }) - } -} - func BenchmarkTracker_localStatus(b *testing.B) { tracker := testStatelessPinTracker(b) b.ResetTimer() diff --git a/pintracker/util/pin.go b/pintracker/util/pin.go deleted file mode 100644 index c8ed2cfe..00000000 --- a/pintracker/util/pin.go +++ /dev/null @@ -1,29 +0,0 @@ -package util - -import ( - "errors" - - "github.com/ipfs/ipfs-cluster/api" - - peer "github.com/libp2p/go-libp2p-core/peer" -) - -var ( - // ErrFullQueue is the error used when pin or unpin operation channel is full. - ErrFullQueue = errors.New("pin/unpin operation queue is full (too many operations), increasing max_pin_queue_size would help") -) - -// IsRemotePin determines whether a Pin's ReplicationFactor has -// been met, so as to either pin or unpin it from the peer. -func IsRemotePin(c *api.Pin, pid peer.ID) bool { - if c.ReplicationFactorMax < 0 { - return false - } - - for _, p := range c.Allocations { - if p == pid { - return false - } - } - return true -} diff --git a/rpc_api.go b/rpc_api.go index f6254478..dfaeb5f0 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -295,46 +295,6 @@ func (rpcapi *ClusterRPCAPI) StatusLocal(ctx context.Context, in cid.Cid, out *a return nil } -// SyncAll runs Cluster.SyncAll(). -func (rpcapi *ClusterRPCAPI) SyncAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { - pinfos, err := rpcapi.c.SyncAll(ctx) - if err != nil { - return err - } - *out = pinfos - return nil -} - -// SyncAllLocal runs Cluster.SyncAllLocal(). -func (rpcapi *ClusterRPCAPI) SyncAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error { - pinfos, err := rpcapi.c.SyncAllLocal(ctx) - if err != nil { - return err - } - *out = pinfos - return nil -} - -// Sync runs Cluster.Sync(). -func (rpcapi *ClusterRPCAPI) Sync(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { - pinfo, err := rpcapi.c.Sync(ctx, in) - if err != nil { - return err - } - *out = *pinfo - return nil -} - -// SyncLocal runs Cluster.SyncLocal(). -func (rpcapi *ClusterRPCAPI) SyncLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { - pinfo, err := rpcapi.c.SyncLocal(ctx, in) - if err != nil { - return err - } - *out = *pinfo - return nil -} - // RecoverAll runs Cluster.RecoverAll(). func (rpcapi *ClusterRPCAPI) RecoverAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { pinfos, err := rpcapi.c.RecoverAll(ctx) diff --git a/rpc_policy.go b/rpc_policy.go index e784d3c8..f641930d 100644 --- a/rpc_policy.go +++ b/rpc_policy.go @@ -30,10 +30,6 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{ "Cluster.StatusAll": RPCClosed, "Cluster.StatusAllLocal": RPCClosed, "Cluster.StatusLocal": RPCClosed, - "Cluster.Sync": RPCClosed, - "Cluster.SyncAll": RPCClosed, - "Cluster.SyncAllLocal": RPCTrusted, // Called in broadcast from SyncAll() - "Cluster.SyncLocal": RPCTrusted, // Called in broadcast from Sync() "Cluster.Unpin": RPCClosed, "Cluster.UnpinPath": RPCClosed, "Cluster.Version": RPCOpen, diff --git a/rpcutil/policygen/policygen.go b/rpcutil/policygen/policygen.go index b0443efb..95d74c0e 100644 --- a/rpcutil/policygen/policygen.go +++ b/rpcutil/policygen/policygen.go @@ -27,8 +27,6 @@ var comments = map[string]string{ "Cluster.PeerAdd": "Used by Join()", "Cluster.Peers": "Used by ConnectGraph()", "Cluster.Pins": "Used in stateless tracker, ipfsproxy, restapi", - "Cluster.SyncAllLocal": "Called in broadcast from SyncAll()", - "Cluster.SyncLocal": "Called in broadcast from Sync()", "PinTracker.Recover": "Called in broadcast from Recover()", "PinTracker.RecoverAll": "Broadcast in RecoverAll unimplemented", "Pintracker.Status": "Called in broadcast from Status()", diff --git a/sharness/config/basic_auth/service.json b/sharness/config/basic_auth/service.json index 5945cbdc..09c9632e 100644 --- a/sharness/config/basic_auth/service.json +++ b/sharness/config/basic_auth/service.json @@ -5,7 +5,6 @@ "leave_on_shutdown": false, "listen_multiaddress": "/ip4/0.0.0.0/tcp/9096", "state_sync_interval": "1m0s", - "ipfs_sync_interval": "2m10s", "replication_factor": -1, "monitor_ping_interval": "15s" }, @@ -69,10 +68,6 @@ } }, "pin_tracker": { - "maptracker": { - "max_pin_queue_size": 50000, - "concurrent_pins": 10 - }, "stateless": { "max_pin_queue_size": 50000, "concurrent_pins": 10 diff --git a/sharness/config/ssl-basic_auth/service.json b/sharness/config/ssl-basic_auth/service.json index 4fb90df0..6013ec97 100644 --- a/sharness/config/ssl-basic_auth/service.json +++ b/sharness/config/ssl-basic_auth/service.json @@ -5,7 +5,6 @@ "leave_on_shutdown": false, "listen_multiaddress": "/ip4/0.0.0.0/tcp/9096", "state_sync_interval": "1m0s", - "ipfs_sync_interval": "2m10s", "replication_factor": -1, "monitor_ping_interval": "15s" }, @@ -69,10 +68,6 @@ } }, "pin_tracker": { - "maptracker": { - "max_pin_queue_size": 50000, - "concurrent_pins": 10 - }, "stateless": { "max_pin_queue_size": 50000, "concurrent_pins": 10 diff --git a/sharness/config/ssl/service.json b/sharness/config/ssl/service.json index 6dca5856..1484ceb6 100644 --- a/sharness/config/ssl/service.json +++ b/sharness/config/ssl/service.json @@ -7,7 +7,6 @@ "leave_on_shutdown": false, "listen_multiaddress": "/ip4/0.0.0.0/tcp/9096", "state_sync_interval": "1m0s", - "ipfs_sync_interval": "2m10s", "replication_factor": -1, "monitor_ping_interval": "15s" }, @@ -68,10 +67,6 @@ } }, "pin_tracker": { - "maptracker": { - "max_pin_queue_size": 50000, - "concurrent_pins": 10 - }, "stateless": { "max_pin_queue_size": 50000, "concurrent_pins": 10 diff --git a/sharness/t0010-ctl-basic-commands.sh b/sharness/t0010-ctl-basic-commands.sh index 7241b7c4..d4aeea6f 100755 --- a/sharness/t0010-ctl-basic-commands.sh +++ b/sharness/t0010-ctl-basic-commands.sh @@ -39,7 +39,6 @@ test_expect_success "cluster-ctl commands output looks good" ' egrep -q "ipfs-cluster-ctl peers" commands.txt && egrep -q "ipfs-cluster-ctl pin" commands.txt && egrep -q "ipfs-cluster-ctl status" commands.txt && - egrep -q "ipfs-cluster-ctl sync" commands.txt && egrep -q "ipfs-cluster-ctl recover" commands.txt && egrep -q "ipfs-cluster-ctl version" commands.txt && egrep -q "ipfs-cluster-ctl commands" commands.txt diff --git a/sharness/t0025-ctl-status-report-commands.sh b/sharness/t0025-ctl-status-report-commands.sh index 63edf90a..4b2b978b 100755 --- a/sharness/t0025-ctl-status-report-commands.sh +++ b/sharness/t0025-ctl-status-report-commands.sh @@ -41,10 +41,6 @@ test_expect_success IPFS,CLUSTER "invalid CID status" ' test_must_fail ipfs-cluster-ctl status XXXinvalid-CIDXXX ' -test_expect_success IPFS,CLUSTER "empty cluster-ctl sync succeeds" ' - ipfs-cluster-ctl sync -' - test_expect_success IPFS,CLUSTER "empty cluster_ctl recover should not fail" ' ipfs-cluster-ctl recover ' diff --git a/test/cids.go b/test/cids.go index 06e321a8..079d9c96 100644 --- a/test/cids.go +++ b/test/cids.go @@ -11,6 +11,7 @@ var ( Cid2, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmma") Cid3, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmb") Cid4, _ = cid.Decode("zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuc57") + Cid5, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd") Cid4Data = "Cid4Data" // Cid resulting from block put NOT ipfs add SlowCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd") CidResolved, _ = cid.Decode("zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuabc") diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index 0c563890..7aa31db0 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -285,22 +285,6 @@ func (mock *mockCluster) StatusLocal(ctx context.Context, in cid.Cid, out *api.P return (&mockPinTracker{}).Status(ctx, in, out) } -func (mock *mockCluster) SyncAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { - return mock.StatusAll(ctx, in, out) -} - -func (mock *mockCluster) SyncAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error { - return mock.StatusAllLocal(ctx, in, out) -} - -func (mock *mockCluster) Sync(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { - return mock.Status(ctx, in, out) -} - -func (mock *mockCluster) SyncLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { - return mock.StatusLocal(ctx, in, out) -} - func (mock *mockCluster) RecoverAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { return mock.StatusAll(ctx, in, out) }