Add MapPinTracker sync and recover capabilities.

Sync checks that the CID status corresponds to what IPFS says.

Recover retries pinning, unpinning on Error-ed cids.
This commit is contained in:
Hector Sanjuan 2016-12-07 17:21:29 +01:00 committed by Hector Sanjuan
parent 9c1c256e33
commit a9dcf57a90
7 changed files with 188 additions and 76 deletions

8
api.go
View File

@ -241,8 +241,10 @@ func (api *ClusterHTTPAPI) pinListHandler(w http.ResponseWriter, r *http.Request
for _, d := range data {
var st string
switch d.Status {
case Error:
st = "error"
case PinError:
st = "pin_error"
case UnpinError:
st = "unpin_error"
case Pinned:
st = "pinned"
case Pinning:
@ -251,7 +253,7 @@ func (api *ClusterHTTPAPI) pinListHandler(w http.ResponseWriter, r *http.Request
st = "unpinning"
}
pins = append(pins, pinElemResp{
Cid: d.Cid,
Cid: d.Cid.String(),
Status: st,
})
}

View File

@ -82,6 +82,11 @@ func (c *Cluster) Shutdown() error {
logger.Errorf("Error stopping IPFS Connector: %s", err)
return err
}
if err := c.tracker.Shutdown(); err != nil {
logger.Errorf("Error stopping PinTracker: %s", err)
return err
}
c.cancel()
return nil
}
@ -140,6 +145,7 @@ func (c *Cluster) run() {
ipfsCh := c.ipfs.RpcChan()
consensusCh := c.consensus.RpcChan()
apiCh := c.api.RpcChan()
trackerCh := c.tracker.RpcChan()
for {
select {
@ -149,6 +155,8 @@ func (c *Cluster) run() {
go c.handleOp(consensusOp)
case apiOp := <-apiCh:
go c.handleOp(apiOp)
case trackerOp := <-trackerCh:
go c.handleOp(trackerOp)
case <-c.ctx.Done():
logger.Debug("Cluster is Done()")
return
@ -185,21 +193,23 @@ func (c *Cluster) handleOp(rpc ClusterRPC) {
case UnpinRPC:
err = c.Unpin(crpc.CID)
case IPFSPinRPC:
c.tracker.Pinning(crpc.CID)
err = c.ipfs.Pin(crpc.CID)
if err != nil {
c.tracker.PinError(crpc.CID)
} else {
c.tracker.Pinned(crpc.CID)
}
case IPFSUnpinRPC:
c.tracker.Unpinning(crpc.CID)
err = c.ipfs.Unpin(crpc.CID)
case StatusPinnedRPC:
err = c.tracker.Pinned(crpc.CID)
case StatusPinningRPC:
err = c.tracker.Pinning(crpc.CID)
case StatusUnpinningRPC:
err = c.tracker.Unpinning(crpc.CID)
case StatusUnpinnedRPC:
err = c.tracker.Unpinned(crpc.CID)
case StatusPinErrorRPC:
err = c.tracker.PinError(crpc.CID)
case StatusUnpinErrorRPC:
err = c.tracker.UnpinError(crpc.CID)
if err != nil {
c.tracker.UnpinError(crpc.CID)
} else {
c.tracker.Unpinned(crpc.CID)
}
case IPFSIsPinnedRPC:
data, err = c.ipfs.IsPinned(crpc.CID)
case RollbackRPC:
state, ok := grpc.Arguments.(ClusterState)
if !ok {

View File

@ -49,25 +49,8 @@ func (op clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error)
panic("Could not decode a CID we ourselves encoded")
}
async_op := func(startOp, ipfsOp, doneOp, errorOp RPCOp) {
ctx, cancel := context.WithCancel(op.ctx)
defer cancel()
// Mark as Pinning/Unpinning
_ = MakeRPC(ctx, op.rpcCh, RPC(startOp, c), true)
// Tell IPFS to Pin/Unpin
resp := MakeRPC(ctx, op.rpcCh, RPC(ipfsOp, c), true)
if resp.Error != nil {
logger.Debug("IPFS pin op error")
// Mark an error
MakeRPC(ctx, op.rpcCh, RPC(errorOp, c), false)
} else {
logger.Debug("IPFS pin op success")
// Mark Pinned/Unpinned
MakeRPC(ctx, op.rpcCh, RPC(doneOp, c), false)
}
}
ctx, cancel := context.WithCancel(op.ctx)
defer cancel()
switch op.Type {
case LogOpPin:
@ -75,13 +58,15 @@ func (op clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error)
if err != nil {
goto ROLLBACK
}
go async_op(StatusPinningRPC, IPFSPinRPC, StatusPinnedRPC, IPFSPinErrorRPC)
// Async, we let the PinTracker take care of any problems
MakeRPC(ctx, op.rpcCh, RPC(IPFSPinRPC, c), false)
case LogOpUnpin:
err := state.RmPin(c)
if err != nil {
goto ROLLBACK
}
go async_op(StatusUnpinningRPC, IPFSUnpinRPC, StatusUnpinnedRPC, IPFSUnpinErrorRPC)
// Async, we let the PinTracker take care of any problems
MakeRPC(ctx, op.rpcCh, RPC(IPFSUnpinRPC, c), false)
default:
logger.Error("unknown clusterLogOp type. Ignoring")
}
@ -94,8 +79,6 @@ ROLLBACK:
// by the cluster leader.
rllbckRPC := RPC(RollbackRPC, state)
leadrRPC := RPC(LeaderRPC, rllbckRPC)
ctx, cancel := context.WithCancel(op.ctx)
defer cancel()
MakeRPC(ctx, op.rpcCh, leadrRPC, false)
logger.Errorf("an error ocurred when applying Op to state: %s", err)
logger.Error("a rollback was requested")

View File

@ -52,6 +52,7 @@ type IPFSConnector interface {
ClusterComponent
Pin(*cid.Cid) error
Unpin(*cid.Cid) error
IsPinned(*cid.Cid) (bool, error)
}
// Peered represents a component which needs to be aware of the peers
@ -89,8 +90,16 @@ type PinTracker interface {
UnpinError(*cid.Cid) error
// ListPins returns the list of pins with their status
ListPins() []Pin
// GetPin returns a pin and ok if it is found.
GetPin(*cid.Cid) (Pin, bool)
// GetPin returns a Pin.
GetPin(*cid.Cid) Pin
// Sync makes sure that the Cid status reflect the real IPFS status. If not,
// the status is marked as error. The return value indicates if the
// Pin status was updated.
Sync(*cid.Cid) bool
// Recover attempts to recover an error by re-[un]pinning the item.
Recover(*cid.Cid) error
// SyncAll runs Sync() on every known Pin. It returns a list of changed Pins
SyncAll() []Pin
}
// MakeRPC sends a ClusterRPC object over a channel and waits for an answer on

View File

@ -149,19 +149,15 @@ func (ipfs *IPFSHTTPConnector) Shutdown() error {
// daemon.
func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error {
logger.Infof("IPFS Pin request for: %s", hash)
pinType, err := ipfs.pinType(hash)
pinned, err := ipfs.IsPinned(hash)
if err != nil {
return err
}
if pinType == "unpinned" || strings.Contains(pinType, "indirect") {
// Not pinned or indirectly pinned
if !pinned {
path := fmt.Sprintf("pin/add?arg=%s", hash)
_, err = ipfs.get(path)
return err
}
logger.Debug("object is already pinned. Doing nothing")
return nil
}
@ -170,25 +166,32 @@ func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error {
// daemon.
func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
logger.Info("IPFS Unpin request for:", hash)
pinType, err := ipfs.pinType(hash)
pinned, err := ipfs.IsPinned(hash)
if err != nil {
return err
}
if pinType == "unpinned" {
logger.Debug("object not directly pinned. Doing nothing")
return nil
}
if !strings.Contains(pinType, "indirect") {
if pinned {
path := fmt.Sprintf("pin/rm?arg=%s", hash)
_, err := ipfs.get(path)
return err
}
logger.Debug("object not [directly] pinned. Doing nothing")
return nil
}
func (ipfs *IPFSHTTPConnector) IsPinned(hash *cid.Cid) (bool, error) {
pinType, err := ipfs.pinType(hash)
if err != nil {
return false, err
}
if pinType == "unpinned" || strings.Contains(pinType, "indirect") {
return false, nil
}
return true, nil
}
// Returns how a hash is pinned
func (ipfs *IPFSHTTPConnector) pinType(hash *cid.Cid) (string, error) {
lsPath := fmt.Sprintf("pin/ls?arg=%s", hash)

7
rpc.go
View File

@ -9,16 +9,11 @@ const (
PinListRPC
IPFSPinRPC
IPFSUnpinRPC
IPFSIsPinnedRPC
VersionRPC
MemberListRPC
RollbackRPC
LeaderRPC
StatusPinnedRPC
StatusPinningRPC
StatusUnpinnedRPC
StatusUnpinningRPC
StatusPinErrorRPC
StatusUnPinErrorRPC
)
// RPCMethod identifies which RPC-supported operation we are trying to make

View File

@ -13,18 +13,17 @@ const (
const (
PinError = iota
UnPinError
UnpinError
Pinned
Pinning
Unpinning
Unpinned
Pending
)
type Pin struct {
Cid string `json:"cid"`
PinMode PinMode `json:ignore`
Status PinStatus `json:"status"`
Cid *cid.Cid
PinMode PinMode
Status PinStatus
}
type PinMode int
@ -56,6 +55,15 @@ func NewMapPinTracker() *MapPinTracker {
func (mpt *MapPinTracker) run() {
// Great plans for this thread
// The first time we run, we sync all
// and try to recover any errors
changed := mpt.SyncAll()
for _, p := range changed {
logger.Debugf("Recovering %s", p.Cid)
mpt.Recover(p.Cid)
}
select {
case <-mpt.ctx.Done():
close(mpt.doneCh)
@ -65,14 +73,32 @@ func (mpt *MapPinTracker) run() {
func (mpt *MapPinTracker) set(c *cid.Cid, s PinStatus) error {
mpt.mux.Lock()
defer mpt.mux.Unlock()
if s == Unpinned {
delete(mpt.status, c.String())
return nil
}
mpt.status[c.String()] = Pin{
Cid: c.String(),
Cid: c,
PinMode: pinEverywhere,
Status: s,
}
return nil
}
func (mpt *MapPinTracker) get(c *cid.Cid) Pin {
mpt.mux.Lock()
defer mpt.mux.Unlock()
p, ok := mpt.status[c.String()]
if !ok {
return Pin{
Cid: c,
Status: Unpinned,
}
}
return p
}
func (mpt *MapPinTracker) Pinning(c *cid.Cid) error {
return mpt.set(c, Pinning)
}
@ -94,17 +120,11 @@ func (mpt *MapPinTracker) UnpinError(c *cid.Cid) error {
}
func (mpt *MapPinTracker) Unpinned(c *cid.Cid) error {
mpt.mux.Lock()
defer mpt.mux.Unlock()
delete(mpt.status, c.String())
return nil
return mpt.set(c, Unpinned)
}
func (mpt *MapPinTracker) GetPin(c *cid.Cid) (Pin, bool) {
mpt.mux.Lock()
defer mpt.mux.Unlock()
p, ok := mpt.status[c.String()]
return p, ok
func (mpt *MapPinTracker) GetPin(c *cid.Cid) Pin {
return mpt.get(c)
}
func (mpt *MapPinTracker) ListPins() []Pin {
@ -117,7 +137,97 @@ func (mpt *MapPinTracker) ListPins() []Pin {
return pins
}
func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
p := mpt.get(c)
if p.Status == PinError || p.Status == UnpinError {
return false
}
resp := MakeRPC(ctx, mpt.rpcCh, RPC(IPFSIsPinnedRPC, c), true)
if resp.Error != nil {
if p.Status == Pinned || p.Status == Pinning {
mpt.set(c, PinError)
return true
}
if p.Status == Unpinned || p.Status == Unpinning {
mpt.set(c, UnpinError)
return true
}
return false
}
ipfsPinned, ok := resp.Data.(bool)
if !ok {
logger.Error("wrong type of IPFSIsPinnedRPC response")
return false
}
if ipfsPinned {
switch p.Status {
case Pinned:
return false
case Pinning:
mpt.set(c, Pinned)
return true
case Unpinning:
mpt.set(c, UnpinError) // Not sure here
return true
case Unpinned:
mpt.set(c, UnpinError)
return true
}
} else {
switch p.Status {
case Pinned:
mpt.set(c, PinError)
return true
case Pinning:
mpt.set(c, PinError)
return true
case Unpinning:
mpt.set(c, Unpinned)
return true
case Unpinned:
return false
}
}
return false
}
func (mpt *MapPinTracker) Recover(c *cid.Cid) error {
p := mpt.get(c)
if p.Status != PinError && p.Status != UnpinError {
return nil
}
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
if p.Status == PinError {
MakeRPC(ctx, mpt.rpcCh, RPC(IPFSPinRPC, c), false)
}
if p.Status == UnpinError {
MakeRPC(ctx, mpt.rpcCh, RPC(IPFSUnpinRPC, c), false)
}
return nil
}
func (mpt *MapPinTracker) SyncAll() []Pin {
var changedPins []Pin
pins := mpt.ListPins()
for _, p := range pins {
changed := mpt.Sync(p.Cid)
if changed {
changedPins = append(changedPins, p)
}
}
return changedPins
}
func (mpt *MapPinTracker) Shutdown() error {
logger.Info("Stopping MapPinTracker")
mpt.cancel()
<-mpt.doneCh
return nil