Feat: Allow and run multiple informer components (#962)
This introduces the possibility of running Cluster with multiple informer components. The first one in the list is the one used for "allocations". This is just groundwork for working with several informers in the future.
This commit is contained in:
parent
281653bdbd
commit
30fd5ee6a0
|
@ -65,7 +65,7 @@ func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, rplMin, rplMax int
|
||||||
if err == nil {
|
if err == nil {
|
||||||
currentAllocs = currentPin.Allocations
|
currentAllocs = currentPin.Allocations
|
||||||
}
|
}
|
||||||
metrics := c.monitor.LatestMetrics(ctx, c.informer.Name())
|
metrics := c.monitor.LatestMetrics(ctx, c.informers[0].Name())
|
||||||
|
|
||||||
currentMetrics := make(map[peer.ID]*api.Metric)
|
currentMetrics := make(map[peer.ID]*api.Metric)
|
||||||
candidatesMetrics := make(map[peer.ID]*api.Metric)
|
candidatesMetrics := make(map[peer.ID]*api.Metric)
|
||||||
|
|
52
cluster.go
52
cluster.go
|
@ -72,7 +72,7 @@ type Cluster struct {
|
||||||
tracker PinTracker
|
tracker PinTracker
|
||||||
monitor PeerMonitor
|
monitor PeerMonitor
|
||||||
allocator PinAllocator
|
allocator PinAllocator
|
||||||
informer Informer
|
informers []Informer
|
||||||
tracer Tracer
|
tracer Tracer
|
||||||
|
|
||||||
doneCh chan struct{}
|
doneCh chan struct{}
|
||||||
|
@ -107,7 +107,7 @@ func NewCluster(
|
||||||
tracker PinTracker,
|
tracker PinTracker,
|
||||||
monitor PeerMonitor,
|
monitor PeerMonitor,
|
||||||
allocator PinAllocator,
|
allocator PinAllocator,
|
||||||
informer Informer,
|
informers []Informer,
|
||||||
tracer Tracer,
|
tracer Tracer,
|
||||||
) (*Cluster, error) {
|
) (*Cluster, error) {
|
||||||
err := cfg.Validate()
|
err := cfg.Validate()
|
||||||
|
@ -119,6 +119,10 @@ func NewCluster(
|
||||||
return nil, errors.New("cluster host is nil")
|
return nil, errors.New("cluster host is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(informers) == 0 {
|
||||||
|
return nil, errors.New("no informers are passed")
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
listenAddrs := ""
|
listenAddrs := ""
|
||||||
|
@ -155,7 +159,7 @@ func NewCluster(
|
||||||
tracker: tracker,
|
tracker: tracker,
|
||||||
monitor: monitor,
|
monitor: monitor,
|
||||||
allocator: allocator,
|
allocator: allocator,
|
||||||
informer: informer,
|
informers: informers,
|
||||||
tracer: tracer,
|
tracer: tracer,
|
||||||
peerManager: peerManager,
|
peerManager: peerManager,
|
||||||
shutdownB: false,
|
shutdownB: false,
|
||||||
|
@ -238,7 +242,9 @@ func (c *Cluster) setupRPCClients() {
|
||||||
c.consensus.SetClient(c.rpcClient)
|
c.consensus.SetClient(c.rpcClient)
|
||||||
c.monitor.SetClient(c.rpcClient)
|
c.monitor.SetClient(c.rpcClient)
|
||||||
c.allocator.SetClient(c.rpcClient)
|
c.allocator.SetClient(c.rpcClient)
|
||||||
c.informer.SetClient(c.rpcClient)
|
for _, informer := range c.informers {
|
||||||
|
informer.SetClient(c.rpcClient)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncWatcher loops and triggers StateSync and SyncAllLocal from time to time
|
// syncWatcher loops and triggers StateSync and SyncAllLocal from time to time
|
||||||
|
@ -268,19 +274,34 @@ func (c *Cluster) syncWatcher() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) sendInformerMetric(ctx context.Context) (*api.Metric, error) {
|
func (c *Cluster) sendInformerMetric(ctx context.Context, informer Informer) (*api.Metric, error) {
|
||||||
ctx, span := trace.StartSpan(ctx, "cluster/sendInformerMetric")
|
ctx, span := trace.StartSpan(ctx, "cluster/sendInformerMetric")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
metric := c.informer.GetMetric(ctx)
|
metric := informer.GetMetric(ctx)
|
||||||
metric.Peer = c.id
|
metric.Peer = c.id
|
||||||
return metric, c.monitor.PublishMetric(ctx, metric)
|
return metric, c.monitor.PublishMetric(ctx, metric)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cluster) sendInformersMetrics(ctx context.Context) ([]*api.Metric, error) {
|
||||||
|
ctx, span := trace.StartSpan(ctx, "cluster/sendInformersMetrics")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
var metrics []*api.Metric
|
||||||
|
for _, informer := range c.informers {
|
||||||
|
m, err := c.sendInformerMetric(ctx, informer)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
metrics = append(metrics, m)
|
||||||
|
}
|
||||||
|
return metrics, nil
|
||||||
|
}
|
||||||
|
|
||||||
// pushInformerMetrics loops and publishes informers metrics using the
|
// pushInformerMetrics loops and publishes informers metrics using the
|
||||||
// cluster monitor. Metrics are pushed normally at a TTL/2 rate. If an error
|
// cluster monitor. Metrics are pushed normally at a TTL/2 rate. If an error
|
||||||
// occurs, they are pushed at a TTL/4 rate.
|
// occurs, they are pushed at a TTL/4 rate.
|
||||||
func (c *Cluster) pushInformerMetrics(ctx context.Context) {
|
func (c *Cluster) pushInformerMetrics(ctx context.Context, informer Informer) {
|
||||||
ctx, span := trace.StartSpan(ctx, "cluster/pushInformerMetrics")
|
ctx, span := trace.StartSpan(ctx, "cluster/pushInformerMetrics")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -302,7 +323,7 @@ func (c *Cluster) pushInformerMetrics(ctx context.Context) {
|
||||||
// wait
|
// wait
|
||||||
}
|
}
|
||||||
|
|
||||||
metric, err := c.sendInformerMetric(ctx)
|
metric, err := c.sendInformerMetric(ctx, informer)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if (retries % retryWarnMod) == 0 {
|
if (retries % retryWarnMod) == 0 {
|
||||||
|
@ -541,11 +562,13 @@ func (c *Cluster) run() {
|
||||||
c.pushPingMetrics(c.ctx)
|
c.pushPingMetrics(c.ctx)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
c.wg.Add(1)
|
c.wg.Add(len(c.informers))
|
||||||
go func() {
|
for _, informer := range c.informers {
|
||||||
defer c.wg.Done()
|
go func(inf Informer) {
|
||||||
c.pushInformerMetrics(c.ctx)
|
defer c.wg.Done()
|
||||||
}()
|
c.pushInformerMetrics(c.ctx, inf)
|
||||||
|
}(informer)
|
||||||
|
}
|
||||||
|
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -913,10 +936,11 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Broadcast our metrics to the world
|
// Broadcast our metrics to the world
|
||||||
_, err = c.sendInformerMetric(ctx)
|
_, err = c.sendInformersMetrics(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warning(err)
|
logger.Warning(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = c.sendPingMetric(ctx)
|
_, err = c.sendPingMetric(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warning(err)
|
logger.Warning(err)
|
||||||
|
|
|
@ -197,7 +197,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke
|
||||||
tracker,
|
tracker,
|
||||||
mon,
|
mon,
|
||||||
alloc,
|
alloc,
|
||||||
inf,
|
[]Informer{inf},
|
||||||
tracer,
|
tracer,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -200,7 +200,7 @@ func createCluster(
|
||||||
tracker,
|
tracker,
|
||||||
mon,
|
mon,
|
||||||
alloc,
|
alloc,
|
||||||
informer,
|
[]ipfscluster.Informer{informer},
|
||||||
tracer,
|
tracer,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -282,7 +282,7 @@ func makePinTracker(t *testing.T, pid peer.ID, mptCfg *maptracker.Config, sptCfg
|
||||||
}
|
}
|
||||||
|
|
||||||
func createCluster(t *testing.T, host host.Host, dht *dht.IpfsDHT, clusterCfg *Config, store ds.Datastore, consensus Consensus, apis []API, ipfs IPFSConnector, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer, tracer Tracer) *Cluster {
|
func createCluster(t *testing.T, host host.Host, dht *dht.IpfsDHT, clusterCfg *Config, store ds.Datastore, consensus Consensus, apis []API, ipfs IPFSConnector, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer, tracer Tracer) *Cluster {
|
||||||
cl, err := NewCluster(context.Background(), host, dht, clusterCfg, store, consensus, apis, ipfs, tracker, mon, alloc, inf, tracer)
|
cl, err := NewCluster(context.Background(), host, dht, clusterCfg, store, consensus, apis, ipfs, tracker, mon, alloc, []Informer{inf}, tracer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -537,7 +537,7 @@ func waitForClustersHealthy(t *testing.T, clusters []*Cluster) {
|
||||||
timer := time.NewTimer(15 * time.Second)
|
timer := time.NewTimer(15 * time.Second)
|
||||||
for {
|
for {
|
||||||
ttlDelay()
|
ttlDelay()
|
||||||
metrics := clusters[0].monitor.LatestMetrics(context.Background(), clusters[0].informer.Name())
|
metrics := clusters[0].monitor.LatestMetrics(context.Background(), clusters[0].informers[0].Name())
|
||||||
healthy := 0
|
healthy := 0
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
if !m.Expired() {
|
if !m.Expired() {
|
||||||
|
|
|
@ -952,15 +952,14 @@ func (ipfs *Connector) updateInformerMetric(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var metric api.Metric
|
var metrics []*api.Metric
|
||||||
|
|
||||||
err := ipfs.rpcClient.GoContext(
|
err := ipfs.rpcClient.GoContext(
|
||||||
ctx,
|
ctx,
|
||||||
"",
|
"",
|
||||||
"Cluster",
|
"Cluster",
|
||||||
"SendInformerMetric",
|
"SendInformersMetrics",
|
||||||
struct{}{},
|
struct{}{},
|
||||||
&metric,
|
&metrics,
|
||||||
nil,
|
nil,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
13
rpc_api.go
13
rpc_api.go
|
@ -440,7 +440,7 @@ func (rpcapi *ClusterRPCAPI) RepoGCLocal(ctx context.Context, in struct{}, out *
|
||||||
|
|
||||||
// SendInformerMetric runs Cluster.sendInformerMetric().
|
// SendInformerMetric runs Cluster.sendInformerMetric().
|
||||||
func (rpcapi *ClusterRPCAPI) SendInformerMetric(ctx context.Context, in struct{}, out *api.Metric) error {
|
func (rpcapi *ClusterRPCAPI) SendInformerMetric(ctx context.Context, in struct{}, out *api.Metric) error {
|
||||||
m, err := rpcapi.c.sendInformerMetric(ctx)
|
m, err := rpcapi.c.sendInformerMetric(ctx, rpcapi.c.informers[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -448,6 +448,17 @@ func (rpcapi *ClusterRPCAPI) SendInformerMetric(ctx context.Context, in struct{}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SendInformersMetrics runs Cluster.sendInformerMetric() on all informers.
|
||||||
|
func (rpcapi *ClusterRPCAPI) SendInformersMetrics(ctx context.Context, in struct{}, out *[]*api.Metric) error {
|
||||||
|
var metrics []*api.Metric
|
||||||
|
metrics, err := rpcapi.c.sendInformersMetrics(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*out = metrics
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Tracker component methods
|
Tracker component methods
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -7,35 +7,36 @@ package ipfscluster
|
||||||
// without missing any endpoint.
|
// without missing any endpoint.
|
||||||
var DefaultRPCPolicy = map[string]RPCEndpointType{
|
var DefaultRPCPolicy = map[string]RPCEndpointType{
|
||||||
// Cluster methods
|
// Cluster methods
|
||||||
"Cluster.BlockAllocate": RPCClosed,
|
"Cluster.BlockAllocate": RPCClosed,
|
||||||
"Cluster.ConnectGraph": RPCClosed,
|
"Cluster.ConnectGraph": RPCClosed,
|
||||||
"Cluster.ID": RPCOpen,
|
"Cluster.ID": RPCOpen,
|
||||||
"Cluster.Join": RPCClosed,
|
"Cluster.Join": RPCClosed,
|
||||||
"Cluster.PeerAdd": RPCOpen, // Used by Join()
|
"Cluster.PeerAdd": RPCOpen, // Used by Join()
|
||||||
"Cluster.PeerRemove": RPCTrusted,
|
"Cluster.PeerRemove": RPCTrusted,
|
||||||
"Cluster.Peers": RPCTrusted, // Used by ConnectGraph()
|
"Cluster.Peers": RPCTrusted, // Used by ConnectGraph()
|
||||||
"Cluster.Pin": RPCClosed,
|
"Cluster.Pin": RPCClosed,
|
||||||
"Cluster.PinGet": RPCClosed,
|
"Cluster.PinGet": RPCClosed,
|
||||||
"Cluster.PinPath": RPCClosed,
|
"Cluster.PinPath": RPCClosed,
|
||||||
"Cluster.Pins": RPCClosed, // Used in stateless tracker, ipfsproxy, restapi
|
"Cluster.Pins": RPCClosed, // Used in stateless tracker, ipfsproxy, restapi
|
||||||
"Cluster.Recover": RPCClosed,
|
"Cluster.Recover": RPCClosed,
|
||||||
"Cluster.RecoverAll": RPCClosed,
|
"Cluster.RecoverAll": RPCClosed,
|
||||||
"Cluster.RecoverAllLocal": RPCTrusted,
|
"Cluster.RecoverAllLocal": RPCTrusted,
|
||||||
"Cluster.RecoverLocal": RPCTrusted,
|
"Cluster.RecoverLocal": RPCTrusted,
|
||||||
"Cluster.RepoGC": RPCClosed,
|
"Cluster.RepoGC": RPCClosed,
|
||||||
"Cluster.RepoGCLocal": RPCTrusted,
|
"Cluster.RepoGCLocal": RPCTrusted,
|
||||||
"Cluster.SendInformerMetric": RPCClosed,
|
"Cluster.SendInformerMetric": RPCClosed,
|
||||||
"Cluster.Status": RPCClosed,
|
"Cluster.SendInformersMetrics": RPCClosed,
|
||||||
"Cluster.StatusAll": RPCClosed,
|
"Cluster.Status": RPCClosed,
|
||||||
"Cluster.StatusAllLocal": RPCClosed,
|
"Cluster.StatusAll": RPCClosed,
|
||||||
"Cluster.StatusLocal": RPCClosed,
|
"Cluster.StatusAllLocal": RPCClosed,
|
||||||
"Cluster.Sync": RPCClosed,
|
"Cluster.StatusLocal": RPCClosed,
|
||||||
"Cluster.SyncAll": RPCClosed,
|
"Cluster.Sync": RPCClosed,
|
||||||
"Cluster.SyncAllLocal": RPCTrusted, // Called in broadcast from SyncAll()
|
"Cluster.SyncAll": RPCClosed,
|
||||||
"Cluster.SyncLocal": RPCTrusted, // Called in broadcast from Sync()
|
"Cluster.SyncAllLocal": RPCTrusted, // Called in broadcast from SyncAll()
|
||||||
"Cluster.Unpin": RPCClosed,
|
"Cluster.SyncLocal": RPCTrusted, // Called in broadcast from Sync()
|
||||||
"Cluster.UnpinPath": RPCClosed,
|
"Cluster.Unpin": RPCClosed,
|
||||||
"Cluster.Version": RPCOpen,
|
"Cluster.UnpinPath": RPCClosed,
|
||||||
|
"Cluster.Version": RPCOpen,
|
||||||
|
|
||||||
// PinTracker methods
|
// PinTracker methods
|
||||||
"PinTracker.Recover": RPCTrusted, // Called in broadcast from Recover()
|
"PinTracker.Recover": RPCTrusted, // Called in broadcast from Recover()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user