RPC auth: Support Trusted Peers in CRDT consensus component.
TrustedPeers are specified in the configuration. Additional peers can be added at runtime with Trust/Distrust functions. Unfortunately we cannot use consensus.PeerAdd as a way to trust a peer as cluster.PeerAdd+Join can be called by any peer and this calls consensus.PeerAdd. The result is consensus.PeerAdd doing a lot in Raft while consensus.Trust does nothing, while in CRDTs consensus.Trust does something but consensus.PeerAdd does nothing. But this is more or less consistent.
This commit is contained in:
parent
c5a2e7fdc5
commit
949e6f2364
|
@ -20,13 +20,14 @@ func PeersToStrings(peers []peer.ID) []string {
|
||||||
|
|
||||||
// StringsToPeers decodes peer.IDs from strings.
|
// StringsToPeers decodes peer.IDs from strings.
|
||||||
func StringsToPeers(strs []string) []peer.ID {
|
func StringsToPeers(strs []string) []peer.ID {
|
||||||
peers := make([]peer.ID, len(strs))
|
peers := []peer.ID{}
|
||||||
for i, p := range strs {
|
for _, p := range strs {
|
||||||
var err error
|
pid, err := peer.IDB58Decode(p)
|
||||||
peers[i], err = peer.IDB58Decode(p)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Debugf("'%s': %s", p, err)
|
logger.Debugf("'%s': %s", p, err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
peers = append(peers, pid)
|
||||||
}
|
}
|
||||||
return peers
|
return peers
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,6 +50,7 @@ var testingRaftCfg = []byte(`{
|
||||||
|
|
||||||
var testingCrdtCfg = []byte(`{
|
var testingCrdtCfg = []byte(`{
|
||||||
"cluster_name": "crdt-test",
|
"cluster_name": "crdt-test",
|
||||||
|
"trusted_peers": [],
|
||||||
"rebroadcast_interval": "150ms"
|
"rebroadcast_interval": "150ms"
|
||||||
}`)
|
}`)
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,9 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/kelseyhightower/envconfig"
|
"github.com/kelseyhightower/envconfig"
|
||||||
|
peer "github.com/libp2p/go-libp2p-peer"
|
||||||
|
|
||||||
|
"github.com/ipfs/ipfs-cluster/api"
|
||||||
"github.com/ipfs/ipfs-cluster/config"
|
"github.com/ipfs/ipfs-cluster/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -20,6 +22,7 @@ var (
|
||||||
DefaultPeersetMetric = "ping"
|
DefaultPeersetMetric = "ping"
|
||||||
DefaultDatastoreNamespace = "/c" // from "/crdt"
|
DefaultDatastoreNamespace = "/c" // from "/crdt"
|
||||||
DefaultRebroadcastInterval = time.Minute
|
DefaultRebroadcastInterval = time.Minute
|
||||||
|
DefaultTrustedPeers = []peer.ID{}
|
||||||
)
|
)
|
||||||
|
|
||||||
// Config is the configuration object for Consensus.
|
// Config is the configuration object for Consensus.
|
||||||
|
@ -31,6 +34,11 @@ type Config struct {
|
||||||
// The topic we wish to subscribe to
|
// The topic we wish to subscribe to
|
||||||
ClusterName string
|
ClusterName string
|
||||||
|
|
||||||
|
// Any update received from a peer outside this set is ignored and not
|
||||||
|
// forwarded. Trusted peers can also access additional RPC endpoints
|
||||||
|
// for this peer that are forbidden for other peers.
|
||||||
|
TrustedPeers []peer.ID
|
||||||
|
|
||||||
// The interval before re-announcing the current state
|
// The interval before re-announcing the current state
|
||||||
// to the network when no activity is observed.
|
// to the network when no activity is observed.
|
||||||
RebroadcastInterval time.Duration
|
RebroadcastInterval time.Duration
|
||||||
|
@ -48,7 +56,9 @@ type Config struct {
|
||||||
|
|
||||||
type jsonConfig struct {
|
type jsonConfig struct {
|
||||||
ClusterName string `json:"cluster_name"`
|
ClusterName string `json:"cluster_name"`
|
||||||
|
TrustedPeers []string `json:"trusted_peers"`
|
||||||
RebroadcastInterval string `json:"rebroadcast_interval,omitempty"`
|
RebroadcastInterval string `json:"rebroadcast_interval,omitempty"`
|
||||||
|
|
||||||
PeersetMetric string `json:"peerset_metric,omitempty"`
|
PeersetMetric string `json:"peerset_metric,omitempty"`
|
||||||
DatastoreNamespace string `json:"datastore_namespace,omitempty"`
|
DatastoreNamespace string `json:"datastore_namespace,omitempty"`
|
||||||
}
|
}
|
||||||
|
@ -89,6 +99,12 @@ func (cfg *Config) LoadJSON(raw []byte) error {
|
||||||
|
|
||||||
func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
|
func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
|
||||||
cfg.ClusterName = jcfg.ClusterName
|
cfg.ClusterName = jcfg.ClusterName
|
||||||
|
|
||||||
|
cfg.TrustedPeers = api.StringsToPeers(jcfg.TrustedPeers)
|
||||||
|
if len(cfg.TrustedPeers) != len(jcfg.TrustedPeers) {
|
||||||
|
return errors.New("error parsing some peer IDs crdt.trusted_peers")
|
||||||
|
}
|
||||||
|
|
||||||
config.SetIfNotDefault(jcfg.PeersetMetric, &cfg.PeersetMetric)
|
config.SetIfNotDefault(jcfg.PeersetMetric, &cfg.PeersetMetric)
|
||||||
config.SetIfNotDefault(jcfg.DatastoreNamespace, &cfg.DatastoreNamespace)
|
config.SetIfNotDefault(jcfg.DatastoreNamespace, &cfg.DatastoreNamespace)
|
||||||
config.ParseDurations(
|
config.ParseDurations(
|
||||||
|
@ -108,6 +124,7 @@ func (cfg *Config) ToJSON() ([]byte, error) {
|
||||||
func (cfg *Config) toJSONConfig() *jsonConfig {
|
func (cfg *Config) toJSONConfig() *jsonConfig {
|
||||||
jcfg := &jsonConfig{
|
jcfg := &jsonConfig{
|
||||||
ClusterName: cfg.ClusterName,
|
ClusterName: cfg.ClusterName,
|
||||||
|
TrustedPeers: api.PeersToStrings(cfg.TrustedPeers),
|
||||||
PeersetMetric: "",
|
PeersetMetric: "",
|
||||||
RebroadcastInterval: "",
|
RebroadcastInterval: "",
|
||||||
}
|
}
|
||||||
|
@ -135,6 +152,7 @@ func (cfg *Config) Default() error {
|
||||||
cfg.RebroadcastInterval = DefaultRebroadcastInterval
|
cfg.RebroadcastInterval = DefaultRebroadcastInterval
|
||||||
cfg.PeersetMetric = DefaultPeersetMetric
|
cfg.PeersetMetric = DefaultPeersetMetric
|
||||||
cfg.DatastoreNamespace = DefaultDatastoreNamespace
|
cfg.DatastoreNamespace = DefaultDatastoreNamespace
|
||||||
|
cfg.TrustedPeers = DefaultTrustedPeers
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,8 @@ import (
|
||||||
|
|
||||||
var cfgJSON = []byte(`
|
var cfgJSON = []byte(`
|
||||||
{
|
{
|
||||||
"cluster_name": "test"
|
"cluster_name": "test",
|
||||||
|
"trusted_peers": ["QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6"]
|
||||||
}
|
}
|
||||||
`)
|
`)
|
||||||
|
|
||||||
|
@ -17,6 +18,17 @@ func TestLoadJSON(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cfg = &Config{}
|
||||||
|
err = cfg.LoadJSON([]byte(`
|
||||||
|
{
|
||||||
|
"cluster_name": "test",
|
||||||
|
"trusted_peers": ["abc"]
|
||||||
|
}`))
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error parsing trusted_peers")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestToJSON(t *testing.T) {
|
func TestToJSON(t *testing.T) {
|
||||||
|
|
|
@ -45,6 +45,8 @@ type Consensus struct {
|
||||||
|
|
||||||
config *Config
|
config *Config
|
||||||
|
|
||||||
|
trustedPeers sync.Map
|
||||||
|
|
||||||
host host.Host
|
host host.Host
|
||||||
|
|
||||||
store ds.Datastore
|
store ds.Datastore
|
||||||
|
@ -94,6 +96,11 @@ func New(
|
||||||
readyCh: make(chan struct{}, 1),
|
readyCh: make(chan struct{}, 1),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set up a fast-lookup trusted peers cache.
|
||||||
|
for _, p := range css.config.TrustedPeers {
|
||||||
|
css.Trust(ctx, p)
|
||||||
|
}
|
||||||
|
|
||||||
go css.setup()
|
go css.setup()
|
||||||
return css, nil
|
return css, nil
|
||||||
}
|
}
|
||||||
|
@ -122,8 +129,7 @@ func (css *Consensus) setup() {
|
||||||
err = css.pubsub.RegisterTopicValidator(
|
err = css.pubsub.RegisterTopicValidator(
|
||||||
topicName,
|
topicName,
|
||||||
func(ctx context.Context, p peer.ID, msg *pubsub.Message) bool {
|
func(ctx context.Context, p peer.ID, msg *pubsub.Message) bool {
|
||||||
// This is where peer authentication will go.
|
return css.IsTrustedPeer(ctx, p)
|
||||||
return true
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -279,7 +285,23 @@ func (css *Consensus) Ready(ctx context.Context) <-chan struct{} {
|
||||||
// IsTrustedPeer returns whether the given peer is taken into account
|
// IsTrustedPeer returns whether the given peer is taken into account
|
||||||
// when submitting updates to the consensus state.
|
// when submitting updates to the consensus state.
|
||||||
func (css *Consensus) IsTrustedPeer(ctx context.Context, pid peer.ID) bool {
|
func (css *Consensus) IsTrustedPeer(ctx context.Context, pid peer.ID) bool {
|
||||||
return true // TODO
|
if pid == css.host.ID() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
_, ok := css.trustedPeers.Load(pid)
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trust marks a peer as "trusted".
|
||||||
|
func (css *Consensus) Trust(ctx context.Context, pid peer.ID) error {
|
||||||
|
css.trustedPeers.Store(pid, struct{}{})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Distrust removes a peer from the "trusted" set.
|
||||||
|
func (css *Consensus) Distrust(ctx context.Context, pid peer.ID) error {
|
||||||
|
css.trustedPeers.Delete(pid)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LogPin adds a new pin to the shared state.
|
// LogPin adds a new pin to the shared state.
|
||||||
|
@ -334,7 +356,9 @@ func (css *Consensus) WaitForSync(ctx context.Context) error { return nil }
|
||||||
|
|
||||||
// AddPeer is a no-op as we do not need to do peerset management with
|
// AddPeer is a no-op as we do not need to do peerset management with
|
||||||
// Merkle-CRDTs. Therefore adding a peer to the peerset means doing nothing.
|
// Merkle-CRDTs. Therefore adding a peer to the peerset means doing nothing.
|
||||||
func (css *Consensus) AddPeer(ctx context.Context, pid peer.ID) error { return nil }
|
func (css *Consensus) AddPeer(ctx context.Context, pid peer.ID) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// RmPeer is a no-op which always errors, as, since we do not do peerset
|
// RmPeer is a no-op which always errors, as, since we do not do peerset
|
||||||
// management, we also have no ability to remove a peer from it.
|
// management, we also have no ability to remove a peer from it.
|
||||||
|
|
|
@ -211,6 +211,11 @@ func TestConsensusAddRmPeer(t *testing.T) {
|
||||||
t.Error("could not add peer:", err)
|
t.Error("could not add peer:", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = cc2.Trust(ctx, cc.host.ID())
|
||||||
|
if err != nil {
|
||||||
|
t.Error("could not trust peer:", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Make a pin on peer1 and check it arrived to peer2
|
// Make a pin on peer1 and check it arrived to peer2
|
||||||
err = cc.LogPin(ctx, testPin(test.Cid1))
|
err = cc.LogPin(ctx, testPin(test.Cid1))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -241,6 +241,12 @@ func (css *Consensus) IsTrustedPeer(ctx context.Context, p peer.ID) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Trust is a no-op.
|
||||||
|
func (css *Consensus) Trust(ctx context.Context, pid peer.ID) error { return nil }
|
||||||
|
|
||||||
|
// Distrust is a no-op.
|
||||||
|
func (css *Consensus) Distrust(ctx context.Context, pid peer.ID) error { return nil }
|
||||||
|
|
||||||
func (cc *Consensus) op(ctx context.Context, pin *api.Pin, t LogOpType) *LogOp {
|
func (cc *Consensus) op(ctx context.Context, pin *api.Pin, t LogOpType) *LogOp {
|
||||||
return &LogOp{
|
return &LogOp{
|
||||||
Cid: pin,
|
Cid: pin,
|
||||||
|
|
|
@ -58,6 +58,10 @@ type Consensus interface {
|
||||||
// non-trusted one. This should be fast as it will be
|
// non-trusted one. This should be fast as it will be
|
||||||
// called repeteadly for every remote RPC request.
|
// called repeteadly for every remote RPC request.
|
||||||
IsTrustedPeer(context.Context, peer.ID) bool
|
IsTrustedPeer(context.Context, peer.ID) bool
|
||||||
|
// Trust marks a peer as "trusted"
|
||||||
|
Trust(context.Context, peer.ID) error
|
||||||
|
// Distrust removes a peer from the "trusted" set
|
||||||
|
Distrust(context.Context, peer.ID) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// API is a component which offers an API for Cluster. This is
|
// API is a component which offers an API for Cluster. This is
|
||||||
|
|
Loading…
Reference in New Issue
Block a user