Merge branch 'master' into feat/alerts

This commit is contained in:
Kishan Mohanbhai Sagathiya 2019-12-13 10:22:06 +05:30
commit d21860eee7
39 changed files with 270 additions and 2887 deletions

View File

@ -30,9 +30,6 @@ jobs:
- name: "Main Tests with raft consensus"
script:
- travis_wait go test -v -timeout 15m -failfast -consensus raft .
- name: "Main Tests with stateless tracker"
script:
- travis_wait go test -v -timeout 15m -failfast -tracker stateless .
- name: "Golint and go vet"
script:
- go get -u golang.org/x/lint/golint

View File

@ -86,17 +86,6 @@ type Client interface {
// StatusAll gathers Status() for all tracked items.
StatusAll(ctx context.Context, filter api.TrackerStatus, local bool) ([]*api.GlobalPinInfo, error)
// Sync makes sure the state of a Cid corresponds to the state reported
// by the ipfs daemon, and returns it. If local is true, this operation
// only happens on the current peer, otherwise it happens on every
// cluster peer.
Sync(ctx context.Context, ci cid.Cid, local bool) (*api.GlobalPinInfo, error)
// SyncAll triggers Sync() operations for all tracked items. It only
// returns informations for items that were de-synced or have an error
// state. If local is true, the operation is limited to the current
// peer. Otherwise it happens on every cluster peer.
SyncAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error)
// Recover retriggers pin or unpin ipfs operations for a Cid in error
// state. If local is true, the operation is limited to the current
// peer, otherwise it happens on every cluster peer.

View File

@ -270,37 +270,6 @@ func (lc *loadBalancingClient) StatusAll(ctx context.Context, filter api.Tracker
return pinInfos, err
}
// Sync makes sure the state of a Cid corresponds to the state reported by
// the ipfs daemon, and returns it. If local is true, this operation only
// happens on the current peer, otherwise it happens on every cluster peer.
func (lc *loadBalancingClient) Sync(ctx context.Context, ci cid.Cid, local bool) (*api.GlobalPinInfo, error) {
var pinInfo *api.GlobalPinInfo
call := func(c Client) error {
var err error
pinInfo, err = c.Sync(ctx, ci, local)
return err
}
err := lc.retry(0, call)
return pinInfo, err
}
// SyncAll triggers Sync() operations for all tracked items. It only returns
// informations for items that were de-synced or have an error state. If
// local is true, the operation is limited to the current peer. Otherwise
// it happens on every cluster peer.
func (lc *loadBalancingClient) SyncAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error) {
var pinInfos []*api.GlobalPinInfo
call := func(c Client) error {
var err error
pinInfos, err = c.SyncAll(ctx, local)
return err
}
err := lc.retry(0, call)
return pinInfos, err
}
// Recover retriggers pin or unpin ipfs operations for a Cid in error state.
// If local is true, the operation is limited to the current peer, otherwise
// it happens on every cluster peer.

View File

@ -250,38 +250,6 @@ func (c *defaultClient) StatusAll(ctx context.Context, filter api.TrackerStatus,
return gpis, err
}
// Sync makes sure the state of a Cid corresponds to the state reported by
// the ipfs daemon, and returns it. If local is true, this operation only
// happens on the current peer, otherwise it happens on every cluster peer.
func (c *defaultClient) Sync(ctx context.Context, ci cid.Cid, local bool) (*api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "client/Sync")
defer span.End()
var gpi api.GlobalPinInfo
err := c.do(
ctx,
"POST",
fmt.Sprintf("/pins/%s/sync?local=%t", ci.String(), local),
nil,
nil,
&gpi,
)
return &gpi, err
}
// SyncAll triggers Sync() operations for all tracked items. It only returns
// informations for items that were de-synced or have an error state. If
// local is true, the operation is limited to the current peer. Otherwise
// it happens on every cluster peer.
func (c *defaultClient) SyncAll(ctx context.Context, local bool) ([]*api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "client/SyncAll")
defer span.End()
var gpis []*api.GlobalPinInfo
err := c.do(ctx, "POST", fmt.Sprintf("/pins/sync?local=%t", local), nil, nil, &gpis)
return gpis, err
}
// Recover retriggers pin or unpin ipfs operations for a Cid in error state.
// If local is true, the operation is limited to the current peer, otherwise
// it happens on every cluster peer.

View File

@ -376,43 +376,6 @@ func TestStatusAll(t *testing.T) {
testClients(t, api, testF)
}
func TestSync(t *testing.T) {
ctx := context.Background()
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c Client) {
pin, err := c.Sync(ctx, test.Cid1, false)
if err != nil {
t.Fatal(err)
}
if !pin.Cid.Equals(test.Cid1) {
t.Error("should be same pin")
}
}
testClients(t, api, testF)
}
func TestSyncAll(t *testing.T) {
ctx := context.Background()
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c Client) {
pins, err := c.SyncAll(ctx, false)
if err != nil {
t.Fatal(err)
}
if len(pins) == 0 {
t.Error("there should be some pins")
}
}
testClients(t, api, testF)
}
func TestRecover(t *testing.T) {
ctx := context.Background()
api := testAPI(t)

View File

@ -409,18 +409,6 @@ func (api *API) routes() []route {
"/pins",
api.statusAllHandler,
},
{
"Sync",
"POST",
"/pins/{hash}/sync",
api.syncHandler,
},
{
"SyncAll",
"POST",
"/pins/sync",
api.syncAllHandler,
},
{
"Recover",
"POST",
@ -997,66 +985,6 @@ func (api *API) statusHandler(w http.ResponseWriter, r *http.Request) {
}
}
func (api *API) syncAllHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
local := queryValues.Get("local")
if local == "true" {
var pinInfos []*types.PinInfo
err := api.rpcClient.CallContext(
r.Context(),
"",
"Cluster",
"SyncAllLocal",
struct{}{},
&pinInfos,
)
api.sendResponse(w, autoStatus, err, pinInfosToGlobal(pinInfos))
} else {
var pinInfos []*types.GlobalPinInfo
err := api.rpcClient.CallContext(
r.Context(),
"",
"Cluster",
"SyncAll",
struct{}{},
&pinInfos,
)
api.sendResponse(w, autoStatus, err, pinInfos)
}
}
func (api *API) syncHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
local := queryValues.Get("local")
if pin := api.parseCidOrError(w, r); pin != nil {
if local == "true" {
var pinInfo types.PinInfo
err := api.rpcClient.CallContext(
r.Context(),
"",
"Cluster",
"SyncLocal",
pin.Cid,
&pinInfo,
)
api.sendResponse(w, autoStatus, err, pinInfoToGlobal(&pinInfo))
} else {
var pinInfo types.GlobalPinInfo
err := api.rpcClient.CallContext(
r.Context(),
"",
"Cluster",
"Sync",
pin.Cid,
&pinInfo,
)
api.sendResponse(w, autoStatus, err, pinInfo)
}
}
}
func (api *API) recoverAllHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
local := queryValues.Get("local")

View File

@ -951,72 +951,6 @@ func TestAPIStatusEndpoint(t *testing.T) {
testBothEndpoints(t, tf)
}
func TestAPISyncAllEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url urlF) {
var resp []*api.GlobalPinInfo
makePost(t, rest, url(rest)+"/pins/sync", []byte{}, &resp)
if len(resp) != 3 ||
!resp[0].Cid.Equals(test.Cid1) ||
resp[1].PeerMap[peer.IDB58Encode(test.PeerID1)].Status.String() != "pinning" {
t.Errorf("unexpected syncAll resp:\n %+v", resp)
}
// Test local=true
var resp2 []*api.GlobalPinInfo
makePost(t, rest, url(rest)+"/pins/sync?local=true", []byte{}, &resp2)
if len(resp2) != 2 {
t.Errorf("unexpected syncAll+local resp:\n %+v", resp2)
}
}
testBothEndpoints(t, tf)
}
func TestAPISyncEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url urlF) {
var resp api.GlobalPinInfo
makePost(t, rest, url(rest)+"/pins/"+test.Cid1.String()+"/sync", []byte{}, &resp)
if !resp.Cid.Equals(test.Cid1) {
t.Error("expected the same cid")
}
info, ok := resp.PeerMap[peer.IDB58Encode(test.PeerID1)]
if !ok {
t.Fatal("expected info for test.PeerID1")
}
if info.Status.String() != "pinned" {
t.Error("expected different status")
}
// Test local=true
var resp2 api.GlobalPinInfo
makePost(t, rest, url(rest)+"/pins/"+test.Cid1.String()+"/sync?local=true", []byte{}, &resp2)
if !resp2.Cid.Equals(test.Cid1) {
t.Error("expected the same cid")
}
info, ok = resp2.PeerMap[peer.IDB58Encode(test.PeerID2)]
if !ok {
t.Fatal("expected info for test.PeerID2")
}
if info.Status.String() != "pinned" {
t.Error("expected different status")
}
}
testBothEndpoints(t, tf)
}
func TestAPIRecoverEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)

View File

