diff --git a/cluster.go b/cluster.go index fd693f0d..d4bfea29 100644 --- a/cluster.go +++ b/cluster.go @@ -741,6 +741,20 @@ This might be due to one or several causes: } } + // Wait for ipfs + logger.Info("Waiting for IPFS to be ready...") + select { + case <-ctx.Done(): + return + case <-c.ipfs.Ready(ctx): + ipfsid, err := c.ipfs.ID(ctx) + if err != nil { + logger.Error("IPFS signaled ready but ID() errored: ", err) + } else { + logger.Infof("IPFS is ready. Peer ID: %s", ipfsid.ID) + } + } + close(c.readyCh) c.shutdownLock.Lock() c.readyB = true diff --git a/cluster_test.go b/cluster_test.go index 957b3aeb..cc220afe 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -22,8 +22,8 @@ import ( "github.com/ipfs-cluster/ipfs-cluster/version" gopath "github.com/ipfs/go-path" - peer "github.com/libp2p/go-libp2p/core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" + peer "github.com/libp2p/go-libp2p/core/peer" ) type mockComponent struct { @@ -53,6 +53,12 @@ type mockConnector struct { blocks sync.Map } +func (ipfs *mockConnector) Ready(ctx context.Context) <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch +} + func (ipfs *mockConnector) ID(ctx context.Context) (api.IPFSID, error) { return api.IPFSID{ ID: test.PeerID1, diff --git a/ipfscluster.go b/ipfscluster.go index 361fc904..9d8d24c0 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -1,9 +1,11 @@ -// Package ipfscluster implements a wrapper for the IPFS deamon which -// allows to orchestrate pinning operations among several IPFS nodes. +// Package ipfscluster is the heart of the IPFS Cluster implementation +// gluing together all the subcomponents and performing the core functionality. // -// IPFS Cluster peers form a separate libp2p swarm. A Cluster peer uses -// multiple Cluster Components which perform different tasks like managing -// the underlying IPFS daemons, or providing APIs for external control. +// This package also provide the Cluster GO API through the Cluster object, +// which allows to programatically build and control a cluster. +// +// For an example on how to initialize components and cluster object, see +// cmd/ipfs-cluster-follow and cmd/ipfs-cluster-service. package ipfscluster import ( @@ -12,8 +14,8 @@ import ( "github.com/ipfs-cluster/ipfs-cluster/api" "github.com/ipfs-cluster/ipfs-cluster/state" - peer "github.com/libp2p/go-libp2p/core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" + peer "github.com/libp2p/go-libp2p/core/peer" ) // Component represents a piece of ipfscluster. Cluster components @@ -73,6 +75,10 @@ type API interface { // an IPFS daemon. This is a base component. type IPFSConnector interface { Component + //Ready provides a channel to notify when IPFS is ready. It allows the + //main cluster component to wait for IPFS to be in working state + //before starting full-fledge operations. + Ready(context.Context) <-chan struct{} ID(context.Context) (api.IPFSID, error) Pin(context.Context, api.Pin) error Unpin(context.Context, api.Cid) error diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index b459820f..b913c744 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -53,6 +53,7 @@ type Connector struct { ctx context.Context cancel func() + ready chan struct{} config *Config nodeAddr string @@ -168,8 +169,9 @@ func NewConnector(cfg *Config) (*Connector, error) { ipfs := &Connector{ ctx: ctx, - config: cfg, cancel: cancel, + ready: make(chan struct{}), + config: cfg, nodeAddr: nodeAddr, rpcReady: make(chan struct{}, 1), client: c, @@ -197,6 +199,27 @@ func initializeMetrics(ctx context.Context) { func (ipfs *Connector) run() { <-ipfs.rpcReady + // wait for IPFS to be available + i := 0 + for { + select { + case <-ipfs.ctx.Done(): + return + default: + } + i++ + _, err := ipfs.ID(ipfs.ctx) + if err == nil { + close(ipfs.ready) + break + } + if i%10 == 0 { + logger.Warningf("ipfs does not seem to be available after %d retries", i) + } + + time.Sleep(time.Second) + } + // Do not shutdown while launching threads // -- prevents race conditions with ipfs.wg. ipfs.shutdownLock.Lock() @@ -259,6 +282,12 @@ func (ipfs *Connector) Shutdown(ctx context.Context) error { return nil } +// Ready returns a channel which gets notified when a testing request to the +// IPFS daemon first succeeds. +func (ipfs *Connector) Ready(ctx context.Context) <-chan struct{} { + return ipfs.ready +} + // ID performs an ID request against the configured // IPFS daemon. It returns the fetched information. // If the request fails, or the parsing fails, it