Separate recover() from sync()
This includes adding a new API endpoint, CLI command. I have also changed some api endpoints. I find: POST /pins/<cid>/sync POST /pins/<cid>/recover GET /pins/<cid> GET /pins better. The problem is makes the pin list /pinlist but it general its more consistent. License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
parent
2566a0cca5
commit
7f9cb0b269
96
cluster.go
96
cluster.go
|
@ -165,7 +165,7 @@ func (c *Cluster) StateSync() ([]PinInfo, error) {
|
|||
|
||||
// Track items which are not tracked
|
||||
for _, h := range clusterPins {
|
||||
if c.tracker.StatusCid(h).Status == TrackerStatusUnpinned {
|
||||
if c.tracker.Status(h).Status == TrackerStatusUnpinned {
|
||||
changed = append(changed, h)
|
||||
err := c.rpcClient.Go("",
|
||||
"Cluster",
|
||||
|
@ -181,7 +181,7 @@ func (c *Cluster) StateSync() ([]PinInfo, error) {
|
|||
}
|
||||
|
||||
// Untrack items which should not be tracked
|
||||
for _, p := range c.tracker.Status() {
|
||||
for _, p := range c.tracker.StatusAll() {
|
||||
h, _ := cid.Decode(p.CidStr)
|
||||
if !cState.HasPin(h) {
|
||||
changed = append(changed, h)
|
||||
|
@ -199,85 +199,75 @@ func (c *Cluster) StateSync() ([]PinInfo, error) {
|
|||
|
||||
var infos []PinInfo
|
||||
for _, h := range changed {
|
||||
infos = append(infos, c.tracker.StatusCid(h))
|
||||
infos = append(infos, c.tracker.Status(h))
|
||||
}
|
||||
return infos, nil
|
||||
}
|
||||
|
||||
// Status returns the GlobalPinInfo for all tracked Cids. If an error happens,
|
||||
// the slice will contain as much information as could be fetched.
|
||||
func (c *Cluster) Status() ([]GlobalPinInfo, error) {
|
||||
return c.globalPinInfoSlice("TrackerStatus")
|
||||
// StatusAll returns the GlobalPinInfo for all tracked Cids. If an error
|
||||
// happens, the slice will contain as much information as could be fetched.
|
||||
func (c *Cluster) StatusAll() ([]GlobalPinInfo, error) {
|
||||
return c.globalPinInfoSlice("TrackerStatusAll")
|
||||
}
|
||||
|
||||
// StatusCid returns the GlobalPinInfo for a given Cid. If an error happens,
|
||||
// Status returns the GlobalPinInfo for a given Cid. If an error happens,
|
||||
// the GlobalPinInfo should contain as much information as could be fetched.
|
||||
func (c *Cluster) StatusCid(h *cid.Cid) (GlobalPinInfo, error) {
|
||||
return c.globalPinInfoCid("TrackerStatusCid", h)
|
||||
func (c *Cluster) Status(h *cid.Cid) (GlobalPinInfo, error) {
|
||||
return c.globalPinInfoCid("TrackerStatus", h)
|
||||
}
|
||||
|
||||
// LocalSync makes sure that the current state the Tracker matches
|
||||
// the IPFS daemon state by triggering a Tracker.Sync() and Recover()
|
||||
// on all items that need it. Returns PinInfo for items changed on Sync().
|
||||
// SyncAllLocal makes sure that the current state for all tracked items
|
||||
// matches the state reported by the IPFS daemon.
|
||||
//
|
||||
// LocalSync triggers recoveries asynchronously, and will not wait for
|
||||
// them to fail or succeed before returning. The PinInfo may not reflect
|
||||
// the recovery attempt.
|
||||
func (c *Cluster) LocalSync() ([]PinInfo, error) {
|
||||
syncedItems, err := c.tracker.Sync()
|
||||
// Despite errors, tracker provides synced items that we can work with.
|
||||
// However we skip recover() on those cases, as probably the ipfs daemon
|
||||
// is gone.
|
||||
// SyncAllLocal returns the list of PinInfo that where updated because of
|
||||
// the operation, along with those in error states.
|
||||
func (c *Cluster) SyncAllLocal() ([]PinInfo, error) {
|
||||
syncedItems, err := c.tracker.SyncAll()
|
||||
// Despite errors, tracker provides synced items that we can provide.
|
||||
// They encapsulate the error.
|
||||
if err != nil {
|
||||
logger.Error("tracker.Sync() returned with error: ", err)
|
||||
logger.Error("Is the ipfs daemon running?")
|
||||
logger.Error("LocalSync returning without attempting recovers")
|
||||
return syncedItems, nil
|
||||
}
|
||||
|
||||
// FIXME: at this point, recover probably deserves it's own api endpoint
|
||||
// or be optional or be synchronous.
|
||||
logger.Infof("%d items changed on sync", len(syncedItems))
|
||||
for _, pInfo := range syncedItems {
|
||||
pCid, _ := cid.Decode(pInfo.CidStr)
|
||||
go func(h *cid.Cid) {
|
||||
c.tracker.Recover(h)
|
||||
}(pCid)
|
||||
}
|
||||
return syncedItems, nil
|
||||
return syncedItems, err
|
||||
}
|
||||
|
||||
// LocalSyncCid performs a Tracker.Sync() operation followed by a
|
||||
// Recover() when needed. It returns the latest known PinInfo for the Cid.
|
||||
//
|
||||
// LocalSyncCid will wait for the Recover operation to fail or succeed before
|
||||
// returning.
|
||||
func (c *Cluster) LocalSyncCid(h *cid.Cid) (PinInfo, error) {
|
||||
// SyncLocal performs a local sync operation for the given Cid. This will
|
||||
// tell the tracker to verify the status of the Cid against the IPFS daemon.
|
||||
// It returns the updated PinInfo for the Cid.
|
||||
func (c *Cluster) SyncLocal(h *cid.Cid) (PinInfo, error) {
|
||||
var err error
|
||||
pInfo, err := c.tracker.SyncCid(h)
|
||||
pInfo, err := c.tracker.Sync(h)
|
||||
// Despite errors, trackers provides an updated PinInfo so
|
||||
// we just log it.
|
||||
if err != nil {
|
||||
logger.Error("tracker.SyncCid() returned with error: ", err)
|
||||
logger.Error("Is the ipfs daemon running?")
|
||||
return pInfo, nil
|
||||
}
|
||||
c.tracker.Recover(h)
|
||||
return c.tracker.StatusCid(h), nil
|
||||
return pInfo, err
|
||||
}
|
||||
|
||||
// GlobalSync triggers Sync() operations in all members of the Cluster.
|
||||
func (c *Cluster) GlobalSync() ([]GlobalPinInfo, error) {
|
||||
return c.globalPinInfoSlice("LocalSync")
|
||||
// SyncAll triggers LocalSync() operations in all members of the Cluster.
|
||||
func (c *Cluster) SyncAll() ([]GlobalPinInfo, error) {
|
||||
return c.globalPinInfoSlice("SyncAllLocal")
|
||||
}
|
||||
|
||||
// GlobalSyncCid triggers a LocalSyncCid() operation for a given Cid
|
||||
// Sync triggers a LocalSyncCid() operation for a given Cid
|
||||
// in all members of the Cluster.
|
||||
//
|
||||
// GlobalSyncCid will only return when all operations have either failed,
|
||||
// succeeded or timed-out.
|
||||
func (c *Cluster) GlobalSyncCid(h *cid.Cid) (GlobalPinInfo, error) {
|
||||
return c.globalPinInfoCid("LocalSyncCid", h)
|
||||
func (c *Cluster) Sync(h *cid.Cid) (GlobalPinInfo, error) {
|
||||
return c.globalPinInfoCid("SyncLocal", h)
|
||||
}
|
||||
|
||||
// RecoverLocal triggers a recover operation for a given Cid
|
||||
func (c *Cluster) RecoverLocal(h *cid.Cid) (PinInfo, error) {
|
||||
return c.tracker.Recover(h)
|
||||
}
|
||||
|
||||
// Recover triggers a recover operation for a given Cid in all
|
||||
// members of the Cluster.
|
||||
func (c *Cluster) Recover(h *cid.Cid) (GlobalPinInfo, error) {
|
||||
return c.globalPinInfoCid("TrackerRecover", h)
|
||||
}
|
||||
|
||||
// Pins returns the list of Cids managed by Cluster and which are part
|
||||
|
@ -315,7 +305,7 @@ func (c *Cluster) Pin(h *cid.Cid) error {
|
|||
// to the global state. Unpin does not reflect the success or failure
|
||||
// of underlying IPFS daemon unpinning operations.
|
||||
func (c *Cluster) Unpin(h *cid.Cid) error {
|
||||
logger.Info("pinning:", h)
|
||||
logger.Info("unpinning:", h)
|
||||
err := c.consensus.LogUnpin(h)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -163,7 +163,7 @@ in the cluster and should be part of the list offered by "pin ls".
|
|||
checkErr("parsing cid", err)
|
||||
request("POST", "/pins/"+cidStr)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
resp := request("GET", "/status/"+cidStr)
|
||||
resp := request("GET", "/pins/"+cidStr)
|
||||
formatResponse(resp)
|
||||
return nil
|
||||
},
|
||||
|
@ -186,7 +186,7 @@ although unpinning operations in the cluster may take longer or fail.
|
|||
checkErr("parsing cid", err)
|
||||
request("DELETE", "/pins/"+cidStr)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
resp := request("GET", "/status/"+cidStr)
|
||||
resp := request("GET", "/pins/"+cidStr)
|
||||
formatResponse(resp)
|
||||
return nil
|
||||
},
|
||||
|
@ -201,7 +201,7 @@ merely represents the list of pins which are part of the global state of
|
|||
the cluster. For specific information, use "status".
|
||||
`,
|
||||
Action: func(c *cli.Context) error {
|
||||
resp := request("GET", "/pins")
|
||||
resp := request("GET", "/pinlist")
|
||||
formatResponse(resp)
|
||||
return nil
|
||||
},
|
||||
|
@ -216,6 +216,9 @@ This command retrieves the status of the CIDs tracked by IPFS
|
|||
Cluster, including which member is pinning them and any errors.
|
||||
If a CID is provided, the status will be only fetched for a single
|
||||
item.
|
||||
|
||||
The status of a CID may not be accurate. A manual sync can be triggered
|
||||
with "sync".
|
||||
`,
|
||||
ArgsUsage: "[cid]",
|
||||
Action: func(c *cli.Context) error {
|
||||
|
@ -224,33 +227,67 @@ item.
|
|||
_, err := cid.Decode(cidStr)
|
||||
checkErr("parsing cid", err)
|
||||
}
|
||||
resp := request("GET", "/status/"+cidStr)
|
||||
resp := request("GET", "/pins/"+cidStr)
|
||||
formatResponse(resp)
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "sync",
|
||||
Usage: "Sync status and/or recover tracked items",
|
||||
Usage: "Sync status of tracked items",
|
||||
UsageText: `
|
||||
This command verifies that the current status tracked CIDs are accurate by
|
||||
triggering queries to the IPFS daemons that pin them. When the CID is in
|
||||
error state, either because pinning or unpinning failed, IPFS Cluster will
|
||||
attempt to retry the operation. If a CID is provided, the sync and recover
|
||||
operations will be limited to that single item.
|
||||
This command asks Cluster peers to verify that the current status of tracked
|
||||
CIDs is accurate by triggering queries to the IPFS daemons that pin them.
|
||||
If a CID is provided, the sync and recover operations will be limited to
|
||||
that single item.
|
||||
|
||||
Unless providing a specific CID, the command will output only items which
|
||||
have changed status because of the sync or are in error state in some node,
|
||||
therefore, the output should be empty if no operations were performed.
|
||||
|
||||
CIDs in error state may be manually recovered with "recover".
|
||||
`,
|
||||
ArgsUsage: "[cid]",
|
||||
Action: func(c *cli.Context) error {
|
||||
cidStr := c.Args().First()
|
||||
var resp *http.Response
|
||||
if cidStr != "" {
|
||||
_, err := cid.Decode(cidStr)
|
||||
checkErr("parsing cid", err)
|
||||
resp = request("POST", "/pins/"+cidStr+"/sync")
|
||||
} else {
|
||||
resp = request("POST", "/pins/sync")
|
||||
}
|
||||
resp := request("POST", "/status/"+cidStr)
|
||||
formatResponse(resp)
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "recover",
|
||||
Usage: "Recover tracked items in error state",
|
||||
UsageText: `
|
||||
This command asks Cluster peers to re-track or re-forget an item which is in
|
||||
error state, usually because the IPFS pin or unpin operation has failed.
|
||||
|
||||
The command will wait for any operations to succeed and will return the status
|
||||
of the item upon completion.
|
||||
`,
|
||||
ArgsUsage: "<cid>",
|
||||
Action: func(c *cli.Context) error {
|
||||
cidStr := c.Args().First()
|
||||
var resp *http.Response
|
||||
if cidStr != "" {
|
||||
_, err := cid.Decode(cidStr)
|
||||
checkErr("parsing cid", err)
|
||||
resp = request("POST", "/pins/"+cidStr+"/recover")
|
||||
formatResponse(resp)
|
||||
|
||||
} else {
|
||||
return cli.NewExitError("A CID is required", 1)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "version",
|
||||
Usage: "Retrieve cluster version",
|
||||
|
|
|
@ -187,16 +187,16 @@ type PinTracker interface {
|
|||
// Untrack tells the tracker that a Cid is to be forgotten. The tracker
|
||||
// may perform an IPFS unpin operation.
|
||||
Untrack(*cid.Cid) error
|
||||
// Status returns the list of pins with their local status.
|
||||
Status() []PinInfo
|
||||
// StatusCid returns the local status of a given Cid.
|
||||
StatusCid(*cid.Cid) PinInfo
|
||||
// Sync makes sure that all tracked Cids reflect the real IPFS status.
|
||||
// StatusAll returns the list of pins with their local status.
|
||||
StatusAll() []PinInfo
|
||||
// Status returns the local status of a given Cid.
|
||||
Status(*cid.Cid) PinInfo
|
||||
// SyncAll makes sure that all tracked Cids reflect the real IPFS status.
|
||||
// It returns the list of pins which were updated by the call.
|
||||
Sync() ([]PinInfo, error)
|
||||
// SyncCid makes sure that the Cid status reflect the real IPFS status.
|
||||
// It return the local status of the Cid.
|
||||
SyncCid(*cid.Cid) (PinInfo, error)
|
||||
SyncAll() ([]PinInfo, error)
|
||||
// Sync makes sure that the Cid status reflect the real IPFS status.
|
||||
// It returns the local status of the Cid.
|
||||
Sync(*cid.Cid) (PinInfo, error)
|
||||
// Recover retriggers a Pin/Unpin operation in Cids with error status.
|
||||
Recover(*cid.Cid) error
|
||||
Recover(*cid.Cid) (PinInfo, error)
|
||||
}
|
||||
|
|
|
@ -186,7 +186,7 @@ func TestClustersPin(t *testing.T) {
|
|||
}
|
||||
delay()
|
||||
fpinned := func(t *testing.T, c *Cluster) {
|
||||
status := c.tracker.Status()
|
||||
status := c.tracker.StatusAll()
|
||||
for _, v := range status {
|
||||
if v.Status != TrackerStatusPinned {
|
||||
t.Errorf("%s should have been pinned but it is %s",
|
||||
|
@ -219,7 +219,7 @@ func TestClustersPin(t *testing.T) {
|
|||
delay()
|
||||
|
||||
funpinned := func(t *testing.T, c *Cluster) {
|
||||
status := c.tracker.Status()
|
||||
status := c.tracker.StatusAll()
|
||||
if l := len(status); l != 0 {
|
||||
t.Errorf("Nothing should be pinned")
|
||||
//t.Errorf("%+v", status)
|
||||
|
@ -228,7 +228,7 @@ func TestClustersPin(t *testing.T) {
|
|||
runF(t, clusters, funpinned)
|
||||
}
|
||||
|
||||
func TestClustersStatus(t *testing.T) {
|
||||
func TestClustersStatusAll(t *testing.T) {
|
||||
clusters, mock := createClusters(t)
|
||||
defer shutdownClusters(t, clusters, mock)
|
||||
h, _ := cid.Decode(testCid)
|
||||
|
@ -236,7 +236,7 @@ func TestClustersStatus(t *testing.T) {
|
|||
delay()
|
||||
// Global status
|
||||
f := func(t *testing.T, c *Cluster) {
|
||||
statuses, err := c.Status()
|
||||
statuses, err := c.StatusAll()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
@ -255,7 +255,7 @@ func TestClustersStatus(t *testing.T) {
|
|||
t.Error("the hash should have been pinned")
|
||||
}
|
||||
|
||||
status, err := c.StatusCid(h)
|
||||
status, err := c.Status(h)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
@ -272,7 +272,7 @@ func TestClustersStatus(t *testing.T) {
|
|||
runF(t, clusters, f)
|
||||
}
|
||||
|
||||
func TestClustersLocalSync(t *testing.T) {
|
||||
func TestClustersSyncAllLocal(t *testing.T) {
|
||||
clusters, mock := createClusters(t)
|
||||
defer shutdownClusters(t, clusters, mock)
|
||||
h, _ := cid.Decode(errorCid) // This cid always fails
|
||||
|
@ -282,7 +282,7 @@ func TestClustersLocalSync(t *testing.T) {
|
|||
delay()
|
||||
f := func(t *testing.T, c *Cluster) {
|
||||
// Sync bad ID
|
||||
infos, err := c.LocalSync()
|
||||
infos, err := c.SyncAllLocal()
|
||||
if err != nil {
|
||||
// LocalSync() is asynchronous and should not show an
|
||||
// error even if Recover() fails.
|
||||
|
@ -300,7 +300,7 @@ func TestClustersLocalSync(t *testing.T) {
|
|||
runF(t, clusters, f)
|
||||
}
|
||||
|
||||
func TestClustersLocalSyncCid(t *testing.T) {
|
||||
func TestClustersSyncLocal(t *testing.T) {
|
||||
clusters, mock := createClusters(t)
|
||||
defer shutdownClusters(t, clusters, mock)
|
||||
h, _ := cid.Decode(errorCid) // This cid always fails
|
||||
|
@ -310,7 +310,7 @@ func TestClustersLocalSyncCid(t *testing.T) {
|
|||
delay()
|
||||
|
||||
f := func(t *testing.T, c *Cluster) {
|
||||
info, err := c.LocalSyncCid(h)
|
||||
info, err := c.SyncLocal(h)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
@ -319,7 +319,7 @@ func TestClustersLocalSyncCid(t *testing.T) {
|
|||
}
|
||||
|
||||
// Sync good ID
|
||||
info, err = c.LocalSyncCid(h2)
|
||||
info, err = c.SyncLocal(h2)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
@ -331,7 +331,7 @@ func TestClustersLocalSyncCid(t *testing.T) {
|
|||
runF(t, clusters, f)
|
||||
}
|
||||
|
||||
func TestClustersGlobalSync(t *testing.T) {
|
||||
func TestClustersSyncAll(t *testing.T) {
|
||||
clusters, mock := createClusters(t)
|
||||
defer shutdownClusters(t, clusters, mock)
|
||||
h, _ := cid.Decode(errorCid) // This cid always fails
|
||||
|
@ -341,7 +341,7 @@ func TestClustersGlobalSync(t *testing.T) {
|
|||
delay()
|
||||
|
||||
j := rand.Intn(nClusters) // choose a random cluster member
|
||||
ginfos, err := clusters[j].GlobalSync()
|
||||
ginfos, err := clusters[j].SyncAll()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -362,7 +362,7 @@ func TestClustersGlobalSync(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestClustersGlobalSyncCid(t *testing.T) {
|
||||
func TestClustersSync(t *testing.T) {
|
||||
clusters, mock := createClusters(t)
|
||||
defer shutdownClusters(t, clusters, mock)
|
||||
h, _ := cid.Decode(errorCid) // This cid always fails
|
||||
|
@ -372,7 +372,7 @@ func TestClustersGlobalSyncCid(t *testing.T) {
|
|||
delay()
|
||||
|
||||
j := rand.Intn(nClusters)
|
||||
ginfo, err := clusters[j].GlobalSyncCid(h)
|
||||
ginfo, err := clusters[j].Sync(h)
|
||||
if err != nil {
|
||||
// we always attempt to return a valid response
|
||||
// with errors contained in GlobalPinInfo
|
||||
|
@ -404,7 +404,97 @@ func TestClustersGlobalSyncCid(t *testing.T) {
|
|||
|
||||
// Test with a good Cid
|
||||
j = rand.Intn(nClusters)
|
||||
ginfo, err = clusters[j].GlobalSyncCid(h2)
|
||||
ginfo, err = clusters[j].Sync(h2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if ginfo.Cid.String() != testCid2 {
|
||||
t.Error("GlobalPinInfo should be for testrCid2")
|
||||
}
|
||||
|
||||
for _, c := range clusters {
|
||||
inf, ok := ginfo.PeerMap[c.host.ID()]
|
||||
if !ok {
|
||||
t.Fatal("GlobalPinInfo should have this cluster")
|
||||
}
|
||||
if inf.Status != TrackerStatusPinned {
|
||||
t.Error("the GlobalPinInfo should show Pinned in all members")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestClustersRecoverLocal(t *testing.T) {
|
||||
clusters, mock := createClusters(t)
|
||||
defer shutdownClusters(t, clusters, mock)
|
||||
h, _ := cid.Decode(errorCid) // This cid always fails
|
||||
h2, _ := cid.Decode(testCid2)
|
||||
clusters[0].Pin(h)
|
||||
clusters[0].Pin(h2)
|
||||
|
||||
delay()
|
||||
|
||||
f := func(t *testing.T, c *Cluster) {
|
||||
info, err := c.RecoverLocal(h)
|
||||
if err == nil {
|
||||
t.Error("expected an error recovering")
|
||||
}
|
||||
if info.Status != TrackerStatusPinError {
|
||||
t.Errorf("element is %s and not PinError", info.Status)
|
||||
}
|
||||
|
||||
// Recover good ID
|
||||
info, err = c.SyncLocal(h2)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if info.Status != TrackerStatusPinned {
|
||||
t.Error("element should be in Pinned state")
|
||||
}
|
||||
}
|
||||
// Test Local syncs
|
||||
runF(t, clusters, f)
|
||||
}
|
||||
|
||||
func TestClustersRecover(t *testing.T) {
|
||||
clusters, mock := createClusters(t)
|
||||
defer shutdownClusters(t, clusters, mock)
|
||||
h, _ := cid.Decode(errorCid) // This cid always fails
|
||||
h2, _ := cid.Decode(testCid2)
|
||||
clusters[0].Pin(h)
|
||||
clusters[0].Pin(h2)
|
||||
|
||||
delay()
|
||||
|
||||
j := rand.Intn(nClusters)
|
||||
ginfo, err := clusters[j].Recover(h)
|
||||
if err != nil {
|
||||
// we always attempt to return a valid response
|
||||
// with errors contained in GlobalPinInfo
|
||||
t.Fatal("did not expect an error")
|
||||
}
|
||||
pinfo, ok := ginfo.PeerMap[clusters[j].host.ID()]
|
||||
if !ok {
|
||||
t.Fatal("should have info for this host")
|
||||
}
|
||||
if pinfo.Error == "" {
|
||||
t.Error("pinInfo error should not be empty")
|
||||
}
|
||||
|
||||
for _, c := range clusters {
|
||||
inf, ok := ginfo.PeerMap[c.host.ID()]
|
||||
if !ok {
|
||||
t.Logf("%+v", ginfo)
|
||||
t.Fatal("GlobalPinInfo should not be empty for this host")
|
||||
}
|
||||
|
||||
if inf.Status != TrackerStatusPinError {
|
||||
t.Error("should be PinError in all members")
|
||||
}
|
||||
}
|
||||
|
||||
// Test with a good Cid
|
||||
j = rand.Intn(nClusters)
|
||||
ginfo, err = clusters[j].Recover(h2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -207,13 +207,13 @@ func (mpt *MapPinTracker) Untrack(c *cid.Cid) error {
|
|||
|
||||
// StatusCid returns information for a Cid tracked by this
|
||||
// MapPinTracker.
|
||||
func (mpt *MapPinTracker) StatusCid(c *cid.Cid) PinInfo {
|
||||
func (mpt *MapPinTracker) Status(c *cid.Cid) PinInfo {
|
||||
return mpt.get(c)
|
||||
}
|
||||
|
||||
// Status returns information for all Cids tracked by this
|
||||
// MapPinTracker.
|
||||
func (mpt *MapPinTracker) Status() []PinInfo {
|
||||
func (mpt *MapPinTracker) StatusAll() []PinInfo {
|
||||
mpt.mux.Lock()
|
||||
defer mpt.mux.Unlock()
|
||||
pins := make([]PinInfo, 0, len(mpt.status))
|
||||
|
@ -223,15 +223,15 @@ func (mpt *MapPinTracker) Status() []PinInfo {
|
|||
return pins
|
||||
}
|
||||
|
||||
// SyncCid verifies that the status of a Cid matches that of
|
||||
// Sync verifies that the status of a Cid matches that of
|
||||
// the IPFS daemon. If not, it will be transitioned
|
||||
// to PinError or UnpinError.
|
||||
//
|
||||
// SyncCid returns the updated local status for the given Cid.
|
||||
// Sync returns the updated local status for the given Cid.
|
||||
// Pins in error states can be recovered with Recover().
|
||||
// An error is returned if we are unable to contact
|
||||
// the IPFS daemon.
|
||||
func (mpt *MapPinTracker) SyncCid(c *cid.Cid) (PinInfo, error) {
|
||||
func (mpt *MapPinTracker) Sync(c *cid.Cid) (PinInfo, error) {
|
||||
var ips IPFSPinStatus
|
||||
err := mpt.rpcClient.Call("",
|
||||
"Cluster",
|
||||
|
@ -245,15 +245,15 @@ func (mpt *MapPinTracker) SyncCid(c *cid.Cid) (PinInfo, error) {
|
|||
return mpt.syncStatus(c, ips), nil
|
||||
}
|
||||
|
||||
// Sync verifies that the statuses of all tracked Cids match the
|
||||
// SyncAll verifies that the statuses of all tracked Cids match the
|
||||
// one reported by the IPFS daemon. If not, they will be transitioned
|
||||
// to PinError or UnpinError.
|
||||
//
|
||||
// Sync returns the list of local status for all tracked Cids which
|
||||
// SyncAll returns the list of local status for all tracked Cids which
|
||||
// were updated or have errors. Cids in error states can be recovered
|
||||
// with Recover().
|
||||
// An error is returned if we are unable to contact the IPFS daemon.
|
||||
func (mpt *MapPinTracker) Sync() ([]PinInfo, error) {
|
||||
func (mpt *MapPinTracker) SyncAll() ([]PinInfo, error) {
|
||||
var ipsMap map[string]IPFSPinStatus
|
||||
var pInfos []PinInfo
|
||||
err := mpt.rpcClient.Call("",
|
||||
|
@ -272,7 +272,7 @@ func (mpt *MapPinTracker) Sync() ([]PinInfo, error) {
|
|||
return pInfos, err
|
||||
}
|
||||
|
||||
status := mpt.Status()
|
||||
status := mpt.StatusAll()
|
||||
for _, pInfoOrig := range status {
|
||||
c, err := cid.Decode(pInfoOrig.CidStr)
|
||||
if err != nil { // this should not happen but let's play safe
|
||||
|
@ -333,11 +333,11 @@ func (mpt *MapPinTracker) syncStatus(c *cid.Cid, ips IPFSPinStatus) PinInfo {
|
|||
// Recover will re-track or re-untrack a Cid in error state,
|
||||
// possibly retriggering an IPFS pinning operation and returning
|
||||
// only when it is done.
|
||||
func (mpt *MapPinTracker) Recover(c *cid.Cid) error {
|
||||
func (mpt *MapPinTracker) Recover(c *cid.Cid) (PinInfo, error) {
|
||||
p := mpt.get(c)
|
||||
if p.Status != TrackerStatusPinError &&
|
||||
p.Status != TrackerStatusUnpinError {
|
||||
return nil
|
||||
return p, nil
|
||||
}
|
||||
logger.Infof("Recovering %s", c)
|
||||
var err error
|
||||
|
@ -349,9 +349,8 @@ func (mpt *MapPinTracker) Recover(c *cid.Cid) error {
|
|||
}
|
||||
if err != nil {
|
||||
logger.Errorf("error recovering %s: %s", c, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return mpt.get(c), err
|
||||
}
|
||||
|
||||
// SetClient makes the MapPinTracker ready to perform RPC requests to
|
||||
|
|
80
rest_api.go
80
rest_api.go
|
@ -163,23 +163,45 @@ func (api *RESTAPI) routes() []route {
|
|||
"/id",
|
||||
api.idHandler,
|
||||
},
|
||||
|
||||
{
|
||||
"Version",
|
||||
"GET",
|
||||
"/version",
|
||||
api.versionHandler,
|
||||
},
|
||||
|
||||
{
|
||||
"Members",
|
||||
"GET",
|
||||
"/members",
|
||||
api.memberListHandler,
|
||||
},
|
||||
|
||||
{
|
||||
"Pins",
|
||||
"GET",
|
||||
"/pins",
|
||||
"/pinlist",
|
||||
api.pinListHandler,
|
||||
},
|
||||
|
||||
{
|
||||
"Version",
|
||||
"StatusAll",
|
||||
"GET",
|
||||
"/version",
|
||||
api.versionHandler,
|
||||
"/pins",
|
||||
api.statusAllHandler,
|
||||
},
|
||||
{
|
||||
"SyncAll",
|
||||
"POST",
|
||||
"/pins/sync",
|
||||
api.syncAllHandler,
|
||||
},
|
||||
{
|
||||
"Status",
|
||||
"GET",
|
||||
"/pins/{hash}",
|
||||
api.statusHandler,
|
||||
},
|
||||
{
|
||||
"Pin",
|
||||
|
@ -193,29 +215,17 @@ func (api *RESTAPI) routes() []route {
|
|||
"/pins/{hash}",
|
||||
api.unpinHandler,
|
||||
},
|
||||
{
|
||||
"Status",
|
||||
"GET",
|
||||
"/status",
|
||||
api.statusHandler,
|
||||
},
|
||||
{
|
||||
"StatusCid",
|
||||
"GET",
|
||||
"/status/{hash}",
|
||||
api.statusCidHandler,
|
||||
},
|
||||
{
|
||||
"Sync",
|
||||
"POST",
|
||||
"/status",
|
||||
"/pins/{hash}/sync",
|
||||
api.syncHandler,
|
||||
},
|
||||
{
|
||||
"SyncCid",
|
||||
"Recover",
|
||||
"POST",
|
||||
"/status/{hash}",
|
||||
api.syncCidHandler,
|
||||
"/pins/{hash}/recover",
|
||||
api.recoverHandler,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -367,11 +377,11 @@ func (api *RESTAPI) pinListHandler(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
}
|
||||
|
||||
func (api *RESTAPI) statusHandler(w http.ResponseWriter, r *http.Request) {
|
||||
func (api *RESTAPI) statusAllHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var pinInfos []GlobalPinInfo
|
||||
err := api.rpcClient.Call("",
|
||||
"Cluster",
|
||||
"Status",
|
||||
"StatusAll",
|
||||
struct{}{},
|
||||
&pinInfos)
|
||||
if checkRPCErr(w, err) {
|
||||
|
@ -379,12 +389,12 @@ func (api *RESTAPI) statusHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func (api *RESTAPI) statusCidHandler(w http.ResponseWriter, r *http.Request) {
|
||||
func (api *RESTAPI) statusHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if c := parseCidOrError(w, r); c != nil {
|
||||
var pinInfo GlobalPinInfo
|
||||
err := api.rpcClient.Call("",
|
||||
"Cluster",
|
||||
"StatusCid",
|
||||
"Status",
|
||||
c,
|
||||
&pinInfo)
|
||||
if checkRPCErr(w, err) {
|
||||
|
@ -393,11 +403,11 @@ func (api *RESTAPI) statusCidHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func (api *RESTAPI) syncHandler(w http.ResponseWriter, r *http.Request) {
|
||||
func (api *RESTAPI) syncAllHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var pinInfos []GlobalPinInfo
|
||||
err := api.rpcClient.Call("",
|
||||
"Cluster",
|
||||
"GlobalSync",
|
||||
"SyncAll",
|
||||
struct{}{},
|
||||
&pinInfos)
|
||||
if checkRPCErr(w, err) {
|
||||
|
@ -405,12 +415,26 @@ func (api *RESTAPI) syncHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func (api *RESTAPI) syncCidHandler(w http.ResponseWriter, r *http.Request) {
|
||||
func (api *RESTAPI) syncHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if c := parseCidOrError(w, r); c != nil {
|
||||
var pinInfo GlobalPinInfo
|
||||
err := api.rpcClient.Call("",
|
||||
"Cluster",
|
||||
"GlobalSyncCid",
|
||||
"Sync",
|
||||
c,
|
||||
&pinInfo)
|
||||
if checkRPCErr(w, err) {
|
||||
sendStatusCidResponse(w, http.StatusOK, pinInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (api *RESTAPI) recoverHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if c := parseCidOrError(w, r); c != nil {
|
||||
var pinInfo GlobalPinInfo
|
||||
err := api.rpcClient.Call("",
|
||||
"Cluster",
|
||||
"Recover",
|
||||
c,
|
||||
&pinInfo)
|
||||
if checkRPCErr(w, err) {
|
||||
|
|
|
@ -147,7 +147,7 @@ func TestRESTAPIPinListEndpoint(t *testing.T) {
|
|||
defer api.Shutdown()
|
||||
|
||||
var resp []string
|
||||
makeGet(t, "/pins", &resp)
|
||||
makeGet(t, "/pinlist", &resp)
|
||||
if len(resp) != 3 ||
|
||||
resp[0] != testCid1 || resp[1] != testCid2 ||
|
||||
resp[2] != testCid3 {
|
||||
|
@ -155,25 +155,25 @@ func TestRESTAPIPinListEndpoint(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestRESTAPIStatusAllEndpoint(t *testing.T) {
|
||||
api := testRESTAPI(t)
|
||||
defer api.Shutdown()
|
||||
|
||||
var resp statusResp
|
||||
makeGet(t, "/pins", &resp)
|
||||
if len(resp) != 3 ||
|
||||
resp[0].Cid != testCid1 ||
|
||||
resp[1].PeerMap[testPeerID.Pretty()].Status != "pinning" {
|
||||
t.Errorf("unexpected statusResp:\n %+v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRESTAPIStatusEndpoint(t *testing.T) {
|
||||
api := testRESTAPI(t)
|
||||
defer api.Shutdown()
|
||||
|
||||
var resp statusResp
|
||||
makeGet(t, "/status", &resp)
|
||||
if len(resp) != 3 ||
|
||||
resp[0].Cid != testCid1 ||
|
||||
resp[1].PeerMap[testPeerID.Pretty()].Status != "pinning" {
|
||||
t.Errorf("unexpected statusResp:\n %+v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRESTAPIStatusCidEndpoint(t *testing.T) {
|
||||
api := testRESTAPI(t)
|
||||
defer api.Shutdown()
|
||||
|
||||
var resp statusCidResp
|
||||
makeGet(t, "/status/"+testCid, &resp)
|
||||
makeGet(t, "/pins/"+testCid, &resp)
|
||||
|
||||
if resp.Cid != testCid {
|
||||
t.Error("expected the same cid")
|
||||
|
@ -187,12 +187,12 @@ func TestRESTAPIStatusCidEndpoint(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestRESTAPIStatusSyncEndpoint(t *testing.T) {
|
||||
func TestRESTAPISyncAllEndpoint(t *testing.T) {
|
||||
api := testRESTAPI(t)
|
||||
defer api.Shutdown()
|
||||
|
||||
var resp statusResp
|
||||
makePost(t, "/status", &resp)
|
||||
makePost(t, "/pins/sync", &resp)
|
||||
|
||||
if len(resp) != 3 ||
|
||||
resp[0].Cid != testCid1 ||
|
||||
|
@ -201,12 +201,31 @@ func TestRESTAPIStatusSyncEndpoint(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestRESTAPIStatusSyncCidEndpoint(t *testing.T) {
|
||||
func TestRESTAPISyncEndpoint(t *testing.T) {
|
||||
api := testRESTAPI(t)
|
||||
defer api.Shutdown()
|
||||
|
||||
var resp statusCidResp
|
||||
makePost(t, "/status/"+testCid, &resp)
|
||||
makePost(t, "/pins/"+testCid+"/sync", &resp)
|
||||
|
||||
if resp.Cid != testCid {
|
||||
t.Error("expected the same cid")
|
||||
}
|
||||
info, ok := resp.PeerMap[testPeerID.Pretty()]
|
||||
if !ok {
|
||||
t.Fatal("expected info for testPeerID")
|
||||
}
|
||||
if info.Status != "pinned" {
|
||||
t.Error("expected different status")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRESTAPIRecoverEndpoint(t *testing.T) {
|
||||
api := testRESTAPI(t)
|
||||
defer api.Shutdown()
|
||||
|
||||
var resp statusCidResp
|
||||
makePost(t, "/pins/"+testCid+"/recover", &resp)
|
||||
|
||||
if resp.Cid != testCid {
|
||||
t.Error("expected the same cid")
|
||||
|
|
76
rpc_api.go
76
rpc_api.go
|
@ -91,56 +91,56 @@ func (api *RPCAPI) MemberList(in struct{}, out *[]peer.ID) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// StatusAll runs Cluster.StatusAll().
|
||||
func (api *RPCAPI) StatusAll(in struct{}, out *[]GlobalPinInfo) error {
|
||||
pinfo, err := api.cluster.StatusAll()
|
||||
*out = pinfo
|
||||
return err
|
||||
}
|
||||
|
||||
// Status runs Cluster.Status().
|
||||
func (api *RPCAPI) Status(in struct{}, out *[]GlobalPinInfo) error {
|
||||
pinfo, err := api.cluster.Status()
|
||||
*out = pinfo
|
||||
return err
|
||||
}
|
||||
|
||||
// StatusCid runs Cluster.StatusCid().
|
||||
func (api *RPCAPI) StatusCid(in *CidArg, out *GlobalPinInfo) error {
|
||||
func (api *RPCAPI) Status(in *CidArg, out *GlobalPinInfo) error {
|
||||
c, err := in.CID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pinfo, err := api.cluster.StatusCid(c)
|
||||
pinfo, err := api.cluster.Status(c)
|
||||
*out = pinfo
|
||||
return err
|
||||
}
|
||||
|
||||
// LocalSync runs Cluster.LocalSync().
|
||||
func (api *RPCAPI) LocalSync(in struct{}, out *[]PinInfo) error {
|
||||
pinfo, err := api.cluster.LocalSync()
|
||||
// SyncAllLocal runs Cluster.SyncAllLocal().
|
||||
func (api *RPCAPI) SyncAllLocal(in struct{}, out *[]PinInfo) error {
|
||||
pinfo, err := api.cluster.SyncAllLocal()
|
||||
*out = pinfo
|
||||
return err
|
||||
}
|
||||
|
||||
// LocalSyncCid runs Cluster.LocalSyncCid().
|
||||
func (api *RPCAPI) LocalSyncCid(in *CidArg, out *PinInfo) error {
|
||||
// SyncLocal runs Cluster.SyncLocal().
|
||||
func (api *RPCAPI) SyncLocal(in *CidArg, out *PinInfo) error {
|
||||
c, err := in.CID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pinfo, err := api.cluster.LocalSyncCid(c)
|
||||
pinfo, err := api.cluster.SyncLocal(c)
|
||||
*out = pinfo
|
||||
return err
|
||||
}
|
||||
|
||||
// GlobalSync runs Cluster.GlobalSync().
|
||||
func (api *RPCAPI) GlobalSync(in struct{}, out *[]GlobalPinInfo) error {
|
||||
pinfo, err := api.cluster.GlobalSync()
|
||||
// SyncAll runs Cluster.SyncAll().
|
||||
func (api *RPCAPI) SyncAll(in struct{}, out *[]GlobalPinInfo) error {
|
||||
pinfo, err := api.cluster.SyncAll()
|
||||
*out = pinfo
|
||||
return err
|
||||
}
|
||||
|
||||
// GlobalSyncCid runs Cluster.GlobalSyncCid().
|
||||
func (api *RPCAPI) GlobalSyncCid(in *CidArg, out *GlobalPinInfo) error {
|
||||
// Sync runs Cluster.Sync().
|
||||
func (api *RPCAPI) Sync(in *CidArg, out *GlobalPinInfo) error {
|
||||
c, err := in.CID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pinfo, err := api.cluster.GlobalSyncCid(c)
|
||||
pinfo, err := api.cluster.Sync(c)
|
||||
*out = pinfo
|
||||
return err
|
||||
}
|
||||
|
@ -152,6 +152,17 @@ func (api *RPCAPI) StateSync(in struct{}, out *[]PinInfo) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Recover runs Cluster.Recover().
|
||||
func (api *RPCAPI) Recover(in *CidArg, out *GlobalPinInfo) error {
|
||||
c, err := in.CID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pinfo, err := api.cluster.Recover(c)
|
||||
*out = pinfo
|
||||
return err
|
||||
}
|
||||
|
||||
/*
|
||||
Tracker component methods
|
||||
*/
|
||||
|
@ -174,24 +185,33 @@ func (api *RPCAPI) Untrack(in *CidArg, out *struct{}) error {
|
|||
return api.cluster.tracker.Untrack(c)
|
||||
}
|
||||
|
||||
// TrackerStatus runs PinTracker.Status().
|
||||
func (api *RPCAPI) TrackerStatus(in struct{}, out *[]PinInfo) error {
|
||||
*out = api.cluster.tracker.Status()
|
||||
// TrackerStatusAll runs PinTracker.StatusAll().
|
||||
func (api *RPCAPI) TrackerStatusAll(in struct{}, out *[]PinInfo) error {
|
||||
*out = api.cluster.tracker.StatusAll()
|
||||
return nil
|
||||
}
|
||||
|
||||
// TrackerStatusCid runs PinTracker.StatusCid().
|
||||
func (api *RPCAPI) TrackerStatusCid(in *CidArg, out *PinInfo) error {
|
||||
// TrackerStatus runs PinTracker.Status().
|
||||
func (api *RPCAPI) TrackerStatus(in *CidArg, out *PinInfo) error {
|
||||
c, err := in.CID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pinfo := api.cluster.tracker.StatusCid(c)
|
||||
pinfo := api.cluster.tracker.Status(c)
|
||||
*out = pinfo
|
||||
return nil
|
||||
}
|
||||
|
||||
// TrackerRecover not sure if needed
|
||||
// TrackerRecover runs PinTracker.Recover().
|
||||
func (api *RPCAPI) TrackerRecover(in *CidArg, out *PinInfo) error {
|
||||
c, err := in.CID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pinfo, err := api.cluster.tracker.Recover(c)
|
||||
*out = pinfo
|
||||
return err
|
||||
}
|
||||
|
||||
/*
|
||||
IPFS Connector component methods
|
||||
|
|
|
@ -66,7 +66,7 @@ func (mock *mockService) MemberList(in struct{}, out *[]peer.ID) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (mock *mockService) Status(in struct{}, out *[]GlobalPinInfo) error {
|
||||
func (mock *mockService) StatusAll(in struct{}, out *[]GlobalPinInfo) error {
|
||||
c1, _ := cid.Decode(testCid1)
|
||||
c2, _ := cid.Decode(testCid2)
|
||||
c3, _ := cid.Decode(testCid3)
|
||||
|
@ -108,7 +108,7 @@ func (mock *mockService) Status(in struct{}, out *[]GlobalPinInfo) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (mock *mockService) StatusCid(in *CidArg, out *GlobalPinInfo) error {
|
||||
func (mock *mockService) Status(in *CidArg, out *GlobalPinInfo) error {
|
||||
if in.Cid == errorCid {
|
||||
return errBadCid
|
||||
}
|
||||
|
@ -127,12 +127,12 @@ func (mock *mockService) StatusCid(in *CidArg, out *GlobalPinInfo) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (mock *mockService) GlobalSync(in struct{}, out *[]GlobalPinInfo) error {
|
||||
return mock.Status(in, out)
|
||||
func (mock *mockService) SyncAll(in struct{}, out *[]GlobalPinInfo) error {
|
||||
return mock.StatusAll(in, out)
|
||||
}
|
||||
|
||||
func (mock *mockService) GlobalSyncCid(in *CidArg, out *GlobalPinInfo) error {
|
||||
return mock.StatusCid(in, out)
|
||||
func (mock *mockService) Sync(in *CidArg, out *GlobalPinInfo) error {
|
||||
return mock.Status(in, out)
|
||||
}
|
||||
|
||||
func (mock *mockService) StateSync(in struct{}, out *[]PinInfo) error {
|
||||
|
@ -140,6 +140,10 @@ func (mock *mockService) StateSync(in struct{}, out *[]PinInfo) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (mock *mockService) Recover(in *CidArg, out *GlobalPinInfo) error {
|
||||
return mock.Status(in, out)
|
||||
}
|
||||
|
||||
func (mock *mockService) Track(in *CidArg, out *struct{}) error {
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user