Proxy: hijack pin/update

The IPFS pin/update endpoint takes two arguments and usually
unpins the first and pins the second. It is a bit more efficient
to do it in a single operation than two separate ones.

This will make the proxy endpoint hijack pin/update requests.

First, the FROM pin is fetched from the state. If present, we
set the options (replication factors, actual allocations) from
that pin to the new one. Then we pin the TO item and proceed
to unpin the FROM item when `unpin` is not false.

We need to support path resolving, just like IPFS, therefore
it was necessary to expose IPFSResolve() via RPC.
This commit is contained in:
Hector Sanjuan 2019-04-29 16:36:40 +02:00
parent 2144f4bd42
commit da24114ae0
4 changed files with 242 additions and 0 deletions

View File

@ -212,6 +212,10 @@ func New(cfg *Config) (*Server, error) {
Path("/pin/ls").
HandlerFunc(proxy.pinLsHandler).
Name("PinLs")
hijackSubrouter.
Path("/pin/update").
HandlerFunc(proxy.pinUpdateHandler).
Name("PinUpdate")
hijackSubrouter.
Path("/add").
HandlerFunc(proxy.addHandler).
@ -387,6 +391,122 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
w.Write(resBytes)
}
func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "ipfsproxy/pinUpdateHandler")
defer span.End()
proxy.setHeaders(w.Header(), r)
// Check that we have enough arguments and mimic ipfs response when not
q := r.URL.Query()
args := q["arg"]
if len(args) == 0 {
ipfsErrorResponder(w, "argument \"from-path\" is required")
return
}
if len(args) == 1 {
ipfsErrorResponder(w, "argument \"to-path\" is required")
return
}
unpin := !(q.Get("unpin") == "false")
from := args[0]
to := args[1]
// Parse paths (we will need to resolve them)
pFrom, err := path.ParsePath(from)
if err != nil {
ipfsErrorResponder(w, "error parsing \"from-path\" argument: "+err.Error())
return
}
pTo, err := path.ParsePath(to)
if err != nil {
ipfsErrorResponder(w, "error parsing \"to-path\" argument: "+err.Error())
return
}
// Resolve the FROM argument
var fromCid cid.Cid
err = proxy.rpcClient.CallContext(
ctx,
"",
"Cluster",
"IPFSResolve",
pFrom.String(),
&fromCid,
)
if err != nil {
ipfsErrorResponder(w, err.Error())
return
}
// Get existing FROM pin, and send error if not present.
var fromPin api.Pin
err = proxy.rpcClient.CallContext(
ctx,
"",
"Cluster",
"PinGet",
fromCid,
&fromPin,
)
if err != nil {
ipfsErrorResponder(w, err.Error())
return
}
// Prepare to pin the TO argument with the options from the FROM pin
// and the allocations of the FROM pin.
toPath := &api.PinPath{
Path: pTo.String(),
PinOptions: fromPin.PinOptions,
}
toPath.PinOptions.UserAllocations = fromPin.Allocations
// Pin the TO pin.
var toPin api.Pin
err = proxy.rpcClient.CallContext(
ctx,
"",
"Cluster",
"PinPath",
toPath,
&toPin,
)
if err != nil {
ipfsErrorResponder(w, err.Error())
return
}
// If unpin != "false", unpin the FROM argument
// (it was already resolved).
if unpin {
err = proxy.rpcClient.CallContext(
ctx,
"",
"Cluster",
"Unpin",
&fromPin,
&struct{}{},
)
if err != nil {
ipfsErrorResponder(w, err.Error())
return
}
}
// Mimic ipfs response by answering with the paths
// https://github.com/ipfs/go-ipfs/issues/6269
res := ipfsPinOpResp{
Pins: []string{pFrom.String(), pTo.String()},
}
resBytes, _ := json.Marshal(res)
w.WriteHeader(http.StatusOK)
w.Write(resBytes)
return
}
func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
proxy.setHeaders(w.Header(), r)

View File

