Pintracker: add IPFS ID to Pin Information

Fixes #1554
Fixes: peer names unset for remote peers

This adds an IPFS field to pin status information (PinInfoShort).

It has not been easy to add this, given that the IPFS ID is something that
comes from outside of cluster (unlike the peer name). After several tries I
have settled in the following things:

- Use the ping metric to send out peer names and IPFS IDs to the peers in the
  cluster.
- Cache the latest known IPFS ID (if IPFS dies we should still be setting
  the ID).
- Provide an RPC method for the Pintracker to obtain IPFS ID from the cache.
- Given we now know information for peernames and IPFS IDs from other peers,
  we can use that information even if the requests to them error or we are not
  contacting (i.e. peers allocated as remote are not queried for status). We can
  use the information from the last received ping metric.
- This means we should keep metrics around even if peers go away, at least for
  a while rather than deleting them as soon as we detect that they have expired.

Puting it all together we now have a system to gossip peer information around on top
of the ping metrics.
This commit is contained in:
Hector Sanjuan 2022-01-28 18:21:11 +01:00
parent 000dccc1cc
commit 809b7fbda5
15 changed files with 184 additions and 31 deletions

View File

@ -198,7 +198,7 @@ const (
// IPFSPinStatus represents the status of a pin in IPFS (direct, recursive etc.)
type IPFSPinStatus int
// IPFSPinStatusFromString parses a string and returns the matching
// IPFSPinStatusFromString parses a strixng and returns the matching
// IPFSPinStatus.
func IPFSPinStatusFromString(t string) IPFSPinStatus {
// Since indirect statuses are of the form "indirect through <cid>"
@ -286,6 +286,7 @@ func (gpi *GlobalPinInfo) Add(pi *PinInfo) {
// objects and does not carry redundant information as PinInfo would.
type PinInfoShort struct {
PeerName string `json:"peername" codec:"pn,omitempty"`
IPFS peer.ID `json:"ipfs_peer_id,omitempty" codec:"i,omitempty"`
Status TrackerStatus `json:"status" codec:"st,omitempty"`
TS time.Time `json:"timestamp" codec:"ts,omitempty"`
Error string `json:"error" codec:"e,omitempty"`
@ -298,7 +299,7 @@ type PinInfoShort struct {
type PinInfo struct {
Cid cid.Cid `json:"cid" codec:"c"`
Name string `json:"name" codec:"m,omitempty"`
Peer peer.ID `json:"Peer" codec:"p,omitempty"`
Peer peer.ID `json:"peer" codec:"p,omitempty"`
PinInfoShort
}
@ -400,7 +401,7 @@ type ID struct {
Commit string `json:"commit" codec:"c,omitempty"`
RPCProtocolVersion protocol.ID `json:"rpc_protocol_version" codec:"rv,omitempty"`
Error string `json:"error" codec:"e,omitempty"`
IPFS *IPFSID `json:"ipfs,omitempty" codec:"ip,omitempty"`
IPFS IPFSID `json:"ipfs,omitempty" codec:"ip,omitempty"`
Peername string `json:"peername" codec:"pn,omitempty"`
//PublicKey crypto.PubKey
}

View File

@ -236,7 +236,7 @@ func TestIDCodec(t *testing.T) {
Commit: "",
RPCProtocolVersion: "abc",
Error: "",
IPFS: &IPFSID{
IPFS: IPFSID{
ID: TestPeerID3,
Addresses: []Multiaddr{addr},
Error: "",

View File

@ -2,6 +2,7 @@ package ipfscluster
import (
"context"
"encoding/json"
"errors"
"fmt"
"mime/multipart"
@ -89,6 +90,8 @@ type Cluster struct {
shutdownLock sync.Mutex
shutdownB bool
removed bool
curPingVal pingValue
}
// NewCluster builds a new IPFS Cluster peer. It initializes a LibP2P host,
@ -233,8 +236,8 @@ func (c *Cluster) setupRPC() error {
}
func (c *Cluster) setupRPCClients() {
c.tracker.SetClient(c.rpcClient)
c.ipfs.SetClient(c.rpcClient)
c.tracker.SetClient(c.rpcClient)
for _, api := range c.apis {
api.SetClient(c.rpcClient)
}
@ -379,10 +382,28 @@ func (c *Cluster) sendPingMetric(ctx context.Context) (*api.Metric, error) {
ctx, span := trace.StartSpan(ctx, "cluster/sendPingMetric")
defer span.End()
id := c.ID(ctx)
newPingVal := pingValue{
Peername: id.Peername,
IPFSID: id.IPFS.ID,
}
if c.curPingVal.Valid() &&
!newPingVal.Valid() { // i.e. ipfs down
newPingVal = c.curPingVal // use last good value
}
c.curPingVal = newPingVal
v, err := json.Marshal(newPingVal)
if err != nil {
logger.Error(err)
// continue anyways
}
metric := &api.Metric{
Name: pingMetricName,
Peer: c.id,
Valid: true,
Value: string(v),
}
metric.SetTTL(c.config.MonitorPingInterval * 2)
return metric, c.monitor.PublishMetric(ctx, metric)
@ -871,7 +892,7 @@ func (c *Cluster) ID(ctx context.Context) *api.ID {
ClusterPeersAddresses: addresses,
Version: version.Version.String(),
RPCProtocolVersion: version.RPCProtocol,
IPFS: ipfsID,
IPFS: *ipfsID,
Peername: c.config.Peername,
}
if err != nil {
@ -1724,14 +1745,20 @@ func (c *Cluster) getTrustedPeers(ctx context.Context, exclude peer.ID) ([]peer.
return trustedPeers, nil
}
func setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, name string, t time.Time) {
func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, name string, t time.Time) {
for _, p := range peers {
pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, p))
peerName := pv.Peername
if peerName == "" {
peerName = p.String()
}
gpin.Add(&api.PinInfo{
Cid: h,
Name: name,
Peer: p,
PinInfoShort: api.PinInfoShort{
PeerName: p.String(),
PeerName: peerName,
IPFS: pv.IPFSID,
Status: status,
TS: t,
},
@ -1750,6 +1777,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
var dests []peer.ID
// un-allocated peers, we will set remote status
var remote []peer.ID
timeNow := time.Now()
// If pin is not part of the pinset, mark it unpinned
@ -1773,7 +1801,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
}
}
setTrackerStatus(
c.setTrackerStatus(
gpin,
h,
members,
@ -1810,7 +1838,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
}
// set status remote on un-allocated peers
setTrackerStatus(gpin, h, remote, api.TrackerStatusRemote, pin.Name, timeNow)
c.setTrackerStatus(gpin, h, remote, api.TrackerStatusRemote, pin.Name, timeNow)
lenDests := len(dests)
replies := make([]*api.PinInfo, lenDests)
@ -1846,12 +1874,19 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
// Deal with error cases (err != nil): wrap errors in PinInfo
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, dests[i], e)
pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, dests[i]))
peerName := pv.Peername
if peerName == "" {
peerName = dests[i].String()
}
gpin.Add(&api.PinInfo{
Cid: h,
Name: pin.Name,
Peer: dests[i],
PinInfoShort: api.PinInfoShort{
PeerName: dests[i].String(),
PeerName: peerName,
IPFS: pv.IPFSID,
Status: api.TrackerStatusClusterError,
TS: timeNow,
Error: e.Error(),
@ -1934,15 +1969,23 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string, a
// Merge any errors
for p, msg := range erroredPeers {
pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, p))
peerName := pv.Peername
if peerName == "" {
peerName = p.String()
}
for c := range fullMap {
setPinInfo(&api.PinInfo{
Cid: c,
Name: "",
Peer: p,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusClusterError,
TS: time.Now(),
Error: msg,
PeerName: peerName,
IPFS: pv.IPFSID,
Status: api.TrackerStatusClusterError,
TS: time.Now(),
Error: msg,
},
})
}

View File

@ -121,9 +121,10 @@ type Config struct {
// possible.
ReplicationFactorMin int
// MonitorPingInterval is the frequency with which a cluster peer pings
// the monitoring component. The ping metric has a TTL set to the double
// of this value.
// MonitorPingInterval is the frequency with which a cluster peer
// sends a "ping" metric. The metric has a TTL set to the double of
// this value. This metric sends information about this peer to other
// peers.
MonitorPingInterval time.Duration
// PeerWatchInterval is the frequency that we use to watch for changes

View File

@ -3,7 +3,6 @@ package ipfscluster
import (
"context"
"errors"
"fmt"
"mime/multipart"
"os"
"path/filepath"
@ -916,8 +915,6 @@ func TestClusterPeers(t *testing.T) {
}
if peers[0].ID != ident.ID {
fmt.Println(peers[0].ID)
fmt.Println(ident.ID)
t.Error("bad member")
}
}

View File

@ -34,8 +34,9 @@ var testingClusterCfg = []byte(`{
},
"state_sync_interval": "1m0s",
"pin_recover_interval": "1m0s",
"peer_infos_caching_interval": "500ms",
"replication_factor": -1,
"monitor_ping_interval": "1s",
"monitor_ping_interval": "250ms",
"peer_watch_interval": "1s",
"disable_repinning": false,
"mdns_interval": "0s"

View File

@ -872,6 +872,12 @@ func TestClustersStatusAll(t *testing.T) {
t.Error("bad info in status")
}
for _, pi := range info {
if pi.IPFS != test.PeerID1 {
t.Error("ipfs not set in pin status")
}
}
pid := peer.Encode(c.host.ID())
if info[pid].Status != api.TrackerStatusPinned {
t.Error("the hash should have been pinned")
@ -947,6 +953,13 @@ func TestClustersStatusAllWithErrors(t *testing.T) {
if errst.Status != api.TrackerStatusClusterError {
t.Error("erroring status should be set to ClusterError:", errst.Status)
}
if errst.PeerName != "peer_1" {
t.Error("peername should have been set in the erroring peer too from the cache")
}
if errst.IPFS != test.PeerID1 {
t.Error("IPFS ID should have been set in the erroring peer too from the cache")
}
// now check with Cid status
status, err := c.Status(ctx, h)
@ -959,6 +972,14 @@ func TestClustersStatusAllWithErrors(t *testing.T) {
if pinfo.Status != api.TrackerStatusClusterError {
t.Error("erroring status should be ClusterError:", pinfo.Status)
}
// if pinfo.PeerName != "peer_1" {
// t.Error("peername should have been set in the erroring peer too from the cache")
// }
// if pinfo.IPFS != test.PeerID1 {
// t.Error("IPFS ID should have been set in the erroring peer too from the cache")
// }
case "crdt":
// CRDT will not have contacted the offline peer because
// its metric expired and therefore is not in the

View File

@ -665,7 +665,7 @@ func (ipfs *Connector) ConnectSwarms(ctx context.Context) error {
for _, id := range ids {
ipfsID := id.IPFS
if ipfsID == nil || id.Error != "" || ipfsID.Error != "" {
if id.Error != "" || ipfsID.Error != "" {
continue
}
for _, addr := range ipfsID.Addresses {

View File

@ -147,6 +147,7 @@ func (opt *OperationTracker) unsafePinInfo(ctx context.Context, op *Operation) a
Peer: opt.pid,
PinInfoShort: api.PinInfoShort{
PeerName: opt.peerName,
IPFS: "",
Status: api.TrackerStatusUnpinned,
TS: time.Now(),
AttemptCount: 0,
@ -160,6 +161,7 @@ func (opt *OperationTracker) unsafePinInfo(ctx context.Context, op *Operation) a
Peer: opt.pid,
PinInfoShort: api.PinInfoShort{
PeerName: opt.peerName,
IPFS: "",
Status: op.ToTrackerStatus(),
TS: op.Timestamp(),
AttemptCount: op.AttemptCount(),

View File

@ -80,9 +80,28 @@ func New(cfg *Config, pid peer.ID, peerName string, getState func(ctx context.Co
go spt.opWorker(spt.pin, spt.priorityPinCh, spt.pinCh)
}
go spt.opWorker(spt.unpin, spt.unpinCh, nil)
return spt
}
// we can get our IPFS id from our own monitor ping metrics which
// are refreshed regularly.
func (spt *Tracker) getIPFSID(ctx context.Context) peer.ID {
// Wait until RPC is ready
<-spt.rpcReady
var pid peer.ID
spt.rpcClient.CallContext(
ctx,
"",
"Cluster",
"IPFSID",
struct{}{},
&pid,
)
return pid
}
// receives a pin Function (pin or unpin) and channels. Used for both pinning
// and unpinning.
func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, prioCh, normalCh chan *optracker.Operation) {
@ -225,7 +244,7 @@ func (spt *Tracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.Opera
// other components.
func (spt *Tracker) SetClient(c *rpc.Client) {
spt.rpcClient = c
spt.rpcReady <- struct{}{}
close(spt.rpcReady)
}
// Shutdown finishes the services provided by the StatelessPinTracker
@ -245,7 +264,6 @@ func (spt *Tracker) Shutdown(ctx context.Context) error {
logger.Info("stopping StatelessPinTracker")
spt.cancel()
close(spt.rpcReady)
spt.wg.Wait()
spt.shutdown = true
return nil
@ -315,7 +333,9 @@ func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus) []*
// we cannot filter in GetAll, because we are meant to replace items in
// pininfos and set the correct status, as otherwise they will remain in
// PinError.
ipfsid := spt.getIPFSID(ctx)
for _, infop := range spt.optracker.GetAll(ctx) {
infop.IPFS = ipfsid
pininfos[infop.Cid] = infop
}
@ -337,6 +357,7 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
// check if c has an inflight operation or errorred operation in optracker
if oppi, ok := spt.optracker.GetExists(ctx, c); ok {
// if it does return the status of the operation
oppi.IPFS = spt.getIPFSID(ctx)
return oppi
}
@ -345,6 +366,7 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
Peer: spt.peerID,
PinInfoShort: api.PinInfoShort{
PeerName: spt.peerName,
IPFS: spt.getIPFSID(ctx),
TS: time.Now(),
AttemptCount: 0,
PriorityPin: false,
@ -505,6 +527,7 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[cid.Cid]*api.PinInfo
logger.Error(err)
return nil, err
}
ipfsid := spt.getIPFSID(ctx)
pins := make(map[cid.Cid]*api.PinInfo, len(ipsMap))
for cidstr, ips := range ipsMap {
c, err := cid.Decode(cidstr)
@ -518,6 +541,7 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[cid.Cid]*api.PinInfo
Peer: spt.peerID,
PinInfoShort: api.PinInfoShort{
PeerName: spt.peerName,
IPFS: ipfsid,
Status: ips.ToTrackerStatus(),
TS: time.Now(), // to be set later
AttemptCount: 0,
@ -572,6 +596,7 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool, filter api.T
}
pininfos := make(map[cid.Cid]*api.PinInfo, len(statePins))
ipfsid := spt.getIPFSID(ctx)
for _, p := range statePins {
ipfsInfo, pinnedInIpfs := localpis[p.Cid]
// base pinInfo object - status to be filled.
@ -581,6 +606,7 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool, filter api.T
Peer: spt.peerID,
PinInfoShort: api.PinInfoShort{
PeerName: spt.peerName,
IPFS: ipfsid,
TS: p.Timestamp,
AttemptCount: 0,
PriorityPin: false,

View File

@ -13,6 +13,7 @@ import (
"github.com/ipfs/ipfs-cluster/test"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
@ -36,8 +37,7 @@ var (
// Overwrite Pin and Unpin methods on the normal mock in order to return
// special errors when unwanted operations have been triggered.
type mockIPFS struct {
}
type mockIPFS struct{}
func (mock *mockIPFS) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid {
@ -85,6 +85,13 @@ func (mock *mockIPFS) PinLsCid(ctx context.Context, in *api.Pin, out *api.IPFSPi
return nil
}
type mockCluster struct{}
func (mock *mockCluster) IPFSID(ctx context.Context, in struct{}, out *peer.ID) error {
*out = test.PeerID1
return nil
}
func mockRPCClient(t testing.TB) *rpc.Client {
t.Helper()
@ -95,6 +102,11 @@ func mockRPCClient(t testing.TB) *rpc.Client {
if err != nil {
t.Fatal(err)
}
err = s.RegisterName("Cluster", &mockCluster{})
if err != nil {
t.Fatal(err)
}
return c
}
@ -427,6 +439,9 @@ func TestStatusAll(t *testing.T) {
default:
t.Error("Unexpected pin:", pi.Cid)
}
if pi.IPFS == "" {
t.Error("IPFS field should be set")
}
}
}
@ -472,6 +487,10 @@ func TestStatus(t *testing.T) {
if st.Status != api.TrackerStatusPinning {
t.Error("slowCid1 should be pinning")
}
if st.IPFS == "" {
t.Error("IPFS field should be set")
}
}
// Test

View File

@ -88,7 +88,7 @@ func newRPCServer(c *Cluster) (*rpc.Server, error) {
if err != nil {
return nil, err
}
pm := &PeerMonitorRPCAPI{c.monitor}
pm := &PeerMonitorRPCAPI{mon: c.monitor, pid: c.id}
err = s.RegisterName(RPCServiceID(pm), pm)
if err != nil {
return nil, err
@ -142,6 +142,7 @@ type ConsensusRPCAPI struct {
// internal peer API for the PeerMonitor component.
type PeerMonitorRPCAPI struct {
mon PeerMonitor
pid peer.ID
}
/*
@ -426,6 +427,12 @@ func (rpcapi *ClusterRPCAPI) Alerts(ctx context.Context, in struct{}, out *[]api
return nil
}
// IPFSID returns the current cached IPFS ID.
func (rpcapi *ClusterRPCAPI) IPFSID(ctx context.Context, in struct{}, out *peer.ID) error {
*out = rpcapi.c.curPingVal.IPFSID
return nil
}
/*
Tracker component methods
*/

View File

@ -7,9 +7,11 @@ package ipfscluster
// without missing any endpoint.
var DefaultRPCPolicy = map[string]RPCEndpointType{
// Cluster methods
"Cluster.Alerts": RPCClosed,
"Cluster.BlockAllocate": RPCClosed,
"Cluster.ConnectGraph": RPCClosed,
"Cluster.ID": RPCOpen,
"Cluster.IPFSID": RPCClosed,
"Cluster.Join": RPCClosed,
"Cluster.PeerAdd": RPCOpen, // Used by Join()
"Cluster.PeerRemove": RPCTrusted,
@ -26,7 +28,6 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{
"Cluster.RepoGCLocal": RPCTrusted,
"Cluster.SendInformerMetrics": RPCClosed,
"Cluster.SendInformersMetrics": RPCClosed,
"Cluster.Alerts": RPCClosed,
"Cluster.Status": RPCClosed,
"Cluster.StatusAll": RPCClosed,
"Cluster.StatusAllLocal": RPCClosed,

View File

@ -52,7 +52,11 @@ func NewMockRPCClientWithHost(t testing.TB, h host.Host) *rpc.Client {
if err != nil {
t.Fatal(err)
}
err = s.RegisterName("PeerMonitor", &mockPeerMonitor{})
var pid peer.ID
if h != nil {
pid = h.ID()
}
err = s.RegisterName("PeerMonitor", &mockPeerMonitor{pid: pid})
if err != nil {
t.Fatal(err)
}
@ -64,7 +68,9 @@ type mockCluster struct{}
type mockPinTracker struct{}
type mockIPFSConnector struct{}
type mockConsensus struct{}
type mockPeerMonitor struct{}
type mockPeerMonitor struct {
pid peer.ID
}
func (mock *mockCluster) Pin(ctx context.Context, in *api.Pin, out *api.Pin) error {
if in.Cid.Equals(ErrorCid) {
@ -163,7 +169,7 @@ func (mock *mockCluster) ID(ctx context.Context, in struct{}, out *api.ID) error
ID: PeerID1,
//PublicKey: pubkey,
Version: "0.0.mock",
IPFS: &api.IPFSID{
IPFS: api.IPFSID{
ID: PeerID1,
Addresses: []api.Multiaddr{addr},
},
@ -375,6 +381,11 @@ func (mock *mockCluster) Alerts(ctx context.Context, in struct{}, out *[]api.Ale
return nil
}
func (mock *mockCluster) IPFSID(ctx context.Context, in struct{}, out *peer.ID) error {
*out = PeerID1
return nil
}
/* Tracker methods */
func (mock *mockPinTracker) Track(ctx context.Context, in *api.Pin, out *struct{}) error {

23
util.go
View File

@ -2,12 +2,14 @@ package ipfscluster
import (
"bytes"
"encoding/json"
"errors"
"fmt"
blake2b "golang.org/x/crypto/blake2b"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
@ -157,3 +159,24 @@ func peersSubtract(a []peer.ID, b []peer.ID) []peer.ID {
return result
}
// pingValue describes the value carried by ping metrics
type pingValue struct {
Peername string `json:"peer_name,omitempty"`
IPFSID peer.ID `json:"ipfs_id,omitempty"`
}
// Valid returns true if the PingValue has IPFSID set.
func (pv pingValue) Valid() bool {
return pv.IPFSID != ""
}
// PingValue from metric parses a ping value from the value of a given metric,
// if possible.
func pingValueFromMetric(m *api.Metric) (pv pingValue) {
if m == nil {
return
}
json.Unmarshal([]byte(m.Value), &pv)
return
}