go lint, go vet, put the Consensus component behind an interface.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
Hector Sanjuan 2017-03-14 16:37:29 +01:00
parent 37fab27ba6
commit e2efef8469
13 changed files with 82 additions and 39 deletions

View File

@ -1,3 +1,5 @@
// Package restapi implements an IPFS Cluster API component. It provides
// a REST-ish API to interact with Cluster over HTTP.
package restapi package restapi
import ( import (

View File

@ -36,7 +36,7 @@ type Cluster struct {
rpcClient *rpc.Client rpcClient *rpc.Client
peerManager *peerManager peerManager *peerManager
consensus *raft.Consensus consensus Consensus
api API api API
ipfs IPFSConnector ipfs IPFSConnector
state state.State state state.State

View File

@ -1,3 +1,5 @@
// Package raft implements a Consensus component for IPFS Cluster which uses
// Raft (go-libp2p-raft).
package raft package raft
import ( import (

View File

@ -244,7 +244,7 @@ func run(c *cli.Context) error {
api, err := restapi.NewRESTAPI(cfg.APIAddr) api, err := restapi.NewRESTAPI(cfg.APIAddr)
checkErr("creating REST API component", err) checkErr("creating REST API component", err)
proxy, err := ipfshttp.NewIPFSHTTPConnector( proxy, err := ipfshttp.NewConnector(
cfg.IPFSNodeAddr, cfg.IPFSProxyAddr) cfg.IPFSNodeAddr, cfg.IPFSProxyAddr)
checkErr("creating IPFS Connector component", err) checkErr("creating IPFS Connector component", err)

View File

@ -9,12 +9,14 @@
package ipfscluster package ipfscluster
import ( import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state"
rpc "github.com/hsanjuan/go-libp2p-gorpc" rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol" protocol "github.com/libp2p/go-libp2p-protocol"
ma "github.com/multiformats/go-multiaddr"
"github.com/ipfs/ipfs-cluster/api"
) )
// RPCProtocol is used to send libp2p messages between cluster peers // RPCProtocol is used to send libp2p messages between cluster peers
@ -29,6 +31,31 @@ type Component interface {
Shutdown() error Shutdown() error
} }
// Consensus is a component which keeps a shared state in
// IPFS Cluster and triggers actions on updates to that state.
// Currently, Consensus needs to be able to elect/provide a
// Cluster Leader and the implementation is very tight to
// the Cluster main component.
type Consensus interface {
	Component
	// Ready returns a channel to signal that the consensus
	// algorithm is ready
	Ready() <-chan struct{}
	// LogPin logs a pin operation
	LogPin(c api.Pin) error
	// LogUnpin logs an unpin operation
	LogUnpin(c api.Pin) error
	// LogAddPeer logs the addition of a peer with the given multiaddress
	LogAddPeer(addr ma.Multiaddr) error
	// LogRmPeer logs the removal of the peer with the given ID
	LogRmPeer(p peer.ID) error
	// State returns the shared state tracked by this component
	State() (state.State, error)
	// Leader provides a node which is responsible to perform
	// specific tasks which must only run in 1 cluster peer
	Leader() (peer.ID, error)
	// WaitForSync only returns when the consensus state has all log
	// updates applied to it
	WaitForSync() error
}
// API is a component which offers an API for Cluster. This is // API is a component which offers an API for Cluster. This is
// a base component. // a base component.
type API interface { type API interface {

View File

@ -86,7 +86,7 @@ func createComponents(t *testing.T, i int) (*Config, API, IPFSConnector, state.S
api, err := restapi.NewRESTAPI(cfg.APIAddr) api, err := restapi.NewRESTAPI(cfg.APIAddr)
checkErr(t, err) checkErr(t, err)
ipfs, err := ipfshttp.NewIPFSHTTPConnector( ipfs, err := ipfshttp.NewConnector(
cfg.IPFSNodeAddr, cfg.IPFSNodeAddr,
cfg.IPFSProxyAddr) cfg.IPFSProxyAddr)
checkErr(t, err) checkErr(t, err)

View File

@ -1,3 +1,5 @@
// Package ipfshttp implements an IPFS Cluster IPFSConnector component. It
// uses the IPFS HTTP API to communicate to IPFS.
package ipfshttp package ipfshttp
import ( import (
@ -36,7 +38,7 @@ var (
IPFSProxyServerIdleTimeout = 60 * time.Second IPFSProxyServerIdleTimeout = 60 * time.Second
) )
// IPFSHTTPConnector implements the IPFSConnector interface // Connector implements the IPFSConnector interface
// and provides a component which does two tasks: // and provides a component which does two tasks:
// //
// On one side, it proxies HTTP requests to the configured IPFS // On one side, it proxies HTTP requests to the configured IPFS
@ -45,7 +47,7 @@ var (
// //
// On the other side, it is used to perform on-demand requests // On the other side, it is used to perform on-demand requests
// against the configured IPFS daemon (such as a pin request). // against the configured IPFS daemon (such as a pin request).
type IPFSHTTPConnector struct { type Connector struct {
ctx context.Context ctx context.Context
cancel func() cancel func()
@ -90,8 +92,8 @@ type ipfsIDResp struct {
Addresses []string Addresses []string
} }
// NewIPFSHTTPConnector creates the component and leaves it ready to be started // NewConnector creates the component and leaves it ready to be started
func NewIPFSHTTPConnector(ipfsNodeMAddr ma.Multiaddr, ipfsProxyMAddr ma.Multiaddr) (*IPFSHTTPConnector, error) { func NewConnector(ipfsNodeMAddr ma.Multiaddr, ipfsProxyMAddr ma.Multiaddr) (*Connector, error) {
destHost, err := ipfsNodeMAddr.ValueForProtocol(ma.P_IP4) destHost, err := ipfsNodeMAddr.ValueForProtocol(ma.P_IP4)
if err != nil { if err != nil {
return nil, err return nil, err
@ -135,7 +137,7 @@ func NewIPFSHTTPConnector(ipfsNodeMAddr ma.Multiaddr, ipfsProxyMAddr ma.Multiadd
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
ipfs := &IPFSHTTPConnector{ ipfs := &Connector{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
nodeAddr: ipfsNodeMAddr, nodeAddr: ipfsNodeMAddr,
@ -161,7 +163,7 @@ func NewIPFSHTTPConnector(ipfsNodeMAddr ma.Multiaddr, ipfsProxyMAddr ma.Multiadd
} }
// set cancellable context. launch proxy // set cancellable context. launch proxy
func (ipfs *IPFSHTTPConnector) run() { func (ipfs *Connector) run() {
// This launches the proxy // This launches the proxy
ipfs.wg.Add(1) ipfs.wg.Add(1)
go func() { go func() {
@ -180,7 +182,7 @@ func (ipfs *IPFSHTTPConnector) run() {
// This will run a custom handler if we have one for a URL.Path, or // This will run a custom handler if we have one for a URL.Path, or
// otherwise just proxy the requests. // otherwise just proxy the requests.
func (ipfs *IPFSHTTPConnector) handle(w http.ResponseWriter, r *http.Request) { func (ipfs *Connector) handle(w http.ResponseWriter, r *http.Request) {
if customHandler, ok := ipfs.handlers[r.URL.Path]; ok { if customHandler, ok := ipfs.handlers[r.URL.Path]; ok {
customHandler(w, r) customHandler(w, r)
} else { } else {
@ -190,7 +192,7 @@ func (ipfs *IPFSHTTPConnector) handle(w http.ResponseWriter, r *http.Request) {
} }
// defaultHandler just proxies the requests // defaultHandler just proxies the requests
func (ipfs *IPFSHTTPConnector) defaultHandler(w http.ResponseWriter, r *http.Request) { func (ipfs *Connector) defaultHandler(w http.ResponseWriter, r *http.Request) {
newURL := *r.URL newURL := *r.URL
newURL.Host = fmt.Sprintf("%s:%d", ipfs.destHost, ipfs.destPort) newURL.Host = fmt.Sprintf("%s:%d", ipfs.destHost, ipfs.destPort)
newURL.Scheme = "http" newURL.Scheme = "http"
@ -228,7 +230,7 @@ func ipfsErrorResponder(w http.ResponseWriter, errMsg string) {
return return
} }
func (ipfs *IPFSHTTPConnector) pinOpHandler(op string, w http.ResponseWriter, r *http.Request) { func (ipfs *Connector) pinOpHandler(op string, w http.ResponseWriter, r *http.Request) {
argA := r.URL.Query()["arg"] argA := r.URL.Query()["arg"]
if len(argA) == 0 { if len(argA) == 0 {
ipfsErrorResponder(w, "Error: bad argument") ipfsErrorResponder(w, "Error: bad argument")
@ -263,15 +265,15 @@ func (ipfs *IPFSHTTPConnector) pinOpHandler(op string, w http.ResponseWriter, r
return return
} }
func (ipfs *IPFSHTTPConnector) pinHandler(w http.ResponseWriter, r *http.Request) { func (ipfs *Connector) pinHandler(w http.ResponseWriter, r *http.Request) {
ipfs.pinOpHandler("Pin", w, r) ipfs.pinOpHandler("Pin", w, r)
} }
func (ipfs *IPFSHTTPConnector) unpinHandler(w http.ResponseWriter, r *http.Request) { func (ipfs *Connector) unpinHandler(w http.ResponseWriter, r *http.Request) {
ipfs.pinOpHandler("Unpin", w, r) ipfs.pinOpHandler("Unpin", w, r)
} }
func (ipfs *IPFSHTTPConnector) pinLsHandler(w http.ResponseWriter, r *http.Request) { func (ipfs *Connector) pinLsHandler(w http.ResponseWriter, r *http.Request) {
pinLs := ipfsPinLsResp{} pinLs := ipfsPinLsResp{}
pinLs.Keys = make(map[string]ipfsPinType) pinLs.Keys = make(map[string]ipfsPinType)
@ -320,14 +322,14 @@ func (ipfs *IPFSHTTPConnector) pinLsHandler(w http.ResponseWriter, r *http.Reque
// SetClient makes the component ready to perform RPC // SetClient makes the component ready to perform RPC
// requests. // requests.
func (ipfs *IPFSHTTPConnector) SetClient(c *rpc.Client) { func (ipfs *Connector) SetClient(c *rpc.Client) {
ipfs.rpcClient = c ipfs.rpcClient = c
ipfs.rpcReady <- struct{}{} ipfs.rpcReady <- struct{}{}
} }
// Shutdown stops any listeners and stops the component from taking // Shutdown stops any listeners and stops the component from taking
// any requests. // any requests.
func (ipfs *IPFSHTTPConnector) Shutdown() error { func (ipfs *Connector) Shutdown() error {
ipfs.shutdownLock.Lock() ipfs.shutdownLock.Lock()
defer ipfs.shutdownLock.Unlock() defer ipfs.shutdownLock.Unlock()
@ -353,7 +355,7 @@ func (ipfs *IPFSHTTPConnector) Shutdown() error {
// If the request fails, or the parsing fails, it // If the request fails, or the parsing fails, it
// returns an error and an empty IPFSID which also // returns an error and an empty IPFSID which also
// contains the error message. // contains the error message.
func (ipfs *IPFSHTTPConnector) ID() (api.IPFSID, error) { func (ipfs *Connector) ID() (api.IPFSID, error) {
id := api.IPFSID{} id := api.IPFSID{}
body, err := ipfs.get("id") body, err := ipfs.get("id")
if err != nil { if err != nil {
@ -391,7 +393,7 @@ func (ipfs *IPFSHTTPConnector) ID() (api.IPFSID, error) {
// Pin performs a pin request against the configured IPFS // Pin performs a pin request against the configured IPFS
// daemon. // daemon.
func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error { func (ipfs *Connector) Pin(hash *cid.Cid) error {
pinStatus, err := ipfs.PinLsCid(hash) pinStatus, err := ipfs.PinLsCid(hash)
if err != nil { if err != nil {
return err return err
@ -410,7 +412,7 @@ func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error {
// Unpin performs an unpin request against the configured IPFS // Unpin performs an unpin request against the configured IPFS
// daemon. // daemon.
func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error { func (ipfs *Connector) Unpin(hash *cid.Cid) error {
pinStatus, err := ipfs.PinLsCid(hash) pinStatus, err := ipfs.PinLsCid(hash)
if err != nil { if err != nil {
return err return err
@ -430,7 +432,7 @@ func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
// PinLs performs a "pin ls --type typeFilter" request against the configured // PinLs performs a "pin ls --type typeFilter" request against the configured
// IPFS daemon and returns a map of cid strings and their status. // IPFS daemon and returns a map of cid strings and their status.
func (ipfs *IPFSHTTPConnector) PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error) { func (ipfs *Connector) PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error) {
body, err := ipfs.get("pin/ls?type=" + typeFilter) body, err := ipfs.get("pin/ls?type=" + typeFilter)
// Some error talking to the daemon // Some error talking to the daemon
@ -455,7 +457,7 @@ func (ipfs *IPFSHTTPConnector) PinLs(typeFilter string) (map[string]api.IPFSPinS
// PinLsCid performs a "pin ls <hash> "request and returns IPFSPinStatus for // PinLsCid performs a "pin ls <hash> "request and returns IPFSPinStatus for
// that hash. // that hash.
func (ipfs *IPFSHTTPConnector) PinLsCid(hash *cid.Cid) (api.IPFSPinStatus, error) { func (ipfs *Connector) PinLsCid(hash *cid.Cid) (api.IPFSPinStatus, error) {
lsPath := fmt.Sprintf("pin/ls?arg=%s", hash) lsPath := fmt.Sprintf("pin/ls?arg=%s", hash)
body, err := ipfs.get(lsPath) body, err := ipfs.get(lsPath)
@ -486,7 +488,7 @@ func (ipfs *IPFSHTTPConnector) PinLsCid(hash *cid.Cid) (api.IPFSPinStatus, error
// get performs the heavy lifting of a get request against // get performs the heavy lifting of a get request against
// the IPFS daemon. // the IPFS daemon.
func (ipfs *IPFSHTTPConnector) get(path string) ([]byte, error) { func (ipfs *Connector) get(path string) ([]byte, error) {
logger.Debugf("getting %s", path) logger.Debugf("getting %s", path)
url := fmt.Sprintf("%s/%s", url := fmt.Sprintf("%s/%s",
ipfs.apiURL(), ipfs.apiURL(),
@ -524,7 +526,7 @@ func (ipfs *IPFSHTTPConnector) get(path string) ([]byte, error) {
// apiURL is a short-hand for building the url of the IPFS // apiURL is a short-hand for building the url of the IPFS
// daemon API. // daemon API.
func (ipfs *IPFSHTTPConnector) apiURL() string { func (ipfs *Connector) apiURL() string {
return fmt.Sprintf("http://%s:%d/api/v0", return fmt.Sprintf("http://%s:%d/api/v0",
ipfs.destHost, ipfs.destHost,
ipfs.destPort) ipfs.destPort)

View File

@ -14,13 +14,13 @@ import (
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
func testIPFSConnector(t *testing.T) (*IPFSHTTPConnector, *test.IpfsMock) { func testIPFSConnector(t *testing.T) (*Connector, *test.IpfsMock) {
mock := test.NewIpfsMock() mock := test.NewIpfsMock()
nodeMAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", nodeMAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d",
mock.Addr, mock.Port)) mock.Addr, mock.Port))
proxyMAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/10001") proxyMAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/10001")
ipfs, err := NewIPFSHTTPConnector(nodeMAddr, proxyMAddr) ipfs, err := NewConnector(nodeMAddr, proxyMAddr)
if err != nil { if err != nil {
t.Fatal("creating an IPFSConnector should work: ", err) t.Fatal("creating an IPFSConnector should work: ", err)
} }
@ -28,7 +28,7 @@ func testIPFSConnector(t *testing.T) (*IPFSHTTPConnector, *test.IpfsMock) {
return ipfs, mock return ipfs, mock
} }
func TestNewIPFSHTTPConnector(t *testing.T) { func TestNewConnector(t *testing.T) {
ipfs, mock := testIPFSConnector(t) ipfs, mock := testIPFSConnector(t)
defer mock.Close() defer mock.Close()
defer ipfs.Shutdown() defer ipfs.Shutdown()

View File

@ -1,3 +1,6 @@
// Package basic implements a basic PeerMonitor component for IPFS Cluster. This
// component is in charge of logging metrics and triggering alerts when a peer
// goes down.
package basic package basic
import ( import (

View File

@ -1,3 +1,5 @@
// Package maptracker implements a PinTracker component for IPFS Cluster. It
// uses a map to keep track of the state of tracked pins.
package maptracker package maptracker
import ( import (

View File

@ -142,9 +142,9 @@ func TestStatusAll(t *testing.T) {
h2, _ := cid.Decode(test.TestCid2) h2, _ := cid.Decode(test.TestCid2)
// LocalPin // LocalPin
c := api.Pin{h1, []peer.ID{}, -1} c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1}
mpt.Track(c) mpt.Track(c)
c = api.Pin{h2, []peer.ID{test.TestPeerID2}, 1} c = api.Pin{Cid: h2, Allocations: []peer.ID{test.TestPeerID2}, ReplicationFactor: 1}
mpt.Track(c) mpt.Track(c)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -172,9 +172,9 @@ func TestSyncAndRecover(t *testing.T) {
h1, _ := cid.Decode(test.TestCid1) h1, _ := cid.Decode(test.TestCid1)
h2, _ := cid.Decode(test.TestCid2) h2, _ := cid.Decode(test.TestCid2)
c := api.Pin{h1, []peer.ID{}, -1} c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1}
mpt.Track(c) mpt.Track(c)
c = api.Pin{h2, []peer.ID{}, -1} c = api.Pin{Cid: h2, Allocations: []peer.ID{}, ReplicationFactor: -1}
mpt.Track(c) mpt.Track(c)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -238,9 +238,9 @@ func TestSyncAll(t *testing.T) {
h1, _ := cid.Decode(test.TestCid1) h1, _ := cid.Decode(test.TestCid1)
h2, _ := cid.Decode(test.TestCid2) h2, _ := cid.Decode(test.TestCid2)
c := api.Pin{h1, []peer.ID{}, -1} c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1}
mpt.Track(c) mpt.Track(c)
c = api.Pin{h2, []peer.ID{}, -1} c = api.Pin{Cid: h2, Allocations: []peer.ID{}, ReplicationFactor: -1}
mpt.Track(c) mpt.Track(c)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)

View File

@ -1,14 +1,16 @@
// Package state holds the interface that any state implementation for
// IPFS Cluster must satisfy.
package state package state
// State represents the shared state of the cluster and it // State represents the shared state of the cluster and it
import ( import (
cid "github.com/ipfs/go-cid"
"io" "io"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api"
) )
// is used by the Consensus component to keep track of // State is used by the Consensus component to keep track of
// which objects are pinned. This component should be thread safe. // which objects are pinned. This component should be thread safe.
type State interface { type State interface {
// Add adds a pin to the State // Add adds a pin to the State

View File

@ -1,3 +1,5 @@
// Package mapstate implements the State interface for IPFS Cluster by using
// a map to keep track of the consensus-shared state.
package mapstate package mapstate
import ( import (
@ -90,6 +92,8 @@ func (st *MapState) Snapshot(w io.Writer) error {
return enc.Encode(st) return enc.Encode(st)
} }
// Restore takes a reader and restores a snapshot. It should migrate
// the format if it is not compatible with the current version.
func (st *MapState) Restore(r io.Reader) error { func (st *MapState) Restore(r io.Reader) error {
snap, err := ioutil.ReadAll(r) snap, err := ioutil.ReadAll(r)
if err != nil { if err != nil {
@ -104,7 +108,6 @@ func (st *MapState) Restore(r io.Reader) error {
// we are good // we are good
err := json.Unmarshal(snap, st) err := json.Unmarshal(snap, st)
return err return err
} else { }
return st.migrateFrom(vonly.Version, snap) return st.migrateFrom(vonly.Version, snap)
} }
}