crdt: Allow to configure CRDT in "TrustAll" mode

Specifying "*" as part of "trusted_peers" in the configuration will
result in trusting all peers.

This is useful for private clusters where we don't want to list every
peer ID in the config.
This commit is contained in:
Hector Sanjuan 2019-06-10 13:35:25 +02:00
parent cded46f377
commit b349aacc83
6 changed files with 38 additions and 43 deletions

View File

@ -58,7 +58,7 @@ var testingRaftCfg = []byte(`{
var testingCrdtCfg = []byte(`{
"cluster_name": "crdt-test",
"trusted_peers": [],
"trusted_peers": ["*"],
"rebroadcast_interval": "150ms"
}`)

View File

@ -34,6 +34,10 @@ type Config struct {
// The topic we wish to subscribe to
ClusterName string
// TrustAll specifies whether we should trust all peers regardless of
// the TrustedPeers contents.
TrustAll bool
// Any update received from a peer outside this set is ignored and not
// forwarded. Trusted peers can also access additional RPC endpoints
// for this peer that are forbidden for other peers.
@ -100,9 +104,17 @@ func (cfg *Config) LoadJSON(raw []byte) error {
func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
cfg.ClusterName = jcfg.ClusterName
cfg.TrustedPeers = api.StringsToPeers(jcfg.TrustedPeers)
if len(cfg.TrustedPeers) != len(jcfg.TrustedPeers) {
return errors.New("error parsing some peer IDs crdt.trusted_peers")
for _, p := range jcfg.TrustedPeers {
if p == "*" {
cfg.TrustAll = true
cfg.TrustedPeers = []peer.ID{}
break
}
pid, err := peer.IDB58Decode(p)
if err != nil {
return fmt.Errorf("error parsing trusted peers: %s", err)
}
cfg.TrustedPeers = append(cfg.TrustedPeers, pid)
}
config.SetIfNotDefault(jcfg.PeersetMetric, &cfg.PeersetMetric)
@ -153,6 +165,7 @@ func (cfg *Config) Default() error {
cfg.PeersetMetric = DefaultPeersetMetric
cfg.DatastoreNamespace = DefaultDatastoreNamespace
cfg.TrustedPeers = DefaultTrustedPeers
cfg.TrustAll = false
return nil
}

View File

@ -18,6 +18,9 @@ func TestLoadJSON(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if cfg.TrustAll {
t.Error("expected TrustAll to be false")
}
cfg = &Config{}
err = cfg.LoadJSON([]byte(`
@ -29,6 +32,19 @@ func TestLoadJSON(t *testing.T) {
if err == nil {
t.Fatal("expected error parsing trusted_peers")
}
cfg = &Config{}
err = cfg.LoadJSON([]byte(`
{
"cluster_name": "test",
"trusted_peers": ["QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6", "*"]
}`))
if err != nil {
t.Fatal(err)
}
if !cfg.TrustAll {
t.Error("expected TrustAll to be true")
}
}
func TestToJSON(t *testing.T) {

View File

@ -308,9 +308,14 @@ func (css *Consensus) IsTrustedPeer(ctx context.Context, pid peer.ID) bool {
ctx, span := trace.StartSpan(ctx, "consensus/IsTrustedPeer")
defer span.End()
if css.config.TrustAll {
return true
}
if pid == css.host.ID() {
return true
}
_, ok := css.trustedPeers.Load(pid)
return ok
}

View File

@ -339,13 +339,6 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
// Start the rest and join
for i := 1; i < nClusters; i++ {
clusters[i] = createCluster(t, hosts[i], dhts[i], cfgs[i], stores[i], cons[i], apis[i], ipfss[i], trackers[i], mons[i], allocs[i], infs[i], tracers[i])
for j := 0; j < i; j++ {
// all previous clusters trust the new one
clusters[j].consensus.Trust(ctx, hosts[i].ID())
// new cluster trusts all the previous
clusters[i].consensus.Trust(ctx, hosts[j].ID())
}
err := clusters[i].Join(ctx, bootstrapAddr)
if err != nil {
logger.Error(err)

View File

@ -101,15 +101,6 @@ func TestClustersPeerAdd(t *testing.T) {
t.Log(i, id.ClusterPeers)
t.Fatal("cluster peers should be up to date with the cluster")
}
for j := 0; j < i; j++ {
if err := clusters[j].consensus.Trust(ctx, clusters[i].id); err != nil {
t.Fatal(err)
}
if err := clusters[i].consensus.Trust(ctx, clusters[j].id); err != nil {
t.Fatal(err)
}
}
}
h := test.Cid1
@ -523,15 +514,6 @@ func TestClustersPeerJoin(t *testing.T) {
}
for i := 1; i < len(clusters); i++ {
for j := 0; j < i; j++ {
if err := clusters[j].consensus.Trust(ctx, clusters[i].id); err != nil {
t.Fatal(err)
}
if err := clusters[i].consensus.Trust(ctx, clusters[j].id); err != nil {
t.Fatal(err)
}
}
err := clusters[i].Join(ctx, clusterAddr(clusters[0]))
if err != nil {
t.Fatal(err)
@ -578,10 +560,6 @@ func TestClustersPeerJoinAllAtOnce(t *testing.T) {
}
f := func(t *testing.T, c *Cluster) {
if err := c.consensus.Trust(ctx, clusters[0].id); err != nil {
t.Fatal(err)
}
err := c.Join(ctx, clusterAddr(clusters[0]))
if err != nil {
t.Fatal(err)
@ -674,15 +652,6 @@ func TestClustersPeerRejoin(t *testing.T) {
// add all clusters
for i := 1; i < len(clusters); i++ {
for j := 0; j < i; j++ {
if err := clusters[j].consensus.Trust(ctx, clusters[i].id); err != nil {
t.Fatal(err)
}
if err := clusters[i].consensus.Trust(ctx, clusters[j].id); err != nil {
t.Fatal(err)
}
}
err := clusters[i].Join(ctx, clusterAddr(clusters[0]))
if err != nil {
t.Fatal(err)
@ -730,7 +699,6 @@ func TestClustersPeerRejoin(t *testing.T) {
delay()
c0.consensus.Trust(ctx, clusters[1].id)
err = c0.Join(ctx, clusterAddr(clusters[1]))
if err != nil {
t.Fatal(err)