Global pin status: /status and /status/cid now report pin tracker state
from all cluster members.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2016-12-19 18:35:24 +01:00
parent f0c5350743
commit 8172b0ca61
15 changed files with 546 additions and 262 deletions
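In outline: a /status request now triggers a BroadcastRPC which asks every cluster member for its local PinInfo list, and the responses are merged into GlobalPinInfo entries keyed by CID (the real merge lives in MapPinTracker.GlobalStatus below). A simplified, self-contained sketch of that merge step, using stand-in types:

package main

import "fmt"

// Stand-ins for the PinInfo/GlobalPinInfo types introduced by this commit.
type pinInfo struct {
    CidStr string
    Peer   string
    IPFS   string
}

type globalPinInfo struct {
    Cid    string
    Status map[string]pinInfo
}

// merge folds per-peer status lists into one map keyed by CID,
// mirroring mergePins in MapPinTracker.GlobalStatus.
func merge(responses [][]pinInfo) map[string]globalPinInfo {
    full := make(map[string]globalPinInfo)
    for _, pins := range responses {
        for _, p := range pins {
            item, ok := full[p.CidStr]
            if !ok {
                full[p.CidStr] = globalPinInfo{
                    Cid:    p.CidStr,
                    Status: map[string]pinInfo{p.Peer: p},
                }
            } else {
                item.Status[p.Peer] = p
            }
        }
    }
    return full
}

func main() {
    out := merge([][]pinInfo{
        {{CidStr: "QmA", Peer: "peer1", IPFS: "pinned"}},
        {{CidStr: "QmA", Peer: "peer2", IPFS: "pinning"}},
    })
    fmt.Printf("%+v\n", out) // QmA ends up with one entry per peer
}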

View File

@ -5,7 +5,9 @@ Things that need to be done:
* ~~Start up sync fix~~
* ~~Allow to shutdown multiple times (+ test)~~
* Efficient SyncAll (use single ipfs pin ls call; see the sketch after this list)
* LeaderRPC implementation
* ~~GlobalStatus~~
* GlobalSync
* ~~LeaderRPC implementation~~
* /pin/add /pin/rm hijack
* End-to-end tests
* ipfscluster-server tool
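A plausible shape for the "Efficient SyncAll" item above, as an illustrative sketch only (not something this commit implements): fetch the whole pin set with a single call to the public IPFS HTTP API pin/ls endpoint and check tracker entries against it in memory, instead of issuing one IsPinned request per CID. Endpoint and field names below come from that public API, not from this codebase.

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
)

// pinLsResp matches the response shape of /api/v0/pin/ls.
type pinLsResp struct {
    Keys map[string]struct {
        Type string
    }
}

// pinnedSet fetches every recursively pinned CID in one request.
func pinnedSet(apiURL string) (map[string]struct{}, error) {
    resp, err := http.Post(apiURL+"/api/v0/pin/ls?type=recursive", "", nil)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    var r pinLsResp
    if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
        return nil, err
    }
    set := make(map[string]struct{}, len(r.Keys))
    for c := range r.Keys {
        set[c] = struct{}{}
    }
    return set, nil
}

func main() {
    set, err := pinnedSet("http://127.0.0.1:5001")
    fmt.Println(len(set), err)
}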

View File

@ -116,21 +116,21 @@ func (c *Cluster) Shutdown() error {
// IPFS is pinning content that should be pinned locally, and not pinning
// other content. It will also try to recover any failed pin or unpin
// operations by retriggering them.
func (c *Cluster) LocalSync() ([]Pin, error) {
func (c *Cluster) LocalSync() ([]PinInfo, error) {
cState, err := c.consensus.State()
if err != nil {
return nil, err
}
changed := c.tracker.SyncState(cState)
for _, p := range changed {
logger.Debugf("recovering %s", p.Cid)
err = c.tracker.Recover(p.Cid)
for _, h := range changed {
logger.Debugf("recovering %s", h)
err = c.tracker.Recover(h)
if err != nil {
logger.Errorf("Error recovering %s: %s", p.Cid, err)
logger.Errorf("Error recovering %s: %s", h, err)
return nil, err
}
}
return c.tracker.ListPins(), nil
return c.tracker.LocalStatus(), nil
}
// LocalSyncCid makes sure that the current state of the cluster
@ -138,39 +138,37 @@ func (c *Cluster) LocalSync() ([]Pin, error) {
// makes sure that IPFS is pinning content that should be pinned locally,
// and not pinning other content. It will also try to recover any failed
// pin or unpin operations by retriggering them.
func (c *Cluster) LocalSyncCid(h *cid.Cid) (Pin, error) {
func (c *Cluster) LocalSyncCid(h *cid.Cid) (PinInfo, error) {
changed := c.tracker.Sync(h)
if changed {
err := c.tracker.Recover(h)
if err != nil {
logger.Errorf("Error recovering %s: %s", h, err)
return Pin{}, err
return PinInfo{}, err
}
}
return c.tracker.GetPin(h), nil
return c.tracker.LocalStatusCid(h), nil
}
// GlobalSync triggers Sync() operations in all members of the Cluster.
func (c *Cluster) GlobalSync() ([]Pin, error) {
return c.Status(), nil
func (c *Cluster) GlobalSync() ([]GlobalPinInfo, error) {
return c.Status()
}
// GlobalSync triggers a Sync() operation for a given Cid in all members
// of the Cluster.
func (c *Cluster) GlobalSyncCid(h *cid.Cid) (Pin, error) {
return c.StatusCid(h), nil
func (c *Cluster) GlobalSyncCid(h *cid.Cid) (GlobalPinInfo, error) {
return c.StatusCid(h)
}
// Status returns the last known status for all Pins tracked by Cluster.
func (c *Cluster) Status() []Pin {
// TODO: Global
return c.tracker.ListPins()
func (c *Cluster) Status() ([]GlobalPinInfo, error) {
return c.tracker.GlobalStatus()
}
// StatusCid returns the last known status for a given Cid
func (c *Cluster) StatusCid(h *cid.Cid) Pin {
// TODO: Global
return c.tracker.GetPin(h)
func (c *Cluster) StatusCid(h *cid.Cid) (GlobalPinInfo, error) {
return c.tracker.GlobalStatusCid(h)
}
// Pins returns the list of Cids managed by Cluster and which are part
@ -198,7 +196,7 @@ func (c *Cluster) Pin(h *cid.Cid) error {
defer cancel()
logger.Info("pinning:", h)
rpc := NewRPC(ConsensusAddPinRPC, h)
rpc := NewRPC(ConsensusLogPinRPC, h)
wrpc := NewRPC(LeaderRPC, rpc)
resp := MakeRPC(ctx, c.rpcCh, wrpc, true)
if resp.Error != nil {
@ -220,7 +218,7 @@ func (c *Cluster) Unpin(h *cid.Cid) error {
defer cancel()
logger.Info("unpinning:", h)
rpc := NewRPC(ConsensusRmPinRPC, h)
rpc := NewRPC(ConsensusLogUnpinRPC, h)
wrpc := NewRPC(LeaderRPC, rpc)
resp := MakeRPC(ctx, c.rpcCh, wrpc, true)
if resp.Error != nil {
@ -305,14 +303,16 @@ func (c *Cluster) handleGenericRPC(grpc *GenericRPC) {
case GlobalSyncRPC:
data, err = c.GlobalSync()
case StatusRPC:
data = c.Status()
data, err = c.Status()
case TrackerLocalStatusRPC:
data = c.tracker.LocalStatus()
case RollbackRPC:
state, ok := grpc.Argument.(State)
if !ok {
err = errors.New("bad RollbackRPC type")
break
}
err = c.consensus.Rollback(state)
// State, ok := grpc.Argument.(State)
// if !ok {
// err = errors.New("bad RollbackRPC type")
// break
// }
// err = c.consensus.Rollback(state)
default:
logger.Error("unknown operation for GenericRPC. Ignoring.")
}
@ -336,30 +336,24 @@ func (c *Cluster) handleCidRPC(crpc *CidRPC) {
err = c.Pin(h)
case UnpinRPC:
err = c.Unpin(h)
case ConsensusAddPinRPC:
err = c.consensus.AddPin(h)
case ConsensusRmPinRPC:
err = c.consensus.RmPin(h)
case ConsensusLogPinRPC:
err = c.consensus.LogPin(h)
case ConsensusLogUnpinRPC:
err = c.consensus.LogUnpin(h)
case TrackRPC:
err = c.tracker.Track(h)
case UntrackRPC:
err = c.tracker.Untrack(h)
case TrackerLocalStatusCidRPC:
data = c.tracker.LocalStatusCid(h)
case IPFSPinRPC:
c.tracker.Pinning(h)
err = c.ipfs.Pin(h)
if err != nil {
c.tracker.PinError(h)
} else {
c.tracker.Pinned(h)
}
case IPFSUnpinRPC:
c.tracker.Unpinning(h)
err = c.ipfs.Unpin(h)
if err != nil {
c.tracker.UnpinError(h)
} else {
c.tracker.Unpinned(h)
}
case IPFSIsPinnedRPC:
data, err = c.ipfs.IsPinned(h)
case StatusCidRPC:
data = c.StatusCid(h)
data, err = c.StatusCid(h)
case LocalSyncCidRPC:
data, err = c.LocalSyncCid(h)
case GlobalSyncCidRPC:
@ -379,8 +373,26 @@ func (c *Cluster) handleCidRPC(crpc *CidRPC) {
func (c *Cluster) handleWrappedRPC(wrpc *WrappedRPC) {
innerRPC := wrpc.WRPC
var resp RPCResponse
// resp initialization
switch innerRPC.Op() {
case TrackerLocalStatusRPC:
resp = RPCResponse{
Data: []PinInfo{},
Error: nil,
}
case TrackerLocalStatusCidRPC:
resp = RPCResponse{
Data: PinInfo{},
Error: nil,
}
default:
resp = RPCResponse{}
}
switch wrpc.Op() {
case LeaderRPC:
// This is very generic for the moment. Only used for consensus
// LogPin/LogUnpin.
leader, err := c.consensus.Leader()
if err != nil {
resp = RPCResponse{
@ -388,7 +400,7 @@ func (c *Cluster) handleWrappedRPC(wrpc *WrappedRPC) {
Error: err,
}
}
resp, err = c.remote.MakeRemoteRPC(innerRPC, leader)
err = c.remote.MakeRemoteRPC(innerRPC, leader, &resp)
if err != nil {
resp = RPCResponse{
Data: nil,
@ -396,9 +408,37 @@ func (c *Cluster) handleWrappedRPC(wrpc *WrappedRPC) {
}
}
case BroadcastRPC:
var wg sync.WaitGroup
var responses []RPCResponse
members := c.Members()
rch := make(chan RPCResponse, len(members))
makeRemote := func(p peer.ID, r RPCResponse) {
defer wg.Done()
err := c.remote.MakeRemoteRPC(innerRPC, p, &r)
if err != nil {
logger.Error("Error making remote RPC: ", err)
rch <- RPCResponse{
Error: err,
}
} else {
rch <- r
}
}
wg.Add(len(members))
for _, m := range members {
go makeRemote(m, resp)
}
wg.Wait()
close(rch)
for r := range rch {
responses = append(responses, r)
}
resp = RPCResponse{
Data: nil,
Error: errors.New("not implemented"),
Data: responses,
Error: nil,
}
default:
resp = RPCResponse{

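The BroadcastRPC branch above is a textbook Go fan-out: one goroutine per member, a sync.WaitGroup to join them, and a result channel buffered to len(members) so no goroutine ever blocks on send. The same pattern in a self-contained sketch:

package main

import (
    "fmt"
    "sync"
)

// fanOut calls fn once per member concurrently and collects every
// result, mirroring the BroadcastRPC handling in handleWrappedRPC.
func fanOut(members []string, fn func(string) string) []string {
    var wg sync.WaitGroup
    ch := make(chan string, len(members)) // buffered: senders never block
    wg.Add(len(members))
    for _, m := range members {
        go func(m string) {
            defer wg.Done()
            ch <- fn(m)
        }(m)
    }
    wg.Wait()
    close(ch)
    var out []string
    for r := range ch {
        out = append(out, r)
    }
    return out
}

func main() {
    fmt.Println(fanOut([]string{"peer1", "peer2", "peer3"}, func(m string) string {
        return "ok from " + m
    }))
}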
View File

@ -57,7 +57,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockApi, *mockConnector, *MapState
ipfs.rpcCh = make(chan RPC, 2)
cfg := testingConfig()
st := NewMapState()
tracker := NewMapPinTracker()
tracker := NewMapPinTracker(cfg)
remote := NewLibp2pRemote()
cl, err := NewCluster(

View File

@ -2,6 +2,7 @@ package ipfscluster
import (
"encoding/json"
"fmt"
"io/ioutil"
)
@ -38,11 +39,13 @@ type Config struct {
}
func LoadConfig(path string) (*Config, error) {
fmt.Println(path)
config := &Config{}
file, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
json.Unmarshal(file, config)
fmt.Printf("%+v", config)
return config, nil
}
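Note that LoadConfig above discards the json.Unmarshal error, and the two fmt prints look like leftover debugging. A stricter variant, as a sketch (not part of this commit):

func LoadConfig(path string) (*Config, error) {
    config := &Config{}
    file, err := ioutil.ReadFile(path)
    if err != nil {
        return nil, err
    }
    // Surface malformed JSON instead of silently returning a zero-value Config.
    if err := json.Unmarshal(file, config); err != nil {
        return nil, err
    }
    return config, nil
}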

View File

@ -69,14 +69,14 @@ func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error)
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
MakeRPC(ctx, op.rpcCh, NewRPC(IPFSPinRPC, c), false)
MakeRPC(ctx, op.rpcCh, NewRPC(TrackRPC, c), false)
case LogOpUnpin:
err := state.RmPin(c)
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
MakeRPC(ctx, op.rpcCh, NewRPC(IPFSUnpinRPC, c), false)
MakeRPC(ctx, op.rpcCh, NewRPC(UntrackRPC, c), false)
default:
logger.Error("unknown clusterLogOp type. Ignoring")
}
@ -251,8 +251,8 @@ func (cc *Consensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp {
}
}
// AddPin submits a Cid to the shared state of the cluster.
func (cc *Consensus) AddPin(c *cid.Cid) error {
// LogPin submits a Cid to the shared state of the cluster.
func (cc *Consensus) LogPin(c *cid.Cid) error {
// Create pin operation for the log
op := cc.op(c, LogOpPin)
_, err := cc.consensus.CommitOp(op)
@ -264,8 +264,8 @@ func (cc *Consensus) AddPin(c *cid.Cid) error {
return nil
}
// RmPin removes a Cid from the shared state of the cluster.
func (cc *Consensus) RmPin(c *cid.Cid) error {
// LogUnpin removes a Cid from the shared state of the cluster.
func (cc *Consensus) LogUnpin(c *cid.Cid) error {
// Create unpin operation for the log
op := cc.op(c, LogOpUnpin)
_, err := cc.consensus.CommitOp(op)

View File

@ -124,7 +124,7 @@ func TestConsensusPin(t *testing.T) {
defer cc.Shutdown()
c, _ := cid.Decode(testCid)
err := cc.AddPin(c)
err := cc.LogPin(c)
if err != nil {
t.Error("the operation did not make it to the log:", err)
}
@ -147,7 +147,7 @@ func TestConsensusUnpin(t *testing.T) {
defer cc.Shutdown()
c, _ := cid.Decode(testCid2)
err := cc.RmPin(c)
err := cc.LogUnpin(c)
if err != nil {
t.Error("the operation did not make it to the log:", err)
}

View File

@ -15,7 +15,8 @@ import (
peer "github.com/libp2p/go-libp2p-peer"
)
// This runs tests using the standard components and the IPFS mock daemon.
// This runs end-to-end tests using the standard components and the IPFS mock
// daemon.
// End-to-end means that all default implementations of components are tested
// together. It is not fully end-to-end because the ipfs daemon is a mock which
// never hangs.
@ -24,7 +25,7 @@ import (
var nClusters = 3
// number of pins to pin/unpin/check
var nPins = 1000
var nPins = 500
// ports
var clusterPort = 20000
@ -50,9 +51,9 @@ func randomBytes() []byte {
return bs
}
func createClusters(t *testing.T) ([]*Cluster, *ipfsMock) {
func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) {
os.RemoveAll("./e2eTestRaft")
ipfsmock := newIpfsMock()
ipfsMocks := make([]*ipfsMock, 0, nClusters)
clusters := make([]*Cluster, 0, nClusters)
cfgs := make([]*Config, 0, nClusters)
@ -82,6 +83,8 @@ func createClusters(t *testing.T) ([]*Cluster, *ipfsMock) {
// Generate nClusters configs
for i := 0; i < nClusters; i++ {
mock := newIpfsMock()
ipfsMocks = append(ipfsMocks, mock)
cfgs = append(cfgs, &Config{
ID: peers[i].pid,
PrivateKey: peers[i].priv,
@ -93,8 +96,8 @@ func createClusters(t *testing.T) ([]*Cluster, *ipfsMock) {
APIPort: apiPort + i,
IPFSAPIAddr: "127.0.0.1",
IPFSAPIPort: ipfsApiPort + i,
IPFSAddr: ipfsmock.addr,
IPFSPort: ipfsmock.port,
IPFSAddr: mock.addr,
IPFSPort: mock.port,
})
}
@ -104,19 +107,19 @@ func createClusters(t *testing.T) ([]*Cluster, *ipfsMock) {
ipfs, err := NewIPFSHTTPConnector(cfgs[i])
checkErr(t, err)
state := NewMapState()
tracker := NewMapPinTracker()
tracker := NewMapPinTracker(cfgs[i])
remote := NewLibp2pRemote()
cl, err := NewCluster(cfgs[i], api, ipfs, state, tracker, remote)
checkErr(t, err)
clusters = append(clusters, cl)
}
return clusters, ipfsmock
return clusters, ipfsMocks
}
func shutdownClusters(t *testing.T, clusters []*Cluster, m *ipfsMock) {
m.Close()
for _, c := range clusters {
func shutdownClusters(t *testing.T, clusters []*Cluster, m []*ipfsMock) {
for i, c := range clusters {
m[i].Close()
err := c.Shutdown()
if err != nil {
t.Error(err)
@ -163,26 +166,101 @@ func TestE2EPin(t *testing.T) {
for i := 0; i < nPins; i++ {
j := rand.Intn(nClusters) // choose a random cluster member
h, err := prefix.Sum(randomBytes()) // create random cid
fmt.Println(h)
checkErr(t, err)
err = clusters[j].Pin(h)
if err != nil {
t.Errorf("error pinning %s: %s", h, err)
}
// Test re-pin
err = clusters[j].Pin(h)
if err != nil {
t.Errorf("error repinning %s: %s", h, err)
}
}
delay()
f := func(t *testing.T, c *Cluster) {
status := c.tracker.ListPins()
fpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.LocalStatus()
for _, v := range status {
if v.Status != Pinned {
if v.IPFS != Pinned {
t.Errorf("%s should have been pinned but it is %s",
v.Cid.String,
v.Status.String())
v.CidStr,
v.IPFS.String())
}
}
if l := len(status); l != nPins {
t.Errorf("Pinned %d out of %d requests", l, nPins)
}
}
runF(t, clusters, fpinned)
// Unpin everything
pinList := clusters[0].Pins()
for i := 0; i < nPins; i++ {
j := rand.Intn(nClusters) // choose a random cluster member
err := clusters[j].Unpin(pinList[i])
if err != nil {
t.Errorf("error unpinning %s: %s", pinList[i], err)
}
// test re-unpin
err = clusters[j].Unpin(pinList[i])
if err != nil {
t.Errorf("error re-unpinning %s: %s", pinList[i], err)
}
}
delay()
funpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.LocalStatus()
if l := len(status); l != 0 {
t.Errorf("Nothing should be pinned")
}
}
runF(t, clusters, funpinned)
}
func TestE2EStatus(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
delay()
h, _ := cid.Decode(testCid)
clusters[0].Pin(h)
delay()
// Global status
f := func(t *testing.T, c *Cluster) {
statuses, err := c.Status()
if err != nil {
t.Error(err)
}
if len(statuses) == 0 {
t.Fatal("bad status. Expected one item")
}
if statuses[0].Cid.String() != testCid {
t.Error("bad cid in status")
}
info := statuses[0].Status
if len(info) != nClusters {
t.Error("bad info in status")
}
if info[c.host.ID()].IPFS != Pinned {
t.Error("the hash should have been pinned")
}
status, err := c.StatusCid(h)
if err != nil {
t.Error(err)
}
pinfo, ok := status.Status[c.host.ID()]
if !ok {
t.Fatal("Host not in status")
}
if pinfo.IPFS != Pinned {
t.Error("the status should show the hash as pinned")
}
}
runF(t, clusters, f)
}

View File

@ -26,7 +26,7 @@ func main() {
fmt.Println(err)
return
}
api, err := ipfscluster.NewHTTPAPI(clusterCfg)
api, err := ipfscluster.NewRESTAPI(clusterCfg)
if err != nil {
fmt.Println(err)
return
@ -39,7 +39,7 @@ func main() {
}
state := ipfscluster.NewMapState()
tracker := ipfscluster.NewMapPinTracker()
tracker := ipfscluster.NewMapPinTracker(clusterCfg)
remote := ipfscluster.NewLibp2pRemote()
cluster, err := ipfscluster.NewCluster(clusterCfg, api, proxy, state, tracker, remote)

View File

@ -166,7 +166,6 @@ func (ipfs *IPFSHTTPConnector) Shutdown() error {
// Pin performs a pin request against the configured IPFS
// daemon.
func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error {
logger.Infof("IPFS Pin request for: %s", hash)
pinned, err := ipfs.IsPinned(hash)
if err != nil {
return err
@ -174,16 +173,18 @@ func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error {
if !pinned {
path := fmt.Sprintf("pin/add?arg=%s", hash)
_, err = ipfs.get(path)
if err == nil {
logger.Info("IPFS Pin request succeeded: ", hash)
}
return err
}
logger.Debug("object is already pinned. Doing nothing")
logger.Info("IPFS object is already pinned: ", hash)
return nil
}
// UnPin performs an unpin request against the configured IPFS
// daemon.
func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
logger.Info("IPFS Unpin request for:", hash)
pinned, err := ipfs.IsPinned(hash)
if err != nil {
return err
@ -191,10 +192,13 @@ func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
if pinned {
path := fmt.Sprintf("pin/rm?arg=%s", hash)
_, err := ipfs.get(path)
if err == nil {
logger.Info("IPFS Unpin request succeeded:", hash)
}
return err
}
logger.Debug("object not [directly] pinned. Doing nothing")
logger.Info("IPFS object is already unpinned: ", hash)
return nil
}

View File

@ -99,33 +99,32 @@ type State interface {
// IPFS daemon. This component should be thread safe.
type PinTracker interface {
ClusterComponent
// Pinning tells the pin tracker that a pin is being pinned by IPFS
Pinning(*cid.Cid) error
// Pinned tells the pin tracker that a pin has been pinned by IPFS
Pinned(*cid.Cid) error
// Unpinning tells the pin tracker that a pin is being unpinned by IPFS
Unpinning(*cid.Cid) error
// Unpinned tells the pin tracker that a pin has been unpinned by IPFS
Unpinned(*cid.Cid) error
// PinError tells the pin tracker that an IPFS pin operation has failed
PinError(*cid.Cid) error
// UnpinError tells the pin tracker that an IPFS unpin operation has failed
UnpinError(*cid.Cid) error
// ListPins returns the list of pins with their status
ListPins() []Pin
// GetPin returns a Pin.
GetPin(*cid.Cid) Pin
// Track tells the tracker that a Cid is now under its supervision
// The tracker may decide to perform an IPFS pin.
Track(*cid.Cid) error
// Untrack tells the tracker that a Cid is to be forgotten. The tracker
// may perform an IPFS unpin operation.
Untrack(*cid.Cid) error
// LocalStatus returns the list of pins with their local status.
LocalStatus() []PinInfo
// LocalStatusCid returns the local status of a given Cid.
LocalStatusCid(*cid.Cid) PinInfo
// GlobalStatus returns the list of pins with their local and remote
// status, as fetched from all cluster members.
GlobalStatus() ([]GlobalPinInfo, error)
// GlobalStatusCid returns the global status of a given Cid.
GlobalStatusCid(*cid.Cid) (GlobalPinInfo, error)
// Sync makes sure that the Cid status reflects the real IPFS status. If it
// does not, the status is marked as an error. The return value indicates
// whether the Pin status was updated.
Sync(*cid.Cid) bool
// Recover attempts to recover an error by re-[un]pinning the item.
// Recover attempts to recover an error by re-[un]pinning the item if needed.
Recover(*cid.Cid) error
// SyncAll runs Sync() on every known Pin. It returns a list of changed Pins
SyncAll() []Pin
SyncAll() []PinInfo
// SyncState makes sure that the tracked Pins matches those in the
// cluster state and runs SyncAll(). It returns a list of changed Pins.
SyncState(State) []Pin
SyncState(State) []*cid.Cid
}
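For orientation, this is the shape in which Cluster.LocalSyncCid drives the contract above; a minimal in-package sketch (tracker is any PinTracker implementation):

// syncAndRecover re-checks a Cid against IPFS, retriggers the pin or
// unpin if the tracker reports an error state, and returns the
// (possibly updated) local status.
func syncAndRecover(tracker PinTracker, c *cid.Cid) (PinInfo, error) {
    if tracker.Sync(c) { // status changed, or was already an error
        if err := tracker.Recover(c); err != nil {
            return PinInfo{}, err
        }
    }
    return tracker.LocalStatusCid(c), nil
}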
// Remote represents a component which takes care of
@ -133,7 +132,10 @@ type PinTracker interface {
// handling any incoming remote requests from other nodes.
type Remote interface {
ClusterComponent
MakeRemoteRPC(rpc RPC, node peer.ID) (RPCResponse, error)
// MakeRemoteRPC performs an RPC request to a remote peer.
// The response is decoded onto the RPCResponse provided.
MakeRemoteRPC(RPC, peer.ID, *RPCResponse) error
// SetHost provides a libp2p host to use by this remote
SetHost(host.Host)
}

View File

@ -33,7 +33,7 @@ func NewLibp2pRemote() *Libp2pRemote {
r := &Libp2pRemote{
ctx: ctx,
rpcCh: make(chan RPC),
rpcCh: make(chan RPC, RPCMaxQueue),
shutdownCh: make(chan struct{}),
}
@ -91,14 +91,13 @@ func (r *Libp2pRemote) handleRemoteRPC(s *streamWrap) error {
func (r *Libp2pRemote) decodeRPC(s *streamWrap, rpcType int) (RPC, error) {
var err error
switch RPCOpToType[rpcType] {
switch rpcType {
case CidRPCType:
var rpc *CidRPC
err = s.dec.Decode(&rpc)
if err != nil {
goto DECODE_ERROR
}
logger.Debugf("%+v", rpc)
return rpc, nil
case GenericRPCType:
var rpc *GenericRPC
@ -135,43 +134,43 @@ func (r *Libp2pRemote) sendStreamResponse(s *streamWrap, resp RPCResponse) error
return nil
}
func (r *Libp2pRemote) MakeRemoteRPC(rpc RPC, node peer.ID) (RPCResponse, error) {
func (r *Libp2pRemote) MakeRemoteRPC(rpc RPC, node peer.ID, resp *RPCResponse) error {
ctx, cancel := context.WithCancel(r.ctx)
defer cancel()
var resp RPCResponse
if r.host == nil {
return resp, errors.New("no host set")
return errors.New("no host set")
}
if node == r.host.ID() {
// libp2p cannot dial itself
return MakeRPC(ctx, r.rpcCh, rpc, true), nil
*resp = MakeRPC(ctx, r.rpcCh, rpc, true)
return nil
}
s, err := r.host.NewStream(ctx, node, ClusterP2PProtocol)
if err != nil {
return resp, err
return err
}
defer s.Close()
sWrap := wrapStream(s)
logger.Debugf("sending remote RPC %d to %s", rpc.Op(), node)
if err := sWrap.w.WriteByte(byte(rpc.Op())); err != nil {
return resp, err
if err := sWrap.w.WriteByte(byte(rpc.RType())); err != nil {
return err
}
if err := sWrap.enc.Encode(rpc); err != nil {
return resp, err
return err
}
if err := sWrap.w.Flush(); err != nil {
return resp, err
return err
}
logger.Debugf("Waiting for response from %s", node)
if err := sWrap.dec.Decode(&resp); err != nil {
return resp, err
if err := sWrap.dec.Decode(resp); err != nil {
return err
}
return resp, nil
return nil
}
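The wire format used above is a single type byte followed by the encoded RPC, so handleRemoteRPC/decodeRPC know which concrete struct to decode into before touching the payload. A self-contained sketch of that framing, with encoding/json standing in for whatever codec streamWrap actually wraps:

package main

import (
    "bufio"
    "bytes"
    "encoding/json"
    "fmt"
)

const (
    cidRPCType     = 0
    genericRPCType = 1
)

type frame struct{ Method int }

func main() {
    var buf bytes.Buffer

    // Sender (MakeRemoteRPC): type byte first, then the payload.
    w := bufio.NewWriter(&buf)
    w.WriteByte(byte(cidRPCType))
    json.NewEncoder(w).Encode(frame{Method: 3})
    w.Flush()

    // Receiver (decodeRPC): read the type byte, then decode into the
    // matching concrete struct.
    r := bufio.NewReader(&buf)
    t, _ := r.ReadByte()
    var f frame
    json.NewDecoder(r).Decode(&f)
    fmt.Println(t, f.Method)
}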

View File

@ -2,14 +2,14 @@ package ipfscluster
import (
"context"
"errors"
"fmt"
"sync"
"time"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
)
const (
pinEverywhere = -1
)
// A Pin or Unpin operation will be considered failed
@ -27,19 +27,27 @@ const (
Pinning
Unpinning
Unpinned
RemotePin
)
type Pin struct {
Cid *cid.Cid
PinMode PinMode
Status PinStatus
TS time.Time
type GlobalPinInfo struct {
Cid *cid.Cid
Status map[peer.ID]PinInfo
}
type PinMode int
type PinStatus int
// PinInfo holds information about local pins. PinInfo is
// serialized when requesting the Global status, therefore
// we cannot use *cid.Cid.
type PinInfo struct {
CidStr string
Peer peer.ID
IPFS IPFSStatus
TS time.Time
}
func (st PinStatus) String() string {
type IPFSStatus int
func (st IPFSStatus) String() string {
switch st {
case PinError:
return "pin_error"
@ -59,10 +67,11 @@ func (st PinStatus) String() string {
type MapPinTracker struct {
mux sync.Mutex
status map[string]Pin
status map[string]PinInfo
ctx context.Context
rpcCh chan RPC
ctx context.Context
rpcCh chan RPC
peerID peer.ID
shutdownLock sync.Mutex
shutdown bool
@ -70,12 +79,19 @@ type MapPinTracker struct {
wg sync.WaitGroup
}
func NewMapPinTracker() *MapPinTracker {
func NewMapPinTracker(cfg *Config) *MapPinTracker {
ctx := context.Background()
pID, err := peer.IDB58Decode(cfg.ID)
if err != nil {
panic(err)
}
mpt := &MapPinTracker{
status: make(map[string]Pin),
rpcCh: make(chan RPC, RPCMaxQueue),
ctx: ctx,
status: make(map[string]PinInfo),
rpcCh: make(chan RPC, RPCMaxQueue),
peerID: pID,
shutdownCh: make(chan struct{}),
}
logger.Info("starting MapPinTracker")
@ -110,74 +126,179 @@ func (mpt *MapPinTracker) Shutdown() error {
return nil
}
func (mpt *MapPinTracker) set(c *cid.Cid, s PinStatus) error {
mpt.mux.Lock()
defer mpt.mux.Unlock()
func (mpt *MapPinTracker) unsafeSet(c *cid.Cid, s IPFSStatus) {
if s == Unpinned {
delete(mpt.status, c.String())
return nil
}
mpt.status[c.String()] = Pin{
Cid: c,
PinMode: pinEverywhere,
Status: s,
TS: time.Now(),
mpt.status[c.String()] = PinInfo{
// cid: c,
CidStr: c.String(),
Peer: mpt.peerID,
IPFS: s,
TS: time.Now(),
}
return nil
}
func (mpt *MapPinTracker) get(c *cid.Cid) Pin {
func (mpt *MapPinTracker) set(c *cid.Cid, s IPFSStatus) {
mpt.mux.Lock()
defer mpt.mux.Unlock()
mpt.unsafeSet(c, s)
}
func (mpt *MapPinTracker) get(c *cid.Cid) PinInfo {
mpt.mux.Lock()
defer mpt.mux.Unlock()
p, ok := mpt.status[c.String()]
if !ok {
return Pin{
Cid: c,
Status: Unpinned,
return PinInfo{
// cid: c,
CidStr: c.String(),
Peer: mpt.peerID,
IPFS: Unpinned,
TS: time.Now(),
}
}
return p
}
func (mpt *MapPinTracker) Pinning(c *cid.Cid) error {
return mpt.set(c, Pinning)
func (mpt *MapPinTracker) pin(c *cid.Cid) error {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
mpt.set(c, Pinning)
resp := MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSPinRPC, c), true)
if resp.Error != nil {
mpt.set(c, PinError)
return resp.Error
}
mpt.set(c, Pinned)
return nil
}
func (mpt *MapPinTracker) Unpinning(c *cid.Cid) error {
return mpt.set(c, Unpinning)
func (mpt *MapPinTracker) unpin(c *cid.Cid) error {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
mpt.set(c, Unpinning)
resp := MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSUnpinRPC, c), true)
if resp.Error != nil {
mpt.set(c, PinError)
return resp.Error
}
mpt.set(c, Unpinned)
return nil
}
func (mpt *MapPinTracker) Pinned(c *cid.Cid) error {
return mpt.set(c, Pinned)
func (mpt *MapPinTracker) Track(c *cid.Cid) error {
return mpt.pin(c)
}
func (mpt *MapPinTracker) PinError(c *cid.Cid) error {
return mpt.set(c, PinError)
func (mpt *MapPinTracker) Untrack(c *cid.Cid) error {
return mpt.unpin(c)
}
func (mpt *MapPinTracker) UnpinError(c *cid.Cid) error {
return mpt.set(c, UnpinError)
}
func (mpt *MapPinTracker) Unpinned(c *cid.Cid) error {
return mpt.set(c, Unpinned)
}
func (mpt *MapPinTracker) GetPin(c *cid.Cid) Pin {
func (mpt *MapPinTracker) LocalStatusCid(c *cid.Cid) PinInfo {
return mpt.get(c)
}
func (mpt *MapPinTracker) ListPins() []Pin {
func (mpt *MapPinTracker) LocalStatus() []PinInfo {
mpt.mux.Lock()
defer mpt.mux.Unlock()
pins := make([]Pin, 0, len(mpt.status))
pins := make([]PinInfo, 0, len(mpt.status))
for _, v := range mpt.status {
pins = append(pins, v)
}
return pins
}
func (mpt *MapPinTracker) GlobalStatus() ([]GlobalPinInfo, error) {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
rpc := NewRPC(TrackerLocalStatusRPC, nil)
wrpc := NewRPC(BroadcastRPC, rpc)
resp := MakeRPC(ctx, mpt.rpcCh, wrpc, true)
if resp.Error != nil {
return nil, resp.Error
}
responses, ok := resp.Data.([]RPCResponse)
if !ok {
return nil, errors.New("unexpected responses format")
}
fullMap := make(map[string]GlobalPinInfo)
mergePins := func(pins []PinInfo) {
for _, p := range pins {
item, ok := fullMap[p.CidStr]
c, _ := cid.Decode(p.CidStr)
if !ok {
fullMap[p.CidStr] = GlobalPinInfo{
Cid: c,
Status: map[peer.ID]PinInfo{
p.Peer: p,
},
}
} else {
item.Status[p.Peer] = p
}
}
}
for _, r := range responses {
if r.Error != nil {
logger.Error("error in one of the broadcast responses: ", r.Error)
continue
}
pins, ok := r.Data.([]PinInfo)
if !ok {
return nil, fmt.Errorf("unexpected response format: %+v", r.Data)
}
mergePins(pins)
}
status := make([]GlobalPinInfo, 0, len(fullMap))
for _, v := range fullMap {
status = append(status, v)
}
return status, nil
}
func (mpt *MapPinTracker) GlobalStatusCid(c *cid.Cid) (GlobalPinInfo, error) {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
pin := GlobalPinInfo{
Cid: c,
Status: make(map[peer.ID]PinInfo),
}
rpc := NewRPC(TrackerLocalStatusCidRPC, c)
wrpc := NewRPC(BroadcastRPC, rpc)
resp := MakeRPC(ctx, mpt.rpcCh, wrpc, true)
if resp.Error != nil {
return pin, resp.Error
}
responses, ok := resp.Data.([]RPCResponse)
if !ok {
return pin, errors.New("unexpected responses format")
}
for _, r := range responses {
if r.Error != nil {
return pin, r.Error
}
info, ok := r.Data.(PinInfo)
if !ok {
return pin, errors.New("unexpected response format")
}
pin.Status[info.Peer] = info
}
return pin, nil
}
func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
@ -185,17 +306,17 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
p := mpt.get(c)
// We assume errors will need a Recover() so we return true
if p.Status == PinError || p.Status == UnpinError {
if p.IPFS == PinError || p.IPFS == UnpinError {
return true
}
resp := MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSIsPinnedRPC, c), true)
if resp.Error != nil {
if p.Status == Pinned || p.Status == Pinning {
if p.IPFS == Pinned || p.IPFS == Pinning {
mpt.set(c, PinError)
return true
}
if p.Status == Unpinned || p.Status == Unpinning {
if p.IPFS == Unpinned || p.IPFS == Unpinning {
mpt.set(c, UnpinError)
return true
}
@ -208,7 +329,7 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
}
if ipfsPinned {
switch p.Status {
switch p.IPFS {
case Pinned:
return false
case Pinning:
@ -225,7 +346,7 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
return true
}
} else {
switch p.Status {
switch p.IPFS {
case Pinned:
mpt.set(c, PinError)
return true
@ -247,26 +368,25 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
func (mpt *MapPinTracker) Recover(c *cid.Cid) error {
p := mpt.get(c)
if p.Status != PinError && p.Status != UnpinError {
if p.IPFS != PinError && p.IPFS != UnpinError {
return nil
}
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
if p.Status == PinError {
MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSPinRPC, c), false)
if p.IPFS == PinError {
mpt.pin(c)
}
if p.Status == UnpinError {
MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSUnpinRPC, c), false)
if p.IPFS == UnpinError {
mpt.unpin(c)
}
return nil
}
func (mpt *MapPinTracker) SyncAll() []Pin {
var changedPins []Pin
pins := mpt.ListPins()
func (mpt *MapPinTracker) SyncAll() []PinInfo {
var changedPins []PinInfo
pins := mpt.LocalStatus()
for _, p := range pins {
changed := mpt.Sync(p.Cid)
c, _ := cid.Decode(p.CidStr)
changed := mpt.Sync(c)
if changed {
changedPins = append(changedPins, p)
}
@ -274,7 +394,7 @@ func (mpt *MapPinTracker) SyncAll() []Pin {
return changedPins
}
func (mpt *MapPinTracker) SyncState(cState State) []Pin {
func (mpt *MapPinTracker) SyncState(cState State) []*cid.Cid {
clusterPins := cState.ListPins()
clusterMap := make(map[string]struct{})
// Make a map for faster lookup
@ -284,7 +404,7 @@ func (mpt *MapPinTracker) SyncState(cState State) []Pin {
}
var toRemove []*cid.Cid
var toAdd []*cid.Cid
var changed []Pin
var changed []*cid.Cid
mpt.mux.Lock()
// Collect items in the State not in the tracker
@ -297,32 +417,23 @@ func (mpt *MapPinTracker) SyncState(cState State) []Pin {
// Collect items in the tracker not in the State
for _, p := range mpt.status {
_, ok := clusterMap[p.Cid.String()]
c, _ := cid.Decode(p.CidStr)
_, ok := clusterMap[p.CidStr]
if !ok {
toRemove = append(toRemove, p.Cid)
toRemove = append(toRemove, c)
}
}
// Update new items and mark them as pinning error
for _, c := range toAdd {
p := Pin{
Cid: c,
PinMode: pinEverywhere,
Status: PinError,
}
mpt.status[c.String()] = p
changed = append(changed, p)
mpt.unsafeSet(c, PinError)
changed = append(changed, c)
}
// Mark items that need to be removed as unpin error
for _, c := range toRemove {
p := Pin{
Cid: c,
PinMode: pinEverywhere,
Status: UnpinError,
}
mpt.status[c.String()] = p
changed = append(changed, p)
mpt.unsafeSet(c, UnpinError)
changed = append(changed, c)
}
mpt.mux.Unlock()
return changed

View File

@ -61,9 +61,13 @@ type unpinResp struct {
Unpinned string `json:"unpinned"`
}
type statusInfo struct {
Status string
}
type statusCidResp struct {
Cid string `json:"cid"`
Status string `json:"status"`
Cid string `json:"cid"`
Status map[string]statusInfo `json:"status"`
}
type statusResp []statusCidResp
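With transformPinToStatusCid below, each /status entry now carries one sub-entry per peer. A self-contained sketch that prints the rough response shape (CID and peer IDs are placeholders):

package main

import (
    "encoding/json"
    "fmt"
)

type statusInfo struct {
    Status string
}

type statusCidResp struct {
    Cid    string                `json:"cid"`
    Status map[string]statusInfo `json:"status"`
}

func main() {
    s := statusCidResp{
        Cid: "QmExampleCid",
        Status: map[string]statusInfo{
            "QmPeerA": {Status: "pinned"},
            "QmPeerB": {Status: "pinning"},
        },
    }
    out, _ := json.MarshalIndent(s, "", "  ")
    fmt.Println(string(out))
    // Prints something like:
    // {
    //   "cid": "QmExampleCid",
    //   "status": {
    //     "QmPeerA": { "Status": "pinned" },
    //     "QmPeerB": { "Status": "pinning" }
    //   }
    // }
}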
@ -296,7 +300,7 @@ func (api *RESTAPI) statusCidHandler(w http.ResponseWriter, r *http.Request) {
func (api *RESTAPI) syncHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithCancel(api.ctx)
defer cancel()
rRpc := NewRPC(LocalSyncRPC, nil)
rRpc := NewRPC(GlobalSyncRPC, nil)
resp := MakeRPC(ctx, api.rpcCh, rRpc, true)
if checkResponse(w, rRpc.Op(), resp) {
sendStatusResponse(w, resp)
@ -308,7 +312,7 @@ func (api *RESTAPI) syncCidHandler(w http.ResponseWriter, r *http.Request) {
defer cancel()
if c := parseCidOrError(w, r); c != nil {
op := NewRPC(LocalSyncCidRPC, c)
op := NewRPC(GlobalSyncCidRPC, c)
resp := MakeRPC(ctx, api.rpcCh, op, true)
if checkResponse(w, op.Op(), resp) {
sendStatusCidResponse(w, resp)
@ -344,9 +348,9 @@ func checkResponse(w http.ResponseWriter, op RPCOp, resp RPCResponse) bool {
case PinRPC: // Pin/Unpin only return errors
case UnpinRPC:
case StatusRPC, LocalSyncRPC, GlobalSyncRPC:
_, ok = resp.Data.([]Pin)
_, ok = resp.Data.([]GlobalPinInfo)
case StatusCidRPC, LocalSyncCidRPC, GlobalSyncCidRPC:
_, ok = resp.Data.(Pin)
_, ok = resp.Data.(GlobalPinInfo)
case PinListRPC:
_, ok = resp.Data.([]*cid.Cid)
case IPFSPinRPC:
@ -389,23 +393,30 @@ func sendErrorResponse(w http.ResponseWriter, code int, msg string) {
sendJSONResponse(w, code, errorResp)
}
func transformPinToStatusCid(p GlobalPinInfo) statusCidResp {
s := statusCidResp{}
s.Cid = p.Cid.String()
s.Status = make(map[string]statusInfo)
for k, v := range p.Status {
s.Status[k.Pretty()] = statusInfo{
Status: v.IPFS.String(),
}
}
return s
}
func sendStatusResponse(w http.ResponseWriter, resp RPCResponse) {
data := resp.Data.([]Pin)
data := resp.Data.([]GlobalPinInfo)
pins := make(statusResp, 0, len(data))
for _, d := range data {
pins = append(pins, statusCidResp{
Cid: d.Cid.String(),
Status: d.Status.String(),
})
pins = append(pins, transformPinToStatusCid(d))
}
sendJSONResponse(w, 200, pins)
}
func sendStatusCidResponse(w http.ResponseWriter, resp RPCResponse) {
data := resp.Data.(Pin)
pin := statusCidResp{
Cid: data.Cid.String(),
Status: data.Status.String(),
}
sendJSONResponse(w, 200, pin)
data := resp.Data.(GlobalPinInfo)
st := transformPinToStatusCid(data)
sendJSONResponse(w, 200, st)
}

View File

@ -189,33 +189,74 @@ func TestStatusEndpoint(t *testing.T) {
c3, _ := cid.Decode(testCid3)
api := testClusterApi(t)
defer api.Shutdown()
pList := []Pin{
Pin{
Status: PinError,
Cid: c,
pList := []GlobalPinInfo{
GlobalPinInfo{
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid,
Peer: testPeerID,
IPFS: Pinned,
},
},
Cid: c,
},
Pin{
Status: UnpinError,
Cid: c,
GlobalPinInfo{
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid,
Peer: testPeerID,
IPFS: Unpinned,
},
},
Cid: c2,
},
Pin{
Status: Pinned,
Cid: c3,
GlobalPinInfo{
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid,
Peer: testPeerID,
IPFS: PinError,
},
},
Cid: c3,
},
Pin{
Status: Pinning,
Cid: c,
GlobalPinInfo{
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid,
Peer: testPeerID,
IPFS: UnpinError,
},
},
Cid: c,
},
Pin{
Status: Unpinning,
Cid: c2,
GlobalPinInfo{
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid,
Peer: testPeerID,
IPFS: Unpinning,
},
},
Cid: c2,
},
GlobalPinInfo{
Status: map[peer.ID]PinInfo{
testPeerID: PinInfo{
CidStr: testCid,
Peer: testPeerID,
IPFS: Pinning,
},
},
Cid: c3,
},
}
simulateAnswer(api.RpcChan(), pList, nil)
var resp statusResp
makeGet(t, "/status", &resp)
if len(resp) != 5 {
t.Error("unexpected statusResp: ", resp)
if len(resp) != 6 {
t.Errorf("unexpected statusResp:\n %+v", resp)
}
}

rpc.go
View File

@ -10,8 +10,8 @@ const (
IPFSPinRPC
IPFSUnpinRPC
IPFSIsPinnedRPC
ConsensusAddPinRPC
ConsensusRmPinRPC
ConsensusLogPinRPC
ConsensusLogUnpinRPC
VersionRPC
MemberListRPC
RollbackRPC
@ -21,6 +21,10 @@ const (
LocalSyncCidRPC
GlobalSyncRPC
GlobalSyncCidRPC
TrackRPC
UntrackRPC
TrackerLocalStatusRPC
TrackerLocalStatusCidRPC
StatusRPC
StatusCidRPC
@ -35,42 +39,27 @@ const (
WrappedRPCType
)
var RPCOpToType = map[int]int{
PinRPC: CidRPCType,
UnpinRPC: CidRPCType,
PinListRPC: GenericRPCType,
IPFSPinRPC: CidRPCType,
IPFSUnpinRPC: CidRPCType,
IPFSIsPinnedRPC: CidRPCType,
ConsensusAddPinRPC: CidRPCType,
ConsensusRmPinRPC: CidRPCType,
VersionRPC: GenericRPCType,
MemberListRPC: GenericRPCType,
RollbackRPC: GenericRPCType,
LeaderRPC: WrappedRPCType,
BroadcastRPC: WrappedRPCType,
LocalSyncRPC: GenericRPCType,
LocalSyncCidRPC: CidRPCType,
GlobalSyncRPC: GenericRPCType,
GlobalSyncCidRPC: CidRPCType,
StatusRPC: GenericRPCType,
StatusCidRPC: CidRPCType,
}
// RPCMethod identifies which RPC supported operation we are trying to make
// RPCOp identifies which supported RPC operation we are trying to make
type RPCOp int
// RPCType identifies which implementation of RPC we are using
type RPCType int
// RPC represents an internal RPC operation. It should be implemented
// by all RPC types.
type RPC interface {
// Op indicates which operation should be performed
Op() RPCOp
// RType indicates which RPC implementation is used
RType() RPCType
// ResponseCh returns a channel to place the response for this RPC
ResponseCh() chan RPCResponse
}
// baseRPC implements RPC and can be included as an anonymous
// field in other types.
type baseRPC struct {
Type int
Type RPCType
Method RPCOp
RespChan chan RPCResponse
}
@ -86,6 +75,10 @@ func (brpc *baseRPC) ResponseCh() chan RPCResponse {
return brpc.RespChan
}
func (brpc *baseRPC) RType() RPCType {
return brpc.Type
}
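Dropping the package-level RPCOpToType map in favour of an RType() method keeps the type tag on each RPC value itself, which is what lets decodeRPC switch directly on the byte read from the stream. A stand-in sketch of the same dispatch style:

package main

import "fmt"

type rpcType int

const (
    cidRPCType rpcType = iota
    genericRPCType
)

type rpc interface{ RType() rpcType }

type cidRPC struct{}

func (cidRPC) RType() rpcType { return cidRPCType }

type genericRPC struct{}

func (genericRPC) RType() rpcType { return genericRPCType }

// describe dispatches on the tag carried by the value, instead of
// consulting a package-level operation-to-type map.
func describe(r rpc) string {
    switch r.RType() {
    case cidRPCType:
        return "cid rpc"
    case genericRPCType:
        return "generic rpc"
    }
    return "unknown"
}

func main() {
    fmt.Println(describe(cidRPC{}), describe(genericRPC{}))
}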
// GenericRPC is a ClusterRPC with generic arguments.
type GenericRPC struct {
baseRPC