ipfs-cluster/state/dsstate/datastore.go
Hector Sanjuan 9b9d76f92d Pinset streaming and method type revamp
This commit introduces the new go-libp2p-gorpc streaming capabilities for
Cluster. The main aim is to work towards heavily reducing memory usage when
working with very large pinsets.

As a side-effect, it takes the chance to revamp all types for all public
methods so that pointers to what should be static objects are no longer
used. This should heavily reduce heap allocations and GC activity.

The main change is that state.List now returns a channel from which to read
the pins, rather than pins being all loaded into a huge slice.

Things reading pins have been all updated to iterate on the channel rather
than on the slice. The full pinset is no longer fully loaded onto memory for
things that run regularly like StateSync().

Additionally, the /allocations endpoint of the rest API no longer returns an
array of pins, but rather streams json-encoded pin objects directly. This
change has extended to the restapi client (which puts pins into a channel as
they arrive) and to ipfs-cluster-ctl.

There are still pending improvements like StatusAll() calls which should also
stream responses, and especially BlockPut calls which should stream blocks
directly into IPFS on a single call.

These are coming up in future commits.
2022-03-19 03:02:55 +01:00

340 lines
8.2 KiB
Go

// Package dsstate implements the IPFS Cluster state interface using
// an underlying go-datastore.
package dsstate
import (
"context"
"io"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
query "github.com/ipfs/go-datastore/query"
dshelp "github.com/ipfs/go-ipfs-ds-help"
logging "github.com/ipfs/go-log/v2"
codec "github.com/ugorji/go/codec"
trace "go.opencensus.io/trace"
)
// Compile-time assertions that State and BatchingState implement the
// cluster state interfaces.
var _ state.State = (*State)(nil)
var _ state.BatchingState = (*BatchingState)(nil)

