From 319c97585bb62363e1a78644ac4c4e961a0595f6 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 15 Dec 2016 19:08:46 +0100 Subject: [PATCH] Renames everywhere removing redundant "Cluster" from "ClusterSomething". Start preparing syncs() and status() License: MIT Signed-off-by: Hector Sanjuan --- api.go | 201 ++++++++++++++++++++++++++++------------- api_test.go | 30 ++++-- cluster.go | 169 ++++++++++++++++++++++++---------- cluster_test.go | 22 ++--- config.go | 10 +- config_test.go | 8 +- consensus.go | 62 ++++++------- consensus_test.go | 36 ++++---- ipfs-cluster/main.go | 4 +- ipfs_cluster.go | 38 ++++---- ipfs_cluster_test.go | 10 +- ipfs_connector.go | 10 +- ipfs_connector_test.go | 4 +- raft.go | 8 +- rpc.go | 54 ++++++----- rpc_test.go | 16 ++-- state.go | 6 +- tracker.go | 63 ++++++++++--- 18 files changed, 477 insertions(+), 274 deletions(-) diff --git a/api.go b/api.go index 9d60961d..9201d8d0 100644 --- a/api.go +++ b/api.go @@ -9,20 +9,20 @@ import ( "strings" "sync" - peer "github.com/libp2p/go-libp2p-peer" + peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" - cid "github.com/ipfs/go-cid" + cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" mux "github.com/gorilla/mux" ) -// ClusterHTTPAPI implements a ClusterAPI and aims to provides +// ClusterHTTPAPI implements a API and aims to provides // a RESTful HTTP API for Cluster. type ClusterHTTPAPI struct { ctx context.Context listenAddr string listenPort int - rpcCh chan ClusterRPC + rpcCh chan RPC router *mux.Router listener net.Listener @@ -61,20 +61,20 @@ type unpinResp struct { Unpinned string `json:"unpinned"` } -type pinElemResp struct { +type statusCidResp struct { Cid string `json:"cid"` Status string `json:"status"` } -type pinListResp []pinElemResp +type statusResp []statusCidResp -// NewHTTPClusterAPI creates a new object which is ready to be +// NewHTTPAPI creates a new object which is ready to be // started. -func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) { +func NewHTTPAPI(cfg *Config) (*ClusterHTTPAPI, error) { ctx := context.Background() l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", - cfg.ClusterAPIListenAddr, - cfg.ClusterAPIListenPort)) + cfg.APIListenAddr, + cfg.APIListenPort)) if err != nil { return nil, err } @@ -85,11 +85,11 @@ func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) { api := &ClusterHTTPAPI{ ctx: ctx, - listenAddr: cfg.ClusterAPIListenAddr, - listenPort: cfg.ClusterAPIListenPort, + listenAddr: cfg.APIListenAddr, + listenPort: cfg.APIListenPort, listener: l, server: s, - rpcCh: make(chan ClusterRPC, RPCMaxQueue), + rpcCh: make(chan RPC, RPCMaxQueue), } for _, route := range api.routes() { @@ -138,6 +138,30 @@ func (api *ClusterHTTPAPI) routes() []route { "/pins/{hash}", api.unpinHandler, }, + route{ + "Status", + "GET", + "/status", + api.statusHandler, + }, + route{ + "StatusCid", + "GET", + "/status/{hash}", + api.statusCidHandler, + }, + route{ + "Sync", + "POST", + "/status", + api.syncHandler, + }, + route{ + "SyncCid", + "POST", + "/status/{hash}", + api.syncCidHandler, + }, } } @@ -178,14 +202,14 @@ func (api *ClusterHTTPAPI) Shutdown() error { // RpcChan can be used by Cluster to read any // requests from this component -func (api *ClusterHTTPAPI) RpcChan() <-chan ClusterRPC { +func (api *ClusterHTTPAPI) RpcChan() <-chan RPC { return api.rpcCh } func (api *ClusterHTTPAPI) versionHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithCancel(api.ctx) defer cancel() - rRpc := RPC(VersionRPC, nil) + rRpc := NewRPC(VersionRPC, nil) resp := MakeRPC(ctx, api.rpcCh, rRpc, true) if checkResponse(w, rRpc.Op(), resp) { v := resp.Data.(string) @@ -196,7 +220,7 @@ func (api *ClusterHTTPAPI) versionHandler(w http.ResponseWriter, r *http.Request func (api *ClusterHTTPAPI) memberListHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithCancel(api.ctx) defer cancel() - rRpc := RPC(MemberListRPC, nil) + rRpc := NewRPC(MemberListRPC, nil) resp := MakeRPC(ctx, api.rpcCh, rRpc, true) if checkResponse(w, rRpc.Op(), resp) { data := resp.Data.([]peer.ID) @@ -211,72 +235,98 @@ func (api *ClusterHTTPAPI) memberListHandler(w http.ResponseWriter, r *http.Requ func (api *ClusterHTTPAPI) pinHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithCancel(api.ctx) defer cancel() - vars := mux.Vars(r) - hash := vars["hash"] - c, err := cid.Decode(hash) - if err != nil { - sendErrorResponse(w, 400, "error decoding Cid: "+err.Error()) - return - } - rRpc := RPC(PinRPC, c) - resp := MakeRPC(ctx, api.rpcCh, rRpc, true) - - if checkResponse(w, rRpc.Op(), resp) { - sendAcceptedResponse(w) + if c := parseCidOrError(w, r); c != nil { + rRpc := NewRPC(PinRPC, c) + resp := MakeRPC(ctx, api.rpcCh, rRpc, true) + if checkResponse(w, rRpc.Op(), resp) { + sendAcceptedResponse(w) + } } } func (api *ClusterHTTPAPI) unpinHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithCancel(api.ctx) defer cancel() - vars := mux.Vars(r) - hash := vars["hash"] - c, err := cid.Decode(hash) - if err != nil { - sendErrorResponse(w, 400, "error decoding Cid: "+err.Error()) - return - } - rRpc := RPC(UnpinRPC, c) - resp := MakeRPC(ctx, api.rpcCh, rRpc, true) - if checkResponse(w, rRpc.Op(), resp) { - sendAcceptedResponse(w) + if c := parseCidOrError(w, r); c != nil { + rRpc := NewRPC(UnpinRPC, c) + resp := MakeRPC(ctx, api.rpcCh, rRpc, true) + if checkResponse(w, rRpc.Op(), resp) { + sendAcceptedResponse(w) + } } } func (api *ClusterHTTPAPI) pinListHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithCancel(api.ctx) defer cancel() - rRpc := RPC(PinListRPC, nil) + rRpc := NewRPC(PinListRPC, nil) resp := MakeRPC(ctx, api.rpcCh, rRpc, true) if checkResponse(w, rRpc.Op(), resp) { - data := resp.Data.([]Pin) - pins := make(pinListResp, 0, len(data)) - for _, d := range data { - var st string - switch d.Status { - case PinError: - st = "pin_error" - case UnpinError: - st = "unpin_error" - case Pinned: - st = "pinned" - case Pinning: - st = "pinning" - case Unpinning: - st = "unpinning" - } - pins = append(pins, pinElemResp{ - Cid: d.Cid.String(), - Status: st, - }) - } - sendJSONResponse(w, 200, pins) + data := resp.Data.([]*cid.Cid) + sendJSONResponse(w, 200, data) } } +func (api *ClusterHTTPAPI) statusHandler(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithCancel(api.ctx) + defer cancel() + rRpc := NewRPC(StatusRPC, nil) + resp := MakeRPC(ctx, api.rpcCh, rRpc, true) + if checkResponse(w, rRpc.Op(), resp) { + sendStatusResponse(w, resp) + } +} + +func (api *ClusterHTTPAPI) statusCidHandler(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithCancel(api.ctx) + defer cancel() + + if c := parseCidOrError(w, r); c != nil { + op := NewRPC(StatusCidRPC, c) + resp := MakeRPC(ctx, api.rpcCh, op, true) + if checkResponse(w, op.Op(), resp) { + sendStatusCidResponse(w, resp) + } + } +} + +func (api *ClusterHTTPAPI) syncHandler(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithCancel(api.ctx) + defer cancel() + rRpc := NewRPC(GlobalSyncRPC, nil) + resp := MakeRPC(ctx, api.rpcCh, rRpc, true) + if checkResponse(w, rRpc.Op(), resp) { + sendStatusResponse(w, resp) + } +} + +func (api *ClusterHTTPAPI) syncCidHandler(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithCancel(api.ctx) + defer cancel() + + if c := parseCidOrError(w, r); c != nil { + op := NewRPC(GlobalSyncCidRPC, c) + resp := MakeRPC(ctx, api.rpcCh, op, true) + if checkResponse(w, op.Op(), resp) { + sendStatusCidResponse(w, resp) + } + } +} + +func parseCidOrError(w http.ResponseWriter, r *http.Request) *cid.Cid { + vars := mux.Vars(r) + hash := vars["hash"] + c, err := cid.Decode(hash) + if err != nil { + sendErrorResponse(w, 400, "error decoding Cid: "+err.Error()) + return nil + } + return c +} + // checkResponse does basic checking on an RPCResponse. It takes care of // using the http.ResponseWriter to send // an error if the RPCResponse contains one. It also checks that the RPC @@ -293,8 +343,12 @@ func checkResponse(w http.ResponseWriter, op RPCOp, resp RPCResponse) bool { switch op { case PinRPC: // Pin/Unpin only return errors case UnpinRPC: - case PinListRPC: + case StatusRPC, LocalSyncRPC, GlobalSyncRPC: _, ok = resp.Data.([]Pin) + case StatusCidRPC, LocalSyncCidRPC, GlobalSyncCidRPC: + _, ok = resp.Data.(Pin) + case PinListRPC: + _, ok = resp.Data.([]*cid.Cid) case IPFSPinRPC: case IPFSUnpinRPC: case VersionRPC: @@ -334,3 +388,24 @@ func sendErrorResponse(w http.ResponseWriter, code int, msg string) { logger.Errorf("sending error response: %d: %s", code, msg) sendJSONResponse(w, code, errorResp) } + +func sendStatusResponse(w http.ResponseWriter, resp RPCResponse) { + data := resp.Data.([]Pin) + pins := make(statusResp, 0, len(data)) + for _, d := range data { + pins = append(pins, statusCidResp{ + Cid: d.Cid.String(), + Status: d.Status.String(), + }) + } + sendJSONResponse(w, 200, pins) +} + +func sendStatusCidResponse(w http.ResponseWriter, resp RPCResponse) { + data := resp.Data.(Pin) + pin := statusCidResp{ + Cid: data.Cid.String(), + Status: data.Status.String(), + } + sendJSONResponse(w, 200, pin) +} diff --git a/api_test.go b/api_test.go index e74b1790..836186bd 100644 --- a/api_test.go +++ b/api_test.go @@ -8,8 +8,8 @@ import ( "net/http" "testing" - cid "github.com/ipfs/go-cid" - peer "github.com/libp2p/go-libp2p-peer" + cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" + peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) var ( @@ -19,7 +19,7 @@ var ( func testClusterApi(t *testing.T) *ClusterHTTPAPI { //logging.SetDebugLogging() cfg := testingConfig() - api, err := NewHTTPClusterAPI(cfg) + api, err := NewHTTPAPI(cfg) // No keep alive! Otherwise tests hang with // connections re-used from previous tests api.server.SetKeepAlivesEnabled(false) @@ -166,6 +166,24 @@ func TestUnpinEndpoint(t *testing.T) { } func TestPinListEndpoint(t *testing.T) { + c, _ := cid.Decode(testCid) + c2, _ := cid.Decode(testCid2) + c3, _ := cid.Decode(testCid3) + api := testClusterApi(t) + defer api.Shutdown() + pList := []*cid.Cid{ + c, c2, c3, c, c2, + } + + simulateAnswer(api.RpcChan(), pList, nil) + var resp []*cid.Cid + makeGet(t, "/pins", &resp) + if len(resp) != 5 { + t.Error("unexpected pin list: ", resp) + } +} + +func TestStatusEndpoint(t *testing.T) { c, _ := cid.Decode(testCid) c2, _ := cid.Decode(testCid2) c3, _ := cid.Decode(testCid3) @@ -195,9 +213,9 @@ func TestPinListEndpoint(t *testing.T) { } simulateAnswer(api.RpcChan(), pList, nil) - var resp pinListResp - makeGet(t, "/pins", &resp) + var resp statusResp + makeGet(t, "/status", &resp) if len(resp) != 5 { - t.Error("unexpected pinListResp: ", resp) + t.Error("unexpected statusResp: ", resp) } } diff --git a/cluster.go b/cluster.go index 1672f537..cf13b515 100644 --- a/cluster.go +++ b/cluster.go @@ -8,15 +8,15 @@ import ( "strings" "sync" - crypto "github.com/libp2p/go-libp2p-crypto" - host "github.com/libp2p/go-libp2p-host" - peer "github.com/libp2p/go-libp2p-peer" - peerstore "github.com/libp2p/go-libp2p-peerstore" - swarm "github.com/libp2p/go-libp2p-swarm" - basichost "github.com/libp2p/go-libp2p/p2p/host/basic" - multiaddr "github.com/multiformats/go-multiaddr" + host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host" + multiaddr "gx/ipfs/QmUAQaWbKxGCUTuoQVvvicbQNZ9APF5pDGWyAZSe93AtKH/go-multiaddr" + swarm "gx/ipfs/QmWfxnAiQ5TnnCgiX9ikVUKFNHRgGhbgKdx5DoKPELD7P4/go-libp2p-swarm" + basichost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/basic" + peerstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore" + peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" + crypto "gx/ipfs/QmfWDLQjGjVe4fr5CoztYW2DYYjRysMJrFe1RCsXLPTf46/go-libp2p-crypto" - cid "github.com/ipfs/go-cid" + cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" ) // Cluster is the main IPFS cluster component. It provides @@ -24,32 +24,34 @@ import ( type Cluster struct { ctx context.Context - config *ClusterConfig + config *Config host host.Host - consensus *ClusterConsensus - api ClusterAPI + consensus *Consensus + api API ipfs IPFSConnector - state ClusterState + state State tracker PinTracker + rpcCh chan RPC + shutdownLock sync.Mutex shutdown bool shutdownCh chan struct{} wg sync.WaitGroup } -// NewCluster builds a ready-to-start IPFS Cluster. It takes a ClusterAPI, -// an IPFSConnector and a ClusterState as parameters, allowing the user, +// NewCluster builds a ready-to-start IPFS Cluster. It takes a API, +// an IPFSConnector and a State as parameters, allowing the user, // to provide custom implementations of these components. -func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state ClusterState, tracker PinTracker) (*Cluster, error) { +func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker PinTracker) (*Cluster, error) { ctx := context.Background() host, err := makeHost(ctx, cfg) if err != nil { return nil, err } - consensus, err := NewClusterConsensus(cfg, host, state) + consensus, err := NewConsensus(cfg, host, state) if err != nil { logger.Errorf("error creating consensus: %s", err) return nil, err @@ -64,14 +66,13 @@ func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state Cl ipfs: ipfs, state: state, tracker: tracker, + rpcCh: make(chan RPC), shutdownCh: make(chan struct{}), } logger.Info("starting IPFS Cluster") cluster.run() - logger.Info("performing State synchronization") - cluster.Sync() return cluster, nil } @@ -107,17 +108,77 @@ func (c *Cluster) Shutdown() error { return nil } -func (c *Cluster) Sync() error { +// LocalSync makes sure that the current state of the Cluster matches +// and the desired IPFS daemon state are aligned. It makes sure that +// IPFS is pinning content that should be pinned locally, and not pinning +// other content. It will also try to recover any failed pin or unpin +// operations by retrigerring them. +func (c *Cluster) LocalSync() ([]Pin, error) { cState, err := c.consensus.State() if err != nil { - return err + return nil, err } changed := c.tracker.SyncState(cState) for _, p := range changed { logger.Debugf("recovering %s", p.Cid) - c.tracker.Recover(p.Cid) + err = c.tracker.Recover(p.Cid) + if err != nil { + logger.Errorf("Error recovering %s: %s", p.Cid, err) + return nil, err + } } - return nil + return c.tracker.ListPins(), nil +} + +// LocalSyncCid makes sure that the current state of the cluster +// and the desired IPFS daemon state are aligned for a given Cid. It +// makes sure that IPFS is pinning content that should be pinned locally, +// and not pinning other content. It will also try to recover any failed +// pin or unpin operations by retriggering them. +func (c *Cluster) LocalSyncCid(h *cid.Cid) (Pin, error) { + changed := c.tracker.Sync(h) + if changed { + err := c.tracker.Recover(h) + if err != nil { + logger.Errorf("Error recovering %s: %s", h, err) + return Pin{}, err + } + } + return c.tracker.GetPin(h), nil +} + +// GlobalSync triggers Sync() operations in all members of the Cluster. +func (c *Cluster) GlobalSync() ([]Pin, error) { + return c.Status(), nil +} + +// GlobalSunc triggers a Sync() operation for a given Cid in all members +// of the Cluster. +func (c *Cluster) GlobalSyncCid(h *cid.Cid) (Pin, error) { + return c.StatusCid(h), nil +} + +// Status returns the last known status for all Pins tracked by Cluster. +func (c *Cluster) Status() []Pin { + // TODO: Global + return c.tracker.ListPins() +} + +// StatusCid returns the last known status for a given Cid +func (c *Cluster) StatusCid(h *cid.Cid) Pin { + // TODO: Global + return c.tracker.GetPin(h) +} + +// Pins returns the list of Cids managed by Cluster and which are part +// of the current global state. This is the source of truth as to which +// pins are managed, but does not indicate if the item is successfully pinned. +func (c *Cluster) Pins() []*cid.Cid { + cState, err := c.consensus.State() + if err != nil { + return []*cid.Cid{} + } + return cState.ListPins() } // Pin makes the cluster Pin a Cid. This implies adding the Cid @@ -177,41 +238,39 @@ func (c *Cluster) run() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() c.ctx = ctx - ipfsCh := c.ipfs.RpcChan() - consensusCh := c.consensus.RpcChan() - apiCh := c.api.RpcChan() - trackerCh := c.tracker.RpcChan() - var op ClusterRPC + var op RPC for { select { - case op = <-ipfsCh: + case op = <-c.ipfs.RpcChan(): goto HANDLEOP - case op = <-consensusCh: + case op = <-c.consensus.RpcChan(): goto HANDLEOP - case op = <-apiCh: + case op = <-c.api.RpcChan(): goto HANDLEOP - case op = <-trackerCh: + case op = <-c.tracker.RpcChan(): + goto HANDLEOP + case op = <-c.rpcCh: goto HANDLEOP case <-c.shutdownCh: return } HANDLEOP: switch op.(type) { - case *CidClusterRPC: - crpc := op.(*CidClusterRPC) - go c.handleCidRPC(crpc) - case *GenericClusterRPC: - grpc := op.(*GenericClusterRPC) - go c.handleGenericRPC(grpc) + case *CidRPC: + crpc := op.(*CidRPC) + go c.handleCidNewRPC(crpc) + case *GenericRPC: + grpc := op.(*GenericRPC) + go c.handleGenericNewRPC(grpc) default: - logger.Error("unknown ClusterRPC type") + logger.Error("unknown RPC type") } } }() } -func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) { +func (c *Cluster) handleGenericNewRPC(grpc *GenericRPC) { var data interface{} = nil var err error = nil switch grpc.Op() { @@ -220,11 +279,15 @@ func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) { case MemberListRPC: data = c.Members() case PinListRPC: - data = c.tracker.ListPins() - case SyncRPC: - err = c.Sync() + data = c.Pins() + case LocalSyncRPC: + data, err = c.LocalSync() + case GlobalSyncRPC: + data, err = c.GlobalSync() + case StatusRPC: + data = c.Status() case RollbackRPC: - state, ok := grpc.Argument.(ClusterState) + state, ok := grpc.Argument.(State) if !ok { err = errors.New("bad RollbackRPC type") break @@ -233,13 +296,13 @@ func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) { case LeaderRPC: // Leader RPC is a RPC that needs to be run // by the Consensus Leader. Arguments is a wrapped RPC. - rpc, ok := grpc.Argument.(*ClusterRPC) + rpc, ok := grpc.Argument.(*RPC) if !ok { err = errors.New("bad LeaderRPC type") } - data, err = c.leaderRPC(rpc) + data, err = c.leaderNewRPC(rpc) default: - logger.Error("unknown operation for GenericClusterRPC. Ignoring.") + logger.Error("unknown operation for GenericRPC. Ignoring.") } resp := RPCResponse{ @@ -252,7 +315,7 @@ func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) { // handleOp takes care of running the necessary action for a // clusterRPC request and sending the response. -func (c *Cluster) handleCidRPC(crpc *CidClusterRPC) { +func (c *Cluster) handleCidNewRPC(crpc *CidRPC) { var data interface{} = nil var err error = nil switch crpc.Op() { @@ -278,8 +341,14 @@ func (c *Cluster) handleCidRPC(crpc *CidClusterRPC) { } case IPFSIsPinnedRPC: data, err = c.ipfs.IsPinned(crpc.CID) + case StatusCidRPC: + data = c.StatusCid(crpc.CID) + case LocalSyncCidRPC: + data, err = c.LocalSyncCid(crpc.CID) + case GlobalSyncCidRPC: + data, err = c.GlobalSyncCid(crpc.CID) default: - logger.Error("unknown operation for CidClusterRPC. Ignoring.") + logger.Error("unknown operation for CidRPC. Ignoring.") } resp := RPCResponse{ @@ -291,12 +360,12 @@ func (c *Cluster) handleCidRPC(crpc *CidClusterRPC) { } // This uses libp2p to contact the cluster leader and ask him to do something -func (c *Cluster) leaderRPC(rpc *ClusterRPC) (interface{}, error) { +func (c *Cluster) leaderNewRPC(rpc *RPC) (interface{}, error) { return nil, errors.New("not implemented yet") } // makeHost makes a libp2p-host -func makeHost(ctx context.Context, cfg *ClusterConfig) (host.Host, error) { +func makeHost(ctx context.Context, cfg *Config) (host.Host, error) { ps := peerstore.NewPeerstore() peerID, err := peer.IDB58Decode(cfg.ID) if err != nil { diff --git a/cluster_test.go b/cluster_test.go index 01769c0e..d1045ac5 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -5,11 +5,11 @@ import ( "testing" "time" - cid "github.com/ipfs/go-cid" + cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" ) type mockComponent struct { - rpcCh chan ClusterRPC + rpcCh chan RPC returnError bool } @@ -17,7 +17,7 @@ func (c *mockComponent) Shutdown() error { return nil } -func (c *mockComponent) RpcChan() <-chan ClusterRPC { +func (c *mockComponent) RpcChan() <-chan RPC { return c.rpcCh } @@ -52,9 +52,9 @@ func (ipfs *mockConnector) IsPinned(c *cid.Cid) (bool, error) { func testingCluster(t *testing.T) (*Cluster, *mockApi, *mockConnector, *MapState, *MapPinTracker) { api := &mockApi{} - api.rpcCh = make(chan ClusterRPC, 2) + api.rpcCh = make(chan RPC, 2) ipfs := &mockConnector{} - ipfs.rpcCh = make(chan ClusterRPC, 2) + ipfs.rpcCh = make(chan RPC, 2) cfg := testingConfig() st := NewMapState() tracker := NewMapPinTracker() @@ -87,11 +87,11 @@ func testClusterShutdown(t *testing.T) { } } -func TestClusterSync(t *testing.T) { +func TestClusterLocalSync(t *testing.T) { cl, _, _, st, _ := testingCluster(t) defer cleanRaft() defer cl.Shutdown() - err := cl.Sync() + _, err := cl.LocalSync() if err == nil { t.Error("expected an error as there is no state to sync") } @@ -102,7 +102,7 @@ func TestClusterSync(t *testing.T) { t.Fatal("pin should have worked:", err) } - err = cl.Sync() + _, err = cl.LocalSync() if err != nil { t.Fatal("sync after pinning should have worked:", err) } @@ -110,7 +110,7 @@ func TestClusterSync(t *testing.T) { // Modify state on the side so the sync does not // happen on an empty slide st.RmPin(c) - err = cl.Sync() + _, err = cl.LocalSync() if err != nil { t.Fatal("sync with recover should have worked:", err) } @@ -185,7 +185,7 @@ func TestClusterRun(t *testing.T) { // Generic RPC for i := 0; i < NoopRPC; i++ { - rpc := RPC(RPCOp(i), "something") + rpc := NewRPC(RPCOp(i), "something") switch i % 4 { case 0: ipfs.rpcCh <- rpc @@ -208,7 +208,7 @@ func TestClusterRun(t *testing.T) { // Cid RPC c, _ := cid.Decode(testCid) for i := 0; i < NoopRPC; i++ { - rpc := RPC(RPCOp(i), c) + rpc := NewRPC(RPCOp(i), c) switch i % 4 { case 0: ipfs.rpcCh <- rpc diff --git a/config.go b/config.go index 8088ff0a..e2271d7d 100644 --- a/config.go +++ b/config.go @@ -5,11 +5,11 @@ import ( "io/ioutil" ) -type ClusterConfig struct { +type Config struct { IPFSHost string `json:"ipfs_host"` IPFSPort int `json:"ipfs_port"` - ClusterAPIListenAddr string `json:"cluster_api_listen_addr"` - ClusterAPIListenPort int `json:"cluster_api_listen_port"` + APIListenAddr string `json:"cluster_api_listen_addr"` + APIListenPort int `json:"cluster_api_listen_port"` IPFSAPIListenAddr string `json:"ipfs_api_listen_addr"` IPFSAPIListenPort int `json:"ipfs_api_listen_port"` ConsensusListenAddr string `json:"consensus_listen_addr"` @@ -20,8 +20,8 @@ type ClusterConfig struct { RaftFolder string `json:"raft_folder"` } -func LoadConfig(path string) (*ClusterConfig, error) { - config := &ClusterConfig{} +func LoadConfig(path string) (*Config, error) { + config := &Config{} file, err := ioutil.ReadFile(path) if err != nil { return nil, err diff --git a/config_test.go b/config_test.go index 4db963f9..0cb1ba0d 100644 --- a/config_test.go +++ b/config_test.go @@ -1,15 +1,15 @@ package ipfscluster -func testingConfig() *ClusterConfig { - cfg := &ClusterConfig{ +func testingConfig() *Config { + cfg := &Config{ ConsensusListenPort: 10000, ConsensusListenAddr: "127.0.0.1", ID: "QmUfSFm12eYCaRdypg48m8RqkXfLW7A2ZeGZb2skeHHDGA", PrivateKey: "CAASqAkwggSkAgEAAoIBAQDpT16IRF6bb9tHsCbQ7M+nb2aI8sz8xyt8PoAWM42ki+SNoESIxKb4UhFxixKvtEdGxNE6aUUVc8kFk6wTStJ/X3IGiMetwkXiFiUxabUF/8A6SyvnSVDm+wFuavugpVrZikjLcfrf2xOVgnG3deQQvd/qbAv14jTwMFl+T+8d/cXBo8Mn/leLZCQun/EJEnkXP5MjgNI8XcWUE4NnH3E0ESSm6Pkm8MhMDZ2fmzNgqEyJ0GVinNgSml3Pyha3PBSj5LRczLip/ie4QkKx5OHvX2L3sNv/JIUHse5HSbjZ1c/4oGCYMVTYCykWiczrxBUOlcr8RwnZLOm4n2bCt5ZhAgMBAAECggEAVkePwfzmr7zR7tTpxeGNeXHtDUAdJm3RWwUSASPXgb5qKyXVsm5nAPX4lXDE3E1i/nzSkzNS5PgIoxNVU10cMxZs6JW0okFx7oYaAwgAddN6lxQtjD7EuGaixN6zZ1k/G6vT98iS6i3uNCAlRZ9HVBmjsOF8GtYolZqLvfZ5izEVFlLVq/BCs7Y5OrDrbGmn3XupfitVWYExV0BrHpobDjsx2fYdTZkmPpSSvXNcm4Iq2AXVQzoqAfGo7+qsuLCZtVlyTfVKQjMvE2ffzN1dQunxixOvev/fz4WSjGnRpC6QLn6Oqps9+VxQKqKuXXqUJC+U45DuvA94Of9MvZfAAQKBgQD7xmXueXRBMr2+0WftybAV024ap0cXFrCAu+KWC1SUddCfkiV7e5w+kRJx6RH1cg4cyyCL8yhHZ99Z5V0Mxa/b/usuHMadXPyX5szVI7dOGgIC9q8IijN7B7GMFAXc8+qC7kivehJzjQghpRRAqvRzjDls4gmbNPhbH1jUiU124QKBgQDtOaW5/fOEtOq0yWbDLkLdjImct6oKMLhENL6yeIKjMYgifzHb2adk7rWG3qcMrdgaFtDVfqv8UmMEkzk7bSkovMVj3SkLzMz84ii1SkSfyaCXgt/UOzDkqAUYB0cXMppYA7jxHa2OY8oEHdBgmyJXdLdzJxCp851AoTlRUSePgQKBgQCQgKgUHOUaXnMEx88sbOuBO14gMg3dNIqM+Ejt8QbURmI8k3arzqA4UK8Tbb9+7b0nzXWanS5q/TT1tWyYXgW28DIuvxlHTA01aaP6WItmagrphIelERzG6f1+9ib/T4czKmvROvDIHROjq8lZ7ERs5Pg4g+sbh2VbdzxWj49EQQKBgFEna36ZVfmMOs7mJ3WWGeHY9ira2hzqVd9fe+1qNKbHhx7mDJR9fTqWPxuIh/Vac5dZPtAKqaOEO8OQ6f9edLou+ggT3LrgsS/B3tNGOPvA6mNqrk/Yf/15TWTO+I8DDLIXc+lokbsogC+wU1z5NWJd13RZZOX/JUi63vTmonYBAoGBAIpglLCH2sPXfmguO6p8QcQcv4RjAU1c0GP4P5PNN3Wzo0ItydVd2LHJb6MdmL6ypeiwNklzPFwTeRlKTPmVxJ+QPg1ct/3tAURN/D40GYw9ojDhqmdSl4HW4d6gHS2lYzSFeU5jkG49y5nirOOoEgHy95wghkh6BfpwHujYJGw4", IPFSAPIListenAddr: "127.0.0.1", IPFSAPIListenPort: 10001, - ClusterAPIListenAddr: "127.0.0.1", - ClusterAPIListenPort: 10002, + APIListenAddr: "127.0.0.1", + APIListenPort: 10002, RaftFolder: "./raftFolderFromTests", } diff --git a/consensus.go b/consensus.go index 9985e86e..476cce31 100644 --- a/consensus.go +++ b/consensus.go @@ -7,12 +7,12 @@ import ( "sync" "time" - consensus "github.com/libp2p/go-libp2p-consensus" - host "github.com/libp2p/go-libp2p-host" - peer "github.com/libp2p/go-libp2p-peer" - libp2praft "github.com/libp2p/go-libp2p-raft" + host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host" + consensus "gx/ipfs/QmZ88KbrvZMJpXaNwAGffswcYKz8EbeafzAFGMCA6MEZKt/go-libp2p-consensus" + libp2praft "gx/ipfs/QmaofA6ApgPQm8yRojC77dQbVUatYMihdyQjB7VsAqrks1/go-libp2p-raft" + peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" - cid "github.com/ipfs/go-cid" + cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" ) const ( @@ -41,12 +41,12 @@ type clusterLogOp struct { Cid string Type clusterLogOpType ctx context.Context - rpcCh chan ClusterRPC + rpcCh chan RPC } -// ApplyTo applies the operation to the ClusterState +// ApplyTo applies the operation to the State func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error) { - state, ok := cstate.(ClusterState) + state, ok := cstate.(State) var err error if !ok { // Should never be here @@ -69,14 +69,14 @@ func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error) goto ROLLBACK } // Async, we let the PinTracker take care of any problems - MakeRPC(ctx, op.rpcCh, RPC(IPFSPinRPC, c), false) + MakeRPC(ctx, op.rpcCh, NewRPC(IPFSPinRPC, c), false) case LogOpUnpin: err := state.RmPin(c) if err != nil { goto ROLLBACK } // Async, we let the PinTracker take care of any problems - MakeRPC(ctx, op.rpcCh, RPC(IPFSUnpinRPC, c), false) + MakeRPC(ctx, op.rpcCh, NewRPC(IPFSUnpinRPC, c), false) default: logger.Error("unknown clusterLogOp type. Ignoring") } @@ -87,8 +87,8 @@ ROLLBACK: // and therefore we need to request a rollback to the // cluster to the previous state. This operation can only be performed // by the cluster leader. - rllbckRPC := RPC(RollbackRPC, state) - leadrRPC := RPC(LeaderRPC, rllbckRPC) + rllbckRPC := NewRPC(RollbackRPC, state) + leadrRPC := NewRPC(LeaderRPC, rllbckRPC) MakeRPC(ctx, op.rpcCh, leadrRPC, false) logger.Errorf("an error ocurred when applying Op to state: %s", err) logger.Error("a rollback was requested") @@ -96,16 +96,16 @@ ROLLBACK: return nil, errors.New("a rollback was requested. Reason: " + err.Error()) } -// ClusterConsensus handles the work of keeping a shared-state between +// Consensus handles the work of keeping a shared-state between // the members of an IPFS Cluster, as well as modifying that state and // applying any updates in a thread-safe manner. -type ClusterConsensus struct { +type Consensus struct { ctx context.Context consensus consensus.OpLogConsensus actor consensus.Actor baseOp *clusterLogOp - rpcCh chan ClusterRPC + rpcCh chan RPC p2pRaft *libp2pRaftWrap @@ -115,13 +115,13 @@ type ClusterConsensus struct { wg sync.WaitGroup } -// NewClusterConsensus builds a new ClusterConsensus component. The state +// NewConsensus builds a new ClusterConsensus component. The state // is used to initialize the Consensus system, so any information in it // is discarded. -func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState) (*ClusterConsensus, error) { +func NewConsensus(cfg *Config, host host.Host, state State) (*Consensus, error) { logger.Info("starting Consensus component") ctx := context.Background() - rpcCh := make(chan ClusterRPC, RPCMaxQueue) + rpcCh := make(chan RPC, RPCMaxQueue) op := &clusterLogOp{ ctx: context.Background(), rpcCh: rpcCh, @@ -133,7 +133,7 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState) con.SetActor(actor) - cc := &ClusterConsensus{ + cc := &Consensus{ ctx: ctx, consensus: con, baseOp: op, @@ -147,7 +147,7 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState) return cc, nil } -func (cc *ClusterConsensus) run() { +func (cc *Consensus) run() { cc.wg.Add(1) go func() { defer cc.wg.Done() @@ -178,9 +178,9 @@ func (cc *ClusterConsensus) run() { for !quitLoop { select { case <-timer.C: // Make a first sync - MakeRPC(ctx, cc.rpcCh, RPC(SyncRPC, nil), false) + MakeRPC(ctx, cc.rpcCh, NewRPC(LocalSyncRPC, nil), false) case <-upToDate: - MakeRPC(ctx, cc.rpcCh, RPC(SyncRPC, nil), false) + MakeRPC(ctx, cc.rpcCh, NewRPC(LocalSyncRPC, nil), false) quitLoop = true } } @@ -192,7 +192,7 @@ func (cc *ClusterConsensus) run() { // Shutdown stops the component so it will not process any // more updates. The underlying consensus is permanently // shutdown, along with the libp2p transport. -func (cc *ClusterConsensus) Shutdown() error { +func (cc *Consensus) Shutdown() error { cc.shutdownLock.Lock() defer cc.shutdownLock.Unlock() @@ -240,11 +240,11 @@ func (cc *ClusterConsensus) Shutdown() error { // RpcChan can be used by Cluster to read any // requests from this component -func (cc *ClusterConsensus) RpcChan() <-chan ClusterRPC { +func (cc *Consensus) RpcChan() <-chan RPC { return cc.rpcCh } -func (cc *ClusterConsensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp { +func (cc *Consensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp { return &clusterLogOp{ Cid: c.String(), Type: t, @@ -252,7 +252,7 @@ func (cc *ClusterConsensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp { } // AddPin submits a Cid to the shared state of the cluster. -func (cc *ClusterConsensus) AddPin(c *cid.Cid) error { +func (cc *Consensus) AddPin(c *cid.Cid) error { // Create pin operation for the log op := cc.op(c, LogOpPin) _, err := cc.consensus.CommitOp(op) @@ -265,7 +265,7 @@ func (cc *ClusterConsensus) AddPin(c *cid.Cid) error { } // RmPin removes a Cid from the shared state of the cluster. -func (cc *ClusterConsensus) RmPin(c *cid.Cid) error { +func (cc *Consensus) RmPin(c *cid.Cid) error { // Create unpin operation for the log op := cc.op(c, LogOpUnpin) _, err := cc.consensus.CommitOp(op) @@ -276,12 +276,12 @@ func (cc *ClusterConsensus) RmPin(c *cid.Cid) error { return nil } -func (cc *ClusterConsensus) State() (ClusterState, error) { +func (cc *Consensus) State() (State, error) { st, err := cc.consensus.GetLogHead() if err != nil { return nil, err } - state, ok := st.(ClusterState) + state, ok := st.(State) if !ok { return nil, errors.New("wrong state type") } @@ -290,13 +290,13 @@ func (cc *ClusterConsensus) State() (ClusterState, error) { // Leader() returns the peerID of the Leader of the // cluster. -func (cc *ClusterConsensus) Leader() peer.ID { +func (cc *Consensus) Leader() peer.ID { // FIXME: Hashicorp Raft specific raftactor := cc.actor.(*libp2praft.Actor) return raftactor.Leader() } // TODO -func (cc *ClusterConsensus) Rollback(state ClusterState) error { +func (cc *Consensus) Rollback(state State) error { return cc.consensus.Rollback(state) } diff --git a/consensus_test.go b/consensus_test.go index fb6d51b0..63abdb0b 100644 --- a/consensus_test.go +++ b/consensus_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - cid "github.com/ipfs/go-cid" + cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" ) func TestApplyToPin(t *testing.T) { @@ -14,7 +14,7 @@ func TestApplyToPin(t *testing.T) { Cid: testCid, Type: LogOpPin, ctx: context.Background(), - rpcCh: make(chan ClusterRPC, 1), + rpcCh: make(chan RPC, 1), } st := NewMapState() @@ -30,7 +30,7 @@ func TestApplyToUnpin(t *testing.T) { Cid: testCid, Type: LogOpUnpin, ctx: context.Background(), - rpcCh: make(chan ClusterRPC, 1), + rpcCh: make(chan RPC, 1), } st := NewMapState() @@ -54,7 +54,7 @@ func TestApplyToBadState(t *testing.T) { Cid: testCid, Type: LogOpUnpin, ctx: context.Background(), - rpcCh: make(chan ClusterRPC, 1), + rpcCh: make(chan RPC, 1), } var st interface{} @@ -72,7 +72,7 @@ func TestApplyToBadCid(t *testing.T) { Cid: "agadfaegf", Type: LogOpPin, ctx: context.Background(), - rpcCh: make(chan ClusterRPC, 1), + rpcCh: make(chan RPC, 1), } st := NewMapState() @@ -83,7 +83,7 @@ func cleanRaft() { os.RemoveAll(testingConfig().RaftFolder) } -func testingClusterConsensus(t *testing.T) *ClusterConsensus { +func testingConsensus(t *testing.T) *Consensus { //logging.SetDebugLogging() cfg := testingConfig() ctx := context.Background() @@ -92,34 +92,34 @@ func testingClusterConsensus(t *testing.T) *ClusterConsensus { t.Fatal("cannot create host:", err) } st := NewMapState() - cc, err := NewClusterConsensus(cfg, h, st) + cc, err := NewConsensus(cfg, h, st) if err != nil { - t.Fatal("cannot create ClusterConsensus:", err) + t.Fatal("cannot create Consensus:", err) } // Oxygen for Raft to declare leader - time.Sleep(1 * time.Second) + time.Sleep(2 * time.Second) return cc } -func TestShutdownClusterConsensus(t *testing.T) { +func TestShutdownConsensus(t *testing.T) { // Bring it up twice to make sure shutdown cleans up properly // but also to make sure raft comes up ok when re-initialized defer cleanRaft() - cc := testingClusterConsensus(t) + cc := testingConsensus(t) err := cc.Shutdown() if err != nil { - t.Fatal("ClusterConsensus cannot shutdown:", err) + t.Fatal("Consensus cannot shutdown:", err) } cc.Shutdown() - cc = testingClusterConsensus(t) + cc = testingConsensus(t) err = cc.Shutdown() if err != nil { - t.Fatal("ClusterConsensus cannot shutdown:", err) + t.Fatal("Consensus cannot shutdown:", err) } } func TestConsensusPin(t *testing.T) { - cc := testingClusterConsensus(t) + cc := testingConsensus(t) defer cleanRaft() // Remember defer runs in LIFO order defer cc.Shutdown() @@ -132,7 +132,7 @@ func TestConsensusPin(t *testing.T) { time.Sleep(250 * time.Millisecond) st, err := cc.State() if err != nil { - t.Error("error getting state:", err) + t.Fatal("error getting state:", err) } pins := st.ListPins() @@ -142,7 +142,7 @@ func TestConsensusPin(t *testing.T) { } func TestConsensusUnpin(t *testing.T) { - cc := testingClusterConsensus(t) + cc := testingConsensus(t) defer cleanRaft() defer cc.Shutdown() @@ -154,7 +154,7 @@ func TestConsensusUnpin(t *testing.T) { } func TestConsensusLeader(t *testing.T) { - cc := testingClusterConsensus(t) + cc := testingConsensus(t) cfg := testingConfig() pId := cfg.ID defer cleanRaft() diff --git a/ipfs-cluster/main.go b/ipfs-cluster/main.go index 050d1777..d783ff99 100644 --- a/ipfs-cluster/main.go +++ b/ipfs-cluster/main.go @@ -7,7 +7,7 @@ import ( "os/user" "path/filepath" - logging "github.com/ipfs/go-log" + logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" ipfscluster "github.com/ipfs/ipfs-cluster" ) @@ -26,7 +26,7 @@ func main() { fmt.Println(err) return } - api, err := ipfscluster.NewHTTPClusterAPI(clusterCfg) + api, err := ipfscluster.NewHTTPAPI(clusterCfg) if err != nil { fmt.Println(err) return diff --git a/ipfs_cluster.go b/ipfs_cluster.go index f85079e7..2656d60c 100644 --- a/ipfs_cluster.go +++ b/ipfs_cluster.go @@ -12,9 +12,9 @@ import ( "errors" "time" - cid "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" - peer "github.com/libp2p/go-libp2p-peer" + logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" + peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) var logger = logging.Logger("ipfs-cluster") @@ -22,28 +22,28 @@ var logger = logging.Logger("ipfs-cluster") // Current Cluster version. const Version = "0.0.1" -// RPCMaxQueue can be used to set the size of the ClusterRPC channels, +// RPCMaxQueue can be used to set the size of the RPC channels, // which will start blocking on send after reaching this number. var RPCMaxQueue = 128 // MakeRPCRetryInterval specifies how long to wait before retrying -// to put a ClusterRPC request in the channel in MakeRPC(). +// to put a RPC request in the channel in MakeRPC(). var MakeRPCRetryInterval time.Duration = 1 * time.Second // ClusterComponent represents a piece of ipfscluster. Cluster components // usually run their own goroutines (a http server for example). They -// communicate with Cluster via a channel carrying ClusterRPC operations. +// communicate with Cluster via a channel carrying RPC operations. // These operations request tasks from other components. This way all components // are independent from each other and can be swapped as long as they maintain // RPC compatibility with Cluster. type ClusterComponent interface { Shutdown() error - RpcChan() <-chan ClusterRPC + RpcChan() <-chan RPC } -// ClusterAPI is a component which offers an API for Cluster. This is +// API is a component which offers an API for Cluster. This is // a base component. -type ClusterAPI interface { +type API interface { ClusterComponent } @@ -64,13 +64,13 @@ type Peered interface { SetPeers(peers []peer.ID) } -// ClusterState represents the shared state of the cluster and it -// is used by the ClusterConsensus component to keep track of +// State represents the shared state of the cluster and it +// is used by the Consensus component to keep track of // objects which objects are pinned. This component should be thread safe. -type ClusterState interface { - // AddPin adds a pin to the ClusterState +type State interface { + // AddPin adds a pin to the State AddPin(*cid.Cid) error - // RmPin removes a pin from the ClusterState + // RmPin removes a pin from the State RmPin(*cid.Cid) error // ListPins lists all the pins in the state ListPins() []*cid.Cid @@ -107,17 +107,17 @@ type PinTracker interface { SyncAll() []Pin // SyncState makes sure that the tracked Pins matches those in the // cluster state and runs SyncAll(). It returns a list of changed Pins. - SyncState(ClusterState) []Pin + SyncState(State) []Pin } -// MakeRPC sends a ClusterRPC object over a channel and optionally waits for a -// Response on the ClusterRPC.ResponseCh channel. It can be used by any +// MakeRPC sends a RPC object over a channel and optionally waits for a +// Response on the RPC.ResponseCh channel. It can be used by any // ClusterComponent to simplify making RPC requests to Cluster. // The ctx parameter must be a cancellable context, and can be used to // timeout requests. -// If the message cannot be placed in the ClusterRPC channel, retries will be +// If the message cannot be placed in the RPC channel, retries will be // issued every MakeRPCRetryInterval. -func MakeRPC(ctx context.Context, rpcCh chan ClusterRPC, r ClusterRPC, waitForResponse bool) RPCResponse { +func MakeRPC(ctx context.Context, rpcCh chan RPC, r RPC, waitForResponse bool) RPCResponse { logger.Debugf("sending RPC %d", r.Op()) exitLoop := false for !exitLoop { diff --git a/ipfs_cluster_test.go b/ipfs_cluster_test.go index 885e5976..cd85456f 100644 --- a/ipfs_cluster_test.go +++ b/ipfs_cluster_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - peer "github.com/libp2p/go-libp2p-peer" + peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) var ( @@ -17,8 +17,8 @@ var ( ) func TestMakeRPC(t *testing.T) { - testCh := make(chan ClusterRPC, 1) - testReq := RPC(MemberListRPC, nil) + testCh := make(chan RPC, 1) + testReq := NewRPC(MemberListRPC, nil) testResp := RPCResponse{ Data: "hey", Error: nil, @@ -60,7 +60,7 @@ func TestMakeRPC(t *testing.T) { // Test cancelled while waiting for response context ctx, cancel = context.WithCancel(context.Background()) - testCh = make(chan ClusterRPC, 1) + testCh = make(chan RPC, 1) go func() { resp := MakeRPC(ctx, testCh, testReq, true) if resp.Error == nil || !strings.Contains(resp.Error.Error(), "timed out") { @@ -72,7 +72,7 @@ func TestMakeRPC(t *testing.T) { time.Sleep(1 * time.Second) } -func simulateAnswer(ch <-chan ClusterRPC, answer interface{}, err error) { +func simulateAnswer(ch <-chan RPC, answer interface{}, err error) { go func() { req := <-ch req.ResponseCh() <- RPCResponse{ diff --git a/ipfs_connector.go b/ipfs_connector.go index 2af5dd4a..c06d7173 100644 --- a/ipfs_connector.go +++ b/ipfs_connector.go @@ -12,7 +12,7 @@ import ( "strings" "sync" - cid "github.com/ipfs/go-cid" + cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" ) // IPFSHTTPConnector implements the IPFSConnector interface @@ -31,7 +31,7 @@ type IPFSHTTPConnector struct { listenAddr string listenPort int handlers map[string]func(http.ResponseWriter, *http.Request) - rpcCh chan ClusterRPC + rpcCh chan RPC listener net.Listener server *http.Server @@ -46,7 +46,7 @@ type ipfsError struct { } // NewIPFSHTTPConnector creates the component and leaves it ready to be started -func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) { +func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) { ctx := context.Background() l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.IPFSAPIListenAddr, cfg.IPFSAPIListenPort)) @@ -67,7 +67,7 @@ func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) { listenAddr: cfg.IPFSAPIListenAddr, listenPort: cfg.IPFSAPIListenPort, handlers: make(map[string]func(http.ResponseWriter, *http.Request)), - rpcCh: make(chan ClusterRPC, RPCMaxQueue), + rpcCh: make(chan RPC, RPCMaxQueue), listener: l, server: s, } @@ -138,7 +138,7 @@ func (ipfs *IPFSHTTPConnector) run() { // RpcChan can be used by Cluster to read any // requests from this component. -func (ipfs *IPFSHTTPConnector) RpcChan() <-chan ClusterRPC { +func (ipfs *IPFSHTTPConnector) RpcChan() <-chan RPC { return ipfs.rpcCh } diff --git a/ipfs_connector_test.go b/ipfs_connector_test.go index 5d38ebbe..453c4894 100644 --- a/ipfs_connector_test.go +++ b/ipfs_connector_test.go @@ -9,7 +9,7 @@ import ( "strings" "testing" - cid "github.com/ipfs/go-cid" + cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" ) func testServer(t *testing.T) *httptest.Server { @@ -43,7 +43,7 @@ func testServer(t *testing.T) *httptest.Server { return ts } -func testIPFSConnectorConfig(ts *httptest.Server) *ClusterConfig { +func testIPFSConnectorConfig(ts *httptest.Server) *Config { url, _ := url.Parse(ts.URL) h := strings.Split(url.Host, ":") i, _ := strconv.Atoi(h[1]) diff --git a/raft.go b/raft.go index ffd22ec1..ca156fa4 100644 --- a/raft.go +++ b/raft.go @@ -3,10 +3,10 @@ package ipfscluster import ( "path/filepath" - host "github.com/libp2p/go-libp2p-host" - libp2praft "github.com/libp2p/go-libp2p-raft" + host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host" + libp2praft "gx/ipfs/QmaofA6ApgPQm8yRojC77dQbVUatYMihdyQjB7VsAqrks1/go-libp2p-raft" - peer "github.com/libp2p/go-libp2p-peer" + peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" hashiraft "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" @@ -26,7 +26,7 @@ type libp2pRaftWrap struct { // This function does all heavy the work which is specifically related to // hashicorp's Raft. Other places should just rely on the Consensus interface. -func makeLibp2pRaft(cfg *ClusterConfig, host host.Host, state ClusterState, op *clusterLogOp) (*libp2praft.Consensus, *libp2praft.Actor, *libp2pRaftWrap, error) { +func makeLibp2pRaft(cfg *Config, host host.Host, state State, op *clusterLogOp) (*libp2praft.Consensus, *libp2praft.Actor, *libp2pRaftWrap, error) { logger.Debug("creating libp2p Raft transport") transport, err := libp2praft.NewLibp2pTransportWithHost(host) if err != nil { diff --git a/rpc.go b/rpc.go index 5d62a51b..004a07fe 100644 --- a/rpc.go +++ b/rpc.go @@ -1,8 +1,8 @@ package ipfscluster -import cid "github.com/ipfs/go-cid" +import cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" -// ClusterRPC supported operations. +// RPC supported operations. const ( PinRPC = iota UnpinRPC @@ -14,7 +14,12 @@ const ( MemberListRPC RollbackRPC LeaderRPC - SyncRPC + LocalSyncRPC + LocalSyncCidRPC + GlobalSyncRPC + GlobalSyncCidRPC + StatusRPC + StatusCidRPC NoopRPC ) @@ -22,14 +27,14 @@ const ( // RPCMethod identifies which RPC supported operation we are trying to make type RPCOp int -// ClusterRPC represents an internal RPC operation. It should be implemented +// RPC represents an internal RPC operation. It should be implemented // by all RPC types. -type ClusterRPC interface { +type RPC interface { Op() RPCOp ResponseCh() chan RPCResponse } -// baseRPC implements ClusterRPC and can be included as anonymous +// baseRPC implements RPC and can be included as anonymous // field in other types. type baseRPC struct { method RPCOp @@ -47,39 +52,40 @@ func (brpc *baseRPC) ResponseCh() chan RPCResponse { return brpc.responseCh } -// GenericClusterRPC is a ClusterRPC with generic arguments. -type GenericClusterRPC struct { +// GenericRPC is a ClusterRPC with generic arguments. +type GenericRPC struct { baseRPC Argument interface{} } -// CidClusterRPC is a ClusterRPC whose only argument is a CID. -type CidClusterRPC struct { +// CidRPC is a ClusterRPC whose only argument is a CID. +type CidRPC struct { baseRPC CID *cid.Cid } -// RPC builds a ClusterRPC request. It will create a -// CidClusterRPC if the arg is of type cid.Cid. Otherwise, -// a GenericClusterRPC is returned. -func RPC(m RPCOp, arg interface{}) ClusterRPC { - c, ok := arg.(*cid.Cid) - if ok { // Its a CID - r := new(CidClusterRPC) +// RPC builds a RPC request. It will create a +// CidRPC if the arg is of type cid.Cid. Otherwise, +// a GenericRPC is returned. +func NewRPC(m RPCOp, arg interface{}) RPC { + switch arg.(type) { + case *cid.Cid: + c := arg.(*cid.Cid) + r := new(CidRPC) r.method = m r.CID = c r.responseCh = make(chan RPCResponse) return r + default: + r := new(GenericRPC) + r.method = m + r.Argument = arg + r.responseCh = make(chan RPCResponse) + return r } - // Its not a cid, make a generic - r := new(GenericClusterRPC) - r.method = m - r.Argument = arg - r.responseCh = make(chan RPCResponse) - return r } -// RPCResponse carries the result of a ClusterRPC requested operation and/or +// RPCResponse carries the result of a RPC requested operation and/or // an error to indicate if the operation was successful. type RPCResponse struct { Data interface{} diff --git a/rpc_test.go b/rpc_test.go index b6a4cb14..1a631685 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -3,18 +3,18 @@ package ipfscluster import ( "testing" - cid "github.com/ipfs/go-cid" + cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" ) -func TestRPC(t *testing.T) { +func TestNewRPC(t *testing.T) { c, err := cid.Decode(testCid) if err != nil { t.Fatal(err) } - crpc := RPC(IPFSPinRPC, c) - _, ok := crpc.(*CidClusterRPC) + crpc := NewRPC(IPFSPinRPC, c) + _, ok := crpc.(*CidRPC) if !ok { - t.Error("expected a CidClusterRPC") + t.Error("expected a CidRPC") } if crpc.Op() != IPFSPinRPC { @@ -25,10 +25,10 @@ func TestRPC(t *testing.T) { t.Error("should have made the ResponseCh") } - grpc := RPC(MemberListRPC, 3) - _, ok = grpc.(*GenericClusterRPC) + grpc := NewRPC(MemberListRPC, 3) + _, ok = grpc.(*GenericRPC) if !ok { - t.Error("expected a GenericClusterRPC") + t.Error("expected a GenericRPC") } if grpc.Op() != MemberListRPC { diff --git a/state.go b/state.go index 8becd773..71eea6a3 100644 --- a/state.go +++ b/state.go @@ -3,7 +3,7 @@ package ipfscluster import ( "sync" - cid "github.com/ipfs/go-cid" + cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" ) // MapState is a very simple database to store @@ -12,13 +12,13 @@ type MapState struct { mux sync.RWMutex PinMap map[string]struct{} - rpcCh chan ClusterRPC + rpcCh chan RPC } func NewMapState() *MapState { return &MapState{ PinMap: make(map[string]struct{}), - rpcCh: make(chan ClusterRPC), + rpcCh: make(chan RPC), } } diff --git a/tracker.go b/tracker.go index 65c2dfdc..9ed24933 100644 --- a/tracker.go +++ b/tracker.go @@ -3,14 +3,23 @@ package ipfscluster import ( "context" "sync" + "time" - cid "github.com/ipfs/go-cid" + cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" ) const ( pinEverywhere = -1 ) +// A Pin or Unpin operation will be considered failed +// if the Cid has stayed in Pinning or Unpinning state +// for longer than these values. +var ( + PinningTimeout = 15 * time.Minute + UnpinningTimeout = 10 * time.Second +) + const ( PinError = iota UnpinError @@ -24,17 +33,36 @@ type Pin struct { Cid *cid.Cid PinMode PinMode Status PinStatus + TS time.Time } type PinMode int type PinStatus int +func (st PinStatus) String() string { + switch st { + case PinError: + return "pin_error" + case UnpinError: + return "unpin_error" + case Pinned: + return "pinned" + case Pinning: + return "pinning" + case Unpinning: + return "unpinning" + case Unpinned: + return "unpinned" + } + return "" +} + type MapPinTracker struct { mux sync.Mutex status map[string]Pin ctx context.Context - rpcCh chan ClusterRPC + rpcCh chan RPC shutdownLock sync.Mutex shutdown bool @@ -46,7 +74,7 @@ func NewMapPinTracker() *MapPinTracker { ctx := context.Background() mpt := &MapPinTracker{ status: make(map[string]Pin), - rpcCh: make(chan ClusterRPC, RPCMaxQueue), + rpcCh: make(chan RPC, RPCMaxQueue), ctx: ctx, shutdownCh: make(chan struct{}), } @@ -94,6 +122,7 @@ func (mpt *MapPinTracker) set(c *cid.Cid, s PinStatus) error { Cid: c, PinMode: pinEverywhere, Status: s, + TS: time.Now(), } return nil } @@ -160,7 +189,7 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool { return true } - resp := MakeRPC(ctx, mpt.rpcCh, RPC(IPFSIsPinnedRPC, c), true) + resp := MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSIsPinnedRPC, c), true) if resp.Error != nil { if p.Status == Pinned || p.Status == Pinning { mpt.set(c, PinError) @@ -186,8 +215,11 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool { mpt.set(c, Pinned) return true case Unpinning: - mpt.set(c, UnpinError) // Not sure here - return true + if time.Since(p.TS) > UnpinningTimeout { + mpt.set(c, UnpinError) + return true + } + return false case Unpinned: mpt.set(c, UnpinError) return true @@ -198,8 +230,11 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool { mpt.set(c, PinError) return true case Pinning: - mpt.set(c, PinError) - return true + if time.Since(p.TS) > PinningTimeout { + mpt.set(c, PinError) + return true + } + return false case Unpinning: mpt.set(c, Unpinned) return true @@ -219,10 +254,10 @@ func (mpt *MapPinTracker) Recover(c *cid.Cid) error { ctx, cancel := context.WithCancel(mpt.ctx) defer cancel() if p.Status == PinError { - MakeRPC(ctx, mpt.rpcCh, RPC(IPFSPinRPC, c), false) + MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSPinRPC, c), false) } if p.Status == UnpinError { - MakeRPC(ctx, mpt.rpcCh, RPC(IPFSUnpinRPC, c), false) + MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSUnpinRPC, c), false) } return nil } @@ -239,7 +274,7 @@ func (mpt *MapPinTracker) SyncAll() []Pin { return changedPins } -func (mpt *MapPinTracker) SyncState(cState ClusterState) []Pin { +func (mpt *MapPinTracker) SyncState(cState State) []Pin { clusterPins := cState.ListPins() clusterMap := make(map[string]struct{}) // Make a map for faster lookup @@ -252,7 +287,7 @@ func (mpt *MapPinTracker) SyncState(cState ClusterState) []Pin { var changed []Pin mpt.mux.Lock() - // Collect items in the ClusterState not in the tracker + // Collect items in the State not in the tracker for _, c := range clusterPins { _, ok := mpt.status[c.String()] if !ok { @@ -260,7 +295,7 @@ func (mpt *MapPinTracker) SyncState(cState ClusterState) []Pin { } } - // Collect items in the tracker not in the ClusterState + // Collect items in the tracker not in the State for _, p := range mpt.status { _, ok := clusterMap[p.Cid.String()] if !ok { @@ -293,6 +328,6 @@ func (mpt *MapPinTracker) SyncState(cState ClusterState) []Pin { return changed } -func (mpt *MapPinTracker) RpcChan() <-chan ClusterRPC { +func (mpt *MapPinTracker) RpcChan() <-chan RPC { return mpt.rpcCh }