Hector Sanjuan 0d73d33ef5 Pintracker: streaming methods
This commit continues the work of taking advantage of the streaming
capabilities in go-libp2p-gorpc by improving the ipfsconnector and pintracker

StatusAll and RecoverAll methods are now streaming methods, with the REST API
output changing accordingly to produce a stream of GlobalPinInfos rather than
a json array.

pin/ls request to the ipfs daemon now use ?stream=true and avoid having to
load the full pinset map on memory. StatusAllLocal and RecoverAllLocal
requests to the pin tracker stream all the way and no longer store the full
pinset, and the full PinInfo status slice before sending it out.

We have additionally switched to a pattern where streaming methods receive the
channel as an argument, allowing the caller to decide on whether to launch a
goroutine, do buffering etc.
2022-03-22 15:38:01 +01:00

718 lines
16 KiB

package ipfscluster
import (
cid "github.com/ipfs/go-cid"
host "github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock, host.Host) {
cls := make([]*Cluster, nClusters)
mocks := make([]*test.IpfsMock, nClusters)
var wg sync.WaitGroup
for i := 0; i < nClusters; i++ {
go func(i int) {
defer wg.Done()
cl, m := createOnePeerCluster(t, i, testingClusterSecret)
cls[i] = cl
mocks[i] = m
// Creat an identity
ident, err := config.NewIdentity()
if err != nil {
// Create a config
cfg := &Config{}
listen, _ := ma.NewMultiaddr("/ip4/")
cfg.ListenAddr = []ma.Multiaddr{listen}
cfg.Secret = testingClusterSecret
h, _, _ := createHost(t, ident.PrivateKey, testingClusterSecret, cfg.ListenAddr)
// Connect host to all peers. This will allow that they can discover
// each others via DHT.
for i := 0; i < nClusters; i++ {
err := h.Connect(
ID: cls[i].host.ID(),
Addrs: cls[i].host.Addrs(),
if err != nil {
return cls, mocks, h
func clusterAddr(c *Cluster) ma.Multiaddr {
for _, a := range c.host.Addrs() {
if _, err := a.ValueForProtocol(ma.P_IP4); err == nil {
p := peer.Encode(c.id)
cAddr, _ := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", a, p))
return cAddr
return nil
func TestClustersPeerAdd(t *testing.T) {
ctx := context.Background()
clusters, mocks, boot := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
defer boot.Close()
if len(clusters) < 2 {
t.Skip("need at least 2 nodes for this test")
for i := 1; i < len(clusters); i++ {
id, err := clusters[0].PeerAdd(ctx, clusters[i].id)
if err != nil {
if !containsPeer(id.ClusterPeers, clusters[0].id) {
// ClusterPeers is originally empty and contains nodes as we add them
t.Log(i, id.ClusterPeers)
t.Fatal("cluster peers should be up to date with the cluster")
h := test.Cid1
_, err := clusters[1].Pin(ctx, h, api.PinOptions{})
if err != nil {
f := func(t *testing.T, c *Cluster) {
ids := c.Peers(ctx)
// check they are tracked by the peer manager
if len(ids) != nClusters {
t.Error("added clusters are not part of clusters")
// Check that they are part of the consensus
pins, err := c.pinsSlice(ctx)
if err != nil {
if len(pins) != 1 {
t.Error("expected 1 pin everywhere")
if len(c.ID(ctx).ClusterPeers) != nClusters {
t.Error("By now cluster peers should reflect all peers")
runF(t, clusters, f)
for _, c := range clusters {
f2 := func(t *testing.T, c *Cluster) {
// check that all peers are part of the peerstore
// (except ourselves)
addrs := c.peerManager.LoadPeerstore()
peerMap := make(map[peer.ID]struct{})
for _, a := range addrs {
pinfo, err := peer.AddrInfoFromP2pAddr(a)
if err != nil {
peerMap[pinfo.ID] = struct{}{}
if len(peerMap) == 0 {
t.Errorf("%s: peerstore to store at least 1 peer", c.id)
runF(t, clusters, f2)
func TestClustersJoinBadPeer(t *testing.T) {
ctx := context.Background()
clusters, mocks, boot := peerManagerClusters(t)
defer shutdownClusters(t, clusters[0:1], mocks[0:1])
defer boot.Close()
addr := clusterAddr(clusters[1])
if len(clusters) < 2 {
t.Skip("need at least 2 nodes for this test")
for _, c := range clusters[1:] {
// We add a cluster that has been shutdown
// (closed transports)
// Let the OS actually close the ports.
// Sometimes we hang otherwise.
err := clusters[0].Join(ctx, addr)
if err == nil {
t.Error("expected an error")
ids := clusters[0].Peers(ctx)
if len(ids) != 1 {
t.Error("cluster should have only one member")
func TestClustersPeerAddInUnhealthyCluster(t *testing.T) {
ctx := context.Background()
clusters, mocks, boot := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
defer boot.Close()
if len(clusters) < 3 {
t.Skip("need at least 3 nodes for this test")
clusters[0].PeerAdd(ctx, clusters[1].id)
ids := clusters[1].Peers(ctx)
// raft will have only 2 peers
// crdt will have all peers autodiscovered by now
if len(ids) < 2 {
t.Error("expected at least 2 peers")
// Now we shutdown the one member of the running cluster
// and try to add someone else.
err := clusters[1].Shutdown(ctx)
if err != nil {
t.Error("Shutdown should be clean: ", err)
switch consensus {
case "raft":
delay() // This makes sure the leader realizes that it's not
// leader anymore. Otherwise it commits fine.
_, err = clusters[0].PeerAdd(ctx, clusters[2].id)
if err == nil {
t.Error("expected an error")
ids = clusters[0].Peers(ctx)
if len(ids) != 2 {
t.Error("cluster should still have 2 peers")
case "crdt":
// crdt does not really care whether we add or remove
delay() // let metrics expire
_, err = clusters[0].PeerAdd(ctx, clusters[2].id)
if err != nil {
ids = clusters[0].Peers(ctx)
if len(ids) < 2 {
t.Error("cluster should have at least 2 peers after removing and adding 1")
t.Fatal("bad consensus")
func TestClustersPeerRemove(t *testing.T) {
ctx := context.Background()
clusters, mocks := createClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 2 {
t.Skip("test needs at least 2 clusters")
switch consensus {
case "crdt":
// Peer Rm is a no op.
case "raft":
p := clusters[1].ID(ctx).ID
err := clusters[0].PeerRemove(ctx, p)
if err != nil {
f := func(t *testing.T, c *Cluster) {
if c.ID(ctx).ID == p { //This is the removed cluster
_, ok := <-c.Done()
if ok {
t.Error("removed peer should have exited")
} else {
ids := c.Peers(ctx)
if len(ids) != nClusters-1 {
t.Error("should have removed 1 peer")
runF(t, clusters, f)
t.Fatal("bad consensus")
func TestClustersPeerRemoveSelf(t *testing.T) {
ctx := context.Background()
// this test hangs sometimes if there are problems
clusters, mocks := createClusters(t)
defer shutdownClusters(t, clusters, mocks)
switch consensus {
case "crdt":
// remove is a no op in CRDTs
case "raft":
for i := 0; i < len(clusters); i++ {
waitForLeaderAndMetrics(t, clusters)
peers := clusters[i].Peers(ctx)
t.Logf("Current cluster size: %d", len(peers))
if len(peers) != (len(clusters) - i) {
t.Fatal("Previous peers not removed correctly")
err := clusters[i].PeerRemove(ctx, clusters[i].ID(ctx).ID)
// Last peer member won't be able to remove itself
// In this case, we shut it down.
if err != nil {
if i != len(clusters)-1 { //not last
} else {
err := clusters[i].Shutdown(ctx)
if err != nil {
// potential hanging place
_, more := <-clusters[i].Done()
if more {
t.Error("should be done")
t.Fatal("bad consensus")
func TestClustersPeerRemoveLeader(t *testing.T) {
ctx := context.Background()
// this test is like the one above, except it always
// removes the current leader.
// this test hangs sometimes if there are problems
clusters, mocks := createClusters(t)
defer shutdownClusters(t, clusters, mocks)
switch consensus {
case "crdt":
case "raft":
findLeader := func(t *testing.T) *Cluster {
var l peer.ID
for _, c := range clusters {
if !c.shutdownB {
waitForLeaderAndMetrics(t, clusters)
l, _ = c.consensus.Leader(ctx)
for _, c := range clusters {
if c.id == l {
return c
t.Fatal("no leader found")
return nil
for i := 0; i < len(clusters); i++ {
leader := findLeader(t)
peers := leader.Peers(ctx)
t.Logf("Current cluster size: %d", len(peers))
if len(peers) != (len(clusters) - i) {
t.Fatal("Previous peers not removed correctly")
err := leader.PeerRemove(ctx, leader.id)
// Last peer member won't be able to remove itself
// In this case, we shut it down.
if err != nil {
if i != len(clusters)-1 { //not last
} else {
err := leader.Shutdown(ctx)
if err != nil {
_, more := <-leader.Done()
if more {
t.Error("should be done")
time.Sleep(time.Second / 2)
t.Fatal("bad consensus")
func TestClustersPeerRemoveReallocsPins(t *testing.T) {
// This test is testing that the peers are vacated upon
// removal.
ctx := context.Background()
clusters, mocks := createClusters(t)
if len(clusters) < 3 {
t.Skip("test needs at least 3 clusters")
// Adjust the replication factor for re-allocation
for _, c := range clusters {
c.config.ReplicationFactorMin = nClusters - 1
c.config.ReplicationFactorMax = nClusters - 1
// We choose to remove the leader, to make things even more interesting
chosenID, err := clusters[0].consensus.Leader(ctx)
if err != nil {
// choose a random peer - crdt
i := rand.Intn(nClusters)
chosenID = clusters[i].host.ID()
var chosen *Cluster
var chosenIndex int
for i, cl := range clusters {
if id := cl.ID(ctx).ID; id == chosenID {
chosen = cl
chosenIndex = i
if chosen == nil {
shutdownClusters(t, clusters, mocks)
t.Fatal("did not get to choose a peer?")
chosenMock := mocks[chosenIndex]
// Remove the chosen peer from set
clusters = append(clusters[:chosenIndex], clusters[chosenIndex+1:]...)
mocks = append(mocks[:chosenIndex], mocks[chosenIndex+1:]...)
defer chosen.Shutdown(ctx)
defer chosenMock.Close()
defer shutdownClusters(t, clusters, mocks)
prefix := test.Cid1.Prefix()
// Pin nCluster random pins. This ensures each peer will
// pin the same number of Cids.
for i := 0; i < nClusters; i++ {
h, err := prefix.Sum(randomBytes())
if err != nil {
_, err = chosen.Pin(ctx, h, api.PinOptions{})
if err != nil {
// At this point, all peers must have nClusters -1 pins
// associated to them.
// Find out which pins are associated to the chosen peer.
interestingCids := []cid.Cid{}
pins, err := chosen.pinsSlice(ctx)
if err != nil {
if len(pins) != nClusters {
t.Fatal("expected number of tracked pins to be nClusters")
for _, p := range pins {
if containsPeer(p.Allocations, chosenID) {
//t.Logf("%s pins %s", chosenID, p.Cid)
interestingCids = append(interestingCids, p.Cid)
if len(interestingCids) != nClusters-1 {
t.Fatalf("Expected %d allocated CIDs but got %d", nClusters-1,
// Now the chosen removes itself. Ignoring errors as they will
// be caught below and crdt does error here.
chosen.PeerRemove(ctx, chosenID)
waitForLeaderAndMetrics(t, clusters)
delay() // this seems to fail when not waiting enough...
for _, icid := range interestingCids {
// Now check that the allocations are new.
newPin, err := clusters[1].PinGet(ctx, icid)
if err != nil {
t.Fatal("error getting the new allocations for", icid)
if containsPeer(newPin.Allocations, chosenID) {
t.Fatal("pin should not be allocated to the removed peer")
func TestClustersPeerJoin(t *testing.T) {
ctx := context.Background()
clusters, mocks, boot := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
defer boot.Close()
if len(clusters) < 3 {
t.Skip("test needs at least 3 clusters")
for i := 1; i < len(clusters); i++ {
err := clusters[i].Join(ctx, clusterAddr(clusters[0]))
if err != nil {
h := test.Cid1
clusters[0].Pin(ctx, h, api.PinOptions{})
for _, p := range clusters {
f := func(t *testing.T, c *Cluster) {
peers := c.Peers(ctx)
str := c.id.String() + "\n"
for _, p := range peers {
str += " - " + p.ID.String() + "\n"
if len(peers) != nClusters {
t.Error("all peers should be connected")
pins, err := c.pinsSlice(ctx)
if err != nil {
if len(pins) != 1 || !pins[0].Cid.Equals(h) {
t.Error("all peers should have pinned the cid")
runF(t, clusters, f)
func TestClustersPeerJoinAllAtOnce(t *testing.T) {
ctx := context.Background()
clusters, mocks, boot := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
defer boot.Close()
if len(clusters) < 2 {
t.Skip("test needs at least 2 clusters")
f := func(t *testing.T, c *Cluster) {
err := c.Join(ctx, clusterAddr(clusters[0]))
if err != nil {
runF(t, clusters[1:], f)
h := test.Cid1
clusters[0].Pin(ctx, h, api.PinOptions{})
f2 := func(t *testing.T, c *Cluster) {
peers := c.Peers(ctx)
if len(peers) != nClusters {
t.Error("all peers should be connected")
pins, err := c.pinsSlice(ctx)
if err != nil {
if len(pins) != 1 || !pins[0].Cid.Equals(h) {
t.Error("all peers should have pinned the cid")
runF(t, clusters, f2)
// This test fails a lot when re-use port is not available (MacOS, Windows)
// func TestClustersPeerJoinAllAtOnceWithRandomBootstrap(t *testing.T) {
// clusters, mocks,boot := peerManagerClusters(t)
// defer shutdownClusters(t, clusters, mocks)
// defer boot.Close()
// if len(clusters) < 3 {
// t.Skip("test needs at least 3 clusters")
// }
// delay()
// // We have a 2 node cluster and the rest of nodes join
// // one of the two seeds randomly
// err := clusters[1].Join(clusterAddr(clusters[0]))
// if err != nil {
// t.Fatal(err)
// }
// f := func(t *testing.T, c *Cluster) {
// j := rand.Intn(2)
// err := c.Join(clusterAddr(clusters[j]))
// if err != nil {
// t.Fatal(err)
// }
// }
// runF(t, clusters[2:], f)
// hash := test.Cid1
// clusters[0].Pin(api.PinCid(hash))
// delay()
// f2 := func(t *testing.T, c *Cluster) {
// peers := c.Peers()
// if len(peers) != nClusters {
// peersIds := []peer.ID{}
// for _, p := range peers {
// peersIds = append(peersIds, p.ID)
// }
// t.Errorf("%s sees %d peers: %s", c.id, len(peers), peersIds)
// }
// pins := c.Pins()
// if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
// t.Error("all peers should have pinned the cid")
// }
// }
// runF(t, clusters, f2)
// }
// Tests that a peer catches up on the state correctly after rejoining
func TestClustersPeerRejoin(t *testing.T) {
ctx := context.Background()
clusters, mocks, boot := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
defer boot.Close()
// pin something in c0
pin1 := test.Cid1
_, err := clusters[0].Pin(ctx, pin1, api.PinOptions{})
if err != nil {
// add all clusters
for i := 1; i < len(clusters); i++ {
err := clusters[i].Join(ctx, clusterAddr(clusters[0]))
if err != nil {
// all added peers should have the content
for i := 1; i < len(clusters); i++ {
pinfo := clusters[i].tracker.Status(ctx, pin1)
if pinfo.Status != api.TrackerStatusPinned {
t.Error("Added peers should pin the content")
clusters[0].config.LeaveOnShutdown = true
err = clusters[0].Shutdown(ctx)
if err != nil {
// Forget peer so we can re-add one in same address/port
f := func(t *testing.T, c *Cluster) {
c.peerManager.RmPeer(clusters[0].id) // errors ignore for crdts
runF(t, clusters[1:], f)
// Pin something on the rest
pin2 := test.Cid2
_, err = clusters[1].Pin(ctx, pin2, api.PinOptions{})
if err != nil {
// Rejoin c0
c0, m0 := createOnePeerCluster(t, 0, testingClusterSecret)
clusters[0] = c0
mocks[0] = m0
err = c0.Join(ctx, clusterAddr(clusters[1]))
if err != nil {
pinfo := clusters[0].tracker.Status(ctx, pin2)
if pinfo.Status != api.TrackerStatusPinned {
t.Error("re-joined cluster should have caught up")
pinfo = clusters[0].tracker.Status(ctx, pin1)
if pinfo.Status != api.TrackerStatusPinned {
t.Error("re-joined cluster should have original pin")