Merge pull request #1538 from ipfs/pintracker/recover-speedup

pintracker: RecoverAll should only return status for recovered items
This commit is contained in:
Hector Sanjuan 2022-01-11 17:09:20 +01:00 committed by GitHub
commit af0cf8b106
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 36 additions and 34 deletions

View File

@ -1198,6 +1198,8 @@ func (c *Cluster) RecoverAll(ctx context.Context) ([]*api.GlobalPinInfo, error)
// is faster than calling Pin on the same CID as it avoids committing an
// identical pin to the consensus layer.
//
// It returns the list of pins that were re-queued for pinning.
//
// RecoverAllLocal is called automatically every PinRecoverInterval.
func (c *Cluster) RecoverAllLocal(ctx context.Context) ([]*api.PinInfo, error) {
_, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")

View File

@ -62,6 +62,9 @@ func (ipfs *mockConnector) ID(ctx context.Context) (*api.IPFSID, error) {
}
func (ipfs *mockConnector) Pin(ctx context.Context, pin *api.Pin) error {
if pin.Cid == test.ErrorCid {
return errors.New("trying to pin ErrorCid")
}
ipfs.pins.Store(pin.Cid.String(), pin.MaxDepth)
return nil
}
@ -935,7 +938,7 @@ func TestClusterRecoverAllLocal(t *testing.T) {
defer cleanState()
defer cl.Shutdown(ctx)
_, err := cl.Pin(ctx, test.Cid1, api.PinOptions{})
_, err := cl.Pin(ctx, test.ErrorCid, api.PinOptions{})
if err != nil {
t.Fatal("pin should have worked:", err)
}
@ -947,11 +950,9 @@ func TestClusterRecoverAllLocal(t *testing.T) {
t.Error("did not expect an error")
}
if len(recov) != 1 {
t.Fatalf("there should be only one pin, got = %d", len(recov))
}
if recov[0].Status != api.TrackerStatusPinned {
t.Errorf("the pin should have been recovered, got = %v", recov[0].Status)
t.Fatalf("there should be one pin recovered, got = %d", len(recov))
}
// Recovery will fail, but the pin appearing in the response is good enough to know it was requeued.
}
func TestClusterRepoGC(t *testing.T) {

View File

@ -124,7 +124,8 @@ type PinTracker interface {
StatusAll(context.Context, api.TrackerStatus) []*api.PinInfo
// Status returns the local status of a given Cid.
Status(context.Context, cid.Cid) *api.PinInfo
// RecoverAll calls Recover() for all pins tracked.
// RecoverAll calls Recover() for all pins tracked. Returns only
// informations for retriggered pins.
RecoverAll(context.Context) ([]*api.PinInfo, error)
// Recover retriggers a Pin/Unpin operation in a Cids with error status.
Recover(context.Context, cid.Cid) (*api.PinInfo, error)

View File

@ -1109,8 +1109,8 @@ func TestClustersRecoverAll(t *testing.T) {
}
delay()
if len(gInfos) != 2 {
t.Error("expected two items")
if len(gInfos) != 1 {
t.Error("expected one items")
}
for _, gInfo := range gInfos {

View File

@ -332,25 +332,8 @@ func TestPinTracker_RecoverAll(t *testing.T) {
args{
testStatelessPinTracker(t),
},
// The only CID to recover is test.Cid4 which is in error.
[]*api.PinInfo{
{
Cid: test.Cid1,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinned,
},
},
{
Cid: test.Cid2,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusRemote,
},
},
{
Cid: test.Cid3,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusRemote,
},
},
{
// This will recover and status
// is ignored as it could come back as

View File

@ -416,7 +416,8 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
return pinInfo
}
// RecoverAll attempts to recover all items tracked by this peer.
// RecoverAll attempts to recover all items tracked by this peer. It returns
// items that have been re-queued.
func (spt *Tracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/RecoverAll")
defer span.End()
@ -434,9 +435,10 @@ func (spt *Tracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) {
if err != nil {
return resp, err
}
if r != nil {
resp = append(resp, r)
}
}
}
return resp, nil
}
@ -447,13 +449,21 @@ func (spt *Tracker) Recover(ctx context.Context, c cid.Cid) (*api.PinInfo, error
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Recover")
defer span.End()
// Check if we have a status in the operation tracker
// Check if we have a status in the operation tracker and use that
// pininfo. Otherwise, get a status by checking against IPFS and use
// that.
pi, ok := spt.optracker.GetExists(ctx, c)
if ok {
return spt.recoverWithPinInfo(ctx, pi)
if !ok {
pi = spt.Status(ctx, c)
}
// Get a status by checking against IPFS and use that.
return spt.recoverWithPinInfo(ctx, spt.Status(ctx, c))
recPi, err := spt.recoverWithPinInfo(ctx, pi)
// if it was not enqueued, no updated pin-info is returned.
// Use the one we had.
if recPi == nil {
recPi = pi
}
return recPi, err
}
func (spt *Tracker) recoverWithPinInfo(ctx context.Context, pi *api.PinInfo) (*api.PinInfo, error) {
@ -465,11 +475,16 @@ func (spt *Tracker) recoverWithPinInfo(ctx context.Context, pi *api.PinInfo) (*a
case api.TrackerStatusUnpinError:
logger.Infof("Restarting unpin operation for %s", pi.Cid)
err = spt.enqueue(ctx, api.PinCid(pi.Cid), optracker.OperationUnpin)
default:
// We do not return any information when recover was a no-op
return nil, nil
}
if err != nil {
return spt.Status(ctx, pi.Cid), err
}
// This status call should be cheap as it would normally come from the
// optracker and does not need to hit ipfs.
return spt.Status(ctx, pi.Cid), nil
}