// logger for this package.
var logger = logging.Logger("dsstate")
// State implements the IPFS Cluster "state" interface by wrapping
// a go-datastore and choosing how api.Pin objects are stored
// in it. It also provides serialization methods for the whole
// state which are datastore-independent.
type State struct {
	dsRead      ds.Read     // read-side of the underlying datastore
	dsWrite     ds.Write    // write-side (may be a Batch in BatchingState)
	codecHandle codec.Handle // codec used for Marshal/Unmarshal snapshots
	namespace   ds.Key      // prefix under which all pin keys are stored
	// version int
}
// DefaultHandle returns the codec handler of choice (Msgpack).
func DefaultHandle() codec.Handle {
	return &codec.MsgpackHandle{}
}
// New returns a new state using the given datastore.
//
// All keys are namespaced with the given string when written, so the same
// go-datastore can be sharded for different uses.
//
// The Handle controls options for the serialization of the full state
// (marshaling/unmarshaling). When nil, the Msgpack default is used.
func New(dstore ds.Datastore, namespace string, handle codec.Handle) (*State, error) {
	if handle == nil {
		handle = DefaultHandle()
	}
	return &State{
		dsRead:      dstore,
		dsWrite:     dstore,
		codecHandle: handle,
		namespace:   ds.NewKey(namespace),
	}, nil
}
// Add adds a new Pin or replaces an existing one.
func (st *State) Add(ctx context.Context, c api.Pin) error {
	_, span := trace.StartSpan(ctx, "state/dsstate/Add")
	defer span.End()

	serialized, err := st.serializePin(c)
	if err != nil {
		return err
	}
	return st.dsWrite.Put(ctx, st.key(c.Cid), serialized)
}
// Rm removes an existing Pin. It is a no-op when the
// item does not exist.
func (st *State) Rm(ctx context.Context, c cid.Cid) error {
	_, span := trace.StartSpan(ctx, "state/dsstate/Rm")
	defer span.End()

	// ErrNotFound is swallowed: removing a missing pin is not an error.
	if err := st.dsWrite.Delete(ctx, st.key(c)); err != nil && err != ds.ErrNotFound {
		return err
	}
	return nil
}
// Get returns a Pin from the store and whether it
// was present. When not present, a default pin
// is returned.
func (st *State) Get(ctx context.Context, c cid.Cid) (api.Pin, error) {
	_, span := trace.StartSpan(ctx, "state/dsstate/Get")
	defer span.End()

	raw, err := st.dsRead.Get(ctx, st.key(c))
	switch {
	case err == ds.ErrNotFound:
		// Map the datastore's sentinel onto the state package's one.
		return api.Pin{}, state.ErrNotFound
	case err != nil:
		return api.Pin{}, err
	}

	pin, err := st.deserializePin(c, raw)
	if err != nil {
		return api.Pin{}, err
	}
	return pin, nil
}
// Has returns whether a Cid is stored.
func (st *State) Has(ctx context.Context, c cid.Cid) (bool, error) {
	_, span := trace.StartSpan(ctx, "state/dsstate/Has")
	defer span.End()

	// Normalize: never report presence alongside an error.
	has, err := st.dsRead.Has(ctx, st.key(c))
	if err != nil {
		return false, err
	}
	return has, nil
}
// List returns the unsorted list of all Pins that have been added to the
// datastore. The returned channel is closed when the listing finishes or
// when ctx is cancelled; the caller should drain it or cancel ctx to free
// the backing query.
func (st *State) List(ctx context.Context) (<-chan api.Pin, error) {
	_, span := trace.StartSpan(ctx, "state/dsstate/List")
	defer span.End()

	q := query.Query{
		Prefix: st.namespace.String(),
	}

	results, err := st.dsRead.Query(ctx, q)
	if err != nil {
		return nil, err
	}

	pinsCh := make(chan api.Pin, 1024)

	go func() {
		defer close(pinsCh)
		defer results.Close()

		total := 0
		for r := range results.Next() {
			if r.Error != nil {
				logger.Errorf("error in query result: %s", r.Error)
				return
			}
			k := ds.NewKey(r.Key)
			ci, err := st.unkey(k)
			if err != nil {
				logger.Warn("bad key (ignoring). key: ", k, "error: ", err)
				continue
			}
			p, err := st.deserializePin(ci, r.Value)
			if err != nil {
				logger.Errorf("error deserializing pin (%s): %s", r.Key, err)
				continue
			}

			// Select on ctx while sending: a bare send could block
			// forever (leaking this goroutine and the query) if the
			// consumer stops reading after cancellation.
			select {
			case pinsCh <- p:
			case <-ctx.Done():
				logger.Warningf("Full pinset listing aborted: %s", ctx.Err())
				return
			}

			total++
			// Log progress every 500k pins (after counting the pin,
			// so the message fires exactly on each 500k boundary).
			if total%500000 == 0 {
				logger.Infof("Full pinset listing in progress: %d pins so far", total)
			}
		}
		if total >= 500000 {
			logger.Infof("Full pinset listing finished: %d pins", total)
		}
	}()

	return pinsCh, nil
}
// Migrate migrates an older state version to the current one.
// This is a no-op for now: the reader is ignored and no error is
// ever returned.
func (st *State) Migrate(ctx context.Context, r io.Reader) error {
	return nil
}
// serialEntry is the on-wire form of one datastore key/value pair in a
// Marshal/Unmarshal snapshot. Short codec tags keep snapshots small.
type serialEntry struct {
	Key   string `codec:"k"` // key with the namespace prefix stripped
	Value []byte `codec:"v"` // raw serialized pin bytes
}
// Marshal dumps the state to a writer. It does this by encoding every
// key/value in the store. The keys are stored without the namespace part to
// reduce the size of the snapshot.
func (st *State) Marshal(w io.Writer) error {
	q := query.Query{Prefix: st.namespace.String()}
	results, err := st.dsRead.Query(context.Background(), q)
	if err != nil {
		return err
	}
	defer results.Close()

	enc := codec.NewEncoder(w, st.codecHandle)
	for r := range results.Next() {
		if r.Error != nil {
			logger.Errorf("error in query result: %s", r.Error)
			return r.Error
		}

		// Strip the namespace prefix to shrink the snapshot.
		entry := serialEntry{
			Key:   ds.NewKey(r.Key).BaseNamespace(),
			Value: r.Value,
		}
		if err := enc.Encode(entry); err != nil {
			logger.Error(err)
			return err
		}
	}
	return nil
}
// Unmarshal reads and parses a previous dump of the state.
// All the parsed key/values are added to the store. As of now,
// Unmarshal does not empty the existing store from any values
// before unmarshaling from the given reader.
func (st *State) Unmarshal(r io.Reader) error {
	dec := codec.NewDecoder(r, st.codecHandle)
	for {
		var entry serialEntry
		err := dec.Decode(&entry)
		if err == io.EOF {
			// Clean end of stream.
			return nil
		}
		if err != nil {
			return err
		}

		// Keys were stored without the namespace; re-prefix them.
		key := st.namespace.Child(ds.NewKey(entry.Key))
		if err := st.dsWrite.Put(context.Background(), key, entry.Value); err != nil {
			logger.Error("error adding unmarshaled key to datastore:", err)
			return err
		}
	}
}
// cidToDsKey turns a Cid into a datastore key (used to be on
// go-ipfs-ds-help).
func cidToDsKey(c cid.Cid) ds.Key {
	raw := c.Bytes()
	return dshelp.NewKeyFromBinary(raw)
}
// dsKeyToCid turns a datastore key back into a Cid (used to be on
// go-ipfs-ds-help).
func dsKeyToCid(k ds.Key) (cid.Cid, error) {
	raw, err := dshelp.BinaryFromDsKey(k)
	if err != nil {
		return cid.Undef, err
	}
	return cid.Cast(raw)
}
// key converts a Cid to its namespaced datastore key:
// /namespace/cid1Key.
func (st *State) key(c cid.Cid) ds.Key {
	return st.namespace.Child(cidToDsKey(c))
}
// unkey converts a namespaced key (/namespace/cidKey) back to a Cid.
func (st *State) unkey(k ds.Key) (cid.Cid, error) {
	base := ds.NewKey(k.BaseNamespace())
	return dsKeyToCid(base)
}
// serializePin decides how a Pin object is serialized to be stored in the
// datastore (currently protobuf). Changing this may require a migration!
func (st *State) serializePin(c api.Pin) ([]byte, error) {
	return c.ProtoMarshal()
}
// deserializePin builds a Pin object from datastore bytes. It is the
// exact opposite of serializePin. The Cid is set from the key, not from
// the stored bytes.
func (st *State) deserializePin(c cid.Cid, buf []byte) (api.Pin, error) {
	var p api.Pin
	err := p.ProtoUnmarshal(buf)
	p.Cid = c
	return p, err
}
// BatchingState implements the IPFS Cluster "state" interface by wrapping a
// batching go-datastore. All writes are batched and only written to disk
// when Commit() is called.
type BatchingState struct {
	*State          // embedded State whose dsWrite is the batch below
	batch  ds.Batch // pending writes, flushed by Commit
}
// NewBatching returns a new batching state using the given datastore.
//
// All keys are namespaced with the given string when written, so the same
// go-datastore can be sharded for different uses.
//
// The Handle controls options for the serialization of the full state
// (marshaling/unmarshaling). When nil, the Msgpack default is used.
func NewBatching(dstore ds.Batching, namespace string, handle codec.Handle) (*BatchingState, error) {
	if handle == nil {
		handle = DefaultHandle()
	}

	batch, err := dstore.Batch(context.Background())
	if err != nil {
		return nil, err
	}

	// Reads go straight to the datastore; writes accumulate in the batch.
	return &BatchingState{
		State: &State{
			dsRead:      dstore,
			dsWrite:     batch,
			codecHandle: handle,
			namespace:   ds.NewKey(namespace),
		},
		batch: batch,
	}, nil
}
// Commit persists the batched write operations by flushing the
// underlying datastore batch.
func (bst *BatchingState) Commit(ctx context.Context) error {
	_, span := trace.StartSpan(ctx, "state/dsstate/Commit")
	defer span.End()
	return bst.batch.Commit(ctx)
}