From 8626fbc166208eff5db0b8aa82c927ce08b7053b Mon Sep 17 00:00:00 2001
From: Hector Sanjuan
Date: Tue, 26 Apr 2022 14:47:49 +0200
Subject: [PATCH] Fix: bad behaviour when adding and ipfs is down

Adding keeps retrying indefinitely to send blocks to ipfs. When ipfs is
down, this:

* Stores all errors
* Keeps retrying
* Additionally, forgets to close response bodies

This resulted in memory leaks.

The new behaviour does not keep retrying, and response bodies are closed
after reading.
---
 ipfsconn/ipfshttp/ipfshttp.go | 67 ++++++++++++++++-------------------
 1 file changed, 30 insertions(+), 37 deletions(-)

diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go
index 50f08acf..a48cd7ab 100644
--- a/ipfsconn/ipfshttp/ipfshttp.go
+++ b/ipfsconn/ipfshttp/ipfshttp.go
@@ -18,7 +18,6 @@ import (
 	"time"
 
 	"github.com/ipfs/ipfs-cluster/api"
-	"go.uber.org/multierr"
 
 	cid "github.com/ipfs/go-cid"
 	files "github.com/ipfs/go-ipfs-files"
@@ -617,6 +616,7 @@ func checkResponse(path string, res *http.Response) ([]byte, error) {
 	}
 
 	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
 	if err == nil {
 		var ipfsErr ipfsError
 		if err := json.Unmarshal(body, &ipfsErr); err == nil {
@@ -1046,8 +1046,6 @@ func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWi
 	logger.Debug("streaming blocks to IPFS")
 	defer ipfs.updateInformerMetric(ctx)
 
-	var errs error
-
 	it := &chanIterator{
 		ctx:    ctx,
 		blocks: blocks,
@@ -1072,46 +1070,41 @@ func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWi
 	}
 	url := "block/put?" + q.Encode()
 
-	// We essentially keep going on any request errors and keep putting
-	// blocks until we are done. We will, however, return a final error if
-	// there were errors along the way, but we do not abort the blocks
-	// stream because we could not block/put.
-	for !it.Done() {
-		select {
-		case <-ctx.Done():
-			logger.Error("BlockStream aborted: %s", ctx.Err())
-			return ctx.Err()
-		default:
-		}
+	// Now we stream the blocks to ipfs. In case of error, we return
+	// directly, but leave a goroutine draining the channel until it is
+	// closed, which should be soon after returning.
+	multiFileR := files.NewMultiFileReader(dir, true)
+	contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
+	body, err := ipfs.postCtxStreamResponse(ctx, url, contentType, multiFileR)
+	if err != nil {
+		return err
+	}
+	defer body.Close()
 
-		multiFileR := files.NewMultiFileReader(dir, true)
-		contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
-		body, err := ipfs.postCtxStreamResponse(ctx, url, contentType, multiFileR)
+	dec := json.NewDecoder(body)
+	for {
+		var res ipfsBlockPutResp
+		err = dec.Decode(&res)
+		if err == io.EOF {
+			break
+		}
 		if err != nil {
-			errs = multierr.Append(errs, err)
-			continue
+			logger.Error(err)
+			break
 		}
 
-		dec := json.NewDecoder(body)
-		for {
-			var res ipfsBlockPutResp
-			err := dec.Decode(&res)
-			if err == io.EOF {
-				break
-			}
-			if err != nil {
-				logger.Error(err)
-				errs = multierr.Append(errs, err)
-				break
-			}
-			logger.Debugf("response block: %s", res.Key)
-			if !it.Seen(res.Key) {
-				logger.Warningf("blockPut response CID (%s) does not match the multihash of any blocks sent", res.Key)
-			}
+		logger.Debugf("response block: %s", res.Key)
+		if !it.Seen(res.Key) {
+			logger.Warningf("blockPut response CID (%s) does not match the multihash of any blocks sent", res.Key)
 		}
-		// continue until it.Done()
 	}
-	return errs
+	// keep draining blocks channel until closed.
+	go func() {
+		for range blocks {
+		}
+	}()
+
+	return err
 }
 
 // BlockGet retrieves an ipfs block with the given cid
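
Note (illustration only, not part of the patch): the sketch below is a minimal, self-contained Go program showing the two patterns this change relies on: closing HTTP response bodies as soon as they have been read, and draining the producer channel in a goroutine when the consumer returns early on error. The names Block, postBlock and sendBlocks are invented for the example and are not part of the ipfs-cluster API.

// Minimal sketch, not the ipfs-cluster implementation. It illustrates:
// 1) always closing HTTP response bodies after reading them, and
// 2) draining the producer channel in a goroutine when the consumer
//    gives up early, so the producer never blocks on send.
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
)

// Block stands in for a block of data headed to the daemon.
type Block struct {
	Data []byte
}

// postBlock sends one block and fully reads the response. The body is
// closed right after reading; forgetting to do so leaks connections
// (and memory) across many requests.
func postBlock(url string, b Block) error {
	res, err := http.Post(url, "application/octet-stream", bytes.NewReader(b.Data))
	if err != nil {
		return err
	}
	body, err := io.ReadAll(res.Body)
	res.Body.Close()
	if err != nil {
		return err
	}
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status %d: %s", res.StatusCode, body)
	}
	return nil
}

// sendBlocks posts blocks from the channel until it is closed. On the
// first error it returns immediately (no retries), but first starts a
// goroutine that keeps draining the channel until the producer closes it.
func sendBlocks(url string, blocks <-chan Block) error {
	for b := range blocks {
		if err := postBlock(url, b); err != nil {
			go func() {
				for range blocks {
				}
			}()
			return err
		}
	}
	return nil
}

func main() {
	// Fake endpoint that behaves as if the daemon were down: it rejects
	// every block, so sendBlocks fails on the first one.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.Copy(io.Discard, r.Body)
		http.Error(w, "daemon is down", http.StatusInternalServerError)
	}))
	defer srv.Close()

	blocks := make(chan Block)
	go func() {
		defer close(blocks)
		for i := 0; i < 3; i++ {
			blocks <- Block{Data: []byte{byte(i)}}
		}
	}()

	if err := sendBlocks(srv.URL, blocks); err != nil {
		log.Println("sendBlocks:", err)
	}
}

The drain goroutine mirrors the patch's go func() { for range blocks {} }(): without it, a producer still sending on the channel after the consumer has returned would block forever; with it, the remaining blocks are simply discarded until the producer closes the channel.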