Fix multiple problems with state migration

Additionally, remove persisting the state version to the go-datastore. In the
future versions of the state, there is not a global format anymore (with a
global version). Instead, every pin element can potentially be stored in a
different version.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
Hector Sanjuan 2019-02-19 19:46:03 +00:00
parent d57b81490f
commit 6d77954327
4 changed files with 57 additions and 59 deletions

View File

@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"io"
"io/ioutil"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/api"
@ -78,19 +79,23 @@ func restoreStateFromDisk(ctx context.Context) (state.State, bool, error) {
return nil, false, err
}
full, err := ioutil.ReadAll(r)
if err != nil {
return nil, false, err
}
stateFromSnap := mapstate.NewMapState()
// duplicate reader to both check version and migrate
var buf bytes.Buffer
r2 := io.TeeReader(r, &buf)
err = stateFromSnap.Unmarshal(r2)
reader1 := bytes.NewReader(full)
err = stateFromSnap.Unmarshal(reader1)
if err != nil {
return nil, false, err
}
if stateFromSnap.GetVersion() == mapstate.Version {
return stateFromSnap, true, nil
}
err = stateFromSnap.Migrate(ctx, &buf)
reader2 := bytes.NewReader(full)
err = stateFromSnap.Migrate(ctx, reader2)
if err != nil {
return nil, false, err
}
@ -150,7 +155,7 @@ func validateVersion(ctx context.Context, cfg *ipfscluster.Config, cCfg *raft.Co
logger.Error("Out of date ipfs-cluster state is saved.")
logger.Error("To migrate to the new version, run ipfs-cluster-service state upgrade.")
logger.Error("To launch a node without this state, rename the consensus data directory.")
logger.Error("Hint, the default is .ipfs-cluster/ipfs-cluster-data.")
logger.Error("Hint: the default is .ipfs-cluster/raft.")
logger.Error("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
err = errors.New("outdated state version stored")
}

View File

@ -4,7 +4,6 @@ package dsstate
import (
"context"
"errors"
"io"
"github.com/ipfs/ipfs-cluster/api"
@ -43,15 +42,13 @@ func DefaultHandle() codec.Handle {
// New returns a new state using the given datastore.
//
// The version number is used to set the state version in the cases where the
// state is new.
// All keys are namespaced with the given string when written. Thus the same
// go-datastore can be sharded for different uses.
//
// All keys are namespaced with the given string, allowing that this datastore
// can be sharded in different namespaces.
//
// The Handle controls options for the serialization of items and the state
// itself.
func New(dstore ds.Datastore, version int, namespace string, handle codec.Handle) (*State, error) {
func New(dstore ds.Datastore, namespace string, handle codec.Handle) (*State, error) {
if handle == nil {
handle = DefaultHandle()
}
@ -60,20 +57,9 @@ func New(dstore ds.Datastore, version int, namespace string, handle codec.Handle
ds: dstore,
codecHandle: handle,
namespace: ds.NewKey(namespace),
version: 0, // TODO: Remove when all migrated
}
curVersion := st.GetVersion()
if curVersion < 0 {
return nil, errors.New("error reading state version")
}
// initialize
if curVersion == 0 {
err := st.SetVersion(version)
if err != nil {
return nil, err
}
}
return st, nil
}
@ -149,7 +135,6 @@ func (st *State) List(ctx context.Context) []api.Pin {
defer results.Close()
var pins []api.Pin
versionKey := st.versionKey()
for r := range results.Next() {
if r.Error != nil {
@ -157,10 +142,6 @@ func (st *State) List(ctx context.Context) []api.Pin {
return pins
}
k := ds.NewKey(r.Key)
if k.Equal(versionKey) {
continue
}
ci, err := st.unkey(k)
if err != nil {
logger.Error("key: ", k, "error: ", err)
@ -187,34 +168,13 @@ func (st *State) Migrate(ctx context.Context, r io.Reader) error {
return nil
}
func (st *State) versionKey() ds.Key {
return st.namespace.Child(ds.NewKey("/version"))
}
// GetVersion returns the current state version.
func (st *State) GetVersion() int {
v, err := st.ds.Get(st.versionKey())
if err != nil {
if err == ds.ErrNotFound {
return 0 // fine
}
logger.Error("error getting version: ", err)
return -1
}
if len(v) != 1 {
logger.Error("bad version length")
return -1
}
return int(v[0])
return st.version
}
// SetVersion allows to manually modify the state version.
func (st *State) SetVersion(v int) error {
err := st.ds.Put(st.versionKey(), []byte{byte(v)})
if err != nil {
logger.Error("error storing version:", v)
return err
}
st.version = v
return nil
}

View File

@ -41,7 +41,7 @@ func NewMapState() state.State {
mapDs := ds.NewMapDatastore()
mutexDs := sync.MutexWrap(mapDs)
dsSt, err := dsstate.New(mutexDs, Version, "", dsstate.DefaultHandle())
dsSt, err := dsstate.New(mutexDs, "", dsstate.DefaultHandle())
if err != nil {
panic(err)
}
@ -160,6 +160,8 @@ func (st *MapState) Unmarshal(r io.Reader) error {
// Try to unmarshal normally
err = st.dst.Unmarshal(iobuf)
if err == nil {
// Set current version
st.dst.SetVersion(Version)
return nil
}

View File

@ -10,9 +10,9 @@ import (
"errors"
"io"
msgpack "github.com/multiformats/go-multicodec/msgpack"
"github.com/ipfs/ipfs-cluster/api"
msgpack "github.com/multiformats/go-multicodec/msgpack"
)
// Instances of migrateable can be read from a serialized format and migrated
@ -142,9 +142,9 @@ func (st *mapStateV4) unmarshal(r io.Reader) error {
func (st *mapStateV4) next() migrateable {
var mst5 mapStateV5
mst5.PinMap = make(map[string]api.PinSerial)
mst5.PinMap = make(map[string]pinSerialV5)
for k, v := range st.PinMap {
pinsv5 := api.PinSerial{}
pinsv5 := pinSerialV5{}
pinsv5.Cid = v.Cid
pinsv5.Type = uint64(api.DataType)
pinsv5.Allocations = v.Allocations
@ -167,8 +167,25 @@ func (st *mapStateV4) next() migrateable {
/* V5 */
type pinOptionsV5 struct {
ReplicationFactorMin int `json:"replication_factor_min"`
ReplicationFactorMax int `json:"replication_factor_max"`
Name string `json:"name"`
ShardSize uint64 `json:"shard_size"`
}
type pinSerialV5 struct {
pinOptionsV5
Cid string `json:"cid"`
Type uint64 `json:"type"`
Allocations []string `json:"allocations"`
MaxDepth int `json:"max_depth"`
Reference string `json:"reference"`
}
type mapStateV5 struct {
PinMap map[string]api.PinSerial // this has not changed
PinMap map[string]pinSerialV5
Version int
}
@ -179,8 +196,22 @@ func (st *mapStateV5) unmarshal(r io.Reader) error {
func (st *mapStateV5) next() migrateable {
v6 := NewMapState()
for _, v := range st.PinMap {
v6.Add(context.Background(), v.ToPin())
for k, v := range st.PinMap {
logger.Infof("migrating", k, v.Cid)
// we need to convert because we added codec struct fields
// and thus serialization is not the same.
p := api.PinSerial{}
p.Cid = v.Cid
p.Type = v.Type
p.Allocations = v.Allocations
p.MaxDepth = v.MaxDepth
p.Reference = v.Reference
p.ReplicationFactorMax = v.ReplicationFactorMax
p.ReplicationFactorMin = v.ReplicationFactorMin
p.Name = v.Name
p.ShardSize = v.ShardSize
v6.Add(context.Background(), p.ToPin())
}
return v6.(*MapState)
}