ipfs-cluster/state/dsstate/datastore.go
Hector Sanjuan acbd7fda60 Consensus: add new "crdt" consensus component
This adds a new "crdt" consensus component using go-ds-crdt.

This implies several refactors to fully make cluster consensus-component
independent:

* Delete mapstate and fully adopt dsstate (after people have migrated).
* Return errors from state methods rather than ignoring them.
* Add a new "datastore" module so that we can configure datastores in the
  main configuration like other components.
* Let the consensus components fully define the "state.State". Thus, they do
not receive the state, they receive the storage where we put the state (a
go-datastore).
* Allow customizing how the monitor component obtains Peers() (the current
  peerset), including avoiding using the current peerset. At the moment the
  crdt consensus uses the monitoring component to define the current peerset.
  Therefore the monitor component cannot rely on the consensus component to
  produce a peerset.
* Refactor/re-implementation of the "ipfs-cluster-service state"
  operations. This includes the removal of the "migrate" operation.

The CRDT consensus component creates a crdt-datastore (with ipfs-lite)
and uses it to initialize a dsstate. Thus the crdt-store is elegantly
wrapped. Any modifications to the state get automatically replicated to other
peers. We store all the CRDT DAG blocks in the local datastore.

The consensus components only expose a ReadOnly state, as any modifications to
the shared state should happen through them.

DHT and PubSub facilities must now be created outside of Cluster and passed in
so they can be re-used by different components.
2019-04-17 19:14:26 +02:00

309 lines
7.5 KiB
Go

// Package dsstate implements the IPFS Cluster state interface using
// an underlying go-datastore.
package dsstate
import (
"context"
"io"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
query "github.com/ipfs/go-datastore/query"
dshelp "github.com/ipfs/go-ipfs-ds-help"
logging "github.com/ipfs/go-log"
codec "github.com/ugorji/go/codec"
trace "go.opencensus.io/trace"
)
// Compile-time checks that the concrete types in this package satisfy the
// state interfaces they implement.
var (
	_ state.State         = (*State)(nil)
	_ state.BatchingState = (*BatchingState)(nil)
)

