// Package maptracker implements a PinTracker component for IPFS Cluster. It
// uses a map to keep track of the state of tracked pins.
package maptracker

import (
	"context"
	"errors"
	"sync"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/pintracker/optracker"
	"github.com/ipfs/ipfs-cluster/pintracker/util"

	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	peer "github.com/libp2p/go-libp2p-core/peer"
	rpc "github.com/libp2p/go-libp2p-gorpc"

	"go.opencensus.io/trace"
)

var logger = logging.Logger("pintracker")

var (
	errUnpinned = errors.New("the item is unexpectedly not pinned on IPFS")
	// ErrFullQueue is the error used when the pin or unpin operation channel is full.
	ErrFullQueue = errors.New("pin/unpin operation queue is full (too many operations), increasing max_pin_queue_size would help")
)

// MapPinTracker is a PinTracker implementation which uses a Go map
// to store the status of the tracked Cids. This component is thread-safe.
type MapPinTracker struct {
	config *Config

	optracker *optracker.OperationTracker

	ctx    context.Context
	cancel func()

	rpcClient *rpc.Client
	rpcReady  chan struct{}

	peerID  peer.ID
	pinCh   chan *optracker.Operation
	unpinCh chan *optracker.Operation

	shutdownLock sync.Mutex
	shutdown     bool
	wg           sync.WaitGroup
}

// NewMapPinTracker returns a new object which has been correctly
// initialized with the given configuration.
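//
// A rough construction sketch (error handling and any other Config fields
// are omitted; pid and rpcClient are assumed to be provided by the caller):
//
//	cfg := &Config{ConcurrentPins: 1, MaxPinQueueSize: 50000}
//	mpt := NewMapPinTracker(cfg, pid, "peer-name")
//	mpt.SetClient(rpcClient)
//	defer mpt.Shutdown(context.Background())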
func NewMapPinTracker(cfg *Config, pid peer.ID, peerName string) *MapPinTracker {
	ctx, cancel := context.WithCancel(context.Background())

	mpt := &MapPinTracker{
		ctx:       ctx,
		cancel:    cancel,
		config:    cfg,
		optracker: optracker.NewOperationTracker(ctx, pid, peerName),
		rpcReady:  make(chan struct{}, 1),
		peerID:    pid,
		pinCh:     make(chan *optracker.Operation, cfg.MaxPinQueueSize),
		unpinCh:   make(chan *optracker.Operation, cfg.MaxPinQueueSize),
	}

	for i := 0; i < mpt.config.ConcurrentPins; i++ {
		go mpt.opWorker(ctx, mpt.pin, mpt.pinCh)
	}
	go mpt.opWorker(ctx, mpt.unpin, mpt.unpinCh)
	return mpt
}

// opWorker receives a pin function (pin or unpin) and a channel of operations.
// It is used for both pinning and unpinning.
func (mpt *MapPinTracker) opWorker(ctx context.Context, pinF func(*optracker.Operation) error, opChan chan *optracker.Operation) {
	for {
		select {
		case op := <-opChan:
			if op.Cancelled() {
				// The operation was cancelled. Move on.
				// This saves some time, but is not strictly needed.
				continue
			}
			op.SetPhase(optracker.PhaseInProgress)
			err := pinF(op) // call pin/unpin
			if err != nil {
				if op.Cancelled() {
					// There was an error because we
					// were cancelled. Move on.
					continue
				}
				op.SetError(err)
				op.Cancel()
				continue
			}
			op.SetPhase(optracker.PhaseDone)
			op.Cancel()

			// We keep all pinned things in the tracker,
			// and only clean unpinned things.
			if op.Type() == optracker.OperationUnpin {
				mpt.optracker.Clean(ctx, op)
			}
		case <-mpt.ctx.Done():
			return
		}
	}
}

// Shutdown finishes the services provided by the MapPinTracker and cancels
// any active context.
func (mpt *MapPinTracker) Shutdown(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "tracker/map/Shutdown")
	defer span.End()

	mpt.shutdownLock.Lock()
	defer mpt.shutdownLock.Unlock()

	if mpt.shutdown {
		logger.Debug("already shutdown")
		return nil
	}

	logger.Info("stopping MapPinTracker")
	mpt.cancel()
	close(mpt.rpcReady)
	mpt.wg.Wait()
	mpt.shutdown = true
	return nil
}
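
// pin sends a Pin request for the operation's pin to the local IPFSConnector
// component via RPC.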
func (mpt *MapPinTracker) pin(op *optracker.Operation) error {
	ctx, span := trace.StartSpan(op.Context(), "tracker/map/pin")
	defer span.End()

	logger.Debugf("issuing pin call for %s", op.Cid())
	err := mpt.rpcClient.CallContext(
		ctx,
		"",
		"IPFSConnector",
		"Pin",
		op.Pin(),
		&struct{}{},
	)
	if err != nil {
		return err
	}
	return nil
}
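
// unpin sends an Unpin request for the operation's pin to the local
// IPFSConnector component via RPC.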
func (mpt *MapPinTracker) unpin(op *optracker.Operation) error {
	ctx, span := trace.StartSpan(op.Context(), "tracker/map/unpin")
	defer span.End()

	logger.Debugf("issuing unpin call for %s", op.Cid())
	err := mpt.rpcClient.CallContext(
		ctx,
		"",
		"IPFSConnector",
		"Unpin",
		op.Pin(),
		&struct{}{},
	)
	if err != nil {
		return err
	}
	return nil
}

// enqueue puts a new operation on the queue, unless an ongoing one already exists.
func (mpt *MapPinTracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.OperationType, ch chan *optracker.Operation) error {
	ctx, span := trace.StartSpan(ctx, "tracker/map/enqueue")
	defer span.End()

	op := mpt.optracker.TrackNewOperation(ctx, c, typ, optracker.PhaseQueued)
	if op == nil {
		return nil // ongoing pin operation.
	}

	select {
	case ch <- op:
	default:
		err := ErrFullQueue
		op.SetError(err)
		op.Cancel()
		logger.Error(err.Error())
		return err
	}
	return nil
}

// Track tells the MapPinTracker to start managing a Cid,
// possibly triggering Pin operations on the IPFS daemon.
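//
// If the internal pin queue is full, Track returns ErrFullQueue and the pin
// is left in an error state. A rough sketch of a caller that stops submitting
// on a full queue (pins, tracker and ctx are assumed to exist):
//
//	for _, p := range pins {
//		if err := tracker.Track(ctx, p); err != nil {
//			return err // e.g. ErrFullQueue
//		}
//	}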
func (mpt *MapPinTracker) Track(ctx context.Context, c *api.Pin) error {
	ctx, span := trace.StartSpan(ctx, "tracker/map/Track")
	defer span.End()

	logger.Debugf("tracking %s", c.Cid)

	// Sharded pins are never pinned. A sharded pin cannot turn into
	// something else (or vice versa), as happens with Remote pins, so
	// we just track them.
	if c.Type == api.MetaType {
		mpt.optracker.TrackNewOperation(ctx, c, optracker.OperationShard, optracker.PhaseDone)
		return nil
	}

	// Trigger an unpin whenever something remote is tracked.
	// Note that IPFSConn checks with pin/ls before triggering
	// pin/rm, so this does not always trigger an unpin
	// on ipfs.
	if util.IsRemotePin(c, mpt.peerID) {
		op := mpt.optracker.TrackNewOperation(ctx, c, optracker.OperationRemote, optracker.PhaseInProgress)
		if op == nil {
			return nil // Ongoing operationRemote / PhaseInProgress
		}
		err := mpt.unpin(op) // unpin all the time, even if not pinned
		op.Cancel()
		if err != nil {
			op.SetError(err)
		} else {
			op.SetPhase(optracker.PhaseDone)
		}
		return nil
	}

	return mpt.enqueue(ctx, c, optracker.OperationPin, mpt.pinCh)
}

// Untrack tells the MapPinTracker to stop managing a Cid.
// If the Cid is pinned locally, it will be unpinned.
func (mpt *MapPinTracker) Untrack(ctx context.Context, c cid.Cid) error {
	ctx, span := trace.StartSpan(ctx, "tracker/map/Untrack")
	defer span.End()

	logger.Infof("untracking %s", c)
	return mpt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin, mpt.unpinCh)
}

// Status returns information for a Cid tracked by this
// MapPinTracker.
func (mpt *MapPinTracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
	ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/Status")
	defer span.End()

	return mpt.optracker.Get(ctx, c)
}

// StatusAll returns information for all Cids tracked by this
// MapPinTracker.
func (mpt *MapPinTracker) StatusAll(ctx context.Context) []*api.PinInfo {
	ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/StatusAll")
	defer span.End()

	return mpt.optracker.GetAll(ctx)
}

// Sync verifies that the status of a Cid matches that of
// the IPFS daemon. If not, it will be transitioned
// to PinError or UnpinError.
//
// Sync returns the updated local status for the given Cid.
// Pins in error states can be recovered with Recover().
// An error is returned if we are unable to contact
// the IPFS daemon.
func (mpt *MapPinTracker) Sync(ctx context.Context, c cid.Cid) (*api.PinInfo, error) {
	ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/Sync")
	defer span.End()

	var ips api.IPFSPinStatus
	err := mpt.rpcClient.Call(
		"",
		"IPFSConnector",
		"PinLsCid",
		c,
		&ips,
	)

	if err != nil {
		mpt.optracker.SetError(ctx, c, err)
		return mpt.optracker.Get(ctx, c), nil
	}

	return mpt.syncStatus(ctx, c, ips), nil
}

// SyncAll verifies that the statuses of all tracked Cids match the
// ones reported by the IPFS daemon. If not, they will be transitioned
// to PinError or UnpinError.
//
// SyncAll returns the list of local statuses for all tracked Cids which
// were updated or have errors. Cids in error states can be recovered
// with Recover().
// An error is returned if we are unable to contact the IPFS daemon.
func (mpt *MapPinTracker) SyncAll(ctx context.Context) ([]*api.PinInfo, error) {
	ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/SyncAll")
	defer span.End()

	var ipsMap map[string]api.IPFSPinStatus
	var results []*api.PinInfo
	err := mpt.rpcClient.Call(
		"",
		"IPFSConnector",
		"PinLs",
		"recursive",
		&ipsMap,
	)
	if err != nil {
		// Set pinning or unpinning ops to error, since we can't
		// verify them.
		pInfos := mpt.optracker.GetAll(ctx)
		for _, pInfo := range pInfos {
			op, _ := optracker.TrackerStatusToOperationPhase(pInfo.Status)
			if op == optracker.OperationPin || op == optracker.OperationUnpin {
				mpt.optracker.SetError(ctx, pInfo.Cid, err)
				results = append(results, mpt.optracker.Get(ctx, pInfo.Cid))
			} else {
				results = append(results, pInfo)
			}
		}
		return results, nil
	}

	status := mpt.StatusAll(ctx)
	for _, pInfoOrig := range status {
		var pInfoNew *api.PinInfo
		c := pInfoOrig.Cid
		ips, ok := ipsMap[c.String()]
		if !ok {
			pInfoNew = mpt.syncStatus(ctx, c, api.IPFSPinStatusUnpinned)
		} else {
			pInfoNew = mpt.syncStatus(ctx, c, ips)
		}

		if pInfoOrig.Status != pInfoNew.Status ||
			pInfoNew.Status == api.TrackerStatusUnpinError ||
			pInfoNew.Status == api.TrackerStatusPinError {
			results = append(results, pInfoNew)
		}
	}
	return results, nil
}
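
// syncStatus reconciles the tracked status of c with the pin status reported
// by IPFS and returns the updated PinInfo: a PinError item that is actually
// pinned is marked as done, an UnpinError item that is actually unpinned is
// cleaned up, and a Pinned item missing from IPFS is set to an error state.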
func (mpt *MapPinTracker) syncStatus(ctx context.Context, c cid.Cid, ips api.IPFSPinStatus) *api.PinInfo {
	status, ok := mpt.optracker.Status(ctx, c)
	if !ok {
		status = api.TrackerStatusUnpinned
	}

	// TODO(hector): for sharding, we may need to check that a shard
	// is pinned to the right depth. For now, we assume that if it's pinned
	// in some way, then it must be right (including direct).
	pinned := func(i api.IPFSPinStatus) bool {
		switch i {
		case api.IPFSPinStatusRecursive:
			return i.IsPinned(-1)
		case api.IPFSPinStatusDirect:
			return i.IsPinned(0)
		default:
			return i.IsPinned(1) // Pinned with depth 1 or more.
		}
	}

	if pinned(ips) {
		switch status {
		case api.TrackerStatusPinError:
			// If an item that we wanted to pin is pinned, we mark it as done.
			mpt.optracker.TrackNewOperation(
				ctx,
				api.PinCid(c),
				optracker.OperationPin,
				optracker.PhaseDone,
			)
		default:
			// 1. Unpinning phases
			// 2. Pinned in ipfs but we are not tracking
			// -> do nothing
		}
	} else {
		switch status {
		case api.TrackerStatusUnpinError:
			// clean
			op := mpt.optracker.TrackNewOperation(
				ctx,
				api.PinCid(c),
				optracker.OperationUnpin,
				optracker.PhaseDone,
			)
			if op != nil {
				mpt.optracker.Clean(ctx, op)
			}

		case api.TrackerStatusPinned:
			// Not pinned in IPFS but we think it should be: mark as error.
			mpt.optracker.SetError(ctx, c, errUnpinned)
		default:
			// 1. Pinning phases
			// -> do nothing
		}
	}
	return mpt.optracker.Get(ctx, c)
}

// Recover will re-queue a Cid in error state for the failed operation,
// possibly retriggering an IPFS pinning operation.
func (mpt *MapPinTracker) Recover(ctx context.Context, c cid.Cid) (*api.PinInfo, error) {
	ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/Recover")
	defer span.End()

	logger.Infof("Attempting to recover %s", c)
	pInfo := mpt.optracker.Get(ctx, c)
	var err error

	switch pInfo.Status {
	case api.TrackerStatusPinError:
		err = mpt.enqueue(ctx, api.PinCid(c), optracker.OperationPin, mpt.pinCh)
	case api.TrackerStatusUnpinError:
		err = mpt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin, mpt.unpinCh)
	}
	return mpt.optracker.Get(ctx, c), err
}

// RecoverAll attempts to recover all items tracked by this peer.
func (mpt *MapPinTracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) {
	ctx, span := trace.StartSpan(mpt.ctx, "tracker/map/RecoverAll")
	defer span.End()

	pInfos := mpt.optracker.GetAll(ctx)
	var results []*api.PinInfo
	for _, pInfo := range pInfos {
		res, err := mpt.Recover(ctx, pInfo.Cid)
		results = append(results, res)
		if err != nil {
			return results, err
		}
	}
	return results, nil
}

// SetClient makes the MapPinTracker ready to perform RPC requests to
// other components.
func (mpt *MapPinTracker) SetClient(c *rpc.Client) {
	mpt.rpcClient = c
	mpt.rpcReady <- struct{}{}
}

// OpContext exports the internal optracker's OpContext method.
// For testing purposes only.
func (mpt *MapPinTracker) OpContext(ctx context.Context, c cid.Cid) context.Context {
	return mpt.optracker.OpContext(ctx, c)
}