Merge pull request #1428 from iand/fix/dataraces

Fix data races
This commit is contained in:
Hector Sanjuan 2021-08-04 17:10:32 +02:00 committed by GitHub
commit 0c01079eca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 15 deletions

View File

@ -45,9 +45,7 @@ const (
maxAlerts = 1000 maxAlerts = 1000
) )
var ( var errFollowerMode = errors.New("this peer is configured to be in follower mode. Write operations are disabled")
errFollowerMode = errors.New("this peer is configured to be in follower mode. Write operations are disabled")
)
// Cluster is the main IPFS cluster component. It provides // Cluster is the main IPFS cluster component. It provides
// the go-API for it and orchestrates the components that make up the system. // the go-API for it and orchestrates the components that make up the system.
@ -322,7 +320,6 @@ func (c *Cluster) pushInformerMetrics(ctx context.Context, informer Informer) {
} }
metric, err := c.sendInformerMetric(ctx, informer) metric, err := c.sendInformerMetric(ctx, informer)
if err != nil { if err != nil {
if (retries % retryWarnMod) == 0 { if (retries % retryWarnMod) == 0 {
logger.Errorf("error broadcasting metric: %s", err) logger.Errorf("error broadcasting metric: %s", err)
@ -392,9 +389,8 @@ func (c *Cluster) pushPingMetrics(ctx context.Context) {
// Alerts returns the last alerts recorded by this cluster peer with the most // Alerts returns the last alerts recorded by this cluster peer with the most
// recent first. // recent first.
func (c *Cluster) Alerts() []api.Alert { func (c *Cluster) Alerts() []api.Alert {
alerts := make([]api.Alert, len(c.alerts))
c.alertsMux.Lock() c.alertsMux.Lock()
alerts := make([]api.Alert, len(c.alerts))
{ {
total := len(alerts) total := len(alerts)
for i, a := range c.alerts { for i, a := range c.alerts {
@ -833,7 +829,7 @@ func (c *Cluster) ID(ctx context.Context) *api.ID {
id := &api.ID{ id := &api.ID{
ID: c.id, ID: c.id,
//PublicKey: c.host.Peerstore().PubKey(c.id), // PublicKey: c.host.Peerstore().PubKey(c.id),
Addresses: addrs, Addresses: addrs,
ClusterPeers: peers, ClusterPeers: peers,
ClusterPeersAddresses: addresses, ClusterPeersAddresses: addresses,
@ -1148,7 +1144,6 @@ func (c *Cluster) localPinInfoOp(
} }
// return the last pInfo/err, should be the root Cid if everything ok // return the last pInfo/err, should be the root Cid if everything ok
return pInfo, err return pInfo, err
} }
// RecoverAll triggers a RecoverAllLocal operation on all peers. // RecoverAll triggers a RecoverAllLocal operation on all peers.

View File

@ -5,6 +5,7 @@ package disk
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api"
@ -29,7 +30,9 @@ var logger = logging.Logger("diskinfo")
// Informer is a simple object to implement the ipfscluster.Informer // Informer is a simple object to implement the ipfscluster.Informer
// and Component interfaces. // and Component interfaces.
type Informer struct { type Informer struct {
config *Config config *Config // set when created, readonly
mu sync.Mutex // guards access to following fields
rpcClient *rpc.Client rpcClient *rpc.Client
} }
@ -53,6 +56,8 @@ func (disk *Informer) Name() string {
// SetClient provides us with an rpc.Client which allows // SetClient provides us with an rpc.Client which allows
// contacting other components in the cluster. // contacting other components in the cluster.
func (disk *Informer) SetClient(c *rpc.Client) { func (disk *Informer) SetClient(c *rpc.Client) {
disk.mu.Lock()
defer disk.mu.Unlock()
disk.rpcClient = c disk.rpcClient = c
} }
@ -62,6 +67,9 @@ func (disk *Informer) Shutdown(ctx context.Context) error {
_, span := trace.StartSpan(ctx, "informer/disk/Shutdown") _, span := trace.StartSpan(ctx, "informer/disk/Shutdown")
defer span.End() defer span.End()
disk.mu.Lock()
defer disk.mu.Unlock()
disk.rpcClient = nil disk.rpcClient = nil
return nil return nil
} }
@ -72,7 +80,11 @@ func (disk *Informer) GetMetric(ctx context.Context) *api.Metric {
ctx, span := trace.StartSpan(ctx, "informer/disk/GetMetric") ctx, span := trace.StartSpan(ctx, "informer/disk/GetMetric")
defer span.End() defer span.End()
if disk.rpcClient == nil { disk.mu.Lock()
rpcClient := disk.rpcClient
disk.mu.Unlock()
if rpcClient == nil {
return &api.Metric{ return &api.Metric{
Name: disk.Name(), Name: disk.Name(),
Valid: false, Valid: false,
@ -84,7 +96,7 @@ func (disk *Informer) GetMetric(ctx context.Context) *api.Metric {
valid := true valid := true
err := disk.rpcClient.CallContext( err := rpcClient.CallContext(
ctx, ctx,
"", "",
"IPFSConnector", "IPFSConnector",

View File

@ -40,9 +40,11 @@ type IpfsMock struct {
Port int Port int
pinMap state.State pinMap state.State
BlockStore map[string][]byte BlockStore map[string][]byte
reqCounts map[string]int
reqCounter chan string reqCounter chan string
reqCountsMux sync.Mutex // guards access to reqCounts
reqCounts map[string]int
closeMux sync.Mutex closeMux sync.Mutex
closed bool closed bool
} }
@ -142,18 +144,20 @@ func NewIpfsMock(t *testing.T) *IpfsMock {
m.Port = i m.Port = i
m.Addr = h[0] m.Addr = h[0]
return m return m
} }
func (m *IpfsMock) countRequests() { func (m *IpfsMock) countRequests() {
for str := range m.reqCounter { for str := range m.reqCounter {
m.reqCountsMux.Lock()
m.reqCounts[str]++ m.reqCounts[str]++
m.reqCountsMux.Unlock()
} }
} }
// GetCount allows to get the number of times and endpoint was called. // GetCount allows to get the number of times and endpoint was called.
// Do not use concurrently to requests happening.
func (m *IpfsMock) GetCount(path string) int { func (m *IpfsMock) GetCount(path string) int {
m.reqCountsMux.Lock()
defer m.reqCountsMux.Unlock()
return m.reqCounts[path] return m.reqCounts[path]
} }
@ -432,7 +436,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
resp := mockRepoStatResp{ resp := mockRepoStatResp{
RepoSize: uint64(len) * 1000, RepoSize: uint64(len) * 1000,
NumObjects: numObjs, NumObjects: numObjs,
StorageMax: 10000000000, //10 GB StorageMax: 10000000000, // 10 GB
} }
j, _ := json.Marshal(resp) j, _ := json.Marshal(resp)
w.Write(j) w.Write(j)