diff --git a/api/types.go b/api/types.go index 5e5cd86e..943847e3 100644 --- a/api/types.go +++ b/api/types.go @@ -198,7 +198,7 @@ const ( // IPFSPinStatus represents the status of a pin in IPFS (direct, recursive etc.) type IPFSPinStatus int -// IPFSPinStatusFromString parses a string and returns the matching +// IPFSPinStatusFromString parses a string and returns the matching // IPFSPinStatus. func IPFSPinStatusFromString(t string) IPFSPinStatus { // Since indirect statuses are of the form "indirect through " @@ -286,6 +286,7 @@ func (gpi *GlobalPinInfo) Add(pi *PinInfo) { // objects and does not carry redundant information as PinInfo would. type PinInfoShort struct { PeerName string `json:"peername" codec:"pn,omitempty"` + IPFS peer.ID `json:"ipfs_peer_id,omitempty" codec:"i,omitempty"` Status TrackerStatus `json:"status" codec:"st,omitempty"` TS time.Time `json:"timestamp" codec:"ts,omitempty"` Error string `json:"error" codec:"e,omitempty"` @@ -298,7 +299,7 @@ type PinInfoShort struct { type PinInfo struct { Cid cid.Cid `json:"cid" codec:"c"` Name string `json:"name" codec:"m,omitempty"` - Peer peer.ID `json:"Peer" codec:"p,omitempty"` + Peer peer.ID `json:"peer" codec:"p,omitempty"` PinInfoShort } @@ -400,7 +401,7 @@ type ID struct { Commit string `json:"commit" codec:"c,omitempty"` RPCProtocolVersion protocol.ID `json:"rpc_protocol_version" codec:"rv,omitempty"` Error string `json:"error" codec:"e,omitempty"` - IPFS *IPFSID `json:"ipfs,omitempty" codec:"ip,omitempty"` + IPFS IPFSID `json:"ipfs,omitempty" codec:"ip,omitempty"` Peername string `json:"peername" codec:"pn,omitempty"` //PublicKey crypto.PubKey } diff --git a/api/types_test.go b/api/types_test.go index effe6891..8f320cf8 100644 --- a/api/types_test.go +++ b/api/types_test.go @@ -236,7 +236,7 @@ func TestIDCodec(t *testing.T) { Commit: "", RPCProtocolVersion: "abc", Error: "", - IPFS: &IPFSID{ + IPFS: IPFSID{ ID: TestPeerID3, Addresses: []Multiaddr{addr}, Error: "", diff --git a/cluster.go 
b/cluster.go index 7e19d7d3..aa911557 100644 --- a/cluster.go +++ b/cluster.go @@ -2,6 +2,7 @@ package ipfscluster import ( "context" + "encoding/json" "errors" "fmt" "mime/multipart" @@ -89,6 +90,8 @@ type Cluster struct { shutdownLock sync.Mutex shutdownB bool removed bool + + curPingVal pingValue } // NewCluster builds a new IPFS Cluster peer. It initializes a LibP2P host, @@ -233,8 +236,8 @@ func (c *Cluster) setupRPC() error { } func (c *Cluster) setupRPCClients() { - c.tracker.SetClient(c.rpcClient) c.ipfs.SetClient(c.rpcClient) + c.tracker.SetClient(c.rpcClient) for _, api := range c.apis { api.SetClient(c.rpcClient) } @@ -379,10 +382,28 @@ func (c *Cluster) sendPingMetric(ctx context.Context) (*api.Metric, error) { ctx, span := trace.StartSpan(ctx, "cluster/sendPingMetric") defer span.End() + id := c.ID(ctx) + newPingVal := pingValue{ + Peername: id.Peername, + IPFSID: id.IPFS.ID, + } + if c.curPingVal.Valid() && + !newPingVal.Valid() { // i.e. ipfs down + newPingVal = c.curPingVal // use last good value + } + c.curPingVal = newPingVal + + v, err := json.Marshal(newPingVal) + if err != nil { + logger.Error(err) + // continue anyways + } + metric := &api.Metric{ Name: pingMetricName, Peer: c.id, Valid: true, + Value: string(v), } metric.SetTTL(c.config.MonitorPingInterval * 2) return metric, c.monitor.PublishMetric(ctx, metric) @@ -871,7 +892,7 @@ func (c *Cluster) ID(ctx context.Context) *api.ID { ClusterPeersAddresses: addresses, Version: version.Version.String(), RPCProtocolVersion: version.RPCProtocol, - IPFS: ipfsID, + IPFS: *ipfsID, Peername: c.config.Peername, } if err != nil { @@ -1724,14 +1745,20 @@ func (c *Cluster) getTrustedPeers(ctx context.Context, exclude peer.ID) ([]peer. 
return trustedPeers, nil } -func setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, name string, t time.Time) { +func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, name string, t time.Time) { for _, p := range peers { + pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, p)) + peerName := pv.Peername + if peerName == "" { + peerName = p.String() + } gpin.Add(&api.PinInfo{ Cid: h, Name: name, Peer: p, PinInfoShort: api.PinInfoShort{ - PeerName: p.String(), + PeerName: peerName, + IPFS: pv.IPFSID, Status: status, TS: t, }, @@ -1750,6 +1777,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c var dests []peer.ID // un-allocated peers, we will set remote status var remote []peer.ID + timeNow := time.Now() // If pin is not part of the pinset, mark it unpinned @@ -1773,7 +1801,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c } } - setTrackerStatus( + c.setTrackerStatus( gpin, h, members, @@ -1810,7 +1838,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c } // set status remote on un-allocated peers - setTrackerStatus(gpin, h, remote, api.TrackerStatusRemote, pin.Name, timeNow) + c.setTrackerStatus(gpin, h, remote, api.TrackerStatusRemote, pin.Name, timeNow) lenDests := len(dests) replies := make([]*api.PinInfo, lenDests) @@ -1846,12 +1874,19 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c // Deal with error cases (err != nil): wrap errors in PinInfo logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, dests[i], e) + + pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, dests[i])) + peerName := pv.Peername + if peerName == "" { + peerName = dests[i].String() + } gpin.Add(&api.PinInfo{ Cid: h, Name: pin.Name, Peer: dests[i], PinInfoShort: api.PinInfoShort{ - PeerName: 
dests[i].String(), + PeerName: peerName, + IPFS: pv.IPFSID, Status: api.TrackerStatusClusterError, TS: timeNow, Error: e.Error(), @@ -1934,15 +1969,23 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string, a // Merge any errors for p, msg := range erroredPeers { + pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, p)) + peerName := pv.Peername + if peerName == "" { + peerName = p.String() + } + for c := range fullMap { setPinInfo(&api.PinInfo{ Cid: c, Name: "", Peer: p, PinInfoShort: api.PinInfoShort{ - Status: api.TrackerStatusClusterError, - TS: time.Now(), - Error: msg, + PeerName: peerName, + IPFS: pv.IPFSID, + Status: api.TrackerStatusClusterError, + TS: time.Now(), + Error: msg, }, }) } diff --git a/cluster_config.go b/cluster_config.go index d2e86d9b..4793e0a0 100644 --- a/cluster_config.go +++ b/cluster_config.go @@ -121,9 +121,10 @@ type Config struct { // possible. ReplicationFactorMin int - // MonitorPingInterval is the frequency with which a cluster peer pings - // the monitoring component. The ping metric has a TTL set to the double - // of this value. + // MonitorPingInterval is the frequency with which a cluster peer + // sends a "ping" metric. The metric has a TTL set to the double of + // this value. This metric sends information about this peer to other + // peers. 
MonitorPingInterval time.Duration // PeerWatchInterval is the frequency that we use to watch for changes diff --git a/cluster_test.go b/cluster_test.go index 2bbdcde6..5b4f221f 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -3,7 +3,6 @@ package ipfscluster import ( "context" "errors" - "fmt" "mime/multipart" "os" "path/filepath" @@ -916,8 +915,6 @@ func TestClusterPeers(t *testing.T) { } if peers[0].ID != ident.ID { - fmt.Println(peers[0].ID) - fmt.Println(ident.ID) t.Error("bad member") } } diff --git a/config_test.go b/config_test.go index 7c592bdf..5ad68392 100644 --- a/config_test.go +++ b/config_test.go @@ -34,8 +34,9 @@ var testingClusterCfg = []byte(`{ }, "state_sync_interval": "1m0s", "pin_recover_interval": "1m0s", + "peer_infos_caching_interval": "500ms", "replication_factor": -1, - "monitor_ping_interval": "1s", + "monitor_ping_interval": "250ms", "peer_watch_interval": "1s", "disable_repinning": false, "mdns_interval": "0s" diff --git a/ipfscluster_test.go b/ipfscluster_test.go index d33494ac..936e58d2 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -872,6 +872,12 @@ func TestClustersStatusAll(t *testing.T) { t.Error("bad info in status") } + for _, pi := range info { + if pi.IPFS != test.PeerID1 { + t.Error("ipfs not set in pin status") + } + } + pid := peer.Encode(c.host.ID()) if info[pid].Status != api.TrackerStatusPinned { t.Error("the hash should have been pinned") @@ -947,6 +953,13 @@ func TestClustersStatusAllWithErrors(t *testing.T) { if errst.Status != api.TrackerStatusClusterError { t.Error("erroring status should be set to ClusterError:", errst.Status) } + if errst.PeerName != "peer_1" { + t.Error("peername should have been set in the erroring peer too from the cache") + } + + if errst.IPFS != test.PeerID1 { + t.Error("IPFS ID should have been set in the erroring peer too from the cache") + } // now check with Cid status status, err := c.Status(ctx, h) @@ -959,6 +972,14 @@ func TestClustersStatusAllWithErrors(t 
*testing.T) { if pinfo.Status != api.TrackerStatusClusterError { t.Error("erroring status should be ClusterError:", pinfo.Status) } + + // if pinfo.PeerName != "peer_1" { + // t.Error("peername should have been set in the erroring peer too from the cache") + // } + + // if pinfo.IPFS != test.PeerID1 { + // t.Error("IPFS ID should have been set in the erroring peer too from the cache") + // } case "crdt": // CRDT will not have contacted the offline peer because // its metric expired and therefore is not in the diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 817f5a57..784c280f 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -665,7 +665,7 @@ func (ipfs *Connector) ConnectSwarms(ctx context.Context) error { for _, id := range ids { ipfsID := id.IPFS - if ipfsID == nil || id.Error != "" || ipfsID.Error != "" { + if id.Error != "" || ipfsID.Error != "" { continue } for _, addr := range ipfsID.Addresses { diff --git a/pintracker/optracker/operationtracker.go b/pintracker/optracker/operationtracker.go index 5e87f7bd..d62b1c96 100644 --- a/pintracker/optracker/operationtracker.go +++ b/pintracker/optracker/operationtracker.go @@ -147,6 +147,7 @@ func (opt *OperationTracker) unsafePinInfo(ctx context.Context, op *Operation) a Peer: opt.pid, PinInfoShort: api.PinInfoShort{ PeerName: opt.peerName, + IPFS: "", Status: api.TrackerStatusUnpinned, TS: time.Now(), AttemptCount: 0, @@ -160,6 +161,7 @@ func (opt *OperationTracker) unsafePinInfo(ctx context.Context, op *Operation) a Peer: opt.pid, PinInfoShort: api.PinInfoShort{ PeerName: opt.peerName, + IPFS: "", Status: op.ToTrackerStatus(), TS: op.Timestamp(), AttemptCount: op.AttemptCount(), diff --git a/pintracker/stateless/stateless.go b/pintracker/stateless/stateless.go index 66046b41..dcf2885f 100644 --- a/pintracker/stateless/stateless.go +++ b/pintracker/stateless/stateless.go @@ -80,9 +80,28 @@ func New(cfg *Config, pid peer.ID, peerName string, getState 
func(ctx context.Co go spt.opWorker(spt.pin, spt.priorityPinCh, spt.pinCh) } go spt.opWorker(spt.unpin, spt.unpinCh, nil) + return spt } +// getIPFSID fetches our IPFS ID through the Cluster.IPFSID RPC, which serves +// the value cached when the ping metric was last sent. RPC errors are ignored. +func (spt *Tracker) getIPFSID(ctx context.Context) peer.ID { + // Wait until RPC is ready + <-spt.rpcReady + + var pid peer.ID + spt.rpcClient.CallContext( + ctx, + "", + "Cluster", + "IPFSID", + struct{}{}, + &pid, + ) + return pid +} + // receives a pin Function (pin or unpin) and channels. Used for both pinning // and unpinning. func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, prioCh, normalCh chan *optracker.Operation) { @@ -225,7 +244,7 @@ func (spt *Tracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.Opera // other components. func (spt *Tracker) SetClient(c *rpc.Client) { spt.rpcClient = c - spt.rpcReady <- struct{}{} + close(spt.rpcReady) } // Shutdown finishes the services provided by the StatelessPinTracker @@ -245,7 +264,6 @@ func (spt *Tracker) Shutdown(ctx context.Context) error { logger.Info("stopping StatelessPinTracker") spt.cancel() - close(spt.rpcReady) spt.wg.Wait() spt.shutdown = true return nil @@ -315,7 +333,9 @@ func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus) []* // we cannot filter in GetAll, because we are meant to replace items in // pininfos and set the correct status, as otherwise they will remain in // PinError. 
+ ipfsid := spt.getIPFSID(ctx) for _, infop := range spt.optracker.GetAll(ctx) { + infop.IPFS = ipfsid pininfos[infop.Cid] = infop } @@ -337,6 +357,7 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo { // check if c has an inflight operation or errorred operation in optracker if oppi, ok := spt.optracker.GetExists(ctx, c); ok { // if it does return the status of the operation + oppi.IPFS = spt.getIPFSID(ctx) return oppi } @@ -345,6 +366,7 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo { Peer: spt.peerID, PinInfoShort: api.PinInfoShort{ PeerName: spt.peerName, + IPFS: spt.getIPFSID(ctx), TS: time.Now(), AttemptCount: 0, PriorityPin: false, @@ -505,6 +527,7 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[cid.Cid]*api.PinInfo logger.Error(err) return nil, err } + ipfsid := spt.getIPFSID(ctx) pins := make(map[cid.Cid]*api.PinInfo, len(ipsMap)) for cidstr, ips := range ipsMap { c, err := cid.Decode(cidstr) @@ -518,6 +541,7 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[cid.Cid]*api.PinInfo Peer: spt.peerID, PinInfoShort: api.PinInfoShort{ PeerName: spt.peerName, + IPFS: ipfsid, Status: ips.ToTrackerStatus(), TS: time.Now(), // to be set later AttemptCount: 0, @@ -572,6 +596,7 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool, filter api.T } pininfos := make(map[cid.Cid]*api.PinInfo, len(statePins)) + ipfsid := spt.getIPFSID(ctx) for _, p := range statePins { ipfsInfo, pinnedInIpfs := localpis[p.Cid] // base pinInfo object - status to be filled. 
@@ -581,6 +606,7 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool, filter api.T Peer: spt.peerID, PinInfoShort: api.PinInfoShort{ PeerName: spt.peerName, + IPFS: ipfsid, TS: p.Timestamp, AttemptCount: 0, PriorityPin: false, diff --git a/pintracker/stateless/stateless_test.go b/pintracker/stateless/stateless_test.go index 77e31a54..2afe7665 100644 --- a/pintracker/stateless/stateless_test.go +++ b/pintracker/stateless/stateless_test.go @@ -13,6 +13,7 @@ import ( "github.com/ipfs/ipfs-cluster/test" cid "github.com/ipfs/go-cid" + peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" ) @@ -36,8 +37,7 @@ var ( // Overwrite Pin and Unpin methods on the normal mock in order to return // special errors when unwanted operations have been triggered. -type mockIPFS struct { -} +type mockIPFS struct{} func (mock *mockIPFS) Pin(ctx context.Context, in *api.Pin, out *struct{}) error { switch in.Cid { @@ -85,6 +85,13 @@ func (mock *mockIPFS) PinLsCid(ctx context.Context, in *api.Pin, out *api.IPFSPi return nil } +type mockCluster struct{} + +func (mock *mockCluster) IPFSID(ctx context.Context, in struct{}, out *peer.ID) error { + *out = test.PeerID1 + return nil +} + func mockRPCClient(t testing.TB) *rpc.Client { t.Helper() @@ -95,6 +102,11 @@ func mockRPCClient(t testing.TB) *rpc.Client { if err != nil { t.Fatal(err) } + + err = s.RegisterName("Cluster", &mockCluster{}) + if err != nil { + t.Fatal(err) + } return c } @@ -427,6 +439,9 @@ func TestStatusAll(t *testing.T) { default: t.Error("Unexpected pin:", pi.Cid) } + if pi.IPFS == "" { + t.Error("IPFS field should be set") + } } } @@ -472,6 +487,10 @@ func TestStatus(t *testing.T) { if st.Status != api.TrackerStatusPinning { t.Error("slowCid1 should be pinning") } + + if st.IPFS == "" { + t.Error("IPFS field should be set") + } } // Test diff --git a/rpc_api.go b/rpc_api.go index ff880398..a16e9377 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -88,7 +88,7 @@ func newRPCServer(c 
*Cluster) (*rpc.Server, error) { if err != nil { return nil, err } - pm := &PeerMonitorRPCAPI{c.monitor} + pm := &PeerMonitorRPCAPI{mon: c.monitor, pid: c.id} err = s.RegisterName(RPCServiceID(pm), pm) if err != nil { return nil, err @@ -142,6 +142,7 @@ type ConsensusRPCAPI struct { // internal peer API for the PeerMonitor component. type PeerMonitorRPCAPI struct { mon PeerMonitor + pid peer.ID } /* @@ -426,6 +427,12 @@ func (rpcapi *ClusterRPCAPI) Alerts(ctx context.Context, in struct{}, out *[]api return nil } +// IPFSID returns the current cached IPFS ID. +func (rpcapi *ClusterRPCAPI) IPFSID(ctx context.Context, in struct{}, out *peer.ID) error { + *out = rpcapi.c.curPingVal.IPFSID + return nil +} + /* Tracker component methods */ diff --git a/rpc_policy.go b/rpc_policy.go index 1aed97c6..d324ebaa 100644 --- a/rpc_policy.go +++ b/rpc_policy.go @@ -7,9 +7,11 @@ package ipfscluster // without missing any endpoint. var DefaultRPCPolicy = map[string]RPCEndpointType{ // Cluster methods + "Cluster.Alerts": RPCClosed, "Cluster.BlockAllocate": RPCClosed, "Cluster.ConnectGraph": RPCClosed, "Cluster.ID": RPCOpen, + "Cluster.IPFSID": RPCClosed, "Cluster.Join": RPCClosed, "Cluster.PeerAdd": RPCOpen, // Used by Join() "Cluster.PeerRemove": RPCTrusted, @@ -26,7 +28,6 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{ "Cluster.RepoGCLocal": RPCTrusted, "Cluster.SendInformerMetrics": RPCClosed, "Cluster.SendInformersMetrics": RPCClosed, - "Cluster.Alerts": RPCClosed, "Cluster.Status": RPCClosed, "Cluster.StatusAll": RPCClosed, "Cluster.StatusAllLocal": RPCClosed, diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index e9e5fc00..fa3e9339 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -52,7 +52,11 @@ func NewMockRPCClientWithHost(t testing.TB, h host.Host) *rpc.Client { if err != nil { t.Fatal(err) } - err = s.RegisterName("PeerMonitor", &mockPeerMonitor{}) + var pid peer.ID + if h != nil { + pid = h.ID() + } + err = s.RegisterName("PeerMonitor", 
&mockPeerMonitor{pid: pid}) if err != nil { t.Fatal(err) } @@ -64,7 +68,9 @@ type mockCluster struct{} type mockPinTracker struct{} type mockIPFSConnector struct{} type mockConsensus struct{} -type mockPeerMonitor struct{} +type mockPeerMonitor struct { + pid peer.ID +} func (mock *mockCluster) Pin(ctx context.Context, in *api.Pin, out *api.Pin) error { if in.Cid.Equals(ErrorCid) { @@ -163,7 +169,7 @@ func (mock *mockCluster) ID(ctx context.Context, in struct{}, out *api.ID) error ID: PeerID1, //PublicKey: pubkey, Version: "0.0.mock", - IPFS: &api.IPFSID{ + IPFS: api.IPFSID{ ID: PeerID1, Addresses: []api.Multiaddr{addr}, }, @@ -375,6 +381,11 @@ func (mock *mockCluster) Alerts(ctx context.Context, in struct{}, out *[]api.Ale return nil } +func (mock *mockCluster) IPFSID(ctx context.Context, in struct{}, out *peer.ID) error { + *out = PeerID1 + return nil +} + /* Tracker methods */ func (mock *mockPinTracker) Track(ctx context.Context, in *api.Pin, out *struct{}) error { diff --git a/util.go b/util.go index 5b23779e..52cbab9f 100644 --- a/util.go +++ b/util.go @@ -2,12 +2,14 @@ package ipfscluster import ( "bytes" + "encoding/json" "errors" "fmt" blake2b "golang.org/x/crypto/blake2b" cid "github.com/ipfs/go-cid" + "github.com/ipfs/ipfs-cluster/api" peer "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" ) @@ -157,3 +159,24 @@ func peersSubtract(a []peer.ID, b []peer.ID) []peer.ID { return result } + +// pingValue describes the value carried by ping metrics. +type pingValue struct { + Peername string `json:"peer_name,omitempty"` + IPFSID peer.ID `json:"ipfs_id,omitempty"` +} + +// Valid returns true if the pingValue has IPFSID set. +func (pv pingValue) Valid() bool { + return pv.IPFSID != "" +} + +// pingValueFromMetric parses a ping value from the value of a given metric, +// if possible. +func pingValueFromMetric(m *api.Metric) (pv pingValue) { + if m == nil { + return + } + json.Unmarshal([]byte(m.Value), &pv) + return +}