da24114ae0
The IPFS pin/update endpoint takes two arguments and usually unpins the first and pins the second. It is a bit more efficient to do it in a single operation than two separate ones. This will make the proxy endpoint hijack pin/update requests. First, the FROM pin is fetched from the state. If present, we set the options (replication factors, actual allocations) from that pin to the new one. Then we pin the TO item and proceed to unpin the FROM item when `unpin` is not false. We need to support path resolving, just like IPFS, therefore it was necessary to expose IPFSResolve() via RPC.
493 lines
13 KiB
Go
493 lines
13 KiB
Go
package ipfscluster
|
|
|
|
import (
|
|
"context"
|
|
|
|
peer "github.com/libp2p/go-libp2p-peer"
|
|
|
|
"go.opencensus.io/trace"
|
|
|
|
cid "github.com/ipfs/go-cid"
|
|
|
|
"github.com/ipfs/ipfs-cluster/api"
|
|
)
|
|
|
|
// RPCAPI is a go-libp2p-gorpc service which provides the internal ipfs-cluster
|
|
// API, which enables components and cluster peers to communicate and
|
|
// request actions from each other.
|
|
//
|
|
// The RPC API methods are usually redirects to the actual methods in
|
|
// the different components of ipfs-cluster, with very little added logic.
|
|
// Refer to documentation on those methods for details on their behaviour.
|
|
type RPCAPI struct {
|
|
c *Cluster
|
|
}
|
|
|
|
/*
|
|
Cluster components methods
|
|
*/
|
|
|
|
// ID runs Cluster.ID()
|
|
func (rpcapi *RPCAPI) ID(ctx context.Context, in struct{}, out *api.ID) error {
|
|
id := rpcapi.c.ID(ctx)
|
|
*out = *id
|
|
return nil
|
|
}
|
|
|
|
// Pin runs Cluster.Pin().
|
|
func (rpcapi *RPCAPI) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
|
|
return rpcapi.c.Pin(ctx, in)
|
|
}
|
|
|
|
// Unpin runs Cluster.Unpin().
|
|
func (rpcapi *RPCAPI) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error {
|
|
return rpcapi.c.Unpin(ctx, in.Cid)
|
|
}
|
|
|
|
// PinPath resolves path into a cid and runs Cluster.Pin().
|
|
func (rpcapi *RPCAPI) PinPath(ctx context.Context, in *api.PinPath, out *api.Pin) error {
|
|
pin, err := rpcapi.c.PinPath(ctx, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = *pin
|
|
return nil
|
|
}
|
|
|
|
// UnpinPath resolves path into a cid and runs Cluster.Unpin().
|
|
func (rpcapi *RPCAPI) UnpinPath(ctx context.Context, in *api.PinPath, out *api.Pin) error {
|
|
pin, err := rpcapi.c.UnpinPath(ctx, in.Path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = *pin
|
|
return nil
|
|
}
|
|
|
|
// Pins runs Cluster.Pins().
|
|
func (rpcapi *RPCAPI) Pins(ctx context.Context, in struct{}, out *[]*api.Pin) error {
|
|
cidList, err := rpcapi.c.Pins(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = cidList
|
|
return nil
|
|
}
|
|
|
|
// PinGet runs Cluster.PinGet().
|
|
func (rpcapi *RPCAPI) PinGet(ctx context.Context, in cid.Cid, out *api.Pin) error {
|
|
pin, err := rpcapi.c.PinGet(ctx, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = *pin
|
|
return nil
|
|
}
|
|
|
|
// Version runs Cluster.Version().
|
|
func (rpcapi *RPCAPI) Version(ctx context.Context, in struct{}, out *api.Version) error {
|
|
*out = api.Version{
|
|
Version: rpcapi.c.Version(),
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Peers runs Cluster.Peers().
|
|
func (rpcapi *RPCAPI) Peers(ctx context.Context, in struct{}, out *[]*api.ID) error {
|
|
*out = rpcapi.c.Peers(ctx)
|
|
return nil
|
|
}
|
|
|
|
// PeerAdd runs Cluster.PeerAdd().
|
|
func (rpcapi *RPCAPI) PeerAdd(ctx context.Context, in peer.ID, out *api.ID) error {
|
|
id, err := rpcapi.c.PeerAdd(ctx, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = *id
|
|
return nil
|
|
}
|
|
|
|
// ConnectGraph runs Cluster.GetConnectGraph().
|
|
func (rpcapi *RPCAPI) ConnectGraph(ctx context.Context, in struct{}, out *api.ConnectGraph) error {
|
|
graph, err := rpcapi.c.ConnectGraph()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = graph
|
|
return nil
|
|
}
|
|
|
|
// PeerRemove runs Cluster.PeerRm().
|
|
func (rpcapi *RPCAPI) PeerRemove(ctx context.Context, in peer.ID, out *struct{}) error {
|
|
return rpcapi.c.PeerRemove(ctx, in)
|
|
}
|
|
|
|
// Join runs Cluster.Join().
|
|
func (rpcapi *RPCAPI) Join(ctx context.Context, in api.Multiaddr, out *struct{}) error {
|
|
return rpcapi.c.Join(ctx, in.Value())
|
|
}
|
|
|
|
// StatusAll runs Cluster.StatusAll().
|
|
func (rpcapi *RPCAPI) StatusAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error {
|
|
pinfos, err := rpcapi.c.StatusAll(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = pinfos
|
|
return nil
|
|
}
|
|
|
|
// StatusAllLocal runs Cluster.StatusAllLocal().
|
|
func (rpcapi *RPCAPI) StatusAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
|
|
pinfos := rpcapi.c.StatusAllLocal(ctx)
|
|
*out = pinfos
|
|
return nil
|
|
}
|
|
|
|
// Status runs Cluster.Status().
|
|
func (rpcapi *RPCAPI) Status(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error {
|
|
pinfo, err := rpcapi.c.Status(ctx, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = *pinfo
|
|
return nil
|
|
}
|
|
|
|
// StatusLocal runs Cluster.StatusLocal().
|
|
func (rpcapi *RPCAPI) StatusLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error {
|
|
pinfo := rpcapi.c.StatusLocal(ctx, in)
|
|
*out = *pinfo
|
|
return nil
|
|
}
|
|
|
|
// SyncAll runs Cluster.SyncAll().
|
|
func (rpcapi *RPCAPI) SyncAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error {
|
|
pinfos, err := rpcapi.c.SyncAll(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = pinfos
|
|
return nil
|
|
}
|
|
|
|
// SyncAllLocal runs Cluster.SyncAllLocal().
|
|
func (rpcapi *RPCAPI) SyncAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
|
|
pinfos, err := rpcapi.c.SyncAllLocal(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = pinfos
|
|
return nil
|
|
}
|
|
|
|
// Sync runs Cluster.Sync().
|
|
func (rpcapi *RPCAPI) Sync(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error {
|
|
pinfo, err := rpcapi.c.Sync(ctx, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = *pinfo
|
|
return nil
|
|
}
|
|
|
|
// SyncLocal runs Cluster.SyncLocal().
|
|
func (rpcapi *RPCAPI) SyncLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error {
|
|
pinfo, err := rpcapi.c.SyncLocal(ctx, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = *pinfo
|
|
return nil
|
|
}
|
|
|
|
// RecoverAllLocal runs Cluster.RecoverAllLocal().
|
|
func (rpcapi *RPCAPI) RecoverAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
|
|
pinfos, err := rpcapi.c.RecoverAllLocal(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = pinfos
|
|
return nil
|
|
}
|
|
|
|
// Recover runs Cluster.Recover().
|
|
func (rpcapi *RPCAPI) Recover(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error {
|
|
pinfo, err := rpcapi.c.Recover(ctx, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = *pinfo
|
|
return nil
|
|
}
|
|
|
|
// RecoverLocal runs Cluster.RecoverLocal().
|
|
func (rpcapi *RPCAPI) RecoverLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error {
|
|
pinfo, err := rpcapi.c.RecoverLocal(ctx, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = *pinfo
|
|
return nil
|
|
}
|
|
|
|
// BlockAllocate returns allocations for blocks. This is used in the adders.
|
|
// It's different from pin allocations when ReplicationFactor < 0.
|
|
func (rpcapi *RPCAPI) BlockAllocate(ctx context.Context, in *api.Pin, out *[]peer.ID) error {
|
|
err := rpcapi.c.setupPin(ctx, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Return the current peer list.
|
|
if in.ReplicationFactorMin < 0 {
|
|
// Returned metrics are Valid and belong to current
|
|
// Cluster peers.
|
|
metrics := rpcapi.c.monitor.LatestMetrics(ctx, pingMetricName)
|
|
peers := make([]peer.ID, len(metrics), len(metrics))
|
|
for i, m := range metrics {
|
|
peers[i] = m.Peer
|
|
}
|
|
|
|
*out = peers
|
|
return nil
|
|
}
|
|
|
|
allocs, err := rpcapi.c.allocate(
|
|
ctx,
|
|
in.Cid,
|
|
in.ReplicationFactorMin,
|
|
in.ReplicationFactorMax,
|
|
[]peer.ID{}, // blacklist
|
|
[]peer.ID{}, // prio list
|
|
)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
*out = allocs
|
|
return nil
|
|
}
|
|
|
|
// SendInformerMetric runs Cluster.sendInformerMetric().
|
|
func (rpcapi *RPCAPI) SendInformerMetric(ctx context.Context, in struct{}, out *api.Metric) error {
|
|
m, err := rpcapi.c.sendInformerMetric(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = *m
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
Tracker component methods
|
|
*/
|
|
|
|
// Track runs PinTracker.Track().
|
|
func (rpcapi *RPCAPI) Track(ctx context.Context, in *api.Pin, out *struct{}) error {
|
|
ctx, span := trace.StartSpan(ctx, "rpc/tracker/Track")
|
|
defer span.End()
|
|
return rpcapi.c.tracker.Track(ctx, in)
|
|
}
|
|
|
|
// Untrack runs PinTracker.Untrack().
|
|
func (rpcapi *RPCAPI) Untrack(ctx context.Context, in *api.Pin, out *struct{}) error {
|
|
ctx, span := trace.StartSpan(ctx, "rpc/tracker/Untrack")
|
|
defer span.End()
|
|
return rpcapi.c.tracker.Untrack(ctx, in.Cid)
|
|
}
|
|
|
|
// TrackerStatusAll runs PinTracker.StatusAll().
|
|
func (rpcapi *RPCAPI) TrackerStatusAll(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
|
|
ctx, span := trace.StartSpan(ctx, "rpc/tracker/StatusAll")
|
|
defer span.End()
|
|
*out = rpcapi.c.tracker.StatusAll(ctx)
|
|
return nil
|
|
}
|
|
|
|
// TrackerStatus runs PinTracker.Status().
|
|
func (rpcapi *RPCAPI) TrackerStatus(ctx context.Context, in cid.Cid, out *api.PinInfo) error {
|
|
ctx, span := trace.StartSpan(ctx, "rpc/tracker/Status")
|
|
defer span.End()
|
|
pinfo := rpcapi.c.tracker.Status(ctx, in)
|
|
*out = *pinfo
|
|
return nil
|
|
}
|
|
|
|
// TrackerRecoverAll runs PinTracker.RecoverAll().f
|
|
func (rpcapi *RPCAPI) TrackerRecoverAll(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
|
|
ctx, span := trace.StartSpan(ctx, "rpc/tracker/RecoverAll")
|
|
defer span.End()
|
|
pinfos, err := rpcapi.c.tracker.RecoverAll(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = pinfos
|
|
return nil
|
|
}
|
|
|
|
// TrackerRecover runs PinTracker.Recover().
|
|
func (rpcapi *RPCAPI) TrackerRecover(ctx context.Context, in cid.Cid, out *api.PinInfo) error {
|
|
ctx, span := trace.StartSpan(ctx, "rpc/tracker/Recover")
|
|
defer span.End()
|
|
pinfo, err := rpcapi.c.tracker.Recover(ctx, in)
|
|
*out = *pinfo
|
|
return err
|
|
}
|
|
|
|
/*
|
|
IPFS Connector component methods
|
|
*/
|
|
|
|
// IPFSPin runs IPFSConnector.Pin().
|
|
func (rpcapi *RPCAPI) IPFSPin(ctx context.Context, in *api.Pin, out *struct{}) error {
|
|
ctx, span := trace.StartSpan(ctx, "rpc/ipfsconn/IPFSPin")
|
|
defer span.End()
|
|
return rpcapi.c.ipfs.Pin(ctx, in.Cid, in.MaxDepth)
|
|
}
|
|
|
|
// IPFSUnpin runs IPFSConnector.Unpin().
|
|
func (rpcapi *RPCAPI) IPFSUnpin(ctx context.Context, in *api.Pin, out *struct{}) error {
|
|
return rpcapi.c.ipfs.Unpin(ctx, in.Cid)
|
|
}
|
|
|
|
// IPFSPinLsCid runs IPFSConnector.PinLsCid().
|
|
func (rpcapi *RPCAPI) IPFSPinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPinStatus) error {
|
|
b, err := rpcapi.c.ipfs.PinLsCid(ctx, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = b
|
|
return nil
|
|
}
|
|
|
|
// IPFSPinLs runs IPFSConnector.PinLs().
|
|
func (rpcapi *RPCAPI) IPFSPinLs(ctx context.Context, in string, out *map[string]api.IPFSPinStatus) error {
|
|
m, err := rpcapi.c.ipfs.PinLs(ctx, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = m
|
|
return nil
|
|
}
|
|
|
|
// IPFSConnectSwarms runs IPFSConnector.ConnectSwarms().
|
|
func (rpcapi *RPCAPI) IPFSConnectSwarms(ctx context.Context, in struct{}, out *struct{}) error {
|
|
err := rpcapi.c.ipfs.ConnectSwarms(ctx)
|
|
return err
|
|
}
|
|
|
|
// IPFSConfigKey runs IPFSConnector.ConfigKey().
|
|
func (rpcapi *RPCAPI) IPFSConfigKey(ctx context.Context, in string, out *interface{}) error {
|
|
res, err := rpcapi.c.ipfs.ConfigKey(in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = res
|
|
return nil
|
|
}
|
|
|
|
// IPFSRepoStat runs IPFSConnector.RepoStat().
|
|
func (rpcapi *RPCAPI) IPFSRepoStat(ctx context.Context, in struct{}, out *api.IPFSRepoStat) error {
|
|
res, err := rpcapi.c.ipfs.RepoStat(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = *res
|
|
return err
|
|
}
|
|
|
|
// IPFSSwarmPeers runs IPFSConnector.SwarmPeers().
|
|
func (rpcapi *RPCAPI) IPFSSwarmPeers(ctx context.Context, in struct{}, out *[]peer.ID) error {
|
|
res, err := rpcapi.c.ipfs.SwarmPeers(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = res
|
|
return nil
|
|
}
|
|
|
|
// IPFSBlockPut runs IPFSConnector.BlockPut().
|
|
func (rpcapi *RPCAPI) IPFSBlockPut(ctx context.Context, in *api.NodeWithMeta, out *struct{}) error {
|
|
return rpcapi.c.ipfs.BlockPut(ctx, in)
|
|
}
|
|
|
|
// IPFSBlockGet runs IPFSConnector.BlockGet().
|
|
func (rpcapi *RPCAPI) IPFSBlockGet(ctx context.Context, in cid.Cid, out *[]byte) error {
|
|
res, err := rpcapi.c.ipfs.BlockGet(ctx, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = res
|
|
return nil
|
|
}
|
|
|
|
// IPFSResolve runs IPFSConnector.Resolve().
|
|
func (rpcapi *RPCAPI) IPFSResolve(ctx context.Context, in string, out *cid.Cid) error {
|
|
c, err := rpcapi.c.ipfs.Resolve(ctx, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = c
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
Consensus component methods
|
|
*/
|
|
|
|
// ConsensusLogPin runs Consensus.LogPin().
|
|
func (rpcapi *RPCAPI) ConsensusLogPin(ctx context.Context, in *api.Pin, out *struct{}) error {
|
|
ctx, span := trace.StartSpan(ctx, "rpc/consensus/LogPin")
|
|
defer span.End()
|
|
return rpcapi.c.consensus.LogPin(ctx, in)
|
|
}
|
|
|
|
// ConsensusLogUnpin runs Consensus.LogUnpin().
|
|
func (rpcapi *RPCAPI) ConsensusLogUnpin(ctx context.Context, in *api.Pin, out *struct{}) error {
|
|
ctx, span := trace.StartSpan(ctx, "rpc/consensus/LogUnpin")
|
|
defer span.End()
|
|
return rpcapi.c.consensus.LogUnpin(ctx, in)
|
|
}
|
|
|
|
// ConsensusAddPeer runs Consensus.AddPeer().
|
|
func (rpcapi *RPCAPI) ConsensusAddPeer(ctx context.Context, in peer.ID, out *struct{}) error {
|
|
ctx, span := trace.StartSpan(ctx, "rpc/consensus/AddPeer")
|
|
defer span.End()
|
|
return rpcapi.c.consensus.AddPeer(ctx, in)
|
|
}
|
|
|
|
// ConsensusRmPeer runs Consensus.RmPeer().
|
|
func (rpcapi *RPCAPI) ConsensusRmPeer(ctx context.Context, in peer.ID, out *struct{}) error {
|
|
ctx, span := trace.StartSpan(ctx, "rpc/consensus/RmPeer")
|
|
defer span.End()
|
|
return rpcapi.c.consensus.RmPeer(ctx, in)
|
|
}
|
|
|
|
// ConsensusPeers runs Consensus.Peers().
|
|
func (rpcapi *RPCAPI) ConsensusPeers(ctx context.Context, in struct{}, out *[]peer.ID) error {
|
|
peers, err := rpcapi.c.consensus.Peers(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*out = peers
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
PeerMonitor
|
|
*/
|
|
|
|
// PeerMonitorLogMetric runs PeerMonitor.LogMetric().
|
|
func (rpcapi *RPCAPI) PeerMonitorLogMetric(ctx context.Context, in *api.Metric, out *struct{}) error {
|
|
return rpcapi.c.monitor.LogMetric(ctx, in)
|
|
}
|
|
|
|
// PeerMonitorLatestMetrics runs PeerMonitor.LatestMetrics().
|
|
func (rpcapi *RPCAPI) PeerMonitorLatestMetrics(ctx context.Context, in string, out *[]*api.Metric) error {
|
|
*out = rpcapi.c.monitor.LatestMetrics(ctx, in)
|
|
return nil
|
|
}
|