diff --git a/api/restapi/restapi.go b/api/restapi/restapi.go index b1319e83..3573796e 100644 --- a/api/restapi/restapi.go +++ b/api/restapi/restapi.go @@ -1,3 +1,5 @@ +// Package restapi implements an IPFS Cluster API component. It provides +// a REST-ish API to interact with Cluster over HTTP. package restapi import ( diff --git a/cluster.go b/cluster.go index fe5d9b19..7b83c16d 100644 --- a/cluster.go +++ b/cluster.go @@ -36,7 +36,7 @@ type Cluster struct { rpcClient *rpc.Client peerManager *peerManager - consensus *raft.Consensus + consensus Consensus api API ipfs IPFSConnector state state.State diff --git a/consensus/raft/consensus.go b/consensus/raft/consensus.go index 80e5ca0c..d566bd05 100644 --- a/consensus/raft/consensus.go +++ b/consensus/raft/consensus.go @@ -1,3 +1,5 @@ +// Package raft implements a Consensus component for IPFS Cluster which uses +// Raft (go-libp2p-raft). package raft import ( diff --git a/ipfs-cluster-service/main.go b/ipfs-cluster-service/main.go index 8332be59..f4318525 100644 --- a/ipfs-cluster-service/main.go +++ b/ipfs-cluster-service/main.go @@ -244,7 +244,7 @@ func run(c *cli.Context) error { api, err := restapi.NewRESTAPI(cfg.APIAddr) checkErr("creating REST API component", err) - proxy, err := ipfshttp.NewIPFSHTTPConnector( + proxy, err := ipfshttp.NewConnector( cfg.IPFSNodeAddr, cfg.IPFSProxyAddr) checkErr("creating IPFS Connector component", err) diff --git a/ipfscluster.go b/ipfscluster.go index 9c9fc0c9..d8ac3326 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -9,12 +9,14 @@ package ipfscluster import ( + "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/state" + rpc "github.com/hsanjuan/go-libp2p-gorpc" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-peer" protocol "github.com/libp2p/go-libp2p-protocol" - - "github.com/ipfs/ipfs-cluster/api" + ma "github.com/multiformats/go-multiaddr" ) // RPCProtocol is used to send libp2p messages between cluster peers @@ -29,6 +31,31 @@ type Component interface { Shutdown() error } +// Consensus is a component which keeps a shared state in +// IPFS Cluster and triggers actions on updates to that state. +// Currently, Consensus needs to be able to elect/provide a +// Cluster Leader and the implementation is very tight to +// the Cluster main component. +type Consensus interface { + Component + // Returns a channel to signal that the consensus + // algoritm is ready + Ready() <-chan struct{} + // Logs a pin operation + LogPin(c api.Pin) error + // Logs an unpin operation + LogUnpin(c api.Pin) error + LogAddPeer(addr ma.Multiaddr) error + LogRmPeer(p peer.ID) error + State() (state.State, error) + // Provide a node which is responsible to perform + // specific tasks which must only run in 1 cluster peer + Leader() (peer.ID, error) + // Only returns when the consensus state has all log + // updates applied to it + WaitForSync() error +} + // API is a component which offers an API for Cluster. This is // a base component. type API interface { diff --git a/ipfscluster_test.go b/ipfscluster_test.go index 11e17baf..4628d93f 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -86,7 +86,7 @@ func createComponents(t *testing.T, i int) (*Config, API, IPFSConnector, state.S api, err := restapi.NewRESTAPI(cfg.APIAddr) checkErr(t, err) - ipfs, err := ipfshttp.NewIPFSHTTPConnector( + ipfs, err := ipfshttp.NewConnector( cfg.IPFSNodeAddr, cfg.IPFSProxyAddr) checkErr(t, err) diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 8bc7aff3..05014108 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -1,3 +1,5 @@ +// Package ipfshttp implements an IPFS Cluster IPFSConnector component. It +// uses the IPFS HTTP API to communicate to IPFS. package ipfshttp import ( @@ -36,7 +38,7 @@ var ( IPFSProxyServerIdleTimeout = 60 * time.Second ) -// IPFSHTTPConnector implements the IPFSConnector interface +// Connector implements the IPFSConnector interface // and provides a component which does two tasks: // // On one side, it proxies HTTP requests to the configured IPFS @@ -45,7 +47,7 @@ var ( // // On the other side, it is used to perform on-demand requests // against the configured IPFS daemom (such as a pin request). -type IPFSHTTPConnector struct { +type Connector struct { ctx context.Context cancel func() @@ -90,8 +92,8 @@ type ipfsIDResp struct { Addresses []string } -// NewIPFSHTTPConnector creates the component and leaves it ready to be started -func NewIPFSHTTPConnector(ipfsNodeMAddr ma.Multiaddr, ipfsProxyMAddr ma.Multiaddr) (*IPFSHTTPConnector, error) { +// NewConnector creates the component and leaves it ready to be started +func NewConnector(ipfsNodeMAddr ma.Multiaddr, ipfsProxyMAddr ma.Multiaddr) (*Connector, error) { destHost, err := ipfsNodeMAddr.ValueForProtocol(ma.P_IP4) if err != nil { return nil, err @@ -135,7 +137,7 @@ func NewIPFSHTTPConnector(ipfsNodeMAddr ma.Multiaddr, ipfsProxyMAddr ma.Multiadd ctx, cancel := context.WithCancel(context.Background()) - ipfs := &IPFSHTTPConnector{ + ipfs := &Connector{ ctx: ctx, cancel: cancel, nodeAddr: ipfsNodeMAddr, @@ -161,7 +163,7 @@ func NewIPFSHTTPConnector(ipfsNodeMAddr ma.Multiaddr, ipfsProxyMAddr ma.Multiadd } // set cancellable context. launch proxy -func (ipfs *IPFSHTTPConnector) run() { +func (ipfs *Connector) run() { // This launches the proxy ipfs.wg.Add(1) go func() { @@ -180,7 +182,7 @@ func (ipfs *IPFSHTTPConnector) run() { // This will run a custom handler if we have one for a URL.Path, or // otherwise just proxy the requests. -func (ipfs *IPFSHTTPConnector) handle(w http.ResponseWriter, r *http.Request) { +func (ipfs *Connector) handle(w http.ResponseWriter, r *http.Request) { if customHandler, ok := ipfs.handlers[r.URL.Path]; ok { customHandler(w, r) } else { @@ -190,7 +192,7 @@ func (ipfs *IPFSHTTPConnector) handle(w http.ResponseWriter, r *http.Request) { } // defaultHandler just proxies the requests -func (ipfs *IPFSHTTPConnector) defaultHandler(w http.ResponseWriter, r *http.Request) { +func (ipfs *Connector) defaultHandler(w http.ResponseWriter, r *http.Request) { newURL := *r.URL newURL.Host = fmt.Sprintf("%s:%d", ipfs.destHost, ipfs.destPort) newURL.Scheme = "http" @@ -228,7 +230,7 @@ func ipfsErrorResponder(w http.ResponseWriter, errMsg string) { return } -func (ipfs *IPFSHTTPConnector) pinOpHandler(op string, w http.ResponseWriter, r *http.Request) { +func (ipfs *Connector) pinOpHandler(op string, w http.ResponseWriter, r *http.Request) { argA := r.URL.Query()["arg"] if len(argA) == 0 { ipfsErrorResponder(w, "Error: bad argument") @@ -263,15 +265,15 @@ func (ipfs *IPFSHTTPConnector) pinOpHandler(op string, w http.ResponseWriter, r return } -func (ipfs *IPFSHTTPConnector) pinHandler(w http.ResponseWriter, r *http.Request) { +func (ipfs *Connector) pinHandler(w http.ResponseWriter, r *http.Request) { ipfs.pinOpHandler("Pin", w, r) } -func (ipfs *IPFSHTTPConnector) unpinHandler(w http.ResponseWriter, r *http.Request) { +func (ipfs *Connector) unpinHandler(w http.ResponseWriter, r *http.Request) { ipfs.pinOpHandler("Unpin", w, r) } -func (ipfs *IPFSHTTPConnector) pinLsHandler(w http.ResponseWriter, r *http.Request) { +func (ipfs *Connector) pinLsHandler(w http.ResponseWriter, r *http.Request) { pinLs := ipfsPinLsResp{} pinLs.Keys = make(map[string]ipfsPinType) @@ -320,14 +322,14 @@ func (ipfs *IPFSHTTPConnector) pinLsHandler(w http.ResponseWriter, r *http.Reque // SetClient makes the component ready to perform RPC // requests. -func (ipfs *IPFSHTTPConnector) SetClient(c *rpc.Client) { +func (ipfs *Connector) SetClient(c *rpc.Client) { ipfs.rpcClient = c ipfs.rpcReady <- struct{}{} } // Shutdown stops any listeners and stops the component from taking // any requests. -func (ipfs *IPFSHTTPConnector) Shutdown() error { +func (ipfs *Connector) Shutdown() error { ipfs.shutdownLock.Lock() defer ipfs.shutdownLock.Unlock() @@ -353,7 +355,7 @@ func (ipfs *IPFSHTTPConnector) Shutdown() error { // If the request fails, or the parsing fails, it // returns an error and an empty IPFSID which also // contains the error message. -func (ipfs *IPFSHTTPConnector) ID() (api.IPFSID, error) { +func (ipfs *Connector) ID() (api.IPFSID, error) { id := api.IPFSID{} body, err := ipfs.get("id") if err != nil { @@ -391,7 +393,7 @@ func (ipfs *IPFSHTTPConnector) ID() (api.IPFSID, error) { // Pin performs a pin request against the configured IPFS // daemon. -func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error { +func (ipfs *Connector) Pin(hash *cid.Cid) error { pinStatus, err := ipfs.PinLsCid(hash) if err != nil { return err @@ -410,7 +412,7 @@ func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error { // Unpin performs an unpin request against the configured IPFS // daemon. -func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error { +func (ipfs *Connector) Unpin(hash *cid.Cid) error { pinStatus, err := ipfs.PinLsCid(hash) if err != nil { return err @@ -430,7 +432,7 @@ func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error { // PinLs performs a "pin ls --type typeFilter" request against the configured // IPFS daemon and returns a map of cid strings and their status. -func (ipfs *IPFSHTTPConnector) PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error) { +func (ipfs *Connector) PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error) { body, err := ipfs.get("pin/ls?type=" + typeFilter) // Some error talking to the daemon @@ -455,7 +457,7 @@ func (ipfs *IPFSHTTPConnector) PinLs(typeFilter string) (map[string]api.IPFSPinS // PinLsCid performs a "pin ls "request and returns IPFSPinStatus for // that hash. -func (ipfs *IPFSHTTPConnector) PinLsCid(hash *cid.Cid) (api.IPFSPinStatus, error) { +func (ipfs *Connector) PinLsCid(hash *cid.Cid) (api.IPFSPinStatus, error) { lsPath := fmt.Sprintf("pin/ls?arg=%s", hash) body, err := ipfs.get(lsPath) @@ -486,7 +488,7 @@ func (ipfs *IPFSHTTPConnector) PinLsCid(hash *cid.Cid) (api.IPFSPinStatus, error // get performs the heavy lifting of a get request against // the IPFS daemon. -func (ipfs *IPFSHTTPConnector) get(path string) ([]byte, error) { +func (ipfs *Connector) get(path string) ([]byte, error) { logger.Debugf("getting %s", path) url := fmt.Sprintf("%s/%s", ipfs.apiURL(), @@ -524,7 +526,7 @@ func (ipfs *IPFSHTTPConnector) get(path string) ([]byte, error) { // apiURL is a short-hand for building the url of the IPFS // daemon API. -func (ipfs *IPFSHTTPConnector) apiURL() string { +func (ipfs *Connector) apiURL() string { return fmt.Sprintf("http://%s:%d/api/v0", ipfs.destHost, ipfs.destPort) diff --git a/ipfsconn/ipfshttp/ipfshttp_test.go b/ipfsconn/ipfshttp/ipfshttp_test.go index 6e23cbe6..857db384 100644 --- a/ipfsconn/ipfshttp/ipfshttp_test.go +++ b/ipfsconn/ipfshttp/ipfshttp_test.go @@ -14,13 +14,13 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -func testIPFSConnector(t *testing.T) (*IPFSHTTPConnector, *test.IpfsMock) { +func testIPFSConnector(t *testing.T) (*Connector, *test.IpfsMock) { mock := test.NewIpfsMock() nodeMAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", mock.Addr, mock.Port)) proxyMAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/10001") - ipfs, err := NewIPFSHTTPConnector(nodeMAddr, proxyMAddr) + ipfs, err := NewConnector(nodeMAddr, proxyMAddr) if err != nil { t.Fatal("creating an IPFSConnector should work: ", err) } @@ -28,7 +28,7 @@ func testIPFSConnector(t *testing.T) (*IPFSHTTPConnector, *test.IpfsMock) { return ipfs, mock } -func TestNewIPFSHTTPConnector(t *testing.T) { +func TestNewConnector(t *testing.T) { ipfs, mock := testIPFSConnector(t) defer mock.Close() defer ipfs.Shutdown() diff --git a/monitor/basic/peer_monitor.go b/monitor/basic/peer_monitor.go index 85e9883e..f0356a74 100644 --- a/monitor/basic/peer_monitor.go +++ b/monitor/basic/peer_monitor.go @@ -1,3 +1,6 @@ +// Package basic implements a basic PeerMonitor component for IPFS Cluster. This +// component is in charge of logging metrics and triggering alerts when a peer +// goes down. package basic import ( diff --git a/pintracker/maptracker/maptracker.go b/pintracker/maptracker/maptracker.go index 91109a30..862fc5b2 100644 --- a/pintracker/maptracker/maptracker.go +++ b/pintracker/maptracker/maptracker.go @@ -1,3 +1,5 @@ +// Package maptracker implements a PinTracker component for IPFS Cluster. It +// uses a map to keep track of the state of tracked pins. package maptracker import ( diff --git a/pintracker/maptracker/maptracker_test.go b/pintracker/maptracker/maptracker_test.go index 98e375f3..86cf3505 100644 --- a/pintracker/maptracker/maptracker_test.go +++ b/pintracker/maptracker/maptracker_test.go @@ -142,9 +142,9 @@ func TestStatusAll(t *testing.T) { h2, _ := cid.Decode(test.TestCid2) // LocalPin - c := api.Pin{h1, []peer.ID{}, -1} + c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1} mpt.Track(c) - c = api.Pin{h2, []peer.ID{test.TestPeerID2}, 1} + c = api.Pin{Cid: h2, Allocations: []peer.ID{test.TestPeerID2}, ReplicationFactor: 1} mpt.Track(c) time.Sleep(100 * time.Millisecond) @@ -172,9 +172,9 @@ func TestSyncAndRecover(t *testing.T) { h1, _ := cid.Decode(test.TestCid1) h2, _ := cid.Decode(test.TestCid2) - c := api.Pin{h1, []peer.ID{}, -1} + c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1} mpt.Track(c) - c = api.Pin{h2, []peer.ID{}, -1} + c = api.Pin{Cid: h2, Allocations: []peer.ID{}, ReplicationFactor: -1} mpt.Track(c) time.Sleep(100 * time.Millisecond) @@ -238,9 +238,9 @@ func TestSyncAll(t *testing.T) { h1, _ := cid.Decode(test.TestCid1) h2, _ := cid.Decode(test.TestCid2) - c := api.Pin{h1, []peer.ID{}, -1} + c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1} mpt.Track(c) - c = api.Pin{h2, []peer.ID{}, -1} + c = api.Pin{Cid: h2, Allocations: []peer.ID{}, ReplicationFactor: -1} mpt.Track(c) time.Sleep(100 * time.Millisecond) diff --git a/state/interface.go b/state/interface.go index dc904d4d..dbd894fd 100644 --- a/state/interface.go +++ b/state/interface.go @@ -1,14 +1,16 @@ +// Package state holds the interface that any state implementation for +// IPFS Cluster must satisfy. package state // State represents the shared state of the cluster and it import ( - cid "github.com/ipfs/go-cid" "io" + cid "github.com/ipfs/go-cid" "github.com/ipfs/ipfs-cluster/api" ) -// is used by the Consensus component to keep track of +// State is used by the Consensus component to keep track of // objects which objects are pinned. This component should be thread safe. type State interface { // Add adds a pin to the State diff --git a/state/mapstate/map_state.go b/state/mapstate/map_state.go index 7fe46e48..199dcb64 100644 --- a/state/mapstate/map_state.go +++ b/state/mapstate/map_state.go @@ -1,3 +1,5 @@ +// Package mapstate implements the State interface for IPFS Cluster by using +// a map to keep track of the consensus-shared state. package mapstate import ( @@ -90,6 +92,8 @@ func (st *MapState) Snapshot(w io.Writer) error { return enc.Encode(st) } +// Restore takes a reader and restores a snapshot. It should migrate +// the format if it is not compatible with the current version. func (st *MapState) Restore(r io.Reader) error { snap, err := ioutil.ReadAll(r) if err != nil { @@ -104,7 +108,6 @@ func (st *MapState) Restore(r io.Reader) error { // we are good err := json.Unmarshal(snap, st) return err - } else { - return st.migrateFrom(vonly.Version, snap) } + return st.migrateFrom(vonly.Version, snap) }