From 592d61b2281030f1e43918689ac857f11586ef20 Mon Sep 17 00:00:00 2001
From: Hector Sanjuan
Date: Thu, 15 Sep 2022 17:37:26 +0200
Subject: [PATCH] ipfshttp: rate limit requests when failures happen

When IPFS starts failing or does not respond (i.e. during a restart),
cluster is likely to start sending requests at very fast rates. For
example, if there are 100k items to be pinned and pins start failing
immediately, cluster will burn through the pin queue very quickly and
every request will be a failure. At the same time, ipfs is hammered
non-stop until it recovers, which may make recovery harder.

This commit introduces a rate limit when requests to IPFS fail: after 10
failed requests, requests are sent at a rate of at most 1 req/s. Once a
request succeeds, the rate limit is lifted.

This should prevent hammering the IPFS daemon, and also avoid the extra
CPU usage in cluster as it burns through pinning queues while IPFS is
offline, which makes the situation on those machines worse (and emits far
more logs).
---
 ipfsconn/ipfshttp/ipfshttp.go | 274 ++++++++++++++++++++-------------
 1 file changed, 166 insertions(+), 108 deletions(-)

diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go
index f105a937..91f998d0 100644
--- a/ipfsconn/ipfshttp/ipfshttp.go
+++ b/ipfsconn/ipfshttp/ipfshttp.go
@@ -63,6 +63,9 @@ type Connector struct {
 
 	client *http.Client // client to ipfs daemon
 
+	failedRequests atomic.Uint64 // count failed requests.
+	reqRateLimitCh chan struct{}
+
 	shutdownLock sync.Mutex
 	shutdown     bool
 	wg           sync.WaitGroup
@@ -168,17 +171,20 @@ func NewConnector(cfg *Config) (*Connector, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 
 	ipfs := &Connector{
-		ctx:      ctx,
-		cancel:   cancel,
-		ready:    make(chan struct{}),
-		config:   cfg,
-		nodeAddr: nodeAddr,
-		rpcReady: make(chan struct{}, 1),
-		client:   c,
+		ctx:            ctx,
+		cancel:         cancel,
+		ready:          make(chan struct{}),
+		config:         cfg,
+		nodeAddr:       nodeAddr,
+		rpcReady:       make(chan struct{}, 1),
+		reqRateLimitCh: make(chan struct{}),
+		client:         c,
 	}
 
 	initializeMetrics(ctx)
 
+	go ipfs.rateLimiter()
+
 	go ipfs.run()
 
 	return ipfs, nil
 }
@@ -194,6 +200,50 @@ func initializeMetrics(ctx context.Context) {
 	stats.Record(ctx, observations.BlocksAddedError.M(0))
 }
 
+// rateLimiter issues ticks in the reqRateLimitCh that allow requests to
+// proceed. See doPostCtx.
+func (ipfs *Connector) rateLimiter() {
+	isRateLimiting := false
+
+	// TODO: The rate-limiter is configured to start rate-limiting after
+	// 10 failed requests at a rate of 1 req/s. This should probably be
+	// configurable.
+	for {
+		failed := ipfs.failedRequests.Load()
+		logger.Debugf("failed requests: %d", failed)
+		switch {
+		case failed == 0:
+			if isRateLimiting {
+				// This does not always print:
+				// only when there were several
+				// requests waiting to read.
+				logger.Warning("Lifting up rate limit")
+			}
+			isRateLimiting = false
+		case failed > 0 && failed <= 10:
+			isRateLimiting = false
+		case failed > 10:
+			if !isRateLimiting {
+				logger.Warning("Rate-limiting requests to 1req/s")
+			}
+			isRateLimiting = true
+			time.Sleep(time.Second)
+		}
+
+		// Send tick
+		select {
+		case <-ipfs.ctx.Done():
+			close(ipfs.reqRateLimitCh)
+			return
+		case ipfs.reqRateLimitCh <- struct{}{}:
+			// Note that the channel is unbuffered,
+			// so we block here until a method wants
+			// to read from us, and they don't when
+			// failed == 0.
+		}
+	}
+}
+
 // connects all ipfs daemons when
 // we receive the rpcReady signal.
func (ipfs *Connector) run() { @@ -217,6 +267,7 @@ func (ipfs *Connector) run() { logger.Warningf("ipfs does not seem to be available after %d retries", i) } + // Requests will be rate-limited when going faster. time.Sleep(time.Second) } @@ -462,18 +513,14 @@ func (ipfs *Connector) pinProgress(ctx context.Context, hash api.Cid, maxDepth a pinArgs := pinArgs(maxDepth) path := fmt.Sprintf("pin/add?arg=%s&%s&progress=true", hash, pinArgs) - res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, "", nil) + + body, err := ipfs.postCtxStreamResponse(ctx, path, "", nil) if err != nil { return err } - defer res.Body.Close() + defer body.Close() - _, err = checkResponse(path, res) - if err != nil { - return err - } - - dec := json.NewDecoder(res.Body) + dec := json.NewDecoder(body) for { var pins ipfsPinsResp if err := dec.Decode(&pins); err != nil { @@ -575,7 +622,6 @@ nextFilter: path := "pin/ls?stream=true&type=" + typeFilter bodies[i], err = ipfs.postCtxStreamResponse(ctx, path, "", nil) if err != nil { - logger.Errorf("error querying pinset: %s", err) return err } defer bodies[i].Close() @@ -652,90 +698,6 @@ func (ipfs *Connector) PinLsCid(ctx context.Context, pin api.Pin) (api.IPFSPinSt return res.Type, nil } -func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) { - logger.Debugf("posting /%s", path) - urlstr := fmt.Sprintf("%s/%s", apiURL, path) - - req, err := http.NewRequest("POST", urlstr, postBody) - if err != nil { - logger.Error("error creating POST request:", err) - } - - req.Header.Set("Content-Type", contentType) - req = req.WithContext(ctx) - res, err := ipfs.client.Do(req) - if err != nil { - logger.Error("error posting to IPFS:", err) - } - - return res, err -} - -// checkResponse tries to parse an error message on non StatusOK responses -// from ipfs. -func checkResponse(path string, res *http.Response) ([]byte, error) { - if res.StatusCode == http.StatusOK { - return nil, nil - } - - body, err := io.ReadAll(res.Body) - res.Body.Close() - if err == nil { - var ipfsErr ipfsError - if err := json.Unmarshal(body, &ipfsErr); err == nil { - ipfsErr.code = res.StatusCode - ipfsErr.path = path - return body, ipfsErr - } - } - - // No error response with useful message from ipfs - return nil, fmt.Errorf( - "IPFS request failed (is it running?) (%s). Code %d: %s", - path, - res.StatusCode, - string(body)) -} - -// postCtx makes a POST request against -// the ipfs daemon, reads the full body of the response and -// returns it after checking for errors. -func (ipfs *Connector) postCtx(ctx context.Context, path string, contentType string, postBody io.Reader) ([]byte, error) { - rdr, err := ipfs.postCtxStreamResponse(ctx, path, contentType, postBody) - if err != nil { - return nil, err - } - defer rdr.Close() - - body, err := io.ReadAll(rdr) - if err != nil { - logger.Errorf("error reading response body: %s", err) - return nil, err - } - return body, nil -} - -// postCtxStreamResponse makes a POST request against the ipfs daemon, and -// returns the body reader after checking the request for errros. 
-func (ipfs *Connector) postCtxStreamResponse(ctx context.Context, path string, contentType string, postBody io.Reader) (io.ReadCloser, error) {
-	res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, contentType, postBody)
-	if err != nil {
-		return nil, err
-	}
-
-	_, err = checkResponse(path, res)
-	if err != nil {
-		return nil, err
-	}
-	return res.Body, nil
-}
-
-// apiURL is a short-hand for building the url of the IPFS
-// daemon API.
-func (ipfs *Connector) apiURL() string {
-	return fmt.Sprintf("http://%s/api/v0", ipfs.nodeAddr)
-}
-
 // ConnectSwarms requests the ipfs addresses of other peers and
 // triggers ipfs swarm connect requests
 func (ipfs *Connector) ConnectSwarms(ctx context.Context) error {
@@ -795,7 +757,6 @@ func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) {
 	defer cancel()
 	res, err := ipfs.postCtx(ctx, "config/show", "", nil)
 	if err != nil {
-		logger.Error(err)
 		return nil, err
 	}
 
@@ -842,7 +803,6 @@ func (ipfs *Connector) RepoStat(ctx context.Context) (api.IPFSRepoStat, error) {
 	defer cancel()
 	res, err := ipfs.postCtx(ctx, "repo/stat?size-only=true", "", nil)
 	if err != nil {
-		logger.Error(err)
 		return api.IPFSRepoStat{}, err
 	}
 
@@ -863,14 +823,14 @@ func (ipfs *Connector) RepoGC(ctx context.Context) (api.RepoGC, error) {
 	ctx, cancel := context.WithTimeout(ctx, ipfs.config.RepoGCTimeout)
 	defer cancel()
 
-	res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), "repo/gc?stream-errors=true", "", nil)
+	body, err := ipfs.postCtxStreamResponse(ctx, "repo/gc?stream-errors=true", "", nil)
 	if err != nil {
-		logger.Error(err)
 		return api.RepoGC{}, err
 	}
-	defer res.Body.Close()
-	dec := json.NewDecoder(res.Body)
+	defer body.Close()
+
+	dec := json.NewDecoder(body)
 	repoGC := api.RepoGC{
 		Keys: []api.IPFSRepoGC{},
 	}
 
@@ -915,7 +875,6 @@ func (ipfs *Connector) Resolve(ctx context.Context, path string) (api.Cid, error) {
 	defer cancel()
 	res, err := ipfs.postCtx(ctx, "resolve?arg="+url.QueryEscape(path), "", nil)
 	if err != nil {
-		logger.Error(err)
 		return api.CidUndef, err
 	}
 
@@ -940,7 +899,6 @@ func (ipfs *Connector) SwarmPeers(ctx context.Context) ([]peer.ID, error) {
 
 	res, err := ipfs.postCtx(ctx, "swarm/peers", "", nil)
 	if err != nil {
-		logger.Error(err)
 		return nil, err
 	}
 	var peersRaw ipfsSwarmPeersResp
@@ -1263,3 +1221,103 @@ func (ipfs *Connector) updateInformerMetric(ctx context.Context) error {
 	}
 	return err
 }
+
+// apiURL is a short-hand for building the URL of the IPFS daemon API.
+func (ipfs *Connector) apiURL() string {
+	return fmt.Sprintf("http://%s/api/v0", ipfs.nodeAddr)
+}
+
+func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) {
+	logger.Debugf("posting /%s", path)
+	urlstr := fmt.Sprintf("%s/%s", apiURL, path)
+
+	req, err := http.NewRequest("POST", urlstr, postBody)
+	if err != nil {
+		logger.Error("error creating POST request:", err)
+		return nil, err
+	}
+
+	req.Header.Set("Content-Type", contentType)
+	req = req.WithContext(ctx)
+
+	// Rate limiter: if we have a number of failed requests,
+	// wait for a tick before proceeding. See rateLimiter().
+	if failed := ipfs.failedRequests.Load(); failed > 0 {
+		select {
+		case <-ipfs.reqRateLimitCh:
+		case <-ipfs.ctx.Done():
+			return nil, ipfs.ctx.Err()
+		}
+
+	}
+
+	res, err := ipfs.client.Do(req)
+	if err != nil {
+		// request error: ipfs was unreachable, record it.
+		ipfs.failedRequests.Add(1)
+		logger.Error("error posting to IPFS:", err)
+	} else {
+		ipfs.failedRequests.Store(0)
+	}
+
+	return res, err
+}
+
+// checkResponse tries to parse an error message on non-StatusOK responses
+// from ipfs.
+func checkResponse(path string, res *http.Response) ([]byte, error) {
+	if res.StatusCode == http.StatusOK {
+		return nil, nil
+	}
+
+	body, err := io.ReadAll(res.Body)
+	res.Body.Close()
+	if err == nil {
+		var ipfsErr ipfsError
+		if err := json.Unmarshal(body, &ipfsErr); err == nil {
+			ipfsErr.code = res.StatusCode
+			ipfsErr.path = path
+			return body, ipfsErr
+		}
+	}
+
+	// No error response with useful message from ipfs
+	return nil, fmt.Errorf(
+		"IPFS request failed (is it running?) (%s). Code %d: %s",
+		path,
+		res.StatusCode,
+		string(body))
+}
+
+// postCtxStreamResponse makes a POST request against the ipfs daemon, and
+// returns the body reader after checking the response for errors.
+func (ipfs *Connector) postCtxStreamResponse(ctx context.Context, path string, contentType string, postBody io.Reader) (io.ReadCloser, error) {
+	res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, contentType, postBody)
+	if err != nil {
+		return nil, err
+	}
+
+	_, err = checkResponse(path, res)
+	if err != nil {
+		return nil, err
+	}
+	return res.Body, nil
+}
+
+// postCtx makes a POST request against
+// the ipfs daemon, reads the full body of the response and
+// returns it after checking for errors.
+func (ipfs *Connector) postCtx(ctx context.Context, path string, contentType string, postBody io.Reader) ([]byte, error) {
+	rdr, err := ipfs.postCtxStreamResponse(ctx, path, contentType, postBody)
+	if err != nil {
+		return nil, err
+	}
+	defer rdr.Close()
+
+	body, err := io.ReadAll(rdr)
+	if err != nil {
+		logger.Errorf("error reading response body: %s", err)
+		return nil, err
+	}
+	return body, nil
+}
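
For reference, below is a minimal, standalone sketch (not part of the patch and
not ipfs-cluster API; the limiter, newLimiter and wait names are made up for
illustration) of the unbuffered tick-channel pattern that rateLimiter and
doPostCtx implement: a background goroutine sleeps between ticks once the
failure counter passes a threshold, and callers only block waiting for a tick
when previous requests have failed.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

type limiter struct {
	failed atomic.Uint64 // consecutive failed requests
	tick   chan struct{} // unbuffered: one receive == one allowed request
}

func newLimiter(ctx context.Context) *limiter {
	l := &limiter{tick: make(chan struct{})}
	go func() {
		for {
			// Throttle ticks to roughly 1/s once failures pile up.
			if l.failed.Load() > 10 {
				time.Sleep(time.Second)
			}
			select {
			case <-ctx.Done():
				close(l.tick)
				return
			case l.tick <- struct{}{}:
				// Blocks until a caller is waiting for a tick.
			}
		}
	}()
	return l
}

// wait blocks for a tick only when previous requests have failed.
func (l *limiter) wait(ctx context.Context) error {
	if l.failed.Load() == 0 {
		return nil // fast path: no failures, no rate limit
	}
	select {
	case <-l.tick:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	l := newLimiter(ctx)
	l.failed.Store(11) // simulate more than 10 consecutive failures

	start := time.Now()
	for i := 0; i < 3; i++ {
		if err := l.wait(ctx); err != nil {
			return
		}
		fmt.Printf("request %d allowed after %s\n", i, time.Since(start).Round(time.Second))
	}
}

With 11 simulated failures the three requests above are released roughly one
second apart; storing 0 in the counter (as doPostCtx does after a success)
restores the fast path.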