package crdt

import (
	"context"
	"errors"
"fmt"
2019-08-26 12:57:17 +00:00
"sort"
2019-02-20 14:24:25 +00:00
"sync"
2019-05-21 09:55:48 +00:00
"time"
2019-02-20 14:24:25 +00:00
2020-03-13 20:40:02 +00:00
"github.com/ipfs/go-cid"
2019-02-20 14:24:25 +00:00
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/pstoremgr"
2019-02-20 14:24:25 +00:00
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/state/dsstate"
ds "github.com/ipfs/go-datastore"
2019-06-14 10:41:11 +00:00
namespace "github.com/ipfs/go-datastore/namespace"
2019-02-20 14:24:25 +00:00
query "github.com/ipfs/go-datastore/query"
crdt "github.com/ipfs/go-ds-crdt"
2019-06-05 17:31:51 +00:00
dshelp "github.com/ipfs/go-ipfs-ds-help"
2020-03-13 20:40:02 +00:00
logging "github.com/ipfs/go-log/v2"
2019-06-14 10:41:11 +00:00
host "github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
2020-04-14 20:03:24 +00:00
"github.com/libp2p/go-libp2p-core/routing"
2019-02-20 14:24:25 +00:00
rpc "github.com/libp2p/go-libp2p-gorpc"
pubsub "github.com/libp2p/go-libp2p-pubsub"
2019-06-05 17:31:51 +00:00
multihash "github.com/multiformats/go-multihash"
ipfslite "github.com/hsanjuan/ipfs-lite"
2019-06-14 10:41:11 +00:00
trace "go.opencensus.io/trace"
2019-02-20 14:24:25 +00:00
)

var logger = logging.Logger("crdt")

var (
	blocksNs   = "b" // blockstore namespace
	connMgrTag = "crdt"
)

// Common variables for the module.
var (
	ErrNoLeader            = errors.New("crdt consensus component does not provide a leader")
	ErrRmPeer              = errors.New("crdt consensus component cannot remove peers")
	ErrMaxQueueSizeReached = errors.New("batching max_queue_size reached. Too many operations are waiting to be batched. Try increasing the max_queue_size or adjusting the batching options")
)

// batchItem wraps pins so that they can be batched.
type batchItem struct {
	ctx   context.Context
	isPin bool // pin or unpin
	pin   *api.Pin
}

// Consensus implements ipfscluster.Consensus and provides the facility to add
// and remove pins from the Cluster shared state. It uses a CRDT-backed
// implementation of go-datastore (go-ds-crdt).
type Consensus struct {
	ctx    context.Context
	cancel context.CancelFunc

	config *Config

	trustedPeers sync.Map

	host        host.Host
	peerManager *pstoremgr.Manager

	store     ds.Datastore
	namespace ds.Key

	state         state.State
	batchingState state.BatchingState
	crdt          *crdt.Datastore
	ipfs          *ipfslite.Peer

	dht    routing.Routing
	pubsub *pubsub.PubSub

	rpcClient   *rpc.Client
	rpcReady    chan struct{}
	stateReady  chan struct{}
	readyCh     chan struct{}
	batchItemCh chan batchItem

	shutdownLock sync.RWMutex
	shutdown     bool
}

// New creates a new crdt Consensus component. The given PubSub will be used to
// broadcast new heads. The given thread-safe datastore will be used to persist
// data and all will be prefixed with cfg.DatastoreNamespace.
func New(
	host host.Host,
	dht routing.Routing,
	pubsub *pubsub.PubSub,
	cfg *Config,
	store ds.Datastore,
) (*Consensus, error) {
	err := cfg.Validate()
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())

	var blocksDatastore ds.Batching
	ns := ds.NewKey(cfg.DatastoreNamespace)
	blocksDatastore = namespace.Wrap(store, ns.ChildString(blocksNs))

	ipfs, err := ipfslite.New(
		ctx,
		blocksDatastore,
		host,
		dht,
		&ipfslite.Config{
			Offline: false,
		},
	)
	if err != nil {
		logger.Errorf("error creating ipfs-lite: %s", err)
		cancel()
		return nil, err
	}

	css := &Consensus{
		ctx:         ctx,
		cancel:      cancel,
		config:      cfg,
		host:        host,
		peerManager: pstoremgr.New(ctx, host, ""),
		dht:         dht,
		store:       store,
		ipfs:        ipfs,
		namespace:   ns,
		pubsub:      pubsub,
		rpcReady:    make(chan struct{}, 1),
		readyCh:     make(chan struct{}, 1),
		stateReady:  make(chan struct{}, 1),
		batchItemCh: make(chan batchItem, cfg.Batching.MaxQueueSize),
	}

	go css.setup()
	return css, nil
}
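
// A minimal usage sketch (illustrative, not part of the original file): how a
// caller might wire the component up, assuming it already has a libp2p host,
// routing, pubsub and a thread-safe datastore. The identifiers h, rt, psub,
// store and rpcClient below are placeholders.
//
//	cfg := &Config{}
//	cfg.Default()
//	consensus, err := New(h, rt, psub, cfg, store)
//	if err != nil {
//		// handle the error
//	}
//	consensus.SetClient(rpcClient)
//	<-consensus.Ready(context.Background())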

func (css *Consensus) setup() {
	select {
	case <-css.ctx.Done():
		return
	case <-css.rpcReady:
	}

	// Set up a fast-lookup trusted peers cache.
	// Protect these peers in the ConnMgr.
	for _, p := range css.config.TrustedPeers {
		css.Trust(css.ctx, p)
	}

	// Hash the cluster name and produce the topic name from there
	// as a way to avoid pubsub topic collisions with other
	// pubsub applications that may also use simple names like "test".
	topicName := css.config.ClusterName
	topicHash, err := multihash.Sum([]byte(css.config.ClusterName), multihash.MD5, -1)
	if err != nil {
		logger.Errorf("error hashing topic: %s", err)
	} else {
		topicName = topicHash.B58String()
	}

	// Validate pubsub messages for our topic (only accept
	// them from trusted sources).
	err = css.pubsub.RegisterTopicValidator(
		topicName,
		func(ctx context.Context, _ peer.ID, msg *pubsub.Message) bool {
			signer := msg.GetFrom()
			trusted := css.IsTrustedPeer(ctx, signer)
			if !trusted {
				logger.Debugf("discarded pubsub message from non-trusted source %s", signer)
			}
			return trusted
		},
	)
	if err != nil {
		logger.Errorf("error registering topic validator: %s", err)
	}

	broadcaster, err := crdt.NewPubSubBroadcaster(
		css.ctx,
		css.pubsub,
		topicName, // subscription name
	)
	if err != nil {
		logger.Errorf("error creating broadcaster: %s", err)
		return
	}

	opts := crdt.DefaultOptions()
	opts.RebroadcastInterval = css.config.RebroadcastInterval
	opts.DAGSyncerTimeout = 2 * time.Minute
	opts.Logger = logger
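
	// PutHook runs every time a pin entry is written to the CRDT store
	// (locally or via a received update): decode it and ask the PinTracker
	// to start tracking it.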
	opts.PutHook = func(k ds.Key, v []byte) {
		ctx, span := trace.StartSpan(css.ctx, "crdt/PutHook")
		defer span.End()

		pin := &api.Pin{}
		err := pin.ProtoUnmarshal(v)
		if err != nil {
			logger.Error(err)
			return
		}

		// TODO: tracing for this context
		err = css.rpcClient.CallContext(
			ctx,
			"",
			"PinTracker",
			"Track",
			pin,
			&struct{}{},
		)
		if err != nil {
			logger.Error(err)
		}
		logger.Infof("new pin added: %s", pin.Cid)
	}
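
	// DeleteHook runs when a pin entry is removed from the CRDT store:
	// recover the CID from the key and ask the PinTracker to untrack it.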
	opts.DeleteHook = func(k ds.Key) {
		ctx, span := trace.StartSpan(css.ctx, "crdt/DeleteHook")
		defer span.End()

		kb, err := dshelp.BinaryFromDsKey(k)
		if err != nil {
			logger.Error(err, k)
			return
		}
		c, err := cid.Cast(kb)
		if err != nil {
			logger.Error(err, k)
			return
		}

		pin := api.PinCid(c)

		err = css.rpcClient.CallContext(
			ctx,
			"",
			"PinTracker",
			"Untrack",
			pin,
			&struct{}{},
		)
		if err != nil {
			logger.Error(err)
		}
		logger.Infof("pin removed: %s", c)
	}

	crdt, err := crdt.New(
		css.store,
		css.namespace,
		css.ipfs,
		broadcaster,
		opts,
	)
	if err != nil {
		logger.Error(err)
		return
	}
	css.crdt = crdt

	clusterState, err := dsstate.New(
		css.crdt,
		// unsure if we should set something else but crdt is already
		// namespaced and this would only namespace the keys, which only
		// complicates things.
		"",
		dsstate.DefaultHandle(),
	)
	if err != nil {
		logger.Errorf("error creating cluster state datastore: %s", err)
		return
	}
	css.state = clusterState

	batchingState, err := dsstate.NewBatching(
		css.crdt,
		"",
		dsstate.DefaultHandle(),
	)
	if err != nil {
		logger.Errorf("error creating cluster state batching datastore: %s", err)
		return
	}
	css.batchingState = batchingState

	if css.config.TrustAll {
		logger.Info("'trust all' mode enabled. Any peer in the cluster can modify the pinset.")
	}

	// launch batching workers
	if css.config.batchingEnabled() {
		logger.Infof("'crdt batching' enabled: %d items / %s",
			css.config.Batching.MaxBatchSize,
			css.config.Batching.MaxBatchAge.String(),
		)
		go css.batchWorker()
	}

	// notifies State() it is safe to return
	close(css.stateReady)
	css.readyCh <- struct{}{}
}

// Shutdown closes this component, cancelling the pubsub subscription and
// closing the datastore.
func (css *Consensus) Shutdown(ctx context.Context) error {
	css.shutdownLock.Lock()
	defer css.shutdownLock.Unlock()

	if css.shutdown {
		logger.Debug("already shutdown")
		return nil
	}

	logger.Info("stopping Consensus component")

	css.cancel()

	// Only close crdt after cancelling the context, otherwise
	// the pubsub broadcaster stays on and locks it.
	if crdt := css.crdt; crdt != nil {
		crdt.Close()
	}

	if css.config.hostShutdown {
		css.host.Close()
	}

	css.shutdown = true
	close(css.rpcReady)
	return nil
}

// SetClient gives the component the ability to communicate and
// leaves it ready to use.
func (css *Consensus) SetClient(c *rpc.Client) {
	css.rpcClient = c
	css.rpcReady <- struct{}{}
}

// Ready returns a channel which is signalled when the component
// is ready to use.
func (css *Consensus) Ready(ctx context.Context) <-chan struct{} {
	return css.readyCh
}

// IsTrustedPeer returns whether the given peer is taken into account
// when submitting updates to the consensus state.
func (css *Consensus) IsTrustedPeer(ctx context.Context, pid peer.ID) bool {
	_, span := trace.StartSpan(ctx, "consensus/IsTrustedPeer")
	defer span.End()

	if css.config.TrustAll {
		return true
	}

	if pid == css.host.ID() {
		return true
	}

	_, ok := css.trustedPeers.Load(pid)
	return ok
}

// Trust marks a peer as "trusted". It makes sure it is trusted as issuer
// for pubsub updates, it is protected in the connection manager, it
// has the highest priority when the peerstore is saved, and its addresses
// are always remembered.
func (css *Consensus) Trust(ctx context.Context, pid peer.ID) error {
	_, span := trace.StartSpan(ctx, "consensus/Trust")
	defer span.End()

	css.trustedPeers.Store(pid, struct{}{})

	if conman := css.host.ConnManager(); conman != nil {
		conman.Protect(pid, connMgrTag)
	}
	css.peerManager.SetPriority(pid, 0)
	addrs := css.host.Peerstore().Addrs(pid)
	css.host.Peerstore().SetAddrs(pid, addrs, peerstore.PermanentAddrTTL)
	return nil
}

// Distrust removes a peer from the "trusted" set.
func (css *Consensus) Distrust(ctx context.Context, pid peer.ID) error {
	_, span := trace.StartSpan(ctx, "consensus/Distrust")
	defer span.End()

	css.trustedPeers.Delete(pid)
	return nil
}

// LogPin adds a new pin to the shared state.
func (css *Consensus) LogPin(ctx context.Context, pin *api.Pin) error {
	ctx, span := trace.StartSpan(ctx, "consensus/LogPin")
	defer span.End()

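	// In batching mode, queue the pin for the batch worker instead of
	// committing it right away. If more than MaxQueueSize operations are
	// already waiting, fail with ErrMaxQueueSizeReached: the cluster is not
	// committing batches as fast as pins arrive, and either max_queue_size
	// or the batch limits should be raised.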
	if css.config.batchingEnabled() {
		select {
		case css.batchItemCh <- batchItem{
			ctx:   ctx,
			isPin: true,
			pin:   pin,
		}:
			return nil
		default:
			return fmt.Errorf("error pinning: %w", ErrMaxQueueSizeReached)
		}
	}

	return css.state.Add(ctx, pin)
}

// LogUnpin removes a pin from the shared state.
func (css *Consensus) LogUnpin(ctx context.Context, pin *api.Pin) error {
	ctx, span := trace.StartSpan(ctx, "consensus/LogUnpin")
	defer span.End()

	if css.config.batchingEnabled() {
		select {
		case css.batchItemCh <- batchItem{
			ctx:   ctx,
			isPin: false,
			pin:   pin,
		}:
			return nil
		default:
			return fmt.Errorf("error unpinning: %w", ErrMaxQueueSizeReached)
		}
	}

	return css.state.Rm(ctx, pin.Cid)
}

// batchWorker commits batches of queued pin/unpin operations. It is launched
// from setup() as a goroutine when batching is enabled.
func (css *Consensus) batchWorker() {
	maxSize := css.config.Batching.MaxBatchSize
	maxAge := css.config.Batching.MaxBatchAge
	batchCurSize := 0

	// Create the timer but stop it. It will be reset when
	// items start arriving.
	batchTimer := time.NewTimer(maxAge)
	if !batchTimer.Stop() {
		<-batchTimer.C
	}

	for {
		select {
		case <-css.ctx.Done():
			return
		case batchItem := <-css.batchItemCh:
			// First item in batch. Start the timer.
			if batchCurSize == 0 {
				batchTimer.Reset(maxAge)
			}

			// Add/Rm from state
			var err error
			if batchItem.isPin {
				err = css.batchingState.Add(batchItem.ctx, batchItem.pin)
			} else {
				err = css.batchingState.Rm(batchItem.ctx, batchItem.pin.Cid)
			}
			if err != nil {
				logger.Errorf("error batching: %s (%s, isPin: %t)", err, batchItem.pin.Cid, batchItem.isPin)
				continue
			}

			batchCurSize++
			if batchCurSize < maxSize {
				continue
			}

			if err := css.batchingState.Commit(css.ctx); err != nil {
				logger.Errorf("error committing batch after reaching max size: %s", err)
				continue
			}
			logger.Debugf("batch commit (size): %d items", maxSize)

			// The batch was committed. Stop the timer and leave it
			// ready to be reset when the next item arrives.
			if !batchTimer.Stop() {
				<-batchTimer.C
			}
			batchCurSize = 0

		case <-batchTimer.C:
			// Commit whatever has accumulated once the batch
			// reaches its maximum age.
			if err := css.batchingState.Commit(css.ctx); err != nil {
				logger.Errorf("error committing batch after reaching max age: %s", err)
				continue
			}
			logger.Debugf("batch commit (max age): %d items", batchCurSize)

			// The timer has already fired at this point; it will
			// be reset when the next item arrives.
			batchCurSize = 0
		}
	}
}

// Peers returns the current known peerset. It uses
// the monitor component and considers every peer with
// valid known metrics a member.
func (css *Consensus) Peers(ctx context.Context) ([]peer.ID, error) {
	ctx, span := trace.StartSpan(ctx, "consensus/Peers")
	defer span.End()

	var metrics []*api.Metric

	err := css.rpcClient.CallContext(
		ctx,
		"",
		"PeerMonitor",
		"LatestMetrics",
		css.config.PeersetMetric,
		&metrics,
	)
	if err != nil {
		return nil, err
	}

	var peers []peer.ID
	selfIncluded := false
	for _, m := range metrics {
		peers = append(peers, m.Peer)
		if m.Peer == css.host.ID() {
			selfIncluded = true
		}
	}

	// Always include self
	if !selfIncluded {
		peers = append(peers, css.host.ID())
	}

	sort.Sort(peer.IDSlice(peers))

	return peers, nil
}

// WaitForSync is a no-op as it is not necessary to be fully synced for the
// component to be usable.
func (css *Consensus) WaitForSync(ctx context.Context) error { return nil }

// AddPeer is a no-op as we do not need to do peerset management with
// Merkle-CRDTs. Therefore adding a peer to the peerset means doing nothing.
func (css *Consensus) AddPeer(ctx context.Context, pid peer.ID) error {
	return nil
}

// RmPeer is a no-op which always errors, as, since we do not do peerset
// management, we also have no ability to remove a peer from it.
func (css *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
	return ErrRmPeer
}

// State returns the cluster shared state. It will block until the consensus
// component is ready, it has been shut down, or the given context is
// cancelled.
func (css *Consensus) State(ctx context.Context) (state.ReadOnly, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-css.ctx.Done():
		return nil, css.ctx.Err()
	case <-css.stateReady:
		return css.state, nil
	}
}

// Clean deletes all crdt-consensus data from the datastore.
func (css *Consensus) Clean(ctx context.Context) error {
	return Clean(ctx, css.config, css.store)
}

// Clean deletes all crdt-consensus data from the given datastore.
func Clean(ctx context.Context, cfg *Config, store ds.Datastore) error {
	logger.Info("cleaning all CRDT data from datastore")

	q := query.Query{
		Prefix:   cfg.DatastoreNamespace,
		KeysOnly: true,
	}

	results, err := store.Query(q)
	if err != nil {
		return err
	}
	defer results.Close()

	for r := range results.Next() {
		if r.Error != nil {
			return r.Error
		}
		k := ds.NewKey(r.Key)
		err := store.Delete(k)
		if err != nil {
			// do not die, continue cleaning
			logger.Error(err)
		}
	}
	return nil
}

// Leader returns ErrNoLeader.
func (css *Consensus) Leader(ctx context.Context) (peer.ID, error) {
	return "", ErrNoLeader
}

// OfflineState returns an offline, batching state using the given
// datastore. This allows inspecting and modifying the shared state in
// offline mode.
func OfflineState(cfg *Config, store ds.Datastore) (state.BatchingState, error) {
	batching, ok := store.(ds.Batching)
	if !ok {
		return nil, errors.New("must provide a Batching datastore")
	}
	opts := crdt.DefaultOptions()
	opts.Logger = logger

	var blocksDatastore ds.Batching = namespace.Wrap(
		batching,
		ds.NewKey(cfg.DatastoreNamespace).ChildString(blocksNs),
	)

	ipfs, err := ipfslite.New(
		context.Background(),
		blocksDatastore,
		nil,
		nil,
		&ipfslite.Config{
			Offline: true,
		},
	)
	if err != nil {
		return nil, err
	}

	crdt, err := crdt.New(
		batching,
		ds.NewKey(cfg.DatastoreNamespace),
		ipfs,
		nil,
		opts,
	)
	if err != nil {
		return nil, err
	}

	return dsstate.NewBatching(crdt, "", dsstate.DefaultHandle())
}
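
// A brief usage sketch (illustrative, not part of the original file): reading
// the pinset without a running cluster peer, assuming "store" is the same
// datastore the peer normally uses and "cfg" its crdt configuration section.
//
//	st, err := OfflineState(cfg, store)
//	if err != nil {
//		// handle the error
//	}
//	pins, err := st.List(context.Background())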