0d73d33ef5
This commit continues the work of taking advantage of the streaming capabilities in go-libp2p-gorpc by improving the ipfsconnector and pintracker components. StatusAll and RecoverAll methods are now streaming methods, with the REST API output changing accordingly to produce a stream of GlobalPinInfos rather than a JSON array. pin/ls requests to the ipfs daemon now use ?stream=true and avoid having to load the full pinset map into memory. StatusAllLocal and RecoverAllLocal requests to the pin tracker stream all the way and no longer store the full pinset and the full PinInfo status slice before sending them out. We have additionally switched to a pattern where streaming methods receive the channel as an argument, allowing the caller to decide whether to launch a goroutine, do buffering, etc.
125 lines
2.5 KiB
Go
125 lines
2.5 KiB
Go
// Package numpin implements an ipfs-cluster informer which determines how many
|
|
// items this peer is pinning and returns it as api.Metric
|
|
package numpin
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/ipfs/ipfs-cluster/api"
|
|
|
|
rpc "github.com/libp2p/go-libp2p-gorpc"
|
|
|
|
"go.opencensus.io/trace"
|
|
)
|
|
|
|
// MetricName is the name attached to the metrics produced by this informer
// and the value returned by Informer.Name.
var MetricName = "numpin"
|
|
|
|
// Informer is a simple object to implement the ipfscluster.Informer
|
|
// and Component interfaces
|
|
type Informer struct {
|
|
config *Config
|
|
|
|
mu sync.Mutex
|
|
rpcClient *rpc.Client
|
|
}
|
|
|
|
// NewInformer returns an initialized Informer.
|
|
func NewInformer(cfg *Config) (*Informer, error) {
|
|
err := cfg.Validate()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Informer{
|
|
config: cfg,
|
|
}, nil
|
|
}
|
|
|
|
// SetClient provides us with an rpc.Client which allows
|
|
// contacting other components in the cluster.
|
|
func (npi *Informer) SetClient(c *rpc.Client) {
|
|
npi.mu.Lock()
|
|
npi.rpcClient = c
|
|
npi.mu.Unlock()
|
|
}
|
|
|
|
// Shutdown is called on cluster shutdown. We just invalidate
|
|
// any metrics from this point.
|
|
func (npi *Informer) Shutdown(ctx context.Context) error {
|
|
_, span := trace.StartSpan(ctx, "informer/numpin/Shutdown")
|
|
defer span.End()
|
|
|
|
npi.mu.Lock()
|
|
npi.rpcClient = nil
|
|
npi.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// Name returns the name of this informer
|
|
func (npi *Informer) Name() string {
|
|
return MetricName
|
|
}
|
|
|
|
// GetMetrics contacts the IPFSConnector component and requests the `pin ls`
|
|
// command. We return the number of pins in IPFS. It must always return at
|
|
// least one metric.
|
|
func (npi *Informer) GetMetrics(ctx context.Context) []api.Metric {
|
|
ctx, span := trace.StartSpan(ctx, "informer/numpin/GetMetric")
|
|
defer span.End()
|
|
|
|
npi.mu.Lock()
|
|
rpcClient := npi.rpcClient
|
|
npi.mu.Unlock()
|
|
|
|
if rpcClient == nil {
|
|
return []api.Metric{
|
|
{
|
|
Valid: false,
|
|
},
|
|
}
|
|
}
|
|
|
|
// make use of the RPC API to obtain information
|
|
// about the number of pins in IPFS. See RPCAPI docs.
|
|
in := make(chan []string, 1)
|
|
in <- []string{"recursive", "direct"}
|
|
close(in)
|
|
out := make(chan api.IPFSPinInfo, 1024)
|
|
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
defer close(errCh)
|
|
err := rpcClient.Stream(
|
|
ctx,
|
|
"", // Local call
|
|
"IPFSConnector", // Service name
|
|
"PinLs", // Method name
|
|
in,
|
|
out,
|
|
)
|
|
errCh <- err
|
|
}()
|
|
|
|
n := 0
|
|
for range out {
|
|
n++
|
|
}
|
|
|
|
err := <-errCh
|
|
|
|
valid := err == nil
|
|
|
|
m := api.Metric{
|
|
Name: MetricName,
|
|
Value: fmt.Sprintf("%d", n),
|
|
Valid: valid,
|
|
Partitionable: false,
|
|
}
|
|
|
|
m.SetTTL(npi.config.MetricTTL)
|
|
return []api.Metric{m}
|
|
}
|