Issue #453 Extract the IPFS Proxy from ipfshttp
Extract the IPFS Proxy from ipfshttp and make it an api module. The `ipfshttp` IPFSConnector implementation includes the so-called IPFS Proxy: an endpoint which offers an IPFS API, hijacking some interesting requests and forwarding the rest to the ipfs daemon. `ipfshttp` should contain an implementation of IPFSConnector whose only task should be to talk to IPFS. A new module should be created, `api/ipfsproxy`, an API Component implementation for Cluster. The whole proxy code should be moved here. License: MIT Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
This commit is contained in:
parent
f65349e9c8
commit
155a65cac3
64
api/ipfsproxy/config.go
Normal file
64
api/ipfsproxy/config.go
Normal file
|
@ -0,0 +1,64 @@
|
|||
package ipfsproxy
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/ipfs-cluster/config"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
// Config is used to initialize a Connector and allows to customize
// its behaviour. It implements the config.ComponentConfig interface.
type Config struct {
	config.Saver

	// Listen parameters for the IPFS Proxy. Used by the IPFS
	// connector component.
	ProxyAddr ma.Multiaddr

	// Host/Port for the IPFS daemon.
	NodeAddr ma.Multiaddr

	// Maximum duration before timing out reading a full request
	ProxyReadTimeout time.Duration

	// Maximum duration before timing out reading the headers of a request
	ProxyReadHeaderTimeout time.Duration

	// Maximum duration before timing out write of the response
	ProxyWriteTimeout time.Duration

	// Server-side amount of time a Keep-Alive connection will be
	// kept idle before being reused
	ProxyIdleTimeout time.Duration

	// NOTE(review): ipfsproxy.go's repoStatHandler reads
	// cfg.IPFSRequestTimeout, which is not declared here — confirm the
	// field was not lost during the extraction from ipfshttp.
}
|
||||
|
||||
// Validate checks that the fields of this Config have sensible values,
|
||||
// at least in appearance.
|
||||
func (cfg *Config) Validate() error {
|
||||
var err error
|
||||
if cfg.ProxyAddr == nil {
|
||||
err = errors.New("ipfshttp.proxy_listen_multiaddress not set")
|
||||
}
|
||||
if cfg.NodeAddr == nil {
|
||||
err = errors.New("ipfshttp.node_multiaddress not set")
|
||||
}
|
||||
|
||||
if cfg.ProxyReadTimeout < 0 {
|
||||
err = errors.New("ipfshttp.proxy_read_timeout is invalid")
|
||||
}
|
||||
|
||||
if cfg.ProxyReadHeaderTimeout < 0 {
|
||||
err = errors.New("ipfshttp.proxy_read_header_timeout is invalid")
|
||||
}
|
||||
|
||||
if cfg.ProxyWriteTimeout < 0 {
|
||||
err = errors.New("ipfshttp.proxy_write_timeout is invalid")
|
||||
}
|
||||
|
||||
if cfg.ProxyIdleTimeout < 0 {
|
||||
err = errors.New("ipfshttp.proxy_idle_timeout invalid")
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
464
api/ipfsproxy/ipfsproxy.go
Normal file
464
api/ipfsproxy/ipfsproxy.go
Normal file
|
@ -0,0 +1,464 @@
|
|||
package ipfsproxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/rpc"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/ipfs/ipfs-cluster/adder/adderutils"
|
||||
"github.com/ipfs/ipfs-cluster/api"
|
||||
"github.com/ipfs/ipfs-cluster/rpcutil"
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
madns "github.com/multiformats/go-multiaddr-dns"
|
||||
manet "github.com/multiformats/go-multiaddr-net"
|
||||
)
|
||||
|
||||
// DNSTimeout is used when resolving DNS multiaddresses in this module
var DNSTimeout = 5 * time.Second

// logger is this component's logging facility.
var logger = logging.Logger("ipfsproxy")
|
||||
|
||||
// IPFSProxy offers an IPFS API, hijacking some interesting requests
// and forwarding the rest to the ipfs daemon. This allows it to
// intercept these requests and perform extra operations on them
// (e.g. pinning through the cluster rather than a single node).
type IPFSProxy struct {
	ctx    context.Context
	cancel func()

	config   *Config
	nodeAddr string // host:port of the ipfs daemon, derived from config

	rpcClient *rpc.Client
	rpcReady  chan struct{} // signaled by SetClient; closed on Shutdown

	listener net.Listener // proxy listener
	server   *http.Server // proxy server

	shutdownLock sync.Mutex
	shutdown     bool
	wg           sync.WaitGroup
}
|
||||
|
||||
// ipfsError is the JSON error body written back to clients, matching
// the shape the IPFS API uses for its own errors.
type ipfsError struct {
	Message string
}

// ipfsPinType carries the pin "Type" string for a pin ls entry.
type ipfsPinType struct {
	Type string
}

// ipfsPinLsResp is the response body for the hijacked /pin/ls endpoint.
type ipfsPinLsResp struct {
	Keys map[string]ipfsPinType
}

// ipfsPinOpResp is the response body for the hijacked /pin/add and
// /pin/rm endpoints.
type ipfsPinOpResp struct {
	Pins []string
}

// From https://github.com/ipfs/go-ipfs/blob/master/core/coreunix/add.go#L49
type ipfsAddResp struct {
	Name  string
	Hash  string `json:",omitempty"`
	Bytes int64  `json:",omitempty"`
	Size  string `json:",omitempty"`
}
|
||||
|
||||
func NewIPFSProxy(cfg *Config) (*IPFSProxy, error) {
|
||||
err := cfg.Validate()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodeMAddr := cfg.NodeAddr
|
||||
// dns multiaddresses need to be resolved first
|
||||
if madns.Matches(nodeMAddr) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), DNSTimeout)
|
||||
defer cancel()
|
||||
resolvedAddrs, err := madns.Resolve(ctx, cfg.NodeAddr)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
nodeMAddr = resolvedAddrs[0]
|
||||
}
|
||||
|
||||
_, nodeAddr, err := manet.DialArgs(nodeMAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
proxyNet, proxyAddr, err := manet.DialArgs(cfg.ProxyAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
l, err := net.Listen(proxyNet, proxyAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodeHTTPAddr := "http://" + nodeAddr
|
||||
proxyURL, err := url.Parse(nodeHTTPAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
proxyHandler := httputil.NewSingleHostReverseProxy(proxyURL)
|
||||
|
||||
smux := http.NewServeMux()
|
||||
s := &http.Server{
|
||||
ReadTimeout: cfg.ProxyReadTimeout,
|
||||
WriteTimeout: cfg.ProxyWriteTimeout,
|
||||
ReadHeaderTimeout: cfg.ProxyReadHeaderTimeout,
|
||||
IdleTimeout: cfg.ProxyIdleTimeout,
|
||||
Handler: smux,
|
||||
}
|
||||
|
||||
// See: https://github.com/ipfs/go-ipfs/issues/5168
|
||||
// See: https://github.com/ipfs/ipfs-cluster/issues/548
|
||||
// on why this is re-enabled.
|
||||
s.SetKeepAlivesEnabled(false) // A reminder that this can be changed
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
ipfs := &IPFSProxy{
|
||||
ctx: ctx,
|
||||
config: cfg,
|
||||
cancel: cancel,
|
||||
nodeAddr: nodeAddr,
|
||||
rpcReady: make(chan struct{}, 1),
|
||||
listener: l,
|
||||
server: s,
|
||||
}
|
||||
smux.Handle("/", proxyHandler)
|
||||
smux.HandleFunc("/api/v0/pin/add/", ipfs.pinHandler)
|
||||
smux.HandleFunc("/api/v0/pin/rm/", ipfs.unpinHandler)
|
||||
smux.HandleFunc("/api/v0/pin/ls", ipfs.pinLsHandler) // required to handle /pin/ls for all pins
|
||||
smux.HandleFunc("/api/v0/pin/ls/", ipfs.pinLsHandler)
|
||||
smux.HandleFunc("/api/v0/add", ipfs.addHandler)
|
||||
smux.HandleFunc("/api/v0/add/", ipfs.addHandler)
|
||||
smux.HandleFunc("/api/v0/repo/stat", ipfs.repoStatHandler)
|
||||
smux.HandleFunc("/api/v0/repo/stat/", ipfs.repoStatHandler)
|
||||
|
||||
go ipfs.run()
|
||||
return ipfs, nil
|
||||
}
|
||||
|
||||
// SetClient makes the component ready to perform RPC
// requests.
func (ipfs *IPFSProxy) SetClient(c *rpc.Client) {
	ipfs.rpcClient = c
	// Wake up run() so the proxy starts serving requests.
	ipfs.rpcReady <- struct{}{}
}
|
||||
|
||||
// Shutdown stops any listeners and stops the component from taking
// any requests. It is safe to call more than once; subsequent calls
// are no-ops.
func (ipfs *IPFSProxy) Shutdown() error {
	ipfs.shutdownLock.Lock()
	defer ipfs.shutdownLock.Unlock()

	if ipfs.shutdown {
		logger.Debug("already shutdown")
		return nil
	}

	logger.Info("stopping IPFS Proxy")

	// Cancel the context, release run() (rpcReady) and close the
	// listener so server.Serve returns, then wait for the serving
	// goroutine to finish before marking the component as shut down.
	ipfs.cancel()
	close(ipfs.rpcReady)
	ipfs.server.SetKeepAlivesEnabled(false)
	ipfs.listener.Close()

	ipfs.wg.Wait()
	ipfs.shutdown = true
	return nil
}
|
||||
|
||||
// run launches the proxy server when we receive the rpcReady signal
// (i.e. after SetClient has been called).
func (ipfs *IPFSProxy) run() {
	<-ipfs.rpcReady

	// Do not shutdown while launching threads
	// -- prevents race conditions with ipfs.wg.
	ipfs.shutdownLock.Lock()
	defer ipfs.shutdownLock.Unlock()

	// This launches the proxy
	ipfs.wg.Add(1)
	go func() {
		defer ipfs.wg.Done()
		logger.Infof(
			"IPFS Proxy: %s -> %s",
			ipfs.config.ProxyAddr,
			ipfs.config.NodeAddr,
		)
		err := ipfs.server.Serve(ipfs.listener) // hangs here
		// A "closed network connection" error is the normal result of
		// Shutdown closing the listener; anything else is logged.
		if err != nil && !strings.Contains(err.Error(), "closed network connection") {
			logger.Error(err)
		}
	}()
}
|
||||
|
||||
// Handlers
|
||||
func ipfsErrorResponder(w http.ResponseWriter, errMsg string) {
|
||||
res := ipfsError{errMsg}
|
||||
resBytes, _ := json.Marshal(res)
|
||||
w.Header().Add("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write(resBytes)
|
||||
return
|
||||
}
|
||||
|
||||
func (ipfs *Connector) pinOpHandler(op string, w http.ResponseWriter, r *http.Request) {
|
||||
arg, ok := extractArgument(r.URL)
|
||||
if !ok {
|
||||
ipfsErrorResponder(w, "Error: bad argument")
|
||||
return
|
||||
}
|
||||
c, err := cid.Decode(arg)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, "Error parsing CID: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
err = ipfs.rpcClient.Call(
|
||||
"",
|
||||
"Cluster",
|
||||
op,
|
||||
api.PinCid(c).ToSerial(),
|
||||
&struct{}{},
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
res := ipfsPinOpResp{
|
||||
Pins: []string{arg},
|
||||
}
|
||||
resBytes, _ := json.Marshal(res)
|
||||
w.Header().Add("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(resBytes)
|
||||
return
|
||||
}
|
||||
|
||||
func (ipfs *Connector) pinHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ipfs.pinOpHandler("Pin", w, r)
|
||||
}
|
||||
|
||||
func (ipfs *Connector) unpinHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ipfs.pinOpHandler("Unpin", w, r)
|
||||
}
|
||||
|
||||
func (ipfs *Connector) pinLsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
pinLs := ipfsPinLsResp{}
|
||||
pinLs.Keys = make(map[string]ipfsPinType)
|
||||
|
||||
arg, ok := extractArgument(r.URL)
|
||||
if ok {
|
||||
c, err := cid.Decode(arg)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error())
|
||||
return
|
||||
}
|
||||
var pin api.PinSerial
|
||||
err = ipfs.rpcClient.Call(
|
||||
"",
|
||||
"Cluster",
|
||||
"PinGet",
|
||||
api.PinCid(c).ToSerial(),
|
||||
&pin,
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, fmt.Sprintf("Error: path '%s' is not pinned", arg))
|
||||
return
|
||||
}
|
||||
pinLs.Keys[pin.Cid] = ipfsPinType{
|
||||
Type: "recursive",
|
||||
}
|
||||
} else {
|
||||
var pins []api.PinSerial
|
||||
err := ipfs.rpcClient.Call(
|
||||
"",
|
||||
"Cluster",
|
||||
"Pins",
|
||||
struct{}{},
|
||||
&pins,
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
for _, pin := range pins {
|
||||
pinLs.Keys[pin.Cid] = ipfsPinType{
|
||||
Type: "recursive",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
resBytes, _ := json.Marshal(pinLs)
|
||||
w.Header().Add("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(resBytes)
|
||||
}
|
||||
|
||||
func (ipfs *Connector) addHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reader, err := r.MultipartReader()
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, "error reading request: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
q := r.URL.Query()
|
||||
if q.Get("only-hash") == "true" {
|
||||
ipfsErrorResponder(w, "only-hash is not supported when adding to cluster")
|
||||
}
|
||||
|
||||
unpin := q.Get("pin") == "false"
|
||||
|
||||
// Luckily, most IPFS add query params are compatible with cluster's
|
||||
// /add params. We can parse most of them directly from the query.
|
||||
params, err := api.AddParamsFromQuery(q)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, "error parsing options:"+err.Error())
|
||||
return
|
||||
}
|
||||
trickle := q.Get("trickle")
|
||||
if trickle == "true" {
|
||||
params.Layout = "trickle"
|
||||
}
|
||||
|
||||
logger.Warningf("Proxy/add does not support all IPFS params. Current options: %+v", params)
|
||||
|
||||
outputTransform := func(in *api.AddedOutput) interface{} {
|
||||
r := &ipfsAddResp{
|
||||
Name: in.Name,
|
||||
Hash: in.Cid,
|
||||
Bytes: int64(in.Bytes),
|
||||
}
|
||||
if in.Size != 0 {
|
||||
r.Size = strconv.FormatUint(in.Size, 10)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
root, err := adderutils.AddMultipartHTTPHandler(
|
||||
ipfs.ctx,
|
||||
ipfs.rpcClient,
|
||||
params,
|
||||
reader,
|
||||
w,
|
||||
outputTransform,
|
||||
)
|
||||
|
||||
// any errors have been sent as Trailer
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if !unpin {
|
||||
return
|
||||
}
|
||||
|
||||
// Unpin because the user doesn't want to pin
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
err = ipfs.rpcClient.CallContext(
|
||||
ipfs.ctx,
|
||||
"",
|
||||
"Cluster",
|
||||
"Unpin",
|
||||
api.PinCid(root).ToSerial(),
|
||||
&struct{}{},
|
||||
)
|
||||
if err != nil {
|
||||
w.Header().Set("X-Stream-Error", err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (ipfs *Connector) repoStatHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var peers []peer.ID
|
||||
err := ipfs.rpcClient.Call(
|
||||
"",
|
||||
"Cluster",
|
||||
"ConsensusPeers",
|
||||
struct{}{},
|
||||
&peers,
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
ctxs, cancels := rpcutil.CtxsWithTimeout(ipfs.ctx, len(peers), ipfs.config.IPFSRequestTimeout)
|
||||
defer rpcutil.MultiCancel(cancels)
|
||||
|
||||
repoStats := make([]api.IPFSRepoStat, len(peers), len(peers))
|
||||
repoStatsIfaces := make([]interface{}, len(repoStats), len(repoStats))
|
||||
for i := range repoStats {
|
||||
repoStatsIfaces[i] = &repoStats[i]
|
||||
}
|
||||
|
||||
errs := ipfs.rpcClient.MultiCall(
|
||||
ctxs,
|
||||
peers,
|
||||
"Cluster",
|
||||
"IPFSRepoStat",
|
||||
struct{}{},
|
||||
repoStatsIfaces,
|
||||
)
|
||||
|
||||
totalStats := api.IPFSRepoStat{}
|
||||
|
||||
for i, err := range errs {
|
||||
if err != nil {
|
||||
logger.Errorf("%s repo/stat errored: %s", peers[i], err)
|
||||
continue
|
||||
}
|
||||
totalStats.RepoSize += repoStats[i].RepoSize
|
||||
totalStats.StorageMax += repoStats[i].StorageMax
|
||||
}
|
||||
|
||||
resBytes, _ := json.Marshal(totalStats)
|
||||
w.Header().Add("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(resBytes)
|
||||
return
|
||||
}
|
||||
|
||||
// extractArgument extracts the cid argument from a url.URL, either via
|
||||
// the query string parameters or from the url path itself.
|
||||
func extractArgument(u *url.URL) (string, bool) {
|
||||
arg := u.Query().Get("arg")
|
||||
if arg != "" {
|
||||
return arg, true
|
||||
}
|
||||
|
||||
p := strings.TrimPrefix(u.Path, "/api/v0/")
|
||||
segs := strings.Split(p, "/")
|
||||
|
||||
if len(segs) > 2 {
|
||||
warnMsg := "You are using an undocumented form of the IPFS API."
|
||||
warnMsg += "Consider passing your command arguments"
|
||||
warnMsg += "with the '?arg=' query parameter"
|
||||
logger.Warning(warnMsg)
|
||||
return segs[len(segs)-1], true
|
||||
}
|
||||
return "", false
|
||||
}
|
|
@ -10,18 +10,13 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/ipfs-cluster/adder/adderutils"
|
||||
"github.com/ipfs/ipfs-cluster/api"
|
||||
"github.com/ipfs/ipfs-cluster/rpcutil"
|
||||
|
||||
rpc "github.com/hsanjuan/go-libp2p-gorpc"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
|
@ -44,14 +39,9 @@ var logger = logging.Logger("ipfshttp")
|
|||
var updateMetricMod = 10
|
||||
|
||||
// Connector implements the IPFSConnector interface
|
||||
// and provides a component which does two tasks:
|
||||
//
|
||||
// On one side, it proxies HTTP requests to the configured IPFS
|
||||
// daemon. It is able to intercept these requests though, and
|
||||
// perform extra operations on them.
|
||||
//
|
||||
// On the other side, it is used to perform on-demand requests
|
||||
// against the configured IPFS daemom (such as a pin request).
|
||||
// and provides a component which is used to perform
|
||||
// on-demand requests against the configured IPFS daemom
|
||||
// (such as a pin request).
|
||||
type Connector struct {
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
|
@ -59,14 +49,10 @@ type Connector struct {
|
|||
config *Config
|
||||
nodeAddr string
|
||||
|
||||
handlers map[string]func(http.ResponseWriter, *http.Request)
|
||||
|
||||
rpcClient *rpc.Client
|
||||
rpcReady chan struct{}
|
||||
|
||||
listener net.Listener // proxy listener
|
||||
server *http.Server // proxy server
|
||||
client *http.Client // client to ipfs daemon
|
||||
client *http.Client // client to ipfs daemon
|
||||
|
||||
updateMetricMutex sync.Mutex
|
||||
updateMetricCount int
|
||||
|
@ -88,23 +74,11 @@ type ipfsPinLsResp struct {
|
|||
Keys map[string]ipfsPinType
|
||||
}
|
||||
|
||||
type ipfsPinOpResp struct {
|
||||
Pins []string
|
||||
}
|
||||
|
||||
type ipfsIDResp struct {
|
||||
ID string
|
||||
Addresses []string
|
||||
}
|
||||
|
||||
// From https://github.com/ipfs/go-ipfs/blob/master/core/coreunix/add.go#L49
|
||||
type ipfsAddResp struct {
|
||||
Name string
|
||||
Hash string `json:",omitempty"`
|
||||
Bytes int64 `json:",omitempty"`
|
||||
Size string `json:",omitempty"`
|
||||
}
|
||||
|
||||
type ipfsSwarmPeersResp struct {
|
||||
Peers []ipfsPeer
|
||||
}
|
||||
|
@ -142,38 +116,6 @@ func NewConnector(cfg *Config) (*Connector, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
proxyNet, proxyAddr, err := manet.DialArgs(cfg.ProxyAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
l, err := net.Listen(proxyNet, proxyAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodeHTTPAddr := "http://" + nodeAddr
|
||||
proxyURL, err := url.Parse(nodeHTTPAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
proxyHandler := httputil.NewSingleHostReverseProxy(proxyURL)
|
||||
|
||||
smux := http.NewServeMux()
|
||||
s := &http.Server{
|
||||
ReadTimeout: cfg.ProxyReadTimeout,
|
||||
WriteTimeout: cfg.ProxyWriteTimeout,
|
||||
ReadHeaderTimeout: cfg.ProxyReadHeaderTimeout,
|
||||
IdleTimeout: cfg.ProxyIdleTimeout,
|
||||
Handler: smux,
|
||||
}
|
||||
|
||||
// See: https://github.com/ipfs/go-ipfs/issues/5168
|
||||
// See: https://github.com/ipfs/ipfs-cluster/issues/548
|
||||
// on why this is re-enabled.
|
||||
s.SetKeepAlivesEnabled(false) // A reminder that this can be changed
|
||||
|
||||
c := &http.Client{} // timeouts are handled by context timeouts
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -183,23 +125,10 @@ func NewConnector(cfg *Config) (*Connector, error) {
|
|||
config: cfg,
|
||||
cancel: cancel,
|
||||
nodeAddr: nodeAddr,
|
||||
handlers: make(map[string]func(http.ResponseWriter, *http.Request)),
|
||||
rpcReady: make(chan struct{}, 1),
|
||||
listener: l,
|
||||
server: s,
|
||||
client: c,
|
||||
}
|
||||
|
||||
smux.Handle("/", proxyHandler)
|
||||
smux.HandleFunc("/api/v0/pin/add/", ipfs.pinHandler)
|
||||
smux.HandleFunc("/api/v0/pin/rm/", ipfs.unpinHandler)
|
||||
smux.HandleFunc("/api/v0/pin/ls", ipfs.pinLsHandler) // required to handle /pin/ls for all pins
|
||||
smux.HandleFunc("/api/v0/pin/ls/", ipfs.pinLsHandler)
|
||||
smux.HandleFunc("/api/v0/add", ipfs.addHandler)
|
||||
smux.HandleFunc("/api/v0/add/", ipfs.addHandler)
|
||||
smux.HandleFunc("/api/v0/repo/stat", ipfs.repoStatHandler)
|
||||
smux.HandleFunc("/api/v0/repo/stat/", ipfs.repoStatHandler)
|
||||
|
||||
go ipfs.run()
|
||||
return ipfs, nil
|
||||
}
|
||||
|
@ -214,21 +143,6 @@ func (ipfs *Connector) run() {
|
|||
ipfs.shutdownLock.Lock()
|
||||
defer ipfs.shutdownLock.Unlock()
|
||||
|
||||
// This launches the proxy
|
||||
ipfs.wg.Add(1)
|
||||
go func() {
|
||||
defer ipfs.wg.Done()
|
||||
logger.Infof(
|
||||
"IPFS Proxy: %s -> %s",
|
||||
ipfs.config.ProxyAddr,
|
||||
ipfs.config.NodeAddr,
|
||||
)
|
||||
err := ipfs.server.Serve(ipfs.listener) // hangs here
|
||||
if err != nil && !strings.Contains(err.Error(), "closed network connection") {
|
||||
logger.Error(err)
|
||||
}
|
||||
}()
|
||||
|
||||
// This runs ipfs swarm connect to the daemons of other cluster members
|
||||
ipfs.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -250,264 +164,6 @@ func (ipfs *Connector) run() {
|
|||
}()
|
||||
}
|
||||
|
||||
func ipfsErrorResponder(w http.ResponseWriter, errMsg string) {
|
||||
res := ipfsError{errMsg}
|
||||
resBytes, _ := json.Marshal(res)
|
||||
w.Header().Add("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write(resBytes)
|
||||
return
|
||||
}
|
||||
|
||||
func (ipfs *Connector) pinOpHandler(op string, w http.ResponseWriter, r *http.Request) {
|
||||
arg, ok := extractArgument(r.URL)
|
||||
if !ok {
|
||||
ipfsErrorResponder(w, "Error: bad argument")
|
||||
return
|
||||
}
|
||||
c, err := cid.Decode(arg)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, "Error parsing CID: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
err = ipfs.rpcClient.Call(
|
||||
"",
|
||||
"Cluster",
|
||||
op,
|
||||
api.PinCid(c).ToSerial(),
|
||||
&struct{}{},
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
res := ipfsPinOpResp{
|
||||
Pins: []string{arg},
|
||||
}
|
||||
resBytes, _ := json.Marshal(res)
|
||||
w.Header().Add("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(resBytes)
|
||||
return
|
||||
}
|
||||
|
||||
func (ipfs *Connector) pinHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ipfs.pinOpHandler("Pin", w, r)
|
||||
}
|
||||
|
||||
func (ipfs *Connector) unpinHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ipfs.pinOpHandler("Unpin", w, r)
|
||||
}
|
||||
|
||||
func (ipfs *Connector) pinLsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
pinLs := ipfsPinLsResp{}
|
||||
pinLs.Keys = make(map[string]ipfsPinType)
|
||||
|
||||
arg, ok := extractArgument(r.URL)
|
||||
if ok {
|
||||
c, err := cid.Decode(arg)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error())
|
||||
return
|
||||
}
|
||||
var pin api.PinSerial
|
||||
err = ipfs.rpcClient.Call(
|
||||
"",
|
||||
"Cluster",
|
||||
"PinGet",
|
||||
api.PinCid(c).ToSerial(),
|
||||
&pin,
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, fmt.Sprintf("Error: path '%s' is not pinned", arg))
|
||||
return
|
||||
}
|
||||
pinLs.Keys[pin.Cid] = ipfsPinType{
|
||||
Type: "recursive",
|
||||
}
|
||||
} else {
|
||||
var pins []api.PinSerial
|
||||
err := ipfs.rpcClient.Call(
|
||||
"",
|
||||
"Cluster",
|
||||
"Pins",
|
||||
struct{}{},
|
||||
&pins,
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
for _, pin := range pins {
|
||||
pinLs.Keys[pin.Cid] = ipfsPinType{
|
||||
Type: "recursive",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
resBytes, _ := json.Marshal(pinLs)
|
||||
w.Header().Add("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(resBytes)
|
||||
}
|
||||
|
||||
func (ipfs *Connector) addHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reader, err := r.MultipartReader()
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, "error reading request: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
q := r.URL.Query()
|
||||
if q.Get("only-hash") == "true" {
|
||||
ipfsErrorResponder(w, "only-hash is not supported when adding to cluster")
|
||||
}
|
||||
|
||||
unpin := q.Get("pin") == "false"
|
||||
|
||||
// Luckily, most IPFS add query params are compatible with cluster's
|
||||
// /add params. We can parse most of them directly from the query.
|
||||
params, err := api.AddParamsFromQuery(q)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, "error parsing options:"+err.Error())
|
||||
return
|
||||
}
|
||||
trickle := q.Get("trickle")
|
||||
if trickle == "true" {
|
||||
params.Layout = "trickle"
|
||||
}
|
||||
|
||||
logger.Warningf("Proxy/add does not support all IPFS params. Current options: %+v", params)
|
||||
|
||||
outputTransform := func(in *api.AddedOutput) interface{} {
|
||||
r := &ipfsAddResp{
|
||||
Name: in.Name,
|
||||
Hash: in.Cid,
|
||||
Bytes: int64(in.Bytes),
|
||||
}
|
||||
if in.Size != 0 {
|
||||
r.Size = strconv.FormatUint(in.Size, 10)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
root, err := adderutils.AddMultipartHTTPHandler(
|
||||
ipfs.ctx,
|
||||
ipfs.rpcClient,
|
||||
params,
|
||||
reader,
|
||||
w,
|
||||
outputTransform,
|
||||
)
|
||||
|
||||
// any errors have been sent as Trailer
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if !unpin {
|
||||
return
|
||||
}
|
||||
|
||||
// Unpin because the user doesn't want to pin
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
err = ipfs.rpcClient.CallContext(
|
||||
ipfs.ctx,
|
||||
"",
|
||||
"Cluster",
|
||||
"Unpin",
|
||||
api.PinCid(root).ToSerial(),
|
||||
&struct{}{},
|
||||
)
|
||||
if err != nil {
|
||||
w.Header().Set("X-Stream-Error", err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (ipfs *Connector) repoStatHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var peers []peer.ID
|
||||
err := ipfs.rpcClient.Call(
|
||||
"",
|
||||
"Cluster",
|
||||
"ConsensusPeers",
|
||||
struct{}{},
|
||||
&peers,
|
||||
)
|
||||
if err != nil {
|
||||
ipfsErrorResponder(w, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
ctxs, cancels := rpcutil.CtxsWithTimeout(ipfs.ctx, len(peers), ipfs.config.IPFSRequestTimeout)
|
||||
defer rpcutil.MultiCancel(cancels)
|
||||
|
||||
repoStats := make([]api.IPFSRepoStat, len(peers), len(peers))
|
||||
repoStatsIfaces := make([]interface{}, len(repoStats), len(repoStats))
|
||||
for i := range repoStats {
|
||||
repoStatsIfaces[i] = &repoStats[i]
|
||||
}
|
||||
|
||||
errs := ipfs.rpcClient.MultiCall(
|
||||
ctxs,
|
||||
peers,
|
||||
"Cluster",
|
||||
"IPFSRepoStat",
|
||||
struct{}{},
|
||||
repoStatsIfaces,
|
||||
)
|
||||
|
||||
totalStats := api.IPFSRepoStat{}
|
||||
|
||||
for i, err := range errs {
|
||||
if err != nil {
|
||||
logger.Errorf("%s repo/stat errored: %s", peers[i], err)
|
||||
continue
|
||||
}
|
||||
totalStats.RepoSize += repoStats[i].RepoSize
|
||||
totalStats.StorageMax += repoStats[i].StorageMax
|
||||
}
|
||||
|
||||
resBytes, _ := json.Marshal(totalStats)
|
||||
w.Header().Add("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(resBytes)
|
||||
return
|
||||
}
|
||||
|
||||
// SetClient makes the component ready to perform RPC
|
||||
// requests.
|
||||
func (ipfs *Connector) SetClient(c *rpc.Client) {
|
||||
ipfs.rpcClient = c
|
||||
ipfs.rpcReady <- struct{}{}
|
||||
}
|
||||
|
||||
// Shutdown stops any listeners and stops the component from taking
|
||||
// any requests.
|
||||
func (ipfs *Connector) Shutdown() error {
|
||||
ipfs.shutdownLock.Lock()
|
||||
defer ipfs.shutdownLock.Unlock()
|
||||
|
||||
if ipfs.shutdown {
|
||||
logger.Debug("already shutdown")
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Info("stopping IPFS Proxy")
|
||||
|
||||
ipfs.cancel()
|
||||
close(ipfs.rpcReady)
|
||||
ipfs.server.SetKeepAlivesEnabled(false)
|
||||
ipfs.listener.Close()
|
||||
|
||||
ipfs.wg.Wait()
|
||||
ipfs.shutdown = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// ID performs an ID request against the configured
|
||||
// IPFS daemon. It returns the fetched information.
|
||||
// If the request fails, or the parsing fails, it
|
||||
|
|
Loading…
Reference in New Issue
Block a user