acbd7fda60
This adds a new "crdt" consensus component using go-ds-crdt. This implies several refactors to fully make cluster consensus-component independent: * Delete mapstate and fully adopt dsstate (after people have migrated). * Return errors from state methods rather than ignoring them. * Add a new "datastore" modules so that we can configure datastores in the main configuration like other components. * Let the consensus components fully define the "state.State". Thus, they do not receive the state, they receive the storage where we put the state (a go-datastore). * Allow to customize how the monitor component obtains Peers() (the current peerset), including avoiding using the current peerset. At the moment the crdt consensus uses the monitoring component to define the current peerset. Therefore the monitor component cannot rely on the consensus component to produce a peerset. * Re-factor/re-implementation of "ipfs-cluster-service state" operations. Includes the disappearance of the "migrate" one. The CRDT consensus component creates a crdt-datastore (with ipfs-lite) and uses it to initialize a dsstate. Thus the crdt-store is elegantly wrapped. Any modifications to the state get automatically replicated to other peers. We store all the CRDT DAG blocks in the local datastore. The consensus components only expose a ReadOnly state, as any modifications to the shared state should happen through them. DHT and PubSub facilities must now be created outside of Cluster and passed in so they can be re-used by different components.
309 lines
7.5 KiB
Go
309 lines
7.5 KiB
Go
// Package dsstate implements the IPFS Cluster state interface using
|
|
// an underlying go-datastore.
|
|
package dsstate
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
|
|
"github.com/ipfs/ipfs-cluster/api"
|
|
"github.com/ipfs/ipfs-cluster/state"
|
|
|
|
cid "github.com/ipfs/go-cid"
|
|
ds "github.com/ipfs/go-datastore"
|
|
query "github.com/ipfs/go-datastore/query"
|
|
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
|
logging "github.com/ipfs/go-log"
|
|
codec "github.com/ugorji/go/codec"
|
|
|
|
trace "go.opencensus.io/trace"
|
|
)
|
|
|
|
// Compile-time assertions that the concrete types below satisfy the
// cluster state interfaces.
var _ state.State = (*State)(nil)
var _ state.BatchingState = (*BatchingState)(nil)

