Add cluster_test

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2016-12-14 17:25:21 +01:00
parent 0f31995bd6
commit f56a9dd77e
4 changed files with 290 additions and 42 deletions

cluster.go

@@ -163,47 +163,79 @@ func (c *Cluster) run() {
 	apiCh := c.api.RpcChan()
 	trackerCh := c.tracker.RpcChan()

+	var op ClusterRPC
 	for {
 		select {
-		case ipfsOp := <-ipfsCh:
-			go c.handleOp(ipfsOp)
-		case consensusOp := <-consensusCh:
-			go c.handleOp(consensusOp)
-		case apiOp := <-apiCh:
-			go c.handleOp(apiOp)
-		case trackerOp := <-trackerCh:
-			go c.handleOp(trackerOp)
+		case op = <-ipfsCh:
+			goto HANDLEOP
+		case op = <-consensusCh:
+			goto HANDLEOP
+		case op = <-apiCh:
+			goto HANDLEOP
+		case op = <-trackerCh:
+			goto HANDLEOP
 		case <-c.ctx.Done():
			logger.Debug("Cluster is Done()")
 			return
 		}
+
+	HANDLEOP:
+		switch op.(type) {
+		case *CidClusterRPC:
+			crpc := op.(*CidClusterRPC)
+			go c.handleCidRPC(crpc)
+		case *GenericClusterRPC:
+			grpc := op.(*GenericClusterRPC)
+			go c.handleGenericRPC(grpc)
+		default:
+			logger.Error("unknown ClusterRPC type")
+		}
 	}
 }

-// handleOp takes care of running the necessary action for a
-// clusterRPC request and sending the response.
-func (c *Cluster) handleOp(rpc ClusterRPC) {
-	var crpc *CidClusterRPC
-	var grpc *GenericClusterRPC
-	switch rpc.(type) {
-	case *CidClusterRPC:
-		crpc = rpc.(*CidClusterRPC)
-	case *GenericClusterRPC:
-		grpc = rpc.(*GenericClusterRPC)
-	default:
-		logger.Error("expected a known ClusterRPC type but got something else")
-		return
-	}
-
+// handleGenericRPC takes care of running the necessary action for a
+// generic ClusterRPC request and sending the response.
+func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) {
 	var data interface{} = nil
 	var err error = nil
-	switch rpc.Op() {
+	switch grpc.Op() {
 	case VersionRPC:
 		data = c.Version()
 	case MemberListRPC:
 		data = c.Members()
 	case PinListRPC:
 		data = c.tracker.ListPins()
+	case RollbackRPC:
+		state, ok := grpc.Argument.(ClusterState)
+		if !ok {
+			err = errors.New("Bad RollbackRPC type")
+			break
+		}
+		err = c.consensus.Rollback(state)
+	case LeaderRPC:
+		// LeaderRPC is an RPC that needs to be run by the
+		// consensus leader. The argument is a wrapped RPC.
+		rpc, ok := grpc.Argument.(*ClusterRPC)
+		if !ok {
+			err = errors.New("Bad LeaderRPC type")
+			break
+		}
+		data, err = c.leaderRPC(rpc)
+	default:
+		logger.Error("unknown operation for GenericClusterRPC. Ignoring.")
+	}
+
+	resp := RPCResponse{
+		Data:  data,
+		Error: err,
+	}
+	grpc.ResponseCh() <- resp
+}
+
+// handleCidRPC takes care of running the necessary action for a
+// CID ClusterRPC request and sending the response.
+func (c *Cluster) handleCidRPC(crpc *CidClusterRPC) {
+	var data interface{} = nil
+	var err error = nil
+	switch crpc.Op() {
 	case PinRPC:
 		err = c.Pin(crpc.CID)
 	case UnpinRPC:
@@ -226,23 +258,8 @@ func (c *Cluster) handleOp(rpc ClusterRPC) {
 		}
 	case IPFSIsPinnedRPC:
 		data, err = c.ipfs.IsPinned(crpc.CID)
-	case RollbackRPC:
-		state, ok := grpc.Argument.(ClusterState)
-		if !ok {
-			err = errors.New("Bad RollbackRPC type")
-			break
-		}
-		err = c.consensus.Rollback(state)
-	case LeaderRPC:
-		// Leader RPC is a RPC that needs to be run
-		// by the Consensus Leader. Arguments is a wrapped RPC.
-		rpc, ok := grpc.Argument.(*ClusterRPC)
-		if !ok {
-			err = errors.New("Bad LeaderRPC type")
-		}
-		data, err = c.leaderRPC(rpc)
 	default:
-		logger.Error("Unknown operation. Ignoring")
+		logger.Error("unknown operation for CidClusterRPC. Ignoring.")
 	}

 	resp := RPCResponse{
@@ -250,7 +267,7 @@ func (c *Cluster) handleOp(rpc ClusterRPC) {
 		Error: err,
 	}
-	rpc.ResponseCh() <- resp
+	crpc.ResponseCh() <- resp
 }

 // This uses libp2p to contact the cluster leader and ask it to do something
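Aside: the refactored run() above keeps a single shared op variable and jumps to one HANDLEOP label, so the type dispatch is written once instead of being repeated in every select case. Below is a minimal, self-contained sketch of that select-plus-goto pattern; the channel and type names are illustrative stand-ins, not the cluster's real API.

package main

import "fmt"

type clusterRPC interface{ describe() string }

type cidRPC struct{ cid string }
type genericRPC struct{ name string }

func (r cidRPC) describe() string     { return "cid op on " + r.cid }
func (r genericRPC) describe() string { return "generic op " + r.name }

// run mirrors Cluster.run(): receive from any channel, then jump to a
// single dispatch point that switches on the concrete RPC type.
func run(chA, chB <-chan clusterRPC, done <-chan struct{}) {
	var op clusterRPC
	for {
		select {
		case op = <-chA:
			goto HANDLEOP
		case op = <-chB:
			goto HANDLEOP
		case <-done:
			return
		}
	HANDLEOP:
		switch rpc := op.(type) {
		case cidRPC:
			fmt.Println("cid handler:", rpc.describe())
		case genericRPC:
			fmt.Println("generic handler:", rpc.describe())
		default:
			fmt.Println("unknown RPC type")
		}
	}
}

func main() {
	chA := make(chan clusterRPC)
	chB := make(chan clusterRPC)
	done := make(chan struct{})
	finished := make(chan struct{})
	go func() {
		run(chA, chB, done)
		close(finished)
	}()
	chA <- cidRPC{cid: "QmExample"}
	chB <- genericRPC{name: "version"}
	close(done)
	<-finished
}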

cluster_test.go (new file)

@@ -0,0 +1,228 @@
package ipfscluster

import (
	"errors"
	"testing"
	"time"

	cid "github.com/ipfs/go-cid"
)

type mockComponent struct {
	rpcCh       chan ClusterRPC
	returnError bool
}

func (c *mockComponent) Shutdown() error {
	return nil
}

func (c *mockComponent) RpcChan() <-chan ClusterRPC {
	return c.rpcCh
}

type mockApi struct {
	mockComponent
}

type mockConnector struct {
	mockComponent
}

func (ipfs *mockConnector) Pin(c *cid.Cid) error {
	if ipfs.returnError {
		return errors.New("")
	}
	return nil
}

func (ipfs *mockConnector) Unpin(c *cid.Cid) error {
	if ipfs.returnError {
		return errors.New("")
	}
	return nil
}

func (ipfs *mockConnector) IsPinned(c *cid.Cid) (bool, error) {
	if ipfs.returnError {
		return false, errors.New("")
	}
	return true, nil
}

func testingCluster(t *testing.T) (*Cluster, *mockApi, *mockConnector, *MapState, *MapPinTracker) {
	api := &mockApi{}
	api.rpcCh = make(chan ClusterRPC, 2)
	ipfs := &mockConnector{}
	ipfs.rpcCh = make(chan ClusterRPC, 2)
	cfg := testingConfig()
	st := NewMapState()
	tracker := NewMapPinTracker()
	cl, err := NewCluster(
		cfg,
		api,
		ipfs,
		st,
		tracker,
	)
	if err != nil {
		t.Fatal("cannot create cluster:", err)
	}
	return cl, api, ipfs, st, tracker
}

func testClusterShutdown(t *testing.T) {
	cl, _, _, _, _ := testingCluster(t)
	err := cl.Shutdown()
	if err != nil {
		t.Error("cluster shutdown failed:", err)
	}
	cl, _, _, _, _ = testingCluster(t)
	err = cl.Shutdown()
	if err != nil {
		t.Error("cluster shutdown failed:", err)
	}
}

func testClusterSync(t *testing.T) {
	cl, _, _, st, _ := testingCluster(t)
	defer cl.Shutdown()
	err := cl.Sync()
	if err != nil {
		t.Error(err)
	}

	c, _ := cid.Decode(testCid)
	err = cl.Pin(c)
	if err != nil {
		t.Fatal("pin should have worked:", err)
	}

	// Modify the state on the side so the sync does not
	// happen on an empty slate
	st.RmPin(c)
	err = cl.Sync()
	if err != nil {
		t.Fatal("sync should have worked:", err)
	}

	// test an error case
	cl.consensus.Shutdown()
	err = cl.Sync()
	if err == nil {
		t.Error("expected an error but things worked")
	}
}

func testClusterPin(t *testing.T) {
	cl, _, _, _, _ := testingCluster(t)
	defer cleanRaft()
	defer cl.Shutdown()

	c, _ := cid.Decode(testCid)
	err := cl.Pin(c)
	if err != nil {
		t.Fatal("pin should have worked:", err)
	}

	// test an error case
	cl.consensus.Shutdown()
	err = cl.Pin(c)
	if err == nil {
		t.Error("expected an error but things worked")
	}
}

func testClusterUnpin(t *testing.T) {
	cl, _, _, _, _ := testingCluster(t)
	defer cleanRaft()
	defer cl.Shutdown()

	c, _ := cid.Decode(testCid)
	err := cl.Unpin(c)
	if err != nil {
		t.Fatal("unpin should have worked:", err)
	}

	// test an error case
	cl.consensus.Shutdown()
	err = cl.Unpin(c)
	if err == nil {
		t.Error("expected an error but things worked")
	}
}

func TestClusterMembers(t *testing.T) {
	cl, _, _, _, _ := testingCluster(t)
	defer cleanRaft()
	defer cl.Shutdown()
	m := cl.Members()
	id := testingConfig().ID
	if len(m) != 1 || m[0].Pretty() != id {
		t.Error("bad Members()")
	}
}

func TestVersion(t *testing.T) {
	cl, _, _, _, _ := testingCluster(t)
	defer cleanRaft()
	defer cl.Shutdown()
	if cl.Version() != Version {
		t.Error("bad Version()")
	}
}

func TestClusterRun(t *testing.T) {
	cl, api, ipfs, _, tracker := testingCluster(t)
	defer cleanRaft()
	defer cl.Shutdown()

	// We send RPCs of all types through each of the RPC channels
	// and make sure there is always a response. We don't care about
	// the value of that response here; that is left for end-to-end tests.

	// Generic RPC
	for i := 0; i < NoopRPC; i++ {
		rpc := RPC(RPCOp(i), "something")
		switch i % 4 {
		case 0:
			ipfs.rpcCh <- rpc
		case 1:
			cl.consensus.rpcCh <- rpc
		case 2:
			api.rpcCh <- rpc
		case 3:
			tracker.rpcCh <- rpc
		}

		// Wait for a response
		timer := time.NewTimer(time.Second)
		select {
		case <-rpc.ResponseCh():
		case <-timer.C:
			t.Errorf("Generic RPC %d was not handled correctly by Cluster", i)
		}
	}

	// Cid RPC
	c, _ := cid.Decode(testCid)
	for i := 0; i < NoopRPC; i++ {
		rpc := RPC(RPCOp(i), c)
		switch i % 4 {
		case 0:
			ipfs.rpcCh <- rpc
		case 1:
			cl.consensus.rpcCh <- rpc
		case 2:
			api.rpcCh <- rpc
		case 3:
			tracker.rpcCh <- rpc
		}

		timer := time.NewTimer(time.Second)
		select {
		case <-rpc.ResponseCh():
		case <-timer.C:
			t.Errorf("Cid RPC %d was not handled correctly by Cluster", i)
		}
	}
}
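The RPC loops above guard every response read with a one-second timer, so a dropped RPC fails the test instead of hanging it forever. Here is a small self-contained sketch of that wait pattern; the helper name and the string response type are made up for illustration.

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitResponse reads one value from ch or gives up after d, mirroring
// the select-on-timer pattern used in TestClusterRun.
func waitResponse(ch <-chan string, d time.Duration) (string, error) {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case v := <-ch:
		return v, nil
	case <-timer.C:
		return "", errors.New("timed out waiting for RPC response")
	}
}

func main() {
	ch := make(chan string, 1)
	ch <- "pong"
	fmt.Println(waitResponse(ch, time.Second))
}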

rpc.go

@@ -14,6 +14,8 @@ const (
 	MemberListRPC
 	RollbackRPC
 	LeaderRPC
+
+	NoopRPC
 )

 // RPCMethod identifies which RPC supported operation we are trying to make
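NoopRPC is deliberately appended as the last value of the iota block, so its value equals the number of real operations and the tests can loop for i := 0; i < NoopRPC; i++ over every defined op. A sketch of that trailing-sentinel idiom follows; the enum here is a trimmed-down stand-in, not rpc.go's full list.

package main

import "fmt"

// Trailing sentinel in an iota enum: NoopRPC's value is the count of
// real operations, which makes it a natural loop bound in tests.
const (
	PinRPC = iota
	UnpinRPC
	VersionRPC
	LeaderRPC
	NoopRPC // keep last; not a real operation
)

func main() {
	for i := 0; i < NoopRPC; i++ {
		fmt.Println("exercising op", i)
	}
}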

map_pin_tracker.go

@@ -140,8 +140,9 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
 	p := mpt.get(c)
+	// We assume errors will need a Recover() so we return true
 	if p.Status == PinError || p.Status == UnpinError {
-		return false
+		return true
 	}

 	resp := MakeRPC(ctx, mpt.rpcCh, RPC(IPFSIsPinnedRPC, c), true)
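The one-line fix above changes what Sync() reports for pins in an error state: per the new comment, errors are assumed to need a Recover(), so the early return must be true rather than false. A minimal sketch of the corrected predicate, with the tracker's status types simplified for illustration:

package main

import "fmt"

type pinStatus int

const (
	pinned pinStatus = iota
	pinError
	unpinError
)

// needsAttention mirrors the fixed early return in MapPinTracker.Sync:
// error states short-circuit to true instead of false.
func needsAttention(s pinStatus) bool {
	return s == pinError || s == unpinError
}

func main() {
	fmt.Println(needsAttention(pinError)) // true after the fix
	fmt.Println(needsAttention(pinned))   // false; the real Sync then asks IPFS
}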