ipfs-cluster-service: extract some helpers to cmdutils

This will allow better re-use from other CLIs.
This commit is contained in:
Hector Sanjuan 2019-11-29 20:40:06 -06:00
parent 454efe83fd
commit 5cc9ed0c4f
5 changed files with 115 additions and 88 deletions

View File

@ -2,10 +2,7 @@ package main
import (
"context"
"os"
"os/signal"
"strings"
"syscall"
"time"
ipfscluster "github.com/ipfs/ipfs-cluster"
@ -60,7 +57,8 @@ func daemon(c *cli.Context) error {
defer locker.tryUnlock()
// Load all the configurations and identity
cfgHelper := loadConfigHelper()
cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
checkErr("loading configurations", err)
defer cfgHelper.Manager().Shutdown()
cfgs := cfgHelper.Configs()
@ -103,7 +101,7 @@ func daemon(c *cli.Context) error {
// will realize).
go bootstrap(ctx, cluster, bootstraps)
return handleSignals(ctx, cancel, cluster, host, dht)
return cmdutils.HandleSignals(ctx, cancel, cluster, host, dht)
}
// createCluster creates all the necessary things to produce the cluster
@ -217,61 +215,6 @@ func bootstrap(ctx context.Context, cluster *ipfscluster.Cluster, bootstraps []m
}
}
func handleSignals(
ctx context.Context,
cancel context.CancelFunc,
cluster *ipfscluster.Cluster,
host host.Host,
dht *dht.IpfsDHT,
) error {
signalChan := make(chan os.Signal, 20)
signal.Notify(
signalChan,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGHUP,
)
var ctrlcCount int
for {
select {
case <-signalChan:
ctrlcCount++
handleCtrlC(ctx, cluster, ctrlcCount)
case <-cluster.Done():
cancel()
dht.Close()
host.Close()
return nil
}
}
}
func handleCtrlC(ctx context.Context, cluster *ipfscluster.Cluster, ctrlcCount int) {
switch ctrlcCount {
case 1:
go func() {
err := cluster.Shutdown(ctx)
checkErr("shutting down cluster", err)
}()
case 2:
out(`
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Shutdown is taking too long! Press Ctrl-c again to manually kill cluster.
Note that this may corrupt the local cluster state.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
`)
case 3:
out("exiting cluster NOW")
locker.tryUnlock()
os.Exit(-1)
}
}
func setupPinTracker(
name string,
h host.Host,

View File

@ -628,23 +628,14 @@ func yesNoPrompt(prompt string) bool {
return false
}
func loadConfigHelper() *cmdutils.ConfigHelper {
// Load all the configurations and identity
cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath, "")
err := cfgHelper.LoadFromDisk()
checkErr("loading identity or configurations", err)
return cfgHelper
}
func getStateManager() cmdutils.StateManager {
cfgHelper := loadConfigHelper()
// since we won't save configs we can shutdown
cfgHelper.Manager().Shutdown()
mgr, err := cmdutils.NewStateManager(
cfgHelper.GetConsensus(),
cfgHelper.Identity(),
cfgHelper.Configs(),
cfgHelper, err := cmdutils.NewLoadedConfigHelper(
configPath,
identityPath,
)
checkErr("creating state manager", err)
checkErr("loading configurations", err)
cfgHelper.Manager().Shutdown()
mgr, err := cmdutils.NewStateManagerWithHelper(cfgHelper)
checkErr("loading state manager", err)
return mgr
}

View File

@ -3,9 +3,16 @@
package cmdutils
import (
"context"
"fmt"
"net"
"os"
"os/signal"
"syscall"
ipfscluster "github.com/ipfs/ipfs-cluster"
host "github.com/libp2p/go-libp2p-host"
dht "github.com/libp2p/go-libp2p-kad-dht"
ma "github.com/multiformats/go-multiaddr"
)
@ -56,3 +63,67 @@ func getPort(ln net.Listener, code int) int {
}
return 0
}
// HandleSignals orderly shutsdown an IPFS Cluster peer
// on SIGINT, SIGTERM, SIGHUP. It forces command termination
// on the 3rd-signal count.
func HandleSignals(
ctx context.Context,
cancel context.CancelFunc,
cluster *ipfscluster.Cluster,
host host.Host,
dht *dht.IpfsDHT,
) error {
signalChan := make(chan os.Signal, 20)
signal.Notify(
signalChan,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGHUP,
)
var ctrlcCount int
for {
select {
case <-signalChan:
ctrlcCount++
handleCtrlC(ctx, cluster, ctrlcCount)
case <-cluster.Done():
cancel()
dht.Close()
host.Close()
return nil
}
}
}
func handleCtrlC(ctx context.Context, cluster *ipfscluster.Cluster, ctrlcCount int) {
switch ctrlcCount {
case 1:
go func() {
if err := cluster.Shutdown(ctx); err != nil {
ErrorOut("error shutting down cluster: %s", err)
os.Exit(1)
}
}()
case 2:
ErrorOut(`
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Shutdown is taking too long! Press Ctrl-c again to manually kill cluster.
Note that this may corrupt the local cluster state.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
`)
case 3:
ErrorOut("exiting cluster NOW")
os.Exit(1)
}
}
// ErrorOut formats something and prints it to sdterr.
func ErrorOut(m string, a ...interface{}) {
fmt.Fprintf(os.Stderr, m, a...)
}

View File

@ -55,6 +55,7 @@ type ConfigHelper struct {
// NewConfigHelper creates a config helper given the paths to the
// configuration and identity files.
// Remember to Shutdown() the ConfigHelper.Manager() after use.
func NewConfigHelper(configPath, identityPath, consensus string) *ConfigHelper {
ch := &ConfigHelper{
configPath: configPath,
@ -65,6 +66,15 @@ func NewConfigHelper(configPath, identityPath, consensus string) *ConfigHelper {
return ch
}
// NewLoadedConfigHelper creates a config helper given the paths to the
// configuration and identity files and loads the configurations from disk.
// Remember to Shutdown() the ConfigHelper.Manager() after use.
func NewLoadedConfigHelper(configPath, identityPath string) (*ConfigHelper, error) {
cfgHelper := NewConfigHelper(configPath, identityPath, "")
err := cfgHelper.LoadFromDisk()
return cfgHelper, err
}
// LoadConfigFromDisk parses the configuration from disk.
func (ch *ConfigHelper) LoadConfigFromDisk() error {
return ch.manager.LoadJSONFileAndEnv(ch.configPath)

View File

@ -26,6 +26,7 @@ type StateManager interface {
ImportState(io.Reader) error
ExportState(io.Writer) error
GetStore() (ds.Datastore, error)
GetOfflineState(ds.Datastore) (state.State, error)
Clean() error
}
@ -44,6 +45,16 @@ func NewStateManager(consensus string, ident *config.Identity, cfgs *Configs) (S
}
}
// NewStateManagerWithHelper returns a state manager initialized using the
// configuration and identity provided by the given config helper.
func NewStateManagerWithHelper(cfgHelper *ConfigHelper) (StateManager, error) {
return NewStateManager(
cfgHelper.GetConsensus(),
cfgHelper.Identity(),
cfgHelper.Configs(),
)
}
type raftStateManager struct {
ident *config.Identity
cfgs *Configs
@ -53,7 +64,7 @@ func (raftsm *raftStateManager) GetStore() (ds.Datastore, error) {
return inmem.New(), nil
}
func (raftsm *raftStateManager) getOfflineState(store ds.Datastore) (state.State, error) {
func (raftsm *raftStateManager) GetOfflineState(store ds.Datastore) (state.State, error) {
return raft.OfflineState(raftsm.cfgs.Raft, store)
}
@ -68,7 +79,7 @@ func (raftsm *raftStateManager) ImportState(r io.Reader) error {
return err
}
defer store.Close()
st, err := raftsm.getOfflineState(store)
st, err := raftsm.GetOfflineState(store)
if err != nil {
return err
}
@ -90,7 +101,7 @@ func (raftsm *raftStateManager) ExportState(w io.Writer) error {
return err
}
defer store.Close()
st, err := raftsm.getOfflineState(store)
st, err := raftsm.GetOfflineState(store)
if err != nil {
return err
}
@ -114,7 +125,7 @@ func (crdtsm *crdtStateManager) GetStore() (ds.Datastore, error) {
return bds, nil
}
func (crdtsm *crdtStateManager) getOfflineState(store ds.Datastore) (state.BatchingState, error) {
func (crdtsm *crdtStateManager) GetOfflineState(store ds.Datastore) (state.State, error) {
return crdt.OfflineState(crdtsm.cfgs.Crdt, store)
}
@ -129,17 +140,18 @@ func (crdtsm *crdtStateManager) ImportState(r io.Reader) error {
return err
}
defer store.Close()
st, err := crdtsm.getOfflineState(store)
st, err := crdtsm.GetOfflineState(store)
if err != nil {
return err
}
batchingSt := st.(state.BatchingState)
err = importState(r, batchingSt)
if err != nil {
return err
}
err = importState(r, st)
if err != nil {
return err
}
return st.Commit(context.Background())
return batchingSt.Commit(context.Background())
}
func (crdtsm *crdtStateManager) ExportState(w io.Writer) error {
@ -148,7 +160,7 @@ func (crdtsm *crdtStateManager) ExportState(w io.Writer) error {
return err
}
defer store.Close()
st, err := crdtsm.getOfflineState(store)
st, err := crdtsm.GetOfflineState(store)
if err != nil {
return err
}