Merge pull request #970 from ipfs/feat/service-extract-to-cmdutils

Service: extract some methods to cmdutils
commit f587b453c8
Hector Sanjuan, 2019-12-06 11:08:58 +01:00 (committed by GitHub)
5 changed files with 119 additions and 92 deletions


@@ -2,10 +2,7 @@ package main

 import (
 	"context"
-	"os"
-	"os/signal"
 	"strings"
-	"syscall"
 	"time"

 	ipfscluster "github.com/ipfs/ipfs-cluster"
@@ -60,7 +57,8 @@ func daemon(c *cli.Context) error {
 	defer locker.tryUnlock()

 	// Load all the configurations and identity
-	cfgHelper := loadConfigHelper()
+	cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
+	checkErr("loading configurations", err)
 	defer cfgHelper.Manager().Shutdown()
 	cfgs := cfgHelper.Configs()
@@ -103,7 +101,7 @@ func daemon(c *cli.Context) error {
 	// will realize).
 	go bootstrap(ctx, cluster, bootstraps)

-	return handleSignals(ctx, cancel, cluster, host, dht)
+	return cmdutils.HandleSignals(ctx, cancel, cluster, host, dht)
 }

 // createCluster creates all the necessary things to produce the cluster
@@ -227,61 +225,6 @@ func bootstrap(ctx context.Context, cluster *ipfscluster.Cluster, bootstraps []ma.Multiaddr) {
 	}
 }

-func handleSignals(
-	ctx context.Context,
-	cancel context.CancelFunc,
-	cluster *ipfscluster.Cluster,
-	host host.Host,
-	dht *dht.IpfsDHT,
-) error {
-	signalChan := make(chan os.Signal, 20)
-	signal.Notify(
-		signalChan,
-		syscall.SIGINT,
-		syscall.SIGTERM,
-		syscall.SIGHUP,
-	)
-
-	var ctrlcCount int
-	for {
-		select {
-		case <-signalChan:
-			ctrlcCount++
-			handleCtrlC(ctx, cluster, ctrlcCount)
-		case <-cluster.Done():
-			cancel()
-			dht.Close()
-			host.Close()
-			return nil
-		}
-	}
-}
-
-func handleCtrlC(ctx context.Context, cluster *ipfscluster.Cluster, ctrlcCount int) {
-	switch ctrlcCount {
-	case 1:
-		go func() {
-			err := cluster.Shutdown(ctx)
-			checkErr("shutting down cluster", err)
-		}()
-	case 2:
-		out(`
-
-!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-Shutdown is taking too long! Press Ctrl-c again to manually kill cluster.
-Note that this may corrupt the local cluster state.
-!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-`)
-	case 3:
-		out("exiting cluster NOW")
-		locker.tryUnlock()
-		os.Exit(-1)
-	}
-}
-
 func setupPinTracker(
 	name string,
 	h host.Host,
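
After this change, the daemon's configuration loading and shutdown plumbing reduce to two cmdutils calls. A minimal sketch of the resulting flow in daemon() (configPath, identityPath, checkErr, ctx, cancel, cluster, host and dht all come from the surrounding code shown above; this is an abridged illustration, not the full function body):

    // Load configurations and identity from disk.
    cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
    checkErr("loading configurations", err)
    defer cfgHelper.Manager().Shutdown()

    // ... create the libp2p host, DHT and cluster as before ...

    // Block until shutdown, handling SIGINT/SIGTERM/SIGHUP escalation.
    return cmdutils.HandleSignals(ctx, cancel, cluster, host, dht)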


@@ -25,7 +25,7 @@ import (
 )

 // ProgramName of this application
-const programName = `ipfs-cluster-service`
+const programName = "ipfs-cluster-service"

 // flag defaults
 const (
@@ -44,13 +44,13 @@ var commit string

 // Description provides a short summary of the functionality of this tool
 var Description = fmt.Sprintf(`
-%s runs an IPFS Cluster node.
+%s runs an IPFS Cluster peer.

-A node participates in the cluster consensus, follows a distributed log
+A peer participates in the cluster consensus, follows a distributed log
 of pinning and unpinning requests and manages pinning operations to a
 configured IPFS daemon.

-This node also provides an API for cluster management, an IPFS Proxy API which
+This peer also provides an API for cluster management, an IPFS Proxy API which
 forwards requests to IPFS and a number of components for internal communication
 using LibP2P. This is a simplified view of the components:
@@ -170,7 +170,7 @@ func checkErr(doing string, err error, args ...interface{}) {
 func main() {
 	app := cli.NewApp()
 	app.Name = programName
-	app.Usage = "IPFS Cluster node"
+	app.Usage = "IPFS Cluster peer"
 	app.Description = Description
 	//app.Copyright = "© Protocol Labs, Inc."
 	app.Version = version.Version.String()
@@ -628,23 +628,14 @@ func yesNoPrompt(prompt string) bool {
 	return false
 }

-func loadConfigHelper() *cmdutils.ConfigHelper {
-	// Load all the configurations and identity
-	cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath, "")
-	err := cfgHelper.LoadFromDisk()
-	checkErr("loading identity or configurations", err)
-	return cfgHelper
-}
-
 func getStateManager() cmdutils.StateManager {
-	cfgHelper := loadConfigHelper()
-	// since we won't save configs we can shutdown
-	cfgHelper.Manager().Shutdown()
-	mgr, err := cmdutils.NewStateManager(
-		cfgHelper.GetConsensus(),
-		cfgHelper.Identity(),
-		cfgHelper.Configs(),
+	cfgHelper, err := cmdutils.NewLoadedConfigHelper(
+		configPath,
+		identityPath,
 	)
+	checkErr("loading configurations", err)
+	// since we won't save configs we can shutdown
+	cfgHelper.Manager().Shutdown()
+	mgr, err := cmdutils.NewStateManagerWithHelper(cfgHelper)
 	checkErr("creating state manager", err)
 	return mgr
 }
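
The refactored getStateManager keeps one detail worth noting: the config manager is shut down immediately after loading, since state commands only read the configuration and never save it back. A sketch of the same pattern with explicit error returns instead of checkErr (newStateManager is a hypothetical stand-in for illustration):

    func newStateManager(configPath, identityPath string) (cmdutils.StateManager, error) {
        cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
        if err != nil {
            return nil, err
        }
        // Configs are only read, never saved, so shut the manager down now.
        cfgHelper.Manager().Shutdown()
        return cmdutils.NewStateManagerWithHelper(cfgHelper)
    }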


@@ -3,9 +3,16 @@
 package cmdutils

 import (
+	"context"
+	"fmt"
 	"net"
+	"os"
+	"os/signal"
+	"syscall"

+	ipfscluster "github.com/ipfs/ipfs-cluster"
+	host "github.com/libp2p/go-libp2p-host"
+	dht "github.com/libp2p/go-libp2p-kad-dht"
 	ma "github.com/multiformats/go-multiaddr"
 )
@@ -56,3 +63,67 @@ func getPort(ln net.Listener, code int) int {
 	}
 	return 0
 }
+
+// HandleSignals performs an orderly shutdown of an IPFS Cluster peer
+// on SIGINT, SIGTERM or SIGHUP, and forces command termination on the
+// third signal.
+func HandleSignals(
+	ctx context.Context,
+	cancel context.CancelFunc,
+	cluster *ipfscluster.Cluster,
+	host host.Host,
+	dht *dht.IpfsDHT,
+) error {
+	signalChan := make(chan os.Signal, 20)
+	signal.Notify(
+		signalChan,
+		syscall.SIGINT,
+		syscall.SIGTERM,
+		syscall.SIGHUP,
+	)
+
+	var ctrlcCount int
+	for {
+		select {
+		case <-signalChan:
+			ctrlcCount++
+			handleCtrlC(ctx, cluster, ctrlcCount)
+		case <-cluster.Done():
+			cancel()
+			dht.Close()
+			host.Close()
+			return nil
+		}
+	}
+}
+
+func handleCtrlC(ctx context.Context, cluster *ipfscluster.Cluster, ctrlcCount int) {
+	switch ctrlcCount {
+	case 1:
+		go func() {
+			if err := cluster.Shutdown(ctx); err != nil {
+				ErrorOut("error shutting down cluster: %s", err)
+				os.Exit(1)
+			}
+		}()
+	case 2:
+		ErrorOut(`
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+Shutdown is taking too long! Press Ctrl-c again to manually kill cluster.
+Note that this may corrupt the local cluster state.
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+`)
+	case 3:
+		ErrorOut("exiting cluster NOW")
+		os.Exit(1)
+	}
+}
+
+// ErrorOut formats a message and prints it to stderr.
+func ErrorOut(m string, a ...interface{}) {
+	fmt.Fprintf(os.Stderr, m, a...)
+}
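
HandleSignals blocks until cluster.Done() fires, so callers can use it as the final return of their run function. A minimal sketch of the intended wiring (the setup of cluster, host and dht is elided; setupEverything is a hypothetical helper):

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // cluster, host and dht come from the caller's own setup code.
    cluster, host, dht := setupEverything(ctx) // hypothetical helper

    // First signal: orderly cluster.Shutdown(). Second: warning banner.
    // Third: immediate os.Exit(1).
    return cmdutils.HandleSignals(ctx, cancel, cluster, host, dht)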


@@ -55,6 +55,7 @@ type ConfigHelper struct {

 // NewConfigHelper creates a config helper given the paths to the
 // configuration and identity files.
+// Remember to Shutdown() the ConfigHelper.Manager() after use.
 func NewConfigHelper(configPath, identityPath, consensus string) *ConfigHelper {
 	ch := &ConfigHelper{
 		configPath: configPath,
@@ -65,6 +66,15 @@ func NewConfigHelper(configPath, identityPath, consensus string) *ConfigHelper {
 	return ch
 }

+// NewLoadedConfigHelper creates a config helper given the paths to the
+// configuration and identity files and loads the configurations from disk.
+// Remember to Shutdown() the ConfigHelper.Manager() after use.
+func NewLoadedConfigHelper(configPath, identityPath string) (*ConfigHelper, error) {
+	cfgHelper := NewConfigHelper(configPath, identityPath, "")
+	err := cfgHelper.LoadFromDisk()
+	return cfgHelper, err
+}
+
 // LoadConfigFromDisk parses the configuration from disk.
 func (ch *ConfigHelper) LoadConfigFromDisk() error {
 	return ch.manager.LoadJSONFileAndEnv(ch.configPath)
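
NewLoadedConfigHelper passes an empty consensus name to NewConfigHelper, so the consensus in use is presumably determined from the configuration read from disk. A sketch of a caller inspecting it (printConsensus is hypothetical, and the exact behaviour of GetConsensus() after loading is an assumption):

    // printConsensus loads the configs and reports the consensus component.
    func printConsensus(configPath, identityPath string) error {
        cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
        if err != nil {
            return err
        }
        defer cfgHelper.Manager().Shutdown()
        fmt.Println(cfgHelper.GetConsensus()) // e.g. "raft" or "crdt"
        return nil
    }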


@@ -26,6 +26,7 @@ type StateManager interface {
 	ImportState(io.Reader) error
 	ExportState(io.Writer) error
 	GetStore() (ds.Datastore, error)
+	GetOfflineState(ds.Datastore) (state.State, error)
 	Clean() error
 }
@@ -44,6 +45,16 @@ func NewStateManager(consensus string, ident *config.Identity, cfgs *Configs) (StateManager, error) {
 	}
 }

+// NewStateManagerWithHelper returns a state manager initialized using the
+// configuration and identity provided by the given config helper.
+func NewStateManagerWithHelper(cfgHelper *ConfigHelper) (StateManager, error) {
+	return NewStateManager(
+		cfgHelper.GetConsensus(),
+		cfgHelper.Identity(),
+		cfgHelper.Configs(),
+	)
+}
+
 type raftStateManager struct {
 	ident *config.Identity
 	cfgs  *Configs
@@ -53,7 +64,7 @@ func (raftsm *raftStateManager) GetStore() (ds.Datastore, error) {
 	return inmem.New(), nil
 }

-func (raftsm *raftStateManager) getOfflineState(store ds.Datastore) (state.State, error) {
+func (raftsm *raftStateManager) GetOfflineState(store ds.Datastore) (state.State, error) {
 	return raft.OfflineState(raftsm.cfgs.Raft, store)
 }
@@ -68,7 +79,7 @@ func (raftsm *raftStateManager) ImportState(r io.Reader) error {
 		return err
 	}
 	defer store.Close()
-	st, err := raftsm.getOfflineState(store)
+	st, err := raftsm.GetOfflineState(store)
 	if err != nil {
 		return err
 	}
@@ -90,7 +101,7 @@ func (raftsm *raftStateManager) ExportState(w io.Writer) error {
 		return err
 	}
 	defer store.Close()
-	st, err := raftsm.getOfflineState(store)
+	st, err := raftsm.GetOfflineState(store)
 	if err != nil {
 		return err
 	}
@@ -114,7 +125,7 @@ func (crdtsm *crdtStateManager) GetStore() (ds.Datastore, error) {
 	return bds, nil
 }

-func (crdtsm *crdtStateManager) getOfflineState(store ds.Datastore) (state.BatchingState, error) {
+func (crdtsm *crdtStateManager) GetOfflineState(store ds.Datastore) (state.State, error) {
 	return crdt.OfflineState(crdtsm.cfgs.Crdt, store)
 }
@@ -129,17 +140,18 @@ func (crdtsm *crdtStateManager) ImportState(r io.Reader) error {
 		return err
 	}
 	defer store.Close()
-	st, err := crdtsm.getOfflineState(store)
+	st, err := crdtsm.GetOfflineState(store)
 	if err != nil {
 		return err
 	}
-	err = importState(r, st)
+	batchingSt := st.(state.BatchingState)
+	err = importState(r, batchingSt)
 	if err != nil {
 		return err
 	}
-	return st.Commit(context.Background())
+	return batchingSt.Commit(context.Background())
 }

 func (crdtsm *crdtStateManager) ExportState(w io.Writer) error {
@@ -148,7 +160,7 @@ func (crdtsm *crdtStateManager) ExportState(w io.Writer) error {
 		return err
 	}
 	defer store.Close()
-	st, err := crdtsm.getOfflineState(store)
+	st, err := crdtsm.GetOfflineState(store)
 	if err != nil {
 		return err
 	}