Merge pull request #1762 from ipfs-cluster/fix/1733-ipfs-error-handling

Behaviour improvements when the ipfs daemon is unavailable
Hector Sanjuan 2022-09-15 17:59:04 +02:00 committed by GitHub
commit c9895bf607
6 changed files with 265 additions and 129 deletions

View File

@ -33,11 +33,11 @@ import (
logging "github.com/ipfs/go-log/v2"
gopath "github.com/ipfs/go-path"
libp2p "github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p/core/host"
peer "github.com/libp2p/go-libp2p/core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
gostream "github.com/libp2p/go-libp2p-gostream"
p2phttp "github.com/libp2p/go-libp2p-http"
host "github.com/libp2p/go-libp2p/core/host"
peer "github.com/libp2p/go-libp2p/core/peer"
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
manet "github.com/multiformats/go-multiaddr/net"
@ -742,10 +742,30 @@ func (api *API) StreamResponse(w http.ResponseWriter, next StreamIterator, errCh
}
return
}
if !ok { // but no error.
if !ok {
// nothing in the channel, check for errors
for err = range errCh {
if err == nil {
continue
}
st := http.StatusInternalServerError
w.WriteHeader(st)
errorResp := api.config.APIErrorFunc(err, st)
if err := enc.Encode(errorResp); err != nil {
api.config.Logger.Error(err)
}
// We deliberately report only the first
// error found in the channel.
return
}
// No errors at all, then NoContent.
w.WriteHeader(http.StatusNoContent)
return
}
// There is at least one item and no error, start with
// a 200 response.
w.WriteHeader(http.StatusOK)
}
if err != nil {
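
The hunk above defers the HTTP status line until the outcome of the stream is known: an error drained from errCh yields a 500, no items and no errors yields 204, and a first successful item commits to 200 before streaming continues. A minimal sketch of this delayed-WriteHeader pattern, with hypothetical item and error channels (the producer is assumed to close both when finished):

package streamexample

import (
	"encoding/json"
	"net/http"
)

// writeStream picks the HTTP status from the first stream event.
func writeStream(w http.ResponseWriter, itemCh <-chan string, errCh <-chan error) {
	enc := json.NewEncoder(w)
	item, ok := <-itemCh
	if !ok { // stream ended without items: either an error or empty
		for err := range errCh {
			if err == nil {
				continue
			}
			w.WriteHeader(http.StatusInternalServerError)
			enc.Encode(err.Error()) // report only the first error
			return
		}
		w.WriteHeader(http.StatusNoContent)
		return
	}
	// At least one item arrived with no error: commit to 200.
	w.WriteHeader(http.StatusOK)
	enc.Encode(item)
	for item := range itemCh {
		enc.Encode(item)
	}
}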

View File

@ -741,6 +741,20 @@ This might be due to one or several causes:
}
}
// Wait for ipfs
logger.Info("Waiting for IPFS to be ready...")
select {
case <-ctx.Done():
return
case <-c.ipfs.Ready(ctx):
ipfsid, err := c.ipfs.ID(ctx)
if err != nil {
logger.Error("IPFS signaled ready but ID() errored: ", err)
} else {
logger.Infof("IPFS is ready. Peer ID: %s", ipfsid.ID)
}
}
close(c.readyCh)
c.shutdownLock.Lock()
c.readyB = true

View File

@ -22,8 +22,8 @@ import (
"github.com/ipfs-cluster/ipfs-cluster/version"
gopath "github.com/ipfs/go-path"
peer "github.com/libp2p/go-libp2p/core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p/core/peer"
)
type mockComponent struct {
@ -53,6 +53,12 @@ type mockConnector struct {
blocks sync.Map
}
func (ipfs *mockConnector) Ready(ctx context.Context) <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}
func (ipfs *mockConnector) ID(ctx context.Context) (api.IPFSID, error) {
return api.IPFSID{
ID: test.PeerID1,

View File

@ -1,9 +1,11 @@
// Package ipfscluster implements a wrapper for the IPFS deamon which
// allows to orchestrate pinning operations among several IPFS nodes.
// Package ipfscluster is the heart of the IPFS Cluster implementation
// gluing together all the subcomponents and performing the core functionality.
//
// IPFS Cluster peers form a separate libp2p swarm. A Cluster peer uses
// multiple Cluster Components which perform different tasks like managing
// the underlying IPFS daemons, or providing APIs for external control.
// This package also provides the Cluster Go API through the Cluster object,
// which allows one to programmatically build and control a cluster.
//
// For an example on how to initialize components and cluster object, see
// cmd/ipfs-cluster-follow and cmd/ipfs-cluster-service.
package ipfscluster
import (
@ -12,8 +14,8 @@ import (
"github.com/ipfs-cluster/ipfs-cluster/api"
"github.com/ipfs-cluster/ipfs-cluster/state"
peer "github.com/libp2p/go-libp2p/core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// Component represents a piece of ipfscluster. Cluster components
@ -73,6 +75,10 @@ type API interface {
// an IPFS daemon. This is a base component.
type IPFSConnector interface {
Component
// Ready provides a channel to notify when IPFS is ready. It allows the
// main cluster component to wait for IPFS to be in a working state
// before starting full-fledged operations.
Ready(context.Context) <-chan struct{}
ID(context.Context) (api.IPFSID, error)
Pin(context.Context, api.Pin) error
Unpin(context.Context, api.Cid) error

View File

@ -53,6 +53,7 @@ type Connector struct {
ctx context.Context
cancel func()
ready chan struct{}
config *Config
nodeAddr string
@ -62,6 +63,9 @@ type Connector struct {
client *http.Client // client to ipfs daemon
failedRequests atomic.Uint64 // count failed requests.
reqRateLimitCh chan struct{}
shutdownLock sync.Mutex
shutdown bool
wg sync.WaitGroup
@ -167,16 +171,20 @@ func NewConnector(cfg *Config) (*Connector, error) {
ctx, cancel := context.WithCancel(context.Background())
ipfs := &Connector{
ctx: ctx,
config: cfg,
cancel: cancel,
nodeAddr: nodeAddr,
rpcReady: make(chan struct{}, 1),
client: c,
ctx: ctx,
cancel: cancel,
ready: make(chan struct{}),
config: cfg,
nodeAddr: nodeAddr,
rpcReady: make(chan struct{}, 1),
reqRateLimitCh: make(chan struct{}),
client: c,
}
initializeMetrics(ctx)
go ipfs.rateLimiter()
go ipfs.run()
return ipfs, nil
}
@ -192,11 +200,77 @@ func initializeMetrics(ctx context.Context) {
stats.Record(ctx, observations.BlocksAddedError.M(0))
}
// rateLimiter issues ticks in the reqRateLimitCh that allow requests to
// proceed. See doPostCtx.
func (ipfs *Connector) rateLimiter() {
isRateLimiting := false
// TODO: The rate-limiter is configured to start rate-limiting after
// 10 failed requests at a rate of 1 req/s. This should probably be
// configurable.
for {
failed := ipfs.failedRequests.Load()
switch {
case failed == 0:
if isRateLimiting {
// This does not always print: only
// when there were requests waiting
// to read from the channel.
logger.Warning("Lifting rate limit")
}
isRateLimiting = false
case failed > 0 && failed <= 10:
isRateLimiting = false
case failed > 10:
if !isRateLimiting {
logger.Warning("Rate-limiting requests to 1req/s")
}
isRateLimiting = true
time.Sleep(time.Second)
}
// Send tick
select {
case <-ipfs.ctx.Done():
close(ipfs.reqRateLimitCh)
return
case ipfs.reqRateLimitCh <- struct{}{}:
// The channel is unbuffered, so this send
// blocks until a method reads from it,
// which none do while failed == 0.
}
}
}
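
Two details carry this design: the channel is unbuffered, so the loop itself blocks until a request asks for a tick, and requests only ask when failedRequests is non-zero, making the limiter cost nothing on the healthy path. A reduced, self-contained sketch of the same gate (the threshold constant is an assumption mirroring the hard-coded 10 above):

package ratelimit

import (
	"sync/atomic"
	"time"
)

const failThreshold = 10 // assumed: mirrors the hard-coded value above

// gate isolates the connector's throttle: an unbuffered channel
// whose sends block until a request wants to proceed.
type gate struct {
	// Callers Add(1) on request failure and Store(0) on success,
	// as doPostCtx does further below.
	failed atomic.Uint64
	tick   chan struct{}
}

func newGate() *gate { return &gate{tick: make(chan struct{})} }

func (g *gate) run(done <-chan struct{}) {
	for {
		if g.failed.Load() > failThreshold {
			time.Sleep(time.Second) // at most ~1 release per second
		}
		select {
		case <-done:
			close(g.tick)
			return
		case g.tick <- struct{}{}: // blocks until a waiter reads
		}
	}
}

// wait gates a request; it only blocks when recent requests failed.
func (g *gate) wait() {
	if g.failed.Load() > 0 {
		<-g.tick
	}
}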
// run waits for the rpcReady signal, then polls the ipfs daemon
// until it responds, closing the ready channel once it does.
func (ipfs *Connector) run() {
<-ipfs.rpcReady
// wait for IPFS to be available
i := 0
for {
select {
case <-ipfs.ctx.Done():
return
default:
}
i++
_, err := ipfs.ID(ipfs.ctx)
if err == nil {
close(ipfs.ready)
break
}
if i%10 == 0 {
logger.Warningf("ipfs does not seem to be available after %d retries", i)
}
// Requests will be rate-limited when going faster.
time.Sleep(time.Second)
}
// Do not shut down while launching threads
// -- prevents race conditions with ipfs.wg.
ipfs.shutdownLock.Lock()
@ -259,6 +333,12 @@ func (ipfs *Connector) Shutdown(ctx context.Context) error {
return nil
}
// Ready returns a channel which is closed once a first test request
// to the IPFS daemon succeeds.
func (ipfs *Connector) Ready(ctx context.Context) <-chan struct{} {
return ipfs.ready
}
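
Callers are expected to select on this channel rather than poll, as the cluster bootstrap code earlier in this diff does. A sketch of the intended usage (a fragment assuming the connector's package and a context import):

// waitForIPFS blocks until the connector reports ready or the
// context expires, mirroring the wait in the cluster ready path.
func waitForIPFS(ctx context.Context, ipfs *Connector) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-ipfs.Ready(ctx):
		return nil
	}
}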
// ID performs an ID request against the configured
// IPFS daemon. It returns the fetched information.
// If the request fails, or the parsing fails, it
@ -433,18 +513,14 @@ func (ipfs *Connector) pinProgress(ctx context.Context, hash api.Cid, maxDepth a
pinArgs := pinArgs(maxDepth)
path := fmt.Sprintf("pin/add?arg=%s&%s&progress=true", hash, pinArgs)
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, "", nil)
body, err := ipfs.postCtxStreamResponse(ctx, path, "", nil)
if err != nil {
return err
}
defer res.Body.Close()
defer body.Close()
_, err = checkResponse(path, res)
if err != nil {
return err
}
dec := json.NewDecoder(res.Body)
dec := json.NewDecoder(body)
for {
var pins ipfsPinsResp
if err := dec.Decode(&pins); err != nil {
@ -546,7 +622,6 @@ nextFilter:
path := "pin/ls?stream=true&type=" + typeFilter
bodies[i], err = ipfs.postCtxStreamResponse(ctx, path, "", nil)
if err != nil {
logger.Error("error querying pinset: %s", err)
return err
}
defer bodies[i].Close()
@ -623,90 +698,6 @@ func (ipfs *Connector) PinLsCid(ctx context.Context, pin api.Pin) (api.IPFSPinSt
return res.Type, nil
}
func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) {
logger.Debugf("posting /%s", path)
urlstr := fmt.Sprintf("%s/%s", apiURL, path)
req, err := http.NewRequest("POST", urlstr, postBody)
if err != nil {
logger.Error("error creating POST request:", err)
}
req.Header.Set("Content-Type", contentType)
req = req.WithContext(ctx)
res, err := ipfs.client.Do(req)
if err != nil {
logger.Error("error posting to IPFS:", err)
}
return res, err
}
// checkResponse tries to parse an error message on non-StatusOK responses
// from ipfs.
func checkResponse(path string, res *http.Response) ([]byte, error) {
if res.StatusCode == http.StatusOK {
return nil, nil
}
body, err := io.ReadAll(res.Body)
res.Body.Close()
if err == nil {
var ipfsErr ipfsError
if err := json.Unmarshal(body, &ipfsErr); err == nil {
ipfsErr.code = res.StatusCode
ipfsErr.path = path
return body, ipfsErr
}
}
// No error response with useful message from ipfs
return nil, fmt.Errorf(
"IPFS request failed (is it running?) (%s). Code %d: %s",
path,
res.StatusCode,
string(body))
}
// postCtx makes a POST request against
// the ipfs daemon, reads the full body of the response and
// returns it after checking for errors.
func (ipfs *Connector) postCtx(ctx context.Context, path string, contentType string, postBody io.Reader) ([]byte, error) {
rdr, err := ipfs.postCtxStreamResponse(ctx, path, contentType, postBody)
if err != nil {
return nil, err
}
defer rdr.Close()
body, err := io.ReadAll(rdr)
if err != nil {
logger.Errorf("error reading response body: %s", err)
return nil, err
}
return body, nil
}
// postCtxStreamResponse makes a POST request against the ipfs daemon, and
// returns the body reader after checking the response for errors.
func (ipfs *Connector) postCtxStreamResponse(ctx context.Context, path string, contentType string, postBody io.Reader) (io.ReadCloser, error) {
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, contentType, postBody)
if err != nil {
return nil, err
}
_, err = checkResponse(path, res)
if err != nil {
return nil, err
}
return res.Body, nil
}
// apiURL is a short-hand for building the url of the IPFS
// daemon API.
func (ipfs *Connector) apiURL() string {
return fmt.Sprintf("http://%s/api/v0", ipfs.nodeAddr)
}
// ConnectSwarms requests the ipfs addresses of other peers and
// triggers ipfs swarm connect requests
func (ipfs *Connector) ConnectSwarms(ctx context.Context) error {
@ -766,7 +757,6 @@ func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) {
defer cancel()
res, err := ipfs.postCtx(ctx, "config/show", "", nil)
if err != nil {
logger.Error(err)
return nil, err
}
@ -813,7 +803,6 @@ func (ipfs *Connector) RepoStat(ctx context.Context) (api.IPFSRepoStat, error) {
defer cancel()
res, err := ipfs.postCtx(ctx, "repo/stat?size-only=true", "", nil)
if err != nil {
logger.Error(err)
return api.IPFSRepoStat{}, err
}
@ -834,14 +823,14 @@ func (ipfs *Connector) RepoGC(ctx context.Context) (api.RepoGC, error) {
ctx, cancel := context.WithTimeout(ctx, ipfs.config.RepoGCTimeout)
defer cancel()
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), "repo/gc?stream-errors=true", "", nil)
body, err := ipfs.postCtxStreamResponse(ctx, "repo/gc?stream-errors=true", "", nil)
if err != nil {
logger.Error(err)
return api.RepoGC{}, err
}
defer res.Body.Close()
dec := json.NewDecoder(res.Body)
defer body.Close()
dec := json.NewDecoder(body)
repoGC := api.RepoGC{
Keys: []api.IPFSRepoGC{},
}
@ -886,7 +875,6 @@ func (ipfs *Connector) Resolve(ctx context.Context, path string) (api.Cid, error
defer cancel()
res, err := ipfs.postCtx(ctx, "resolve?arg="+url.QueryEscape(path), "", nil)
if err != nil {
logger.Error(err)
return api.CidUndef, err
}
@ -911,7 +899,6 @@ func (ipfs *Connector) SwarmPeers(ctx context.Context) ([]peer.ID, error) {
res, err := ipfs.postCtx(ctx, "swarm/peers", "", nil)
if err != nil {
logger.Error(err)
return nil, err
}
var peersRaw ipfsSwarmPeersResp
@ -1234,3 +1221,103 @@ func (ipfs *Connector) updateInformerMetric(ctx context.Context) error {
}
return err
}
// apiURL is a short-hand for building the url of the IPFS
// daemon API.
func (ipfs *Connector) apiURL() string {
return fmt.Sprintf("http://%s/api/v0", ipfs.nodeAddr)
}
func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) {
logger.Debugf("posting /%s", path)
urlstr := fmt.Sprintf("%s/%s", apiURL, path)
req, err := http.NewRequest("POST", urlstr, postBody)
if err != nil {
logger.Error("error creating POST request:", err)
return nil, err
}
req.Header.Set("Content-Type", contentType)
req = req.WithContext(ctx)
// Rate limiter. If we have a number of failed requests,
// then wait for a tick.
if failed := ipfs.failedRequests.Load(); failed > 0 {
select {
case <-ipfs.reqRateLimitCh:
case <-ipfs.ctx.Done():
return nil, ipfs.ctx.Err()
}
}
res, err := ipfs.client.Do(req)
if err != nil {
// request error: ipfs was unreachable, record it.
ipfs.failedRequests.Add(1)
logger.Error("error posting to IPFS:", err)
} else {
ipfs.failedRequests.Store(0)
}
return res, err
}
// checkResponse tries to parse an error message on non-StatusOK responses
// from ipfs.
func checkResponse(path string, res *http.Response) ([]byte, error) {
if res.StatusCode == http.StatusOK {
return nil, nil
}
body, err := io.ReadAll(res.Body)
res.Body.Close()
if err == nil {
var ipfsErr ipfsError
if err := json.Unmarshal(body, &ipfsErr); err == nil {
ipfsErr.code = res.StatusCode
ipfsErr.path = path
return body, ipfsErr
}
}
// No error response with useful message from ipfs
return nil, fmt.Errorf(
"IPFS request failed (is it running?) (%s). Code %d: %s",
path,
res.StatusCode,
string(body))
}
// postCtxStreamResponse makes a POST request against the ipfs daemon, and
// returns the body reader after checking the response for errors.
func (ipfs *Connector) postCtxStreamResponse(ctx context.Context, path string, contentType string, postBody io.Reader) (io.ReadCloser, error) {
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, contentType, postBody)
if err != nil {
return nil, err
}
_, err = checkResponse(path, res)
if err != nil {
return nil, err
}
return res.Body, nil
}
// postCtx makes a POST request against
// the ipfs daemon, reads the full body of the response and
// returns it after checking for errors.
func (ipfs *Connector) postCtx(ctx context.Context, path string, contentType string, postBody io.Reader) ([]byte, error) {
rdr, err := ipfs.postCtxStreamResponse(ctx, path, contentType, postBody)
if err != nil {
return nil, err
}
defer rdr.Close()
body, err := io.ReadAll(rdr)
if err != nil {
logger.Errorf("error reading response body: %s", err)
return nil, err
}
return body, nil
}
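
After this refactor, postCtxStreamResponse is the primitive and postCtx is the convenience wrapper for small, fully-buffered responses. A hypothetical caller for such an endpoint (the path and response shape are illustrative, not taken from this diff):

// daemonVersion decodes a small JSON response in one shot.
func (ipfs *Connector) daemonVersion(ctx context.Context) (string, error) {
	body, err := ipfs.postCtx(ctx, "version", "", nil)
	if err != nil {
		return "", err
	}
	var v struct {
		Version string
	}
	if err := json.Unmarshal(body, &v); err != nil {
		return "", err
	}
	return v.Version, nil
}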

View File

@ -15,8 +15,8 @@ import (
"github.com/ipfs-cluster/ipfs-cluster/state"
logging "github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p/core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p/core/peer"
"go.opencensus.io/trace"
)
@ -353,14 +353,17 @@ func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus, out
// At some point we need a full map of what we have and what
// we don't. The IPFS pinset is the smallest thing we can keep
// in memory.
ipfsPinsCh, err := spt.ipfsPins(ctx)
if err != nil {
logger.Error(err)
return err
}
ipfsPinsCh, errCh := spt.ipfsPins(ctx)
for ipfsPinInfo := range ipfsPinsCh {
ipfsRecursivePins[ipfsPinInfo.Cid] = ipfsPinInfo.Type
}
// If there was an error listing recursive pins then abort.
err := <-errCh
if err != nil {
err := fmt.Errorf("could not get pinset from IPFS: %w", err)
logger.Error(err)
return err
}
}
// Prepare pinset streaming
@ -645,14 +648,15 @@ func (spt *Tracker) recoverWithPinInfo(ctx context.Context, pi api.PinInfo) (api
return spt.Status(ctx, pi.Cid), nil
}
func (spt *Tracker) ipfsPins(ctx context.Context) (<-chan api.IPFSPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/ipfsStatusAll")
func (spt *Tracker) ipfsPins(ctx context.Context) (<-chan api.IPFSPinInfo, <-chan error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/ipfspins")
defer span.End()
in := make(chan []string, 1) // type filter.
in <- []string{"recursive", "direct"}
close(in)
out := make(chan api.IPFSPinInfo, pinsChannelSize)
errCh := make(chan error)
go func() {
err := spt.rpcClient.Stream(
@ -663,11 +667,10 @@ func (spt *Tracker) ipfsPins(ctx context.Context) (<-chan api.IPFSPinInfo, error
in,
out,
)
if err != nil {
logger.Error(err)
}
errCh <- err
close(errCh)
}()
return out, nil
return out, errCh
}
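
The new signature encodes an ordering contract: drain the pin channel first, then read the error channel, which cannot block forever because the goroutine sends its final error (possibly nil) and closes errCh once the stream ends. The pattern in isolation, with placeholder element types:

package streampattern

import "fmt"

// consume shows the drain-then-check contract used by ipfsPins/StatusAll.
func consume(out <-chan int, errCh <-chan error) error {
	for v := range out {
		_ = v // handle each streamed item
	}
	// out is closed, so the producer is finished; its final
	// error (or nil) is now available on errCh.
	if err := <-errCh; err != nil {
		return fmt.Errorf("incomplete stream: %w", err)
	}
	return nil
}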
// PinQueueSize returns the current size of the pinning queue.