From 80e7baca91ee4e2e936086141ed59c0190e9e333 Mon Sep 17 00:00:00 2001 From: Adrian Lanzafame Date: Thu, 24 May 2018 17:12:05 +1000 Subject: [PATCH] ipfshttp: hande request timeouts to ifps daemon correctly License: MIT Signed-off-by: Adrian Lanzafame --- ipfsconn/ipfshttp/ipfshttp.go | 42 ++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index dba23500..1963200e 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -137,9 +137,7 @@ func NewConnector(cfg *Config) (*Connector, error) { } s.SetKeepAlivesEnabled(true) // A reminder that this can be changed - c := &http.Client{ - Timeout: cfg.IPFSRequestTimeout, - } + c := &http.Client{} // timeouts are handled by context timeouts ctx, cancel := context.WithCancel(context.Background()) @@ -571,8 +569,10 @@ func (ipfs *Connector) Shutdown() error { // returns an error and an empty IPFSID which also // contains the error message. func (ipfs *Connector) ID() (api.IPFSID, error) { + ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) + defer cancel() id := api.IPFSID{} - body, err := ipfs.post("id") + body, err := ipfs.postCtx(ctx, "id") if err != nil { id.Error = err.Error() return id, err @@ -661,7 +661,9 @@ func (ipfs *Connector) Unpin(ctx context.Context, hash *cid.Cid) error { // PinLs performs a "pin ls --type typeFilter" request against the configured // IPFS daemon and returns a map of cid strings and their status. func (ipfs *Connector) PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error) { - body, err := ipfs.post("pin/ls?type=" + typeFilter) + ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout) + defer cancel() + body, err := ipfs.postCtx(ctx, "pin/ls?type="+typeFilter) // Some error talking to the daemon if err != nil { @@ -686,8 +688,10 @@ func (ipfs *Connector) PinLs(ctx context.Context, typeFilter string) (map[string // PinLsCid performs a "pin ls --type=recursive "request and returns // an api.IPFSPinStatus for that hash. func (ipfs *Connector) PinLsCid(ctx context.Context, hash *cid.Cid) (api.IPFSPinStatus, error) { + ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout) + defer cancel() lsPath := fmt.Sprintf("pin/ls?arg=%s&type=recursive", hash) - body, err := ipfs.post(lsPath) + body, err := ipfs.postCtx(ctx, lsPath) // Network error, daemon down if body == nil && err != nil { @@ -748,13 +752,9 @@ func checkResponse(path string, code int, body []byte) error { return fmt.Errorf("IPFS-post '%s' unsuccessful: %d: %s", path, code, body) } -// post makes a POST request against +// postCtx makes a POST request against // the ipfs daemon, reads the full body of the response and // returns it after checking for errors. -func (ipfs *Connector) post(path string) ([]byte, error) { - return ipfs.postCtx(ipfs.ctx, path) -} - func (ipfs *Connector) postCtx(ctx context.Context, path string) ([]byte, error) { res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path) if err != nil { @@ -793,6 +793,8 @@ func (ipfs *Connector) apiURL() string { // ConnectSwarms requests the ipfs addresses of other peers and // triggers ipfs swarm connect requests func (ipfs *Connector) ConnectSwarms() error { + ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) + defer cancel() var idsSerial []api.IDSerial err := ipfs.rpcClient.Call( "", @@ -813,7 +815,7 @@ func (ipfs *Connector) ConnectSwarms() error { // This is a best effort attempt // We ignore errors which happens // when passing in a bunch of addresses - _, err := ipfs.post(fmt.Sprintf("swarm/connect?arg=%s", addr)) + _, err := ipfs.postCtx(ctx, fmt.Sprintf("swarm/connect?arg=%s", addr)) if err != nil { logger.Debug(err) continue @@ -828,7 +830,9 @@ func (ipfs *Connector) ConnectSwarms() error { // a given configuration key. For example, "Datastore/StorageMax" will return // the value for StorageMax in the Datastore configuration object. func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) { - res, err := ipfs.post("config/show") + ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) + defer cancel() + res, err := ipfs.postCtx(ctx, "config/show") if err != nil { logger.Error(err) return nil, err @@ -872,7 +876,9 @@ func getConfigValue(path []string, cfg map[string]interface{}) (interface{}, err // value is derived from the RepoSize and StorageMax values given by "repo // stats". The value is in bytes. func (ipfs *Connector) FreeSpace() (uint64, error) { - res, err := ipfs.post("repo/stat") + ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) + defer cancel() + res, err := ipfs.postCtx(ctx, "repo/stat") if err != nil { logger.Error(err) return 0, err @@ -890,7 +896,9 @@ func (ipfs *Connector) FreeSpace() (uint64, error) { // RepoSize returns the current repository size of the ipfs daemon as // provided by "repo stats". The value is in bytes. func (ipfs *Connector) RepoSize() (uint64, error) { - res, err := ipfs.post("repo/stat") + ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) + defer cancel() + res, err := ipfs.postCtx(ctx, "repo/stat") if err != nil { logger.Error(err) return 0, err @@ -907,8 +915,10 @@ func (ipfs *Connector) RepoSize() (uint64, error) { // SwarmPeers returns the peers currently connected to this ipfs daemon func (ipfs *Connector) SwarmPeers() (api.SwarmPeers, error) { + ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) + defer cancel() swarm := api.SwarmPeers{} - res, err := ipfs.post("swarm/peers") + res, err := ipfs.postCtx(ctx, "swarm/peers") if err != nil { logger.Error(err) return swarm, err