f4a445e019
The operation tracker was not setting some fields correctly when producing PinInfo objects. Additionally, recover operations were submitted with empty pin objects, which resulted in the status for pins sent on recover operations to be missing fields.
692 lines
17 KiB
Go
692 lines
17 KiB
Go
// Package stateless implements a PinTracker component for IPFS Cluster, which
|
|
// aims to reduce the memory footprint when handling really large cluster
|
|
// states.
|
|
package stateless
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ipfs-cluster/ipfs-cluster/api"
|
|
"github.com/ipfs-cluster/ipfs-cluster/pintracker/optracker"
|
|
"github.com/ipfs-cluster/ipfs-cluster/state"
|
|
|
|
logging "github.com/ipfs/go-log/v2"
|
|
peer "github.com/libp2p/go-libp2p-core/peer"
|
|
rpc "github.com/libp2p/go-libp2p-gorpc"
|
|
|
|
"go.opencensus.io/trace"
|
|
)
|
|
|
|
var logger = logging.Logger("pintracker")
|
|
|
|
const pinsChannelSize = 1024
|
|
|
|
var (
|
|
// ErrFullQueue is the error used when pin or unpin operation channel is full.
|
|
ErrFullQueue = errors.New("pin/unpin operation queue is full. Try increasing max_pin_queue_size")
|
|
|
|
// items with this error should be recovered
|
|
errUnexpectedlyUnpinned = errors.New("the item should be pinned but it is not")
|
|
)
|
|
|
|
// Tracker uses the optracker.OperationTracker to manage
|
|
// transitioning shared ipfs-cluster state (Pins) to the local IPFS node.
|
|
type Tracker struct {
|
|
config *Config
|
|
|
|
optracker *optracker.OperationTracker
|
|
|
|
peerID peer.ID
|
|
peerName string
|
|
|
|
ctx context.Context
|
|
cancel func()
|
|
|
|
getState func(ctx context.Context) (state.ReadOnly, error)
|
|
|
|
rpcClient *rpc.Client
|
|
rpcReady chan struct{}
|
|
|
|
priorityPinCh chan *optracker.Operation
|
|
pinCh chan *optracker.Operation
|
|
unpinCh chan *optracker.Operation
|
|
|
|
shutdownMu sync.Mutex
|
|
shutdown bool
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// New creates a new StatelessPinTracker.
|
|
func New(cfg *Config, pid peer.ID, peerName string, getState func(ctx context.Context) (state.ReadOnly, error)) *Tracker {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
spt := &Tracker{
|
|
config: cfg,
|
|
peerID: pid,
|
|
peerName: peerName,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
getState: getState,
|
|
optracker: optracker.NewOperationTracker(ctx, pid, peerName),
|
|
rpcReady: make(chan struct{}, 1),
|
|
priorityPinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize),
|
|
pinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize),
|
|
unpinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize),
|
|
}
|
|
|
|
for i := 0; i < spt.config.ConcurrentPins; i++ {
|
|
go spt.opWorker(spt.pin, spt.priorityPinCh, spt.pinCh)
|
|
}
|
|
go spt.opWorker(spt.unpin, spt.unpinCh, nil)
|
|
|
|
return spt
|
|
}
|
|
|
|
// we can get our IPFS id from our own monitor ping metrics which
|
|
// are refreshed regularly.
|
|
func (spt *Tracker) getIPFSID(ctx context.Context) api.IPFSID {
|
|
// Wait until RPC is ready
|
|
<-spt.rpcReady
|
|
|
|
var ipfsid api.IPFSID
|
|
err := spt.rpcClient.CallContext(
|
|
ctx,
|
|
"",
|
|
"Cluster",
|
|
"IPFSID",
|
|
peer.ID(""), // local peer
|
|
&ipfsid,
|
|
)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
}
|
|
return ipfsid
|
|
}
|
|
|
|
// receives a pin Function (pin or unpin) and channels. Used for both pinning
|
|
// and unpinning.
|
|
func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, prioCh, normalCh chan *optracker.Operation) {
|
|
|
|
var op *optracker.Operation
|
|
|
|
for {
|
|
// Process the priority channel first.
|
|
select {
|
|
case op = <-prioCh:
|
|
goto APPLY_OP
|
|
case <-spt.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Then process things on the other channels.
|
|
// Block if there are no things to process.
|
|
select {
|
|
case op = <-prioCh:
|
|
goto APPLY_OP
|
|
case op = <-normalCh:
|
|
goto APPLY_OP
|
|
case <-spt.ctx.Done():
|
|
return
|
|
}
|
|
|
|
// apply operations that came from some channel
|
|
APPLY_OP:
|
|
if clean := applyPinF(pinF, op); clean {
|
|
spt.optracker.Clean(op.Context(), op)
|
|
}
|
|
}
|
|
}
|
|
|
|
// applyPinF returns true if the operation can be considered "DONE".
|
|
func applyPinF(pinF func(*optracker.Operation) error, op *optracker.Operation) bool {
|
|
if op.Canceled() {
|
|
// operation was canceled. Move on.
|
|
// This saves some time, but not 100% needed.
|
|
return false
|
|
}
|
|
op.SetPhase(optracker.PhaseInProgress)
|
|
op.IncAttempt()
|
|
err := pinF(op) // call pin/unpin
|
|
if err != nil {
|
|
if op.Canceled() {
|
|
// there was an error because
|
|
// we were canceled. Move on.
|
|
return false
|
|
}
|
|
op.SetError(err)
|
|
op.Cancel()
|
|
return false
|
|
}
|
|
op.SetPhase(optracker.PhaseDone)
|
|
op.Cancel()
|
|
return true // this tells the opWorker to clean the operation from the tracker.
|
|
}
|
|
|
|
func (spt *Tracker) pin(op *optracker.Operation) error {
|
|
ctx, span := trace.StartSpan(op.Context(), "tracker/stateless/pin")
|
|
defer span.End()
|
|
|
|
logger.Debugf("issuing pin call for %s", op.Cid())
|
|
err := spt.rpcClient.CallContext(
|
|
ctx,
|
|
"",
|
|
"IPFSConnector",
|
|
"Pin",
|
|
op.Pin(),
|
|
&struct{}{},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (spt *Tracker) unpin(op *optracker.Operation) error {
|
|
ctx, span := trace.StartSpan(op.Context(), "tracker/stateless/unpin")
|
|
defer span.End()
|
|
|
|
logger.Debugf("issuing unpin call for %s", op.Cid())
|
|
err := spt.rpcClient.CallContext(
|
|
ctx,
|
|
"",
|
|
"IPFSConnector",
|
|
"Unpin",
|
|
op.Pin(),
|
|
&struct{}{},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Enqueue puts a new operation on the queue, unless ongoing exists.
|
|
func (spt *Tracker) enqueue(ctx context.Context, c api.Pin, typ optracker.OperationType) error {
|
|
ctx, span := trace.StartSpan(ctx, "tracker/stateless/enqueue")
|
|
defer span.End()
|
|
|
|
logger.Debugf("entering enqueue: pin: %+v", c)
|
|
op := spt.optracker.TrackNewOperation(ctx, c, typ, optracker.PhaseQueued)
|
|
if op == nil {
|
|
return nil // the operation exists and must be queued already.
|
|
}
|
|
|
|
var ch chan *optracker.Operation
|
|
|
|
switch typ {
|
|
case optracker.OperationPin:
|
|
isPriorityPin := time.Now().Before(c.Timestamp.Add(spt.config.PriorityPinMaxAge)) &&
|
|
op.AttemptCount() <= spt.config.PriorityPinMaxRetries
|
|
op.SetPriorityPin(isPriorityPin)
|
|
|
|
if isPriorityPin {
|
|
ch = spt.priorityPinCh
|
|
} else {
|
|
ch = spt.pinCh
|
|
}
|
|
case optracker.OperationUnpin:
|
|
ch = spt.unpinCh
|
|
}
|
|
|
|
select {
|
|
case ch <- op:
|
|
default:
|
|
err := ErrFullQueue
|
|
op.SetError(err)
|
|
op.Cancel()
|
|
logger.Error(err.Error())
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetClient makes the StatelessPinTracker ready to perform RPC requests to
|
|
// other components.
|
|
func (spt *Tracker) SetClient(c *rpc.Client) {
|
|
spt.rpcClient = c
|
|
close(spt.rpcReady)
|
|
}
|
|
|
|
// Shutdown finishes the services provided by the StatelessPinTracker
|
|
// and cancels any active context.
|
|
func (spt *Tracker) Shutdown(ctx context.Context) error {
|
|
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Shutdown")
|
|
_ = ctx
|
|
defer span.End()
|
|
|
|
spt.shutdownMu.Lock()
|
|
defer spt.shutdownMu.Unlock()
|
|
|
|
if spt.shutdown {
|
|
logger.Debug("already shutdown")
|
|
return nil
|
|
}
|
|
|
|
logger.Info("stopping StatelessPinTracker")
|
|
spt.cancel()
|
|
spt.wg.Wait()
|
|
spt.shutdown = true
|
|
return nil
|
|
}
|
|
|
|
// Track tells the StatelessPinTracker to start managing a Cid,
|
|
// possibly triggering Pin operations on the IPFS daemon.
|
|
func (spt *Tracker) Track(ctx context.Context, c api.Pin) error {
|
|
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Track")
|
|
defer span.End()
|
|
|
|
logger.Debugf("tracking %s", c.Cid)
|
|
|
|
// Sharded pins are never pinned. A sharded pin cannot turn into
|
|
// something else or viceversa like it happens with Remote pins so
|
|
// we just ignore them.
|
|
if c.Type == api.MetaType {
|
|
return nil
|
|
}
|
|
|
|
// Trigger unpin whenever something remote is tracked
|
|
// Note, IPFSConn checks with pin/ls before triggering
|
|
// pin/rm.
|
|
if c.IsRemotePin(spt.peerID) {
|
|
op := spt.optracker.TrackNewOperation(ctx, c, optracker.OperationRemote, optracker.PhaseInProgress)
|
|
if op == nil {
|
|
return nil // ongoing unpin
|
|
}
|
|
err := spt.unpin(op)
|
|
op.Cancel()
|
|
if err != nil {
|
|
op.SetError(err)
|
|
return nil
|
|
}
|
|
|
|
op.SetPhase(optracker.PhaseDone)
|
|
spt.optracker.Clean(ctx, op)
|
|
return nil
|
|
}
|
|
|
|
return spt.enqueue(ctx, c, optracker.OperationPin)
|
|
}
|
|
|
|
// Untrack tells the StatelessPinTracker to stop managing a Cid.
|
|
// If the Cid is pinned locally, it will be unpinned.
|
|
func (spt *Tracker) Untrack(ctx context.Context, c api.Cid) error {
|
|
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Untrack")
|
|
defer span.End()
|
|
|
|
logger.Debugf("untracking %s", c)
|
|
return spt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin)
|
|
}
|
|
|
|
// StatusAll returns information for all Cids pinned to the local IPFS node.
|
|
func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus, out chan<- api.PinInfo) error {
|
|
ctx, span := trace.StartSpan(ctx, "tracker/stateless/StatusAll")
|
|
defer span.End()
|
|
|
|
ipfsid := spt.getIPFSID(ctx)
|
|
|
|
// Any other states are just operation-tracker states, so we just give
|
|
// those and return.
|
|
if !filter.Match(
|
|
api.TrackerStatusPinned | api.TrackerStatusUnexpectedlyUnpinned |
|
|
api.TrackerStatusSharded | api.TrackerStatusRemote) {
|
|
return spt.optracker.GetAllChannel(ctx, filter, ipfsid, out)
|
|
}
|
|
|
|
defer close(out)
|
|
|
|
// get global state - cluster pinset
|
|
st, err := spt.getState(ctx)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return err
|
|
}
|
|
|
|
var ipfsRecursivePins map[api.Cid]api.IPFSPinStatus
|
|
// Only query IPFS if we want to status for pinned items
|
|
if filter.Match(api.TrackerStatusPinned | api.TrackerStatusUnexpectedlyUnpinned) {
|
|
ipfsRecursivePins = make(map[api.Cid]api.IPFSPinStatus)
|
|
// At some point we need a full map of what we have and what
|
|
// we don't. The IPFS pinset is the smallest thing we can keep
|
|
// on memory.
|
|
ipfsPinsCh, err := spt.ipfsPins(ctx)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return err
|
|
}
|
|
for ipfsPinInfo := range ipfsPinsCh {
|
|
ipfsRecursivePins[ipfsPinInfo.Cid] = ipfsPinInfo.Type
|
|
}
|
|
}
|
|
|
|
// Prepare pinset streaming
|
|
statePins := make(chan api.Pin, pinsChannelSize)
|
|
go func() {
|
|
err = st.List(ctx, statePins)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
}
|
|
}()
|
|
|
|
// a shorthand for this select.
|
|
trySend := func(info api.PinInfo) bool {
|
|
select {
|
|
case <-ctx.Done():
|
|
return false
|
|
case out <- info:
|
|
return true
|
|
}
|
|
}
|
|
|
|
// For every item in the state.
|
|
for p := range statePins {
|
|
select {
|
|
case <-ctx.Done():
|
|
default:
|
|
}
|
|
|
|
// if there is an operation, issue that and move on
|
|
info, ok := spt.optracker.GetExists(ctx, p.Cid, ipfsid)
|
|
if ok && filter.Match(info.Status) {
|
|
if !trySend(info) {
|
|
return fmt.Errorf("error issuing PinInfo: %w", ctx.Err())
|
|
}
|
|
continue // next pin
|
|
}
|
|
|
|
// Preliminary PinInfo for this Pin.
|
|
info = api.PinInfo{
|
|
Cid: p.Cid,
|
|
Name: p.Name,
|
|
Peer: spt.peerID,
|
|
Allocations: p.Allocations,
|
|
Origins: p.Origins,
|
|
Created: p.Timestamp,
|
|
Metadata: p.Metadata,
|
|
|
|
PinInfoShort: api.PinInfoShort{
|
|
PeerName: spt.peerName,
|
|
IPFS: ipfsid.ID,
|
|
IPFSAddresses: ipfsid.Addresses,
|
|
Status: api.TrackerStatusUndefined, // TBD
|
|
TS: p.Timestamp,
|
|
Error: "",
|
|
AttemptCount: 0,
|
|
PriorityPin: false,
|
|
},
|
|
}
|
|
|
|
ipfsStatus, pinnedInIpfs := ipfsRecursivePins[api.Cid(p.Cid)]
|
|
|
|
switch {
|
|
case p.Type == api.MetaType:
|
|
info.Status = api.TrackerStatusSharded
|
|
case p.IsRemotePin(spt.peerID):
|
|
info.Status = api.TrackerStatusRemote
|
|
case pinnedInIpfs:
|
|
// No need to filter. pinnedInIpfs is false
|
|
// unless the filter is Pinned |
|
|
// UnexpectedlyUnpinned. We filter at the end.
|
|
info.Status = ipfsStatus.ToTrackerStatus()
|
|
default:
|
|
// Not on an operation
|
|
// Not a meta pin
|
|
// Not a remote pin
|
|
// Not a pin on ipfs
|
|
|
|
// We understand that this is something that
|
|
// should be pinned on IPFS and it is not.
|
|
info.Status = api.TrackerStatusUnexpectedlyUnpinned
|
|
info.Error = errUnexpectedlyUnpinned.Error()
|
|
}
|
|
if !filter.Match(info.Status) {
|
|
continue
|
|
}
|
|
|
|
if !trySend(info) {
|
|
return fmt.Errorf("error issuing PinInfo: %w", ctx.Err())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Status returns information for a Cid pinned to the local IPFS node.
|
|
func (spt *Tracker) Status(ctx context.Context, c api.Cid) api.PinInfo {
|
|
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Status")
|
|
defer span.End()
|
|
|
|
ipfsid := spt.getIPFSID(ctx)
|
|
|
|
// check if c has an inflight operation or errorred operation in optracker
|
|
if oppi, ok := spt.optracker.GetExists(ctx, c, ipfsid); ok {
|
|
return oppi
|
|
}
|
|
|
|
pinInfo := api.PinInfo{
|
|
Cid: c,
|
|
Peer: spt.peerID,
|
|
Name: "", // etc to be filled later
|
|
PinInfoShort: api.PinInfoShort{
|
|
PeerName: spt.peerName,
|
|
IPFS: ipfsid.ID,
|
|
IPFSAddresses: ipfsid.Addresses,
|
|
TS: time.Now(),
|
|
AttemptCount: 0,
|
|
PriorityPin: false,
|
|
},
|
|
}
|
|
|
|
// check global state to see if cluster should even be caring about
|
|
// the provided cid
|
|
var gpin api.Pin
|
|
st, err := spt.getState(ctx)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
addError(&pinInfo, err)
|
|
return pinInfo
|
|
}
|
|
|
|
gpin, err = st.Get(ctx, c)
|
|
if err == state.ErrNotFound {
|
|
pinInfo.Status = api.TrackerStatusUnpinned
|
|
return pinInfo
|
|
}
|
|
if err != nil {
|
|
logger.Error(err)
|
|
addError(&pinInfo, err)
|
|
return pinInfo
|
|
}
|
|
// The pin IS in the state.
|
|
pinInfo.Name = gpin.Name
|
|
pinInfo.TS = gpin.Timestamp
|
|
pinInfo.Allocations = gpin.Allocations
|
|
pinInfo.Origins = gpin.Origins
|
|
pinInfo.Created = gpin.Timestamp
|
|
pinInfo.Metadata = gpin.Metadata
|
|
|
|
// check if pin is a meta pin
|
|
if gpin.Type == api.MetaType {
|
|
pinInfo.Status = api.TrackerStatusSharded
|
|
return pinInfo
|
|
}
|
|
|
|
// check if pin is a remote pin
|
|
if gpin.IsRemotePin(spt.peerID) {
|
|
pinInfo.Status = api.TrackerStatusRemote
|
|
return pinInfo
|
|
}
|
|
|
|
// else attempt to get status from ipfs node
|
|
var ips api.IPFSPinStatus
|
|
err = spt.rpcClient.CallContext(
|
|
ctx,
|
|
"",
|
|
"IPFSConnector",
|
|
"PinLsCid",
|
|
gpin,
|
|
&ips,
|
|
)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
addError(&pinInfo, err)
|
|
return pinInfo
|
|
}
|
|
|
|
ipfsStatus := ips.ToTrackerStatus()
|
|
switch ipfsStatus {
|
|
case api.TrackerStatusUnpinned:
|
|
// The item is in the state but not in IPFS:
|
|
// PinError. Should be pinned.
|
|
pinInfo.Status = api.TrackerStatusUnexpectedlyUnpinned
|
|
pinInfo.Error = errUnexpectedlyUnpinned.Error()
|
|
default:
|
|
pinInfo.Status = ipfsStatus
|
|
}
|
|
return pinInfo
|
|
}
|
|
|
|
// RecoverAll attempts to recover all items tracked by this peer. It returns
|
|
// any errors or when it is done re-tracking.
|
|
func (spt *Tracker) RecoverAll(ctx context.Context, out chan<- api.PinInfo) error {
|
|
defer close(out)
|
|
|
|
ctx, span := trace.StartSpan(ctx, "tracker/stateless/RecoverAll")
|
|
defer span.End()
|
|
|
|
statusesCh := make(chan api.PinInfo, 1024)
|
|
go func() {
|
|
err := spt.StatusAll(ctx, api.TrackerStatusUndefined, statusesCh)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
}
|
|
}()
|
|
|
|
for st := range statusesCh {
|
|
// Break out if we shutdown. We might be going through
|
|
// a very long list of statuses.
|
|
select {
|
|
case <-spt.ctx.Done():
|
|
err := fmt.Errorf("RecoverAll aborted: %w", ctx.Err())
|
|
logger.Error(err)
|
|
return err
|
|
default:
|
|
p, err := spt.recoverWithPinInfo(ctx, st)
|
|
if err != nil {
|
|
err = fmt.Errorf("RecoverAll error: %w", err)
|
|
logger.Error(err)
|
|
return err
|
|
}
|
|
if p.Defined() {
|
|
select {
|
|
case <-ctx.Done():
|
|
err = fmt.Errorf("RecoverAll aborted: %w", ctx.Err())
|
|
logger.Error(err)
|
|
return err
|
|
case out <- p:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Recover will trigger pinning or unpinning for items in
|
|
// PinError or UnpinError states.
|
|
func (spt *Tracker) Recover(ctx context.Context, c api.Cid) (api.PinInfo, error) {
|
|
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Recover")
|
|
defer span.End()
|
|
|
|
pi := spt.Status(ctx, c)
|
|
|
|
recPi, err := spt.recoverWithPinInfo(ctx, pi)
|
|
// if it was not enqueued, no updated pin-info is returned.
|
|
// Use the one we had.
|
|
if !recPi.Defined() {
|
|
recPi = pi
|
|
}
|
|
return recPi, err
|
|
}
|
|
|
|
func (spt *Tracker) recoverWithPinInfo(ctx context.Context, pi api.PinInfo) (api.PinInfo, error) {
|
|
st, err := spt.getState(ctx)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return api.PinInfo{}, err
|
|
}
|
|
|
|
var pin api.Pin
|
|
|
|
switch pi.Status {
|
|
case api.TrackerStatusPinError, api.TrackerStatusUnexpectedlyUnpinned:
|
|
pin, err = st.Get(ctx, pi.Cid)
|
|
if err != nil { // ignore error - in case pin was removed while recovering
|
|
logger.Warn(err)
|
|
return spt.Status(ctx, pi.Cid), nil
|
|
}
|
|
logger.Infof("Restarting pin operation for %s", pi.Cid)
|
|
err = spt.enqueue(ctx, pin, optracker.OperationPin)
|
|
case api.TrackerStatusUnpinError:
|
|
logger.Infof("Restarting unpin operation for %s", pi.Cid)
|
|
err = spt.enqueue(ctx, api.PinCid(pi.Cid), optracker.OperationUnpin)
|
|
default:
|
|
// We do not return any information when recover was a no-op
|
|
return api.PinInfo{}, nil
|
|
}
|
|
if err != nil {
|
|
return spt.Status(ctx, pi.Cid), err
|
|
}
|
|
|
|
// This status call should be cheap as it would normally come from the
|
|
// optracker and does not need to hit ipfs.
|
|
return spt.Status(ctx, pi.Cid), nil
|
|
}
|
|
|
|
func (spt *Tracker) ipfsPins(ctx context.Context) (<-chan api.IPFSPinInfo, error) {
|
|
ctx, span := trace.StartSpan(ctx, "tracker/stateless/ipfsStatusAll")
|
|
defer span.End()
|
|
|
|
in := make(chan []string, 1) // type filter.
|
|
in <- []string{"recursive", "direct"}
|
|
close(in)
|
|
out := make(chan api.IPFSPinInfo, pinsChannelSize)
|
|
|
|
go func() {
|
|
err := spt.rpcClient.Stream(
|
|
ctx,
|
|
"",
|
|
"IPFSConnector",
|
|
"PinLs",
|
|
in,
|
|
out,
|
|
)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
}
|
|
}()
|
|
return out, nil
|
|
}
|
|
|
|
// PinQueueSize returns the current size of the pinning queue.
|
|
func (spt *Tracker) PinQueueSize(ctx context.Context) (int64, error) {
|
|
return spt.optracker.PinQueueSize(), nil
|
|
}
|
|
|
|
// func (spt *Tracker) getErrorsAll(ctx context.Context) []api.PinInfo {
|
|
// return spt.optracker.Filter(ctx, optracker.PhaseError)
|
|
// }
|
|
|
|
// OpContext exports the internal optracker's OpContext method.
|
|
// For testing purposes only.
|
|
func (spt *Tracker) OpContext(ctx context.Context, c api.Cid) context.Context {
|
|
return spt.optracker.OpContext(ctx, c)
|
|
}
|
|
|
|
func addError(pinInfo *api.PinInfo, err error) {
|
|
pinInfo.Error = err.Error()
|
|
pinInfo.Status = api.TrackerStatusClusterError
|
|
}
|