// ipfs-cluster/consensus/raft/consensus_test.go
//
// 282 lines, 6.7 KiB, Go

package raft
import (
"context"
"fmt"
"os"
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/test"
cid "github.com/ipfs/go-cid"
libp2p "github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
)
// cleanRaft deletes the "raftFolderFromTests-<idn>" folder (a path relative
// to the current working directory) together with everything inside it.
// Removal is best-effort: the error returned by os.RemoveAll is discarded.
func cleanRaft(idn int) {
	folder := fmt.Sprintf("raftFolderFromTests-%d", idn)
	_ = os.RemoveAll(folder)
}
// consensusListenAddr returns the first address reported by the libp2p host
// attached to c. It indexes the address list directly, so it panics when the
// host reports no addresses.
func consensusListenAddr(c *Consensus) ma.Multiaddr {
	addrs := c.host.Addrs()
	return addrs[0]
}
// consensusAddr builds the multiaddress "<listen addr>/ipfs/<peer ID>" for
// the host attached to c. The parse error from ma.NewMultiaddr is discarded,
// so whatever value it returned alongside the error is handed back as is.
func consensusAddr(c *Consensus) ma.Multiaddr {
	listen := consensusListenAddr(c)
	pid := c.host.ID().Pretty()
	full, _ := ma.NewMultiaddr(fmt.Sprintf("%s/ipfs/%s", listen, pid))
	return full
}
// makeTestingHost creates a libp2p host listening on the loopback address
// "/ip4/127.0.0.1/tcp/0". The test is aborted with t.Fatal when the host
// cannot be created.
func makeTestingHost(t *testing.T) host.Host {
	ctx := context.Background()
	listenOpt := libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")

	h, err := libp2p.New(ctx, listenOpt)
	if err != nil {
		t.Fatal(err)
	}
	return h
}
func testingConsensus(t *testing.T, idn int) *Consensus {
cleanRaft(idn)
h := makeTestingHost(t)
st := mapstate.NewMapState()
Issue #162: Rework configuration format The following commit reimplements ipfs-cluster configuration under the following premises: * Each component is initialized with a configuration object defined by its module * Each component decides how the JSON representation of its configuration looks like * Each component parses and validates its own configuration * Each component exposes its own defaults * Component configurations are make the sections of a central JSON configuration file (which replaces the current JSON format) * Component configurations implement a common interface (config.ComponentConfig) with a set of common operations * The central configuration file is managed by a config.ConfigManager which: * Registers ComponentConfigs * Assigns the correspondent sections from the JSON file to each component and delegates the parsing * Delegates the JSON generation for each section * Can be notified when the configuration is updated and must be saved to disk The new service.json would then look as follows: ```json { "cluster": { "id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2", "private_key": "<...>", "secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786", "peers": [], "bootstrap": [], "leave_on_shutdown": false, "listen_multiaddress": "/ip4/0.0.0.0/tcp/9096", "state_sync_interval": "1m0s", "ipfs_sync_interval": "2m10s", "replication_factor": -1, "monitor_ping_interval": "15s" }, "consensus": { "raft": { "heartbeat_timeout": "1s", "election_timeout": "1s", "commit_timeout": "50ms", "max_append_entries": 64, "trailing_logs": 10240, "snapshot_interval": "2m0s", "snapshot_threshold": 8192, "leader_lease_timeout": "500ms" } }, "api": { "restapi": { "listen_multiaddress": "/ip4/127.0.0.1/tcp/9094", "read_timeout": "30s", "read_header_timeout": "5s", "write_timeout": "1m0s", "idle_timeout": "2m0s" } }, "ipfs_connector": { "ipfshttp": { "proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095", "node_multiaddress": "/ip4/127.0.0.1/tcp/5001", 
"connect_swarms_delay": "7s", "proxy_read_timeout": "10m0s", "proxy_read_header_timeout": "5s", "proxy_write_timeout": "10m0s", "proxy_idle_timeout": "1m0s" } }, "monitor": { "monbasic": { "check_interval": "15s" } }, "informer": { "disk": { "metric_ttl": "30s", "metric_type": "freespace" }, "numpin": { "metric_ttl": "10s" } } } ``` This new format aims to be easily extensible per component. As such, it already surfaces quite a few new options which were hardcoded before. Additionally, since Go API have changed, some redundant methods have been removed and small refactoring has happened to take advantage of the new way. License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
cfg := &Config{}
cfg.Default()
cfg.DataFolder = fmt.Sprintf("raftFolderFromTests-%d", idn)
cfg.hostShutdown = true
Issue #162: Rework configuration format The following commit reimplements ipfs-cluster configuration under the following premises: * Each component is initialized with a configuration object defined by its module * Each component decides how the JSON representation of its configuration looks like * Each component parses and validates its own configuration * Each component exposes its own defaults * Component configurations are make the sections of a central JSON configuration file (which replaces the current JSON format) * Component configurations implement a common interface (config.ComponentConfig) with a set of common operations * The central configuration file is managed by a config.ConfigManager which: * Registers ComponentConfigs * Assigns the correspondent sections from the JSON file to each component and delegates the parsing * Delegates the JSON generation for each section * Can be notified when the configuration is updated and must be saved to disk The new service.json would then look as follows: ```json { "cluster": { "id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2", "private_key": "<...>", "secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786", "peers": [], "bootstrap": [], "leave_on_shutdown": false, "listen_multiaddress": "/ip4/0.0.0.0/tcp/9096", "state_sync_interval": "1m0s", "ipfs_sync_interval": "2m10s", "replication_factor": -1, "monitor_ping_interval": "15s" }, "consensus": { "raft": { "heartbeat_timeout": "1s", "election_timeout": "1s", "commit_timeout": "50ms", "max_append_entries": 64, "trailing_logs": 10240, "snapshot_interval": "2m0s", "snapshot_threshold": 8192, "leader_lease_timeout": "500ms" } }, "api": { "restapi": { "listen_multiaddress": "/ip4/127.0.0.1/tcp/9094", "read_timeout": "30s", "read_header_timeout": "5s", "write_timeout": "1m0s", "idle_timeout": "2m0s" } }, "ipfs_connector": { "ipfshttp": { "proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095", "node_multiaddress": "/ip4/127.0.0.1/tcp/5001", 
"connect_swarms_delay": "7s", "proxy_read_timeout": "10m0s", "proxy_read_header_timeout": "5s", "proxy_write_timeout": "10m0s", "proxy_idle_timeout": "1m0s" } }, "monitor": { "monbasic": { "check_interval": "15s" } }, "informer": { "disk": { "metric_ttl": "30s", "metric_type": "freespace" }, "numpin": { "metric_ttl": "10s" } } } ``` This new format aims to be easily extensible per component. As such, it already surfaces quite a few new options which were hardcoded before. Additionally, since Go API have changed, some redundant methods have been removed and small refactoring has happened to take advantage of the new way. License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
Feat: emancipate Consensus from the Cluster component This commit promotes the Consensus component (and Raft) to become a fully independent thing like other components, passed to NewCluster during initialization. Cluster (main component) no longer creates the consensus layer internally. This has triggered a number of breaking changes that I will explain below. Motivation: Future work will require the possibility of running Cluster with a consensus layer that is not Raft. The "consensus" layer is in charge of maintaining two things: * The current cluster peerset, as required by the implementation * The current cluster pinset (shared state) While the pinset maintenance has always been in the consensus layer, the peerset maintenance was handled by the main component (starting by the "peers" key in the configuration) AND the Raft component (internally) and this generated lots of confusion: if the user edited the peers in the configuration they would be greeted with an error. The bootstrap process (adding a peer to an existing cluster) and configuration key also complicated many things, since the main component did it, but only when the consensus was initialized and in single peer mode. In all this we also mixed the peerstore (list of peer addresses in the libp2p host) with the peerset, when they need not to be linked. By initializing the consensus layer before calling NewCluster, all the difficulties in maintaining the current implementation in the same way have come to light. Thus, the following changes have been introduced: * Remove "peers" and "bootstrap" keys from the configuration: we no longer edit or save the configuration files. This was a very bad practice, requiring write permissions by the process to the file containing the private key and additionally made things like Puppet deployments of cluster difficult as configuration would mutate from its initial version. 
Needless to say all the maintenance associated to making sure peers and bootstrap had correct values when peers are bootstrapped or removed. A loud and detailed error message has been added when staring cluster with an old config, along with instructions on how to move forward. * Introduce a PeerstoreFile ("peerstore") which stores peer addresses: in ipfs, the peerstore is not persisted because it can be re-built from the network bootstrappers and the DHT. Cluster should probably also allow discoverability of peers addresses (when not bootstrapping, as in that case we have it), but in the meantime, we will read and persist the peerstore addresses for cluster peers in this file, different from the configuration. Note that dns multiaddresses are now fully supported and no IPs are saved when we have DNS multiaddresses for a peer. * The former "peer_manager" code is now a pstoremgr module, providing utilities to parse, add, list and generally maintain the libp2p host peerstore, including operations on the PeerstoreFile. This "pstoremgr" can now also be extended to perform address autodiscovery and other things indepedently from Cluster. * Create and initialize Raft outside of the main Cluster component: since we can now launch Raft independently from Cluster, we have more degrees of freedom. A new "staging" option when creating the object allows a raft peer to be launched in Staging mode, waiting to be added to a running consensus, and thus, not electing itself as leader or doing anything like we were doing before. This additionally allows us to track when the peer has become a Voter, which only happens when it's caught up with the state, something that was wonky previously. * The raft configuration now includes an InitPeerset key, which allows to provide a peerset for new peers and which is ignored when staging==true. The whole Raft initialization code is way cleaner and stronger now. * Cluster peer bootsrapping is now an ipfs-cluster-service feature. 
The --bootstrap flag works as before (additionally allowing comma-separated-list of entries). What bootstrap does, is to initialize Raft with staging == true, and then call Join in the main cluster component. Only when the Raft peer transitions to Voter, consensus becomes ready, and cluster becomes Ready. This is cleaner, works better and is less complex than before (supporting both flags and config values). We also backup and clean the state whenever we are boostrapping, automatically * ipfs-cluster-service no longer runs the daemon. Starting cluster needs now "ipfs-cluster-service daemon". The daemon specific flags (bootstrap, alloc) are now flags for the daemon subcommand. Here we mimic ipfs ("ipfs" does not start the daemon but print help) and pave the path for merging both service and ctl in the future. While this brings some breaking changes, it significantly reduces the complexity of the configuration, the code and most importantly, the documentation. It should be easier now to explain the user what is the right way to launch a cluster peer, and more difficult to make mistakes. As a side effect, the PR also: * Fixes #381 - peers with dynamic addresses * Fixes #371 - peers should be Raft configuration option * Fixes #378 - waitForUpdates may return before state fully synced * Fixes #235 - config option shadowing (no cfg saves, no need to shadow) License: MIT Signed-off-by: Hector Sanjuan <code@hector.link>
2018-04-28 22:22:23 +00:00
cc, err := NewConsensus(h, cfg, st, false)
if err != nil {
t.Fatal("cannot create Consensus:", err)
}
cc.SetClient(test.NewMockRPCClientWithHost(t, h))
<-cc.Ready()
return cc
}
// TestShutdownConsensus starts and stops a consensus component twice using
// the same peer number. This checks that Shutdown cleans up properly, that
// calling it a second time is harmless, and that raft comes up fine when it
// is re-initialized afterwards.
func TestShutdownConsensus(t *testing.T) {
	first := testingConsensus(t, 1)
	defer cleanRaft(1)

	if err := first.Shutdown(); err != nil {
		t.Fatal("Consensus cannot shutdown:", err)
	}
	// A repeated shutdown must not report an error.
	if err := first.Shutdown(); err != nil {
		t.Fatal("Consensus should be able to shutdown several times")
	}
	cleanRaft(1)

	// Second round on a clean folder.
	second := testingConsensus(t, 1)
	if err := second.Shutdown(); err != nil {
		t.Fatal("Consensus cannot shutdown:", err)
	}
	cleanRaft(1)
}
// TestConsensusPin logs a single pin through the consensus component and
// verifies that exactly that pin shows up in the state afterwards.
func TestConsensusPin(t *testing.T) {
	cc := testingConsensus(t, 1)
	defer cleanRaft(1) // Remember defer runs in LIFO order
	defer cc.Shutdown()

	// Fail fast on a broken fixture instead of pinning an invalid Cid.
	c, err := cid.Decode(test.TestCid1)
	if err != nil {
		t.Fatal("cannot decode test cid:", err)
	}

	err = cc.LogPin(api.Pin{Cid: c, ReplicationFactorMin: -1, ReplicationFactorMax: -1})
	if err != nil {
		t.Error("the operation did not make it to the log:", err)
	}

	// Presumably gives the logged operation time to be applied to the state
	// before it is read back.
	time.Sleep(250 * time.Millisecond)
	st, err := cc.State()
	if err != nil {
		t.Fatal("error getting state:", err)
	}

	pins := st.List()
	if len(pins) != 1 || pins[0].Cid.String() != test.TestCid1 {
		t.Error("the added pin should be in the state")
	}
}
// TestConsensusUnpin checks that an unpin operation is accepted by the
// consensus log. The Cid being unpinned was never pinned in this test.
func TestConsensusUnpin(t *testing.T) {
	cc := testingConsensus(t, 1)
	defer cleanRaft(1)
	defer cc.Shutdown()

	// Fail fast on a broken fixture instead of unpinning an invalid Cid.
	c, err := cid.Decode(test.TestCid2)
	if err != nil {
		t.Fatal("cannot decode test cid:", err)
	}

	err = cc.LogUnpin(api.PinCid(c))
	if err != nil {
		t.Error("the operation did not make it to the log:", err)
	}
}
// TestConsensusAddPeer starts two independent consensus components, adds the
// second one as a peer of the first, and verifies that the second ends up
// seeing two raft peers.
func TestConsensusAddPeer(t *testing.T) {
	first := testingConsensus(t, 1)
	second := testingConsensus(t, 2)
	t.Log(first.host.ID().Pretty())
	t.Log(second.host.ID().Pretty())
	defer cleanRaft(1)
	defer cleanRaft(2)
	defer first.Shutdown()
	defer second.Shutdown()

	// The first host needs an address for the second before it can add it.
	first.host.Peerstore().AddAddr(
		second.host.ID(),
		consensusListenAddr(second),
		peerstore.PermanentAddrTTL,
	)
	if err := first.AddPeer(second.host.ID()); err != nil {
		t.Error("the operation did not make it to the log:", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	// Wait (bounded by ctx) until the second peer knows about the first one.
	if err := second.raft.WaitForPeer(ctx, first.host.ID().Pretty(), false); err != nil {
		t.Fatal(err)
	}

	peers, err := second.raft.Peers()
	if err != nil {
		t.Fatal(err)
	}
	if len(peers) != 2 {
		t.Error("peer was not added")
	}
}
// TestConsensusRmPeer builds a two-peer cluster, logs a pin, and then
// exercises peer removal: first for a peer ID that was never added, then for
// the real peers. It finally waits for the second peer to disappear from the
// first peer's view.
func TestConsensusRmPeer(t *testing.T) {
	cc := testingConsensus(t, 1)
	cc2 := testingConsensus(t, 2)
	defer cleanRaft(1)
	defer cleanRaft(2)
	defer cc.Shutdown()
	defer cc2.Shutdown()

	// cc needs an address for cc2 before it can add it as a peer.
	cc.host.Peerstore().AddAddr(cc2.host.ID(), consensusListenAddr(cc2), peerstore.PermanentAddrTTL)

	err := cc.AddPeer(cc2.host.ID())
	if err != nil {
		t.Error("could not add peer:", err)
	}

	// A single 20s deadline covers every wait below.
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	err = cc.raft.WaitForPeer(ctx, cc2.host.ID().Pretty(), false)
	if err != nil {
		t.Fatal(err)
	}
	// NOTE(review): whatever WaitForLeader returns is discarded here; if it
	// reports an error it should probably fail the test — confirm signature.
	cc.raft.WaitForLeader(ctx)

	// Fail fast on a broken fixture instead of pinning an invalid Cid.
	c, err := cid.Decode(test.TestCid1)
	if err != nil {
		t.Fatal("cannot decode test cid:", err)
	}
	err = cc.LogPin(api.Pin{Cid: c, ReplicationFactorMin: -1, ReplicationFactorMax: -1})
	if err != nil {
		t.Error("could not pin after adding peer:", err)
	}

	time.Sleep(2 * time.Second)

	// Removing a peer that was never added must not report an error.
	err = cc.RmPeer(test.TestPeerID1)
	if err != nil {
		t.Error("the operation did not make it to the log:", err)
	}

	// Remove a real peer. Each side tries to remove the other and the test
	// only fails when both attempts fail: at least the leader can succeed.
	err = cc2.RmPeer(cc.host.ID())
	err2 := cc.RmPeer(cc2.host.ID())
	if err != nil && err2 != nil {
		t.Error("could not remove peer:", err, err2)
	}

	// Presumably the final true makes this wait for the peer's departure
	// rather than its arrival — confirm against WaitForPeer.
	err = cc.raft.WaitForPeer(ctx, cc2.host.ID().Pretty(), true)
	if err != nil {
		t.Fatal(err)
	}
}
// TestConsensusLeader verifies that a consensus component running on its own
// reports its own host ID as the leader.
func TestConsensusLeader(t *testing.T) {
	cc := testingConsensus(t, 1)
	self := cc.host.ID()
	defer cleanRaft(1)
	defer cc.Shutdown()

	leader, err := cc.Leader()
	if err != nil {
		t.Fatal("No leader:", err)
	}
	if leader == self {
		return
	}
	t.Errorf("expected %s but the leader appears as %s", self, leader)
}
// TestRaftLatestSnapshot logs three pins, taking a raft snapshot after each
// one, and then checks that the state restored from what LastStateRaw returns
// contains all three pins (i.e. that the most recent snapshot was read).
func TestRaftLatestSnapshot(t *testing.T) {
	cc := testingConsensus(t, 1)
	defer cleanRaft(1)
	defer cc.Shutdown()

	// One pin + snapshot round per test Cid. The ordinal only labels the
	// failure messages.
	rounds := []struct {
		ordinal string
		cidStr  string
	}{
		{"first", test.TestCid1},
		{"second", test.TestCid2},
		{"third", test.TestCid3},
	}
	for _, round := range rounds {
		// Fail fast on a broken fixture instead of pinning an invalid Cid.
		c, err := cid.Decode(round.cidStr)
		if err != nil {
			t.Fatalf("cannot decode the %s test cid: %v", round.ordinal, err)
		}

		err = cc.LogPin(api.Pin{Cid: c, ReplicationFactorMin: -1, ReplicationFactorMax: -1})
		if err != nil {
			t.Errorf("the %s pin did not make it to the log: %v", round.ordinal, err)
		}

		// Presumably gives the logged pin time to be applied before the
		// snapshot is taken.
		time.Sleep(250 * time.Millisecond)
		err = cc.raft.Snapshot()
		if err != nil {
			t.Errorf("the %s snapshot was not taken successfully: %v", round.ordinal, err)
		}
	}

	// Call LastStateRaw and ensure we get the correct state.
	snapState := mapstate.NewMapState()
	r, snapExists, err := LastStateRaw(cc.config)
	// Check the error first: otherwise a read failure would be reported as
	// a missing snapshot and the real cause would be lost.
	if err != nil {
		t.Fatal("Error while reading the latest snapshot:", err)
	}
	if !snapExists {
		t.Fatal("No snapshot found by LastStateRaw")
	}

	err = snapState.Migrate(r)
	if err != nil {
		t.Fatal("Snapshot bytes returned could not restore to state:", err)
	}

	pins := snapState.List()
	if len(pins) != 3 {
		t.Fatal("Latest snapshot not read")
	}
}