// logger is the package-wide logger for dsstate.
var logger = logging.Logger("dsstate")
|
|
|
|
// State implements the IPFS Cluster "state" interface by wrapping
// a go-datastore and choosing how api.Pin objects are stored
// in it. It also provides serialization methods for the whole
// state which are datastore-independent.
type State struct {
	dsRead      ds.Read      // read side of the backing datastore
	dsWrite     ds.Write     // write side (a ds.Batch when used via BatchingState)
	codecHandle codec.Handle // codec used by Marshal/Unmarshal snapshots
	namespace   ds.Key       // prefix prepended to every stored key
	version     int          // state format version (not set anywhere visible here — TODO confirm use)
}
|
|
|
|
// DefaultHandle returns the codec handler of choice (Msgpack).
|
|
func DefaultHandle() codec.Handle {
|
|
h := &codec.MsgpackHandle{}
|
|
return h
|
|
}
|
|
|
|
// New returns a new state using the given datastore.
|
|
//
|
|
// All keys are namespaced with the given string when written. Thus the same
|
|
// go-datastore can be sharded for different uses.
|
|
//
|
|
// The Handle controls options for the serialization of the full state
|
|
// (marshaling/unmarshaling).
|
|
func New(dstore ds.Datastore, namespace string, handle codec.Handle) (*State, error) {
|
|
if handle == nil {
|
|
handle = DefaultHandle()
|
|
}
|
|
|
|
st := &State{
|
|
dsRead: dstore,
|
|
dsWrite: dstore,
|
|
codecHandle: handle,
|
|
namespace: ds.NewKey(namespace),
|
|
}
|
|
|
|
return st, nil
|
|
}
|
|
|
|
// Add adds a new Pin or replaces an existing one.
|
|
func (st *State) Add(ctx context.Context, c *api.Pin) error {
|
|
_, span := trace.StartSpan(ctx, "state/dsstate/Add")
|
|
defer span.End()
|
|
|
|
ps, err := st.serializePin(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return st.dsWrite.Put(st.key(c.Cid), ps)
|
|
}
|
|
|
|
// Rm removes an existing Pin. It is a no-op when the
|
|
// item does not exist.
|
|
func (st *State) Rm(ctx context.Context, c cid.Cid) error {
|
|
_, span := trace.StartSpan(ctx, "state/dsstate/Rm")
|
|
defer span.End()
|
|
|
|
err := st.dsWrite.Delete(st.key(c))
|
|
if err == ds.ErrNotFound {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Get returns a Pin from the store and whether it
|
|
// was present. When not present, a default pin
|
|
// is returned.
|
|
func (st *State) Get(ctx context.Context, c cid.Cid) (*api.Pin, error) {
|
|
_, span := trace.StartSpan(ctx, "state/dsstate/Get")
|
|
defer span.End()
|
|
|
|
v, err := st.dsRead.Get(st.key(c))
|
|
if err != nil {
|
|
if err == ds.ErrNotFound {
|
|
return nil, state.ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
p, err := st.deserializePin(c, v)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return p, nil
|
|
}
|
|
|
|
// Has returns whether a Cid is stored.
|
|
func (st *State) Has(ctx context.Context, c cid.Cid) (bool, error) {
|
|
_, span := trace.StartSpan(ctx, "state/dsstate/Has")
|
|
defer span.End()
|
|
|
|
ok, err := st.dsRead.Has(st.key(c))
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return ok, nil
|
|
}
|
|
|
|
// List returns the unsorted list of all Pins that have been added to the
|
|
// datastore.
|
|
func (st *State) List(ctx context.Context) ([]*api.Pin, error) {
|
|
_, span := trace.StartSpan(ctx, "state/dsstate/List")
|
|
defer span.End()
|
|
|
|
q := query.Query{
|
|
Prefix: st.namespace.String(),
|
|
}
|
|
|
|
results, err := st.dsRead.Query(q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer results.Close()
|
|
|
|
var pins []*api.Pin
|
|
|
|
for r := range results.Next() {
|
|
if r.Error != nil {
|
|
logger.Errorf("error in query result: %s", r.Error)
|
|
return pins, r.Error
|
|
}
|
|
k := ds.NewKey(r.Key)
|
|
ci, err := st.unkey(k)
|
|
if err != nil {
|
|
logger.Warning("bad key (ignoring). key: ", k, "error: ", err)
|
|
continue
|
|
}
|
|
|
|
p, err := st.deserializePin(ci, r.Value)
|
|
if err != nil {
|
|
logger.Errorf("error deserializing pin (%s): %s", r.Key, err)
|
|
continue
|
|
}
|
|
|
|
pins = append(pins, p)
|
|
}
|
|
return pins, nil
|
|
}
|
|
|
|
// Migrate migrates an older state version to the current one.
|
|
// This is a no-op for now.
|
|
func (st *State) Migrate(ctx context.Context, r io.Reader) error {
|
|
ctx, span := trace.StartSpan(ctx, "state/map/Migrate")
|
|
defer span.End()
|
|
return nil
|
|
}
|
|
|
|
// serialEntry is the on-wire format for one key/value pair in a
// full-state snapshot (see Marshal/Unmarshal). Short codec tags
// keep the snapshot compact.
type serialEntry struct {
	Key string `codec:"k"`
	Value []byte `codec:"v"`
}
|
|
|
|
// Marshal dumps the state to a writer. It does this by encoding every
|
|
// key/value in the store. The keys are stored without the namespace part to
|
|
// reduce the size of the snapshot.
|
|
func (st *State) Marshal(w io.Writer) error {
|
|
q := query.Query{
|
|
Prefix: st.namespace.String(),
|
|
}
|
|
|
|
results, err := st.dsRead.Query(q)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer results.Close()
|
|
|
|
enc := codec.NewEncoder(w, st.codecHandle)
|
|
|
|
for r := range results.Next() {
|
|
if r.Error != nil {
|
|
logger.Errorf("error in query result: %s", r.Error)
|
|
return r.Error
|
|
}
|
|
|
|
k := ds.NewKey(r.Key)
|
|
// reduce snapshot size by not storing the prefix
|
|
err := enc.Encode(serialEntry{
|
|
Key: k.BaseNamespace(),
|
|
Value: r.Value,
|
|
})
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Unmarshal reads and parses a previous dump of the state.
|
|
// All the parsed key/values are added to the store. As of now,
|
|
// Unmarshal does not empty the existing store from any values
|
|
// before unmarshaling from the given reader.
|
|
func (st *State) Unmarshal(r io.Reader) error {
|
|
dec := codec.NewDecoder(r, st.codecHandle)
|
|
for {
|
|
var entry serialEntry
|
|
if err := dec.Decode(&entry); err == io.EOF {
|
|
break
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
k := st.namespace.Child(ds.NewKey(entry.Key))
|
|
err := st.dsWrite.Put(k, entry.Value)
|
|
if err != nil {
|
|
logger.Error("error adding unmarshaled key to datastore:", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// convert Cid to /namespace/cidKey
|
|
func (st *State) key(c cid.Cid) ds.Key {
|
|
k := dshelp.CidToDsKey(c)
|
|
return st.namespace.Child(k)
|
|
}
|
|
|
|
// convert /namespace/cidKey to Cid
|
|
func (st *State) unkey(k ds.Key) (cid.Cid, error) {
|
|
return dshelp.DsKeyToCid(ds.NewKey(k.BaseNamespace()))
|
|
}
|
|
|
|
// serializePin decides how a Pin object is serialized to be stored in the
// datastore (currently the Pin's own protobuf encoding).
// Changing this may require a migration!
func (st *State) serializePin(c *api.Pin) ([]byte, error) {
	return c.ProtoMarshal()
}
|
|
|
|
// this deserializes a Pin object from the datastore. It should be
|
|
// the exact opposite from serializePin.
|
|
func (st *State) deserializePin(c cid.Cid, buf []byte) (*api.Pin, error) {
|
|
p := &api.Pin{}
|
|
err := p.ProtoUnmarshal(buf)
|
|
p.Cid = c
|
|
return p, err
|
|
}
|
|
|
|
// BatchingState implements the IPFS Cluster "state" interface by wrapping a
// batching go-datastore. All writes are batched and only written to disk
// when Commit() is called.
type BatchingState struct {
	// embedded State whose dsWrite is the batch below, so reads go to
	// the datastore while writes accumulate in the batch
	*State
	batch ds.Batch // pending writes, flushed by Commit
}
|
|
|
|
// NewBatching returns a new batching statate using the given datastore.
|
|
//
|
|
// All keys are namespaced with the given string when written. Thus the same
|
|
// go-datastore can be sharded for different uses.
|
|
//
|
|
// The Handle controls options for the serialization of the full state
|
|
// (marshaling/unmarshaling).
|
|
func NewBatching(dstore ds.Batching, namespace string, handle codec.Handle) (*BatchingState, error) {
|
|
if handle == nil {
|
|
handle = DefaultHandle()
|
|
}
|
|
|
|
batch, err := dstore.Batch()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
st := &State{
|
|
dsRead: dstore,
|
|
dsWrite: batch,
|
|
codecHandle: handle,
|
|
namespace: ds.NewKey(namespace),
|
|
}
|
|
|
|
bst := &BatchingState{}
|
|
bst.State = st
|
|
bst.batch = batch
|
|
return bst, nil
|
|
}
|
|
|
|
// Commit persists the batched write operations.
|
|
func (bst *BatchingState) Commit(ctx context.Context) error {
|
|
ctx, span := trace.StartSpan(ctx, "state/dsstate/Commit")
|
|
defer span.End()
|
|
return bst.batch.Commit()
|
|
}
|