8f06baa1bf
This commit reimplements the ipfs-cluster configuration under the following premises:

* Each component is initialized with a configuration object defined by its module
* Each component decides what the JSON representation of its configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations make up the sections of a central JSON configuration file (which replaces the current JSON format)
* Component configurations implement a common interface (config.ComponentConfig) with a set of common operations (an illustrative sketch of such an interface follows this commit message)
* The central configuration file is managed by a config.ConfigManager which:
  * Registers ComponentConfigs
  * Assigns the corresponding sections of the JSON file to each component and delegates the parsing
  * Delegates the JSON generation for each section
  * Can be notified when the configuration is updated and must be saved to disk

The new service.json would then look as follows:

```json
{
  "cluster": {
    "id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
    "private_key": "<...>",
    "secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
    "peers": [],
    "bootstrap": [],
    "leave_on_shutdown": false,
    "listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
    "state_sync_interval": "1m0s",
    "ipfs_sync_interval": "2m10s",
    "replication_factor": -1,
    "monitor_ping_interval": "15s"
  },
  "consensus": {
    "raft": {
      "heartbeat_timeout": "1s",
      "election_timeout": "1s",
      "commit_timeout": "50ms",
      "max_append_entries": 64,
      "trailing_logs": 10240,
      "snapshot_interval": "2m0s",
      "snapshot_threshold": 8192,
      "leader_lease_timeout": "500ms"
    }
  },
  "api": {
    "restapi": {
      "listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
      "read_timeout": "30s",
      "read_header_timeout": "5s",
      "write_timeout": "1m0s",
      "idle_timeout": "2m0s"
    }
  },
  "ipfs_connector": {
    "ipfshttp": {
      "proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
      "node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
      "connect_swarms_delay": "7s",
      "proxy_read_timeout": "10m0s",
      "proxy_read_header_timeout": "5s",
      "proxy_write_timeout": "10m0s",
      "proxy_idle_timeout": "1m0s"
    }
  },
  "monitor": {
    "monbasic": {
      "check_interval": "15s"
    }
  },
  "informer": {
    "disk": {
      "metric_ttl": "30s",
      "metric_type": "freespace"
    },
    "numpin": {
      "metric_ttl": "10s"
    }
  }
}
```

This new format aims to be easily extensible per component. As such, it already surfaces quite a few options which were previously hardcoded. Additionally, since the Go APIs have changed, some redundant methods have been removed and some small refactoring has been done to take advantage of the new approach.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
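The commit message only describes the shared contract informally, so here is a minimal sketch of what a `config.ComponentConfig` implementation and its registration with the central manager could look like. The method names (`ConfigKey`, `Default`, `LoadJSON`, `ToJSON`, `Validate`) and the `Manager`/`RegisterComponent` names are illustrative assumptions drawn from the bullet points above, not the exact API introduced by this commit.

```go
// Illustrative sketch only: names and signatures are assumptions drawn from
// the commit message, not the exact API of this commit.
package config

// ComponentConfig is the common interface each component configuration
// would implement so that the central manager can delegate to it.
type ComponentConfig interface {
	// ConfigKey returns the name of this component's section in service.json
	// (e.g. "raft" inside "consensus").
	ConfigKey() string
	// Default fills the configuration with the component's default values.
	Default() error
	// LoadJSON parses and validates this component's JSON section.
	LoadJSON(raw []byte) error
	// ToJSON generates the JSON representation of this component's section.
	ToJSON() ([]byte, error)
	// Validate checks that the current values can be used.
	Validate() error
}

// Manager holds the registered component configurations and delegates
// parsing and generation of each section of the central configuration file.
type Manager struct {
	sections map[string]ComponentConfig
}

// RegisterComponent makes the manager aware of a component configuration so
// its section can be assigned to it when the file is read or written.
func (m *Manager) RegisterComponent(c ComponentConfig) {
	if m.sections == nil {
		m.sections = make(map[string]ComponentConfig)
	}
	m.sections[c.ConfigKey()] = c
}
```

Under such a contract, adding a new component only requires implementing the interface and registering it: the central file grows a new section without the manager needing to know anything about its contents.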
418 lines
9.6 KiB
Go
// Package raft implements a Consensus component for IPFS Cluster which uses
// Raft (go-libp2p-raft).
package raft

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/state"

	rpc "github.com/hsanjuan/go-libp2p-gorpc"
	logging "github.com/ipfs/go-log"
	consensus "github.com/libp2p/go-libp2p-consensus"
	host "github.com/libp2p/go-libp2p-host"
	peer "github.com/libp2p/go-libp2p-peer"
	libp2praft "github.com/libp2p/go-libp2p-raft"
	ma "github.com/multiformats/go-multiaddr"
)

var logger = logging.Logger("consensus")

// LeaderTimeout specifies how long to wait before failing an operation
// because there is no leader
var LeaderTimeout = 15 * time.Second

// CommitRetries specifies how many times we retry a failed commit until
// we give up
var CommitRetries = 2

// Consensus handles the work of keeping a shared-state between
// the peers of an IPFS Cluster, as well as modifying that state and
// applying any updates in a thread-safe manner.
type Consensus struct {
	ctx    context.Context
	cancel func()

	host host.Host

	consensus consensus.OpLogConsensus
	actor     consensus.Actor
	baseOp    *LogOp
	raft      *Raft

	rpcClient *rpc.Client
	rpcReady  chan struct{}
	readyCh   chan struct{}

	shutdownLock sync.Mutex
	shutdown     bool
}

// NewConsensus builds a new ClusterConsensus component. The state
// is used to initialize the Consensus system, so any information in it
// is discarded.
func NewConsensus(clusterPeers []peer.ID, host host.Host, cfg *Config, state state.State) (*Consensus, error) {
	err := cfg.Validate()
	if err != nil {
		return nil, err
	}

	op := &LogOp{
		ctx: context.Background(),
	}

	logger.Infof("starting Consensus and waiting for a leader...")
	consensus := libp2praft.NewOpLog(state, op)
	raft, err := NewRaft(clusterPeers, host, cfg, consensus.FSM())
	if err != nil {
		return nil, err
	}
	actor := libp2praft.NewActor(raft.raft)
	consensus.SetActor(actor)

	ctx, cancel := context.WithCancel(context.Background())
	op.ctx = ctx

	cc := &Consensus{
		ctx:       ctx,
		cancel:    cancel,
		host:      host,
		consensus: consensus,
		actor:     actor,
		baseOp:    op,
		raft:      raft,
		rpcReady:  make(chan struct{}, 1),
		readyCh:   make(chan struct{}, 1),
	}

	go cc.finishBootstrap()
	return cc, nil
}

// WaitForSync waits for a leader and for the state to be up to date, then returns.
func (cc *Consensus) WaitForSync() error {
	leaderCtx, cancel := context.WithTimeout(cc.ctx, LeaderTimeout)
	defer cancel()
	err := cc.raft.WaitForLeader(leaderCtx)
	if err != nil {
		return errors.New("error waiting for leader: " + err.Error())
	}
	err = cc.raft.WaitForUpdates(cc.ctx)
	if err != nil {
		return errors.New("error waiting for consensus updates: " + err.Error())
	}
	return nil
}

// waits until there is a consensus leader and syncs the state
// to the tracker
func (cc *Consensus) finishBootstrap() {
	err := cc.WaitForSync()
	if err != nil {
		return
	}
	logger.Info("Consensus state is up to date")

	// While rpc is not ready we cannot perform a sync
	if cc.rpcClient == nil {
		select {
		case <-cc.ctx.Done():
			return
		case <-cc.rpcReady:
		}
	}

	st, err := cc.State()
	_ = st
	// only check sync if we have a state
	// avoid error on new running clusters
	if err != nil {
		logger.Debug("skipping state sync: ", err)
	} else {
		var pInfoSerial []api.PinInfoSerial
		cc.rpcClient.Go(
			"",
			"Cluster",
			"StateSync",
			struct{}{},
			&pInfoSerial,
			nil)
	}
	cc.readyCh <- struct{}{}
	logger.Debug("consensus ready")
}

// Shutdown stops the component so it will not process any
// more updates. The underlying consensus is permanently
// shutdown, along with the libp2p transport.
func (cc *Consensus) Shutdown() error {
	cc.shutdownLock.Lock()
	defer cc.shutdownLock.Unlock()

	if cc.shutdown {
		logger.Debug("already shutdown")
		return nil
	}

	logger.Info("stopping Consensus component")

	cc.cancel()
	close(cc.rpcReady)

	// Raft shutdown
	errMsgs := ""
	err := cc.raft.Snapshot()
	if err != nil {
		errMsgs += err.Error()
	}
	err = cc.raft.Shutdown()
	if err != nil {
		errMsgs += err.Error()
	}

	if errMsgs != "" {
		errMsgs += "Consensus shutdown unsuccessful"
		logger.Error(errMsgs)
		return errors.New(errMsgs)
	}
	cc.shutdown = true
	return nil
}

// SetClient makes the component ready to perform RPC requests
func (cc *Consensus) SetClient(c *rpc.Client) {
	cc.rpcClient = c
	cc.baseOp.rpcClient = c
	cc.rpcReady <- struct{}{}
}

// Ready returns a channel which is signaled when the Consensus
// algorithm has finished bootstrapping and is ready to use
func (cc *Consensus) Ready() <-chan struct{} {
	return cc.readyCh
}

func (cc *Consensus) op(argi interface{}, t LogOpType) *LogOp {
	switch argi.(type) {
	case api.Pin:
		return &LogOp{
			Cid:  argi.(api.Pin).ToSerial(),
			Type: t,
		}
	case ma.Multiaddr:
		return &LogOp{
			Peer: api.MultiaddrToSerial(argi.(ma.Multiaddr)),
			Type: t,
		}
	default:
		panic("bad type")
	}
}

// returns true if the operation was redirected to the leader
func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, error) {
	leader, err := cc.Leader()
	if err != nil {
		rctx, cancel := context.WithTimeout(cc.ctx, LeaderTimeout)
		defer cancel()
		err := cc.raft.WaitForLeader(rctx)
		if err != nil {
			return false, err
		}
	}
	if leader == cc.host.ID() {
		return false, nil
	}

	err = cc.rpcClient.Call(
		leader,
		"Cluster",
		method,
		arg,
		&struct{}{})
	return true, err
}

func (cc *Consensus) logOpCid(rpcOp string, opType LogOpType, pin api.Pin) error {
	var finalErr error
	for i := 0; i < CommitRetries; i++ {
		logger.Debugf("Try %d", i)
		redirected, err := cc.redirectToLeader(
			rpcOp, pin.ToSerial())
		if err != nil {
			finalErr = err
			continue
		}

		if redirected {
			return nil
		}

		// It seems WE are the leader.

		op := cc.op(pin, opType)
		_, err = cc.consensus.CommitOp(op)
		if err != nil {
			// This means the op did not make it to the log
			finalErr = err
			time.Sleep(200 * time.Millisecond)
			continue
		}
		finalErr = nil
		break
	}
	if finalErr != nil {
		return finalErr
	}

	switch opType {
	case LogOpPin:
		logger.Infof("pin committed to global state: %s", pin.Cid)
	case LogOpUnpin:
		logger.Infof("unpin committed to global state: %s", pin.Cid)
	}
	return nil
}

// LogPin submits a Cid to the shared state of the cluster. It will forward
// the operation to the leader if this is not it.
func (cc *Consensus) LogPin(c api.Pin) error {
	return cc.logOpCid("ConsensusLogPin", LogOpPin, c)
}

// LogUnpin removes a Cid from the shared state of the cluster.
func (cc *Consensus) LogUnpin(c api.Pin) error {
	return cc.logOpCid("ConsensusLogUnpin", LogOpUnpin, c)
}

// LogAddPeer submits a new peer to the shared state of the cluster. It will
// forward the operation to the leader if this is not it.
func (cc *Consensus) LogAddPeer(addr ma.Multiaddr) error {
	var finalErr error
	for i := 0; i < CommitRetries; i++ {
		logger.Debugf("Try %d", i)
		redirected, err := cc.redirectToLeader(
			"ConsensusLogAddPeer", api.MultiaddrToSerial(addr))
		if err != nil {
			finalErr = err
			continue
		}

		if redirected {
			return nil
		}

		// It seems WE are the leader.
		pidStr, err := addr.ValueForProtocol(ma.P_IPFS)
		if err != nil {
			return err
		}
		pid, err := peer.IDB58Decode(pidStr)
		if err != nil {
			return err
		}

		// Create the peer-add operation for the log
		op := cc.op(addr, LogOpAddPeer)
		_, err = cc.consensus.CommitOp(op)
		if err != nil {
			// This means the op did not make it to the log
			finalErr = err
			time.Sleep(200 * time.Millisecond)
			continue
		}
		err = cc.raft.AddPeer(peer.IDB58Encode(pid))
		if err != nil {
			finalErr = err
			continue
		}
		finalErr = nil
		break
	}
	if finalErr != nil {
		return finalErr
	}
	logger.Infof("peer committed to global state: %s", addr)
	return nil
}

// LogRmPeer removes a peer from the shared state of the cluster. It will
// forward the operation to the leader if this is not it.
func (cc *Consensus) LogRmPeer(pid peer.ID) error {
	var finalErr error
	for i := 0; i < CommitRetries; i++ {
		logger.Debugf("Try %d", i)
		redirected, err := cc.redirectToLeader("ConsensusLogRmPeer", pid)
		if err != nil {
			finalErr = err
			continue
		}

		if redirected {
			return nil
		}

		// It seems WE are the leader.

		// Create the peer-removal operation for the log
		addr, err := ma.NewMultiaddr("/ipfs/" + peer.IDB58Encode(pid))
		if err != nil {
			return err
		}
		op := cc.op(addr, LogOpRmPeer)
		_, err = cc.consensus.CommitOp(op)
		if err != nil {
			// This means the op did not make it to the log
			finalErr = err
			continue
		}
		err = cc.raft.RemovePeer(peer.IDB58Encode(pid))
		if err != nil {
			finalErr = err
			time.Sleep(200 * time.Millisecond)
			continue
		}
		finalErr = nil
		break
	}
	if finalErr != nil {
		return finalErr
	}
	logger.Infof("peer removed from global state: %s", pid)
	return nil
}

// State retrieves the current consensus State. It may error
// if no State has been agreed upon or the state is not
// consistent. The returned State is the last agreed-upon
// State known by this node.
func (cc *Consensus) State() (state.State, error) {
	st, err := cc.consensus.GetLogHead()
	if err != nil {
		return nil, err
	}
	state, ok := st.(state.State)
	if !ok {
		return nil, errors.New("wrong state type")
	}
	return state, nil
}

// Leader returns the peerID of the Leader of the
// cluster. It returns an error when there is no leader.
func (cc *Consensus) Leader() (peer.ID, error) {
	raftactor := cc.actor.(*libp2praft.Actor)
	return raftactor.Leader()
}

// Rollback replaces the current agreed-upon
// state with the state provided. Only the consensus leader
// can perform this operation.
func (cc *Consensus) Rollback(state state.State) error {
	return cc.consensus.Rollback(state)
}