Add and refine cli interface for cluster state

Added import, export, cleanup.
Changed state interface.
New sharness tests.

License: MIT
Signed-off-by: Wyatt Daviau <wdaviau@cs.stanford.edu>
This commit is contained in:
Wyatt Daviau 2017-12-19 12:05:32 -05:00
parent 0f3391b015
commit 8361b8afe4
15 changed files with 338 additions and 81 deletions

View File

@ -65,7 +65,7 @@ deps: gx
check:
go vet ./...
golint -min_confidence 0.3 ./...
golint -set_exit_status -min_confidence 0.3 ./...
test: deps
go test -tags silent -v ./...

View File

@ -408,33 +408,3 @@ func clusterSecretToKey(secret []byte) (string, error) {
return key.String(), nil
}
// BackupState backs up a state according to this configuration's options
//func (cfg *Config) BackupState(state state.State) error {
// if cfg.BaseDir == "" {
// msg := "ClusterConfig BaseDir unset. Skipping backup"
// logger.Warning(msg)
// return errors.New(msg)
// }
// folder := filepath.Join(cfg.BaseDir, "backups")
// err := os.MkdirAll(folder, 0700)
// if err != nil {
// logger.Error(err)
// logger.Error("skipping backup")
// return errors.New("skipping backup")
// }
// fname := time.Now().UTC().Format("20060102_15:04:05")
// f, err := os.Create(filepath.Join(folder, fname))
// if err != nil {
// logger.Error(err)
// return err
// }
// defer f.Close()
// err = state.Snapshot(f)
// if err != nil {
// logger.Error(err)
// return err
// }
// return nil
// }

View File

