Figured out globalSync and globalSync cid. Tests. More tests.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
Hector Sanjuan 2016-12-20 19:51:13 +01:00
parent 07a8f62ef4
commit 5c41d69abc
13 changed files with 597 additions and 293 deletions

View File

@ -6,7 +6,7 @@ Things that need to be done:
* ~~Allow to shutdown multiple times (+ test)~~
* Efficient SyncAll (use single ipfs pin ls call)
* ~~GlobalStatus~~
* GlobalSync
* ~~GlobalSync~~
* ~~LeaderRPC implementation~~
* /pin/add /pin/rm hijack
* End-to-end tests

View File

@ -115,64 +115,117 @@ func (c *Cluster) Shutdown() error {
return nil
}
// LocalSync makes sure that the current state of the Cluster matches
// and the desired IPFS daemon state are aligned. It makes sure that
// IPFS is pinning content that should be pinned locally, and not pinning
// other content. It will also try to recover any failed pin or unpin
// operations by retrigerring them.
func (c *Cluster) LocalSync() ([]PinInfo, error) {
// StateSync syncs the consensus state to the Pin Tracker, ensuring
// that every Cid that should be tracked is tracked. It returns
// PinInfo for Cids which were added or deleted.
func (c *Cluster) StateSync() ([]PinInfo, error) {
cState, err := c.consensus.State()
if err != nil {
return nil, err
}
changed := c.tracker.SyncState(cState)
for _, h := range changed {
logger.Debugf("recovering %s", h)
err = c.tracker.Recover(h)
if err != nil {
logger.Errorf("Error recovering %s: %s", h, err)
return nil, err
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
logger.Info("syncing state to tracker")
clusterPins := cState.ListPins()
var changed []*cid.Cid
// Track items which are not tracked
for _, h := range clusterPins {
if c.tracker.StatusCid(h).IPFS == Unpinned {
changed = append(changed, h)
MakeRPC(ctx, c.rpcCh, NewRPC(TrackRPC, h), false)
}
}
return c.tracker.LocalStatus(), nil
// Untrack items which should not be tracked
for _, p := range c.tracker.Status() {
h, _ := cid.Decode(p.CidStr)
if !cState.HasPin(h) {
changed = append(changed, h)
MakeRPC(ctx, c.rpcCh, NewRPC(UntrackRPC, h), false)
}
}
var infos []PinInfo
for _, h := range changed {
infos = append(infos, c.tracker.StatusCid(h))
}
return infos, nil
}
// LocalSyncCid makes sure that the current state of the cluster
// and the desired IPFS daemon state are aligned for a given Cid. It
// makes sure that IPFS is pinning content that should be pinned locally,
// and not pinning other content. It will also try to recover any failed
// pin or unpin operations by retriggering them.
func (c *Cluster) LocalSyncCid(h *cid.Cid) (PinInfo, error) {
changed := c.tracker.Sync(h)
if changed {
err := c.tracker.Recover(h)
if err != nil {
logger.Errorf("Error recovering %s: %s", h, err)
return PinInfo{}, err
// LocalSync makes sure that the current state the Tracker matches
// the IPFS daemon state by triggering a Tracker.Sync() and Recover()
// on all items that need it. Returns PinInfo for items changed on Sync().
//
// LocalSync triggers recoveries asynchronously, and will not wait for
// them to fail or succeed before returning.
func (c *Cluster) LocalSync() ([]PinInfo, error) {
status := c.tracker.Status()
var toRecover []*cid.Cid
for _, p := range status {
h, _ := cid.Decode(p.CidStr)
modified := c.tracker.Sync(h)
if modified {
toRecover = append(toRecover, h)
}
}
return c.tracker.LocalStatusCid(h), nil
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
logger.Infof("%d items to recover after sync", len(toRecover))
for i, h := range toRecover {
logger.Infof("recovering in progress for %s (%d/%d", h, i, len(toRecover))
MakeRPC(ctx, c.rpcCh, NewRPC(TrackerRecoverRPC, h), false)
}
var changed []PinInfo
for _, h := range toRecover {
changed = append(changed, c.tracker.StatusCid(h))
}
return changed, nil
}
// LocalSyncCid performs a Tracker.Sync() operation followed by a
// Recover() when needed. It returns the latest known PinInfo for the Cid.
//
// LocalSyncCid will wait for the Recover operation to fail or succeed before
// returning.
func (c *Cluster) LocalSyncCid(h *cid.Cid) (PinInfo, error) {
var err error
if c.tracker.Sync(h) {
err = c.tracker.Recover(h)
}
return c.tracker.StatusCid(h), err
}
// GlobalSync triggers Sync() operations in all members of the Cluster.
func (c *Cluster) GlobalSync() ([]GlobalPinInfo, error) {
return c.Status()
return c.globalPinInfoSlice(LocalSyncRPC)
}
// GlobalSunc triggers a Sync() operation for a given Cid in all members
// of the Cluster.
// GlobalSyncCid triggers a LocalSyncCid() operation for a given Cid
// in all members of the Cluster.
//
// GlobalSyncCid will only return when all operations have either failed,
// succeeded or timed-out.
func (c *Cluster) GlobalSyncCid(h *cid.Cid) (GlobalPinInfo, error) {
return c.StatusCid(h)
return c.globalPinInfoCid(LocalSyncCidRPC, h)
}
// Status returns the last known status for all Pins tracked by Cluster.
// Status returns the GlobalPinInfo for all tracked Cids. If an error happens,
// the slice will contain as much information as could be fetched.
func (c *Cluster) Status() ([]GlobalPinInfo, error) {
return c.tracker.GlobalStatus()
return c.globalPinInfoSlice(TrackerStatusRPC)
}
// StatusCid returns the last known status for a given Cid
// StatusCid returns the GlobalPinInfo for a given Cid. If an error happens,
// the GlobalPinInfo should contain as much information as could be fetched.
func (c *Cluster) StatusCid(h *cid.Cid) (GlobalPinInfo, error) {
return c.tracker.GlobalStatusCid(h)
return c.globalPinInfoCid(TrackerStatusCidRPC, h)
}
// Pins returns the list of Cids managed by Cluster and which are part
@ -279,7 +332,7 @@ func (c *Cluster) run() {
logger.Error("unknown RPC type")
op.ResponseCh() <- RPCResponse{
Data: nil,
Error: errors.New("unknown RPC type"),
Error: NewRPCError("unknown RPC type"),
}
}
}
@ -300,10 +353,12 @@ func (c *Cluster) handleGenericRPC(grpc *GenericRPC) {
data, err = c.LocalSync()
case GlobalSyncRPC:
data, err = c.GlobalSync()
case StateSyncRPC:
data, err = c.StateSync()
case StatusRPC:
data, err = c.Status()
case TrackerLocalStatusRPC:
data = c.tracker.LocalStatus()
case TrackerStatusRPC:
data = c.tracker.Status()
case RollbackRPC:
// State, ok := grpc.Argument.(State)
// if !ok {
@ -317,7 +372,7 @@ func (c *Cluster) handleGenericRPC(grpc *GenericRPC) {
resp := RPCResponse{
Data: data,
Error: err,
Error: CastRPCError(err),
}
grpc.ResponseCh() <- resp
@ -334,6 +389,12 @@ func (c *Cluster) handleCidRPC(crpc *CidRPC) {
err = c.Pin(h)
case UnpinRPC:
err = c.Unpin(h)
case StatusCidRPC:
data, err = c.StatusCid(h)
case LocalSyncCidRPC:
data, err = c.LocalSyncCid(h)
case GlobalSyncCidRPC:
data, err = c.GlobalSyncCid(h)
case ConsensusLogPinRPC:
err = c.consensus.LogPin(h)
case ConsensusLogUnpinRPC:
@ -342,27 +403,24 @@ func (c *Cluster) handleCidRPC(crpc *CidRPC) {
err = c.tracker.Track(h)
case UntrackRPC:
err = c.tracker.Untrack(h)
case TrackerLocalStatusCidRPC:
data = c.tracker.LocalStatusCid(h)
case TrackerStatusCidRPC:
data = c.tracker.StatusCid(h)
case TrackerRecoverRPC:
err = c.tracker.Recover(h)
case IPFSPinRPC:
err = c.ipfs.Pin(h)
case IPFSUnpinRPC:
err = c.ipfs.Unpin(h)
case IPFSIsPinnedRPC:
data, err = c.ipfs.IsPinned(h)
case StatusCidRPC:
data, err = c.StatusCid(h)
case LocalSyncCidRPC:
data, err = c.LocalSyncCid(h)
case GlobalSyncCidRPC:
data, err = c.GlobalSyncCid(h)
default:
logger.Error("unknown operation for CidRPC. Ignoring.")
}
resp := RPCResponse{
Data: data,
Error: err,
Error: CastRPCError(err),
}
crpc.ResponseCh() <- resp
@ -373,15 +431,13 @@ func (c *Cluster) handleWrappedRPC(wrpc *WrappedRPC) {
var resp RPCResponse
// resp initialization
switch innerRPC.Op() {
case TrackerLocalStatusRPC:
case TrackerStatusRPC, LocalSyncRPC:
resp = RPCResponse{
Data: []PinInfo{},
Error: nil,
Data: []PinInfo{},
}
case TrackerLocalStatusCidRPC:
case TrackerStatusCidRPC, LocalSyncCidRPC:
resp = RPCResponse{
Data: PinInfo{},
Error: nil,
Data: PinInfo{},
}
default:
resp = RPCResponse{}
@ -395,14 +451,14 @@ func (c *Cluster) handleWrappedRPC(wrpc *WrappedRPC) {
if err != nil {
resp = RPCResponse{
Data: nil,
Error: err,
Error: CastRPCError(err),
}
}
err = c.remote.MakeRemoteRPC(innerRPC, leader, &resp)
if err != nil {
resp = RPCResponse{
Data: nil,
Error: fmt.Errorf("request to %s failed with: %s", err),
Error: CastRPCError(fmt.Errorf("request to %s failed with: %s", err)),
}
}
case BroadcastRPC:
@ -417,7 +473,7 @@ func (c *Cluster) handleWrappedRPC(wrpc *WrappedRPC) {
if err != nil {
logger.Error("Error making remote RPC: ", err)
rch <- RPCResponse{
Error: err,
Error: CastRPCError(err),
}
} else {
rch <- r
@ -441,7 +497,7 @@ func (c *Cluster) handleWrappedRPC(wrpc *WrappedRPC) {
default:
resp = RPCResponse{
Data: nil,
Error: errors.New("unknown WrappedRPC type"),
Error: NewRPCError("unknown WrappedRPC type"),
}
}
@ -529,3 +585,104 @@ func makeHost(ctx context.Context, cfg *Config) (host.Host, error) {
bhost := basichost.New(network)
return bhost, nil
}
func (c *Cluster) globalPinInfoCid(op RPCOp, h *cid.Cid) (GlobalPinInfo, error) {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
pin := GlobalPinInfo{
Cid: h,
Status: make(map[peer.ID]PinInfo),
}
rpc := NewRPC(op, h)
wrpc := NewRPC(BroadcastRPC, rpc)
resp := MakeRPC(ctx, c.rpcCh, wrpc, true)
if resp.Error != nil {
return pin, resp.Error
}
responses, ok := resp.Data.([]RPCResponse)
if !ok {
return pin, errors.New("unexpected responses format")
}
var errorMsgs string
for _, r := range responses {
if r.Error != nil {
logger.Error(r.Error)
errorMsgs += r.Error.Error() + "\n"
}
info, ok := r.Data.(PinInfo)
if !ok {
return pin, errors.New("unexpected response format")
}
pin.Status[info.Peer] = info
}
if len(errorMsgs) == 0 {
return pin, nil
} else {
return pin, errors.New(errorMsgs)
}
}
func (c *Cluster) globalPinInfoSlice(op RPCOp) ([]GlobalPinInfo, error) {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
var infos []GlobalPinInfo
fullMap := make(map[string]GlobalPinInfo)
rpc := NewRPC(op, nil)
wrpc := NewRPC(BroadcastRPC, rpc)
resp := MakeRPC(ctx, c.rpcCh, wrpc, true)
if resp.Error != nil {
return nil, resp.Error
}
responses, ok := resp.Data.([]RPCResponse)
if !ok {
return infos, errors.New("unexpected responses format")
}
mergePins := func(pins []PinInfo) {
for _, p := range pins {
item, ok := fullMap[p.CidStr]
c, _ := cid.Decode(p.CidStr)
if !ok {
fullMap[p.CidStr] = GlobalPinInfo{
Cid: c,
Status: map[peer.ID]PinInfo{
p.Peer: p,
},
}
} else {
item.Status[p.Peer] = p
}
}
}
var errorMsgs string
for _, r := range responses {
if r.Error != nil {
logger.Error("error in one of the broadcast responses: ", r.Error)
errorMsgs += r.Error.Error() + "\n"
}
pins, ok := r.Data.([]PinInfo)
if !ok {
return infos, fmt.Errorf("unexpected response format: %+v", r.Data)
}
mergePins(pins)
}
for _, v := range fullMap {
infos = append(infos, v)
}
if len(errorMsgs) == 0 {
return infos, nil
} else {
return infos, errors.New(errorMsgs)
}
}

View File

@ -89,11 +89,11 @@ func testClusterShutdown(t *testing.T) {
}
}
func TestClusterLocalSync(t *testing.T) {
func TestClusterStateSync(t *testing.T) {
cl, _, _, st, _, _ := testingCluster(t)
defer cleanRaft()
defer cl.Shutdown()
_, err := cl.LocalSync()
_, err := cl.StateSync()
if err == nil {
t.Error("expected an error as there is no state to sync")
}
@ -104,7 +104,7 @@ func TestClusterLocalSync(t *testing.T) {
t.Fatal("pin should have worked:", err)
}
_, err = cl.LocalSync()
_, err = cl.StateSync()
if err != nil {
t.Fatal("sync after pinning should have worked:", err)
}
@ -112,7 +112,7 @@ func TestClusterLocalSync(t *testing.T) {
// Modify state on the side so the sync does not
// happen on an empty slide
st.RmPin(c)
_, err = cl.LocalSync()
_, err = cl.StateSync()
if err != nil {
t.Fatal("sync with recover should have worked:", err)
}

View File

@ -178,9 +178,9 @@ func (cc *Consensus) run() {
for !quitLoop {
select {
case <-timer.C: // Make a first sync
MakeRPC(ctx, cc.rpcCh, NewRPC(LocalSyncRPC, nil), false)
MakeRPC(ctx, cc.rpcCh, NewRPC(StateSyncRPC, nil), false)
case <-upToDate:
MakeRPC(ctx, cc.rpcCh, NewRPC(LocalSyncRPC, nil), false)
MakeRPC(ctx, cc.rpcCh, NewRPC(StateSyncRPC, nil), false)
quitLoop = true
}
}

View File

@ -179,7 +179,7 @@ func TestE2EPin(t *testing.T) {
}
delay()
fpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.LocalStatus()
status := c.tracker.Status()
for _, v := range status {
if v.IPFS != Pinned {
t.Errorf("%s should have been pinned but it is %s",
@ -212,7 +212,7 @@ func TestE2EPin(t *testing.T) {
delay()
funpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.LocalStatus()
status := c.tracker.Status()
if l := len(status); l != 0 {
t.Errorf("Nothing should be pinned")
//t.Errorf("%+v", status)
@ -265,3 +265,148 @@ func TestE2EStatus(t *testing.T) {
}
runF(t, clusters, f)
}
func TestE2ELocalSync(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
delay()
h, _ := cid.Decode(errorCid) // This cid always fails
h2, _ := cid.Decode(testCid2)
clusters[0].Pin(h)
clusters[0].Pin(h2)
delay()
f := func(t *testing.T, c *Cluster) {
// Sync bad ID
infos, err := c.LocalSync()
if err != nil {
// LocalSync() is asynchronous and should not show an
// error even if Recover() fails.
t.Error(err)
}
if len(infos) != 1 {
t.Fatal("expected 1 elem slice")
}
// Last-known state may still be pinning
if infos[0].IPFS != PinError && infos[0].IPFS != Pinning {
t.Error("element should be in Pinning or PinError state")
}
}
// Test Local syncs
runF(t, clusters, f)
}
func TestE2ELocalSyncCid(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
delay()
h, _ := cid.Decode(errorCid) // This cid always fails
h2, _ := cid.Decode(testCid2)
clusters[0].Pin(h)
clusters[0].Pin(h2)
delay()
f := func(t *testing.T, c *Cluster) {
info, err := c.LocalSyncCid(h)
if err == nil {
// LocalSyncCid is synchronous
t.Error("expected an error")
}
if info.IPFS != PinError && info.IPFS != Pinning {
t.Errorf("element is %s and not PinError", info.IPFS)
}
// Sync good ID
info, err = c.LocalSyncCid(h2)
if err != nil {
t.Error(err)
}
if info.IPFS != Pinned {
t.Error("element should be in Pinned state")
}
}
// Test Local syncs
runF(t, clusters, f)
}
func TestE2EGlobalSync(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
delay()
h, _ := cid.Decode(errorCid) // This cid always fails
h2, _ := cid.Decode(testCid2)
clusters[0].Pin(h)
clusters[0].Pin(h2)
delay()
j := rand.Intn(nClusters) // choose a random cluster member
ginfos, err := clusters[j].GlobalSync()
if err != nil {
t.Fatal(err)
}
if len(ginfos) != 1 {
t.Fatal("expected globalsync to have 1 elements")
}
if ginfos[0].Cid.String() != errorCid {
t.Error("expected globalsync to have problems with errorCid")
}
for _, c := range clusters {
inf, ok := ginfos[0].Status[c.host.ID()]
if !ok {
t.Fatal("GlobalPinInfo should have this cluster")
}
if inf.IPFS != PinError && inf.IPFS != Pinning {
t.Error("should be PinError in all members")
}
}
}
func TestE2EGlobalSyncCid(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
delay()
h, _ := cid.Decode(errorCid) // This cid always fails
h2, _ := cid.Decode(testCid2)
clusters[0].Pin(h)
clusters[0].Pin(h2)
delay()
j := rand.Intn(nClusters)
ginfo, err := clusters[j].GlobalSyncCid(h)
if err == nil {
t.Error("expected an error")
}
if ginfo.Cid.String() != errorCid {
t.Error("GlobalPinInfo should be for errorCid")
}
for _, c := range clusters {
inf, ok := ginfo.Status[c.host.ID()]
if !ok {
t.Fatal("GlobalPinInfo should not be empty for this host")
}
if inf.IPFS != PinError && inf.IPFS != Pinning {
t.Error("should be PinError or Pinning in all members")
}
}
// Test with a good Cid
j = rand.Intn(nClusters)
ginfo, err = clusters[j].GlobalSyncCid(h2)
if err != nil {
t.Fatal(err)
}
if ginfo.Cid.String() != testCid2 {
t.Error("GlobalPinInfo should be for testrCid2")
}
for _, c := range clusters {
inf, ok := ginfo.Status[c.host.ID()]
if !ok {
t.Fatal("GlobalPinInfo should have this cluster")
}
if inf.IPFS != Pinned {
t.Error("the GlobalPinInfo should show Pinned in all members")
}
}
}

View File

@ -9,7 +9,6 @@ package ipfscluster
import (
"context"
"errors"
"time"
host "github.com/libp2p/go-libp2p-host"
@ -92,6 +91,8 @@ type State interface {
RmPin(*cid.Cid) error
// ListPins lists all the pins in the state
ListPins() []*cid.Cid
// HasPin returns true if the state is holding a Cid
HasPin(*cid.Cid) bool
}
// PinTracker represents a component which tracks the status of
@ -105,26 +106,16 @@ type PinTracker interface {
// Untrack tells the tracker that a Cid is to be forgotten. The tracker
// may perform an IPFS unpin operation.
Untrack(*cid.Cid) error
// LocalStatus returns the list of pins with their local status.
LocalStatus() []PinInfo
// GlobalStatus returns the list of pins with their local and remote
// status, which has been fetched.
LocalStatusCid(*cid.Cid) PinInfo
// GlobalStatusCid returns the global status of a given Cid.
GlobalStatus() ([]GlobalPinInfo, error)
// LocalStatusCid returns the local status of a given Cid.
GlobalStatusCid(*cid.Cid) (GlobalPinInfo, error)
// Sync makes sure that the Cid status reflect the real IPFS status. If not,
// the status is marked as error. The return value indicates if the
// Pin status was updated.
// Status returns the list of pins with their local status.
Status() []PinInfo
// StatusCid returns the local status of a given Cid.
StatusCid(*cid.Cid) PinInfo
// Sync makes sure that the Cid status reflect the real IPFS status.
// The return value indicates if the Cid status deserved attention,
// either because its state was updated or because it is in error state.
Sync(*cid.Cid) bool
// Recover attempts to recover an error by re-[un]pinning the item if needed.
// Recover retriggers a Pin/Unpin operation in Cids with error status.
Recover(*cid.Cid) error
// SyncAll runs Sync() on every known Pin. It returns a list of changed Pins
SyncAll() []PinInfo
// SyncState makes sure that the tracked Pins matches those in the
// cluster state and runs SyncAll(). It returns a list of changed Pins.
SyncState(State) []*cid.Cid
}
// Remote represents a component which takes care of
@ -157,7 +148,7 @@ func MakeRPC(ctx context.Context, rpcCh chan RPC, r RPC, waitForResponse bool) R
logger.Debug("cancelling sending RPC")
return RPCResponse{
Data: nil,
Error: errors.New("operation timed out while sending RPC"),
Error: NewRPCError("operation timed out while sending RPC"),
}
default:
logger.Errorf("RPC channel is full. Will retry request %d", r.Op())
@ -185,7 +176,7 @@ func MakeRPC(ctx context.Context, rpcCh chan RPC, r RPC, waitForResponse bool) R
logger.Debug("cancelling waiting for RPC Response")
return RPCResponse{
Data: nil,
Error: errors.New("operation timed out while waiting for response"),
Error: NewRPCError("operation timed out while waiting for response"),
}
}
}

View File

@ -78,7 +78,7 @@ func simulateAnswer(ch <-chan RPC, answer interface{}, err error) {
req := <-ch
req.ResponseCh() <- RPCResponse{
Data: answer,
Error: err,
Error: CastRPCError(err),
}
}()
}

View File

@ -116,7 +116,7 @@ DECODE_ERROR:
logger.Error("error decoding RPC request from Stream:", err)
errResp := RPCResponse{
Data: nil,
Error: errors.New("error decoding request"),
Error: NewRPCError("error decoding request"),
}
r.sendStreamResponse(s, errResp)
return nil, err

View File

@ -2,8 +2,6 @@ package ipfscluster
import (
"context"
"errors"
"fmt"
"sync"
"time"
@ -66,7 +64,7 @@ func (st IPFSStatus) String() string {
}
type MapPinTracker struct {
mux sync.Mutex
mux sync.RWMutex
status map[string]PinInfo
ctx context.Context
@ -147,13 +145,10 @@ func (mpt *MapPinTracker) set(c *cid.Cid, s IPFSStatus) {
mpt.unsafeSet(c, s)
}
func (mpt *MapPinTracker) get(c *cid.Cid) PinInfo {
mpt.mux.Lock()
defer mpt.mux.Unlock()
func (mpt *MapPinTracker) unsafeGet(c *cid.Cid) PinInfo {
p, ok := mpt.status[c.String()]
if !ok {
return PinInfo{
// cid: c,
CidStr: c.String(),
Peer: mpt.peerID,
IPFS: Unpinned,
@ -163,6 +158,12 @@ func (mpt *MapPinTracker) get(c *cid.Cid) PinInfo {
return p
}
func (mpt *MapPinTracker) get(c *cid.Cid) PinInfo {
mpt.mux.RLock()
defer mpt.mux.RUnlock()
return mpt.unsafeGet(c)
}
func (mpt *MapPinTracker) pin(c *cid.Cid) error {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
@ -184,7 +185,7 @@ func (mpt *MapPinTracker) unpin(c *cid.Cid) error {
mpt.set(c, Unpinning)
resp := MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSUnpinRPC, c), true)
if resp.Error != nil {
mpt.set(c, PinError)
mpt.set(c, UnpinError)
return resp.Error
}
mpt.set(c, Unpinned)
@ -199,11 +200,11 @@ func (mpt *MapPinTracker) Untrack(c *cid.Cid) error {
return mpt.unpin(c)
}
func (mpt *MapPinTracker) LocalStatusCid(c *cid.Cid) PinInfo {
func (mpt *MapPinTracker) StatusCid(c *cid.Cid) PinInfo {
return mpt.get(c)
}
func (mpt *MapPinTracker) LocalStatus() []PinInfo {
func (mpt *MapPinTracker) Status() []PinInfo {
mpt.mux.Lock()
defer mpt.mux.Unlock()
pins := make([]PinInfo, 0, len(mpt.status))
@ -213,116 +214,27 @@ func (mpt *MapPinTracker) LocalStatus() []PinInfo {
return pins
}
func (mpt *MapPinTracker) GlobalStatus() ([]GlobalPinInfo, error) {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
rpc := NewRPC(TrackerLocalStatusRPC, nil)
wrpc := NewRPC(BroadcastRPC, rpc)
resp := MakeRPC(ctx, mpt.rpcCh, wrpc, true)
if resp.Error != nil {
return nil, resp.Error
}
responses, ok := resp.Data.([]RPCResponse)
if !ok {
return nil, errors.New("unexpected responses format")
}
fullMap := make(map[string]GlobalPinInfo)
mergePins := func(pins []PinInfo) {
for _, p := range pins {
item, ok := fullMap[p.CidStr]
c, _ := cid.Decode(p.CidStr)
if !ok {
fullMap[p.CidStr] = GlobalPinInfo{
Cid: c,
Status: map[peer.ID]PinInfo{
p.Peer: p,
},
}
} else {
item.Status[p.Peer] = p
}
}
}
for _, r := range responses {
if r.Error != nil {
logger.Error("error in one of the broadcast responses: ", r.Error)
continue
}
pins, ok := r.Data.([]PinInfo)
if !ok {
return nil, fmt.Errorf("unexpected response format: %+v", r.Data)
}
mergePins(pins)
}
status := make([]GlobalPinInfo, 0, len(fullMap))
for _, v := range fullMap {
status = append(status, v)
}
return status, nil
}
func (mpt *MapPinTracker) GlobalStatusCid(c *cid.Cid) (GlobalPinInfo, error) {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
pin := GlobalPinInfo{
Cid: c,
Status: make(map[peer.ID]PinInfo),
}
rpc := NewRPC(TrackerLocalStatusCidRPC, c)
wrpc := NewRPC(BroadcastRPC, rpc)
resp := MakeRPC(ctx, mpt.rpcCh, wrpc, true)
if resp.Error != nil {
return pin, resp.Error
}
responses, ok := resp.Data.([]RPCResponse)
if !ok {
return pin, errors.New("unexpected responses format")
}
for _, r := range responses {
if r.Error != nil {
return pin, r.Error
}
info, ok := r.Data.(PinInfo)
if !ok {
return pin, errors.New("unexpected response format")
}
pin.Status[info.Peer] = info
}
return pin, nil
}
func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
p := mpt.get(c)
// We assume errors will need a Recover() so we return true
if p.IPFS == PinError || p.IPFS == UnpinError {
return true
}
resp := MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSIsPinnedRPC, c), true)
if resp.Error != nil {
if p.IPFS == Pinned || p.IPFS == Pinning {
switch p.IPFS {
case Pinned, Pinning:
mpt.set(c, PinError)
return true
}
if p.IPFS == Unpinned || p.IPFS == Unpinning {
case Unpinned, Unpinning:
mpt.set(c, UnpinError)
return true
case PinError, UnpinError:
return true
default:
return false
}
return false
}
ipfsPinned, ok := resp.Data.(bool)
if !ok {
logger.Error("wrong type of IPFSIsPinnedRPC response")
@ -333,7 +245,7 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
switch p.IPFS {
case Pinned:
return false
case Pinning:
case Pinning, PinError:
mpt.set(c, Pinned)
return true
case Unpinning:
@ -342,13 +254,15 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
return true
}
return false
case Unpinned:
case Unpinned, UnpinError:
mpt.set(c, UnpinError)
return true
default:
return false
}
} else {
switch p.IPFS {
case Pinned:
case Pinned, PinError:
mpt.set(c, PinError)
return true
case Pinning:
@ -357,89 +271,41 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
return true
}
return false
case Unpinning:
case Unpinning, UnpinError:
mpt.set(c, Unpinned)
return true
case Unpinned:
return false
default:
return false
}
}
return false
}
// Recover will re-track or re-untrack a Cid in error state,
// possibly retriggering an IPFS pinning operation and returning
// only when it is done.
func (mpt *MapPinTracker) Recover(c *cid.Cid) error {
p := mpt.get(c)
if p.IPFS != PinError && p.IPFS != UnpinError {
return nil
}
logger.Infof("Recovering %s", c)
var err error
if p.IPFS == PinError {
mpt.pin(c)
err = mpt.Track(c)
}
if p.IPFS == UnpinError {
mpt.unpin(c)
err = mpt.Untrack(c)
}
if err != nil {
logger.Errorf("error recovering %s: %s", c, err)
return err
}
return nil
}
func (mpt *MapPinTracker) SyncAll() []PinInfo {
var changedPins []PinInfo
pins := mpt.LocalStatus()
for _, p := range pins {
c, _ := cid.Decode(p.CidStr)
changed := mpt.Sync(c)
if changed {
changedPins = append(changedPins, p)
}
}
return changedPins
}
func (mpt *MapPinTracker) SyncState(cState State) []*cid.Cid {
clusterPins := cState.ListPins()
clusterMap := make(map[string]struct{})
// Make a map for faster lookup
for _, c := range clusterPins {
var a struct{}
clusterMap[c.String()] = a
}
var toRemove []*cid.Cid
var toAdd []*cid.Cid
var changed []*cid.Cid
mpt.mux.Lock()
// Collect items in the State not in the tracker
for _, c := range clusterPins {
_, ok := mpt.status[c.String()]
if !ok {
toAdd = append(toAdd, c)
}
}
// Collect items in the tracker not in the State
for _, p := range mpt.status {
c, _ := cid.Decode(p.CidStr)
_, ok := clusterMap[p.CidStr]
if !ok {
toRemove = append(toRemove, c)
}
}
// Update new items and mark them as pinning error
for _, c := range toAdd {
mpt.unsafeSet(c, PinError)
changed = append(changed, c)
}
// Mark items that need to be removed as unpin error
for _, c := range toRemove {
mpt.unsafeSet(c, UnpinError)
changed = append(changed, c)
}
mpt.mux.Unlock()
return changed
}
func (mpt *MapPinTracker) RpcChan() <-chan RPC {
return mpt.rpcCh
}

View File

@ -62,7 +62,7 @@ type unpinResp struct {
}
type statusInfo struct {
Status string
IPFS string `json:"ipfs"`
}
type statusCidResp struct {
@ -280,7 +280,7 @@ func (api *RESTAPI) statusHandler(w http.ResponseWriter, r *http.Request) {
rRpc := NewRPC(StatusRPC, nil)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Op(), resp) {
sendStatusResponse(w, resp)
sendStatusResponse(w, http.StatusOK, resp)
}
}
@ -292,7 +292,7 @@ func (api *RESTAPI) statusCidHandler(w http.ResponseWriter, r *http.Request) {
op := NewRPC(StatusCidRPC, c)
resp := MakeRPC(ctx, api.rpcCh, op, true)
if checkResponse(w, op.Op(), resp) {
sendStatusCidResponse(w, resp)
sendStatusCidResponse(w, http.StatusOK, resp)
}
}
}
@ -303,7 +303,7 @@ func (api *RESTAPI) syncHandler(w http.ResponseWriter, r *http.Request) {
rRpc := NewRPC(GlobalSyncRPC, nil)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Op(), resp) {
sendStatusResponse(w, resp)
sendStatusResponse(w, http.StatusAccepted, resp)
}
}
@ -315,7 +315,7 @@ func (api *RESTAPI) syncCidHandler(w http.ResponseWriter, r *http.Request) {
op := NewRPC(GlobalSyncCidRPC, c)
resp := MakeRPC(ctx, api.rpcCh, op, true)
if checkResponse(w, op.Op(), resp) {
sendStatusCidResponse(w, resp)
sendStatusCidResponse(w, http.StatusOK, resp)
}
}
}
@ -347,14 +347,12 @@ func checkResponse(w http.ResponseWriter, op RPCOp, resp RPCResponse) bool {
switch op {
case PinRPC: // Pin/Unpin only return errors
case UnpinRPC:
case StatusRPC, LocalSyncRPC, GlobalSyncRPC:
case StatusRPC, GlobalSyncRPC:
_, ok = resp.Data.([]GlobalPinInfo)
case StatusCidRPC, LocalSyncCidRPC, GlobalSyncCidRPC:
case StatusCidRPC, GlobalSyncCidRPC:
_, ok = resp.Data.(GlobalPinInfo)
case PinListRPC:
_, ok = resp.Data.([]*cid.Cid)
case IPFSPinRPC:
case IPFSUnpinRPC:
case VersionRPC:
_, ok = resp.Data.(string)
case MemberListRPC:
@ -399,24 +397,24 @@ func transformPinToStatusCid(p GlobalPinInfo) statusCidResp {
s.Status = make(map[string]statusInfo)
for k, v := range p.Status {
s.Status[k.Pretty()] = statusInfo{
Status: v.IPFS.String(),
IPFS: v.IPFS.String(),
}
}
return s
}
func sendStatusResponse(w http.ResponseWriter, resp RPCResponse) {
func sendStatusResponse(w http.ResponseWriter, code int, resp RPCResponse) {
data := resp.Data.([]GlobalPinInfo)
pins := make(statusResp, 0, len(data))
for _, d := range data {
pins = append(pins, transformPinToStatusCid(d))
}
sendJSONResponse(w, 200, pins)
sendJSONResponse(w, code, pins)
}
func sendStatusCidResponse(w http.ResponseWriter, resp RPCResponse) {
func sendStatusCidResponse(w http.ResponseWriter, code int, resp RPCResponse) {
data := resp.Data.(GlobalPinInfo)
st := transformPinToStatusCid(data)
sendJSONResponse(w, 200, st)
sendJSONResponse(w, code, st)
}

View File

@ -203,7 +203,7 @@ func TestStatusEndpoint(t *testing.T) {
GlobalPinInfo{
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid,
CidStr: testCid2,
Peer: testPeerID,
IPFS: Unpinned,
},
@ -213,7 +213,7 @@ func TestStatusEndpoint(t *testing.T) {
GlobalPinInfo{
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid,
CidStr: testCid3,
Peer: testPeerID,
IPFS: PinError,
},
@ -234,7 +234,7 @@ func TestStatusEndpoint(t *testing.T) {
GlobalPinInfo{
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid,
CidStr: testCid2,
Peer: testPeerID,
IPFS: Unpinning,
},
@ -244,7 +244,7 @@ func TestStatusEndpoint(t *testing.T) {
GlobalPinInfo{
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid,
CidStr: testCid3,
Peer: testPeerID,
IPFS: Pinning,
},
@ -260,3 +260,111 @@ func TestStatusEndpoint(t *testing.T) {
t.Errorf("unexpected statusResp:\n %+v", resp)
}
}
func TestStatusCidEndpoint(t *testing.T) {
c, _ := cid.Decode(testCid)
api := testClusterApi(t)
defer api.Shutdown()
pin := GlobalPinInfo{
Cid: c,
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid,
Peer: testPeerID,
IPFS: Unpinned,
},
},
}
simulateAnswer(api.RpcChan(), pin, nil)
var resp statusCidResp
makeGet(t, "/status/"+testCid, &resp)
if resp.Cid != testCid {
t.Error("expected the same cid")
}
info, ok := resp.Status[testPeerID.Pretty()]
if !ok {
t.Fatal("expected info for testPeerID")
}
if info.IPFS != "unpinned" {
t.Error("expected different status")
}
}
func TestStatusSyncEndpoint(t *testing.T) {
c, _ := cid.Decode(testCid)
c2, _ := cid.Decode(testCid2)
c3, _ := cid.Decode(testCid3)
api := testClusterApi(t)
defer api.Shutdown()
pList := []GlobalPinInfo{
GlobalPinInfo{
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid,
Peer: testPeerID,
IPFS: PinError,
},
},
Cid: c,
},
GlobalPinInfo{
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid2,
Peer: testPeerID,
IPFS: UnpinError,
},
},
Cid: c2,
},
GlobalPinInfo{
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid3,
Peer: testPeerID,
IPFS: PinError,
},
},
Cid: c3,
},
}
simulateAnswer(api.RpcChan(), pList, nil)
var resp statusResp
makeGet(t, "/status", &resp)
if len(resp) != 3 {
t.Errorf("unexpected statusResp:\n %+v", resp)
}
if resp[0].Cid != testCid {
t.Error("unexpected response info")
}
}
func TestStatusSyncCidEndpoint(t *testing.T) {
c, _ := cid.Decode(errorCid)
api := testClusterApi(t)
defer api.Shutdown()
pin := GlobalPinInfo{
Cid: c,
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: errorCid,
Peer: testPeerID,
IPFS: PinError,
},
},
}
simulateAnswer(api.RpcChan(), pin, nil)
var resp statusCidResp
makePost(t, "/status/"+testCid, &resp)
if resp.Cid != errorCid {
t.Error("expected the same cid")
}
info, ok := resp.Status[testPeerID.Pretty()]
if !ok {
t.Fatal("expected info for testPeerID")
}
if info.IPFS != "pin_error" {
t.Error("expected different status")
}
}

58
rpc.go
View File

@ -4,29 +4,40 @@ import cid "github.com/ipfs/go-cid"
// RPC supported operations.
const (
// Cluster API
PinRPC = iota
UnpinRPC
PinListRPC
IPFSPinRPC
IPFSUnpinRPC
IPFSIsPinnedRPC
ConsensusLogPinRPC
ConsensusLogUnpinRPC
VersionRPC
MemberListRPC
RollbackRPC
LeaderRPC
BroadcastRPC
StatusRPC
StatusCidRPC
LocalSyncRPC
LocalSyncCidRPC
GlobalSyncRPC
GlobalSyncCidRPC
StateSyncRPC
// Tracker
TrackRPC
UntrackRPC
TrackerLocalStatusRPC
TrackerLocalStatusCidRPC
StatusRPC
StatusCidRPC
TrackerStatusRPC
TrackerStatusCidRPC
TrackerRecoverRPC
// IPFS Connector
IPFSPinRPC
IPFSUnpinRPC
IPFSIsPinnedRPC
// Consensus
ConsensusLogPinRPC
ConsensusLogUnpinRPC
// Special
LeaderRPC
BroadcastRPC
RollbackRPC
NoopRPC
)
@ -141,5 +152,26 @@ func NewRPC(m RPCOp, arg interface{}) RPC {
// an error to indicate if the operation was successful.
type RPCResponse struct {
Data interface{}
Error error
Error *RPCError
}
// RPCError is a serializable implementation of error.
type RPCError struct {
Msg string
}
func NewRPCError(m string) *RPCError {
return &RPCError{m}
}
func CastRPCError(err error) *RPCError {
if err != nil {
return NewRPCError(err.Error())
} else {
return nil
}
}
func (r *RPCError) Error() string {
return r.Msg
}

7
silent.go Normal file
View File

@ -0,0 +1,7 @@
// +build !debug,silent
package ipfscluster
func init() {
SetLogLevel("CRITICAL")
}