// logger is this package's logger, registered under the "dsstate" name.
var logger = logging.Logger("dsstate")
// State implements the IPFS Cluster "state" interface on top of a
// go-datastore. It decides how api.Pin objects are laid out in the
// datastore, and it offers serialization of the whole state (Marshal /
// Unmarshal) that does not depend on the datastore implementation.
type State struct {
	// dsRead serves every lookup and query.
	dsRead ds.Read
	// dsWrite receives every Put/Delete. NewBatching sets it to a
	// ds.Batch instead of the datastore itself.
	dsWrite ds.Write

	// codecHandle configures how the full state is (un)marshaled.
	codecHandle codec.Handle
	// namespace is the key prefix under which all entries are stored.
	namespace ds.Key
	// NOTE(review): version is not referenced in this file; presumably
	// reserved for state-format migrations — confirm before removing.
	version int
}
// DefaultHandle returns the codec handle used when callers pass none: a
// freshly allocated Msgpack handle.
func DefaultHandle() codec.Handle {
	return &codec.MsgpackHandle{}
}
// New builds a State that both reads from and writes to dstore.
//
// Every key written is prefixed with namespace, which lets one go-datastore
// be shared between several users.
//
// handle configures serialization of the full state (Marshal/Unmarshal);
// when it is nil, DefaultHandle() is used. The returned error is currently
// always nil.
func New(dstore ds.Datastore, namespace string, handle codec.Handle) (*State, error) {
	h := handle
	if h == nil {
		h = DefaultHandle()
	}
	return &State{
		dsRead:      dstore,
		dsWrite:     dstore,
		codecHandle: h,
		namespace:   ds.NewKey(namespace),
	}, nil
}
// Add stores c under the key derived from c.Cid, overwriting any Pin
// previously stored for that Cid.
func (st *State) Add(ctx context.Context, c *api.Pin) error {
	_, span := trace.StartSpan(ctx, "state/dsstate/Add")
	defer span.End()

	serialized, err := st.serializePin(c)
	if err == nil {
		err = st.dsWrite.Put(st.key(c.Cid), serialized)
	}
	return err
}
// Rm deletes the Pin stored for c. Removing a Cid that is not in the
// store succeeds without error.
func (st *State) Rm(ctx context.Context, c cid.Cid) error {
	_, span := trace.StartSpan(ctx, "state/dsstate/Rm")
	defer span.End()

	switch err := st.dsWrite.Delete(st.key(c)); err {
	case ds.ErrNotFound:
		// Nothing to remove: treat as success.
		return nil
	default:
		return err
	}
}
// Get returns the Pin stored for c.
//
// When c is not in the store, Get returns a nil Pin and state.ErrNotFound.
// Any other datastore error, or an error decoding the stored value, is
// returned unchanged together with a nil Pin.
func (st *State) Get(ctx context.Context, c cid.Cid) (*api.Pin, error) {
	_, span := trace.StartSpan(ctx, "state/dsstate/Get")
	defer span.End()

	v, err := st.dsRead.Get(st.key(c))
	if err == ds.ErrNotFound {
		// Translate the datastore's sentinel into the state package's one.
		return nil, state.ErrNotFound
	}
	if err != nil {
		return nil, err
	}

	p, err := st.deserializePin(c, v)
	if err != nil {
		return nil, err
	}
	return p, nil
}
// Has reports whether a Pin for c is stored. On a datastore error it
// returns false together with that error.
func (st *State) Has(ctx context.Context, c cid.Cid) (bool, error) {
	_, span := trace.StartSpan(ctx, "state/dsstate/Has")
	defer span.End()

	found, err := st.dsRead.Has(st.key(c))
	if err == nil {
		return found, nil
	}
	return false, err
}
// List returns every Pin stored under this State's namespace, in no
// particular order.
//
// Entries whose key cannot be decoded back into a Cid, or whose value
// cannot be deserialized, are logged and skipped. If the underlying query
// reports an error, List stops and returns that error with a nil slice
// (never a partial result).
func (st *State) List(ctx context.Context) ([]*api.Pin, error) {
	_, span := trace.StartSpan(ctx, "state/dsstate/List")
	defer span.End()

	q := query.Query{
		Prefix: st.namespace.String(),
	}
	results, err := st.dsRead.Query(q)
	if err != nil {
		return nil, err
	}
	defer results.Close()

	var pins []*api.Pin
	for r := range results.Next() {
		if r.Error != nil {
			logger.Errorf("error in query result: %s", r.Error)
			return nil, r.Error
		}
		k := ds.NewKey(r.Key)
		ci, err := st.unkey(k)
		if err != nil {
			// Use a format string (like the Errorf calls here) so the
			// message layout does not depend on how the logger joins
			// variadic arguments.
			logger.Warningf("bad key (ignoring). key: %s, error: %s", k, err)
			continue
		}
		p, err := st.deserializePin(ci, r.Value)
		if err != nil {
			logger.Errorf("error deserializing pin (%s): %s", r.Key, err)
			continue
		}
		pins = append(pins, p)
	}
	return pins, nil
}
// Migrate migrates an older state version to the current one.
//
// It is currently a no-op: r is not read and nil is always returned.
func (st *State) Migrate(ctx context.Context, r io.Reader) error {
	// Span name follows the "state/dsstate/<Method>" convention used by
	// the other methods of this type.
	_, span := trace.StartSpan(ctx, "state/dsstate/Migrate")
	defer span.End()
	return nil
}
// serialEntry is one record of a Marshal snapshot: a datastore key with
// its namespace prefix removed, plus the raw value stored under it.
// Unmarshal decodes the same type and re-adds the namespace.
type serialEntry struct {
	Key   string `codec:"k"` // key with the namespace stripped
	Value []byte `codec:"v"` // raw bytes stored under that key
}
// Marshal writes a snapshot of the state to w. Every key/value pair under
// the namespace is encoded, with the State's codec handle, as a
// serialEntry. Only the last key component is kept, which drops the
// namespace prefix and makes the snapshot smaller.
func (st *State) Marshal(w io.Writer) error {
	results, err := st.dsRead.Query(query.Query{Prefix: st.namespace.String()})
	if err != nil {
		return err
	}
	defer results.Close()

	encoder := codec.NewEncoder(w, st.codecHandle)
	for res := range results.Next() {
		if res.Error != nil {
			logger.Errorf("error in query result: %s", res.Error)
			return res.Error
		}
		// Unmarshal re-attaches the namespace when reading this back.
		entry := serialEntry{
			Key:   ds.NewKey(res.Key).BaseNamespace(),
			Value: res.Value,
		}
		if encErr := encoder.Encode(entry); encErr != nil {
			logger.Error(encErr)
			return encErr
		}
	}
	return nil
}
// Unmarshal reads a snapshot (as written by Marshal) from r until EOF and
// Puts every entry into the datastore under this State's namespace.
// Existing entries are not removed beforehand.
func (st *State) Unmarshal(r io.Reader) error {
	decoder := codec.NewDecoder(r, st.codecHandle)
	for {
		var entry serialEntry
		err := decoder.Decode(&entry)
		switch {
		case err == io.EOF:
			return nil
		case err != nil:
			return err
		}

		fullKey := st.namespace.Child(ds.NewKey(entry.Key))
		if putErr := st.dsWrite.Put(fullKey, entry.Value); putErr != nil {
			logger.Error("error adding unmarshaled key to datastore:", putErr)
			return putErr
		}
	}
}
// key maps a Cid to its datastore key, /<namespace>/<cid-key>.
func (st *State) key(c cid.Cid) ds.Key {
	return st.namespace.Child(dshelp.CidToDsKey(c))
}
// unkey reverses key: it takes the last component of a
// /<namespace>/<cid-key> key and decodes it back into a Cid.
func (st *State) unkey(k ds.Key) (cid.Cid, error) {
	base := k.BaseNamespace()
	return dshelp.DsKeyToCid(ds.NewKey(base))
}
// serializePin produces the bytes stored in the datastore for a Pin, using
// api.Pin.ProtoMarshal. Changing this encoding may require a migration of
// existing datastores.
func (st *State) serializePin(c *api.Pin) ([]byte, error) {
	buf, err := c.ProtoMarshal()
	return buf, err
}
// deserializePin decodes buf, as produced by serializePin, into a Pin and
// sets its Cid to c. On a decode error it returns a nil Pin together with
// the error, rather than a partially populated one.
func (st *State) deserializePin(c cid.Cid, buf []byte) (*api.Pin, error) {
	p := &api.Pin{}
	if err := p.ProtoUnmarshal(buf); err != nil {
		return nil, err
	}
	// The Cid comes from the datastore key, not from the decoded value.
	p.Cid = c
	return p, nil
}
// BatchingState implements the IPFS Cluster "state" interface by wrapping a
// batching go-datastore. All writes go into a single ds.Batch and are only
// written to disk when Commit() is called. Reads are served by the
// underlying datastore, not by the batch (NOTE: so they presumably do not
// observe uncommitted writes — confirm against the datastore in use).
type BatchingState struct {
	*State
	// batch is the same ds.Batch the embedded State uses as its writer.
	batch ds.Batch
}
// NewBatching returns a new batching state using the given datastore.
//
// All keys are namespaced with the given string when written. Thus the same
// go-datastore can be shared for different uses.
//
// The Handle controls options for the serialization of the full state
// (marshaling/unmarshaling); DefaultHandle() is used when it is nil.
//
// A single batch is obtained from dstore here. Writes accumulate in it until
// Commit is called, while reads go directly to dstore. The error returned by
// dstore.Batch() is passed through unchanged.
func NewBatching(dstore ds.Batching, namespace string, handle codec.Handle) (*BatchingState, error) {
	if handle == nil {
		handle = DefaultHandle()
	}
	batch, err := dstore.Batch()
	if err != nil {
		return nil, err
	}
	st := &State{
		dsRead:      dstore,
		dsWrite:     batch,
		codecHandle: handle,
		namespace:   ds.NewKey(namespace),
	}
	return &BatchingState{State: st, batch: batch}, nil
}
// Commit persists all batched write operations to the underlying datastore
// and returns the batch's Commit error, if any.
func (bst *BatchingState) Commit(ctx context.Context) error {
	// The derived context is not needed here, so it is discarded (as in
	// the other traced methods) rather than assigned and never read.
	_, span := trace.StartSpan(ctx, "state/dsstate/Commit")
	defer span.End()
	return bst.batch.Commit()
}