@ -251,13 +251,12 @@ func (c *Cluster) setupRPCClients() {
}
}
// syncWatcher loops and triggers StateSync and SyncAllLocal from time to time
func (c *Cluster) syncWatcher() {
ctx, span := trace.StartSpan(c.ctx, "cluster/syncWatcher")
// watchPinset triggers recurrent operations that loop on the pinset.
func (c *Cluster) watchPinset() {
ctx, span := trace.StartSpan(c.ctx, "cluster/watchPinset")
defer span.End()
stateSyncTicker := time.NewTicker(c.config.StateSyncInterval)
syncTicker := time.NewTicker(c.config.IPFSSyncInterval)
recoverTicker := time.NewTicker(c.config.PinRecoverInterval)
for {
@ -265,14 +264,12 @@ func (c *Cluster) syncWatcher() {
case <-stateSyncTicker.C:
logger.Debug("auto-triggering StateSync()")
c.StateSync(ctx)
case <-syncTicker.C:
logger.Debug("auto-triggering SyncAllLocal()")
c.SyncAllLocal(ctx)
case <-recoverTicker.C:
logger.Debug("auto-triggering RecoverAllLocal()")
c.RecoverAllLocal(ctx)
case <-c.ctx.Done():
stateSyncTicker.Stop()
recoverTicker.Stop()
return
}
}
@ -574,7 +571,7 @@ func (c *Cluster) run() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.syncWatcher()
c.watchPinset()
}()
c.wg.Add(1)
@ -996,113 +993,59 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
return nil
}
// StateSync syncs the consensus state to the Pin Tracker, ensuring
// that every Cid in the shared state is tracked and that the Pin Tracker
// is not tracking more Cids than it should.
// StateSync performs maintenance tasks on the global state that require
// looping through all the items. It is triggered automatically on
// StateSyncInterval. Currently it:
// * Sends unpin for expired items for which this peer is "closest"
// (skipped for follower peers)
func (c *Cluster) StateSync(ctx context.Context) error {
_, span := trace.StartSpan(ctx, "cluster/StateSync")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
if c.config.FollowerMode {
return nil
}
cState, err := c.consensus.State(ctx)
if err != nil {
return err
}
logger.Debug("syncing state to tracker")
timeNow := time.Now()
clusterPins, err := cState.List(ctx)
if err != nil {
return err
}
trackedPins := c.tracker.StatusAll(ctx)
trackedPinsMap := make(map[string]struct{})
for _, tpin := range trackedPins {
trackedPinsMap[tpin.Cid.String()] = struct{}{}
// Only trigger pin operations if we are the closest with respect to
// other trusted peers. We cannot know if our peer ID is trusted by
// other peers in the Cluster. This assumes yes. Setting FollowerMode
// is a way to assume the opposite and skip this completely.
trustedPeers, err := c.getTrustedPeers(ctx)
if err != nil {
return nil
}
// Track items which are not tracked
for _, pin := range clusterPins {
_, tracked := trackedPinsMap[pin.Cid.String()]
if !tracked {
logger.Debugf("StateSync: tracking %s, part of the shared state", pin.Cid)
err = c.tracker.Track(ctx, pin)
if err != nil {
return err
}
}
checker := distanceChecker{
local: c.id,
otherPeers: trustedPeers,
cache: make(map[peer.ID]distance, len(trustedPeers)+1),
}
isClosest := func(cid.Cid) bool {
return false
}
if !c.config.FollowerMode {
trustedPeers, err := c.getTrustedPeers(ctx)
if err != nil {
return nil
}
checker := distanceChecker{
local: c.id,
otherPeers: trustedPeers,
cache: make(map[peer.ID]distance, len(trustedPeers)+1),
}
isClosest = func(c cid.Cid) bool {
return checker.isClosest(c)
}
}
// a. Untrack items which should not be tracked
// b. Unpin items which have expired
// c. Track items which should not be remote as local
// d. Track items which should not be local as remote
for _, p := range trackedPins {
pCid := p.Cid
currentPin, err := cState.Get(ctx, pCid)
if err != nil && err != state.ErrNotFound {
return err
}
if err == state.ErrNotFound {
logger.Debugf("StateSync: untracking %s: not part of shared state", pCid)
c.tracker.Untrack(ctx, pCid)
continue
}
if currentPin.ExpiredAt(timeNow) && isClosest(pCid) {
logger.Infof("Unpinning %s: pin expired at %s", pCid, currentPin.ExpireAt)
if _, err := c.Unpin(ctx, pCid); err != nil {
// Unpin expired items when we are the closest peer to them.
for _, p := range clusterPins {
if p.ExpiredAt(timeNow) && checker.isClosest(p.Cid) {
logger.Infof("Unpinning %s: pin expired at %s", p.Cid, p.ExpireAt)
if _, err := c.Unpin(ctx, p.Cid); err != nil {
logger.Error(err)
}
continue
}
err = c.updateRemotePins(ctx, currentPin, p)
if err != nil {
return err
}
}
return nil
}
func (c *Cluster) updateRemotePins(ctx context.Context, pin *api.Pin, p *api.PinInfo) error {
var err error
allocatedHere := pin.ReplicationFactorMin == -1 || containsPeer(pin.Allocations, c.id)
switch {
case p.Status == api.TrackerStatusRemote && allocatedHere:
logger.Debugf("StateSync: Tracking %s locally (currently remote)", p.Cid)
err = c.tracker.Track(ctx, pin)
case p.Status == api.TrackerStatusPinned && !allocatedHere:
logger.Debugf("StateSync: Tracking %s as remote (currently local)", p.Cid)
err = c.tracker.Track(ctx, pin)
}
return err
}
// StatusAll returns the GlobalPinInfo for all tracked Cids in all peers.
// If an error happens, the slice will contain as much information as
// could be fetched from other peers.
@ -1143,48 +1086,6 @@ func (c *Cluster) StatusLocal(ctx context.Context, h cid.Cid) *api.PinInfo {
return c.tracker.Status(ctx, h)
}
// SyncAll triggers SyncAllLocal() operations in all cluster peers, making sure
// that the state of tracked items matches the state reported by the IPFS daemon
// and returning the results as GlobalPinInfo. If an error happens, the slice
// will contain as much information as could be fetched from the peers.
func (c *Cluster) SyncAll(ctx context.Context) ([]*api.GlobalPinInfo, error) {
_, span := trace.StartSpan(ctx, "cluster/SyncAll")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.globalPinInfoSlice(ctx, "Cluster", "SyncAllLocal")
}
// SyncAllLocal makes sure that the current state for all tracked items
// in this peer matches the state reported by the IPFS daemon.
//
// SyncAllLocal returns the list of PinInfo that where updated because of
// the operation, along with those in error states.
func (c *Cluster) SyncAllLocal(ctx context.Context) ([]*api.PinInfo, error) {
_, span := trace.StartSpan(ctx, "cluster/SyncAllLocal")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
syncedItems, err := c.tracker.SyncAll(ctx)
// Despite errors, tracker provides synced items that we can provide.
// They encapsulate the error.
if err != nil {
logger.Error("tracker.Sync() returned with error: ", err)
logger.Error("Is the ipfs daemon running?")
}
return syncedItems, err
}
// Sync triggers a SyncLocal() operation for a given Cid.
// in all cluster peers.
func (c *Cluster) Sync(ctx context.Context, h cid.Cid) (*api.GlobalPinInfo, error) {
_, span := trace.StartSpan(ctx, "cluster/Sync")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.globalPinInfoCid(ctx, "Cluster", "SyncLocal", h)
}
// used for RecoverLocal and SyncLocal.
func (c *Cluster) localPinInfoOp(
ctx context.Context,
@ -1212,17 +1113,6 @@ func (c *Cluster) localPinInfoOp(
}
// SyncLocal performs a local sync operation for the given Cid. This will
// tell the tracker to verify the status of the Cid against the IPFS daemon.
// It returns the updated PinInfo for the Cid.
func (c *Cluster) SyncLocal(ctx context.Context, h cid.Cid) (pInfo *api.PinInfo, err error) {
_, span := trace.StartSpan(ctx, "cluster/SyncLocal")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.localPinInfoOp(ctx, h, c.tracker.Sync)
}
// RecoverAll triggers a RecoverAllLocal operation on all peer.
func (c *Cluster) RecoverAll(ctx context.Context) ([]*api.GlobalPinInfo, error) {
_, span := trace.StartSpan(ctx, "cluster/RecoverAll")

View File

@ -29,7 +29,6 @@ var DefaultListenAddrs = []string{"/ip4/0.0.0.0/tcp/9096", "/ip4/0.0.0.0/udp/909
const (
DefaultEnableRelayHop = true
DefaultStateSyncInterval = 600 * time.Second
DefaultIPFSSyncInterval = 130 * time.Second
DefaultPinRecoverInterval = 1 * time.Hour
DefaultMonitorPingInterval = 15 * time.Second
DefaultPeerWatchInterval = 5 * time.Second
@ -94,14 +93,6 @@ type Config struct {
// consistency, increase with larger states.
StateSyncInterval time.Duration
// Time between syncs of the local state and
// the state of the ipfs daemon. This ensures that cluster
// provides the right status for tracked items (for example
// to detect that a pin has been removed. Reduce for faster
// consistency, increase when the number of pinned items is very
// large.
IPFSSyncInterval time.Duration
// Time between automatic runs of the "recover" operation
// which will retry to pin/unpin items in error state.
PinRecoverInterval time.Duration
@ -177,7 +168,6 @@ type configJSON struct {
EnableRelayHop bool `json:"enable_relay_hop"`
ConnectionManager *connMgrConfigJSON `json:"connection_manager"`
StateSyncInterval string `json:"state_sync_interval"`
IPFSSyncInterval string `json:"ipfs_sync_interval"`
PinRecoverInterval string `json:"pin_recover_interval"`
ReplicationFactorMin int `json:"replication_factor_min"`
ReplicationFactorMax int `json:"replication_factor_max"`
@ -267,10 +257,6 @@ func (cfg *Config) Validate() error {
return errors.New("cluster.state_sync_interval is invalid")
}
if cfg.IPFSSyncInterval <= 0 {
return errors.New("cluster.ipfs_sync_interval is invalid")
}
if cfg.PinRecoverInterval <= 0 {
return errors.New("cluster.pin_recover_interval is invalid")
}
@ -367,7 +353,6 @@ func (cfg *Config) setDefaults() {
}
cfg.LeaveOnShutdown = DefaultLeaveOnShutdown
cfg.StateSyncInterval = DefaultStateSyncInterval
cfg.IPFSSyncInterval = DefaultIPFSSyncInterval
cfg.PinRecoverInterval = DefaultPinRecoverInterval
cfg.ReplicationFactorMin = DefaultReplicationFactor
cfg.ReplicationFactorMax = DefaultReplicationFactor
@ -441,7 +426,6 @@ func (cfg *Config) applyConfigJSON(jcfg *configJSON) error {
err = config.ParseDurations("cluster",
&config.DurationOpt{Duration: jcfg.StateSyncInterval, Dst: &cfg.StateSyncInterval, Name: "state_sync_interval"},
&config.DurationOpt{Duration: jcfg.IPFSSyncInterval, Dst: &cfg.IPFSSyncInterval, Name: "ipfs_sync_interval"},
&config.DurationOpt{Duration: jcfg.PinRecoverInterval, Dst: &cfg.PinRecoverInterval, Name: "pin_recover_interval"},
&config.DurationOpt{Duration: jcfg.MonitorPingInterval, Dst: &cfg.MonitorPingInterval, Name: "monitor_ping_interval"},
&config.DurationOpt{Duration: jcfg.PeerWatchInterval, Dst: &cfg.PeerWatchInterval, Name: "peer_watch_interval"},
@ -507,7 +491,6 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) {
GracePeriod: cfg.ConnMgr.GracePeriod.String(),
}
jcfg.StateSyncInterval = cfg.StateSyncInterval.String()
jcfg.IPFSSyncInterval = cfg.IPFSSyncInterval.String()
jcfg.PinRecoverInterval = cfg.PinRecoverInterval.String()
jcfg.MonitorPingInterval = cfg.MonitorPingInterval.String()
jcfg.PeerWatchInterval = cfg.PeerWatchInterval.String()

View File

@ -24,7 +24,6 @@ var ccfgTestJSON = []byte(`
"/ip4/127.0.0.1/udp/10000/quic"
],
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"pin_recover_interval": "1m",
"replication_factor_min": 5,
"replication_factor_max": 5,

View File

@ -17,6 +17,7 @@ import (
"github.com/ipfs/ipfs-cluster/config"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/test"
"github.com/ipfs/ipfs-cluster/version"
@ -148,7 +149,7 @@ type mockTracer struct {
}
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracker) {
ident, clusterCfg, _, _, _, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs()
ident, clusterCfg, _, _, _, badgerCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs()
ctx := context.Background()
host, pubsub, dht := createHost(t, ident.PrivateKey, clusterCfg.Secret, clusterCfg.ListenAddr)
@ -162,11 +163,12 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke
api := &mockAPI{}
proxy := &mockProxy{}
ipfs := &mockConnector{}
tracker := makePinTracker(t, ident.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername)
tracer := &mockTracer{}
store := makeStore(t, badgerCfg)
cons := makeConsensus(t, store, host, pubsub, dht, raftCfg, false, crdtCfg)
tracker := stateless.New(statelesstrackerCfg, ident.ID, clusterCfg.Peername, cons.State)
var peersF func(context.Context) ([]peer.ID, error)
if consensus == "raft" {

View File

@ -779,9 +779,6 @@ Cluster, including which member is pinning them and any errors.
If a CID is provided, the status will be only fetched for a single
item. Metadata CIDs are included in the status response
The status of a CID may not be accurate. A manual sync can be triggered
with "sync".
When the --local flag is passed, it will only fetch the status from the
contacted cluster peer. By default, status will be fetched from all peers.
@ -817,42 +814,6 @@ separated list). The following are valid status values:
return nil
},
},
{
Name: "sync",
Usage: "Sync status of tracked items",
Description: `
This command asks Cluster peers to verify that the current status of tracked
CIDs is accurate by triggering queries to the IPFS daemons that pin them.
If a CID is provided, the sync and recover operations will be limited to
that single item.
Unless providing a specific CID, the command will output only items which
have changed status because of the sync or are in error state in some node,
therefore, the output should be empty if no operations were performed.
CIDs in error state may be manually recovered with "recover".
When the --local flag is passed, it will only trigger sync
operations on the contacted peer. By default, all peers will sync.
`,
ArgsUsage: "[CID]",
Flags: []cli.Flag{
localFlag(),
},
Action: func(c *cli.Context) error {
cidStr := c.Args().First()
if cidStr != "" {
ci, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
resp, cerr := globalClient.Sync(ctx, ci, c.Bool("local"))
formatResponse(c, resp, cerr)
} else {
resp, cerr := globalClient.SyncAll(ctx, c.Bool("local"))
formatResponse(c, resp, cerr)
}
return nil
},
},
{
Name: "recover",
Usage: "Recover tracked items in error state",

View File

@ -17,7 +17,6 @@ import (
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/observations"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
"go.opencensus.io/tag"
@ -150,14 +149,6 @@ func createCluster(
connector, err := ipfshttp.NewConnector(cfgs.Ipfshttp)
checkErr("creating IPFS Connector component", err)
tracker := setupPinTracker(
c.String("pintracker"),
host,
cfgs.Maptracker,
cfgs.Statelesstracker,
cfgs.Cluster.Peername,
)
informer, err := disk.NewInformer(cfgs.Diskinf)
checkErr("creating disk informer", err)
alloc := descendalloc.NewAllocator()
@ -190,6 +181,9 @@ func createCluster(
peersF = cons.Peers
}
tracker := stateless.New(cfgs.Statelesstracker, host.ID(), cfgs.Cluster.Peername, cons.State)
logger.Debug("stateless pintracker loaded")
mon, err := pubsubmon.New(ctx, cfgs.Pubsubmon, pubsub, peersF)
if err != nil {
store.Close()
@ -225,29 +219,6 @@ func bootstrap(ctx context.Context, cluster *ipfscluster.Cluster, bootstraps []m
}
}
func setupPinTracker(
name string,
h host.Host,
mapCfg *maptracker.Config,
statelessCfg *stateless.Config,
peerName string,
) ipfscluster.PinTracker {
switch name {
case "map":
ptrk := maptracker.NewMapPinTracker(mapCfg, h.ID(), peerName)
logger.Debug("map pintracker loaded")
return ptrk
case "stateless":
ptrk := stateless.New(statelessCfg, h.ID(), peerName)
logger.Debug("stateless pintracker loaded")
return ptrk
default:
err := errors.New("unknown pintracker type")
checkErr("", err)
return nil
}
}
func setupDatastore(cfgHelper *cmdutils.ConfigHelper) ds.Datastore {
stmgr, err := cmdutils.NewStateManager(cfgHelper.GetConsensus(), cfgHelper.Identity(), cfgHelper.Configs())
checkErr("creating state manager", err)

View File

@ -29,9 +29,8 @@ const programName = "ipfs-cluster-service"
// flag defaults
const (
defaultPinTracker = "map"
defaultLogLevel = "info"
defaultConsensus = "crdt"
defaultLogLevel = "info"
defaultConsensus = "crdt"
)
const (
@ -413,12 +412,6 @@ the peer IDs in the given multiaddresses.
Usage: "remove peer from cluster on exit. Overrides \"leave_on_shutdown\"",
Hidden: true,
},
cli.StringFlag{
Name: "pintracker",
Value: defaultPinTracker,
Hidden: true,
Usage: "pintracker to use [map,stateless].",
},
cli.BoolFlag{
Name: "stats",
Usage: "enable stats collection",

View File

@ -19,7 +19,6 @@ import (
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/observations"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
)
@ -31,7 +30,6 @@ type Configs struct {
Ipfshttp *ipfshttp.Config
Raft *raft.Config
Crdt *crdt.Config
Maptracker *maptracker.Config
Statelesstracker *stateless.Config
Pubsubmon *pubsubmon.Config
Diskinf *disk.Config
@ -183,7 +181,6 @@ func (ch *ConfigHelper) init() {
Ipfshttp: &ipfshttp.Config{},
Raft: &raft.Config{},
Crdt: &crdt.Config{},
Maptracker: &maptracker.Config{},
Statelesstracker: &stateless.Config{},
Pubsubmon: &pubsubmon.Config{},
Diskinf: &disk.Config{},
@ -195,7 +192,6 @@ func (ch *ConfigHelper) init() {
man.RegisterComponent(config.API, cfgs.Restapi)
man.RegisterComponent(config.API, cfgs.Ipfsproxy)
man.RegisterComponent(config.IPFSConn, cfgs.Ipfshttp)
man.RegisterComponent(config.PinTracker, cfgs.Maptracker)
man.RegisterComponent(config.PinTracker, cfgs.Statelesstracker)
man.RegisterComponent(config.Monitor, cfgs.Pubsubmon)
man.RegisterComponent(config.Informer, cfgs.Diskinf)

View File

@ -11,7 +11,6 @@ import (
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/observations"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
)
@ -32,7 +31,6 @@ var testingClusterCfg = []byte(`{
"grace_period": "2m0s"
},
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "1s",
"peer_watch_interval": "1s",
@ -130,8 +128,8 @@ var testingTracerCfg = []byte(`{
"service_name": "cluster-daemon"
}`)
func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *maptracker.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {
identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg := testingEmptyConfigs()
func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {
identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg := testingEmptyConfigs()
identity.LoadJSON(testingIdentity)
clusterCfg.LoadJSON(testingClusterCfg)
apiCfg.LoadJSON(testingAPICfg)
@ -140,16 +138,15 @@ func testingConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Confi
badgerCfg.LoadJSON(testingBadgerCfg)
raftCfg.LoadJSON(testingRaftCfg)
crdtCfg.LoadJSON(testingCrdtCfg)
maptrackerCfg.LoadJSON(testingTrackerCfg)
statelesstrkrCfg.LoadJSON(testingTrackerCfg)
pubsubmonCfg.LoadJSON(testingMonCfg)
diskInfCfg.LoadJSON(testingDiskInfCfg)
tracingCfg.LoadJSON(testingTracerCfg)
return identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg
return identity, clusterCfg, apiCfg, proxyCfg, ipfsCfg, badgerCfg, raftCfg, crdtCfg, statelesstrkrCfg, pubsubmonCfg, diskInfCfg, tracingCfg
}
func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *maptracker.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {
func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {
identity := &config.Identity{}
clusterCfg := &Config{}
apiCfg := &rest.Config{}
@ -158,12 +155,11 @@ func testingEmptyConfigs() (*config.Identity, *Config, *rest.Config, *ipfsproxy.
badgerCfg := &badger.Config{}
raftCfg := &raft.Config{}
crdtCfg := &crdt.Config{}
maptrackerCfg := &maptracker.Config{}
statelessCfg := &stateless.Config{}
pubsubmonCfg := &pubsubmon.Config{}
diskInfCfg := &disk.Config{}
tracingCfg := &observations.TracingConfig{}
return identity, clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelessCfg, pubsubmonCfg, diskInfCfg, tracingCfg
return identity, clusterCfg, apiCfg, proxyCfg, ipfshttpCfg, badgerCfg, raftCfg, crdtCfg, statelessCfg, pubsubmonCfg, diskInfCfg, tracingCfg
}
// func TestConfigDefault(t *testing.T) {

View File

@ -475,7 +475,7 @@ func (css *Consensus) Leader(ctx context.Context) (peer.ID, error) {
func OfflineState(cfg *Config, store ds.Datastore) (state.BatchingState, error) {
batching, ok := store.(ds.Batching)
if !ok {
return nil, errors.New("must provide a Bathing datastore")
return nil, errors.New("must provide a Batching datastore")
}
opts := crdt.DefaultOptions()
opts.Logger = logger

View File

@ -123,12 +123,6 @@ type PinTracker interface {
StatusAll(context.Context) []*api.PinInfo
// Status returns the local status of a given Cid.
Status(context.Context, cid.Cid) *api.PinInfo
// SyncAll makes sure that all tracked Cids reflect the real IPFS status.
// It returns the list of pins which were updated by the call.
SyncAll(context.Context) ([]*api.PinInfo, error)
// Sync makes sure that the Cid status reflect the real IPFS status.
// It returns the local status of the Cid.
Sync(context.Context, cid.Cid) (*api.PinInfo, error)
// RecoverAll calls Recover() for all pins tracked.
RecoverAll(context.Context) ([]*api.PinInfo, error)
// Recover retriggers a Pin/Unpin operation in a Cids with error status.

View File

@ -28,7 +28,6 @@ import (
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/observations"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/test"
@ -174,7 +173,7 @@ func createComponents(
peername := fmt.Sprintf("peer_%d", i)
ident, clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, badgerCfg, raftCfg, crdtCfg, maptrackerCfg, statelesstrackerCfg, psmonCfg, diskInfCfg, tracingCfg := testingConfigs()
ident, clusterCfg, apiCfg, ipfsproxyCfg, ipfshttpCfg, badgerCfg, raftCfg, crdtCfg, statelesstrackerCfg, psmonCfg, diskInfCfg, tracingCfg := testingConfigs()
ident.ID = host.ID()
ident.PrivateKey = host.Peerstore().PrivKey(host.ID())
@ -208,8 +207,6 @@ func createComponents(
t.Fatal(err)
}
tracker := makePinTracker(t, ident.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername)
alloc := descendalloc.NewAllocator()
inf, err := disk.NewInformer(diskInfCfg)
if err != nil {
@ -218,6 +215,7 @@ func createComponents(
store := makeStore(t, badgerCfg)
cons := makeConsensus(t, store, host, pubsub, dht, raftCfg, staging, crdtCfg)
tracker := stateless.New(statelesstrackerCfg, ident.ID, clusterCfg.Peername, cons.State)
var peersF func(context.Context) ([]peer.ID, error)
if consensus == "raft" {
@ -268,19 +266,6 @@ func makeConsensus(t *testing.T, store ds.Datastore, h host.Host, psub *pubsub.P
}
}
func makePinTracker(t *testing.T, pid peer.ID, mptCfg *maptracker.Config, sptCfg *stateless.Config, peerName string) PinTracker {
var ptrkr PinTracker
switch ptracker {
case "map":
ptrkr = maptracker.NewMapPinTracker(mptCfg, pid, peerName)
case "stateless":
ptrkr = stateless.New(sptCfg, pid, peerName)
default:
panic("bad pintracker")
}
return ptrkr
}
func createCluster(t *testing.T, host host.Host, dht *dht.IpfsDHT, clusterCfg *Config, store ds.Datastore, consensus Consensus, apis []API, ipfs IPFSConnector, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer, tracer Tracer) *Cluster {
cl, err := NewCluster(context.Background(), host, dht, clusterCfg, store, consensus, apis, ipfs, tracker, mon, alloc, []Informer{inf}, tracer)
if err != nil {
@ -914,162 +899,6 @@ func TestClustersStatusAllWithErrors(t *testing.T) {
runF(t, clusters, f)
}
func TestClustersSyncAllLocal(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
clusters[0].Pin(ctx, test.ErrorCid, api.PinOptions{}) // This cid always fails
clusters[0].Pin(ctx, test.Cid2, api.PinOptions{})
pinDelay()
pinDelay()
f := func(t *testing.T, c *Cluster) {
// Sync bad ID
infos, err := c.SyncAllLocal(ctx)
if err != nil {
// LocalSync() is asynchronous and should not show an
// error even if Recover() fails.
t.Error(err)
}
if len(infos) != 1 {
t.Fatalf("expected 1 elem slice, got = %d", len(infos))
}
// Last-known state may still be pinning
if infos[0].Status != api.TrackerStatusPinError && infos[0].Status != api.TrackerStatusPinning {
t.Errorf("element should be in Pinning or PinError state, got = %v", infos[0].Status)
}
}
// Test Local syncs
runF(t, clusters, f)
}
func TestClustersSyncLocal(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
h := test.ErrorCid
h2 := test.Cid2
clusters[0].Pin(ctx, h, api.PinOptions{})
clusters[0].Pin(ctx, h2, api.PinOptions{})
pinDelay()
pinDelay()
f := func(t *testing.T, c *Cluster) {
info, err := c.SyncLocal(ctx, h)
if err != nil {
t.Error(err)
}
if info.Status != api.TrackerStatusPinError && info.Status != api.TrackerStatusPinning {
t.Errorf("element is %s and not PinError", info.Status)
}
// Sync good ID
info, err = c.SyncLocal(ctx, h2)
if err != nil {
t.Error(err)
}
if info.Status != api.TrackerStatusPinned {
t.Error("element should be in Pinned state")
}
}
// Test Local syncs
runF(t, clusters, f)
}
func TestClustersSyncAll(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
clusters[0].Pin(ctx, test.ErrorCid, api.PinOptions{})
clusters[0].Pin(ctx, test.Cid2, api.PinOptions{})
pinDelay()
pinDelay()
j := rand.Intn(nClusters) // choose a random cluster peer
ginfos, err := clusters[j].SyncAll(ctx)
if err != nil {
t.Fatal(err)
}
if len(ginfos) != 1 {
t.Fatalf("expected globalsync to have 1 elements, got = %d", len(ginfos))
}
if !ginfos[0].Cid.Equals(test.ErrorCid) {
t.Error("expected globalsync to have problems with test.ErrorCid")
}
for _, c := range clusters {
inf, ok := ginfos[0].PeerMap[peer.IDB58Encode(c.host.ID())]
if !ok {
t.Fatal("GlobalPinInfo should have this cluster")
}
if inf.Status != api.TrackerStatusPinError && inf.Status != api.TrackerStatusPinning {
t.Error("should be PinError in all peers")
}
}
}
func TestClustersSync(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
h := test.ErrorCid // This cid always fails
h2 := test.Cid2
clusters[0].Pin(ctx, h, api.PinOptions{})
clusters[0].Pin(ctx, h2, api.PinOptions{})
pinDelay()
pinDelay()
j := rand.Intn(nClusters)
ginfo, err := clusters[j].Sync(ctx, h)
if err != nil {
// we always attempt to return a valid response
// with errors contained in GlobalPinInfo
t.Fatal("did not expect an error")
}
pinfo, ok := ginfo.PeerMap[peer.IDB58Encode(clusters[j].host.ID())]
if !ok {
t.Fatal("should have info for this host")
}
if pinfo.Error == "" {
t.Error("pinInfo error should not be empty")
}
if !ginfo.Cid.Equals(h) {
t.Error("GlobalPinInfo should be for test.ErrorCid")
}
for _, c := range clusters {
inf, ok := ginfo.PeerMap[peer.IDB58Encode(c.host.ID())]
if !ok {
t.Logf("%+v", ginfo)
t.Fatal("GlobalPinInfo should not be empty for this host")
}
if inf.Status != api.TrackerStatusPinError && inf.Status != api.TrackerStatusPinning {
t.Error("should be PinError or Pinning in all peers")
}
}
// Test with a good Cid
j = rand.Intn(nClusters)
ginfo, err = clusters[j].Sync(ctx, h2)
if err != nil {
t.Fatal(err)
}
if !ginfo.Cid.Equals(h2) {
t.Error("GlobalPinInfo should be for testrCid2")
}
for _, c := range clusters {
inf, ok := ginfo.PeerMap[peer.IDB58Encode(c.host.ID())]
if !ok {
t.Fatal("GlobalPinInfo should have this cluster")
}
if inf.Status != api.TrackerStatusPinned {
t.Error("the GlobalPinInfo should show Pinned in all peers")
}
}
}
func TestClustersRecoverLocal(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
@ -1098,10 +927,7 @@ func TestClustersRecoverLocal(t *testing.T) {
}
// Recover good ID
info, err = c.SyncLocal(ctx, h2)
if err != nil {
t.Error(err)
}
info, err = c.RecoverLocal(ctx, h2)
if info.Status != api.TrackerStatusPinned {
t.Error("element should be in Pinned state")
}
@ -2175,19 +2001,6 @@ func TestClustersFollowerMode(t *testing.T) {
}
})
t.Run("follower syncs itself", func(t *testing.T) {
gpis, err := clusters[1].SyncAll(ctx)
if err != nil {
t.Error("sync should work")
}
if len(gpis) != 1 {
t.Fatal("globalPinInfo should have 1 pins (in error)")
}
if len(gpis[0].PeerMap) != 1 {
t.Fatal("globalPinInfo[0] should only have one peer")
}
})
t.Run("follower status itself only", func(t *testing.T) {
gpi, err := clusters[1].Status(ctx, test.Cid1)
if err != nil {

View File

@ -1,115 +0,0 @@
package maptracker
import (
"encoding/json"
"errors"
"github.com/kelseyhightower/envconfig"
"github.com/ipfs/ipfs-cluster/config"
)
const configKey = "maptracker"
const envConfigKey = "cluster_maptracker"
// Default values for this Config.
const (
DefaultMaxPinQueueSize = 1000000
DefaultConcurrentPins = 10
)
// Config allows to initialize a Monitor and customize some parameters.
type Config struct {
config.Saver
// If higher, they will automatically marked with an error.
MaxPinQueueSize int
// ConcurrentPins specifies how many pin requests can be sent to the ipfs
// daemon in parallel. If the pinning method is "refs", it might increase
// speed. Unpin requests are always processed one by one.
ConcurrentPins int
}
type jsonConfig struct {
MaxPinQueueSize int `json:"max_pin_queue_size,omitempty"`
ConcurrentPins int `json:"concurrent_pins"`
}
// ConfigKey provides a human-friendly identifier for this type of Config.
func (cfg *Config) ConfigKey() string {
return configKey
}
// Default sets the fields of this Config to sensible values.
func (cfg *Config) Default() error {
cfg.MaxPinQueueSize = DefaultMaxPinQueueSize
cfg.ConcurrentPins = DefaultConcurrentPins
return nil
}
// ApplyEnvVars fills in any Config fields found
// as environment variables.
func (cfg *Config) ApplyEnvVars() error {
jcfg := cfg.toJSONConfig()
err := envconfig.Process(envConfigKey, jcfg)
if err != nil {
return err
}
return cfg.applyJSONConfig(jcfg)
}
// Validate checks that the fields of this Config have working values,
// at least in appearance.
func (cfg *Config) Validate() error {
if cfg.MaxPinQueueSize <= 0 {
return errors.New("maptracker.max_pin_queue_size too low")
}
if cfg.ConcurrentPins <= 0 {
return errors.New("maptracker.concurrent_pins is too low")
}
return nil
}
// LoadJSON sets the fields of this Config to the values defined by the JSON
// representation of it, as generated by ToJSON.
func (cfg *Config) LoadJSON(raw []byte) error {
jcfg := &jsonConfig{}
err := json.Unmarshal(raw, jcfg)
if err != nil {
logger.Error("Error unmarshaling maptracker config")
return err
}
cfg.Default()
return cfg.applyJSONConfig(jcfg)
}
func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
config.SetIfNotDefault(jcfg.MaxPinQueueSize, &cfg.MaxPinQueueSize)
config.SetIfNotDefault(jcfg.ConcurrentPins, &cfg.ConcurrentPins)
return cfg.Validate()
}
// ToJSON generates a human-friendly JSON representation of this Config.
func (cfg *Config) ToJSON() ([]byte, error) {
jcfg := cfg.toJSONConfig()
return config.DefaultJSONMarshal(jcfg)
}
func (cfg *Config) toJSONConfig() *jsonConfig {
jCfg := &jsonConfig{
ConcurrentPins: cfg.ConcurrentPins,
}
if cfg.MaxPinQueueSize != DefaultMaxPinQueueSize {
jCfg.MaxPinQueueSize = cfg.MaxPinQueueSize
}
return jCfg
}

View File

@ -1,72 +0,0 @@
package maptracker
import (
"encoding/json"
"os"
"testing"
)
var cfgJSON = []byte(`
{
"max_pin_queue_size": 4092,
"concurrent_pins": 2
}
`)
func TestLoadJSON(t *testing.T) {
cfg := &Config{}
err := cfg.LoadJSON(cfgJSON)
if err != nil {
t.Fatal(err)
}
j := &jsonConfig{}
json.Unmarshal(cfgJSON, j)
j.ConcurrentPins = 10
tst, _ := json.Marshal(j)
err = cfg.LoadJSON(tst)
if err != nil {
t.Error("did not expect an error")
}
if cfg.ConcurrentPins != 10 {
t.Error("expected 10 concurrent pins")
}
}
func TestToJSON(t *testing.T) {
cfg := &Config{}
cfg.LoadJSON(cfgJSON)
newjson, err := cfg.ToJSON()
if err != nil {
t.Fatal(err)
}
cfg = &Config{}
err = cfg.LoadJSON(newjson)
if err != nil {
t.Fatal(err)
}
}
func TestDefault(t *testing.T) {
cfg := &Config{}
cfg.Default()
if cfg.Validate() != nil {
t.Fatal("error validating")
}
cfg.ConcurrentPins = -2
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
}
func TestApplyEnvVars(t *testing.T) {
os.Setenv("CLUSTER_MAPTRACKER_CONCURRENTPINS", "22")
cfg := &Config{}
cfg.ApplyEnvVars()
if cfg.ConcurrentPins != 22 {
t.Fatal("failed to override concurrent_pins with env var")
}
}

View File

@ -1,452 +0,0 @@
// Package maptracker implements a PinTracker component for IPFS Cluster. It
// uses a map to keep track of the state of tracked pins.
package maptracker
import (
"context"
"errors"
"sync"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/pintracker/optracker"
"github.com/ipfs/ipfs-cluster/pintracker/util"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
"go.opencensus.io/trace"
)
var logger = logging.Logger("pintracker")
var (
errUnpinned = errors.New("the item is unexpectedly not pinned on IPFS")
)
// MapPinTracker is a PinTracker implementation which uses a Go map
// to store the status of the tracked Cids. This component is thread-safe.
type MapPinTracker struct {
config *Config
optracker *optracker.OperationTracker
ctx context.Context
cancel func()
rpcClient *rpc.Client
rpcReady chan struct{}
peerID peer.ID
pinCh chan *optracker.Operation
unpinCh chan *optracker.Operation
shutdownLock sync.Mutex
shutdown bool
wg sync.WaitGroup
}
// NewMapPinTracker returns a new object which has been correctly
// initialized with the given configuration.
func NewMapPinTracker(cfg *Config, pid peer.ID, peerName string) *MapPinTracker {
ctx, cancel := context.WithCancel(context.Background())
mpt := &MapPinTracker{
ctx: ctx,
cancel: cancel,
config: cfg,
optracker: optracker.NewOperationTracker(ctx, pid, peerName),
rpcReady: make(chan struct{}, 1),
peerID: pid,
pinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize),
unpinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize),
}
for i := 0; i < mpt.config.ConcurrentPins; i++ {
go mpt.opWorker(ctx, mpt.pin, mpt.pinCh)
}
go mpt.opWorker(ctx, mpt.unpin, mpt.unpinCh)
return mpt
}
// receives a pin Function (pin or unpin) and a channel.
// Used for both pinning and unpinning
func (mpt *MapPinTracker) opWorker(ctx context.Context, pinF func(*optracker.Operation) error, opChan chan *optracker.Operation) {
for {
select {
case op := <-opChan:
if op.Cancelled() {
// operation was cancelled. Move on.
// This saves some time, but not 100% needed.
continue
}
op.SetPhase(optracker.PhaseInProgress)
err := pinF(op) // call pin/unpin
if err != nil {
if op.Cancelled() {
// there was an error because
// we were cancelled. Move on.
continue
}
op.SetError(err)
op.Cancel()
continue
}
op.SetPhase(optracker.PhaseDone)
op.Cancel()
// We keep all pinned things in the tracker,
// only clean unpinned things.
if op.Type() == optracker.OperationUnpin {
mpt.optracker.Clean(ctx, op)
}
case <-mpt.ctx.Done():
return
}
}
}
// Shutdown finishes the services provided by the MapPinTracker and cancels
// any active context.
func (mpt *MapPinTracker) Shutdown(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "tracker/map/Shutdown")
defer span.End()
mpt.shutdownLock.Lock()
defer mpt.shutdownLock.Unlock()
if mpt.shutdown {
logger.Debug("already shutdown")
return nil
}
logger.Info("stopping MapPinTracker")
mpt.cancel()
close(mpt.rpcReady)
mpt.wg.Wait()
mpt.shutdown = true
return nil
}
func (mpt *MapPinTracker) pin(op *optracker.Operation) error {
ctx, span := trace.StartSpan(op.Context(), "tracker/map/pin")
defer span.End()
logger.Debugf("issuing pin call for %s", op.Cid())
err := mpt.rpcClient.CallContext(
ctx,
"",
"IPFSConnector",
"Pin",
op.Pin(),
&struct{}{},
)
if err != nil {
return err
}
return nil
}
func (mpt *MapPinTracker) unpin(op *optracker.Operation) error {
ctx, span := trace.StartSpan(op.Context(), "tracker/map/unpin")
defer span.End()
logger.Debugf("issuing unpin call for %s", op.Cid())
err := mpt.rpcClient.CallContext(
ctx,
"",
"IPFSConnector",
"Unpin",
op.Pin(),
&struct{}{},
)
if err != nil {
return err
}
return nil
}
// puts a new operation on the queue, unless ongoing exists
func (mpt *MapPinTracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.OperationType, ch chan *optracker.Operation) error {
ctx, span := trace.StartSpan(ctx, "tracker/map/enqueue")
defer span.End()
op := mpt.optracker.TrackNewOperation(ctx, c, typ, optracker.PhaseQueued)
if op == nil {
return nil // ongoing pin operation.
}
select {
case ch <- op:
default:
err := util.ErrFullQueue
op.SetError(err)
op.Cancel()
logger.Error(err.Error())
return err
}
return nil
}
// Track tells the MapPinTracker to start managing a Cid,
// possibly triggering Pin operations on the IPFS daemon.
func (mpt *MapPinTracker) Track(ctx context.Context, c *api.Pin) error {
ctx, span := trace.StartSpan(ctx, "tracker/map/Track")
defer span.End()
logger.Debugf("tracking %s", c.Cid)
// Sharded pins are never pinned. A sharded pin cannot turn into
// something else or viceversa like it happens with Remote pins so
// we just track them.
if c.Type == api.MetaType {
mpt.optracker.TrackNewOperation(ctx, c, optracker.OperationShard, optracker.PhaseDone)
return nil
}
// Trigger unpin whenever something remote is tracked
// Note, IPFSConn checks with pin/ls before triggering
// pin/rm, so this actually does not always trigger unpin
// to ipfs.
if util.IsRemotePin(c, mpt.peerID) {
op := mpt.optracker.TrackNewOperation(ctx, c, optracker.OperationRemote, optracker.PhaseInProgress)
if op == nil {
return nil // Ongoing operationRemote / PhaseInProgress
}
err := mpt.unpin(op) // unpin all the time, even if not pinned
op.Cancel()
if err != nil {
op.SetError(err)
} else {
op.SetPhase(optracker.PhaseDone)
}
return nil
}
return mpt.enqueue(ctx, c, optracker.OperationPin, mpt.pinCh)
}
// Untrack tells the MapPinTracker to stop managing a Cid.
// If the Cid is pinned locally, it will be unpinned.
func (mpt *MapPinTracker) Untrack(ctx context.Context, c cid.Cid) error {
ctx, span := trace.StartSpan(ctx, "tracker/map/Untrack")
defer span.End()
logger.Infof("untracking %s", c)
return mpt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin, mpt.unpinCh)
}
// Status returns information for a Cid tracked by this
// MapPinTracker.
func (mpt *MapPinTracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/Status")
defer span.End()
return mpt.optracker.Get(ctx, c)
}
// StatusAll returns information for all Cids tracked by this
// MapPinTracker.
func (mpt *MapPinTracker) StatusAll(ctx context.Context) []*api.PinInfo {
ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/StatusAll")
defer span.End()
return mpt.optracker.GetAll(ctx)
}
// Sync verifies that the status of a Cid matches that of
// the IPFS daemon. If not, it will be transitioned
// to PinError or UnpinError.
//
// Sync returns the updated local status for the given Cid.
// Pins in error states can be recovered with Recover().
// An error is returned if we are unable to contact
// the IPFS daemon.
func (mpt *MapPinTracker) Sync(ctx context.Context, c cid.Cid) (*api.PinInfo, error) {
ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/Sync")
defer span.End()
var ips api.IPFSPinStatus
err := mpt.rpcClient.Call(
"",
"IPFSConnector",
"PinLsCid",
c,
&ips,
)
if err != nil {
mpt.optracker.SetError(ctx, c, err)
return mpt.optracker.Get(ctx, c), nil
}
return mpt.syncStatus(ctx, c, ips), nil
}
// SyncAll verifies that the statuses of all tracked Cids match the
// one reported by the IPFS daemon. If not, they will be transitioned
// to PinError or UnpinError.
//
// SyncAll returns the list of local status for all tracked Cids which
// were updated or have errors. Cids in error states can be recovered
// with Recover().
// An error is returned if we are unable to contact the IPFS daemon.
func (mpt *MapPinTracker) SyncAll(ctx context.Context) ([]*api.PinInfo, error) {
ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/SyncAll")
defer span.End()
var ipsMap map[string]api.IPFSPinStatus
var results []*api.PinInfo
err := mpt.rpcClient.Call(
"",
"IPFSConnector",
"PinLs",
"recursive",
&ipsMap,
)
if err != nil {
// set pinning or unpinning ops to error, since we can't
// verify them
pInfos := mpt.optracker.GetAll(ctx)
for _, pInfo := range pInfos {
op, _ := optracker.TrackerStatusToOperationPhase(pInfo.Status)
if op == optracker.OperationPin || op == optracker.OperationUnpin {
mpt.optracker.SetError(ctx, pInfo.Cid, err)
results = append(results, mpt.optracker.Get(ctx, pInfo.Cid))
} else {
results = append(results, pInfo)
}
}
return results, nil
}
status := mpt.StatusAll(ctx)
for _, pInfoOrig := range status {
var pInfoNew *api.PinInfo
c := pInfoOrig.Cid
ips, ok := ipsMap[c.String()]
if !ok {
pInfoNew = mpt.syncStatus(ctx, c, api.IPFSPinStatusUnpinned)
} else {
pInfoNew = mpt.syncStatus(ctx, c, ips)
}
if pInfoOrig.Status != pInfoNew.Status ||
pInfoNew.Status == api.TrackerStatusUnpinError ||
pInfoNew.Status == api.TrackerStatusPinError {
results = append(results, pInfoNew)
}
}
return results, nil
}
func (mpt *MapPinTracker) syncStatus(ctx context.Context, c cid.Cid, ips api.IPFSPinStatus) *api.PinInfo {
status, ok := mpt.optracker.Status(ctx, c)
if !ok {
status = api.TrackerStatusUnpinned
}
// TODO(hector): for sharding, we may need to check that a shard
// is pinned to the right depth. For now, we assumed that if it's pinned
// in some way, then it must be right (including direct).
pinned := func(i api.IPFSPinStatus) bool {
switch i {
case api.IPFSPinStatusRecursive:
return i.IsPinned(-1)
case api.IPFSPinStatusDirect:
return i.IsPinned(0)
default:
return i.IsPinned(1) // Pinned with depth 1 or more.
}
}
if pinned(ips) {
switch status {
case api.TrackerStatusPinError:
// If an item that we wanted to pin is pinned, we mark it so
mpt.optracker.TrackNewOperation(
ctx,
api.PinCid(c),
optracker.OperationPin,
optracker.PhaseDone,
)
default:
// 1. Unpinning phases
// 2. Pinned in ipfs but we are not tracking
// -> do nothing
}
} else {
switch status {
case api.TrackerStatusUnpinError:
// clean
op := mpt.optracker.TrackNewOperation(
ctx,
api.PinCid(c),
optracker.OperationUnpin,
optracker.PhaseDone,
)
if op != nil {
mpt.optracker.Clean(ctx, op)
}
case api.TrackerStatusPinned:
// not pinned in IPFS but we think it should be: mark as error
mpt.optracker.SetError(ctx, c, errUnpinned)
default:
// 1. Pinning phases
// -> do nothing
}
}
return mpt.optracker.Get(ctx, c)
}
// Recover will re-queue a Cid in error state for the failed operation,
// possibly retriggering an IPFS pinning operation.
func (mpt *MapPinTracker) Recover(ctx context.Context, c cid.Cid) (*api.PinInfo, error) {
ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/Recover")
defer span.End()
pInfo := mpt.optracker.Get(ctx, c)
var err error
switch pInfo.Status {
case api.TrackerStatusPinError:
logger.Infof("Restarting pin operation for %s", c)
err = mpt.enqueue(ctx, api.PinCid(c), optracker.OperationPin, mpt.pinCh)
case api.TrackerStatusUnpinError:
logger.Infof("Restarting unpin operation for %s", c)
err = mpt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin, mpt.unpinCh)
}
return mpt.optracker.Get(ctx, c), err
}
// RecoverAll attempts to recover all items tracked by this peer.
func (mpt *MapPinTracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) {
ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/RecoverAll")
defer span.End()
pInfos := mpt.optracker.GetAll(ctx)
var results []*api.PinInfo
for _, pInfo := range pInfos {
res, err := mpt.Recover(ctx, pInfo.Cid)
results = append(results, res)
if err != nil {
return results, err
}
}
return results, nil
}
// SetClient makes the MapPinTracker ready to perform RPC requests to
// other components.
func (mpt *MapPinTracker) SetClient(c *rpc.Client) {
mpt.rpcClient = c
mpt.rpcReady <- struct{}{}
}
// OpContext exports the internal optracker's OpContext method.
// For testing purposes only.
func (mpt *MapPinTracker) OpContext(ctx context.Context, c cid.Cid) context.Context {
return mpt.optracker.OpContext(ctx, c)
}

View File

@ -1,591 +0,0 @@
package maptracker
import (
"context"
"errors"
"math/rand"
"sync"
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
var (
pinCancelCid = test.Cid3
unpinCancelCid = test.Cid2
ErrPinCancelCid = errors.New("should not have received rpc.IPFSPin operation")
ErrUnpinCancelCid = errors.New("should not have received rpc.IPFSUnpin operation")
)
type mockIPFS struct{}
func mockRPCClient(t *testing.T) *rpc.Client {
s := rpc.NewServer(nil, "mock")
c := rpc.NewClientWithServer(nil, "mock", s)
err := s.RegisterName("IPFSConnector", &mockIPFS{})
if err != nil {
t.Fatal(err)
}
return c
}
func (mock *mockIPFS) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid.String() {
case test.SlowCid1.String():
time.Sleep(2 * time.Second)
case pinCancelCid.String():
return ErrPinCancelCid
}
return nil
}
func (mock *mockIPFS) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid.String() {
case test.SlowCid1.String():
time.Sleep(2 * time.Second)
case unpinCancelCid.String():
return ErrUnpinCancelCid
}
return nil
}
func testPin(c cid.Cid, min, max int, allocs ...peer.ID) *api.Pin {
pin := api.PinCid(c)
pin.ReplicationFactorMin = min
pin.ReplicationFactorMax = max
pin.Allocations = allocs
return pin
}
func testSlowMapPinTracker(t *testing.T) *MapPinTracker {
cfg := &Config{}
cfg.Default()
cfg.ConcurrentPins = 1
mpt := NewMapPinTracker(cfg, test.PeerID1, test.PeerName1)
mpt.SetClient(mockRPCClient(t))
return mpt
}
func testMapPinTracker(t *testing.T) *MapPinTracker {
cfg := &Config{}
cfg.Default()
cfg.ConcurrentPins = 1
mpt := NewMapPinTracker(cfg, test.PeerID1, test.PeerName1)
mpt.SetClient(test.NewMockRPCClient(t))
return mpt
}
func TestNew(t *testing.T) {
ctx := context.Background()
mpt := testMapPinTracker(t)
defer mpt.Shutdown(ctx)
}
func TestShutdown(t *testing.T) {
ctx := context.Background()
mpt := testMapPinTracker(t)
err := mpt.Shutdown(ctx)
if err != nil {
t.Fatal(err)
}
err = mpt.Shutdown(ctx)
if err != nil {
t.Fatal(err)
}
}
func TestTrack(t *testing.T) {
ctx := context.Background()
mpt := testMapPinTracker(t)
defer mpt.Shutdown(ctx)
h := test.Cid1
// Let's tart with a local pin
c := testPin(h, -1, -1)
err := mpt.Track(context.Background(), c)
if err != nil {
t.Fatal(err)
}
time.Sleep(200 * time.Millisecond) // let it be pinned
st := mpt.Status(context.Background(), h)
if st.Status != api.TrackerStatusPinned {
t.Fatalf("cid should be pinned and is %s", st.Status)
}
// Unpin and set remote
c = testPin(h, 1, 1, test.PeerID2)
err = mpt.Track(context.Background(), c)
if err != nil {
t.Fatal(err)
}
time.Sleep(200 * time.Millisecond) // let it be unpinned
st = mpt.Status(context.Background(), h)
if st.Status != api.TrackerStatusRemote {
t.Fatalf("cid should be pinned and is %s", st.Status)
}
}
func TestUntrack(t *testing.T) {
ctx := context.Background()
mpt := testMapPinTracker(t)
defer mpt.Shutdown(ctx)
h1 := test.Cid1
h2 := test.Cid2
// LocalPin
c := testPin(h1, -1, -1)
err := mpt.Track(context.Background(), c)
if err != nil {
t.Fatal(err)
}
// Remote pin
c = testPin(h2, 1, 1, test.PeerID2)
err = mpt.Track(context.Background(), c)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second / 2)
err = mpt.Untrack(context.Background(), h2)
if err != nil {
t.Fatal(err)
}
err = mpt.Untrack(context.Background(), h1)
if err != nil {
t.Fatal(err)
}
err = mpt.Untrack(context.Background(), h1)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second / 2)
st := mpt.Status(context.Background(), h1)
if st.Status != api.TrackerStatusUnpinned {
t.Fatalf("cid should be unpinned and is %s", st.Status)
}
st = mpt.Status(context.Background(), h2)
if st.Status != api.TrackerStatusUnpinned {
t.Fatalf("cid should be unpinned and is %s", st.Status)
}
}
func TestStatusAll(t *testing.T) {
ctx := context.Background()
mpt := testMapPinTracker(t)
defer mpt.Shutdown(ctx)
h1 := test.Cid1
h2 := test.Cid2
// LocalPin
c := testPin(h1, -1, -1)
mpt.Track(context.Background(), c)
c = testPin(h2, 1, 1)
mpt.Track(context.Background(), c)
time.Sleep(200 * time.Millisecond)
stAll := mpt.StatusAll(context.Background())
if len(stAll) != 2 {
t.Logf("%+v", stAll)
t.Fatal("expected 2 pins")
}
for _, st := range stAll {
if st.Cid.Equals(h1) && st.Status != api.TrackerStatusPinned {
t.Fatal("expected pinned")
}
if st.Cid.Equals(h2) && st.Status != api.TrackerStatusRemote {
t.Fatal("expected remote")
}
}
}
func TestSyncAndRecover(t *testing.T) {
ctx := context.Background()
mpt := testMapPinTracker(t)
defer mpt.Shutdown(ctx)
h1 := test.Cid1
h2 := test.Cid2
c := testPin(h1, -1, -1)
mpt.Track(context.Background(), c)
c = testPin(h2, -1, -1)
mpt.Track(context.Background(), c)
time.Sleep(100 * time.Millisecond)
// IPFSPinLS RPC returns unpinned for anything != Cid1 or Cid3
info, err := mpt.Sync(context.Background(), h2)
if err != nil {
t.Fatal(err)
}
if info.Status != api.TrackerStatusPinError {
t.Error("expected pin_error")
}
info, err = mpt.Sync(context.Background(), h1)
if err != nil {
t.Fatal(err)
}
if info.Status != api.TrackerStatusPinned {
t.Error("expected pinned")
}
info, err = mpt.Recover(context.Background(), h1)
if err != nil {
t.Fatal(err)
}
if info.Status != api.TrackerStatusPinned {
t.Error("expected pinned")
}
_, err = mpt.Recover(context.Background(), h2)
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
info = mpt.Status(context.Background(), h2)
if info.Status != api.TrackerStatusPinned {
t.Error("expected pinned")
}
}
func TestRecoverAll(t *testing.T) {
ctx := context.Background()
mpt := testMapPinTracker(t)
defer mpt.Shutdown(ctx)
h1 := test.Cid1
c := testPin(h1, -1, -1)
mpt.Track(context.Background(), c)
time.Sleep(100 * time.Millisecond)
mpt.optracker.SetError(context.Background(), h1, errors.New("fakeerror"))
pins, err := mpt.RecoverAll(context.Background())
if err != nil {
t.Fatal(err)
}
if len(pins) != 1 {
t.Fatal("there should be only one pin")
}
time.Sleep(100 * time.Millisecond)
info := mpt.Status(context.Background(), h1)
if info.Status != api.TrackerStatusPinned {
t.Error("the pin should have been recovered")
}
}
func TestSyncAll(t *testing.T) {
ctx := context.Background()
mpt := testMapPinTracker(t)
defer mpt.Shutdown(ctx)
synced, err := mpt.SyncAll(context.Background())
if err != nil {
t.Fatal(err)
}
// This relies on the rpc mock implementation
if len(synced) != 0 {
t.Fatal("should not have synced anything when it tracks nothing")
}
h1 := test.Cid1
h2 := test.Cid2
c := testPin(h1, -1, -1)
mpt.Track(context.Background(), c)
c = testPin(h2, -1, -1)
mpt.Track(context.Background(), c)
time.Sleep(100 * time.Millisecond)
synced, err = mpt.SyncAll(context.Background())
if err != nil {
t.Fatal(err)
}
if len(synced) != 1 || synced[0].Status != api.TrackerStatusPinError {
t.Logf("%+v", synced)
t.Fatal("should have synced h2")
}
}
func TestUntrackTrack(t *testing.T) {
ctx := context.Background()
mpt := testMapPinTracker(t)
defer mpt.Shutdown(ctx)
h1 := test.Cid1
// LocalPin
c := testPin(h1, -1, -1)
err := mpt.Track(context.Background(), c)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second / 2)
err = mpt.Untrack(context.Background(), h1)
if err != nil {
t.Fatal(err)
}
}
func TestTrackUntrackWithCancel(t *testing.T) {
ctx := context.Background()
mpt := testSlowMapPinTracker(t)
defer mpt.Shutdown(ctx)
slowPinCid := test.SlowCid1
// LocalPin
slowPin := testPin(slowPinCid, -1, -1)
err := mpt.Track(context.Background(), slowPin)
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond) // let pinning start
pInfo := mpt.Status(context.Background(), slowPin.Cid)
if pInfo.Status == api.TrackerStatusUnpinned {
t.Fatal("slowPin should be tracked")
}
if pInfo.Status == api.TrackerStatusPinning {
go func() {
err = mpt.Untrack(context.Background(), slowPinCid)
if err != nil {
t.Fatal(err)
}
}()
select {
case <-mpt.optracker.OpContext(context.Background(), slowPinCid).Done():
return
case <-time.Tick(100 * time.Millisecond):
t.Errorf("operation context should have been cancelled by now")
}
} else {
t.Error("slowPin should be pinning and is:", pInfo.Status)
}
}
func TestTrackUntrackWithNoCancel(t *testing.T) {
ctx := context.Background()
mpt := testSlowMapPinTracker(t)
defer mpt.Shutdown(ctx)
slowPinCid := test.SlowCid1
fastPinCid := pinCancelCid
// SlowLocalPin
slowPin := testPin(slowPinCid, -1, -1)
// LocalPin
fastPin := testPin(fastPinCid, -1, -1)
err := mpt.Track(context.Background(), slowPin)
if err != nil {
t.Fatal(err)
}
err = mpt.Track(context.Background(), fastPin)
if err != nil {
t.Fatal(err)
}
// fastPin should be queued because slow pin is pinning
pInfo := mpt.Status(context.Background(), fastPinCid)
if pInfo.Status == api.TrackerStatusPinQueued {
err = mpt.Untrack(context.Background(), fastPinCid)
if err != nil {
t.Fatal(err)
}
pi := mpt.Status(context.Background(), fastPinCid)
if pi.Error == ErrPinCancelCid.Error() {
t.Fatal(ErrPinCancelCid)
}
} else {
t.Error("fastPin should be queued to pin:", pInfo.Status)
}
time.Sleep(100 * time.Millisecond)
pInfo = mpt.Status(context.Background(), fastPinCid)
if pInfo.Status != api.TrackerStatusUnpinned {
t.Error("fastPin should have been removed from tracker:", pInfo.Status)
}
}
func TestUntrackTrackWithCancel(t *testing.T) {
ctx := context.Background()
mpt := testSlowMapPinTracker(t)
defer mpt.Shutdown(ctx)
slowPinCid := test.SlowCid1
// LocalPin
slowPin := testPin(slowPinCid, -1, -1)
err := mpt.Track(context.Background(), slowPin)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second / 2)
// Untrack should cancel the ongoing request
// and unpin right away
err = mpt.Untrack(context.Background(), slowPinCid)
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
pInfo := mpt.Status(context.Background(), slowPinCid)
if pInfo.Status == api.TrackerStatusUnpinned {
t.Fatal("expected slowPin to be tracked")
}
if pInfo.Status == api.TrackerStatusUnpinning {
go func() {
err = mpt.Track(context.Background(), slowPin)
if err != nil {
t.Fatal(err)
}
}()
select {
case <-mpt.optracker.OpContext(context.Background(), slowPinCid).Done():
return
case <-time.Tick(100 * time.Millisecond):
t.Errorf("operation context should have been cancelled by now")
}
} else {
t.Error("slowPin should be in unpinning")
}
}
func TestUntrackTrackWithNoCancel(t *testing.T) {
ctx := context.Background()
mpt := testSlowMapPinTracker(t)
defer mpt.Shutdown(ctx)
slowPinCid := test.SlowCid1
fastPinCid := unpinCancelCid
// SlowLocalPin
slowPin := testPin(slowPinCid, -1, -1)
// LocalPin
fastPin := testPin(fastPinCid, -1, -1)
err := mpt.Track(context.Background(), slowPin)
if err != nil {
t.Fatal(err)
}
err = mpt.Track(context.Background(), fastPin)
if err != nil {
t.Fatal(err)
}
time.Sleep(3 * time.Second)
err = mpt.Untrack(context.Background(), slowPin.Cid)
if err != nil {
t.Fatal(err)
}
err = mpt.Untrack(context.Background(), fastPin.Cid)
if err != nil {
t.Fatal(err)
}
pInfo := mpt.Status(context.Background(), fastPinCid)
if pInfo.Status == api.TrackerStatusUnpinned {
t.Fatal("c untrack operation should be tracked")
}
if pInfo.Status == api.TrackerStatusUnpinQueued {
err = mpt.Track(context.Background(), fastPin)
if err != nil {
t.Fatal(err)
}
pi := mpt.Status(context.Background(), fastPinCid)
if pi.Error == ErrUnpinCancelCid.Error() {
t.Fatal(ErrUnpinCancelCid)
}
} else {
t.Error("c should be queued to unpin")
}
}
func TestTrackUntrackConcurrent(t *testing.T) {
ctx := context.Background()
mpt := testMapPinTracker(t)
defer mpt.Shutdown(ctx)
h1 := test.Cid1
// LocalPin
c := testPin(h1, -1, -1)
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 50; j++ {
var err error
op := rand.Intn(2)
if op == 1 {
err = mpt.Track(context.Background(), c)
} else {
err = mpt.Untrack(context.Background(), c.Cid)
}
if err != nil {
t.Error(err)
}
}
}()
}
wg.Wait()
time.Sleep(200 * time.Millisecond)
st := mpt.Status(context.Background(), h1)
t.Log(st.Status)
if st.Status != api.TrackerStatusUnpinned && st.Status != api.TrackerStatusPinned {
t.Fatal("should be pinned or unpinned")
}
}

View File

@ -105,9 +105,11 @@ func (op *Operation) String() string {
// Cid returns the Cid associated to this operation.
func (op *Operation) Cid() cid.Cid {
var c cid.Cid
op.mu.RLock()
defer op.mu.RUnlock()
return op.pin.Cid
c = op.pin.Cid
op.mu.RUnlock()
return c
}
// Context returns the context associated to this operation.
@ -117,48 +119,55 @@ func (op *Operation) Context() context.Context {
// Cancel will cancel the context associated to this operation.
func (op *Operation) Cancel() {
ctx, span := trace.StartSpan(op.ctx, "optracker/Cancel")
_ = ctx
defer span.End()
_, span := trace.StartSpan(op.ctx, "optracker/Cancel")
op.cancel()
span.End()
}
// Phase returns the Phase.
func (op *Operation) Phase() Phase {
var ph Phase
op.mu.RLock()
defer op.mu.RUnlock()
return op.phase
ph = op.phase
op.mu.RUnlock()
return ph
}
// SetPhase changes the Phase and updates the timestamp.
func (op *Operation) SetPhase(ph Phase) {
ctx, span := trace.StartSpan(op.ctx, "optracker/SetPhase")
_ = ctx
defer span.End()
_, span := trace.StartSpan(op.ctx, "optracker/SetPhase")
op.mu.Lock()
defer op.mu.Unlock()
op.phase = ph
op.ts = time.Now()
{
op.phase = ph
op.ts = time.Now()
}
op.mu.Unlock()
span.End()
}
// Error returns any error message attached to the operation.
func (op *Operation) Error() string {
var err string
op.mu.RLock()
defer op.mu.RUnlock()
return op.error
err = op.error
op.mu.RUnlock()
return err
}
// SetError sets the phase to PhaseError along with
// an error message. It updates the timestamp.
func (op *Operation) SetError(err error) {
ctx, span := trace.StartSpan(op.ctx, "optracker/SetError")
_ = ctx
defer span.End()
_, span := trace.StartSpan(op.ctx, "optracker/SetError")
op.mu.Lock()
defer op.mu.Unlock()
op.phase = PhaseError
op.error = err.Error()
op.ts = time.Now()
{
op.phase = PhaseError
op.error = err.Error()
op.ts = time.Now()
}
op.mu.Unlock()
span.End()
}
// Type returns the operation Type.
@ -174,9 +183,11 @@ func (op *Operation) Pin() *api.Pin {
// Timestamp returns the time when this operation was
// last modified (phase changed, error was set...).
func (op *Operation) Timestamp() time.Time {
var ts time.Time
op.mu.RLock()
defer op.mu.RUnlock()
return op.ts
ts = op.ts
op.mu.RUnlock()
return ts
}
// Cancelled returns whether the context for this

View File

@ -206,24 +206,6 @@ func (opt *OperationTracker) GetAll(ctx context.Context) []*api.PinInfo {
return pinfos
}
// CleanError removes the associated Operation, if it is
// in PhaseError.
func (opt *OperationTracker) CleanError(ctx context.Context, c cid.Cid) {
opt.mu.RLock()
defer opt.mu.RUnlock()
errop, ok := opt.operations[c.String()]
if !ok {
return
}
if errop.Phase() != PhaseError {
return
}
opt.Clean(ctx, errop)
return
}
// CleanAllDone deletes any operation from the tracker that is in PhaseDone.
func (opt *OperationTracker) CleanAllDone(ctx context.Context) {
opt.mu.Lock()

View File

@ -11,8 +11,10 @@ import (
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/datastore/inmem"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/state/dsstate"
"github.com/ipfs/ipfs-cluster/test"
cid "github.com/ipfs/go-cid"
@ -31,17 +33,12 @@ var (
}
)
type mockCluster struct{}
type mockIPFS struct{}
func mockRPCClient(t testing.TB) *rpc.Client {
s := rpc.NewServer(nil, "mock")
c := rpc.NewClientWithServer(nil, "mock", s)
err := s.RegisterName("Cluster", &mockCluster{})
if err != nil {
t.Fatal(err)
}
err = s.RegisterName("IPFSConnector", &mockIPFS{})
err := s.RegisterName("IPFSConnector", &mockIPFS{})
if err != nil {
t.Fatal(err)
}
@ -90,64 +87,58 @@ func (mock *mockIPFS) PinLs(ctx context.Context, in string, out *map[string]api.
return nil
}
func (mock *mockCluster) Pins(ctx context.Context, in struct{}, out *[]*api.Pin) error {
*out = []*api.Pin{
api.PinWithOpts(test.Cid1, pinOpts),
api.PinWithOpts(test.Cid3, pinOpts),
}
return nil
}
func (mock *mockCluster) PinGet(ctx context.Context, in cid.Cid, out *api.Pin) error {
switch in.String() {
case test.ErrorCid.String():
return errors.New("expected error when using ErrorCid")
case test.Cid1.String(), test.Cid2.String():
pin := api.PinWithOpts(in, pinOpts)
*out = *pin
return nil
}
pin := api.PinCid(in)
*out = *pin
return nil
}
var sortPinInfoByCid = func(p []*api.PinInfo) {
sort.Slice(p, func(i, j int) bool {
return p[i].Cid.String() < p[j].Cid.String()
})
}
func testSlowMapPinTracker(t testing.TB) *maptracker.MapPinTracker {
cfg := &maptracker.Config{}
cfg.Default()
cfg.ConcurrentPins = 1
mpt := maptracker.NewMapPinTracker(cfg, test.PeerID1, test.PeerName1)
mpt.SetClient(mockRPCClient(t))
return mpt
}
// prefilledState return a state instance with some pins.
func prefilledState(context.Context) (state.ReadOnly, error) {
st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
if err != nil {
return nil, err
}
func testMapPinTracker(t testing.TB) *maptracker.MapPinTracker {
cfg := &maptracker.Config{}
cfg.Default()
cfg.ConcurrentPins = 1
mpt := maptracker.NewMapPinTracker(cfg, test.PeerID1, test.PeerName1)
mpt.SetClient(test.NewMockRPCClient(t))
return mpt
remote := api.PinWithOpts(test.Cid4, api.PinOptions{
ReplicationFactorMax: 1,
ReplicationFactorMin: 1,
})
remote.Allocations = []peer.ID{test.PeerID2}
pins := []*api.Pin{
api.PinWithOpts(test.Cid1, pinOpts),
api.PinCid(test.Cid2),
api.PinWithOpts(test.Cid3, pinOpts),
remote,
}
ctx := context.Background()
for _, pin := range pins {
err = st.Add(ctx, pin)
if err != nil {
return nil, err
}
}
return st, nil
}
func testSlowStatelessPinTracker(t testing.TB) *stateless.Tracker {
t.Helper()
cfg := &stateless.Config{}
cfg.Default()
mpt := stateless.New(cfg, test.PeerID1, test.PeerName1)
mpt.SetClient(mockRPCClient(t))
return mpt
spt := stateless.New(cfg, test.PeerID1, test.PeerName1, prefilledState)
spt.SetClient(mockRPCClient(t))
return spt
}
func testStatelessPinTracker(t testing.TB) *stateless.Tracker {
t.Helper()
cfg := &stateless.Config{}
cfg.Default()
spt := stateless.New(cfg, test.PeerID1, test.PeerName1)
spt := stateless.New(cfg, test.PeerID1, test.PeerName1, prefilledState)
spt.SetClient(test.NewMockRPCClient(t))
return spt
}
@ -170,14 +161,6 @@ func TestPinTracker_Track(t *testing.T) {
},
false,
},
{
"basic map track",
args{
api.PinWithOpts(test.Cid1, pinOpts),
testMapPinTracker(t),
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -204,13 +187,6 @@ func BenchmarkPinTracker_Track(b *testing.B) {
testStatelessPinTracker(b),
},
},
{
"basic map track",
args{
api.PinWithOpts(test.Cid1, pinOpts),
testMapPinTracker(b),
},
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
@ -242,14 +218,6 @@ func TestPinTracker_Untrack(t *testing.T) {
},
false,
},
{
"basic map untrack",
args{
test.Cid1,
testMapPinTracker(t),
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -289,18 +257,9 @@ func TestPinTracker_StatusAll(t *testing.T) {
Cid: test.Cid3,
Status: api.TrackerStatusPinned,
},
},
},
{
"basic map statusall",
args{
api.PinWithOpts(test.Cid1, pinOpts),
testMapPinTracker(t),
},
[]*api.PinInfo{
{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
Cid: test.Cid4,
Status: api.TrackerStatusRemote,
},
},
},
@ -315,18 +274,13 @@ func TestPinTracker_StatusAll(t *testing.T) {
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
},
},
{
"slow map statusall",
args{
api.PinWithOpts(test.Cid1, pinOpts),
testSlowMapPinTracker(t),
},
[]*api.PinInfo{
{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
Cid: test.Cid2,
Status: api.TrackerStatusRemote,
},
{
Cid: test.Cid4,
Status: api.TrackerStatusRemote,
},
},
},
@ -375,12 +329,6 @@ func BenchmarkPinTracker_StatusAll(b *testing.B) {
testStatelessPinTracker(b),
},
},
{
"basic map track",
args{
testMapPinTracker(b),
},
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
@ -413,40 +361,17 @@ func TestPinTracker_Status(t *testing.T) {
Status: api.TrackerStatusPinned,
},
},
{
"basic map status",
args{
test.Cid1,
testMapPinTracker(t),
},
api.PinInfo{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
},
{
"basic stateless status/unpinned",
args{
test.Cid4,
test.Cid5,
testStatelessPinTracker(t),
},
api.PinInfo{
Cid: test.Cid4,
Cid: test.Cid5,
Status: api.TrackerStatusUnpinned,
},
},
{
"basic map status/unpinned",
args{
test.Cid4,
testMapPinTracker(t),
},
api.PinInfo{
Cid: test.Cid4,
Status: api.TrackerStatusUnpinned,
},
},
{
"slow stateless status",
args{
@ -458,31 +383,9 @@ func TestPinTracker_Status(t *testing.T) {
Status: api.TrackerStatusPinned,
},
},
{
"slow map status",
args{
test.Cid1,
testSlowMapPinTracker(t),
},
api.PinInfo{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
switch tt.args.tracker.(type) {
case *maptracker.MapPinTracker:
// the Track preps the internal map of the MapPinTracker
// not required by the Stateless impl
pin := api.PinWithOpts(test.Cid1, pinOpts)
if err := tt.args.tracker.Track(context.Background(), pin); err != nil {
t.Errorf("PinTracker.Track() error = %v", err)
}
time.Sleep(1 * time.Second)
}
got := tt.args.tracker.Status(context.Background(), tt.args.c)
if got.Cid.String() != tt.want.Cid.String() {
@ -496,231 +399,9 @@ func TestPinTracker_Status(t *testing.T) {
}
}
func TestPinTracker_SyncAll(t *testing.T) {
type args struct {
cs []cid.Cid
tracker ipfscluster.PinTracker
}
tests := []struct {
name string
args args
want []*api.PinInfo
wantErr bool
}{
{
"basic stateless syncall",
args{
[]cid.Cid{
test.Cid1,
test.Cid2,
},
testStatelessPinTracker(t),
},
[]*api.PinInfo{
{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
{
Cid: test.Cid2,
Status: api.TrackerStatusPinned,
},
},
false,
},
{
"basic map syncall",
args{
[]cid.Cid{
test.Cid1,
test.Cid2,
},
testMapPinTracker(t),
},
[]*api.PinInfo{
{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
{
Cid: test.Cid2,
Status: api.TrackerStatusPinned,
},
},
false,
},
{
"slow stateless syncall",
args{
[]cid.Cid{
test.Cid1,
test.Cid2,
},
testSlowStatelessPinTracker(t),
},
[]*api.PinInfo{
{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
{
Cid: test.Cid2,
Status: api.TrackerStatusPinned,
},
},
false,
},
{
"slow map syncall",
args{
[]cid.Cid{
test.Cid1,
test.Cid2,
},
testSlowMapPinTracker(t),
},
[]*api.PinInfo{
{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
{
Cid: test.Cid2,
Status: api.TrackerStatusPinned,
},
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.args.tracker.SyncAll(context.Background())
if (err != nil) != tt.wantErr {
t.Errorf("PinTracker.SyncAll() error = %v, wantErr %v", err, tt.wantErr)
return
}
if len(got) != 0 {
t.Fatalf("should not have synced anything when it tracks nothing")
}
for _, c := range tt.args.cs {
err := tt.args.tracker.Track(context.Background(), api.PinWithOpts(c, pinOpts))
if err != nil {
t.Fatal(err)
}
}
sortPinInfoByCid(got)
sortPinInfoByCid(tt.want)
for i := range got {
if got[i].Cid.String() != tt.want[i].Cid.String() {
t.Errorf("PinTracker.SyncAll() = %v, want %v", got, tt.want)
}
if got[i].Status != tt.want[i].Status {
t.Errorf("PinTracker.SyncAll() = %v, want %v", got, tt.want)
}
}
})
}
}
func TestPinTracker_Sync(t *testing.T) {
type args struct {
c cid.Cid
tracker ipfscluster.PinTracker
}
tests := []struct {
name string
args args
want api.PinInfo
wantErr bool
}{
{
"basic stateless sync",
args{
test.Cid1,
testStatelessPinTracker(t),
},
api.PinInfo{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
false,
},
{
"basic map sync",
args{
test.Cid1,
testMapPinTracker(t),
},
api.PinInfo{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
false,
},
{
"slow stateless sync",
args{
test.Cid1,
testSlowStatelessPinTracker(t),
},
api.PinInfo{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
false,
},
{
"slow map sync",
args{
test.Cid1,
testSlowMapPinTracker(t),
},
api.PinInfo{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
switch tt.args.tracker.(type) {
case *maptracker.MapPinTracker:
// the Track preps the internal map of the MapPinTracker; not required by the Stateless impl
pin := api.PinWithOpts(test.Cid1, pinOpts)
if err := tt.args.tracker.Track(context.Background(), pin); err != nil {
t.Errorf("PinTracker.Track() error = %v", err)
}
time.Sleep(1 * time.Second)
}
got, err := tt.args.tracker.Sync(context.Background(), tt.args.c)
if (err != nil) != tt.wantErr {
t.Errorf("PinTracker.Sync() error = %v, wantErr %v", err, tt.wantErr)
return
}
t.Logf("got: %+v\n", got)
if got.Cid.String() != tt.want.Cid.String() {
t.Errorf("PinTracker.Sync() = %v, want %v", got.Cid.String(), tt.want.Cid.String())
}
if got.Status != tt.want.Status {
t.Errorf("PinTracker.Sync() = %v, want %v", got.Status, tt.want.Status)
}
})
}
}
func TestPinTracker_RecoverAll(t *testing.T) {
type args struct {
tracker ipfscluster.PinTracker
pin *api.Pin // only used by maptracker
}
tests := []struct {
name string
@ -732,7 +413,6 @@ func TestPinTracker_RecoverAll(t *testing.T) {
"basic stateless recoverall",
args{
testStatelessPinTracker(t),
&api.Pin{},
},
[]*api.PinInfo{
{
@ -747,19 +427,9 @@ func TestPinTracker_RecoverAll(t *testing.T) {
Cid: test.Cid3,
Status: api.TrackerStatusPinned,
},
},
false,
},
{
"basic map recoverall",
args{
testMapPinTracker(t),
api.PinWithOpts(test.Cid1, pinOpts),
},
[]*api.PinInfo{
{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
Cid: test.Cid4,
Status: api.TrackerStatusRemote,
},
},
false,
@ -767,15 +437,6 @@ func TestPinTracker_RecoverAll(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
switch tt.args.tracker.(type) {
case *maptracker.MapPinTracker:
// the Track preps the internal map of the MapPinTracker; not required by the Stateless impl
if err := tt.args.tracker.Track(context.Background(), tt.args.pin); err != nil {
t.Errorf("PinTracker.Track() error = %v", err)
}
time.Sleep(1 * time.Second)
}
got, err := tt.args.tracker.RecoverAll(context.Background())
if (err != nil) != tt.wantErr {
t.Errorf("PinTracker.RecoverAll() error = %v, wantErr %v", err, tt.wantErr)
@ -829,18 +490,6 @@ func TestPinTracker_Recover(t *testing.T) {
},
false,
},
{
"basic map recover",
args{
test.Cid1,
testMapPinTracker(t),
},
api.PinInfo{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -880,18 +529,6 @@ func TestUntrackTrack(t *testing.T) {
},
false,
},
{
"basic map untrack track",
args{
test.Cid1,
testMapPinTracker(t),
},
api.PinInfo{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -933,18 +570,6 @@ func TestTrackUntrackWithCancel(t *testing.T) {
},
false,
},
{
"slow map tracker untrack w/ cancel",
args{
test.SlowCid1,
testSlowMapPinTracker(t),
},
api.PinInfo{
Cid: test.SlowCid1,
Status: api.TrackerStatusPinned,
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -970,15 +595,13 @@ func TestTrackUntrackWithCancel(t *testing.T) {
}()
var ctx context.Context
switch trkr := tt.args.tracker.(type) {
case *maptracker.MapPinTracker:
ctx = trkr.OpContext(context.Background(), tt.args.c)
case *stateless.Tracker:
ctx = trkr.OpContext(context.Background(), tt.args.c)
}
select {
case <-ctx.Done():
return
case <-time.Tick(100 * time.Millisecond):
case <-time.Tick(150 * time.Millisecond):
t.Errorf("operation context should have been cancelled by now")
}
} else {
@ -1003,34 +626,14 @@ func TestPinTracker_RemoteIgnoresError(t *testing.T) {
t.Fatal(err)
}
// Sync triggers IPFSPinLs which will return an error
// (see mock)
pi, err := pt.Sync(ctx, remoteCid)
if err != nil {
t.Fatal(err)
}
if pi.Status != api.TrackerStatusRemote || pi.Error != "" {
t.Error("Remote pin should not be in error")
}
pi = pt.Status(ctx, remoteCid)
if err != nil {
t.Fatal(err)
}
pi := pt.Status(ctx, remoteCid)
if pi.Status != api.TrackerStatusRemote || pi.Error != "" {
t.Error("Remote pin should not be in error")
}
}
t.Run("basic pintracker", func(t *testing.T) {
pt := testMapPinTracker(t)
testF(t, pt)
})
t.Run("stateless pintracker", func(t *testing.T) {
pt := testStatelessPinTracker(t)
pt := testSlowStatelessPinTracker(t)
testF(t, pt)
})
}

View File

@ -11,7 +11,7 @@ import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/pintracker/optracker"
"github.com/ipfs/ipfs-cluster/pintracker/util"
"github.com/ipfs/ipfs-cluster/state"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
@ -23,6 +23,11 @@ import (
var logger = logging.Logger("pintracker")
var (
// ErrFullQueue is the error used when pin or unpin operation channel is full.
ErrFullQueue = errors.New("pin/unpin operation queue is full. Try increasing max_pin_queue_size")
)
// Tracker uses the optracker.OperationTracker to manage
// transitioning shared ipfs-cluster state (Pins) to the local IPFS node.
type Tracker struct {
@ -30,11 +35,14 @@ type Tracker struct {
optracker *optracker.OperationTracker
peerID peer.ID
peerID peer.ID
peerName string
ctx context.Context
cancel func()
getState func(ctx context.Context) (state.ReadOnly, error)
rpcClient *rpc.Client
rpcReady chan struct{}
@ -47,14 +55,16 @@ type Tracker struct {
}
// New creates a new StatelessPinTracker.
func New(cfg *Config, pid peer.ID, peerName string) *Tracker {
func New(cfg *Config, pid peer.ID, peerName string, getState func(ctx context.Context) (state.ReadOnly, error)) *Tracker {
ctx, cancel := context.WithCancel(context.Background())
spt := &Tracker{
config: cfg,
peerID: pid,
peerName: peerName,
ctx: ctx,
cancel: cancel,
getState: getState,
optracker: optracker.NewOperationTracker(ctx, pid, peerName),
rpcReady: make(chan struct{}, 1),
pinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize),
@ -71,13 +81,8 @@ func New(cfg *Config, pid peer.ID, peerName string) *Tracker {
// receives a pin Function (pin or unpin) and a channel.
// Used for both pinning and unpinning
func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, opChan chan *optracker.Operation) {
logger.Debug("entering opworker")
ticker := time.NewTicker(10 * time.Second) //TODO(ajl): make config var
for {
select {
case <-ticker.C:
// every tick, clear out all Done operations
spt.optracker.CleanAllDone(spt.ctx)
case op := <-opChan:
if cont := applyPinF(pinF, op); cont {
continue
@ -160,7 +165,7 @@ func (spt *Tracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.Opera
logger.Debugf("entering enqueue: pin: %+v", c)
op := spt.optracker.TrackNewOperation(ctx, c, typ, optracker.PhaseQueued)
if op == nil {
return nil // ongoing pin operation.
return nil // ongoing operation.
}
var ch chan *optracker.Operation
@ -170,14 +175,12 @@ func (spt *Tracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.Opera
ch = spt.pinCh
case optracker.OperationUnpin:
ch = spt.unpinCh
default:
return errors.New("operation doesn't have a associated channel")
}
select {
case ch <- op:
default:
err := util.ErrFullQueue
err := ErrFullQueue
op.SetError(err)
op.Cancel()
logger.Error(err.Error())
@ -226,9 +229,8 @@ func (spt *Tracker) Track(ctx context.Context, c *api.Pin) error {
// Sharded pins are never pinned. A sharded pin cannot turn into
// something else or viceversa like it happens with Remote pins so
// we just track them.
// we just ignore them.
if c.Type == api.MetaType {
spt.optracker.TrackNewOperation(ctx, c, optracker.OperationShard, optracker.PhaseDone)
return nil
}
@ -244,9 +246,11 @@ func (spt *Tracker) Track(ctx context.Context, c *api.Pin) error {
op.Cancel()
if err != nil {
op.SetError(err)
} else {
op.SetPhase(optracker.PhaseDone)
return nil
}
op.SetPhase(optracker.PhaseDone)
spt.optracker.Clean(ctx, op)
return nil
}
@ -270,13 +274,11 @@ func (spt *Tracker) StatusAll(ctx context.Context) []*api.PinInfo {
pininfos, err := spt.localStatus(ctx, true)
if err != nil {
logger.Error(err)
return nil
}
// get all inflight operations from optracker and
// put them into the map, deduplicating any already 'pinned' items with
// their inflight operation
// get all inflight operations from optracker and put them into the
// map, deduplicating any existing items with their inflight operation.
for _, infop := range spt.optracker.GetAll(ctx) {
pininfos[infop.Cid.String()] = infop
}
@ -299,59 +301,50 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
return oppi
}
pinInfo := &api.PinInfo{
Cid: c,
Peer: spt.peerID,
PeerName: spt.peerName,
TS: time.Now(),
}
// check global state to see if cluster should even be caring about
// the provided cid
var gpin api.Pin
err := spt.rpcClient.Call(
"",
"Cluster",
"PinGet",
c,
&gpin,
)
gpin := &api.Pin{}
st, err := spt.getState(ctx)
if err != nil {
if rpc.IsRPCError(err) {
logger.Error(err)
return &api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusClusterError,
Error: err.Error(),
TS: time.Now(),
}
}
// not part of global state. we should not care about
return &api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusUnpinned,
TS: time.Now(),
}
logger.Error(err)
addError(pinInfo, err)
return pinInfo
}
gpin, err = st.Get(ctx, c)
if err == state.ErrNotFound {
pinInfo.Status = api.TrackerStatusUnpinned
return pinInfo
}
if err != nil {
logger.Error(err)
addError(pinInfo, err)
return pinInfo
}
// check if pin is a meta pin
if gpin.Type == api.MetaType {
return &api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusSharded,
TS: time.Now(),
}
pinInfo.Status = api.TrackerStatusSharded
return pinInfo
}
// check if pin is a remote pin
if gpin.IsRemotePin(spt.peerID) {
return &api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusRemote,
TS: time.Now(),
}
pinInfo.Status = api.TrackerStatusRemote
return pinInfo
}
// else attempt to get status from ipfs node
var ips api.IPFSPinStatus
err = spt.rpcClient.Call(
err = spt.rpcClient.CallContext(
ctx,
"",
"IPFSConnector",
"PinLsCid",
@ -360,137 +353,12 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
)
if err != nil {
logger.Error(err)
return nil
addError(pinInfo, err)
return pinInfo
}
return &api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: ips.ToTrackerStatus(),
TS: time.Now(),
}
}
// SyncAll verifies that the statuses of all tracked Cids (from the shared state)
// match the one reported by the IPFS daemon. If not, they will be transitioned
// to PinError or UnpinError.
//
// SyncAll returns the list of local status for all tracked Cids which
// were updated or have errors. Cids in error states can be recovered
// with Recover().
// An error is returned if we are unable to contact the IPFS daemon.
func (spt *Tracker) SyncAll(ctx context.Context) ([]*api.PinInfo, error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/SyncAll")
defer span.End()
// get ipfs status for all
localpis, err := spt.localStatus(ctx, false)
if err != nil {
logger.Error(err)
return nil, err
}
for _, p := range spt.optracker.Filter(ctx, optracker.OperationPin, optracker.PhaseError) {
if _, ok := localpis[p.Cid.String()]; ok {
spt.optracker.CleanError(ctx, p.Cid)
}
}
for _, p := range spt.optracker.Filter(ctx, optracker.OperationUnpin, optracker.PhaseError) {
if _, ok := localpis[p.Cid.String()]; !ok {
spt.optracker.CleanError(ctx, p.Cid)
}
}
return spt.getErrorsAll(ctx), nil
}
// Sync returns the updated local status for the given Cid.
func (spt *Tracker) Sync(ctx context.Context, c cid.Cid) (*api.PinInfo, error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Sync")
defer span.End()
oppi, ok := spt.optracker.GetExists(ctx, c)
if !ok {
return spt.Status(ctx, c), nil
}
if oppi.Status == api.TrackerStatusUnpinError {
// check global state to see if cluster should even be caring about
// the provided cid
var gpin api.Pin
err := spt.rpcClient.Call(
"",
"Cluster",
"PinGet",
c,
&gpin,
)
if err != nil {
if rpc.IsRPCError(err) {
logger.Error(err)
return &api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusClusterError,
Error: err.Error(),
TS: time.Now(),
}, err
}
// it isn't in the global state
spt.optracker.CleanError(ctx, c)
return &api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusUnpinned,
TS: time.Now(),
}, nil
}
// check if pin is a remote pin
if gpin.IsRemotePin(spt.peerID) {
spt.optracker.CleanError(ctx, c)
return &api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusRemote,
TS: time.Now(),
}, nil
}
}
if oppi.Status == api.TrackerStatusPinError {
// else attempt to get status from ipfs node
var ips api.IPFSPinStatus
err := spt.rpcClient.Call(
"",
"IPFSConnector",
"PinLsCid",
c,
&ips,
)
if err != nil {
logger.Error(err)
return &api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusPinError,
TS: time.Now(),
Error: err.Error(),
}, err
}
if ips.ToTrackerStatus() == api.TrackerStatusPinned {
spt.optracker.CleanError(ctx, c)
pi := &api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: ips.ToTrackerStatus(),
TS: time.Now(),
}
return pi, nil
}
}
return spt.optracker.Get(ctx, c), nil
pinInfo.Status = ips.ToTrackerStatus()
return pinInfo
}
// RecoverAll attempts to recover all items tracked by this peer.
@ -563,10 +431,11 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[string]*api.PinInfo,
continue
}
p := &api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: ips.ToTrackerStatus(),
TS: time.Now(),
Cid: c,
Peer: spt.peerID,
PeerName: spt.peerName,
Status: ips.ToTrackerStatus(),
TS: time.Now(),
}
pins[cidstr] = p
}
@ -575,23 +444,21 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[string]*api.PinInfo,
// localStatus returns a joint set of consensusState and ipfsStatus
// marking pins which should be meta or remote and leaving any ipfs pins that
// aren't in the consensusState out.
// aren't in the consensusState out. If incExtra is true, Remote and Sharded
// pins will be added to the status slice.
func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string]*api.PinInfo, error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/localStatus")
defer span.End()
pininfos := make(map[string]*api.PinInfo)
// get shared state
var statePins []*api.Pin
err := spt.rpcClient.CallContext(
ctx,
"",
"Cluster",
"Pins",
struct{}{},
&statePins,
)
statePins := []*api.Pin{}
st, err := spt.getState(ctx)
if err != nil {
logger.Error(err)
return nil, err
}
statePins, err = st.List(ctx)
if err != nil {
logger.Error(err)
return nil, err
@ -604,32 +471,41 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string]
return nil, err
}
pininfos := make(map[string]*api.PinInfo, len(statePins))
for _, p := range statePins {
pCid := p.Cid.String()
if p.Type == api.MetaType && incExtra {
// add pin to pininfos with sharded status
pininfos[pCid] = &api.PinInfo{
Cid: p.Cid,
Peer: spt.peerID,
Status: api.TrackerStatusSharded,
TS: time.Now(),
}
continue
ipfsInfo, pinnedInIpfs := localpis[pCid]
// base pinInfo object - status to be filled.
pinInfo := &api.PinInfo{
Cid: p.Cid,
Peer: spt.peerID,
PeerName: spt.peerName,
TS: time.Now(),
}
if p.IsRemotePin(spt.peerID) && incExtra {
// add pin to pininfos with a status of remote
pininfos[pCid] = &api.PinInfo{
Cid: p.Cid,
Peer: spt.peerID,
Status: api.TrackerStatusRemote,
TS: time.Now(),
switch {
case p.Type == api.MetaType:
pinInfo.Status = api.TrackerStatusSharded
if incExtra {
pininfos[pCid] = pinInfo
}
continue
}
// lookup p in localpis
if lp, ok := localpis[pCid]; ok {
pininfos[pCid] = lp
case p.IsRemotePin(spt.peerID):
pinInfo.Status = api.TrackerStatusRemote
if incExtra {
pininfos[pCid] = pinInfo
}
case pinnedInIpfs:
pininfos[pCid] = ipfsInfo
default:
// report as undefined for this peer. this will be
// overwritten if the operation tracker has more info
// for this. Otherwise, this is a problem: a pin in
// the state that should be pinned by this peer but
// which no operation is handling.
// TODO (hector): Consider a pinError so it can be
// recovered?
pinInfo.Status = api.TrackerStatusUndefined
}
}
return pininfos, nil
@ -644,3 +520,8 @@ func (spt *Tracker) getErrorsAll(ctx context.Context) []*api.PinInfo {
func (spt *Tracker) OpContext(ctx context.Context, c cid.Cid) context.Context {
return spt.optracker.OpContext(ctx, c)
}
func addError(pinInfo *api.PinInfo, err error) {
pinInfo.Error = err.Error()
pinInfo.Status = api.TrackerStatusClusterError
}

View File

@ -8,6 +8,9 @@ import (
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/datastore/inmem"
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/state/dsstate"
"github.com/ipfs/ipfs-cluster/test"
cid "github.com/ipfs/go-cid"
@ -25,18 +28,13 @@ var (
}
)
type mockCluster struct{}
type mockIPFS struct{}
func mockRPCClient(t *testing.T) *rpc.Client {
s := rpc.NewServer(nil, "mock")
c := rpc.NewClientWithServer(nil, "mock", s)
err := s.RegisterName("Cluster", &mockCluster{})
if err != nil {
t.Fatal(err)
}
err = s.RegisterName("IPFSConnector", &mockIPFS{})
err := s.RegisterName("IPFSConnector", &mockIPFS{})
if err != nil {
t.Fatal(err)
}
@ -81,40 +79,38 @@ func (mock *mockIPFS) PinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPin
return nil
}
func (mock *mockCluster) Pins(ctx context.Context, in struct{}, out *[]*api.Pin) error {
*out = []*api.Pin{
api.PinWithOpts(test.Cid1, pinOpts),
api.PinWithOpts(test.Cid3, pinOpts),
}
return nil
}
func (mock *mockCluster) PinGet(ctx context.Context, in cid.Cid, out *api.Pin) error {
switch in.String() {
case test.ErrorCid.String():
return errors.New("expected error when using ErrorCid")
case test.Cid1.String(), test.Cid2.String():
*out = *api.PinWithOpts(in, pinOpts)
return nil
default:
return errors.New("not found")
}
}
func testSlowStatelessPinTracker(t *testing.T) *Tracker {
t.Helper()
cfg := &Config{}
cfg.Default()
cfg.ConcurrentPins = 1
mpt := New(cfg, test.PeerID1, test.PeerName1)
mpt.SetClient(mockRPCClient(t))
return mpt
st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
if err != nil {
t.Fatal(err)
}
getState := func(ctx context.Context) (state.ReadOnly, error) {
return st, nil
}
spt := New(cfg, test.PeerID1, test.PeerName1, getState)
spt.SetClient(mockRPCClient(t))
return spt
}
func testStatelessPinTracker(t testing.TB) *Tracker {
t.Helper()
cfg := &Config{}
cfg.Default()
cfg.ConcurrentPins = 1
spt := New(cfg, test.PeerID1, test.PeerName1)
st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
if err != nil {
t.Fatal(err)
}
getState := func(ctx context.Context) (state.ReadOnly, error) {
return st, nil
}
spt := New(cfg, test.PeerID1, test.PeerName1, getState)
spt.SetClient(test.NewMockRPCClient(t))
return spt
}
@ -369,102 +365,6 @@ var sortPinInfoByCid = func(p []*api.PinInfo) {
})
}
func TestStatelessTracker_SyncAll(t *testing.T) {
type args struct {
cs []cid.Cid
tracker *Tracker
}
tests := []struct {
name string
args args
want []*api.PinInfo
wantErr bool
}{
{
"basic stateless syncall",
args{
[]cid.Cid{
test.Cid1,
test.Cid2,
},
testStatelessPinTracker(t),
},
[]*api.PinInfo{
{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
{
Cid: test.Cid2,
Status: api.TrackerStatusPinned,
},
},
false,
},
{
"slow stateless syncall",
args{
[]cid.Cid{
test.Cid1,
test.Cid2,
},
testSlowStatelessPinTracker(t),
},
[]*api.PinInfo{
{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
{
Cid: test.Cid2,
Status: api.TrackerStatusPinned,
},
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.args.tracker.SyncAll(context.Background())
if (err != nil) != tt.wantErr {
t.Errorf("PinTracker.SyncAll() error = %v, wantErr %v", err, tt.wantErr)
return
}
if len(got) != 0 {
t.Fatalf("should not have synced anything when it tracks nothing")
}
for _, c := range tt.args.cs {
err := tt.args.tracker.Track(context.Background(), api.PinWithOpts(c, pinOpts))
if err != nil {
t.Fatal(err)
}
tt.args.tracker.optracker.SetError(context.Background(), c, errors.New("test error"))
}
got, err = tt.args.tracker.SyncAll(context.Background())
if (err != nil) != tt.wantErr {
t.Errorf("PinTracker.SyncAll() error = %v, wantErr %v", err, tt.wantErr)
return
}
sortPinInfoByCid(got)
sortPinInfoByCid(tt.want)
for i := range got {
if got[i].Cid.String() != tt.want[i].Cid.String() {
t.Errorf("got: %v\n want %v", got[i].Cid.String(), tt.want[i].Cid.String())
}
if got[i].Status != tt.want[i].Status {
t.Errorf("got: %v\n want %v", got[i].Status, tt.want[i].Status)
}
}
})
}
}
func BenchmarkTracker_localStatus(b *testing.B) {
tracker := testStatelessPinTracker(b)
b.ResetTimer()

View File

@ -1,29 +0,0 @@
package util
import (
"errors"
"github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-core/peer"
)
var (
// ErrFullQueue is the error used when pin or unpin operation channel is full.
ErrFullQueue = errors.New("pin/unpin operation queue is full (too many operations), increasing max_pin_queue_size would help")
)
// IsRemotePin determines whether a Pin's ReplicationFactor has
// been met, so as to either pin or unpin it from the peer.
func IsRemotePin(c *api.Pin, pid peer.ID) bool {
if c.ReplicationFactorMax < 0 {
return false
}
for _, p := range c.Allocations {
if p == pid {
return false
}
}
return true
}

View File

@ -295,46 +295,6 @@ func (rpcapi *ClusterRPCAPI) StatusLocal(ctx context.Context, in cid.Cid, out *a
return nil
}
// SyncAll runs Cluster.SyncAll().
func (rpcapi *ClusterRPCAPI) SyncAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error {
pinfos, err := rpcapi.c.SyncAll(ctx)
if err != nil {
return err
}
*out = pinfos
return nil
}
// SyncAllLocal runs Cluster.SyncAllLocal().
func (rpcapi *ClusterRPCAPI) SyncAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
pinfos, err := rpcapi.c.SyncAllLocal(ctx)
if err != nil {
return err
}
*out = pinfos
return nil
}
// Sync runs Cluster.Sync().
func (rpcapi *ClusterRPCAPI) Sync(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error {
pinfo, err := rpcapi.c.Sync(ctx, in)
if err != nil {
return err
}
*out = *pinfo
return nil
}
// SyncLocal runs Cluster.SyncLocal().
func (rpcapi *ClusterRPCAPI) SyncLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error {
pinfo, err := rpcapi.c.SyncLocal(ctx, in)
if err != nil {
return err
}
*out = *pinfo
return nil
}
// RecoverAll runs Cluster.RecoverAll().
func (rpcapi *ClusterRPCAPI) RecoverAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error {
pinfos, err := rpcapi.c.RecoverAll(ctx)

View File

@ -31,10 +31,6 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{
"Cluster.StatusAll": RPCClosed,
"Cluster.StatusAllLocal": RPCClosed,
"Cluster.StatusLocal": RPCClosed,
"Cluster.Sync": RPCClosed,
"Cluster.SyncAll": RPCClosed,
"Cluster.SyncAllLocal": RPCTrusted, // Called in broadcast from SyncAll()
"Cluster.SyncLocal": RPCTrusted, // Called in broadcast from Sync()
"Cluster.Unpin": RPCClosed,
"Cluster.UnpinPath": RPCClosed,
"Cluster.Version": RPCOpen,

View File

@ -27,8 +27,6 @@ var comments = map[string]string{
"Cluster.PeerAdd": "Used by Join()",
"Cluster.Peers": "Used by ConnectGraph()",
"Cluster.Pins": "Used in stateless tracker, ipfsproxy, restapi",
"Cluster.SyncAllLocal": "Called in broadcast from SyncAll()",
"Cluster.SyncLocal": "Called in broadcast from Sync()",
"PinTracker.Recover": "Called in broadcast from Recover()",
"PinTracker.RecoverAll": "Broadcast in RecoverAll unimplemented",
"Pintracker.Status": "Called in broadcast from Status()",

View File

@ -5,7 +5,6 @@
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
@ -69,10 +68,6 @@
}
},
"pin_tracker": {
"maptracker": {
"max_pin_queue_size": 50000,
"concurrent_pins": 10
},
"stateless": {
"max_pin_queue_size": 50000,
"concurrent_pins": 10

View File

@ -5,7 +5,6 @@
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
@ -69,10 +68,6 @@
}
},
"pin_tracker": {
"maptracker": {
"max_pin_queue_size": 50000,
"concurrent_pins": 10
},
"stateless": {
"max_pin_queue_size": 50000,
"concurrent_pins": 10

View File

@ -7,7 +7,6 @@
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
@ -68,10 +67,6 @@
}
},
"pin_tracker": {
"maptracker": {
"max_pin_queue_size": 50000,
"concurrent_pins": 10
},
"stateless": {
"max_pin_queue_size": 50000,
"concurrent_pins": 10

View File

@ -39,7 +39,6 @@ test_expect_success "cluster-ctl commands output looks good" '
egrep -q "ipfs-cluster-ctl peers" commands.txt &&
egrep -q "ipfs-cluster-ctl pin" commands.txt &&
egrep -q "ipfs-cluster-ctl status" commands.txt &&
egrep -q "ipfs-cluster-ctl sync" commands.txt &&
egrep -q "ipfs-cluster-ctl recover" commands.txt &&
egrep -q "ipfs-cluster-ctl version" commands.txt &&
egrep -q "ipfs-cluster-ctl commands" commands.txt

View File

@ -41,10 +41,6 @@ test_expect_success IPFS,CLUSTER "invalid CID status" '
test_must_fail ipfs-cluster-ctl status XXXinvalid-CIDXXX
'
test_expect_success IPFS,CLUSTER "empty cluster-ctl sync succeeds" '
ipfs-cluster-ctl sync
'
test_expect_success IPFS,CLUSTER "empty cluster_ctl recover should not fail" '
ipfs-cluster-ctl recover
'

View File

@ -11,6 +11,7 @@ var (
Cid2, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmma")
Cid3, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmb")
Cid4, _ = cid.Decode("zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuc57")
Cid5, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd")
Cid4Data = "Cid4Data" // Cid resulting from block put NOT ipfs add
SlowCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd")
CidResolved, _ = cid.Decode("zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuabc")

View File

@ -285,22 +285,6 @@ func (mock *mockCluster) StatusLocal(ctx context.Context, in cid.Cid, out *api.P
return (&mockPinTracker{}).Status(ctx, in, out)
}
func (mock *mockCluster) SyncAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error {
return mock.StatusAll(ctx, in, out)
}
func (mock *mockCluster) SyncAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
return mock.StatusAllLocal(ctx, in, out)
}
func (mock *mockCluster) Sync(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error {
return mock.Status(ctx, in, out)
}
func (mock *mockCluster) SyncLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error {
return mock.StatusLocal(ctx, in, out)
}
func (mock *mockCluster) RecoverAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error {
return mock.StatusAll(ctx, in, out)
}