package ipfscluster

import (
	"flag"
	"fmt"
	"math/rand"
	"os"
	"sort"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/api/rest"
	"github.com/ipfs/ipfs-cluster/consensus/raft"
	"github.com/ipfs/ipfs-cluster/informer/disk"
	"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
	"github.com/ipfs/ipfs-cluster/monitor/basic"
	"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
	"github.com/ipfs/ipfs-cluster/state"
	"github.com/ipfs/ipfs-cluster/state/mapstate"
	"github.com/ipfs/ipfs-cluster/test"

	cid "github.com/ipfs/go-cid"
	crypto "github.com/libp2p/go-libp2p-crypto"
	peer "github.com/libp2p/go-libp2p-peer"
	ma "github.com/multiformats/go-multiaddr"
)

// TestClusters*
var (
	// number of clusters to create
	nClusters = 6

	// number of pins to pin/unpin/check
	nPins = 500

	logLevel = "CRITICAL"

	// ports
	clusterPort   = 10000
	apiPort       = 10100
	ipfsProxyPort = 10200
)

func init() {
	flag.StringVar(&logLevel, "loglevel", logLevel, "default log level for tests")
	flag.IntVar(&nClusters, "nclusters", nClusters, "number of clusters to use")
	flag.IntVar(&nPins, "npins", nPins, "number of pins to pin/unpin/check")
	flag.Parse()

	rand.Seed(time.Now().UnixNano())

	for f := range LoggingFacilities {
		SetFacilityLogLevel(f, logLevel)
	}

	for f := range LoggingFacilitiesExtra {
		SetFacilityLogLevel(f, logLevel)
	}
}
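
// The flags registered in init() above can be used to tune a test run
// without editing the package-level defaults. A hypothetical invocation
// (flag values are only illustrative) would look like:
//
//	go test -v -loglevel INFO -nclusters 3 -npins 100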

func checkErr(t *testing.T, err error) {
	if err != nil {
		t.Fatal(err)
	}
}

func randomBytes() []byte {
	bs := make([]byte, 64)
	for i := 0; i < len(bs); i++ {
		bs[i] = byte(rand.Int())
	}
	return bs
}

// createComponents creates all the components (API, IPFS connector, state,
// pin tracker, monitor, allocator, informer) needed to assemble a single
// test cluster peer, along with the ipfs mock it talks to.
func createComponents(t *testing.T, i int, clusterSecret []byte) (*Config, *raft.Config, API, IPFSConnector, state.State, PinTracker, PeerMonitor, PinAllocator, Informer, *test.IpfsMock) {
	mock := test.NewIpfsMock()

	//clusterAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", clusterPort+i))
	// Bind on port 0
	clusterAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
	//apiAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", apiPort+i))
	// Bind on port 0
	apiAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
	//proxyAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", ipfsProxyPort+i))
	// Bind on port 0
	proxyAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
	nodeAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", mock.Addr, mock.Port))

	priv, pub, err := crypto.GenerateKeyPair(crypto.RSA, 2048)
	checkErr(t, err)
	pid, err := peer.IDFromPublicKey(pub)
	checkErr(t, err)
	peername := fmt.Sprintf("peer_%d", i)

	clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg := testingConfigs()

	clusterCfg.ID = pid
	clusterCfg.Peername = peername
	clusterCfg.PrivateKey = priv
	clusterCfg.Secret = clusterSecret
	clusterCfg.ListenAddr = clusterAddr
	clusterCfg.LeaveOnShutdown = false
	apiCfg.ListenAddr = apiAddr
	ipfshttpCfg.ProxyAddr = proxyAddr
	ipfshttpCfg.NodeAddr = nodeAddr
	consensusCfg.DataFolder = "./e2eTestRaft/" + pid.Pretty()

	api, err := rest.NewAPI(apiCfg)
	checkErr(t, err)
	ipfs, err := ipfshttp.NewConnector(ipfshttpCfg)
	checkErr(t, err)
	state := mapstate.NewMapState()
	tracker := maptracker.NewMapPinTracker(trackerCfg, clusterCfg.ID)
	mon, err := basic.NewMonitor(monCfg)
	checkErr(t, err)
	alloc := descendalloc.NewAllocator()
	inf, err := disk.NewInformer(diskInfCfg)
	checkErr(t, err)

	return clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, mock
}

// createCluster builds a Cluster from the given components and waits until
// it is ready before returning it.
func createCluster(t *testing.T, clusterCfg *Config, consensusCfg *raft.Config, api API, ipfs IPFSConnector, state state.State, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer) *Cluster {
	cl, err := NewCluster(clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf)
	checkErr(t, err)
	<-cl.Ready()
	return cl
}

// createOnePeerCluster creates a single cluster peer (the nth one) along
// with its ipfs mock.
func createOnePeerCluster(t *testing.T, nth int, clusterSecret []byte) (*Cluster, *test.IpfsMock) {
	clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, mock := createComponents(t, nth, clusterSecret)
	cl := createCluster(t, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf)
	return cl, mock
}

// createClusters creates and starts nClusters cluster peers, bootstrapping
// all of them from the first one, and returns them along with their ipfs
// mocks.
func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
	os.RemoveAll("./e2eTestRaft")
	cfgs := make([]*Config, nClusters, nClusters)
	concfgs := make([]*raft.Config, nClusters, nClusters)
	apis := make([]API, nClusters, nClusters)
	ipfss := make([]IPFSConnector, nClusters, nClusters)
	states := make([]state.State, nClusters, nClusters)
	trackers := make([]PinTracker, nClusters, nClusters)
	mons := make([]PeerMonitor, nClusters, nClusters)
	allocs := make([]PinAllocator, nClusters, nClusters)
	infs := make([]Informer, nClusters, nClusters)
	ipfsMocks := make([]*test.IpfsMock, nClusters, nClusters)
	clusters := make([]*Cluster, nClusters, nClusters)

	clusterPeers := make([]ma.Multiaddr, nClusters, nClusters)
	for i := 0; i < nClusters; i++ {
		clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, mock := createComponents(t, i, testingClusterSecret)
		cfgs[i] = clusterCfg
		concfgs[i] = consensusCfg
		apis[i] = api
		ipfss[i] = ipfs
		states[i] = state
		trackers[i] = tracker
		mons[i] = mon
		allocs[i] = alloc
		infs[i] = inf
		ipfsMocks[i] = mock
		addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ipfs/%s",
			clusterPort+i,
			clusterCfg.ID.Pretty()))
		clusterPeers[i] = addr
	}

	// ----------------------------------------------------------

	// // Set up the cluster using ClusterPeers
	// for i := 0; i < nClusters; i++ {
	// 	cfgs[i].Peers = make([]ma.Multiaddr, nClusters, nClusters)
	// 	for j := 0; j < nClusters; j++ {
	// 		cfgs[i].Peers[j] = clusterPeers[j]
	// 	}
	// }

	// var wg sync.WaitGroup
	// for i := 0; i < nClusters; i++ {
	// 	wg.Add(1)
	// 	go func(i int) {
	// 		clusters[i] = createCluster(t, cfgs[i], concfgs[i], apis[i], ipfss[i], states[i], trackers[i], mons[i], allocs[i], infs[i])
	// 		wg.Done()
	// 	}(i)
	// }
	// wg.Wait()

	// ----------------------------------------------

	// Alternative way of starting using bootstrap
	// Start first node
	clusters[0] = createCluster(t, cfgs[0], concfgs[0], apis[0], ipfss[0], states[0], trackers[0], mons[0], allocs[0], infs[0])
	// Find out where it bound
	bootstrapAddr, _ := ma.NewMultiaddr(fmt.Sprintf("%s/ipfs/%s", clusters[0].host.Addrs()[0], clusters[0].id.Pretty()))
	// Use first node to bootstrap
	for i := 1; i < nClusters; i++ {
		cfgs[i].Bootstrap = []ma.Multiaddr{bootstrapAddr}
	}

	// Start the rest
	var wg sync.WaitGroup
	for i := 1; i < nClusters; i++ {
		wg.Add(1)
		go func(i int) {
			clusters[i] = createCluster(t, cfgs[i], concfgs[i], apis[i], ipfss[i], states[i], trackers[i], mons[i], allocs[i], infs[i])
			wg.Done()
		}(i)
	}
	wg.Wait()

	// ---------------------------------------------

	// Yet an alternative way using PeerAdd
	// for i := 1; i < nClusters; i++ {
	// 	clusters[0].PeerAdd(clusterAddr(clusters[i]))
	// }
	delay()
	delay()
	return clusters, ipfsMocks
}

// shutdownClusters shuts down every cluster peer and its ipfs mock, and
// removes the Raft test data folder.
func shutdownClusters(t *testing.T, clusters []*Cluster, m []*test.IpfsMock) {
	for i, c := range clusters {
		err := c.Shutdown()
		if err != nil {
			t.Error(err)
		}
		m[i].Close()
	}
	os.RemoveAll("./e2eTestRaft")
}

// runF runs the given test function concurrently against every cluster peer
// and waits for all of them to finish.
func runF(t *testing.T, clusters []*Cluster, f func(*testing.T, *Cluster)) {
	var wg sync.WaitGroup
	for _, c := range clusters {
		wg.Add(1)
		go func(c *Cluster) {
			defer wg.Done()
			f(t, c)
		}(c)
	}
	wg.Wait()
}
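
// A typical test in this file combines the helpers above as in the minimal
// sketch below (TestCid1 comes from the test package imported above; all
// other names are defined in this file):
//
//	func TestClustersExample(t *testing.T) {
//		clusters, mocks := createClusters(t)
//		defer shutdownClusters(t, clusters, mocks)
//
//		h, _ := cid.Decode(test.TestCid1)
//		clusters[0].Pin(api.PinCid(h))
//		delay() // give the pin time to propagate
//
//		runF(t, clusters, func(t *testing.T, c *Cluster) {
//			// per-peer assertions go here
//		})
//	}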

// delay sleeps for a period that grows with the number of cluster peers,
// giving operations time to propagate through the cluster.
func delay() {
	var d int
	if nClusters > 10 {
		d = 8
	} else if nClusters > 5 {
		d = 5
	} else {
		d = nClusters
	}
	time.Sleep(time.Duration(d) * time.Second)
}
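
// With the default nClusters = 6 defined above, the "nClusters > 5" branch
// is taken, so each call to delay() sleeps for 5 seconds.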

// waitForLeader polls a random (non-shutdown) peer until its consensus
// component reports a leader, or fails the test after one minute.
func waitForLeader(t *testing.T, clusters []*Cluster) {
	timer := time.NewTimer(time.Minute)
	ticker := time.NewTicker(time.Second)
	// Wait for consensus to pick a new leader in case we shut it down

	// Make sure we don't check on a shutdown cluster
	j := rand.Intn(len(clusters))
	for clusters[j].shutdownB {
		j = rand.Intn(len(clusters))
	}

loop:
	for {
		select {
		case <-timer.C:
			t.Fatal("timed out waiting for a leader")
		case <-ticker.C:
			_, err := clusters[j].consensus.Leader()
			if err == nil {
				break loop
			}
		}
	}
}

func TestClustersVersion(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	f := func(t *testing.T, c *Cluster) {
		v := c.Version()
		if v != Version {
			t.Error("Bad version")
		}
	}
	runF(t, clusters, f)
}

func TestClustersPeers(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	delay()

	j := rand.Intn(nClusters) // choose a random cluster peer
	peers := clusters[j].Peers()

	if len(peers) != nClusters {
		t.Fatal("expected as many peers as clusters")
	}

	clusterIDMap := make(map[peer.ID]api.ID)
	peerIDMap := make(map[peer.ID]api.ID)

	for _, c := range clusters {
		id := c.ID()
		clusterIDMap[id.ID] = id
	}

	for _, p := range peers {
		peerIDMap[p.ID] = p
	}

	for k, id := range clusterIDMap {
		id2, ok := peerIDMap[k]
		if !ok {
			t.Fatal("expected id in both maps")
		}
		//if !crypto.KeyEqual(id.PublicKey, id2.PublicKey) {
		//	t.Error("expected same public key")
		//}
		if id.IPFS.ID != id2.IPFS.ID {
			t.Error("expected same ipfs daemon ID")
		}
	}
}

func TestClustersPin(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	exampleCid, _ := cid.Decode(test.TestCid1)
	prefix := exampleCid.Prefix()
	for i := 0; i < nPins; i++ {
		j := rand.Intn(nClusters)           // choose a random cluster peer
		h, err := prefix.Sum(randomBytes()) // create random cid
		checkErr(t, err)
		err = clusters[j].Pin(api.PinCid(h))
		if err != nil {
			t.Errorf("error pinning %s: %s", h, err)
		}
		// Test re-pin
		err = clusters[j].Pin(api.PinCid(h))
		if err != nil {
			t.Errorf("error repinning %s: %s", h, err)
		}
	}
	delay()
	fpinned := func(t *testing.T, c *Cluster) {
		status := c.tracker.StatusAll()
		for _, v := range status {
			if v.Status != api.TrackerStatusPinned {
				t.Errorf("%s should have been pinned but it is %s",
					v.Cid,
					v.Status.String())
			}
		}
		if l := len(status); l != nPins {
			t.Errorf("Pinned %d out of %d requests", l, nPins)
		}
	}
	runF(t, clusters, fpinned)

	// Unpin everything
	pinList := clusters[0].Pins()

	for i := 0; i < nPins; i++ {
		j := rand.Intn(nClusters) // choose a random cluster peer
		err := clusters[j].Unpin(pinList[i].Cid)
		if err != nil {
			t.Errorf("error unpinning %s: %s", pinList[i].Cid, err)
		}
		// test re-unpin
		err = clusters[j].Unpin(pinList[i].Cid)
		if err != nil {
			t.Errorf("error re-unpinning %s: %s", pinList[i].Cid, err)
		}
	}
	delay()

	funpinned := func(t *testing.T, c *Cluster) {
		status := c.tracker.StatusAll()
		if l := len(status); l != 0 {
			t.Errorf("Nothing should be pinned")
			//t.Errorf("%+v", status)
		}
	}
	runF(t, clusters, funpinned)
}

func TestClustersStatusAll(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	h, _ := cid.Decode(test.TestCid1)
	clusters[0].Pin(api.PinCid(h))
	delay()
	// Global status
	f := func(t *testing.T, c *Cluster) {
		statuses, err := c.StatusAll()
		if err != nil {
			t.Error(err)
		}
		if len(statuses) != 1 {
			t.Fatal("bad status. Expected one item")
		}
		if statuses[0].Cid.String() != test.TestCid1 {
			t.Error("bad cid in status")
		}
		info := statuses[0].PeerMap
		if len(info) != nClusters {
			t.Error("bad info in status")
		}

		if info[c.host.ID()].Status != api.TrackerStatusPinned {
			t.Error("the hash should have been pinned")
		}

		status, err := c.Status(h)
		if err != nil {
			t.Error(err)
		}

		pinfo, ok := status.PeerMap[c.host.ID()]
		if !ok {
			t.Fatal("Host not in status")
		}

		if pinfo.Status != api.TrackerStatusPinned {
			t.Error("the status should show the hash as pinned")
		}
	}
	runF(t, clusters, f)
}
|
|
|
|
|
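// TestClustersStatusAllWithErrors checks that, after one peer is shut
// down, the remaining peers still return a full peer map and report a
// ClusterError status for the stopped peer.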
func TestClustersStatusAllWithErrors(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	h, _ := cid.Decode(test.TestCid1)
	clusters[0].Pin(api.PinCid(h))
	delay()

	// shutdown 1 cluster peer
	clusters[1].Shutdown()

	f := func(t *testing.T, c *Cluster) {
		// skip if it's the shutdown peer
		if c.ID().ID == clusters[1].ID().ID {
			return
		}

		statuses, err := c.StatusAll()
		if err != nil {
			t.Error(err)
		}
		if len(statuses) != 1 {
			t.Fatal("bad status. Expected one item")
		}

		stts := statuses[0]
		if len(stts.PeerMap) != nClusters {
			t.Error("bad number of peers in status")
		}

		errst := stts.PeerMap[clusters[1].ID().ID]

		if errst.Cid.String() != test.TestCid1 {
			t.Error("errored pinInfo should have a good cid")
		}

		if errst.Status != api.TrackerStatusClusterError {
			t.Error("erroring status should be set to ClusterError")
		}

		// now check with Cid status
		status, err := c.Status(h)
		if err != nil {
			t.Error(err)
		}

		pinfo := status.PeerMap[clusters[1].ID().ID]

		if pinfo.Status != api.TrackerStatusClusterError {
			t.Error("erroring status should be ClusterError")
		}

		if pinfo.Cid.String() != test.TestCid1 {
			t.Error("errored status should have a good cid")
		}
	}
	runF(t, clusters, f)
}

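// TestClustersSyncAllLocal checks that SyncAllLocal reports the CID that
// always fails (test.ErrorCid) as Pinning or PinError on every peer.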
func TestClustersSyncAllLocal(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	h, _ := cid.Decode(test.ErrorCid) // This cid always fails
	h2, _ := cid.Decode(test.TestCid2)
	clusters[0].Pin(api.PinCid(h))
	clusters[0].Pin(api.PinCid(h2))
	delay()
	f := func(t *testing.T, c *Cluster) {
		// Sync bad ID
		infos, err := c.SyncAllLocal()
		if err != nil {
			// SyncAllLocal() is asynchronous and should not show an
			// error even if Recover() fails.
			t.Error(err)
		}
		if len(infos) != 1 {
			t.Fatal("expected 1 elem slice")
		}
		// Last-known state may still be pinning
		if infos[0].Status != api.TrackerStatusPinError && infos[0].Status != api.TrackerStatusPinning {
			t.Error("element should be in Pinning or PinError state")
		}
	}
	// Test Local syncs
	runF(t, clusters, f)
}

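// TestClustersSyncLocal checks SyncLocal on every peer against one
// failing CID and one good CID.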
func TestClustersSyncLocal(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	h, _ := cid.Decode(test.ErrorCid) // This cid always fails
	h2, _ := cid.Decode(test.TestCid2)
	clusters[0].Pin(api.PinCid(h))
	clusters[0].Pin(api.PinCid(h2))
	delay()

	f := func(t *testing.T, c *Cluster) {
		info, err := c.SyncLocal(h)
		if err != nil {
			t.Error(err)
		}
		if info.Status != api.TrackerStatusPinError && info.Status != api.TrackerStatusPinning {
			t.Errorf("element is %s and not PinError or Pinning", info.Status)
		}

		// Sync good ID
		info, err = c.SyncLocal(h2)
		if err != nil {
			t.Error(err)
		}
		if info.Status != api.TrackerStatusPinned {
			t.Error("element should be in Pinned state")
		}
	}
	// Test Local syncs
	runF(t, clusters, f)
}

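// TestClustersSyncAll checks that a cluster-wide SyncAll surfaces only
// the failing CID and reports it as PinError or Pinning on all peers.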
func TestClustersSyncAll(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	h, _ := cid.Decode(test.ErrorCid) // This cid always fails
	h2, _ := cid.Decode(test.TestCid2)
	clusters[0].Pin(api.PinCid(h))
	clusters[0].Pin(api.PinCid(h2))
	delay()

	j := rand.Intn(nClusters) // choose a random cluster peer
	ginfos, err := clusters[j].SyncAll()
	if err != nil {
		t.Fatal(err)
	}
	if len(ginfos) != 1 {
		t.Fatal("expected globalsync to have 1 element")
	}
	if ginfos[0].Cid.String() != test.ErrorCid {
		t.Error("expected globalsync to have problems with test.ErrorCid")
	}
	for _, c := range clusters {
		inf, ok := ginfos[0].PeerMap[c.host.ID()]
		if !ok {
			t.Fatal("GlobalPinInfo should have this cluster")
		}
		if inf.Status != api.TrackerStatusPinError && inf.Status != api.TrackerStatusPinning {
			t.Error("should be PinError or Pinning in all peers")
		}
	}
}

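// TestClustersSync checks Sync for a failing CID (errors are carried
// inside the returned GlobalPinInfo rather than as an error value) and
// for a good CID, which should end up Pinned everywhere.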
func TestClustersSync(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	h, _ := cid.Decode(test.ErrorCid) // This cid always fails
	h2, _ := cid.Decode(test.TestCid2)
	clusters[0].Pin(api.PinCid(h))
	clusters[0].Pin(api.PinCid(h2))
	delay()

	j := rand.Intn(nClusters)
	ginfo, err := clusters[j].Sync(h)
	if err != nil {
		// we always attempt to return a valid response
		// with errors contained in GlobalPinInfo
		t.Fatal("did not expect an error")
	}
	pinfo, ok := ginfo.PeerMap[clusters[j].host.ID()]
	if !ok {
		t.Fatal("should have info for this host")
	}
	if pinfo.Error == "" {
		t.Error("pinInfo error should not be empty")
	}

	if ginfo.Cid.String() != test.ErrorCid {
		t.Error("GlobalPinInfo should be for test.ErrorCid")
	}

	for _, c := range clusters {
		inf, ok := ginfo.PeerMap[c.host.ID()]
		if !ok {
			t.Logf("%+v", ginfo)
			t.Fatal("GlobalPinInfo should not be empty for this host")
		}

		if inf.Status != api.TrackerStatusPinError && inf.Status != api.TrackerStatusPinning {
			t.Error("should be PinError or Pinning in all peers")
		}
	}

	// Test with a good Cid
	j = rand.Intn(nClusters)
	ginfo, err = clusters[j].Sync(h2)
	if err != nil {
		t.Fatal(err)
	}
	if ginfo.Cid.String() != test.TestCid2 {
		t.Error("GlobalPinInfo should be for TestCid2")
	}

	for _, c := range clusters {
		inf, ok := ginfo.PeerMap[c.host.ID()]
		if !ok {
			t.Fatal("GlobalPinInfo should have this cluster")
		}
		if inf.Status != api.TrackerStatusPinned {
			t.Error("the GlobalPinInfo should show Pinned in all peers")
		}
	}
}

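// TestClustersRecoverLocal checks that RecoverLocal returns an error and
// a PinError status for the failing CID, while the good CID stays Pinned.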
func TestClustersRecoverLocal(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	h, _ := cid.Decode(test.ErrorCid) // This cid always fails
	h2, _ := cid.Decode(test.TestCid2)
	clusters[0].Pin(api.PinCid(h))
	clusters[0].Pin(api.PinCid(h2))

	delay()

	f := func(t *testing.T, c *Cluster) {
		info, err := c.RecoverLocal(h)
		if err == nil {
			t.Error("expected an error recovering")
		}
		if info.Status != api.TrackerStatusPinError {
			t.Errorf("element is %s and not PinError", info.Status)
		}

		// Recover good ID
		info, err = c.SyncLocal(h2)
		if err != nil {
			t.Error(err)
		}
		if info.Status != api.TrackerStatusPinned {
			t.Error("element should be in Pinned state")
		}
	}
	// Test Local syncs
	runF(t, clusters, f)
}

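// TestClustersRecover checks cluster-wide Recover: the failing CID ends
// up as PinError on every peer and the good CID as Pinned.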
func TestClustersRecover(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	h, _ := cid.Decode(test.ErrorCid) // This cid always fails
	h2, _ := cid.Decode(test.TestCid2)
	clusters[0].Pin(api.PinCid(h))
	clusters[0].Pin(api.PinCid(h2))

	delay()

	j := rand.Intn(nClusters)
	ginfo, err := clusters[j].Recover(h)
	if err != nil {
		// we always attempt to return a valid response
		// with errors contained in GlobalPinInfo
		t.Fatal("did not expect an error")
	}
	pinfo, ok := ginfo.PeerMap[clusters[j].host.ID()]
	if !ok {
		t.Fatal("should have info for this host")
	}
	if pinfo.Error == "" {
		t.Error("pinInfo error should not be empty")
	}

	for _, c := range clusters {
		inf, ok := ginfo.PeerMap[c.host.ID()]
		if !ok {
			t.Fatal("GlobalPinInfo should not be empty for this host")
		}

		if inf.Status != api.TrackerStatusPinError {
			t.Logf("%+v", inf)
			t.Error("should be PinError in all peers")
		}
	}

	// Test with a good Cid
	j = rand.Intn(nClusters)
	ginfo, err = clusters[j].Recover(h2)
	if err != nil {
		t.Fatal(err)
	}
	if ginfo.Cid.String() != test.TestCid2 {
		t.Error("GlobalPinInfo should be for TestCid2")
	}

	for _, c := range clusters {
		inf, ok := ginfo.PeerMap[c.host.ID()]
		if !ok {
			t.Fatal("GlobalPinInfo should have this cluster")
		}
		if inf.Status != api.TrackerStatusPinned {
			t.Error("the GlobalPinInfo should show Pinned in all peers")
		}
	}
}

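// TestClustersShutdown checks that Shutdown is idempotent: calling it
// repeatedly on every peer should not produce errors.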
func TestClustersShutdown(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)

	f := func(t *testing.T, c *Cluster) {
		err := c.Shutdown()
		if err != nil {
			t.Error("should be able to shutdown cleanly")
		}
	}
	// Shutdown 3 times
	runF(t, clusters, f)
	runF(t, clusters, f)
	runF(t, clusters, f)
}

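// TestClustersReplication pins a number of random CIDs with a replication
// factor of nClusters - 1 and verifies how the allocations are spread
// among the peers.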
func TestClustersReplication(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	for _, c := range clusters {
		c.config.ReplicationFactorMin = nClusters - 1
		c.config.ReplicationFactorMax = nClusters - 1
	}

	// Why is the replication factor nClusters - 1?
	// Because that way we know that pinning nClusters
	// pins with a strategy like numpins/disk
	// will result in each peer tracking all nClusters pins:
	// exactly nClusters - 1 of them locally and one as remote.
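	// Illustrative example (numbers assumed, not taken from the test
	// configuration): with nClusters = 5 the replication factor is 4,
	// so the 5 pins created below yield 5*4 = 20 allocations. A balanced
	// allocator spreads them evenly, leaving each peer with 20/5 = 4
	// local pins plus 1 pin tracked as remote.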
	tmpCid, _ := cid.Decode(test.TestCid1)
	prefix := tmpCid.Prefix()

	for i := 0; i < nClusters; i++ {
		// Pick a random cluster and hash
		j := rand.Intn(nClusters)           // choose a random cluster peer
		h, err := prefix.Sum(randomBytes()) // create random cid
		checkErr(t, err)
		err = clusters[j].Pin(api.PinCid(h))
		if err != nil {
			t.Error(err)
		}
		time.Sleep(time.Second)

		// check that it is held by exactly nClusters - 1 peers
		gpi, err := clusters[j].Status(h)
		if err != nil {
			t.Fatal(err)
		}

		numLocal := 0
		numRemote := 0
		for _, v := range gpi.PeerMap {
			if v.Status == api.TrackerStatusPinned {
				numLocal++
			} else if v.Status == api.TrackerStatusRemote {
				numRemote++
			}
		}
		if numLocal != nClusters-1 {
			t.Errorf("We wanted replication %d but it's only %d",
				nClusters-1, numLocal)
		}

		if numRemote != 1 {
			t.Errorf("We wanted 1 peer to track it as remote but %d do", numRemote)
		}
		time.Sleep(time.Second) // this is for metric to be up to date
	}

	f := func(t *testing.T, c *Cluster) {
		pinfos := c.tracker.StatusAll()
		if len(pinfos) != nClusters {
			t.Error("Pinfos does not have the expected pins")
		}
		numRemote := 0
		numLocal := 0
		for _, pi := range pinfos {
			switch pi.Status {
			case api.TrackerStatusPinned:
				numLocal++
			case api.TrackerStatusRemote:
				numRemote++
			}
		}
		if numLocal != nClusters-1 {
			t.Errorf("Expected %d local pins but got %d", nClusters-1, numLocal)
		}

		if numRemote != 1 {
			t.Errorf("Expected 1 remote pin but got %d", numRemote)
		}

		pins := c.Pins()
		for _, pin := range pins {
			allocs := pin.Allocations
			if len(allocs) != nClusters-1 {
				t.Errorf("Allocations are [%s]", allocs)
			}
			for _, a := range allocs {
				if a == c.id {
					pinfo := c.tracker.Status(pin.Cid)
					if pinfo.Status != api.TrackerStatusPinned {
						t.Errorf("Peer %s was allocated but it is not pinning cid", c.id)
					}
				}
			}
		}
	}

	runF(t, clusters, f)
}

// This test checks that we pin with ReplicationFactorMax when
// we can.
func TestClustersReplicationFactorMax(t *testing.T) {
	if nClusters < 3 {
		t.Skip("Need at least 3 peers")
	}

	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	for _, c := range clusters {
		c.config.ReplicationFactorMin = 1
		c.config.ReplicationFactorMax = nClusters - 1
	}

	h, _ := cid.Decode(test.TestCid1)
	err := clusters[0].Pin(api.PinCid(h))
	if err != nil {
		t.Fatal(err)
	}

	delay()

	f := func(t *testing.T, c *Cluster) {
		p, err := c.PinGet(h)
		if err != nil {
			t.Fatal(err)
		}

		if len(p.Allocations) != nClusters-1 {
			t.Error("should have pinned nClusters - 1 allocations")
		}

		if p.ReplicationFactorMin != 1 {
			t.Error("rplMin should be 1")
		}

		if p.ReplicationFactorMax != nClusters-1 {
			t.Error("rplMax should be nClusters-1")
		}
	}
	runF(t, clusters, f)
}

// This test checks that repinning something that is overpinned
// removes some allocations.
func TestClustersReplicationFactorMaxLower(t *testing.T) {
	if nClusters < 5 {
		t.Skip("Need at least 5 peers")
	}

	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	for _, c := range clusters {
		c.config.ReplicationFactorMin = 1
		c.config.ReplicationFactorMax = nClusters
	}

	h, _ := cid.Decode(test.TestCid1)
	err := clusters[0].Pin(api.PinCid(h))
	if err != nil {
		t.Fatal(err)
	}

	delay()

	p1, err := clusters[0].PinGet(h)
	if err != nil {
		t.Fatal(err)
	}

	if len(p1.Allocations) != nClusters {
		t.Fatal("allocations should be nClusters")
	}

	err = clusters[0].Pin(api.Pin{
		Cid:                  h,
		ReplicationFactorMax: 2,
		ReplicationFactorMin: 1,
	})
	if err != nil {
		t.Fatal(err)
	}

	delay()

	p2, err := clusters[0].PinGet(h)
	if err != nil {
		t.Fatal(err)
	}

	if len(p2.Allocations) != 2 {
		t.Fatal("allocations should have been reduced to 2")
	}
}

// This test checks that when not all nodes are available,
// we pin to as many as we can, aiming for ReplicationFactorMax.
func TestClustersReplicationFactorInBetween(t *testing.T) {
	if nClusters < 5 {
		t.Skip("Need at least 5 peers")
	}

	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	for _, c := range clusters {
		c.config.ReplicationFactorMin = 1
		c.config.ReplicationFactorMax = nClusters
	}

	// Shutdown two peers
	clusters[nClusters-1].Shutdown()
	clusters[nClusters-2].Shutdown()

	time.Sleep(time.Second) // let metric expire

	waitForLeader(t, clusters)

	// allow metrics to arrive to new leader
	delay()

	h, _ := cid.Decode(test.TestCid1)
	err := clusters[0].Pin(api.PinCid(h))
	if err != nil {
		t.Fatal(err)
	}

	delay()

	f := func(t *testing.T, c *Cluster) {
		if c == clusters[nClusters-1] || c == clusters[nClusters-2] {
			return
		}
		p, err := c.PinGet(h)
		if err != nil {
			t.Fatal(err)
		}

		if len(p.Allocations) != nClusters-2 {
			t.Error("should have pinned nClusters-2 allocations")
		}

		if p.ReplicationFactorMin != 1 {
			t.Error("rplMin should be 1")
		}

		if p.ReplicationFactorMax != nClusters {
			t.Error("rplMax should be nClusters")
		}
	}
	runF(t, clusters, f)
}

// This test checks that we do not pin something for which
// we cannot reach ReplicationFactorMin.
func TestClustersReplicationFactorMin(t *testing.T) {
	if nClusters < 5 {
		t.Skip("Need at least 5 peers")
	}

	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	for _, c := range clusters {
		c.config.ReplicationFactorMin = nClusters - 1
		c.config.ReplicationFactorMax = nClusters
	}

	// Shutdown two peers
	clusters[nClusters-1].Shutdown()
	clusters[nClusters-2].Shutdown()

	time.Sleep(time.Second) // let metric expire

	waitForLeader(t, clusters)

	// allow metrics to arrive to new leader
	delay()

	h, _ := cid.Decode(test.TestCid1)
	err := clusters[0].Pin(api.PinCid(h))
	if err == nil {
		t.Fatal("Pin should have failed as rplMin cannot be satisfied")
	}
	t.Log(err)
	if !strings.Contains(err.Error(), "not enough peers to allocate CID") {
		t.Fatal(err)
	}
}

// This test checks that repinning something that has become
// underpinned changes nothing if it is still sufficiently pinned.
func TestClustersReplicationMinMaxNoRealloc(t *testing.T) {
	if nClusters < 5 {
		t.Skip("Need at least 5 peers")
	}

	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	for _, c := range clusters {
		c.config.ReplicationFactorMin = 1
		c.config.ReplicationFactorMax = nClusters
	}

	h, _ := cid.Decode(test.TestCid1)
	err := clusters[0].Pin(api.PinCid(h))
	if err != nil {
		t.Fatal(err)
	}

	// Shutdown two peers
	clusters[nClusters-1].Shutdown()
	clusters[nClusters-2].Shutdown()

	time.Sleep(time.Second) // let metric expire

	waitForLeader(t, clusters)

	// allow metrics to arrive to new leader
	delay()

	err = clusters[0].Pin(api.PinCid(h))
	if err != nil {
		t.Fatal(err)
	}

	p, err := clusters[0].PinGet(h)
	if err != nil {
		t.Fatal(err)
	}

	if len(p.Allocations) != nClusters {
		t.Error("allocations should still be nClusters even if not all peers are available")
	}

	if p.ReplicationFactorMax != nClusters {
		t.Error("rplMax should not have changed")
	}
}

// This test checks that repinning something that has become
// underpinned triggers re-allocations when it is no longer
// sufficiently pinned.
func TestClustersReplicationMinMaxRealloc(t *testing.T) {
	if nClusters < 5 {
		t.Skip("Need at least 5 peers")
	}

	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	for _, c := range clusters {
		c.config.ReplicationFactorMin = 3
		c.config.ReplicationFactorMax = 4
	}

	h, _ := cid.Decode(test.TestCid1)
	err := clusters[0].Pin(api.PinCid(h))
	if err != nil {
		t.Fatal(err)
	}

	delay()

	p, err := clusters[0].PinGet(h)
	if err != nil {
		t.Fatal(err)
	}

	firstAllocations := p.Allocations

	peerIDMap := make(map[peer.ID]*Cluster)
	for _, a := range clusters {
		peerIDMap[a.id] = a
	}

	// kill two of the allocations
	alloc1 := peerIDMap[firstAllocations[0]]
	alloc2 := peerIDMap[firstAllocations[1]]
	safePeer := peerIDMap[firstAllocations[2]]

	alloc1.Shutdown()
	alloc2.Shutdown()

	time.Sleep(time.Second) // let metric expire

	waitForLeader(t, clusters)

	// allow metrics to arrive to new leader
	delay()

	// Repin (this might already have been taken care of if there was an alert)
	err = safePeer.Pin(api.PinCid(h))
	if err != nil {
		t.Fatal(err)
	}

	p, err = safePeer.PinGet(h)
	if err != nil {
		t.Fatal(err)
	}

	secondAllocations := p.Allocations

	strings1 := api.PeersToStrings(firstAllocations)
	strings2 := api.PeersToStrings(secondAllocations)
	sort.Strings(strings1)
	sort.Strings(strings2)
	t.Logf("Allocs1: %s", strings1)
	t.Logf("Allocs2: %s", strings2)

	if fmt.Sprintf("%s", strings1) == fmt.Sprintf("%s", strings2) {
		t.Error("allocations should have changed")
	}

	lenSA := len(secondAllocations)
	expected := minInt(nClusters-2, 4)
	if lenSA != expected {
		t.Errorf("Insufficient reallocation, could have allocated to %d peers but instead only allocated to %d peers", expected, lenSA)
	}

	if lenSA < 3 {
		t.Error("allocations should be at least rplMin")
	}
}

// In this test we check that repinning something
// when a node has gone down will re-assign the pin.
func TestClustersReplicationRealloc(t *testing.T) {
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	for _, c := range clusters {
		c.config.ReplicationFactorMin = nClusters - 1
		c.config.ReplicationFactorMax = nClusters - 1
	}

	j := rand.Intn(nClusters)
	h, _ := cid.Decode(test.TestCid1)
	err := clusters[j].Pin(api.PinCid(h))
	if err != nil {
		t.Fatal(err)
	}

	// Let the pin arrive
	time.Sleep(time.Second / 2)

	pin := clusters[j].Pins()[0]
	pinSerial := pin.ToSerial()
	allocs := sort.StringSlice(pinSerial.Allocations)
	allocs.Sort()
	allocsStr := fmt.Sprintf("%s", allocs)

	// Re-pin should work and be allocated to the same
	// nodes
	err = clusters[j].Pin(api.PinCid(h))
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Second / 2)

	pin2 := clusters[j].Pins()[0]
	pinSerial2 := pin2.ToSerial()
	allocs2 := sort.StringSlice(pinSerial2.Allocations)
	allocs2.Sort()
	allocsStr2 := fmt.Sprintf("%s", allocs2)
	if allocsStr != allocsStr2 {
		t.Fatal("allocations changed without reason")
	}
	//t.Log(allocsStr)
	//t.Log(allocsStr2)

	var killedClusterIndex int
	// find someone that pinned it and kill that cluster
	for i, c := range clusters {
		pinfo := c.tracker.Status(h)
		if pinfo.Status == api.TrackerStatusPinned {
			//t.Logf("Killing %s", c.id.Pretty())
			killedClusterIndex = i
			t.Logf("Shutting down %s", c.ID().ID)
			c.Shutdown()
			break
		}
	}

	// let metrics expire and give the remaining peers time to
	// notice that they have lost the leader
	time.Sleep(4 * time.Second)
	waitForLeader(t, clusters)
	// wait for new metrics to arrive
	time.Sleep(2 * time.Second)

	// Make sure we haven't killed our randomly
	// selected cluster
	for j == killedClusterIndex {
		j = rand.Intn(nClusters)
	}

	// now pin should succeed
	err = clusters[j].Pin(api.PinCid(h))
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Second / 2)

	numPinned := 0
	for i, c := range clusters {
		if i == killedClusterIndex {
			continue
		}
		pinfo := c.tracker.Status(h)
		if pinfo.Status == api.TrackerStatusPinned {
			//t.Log(pinfo.Peer.Pretty())
			numPinned++
		}
	}

	if numPinned != nClusters-1 {
		t.Error("pin should have been correctly re-assigned")
	}
}

// In this test we try to pin something when there are not
// as many available peers as we need. It's like the previous test,
// except more peers are killed.
func TestClustersReplicationNotEnoughPeers(t *testing.T) {
	if nClusters < 5 {
		t.Skip("Need at least 5 peers")
	}
	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	for _, c := range clusters {
		c.config.ReplicationFactorMin = nClusters - 1
		c.config.ReplicationFactorMax = nClusters - 1
	}

	j := rand.Intn(nClusters)
	h, _ := cid.Decode(test.TestCid1)
	err := clusters[j].Pin(api.PinCid(h))
	if err != nil {
		t.Fatal(err)
	}

	// Let the pin arrive
	time.Sleep(time.Second / 2)

	clusters[0].Shutdown()
	clusters[1].Shutdown()

	delay()
	waitForLeader(t, clusters)

	err = clusters[2].Pin(api.PinCid(h))
	if err == nil {
		t.Fatal("expected an error")
	}
	if !strings.Contains(err.Error(), "not enough peers to allocate") {
		t.Error("different error than expected")
		t.Error(err)
	}
	//t.Log(err)
}

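// TestClustersRebalanceOnPeerDown checks that, after the peer holding a
// pin locally is shut down, the pin is re-assigned and the peer that was
// tracking the CID as remote ends up pinning it.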
func TestClustersRebalanceOnPeerDown(t *testing.T) {
	if nClusters < 5 {
		t.Skip("Need at least 5 peers")
	}

	clusters, mock := createClusters(t)
	defer shutdownClusters(t, clusters, mock)
	for _, c := range clusters {
		c.config.ReplicationFactorMin = nClusters - 1
		c.config.ReplicationFactorMax = nClusters - 1
	}

	// pin something
	h, _ := cid.Decode(test.TestCid1)
	clusters[0].Pin(api.PinCid(h))
	time.Sleep(time.Second * 2) // let the pin arrive
	pinLocal := 0
	pinRemote := 0
	var localPinner peer.ID
	var remotePinner peer.ID
	var remotePinnerCluster *Cluster

	status, _ := clusters[0].Status(h)

	// check it was correctly pinned
	for p, pinfo := range status.PeerMap {
		if pinfo.Status == api.TrackerStatusPinned {
			pinLocal++
			localPinner = p
		} else if pinfo.Status == api.TrackerStatusRemote {
			pinRemote++
			remotePinner = p
		}
	}

	if pinLocal != nClusters-1 || pinRemote != 1 {
		t.Fatal("Not pinned as expected")
	}

	// find and kill the local pinner
	for _, c := range clusters {
		if c.id == localPinner {
			c.Shutdown()
		} else if c.id == remotePinner {
			remotePinnerCluster = c
		}
	}

	// Sleep a monitoring interval
	time.Sleep(6 * time.Second)

	// It should now be pinned in the remote pinner
	if s := remotePinnerCluster.tracker.Status(h).Status; s != api.TrackerStatusPinned {
		t.Errorf("it should be pinned and is %s", s)
	}
}