2016-12-06 21:29:59 +00:00
|
|
|
package ipfscluster
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2017-01-25 17:07:19 +00:00
|
|
|
"errors"
|
2016-12-06 21:29:59 +00:00
|
|
|
"sync"
|
2016-12-15 18:08:46 +00:00
|
|
|
"time"
|
2016-12-06 21:29:59 +00:00
|
|
|
|
2017-01-25 11:14:39 +00:00
|
|
|
rpc "github.com/hsanjuan/go-libp2p-gorpc"
|
2016-12-19 17:35:24 +00:00
|
|
|
cid "github.com/ipfs/go-cid"
|
2016-12-23 18:35:37 +00:00
|
|
|
peer "github.com/libp2p/go-libp2p-peer"
|
2016-12-06 21:29:59 +00:00
|
|
|
)
|
|
|
|
|
2016-12-15 18:08:46 +00:00
|
|
|
// PinningTimeout is the longest a Cid may stay in the Pinning state
// before the pin operation is considered failed.
var PinningTimeout = 15 * time.Minute

// UnpinningTimeout is the longest a Cid may stay in the Unpinning state
// before the unpin operation is considered failed.
var UnpinningTimeout = 10 * time.Second
|
|
|
|
|
2017-01-25 17:07:19 +00:00
|
|
|
// Errors recorded on a tracked Cid when its local status and the state
// reported by IPFS disagree.
var (
	// errPinningTimeout: the Cid stayed in Pinning longer than PinningTimeout.
	errPinningTimeout = errors.New("pinning operation is taking too long")
	// errUnpinningTimeout: the Cid stayed in Unpinning longer than UnpinningTimeout.
	errUnpinningTimeout = errors.New("unpinning operation is taking too long")

	// errPinned: IPFS reports the item pinned while the tracker has it unpinned.
	errPinned = errors.New("the item is unexpectedly pinned on IPFS")
	// errUnpinned: IPFS reports the item not pinned while the tracker has it pinned.
	errUnpinned = errors.New("the item is unexpectedly not pinned on IPFS")
)
|
|
|
|
|
2016-12-28 15:25:24 +00:00
|
|
|
// MapPinTracker is a PinTracker implementation which uses a Go map
|
|
|
|
// to store the status of the tracked Cids. This component is thread-safe.
|
2016-12-06 21:29:59 +00:00
|
|
|
type MapPinTracker struct {
|
2016-12-20 18:51:13 +00:00
|
|
|
mux sync.RWMutex
|
2016-12-19 17:35:24 +00:00
|
|
|
status map[string]PinInfo
|
2016-12-06 21:29:59 +00:00
|
|
|
|
2016-12-23 18:35:37 +00:00
|
|
|
ctx context.Context
|
|
|
|
rpcClient *rpc.Client
|
|
|
|
rpcReady chan struct{}
|
|
|
|
peerID peer.ID
|
2016-12-09 19:54:46 +00:00
|
|
|
|
2016-12-15 13:07:19 +00:00
|
|
|
shutdownLock sync.Mutex
|
|
|
|
shutdown bool
|
|
|
|
shutdownCh chan struct{}
|
|
|
|
wg sync.WaitGroup
|
2016-12-06 21:29:59 +00:00
|
|
|
}
|
|
|
|
|
2016-12-28 15:25:24 +00:00
|
|
|
// NewMapPinTracker returns a new object which has been correcly
|
|
|
|
// initialized with the given configuration.
|
2016-12-19 17:35:24 +00:00
|
|
|
func NewMapPinTracker(cfg *Config) *MapPinTracker {
|
2016-12-09 19:54:46 +00:00
|
|
|
ctx := context.Background()
|
2016-12-19 17:35:24 +00:00
|
|
|
|
2016-12-06 21:29:59 +00:00
|
|
|
mpt := &MapPinTracker{
|
2016-12-09 19:54:46 +00:00
|
|
|
ctx: ctx,
|
2016-12-19 17:35:24 +00:00
|
|
|
status: make(map[string]PinInfo),
|
2016-12-23 18:35:37 +00:00
|
|
|
rpcReady: make(chan struct{}, 1),
|
2017-01-23 17:38:59 +00:00
|
|
|
peerID: cfg.ID,
|
2016-12-15 13:07:19 +00:00
|
|
|
shutdownCh: make(chan struct{}),
|
2016-12-06 21:29:59 +00:00
|
|
|
}
|
2016-12-15 13:07:19 +00:00
|
|
|
mpt.run()
|
2016-12-06 21:29:59 +00:00
|
|
|
return mpt
|
|
|
|
}
|
|
|
|
|
2016-12-28 15:25:24 +00:00
|
|
|
// run does nothing other than give MapPinTracker a cancellable context.
|
2016-12-06 21:29:59 +00:00
|
|
|
func (mpt *MapPinTracker) run() {
|
2016-12-15 13:07:19 +00:00
|
|
|
mpt.wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer mpt.wg.Done()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
defer cancel()
|
|
|
|
mpt.ctx = ctx
|
2017-01-23 22:58:04 +00:00
|
|
|
<-mpt.rpcReady
|
|
|
|
logger.Info("PinTracker ready")
|
2016-12-15 13:07:19 +00:00
|
|
|
<-mpt.shutdownCh
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
2016-12-28 15:25:24 +00:00
|
|
|
// Shutdown finishes the services provided by the MapPinTracker and cancels
|
|
|
|
// any active context.
|
2016-12-15 13:07:19 +00:00
|
|
|
func (mpt *MapPinTracker) Shutdown() error {
|
|
|
|
mpt.shutdownLock.Lock()
|
|
|
|
defer mpt.shutdownLock.Unlock()
|
|
|
|
|
|
|
|
if mpt.shutdown {
|
|
|
|
logger.Debug("already shutdown")
|
|
|
|
return nil
|
2016-12-06 21:29:59 +00:00
|
|
|
}
|
2016-12-15 13:07:19 +00:00
|
|
|
|
2016-12-15 13:19:41 +00:00
|
|
|
logger.Info("stopping MapPinTracker")
|
2016-12-15 13:07:19 +00:00
|
|
|
mpt.shutdownCh <- struct{}{}
|
|
|
|
mpt.wg.Wait()
|
|
|
|
mpt.shutdown = true
|
|
|
|
return nil
|
2016-12-06 21:29:59 +00:00
|
|
|
}
|
|
|
|
|
2017-01-25 17:07:19 +00:00
|
|
|
func (mpt *MapPinTracker) set(c *cid.Cid, s TrackerStatus) {
|
|
|
|
mpt.mux.Lock()
|
|
|
|
defer mpt.mux.Unlock()
|
|
|
|
mpt.unsafeSet(c, s)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (mpt *MapPinTracker) unsafeSet(c *cid.Cid, s TrackerStatus) {
|
|
|
|
if s == TrackerStatusUnpinned {
|
2016-12-07 16:21:29 +00:00
|
|
|
delete(mpt.status, c.String())
|
2016-12-19 18:58:01 +00:00
|
|
|
return
|
2016-12-07 16:21:29 +00:00
|
|
|
}
|
|
|
|
|
2016-12-19 17:35:24 +00:00
|
|
|
mpt.status[c.String()] = PinInfo{
|
|
|
|
// cid: c,
|
|
|
|
CidStr: c.String(),
|
|
|
|
Peer: mpt.peerID,
|
2017-01-25 17:07:19 +00:00
|
|
|
Status: s,
|
2016-12-19 17:35:24 +00:00
|
|
|
TS: time.Now(),
|
2017-01-25 17:07:19 +00:00
|
|
|
Error: "",
|
2016-12-06 21:29:59 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-01-25 17:07:19 +00:00
|
|
|
func (mpt *MapPinTracker) get(c *cid.Cid) PinInfo {
|
|
|
|
mpt.mux.RLock()
|
|
|
|
defer mpt.mux.RUnlock()
|
|
|
|
return mpt.unsafeGet(c)
|
2016-12-19 17:35:24 +00:00
|
|
|
}
|
|
|
|
|
2016-12-20 18:51:13 +00:00
|
|
|
func (mpt *MapPinTracker) unsafeGet(c *cid.Cid) PinInfo {
|
2016-12-07 16:21:29 +00:00
|
|
|
p, ok := mpt.status[c.String()]
|
|
|
|
if !ok {
|
2016-12-19 17:35:24 +00:00
|
|
|
return PinInfo{
|
|
|
|
CidStr: c.String(),
|
|
|
|
Peer: mpt.peerID,
|
2017-01-25 17:07:19 +00:00
|
|
|
Status: TrackerStatusUnpinned,
|
2016-12-19 17:35:24 +00:00
|
|
|
TS: time.Now(),
|
2017-01-25 17:07:19 +00:00
|
|
|
Error: "",
|
2016-12-07 16:21:29 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return p
|
|
|
|
}
|
|
|
|
|
2017-01-25 17:07:19 +00:00
|
|
|
// sets a Cid in error state
|
|
|
|
func (mpt *MapPinTracker) setError(c *cid.Cid, err error) {
|
|
|
|
mpt.mux.Lock()
|
|
|
|
defer mpt.mux.Unlock()
|
|
|
|
mpt.unsafeSetError(c, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (mpt *MapPinTracker) unsafeSetError(c *cid.Cid, err error) {
|
|
|
|
p := mpt.unsafeGet(c)
|
|
|
|
switch p.Status {
|
|
|
|
case TrackerStatusPinned, TrackerStatusPinning, TrackerStatusPinError:
|
|
|
|
mpt.status[c.String()] = PinInfo{
|
|
|
|
CidStr: c.String(),
|
|
|
|
Peer: mpt.peerID,
|
|
|
|
Status: TrackerStatusPinError,
|
|
|
|
TS: time.Now(),
|
|
|
|
Error: err.Error(),
|
|
|
|
}
|
|
|
|
case TrackerStatusUnpinned, TrackerStatusUnpinning, TrackerStatusUnpinError:
|
|
|
|
mpt.status[c.String()] = PinInfo{
|
|
|
|
CidStr: c.String(),
|
|
|
|
Peer: mpt.peerID,
|
|
|
|
Status: TrackerStatusUnpinError,
|
|
|
|
TS: time.Now(),
|
|
|
|
Error: err.Error(),
|
|
|
|
}
|
|
|
|
}
|
2016-12-20 18:51:13 +00:00
|
|
|
}
|
|
|
|
|
2016-12-19 17:35:24 +00:00
|
|
|
func (mpt *MapPinTracker) pin(c *cid.Cid) error {
|
2017-01-25 17:07:19 +00:00
|
|
|
mpt.set(c, TrackerStatusPinning)
|
2016-12-23 18:35:37 +00:00
|
|
|
err := mpt.rpcClient.Call("",
|
|
|
|
"Cluster",
|
|
|
|
"IPFSPin",
|
|
|
|
NewCidArg(c),
|
|
|
|
&struct{}{})
|
|
|
|
|
|
|
|
if err != nil {
|
2017-01-25 17:07:19 +00:00
|
|
|
mpt.setError(c, err)
|
2016-12-23 18:35:37 +00:00
|
|
|
return err
|
2016-12-19 17:35:24 +00:00
|
|
|
}
|
2017-01-25 17:07:19 +00:00
|
|
|
mpt.set(c, TrackerStatusPinned)
|
2016-12-19 17:35:24 +00:00
|
|
|
return nil
|
2016-12-06 21:29:59 +00:00
|
|
|
}
|
|
|
|
|
2016-12-19 17:35:24 +00:00
|
|
|
func (mpt *MapPinTracker) unpin(c *cid.Cid) error {
|
2017-01-25 17:07:19 +00:00
|
|
|
mpt.set(c, TrackerStatusUnpinning)
|
2016-12-23 18:35:37 +00:00
|
|
|
err := mpt.rpcClient.Call("",
|
|
|
|
"Cluster",
|
|
|
|
"IPFSUnpin",
|
|
|
|
NewCidArg(c),
|
|
|
|
&struct{}{})
|
|
|
|
if err != nil {
|
2017-01-25 17:07:19 +00:00
|
|
|
mpt.setError(c, err)
|
2016-12-23 18:35:37 +00:00
|
|
|
return err
|
2016-12-19 17:35:24 +00:00
|
|
|
}
|
2017-01-25 17:07:19 +00:00
|
|
|
mpt.set(c, TrackerStatusUnpinned)
|
2016-12-19 17:35:24 +00:00
|
|
|
return nil
|
2016-12-06 21:29:59 +00:00
|
|
|
}
|
|
|
|
|
2016-12-28 15:25:24 +00:00
|
|
|
// Track tells the MapPinTracker to start managing a Cid,
|
|
|
|
// possibly trigerring Pin operations on the IPFS daemon.
|
2016-12-19 17:35:24 +00:00
|
|
|
func (mpt *MapPinTracker) Track(c *cid.Cid) error {
|
|
|
|
return mpt.pin(c)
|
2016-12-06 21:29:59 +00:00
|
|
|
}
|
|
|
|
|
2016-12-28 15:25:24 +00:00
|
|
|
// Untrack tells the MapPinTracker to stop managing a Cid.
|
|
|
|
// If the Cid is pinned locally, it will be unpinned.
|
2016-12-19 17:35:24 +00:00
|
|
|
func (mpt *MapPinTracker) Untrack(c *cid.Cid) error {
|
|
|
|
return mpt.unpin(c)
|
2016-12-06 21:29:59 +00:00
|
|
|
}
|
|
|
|
|
2016-12-28 15:25:24 +00:00
|
|
|
// StatusCid returns information for a Cid tracked by this
|
|
|
|
// MapPinTracker.
|
2016-12-20 18:51:13 +00:00
|
|
|
func (mpt *MapPinTracker) StatusCid(c *cid.Cid) PinInfo {
|
2016-12-07 16:21:29 +00:00
|
|
|
return mpt.get(c)
|
2016-12-06 21:29:59 +00:00
|
|
|
}
|
|
|
|
|
2016-12-28 15:25:24 +00:00
|
|
|
// Status returns information for all Cids tracked by this
|
|
|
|
// MapPinTracker.
|
2016-12-20 18:51:13 +00:00
|
|
|
func (mpt *MapPinTracker) Status() []PinInfo {
|
2016-12-06 21:29:59 +00:00
|
|
|
mpt.mux.Lock()
|
|
|
|
defer mpt.mux.Unlock()
|
2016-12-19 17:35:24 +00:00
|
|
|
pins := make([]PinInfo, 0, len(mpt.status))
|
2016-12-06 21:29:59 +00:00
|
|
|
for _, v := range mpt.status {
|
|
|
|
pins = append(pins, v)
|
|
|
|
}
|
|
|
|
return pins
|
|
|
|
}
|
|
|
|
|
2017-01-25 17:07:19 +00:00
|
|
|
// SyncCid verifies that the status of a Cid matches that of
|
|
|
|
// the IPFS daemon. If not, it will be transitioned
|
|
|
|
// to PinError or UnpinError.
|
|
|
|
//
|
|
|
|
// SyncCid returns the updated local status for the given Cid.
|
|
|
|
// Pins in error states can be recovered with Recover().
|
|
|
|
// An error is returned if we are unable to contact
|
|
|
|
// the IPFS daemon.
|
|
|
|
func (mpt *MapPinTracker) SyncCid(c *cid.Cid) (PinInfo, error) {
|
|
|
|
var ips IPFSPinStatus
|
2016-12-23 18:35:37 +00:00
|
|
|
err := mpt.rpcClient.Call("",
|
|
|
|
"Cluster",
|
2017-01-25 17:07:19 +00:00
|
|
|
"IPFSPinLsCid",
|
2016-12-23 18:35:37 +00:00
|
|
|
NewCidArg(c),
|
2017-01-25 17:07:19 +00:00
|
|
|
&ips)
|
|
|
|
if err != nil {
|
|
|
|
mpt.setError(c, err)
|
|
|
|
return mpt.get(c), err
|
|
|
|
}
|
|
|
|
return mpt.syncStatus(c, ips), nil
|
|
|
|
}
|
2016-12-23 18:35:37 +00:00
|
|
|
|
2017-01-25 17:07:19 +00:00
|
|
|
// Sync verifies that the statuses of all tracked Cids match the
|
|
|
|
// one reported by the IPFS daemon. If not, they will be transitioned
|
|
|
|
// to PinError or UnpinError.
|
|
|
|
//
|
|
|
|
// Sync returns the list of local status for all tracked Cids which
|
|
|
|
// were updated or have errors. Cids in error states can be recovered
|
|
|
|
// with Recover().
|
|
|
|
// An error is returned if we are unable to contact the IPFS daemon.
|
|
|
|
func (mpt *MapPinTracker) Sync() ([]PinInfo, error) {
|
|
|
|
var ipsMap map[string]IPFSPinStatus
|
|
|
|
var pInfos []PinInfo
|
|
|
|
err := mpt.rpcClient.Call("",
|
|
|
|
"Cluster",
|
|
|
|
"IPFSPinLs",
|
|
|
|
struct{}{},
|
|
|
|
&ipsMap)
|
2016-12-23 18:35:37 +00:00
|
|
|
if err != nil {
|
2017-01-25 17:07:19 +00:00
|
|
|
mpt.mux.Lock()
|
|
|
|
for k := range mpt.status {
|
|
|
|
c, _ := cid.Decode(k)
|
|
|
|
mpt.unsafeSetError(c, err)
|
|
|
|
pInfos = append(pInfos, mpt.unsafeGet(c))
|
|
|
|
}
|
|
|
|
mpt.mux.Unlock()
|
|
|
|
return pInfos, err
|
|
|
|
}
|
|
|
|
|
|
|
|
status := mpt.Status()
|
|
|
|
for _, pInfoOrig := range status {
|
|
|
|
c, err := cid.Decode(pInfoOrig.CidStr)
|
|
|
|
if err != nil { // this should not happen but let's play safe
|
|
|
|
return pInfos, err
|
|
|
|
}
|
|
|
|
var pInfoNew PinInfo
|
|
|
|
ips, ok := ipsMap[pInfoOrig.CidStr]
|
|
|
|
if !ok {
|
|
|
|
pInfoNew = mpt.syncStatus(c, IPFSPinStatusUnpinned)
|
|
|
|
} else {
|
|
|
|
pInfoNew = mpt.syncStatus(c, ips)
|
|
|
|
}
|
|
|
|
|
|
|
|
if pInfoOrig.Status != pInfoNew.Status ||
|
|
|
|
pInfoNew.Status == TrackerStatusUnpinError ||
|
|
|
|
pInfoNew.Status == TrackerStatusPinError {
|
|
|
|
pInfos = append(pInfos, pInfoNew)
|
2016-12-07 16:21:29 +00:00
|
|
|
}
|
|
|
|
}
|
2017-01-25 17:07:19 +00:00
|
|
|
return pInfos, nil
|
|
|
|
}
|
2016-12-20 18:51:13 +00:00
|
|
|
|
2017-01-25 17:07:19 +00:00
|
|
|
func (mpt *MapPinTracker) syncStatus(c *cid.Cid, ips IPFSPinStatus) PinInfo {
|
|
|
|
p := mpt.get(c)
|
|
|
|
if ips.IsPinned() {
|
|
|
|
switch p.Status {
|
|
|
|
case TrackerStatusPinned: // nothing
|
|
|
|
case TrackerStatusPinning, TrackerStatusPinError:
|
|
|
|
mpt.set(c, TrackerStatusPinned)
|
|
|
|
case TrackerStatusUnpinning:
|
2016-12-15 18:08:46 +00:00
|
|
|
if time.Since(p.TS) > UnpinningTimeout {
|
2017-01-25 17:07:19 +00:00
|
|
|
mpt.setError(c, errUnpinningTimeout)
|
2016-12-15 18:08:46 +00:00
|
|
|
}
|
2017-01-25 17:07:19 +00:00
|
|
|
case TrackerStatusUnpinned:
|
|
|
|
mpt.setError(c, errPinned)
|
|
|
|
case TrackerStatusUnpinError: // nothing, keep error as it was
|
2016-12-20 18:51:13 +00:00
|
|
|
default:
|
2016-12-07 16:21:29 +00:00
|
|
|
}
|
|
|
|
} else {
|
2017-01-25 17:07:19 +00:00
|
|
|
switch p.Status {
|
|
|
|
case TrackerStatusPinned:
|
|
|
|
|
|
|
|
mpt.setError(c, errUnpinned)
|
|
|
|
case TrackerStatusPinError: // nothing, keep error as it was
|
|
|
|
case TrackerStatusPinning:
|
2016-12-15 18:08:46 +00:00
|
|
|
if time.Since(p.TS) > PinningTimeout {
|
2017-01-25 17:07:19 +00:00
|
|
|
mpt.setError(c, errPinningTimeout)
|
2016-12-15 18:08:46 +00:00
|
|
|
}
|
2017-01-25 17:07:19 +00:00
|
|
|
case TrackerStatusUnpinning, TrackerStatusUnpinError:
|
|
|
|
mpt.set(c, TrackerStatusUnpinned)
|
|
|
|
case TrackerStatusUnpinned: // nothing
|
2016-12-20 18:51:13 +00:00
|
|
|
default:
|
2016-12-07 16:21:29 +00:00
|
|
|
}
|
|
|
|
}
|
2017-01-25 17:07:19 +00:00
|
|
|
return mpt.get(c)
|
2016-12-07 16:21:29 +00:00
|
|
|
}
|
|
|
|
|
2016-12-20 18:51:13 +00:00
|
|
|
// Recover will re-track or re-untrack a Cid in error state,
|
|
|
|
// possibly retriggering an IPFS pinning operation and returning
|
|
|
|
// only when it is done.
|
2016-12-07 16:21:29 +00:00
|
|
|
func (mpt *MapPinTracker) Recover(c *cid.Cid) error {
|
|
|
|
p := mpt.get(c)
|
2017-01-25 17:07:19 +00:00
|
|
|
if p.Status != TrackerStatusPinError &&
|
|
|
|
p.Status != TrackerStatusUnpinError {
|
2016-12-07 16:21:29 +00:00
|
|
|
return nil
|
|
|
|
}
|
2016-12-20 18:51:13 +00:00
|
|
|
logger.Infof("Recovering %s", c)
|
|
|
|
var err error
|
2017-01-25 17:07:19 +00:00
|
|
|
switch p.Status {
|
|
|
|
case TrackerStatusPinError:
|
2016-12-20 18:51:13 +00:00
|
|
|
err = mpt.Track(c)
|
2017-01-25 17:07:19 +00:00
|
|
|
case TrackerStatusUnpinError:
|
2016-12-20 18:51:13 +00:00
|
|
|
err = mpt.Untrack(c)
|
2016-12-07 16:21:29 +00:00
|
|
|
}
|
2016-12-20 18:51:13 +00:00
|
|
|
if err != nil {
|
|
|
|
logger.Errorf("error recovering %s: %s", c, err)
|
|
|
|
return err
|
2016-12-09 19:54:46 +00:00
|
|
|
}
|
2016-12-20 18:51:13 +00:00
|
|
|
return nil
|
2016-12-09 19:54:46 +00:00
|
|
|
}
|
|
|
|
|
2016-12-28 15:25:24 +00:00
|
|
|
// SetClient makes the MapPinTracker ready to perform RPC requests to
|
|
|
|
// other components.
|
2016-12-23 18:35:37 +00:00
|
|
|
func (mpt *MapPinTracker) SetClient(c *rpc.Client) {
|
|
|
|
mpt.rpcClient = c
|
|
|
|
mpt.rpcReady <- struct{}{}
|
2016-12-06 21:29:59 +00:00
|
|
|
}
|