types: include IPFSAddresses in pinInfo objects.

pinsvcapi: do not cache peer information here as all the needed information is
in the status objects.

This adds ipfs_addresses as a field broadcast with the ping metrics.
Hector Sanjuan 2022-03-10 13:42:43 +01:00
parent 5b0d9d68e3
commit 5fed4a2c5e
8 changed files with 118 additions and 920 deletions
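
For context, the ping metric value is a JSON-encoded pingValue (see the util.go changes at the end of this diff). A minimal sketch of decoding it, with hypothetical peer name, ID, and address values:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pingValue mirrors the JSON shape broadcast with ping metrics after this
// change; the real struct lives in util.go (IPFSAddresses is the new field).
type pingValue struct {
	Peername      string   `json:"peer_name,omitempty"`
	IPFSID        string   `json:"ipfs_id,omitempty"`
	IPFSAddresses []string `json:"ipfs_addresses,omitempty"`
}

func main() {
	// Hypothetical metric value as received from a peer.
	raw := `{"peer_name":"peer-1","ipfs_id":"QmExampleIPFSID","ipfs_addresses":["/dns4/node.example.com/tcp/4001/p2p/QmExampleIPFSID"]}`

	var pv pingValue
	if err := json.Unmarshal([]byte(raw), &pv); err != nil {
		panic(err)
	}
	fmt.Println(pv.IPFSAddresses) // usable directly as pinning-service delegates
}
```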


@ -10,9 +10,7 @@ import (
"context"
"encoding/json"
"errors"
"net"
"net/http"
"sync"
"time"
"github.com/gorilla/mux"
@ -21,13 +19,10 @@ import (
"github.com/ipfs/ipfs-cluster/api/common"
"github.com/ipfs/ipfs-cluster/api/pinsvcapi/pinsvc"
"github.com/ipfs/ipfs-cluster/state"
"github.com/multiformats/go-multiaddr"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
madns "github.com/multiformats/go-multiaddr-dns"
)
var (
@ -85,7 +80,6 @@ func svcPinToClusterPin(p pinsvc.Pin) (*types.Pin, error) {
func globalPinInfoToSvcPinStatus(
rID string,
gpi types.GlobalPinInfo,
clusterIDs []*types.ID,
) pinsvc.PinStatus {
status := pinsvc.PinStatus{
@ -98,7 +92,6 @@ func globalPinInfoToSvcPinStatus(
}
status.Status = trackerStatusToSvcStatus(statusMask)
status.Created = time.Now()
status.Pin = pinsvc.Pin{
Cid: gpi.Cid.String(),
Name: gpi.Name,
@ -106,60 +99,19 @@ func globalPinInfoToSvcPinStatus(
Meta: gpi.Metadata,
}
delegates := []types.Multiaddr{}
idMap := make(map[peer.ID]*types.ID)
for _, clusterID := range clusterIDs {
idMap[clusterID.ID] = clusterID
}
filteredClusterIDs := clusterIDs
if len(gpi.Allocations) > 0 {
filteredClusterIDs = []*types.ID{}
for _, alloc := range gpi.Allocations {
clid, ok := idMap[alloc]
if ok && clid.Error == "" {
filteredClusterIDs = append(filteredClusterIDs, clid)
}
for _, pi := range gpi.PeerMap {
status.Delegates = append(status.Delegates, pi.IPFSAddresses...)
// Set created to the oldest known timestamp
if status.Created.IsZero() || pi.TS.Before(status.Created) {
status.Created = pi.TS
}
}
// Get the multiaddresses of the IPFS peers storing this content.
for _, clid := range filteredClusterIDs {
if clid.IPFS == nil {
continue // should not be
}
for _, ma := range clid.IPFS.Addresses {
if madns.Matches(ma.Value()) { // a dns multiaddress: take it
delegates = append(delegates, ma)
continue
}
ip, err := ma.ValueForProtocol(multiaddr.P_IP4)
if err != nil {
ip, err = ma.ValueForProtocol(multiaddr.P_IP6)
if err != nil {
continue
}
}
// We have an IP in the multiaddress. Only include
// global unicast.
netip := net.ParseIP(ip)
if netip == nil {
continue
}
if !netip.IsGlobalUnicast() {
continue
}
delegates = append(delegates, ma)
}
status.Delegates = delegates
}
status.Info = map[string]string{
"source": "IPFS cluster API",
"warning1": "disregard created time",
"warning2": "CID used for requestID. Conflicts possible",
"warning3": "experimenal",
"warning3": "experimental",
}
return status
}
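
Since delegates are now taken from the per-peer IPFSAddresses in the status objects, a pinning-service client sees them directly in the PinStatus response. A hedged client-side sketch (field names follow the IPFS Pinning Service API spec; the response body is hypothetical):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Subset of the pinning-service PinStatus response used here.
type pinStatus struct {
	Status    string   `json:"status"`
	Created   string   `json:"created"`
	Delegates []string `json:"delegates"`
}

func main() {
	// Hypothetical body of GET /pins/{requestid}.
	body := `{"status":"pinned","created":"2022-03-10T12:00:00Z","delegates":["/dns4/node.example.com/tcp/4001/p2p/QmExampleIPFSID"]}`

	var st pinStatus
	if err := json.Unmarshal([]byte(body), &st); err != nil {
		panic(err)
	}
	// Clients may connect to these addresses so content transfers directly
	// to/from the IPFS daemons holding the pin.
	for _, d := range st.Delegates {
		fmt.Println("delegate:", d)
	}
}
```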
@ -171,10 +123,6 @@ type API struct {
rpcClient *rpc.Client
config *Config
peersMux sync.RWMutex
peers []*types.ID
peersHaveBeenSet chan struct{}
}
// NewAPI creates a new REST API component.
@ -195,7 +143,6 @@ func NewAPIWithHost(ctx context.Context, cfg *Config, h host.Host) (*API, error)
// Routes returns endpoints supported by this API.
func (api *API) routes(c *rpc.Client) []common.Route {
api.rpcClient = c
go api.refreshPeerset()
return []common.Route{
{
Name: "AddPin",
@ -230,56 +177,6 @@ func (api *API) routes(c *rpc.Client) []common.Route {
}
}
func (api *API) refreshPeerset() {
t := time.NewTimer(0) // fire asap
api.peersHaveBeenSet = make(chan struct{})
for range t.C {
select {
case <-api.Context().Done():
return
default:
}
logger.Debug("Fetching peers for caching")
ctx, cancel := context.WithTimeout(api.Context(), time.Minute)
var peers []*types.ID
err := api.rpcClient.CallContext(
ctx,
"",
"Cluster",
"Peers",
struct{}{},
&peers,
)
cancel()
if err != nil {
logger.Errorf("error fetching peers for caching: %s", err)
t.Reset(10 * time.Second)
continue
}
api.peersMux.Lock()
if api.peers == nil && peers != nil {
close(api.peersHaveBeenSet)
}
if peers != nil {
api.peers = peers
}
api.peersMux.Unlock()
t.Reset(time.Minute)
}
}
func (api *API) getPeers() (peers []*types.ID) {
<-api.peersHaveBeenSet
api.peersMux.RLock()
defer api.peersMux.RUnlock()
return api.peers
}
func (api *API) parseBodyOrFail(w http.ResponseWriter, r *http.Request) *pinsvc.Pin {
dec := json.NewDecoder(r.Body)
defer r.Body.Close()
@ -376,7 +273,7 @@ func (api *API) addPin(w http.ResponseWriter, r *http.Request) {
time.Sleep(500 * time.Millisecond)
}
status := globalPinInfoToSvcPinStatus(pinObj.Cid.String(), pinInfo, api.getPeers())
status := globalPinInfoToSvcPinStatus(pinObj.Cid.String(), pinInfo)
api.SendResponse(w, common.SetStatusAutomatically, nil, status)
}
}
@ -386,7 +283,7 @@ func (api *API) getPinObject(ctx context.Context, c cid.Cid) (pinsvc.PinStatus,
if err != nil {
return pinsvc.PinStatus{}, types.GlobalPinInfo{}, err
}
return globalPinInfoToSvcPinStatus(c.String(), clusterPinStatus, api.getPeers()), clusterPinStatus, nil
return globalPinInfoToSvcPinStatus(c.String(), clusterPinStatus), clusterPinStatus, nil
}
@ -462,7 +359,7 @@ func (api *API) listPins(w http.ResponseWriter, r *http.Request) {
return
}
for i, gpi := range globalPinInfos {
st := globalPinInfoToSvcPinStatus(gpi.Cid.String(), *gpi, api.getPeers())
st := globalPinInfoToSvcPinStatus(gpi.Cid.String(), *gpi)
pinList.Results = append(pinList.Results, st)
if i+1 == opts.Limit {
break


@ -2,19 +2,12 @@ package pinsvcapi
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
test "github.com/ipfs/ipfs-cluster/api/common/test"
clustertest "github.com/ipfs/ipfs-cluster/test"
cid "github.com/ipfs/go-cid"
libp2p "github.com/libp2p/go-libp2p"
peer "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
@ -33,7 +26,7 @@ const (
func testAPIwithConfig(t *testing.T, cfg *Config, name string) *API {
ctx := context.Background()
apiMAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
h, err := libp2p.New(ctx, libp2p.ListenAddrs(apiMAddr))
h, err := libp2p.New(libp2p.ListenAddrs(apiMAddr))
if err != nil {
t.Fatal(err)
}
@ -62,743 +55,3 @@ func testAPI(t *testing.T) *API {
return testAPIwithConfig(t, cfg, "basic")
}
func TestRestAPIIDEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
id := api.ID{}
test.MakeGet(t, rest, url(rest)+"/id", &id)
if id.ID.Pretty() != clustertest.PeerID1.Pretty() {
t.Error("expected correct id")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIVersionEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
ver := api.Version{}
test.MakeGet(t, rest, url(rest)+"/version", &ver)
if ver.Version != "0.0.mock" {
t.Error("expected correct version")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIPeerstEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var list []*api.ID
test.MakeGet(t, rest, url(rest)+"/peers", &list)
if len(list) != 1 {
t.Fatal("expected 1 element")
}
if list[0].ID.Pretty() != clustertest.PeerID1.Pretty() {
t.Error("expected a different peer id list: ", list)
}
}
test.BothEndpoints(t, tf)
}
func TestAPIPeerAddEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
id := api.ID{}
// post with valid body
body := fmt.Sprintf("{\"peer_id\":\"%s\"}", clustertest.PeerID1.Pretty())
t.Log(body)
test.MakePost(t, rest, url(rest)+"/peers", []byte(body), &id)
if id.ID.Pretty() != clustertest.PeerID1.Pretty() {
t.Error("expected correct ID")
}
if id.Error != "" {
t.Error("did not expect an error")
}
// Send invalid body
errResp := api.Error{}
test.MakePost(t, rest, url(rest)+"/peers", []byte("oeoeoeoe"), &errResp)
if errResp.Code != 400 {
t.Error("expected error with bad body")
}
// Send invalid peer id
test.MakePost(t, rest, url(rest)+"/peers", []byte("{\"peer_id\": \"ab\"}"), &errResp)
if errResp.Code != 400 {
t.Error("expected error with bad peer_id")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIAddFileEndpointBadContentType(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
fmtStr1 := "/add?shard=true&repl_min=-1&repl_max=-1"
localURL := url(rest) + fmtStr1
errResp := api.Error{}
test.MakePost(t, rest, localURL, []byte("test"), &errResp)
if errResp.Code != 400 {
t.Error("expected error with bad content-type")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIAddFileEndpointLocal(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
sth := clustertest.NewShardingTestHelper()
defer sth.Clean(t)
// This generates the testing files and
// writes them to disk.
// This is necessary here because we run tests
// in parallel, and otherwise a write-race might happen.
_, closer := sth.GetTreeMultiReader(t)
closer.Close()
tf := func(t *testing.T, url test.URLFunc) {
fmtStr1 := "/add?shard=false&repl_min=-1&repl_max=-1&stream-channels=true"
localURL := url(rest) + fmtStr1
body, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
resp := api.AddedOutput{}
mpContentType := "multipart/form-data; boundary=" + body.Boundary()
test.MakeStreamingPost(t, rest, localURL, body, mpContentType, &resp)
// resp will contain the last object from the streaming
if resp.Cid.String() != clustertest.ShardingDirBalancedRootCID {
t.Error("Bad Cid after adding: ", resp.Cid)
}
}
test.BothEndpoints(t, tf)
}
func TestAPIAddFileEndpointShard(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
sth := clustertest.NewShardingTestHelper()
defer sth.Clean(t)
// This generates the testing files and
// writes them to disk.
// This is necessary here because we run tests
// in parallel, and otherwise a write-race might happen.
_, closer := sth.GetTreeMultiReader(t)
closer.Close()
tf := func(t *testing.T, url test.URLFunc) {
body, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
mpContentType := "multipart/form-data; boundary=" + body.Boundary()
resp := api.AddedOutput{}
fmtStr1 := "/add?shard=true&repl_min=-1&repl_max=-1&stream-channels=true"
shardURL := url(rest) + fmtStr1
test.MakeStreamingPost(t, rest, shardURL, body, mpContentType, &resp)
}
test.BothEndpoints(t, tf)
}
func TestAPIAddFileEndpoint_StreamChannelsFalse(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
sth := clustertest.NewShardingTestHelper()
defer sth.Clean(t)
// This generates the testing files and
// writes them to disk.
// This is necessary here because we run tests
// in parallel, and otherwise a write-race might happen.
_, closer := sth.GetTreeMultiReader(t)
closer.Close()
tf := func(t *testing.T, url test.URLFunc) {
body, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
fullBody, err := ioutil.ReadAll(body)
if err != nil {
t.Fatal(err)
}
mpContentType := "multipart/form-data; boundary=" + body.Boundary()
resp := []api.AddedOutput{}
fmtStr1 := "/add?shard=false&repl_min=-1&repl_max=-1&stream-channels=false"
shardURL := url(rest) + fmtStr1
test.MakePostWithContentType(t, rest, shardURL, fullBody, mpContentType, &resp)
lastHash := resp[len(resp)-1]
if lastHash.Cid.String() != clustertest.ShardingDirBalancedRootCID {
t.Error("Bad Cid after adding: ", lastHash.Cid)
}
}
test.BothEndpoints(t, tf)
}
func TestAPIPeerRemoveEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
test.MakeDelete(t, rest, url(rest)+"/peers/"+clustertest.PeerID1.Pretty(), &struct{}{})
}
test.BothEndpoints(t, tf)
}
func TestConnectGraphEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var cg api.ConnectGraph
test.MakeGet(t, rest, url(rest)+"/health/graph", &cg)
if cg.ClusterID.Pretty() != clustertest.PeerID1.Pretty() {
t.Error("unexpected cluster id")
}
if len(cg.IPFSLinks) != 3 {
t.Error("unexpected number of ipfs peers")
}
if len(cg.ClusterLinks) != 3 {
t.Error("unexpected number of cluster peers")
}
if len(cg.ClustertoIPFS) != 3 {
t.Error("unexpected number of cluster to ipfs links")
}
// test a few link values
pid1 := clustertest.PeerID1
pid4 := clustertest.PeerID4
if _, ok := cg.ClustertoIPFS[peer.Encode(pid1)]; !ok {
t.Fatal("missing cluster peer 1 from cluster to peer links map")
}
if cg.ClustertoIPFS[peer.Encode(pid1)] != pid4 {
t.Error("unexpected ipfs peer mapped to cluster peer 1 in graph")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIPinEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
// test regular post
test.MakePost(t, rest, url(rest)+"/pins/"+clustertest.Cid1.String(), []byte{}, &struct{}{})
errResp := api.Error{}
test.MakePost(t, rest, url(rest)+"/pins/"+clustertest.ErrorCid.String(), []byte{}, &errResp)
if errResp.Message != clustertest.ErrBadCid.Error() {
t.Error("expected different error: ", errResp.Message)
}
test.MakePost(t, rest, url(rest)+"/pins/abcd", []byte{}, &errResp)
if errResp.Code != 400 {
t.Error("should fail with bad Cid")
}
}
test.BothEndpoints(t, tf)
}
type pathCase struct {
path string
opts api.PinOptions
wantErr bool
code int
expectedCid string
}
func (p *pathCase) WithQuery(t *testing.T) string {
query, err := p.opts.ToQuery()
if err != nil {
t.Fatal(err)
}
return p.path + "?" + query
}
var testPinOpts = api.PinOptions{
ReplicationFactorMax: 7,
ReplicationFactorMin: 6,
Name: "hello there",
UserAllocations: []peer.ID{clustertest.PeerID1, clustertest.PeerID2},
ExpireAt: time.Now().Add(30 * time.Second),
}
var pathTestCases = []pathCase{
{
"/ipfs/QmaNJ5acV31sx8jq626qTpAWW4DXKw34aGhx53dECLvXbY",
testPinOpts,
false,
http.StatusOK,
"QmaNJ5acV31sx8jq626qTpAWW4DXKw34aGhx53dECLvXbY",
},
{
"/ipfs/QmbUNM297ZwxB8CfFAznK7H9YMesDoY6Tt5bPgt5MSCB2u/im.gif",
testPinOpts,
false,
http.StatusOK,
clustertest.CidResolved.String(),
},
{
"/ipfs/invalidhash",
testPinOpts,
true,
http.StatusBadRequest,
"",
},
{
"/ipfs/bafyreiay3jpjk74dkckv2r74eyvf3lfnxujefay2rtuluintasq2zlapv4",
testPinOpts,
true,
http.StatusNotFound,
"",
},
// TODO: A case with trailing slash with paths
// clustertest.PathIPNS2, clustertest.PathIPLD2, clustertest.InvalidPath1
}
func TestAPIPinEndpointWithPath(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
for _, testCase := range pathTestCases[:3] {
c, _ := cid.Decode(testCase.expectedCid)
resultantPin := api.PinWithOpts(
c,
testPinOpts,
)
if testCase.wantErr {
errResp := api.Error{}
q := testCase.WithQuery(t)
test.MakePost(t, rest, url(rest)+"/pins"+q, []byte{}, &errResp)
if errResp.Code != testCase.code {
t.Errorf(
"status code: expected: %d, got: %d, path: %s\n",
testCase.code,
errResp.Code,
testCase.path,
)
}
continue
}
pin := api.Pin{}
q := testCase.WithQuery(t)
test.MakePost(t, rest, url(rest)+"/pins"+q, []byte{}, &pin)
if !pin.Equals(resultantPin) {
t.Errorf("pin: expected: %+v", resultantPin)
t.Errorf("pin: got: %+v", pin)
t.Errorf("path: %s", testCase.path)
}
}
}
test.BothEndpoints(t, tf)
}
func TestAPIUnpinEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
// test regular delete
test.MakeDelete(t, rest, url(rest)+"/pins/"+clustertest.Cid1.String(), &struct{}{})
errResp := api.Error{}
test.MakeDelete(t, rest, url(rest)+"/pins/"+clustertest.ErrorCid.String(), &errResp)
if errResp.Message != clustertest.ErrBadCid.Error() {
t.Error("expected different error: ", errResp.Message)
}
test.MakeDelete(t, rest, url(rest)+"/pins/"+clustertest.NotFoundCid.String(), &errResp)
if errResp.Code != http.StatusNotFound {
t.Error("expected different error code: ", errResp.Code)
}
test.MakeDelete(t, rest, url(rest)+"/pins/abcd", &errResp)
if errResp.Code != 400 {
t.Error("expected different error code: ", errResp.Code)
}
}
test.BothEndpoints(t, tf)
}
func TestAPIUnpinEndpointWithPath(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
for _, testCase := range pathTestCases {
if testCase.wantErr {
errResp := api.Error{}
test.MakeDelete(t, rest, url(rest)+"/pins"+testCase.path, &errResp)
if errResp.Code != testCase.code {
t.Errorf(
"status code: expected: %d, got: %d, path: %s\n",
testCase.code,
errResp.Code,
testCase.path,
)
}
continue
}
pin := api.Pin{}
test.MakeDelete(t, rest, url(rest)+"/pins"+testCase.path, &pin)
if pin.Cid.String() != testCase.expectedCid {
t.Errorf(
"cid: expected: %s, got: %s, path: %s\n",
clustertest.CidResolved,
pin.Cid,
testCase.path,
)
}
}
}
test.BothEndpoints(t, tf)
}
func TestAPIAllocationsEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var resp []*api.Pin
test.MakeGet(t, rest, url(rest)+"/allocations?filter=pin,meta-pin", &resp)
if len(resp) != 3 ||
!resp[0].Cid.Equals(clustertest.Cid1) || !resp[1].Cid.Equals(clustertest.Cid2) ||
!resp[2].Cid.Equals(clustertest.Cid3) {
t.Error("unexpected pin list: ", resp)
}
test.MakeGet(t, rest, url(rest)+"/allocations", &resp)
if len(resp) != 3 ||
!resp[0].Cid.Equals(clustertest.Cid1) || !resp[1].Cid.Equals(clustertest.Cid2) ||
!resp[2].Cid.Equals(clustertest.Cid3) {
t.Error("unexpected pin list: ", resp)
}
errResp := api.Error{}
test.MakeGet(t, rest, url(rest)+"/allocations?filter=invalid", &errResp)
if errResp.Code != http.StatusBadRequest {
t.Error("an invalid filter value should 400")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIAllocationEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var resp api.Pin
test.MakeGet(t, rest, url(rest)+"/allocations/"+clustertest.Cid1.String(), &resp)
if !resp.Cid.Equals(clustertest.Cid1) {
t.Errorf("cid should be the same: %s %s", resp.Cid, clustertest.Cid1)
}
errResp := api.Error{}
test.MakeGet(t, rest, url(rest)+"/allocations/"+clustertest.ErrorCid.String(), &errResp)
if errResp.Code != 404 {
t.Error("a non-pinned cid should 404")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIMetricsEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var resp []*api.Metric
test.MakeGet(t, rest, url(rest)+"/monitor/metrics/somemetricstype", &resp)
if len(resp) == 0 {
t.Fatal("No metrics found")
}
for _, m := range resp {
if m.Name != "test" {
t.Error("Unexpected metric name: ", m.Name)
}
if m.Peer.Pretty() != clustertest.PeerID1.Pretty() {
t.Error("Unexpected peer id: ", m.Peer)
}
}
}
test.BothEndpoints(t, tf)
}
func TestAPIMetricNamesEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var resp []string
test.MakeGet(t, rest, url(rest)+"/monitor/metrics", &resp)
if len(resp) == 0 {
t.Fatal("No metric names found")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIAlertsEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var resp []api.Alert
test.MakeGet(t, rest, url(rest)+"/health/alerts", &resp)
if len(resp) != 1 {
t.Error("expected one alert")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIStatusAllEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var resp []*api.GlobalPinInfo
test.MakeGet(t, rest, url(rest)+"/pins", &resp)
if len(resp) != 3 ||
!resp[0].Cid.Equals(clustertest.Cid1) ||
resp[1].PeerMap[peer.Encode(clustertest.PeerID1)].Status.String() != "pinning" {
t.Errorf("unexpected statusAll resp")
}
// Test local=true
var resp2 []*api.GlobalPinInfo
test.MakeGet(t, rest, url(rest)+"/pins?local=true", &resp2)
if len(resp2) != 2 {
t.Errorf("unexpected statusAll+local resp:\n %+v", resp2)
}
// Test with filter
var resp3 []*api.GlobalPinInfo
test.MakeGet(t, rest, url(rest)+"/pins?filter=queued", &resp3)
if len(resp3) != 0 {
t.Errorf("unexpected statusAll+filter=queued resp:\n %+v", resp3)
}
var resp4 []*api.GlobalPinInfo
test.MakeGet(t, rest, url(rest)+"/pins?filter=pinned", &resp4)
if len(resp4) != 1 {
t.Errorf("unexpected statusAll+filter=pinned resp:\n %+v", resp4)
}
var resp5 []*api.GlobalPinInfo
test.MakeGet(t, rest, url(rest)+"/pins?filter=pin_error", &resp5)
if len(resp5) != 1 {
t.Errorf("unexpected statusAll+filter=pin_error resp:\n %+v", resp5)
}
var resp6 []*api.GlobalPinInfo
test.MakeGet(t, rest, url(rest)+"/pins?filter=error", &resp6)
if len(resp6) != 1 {
t.Errorf("unexpected statusAll+filter=error resp:\n %+v", resp6)
}
var resp7 []*api.GlobalPinInfo
test.MakeGet(t, rest, url(rest)+"/pins?filter=error,pinned", &resp7)
if len(resp7) != 2 {
t.Errorf("unexpected statusAll+filter=error,pinned resp:\n %+v", resp7)
}
var errorResp api.Error
test.MakeGet(t, rest, url(rest)+"/pins?filter=invalid", &errorResp)
if errorResp.Code != http.StatusBadRequest {
t.Error("an invalid filter value should 400")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIStatusEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var resp api.GlobalPinInfo
test.MakeGet(t, rest, url(rest)+"/pins/"+clustertest.Cid1.String(), &resp)
if !resp.Cid.Equals(clustertest.Cid1) {
t.Error("expected the same cid")
}
info, ok := resp.PeerMap[peer.Encode(clustertest.PeerID1)]
if !ok {
t.Fatal("expected info for clustertest.PeerID1")
}
if info.Status.String() != "pinned" {
t.Error("expected different status")
}
// Test local=true
var resp2 api.GlobalPinInfo
test.MakeGet(t, rest, url(rest)+"/pins/"+clustertest.Cid1.String()+"?local=true", &resp2)
if !resp2.Cid.Equals(clustertest.Cid1) {
t.Error("expected the same cid")
}
info, ok = resp2.PeerMap[peer.Encode(clustertest.PeerID2)]
if !ok {
t.Fatal("expected info for clustertest.PeerID2")
}
if info.Status.String() != "pinned" {
t.Error("expected different status")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIRecoverEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var resp api.GlobalPinInfo
test.MakePost(t, rest, url(rest)+"/pins/"+clustertest.Cid1.String()+"/recover", []byte{}, &resp)
if !resp.Cid.Equals(clustertest.Cid1) {
t.Error("expected the same cid")
}
info, ok := resp.PeerMap[peer.Encode(clustertest.PeerID1)]
if !ok {
t.Fatal("expected info for clustertest.PeerID1")
}
if info.Status.String() != "pinned" {
t.Error("expected different status")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIRecoverAllEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var resp []*api.GlobalPinInfo
test.MakePost(t, rest, url(rest)+"/pins/recover?local=true", []byte{}, &resp)
if len(resp) != 0 {
t.Fatal("bad response length")
}
var resp1 []*api.GlobalPinInfo
test.MakePost(t, rest, url(rest)+"/pins/recover", []byte{}, &resp1)
if len(resp1) == 0 {
t.Fatal("bad response length")
}
}
test.BothEndpoints(t, tf)
}
func TestAPIIPFSGCEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
testGlobalRepoGC := func(t *testing.T, gRepoGC *api.GlobalRepoGC) {
if gRepoGC.PeerMap == nil {
t.Fatal("expected a non-nil peer map")
}
if len(gRepoGC.PeerMap) != 1 {
t.Error("expected repo gc information for one peer")
}
for _, repoGC := range gRepoGC.PeerMap {
if repoGC.Peer == "" {
t.Error("expected a cluster ID")
}
if repoGC.Error != "" {
t.Error("did not expect any error")
}
if repoGC.Keys == nil {
t.Fatal("expected a non-nil array of IPFSRepoGC")
}
if len(repoGC.Keys) == 0 {
t.Fatal("expected at least one key, but found none")
}
if !repoGC.Keys[0].Key.Equals(clustertest.Cid1) {
t.Errorf("expected a different cid, expected: %s, found: %s", clustertest.Cid1, repoGC.Keys[0].Key)
}
}
}
tf := func(t *testing.T, url test.URLFunc) {
var resp api.GlobalRepoGC
test.MakePost(t, rest, url(rest)+"/ipfs/gc?local=true", []byte{}, &resp)
testGlobalRepoGC(t, &resp)
var resp1 api.GlobalRepoGC
test.MakePost(t, rest, url(rest)+"/ipfs/gc", []byte{}, &resp1)
testGlobalRepoGC(t, &resp1)
}
test.BothEndpoints(t, tf)
}


@ -303,13 +303,14 @@ func (gpi *GlobalPinInfo) Match(filter TrackerStatus) bool {
// PinInfoShort is a subset of PinInfo which is embedded in GlobalPinInfo
// objects and does not carry redundant information as PinInfo would.
type PinInfoShort struct {
PeerName string `json:"peername" codec:"pn,omitempty"`
IPFS peer.ID `json:"ipfs_peer_id,omitempty" codec:"i,omitempty"`
Status TrackerStatus `json:"status" codec:"st,omitempty"`
TS time.Time `json:"timestamp" codec:"ts,omitempty"`
Error string `json:"error" codec:"e,omitempty"`
AttemptCount int `json:"attempt_count" codec:"a,omitempty"`
PriorityPin bool `json:"priority_pin" codec:"y,omitempty"`
PeerName string `json:"peername" codec:"pn,omitempty"`
IPFS peer.ID `json:"ipfs_peer_id,omitempty" codec:"i,omitempty"`
IPFSAddresses []Multiaddr `json:"ipfs_peer_addresses,omitempty" codec:"ia,omitempty"`
Status TrackerStatus `json:"status" codec:"st,omitempty"`
TS time.Time `json:"timestamp" codec:"ts,omitempty"`
Error string `json:"error" codec:"e,omitempty"`
AttemptCount int `json:"attempt_count" codec:"a,omitempty"`
PriorityPin bool `json:"priority_pin" codec:"y,omitempty"`
}
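
A quick sketch of how the extended struct serializes: ipfs_peer_addresses appears next to ipfs_peer_id, and omitempty drops it when no addresses are known (the mirror struct and values below are hypothetical):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical mirror of PinInfoShort trimmed to the fields relevant here.
type pinInfoShort struct {
	PeerName      string   `json:"peername"`
	IPFS          string   `json:"ipfs_peer_id,omitempty"`
	IPFSAddresses []string `json:"ipfs_peer_addresses,omitempty"`
	Status        string   `json:"status"`
}

func main() {
	b, _ := json.Marshal(pinInfoShort{
		PeerName:      "peer-1",
		IPFS:          "QmExampleIPFSID",
		IPFSAddresses: []string{"/dns4/node.example.com/tcp/4001/p2p/QmExampleIPFSID"},
		Status:        "pinned",
	})
	fmt.Println(string(b))
	// {"peername":"peer-1","ipfs_peer_id":"QmExampleIPFSID",
	//  "ipfs_peer_addresses":["/dns4/node.example.com/tcp/4001/p2p/QmExampleIPFSID"],
	//  "status":"pinned"}
}
```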
// PinInfo holds information about local pins. This is used by the Pin


@ -383,8 +383,9 @@ func (c *Cluster) sendPingMetric(ctx context.Context) (*api.Metric, error) {
id := c.ID(ctx)
newPingVal := pingValue{
Peername: id.Peername,
IPFSID: id.IPFS.ID,
Peername: id.Peername,
IPFSID: id.IPFS.ID,
IPFSAddresses: publicIPFSAddresses(id.IPFS.Addresses),
}
if c.curPingVal.Valid() &&
!newPingVal.Valid() { // i.e. ipfs down
@ -1752,10 +1753,6 @@ func (c *Cluster) getTrustedPeers(ctx context.Context, exclude peer.ID) ([]peer.
func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, pin api.Pin, t time.Time) {
for _, p := range peers {
pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, p))
peerName := pv.Peername
if peerName == "" {
peerName = p.String()
}
gpin.Add(api.PinInfo{
Cid: h,
Name: pin.Name,
@ -1764,10 +1761,11 @@ func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []p
Metadata: pin.Metadata,
Peer: p,
PinInfoShort: api.PinInfoShort{
PeerName: pv.Peername,
IPFS: pv.IPFSID,
Status: status,
TS: t,
PeerName: pv.Peername,
IPFS: pv.IPFSID,
IPFSAddresses: pv.IPFSAddresses,
Status: status,
TS: t,
},
})
}
@ -1883,10 +1881,6 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, dests[i], e)
pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, dests[i]))
peerName := pv.Peername
if peerName == "" {
peerName = dests[i].String()
}
gpin.Add(api.PinInfo{
Cid: h,
Name: pin.Name,
@ -1895,11 +1889,12 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
Origins: pin.Origins,
Metadata: pin.Metadata,
PinInfoShort: api.PinInfoShort{
PeerName: pv.Peername,
IPFS: pv.IPFSID,
Status: api.TrackerStatusClusterError,
TS: timeNow,
Error: e.Error(),
PeerName: pv.Peername,
IPFS: pv.IPFSID,
IPFSAddresses: pv.IPFSAddresses,
Status: api.TrackerStatusClusterError,
TS: timeNow,
Error: e.Error(),
},
})
}
@ -1989,11 +1984,12 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string, a
Origins: nil,
Metadata: nil,
PinInfoShort: api.PinInfoShort{
PeerName: pv.Peername,
IPFS: pv.IPFSID,
Status: api.TrackerStatusClusterError,
TS: time.Now(),
Error: msg,
PeerName: pv.Peername,
IPFS: pv.IPFSID,
IPFSAddresses: pv.IPFSAddresses,
Status: api.TrackerStatusClusterError,
TS: time.Now(),
Error: msg,
},
})
}
@ -2134,6 +2130,7 @@ func (c *Cluster) RepoGC(ctx context.Context) (*api.GlobalRepoGC, error) {
// to club `RepoGCLocal` responses of all peers into one
globalRepoGC := api.GlobalRepoGC{PeerMap: make(map[string]*api.RepoGC)}
for _, member := range members {
var repoGC api.RepoGC
err = c.rpcClient.CallContext(
@ -2156,9 +2153,11 @@ func (c *Cluster) RepoGC(ctx context.Context) (*api.GlobalRepoGC, error) {
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, member, err)
pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, member))
globalRepoGC.PeerMap[peer.Encode(member)] = &api.RepoGC{
Peer: member,
Peername: peer.Encode(member),
Peername: pv.Peername,
Keys: []api.IPFSRepoGC{},
Error: err.Error(),
}


@ -86,20 +86,20 @@ func New(cfg *Config, pid peer.ID, peerName string, getState func(ctx context.Co
// we can get our IPFS id from our own monitor ping metrics which
// are refreshed regularly.
func (spt *Tracker) getIPFSID(ctx context.Context) peer.ID {
func (spt *Tracker) getIPFSID(ctx context.Context) api.IPFSID {
// Wait until RPC is ready
<-spt.rpcReady
var pid peer.ID
var ipfsid api.IPFSID
spt.rpcClient.CallContext(
ctx,
"",
"Cluster",
"IPFSID",
struct{}{},
&pid,
&ipfsid,
)
return pid
return ipfsid
}
// receives a pin Function (pin or unpin) and channels. Used for both pinning
@ -335,7 +335,8 @@ func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus) []*
// PinError.
ipfsid := spt.getIPFSID(ctx)
for _, infop := range spt.optracker.GetAll(ctx) {
infop.IPFS = ipfsid
infop.IPFS = ipfsid.ID
infop.IPFSAddresses = ipfsid.Addresses
pininfos[infop.Cid] = infop
}
@ -354,10 +355,13 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Status")
defer span.End()
ipfsid := spt.getIPFSID(ctx)
// check if c has an inflight or errored operation in optracker
if oppi, ok := spt.optracker.GetExists(ctx, c); ok {
// if it does return the status of the operation
oppi.IPFS = spt.getIPFSID(ctx)
oppi.IPFS = ipfsid.ID
oppi.IPFSAddresses = ipfsid.Addresses
return oppi
}
@ -366,11 +370,12 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
Peer: spt.peerID,
Name: "", // etc to be filled later
PinInfoShort: api.PinInfoShort{
PeerName: spt.peerName,
IPFS: spt.getIPFSID(ctx),
TS: time.Now(),
AttemptCount: 0,
PriorityPin: false,
PeerName: spt.peerName,
IPFS: ipfsid.ID,
IPFSAddresses: ipfsid.Addresses,
TS: time.Now(),
AttemptCount: 0,
PriorityPin: false,
},
}
@ -547,12 +552,13 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[cid.Cid]*api.PinInfo
Metadata: nil, // to be filled later
Peer: spt.peerID,
PinInfoShort: api.PinInfoShort{
PeerName: spt.peerName,
IPFS: ipfsid,
Status: ips.ToTrackerStatus(),
TS: time.Now(), // to be set later
AttemptCount: 0,
PriorityPin: false,
PeerName: spt.peerName,
IPFS: ipfsid.ID,
IPFSAddresses: ipfsid.Addresses,
Status: ips.ToTrackerStatus(),
TS: time.Now(), // to be set later
AttemptCount: 0,
PriorityPin: false,
},
}
pins[c] = p
@ -615,11 +621,12 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool, filter api.T
Origins: p.Origins,
Metadata: p.Metadata,
PinInfoShort: api.PinInfoShort{
PeerName: spt.peerName,
IPFS: ipfsid,
TS: p.Timestamp,
AttemptCount: 0,
PriorityPin: false,
PeerName: spt.peerName,
IPFS: ipfsid.ID,
IPFSAddresses: ipfsid.Addresses,
TS: p.Timestamp,
AttemptCount: 0,
PriorityPin: false,
},
}


@ -432,8 +432,13 @@ func (rpcapi *ClusterRPCAPI) Alerts(ctx context.Context, in struct{}, out *[]api
}
// IPFSID returns the current cached IPFS ID.
func (rpcapi *ClusterRPCAPI) IPFSID(ctx context.Context, in struct{}, out *peer.ID) error {
*out = rpcapi.c.curPingVal.IPFSID
func (rpcapi *ClusterRPCAPI) IPFSID(ctx context.Context, in struct{}, out *api.IPFSID) error {
pingVal := pingValueFromMetric(rpcapi.c.monitor.LatestForPeer(ctx, pingMetricName, rpcapi.c.host.ID()))
i := api.IPFSID{
ID: pingVal.IPFSID,
Addresses: pingVal.IPFSAddresses,
}
*out = i
return nil
}


@ -379,8 +379,10 @@ func (mock *mockCluster) Alerts(ctx context.Context, in struct{}, out *[]api.Ale
return nil
}
func (mock *mockCluster) IPFSID(ctx context.Context, in struct{}, out *peer.ID) error {
*out = PeerID1
func (mock *mockCluster) IPFSID(ctx context.Context, in struct{}, out *api.IPFSID) error {
var id api.ID
_ = mock.ID(ctx, in, &id)
*out = id.IPFS
return nil
}

util.go

@ -5,13 +5,16 @@ import (
"encoding/json"
"errors"
"fmt"
"net"
blake2b "golang.org/x/crypto/blake2b"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
)
// PeersFromMultiaddrs returns all the different peers in the given addresses.
@ -162,8 +165,9 @@ func peersSubtract(a []peer.ID, b []peer.ID) []peer.ID {
// pingValue describes the value carried by ping metrics
type pingValue struct {
Peername string `json:"peer_name,omitempty"`
IPFSID peer.ID `json:"ipfs_id,omitempty"`
Peername string `json:"peer_name,omitempty"`
IPFSID peer.ID `json:"ipfs_id,omitempty"`
IPFSAddresses []api.Multiaddr `json:"ipfs_addresses,omitempty"`
}
// Valid returns true if the PingValue has IPFSID set.
@ -180,3 +184,33 @@ func pingValueFromMetric(m *api.Metric) (pv pingValue) {
json.Unmarshal([]byte(m.Value), &pv)
return
}
func publicIPFSAddresses(in []api.Multiaddr) []api.Multiaddr {
var out []api.Multiaddr
for _, ma := range in {
if madns.Matches(ma.Value()) { // a dns multiaddress: take it
out = append(out, ma)
continue
}
ip, err := ma.ValueForProtocol(multiaddr.P_IP4)
if err != nil {
ip, err = ma.ValueForProtocol(multiaddr.P_IP6)
if err != nil {
continue
}
}
// We have an IP in the multiaddress. Only include
// global unicast.
netip := net.ParseIP(ip)
if netip == nil {
continue
}
if !netip.IsGlobalUnicast() {
continue
}
out = append(out, ma)
}
return out
}
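
The DNS-or-global-unicast rule above can be exercised on its own; a minimal standalone sketch (the isPublic helper and the sample addresses are illustrative, not part of the codebase):

```go
package main

import (
	"fmt"
	"net"

	ma "github.com/multiformats/go-multiaddr"
	madns "github.com/multiformats/go-multiaddr-dns"
)

// isPublic applies the same filter as publicIPFSAddresses: keep DNS
// multiaddresses, and IP multiaddresses only when the IP is global unicast.
func isPublic(a ma.Multiaddr) bool {
	if madns.Matches(a) {
		return true
	}
	ip, err := a.ValueForProtocol(ma.P_IP4)
	if err != nil {
		if ip, err = a.ValueForProtocol(ma.P_IP6); err != nil {
			return false
		}
	}
	parsed := net.ParseIP(ip)
	return parsed != nil && parsed.IsGlobalUnicast()
}

func main() {
	for _, s := range []string{
		"/ip4/127.0.0.1/tcp/4001",         // loopback: filtered out
		"/ip4/192.0.2.10/tcp/4001",        // global unicast: kept
		"/dns4/node.example.com/tcp/4001", // DNS name: kept
	} {
		a, err := ma.NewMultiaddr(s)
		if err != nil {
			panic(err)
		}
		fmt.Println(s, "->", isPublic(a))
	}
}
```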