Wait for IPFS to be ready during start
This commit introduces unlimited waiting on start until a request to `ipfs id` succeeds. Waiting has some consequences: * State watching (recover/sync) and metrics publishing does not start until ipfs is ready * swarm/connect is not triggered until ipfs is ready. Once the first request to ipfs succeeds everything goes to what it was before. This alleviates trying operations like sending our IDs in metrics when IPFS is simply not there.
This commit is contained in:
parent
b80f89dd01
commit
a60a835e36
14
cluster.go
14
cluster.go
|
@ -741,6 +741,20 @@ This might be due to one or several causes:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for ipfs
|
||||||
|
logger.Info("Waiting for IPFS to be ready...")
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-c.ipfs.Ready(ctx):
|
||||||
|
ipfsid, err := c.ipfs.ID(ctx)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("IPFS signaled ready but ID() errored: ", err)
|
||||||
|
} else {
|
||||||
|
logger.Infof("IPFS is ready. Peer ID: %s", ipfsid.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
close(c.readyCh)
|
close(c.readyCh)
|
||||||
c.shutdownLock.Lock()
|
c.shutdownLock.Lock()
|
||||||
c.readyB = true
|
c.readyB = true
|
||||||
|
|
|
@ -22,8 +22,8 @@ import (
|
||||||
"github.com/ipfs-cluster/ipfs-cluster/version"
|
"github.com/ipfs-cluster/ipfs-cluster/version"
|
||||||
|
|
||||||
gopath "github.com/ipfs/go-path"
|
gopath "github.com/ipfs/go-path"
|
||||||
peer "github.com/libp2p/go-libp2p/core/peer"
|
|
||||||
rpc "github.com/libp2p/go-libp2p-gorpc"
|
rpc "github.com/libp2p/go-libp2p-gorpc"
|
||||||
|
peer "github.com/libp2p/go-libp2p/core/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockComponent struct {
|
type mockComponent struct {
|
||||||
|
@ -53,6 +53,12 @@ type mockConnector struct {
|
||||||
blocks sync.Map
|
blocks sync.Map
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ipfs *mockConnector) Ready(ctx context.Context) <-chan struct{} {
|
||||||
|
ch := make(chan struct{})
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
func (ipfs *mockConnector) ID(ctx context.Context) (api.IPFSID, error) {
|
func (ipfs *mockConnector) ID(ctx context.Context) (api.IPFSID, error) {
|
||||||
return api.IPFSID{
|
return api.IPFSID{
|
||||||
ID: test.PeerID1,
|
ID: test.PeerID1,
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
// Package ipfscluster implements a wrapper for the IPFS deamon which
|
// Package ipfscluster is the heart of the IPFS Cluster implementation
|
||||||
// allows to orchestrate pinning operations among several IPFS nodes.
|
// gluing together all the subcomponents and performing the core functionality.
|
||||||
//
|
//
|
||||||
// IPFS Cluster peers form a separate libp2p swarm. A Cluster peer uses
|
// This package also provide the Cluster GO API through the Cluster object,
|
||||||
// multiple Cluster Components which perform different tasks like managing
|
// which allows to programatically build and control a cluster.
|
||||||
// the underlying IPFS daemons, or providing APIs for external control.
|
//
|
||||||
|
// For an example on how to initialize components and cluster object, see
|
||||||
|
// cmd/ipfs-cluster-follow and cmd/ipfs-cluster-service.
|
||||||
package ipfscluster
|
package ipfscluster
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -12,8 +14,8 @@ import (
|
||||||
"github.com/ipfs-cluster/ipfs-cluster/api"
|
"github.com/ipfs-cluster/ipfs-cluster/api"
|
||||||
"github.com/ipfs-cluster/ipfs-cluster/state"
|
"github.com/ipfs-cluster/ipfs-cluster/state"
|
||||||
|
|
||||||
peer "github.com/libp2p/go-libp2p/core/peer"
|
|
||||||
rpc "github.com/libp2p/go-libp2p-gorpc"
|
rpc "github.com/libp2p/go-libp2p-gorpc"
|
||||||
|
peer "github.com/libp2p/go-libp2p/core/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Component represents a piece of ipfscluster. Cluster components
|
// Component represents a piece of ipfscluster. Cluster components
|
||||||
|
@ -73,6 +75,10 @@ type API interface {
|
||||||
// an IPFS daemon. This is a base component.
|
// an IPFS daemon. This is a base component.
|
||||||
type IPFSConnector interface {
|
type IPFSConnector interface {
|
||||||
Component
|
Component
|
||||||
|
//Ready provides a channel to notify when IPFS is ready. It allows the
|
||||||
|
//main cluster component to wait for IPFS to be in working state
|
||||||
|
//before starting full-fledge operations.
|
||||||
|
Ready(context.Context) <-chan struct{}
|
||||||
ID(context.Context) (api.IPFSID, error)
|
ID(context.Context) (api.IPFSID, error)
|
||||||
Pin(context.Context, api.Pin) error
|
Pin(context.Context, api.Pin) error
|
||||||
Unpin(context.Context, api.Cid) error
|
Unpin(context.Context, api.Cid) error
|
||||||
|
|
|
@ -53,6 +53,7 @@ type Connector struct {
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel func()
|
cancel func()
|
||||||
|
ready chan struct{}
|
||||||
|
|
||||||
config *Config
|
config *Config
|
||||||
nodeAddr string
|
nodeAddr string
|
||||||
|
@ -168,8 +169,9 @@ func NewConnector(cfg *Config) (*Connector, error) {
|
||||||
|
|
||||||
ipfs := &Connector{
|
ipfs := &Connector{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
config: cfg,
|
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
|
ready: make(chan struct{}),
|
||||||
|
config: cfg,
|
||||||
nodeAddr: nodeAddr,
|
nodeAddr: nodeAddr,
|
||||||
rpcReady: make(chan struct{}, 1),
|
rpcReady: make(chan struct{}, 1),
|
||||||
client: c,
|
client: c,
|
||||||
|
@ -197,6 +199,27 @@ func initializeMetrics(ctx context.Context) {
|
||||||
func (ipfs *Connector) run() {
|
func (ipfs *Connector) run() {
|
||||||
<-ipfs.rpcReady
|
<-ipfs.rpcReady
|
||||||
|
|
||||||
|
// wait for IPFS to be available
|
||||||
|
i := 0
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ipfs.ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
_, err := ipfs.ID(ipfs.ctx)
|
||||||
|
if err == nil {
|
||||||
|
close(ipfs.ready)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if i%10 == 0 {
|
||||||
|
logger.Warningf("ipfs does not seem to be available after %d retries", i)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
// Do not shutdown while launching threads
|
// Do not shutdown while launching threads
|
||||||
// -- prevents race conditions with ipfs.wg.
|
// -- prevents race conditions with ipfs.wg.
|
||||||
ipfs.shutdownLock.Lock()
|
ipfs.shutdownLock.Lock()
|
||||||
|
@ -259,6 +282,12 @@ func (ipfs *Connector) Shutdown(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ready returns a channel which gets notified when a testing request to the
|
||||||
|
// IPFS daemon first succeeds.
|
||||||
|
func (ipfs *Connector) Ready(ctx context.Context) <-chan struct{} {
|
||||||
|
return ipfs.ready
|
||||||
|
}
|
||||||
|
|
||||||
// ID performs an ID request against the configured
|
// ID performs an ID request against the configured
|
||||||
// IPFS daemon. It returns the fetched information.
|
// IPFS daemon. It returns the fetched information.
|
||||||
// If the request fails, or the parsing fails, it
|
// If the request fails, or the parsing fails, it
|
||||||
|
|
Loading…
Reference in New Issue
Block a user