Hector Sanjuan 8f06baa1bf Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:

  * Each component is initialized with a configuration object
  defined by its module
  * Each component decides what the JSON representation of its
  configuration looks like
  * Each component parses and validates its own configuration
  * Each component exposes its own defaults
  * Component configurations make up the sections of a
  central JSON configuration file (which replaces the current
  JSON format)
  * Component configurations implement a common interface
  (config.ComponentConfig) with a set of common operations
  * The central configuration file is managed by a
  config.ConfigManager which:
    * Registers ComponentConfigs
    * Assigns the corresponding sections from the JSON file to each
    component and delegates the parsing
    * Delegates the JSON generation for each section
    * Can be notified when the configuration is updated and must be
    saved to disk

The new service.json would then look as follows:

{
  "cluster": {
    "id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
    "private_key": "<...>",
    "secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
    "peers": [],
    "bootstrap": [],
    "leave_on_shutdown": false,
    "listen_multiaddress": "/ip4/",
    "state_sync_interval": "1m0s",
    "ipfs_sync_interval": "2m10s",
    "replication_factor": -1,
    "monitor_ping_interval": "15s"
  },
  "consensus": {
    "raft": {
      "heartbeat_timeout": "1s",
      "election_timeout": "1s",
      "commit_timeout": "50ms",
      "max_append_entries": 64,
      "trailing_logs": 10240,
      "snapshot_interval": "2m0s",
      "snapshot_threshold": 8192,
      "leader_lease_timeout": "500ms"
    }
  },
  "api": {
    "restapi": {
      "listen_multiaddress": "/ip4/",
      "read_timeout": "30s",
      "read_header_timeout": "5s",
      "write_timeout": "1m0s",
      "idle_timeout": "2m0s"
    }
  },
  "ipfs_connector": {
    "ipfshttp": {
      "proxy_listen_multiaddress": "/ip4/",
      "node_multiaddress": "/ip4/",
      "connect_swarms_delay": "7s",
      "proxy_read_timeout": "10m0s",
      "proxy_read_header_timeout": "5s",
      "proxy_write_timeout": "10m0s",
      "proxy_idle_timeout": "1m0s"
    }
  },
  "monitor": {
    "monbasic": {
      "check_interval": "15s"
    }
  },
  "informer": {
    "disk": {
      "metric_ttl": "30s",
      "metric_type": "freespace"
    },
    "numpin": {
      "metric_ttl": "10s"
    }
  }
}

This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were previously hardcoded.

Additionally, since the Go APIs have changed, some redundant methods have been
removed and some small refactoring has been done to take advantage of the new
configuration format.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-18 00:00:12 +02:00

386 lines
8.8 KiB

