Merge pull request #2001 from ipfs-cluster/fix/1999-ipfsconn-unix-sockets

Fix #1999: Support talking to unix sockets in the ipfshttp connector
This commit is contained in:
Hector Sanjuan 2023-12-06 12:45:00 +01:00 committed by GitHub
commit 304c6285ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -18,6 +18,7 @@ import (
"github.com/ipfs-cluster/ipfs-cluster/api" "github.com/ipfs-cluster/ipfs-cluster/api"
"github.com/ipfs-cluster/ipfs-cluster/observations" "github.com/ipfs-cluster/ipfs-cluster/observations"
"github.com/tv42/httpunix"
files "github.com/ipfs/boxo/files" files "github.com/ipfs/boxo/files"
gopath "github.com/ipfs/boxo/path" gopath "github.com/ipfs/boxo/path"
@ -55,8 +56,9 @@ type Connector struct {
cancel func() cancel func()
ready chan struct{} ready chan struct{}
config *Config config *Config
nodeAddr string nodeAddr string
nodeNetwork string
rpcClient *rpc.Client rpcClient *rpc.Client
rpcReady chan struct{} rpcReady chan struct{}
@ -152,12 +154,23 @@ func NewConnector(cfg *Config) (*Connector, error) {
nodeMAddr = resolvedAddrs[0] nodeMAddr = resolvedAddrs[0]
} }
_, nodeAddr, err := manet.DialArgs(nodeMAddr) nodeNetwork, nodeAddr, err := manet.DialArgs(nodeMAddr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
c := &http.Client{} // timeouts are handled by context timeouts c := &http.Client{} // timeouts are handled by context timeouts
if nodeNetwork == "unix" {
unixTransport := &httpunix.Transport{
DialTimeout: time.Second,
}
unixTransport.RegisterLocation("ipfs", nodeAddr)
t := &http.Transport{}
t.RegisterProtocol(httpunix.Scheme, unixTransport)
c.Transport = t
}
if cfg.Tracing { if cfg.Tracing {
c.Transport = &ochttp.Transport{ c.Transport = &ochttp.Transport{
Base: http.DefaultTransport, Base: http.DefaultTransport,
@ -176,6 +189,7 @@ func NewConnector(cfg *Config) (*Connector, error) {
ready: make(chan struct{}), ready: make(chan struct{}),
config: cfg, config: cfg,
nodeAddr: nodeAddr, nodeAddr: nodeAddr,
nodeNetwork: nodeNetwork,
rpcReady: make(chan struct{}, 1), rpcReady: make(chan struct{}, 1),
reqRateLimitCh: make(chan struct{}), reqRateLimitCh: make(chan struct{}),
client: c, client: c,
@ -1222,7 +1236,12 @@ func (ipfs *Connector) updateInformerMetric(ctx context.Context) error {
// daemon API. // daemon API.
func (ipfs *Connector) apiURL() string { func (ipfs *Connector) apiURL() string {
return fmt.Sprintf("http://%s/api/v0", ipfs.nodeAddr) switch ipfs.nodeNetwork {
case "unix":
return "http+unix://ipfs/api/v0"
default:
return fmt.Sprintf("http://%s/api/v0", ipfs.nodeAddr)
}
} }
func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) { func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) {