ipfs-cluster/state/mapstate/map_state.go

163 lines
4.3 KiB
Go
Raw Normal View History

// Package mapstate implements the State interface for IPFS Cluster by using
// a map to keep track of the consensus-shared state.
package mapstate
2016-12-02 18:33:39 +00:00
import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"sync"
2016-12-02 18:33:39 +00:00
msgpack "github.com/multiformats/go-multicodec/msgpack"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/ipfs/ipfs-cluster/api"
2016-12-02 18:33:39 +00:00
)
// Version is the map state Version. States with old versions should
// perform an upgrade before.
const Version = 2
var logger = logging.Logger("mapstate")
// MapState is a very simple database to store the state of the system
// using a Go map. It is thread safe. It implements the State interface.
2016-12-02 18:33:39 +00:00
type MapState struct {
pinMux sync.RWMutex
PinMap map[string]api.PinSerial
Version int
2016-12-02 18:33:39 +00:00
}
// NewMapState initializes the internal map and returns a new MapState object.
func NewMapState() *MapState {
return &MapState{
PinMap: make(map[string]api.PinSerial),
Version: Version,
}
}
// Add adds a Pin to the internal map.
func (st *MapState) Add(c api.Pin) error {
st.pinMux.Lock()
defer st.pinMux.Unlock()
st.PinMap[c.Cid.String()] = c.ToSerial()
2016-12-02 18:33:39 +00:00
return nil
}
// Rm removes a Cid from the internal map.
func (st *MapState) Rm(c *cid.Cid) error {
st.pinMux.Lock()
defer st.pinMux.Unlock()
2016-12-02 18:33:39 +00:00
delete(st.PinMap, c.String())
return nil
}
// Get returns Pin information for a CID.
func (st *MapState) Get(c *cid.Cid) api.Pin {
st.pinMux.RLock()
defer st.pinMux.RUnlock()
pins, ok := st.PinMap[c.String()]
if !ok { // make sure no panics
return api.Pin{}
}
return pins.ToPin()
}
// Has returns true if the Cid belongs to the State.
func (st *MapState) Has(c *cid.Cid) bool {
st.pinMux.RLock()
defer st.pinMux.RUnlock()
_, ok := st.PinMap[c.String()]
return ok
}
// List provides the list of tracked Pins.
func (st *MapState) List() []api.Pin {
st.pinMux.RLock()
defer st.pinMux.RUnlock()
cids := make([]api.Pin, 0, len(st.PinMap))
for _, v := range st.PinMap {
if v.Cid == "" {
continue
}
cids = append(cids, v.ToPin())
}
return cids
}
// Snapshot dumps the MapState to the given writer, in pretty json
// format.
func (st *MapState) Snapshot(w io.Writer) error {
st.pinMux.RLock()
defer st.pinMux.RUnlock()
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
return enc.Encode(st)
}
// Restore restores a snapshot from the state's internal bytes. It should
// migrate the format if it is not compatible with the current version.
func (st *MapState) Restore(r io.Reader) error {
bs, err := ioutil.ReadAll(r)
if err != nil {
return err
}
err = st.Unmarshal(bs)
if st.Version == Version { // Unmarshal restored for us
return nil
}
bytesNoVersion := bs[1:] // Restore is aware of encoding format
err = st.migrateFrom(st.Version, bytesNoVersion)
if err != nil {
return err
}
st.Version = Version
return nil
}
// GetVersion returns the current version of this state object.
// It is not necessarily up to date
func (st *MapState) GetVersion() int {
return st.Version
}
// Marshal encodes the state using msgpack
func (st *MapState) Marshal() ([]byte, error) {
logger.Debugf("Marshal-- Marshalling state of version %d", st.Version)
buf := new(bytes.Buffer)
enc := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Encoder(buf)
if err := enc.Encode(st); err != nil {
return nil, err
}
// First byte indicates the version (probably should make this a varint
// if we stick to this encoding)
vCodec := make([]byte, 1)
vCodec[0] = byte(st.Version)
ret := append(vCodec, buf.Bytes()...)
logger.Debugf("Marshal-- The final marshaled bytes: %x", ret)
return ret, nil
}
// Unmarshal decodes the state using msgpack. It first decodes just
// the version number. If this is not the current version the bytes
// are stored within the state's internal reader, which can be migrated
// to the current version in a later call to restore. Note: Out of date
// version is not an error
func (st *MapState) Unmarshal(bs []byte) error {
// Check version byte
logger.Debugf("The incoming bytes to unmarshal: %x", bs)
v := int(bs[0])
logger.Debugf("The interpreted version: %d", v)
if v != Version { // snapshot is out of date
st.Version = v
return nil
}
// snapshot is up to date
buf := bytes.NewBuffer(bs[1:])
dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf)
return dec.Decode(st)
}