// Package stateless implements a PinTracker component for IPFS Cluster, which
// aims to reduce the memory footprint when handling really large cluster
// states.
package stateless

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/pintracker/optracker"

	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	peer "github.com/libp2p/go-libp2p-core/peer"
	rpc "github.com/libp2p/go-libp2p-gorpc"

	"go.opencensus.io/trace"
)

var logger = logging.Logger("pintracker")

// Tracker uses the optracker.OperationTracker to manage
// transitioning shared ipfs-cluster state (Pins) to the local IPFS node.
type Tracker struct {
	config *Config

	optracker *optracker.OperationTracker

	peerID peer.ID

	ctx    context.Context
	cancel func()

	rpcClient *rpc.Client
	rpcReady  chan struct{}

	pinCh   chan *optracker.Operation
	unpinCh chan *optracker.Operation

	shutdownMu sync.Mutex
	shutdown   bool
	wg         sync.WaitGroup
}

// New creates a new StatelessPinTracker.
func New(cfg *Config, pid peer.ID, peerName string) *Tracker {
	ctx, cancel := context.WithCancel(context.Background())

	spt := &Tracker{
		config:    cfg,
		peerID:    pid,
		ctx:       ctx,
		cancel:    cancel,
		optracker: optracker.NewOperationTracker(ctx, pid, peerName),
		rpcReady:  make(chan struct{}, 1),
		pinCh:     make(chan *optracker.Operation, cfg.MaxPinQueueSize),
		unpinCh:   make(chan *optracker.Operation, cfg.MaxPinQueueSize),
	}
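
	// Launch cfg.ConcurrentPins workers for pin operations, plus a
	// single worker for unpin operations.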
	for i := 0; i < spt.config.ConcurrentPins; i++ {
		go spt.opWorker(spt.pin, spt.pinCh)
	}
	go spt.opWorker(spt.unpin, spt.unpinCh)
	return spt
}

// opWorker receives a pin function (pin or unpin) and a channel of operations
// to apply it to. It is used for both pinning and unpinning.
func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, opChan chan *optracker.Operation) {
	logger.Debug("entering opworker")
	ticker := time.NewTicker(10 * time.Second) // TODO(ajl): make config var
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// every tick, clear out all Done operations
			spt.optracker.CleanAllDone(spt.ctx)
		case op := <-opChan:
			if cont := applyPinF(pinF, op); cont {
				continue
			}

			spt.optracker.Clean(op.Context(), op)
		case <-spt.ctx.Done():
			return
		}
	}
}

// applyPinF returns true if the caller should call `continue` inside the
// calling loop.
func applyPinF(pinF func(*optracker.Operation) error, op *optracker.Operation) bool {
	if op.Cancelled() {
		// operation was cancelled. Move on.
		// This saves some time, but is not 100% needed.
		return true
	}
	op.SetPhase(optracker.PhaseInProgress)
	err := pinF(op) // call pin/unpin
	if err != nil {
		if op.Cancelled() {
			// there was an error because
			// we were cancelled. Move on.
			return true
		}
		op.SetError(err)
		op.Cancel()
		return true
	}
	op.SetPhase(optracker.PhaseDone)
	op.Cancel()
	return false
}

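// pin performs a pin request against the IPFS daemon through the
// IPFSConnector component.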
func (spt *Tracker) pin(op *optracker.Operation) error {
	ctx, span := trace.StartSpan(op.Context(), "tracker/stateless/pin")
	defer span.End()

	logger.Debugf("issuing pin call for %s", op.Cid())
	err := spt.rpcClient.CallContext(
		ctx,
		"",
		"IPFSConnector",
		"Pin",
		op.Pin(),
		&struct{}{},
	)
	if err != nil {
		return err
	}
	return nil
}

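// unpin performs an unpin request against the IPFS daemon through the
// IPFSConnector component.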
func (spt *Tracker) unpin(op *optracker.Operation) error {
	ctx, span := trace.StartSpan(op.Context(), "tracker/stateless/unpin")
	defer span.End()

	logger.Debugf("issuing unpin call for %s", op.Cid())
	err := spt.rpcClient.CallContext(
		ctx,
		"",
		"IPFSConnector",
		"Unpin",
		op.Pin(),
		&struct{}{},
	)
	if err != nil {
		return err
	}
	return nil
}

// enqueue puts a new operation on the queue, unless an ongoing operation
// for the same Cid already exists.
func (spt *Tracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.OperationType) error {
	ctx, span := trace.StartSpan(ctx, "tracker/stateless/enqueue")
	defer span.End()

	logger.Debugf("entering enqueue: pin: %+v", c)
	op := spt.optracker.TrackNewOperation(ctx, c, typ, optracker.PhaseQueued)
	if op == nil {
		return nil // ongoing pin operation.
	}

	var ch chan *optracker.Operation

	switch typ {
	case optracker.OperationPin:
		ch = spt.pinCh
	case optracker.OperationUnpin:
		ch = spt.unpinCh
	default:
		return errors.New("operation doesn't have an associated channel")
	}
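
	// Attempt a non-blocking send: if the queue is full, fail the
	// operation instead of blocking the caller.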
	select {
	case ch <- op:
	default:
		err := errors.New("queue is full")
		op.SetError(err)
		op.Cancel()
		logger.Error(err.Error())
		return err
	}
	return nil
}

// SetClient makes the StatelessPinTracker ready to perform RPC requests to
// other components.
func (spt *Tracker) SetClient(c *rpc.Client) {
	spt.rpcClient = c
	spt.rpcReady <- struct{}{}
}

// Shutdown finishes the services provided by the StatelessPinTracker
// and cancels any active context.
func (spt *Tracker) Shutdown(ctx context.Context) error {
	_, span := trace.StartSpan(ctx, "tracker/stateless/Shutdown")
	defer span.End()

	spt.shutdownMu.Lock()
	defer spt.shutdownMu.Unlock()

	if spt.shutdown {
		logger.Debug("already shutdown")
		return nil
	}

	logger.Info("stopping StatelessPinTracker")
	spt.cancel()
	close(spt.rpcReady)
	spt.wg.Wait()
	spt.shutdown = true
	return nil
}

// Track tells the StatelessPinTracker to start managing a Cid,
// possibly triggering Pin operations on the IPFS daemon.
func (spt *Tracker) Track(ctx context.Context, c *api.Pin) error {
	ctx, span := trace.StartSpan(ctx, "tracker/stateless/Track")
	defer span.End()

	logger.Debugf("tracking %s", c.Cid)

	// Sharded pins are never pinned. A sharded pin cannot turn into
	// something else or vice versa, as happens with Remote pins, so
	// we just track them.
	if c.Type == api.MetaType {
		spt.optracker.TrackNewOperation(ctx, c, optracker.OperationShard, optracker.PhaseDone)
		return nil
	}

	// Trigger unpin whenever something remote is tracked.
	// Note: IPFSConn checks with pin/ls before triggering
	// pin/rm.
	if c.IsRemotePin(spt.peerID) {
		op := spt.optracker.TrackNewOperation(ctx, c, optracker.OperationRemote, optracker.PhaseInProgress)
		if op == nil {
			return nil // ongoing unpin
		}
		err := spt.unpin(op)
		op.Cancel()
		if err != nil {
			op.SetError(err)
		} else {
			op.SetPhase(optracker.PhaseDone)
		}
		return nil
	}
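
	// Otherwise, queue a regular pin operation for this peer.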
	return spt.enqueue(ctx, c, optracker.OperationPin)
}

// Untrack tells the StatelessPinTracker to stop managing a Cid.
// If the Cid is pinned locally, it will be unpinned.
func (spt *Tracker) Untrack(ctx context.Context, c cid.Cid) error {
	ctx, span := trace.StartSpan(ctx, "tracker/stateless/Untrack")
	defer span.End()

	logger.Debugf("untracking %s", c)
	return spt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin)
}

// StatusAll returns information for all Cids pinned to the local IPFS node.
func (spt *Tracker) StatusAll(ctx context.Context) []*api.PinInfo {
	ctx, span := trace.StartSpan(ctx, "tracker/stateless/StatusAll")
	defer span.End()

	pininfos, err := spt.localStatus(ctx, true)
	if err != nil {
		logger.Error(err)
		return nil
	}

	// Get all in-flight operations from the optracker and put them into
	// the map, replacing any already 'pinned' items with their in-flight
	// operation.
	for _, infop := range spt.optracker.GetAll(ctx) {
		pininfos[infop.Cid.String()] = infop
	}

	var pis []*api.PinInfo
	for _, pi := range pininfos {
		pis = append(pis, pi)
	}
	return pis
}

// Status returns information for a Cid pinned to the local IPFS node.
func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
	ctx, span := trace.StartSpan(ctx, "tracker/stateless/Status")
	defer span.End()

	// Check if c has an in-flight or errored operation in the optracker.
	if oppi, ok := spt.optracker.GetExists(ctx, c); ok {
		// If it does, return the status of the operation.
		return oppi
	}

	// Check the global state to see if the cluster should even care
	// about the provided cid.
	var gpin api.Pin
	err := spt.rpcClient.Call(
		"",
		"Cluster",
		"PinGet",
		c,
		&gpin,
	)
	if err != nil {
		if rpc.IsRPCError(err) {
			logger.Error(err)
			return &api.PinInfo{
				Cid:    c,
				Peer:   spt.peerID,
				Status: api.TrackerStatusClusterError,
				Error:  err.Error(),
				TS:     time.Now(),
			}
		}
		// Not part of the global state: we should not care about it.
		return &api.PinInfo{
			Cid:    c,
			Peer:   spt.peerID,
			Status: api.TrackerStatusUnpinned,
			TS:     time.Now(),
		}
	}

	// check if pin is a meta pin
	if gpin.Type == api.MetaType {
		return &api.PinInfo{
			Cid:    c,
			Peer:   spt.peerID,
			Status: api.TrackerStatusSharded,
			TS:     time.Now(),
		}
	}

	// check if pin is a remote pin
	if gpin.IsRemotePin(spt.peerID) {
		return &api.PinInfo{
			Cid:    c,
			Peer:   spt.peerID,
			Status: api.TrackerStatusRemote,
			TS:     time.Now(),
		}
	}

	// otherwise, attempt to get the status from the ipfs node
	var ips api.IPFSPinStatus
	err = spt.rpcClient.Call(
		"",
		"IPFSConnector",
		"PinLsCid",
		c,
		&ips,
	)
	if err != nil {
		logger.Error(err)
		return nil
	}

	return &api.PinInfo{
		Cid:    c,
		Peer:   spt.peerID,
		Status: ips.ToTrackerStatus(),
		TS:     time.Now(),
	}
}

// SyncAll verifies that the statuses of all tracked Cids (from the shared
// state) match the ones reported by the IPFS daemon. If not, they will be
// transitioned to PinError or UnpinError.
//
// SyncAll returns the list of local statuses for all tracked Cids which
// were updated or have errors. Cids in error states can be recovered
// with Recover().
// An error is returned if we are unable to contact the IPFS daemon.
func (spt *Tracker) SyncAll(ctx context.Context) ([]*api.PinInfo, error) {
	ctx, span := trace.StartSpan(ctx, "tracker/stateless/SyncAll")
	defer span.End()

	// get ipfs status for all
	localpis, err := spt.localStatus(ctx, false)
	if err != nil {
		logger.Error(err)
		return nil, err
	}
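
	// Pin operations that errored can have their error cleared when the
	// item now shows up as pinned on IPFS.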
	for _, p := range spt.optracker.Filter(ctx, optracker.OperationPin, optracker.PhaseError) {
		if _, ok := localpis[p.Cid.String()]; ok {
			spt.optracker.CleanError(ctx, p.Cid)
		}
	}
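
	// Conversely, unpin operations that errored can have their error
	// cleared when the item no longer shows up as pinned on IPFS.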
	for _, p := range spt.optracker.Filter(ctx, optracker.OperationUnpin, optracker.PhaseError) {
		if _, ok := localpis[p.Cid.String()]; !ok {
			spt.optracker.CleanError(ctx, p.Cid)
		}
	}

	return spt.getErrorsAll(ctx), nil
}

// Sync returns the updated local status for the given Cid.
func (spt *Tracker) Sync(ctx context.Context, c cid.Cid) (*api.PinInfo, error) {
	ctx, span := trace.StartSpan(ctx, "tracker/stateless/Sync")
	defer span.End()

	oppi, ok := spt.optracker.GetExists(ctx, c)
	if !ok {
		return spt.Status(ctx, c), nil
	}

	if oppi.Status == api.TrackerStatusUnpinError {
		// Check the global state to see if the cluster should even
		// care about the provided cid.
		var gpin api.Pin
		err := spt.rpcClient.Call(
			"",
			"Cluster",
			"PinGet",
			c,
			&gpin,
		)
		if err != nil {
			if rpc.IsRPCError(err) {
				logger.Error(err)
				return &api.PinInfo{
					Cid:    c,
					Peer:   spt.peerID,
					Status: api.TrackerStatusClusterError,
					Error:  err.Error(),
					TS:     time.Now(),
				}, err
			}
			// It isn't in the global state.
			spt.optracker.CleanError(ctx, c)
			return &api.PinInfo{
				Cid:    c,
				Peer:   spt.peerID,
				Status: api.TrackerStatusUnpinned,
				TS:     time.Now(),
			}, nil
		}
		// check if pin is a remote pin
		if gpin.IsRemotePin(spt.peerID) {
			spt.optracker.CleanError(ctx, c)
			return &api.PinInfo{
				Cid:    c,
				Peer:   spt.peerID,
				Status: api.TrackerStatusRemote,
				TS:     time.Now(),
			}, nil
		}
	}

	if oppi.Status == api.TrackerStatusPinError {
		// Attempt to get the status from the ipfs node.
		var ips api.IPFSPinStatus
		err := spt.rpcClient.Call(
			"",
			"IPFSConnector",
			"PinLsCid",
			c,
			&ips,
		)
		if err != nil {
			logger.Error(err)
			return &api.PinInfo{
				Cid:    c,
				Peer:   spt.peerID,
				Status: api.TrackerStatusPinError,
				TS:     time.Now(),
				Error:  err.Error(),
			}, err
		}
		if ips.ToTrackerStatus() == api.TrackerStatusPinned {
			spt.optracker.CleanError(ctx, c)
			pi := &api.PinInfo{
				Cid:    c,
				Peer:   spt.peerID,
				Status: ips.ToTrackerStatus(),
				TS:     time.Now(),
			}
			return pi, nil
		}
	}
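
	// No correction applies; return the tracked operation's status.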
	return spt.optracker.Get(ctx, c), nil
}

// RecoverAll attempts to recover all items tracked by this peer.
func (spt *Tracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) {
	ctx, span := trace.StartSpan(ctx, "tracker/stateless/RecoverAll")
	defer span.End()

	statuses := spt.StatusAll(ctx)
	resp := make([]*api.PinInfo, 0)
	for _, st := range statuses {
		r, err := spt.Recover(ctx, st.Cid)
		if err != nil {
			return resp, err
		}
		resp = append(resp, r)
	}
	return resp, nil
}

// Recover will re-track or re-untrack a Cid in error state,
// possibly retriggering an IPFS pinning operation and returning
// only when it is done.
func (spt *Tracker) Recover(ctx context.Context, c cid.Cid) (*api.PinInfo, error) {
	ctx, span := trace.StartSpan(ctx, "tracker/stateless/Recover")
	defer span.End()

	pInfo, ok := spt.optracker.GetExists(ctx, c)
	if !ok {
		return spt.Status(ctx, c), nil
	}

	var err error
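	// Only operations in an error state are re-queued; any other status
	// is left untouched.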
	switch pInfo.Status {
	case api.TrackerStatusPinError:
		logger.Infof("Restarting pin operation for %s", c)
		err = spt.enqueue(ctx, api.PinCid(c), optracker.OperationPin)
	case api.TrackerStatusUnpinError:
		logger.Infof("Restarting unpin operation for %s", c)
		err = spt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin)
	}
	if err != nil {
		return spt.Status(ctx, c), err
	}

	return spt.Status(ctx, c), nil
}

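// ipfsStatusAll returns the current IPFS pin statuses for recursively
// pinned items, as a map keyed by Cid string.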
func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[string]*api.PinInfo, error) {
	ctx, span := trace.StartSpan(ctx, "tracker/stateless/ipfsStatusAll")
	defer span.End()

	var ipsMap map[string]api.IPFSPinStatus
	err := spt.rpcClient.CallContext(
		ctx,
		"",
		"IPFSConnector",
		"PinLs",
		"recursive",
		&ipsMap,
	)
	if err != nil {
		logger.Error(err)
		return nil, err
	}
	pins := make(map[string]*api.PinInfo)
	for cidstr, ips := range ipsMap {
		c, err := cid.Decode(cidstr)
		if err != nil {
			logger.Error(err)
			continue
		}
		p := &api.PinInfo{
			Cid:    c,
			Peer:   spt.peerID,
			Status: ips.ToTrackerStatus(),
			TS:     time.Now(),
		}
		pins[cidstr] = p
	}
	return pins, nil
}

// localStatus returns a joint set of consensusState and ipfsStatus,
// marking pins which should be meta or remote, and leaving out any ipfs
// pins that aren't in the consensusState.
func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string]*api.PinInfo, error) {
	ctx, span := trace.StartSpan(ctx, "tracker/stateless/localStatus")
	defer span.End()

	pininfos := make(map[string]*api.PinInfo)

	// get shared state
	var statePins []*api.Pin
	err := spt.rpcClient.CallContext(
		ctx,
		"",
		"Cluster",
		"Pins",
		struct{}{},
		&statePins,
	)
	if err != nil {
		logger.Error(err)
		return nil, err
	}

	// get statuses from the ipfs node first
	localpis, err := spt.ipfsStatusAll(ctx)
	if err != nil {
		logger.Error(err)
		return nil, err
	}

	for _, p := range statePins {
		pCid := p.Cid.String()
		if p.Type == api.MetaType && incExtra {
			// add pin to pininfos with sharded status
			pininfos[pCid] = &api.PinInfo{
				Cid:    p.Cid,
				Peer:   spt.peerID,
				Status: api.TrackerStatusSharded,
				TS:     time.Now(),
			}
			continue
		}

		if p.IsRemotePin(spt.peerID) && incExtra {
			// add pin to pininfos with a status of remote
			pininfos[pCid] = &api.PinInfo{
				Cid:    p.Cid,
				Peer:   spt.peerID,
				Status: api.TrackerStatusRemote,
				TS:     time.Now(),
			}
			continue
		}
		// lookup p in localpis
		if lp, ok := localpis[pCid]; ok {
			pininfos[pCid] = lp
		}
	}
	return pininfos, nil
}

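// getErrorsAll returns the PinInfo for every operation currently in an
// error phase.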
func (spt *Tracker) getErrorsAll(ctx context.Context) []*api.PinInfo {
	return spt.optracker.Filter(ctx, optracker.PhaseError)
}

// OpContext exports the internal optracker's OpContext method.
// For testing purposes only.
func (spt *Tracker) OpContext(ctx context.Context, c cid.Cid) context.Context {
	return spt.optracker.OpContext(ctx, c)
}