diff --git a/Makefile b/Makefile index b1a9855c..1a5a99fc 100644 --- a/Makefile +++ b/Makefile @@ -65,7 +65,7 @@ deps: gx check: go vet ./... - golint -min_confidence 0.3 ./... + golint -set_exit_status -min_confidence 0.3 ./... test: deps go test -tags silent -v ./... diff --git a/cluster_config.go b/cluster_config.go index 04145cfa..ff572ca5 100644 --- a/cluster_config.go +++ b/cluster_config.go @@ -408,33 +408,3 @@ func clusterSecretToKey(secret []byte) (string, error) { return key.String(), nil } - -// BackupState backs up a state according to this configuration's options -//func (cfg *Config) BackupState(state state.State) error { -// if cfg.BaseDir == "" { -// msg := "ClusterConfig BaseDir unset. Skipping backup" -// logger.Warning(msg) -// return errors.New(msg) -// } - -// folder := filepath.Join(cfg.BaseDir, "backups") -// err := os.MkdirAll(folder, 0700) -// if err != nil { -// logger.Error(err) -// logger.Error("skipping backup") -// return errors.New("skipping backup") -// } -// fname := time.Now().UTC().Format("20060102_15:04:05") -// f, err := os.Create(filepath.Join(folder, fname)) -// if err != nil { -// logger.Error(err) -// return err -// } -// defer f.Close() -// err = state.Snapshot(f) -// if err != nil { -// logger.Error(err) -// return err -// } -// return nil -// } diff --git a/consensus/raft/consensus_test.go b/consensus/raft/consensus_test.go index 59be741b..82aaada0 100644 --- a/consensus/raft/consensus_test.go +++ b/consensus/raft/consensus_test.go @@ -277,7 +277,7 @@ func TestRaftLatestSnapshot(t *testing.T) { if err != nil { t.Fatal("Error while taking snapshot", err) } - err = snapState.Restore(r) + err = snapState.Migrate(r) if err != nil { t.Fatal("Snapshot bytes returned could not restore to state") } diff --git a/consensus/raft/raft.go b/consensus/raft/raft.go index a82c64b2..dde2db51 100644 --- a/consensus/raft/raft.go +++ b/consensus/raft/raft.go @@ -502,7 +502,7 @@ func SnapshotSave(cfg *Config, newState state.State, pid peer.ID) error { raftIndex = meta.Index raftTerm = meta.Term srvCfg = meta.Configuration - cleanupRaft(dataFolder) + CleanupRaft(dataFolder) } else { raftIndex = uint64(1) raftTerm = uint64(1) @@ -532,7 +532,8 @@ func SnapshotSave(cfg *Config, newState state.State, pid peer.ID) error { return nil } -func cleanupRaft(dataFolder string) error { +// CleanupRaft moves the current data folder to a backup location +func CleanupRaft(dataFolder string) error { dbh := newDataBackupHelper(dataFolder) err := dbh.makeBackup() if err != nil { @@ -545,7 +546,7 @@ func cleanupRaft(dataFolder string) error { // only call when Raft is shutdown func (rw *raftWrapper) Clean() error { - return cleanupRaft(rw.dataFolder) + return CleanupRaft(rw.dataFolder) } func find(s []string, elem string) bool { diff --git a/ipfs-cluster-service/main.go b/ipfs-cluster-service/main.go index dac809d4..474f76ca 100644 --- a/ipfs-cluster-service/main.go +++ b/ipfs-cluster-service/main.go @@ -4,6 +4,7 @@ import ( "bufio" "errors" "fmt" + "io" "os" "os/signal" "os/user" @@ -251,7 +252,6 @@ configuration. Name: "upgrade", Usage: "upgrade the IPFS Cluster state to the current version", Description: ` - This command upgrades the internal state of the ipfs-cluster node specified in the latest raft snapshot. The state format is migrated from the version of the snapshot to the version supported by the current cluster version. @@ -259,9 +259,121 @@ To successfully run an upgrade of an entire cluster, shut down each peer without removal, upgrade state using this command, and restart every peer. `, Action: func(c *cli.Context) error { - err := upgrade() + err := locker.lock() + checkErr("acquiring execution lock", err) + defer locker.tryUnlock() + + err = upgrade() checkErr("upgrading state", err) - return err + return nil + }, + }, + { + Name: "export", + Usage: "save the IPFS Cluster state to a json file", + Description: ` +This command reads the current cluster state and saves it as json for +human readability and editing. Only state formats compatible with this +version of ipfs-cluster-service can be exported. By default this command +prints the state to stdout. +`, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "file, f", + Value: "", + Usage: "sets an output file for exported state", + }, + }, + Action: func(c *cli.Context) error { + err := locker.lock() + checkErr("acquiring execution lock", err) + defer locker.tryUnlock() + + var w io.WriteCloser + outputPath := c.String("file") + if outputPath == "" { + // Output to stdout + w = os.Stdout + } else { + // Create the export file + w, err = os.Create(outputPath) + checkErr("creating output file", err) + } + defer w.Close() + + err = export(w) + checkErr("exporting state", err) + return nil + }, + }, + { + Name: "import", + Usage: "load an IPFS Cluster state from an exported state file", + Description: ` +This command reads in an exported state file storing the state as a persistent +snapshot to be loaded as the cluster state when the cluster peer is restarted. +If an argument is provided, cluster will treat it as the path of the file to +import. If no argument is provided cluster will read json from stdin +`, + Action: func(c *cli.Context) error { + err := locker.lock() + checkErr("acquiring execution lock", err) + defer locker.tryUnlock() + + if !c.GlobalBool("force") { + if !yesNoPrompt("The peer's state will be replaced. Run with -h for details. Continue? [y/n]:") { + return nil + } + } + + // Get the importing file path + importFile := c.Args().First() + var r io.ReadCloser + if importFile == "" { + r = os.Stdin + logger.Info("Reading from stdin, Ctrl-D to finish") + } else { + r, err = os.Open(importFile) + checkErr("reading import file", err) + } + defer r.Close() + err = stateImport(r) + checkErr("importing state", err) + logger.Info("the given state has been correctly imported to this peer. Make sure all peers have consistent states") + return nil + }, + }, + { + Name: "cleanup", + Usage: "cleanup persistent consensus state so cluster can start afresh", + Description: ` +This command removes the persistent state that is loaded on startup to determine this peer's view of the +cluster state. While it removes the existing state from the load path, one invocation does not permanently remove +this state from disk. This command renames cluster's data folder to .old.0, and rotates other +deprecated data folders to .old., etc for some rotation factor before permanatly deleting +the mth data folder (m currently defaults to 5) +`, + Action: func(c *cli.Context) error { + err := locker.lock() + checkErr("acquiring execution lock", err) + defer locker.tryUnlock() + + if !c.GlobalBool("force") { + if !yesNoPrompt("The peer's state will be removed from the load path. Existing pins may be lost. Continue? [y/n]:") { + return nil + } + } + + cfg, _, _, _, consensusCfg, _, _, _, _ := makeConfigs() + err = cfg.LoadJSONFromFile(configPath) + checkErr("initializing configs", err) + + dataFolder := filepath.Join(consensusCfg.BaseDir, raft.DefaultDataSubFolder) + err = raft.CleanupRaft(dataFolder) + checkErr("Cleaning up consensus data", err) + logger.Warningf("the %s folder has been rotated. Next start will use an empty state", dataFolder) + + return nil }, }, }, @@ -476,6 +588,25 @@ func promptUser(msg string) string { return scanner.Text() } +// Lifted from go-ipfs/cmd/ipfs/daemon.go +func yesNoPrompt(prompt string) bool { + var s string + for i := 0; i < 3; i++ { + fmt.Printf("%s ", prompt) + fmt.Scanf("%s", &s) + switch s { + case "y", "Y": + return true + case "n", "N": + return false + case "": + return false + } + fmt.Println("Please press either 'y' or 'n'") + } + return false +} + func makeConfigs() (*config.Manager, *ipfscluster.Config, *rest.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *basic.Config, *disk.Config, *numpin.Config) { cfg := config.NewManager() clusterCfg := &ipfscluster.Config{} diff --git a/ipfs-cluster-service/migrate.go b/ipfs-cluster-service/state.go similarity index 51% rename from ipfs-cluster-service/migrate.go rename to ipfs-cluster-service/state.go index 68d9d03a..0e4d71e7 100644 --- a/ipfs-cluster-service/migrate.go +++ b/ipfs-cluster-service/state.go @@ -1,54 +1,94 @@ package main import ( + "encoding/json" "errors" + "io" "io/ioutil" ipfscluster "github.com/ipfs/ipfs-cluster" + "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/consensus/raft" "github.com/ipfs/ipfs-cluster/state/mapstate" ) -func upgrade() error { - // Load configs - cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs() +var errNoSnapshot = errors.New("no snapshot found") - // Execution lock - err := locker.lock() +func upgrade() error { + newState, err := restoreStateFromDisk() if err != nil { return err } - defer locker.tryUnlock() + + cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs() err = cfg.LoadJSONFromFile(configPath) if err != nil { return err } - newState := mapstate.NewMapState() + return raft.SnapshotSave(consensusCfg, newState, clusterCfg.ID) +} + +func export(w io.Writer) error { + stateToExport, err := restoreStateFromDisk() + if err != nil { + return err + } + + return exportState(stateToExport, w) +} + +func restoreStateFromDisk() (*mapstate.MapState, error) { + cfg, _, _, _, consensusCfg, _, _, _, _ := makeConfigs() + + err := cfg.LoadJSONFromFile(configPath) + if err != nil { + return nil, err + } - // Get the last state r, snapExists, err := raft.LastStateRaw(consensusCfg) - if err != nil { - return err - } if !snapExists { - logger.Error("no raft state currently exists to upgrade from") - return errors.New("no snapshot could be found") + err = errNoSnapshot + } + if err != nil { + return nil, err } - // Restore the state from snapshot - err = newState.Restore(r) + stateFromSnap := mapstate.NewMapState() + err = stateFromSnap.Migrate(r) + if err != nil { + return nil, err + } + + return stateFromSnap, nil + +} + +func stateImport(r io.Reader) error { + cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs() + + err := cfg.LoadJSONFromFile(configPath) if err != nil { return err } - // Reset with SnapshotSave - err = raft.SnapshotSave(consensusCfg, newState, clusterCfg.ID) + pinSerials := make([]api.PinSerial, 0) + dec := json.NewDecoder(r) + err = dec.Decode(&pinSerials) if err != nil { return err } - return nil + + stateToImport := mapstate.NewMapState() + for _, pS := range pinSerials { + err = stateToImport.Add(pS.ToPin()) + if err != nil { + return err + } + } + + return raft.SnapshotSave(consensusCfg, stateToImport, clusterCfg.ID) } func validateVersion(cfg *ipfscluster.Config, cCfg *raft.Config) error { @@ -56,10 +96,8 @@ func validateVersion(cfg *ipfscluster.Config, cCfg *raft.Config) error { r, snapExists, err := raft.LastStateRaw(cCfg) if !snapExists && err != nil { logger.Error("error before reading latest snapshot.") - return err } else if snapExists && err != nil { logger.Error("error after reading last snapshot. Snapshot potentially corrupt.") - return err } else if snapExists && err == nil { raw, err := ioutil.ReadAll(r) if err != nil { @@ -77,8 +115,23 @@ func validateVersion(cfg *ipfscluster.Config, cCfg *raft.Config) error { logger.Error("To launch a node without this state, rename the consensus data directory.") logger.Error("Hint, the default is .ipfs-cluster/ipfs-cluster-data.") logger.Error("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") - return errors.New("outdated state version stored") + err = errors.New("outdated state version stored") } } // !snapExists && err == nil // no existing state, no check needed - return nil + return err +} + +// ExportState saves a json representation of a state +func exportState(state *mapstate.MapState, w io.Writer) error { + // Serialize pins + pins := state.List() + pinSerials := make([]api.PinSerial, len(pins), len(pins)) + for i, pin := range pins { + pinSerials[i] = pin.ToSerial() + } + + // Write json to output file + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + return enc.Encode(pinSerials) } diff --git a/sharness/lib/test-lib.sh b/sharness/lib/test-lib.sh index 066c20e1..817d4219 100755 --- a/sharness/lib/test-lib.sh +++ b/sharness/lib/test-lib.sh @@ -7,7 +7,7 @@ SHARNESS_LIB="lib/sharness/sharness.sh" # Daemons output will be redirected to... IPFS_OUTPUT="/dev/null" -# IPFS_OUTPUT="/dev/stderr" # uncomment for debugging +#IPFS_OUTPUT="/dev/stderr" # uncomment for debugging . "$SHARNESS_LIB" || { echo >&2 "Cannot source: $SHARNESS_LIB" @@ -98,6 +98,14 @@ test_confirm_v1State() { fi } +test_confirm_importState() { + IMP_STATE_PATH="../test_data/importState" + if [ -f $IMP_STATE_PATH ]; then + cp $IMP_STATE_PATH importState + test_set_prereq IMPORTSTATE + fi +} + cluster_kill(){ kill -1 "$CLUSTER_D_PID" while pgrep ipfs-cluster-service >/dev/null; do diff --git a/sharness/t0051-service-state-upgrade-from-old.sh b/sharness/t0051-service-state-upgrade-from-old.sh index 2b7c95ea..f63a4b3b 100755 --- a/sharness/t0051-service-state-upgrade-from-old.sh +++ b/sharness/t0051-service-state-upgrade-from-old.sh @@ -1,7 +1,6 @@ #!/bin/bash test_description="Test service state upgrade v1 -> v2 and v2 -> v2" -IPFS_OUTPUT="/dev/stderr" # change for debugging . lib/test-lib.sh diff --git a/sharness/t0052-service-state-export.sh b/sharness/t0052-service-state-export.sh new file mode 100755 index 00000000..d00c67d0 --- /dev/null +++ b/sharness/t0052-service-state-export.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +test_description="Test service state export" + +. lib/test-lib.sh + +test_ipfs_init +cleanup test_clean_ipfs +test_cluster_init +cleanup test_clean_cluster + + +test_expect_success IPFS,CLUSTER "state export fails without snapshots" ' + cluster_kill && sleep 5 && + test_expect_code 1 ipfs-cluster-service --debug --config "test-config" state export +' + +test_clean_cluster +test_cluster_init + +test_expect_success IPFS,CLUSTER,JQ "state export saves the correct state to expected file" ' + cid=`docker exec ipfs sh -c "echo test_52 | ipfs add -q"` && + ipfs-cluster-ctl pin add "$cid" && + sleep 5 && + cluster_kill && sleep 5 && + ipfs-cluster-service --debug --config "test-config" state export -f export.json && + [ -f export.json ] && + jq ".[].cid" export.json | grep -q "$cid" +' + +test_done diff --git a/sharness/t0053-service-state-import.sh b/sharness/t0053-service-state-import.sh new file mode 100755 index 00000000..6e89ac61 --- /dev/null +++ b/sharness/t0053-service-state-import.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +test_description="Test service state import" + +. lib/test-lib.sh + +test_ipfs_init +cleanup test_clean_ipfs +test_cluster_init +cleanup test_clean_cluster +test_confirm_importState + +# Kill cluster daemon but keep data folder +cluster_kill + +test_expect_success IPFS,CLUSTER "state import fails on incorrect format" ' + sleep 5 && + echo "not exactly json" > badImportFile && + test_expect_code 1 ipfs-cluster-service -f --config "test-config" state import badImportFile +' + +test_expect_success IPFS,CLUSTER,IMPORTSTATE "state import succeeds on correct format" ' + cid=`docker exec ipfs sh -c "echo test_53 | ipfs add -q"` && + ipfs-cluster-service -f --debug --config "test-config" state import importState && + cluster_start && + sleep 5 && + ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" && + ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED" +' + +test_done diff --git a/sharness/t0054-service-state-clean.sh b/sharness/t0054-service-state-clean.sh new file mode 100755 index 00000000..720327d7 --- /dev/null +++ b/sharness/t0054-service-state-clean.sh @@ -0,0 +1,38 @@ +#!/bin/bash + +test_description="Test service state import" + +. lib/test-lib.sh + +test_ipfs_init +cleanup test_clean_ipfs +test_cluster_init +cleanup test_clean_cluster + +test_expect_success IPFS,CLUSTER "state cleanup refreshes state on restart" ' + cid=`docker exec ipfs sh -c "echo test_54 | ipfs add -q"` && + ipfs-cluster-ctl pin add "$cid" && sleep 5 && + ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" && + ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED" && + [ 1 -eq "$(ipfs-cluster-ctl --enc=json status | jq ". | length")" ] && + cluster_kill && sleep 5 && + ipfs-cluster-service -f --debug --config "test-config" state cleanup && + cluster_start && sleep 5 && + [ 0 -eq "$(ipfs-cluster-ctl --enc=json status | jq ". | length")" ] +' + +test_expect_success IPFS,CLUSTER "export + cleanup + import == noop" ' + cid=`docker exec ipfs sh -c "echo test_54 | ipfs add -q"` && + ipfs-cluster-ctl pin add "$cid" && sleep 5 && + [ 1 -eq "$(ipfs-cluster-ctl --enc=json status | jq ". | length")" ] && + cluster_kill && sleep 5 && + ipfs-cluster-service --debug --config "test-config" state export -f import.json && + ipfs-cluster-service -f --debug --config "test-config" state cleanup && + ipfs-cluster-service -f --debug --config "test-config" state import import.json && + cluster_start && sleep 5 && + ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" && + ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED" && + [ 1 -eq "$(ipfs-cluster-ctl --enc=json status | jq ". | length")" ] +' + +test_done diff --git a/sharness/test_data/importState b/sharness/test_data/importState new file mode 100644 index 00000000..4ae07ba5 --- /dev/null +++ b/sharness/test_data/importState @@ -0,0 +1,8 @@ +[ + { + "cid": "QmbrCtydGyPeHiLURSPMqrvE5mCgMCwFYq3UD4XLCeAYw6", + "name": "", + "allocations": [], + "replication_factor": -1 + } +] \ No newline at end of file diff --git a/state/interface.go b/state/interface.go index ebeced25..79cfde20 100644 --- a/state/interface.go +++ b/state/interface.go @@ -23,10 +23,8 @@ type State interface { Has(*cid.Cid) bool // Get returns the information attacthed to this pin Get(*cid.Cid) api.Pin - // Snapshot writes a snapshot of the state to a writer - Snapshot(w io.Writer) error - // Restore restores an outdated state to the current version - Restore(r io.Reader) error + // Migrate restores the serialized format of an outdated state to the current version + Migrate(r io.Reader) error // Return the version of this state GetVersion() int // Marshal serializes the state to a byte slice diff --git a/state/mapstate/map_state.go b/state/mapstate/map_state.go index b2b39c56..ab5f844d 100644 --- a/state/mapstate/map_state.go +++ b/state/mapstate/map_state.go @@ -4,7 +4,6 @@ package mapstate import ( "bytes" - "encoding/json" "io" "io/ioutil" "sync" @@ -87,19 +86,9 @@ func (st *MapState) List() []api.Pin { return cids } -// Snapshot dumps the MapState to the given writer, in pretty json -// format. -func (st *MapState) Snapshot(w io.Writer) error { - st.pinMux.RLock() - defer st.pinMux.RUnlock() - enc := json.NewEncoder(w) - enc.SetIndent("", " ") - return enc.Encode(st) -} - -// Restore restores a snapshot from the state's internal bytes. It should -// migrate the format if it is not compatible with the current version. -func (st *MapState) Restore(r io.Reader) error { +// Migrate restores a snapshot from the state's internal bytes and if +// necessary migrates the format to the current version. +func (st *MapState) Migrate(r io.Reader) error { bs, err := ioutil.ReadAll(r) if err != nil { return err diff --git a/state/mapstate/map_state_test.go b/state/mapstate/map_state_test.go index 3fca31c2..6888e915 100644 --- a/state/mapstate/map_state_test.go +++ b/state/mapstate/map_state_test.go @@ -118,7 +118,7 @@ func TestMigrateFromV1(t *testing.T) { } // Migrate state to current version r := bytes.NewBuffer(v1Bytes) - err = ms.Restore(r) + err = ms.Migrate(r) if err != nil { t.Error(err) }