From da24114ae0bfda1c051cb06d89c8db64531b75ce Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 29 Apr 2019 16:36:40 +0200 Subject: [PATCH] Proxy: hijack pin/update The IPFS pin/update endpoint takes two arguments and usually unpins the first and pins the second. It is a bit more efficient to do it in a single operation than two separate ones. This will make the proxy endpoint hijack pin/update requests. First, the FROM pin is fetched from the state. If present, we set the options (replication factors, actual allocations) from that pin to the new one. Then we pin the TO item and proceed to unpin the FROM item when `unpin` is not false. We need to support path resolving, just like IPFS, therefore it was necessary to expose IPFSResolve() via RPC. --- api/ipfsproxy/ipfsproxy.go | 120 ++++++++++++++++++++++++++++++++ api/ipfsproxy/ipfsproxy_test.go | 102 +++++++++++++++++++++++++++ rpc_api.go | 10 +++ test/rpc_api_mock.go | 10 +++ 4 files changed, 242 insertions(+) diff --git a/api/ipfsproxy/ipfsproxy.go b/api/ipfsproxy/ipfsproxy.go index fb22077a..8a3785c8 100644 --- a/api/ipfsproxy/ipfsproxy.go +++ b/api/ipfsproxy/ipfsproxy.go @@ -212,6 +212,10 @@ func New(cfg *Config) (*Server, error) { Path("/pin/ls"). HandlerFunc(proxy.pinLsHandler). Name("PinLs") + hijackSubrouter. + Path("/pin/update"). + HandlerFunc(proxy.pinUpdateHandler). + Name("PinUpdate") hijackSubrouter. Path("/add"). HandlerFunc(proxy.addHandler). @@ -387,6 +391,122 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) { w.Write(resBytes) } +func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) { + ctx, span := trace.StartSpan(r.Context(), "ipfsproxy/pinUpdateHandler") + defer span.End() + + proxy.setHeaders(w.Header(), r) + + // Check that we have enough arguments and mimic ipfs response when not + q := r.URL.Query() + args := q["arg"] + if len(args) == 0 { + ipfsErrorResponder(w, "argument \"from-path\" is required") + return + } + if len(args) == 1 { + ipfsErrorResponder(w, "argument \"to-path\" is required") + return + } + + unpin := !(q.Get("unpin") == "false") + from := args[0] + to := args[1] + + // Parse paths (we will need to resolve them) + pFrom, err := path.ParsePath(from) + if err != nil { + ipfsErrorResponder(w, "error parsing \"from-path\" argument: "+err.Error()) + return + } + + pTo, err := path.ParsePath(to) + if err != nil { + ipfsErrorResponder(w, "error parsing \"to-path\" argument: "+err.Error()) + return + } + + // Resolve the FROM argument + var fromCid cid.Cid + err = proxy.rpcClient.CallContext( + ctx, + "", + "Cluster", + "IPFSResolve", + pFrom.String(), + &fromCid, + ) + if err != nil { + ipfsErrorResponder(w, err.Error()) + return + } + + // Get existing FROM pin, and send error if not present. + var fromPin api.Pin + err = proxy.rpcClient.CallContext( + ctx, + "", + "Cluster", + "PinGet", + fromCid, + &fromPin, + ) + if err != nil { + ipfsErrorResponder(w, err.Error()) + return + } + + // Prepare to pin the TO argument with the options from the FROM pin + // and the allocations of the FROM pin. + toPath := &api.PinPath{ + Path: pTo.String(), + PinOptions: fromPin.PinOptions, + } + toPath.PinOptions.UserAllocations = fromPin.Allocations + + // Pin the TO pin. + var toPin api.Pin + err = proxy.rpcClient.CallContext( + ctx, + "", + "Cluster", + "PinPath", + toPath, + &toPin, + ) + if err != nil { + ipfsErrorResponder(w, err.Error()) + return + } + + // If unpin != "false", unpin the FROM argument + // (it was already resolved). + if unpin { + err = proxy.rpcClient.CallContext( + ctx, + "", + "Cluster", + "Unpin", + &fromPin, + &struct{}{}, + ) + if err != nil { + ipfsErrorResponder(w, err.Error()) + return + } + } + + // Mimic ipfs response by answering with the paths + // https://github.com/ipfs/go-ipfs/issues/6269 + res := ipfsPinOpResp{ + Pins: []string{pFrom.String(), pTo.String()}, + } + resBytes, _ := json.Marshal(res) + w.WriteHeader(http.StatusOK) + w.Write(resBytes) + return +} + func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) { proxy.setHeaders(w.Header(), r) diff --git a/api/ipfsproxy/ipfsproxy_test.go b/api/ipfsproxy/ipfsproxy_test.go index 3fb3cf0d..1621b51c 100644 --- a/api/ipfsproxy/ipfsproxy_test.go +++ b/api/ipfsproxy/ipfsproxy_test.go @@ -314,6 +314,108 @@ func TestIPFSProxyUnpin(t *testing.T) { } } +func TestIPFSProxyPinUpdate(t *testing.T) { + ctx := context.Background() + proxy, mock := testIPFSProxy(t) + defer mock.Close() + defer proxy.Shutdown(ctx) + + t.Run("pin/update bad args", func(t *testing.T) { + res, err := http.Post(fmt.Sprintf("%s/pin/update", proxyURL(proxy)), "", nil) + if err != nil { + t.Fatal("request should complete: ", err) + } + + defer res.Body.Close() + if res.StatusCode != http.StatusInternalServerError { + t.Error("request should not be successful with a no arguments") + } + + res2, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s", proxyURL(proxy), test.PathIPFS1), "", nil) + if err != nil { + t.Fatal("request should complete: ", err) + } + + defer res2.Body.Close() + if res2.StatusCode != http.StatusInternalServerError { + t.Error("request should not be successful with a single argument") + } + }) + + t.Run("pin/update", func(t *testing.T) { + res, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s&arg=%s", proxyURL(proxy), test.PathIPFS1, test.PathIPFS2), "", nil) + if err != nil { + t.Fatal("request should complete: ", err) + } + + defer res.Body.Close() + + var resp ipfsPinOpResp + resBytes, _ := ioutil.ReadAll(res.Body) + err = json.Unmarshal(resBytes, &resp) + if err != nil { + t.Fatal(err) + } + if len(resp.Pins) != 2 || + resp.Pins[0] != test.PathIPFS1 || + resp.Pins[1] != test.PathIPFS2 { + t.Errorf("bad response: %s", string(resBytes)) + } + }) + + t.Run("pin/update check unpin happens", func(t *testing.T) { + // passing an errorCid to unpin should return an error + // when unpinning. + + res, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s&arg=%s", proxyURL(proxy), test.ErrorCid, test.PathIPFS2), "", nil) + if err != nil { + t.Fatal("request should complete: ", err) + } + + defer res.Body.Close() + if res.StatusCode != http.StatusInternalServerError { + t.Fatal("request should error") + } + + resBytes, _ := ioutil.ReadAll(res.Body) + var respErr ipfsError + err = json.Unmarshal(resBytes, &respErr) + if err != nil { + t.Fatal(err) + } + + if respErr.Message != test.ErrBadCid.Error() { + t.Error("expected a bad cid error:", respErr.Message) + } + }) + + t.Run("pin/update check pin happens", func(t *testing.T) { + // passing an errorCid to pin, with unpin=false should return + // an error when pinning + + res, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s&arg=%s&unpin=false", proxyURL(proxy), test.Cid1, test.ErrorCid), "", nil) + if err != nil { + t.Fatal("request should complete: ", err) + } + + defer res.Body.Close() + if res.StatusCode != http.StatusInternalServerError { + t.Fatal("request should error") + } + + resBytes, _ := ioutil.ReadAll(res.Body) + var respErr ipfsError + err = json.Unmarshal(resBytes, &respErr) + if err != nil { + t.Fatal(err) + } + + if respErr.Message != test.ErrBadCid.Error() { + t.Error("expected a bad cid error:", respErr.Message) + } + }) +} + func TestIPFSProxyPinLs(t *testing.T) { ctx := context.Background() proxy, mock := testIPFSProxy(t) diff --git a/rpc_api.go b/rpc_api.go index fa36f1ef..cf044c3d 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -424,6 +424,16 @@ func (rpcapi *RPCAPI) IPFSBlockGet(ctx context.Context, in cid.Cid, out *[]byte) return nil } +// IPFSResolve runs IPFSConnector.Resolve(). +func (rpcapi *RPCAPI) IPFSResolve(ctx context.Context, in string, out *cid.Cid) error { + c, err := rpcapi.c.ipfs.Resolve(ctx, in) + if err != nil { + return err + } + *out = c + return nil +} + /* Consensus component methods */ diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index 52842f66..233d16f0 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -429,6 +429,16 @@ func (mock *mockService) IPFSBlockPut(ctx context.Context, in *api.NodeWithMeta, return nil } +func (mock *mockService) IPFSResolve(ctx context.Context, in string, out *cid.Cid) error { + switch in { + case ErrorCid.String(), "/ipfs/" + ErrorCid.String(): + *out = ErrorCid + default: + *out = Cid2 + } + return nil +} + func (mock *mockService) ConsensusAddPeer(ctx context.Context, in peer.ID, out *struct{}) error { return errors.New("mock rpc cannot redirect") }