Fix: stateless: cluster should pin items that are in the state but not on ipfs
StateSync() used to take care of this by issuing Track() calls. But this functionality was removed. This starts returning items that are in the state but not on IPFS as PIN_ERRORs. It ensures that the Recover methods see them so that they can trigger repinnings for missing items. This covers cases where the user modifies the ipfs state manually, or resets the ipfs daemon but keeps the cluster state, and cases where cluster was stopped half-way through a pinning.
This commit is contained in:
parent
26f553f3db
commit
9f660ba38e
15
cluster.go
15
cluster.go
|
@ -614,10 +614,10 @@ This might be due to one or several causes:
|
|||
c.Shutdown(ctx)
|
||||
return
|
||||
case <-c.consensus.Ready(ctx):
|
||||
// Consensus ready means the state is up to date so we can sync
|
||||
// it to the tracker. We ignore errors (normal when state
|
||||
// doesn't exist in new peers).
|
||||
c.StateSync(ctx)
|
||||
// Consensus ready means the state is up to date. Every item
|
||||
// in the state that is not pinned will appear as PinError so
|
||||
// we can proceed to recover all of those in the tracker.
|
||||
c.RecoverAllLocal(ctx)
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
}
|
||||
|
@ -981,7 +981,8 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
|
|||
return err
|
||||
}
|
||||
|
||||
c.StateSync(ctx)
|
||||
// Start pinning items in the state that are not on IPFS yet.
|
||||
c.RecoverAllLocal(ctx)
|
||||
|
||||
logger.Infof("%s: joined %s's cluster", c.id.Pretty(), pid.Pretty())
|
||||
return nil
|
||||
|
@ -1107,7 +1108,7 @@ func (c *Cluster) localPinInfoOp(
|
|||
|
||||
}
|
||||
|
||||
// RecoverAll triggers a RecoverAllLocal operation on all peer.
|
||||
// RecoverAll triggers a RecoverAllLocal operation on all peers.
|
||||
func (c *Cluster) RecoverAll(ctx context.Context) ([]*api.GlobalPinInfo, error) {
|
||||
_, span := trace.StartSpan(ctx, "cluster/RecoverAll")
|
||||
defer span.End()
|
||||
|
@ -1122,6 +1123,8 @@ func (c *Cluster) RecoverAll(ctx context.Context) ([]*api.GlobalPinInfo, error)
|
|||
// Recover operations ask IPFS to pin or unpin items in error state. Recover
|
||||
// is faster than calling Pin on the same CID as it avoids committing an
|
||||
// identical pin to the consensus layer.
|
||||
//
|
||||
// RecoverAllLocal is called automatically every PinRecoverInterval.
|
||||
func (c *Cluster) RecoverAllLocal(ctx context.Context) ([]*api.PinInfo, error) {
|
||||
_, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")
|
||||
defer span.End()
|
||||
|
|
|
@ -26,6 +26,9 @@ var logger = logging.Logger("pintracker")
|
|||
var (
|
||||
// ErrFullQueue is the error used when pin or unpin operation channel is full.
|
||||
ErrFullQueue = errors.New("pin/unpin operation queue is full. Try increasing max_pin_queue_size")
|
||||
|
||||
// items with this error should be recovered
|
||||
errUnexpectedlyUnpinned = errors.New("the item should be pinned but it is not")
|
||||
)
|
||||
|
||||
// Tracker uses the optracker.OperationTracker to manage
|
||||
|
@ -328,6 +331,7 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
|
|||
addError(pinInfo, err)
|
||||
return pinInfo
|
||||
}
|
||||
// The pin IS in the state.
|
||||
|
||||
// check if pin is a meta pin
|
||||
if gpin.Type == api.MetaType {
|
||||
|
@ -357,7 +361,16 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
|
|||
return pinInfo
|
||||
}
|
||||
|
||||
pinInfo.Status = ips.ToTrackerStatus()
|
||||
ipfsStatus := ips.ToTrackerStatus()
|
||||
switch ipfsStatus {
|
||||
case api.TrackerStatusUnpinned:
|
||||
// The item is in the state but not in IPFS:
|
||||
// PinError. Should be pinned.
|
||||
pinInfo.Status = api.TrackerStatusPinError
|
||||
pinInfo.Error = errUnexpectedlyUnpinned.Error()
|
||||
default:
|
||||
pinInfo.Status = ipfsStatus
|
||||
}
|
||||
return pinInfo
|
||||
}
|
||||
|
||||
|
@ -369,7 +382,7 @@ func (spt *Tracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) {
|
|||
statuses := spt.StatusAll(ctx)
|
||||
resp := make([]*api.PinInfo, 0)
|
||||
for _, st := range statuses {
|
||||
r, err := spt.Recover(ctx, st.Cid)
|
||||
r, err := spt.recoverWithPinInfo(ctx, st)
|
||||
if err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
@ -378,32 +391,36 @@ func (spt *Tracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
// Recover will re-track or re-untrack a Cid in error state,
|
||||
// possibly retriggering an IPFS pinning operation and returning
|
||||
// only when it is done.
|
||||
// Recover will trigger pinning or unpinning for items in
|
||||
// PinError or UnpinError states.
|
||||
func (spt *Tracker) Recover(ctx context.Context, c cid.Cid) (*api.PinInfo, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Recover")
|
||||
defer span.End()
|
||||
|
||||
pInfo, ok := spt.optracker.GetExists(ctx, c)
|
||||
if !ok {
|
||||
return spt.Status(ctx, c), nil
|
||||
// Check if we have a status in the operation tracker
|
||||
pi, ok := spt.optracker.GetExists(ctx, c)
|
||||
if ok {
|
||||
return spt.recoverWithPinInfo(ctx, pi)
|
||||
}
|
||||
// Get a status by checking against IPFS and use that.
|
||||
return spt.recoverWithPinInfo(ctx, spt.Status(ctx, c))
|
||||
}
|
||||
|
||||
func (spt *Tracker) recoverWithPinInfo(ctx context.Context, pi *api.PinInfo) (*api.PinInfo, error) {
|
||||
var err error
|
||||
switch pInfo.Status {
|
||||
switch pi.Status {
|
||||
case api.TrackerStatusPinError:
|
||||
logger.Infof("Restarting pin operation for %s", c)
|
||||
err = spt.enqueue(ctx, api.PinCid(c), optracker.OperationPin)
|
||||
logger.Infof("Restarting pin operation for %s", pi.Cid)
|
||||
err = spt.enqueue(ctx, api.PinCid(pi.Cid), optracker.OperationPin)
|
||||
case api.TrackerStatusUnpinError:
|
||||
logger.Infof("Restarting unpin operation for %s", c)
|
||||
err = spt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin)
|
||||
logger.Infof("Restarting unpin operation for %s", pi.Cid)
|
||||
err = spt.enqueue(ctx, api.PinCid(pi.Cid), optracker.OperationUnpin)
|
||||
}
|
||||
if err != nil {
|
||||
return spt.Status(ctx, c), err
|
||||
return spt.Status(ctx, pi.Cid), err
|
||||
}
|
||||
|
||||
return spt.Status(ctx, c), nil
|
||||
return spt.Status(ctx, pi.Cid), nil
|
||||
}
|
||||
|
||||
func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[string]*api.PinInfo, error) {
|
||||
|
@ -497,15 +514,14 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string]
|
|||
case pinnedInIpfs:
|
||||
pininfos[pCid] = ipfsInfo
|
||||
default:
|
||||
// report as undefined for this peer. this will be
|
||||
// report as PIN_ERROR for this peer. this will be
|
||||
// overwritten if the operation tracker has more info
|
||||
// for this. Otherwise, this is a problem: a pin in
|
||||
// the state that should be pinned by this peer but
|
||||
// which no operation is handling.
|
||||
|
||||
// TODO (hector): Consider a pinError so it can be
|
||||
// recovered?
|
||||
pinInfo.Status = api.TrackerStatusUndefined
|
||||
// for this (an ongoing pinning operation). Otherwise,
|
||||
// it means something should be pinned and it is not
|
||||
// known by IPFS. Should be handled to "recover".
|
||||
pinInfo.Status = api.TrackerStatusPinError
|
||||
pinInfo.Error = errUnexpectedlyUnpinned.Error()
|
||||
pininfos[pCid] = pinInfo
|
||||
}
|
||||
}
|
||||
return pininfos, nil
|
||||
|
|
Loading…
Reference in New Issue
Block a user