ipfsproxy: intercept block/put and dag/put and pin to cluster on pin=true

This fixes #1738. Tests still missing
This commit is contained in:
Hector Sanjuan 2022-09-06 15:09:14 +02:00
parent 5452b59a2e
commit 6ce90dfe47
2 changed files with 186 additions and 22 deletions

View File

@ -119,7 +119,7 @@ func (proxy *Server) copyHeadersFromIPFSWithRequest(
hdrs []string,
dest http.Header, req *http.Request,
) error {
res, err := proxy.ipfsRoundTripper.RoundTrip(req)
res, err := proxy.reverseProxy.Transport.RoundTrip(req)
if err != nil {
logger.Error("error making request for header extraction to ipfs: ", err)
return err
@ -132,14 +132,14 @@ func (proxy *Server) copyHeadersFromIPFSWithRequest(
}
// setHeaders sets some headers for all hijacked endpoints:
// - First, we fix CORs headers by making an OPTIONS request to IPFS with the
// same Origin. Our objective is to get headers for non-preflight requests
// only (the ones we hijack).
// - Second, we add any of the one-time-extracted headers that we deem necessary
// or the user needs from IPFS (in case of custom headers).
// This may trigger a single POST request to ExtractHeaderPath if they
// were not extracted before or TTL has expired.
// - Third, we set our own headers.
// - First, we fix CORs headers by making an OPTIONS request to IPFS with the
// same Origin. Our objective is to get headers for non-preflight requests
// only (the ones we hijack).
// - Second, we add any of the one-time-extracted headers that we deem necessary
// or the user needs from IPFS (in case of custom headers).
// This may trigger a single POST request to ExtractHeaderPath if they
// were not extracted before or TTL has expired.
// - Third, we set our own headers.
func (proxy *Server) setHeaders(dest http.Header, srcRequest *http.Request) {
proxy.setCORSHeaders(dest, srcRequest)
proxy.setAdditionalIpfsHeaders(dest, srcRequest)

View File

@ -31,8 +31,8 @@ import (
cmd "github.com/ipfs/go-ipfs-cmds"
logging "github.com/ipfs/go-log/v2"
path "github.com/ipfs/go-path"
peer "github.com/libp2p/go-libp2p/core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p/core/peer"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr/net"
@ -65,9 +65,9 @@ type Server struct {
rpcClient *rpc.Client
rpcReady chan struct{}
listeners []net.Listener // proxy listener
server *http.Server // proxy server
ipfsRoundTripper http.RoundTripper // allows to talk to IPFS
listeners []net.Listener // proxy listener
server *http.Server // proxy server
reverseProxy *httputil.ReverseProxy // allows to talk to IPFS
ipfsHeadersStore sync.Map
@ -198,15 +198,15 @@ func New(cfg *Config) (*Server, error) {
reverseProxy.Transport = http.DefaultTransport
ctx, cancel := context.WithCancel(context.Background())
proxy := &Server{
ctx: ctx,
config: cfg,
cancel: cancel,
nodeAddr: nodeHTTPAddr,
nodeScheme: nodeScheme,
rpcReady: make(chan struct{}, 1),
listeners: listeners,
server: s,
ipfsRoundTripper: reverseProxy.Transport,
ctx: ctx,
config: cfg,
cancel: cancel,
nodeAddr: nodeHTTPAddr,
nodeScheme: nodeScheme,
rpcReady: make(chan struct{}, 1),
listeners: listeners,
server: s,
reverseProxy: reverseProxy,
}
// Ideally, we should only intercept POST requests, but
@ -260,6 +260,14 @@ func New(cfg *Config) (*Server, error) {
Path("/repo/gc").
HandlerFunc(proxy.repoGCHandler).
Name("RepoGC")
hijackSubrouter.
Path("/block/put").
HandlerFunc(proxy.blockPutHandler).
Name("BlockPut")
hijackSubrouter.
Path("/dag/put").
HandlerFunc(proxy.dagPutHandler).
Name("DagPut")
// Everything else goes to the IPFS daemon.
router.PathPrefix("/").Handler(reverseProxy)
@ -760,6 +768,162 @@ func (proxy *Server) repoGCHandler(w http.ResponseWriter, r *http.Request) {
}
}
type ipfsBlockPutResp struct {
Key api.Cid
Size int
}
func (proxy *Server) blockPutHandler(w http.ResponseWriter, r *http.Request) {
if r.URL.Query().Get("pin") != "true" {
proxy.reverseProxy.ServeHTTP(w, r)
return
}
u2, err := url.Parse(proxy.nodeAddr)
if err != nil {
logger.Error(err)
ipfsErrorResponder(w, err.Error(), -1)
return
}
r.URL.Host = u2.Host
r.URL.Scheme = u2.Scheme
r.Host = u2.Host
r.RequestURI = ""
res, err := proxy.reverseProxy.Transport.RoundTrip(r)
if err != nil {
ipfsErrorResponder(w, err.Error(), -1)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
w.WriteHeader(res.StatusCode)
_, err = io.Copy(w, res.Body)
if err != nil {
logger.Error(err)
}
return
}
// Returned 200. Parse responses.
w.Header().Set("Trailer", "X-Stream-Error")
w.WriteHeader(http.StatusOK) // any errors from here go into trailers
dec := json.NewDecoder(res.Body)
enc := json.NewEncoder(w)
for {
var res ipfsBlockPutResp
err = dec.Decode(&res)
if err == io.EOF {
return
}
if err != nil {
logger.Error(err)
w.Header().Add("X-Stream-Error", err.Error())
return
}
p := api.PinCid(res.Key)
var pinObj api.Pin
if err := proxy.rpcClient.Call(
"",
"Cluster",
"Pin",
p,
&pinObj,
); err != nil {
logger.Error(err)
w.Header().Add("X-Stream-Error", err.Error())
// keep going though blocks
}
if err := enc.Encode(res); err != nil {
logger.Error(err)
w.Header().Add("X-Stream-Error", err.Error())
return
}
}
}
type ipfsDagPutResp struct {
Cid cid.Cid
}
func (proxy *Server) dagPutHandler(w http.ResponseWriter, r *http.Request) {
// Note this mostly duplicates blockPutHandler
if r.URL.Query().Get("pin") != "true" {
proxy.reverseProxy.ServeHTTP(w, r)
return
}
u2, err := url.Parse(proxy.nodeAddr)
if err != nil {
logger.Error(err)
ipfsErrorResponder(w, err.Error(), -1)
return
}
r.URL.Host = u2.Host
r.URL.Scheme = u2.Scheme
r.Host = u2.Host
r.RequestURI = ""
res, err := proxy.reverseProxy.Transport.RoundTrip(r)
if err != nil {
ipfsErrorResponder(w, err.Error(), -1)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
w.WriteHeader(res.StatusCode)
_, err = io.Copy(w, res.Body)
if err != nil {
logger.Error(err)
}
return
}
// Returned 200. Parse responses.
w.Header().Set("Trailer", "X-Stream-Error")
w.WriteHeader(http.StatusOK) // any errors from here go into trailers
dec := json.NewDecoder(res.Body)
enc := json.NewEncoder(w)
for {
var res ipfsDagPutResp
err = dec.Decode(&res)
if err == io.EOF {
return
}
if err != nil {
logger.Error(err)
w.Header().Add("X-Stream-Error", err.Error())
return
}
p := api.PinCid(api.NewCid(res.Cid))
var pinObj api.Pin
if err := proxy.rpcClient.Call(
"",
"Cluster",
"Pin",
p,
&pinObj,
); err != nil {
logger.Error(err)
w.Header().Add("X-Stream-Error", err.Error())
// keep going though blocks
}
if err := enc.Encode(res); err != nil {
logger.Error(err)
w.Header().Add("X-Stream-Error", err.Error())
return
}
}
}
// slashHandler returns a handler which converts a /a/b/c/<argument> request
// into an /a/b/c/<argument>?arg=<argument> one. And uses the given origHandler
// for it. Our handlers expect that arguments are passed in the ?arg query