Fix #25: Only the consensus layer should deal with leaders

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
Hector Sanjuan 2017-01-23 14:01:49 +01:00
parent 365c549d7c
commit 031523f7bf
4 changed files with 40 additions and 33 deletions

View File

@ -265,25 +265,14 @@ func (c *Cluster) Pins() []*cid.Cid {
// Pin makes the cluster Pin a Cid. This implies adding the Cid
// to the IPFS Cluster peers shared-state. Depending on the cluster
// pinning strategy, the PinTracker may then request the IPFS daemon
// to pin the Cid. When the current node is not the cluster leader,
// the request is forwarded to the leader.
// to pin the Cid.
//
// Pin returns an error if the operation could not be persisted
// to the global state. Pin does not reflect the success or failure
// of underlying IPFS daemon pinning operations.
func (c *Cluster) Pin(h *cid.Cid) error {
logger.Info("pinning:", h)
leader, err := c.consensus.Leader()
if err != nil {
return err
}
err = c.rpcClient.Call(
leader,
"Cluster",
"ConsensusLogPin",
NewCidArg(h),
&struct{}{})
err := c.consensus.LogPin(h)
if err != nil {
return err
}
@ -291,25 +280,14 @@ func (c *Cluster) Pin(h *cid.Cid) error {
}
// Unpin makes the cluster Unpin a Cid. This implies adding the Cid
// to the IPFS Cluster peers shared-state. When the current node is
// not the cluster leader, the request is forwarded to the leader.
// to the IPFS Cluster peers shared-state.
//
// Unpin returns an error if the operation could not be persisted
// to the global state. Unpin does not reflect the success or failure
// of underlying IPFS daemon unpinning operations.
func (c *Cluster) Unpin(h *cid.Cid) error {
logger.Info("pinning:", h)
leader, err := c.consensus.Leader()
if err != nil {
return err
}
err = c.rpcClient.Call(
leader,
"Cluster",
"ConsensusLogUnpin",
NewCidArg(h),
&struct{}{})
err := c.consensus.LogUnpin(h)
if err != nil {
return err
}

View File

@ -3,7 +3,6 @@ package ipfscluster
import (
"errors"
"testing"
"time"
rpc "github.com/hsanjuan/go-libp2p-rpc"
cid "github.com/ipfs/go-cid"
@ -69,7 +68,6 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *MapState
if err != nil {
t.Fatal("cannot create cluster:", err)
}
time.Sleep(3 * time.Second) // make sure a leader is elected
return cl, api, ipfs, st, tracker
}

View File

@ -276,11 +276,37 @@ func (cc *Consensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp {
}
}
// LogPin submits a Cid to the shared state of the cluster.
// returns true if the operation was redirected to the leader
func (cc *Consensus) redirectToLeader(method string, c *cid.Cid) (bool, error) {
leader, err := cc.Leader()
if err != nil {
return false, err
}
if leader != cc.host.ID() {
err = cc.rpcClient.Call(
leader,
"Cluster",
method,
NewCidArg(c),
&struct{}{})
return true, err
}
return false, nil
}
// LogPin submits a Cid to the shared state of the cluster. It will forward
// the operation to the leader if this is not it.
func (cc *Consensus) LogPin(c *cid.Cid) error {
redirected, err := cc.redirectToLeader("ConsensusLogPin", c)
if err != nil || redirected {
return err
}
// It seems WE are the leader.
// Create pin operation for the log
op := cc.op(c, LogOpPin)
_, err := cc.consensus.CommitOp(op)
_, err = cc.consensus.CommitOp(op)
if err != nil {
// This means the op did not make it to the log
return err
@ -291,9 +317,16 @@ func (cc *Consensus) LogPin(c *cid.Cid) error {
// LogUnpin removes a Cid from the shared state of the cluster.
func (cc *Consensus) LogUnpin(c *cid.Cid) error {
redirected, err := cc.redirectToLeader("ConsensusLogUnpin", c)
if err != nil || redirected {
return err
}
// It seems WE are the leader.
// Create unpin operation for the log
op := cc.op(c, LogOpUnpin)
_, err := cc.consensus.CommitOp(op)
_, err = cc.consensus.CommitOp(op)
if err != nil {
return err
}

View File

@ -97,8 +97,6 @@ func testingConsensus(t *testing.T) *Consensus {
t.Fatal("cannot create Consensus:", err)
}
cc.SetClient(mockRPCClient(t))
// Oxygen for Raft to declare leader
time.Sleep(3 * time.Second)
return cc
}