Issue #18: Move Consensus and PeerMonitor to its own submodules

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
Hector Sanjuan 2017-03-10 17:24:25 +01:00
parent e99b7b4f79
commit c2faf48177
17 changed files with 159 additions and 107 deletions

View File

@ -10,6 +10,8 @@ import (
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/state"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
@ -34,10 +36,10 @@ type Cluster struct {
rpcClient *rpc.Client
peerManager *peerManager
consensus *Consensus
consensus *raft.Consensus
api API
ipfs IPFSConnector
state State
state state.State
tracker PinTracker
monitor PeerMonitor
allocator PinAllocator
@ -62,7 +64,7 @@ func NewCluster(
cfg *Config,
api API,
ipfs IPFSConnector,
state State,
st state.State,
tracker PinTracker,
monitor PeerMonitor,
allocator PinAllocator,
@ -88,7 +90,7 @@ func NewCluster(
host: host,
api: api,
ipfs: ipfs,
state: state,
state: st,
tracker: tracker,
monitor: monitor,
allocator: allocator,
@ -155,7 +157,7 @@ func (c *Cluster) setupConsensus() error {
startPeers = peersFromMultiaddrs(c.config.Bootstrap)
}
consensus, err := NewConsensus(
consensus, err := raft.NewConsensus(
append(startPeers, c.id),
c.host,
c.config.ConsensusDataFolder,

View File

@ -2,11 +2,13 @@ package ipfscluster
import (
"errors"
"os"
"testing"
"github.com/ipfs/ipfs-cluster/allocator/numpinalloc"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/monitor/basic"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/test"
@ -80,7 +82,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate
cfg := testingConfig()
st := mapstate.NewMapState()
tracker := NewMapPinTracker(cfg)
mon := NewStdPeerMonitor(cfg)
mon := basic.NewStdPeerMonitor(2)
alloc := numpinalloc.NewAllocator()
inf := numpin.NewInformer()
@ -100,6 +102,10 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate
return cl, api, ipfs, st, tracker
}
func cleanRaft() {
os.RemoveAll(".raftFolderFromTests")
}
func testClusterShutdown(t *testing.T) {
cl, _, _, _, _ := testingCluster(t)
err := cl.Shutdown()

View File

@ -1,4 +1,4 @@
package ipfscluster
package raft
import (
"context"
@ -7,8 +7,10 @@ import (
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
logging "github.com/ipfs/go-log"
consensus "github.com/libp2p/go-libp2p-consensus"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
@ -16,6 +18,8 @@ import (
ma "github.com/multiformats/go-multiaddr"
)
var logger = logging.Logger("consensus")
// LeaderTimeout specifies how long to wait before failing an operation
// because there is no leader
var LeaderTimeout = 15 * time.Second
@ -49,7 +53,7 @@ type Consensus struct {
// NewConsensus builds a new ClusterConsensus component. The state
// is used to initialize the Consensus system, so any information in it
// is discarded.
func NewConsensus(clusterPeers []peer.ID, host host.Host, dataFolder string, state State) (*Consensus, error) {
func NewConsensus(clusterPeers []peer.ID, host host.Host, dataFolder string, state state.State) (*Consensus, error) {
op := &LogOp{
ctx: context.Background(),
}
@ -296,7 +300,11 @@ func (cc *Consensus) LogAddPeer(addr ma.Multiaddr) error {
}
// It seems WE are the leader.
pid, _, err := multiaddrSplit(addr)
pidStr, err := addr.ValueForProtocol(ma.P_IPFS)
if err != nil {
return err
}
pid, err := peer.IDB58Decode(pidStr)
if err != nil {
return err
}
@ -375,12 +383,12 @@ func (cc *Consensus) LogRmPeer(pid peer.ID) error {
// if no State has been agreed upon or the state is not
// consistent. The returned State is the last agreed-upon
// State known by this node.
func (cc *Consensus) State() (State, error) {
func (cc *Consensus) State() (state.State, error) {
st, err := cc.consensus.GetLogHead()
if err != nil {
return nil, err
}
state, ok := st.(State)
state, ok := st.(state.State)
if !ok {
return nil, errors.New("wrong state type")
}
@ -397,6 +405,6 @@ func (cc *Consensus) Leader() (peer.ID, error) {
// Rollback replaces the current agreed-upon
// state with the state provided. Only the consensus leader
// can perform this operation.
func (cc *Consensus) Rollback(state State) error {
func (cc *Consensus) Rollback(state state.State) error {
return cc.consensus.Rollback(state)
}

View File

@ -1,4 +1,4 @@
package ipfscluster
package raft
import (
"context"
@ -11,23 +11,37 @@ import (
"github.com/ipfs/ipfs-cluster/test"
cid "github.com/ipfs/go-cid"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
ma "github.com/multiformats/go-multiaddr"
)
func cleanRaft() {
os.RemoveAll(testingConfig().ConsensusDataFolder)
os.RemoveAll(".raftFolderFromTests")
}
func makeTestingHost(t *testing.T) host.Host {
priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048)
pid, _ := peer.IDFromPublicKey(pub)
maddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/10000")
ps := peerstore.NewPeerstore()
ps.AddPubKey(pid, pub)
ps.AddPrivKey(pid, priv)
n, _ := swarm.NewNetwork(
context.Background(),
[]ma.Multiaddr{maddr},
pid, ps, nil)
return basichost.New(n)
}
func testingConsensus(t *testing.T) *Consensus {
//logging.SetDebugLogging()
cfg := testingConfig()
ctx := context.Background()
h, err := makeHost(ctx, cfg)
if err != nil {
t.Fatal("cannot create host:", err)
}
h := makeTestingHost(t)
st := mapstate.NewMapState()
cc, err := NewConsensus([]peer.ID{cfg.ID}, h, cfg.ConsensusDataFolder, st)
cc, err := NewConsensus([]peer.ID{h.ID()}, h, ".raftFolderFromTests", st)
if err != nil {
t.Fatal("cannot create Consensus:", err)
}
@ -90,8 +104,7 @@ func TestConsensusUnpin(t *testing.T) {
func TestConsensusLeader(t *testing.T) {
cc := testingConsensus(t)
cfg := testingConfig()
pID := cfg.ID
pID := cc.host.ID()
defer cleanRaft()
defer cc.Shutdown()
l, err := cc.Leader()

View File

@ -1,10 +1,11 @@
package ipfscluster
package raft
import (
"context"
"errors"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
consensus "github.com/libp2p/go-libp2p-consensus"
@ -36,7 +37,7 @@ type LogOp struct {
// ApplyTo applies the operation to the State
func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
state, ok := cstate.(State)
state, ok := cstate.(state.State)
var err error
if !ok {
// Should never be here

View File

@ -1,4 +1,4 @@
package ipfscluster
package raft
import (
"context"

View File

@ -1,15 +1,17 @@
package ipfscluster
package raft
import (
"context"
"errors"
"io/ioutil"
"log"
"path/filepath"
"strings"
"time"
hashiraft "github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
libp2praft "github.com/libp2p/go-libp2p-raft"
@ -26,6 +28,31 @@ var RaftMaxSnapshots = 5
// is this running 64 bits arch? https://groups.google.com/forum/#!topic/golang-nuts/vAckmhUMAdQ
const sixtyfour = uint64(^uint(0)) == ^uint64(0)
type logForwarder struct{}
var raftStdLogger = log.New(&logForwarder{}, "", 0)
var raftLogger = logging.Logger("raft")
// Write forwards to our go-log logger.
// According to https://golang.org/pkg/log/#Logger.Output
// it is called per line.
func (fw *logForwarder) Write(p []byte) (n int, err error) {
t := strings.TrimSuffix(string(p), "\n")
switch {
case strings.Contains(t, "[DEBUG]"):
raftLogger.Debug(strings.TrimPrefix(t, "[DEBUG] raft: "))
case strings.Contains(t, "[WARN]"):
raftLogger.Warning(strings.TrimPrefix(t, "[WARN] raft: "))
case strings.Contains(t, "[ERR]"):
raftLogger.Error(strings.TrimPrefix(t, "[ERR] raft: "))
case strings.Contains(t, "[INFO]"):
raftLogger.Info(strings.TrimPrefix(t, "[INFO] raft: "))
default:
raftLogger.Debug(t)
}
return len(p), nil
}
// Raft performs all Raft-specific operations which are needed by Cluster but
// are not fulfilled by the consensus interface. It should contain most of the
// Raft-related stuff so it can be easily replaced in the future, if need be.

View File

@ -4,9 +4,10 @@ package ipfscluster
func init() {
l := "DEBUG"
SetFacilityLogLevel("cluster", l)
SetFacilityLogLevel("restapi", l)
SetFacilityLogLevel("ipfshttp", l)
for _, f := range facilities {
SetFacilityLogLevel(f, l)
}
//SetFacilityLogLevel("raft", l)
//SetFacilityLogLevel("p2p-gorpc", l)
//SetFacilityLogLevel("swarm2", l)

View File

@ -19,6 +19,7 @@ import (
"github.com/ipfs/ipfs-cluster/api/restapi"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/ipfs-connector/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/basic"
"github.com/ipfs/ipfs-cluster/state/mapstate"
)
@ -248,7 +249,7 @@ func run(c *cli.Context) error {
state := mapstate.NewMapState()
tracker := ipfscluster.NewMapPinTracker(cfg)
mon := ipfscluster.NewStdPeerMonitor(cfg)
mon := basic.NewStdPeerMonitor(cfg.MonitoringIntervalSeconds)
informer := numpin.NewInformer()
alloc := numpinalloc.NewAllocator()
@ -286,11 +287,20 @@ func setupLogging(lvl string) {
func setupDebug() {
l := "DEBUG"
ipfscluster.SetFacilityLogLevel("cluster", l)
ipfscluster.SetFacilityLogLevel("ipfshttp", l)
ipfscluster.SetFacilityLogLevel("restapi", l)
ipfscluster.SetFacilityLogLevel("raft", l)
ipfscluster.SetFacilityLogLevel("p2p-gorpc", l)
var facilities = []string{
"cluster",
"restapi",
"ipfshttp",
"monitor",
"consensus",
"raft",
"p2p-gorpc",
}
for _, f := range facilities {
ipfscluster.SetFacilityLogLevel(f, l)
}
//SetFacilityLogLevel("swarm2", l)
//SetFacilityLogLevel("libp2p-raft", l)
}

View File

@ -9,8 +9,6 @@
package ipfscluster
import (
"io"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
@ -56,26 +54,6 @@ type Peered interface {
//SetPeers(peers []peer.ID)
}
// State represents the shared state of the cluster and it
// is used by the Consensus component to keep track of
// objects which objects are pinned. This component should be thread safe.
type State interface {
// Add adds a pin to the State
Add(api.Pin) error
// Rm removes a pin from the State
Rm(*cid.Cid) error
// List lists all the pins in the state
List() []api.Pin
// Has returns true if the state is holding information for a Cid
Has(*cid.Cid) bool
// Get returns the information attacthed to this pin
Get(*cid.Cid) api.Pin
// Snapshot writes a snapshot of the state to a writer
Snapshot(w io.Writer) error
// Restore restores a snapshot from a reader
Restore(r io.Reader) error
}
// PinTracker represents a component which tracks the status of
// the pins in this cluster and ensures they are in sync with the
// IPFS daemon. This component should be thread safe.

View File

@ -15,6 +15,8 @@ import (
"github.com/ipfs/ipfs-cluster/api/restapi"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/ipfs-connector/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/basic"
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/test"
@ -57,7 +59,7 @@ func randomBytes() []byte {
return bs
}
func createComponents(t *testing.T, i int) (*Config, API, IPFSConnector, State, PinTracker, PeerMonitor, PinAllocator, Informer, *test.IpfsMock) {
func createComponents(t *testing.T, i int) (*Config, API, IPFSConnector, state.State, PinTracker, PeerMonitor, PinAllocator, Informer, *test.IpfsMock) {
mock := test.NewIpfsMock()
clusterAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", clusterPort+i))
apiAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", apiPort+i))
@ -89,7 +91,7 @@ func createComponents(t *testing.T, i int) (*Config, API, IPFSConnector, State,
checkErr(t, err)
state := mapstate.NewMapState()
tracker := NewMapPinTracker(cfg)
mon := NewStdPeerMonitor(cfg)
mon := basic.NewStdPeerMonitor(cfg.MonitoringIntervalSeconds)
alloc := numpinalloc.NewAllocator()
numpin.MetricTTL = 1 // second
inf := numpin.NewInformer()
@ -97,7 +99,7 @@ func createComponents(t *testing.T, i int) (*Config, API, IPFSConnector, State,
return cfg, api, ipfs, state, tracker, mon, alloc, inf, mock
}
func createCluster(t *testing.T, cfg *Config, api API, ipfs IPFSConnector, state State, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer) *Cluster {
func createCluster(t *testing.T, cfg *Config, api API, ipfs IPFSConnector, state state.State, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer) *Cluster {
cl, err := NewCluster(cfg, api, ipfs, state, tracker, mon, alloc, inf)
checkErr(t, err)
<-cl.Ready()
@ -115,7 +117,7 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
cfgs := make([]*Config, nClusters, nClusters)
apis := make([]API, nClusters, nClusters)
ipfss := make([]IPFSConnector, nClusters, nClusters)
states := make([]State, nClusters, nClusters)
states := make([]state.State, nClusters, nClusters)
trackers := make([]PinTracker, nClusters, nClusters)
mons := make([]PeerMonitor, nClusters, nClusters)
allocs := make([]PinAllocator, nClusters, nClusters)

View File

@ -1,15 +1,17 @@
package ipfscluster
import (
"log"
"strings"
logging "github.com/ipfs/go-log"
)
import logging "github.com/ipfs/go-log"
var logger = logging.Logger("cluster")
var raftStdLogger = log.New(&logForwarder{}, "", 0)
var raftLogger = logging.Logger("raft")
var facilities = []string{
"cluster",
"restapi",
"ipfshttp",
"monitor",
"consensus",
"raft",
}
// SetFacilityLogLevel sets the log level for a given module
func SetFacilityLogLevel(f, l string) {
@ -23,26 +25,3 @@ func SetFacilityLogLevel(f, l string) {
*/
logging.SetLogLevel(f, l)
}
// implements the writer interface
type logForwarder struct{}
// Write forwards to our go-log logger.
// According to https://golang.org/pkg/log/#Logger.Output
// it is called per line.
func (fw *logForwarder) Write(p []byte) (n int, err error) {
t := strings.TrimSuffix(string(p), "\n")
switch {
case strings.Contains(t, "[DEBUG]"):
raftLogger.Debug(strings.TrimPrefix(t, "[DEBUG] raft: "))
case strings.Contains(t, "[WARN]"):
raftLogger.Warning(strings.TrimPrefix(t, "[WARN] raft: "))
case strings.Contains(t, "[ERR]"):
raftLogger.Error(strings.TrimPrefix(t, "[ERR] raft: "))
case strings.Contains(t, "[INFO]"):
raftLogger.Info(strings.TrimPrefix(t, "[INFO] raft: "))
default:
raftLogger.Debug(t)
}
return len(p), nil
}

View File

@ -1,4 +1,4 @@
package ipfscluster
package basic
import (
"context"
@ -7,11 +7,14 @@ import (
"time"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/ipfs/ipfs-cluster/api"
)
var logger = logging.Logger("monitor")
// AlertChannelCap specifies how much buffer the alerts channel has.
var AlertChannelCap = 256
@ -99,7 +102,7 @@ type StdPeerMonitor struct {
// (how many metrics to keep for each peer and type of metric) and the
// monitoringInterval (interval between the checks that produce alerts)
// as parameters
func NewStdPeerMonitor(cfg *Config) *StdPeerMonitor {
func NewStdPeerMonitor(monIntervalSecs int) *StdPeerMonitor {
if WindowCap <= 0 {
panic("windowCap too small")
}
@ -115,7 +118,7 @@ func NewStdPeerMonitor(cfg *Config) *StdPeerMonitor {
windowCap: WindowCap,
alerts: make(chan api.Alert, AlertChannelCap),
monitoringInterval: cfg.MonitoringIntervalSeconds,
monitoringInterval: monIntervalSecs,
}
go mon.run()

View File

@ -1,4 +1,4 @@
package ipfscluster
package basic
import (
"fmt"
@ -14,9 +14,8 @@ import (
var metricCounter = 0
func testPeerMonitor(t *testing.T) *StdPeerMonitor {
cfg := testingConfig()
mock := test.NewMockRPCClient(t)
mon := NewStdPeerMonitor(cfg)
mon := NewStdPeerMonitor(2)
mon.SetClient(mock)
return mon
}

View File

@ -4,9 +4,10 @@ package ipfscluster
// This is our default logs levels
func init() {
SetFacilityLogLevel("cluster", "INFO")
SetFacilityLogLevel("restapi", "INFO")
SetFacilityLogLevel("ipfshttp", "INFO")
for _, f := range facilities {
SetFacilityLogLevel(f, "INFO")
}
SetFacilityLogLevel("raft", "ERROR")
SetFacilityLogLevel("p2p-gorpc", "ERROR")
//SetFacilityLogLevel("swarm2", l)

View File

@ -4,10 +4,10 @@ package ipfscluster
func init() {
l := "CRITICAL"
SetFacilityLogLevel("cluster", l)
SetFacilityLogLevel("restapi", l)
SetFacilityLogLevel("ipfshttp", l)
SetFacilityLogLevel("raft", l)
for _, f := range facilities {
SetFacilityLogLevel(f, l)
}
SetFacilityLogLevel("p2p-gorpc", l)
SetFacilityLogLevel("swarm2", l)
SetFacilityLogLevel("libp2p-raft", l)

22
state/interface.go Normal file
View File

@ -0,0 +1,22 @@
package state
// State represents the shared state of the cluster and it
import (
cid "github.com/ipfs/go-cid"
"github.com/ipfs/ipfs-cluster/api"
)
// is used by the Consensus component to keep track of
// objects which objects are pinned. This component should be thread safe.
type State interface {
// Add adds a pin to the State
Add(api.Pin) error
// Rm removes a pin from the State
Rm(*cid.Cid) error
// List lists all the pins in the state
List() []api.Pin
// Has returns true if the state is holding information for a Cid
Has(*cid.Cid) bool
// Get returns the information attacthed to this pin
Get(*cid.Cid) api.Pin
}