Fix: bad behaviour when adding and ipfs is down

Adding keeps retrying indefinitely to send blocks to ipfs. When ipfs is down this:

* Stores all errors
* Keeps retrying
* Additionally, forgot to close bodies

This resulted in memory leaks. The new behaviour does not keep retrying. And
response bodies are closed after reading.
This commit is contained in:
Hector Sanjuan 2022-04-26 14:47:49 +02:00
parent 09814e5176
commit 8626fbc166

View File

@ -18,7 +18,6 @@ import (
"time"
"github.com/ipfs/ipfs-cluster/api"
"go.uber.org/multierr"
cid "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
@ -617,6 +616,7 @@ func checkResponse(path string, res *http.Response) ([]byte, error) {
}
body, err := ioutil.ReadAll(res.Body)
res.Body.Close()
if err == nil {
var ipfsErr ipfsError
if err := json.Unmarshal(body, &ipfsErr); err == nil {
@ -1046,8 +1046,6 @@ func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWi
logger.Debug("streaming blocks to IPFS")
defer ipfs.updateInformerMetric(ctx)
var errs error
it := &chanIterator{
ctx: ctx,
blocks: blocks,
@ -1072,46 +1070,41 @@ func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWi
}
url := "block/put?" + q.Encode()
// We essentially keep going on any request errors and keep putting
// blocks until we are done. We will, however, return a final error if
// there were errors along the way, but we do not abort the blocks
// stream because we could not block/put.
for !it.Done() {
select {
case <-ctx.Done():
logger.Error("BlockStream aborted: %s", ctx.Err())
return ctx.Err()
default:
}
// Now we stream the blocks to ipfs. In case of error, we return
// directly, but leave a goroutine draining the channel until it is
// closed, which should be soon after returning.
multiFileR := files.NewMultiFileReader(dir, true)
contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
body, err := ipfs.postCtxStreamResponse(ctx, url, contentType, multiFileR)
if err != nil {
return err
}
defer body.Close()
multiFileR := files.NewMultiFileReader(dir, true)
contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
body, err := ipfs.postCtxStreamResponse(ctx, url, contentType, multiFileR)
dec := json.NewDecoder(body)
for {
var res ipfsBlockPutResp
err = dec.Decode(&res)
if err == io.EOF {
break
}
if err != nil {
errs = multierr.Append(errs, err)
continue
logger.Error(err)
break
}
dec := json.NewDecoder(body)
for {
var res ipfsBlockPutResp
err := dec.Decode(&res)
if err == io.EOF {
break
}
if err != nil {
logger.Error(err)
errs = multierr.Append(errs, err)
break
}
logger.Debugf("response block: %s", res.Key)
if !it.Seen(res.Key) {
logger.Warningf("blockPut response CID (%s) does not match the multihash of any blocks sent", res.Key)
}
logger.Debugf("response block: %s", res.Key)
if !it.Seen(res.Key) {
logger.Warningf("blockPut response CID (%s) does not match the multihash of any blocks sent", res.Key)
}
// continue until it.Done()
}
return errs
// keep draining blocks channel until closed.
go func() {
for range blocks {
}
}()
return err
}
// BlockGet retrieves an ipfs block with the given cid