package ipfscluster
import (
cid "github.com/ipfs/go-cid"
ma "github.com/multiformats/go-multiaddr"
// peerManagerClusters creates nClusters clusters for the peer-management
// tests, together with their IPFS mocks. Each cluster is built in its own
// goroutine by createOnePeerCluster using testingClusterSecret, and stored at
// its index in the returned slices.
//
// NOTE(review): no wg.Add/wg.Wait calls or closing braces are visible in this
// listing; lines appear to be missing here — confirm against the full file.
func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
cls := make([]*Cluster, nClusters, nClusters)
mocks := make([]*test.IpfsMock, nClusters, nClusters)
var wg sync.WaitGroup
for i := 0; i < nClusters; i++ {
// The goroutine takes the index as a parameter and only writes to
// cls[i] and mocks[i].
go func(i int) {
defer wg.Done()
cl, m := createOnePeerCluster(t, i, testingClusterSecret)
cls[i] = cl
mocks[i] = m
return cls, mocks
// clusterAddr returns the multiaddress obtained by joining c's configured
// listen address with c's peer ID (via multiaddrJoin). The tests pass the
// result to PeerAdd and Join.
func clusterAddr(c *Cluster) ma.Multiaddr {
return multiaddrJoin(c.config.ListenAddr, c.ID().ID)
// TestClustersPeerAdd adds every other cluster to clusters[0] one by one with
// PeerAdd, pins test.TestCid1 through clusters[1], and then checks on every
// cluster that Peers() reports nClusters peers, that exactly one pin is
// tracked, and that ID().ClusterPeers lists nClusters-1 peers.
//
// NOTE(review): the bodies of the `if err != nil` checks and the closing
// braces are not visible in this listing — confirm against the full file.
func TestClustersPeerAdd(t *testing.T) {
clusters, mocks := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 2 {
t.Skip("need at least 2 nodes for this test")
// Add clusters[1..] to clusters[0], checking the returned ID after each add.
for i := 1; i < len(clusters); i++ {
addr := clusterAddr(clusters[i])
id, err := clusters[0].PeerAdd(addr)
if err != nil {
if len(id.ClusterPeers) != i {
// ClusterPeers is originally empty and contains nodes as we add them
t.Fatal("cluster peers should be up to date with the cluster")
// Pin through a peer other than clusters[0]; the per-cluster check below
// expects the pin to show up on every cluster.
h, _ := cid.Decode(test.TestCid1)
err := clusters[1].Pin(api.PinCid(h))
if err != nil {
// f is the per-cluster verification handed to runF.
f := func(t *testing.T, c *Cluster) {
ids := c.Peers()
// check they are tracked by the peer manager
if len(ids) != nClusters {
t.Error("added clusters are not part of clusters")
// Check that they are part of the consensus
pins := c.Pins()
if len(pins) != 1 {
t.Error("expected 1 pin everywhere")
if len(c.ID().ClusterPeers) != nClusters-1 {
t.Error("By now cluster peers should reflect all peers")
// // check that its part of the configuration
// if len(c.config.ClusterPeers) != nClusters-1 {
// t.Error("expected different cluster peers in the configuration")
// }
// for _, peer := range c.config.ClusterPeers {
// if peer == nil {
// t.Error("something went wrong adding peer to config")
// }
// }
runF(t, clusters, f)
// TestClustersPeerAddBadPeer checks that PeerAdd on clusters[0] returns an
// error when given the address of clusters[1], and that clusters[0] still
// reports a single peer afterwards.
//
// NOTE(review): the comment below says clusters[1] has been shut down, but
// the shutdown call is not visible in this listing (nor are the closing
// braces) — confirm against the full file.
func TestClustersPeerAddBadPeer(t *testing.T) {
clusters, mocks := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 2 {
t.Skip("need at least 2 nodes for this test")
// We add a cluster that has been shutdown
// (closed transports)
_, err := clusters[0].PeerAdd(clusterAddr(clusters[1]))
if err == nil {
t.Error("expected an error")
// The failed add must not have changed the membership of clusters[0].
ids := clusters[0].Peers()
if len(ids) != 1 {
t.Error("cluster should have only one member")
// TestClustersPeerAddInUnhealthyCluster first adds clusters[1] to clusters[0]
// and expects clusters[1] to report 2 peers. It then expects a further
// PeerAdd of clusters[2] to fail and clusters[0] to keep reporting 2 peers.
//
// NOTE(review): the comment below says one member is shut down before the
// second PeerAdd, but that call is not visible in this listing; the result of
// the first PeerAdd is also not visibly checked — confirm against the full
// file.
func TestClustersPeerAddInUnhealthyCluster(t *testing.T) {
clusters, mocks := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 3 {
t.Skip("need at least 3 nodes for this test")
// Form a 2-peer cluster out of clusters[0] and clusters[1].
_, err := clusters[0].PeerAdd(clusterAddr(clusters[1]))
ids := clusters[1].Peers()
if len(ids) != 2 {
t.Error("expected 2 peers")
// Now we shutdown one member of the running cluster
// and try to add someone else.
_, err = clusters[0].PeerAdd(clusterAddr(clusters[2]))
if err == nil {
t.Error("expected an error")
// The rejected add must leave the peer set of clusters[0] untouched.
ids = clusters[0].Peers()
if len(ids) != 2 {
t.Error("cluster should still have 2 peers")
// TestClustersPeerRemove removes clusters[1] from the set created by
// createClusters by calling PeerRemove on clusters[0]. It then checks that
// the removed peer's Done() channel is closed and that every other cluster
// reports nClusters-1 peers.
//
// NOTE(review): the body of the `if err != nil` check and the closing braces
// are not visible in this listing — confirm against the full file.
func TestClustersPeerRemove(t *testing.T) {
clusters, mocks := createClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 2 {
t.Skip("test needs at least 2 clusters")
// p is the peer ID of the cluster being removed.
p := clusters[1].ID().ID
//t.Logf("remove %s from %s", p.Pretty(), clusters[0].config.ClusterPeers)
err := clusters[0].PeerRemove(p)
if err != nil {
// f is the per-cluster verification handed to runF.
f := func(t *testing.T, c *Cluster) {
if c.ID().ID == p { //This is the removed cluster
// A receive that reports ok == false means Done() has been closed.
_, ok := <-c.Done()
if ok {
t.Error("removed peer should have exited")
// if len(c.config.ClusterPeers) != 0 {
// t.Error("cluster peers should be empty")
// }
} else {
ids := c.Peers()
if len(ids) != nClusters-1 {
t.Error("should have removed 1 peer")
// if len(c.config.ClusterPeers) != nClusters-1 {
// t.Log(c.config.ClusterPeers)
// t.Error("should have removed peer from config")
// }
runF(t, clusters, f)
// TestClusterPeerRemoveSelf makes every cluster call PeerRemove with its own
// peer ID, in index order, and checks that the cluster's Done() channel is
// closed afterwards.
//
// NOTE(review): the body of the `if err != nil` check and the closing braces
// are not visible in this listing — confirm against the full file.
func TestClusterPeerRemoveSelf(t *testing.T) {
clusters, mocks := createClusters(t)
defer shutdownClusters(t, clusters, mocks)
for i := 0; i < len(clusters); i++ {
err := clusters[i].PeerRemove(clusters[i].ID().ID)
if err != nil {
// more == false means the Done() channel has been closed.
_, more := <-clusters[i].Done()
if more {
t.Error("should be done")
// TestClusterPeerRemoveReallocsPins checks that pins allocated to a peer are
// no longer allocated to it once that peer is removed. It sets the
// replication factor to nClusters-1 on every cluster, pins nClusters random
// CIDs through clusters[0], records the CIDs allocated to clusters[0],
// removes clusters[0] with PeerRemove, and finally verifies through PinGet
// that none of the recorded CIDs still lists the removed peer.
//
// NOTE(review): the final argument of the t.Fatalf call and the closing
// braces are not visible in this listing — confirm against the full file.
func TestClusterPeerRemoveReallocsPins(t *testing.T) {
clusters, mocks := createClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 3 {
t.Skip("test needs at least 3 clusters")
// Adjust the replication factor for re-allocation
for _, c := range clusters {
c.config.ReplicationFactor = nClusters - 1
// cpeer is both the peer used for pinning and the peer removed later on.
cpeer := clusters[0]
clusterID := cpeer.ID().ID
// TestCid1 is decoded only to reuse its prefix when building random CIDs.
tmpCid, _ := cid.Decode(test.TestCid1)
prefix := tmpCid.Prefix()
// Pin nCluster random pins. This ensures each peer will
// pin the same number of Cids.
for i := 0; i < nClusters; i++ {
h, err := prefix.Sum(randomBytes())
checkErr(t, err)
err = cpeer.Pin(api.PinCid(h))
checkErr(t, err)
// At this point cpeer is expected to hold nClusters-1 of the pins
// (checked below). Find out which pins are associated to cpeer.
interestingCids := []*cid.Cid{}
pins := cpeer.Pins()
if len(pins) != nClusters {
t.Fatal("expected number of tracked pins to be nClusters")
for _, p := range pins {
if containsPeer(p.Allocations, clusterID) {
//t.Logf("%s pins %s", clusterID, p.Cid)
interestingCids = append(interestingCids, p.Cid)
if len(interestingCids) != nClusters-1 {
//t.Fatal("The number of allocated Cids is not expected")
t.Fatalf("Expected %d allocated CIDs but got %d", nClusters-1,
// Now remove cluster peer
err := clusters[0].PeerRemove(clusterID)
if err != nil {
t.Fatal("error removing peer:", err)
for _, icid := range interestingCids {
// Now check that the allocations are new.
newPin, err := clusters[0].PinGet(icid)
if err != nil {
t.Fatal("error getting the new allocations for", icid)
if containsPeer(newPin.Allocations, clusterID) {
t.Fatal("pin should not be allocated to the removed peer")
// TestClustersPeerJoin has clusters[1..] join clusters[0] one at a time with
// Join, then checks on every cluster that Peers() reports nClusters peers and
// that the only tracked pin is test.TestCid1.
//
// NOTE(review): the call that pins the CID, the body of the `if err != nil`
// check and the closing braces are not visible in this listing — confirm
// against the full file.
func TestClustersPeerJoin(t *testing.T) {
clusters, mocks := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 3 {
t.Skip("test needs at least 3 clusters")
// Every peer bootstraps against clusters[0], sequentially.
for i := 1; i < len(clusters); i++ {
err := clusters[i].Join(clusterAddr(clusters[0]))
if err != nil {
hash, _ := cid.Decode(test.TestCid1)
// f is the per-cluster verification handed to runF.
f := func(t *testing.T, c *Cluster) {
peers := c.Peers()
if len(peers) != nClusters {
t.Error("all peers should be connected")
pins := c.Pins()
if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
t.Error("all peers should have pinned the cid")
runF(t, clusters, f)
// TestClustersPeerJoinAllAtOnce has clusters[1..] join clusters[0] through
// runF (presumably concurrently — runF is defined elsewhere; verify), then
// checks on every cluster that Peers() reports nClusters peers and that the
// only tracked pin is test.TestCid1.
//
// NOTE(review): the call that pins the CID, the body of the `if err != nil`
// check and the closing braces are not visible in this listing — confirm
// against the full file.
func TestClustersPeerJoinAllAtOnce(t *testing.T) {
clusters, mocks := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 2 {
t.Skip("test needs at least 2 clusters")
// f joins a single peer to clusters[0]; it runs for every peer but the first.
f := func(t *testing.T, c *Cluster) {
err := c.Join(clusterAddr(clusters[0]))
if err != nil {
runF(t, clusters[1:], f)
hash, _ := cid.Decode(test.TestCid1)
// f2 is the per-cluster verification run once all joins were issued.
f2 := func(t *testing.T, c *Cluster) {
peers := c.Peers()
if len(peers) != nClusters {
t.Error("all peers should be connected")
pins := c.Pins()
if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
t.Error("all peers should have pinned the cid")
runF(t, clusters, f2)
// TestClustersPeerJoinAllAtOnceWithRandomBootstrap first joins clusters[1] to
// clusters[0], then has clusters[2..] join through runF, each picking
// clusters[0] or clusters[1] at random as its bootstrap peer. It finally
// checks on every cluster that Peers() reports nClusters peers and that the
// only tracked pin is test.TestCid1.
//
// NOTE(review): the call that pins the CID, the bodies of the `if err != nil`
// checks and the closing braces are not visible in this listing — confirm
// against the full file.
func TestClustersPeerJoinAllAtOnceWithRandomBootstrap(t *testing.T) {
clusters, mocks := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 3 {
t.Skip("test needs at least 3 clusters")
// We have a 2 node cluster and the rest of nodes join
// one of the two seeds randomly
err := clusters[1].Join(clusterAddr(clusters[0]))
if err != nil {
f := func(t *testing.T, c *Cluster) {
// rand.Intn(2) yields 0 or 1, i.e. one of the two seed peers.
j := rand.Intn(2)
err := c.Join(clusterAddr(clusters[j]))
if err != nil {
runF(t, clusters[2:], f)
hash, _ := cid.Decode(test.TestCid1)
// f2 is the per-cluster verification run once all joins were issued.
f2 := func(t *testing.T, c *Cluster) {
peers := c.Peers()
if len(peers) != nClusters {
t.Error("all peers should be connected")
pins := c.Pins()
if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
t.Error("all peers should have pinned the cid")
runF(t, clusters, f2)