Merge pull request #1768 from ipfs-cluster/fix/bad-context-propagation

Fix bad context propagation / deadlocks
Hector Sanjuan 2022-09-26 19:36:13 +02:00 committed by GitHub
commit 048d168126
6 changed files with 46 additions and 78 deletions
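
The recurring fix in this diff: nearly every method started a span with trace.StartSpan, discarded the context it returns, and then rebuilt one on top of the component's long-lived root context (c.ctx / cc.ctx / ipfs.ctx / opt.ctx), which detached the work from the caller's cancellation and deadlines. The sketch below reproduces the two patterns for illustration only; it is not code from this commit, and the service, statusOld, statusNew and wait names are made up.

package main

import (
	"context"
	"fmt"
	"time"

	"go.opencensus.io/trace"
)

// service stands in for a component such as Cluster: it owns a long-lived
// root context (c.ctx in the real code) that only ends at shutdown.
type service struct {
	ctx context.Context
}

// statusOld mirrors the pattern this commit removes: the context returned by
// StartSpan is thrown away and the caller's ctx is replaced by one derived
// from the component's root context, so canceling the request has no effect
// on the work below.
func (s *service) statusOld(ctx context.Context) error {
	_, span := trace.StartSpan(ctx, "cluster/Status")
	defer span.End()
	ctx = trace.NewContext(s.ctx, span) // drops the caller's deadline/cancellation
	return wait(ctx)
}

// statusNew mirrors the pattern this commit introduces: keep using the ctx
// returned by StartSpan, which still derives from the caller's ctx and
// already carries the span.
func (s *service) statusNew(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "cluster/Status")
	defer span.End()
	return wait(ctx)
}

// wait blocks until ctx is done, standing in for an RPC or tracker call.
func wait(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(5 * time.Second):
		return fmt.Errorf("request was never canceled")
	}
}

func main() {
	s := &service{ctx: context.Background()}

	// The caller gives up after 100ms.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	fmt.Println("new pattern:", s.statusNew(ctx)) // returns promptly: context deadline exceeded
	fmt.Println("old pattern:", s.statusOld(ctx)) // keeps running for the full 5 seconds
}

With the old pattern the only context that could stop an operation was the component's own, which ends only at shutdown; abandoned requests therefore kept running, which is the sort of stuck-operation behavior the "deadlocks" in the commit title refers to.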

View File

@@ -82,7 +82,6 @@ func (c *defaultClient) doRequest(
r.ContentLength = -1 // this lets go use "chunked".
}
- ctx = trace.NewContext(ctx, span)
r = r.WithContext(ctx)
return c.client.Do(r)

View File

@@ -86,7 +86,7 @@ type Cluster struct {
paMux sync.Mutex
// shutdown function and related variables
- shutdownLock sync.Mutex
+ shutdownLock sync.RWMutex
shutdownB bool
removed bool
@@ -778,9 +778,8 @@ func (c *Cluster) Ready() <-chan struct{} {
// Shutdown does not close the libp2p host, the DHT, the datastore or
// generally anything that Cluster did not create.
func (c *Cluster) Shutdown(ctx context.Context) error {
- _, span := trace.StartSpan(ctx, "cluster/Shutdown")
+ ctx, span := trace.StartSpan(ctx, "cluster/Shutdown")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
@@ -889,9 +888,8 @@ func (c *Cluster) Done() <-chan struct{} {
// ID returns information about the Cluster peer
func (c *Cluster) ID(ctx context.Context) api.ID {
- _, span := trace.StartSpan(ctx, "cluster/ID")
+ ctx, span := trace.StartSpan(ctx, "cluster/ID")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
// ignore error since it is included in response object
ipfsID, err := c.ipfs.ID(ctx)
@@ -957,9 +955,14 @@ func (c *Cluster) ID(ctx context.Context) api.ID {
// The new peer ID will be passed to the consensus
// component to be added to the peerset.
func (c *Cluster) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, error) {
- _, span := trace.StartSpan(ctx, "cluster/PeerAdd")
+ ctx, span := trace.StartSpan(ctx, "cluster/PeerAdd")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
+ c.shutdownLock.RLock()
+ defer c.shutdownLock.RUnlock()
+ if c.shutdownB {
+ return nil, errors.New("cluster is shutdown")
+ }
// starting 10 nodes on the same box for testing
// causes deadlock and a global lock here
@@ -992,9 +995,8 @@ func (c *Cluster) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, error) {
// The peer will be removed from the consensus peerset.
// This may first trigger repinnings for all content if not disabled.
func (c *Cluster) PeerRemove(ctx context.Context, pid peer.ID) error {
- _, span := trace.StartSpan(ctx, "cluster/PeerRemove")
+ ctx, span := trace.StartSpan(ctx, "cluster/PeerRemove")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
// We need to repin before removing the peer, otherwise, it won't
// be able to submit the pins.
@@ -1015,9 +1017,8 @@ func (c *Cluster) PeerRemove(ctx context.Context, pid peer.ID) error {
// cluster and making sure that the new peer is ready to discover and contact
// the rest.
func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
- _, span := trace.StartSpan(ctx, "cluster/Join")
+ ctx, span := trace.StartSpan(ctx, "cluster/Join")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
logger.Debugf("Join(%s)", addr)
@@ -1100,7 +1101,7 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
// ConnectSwarms in the background after a while, when we have likely
// received some metrics.
time.AfterFunc(c.config.MonitorPingInterval, func() {
- c.ipfs.ConnectSwarms(ctx)
+ c.ipfs.ConnectSwarms(c.ctx)
})
// wait for leader and for state to catch up
@@ -1118,7 +1119,7 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
for range out {
}
}()
- go c.RecoverAllLocal(ctx, out)
+ go c.RecoverAllLocal(c.ctx, out)
logger.Infof("%s: joined %s's cluster", c.id.Pretty(), pid.Pretty())
return nil
@@ -1146,11 +1147,10 @@ func (c *Cluster) distances(ctx context.Context, exclude peer.ID) (*distanceChec
// - Sends unpin for expired items for which this peer is "closest"
// (skipped for follower peers)
func (c *Cluster) StateSync(ctx context.Context) error {
- _, span := trace.StartSpan(ctx, "cluster/StateSync")
+ ctx, span := trace.StartSpan(ctx, "cluster/StateSync")
defer span.End()
- logger.Debug("StateSync")
- ctx = trace.NewContext(c.ctx, span)
+ logger.Debug("StateSync")
if c.config.FollowerMode {
return nil
@@ -1198,9 +1198,8 @@ func (c *Cluster) StateSync(ctx context.Context) error {
// an error happens, it is returned. This method blocks until it finishes. The
// operation can be aborted by canceling the context.
func (c *Cluster) StatusAll(ctx context.Context, filter api.TrackerStatus, out chan<- api.GlobalPinInfo) error {
- _, span := trace.StartSpan(ctx, "cluster/StatusAll")
+ ctx, span := trace.StartSpan(ctx, "cluster/StatusAll")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
in := make(chan api.TrackerStatus, 1)
in <- filter
@@ -1211,9 +1210,8 @@ func (c *Cluster) StatusAll(ctx context.Context, filter api.TrackerStatus, out c
// StatusAllLocal returns the PinInfo for all the tracked Cids in this peer on
// the out channel. It blocks until finished.
func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus, out chan<- api.PinInfo) error {
- _, span := trace.StartSpan(ctx, "cluster/StatusAllLocal")
+ ctx, span := trace.StartSpan(ctx, "cluster/StatusAllLocal")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
return c.tracker.StatusAll(ctx, filter, out)
}
@@ -1222,18 +1220,16 @@ func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus,
// current peers. If an error happens, the GlobalPinInfo should contain
// as much information as could be fetched from the other peers.
func (c *Cluster) Status(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) {
- _, span := trace.StartSpan(ctx, "cluster/Status")
+ ctx, span := trace.StartSpan(ctx, "cluster/Status")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
return c.globalPinInfoCid(ctx, "PinTracker", "Status", h)
}
// StatusLocal returns this peer's PinInfo for a given Cid.
func (c *Cluster) StatusLocal(ctx context.Context, h api.Cid) api.PinInfo {
_, span := trace.StartSpan(ctx, "cluster/StatusLocal")
ctx, span := trace.StartSpan(ctx, "cluster/StatusLocal")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.tracker.Status(ctx, h)
}
@@ -1268,9 +1264,8 @@ func (c *Cluster) localPinInfoOp(
// GlobalPinInfo objects for all recovered items. This method blocks until
// finished. Operation can be aborted by canceling the context.
func (c *Cluster) RecoverAll(ctx context.Context, out chan<- api.GlobalPinInfo) error {
- _, span := trace.StartSpan(ctx, "cluster/RecoverAll")
+ ctx, span := trace.StartSpan(ctx, "cluster/RecoverAll")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
return c.globalPinInfoStream(ctx, "Cluster", "RecoverAllLocal", nil, out)
}
@@ -1287,9 +1282,8 @@ func (c *Cluster) RecoverAll(ctx context.Context, out chan<- api.GlobalPinInfo)
//
// RecoverAllLocal is called automatically every PinRecoverInterval.
func (c *Cluster) RecoverAllLocal(ctx context.Context, out chan<- api.PinInfo) error {
- _, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")
+ ctx, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
return c.tracker.RecoverAll(ctx, out)
}
@@ -1301,9 +1295,8 @@ func (c *Cluster) RecoverAllLocal(ctx context.Context, out chan<- api.PinInfo) e
// is faster than calling Pin on the same CID as it avoids committing an
// identical pin to the consensus layer.
func (c *Cluster) Recover(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) {
- _, span := trace.StartSpan(ctx, "cluster/Recover")
+ ctx, span := trace.StartSpan(ctx, "cluster/Recover")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
return c.globalPinInfoCid(ctx, "PinTracker", "Recover", h)
}
@@ -1315,9 +1308,8 @@ func (c *Cluster) Recover(ctx context.Context, h api.Cid) (api.GlobalPinInfo, er
// is faster than calling Pin on the same CID as it avoids committing an
// identical pin to the consensus layer.
func (c *Cluster) RecoverLocal(ctx context.Context, h api.Cid) (api.PinInfo, error) {
- _, span := trace.StartSpan(ctx, "cluster/RecoverLocal")
+ ctx, span := trace.StartSpan(ctx, "cluster/RecoverLocal")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
return c.localPinInfoOp(ctx, h, c.tracker.Recover)
}
@@ -1330,9 +1322,8 @@ func (c *Cluster) RecoverLocal(ctx context.Context, h api.Cid) (api.PinInfo, err
// The operation can be aborted by canceling the context. This method blocks
// until the operation has completed.
func (c *Cluster) Pins(ctx context.Context, out chan<- api.Pin) error {
- _, span := trace.StartSpan(ctx, "cluster/Pins")
+ ctx, span := trace.StartSpan(ctx, "cluster/Pins")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
cState, err := c.consensus.State(ctx)
if err != nil {
@@ -1370,9 +1361,8 @@ func (c *Cluster) pinsSlice(ctx context.Context) ([]api.Pin, error) {
// the item is successfully pinned. For that, use Status(). PinGet
// returns an error if the given Cid is not part of the global state.
func (c *Cluster) PinGet(ctx context.Context, h api.Cid) (api.Pin, error) {
- _, span := trace.StartSpan(ctx, "cluster/PinGet")
+ ctx, span := trace.StartSpan(ctx, "cluster/PinGet")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
st, err := c.consensus.State(ctx)
if err != nil {
@@ -1407,10 +1397,9 @@ func (c *Cluster) PinGet(ctx context.Context, h api.Cid) (api.Pin, error) {
// If the Update option is set, the pin options (including allocations) will
// be copied from an existing one. This is equivalent to running PinUpdate.
func (c *Cluster) Pin(ctx context.Context, h api.Cid, opts api.PinOptions) (api.Pin, error) {
- _, span := trace.StartSpan(ctx, "cluster/Pin")
+ ctx, span := trace.StartSpan(ctx, "cluster/Pin")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
pin := api.PinWithOpts(h, opts)
result, _, err := c.pin(ctx, pin, []peer.ID{})
@@ -1622,9 +1611,8 @@ func (c *Cluster) pin(
// Unpin does not reflect the success or failure of underlying IPFS daemon
// unpinning operations, which happen in async fashion.
func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) {
- _, span := trace.StartSpan(ctx, "cluster/Unpin")
+ ctx, span := trace.StartSpan(ctx, "cluster/Unpin")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
if c.config.FollowerMode {
return api.Pin{}, errFollowerMode
@@ -1644,7 +1632,7 @@ func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) {
return pin, errors.New(err)
case api.MetaType:
// Unpin cluster dag and referenced shards
- err := c.unpinClusterDag(pin)
+ err := c.unpinClusterDag(ctx, pin)
if err != nil {
return pin, err
}
@@ -1661,10 +1649,7 @@ func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) {
// nodes that it references. It handles the case where multiple parents
// reference the same metadata node, only unpinning those nodes without
// existing references
- func (c *Cluster) unpinClusterDag(metaPin api.Pin) error {
- ctx, span := trace.StartSpan(c.ctx, "cluster/unpinClusterDag")
- defer span.End()
+ func (c *Cluster) unpinClusterDag(ctx context.Context, metaPin api.Pin) error {
cids, err := c.cidsFromMetaPin(ctx, metaPin.Cid)
if err != nil {
return err
@@ -1718,10 +1703,9 @@ func (c *Cluster) PinUpdate(ctx context.Context, from api.Cid, to api.Cid, opts
// PinPath pins a CID resolved from its IPFS Path. It returns the resolved
// Pin object.
func (c *Cluster) PinPath(ctx context.Context, path string, opts api.PinOptions) (api.Pin, error) {
- _, span := trace.StartSpan(ctx, "cluster/PinPath")
+ ctx, span := trace.StartSpan(ctx, "cluster/PinPath")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
ci, err := c.ipfs.Resolve(ctx, path)
if err != nil {
return api.Pin{}, err
@@ -1733,10 +1717,9 @@ func (c *Cluster) PinPath(ctx context.Context, path string, opts api.PinOptions)
// UnpinPath unpins a CID resolved from its IPFS Path. It returns the
// previously pinned Pin object.
func (c *Cluster) UnpinPath(ctx context.Context, path string) (api.Pin, error) {
- _, span := trace.StartSpan(ctx, "cluster/UnpinPath")
+ ctx, span := trace.StartSpan(ctx, "cluster/UnpinPath")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
ci, err := c.ipfs.Resolve(ctx, path)
if err != nil {
return api.Pin{}, err
@@ -1771,9 +1754,8 @@ func (c *Cluster) Version() string {
// Peers returns the IDs of the members of this Cluster on the out channel.
// This method blocks until it has finished.
func (c *Cluster) Peers(ctx context.Context, out chan<- api.ID) {
- _, span := trace.StartSpan(ctx, "cluster/Peers")
+ ctx, span := trace.StartSpan(ctx, "cluster/Peers")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
peers, err := c.consensus.Peers(ctx)
if err != nil {
@@ -2251,9 +2233,8 @@ func (c *Cluster) cidsFromMetaPin(ctx context.Context, h api.Cid) ([]api.Cid, er
// RepoGC performs garbage collection sweep on all peers' IPFS repo.
func (c *Cluster) RepoGC(ctx context.Context) (api.GlobalRepoGC, error) {
- _, span := trace.StartSpan(ctx, "cluster/RepoGC")
+ ctx, span := trace.StartSpan(ctx, "cluster/RepoGC")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
members, err := c.consensus.Peers(ctx)
if err != nil {
@@ -2286,7 +2267,7 @@ func (c *Cluster) RepoGC(ctx context.Context) (api.GlobalRepoGC, error) {
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, member, err)
- pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, member))
+ pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, member))
globalRepoGC.PeerMap[member.String()] = api.RepoGC{
Peer: member,
@@ -2301,9 +2282,8 @@ func (c *Cluster) RepoGC(ctx context.Context) (api.GlobalRepoGC, error) {
// RepoGCLocal performs garbage collection only on the local IPFS daemon.
func (c *Cluster) RepoGCLocal(ctx context.Context) (api.RepoGC, error) {
- _, span := trace.StartSpan(ctx, "cluster/RepoGCLocal")
+ ctx, span := trace.StartSpan(ctx, "cluster/RepoGCLocal")
defer span.End()
- ctx = trace.NewContext(c.ctx, span)
resp, err := c.ipfs.RepoGC(ctx)
if err != nil {
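
Two cluster.go changes above go beyond the span fix: background work started from Join (ConnectSwarms, RecoverAllLocal) now runs on the cluster's own c.ctx, since the joining request's context may be gone by the time it fires, and shutdownLock becomes a sync.RWMutex so PeerAdd can take the cheap read side and refuse work once shutdown has started. A minimal sketch of that guard, with illustrative names rather than the repository's API:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// node stands in for Cluster; the field names mirror the diff above but the
// type and its methods are hypothetical.
type node struct {
	shutdownLock sync.RWMutex
	shutdownB    bool
}

// peerAdd takes the read lock, so concurrent API calls do not serialize on
// it, and bails out early if the peer is already shutting down.
func (n *node) peerAdd(pid string) error {
	n.shutdownLock.RLock()
	defer n.shutdownLock.RUnlock()
	if n.shutdownB {
		return errors.New("cluster is shutdown")
	}
	// ... contact the new peer, add it to the consensus peerset, etc.
	return nil
}

// shutdown takes the write lock and flips the flag.
func (n *node) shutdown() {
	n.shutdownLock.Lock()
	defer n.shutdownLock.Unlock()
	n.shutdownB = true
}

func main() {
	n := &node{}
	fmt.Println(n.peerAdd("peer1")) // <nil>
	n.shutdown()
	fmt.Println(n.peerAdd("peer1")) // cluster is shutdown
}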

View File

@@ -251,8 +251,8 @@ func (cc *Consensus) op(ctx context.Context, pin api.Pin, t LogOpType) *LogOp {
// returns true if the operation was redirected to the leader
// note that if the leader just disappeared, the rpc call will
// fail because we haven't heard that it's gone.
- func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, error) {
- ctx, span := trace.StartSpan(cc.ctx, "consensus/redirectToLeader")
+ func (cc *Consensus) redirectToLeader(ctx context.Context, method string, arg interface{}) (bool, error) {
+ ctx, span := trace.StartSpan(ctx, "consensus/redirectToLeader")
defer span.End()
var finalErr error
@@ -336,7 +336,7 @@ func (cc *Consensus) commit(ctx context.Context, op *LogOp, rpcOp string, redire
// try to send it to the leader
// redirectToLeader has its own retry loop. If this fails
// we're done here.
- ok, err := cc.redirectToLeader(rpcOp, redirectArg)
+ ok, err := cc.redirectToLeader(ctx, rpcOp, redirectArg)
if err != nil || ok {
return err
}
@@ -404,7 +404,7 @@ func (cc *Consensus) AddPeer(ctx context.Context, pid peer.ID) error {
if finalErr != nil {
logger.Errorf("retrying to add peer. Attempt #%d failed: %s", i, finalErr)
}
- ok, err := cc.redirectToLeader("AddPeer", pid)
+ ok, err := cc.redirectToLeader(ctx, "AddPeer", pid)
if err != nil || ok {
return err
}
@@ -435,7 +435,7 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
if finalErr != nil {
logger.Errorf("retrying to remove peer. Attempt #%d failed: %s", i, finalErr)
}
- ok, err := cc.redirectToLeader("RmPeer", pid)
+ ok, err := cc.redirectToLeader(ctx, "RmPeer", pid)
if err != nil || ok {
return err
}
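
The consensus changes are the mirror image of the cluster.go ones: instead of reading a context stored on the struct (cc.ctx), redirectToLeader now receives the caller's ctx as a parameter and every call site threads it through, so canceling the original request also cancels the redirect attempt. A hedged sketch of the shape of that change, with hypothetical names and a made-up operation string:

package main

import (
	"context"
	"fmt"
)

type consensus struct {
	ctx context.Context // long-lived; reserved for background work, not requests
}

// Before: redirectToLeader(method string, ...) used cc.ctx internally, so a
// canceled request could not stop the retry loop. After: the caller's ctx is
// an explicit argument on every hop of the call chain.
func (cc *consensus) redirectToLeader(ctx context.Context, method string) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // give up as soon as the caller does
	default:
	}
	// ... RPC to the current leader would happen here.
	return nil
}

func (cc *consensus) commit(ctx context.Context, rpcOp string) error {
	return cc.redirectToLeader(ctx, rpcOp)
}

func main() {
	cc := &consensus{ctx: context.Background()}

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the request is already abandoned

	fmt.Println(cc.commit(ctx, "SomeOp")) // context canceled
}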

View File

@@ -1198,9 +1198,8 @@ func (ipfs *Connector) shouldUpdateMetric() bool {
// Trigger a broadcast of the local informer metrics.
func (ipfs *Connector) updateInformerMetric(ctx context.Context) error {
- _, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/updateInformerMetric")
+ ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/updateInformerMetric")
defer span.End()
- ctx = trace.NewContext(ipfs.ctx, span)
if !ipfs.shouldUpdateMetric() {
return nil

View File

@@ -123,9 +123,7 @@ func (op *Operation) Context() context.Context {
// Cancel will cancel the context associated to this operation.
func (op *Operation) Cancel() {
- _, span := trace.StartSpan(op.ctx, "optracker/Cancel")
op.cancel()
- span.End()
}
// Phase returns the Phase.
@@ -141,7 +139,6 @@ func (op *Operation) Phase() Phase {
// SetPhase changes the Phase and updates the timestamp.
func (op *Operation) SetPhase(ph Phase) {
- _, span := trace.StartSpan(op.ctx, "optracker/SetPhase")
op.mu.Lock()
{
op.tracker.recordMetricUnsafe(op, -1)
@@ -150,8 +147,6 @@ func (op *Operation) SetPhase(ph Phase) {
op.tracker.recordMetricUnsafe(op, 1)
}
op.mu.Unlock()
- span.End()
}
// AttemptCount returns the number of times that this operation has been in
@@ -201,7 +196,6 @@ func (op *Operation) Error() string {
// SetError sets the phase to PhaseError along with
// an error message. It updates the timestamp.
func (op *Operation) SetError(err error) {
- _, span := trace.StartSpan(op.ctx, "optracker/SetError")
op.mu.Lock()
{
op.tracker.recordMetricUnsafe(op, -1)
@@ -211,7 +205,6 @@ func (op *Operation) SetError(err error) {
op.tracker.recordMetricUnsafe(op, 1)
}
op.mu.Unlock()
- span.End()
}
// Type returns the operation Type.
@@ -237,9 +230,6 @@ func (op *Operation) Timestamp() time.Time {
// Canceled returns whether the context for this
// operation has been canceled.
func (op *Operation) Canceled() bool {
- ctx, span := trace.StartSpan(op.ctx, "optracker/Canceled")
- _ = ctx
- defer span.End()
select {
case <-op.ctx.Done():
return true

View File

@@ -76,8 +76,7 @@ func NewOperationTracker(ctx context.Context, pid peer.ID, peerName string) *Ope
// If an operation exists and is of a different type, it is
// canceled and the new one replaces it in the tracker.
func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin, typ OperationType, ph Phase) *Operation {
- ctx = trace.NewContext(opt.ctx, trace.FromContext(ctx))
- ctx, span := trace.StartSpan(ctx, "optracker/TrackNewOperation")
+ _, span := trace.StartSpan(ctx, "optracker/TrackNewOperation")
defer span.End()
opt.mu.Lock()
@@ -95,8 +94,9 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin,
op.tracker.recordMetric(op, -1)
op.Cancel() // cancel ongoing operation and replace it
}
- op2 := newOperation(ctx, pin, typ, ph, opt)
+ // IMPORTANT: the operations must have the OperationTracker context,
+ // as otherwise their context would be canceled after being added.
+ op2 := newOperation(opt.ctx, pin, typ, ph, opt)
if ok && op.Type() == typ {
// Carry over the attempt count when doing an operation of the
// same type. The old operation exists and was canceled.
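
The comment added in the last hunk states the ownership rule behind the optracker changes: a tracked operation outlives the request that created it, so its context must derive from the tracker's long-lived opt.ctx, not from the request ctx, or it would be canceled the moment the request returns. A small illustrative sketch of that rule (hypothetical types, not the tracker's real API):

package main

import (
	"context"
	"fmt"
)

// operation is a long-lived piece of work registered with a tracker.
type operation struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// tracker owns a context that lives as long as the tracker itself.
type tracker struct {
	ctx context.Context
}

// trackNew derives the operation's context from the tracker, not from the
// request that asked for it; the request ctx remains useful only for tracing
// the call itself.
func (t *tracker) trackNew(reqCtx context.Context) *operation {
	_ = reqCtx
	ctx, cancel := context.WithCancel(t.ctx)
	return &operation{ctx: ctx, cancel: cancel}
}

func main() {
	t := &tracker{ctx: context.Background()}

	reqCtx, reqCancel := context.WithCancel(context.Background())
	op := t.trackNew(reqCtx)
	reqCancel() // the request that created the operation has finished

	// The operation is still alive; only op.cancel() or the tracker's own
	// context ending can stop it.
	fmt.Println("operation already canceled?", op.ctx.Err() != nil) // false
}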