diff --git a/api/rest/client/client.go b/api/rest/client/client.go index 0f433ded..a49f704c 100644 --- a/api/rest/client/client.go +++ b/api/rest/client/client.go @@ -51,7 +51,7 @@ type Client interface { ID(context.Context) (api.ID, error) // Peers requests ID information for all cluster peers. - Peers(context.Context) ([]api.ID, error) + Peers(context.Context, chan<- api.ID) error // PeerAdd adds a new peer to the cluster. PeerAdd(ctx context.Context, pid peer.ID) (api.ID, error) // PeerRm removes a current peer from the cluster diff --git a/api/rest/client/lbclient.go b/api/rest/client/lbclient.go index d223cda3..8ae6e2e2 100644 --- a/api/rest/client/lbclient.go +++ b/api/rest/client/lbclient.go @@ -123,16 +123,13 @@ func (lc *loadBalancingClient) ID(ctx context.Context) (api.ID, error) { } // Peers requests ID information for all cluster peers. -func (lc *loadBalancingClient) Peers(ctx context.Context) ([]api.ID, error) { - var peers []api.ID +func (lc *loadBalancingClient) Peers(ctx context.Context, out chan<- api.ID) error { call := func(c Client) error { - var err error - peers, err = c.Peers(ctx) - return err + return c.Peers(ctx, out) } err := lc.retry(0, call) - return peers, err + return err } // PeerAdd adds a new peer to the cluster. diff --git a/api/rest/client/methods.go b/api/rest/client/methods.go index 4a898e08..9aa07812 100644 --- a/api/rest/client/methods.go +++ b/api/rest/client/methods.go @@ -34,13 +34,24 @@ func (c *defaultClient) ID(ctx context.Context) (api.ID, error) { } // Peers requests ID information for all cluster peers. -func (c *defaultClient) Peers(ctx context.Context) ([]api.ID, error) { +func (c *defaultClient) Peers(ctx context.Context, out chan<- api.ID) error { + defer close(out) + ctx, span := trace.StartSpan(ctx, "client/Peers") defer span.End() - var ids []api.ID - err := c.do(ctx, "GET", "/peers", nil, nil, &ids) - return ids, err + handler := func(dec *json.Decoder) error { + var obj api.ID + err := dec.Decode(&obj) + if err != nil { + return err + } + out <- obj + return nil + } + + return c.doStream(ctx, "GET", "/peers", nil, nil, handler) + } type peerAddBody struct { diff --git a/api/rest/client/methods_test.go b/api/rest/client/methods_test.go index 71c9e8d9..ee68cb33 100644 --- a/api/rest/client/methods_test.go +++ b/api/rest/client/methods_test.go @@ -71,11 +71,12 @@ func TestPeers(t *testing.T) { defer shutdown(api) testF := func(t *testing.T, c Client) { - ids, err := c.Peers(ctx) + out := make(chan types.ID, 10) + err := c.Peers(ctx, out) if err != nil { t.Fatal(err) } - if len(ids) == 0 { + if len(out) == 0 { t.Error("expected some peers") } } @@ -92,11 +93,12 @@ func TestPeersWithError(t *testing.T) { addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/44444") var _ = c c, _ = NewDefaultClient(&Config{APIAddr: addr, DisableKeepAlives: true}) - ids, err := c.Peers(ctx) + out := make(chan types.ID, 10) + err := c.Peers(ctx, out) if err == nil { t.Fatal("expected error") } - if ids != nil { + if len(out) > 0 { t.Fatal("expected no ids") } } diff --git a/api/rest/client/request.go b/api/rest/client/request.go index 1112ef5c..0d80f888 100644 --- a/api/rest/client/request.go +++ b/api/rest/client/request.go @@ -3,12 +3,14 @@ package client import ( "context" "encoding/json" + "errors" "io" "io/ioutil" "net/http" "strings" "github.com/ipfs/ipfs-cluster/api" + "go.uber.org/multierr" "go.opencensus.io/trace" ) @@ -151,11 +153,16 @@ func (c *defaultClient) handleStreamResponse(resp *http.Response, handler respon } } - errTrailer := resp.Trailer.Get("X-Stream-Error") - if errTrailer != "" { + trailerErrs := resp.Trailer.Values("X-Stream-Error") + var err error + for _, trailerErr := range trailerErrs { + err = multierr.Append(err, errors.New(trailerErr)) + } + + if err != nil { return api.Error{ Code: 500, - Message: errTrailer, + Message: err.Error(), } } return nil diff --git a/api/rest/restapi.go b/api/rest/restapi.go index 9445050c..4caee0d2 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -57,7 +57,7 @@ func NewAPI(ctx context.Context, cfg *Config) (*API, error) { return NewAPIWithHost(ctx, cfg, nil) } -// NewAPI creates a new REST API component using the given libp2p Host. +// NewAPIWithHost creates a new REST API component using the given libp2p Host. func NewAPIWithHost(ctx context.Context, cfg *Config, h host.Host) (*API, error) { api := API{ config: cfg, @@ -312,17 +312,28 @@ func (api *API) addHandler(w http.ResponseWriter, r *http.Request) { } func (api *API) peerListHandler(w http.ResponseWriter, r *http.Request) { - var peers []types.ID - err := api.rpcClient.CallContext( - r.Context(), - "", - "Cluster", - "Peers", - struct{}{}, - &peers, - ) + in := make(chan struct{}) + close(in) + out := make(chan types.ID, common.StreamChannelSize) + errCh := make(chan error, 1) + go func() { + defer close(errCh) - api.SendResponse(w, common.SetStatusAutomatically, err, peers) + errCh <- api.rpcClient.Stream( + r.Context(), + "", + "Cluster", + "Peers", + in, + out, + ) + }() + + iter := func() (interface{}, bool, error) { + p, ok := <-out + return p, ok, nil + } + api.StreamResponse(w, iter, errCh) } func (api *API) peerAddHandler(w http.ResponseWriter, r *http.Request) { @@ -455,7 +466,7 @@ func (api *API) allocationsHandler(w http.ResponseWriter, r *http.Request) { in := make(chan struct{}) close(in) - pins := make(chan types.Pin) + out := make(chan types.Pin, common.StreamChannelSize) errCh := make(chan error, 1) ctx, cancel := context.WithCancel(r.Context()) @@ -470,7 +481,7 @@ func (api *API) allocationsHandler(w http.ResponseWriter, r *http.Request) { "Cluster", "Pins", in, - pins, + out, ) }() @@ -483,7 +494,7 @@ func (api *API) allocationsHandler(w http.ResponseWriter, r *http.Request) { select { case <-ctx.Done(): break iterloop - case p, ok = <-pins: + case p, ok = <-out: if !ok { break iterloop } diff --git a/api/rest/restapi_test.go b/api/rest/restapi_test.go index c348ccc4..5d826d8e 100644 --- a/api/rest/restapi_test.go +++ b/api/rest/restapi_test.go @@ -96,14 +96,14 @@ func TestAPIVersionEndpoint(t *testing.T) { test.BothEndpoints(t, tf) } -func TestAPIPeerstEndpoint(t *testing.T) { +func TestAPIPeersEndpoint(t *testing.T) { ctx := context.Background() rest := testAPI(t) defer rest.Shutdown(ctx) tf := func(t *testing.T, url test.URLFunc) { - var list []*api.ID - test.MakeGet(t, rest, url(rest)+"/peers", &list) + var list []api.ID + test.MakeStreamingGet(t, rest, url(rest)+"/peers", &list, false) if len(list) != 1 { t.Fatal("expected 1 element") } @@ -559,7 +559,7 @@ func TestAPIMetricsEndpoint(t *testing.T) { defer rest.Shutdown(ctx) tf := func(t *testing.T, url test.URLFunc) { - var resp []*api.Metric + var resp []api.Metric test.MakeGet(t, rest, url(rest)+"/monitor/metrics/somemetricstype", &resp) if len(resp) == 0 { t.Fatal("No metrics found") @@ -804,7 +804,7 @@ func TestAPIIPFSGCEndpoint(t *testing.T) { rest := testAPI(t) defer rest.Shutdown(ctx) - testGlobalRepoGC := func(t *testing.T, gRepoGC *api.GlobalRepoGC) { + testGlobalRepoGC := func(t *testing.T, gRepoGC api.GlobalRepoGC) { if gRepoGC.PeerMap == nil { t.Fatal("expected a non-nil peer map") } @@ -836,11 +836,11 @@ func TestAPIIPFSGCEndpoint(t *testing.T) { tf := func(t *testing.T, url test.URLFunc) { var resp api.GlobalRepoGC test.MakePost(t, rest, url(rest)+"/ipfs/gc?local=true", []byte{}, &resp) - testGlobalRepoGC(t, &resp) + testGlobalRepoGC(t, resp) var resp1 api.GlobalRepoGC test.MakePost(t, rest, url(rest)+"/ipfs/gc", []byte{}, &resp1) - testGlobalRepoGC(t, &resp1) + testGlobalRepoGC(t, resp1) } test.BothEndpoints(t, tf) diff --git a/cluster.go b/cluster.go index 0b335fdb..918ea489 100644 --- a/cluster.go +++ b/cluster.go @@ -1747,7 +1747,7 @@ func (c *Cluster) Version() string { } // Peers returns the IDs of the members of this Cluster. -func (c *Cluster) Peers(ctx context.Context) []api.ID { +func (c *Cluster) Peers(ctx context.Context, out chan<- api.ID) { _, span := trace.StartSpan(ctx, "cluster/Peers") defer span.End() ctx = trace.NewContext(c.ctx, span) @@ -1756,49 +1756,70 @@ func (c *Cluster) Peers(ctx context.Context) []api.ID { if err != nil { logger.Error(err) logger.Error("an empty list of peers will be returned") - return []api.ID{} + close(out) + return } - return c.peersWithFilter(ctx, peers) + c.peersWithFilter(ctx, peers, out) } // requests IDs from a given number of peers. -func (c *Cluster) peersWithFilter(ctx context.Context, peers []peer.ID) []api.ID { - lenPeers := len(peers) - ids := make([]api.ID, lenPeers) +func (c *Cluster) peersWithFilter(ctx context.Context, peers []peer.ID, out chan<- api.ID) { + defer close(out) // We should be done relatively quickly with this call. Otherwise // report errors. timeout := 15 * time.Second - ctxs, cancels := rpcutil.CtxsWithTimeout(ctx, lenPeers, timeout) - defer rpcutil.MultiCancel(cancels) + ctxCall, cancel := context.WithTimeout(ctx, timeout) + defer cancel() - errs := c.rpcClient.MultiCall( - ctxs, - peers, - "Cluster", - "ID", - struct{}{}, - rpcutil.CopyIDsToIfaces(ids), - ) + in := make(chan struct{}) + close(in) + idsOut := make(chan api.ID, len(peers)) + errCh := make(chan []error, 1) - finalPeers := []api.ID{} + go func() { + defer close(errCh) + errCh <- c.rpcClient.MultiStream( + ctxCall, + peers, + "Cluster", + "IDStream", + in, + idsOut, + ) + }() + + // Unfortunately, we need to use idsOut as intermediary channel + // because it is closed when MultiStream ends and we cannot keep + // adding things on it (the errors below). + for id := range idsOut { + select { + case <-ctx.Done(): + logger.Errorf("Peers call aborted: %s", ctx.Err()) + return + case out <- id: + } + } + + // ErrCh will always be closed on context cancellation too. + errs := <-errCh for i, err := range errs { if err == nil { - finalPeers = append(finalPeers, ids[i]) - _ = finalPeers // staticcheck continue } - if rpc.IsAuthorizationError(err) { continue } - ids[i] = api.ID{} - ids[i].ID = peers[i] - ids[i].Error = err.Error() + select { + case <-ctx.Done(): + logger.Errorf("Peers call aborted: %s", ctx.Err()) + case out <- api.ID{ + ID: peers[i], + Error: err.Error(), + }: + } } - - return ids } // getTrustedPeers gives listed of trusted peers except the current peer and @@ -2001,8 +2022,6 @@ func (c *Cluster) globalPinInfoStream(ctx context.Context, comp, method string, } } - msOut := make(chan api.PinInfo) - // We don't have a good timeout proposal for this. Depending on the // size of the state and the peformance of IPFS and the network, this // may take moderately long. @@ -2010,6 +2029,7 @@ func (c *Cluster) globalPinInfoStream(ctx context.Context, comp, method string, ctx, cancel := context.WithCancel(ctx) defer cancel() + msOut := make(chan api.PinInfo) errsCh := make(chan []error, 1) go func() { defer close(errsCh) diff --git a/cluster_test.go b/cluster_test.go index 3513aeba..b90a2ac4 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -905,8 +905,10 @@ func TestClusterPeers(t *testing.T) { cl, _, _, _ := testingCluster(t) defer cleanState() defer cl.Shutdown(ctx) - peers := cl.Peers(ctx) - if len(peers) != 1 { + + out := make(chan api.ID, 10) + cl.Peers(ctx, out) + if len(out) != 1 { t.Fatal("expected 1 peer") } @@ -916,7 +918,8 @@ func TestClusterPeers(t *testing.T) { t.Fatal(err) } - if peers[0].ID != ident.ID { + p := <-out + if p.ID != ident.ID { t.Error("bad member") } } diff --git a/cmd/ipfs-cluster-ctl/formatters.go b/cmd/ipfs-cluster-ctl/formatters.go index 33309549..1573990b 100644 --- a/cmd/ipfs-cluster-ctl/formatters.go +++ b/cmd/ipfs-cluster-ctl/formatters.go @@ -54,6 +54,10 @@ func jsonFormatPrint(obj interface{}) { for o := range r { print(o) } + case chan api.ID: + for o := range r { + print(o) + } default: print(obj) } @@ -84,8 +88,8 @@ func textFormatObject(resp interface{}) { textFormatPrintMetric(r) case api.Alert: textFormatPrintAlert(r) - case []api.ID: - for _, item := range r { + case chan api.ID: + for item := range r { textFormatObject(item) } case chan api.GlobalPinInfo: diff --git a/cmd/ipfs-cluster-ctl/main.go b/cmd/ipfs-cluster-ctl/main.go index 51ac80fb..b24989bb 100644 --- a/cmd/ipfs-cluster-ctl/main.go +++ b/cmd/ipfs-cluster-ctl/main.go @@ -251,8 +251,15 @@ This command provides a list of the ID information of all the peers in the Clust Flags: []cli.Flag{}, ArgsUsage: " ", Action: func(c *cli.Context) error { - resp, cerr := globalClient.Peers(ctx) - formatResponse(c, resp, cerr) + out := make(chan api.ID, 1024) + errCh := make(chan error, 1) + go func() { + defer close(errCh) + errCh <- globalClient.Peers(ctx, out) + }() + formatResponse(c, out, nil) + err := <-errCh + formatResponse(c, nil, err) return nil }, }, diff --git a/connect_graph.go b/connect_graph.go index d0f27f6a..ead6df4d 100644 --- a/connect_graph.go +++ b/connect_graph.go @@ -2,7 +2,6 @@ package ipfscluster import ( "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/rpcutil" peer "github.com/libp2p/go-libp2p-core/peer" @@ -34,18 +33,32 @@ func (c *Cluster) ConnectGraph() (api.ConnectGraph, error) { } peers := make([][]api.ID, len(members)) + errs := make([]error, len(members)) - ctxs, cancels := rpcutil.CtxsWithCancel(ctx, len(members)) - defer rpcutil.MultiCancel(cancels) + for i, member := range members { + in := make(chan struct{}) + close(in) + out := make(chan api.ID, 1024) + errCh := make(chan error, 1) + go func(i int) { + defer close(errCh) - errs := c.rpcClient.MultiCall( - ctxs, - members, - "Cluster", - "Peers", - struct{}{}, - rpcutil.CopyIDSliceToIfaces(peers), - ) + errCh <- c.rpcClient.Stream( + ctx, + member, + "Cluster", + "Peers", + in, + out, + ) + }(i) + var ids []api.ID + for id := range out { + ids = append(ids, id) + } + peers[i] = ids + errs[i] = <-errCh + } for i, err := range errs { p := peer.Encode(members[i]) diff --git a/go.mod b/go.mod index 410fbaa1..ad14971a 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/libp2p/go-libp2p-connmgr v0.3.1 github.com/libp2p/go-libp2p-consensus v0.0.1 github.com/libp2p/go-libp2p-core v0.13.0 - github.com/libp2p/go-libp2p-gorpc v0.3.1 + github.com/libp2p/go-libp2p-gorpc v0.3.2 github.com/libp2p/go-libp2p-gostream v0.3.1 github.com/libp2p/go-libp2p-http v0.2.1 github.com/libp2p/go-libp2p-kad-dht v0.15.0 diff --git a/go.sum b/go.sum index ae506355..3c93922f 100644 --- a/go.sum +++ b/go.sum @@ -780,8 +780,8 @@ github.com/libp2p/go-libp2p-discovery v0.5.0/go.mod h1:+srtPIU9gDaBNu//UHvcdliKB github.com/libp2p/go-libp2p-discovery v0.6.0 h1:1XdPmhMJr8Tmj/yUfkJMIi8mgwWrLUsCB3bMxdT+DSo= github.com/libp2p/go-libp2p-discovery v0.6.0/go.mod h1:/u1voHt0tKIe5oIA1RHBKQLVCWPna2dXmPNHc2zR9S8= github.com/libp2p/go-libp2p-gorpc v0.1.0/go.mod h1:DrswTLnu7qjLgbqe4fekX4ISoPiHUqtA45thTsJdE1w= -github.com/libp2p/go-libp2p-gorpc v0.3.1 h1:ZmqQIgHccgh/Ff1kS3ZlwATZRLvtuRUd633/MLWAx20= -github.com/libp2p/go-libp2p-gorpc v0.3.1/go.mod h1:sRz9ybP9rlOkJB1v65SMLr+NUEPB/ioLZn26MWIV4DU= +github.com/libp2p/go-libp2p-gorpc v0.3.2 h1:pQdGWqB+HImCXbKVbjqpgckUzGcXEPIYP8aisaYfkrs= +github.com/libp2p/go-libp2p-gorpc v0.3.2/go.mod h1:sRz9ybP9rlOkJB1v65SMLr+NUEPB/ioLZn26MWIV4DU= github.com/libp2p/go-libp2p-gostream v0.3.0/go.mod h1:pLBQu8db7vBMNINGsAwLL/ZCE8wng5V1FThoaE5rNjc= github.com/libp2p/go-libp2p-gostream v0.3.1 h1:XlwohsPn6uopGluEWs1Csv1QCEjrTXf2ZQagzZ5paAg= github.com/libp2p/go-libp2p-gostream v0.3.1/go.mod h1:1V3b+u4Zhaq407UUY9JLCpboaeufAeVQbnvAt12LRsI= diff --git a/ipfscluster_test.go b/ipfscluster_test.go index 7a26d1da..b932eb35 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -1,9 +1,7 @@ package ipfscluster import ( - "bytes" "context" - "encoding/json" "errors" "flag" "fmt" @@ -606,9 +604,11 @@ func TestClustersPeers(t *testing.T) { delay() j := rand.Intn(nClusters) // choose a random cluster peer - peers := clusters[j].Peers(ctx) - if len(peers) != nClusters { + out := make(chan api.ID, len(clusters)) + clusters[j].Peers(ctx, out) + + if len(out) != nClusters { t.Fatal("expected as many peers as clusters") } @@ -620,7 +620,7 @@ func TestClustersPeers(t *testing.T) { clusterIDMap[id.ID] = id } - for _, p := range peers { + for p := range out { if p.Error != "" { t.Error(p.ID, p.Error) continue @@ -642,34 +642,6 @@ func TestClustersPeers(t *testing.T) { } } -func TestClustersPeersRetainOrder(t *testing.T) { - ctx := context.Background() - clusters, mock := createClusters(t) - defer shutdownClusters(t, clusters, mock) - - delay() - - for i := 0; i < 5; i++ { - j := rand.Intn(nClusters) // choose a random cluster peer - peers1, err := json.Marshal(clusters[j].Peers(ctx)) - if err != nil { - t.Fatal(err) - } - - waitForLeaderAndMetrics(t, clusters) - - k := rand.Intn(nClusters) - peers2, err := json.Marshal(clusters[k].Peers(ctx)) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(peers1, peers2) { - t.Error("expected both results to be same") - } - } -} - func TestClustersPin(t *testing.T) { ctx := context.Background() clusters, mock := createClusters(t) diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 4e0fb5b8..1c87121a 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -685,21 +685,25 @@ func (ipfs *Connector) ConnectSwarms(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout) defer cancel() - var ids []api.ID - err := ipfs.rpcClient.CallContext( - ctx, - "", - "Cluster", - "Peers", - struct{}{}, - &ids, - ) - if err != nil { - logger.Error(err) - return err - } - for _, id := range ids { + in := make(chan struct{}) + close(in) + out := make(chan api.ID) + go func() { + err := ipfs.rpcClient.Stream( + ctx, + "", + "Cluster", + "Peers", + in, + out, + ) + if err != nil { + logger.Error(err) + } + }() + + for id := range out { ipfsID := id.IPFS if id.Error != "" || ipfsID.Error != "" { continue diff --git a/peer_manager_test.go b/peer_manager_test.go index 62329bf3..b000f206 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -18,6 +18,19 @@ import ( ma "github.com/multiformats/go-multiaddr" ) +func peers(ctx context.Context, t *testing.T, c *Cluster) []api.ID { + t.Helper() + out := make(chan api.ID) + go func() { + c.Peers(ctx, out) + }() + var ids []api.ID + for id := range out { + ids = append(ids, id) + } + return ids +} + func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock, host.Host) { cls := make([]*Cluster, nClusters) mocks := make([]*test.IpfsMock, nClusters) @@ -105,7 +118,7 @@ func TestClustersPeerAdd(t *testing.T) { ttlDelay() f := func(t *testing.T, c *Cluster) { - ids := c.Peers(ctx) + ids := peers(ctx, t, c) // check they are tracked by the peer manager if len(ids) != nClusters { @@ -180,7 +193,7 @@ func TestClustersJoinBadPeer(t *testing.T) { if err == nil { t.Error("expected an error") } - ids := clusters[0].Peers(ctx) + ids := peers(ctx, t, clusters[0]) if len(ids) != 1 { t.Error("cluster should have only one member") } @@ -198,7 +211,7 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) { clusters[0].PeerAdd(ctx, clusters[1].id) ttlDelay() - ids := clusters[1].Peers(ctx) + ids := peers(ctx, t, clusters[1]) // raft will have only 2 peers // crdt will have all peers autodiscovered by now if len(ids) < 2 { @@ -222,7 +235,7 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) { t.Error("expected an error") } - ids = clusters[0].Peers(ctx) + ids = peers(ctx, t, clusters[0]) if len(ids) != 2 { t.Error("cluster should still have 2 peers") } @@ -237,7 +250,7 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) { } ttlDelay() - ids = clusters[0].Peers(ctx) + ids = peers(ctx, t, clusters[0]) if len(ids) < 2 { t.Error("cluster should have at least 2 peers after removing and adding 1") } @@ -275,7 +288,7 @@ func TestClustersPeerRemove(t *testing.T) { t.Error("removed peer should have exited") } } else { - ids := c.Peers(ctx) + ids := peers(ctx, t, c) if len(ids) != nClusters-1 { t.Error("should have removed 1 peer") } @@ -302,7 +315,7 @@ func TestClustersPeerRemoveSelf(t *testing.T) { case "raft": for i := 0; i < len(clusters); i++ { waitForLeaderAndMetrics(t, clusters) - peers := clusters[i].Peers(ctx) + peers := peers(ctx, t, clusters[i]) t.Logf("Current cluster size: %d", len(peers)) if len(peers) != (len(clusters) - i) { t.Fatal("Previous peers not removed correctly") @@ -363,7 +376,7 @@ func TestClustersPeerRemoveLeader(t *testing.T) { for i := 0; i < len(clusters); i++ { leader := findLeader(t) - peers := leader.Peers(ctx) + peers := peers(ctx, t, leader) t.Logf("Current cluster size: %d", len(peers)) if len(peers) != (len(clusters) - i) { t.Fatal("Previous peers not removed correctly") @@ -528,7 +541,7 @@ func TestClustersPeerJoin(t *testing.T) { } f := func(t *testing.T, c *Cluster) { - peers := c.Peers(ctx) + peers := peers(ctx, t, c) str := c.id.String() + "\n" for _, p := range peers { str += " - " + p.ID.String() + "\n" @@ -571,7 +584,7 @@ func TestClustersPeerJoinAllAtOnce(t *testing.T) { ttlDelay() f2 := func(t *testing.T, c *Cluster) { - peers := c.Peers(ctx) + peers := peers(ctx, t, c) if len(peers) != nClusters { t.Error("all peers should be connected") } diff --git a/pnet_test.go b/pnet_test.go index 449eb02a..d08c5812 100644 --- a/pnet_test.go +++ b/pnet_test.go @@ -51,10 +51,10 @@ func TestSimplePNet(t *testing.T) { } ttlDelay() - if len(clusters[0].Peers(ctx)) != len(clusters[1].Peers(ctx)) { + if len(peers(ctx, t, clusters[0])) != len(peers(ctx, t, clusters[1])) { t.Fatal("Expected same number of peers") } - if len(clusters[0].Peers(ctx)) < 2 { + if len(peers(ctx, t, clusters[0])) < 2 { // crdt mode has auto discovered all peers at this point. // Raft mode has 2 peers only. t.Fatal("Expected at least 2 peers") diff --git a/rpc_api.go b/rpc_api.go index c1b9c67a..479a4bc2 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -160,6 +160,18 @@ func (rpcapi *ClusterRPCAPI) ID(ctx context.Context, in struct{}, out *api.ID) e return nil } +// IDStream runs Cluster.ID() but in streaming form. +func (rpcapi *ClusterRPCAPI) IDStream(ctx context.Context, in <-chan struct{}, out chan<- api.ID) error { + defer close(out) + id := rpcapi.c.ID(ctx) + select { + case <-ctx.Done(): + return ctx.Err() + case out <- id: + } + return nil +} + // Pin runs Cluster.pin(). func (rpcapi *ClusterRPCAPI) Pin(ctx context.Context, in api.Pin, out *api.Pin) error { // we do not call the Pin method directly since that method does not @@ -227,14 +239,15 @@ func (rpcapi *ClusterRPCAPI) Version(ctx context.Context, in struct{}, out *api. } // Peers runs Cluster.Peers(). -func (rpcapi *ClusterRPCAPI) Peers(ctx context.Context, in struct{}, out *[]api.ID) error { - *out = rpcapi.c.Peers(ctx) +func (rpcapi *ClusterRPCAPI) Peers(ctx context.Context, in <-chan struct{}, out chan<- api.ID) error { + rpcapi.c.Peers(ctx, out) return nil } // PeersWithFilter runs Cluster.peersWithFilter(). -func (rpcapi *ClusterRPCAPI) PeersWithFilter(ctx context.Context, in []peer.ID, out *[]api.ID) error { - *out = rpcapi.c.peersWithFilter(ctx, in) +func (rpcapi *ClusterRPCAPI) PeersWithFilter(ctx context.Context, in <-chan []peer.ID, out chan<- api.ID) error { + peers := <-in + rpcapi.c.peersWithFilter(ctx, peers, out) return nil } diff --git a/rpc_policy.go b/rpc_policy.go index e06cd9b1..1d7eadf8 100644 --- a/rpc_policy.go +++ b/rpc_policy.go @@ -11,6 +11,7 @@ var DefaultRPCPolicy = map[string]RPCEndpointType{ "Cluster.BlockAllocate": RPCClosed, "Cluster.ConnectGraph": RPCClosed, "Cluster.ID": RPCOpen, + "Cluster.IDStream": RPCOpen, "Cluster.IPFSID": RPCClosed, "Cluster.Join": RPCClosed, "Cluster.PeerAdd": RPCOpen, // Used by Join() diff --git a/sharness/t0025-ctl-status-report-commands.sh b/sharness/t0025-ctl-status-report-commands.sh index 4b2b978b..e5b6f352 100755 --- a/sharness/t0025-ctl-status-report-commands.sh +++ b/sharness/t0025-ctl-status-report-commands.sh @@ -12,8 +12,8 @@ test_expect_success IPFS,CLUSTER,JQ "cluster-ctl can read id" ' [ -n "$id" ] && ( ipfs-cluster-ctl id | egrep -q "$id" ) ' -test_expect_success IPFS,CLUSTER "cluster-ctl list 0 peers" ' - peer_length=`ipfs-cluster-ctl --enc=json peers ls | jq ". | length"` +test_expect_success IPFS,CLUSTER "cluster-ctl list 1 peer" ' + peer_length=`ipfs-cluster-ctl --enc=json peers ls | jq -n "[inputs] | length"` [ $peer_length -eq 1 ] ' diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index 8d358c56..0acd7c0c 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -178,6 +178,18 @@ func (mock *mockCluster) ID(ctx context.Context, in struct{}, out *api.ID) error return nil } +func (mock *mockCluster) IDStream(ctx context.Context, in <-chan struct{}, out chan<- api.ID) error { + defer close(out) + var id api.ID + mock.ID(ctx, struct{}{}, &id) + select { + case <-ctx.Done(): + return ctx.Err() + case out <- id: + } + return nil +} + func (mock *mockCluster) Version(ctx context.Context, in struct{}, out *api.Version) error { *out = api.Version{ Version: "0.0.mock", @@ -185,16 +197,18 @@ func (mock *mockCluster) Version(ctx context.Context, in struct{}, out *api.Vers return nil } -func (mock *mockCluster) Peers(ctx context.Context, in struct{}, out *[]api.ID) error { +func (mock *mockCluster) Peers(ctx context.Context, in <-chan struct{}, out chan<- api.ID) error { id := api.ID{} - mock.ID(ctx, in, &id) - - *out = []api.ID{id} + mock.ID(ctx, struct{}{}, &id) + out <- id + close(out) return nil } -func (mock *mockCluster) PeersWithFilter(ctx context.Context, in []peer.ID, out *[]api.ID) error { - return mock.Peers(ctx, struct{}{}, out) +func (mock *mockCluster) PeersWithFilter(ctx context.Context, in <-chan []peer.ID, out chan<- api.ID) error { + inCh := make(chan struct{}) + close(inCh) + return mock.Peers(ctx, inCh, out) } func (mock *mockCluster) PeerAdd(ctx context.Context, in peer.ID, out *api.ID) error {