RPC auth: Rework PeerAdd and Join

PeerAdd called the `LogMetric` and `ConnectSwarms` RPC endpoints on the
remote peer. A similar effect can be achieved by calling these
locally from the Join() function instead.

In particular, ConnectSwarms was called at a point where the joining peer
might not even know about the other peers in the Cluster yet. Now the call
is delayed until some ping metrics have likely come through.
Author: Hector Sanjuan
Date:   2019-05-09 14:19:07 +02:00
parent 40fb0761da
commit 2ed48b6ac4
2 changed files with 88 additions and 79 deletions
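
In short, Join() now does locally what PeerAdd used to trigger over RPC. A condensed sketch of the new join-side flow, distilled from the diff below (simplified, error handling shortened; all names come from the cluster code):

	// Log a fake but valid ping metric for the bootstrap peer locally,
	// instead of having the remote PeerAdd push one to us through the
	// "PeerMonitor"/"LogMetric" RPC endpoint.
	m := &api.Metric{
		Name:  pingMetricName,
		Peer:  pid, // the peer we are joining through
		Valid: true,
	}
	m.SetTTL(c.config.MonitorPingInterval * 2)
	c.monitor.LogMetric(ctx, m)

	// Broadcast our own informer and ping metrics right away.
	c.sendInformerMetric(ctx)
	c.sendPingMetric(ctx)

	// ConnectSwarms only after a ping interval, when metrics from the
	// other cluster peers have likely arrived, rather than via RPC at a
	// point where we may not know any other peers yet.
	time.AfterFunc(c.config.MonitorPingInterval, func() {
		c.ipfs.ConnectSwarms(ctx)
	})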


@@ -266,19 +266,26 @@ func (c *Cluster) pushInformerMetrics(ctx context.Context) {
 	}
 }
 
+func (c *Cluster) sendPingMetric(ctx context.Context) (*api.Metric, error) {
+	ctx, span := trace.StartSpan(ctx, "cluster/sendPingMetric")
+	defer span.End()
+
+	metric := &api.Metric{
+		Name:  pingMetricName,
+		Peer:  c.id,
+		Valid: true,
+	}
+	metric.SetTTL(c.config.MonitorPingInterval * 2)
+	return metric, c.monitor.PublishMetric(ctx, metric)
+}
+
 func (c *Cluster) pushPingMetrics(ctx context.Context) {
 	ctx, span := trace.StartSpan(ctx, "cluster/pushPingMetrics")
 	defer span.End()
 
 	ticker := time.NewTicker(c.config.MonitorPingInterval)
 	for {
-		metric := &api.Metric{
-			Name:  pingMetricName,
-			Peer:  c.id,
-			Valid: true,
-		}
-		metric.SetTTL(c.config.MonitorPingInterval * 2)
-		c.monitor.PublishMetric(ctx, metric)
+		c.sendPingMetric(ctx)
 
 		select {
 		case <-ctx.Done():
@@ -635,62 +642,15 @@ func (c *Cluster) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, error) {
 		return id, err
 	}
 
-	// Send a ping metric to the new node directly so
-	// it knows about this one at least
-	m := &api.Metric{
-		Name:  pingMetricName,
-		Peer:  c.id,
-		Valid: true,
-	}
-	m.SetTTL(c.config.MonitorPingInterval * 2)
-	err = c.rpcClient.CallContext(
-		ctx,
-		pid,
-		"PeerMonitor",
-		"LogMetric",
-		m,
-		&struct{}{},
-	)
-	if err != nil {
-		logger.Warning(err)
-	}
-
-	// Ask the new peer to connect its IPFS daemon to the rest
-	err = c.rpcClient.CallContext(
-		ctx,
-		pid,
-		"IPFSConnector",
-		"ConnectSwarms",
-		struct{}{},
-		&struct{}{},
-	)
-	if err != nil {
-		logger.Warning(err)
-	}
-
-	id := &api.ID{}
-	// wait up to 2 seconds for new peer to catch up
-	// and return an up to date api.ID object.
-	// otherwise it might not contain the current cluster peers
-	// as it should.
-	for i := 0; i < 20; i++ {
-		id, _ = c.getIDForPeer(ctx, pid)
-		ownPeers, err := c.consensus.Peers(ctx)
-		if err != nil {
-			break
-		}
-		newNodePeers := id.ClusterPeers
-		added, removed := diffPeers(ownPeers, newNodePeers)
-		if len(added) == 0 && len(removed) == 0 && containsPeer(ownPeers, pid) {
-			break // the new peer has fully joined
-		}
-		time.Sleep(200 * time.Millisecond)
-		logger.Debugf("%s addPeer: retrying to get ID from %s",
-			c.id.Pretty(), pid.Pretty())
-	}
-
 	logger.Info("Peer added ", pid.Pretty())
-	return id, nil
+	addedID, err := c.getIDForPeer(ctx, pid)
+	if err != nil {
+		return addedID, err
+	}
+	if !containsPeer(addedID.ClusterPeers, c.id) {
+		addedID.ClusterPeers = append(addedID.ClusterPeers, c.id)
+	}
+	return addedID, nil
 }
 
 // PeerRemove removes a peer from this Cluster.
@@ -738,7 +698,7 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
 		return nil
 	}
 
-	// Add peer to peerstore so we can talk to it
+	// Add peer to peerstore so we can talk to it (and connect)
 	c.peerManager.ImportPeer(addr, true)
 
 	// Note that PeerAdd() on the remote peer will
@@ -758,6 +718,30 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
 		return err
 	}
 
+	// Log a fake but valid metric from the peer we are
+	// contacting. This will signal a CRDT component that
+	// we know that peer since we have metrics for it without
+	// having to wait for the next metric round.
+	m := &api.Metric{
+		Name:  pingMetricName,
+		Peer:  pid,
+		Valid: true,
+	}
+	m.SetTTL(c.config.MonitorPingInterval * 2)
+	if err := c.monitor.LogMetric(ctx, m); err != nil {
+		logger.Warning(err)
+	}
+
+	// Broadcast our metrics to the world
+	_, err = c.sendInformerMetric(ctx)
+	if err != nil {
+		logger.Warning(err)
+	}
+
+	_, err = c.sendPingMetric(ctx)
+	if err != nil {
+		logger.Warning(err)
+	}
+
 	// We need to trigger a DHT bootstrap asap for this peer to not be
 	// lost if the peer it bootstrapped to goes down. We do this manually
 	// by triggering 1 round of bootstrap in the background.
@@ -767,6 +751,12 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
 		c.dht.BootstrapOnce(ctx, dht.DefaultBootstrapConfig)
 	}()
 
+	// ConnectSwarms in the background after a while, when we have likely
+	// received some metrics.
+	time.AfterFunc(c.config.MonitorPingInterval, func() {
+		c.ipfs.ConnectSwarms(ctx)
+	})
+
 	// wait for leader and for state to catch up
 	// then sync
 	err = c.consensus.WaitForSync(ctx)


@@ -26,23 +26,28 @@ func newRPCServer(c *Cluster) (*rpc.Server, error) {
 		s = rpc.NewServer(c.host, version.RPCProtocol)
 	}
 
-	err := s.RegisterName("Cluster", &ClusterRPCAPI{c})
+	cl := &ClusterRPCAPI{c}
+	err := s.RegisterName(cl.SvcID(), cl)
 	if err != nil {
 		return nil, err
 	}
-	err = s.RegisterName("PinTracker", &PinTrackerRPCAPI{c.tracker})
+	pt := &PinTrackerRPCAPI{c.tracker}
+	err = s.RegisterName(pt.SvcID(), pt)
 	if err != nil {
 		return nil, err
 	}
-	err = s.RegisterName("IPFSConnector", &IPFSConnectorRPCAPI{c.ipfs})
+	ic := &IPFSConnectorRPCAPI{c.ipfs}
+	err = s.RegisterName(ic.SvcID(), ic)
 	if err != nil {
 		return nil, err
 	}
-	err = s.RegisterName("Consensus", &ConsensusRPCAPI{c.consensus})
+	cons := &ConsensusRPCAPI{c.consensus}
+	err = s.RegisterName(cons.SvcID(), cons)
 	if err != nil {
 		return nil, err
 	}
-	err = s.RegisterName("PeerMonitor", &PeerMonitorRPCAPI{c.monitor})
+	pm := &PeerMonitorRPCAPI{c.monitor}
+	err = s.RegisterName(pm.SvcID(), pm)
 	if err != nil {
 		return nil, err
 	}
@@ -55,30 +60,55 @@ type ClusterRPCAPI struct {
 	c *Cluster
 }
 
+// SvcID returns the Service ID used to register this RPC service.
+func (rpcapi *ClusterRPCAPI) SvcID() string {
+	return "Cluster"
+}
+
 // PinTrackerRPCAPI is a go-libp2p-gorpc service which provides the internal
 // peer API for the PinTracker component.
 type PinTrackerRPCAPI struct {
 	tracker PinTracker
 }
 
+// SvcID returns the Service ID used to register this RPC service.
+func (rpcapi *PinTrackerRPCAPI) SvcID() string {
+	return "PinTracker"
+}
+
 // IPFSConnectorRPCAPI is a go-libp2p-gorpc service which provides the
 // internal peer API for the IPFSConnector component.
 type IPFSConnectorRPCAPI struct {
 	ipfs IPFSConnector
 }
 
+// SvcID returns the Service ID used to register this RPC service.
+func (rpcapi *IPFSConnectorRPCAPI) SvcID() string {
+	return "IPFSConnector"
+}
+
 // ConsensusRPCAPI is a go-libp2p-gorpc service which provides the
 // internal peer API for the Consensus component.
 type ConsensusRPCAPI struct {
 	cons Consensus
 }
 
+// SvcID returns the Service ID used to register this RPC service.
+func (rpcapi *ConsensusRPCAPI) SvcID() string {
+	return "Consensus"
+}
+
 // PeerMonitorRPCAPI is a go-libp2p-gorpc service which provides the
 // internal peer API for the PeerMonitor component.
 type PeerMonitorRPCAPI struct {
 	mon PeerMonitor
 }
 
+// SvcID returns the Service ID used to register this RPC service.
+func (rpcapi *PeerMonitorRPCAPI) SvcID() string {
+	return "PeerMonitor"
+}
+
 /*
    Cluster component methods
 */
@@ -429,12 +459,6 @@ func (rpcapi *IPFSConnectorRPCAPI) PinLs(ctx context.Context, in string, out *ma
 	return nil
 }
 
-// ConnectSwarms runs IPFSConnector.ConnectSwarms().
-func (rpcapi *IPFSConnectorRPCAPI) ConnectSwarms(ctx context.Context, in struct{}, out *struct{}) error {
-	err := rpcapi.ipfs.ConnectSwarms(ctx)
-	return err
-}
-
 // ConfigKey runs IPFSConnector.ConfigKey().
 func (rpcapi *IPFSConnectorRPCAPI) ConfigKey(ctx context.Context, in string, out *interface{}) error {
 	res, err := rpcapi.ipfs.ConfigKey(in)
@@ -536,11 +560,6 @@ func (rpcapi *ConsensusRPCAPI) Peers(ctx context.Context, in struct{}, out *[]pe
    PeerMonitor
 */
 
-// LogMetric runs PeerMonitor.LogMetric().
-func (rpcapi *PeerMonitorRPCAPI) LogMetric(ctx context.Context, in *api.Metric, out *struct{}) error {
-	return rpcapi.mon.LogMetric(ctx, in)
-}
-
 // LatestMetrics runs PeerMonitor.LatestMetrics().
 func (rpcapi *PeerMonitorRPCAPI) LatestMetrics(ctx context.Context, in string, out *[]*api.Metric) error {
 	*out = rpcapi.mon.LatestMetrics(ctx, in)
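
For context, registering services through SvcID() keeps the service name next to the type instead of scattering string literals around the codebase. A hypothetical caller-side sketch (using the same CallContext signature that the old PeerAdd used above; `rpcClient`, `ctx` and `pid` are assumed to be in scope):

	// Call PeerMonitor.LatestMetrics on a remote peer, taking the
	// service name from the type rather than hardcoding "PeerMonitor".
	pm := &PeerMonitorRPCAPI{}
	var metrics []*api.Metric
	err := rpcClient.CallContext(
		ctx,
		pid,
		pm.SvcID(), // "PeerMonitor"
		"LatestMetrics",
		pingMetricName,
		&metrics,
	)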