Renames everywhere, removing the redundant "Cluster" prefix from "ClusterSomething" names.

Start preparing syncs() and status()

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2016-12-15 19:08:46 +01:00
parent 45c31846d1
commit 319c97585b
18 changed files with 477 additions and 274 deletions

api.go

@ -9,20 +9,20 @@ import (
"strings"
"sync"
peer "github.com/libp2p/go-libp2p-peer"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
cid "github.com/ipfs/go-cid"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
mux "github.com/gorilla/mux"
)
// ClusterHTTPAPI implements a ClusterAPI and aims to provides
// ClusterHTTPAPI implements an API and aims to provide
// a RESTful HTTP API for Cluster.
type ClusterHTTPAPI struct {
ctx context.Context
listenAddr string
listenPort int
rpcCh chan ClusterRPC
rpcCh chan RPC
router *mux.Router
listener net.Listener
@ -61,20 +61,20 @@ type unpinResp struct {
Unpinned string `json:"unpinned"`
}
type pinElemResp struct {
type statusCidResp struct {
Cid string `json:"cid"`
Status string `json:"status"`
}
type pinListResp []pinElemResp
type statusResp []statusCidResp
// NewHTTPClusterAPI creates a new object which is ready to be
// NewHTTPAPI creates a new object which is ready to be
// started.
func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) {
func NewHTTPAPI(cfg *Config) (*ClusterHTTPAPI, error) {
ctx := context.Background()
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
cfg.ClusterAPIListenAddr,
cfg.ClusterAPIListenPort))
cfg.APIListenAddr,
cfg.APIListenPort))
if err != nil {
return nil, err
}
@ -85,11 +85,11 @@ func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) {
api := &ClusterHTTPAPI{
ctx: ctx,
listenAddr: cfg.ClusterAPIListenAddr,
listenPort: cfg.ClusterAPIListenPort,
listenAddr: cfg.APIListenAddr,
listenPort: cfg.APIListenPort,
listener: l,
server: s,
rpcCh: make(chan ClusterRPC, RPCMaxQueue),
rpcCh: make(chan RPC, RPCMaxQueue),
}
for _, route := range api.routes() {
@ -138,6 +138,30 @@ func (api *ClusterHTTPAPI) routes() []route {
"/pins/{hash}",
api.unpinHandler,
},
route{
"Status",
"GET",
"/status",
api.statusHandler,
},
route{
"StatusCid",
"GET",
"/status/{hash}",
api.statusCidHandler,
},
route{
"Sync",
"POST",
"/status",
api.syncHandler,
},
route{
"SyncCid",
"POST",
"/status/{hash}",
api.syncCidHandler,
},
}
}
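
For orientation, the four new routes map onto plain HTTP calls: GET returns the last known status, POST triggers a sync. Below is a minimal client-side sketch in Go; it is an illustration only, assuming a cluster API reachable on 127.0.0.1:10002 (the port used by testingConfig() in this diff) and reusing the statusCidResp shape defined above.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// Mirrors the statusCidResp type added in api.go.
type statusCidResp struct {
	Cid    string `json:"cid"`
	Status string `json:"status"`
}

func main() {
	base := "http://127.0.0.1:10002"

	// GET /status: last known status for every tracked Cid.
	res, err := http.Get(base + "/status")
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	var statuses []statusCidResp
	if err := json.NewDecoder(res.Body).Decode(&statuses); err != nil {
		panic(err)
	}
	fmt.Printf("%d pins tracked\n", len(statuses))

	// POST /status/{hash}: trigger a GlobalSyncCidRPC for a single Cid.
	// The hash below is the well-known empty-directory Cid, used only as an example.
	res, err = http.Post(base+"/status/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn",
		"application/json", nil)
	if err != nil {
		panic(err)
	}
	res.Body.Close()
}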
@ -178,14 +202,14 @@ func (api *ClusterHTTPAPI) Shutdown() error {
// RpcChan can be used by Cluster to read any
// requests from this component
func (api *ClusterHTTPAPI) RpcChan() <-chan ClusterRPC {
func (api *ClusterHTTPAPI) RpcChan() <-chan RPC {
return api.rpcCh
}
func (api *ClusterHTTPAPI) versionHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithCancel(api.ctx)
defer cancel()
rRpc := RPC(VersionRPC, nil)
rRpc := NewRPC(VersionRPC, nil)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Op(), resp) {
v := resp.Data.(string)
@ -196,7 +220,7 @@ func (api *ClusterHTTPAPI) versionHandler(w http.ResponseWriter, r *http.Request
func (api *ClusterHTTPAPI) memberListHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithCancel(api.ctx)
defer cancel()
rRpc := RPC(MemberListRPC, nil)
rRpc := NewRPC(MemberListRPC, nil)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Op(), resp) {
data := resp.Data.([]peer.ID)
@ -211,72 +235,98 @@ func (api *ClusterHTTPAPI) memberListHandler(w http.ResponseWriter, r *http.Requ
func (api *ClusterHTTPAPI) pinHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithCancel(api.ctx)
defer cancel()
vars := mux.Vars(r)
hash := vars["hash"]
c, err := cid.Decode(hash)
if err != nil {
sendErrorResponse(w, 400, "error decoding Cid: "+err.Error())
return
}
rRpc := RPC(PinRPC, c)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Op(), resp) {
sendAcceptedResponse(w)
if c := parseCidOrError(w, r); c != nil {
rRpc := NewRPC(PinRPC, c)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Op(), resp) {
sendAcceptedResponse(w)
}
}
}
func (api *ClusterHTTPAPI) unpinHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithCancel(api.ctx)
defer cancel()
vars := mux.Vars(r)
hash := vars["hash"]
c, err := cid.Decode(hash)
if err != nil {
sendErrorResponse(w, 400, "error decoding Cid: "+err.Error())
return
}
rRpc := RPC(UnpinRPC, c)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Op(), resp) {
sendAcceptedResponse(w)
if c := parseCidOrError(w, r); c != nil {
rRpc := NewRPC(UnpinRPC, c)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Op(), resp) {
sendAcceptedResponse(w)
}
}
}
func (api *ClusterHTTPAPI) pinListHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithCancel(api.ctx)
defer cancel()
rRpc := RPC(PinListRPC, nil)
rRpc := NewRPC(PinListRPC, nil)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Op(), resp) {
data := resp.Data.([]Pin)
pins := make(pinListResp, 0, len(data))
for _, d := range data {
var st string
switch d.Status {
case PinError:
st = "pin_error"
case UnpinError:
st = "unpin_error"
case Pinned:
st = "pinned"
case Pinning:
st = "pinning"
case Unpinning:
st = "unpinning"
}
pins = append(pins, pinElemResp{
Cid: d.Cid.String(),
Status: st,
})
}
sendJSONResponse(w, 200, pins)
data := resp.Data.([]*cid.Cid)
sendJSONResponse(w, 200, data)
}
}
func (api *ClusterHTTPAPI) statusHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithCancel(api.ctx)
defer cancel()
rRpc := NewRPC(StatusRPC, nil)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Op(), resp) {
sendStatusResponse(w, resp)
}
}
func (api *ClusterHTTPAPI) statusCidHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithCancel(api.ctx)
defer cancel()
if c := parseCidOrError(w, r); c != nil {
op := NewRPC(StatusCidRPC, c)
resp := MakeRPC(ctx, api.rpcCh, op, true)
if checkResponse(w, op.Op(), resp) {
sendStatusCidResponse(w, resp)
}
}
}
func (api *ClusterHTTPAPI) syncHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithCancel(api.ctx)
defer cancel()
rRpc := NewRPC(GlobalSyncRPC, nil)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Op(), resp) {
sendStatusResponse(w, resp)
}
}
func (api *ClusterHTTPAPI) syncCidHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithCancel(api.ctx)
defer cancel()
if c := parseCidOrError(w, r); c != nil {
op := NewRPC(GlobalSyncCidRPC, c)
resp := MakeRPC(ctx, api.rpcCh, op, true)
if checkResponse(w, op.Op(), resp) {
sendStatusCidResponse(w, resp)
}
}
}
func parseCidOrError(w http.ResponseWriter, r *http.Request) *cid.Cid {
vars := mux.Vars(r)
hash := vars["hash"]
c, err := cid.Decode(hash)
if err != nil {
sendErrorResponse(w, 400, "error decoding Cid: "+err.Error())
return nil
}
return c
}
// checkResponse does basic checking on an RPCResponse. It takes care of
// using the http.ResponseWriter to send
// an error if the RPCResponse contains one. It also checks that the RPC
@ -293,8 +343,12 @@ func checkResponse(w http.ResponseWriter, op RPCOp, resp RPCResponse) bool {
switch op {
case PinRPC: // Pin/Unpin only return errors
case UnpinRPC:
case PinListRPC:
case StatusRPC, LocalSyncRPC, GlobalSyncRPC:
_, ok = resp.Data.([]Pin)
case StatusCidRPC, LocalSyncCidRPC, GlobalSyncCidRPC:
_, ok = resp.Data.(Pin)
case PinListRPC:
_, ok = resp.Data.([]*cid.Cid)
case IPFSPinRPC:
case IPFSUnpinRPC:
case VersionRPC:
@ -334,3 +388,24 @@ func sendErrorResponse(w http.ResponseWriter, code int, msg string) {
logger.Errorf("sending error response: %d: %s", code, msg)
sendJSONResponse(w, code, errorResp)
}
func sendStatusResponse(w http.ResponseWriter, resp RPCResponse) {
data := resp.Data.([]Pin)
pins := make(statusResp, 0, len(data))
for _, d := range data {
pins = append(pins, statusCidResp{
Cid: d.Cid.String(),
Status: d.Status.String(),
})
}
sendJSONResponse(w, 200, pins)
}
func sendStatusCidResponse(w http.ResponseWriter, resp RPCResponse) {
data := resp.Data.(Pin)
pin := statusCidResp{
Cid: data.Cid.String(),
Status: data.Status.String(),
}
sendJSONResponse(w, 200, pin)
}
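
The two helpers above reduce every status reply to the same two-field JSON object. A tiny sketch of the resulting wire format (illustrative values, not part of this commit):

package main

import (
	"encoding/json"
	"fmt"
)

type statusCidResp struct {
	Cid    string `json:"cid"`
	Status string `json:"status"`
}

func main() {
	out, _ := json.Marshal(statusCidResp{
		Cid:    "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn", // example Cid
		Status: "pinned",                                         // one of the PinStatus.String() values
	})
	fmt.Println(string(out)) // {"cid":"QmUNLL...","status":"pinned"}
}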


@ -8,8 +8,8 @@ import (
"net/http"
"testing"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
var (
@ -19,7 +19,7 @@ var (
func testClusterApi(t *testing.T) *ClusterHTTPAPI {
//logging.SetDebugLogging()
cfg := testingConfig()
api, err := NewHTTPClusterAPI(cfg)
api, err := NewHTTPAPI(cfg)
// No keep alive! Otherwise tests hang with
// connections re-used from previous tests
api.server.SetKeepAlivesEnabled(false)
@ -166,6 +166,24 @@ func TestUnpinEndpoint(t *testing.T) {
}
func TestPinListEndpoint(t *testing.T) {
c, _ := cid.Decode(testCid)
c2, _ := cid.Decode(testCid2)
c3, _ := cid.Decode(testCid3)
api := testClusterApi(t)
defer api.Shutdown()
pList := []*cid.Cid{
c, c2, c3, c, c2,
}
simulateAnswer(api.RpcChan(), pList, nil)
var resp []*cid.Cid
makeGet(t, "/pins", &resp)
if len(resp) != 5 {
t.Error("unexpected pin list: ", resp)
}
}
func TestStatusEndpoint(t *testing.T) {
c, _ := cid.Decode(testCid)
c2, _ := cid.Decode(testCid2)
c3, _ := cid.Decode(testCid3)
@ -195,9 +213,9 @@ func TestPinListEndpoint(t *testing.T) {
}
simulateAnswer(api.RpcChan(), pList, nil)
var resp pinListResp
makeGet(t, "/pins", &resp)
var resp statusResp
makeGet(t, "/status", &resp)
if len(resp) != 5 {
t.Error("unexpected pinListResp: ", resp)
t.Error("unexpected statusResp: ", resp)
}
}


@ -8,15 +8,15 @@ import (
"strings"
"sync"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
multiaddr "github.com/multiformats/go-multiaddr"
host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host"
multiaddr "gx/ipfs/QmUAQaWbKxGCUTuoQVvvicbQNZ9APF5pDGWyAZSe93AtKH/go-multiaddr"
swarm "gx/ipfs/QmWfxnAiQ5TnnCgiX9ikVUKFNHRgGhbgKdx5DoKPELD7P4/go-libp2p-swarm"
basichost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/basic"
peerstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
crypto "gx/ipfs/QmfWDLQjGjVe4fr5CoztYW2DYYjRysMJrFe1RCsXLPTf46/go-libp2p-crypto"
cid "github.com/ipfs/go-cid"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
)
// Cluster is the main IPFS cluster component. It provides
@ -24,32 +24,34 @@ import (
type Cluster struct {
ctx context.Context
config *ClusterConfig
config *Config
host host.Host
consensus *ClusterConsensus
api ClusterAPI
consensus *Consensus
api API
ipfs IPFSConnector
state ClusterState
state State
tracker PinTracker
rpcCh chan RPC
shutdownLock sync.Mutex
shutdown bool
shutdownCh chan struct{}
wg sync.WaitGroup
}
// NewCluster builds a ready-to-start IPFS Cluster. It takes a ClusterAPI,
// an IPFSConnector and a ClusterState as parameters, allowing the user,
// NewCluster builds a ready-to-start IPFS Cluster. It takes an API,
// an IPFSConnector and a State as parameters, allowing the user
// to provide custom implementations of these components.
func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state ClusterState, tracker PinTracker) (*Cluster, error) {
func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker PinTracker) (*Cluster, error) {
ctx := context.Background()
host, err := makeHost(ctx, cfg)
if err != nil {
return nil, err
}
consensus, err := NewClusterConsensus(cfg, host, state)
consensus, err := NewConsensus(cfg, host, state)
if err != nil {
logger.Errorf("error creating consensus: %s", err)
return nil, err
@ -64,14 +66,13 @@ func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state Cl
ipfs: ipfs,
state: state,
tracker: tracker,
rpcCh: make(chan RPC),
shutdownCh: make(chan struct{}),
}
logger.Info("starting IPFS Cluster")
cluster.run()
logger.Info("performing State synchronization")
cluster.Sync()
return cluster, nil
}
@ -107,17 +108,77 @@ func (c *Cluster) Shutdown() error {
return nil
}
func (c *Cluster) Sync() error {
// LocalSync makes sure that the current state of the Cluster and the
// desired IPFS daemon state are aligned. It makes sure that
// IPFS is pinning content that should be pinned locally, and not pinning
// other content. It will also try to recover any failed pin or unpin
// operations by retriggering them.
func (c *Cluster) LocalSync() ([]Pin, error) {
cState, err := c.consensus.State()
if err != nil {
return err
return nil, err
}
changed := c.tracker.SyncState(cState)
for _, p := range changed {
logger.Debugf("recovering %s", p.Cid)
c.tracker.Recover(p.Cid)
err = c.tracker.Recover(p.Cid)
if err != nil {
logger.Errorf("Error recovering %s: %s", p.Cid, err)
return nil, err
}
}
return nil
return c.tracker.ListPins(), nil
}
// LocalSyncCid makes sure that the current state of the cluster
// and the desired IPFS daemon state are aligned for a given Cid. It
// makes sure that IPFS is pinning content that should be pinned locally,
// and not pinning other content. It will also try to recover any failed
// pin or unpin operations by retriggering them.
func (c *Cluster) LocalSyncCid(h *cid.Cid) (Pin, error) {
changed := c.tracker.Sync(h)
if changed {
err := c.tracker.Recover(h)
if err != nil {
logger.Errorf("Error recovering %s: %s", h, err)
return Pin{}, err
}
}
return c.tracker.GetPin(h), nil
}
// GlobalSync triggers Sync() operations in all members of the Cluster.
func (c *Cluster) GlobalSync() ([]Pin, error) {
return c.Status(), nil
}
// GlobalSyncCid triggers a Sync() operation for a given Cid in all members
// of the Cluster.
func (c *Cluster) GlobalSyncCid(h *cid.Cid) (Pin, error) {
return c.StatusCid(h), nil
}
// Status returns the last known status for all Pins tracked by Cluster.
func (c *Cluster) Status() []Pin {
// TODO: Global
return c.tracker.ListPins()
}
// StatusCid returns the last known status for a given Cid
func (c *Cluster) StatusCid(h *cid.Cid) Pin {
// TODO: Global
return c.tracker.GetPin(h)
}
// Pins returns the list of Cids managed by Cluster and which are part
// of the current global state. This is the source of truth as to which
// pins are managed, but does not indicate if the item is successfully pinned.
func (c *Cluster) Pins() []*cid.Cid {
cState, err := c.consensus.State()
if err != nil {
return []*cid.Cid{}
}
return cState.ListPins()
}
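
Taken together, the new methods separate three concerns: Pins() reports what the shared consensus state says should be pinned, Status()/StatusCid() report what the local tracker last saw, and LocalSync() reconciles the tracker with the IPFS daemon (GlobalSync still falls back to the local view, as the TODOs note). A sketch of how a caller might combine them, assuming it lives in the ipfscluster package; reportPins is hypothetical and not part of this commit:

// reportPins logs the tracker status of every Cid in the shared state and
// then reconciles the tracker with the local IPFS daemon.
func reportPins(c *Cluster) error {
	for _, h := range c.Pins() {
		p := c.StatusCid(h) // last known tracker status for this Cid
		logger.Infof("%s -> %s", h, p.Status)
	}

	// Retriggers any failed pin/unpin operations and returns the full list.
	pins, err := c.LocalSync()
	if err != nil {
		return err
	}
	logger.Infof("local sync returned %d pins", len(pins))
	return nil
}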
// Pin makes the cluster Pin a Cid. This implies adding the Cid
@ -177,41 +238,39 @@ func (c *Cluster) run() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c.ctx = ctx
ipfsCh := c.ipfs.RpcChan()
consensusCh := c.consensus.RpcChan()
apiCh := c.api.RpcChan()
trackerCh := c.tracker.RpcChan()
var op ClusterRPC
var op RPC
for {
select {
case op = <-ipfsCh:
case op = <-c.ipfs.RpcChan():
goto HANDLEOP
case op = <-consensusCh:
case op = <-c.consensus.RpcChan():
goto HANDLEOP
case op = <-apiCh:
case op = <-c.api.RpcChan():
goto HANDLEOP
case op = <-trackerCh:
case op = <-c.tracker.RpcChan():
goto HANDLEOP
case op = <-c.rpcCh:
goto HANDLEOP
case <-c.shutdownCh:
return
}
HANDLEOP:
switch op.(type) {
case *CidClusterRPC:
crpc := op.(*CidClusterRPC)
go c.handleCidRPC(crpc)
case *GenericClusterRPC:
grpc := op.(*GenericClusterRPC)
go c.handleGenericRPC(grpc)
case *CidRPC:
crpc := op.(*CidRPC)
go c.handleCidNewRPC(crpc)
case *GenericRPC:
grpc := op.(*GenericRPC)
go c.handleGenericNewRPC(grpc)
default:
logger.Error("unknown ClusterRPC type")
logger.Error("unknown RPC type")
}
}
}()
}
func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) {
func (c *Cluster) handleGenericNewRPC(grpc *GenericRPC) {
var data interface{} = nil
var err error = nil
switch grpc.Op() {
@ -220,11 +279,15 @@ func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) {
case MemberListRPC:
data = c.Members()
case PinListRPC:
data = c.tracker.ListPins()
case SyncRPC:
err = c.Sync()
data = c.Pins()
case LocalSyncRPC:
data, err = c.LocalSync()
case GlobalSyncRPC:
data, err = c.GlobalSync()
case StatusRPC:
data = c.Status()
case RollbackRPC:
state, ok := grpc.Argument.(ClusterState)
state, ok := grpc.Argument.(State)
if !ok {
err = errors.New("bad RollbackRPC type")
break
@ -233,13 +296,13 @@ func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) {
case LeaderRPC:
// Leader RPC is a RPC that needs to be run
// by the Consensus Leader. Arguments is a wrapped RPC.
rpc, ok := grpc.Argument.(*ClusterRPC)
rpc, ok := grpc.Argument.(*RPC)
if !ok {
err = errors.New("bad LeaderRPC type")
}
data, err = c.leaderRPC(rpc)
data, err = c.leaderNewRPC(rpc)
default:
logger.Error("unknown operation for GenericClusterRPC. Ignoring.")
logger.Error("unknown operation for GenericRPC. Ignoring.")
}
resp := RPCResponse{
@ -252,7 +315,7 @@ func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) {
// handleOp takes care of running the necessary action for a
// clusterRPC request and sending the response.
func (c *Cluster) handleCidRPC(crpc *CidClusterRPC) {
func (c *Cluster) handleCidNewRPC(crpc *CidRPC) {
var data interface{} = nil
var err error = nil
switch crpc.Op() {
@ -278,8 +341,14 @@ func (c *Cluster) handleCidRPC(crpc *CidClusterRPC) {
}
case IPFSIsPinnedRPC:
data, err = c.ipfs.IsPinned(crpc.CID)
case StatusCidRPC:
data = c.StatusCid(crpc.CID)
case LocalSyncCidRPC:
data, err = c.LocalSyncCid(crpc.CID)
case GlobalSyncCidRPC:
data, err = c.GlobalSyncCid(crpc.CID)
default:
logger.Error("unknown operation for CidClusterRPC. Ignoring.")
logger.Error("unknown operation for CidRPC. Ignoring.")
}
resp := RPCResponse{
@ -291,12 +360,12 @@ func (c *Cluster) handleCidRPC(crpc *CidClusterRPC) {
}
// This uses libp2p to contact the cluster leader and ask him to do something
func (c *Cluster) leaderRPC(rpc *ClusterRPC) (interface{}, error) {
func (c *Cluster) leaderNewRPC(rpc *RPC) (interface{}, error) {
return nil, errors.New("not implemented yet")
}
// makeHost makes a libp2p-host
func makeHost(ctx context.Context, cfg *ClusterConfig) (host.Host, error) {
func makeHost(ctx context.Context, cfg *Config) (host.Host, error) {
ps := peerstore.NewPeerstore()
peerID, err := peer.IDB58Decode(cfg.ID)
if err != nil {


@ -5,11 +5,11 @@ import (
"testing"
"time"
cid "github.com/ipfs/go-cid"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
)
type mockComponent struct {
rpcCh chan ClusterRPC
rpcCh chan RPC
returnError bool
}
@ -17,7 +17,7 @@ func (c *mockComponent) Shutdown() error {
return nil
}
func (c *mockComponent) RpcChan() <-chan ClusterRPC {
func (c *mockComponent) RpcChan() <-chan RPC {
return c.rpcCh
}
@ -52,9 +52,9 @@ func (ipfs *mockConnector) IsPinned(c *cid.Cid) (bool, error) {
func testingCluster(t *testing.T) (*Cluster, *mockApi, *mockConnector, *MapState, *MapPinTracker) {
api := &mockApi{}
api.rpcCh = make(chan ClusterRPC, 2)
api.rpcCh = make(chan RPC, 2)
ipfs := &mockConnector{}
ipfs.rpcCh = make(chan ClusterRPC, 2)
ipfs.rpcCh = make(chan RPC, 2)
cfg := testingConfig()
st := NewMapState()
tracker := NewMapPinTracker()
@ -87,11 +87,11 @@ func testClusterShutdown(t *testing.T) {
}
}
func TestClusterSync(t *testing.T) {
func TestClusterLocalSync(t *testing.T) {
cl, _, _, st, _ := testingCluster(t)
defer cleanRaft()
defer cl.Shutdown()
err := cl.Sync()
_, err := cl.LocalSync()
if err == nil {
t.Error("expected an error as there is no state to sync")
}
@ -102,7 +102,7 @@ func TestClusterSync(t *testing.T) {
t.Fatal("pin should have worked:", err)
}
err = cl.Sync()
_, err = cl.LocalSync()
if err != nil {
t.Fatal("sync after pinning should have worked:", err)
}
@ -110,7 +110,7 @@ func TestClusterSync(t *testing.T) {
// Modify state on the side so the sync does not
// happen on an empty slate
st.RmPin(c)
err = cl.Sync()
_, err = cl.LocalSync()
if err != nil {
t.Fatal("sync with recover should have worked:", err)
}
@ -185,7 +185,7 @@ func TestClusterRun(t *testing.T) {
// Generic RPC
for i := 0; i < NoopRPC; i++ {
rpc := RPC(RPCOp(i), "something")
rpc := NewRPC(RPCOp(i), "something")
switch i % 4 {
case 0:
ipfs.rpcCh <- rpc
@ -208,7 +208,7 @@ func TestClusterRun(t *testing.T) {
// Cid RPC
c, _ := cid.Decode(testCid)
for i := 0; i < NoopRPC; i++ {
rpc := RPC(RPCOp(i), c)
rpc := NewRPC(RPCOp(i), c)
switch i % 4 {
case 0:
ipfs.rpcCh <- rpc


@ -5,11 +5,11 @@ import (
"io/ioutil"
)
type ClusterConfig struct {
type Config struct {
IPFSHost string `json:"ipfs_host"`
IPFSPort int `json:"ipfs_port"`
ClusterAPIListenAddr string `json:"cluster_api_listen_addr"`
ClusterAPIListenPort int `json:"cluster_api_listen_port"`
APIListenAddr string `json:"cluster_api_listen_addr"`
APIListenPort int `json:"cluster_api_listen_port"`
IPFSAPIListenAddr string `json:"ipfs_api_listen_addr"`
IPFSAPIListenPort int `json:"ipfs_api_listen_port"`
ConsensusListenAddr string `json:"consensus_listen_addr"`
@ -20,8 +20,8 @@ type ClusterConfig struct {
RaftFolder string `json:"raft_folder"`
}
func LoadConfig(path string) (*ClusterConfig, error) {
config := &ClusterConfig{}
func LoadConfig(path string) (*Config, error) {
config := &Config{}
file, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
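
Only the Go field names change here; the JSON keys ("cluster_api_listen_addr", "cluster_api_listen_port") are untouched, so existing configuration files keep loading. A small sketch of reading the renamed fields, assuming the ipfscluster package; showAPIEndpoint is hypothetical and not part of this commit:

// showAPIEndpoint loads a configuration file and logs where the
// cluster HTTP API will listen.
func showAPIEndpoint(path string) error {
	cfg, err := LoadConfig(path)
	if err != nil {
		return err
	}
	// Formerly cfg.ClusterAPIListenAddr / cfg.ClusterAPIListenPort.
	logger.Infof("cluster API: %s:%d", cfg.APIListenAddr, cfg.APIListenPort)
	return nil
}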

View File

@ -1,15 +1,15 @@
package ipfscluster
func testingConfig() *ClusterConfig {
cfg := &ClusterConfig{
func testingConfig() *Config {
cfg := &Config{
ConsensusListenPort: 10000,
ConsensusListenAddr: "127.0.0.1",
ID: "QmUfSFm12eYCaRdypg48m8RqkXfLW7A2ZeGZb2skeHHDGA",
PrivateKey: "CAASqAkwggSkAgEAAoIBAQDpT16IRF6bb9tHsCbQ7M+nb2aI8sz8xyt8PoAWM42ki+SNoESIxKb4UhFxixKvtEdGxNE6aUUVc8kFk6wTStJ/X3IGiMetwkXiFiUxabUF/8A6SyvnSVDm+wFuavugpVrZikjLcfrf2xOVgnG3deQQvd/qbAv14jTwMFl+T+8d/cXBo8Mn/leLZCQun/EJEnkXP5MjgNI8XcWUE4NnH3E0ESSm6Pkm8MhMDZ2fmzNgqEyJ0GVinNgSml3Pyha3PBSj5LRczLip/ie4QkKx5OHvX2L3sNv/JIUHse5HSbjZ1c/4oGCYMVTYCykWiczrxBUOlcr8RwnZLOm4n2bCt5ZhAgMBAAECggEAVkePwfzmr7zR7tTpxeGNeXHtDUAdJm3RWwUSASPXgb5qKyXVsm5nAPX4lXDE3E1i/nzSkzNS5PgIoxNVU10cMxZs6JW0okFx7oYaAwgAddN6lxQtjD7EuGaixN6zZ1k/G6vT98iS6i3uNCAlRZ9HVBmjsOF8GtYolZqLvfZ5izEVFlLVq/BCs7Y5OrDrbGmn3XupfitVWYExV0BrHpobDjsx2fYdTZkmPpSSvXNcm4Iq2AXVQzoqAfGo7+qsuLCZtVlyTfVKQjMvE2ffzN1dQunxixOvev/fz4WSjGnRpC6QLn6Oqps9+VxQKqKuXXqUJC+U45DuvA94Of9MvZfAAQKBgQD7xmXueXRBMr2+0WftybAV024ap0cXFrCAu+KWC1SUddCfkiV7e5w+kRJx6RH1cg4cyyCL8yhHZ99Z5V0Mxa/b/usuHMadXPyX5szVI7dOGgIC9q8IijN7B7GMFAXc8+qC7kivehJzjQghpRRAqvRzjDls4gmbNPhbH1jUiU124QKBgQDtOaW5/fOEtOq0yWbDLkLdjImct6oKMLhENL6yeIKjMYgifzHb2adk7rWG3qcMrdgaFtDVfqv8UmMEkzk7bSkovMVj3SkLzMz84ii1SkSfyaCXgt/UOzDkqAUYB0cXMppYA7jxHa2OY8oEHdBgmyJXdLdzJxCp851AoTlRUSePgQKBgQCQgKgUHOUaXnMEx88sbOuBO14gMg3dNIqM+Ejt8QbURmI8k3arzqA4UK8Tbb9+7b0nzXWanS5q/TT1tWyYXgW28DIuvxlHTA01aaP6WItmagrphIelERzG6f1+9ib/T4czKmvROvDIHROjq8lZ7ERs5Pg4g+sbh2VbdzxWj49EQQKBgFEna36ZVfmMOs7mJ3WWGeHY9ira2hzqVd9fe+1qNKbHhx7mDJR9fTqWPxuIh/Vac5dZPtAKqaOEO8OQ6f9edLou+ggT3LrgsS/B3tNGOPvA6mNqrk/Yf/15TWTO+I8DDLIXc+lokbsogC+wU1z5NWJd13RZZOX/JUi63vTmonYBAoGBAIpglLCH2sPXfmguO6p8QcQcv4RjAU1c0GP4P5PNN3Wzo0ItydVd2LHJb6MdmL6ypeiwNklzPFwTeRlKTPmVxJ+QPg1ct/3tAURN/D40GYw9ojDhqmdSl4HW4d6gHS2lYzSFeU5jkG49y5nirOOoEgHy95wghkh6BfpwHujYJGw4",
IPFSAPIListenAddr: "127.0.0.1",
IPFSAPIListenPort: 10001,
ClusterAPIListenAddr: "127.0.0.1",
ClusterAPIListenPort: 10002,
APIListenAddr: "127.0.0.1",
APIListenPort: 10002,
RaftFolder: "./raftFolderFromTests",
}


@ -7,12 +7,12 @@ import (
"sync"
"time"
consensus "github.com/libp2p/go-libp2p-consensus"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
libp2praft "github.com/libp2p/go-libp2p-raft"
host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host"
consensus "gx/ipfs/QmZ88KbrvZMJpXaNwAGffswcYKz8EbeafzAFGMCA6MEZKt/go-libp2p-consensus"
libp2praft "gx/ipfs/QmaofA6ApgPQm8yRojC77dQbVUatYMihdyQjB7VsAqrks1/go-libp2p-raft"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
cid "github.com/ipfs/go-cid"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
)
const (
@ -41,12 +41,12 @@ type clusterLogOp struct {
Cid string
Type clusterLogOpType
ctx context.Context
rpcCh chan ClusterRPC
rpcCh chan RPC
}
// ApplyTo applies the operation to the ClusterState
// ApplyTo applies the operation to the State
func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
state, ok := cstate.(ClusterState)
state, ok := cstate.(State)
var err error
if !ok {
// Should never be here
@ -69,14 +69,14 @@ func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error)
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
MakeRPC(ctx, op.rpcCh, RPC(IPFSPinRPC, c), false)
MakeRPC(ctx, op.rpcCh, NewRPC(IPFSPinRPC, c), false)
case LogOpUnpin:
err := state.RmPin(c)
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
MakeRPC(ctx, op.rpcCh, RPC(IPFSUnpinRPC, c), false)
MakeRPC(ctx, op.rpcCh, NewRPC(IPFSUnpinRPC, c), false)
default:
logger.Error("unknown clusterLogOp type. Ignoring")
}
@ -87,8 +87,8 @@ ROLLBACK:
// and therefore we need to request a rollback of the
// cluster to the previous state. This operation can only be performed
// by the cluster leader.
rllbckRPC := RPC(RollbackRPC, state)
leadrRPC := RPC(LeaderRPC, rllbckRPC)
rllbckRPC := NewRPC(RollbackRPC, state)
leadrRPC := NewRPC(LeaderRPC, rllbckRPC)
MakeRPC(ctx, op.rpcCh, leadrRPC, false)
logger.Errorf("an error ocurred when applying Op to state: %s", err)
logger.Error("a rollback was requested")
@ -96,16 +96,16 @@ ROLLBACK:
return nil, errors.New("a rollback was requested. Reason: " + err.Error())
}
// ClusterConsensus handles the work of keeping a shared-state between
// Consensus handles the work of keeping a shared-state between
// the members of an IPFS Cluster, as well as modifying that state and
// applying any updates in a thread-safe manner.
type ClusterConsensus struct {
type Consensus struct {
ctx context.Context
consensus consensus.OpLogConsensus
actor consensus.Actor
baseOp *clusterLogOp
rpcCh chan ClusterRPC
rpcCh chan RPC
p2pRaft *libp2pRaftWrap
@ -115,13 +115,13 @@ type ClusterConsensus struct {
wg sync.WaitGroup
}
// NewClusterConsensus builds a new ClusterConsensus component. The state
// NewConsensus builds a new Consensus component. The state
// is used to initialize the Consensus system, so any information in it
// is discarded.
func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState) (*ClusterConsensus, error) {
func NewConsensus(cfg *Config, host host.Host, state State) (*Consensus, error) {
logger.Info("starting Consensus component")
ctx := context.Background()
rpcCh := make(chan ClusterRPC, RPCMaxQueue)
rpcCh := make(chan RPC, RPCMaxQueue)
op := &clusterLogOp{
ctx: context.Background(),
rpcCh: rpcCh,
@ -133,7 +133,7 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState)
con.SetActor(actor)
cc := &ClusterConsensus{
cc := &Consensus{
ctx: ctx,
consensus: con,
baseOp: op,
@ -147,7 +147,7 @@ func NewClusterConsensus(cfg *ClusterConfig, host host.Host, state ClusterState)
return cc, nil
}
func (cc *ClusterConsensus) run() {
func (cc *Consensus) run() {
cc.wg.Add(1)
go func() {
defer cc.wg.Done()
@ -178,9 +178,9 @@ func (cc *ClusterConsensus) run() {
for !quitLoop {
select {
case <-timer.C: // Make a first sync
MakeRPC(ctx, cc.rpcCh, RPC(SyncRPC, nil), false)
MakeRPC(ctx, cc.rpcCh, NewRPC(LocalSyncRPC, nil), false)
case <-upToDate:
MakeRPC(ctx, cc.rpcCh, RPC(SyncRPC, nil), false)
MakeRPC(ctx, cc.rpcCh, NewRPC(LocalSyncRPC, nil), false)
quitLoop = true
}
}
@ -192,7 +192,7 @@ func (cc *ClusterConsensus) run() {
// Shutdown stops the component so it will not process any
// more updates. The underlying consensus is permanently
// shutdown, along with the libp2p transport.
func (cc *ClusterConsensus) Shutdown() error {
func (cc *Consensus) Shutdown() error {
cc.shutdownLock.Lock()
defer cc.shutdownLock.Unlock()
@ -240,11 +240,11 @@ func (cc *ClusterConsensus) Shutdown() error {
// RpcChan can be used by Cluster to read any
// requests from this component
func (cc *ClusterConsensus) RpcChan() <-chan ClusterRPC {
func (cc *Consensus) RpcChan() <-chan RPC {
return cc.rpcCh
}
func (cc *ClusterConsensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp {
func (cc *Consensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp {
return &clusterLogOp{
Cid: c.String(),
Type: t,
@ -252,7 +252,7 @@ func (cc *ClusterConsensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp {
}
// AddPin submits a Cid to the shared state of the cluster.
func (cc *ClusterConsensus) AddPin(c *cid.Cid) error {
func (cc *Consensus) AddPin(c *cid.Cid) error {
// Create pin operation for the log
op := cc.op(c, LogOpPin)
_, err := cc.consensus.CommitOp(op)
@ -265,7 +265,7 @@ func (cc *ClusterConsensus) AddPin(c *cid.Cid) error {
}
// RmPin removes a Cid from the shared state of the cluster.
func (cc *ClusterConsensus) RmPin(c *cid.Cid) error {
func (cc *Consensus) RmPin(c *cid.Cid) error {
// Create unpin operation for the log
op := cc.op(c, LogOpUnpin)
_, err := cc.consensus.CommitOp(op)
@ -276,12 +276,12 @@ func (cc *ClusterConsensus) RmPin(c *cid.Cid) error {
return nil
}
func (cc *ClusterConsensus) State() (ClusterState, error) {
func (cc *Consensus) State() (State, error) {
st, err := cc.consensus.GetLogHead()
if err != nil {
return nil, err
}
state, ok := st.(ClusterState)
state, ok := st.(State)
if !ok {
return nil, errors.New("wrong state type")
}
@ -290,13 +290,13 @@ func (cc *ClusterConsensus) State() (ClusterState, error) {
// Leader() returns the peerID of the Leader of the
// cluster.
func (cc *ClusterConsensus) Leader() peer.ID {
func (cc *Consensus) Leader() peer.ID {
// FIXME: Hashicorp Raft specific
raftactor := cc.actor.(*libp2praft.Actor)
return raftactor.Leader()
}
// TODO
func (cc *ClusterConsensus) Rollback(state ClusterState) error {
func (cc *Consensus) Rollback(state State) error {
return cc.consensus.Rollback(state)
}


@ -6,7 +6,7 @@ import (
"testing"
"time"
cid "github.com/ipfs/go-cid"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
)
func TestApplyToPin(t *testing.T) {
@ -14,7 +14,7 @@ func TestApplyToPin(t *testing.T) {
Cid: testCid,
Type: LogOpPin,
ctx: context.Background(),
rpcCh: make(chan ClusterRPC, 1),
rpcCh: make(chan RPC, 1),
}
st := NewMapState()
@ -30,7 +30,7 @@ func TestApplyToUnpin(t *testing.T) {
Cid: testCid,
Type: LogOpUnpin,
ctx: context.Background(),
rpcCh: make(chan ClusterRPC, 1),
rpcCh: make(chan RPC, 1),
}
st := NewMapState()
@ -54,7 +54,7 @@ func TestApplyToBadState(t *testing.T) {
Cid: testCid,
Type: LogOpUnpin,
ctx: context.Background(),
rpcCh: make(chan ClusterRPC, 1),
rpcCh: make(chan RPC, 1),
}
var st interface{}
@ -72,7 +72,7 @@ func TestApplyToBadCid(t *testing.T) {
Cid: "agadfaegf",
Type: LogOpPin,
ctx: context.Background(),
rpcCh: make(chan ClusterRPC, 1),
rpcCh: make(chan RPC, 1),
}
st := NewMapState()
@ -83,7 +83,7 @@ func cleanRaft() {
os.RemoveAll(testingConfig().RaftFolder)
}
func testingClusterConsensus(t *testing.T) *ClusterConsensus {
func testingConsensus(t *testing.T) *Consensus {
//logging.SetDebugLogging()
cfg := testingConfig()
ctx := context.Background()
@ -92,34 +92,34 @@ func testingClusterConsensus(t *testing.T) *ClusterConsensus {
t.Fatal("cannot create host:", err)
}
st := NewMapState()
cc, err := NewClusterConsensus(cfg, h, st)
cc, err := NewConsensus(cfg, h, st)
if err != nil {
t.Fatal("cannot create ClusterConsensus:", err)
t.Fatal("cannot create Consensus:", err)
}
// Oxygen for Raft to declare leader
time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)
return cc
}
func TestShutdownClusterConsensus(t *testing.T) {
func TestShutdownConsensus(t *testing.T) {
// Bring it up twice to make sure shutdown cleans up properly
// but also to make sure raft comes up ok when re-initialized
defer cleanRaft()
cc := testingClusterConsensus(t)
cc := testingConsensus(t)
err := cc.Shutdown()
if err != nil {
t.Fatal("ClusterConsensus cannot shutdown:", err)
t.Fatal("Consensus cannot shutdown:", err)
}
cc.Shutdown()
cc = testingClusterConsensus(t)
cc = testingConsensus(t)
err = cc.Shutdown()
if err != nil {
t.Fatal("ClusterConsensus cannot shutdown:", err)
t.Fatal("Consensus cannot shutdown:", err)
}
}
func TestConsensusPin(t *testing.T) {
cc := testingClusterConsensus(t)
cc := testingConsensus(t)
defer cleanRaft() // Remember defer runs in LIFO order
defer cc.Shutdown()
@ -132,7 +132,7 @@ func TestConsensusPin(t *testing.T) {
time.Sleep(250 * time.Millisecond)
st, err := cc.State()
if err != nil {
t.Error("error getting state:", err)
t.Fatal("error getting state:", err)
}
pins := st.ListPins()
@ -142,7 +142,7 @@ func TestConsensusPin(t *testing.T) {
}
func TestConsensusUnpin(t *testing.T) {
cc := testingClusterConsensus(t)
cc := testingConsensus(t)
defer cleanRaft()
defer cc.Shutdown()
@ -154,7 +154,7 @@ func TestConsensusUnpin(t *testing.T) {
}
func TestConsensusLeader(t *testing.T) {
cc := testingClusterConsensus(t)
cc := testingConsensus(t)
cfg := testingConfig()
pId := cfg.ID
defer cleanRaft()


@ -7,7 +7,7 @@ import (
"os/user"
"path/filepath"
logging "github.com/ipfs/go-log"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
ipfscluster "github.com/ipfs/ipfs-cluster"
)
@ -26,7 +26,7 @@ func main() {
fmt.Println(err)
return
}
api, err := ipfscluster.NewHTTPClusterAPI(clusterCfg)
api, err := ipfscluster.NewHTTPAPI(clusterCfg)
if err != nil {
fmt.Println(err)
return


@ -12,9 +12,9 @@ import (
"errors"
"time"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
var logger = logging.Logger("ipfs-cluster")
@ -22,28 +22,28 @@ var logger = logging.Logger("ipfs-cluster")
// Current Cluster version.
const Version = "0.0.1"
// RPCMaxQueue can be used to set the size of the ClusterRPC channels,
// RPCMaxQueue can be used to set the size of the RPC channels,
// which will start blocking on send after reaching this number.
var RPCMaxQueue = 128
// MakeRPCRetryInterval specifies how long to wait before retrying
// to put a ClusterRPC request in the channel in MakeRPC().
// to put an RPC request in the channel in MakeRPC().
var MakeRPCRetryInterval time.Duration = 1 * time.Second
// ClusterComponent represents a piece of ipfscluster. Cluster components
// usually run their own goroutines (a http server for example). They
// communicate with Cluster via a channel carrying ClusterRPC operations.
// communicate with Cluster via a channel carrying RPC operations.
// These operations request tasks from other components. This way all components
// are independent from each other and can be swapped as long as they maintain
// RPC compatibility with Cluster.
type ClusterComponent interface {
Shutdown() error
RpcChan() <-chan ClusterRPC
RpcChan() <-chan RPC
}
// ClusterAPI is a component which offers an API for Cluster. This is
// API is a component which offers an API for Cluster. This is
// a base component.
type ClusterAPI interface {
type API interface {
ClusterComponent
}
@ -64,13 +64,13 @@ type Peered interface {
SetPeers(peers []peer.ID)
}
// ClusterState represents the shared state of the cluster and it
// is used by the ClusterConsensus component to keep track of
// State represents the shared state of the cluster and it
// is used by the Consensus component to keep track of
// which objects are pinned. This component should be thread safe.
type ClusterState interface {
// AddPin adds a pin to the ClusterState
type State interface {
// AddPin adds a pin to the State
AddPin(*cid.Cid) error
// RmPin removes a pin from the ClusterState
// RmPin removes a pin from the State
RmPin(*cid.Cid) error
// ListPins lists all the pins in the state
ListPins() []*cid.Cid
@ -107,17 +107,17 @@ type PinTracker interface {
SyncAll() []Pin
// SyncState makes sure that the tracked Pins matches those in the
// cluster state and runs SyncAll(). It returns a list of changed Pins.
SyncState(ClusterState) []Pin
SyncState(State) []Pin
}
// MakeRPC sends a ClusterRPC object over a channel and optionally waits for a
// Response on the ClusterRPC.ResponseCh channel. It can be used by any
// MakeRPC sends an RPC object over a channel and optionally waits for a
// Response on the RPC.ResponseCh channel. It can be used by any
// ClusterComponent to simplify making RPC requests to Cluster.
// The ctx parameter must be a cancellable context, and can be used to
// timeout requests.
// If the message cannot be placed in the ClusterRPC channel, retries will be
// If the message cannot be placed in the RPC channel, retries will be
// issued every MakeRPCRetryInterval.
func MakeRPC(ctx context.Context, rpcCh chan ClusterRPC, r ClusterRPC, waitForResponse bool) RPCResponse {
func MakeRPC(ctx context.Context, rpcCh chan RPC, r RPC, waitForResponse bool) RPCResponse {
logger.Debugf("sending RPC %d", r.Op())
exitLoop := false
for !exitLoop {
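
As the comment above describes, MakeRPC is the single entry point components use to talk to Cluster. A minimal sketch of a component driving it, mirroring what the HTTP API handlers do; exampleComponent is hypothetical, assumes the ipfscluster package, and is not part of this commit:

// exampleComponent only illustrates the ClusterComponent contract.
type exampleComponent struct {
	rpcCh chan RPC // Cluster drains this channel via RpcChan()
}

func (e *exampleComponent) Shutdown() error { return nil }

func (e *exampleComponent) RpcChan() <-chan RPC { return e.rpcCh }

// clusterVersion asks Cluster for its version and blocks until the
// response arrives. The context must be cancellable so the call can
// time out, as the MakeRPC documentation requires.
func (e *exampleComponent) clusterVersion(ctx context.Context) (string, error) {
	resp := MakeRPC(ctx, e.rpcCh, NewRPC(VersionRPC, nil), true)
	if resp.Error != nil {
		return "", resp.Error
	}
	v, _ := resp.Data.(string)
	return v, nil
}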


@ -6,7 +6,7 @@ import (
"testing"
"time"
peer "github.com/libp2p/go-libp2p-peer"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
var (
@ -17,8 +17,8 @@ var (
)
func TestMakeRPC(t *testing.T) {
testCh := make(chan ClusterRPC, 1)
testReq := RPC(MemberListRPC, nil)
testCh := make(chan RPC, 1)
testReq := NewRPC(MemberListRPC, nil)
testResp := RPCResponse{
Data: "hey",
Error: nil,
@ -60,7 +60,7 @@ func TestMakeRPC(t *testing.T) {
// Test cancelled while waiting for response context
ctx, cancel = context.WithCancel(context.Background())
testCh = make(chan ClusterRPC, 1)
testCh = make(chan RPC, 1)
go func() {
resp := MakeRPC(ctx, testCh, testReq, true)
if resp.Error == nil || !strings.Contains(resp.Error.Error(), "timed out") {
@ -72,7 +72,7 @@ func TestMakeRPC(t *testing.T) {
time.Sleep(1 * time.Second)
}
func simulateAnswer(ch <-chan ClusterRPC, answer interface{}, err error) {
func simulateAnswer(ch <-chan RPC, answer interface{}, err error) {
go func() {
req := <-ch
req.ResponseCh() <- RPCResponse{


@ -12,7 +12,7 @@ import (
"strings"
"sync"
cid "github.com/ipfs/go-cid"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
)
// IPFSHTTPConnector implements the IPFSConnector interface
@ -31,7 +31,7 @@ type IPFSHTTPConnector struct {
listenAddr string
listenPort int
handlers map[string]func(http.ResponseWriter, *http.Request)
rpcCh chan ClusterRPC
rpcCh chan RPC
listener net.Listener
server *http.Server
@ -46,7 +46,7 @@ type ipfsError struct {
}
// NewIPFSHTTPConnector creates the component and leaves it ready to be started
func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) {
func NewIPFSHTTPConnector(cfg *Config) (*IPFSHTTPConnector, error) {
ctx := context.Background()
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d",
cfg.IPFSAPIListenAddr, cfg.IPFSAPIListenPort))
@ -67,7 +67,7 @@ func NewIPFSHTTPConnector(cfg *ClusterConfig) (*IPFSHTTPConnector, error) {
listenAddr: cfg.IPFSAPIListenAddr,
listenPort: cfg.IPFSAPIListenPort,
handlers: make(map[string]func(http.ResponseWriter, *http.Request)),
rpcCh: make(chan ClusterRPC, RPCMaxQueue),
rpcCh: make(chan RPC, RPCMaxQueue),
listener: l,
server: s,
}
@ -138,7 +138,7 @@ func (ipfs *IPFSHTTPConnector) run() {
// RpcChan can be used by Cluster to read any
// requests from this component.
func (ipfs *IPFSHTTPConnector) RpcChan() <-chan ClusterRPC {
func (ipfs *IPFSHTTPConnector) RpcChan() <-chan RPC {
return ipfs.rpcCh
}


@ -9,7 +9,7 @@ import (
"strings"
"testing"
cid "github.com/ipfs/go-cid"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
)
func testServer(t *testing.T) *httptest.Server {
@ -43,7 +43,7 @@ func testServer(t *testing.T) *httptest.Server {
return ts
}
func testIPFSConnectorConfig(ts *httptest.Server) *ClusterConfig {
func testIPFSConnectorConfig(ts *httptest.Server) *Config {
url, _ := url.Parse(ts.URL)
h := strings.Split(url.Host, ":")
i, _ := strconv.Atoi(h[1])


@ -3,10 +3,10 @@ package ipfscluster
import (
"path/filepath"
host "github.com/libp2p/go-libp2p-host"
libp2praft "github.com/libp2p/go-libp2p-raft"
host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host"
libp2praft "gx/ipfs/QmaofA6ApgPQm8yRojC77dQbVUatYMihdyQjB7VsAqrks1/go-libp2p-raft"
peer "github.com/libp2p/go-libp2p-peer"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
hashiraft "github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
@ -26,7 +26,7 @@ type libp2pRaftWrap struct {
// This function does all the heavy work which is specifically related to
// hashicorp's Raft. Other places should just rely on the Consensus interface.
func makeLibp2pRaft(cfg *ClusterConfig, host host.Host, state ClusterState, op *clusterLogOp) (*libp2praft.Consensus, *libp2praft.Actor, *libp2pRaftWrap, error) {
func makeLibp2pRaft(cfg *Config, host host.Host, state State, op *clusterLogOp) (*libp2praft.Consensus, *libp2praft.Actor, *libp2pRaftWrap, error) {
logger.Debug("creating libp2p Raft transport")
transport, err := libp2praft.NewLibp2pTransportWithHost(host)
if err != nil {

rpc.go

@ -1,8 +1,8 @@
package ipfscluster
import cid "github.com/ipfs/go-cid"
import cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
// ClusterRPC supported operations.
// RPC supported operations.
const (
PinRPC = iota
UnpinRPC
@ -14,7 +14,12 @@ const (
MemberListRPC
RollbackRPC
LeaderRPC
SyncRPC
LocalSyncRPC
LocalSyncCidRPC
GlobalSyncRPC
GlobalSyncCidRPC
StatusRPC
StatusCidRPC
NoopRPC
)
@ -22,14 +27,14 @@ const (
// RPCMethod identifies which RPC supported operation we are trying to make
type RPCOp int
// ClusterRPC represents an internal RPC operation. It should be implemented
// RPC represents an internal RPC operation. It should be implemented
// by all RPC types.
type ClusterRPC interface {
type RPC interface {
Op() RPCOp
ResponseCh() chan RPCResponse
}
// baseRPC implements ClusterRPC and can be included as anonymous
// baseRPC implements RPC and can be included as anonymous
// field in other types.
type baseRPC struct {
method RPCOp
@ -47,39 +52,40 @@ func (brpc *baseRPC) ResponseCh() chan RPCResponse {
return brpc.responseCh
}
// GenericClusterRPC is a ClusterRPC with generic arguments.
type GenericClusterRPC struct {
// GenericRPC is a ClusterRPC with generic arguments.
type GenericRPC struct {
baseRPC
Argument interface{}
}
// CidClusterRPC is a ClusterRPC whose only argument is a CID.
type CidClusterRPC struct {
// CidRPC is a ClusterRPC whose only argument is a CID.
type CidRPC struct {
baseRPC
CID *cid.Cid
}
// RPC builds a ClusterRPC request. It will create a
// CidClusterRPC if the arg is of type cid.Cid. Otherwise,
// a GenericClusterRPC is returned.
func RPC(m RPCOp, arg interface{}) ClusterRPC {
c, ok := arg.(*cid.Cid)
if ok { // Its a CID
r := new(CidClusterRPC)
// NewRPC builds an RPC request. It will create a
// CidRPC if the arg is of type cid.Cid. Otherwise,
// a GenericRPC is returned.
func NewRPC(m RPCOp, arg interface{}) RPC {
switch arg.(type) {
case *cid.Cid:
c := arg.(*cid.Cid)
r := new(CidRPC)
r.method = m
r.CID = c
r.responseCh = make(chan RPCResponse)
return r
default:
r := new(GenericRPC)
r.method = m
r.Argument = arg
r.responseCh = make(chan RPCResponse)
return r
}
// Its not a cid, make a generic
r := new(GenericClusterRPC)
r.method = m
r.Argument = arg
r.responseCh = make(chan RPCResponse)
return r
}
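
From the caller's side, the dispatch above means the concrete type is picked by the argument alone. A brief sketch, assuming the ipfscluster package and mirroring TestNewRPC below; newRPCDispatch is hypothetical and not part of this commit:

// newRPCDispatch illustrates which concrete RPC type NewRPC returns.
func newRPCDispatch(c *cid.Cid) {
	r1 := NewRPC(PinRPC, c)        // *CidRPC: the argument is a *cid.Cid
	r2 := NewRPC(MemberListRPC, 3) // *GenericRPC: any other argument

	_, isCid := r1.(*CidRPC)
	_, isGeneric := r2.(*GenericRPC)
	logger.Debugf("CidRPC? %t GenericRPC? %t", isCid, isGeneric) // true true
}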
// RPCResponse carries the result of a ClusterRPC requested operation and/or
// RPCResponse carries the result of an RPC-requested operation and/or
// an error to indicate if the operation was successful.
type RPCResponse struct {
Data interface{}


@ -3,18 +3,18 @@ package ipfscluster
import (
"testing"
cid "github.com/ipfs/go-cid"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
)
func TestRPC(t *testing.T) {
func TestNewRPC(t *testing.T) {
c, err := cid.Decode(testCid)
if err != nil {
t.Fatal(err)
}
crpc := RPC(IPFSPinRPC, c)
_, ok := crpc.(*CidClusterRPC)
crpc := NewRPC(IPFSPinRPC, c)
_, ok := crpc.(*CidRPC)
if !ok {
t.Error("expected a CidClusterRPC")
t.Error("expected a CidRPC")
}
if crpc.Op() != IPFSPinRPC {
@ -25,10 +25,10 @@ func TestRPC(t *testing.T) {
t.Error("should have made the ResponseCh")
}
grpc := RPC(MemberListRPC, 3)
_, ok = grpc.(*GenericClusterRPC)
grpc := NewRPC(MemberListRPC, 3)
_, ok = grpc.(*GenericRPC)
if !ok {
t.Error("expected a GenericClusterRPC")
t.Error("expected a GenericRPC")
}
if grpc.Op() != MemberListRPC {


@ -3,7 +3,7 @@ package ipfscluster
import (
"sync"
cid "github.com/ipfs/go-cid"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
)
// MapState is a very simple database to store
@ -12,13 +12,13 @@ type MapState struct {
mux sync.RWMutex
PinMap map[string]struct{}
rpcCh chan ClusterRPC
rpcCh chan RPC
}
func NewMapState() *MapState {
return &MapState{
PinMap: make(map[string]struct{}),
rpcCh: make(chan ClusterRPC),
rpcCh: make(chan RPC),
}
}


@ -3,14 +3,23 @@ package ipfscluster
import (
"context"
"sync"
"time"
cid "github.com/ipfs/go-cid"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
)
const (
pinEverywhere = -1
)
// A Pin or Unpin operation will be considered failed
// if the Cid has stayed in Pinning or Unpinning state
// for longer than these values.
var (
PinningTimeout = 15 * time.Minute
UnpinningTimeout = 10 * time.Second
)
const (
PinError = iota
UnpinError
@ -24,17 +33,36 @@ type Pin struct {
Cid *cid.Cid
PinMode PinMode
Status PinStatus
TS time.Time
}
type PinMode int
type PinStatus int
func (st PinStatus) String() string {
switch st {
case PinError:
return "pin_error"
case UnpinError:
return "unpin_error"
case Pinned:
return "pinned"
case Pinning:
return "pinning"
case Unpinning:
return "unpinning"
case Unpinned:
return "unpinned"
}
return ""
}
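
The new TS field and the two timeout variables above feed the reworked Sync() further down: an operation stuck in Pinning or Unpinning is only flagged as an error once it has been in flight longer than the corresponding timeout. A condensed sketch of that check, assuming the ipfscluster package; hasTimedOut is a hypothetical helper, not part of this commit:

// hasTimedOut mirrors the timeout checks added to MapPinTracker.Sync below.
func hasTimedOut(p Pin) bool {
	switch p.Status {
	case Pinning:
		return time.Since(p.TS) > PinningTimeout
	case Unpinning:
		return time.Since(p.TS) > UnpinningTimeout
	default:
		return false
	}
}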
type MapPinTracker struct {
mux sync.Mutex
status map[string]Pin
ctx context.Context
rpcCh chan ClusterRPC
rpcCh chan RPC
shutdownLock sync.Mutex
shutdown bool
@ -46,7 +74,7 @@ func NewMapPinTracker() *MapPinTracker {
ctx := context.Background()
mpt := &MapPinTracker{
status: make(map[string]Pin),
rpcCh: make(chan ClusterRPC, RPCMaxQueue),
rpcCh: make(chan RPC, RPCMaxQueue),
ctx: ctx,
shutdownCh: make(chan struct{}),
}
@ -94,6 +122,7 @@ func (mpt *MapPinTracker) set(c *cid.Cid, s PinStatus) error {
Cid: c,
PinMode: pinEverywhere,
Status: s,
TS: time.Now(),
}
return nil
}
@ -160,7 +189,7 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
return true
}
resp := MakeRPC(ctx, mpt.rpcCh, RPC(IPFSIsPinnedRPC, c), true)
resp := MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSIsPinnedRPC, c), true)
if resp.Error != nil {
if p.Status == Pinned || p.Status == Pinning {
mpt.set(c, PinError)
@ -186,8 +215,11 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
mpt.set(c, Pinned)
return true
case Unpinning:
mpt.set(c, UnpinError) // Not sure here
return true
if time.Since(p.TS) > UnpinningTimeout {
mpt.set(c, UnpinError)
return true
}
return false
case Unpinned:
mpt.set(c, UnpinError)
return true
@ -198,8 +230,11 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
mpt.set(c, PinError)
return true
case Pinning:
mpt.set(c, PinError)
return true
if time.Since(p.TS) > PinningTimeout {
mpt.set(c, PinError)
return true
}
return false
case Unpinning:
mpt.set(c, Unpinned)
return true
@ -219,10 +254,10 @@ func (mpt *MapPinTracker) Recover(c *cid.Cid) error {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
if p.Status == PinError {
MakeRPC(ctx, mpt.rpcCh, RPC(IPFSPinRPC, c), false)
MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSPinRPC, c), false)
}
if p.Status == UnpinError {
MakeRPC(ctx, mpt.rpcCh, RPC(IPFSUnpinRPC, c), false)
MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSUnpinRPC, c), false)
}
return nil
}
@ -239,7 +274,7 @@ func (mpt *MapPinTracker) SyncAll() []Pin {
return changedPins
}
func (mpt *MapPinTracker) SyncState(cState ClusterState) []Pin {
func (mpt *MapPinTracker) SyncState(cState State) []Pin {
clusterPins := cState.ListPins()
clusterMap := make(map[string]struct{})
// Make a map for faster lookup
@ -252,7 +287,7 @@ func (mpt *MapPinTracker) SyncState(cState ClusterState) []Pin {
var changed []Pin
mpt.mux.Lock()
// Collect items in the ClusterState not in the tracker
// Collect items in the State not in the tracker
for _, c := range clusterPins {
_, ok := mpt.status[c.String()]
if !ok {
@ -260,7 +295,7 @@ func (mpt *MapPinTracker) SyncState(cState ClusterState) []Pin {
}
}
// Collect items in the tracker not in the ClusterState
// Collect items in the tracker not in the State
for _, p := range mpt.status {
_, ok := clusterMap[p.Cid.String()]
if !ok {
@ -293,6 +328,6 @@ func (mpt *MapPinTracker) SyncState(cState ClusterState) []Pin {
return changed
}
func (mpt *MapPinTracker) RpcChan() <-chan ClusterRPC {
func (mpt *MapPinTracker) RpcChan() <-chan RPC {
return mpt.rpcCh
}