ipfs-cluster-service state upgrade cli command

ipfs-cluster-service now has a migration subcommand that upgrades
    persistant state snapshots with an out-of-date format version to the
    newest version of raft state. If all cluster members shutdown with
    consistent state, upgrade ipfs-cluster, and run the state upgrade command,
    the new version of cluster will be compatible with persistent storage.
    ipfs-cluster now validates its persistent state upon loading it and exits
    with a clear error in the case the state format version is not up to date.

    Raft snapshotting is enforced on all shutdowns and the json backup is no
    longer run.  This commit makes use of recent changes to libp2p-raft
    allowing raft states to implement their own marshaling strategies. Now
    mapstate handles the logic for its (de)serialization.  In the interest of
    supporting various potential upgrade formats the state serialization
    begins with a varint (right now one byte) describing the version.

    Some go tests are modified and a go test is added to cover new ipfs-cluster
    raft snapshot reading functions.  Sharness tests are added to cover the
    state upgrade command.
This commit is contained in:
Wyatt 2017-11-28 17:45:10 -05:00
parent 2bc7aec079
commit 47b744f1c0
29 changed files with 555 additions and 92 deletions

View File

@ -3,8 +3,6 @@ package ipfscluster
import (
"context"
"errors"
"os"
"path/filepath"
"strings"
"sync"
"time"
@ -520,9 +518,11 @@ func (c *Cluster) Shutdown() error {
}
// Do not save anything if we were not ready
if c.readyB {
c.backupState()
}
// if c.readyB {
// // peers are saved usually on addPeer/rmPeer
// // c.peerManager.savePeers()
// c.config.BackupState(c.state)
//}
// We left the cluster or were removed. Destroy the Raft state.
if c.removed && c.readyB {
@ -1350,34 +1350,6 @@ func (c *Cluster) allocate(hash *cid.Cid, repl int, blacklist []peer.ID) ([]peer
}
}
func (c *Cluster) backupState() {
if c.config.BaseDir == "" {
logger.Warning("ClusterConfig BaseDir unset. Skipping backup")
return
}
folder := filepath.Join(c.config.BaseDir, "backups")
err := os.MkdirAll(folder, 0700)
if err != nil {
logger.Error(err)
logger.Error("skipping backup")
return
}
fname := time.Now().UTC().Format("20060102_15:04:05")
f, err := os.Create(filepath.Join(folder, fname))
if err != nil {
logger.Error(err)
return
}
defer f.Close()
err = c.state.Snapshot(f)
if err != nil {
logger.Error(err)
return
}
}
// diffPeers returns the peerIDs added and removed from peers2 in relation to
// peers1
func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) {

View File

@ -392,3 +392,33 @@ func clusterSecretToKey(secret []byte) (string, error) {
return key.String(), nil
}
// BackupState backs up a state according to this configuration's options
//func (cfg *Config) BackupState(state state.State) error {
// if cfg.BaseDir == "" {
// msg := "ClusterConfig BaseDir unset. Skipping backup"
// logger.Warning(msg)
// return errors.New(msg)
// }
// folder := filepath.Join(cfg.BaseDir, "backups")
// err := os.MkdirAll(folder, 0700)
// if err != nil {
// logger.Error(err)
// logger.Error("skipping backup")
// return errors.New("skipping backup")
// }
// fname := time.Now().UTC().Format("20060102_15:04:05")
// f, err := os.Create(filepath.Join(folder, fname))
// if err != nil {
// logger.Error(err)
// return err
// }
// defer f.Close()
// err = state.Snapshot(f)
// if err != nil {
// logger.Error(err)
// return err
// }
// return nil
// }

View File

@ -3,6 +3,7 @@ package ipfscluster
import (
"errors"
"os"
"path/filepath"
"testing"
"time"
@ -115,7 +116,10 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate
}
func cleanRaft() {
os.RemoveAll("raftFolderFromTests")
raftDirs, _ := filepath.Glob("raftFolderFromTests*")
for _, dir := range raftDirs {
os.RemoveAll(dir)
}
}
func testClusterShutdown(t *testing.T) {

View File

@ -221,3 +221,66 @@ func TestConsensusLeader(t *testing.T) {
t.Errorf("expected %s but the leader appears as %s", pID, l)
}
}
func TestRaftLatestSnapshot(t *testing.T) {
cc := testingConsensus(t, p2pPort)
defer cleanRaft(p2pPort)
defer cc.Shutdown()
// Make pin 1
c1, _ := cid.Decode(test.TestCid1)
err := cc.LogPin(api.Pin{Cid: c1, ReplicationFactor: -1})
if err != nil {
t.Error("the first pin did not make it to the log:", err)
}
time.Sleep(250 * time.Millisecond)
err = cc.raft.Snapshot()
if err != nil {
t.Error("the first snapshot was not taken successfully")
}
// Make pin 2
c2, _ := cid.Decode(test.TestCid2)
err = cc.LogPin(api.Pin{Cid: c2, ReplicationFactor: -1})
if err != nil {
t.Error("the second pin did not make it to the log:", err)
}
time.Sleep(250 * time.Millisecond)
err = cc.raft.Snapshot()
if err != nil {
t.Error("the second snapshot was not taken successfully")
}
// Make pin 3
c3, _ := cid.Decode(test.TestCid3)
err = cc.LogPin(api.Pin{Cid: c3, ReplicationFactor: -1})
if err != nil {
t.Error("the third pin did not make it to the log:", err)
}
time.Sleep(250 * time.Millisecond)
err = cc.raft.Snapshot()
if err != nil {
t.Error("the third snapshot was not taken successfully")
}
// Call raft.LastState and ensure we get the correct state
snapState := mapstate.NewMapState()
r, snapExists, err := LastStateRaw(cc.config)
if !snapExists {
t.Fatal("No snapshot found by LastStateRaw")
}
if err != nil {
t.Fatal("Error while taking snapshot", err)
}
err = snapState.Restore(r)
if err != nil {
t.Fatal("Snapshot bytes returned could not restore to state")
}
pins := snapState.List()
if len(pins) != 3 {
t.Fatal("Latest snapshot not read")
}
}

View File

@ -3,6 +3,7 @@ package raft
import (
"context"
"errors"
"io"
"os"
"path/filepath"
"time"
@ -12,6 +13,8 @@ import (
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
p2praft "github.com/libp2p/go-libp2p-raft"
"github.com/ipfs/ipfs-cluster/state"
)
// errBadRaftState is returned when the consensus component cannot start
@ -332,9 +335,13 @@ func (rw *raftWrapper) Snapshot() error {
// Shutdown shutdown Raft and closes the BoltDB.
func (rw *raftWrapper) Shutdown() error {
future := rw.raft.Shutdown()
err := future.Error()
errMsgs := ""
err := rw.Snapshot()
if err != nil {
errMsgs += "could not snapshot raft: " + err.Error() + ".\n"
}
future := rw.raft.Shutdown()
err = future.Error()
if err != nil {
errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
}
@ -427,9 +434,106 @@ func (rw *raftWrapper) Peers() ([]string, error) {
return ids, nil
}
// only call when Raft is shutdown
func (rw *raftWrapper) Clean() error {
dbh := newDataBackupHelper(rw.dataFolder)
// latestSnapshot looks for the most recent raft snapshot stored at the
// provided basedir. It returns a boolean indicating if any snapshot is
// readable, the snapshot's metadata, and a reader to the snapshot's bytes
func latestSnapshot(raftDataFolder string) (*hraft.SnapshotMeta, io.ReadCloser, error) {
store, err := hraft.NewFileSnapshotStore(raftDataFolder, RaftMaxSnapshots, nil)
if err != nil {
return nil, nil, err
}
snapMetas, err := store.List()
if err != nil {
return nil, nil, err
}
if len(snapMetas) == 0 { // no error if snapshot isn't found
return nil, nil, nil
}
meta, r, err := store.Open(snapMetas[0].ID)
if err != nil {
return nil, nil, err
}
return meta, r, nil
}
// LastStateRaw returns the bytes of the last snapshot stored, its metadata,
// and a flag indicating whether any snapshot was found.
func LastStateRaw(cfg *Config) (io.Reader, bool, error) {
// Read most recent snapshot
dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder)
if err != nil {
return nil, false, err
}
meta, r, err := latestSnapshot(dataFolder)
if err != nil {
return nil, false, err
}
if meta == nil { // no snapshots could be read
return nil, false, nil
}
return r, true, nil
}
// SnapshotSave saves the provided state to a snapshot in the
// raft data path. Old raft data is backed up and replaced
// by the new snapshot
func SnapshotSave(cfg *Config, newState state.State, pid peer.ID) error {
newStateBytes, err := p2praft.EncodeSnapshot(newState)
if err != nil {
return err
}
dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder)
if err != nil {
return err
}
meta, _, err := latestSnapshot(dataFolder)
if err != nil {
return err
}
// make a new raft snapshot
var raftSnapVersion hraft.SnapshotVersion
raftSnapVersion = 1 // As of hraft v1.0.0 this is always 1
configIndex := uint64(1)
var raftIndex uint64
var raftTerm uint64
var srvCfg hraft.Configuration
if meta != nil {
raftIndex = meta.Index
raftTerm = meta.Term
srvCfg = meta.Configuration
cleanupRaft(dataFolder)
} else {
raftIndex = uint64(1)
raftTerm = uint64(1)
srvCfg = makeServerConf([]peer.ID{pid})
}
snapshotStore, err := hraft.NewFileSnapshotStoreWithLogger(dataFolder, RaftMaxSnapshots, nil)
if err != nil {
return err
}
_, dummyTransport := hraft.NewInmemTransport("")
sink, err := snapshotStore.Create(raftSnapVersion, raftIndex, raftTerm, srvCfg, configIndex, dummyTransport)
if err != nil {
return err
}
_, err = sink.Write(newStateBytes)
if err != nil {
sink.Cancel()
return err
}
err = sink.Close()
if err != nil {
return err
}
return nil
}
func cleanupRaft(dataFolder string) error {
dbh := newDataBackupHelper(dataFolder)
err := dbh.makeBackup()
if err != nil {
logger.Warning(err)
@ -439,6 +543,11 @@ func (rw *raftWrapper) Clean() error {
return nil
}
// only call when Raft is shutdown
func (rw *raftWrapper) Clean() error {
return cleanupRaft(rw.dataFolder)
}
func find(s []string, elem string) bool {
for _, selem := range s {
if selem == elem {

View File

@ -7,7 +7,7 @@ for dir in $dirs;
do
if ls "$dir"/*.go &> /dev/null;
then
go test -v -coverprofile=profile.out -covermode=count -tags silent "$dir"
go test -v -coverprofile=profile.out -covermode=count -tags silent -timeout 15m "$dir"
if [ $? -ne 0 ];
then
exit 1

View File

@ -338,7 +338,7 @@ The safest way to upgrade ipfs-cluster is to stop all cluster peers, update and
As long as the *shared state* format has not changed, there is nothing preventing from stopping cluster peers separately, updating and launching them.
When the shared state format has changed, a state migration will be required. ipfs-cluster will refuse to start and inform the user in this case, although this feature is yet to be implemented next time the state format changes.
When the shared state format has changed, a state migration will be required. ipfs-cluster will refuse to start and inform the user in this case. Currently state migrations are supported in one direction, from old state formats to the format used by the updated ipfs-cluster-service. This is accomplished by stopping the ipfs-cluster-service daemon and running `ipfs-cluster-service state upgrade`. Note that due to changes in state serialization introduced while implementing state migrations ipfs-cluster shared state saved before December 2017 can not be migrated with this method.
The upgrading procedures is something which is actively worked on and will improve over time.

View File

@ -14,7 +14,7 @@ import (
logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
"github.com/urfave/cli"
cli "github.com/urfave/cli"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
@ -239,6 +239,29 @@ configuration.
Usage: "run the IPFS Cluster peer (default)",
Action: daemon,
},
{
Name: "state",
Usage: "Manage ipfs-cluster-state",
Subcommands: []cli.Command{
{
Name: "upgrade",
Usage: "upgrade the IPFS Cluster state to the current version",
Description: `
This command upgrades the internal state of the ipfs-cluster node
specified in the latest raft snapshot. The state format is migrated from the
version of the snapshot to the version supported by the current cluster version.
To succesfully run an upgrade of an entire cluster, shut down each peer without
removal, upgrade state using this command, and restart every peer.
`,
Action: func(c *cli.Context) error {
err := upgrade()
checkErr("upgrading state", err)
return nil
},
},
},
},
}
app.Before = func(c *cli.Context) error {
@ -301,6 +324,10 @@ func daemon(c *cli.Context) error {
checkErr("creating IPFS Connector component", err)
state := mapstate.NewMapState()
err = validateVersion(clusterCfg, consensusCfg)
checkErr("validating version", err)
tracker := maptracker.NewMapPinTracker(clusterCfg.ID)
mon, err := basic.NewMonitor(monCfg)
checkErr("creating Monitor component", err)
@ -342,6 +369,7 @@ var facilities = []string{
"cluster",
"restapi",
"ipfshttp",
"mapstate",
"monitor",
"consensus",
"pintracker",

View File

@ -0,0 +1,78 @@
package main
import (
"errors"
"io/ioutil"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/consensus/raft"
)
func upgrade() error {
//Load configs
cfg, clusterCfg, _, _, consensusCfg, _, _, _ := makeConfigs()
err := cfg.LoadJSONFromFile(configPath)
if err != nil {
return err
}
newState := mapstate.NewMapState()
// Get the last state
r, snapExists, err := raft.LastStateRaw(consensusCfg)
if err != nil {
return err
}
if !snapExists {
logger.Error("No raft state currently exists to upgrade from")
return errors.New("No snapshot could be found")
}
// Restore the state from snapshot
err = newState.Restore(r)
if err != nil {
return err
}
// Reset with SnapshotSave
err = raft.SnapshotSave(consensusCfg, newState, clusterCfg.ID)
if err != nil {
return err
}
return nil
}
func validateVersion(cfg *ipfscluster.Config, cCfg *raft.Config) error {
state := mapstate.NewMapState()
r, snapExists, err := raft.LastStateRaw(cCfg)
if !snapExists && err != nil {
logger.Error("Error before reading latest snapshot.")
return err
} else if snapExists && err != nil {
logger.Error("Error after reading last snapshot. Snapshot potentially corrupt.")
return err
} else if snapExists && err == nil {
raw, err := ioutil.ReadAll(r)
if err != nil {
return err
}
err = state.Unmarshal(raw)
if err != nil {
logger.Error("Error unmarshalling snapshot. Snapshot potentially corrupt.")
return err
}
if state.GetVersion() != state.Version {
logger.Error("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logger.Error("Out of date ipfs-cluster state is saved.")
logger.Error("To migrate to the new version, run ipfs-cluster-service state upgrade.")
logger.Error("To launch a node without this state, rename the consensus data directory.")
logger.Error("Hint, the default is .ipfs-cluster/ipfs-cluster-data.")
logger.Error("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
return errors.New("Outdated state version stored")
}
} // !snapExists && err == nil // no existing state, no check needed
return nil
}

View File

@ -9,6 +9,7 @@ var facilities = []string{
"restapi",
"ipfshttp",
"monitor",
"mapstate",
"consensus",
"raft",
"pintracker",

View File

@ -21,9 +21,9 @@
},
{
"author": "hsanjuan",
"hash": "QmcjpDnmS6jGrYrTnuT4RDHUHduF97w2V8JuYs6eynbFg2",
"hash": "QmSzULQiTbMnt36oxzdaUUomNuTHwXa1DvnyM8o7ogJ7Hb",
"name": "go-libp2p-raft",
"version": "1.1.0"
"version": "1.2.0"
},
{
"author": "whyrusleeping",

View File

@ -6,8 +6,8 @@
SHARNESS_LIB="lib/sharness/sharness.sh"
# Daemons output will be redirected to...
IPFS_OUTPUT="/dev/null" # change for debugging
# IPFS_OUTPUT="/dev/stderr" # change for debugging
IPFS_OUTPUT="/dev/null"
# IPFS_OUTPUT="/dev/stderr" # uncomment for debugging
. "$SHARNESS_LIB" || {
echo >&2 "Cannot source: $SHARNESS_LIB"
@ -35,7 +35,10 @@ test_ipfs_init() {
echo "Error running go-ipfs in docker."
exit 1
fi
sleep 10
while ! curl -s "localhost:5001/api/v0/version" > /dev/null; do
sleep 0.2
done
sleep 2
fi
test_set_prereq IPFS
}
@ -71,10 +74,7 @@ test_cluster_init() {
if [ -n "$custom_config_files" ]; then
cp -f ${custom_config_files}/* "test-config"
fi
ipfs-cluster-service --config "test-config" >"$IPFS_OUTPUT" 2>&1 &
export CLUSTER_D_PID=$!
sleep 5
test_set_prereq CLUSTER
cluster_start
}
test_cluster_config() {
@ -88,6 +88,34 @@ cluster_id() {
jq --raw-output ".cluster.id" test-config/service.json
}
test_confirm_v1State() {
V1_SNAP_PATH="../test_data/v1State"
V1_CRC_PATH="../test_data/v1Crc"
if [ -f $V1_SNAP_PATH ] && [ -f $V1_CRC_PATH ]; then
export V1_CRC=$(cat ../test_data/v1Crc)
cp $V1_SNAP_PATH v1State
test_set_prereq V1STATE
fi
}
cluster_kill(){
kill -1 "$CLUSTER_D_PID"
while pgrep ipfs-cluster-service >/dev/null; do
sleep 0.2
done
}
cluster_start(){
ipfs-cluster-service --config "test-config" >"$IPFS_OUTPUT" 2>&1 &
export CLUSTER_D_PID=$!
while ! curl -s 'localhost:9095/api/v0/version' >/dev/null; do
sleep 0.2
done
sleep 5 # wait for leader election
test_set_prereq CLUSTER
}
# Cleanup functions
test_clean_ipfs(){
docker kill ipfs
@ -96,7 +124,6 @@ test_clean_ipfs(){
}
test_clean_cluster(){
kill -1 "$CLUSTER_D_PID"
cluster_kill
rm -rf 'test-config'
sleep 2
}

View File

@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash
test_description="Test ctl installation and some basic commands"

View File

@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash
test_description="Test service startup and init functionality"

View File

@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash
test_description="Test ctl's status reporting functionality. Test errors on incomplete commands"

View File

@ -11,7 +11,7 @@ cleanup test_clean_cluster
test_expect_success IPFS,CLUSTER "pin data to cluster with ctl" '
cid=`docker exec ipfs sh -c "echo test | ipfs add -q"`
ipfs-cluster-ctl pin add "$cid" &> test4 &&
ipfs-cluster-ctl pin add "$cid" &&
ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" &&
ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED"
'

View File

@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash
test_description="Test service + ctl SSL interaction"

View File

@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash
test_description="Test failure when server not using SSL but client requests it"

View File

@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash
test_description="Test service + ctl SSL interaction"

View File

@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash
test_description="Test service + ctl SSL interaction"

View File

@ -0,0 +1,30 @@
#!/bin/bash
test_description="Test service state 'upgrade' from current version"
. lib/test-lib.sh
test_ipfs_init
cleanup test_clean_ipfs
test_cluster_init
cleanup test_clean_cluster
test_expect_success IPFS,CLUSTER "cluster-service state upgrade works" '
cid=`docker exec ipfs sh -c "echo testing | ipfs add -q"` &&
ipfs-cluster-ctl pin add "$cid" &&
sleep 5 &&
cluster_kill &&
ipfs-cluster-service --config "test-config" state upgrade
'
# previous test kills the cluster, we need to re-start
# if done inside the test, we lose debugging output
cluster_start
test_expect_success IPFS,CLUSTER "state is preserved after migration" '
cid=`docker exec ipfs sh -c "echo testing | ipfs add -q"` &&
ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" &&
ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED"
'
test_done

View File

@ -0,0 +1,34 @@
#!/bin/bash
test_description="Test service state upgrade v1 -> v2 and v2 -> v2"
IPFS_OUTPUT="/dev/stderr" # change for debugging
. lib/test-lib.sh
test_ipfs_init
cleanup test_clean_ipfs
test_cluster_init
test_confirm_v1State
cleanup test_clean_cluster
# Make a pin and shutdown to force a snapshot. Modify snapshot files to specify
# a snapshot of v1 state pinning "test" (it's easier than taking a new one each
# time with the correct metadata). Upgrade state, check that the correct
# pin cid is in the state
test_expect_success IPFS,CLUSTER,V1STATE,JQ "cluster-service loads v1 state correctly" '
cid=`docker exec ipfs sh -c "echo test | ipfs add -q"` &&
cid2=`docker exec ipfs sh -c "echo testing | ipfs add -q"` &&
ipfs-cluster-ctl pin add "$cid2" &&
cluster_kill &&
sleep 15 &&
SNAP_DIR=`find test-config/ipfs-cluster-data/snapshots/ -maxdepth 1 -mindepth 1 | head -n 1` &&
cp v1State "$SNAP_DIR/state.bin" &&
cat "$SNAP_DIR/meta.json" | jq --arg CRC "$V1_CRC" '"'"'.CRC = $CRC'"'"' > tmp.json &&
cp tmp.json "$SNAP_DIR/meta.json" &&
ipfs-cluster-service --debug --config "test-config" state upgrade &&
cluster_start &&
ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" &&
ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED"
'
test_done

1
sharness/test_data/v1Crc Normal file
View File

@ -0,0 +1 @@
y8SrOIoXJo4=

BIN
sharness/test_data/v1State Executable file

Binary file not shown.

View File

@ -25,6 +25,12 @@ type State interface {
Get(*cid.Cid) api.Pin
// Snapshot writes a snapshot of the state to a writer
Snapshot(w io.Writer) error
// Restore restores a snapshot from a reader
// Restore restores an outdated state to the current version
Restore(r io.Reader) error
// Return the version of this state
GetVersion() int
// Marshal serializes the state to a byte slice
Marshal() ([]byte, error)
// Unmarshal deserializes the state from marshaled bytes
Unmarshal([]byte) error
}

View File

@ -3,20 +3,25 @@
package mapstate
import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"sync"
"github.com/ipfs/ipfs-cluster/api"
msgpack "github.com/multiformats/go-multicodec/msgpack"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/ipfs/ipfs-cluster/api"
)
// Version is the map state Version. States with old versions should
// perform an upgrade before.
const Version = 2
var logger = logging.Logger("mapstate")
// MapState is a very simple database to store the state of the system
// using a Go map. It is thread safe. It implements the State interface.
type MapState struct {
@ -92,22 +97,69 @@ func (st *MapState) Snapshot(w io.Writer) error {
return enc.Encode(st)
}
// Restore takes a reader and restores a snapshot. It should migrate
// the format if it is not compatible with the current version.
// Restore restores a snapshot from the state's internal bytes. It should
// migrate the format if it is not compatible with the current version.
func (st *MapState) Restore(r io.Reader) error {
snap, err := ioutil.ReadAll(r)
bs, err := ioutil.ReadAll(r)
if err != nil {
return err
}
var vonly struct{ Version int }
err = json.Unmarshal(snap, &vonly)
err = st.Unmarshal(bs)
if st.Version == Version { // Unmarshal restored for us
return nil
}
bytesNoVersion := bs[1:] // Restore is aware of encoding format
err = st.migrateFrom(st.Version, bytesNoVersion)
if err != nil {
return err
}
if vonly.Version == Version {
// we are good
err := json.Unmarshal(snap, st)
return err
}
return st.migrateFrom(vonly.Version, snap)
st.Version = Version
return nil
}
// GetVersion returns the current version of this state object.
// It is not necessarily up to date
func (st *MapState) GetVersion() int {
return st.Version
}
// Marshal encodes the state using msgpack
func (st *MapState) Marshal() ([]byte, error) {
logger.Debugf("Marshal-- Marshalling state of version %d", st.Version)
buf := new(bytes.Buffer)
enc := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Encoder(buf)
if err := enc.Encode(st); err != nil {
return nil, err
}
// First byte indicates the version (probably should make this a varint
// if we stick to this encoding)
vCodec := make([]byte, 1)
vCodec[0] = byte(st.Version)
ret := append(vCodec, buf.Bytes()...)
logger.Debugf("Marshal-- The final marshaled bytes: %x", ret)
return ret, nil
}
// Unmarshal decodes the state using msgpack. It first decodes just
// the version number. If this is not the current version the bytes
// are stored within the state's internal reader, which can be migrated
// to the current version in a later call to restore. Note: Out of date
// version is not an error
func (st *MapState) Unmarshal(bs []byte) error {
// Check version byte
logger.Debugf("The incoming bytes to unmarshal: %x", bs)
v := int(bs[0])
logger.Debugf("The interpreted version: %d", v)
if v != Version { // snapshot is out of date
st.Version = v
return nil
}
// snapshot is up to date
buf := bytes.NewBuffer(bs[1:])
dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf)
if err := dec.Decode(st); err != nil {
return err
}
return nil
}

View File

@ -2,9 +2,10 @@ package mapstate
import (
"bytes"
"fmt"
"testing"
msgpack "github.com/multiformats/go-multicodec/msgpack"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
@ -69,19 +70,21 @@ func TestList(t *testing.T) {
}
}
func TestSnapshotRestore(t *testing.T) {
func TestMarshalUnmarshal(t *testing.T) {
ms := NewMapState()
ms.Add(c)
var buf bytes.Buffer
err := ms.Snapshot(&buf)
b, err := ms.Marshal()
if err != nil {
t.Fatal(err)
}
ms2 := NewMapState()
err = ms2.Restore(&buf)
err = ms2.Unmarshal(b)
if err != nil {
t.Fatal(err)
}
if ms.Version != ms2.Version {
t.Fatal(err)
}
get := ms2.Get(c.Cid)
if get.Allocations[0] != testPeerID1 {
t.Error("expected different peer id")
@ -89,19 +92,36 @@ func TestSnapshotRestore(t *testing.T) {
}
func TestMigrateFromV1(t *testing.T) {
v1 := []byte(fmt.Sprintf(`{
"Version": 1,
"PinMap": {
"%s": {}
}
}
`, c.Cid))
buf := bytes.NewBuffer(v1)
ms := NewMapState()
err := ms.Restore(buf)
// Construct the bytes of a v1 state
var v1State mapStateV1
v1State.PinMap = map[string]struct{}{
c.Cid.String(): {}}
v1State.Version = 1
buf := new(bytes.Buffer)
enc := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Encoder(buf)
err := enc.Encode(v1State)
if err != nil {
t.Fatal(err)
}
vCodec := make([]byte, 1)
vCodec[0] = byte(v1State.Version)
v1Bytes := append(vCodec, buf.Bytes()...)
// Unmarshal first to check this is v1
ms := NewMapState()
err = ms.Unmarshal(v1Bytes)
if err != nil {
t.Error(err)
}
if ms.Version != 1 {
t.Error("unmarshal picked up the wrong version")
}
// Migrate state to current version
r := bytes.NewBuffer(v1Bytes)
err = ms.Restore(r)
if err != nil {
t.Error(err)
}
get := ms.Get(c.Cid)
if get.ReplicationFactor != -1 || !get.Cid.Equals(c.Cid) {
t.Error("expected something different")

View File

@ -1,9 +1,11 @@
package mapstate
import (
"encoding/json"
"bytes"
"errors"
msgpack "github.com/multiformats/go-multicodec/msgpack"
"github.com/ipfs/ipfs-cluster/api"
)
@ -16,10 +18,12 @@ func (st *MapState) migrateFrom(version int, snap []byte) error {
switch version {
case 1:
var mstv1 mapStateV1
err := json.Unmarshal(snap, &mstv1)
if err != nil {
buf := bytes.NewBuffer(snap)
dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf)
if err := dec.Decode(&mstv1); err != nil {
return err
}
for k := range mstv1.PinMap {
st.PinMap[k] = api.PinSerial{
Cid: k,

View File

@ -52,6 +52,10 @@ func copyEmptyStructToIfaces(in []struct{}) []interface{} {
return ifaces
}
func MultiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) {
return multiaddrSplit(addr)
}
func multiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) {
pid, err := addr.ValueForProtocol(ma.P_IPFS)
if err != nil {