Merge pull request #1377 from ipfs/fix/1360-efficient-pin-status

Fix #1360: Efficient pinset status with filters
This commit is contained in:
Hector Sanjuan 2021-07-06 11:57:09 +02:00 committed by GitHub
commit 54c3608899
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 131 additions and 104 deletions

View File

@ -891,30 +891,6 @@ func (api *API) allocationHandler(w http.ResponseWriter, r *http.Request) {
}
}
// filterGlobalPinInfos takes a GlobalPinInfo slice and discards
// any item in it which does not carry a PinInfo matching the
// filter (OR-wise).
func filterGlobalPinInfos(globalPinInfos []*types.GlobalPinInfo, filter types.TrackerStatus) []*types.GlobalPinInfo {
if filter == types.TrackerStatusUndefined {
return globalPinInfos
}
var filteredGlobalPinInfos []*types.GlobalPinInfo
for _, globalPinInfo := range globalPinInfos {
for _, pinInfo := range globalPinInfo.PeerMap {
// silenced the error because we should have detected
// earlier if filters were invalid
if pinInfo.Status.Match(filter) {
filteredGlobalPinInfos = append(filteredGlobalPinInfos, globalPinInfo)
break
}
}
}
return filteredGlobalPinInfos
}
func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
local := queryValues.Get("local")
@ -936,7 +912,7 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
"",
"Cluster",
"StatusAllLocal",
struct{}{},
filter,
&pinInfos,
)
if err != nil {
@ -950,7 +926,7 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
"",
"Cluster",
"StatusAll",
struct{}{},
filter,
&globalPinInfos,
)
if err != nil {
@ -959,8 +935,6 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
}
}
globalPinInfos = filterGlobalPinInfos(globalPinInfos, filter)
api.sendResponse(w, autoStatus, nil, globalPinInfos)
}

View File

@ -77,6 +77,9 @@ const (
// The IPFS daemon is not pinning the item through this cid but it is
// tracked in a cluster dag
TrackerStatusSharded
// The item is in the state and should be pinned, but
// it is however not pinned and not queued/pinning.
TrackerStatusUnexpectedlyUnpinned
)
// Composite TrackerStatus.
@ -102,6 +105,8 @@ var trackerStatusString = map[TrackerStatus]string{
TrackerStatusPinQueued: "pin_queued",
TrackerStatusUnpinQueued: "unpin_queued",
TrackerStatusQueued: "queued",
TrackerStatusSharded: "sharded",
TrackerStatusUnexpectedlyUnpinned: "unexpectedly_unpinned",
}
// values autofilled in init()
@ -130,9 +135,11 @@ func (st TrackerStatus) String() string {
// Match returns true if the tracker status matches the given filter.
// For example TrackerStatusPinError will match TrackerStatusPinError
// and TrackerStatusError
// and TrackerStatusError.
func (st TrackerStatus) Match(filter TrackerStatus) bool {
return filter == 0 || st&filter > 0
return filter == TrackerStatusUndefined ||
st == TrackerStatusUndefined ||
st&filter > 0
}
// MarshalJSON uses the string representation of TrackerStatus for JSON

View File

@ -1087,21 +1087,21 @@ func (c *Cluster) StateSync(ctx context.Context) error {
// StatusAll returns the GlobalPinInfo for all tracked Cids in all peers.
// If an error happens, the slice will contain as much information as
// could be fetched from other peers.
func (c *Cluster) StatusAll(ctx context.Context) ([]*api.GlobalPinInfo, error) {
func (c *Cluster) StatusAll(ctx context.Context, filter api.TrackerStatus) ([]*api.GlobalPinInfo, error) {
_, span := trace.StartSpan(ctx, "cluster/StatusAll")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.globalPinInfoSlice(ctx, "PinTracker", "StatusAll")
return c.globalPinInfoSlice(ctx, "PinTracker", "StatusAll", filter)
}
// StatusAllLocal returns the PinInfo for all the tracked Cids in this peer.
func (c *Cluster) StatusAllLocal(ctx context.Context) []*api.PinInfo {
func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus) []*api.PinInfo {
_, span := trace.StartSpan(ctx, "cluster/StatusAllLocal")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.tracker.StatusAll(ctx)
return c.tracker.StatusAll(ctx, filter)
}
// Status returns the GlobalPinInfo for a given Cid as fetched from all
@ -1157,7 +1157,7 @@ func (c *Cluster) RecoverAll(ctx context.Context) ([]*api.GlobalPinInfo, error)
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.globalPinInfoSlice(ctx, "Cluster", "RecoverAllLocal")
return c.globalPinInfoSlice(ctx, "Cluster", "RecoverAllLocal", nil)
}
// RecoverAllLocal triggers a RecoverLocal operation for all Cids tracked
@ -1824,10 +1824,14 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
return gpin, nil
}
func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) ([]*api.GlobalPinInfo, error) {
func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string, arg interface{}) ([]*api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoSlice")
defer span.End()
if arg == nil {
arg = struct{}{}
}
infos := make([]*api.GlobalPinInfo, 0)
fullMap := make(map[cid.Cid]*api.GlobalPinInfo)
@ -1857,7 +1861,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) (
members,
comp,
method,
struct{}{},
arg,
rpcutil.CopyPinInfoSliceToIfaces(replies),
)

