ipfshttp: rate limit requests when failures happen

When IPFS starts failing or doesn't respond (e.g. during a restart), cluster
is likely to start sending requests at a very fast rate. For example, if there
are 100k items to be pinned and pins start failing immediately, cluster will
consume the pin queue really fast and every attempt will fail. At the same
time, ipfs is hammered non-stop until it recovers, which may make recovery harder.

This commit introduces a rate limit when requests to IPFS fail. After 10
failed requests, requests will be sent at a rate of at most 1 req/s. Once a
request succeeds, the rate limit is lifted.

This should prevent hammering the IPFS daemon, and also avoid the increased CPU
usage in cluster as it burns through pinning queues when IPFS is offline, which
makes the situation on machines worse (and emits way more logs).
This commit is contained in:
Hector Sanjuan 2022-09-15 17:37:26 +02:00
parent 2286ee73f8
commit 592d61b228

View File

@ -63,6 +63,9 @@ type Connector struct {
client *http.Client // client to ipfs daemon
failedRequests atomic.Uint64 // count failed requests.
reqRateLimitCh chan struct{}
shutdownLock sync.Mutex
shutdown bool
wg sync.WaitGroup
@ -168,17 +171,20 @@ func NewConnector(cfg *Config) (*Connector, error) {
ctx, cancel := context.WithCancel(context.Background())
ipfs := &Connector{
ctx: ctx,
cancel: cancel,
ready: make(chan struct{}),
config: cfg,
nodeAddr: nodeAddr,
rpcReady: make(chan struct{}, 1),
client: c,
ctx: ctx,
cancel: cancel,
ready: make(chan struct{}),
config: cfg,
nodeAddr: nodeAddr,
rpcReady: make(chan struct{}, 1),
reqRateLimitCh: make(chan struct{}),
client: c,
}
initializeMetrics(ctx)
go ipfs.rateLimiter()
go ipfs.run()
return ipfs, nil
}
@ -194,6 +200,50 @@ func initializeMetrics(ctx context.Context) {
stats.Record(ctx, observations.BlocksAddedError.M(0))
}
// rateLimiter issues ticks in the reqRateLimitCh that allow requests to
// proceed. See doPostCtx.
//
// It loops until ipfs.ctx is cancelled, at which point it closes
// reqRateLimitCh and returns. While more than 10 failed requests are
// recorded in failedRequests, it waits one second before offering each tick.
func (ipfs *Connector) rateLimiter() {
	// The channel is closed exactly once, whichever exit path is taken.
	defer close(ipfs.reqRateLimitCh)

	isRateLimiting := false

	// TODO: The rate-limiter is configured to start rate-limiting after
	// 10 failed requests at a rate of 1 req/s. This should probably be
	// configurable.
	for {
		failed := ipfs.failedRequests.Load()
		switch {
		case failed == 0:
			if isRateLimiting {
				// This does not print always,
				// only when there were several requests
				// waiting to read.
				logger.Warning("Lifting up rate limit")
			}
			isRateLimiting = false
		case failed > 0 && failed <= 10:
			isRateLimiting = false
		case failed > 10:
			if !isRateLimiting {
				logger.Warning("Rate-limiting requests to 1req/s")
			}
			isRateLimiting = true

			// Wait for the rate-limit interval, but stop right away
			// if the connector context is cancelled so that shutdown
			// is not delayed by the pause.
			timer := time.NewTimer(time.Second)
			select {
			case <-ipfs.ctx.Done():
				timer.Stop()
				return
			case <-timer.C:
			}
		}

		// Send tick
		select {
		case <-ipfs.ctx.Done():
			return
		case ipfs.reqRateLimitCh <- struct{}{}:
			// note that the channel is unbuffered,
			// therefore we will sit here until a method
			// wants to read from us, and they don't if
			// failed == 0.
		}
	}
}
// connects all ipfs daemons when
// we receive the rpcReady signal.
func (ipfs *Connector) run() {
@ -217,6 +267,7 @@ func (ipfs *Connector) run() {
logger.Warningf("ipfs does not seem to be available after %d retries", i)
}
// Requests will be rate-limited when going faster.
time.Sleep(time.Second)
}
@ -462,18 +513,14 @@ func (ipfs *Connector) pinProgress(ctx context.Context, hash api.Cid, maxDepth a
pinArgs := pinArgs(maxDepth)
path := fmt.Sprintf("pin/add?arg=%s&%s&progress=true", hash, pinArgs)
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, "", nil)
body, err := ipfs.postCtxStreamResponse(ctx, path, "", nil)
if err != nil {
return err
}
defer res.Body.Close()
defer body.Close()
_, err = checkResponse(path, res)
if err != nil {
return err
}
dec := json.NewDecoder(res.Body)
dec := json.NewDecoder(body)
for {
var pins ipfsPinsResp
if err := dec.Decode(&pins); err != nil {
@ -575,7 +622,6 @@ nextFilter:
path := "pin/ls?stream=true&type=" + typeFilter
bodies[i], err = ipfs.postCtxStreamResponse(ctx, path, "", nil)
if err != nil {
logger.Errorf("error querying pinset: %s", err)
return err
}
defer bodies[i].Close()
@ -652,90 +698,6 @@ func (ipfs *Connector) PinLsCid(ctx context.Context, pin api.Pin) (api.IPFSPinSt
return res.Type, nil
}
// doPostCtx builds a POST request for path under apiURL, with the given
// Content-Type and body, binds it to ctx and sends it through ipfs.client.
// It returns the raw response without inspecting its status code.
//
// NOTE(review): the client argument is not used; requests always go through
// ipfs.client.
func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) {
	logger.Debugf("posting /%s", path)
	urlstr := fmt.Sprintf("%s/%s", apiURL, path)

	req, err := http.NewRequest("POST", urlstr, postBody)
	if err != nil {
		logger.Error("error creating POST request:", err)
		// req is nil here: bail out instead of dereferencing it below.
		return nil, err
	}

	req.Header.Set("Content-Type", contentType)
	req = req.WithContext(ctx)
	res, err := ipfs.client.Do(req)
	if err != nil {
		logger.Error("error posting to IPFS:", err)
	}

	return res, err
}
// checkResponse inspects the status code of a response from ipfs. A
// StatusOK response yields (nil, nil) and its body is left untouched. For
// any other status the body is read and closed; if it decodes as an
// ipfsError, the raw body and that error (annotated with status code and
// path) are returned, otherwise a generic error is returned.
func checkResponse(path string, res *http.Response) ([]byte, error) {
	status := res.StatusCode
	if status == http.StatusOK {
		return nil, nil
	}

	raw, readErr := io.ReadAll(res.Body)
	res.Body.Close()

	var parsed ipfsError
	if readErr == nil && json.Unmarshal(raw, &parsed) == nil {
		parsed.code = status
		parsed.path = path
		return raw, parsed
	}

	// No error response with useful message from ipfs
	return nil, fmt.Errorf(
		"IPFS request failed (is it running?) (%s). Code %d: %s",
		path,
		status,
		string(raw))
}
// postCtx performs a POST request against the ipfs daemon through
// postCtxStreamResponse, reads the whole response body and returns it.
// The body reader is always closed before returning.
func (ipfs *Connector) postCtx(ctx context.Context, path string, contentType string, postBody io.Reader) ([]byte, error) {
	stream, err := ipfs.postCtxStreamResponse(ctx, path, contentType, postBody)
	if err != nil {
		return nil, err
	}
	defer stream.Close()

	data, readErr := io.ReadAll(stream)
	if readErr == nil {
		return data, nil
	}

	logger.Errorf("error reading response body: %s", readErr)
	return nil, readErr
}
// postCtxStreamResponse makes a POST request against the ipfs daemon, and
// returns the body reader after checking the request for errors. The
// caller is responsible for closing the returned reader.
func (ipfs *Connector) postCtxStreamResponse(ctx context.Context, path string, contentType string, postBody io.Reader) (io.ReadCloser, error) {
	resp, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, contentType, postBody)
	if err != nil {
		return nil, err
	}

	if _, respErr := checkResponse(path, resp); respErr != nil {
		return nil, respErr
	}

	return resp.Body, nil
}
// apiURL returns the base URL of the IPFS daemon HTTP API, built from
// the connector's nodeAddr.
func (ipfs *Connector) apiURL() string {
	const format = "http://%s/api/v0"
	return fmt.Sprintf(format, ipfs.nodeAddr)
}
// ConnectSwarms requests the ipfs addresses of other peers and
// triggers ipfs swarm connect requests
func (ipfs *Connector) ConnectSwarms(ctx context.Context) error {
@ -795,7 +757,6 @@ func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) {
defer cancel()
res, err := ipfs.postCtx(ctx, "config/show", "", nil)
if err != nil {
logger.Error(err)
return nil, err
}
@ -842,7 +803,6 @@ func (ipfs *Connector) RepoStat(ctx context.Context) (api.IPFSRepoStat, error) {
defer cancel()
res, err := ipfs.postCtx(ctx, "repo/stat?size-only=true", "", nil)
if err != nil {
logger.Error(err)
return api.IPFSRepoStat{}, err
}
@ -863,14 +823,14 @@ func (ipfs *Connector) RepoGC(ctx context.Context) (api.RepoGC, error) {
ctx, cancel := context.WithTimeout(ctx, ipfs.config.RepoGCTimeout)
defer cancel()
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), "repo/gc?stream-errors=true", "", nil)
body, err := ipfs.postCtxStreamResponse(ctx, "repo/gc?stream-errors=true", "", nil)
if err != nil {
logger.Error(err)
return api.RepoGC{}, err
}
defer res.Body.Close()
dec := json.NewDecoder(res.Body)
defer body.Close()
dec := json.NewDecoder(body)
repoGC := api.RepoGC{
Keys: []api.IPFSRepoGC{},
}
@ -915,7 +875,6 @@ func (ipfs *Connector) Resolve(ctx context.Context, path string) (api.Cid, error
defer cancel()
res, err := ipfs.postCtx(ctx, "resolve?arg="+url.QueryEscape(path), "", nil)
if err != nil {
logger.Error(err)
return api.CidUndef, err
}
@ -940,7 +899,6 @@ func (ipfs *Connector) SwarmPeers(ctx context.Context) ([]peer.ID, error) {
res, err := ipfs.postCtx(ctx, "swarm/peers", "", nil)
if err != nil {
logger.Error(err)
return nil, err
}
var peersRaw ipfsSwarmPeersResp
@ -1263,3 +1221,103 @@ func (ipfs *Connector) updateInformerMetric(ctx context.Context) error {
}
return err
}
// apiURL is a short-hand for building the url of the IPFS
// daemon API.
func (ipfs *Connector) apiURL() string {
	const format = "http://%s/api/v0"
	return fmt.Sprintf(format, ipfs.nodeAddr)
}
// doPostCtx builds a POST request for path under apiURL, with the given
// Content-Type and body, binds it to ctx and sends it through ipfs.client.
// It returns the raw response without inspecting its status code.
//
// When failedRequests is non-zero the call first waits for a tick on
// reqRateLimitCh. A transport error from the client increments
// failedRequests; a completed request resets it to zero.
//
// NOTE(review): the client argument is not used; requests always go through
// ipfs.client.
func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) {
	logger.Debugf("posting /%s", path)
	urlstr := fmt.Sprintf("%s/%s", apiURL, path)

	req, err := http.NewRequest("POST", urlstr, postBody)
	if err != nil {
		logger.Error("error creating POST request:", err)
		return nil, err
	}

	req.Header.Set("Content-Type", contentType)
	req = req.WithContext(ctx)

	// Rate limiter. If we have a number of failed requests,
	// then wait for a tick.
	if failed := ipfs.failedRequests.Load(); failed > 0 {
		select {
		case <-ipfs.reqRateLimitCh:
		case <-ctx.Done():
			// The caller gave up (timeout/cancel) while waiting.
			return nil, ctx.Err()
		case <-ipfs.ctx.Done():
			// Report the connector context's error: the request
			// context may still be live, and its Err() would be nil,
			// handing callers a nil response with a nil error.
			return nil, ipfs.ctx.Err()
		}
	}

	res, err := ipfs.client.Do(req)
	if err != nil {
		// request error: ipfs was unreachable, record it.
		ipfs.failedRequests.Add(1)
		logger.Error("error posting to IPFS:", err)
	} else {
		ipfs.failedRequests.Store(0)
	}

	return res, err
}
// checkResponse inspects the status code of a response from ipfs. A
// StatusOK response yields (nil, nil) and its body is left untouched. For
// any other status the body is read and closed; if it decodes as an
// ipfsError, the raw body and that error (annotated with status code and
// path) are returned, otherwise a generic error is returned.
func checkResponse(path string, res *http.Response) ([]byte, error) {
	status := res.StatusCode
	if status == http.StatusOK {
		return nil, nil
	}

	raw, readErr := io.ReadAll(res.Body)
	res.Body.Close()

	var parsed ipfsError
	if readErr == nil && json.Unmarshal(raw, &parsed) == nil {
		parsed.code = status
		parsed.path = path
		return raw, parsed
	}

	// No error response with useful message from ipfs
	return nil, fmt.Errorf(
		"IPFS request failed (is it running?) (%s). Code %d: %s",
		path,
		status,
		string(raw))
}
// postCtxStreamResponse makes a POST request against the ipfs daemon, and
// returns the body reader after checking the request for errors. The
// caller is responsible for closing the returned reader.
func (ipfs *Connector) postCtxStreamResponse(ctx context.Context, path string, contentType string, postBody io.Reader) (io.ReadCloser, error) {
	resp, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, contentType, postBody)
	if err != nil {
		return nil, err
	}

	if _, respErr := checkResponse(path, resp); respErr != nil {
		return nil, respErr
	}

	return resp.Body, nil
}
// postCtx performs a POST request against the ipfs daemon through
// postCtxStreamResponse, reads the whole response body and returns it.
// The body reader is always closed before returning.
func (ipfs *Connector) postCtx(ctx context.Context, path string, contentType string, postBody io.Reader) ([]byte, error) {
	stream, err := ipfs.postCtxStreamResponse(ctx, path, contentType, postBody)
	if err != nil {
		return nil, err
	}
	defer stream.Close()

	data, readErr := io.ReadAll(stream)
	if readErr == nil {
		return data, nil
	}

	logger.Errorf("error reading response body: %s", readErr)
	return nil, readErr
}