ipfs-cluster/pintracker/stateless/stateless.go
Hector Sanjuan 11124ee224 Fix: repinning does not re-allocate as needed
Long story: Since #1768 there has been a recurring repinning test failure with
Raft consensus.

Per the test, if a pin is allocated to a peer that has been shutdown,
submitting the pin again should re-allocate it to a peer that is still
running.

Investigation on why this test fails and why it fails only in Raft lead to
realizing that this and other similar tests, were passing by chance. The
needed re-allocations were made not by the new submission of the pin, but by
the automatic-repinning feature. The actual resubmitted pin was carrying the
same allocations (one of them being the peer that was down), but it was
silently failing because the RedirectToLeader() code path was using
cc.ctx and hitting the peer that had been shutdown, which caused it to error.

Fixing the context propagation, meant that we would re-overwrite the pin with
the old allocations, thus the actual behaviour did not pass the test.

So, on one side, this fix an number of tests that had not disabled automatic
repinning and was probably getting in the way of things. On the other side,
this removes a condition that prevents re-allocation of pins if they exists
and options have not changed.

I don't fully understand why this was there though, since the Allocate() code
does return the old allocations anyways when they are enough, so it should not
re-allocate randomly. I suspect this was preventing some misbehaviour in the
Allocate() code from the time before it was improved with multiple allocators
etc.
2022-09-27 12:31:24 +02:00

698 lines
17 KiB
Go

