ipfs-cluster/state/mapstate/map_state.go
Wyatt 47b744f1c0 ipfs-cluster-service state upgrade cli command
ipfs-cluster-service now has a migration subcommand that upgrades
    persistant state snapshots with an out-of-date format version to the
    newest version of raft state. If all cluster members shutdown with
    consistent state, upgrade ipfs-cluster, and run the state upgrade command,
    the new version of cluster will be compatible with persistent storage.
    ipfs-cluster now validates its persistent state upon loading it and exits
    with a clear error in the case the state format version is not up to date.

    Raft snapshotting is enforced on all shutdowns and the json backup is no
    longer run.  This commit makes use of recent changes to libp2p-raft
    allowing raft states to implement their own marshaling strategies. Now
    mapstate handles the logic for its (de)serialization.  In the interest of
    supporting various potential upgrade formats the state serialization
    begins with a varint (right now one byte) describing the version.

    Some go tests are modified and a go test is added to cover new ipfs-cluster
    raft snapshot reading functions.  Sharness tests are added to cover the
    state upgrade command.
2017-11-28 22:35:48 -05:00

166 lines
4.3 KiB
Go

// Package mapstate implements the State interface for IPFS Cluster by using
// a map to keep track of the consensus-shared state.
package mapstate
import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"sync"
msgpack "github.com/multiformats/go-multicodec/msgpack"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/ipfs/ipfs-cluster/api"
)
// Version is the map state Version. States with old versions should
// perform an upgrade before.
const Version = 2
var logger = logging.Logger("mapstate")
// MapState is a very simple database to store the state of the system
// using a Go map. It is thread safe. It implements the State interface.
type MapState struct {
pinMux sync.RWMutex
PinMap map[string]api.PinSerial
Version int
}
// NewMapState initializes the internal map and returns a new MapState object.
func NewMapState() *MapState {
return &MapState{
PinMap: make(map[string]api.PinSerial),
Version: Version,
}
}
// Add adds a Pin to the internal map.
func (st *MapState) Add(c api.Pin) error {
st.pinMux.Lock()
defer st.pinMux.Unlock()
st.PinMap[c.Cid.String()] = c.ToSerial()
return nil
}
// Rm removes a Cid from the internal map.
func (st *MapState) Rm(c *cid.Cid) error {
st.pinMux.Lock()
defer st.pinMux.Unlock()
delete(st.PinMap, c.String())
return nil
}
// Get returns Pin information for a CID.
func (st *MapState) Get(c *cid.Cid) api.Pin {
st.pinMux.RLock()
defer st.pinMux.RUnlock()
pins, ok := st.PinMap[c.String()]
if !ok { // make sure no panics
return api.Pin{}
}
return pins.ToPin()
}
// Has returns true if the Cid belongs to the State.
func (st *MapState) Has(c *cid.Cid) bool {
st.pinMux.RLock()
defer st.pinMux.RUnlock()
_, ok := st.PinMap[c.String()]
return ok
}
// List provides the list of tracked Pins.
func (st *MapState) List() []api.Pin {
st.pinMux.RLock()
defer st.pinMux.RUnlock()
cids := make([]api.Pin, 0, len(st.PinMap))
for _, v := range st.PinMap {
if v.Cid == "" {
continue
}
cids = append(cids, v.ToPin())
}
return cids
}
// Snapshot dumps the MapState to the given writer, in pretty json
// format.
func (st *MapState) Snapshot(w io.Writer) error {
st.pinMux.RLock()
defer st.pinMux.RUnlock()
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
return enc.Encode(st)
}
// Restore restores a snapshot from the state's internal bytes. It should
// migrate the format if it is not compatible with the current version.
func (st *MapState) Restore(r io.Reader) error {
bs, err := ioutil.ReadAll(r)
if err != nil {
return err
}
err = st.Unmarshal(bs)
if st.Version == Version { // Unmarshal restored for us
return nil
}
bytesNoVersion := bs[1:] // Restore is aware of encoding format
err = st.migrateFrom(st.Version, bytesNoVersion)
if err != nil {
return err
}
st.Version = Version
return nil
}
// GetVersion returns the current version of this state object.
// It is not necessarily up to date
func (st *MapState) GetVersion() int {
return st.Version
}
// Marshal encodes the state using msgpack
func (st *MapState) Marshal() ([]byte, error) {
logger.Debugf("Marshal-- Marshalling state of version %d", st.Version)
buf := new(bytes.Buffer)
enc := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Encoder(buf)
if err := enc.Encode(st); err != nil {
return nil, err
}
// First byte indicates the version (probably should make this a varint
// if we stick to this encoding)
vCodec := make([]byte, 1)
vCodec[0] = byte(st.Version)
ret := append(vCodec, buf.Bytes()...)
logger.Debugf("Marshal-- The final marshaled bytes: %x", ret)
return ret, nil
}
// Unmarshal decodes the state using msgpack. It first decodes just
// the version number. If this is not the current version the bytes
// are stored within the state's internal reader, which can be migrated
// to the current version in a later call to restore. Note: Out of date
// version is not an error
func (st *MapState) Unmarshal(bs []byte) error {
// Check version byte
logger.Debugf("The incoming bytes to unmarshal: %x", bs)
v := int(bs[0])
logger.Debugf("The interpreted version: %d", v)
if v != Version { // snapshot is out of date
st.Version = v
return nil
}
// snapshot is up to date
buf := bytes.NewBuffer(bs[1:])
dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf)
if err := dec.Decode(st); err != nil {
return err
}
return nil
}