Merge pull request #307 from ipfs/feat/auto-migration
Three small state features
This commit is contained in:
commit
a371fa409f
|
@ -242,12 +242,26 @@ configuration.
|
|||
{
|
||||
Name: "daemon",
|
||||
Usage: "run the IPFS Cluster peer (default)",
|
||||
Flags: []cli.Flag{
|
||||
cli.BoolFlag{
|
||||
Name: "upgrade, u",
|
||||
Usage: "run necessary state migrations before starting cluster service",
|
||||
},
|
||||
},
|
||||
Action: daemon,
|
||||
},
|
||||
{
|
||||
Name: "state",
|
||||
Usage: "Manage ipfs-cluster-state",
|
||||
Subcommands: []cli.Command{
|
||||
{
|
||||
Name: "version",
|
||||
Usage: "display the shared state format version",
|
||||
Action: func(c *cli.Context) error {
|
||||
fmt.Printf("%d\n", mapstate.Version)
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "upgrade",
|
||||
Usage: "upgrade the IPFS Cluster state to the current version",
|
||||
|
@ -424,6 +438,13 @@ func daemon(c *cli.Context) error {
|
|||
|
||||
// Load all the configurations
|
||||
cfg, clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg, numpinInfCfg := makeConfigs()
|
||||
|
||||
// Run any migrations
|
||||
if c.Bool("upgrade") {
|
||||
err := upgrade()
|
||||
checkErr("upgrading state", err)
|
||||
}
|
||||
|
||||
// Execution lock
|
||||
err := locker.lock()
|
||||
checkErr("acquiring execution lock", err)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
|
@ -15,11 +16,16 @@ import (
|
|||
var errNoSnapshot = errors.New("no snapshot found")
|
||||
|
||||
func upgrade() error {
|
||||
newState, err := restoreStateFromDisk()
|
||||
newState, current, err := restoreStateFromDisk()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if current {
|
||||
logger.Warning("Skipping migration of up-to-date state")
|
||||
return nil
|
||||
}
|
||||
|
||||
cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs()
|
||||
|
||||
err = cfg.LoadJSONFromFile(configPath)
|
||||
|
@ -32,7 +38,7 @@ func upgrade() error {
|
|||
}
|
||||
|
||||
func export(w io.Writer) error {
|
||||
stateToExport, err := restoreStateFromDisk()
|
||||
stateToExport, _, err := restoreStateFromDisk()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -40,12 +46,15 @@ func export(w io.Writer) error {
|
|||
return exportState(stateToExport, w)
|
||||
}
|
||||
|
||||
func restoreStateFromDisk() (*mapstate.MapState, error) {
|
||||
// restoreStateFromDisk returns a mapstate containing the latest
|
||||
// snapshot, a flag set to true when the state format has the
|
||||
// current version and an error
|
||||
func restoreStateFromDisk() (*mapstate.MapState, bool, error) {
|
||||
cfg, _, _, _, consensusCfg, _, _, _, _ := makeConfigs()
|
||||
|
||||
err := cfg.LoadJSONFromFile(configPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
r, snapExists, err := raft.LastStateRaw(consensusCfg)
|
||||
|
@ -53,17 +62,31 @@ func restoreStateFromDisk() (*mapstate.MapState, error) {
|
|||
err = errNoSnapshot
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
stateFromSnap := mapstate.NewMapState()
|
||||
err = stateFromSnap.Migrate(r)
|
||||
// duplicate reader to both check version and migrate
|
||||
var buf bytes.Buffer
|
||||
r2 := io.TeeReader(r, &buf)
|
||||
raw, err := ioutil.ReadAll(r2)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
err = stateFromSnap.Unmarshal(raw)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if stateFromSnap.GetVersion() == mapstate.Version {
|
||||
return stateFromSnap, true, nil
|
||||
}
|
||||
|
||||
return stateFromSnap, nil
|
||||
err = stateFromSnap.Migrate(&buf)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
return stateFromSnap, false, nil
|
||||
}
|
||||
|
||||
func stateImport(r io.Reader) error {
|
||||
|
|
|
@ -4,6 +4,7 @@ package mapstate
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
|
@ -99,6 +100,9 @@ func (st *MapState) Migrate(r io.Reader) error {
|
|||
return err
|
||||
}
|
||||
err = st.Unmarshal(bs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if st.Version == Version { // Unmarshal restored for us
|
||||
return nil
|
||||
}
|
||||
|
@ -142,6 +146,9 @@ func (st *MapState) Marshal() ([]byte, error) {
|
|||
func (st *MapState) Unmarshal(bs []byte) error {
|
||||
// Check version byte
|
||||
// logger.Debugf("The incoming bytes to unmarshal: %x", bs)
|
||||
if len(bs) < 1 {
|
||||
return errors.New("cannot unmarshal from empty bytes")
|
||||
}
|
||||
v := int(bs[0])
|
||||
logger.Debugf("The interpreted version: %d", v)
|
||||
if v != Version { // snapshot is out of date
|
||||
|
|
Loading…
Reference in New Issue
Block a user