a97ed10d0b
This commit introduces an api.Cid type and replaces the usage of cid.Cid everywhere. The main motivation here is to override MarshalJSON so that Cids are JSON-ified as '"Qm...."' instead of '{ "/": "Qm....." }', as this "ipld" representation of IDs is horrible to work with, and our APIs are not issuing IPLD objects to start with. Unfortunately, there is no way to do this cleanly, and the best way is to just switch everything to our own type.
668 lines
16 KiB
Go
668 lines
16 KiB
Go
// Package crdt implements the IPFS Cluster consensus interface using
|
|
// CRDT-datastore to replicate the cluster global state to every peer.
|
|
package crdt
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ipfs/ipfs-cluster/api"
|
|
"github.com/ipfs/ipfs-cluster/pstoremgr"
|
|
"github.com/ipfs/ipfs-cluster/state"
|
|
"github.com/ipfs/ipfs-cluster/state/dsstate"
|
|
|
|
ds "github.com/ipfs/go-datastore"
|
|
namespace "github.com/ipfs/go-datastore/namespace"
|
|
query "github.com/ipfs/go-datastore/query"
|
|
crdt "github.com/ipfs/go-ds-crdt"
|
|
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
|
logging "github.com/ipfs/go-log/v2"
|
|
host "github.com/libp2p/go-libp2p-core/host"
|
|
peer "github.com/libp2p/go-libp2p-core/peer"
|
|
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
|
|
"github.com/libp2p/go-libp2p-core/routing"
|
|
rpc "github.com/libp2p/go-libp2p-gorpc"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
multihash "github.com/multiformats/go-multihash"
|
|
|
|
ipfslite "github.com/hsanjuan/ipfs-lite"
|
|
trace "go.opencensus.io/trace"
|
|
)
|
|
|
|
var logger = logging.Logger("crdt")
|
|
|
|
var (
|
|
blocksNs = "b" // blockstore namespace
|
|
connMgrTag = "crdt"
|
|
)
|
|
|
|
// Common variables for the module.
|
|
var (
|
|
ErrNoLeader = errors.New("crdt consensus component does not provide a leader")
|
|
ErrRmPeer = errors.New("crdt consensus component cannot remove peers")
|
|
ErrMaxQueueSizeReached = errors.New("batching max_queue_size reached. Too many operations are waiting to be batched. Try increasing the max_queue_size or adjusting the batching options")
|
|
)
|
|
|
|
// wraps pins so that they can be batched.
|
|
type batchItem struct {
|
|
ctx context.Context
|
|
isPin bool // pin or unpin
|
|
pin api.Pin
|
|
}
|
|
|
|
// Consensus implement ipfscluster.Consensus and provides the facility to add
|
|
// and remove pins from the Cluster shared state. It uses a CRDT-backed
|
|
// implementation of go-datastore (go-ds-crdt).
|
|
type Consensus struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
config *Config
|
|
|
|
trustedPeers sync.Map
|
|
|
|
host host.Host
|
|
peerManager *pstoremgr.Manager
|
|
|
|
store ds.Datastore
|
|
namespace ds.Key
|
|
|
|
state state.State
|
|
batchingState state.BatchingState
|
|
crdt *crdt.Datastore
|
|
ipfs *ipfslite.Peer
|
|
|
|
dht routing.Routing
|
|
pubsub *pubsub.PubSub
|
|
|
|
rpcClient *rpc.Client
|
|
rpcReady chan struct{}
|
|
stateReady chan struct{}
|
|
readyCh chan struct{}
|
|
batchItemCh chan batchItem
|
|
|
|
shutdownLock sync.RWMutex
|
|
shutdown bool
|
|
}
|
|
|
|
// New creates a new crdt Consensus component. The given PubSub will be used to
|
|
// broadcast new heads. The given thread-safe datastore will be used to persist
|
|
// data and all will be prefixed with cfg.DatastoreNamespace.
|
|
func New(
|
|
host host.Host,
|
|
dht routing.Routing,
|
|
pubsub *pubsub.PubSub,
|
|
cfg *Config,
|
|
store ds.Datastore,
|
|
) (*Consensus, error) {
|
|
err := cfg.Validate()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
var blocksDatastore ds.Batching
|
|
ns := ds.NewKey(cfg.DatastoreNamespace)
|
|
blocksDatastore = namespace.Wrap(store, ns.ChildString(blocksNs))
|
|
|
|
ipfs, err := ipfslite.New(
|
|
ctx,
|
|
blocksDatastore,
|
|
host,
|
|
dht,
|
|
&ipfslite.Config{
|
|
Offline: false,
|
|
},
|
|
)
|
|
if err != nil {
|
|
logger.Errorf("error creating ipfs-lite: %s", err)
|
|
cancel()
|
|
return nil, err
|
|
}
|
|
|
|
css := &Consensus{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
config: cfg,
|
|
host: host,
|
|
peerManager: pstoremgr.New(ctx, host, ""),
|
|
dht: dht,
|
|
store: store,
|
|
ipfs: ipfs,
|
|
namespace: ns,
|
|
pubsub: pubsub,
|
|
rpcReady: make(chan struct{}, 1),
|
|
readyCh: make(chan struct{}, 1),
|
|
stateReady: make(chan struct{}, 1),
|
|
batchItemCh: make(chan batchItem, cfg.Batching.MaxQueueSize),
|
|
}
|
|
|
|
go css.setup()
|
|
return css, nil
|
|
}
|
|
|
|
func (css *Consensus) setup() {
|
|
select {
|
|
case <-css.ctx.Done():
|
|
return
|
|
case <-css.rpcReady:
|
|
}
|
|
|
|
// Set up a fast-lookup trusted peers cache.
|
|
// Protect these peers in the ConnMgr
|
|
for _, p := range css.config.TrustedPeers {
|
|
css.Trust(css.ctx, p)
|
|
}
|
|
|
|
// Hash the cluster name and produce the topic name from there
|
|
// as a way to avoid pubsub topic collisions with other
|
|
// pubsub applications potentially when both potentially use
|
|
// simple names like "test".
|
|
topicName := css.config.ClusterName
|
|
topicHash, err := multihash.Sum([]byte(css.config.ClusterName), multihash.MD5, -1)
|
|
if err != nil {
|
|
logger.Errorf("error hashing topic: %s", err)
|
|
} else {
|
|
topicName = topicHash.B58String()
|
|
}
|
|
|
|
// Validate pubsub messages for our topic (only accept
|
|
// from trusted sources)
|
|
err = css.pubsub.RegisterTopicValidator(
|
|
topicName,
|
|
func(ctx context.Context, _ peer.ID, msg *pubsub.Message) bool {
|
|
signer := msg.GetFrom()
|
|
trusted := css.IsTrustedPeer(ctx, signer)
|
|
if !trusted {
|
|
logger.Debug("discarded pubsub message from non trusted source %s ", signer)
|
|
}
|
|
return trusted
|
|
},
|
|
)
|
|
if err != nil {
|
|
logger.Errorf("error registering topic validator: %s", err)
|
|
}
|
|
|
|
broadcaster, err := crdt.NewPubSubBroadcaster(
|
|
css.ctx,
|
|
css.pubsub,
|
|
topicName, // subscription name
|
|
)
|
|
if err != nil {
|
|
logger.Errorf("error creating broadcaster: %s", err)
|
|
return
|
|
}
|
|
|
|
opts := crdt.DefaultOptions()
|
|
opts.RebroadcastInterval = css.config.RebroadcastInterval
|
|
opts.DAGSyncerTimeout = 2 * time.Minute
|
|
opts.Logger = logger
|
|
opts.RepairInterval = css.config.RepairInterval
|
|
opts.MultiHeadProcessing = false
|
|
opts.NumWorkers = 50
|
|
opts.PutHook = func(k ds.Key, v []byte) {
|
|
ctx, span := trace.StartSpan(css.ctx, "crdt/PutHook")
|
|
defer span.End()
|
|
|
|
pin := api.Pin{}
|
|
err := pin.ProtoUnmarshal(v)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return
|
|
}
|
|
|
|
// TODO: tracing for this context
|
|
err = css.rpcClient.CallContext(
|
|
ctx,
|
|
"",
|
|
"PinTracker",
|
|
"Track",
|
|
pin,
|
|
&struct{}{},
|
|
)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
}
|
|
logger.Infof("new pin added: %s", pin.Cid)
|
|
}
|
|
opts.DeleteHook = func(k ds.Key) {
|
|
ctx, span := trace.StartSpan(css.ctx, "crdt/DeleteHook")
|
|
defer span.End()
|
|
|
|
kb, err := dshelp.BinaryFromDsKey(k)
|
|
if err != nil {
|
|
logger.Error(err, k)
|
|
return
|
|
}
|
|
c, err := api.CastCid(kb)
|
|
if err != nil {
|
|
logger.Error(err, k)
|
|
return
|
|
}
|
|
|
|
pin := api.PinCid(c)
|
|
|
|
err = css.rpcClient.CallContext(
|
|
ctx,
|
|
"",
|
|
"PinTracker",
|
|
"Untrack",
|
|
pin,
|
|
&struct{}{},
|
|
)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
}
|
|
logger.Infof("pin removed: %s", c)
|
|
}
|
|
|
|
crdt, err := crdt.New(
|
|
css.store,
|
|
css.namespace,
|
|
css.ipfs,
|
|
broadcaster,
|
|
opts,
|
|
)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
return
|
|
}
|
|
|
|
css.crdt = crdt
|
|
|
|
clusterState, err := dsstate.New(
|
|
css.crdt,
|
|
// unsure if we should set something else but crdt is already
|
|
// namespaced and this would only namespace the keys, which only
|
|
// complicates things.
|
|
"",
|
|
dsstate.DefaultHandle(),
|
|
)
|
|
if err != nil {
|
|
logger.Errorf("error creating cluster state datastore: %s", err)
|
|
return
|
|
}
|
|
css.state = clusterState
|
|
|
|
batchingState, err := dsstate.NewBatching(
|
|
css.crdt,
|
|
"",
|
|
dsstate.DefaultHandle(),
|
|
)
|
|
if err != nil {
|
|
logger.Errorf("error creating cluster state batching datastore: %s", err)
|
|
return
|
|
}
|
|
css.batchingState = batchingState
|
|
|
|
if css.config.TrustAll {
|
|
logger.Info("'trust all' mode enabled. Any peer in the cluster can modify the pinset.")
|
|
}
|
|
|
|
// launch batching workers
|
|
if css.config.batchingEnabled() {
|
|
logger.Infof("'crdt batching' enabled: %d items / %s",
|
|
css.config.Batching.MaxBatchSize,
|
|
css.config.Batching.MaxBatchAge.String(),
|
|
)
|
|
go css.batchWorker()
|
|
}
|
|
|
|
// notifies State() it is safe to return
|
|
close(css.stateReady)
|
|
css.readyCh <- struct{}{}
|
|
}
|
|
|
|
// Shutdown closes this component, cancelling the pubsub subscription and
|
|
// closing the datastore.
|
|
func (css *Consensus) Shutdown(ctx context.Context) error {
|
|
css.shutdownLock.Lock()
|
|
defer css.shutdownLock.Unlock()
|
|
|
|
if css.shutdown {
|
|
logger.Debug("already shutdown")
|
|
return nil
|
|
}
|
|
|
|
logger.Info("stopping Consensus component")
|
|
|
|
css.cancel()
|
|
|
|
// Only close crdt after cancelling the context, otherwise
|
|
// the pubsub broadcaster stays on and locks it.
|
|
if crdt := css.crdt; crdt != nil {
|
|
crdt.Close()
|
|
}
|
|
|
|
if css.config.hostShutdown {
|
|
css.host.Close()
|
|
}
|
|
|
|
css.shutdown = true
|
|
close(css.rpcReady)
|
|
return nil
|
|
}
|
|
|
|
// SetClient gives the component the ability to communicate and
|
|
// leaves it ready to use.
|
|
func (css *Consensus) SetClient(c *rpc.Client) {
|
|
css.rpcClient = c
|
|
css.rpcReady <- struct{}{}
|
|
}
|
|
|
|
// Ready returns a channel which is signalled when the component
|
|
// is ready to use.
|
|
func (css *Consensus) Ready(ctx context.Context) <-chan struct{} {
|
|
return css.readyCh
|
|
}
|
|
|
|
// IsTrustedPeer returns whether the given peer is taken into account
|
|
// when submitting updates to the consensus state.
|
|
func (css *Consensus) IsTrustedPeer(ctx context.Context, pid peer.ID) bool {
|
|
_, span := trace.StartSpan(ctx, "consensus/IsTrustedPeer")
|
|
defer span.End()
|
|
|
|
if css.config.TrustAll {
|
|
return true
|
|
}
|
|
|
|
if pid == css.host.ID() {
|
|
return true
|
|
}
|
|
|
|
_, ok := css.trustedPeers.Load(pid)
|
|
return ok
|
|
}
|
|
|
|
// Trust marks a peer as "trusted". It makes sure it is trusted as issuer
|
|
// for pubsub updates, it is protected in the connection manager, it
|
|
// has the highest priority when the peerstore is saved, and it's addresses
|
|
// are always remembered.
|
|
func (css *Consensus) Trust(ctx context.Context, pid peer.ID) error {
|
|
_, span := trace.StartSpan(ctx, "consensus/Trust")
|
|
defer span.End()
|
|
|
|
css.trustedPeers.Store(pid, struct{}{})
|
|
if conman := css.host.ConnManager(); conman != nil {
|
|
conman.Protect(pid, connMgrTag)
|
|
}
|
|
css.peerManager.SetPriority(pid, 0)
|
|
addrs := css.host.Peerstore().Addrs(pid)
|
|
css.host.Peerstore().SetAddrs(pid, addrs, peerstore.PermanentAddrTTL)
|
|
return nil
|
|
}
|
|
|
|
// Distrust removes a peer from the "trusted" set.
|
|
func (css *Consensus) Distrust(ctx context.Context, pid peer.ID) error {
|
|
_, span := trace.StartSpan(ctx, "consensus/Distrust")
|
|
defer span.End()
|
|
|
|
css.trustedPeers.Delete(pid)
|
|
return nil
|
|
}
|
|
|
|
// LogPin adds a new pin to the shared state.
|
|
func (css *Consensus) LogPin(ctx context.Context, pin api.Pin) error {
|
|
ctx, span := trace.StartSpan(ctx, "consensus/LogPin")
|
|
defer span.End()
|
|
|
|
if css.config.batchingEnabled() {
|
|
select {
|
|
case css.batchItemCh <- batchItem{
|
|
ctx: ctx,
|
|
isPin: true,
|
|
pin: pin,
|
|
}:
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("error pinning: %w", ErrMaxQueueSizeReached)
|
|
}
|
|
}
|
|
|
|
return css.state.Add(ctx, pin)
|
|
}
|
|
|
|
// LogUnpin removes a pin from the shared state.
|
|
func (css *Consensus) LogUnpin(ctx context.Context, pin api.Pin) error {
|
|
ctx, span := trace.StartSpan(ctx, "consensus/LogUnpin")
|
|
defer span.End()
|
|
|
|
if css.config.batchingEnabled() {
|
|
select {
|
|
case css.batchItemCh <- batchItem{
|
|
ctx: ctx,
|
|
isPin: false,
|
|
pin: pin,
|
|
}:
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("error unpinning: %w", ErrMaxQueueSizeReached)
|
|
}
|
|
}
|
|
|
|
return css.state.Rm(ctx, pin.Cid)
|
|
}
|
|
|
|
// Launched in setup as a goroutine.
|
|
func (css *Consensus) batchWorker() {
|
|
maxSize := css.config.Batching.MaxBatchSize
|
|
maxAge := css.config.Batching.MaxBatchAge
|
|
batchCurSize := 0
|
|
// Create the timer but stop it. It will reset when
|
|
// items start arriving.
|
|
batchTimer := time.NewTimer(maxAge)
|
|
if !batchTimer.Stop() {
|
|
<-batchTimer.C
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-css.ctx.Done():
|
|
return
|
|
case batchItem := <-css.batchItemCh:
|
|
// First item in batch. Start the timer
|
|
if batchCurSize == 0 {
|
|
batchTimer.Reset(maxAge)
|
|
}
|
|
|
|
// Add/Rm from state
|
|
var err error
|
|
if batchItem.isPin {
|
|
err = css.batchingState.Add(batchItem.ctx, batchItem.pin)
|
|
} else {
|
|
err = css.batchingState.Rm(batchItem.ctx, batchItem.pin.Cid)
|
|
}
|
|
if err != nil {
|
|
logger.Errorf("error batching: %s (%s, isPin: %s)", err, batchItem.pin.Cid, batchItem.isPin)
|
|
continue
|
|
}
|
|
|
|
batchCurSize++
|
|
|
|
if batchCurSize < maxSize {
|
|
continue
|
|
}
|
|
|
|
if err := css.batchingState.Commit(css.ctx); err != nil {
|
|
logger.Errorf("error commiting batch after reaching max size: %s", err)
|
|
continue
|
|
}
|
|
logger.Infof("batch commit (size): %d items", maxSize)
|
|
|
|
// Stop timer and commit. Leave ready to reset on next
|
|
// item.
|
|
if !batchTimer.Stop() {
|
|
<-batchTimer.C
|
|
}
|
|
batchCurSize = 0
|
|
|
|
case <-batchTimer.C:
|
|
// Commit
|
|
if err := css.batchingState.Commit(css.ctx); err != nil {
|
|
logger.Errorf("error commiting batch after reaching max age: %s", err)
|
|
continue
|
|
}
|
|
logger.Infof("batch commit (max age): %d items", batchCurSize)
|
|
// timer is expired at this point, it will have to be
|
|
// reset.
|
|
batchCurSize = 0
|
|
}
|
|
}
|
|
}
|
|
|
|
// Peers returns the current known peerset. It uses
|
|
// the monitor component and considers every peer with
|
|
// valid known metrics a member.
|
|
func (css *Consensus) Peers(ctx context.Context) ([]peer.ID, error) {
|
|
ctx, span := trace.StartSpan(ctx, "consensus/Peers")
|
|
defer span.End()
|
|
|
|
var metrics []api.Metric
|
|
|
|
err := css.rpcClient.CallContext(
|
|
ctx,
|
|
"",
|
|
"PeerMonitor",
|
|
"LatestMetrics",
|
|
css.config.PeersetMetric,
|
|
&metrics,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var peers []peer.ID
|
|
|
|
selfIncluded := false
|
|
for _, m := range metrics {
|
|
peers = append(peers, m.Peer)
|
|
if m.Peer == css.host.ID() {
|
|
selfIncluded = true
|
|
}
|
|
}
|
|
|
|
// Always include self
|
|
if !selfIncluded {
|
|
peers = append(peers, css.host.ID())
|
|
}
|
|
|
|
sort.Sort(peer.IDSlice(peers))
|
|
|
|
return peers, nil
|
|
}
|
|
|
|
// WaitForSync is a no-op as it is not necessary to be fully synced for the
|
|
// component to be usable.
|
|
func (css *Consensus) WaitForSync(ctx context.Context) error { return nil }
|
|
|
|
// AddPeer is a no-op as we do not need to do peerset management with
|
|
// Merkle-CRDTs. Therefore adding a peer to the peerset means doing nothing.
|
|
func (css *Consensus) AddPeer(ctx context.Context, pid peer.ID) error {
|
|
return nil
|
|
}
|
|
|
|
// RmPeer is a no-op which always errors, as, since we do not do peerset
|
|
// management, we also have no ability to remove a peer from it.
|
|
func (css *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
|
|
return ErrRmPeer
|
|
}
|
|
|
|
// State returns the cluster shared state. It will block until the consensus
|
|
// component is ready, shutdown or the given context has been cancelled.
|
|
func (css *Consensus) State(ctx context.Context) (state.ReadOnly, error) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case <-css.ctx.Done():
|
|
return nil, css.ctx.Err()
|
|
case <-css.stateReady:
|
|
return css.state, nil
|
|
}
|
|
}
|
|
|
|
// Clean deletes all crdt-consensus datas from the datastore.
|
|
func (css *Consensus) Clean(ctx context.Context) error {
|
|
return Clean(ctx, css.config, css.store)
|
|
}
|
|
|
|
// Clean deletes all crdt-consensus datas from the given datastore.
|
|
func Clean(ctx context.Context, cfg *Config, store ds.Datastore) error {
|
|
logger.Info("cleaning all CRDT data from datastore")
|
|
q := query.Query{
|
|
Prefix: cfg.DatastoreNamespace,
|
|
KeysOnly: true,
|
|
}
|
|
|
|
results, err := store.Query(ctx, q)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer results.Close()
|
|
|
|
for r := range results.Next() {
|
|
if r.Error != nil {
|
|
return err
|
|
}
|
|
k := ds.NewKey(r.Key)
|
|
err := store.Delete(ctx, k)
|
|
if err != nil {
|
|
// do not die, continue cleaning
|
|
logger.Error(err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Leader returns ErrNoLeader.
|
|
func (css *Consensus) Leader(ctx context.Context) (peer.ID, error) {
|
|
return "", ErrNoLeader
|
|
}
|
|
|
|
// OfflineState returns an offline, batching state using the given
|
|
// datastore. This allows to inspect and modify the shared state in offline
|
|
// mode.
|
|
func OfflineState(cfg *Config, store ds.Datastore) (state.BatchingState, error) {
|
|
batching, ok := store.(ds.Batching)
|
|
if !ok {
|
|
return nil, errors.New("must provide a Batching datastore")
|
|
}
|
|
opts := crdt.DefaultOptions()
|
|
opts.Logger = logger
|
|
|
|
var blocksDatastore ds.Batching = namespace.Wrap(
|
|
batching,
|
|
ds.NewKey(cfg.DatastoreNamespace).ChildString(blocksNs),
|
|
)
|
|
|
|
ipfs, err := ipfslite.New(
|
|
context.Background(),
|
|
blocksDatastore,
|
|
nil,
|
|
nil,
|
|
&ipfslite.Config{
|
|
Offline: true,
|
|
},
|
|
)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
crdt, err := crdt.New(
|
|
batching,
|
|
ds.NewKey(cfg.DatastoreNamespace),
|
|
ipfs,
|
|
nil,
|
|
opts,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return dsstate.NewBatching(crdt, "", dsstate.DefaultHandle())
|
|
}
|