Streaming Peers(): make Peers() a streaming call

This commit makes the changes needed to turn Peers() into a streaming call.

While Peers() is usually not a problematic call, for consistency, all calls
that return collections assembled by broadcasting to cluster peers are now
streaming calls.
Hector Sanjuan 2022-03-23 01:27:57 +01:00
parent 2d94c42310
commit eee53bfa4f
22 changed files with 250 additions and 158 deletions
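
For reference, below is a minimal consumer-side sketch of the new signature, following the channel-plus-goroutine pattern used by ipfs-cluster-ctl and the tests in this commit. The client construction, channel size and exact import paths are illustrative rather than part of the diff:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/api/rest/client"
)

func main() {
	ctx := context.Background()

	// Illustrative client setup; a real caller fills in Config.
	cl, err := client.NewDefaultClient(&client.Config{})
	if err != nil {
		log.Fatal(err)
	}

	out := make(chan api.ID, 1024)
	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)
		// Peers now streams IDs into out and closes it when done.
		errCh <- cl.Peers(ctx, out)
	}()

	// Consume results as they arrive instead of waiting for a full slice.
	for id := range out {
		fmt.Println(id.ID, id.Error)
	}
	if err := <-errCh; err != nil {
		log.Fatal(err)
	}
}

Buffering the output channel (ipfs-cluster-ctl uses 1024) lets the sending side keep streaming while the consumer is still formatting earlier results.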

@ -51,7 +51,7 @@ type Client interface {
ID(context.Context) (api.ID, error)
// Peers requests ID information for all cluster peers.
Peers(context.Context) ([]api.ID, error)
Peers(context.Context, chan<- api.ID) error
// PeerAdd adds a new peer to the cluster.
PeerAdd(ctx context.Context, pid peer.ID) (api.ID, error)
// PeerRm removes a current peer from the cluster

@ -123,16 +123,13 @@ func (lc *loadBalancingClient) ID(ctx context.Context) (api.ID, error) {
}
// Peers requests ID information for all cluster peers.
func (lc *loadBalancingClient) Peers(ctx context.Context) ([]api.ID, error) {
var peers []api.ID
func (lc *loadBalancingClient) Peers(ctx context.Context, out chan<- api.ID) error {
call := func(c Client) error {
var err error
peers, err = c.Peers(ctx)
return err
return c.Peers(ctx, out)
}
err := lc.retry(0, call)
return peers, err
return err
}
// PeerAdd adds a new peer to the cluster.

@ -34,13 +34,24 @@ func (c *defaultClient) ID(ctx context.Context) (api.ID, error) {
}
// Peers requests ID information for all cluster peers.
func (c *defaultClient) Peers(ctx context.Context) ([]api.ID, error) {
func (c *defaultClient) Peers(ctx context.Context, out chan<- api.ID) error {
defer close(out)
ctx, span := trace.StartSpan(ctx, "client/Peers")
defer span.End()
var ids []api.ID
err := c.do(ctx, "GET", "/peers", nil, nil, &ids)
return ids, err
handler := func(dec *json.Decoder) error {
var obj api.ID
err := dec.Decode(&obj)
if err != nil {
return err
}
out <- obj
return nil
}
return c.doStream(ctx, "GET", "/peers", nil, nil, handler)
}
type peerAddBody struct {

@ -71,11 +71,12 @@ func TestPeers(t *testing.T) {
defer shutdown(api)
testF := func(t *testing.T, c Client) {
ids, err := c.Peers(ctx)
out := make(chan types.ID, 10)
err := c.Peers(ctx, out)
if err != nil {
t.Fatal(err)
}
if len(ids) == 0 {
if len(out) == 0 {
t.Error("expected some peers")
}
}
@ -92,11 +93,12 @@ func TestPeersWithError(t *testing.T) {
addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/44444")
var _ = c
c, _ = NewDefaultClient(&Config{APIAddr: addr, DisableKeepAlives: true})
ids, err := c.Peers(ctx)
out := make(chan types.ID, 10)
err := c.Peers(ctx, out)
if err == nil {
t.Fatal("expected error")
}
if ids != nil {
if len(out) > 0 {
t.Fatal("expected no ids")
}
}

@ -3,12 +3,14 @@ package client
import (
"context"
"encoding/json"
"errors"
"io"
"io/ioutil"
"net/http"
"strings"
"github.com/ipfs/ipfs-cluster/api"
"go.uber.org/multierr"
"go.opencensus.io/trace"
)
@ -151,11 +153,16 @@ func (c *defaultClient) handleStreamResponse(resp *http.Response, handler respon
}
}
errTrailer := resp.Trailer.Get("X-Stream-Error")
if errTrailer != "" {
trailerErrs := resp.Trailer.Values("X-Stream-Error")
var err error
for _, trailerErr := range trailerErrs {
err = multierr.Append(err, errors.New(trailerErr))
}
if err != nil {
return api.Error{
Code: 500,
Message: errTrailer,
Message: err.Error(),
}
}
return nil
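
The change above makes the client aggregate every X-Stream-Error trailer value instead of only the first one. For context, here is a hedged sketch of the server side of that convention using plain net/http; it is illustrative only, not ipfs-cluster's actual API server, and streamIDs/peersHandler are hypothetical names:

package main

import (
	"encoding/json"
	"errors"
	"log"
	"net/http"

	"github.com/ipfs/ipfs-cluster/api"
)

// streamIDs is a hypothetical source of streamed results plus any errors
// hit while producing them.
func streamIDs() ([]api.ID, []error) {
	return []api.ID{{}}, []error{errors.New("example stream error")}
}

func peersHandler(w http.ResponseWriter, r *http.Request) {
	// Trailers must be announced before the body is written.
	w.Header().Set("Trailer", "X-Stream-Error")
	w.WriteHeader(http.StatusOK)

	enc := json.NewEncoder(w)
	ids, errs := streamIDs()
	for _, id := range ids {
		enc.Encode(id) // one JSON object per item, as the doStream handler decodes
	}

	// Each error becomes one X-Stream-Error trailer value; the client then
	// joins them all into a single error with multierr.
	for _, err := range errs {
		w.Header().Add("X-Stream-Error", err.Error())
	}
}

func main() {
	http.HandleFunc("/peers", peersHandler)
	log.Fatal(http.ListenAndServe("127.0.0.1:0", nil))
}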

@ -57,7 +57,7 @@ func NewAPI(ctx context.Context, cfg *Config) (*API, error) {
return NewAPIWithHost(ctx, cfg, nil)
}
// NewAPI creates a new REST API component using the given libp2p Host.
// NewAPIWithHost creates a new REST API component using the given libp2p Host.
func NewAPIWithHost(ctx context.Context, cfg *Config, h host.Host) (*API, error) {
api := API{
config: cfg,
@ -312,17 +312,28 @@ func (api *API) addHandler(w http.ResponseWriter, r *http.Request) {
}
func (api *API) peerListHandler(w http.ResponseWriter, r *http.Request) {
var peers []types.ID
err := api.rpcClient.CallContext(
in := make(chan struct{})
close(in)
out := make(chan types.ID, common.StreamChannelSize)
errCh := make(chan error, 1)
go func() {
defer close(errCh)
errCh <- api.rpcClient.Stream(
r.Context(),
"",
"Cluster",
"Peers",
struct{}{},
&peers,
in,
out,
)
}()
api.SendResponse(w, common.SetStatusAutomatically, err, peers)
iter := func() (interface{}, bool, error) {
p, ok := <-out
return p, ok, nil
}
api.StreamResponse(w, iter, errCh)
}
func (api *API) peerAddHandler(w http.ResponseWriter, r *http.Request) {
@ -455,7 +466,7 @@ func (api *API) allocationsHandler(w http.ResponseWriter, r *http.Request) {
in := make(chan struct{})
close(in)
pins := make(chan types.Pin)
out := make(chan types.Pin, common.StreamChannelSize)
errCh := make(chan error, 1)
ctx, cancel := context.WithCancel(r.Context())
@ -470,7 +481,7 @@ func (api *API) allocationsHandler(w http.ResponseWriter, r *http.Request) {
"Cluster",
"Pins",
in,
pins,
out,
)
}()
@ -483,7 +494,7 @@ func (api *API) allocationsHandler(w http.ResponseWriter, r *http.Request) {
select {
case <-ctx.Done():
break iterloop
case p, ok = <-pins:
case p, ok = <-out:
if !ok {
break iterloop
}

@ -96,14 +96,14 @@ func TestAPIVersionEndpoint(t *testing.T) {
test.BothEndpoints(t, tf)
}
func TestAPIPeerstEndpoint(t *testing.T) {
func TestAPIPeersEndpoint(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var list []*api.ID
test.MakeGet(t, rest, url(rest)+"/peers", &list)
var list []api.ID
test.MakeStreamingGet(t, rest, url(rest)+"/peers", &list, false)
if len(list) != 1 {
t.Fatal("expected 1 element")
}
@ -559,7 +559,7 @@ func TestAPIMetricsEndpoint(t *testing.T) {
defer rest.Shutdown(ctx)
tf := func(t *testing.T, url test.URLFunc) {
var resp []*api.Metric
var resp []api.Metric
test.MakeGet(t, rest, url(rest)+"/monitor/metrics/somemetricstype", &resp)
if len(resp) == 0 {
t.Fatal("No metrics found")
@ -804,7 +804,7 @@ func TestAPIIPFSGCEndpoint(t *testing.T) {
rest := testAPI(t)
defer rest.Shutdown(ctx)
testGlobalRepoGC := func(t *testing.T, gRepoGC *api.GlobalRepoGC) {
testGlobalRepoGC := func(t *testing.T, gRepoGC api.GlobalRepoGC) {
if gRepoGC.PeerMap == nil {
t.Fatal("expected a non-nil peer map")
}
@ -836,11 +836,11 @@ func TestAPIIPFSGCEndpoint(t *testing.T) {
tf := func(t *testing.T, url test.URLFunc) {
var resp api.GlobalRepoGC
test.MakePost(t, rest, url(rest)+"/ipfs/gc?local=true", []byte{}, &resp)
testGlobalRepoGC(t, &resp)
testGlobalRepoGC(t, resp)
var resp1 api.GlobalRepoGC
test.MakePost(t, rest, url(rest)+"/ipfs/gc", []byte{}, &resp1)
testGlobalRepoGC(t, &resp1)
testGlobalRepoGC(t, resp1)
}
test.BothEndpoints(t, tf)

@ -1747,7 +1747,7 @@ func (c *Cluster) Version() string {
}
// Peers returns the IDs of the members of this Cluster.
func (c *Cluster) Peers(ctx context.Context) []api.ID {
func (c *Cluster) Peers(ctx context.Context, out chan<- api.ID) {
_, span := trace.StartSpan(ctx, "cluster/Peers")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
@ -1756,49 +1756,70 @@ func (c *Cluster) Peers(ctx context.Context) []api.ID {
if err != nil {
logger.Error(err)
logger.Error("an empty list of peers will be returned")
return []api.ID{}
close(out)
return
}
return c.peersWithFilter(ctx, peers)
c.peersWithFilter(ctx, peers, out)
}
// requests IDs from a given number of peers.
func (c *Cluster) peersWithFilter(ctx context.Context, peers []peer.ID) []api.ID {
lenPeers := len(peers)
ids := make([]api.ID, lenPeers)
func (c *Cluster) peersWithFilter(ctx context.Context, peers []peer.ID, out chan<- api.ID) {
defer close(out)
// We should be done relatively quickly with this call. Otherwise
// report errors.
timeout := 15 * time.Second
ctxs, cancels := rpcutil.CtxsWithTimeout(ctx, lenPeers, timeout)
defer rpcutil.MultiCancel(cancels)
ctxCall, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
errs := c.rpcClient.MultiCall(
ctxs,
in := make(chan struct{})
close(in)
idsOut := make(chan api.ID, len(peers))
errCh := make(chan []error, 1)
go func() {
defer close(errCh)
errCh <- c.rpcClient.MultiStream(
ctxCall,
peers,
"Cluster",
"ID",
struct{}{},
rpcutil.CopyIDsToIfaces(ids),
"IDStream",
in,
idsOut,
)
}()
finalPeers := []api.ID{}
for i, err := range errs {
if err == nil {
finalPeers = append(finalPeers, ids[i])
_ = finalPeers // staticcheck
continue
// Unfortunately, we need to use idsOut as intermediary channel
// because it is closed when MultiStream ends and we cannot keep
// adding things on it (the errors below).
for id := range idsOut {
select {
case <-ctx.Done():
logger.Errorf("Peers call aborted: %s", ctx.Err())
return
case out <- id:
}
}
// ErrCh will always be closed on context cancellation too.
errs := <-errCh
for i, err := range errs {
if err == nil {
continue
}
if rpc.IsAuthorizationError(err) {
continue
}
ids[i] = api.ID{}
ids[i].ID = peers[i]
ids[i].Error = err.Error()
select {
case <-ctx.Done():
logger.Errorf("Peers call aborted: %s", ctx.Err())
case out <- api.ID{
ID: peers[i],
Error: err.Error(),
}:
}
}
return ids
}
// getTrustedPeers gives listed of trusted peers except the current peer and
@ -2001,8 +2022,6 @@ func (c *Cluster) globalPinInfoStream(ctx context.Context, comp, method string,
}
}
msOut := make(chan api.PinInfo)
// We don't have a good timeout proposal for this. Depending on the
// size of the state and the peformance of IPFS and the network, this
// may take moderately long.
@ -2010,6 +2029,7 @@ func (c *Cluster) globalPinInfoStream(ctx context.Context, comp, method string,
ctx, cancel := context.WithCancel(ctx)
defer cancel()
msOut := make(chan api.PinInfo)
errsCh := make(chan []error, 1)
go func() {
defer close(errsCh)

@ -905,8 +905,10 @@ func TestClusterPeers(t *testing.T) {
cl, _, _, _ := testingCluster(t)
defer cleanState()
defer cl.Shutdown(ctx)
peers := cl.Peers(ctx)
if len(peers) != 1 {
out := make(chan api.ID, 10)
cl.Peers(ctx, out)
if len(out) != 1 {
t.Fatal("expected 1 peer")
}
@ -916,7 +918,8 @@ func TestClusterPeers(t *testing.T) {
t.Fatal(err)
}
if peers[0].ID != ident.ID {
p := <-out
if p.ID != ident.ID {
t.Error("bad member")
}
}

@ -54,6 +54,10 @@ func jsonFormatPrint(obj interface{}) {
for o := range r {
print(o)
}
case chan api.ID:
for o := range r {
print(o)
}
default:
print(obj)
}
@ -84,8 +88,8 @@ func textFormatObject(resp interface{}) {
textFormatPrintMetric(r)
case api.Alert:
textFormatPrintAlert(r)
case []api.ID:
for _, item := range r {
case chan api.ID:
for item := range r {
textFormatObject(item)
}
case chan api.GlobalPinInfo:

@ -251,8 +251,15 @@ This command provides a list of the ID information of all the peers in the Clust
Flags: []cli.Flag{},
ArgsUsage: " ",
Action: func(c *cli.Context) error {
resp, cerr := globalClient.Peers(ctx)
formatResponse(c, resp, cerr)
out := make(chan api.ID, 1024)
errCh := make(chan error, 1)
go func() {
defer close(errCh)
errCh <- globalClient.Peers(ctx, out)
}()
formatResponse(c, out, nil)
err := <-errCh
formatResponse(c, nil, err)
return nil
},
},

@ -2,7 +2,6 @@ package ipfscluster
import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"
peer "github.com/libp2p/go-libp2p-core/peer"
@ -34,18 +33,32 @@ func (c *Cluster) ConnectGraph() (api.ConnectGraph, error) {
}
peers := make([][]api.ID, len(members))
errs := make([]error, len(members))
ctxs, cancels := rpcutil.CtxsWithCancel(ctx, len(members))
defer rpcutil.MultiCancel(cancels)
for i, member := range members {
in := make(chan struct{})
close(in)
out := make(chan api.ID, 1024)
errCh := make(chan error, 1)
go func(i int) {
defer close(errCh)
errs := c.rpcClient.MultiCall(
ctxs,
members,
errCh <- c.rpcClient.Stream(
ctx,
member,
"Cluster",
"Peers",
struct{}{},
rpcutil.CopyIDSliceToIfaces(peers),
in,
out,
)
}(i)
var ids []api.ID
for id := range out {
ids = append(ids, id)
}
peers[i] = ids
errs[i] = <-errCh
}
for i, err := range errs {
p := peer.Encode(members[i])

go.mod (2 changed lines)

@ -45,7 +45,7 @@ require (
github.com/libp2p/go-libp2p-connmgr v0.3.1
github.com/libp2p/go-libp2p-consensus v0.0.1
github.com/libp2p/go-libp2p-core v0.13.0
github.com/libp2p/go-libp2p-gorpc v0.3.1
github.com/libp2p/go-libp2p-gorpc v0.3.2
github.com/libp2p/go-libp2p-gostream v0.3.1
github.com/libp2p/go-libp2p-http v0.2.1
github.com/libp2p/go-libp2p-kad-dht v0.15.0

go.sum (4 changed lines)

@ -780,8 +780,8 @@ github.com/libp2p/go-libp2p-discovery v0.5.0/go.mod h1:+srtPIU9gDaBNu//UHvcdliKB
github.com/libp2p/go-libp2p-discovery v0.6.0 h1:1XdPmhMJr8Tmj/yUfkJMIi8mgwWrLUsCB3bMxdT+DSo=
github.com/libp2p/go-libp2p-discovery v0.6.0/go.mod h1:/u1voHt0tKIe5oIA1RHBKQLVCWPna2dXmPNHc2zR9S8=
github.com/libp2p/go-libp2p-gorpc v0.1.0/go.mod h1:DrswTLnu7qjLgbqe4fekX4ISoPiHUqtA45thTsJdE1w=
github.com/libp2p/go-libp2p-gorpc v0.3.1 h1:ZmqQIgHccgh/Ff1kS3ZlwATZRLvtuRUd633/MLWAx20=
github.com/libp2p/go-libp2p-gorpc v0.3.1/go.mod h1:sRz9ybP9rlOkJB1v65SMLr+NUEPB/ioLZn26MWIV4DU=
github.com/libp2p/go-libp2p-gorpc v0.3.2 h1:pQdGWqB+HImCXbKVbjqpgckUzGcXEPIYP8aisaYfkrs=
github.com/libp2p/go-libp2p-gorpc v0.3.2/go.mod h1:sRz9ybP9rlOkJB1v65SMLr+NUEPB/ioLZn26MWIV4DU=
github.com/libp2p/go-libp2p-gostream v0.3.0/go.mod h1:pLBQu8db7vBMNINGsAwLL/ZCE8wng5V1FThoaE5rNjc=
github.com/libp2p/go-libp2p-gostream v0.3.1 h1:XlwohsPn6uopGluEWs1Csv1QCEjrTXf2ZQagzZ5paAg=
github.com/libp2p/go-libp2p-gostream v0.3.1/go.mod h1:1V3b+u4Zhaq407UUY9JLCpboaeufAeVQbnvAt12LRsI=

@ -1,9 +1,7 @@
package ipfscluster
import (
"bytes"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
@ -606,9 +604,11 @@ func TestClustersPeers(t *testing.T) {
delay()
j := rand.Intn(nClusters) // choose a random cluster peer
peers := clusters[j].Peers(ctx)
if len(peers) != nClusters {
out := make(chan api.ID, len(clusters))
clusters[j].Peers(ctx, out)
if len(out) != nClusters {
t.Fatal("expected as many peers as clusters")
}
@ -620,7 +620,7 @@ func TestClustersPeers(t *testing.T) {
clusterIDMap[id.ID] = id
}
for _, p := range peers {
for p := range out {
if p.Error != "" {
t.Error(p.ID, p.Error)
continue
@ -642,34 +642,6 @@ func TestClustersPeers(t *testing.T) {
}
}
func TestClustersPeersRetainOrder(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
delay()
for i := 0; i < 5; i++ {
j := rand.Intn(nClusters) // choose a random cluster peer
peers1, err := json.Marshal(clusters[j].Peers(ctx))
if err != nil {
t.Fatal(err)
}
waitForLeaderAndMetrics(t, clusters)
k := rand.Intn(nClusters)
peers2, err := json.Marshal(clusters[k].Peers(ctx))
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(peers1, peers2) {
t.Error("expected both results to be same")
}
}
}
func TestClustersPin(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)

@ -685,21 +685,25 @@ func (ipfs *Connector) ConnectSwarms(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
var ids []api.ID
err := ipfs.rpcClient.CallContext(
in := make(chan struct{})
close(in)
out := make(chan api.ID)
go func() {
err := ipfs.rpcClient.Stream(
ctx,
"",
"Cluster",
"Peers",
struct{}{},
&ids,
in,
out,
)
if err != nil {
logger.Error(err)
return err
}
}()
for _, id := range ids {
for id := range out {
ipfsID := id.IPFS
if id.Error != "" || ipfsID.Error != "" {
continue

@ -18,6 +18,19 @@ import (
ma "github.com/multiformats/go-multiaddr"
)
func peers(ctx context.Context, t *testing.T, c *Cluster) []api.ID {
t.Helper()
out := make(chan api.ID)
go func() {
c.Peers(ctx, out)
}()
var ids []api.ID
for id := range out {
ids = append(ids, id)
}
return ids
}
func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock, host.Host) {
cls := make([]*Cluster, nClusters)
mocks := make([]*test.IpfsMock, nClusters)
@ -105,7 +118,7 @@ func TestClustersPeerAdd(t *testing.T) {
ttlDelay()
f := func(t *testing.T, c *Cluster) {
ids := c.Peers(ctx)
ids := peers(ctx, t, c)
// check they are tracked by the peer manager
if len(ids) != nClusters {
@ -180,7 +193,7 @@ func TestClustersJoinBadPeer(t *testing.T) {
if err == nil {
t.Error("expected an error")
}
ids := clusters[0].Peers(ctx)
ids := peers(ctx, t, clusters[0])
if len(ids) != 1 {
t.Error("cluster should have only one member")
}
@ -198,7 +211,7 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) {
clusters[0].PeerAdd(ctx, clusters[1].id)
ttlDelay()
ids := clusters[1].Peers(ctx)
ids := peers(ctx, t, clusters[1])
// raft will have only 2 peers
// crdt will have all peers autodiscovered by now
if len(ids) < 2 {
@ -222,7 +235,7 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) {
t.Error("expected an error")
}
ids = clusters[0].Peers(ctx)
ids = peers(ctx, t, clusters[0])
if len(ids) != 2 {
t.Error("cluster should still have 2 peers")
}
@ -237,7 +250,7 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) {
}
ttlDelay()
ids = clusters[0].Peers(ctx)
ids = peers(ctx, t, clusters[0])
if len(ids) < 2 {
t.Error("cluster should have at least 2 peers after removing and adding 1")
}
@ -275,7 +288,7 @@ func TestClustersPeerRemove(t *testing.T) {
t.Error("removed peer should have exited")
}
} else {
ids := c.Peers(ctx)
ids := peers(ctx, t, c)
if len(ids) != nClusters-1 {
t.Error("should have removed 1 peer")
}
@ -302,7 +315,7 @@ func TestClustersPeerRemoveSelf(t *testing.T) {
case "raft":
for i := 0; i < len(clusters); i++ {
waitForLeaderAndMetrics(t, clusters)
peers := clusters[i].Peers(ctx)
peers := peers(ctx, t, clusters[i])
t.Logf("Current cluster size: %d", len(peers))
if len(peers) != (len(clusters) - i) {
t.Fatal("Previous peers not removed correctly")
@ -363,7 +376,7 @@ func TestClustersPeerRemoveLeader(t *testing.T) {
for i := 0; i < len(clusters); i++ {
leader := findLeader(t)
peers := leader.Peers(ctx)
peers := peers(ctx, t, leader)
t.Logf("Current cluster size: %d", len(peers))
if len(peers) != (len(clusters) - i) {
t.Fatal("Previous peers not removed correctly")
@ -528,7 +541,7 @@ func TestClustersPeerJoin(t *testing.T) {
}
f := func(t *testing.T, c *Cluster) {
peers := c.Peers(ctx)
peers := peers(ctx, t, c)
str := c.id.String() + "\n"
for _, p := range peers {
str += " - " + p.ID.String() + "\n"
@ -571,7 +584,7 @@ func TestClustersPeerJoinAllAtOnce(t *testing.T) {
ttlDelay()
f2 := func(t *testing.T, c *Cluster) {
peers := c.Peers(ctx)
peers := peers(ctx, t, c)
if len(peers) != nClusters {
t.Error("all peers should be connected")
}

@ -51,10 +51,10 @@ func TestSimplePNet(t *testing.T) {
}
ttlDelay()
if len(clusters[0].Peers(ctx)) != len(clusters[1].Peers(ctx)) {
if len(peers(ctx, t, clusters[0])) != len(peers(ctx, t, clusters[1])) {
t.Fatal("Expected same number of peers")
}
if len(clusters[0].Peers(ctx)) < 2 {
if len(peers(ctx, t, clusters[0])) < 2 {
// crdt mode has auto discovered all peers at this point.
// Raft mode has 2 peers only.
t.Fatal("Expected at least 2 peers")

@ -160,6 +160,18 @@ func (rpcapi *ClusterRPCAPI) ID(ctx context.Context, in struct{}, out *api.ID) e
return nil
}
// IDStream runs Cluster.ID() but in streaming form.
func (rpcapi *ClusterRPCAPI) IDStream(ctx context.Context, in <-chan struct{}, out chan<- api.ID) error {
defer close(out)
id := rpcapi.c.ID(ctx)
select {
case <-ctx.Done():
return ctx.Err()
case out <- id:
}
return nil
}
// Pin runs Cluster.pin().
func (rpcapi *ClusterRPCAPI) Pin(ctx context.Context, in api.Pin, out *api.Pin) error {
// we do not call the Pin method directly since that method does not
@ -227,14 +239,15 @@ func (rpcapi *ClusterRPCAPI) Version(ctx context.Context, in struct{}, out *api.
}
// Peers runs Cluster.Peers().
func (rpcapi *ClusterRPCAPI) Peers(ctx context.Context, in struct{}, out *[]api.ID) error {
*out = rpcapi.c.Peers(ctx)
func (rpcapi *ClusterRPCAPI) Peers(ctx context.Context, in <-chan struct{}, out chan<- api.ID) error {
rpcapi.c.Peers(ctx, out)
return nil
}
// PeersWithFilter runs Cluster.peersWithFilter().
func (rpcapi *ClusterRPCAPI) PeersWithFilter(ctx context.Context, in []peer.ID, out *[]api.ID) error {
*out = rpcapi.c.peersWithFilter(ctx, in)
func (rpcapi *ClusterRPCAPI) PeersWithFilter(ctx context.Context, in <-chan []peer.ID, out chan<- api.ID) error {
peers := <-in
rpcapi.c.peersWithFilter(ctx, peers, out)
return nil
}

@ -11,6 +11,7 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{
"Cluster.BlockAllocate": RPCClosed,
"Cluster.ConnectGraph": RPCClosed,
"Cluster.ID": RPCOpen,
"Cluster.IDStream": RPCOpen,
"Cluster.IPFSID": RPCClosed,
"Cluster.Join": RPCClosed,
"Cluster.PeerAdd": RPCOpen, // Used by Join()

@ -12,8 +12,8 @@ test_expect_success IPFS,CLUSTER,JQ "cluster-ctl can read id" '
[ -n "$id" ] && ( ipfs-cluster-ctl id | egrep -q "$id" )
'
test_expect_success IPFS,CLUSTER "cluster-ctl list 0 peers" '
peer_length=`ipfs-cluster-ctl --enc=json peers ls | jq ". | length"`
test_expect_success IPFS,CLUSTER "cluster-ctl list 1 peer" '
peer_length=`ipfs-cluster-ctl --enc=json peers ls | jq -n "[inputs] | length"`
[ $peer_length -eq 1 ]
'

@ -178,6 +178,18 @@ func (mock *mockCluster) ID(ctx context.Context, in struct{}, out *api.ID) error
return nil
}
func (mock *mockCluster) IDStream(ctx context.Context, in <-chan struct{}, out chan<- api.ID) error {
defer close(out)
var id api.ID
mock.ID(ctx, struct{}{}, &id)
select {
case <-ctx.Done():
return ctx.Err()
case out <- id:
}
return nil
}
func (mock *mockCluster) Version(ctx context.Context, in struct{}, out *api.Version) error {
*out = api.Version{
Version: "0.0.mock",
@ -185,16 +197,18 @@ func (mock *mockCluster) Version(ctx context.Context, in struct{}, out *api.Vers
return nil
}
func (mock *mockCluster) Peers(ctx context.Context, in struct{}, out *[]api.ID) error {
func (mock *mockCluster) Peers(ctx context.Context, in <-chan struct{}, out chan<- api.ID) error {
id := api.ID{}
mock.ID(ctx, in, &id)
*out = []api.ID{id}
mock.ID(ctx, struct{}{}, &id)
out <- id
close(out)
return nil
}
func (mock *mockCluster) PeersWithFilter(ctx context.Context, in []peer.ID, out *[]api.ID) error {
return mock.Peers(ctx, struct{}{}, out)
func (mock *mockCluster) PeersWithFilter(ctx context.Context, in <-chan []peer.ID, out chan<- api.ID) error {
inCh := make(chan struct{})
close(inCh)
return mock.Peers(ctx, inCh, out)
}
func (mock *mockCluster) PeerAdd(ctx context.Context, in peer.ID, out *api.ID) error {