Merge pull request #768 from ipfs/fix/732-pin-update
Proxy: hijack pin/update
This commit is contained in:
commit
f140bdbdbc
|
@ -212,6 +212,10 @@ func New(cfg *Config) (*Server, error) {
|
|||
Path("/pin/ls").
|
||||
HandlerFunc(proxy.pinLsHandler).
|
||||
Name("PinLs")
|
||||
hijackSubrouter.
|
||||
Path("/pin/update").
|
||||
HandlerFunc(proxy.pinUpdateHandler).
|
||||
Name("PinUpdate")
|
||||
hijackSubrouter.
|
||||
Path("/add").
|
||||
HandlerFunc(proxy.addHandler).
|
||||
|
@ -284,10 +288,14 @@ func (proxy *Server) run() {
|
|||
}
|
||||
|
||||
// ipfsErrorResponder writes an http error response just like IPFS would.
|
||||
func ipfsErrorResponder(w http.ResponseWriter, errMsg string) {
|
||||
func ipfsErrorResponder(w http.ResponseWriter, errMsg string, code int) {
|
||||
res := ipfsError{errMsg}
|
||||
resBytes, _ := json.Marshal(res)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
if code > 0 {
|
||||
w.WriteHeader(code)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
w.Write(resBytes)
|
||||
return
|
||||
}
|
||||
|
@ -298,7 +306,7 @@ func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Requ
|
|||
arg := r.URL.Query().Get("arg")
|
||||
p, err := path.ParsePath(arg)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, "Error parsing IPFS Path: "+err.Error())
|
||||
ipfsErrorResponder(w, "Error parsing IPFS Path: "+err.Error(), -1)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -312,7 +320,7 @@ func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Requ
|
|||
&pin,
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error())
|
||||
ipfsErrorResponder(w, err.Error(), -1)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -343,7 +351,7 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
|
|||
if arg != "" {
|
||||
c, err := cid.Decode(arg)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error())
|
||||
ipfsErrorResponder(w, err.Error(), -1)
|
||||
return
|
||||
}
|
||||
var pin api.Pin
|
||||
|
@ -355,7 +363,7 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
|
|||
&pin,
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, fmt.Sprintf("Error: path '%s' is not pinned", arg))
|
||||
ipfsErrorResponder(w, fmt.Sprintf("Error: path '%s' is not pinned", arg), -1)
|
||||
return
|
||||
}
|
||||
pinLs.Keys[pin.Cid.String()] = ipfsPinType{
|
||||
|
@ -371,7 +379,7 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
|
|||
&pins,
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error())
|
||||
ipfsErrorResponder(w, err.Error(), -1)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -387,18 +395,132 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
|
|||
w.Write(resBytes)
|
||||
}
|
||||
|
||||
func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, span := trace.StartSpan(r.Context(), "ipfsproxy/pinUpdateHandler")
|
||||
defer span.End()
|
||||
|
||||
proxy.setHeaders(w.Header(), r)
|
||||
|
||||
// Check that we have enough arguments and mimic ipfs response when not
|
||||
q := r.URL.Query()
|
||||
args := q["arg"]
|
||||
if len(args) == 0 {
|
||||
ipfsErrorResponder(w, "argument \"from-path\" is required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if len(args) == 1 {
|
||||
ipfsErrorResponder(w, "argument \"to-path\" is required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
unpin := !(q.Get("unpin") == "false")
|
||||
from := args[0]
|
||||
to := args[1]
|
||||
|
||||
// Parse paths (we will need to resolve them)
|
||||
pFrom, err := path.ParsePath(from)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, "error parsing \"from-path\" argument: "+err.Error(), -1)
|
||||
return
|
||||
}
|
||||
|
||||
pTo, err := path.ParsePath(to)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, "error parsing \"to-path\" argument: "+err.Error(), -1)
|
||||
return
|
||||
}
|
||||
|
||||
// Resolve the FROM argument
|
||||
var fromCid cid.Cid
|
||||
err = proxy.rpcClient.CallContext(
|
||||
ctx,
|
||||
"",
|
||||
"Cluster",
|
||||
"IPFSResolve",
|
||||
pFrom.String(),
|
||||
&fromCid,
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error(), -1)
|
||||
return
|
||||
}
|
||||
|
||||
// Get existing FROM pin, and send error if not present.
|
||||
var fromPin api.Pin
|
||||
err = proxy.rpcClient.CallContext(
|
||||
ctx,
|
||||
"",
|
||||
"Cluster",
|
||||
"PinGet",
|
||||
fromCid,
|
||||
&fromPin,
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error(), -1)
|
||||
return
|
||||
}
|
||||
|
||||
// Prepare to pin the TO argument with the options from the FROM pin
|
||||
// and the allocations of the FROM pin.
|
||||
toPath := &api.PinPath{
|
||||
Path: pTo.String(),
|
||||
PinOptions: fromPin.PinOptions,
|
||||
}
|
||||
toPath.PinOptions.UserAllocations = fromPin.Allocations
|
||||
|
||||
// Pin the TO pin.
|
||||
var toPin api.Pin
|
||||
err = proxy.rpcClient.CallContext(
|
||||
ctx,
|
||||
"",
|
||||
"Cluster",
|
||||
"PinPath",
|
||||
toPath,
|
||||
&toPin,
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error(), -1)
|
||||
return
|
||||
}
|
||||
|
||||
// If unpin != "false", unpin the FROM argument
|
||||
// (it was already resolved).
|
||||
if unpin {
|
||||
err = proxy.rpcClient.CallContext(
|
||||
ctx,
|
||||
"",
|
||||
"Cluster",
|
||||
"Unpin",
|
||||
&fromPin,
|
||||
&struct{}{},
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error(), -1)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
res := ipfsPinOpResp{
|
||||
Pins: []string{fromCid.String(), toPin.Cid.String()},
|
||||
}
|
||||
resBytes, _ := json.Marshal(res)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(resBytes)
|
||||
return
|
||||
}
|
||||
|
||||
func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
|
||||
proxy.setHeaders(w.Header(), r)
|
||||
|
||||
reader, err := r.MultipartReader()
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, "error reading request: "+err.Error())
|
||||
ipfsErrorResponder(w, "error reading request: "+err.Error(), -1)
|
||||
return
|
||||
}
|
||||
|
||||
q := r.URL.Query()
|
||||
if q.Get("only-hash") == "true" {
|
||||
ipfsErrorResponder(w, "only-hash is not supported when adding to cluster")
|
||||
ipfsErrorResponder(w, "only-hash is not supported when adding to cluster", -1)
|
||||
}
|
||||
|
||||
unpin := q.Get("pin") == "false"
|
||||
|
@ -407,7 +529,7 @@ func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
|
|||
// /add params. We can parse most of them directly from the query.
|
||||
params, err := api.AddParamsFromQuery(q)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, "error parsing options:"+err.Error())
|
||||
ipfsErrorResponder(w, "error parsing options:"+err.Error(), -1)
|
||||
return
|
||||
}
|
||||
trickle := q.Get("trickle")
|
||||
|
@ -475,7 +597,7 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
|
|||
&peers,
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error())
|
||||
ipfsErrorResponder(w, err.Error(), -1)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -314,6 +314,108 @@ func TestIPFSProxyUnpin(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestIPFSProxyPinUpdate(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
proxy, mock := testIPFSProxy(t)
|
||||
defer mock.Close()
|
||||
defer proxy.Shutdown(ctx)
|
||||
|
||||
t.Run("pin/update bad args", func(t *testing.T) {
|
||||
res, err := http.Post(fmt.Sprintf("%s/pin/update", proxyURL(proxy)), "", nil)
|
||||
if err != nil {
|
||||
t.Fatal("request should complete: ", err)
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != http.StatusBadRequest {
|
||||
t.Error("request should not be successful with a no arguments")
|
||||
}
|
||||
|
||||
res2, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s", proxyURL(proxy), test.PathIPFS1), "", nil)
|
||||
if err != nil {
|
||||
t.Fatal("request should complete: ", err)
|
||||
}
|
||||
|
||||
defer res2.Body.Close()
|
||||
if res2.StatusCode != http.StatusBadRequest {
|
||||
t.Error("request should not be successful with a single argument")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("pin/update", func(t *testing.T) {
|
||||
res, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s&arg=%s", proxyURL(proxy), test.PathIPFS1, test.PathIPFS2), "", nil)
|
||||
if err != nil {
|
||||
t.Fatal("request should complete: ", err)
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
|
||||
var resp ipfsPinOpResp
|
||||
resBytes, _ := ioutil.ReadAll(res.Body)
|
||||
err = json.Unmarshal(resBytes, &resp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(resp.Pins) != 2 ||
|
||||
resp.Pins[0] != test.Cid2.String() ||
|
||||
resp.Pins[1] != test.CidResolved.String() { // always resolve to the same
|
||||
t.Errorf("bad response: %s", string(resBytes))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("pin/update check unpin happens", func(t *testing.T) {
|
||||
// passing an errorCid to unpin should return an error
|
||||
// when unpinning.
|
||||
|
||||
res, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s&arg=%s", proxyURL(proxy), test.ErrorCid, test.PathIPFS2), "", nil)
|
||||
if err != nil {
|
||||
t.Fatal("request should complete: ", err)
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != http.StatusInternalServerError {
|
||||
t.Fatal("request should error")
|
||||
}
|
||||
|
||||
resBytes, _ := ioutil.ReadAll(res.Body)
|
||||
var respErr ipfsError
|
||||
err = json.Unmarshal(resBytes, &respErr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if respErr.Message != test.ErrBadCid.Error() {
|
||||
t.Error("expected a bad cid error:", respErr.Message)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("pin/update check pin happens", func(t *testing.T) {
|
||||
// passing an errorCid to pin, with unpin=false should return
|
||||
// an error when pinning
|
||||
|
||||
res, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s&arg=%s&unpin=false", proxyURL(proxy), test.Cid1, test.ErrorCid), "", nil)
|
||||
if err != nil {
|
||||
t.Fatal("request should complete: ", err)
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != http.StatusInternalServerError {
|
||||
t.Fatal("request should error")
|
||||
}
|
||||
|
||||
resBytes, _ := ioutil.ReadAll(res.Body)
|
||||
var respErr ipfsError
|
||||
err = json.Unmarshal(resBytes, &respErr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if respErr.Message != test.ErrBadCid.Error() {
|
||||
t.Error("expected a bad cid error:", respErr.Message)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestIPFSProxyPinLs(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
proxy, mock := testIPFSProxy(t)
|
||||
|
|
|
@ -614,7 +614,6 @@ func (ipfs *Connector) Resolve(ctx context.Context, path string) (cid.Cid, error
|
|||
logger.Error("could not parse path: " + err.Error())
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(path, "/ipns") && validPath.IsJustAKey() {
|
||||
ci, _, err := gopath.SplitAbsPath(validPath)
|
||||
return ci, err
|
||||
|
|
10
rpc_api.go
10
rpc_api.go
|
@ -424,6 +424,16 @@ func (rpcapi *RPCAPI) IPFSBlockGet(ctx context.Context, in cid.Cid, out *[]byte)
|
|||
return nil
|
||||
}
|
||||
|
||||
// IPFSResolve runs IPFSConnector.Resolve().
|
||||
func (rpcapi *RPCAPI) IPFSResolve(ctx context.Context, in string, out *cid.Cid) error {
|
||||
c, err := rpcapi.c.ipfs.Resolve(ctx, in)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*out = c
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
Consensus component methods
|
||||
*/
|
||||
|
|
|
@ -429,6 +429,16 @@ func (mock *mockService) IPFSBlockPut(ctx context.Context, in *api.NodeWithMeta,
|
|||
return nil
|
||||
}
|
||||
|
||||
func (mock *mockService) IPFSResolve(ctx context.Context, in string, out *cid.Cid) error {
|
||||
switch in {
|
||||
case ErrorCid.String(), "/ipfs/" + ErrorCid.String():
|
||||
*out = ErrorCid
|
||||
default:
|
||||
*out = Cid2
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mock *mockService) ConsensusAddPeer(ctx context.Context, in peer.ID, out *struct{}) error {
|
||||
return errors.New("mock rpc cannot redirect")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user