// Package stateless implements a PinTracker component for IPFS Cluster, which
// aims to reduce the memory footprint when handling really large cluster
// states.
package stateless
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ipfs-cluster/ipfs-cluster/api"
"github.com/ipfs-cluster/ipfs-cluster/pintracker/optracker"
"github.com/ipfs-cluster/ipfs-cluster/state"
logging "github.com/ipfs/go-log/v2"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p/core/peer"
"go.opencensus.io/trace"
)
var logger = logging.Logger("pintracker")
const pinsChannelSize = 1024
var (
// ErrFullQueue is the error used when pin or unpin operation channel is full.
ErrFullQueue = errors.New("pin/unpin operation queue is full. Try increasing max_pin_queue_size")
// items with this error should be recovered
errUnexpectedlyUnpinned = errors.New("the item should be pinned but it is not")
)
// Tracker uses the optracker.OperationTracker to manage
// transitioning shared ipfs-cluster state (Pins) to the local IPFS node.
type Tracker struct {
config *Config
optracker *optracker.OperationTracker
peerID peer.ID
peerName string
ctx context.Context
cancel func()
getState func(ctx context.Context) (state.ReadOnly, error)
rpcClient *rpc.Client
rpcReady chan struct{}
priorityPinCh chan *optracker.Operation
pinCh chan *optracker.Operation
unpinCh chan *optracker.Operation
shutdownMu sync.Mutex
shutdown bool
wg sync.WaitGroup
}
// New creates a new StatelessPinTracker.
func New(cfg *Config, pid peer.ID, peerName string, getState func(ctx context.Context) (state.ReadOnly, error)) *Tracker {
ctx, cancel := context.WithCancel(context.Background())
spt := &Tracker{
config: cfg,
peerID: pid,
peerName: peerName,
ctx: ctx,
cancel: cancel,
getState: getState,
optracker: optracker.NewOperationTracker(ctx, pid, peerName),
rpcReady: make(chan struct{}, 1),
priorityPinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize),
pinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize),
unpinCh: make(chan *optracker.Operation, cfg.MaxPinQueueSize),
}
for i := 0; i < spt.config.ConcurrentPins; i++ {
go spt.opWorker(spt.pin, spt.priorityPinCh, spt.pinCh)
}
go spt.opWorker(spt.unpin, spt.unpinCh, nil)
return spt
}
// we can get our IPFS id from our own monitor ping metrics which
// are refreshed regularly.
func (spt *Tracker) getIPFSID(ctx context.Context) api.IPFSID {
// Wait until RPC is ready
<-spt.rpcReady
var ipfsid api.IPFSID
err := spt.rpcClient.CallContext(
ctx,
"",
"Cluster",
"IPFSID",
peer.ID(""), // local peer
&ipfsid,
)
if err != nil {
logger.Error(err)
}
return ipfsid
}
// receives a pin Function (pin or unpin) and channels. Used for both pinning
// and unpinning.
func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, prioCh, normalCh chan *optracker.Operation) {
var op *optracker.Operation
for {
// Process the priority channel first.
select {
case op = <-prioCh:
goto APPLY_OP
case <-spt.ctx.Done():
return
default:
}
// Then process things on the other channels.
// Block if there are no things to process.
select {
case op = <-prioCh:
goto APPLY_OP
case op = <-normalCh:
goto APPLY_OP
case <-spt.ctx.Done():
return
}
// apply operations that came from some channel
APPLY_OP:
if clean := applyPinF(pinF, op); clean {
spt.optracker.Clean(op.Context(), op)
}
}
}
// applyPinF returns true if the operation can be considered "DONE".
func applyPinF(pinF func(*optracker.Operation) error, op *optracker.Operation) bool {
if op.Canceled() {
// operation was canceled. Move on.
// This saves some time, but not 100% needed.
return false
}
op.SetPhase(optracker.PhaseInProgress)
op.IncAttempt()
err := pinF(op) // call pin/unpin
if err != nil {
if op.Canceled() {
// there was an error because
// we were canceled. Move on.
return false
}
op.SetError(err)
op.Cancel()
return false
}
op.SetPhase(optracker.PhaseDone)
op.Cancel()
return true // this tells the opWorker to clean the operation from the tracker.
}
func (spt *Tracker) pin(op *optracker.Operation) error {
ctx, span := trace.StartSpan(op.Context(), "tracker/stateless/pin")
defer span.End()
logger.Debugf("issuing pin call for %s", op.Cid())
err := spt.rpcClient.CallContext(
ctx,
"",
"IPFSConnector",
"Pin",
op.Pin(),
&struct{}{},
)
if err != nil {
return err
}
return nil
}
func (spt *Tracker) unpin(op *optracker.Operation) error {
ctx, span := trace.StartSpan(op.Context(), "tracker/stateless/unpin")
defer span.End()
logger.Debugf("issuing unpin call for %s", op.Cid())
err := spt.rpcClient.CallContext(
ctx,
"",
"IPFSConnector",
"Unpin",
op.Pin(),
&struct{}{},
)
if err != nil {
return err
}
return nil
}
// Enqueue puts a new operation on the queue, unless ongoing exists.
func (spt *Tracker) enqueue(ctx context.Context, c api.Pin, typ optracker.OperationType) error {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/enqueue")
defer span.End()
logger.Debugf("entering enqueue: pin: %+v", c)
op := spt.optracker.TrackNewOperation(ctx, c, typ, optracker.PhaseQueued)
if op == nil {
return nil // the operation exists and must be queued already.
}
var ch chan *optracker.Operation
switch typ {
case optracker.OperationPin:
isPriorityPin := time.Now().Before(c.Timestamp.Add(spt.config.PriorityPinMaxAge)) &&
op.AttemptCount() <= spt.config.PriorityPinMaxRetries
op.SetPriorityPin(isPriorityPin)
if isPriorityPin {
ch = spt.priorityPinCh
} else {
ch = spt.pinCh
}
case optracker.OperationUnpin:
ch = spt.unpinCh
}
select {
case ch <- op:
default:
err := ErrFullQueue
op.SetError(err)
op.Cancel()
logger.Error(err.Error())
return err
}
return nil
}
// SetClient makes the StatelessPinTracker ready to perform RPC requests to
// other components.
func (spt *Tracker) SetClient(c *rpc.Client) {
spt.rpcClient = c
close(spt.rpcReady)
}
// Shutdown finishes the services provided by the StatelessPinTracker
// and cancels any active context.
func (spt *Tracker) Shutdown(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Shutdown")
_ = ctx
defer span.End()
spt.shutdownMu.Lock()
defer spt.shutdownMu.Unlock()
if spt.shutdown {
logger.Debug("already shutdown")
return nil
}
logger.Info("stopping StatelessPinTracker")
spt.cancel()
spt.wg.Wait()
spt.shutdown = true
return nil
}
// Track tells the StatelessPinTracker to start managing a Cid,
// possibly triggering Pin operations on the IPFS daemon.
func (spt *Tracker) Track(ctx context.Context, c api.Pin) error {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Track")
defer span.End()
logger.Debugf("tracking %s", c.Cid)
// Sharded pins are never pinned. A sharded pin cannot turn into
// something else or viceversa like it happens with Remote pins so
// we just ignore them.
if c.Type == api.MetaType {
return nil
}
// Trigger unpin whenever something remote is tracked
// Note, IPFSConn checks with pin/ls before triggering
// pin/rm.
if c.IsRemotePin(spt.peerID) {
op := spt.optracker.TrackNewOperation(ctx, c, optracker.OperationRemote, optracker.PhaseInProgress)
if op == nil {
return nil // ongoing unpin
}
err := spt.unpin(op)
op.Cancel()
if err != nil {
op.SetError(err)
return nil
}
op.SetPhase(optracker.PhaseDone)
spt.optracker.Clean(ctx, op)
return nil
}
return spt.enqueue(ctx, c, optracker.OperationPin)
}
// Untrack tells the StatelessPinTracker to stop managing a Cid.
// If the Cid is pinned locally, it will be unpinned.
func (spt *Tracker) Untrack(ctx context.Context, c api.Cid) error {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Untrack")
defer span.End()
logger.Debugf("untracking %s", c)
return spt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin)
}
// StatusAll returns information for all Cids pinned to the local IPFS node.
func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus, out chan<- api.PinInfo) error {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/StatusAll")
defer span.End()
ipfsid := spt.getIPFSID(ctx)
// Any other states are just operation-tracker states, so we just give
// those and return.
if !filter.Match(
api.TrackerStatusPinned | api.TrackerStatusUnexpectedlyUnpinned |
api.TrackerStatusSharded | api.TrackerStatusRemote) {
return spt.optracker.GetAllChannel(ctx, filter, ipfsid, out)
}
defer close(out)
// get global state - cluster pinset
st, err := spt.getState(ctx)
if err != nil {
logger.Error(err)
return err
}
var ipfsRecursivePins map[api.Cid]api.IPFSPinStatus
// Only query IPFS if we want to status for pinned items
if filter.Match(api.TrackerStatusPinned | api.TrackerStatusUnexpectedlyUnpinned) {
ipfsRecursivePins = make(map[api.Cid]api.IPFSPinStatus)
// At some point we need a full map of what we have and what
// we don't. The IPFS pinset is the smallest thing we can keep
// on memory.
ipfsPinsCh, errCh := spt.ipfsPins(ctx)
for ipfsPinInfo := range ipfsPinsCh {
ipfsRecursivePins[ipfsPinInfo.Cid] = ipfsPinInfo.Type
}
// If there was an error listing recursive pins then abort.
err := <-errCh
if err != nil {
err := fmt.Errorf("could not get pinset from IPFS: %w", err)
logger.Error(err)
return err
}
}
// Prepare pinset streaming
statePins := make(chan api.Pin, pinsChannelSize)
go func() {
err = st.List(ctx, statePins)
if err != nil {
logger.Error(err)
}
}()
// a shorthand for this select.
trySend := func(info api.PinInfo) bool {
select {
case <-ctx.Done():
return false
case <-spt.ctx.Done():
return false
case out <- info:
return true
}
}
// For every item in the state.
for p := range statePins {
select {
case <-ctx.Done():
case <-spt.ctx.Done():
default:
}
// if there is an operation, issue that and move on
info, ok := spt.optracker.GetExists(ctx, p.Cid, ipfsid)
if ok && filter.Match(info.Status) {
if !trySend(info) {
return fmt.Errorf("error issuing PinInfo: %w", ctx.Err())
}
continue // next pin
}
// Preliminary PinInfo for this Pin.
info = api.PinInfo{
Cid: p.Cid,
Name: p.Name,
Peer: spt.peerID,
Allocations: p.Allocations,
Origins: p.Origins,
Created: p.Timestamp,
Metadata: p.Metadata,
PinInfoShort: api.PinInfoShort{
PeerName: spt.peerName,
IPFS: ipfsid.ID,
IPFSAddresses: ipfsid.Addresses,
Status: api.TrackerStatusUndefined, // TBD
TS: p.Timestamp,
Error: "",
AttemptCount: 0,
PriorityPin: false,
},
}
ipfsStatus, pinnedInIpfs := ipfsRecursivePins[api.Cid(p.Cid)]
switch {
case p.Type == api.MetaType:
info.Status = api.TrackerStatusSharded
case p.IsRemotePin(spt.peerID):
info.Status = api.TrackerStatusRemote
case pinnedInIpfs:
// No need to filter. pinnedInIpfs is false
// unless the filter is Pinned |
// UnexpectedlyUnpinned. We filter at the end.
info.Status = ipfsStatus.ToTrackerStatus()
default:
// Not on an operation
// Not a meta pin
// Not a remote pin
// Not a pin on ipfs
// We understand that this is something that
// should be pinned on IPFS and it is not.
info.Status = api.TrackerStatusUnexpectedlyUnpinned
info.Error = errUnexpectedlyUnpinned.Error()
}
if !filter.Match(info.Status) {
continue
}
if !trySend(info) {
return fmt.Errorf("error issuing PinInfo: %w", ctx.Err())
}
}
return nil
}
// Status returns information for a Cid pinned to the local IPFS node.
func (spt *Tracker) Status(ctx context.Context, c api.Cid) api.PinInfo {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Status")
defer span.End()
ipfsid := spt.getIPFSID(ctx)
// check if c has an inflight operation or errorred operation in optracker
if oppi, ok := spt.optracker.GetExists(ctx, c, ipfsid); ok {
return oppi
}
pinInfo := api.PinInfo{
Cid: c,
Peer: spt.peerID,
Name: "", // etc to be filled later
PinInfoShort: api.PinInfoShort{
PeerName: spt.peerName,
IPFS: ipfsid.ID,
IPFSAddresses: ipfsid.Addresses,
TS: time.Now(),
AttemptCount: 0,
PriorityPin: false,
},
}
// check global state to see if cluster should even be caring about
// the provided cid
var gpin api.Pin
st, err := spt.getState(ctx)
if err != nil {
logger.Error(err)
addError(&pinInfo, err)
return pinInfo
}
gpin, err = st.Get(ctx, c)
if err == state.ErrNotFound {
pinInfo.Status = api.TrackerStatusUnpinned
return pinInfo
}
if err != nil {
logger.Error(err)
addError(&pinInfo, err)
return pinInfo
}
// The pin IS in the state.
pinInfo.Name = gpin.Name
pinInfo.TS = gpin.Timestamp
pinInfo.Allocations = gpin.Allocations
pinInfo.Origins = gpin.Origins
pinInfo.Created = gpin.Timestamp
pinInfo.Metadata = gpin.Metadata
// check if pin is a meta pin
if gpin.Type == api.MetaType {
pinInfo.Status = api.TrackerStatusSharded
return pinInfo
}
// check if pin is a remote pin
if gpin.IsRemotePin(spt.peerID) {
pinInfo.Status = api.TrackerStatusRemote
return pinInfo
}
// else attempt to get status from ipfs node
var ips api.IPFSPinStatus
err = spt.rpcClient.CallContext(
ctx,
"",
"IPFSConnector",
"PinLsCid",
gpin,
&ips,
)
if err != nil {
logger.Error(err)
addError(&pinInfo, err)
return pinInfo
}
ipfsStatus := ips.ToTrackerStatus()
switch ipfsStatus {
case api.TrackerStatusUnpinned:
// The item is in the state but not in IPFS:
// PinError. Should be pinned.
pinInfo.Status = api.TrackerStatusUnexpectedlyUnpinned
pinInfo.Error = errUnexpectedlyUnpinned.Error()
default:
pinInfo.Status = ipfsStatus
}
return pinInfo
}
// RecoverAll attempts to recover all items tracked by this peer. It returns
// any errors or when it is done re-tracking.
func (spt *Tracker) RecoverAll(ctx context.Context, out chan<- api.PinInfo) error {
defer close(out)
ctx, span := trace.StartSpan(ctx, "tracker/stateless/RecoverAll")
defer span.End()
statusesCh := make(chan api.PinInfo, 1024)
go func() {
err := spt.StatusAll(ctx, api.TrackerStatusUndefined, statusesCh)
if err != nil {
logger.Error(err)
}
}()
for st := range statusesCh {
// Break out if we shutdown. We might be going through
// a very long list of statuses.
select {
case <-spt.ctx.Done():
err := fmt.Errorf("RecoverAll aborted: %w", ctx.Err())
logger.Error(err)
return err
default:
p, err := spt.recoverWithPinInfo(ctx, st)
if err != nil {
err = fmt.Errorf("RecoverAll error: %w", err)
logger.Error(err)
return err
}
if p.Defined() {
select {
case <-ctx.Done():
err = fmt.Errorf("RecoverAll aborted: %w", ctx.Err())
logger.Error(err)
return err
case out <- p:
}
}
}
}
return nil
}
// Recover will trigger pinning or unpinning for items in
// PinError or UnpinError states.
func (spt *Tracker) Recover(ctx context.Context, c api.Cid) (api.PinInfo, error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Recover")
defer span.End()
pi := spt.Status(ctx, c)
recPi, err := spt.recoverWithPinInfo(ctx, pi)
// if it was not enqueued, no updated pin-info is returned.
// Use the one we had.
if !recPi.Defined() {
recPi = pi
}
return recPi, err
}
func (spt *Tracker) recoverWithPinInfo(ctx context.Context, pi api.PinInfo) (api.PinInfo, error) {
st, err := spt.getState(ctx)
if err != nil {
logger.Error(err)
return api.PinInfo{}, err
}
var pin api.Pin
switch pi.Status {
case api.TrackerStatusPinError, api.TrackerStatusUnexpectedlyUnpinned:
pin, err = st.Get(ctx, pi.Cid)
if err != nil { // ignore error - in case pin was removed while recovering
logger.Warn(err)
return spt.Status(ctx, pi.Cid), nil
}
logger.Infof("Restarting pin operation for %s", pi.Cid)
err = spt.enqueue(ctx, pin, optracker.OperationPin)
case api.TrackerStatusUnpinError:
logger.Infof("Restarting unpin operation for %s", pi.Cid)
err = spt.enqueue(ctx, api.PinCid(pi.Cid), optracker.OperationUnpin)
default:
// We do not return any information when recover was a no-op
return api.PinInfo{}, nil
}
if err != nil {
return spt.Status(ctx, pi.Cid), err
}
// This status call should be cheap as it would normally come from the
// optracker and does not need to hit ipfs.
return spt.Status(ctx, pi.Cid), nil
}
func (spt *Tracker) ipfsPins(ctx context.Context) (<-chan api.IPFSPinInfo, <-chan error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/ipfspins")
defer span.End()
in := make(chan []string, 1) // type filter.
in <- []string{"recursive", "direct"}
close(in)
out := make(chan api.IPFSPinInfo, pinsChannelSize)
errCh := make(chan error)
go func() {
err := spt.rpcClient.Stream(
ctx,
"",
"IPFSConnector",
"PinLs",
in,
out,
)
errCh <- err
close(errCh)
}()
return out, errCh
}
// PinQueueSize returns the current size of the pinning queue.
func (spt *Tracker) PinQueueSize(ctx context.Context) (int64, error) {
return spt.optracker.PinQueueSize(), nil
}
// func (spt *Tracker) getErrorsAll(ctx context.Context) []api.PinInfo {
// return spt.optracker.Filter(ctx, optracker.PhaseError)
// }
// OpContext exports the internal optracker's OpContext method.
// For testing purposes only.
func (spt *Tracker) OpContext(ctx context.Context, c api.Cid) context.Context {
return spt.optracker.OpContext(ctx, c)
}
func addError(pinInfo *api.PinInfo, err error) {
pinInfo.Error = err.Error()
pinInfo.Status = api.TrackerStatusClusterError
}