diff --git a/cluster.go b/cluster.go index f507d280..6d94738e 100644 --- a/cluster.go +++ b/cluster.go @@ -163,47 +163,79 @@ func (c *Cluster) run() { apiCh := c.api.RpcChan() trackerCh := c.tracker.RpcChan() + var op ClusterRPC for { select { - case ipfsOp := <-ipfsCh: - go c.handleOp(ipfsOp) - case consensusOp := <-consensusCh: - go c.handleOp(consensusOp) - case apiOp := <-apiCh: - go c.handleOp(apiOp) - case trackerOp := <-trackerCh: - go c.handleOp(trackerOp) + case op = <-ipfsCh: + goto HANDLEOP + case op = <-consensusCh: + goto HANDLEOP + case op = <-apiCh: + goto HANDLEOP + case op = <-trackerCh: + goto HANDLEOP case <-c.ctx.Done(): logger.Debug("Cluster is Done()") return } + + HANDLEOP: + switch op.(type) { + case *CidClusterRPC: + crpc := op.(*CidClusterRPC) + go c.handleCidRPC(crpc) + case *GenericClusterRPC: + grpc := op.(*GenericClusterRPC) + go c.handleGenericRPC(grpc) + default: + logger.Error("unknown ClusterRPC type") + } } } -// handleOp takes care of running the necessary action for a -// clusterRPC request and sending the response. -func (c *Cluster) handleOp(rpc ClusterRPC) { - var crpc *CidClusterRPC - var grpc *GenericClusterRPC - switch rpc.(type) { - case *CidClusterRPC: - crpc = rpc.(*CidClusterRPC) - case *GenericClusterRPC: - grpc = rpc.(*GenericClusterRPC) - default: - logger.Error("expected a known ClusterRPC type but got something else") - return - } - +func (c *Cluster) handleGenericRPC(grpc *GenericClusterRPC) { var data interface{} = nil var err error = nil - switch rpc.Op() { + switch grpc.Op() { case VersionRPC: data = c.Version() case MemberListRPC: data = c.Members() case PinListRPC: data = c.tracker.ListPins() + case RollbackRPC: + state, ok := grpc.Argument.(ClusterState) + if !ok { + err = errors.New("Bad RollbackRPC type") + break + } + err = c.consensus.Rollback(state) + case LeaderRPC: + // Leader RPC is a RPC that needs to be run + // by the Consensus Leader. Arguments is a wrapped RPC. + rpc, ok := grpc.Argument.(*ClusterRPC) + if !ok { + err = errors.New("Bad LeaderRPC type") + } + data, err = c.leaderRPC(rpc) + default: + logger.Error("unknown operation for GenericClusterRPC. Ignoring.") + } + + resp := RPCResponse{ + Data: data, + Error: err, + } + + grpc.ResponseCh() <- resp +} + +// handleOp takes care of running the necessary action for a +// clusterRPC request and sending the response. +func (c *Cluster) handleCidRPC(crpc *CidClusterRPC) { + var data interface{} = nil + var err error = nil + switch crpc.Op() { case PinRPC: err = c.Pin(crpc.CID) case UnpinRPC: @@ -226,23 +258,8 @@ func (c *Cluster) handleOp(rpc ClusterRPC) { } case IPFSIsPinnedRPC: data, err = c.ipfs.IsPinned(crpc.CID) - case RollbackRPC: - state, ok := grpc.Argument.(ClusterState) - if !ok { - err = errors.New("Bad RollbackRPC type") - break - } - err = c.consensus.Rollback(state) - case LeaderRPC: - // Leader RPC is a RPC that needs to be run - // by the Consensus Leader. Arguments is a wrapped RPC. - rpc, ok := grpc.Argument.(*ClusterRPC) - if !ok { - err = errors.New("Bad LeaderRPC type") - } - data, err = c.leaderRPC(rpc) default: - logger.Error("Unknown operation. Ignoring") + logger.Error("unknown operation for CidClusterRPC. Ignoring.") } resp := RPCResponse{ @@ -250,7 +267,7 @@ func (c *Cluster) handleOp(rpc ClusterRPC) { Error: err, } - rpc.ResponseCh() <- resp + crpc.ResponseCh() <- resp } // This uses libp2p to contact the cluster leader and ask him to do something diff --git a/cluster_test.go b/cluster_test.go new file mode 100644 index 00000000..15804b28 --- /dev/null +++ b/cluster_test.go @@ -0,0 +1,228 @@ +package ipfscluster + +import ( + "errors" + "testing" + "time" + + cid "github.com/ipfs/go-cid" +) + +type mockComponent struct { + rpcCh chan ClusterRPC + returnError bool +} + +func (c *mockComponent) Shutdown() error { + return nil +} + +func (c *mockComponent) RpcChan() <-chan ClusterRPC { + return c.rpcCh +} + +type mockApi struct { + mockComponent +} + +type mockConnector struct { + mockComponent +} + +func (ipfs *mockConnector) Pin(c *cid.Cid) error { + if ipfs.returnError { + return errors.New("") + } + return nil +} + +func (ipfs *mockConnector) Unpin(c *cid.Cid) error { + if ipfs.returnError { + return errors.New("") + } + return nil +} + +func (ipfs *mockConnector) IsPinned(c *cid.Cid) (bool, error) { + if ipfs.returnError { + return false, errors.New("") + } + return true, nil +} + +func testingCluster(t *testing.T) (*Cluster, *mockApi, *mockConnector, *MapState, *MapPinTracker) { + api := &mockApi{} + api.rpcCh = make(chan ClusterRPC, 2) + ipfs := &mockConnector{} + ipfs.rpcCh = make(chan ClusterRPC, 2) + cfg := testingConfig() + st := NewMapState() + tracker := NewMapPinTracker() + + cl, err := NewCluster( + cfg, + api, + ipfs, + st, + tracker, + ) + if err != nil { + t.Fatal("cannot create cluster:", err) + } + return cl, api, ipfs, st, tracker +} + +func testClusterShutdown(t *testing.T) { + cl, _, _, _, _ := testingCluster(t) + err := cl.Shutdown() + if err != nil { + t.Error("cluster shutdown failed:", err) + } + cl, _, _, _, _ = testingCluster(t) + err = cl.Shutdown() + if err != nil { + t.Error("cluster shutdown failed:", err) + } +} + +func testClusterSync(t *testing.T) { + cl, _, _, st, _ := testingCluster(t) + defer cl.Shutdown() + err := cl.Sync() + if err != nil { + t.Error(err) + } + + c, _ := cid.Decode(testCid) + err = cl.Pin(c) + if err != nil { + t.Fatal("pin should have worked:", err) + } + + // Modify state on the side so the sync does not + // happen on an empty slide + st.RmPin(c) + err = cl.Sync() + if err != nil { + t.Fatal("sync should have worked:", err) + } + + // test an error case + cl.consensus.Shutdown() + err = cl.Sync() + if err == nil { + t.Error("expected an error but things worked") + } +} + +func testClusterPin(t *testing.T) { + cl, _, _, _, _ := testingCluster(t) + defer cleanRaft() + defer cl.Shutdown() + + c, _ := cid.Decode(testCid) + err := cl.Pin(c) + if err != nil { + t.Fatal("pin should have worked:", err) + } + + // test an error case + cl.consensus.Shutdown() + err = cl.Pin(c) + if err == nil { + t.Error("expected an error but things worked") + } +} + +func testClusterUnpin(t *testing.T) { + cl, _, _, _, _ := testingCluster(t) + defer cleanRaft() + defer cl.Shutdown() + + c, _ := cid.Decode(testCid) + err := cl.Unpin(c) + if err != nil { + t.Fatal("pin should have worked:", err) + } + + // test an error case + cl.consensus.Shutdown() + err = cl.Unpin(c) + if err == nil { + t.Error("expected an error but things worked") + } +} + +func TestClusterMembers(t *testing.T) { + cl, _, _, _, _ := testingCluster(t) + defer cleanRaft() + defer cl.Shutdown() + m := cl.Members() + id := testingConfig().ID + if len(m) != 1 || m[0].Pretty() != id { + t.Error("bad Members()") + } +} + +func TestVersion(t *testing.T) { + cl, _, _, _, _ := testingCluster(t) + defer cleanRaft() + defer cl.Shutdown() + if cl.Version() != Version { + t.Error("bad Version()") + } +} + +func TestClusterRun(t *testing.T) { + cl, api, ipfs, _, tracker := testingCluster(t) + defer cleanRaft() + defer cl.Shutdown() + // We sent RPCs all all types with one of the + // RpcChannels and make sure there is always a response + // We don't care about the value of that response now. We leave + // that for end-to-end tests + + // Generic RPC + for i := 0; i < NoopRPC; i++ { + rpc := RPC(RPCOp(i), "something") + switch i % 4 { + case 0: + ipfs.rpcCh <- rpc + case 1: + cl.consensus.rpcCh <- rpc + case 2: + api.rpcCh <- rpc + case 3: + tracker.rpcCh <- rpc + } + // Wait for a response + timer := time.NewTimer(time.Second) + select { + case <-rpc.ResponseCh(): + case <-timer.C: + t.Errorf("Generic RPC %d was not handled correctly by Cluster", i) + } + } + + // Cid RPC + c, _ := cid.Decode(testCid) + for i := 0; i < NoopRPC; i++ { + rpc := RPC(RPCOp(i), c) + switch i % 4 { + case 0: + ipfs.rpcCh <- rpc + case 1: + cl.consensus.rpcCh <- rpc + case 2: + api.rpcCh <- rpc + case 3: + tracker.rpcCh <- rpc + } + timer := time.NewTimer(time.Second) + select { + case <-rpc.ResponseCh(): + case <-timer.C: + t.Errorf("Cid RPC %d was not handled correctly by Cluster", i) + } + } +} diff --git a/rpc.go b/rpc.go index 53f0e677..27a2c240 100644 --- a/rpc.go +++ b/rpc.go @@ -14,6 +14,8 @@ const ( MemberListRPC RollbackRPC LeaderRPC + + NoopRPC ) // RPCMethod identifies which RPC supported operation we are trying to make diff --git a/tracker.go b/tracker.go index 0209312c..7d4795c0 100644 --- a/tracker.go +++ b/tracker.go @@ -140,8 +140,9 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool { p := mpt.get(c) + // We assume errors will need a Recover() so we return true if p.Status == PinError || p.Status == UnpinError { - return false + return true } resp := MakeRPC(ctx, mpt.rpcCh, RPC(IPFSIsPinnedRPC, c), true)