Issue 8: Make SyncAll efficient with a single PinLs call.

This has implied changes to the PinTracker API, to the IPFSConnector API and
a few renames on some PinTracker related constants.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
Hector Sanjuan 2017-01-25 18:07:19 +01:00
parent d8d4a0094a
commit 58702d04bc
11 changed files with 405 additions and 240 deletions

View File

@ -40,6 +40,7 @@ type Cluster struct {
wg sync.WaitGroup
}
// ID holds information about the Cluster peer
type ID struct {
ID peer.ID
PublicKey crypto.PubKey
@ -137,6 +138,7 @@ func (c *Cluster) Shutdown() error {
return nil
}
// ID returns information about the Cluster peer
func (c *Cluster) ID() ID {
return ID{
ID: c.host.ID(),
@ -163,7 +165,7 @@ func (c *Cluster) StateSync() ([]PinInfo, error) {
// Track items which are not tracked
for _, h := range clusterPins {
if c.tracker.StatusCid(h).IPFS == Unpinned {
if c.tracker.StatusCid(h).Status == TrackerStatusUnpinned {
changed = append(changed, h)
err := c.rpcClient.Go("",
"Cluster",
@ -219,33 +221,30 @@ func (c *Cluster) StatusCid(h *cid.Cid) (GlobalPinInfo, error) {
// on all items that need it. Returns PinInfo for items changed on Sync().
//
// LocalSync triggers recoveries asynchronously, and will not wait for
// them to fail or succeed before returning.
// them to fail or succeed before returning. The PinInfo may not reflect
// the recovery attempt.
func (c *Cluster) LocalSync() ([]PinInfo, error) {
status := c.tracker.Status()
var toRecover []*cid.Cid
for _, p := range status {
h, _ := cid.Decode(p.CidStr)
modified := c.tracker.Sync(h)
if modified {
toRecover = append(toRecover, h)
}
syncedItems, err := c.tracker.Sync()
// Despite errors, tracker provides synced items that we can work with.
// However, we skip recover() in those cases, as the ipfs daemon is
// probably gone.
if err != nil {
logger.Error("tracker.Sync() returned with error: ", err)
logger.Error("Is the ipfs daemon running?")
logger.Error("LocalSync returning without attempting recovers")
return syncedItems, nil
}
logger.Infof("%d items to recover after sync", len(toRecover))
for i, h := range toRecover {
logger.Infof("recovering in progress for %s (%d/%d",
h, i, len(toRecover))
// FIXME: at this point, recover probably deserves its own API endpoint
// or be optional or be synchronous.
logger.Infof("%d items changed on sync", len(syncedItems))
for _, pInfo := range syncedItems {
pCid, _ := cid.Decode(pInfo.CidStr)
go func(h *cid.Cid) {
c.tracker.Recover(h)
}(h)
}(pCid)
}
var changed []PinInfo
for _, h := range toRecover {
changed = append(changed, c.tracker.StatusCid(h))
}
return changed, nil
return syncedItems, nil
}
// LocalSyncCid performs a Tracker.Sync() operation followed by a
@ -255,10 +254,16 @@ func (c *Cluster) LocalSync() ([]PinInfo, error) {
// returning.
func (c *Cluster) LocalSyncCid(h *cid.Cid) (PinInfo, error) {
var err error
if c.tracker.Sync(h) {
err = c.tracker.Recover(h)
pInfo, err := c.tracker.SyncCid(h)
// Despite errors, the tracker provides an updated PinInfo so
// we just log it.
if err != nil {
logger.Error("tracker.SyncCid() returned with error: ", err)
logger.Error("Is the ipfs daemon running?")
return pInfo, nil
}
return c.tracker.StatusCid(h), err
c.tracker.Recover(h)
return c.tracker.StatusCid(h), nil
}
// GlobalSync triggers Sync() operations in all members of the Cluster.
@ -422,8 +427,8 @@ func (c *Cluster) multiRPC(dests []peer.ID, svcName, svcMethod string, args inte
func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (GlobalPinInfo, error) {
pin := GlobalPinInfo{
Cid: h,
Status: make(map[peer.ID]PinInfo),
Cid: h,
PeerMap: make(map[peer.ID]PinInfo),
}
members := c.Members()
@ -436,13 +441,13 @@ func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (GlobalPinInfo, er
errs := c.multiRPC(members, "Cluster", method, args, ifaceReplies)
for i, r := range replies {
if e := errs[i]; e != nil {
if e := errs[i]; e != nil { // This error must come from not being able to contact that cluster member
logger.Errorf("%s: error in broadcast response from %s: %s ", c.host.ID(), members[i], e)
if r.IPFS == Bug {
if r.Status == TrackerStatusBug {
r = PinInfo{
CidStr: h.String(),
Peer: members[i],
IPFS: ClusterError,
Status: TrackerStatusClusterError,
TS: time.Now(),
Error: e.Error(),
}
@ -450,7 +455,7 @@ func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (GlobalPinInfo, er
r.Error = e.Error()
}
}
pin.Status[members[i]] = r
pin.PeerMap[members[i]] = r
}
return pin, nil
@ -476,19 +481,19 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) {
if !ok {
fullMap[p.CidStr] = GlobalPinInfo{
Cid: c,
Status: map[peer.ID]PinInfo{
PeerMap: map[peer.ID]PinInfo{
p.Peer: p,
},
}
} else {
item.Status[p.Peer] = p
item.PeerMap[p.Peer] = p
}
}
}
erroredPeers := make(map[peer.ID]string)
for i, r := range replies {
if e := errs[i]; e != nil {
if e := errs[i]; e != nil { // This error must come from not being able to contact that cluster member
logger.Errorf("%s: error in broadcast response from %s: %s ", c.host.ID(), members[i], e)
erroredPeers[members[i]] = e.Error()
} else {
@ -499,10 +504,10 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) {
// Merge any errors
for p, msg := range erroredPeers {
for c := range fullMap {
fullMap[c].Status[p] = PinInfo{
fullMap[c].PeerMap[p] = PinInfo{
CidStr: c,
Peer: p,
IPFS: ClusterError,
Status: TrackerStatusClusterError,
TS: time.Now(),
Error: msg,
}

View File

@ -44,11 +44,19 @@ func (ipfs *mockConnector) Unpin(c *cid.Cid) error {
return nil
}
func (ipfs *mockConnector) IsPinned(c *cid.Cid) (bool, error) {
func (ipfs *mockConnector) PinLsCid(c *cid.Cid) (IPFSPinStatus, error) {
if ipfs.returnError {
return false, errors.New("")
return IPFSPinStatusError, errors.New("")
}
return true, nil
return IPFSPinStatusRecursive, nil
}
// PinLs mocks IPFSConnector.PinLs. It returns an empty status map,
// or a generic error when returnError is set (used by tests to
// simulate an unreachable ipfs daemon).
func (ipfs *mockConnector) PinLs() (map[string]IPFSPinStatus, error) {
	if ipfs.returnError {
		return nil, errors.New("")
	}
	m := make(map[string]IPFSPinStatus)
	return m, nil
}
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *MapState, *MapPinTracker) {

View File

@ -65,6 +65,12 @@ type ipfsError struct {
Message string
}
// pinLsResp models the JSON body returned by the IPFS daemon's
// "pin/ls" endpoint: a map from cid string to an object carrying the
// pin type ("direct", "recursive", "indirect").
type pinLsResp struct {
	Keys map[string]struct {
		Type string
	}
}
// NewIPFSHTTPConnector creates the component and leaves it ready to be started
func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) {
ctx := context.Background()
@ -225,11 +231,11 @@ func (ipfs *IPFSHTTPConnector) Shutdown() error {
// Pin performs a pin request against the configured IPFS
// daemon.
func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error {
pinned, err := ipfs.IsPinned(hash)
pinStatus, err := ipfs.PinLsCid(hash)
if err != nil {
return err
}
if !pinned {
if !pinStatus.IsPinned() {
path := fmt.Sprintf("pin/add?arg=%s", hash)
_, err = ipfs.get(path)
if err == nil {
@ -244,11 +250,11 @@ func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error {
// Unpin performs an unpin request against the configured IPFS
// daemon.
func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
pinned, err := ipfs.IsPinned(hash)
pinStatus, err := ipfs.PinLsCid(hash)
if err != nil {
return err
}
if pinned {
if pinStatus.IsPinned() {
path := fmt.Sprintf("pin/rm?arg=%s", hash)
_, err := ipfs.get(path)
if err == nil {
@ -261,58 +267,73 @@ func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
return nil
}
// IsPinned performs a "pin ls" request against the configured IPFS
// daemon. It returns true when the given Cid is pinned, but not indirectly.
func (ipfs *IPFSHTTPConnector) IsPinned(hash *cid.Cid) (bool, error) {
pinType, err := ipfs.pinType(hash)
if err != nil {
return false, err
func parseIPFSPinType(t string) IPFSPinStatus {
switch {
case t == "indirect":
return IPFSPinStatusIndirect
case t == "direct":
return IPFSPinStatusDirect
case t == "recursive":
return IPFSPinStatusRecursive
default:
return IPFSPinStatusBug
}
if pinType == "unpinned" || strings.Contains(pinType, "indirect") {
return false, nil
}
return true, nil
}
// pinType performs a pin ls request and returns the information associated
// to the key. Unfortunately, the daemon does not provide a standardized
// output, so it may well be a sentence like "$hash is indirectly pinned through
// $otherhash".
func (ipfs *IPFSHTTPConnector) pinType(hash *cid.Cid) (string, error) {
// PinLs performs a "pin ls" request against the configured IPFS daemon and
// returns a map associating every pinned cid (as a string) with its
// IPFSPinStatus.
func (ipfs *IPFSHTTPConnector) PinLs() (map[string]IPFSPinStatus, error) {
	body, err := ipfs.get("pin/ls")
	if err != nil {
		// Could not talk to the daemon at all.
		return nil, err
	}

	var parsed pinLsResp
	if err := json.Unmarshal(body, &parsed); err != nil {
		logger.Error("parsing pin/ls response")
		logger.Error(string(body))
		return nil, err
	}

	result := make(map[string]IPFSPinStatus)
	for cidStr, pin := range parsed.Keys {
		result[cidStr] = parseIPFSPinType(pin.Type)
	}
	return result, nil
}
// PinLsCid performs a "pin ls &lt;hash&gt;" request and returns IPFSPinStatus for
// that hash.
func (ipfs *IPFSHTTPConnector) PinLsCid(hash *cid.Cid) (IPFSPinStatus, error) {
lsPath := fmt.Sprintf("pin/ls?arg=%s", hash)
body, err := ipfs.get(lsPath)
// Network error, daemon down
if body == nil && err != nil {
return "", err
return IPFSPinStatusError, err
}
// Pin not found likely here
if err != nil { // Not pinned
return "unpinned", nil
}
// What type of pin it is
var resp struct {
Keys map[string]struct {
Type string
}
return IPFSPinStatusUnpinned, nil
}
var resp pinLsResp
err = json.Unmarshal(body, &resp)
if err != nil {
logger.Error("parsing pin/ls response:")
logger.Error("parsing pin/ls?arg=cid response:")
logger.Error(string(body))
return "", err
return IPFSPinStatusError, err
}
pinObj, ok := resp.Keys[hash.String()]
if !ok {
return "", errors.New("expected to find the pin in the response")
return IPFSPinStatusError, errors.New("expected to find the pin in the response")
}
pinType := pinObj.Type
logger.Debug("pinType check: ", pinType)
return pinType, nil
return parseIPFSPinType(pinObj.Type), nil
}
// get performs the heavy lifting of a get request against

View File

@ -43,11 +43,11 @@ func TestIPFSPin(t *testing.T) {
if err != nil {
t.Error("expected success pinning cid")
}
yes, err := ipfs.IsPinned(c)
pinSt, err := ipfs.PinLsCid(c)
if err != nil {
t.Fatal("expected success doing ls")
}
if !yes {
if !pinSt.IsPinned() {
t.Error("cid should have been pinned")
}
@ -74,7 +74,7 @@ func TestIPFSUnpin(t *testing.T) {
}
}
func TestIPFSIsPinned(t *testing.T) {
func TestIPFSPinLsCid(t *testing.T) {
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown()
@ -82,17 +82,40 @@ func TestIPFSIsPinned(t *testing.T) {
c2, _ := cid.Decode(testCid2)
ipfs.Pin(c)
isp, err := ipfs.IsPinned(c)
if err != nil || !isp {
ips, err := ipfs.PinLsCid(c)
if err != nil || !ips.IsPinned() {
t.Error("c should appear pinned")
}
isp, err = ipfs.IsPinned(c2)
if err != nil || isp {
ips, err = ipfs.PinLsCid(c2)
if err != nil || ips != IPFSPinStatusUnpinned {
t.Error("c2 should appear unpinned")
}
}
func TestIPFSPinLs(t *testing.T) {
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown()
c, _ := cid.Decode(testCid)
c2, _ := cid.Decode(testCid2)
ipfs.Pin(c)
ipfs.Pin(c2)
ipsMap, err := ipfs.PinLs()
if err != nil {
t.Error("should not error")
}
if len(ipsMap) != 2 {
t.Fatal("the map does not contain expected keys")
}
if !ipsMap[testCid].IsPinned() || !ipsMap[testCid2].IsPinned() {
t.Error("c1 and c2 should appear pinned")
}
}
func TestIPFSProxy(t *testing.T) {
ipfs, mock := testIPFSConnector(t)
defer mock.Close()

View File

@ -42,36 +42,54 @@ func SetLogLevel(l string) {
//logging.SetLogLevel("libp2p-raft", l)
}
// IPFSStatus values
// TrackerStatus values
const (
// IPFSStatus should never take this value
Bug = iota
TrackerStatusBug = iota
// The cluster node is offline or not responding
ClusterError
TrackerStatusClusterError
// An error occurred pinning
PinError
TrackerStatusPinError
// An error occurred unpinning
UnpinError
TrackerStatusUnpinError
// The IPFS daemon has pinned the item
Pinned
TrackerStatusPinned
// The IPFS daemon is currently pinning the item
Pinning
TrackerStatusPinning
// The IPFS daemon is currently unpinning the item
Unpinning
TrackerStatusUnpinning
// The IPFS daemon is not pinning the item
Unpinned
TrackerStatusUnpinned
// The IPFS daemon is not pinning the item but it is being tracked
RemotePin
TrackerStatusRemotePin
)
// IPFSStatus represents the status of a tracked Cid in the IPFS daemon
type IPFSStatus int
// TrackerStatus represents the status of a tracked Cid in the PinTracker
type TrackerStatus int
// IPFSPinStatus represents the status of a pin in IPFS (direct, recursive etc.)
type IPFSPinStatus int

// IPFSPinStatus values. The explicit IPFSPinStatus type makes these typed
// constants (a bare "= iota" would leave them as untyped ints), so they can
// only be compared/assigned where an IPFSPinStatus is expected.
const (
	// IPFSPinStatusBug should never be set; it flags a programming error.
	IPFSPinStatusBug IPFSPinStatus = iota
	// IPFSPinStatusError means the status could not be retrieved.
	IPFSPinStatusError
	// IPFSPinStatusDirect means the item is pinned directly.
	IPFSPinStatusDirect
	// IPFSPinStatusRecursive means the item is pinned recursively.
	IPFSPinStatusRecursive
	// IPFSPinStatusIndirect means the item is pinned through a parent.
	IPFSPinStatusIndirect
	// IPFSPinStatusUnpinned means the item is not pinned.
	IPFSPinStatusUnpinned
)

// IsPinned returns true if the status is Direct or Recursive.
func (ips IPFSPinStatus) IsPinned() bool {
	return ips == IPFSPinStatusDirect || ips == IPFSPinStatusRecursive
}
// GlobalPinInfo contains cluster-wide status information about a tracked Cid,
// indexed by cluster member.
type GlobalPinInfo struct {
Cid *cid.Cid
Status map[peer.ID]PinInfo
Cid *cid.Cid
PeerMap map[peer.ID]PinInfo
}
// PinInfo holds information about local pins. PinInfo is
@ -80,32 +98,35 @@ type GlobalPinInfo struct {
type PinInfo struct {
CidStr string
Peer peer.ID
IPFS IPFSStatus
Status TrackerStatus
TS time.Time
Error string
}
// String converts an IPFSStatus into a readable string.
func (st IPFSStatus) String() string {
func (st TrackerStatus) String() string {
switch st {
case Bug:
case TrackerStatusBug:
return "bug"
case ClusterError:
case TrackerStatusClusterError:
return "cluster_error"
case PinError:
case TrackerStatusPinError:
return "pin_error"
case UnpinError:
case TrackerStatusUnpinError:
return "unpin_error"
case Pinned:
case TrackerStatusPinned:
return "pinned"
case Pinning:
case TrackerStatusPinning:
return "pinning"
case Unpinning:
case TrackerStatusUnpinning:
return "unpinning"
case Unpinned:
case TrackerStatusUnpinned:
return "unpinned"
case TrackerStatusRemotePin:
return "remote"
default:
return ""
}
return ""
}
// Component represents a piece of ipfscluster. Cluster components
@ -129,7 +150,8 @@ type IPFSConnector interface {
Component
Pin(*cid.Cid) error
Unpin(*cid.Cid) error
IsPinned(*cid.Cid) (bool, error)
PinLsCid(*cid.Cid) (IPFSPinStatus, error)
PinLs() (map[string]IPFSPinStatus, error)
}
// Peered represents a component which needs to be aware of the peers
@ -169,10 +191,12 @@ type PinTracker interface {
Status() []PinInfo
// StatusCid returns the local status of a given Cid.
StatusCid(*cid.Cid) PinInfo
// Sync makes sure that the Cid status reflects the real IPFS status.
// The return value indicates if the Cid status deserved attention,
// either because its state was updated or because it is in error state.
Sync(*cid.Cid) bool
// Sync makes sure that all tracked Cids reflect the real IPFS status.
// It returns the list of pins which were updated by the call.
Sync() ([]PinInfo, error)
// SyncCid makes sure that the Cid status reflects the real IPFS status.
// It returns the local status of the Cid.
SyncCid(*cid.Cid) (PinInfo, error)
// Recover retriggers a Pin/Unpin operation in Cids with error status.
Recover(*cid.Cid) error
}

View File

@ -188,10 +188,10 @@ func TestClustersPin(t *testing.T) {
fpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.Status()
for _, v := range status {
if v.IPFS != Pinned {
if v.Status != TrackerStatusPinned {
t.Errorf("%s should have been pinned but it is %s",
v.CidStr,
v.IPFS.String())
v.Status.String())
}
}
if l := len(status); l != nPins {
@ -246,12 +246,12 @@ func TestClustersStatus(t *testing.T) {
if statuses[0].Cid.String() != testCid {
t.Error("bad cid in status")
}
info := statuses[0].Status
info := statuses[0].PeerMap
if len(info) != nClusters {
t.Error("bad info in status")
}
if info[c.host.ID()].IPFS != Pinned {
if info[c.host.ID()].Status != TrackerStatusPinned {
t.Error("the hash should have been pinned")
}
@ -260,12 +260,12 @@ func TestClustersStatus(t *testing.T) {
t.Error(err)
}
pinfo, ok := status.Status[c.host.ID()]
pinfo, ok := status.PeerMap[c.host.ID()]
if !ok {
t.Fatal("Host not in status")
}
if pinfo.IPFS != Pinned {
if pinfo.Status != TrackerStatusPinned {
t.Error("the status should show the hash as pinned")
}
}
@ -292,7 +292,7 @@ func TestClustersLocalSync(t *testing.T) {
t.Fatal("expected 1 elem slice")
}
// Last-known state may still be pinning
if infos[0].IPFS != PinError && infos[0].IPFS != Pinning {
if infos[0].Status != TrackerStatusPinError && infos[0].Status != TrackerStatusPinning {
t.Error("element should be in Pinning or PinError state")
}
}
@ -311,12 +311,11 @@ func TestClustersLocalSyncCid(t *testing.T) {
f := func(t *testing.T, c *Cluster) {
info, err := c.LocalSyncCid(h)
if err == nil {
// LocalSyncCid is synchronous
t.Error("expected an error")
if err != nil {
t.Error(err)
}
if info.IPFS != PinError && info.IPFS != Pinning {
t.Errorf("element is %s and not PinError", info.IPFS)
if info.Status != TrackerStatusPinError && info.Status != TrackerStatusPinning {
t.Errorf("element is %s and not PinError", info.Status)
}
// Sync good ID
@ -324,7 +323,7 @@ func TestClustersLocalSyncCid(t *testing.T) {
if err != nil {
t.Error(err)
}
if info.IPFS != Pinned {
if info.Status != TrackerStatusPinned {
t.Error("element should be in Pinned state")
}
}
@ -353,11 +352,11 @@ func TestClustersGlobalSync(t *testing.T) {
t.Error("expected globalsync to have problems with errorCid")
}
for _, c := range clusters {
inf, ok := ginfos[0].Status[c.host.ID()]
inf, ok := ginfos[0].PeerMap[c.host.ID()]
if !ok {
t.Fatal("GlobalPinInfo should have this cluster")
}
if inf.IPFS != PinError && inf.IPFS != Pinning {
if inf.Status != TrackerStatusPinError && inf.Status != TrackerStatusPinning {
t.Error("should be PinError in all members")
}
}
@ -379,7 +378,7 @@ func TestClustersGlobalSyncCid(t *testing.T) {
// with errors contained in GlobalPinInfo
t.Fatal("did not expect an error")
}
pinfo, ok := ginfo.Status[clusters[j].host.ID()]
pinfo, ok := ginfo.PeerMap[clusters[j].host.ID()]
if !ok {
t.Fatal("should have info for this host")
}
@ -392,13 +391,13 @@ func TestClustersGlobalSyncCid(t *testing.T) {
}
for _, c := range clusters {
inf, ok := ginfo.Status[c.host.ID()]
inf, ok := ginfo.PeerMap[c.host.ID()]
if !ok {
t.Logf("%+v", ginfo)
t.Fatal("GlobalPinInfo should not be empty for this host")
}
if inf.IPFS != PinError && inf.IPFS != Pinning {
if inf.Status != TrackerStatusPinError && inf.Status != TrackerStatusPinning {
t.Error("should be PinError or Pinning in all members")
}
}
@ -414,11 +413,11 @@ func TestClustersGlobalSyncCid(t *testing.T) {
}
for _, c := range clusters {
inf, ok := ginfo.Status[c.host.ID()]
inf, ok := ginfo.PeerMap[c.host.ID()]
if !ok {
t.Fatal("GlobalPinInfo should have this cluster")
}
if inf.IPFS != Pinned {
if inf.Status != TrackerStatusPinned {
t.Error("the GlobalPinInfo should show Pinned in all members")
}
}

View File

@ -2,6 +2,7 @@ package ipfscluster
import (
"context"
"errors"
"sync"
"time"
@ -18,6 +19,13 @@ var (
UnpinningTimeout = 10 * time.Second
)
var (
errUnpinningTimeout = errors.New("unpinning operation is taking too long")
errPinningTimeout = errors.New("pinning operation is taking too long")
errPinned = errors.New("the item is unexpectedly pinned on IPFS")
errUnpinned = errors.New("the item is unexpectedly not pinned on IPFS")
)
// MapPinTracker is a PinTracker implementation which uses a Go map
// to store the status of the tracked Cids. This component is thread-safe.
type MapPinTracker struct {
@ -83,8 +91,14 @@ func (mpt *MapPinTracker) Shutdown() error {
return nil
}
func (mpt *MapPinTracker) unsafeSet(c *cid.Cid, s IPFSStatus) {
if s == Unpinned {
// set records the TrackerStatus for a Cid while holding the tracker
// lock. The actual update is delegated to unsafeSet.
func (mpt *MapPinTracker) set(c *cid.Cid, s TrackerStatus) {
	mpt.mux.Lock()
	defer mpt.mux.Unlock()
	mpt.unsafeSet(c, s)
}
func (mpt *MapPinTracker) unsafeSet(c *cid.Cid, s TrackerStatus) {
if s == TrackerStatusUnpinned {
delete(mpt.status, c.String())
return
}
@ -93,38 +107,63 @@ func (mpt *MapPinTracker) unsafeSet(c *cid.Cid, s IPFSStatus) {
// cid: c,
CidStr: c.String(),
Peer: mpt.peerID,
IPFS: s,
Status: s,
TS: time.Now(),
Error: "",
}
}
func (mpt *MapPinTracker) set(c *cid.Cid, s IPFSStatus) {
mpt.mux.Lock()
defer mpt.mux.Unlock()
mpt.unsafeSet(c, s)
}
func (mpt *MapPinTracker) unsafeGet(c *cid.Cid) PinInfo {
p, ok := mpt.status[c.String()]
if !ok {
return PinInfo{
CidStr: c.String(),
Peer: mpt.peerID,
IPFS: Unpinned,
TS: time.Now(),
}
}
return p
}
// get returns the PinInfo for a Cid while holding a read lock.
// The actual lookup is delegated to unsafeGet.
func (mpt *MapPinTracker) get(c *cid.Cid) PinInfo {
	mpt.mux.RLock()
	defer mpt.mux.RUnlock()
	return mpt.unsafeGet(c)
}
// unsafeGet returns the PinInfo for a Cid without taking the lock.
// Untracked Cids are not an error: a synthesized PinInfo with
// TrackerStatusUnpinned is returned instead. Callers must hold mpt.mux.
func (mpt *MapPinTracker) unsafeGet(c *cid.Cid) PinInfo {
	p, ok := mpt.status[c.String()]
	if !ok {
		return PinInfo{
			CidStr: c.String(),
			Peer:   mpt.peerID,
			Status: TrackerStatusUnpinned,
			TS:     time.Now(),
			Error:  "",
		}
	}
	return p
}
// setError transitions a Cid into its matching error state (PinError
// or UnpinError), recording err, while holding the tracker lock.
// The actual transition is delegated to unsafeSetError.
func (mpt *MapPinTracker) setError(c *cid.Cid, err error) {
	mpt.mux.Lock()
	defer mpt.mux.Unlock()
	mpt.unsafeSetError(c, err)
}
// unsafeSetError records err against a Cid without taking the lock:
// pin-side states (Pinned, Pinning, PinError) become
// TrackerStatusPinError; unpin-side states (Unpinned, Unpinning,
// UnpinError) become TrackerStatusUnpinError. Any other state falls
// through the switch and is left untouched. Callers must hold mpt.mux.
func (mpt *MapPinTracker) unsafeSetError(c *cid.Cid, err error) {
	p := mpt.unsafeGet(c)
	switch p.Status {
	case TrackerStatusPinned, TrackerStatusPinning, TrackerStatusPinError:
		mpt.status[c.String()] = PinInfo{
			CidStr: c.String(),
			Peer:   mpt.peerID,
			Status: TrackerStatusPinError,
			TS:     time.Now(),
			Error:  err.Error(),
		}
	case TrackerStatusUnpinned, TrackerStatusUnpinning, TrackerStatusUnpinError:
		mpt.status[c.String()] = PinInfo{
			CidStr: c.String(),
			Peer:   mpt.peerID,
			Status: TrackerStatusUnpinError,
			TS:     time.Now(),
			Error:  err.Error(),
		}
	}
}
func (mpt *MapPinTracker) pin(c *cid.Cid) error {
mpt.set(c, Pinning)
mpt.set(c, TrackerStatusPinning)
err := mpt.rpcClient.Call("",
"Cluster",
"IPFSPin",
@ -132,25 +171,25 @@ func (mpt *MapPinTracker) pin(c *cid.Cid) error {
&struct{}{})
if err != nil {
mpt.set(c, PinError)
mpt.setError(c, err)
return err
}
mpt.set(c, Pinned)
mpt.set(c, TrackerStatusPinned)
return nil
}
func (mpt *MapPinTracker) unpin(c *cid.Cid) error {
mpt.set(c, Unpinning)
mpt.set(c, TrackerStatusUnpinning)
err := mpt.rpcClient.Call("",
"Cluster",
"IPFSUnpin",
NewCidArg(c),
&struct{}{})
if err != nil {
mpt.set(c, UnpinError)
mpt.setError(c, err)
return err
}
mpt.set(c, Unpinned)
mpt.set(c, TrackerStatusUnpinned)
return nil
}
@ -184,74 +223,111 @@ func (mpt *MapPinTracker) Status() []PinInfo {
return pins
}
// Sync verifies that the status of a Cid matches the status
// of it in the IPFS daemon. If not, it will be transitioned
// to Pin or Unpin error. Sync returns true if the status was
// modified or the status is error. Pins in error states can be
// recovered with Recover().
func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
var ipfsPinned bool
p := mpt.get(c)
// SyncCid verifies that the status of a Cid matches that of
// the IPFS daemon. If not, it will be transitioned
// to PinError or UnpinError.
//
// SyncCid returns the updated local status for the given Cid.
// Pins in error states can be recovered with Recover().
// An error is returned if we are unable to contact
// the IPFS daemon.
func (mpt *MapPinTracker) SyncCid(c *cid.Cid) (PinInfo, error) {
var ips IPFSPinStatus
err := mpt.rpcClient.Call("",
"Cluster",
"IPFSIsPinned",
"IPFSPinLsCid",
NewCidArg(c),
&ipfsPinned)
&ips)
if err != nil {
switch p.IPFS {
case Pinned, Pinning:
mpt.set(c, PinError)
return true
case Unpinned, Unpinning:
mpt.set(c, UnpinError)
return true
case PinError, UnpinError:
return true
default:
return false
mpt.setError(c, err)
return mpt.get(c), err
}
return mpt.syncStatus(c, ips), nil
}
// Sync verifies that the statuses of all tracked Cids match the
// one reported by the IPFS daemon. If not, they will be transitioned
// to PinError or UnpinError.
//
// Sync returns the list of local status for all tracked Cids which
// were updated or have errors. Cids in error states can be recovered
// with Recover().
// An error is returned if we are unable to contact the IPFS daemon.
func (mpt *MapPinTracker) Sync() ([]PinInfo, error) {
var ipsMap map[string]IPFSPinStatus
var pInfos []PinInfo
err := mpt.rpcClient.Call("",
"Cluster",
"IPFSPinLs",
struct{}{},
&ipsMap)
if err != nil {
mpt.mux.Lock()
for k := range mpt.status {
c, _ := cid.Decode(k)
mpt.unsafeSetError(c, err)
pInfos = append(pInfos, mpt.unsafeGet(c))
}
mpt.mux.Unlock()
return pInfos, err
}
if ipfsPinned {
switch p.IPFS {
case Pinned:
return false
case Pinning, PinError:
mpt.set(c, Pinned)
return true
case Unpinning:
status := mpt.Status()
for _, pInfoOrig := range status {
c, err := cid.Decode(pInfoOrig.CidStr)
if err != nil { // this should not happen but let's play safe
return pInfos, err
}
var pInfoNew PinInfo
ips, ok := ipsMap[pInfoOrig.CidStr]
if !ok {
pInfoNew = mpt.syncStatus(c, IPFSPinStatusUnpinned)
} else {
pInfoNew = mpt.syncStatus(c, ips)
}
if pInfoOrig.Status != pInfoNew.Status ||
pInfoNew.Status == TrackerStatusUnpinError ||
pInfoNew.Status == TrackerStatusPinError {
pInfos = append(pInfos, pInfoNew)
}
}
return pInfos, nil
}
func (mpt *MapPinTracker) syncStatus(c *cid.Cid, ips IPFSPinStatus) PinInfo {
p := mpt.get(c)
if ips.IsPinned() {
switch p.Status {
case TrackerStatusPinned: // nothing
case TrackerStatusPinning, TrackerStatusPinError:
mpt.set(c, TrackerStatusPinned)
case TrackerStatusUnpinning:
if time.Since(p.TS) > UnpinningTimeout {
mpt.set(c, UnpinError)
return true
mpt.setError(c, errUnpinningTimeout)
}
return false
case Unpinned, UnpinError:
mpt.set(c, UnpinError)
return true
case TrackerStatusUnpinned:
mpt.setError(c, errPinned)
case TrackerStatusUnpinError: // nothing, keep error as it was
default:
return false
}
} else {
switch p.IPFS {
case Pinned, PinError:
mpt.set(c, PinError)
return true
case Pinning:
switch p.Status {
case TrackerStatusPinned:
mpt.setError(c, errUnpinned)
case TrackerStatusPinError: // nothing, keep error as it was
case TrackerStatusPinning:
if time.Since(p.TS) > PinningTimeout {
mpt.set(c, PinError)
return true
mpt.setError(c, errPinningTimeout)
}
return false
case Unpinning, UnpinError:
mpt.set(c, Unpinned)
return true
case Unpinned:
return false
case TrackerStatusUnpinning, TrackerStatusUnpinError:
mpt.set(c, TrackerStatusUnpinned)
case TrackerStatusUnpinned: // nothing
default:
return false
}
}
return mpt.get(c)
}
// Recover will re-track or re-untrack a Cid in error state,
@ -259,15 +335,16 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
// only when it is done.
func (mpt *MapPinTracker) Recover(c *cid.Cid) error {
p := mpt.get(c)
if p.IPFS != PinError && p.IPFS != UnpinError {
if p.Status != TrackerStatusPinError &&
p.Status != TrackerStatusUnpinError {
return nil
}
logger.Infof("Recovering %s", c)
var err error
if p.IPFS == PinError {
switch p.Status {
case TrackerStatusPinError:
err = mpt.Track(c)
}
if p.IPFS == UnpinError {
case TrackerStatusUnpinError:
err = mpt.Untrack(c)
}
if err != nil {

View File

@ -79,13 +79,13 @@ type unpinResp struct {
}
type statusInfo struct {
IPFS string `json:"ipfs"`
Error string `json:"error,omitempty"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
}
type statusCidResp struct {
Cid string `json:"cid"`
Status map[string]statusInfo `json:"status"`
Cid string `json:"cid"`
PeerMap map[string]statusInfo `json:"peer_map"`
}
type idResp struct {
@ -465,11 +465,11 @@ func sendErrorResponse(w http.ResponseWriter, code int, msg string) {
func transformPinToStatusCid(p GlobalPinInfo) statusCidResp {
s := statusCidResp{}
s.Cid = p.Cid.String()
s.Status = make(map[string]statusInfo)
for k, v := range p.Status {
s.Status[k.Pretty()] = statusInfo{
IPFS: v.IPFS.String(),
Error: v.Error,
s.PeerMap = make(map[string]statusInfo)
for k, v := range p.PeerMap {
s.PeerMap[k.Pretty()] = statusInfo{
Status: v.Status.String(),
Error: v.Error,
}
}
return s

View File

@ -163,7 +163,7 @@ func TestRESTAPIStatusEndpoint(t *testing.T) {
makeGet(t, "/status", &resp)
if len(resp) != 3 ||
resp[0].Cid != testCid1 ||
resp[1].Status[testPeerID.Pretty()].IPFS != "pinning" {
resp[1].PeerMap[testPeerID.Pretty()].Status != "pinning" {
t.Errorf("unexpected statusResp:\n %+v", resp)
}
}
@ -178,11 +178,11 @@ func TestRESTAPIStatusCidEndpoint(t *testing.T) {
if resp.Cid != testCid {
t.Error("expected the same cid")
}
info, ok := resp.Status[testPeerID.Pretty()]
info, ok := resp.PeerMap[testPeerID.Pretty()]
if !ok {
t.Fatal("expected info for testPeerID")
}
if info.IPFS != "pinned" {
if info.Status != "pinned" {
t.Error("expected different status")
}
}
@ -196,7 +196,7 @@ func TestRESTAPIStatusSyncEndpoint(t *testing.T) {
if len(resp) != 3 ||
resp[0].Cid != testCid1 ||
resp[1].Status[testPeerID.Pretty()].IPFS != "pinning" {
resp[1].PeerMap[testPeerID.Pretty()].Status != "pinning" {
t.Errorf("unexpected statusResp:\n %+v", resp)
}
}
@ -211,11 +211,11 @@ func TestRESTAPIStatusSyncCidEndpoint(t *testing.T) {
if resp.Cid != testCid {
t.Error("expected the same cid")
}
info, ok := resp.Status[testPeerID.Pretty()]
info, ok := resp.PeerMap[testPeerID.Pretty()]
if !ok {
t.Fatal("expected info for testPeerID")
}
if info.IPFS != "pinned" {
if info.Status != "pinned" {
t.Error("expected different status")
}
}

View File

@ -44,6 +44,7 @@ func (arg *CidArg) CID() (*cid.Cid, error) {
Cluster components methods
*/
// ID runs Cluster.ID()
func (api *RPCAPI) ID(in struct{}, out *ID) error {
*out = api.cluster.ID()
return nil
@ -214,17 +215,24 @@ func (api *RPCAPI) IPFSUnpin(in *CidArg, out *struct{}) error {
return api.cluster.ipfs.Unpin(c)
}
// IPFSIsPinned runs IPFSConnector.IsPinned().
func (api *RPCAPI) IPFSIsPinned(in *CidArg, out *bool) error {
// IPFSPinLsCid runs IPFSConnector.PinLsCid().
func (api *RPCAPI) IPFSPinLsCid(in *CidArg, out *IPFSPinStatus) error {
c, err := in.CID()
if err != nil {
return err
}
b, err := api.cluster.ipfs.IsPinned(c)
b, err := api.cluster.ipfs.PinLsCid(c)
*out = b
return err
}
// IPFSPinLs runs IPFSConnector.PinLs() and writes the resulting
// cid-to-status map into out.
func (api *RPCAPI) IPFSPinLs(in struct{}, out *map[string]IPFSPinStatus) error {
	pinMap, err := api.cluster.ipfs.PinLs()
	*out = pinMap
	return err
}
/*
Consensus component methods
*/

View File

@ -73,33 +73,33 @@ func (mock *mockService) Status(in struct{}, out *[]GlobalPinInfo) error {
*out = []GlobalPinInfo{
{
Cid: c1,
Status: map[peer.ID]PinInfo{
PeerMap: map[peer.ID]PinInfo{
testPeerID: {
CidStr: testCid1,
Peer: testPeerID,
IPFS: Pinned,
Status: TrackerStatusPinned,
TS: time.Now(),
},
},
},
{
Cid: c2,
Status: map[peer.ID]PinInfo{
PeerMap: map[peer.ID]PinInfo{
testPeerID: {
CidStr: testCid2,
Peer: testPeerID,
IPFS: Pinning,
Status: TrackerStatusPinning,
TS: time.Now(),
},
},
},
{
Cid: c3,
Status: map[peer.ID]PinInfo{
PeerMap: map[peer.ID]PinInfo{
testPeerID: {
CidStr: testCid3,
Peer: testPeerID,
IPFS: PinError,
Status: TrackerStatusPinError,
TS: time.Now(),
},
},
@ -115,11 +115,11 @@ func (mock *mockService) StatusCid(in *CidArg, out *GlobalPinInfo) error {
c1, _ := cid.Decode(testCid1)
*out = GlobalPinInfo{
Cid: c1,
Status: map[peer.ID]PinInfo{
PeerMap: map[peer.ID]PinInfo{
testPeerID: {
CidStr: testCid1,
Peer: testPeerID,
IPFS: Pinned,
Status: TrackerStatusPinned,
TS: time.Now(),
},
},