Pin timeouts should start from the last block

IPFSConnector Pin operation should not have a timeout since the start
of the operation, but a timeout since the last block was received,
thus only cancelling operations when there are no more blocks provided
in the network
This commit is contained in:
Kishan Mohanbhai Sagathiya 2019-04-04 12:12:01 +05:30 committed by Hector Sanjuan
parent 9692e368f3
commit 9244daabfb

View File

@ -3,6 +3,7 @@
package ipfshttp
import (
"bufio"
"context"
"encoding/json"
"errors"
@ -265,9 +266,9 @@ func (ipfs *Connector) Pin(ctx context.Context, hash cid.Cid, maxDepth int) erro
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Pin")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ipfs.config.PinTimeout)
ctx1, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
pinStatus, err := ipfs.PinLsCid(ctx, hash)
pinStatus, err := ipfs.PinLsCid(ctx1, hash)
if err != nil {
return err
}
@ -292,7 +293,7 @@ func (ipfs *Connector) Pin(ctx context.Context, hash cid.Cid, maxDepth int) erro
switch ipfs.config.PinMethod {
case "refs": // do refs -r first
path := fmt.Sprintf("refs?arg=%s&%s", hash, pinArgs)
err := ipfs.postDiscardBodyCtx(ctx, path)
err := ipfs.postCtxForStreamingResponse(ctx, path, ipfs.config.PinTimeout)
if err != nil {
return err
}
@ -300,8 +301,8 @@ func (ipfs *Connector) Pin(ctx context.Context, hash cid.Cid, maxDepth int) erro
stats.Record(ctx, observations.Pins.M(1))
}
path := fmt.Sprintf("pin/add?arg=%s&%s", hash, pinArgs)
_, err = ipfs.postCtx(ctx, path, "", nil)
path := fmt.Sprintf("pin/add?arg=%s&%s&progress=true", hash, pinArgs)
err = ipfs.postCtxForStreamingResponse(ctx, path, ipfs.config.PinTimeout)
if err == nil {
logger.Info("IPFS Pin request succeeded: ", hash)
}
@ -450,6 +451,49 @@ func checkResponse(path string, code int, body []byte) error {
return fmt.Errorf("IPFS-post '%s' unsuccessful: %d: %s", path, code, body)
}
// postCtxForStreamingResponse makes a POST request against
// the ipfs daemon, reads the body of the response line by line
// at evey millisecond. It cancels the context if timeout duration
// has passed since last new data on the buffer.
func (ipfs *Connector) postCtxForStreamingResponse(ctx context.Context, path string, timeout time.Duration) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, "", nil)
if err != nil {
return err
}
defer res.Body.Close()
reader := bufio.NewReader(res.Body)
elapsed := time.Millisecond * 0
var prev string
for {
line, err := reader.ReadBytes('\n')
if err != nil {
if err == io.EOF {
// There might be new bytes before EOF, but we don't need to know them
break
}
return err
}
cur := string(line)
if cur == prev {
elapsed = elapsed + time.Millisecond
} else {
prev = cur
elapsed = time.Millisecond * 0
}
time.Sleep(time.Millisecond)
if elapsed >= timeout {
cancel()
break
}
}
return nil
}
// postCtx makes a POST request against
// the ipfs daemon, reads the full body of the response and
// returns it after checking for errors.