@ -314,6 +314,108 @@ func TestIPFSProxyUnpin(t *testing.T) {
}
}
func TestIPFSProxyPinUpdate(t *testing.T) {
ctx := context.Background()
proxy, mock := testIPFSProxy(t)
defer mock.Close()
defer proxy.Shutdown(ctx)
t.Run("pin/update bad args", func(t *testing.T) {
res, err := http.Post(fmt.Sprintf("%s/pin/update", proxyURL(proxy)), "", nil)
if err != nil {
t.Fatal("request should complete: ", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusInternalServerError {
t.Error("request should not be successful with a no arguments")
}
res2, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s", proxyURL(proxy), test.PathIPFS1), "", nil)
if err != nil {
t.Fatal("request should complete: ", err)
}
defer res2.Body.Close()
if res2.StatusCode != http.StatusInternalServerError {
t.Error("request should not be successful with a single argument")
}
})
t.Run("pin/update", func(t *testing.T) {
res, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s&arg=%s", proxyURL(proxy), test.PathIPFS1, test.PathIPFS2), "", nil)
if err != nil {
t.Fatal("request should complete: ", err)
}
defer res.Body.Close()
var resp ipfsPinOpResp
resBytes, _ := ioutil.ReadAll(res.Body)
err = json.Unmarshal(resBytes, &resp)
if err != nil {
t.Fatal(err)
}
if len(resp.Pins) != 2 ||
resp.Pins[0] != test.PathIPFS1 ||
resp.Pins[1] != test.PathIPFS2 {
t.Errorf("bad response: %s", string(resBytes))
}
})
t.Run("pin/update check unpin happens", func(t *testing.T) {
// passing an errorCid to unpin should return an error
// when unpinning.
res, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s&arg=%s", proxyURL(proxy), test.ErrorCid, test.PathIPFS2), "", nil)
if err != nil {
t.Fatal("request should complete: ", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusInternalServerError {
t.Fatal("request should error")
}
resBytes, _ := ioutil.ReadAll(res.Body)
var respErr ipfsError
err = json.Unmarshal(resBytes, &respErr)
if err != nil {
t.Fatal(err)
}
if respErr.Message != test.ErrBadCid.Error() {
t.Error("expected a bad cid error:", respErr.Message)
}
})
t.Run("pin/update check pin happens", func(t *testing.T) {
// passing an errorCid to pin, with unpin=false should return
// an error when pinning
res, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s&arg=%s&unpin=false", proxyURL(proxy), test.Cid1, test.ErrorCid), "", nil)
if err != nil {
t.Fatal("request should complete: ", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusInternalServerError {
t.Fatal("request should error")
}
resBytes, _ := ioutil.ReadAll(res.Body)
var respErr ipfsError
err = json.Unmarshal(resBytes, &respErr)
if err != nil {
t.Fatal(err)
}
if respErr.Message != test.ErrBadCid.Error() {
t.Error("expected a bad cid error:", respErr.Message)
}
})
}
func TestIPFSProxyPinLs(t *testing.T) {
ctx := context.Background()
proxy, mock := testIPFSProxy(t)

View File

@ -424,6 +424,16 @@ func (rpcapi *RPCAPI) IPFSBlockGet(ctx context.Context, in cid.Cid, out *[]byte)
return nil
}
// IPFSResolve runs IPFSConnector.Resolve().
func (rpcapi *RPCAPI) IPFSResolve(ctx context.Context, in string, out *cid.Cid) error {
c, err := rpcapi.c.ipfs.Resolve(ctx, in)
if err != nil {
return err
}
*out = c
return nil
}
/*
Consensus component methods
*/

View File

@ -429,6 +429,16 @@ func (mock *mockService) IPFSBlockPut(ctx context.Context, in *api.NodeWithMeta,
return nil
}
func (mock *mockService) IPFSResolve(ctx context.Context, in string, out *cid.Cid) error {
switch in {
case ErrorCid.String(), "/ipfs/" + ErrorCid.String():
*out = ErrorCid
default:
*out = Cid2
}
return nil
}
func (mock *mockService) ConsensusAddPeer(ctx context.Context, in peer.ID, out *struct{}) error {
return errors.New("mock rpc cannot redirect")
}