From a60a835e36da06c5e8f5d5590320c5421e87a1f1 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 15 Sep 2022 14:59:08 +0200 Subject: [PATCH] Wait for IPFS to be ready during start This commit introduces unlimited waiting on start until a request to `ipfs id` succeeds. Waiting has some consequences: * State watching (recover/sync) and metrics publishing do not start until ipfs is ready * swarm/connect is not triggered until ipfs is ready. Once the first request to ipfs succeeds everything goes back to how it was before. This alleviates trying operations like sending our IDs in metrics when IPFS is simply not there. --- cluster.go | 14 ++++++++++++++ cluster_test.go | 8 +++++++- ipfscluster.go | 18 ++++++++++++------ ipfsconn/ipfshttp/ipfshttp.go | 31 ++++++++++++++++++++++++++++++- 4 files changed, 63 insertions(+), 8 deletions(-) diff --git a/cluster.go b/cluster.go index fd693f0d..d4bfea29 100644 --- a/cluster.go +++ b/cluster.go @@ -741,6 +741,20 @@ This might be due to one or several causes: } } + // Wait for ipfs + logger.Info("Waiting for IPFS to be ready...") + select { + case <-ctx.Done(): + return + case <-c.ipfs.Ready(ctx): + ipfsid, err := c.ipfs.ID(ctx) + if err != nil { + logger.Error("IPFS signaled ready but ID() errored: ", err) + } else { + logger.Infof("IPFS is ready. 
Peer ID: %s", ipfsid.ID) + } + } + close(c.readyCh) c.shutdownLock.Lock() c.readyB = true diff --git a/cluster_test.go b/cluster_test.go index 957b3aeb..cc220afe 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -22,8 +22,8 @@ import ( "github.com/ipfs-cluster/ipfs-cluster/version" gopath "github.com/ipfs/go-path" - peer "github.com/libp2p/go-libp2p/core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" + peer "github.com/libp2p/go-libp2p/core/peer" ) type mockComponent struct { @@ -53,6 +53,12 @@ type mockConnector struct { blocks sync.Map } +func (ipfs *mockConnector) Ready(ctx context.Context) <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch +} + func (ipfs *mockConnector) ID(ctx context.Context) (api.IPFSID, error) { return api.IPFSID{ ID: test.PeerID1, diff --git a/ipfscluster.go b/ipfscluster.go index 361fc904..9d8d24c0 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -1,9 +1,11 @@ -// Package ipfscluster implements a wrapper for the IPFS deamon which -// allows to orchestrate pinning operations among several IPFS nodes. +// Package ipfscluster is the heart of the IPFS Cluster implementation +// gluing together all the subcomponents and performing the core functionality. // -// IPFS Cluster peers form a separate libp2p swarm. A Cluster peer uses -// multiple Cluster Components which perform different tasks like managing -// the underlying IPFS daemons, or providing APIs for external control. +// This package also provides the Cluster Go API through the Cluster object, +// which allows programmatically building and controlling a cluster. +// +// For an example on how to initialize components and cluster object, see +// cmd/ipfs-cluster-follow and cmd/ipfs-cluster-service. 
package ipfscluster import ( @@ -12,8 +14,8 @@ import ( "github.com/ipfs-cluster/ipfs-cluster/api" "github.com/ipfs-cluster/ipfs-cluster/state" - peer "github.com/libp2p/go-libp2p/core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" + peer "github.com/libp2p/go-libp2p/core/peer" ) // Component represents a piece of ipfscluster. Cluster components @@ -73,6 +75,10 @@ type API interface { // an IPFS daemon. This is a base component. type IPFSConnector interface { Component + // Ready provides a channel to notify when IPFS is ready. It allows the + // main cluster component to wait for IPFS to be in a working state + // before starting full-fledged operations. + Ready(context.Context) <-chan struct{} ID(context.Context) (api.IPFSID, error) Pin(context.Context, api.Pin) error Unpin(context.Context, api.Cid) error diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index b459820f..b913c744 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -53,6 +53,7 @@ type Connector struct { ctx context.Context cancel func() + ready chan struct{} config *Config nodeAddr string @@ -168,8 +169,9 @@ func NewConnector(cfg *Config) (*Connector, error) { ipfs := &Connector{ ctx: ctx, - config: cfg, cancel: cancel, + ready: make(chan struct{}), + config: cfg, nodeAddr: nodeAddr, rpcReady: make(chan struct{}, 1), client: c, @@ -197,6 +199,27 @@ func initializeMetrics(ctx context.Context) { func (ipfs *Connector) run() { <-ipfs.rpcReady + // wait for IPFS to be available + i := 0 + for { + select { + case <-ipfs.ctx.Done(): + return + default: + } + i++ + _, err := ipfs.ID(ipfs.ctx) + if err == nil { + close(ipfs.ready) + break + } + if i%10 == 0 { + logger.Warningf("ipfs does not seem to be available after %d retries", i) + } + + time.Sleep(time.Second) + } + // Do not shutdown while launching threads // -- prevents race conditions with ipfs.wg. 
ipfs.shutdownLock.Lock() @@ -259,6 +282,12 @@ func (ipfs *Connector) Shutdown(ctx context.Context) error { return nil } +// Ready returns a channel which gets notified when a testing request to the +// IPFS daemon first succeeds. +func (ipfs *Connector) Ready(ctx context.Context) <-chan struct{} { + return ipfs.ready +} + // ID performs an ID request against the configured // IPFS daemon. It returns the fetched information. // If the request fails, or the parsing fails, it