From 5fed4a2c5e4efffb9d42bce82abd6633aaef9e7e Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 10 Mar 2022 13:42:43 +0100 Subject: [PATCH] types: include IPFSAddresses in pinInfo objects. pinsvcapi: do not cache peer information here as all the needed information is in the status objects. This adds ipfs_addresses as a field broadcasted with the ping metrics. --- api/pinsvcapi/pinsvcapi.go | 121 +---- api/pinsvcapi/pinsvcapi_test.go | 749 +----------------------------- api/types.go | 15 +- cluster.go | 49 +- pintracker/stateless/stateless.go | 51 +- rpc_api.go | 9 +- test/rpc_api_mock.go | 6 +- util.go | 38 +- 8 files changed, 118 insertions(+), 920 deletions(-) diff --git a/api/pinsvcapi/pinsvcapi.go b/api/pinsvcapi/pinsvcapi.go index 15f49ba2..a3007618 100644 --- a/api/pinsvcapi/pinsvcapi.go +++ b/api/pinsvcapi/pinsvcapi.go @@ -10,9 +10,7 @@ import ( "context" "encoding/json" "errors" - "net" "net/http" - "sync" "time" "github.com/gorilla/mux" @@ -21,13 +19,10 @@ import ( "github.com/ipfs/ipfs-cluster/api/common" "github.com/ipfs/ipfs-cluster/api/pinsvcapi/pinsvc" "github.com/ipfs/ipfs-cluster/state" - "github.com/multiformats/go-multiaddr" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/host" - peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" - madns "github.com/multiformats/go-multiaddr-dns" ) var ( @@ -85,7 +80,6 @@ func svcPinToClusterPin(p pinsvc.Pin) (*types.Pin, error) { func globalPinInfoToSvcPinStatus( rID string, gpi types.GlobalPinInfo, - clusterIDs []*types.ID, ) pinsvc.PinStatus { status := pinsvc.PinStatus{ @@ -98,7 +92,6 @@ func globalPinInfoToSvcPinStatus( } status.Status = trackerStatusToSvcStatus(statusMask) - status.Created = time.Now() status.Pin = pinsvc.Pin{ Cid: gpi.Cid.String(), Name: gpi.Name, @@ -106,60 +99,19 @@ func globalPinInfoToSvcPinStatus( Meta: gpi.Metadata, } - delegates := []types.Multiaddr{} - idMap := make(map[peer.ID]*types.ID) - for _, clusterID := range clusterIDs { - idMap[clusterID.ID] = clusterID - } - filteredClusterIDs := clusterIDs - if len(gpi.Allocations) > 0 { - filteredClusterIDs = []*types.ID{} - for _, alloc := range gpi.Allocations { - clid, ok := idMap[alloc] - if ok && clid.Error == "" { - filteredClusterIDs = append(filteredClusterIDs, clid) - } + for _, pi := range gpi.PeerMap { + status.Delegates = append(status.Delegates, pi.IPFSAddresses...) + // Set created to the oldest known timestamp + if status.Created.IsZero() || pi.TS.Before(status.Created) { + status.Created = pi.TS } } - // Get the multiaddresses of the IPFS peers storing this content. - for _, clid := range filteredClusterIDs { - if clid.IPFS == nil { - continue // should not be - } - for _, ma := range clid.IPFS.Addresses { - if madns.Matches(ma.Value()) { // a dns multiaddress: take it - delegates = append(delegates, ma) - continue - } - - ip, err := ma.ValueForProtocol(multiaddr.P_IP4) - if err != nil { - ip, err = ma.ValueForProtocol(multiaddr.P_IP6) - if err != nil { - continue - } - } - // We have an IP in the multiaddress. Only include - // global unicast. - netip := net.ParseIP(ip) - if netip == nil { - continue - } - - if !netip.IsGlobalUnicast() { - continue - } - delegates = append(delegates, ma) - } - status.Delegates = delegates - } - status.Info = map[string]string{ "source": "IPFS cluster API", "warning1": "disregard created time", "warning2": "CID used for requestID. Conflicts possible", - "warning3": "experimenal", + "warning3": "experimental", } return status } @@ -171,10 +123,6 @@ type API struct { rpcClient *rpc.Client config *Config - - peersMux sync.RWMutex - peers []*types.ID - peersHaveBeenSet chan struct{} } // NewAPI creates a new REST API component. @@ -195,7 +143,6 @@ func NewAPIWithHost(ctx context.Context, cfg *Config, h host.Host) (*API, error) // Routes returns endpoints supported by this API. func (api *API) routes(c *rpc.Client) []common.Route { api.rpcClient = c - go api.refreshPeerset() return []common.Route{ { Name: "AddPin", @@ -230,56 +177,6 @@ func (api *API) routes(c *rpc.Client) []common.Route { } } -func (api *API) refreshPeerset() { - t := time.NewTimer(0) // fire asap - api.peersHaveBeenSet = make(chan struct{}) - - for range t.C { - select { - case <-api.Context().Done(): - return - default: - } - - logger.Debug("Fetching peers for caching") - ctx, cancel := context.WithTimeout(api.Context(), time.Minute) - var peers []*types.ID - err := api.rpcClient.CallContext( - ctx, - "", - "Cluster", - "Peers", - struct{}{}, - &peers, - ) - cancel() - - if err != nil { - logger.Errorf("error fetching peers for caching: %s", err) - t.Reset(10 * time.Second) - continue - } - - api.peersMux.Lock() - if api.peers == nil && peers != nil { - close(api.peersHaveBeenSet) - } - if peers != nil { - api.peers = peers - } - api.peersMux.Unlock() - t.Reset(time.Minute) - } -} - -func (api *API) getPeers() (peers []*types.ID) { - <-api.peersHaveBeenSet - - api.peersMux.RLock() - defer api.peersMux.RUnlock() - return api.peers -} - func (api *API) parseBodyOrFail(w http.ResponseWriter, r *http.Request) *pinsvc.Pin { dec := json.NewDecoder(r.Body) defer r.Body.Close() @@ -376,7 +273,7 @@ func (api *API) addPin(w http.ResponseWriter, r *http.Request) { time.Sleep(500 * time.Millisecond) } - status := globalPinInfoToSvcPinStatus(pinObj.Cid.String(), pinInfo, api.getPeers()) + status := globalPinInfoToSvcPinStatus(pinObj.Cid.String(), pinInfo) api.SendResponse(w, common.SetStatusAutomatically, nil, status) } } @@ -386,7 +283,7 @@ func (api *API) getPinObject(ctx context.Context, c cid.Cid) (pinsvc.PinStatus, if err != nil { return pinsvc.PinStatus{}, types.GlobalPinInfo{}, err } - return globalPinInfoToSvcPinStatus(c.String(), clusterPinStatus, api.getPeers()), clusterPinStatus, nil + return globalPinInfoToSvcPinStatus(c.String(), clusterPinStatus), clusterPinStatus, nil } @@ -462,7 +359,7 @@ func (api *API) listPins(w http.ResponseWriter, r *http.Request) { return } for i, gpi := range globalPinInfos { - st := globalPinInfoToSvcPinStatus(gpi.Cid.String(), *gpi, api.getPeers()) + st := globalPinInfoToSvcPinStatus(gpi.Cid.String(), *gpi) pinList.Results = append(pinList.Results, st) if i+1 == opts.Limit { break diff --git a/api/pinsvcapi/pinsvcapi_test.go b/api/pinsvcapi/pinsvcapi_test.go index f1c67cde..5742f2c6 100644 --- a/api/pinsvcapi/pinsvcapi_test.go +++ b/api/pinsvcapi/pinsvcapi_test.go @@ -2,19 +2,12 @@ package pinsvcapi import ( "context" - "fmt" - "io/ioutil" - "net/http" "testing" "time" - "github.com/ipfs/ipfs-cluster/api" - test "github.com/ipfs/ipfs-cluster/api/common/test" clustertest "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" libp2p "github.com/libp2p/go-libp2p" - peer "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" ) @@ -33,7 +26,7 @@ const ( func testAPIwithConfig(t *testing.T, cfg *Config, name string) *API { ctx := context.Background() apiMAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") - h, err := libp2p.New(ctx, libp2p.ListenAddrs(apiMAddr)) + h, err := libp2p.New(libp2p.ListenAddrs(apiMAddr)) if err != nil { t.Fatal(err) } @@ -62,743 +55,3 @@ func testAPI(t *testing.T) *API { return testAPIwithConfig(t, cfg, "basic") } - -func TestRestAPIIDEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - id := api.ID{} - test.MakeGet(t, rest, url(rest)+"/id", &id) - if id.ID.Pretty() != clustertest.PeerID1.Pretty() { - t.Error("expected correct id") - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIVersionEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - ver := api.Version{} - test.MakeGet(t, rest, url(rest)+"/version", &ver) - if ver.Version != "0.0.mock" { - t.Error("expected correct version") - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIPeerstEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - var list []*api.ID - test.MakeGet(t, rest, url(rest)+"/peers", &list) - if len(list) != 1 { - t.Fatal("expected 1 element") - } - if list[0].ID.Pretty() != clustertest.PeerID1.Pretty() { - t.Error("expected a different peer id list: ", list) - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIPeerAddEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - id := api.ID{} - // post with valid body - body := fmt.Sprintf("{\"peer_id\":\"%s\"}", clustertest.PeerID1.Pretty()) - t.Log(body) - test.MakePost(t, rest, url(rest)+"/peers", []byte(body), &id) - if id.ID.Pretty() != clustertest.PeerID1.Pretty() { - t.Error("expected correct ID") - } - if id.Error != "" { - t.Error("did not expect an error") - } - - // Send invalid body - errResp := api.Error{} - test.MakePost(t, rest, url(rest)+"/peers", []byte("oeoeoeoe"), &errResp) - if errResp.Code != 400 { - t.Error("expected error with bad body") - } - // Send invalid peer id - test.MakePost(t, rest, url(rest)+"/peers", []byte("{\"peer_id\": \"ab\"}"), &errResp) - if errResp.Code != 400 { - t.Error("expected error with bad peer_id") - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIAddFileEndpointBadContentType(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - fmtStr1 := "/add?shard=true&repl_min=-1&repl_max=-1" - localURL := url(rest) + fmtStr1 - - errResp := api.Error{} - test.MakePost(t, rest, localURL, []byte("test"), &errResp) - - if errResp.Code != 400 { - t.Error("expected error with bad content-type") - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIAddFileEndpointLocal(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - sth := clustertest.NewShardingTestHelper() - defer sth.Clean(t) - - // This generates the testing files and - // writes them to disk. - // This is necessary here because we run tests - // in parallel, and otherwise a write-race might happen. - _, closer := sth.GetTreeMultiReader(t) - closer.Close() - - tf := func(t *testing.T, url test.URLFunc) { - fmtStr1 := "/add?shard=false&repl_min=-1&repl_max=-1&stream-channels=true" - localURL := url(rest) + fmtStr1 - body, closer := sth.GetTreeMultiReader(t) - defer closer.Close() - resp := api.AddedOutput{} - mpContentType := "multipart/form-data; boundary=" + body.Boundary() - test.MakeStreamingPost(t, rest, localURL, body, mpContentType, &resp) - - // resp will contain the last object from the streaming - if resp.Cid.String() != clustertest.ShardingDirBalancedRootCID { - t.Error("Bad Cid after adding: ", resp.Cid) - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIAddFileEndpointShard(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - sth := clustertest.NewShardingTestHelper() - defer sth.Clean(t) - - // This generates the testing files and - // writes them to disk. - // This is necessary here because we run tests - // in parallel, and otherwise a write-race might happen. - _, closer := sth.GetTreeMultiReader(t) - closer.Close() - - tf := func(t *testing.T, url test.URLFunc) { - body, closer := sth.GetTreeMultiReader(t) - defer closer.Close() - mpContentType := "multipart/form-data; boundary=" + body.Boundary() - resp := api.AddedOutput{} - fmtStr1 := "/add?shard=true&repl_min=-1&repl_max=-1&stream-channels=true" - shardURL := url(rest) + fmtStr1 - test.MakeStreamingPost(t, rest, shardURL, body, mpContentType, &resp) - } - - test.BothEndpoints(t, tf) -} - -func TestAPIAddFileEndpoint_StreamChannelsFalse(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - sth := clustertest.NewShardingTestHelper() - defer sth.Clean(t) - - // This generates the testing files and - // writes them to disk. - // This is necessary here because we run tests - // in parallel, and otherwise a write-race might happen. - _, closer := sth.GetTreeMultiReader(t) - closer.Close() - - tf := func(t *testing.T, url test.URLFunc) { - body, closer := sth.GetTreeMultiReader(t) - defer closer.Close() - fullBody, err := ioutil.ReadAll(body) - if err != nil { - t.Fatal(err) - } - mpContentType := "multipart/form-data; boundary=" + body.Boundary() - resp := []api.AddedOutput{} - fmtStr1 := "/add?shard=false&repl_min=-1&repl_max=-1&stream-channels=false" - shardURL := url(rest) + fmtStr1 - - test.MakePostWithContentType(t, rest, shardURL, fullBody, mpContentType, &resp) - lastHash := resp[len(resp)-1] - if lastHash.Cid.String() != clustertest.ShardingDirBalancedRootCID { - t.Error("Bad Cid after adding: ", lastHash.Cid) - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIPeerRemoveEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - test.MakeDelete(t, rest, url(rest)+"/peers/"+clustertest.PeerID1.Pretty(), &struct{}{}) - } - - test.BothEndpoints(t, tf) -} - -func TestConnectGraphEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - var cg api.ConnectGraph - test.MakeGet(t, rest, url(rest)+"/health/graph", &cg) - if cg.ClusterID.Pretty() != clustertest.PeerID1.Pretty() { - t.Error("unexpected cluster id") - } - if len(cg.IPFSLinks) != 3 { - t.Error("unexpected number of ipfs peers") - } - if len(cg.ClusterLinks) != 3 { - t.Error("unexpected number of cluster peers") - } - if len(cg.ClustertoIPFS) != 3 { - t.Error("unexpected number of cluster to ipfs links") - } - // test a few link values - pid1 := clustertest.PeerID1 - pid4 := clustertest.PeerID4 - if _, ok := cg.ClustertoIPFS[peer.Encode(pid1)]; !ok { - t.Fatal("missing cluster peer 1 from cluster to peer links map") - } - if cg.ClustertoIPFS[peer.Encode(pid1)] != pid4 { - t.Error("unexpected ipfs peer mapped to cluster peer 1 in graph") - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIPinEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - // test regular post - test.MakePost(t, rest, url(rest)+"/pins/"+clustertest.Cid1.String(), []byte{}, &struct{}{}) - - errResp := api.Error{} - test.MakePost(t, rest, url(rest)+"/pins/"+clustertest.ErrorCid.String(), []byte{}, &errResp) - if errResp.Message != clustertest.ErrBadCid.Error() { - t.Error("expected different error: ", errResp.Message) - } - - test.MakePost(t, rest, url(rest)+"/pins/abcd", []byte{}, &errResp) - if errResp.Code != 400 { - t.Error("should fail with bad Cid") - } - } - - test.BothEndpoints(t, tf) -} - -type pathCase struct { - path string - opts api.PinOptions - wantErr bool - code int - expectedCid string -} - -func (p *pathCase) WithQuery(t *testing.T) string { - query, err := p.opts.ToQuery() - if err != nil { - t.Fatal(err) - } - return p.path + "?" + query -} - -var testPinOpts = api.PinOptions{ - ReplicationFactorMax: 7, - ReplicationFactorMin: 6, - Name: "hello there", - UserAllocations: []peer.ID{clustertest.PeerID1, clustertest.PeerID2}, - ExpireAt: time.Now().Add(30 * time.Second), -} - -var pathTestCases = []pathCase{ - { - "/ipfs/QmaNJ5acV31sx8jq626qTpAWW4DXKw34aGhx53dECLvXbY", - testPinOpts, - false, - http.StatusOK, - "QmaNJ5acV31sx8jq626qTpAWW4DXKw34aGhx53dECLvXbY", - }, - { - "/ipfs/QmbUNM297ZwxB8CfFAznK7H9YMesDoY6Tt5bPgt5MSCB2u/im.gif", - testPinOpts, - false, - http.StatusOK, - clustertest.CidResolved.String(), - }, - { - "/ipfs/invalidhash", - testPinOpts, - true, - http.StatusBadRequest, - "", - }, - { - "/ipfs/bafyreiay3jpjk74dkckv2r74eyvf3lfnxujefay2rtuluintasq2zlapv4", - testPinOpts, - true, - http.StatusNotFound, - "", - }, - // TODO: A case with trailing slash with paths - // clustertest.PathIPNS2, clustertest.PathIPLD2, clustertest.InvalidPath1 -} - -func TestAPIPinEndpointWithPath(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - for _, testCase := range pathTestCases[:3] { - c, _ := cid.Decode(testCase.expectedCid) - resultantPin := api.PinWithOpts( - c, - testPinOpts, - ) - - if testCase.wantErr { - errResp := api.Error{} - q := testCase.WithQuery(t) - test.MakePost(t, rest, url(rest)+"/pins"+q, []byte{}, &errResp) - if errResp.Code != testCase.code { - t.Errorf( - "status code: expected: %d, got: %d, path: %s\n", - testCase.code, - errResp.Code, - testCase.path, - ) - } - continue - } - pin := api.Pin{} - q := testCase.WithQuery(t) - test.MakePost(t, rest, url(rest)+"/pins"+q, []byte{}, &pin) - if !pin.Equals(resultantPin) { - t.Errorf("pin: expected: %+v", resultantPin) - t.Errorf("pin: got: %+v", pin) - t.Errorf("path: %s", testCase.path) - } - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIUnpinEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - // test regular delete - test.MakeDelete(t, rest, url(rest)+"/pins/"+clustertest.Cid1.String(), &struct{}{}) - - errResp := api.Error{} - test.MakeDelete(t, rest, url(rest)+"/pins/"+clustertest.ErrorCid.String(), &errResp) - if errResp.Message != clustertest.ErrBadCid.Error() { - t.Error("expected different error: ", errResp.Message) - } - - test.MakeDelete(t, rest, url(rest)+"/pins/"+clustertest.NotFoundCid.String(), &errResp) - if errResp.Code != http.StatusNotFound { - t.Error("expected different error code: ", errResp.Code) - } - - test.MakeDelete(t, rest, url(rest)+"/pins/abcd", &errResp) - if errResp.Code != 400 { - t.Error("expected different error code: ", errResp.Code) - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIUnpinEndpointWithPath(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - for _, testCase := range pathTestCases { - if testCase.wantErr { - errResp := api.Error{} - test.MakeDelete(t, rest, url(rest)+"/pins"+testCase.path, &errResp) - if errResp.Code != testCase.code { - t.Errorf( - "status code: expected: %d, got: %d, path: %s\n", - testCase.code, - errResp.Code, - testCase.path, - ) - } - continue - } - pin := api.Pin{} - test.MakeDelete(t, rest, url(rest)+"/pins"+testCase.path, &pin) - if pin.Cid.String() != testCase.expectedCid { - t.Errorf( - "cid: expected: %s, got: %s, path: %s\n", - clustertest.CidResolved, - pin.Cid, - testCase.path, - ) - } - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIAllocationsEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - var resp []*api.Pin - test.MakeGet(t, rest, url(rest)+"/allocations?filter=pin,meta-pin", &resp) - if len(resp) != 3 || - !resp[0].Cid.Equals(clustertest.Cid1) || !resp[1].Cid.Equals(clustertest.Cid2) || - !resp[2].Cid.Equals(clustertest.Cid3) { - t.Error("unexpected pin list: ", resp) - } - - test.MakeGet(t, rest, url(rest)+"/allocations", &resp) - if len(resp) != 3 || - !resp[0].Cid.Equals(clustertest.Cid1) || !resp[1].Cid.Equals(clustertest.Cid2) || - !resp[2].Cid.Equals(clustertest.Cid3) { - t.Error("unexpected pin list: ", resp) - } - - errResp := api.Error{} - test.MakeGet(t, rest, url(rest)+"/allocations?filter=invalid", &errResp) - if errResp.Code != http.StatusBadRequest { - t.Error("an invalid filter value should 400") - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIAllocationEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - var resp api.Pin - test.MakeGet(t, rest, url(rest)+"/allocations/"+clustertest.Cid1.String(), &resp) - if !resp.Cid.Equals(clustertest.Cid1) { - t.Errorf("cid should be the same: %s %s", resp.Cid, clustertest.Cid1) - } - - errResp := api.Error{} - test.MakeGet(t, rest, url(rest)+"/allocations/"+clustertest.ErrorCid.String(), &errResp) - if errResp.Code != 404 { - t.Error("a non-pinned cid should 404") - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIMetricsEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - var resp []*api.Metric - test.MakeGet(t, rest, url(rest)+"/monitor/metrics/somemetricstype", &resp) - if len(resp) == 0 { - t.Fatal("No metrics found") - } - for _, m := range resp { - if m.Name != "test" { - t.Error("Unexpected metric name: ", m.Name) - } - if m.Peer.Pretty() != clustertest.PeerID1.Pretty() { - t.Error("Unexpected peer id: ", m.Peer) - } - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIMetricNamesEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - var resp []string - test.MakeGet(t, rest, url(rest)+"/monitor/metrics", &resp) - if len(resp) == 0 { - t.Fatal("No metric names found") - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIAlertsEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - var resp []api.Alert - test.MakeGet(t, rest, url(rest)+"/health/alerts", &resp) - if len(resp) != 1 { - t.Error("expected one alert") - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIStatusAllEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - var resp []*api.GlobalPinInfo - test.MakeGet(t, rest, url(rest)+"/pins", &resp) - - if len(resp) != 3 || - !resp[0].Cid.Equals(clustertest.Cid1) || - resp[1].PeerMap[peer.Encode(clustertest.PeerID1)].Status.String() != "pinning" { - t.Errorf("unexpected statusAll resp") - } - - // Test local=true - var resp2 []*api.GlobalPinInfo - test.MakeGet(t, rest, url(rest)+"/pins?local=true", &resp2) - if len(resp2) != 2 { - t.Errorf("unexpected statusAll+local resp:\n %+v", resp2) - } - - // Test with filter - var resp3 []*api.GlobalPinInfo - test.MakeGet(t, rest, url(rest)+"/pins?filter=queued", &resp3) - if len(resp3) != 0 { - t.Errorf("unexpected statusAll+filter=queued resp:\n %+v", resp3) - } - - var resp4 []*api.GlobalPinInfo - test.MakeGet(t, rest, url(rest)+"/pins?filter=pinned", &resp4) - if len(resp4) != 1 { - t.Errorf("unexpected statusAll+filter=pinned resp:\n %+v", resp4) - } - - var resp5 []*api.GlobalPinInfo - test.MakeGet(t, rest, url(rest)+"/pins?filter=pin_error", &resp5) - if len(resp5) != 1 { - t.Errorf("unexpected statusAll+filter=pin_error resp:\n %+v", resp5) - } - - var resp6 []*api.GlobalPinInfo - test.MakeGet(t, rest, url(rest)+"/pins?filter=error", &resp6) - if len(resp6) != 1 { - t.Errorf("unexpected statusAll+filter=error resp:\n %+v", resp6) - } - - var resp7 []*api.GlobalPinInfo - test.MakeGet(t, rest, url(rest)+"/pins?filter=error,pinned", &resp7) - if len(resp7) != 2 { - t.Errorf("unexpected statusAll+filter=error,pinned resp:\n %+v", resp7) - } - - var errorResp api.Error - test.MakeGet(t, rest, url(rest)+"/pins?filter=invalid", &errorResp) - if errorResp.Code != http.StatusBadRequest { - t.Error("an invalid filter value should 400") - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIStatusEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - var resp api.GlobalPinInfo - test.MakeGet(t, rest, url(rest)+"/pins/"+clustertest.Cid1.String(), &resp) - - if !resp.Cid.Equals(clustertest.Cid1) { - t.Error("expected the same cid") - } - info, ok := resp.PeerMap[peer.Encode(clustertest.PeerID1)] - if !ok { - t.Fatal("expected info for clustertest.PeerID1") - } - if info.Status.String() != "pinned" { - t.Error("expected different status") - } - - // Test local=true - var resp2 api.GlobalPinInfo - test.MakeGet(t, rest, url(rest)+"/pins/"+clustertest.Cid1.String()+"?local=true", &resp2) - - if !resp2.Cid.Equals(clustertest.Cid1) { - t.Error("expected the same cid") - } - info, ok = resp2.PeerMap[peer.Encode(clustertest.PeerID2)] - if !ok { - t.Fatal("expected info for clustertest.PeerID2") - } - if info.Status.String() != "pinned" { - t.Error("expected different status") - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIRecoverEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - var resp api.GlobalPinInfo - test.MakePost(t, rest, url(rest)+"/pins/"+clustertest.Cid1.String()+"/recover", []byte{}, &resp) - - if !resp.Cid.Equals(clustertest.Cid1) { - t.Error("expected the same cid") - } - info, ok := resp.PeerMap[peer.Encode(clustertest.PeerID1)] - if !ok { - t.Fatal("expected info for clustertest.PeerID1") - } - if info.Status.String() != "pinned" { - t.Error("expected different status") - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIRecoverAllEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - tf := func(t *testing.T, url test.URLFunc) { - var resp []*api.GlobalPinInfo - test.MakePost(t, rest, url(rest)+"/pins/recover?local=true", []byte{}, &resp) - if len(resp) != 0 { - t.Fatal("bad response length") - } - - var resp1 []*api.GlobalPinInfo - test.MakePost(t, rest, url(rest)+"/pins/recover", []byte{}, &resp1) - if len(resp1) == 0 { - t.Fatal("bad response length") - } - } - - test.BothEndpoints(t, tf) -} - -func TestAPIIPFSGCEndpoint(t *testing.T) { - ctx := context.Background() - rest := testAPI(t) - defer rest.Shutdown(ctx) - - testGlobalRepoGC := func(t *testing.T, gRepoGC *api.GlobalRepoGC) { - if gRepoGC.PeerMap == nil { - t.Fatal("expected a non-nil peer map") - } - - if len(gRepoGC.PeerMap) != 1 { - t.Error("expected repo gc information for one peer") - } - - for _, repoGC := range gRepoGC.PeerMap { - if repoGC.Peer == "" { - t.Error("expected a cluster ID") - } - if repoGC.Error != "" { - t.Error("did not expect any error") - } - if repoGC.Keys == nil { - t.Fatal("expected a non-nil array of IPFSRepoGC") - } - if len(repoGC.Keys) == 0 { - t.Fatal("expected at least one key, but found none") - } - if !repoGC.Keys[0].Key.Equals(clustertest.Cid1) { - t.Errorf("expected a different cid, expected: %s, found: %s", clustertest.Cid1, repoGC.Keys[0].Key) - } - - } - } - - tf := func(t *testing.T, url test.URLFunc) { - var resp api.GlobalRepoGC - test.MakePost(t, rest, url(rest)+"/ipfs/gc?local=true", []byte{}, &resp) - testGlobalRepoGC(t, &resp) - - var resp1 api.GlobalRepoGC - test.MakePost(t, rest, url(rest)+"/ipfs/gc", []byte{}, &resp1) - testGlobalRepoGC(t, &resp1) - } - - test.BothEndpoints(t, tf) -} diff --git a/api/types.go b/api/types.go index ec2a5daf..f6ce95b3 100644 --- a/api/types.go +++ b/api/types.go @@ -303,13 +303,14 @@ func (gpi *GlobalPinInfo) Match(filter TrackerStatus) bool { // PinInfoShort is a subset of PinInfo which is embedded in GlobalPinInfo // objects and does not carry redundant information as PinInfo would. type PinInfoShort struct { - PeerName string `json:"peername" codec:"pn,omitempty"` - IPFS peer.ID `json:"ipfs_peer_id,omitempty" codec:"i,omitempty"` - Status TrackerStatus `json:"status" codec:"st,omitempty"` - TS time.Time `json:"timestamp" codec:"ts,omitempty"` - Error string `json:"error" codec:"e,omitempty"` - AttemptCount int `json:"attempt_count" codec:"a,omitempty"` - PriorityPin bool `json:"priority_pin" codec:"y,omitempty"` + PeerName string `json:"peername" codec:"pn,omitempty"` + IPFS peer.ID `json:"ipfs_peer_id,omitempty" codec:"i,omitempty"` + IPFSAddresses []Multiaddr `json:"ipfs_peer_addresses,omitempty" codec:"ia,omitempty"` + Status TrackerStatus `json:"status" codec:"st,omitempty"` + TS time.Time `json:"timestamp" codec:"ts,omitempty"` + Error string `json:"error" codec:"e,omitempty"` + AttemptCount int `json:"attempt_count" codec:"a,omitempty"` + PriorityPin bool `json:"priority_pin" codec:"y,omitempty"` } // PinInfo holds information about local pins. This is used by the Pin diff --git a/cluster.go b/cluster.go index cdd2cf2e..2c33b080 100644 --- a/cluster.go +++ b/cluster.go @@ -383,8 +383,9 @@ func (c *Cluster) sendPingMetric(ctx context.Context) (*api.Metric, error) { id := c.ID(ctx) newPingVal := pingValue{ - Peername: id.Peername, - IPFSID: id.IPFS.ID, + Peername: id.Peername, + IPFSID: id.IPFS.ID, + IPFSAddresses: publicIPFSAddresses(id.IPFS.Addresses), } if c.curPingVal.Valid() && !newPingVal.Valid() { // i.e. ipfs down @@ -1752,10 +1753,6 @@ func (c *Cluster) getTrustedPeers(ctx context.Context, exclude peer.ID) ([]peer. func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, pin api.Pin, t time.Time) { for _, p := range peers { pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, p)) - peerName := pv.Peername - if peerName == "" { - peerName = p.String() - } gpin.Add(api.PinInfo{ Cid: h, Name: pin.Name, @@ -1764,10 +1761,11 @@ func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []p Metadata: pin.Metadata, Peer: p, PinInfoShort: api.PinInfoShort{ - PeerName: pv.Peername, - IPFS: pv.IPFSID, - Status: status, - TS: t, + PeerName: pv.Peername, + IPFS: pv.IPFSID, + IPFSAddresses: pv.IPFSAddresses, + Status: status, + TS: t, }, }) } @@ -1883,10 +1881,6 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, dests[i], e) pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, dests[i])) - peerName := pv.Peername - if peerName == "" { - peerName = dests[i].String() - } gpin.Add(api.PinInfo{ Cid: h, Name: pin.Name, @@ -1895,11 +1889,12 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c Origins: pin.Origins, Metadata: pin.Metadata, PinInfoShort: api.PinInfoShort{ - PeerName: pv.Peername, - IPFS: pv.IPFSID, - Status: api.TrackerStatusClusterError, - TS: timeNow, - Error: e.Error(), + PeerName: pv.Peername, + IPFS: pv.IPFSID, + IPFSAddresses: pv.IPFSAddresses, + Status: api.TrackerStatusClusterError, + TS: timeNow, + Error: e.Error(), }, }) } @@ -1989,11 +1984,12 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string, a Origins: nil, Metadata: nil, PinInfoShort: api.PinInfoShort{ - PeerName: pv.Peername, - IPFS: pv.IPFSID, - Status: api.TrackerStatusClusterError, - TS: time.Now(), - Error: msg, + PeerName: pv.Peername, + IPFS: pv.IPFSID, + IPFSAddresses: pv.IPFSAddresses, + Status: api.TrackerStatusClusterError, + TS: time.Now(), + Error: msg, }, }) } @@ -2134,6 +2130,7 @@ func (c *Cluster) RepoGC(ctx context.Context) (*api.GlobalRepoGC, error) { // to club `RepoGCLocal` responses of all peers into one globalRepoGC := api.GlobalRepoGC{PeerMap: make(map[string]*api.RepoGC)} + for _, member := range members { var repoGC api.RepoGC err = c.rpcClient.CallContext( @@ -2156,9 +2153,11 @@ func (c *Cluster) RepoGC(ctx context.Context) (*api.GlobalRepoGC, error) { logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, member, err) + pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, member)) + globalRepoGC.PeerMap[peer.Encode(member)] = &api.RepoGC{ Peer: member, - Peername: peer.Encode(member), + Peername: pv.Peername, Keys: []api.IPFSRepoGC{}, Error: err.Error(), } diff --git a/pintracker/stateless/stateless.go b/pintracker/stateless/stateless.go index f0b9a54e..46c09016 100644 --- a/pintracker/stateless/stateless.go +++ b/pintracker/stateless/stateless.go @@ -86,20 +86,20 @@ func New(cfg *Config, pid peer.ID, peerName string, getState func(ctx context.Co // we can get our IPFS id from our own monitor ping metrics which // are refreshed regularly. -func (spt *Tracker) getIPFSID(ctx context.Context) peer.ID { +func (spt *Tracker) getIPFSID(ctx context.Context) api.IPFSID { // Wait until RPC is ready <-spt.rpcReady - var pid peer.ID + var ipfsid api.IPFSID spt.rpcClient.CallContext( ctx, "", "Cluster", "IPFSID", struct{}{}, - &pid, + &ipfsid, ) - return pid + return ipfsid } // receives a pin Function (pin or unpin) and channels. Used for both pinning @@ -335,7 +335,8 @@ func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus) []* // PinError. ipfsid := spt.getIPFSID(ctx) for _, infop := range spt.optracker.GetAll(ctx) { - infop.IPFS = ipfsid + infop.IPFS = ipfsid.ID + infop.IPFSAddresses = ipfsid.Addresses pininfos[infop.Cid] = infop } @@ -354,10 +355,13 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo { ctx, span := trace.StartSpan(ctx, "tracker/stateless/Status") defer span.End() + ipfsid := spt.getIPFSID(ctx) + // check if c has an inflight operation or errorred operation in optracker if oppi, ok := spt.optracker.GetExists(ctx, c); ok { // if it does return the status of the operation - oppi.IPFS = spt.getIPFSID(ctx) + oppi.IPFS = ipfsid.ID + oppi.IPFSAddresses = ipfsid.Addresses return oppi } @@ -366,11 +370,12 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo { Peer: spt.peerID, Name: "", // etc to be filled later PinInfoShort: api.PinInfoShort{ - PeerName: spt.peerName, - IPFS: spt.getIPFSID(ctx), - TS: time.Now(), - AttemptCount: 0, - PriorityPin: false, + PeerName: spt.peerName, + IPFS: ipfsid.ID, + IPFSAddresses: ipfsid.Addresses, + TS: time.Now(), + AttemptCount: 0, + PriorityPin: false, }, } @@ -547,12 +552,13 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[cid.Cid]*api.PinInfo Metadata: nil, // to be filled later Peer: spt.peerID, PinInfoShort: api.PinInfoShort{ - PeerName: spt.peerName, - IPFS: ipfsid, - Status: ips.ToTrackerStatus(), - TS: time.Now(), // to be set later - AttemptCount: 0, - PriorityPin: false, + PeerName: spt.peerName, + IPFS: ipfsid.ID, + IPFSAddresses: ipfsid.Addresses, + Status: ips.ToTrackerStatus(), + TS: time.Now(), // to be set later + AttemptCount: 0, + PriorityPin: false, }, } pins[c] = p @@ -615,11 +621,12 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool, filter api.T Origins: p.Origins, Metadata: p.Metadata, PinInfoShort: api.PinInfoShort{ - PeerName: spt.peerName, - IPFS: ipfsid, - TS: p.Timestamp, - AttemptCount: 0, - PriorityPin: false, + PeerName: spt.peerName, + IPFS: ipfsid.ID, + IPFSAddresses: ipfsid.Addresses, + TS: p.Timestamp, + AttemptCount: 0, + PriorityPin: false, }, } diff --git a/rpc_api.go b/rpc_api.go index eec0205b..d101fdfe 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -432,8 +432,13 @@ func (rpcapi *ClusterRPCAPI) Alerts(ctx context.Context, in struct{}, out *[]api } // IPFSID returns the current cached IPFS ID. -func (rpcapi *ClusterRPCAPI) IPFSID(ctx context.Context, in struct{}, out *peer.ID) error { - *out = rpcapi.c.curPingVal.IPFSID +func (rpcapi *ClusterRPCAPI) IPFSID(ctx context.Context, in struct{}, out *api.IPFSID) error { + pingVal := pingValueFromMetric(rpcapi.c.monitor.LatestForPeer(ctx, pingMetricName, rpcapi.c.host.ID())) + i := api.IPFSID{ + ID: pingVal.IPFSID, + Addresses: pingVal.IPFSAddresses, + } + *out = i return nil } diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index 6521df2e..928aec55 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -379,8 +379,10 @@ func (mock *mockCluster) Alerts(ctx context.Context, in struct{}, out *[]api.Ale return nil } -func (mock *mockCluster) IPFSID(ctx context.Context, in struct{}, out *peer.ID) error { - *out = PeerID1 +func (mock *mockCluster) IPFSID(ctx context.Context, in struct{}, out *api.IPFSID) error { + var id api.ID + _ = mock.ID(ctx, in, &id) + *out = id.IPFS return nil } diff --git a/util.go b/util.go index 52cbab9f..49c461a5 100644 --- a/util.go +++ b/util.go @@ -5,13 +5,16 @@ import ( "encoding/json" "errors" "fmt" + "net" blake2b "golang.org/x/crypto/blake2b" cid "github.com/ipfs/go-cid" "github.com/ipfs/ipfs-cluster/api" peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr" + madns "github.com/multiformats/go-multiaddr-dns" ) // PeersFromMultiaddrs returns all the different peers in the given addresses. @@ -162,8 +165,9 @@ func peersSubtract(a []peer.ID, b []peer.ID) []peer.ID { // pingValue describes the value carried by ping metrics type pingValue struct { - Peername string `json:"peer_name,omitempty"` - IPFSID peer.ID `json:"ipfs_id,omitempty"` + Peername string `json:"peer_name,omitempty"` + IPFSID peer.ID `json:"ipfs_id,omitempty"` + IPFSAddresses []api.Multiaddr `json:"ipfs_addresses,omitempty"` } // Valid returns true if the PingValue has IPFSID set. @@ -180,3 +184,33 @@ func pingValueFromMetric(m *api.Metric) (pv pingValue) { json.Unmarshal([]byte(m.Value), &pv) return } + +func publicIPFSAddresses(in []api.Multiaddr) []api.Multiaddr { + var out []api.Multiaddr + for _, ma := range in { + if madns.Matches(ma.Value()) { // a dns multiaddress: take it + out = append(out, ma) + continue + } + + ip, err := ma.ValueForProtocol(multiaddr.P_IP4) + if err != nil { + ip, err = ma.ValueForProtocol(multiaddr.P_IP6) + if err != nil { + continue + } + } + // We have an IP in the multiaddress. Only include + // global unicast. + netip := net.ParseIP(ip) + if netip == nil { + continue + } + + if !netip.IsGlobalUnicast() { + continue + } + out = append(out, ma) + } + return out +}