Merge pull request #1562 from ipfs/feat/filter-cids

Restapi: add "cids" query param to /pins
This commit is contained in:
Hector Sanjuan 2022-02-02 00:57:43 +01:00 committed by GitHub
commit 5221afb9b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 150 additions and 24 deletions

View File

@ -513,7 +513,7 @@ func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.Glob
if time.Now().After(wait.pinStart.Add(5 * time.Second)) { //pinned
*out = types.GlobalPinInfo{
Cid: in,
PeerMap: map[string]*types.PinInfoShort{
PeerMap: map[string]types.PinInfoShort{
peer.Encode(test.PeerID1): {
Status: types.TrackerStatusPinned,
TS: wait.pinStart,
@ -535,7 +535,7 @@ func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.Glob
} else { // pinning
*out = types.GlobalPinInfo{
Cid: in,
PeerMap: map[string]*types.PinInfoShort{
PeerMap: map[string]types.PinInfoShort{
peer.Encode(test.PeerID1): {
Status: types.TrackerStatusPinning,
TS: wait.pinStart,
@ -585,7 +585,7 @@ func (wait *waitServiceUnpin) Status(ctx context.Context, in cid.Cid, out *types
if time.Now().After(wait.unpinStart.Add(5 * time.Second)) { //unpinned
*out = types.GlobalPinInfo{
Cid: in,
PeerMap: map[string]*types.PinInfoShort{
PeerMap: map[string]types.PinInfoShort{
peer.Encode(test.PeerID1): {
Status: types.TrackerStatusUnpinned,
TS: wait.unpinStart,
@ -599,7 +599,7 @@ func (wait *waitServiceUnpin) Status(ctx context.Context, in cid.Cid, out *types
} else { // pinning
*out = types.GlobalPinInfo{
Cid: in,
PeerMap: map[string]*types.PinInfoShort{
PeerMap: map[string]types.PinInfoShort{
peer.Encode(test.PeerID1): {
Status: types.TrackerStatusUnpinning,
TS: wait.unpinStart,

View File

@ -10,15 +10,19 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net/http"
"strings"
"sync"
"time"
"github.com/ipfs/go-cid"
"github.com/ipfs/ipfs-cluster/adder/adderutils"
types "github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/api/common"
"github.com/ipfs/ipfs-cluster/state"
"go.uber.org/multierr"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/host"
@ -505,6 +509,11 @@ func (api *API) allocationHandler(w http.ResponseWriter, r *http.Request) {
func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
if queryValues.Get("cids") != "" {
api.statusCidsHandler(w, r)
return
}
local := queryValues.Get("local")
var globalPinInfos []*types.GlobalPinInfo
@ -550,6 +559,80 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
api.SendResponse(w, common.SetStatusAutomatically, nil, globalPinInfos)
}
// request statuses for multiple CIDs in parallel.
func (api *API) statusCidsHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
filterCidsStr := strings.Split(queryValues.Get("cids"), ",")
var cids []cid.Cid
for _, cidStr := range filterCidsStr {
c, err := cid.Decode(cidStr)
if err != nil {
api.SendResponse(w, http.StatusBadRequest, fmt.Errorf("error decoding Cid: %w", err), nil)
return
}
cids = append(cids, c)
}
local := queryValues.Get("local")
type gpiResult struct {
gpi types.GlobalPinInfo
err error
}
gpiCh := make(chan gpiResult, len(cids))
var wg sync.WaitGroup
wg.Add(len(cids))
// Close channel when done
go func() {
wg.Wait()
close(gpiCh)
}()
if local == "true" {
for _, ci := range cids {
go func(c cid.Cid) {
defer wg.Done()
var pinInfo types.PinInfo
err := api.rpcClient.CallContext(
r.Context(),
"",
"Cluster",
"StatusLocal",
c,
&pinInfo,
)
gpiCh <- gpiResult{gpi: pinInfo.ToGlobal(), err: err}
}(ci)
}
} else {
for _, ci := range cids {
go func(c cid.Cid) {
defer wg.Done()
var pinInfo types.GlobalPinInfo
err := api.rpcClient.CallContext(
r.Context(),
"",
"Cluster",
"Status",
c,
&pinInfo,
)
gpiCh <- gpiResult{gpi: pinInfo, err: err}
}(ci)
}
}
var gpis []types.GlobalPinInfo
var err error
for gpiResult := range gpiCh {
gpis = append(gpis, gpiResult.gpi)
err = multierr.Append(err, gpiResult.err)
}
api.SendResponse(w, common.SetStatusAutomatically, err, gpis)
}
func (api *API) statusHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
local := queryValues.Get("local")
@ -682,7 +765,8 @@ func repoGCToGlobal(r *types.RepoGC) types.GlobalRepoGC {
func pinInfosToGlobal(pInfos []*types.PinInfo) []*types.GlobalPinInfo {
gPInfos := make([]*types.GlobalPinInfo, len(pInfos))
for i, p := range pInfos {
gPInfos[i] = p.ToGlobal()
gpi := (*p).ToGlobal()
gPInfos[i] = &gpi
}
return gPInfos
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"testing"
"time"
@ -617,6 +618,7 @@ func TestAPIStatusAllEndpoint(t *testing.T) {
var resp []*api.GlobalPinInfo
test.MakeGet(t, rest, url(rest)+"/pins", &resp)
// mockPinTracker returns 3 items for Cluster.StatusAll
if len(resp) != 3 ||
!resp[0].Cid.Equals(clustertest.Cid1) ||
resp[1].PeerMap[peer.Encode(clustertest.PeerID1)].Status.String() != "pinning" {
@ -626,6 +628,8 @@ func TestAPIStatusAllEndpoint(t *testing.T) {
// Test local=true
var resp2 []*api.GlobalPinInfo
test.MakeGet(t, rest, url(rest)+"/pins?local=true", &resp2)
// mockPinTracker calls pintracker.StatusAll which returns 2
// items.
if len(resp2) != 2 {
t.Errorf("unexpected statusAll+local resp:\n %+v", resp2)
}
@ -671,6 +675,44 @@ func TestAPIStatusAllEndpoint(t *testing.T) {
test.BothEndpoints(t, tf)
}
func TestAPIStatusAllWithCidsEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var resp []*api.GlobalPinInfo
cids := []string{
clustertest.Cid1.String(),
clustertest.Cid2.String(),
clustertest.Cid3.String(),
clustertest.Cid4.String(),
}
test.MakeGet(t, rest, url(rest)+"/pins/?cids="+strings.Join(cids, ","), &resp)
if len(resp) != 4 {
t.Error("wrong number of responses")
}
// Test local=true
var resp2 []*api.GlobalPinInfo
test.MakeGet(t, rest, url(rest)+"/pins/?local=true&cids="+strings.Join(cids, ","), &resp2)
if len(resp2) != 4 {
t.Error("wrong number of responses")
}
// Test with an error
cids = append(cids, clustertest.ErrorCid.String())
var errorResp api.Error
test.MakeGet(t, rest, url(rest)+"/pins/?local=true&cids="+strings.Join(cids, ","), &errorResp)
if errorResp.Message != clustertest.ErrBadCid.Error() {
t.Error("expected an error")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIStatusEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)

View File

@ -255,31 +255,31 @@ type GlobalPinInfo struct {
// https://github.com/golang/go/issues/28827
// Peer IDs are of string Kind(). We can't use peer IDs here
// as Go ignores TextMarshaler.
PeerMap map[string]*PinInfoShort `json:"peer_map" codec:"pm,omitempty"`
PeerMap map[string]PinInfoShort `json:"peer_map" codec:"pm,omitempty"`
}
// String returns the string representation of a GlobalPinInfo.
func (gpi *GlobalPinInfo) String() string {
str := fmt.Sprintf("Cid: %v\n", gpi.Cid.String())
str = str + "Peer:\n"
for _, p := range gpi.PeerMap {
str = str + fmt.Sprintf("\t%+v\n", p)
str := fmt.Sprintf("Cid: %s\n", gpi.Cid)
str = str + "Peers:\n"
for pid, p := range gpi.PeerMap {
str = str + fmt.Sprintf("\t%s: %+v\n", pid, p)
}
return str
}
// Add adds a PinInfo object to a GlobalPinInfo
func (gpi *GlobalPinInfo) Add(pi *PinInfo) {
func (gpi *GlobalPinInfo) Add(pi PinInfo) {
if !gpi.Cid.Defined() {
gpi.Cid = pi.Cid
gpi.Name = pi.Name
}
if gpi.PeerMap == nil {
gpi.PeerMap = make(map[string]*PinInfoShort)
gpi.PeerMap = make(map[string]PinInfoShort)
}
gpi.PeerMap[peer.Encode(pi.Peer)] = &pi.PinInfoShort
gpi.PeerMap[peer.Encode(pi.Peer)] = pi.PinInfoShort
}
// PinInfoShort is a subset of PinInfo which is embedded in GlobalPinInfo
@ -306,10 +306,10 @@ type PinInfo struct {
// ToGlobal converts a PinInfo object to a GlobalPinInfo with
// a single peer corresponding to the given PinInfo.
func (pi *PinInfo) ToGlobal() *GlobalPinInfo {
gpi := GlobalPinInfo{}
func (pi PinInfo) ToGlobal() GlobalPinInfo {
gpi := &GlobalPinInfo{}
gpi.Add(pi)
return &gpi
return *gpi
}
// Version holds version information

View File

@ -1751,7 +1751,7 @@ func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []p
if peerName == "" {
peerName = p.String()
}
gpin.Add(&api.PinInfo{
gpin.Add(api.PinInfo{
Cid: h,
Name: name,
Peer: p,
@ -1862,7 +1862,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
// No error. Parse and continue
if e == nil {
gpin.Add(r)
gpin.Add(*r)
continue
}
@ -1879,7 +1879,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
if peerName == "" {
peerName = dests[i].String()
}
gpin.Add(&api.PinInfo{
gpin.Add(api.PinInfo{
Cid: h,
Name: pin.Name,
Peer: dests[i],
@ -1946,7 +1946,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string, a
info = &api.GlobalPinInfo{}
fullMap[p.Cid] = info
}
info.Add(p)
info.Add(*p)
}
erroredPeers := make(map[peer.ID]string)

View File

@ -224,7 +224,7 @@ func (mock *mockCluster) StatusAll(ctx context.Context, in api.TrackerStatus, ou
gPinInfos := []*api.GlobalPinInfo{
{
Cid: Cid1,
PeerMap: map[string]*api.PinInfoShort{
PeerMap: map[string]api.PinInfoShort{
pid: {
Status: api.TrackerStatusPinned,
TS: time.Now(),
@ -233,7 +233,7 @@ func (mock *mockCluster) StatusAll(ctx context.Context, in api.TrackerStatus, ou
},
{
Cid: Cid2,
PeerMap: map[string]*api.PinInfoShort{
PeerMap: map[string]api.PinInfoShort{
pid: {
Status: api.TrackerStatusPinning,
TS: time.Now(),
@ -242,7 +242,7 @@ func (mock *mockCluster) StatusAll(ctx context.Context, in api.TrackerStatus, ou
},
{
Cid: Cid3,
PeerMap: map[string]*api.PinInfoShort{
PeerMap: map[string]api.PinInfoShort{
pid: {
Status: api.TrackerStatusPinError,
TS: time.Now(),
@ -281,7 +281,7 @@ func (mock *mockCluster) Status(ctx context.Context, in cid.Cid, out *api.Global
}
*out = api.GlobalPinInfo{
Cid: in,
PeerMap: map[string]*api.PinInfoShort{
PeerMap: map[string]api.PinInfoShort{
peer.Encode(PeerID1): {
Status: api.TrackerStatusPinned,
TS: time.Now(),