Merge pull request #427 from ipfs/fix/pin-context-timeout

ipfshttp: Use custom http clients for pin/unpin
This commit is contained in:
Hector Sanjuan 2018-05-25 09:44:53 +02:00 committed by GitHub
commit 6d9ddfaf85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -137,9 +137,7 @@ func NewConnector(cfg *Config) (*Connector, error) {
} }
s.SetKeepAlivesEnabled(true) // A reminder that this can be changed s.SetKeepAlivesEnabled(true) // A reminder that this can be changed
c := &http.Client{ c := &http.Client{} // timeouts are handled by context timeouts
Timeout: cfg.IPFSRequestTimeout,
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -571,8 +569,10 @@ func (ipfs *Connector) Shutdown() error {
// returns an error and an empty IPFSID which also // returns an error and an empty IPFSID which also
// contains the error message. // contains the error message.
func (ipfs *Connector) ID() (api.IPFSID, error) { func (ipfs *Connector) ID() (api.IPFSID, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
id := api.IPFSID{} id := api.IPFSID{}
body, err := ipfs.post("id") body, err := ipfs.postCtx(ctx, "id")
if err != nil { if err != nil {
id.Error = err.Error() id.Error = err.Error()
return id, err return id, err
@ -661,7 +661,9 @@ func (ipfs *Connector) Unpin(ctx context.Context, hash *cid.Cid) error {
// PinLs performs a "pin ls --type typeFilter" request against the configured // PinLs performs a "pin ls --type typeFilter" request against the configured
// IPFS daemon and returns a map of cid strings and their status. // IPFS daemon and returns a map of cid strings and their status.
func (ipfs *Connector) PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error) { func (ipfs *Connector) PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error) {
body, err := ipfs.post("pin/ls?type=" + typeFilter) ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
body, err := ipfs.postCtx(ctx, "pin/ls?type="+typeFilter)
// Some error talking to the daemon // Some error talking to the daemon
if err != nil { if err != nil {
@ -686,8 +688,10 @@ func (ipfs *Connector) PinLs(ctx context.Context, typeFilter string) (map[string
// PinLsCid performs a "pin ls --type=recursive <hash> "request and returns // PinLsCid performs a "pin ls --type=recursive <hash> "request and returns
// an api.IPFSPinStatus for that hash. // an api.IPFSPinStatus for that hash.
func (ipfs *Connector) PinLsCid(ctx context.Context, hash *cid.Cid) (api.IPFSPinStatus, error) { func (ipfs *Connector) PinLsCid(ctx context.Context, hash *cid.Cid) (api.IPFSPinStatus, error) {
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
lsPath := fmt.Sprintf("pin/ls?arg=%s&type=recursive", hash) lsPath := fmt.Sprintf("pin/ls?arg=%s&type=recursive", hash)
body, err := ipfs.post(lsPath) body, err := ipfs.postCtx(ctx, lsPath)
// Network error, daemon down // Network error, daemon down
if body == nil && err != nil { if body == nil && err != nil {
@ -748,13 +752,9 @@ func checkResponse(path string, code int, body []byte) error {
return fmt.Errorf("IPFS-post '%s' unsuccessful: %d: %s", path, code, body) return fmt.Errorf("IPFS-post '%s' unsuccessful: %d: %s", path, code, body)
} }
// post makes a POST request against // postCtx makes a POST request against
// the ipfs daemon, reads the full body of the response and // the ipfs daemon, reads the full body of the response and
// returns it after checking for errors. // returns it after checking for errors.
func (ipfs *Connector) post(path string) ([]byte, error) {
return ipfs.postCtx(ipfs.ctx, path)
}
func (ipfs *Connector) postCtx(ctx context.Context, path string) ([]byte, error) { func (ipfs *Connector) postCtx(ctx context.Context, path string) ([]byte, error) {
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path) res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path)
if err != nil { if err != nil {
@ -793,6 +793,8 @@ func (ipfs *Connector) apiURL() string {
// ConnectSwarms requests the ipfs addresses of other peers and // ConnectSwarms requests the ipfs addresses of other peers and
// triggers ipfs swarm connect requests // triggers ipfs swarm connect requests
func (ipfs *Connector) ConnectSwarms() error { func (ipfs *Connector) ConnectSwarms() error {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
var idsSerial []api.IDSerial var idsSerial []api.IDSerial
err := ipfs.rpcClient.Call( err := ipfs.rpcClient.Call(
"", "",
@ -813,7 +815,7 @@ func (ipfs *Connector) ConnectSwarms() error {
// This is a best effort attempt // This is a best effort attempt
// We ignore errors which happens // We ignore errors which happens
// when passing in a bunch of addresses // when passing in a bunch of addresses
_, err := ipfs.post(fmt.Sprintf("swarm/connect?arg=%s", addr)) _, err := ipfs.postCtx(ctx, fmt.Sprintf("swarm/connect?arg=%s", addr))
if err != nil { if err != nil {
logger.Debug(err) logger.Debug(err)
continue continue
@ -828,7 +830,9 @@ func (ipfs *Connector) ConnectSwarms() error {
// a given configuration key. For example, "Datastore/StorageMax" will return // a given configuration key. For example, "Datastore/StorageMax" will return
// the value for StorageMax in the Datastore configuration object. // the value for StorageMax in the Datastore configuration object.
func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) { func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) {
res, err := ipfs.post("config/show") ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
res, err := ipfs.postCtx(ctx, "config/show")
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return nil, err return nil, err
@ -872,7 +876,9 @@ func getConfigValue(path []string, cfg map[string]interface{}) (interface{}, err
// value is derived from the RepoSize and StorageMax values given by "repo // value is derived from the RepoSize and StorageMax values given by "repo
// stats". The value is in bytes. // stats". The value is in bytes.
func (ipfs *Connector) FreeSpace() (uint64, error) { func (ipfs *Connector) FreeSpace() (uint64, error) {
res, err := ipfs.post("repo/stat") ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
res, err := ipfs.postCtx(ctx, "repo/stat")
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return 0, err return 0, err
@ -890,7 +896,9 @@ func (ipfs *Connector) FreeSpace() (uint64, error) {
// RepoSize returns the current repository size of the ipfs daemon as // RepoSize returns the current repository size of the ipfs daemon as
// provided by "repo stats". The value is in bytes. // provided by "repo stats". The value is in bytes.
func (ipfs *Connector) RepoSize() (uint64, error) { func (ipfs *Connector) RepoSize() (uint64, error) {
res, err := ipfs.post("repo/stat") ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
res, err := ipfs.postCtx(ctx, "repo/stat")
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return 0, err return 0, err
@ -907,8 +915,10 @@ func (ipfs *Connector) RepoSize() (uint64, error) {
// SwarmPeers returns the peers currently connected to this ipfs daemon // SwarmPeers returns the peers currently connected to this ipfs daemon
func (ipfs *Connector) SwarmPeers() (api.SwarmPeers, error) { func (ipfs *Connector) SwarmPeers() (api.SwarmPeers, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
swarm := api.SwarmPeers{} swarm := api.SwarmPeers{}
res, err := ipfs.post("swarm/peers") res, err := ipfs.postCtx(ctx, "swarm/peers")
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return swarm, err return swarm, err