Merge pull request #1756 from ipfs-cluster/fix/1738-proxy-block-dag-put-intercept

ipfsproxy: intercept block/put and dag/put and pin to cluster on pin=true
This commit is contained in:
Hector Sanjuan 2022-09-09 16:44:22 +02:00 committed by GitHub
commit 920ba03b1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 520 additions and 29 deletions

View File

@ -119,7 +119,7 @@ func (proxy *Server) copyHeadersFromIPFSWithRequest(
hdrs []string,
dest http.Header, req *http.Request,
) error {
res, err := proxy.ipfsRoundTripper.RoundTrip(req)
res, err := proxy.reverseProxy.Transport.RoundTrip(req)
if err != nil {
logger.Error("error making request for header extraction to ipfs: ", err)
return err
@ -132,14 +132,14 @@ func (proxy *Server) copyHeadersFromIPFSWithRequest(
}
// setHeaders sets some headers for all hijacked endpoints:
// - First, we fix CORs headers by making an OPTIONS request to IPFS with the
// same Origin. Our objective is to get headers for non-preflight requests
// only (the ones we hijack).
// - Second, we add any of the one-time-extracted headers that we deem necessary
// or the user needs from IPFS (in case of custom headers).
// This may trigger a single POST request to ExtractHeaderPath if they
// were not extracted before or TTL has expired.
// - Third, we set our own headers.
// - First, we fix CORs headers by making an OPTIONS request to IPFS with the
// same Origin. Our objective is to get headers for non-preflight requests
// only (the ones we hijack).
// - Second, we add any of the one-time-extracted headers that we deem necessary
// or the user needs from IPFS (in case of custom headers).
// This may trigger a single POST request to ExtractHeaderPath if they
// were not extracted before or TTL has expired.
// - Third, we set our own headers.
func (proxy *Server) setHeaders(dest http.Header, srcRequest *http.Request) {
proxy.setCORSHeaders(dest, srcRequest)
proxy.setAdditionalIpfsHeaders(dest, srcRequest)

View File

@ -31,8 +31,8 @@ import (
cmd "github.com/ipfs/go-ipfs-cmds"
logging "github.com/ipfs/go-log/v2"
path "github.com/ipfs/go-path"
peer "github.com/libp2p/go-libp2p/core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p/core/peer"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr/net"
@ -65,9 +65,9 @@ type Server struct {
rpcClient *rpc.Client
rpcReady chan struct{}
listeners []net.Listener // proxy listener
server *http.Server // proxy server
ipfsRoundTripper http.RoundTripper // allows to talk to IPFS
listeners []net.Listener // proxy listener
server *http.Server // proxy server
reverseProxy *httputil.ReverseProxy // allows to talk to IPFS
ipfsHeadersStore sync.Map
@ -198,15 +198,15 @@ func New(cfg *Config) (*Server, error) {
reverseProxy.Transport = http.DefaultTransport
ctx, cancel := context.WithCancel(context.Background())
proxy := &Server{
ctx: ctx,
config: cfg,
cancel: cancel,
nodeAddr: nodeHTTPAddr,
nodeScheme: nodeScheme,
rpcReady: make(chan struct{}, 1),
listeners: listeners,
server: s,
ipfsRoundTripper: reverseProxy.Transport,
ctx: ctx,
config: cfg,
cancel: cancel,
nodeAddr: nodeHTTPAddr,
nodeScheme: nodeScheme,
rpcReady: make(chan struct{}, 1),
listeners: listeners,
server: s,
reverseProxy: reverseProxy,
}
// Ideally, we should only intercept POST requests, but
@ -260,6 +260,14 @@ func New(cfg *Config) (*Server, error) {
Path("/repo/gc").
HandlerFunc(proxy.repoGCHandler).
Name("RepoGC")
hijackSubrouter.
Path("/block/put").
HandlerFunc(proxy.blockPutHandler).
Name("BlockPut")
hijackSubrouter.
Path("/dag/put").
HandlerFunc(proxy.dagPutHandler).
Name("DagPut")
// Everything else goes to the IPFS daemon.
router.PathPrefix("/").Handler(reverseProxy)
@ -760,6 +768,167 @@ func (proxy *Server) repoGCHandler(w http.ResponseWriter, r *http.Request) {
}
}
// ipfsBlockPutResp models one JSON object of the stream that the IPFS
// daemon returns on /api/v0/block/put (one object per uploaded block).
type ipfsBlockPutResp struct {
	Key  api.Cid // CID of the stored block
	Size int     // reported block size (presumably bytes — confirm against IPFS API)
}
// blockPutHandler hijacks /api/v0/block/put requests carrying pin=true:
// it forwards the request to the IPFS daemon and, for every block CID in
// the streamed response, additionally triggers a cluster pin via RPC.
// Pin failures do not abort the stream; they are reported through the
// X-Stream-Error trailer.
func (proxy *Server) blockPutHandler(w http.ResponseWriter, r *http.Request) {
	// No pinning requested: behave as a plain reverse proxy.
	if r.URL.Query().Get("pin") != "true" {
		proxy.reverseProxy.ServeHTTP(w, r)
		return
	}

	proxy.setHeaders(w.Header(), r)

	// Re-target the incoming request at the IPFS daemon.
	u2, err := url.Parse(proxy.nodeAddr)
	if err != nil {
		logger.Error(err)
		ipfsErrorResponder(w, err.Error(), -1)
		return
	}
	r.URL.Host = u2.Host
	r.URL.Scheme = u2.Scheme
	r.Host = u2.Host
	// RequestURI must be empty when a request is used client-side
	// (RoundTrip), per net/http.
	r.RequestURI = ""
	// NOTE(review): unlike dagPutHandler, the pin=true parameter is
	// forwarded to the IPFS daemon unchanged — confirm that is intended.
	res, err := proxy.reverseProxy.Transport.RoundTrip(r)
	if err != nil {
		ipfsErrorResponder(w, err.Error(), -1)
		return
	}
	defer res.Body.Close()

	// Non-200 from IPFS: relay status and body verbatim and stop.
	if res.StatusCode != http.StatusOK {
		w.WriteHeader(res.StatusCode)
		_, err = io.Copy(w, res.Body)
		if err != nil {
			logger.Error(err)
		}
		return
	}

	// Returned 200. Parse responses.
	// Declare the trailer before writing the header so errors occurring
	// mid-stream can still be reported to the client.
	w.Header().Set("Trailer", "X-Stream-Error")
	w.WriteHeader(http.StatusOK) // any errors from here go into trailers

	dec := json.NewDecoder(res.Body)
	enc := json.NewEncoder(w)
	for {
		var blockInfo ipfsBlockPutResp
		err = dec.Decode(&blockInfo)
		if err == io.EOF {
			return
		}
		if err != nil {
			logger.Error(err)
			w.Header().Add("X-Stream-Error", err.Error())
			return
		}
		// Cluster-pin every CID announced by IPFS.
		p := api.PinCid(blockInfo.Key)
		var pinObj api.Pin
		if err := proxy.rpcClient.Call(
			"",
			"Cluster",
			"Pin",
			p,
			&pinObj,
		); err != nil {
			logger.Error(err)
			w.Header().Add("X-Stream-Error", err.Error())
			// keep going through blocks
		}
		// Relay the block info to the client, as IPFS would.
		if err := enc.Encode(blockInfo); err != nil {
			logger.Error(err)
			w.Header().Add("X-Stream-Error", err.Error())
			return
		}
	}
}
// ipfsDagPutResp models one JSON object of the stream that the IPFS
// daemon returns on /api/v0/dag/put (one object per stored DAG node).
type ipfsDagPutResp struct {
	Cid cid.Cid // CID of the stored DAG node
}
// dagPutHandler hijacks /api/v0/dag/put requests carrying pin=true: it
// forwards the request to the IPFS daemon with pin=false (the cluster
// takes over pinning) and cluster-pins every CID in the streamed
// response via RPC. Pin failures do not abort the stream; they are
// reported through the X-Stream-Error trailer.
func (proxy *Server) dagPutHandler(w http.ResponseWriter, r *http.Request) {
	// Note this mostly duplicates blockPutHandler
	// No pinning requested: behave as a plain reverse proxy.
	if r.URL.Query().Get("pin") != "true" {
		proxy.reverseProxy.ServeHTTP(w, r)
		return
	}

	proxy.setHeaders(w.Header(), r)

	// Re-target the incoming request at the IPFS daemon.
	u2, err := url.Parse(proxy.nodeAddr)
	if err != nil {
		logger.Error(err)
		ipfsErrorResponder(w, err.Error(), -1)
		return
	}
	r.URL.Host = u2.Host
	r.URL.Scheme = u2.Scheme
	r.Host = u2.Host
	// Disable pinning on the IPFS side: the cluster pins instead.
	newQuery := r.URL.Query()
	newQuery.Set("pin", "false")
	r.URL.RawQuery = newQuery.Encode()
	// RequestURI must be empty when a request is used client-side
	// (RoundTrip), per net/http.
	r.RequestURI = ""
	res, err := proxy.reverseProxy.Transport.RoundTrip(r)
	if err != nil {
		ipfsErrorResponder(w, err.Error(), -1)
		return
	}
	defer res.Body.Close()

	// Non-200 from IPFS: relay status and body verbatim and stop.
	if res.StatusCode != http.StatusOK {
		w.WriteHeader(res.StatusCode)
		_, err = io.Copy(w, res.Body)
		if err != nil {
			logger.Error(err)
		}
		return
	}

	// Returned 200. Parse responses.
	// Declare the trailer before writing the header so errors occurring
	// mid-stream can still be reported to the client.
	w.Header().Set("Trailer", "X-Stream-Error")
	w.WriteHeader(http.StatusOK) // any errors from here go into trailers

	dec := json.NewDecoder(res.Body)
	enc := json.NewEncoder(w)
	for {
		var dagInfo ipfsDagPutResp
		err = dec.Decode(&dagInfo)
		if err == io.EOF {
			return
		}
		if err != nil {
			logger.Error(err)
			w.Header().Add("X-Stream-Error", err.Error())
			return
		}
		// Cluster-pin every CID announced by IPFS.
		p := api.PinCid(api.NewCid(dagInfo.Cid))
		var pinObj api.Pin
		if err := proxy.rpcClient.Call(
			"",
			"Cluster",
			"Pin",
			p,
			&pinObj,
		); err != nil {
			logger.Error(err)
			w.Header().Add("X-Stream-Error", err.Error())
			// keep going through blocks
		}
		// Relay the DAG node info to the client, as IPFS would.
		if err := enc.Encode(dagInfo); err != nil {
			logger.Error(err)
			w.Header().Add("X-Stream-Error", err.Error())
			return
		}
	}
}
// slashHandler returns a handler which converts a /a/b/c/<argument> request
// into an /a/b/c/<argument>?arg=<argument> one. And uses the given origHandler
// for it. Our handlers expect that arguments are passed in the ?arg query

View File

@ -1,11 +1,14 @@
package ipfsproxy
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
@ -15,9 +18,12 @@ import (
"github.com/ipfs-cluster/ipfs-cluster/api"
"github.com/ipfs-cluster/ipfs-cluster/test"
cid "github.com/ipfs/go-cid"
cmd "github.com/ipfs/go-ipfs-cmds"
logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
)
func init() {
@ -712,6 +718,252 @@ func TestProxyError(t *testing.T) {
}
}
// TestProxyBlockPut checks that the hijacked /block/put endpoint relays
// the IPFS response stream and returns CIDs with the expected multihash
// and codec, with and without pin=true.
func TestProxyBlockPut(t *testing.T) {
	ctx := context.Background()
	proxy, mock := testIPFSProxy(t)
	defer mock.Close()
	defer proxy.Shutdown(ctx)

	type testcase struct {
		pin           bool
		expectedCodec multicodec.Code
		query         string
	}

	block1 := []byte("block1")
	block2 := []byte("block2")
	// block1/block2 are already []byte: no redundant conversion needed.
	sum1, err := multihash.Sum(block1, multihash.SHA2_256, -1)
	if err != nil {
		t.Fatal(err)
	}
	sum2, err := multihash.Sum(block2, multihash.SHA2_256, -1)
	if err != nil {
		t.Fatal(err)
	}

	testcases := []testcase{
		{
			pin:           true,
			expectedCodec: multicodec.Raw,
		},
		{
			pin:           true,
			expectedCodec: multicodec.DagPb,
		},
		{
			pin:           false,
			expectedCodec: multicodec.Raw,
		},
		{
			pin:           true,
			expectedCodec: multicodec.DagCbor,
		},
	}

	// Set the query string for each case.
	for i, tc := range testcases {
		q := url.Values{}
		q.Add("pin", fmt.Sprintf("%t", tc.pin))
		q.Add("cid-codec", tc.expectedCodec.String())
		testcases[i].query = q.Encode()
	}

	reqs := make([]*http.Request, len(testcases))

	// Prepare requests to be made: two blocks per request.
	for i, tc := range testcases {
		var body bytes.Buffer
		mpw := multipart.NewWriter(&body)
		// Form-file names match the block they carry (the proxy ignores
		// them, but swapped names are confusing to read).
		w1, err := mpw.CreateFormFile("file", "b1")
		if err != nil {
			t.Fatal(err)
		}
		w1.Write(block1)
		w2, err := mpw.CreateFormFile("file", "b2")
		if err != nil {
			t.Fatal(err)
		}
		w2.Write(block2)
		mpw.Close()

		// Pass the query as a Sprintf argument instead of appending it
		// to the format string: url-encoded queries may contain '%'
		// sequences that fmt would misparse as verbs. Also avoid
		// shadowing the net/url package with a local named "url".
		reqURL := fmt.Sprintf("%s/block/put?%s", proxyURL(proxy), tc.query)
		req, _ := http.NewRequest("POST", reqURL, &body)
		req.Header.Set("Content-Type", mpw.FormDataContentType())
		reqs[i] = req
	}

	for i, tc := range testcases {
		t.Run(tc.query, func(t *testing.T) {
			res, err := http.DefaultClient.Do(reqs[i])
			if err != nil {
				t.Fatal("should have succeeded: ", err)
			}
			defer res.Body.Close()
			if res.StatusCode != http.StatusOK {
				t.Fatalf("Bad response status: got = %d, want = %d", res.StatusCode, http.StatusOK)
			}
			// Decode the streamed response: one JSON object per block.
			var blockCids []api.Cid
			var resp ipfsBlockPutResp
			dec := json.NewDecoder(res.Body)
			for {
				err := dec.Decode(&resp)
				if err == io.EOF {
					break
				}
				if err != nil {
					t.Fatal(err)
				}
				blockCids = append(blockCids, resp.Key)
			}
			if len(blockCids) != 2 {
				t.Fatal("expected 2 block cids in response", len(blockCids))
			}
			if mh := blockCids[0].Cid.Hash(); !bytes.Equal(mh, sum1) {
				t.Error("cid1 should match the multihash of the block sent", mh, sum1)
			}
			if mh := blockCids[1].Cid.Hash(); !bytes.Equal(mh, sum2) {
				t.Error("cid2 should match the multihash of the block sent", mh, sum2)
			}
			for _, c := range blockCids {
				if cdc := multicodec.Code(c.Cid.Prefix().Codec); cdc != testcases[i].expectedCodec {
					t.Error("wrong codec in returned cid", cdc, testcases[i].expectedCodec)
				}
			}
		})
	}
}
// TestProxyDagPut checks that the hijacked /dag/put endpoint relays the
// IPFS response stream and returns CIDs with the expected multihash and
// codec, with and without pin=true.
func TestProxyDagPut(t *testing.T) {
	ctx := context.Background()
	proxy, mock := testIPFSProxy(t)
	defer mock.Close()
	defer proxy.Shutdown(ctx)

	type testcase struct {
		pin           bool
		expectedCodec multicodec.Code
		query         string
	}

	dag1 := []byte(`{"a": 1}`)
	dag2 := []byte(`{"b": 2}`)
	// dag1/dag2 are already []byte: no redundant conversion needed.
	sum1, err := multihash.Sum(dag1, multihash.SHA2_256, -1)
	if err != nil {
		t.Fatal(err)
	}
	sum2, err := multihash.Sum(dag2, multihash.SHA2_256, -1)
	if err != nil {
		t.Fatal(err)
	}

	testcases := []testcase{
		{
			pin:           true,
			expectedCodec: multicodec.Raw,
		},
		{
			pin:           true,
			expectedCodec: multicodec.DagPb,
		},
		{
			pin:           false,
			expectedCodec: multicodec.Raw,
		},
		{
			pin:           true,
			expectedCodec: multicodec.DagCbor,
		},
	}

	// Set the query string for each case.
	for i, tc := range testcases {
		q := url.Values{}
		q.Add("pin", fmt.Sprintf("%t", tc.pin))
		q.Add("store-codec", tc.expectedCodec.String())
		testcases[i].query = q.Encode()
	}

	reqs := make([]*http.Request, len(testcases))

	// Prepare requests to be made: two DAG nodes per request.
	for i, tc := range testcases {
		var body bytes.Buffer
		mpw := multipart.NewWriter(&body)
		w1, err := mpw.CreateFormFile("file", "dag1")
		if err != nil {
			t.Fatal(err)
		}
		w1.Write(dag1)
		w2, err := mpw.CreateFormFile("file", "dag2")
		if err != nil {
			t.Fatal(err)
		}
		w2.Write(dag2)
		mpw.Close()

		// Pass the query as a Sprintf argument instead of appending it
		// to the format string: url-encoded queries may contain '%'
		// sequences that fmt would misparse as verbs. Also avoid
		// shadowing the net/url package with a local named "url".
		reqURL := fmt.Sprintf("%s/dag/put?%s", proxyURL(proxy), tc.query)
		req, _ := http.NewRequest("POST", reqURL, &body)
		req.Header.Set("Content-Type", mpw.FormDataContentType())
		reqs[i] = req
	}

	for i, tc := range testcases {
		t.Run(tc.query, func(t *testing.T) {
			res, err := http.DefaultClient.Do(reqs[i])
			if err != nil {
				t.Fatal("should have succeeded: ", err)
			}
			defer res.Body.Close()
			if res.StatusCode != http.StatusOK {
				t.Fatalf("Bad response status: got = %d, want = %d", res.StatusCode, http.StatusOK)
			}
			// Decode the streamed response: one JSON object per node.
			var dagCids []cid.Cid
			var resp ipfsDagPutResp
			dec := json.NewDecoder(res.Body)
			for {
				err := dec.Decode(&resp)
				if err == io.EOF {
					break
				}
				if err != nil {
					t.Fatal(err)
				}
				dagCids = append(dagCids, resp.Cid)
			}
			if len(dagCids) != 2 {
				t.Fatal("expected 2 dag cids in response", len(dagCids))
			}
			if mh := dagCids[0].Hash(); !bytes.Equal(mh, sum1) {
				t.Error("cid1 should match the multihash of the dag sent", mh, sum1)
			}
			if mh := dagCids[1].Hash(); !bytes.Equal(mh, sum2) {
				t.Error("cid2 should match the multihash of the dag sent", mh, sum2)
			}
			for _, c := range dagCids {
				if cdc := multicodec.Code(c.Prefix().Codec); cdc != testcases[i].expectedCodec {
					t.Error("wrong codec in returned cid", cdc, testcases[i].expectedCodec)
				}
			}
		})
	}
}
func proxyURL(c *Server) string {
addr := c.listeners[0].Addr()
return fmt.Sprintf("http://%s/api/v0", addr.String())

View File

@ -19,7 +19,6 @@ import (
"github.com/ipfs-cluster/ipfs-cluster/state"
"github.com/ipfs-cluster/ipfs-cluster/state/dsstate"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
cid "github.com/ipfs/go-cid"
cors "github.com/rs/cors"
@ -102,6 +101,10 @@ type mockBlockPutResp struct {
Key string
}
// mockDagPutResp mimics the JSON object the dag/put endpoint of the
// IPFS mock returns for each stored DAG node.
type mockDagPutResp struct {
	Cid cid.Cid // CID computed by the mock for the uploaded data
}
type mockRepoGCResp struct {
Key cid.Cid `json:",omitempty"`
Error string `json:",omitempty"`
@ -371,11 +374,19 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Trailer", "X-Stream-Error")
query := r.URL.Query()
codecStr := query.Get("cid-codec")
var mc multicodec.Code
mc.Set(codecStr)
mhType := multihash.Names[query.Get("mhtype")]
mhLen, _ := strconv.Atoi(query.Get("mhLen"))
mc := multicodec.Raw
if cdcstr := query.Get("cid-codec"); cdcstr != "" {
mc.Set(cdcstr)
}
mhType := multicodec.Sha2_256
if mh := query.Get("mhtype"); mh != "" {
mhType.Set(mh)
}
mhLen := -1
if l := query.Get("mhlen"); l != "" {
mhLen, _ = strconv.Atoi(l)
}
// Get the data and return the hash
mpr, err := r.MultipartReader()
@ -402,7 +413,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
// Parse cid from data and format and add to mock block-store
builder := cid.V1Builder{
Codec: uint64(mc),
MhType: mhType,
MhType: uint64(mhType),
MhLength: mhLen,
}
@ -433,6 +444,65 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
goto ERROR
}
w.Write(data)
case "dag/put":
// DAG-put is a fake implementation as we are not going to
// parse the input and we are just going to hash it and return
// a response.
w.Header().Set("Trailer", "X-Stream-Error")
query := r.URL.Query()
storeCodec := query.Get("store-codec")
codec := multicodec.DagJson
if storeCodec != "" {
codec.Set(storeCodec)
}
hashFunc := query.Get("hash")
hash := multicodec.Sha2_256
if hashFunc != "" {
hash.Set(hashFunc)
}
// Get the data and return the hash
mpr, err := r.MultipartReader()
if err != nil {
goto ERROR
}
w.WriteHeader(http.StatusOK)
for {
part, err := mpr.NextPart()
if err == io.EOF {
return
}
if err != nil {
w.Header().Set("X-Stream-Error", err.Error())
return
}
data, err := io.ReadAll(part)
if err != nil {
w.Header().Set("X-Stream-Error", err.Error())
return
}
// Parse cid from data and format and add to mock block-store
builder := cid.V1Builder{
Codec: uint64(codec),
MhType: uint64(hash),
MhLength: -1,
}
c, err := builder.Sum(data)
if err != nil {
w.Header().Set("X-Stream-Error", err.Error())
return
}
resp := mockDagPutResp{
Cid: c,
}
j, _ := json.Marshal(resp)
w.Write(j)
}
case "repo/gc":
// It assumes `/repo/gc` with parameter `stream-errors=true`
enc := json.NewEncoder(w)