Make ipfs pinning async. Add Pin intermediary states

This commit is contained in:
Hector Sanjuan 2016-12-05 15:30:11 +01:00 committed by Hector Sanjuan
parent e0840df267
commit 2285f8d1a8
10 changed files with 196 additions and 251 deletions

31
api.go
View File

@ -52,6 +52,13 @@ type unpinResp struct {
Unpinned string `json:"unpinned"`
}
type pinElemResp struct {
Cid string `json:"cid"`
Status string `json:"status"`
}
type pinListResp []pinElemResp
// NewHTTPClusterAPI creates a new object which is ready to be
// started.
func NewHTTPClusterAPI(cfg *ClusterConfig) (*ClusterHTTPAPI, error) {
@ -216,12 +223,26 @@ func (api *ClusterHTTPAPI) pinListHandler(w http.ResponseWriter, r *http.Request
rRpc := RPC(PinListRPC, nil)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Method, resp) {
data := resp.Data.([]*cid.Cid)
strPins := make([]string, 0, len(data))
data := resp.Data.([]Pin)
pins := make(pinListResp, 0, len(data))
for _, d := range data {
strPins = append(strPins, d.String())
var st string
switch d.Status {
case Error:
st = "error"
case Pinned:
st = "pinned"
case Pinning:
st = "pinning"
case Unpinning:
st = "unpinning"
}
pins = append(pins, pinElemResp{
Cid: d.Cid,
Status: st,
})
}
sendJSONResponse(w, 200, strPins)
sendJSONResponse(w, 200, pins)
}
}
@ -248,7 +269,7 @@ func checkResponse(w http.ResponseWriter, method RPCMethod, resp RPCResponse) bo
case UnpinRPC:
_, ok = resp.Data.(cid.Cid)
case PinListRPC:
_, ok = resp.Data.([]*cid.Cid)
_, ok = resp.Data.([]Pin)
case IPFSPinRPC:
case IPFSUnpinRPC:
case VersionRPC:

View File

@ -209,6 +209,27 @@ func (c *Cluster) handleOp(op *ClusterRPC) {
err = errors.New("Bad LeaderRPC type")
}
data, err = c.leaderRPC(rpc)
case StatePinSuccess:
hash, ok := op.Arguments.(cid.Cid)
if !ok {
err = errors.New("Bad StatePinSucess type")
break
}
err = c.state.Pinned(&hash)
case StateUnpinSuccess:
hash, ok := op.Arguments.(cid.Cid)
if !ok {
err = errors.New("Bad StateUnpinSucess type")
break
}
err = c.state.RmPin(&hash)
case StatePinError:
hash, ok := op.Arguments.(cid.Cid)
if !ok {
err = errors.New("Bad StatePinError type")
break
}
err = c.state.PinError(&hash)
default:
logger.Error("Unknown operation. Ignoring")
}

View File

