From 0422ceed1609c6fd544e92ee6c3561bed2389c90 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Fri, 16 Dec 2016 12:40:28 +0100 Subject: [PATCH] Preliminary support for Remote RPC operations License: MIT Signed-off-by: Hector Sanjuan --- api.go | 8 +-- api_test.go | 4 +- cluster.go | 154 +++++++++++++++++++++++++++-------------- cluster_test.go | 71 +++++++++++++++---- consensus.go | 12 ++-- consensus_test.go | 11 ++- ipfs-cluster/main.go | 5 +- ipfs_cluster.go | 38 +++++++++- ipfs_cluster_test.go | 2 +- ipfs_connector.go | 2 +- ipfs_connector_test.go | 2 +- raft.go | 11 ++- remote.go | 139 +++++++++++++++++++++++++++++++++++++ rpc.go | 17 ++++- rpc_test.go | 2 +- state.go | 2 +- tracker.go | 2 +- util.go | 39 +++++++++++ 18 files changed, 424 insertions(+), 97 deletions(-) create mode 100644 remote.go create mode 100644 util.go diff --git a/api.go b/api.go index 9201d8d0..597305ec 100644 --- a/api.go +++ b/api.go @@ -9,9 +9,9 @@ import ( "strings" "sync" - peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" + peer "github.com/libp2p/go-libp2p-peer" - cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" + cid "github.com/ipfs/go-cid" mux "github.com/gorilla/mux" ) @@ -296,7 +296,7 @@ func (api *ClusterHTTPAPI) statusCidHandler(w http.ResponseWriter, r *http.Reque func (api *ClusterHTTPAPI) syncHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithCancel(api.ctx) defer cancel() - rRpc := NewRPC(GlobalSyncRPC, nil) + rRpc := NewRPC(LocalSyncRPC, nil) resp := MakeRPC(ctx, api.rpcCh, rRpc, true) if checkResponse(w, rRpc.Op(), resp) { sendStatusResponse(w, resp) @@ -308,7 +308,7 @@ func (api *ClusterHTTPAPI) syncCidHandler(w http.ResponseWriter, r *http.Request defer cancel() if c := parseCidOrError(w, r); c != nil { - op := NewRPC(GlobalSyncCidRPC, c) + op := NewRPC(LocalSyncCidRPC, c) resp := MakeRPC(ctx, api.rpcCh, op, true) if checkResponse(w, op.Op(), resp) { sendStatusCidResponse(w, resp) diff --git a/api_test.go b/api_test.go index 836186bd..4159563b 100644 --- a/api_test.go +++ b/api_test.go @@ -8,8 +8,8 @@ import ( "net/http" "testing" - cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" - peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" + cid "github.com/ipfs/go-cid" + peer "github.com/libp2p/go-libp2p-peer" ) var ( diff --git a/cluster.go b/cluster.go index cf13b515..223946dd 100644 --- a/cluster.go +++ b/cluster.go @@ -8,15 +8,15 @@ import ( "strings" "sync" - host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host" - multiaddr "gx/ipfs/QmUAQaWbKxGCUTuoQVvvicbQNZ9APF5pDGWyAZSe93AtKH/go-multiaddr" - swarm "gx/ipfs/QmWfxnAiQ5TnnCgiX9ikVUKFNHRgGhbgKdx5DoKPELD7P4/go-libp2p-swarm" - basichost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/basic" - peerstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore" - peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" - crypto "gx/ipfs/QmfWDLQjGjVe4fr5CoztYW2DYYjRysMJrFe1RCsXLPTf46/go-libp2p-crypto" + crypto "github.com/libp2p/go-libp2p-crypto" + host "github.com/libp2p/go-libp2p-host" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + swarm "github.com/libp2p/go-libp2p-swarm" + basichost "github.com/libp2p/go-libp2p/p2p/host/basic" + multiaddr "github.com/multiformats/go-multiaddr" - cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" + cid "github.com/ipfs/go-cid" ) // Cluster is the main IPFS cluster component. It provides @@ -28,6 +28,7 @@ type Cluster struct { host host.Host consensus *Consensus + remote Remote api API ipfs IPFSConnector state State @@ -44,7 +45,7 @@ type Cluster struct { // NewCluster builds a ready-to-start IPFS Cluster. It takes a API, // an IPFSConnector and a State as parameters, allowing the user, // to provide custom implementations of these components. -func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker PinTracker) (*Cluster, error) { +func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker PinTracker, remote Remote) (*Cluster, error) { ctx := context.Background() host, err := makeHost(ctx, cfg) if err != nil { @@ -57,11 +58,14 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P return nil, err } + remote.SetHost(host) + cluster := &Cluster{ ctx: ctx, config: cfg, host: host, consensus: consensus, + remote: remote, api: api, ipfs: ipfs, state: state, @@ -183,38 +187,46 @@ func (c *Cluster) Pins() []*cid.Cid { // Pin makes the cluster Pin a Cid. This implies adding the Cid // to the IPFS Cluster peers shared-state. Depending on the cluster -// pinning strategy, the IPFSConnector may then request the IPFS daemon -// to pin the Cid. +// pinning strategy, the PinTracker may then request the IPFS daemon +// to pin the Cid. When the current node is not the cluster leader, +// the request is forwarded to the leader. // // Pin returns an error if the operation could not be persisted // to the global state. Pin does not reflect the success or failure // of underlying IPFS daemon pinning operations. func (c *Cluster) Pin(h *cid.Cid) error { - // TODO: Check this hash makes any sense + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + logger.Info("pinning:", h) - err := c.consensus.AddPin(h) - if err != nil { - logger.Error(err) - return err + rpc := NewRPC(ConsensusAddPinRPC, h) + wrpc := NewRPC(LeaderRPC, rpc) + resp := MakeRPC(ctx, c.rpcCh, wrpc, true) + if resp.Error != nil { + logger.Error(resp.Error) + return resp.Error } return nil } // Unpin makes the cluster Unpin a Cid. This implies adding the Cid -// to the IPFS Cluster peers shared-state. Depending on the cluster -// unpinning strategy, the IPFSConnector may then request the IPFS daemon -// to unpin the Cid. +// to the IPFS Cluster peers shared-state. When the current node is +// not the cluster leader, the request is forwarded to the leader. // // Unpin returns an error if the operation could not be persisted // to the global state. Unpin does not reflect the success or failure // of underlying IPFS daemon unpinning operations. func (c *Cluster) Unpin(h *cid.Cid) error { - // TODO: Check this hash makes any sense + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + logger.Info("unpinning:", h) - err := c.consensus.RmPin(h) - if err != nil { - logger.Error(err) - return err + rpc := NewRPC(ConsensusRmPinRPC, h) + wrpc := NewRPC(LeaderRPC, rpc) + resp := MakeRPC(ctx, c.rpcCh, wrpc, true) + if resp.Error != nil { + logger.Error(resp.Error) + return resp.Error } return nil } @@ -250,6 +262,8 @@ func (c *Cluster) run() { goto HANDLEOP case op = <-c.tracker.RpcChan(): goto HANDLEOP + case op = <-c.remote.RpcChan(): + goto HANDLEOP case op = <-c.rpcCh: goto HANDLEOP case <-c.shutdownCh: @@ -259,18 +273,25 @@ func (c *Cluster) run() { switch op.(type) { case *CidRPC: crpc := op.(*CidRPC) - go c.handleCidNewRPC(crpc) + go c.handleCidRPC(crpc) case *GenericRPC: grpc := op.(*GenericRPC) - go c.handleGenericNewRPC(grpc) + go c.handleGenericRPC(grpc) + case *WrappedRPC: + wrpc := op.(*WrappedRPC) + go c.handleWrappedRPC(wrpc) default: logger.Error("unknown RPC type") + op.ResponseCh() <- RPCResponse{ + Data: nil, + Error: errors.New("unknown RPC type"), + } } } }() } -func (c *Cluster) handleGenericNewRPC(grpc *GenericRPC) { +func (c *Cluster) handleGenericRPC(grpc *GenericRPC) { var data interface{} = nil var err error = nil switch grpc.Op() { @@ -293,14 +314,6 @@ func (c *Cluster) handleGenericNewRPC(grpc *GenericRPC) { break } err = c.consensus.Rollback(state) - case LeaderRPC: - // Leader RPC is a RPC that needs to be run - // by the Consensus Leader. Arguments is a wrapped RPC. - rpc, ok := grpc.Argument.(*RPC) - if !ok { - err = errors.New("bad LeaderRPC type") - } - data, err = c.leaderNewRPC(rpc) default: logger.Error("unknown operation for GenericRPC. Ignoring.") } @@ -315,38 +328,43 @@ func (c *Cluster) handleGenericNewRPC(grpc *GenericRPC) { // handleOp takes care of running the necessary action for a // clusterRPC request and sending the response. -func (c *Cluster) handleCidNewRPC(crpc *CidRPC) { +func (c *Cluster) handleCidRPC(crpc *CidRPC) { var data interface{} = nil var err error = nil + var h *cid.Cid = crpc.CID switch crpc.Op() { case PinRPC: - err = c.Pin(crpc.CID) + err = c.Pin(h) case UnpinRPC: - err = c.Unpin(crpc.CID) + err = c.Unpin(h) + case ConsensusAddPinRPC: + err = c.consensus.AddPin(h) + case ConsensusRmPinRPC: + err = c.consensus.RmPin(h) case IPFSPinRPC: - c.tracker.Pinning(crpc.CID) - err = c.ipfs.Pin(crpc.CID) + c.tracker.Pinning(h) + err = c.ipfs.Pin(h) if err != nil { - c.tracker.PinError(crpc.CID) + c.tracker.PinError(h) } else { - c.tracker.Pinned(crpc.CID) + c.tracker.Pinned(h) } case IPFSUnpinRPC: - c.tracker.Unpinning(crpc.CID) - err = c.ipfs.Unpin(crpc.CID) + c.tracker.Unpinning(h) + err = c.ipfs.Unpin(h) if err != nil { - c.tracker.UnpinError(crpc.CID) + c.tracker.UnpinError(h) } else { - c.tracker.Unpinned(crpc.CID) + c.tracker.Unpinned(h) } case IPFSIsPinnedRPC: - data, err = c.ipfs.IsPinned(crpc.CID) + data, err = c.ipfs.IsPinned(h) case StatusCidRPC: - data = c.StatusCid(crpc.CID) + data = c.StatusCid(h) case LocalSyncCidRPC: - data, err = c.LocalSyncCid(crpc.CID) + data, err = c.LocalSyncCid(h) case GlobalSyncCidRPC: - data, err = c.GlobalSyncCid(crpc.CID) + data, err = c.GlobalSyncCid(h) default: logger.Error("unknown operation for CidRPC. Ignoring.") } @@ -359,9 +377,38 @@ func (c *Cluster) handleCidNewRPC(crpc *CidRPC) { crpc.ResponseCh() <- resp } -// This uses libp2p to contact the cluster leader and ask him to do something -func (c *Cluster) leaderNewRPC(rpc *RPC) (interface{}, error) { - return nil, errors.New("not implemented yet") +func (c *Cluster) handleWrappedRPC(wrpc *WrappedRPC) { + innerRPC := wrpc.WRPC + var resp RPCResponse + switch wrpc.Op() { + case LeaderRPC: + leader, err := c.consensus.Leader() + if err != nil { + resp = RPCResponse{ + Data: nil, + Error: err, + } + } + resp, err = c.remote.MakeRemoteRPC(innerRPC, leader) + if err != nil { + resp = RPCResponse{ + Data: nil, + Error: fmt.Errorf("request to %s failed with: %s", err), + } + } + case BroadcastRPC: + resp = RPCResponse{ + Data: nil, + Error: errors.New("not implemented"), + } + default: + resp = RPCResponse{ + Data: nil, + Error: errors.New("unknown WrappedRPC type"), + } + } + + wrpc.ResponseCh() <- resp } // makeHost makes a libp2p-host @@ -443,5 +490,6 @@ func makeHost(ctx context.Context, cfg *Config) (host.Host, error) { } bhost := basichost.New(network) + return bhost, nil } diff --git a/cluster_test.go b/cluster_test.go index d1045ac5..cc0d6ff1 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" + cid "github.com/ipfs/go-cid" ) type mockComponent struct { @@ -50,7 +50,7 @@ func (ipfs *mockConnector) IsPinned(c *cid.Cid) (bool, error) { return true, nil } -func testingCluster(t *testing.T) (*Cluster, *mockApi, *mockConnector, *MapState, *MapPinTracker) { +func testingCluster(t *testing.T) (*Cluster, *mockApi, *mockConnector, *MapState, *MapPinTracker, *Libp2pRemote) { api := &mockApi{} api.rpcCh = make(chan RPC, 2) ipfs := &mockConnector{} @@ -58,6 +58,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockApi, *mockConnector, *MapState cfg := testingConfig() st := NewMapState() tracker := NewMapPinTracker() + remote := NewLibp2pRemote() cl, err := NewCluster( cfg, @@ -65,22 +66,23 @@ func testingCluster(t *testing.T) (*Cluster, *mockApi, *mockConnector, *MapState ipfs, st, tracker, + remote, ) if err != nil { t.Fatal("cannot create cluster:", err) } time.Sleep(3 * time.Second) // make sure a leader is elected - return cl, api, ipfs, st, tracker + return cl, api, ipfs, st, tracker, remote } func testClusterShutdown(t *testing.T) { - cl, _, _, _, _ := testingCluster(t) + cl, _, _, _, _, _ := testingCluster(t) err := cl.Shutdown() if err != nil { t.Error("cluster shutdown failed:", err) } cl.Shutdown() - cl, _, _, _, _ = testingCluster(t) + cl, _, _, _, _, _ = testingCluster(t) err = cl.Shutdown() if err != nil { t.Error("cluster shutdown failed:", err) @@ -88,7 +90,7 @@ func testClusterShutdown(t *testing.T) { } func TestClusterLocalSync(t *testing.T) { - cl, _, _, st, _ := testingCluster(t) + cl, _, _, st, _, _ := testingCluster(t) defer cleanRaft() defer cl.Shutdown() _, err := cl.LocalSync() @@ -117,7 +119,7 @@ func TestClusterLocalSync(t *testing.T) { } func TestClusterPin(t *testing.T) { - cl, _, _, _, _ := testingCluster(t) + cl, _, _, _, _, _ := testingCluster(t) defer cleanRaft() defer cl.Shutdown() @@ -136,7 +138,7 @@ func TestClusterPin(t *testing.T) { } func TestClusterUnpin(t *testing.T) { - cl, _, _, _, _ := testingCluster(t) + cl, _, _, _, _, _ := testingCluster(t) defer cleanRaft() defer cl.Shutdown() @@ -155,7 +157,7 @@ func TestClusterUnpin(t *testing.T) { } func TestClusterMembers(t *testing.T) { - cl, _, _, _, _ := testingCluster(t) + cl, _, _, _, _, _ := testingCluster(t) defer cleanRaft() defer cl.Shutdown() m := cl.Members() @@ -166,7 +168,7 @@ func TestClusterMembers(t *testing.T) { } func TestVersion(t *testing.T) { - cl, _, _, _, _ := testingCluster(t) + cl, _, _, _, _, _ := testingCluster(t) defer cleanRaft() defer cl.Shutdown() if cl.Version() != Version { @@ -175,7 +177,7 @@ func TestVersion(t *testing.T) { } func TestClusterRun(t *testing.T) { - cl, api, ipfs, _, tracker := testingCluster(t) + cl, api, ipfs, _, tracker, remote := testingCluster(t) defer cleanRaft() defer cl.Shutdown() // We sent RPCs all all types with one of the @@ -186,7 +188,7 @@ func TestClusterRun(t *testing.T) { // Generic RPC for i := 0; i < NoopRPC; i++ { rpc := NewRPC(RPCOp(i), "something") - switch i % 4 { + switch i % 5 { case 0: ipfs.rpcCh <- rpc case 1: @@ -195,6 +197,8 @@ func TestClusterRun(t *testing.T) { api.rpcCh <- rpc case 3: tracker.rpcCh <- rpc + case 4: + remote.rpcCh <- rpc } // Wait for a response timer := time.NewTimer(time.Second) @@ -207,9 +211,9 @@ func TestClusterRun(t *testing.T) { // Cid RPC c, _ := cid.Decode(testCid) - for i := 0; i < NoopRPC; i++ { + for i := 0; i <= NoopRPC; i++ { rpc := NewRPC(RPCOp(i), c) - switch i % 4 { + switch i % 5 { case 0: ipfs.rpcCh <- rpc case 1: @@ -218,6 +222,8 @@ func TestClusterRun(t *testing.T) { api.rpcCh <- rpc case 3: tracker.rpcCh <- rpc + case 4: + remote.rpcCh <- rpc } timer := time.NewTimer(time.Second) select { @@ -226,4 +232,41 @@ func TestClusterRun(t *testing.T) { t.Errorf("CidRPC %d was not handled correctly by Cluster", i) } } + + // Wrapped RPC + w := NewRPC(PinRPC, c) + for i := 0; i <= NoopRPC; i++ { + rpc := NewRPC(RPCOp(i), w) + switch i % 5 { + case 0: + ipfs.rpcCh <- rpc + case 1: + cl.consensus.rpcCh <- rpc + case 2: + api.rpcCh <- rpc + case 3: + tracker.rpcCh <- rpc + case 4: + remote.rpcCh <- rpc + } + timer := time.NewTimer(time.Second) + select { + case <-rpc.ResponseCh(): + case <-timer.C: + t.Errorf("CidRPC %d was not handled correctly by Cluster", i) + } + } + + // Test that we answer to custom RPC types that are not handled + wrongRPC := &baseRPC{ + method: 999, + responseCh: make(chan RPCResponse), + } + ipfs.rpcCh <- wrongRPC + timer := time.NewTimer(time.Second) + select { + case <-wrongRPC.ResponseCh(): + case <-timer.C: + t.Errorf("We did not give an answer to an RPC with a bad code") + } } diff --git a/consensus.go b/consensus.go index 476cce31..5916b06b 100644 --- a/consensus.go +++ b/consensus.go @@ -7,12 +7,12 @@ import ( "sync" "time" - host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host" - consensus "gx/ipfs/QmZ88KbrvZMJpXaNwAGffswcYKz8EbeafzAFGMCA6MEZKt/go-libp2p-consensus" - libp2praft "gx/ipfs/QmaofA6ApgPQm8yRojC77dQbVUatYMihdyQjB7VsAqrks1/go-libp2p-raft" - peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" + consensus "github.com/libp2p/go-libp2p-consensus" + host "github.com/libp2p/go-libp2p-host" + peer "github.com/libp2p/go-libp2p-peer" + libp2praft "github.com/libp2p/go-libp2p-raft" - cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" + cid "github.com/ipfs/go-cid" ) const ( @@ -290,7 +290,7 @@ func (cc *Consensus) State() (State, error) { // Leader() returns the peerID of the Leader of the // cluster. -func (cc *Consensus) Leader() peer.ID { +func (cc *Consensus) Leader() (peer.ID, error) { // FIXME: Hashicorp Raft specific raftactor := cc.actor.(*libp2praft.Actor) return raftactor.Leader() diff --git a/consensus_test.go b/consensus_test.go index 63abdb0b..3dcb3d75 100644 --- a/consensus_test.go +++ b/consensus_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" + cid "github.com/ipfs/go-cid" ) func TestApplyToPin(t *testing.T) { @@ -132,7 +132,7 @@ func TestConsensusPin(t *testing.T) { time.Sleep(250 * time.Millisecond) st, err := cc.State() if err != nil { - t.Fatal("error getting state:", err) + t.Fatal("error gettinng state:", err) } pins := st.ListPins() @@ -159,7 +159,12 @@ func TestConsensusLeader(t *testing.T) { pId := cfg.ID defer cleanRaft() defer cc.Shutdown() - if l := cc.Leader().Pretty(); l != pId { + l, err := cc.Leader() + if err != nil { + t.Fatal("No leader:", err) + } + + if l.Pretty() != pId { t.Errorf("expected %s but the leader appears as %s", pId, l) } } diff --git a/ipfs-cluster/main.go b/ipfs-cluster/main.go index d783ff99..c354d53a 100644 --- a/ipfs-cluster/main.go +++ b/ipfs-cluster/main.go @@ -7,7 +7,7 @@ import ( "os/user" "path/filepath" - logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + logging "github.com/ipfs/go-log" ipfscluster "github.com/ipfs/ipfs-cluster" ) @@ -40,8 +40,9 @@ func main() { state := ipfscluster.NewMapState() tracker := ipfscluster.NewMapPinTracker() + remote := NewLibp2pRemote() - cluster, err := ipfscluster.NewCluster(clusterCfg, api, proxy, state, tracker) + cluster, err := ipfscluster.NewCluster(clusterCfg, api, proxy, state, tracker, remote) if err != nil { fmt.Println(err) return diff --git a/ipfs_cluster.go b/ipfs_cluster.go index 2656d60c..0bc7a5e9 100644 --- a/ipfs_cluster.go +++ b/ipfs_cluster.go @@ -12,9 +12,12 @@ import ( "errors" "time" - logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" - cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" - peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" + host "github.com/libp2p/go-libp2p-host" + + cid "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + peer "github.com/libp2p/go-libp2p-peer" + wlogging "github.com/whyrusleeping/go-logging" ) var logger = logging.Logger("ipfs-cluster") @@ -30,6 +33,26 @@ var RPCMaxQueue = 128 // to put a RPC request in the channel in MakeRPC(). var MakeRPCRetryInterval time.Duration = 1 * time.Second +// SilentRaft controls whether all Raft log messages are discarded. +var SilentRaft = true + +// SetLogLevel sets the level in the logs +func SetLogLevel(l wlogging.Level) { + /* + CRITICAL Level = iota + ERROR + WARNING + NOTICE + INFO + DEBUG + */ + logging.SetAllLoggers(l) +} + +func init() { + SetLogLevel(wlogging.CRITICAL) +} + // ClusterComponent represents a piece of ipfscluster. Cluster components // usually run their own goroutines (a http server for example). They // communicate with Cluster via a channel carrying RPC operations. @@ -110,6 +133,15 @@ type PinTracker interface { SyncState(State) []Pin } +// Remote represents a component which takes care of +// executing RPC requests in a different cluster member as well as +// handling any incoming remote requests from other nodes. +type Remote interface { + ClusterComponent + MakeRemoteRPC(rpc RPC, node peer.ID) (RPCResponse, error) + SetHost(host.Host) +} + // MakeRPC sends a RPC object over a channel and optionally waits for a // Response on the RPC.ResponseCh channel. It can be used by any // ClusterComponent to simplify making RPC requests to Cluster. diff --git a/ipfs_cluster_test.go b/ipfs_cluster_test.go index cd85456f..cdbedad7 100644 --- a/ipfs_cluster_test.go +++ b/ipfs_cluster_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" + peer "github.com/libp2p/go-libp2p-peer" ) var ( diff --git a/ipfs_connector.go b/ipfs_connector.go index c06d7173..18798d41 100644 --- a/ipfs_connector.go +++ b/ipfs_connector.go @@ -12,7 +12,7 @@ import ( "strings" "sync" - cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" + cid "github.com/ipfs/go-cid" ) // IPFSHTTPConnector implements the IPFSConnector interface diff --git a/ipfs_connector_test.go b/ipfs_connector_test.go index 453c4894..02e775d6 100644 --- a/ipfs_connector_test.go +++ b/ipfs_connector_test.go @@ -9,7 +9,7 @@ import ( "strings" "testing" - cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" + cid "github.com/ipfs/go-cid" ) func testServer(t *testing.T) *httptest.Server { diff --git a/raft.go b/raft.go index ca156fa4..55d93e68 100644 --- a/raft.go +++ b/raft.go @@ -1,12 +1,13 @@ package ipfscluster import ( + "io/ioutil" "path/filepath" - host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host" - libp2praft "gx/ipfs/QmaofA6ApgPQm8yRojC77dQbVUatYMihdyQjB7VsAqrks1/go-libp2p-raft" + host "github.com/libp2p/go-libp2p-host" + libp2praft "github.com/libp2p/go-libp2p-raft" - peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" + peer "github.com/libp2p/go-libp2p-peer" hashiraft "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" @@ -50,6 +51,10 @@ func makeLibp2pRaft(cfg *Config, host host.Host, state State, op *clusterLogOp) raftCfg := hashiraft.DefaultConfig() raftCfg.EnableSingleNode = raftSingleMode + if SilentRaft { + raftCfg.LogOutput = ioutil.Discard + raftCfg.Logger = nil + } logger.Debug("creating file snapshot store") snapshots, err := hashiraft.NewFileSnapshotStore(cfg.RaftFolder, maxSnapshots, nil) if err != nil { diff --git a/remote.go b/remote.go new file mode 100644 index 00000000..143dbf23 --- /dev/null +++ b/remote.go @@ -0,0 +1,139 @@ +package ipfscluster + +import ( + "context" + "errors" + "sync" + + peer "github.com/libp2p/go-libp2p-peer" + + host "github.com/libp2p/go-libp2p-host" + inet "github.com/libp2p/go-libp2p-net" +) + +//ClusterP2PProtocol is used to send libp2p messages between cluster members +const ClusterP2PProtocol = "/ipfscluster/0.0.1/rpc" + +// Remote is a Cluster component used to handle communication with remote +// Cluster nodes +type Libp2pRemote struct { + ctx context.Context + + host host.Host + + rpcCh chan RPC + + shutdownLock sync.Mutex + shutdown bool + shutdownCh chan struct{} + wg sync.WaitGroup +} + +func NewLibp2pRemote() *Libp2pRemote { + ctx := context.Background() + + r := &Libp2pRemote{ + ctx: ctx, + rpcCh: make(chan RPC), + shutdownCh: make(chan struct{}), + } + + return r +} + +func (r *Libp2pRemote) SetHost(h host.Host) { + r.host = h + r.host.SetStreamHandler(ClusterP2PProtocol, func(s inet.Stream) { + sWrap := wrapStream(s) + defer s.Close() + err := r.handleRemoteRPC(sWrap) + if err != nil { + logger.Error("error handling remote RPC:", err) + } + }) +} + +func (r *Libp2pRemote) Shutdown() error { + r.shutdownLock.Lock() + defer r.shutdownLock.Unlock() + if r.shutdown { + logger.Debug("already shutdown") + return nil + } + logger.Info("shutting down Remote component") + //r.shutdownCh <- struct{}{} + r.shutdown = true + //<-r.shutdownCh + return nil +} + +func (r *Libp2pRemote) RpcChan() <-chan RPC { + return r.rpcCh +} + +func (r *Libp2pRemote) handleRemoteRPC(s *streamWrap) error { + var rpc RPC + if err := s.dec.Decode(&rpc); err != nil { + logger.Error("error decoding RPC request from Stream:", err) + errResp := RPCResponse{ + Data: nil, + Error: errors.New("error decoding request"), + } + r.sendStreamResponse(s, errResp) + return err + } + + ctx, cancel := context.WithCancel(r.ctx) + defer cancel() + resp := MakeRPC(ctx, r.rpcCh, rpc, true) + return r.sendStreamResponse(s, resp) +} + +func (r *Libp2pRemote) sendStreamResponse(s *streamWrap, resp RPCResponse) error { + if err := s.enc.Encode(resp); err != nil { + logger.Error("error encoding response:", err) + return err + } + if err := s.w.Flush(); err != nil { + logger.Error("error flushing response:", err) + return err + } + return nil +} + +func (r *Libp2pRemote) MakeRemoteRPC(rpc RPC, node peer.ID) (RPCResponse, error) { + ctx, cancel := context.WithCancel(r.ctx) + defer cancel() + var resp RPCResponse + + if r.host == nil { + return resp, errors.New("no host set") + } + + if node == r.host.ID() { + // libp2p cannot dial itself + return MakeRPC(ctx, r.rpcCh, rpc, true), nil + } + + s, err := r.host.NewStream(ctx, node, ClusterP2PProtocol) + if err != nil { + return resp, err + } + defer s.Close() + sWrap := wrapStream(s) + + logger.Debugf("sending remote RPC %d to %s", rpc.Op(), node) + if err := sWrap.enc.Encode(rpc); err != nil { + return resp, err + } + + if err := sWrap.w.Flush(); err != nil { + return resp, err + } + + logger.Debug("Waiting for response from %s", node) + if err := sWrap.dec.Decode(&resp); err != nil { + return resp, err + } + return resp, nil +} diff --git a/rpc.go b/rpc.go index 004a07fe..70d8ea4b 100644 --- a/rpc.go +++ b/rpc.go @@ -1,6 +1,6 @@ package ipfscluster -import cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" +import cid "github.com/ipfs/go-cid" // RPC supported operations. const ( @@ -10,10 +10,13 @@ const ( IPFSPinRPC IPFSUnpinRPC IPFSIsPinnedRPC + ConsensusAddPinRPC + ConsensusRmPinRPC VersionRPC MemberListRPC RollbackRPC LeaderRPC + BroadcastRPC LocalSyncRPC LocalSyncCidRPC GlobalSyncRPC @@ -64,6 +67,11 @@ type CidRPC struct { CID *cid.Cid } +type WrappedRPC struct { + baseRPC + WRPC RPC +} + // RPC builds a RPC request. It will create a // CidRPC if the arg is of type cid.Cid. Otherwise, // a GenericRPC is returned. @@ -76,6 +84,13 @@ func NewRPC(m RPCOp, arg interface{}) RPC { r.CID = c r.responseCh = make(chan RPCResponse) return r + case RPC: + w := arg.(RPC) + r := new(WrappedRPC) + r.method = m + r.WRPC = w + r.responseCh = make(chan RPCResponse) + return r default: r := new(GenericRPC) r.method = m diff --git a/rpc_test.go b/rpc_test.go index 1a631685..eef58076 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -3,7 +3,7 @@ package ipfscluster import ( "testing" - cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" + cid "github.com/ipfs/go-cid" ) func TestNewRPC(t *testing.T) { diff --git a/state.go b/state.go index 71eea6a3..03d31d8d 100644 --- a/state.go +++ b/state.go @@ -3,7 +3,7 @@ package ipfscluster import ( "sync" - cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" + cid "github.com/ipfs/go-cid" ) // MapState is a very simple database to store diff --git a/tracker.go b/tracker.go index 9ed24933..43d86891 100644 --- a/tracker.go +++ b/tracker.go @@ -5,7 +5,7 @@ import ( "sync" "time" - cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" + cid "github.com/ipfs/go-cid" ) const ( diff --git a/util.go b/util.go new file mode 100644 index 00000000..e57dc1e4 --- /dev/null +++ b/util.go @@ -0,0 +1,39 @@ +package ipfscluster + +import ( + "bufio" + + "github.com/ugorji/go/codec" + + inet "github.com/libp2p/go-libp2p-net" +) + +// streamWrap wraps a libp2p stream. We encode/decode whenever we +// write/read from a stream, so we can just carry the encoders +// and bufios with us +type streamWrap struct { + stream inet.Stream + enc *codec.Encoder + dec *codec.Decoder + w *bufio.Writer + r *bufio.Reader +} + +// wrapStream takes a stream and complements it with r/w bufios and +// decoder/encoder. In order to write to the stream we can use +// wrap.w.Write(). To encode something into it we can wrap.enc.Encode(). +// Finally, we should wrap.w.Flush() to actually send the data. Similar +// for receiving. +func wrapStream(s inet.Stream) *streamWrap { + reader := bufio.NewReader(s) + writer := bufio.NewWriter(s) + dec := codec.NewDecoder(reader, &codec.MsgpackHandle{}) + enc := codec.NewEncoder(writer, &codec.MsgpackHandle{}) + return &streamWrap{ + stream: s, + r: reader, + w: writer, + enc: enc, + dec: dec, + } +}