Introduce the concept of PinTracker. Thin ClusterState to minimal.

+ Try to make RPC handling code cleaner.
This commit is contained in:
Hector Sanjuan 2016-12-06 22:29:59 +01:00 committed by Hector Sanjuan
parent e746ccecb9
commit 9c1c256e33
8 changed files with 294 additions and 238 deletions

18
api.go
View File

@ -172,7 +172,7 @@ func (api *ClusterHTTPAPI) versionHandler(w http.ResponseWriter, r *http.Request
defer cancel()
rRpc := RPC(VersionRPC, nil)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Method, resp) {
if checkResponse(w, rRpc.Op(), resp) {
v := resp.Data.(string)
sendJSONResponse(w, 200, versionResp{v})
}
@ -183,7 +183,7 @@ func (api *ClusterHTTPAPI) memberListHandler(w http.ResponseWriter, r *http.Requ
defer cancel()
rRpc := RPC(MemberListRPC, nil)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Method, resp) {
if checkResponse(w, rRpc.Op(), resp) {
data := resp.Data.([]peer.ID)
var strPeers []string
for _, p := range data {
@ -203,10 +203,10 @@ func (api *ClusterHTTPAPI) pinHandler(w http.ResponseWriter, r *http.Request) {
sendErrorResponse(w, 400, err.Error())
}
rRpc := RPC(PinRPC, *c)
rRpc := RPC(PinRPC, c)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Method, resp) {
if checkResponse(w, rRpc.Op(), resp) {
c := resp.Data.(cid.Cid)
sendJSONResponse(w, 200, pinResp{c.String()})
}
@ -222,9 +222,9 @@ func (api *ClusterHTTPAPI) unpinHandler(w http.ResponseWriter, r *http.Request)
sendErrorResponse(w, 400, err.Error())
}
rRpc := RPC(UnpinRPC, *c)
rRpc := RPC(UnpinRPC, c)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Method, resp) {
if checkResponse(w, rRpc.Op(), resp) {
c := resp.Data.(cid.Cid)
sendJSONResponse(w, 200, unpinResp{c.String()})
}
@ -235,7 +235,7 @@ func (api *ClusterHTTPAPI) pinListHandler(w http.ResponseWriter, r *http.Request
defer cancel()
rRpc := RPC(PinListRPC, nil)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Method, resp) {
if checkResponse(w, rRpc.Op(), resp) {
data := resp.Data.([]Pin)
pins := make(pinListResp, 0, len(data))
for _, d := range data {
@ -265,7 +265,7 @@ func (api *ClusterHTTPAPI) pinListHandler(w http.ResponseWriter, r *http.Request
// an error if the RPCResponse contains one. It also checks that the RPC
// response data can be casted back into the expected value. It returns false
// if the checks fail or an empty response is sent, and true otherwise.
func checkResponse(w http.ResponseWriter, method RPCMethod, resp RPCResponse) bool {
func checkResponse(w http.ResponseWriter, op RPCOp, resp RPCResponse) bool {
if resp.Error == nil && resp.Data == nil {
sendEmptyResponse(w)
return false
@ -276,7 +276,7 @@ func checkResponse(w http.ResponseWriter, method RPCMethod, resp RPCResponse) bo
// Check thatwe can cast to the expected response format
ok := true
switch method {
switch op {
case PinRPC:
_, ok = resp.Data.(cid.Cid)
case UnpinRPC:

View File

@ -31,12 +31,13 @@ type Cluster struct {
api ClusterAPI
ipfs IPFSConnector
state ClusterState
tracker PinTracker
}
// NewCluster builds a ready-to-start IPFS Cluster. It takes a ClusterAPI,
// an IPFSConnector and a ClusterState as parameters, allowing the user,
// to provide custom implementations of these components.
func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state ClusterState) (*Cluster, error) {
func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state ClusterState, tracker PinTracker) (*Cluster, error) {
ctx, cancel := context.WithCancel(context.Background())
host, err := makeHost(ctx, cfg)
if err != nil {
@ -58,6 +59,7 @@ func NewCluster(cfg *ClusterConfig, api ClusterAPI, ipfs IPFSConnector, state Cl
api: api,
ipfs: ipfs,
state: state,
tracker: tracker,
}
logger.Info("Starting IPFS Cluster")
@ -142,11 +144,11 @@ func (c *Cluster) run() {
for {
select {
case ipfsOp := <-ipfsCh:
go c.handleOp(&ipfsOp)
go c.handleOp(ipfsOp)
case consensusOp := <-consensusCh:
go c.handleOp(&consensusOp)
go c.handleOp(consensusOp)
case apiOp := <-apiCh:
go c.handleOp(&apiOp)
go c.handleOp(apiOp)
case <-c.ctx.Done():
logger.Debug("Cluster is Done()")
return
@ -156,46 +158,50 @@ func (c *Cluster) run() {
// handleOp takes care of running the necessary action for a
// clusterRPC request and sending the response.
func (c *Cluster) handleOp(op *ClusterRPC) {
func (c *Cluster) handleOp(rpc ClusterRPC) {
var crpc *CidClusterRPC
var grpc *GenericClusterRPC
switch rpc.(type) {
case *CidClusterRPC:
crpc = rpc.(*CidClusterRPC)
case *GenericClusterRPC:
grpc = rpc.(*GenericClusterRPC)
default:
logger.Error("expected a known ClusterRPC type but got something else")
return
}
var data interface{} = nil
var err error = nil
switch op.Method {
case PinRPC:
hash, ok := op.Arguments.(cid.Cid)
if !ok {
err = errors.New("Bad PinRPC type")
break
}
err = c.Pin(&hash)
case UnpinRPC:
hash, ok := op.Arguments.(cid.Cid)
if !ok {
err = errors.New("Bad UnpinRPC type")
break
}
err = c.Unpin(&hash)
case PinListRPC:
data, err = c.consensus.ListPins()
case IPFSPinRPC:
hash, ok := op.Arguments.(cid.Cid)
if !ok {
err = errors.New("Bad IPFSPinRPC type")
break
}
err = c.ipfs.Pin(&hash)
case IPFSUnpinRPC:
hash, ok := op.Arguments.(cid.Cid)
if !ok {
err = errors.New("Bad IPFSUnpinRPC type")
break
}
err = c.ipfs.Unpin(&hash)
switch rpc.Op() {
case VersionRPC:
data = c.Version()
case MemberListRPC:
data = c.Members()
case PinListRPC:
data = c.tracker.ListPins()
case PinRPC:
err = c.Pin(crpc.CID)
case UnpinRPC:
err = c.Unpin(crpc.CID)
case IPFSPinRPC:
err = c.ipfs.Pin(crpc.CID)
case IPFSUnpinRPC:
err = c.ipfs.Unpin(crpc.CID)
case StatusPinnedRPC:
err = c.tracker.Pinned(crpc.CID)
case StatusPinningRPC:
err = c.tracker.Pinning(crpc.CID)
case StatusUnpinningRPC:
err = c.tracker.Unpinning(crpc.CID)
case StatusUnpinnedRPC:
err = c.tracker.Unpinned(crpc.CID)
case StatusPinErrorRPC:
err = c.tracker.PinError(crpc.CID)
case StatusUnpinErrorRPC:
err = c.tracker.UnpinError(crpc.CID)
case RollbackRPC:
state, ok := op.Arguments.(ClusterState)
state, ok := grpc.Arguments.(ClusterState)
if !ok {
err = errors.New("Bad RollbackRPC type")
break
@ -204,32 +210,11 @@ func (c *Cluster) handleOp(op *ClusterRPC) {
case LeaderRPC:
// Leader RPC is a RPC that needs to be run
// by the Consensus Leader. Arguments is a wrapped RPC.
rpc, ok := op.Arguments.(*ClusterRPC)
rpc, ok := grpc.Arguments.(*ClusterRPC)
if !ok {
err = errors.New("Bad LeaderRPC type")
}
data, err = c.leaderRPC(rpc)
case StatePinSuccess:
hash, ok := op.Arguments.(cid.Cid)
if !ok {
err = errors.New("Bad StatePinSucess type")
break
}
err = c.state.Pinned(&hash)
case StateUnpinSuccess:
hash, ok := op.Arguments.(cid.Cid)
if !ok {
err = errors.New("Bad StateUnpinSucess type")
break
}
err = c.state.RmPin(&hash)
case StatePinError:
hash, ok := op.Arguments.(cid.Cid)
if !ok {
err = errors.New("Bad StatePinError type")
break
}
err = c.state.PinError(&hash)
default:
logger.Error("Unknown operation. Ignoring")
}
@ -239,7 +224,7 @@ func (c *Cluster) handleOp(op *ClusterRPC) {
Error: err,
}
op.ResponseCh <- resp
rpc.ResponseCh() <- resp
}
// This uses libp2p to contact the cluster leader and ask him to do something

View File

@ -43,44 +43,45 @@ func (op clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error)
panic("Received unexpected state type")
}
cidObj, err := cid.Decode(op.Cid)
c, err := cid.Decode(op.Cid)
if err != nil {
// Should never be here
panic("Could not decode a CID we ourselves encoded")
}
async_op := func(try, success RPCMethod, c *cid.Cid) {
async_op := func(startOp, ipfsOp, doneOp, errorOp RPCOp) {
ctx, cancel := context.WithCancel(op.ctx)
defer cancel()
resp := MakeRPC(ctx, op.rpcCh, RPC(try, *c), true)
// Mark as Pinning/Unpinning
_ = MakeRPC(ctx, op.rpcCh, RPC(startOp, c), true)
// Tell IPFS to Pin/Unpin
resp := MakeRPC(ctx, op.rpcCh, RPC(ipfsOp, c), true)
if resp.Error != nil {
MakeRPC(ctx, op.rpcCh, RPC(StatePinError, *c), false)
logger.Debug("IPFS pin op error")
// Mark an error
MakeRPC(ctx, op.rpcCh, RPC(errorOp, c), false)
} else {
logger.Debugf("Pinop (%d) success", try)
MakeRPC(ctx, op.rpcCh, RPC(success, *c), false)
logger.Debug("IPFS pin op success")
// Mark Pinned/Unpinned
MakeRPC(ctx, op.rpcCh, RPC(doneOp, c), false)
}
}
switch op.Type {
case LogOpPin:
if state.ShouldPin(cidObj) {
err = state.Pinning(cidObj)
if err != nil {
goto ROLLBACK
}
go async_op(IPFSPinRPC, StatePinSuccess, cidObj)
} else {
err = state.Pinned(cidObj)
if err != nil {
goto ROLLBACK
}
}
case LogOpUnpin:
err = state.Unpinning(cidObj)
err := state.AddPin(c)
if err != nil {
goto ROLLBACK
}
go async_op(IPFSUnpinRPC, StateUnpinSuccess, cidObj)
go async_op(StatusPinningRPC, IPFSPinRPC, StatusPinnedRPC, IPFSPinErrorRPC)
case LogOpUnpin:
err := state.RmPin(c)
if err != nil {
goto ROLLBACK
}
go async_op(StatusUnpinningRPC, IPFSUnpinRPC, StatusUnpinnedRPC, IPFSUnpinErrorRPC)
default:
logger.Error("unknown clusterLogOp type. Ignoring")
}
@ -190,13 +191,6 @@ func (cc *ClusterConsensus) AddPin(c *cid.Cid) error {
// This means the op did not make it to the log
return err
}
// Note: the returned state could be nil
// if ApplyTo failed. We deal with this in ApplyTo.
// We must schedule a Rollback in that case.
// Here we only care that the operation was commited
// to the log, not if the resulting state is valid.
logger.Infof("Pin commited to global state: %s", c)
return nil
}
@ -209,28 +203,10 @@ func (cc *ClusterConsensus) RmPin(c *cid.Cid) error {
if err != nil {
return err
}
// Note: the returned state could be nil
// if ApplyTo failed. We deal with this in ApplyTo.
// We must schedule a Rollback in that case.
// Here we only care that the operation was commited
// to the log, not if the resulting state is valid.
logger.Infof("Unpin commited to global state: %s", c)
return nil
}
// ListPins returns the list of Cids which are part of the
// shared state of the cluster.
func (cc *ClusterConsensus) ListPins() ([]Pin, error) {
cstate, err := cc.consensus.GetLogHead()
if err != nil {
return nil, err
}
state := cstate.(ClusterState)
return state.ListPins(), nil
}
// Leader() returns the peerID of the Leader of the
// cluster.
func (cc *ClusterConsensus) Leader() peer.ID {

View File

@ -33,9 +33,10 @@ func main() {
return
}
state := ipfscluster.NewMapState("a")
state := ipfscluster.NewMapState()
tracker := ipfscluster.NewMapPinTracker()
cluster, err := ipfscluster.NewCluster(clusterCfg, api, proxy, state)
cluster, err := ipfscluster.NewCluster(clusterCfg, api, proxy, state, tracker)
if err != nil {
fmt.Println(err)
return

View File

@ -64,19 +64,33 @@ type Peered interface {
// ClusterState represents the shared state of the cluster and it
// is used by the ClusterConsensus component to keep track of
// objects which are pinned and their location.
// ClusterState is in charge of implementing any advanced pinning
// strategies.
// objects which objects are pinned. This component should be thread safe.
type ClusterState interface {
Pinning(*cid.Cid) error
Pinned(*cid.Cid) error
Unpinning(*cid.Cid) error
AddPin(*cid.Cid) error
RmPin(*cid.Cid) error
}
// PinTracker represents a component which tracks the status of
// the pins in this cluster and ensures they are in sync with the
// IPFS daemon. This component should be thread safe.
type PinTracker interface {
ClusterComponent
// Pinning tells the pin tracker that a pin is being pinned by IPFS
Pinning(*cid.Cid) error
// Pinned tells the pin tracer is pinned by IPFS
Pinned(*cid.Cid) error
// Pinned tells the pin tracker is being unpinned by IPFS
Unpinning(*cid.Cid) error
// Unpinned tells the pin tracker that a pin has been unpinned by IFPS
Unpinned(*cid.Cid) error
// PinError tells the pin tracker that an IPFS pin operation has failed
PinError(*cid.Cid) error
Exists(*cid.Cid) bool
// UnpinError tells the pin tracker that an IPFS unpin operation has failed
UnpinError(*cid.Cid) error
// ListPins returns the list of pins with their status
ListPins() []Pin
ShouldPin(*cid.Cid) bool
// ShouldPin(peer.ID, *cid.Cid) bool
// GetPin returns a pin and ok if it is found.
GetPin(*cid.Cid) (Pin, bool)
}
// MakeRPC sends a ClusterRPC object over a channel and waits for an answer on
@ -86,7 +100,7 @@ type ClusterState interface {
// If the message cannot be placed in the ClusterRPC channel, retries will be
// issued every MakeRPCRetryInterval.
func MakeRPC(ctx context.Context, ch chan ClusterRPC, r ClusterRPC, waitForResponse bool) RPCResponse {
logger.Debugf("Sending RPC %d", r.Method)
logger.Debugf("Sending RPC %d", r.Op())
exitLoop := false
for !exitLoop {
select {
@ -109,7 +123,7 @@ func MakeRPC(ctx context.Context, ch chan ClusterRPC, r ClusterRPC, waitForRespo
logger.Debug("Waiting for response")
select {
case resp, ok := <-r.ResponseCh:
case resp, ok := <-r.ResponseCh():
logger.Debug("Response received")
if !ok { // Not interested in response
logger.Warning("Response channel closed. Ignoring")

68
rpc.go
View File

@ -1,6 +1,8 @@
package ipfscluster
// ClusterRPC supported methods.
import cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
// ClusterRPC supported operations.
const (
PinRPC = iota
UnpinRPC
@ -11,29 +13,65 @@ const (
MemberListRPC
RollbackRPC
LeaderRPC
StatePinSuccess
StateUnpinSuccess
StatePinError
StatusPinnedRPC
StatusPinningRPC
StatusUnpinnedRPC
StatusUnpinningRPC
StatusPinErrorRPC
StatusUnPinErrorRPC
)
// RPCMethod identifies which RPC-supported operation we are trying to make
type RPCMethod int
type RPCOp int
// ClusterRPC is used to let Cluster perform operations as mandated by
type ClusterRPC interface {
Op() RPCOp
ResponseCh() chan RPCResponse
}
type baseRPC struct {
method RPCOp
responseCh chan RPCResponse
}
// Method returns the RPC method for this request
func (brpc *baseRPC) Op() RPCOp {
return brpc.method
}
// ResponseCh returns a channel to send the RPCResponse
func (brpc *baseRPC) ResponseCh() chan RPCResponse {
return brpc.responseCh
}
// GenericClusterRPC is used to let Cluster perform operations as mandated by
// its ClusterComponents. The result is placed on the ResponseCh channel.
type ClusterRPC struct {
Method RPCMethod
ResponseCh chan RPCResponse
Arguments interface{}
type GenericClusterRPC struct {
baseRPC
Arguments interface{}
}
type CidClusterRPC struct {
baseRPC
CID *cid.Cid
}
// RPC builds a ClusterRPC request.
func RPC(m RPCMethod, args interface{}) ClusterRPC {
return ClusterRPC{
Method: m,
Arguments: args,
ResponseCh: make(chan RPCResponse),
func RPC(m RPCOp, args interface{}) ClusterRPC {
c, ok := args.(*cid.Cid)
if ok { // Its a CID
r := new(CidClusterRPC)
r.method = m
r.CID = c
r.responseCh = make(chan RPCResponse)
return r
}
// Its not a cid, make a generic
r := new(GenericClusterRPC)
r.method = m
r.Arguments = args
r.responseCh = make(chan RPCResponse)
return r
}
// RPC response carries the result of an ClusterRPC-requested operation

104
state.go
View File

@ -6,84 +6,27 @@ import (
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
)
const (
pinEverywhere = -1
)
const (
Error = iota
Pinned
Pinning
Unpinning
)
type Pin struct {
Cid string `json:"cid"`
PinMode pinMode `json:ignore`
Status pinStatus `json:"status"`
}
type pinMode int
type pinStatus int
// MapState is a very simple pin map representation
// PinMap must be public as it will be serialized
// MapState is a very simple database to store
// the state of the system.
// PinMap is public because it is serialized
// and maintained by Raft.
type MapState struct {
PinMap map[string]Pin
PinMap map[string]bool
rpcCh chan ClusterRPC
mux sync.Mutex
tag string
}
func NewMapState(tag string) MapState {
func NewMapState() MapState {
return MapState{
PinMap: make(map[string]Pin),
PinMap: make(map[string]bool),
rpcCh: make(chan ClusterRPC),
tag: tag,
}
}
func (st MapState) Pinning(c *cid.Cid) error {
func (st MapState) AddPin(c *cid.Cid) error {
st.mux.Lock()
defer st.mux.Unlock()
st.PinMap[c.String()] = Pin{
Cid: c.String(),
PinMode: pinEverywhere,
Status: Pinning,
}
return nil
}
func (st MapState) Unpinning(c *cid.Cid) error {
st.mux.Lock()
defer st.mux.Unlock()
st.PinMap[c.String()] = Pin{
Cid: c.String(),
PinMode: pinEverywhere,
Status: Unpinning,
}
return nil
}
func (st MapState) Pinned(c *cid.Cid) error {
st.mux.Lock()
defer st.mux.Unlock()
st.PinMap[c.String()] = Pin{
Cid: c.String(),
PinMode: pinEverywhere,
Status: Pinned,
}
return nil
}
func (st MapState) PinError(c *cid.Cid) error {
st.mux.Lock()
defer st.mux.Unlock()
st.PinMap[c.String()] = Pin{
Cid: c.String(),
PinMode: pinEverywhere,
Status: Error,
}
st.PinMap[c.String()] = true
return nil
}
@ -93,32 +36,3 @@ func (st MapState) RmPin(c *cid.Cid) error {
delete(st.PinMap, c.String())
return nil
}
func (st MapState) Exists(c *cid.Cid) bool {
st.mux.Lock()
defer st.mux.Unlock()
_, ok := st.PinMap[c.String()]
return ok
}
func (st MapState) ListPins() []Pin {
st.mux.Lock()
defer st.mux.Unlock()
pins := make([]Pin, 0, len(st.PinMap))
for _, v := range st.PinMap {
pins = append(pins, v)
}
return pins
}
func (st MapState) ShouldPin(c *cid.Cid) bool {
return true
}
func (st MapState) Shutdown() error {
return nil
}
func (st MapState) RpcChan() <-chan ClusterRPC {
return st.rpcCh
}

128
tracker.go Normal file
View File

@ -0,0 +1,128 @@
package ipfscluster
import (
"context"
"sync"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
)
const (
pinEverywhere = -1
)
const (
PinError = iota
UnPinError
Pinned
Pinning
Unpinning
Unpinned
Pending
)
type Pin struct {
Cid string `json:"cid"`
PinMode PinMode `json:ignore`
Status PinStatus `json:"status"`
}
type PinMode int
type PinStatus int
type MapPinTracker struct {
status map[string]Pin
rpcCh chan ClusterRPC
mux sync.Mutex
doneCh chan bool
ctx context.Context
cancel context.CancelFunc
}
func NewMapPinTracker() *MapPinTracker {
ctx, cancel := context.WithCancel(context.Background())
mpt := &MapPinTracker{
status: make(map[string]Pin),
rpcCh: make(chan ClusterRPC),
doneCh: make(chan bool),
ctx: ctx,
cancel: cancel,
}
go mpt.run()
return mpt
}
func (mpt *MapPinTracker) run() {
// Great plans for this thread
select {
case <-mpt.ctx.Done():
close(mpt.doneCh)
}
}
func (mpt *MapPinTracker) set(c *cid.Cid, s PinStatus) error {
mpt.mux.Lock()
defer mpt.mux.Unlock()
mpt.status[c.String()] = Pin{
Cid: c.String(),
PinMode: pinEverywhere,
Status: s,
}
return nil
}
func (mpt *MapPinTracker) Pinning(c *cid.Cid) error {
return mpt.set(c, Pinning)
}
func (mpt *MapPinTracker) Unpinning(c *cid.Cid) error {
return mpt.set(c, Unpinning)
}
func (mpt *MapPinTracker) Pinned(c *cid.Cid) error {
return mpt.set(c, Pinned)
}
func (mpt *MapPinTracker) PinError(c *cid.Cid) error {
return mpt.set(c, PinError)
}
func (mpt *MapPinTracker) UnpinError(c *cid.Cid) error {
return mpt.set(c, UnpinError)
}
func (mpt *MapPinTracker) Unpinned(c *cid.Cid) error {
mpt.mux.Lock()
defer mpt.mux.Unlock()
delete(mpt.status, c.String())
return nil
}
func (mpt *MapPinTracker) GetPin(c *cid.Cid) (Pin, bool) {
mpt.mux.Lock()
defer mpt.mux.Unlock()
p, ok := mpt.status[c.String()]
return p, ok
}
func (mpt *MapPinTracker) ListPins() []Pin {
mpt.mux.Lock()
defer mpt.mux.Unlock()
pins := make([]Pin, 0, len(mpt.status))
for _, v := range mpt.status {
pins = append(pins, v)
}
return pins
}
func (mpt *MapPinTracker) Shutdown() error {
mpt.cancel()
<-mpt.doneCh
return nil
}
func (mpt *MapPinTracker) RpcChan() <-chan ClusterRPC {
return mpt.rpcCh
}