ipfs-cluster/state/dsstate/datastore.go
Hector Sanjuan 6447ea51d2 Remove *Serial types. Use pointers for all types.
This takes advantage of the latest features in go-cid, peer.ID and
go-multiaddr and makes the Go types serializable by default.

This means we no longer need to copy between Pin <-> PinSerial, or ID <->
IDSerial etc. We can now efficiently binary-encode these types using short
field keys and without parsing/stringifying (in many cases it is just a cast).

We still get the same json output as before (with minor modifications for
Cids).

This should greatly improve Cluster performance and memory usage when dealing
with large collections of items.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2019-02-27 17:04:35 +00:00

272 lines
6.4 KiB
Go

// Package dsstate implements the IPFS Cluster state interface using
// an underlying go-datastore.
package dsstate
import (
"context"
"io"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
query "github.com/ipfs/go-datastore/query"
dshelp "github.com/ipfs/go-ipfs-ds-help"
logging "github.com/ipfs/go-log"
codec "github.com/ugorji/go/codec"
trace "go.opencensus.io/trace"
)
// Compile-time check that *State satisfies the state.State interface.
var _ state.State = (*State)(nil)
// logger is the package-level logger, created under the "dsstate" name.
var logger = logging.Logger("dsstate")
// State implements the IPFS Cluster "state" interface by wrapping
// a go-datastore and choosing how api.Pin objects are stored
// in it. It also provides serialization methods for the whole
// state which are datastore-independent.
type State struct {
// ds is the backing datastore where serialized pins are read and written.
ds ds.Datastore
// codecHandle configures the encoder/decoder used by Marshal and Unmarshal.
codecHandle codec.Handle
// namespace is the key prefix under which every pin key is placed.
namespace ds.Key
// version is the state version reported by GetVersion and set by SetVersion.
version int
}
// DefaultHandle returns the codec handle used when the caller does not
// provide one: a zero-value Msgpack handle.
func DefaultHandle() codec.Handle {
	return &codec.MsgpackHandle{}
}
// New builds a State on top of the given datastore.
//
// Every key written by the State is placed under the given namespace, so
// the same go-datastore can be shared between different uses.
//
// The handle configures the codec used by Marshal and Unmarshal. When it is
// nil, DefaultHandle() is used instead.
//
// The returned error is currently always nil.
func New(dstore ds.Datastore, namespace string, handle codec.Handle) (*State, error) {
	h := handle
	if h == nil {
		h = DefaultHandle()
	}

	return &State{
		ds:          dstore,
		codecHandle: h,
		namespace:   ds.NewKey(namespace),
		version:     0, // TODO: Remove when all migrated
	}, nil
}
// Add serializes the given Pin and writes it to the datastore under the
// key derived from its Cid. Serialization and datastore errors are returned
// to the caller.
func (st *State) Add(ctx context.Context, c *api.Pin) error {
	_, span := trace.StartSpan(ctx, "state/dsstate/Add")
	defer span.End()

	data, err := st.serializePin(c)
	if err == nil {
		err = st.ds.Put(st.key(c.Cid), data)
	}
	return err
}
// Rm deletes the Pin stored for the given Cid. Deleting a Cid that is not
// in the datastore is not an error: ds.ErrNotFound is mapped to nil. Any
// other datastore error is returned as-is.
func (st *State) Rm(ctx context.Context, c cid.Cid) error {
	_, span := trace.StartSpan(ctx, "state/dsstate/Rm")
	defer span.End()

	switch err := st.ds.Delete(st.key(c)); err {
	case ds.ErrNotFound:
		return nil
	default:
		return err
	}
}
// Get returns the Pin stored for the given Cid and whether it was found.
//
// When the Pin cannot be returned (it is missing, the datastore fails, or
// the stored bytes cannot be deserialized) a default pin built with
// api.PinCid(c) is returned together with false. Datastore failures other
// than ds.ErrNotFound, and deserialization failures, are logged so that
// they are not mistaken for a plain "not pinned" answer.
func (st *State) Get(ctx context.Context, c cid.Cid) (*api.Pin, bool) {
	_, span := trace.StartSpan(ctx, "state/dsstate/Get")
	defer span.End()

	v, err := st.ds.Get(st.key(c))
	if err != nil {
		// A missing key is the expected "not present" case; anything
		// else is a datastore problem worth surfacing.
		if err != ds.ErrNotFound {
			logger.Errorf("error getting pin (%s) from datastore: %s", c, err)
		}
		return api.PinCid(c), false
	}

	p, err := st.deserializePin(c, v)
	if err != nil {
		logger.Errorf("error deserializing pin (%s): %s", c, err)
		return api.PinCid(c), false
	}
	return p, true
}
// Has reports whether a Pin for the given Cid exists in the datastore.
// A datastore error is logged and reported as "not present".
func (st *State) Has(ctx context.Context, c cid.Cid) bool {
	_, span := trace.StartSpan(ctx, "state/dsstate/Has")
	defer span.End()

	found, err := st.ds.Has(st.key(c))
	if err != nil {
		logger.Error(err)
		return false
	}
	return found
}
// List returns the unsorted list of all Pins that have been added to the
// datastore under this State's namespace.
//
// Failures never reach the caller as errors; they are logged instead:
//   - if the query itself fails, an empty list is returned;
//   - if a query result carries an error, the pins collected so far are
//     returned;
//   - entries whose key cannot be turned back into a Cid, or whose value
//     cannot be deserialized, are skipped.
func (st *State) List(ctx context.Context) []*api.Pin {
	_, span := trace.StartSpan(ctx, "state/dsstate/List")
	defer span.End()

	q := query.Query{
		Prefix: st.namespace.String(),
	}

	results, err := st.ds.Query(q)
	if err != nil {
		// Previously swallowed silently, which made a failing datastore
		// indistinguishable from an empty state.
		logger.Errorf("error querying datastore: %s", err)
		return []*api.Pin{}
	}
	defer results.Close()

	var pins []*api.Pin
	for r := range results.Next() {
		if r.Error != nil {
			logger.Errorf("error in query result: %s", r.Error)
			return pins
		}

		k := ds.NewKey(r.Key)
		ci, err := st.unkey(k)
		if err != nil {
			// Errorf keeps the key and the error visibly separated.
			logger.Errorf("key: %s error: %s", k, err)
			logger.Error(string(r.Value))
			continue
		}

		p, err := st.deserializePin(ci, r.Value)
		if err != nil {
			logger.Errorf("error deserializing pin (%s): %s", r.Key, err)
			continue
		}
		pins = append(pins, p)
	}
	return pins
}
// Migrate migrates an older state version to the current one.
// This is a no-op for now: the reader is not consumed and nil is always
// returned. A trace span is still recorded for the call.
func (st *State) Migrate(ctx context.Context, r io.Reader) error {
	// The span is named after this package, like every other span in the file.
	_, span := trace.StartSpan(ctx, "state/dsstate/Migrate")
	defer span.End()
	return nil
}
// GetVersion reports the version number currently recorded in the State.
func (st *State) GetVersion() int {
	current := st.version
	return current
}
// SetVersion overwrites the recorded state version with v.
// The returned error is always nil.
func (st *State) SetVersion(v int) error {
	st.version = v
	return nil
}
// serialEntry is the unit written by Marshal and read back by Unmarshal:
// a single datastore key/value pair.
type serialEntry struct {
// Key is the datastore key without the namespace prefix.
Key string `codec:"k"`
// Value is the raw value stored in the datastore for that key.
Value []byte `codec:"v"`
}
// Marshal writes a snapshot of the state to w by encoding, one after
// another, every key/value found under the State's namespace. Only the last
// component of each key is stored (the namespace is dropped) to keep the
// snapshot small.
//
// It returns the first error hit while querying the datastore, iterating
// the results or encoding an entry.
func (st *State) Marshal(w io.Writer) error {
	results, err := st.ds.Query(query.Query{
		Prefix: st.namespace.String(),
	})
	if err != nil {
		return err
	}
	defer results.Close()

	enc := codec.NewEncoder(w, st.codecHandle)
	for res := range results.Next() {
		if res.Error != nil {
			logger.Errorf("error in query result: %s", res.Error)
			return res.Error
		}

		// reduce snapshot size by not storing the prefix
		entry := serialEntry{
			Key:   ds.NewKey(res.Key).BaseNamespace(),
			Value: res.Value,
		}
		if err := enc.Encode(entry); err != nil {
			logger.Error(err)
			return err
		}
	}
	return nil
}
// Unmarshal reads a dump previously produced by Marshal and writes every
// decoded key/value into the datastore, re-attaching the State's namespace
// to each key. Decoding stops cleanly at io.EOF.
//
// As of now, Unmarshal does not empty the existing store before loading,
// so entries already present are kept unless the dump writes to their key.
func (st *State) Unmarshal(r io.Reader) error {
	dec := codec.NewDecoder(r, st.codecHandle)
	for {
		var entry serialEntry
		switch err := dec.Decode(&entry); {
		case err == io.EOF:
			// End of the dump: everything was loaded.
			return nil
		case err != nil:
			return err
		}

		fullKey := st.namespace.Child(ds.NewKey(entry.Key))
		if err := st.ds.Put(fullKey, entry.Value); err != nil {
			logger.Error("error adding unmarshaled key to datastore:", err)
			return err
		}
	}
}
// key maps a Cid to its datastore key: /namespace/cidKey.
func (st *State) key(c cid.Cid) ds.Key {
	return st.namespace.Child(dshelp.CidToDsKey(c))
}
// unkey recovers the Cid from a /namespace/cidKey datastore key. Only the
// last component of the key is parsed; the namespace part is ignored.
func (st *State) unkey(k ds.Key) (cid.Cid, error) {
	cidKey := ds.NewKey(k.BaseNamespace())
	return dshelp.DsKeyToCid(cidKey)
}
// serializePin defines the on-disk representation of a Pin: the output of
// its ProtoMarshal method. Changing this may require a migration!
func (st *State) serializePin(c *api.Pin) ([]byte, error) {
	buf, err := c.ProtoMarshal()
	return buf, err
}
// deserializePin rebuilds a Pin from bytes read from the datastore. It is
// the counterpart of serializePin.
//
// The Cid of the returned Pin is always set to c, overriding whatever
// ProtoUnmarshal may have put there. When unmarshaling fails, a nil Pin is
// returned with the error, so a half-populated Pin can never be used by
// mistake.
func (st *State) deserializePin(c cid.Cid, buf []byte) (*api.Pin, error) {
	p := &api.Pin{}
	if err := p.ProtoUnmarshal(buf); err != nil {
		return nil, err
	}
	p.Cid = c
	return p, nil
}