@ -277,7 +277,7 @@ func TestRaftLatestSnapshot(t *testing.T) {
if err != nil {
t.Fatal("Error while taking snapshot", err)
}
err = snapState.Restore(r)
err = snapState.Migrate(r)
if err != nil {
t.Fatal("Snapshot bytes returned could not restore to state")
}

View File

@ -502,7 +502,7 @@ func SnapshotSave(cfg *Config, newState state.State, pid peer.ID) error {
raftIndex = meta.Index
raftTerm = meta.Term
srvCfg = meta.Configuration
cleanupRaft(dataFolder)
CleanupRaft(dataFolder)
} else {
raftIndex = uint64(1)
raftTerm = uint64(1)
@ -532,7 +532,8 @@ func SnapshotSave(cfg *Config, newState state.State, pid peer.ID) error {
return nil
}
func cleanupRaft(dataFolder string) error {
// CleanupRaft moves the current data folder to a backup location
func CleanupRaft(dataFolder string) error {
dbh := newDataBackupHelper(dataFolder)
err := dbh.makeBackup()
if err != nil {
@ -545,7 +546,7 @@ func cleanupRaft(dataFolder string) error {
// only call when Raft is shutdown
func (rw *raftWrapper) Clean() error {
return cleanupRaft(rw.dataFolder)
return CleanupRaft(rw.dataFolder)
}
func find(s []string, elem string) bool {

View File

@ -4,6 +4,7 @@ import (
"bufio"
"errors"
"fmt"
"io"
"os"
"os/signal"
"os/user"
@ -251,7 +252,6 @@ configuration.
Name: "upgrade",
Usage: "upgrade the IPFS Cluster state to the current version",
Description: `
This command upgrades the internal state of the ipfs-cluster node
specified in the latest raft snapshot. The state format is migrated from the
version of the snapshot to the version supported by the current cluster version.
@ -259,9 +259,121 @@ To successfully run an upgrade of an entire cluster, shut down each peer without
removal, upgrade state using this command, and restart every peer.
`,
Action: func(c *cli.Context) error {
err := upgrade()
err := locker.lock()
checkErr("acquiring execution lock", err)
defer locker.tryUnlock()
err = upgrade()
checkErr("upgrading state", err)
return err
return nil
},
},
{
Name: "export",
Usage: "save the IPFS Cluster state to a json file",
Description: `
This command reads the current cluster state and saves it as json for
human readability and editing. Only state formats compatible with this
version of ipfs-cluster-service can be exported. By default this command
prints the state to stdout.
`,
Flags: []cli.Flag{
cli.StringFlag{
Name: "file, f",
Value: "",
Usage: "sets an output file for exported state",
},
},
Action: func(c *cli.Context) error {
err := locker.lock()
checkErr("acquiring execution lock", err)
defer locker.tryUnlock()
var w io.WriteCloser
outputPath := c.String("file")
if outputPath == "" {
// Output to stdout
w = os.Stdout
} else {
// Create the export file
w, err = os.Create(outputPath)
checkErr("creating output file", err)
}
defer w.Close()
err = export(w)
checkErr("exporting state", err)
return nil
},
},
{
Name: "import",
Usage: "load an IPFS Cluster state from an exported state file",
Description: `
This command reads in an exported state file storing the state as a persistent
snapshot to be loaded as the cluster state when the cluster peer is restarted.
If an argument is provided, cluster will treat it as the path of the file to
import. If no argument is provided, cluster will read json from stdin.
`,
Action: func(c *cli.Context) error {
err := locker.lock()
checkErr("acquiring execution lock", err)
defer locker.tryUnlock()
if !c.GlobalBool("force") {
if !yesNoPrompt("The peer's state will be replaced. Run with -h for details. Continue? [y/n]:") {
return nil
}
}
// Get the importing file path
importFile := c.Args().First()
var r io.ReadCloser
if importFile == "" {
r = os.Stdin
logger.Info("Reading from stdin, Ctrl-D to finish")
} else {
r, err = os.Open(importFile)
checkErr("reading import file", err)
}
defer r.Close()
err = stateImport(r)
checkErr("importing state", err)
logger.Info("the given state has been correctly imported to this peer. Make sure all peers have consistent states")
return nil
},
},
{
Name: "cleanup",
Usage: "cleanup persistent consensus state so cluster can start afresh",
Description: `
This command removes the persistent state that is loaded on startup to determine this peer's view of the
cluster state. While it removes the existing state from the load path, one invocation does not permanently remove
this state from disk. This command renames cluster's data folder to <data-folder-name>.old.0, and rotates other
deprecated data folders to <data-folder-name>.old.<n+1>, etc for some rotation factor before permanently deleting
the mth data folder (m currently defaults to 5)
`,
Action: func(c *cli.Context) error {
err := locker.lock()
checkErr("acquiring execution lock", err)
defer locker.tryUnlock()
if !c.GlobalBool("force") {
if !yesNoPrompt("The peer's state will be removed from the load path. Existing pins may be lost. Continue? [y/n]:") {
return nil
}
}
cfg, _, _, _, consensusCfg, _, _, _, _ := makeConfigs()
err = cfg.LoadJSONFromFile(configPath)
checkErr("initializing configs", err)
dataFolder := filepath.Join(consensusCfg.BaseDir, raft.DefaultDataSubFolder)
err = raft.CleanupRaft(dataFolder)
checkErr("Cleaning up consensus data", err)
logger.Warningf("the %s folder has been rotated. Next start will use an empty state", dataFolder)
return nil
},
},
},
@ -476,6 +588,25 @@ func promptUser(msg string) string {
return scanner.Text()
}
// Lifted from go-ipfs/cmd/ipfs/daemon.go
// yesNoPrompt prints prompt and reads the user's answer from stdin,
// retrying up to three times on unrecognized input. It returns true
// for "y"/"Y", and false for "n"/"N", an empty answer, or when no
// valid answer was given after three attempts.
func yesNoPrompt(prompt string) bool {
	var answer string
	for attempt := 0; attempt < 3; attempt++ {
		fmt.Printf("%s ", prompt)
		// Scanf error deliberately ignored: on EOF/error answer stays
		// empty and the empty-string branch below returns false.
		fmt.Scanf("%s", &answer)
		if answer == "y" || answer == "Y" {
			return true
		}
		if answer == "n" || answer == "N" || answer == "" {
			return false
		}
		fmt.Println("Please press either 'y' or 'n'")
	}
	return false
}
func makeConfigs() (*config.Manager, *ipfscluster.Config, *rest.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *basic.Config, *disk.Config, *numpin.Config) {
cfg := config.NewManager()
clusterCfg := &ipfscluster.Config{}

View File

@ -1,54 +1,94 @@
package main
import (
"encoding/json"
"errors"
"io"
"io/ioutil"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/state/mapstate"
)
func upgrade() error {
// Load configs
cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs()
var errNoSnapshot = errors.New("no snapshot found")
// Execution lock
err := locker.lock()
func upgrade() error {
newState, err := restoreStateFromDisk()
if err != nil {
return err
}
defer locker.tryUnlock()
cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs()
err = cfg.LoadJSONFromFile(configPath)
if err != nil {
return err
}
newState := mapstate.NewMapState()
return raft.SnapshotSave(consensusCfg, newState, clusterCfg.ID)
}
// export writes a json representation of the state currently persisted
// on disk (the latest raft snapshot) to the given writer.
func export(w io.Writer) error {
	st, err := restoreStateFromDisk()
	if err == nil {
		err = exportState(st, w)
	}
	return err
}
func restoreStateFromDisk() (*mapstate.MapState, error) {
cfg, _, _, _, consensusCfg, _, _, _, _ := makeConfigs()
err := cfg.LoadJSONFromFile(configPath)
if err != nil {
return nil, err
}
// Get the last state
r, snapExists, err := raft.LastStateRaw(consensusCfg)
if err != nil {
return err
}
if !snapExists {
logger.Error("no raft state currently exists to upgrade from")
return errors.New("no snapshot could be found")
err = errNoSnapshot
}
if err != nil {
return nil, err
}
// Restore the state from snapshot
err = newState.Restore(r)
stateFromSnap := mapstate.NewMapState()
err = stateFromSnap.Migrate(r)
if err != nil {
return nil, err
}
return stateFromSnap, nil
}
func stateImport(r io.Reader) error {
cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs()
err := cfg.LoadJSONFromFile(configPath)
if err != nil {
return err
}
// Reset with SnapshotSave
err = raft.SnapshotSave(consensusCfg, newState, clusterCfg.ID)
pinSerials := make([]api.PinSerial, 0)
dec := json.NewDecoder(r)
err = dec.Decode(&pinSerials)
if err != nil {
return err
}
return nil
stateToImport := mapstate.NewMapState()
for _, pS := range pinSerials {
err = stateToImport.Add(pS.ToPin())
if err != nil {
return err
}
}
return raft.SnapshotSave(consensusCfg, stateToImport, clusterCfg.ID)
}
func validateVersion(cfg *ipfscluster.Config, cCfg *raft.Config) error {
@ -56,10 +96,8 @@ func validateVersion(cfg *ipfscluster.Config, cCfg *raft.Config) error {
r, snapExists, err := raft.LastStateRaw(cCfg)
if !snapExists && err != nil {
logger.Error("error before reading latest snapshot.")
return err
} else if snapExists && err != nil {
logger.Error("error after reading last snapshot. Snapshot potentially corrupt.")
return err
} else if snapExists && err == nil {
raw, err := ioutil.ReadAll(r)
if err != nil {
@ -77,8 +115,23 @@ func validateVersion(cfg *ipfscluster.Config, cCfg *raft.Config) error {
logger.Error("To launch a node without this state, rename the consensus data directory.")
logger.Error("Hint, the default is .ipfs-cluster/ipfs-cluster-data.")
logger.Error("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
return errors.New("outdated state version stored")
err = errors.New("outdated state version stored")
}
} // !snapExists && err == nil // no existing state, no check needed
return nil
return err
}
// exportState saves a json representation of a state to the given
// writer. Pins are converted to their PinSerial form so the output is
// human readable and can be fed back to "state import".
func exportState(state *mapstate.MapState, w io.Writer) error {
	// Serialize all pins currently held in the state.
	pins := state.List()
	pinSerials := make([]api.PinSerial, len(pins))
	for i, pin := range pins {
		pinSerials[i] = pin.ToSerial()
	}
	// Pretty-print so the export is easy to inspect and edit by hand.
	enc := json.NewEncoder(w)
	enc.SetIndent("", " ")
	return enc.Encode(pinSerials)
}

View File

@ -7,7 +7,7 @@ SHARNESS_LIB="lib/sharness/sharness.sh"
# Daemons output will be redirected to...
IPFS_OUTPUT="/dev/null"
# IPFS_OUTPUT="/dev/stderr" # uncomment for debugging
#IPFS_OUTPUT="/dev/stderr" # uncomment for debugging
. "$SHARNESS_LIB" || {
echo >&2 "Cannot source: $SHARNESS_LIB"
@ -98,6 +98,14 @@ test_confirm_v1State() {
fi
}
# test_confirm_importState copies the pre-generated import fixture into
# the working directory and sets the IMPORTSTATE prerequisite so that
# import tests only run when the fixture exists.
# Paths are quoted to avoid word-splitting/globbing on unusual paths.
test_confirm_importState() {
    IMP_STATE_PATH="../test_data/importState"
    if [ -f "$IMP_STATE_PATH" ]; then
        cp "$IMP_STATE_PATH" importState
        test_set_prereq IMPORTSTATE
    fi
}
cluster_kill(){
kill -1 "$CLUSTER_D_PID"
while pgrep ipfs-cluster-service >/dev/null; do

View File

@ -1,7 +1,6 @@
#!/bin/bash
test_description="Test service state upgrade v1 -> v2 and v2 -> v2"
IPFS_OUTPUT="/dev/stderr" # change for debugging
. lib/test-lib.sh

View File

@ -0,0 +1,31 @@
#!/bin/bash
test_description="Test service state export"
. lib/test-lib.sh
# Bring up ipfs and a cluster peer; register teardown handlers.
test_ipfs_init
cleanup test_clean_ipfs
test_cluster_init
cleanup test_clean_cluster
# With the daemon stopped and no snapshot on disk, export must fail.
test_expect_success IPFS,CLUSTER "state export fails without snapshots" '
cluster_kill && sleep 5 &&
test_expect_code 1 ipfs-cluster-service --debug --config "test-config" state export
'
# Re-initialize so a snapshot exists for the positive test below.
test_clean_cluster
test_cluster_init
# Pin a cid, stop the daemon, export, and check the cid is in the json.
test_expect_success IPFS,CLUSTER,JQ "state export saves the correct state to expected file" '
cid=`docker exec ipfs sh -c "echo test_52 | ipfs add -q"` &&
ipfs-cluster-ctl pin add "$cid" &&
sleep 5 &&
cluster_kill && sleep 5 &&
ipfs-cluster-service --debug --config "test-config" state export -f export.json &&
[ -f export.json ] &&
jq ".[].cid" export.json | grep -q "$cid"
'
test_done

View File

@ -0,0 +1,31 @@
#!/bin/bash
test_description="Test service state import"
. lib/test-lib.sh
# Bring up ipfs and a cluster peer; register teardown handlers.
test_ipfs_init
cleanup test_clean_ipfs
test_cluster_init
cleanup test_clean_cluster
# Sets the IMPORTSTATE prereq if the import fixture file is available.
test_confirm_importState
# Kill cluster daemon but keep data folder
cluster_kill
# Importing a file that is not valid json must exit non-zero.
test_expect_success IPFS,CLUSTER "state import fails on incorrect format" '
sleep 5 &&
echo "not exactly json" > badImportFile &&
test_expect_code 1 ipfs-cluster-service -f --config "test-config" state import badImportFile
'
# Import the fixture, restart, and verify its pin is present and PINNED.
# Note: the fixture cid must match the one produced by adding test_53,
# otherwise the pin ls/status checks below cannot succeed.
test_expect_success IPFS,CLUSTER,IMPORTSTATE "state import succeeds on correct format" '
cid=`docker exec ipfs sh -c "echo test_53 | ipfs add -q"` &&
ipfs-cluster-service -f --debug --config "test-config" state import importState &&
cluster_start &&
sleep 5 &&
ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" &&
ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED"
'
test_done

View File

@ -0,0 +1,38 @@
#!/bin/bash
# Fixed: the description previously said "import" (copy-paste from t0053)
# although this file exercises "state cleanup".
test_description="Test service state cleanup"
. lib/test-lib.sh
# Bring up ipfs and a cluster peer; register teardown handlers.
test_ipfs_init
cleanup test_clean_ipfs
test_cluster_init
cleanup test_clean_cluster
# Both tests below use jq, so they need the JQ prereq (as in t0052).
# Pin something, run "state cleanup" while stopped, restart, and verify
# the peer comes back with an empty state.
test_expect_success IPFS,CLUSTER,JQ "state cleanup refreshes state on restart" '
cid=`docker exec ipfs sh -c "echo test_54 | ipfs add -q"` &&
ipfs-cluster-ctl pin add "$cid" && sleep 5 &&
ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" &&
ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED" &&
[ 1 -eq "$(ipfs-cluster-ctl --enc=json status | jq ". | length")" ] &&
cluster_kill && sleep 5 &&
ipfs-cluster-service -f --debug --config "test-config" state cleanup &&
cluster_start && sleep 5 &&
[ 0 -eq "$(ipfs-cluster-ctl --enc=json status | jq ". | length")" ]
'
# Exporting, cleaning up, then importing the export must restore the
# exact same single-pin state.
test_expect_success IPFS,CLUSTER,JQ "export + cleanup + import == noop" '
cid=`docker exec ipfs sh -c "echo test_54 | ipfs add -q"` &&
ipfs-cluster-ctl pin add "$cid" && sleep 5 &&
[ 1 -eq "$(ipfs-cluster-ctl --enc=json status | jq ". | length")" ] &&
cluster_kill && sleep 5 &&
ipfs-cluster-service --debug --config "test-config" state export -f import.json &&
ipfs-cluster-service -f --debug --config "test-config" state cleanup &&
ipfs-cluster-service -f --debug --config "test-config" state import import.json &&
cluster_start && sleep 5 &&
ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" &&
ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED" &&
[ 1 -eq "$(ipfs-cluster-ctl --enc=json status | jq ". | length")" ]
'
test_done

View File

@ -0,0 +1,8 @@
[
{
"cid": "QmbrCtydGyPeHiLURSPMqrvE5mCgMCwFYq3UD4XLCeAYw6",
"name": "",
"allocations": [],
"replication_factor": -1
}
]

View File

@ -23,10 +23,8 @@ type State interface {
Has(*cid.Cid) bool
// Get returns the information attached to this pin
Get(*cid.Cid) api.Pin
// Snapshot writes a snapshot of the state to a writer
Snapshot(w io.Writer) error
// Restore restores an outdated state to the current version
Restore(r io.Reader) error
// Migrate restores the serialized format of an outdated state to the current version
Migrate(r io.Reader) error
// Return the version of this state
GetVersion() int
// Marshal serializes the state to a byte slice

View File

@ -4,7 +4,6 @@ package mapstate
import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"sync"
@ -87,19 +86,9 @@ func (st *MapState) List() []api.Pin {
return cids
}
// Snapshot dumps the MapState to the given writer, in pretty json
// format.
func (st *MapState) Snapshot(w io.Writer) error {
st.pinMux.RLock()
defer st.pinMux.RUnlock()
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
return enc.Encode(st)
}
// Restore restores a snapshot from the state's internal bytes. It should
// migrate the format if it is not compatible with the current version.
func (st *MapState) Restore(r io.Reader) error {
// Migrate restores a snapshot from the state's internal bytes and if
// necessary migrates the format to the current version.
func (st *MapState) Migrate(r io.Reader) error {
bs, err := ioutil.ReadAll(r)
if err != nil {
return err

View File

@ -118,7 +118,7 @@ func TestMigrateFromV1(t *testing.T) {
}
// Migrate state to current version
r := bytes.NewBuffer(v1Bytes)
err = ms.Restore(r)
err = ms.Migrate(r)
if err != nil {
t.Error(err)
}