diff --git a/cluster.go b/cluster.go index 68bdb05a..a140e664 100644 --- a/cluster.go +++ b/cluster.go @@ -100,7 +100,7 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P cluster.rpcClient = rpcClient // Setup Consensus - consensus, err := NewConsensus(cfg, host, state) + consensus, err := NewConsensus(pm.peers(), host, cfg.ConsensusDataFolder, state) if err != nil { logger.Errorf("error creating consensus: %s", err) cluster.Shutdown() @@ -439,6 +439,7 @@ func (c *Cluster) Recover(h *cid.Cid) (GlobalPinInfo, error) { func (c *Cluster) Pins() []*cid.Cid { cState, err := c.consensus.State() if err != nil { + logger.Error(err) return []*cid.Cid{} } return cState.ListPins() diff --git a/config.go b/config.go index 006b34cd..af7c1cd3 100644 --- a/config.go +++ b/config.go @@ -6,9 +6,7 @@ import ( "fmt" "io/ioutil" "sync" - "time" - hashiraft "github.com/hashicorp/raft" crypto "github.com/libp2p/go-libp2p-crypto" peer "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" @@ -55,9 +53,6 @@ type Config struct { // the Consensus component. ConsensusDataFolder string - // Hashicorp's Raft configuration - RaftConfig *hashiraft.Config - // if a config has been loaded from disk, track the path // so it can be saved to the same place. path string @@ -95,18 +90,6 @@ type JSONConfig struct { // Storage folder for snapshots, log store etc. Used by // the Consensus component. ConsensusDataFolder string `json:"consensus_data_folder"` - - // Raft configuration - RaftConfig *RaftConfig `json:"raft_config"` -} - -// RaftConfig is a configuration section which affects the behaviour of -// the Raft component. See https://godoc.org/github.com/hashicorp/raft#Config -// for more information. Only the options below are customizable, the rest will -// take the default values from raft.DefaultConfig(). 
-type RaftConfig struct { - SnapshotIntervalSeconds int `json:"snapshot_interval_seconds"` - EnableSingleNode bool `json:"enable_single_node"` } // ToJSONConfig converts a Config object to its JSON representation which @@ -140,10 +123,6 @@ func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) { IPFSProxyListenMultiaddress: cfg.IPFSProxyAddr.String(), IPFSNodeMultiaddress: cfg.IPFSNodeAddr.String(), ConsensusDataFolder: cfg.ConsensusDataFolder, - RaftConfig: &RaftConfig{ - SnapshotIntervalSeconds: int(cfg.RaftConfig.SnapshotInterval / time.Second), - EnableSingleNode: cfg.RaftConfig.EnableSingleNode, - }, } return } @@ -201,14 +180,6 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) { return } - raftCfg := hashiraft.DefaultConfig() - raftCfg.DisableBootstrapAfterElect = false - raftCfg.ShutdownOnRemove = false - if jcfg.RaftConfig != nil { - raftCfg.SnapshotInterval = time.Duration(jcfg.RaftConfig.SnapshotIntervalSeconds) * time.Second - raftCfg.EnableSingleNode = jcfg.RaftConfig.EnableSingleNode - } - c = &Config{ ID: id, PrivateKey: pKey, @@ -217,7 +188,6 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) { APIAddr: apiAddr, IPFSProxyAddr: ipfsProxyAddr, IPFSNodeAddr: ipfsNodeAddr, - RaftConfig: raftCfg, ConsensusDataFolder: jcfg.ConsensusDataFolder, } return @@ -275,11 +245,6 @@ func NewDefaultConfig() (*Config, error) { return nil, err } - raftCfg := hashiraft.DefaultConfig() - raftCfg.DisableBootstrapAfterElect = false - raftCfg.EnableSingleNode = true - raftCfg.ShutdownOnRemove = false - clusterAddr, _ := ma.NewMultiaddr(DefaultClusterAddr) apiAddr, _ := ma.NewMultiaddr(DefaultAPIAddr) ipfsProxyAddr, _ := ma.NewMultiaddr(DefaultIPFSProxyAddr) @@ -294,7 +259,6 @@ func NewDefaultConfig() (*Config, error) { IPFSProxyAddr: ipfsProxyAddr, IPFSNodeAddr: ipfsNodeAddr, ConsensusDataFolder: "ipfscluster-data", - RaftConfig: raftCfg, }, nil } diff --git a/config_test.go b/config_test.go index eec83cd9..8ee7729c 100644 --- a/config_test.go +++ b/config_test.go @@ -11,10 +11,6 @@ func testingConfig() *Config { APIListenMultiaddress: "/ip4/127.0.0.1/tcp/10002", IPFSProxyListenMultiaddress: "/ip4/127.0.0.1/tcp/10001", ConsensusDataFolder: "./raftFolderFromTests", - RaftConfig: &RaftConfig{ - EnableSingleNode: true, - SnapshotIntervalSeconds: 120, - }, } cfg, _ := jcfg.ToConfig() diff --git a/consensus.go b/consensus.go index 979bfaa0..9415997c 100644 --- a/consensus.go +++ b/consensus.go @@ -3,7 +3,6 @@ package ipfscluster import ( "context" "errors" - "strings" "sync" "time" @@ -15,11 +14,6 @@ import ( libp2praft "github.com/libp2p/go-libp2p-raft" ) -const ( - maxSnapshots = 5 - raftSingleMode = true -) - // Type of pin operation const ( LogOpPin = iota + 1 @@ -101,13 +95,12 @@ ROLLBACK: type Consensus struct { ctx context.Context - cfg *Config host host.Host consensus consensus.OpLogConsensus actor consensus.Actor baseOp *clusterLogOp - p2pRaft *libp2pRaftWrap + raft *Raft rpcClient *rpc.Client rpcReady chan struct{} @@ -122,33 +115,33 @@ type Consensus struct { // NewConsensus builds a new ClusterConsensus component. The state // is used to initialize the Consensus system, so any information in it // is discarded. 
-func NewConsensus(cfg *Config, host host.Host, state State) (*Consensus, error) { +func NewConsensus(clusterPeers []peer.ID, host host.Host, dataFolder string, state State) (*Consensus, error) { ctx := context.Background() op := &clusterLogOp{ ctx: context.Background(), } + logger.Infof("starting Consensus and waiting for a leader...") + consensus := libp2praft.NewOpLog(state, op) + raft, err := NewRaft(clusterPeers, host, dataFolder, consensus.FSM()) + if err != nil { + return nil, err + } + actor := libp2praft.NewActor(raft.raft) + consensus.SetActor(actor) + cc := &Consensus{ ctx: ctx, - cfg: cfg, host: host, + consensus: consensus, + actor: actor, baseOp: op, + raft: raft, shutdownCh: make(chan struct{}, 1), rpcReady: make(chan struct{}, 1), readyCh: make(chan struct{}, 1), } - logger.Infof("starting Consensus and waiting leader...") - con, actor, wrapper, err := makeLibp2pRaft(cc.cfg, - cc.host, state, cc.baseOp) - if err != nil { - return nil, err - } - con.SetActor(actor) - cc.actor = actor - cc.consensus = con - cc.p2pRaft = wrapper - cc.run() return cc, nil } @@ -162,108 +155,67 @@ func (cc *Consensus) run() { cc.ctx = ctx cc.baseOp.ctx = ctx - leader, err := cc.waitForLeader() - if err != nil { - return - } - logger.Infof("Consensus leader found (%s). Syncing state...", leader.Pretty()) - - cc.waitForUpdates() - logger.Info("Consensus state is up to date") - - // While rpc is not ready we cannot perform a sync - <-cc.rpcReady - - var pInfo []PinInfo - - _, err = cc.State() - // only check sync if we have a state - // avoid error on new running clusters - if err != nil { - logger.Debug("skipping state sync: ", err) - } else { - cc.rpcClient.Go( - "", - "Cluster", - "StateSync", - struct{}{}, - &pInfo, - nil) - } - cc.readyCh <- struct{}{} - logger.Debug("consensus ready") + go func() { + cc.finishBootstrap() + }() <-cc.shutdownCh }() } -// waits until there is a raft leader -func (cc *Consensus) waitForLeader() (peer.ID, error) { - // Wait for a leader - leader := peer.ID("") - var err error - rounds := 0 - for { - select { - case <-cc.ctx.Done(): - return "", errors.New("shutdown") - default: - if rounds%20 == 0 { //every 10 secs - logger.Info("Consensus is waiting for a leader...") - } - rounds++ - time.Sleep(500 * time.Millisecond) - leader, err = cc.Leader() - if err == nil && leader != "" { - return leader, nil - } - } - } - return leader, nil -} +// waits until there is a consensus leader and syncs the state +// to the tracker +func (cc *Consensus) finishBootstrap() { + cc.raft.WaitForLeader(cc.ctx) + cc.raft.WaitForUpdates(cc.ctx) + logger.Info("Consensus state is up to date") -// waits until the appliedIndex is the same as the lastIndex -func (cc *Consensus) waitForUpdates() { - // Wait for state catch up - logger.Debug("consensus state is catching up") - time.Sleep(time.Second) - for { - select { - case <-cc.ctx.Done(): - return - default: - lai := cc.p2pRaft.raft.AppliedIndex() - li := cc.p2pRaft.raft.LastIndex() - logger.Debugf("current Raft index: %d/%d", - lai, li) - if lai == li { - return - } - time.Sleep(500 * time.Millisecond) - } - - } -} - -// raft stores peer add/rm operations. This is how to force a peer set. 
-func (cc *Consensus) setPeers() { - logger.Debug("forcefully setting Raft peers to known set") - var peersStr []string - var peers []peer.ID - err := cc.rpcClient.Call("", - "Cluster", - "PeerManagerPeers", - struct{}{}, - &peers) - if err != nil { - logger.Error(err) + // While rpc is not ready we cannot perform a sync + select { + case <-cc.ctx.Done(): return + case <-cc.rpcReady: } - for _, p := range peers { - peersStr = append(peersStr, p.Pretty()) + + var pInfo []PinInfo + + _, err := cc.State() + // only check sync if we have a state + // avoid error on new running clusters + if err != nil { + logger.Debug("skipping state sync: ", err) + } else { + cc.rpcClient.Go( + "", + "Cluster", + "StateSync", + struct{}{}, + &pInfo, + nil) } - cc.p2pRaft.raft.SetPeers(peersStr) + cc.readyCh <- struct{}{} + logger.Debug("consensus ready") // not accurate if we are shutting down } +// // raft stores peer add/rm operations. This is how to force a peer set. +// func (cc *Consensus) setPeers() { +// logger.Debug("forcefully setting Raft peers to known set") +// var peersStr []string +// var peers []peer.ID +// err := cc.rpcClient.Call("", +// "Cluster", +// "PeerManagerPeers", +// struct{}{}, +// &peers) +// if err != nil { +// logger.Error(err) +// return +// } +// for _, p := range peers { +// peersStr = append(peersStr, p.Pretty()) +// } +// cc.p2pRaft.raft.SetPeers(peersStr) +// } + // Shutdown stops the component so it will not process any // more updates. The underlying consensus is permanently // shutdown, along with the libp2p transport. @@ -283,22 +235,13 @@ func (cc *Consensus) Shutdown() error { // Raft shutdown errMsgs := "" - - f := cc.p2pRaft.raft.Snapshot() - err := f.Error() - if err != nil && !strings.Contains(err.Error(), "Nothing new to snapshot") { - errMsgs += "could not take snapshot: " + err.Error() + ".\n" - } - - f = cc.p2pRaft.raft.Shutdown() - err = f.Error() + err := cc.raft.Snapshot() if err != nil { - errMsgs += "could not shutdown raft: " + err.Error() + ".\n" + errMsgs += err.Error() } - - err = cc.p2pRaft.boltdb.Close() // important! + err = cc.raft.Shutdown() if err != nil { - errMsgs += "could not close boltdb: " + err.Error() + ".\n" + errMsgs += err.Error() } if errMsgs != "" { @@ -396,22 +339,18 @@ func (cc *Consensus) AddPeer(p peer.ID) error { //if err != nil || redirected { // return err // } - // We are the leader - future := cc.p2pRaft.raft.AddPeer(peer.IDB58Encode(p)) - err := future.Error() - return err + + return cc.raft.AddPeer(peer.IDB58Encode(p)) } // RemovePeer attempts to remove a peer from the consensus. func (cc *Consensus) RemovePeer(p peer.ID) error { - //redirected, err := cc.redirectToLeader("ConsensusRmPeer", p) + //redirected, err := cc.redirectToLeader("ConsensusRemovePeer", p) //if err != nil || redirected { // return err //} - future := cc.p2pRaft.raft.RemovePeer(peer.IDB58Encode(p)) - err := future.Error() - return err + return cc.raft.RemovePeer(peer.IDB58Encode(p)) } // State retrieves the current consensus State. It may error @@ -433,7 +372,6 @@ func (cc *Consensus) State() (State, error) { // Leader returns the peerID of the Leader of the // cluster. It returns an error when there is no leader. 
 func (cc *Consensus) Leader() (peer.ID, error) {
-	// FIXME: Hashicorp Raft specific
 	raftactor := cc.actor.(*libp2praft.Actor)
 	return raftactor.Leader()
 }
diff --git a/consensus_test.go b/consensus_test.go
index 664c3eb9..362abca0 100644
--- a/consensus_test.go
+++ b/consensus_test.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	cid "github.com/ipfs/go-cid"
+	peer "github.com/libp2p/go-libp2p-peer"
 )
 
 func TestApplyToPin(t *testing.T) {
@@ -92,7 +93,7 @@ func testingConsensus(t *testing.T) *Consensus {
 		t.Fatal("cannot create host:", err)
 	}
 	st := NewMapState()
-	cc, err := NewConsensus(cfg, h, st)
+	cc, err := NewConsensus([]peer.ID{cfg.ID}, h, cfg.ConsensusDataFolder, st)
 	if err != nil {
 		t.Fatal("cannot create Consensus:", err)
 	}
diff --git a/logging.go b/logging.go
index 063f04f9..fa8cd5c9 100644
--- a/logging.go
+++ b/logging.go
@@ -25,6 +25,7 @@ func SetLogLevel(l string) {
 	DEBUG
 	*/
 	logging.SetLogLevel("cluster", l)
+	//logging.SetLogLevel("raft", l)
 	//logging.SetLogLevel("p2p-gorpc", l)
 	//logging.SetLogLevel("swarm2", l)
 	//logging.SetLogLevel("libp2p-raft", l)
diff --git a/peer_manager_test.go b/peer_manager_test.go
index 5bef4e27..c737e46b 100644
--- a/peer_manager_test.go
+++ b/peer_manager_test.go
@@ -53,7 +53,10 @@ func TestClustersPeerAdd(t *testing.T) {
 	}
 
 	h, _ := cid.Decode(testCid)
-	clusters[1].Pin(h)
+	err := clusters[1].Pin(h)
+	if err != nil {
+		t.Fatal(err)
+	}
 	delay()
 
 	f := func(t *testing.T, c *Cluster) {
@@ -67,6 +70,7 @@ func TestClustersPeerAdd(t *testing.T) {
 		// Check that they are part of the consensus
 		pins := c.Pins()
 		if len(pins) != 1 {
+			t.Log(pins)
 			t.Error("expected 1 pin everywhere")
 		}
 
diff --git a/raft.go b/raft.go
index 327085bc..1d62ee59 100644
--- a/raft.go
+++ b/raft.go
@@ -1,8 +1,12 @@
 package ipfscluster
 
 import (
+	"context"
+	"errors"
 	"io/ioutil"
 	"path/filepath"
+	"strings"
+	"time"
 
 	hashiraft "github.com/hashicorp/raft"
 	raftboltdb "github.com/hashicorp/raft-boltdb"
@@ -11,9 +15,21 @@ import (
 	libp2praft "github.com/libp2p/go-libp2p-raft"
 )
 
-// libp2pRaftWrap wraps the stuff that we need to run
-// hashicorp raft. We carry it around for convenience
-type libp2pRaftWrap struct {
+// DefaultRaftConfig allows tweaking the Raft configuration used by Cluster
+// from the outside.
+var DefaultRaftConfig = hashiraft.DefaultConfig()
+
+// RaftMaxSnapshots indicates how many snapshots to keep in the consensus data
+// folder.
+var RaftMaxSnapshots = 5
+
+// is this running on a 64-bit arch? https://groups.google.com/forum/#!topic/golang-nuts/vAckmhUMAdQ
+const sixtyfour = uint64(^uint(0)) == ^uint64(0)
+
+// Raft performs all Raft-specific operations which are needed by Cluster but
+// are not fulfilled by the consensus interface. It should contain most of the
+// Raft-related stuff so it can be easily replaced in the future, if need be.
+type Raft struct {
 	raft          *hashiraft.Raft
 	transport     *libp2praft.Libp2pTransport
 	snapshotStore hashiraft.SnapshotStore
@@ -23,63 +39,62 @@ type libp2pRaftWrap struct {
 	boltdb *raftboltdb.BoltStore
 }
 
-// This function does all heavy the work which is specifically related to
-// hashicorp's Raft. Other places should just rely on the Consensus interface.
-func makeLibp2pRaft(cfg *Config, host host.Host, state State, op *clusterLogOp) (*libp2praft.Consensus, *libp2praft.Actor, *libp2pRaftWrap, error) {
+func defaultRaftConfig() *hashiraft.Config {
+	// These options are imposed over whatever is set in DefaultRaftConfig.
+	// Changing them causes the cluster peers to behave in
+	// difficult-to-understand ways, usually around the add/remove of peers.
+	// In practice, changing them will only make users wonder why something
+	// does not work the way it is expected to.
+	// e.g. ShutdownOnRemove would prevent a snapshot from being taken
+	// when shutting down a peer after removing it from the cluster.
+	DefaultRaftConfig.DisableBootstrapAfterElect = false
+	DefaultRaftConfig.EnableSingleNode = true
+	DefaultRaftConfig.ShutdownOnRemove = false
+
+	// Set up logging
+	DefaultRaftConfig.LogOutput = ioutil.Discard
+	DefaultRaftConfig.Logger = raftStdLogger // see logging.go
+	return DefaultRaftConfig
+}
+
+// NewRaft launches a go-libp2p-raft consensus peer.
+func NewRaft(peers []peer.ID, host host.Host, dataFolder string, fsm hashiraft.FSM) (*Raft, error) {
 	logger.Debug("creating libp2p Raft transport")
 	transport, err := libp2praft.NewLibp2pTransportWithHost(host)
 	if err != nil {
 		logger.Error("creating libp2p-raft transport: ", err)
-		return nil, nil, nil, err
+		return nil, err
 	}
 
-	//logger.Debug("opening connections")
-	//transport.OpenConns()
-
 	pstore := &libp2praft.Peerstore{}
-	strPeers := []string{peer.IDB58Encode(host.ID())}
-	for _, addr := range cfg.ClusterPeers {
-		p, _, err := multiaddrSplit(addr)
-		if err != nil {
-			return nil, nil, nil, err
-		}
-		strPeers = append(strPeers, p.Pretty())
+	peersStr := make([]string, len(peers), len(peers))
+	for i, p := range peers {
+		peersStr[i] = peer.IDB58Encode(p)
 	}
-	pstore.SetPeers(strPeers)
-
-	logger.Debug("creating OpLog")
-	cons := libp2praft.NewOpLog(state, op)
-
-	raftCfg := cfg.RaftConfig
-	if raftCfg == nil {
-		raftCfg = hashiraft.DefaultConfig()
-		raftCfg.EnableSingleNode = raftSingleMode
-	}
-	raftCfg.LogOutput = ioutil.Discard
-	raftCfg.Logger = raftStdLogger
+	pstore.SetPeers(peersStr)
 
 	logger.Debug("creating file snapshot store")
-	snapshots, err := hashiraft.NewFileSnapshotStoreWithLogger(cfg.ConsensusDataFolder, maxSnapshots, raftStdLogger)
+	snapshots, err := hashiraft.NewFileSnapshotStoreWithLogger(dataFolder, RaftMaxSnapshots, raftStdLogger)
 	if err != nil {
 		logger.Error("creating file snapshot store: ", err)
-		return nil, nil, nil, err
+		return nil, err
 	}
 
 	logger.Debug("creating BoltDB log store")
-	logStore, err := raftboltdb.NewBoltStore(filepath.Join(cfg.ConsensusDataFolder, "raft.db"))
+	logStore, err := raftboltdb.NewBoltStore(filepath.Join(dataFolder, "raft.db"))
 	if err != nil {
 		logger.Error("creating bolt store: ", err)
-		return nil, nil, nil, err
+		return nil, err
 	}
 
 	logger.Debug("creating Raft")
-	r, err := hashiraft.NewRaft(raftCfg, cons.FSM(), logStore, logStore, snapshots, pstore, transport)
+	r, err := hashiraft.NewRaft(defaultRaftConfig(), fsm, logStore, logStore, snapshots, pstore, transport)
 	if err != nil {
 		logger.Error("initializing raft: ", err)
-		return nil, nil, nil, err
+		return nil, err
 	}
 
-	return cons, libp2praft.NewActor(r), &libp2pRaftWrap{
+	return &Raft{
 		raft:          r,
 		transport:     transport,
 		snapshotStore: snapshots,
@@ -89,3 +104,123 @@ func makeLibp2pRaft(cfg *Config, host host.Host, state State, op *clusterLogOp)
 		boltdb: logStore,
 	}, nil
 }
+
+// WaitForLeader blocks until Raft says we have a leader.
+func (r *Raft) WaitForLeader(ctx context.Context) {
+	// Using Raft observers panics on non-64-bit architectures.
+	// This is a workaround.
+	if sixtyfour {
+		r.waitForLeader(ctx)
+	} else {
+		r.waitForLeaderLegacy(ctx)
+	}
+}
+
+func (r *Raft) waitForLeader(ctx context.Context) {
+	obsCh := make(chan hashiraft.Observation)
+	filter := func(o *hashiraft.Observation) bool {
+		switch o.Data.(type) {
+		case hashiraft.LeaderObservation:
+			return true
+		default:
+			return false
+		}
+	}
+	observer := hashiraft.NewObserver(obsCh, true, filter)
+	r.raft.RegisterObserver(observer)
+	defer r.raft.DeregisterObserver(observer)
+	select {
+	case obs := <-obsCh:
+		leaderObs := obs.Data.(hashiraft.LeaderObservation)
+		logger.Infof("Raft Leader elected: %s", leaderObs.Leader)
+
+	case <-ctx.Done():
+		return
+	}
+}
+
+func (r *Raft) waitForLeaderLegacy(ctx context.Context) {
+	for {
+		leader := r.raft.Leader()
+		if leader != "" {
+			logger.Infof("Raft Leader elected: %s", leader)
+			return
+		}
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			time.Sleep(500 * time.Millisecond)
+		}
+	}
+}
+
+// WaitForUpdates blocks until Raft has synced to the last index in the log.
+func (r *Raft) WaitForUpdates(ctx context.Context) {
+	logger.Debug("Raft state is catching up")
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			lai := r.raft.AppliedIndex()
+			li := r.raft.LastIndex()
+			logger.Debugf("current Raft index: %d/%d",
+				lai, li)
+			if lai == li {
+				return
+			}
+			time.Sleep(500 * time.Millisecond)
+		}
+
+	}
+}
+
+// Snapshot tells Raft to take a snapshot.
+func (r *Raft) Snapshot() error {
+	future := r.raft.Snapshot()
+	err := future.Error()
+	if err != nil && !strings.Contains(err.Error(), "Nothing new to snapshot") {
+		return errors.New("could not take snapshot: " + err.Error())
+	}
+	return nil
+}
+
+// Shutdown shuts down Raft and closes the BoltDB.
+func (r *Raft) Shutdown() error {
+	future := r.raft.Shutdown()
+	err := future.Error()
+	errMsgs := ""
+	if err != nil {
+		errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
+	}
+
+	err = r.boltdb.Close() // important!
+	if err != nil {
+		errMsgs += "could not close boltdb: " + err.Error()
+	}
+	if errMsgs != "" {
+		return errors.New(errMsgs)
+	}
+	return nil
+}
+
+// AddPeer adds a peer to Raft.
+func (r *Raft) AddPeer(peer string) error {
+	future := r.raft.AddPeer(peer)
+	err := future.Error()
+	return err
+}
+
+// RemovePeer removes a peer from Raft.
+func (r *Raft) RemovePeer(peer string) error {
+	future := r.raft.RemovePeer(peer)
+	err := future.Error()
+	return err
+}
+
+// Leader returns Raft's leader. It may be an empty string if
+// there is no leader or it is unknown.
+func (r *Raft) Leader() string {
+	return r.raft.Leader()
+}
diff --git a/rpc_api.go b/rpc_api.go
index 548722f3..0332796e 100644
--- a/rpc_api.go
+++ b/rpc_api.go
@@ -297,7 +297,6 @@ func (api *RPCAPI) ConsensusLogUnpin(in *CidArg, out *struct{}) error {
 
 /*
    Peer Manager methods
-
 */
 
 // PeerManagerAddPeer runs peerManager.addPeer().
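
Reviewer note, not part of the patch: the sketch below is a minimal illustration of how the pieces introduced above are meant to fit together — tweak the exported Raft knobs, then build a Consensus with the new signature, as consensus_test.go does. The helper name exampleConsensus and the concrete values are assumptions for illustration only; the calls themselves come from the diff.

// exampleConsensus is a hypothetical package-internal helper showing the new
// wiring. It assumes an already-constructed libp2p host.
func exampleConsensus(h host.Host) (*Consensus, error) {
	// Exported knobs read by NewRaft via defaultRaftConfig(). Note that
	// DisableBootstrapAfterElect, EnableSingleNode and ShutdownOnRemove
	// are still forced on top of whatever is set here.
	RaftMaxSnapshots = 3
	DefaultRaftConfig.SnapshotInterval = 120 * time.Second

	// NewConsensus now takes the peer set, host and data folder directly
	// instead of a *Config. The state is only used for initialization.
	state := NewMapState()
	return NewConsensus(
		[]peer.ID{h.ID()},  // clusterPeers: just ourselves (single node)
		h,                  // libp2p host carrying the Raft transport
		"ipfscluster-data", // dataFolder for snapshots and raft.db
		state,
	)
}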