Merge pull request #34 from ipfs/8-sync-all

Fixes #8: Reworking PinTracker (sync/recover ops)
This commit is contained in:
Hector Sanjuan 2017-01-26 12:52:46 +01:00 committed by GitHub
commit 495ac27c57
12 changed files with 745 additions and 396 deletions

View File

@ -40,6 +40,7 @@ type Cluster struct {
wg sync.WaitGroup
}
// ID holds information about the Cluster peer
type ID struct {
ID peer.ID
PublicKey crypto.PubKey
@ -137,6 +138,7 @@ func (c *Cluster) Shutdown() error {
return nil
}
// ID returns information about the Cluster peer
func (c *Cluster) ID() ID {
return ID{
ID: c.host.ID(),
@ -163,7 +165,7 @@ func (c *Cluster) StateSync() ([]PinInfo, error) {
// Track items which are not tracked
for _, h := range clusterPins {
if c.tracker.StatusCid(h).IPFS == Unpinned {
if c.tracker.Status(h).Status == TrackerStatusUnpinned {
changed = append(changed, h)
err := c.rpcClient.Go("",
"Cluster",
@ -179,7 +181,7 @@ func (c *Cluster) StateSync() ([]PinInfo, error) {
}
// Untrack items which should not be tracked
for _, p := range c.tracker.Status() {
for _, p := range c.tracker.StatusAll() {
h, _ := cid.Decode(p.CidStr)
if !cState.HasPin(h) {
changed = append(changed, h)
@ -197,82 +199,75 @@ func (c *Cluster) StateSync() ([]PinInfo, error) {
var infos []PinInfo
for _, h := range changed {
infos = append(infos, c.tracker.StatusCid(h))
infos = append(infos, c.tracker.Status(h))
}
return infos, nil
}
// Status returns the GlobalPinInfo for all tracked Cids. If an error happens,
// the slice will contain as much information as could be fetched.
func (c *Cluster) Status() ([]GlobalPinInfo, error) {
return c.globalPinInfoSlice("TrackerStatus")
// StatusAll returns the GlobalPinInfo for every tracked Cid. When an
// error occurs, the returned slice still carries as much information
// as could be gathered from the peers.
func (c *Cluster) StatusAll() ([]GlobalPinInfo, error) {
	infos, err := c.globalPinInfoSlice("TrackerStatusAll")
	return infos, err
}
// StatusCid returns the GlobalPinInfo for a given Cid. If an error happens,
// Status returns the GlobalPinInfo for a given Cid. If an error happens,
// the GlobalPinInfo should contain as much information as could be fetched.
func (c *Cluster) StatusCid(h *cid.Cid) (GlobalPinInfo, error) {
return c.globalPinInfoCid("TrackerStatusCid", h)
// Status broadcasts a TrackerStatus request for the given Cid and
// returns the aggregated GlobalPinInfo, which is as complete as the
// responses allowed even when an error is returned.
func (c *Cluster) Status(h *cid.Cid) (GlobalPinInfo, error) {
	ginfo, err := c.globalPinInfoCid("TrackerStatus", h)
	return ginfo, err
}
// LocalSync makes sure that the current state the Tracker matches
// the IPFS daemon state by triggering a Tracker.Sync() and Recover()
// on all items that need it. Returns PinInfo for items changed on Sync().
// SyncAllLocal makes sure that the current state for all tracked items
// matches the state reported by the IPFS daemon.
//
// LocalSync triggers recoveries asynchronously, and will not wait for
// them to fail or succeed before returning.
func (c *Cluster) LocalSync() ([]PinInfo, error) {
status := c.tracker.Status()
var toRecover []*cid.Cid
for _, p := range status {
h, _ := cid.Decode(p.CidStr)
modified := c.tracker.Sync(h)
if modified {
toRecover = append(toRecover, h)
}
// SyncAllLocal returns the list of PinInfo that where updated because of
// the operation, along with those in error states.
func (c *Cluster) SyncAllLocal() ([]PinInfo, error) {
syncedItems, err := c.tracker.SyncAll()
// Despite errors, tracker provides synced items that we can provide.
// They encapsulate the error.
if err != nil {
logger.Error("tracker.Sync() returned with error: ", err)
logger.Error("Is the ipfs daemon running?")
logger.Error("LocalSync returning without attempting recovers")
}
logger.Infof("%d items to recover after sync", len(toRecover))
for i, h := range toRecover {
logger.Infof("recovering in progress for %s (%d/%d",
h, i, len(toRecover))
go func(h *cid.Cid) {
c.tracker.Recover(h)
}(h)
}
var changed []PinInfo
for _, h := range toRecover {
changed = append(changed, c.tracker.StatusCid(h))
}
return changed, nil
return syncedItems, err
}
// LocalSyncCid performs a Tracker.Sync() operation followed by a
// Recover() when needed. It returns the latest known PinInfo for the Cid.
//
// LocalSyncCid will wait for the Recover operation to fail or succeed before
// returning.
func (c *Cluster) LocalSyncCid(h *cid.Cid) (PinInfo, error) {
// SyncLocal performs a local sync operation for the given Cid. This will
// tell the tracker to verify the status of the Cid against the IPFS daemon.
// It returns the updated PinInfo for the Cid.
func (c *Cluster) SyncLocal(h *cid.Cid) (PinInfo, error) {
var err error
if c.tracker.Sync(h) {
err = c.tracker.Recover(h)
pInfo, err := c.tracker.Sync(h)
// Despite errors, trackers provides an updated PinInfo so
// we just log it.
if err != nil {
logger.Error("tracker.SyncCid() returned with error: ", err)
logger.Error("Is the ipfs daemon running?")
}
return c.tracker.StatusCid(h), err
return pInfo, err
}
// GlobalSync triggers Sync() operations in all members of the Cluster.
func (c *Cluster) GlobalSync() ([]GlobalPinInfo, error) {
return c.globalPinInfoSlice("LocalSync")
// SyncAll triggers LocalSync() operations in all members of the Cluster.
func (c *Cluster) SyncAll() ([]GlobalPinInfo, error) {
return c.globalPinInfoSlice("SyncAllLocal")
}
// GlobalSyncCid triggers a LocalSyncCid() operation for a given Cid
// Sync triggers a LocalSyncCid() operation for a given Cid
// in all members of the Cluster.
//
// GlobalSyncCid will only return when all operations have either failed,
// succeeded or timed-out.
func (c *Cluster) GlobalSyncCid(h *cid.Cid) (GlobalPinInfo, error) {
return c.globalPinInfoCid("LocalSyncCid", h)
// Sync asks every member of the Cluster to run a SyncLocal operation
// for the given Cid and returns the combined GlobalPinInfo.
func (c *Cluster) Sync(h *cid.Cid) (GlobalPinInfo, error) {
	ginfo, err := c.globalPinInfoCid("SyncLocal", h)
	return ginfo, err
}
// RecoverLocal triggers a recover operation for the given Cid,
// delegating directly to this peer's local tracker.
func (c *Cluster) RecoverLocal(h *cid.Cid) (PinInfo, error) {
	pInfo, err := c.tracker.Recover(h)
	return pInfo, err
}
// Recover asks every member of the Cluster to run a recover operation
// ("TrackerRecover") for the given Cid and returns the aggregated
// GlobalPinInfo.
func (c *Cluster) Recover(h *cid.Cid) (GlobalPinInfo, error) {
	ginfo, err := c.globalPinInfoCid("TrackerRecover", h)
	return ginfo, err
}
// Pins returns the list of Cids managed by Cluster and which are part
@ -310,7 +305,7 @@ func (c *Cluster) Pin(h *cid.Cid) error {
// to the global state. Unpin does not reflect the success or failure
// of underlying IPFS daemon unpinning operations.
func (c *Cluster) Unpin(h *cid.Cid) error {
logger.Info("pinning:", h)
logger.Info("unpinning:", h)
err := c.consensus.LogUnpin(h)
if err != nil {
return err
@ -422,8 +417,8 @@ func (c *Cluster) multiRPC(dests []peer.ID, svcName, svcMethod string, args inte
func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (GlobalPinInfo, error) {
pin := GlobalPinInfo{
Cid: h,
Status: make(map[peer.ID]PinInfo),
Cid: h,
PeerMap: make(map[peer.ID]PinInfo),
}
members := c.Members()
@ -436,13 +431,13 @@ func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (GlobalPinInfo, er
errs := c.multiRPC(members, "Cluster", method, args, ifaceReplies)
for i, r := range replies {
if e := errs[i]; e != nil {
if e := errs[i]; e != nil { // This error must come from not being able to contact that cluster member
logger.Errorf("%s: error in broadcast response from %s: %s ", c.host.ID(), members[i], e)
if r.IPFS == Bug {
if r.Status == TrackerStatusBug {
r = PinInfo{
CidStr: h.String(),
Peer: members[i],
IPFS: ClusterError,
Status: TrackerStatusClusterError,
TS: time.Now(),
Error: e.Error(),
}
@ -450,7 +445,7 @@ func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (GlobalPinInfo, er
r.Error = e.Error()
}
}
pin.Status[members[i]] = r
pin.PeerMap[members[i]] = r
}
return pin, nil
@ -476,19 +471,19 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) {
if !ok {
fullMap[p.CidStr] = GlobalPinInfo{
Cid: c,
Status: map[peer.ID]PinInfo{
PeerMap: map[peer.ID]PinInfo{
p.Peer: p,
},
}
} else {
item.Status[p.Peer] = p
item.PeerMap[p.Peer] = p
}
}
}
erroredPeers := make(map[peer.ID]string)
for i, r := range replies {
if e := errs[i]; e != nil {
if e := errs[i]; e != nil { // This error must come from not being able to contact that cluster member
logger.Errorf("%s: error in broadcast response from %s: %s ", c.host.ID(), members[i], e)
erroredPeers[members[i]] = e.Error()
} else {
@ -499,10 +494,10 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) {
// Merge any errors
for p, msg := range erroredPeers {
for c := range fullMap {
fullMap[c].Status[p] = PinInfo{
fullMap[c].PeerMap[p] = PinInfo{
CidStr: c,
Peer: p,
IPFS: ClusterError,
Status: TrackerStatusClusterError,
TS: time.Now(),
Error: msg,
}

View File

@ -44,11 +44,19 @@ func (ipfs *mockConnector) Unpin(c *cid.Cid) error {
return nil
}
func (ipfs *mockConnector) IsPinned(c *cid.Cid) (bool, error) {
func (ipfs *mockConnector) PinLsCid(c *cid.Cid) (IPFSPinStatus, error) {
if ipfs.returnError {
return false, errors.New("")
return IPFSPinStatusError, errors.New("")
}
return true, nil
return IPFSPinStatusRecursive, nil
}
// PinLs satisfies the IPFSConnector interface for the mock: it fails
// when the mock is configured to return errors and otherwise yields an
// empty status map.
func (ipfs *mockConnector) PinLs() (map[string]IPFSPinStatus, error) {
	if ipfs.returnError {
		return nil, errors.New("")
	}
	return make(map[string]IPFSPinStatus), nil
}
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *MapState, *MapPinTracker) {

View File

@ -18,7 +18,8 @@ import (
const programName = `ipfs-cluster-ctl`
// Version
// Version is the cluster-ctl tool version. It should match
// the IPFS cluster's version
const Version = "0.0.2"
var (
@ -162,7 +163,7 @@ in the cluster and should be part of the list offered by "pin ls".
checkErr("parsing cid", err)
request("POST", "/pins/"+cidStr)
time.Sleep(500 * time.Millisecond)
resp := request("GET", "/status/"+cidStr)
resp := request("GET", "/pins/"+cidStr)
formatResponse(resp)
return nil
},
@ -175,7 +176,7 @@ This command tells IPFS Cluster to no longer manage a CID. This will
trigger unpinning operations in all the IPFS nodes holding the content.
When the request has succeeded, the command returns the status of the CID
in the cluster. The CID should dissapear from the list offered by "pin ls",
in the cluster. The CID should disappear from the list offered by "pin ls",
although unpinning operations in the cluster may take longer or fail.
`,
ArgsUsage: "<cid>",
@ -185,7 +186,7 @@ although unpinning operations in the cluster may take longer or fail.
checkErr("parsing cid", err)
request("DELETE", "/pins/"+cidStr)
time.Sleep(500 * time.Millisecond)
resp := request("GET", "/status/"+cidStr)
resp := request("GET", "/pins/"+cidStr)
formatResponse(resp)
return nil
},
@ -200,7 +201,7 @@ merely represents the list of pins which are part of the global state of
the cluster. For specific information, use "status".
`,
Action: func(c *cli.Context) error {
resp := request("GET", "/pins")
resp := request("GET", "/pinlist")
formatResponse(resp)
return nil
},
@ -215,6 +216,9 @@ This command retrieves the status of the CIDs tracked by IPFS
Cluster, including which member is pinning them and any errors.
If a CID is provided, the status will be only fetched for a single
item.
The status of a CID may not be accurate. A manual sync can be triggered
with "sync".
`,
ArgsUsage: "[cid]",
Action: func(c *cli.Context) error {
@ -223,33 +227,67 @@ item.
_, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
}
resp := request("GET", "/status/"+cidStr)
resp := request("GET", "/pins/"+cidStr)
formatResponse(resp)
return nil
},
},
{
Name: "sync",
Usage: "Sync status and/or recover tracked items",
Usage: "Sync status of tracked items",
UsageText: `
This command verifies that the current status tracked CIDs are accurate by
triggering queries to the IPFS daemons that pin them. When the CID is in
error state, either because pinning or unpinning failed, IPFS Cluster will
attempt to retry the operation. If a CID is provided, the sync and recover
operations will be limited to that single item.
This command asks Cluster peers to verify that the current status of tracked
CIDs is accurate by triggering queries to the IPFS daemons that pin them.
If a CID is provided, the sync and recover operations will be limited to
that single item.
Unless providing a specific CID, the command will output only items which
have changed status because of the sync or are in error state in some node,
therefore, the output should be empty if no operations were performed.
CIDs in error state may be manually recovered with "recover".
`,
ArgsUsage: "[cid]",
Action: func(c *cli.Context) error {
cidStr := c.Args().First()
var resp *http.Response
if cidStr != "" {
_, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
resp = request("POST", "/pins/"+cidStr+"/sync")
} else {
resp = request("POST", "/pins/sync")
}
resp := request("POST", "/status/"+cidStr)
formatResponse(resp)
return nil
},
},
{
Name: "recover",
Usage: "Recover tracked items in error state",
UsageText: `
This command asks Cluster peers to re-track or re-forget an item which is in
error state, usually because the IPFS pin or unpin operation has failed.
The command will wait for any operations to succeed and will return the status
of the item upon completion.
`,
ArgsUsage: "<cid>",
Action: func(c *cli.Context) error {
cidStr := c.Args().First()
var resp *http.Response
if cidStr != "" {
_, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
resp = request("POST", "/pins/"+cidStr+"/recover")
formatResponse(resp)
} else {
return cli.NewExitError("A CID is required", 1)
}
return nil
},
},
{
Name: "version",
Usage: "Retrieve cluster version",

View File

@ -65,6 +65,12 @@ type ipfsError struct {
Message string
}
// pinLsResp mirrors the JSON body returned by the IPFS daemon's
// "pin/ls" endpoint: a "Keys" object mapping each Cid string to an
// object that carries the pin Type ("direct", "recursive", ...).
type pinLsResp struct {
Keys map[string]struct {
Type string
}
}
// NewIPFSHTTPConnector creates the component and leaves it ready to be started
func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) {
ctx := context.Background()
@ -225,11 +231,11 @@ func (ipfs *IPFSHTTPConnector) Shutdown() error {
// Pin performs a pin request against the configured IPFS
// daemon.
func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error {
pinned, err := ipfs.IsPinned(hash)
pinStatus, err := ipfs.PinLsCid(hash)
if err != nil {
return err
}
if !pinned {
if !pinStatus.IsPinned() {
path := fmt.Sprintf("pin/add?arg=%s", hash)
_, err = ipfs.get(path)
if err == nil {
@ -244,11 +250,11 @@ func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error {
// Unpin performs an unpin request against the configured IPFS
// daemon.
func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
pinned, err := ipfs.IsPinned(hash)
pinStatus, err := ipfs.PinLsCid(hash)
if err != nil {
return err
}
if pinned {
if pinStatus.IsPinned() {
path := fmt.Sprintf("pin/rm?arg=%s", hash)
_, err := ipfs.get(path)
if err == nil {
@ -261,58 +267,73 @@ func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
return nil
}
// IsPinned performs a "pin ls" request against the configured IPFS
// daemon. It returns true when the given Cid is pinned not indirectly.
func (ipfs *IPFSHTTPConnector) IsPinned(hash *cid.Cid) (bool, error) {
pinType, err := ipfs.pinType(hash)
if err != nil {
return false, err
func parseIPFSPinType(t string) IPFSPinStatus {
switch {
case t == "indirect":
return IPFSPinStatusIndirect
case t == "direct":
return IPFSPinStatusDirect
case t == "recursive":
return IPFSPinStatusRecursive
default:
return IPFSPinStatusBug
}
if pinType == "unpinned" || strings.Contains(pinType, "indirect") {
return false, nil
}
return true, nil
}
// pinType performs a pin ls request and returns the information associated
// to the key. Unfortunately, the daemon does not provide a standardized
// output, so it may well be a sentence like "$hash is indirectly pinned through
// $otherhash".
func (ipfs *IPFSHTTPConnector) pinType(hash *cid.Cid) (string, error) {
// PinLs performs a "pin ls" request against the configured IPFS daemon and
// returns a map of cid strings and their status.
func (ipfs *IPFSHTTPConnector) PinLs() (map[string]IPFSPinStatus, error) {
	body, err := ipfs.get("pin/ls")
	if err != nil {
		// We could not talk to the daemon at all.
		return nil, err
	}

	var parsed pinLsResp
	if err := json.Unmarshal(body, &parsed); err != nil {
		logger.Error("parsing pin/ls response")
		logger.Error(string(body))
		return nil, err
	}

	// Translate the daemon's textual pin types into IPFSPinStatus values.
	statuses := make(map[string]IPFSPinStatus, len(parsed.Keys))
	for hash, info := range parsed.Keys {
		statuses[hash] = parseIPFSPinType(info.Type)
	}
	return statuses, nil
}
// PinLsCid performs a "pin ls <hash>" request and returns the IPFSPinStatus
// for that hash.
func (ipfs *IPFSHTTPConnector) PinLsCid(hash *cid.Cid) (IPFSPinStatus, error) {
lsPath := fmt.Sprintf("pin/ls?arg=%s", hash)
body, err := ipfs.get(lsPath)
// Network error, daemon down
if body == nil && err != nil {
return "", err
return IPFSPinStatusError, err
}
// Pin not found likely here
if err != nil { // Not pinned
return "unpinned", nil
}
// What type of pin it is
var resp struct {
Keys map[string]struct {
Type string
}
return IPFSPinStatusUnpinned, nil
}
var resp pinLsResp
err = json.Unmarshal(body, &resp)
if err != nil {
logger.Error("parsing pin/ls response:")
logger.Error("parsing pin/ls?arg=cid response:")
logger.Error(string(body))
return "", err
return IPFSPinStatusError, err
}
pinObj, ok := resp.Keys[hash.String()]
if !ok {
return "", errors.New("expected to find the pin in the response")
return IPFSPinStatusError, errors.New("expected to find the pin in the response")
}
pinType := pinObj.Type
logger.Debug("pinType check: ", pinType)
return pinType, nil
return parseIPFSPinType(pinObj.Type), nil
}
// get performs the heavy lifting of a get request against

View File

@ -43,11 +43,11 @@ func TestIPFSPin(t *testing.T) {
if err != nil {
t.Error("expected success pinning cid")
}
yes, err := ipfs.IsPinned(c)
pinSt, err := ipfs.PinLsCid(c)
if err != nil {
t.Fatal("expected success doing ls")
}
if !yes {
if !pinSt.IsPinned() {
t.Error("cid should have been pinned")
}
@ -74,7 +74,7 @@ func TestIPFSUnpin(t *testing.T) {
}
}
func TestIPFSIsPinned(t *testing.T) {
func TestIPFSPinLsCid(t *testing.T) {
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown()
@ -82,17 +82,40 @@ func TestIPFSIsPinned(t *testing.T) {
c2, _ := cid.Decode(testCid2)
ipfs.Pin(c)
isp, err := ipfs.IsPinned(c)
if err != nil || !isp {
ips, err := ipfs.PinLsCid(c)
if err != nil || !ips.IsPinned() {
t.Error("c should appear pinned")
}
isp, err = ipfs.IsPinned(c2)
if err != nil || isp {
ips, err = ipfs.PinLsCid(c2)
if err != nil || ips != IPFSPinStatusUnpinned {
t.Error("c2 should appear unpinned")
}
}
// TestIPFSPinLs pins two Cids and verifies that PinLs reports exactly
// those two entries, both in a pinned state.
func TestIPFSPinLs(t *testing.T) {
	ipfs, mock := testIPFSConnector(t)
	defer mock.Close()
	defer ipfs.Shutdown()

	cidA, _ := cid.Decode(testCid)
	cidB, _ := cid.Decode(testCid2)
	ipfs.Pin(cidA)
	ipfs.Pin(cidB)

	statusMap, err := ipfs.PinLs()
	if err != nil {
		t.Error("should not error")
	}
	if len(statusMap) != 2 {
		t.Fatal("the map does not contain expected keys")
	}
	if !statusMap[testCid].IsPinned() || !statusMap[testCid2].IsPinned() {
		t.Error("c1 and c2 should appear pinned")
	}
}
func TestIPFSProxy(t *testing.T) {
ipfs, mock := testIPFSConnector(t)
defer mock.Close()

View File

@ -42,36 +42,54 @@ func SetLogLevel(l string) {
//logging.SetLogLevel("libp2p-raft", l)
}
// IPFSStatus values
// TrackerStatus values
const (
// TrackerStatus should never take this value
Bug = iota
TrackerStatusBug = iota
// The cluster node is offline or not responding
ClusterError
TrackerStatusClusterError
// An error occurred pinning
PinError
TrackerStatusPinError
// An error occurred unpinning
UnpinError
TrackerStatusUnpinError
// The IPFS daemon has pinned the item
Pinned
TrackerStatusPinned
// The IPFS daemon is currently pinning the item
Pinning
TrackerStatusPinning
// The IPFS daemon is currently unpinning the item
Unpinning
TrackerStatusUnpinning
// The IPFS daemon is not pinning the item
Unpinned
TrackerStatusUnpinned
// The IPFS daemon is not pinning the item but it is being tracked
RemotePin
TrackerStatusRemotePin
)
// IPFSStatus represents the status of a tracked Cid in the IPFS daemon
type IPFSStatus int
// TrackerStatus represents the status of a tracked Cid in the PinTracker
type TrackerStatus int
// IPFSPinStatus values
const (
// IPFSPinStatusBug is a placeholder; it should never be observed.
IPFSPinStatusBug = iota
// IPFSPinStatusError: the status could not be obtained from IPFS.
IPFSPinStatusError
// IPFSPinStatusDirect: the item is directly pinned.
IPFSPinStatusDirect
// IPFSPinStatusRecursive: the item is recursively pinned.
IPFSPinStatusRecursive
// IPFSPinStatusIndirect: the item is pinned only through a parent.
IPFSPinStatusIndirect
// IPFSPinStatusUnpinned: the item is not pinned.
IPFSPinStatusUnpinned
)
// IPFSPinStatus represents the status of a pin in IPFS (direct, recursive etc.)
type IPFSPinStatus int
// IsPinned reports whether the status corresponds to an effective pin
// in the IPFS daemon, i.e. Direct or Recursive.
func (ips IPFSPinStatus) IsPinned() bool {
	switch ips {
	case IPFSPinStatusDirect, IPFSPinStatusRecursive:
		return true
	}
	return false
}
// GlobalPinInfo contains cluster-wide status information about a tracked Cid,
// indexed by cluster member.
type GlobalPinInfo struct {
Cid *cid.Cid
Status map[peer.ID]PinInfo
Cid *cid.Cid
PeerMap map[peer.ID]PinInfo
}
// PinInfo holds information about local pins. PinInfo is
@ -80,32 +98,35 @@ type GlobalPinInfo struct {
type PinInfo struct {
CidStr string
Peer peer.ID
IPFS IPFSStatus
Status TrackerStatus
TS time.Time
Error string
}
// String converts an IPFSStatus into a readable string.
func (st IPFSStatus) String() string {
func (st TrackerStatus) String() string {
switch st {
case Bug:
case TrackerStatusBug:
return "bug"
case ClusterError:
case TrackerStatusClusterError:
return "cluster_error"
case PinError:
case TrackerStatusPinError:
return "pin_error"
case UnpinError:
case TrackerStatusUnpinError:
return "unpin_error"
case Pinned:
case TrackerStatusPinned:
return "pinned"
case Pinning:
case TrackerStatusPinning:
return "pinning"
case Unpinning:
case TrackerStatusUnpinning:
return "unpinning"
case Unpinned:
case TrackerStatusUnpinned:
return "unpinned"
case TrackerStatusRemotePin:
return "remote"
default:
return ""
}
return ""
}
// Component represents a piece of ipfscluster. Cluster components
@ -129,7 +150,8 @@ type IPFSConnector interface {
Component
Pin(*cid.Cid) error
Unpin(*cid.Cid) error
IsPinned(*cid.Cid) (bool, error)
PinLsCid(*cid.Cid) (IPFSPinStatus, error)
PinLs() (map[string]IPFSPinStatus, error)
}
// Peered represents a component which needs to be aware of the peers
@ -165,14 +187,16 @@ type PinTracker interface {
// Untrack tells the tracker that a Cid is to be forgotten. The tracker
// may perform an IPFS unpin operation.
Untrack(*cid.Cid) error
// Status returns the list of pins with their local status.
Status() []PinInfo
// StatusCid returns the local status of a given Cid.
StatusCid(*cid.Cid) PinInfo
// StatusAll returns the list of pins with their local status.
StatusAll() []PinInfo
// Status returns the local status of a given Cid.
Status(*cid.Cid) PinInfo
// SyncAll makes sure that all tracked Cids reflect the real IPFS status.
// It returns the list of pins which were updated by the call.
SyncAll() ([]PinInfo, error)
// Sync makes sure that the Cid status reflect the real IPFS status.
// The return value indicates if the Cid status deserved attention,
// either because its state was updated or because it is in error state.
Sync(*cid.Cid) bool
// It returns the local status of the Cid.
Sync(*cid.Cid) (PinInfo, error)
// Recover retriggers a Pin/Unpin operation in Cids with error status.
Recover(*cid.Cid) error
Recover(*cid.Cid) (PinInfo, error)
}

View File

@ -186,12 +186,12 @@ func TestClustersPin(t *testing.T) {
}
delay()
fpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.Status()
status := c.tracker.StatusAll()
for _, v := range status {
if v.IPFS != Pinned {
if v.Status != TrackerStatusPinned {
t.Errorf("%s should have been pinned but it is %s",
v.CidStr,
v.IPFS.String())
v.Status.String())
}
}
if l := len(status); l != nPins {
@ -219,7 +219,7 @@ func TestClustersPin(t *testing.T) {
delay()
funpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.Status()
status := c.tracker.StatusAll()
if l := len(status); l != 0 {
t.Errorf("Nothing should be pinned")
//t.Errorf("%+v", status)
@ -228,7 +228,7 @@ func TestClustersPin(t *testing.T) {
runF(t, clusters, funpinned)
}
func TestClustersStatus(t *testing.T) {
func TestClustersStatusAll(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
h, _ := cid.Decode(testCid)
@ -236,7 +236,7 @@ func TestClustersStatus(t *testing.T) {
delay()
// Global status
f := func(t *testing.T, c *Cluster) {
statuses, err := c.Status()
statuses, err := c.StatusAll()
if err != nil {
t.Error(err)
}
@ -246,33 +246,33 @@ func TestClustersStatus(t *testing.T) {
if statuses[0].Cid.String() != testCid {
t.Error("bad cid in status")
}
info := statuses[0].Status
info := statuses[0].PeerMap
if len(info) != nClusters {
t.Error("bad info in status")
}
if info[c.host.ID()].IPFS != Pinned {
if info[c.host.ID()].Status != TrackerStatusPinned {
t.Error("the hash should have been pinned")
}
status, err := c.StatusCid(h)
status, err := c.Status(h)
if err != nil {
t.Error(err)
}
pinfo, ok := status.Status[c.host.ID()]
pinfo, ok := status.PeerMap[c.host.ID()]
if !ok {
t.Fatal("Host not in status")
}
if pinfo.IPFS != Pinned {
if pinfo.Status != TrackerStatusPinned {
t.Error("the status should show the hash as pinned")
}
}
runF(t, clusters, f)
}
func TestClustersLocalSync(t *testing.T) {
func TestClustersSyncAllLocal(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
h, _ := cid.Decode(errorCid) // This cid always fails
@ -282,7 +282,7 @@ func TestClustersLocalSync(t *testing.T) {
delay()
f := func(t *testing.T, c *Cluster) {
// Sync bad ID
infos, err := c.LocalSync()
infos, err := c.SyncAllLocal()
if err != nil {
// LocalSync() is asynchronous and should not show an
// error even if Recover() fails.
@ -292,7 +292,7 @@ func TestClustersLocalSync(t *testing.T) {
t.Fatal("expected 1 elem slice")
}
// Last-known state may still be pinning
if infos[0].IPFS != PinError && infos[0].IPFS != Pinning {
if infos[0].Status != TrackerStatusPinError && infos[0].Status != TrackerStatusPinning {
t.Error("element should be in Pinning or PinError state")
}
}
@ -300,7 +300,7 @@ func TestClustersLocalSync(t *testing.T) {
runF(t, clusters, f)
}
func TestClustersLocalSyncCid(t *testing.T) {
func TestClustersSyncLocal(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
h, _ := cid.Decode(errorCid) // This cid always fails
@ -310,21 +310,20 @@ func TestClustersLocalSyncCid(t *testing.T) {
delay()
f := func(t *testing.T, c *Cluster) {
info, err := c.LocalSyncCid(h)
if err == nil {
// LocalSyncCid is synchronous
t.Error("expected an error")
}
if info.IPFS != PinError && info.IPFS != Pinning {
t.Errorf("element is %s and not PinError", info.IPFS)
}
// Sync good ID
info, err = c.LocalSyncCid(h2)
info, err := c.SyncLocal(h)
if err != nil {
t.Error(err)
}
if info.IPFS != Pinned {
if info.Status != TrackerStatusPinError && info.Status != TrackerStatusPinning {
t.Errorf("element is %s and not PinError", info.Status)
}
// Sync good ID
info, err = c.SyncLocal(h2)
if err != nil {
t.Error(err)
}
if info.Status != TrackerStatusPinned {
t.Error("element should be in Pinned state")
}
}
@ -332,7 +331,7 @@ func TestClustersLocalSyncCid(t *testing.T) {
runF(t, clusters, f)
}
func TestClustersGlobalSync(t *testing.T) {
func TestClustersSyncAll(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
h, _ := cid.Decode(errorCid) // This cid always fails
@ -342,7 +341,7 @@ func TestClustersGlobalSync(t *testing.T) {
delay()
j := rand.Intn(nClusters) // choose a random cluster member
ginfos, err := clusters[j].GlobalSync()
ginfos, err := clusters[j].SyncAll()
if err != nil {
t.Fatal(err)
}
@ -353,17 +352,17 @@ func TestClustersGlobalSync(t *testing.T) {
t.Error("expected globalsync to have problems with errorCid")
}
for _, c := range clusters {
inf, ok := ginfos[0].Status[c.host.ID()]
inf, ok := ginfos[0].PeerMap[c.host.ID()]
if !ok {
t.Fatal("GlobalPinInfo should have this cluster")
}
if inf.IPFS != PinError && inf.IPFS != Pinning {
if inf.Status != TrackerStatusPinError && inf.Status != TrackerStatusPinning {
t.Error("should be PinError in all members")
}
}
}
func TestClustersGlobalSyncCid(t *testing.T) {
func TestClustersSync(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
h, _ := cid.Decode(errorCid) // This cid always fails
@ -373,13 +372,13 @@ func TestClustersGlobalSyncCid(t *testing.T) {
delay()
j := rand.Intn(nClusters)
ginfo, err := clusters[j].GlobalSyncCid(h)
ginfo, err := clusters[j].Sync(h)
if err != nil {
// we always attempt to return a valid response
// with errors contained in GlobalPinInfo
t.Fatal("did not expect an error")
}
pinfo, ok := ginfo.Status[clusters[j].host.ID()]
pinfo, ok := ginfo.PeerMap[clusters[j].host.ID()]
if !ok {
t.Fatal("should have info for this host")
}
@ -392,20 +391,20 @@ func TestClustersGlobalSyncCid(t *testing.T) {
}
for _, c := range clusters {
inf, ok := ginfo.Status[c.host.ID()]
inf, ok := ginfo.PeerMap[c.host.ID()]
if !ok {
t.Logf("%+v", ginfo)
t.Fatal("GlobalPinInfo should not be empty for this host")
}
if inf.IPFS != PinError && inf.IPFS != Pinning {
if inf.Status != TrackerStatusPinError && inf.Status != TrackerStatusPinning {
t.Error("should be PinError or Pinning in all members")
}
}
// Test with a good Cid
j = rand.Intn(nClusters)
ginfo, err = clusters[j].GlobalSyncCid(h2)
ginfo, err = clusters[j].Sync(h2)
if err != nil {
t.Fatal(err)
}
@ -414,11 +413,101 @@ func TestClustersGlobalSyncCid(t *testing.T) {
}
for _, c := range clusters {
inf, ok := ginfo.Status[c.host.ID()]
inf, ok := ginfo.PeerMap[c.host.ID()]
if !ok {
t.Fatal("GlobalPinInfo should have this cluster")
}
if inf.IPFS != Pinned {
if inf.Status != TrackerStatusPinned {
t.Error("the GlobalPinInfo should show Pinned in all members")
}
}
}
// TestClustersRecoverLocal pins one Cid that always fails and one good
// Cid, then checks on every peer that RecoverLocal reports a PinError
// for the failing item and that the good item ends up Pinned.
func TestClustersRecoverLocal(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
h, _ := cid.Decode(errorCid) // This cid always fails
h2, _ := cid.Decode(testCid2)
clusters[0].Pin(h)
clusters[0].Pin(h2)
delay()
f := func(t *testing.T, c *Cluster) {
// RecoverLocal on the failing Cid is expected to surface the error.
info, err := c.RecoverLocal(h)
if err == nil {
t.Error("expected an error recovering")
}
if info.Status != TrackerStatusPinError {
t.Errorf("element is %s and not PinError", info.Status)
}
// Recover good ID
// NOTE(review): this leg calls SyncLocal rather than RecoverLocal —
// possibly copied from TestClustersSyncLocal; confirm whether
// RecoverLocal(h2) was intended here.
info, err = c.SyncLocal(h2)
if err != nil {
t.Error(err)
}
if info.Status != TrackerStatusPinned {
t.Error("element should be in Pinned state")
}
}
// Run the checks against every cluster member.
runF(t, clusters, f)
}
func TestClustersRecover(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
h, _ := cid.Decode(errorCid) // This cid always fails
h2, _ := cid.Decode(testCid2)
clusters[0].Pin(h)
clusters[0].Pin(h2)
delay()
j := rand.Intn(nClusters)
ginfo, err := clusters[j].Recover(h)
if err != nil {
// we always attempt to return a valid response
// with errors contained in GlobalPinInfo
t.Fatal("did not expect an error")
}
pinfo, ok := ginfo.PeerMap[clusters[j].host.ID()]
if !ok {
t.Fatal("should have info for this host")
}
if pinfo.Error == "" {
t.Error("pinInfo error should not be empty")
}
for _, c := range clusters {
inf, ok := ginfo.PeerMap[c.host.ID()]
if !ok {
t.Logf("%+v", ginfo)
t.Fatal("GlobalPinInfo should not be empty for this host")
}
if inf.Status != TrackerStatusPinError {
t.Error("should be PinError in all members")
}
}
// Test with a good Cid
j = rand.Intn(nClusters)
ginfo, err = clusters[j].Recover(h2)
if err != nil {
t.Fatal(err)
}
if ginfo.Cid.String() != testCid2 {
t.Error("GlobalPinInfo should be for testrCid2")
}
for _, c := range clusters {
inf, ok := ginfo.PeerMap[c.host.ID()]
if !ok {
t.Fatal("GlobalPinInfo should have this cluster")
}
if inf.Status != TrackerStatusPinned {
t.Error("the GlobalPinInfo should show Pinned in all members")
}
}

View File

@ -2,6 +2,7 @@ package ipfscluster
import (
"context"
"errors"
"sync"
"time"
@ -18,6 +19,13 @@ var (
UnpinningTimeout = 10 * time.Second
)
// Errors used by the MapPinTracker to flag pin/unpin operations that
// take too long or IPFS pin states that contradict the tracked state
// (see the error messages below for the exact condition each covers).
var (
errUnpinningTimeout = errors.New("unpinning operation is taking too long")
errPinningTimeout = errors.New("pinning operation is taking too long")
errPinned = errors.New("the item is unexpectedly pinned on IPFS")
errUnpinned = errors.New("the item is unexpectedly not pinned on IPFS")
)
// MapPinTracker is a PinTracker implementation which uses a Go map
// to store the status of the tracked Cids. This component is thread-safe.
type MapPinTracker struct {
@ -83,8 +91,14 @@ func (mpt *MapPinTracker) Shutdown() error {
return nil
}
func (mpt *MapPinTracker) unsafeSet(c *cid.Cid, s IPFSStatus) {
if s == Unpinned {
// set records the given TrackerStatus for a Cid, acquiring the
// tracker's mutex for the duration of the write.
func (mpt *MapPinTracker) set(c *cid.Cid, s TrackerStatus) {
mpt.mux.Lock()
defer mpt.mux.Unlock()
mpt.unsafeSet(c, s)
}
func (mpt *MapPinTracker) unsafeSet(c *cid.Cid, s TrackerStatus) {
if s == TrackerStatusUnpinned {
delete(mpt.status, c.String())
return
}
@ -93,38 +107,63 @@ func (mpt *MapPinTracker) unsafeSet(c *cid.Cid, s IPFSStatus) {
// cid: c,
CidStr: c.String(),
Peer: mpt.peerID,
IPFS: s,
Status: s,
TS: time.Now(),
Error: "",
}
}
func (mpt *MapPinTracker) set(c *cid.Cid, s IPFSStatus) {
mpt.mux.Lock()
defer mpt.mux.Unlock()
mpt.unsafeSet(c, s)
}
func (mpt *MapPinTracker) unsafeGet(c *cid.Cid) PinInfo {
p, ok := mpt.status[c.String()]
if !ok {
return PinInfo{
CidStr: c.String(),
Peer: mpt.peerID,
IPFS: Unpinned,
TS: time.Now(),
}
}
return p
}
// get returns the PinInfo tracked for a Cid, taking the read lock
// around the map access.
func (mpt *MapPinTracker) get(c *cid.Cid) PinInfo {
mpt.mux.RLock()
defer mpt.mux.RUnlock()
return mpt.unsafeGet(c)
}
// unsafeGet looks up the PinInfo for a Cid without taking the lock;
// the caller must already hold mpt.mux (read or write).
// For a Cid that is not in the status map it synthesizes a PinInfo
// with TrackerStatusUnpinned rather than reporting a miss.
func (mpt *MapPinTracker) unsafeGet(c *cid.Cid) PinInfo {
p, ok := mpt.status[c.String()]
if !ok {
// Untracked: fabricate an "unpinned" entry stamped with the
// current time so callers always receive a usable PinInfo.
return PinInfo{
CidStr: c.String(),
Peer: mpt.peerID,
Status: TrackerStatusUnpinned,
TS: time.Now(),
Error: "",
}
}
return p
}
// setError transitions a Cid into an error state (pin- or unpin-side,
// depending on its current status), recording err's message.
// It takes the tracker's mutex before delegating to unsafeSetError.
func (mpt *MapPinTracker) setError(c *cid.Cid, err error) {
mpt.mux.Lock()
defer mpt.mux.Unlock()
mpt.unsafeSetError(c, err)
}
// unsafeSetError stores an error state for a Cid without locking;
// the caller must hold mpt.mux. The current status selects which
// error state is recorded: pin-side statuses (pinned/pinning/pin_error)
// become TrackerStatusPinError, unpin-side statuses become
// TrackerStatusUnpinError. A status matching neither case is left
// unchanged (the switch has no default branch).
func (mpt *MapPinTracker) unsafeSetError(c *cid.Cid, err error) {
p := mpt.unsafeGet(c)
switch p.Status {
case TrackerStatusPinned, TrackerStatusPinning, TrackerStatusPinError:
mpt.status[c.String()] = PinInfo{
CidStr: c.String(),
Peer: mpt.peerID,
Status: TrackerStatusPinError,
TS: time.Now(),
Error: err.Error(),
}
case TrackerStatusUnpinned, TrackerStatusUnpinning, TrackerStatusUnpinError:
mpt.status[c.String()] = PinInfo{
CidStr: c.String(),
Peer: mpt.peerID,
Status: TrackerStatusUnpinError,
TS: time.Now(),
Error: err.Error(),
}
}
}
func (mpt *MapPinTracker) pin(c *cid.Cid) error {
mpt.set(c, Pinning)
mpt.set(c, TrackerStatusPinning)
err := mpt.rpcClient.Call("",
"Cluster",
"IPFSPin",
@ -132,25 +171,25 @@ func (mpt *MapPinTracker) pin(c *cid.Cid) error {
&struct{}{})
if err != nil {
mpt.set(c, PinError)
mpt.setError(c, err)
return err
}
mpt.set(c, Pinned)
mpt.set(c, TrackerStatusPinned)
return nil
}
func (mpt *MapPinTracker) unpin(c *cid.Cid) error {
mpt.set(c, Unpinning)
mpt.set(c, TrackerStatusUnpinning)
err := mpt.rpcClient.Call("",
"Cluster",
"IPFSUnpin",
NewCidArg(c),
&struct{}{})
if err != nil {
mpt.set(c, UnpinError)
mpt.setError(c, err)
return err
}
mpt.set(c, Unpinned)
mpt.set(c, TrackerStatusUnpinned)
return nil
}
@ -168,13 +207,13 @@ func (mpt *MapPinTracker) Untrack(c *cid.Cid) error {
// StatusCid returns information for a Cid tracked by this
// MapPinTracker.
func (mpt *MapPinTracker) StatusCid(c *cid.Cid) PinInfo {
func (mpt *MapPinTracker) Status(c *cid.Cid) PinInfo {
return mpt.get(c)
}
// Status returns information for all Cids tracked by this
// MapPinTracker.
func (mpt *MapPinTracker) Status() []PinInfo {
func (mpt *MapPinTracker) StatusAll() []PinInfo {
mpt.mux.Lock()
defer mpt.mux.Unlock()
pins := make([]PinInfo, 0, len(mpt.status))
@ -184,97 +223,134 @@ func (mpt *MapPinTracker) Status() []PinInfo {
return pins
}
// Sync verifies that the status of a Cid matches the status
// of it in the IPFS daemon. If not, it will be transitioned
// to Pin or Unpin error. Sync returns true if the status was
// modified or the status is error. Pins in error states can be
// recovered with Recover().
func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
var ipfsPinned bool
p := mpt.get(c)
// Sync verifies that the status of a Cid matches that of
// the IPFS daemon. If not, it will be transitioned
// to PinError or UnpinError.
//
// Sync returns the updated local status for the given Cid.
// Pins in error states can be recovered with Recover().
// An error is returned if we are unable to contact
// the IPFS daemon.
func (mpt *MapPinTracker) Sync(c *cid.Cid) (PinInfo, error) {
var ips IPFSPinStatus
err := mpt.rpcClient.Call("",
"Cluster",
"IPFSIsPinned",
"IPFSPinLsCid",
NewCidArg(c),
&ipfsPinned)
&ips)
if err != nil {
switch p.IPFS {
case Pinned, Pinning:
mpt.set(c, PinError)
return true
case Unpinned, Unpinning:
mpt.set(c, UnpinError)
return true
case PinError, UnpinError:
return true
default:
return false
mpt.setError(c, err)
return mpt.get(c), err
}
return mpt.syncStatus(c, ips), nil
}
// SyncAll verifies that the statuses of all tracked Cids match the
// one reported by the IPFS daemon. If not, they will be transitioned
// to PinError or UnpinError.
//
// SyncAll returns the list of local status for all tracked Cids which
// were updated or have errors. Cids in error states can be recovered
// with Recover().
// An error is returned if we are unable to contact the IPFS daemon.
func (mpt *MapPinTracker) SyncAll() ([]PinInfo, error) {
var ipsMap map[string]IPFSPinStatus
var pInfos []PinInfo
err := mpt.rpcClient.Call("",
"Cluster",
"IPFSPinLs",
struct{}{},
&ipsMap)
if err != nil {
mpt.mux.Lock()
for k := range mpt.status {
c, _ := cid.Decode(k)
mpt.unsafeSetError(c, err)
pInfos = append(pInfos, mpt.unsafeGet(c))
}
mpt.mux.Unlock()
return pInfos, err
}
if ipfsPinned {
switch p.IPFS {
case Pinned:
return false
case Pinning, PinError:
mpt.set(c, Pinned)
return true
case Unpinning:
status := mpt.StatusAll()
for _, pInfoOrig := range status {
c, err := cid.Decode(pInfoOrig.CidStr)
if err != nil { // this should not happen but let's play safe
return pInfos, err
}
var pInfoNew PinInfo
ips, ok := ipsMap[pInfoOrig.CidStr]
if !ok {
pInfoNew = mpt.syncStatus(c, IPFSPinStatusUnpinned)
} else {
pInfoNew = mpt.syncStatus(c, ips)
}
if pInfoOrig.Status != pInfoNew.Status ||
pInfoNew.Status == TrackerStatusUnpinError ||
pInfoNew.Status == TrackerStatusPinError {
pInfos = append(pInfos, pInfoNew)
}
}
return pInfos, nil
}
func (mpt *MapPinTracker) syncStatus(c *cid.Cid, ips IPFSPinStatus) PinInfo {
p := mpt.get(c)
if ips.IsPinned() {
switch p.Status {
case TrackerStatusPinned: // nothing
case TrackerStatusPinning, TrackerStatusPinError:
mpt.set(c, TrackerStatusPinned)
case TrackerStatusUnpinning:
if time.Since(p.TS) > UnpinningTimeout {
mpt.set(c, UnpinError)
return true
mpt.setError(c, errUnpinningTimeout)
}
return false
case Unpinned, UnpinError:
mpt.set(c, UnpinError)
return true
case TrackerStatusUnpinned:
mpt.setError(c, errPinned)
case TrackerStatusUnpinError: // nothing, keep error as it was
default:
return false
}
} else {
switch p.IPFS {
case Pinned, PinError:
mpt.set(c, PinError)
return true
case Pinning:
switch p.Status {
case TrackerStatusPinned:
mpt.setError(c, errUnpinned)
case TrackerStatusPinError: // nothing, keep error as it was
case TrackerStatusPinning:
if time.Since(p.TS) > PinningTimeout {
mpt.set(c, PinError)
return true
mpt.setError(c, errPinningTimeout)
}
return false
case Unpinning, UnpinError:
mpt.set(c, Unpinned)
return true
case Unpinned:
return false
case TrackerStatusUnpinning, TrackerStatusUnpinError:
mpt.set(c, TrackerStatusUnpinned)
case TrackerStatusUnpinned: // nothing
default:
return false
}
}
return mpt.get(c)
}
// Recover will re-track or re-untrack a Cid in error state,
// possibly retriggering an IPFS pinning operation and returning
// only when it is done.
func (mpt *MapPinTracker) Recover(c *cid.Cid) error {
func (mpt *MapPinTracker) Recover(c *cid.Cid) (PinInfo, error) {
p := mpt.get(c)
if p.IPFS != PinError && p.IPFS != UnpinError {
return nil
if p.Status != TrackerStatusPinError &&
p.Status != TrackerStatusUnpinError {
return p, nil
}
logger.Infof("Recovering %s", c)
var err error
if p.IPFS == PinError {
switch p.Status {
case TrackerStatusPinError:
err = mpt.Track(c)
}
if p.IPFS == UnpinError {
case TrackerStatusUnpinError:
err = mpt.Untrack(c)
}
if err != nil {
logger.Errorf("error recovering %s: %s", c, err)
return err
}
return nil
return mpt.get(c), err
}
// SetClient makes the MapPinTracker ready to perform RPC requests to

View File

@ -79,13 +79,13 @@ type unpinResp struct {
}
type statusInfo struct {
IPFS string `json:"ipfs"`
Error string `json:"error,omitempty"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
}
type statusCidResp struct {
Cid string `json:"cid"`
Status map[string]statusInfo `json:"status"`
Cid string `json:"cid"`
PeerMap map[string]statusInfo `json:"peer_map"`
}
type idResp struct {
@ -163,23 +163,45 @@ func (api *RESTAPI) routes() []route {
"/id",
api.idHandler,
},
{
"Version",
"GET",
"/version",
api.versionHandler,
},
{
"Members",
"GET",
"/members",
api.memberListHandler,
},
{
"Pins",
"GET",
"/pins",
"/pinlist",
api.pinListHandler,
},
{
"Version",
"StatusAll",
"GET",
"/version",
api.versionHandler,
"/pins",
api.statusAllHandler,
},
{
"SyncAll",
"POST",
"/pins/sync",
api.syncAllHandler,
},
{
"Status",
"GET",
"/pins/{hash}",
api.statusHandler,
},
{
"Pin",
@ -193,29 +215,17 @@ func (api *RESTAPI) routes() []route {
"/pins/{hash}",
api.unpinHandler,
},
{
"Status",
"GET",
"/status",
api.statusHandler,
},
{
"StatusCid",
"GET",
"/status/{hash}",
api.statusCidHandler,
},
{
"Sync",
"POST",
"/status",
"/pins/{hash}/sync",
api.syncHandler,
},
{
"SyncCid",
"Recover",
"POST",
"/status/{hash}",
api.syncCidHandler,
"/pins/{hash}/recover",
api.recoverHandler,
},
}
}
@ -367,11 +377,11 @@ func (api *RESTAPI) pinListHandler(w http.ResponseWriter, r *http.Request) {
}
func (api *RESTAPI) statusHandler(w http.ResponseWriter, r *http.Request) {
func (api *RESTAPI) statusAllHandler(w http.ResponseWriter, r *http.Request) {
var pinInfos []GlobalPinInfo
err := api.rpcClient.Call("",
"Cluster",
"Status",
"StatusAll",
struct{}{},
&pinInfos)
if checkRPCErr(w, err) {
@ -379,12 +389,12 @@ func (api *RESTAPI) statusHandler(w http.ResponseWriter, r *http.Request) {
}
}
func (api *RESTAPI) statusCidHandler(w http.ResponseWriter, r *http.Request) {
func (api *RESTAPI) statusHandler(w http.ResponseWriter, r *http.Request) {
if c := parseCidOrError(w, r); c != nil {
var pinInfo GlobalPinInfo
err := api.rpcClient.Call("",
"Cluster",
"StatusCid",
"Status",
c,
&pinInfo)
if checkRPCErr(w, err) {
@ -393,11 +403,11 @@ func (api *RESTAPI) statusCidHandler(w http.ResponseWriter, r *http.Request) {
}
}
func (api *RESTAPI) syncHandler(w http.ResponseWriter, r *http.Request) {
func (api *RESTAPI) syncAllHandler(w http.ResponseWriter, r *http.Request) {
var pinInfos []GlobalPinInfo
err := api.rpcClient.Call("",
"Cluster",
"GlobalSync",
"SyncAll",
struct{}{},
&pinInfos)
if checkRPCErr(w, err) {
@ -405,12 +415,26 @@ func (api *RESTAPI) syncHandler(w http.ResponseWriter, r *http.Request) {
}
}
func (api *RESTAPI) syncCidHandler(w http.ResponseWriter, r *http.Request) {
func (api *RESTAPI) syncHandler(w http.ResponseWriter, r *http.Request) {
if c := parseCidOrError(w, r); c != nil {
var pinInfo GlobalPinInfo
err := api.rpcClient.Call("",
"Cluster",
"GlobalSyncCid",
"Sync",
c,
&pinInfo)
if checkRPCErr(w, err) {
sendStatusCidResponse(w, http.StatusOK, pinInfo)
}
}
}
func (api *RESTAPI) recoverHandler(w http.ResponseWriter, r *http.Request) {
if c := parseCidOrError(w, r); c != nil {
var pinInfo GlobalPinInfo
err := api.rpcClient.Call("",
"Cluster",
"Recover",
c,
&pinInfo)
if checkRPCErr(w, err) {
@ -465,11 +489,11 @@ func sendErrorResponse(w http.ResponseWriter, code int, msg string) {
func transformPinToStatusCid(p GlobalPinInfo) statusCidResp {
s := statusCidResp{}
s.Cid = p.Cid.String()
s.Status = make(map[string]statusInfo)
for k, v := range p.Status {
s.Status[k.Pretty()] = statusInfo{
IPFS: v.IPFS.String(),
Error: v.Error,
s.PeerMap = make(map[string]statusInfo)
for k, v := range p.PeerMap {
s.PeerMap[k.Pretty()] = statusInfo{
Status: v.Status.String(),
Error: v.Error,
}
}
return s

View File

@ -147,7 +147,7 @@ func TestRESTAPIPinListEndpoint(t *testing.T) {
defer api.Shutdown()
var resp []string
makeGet(t, "/pins", &resp)
makeGet(t, "/pinlist", &resp)
if len(resp) != 3 ||
resp[0] != testCid1 || resp[1] != testCid2 ||
resp[2] != testCid3 {
@ -155,67 +155,86 @@ func TestRESTAPIPinListEndpoint(t *testing.T) {
}
}
// TestRESTAPIStatusAllEndpoint checks that GET /pins returns the
// global status list from the (mocked) RPC backend: three entries,
// the first for testCid1, and the second reporting "pinning" for
// the test peer.
func TestRESTAPIStatusAllEndpoint(t *testing.T) {
api := testRESTAPI(t)
defer api.Shutdown()
var resp statusResp
makeGet(t, "/pins", &resp)
if len(resp) != 3 ||
resp[0].Cid != testCid1 ||
resp[1].PeerMap[testPeerID.Pretty()].Status != "pinning" {
t.Errorf("unexpected statusResp:\n %+v", resp)
}
}
func TestRESTAPIStatusEndpoint(t *testing.T) {
api := testRESTAPI(t)
defer api.Shutdown()
var resp statusResp
makeGet(t, "/status", &resp)
if len(resp) != 3 ||
resp[0].Cid != testCid1 ||
resp[1].Status[testPeerID.Pretty()].IPFS != "pinning" {
t.Errorf("unexpected statusResp:\n %+v", resp)
}
}
func TestRESTAPIStatusCidEndpoint(t *testing.T) {
api := testRESTAPI(t)
defer api.Shutdown()
var resp statusCidResp
makeGet(t, "/status/"+testCid, &resp)
makeGet(t, "/pins/"+testCid, &resp)
if resp.Cid != testCid {
t.Error("expected the same cid")
}
info, ok := resp.Status[testPeerID.Pretty()]
info, ok := resp.PeerMap[testPeerID.Pretty()]
if !ok {
t.Fatal("expected info for testPeerID")
}
if info.IPFS != "pinned" {
if info.Status != "pinned" {
t.Error("expected different status")
}
}
func TestRESTAPIStatusSyncEndpoint(t *testing.T) {
func TestRESTAPISyncAllEndpoint(t *testing.T) {
api := testRESTAPI(t)
defer api.Shutdown()
var resp statusResp
makePost(t, "/status", &resp)
makePost(t, "/pins/sync", &resp)
if len(resp) != 3 ||
resp[0].Cid != testCid1 ||
resp[1].Status[testPeerID.Pretty()].IPFS != "pinning" {
resp[1].PeerMap[testPeerID.Pretty()].Status != "pinning" {
t.Errorf("unexpected statusResp:\n %+v", resp)
}
}
func TestRESTAPIStatusSyncCidEndpoint(t *testing.T) {
func TestRESTAPISyncEndpoint(t *testing.T) {
api := testRESTAPI(t)
defer api.Shutdown()
var resp statusCidResp
makePost(t, "/status/"+testCid, &resp)
makePost(t, "/pins/"+testCid+"/sync", &resp)
if resp.Cid != testCid {
t.Error("expected the same cid")
}
info, ok := resp.Status[testPeerID.Pretty()]
info, ok := resp.PeerMap[testPeerID.Pretty()]
if !ok {
t.Fatal("expected info for testPeerID")
}
if info.IPFS != "pinned" {
if info.Status != "pinned" {
t.Error("expected different status")
}
}
// TestRESTAPIRecoverEndpoint checks POST /pins/{hash}/recover:
// the response must echo the requested Cid and contain a "pinned"
// entry for the test peer (as served by the mock RPC backend).
func TestRESTAPIRecoverEndpoint(t *testing.T) {
api := testRESTAPI(t)
defer api.Shutdown()
var resp statusCidResp
makePost(t, "/pins/"+testCid+"/recover", &resp)
if resp.Cid != testCid {
t.Error("expected the same cid")
}
info, ok := resp.PeerMap[testPeerID.Pretty()]
if !ok {
t.Fatal("expected info for testPeerID")
}
if info.Status != "pinned" {
t.Error("expected different status")
}
}

View File

@ -44,6 +44,7 @@ func (arg *CidArg) CID() (*cid.Cid, error) {
Cluster components methods
*/
// ID runs Cluster.ID()
func (api *RPCAPI) ID(in struct{}, out *ID) error {
*out = api.cluster.ID()
return nil
@ -90,56 +91,56 @@ func (api *RPCAPI) MemberList(in struct{}, out *[]peer.ID) error {
return nil
}
// StatusAll runs Cluster.StatusAll(), copying the resulting
// GlobalPinInfo slice into out for the RPC caller.
func (api *RPCAPI) StatusAll(in struct{}, out *[]GlobalPinInfo) error {
pinfo, err := api.cluster.StatusAll()
*out = pinfo
return err
}
// Status runs Cluster.Status().
func (api *RPCAPI) Status(in struct{}, out *[]GlobalPinInfo) error {
pinfo, err := api.cluster.Status()
*out = pinfo
return err
}
// StatusCid runs Cluster.StatusCid().
func (api *RPCAPI) StatusCid(in *CidArg, out *GlobalPinInfo) error {
func (api *RPCAPI) Status(in *CidArg, out *GlobalPinInfo) error {
c, err := in.CID()
if err != nil {
return err
}
pinfo, err := api.cluster.StatusCid(c)
pinfo, err := api.cluster.Status(c)
*out = pinfo
return err
}
// LocalSync runs Cluster.LocalSync().
func (api *RPCAPI) LocalSync(in struct{}, out *[]PinInfo) error {
pinfo, err := api.cluster.LocalSync()
// SyncAllLocal runs Cluster.SyncAllLocal().
func (api *RPCAPI) SyncAllLocal(in struct{}, out *[]PinInfo) error {
pinfo, err := api.cluster.SyncAllLocal()
*out = pinfo
return err
}
// LocalSyncCid runs Cluster.LocalSyncCid().
func (api *RPCAPI) LocalSyncCid(in *CidArg, out *PinInfo) error {
// SyncLocal runs Cluster.SyncLocal().
func (api *RPCAPI) SyncLocal(in *CidArg, out *PinInfo) error {
c, err := in.CID()
if err != nil {
return err
}
pinfo, err := api.cluster.LocalSyncCid(c)
pinfo, err := api.cluster.SyncLocal(c)
*out = pinfo
return err
}
// GlobalSync runs Cluster.GlobalSync().
func (api *RPCAPI) GlobalSync(in struct{}, out *[]GlobalPinInfo) error {
pinfo, err := api.cluster.GlobalSync()
// SyncAll runs Cluster.SyncAll().
func (api *RPCAPI) SyncAll(in struct{}, out *[]GlobalPinInfo) error {
pinfo, err := api.cluster.SyncAll()
*out = pinfo
return err
}
// GlobalSyncCid runs Cluster.GlobalSyncCid().
func (api *RPCAPI) GlobalSyncCid(in *CidArg, out *GlobalPinInfo) error {
// Sync runs Cluster.Sync().
func (api *RPCAPI) Sync(in *CidArg, out *GlobalPinInfo) error {
c, err := in.CID()
if err != nil {
return err
}
pinfo, err := api.cluster.GlobalSyncCid(c)
pinfo, err := api.cluster.Sync(c)
*out = pinfo
return err
}
@ -151,6 +152,17 @@ func (api *RPCAPI) StateSync(in struct{}, out *[]PinInfo) error {
return err
}
// Recover runs Cluster.Recover() on the Cid decoded from the
// argument, writing the resulting GlobalPinInfo to out.
func (api *RPCAPI) Recover(in *CidArg, out *GlobalPinInfo) error {
c, err := in.CID()
if err != nil {
// Bad Cid in the request: fail before touching the cluster.
return err
}
pinfo, err := api.cluster.Recover(c)
*out = pinfo
return err
}
/*
Tracker component methods
*/
@ -173,24 +185,33 @@ func (api *RPCAPI) Untrack(in *CidArg, out *struct{}) error {
return api.cluster.tracker.Untrack(c)
}
// TrackerStatus runs PinTracker.Status().
func (api *RPCAPI) TrackerStatus(in struct{}, out *[]PinInfo) error {
*out = api.cluster.tracker.Status()
// TrackerStatusAll runs PinTracker.StatusAll().
func (api *RPCAPI) TrackerStatusAll(in struct{}, out *[]PinInfo) error {
*out = api.cluster.tracker.StatusAll()
return nil
}
// TrackerStatusCid runs PinTracker.StatusCid().
func (api *RPCAPI) TrackerStatusCid(in *CidArg, out *PinInfo) error {
// TrackerStatus runs PinTracker.Status().
func (api *RPCAPI) TrackerStatus(in *CidArg, out *PinInfo) error {
c, err := in.CID()
if err != nil {
return err
}
pinfo := api.cluster.tracker.StatusCid(c)
pinfo := api.cluster.tracker.Status(c)
*out = pinfo
return nil
}
// TrackerRecover not sure if needed
// TrackerRecover runs PinTracker.Recover() on the Cid decoded from
// the argument, writing the local PinInfo to out.
func (api *RPCAPI) TrackerRecover(in *CidArg, out *PinInfo) error {
c, err := in.CID()
if err != nil {
// Bad Cid in the request: fail before invoking the tracker.
return err
}
pinfo, err := api.cluster.tracker.Recover(c)
*out = pinfo
return err
}
/*
IPFS Connector component methods
@ -214,17 +235,24 @@ func (api *RPCAPI) IPFSUnpin(in *CidArg, out *struct{}) error {
return api.cluster.ipfs.Unpin(c)
}
// IPFSIsPinned runs IPFSConnector.IsPinned().
func (api *RPCAPI) IPFSIsPinned(in *CidArg, out *bool) error {
// IPFSPinLsCid runs IPFSConnector.PinLsCid().
func (api *RPCAPI) IPFSPinLsCid(in *CidArg, out *IPFSPinStatus) error {
c, err := in.CID()
if err != nil {
return err
}
b, err := api.cluster.ipfs.IsPinned(c)
b, err := api.cluster.ipfs.PinLsCid(c)
*out = b
return err
}
// IPFSPinLs runs IPFSConnector.PinLs(), copying the map of Cid
// string to IPFSPinStatus into out.
func (api *RPCAPI) IPFSPinLs(in struct{}, out *map[string]IPFSPinStatus) error {
m, err := api.cluster.ipfs.PinLs()
*out = m
return err
}
/*
Consensus component methods
*/

View File

@ -66,40 +66,40 @@ func (mock *mockService) MemberList(in struct{}, out *[]peer.ID) error {
return nil
}
func (mock *mockService) Status(in struct{}, out *[]GlobalPinInfo) error {
func (mock *mockService) StatusAll(in struct{}, out *[]GlobalPinInfo) error {
c1, _ := cid.Decode(testCid1)
c2, _ := cid.Decode(testCid2)
c3, _ := cid.Decode(testCid3)
*out = []GlobalPinInfo{
{
Cid: c1,
Status: map[peer.ID]PinInfo{
PeerMap: map[peer.ID]PinInfo{
testPeerID: {
CidStr: testCid1,
Peer: testPeerID,
IPFS: Pinned,
Status: TrackerStatusPinned,
TS: time.Now(),
},
},
},
{
Cid: c2,
Status: map[peer.ID]PinInfo{
PeerMap: map[peer.ID]PinInfo{
testPeerID: {
CidStr: testCid2,
Peer: testPeerID,
IPFS: Pinning,
Status: TrackerStatusPinning,
TS: time.Now(),
},
},
},
{
Cid: c3,
Status: map[peer.ID]PinInfo{
PeerMap: map[peer.ID]PinInfo{
testPeerID: {
CidStr: testCid3,
Peer: testPeerID,
IPFS: PinError,
Status: TrackerStatusPinError,
TS: time.Now(),
},
},
@ -108,18 +108,18 @@ func (mock *mockService) Status(in struct{}, out *[]GlobalPinInfo) error {
return nil
}
func (mock *mockService) StatusCid(in *CidArg, out *GlobalPinInfo) error {
func (mock *mockService) Status(in *CidArg, out *GlobalPinInfo) error {
if in.Cid == errorCid {
return errBadCid
}
c1, _ := cid.Decode(testCid1)
*out = GlobalPinInfo{
Cid: c1,
Status: map[peer.ID]PinInfo{
PeerMap: map[peer.ID]PinInfo{
testPeerID: {
CidStr: testCid1,
Peer: testPeerID,
IPFS: Pinned,
Status: TrackerStatusPinned,
TS: time.Now(),
},
},
@ -127,12 +127,12 @@ func (mock *mockService) StatusCid(in *CidArg, out *GlobalPinInfo) error {
return nil
}
func (mock *mockService) GlobalSync(in struct{}, out *[]GlobalPinInfo) error {
return mock.Status(in, out)
// SyncAll (mock) answers with the same canned data as StatusAll.
func (mock *mockService) SyncAll(in struct{}, out *[]GlobalPinInfo) error {
return mock.StatusAll(in, out)
}
func (mock *mockService) GlobalSyncCid(in *CidArg, out *GlobalPinInfo) error {
return mock.StatusCid(in, out)
// Sync (mock) answers with the same canned data as Status.
func (mock *mockService) Sync(in *CidArg, out *GlobalPinInfo) error {
return mock.Status(in, out)
}
func (mock *mockService) StateSync(in struct{}, out *[]PinInfo) error {
@ -140,6 +140,10 @@ func (mock *mockService) StateSync(in struct{}, out *[]PinInfo) error {
return nil
}
// Recover (mock) answers with the same canned data as Status.
func (mock *mockService) Recover(in *CidArg, out *GlobalPinInfo) error {
return mock.Status(in, out)
}
// Track (mock) is a no-op that always succeeds.
func (mock *mockService) Track(in *CidArg, out *struct{}) error {
return nil
}