@ -37,6 +37,7 @@ type clusterLogOp struct {
// ApplyTo applies the operation to the ClusterState
func (op clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
state, ok := cstate.(ClusterState)
var err error
if !ok {
// Should never be here
panic("Received unexpected state type")
@ -48,32 +49,41 @@ func (op clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error)
panic("Could not decode a CID we ourselves encoded")
}
var rpcM RPCMethod
var resp RPCResponse
ctx, cancel := context.WithCancel(op.ctx)
defer cancel()
async_op := func(try, success RPCMethod, c *cid.Cid) {
ctx, cancel := context.WithCancel(op.ctx)
defer cancel()
resp := MakeRPC(ctx, op.rpcCh, RPC(try, *c), true)
if resp.Error != nil {
MakeRPC(ctx, op.rpcCh, RPC(StatePinError, *c), false)
} else {
logger.Debugf("Pinop (%d) success", try)
MakeRPC(ctx, op.rpcCh, RPC(success, *c), false)
}
}
switch op.Type {
case LogOpPin:
err = state.AddPin(cidObj)
rpcM = IPFSPinRPC
if state.ShouldPin(cidObj) {
err = state.Pinning(cidObj)
if err != nil {
goto ROLLBACK
}
go async_op(IPFSPinRPC, StatePinSuccess, cidObj)
} else {
err = state.Pinned(cidObj)
if err != nil {
goto ROLLBACK
}
}
case LogOpUnpin:
err = state.RmPin(cidObj)
rpcM = IPFSUnpinRPC
err = state.Unpinning(cidObj)
if err != nil {
goto ROLLBACK
}
go async_op(IPFSUnpinRPC, StateUnpinSuccess, cidObj)
default:
err = errors.New("Unknown clusterLogOp type")
logger.Error("unknown clusterLogOp type. Ignoring")
}
if err != nil {
goto ROLLBACK
}
// Do we want to wait? Pins can take a very long time
resp = MakeRPC(ctx, op.rpcCh, RPC(rpcM, *cidObj), true)
if resp.Error != nil {
err = resp.Error
goto ROLLBACK
}
return state, nil
ROLLBACK:
@ -83,6 +93,8 @@ ROLLBACK:
// by the cluster leader.
rllbckRPC := RPC(RollbackRPC, state)
leadrRPC := RPC(LeaderRPC, rllbckRPC)
ctx, cancel := context.WithCancel(op.ctx)
defer cancel()
MakeRPC(ctx, op.rpcCh, leadrRPC, false)
logger.Errorf("an error ocurred when applying Op to state: %s", err)
logger.Error("a rollback was requested")
@ -204,13 +216,13 @@ func (cc *ClusterConsensus) RmPin(c *cid.Cid) error {
// Here we only care that the operation was commited
// to the log, not if the resulting state is valid.
logger.Infof("Pin commited to global state: %s", c)
logger.Infof("Unpin commited to global state: %s", c)
return nil
}
// ListPins returns the list of Cids which are part of the
// shared state of the cluster.
func (cc *ClusterConsensus) ListPins() ([]*cid.Cid, error) {
func (cc *ClusterConsensus) ListPins() ([]Pin, error) {
cstate, err := cc.consensus.GetLogHead()
if err != nil {
return nil, err

View File

@ -33,7 +33,7 @@ func main() {
return
}
state := ipfscluster.NewMapState()
state := ipfscluster.NewMapState("a")
cluster, err := ipfscluster.NewCluster(clusterCfg, api, proxy, state)
if err != nil {

44
ipfs.go
View File

@ -1,44 +0,0 @@
package main
import (
"io"
"log"
"net/http"
)
// ipfsHandlerFunc implements a basic 'pass through' proxy for an ipfs daemon
func (c *Cluster) ipfsHandlerFunc(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path[8:]
switch path {
case "pin/add":
log.Println("pin request")
default:
log.Printf("path: %s", path)
}
url := *r.URL
url.Host = "localhost:5001"
url.Scheme = "http"
req, err := http.NewRequest(r.Method, url.String(), r.Body)
if err != nil {
log.Printf("error creating request: ", err)
http.Error(w, "error forwaring request", 501)
return
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("error forwarding request: ", err)
http.Error(w, "error forwaring request", 501)
return
}
for k, v := range resp.Header {
for _, s := range v {
w.Header().Add(k, s)
}
}
io.Copy(w, resp.Body)
}

View File

@ -68,10 +68,14 @@ type Peered interface {
// ClusterState is in charge of implementing any advanced pinning
// strategies.
type ClusterState interface {
AddPin(*cid.Cid) error
Pinning(*cid.Cid) error
Pinned(*cid.Cid) error
Unpinning(*cid.Cid) error
RmPin(*cid.Cid) error
PinError(*cid.Cid) error
Exists(*cid.Cid) bool
ListPins() []*cid.Cid
ListPins() []Pin
ShouldPin(*cid.Cid) bool
// ShouldPin(peer.ID, *cid.Cid) bool
}
@ -103,9 +107,10 @@ func MakeRPC(ctx context.Context, ch chan ClusterRPC, r ClusterRPC, waitForRespo
return RPCResponse{}
}
logger.Debugf("Waiting for response")
logger.Debug("Waiting for response")
select {
case resp, ok := <-r.ResponseCh:
logger.Debug("Response received")
if !ok { // Not interested in response
logger.Warning("Response channel closed. Ignoring")
return RPCResponse{

View File

@ -140,12 +140,17 @@ func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error {
logger.Infof("IPFS Pin request for: %s", hash)
pinType, err := ipfs.pinType(hash)
if err != nil || strings.Contains(pinType, "indirect") {
if err != nil {
return err
}
if pinType == "unpinned" || strings.Contains(pinType, "indirect") {
// Not pinned or indirectly pinned
path := fmt.Sprintf("pin/add?arg=%s", hash)
_, err = ipfs.get(path)
return err
}
logger.Debug("object is already pinned. Doing nothing")
return nil
}
@ -156,14 +161,20 @@ func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
logger.Info("IPFS Unpin request for:", hash)
pinType, err := ipfs.pinType(hash)
if err == nil && !strings.Contains(pinType, "indirect") {
if err != nil {
return err
}
if pinType == "unpinned" {
logger.Debug("object not directly pinned. Doing nothing")
return nil
}
if !strings.Contains(pinType, "indirect") {
path := fmt.Sprintf("pin/rm?arg=%s", hash)
_, err := ipfs.get(path)
return err
}
// It is not pinned we do nothing
logger.Debug("object not directly pinned. Doing nothing")
return nil
}
@ -171,10 +182,17 @@ func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
func (ipfs *IPFSHTTPConnector) pinType(hash *cid.Cid) (string, error) {
lsPath := fmt.Sprintf("pin/ls?arg=%s", hash)
body, err := ipfs.get(lsPath)
if err != nil { // Not pinned
// Network error, daemon down
if body == nil && err != nil {
return "", err
}
// Pin not found likely here
if err != nil { // Not pinned
return "unpinned", nil
}
// What type of pin it is
var resp struct {
Keys map[string]struct {
@ -199,13 +217,14 @@ func (ipfs *IPFSHTTPConnector) pinType(hash *cid.Cid) (string, error) {
// get performs the heavy lifting of a get request against
// the IPFS daemon.
func (ipfs *IPFSHTTPConnector) get(path string) ([]byte, error) {
logger.Debugf("Getting %s", path)
url := fmt.Sprintf("%s/%s",
ipfs.apiURL(),
path)
resp, err := http.Get(url)
if err != nil {
logger.Error("Error unpinning:", err)
logger.Error("Error getting:", err)
return nil, err
}
defer resp.Body.Close()
@ -228,7 +247,7 @@ func (ipfs *IPFSHTTPConnector) get(path string) ([]byte, error) {
resp.StatusCode, body)
}
logger.Error(msg)
return nil, errors.New(msg)
return body, errors.New(msg)
}
return body, nil
}

153
main.go
View File

@ -1,153 +0,0 @@
package main
import (
"encoding/json"
"log"
"net/http"
"os"
"strings"
cli "github.com/codegangsta/cli"
api "github.com/ipfs/go-ipfs-api"
"golang.org/x/net/context"
)
const ClusterVersion = "0.0.0"
type Cluster struct {
shell *api.Shell
ipfsapi string
}
func NewCluster(ipfsapi string) *Cluster {
return &Cluster{
shell: api.NewShell(ipfsapi),
ipfsapi: ipfsapi,
}
}
func respondJson(w http.ResponseWriter, i interface{}) {
enc := json.NewEncoder(w)
err := enc.Encode(i)
if err != nil {
log.Println("error responding: ", err)
}
}
func (c *Cluster) GetStatus() (interface{}, error) {
status := make(map[string]interface{})
_, _, err := c.shell.Version()
status["online"] = (err == nil)
return status, nil
}
func (c *Cluster) apiHandlerFunc(w http.ResponseWriter, r *http.Request) {
path := strings.Split(r.URL.Path, "/")[1:]
if len(path) == 0 {
w.WriteHeader(404)
return
}
switch path[0] {
case "version":
respondJson(w, map[string]interface{}{"version": ClusterVersion})
case "status":
out, err := c.GetStatus()
if err != nil {
w.WriteHeader(503)
log.Println("get status error: ", err)
}
respondJson(w, out)
case "join":
host := r.URL.Query().Get("host")
_ = host
panic("not yet implemented")
default:
w.WriteHeader(404)
}
}
func (c *Cluster) StartAPIServer(ctx context.Context, addr string) error {
smux := http.NewServeMux()
smux.HandleFunc("/", c.apiHandlerFunc)
log.Printf("serving clusterd control api on %s", addr)
go func() {
err := http.ListenAndServe(addr, smux)
if err != nil {
panic(err)
}
}()
return nil
}
func (c *Cluster) StartIPFSHandler(ctx context.Context, addr string) error {
smux := http.NewServeMux()
smux.HandleFunc("/", c.ipfsHandlerFunc)
log.Printf("serving ipfs api on %s", addr)
go func() {
err := http.ListenAndServe(addr, smux)
if err != nil {
panic(err)
}
}()
return nil
}
func (c *Cluster) Start(iapi, capi string) error {
ctx, cancel := context.WithCancel(context.Background())
err := c.StartIPFSHandler(ctx, iapi)
if err != nil {
return err
}
err = c.StartAPIServer(ctx, capi)
if err != nil {
return err
}
_ = cancel
// start clusterd messaging protocol server
// join to other nodes in cluster
// hang and serve
<-ctx.Done()
return nil
}
func main() {
app := cli.NewApp()
app.Name = "clusterd"
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "ipfs-daemon",
Value: "localhost:5001",
},
cli.StringFlag{
Name: "ipfs-api",
Value: "localhost:5101",
},
cli.StringFlag{
Name: "control-api",
Value: "localhost:5100",
},
}
app.Action = func(c *cli.Context) error {
clst := NewCluster(c.String("ipfs-daemon"))
err := clst.Start(c.String("ipfs-api"), c.String("control-api"))
if err != nil {
return err
}
return nil
}
err := app.Run(os.Args)
if err != nil {
log.Fatal(err)
}
}

3
rpc.go
View File

@ -11,6 +11,9 @@ const (
MemberListRPC
RollbackRPC
LeaderRPC
StatePinSuccess
StateUnpinSuccess
StatePinError
)
// RPCMethod identifies which RPC-supported operation we are trying to make

View File

@ -1,56 +1,117 @@
package ipfscluster
import (
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
"sync"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
)
type pinMode int
const (
pinEverywhere = -1
)
const (
Error = iota
Pinned
Pinning
Unpinning
)
type Pin struct {
Cid string `json:"cid"`
PinMode pinMode `json:ignore`
Status pinStatus `json:"status"`
}
type pinMode int
type pinStatus int
// MapState is a very simple pin map representation
// PinMap must be public as it will be serialized
type MapState struct {
PinMap map[string]pinMode
PinMap map[string]Pin
rpcCh chan ClusterRPC
mux sync.Mutex
tag string
}
func NewMapState() MapState {
func NewMapState(tag string) MapState {
return MapState{
PinMap: make(map[string]pinMode),
PinMap: make(map[string]Pin),
rpcCh: make(chan ClusterRPC),
tag: tag,
}
}
func (st MapState) AddPin(c *cid.Cid) error {
st.PinMap[c.String()] = pinEverywhere
func (st MapState) Pinning(c *cid.Cid) error {
st.mux.Lock()
defer st.mux.Unlock()
st.PinMap[c.String()] = Pin{
Cid: c.String(),
PinMode: pinEverywhere,
Status: Pinning,
}
return nil
}
func (st MapState) Unpinning(c *cid.Cid) error {
st.mux.Lock()
defer st.mux.Unlock()
st.PinMap[c.String()] = Pin{
Cid: c.String(),
PinMode: pinEverywhere,
Status: Unpinning,
}
return nil
}
func (st MapState) Pinned(c *cid.Cid) error {
st.mux.Lock()
defer st.mux.Unlock()
st.PinMap[c.String()] = Pin{
Cid: c.String(),
PinMode: pinEverywhere,
Status: Pinned,
}
return nil
}
func (st MapState) PinError(c *cid.Cid) error {
st.mux.Lock()
defer st.mux.Unlock()
st.PinMap[c.String()] = Pin{
Cid: c.String(),
PinMode: pinEverywhere,
Status: Error,
}
return nil
}
func (st MapState) RmPin(c *cid.Cid) error {
st.mux.Lock()
defer st.mux.Unlock()
delete(st.PinMap, c.String())
return nil
}
func (st MapState) Exists(c *cid.Cid) bool {
st.mux.Lock()
defer st.mux.Unlock()
_, ok := st.PinMap[c.String()]
return ok
}
func (st MapState) ListPins() []*cid.Cid {
keys := make([]*cid.Cid, 0, len(st.PinMap))
for k := range st.PinMap {
c, _ := cid.Decode(k)
keys = append(keys, c)
func (st MapState) ListPins() []Pin {
st.mux.Lock()
defer st.mux.Unlock()
pins := make([]Pin, 0, len(st.PinMap))
for _, v := range st.PinMap {
pins = append(pins, v)
}
return keys
return pins
}
func (st MapState) ShouldPin(p peer.ID, c *cid.Cid) bool {
func (st MapState) ShouldPin(c *cid.Cid) bool {
return true
}