Make golint happy
License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
parent
805b867651
commit
3243cfcccf
13
cluster.go
13
cluster.go
|
@ -432,7 +432,7 @@ func (c *Cluster) multiRPC(dests []peer.ID, svcName, svcMethod string, args inte
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
errs := make([]error, len(dests), len(dests))
|
errs := make([]error, len(dests), len(dests))
|
||||||
|
|
||||||
for i, _ := range dests {
|
for i := range dests {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
@ -459,7 +459,7 @@ func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (GlobalPinInfo, er
|
||||||
members := c.Members()
|
members := c.Members()
|
||||||
replies := make([]PinInfo, len(members), len(members))
|
replies := make([]PinInfo, len(members), len(members))
|
||||||
ifaceReplies := make([]interface{}, len(members), len(members))
|
ifaceReplies := make([]interface{}, len(members), len(members))
|
||||||
for i, _ := range replies {
|
for i := range replies {
|
||||||
ifaceReplies[i] = &replies[i]
|
ifaceReplies[i] = &replies[i]
|
||||||
}
|
}
|
||||||
args := NewCidArg(h)
|
args := NewCidArg(h)
|
||||||
|
@ -476,9 +476,9 @@ func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (GlobalPinInfo, er
|
||||||
|
|
||||||
if len(errorMsgs) == 0 {
|
if len(errorMsgs) == 0 {
|
||||||
return pin, nil
|
return pin, nil
|
||||||
} else {
|
|
||||||
return pin, errors.New(errorMsgs)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return pin, errors.New(errorMsgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) {
|
func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) {
|
||||||
|
@ -488,7 +488,7 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) {
|
||||||
members := c.Members()
|
members := c.Members()
|
||||||
replies := make([][]PinInfo, len(members), len(members))
|
replies := make([][]PinInfo, len(members), len(members))
|
||||||
ifaceReplies := make([]interface{}, len(members), len(members))
|
ifaceReplies := make([]interface{}, len(members), len(members))
|
||||||
for i, _ := range replies {
|
for i := range replies {
|
||||||
ifaceReplies[i] = &replies[i]
|
ifaceReplies[i] = &replies[i]
|
||||||
}
|
}
|
||||||
errs := c.multiRPC(members, "Cluster", method, struct{}{}, ifaceReplies)
|
errs := c.multiRPC(members, "Cluster", method, struct{}{}, ifaceReplies)
|
||||||
|
@ -525,7 +525,6 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) {
|
||||||
|
|
||||||
if len(errorMsgs) == 0 {
|
if len(errorMsgs) == 0 {
|
||||||
return infos, nil
|
return infos, nil
|
||||||
} else {
|
|
||||||
return infos, errors.New(errorMsgs)
|
|
||||||
}
|
}
|
||||||
|
return infos, errors.New(errorMsgs)
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ func (c *mockComponent) SetClient(client *rpc.Client) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockApi struct {
|
type mockAPI struct {
|
||||||
mockComponent
|
mockComponent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,8 +52,8 @@ func (ipfs *mockConnector) IsPinned(c *cid.Cid) (bool, error) {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func testingCluster(t *testing.T) (*Cluster, *mockApi, *mockConnector, *MapState, *MapPinTracker) {
|
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *MapState, *MapPinTracker) {
|
||||||
api := &mockApi{}
|
api := &mockAPI{}
|
||||||
ipfs := &mockConnector{}
|
ipfs := &mockConnector{}
|
||||||
cfg := testingConfig()
|
cfg := testingConfig()
|
||||||
st := NewMapState()
|
st := NewMapState()
|
||||||
|
|
|
@ -23,6 +23,9 @@ const (
|
||||||
DefaultClusterPort = 9096
|
DefaultClusterPort = 9096
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Config represents an ipfs-cluster configuration which can be
|
||||||
|
// saved and loaded from disk. Currently it holds configuration
|
||||||
|
// values used by all components.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
// Libp2p ID and private key for Cluster communication (including)
|
// Libp2p ID and private key for Cluster communication (including)
|
||||||
// the Consensus component.
|
// the Consensus component.
|
||||||
|
@ -55,6 +58,8 @@ type Config struct {
|
||||||
IPFSPort int `json:"ipfs_port"`
|
IPFSPort int `json:"ipfs_port"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LoadConfig reads a JSON configuration file from the given path,
|
||||||
|
// parses it and returns a new Config object.
|
||||||
func LoadConfig(path string) (*Config, error) {
|
func LoadConfig(path string) (*Config, error) {
|
||||||
config := &Config{}
|
config := &Config{}
|
||||||
file, err := ioutil.ReadFile(path)
|
file, err := ioutil.ReadFile(path)
|
||||||
|
@ -100,6 +105,7 @@ func NewDefaultConfig() (*Config, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Save stores a configuration as a JSON file in the given path.
|
||||||
func (c *Config) Save(path string) error {
|
func (c *Config) Save(path string) error {
|
||||||
json, err := json.MarshalIndent(c, "", " ")
|
json, err := json.MarshalIndent(c, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
12
consensus.go
12
consensus.go
|
@ -280,6 +280,10 @@ func (cc *Consensus) LogUnpin(c *cid.Cid) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// State retrieves the current consensus State. It may error
|
||||||
|
// if no State has been agreed upon or the state is not
|
||||||
|
// consistent. The returned State is the last agreed-upon
|
||||||
|
// State known by this node.
|
||||||
func (cc *Consensus) State() (State, error) {
|
func (cc *Consensus) State() (State, error) {
|
||||||
st, err := cc.consensus.GetLogHead()
|
st, err := cc.consensus.GetLogHead()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -292,15 +296,17 @@ func (cc *Consensus) State() (State, error) {
|
||||||
return state, nil
|
return state, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Leader() returns the peerID of the Leader of the
|
// Leader returns the peerID of the Leader of the
|
||||||
// cluster.
|
// cluster. It returns an error when there is no leader.
|
||||||
func (cc *Consensus) Leader() (peer.ID, error) {
|
func (cc *Consensus) Leader() (peer.ID, error) {
|
||||||
// FIXME: Hashicorp Raft specific
|
// FIXME: Hashicorp Raft specific
|
||||||
raftactor := cc.actor.(*libp2praft.Actor)
|
raftactor := cc.actor.(*libp2praft.Actor)
|
||||||
return raftactor.Leader()
|
return raftactor.Leader()
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO
|
// Rollback replaces the current agreed-upon
|
||||||
|
// state with the state provided. Only the consensus leader
|
||||||
|
// can perform this operation.
|
||||||
func (cc *Consensus) Rollback(state State) error {
|
func (cc *Consensus) Rollback(state State) error {
|
||||||
return cc.consensus.Rollback(state)
|
return cc.consensus.Rollback(state)
|
||||||
}
|
}
|
||||||
|
|
|
@ -157,7 +157,7 @@ func TestConsensusUnpin(t *testing.T) {
|
||||||
func TestConsensusLeader(t *testing.T) {
|
func TestConsensusLeader(t *testing.T) {
|
||||||
cc := testingConsensus(t)
|
cc := testingConsensus(t)
|
||||||
cfg := testingConfig()
|
cfg := testingConfig()
|
||||||
pId := cfg.ID
|
pID := cfg.ID
|
||||||
defer cleanRaft()
|
defer cleanRaft()
|
||||||
defer cc.Shutdown()
|
defer cc.Shutdown()
|
||||||
l, err := cc.Leader()
|
l, err := cc.Leader()
|
||||||
|
@ -165,7 +165,7 @@ func TestConsensusLeader(t *testing.T) {
|
||||||
t.Fatal("No leader:", err)
|
t.Fatal("No leader:", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if l.Pretty() != pId {
|
if l.Pretty() != pID {
|
||||||
t.Errorf("expected %s but the leader appears as %s", pId, l)
|
t.Errorf("expected %s but the leader appears as %s", pID, l)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -206,7 +206,7 @@ func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnPin performs an unpin request against the configured IPFS
|
// Unpin performs an unpin request against the configured IPFS
|
||||||
// daemon.
|
// daemon.
|
||||||
func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
|
func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
|
||||||
pinned, err := ipfs.IsPinned(hash)
|
pinned, err := ipfs.IsPinned(hash)
|
||||||
|
@ -226,6 +226,8 @@ func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsPinned performs a "pin ls" request against the configured IPFS
|
||||||
|
// daemon. It returns true when the given Cid is pinned not indirectly.
|
||||||
func (ipfs *IPFSHTTPConnector) IsPinned(hash *cid.Cid) (bool, error) {
|
func (ipfs *IPFSHTTPConnector) IsPinned(hash *cid.Cid) (bool, error) {
|
||||||
pinType, err := ipfs.pinType(hash)
|
pinType, err := ipfs.pinType(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -238,7 +240,10 @@ func (ipfs *IPFSHTTPConnector) IsPinned(hash *cid.Cid) (bool, error) {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns how a hash is pinned
|
// pinType performs a pin ls request and returns the information associated
|
||||||
|
// to the key. Unfortunately, the daemon does not provide an standarized
|
||||||
|
// output, so it may well be a sentence like "$hash is indirectly pinned through
|
||||||
|
// $otherhash".
|
||||||
func (ipfs *IPFSHTTPConnector) pinType(hash *cid.Cid) (string, error) {
|
func (ipfs *IPFSHTTPConnector) pinType(hash *cid.Cid) (string, error) {
|
||||||
lsPath := fmt.Sprintf("pin/ls?arg=%s", hash)
|
lsPath := fmt.Sprintf("pin/ls?arg=%s", hash)
|
||||||
body, err := ipfs.get(lsPath)
|
body, err := ipfs.get(lsPath)
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
// package ipfscluster implements a wrapper for the IPFS deamon which
|
// Package ipfscluster implements a wrapper for the IPFS deamon which
|
||||||
// allows to orchestrate a number of tasks between several IPFS nodes.
|
// allows to orchestrate pinning operations among several IPFS nodes.
|
||||||
//
|
//
|
||||||
// IPFS Cluster uses a consensus algorithm and libP2P to keep a shared
|
// IPFS Cluster uses a go-libp2p-raft to keep a shared state between
|
||||||
// state between the different members of the cluster and provides
|
// the different members of the cluster. It also uses LibP2P to enable
|
||||||
// components to interact with the IPFS daemon and provide public
|
// communication between its different components, which perform different
|
||||||
// and internal APIs.
|
// tasks like managing the underlying IPFS daemons, or providing APIs for
|
||||||
|
// external control.
|
||||||
package ipfscluster
|
package ipfscluster
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -19,20 +20,13 @@ import (
|
||||||
|
|
||||||
var logger = logging.Logger("cluster")
|
var logger = logging.Logger("cluster")
|
||||||
|
|
||||||
// Current Cluster version.
|
// Version is the current cluster version. Version alignment between
|
||||||
|
// components, apis and tools ensures compatibility among them.
|
||||||
const Version = "0.0.1"
|
const Version = "0.0.1"
|
||||||
|
|
||||||
// RPCProtocol is used to send libp2p messages between cluster members
|
// RPCProtocol is used to send libp2p messages between cluster members
|
||||||
var RPCProtocol protocol.ID = "/ipfscluster/" + Version + "/rpc"
|
var RPCProtocol protocol.ID = "/ipfscluster/" + Version + "/rpc"
|
||||||
|
|
||||||
// RPCMaxQueue can be used to set the size of the RPC channels,
|
|
||||||
// which will start blocking on send after reaching this number.
|
|
||||||
var RPCMaxQueue = 256
|
|
||||||
|
|
||||||
// MakeRPCRetryInterval specifies how long to wait before retrying
|
|
||||||
// to put a RPC request in the channel in MakeRPC().
|
|
||||||
var MakeRPCRetryInterval time.Duration = 1 * time.Second
|
|
||||||
|
|
||||||
// SilentRaft controls whether all Raft log messages are discarded.
|
// SilentRaft controls whether all Raft log messages are discarded.
|
||||||
var SilentRaft = true
|
var SilentRaft = true
|
||||||
|
|
||||||
|
@ -49,6 +43,67 @@ func SetLogLevel(l string) {
|
||||||
logging.SetLogLevel("cluster", l)
|
logging.SetLogLevel("cluster", l)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IPFSStatus values
|
||||||
|
const (
|
||||||
|
// IPFSStatus should never take this value
|
||||||
|
Bug = iota
|
||||||
|
// An error occurred pinning
|
||||||
|
PinError
|
||||||
|
// An error occurred unpinning
|
||||||
|
UnpinError
|
||||||
|
// The IPFS daemon has pinned the item
|
||||||
|
Pinned
|
||||||
|
// The IPFS daemon is currently pinning the item
|
||||||
|
Pinning
|
||||||
|
// The IPFS daemon is currently unpinning the item
|
||||||
|
Unpinning
|
||||||
|
// The IPFS daemon is not pinning the item
|
||||||
|
Unpinned
|
||||||
|
// The IPFS deamon is not pinning the item but it is being tracked
|
||||||
|
RemotePin
|
||||||
|
)
|
||||||
|
|
||||||
|
// IPFSStatus represents the status of a tracked Cid in the IPFS daemon
|
||||||
|
type IPFSStatus int
|
||||||
|
|
||||||
|
// GlobalPinInfo contains cluster-wide status information about a tracked Cid,
|
||||||
|
// indexed by cluster member.
|
||||||
|
type GlobalPinInfo struct {
|
||||||
|
Cid *cid.Cid
|
||||||
|
Status map[peer.ID]PinInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
// PinInfo holds information about local pins. PinInfo is
|
||||||
|
// serialized when requesting the Global status, therefore
|
||||||
|
// we cannot use *cid.Cid.
|
||||||
|
type PinInfo struct {
|
||||||
|
CidStr string
|
||||||
|
Peer peer.ID
|
||||||
|
IPFS IPFSStatus
|
||||||
|
TS time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// String converts an IPFSStatus into a readable string.
|
||||||
|
func (st IPFSStatus) String() string {
|
||||||
|
switch st {
|
||||||
|
case Bug:
|
||||||
|
return "bug"
|
||||||
|
case PinError:
|
||||||
|
return "pin_error"
|
||||||
|
case UnpinError:
|
||||||
|
return "unpin_error"
|
||||||
|
case Pinned:
|
||||||
|
return "pinned"
|
||||||
|
case Pinning:
|
||||||
|
return "pinning"
|
||||||
|
case Unpinning:
|
||||||
|
return "unpinning"
|
||||||
|
case Unpinned:
|
||||||
|
return "unpinned"
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
// ClusterComponent represents a piece of ipfscluster. Cluster components
|
// ClusterComponent represents a piece of ipfscluster. Cluster components
|
||||||
// usually run their own goroutines (a http server for example). They
|
// usually run their own goroutines (a http server for example). They
|
||||||
// communicate with the main Cluster component and other components
|
// communicate with the main Cluster component and other components
|
||||||
|
|
|
@ -34,7 +34,7 @@ var (
|
||||||
// ports
|
// ports
|
||||||
clusterPort = 20000
|
clusterPort = 20000
|
||||||
apiPort = 20500
|
apiPort = 20500
|
||||||
ipfsApiPort = 21000
|
ipfsAPIPort = 21000
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -100,7 +100,7 @@ func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) {
|
||||||
APIAddr: "127.0.0.1",
|
APIAddr: "127.0.0.1",
|
||||||
APIPort: apiPort + i,
|
APIPort: apiPort + i,
|
||||||
IPFSAPIAddr: "127.0.0.1",
|
IPFSAPIAddr: "127.0.0.1",
|
||||||
IPFSAPIPort: ipfsApiPort + i,
|
IPFSAPIPort: ipfsAPIPort + i,
|
||||||
IPFSAddr: mock.addr,
|
IPFSAddr: mock.addr,
|
||||||
IPFSPort: mock.port,
|
IPFSPort: mock.port,
|
||||||
})
|
})
|
||||||
|
|
|
@ -18,54 +18,8 @@ var (
|
||||||
UnpinningTimeout = 10 * time.Second
|
UnpinningTimeout = 10 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
// MapPinTracker is a PinTracker implementation which uses a Go map
|
||||||
Bad = iota
|
// to store the status of the tracked Cids. This component is thread-safe.
|
||||||
PinError
|
|
||||||
UnpinError
|
|
||||||
Pinned
|
|
||||||
Pinning
|
|
||||||
Unpinning
|
|
||||||
Unpinned
|
|
||||||
RemotePin
|
|
||||||
)
|
|
||||||
|
|
||||||
type GlobalPinInfo struct {
|
|
||||||
Cid *cid.Cid
|
|
||||||
Status map[peer.ID]PinInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
// PinInfo holds information about local pins. PinInfo is
|
|
||||||
// serialized when requesting the Global status, therefore
|
|
||||||
// we cannot use *cid.Cid.
|
|
||||||
type PinInfo struct {
|
|
||||||
CidStr string
|
|
||||||
Peer peer.ID
|
|
||||||
IPFS IPFSStatus
|
|
||||||
TS time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
type IPFSStatus int
|
|
||||||
|
|
||||||
func (st IPFSStatus) String() string {
|
|
||||||
switch st {
|
|
||||||
case Bad:
|
|
||||||
return "bug"
|
|
||||||
case PinError:
|
|
||||||
return "pin_error"
|
|
||||||
case UnpinError:
|
|
||||||
return "unpin_error"
|
|
||||||
case Pinned:
|
|
||||||
return "pinned"
|
|
||||||
case Pinning:
|
|
||||||
return "pinning"
|
|
||||||
case Unpinning:
|
|
||||||
return "unpinning"
|
|
||||||
case Unpinned:
|
|
||||||
return "unpinned"
|
|
||||||
}
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
type MapPinTracker struct {
|
type MapPinTracker struct {
|
||||||
mux sync.RWMutex
|
mux sync.RWMutex
|
||||||
status map[string]PinInfo
|
status map[string]PinInfo
|
||||||
|
@ -81,6 +35,8 @@ type MapPinTracker struct {
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewMapPinTracker returns a new object which has been correcly
|
||||||
|
// initialized with the given configuration.
|
||||||
func NewMapPinTracker(cfg *Config) *MapPinTracker {
|
func NewMapPinTracker(cfg *Config) *MapPinTracker {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
@ -101,6 +57,7 @@ func NewMapPinTracker(cfg *Config) *MapPinTracker {
|
||||||
return mpt
|
return mpt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// run does nothing other than give MapPinTracker a cancellable context.
|
||||||
func (mpt *MapPinTracker) run() {
|
func (mpt *MapPinTracker) run() {
|
||||||
mpt.wg.Add(1)
|
mpt.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -113,6 +70,8 @@ func (mpt *MapPinTracker) run() {
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Shutdown finishes the services provided by the MapPinTracker and cancels
|
||||||
|
// any active context.
|
||||||
func (mpt *MapPinTracker) Shutdown() error {
|
func (mpt *MapPinTracker) Shutdown() error {
|
||||||
mpt.shutdownLock.Lock()
|
mpt.shutdownLock.Lock()
|
||||||
defer mpt.shutdownLock.Unlock()
|
defer mpt.shutdownLock.Unlock()
|
||||||
|
@ -200,18 +159,26 @@ func (mpt *MapPinTracker) unpin(c *cid.Cid) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Track tells the MapPinTracker to start managing a Cid,
|
||||||
|
// possibly trigerring Pin operations on the IPFS daemon.
|
||||||
func (mpt *MapPinTracker) Track(c *cid.Cid) error {
|
func (mpt *MapPinTracker) Track(c *cid.Cid) error {
|
||||||
return mpt.pin(c)
|
return mpt.pin(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Untrack tells the MapPinTracker to stop managing a Cid.
|
||||||
|
// If the Cid is pinned locally, it will be unpinned.
|
||||||
func (mpt *MapPinTracker) Untrack(c *cid.Cid) error {
|
func (mpt *MapPinTracker) Untrack(c *cid.Cid) error {
|
||||||
return mpt.unpin(c)
|
return mpt.unpin(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StatusCid returns information for a Cid tracked by this
|
||||||
|
// MapPinTracker.
|
||||||
func (mpt *MapPinTracker) StatusCid(c *cid.Cid) PinInfo {
|
func (mpt *MapPinTracker) StatusCid(c *cid.Cid) PinInfo {
|
||||||
return mpt.get(c)
|
return mpt.get(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Status returns information for all Cids tracked by this
|
||||||
|
// MapPinTracker.
|
||||||
func (mpt *MapPinTracker) Status() []PinInfo {
|
func (mpt *MapPinTracker) Status() []PinInfo {
|
||||||
mpt.mux.Lock()
|
mpt.mux.Lock()
|
||||||
defer mpt.mux.Unlock()
|
defer mpt.mux.Unlock()
|
||||||
|
@ -222,6 +189,11 @@ func (mpt *MapPinTracker) Status() []PinInfo {
|
||||||
return pins
|
return pins
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync verifies that the status of a Cid matches the status
|
||||||
|
// of it in the IPFS daemon. If not, it will be transitioned
|
||||||
|
// to Pin or Unpin error. Sync returns true if the status was
|
||||||
|
// modified or the status is error. Pins in error states can be
|
||||||
|
// recovered with Recover().
|
||||||
func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
|
func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
|
||||||
var ipfsPinned bool
|
var ipfsPinned bool
|
||||||
p := mpt.get(c)
|
p := mpt.get(c)
|
||||||
|
@ -310,6 +282,8 @@ func (mpt *MapPinTracker) Recover(c *cid.Cid) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetClient makes the MapPinTracker ready to perform RPC requests to
|
||||||
|
// other components.
|
||||||
func (mpt *MapPinTracker) SetClient(c *rpc.Client) {
|
func (mpt *MapPinTracker) SetClient(c *rpc.Client) {
|
||||||
mpt.rpcClient = c
|
mpt.rpcClient = c
|
||||||
mpt.rpcReady <- struct{}{}
|
mpt.rpcReady <- struct{}{}
|
||||||
|
|
11
map_state.go
11
map_state.go
|
@ -6,19 +6,21 @@ import (
|
||||||
cid "github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MapState is a very simple database to store
|
// MapState is a very simple database to store the state of the system
|
||||||
// the state of the system.
|
// using a Go map. It is thread safe. It implements the State interface.
|
||||||
type MapState struct {
|
type MapState struct {
|
||||||
mux sync.RWMutex
|
mux sync.RWMutex
|
||||||
PinMap map[string]struct{}
|
PinMap map[string]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewMapState initializes the internal map and returns a new MapState object.
|
||||||
func NewMapState() *MapState {
|
func NewMapState() *MapState {
|
||||||
return &MapState{
|
return &MapState{
|
||||||
PinMap: make(map[string]struct{}),
|
PinMap: make(map[string]struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddPin adds a Cid to the internal map.
|
||||||
func (st *MapState) AddPin(c *cid.Cid) error {
|
func (st *MapState) AddPin(c *cid.Cid) error {
|
||||||
st.mux.Lock()
|
st.mux.Lock()
|
||||||
defer st.mux.Unlock()
|
defer st.mux.Unlock()
|
||||||
|
@ -27,6 +29,7 @@ func (st *MapState) AddPin(c *cid.Cid) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RmPin removes a Cid from the internal map.
|
||||||
func (st *MapState) RmPin(c *cid.Cid) error {
|
func (st *MapState) RmPin(c *cid.Cid) error {
|
||||||
st.mux.Lock()
|
st.mux.Lock()
|
||||||
defer st.mux.Unlock()
|
defer st.mux.Unlock()
|
||||||
|
@ -34,6 +37,7 @@ func (st *MapState) RmPin(c *cid.Cid) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasPin returns true if the Cid belongs to the State.
|
||||||
func (st *MapState) HasPin(c *cid.Cid) bool {
|
func (st *MapState) HasPin(c *cid.Cid) bool {
|
||||||
st.mux.RLock()
|
st.mux.RLock()
|
||||||
defer st.mux.RUnlock()
|
defer st.mux.RUnlock()
|
||||||
|
@ -41,11 +45,12 @@ func (st *MapState) HasPin(c *cid.Cid) bool {
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListPins provides a list of Cids in the State.
|
||||||
func (st *MapState) ListPins() []*cid.Cid {
|
func (st *MapState) ListPins() []*cid.Cid {
|
||||||
st.mux.RLock()
|
st.mux.RLock()
|
||||||
defer st.mux.RUnlock()
|
defer st.mux.RUnlock()
|
||||||
cids := make([]*cid.Cid, 0, len(st.PinMap))
|
cids := make([]*cid.Cid, 0, len(st.PinMap))
|
||||||
for k, _ := range st.PinMap {
|
for k := range st.PinMap {
|
||||||
c, _ := cid.Decode(k)
|
c, _ := cid.Decode(k)
|
||||||
cids = append(cids, c)
|
cids = append(cids, c)
|
||||||
}
|
}
|
||||||
|
|
|
@ -103,7 +103,7 @@ func TestRESTAPIPinEndpoint(t *testing.T) {
|
||||||
|
|
||||||
errResp := errorResp{}
|
errResp := errorResp{}
|
||||||
makePost(t, "/pins/"+errorCid, &errResp)
|
makePost(t, "/pins/"+errorCid, &errResp)
|
||||||
if errResp.Message != badCidError.Error() {
|
if errResp.Message != errBadCid.Error() {
|
||||||
t.Error("expected different error: ", errResp.Message)
|
t.Error("expected different error: ", errResp.Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,7 +122,7 @@ func TestRESTAPIUnpinEndpoint(t *testing.T) {
|
||||||
|
|
||||||
errResp := errorResp{}
|
errResp := errorResp{}
|
||||||
makeDelete(t, "/pins/"+errorCid, &errResp)
|
makeDelete(t, "/pins/"+errorCid, &errResp)
|
||||||
if errResp.Message != badCidError.Error() {
|
if errResp.Message != errBadCid.Error() {
|
||||||
t.Error("expected different error: ", errResp.Message)
|
t.Error("expected different error: ", errResp.Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
36
rpc_api.go
36
rpc_api.go
|
@ -5,18 +5,33 @@ import (
|
||||||
peer "github.com/libp2p/go-libp2p-peer"
|
peer "github.com/libp2p/go-libp2p-peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// RPCAPI is a go-libp2p-rpc service which provides the internal ipfs-cluster
|
||||||
|
// API, which enables components and members of the cluster to communicate and
|
||||||
|
// request actions from each other.
|
||||||
|
//
|
||||||
|
// The RPC API methods are usually redirects to the actual methods in
|
||||||
|
// the different components of ipfs-cluster, with very little added logic.
|
||||||
|
// Refer to documentation on those methods for details on their behaviour.
|
||||||
type RPCAPI struct {
|
type RPCAPI struct {
|
||||||
cluster *Cluster
|
cluster *Cluster
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CidArg is an arguments that carry a Cid. It may carry more things in the
|
||||||
|
// future.
|
||||||
type CidArg struct {
|
type CidArg struct {
|
||||||
Cid string
|
Cid string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewCidArg returns a CidArg which carries the given Cid. It panics if it is
|
||||||
|
// nil.
|
||||||
func NewCidArg(c *cid.Cid) *CidArg {
|
func NewCidArg(c *cid.Cid) *CidArg {
|
||||||
|
if c == nil {
|
||||||
|
panic("Cid cannot be nil")
|
||||||
|
}
|
||||||
return &CidArg{c.String()}
|
return &CidArg{c.String()}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CID decodes and returns a Cid from a CidArg.
|
||||||
func (arg *CidArg) CID() (*cid.Cid, error) {
|
func (arg *CidArg) CID() (*cid.Cid, error) {
|
||||||
c, err := cid.Decode(arg.Cid)
|
c, err := cid.Decode(arg.Cid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -29,6 +44,7 @@ func (arg *CidArg) CID() (*cid.Cid, error) {
|
||||||
Cluster components methods
|
Cluster components methods
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Pin runs Cluster.Pin().
|
||||||
func (api *RPCAPI) Pin(in *CidArg, out *struct{}) error {
|
func (api *RPCAPI) Pin(in *CidArg, out *struct{}) error {
|
||||||
c, err := in.CID()
|
c, err := in.CID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -37,6 +53,7 @@ func (api *RPCAPI) Pin(in *CidArg, out *struct{}) error {
|
||||||
return api.cluster.Pin(c)
|
return api.cluster.Pin(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unpin runs Cluster.Unpin().
|
||||||
func (api *RPCAPI) Unpin(in *CidArg, out *struct{}) error {
|
func (api *RPCAPI) Unpin(in *CidArg, out *struct{}) error {
|
||||||
c, err := in.CID()
|
c, err := in.CID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -45,6 +62,7 @@ func (api *RPCAPI) Unpin(in *CidArg, out *struct{}) error {
|
||||||
return api.cluster.Unpin(c)
|
return api.cluster.Unpin(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PinList runs Cluster.Pins().
|
||||||
func (api *RPCAPI) PinList(in struct{}, out *[]string) error {
|
func (api *RPCAPI) PinList(in struct{}, out *[]string) error {
|
||||||
cidList := api.cluster.Pins()
|
cidList := api.cluster.Pins()
|
||||||
cidStrList := make([]string, 0, len(cidList))
|
cidStrList := make([]string, 0, len(cidList))
|
||||||
|
@ -55,22 +73,26 @@ func (api *RPCAPI) PinList(in struct{}, out *[]string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Version runs Cluster.Version().
|
||||||
func (api *RPCAPI) Version(in struct{}, out *string) error {
|
func (api *RPCAPI) Version(in struct{}, out *string) error {
|
||||||
*out = api.cluster.Version()
|
*out = api.cluster.Version()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MemberList runs Cluster.Members().
|
||||||
func (api *RPCAPI) MemberList(in struct{}, out *[]peer.ID) error {
|
func (api *RPCAPI) MemberList(in struct{}, out *[]peer.ID) error {
|
||||||
*out = api.cluster.Members()
|
*out = api.cluster.Members()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Status runs Cluster.Status().
|
||||||
func (api *RPCAPI) Status(in struct{}, out *[]GlobalPinInfo) error {
|
func (api *RPCAPI) Status(in struct{}, out *[]GlobalPinInfo) error {
|
||||||
pinfo, err := api.cluster.Status()
|
pinfo, err := api.cluster.Status()
|
||||||
*out = pinfo
|
*out = pinfo
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StatusCid runs Cluster.StatusCid().
|
||||||
func (api *RPCAPI) StatusCid(in *CidArg, out *GlobalPinInfo) error {
|
func (api *RPCAPI) StatusCid(in *CidArg, out *GlobalPinInfo) error {
|
||||||
c, err := in.CID()
|
c, err := in.CID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -81,12 +103,14 @@ func (api *RPCAPI) StatusCid(in *CidArg, out *GlobalPinInfo) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LocalSync runs Cluster.LocalSync().
|
||||||
func (api *RPCAPI) LocalSync(in struct{}, out *[]PinInfo) error {
|
func (api *RPCAPI) LocalSync(in struct{}, out *[]PinInfo) error {
|
||||||
pinfo, err := api.cluster.LocalSync()
|
pinfo, err := api.cluster.LocalSync()
|
||||||
*out = pinfo
|
*out = pinfo
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LocalSyncCid runs Cluster.LocalSyncCid().
|
||||||
func (api *RPCAPI) LocalSyncCid(in *CidArg, out *PinInfo) error {
|
func (api *RPCAPI) LocalSyncCid(in *CidArg, out *PinInfo) error {
|
||||||
c, err := in.CID()
|
c, err := in.CID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -97,12 +121,14 @@ func (api *RPCAPI) LocalSyncCid(in *CidArg, out *PinInfo) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GlobalSync runs Cluster.GlobalSync().
|
||||||
func (api *RPCAPI) GlobalSync(in struct{}, out *[]GlobalPinInfo) error {
|
func (api *RPCAPI) GlobalSync(in struct{}, out *[]GlobalPinInfo) error {
|
||||||
pinfo, err := api.cluster.GlobalSync()
|
pinfo, err := api.cluster.GlobalSync()
|
||||||
*out = pinfo
|
*out = pinfo
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GlobalSyncCid runs Cluster.GlobalSyncCid().
|
||||||
func (api *RPCAPI) GlobalSyncCid(in *CidArg, out *GlobalPinInfo) error {
|
func (api *RPCAPI) GlobalSyncCid(in *CidArg, out *GlobalPinInfo) error {
|
||||||
c, err := in.CID()
|
c, err := in.CID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -113,6 +139,7 @@ func (api *RPCAPI) GlobalSyncCid(in *CidArg, out *GlobalPinInfo) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StateSync runs Cluster.StateSync().
|
||||||
func (api *RPCAPI) StateSync(in struct{}, out *[]PinInfo) error {
|
func (api *RPCAPI) StateSync(in struct{}, out *[]PinInfo) error {
|
||||||
pinfo, err := api.cluster.StateSync()
|
pinfo, err := api.cluster.StateSync()
|
||||||
*out = pinfo
|
*out = pinfo
|
||||||
|
@ -123,6 +150,7 @@ func (api *RPCAPI) StateSync(in struct{}, out *[]PinInfo) error {
|
||||||
Tracker component methods
|
Tracker component methods
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Track runs PinTracker.Track().
|
||||||
func (api *RPCAPI) Track(in *CidArg, out *struct{}) error {
|
func (api *RPCAPI) Track(in *CidArg, out *struct{}) error {
|
||||||
c, err := in.CID()
|
c, err := in.CID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -131,6 +159,7 @@ func (api *RPCAPI) Track(in *CidArg, out *struct{}) error {
|
||||||
return api.cluster.tracker.Track(c)
|
return api.cluster.tracker.Track(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Untrack runs PinTracker.Untrack().
|
||||||
func (api *RPCAPI) Untrack(in *CidArg, out *struct{}) error {
|
func (api *RPCAPI) Untrack(in *CidArg, out *struct{}) error {
|
||||||
c, err := in.CID()
|
c, err := in.CID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -139,11 +168,13 @@ func (api *RPCAPI) Untrack(in *CidArg, out *struct{}) error {
|
||||||
return api.cluster.tracker.Untrack(c)
|
return api.cluster.tracker.Untrack(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TrackerStatus runs PinTracker.Status().
|
||||||
func (api *RPCAPI) TrackerStatus(in struct{}, out *[]PinInfo) error {
|
func (api *RPCAPI) TrackerStatus(in struct{}, out *[]PinInfo) error {
|
||||||
*out = api.cluster.tracker.Status()
|
*out = api.cluster.tracker.Status()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TrackerStatusCid runs PinTracker.StatusCid().
|
||||||
func (api *RPCAPI) TrackerStatusCid(in *CidArg, out *PinInfo) error {
|
func (api *RPCAPI) TrackerStatusCid(in *CidArg, out *PinInfo) error {
|
||||||
c, err := in.CID()
|
c, err := in.CID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -160,6 +191,7 @@ func (api *RPCAPI) TrackerStatusCid(in *CidArg, out *PinInfo) error {
|
||||||
IPFS Connector component methods
|
IPFS Connector component methods
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// IPFSPin runs IPFSConnector.Pin().
|
||||||
func (api *RPCAPI) IPFSPin(in *CidArg, out *struct{}) error {
|
func (api *RPCAPI) IPFSPin(in *CidArg, out *struct{}) error {
|
||||||
c, err := in.CID()
|
c, err := in.CID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -168,6 +200,7 @@ func (api *RPCAPI) IPFSPin(in *CidArg, out *struct{}) error {
|
||||||
return api.cluster.ipfs.Pin(c)
|
return api.cluster.ipfs.Pin(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IPFSUnpin runs IPFSConnector.Unpin().
|
||||||
func (api *RPCAPI) IPFSUnpin(in *CidArg, out *struct{}) error {
|
func (api *RPCAPI) IPFSUnpin(in *CidArg, out *struct{}) error {
|
||||||
c, err := in.CID()
|
c, err := in.CID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -176,6 +209,7 @@ func (api *RPCAPI) IPFSUnpin(in *CidArg, out *struct{}) error {
|
||||||
return api.cluster.ipfs.Unpin(c)
|
return api.cluster.ipfs.Unpin(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IPFSIsPinned runs IPFSConnector.IsPinned().
|
||||||
func (api *RPCAPI) IPFSIsPinned(in *CidArg, out *bool) error {
|
func (api *RPCAPI) IPFSIsPinned(in *CidArg, out *bool) error {
|
||||||
c, err := in.CID()
|
c, err := in.CID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -190,6 +224,7 @@ func (api *RPCAPI) IPFSIsPinned(in *CidArg, out *bool) error {
|
||||||
Consensus component methods
|
Consensus component methods
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// ConsensusLogPin runs Consensus.LogPin().
|
||||||
func (api *RPCAPI) ConsensusLogPin(in *CidArg, out *struct{}) error {
|
func (api *RPCAPI) ConsensusLogPin(in *CidArg, out *struct{}) error {
|
||||||
c, err := in.CID()
|
c, err := in.CID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -198,6 +233,7 @@ func (api *RPCAPI) ConsensusLogPin(in *CidArg, out *struct{}) error {
|
||||||
return api.cluster.consensus.LogPin(c)
|
return api.cluster.consensus.LogPin(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConsensusLogUnpin runs Consensus.LogUnpin().
|
||||||
func (api *RPCAPI) ConsensusLogUnpin(in *CidArg, out *struct{}) error {
|
func (api *RPCAPI) ConsensusLogUnpin(in *CidArg, out *struct{}) error {
|
||||||
c, err := in.CID()
|
c, err := in.CID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
peer "github.com/libp2p/go-libp2p-peer"
|
peer "github.com/libp2p/go-libp2p-peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
var badCidError = errors.New("used the error cid so we error the op")
|
var errBadCid = errors.New("used the error cid so we error the op")
|
||||||
|
|
||||||
type mockService struct{}
|
type mockService struct{}
|
||||||
|
|
||||||
|
@ -26,14 +26,14 @@ func mockRPCClient(t *testing.T) *rpc.Client {
|
||||||
|
|
||||||
func (mock *mockService) Pin(in *CidArg, out *struct{}) error {
|
func (mock *mockService) Pin(in *CidArg, out *struct{}) error {
|
||||||
if in.Cid == errorCid {
|
if in.Cid == errorCid {
|
||||||
return badCidError
|
return errBadCid
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mock *mockService) Unpin(in *CidArg, out *struct{}) error {
|
func (mock *mockService) Unpin(in *CidArg, out *struct{}) error {
|
||||||
if in.Cid == errorCid {
|
if in.Cid == errorCid {
|
||||||
return badCidError
|
return errBadCid
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -97,7 +97,7 @@ func (mock *mockService) Status(in struct{}, out *[]GlobalPinInfo) error {
|
||||||
|
|
||||||
func (mock *mockService) StatusCid(in *CidArg, out *GlobalPinInfo) error {
|
func (mock *mockService) StatusCid(in *CidArg, out *GlobalPinInfo) error {
|
||||||
if in.Cid == errorCid {
|
if in.Cid == errorCid {
|
||||||
return badCidError
|
return errBadCid
|
||||||
}
|
}
|
||||||
c1, _ := cid.Decode(testCid1)
|
c1, _ := cid.Decode(testCid1)
|
||||||
*out = GlobalPinInfo{
|
*out = GlobalPinInfo{
|
||||||
|
|
Loading…
Reference in New Issue
Block a user