Fix #87: Implement ipfs-cluster-ctl pin ls <cid>

I have updated API endpoints to be /allocations rather than /pinlinst

It's more self-explanatory.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
Hector Sanjuan 2017-04-06 04:27:02 +02:00
parent 4ae6486e26
commit bb82c27b25
11 changed files with 196 additions and 49 deletions

View File

@ -122,7 +122,8 @@ This is a quick summary of API endpoints offered by the Rest API component (thes
|GET |/peers |Cluster peers|
|POST |/peers |Add new peer|
|DELETE|/peers/{peerID} |Remove a peer|
|GET |/pinlist |List of pins in the consensus state|
|GET |/allocations |List of pins and their allocations (consensus-shared state)|
|GET |/allocations/{cid} |Show a single pin and its allocations (from the consensus-shared state)|
|GET |/pins |Status of all tracked CIDs|
|POST |/pins/sync |Sync all|
|GET |/pins/{cid} |Status of single CID|

View File

@ -147,12 +147,17 @@ func (rest *RESTAPI) routes() []route {
},
{
"Pins",
"Allocations",
"GET",
"/pinlist",
rest.pinListHandler,
"/allocations",
rest.allocationsHandler,
},
{
"Allocation",
"GET",
"/allocations/{hash}",
rest.allocationHandler,
},
{
"StatusAll",
"GET",
@ -334,16 +339,32 @@ func (rest *RESTAPI) unpinHandler(w http.ResponseWriter, r *http.Request) {
}
}
func (rest *RESTAPI) pinListHandler(w http.ResponseWriter, r *http.Request) {
func (rest *RESTAPI) allocationsHandler(w http.ResponseWriter, r *http.Request) {
var pins []api.PinSerial
err := rest.rpcClient.Call("",
"Cluster",
"PinList",
"Pins",
struct{}{},
&pins)
sendResponse(w, err, pins)
}
func (rest *RESTAPI) allocationHandler(w http.ResponseWriter, r *http.Request) {
if c := parseCidOrError(w, r); c.Cid != "" {
var pin api.PinSerial
err := rest.rpcClient.Call("",
"Cluster",
"PinGet",
c,
&pin)
if err != nil { // errors here are 404s
sendErrorResponse(w, 404, err.Error())
return
}
sendJSONResponse(w, 200, pin)
}
}
func (rest *RESTAPI) statusAllHandler(w http.ResponseWriter, r *http.Request) {
var pinInfos []api.GlobalPinInfoSerial
err := rest.rpcClient.Call("",
@ -464,6 +485,7 @@ func sendAcceptedResponse(w http.ResponseWriter, rpcErr error) {
}
func sendJSONResponse(w http.ResponseWriter, code int, resp interface{}) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(code)
if err := json.NewEncoder(w).Encode(resp); err != nil {
panic(err)

View File

@ -188,12 +188,12 @@ func TestRESTAPIUnpinEndpoint(t *testing.T) {
}
}
func TestRESTAPIPinListEndpoint(t *testing.T) {
func TestRESTAPIAllocationsEndpoint(t *testing.T) {
rest := testRESTAPI(t)
defer rest.Shutdown()
var resp []api.PinSerial
makeGet(t, "/pinlist", &resp)
makeGet(t, "/allocations", &resp)
if len(resp) != 3 ||
resp[0].Cid != test.TestCid1 || resp[1].Cid != test.TestCid2 ||
resp[2].Cid != test.TestCid3 {
@ -201,6 +201,23 @@ func TestRESTAPIPinListEndpoint(t *testing.T) {
}
}
func TestRESTAPIAllocationEndpoint(t *testing.T) {
rest := testRESTAPI(t)
defer rest.Shutdown()
var resp api.PinSerial
makeGet(t, "/allocations/"+test.TestCid1, &resp)
if resp.Cid != test.TestCid1 {
t.Error("cid should be the same")
}
errResp := api.Error{}
makeGet(t, "/allocations/"+test.ErrorCid, &errResp)
if errResp.Code != 404 {
t.Error("a non-pinned cid should 404")
}
}
func TestRESTAPIStatusAllEndpoint(t *testing.T) {
rest := testRESTAPI(t)
defer rest.Shutdown()

View File

@ -732,7 +732,8 @@ func (c *Cluster) Recover(h *cid.Cid) (api.GlobalPinInfo, error) {
// Pins returns the list of Cids managed by Cluster and which are part
// of the current global state. This is the source of truth as to which
// pins are managed, but does not indicate if the item is successfully pinned.
// pins are managed and their allocation, but does not indicate if
// the item is successfully pinned. For that, use StatusAll().
func (c *Cluster) Pins() []api.Pin {
cState, err := c.consensus.State()
if err != nil {
@ -743,6 +744,25 @@ func (c *Cluster) Pins() []api.Pin {
return cState.List()
}
// PinGet returns information for a single Cid managed by Cluster.
// The information is obtained from the current global state. The
// returned api.Pin provides information about the allocations
// assigned for the requested Cid, but does not provide indicate if
// the item is successfully pinned. For that, use Status(). PinGet
// returns an error if the given Cid is not part of the global state.
func (c *Cluster) PinGet(h *cid.Cid) (api.Pin, error) {
cState, err := c.consensus.State()
if err != nil {
logger.Error(err)
return api.Pin{}, err
}
pin := cState.Get(h)
if pin.Cid == nil {
return pin, errors.New("Cid is not part of the global state")
}
return pin, nil
}
// Pin makes the cluster Pin a Cid. This implies adding the Cid
// to the IPFS Cluster peers shared-state. Depending on the cluster
// pinning strategy, the PinTracker may then request the IPFS daemon

View File

@ -193,6 +193,52 @@ func TestClusterPin(t *testing.T) {
}
}
func TestClusterPins(t *testing.T) {
cl, _, _, _, _ := testingCluster(t)
defer cleanRaft()
defer cl.Shutdown()
c, _ := cid.Decode(test.TestCid1)
err := cl.Pin(api.PinCid(c))
if err != nil {
t.Fatal("pin should have worked:", err)
}
pins := cl.Pins()
if len(pins) != 1 {
t.Fatal("pin should be part of the state")
}
if !pins[0].Cid.Equals(c) || pins[0].ReplicationFactor != -1 {
t.Error("the Pin does not look as expected")
}
}
func TestClusterPinGet(t *testing.T) {
cl, _, _, _, _ := testingCluster(t)
defer cleanRaft()
defer cl.Shutdown()
c, _ := cid.Decode(test.TestCid1)
err := cl.Pin(api.PinCid(c))
if err != nil {
t.Fatal("pin should have worked:", err)
}
pin, err := cl.PinGet(c)
if err != nil {
t.Fatal(err)
}
if !pin.Cid.Equals(c) || pin.ReplicationFactor != -1 {
t.Error("the Pin does not look as expected")
}
c2, _ := cid.Decode(test.TestCid2)
_, err = cl.PinGet(c2)
if err == nil {
t.Fatal("expected an error")
}
}
func TestClusterUnpin(t *testing.T) {
cl, _, _, _, _ := testingCluster(t)
defer cleanRaft()

View File

@ -274,13 +274,19 @@ although unpinning operations in the cluster may take longer or fail.
UsageText: `
This command will list the CIDs which are tracked by IPFS Cluster and to
which peers they are currently allocated. This list does not include
any monitoring information about the
merely represents the list of pins which are part of the global state of
the cluster. For specific information, use "status".
any monitoring information about the IPFS status of the CIDs, it
merely represents the list of pins which are part of the shared state of
the cluster. For IPFS-status information about the pins, use "status".
`,
Flags: []cli.Flag{parseFlag(formatPin)},
ArgsUsage: "[cid]",
Flags: []cli.Flag{parseFlag(formatPin)},
Action: func(c *cli.Context) error {
resp := request("GET", "/pinlist", nil)
cidStr := c.Args().First()
if cidStr != "" {
_, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
}
resp := request("GET", "/allocations/"+cidStr, nil)
formatResponse(c, resp)
return nil
},

View File

@ -263,6 +263,7 @@ func (ipfs *Connector) defaultHandler(w http.ResponseWriter, r *http.Request) {
func ipfsErrorResponder(w http.ResponseWriter, errMsg string) {
res := ipfsError{errMsg}
resBytes, _ := json.Marshal(res)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
w.Write(resBytes)
return
@ -298,6 +299,7 @@ func (ipfs *Connector) pinOpHandler(op string, w http.ResponseWriter, r *http.Re
Pins: []string{arg},
}
resBytes, _ := json.Marshal(res)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resBytes)
return
@ -315,45 +317,51 @@ func (ipfs *Connector) pinLsHandler(w http.ResponseWriter, r *http.Request) {
pinLs := ipfsPinLsResp{}
pinLs.Keys = make(map[string]ipfsPinType)
var pins []api.PinSerial
err := ipfs.rpcClient.Call("",
"Cluster",
"PinList",
struct{}{},
&pins)
if err != nil {
ipfsErrorResponder(w, err.Error())
return
}
for _, pin := range pins {
pinLs.Keys[pin.Cid] = ipfsPinType{
Type: "recursive",
}
}
argA, ok := r.URL.Query()["arg"]
if ok {
if len(argA) == 0 {
ipfsErrorResponder(w, "Error: bad argument")
q := r.URL.Query()
arg := q.Get("arg")
if arg != "" {
c, err := cid.Decode(arg)
if err != nil {
ipfsErrorResponder(w, err.Error())
return
}
arg := argA[0]
singlePin, ok := pinLs.Keys[arg]
if ok {
pinLs.Keys = map[string]ipfsPinType{
arg: singlePin,
}
} else {
var pin api.PinSerial
err = ipfs.rpcClient.Call("",
"Cluster",
"PinGet",
api.PinCid(c).ToSerial(),
&pin)
if err != nil {
ipfsErrorResponder(w, fmt.Sprintf(
"Error: path '%s' is not pinned",
arg))
return
}
pinLs.Keys[pin.Cid] = ipfsPinType{
Type: "recursive",
}
} else {
var pins []api.PinSerial
err := ipfs.rpcClient.Call("",
"Cluster",
"Pins",
struct{}{},
&pins)
if err != nil {
ipfsErrorResponder(w, err.Error())
return
}
for _, pin := range pins {
pinLs.Keys[pin.Cid] = ipfsPinType{
Type: "recursive",
}
}
}
resBytes, _ := json.Marshal(pinLs)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resBytes)
}

View File

@ -327,6 +327,15 @@ func TestIPFSProxyPinLs(t *testing.T) {
if len(resp.Keys) != 3 {
t.Error("wrong response")
}
res3, err := http.Get(fmt.Sprintf("%s/pin/ls?arg=%s", proxyURL(ipfs), test.ErrorCid))
if err != nil {
t.Fatal("should have succeeded: ", err)
}
defer res3.Body.Close()
if res3.StatusCode != http.StatusInternalServerError {
t.Error("the request should have failed")
}
}
func TestProxyAdd(t *testing.T) {

View File

@ -41,8 +41,8 @@ func (rpcapi *RPCAPI) Unpin(in api.PinSerial, out *struct{}) error {
return rpcapi.c.Unpin(c)
}
// PinList runs Cluster.Pins().
func (rpcapi *RPCAPI) PinList(in struct{}, out *[]api.PinSerial) error {
// Pins runs Cluster.Pins().
func (rpcapi *RPCAPI) Pins(in struct{}, out *[]api.PinSerial) error {
cidList := rpcapi.c.Pins()
cidSerialList := make([]api.PinSerial, 0, len(cidList))
for _, c := range cidList {
@ -52,6 +52,16 @@ func (rpcapi *RPCAPI) PinList(in struct{}, out *[]api.PinSerial) error {
return nil
}
// PinGet runs Cluster.PinGet().
func (rpcapi *RPCAPI) PinGet(in api.PinSerial, out *api.PinSerial) error {
cidarg := in.ToPin()
pin, err := rpcapi.c.PinGet(cidarg.Cid)
if err == nil {
*out = pin.ToSerial()
}
return err
}
// Version runs Cluster.Version().
func (rpcapi *RPCAPI) Version(in struct{}, out *api.Version) error {
*out = api.Version{

View File

@ -40,7 +40,7 @@ type ipfsErr struct {
Message string
}
type mockIdResp struct {
type mockIDResp struct {
ID string
Addresses []string
}
@ -87,7 +87,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
var cidStr string
switch endp {
case "id":
resp := mockIdResp{
resp := mockIDResp{
ID: TestPeerID1.Pretty(),
Addresses: []string{
"/ip4/0.0.0.0/tcp/1234",

View File

@ -44,7 +44,7 @@ func (mock *mockService) Unpin(in api.PinSerial, out *struct{}) error {
return nil
}
func (mock *mockService) PinList(in struct{}, out *[]api.PinSerial) error {
func (mock *mockService) Pins(in struct{}, out *[]api.PinSerial) error {
*out = []api.PinSerial{
{
Cid: TestCid1,
@ -59,6 +59,14 @@ func (mock *mockService) PinList(in struct{}, out *[]api.PinSerial) error {
return nil
}
func (mock *mockService) PinGet(in api.PinSerial, out *api.PinSerial) error {
if in.Cid == ErrorCid {
return errors.New("expected error when using ErrorCid")
}
*out = in
return nil
}
func (mock *mockService) ID(in struct{}, out *api.IDSerial) error {
//_, pubkey, _ := crypto.GenerateKeyPair(
// DefaultConfigCrypto,