Fix #543: Use only healthy peers when adding everywhere (+tests)

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
Hector Sanjuan 2018-09-26 13:42:20 +02:00
parent 638128d6a0
commit e7b1eacf83
3 changed files with 107 additions and 5 deletions
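In short: when a pin's ReplicationFactorMin is negative ("pin everywhere"), BlockAllocate previously returned the full consensus peer set, so adds were also directed at peers that were down. It now builds the peer list from the latest valid ping metrics, so blocks are only sent to peers currently known to be healthy. The new add_test.go exercises both the normal case and the case where one peer has been shut down.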

add_test.go (new file, 96 additions)

@@ -0,0 +1,96 @@
package ipfscluster

// This file has tests for Add* using multiple cluster peers.

import (
    "mime/multipart"
    "testing"

    "github.com/ipfs/ipfs-cluster/api"
    "github.com/ipfs/ipfs-cluster/test"
)

func TestAdd(t *testing.T) {
    clusters, mock := createClusters(t)
    defer shutdownClusters(t, clusters, mock)
    sth := test.NewShardingTestHelper()
    defer sth.Clean(t)

    t.Run("local", func(t *testing.T) {
        params := api.DefaultAddParams()
        params.Shard = false
        params.Name = "testlocal"
        mfr, closer := sth.GetTreeMultiReader(t)
        defer closer.Close()
        r := multipart.NewReader(mfr, mfr.Boundary())

        ci, err := clusters[0].AddFile(r, params)
        if err != nil {
            t.Fatal(err)
        }
        if ci.String() != test.ShardingDirBalancedRootCID {
            t.Fatal("unexpected root CID for local add")
        }

        pinDelay()

        f := func(t *testing.T, c *Cluster) {
            pin := c.StatusLocal(ci)
            if pin.Error != "" {
                t.Error(pin.Error)
            }
            if pin.Status != api.TrackerStatusPinned {
                t.Error("item should be pinned")
            }
        }

        runF(t, clusters, f)
    })
}

func TestAddPeerDown(t *testing.T) {
    clusters, mock := createClusters(t)
    defer shutdownClusters(t, clusters, mock)
    sth := test.NewShardingTestHelper()
    defer sth.Clean(t)
    err := clusters[0].Shutdown()
    if err != nil {
        t.Fatal(err)
    }

    waitForLeaderAndMetrics(t, clusters)

    t.Run("local", func(t *testing.T) {
        params := api.DefaultAddParams()
        params.Shard = false
        params.Name = "testlocal"
        mfr, closer := sth.GetTreeMultiReader(t)
        defer closer.Close()
        r := multipart.NewReader(mfr, mfr.Boundary())

        ci, err := clusters[1].AddFile(r, params)
        if err != nil {
            t.Fatal(err)
        }
        if ci.String() != test.ShardingDirBalancedRootCID {
            t.Fatal("unexpected root CID for local add")
        }

        pinDelay()

        f := func(t *testing.T, c *Cluster) {
            if c.id == clusters[0].id {
                return
            }
            pin := c.StatusLocal(ci)
            if pin.Error != "" {
                t.Error(pin.Error)
            }
            if pin.Status != api.TrackerStatusPinned {
                t.Error("item should be pinned")
            }
        }

        runF(t, clusters, f)
    })
}
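The checks above fan out through runF, an existing helper in the ipfscluster test suite. For readers unfamiliar with it, here is a minimal, self-contained sketch of that pattern (the types and helper name are stand-ins, not the real API): run the same assertion against every peer concurrently and fail the test if any peer fails.

package ipfscluster_test

import (
    "sync"
    "testing"
)

// fakePeer is a stand-in for *Cluster; it only carries the state the
// assertion below inspects.
type fakePeer struct{ pinned bool }

// runAll applies f to every peer concurrently and waits for all checks
// to finish, mirroring the fan-out shape of runF in these tests.
func runAll(t *testing.T, peers []*fakePeer, f func(*testing.T, *fakePeer)) {
    var wg sync.WaitGroup
    for _, p := range peers {
        wg.Add(1)
        go func(p *fakePeer) {
            defer wg.Done()
            f(t, p) // t.Error is safe to call from multiple goroutines
        }(p)
    }
    wg.Wait()
}

func TestFanOutSketch(t *testing.T) {
    peers := []*fakePeer{{pinned: true}, {pinned: true}}
    runAll(t, peers, func(t *testing.T, p *fakePeer) {
        if !p.pinned {
            t.Error("item should be pinned")
        }
    })
}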

cluster.go

@@ -31,6 +31,8 @@ import (
 // consensus layer.
 var ReadyTimeout = 30 * time.Second
 
+var pingMetricName = "ping"
+
 // Cluster is the main IPFS cluster component. It provides
 // the go-API for it and orchestrates the components that make up the system.
 type Cluster struct {
@@ -252,7 +254,7 @@ func (c *Cluster) pushPingMetrics() {
 	ticker := time.NewTicker(c.config.MonitorPingInterval)
 	for {
 		metric := api.Metric{
-			Name:  "ping",
+			Name:  pingMetricName,
 			Peer:  c.id,
 			Valid: true,
 		}
@@ -279,7 +281,7 @@ func (c *Cluster) alertsHandler() {
 		if err == nil && leader == c.id {
 			logger.Warningf("Peer %s received alert for %s in %s", c.id, alrt.MetricName, alrt.Peer.Pretty())
 			switch alrt.MetricName {
-			case "ping":
+			case pingMetricName:
 				c.repinFromPeer(alrt.Peer)
 			}
 		}
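Extracting the "ping" literal into pingMetricName is what lets rpc_api.go below key off the very same metric that pushPingMetrics emits and alertsHandler matches on, instead of duplicating the string in three places.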

rpc_api.go

@@ -202,10 +202,14 @@ func (rpcapi *RPCAPI) BlockAllocate(ctx context.Context, in api.PinSerial, out *
 
 	// Return the current peer list.
 	if pin.ReplicationFactorMin < 0 {
-		peers, err := rpcapi.c.consensus.Peers()
-		if err != nil {
-			return err
-		}
+		// Returned metrics are Valid and belong to current
+		// Cluster peers.
+		metrics := rpcapi.c.monitor.LatestMetrics(pingMetricName)
+		peers := make([]peer.ID, len(metrics), len(metrics))
+
+		for i, m := range metrics {
+			peers[i] = m.Peer
+		}
 		*out = api.PeersToStrings(peers)
 		return nil
 	}
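Why this yields only healthy peers: the monitor's LatestMetrics returns only metrics that are still valid and unexpired, and ping metrics stop being refreshed once a peer goes down. A minimal, self-contained sketch of that contract, with simplified stand-in types (the real api.Metric and monitor live in ipfs-cluster; the names here are illustrative only):

package main

import (
    "fmt"
    "time"
)

// Metric is a simplified stand-in for api.Metric.
type Metric struct {
    Name   string
    Peer   string // stand-in for peer.ID
    Valid  bool
    Expire time.Time
}

// latestValid mirrors the contract the change relies on: only metrics
// that are Valid and not yet expired come back, so the resulting peer
// set excludes any peer that stopped pinging (e.g. one that shut down).
func latestValid(metrics []Metric, name string) []Metric {
    now := time.Now()
    var out []Metric
    for _, m := range metrics {
        if m.Name == name && m.Valid && m.Expire.After(now) {
            out = append(out, m)
        }
    }
    return out
}

func main() {
    now := time.Now()
    all := []Metric{
        {Name: "ping", Peer: "QmAlive", Valid: true, Expire: now.Add(time.Minute)},
        // Stale entry: this peer stopped pushing pings a while ago.
        {Name: "ping", Peer: "QmDown", Valid: true, Expire: now.Add(-time.Minute)},
    }
    peers := make([]string, 0, len(all))
    for _, m := range latestValid(all, "ping") {
        peers = append(peers, m.Peer)
    }
    fmt.Println(peers) // [QmAlive]: only healthy peers receive blocks
}

Run against the sample data, only QmAlive survives the filter, which is exactly the property TestAddPeerDown relies on.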