From bb82c27b25cc255183c8dbeb01da134c5db21a16 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 6 Apr 2017 04:27:02 +0200 Subject: [PATCH] Fix #87: Implement ipfs-cluster-ctl pin ls I have updated API endpoints to be /allocations rather than /pinlinst It's more self-explanatory. License: MIT Signed-off-by: Hector Sanjuan --- README.md | 3 +- api/restapi/restapi.go | 34 ++++++++++++--- api/restapi/restapi_test.go | 21 +++++++++- cluster.go | 22 +++++++++- cluster_test.go | 46 +++++++++++++++++++++ ipfs-cluster-ctl/main.go | 16 +++++--- ipfsconn/ipfshttp/ipfshttp.go | 66 +++++++++++++++++------------- ipfsconn/ipfshttp/ipfshttp_test.go | 9 ++++ rpc_api.go | 14 ++++++- test/ipfs_mock.go | 4 +- test/rpc_api_mock.go | 10 ++++- 11 files changed, 196 insertions(+), 49 deletions(-) diff --git a/README.md b/README.md index 54998d5f..1f8f9b3a 100644 --- a/README.md +++ b/README.md @@ -122,7 +122,8 @@ This is a quick summary of API endpoints offered by the Rest API component (thes |GET |/peers |Cluster peers| |POST |/peers |Add new peer| |DELETE|/peers/{peerID} |Remove a peer| -|GET |/pinlist |List of pins in the consensus state| +|GET |/allocations |List of pins and their allocations (consensus-shared state)| +|GET |/allocations/{cid} |Show a single pin and its allocations (from the consensus-shared state)| |GET |/pins |Status of all tracked CIDs| |POST |/pins/sync |Sync all| |GET |/pins/{cid} |Status of single CID| diff --git a/api/restapi/restapi.go b/api/restapi/restapi.go index 11296a76..1585a406 100644 --- a/api/restapi/restapi.go +++ b/api/restapi/restapi.go @@ -147,12 +147,17 @@ func (rest *RESTAPI) routes() []route { }, { - "Pins", + "Allocations", "GET", - "/pinlist", - rest.pinListHandler, + "/allocations", + rest.allocationsHandler, + }, + { + "Allocation", + "GET", + "/allocations/{hash}", + rest.allocationHandler, }, - { "StatusAll", "GET", @@ -334,16 +339,32 @@ func (rest *RESTAPI) unpinHandler(w http.ResponseWriter, r *http.Request) { } } -func (rest *RESTAPI) pinListHandler(w http.ResponseWriter, r *http.Request) { +func (rest *RESTAPI) allocationsHandler(w http.ResponseWriter, r *http.Request) { var pins []api.PinSerial err := rest.rpcClient.Call("", "Cluster", - "PinList", + "Pins", struct{}{}, &pins) sendResponse(w, err, pins) } +func (rest *RESTAPI) allocationHandler(w http.ResponseWriter, r *http.Request) { + if c := parseCidOrError(w, r); c.Cid != "" { + var pin api.PinSerial + err := rest.rpcClient.Call("", + "Cluster", + "PinGet", + c, + &pin) + if err != nil { // errors here are 404s + sendErrorResponse(w, 404, err.Error()) + return + } + sendJSONResponse(w, 200, pin) + } +} + func (rest *RESTAPI) statusAllHandler(w http.ResponseWriter, r *http.Request) { var pinInfos []api.GlobalPinInfoSerial err := rest.rpcClient.Call("", @@ -464,6 +485,7 @@ func sendAcceptedResponse(w http.ResponseWriter, rpcErr error) { } func sendJSONResponse(w http.ResponseWriter, code int, resp interface{}) { + w.Header().Add("Content-Type", "application/json") w.WriteHeader(code) if err := json.NewEncoder(w).Encode(resp); err != nil { panic(err) diff --git a/api/restapi/restapi_test.go b/api/restapi/restapi_test.go index 73c910ae..4586225e 100644 --- a/api/restapi/restapi_test.go +++ b/api/restapi/restapi_test.go @@ -188,12 +188,12 @@ func TestRESTAPIUnpinEndpoint(t *testing.T) { } } -func TestRESTAPIPinListEndpoint(t *testing.T) { +func TestRESTAPIAllocationsEndpoint(t *testing.T) { rest := testRESTAPI(t) defer rest.Shutdown() var resp []api.PinSerial - makeGet(t, "/pinlist", &resp) + makeGet(t, "/allocations", &resp) if len(resp) != 3 || resp[0].Cid != test.TestCid1 || resp[1].Cid != test.TestCid2 || resp[2].Cid != test.TestCid3 { @@ -201,6 +201,23 @@ func TestRESTAPIPinListEndpoint(t *testing.T) { } } +func TestRESTAPIAllocationEndpoint(t *testing.T) { + rest := testRESTAPI(t) + defer rest.Shutdown() + + var resp api.PinSerial + makeGet(t, "/allocations/"+test.TestCid1, &resp) + if resp.Cid != test.TestCid1 { + t.Error("cid should be the same") + } + + errResp := api.Error{} + makeGet(t, "/allocations/"+test.ErrorCid, &errResp) + if errResp.Code != 404 { + t.Error("a non-pinned cid should 404") + } +} + func TestRESTAPIStatusAllEndpoint(t *testing.T) { rest := testRESTAPI(t) defer rest.Shutdown() diff --git a/cluster.go b/cluster.go index a6227602..45262053 100644 --- a/cluster.go +++ b/cluster.go @@ -732,7 +732,8 @@ func (c *Cluster) Recover(h *cid.Cid) (api.GlobalPinInfo, error) { // Pins returns the list of Cids managed by Cluster and which are part // of the current global state. This is the source of truth as to which -// pins are managed, but does not indicate if the item is successfully pinned. +// pins are managed and their allocation, but does not indicate if +// the item is successfully pinned. For that, use StatusAll(). func (c *Cluster) Pins() []api.Pin { cState, err := c.consensus.State() if err != nil { @@ -743,6 +744,25 @@ func (c *Cluster) Pins() []api.Pin { return cState.List() } +// PinGet returns information for a single Cid managed by Cluster. +// The information is obtained from the current global state. The +// returned api.Pin provides information about the allocations +// assigned for the requested Cid, but does not provide indicate if +// the item is successfully pinned. For that, use Status(). PinGet +// returns an error if the given Cid is not part of the global state. +func (c *Cluster) PinGet(h *cid.Cid) (api.Pin, error) { + cState, err := c.consensus.State() + if err != nil { + logger.Error(err) + return api.Pin{}, err + } + pin := cState.Get(h) + if pin.Cid == nil { + return pin, errors.New("Cid is not part of the global state") + } + return pin, nil +} + // Pin makes the cluster Pin a Cid. This implies adding the Cid // to the IPFS Cluster peers shared-state. Depending on the cluster // pinning strategy, the PinTracker may then request the IPFS daemon diff --git a/cluster_test.go b/cluster_test.go index 83d91d4b..ecd1003c 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -193,6 +193,52 @@ func TestClusterPin(t *testing.T) { } } +func TestClusterPins(t *testing.T) { + cl, _, _, _, _ := testingCluster(t) + defer cleanRaft() + defer cl.Shutdown() + + c, _ := cid.Decode(test.TestCid1) + err := cl.Pin(api.PinCid(c)) + if err != nil { + t.Fatal("pin should have worked:", err) + } + + pins := cl.Pins() + if len(pins) != 1 { + t.Fatal("pin should be part of the state") + } + if !pins[0].Cid.Equals(c) || pins[0].ReplicationFactor != -1 { + t.Error("the Pin does not look as expected") + } +} + +func TestClusterPinGet(t *testing.T) { + cl, _, _, _, _ := testingCluster(t) + defer cleanRaft() + defer cl.Shutdown() + + c, _ := cid.Decode(test.TestCid1) + err := cl.Pin(api.PinCid(c)) + if err != nil { + t.Fatal("pin should have worked:", err) + } + + pin, err := cl.PinGet(c) + if err != nil { + t.Fatal(err) + } + if !pin.Cid.Equals(c) || pin.ReplicationFactor != -1 { + t.Error("the Pin does not look as expected") + } + + c2, _ := cid.Decode(test.TestCid2) + _, err = cl.PinGet(c2) + if err == nil { + t.Fatal("expected an error") + } +} + func TestClusterUnpin(t *testing.T) { cl, _, _, _, _ := testingCluster(t) defer cleanRaft() diff --git a/ipfs-cluster-ctl/main.go b/ipfs-cluster-ctl/main.go index 83e59748..3f2196f1 100644 --- a/ipfs-cluster-ctl/main.go +++ b/ipfs-cluster-ctl/main.go @@ -274,13 +274,19 @@ although unpinning operations in the cluster may take longer or fail. UsageText: ` This command will list the CIDs which are tracked by IPFS Cluster and to which peers they are currently allocated. This list does not include -any monitoring information about the -merely represents the list of pins which are part of the global state of -the cluster. For specific information, use "status". +any monitoring information about the IPFS status of the CIDs, it +merely represents the list of pins which are part of the shared state of +the cluster. For IPFS-status information about the pins, use "status". `, - Flags: []cli.Flag{parseFlag(formatPin)}, + ArgsUsage: "[cid]", + Flags: []cli.Flag{parseFlag(formatPin)}, Action: func(c *cli.Context) error { - resp := request("GET", "/pinlist", nil) + cidStr := c.Args().First() + if cidStr != "" { + _, err := cid.Decode(cidStr) + checkErr("parsing cid", err) + } + resp := request("GET", "/allocations/"+cidStr, nil) formatResponse(c, resp) return nil }, diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 27a7a81d..36e0c079 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -263,6 +263,7 @@ func (ipfs *Connector) defaultHandler(w http.ResponseWriter, r *http.Request) { func ipfsErrorResponder(w http.ResponseWriter, errMsg string) { res := ipfsError{errMsg} resBytes, _ := json.Marshal(res) + w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusInternalServerError) w.Write(resBytes) return @@ -298,6 +299,7 @@ func (ipfs *Connector) pinOpHandler(op string, w http.ResponseWriter, r *http.Re Pins: []string{arg}, } resBytes, _ := json.Marshal(res) + w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) w.Write(resBytes) return @@ -315,45 +317,51 @@ func (ipfs *Connector) pinLsHandler(w http.ResponseWriter, r *http.Request) { pinLs := ipfsPinLsResp{} pinLs.Keys = make(map[string]ipfsPinType) - var pins []api.PinSerial - err := ipfs.rpcClient.Call("", - "Cluster", - "PinList", - struct{}{}, - &pins) - - if err != nil { - ipfsErrorResponder(w, err.Error()) - return - } - - for _, pin := range pins { - pinLs.Keys[pin.Cid] = ipfsPinType{ - Type: "recursive", - } - } - - argA, ok := r.URL.Query()["arg"] - if ok { - if len(argA) == 0 { - ipfsErrorResponder(w, "Error: bad argument") + q := r.URL.Query() + arg := q.Get("arg") + if arg != "" { + c, err := cid.Decode(arg) + if err != nil { + ipfsErrorResponder(w, err.Error()) return } - arg := argA[0] - singlePin, ok := pinLs.Keys[arg] - if ok { - pinLs.Keys = map[string]ipfsPinType{ - arg: singlePin, - } - } else { + var pin api.PinSerial + err = ipfs.rpcClient.Call("", + "Cluster", + "PinGet", + api.PinCid(c).ToSerial(), + &pin) + if err != nil { ipfsErrorResponder(w, fmt.Sprintf( "Error: path '%s' is not pinned", arg)) return } + pinLs.Keys[pin.Cid] = ipfsPinType{ + Type: "recursive", + } + } else { + var pins []api.PinSerial + err := ipfs.rpcClient.Call("", + "Cluster", + "Pins", + struct{}{}, + &pins) + + if err != nil { + ipfsErrorResponder(w, err.Error()) + return + } + + for _, pin := range pins { + pinLs.Keys[pin.Cid] = ipfsPinType{ + Type: "recursive", + } + } } resBytes, _ := json.Marshal(pinLs) + w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusOK) w.Write(resBytes) } diff --git a/ipfsconn/ipfshttp/ipfshttp_test.go b/ipfsconn/ipfshttp/ipfshttp_test.go index 6b8f1ed6..38735191 100644 --- a/ipfsconn/ipfshttp/ipfshttp_test.go +++ b/ipfsconn/ipfshttp/ipfshttp_test.go @@ -327,6 +327,15 @@ func TestIPFSProxyPinLs(t *testing.T) { if len(resp.Keys) != 3 { t.Error("wrong response") } + + res3, err := http.Get(fmt.Sprintf("%s/pin/ls?arg=%s", proxyURL(ipfs), test.ErrorCid)) + if err != nil { + t.Fatal("should have succeeded: ", err) + } + defer res3.Body.Close() + if res3.StatusCode != http.StatusInternalServerError { + t.Error("the request should have failed") + } } func TestProxyAdd(t *testing.T) { diff --git a/rpc_api.go b/rpc_api.go index f8e354b4..13ff3117 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -41,8 +41,8 @@ func (rpcapi *RPCAPI) Unpin(in api.PinSerial, out *struct{}) error { return rpcapi.c.Unpin(c) } -// PinList runs Cluster.Pins(). -func (rpcapi *RPCAPI) PinList(in struct{}, out *[]api.PinSerial) error { +// Pins runs Cluster.Pins(). +func (rpcapi *RPCAPI) Pins(in struct{}, out *[]api.PinSerial) error { cidList := rpcapi.c.Pins() cidSerialList := make([]api.PinSerial, 0, len(cidList)) for _, c := range cidList { @@ -52,6 +52,16 @@ func (rpcapi *RPCAPI) PinList(in struct{}, out *[]api.PinSerial) error { return nil } +// PinGet runs Cluster.PinGet(). +func (rpcapi *RPCAPI) PinGet(in api.PinSerial, out *api.PinSerial) error { + cidarg := in.ToPin() + pin, err := rpcapi.c.PinGet(cidarg.Cid) + if err == nil { + *out = pin.ToSerial() + } + return err +} + // Version runs Cluster.Version(). func (rpcapi *RPCAPI) Version(in struct{}, out *api.Version) error { *out = api.Version{ diff --git a/test/ipfs_mock.go b/test/ipfs_mock.go index 4f9d4849..4ac2d916 100644 --- a/test/ipfs_mock.go +++ b/test/ipfs_mock.go @@ -40,7 +40,7 @@ type ipfsErr struct { Message string } -type mockIdResp struct { +type mockIDResp struct { ID string Addresses []string } @@ -87,7 +87,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { var cidStr string switch endp { case "id": - resp := mockIdResp{ + resp := mockIDResp{ ID: TestPeerID1.Pretty(), Addresses: []string{ "/ip4/0.0.0.0/tcp/1234", diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index d72ab7c1..ca65b3e9 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -44,7 +44,7 @@ func (mock *mockService) Unpin(in api.PinSerial, out *struct{}) error { return nil } -func (mock *mockService) PinList(in struct{}, out *[]api.PinSerial) error { +func (mock *mockService) Pins(in struct{}, out *[]api.PinSerial) error { *out = []api.PinSerial{ { Cid: TestCid1, @@ -59,6 +59,14 @@ func (mock *mockService) PinList(in struct{}, out *[]api.PinSerial) error { return nil } +func (mock *mockService) PinGet(in api.PinSerial, out *api.PinSerial) error { + if in.Cid == ErrorCid { + return errors.New("expected error when using ErrorCid") + } + *out = in + return nil +} + func (mock *mockService) ID(in struct{}, out *api.IDSerial) error { //_, pubkey, _ := crypto.GenerateKeyPair( // DefaultConfigCrypto,