diff --git a/api/rest/client/methods_test.go b/api/rest/client/methods_test.go index 7c482cd4..0c39e553 100644 --- a/api/rest/client/methods_test.go +++ b/api/rest/client/methods_test.go @@ -513,7 +513,7 @@ func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.Glob if time.Now().After(wait.pinStart.Add(5 * time.Second)) { //pinned *out = types.GlobalPinInfo{ Cid: in, - PeerMap: map[string]*types.PinInfoShort{ + PeerMap: map[string]types.PinInfoShort{ peer.Encode(test.PeerID1): { Status: types.TrackerStatusPinned, TS: wait.pinStart, @@ -535,7 +535,7 @@ func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.Glob } else { // pinning *out = types.GlobalPinInfo{ Cid: in, - PeerMap: map[string]*types.PinInfoShort{ + PeerMap: map[string]types.PinInfoShort{ peer.Encode(test.PeerID1): { Status: types.TrackerStatusPinning, TS: wait.pinStart, @@ -585,7 +585,7 @@ func (wait *waitServiceUnpin) Status(ctx context.Context, in cid.Cid, out *types if time.Now().After(wait.unpinStart.Add(5 * time.Second)) { //unpinned *out = types.GlobalPinInfo{ Cid: in, - PeerMap: map[string]*types.PinInfoShort{ + PeerMap: map[string]types.PinInfoShort{ peer.Encode(test.PeerID1): { Status: types.TrackerStatusUnpinned, TS: wait.unpinStart, @@ -599,7 +599,7 @@ func (wait *waitServiceUnpin) Status(ctx context.Context, in cid.Cid, out *types } else { // pinning *out = types.GlobalPinInfo{ Cid: in, - PeerMap: map[string]*types.PinInfoShort{ + PeerMap: map[string]types.PinInfoShort{ peer.Encode(test.PeerID1): { Status: types.TrackerStatusUnpinning, TS: wait.unpinStart, diff --git a/api/rest/restapi.go b/api/rest/restapi.go index 04450adf..f608786f 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -10,15 +10,19 @@ import ( "context" "encoding/json" "errors" + "fmt" "math/rand" "net/http" "strings" + "sync" "time" + "github.com/ipfs/go-cid" "github.com/ipfs/ipfs-cluster/adder/adderutils" types "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api/common" "github.com/ipfs/ipfs-cluster/state" + "go.uber.org/multierr" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/host" @@ -505,6 +509,11 @@ func (api *API) allocationHandler(w http.ResponseWriter, r *http.Request) { func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) { queryValues := r.URL.Query() + if queryValues.Get("cids") != "" { + api.statusCidsHandler(w, r) + return + } + local := queryValues.Get("local") var globalPinInfos []*types.GlobalPinInfo @@ -550,6 +559,80 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) { api.SendResponse(w, common.SetStatusAutomatically, nil, globalPinInfos) } +// request statuses for multiple CIDs in parallel. +func (api *API) statusCidsHandler(w http.ResponseWriter, r *http.Request) { + queryValues := r.URL.Query() + filterCidsStr := strings.Split(queryValues.Get("cids"), ",") + var cids []cid.Cid + + for _, cidStr := range filterCidsStr { + c, err := cid.Decode(cidStr) + if err != nil { + api.SendResponse(w, http.StatusBadRequest, fmt.Errorf("error decoding Cid: %w", err), nil) + return + } + cids = append(cids, c) + } + + local := queryValues.Get("local") + + type gpiResult struct { + gpi types.GlobalPinInfo + err error + } + gpiCh := make(chan gpiResult, len(cids)) + var wg sync.WaitGroup + wg.Add(len(cids)) + + // Close channel when done + go func() { + wg.Wait() + close(gpiCh) + }() + + if local == "true" { + for _, ci := range cids { + go func(c cid.Cid) { + defer wg.Done() + var pinInfo types.PinInfo + err := api.rpcClient.CallContext( + r.Context(), + "", + "Cluster", + "StatusLocal", + c, + &pinInfo, + ) + gpiCh <- gpiResult{gpi: pinInfo.ToGlobal(), err: err} + }(ci) + } + } else { + for _, ci := range cids { + go func(c cid.Cid) { + defer wg.Done() + var pinInfo types.GlobalPinInfo + err := api.rpcClient.CallContext( + r.Context(), + "", + "Cluster", + "Status", + c, + &pinInfo, + ) + gpiCh <- gpiResult{gpi: pinInfo, err: err} + }(ci) + } + } + + var gpis []types.GlobalPinInfo + var err error + for gpiResult := range gpiCh { + gpis = append(gpis, gpiResult.gpi) + err = multierr.Append(err, gpiResult.err) + } + api.SendResponse(w, common.SetStatusAutomatically, err, gpis) +} + func (api *API) statusHandler(w http.ResponseWriter, r *http.Request) { queryValues := r.URL.Query() local := queryValues.Get("local") @@ -682,7 +765,8 @@ func repoGCToGlobal(r *types.RepoGC) types.GlobalRepoGC { func pinInfosToGlobal(pInfos []*types.PinInfo) []*types.GlobalPinInfo { gPInfos := make([]*types.GlobalPinInfo, len(pInfos)) for i, p := range pInfos { - gPInfos[i] = p.ToGlobal() + gpi := (*p).ToGlobal() + gPInfos[i] = &gpi } return gPInfos } diff --git a/api/rest/restapi_test.go b/api/rest/restapi_test.go index 3f1144ef..5647320d 100644 --- a/api/rest/restapi_test.go +++ b/api/rest/restapi_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "net/http" + "strings" "testing" "time" @@ -617,6 +618,7 @@ func TestAPIStatusAllEndpoint(t *testing.T) { var resp []*api.GlobalPinInfo test.MakeGet(t, rest, url(rest)+"/pins", &resp) + // mockPinTracker returns 3 items for Cluster.StatusAll if len(resp) != 3 || !resp[0].Cid.Equals(clustertest.Cid1) || resp[1].PeerMap[peer.Encode(clustertest.PeerID1)].Status.String() != "pinning" { @@ -626,6 +628,8 @@ func TestAPIStatusAllEndpoint(t *testing.T) { // Test local=true var resp2 []*api.GlobalPinInfo test.MakeGet(t, rest, url(rest)+"/pins?local=true", &resp2) + // mockPinTracker calls pintracker.StatusAll which returns 2 + // items. if len(resp2) != 2 { t.Errorf("unexpected statusAll+local resp:\n %+v", resp2) } @@ -671,6 +675,44 @@ func TestAPIStatusAllEndpoint(t *testing.T) { test.BothEndpoints(t, tf) } +func TestAPIStatusAllWithCidsEndpoint(t *testing.T) { + ctx := context.Background() + rest := testAPI(t) + defer rest.Shutdown(ctx) + + tf := func(t *testing.T, url test.URLFunc) { + var resp []*api.GlobalPinInfo + cids := []string{ + clustertest.Cid1.String(), + clustertest.Cid2.String(), + clustertest.Cid3.String(), + clustertest.Cid4.String(), + } + test.MakeGet(t, rest, url(rest)+"/pins/?cids="+strings.Join(cids, ","), &resp) + + if len(resp) != 4 { + t.Error("wrong number of responses") + } + + // Test local=true + var resp2 []*api.GlobalPinInfo + test.MakeGet(t, rest, url(rest)+"/pins/?local=true&cids="+strings.Join(cids, ","), &resp2) + if len(resp2) != 4 { + t.Error("wrong number of responses") + } + + // Test with an error + cids = append(cids, clustertest.ErrorCid.String()) + var errorResp api.Error + test.MakeGet(t, rest, url(rest)+"/pins/?local=true&cids="+strings.Join(cids, ","), &errorResp) + if errorResp.Message != clustertest.ErrBadCid.Error() { + t.Error("expected an error") + } + } + + test.BothEndpoints(t, tf) +} + func TestAPIStatusEndpoint(t *testing.T) { ctx := context.Background() rest := testAPI(t) diff --git a/api/types.go b/api/types.go index ca8a4397..1003d475 100644 --- a/api/types.go +++ b/api/types.go @@ -255,31 +255,31 @@ type GlobalPinInfo struct { // https://github.com/golang/go/issues/28827 // Peer IDs are of string Kind(). We can't use peer IDs here // as Go ignores TextMarshaler. - PeerMap map[string]*PinInfoShort `json:"peer_map" codec:"pm,omitempty"` + PeerMap map[string]PinInfoShort `json:"peer_map" codec:"pm,omitempty"` } // String returns the string representation of a GlobalPinInfo. func (gpi *GlobalPinInfo) String() string { - str := fmt.Sprintf("Cid: %v\n", gpi.Cid.String()) - str = str + "Peer:\n" - for _, p := range gpi.PeerMap { - str = str + fmt.Sprintf("\t%+v\n", p) + str := fmt.Sprintf("Cid: %s\n", gpi.Cid) + str = str + "Peers:\n" + for pid, p := range gpi.PeerMap { + str = str + fmt.Sprintf("\t%s: %+v\n", pid, p) } return str } // Add adds a PinInfo object to a GlobalPinInfo -func (gpi *GlobalPinInfo) Add(pi *PinInfo) { +func (gpi *GlobalPinInfo) Add(pi PinInfo) { if !gpi.Cid.Defined() { gpi.Cid = pi.Cid gpi.Name = pi.Name } if gpi.PeerMap == nil { - gpi.PeerMap = make(map[string]*PinInfoShort) + gpi.PeerMap = make(map[string]PinInfoShort) } - gpi.PeerMap[peer.Encode(pi.Peer)] = &pi.PinInfoShort + gpi.PeerMap[peer.Encode(pi.Peer)] = pi.PinInfoShort } // PinInfoShort is a subset of PinInfo which is embedded in GlobalPinInfo @@ -306,10 +306,10 @@ type PinInfo struct { // ToGlobal converts a PinInfo object to a GlobalPinInfo with // a single peer corresponding to the given PinInfo. -func (pi *PinInfo) ToGlobal() *GlobalPinInfo { - gpi := GlobalPinInfo{} +func (pi PinInfo) ToGlobal() GlobalPinInfo { + gpi := &GlobalPinInfo{} gpi.Add(pi) - return &gpi + return *gpi } // Version holds version information diff --git a/cluster.go b/cluster.go index c48144c5..22d3ce7f 100644 --- a/cluster.go +++ b/cluster.go @@ -1751,7 +1751,7 @@ func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []p if peerName == "" { peerName = p.String() } - gpin.Add(&api.PinInfo{ + gpin.Add(api.PinInfo{ Cid: h, Name: name, Peer: p, @@ -1862,7 +1862,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c // No error. Parse and continue if e == nil { - gpin.Add(r) + gpin.Add(*r) continue } @@ -1879,7 +1879,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c if peerName == "" { peerName = dests[i].String() } - gpin.Add(&api.PinInfo{ + gpin.Add(api.PinInfo{ Cid: h, Name: pin.Name, Peer: dests[i], @@ -1946,7 +1946,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string, a info = &api.GlobalPinInfo{} fullMap[p.Cid] = info } - info.Add(p) + info.Add(*p) } erroredPeers := make(map[peer.ID]string) diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index 47f46006..899add40 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -224,7 +224,7 @@ func (mock *mockCluster) StatusAll(ctx context.Context, in api.TrackerStatus, ou gPinInfos := []*api.GlobalPinInfo{ { Cid: Cid1, - PeerMap: map[string]*api.PinInfoShort{ + PeerMap: map[string]api.PinInfoShort{ pid: { Status: api.TrackerStatusPinned, TS: time.Now(), @@ -233,7 +233,7 @@ func (mock *mockCluster) StatusAll(ctx context.Context, in api.TrackerStatus, ou }, { Cid: Cid2, - PeerMap: map[string]*api.PinInfoShort{ + PeerMap: map[string]api.PinInfoShort{ pid: { Status: api.TrackerStatusPinning, TS: time.Now(), @@ -242,7 +242,7 @@ func (mock *mockCluster) StatusAll(ctx context.Context, in api.TrackerStatus, ou }, { Cid: Cid3, - PeerMap: map[string]*api.PinInfoShort{ + PeerMap: map[string]api.PinInfoShort{ pid: { Status: api.TrackerStatusPinError, TS: time.Now(), @@ -281,7 +281,7 @@ func (mock *mockCluster) Status(ctx context.Context, in cid.Cid, out *api.Global } *out = api.GlobalPinInfo{ Cid: in, - PeerMap: map[string]*api.PinInfoShort{ + PeerMap: map[string]api.PinInfoShort{ peer.Encode(PeerID1): { Status: api.TrackerStatusPinned, TS: time.Now(),