View File

@ -119,8 +119,9 @@ type PinTracker interface {
// Untrack tells the tracker that a Cid is to be forgotten. The tracker
// may perform an IPFS unpin operation.
Untrack(context.Context, cid.Cid) error
// StatusAll returns the list of pins with their local status.
StatusAll(context.Context) []*api.PinInfo
// StatusAll returns the list of pins with their local status. Takes a
// filter to specify which statuses to report.
StatusAll(context.Context, api.TrackerStatus) []*api.PinInfo
// Status returns the local status of a given Cid.
Status(context.Context, cid.Cid) *api.PinInfo
// RecoverAll calls Recover() for all pins tracked.

View File

@ -655,7 +655,7 @@ func TestClustersPin(t *testing.T) {
delay()
}
fpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.StatusAll(ctx)
status := c.tracker.StatusAll(ctx, api.TrackerStatusUndefined)
for _, v := range status {
if v.Status != api.TrackerStatusPinned {
t.Errorf("%s should have been pinned but it is %s", v.Cid, v.Status)
@ -704,7 +704,7 @@ func TestClustersPin(t *testing.T) {
delay()
funpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.StatusAll(ctx)
status := c.tracker.StatusAll(ctx, api.TrackerStatusUndefined)
for _, v := range status {
t.Errorf("%s should have been unpinned but it is %s", v.Cid, v.Status)
}
@ -848,7 +848,7 @@ func TestClustersStatusAll(t *testing.T) {
pinDelay()
// Global status
f := func(t *testing.T, c *Cluster) {
statuses, err := c.StatusAll(ctx)
statuses, err := c.StatusAll(ctx, api.TrackerStatusUndefined)
if err != nil {
t.Error(err)
}
@ -910,7 +910,7 @@ func TestClustersStatusAllWithErrors(t *testing.T) {
return
}
statuses, err := c.StatusAll(ctx)
statuses, err := c.StatusAll(ctx, api.TrackerStatusUndefined)
if err != nil {
t.Error(err)
}
@ -1194,7 +1194,7 @@ func TestClustersReplicationOverall(t *testing.T) {
f := func(t *testing.T, c *Cluster) {
// confirm that the pintracker state matches the current global state
pinfos := c.tracker.StatusAll(ctx)
pinfos := c.tracker.StatusAll(ctx, api.TrackerStatusUndefined)
if len(pinfos) != nClusters {
t.Error("Pinfos does not have the expected pins")
}

View File

@ -204,7 +204,7 @@ func TestPinTracker_StatusAll(t *testing.T) {
// in state but not on IPFS
Cid: test.Cid4,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinError,
Status: api.TrackerStatusUnexpectedlyUnpinned,
},
},
},
@ -216,7 +216,7 @@ func TestPinTracker_StatusAll(t *testing.T) {
t.Errorf("PinTracker.Track() error = %v", err)
}
time.Sleep(200 * time.Millisecond)
got := tt.args.tracker.StatusAll(context.Background())
got := tt.args.tracker.StatusAll(context.Background(), api.TrackerStatusUndefined)
if len(got) != len(tt.want) {
for _, pi := range got {
t.Logf("pinfo: %v", pi)
@ -259,7 +259,7 @@ func BenchmarkPinTracker_StatusAll(b *testing.B) {
b.Run(tt.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
tt.args.tracker.StatusAll(context.Background())
tt.args.tracker.StatusAll(context.Background(), api.TrackerStatusUndefined)
}
})
}

View File

@ -271,25 +271,32 @@ func (spt *Tracker) Untrack(ctx context.Context, c cid.Cid) error {
}
// StatusAll returns information for all Cids pinned to the local IPFS node.
func (spt *Tracker) StatusAll(ctx context.Context) []*api.PinInfo {
func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus) []*api.PinInfo {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/StatusAll")
defer span.End()
pininfos, err := spt.localStatus(ctx, true)
pininfos, err := spt.localStatus(ctx, true, filter)
if err != nil {
return nil
}
// get all inflight operations from optracker and put them into the
// map, deduplicating any existing items with their inflight operation.
//
// we cannot filter in GetAll, because we are meant to replace items in
// pininfos and set the correct status, as otherwise they will remain in
// PinError.
for _, infop := range spt.optracker.GetAll(ctx) {
pininfos[infop.Cid] = infop
}
var pis []*api.PinInfo
for _, pi := range pininfos {
// Last filter.
if pi.Status.Match(filter) {
pis = append(pis, pi)
}
}
return pis
}
@ -382,7 +389,7 @@ func (spt *Tracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/RecoverAll")
defer span.End()
statuses := spt.StatusAll(ctx)
statuses := spt.StatusAll(ctx, api.TrackerStatusUndefined)
resp := make([]*api.PinInfo, 0)
for _, st := range statuses {
r, err := spt.recoverWithPinInfo(ctx, st)
@ -412,7 +419,7 @@ func (spt *Tracker) Recover(ctx context.Context, c cid.Cid) (*api.PinInfo, error
func (spt *Tracker) recoverWithPinInfo(ctx context.Context, pi *api.PinInfo) (*api.PinInfo, error) {
var err error
switch pi.Status {
case api.TrackerStatusPinError:
case api.TrackerStatusPinError, api.TrackerStatusUnexpectedlyUnpinned:
logger.Infof("Restarting pin operation for %s", pi.Cid)
err = spt.enqueue(ctx, api.PinCid(pi.Cid), optracker.OperationPin)
case api.TrackerStatusUnpinError:
@ -465,11 +472,12 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[cid.Cid]*api.PinInfo
return pins, nil
}
// localStatus returns a joint set of consensusState and ipfsStatus
// marking pins which should be meta or remote and leaving any ipfs pins that
// aren't in the consensusState out. If incExtra is true, Remote and Sharded
// pins will be added to the status slice.
func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[cid.Cid]*api.PinInfo, error) {
// localStatus returns a joint set of consensusState and ipfsStatus marking
// pins which should be meta or remote and leaving any ipfs pins that aren't
// in the consensusState out. If incExtra is true, Remote and Sharded pins
// will be added to the status slice. If a filter is provided, only statuses
// matching the filter will be returned.
func (spt *Tracker) localStatus(ctx context.Context, incExtra bool, filter api.TrackerStatus) (map[cid.Cid]*api.PinInfo, error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/localStatus")
defer span.End()
@ -486,12 +494,15 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[cid.Cid
return nil, err
}
// get statuses from ipfs node first
localpis, err := spt.ipfsStatusAll(ctx)
var localpis map[cid.Cid]*api.PinInfo
// Only query IPFS if we want to status for pinned items
if filter.Match(api.TrackerStatusPinned | api.TrackerStatusUnexpectedlyUnpinned) {
localpis, err = spt.ipfsStatusAll(ctx)
if err != nil {
logger.Error(err)
return nil, err
}
}
pininfos := make(map[cid.Cid]*api.PinInfo, len(statePins))
for _, p := range statePins {
@ -509,25 +520,29 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[cid.Cid
switch {
case p.Type == api.MetaType:
if !incExtra || !filter.Match(api.TrackerStatusSharded) {
continue
}
pinInfo.Status = api.TrackerStatusSharded
if incExtra {
pininfos[p.Cid] = &pinInfo
}
case p.IsRemotePin(spt.peerID):
pinInfo.Status = api.TrackerStatusRemote
if incExtra {
pininfos[p.Cid] = &pinInfo
if !incExtra || !filter.Match(api.TrackerStatusRemote) {
continue
}
case pinnedInIpfs:
pinInfo.Status = api.TrackerStatusRemote
pininfos[p.Cid] = &pinInfo
case pinnedInIpfs: // always false unless filter matches TrackerStatusPinnned
ipfsInfo.Name = p.Name
pininfos[p.Cid] = ipfsInfo
default:
// report as PIN_ERROR for this peer. this will be
// overwritten if the operation tracker has more info
// for this (an ongoing pinning operation). Otherwise,
// it means something should be pinned and it is not
// known by IPFS. Should be handled to "recover".
pinInfo.Status = api.TrackerStatusPinError
// report as UNEXPECTEDLY_UNPINNED for this peer.
// this will be overwritten if the operation tracker
// has more info for this (an ongoing pinning
// operation). Otherwise, it means something should be
// pinned and it is not known by IPFS. Should be
// handled to "recover".
pinInfo.Status = api.TrackerStatusUnexpectedlyUnpinned
pinInfo.Error = errUnexpectedlyUnpinned.Error()
pininfos[p.Cid] = &pinInfo
}

View File

@ -397,7 +397,7 @@ func TestStatusAll(t *testing.T) {
// * A slow CID pinning
// * Cid1 is pinned
// * Cid4 should be in PinError (it's in the state but not on IPFS)
stAll := spt.StatusAll(ctx)
stAll := spt.StatusAll(ctx, api.TrackerStatusUndefined)
if len(stAll) != 3 {
t.Errorf("wrong status length. Expected 3, got: %d", len(stAll))
}
@ -409,8 +409,8 @@ func TestStatusAll(t *testing.T) {
t.Error("cid1 should be pinned")
}
case test.Cid4:
if pi.Status != api.TrackerStatusPinError {
t.Error("cid2 should be in pin_error status")
if pi.Status != api.TrackerStatusUnexpectedlyUnpinned {
t.Error("cid2 should be in unexpectedly_unpinned status")
}
case test.SlowCid1:
if pi.Status != api.TrackerStatusPinning {
@ -471,6 +471,6 @@ func BenchmarkTracker_localStatus(b *testing.B) {
ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
tracker.localStatus(ctx, true)
tracker.localStatus(ctx, true, api.TrackerStatusUndefined)
}
}

View File

@ -263,8 +263,8 @@ func (rpcapi *ClusterRPCAPI) Join(ctx context.Context, in api.Multiaddr, out *st
}
// StatusAll runs Cluster.StatusAll().
func (rpcapi *ClusterRPCAPI) StatusAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error {
pinfos, err := rpcapi.c.StatusAll(ctx)
func (rpcapi *ClusterRPCAPI) StatusAll(ctx context.Context, in api.TrackerStatus, out *[]*api.GlobalPinInfo) error {
pinfos, err := rpcapi.c.StatusAll(ctx, in)
if err != nil {
return err
}
@ -273,8 +273,8 @@ func (rpcapi *ClusterRPCAPI) StatusAll(ctx context.Context, in struct{}, out *[]
}
// StatusAllLocal runs Cluster.StatusAllLocal().
func (rpcapi *ClusterRPCAPI) StatusAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
pinfos := rpcapi.c.StatusAllLocal(ctx)
func (rpcapi *ClusterRPCAPI) StatusAllLocal(ctx context.Context, in api.TrackerStatus, out *[]*api.PinInfo) error {
pinfos := rpcapi.c.StatusAllLocal(ctx, in)
*out = pinfos
return nil
}
@ -452,10 +452,10 @@ func (rpcapi *PinTrackerRPCAPI) Untrack(ctx context.Context, in *api.Pin, out *s
}
// StatusAll runs PinTracker.StatusAll().
func (rpcapi *PinTrackerRPCAPI) StatusAll(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
func (rpcapi *PinTrackerRPCAPI) StatusAll(ctx context.Context, in api.TrackerStatus, out *[]*api.PinInfo) error {
ctx, span := trace.StartSpan(ctx, "rpc/tracker/StatusAll")
defer span.End()
*out = rpcapi.tracker.StatusAll(ctx)
*out = rpcapi.tracker.StatusAll(ctx, in)
return nil
}

View File

@ -219,9 +219,9 @@ func (mock *mockCluster) ConnectGraph(ctx context.Context, in struct{}, out *api
return nil
}
func (mock *mockCluster) StatusAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error {
func (mock *mockCluster) StatusAll(ctx context.Context, in api.TrackerStatus, out *[]*api.GlobalPinInfo) error {
pid := peer.Encode(PeerID1)
*out = []*api.GlobalPinInfo{
gPinInfos := []*api.GlobalPinInfo{
{
Cid: Cid1,
PeerMap: map[string]*api.PinInfoShort{
@ -250,10 +250,28 @@ func (mock *mockCluster) StatusAll(ctx context.Context, in struct{}, out *[]*api
},
},
}
// If there is no filter match, we will not return that status and we
// will not have an entry for that peer in the peerMap. In turn, when
// a single peer, we will not have an entry for the cid at all.
for _, gpi := range gPinInfos {
for id, pi := range gpi.PeerMap {
if !in.Match(pi.Status) {
delete(gpi.PeerMap, id)
}
}
}
filtered := make([]*api.GlobalPinInfo, 0, len(gPinInfos))
for _, gpi := range gPinInfos {
if len(gpi.PeerMap) > 0 {
filtered = append(filtered, gpi)
}
}
*out = filtered
return nil
}
func (mock *mockCluster) StatusAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
func (mock *mockCluster) StatusAllLocal(ctx context.Context, in api.TrackerStatus, out *[]*api.PinInfo) error {
return (&mockPinTracker{}).StatusAll(ctx, in, out)
}
@ -278,7 +296,7 @@ func (mock *mockCluster) StatusLocal(ctx context.Context, in cid.Cid, out *api.P
}
func (mock *mockCluster) RecoverAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error {
return mock.StatusAll(ctx, in, out)
return mock.StatusAll(ctx, api.TrackerStatusUndefined, out)
}
func (mock *mockCluster) RecoverAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
@ -367,8 +385,8 @@ func (mock *mockPinTracker) Untrack(ctx context.Context, in *api.Pin, out *struc
return nil
}
func (mock *mockPinTracker) StatusAll(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
*out = []*api.PinInfo{
func (mock *mockPinTracker) StatusAll(ctx context.Context, in api.TrackerStatus, out *[]*api.PinInfo) error {
pinInfos := []*api.PinInfo{
{
Cid: Cid1,
Peer: PeerID1,
@ -386,6 +404,14 @@ func (mock *mockPinTracker) StatusAll(ctx context.Context, in struct{}, out *[]*
},
},
}
filtered := make([]*api.PinInfo, 0, len(pinInfos))
for _, pi := range pinInfos {
if in.Match(pi.Status) {
filtered = append(filtered, pi)
}
}
*out = filtered
return nil
}