Fix bad context propagation / deadlocks

We are propagating the wrong context (mostly from the Cluster top-level
methods). As a result, request cancellations (and cancellations of the
associated contexts) are not propagated to many methods, which can result in
deadlocks when an operation that is holding a lock is never aborted.
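
As a minimal sketch of the pattern (Status is just an example, doWork stands
in for whatever the method calls next, and only the opencensus trace calls
used throughout the diff are assumed):

    package sketch

    import (
        "context"

        "go.opencensus.io/trace"
    )

    type Cluster struct{ ctx context.Context }

    func doWork(ctx context.Context) { /* component calls go here */ }

    // Broken: the span is opened on the request ctx, but the context handed
    // to callees is rebuilt from the long-lived c.ctx, so cancelling the
    // request cancels nothing downstream.
    func (c *Cluster) statusBroken(ctx context.Context) {
        _, span := trace.StartSpan(ctx, "cluster/Status")
        defer span.End()
        ctx = trace.NewContext(c.ctx, span)
        doWork(ctx)
    }

    // Fixed: keep the request context, so its cancellation propagates.
    func (c *Cluster) statusFixed(ctx context.Context) {
        ctx, span := trace.StartSpan(ctx, "cluster/Status")
        defer span.End()
        doWork(ctx)
    }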

This affects, for example, the operation tracker. Getting all operations from
the tracker relies on someone reading from the out channel, or on the context
being cancelled. When a request is aborted in the middle of the response and
its context is not cancelled, anything that wants to list operations becomes
deadlocked, including operations that need write locks, like
TrackNewOperation.
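
A simplified sketch of that situation (field and method names here are
illustrative, not the exact tracker code):

    package sketch

    import (
        "context"
        "sync"
    )

    type Operation struct{ /* ... */ }

    type OperationTracker struct {
        mu         sync.RWMutex
        operations map[string]*Operation
    }

    // listOps streams every tracked operation while holding the read lock.
    // It only makes progress if the consumer drains `out` or ctx is
    // cancelled. If a request is aborted but its context is never
    // cancelled, the send blocks forever with the lock held, and writers
    // such as TrackNewOperation (which need the write lock) deadlock
    // behind it.
    func (opt *OperationTracker) listOps(ctx context.Context, out chan<- *Operation) error {
        opt.mu.RLock()
        defer opt.mu.RUnlock()
        for _, op := range opt.operations {
            select {
            case out <- op:
            case <-ctx.Done():
                return ctx.Err()
            }
        }
        return nil
    }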

This commit fixes that.
Hector Sanjuan 2022-09-21 17:31:33 +02:00
parent 328f2388d0
commit 21855c3130
6 changed files with 46 additions and 78 deletions

View File

@ -82,7 +82,6 @@ func (c *defaultClient) doRequest(
r.ContentLength = -1 // this lets go use "chunked".
}
ctx = trace.NewContext(ctx, span)
r = r.WithContext(ctx)
return c.client.Do(r)

View File

@ -86,7 +86,7 @@ type Cluster struct {
paMux sync.Mutex
// shutdown function and related variables
shutdownLock sync.Mutex
shutdownLock sync.RWMutex
shutdownB bool
removed bool
@ -778,9 +778,8 @@ func (c *Cluster) Ready() <-chan struct{} {
// Shutdown does not close the libp2p host, the DHT, the datastore or
// generally anything that Cluster did not create.
func (c *Cluster) Shutdown(ctx context.Context) error {
_, span := trace.StartSpan(ctx, "cluster/Shutdown")
ctx, span := trace.StartSpan(ctx, "cluster/Shutdown")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
@ -889,9 +888,8 @@ func (c *Cluster) Done() <-chan struct{} {
// ID returns information about the Cluster peer
func (c *Cluster) ID(ctx context.Context) api.ID {
_, span := trace.StartSpan(ctx, "cluster/ID")
ctx, span := trace.StartSpan(ctx, "cluster/ID")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
// ignore error since it is included in response object
ipfsID, err := c.ipfs.ID(ctx)
@ -957,9 +955,14 @@ func (c *Cluster) ID(ctx context.Context) api.ID {
// The new peer ID will be passed to the consensus
// component to be added to the peerset.
func (c *Cluster) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, error) {
_, span := trace.StartSpan(ctx, "cluster/PeerAdd")
ctx, span := trace.StartSpan(ctx, "cluster/PeerAdd")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
c.shutdownLock.RLock()
defer c.shutdownLock.RUnlock()
if c.shutdownB {
return nil, errors.New("cluster is shutdown")
}
// starting 10 nodes on the same box for testing
// causes deadlock and a global lock here
@ -992,9 +995,8 @@ func (c *Cluster) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, error) {
// The peer will be removed from the consensus peerset.
// This may first trigger repinnings for all content if not disabled.
func (c *Cluster) PeerRemove(ctx context.Context, pid peer.ID) error {
_, span := trace.StartSpan(ctx, "cluster/PeerRemove")
ctx, span := trace.StartSpan(ctx, "cluster/PeerRemove")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
// We need to repin before removing the peer, otherwise, it won't
// be able to submit the pins.
@ -1015,9 +1017,8 @@ func (c *Cluster) PeerRemove(ctx context.Context, pid peer.ID) error {
// cluster and making sure that the new peer is ready to discover and contact
// the rest.
func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
_, span := trace.StartSpan(ctx, "cluster/Join")
ctx, span := trace.StartSpan(ctx, "cluster/Join")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
logger.Debugf("Join(%s)", addr)
@ -1100,7 +1101,7 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
// ConnectSwarms in the background after a while, when we have likely
// received some metrics.
time.AfterFunc(c.config.MonitorPingInterval, func() {
c.ipfs.ConnectSwarms(ctx)
c.ipfs.ConnectSwarms(c.ctx)
})
// wait for leader and for state to catch up
@ -1118,7 +1119,7 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
for range out {
}
}()
go c.RecoverAllLocal(ctx, out)
go c.RecoverAllLocal(c.ctx, out)
logger.Infof("%s: joined %s's cluster", c.id.Pretty(), pid.Pretty())
return nil
@ -1146,11 +1147,10 @@ func (c *Cluster) distances(ctx context.Context, exclude peer.ID) (*distanceChec
// - Sends unpin for expired items for which this peer is "closest"
// (skipped for follower peers)
func (c *Cluster) StateSync(ctx context.Context) error {
_, span := trace.StartSpan(ctx, "cluster/StateSync")
ctx, span := trace.StartSpan(ctx, "cluster/StateSync")
defer span.End()
logger.Debug("StateSync")
ctx = trace.NewContext(c.ctx, span)
logger.Debug("StateSync")
if c.config.FollowerMode {
return nil
@ -1198,9 +1198,8 @@ func (c *Cluster) StateSync(ctx context.Context) error {
// an error happens, it is returned. This method blocks until it finishes. The
// operation can be aborted by canceling the context.
func (c *Cluster) StatusAll(ctx context.Context, filter api.TrackerStatus, out chan<- api.GlobalPinInfo) error {
_, span := trace.StartSpan(ctx, "cluster/StatusAll")
ctx, span := trace.StartSpan(ctx, "cluster/StatusAll")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
in := make(chan api.TrackerStatus, 1)
in <- filter
@ -1211,9 +1210,8 @@ func (c *Cluster) StatusAll(ctx context.Context, filter api.TrackerStatus, out c
// StatusAllLocal returns the PinInfo for all the tracked Cids in this peer on
// the out channel. It blocks until finished.
func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus, out chan<- api.PinInfo) error {
_, span := trace.StartSpan(ctx, "cluster/StatusAllLocal")
ctx, span := trace.StartSpan(ctx, "cluster/StatusAllLocal")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.tracker.StatusAll(ctx, filter, out)
}
@ -1222,18 +1220,16 @@ func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus,
// current peers. If an error happens, the GlobalPinInfo should contain
// as much information as could be fetched from the other peers.
func (c *Cluster) Status(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) {
_, span := trace.StartSpan(ctx, "cluster/Status")
ctx, span := trace.StartSpan(ctx, "cluster/Status")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.globalPinInfoCid(ctx, "PinTracker", "Status", h)
}
// StatusLocal returns this peer's PinInfo for a given Cid.
func (c *Cluster) StatusLocal(ctx context.Context, h api.Cid) api.PinInfo {
_, span := trace.StartSpan(ctx, "cluster/StatusLocal")
ctx, span := trace.StartSpan(ctx, "cluster/StatusLocal")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.tracker.Status(ctx, h)
}
@ -1268,9 +1264,8 @@ func (c *Cluster) localPinInfoOp(
// GlobalPinInfo objets for all recovered items. This method blocks until
// finished. Operation can be aborted by canceling the context.
func (c *Cluster) RecoverAll(ctx context.Context, out chan<- api.GlobalPinInfo) error {
_, span := trace.StartSpan(ctx, "cluster/RecoverAll")
ctx, span := trace.StartSpan(ctx, "cluster/RecoverAll")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.globalPinInfoStream(ctx, "Cluster", "RecoverAllLocal", nil, out)
}
@ -1287,9 +1282,8 @@ func (c *Cluster) RecoverAll(ctx context.Context, out chan<- api.GlobalPinInfo)
//
// RecoverAllLocal is called automatically every PinRecoverInterval.
func (c *Cluster) RecoverAllLocal(ctx context.Context, out chan<- api.PinInfo) error {
_, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")
ctx, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.tracker.RecoverAll(ctx, out)
}
@ -1301,9 +1295,8 @@ func (c *Cluster) RecoverAllLocal(ctx context.Context, out chan<- api.PinInfo) e
// is faster than calling Pin on the same CID as it avoids committing an
// identical pin to the consensus layer.
func (c *Cluster) Recover(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) {
_, span := trace.StartSpan(ctx, "cluster/Recover")
ctx, span := trace.StartSpan(ctx, "cluster/Recover")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.globalPinInfoCid(ctx, "PinTracker", "Recover", h)
}
@ -1315,9 +1308,8 @@ func (c *Cluster) Recover(ctx context.Context, h api.Cid) (api.GlobalPinInfo, er
// is faster than calling Pin on the same CID as it avoids committing an
// identical pin to the consensus layer.
func (c *Cluster) RecoverLocal(ctx context.Context, h api.Cid) (api.PinInfo, error) {
_, span := trace.StartSpan(ctx, "cluster/RecoverLocal")
ctx, span := trace.StartSpan(ctx, "cluster/RecoverLocal")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
return c.localPinInfoOp(ctx, h, c.tracker.Recover)
}
@ -1330,9 +1322,8 @@ func (c *Cluster) RecoverLocal(ctx context.Context, h api.Cid) (api.PinInfo, err
// The operation can be aborted by canceling the context. This methods blocks
// until the operation has completed.
func (c *Cluster) Pins(ctx context.Context, out chan<- api.Pin) error {
_, span := trace.StartSpan(ctx, "cluster/Pins")
ctx, span := trace.StartSpan(ctx, "cluster/Pins")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
cState, err := c.consensus.State(ctx)
if err != nil {
@ -1370,9 +1361,8 @@ func (c *Cluster) pinsSlice(ctx context.Context) ([]api.Pin, error) {
// the item is successfully pinned. For that, use Status(). PinGet
// returns an error if the given Cid is not part of the global state.
func (c *Cluster) PinGet(ctx context.Context, h api.Cid) (api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/PinGet")
ctx, span := trace.StartSpan(ctx, "cluster/PinGet")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
st, err := c.consensus.State(ctx)
if err != nil {
@ -1407,10 +1397,9 @@ func (c *Cluster) PinGet(ctx context.Context, h api.Cid) (api.Pin, error) {
// If the Update option is set, the pin options (including allocations) will
// be copied from an existing one. This is equivalent to running PinUpdate.
func (c *Cluster) Pin(ctx context.Context, h api.Cid, opts api.PinOptions) (api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/Pin")
ctx, span := trace.StartSpan(ctx, "cluster/Pin")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
pin := api.PinWithOpts(h, opts)
result, _, err := c.pin(ctx, pin, []peer.ID{})
@ -1622,9 +1611,8 @@ func (c *Cluster) pin(
// Unpin does not reflect the success or failure of underlying IPFS daemon
// unpinning operations, which happen in async fashion.
func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/Unpin")
ctx, span := trace.StartSpan(ctx, "cluster/Unpin")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
if c.config.FollowerMode {
return api.Pin{}, errFollowerMode
@ -1644,7 +1632,7 @@ func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) {
return pin, errors.New(err)
case api.MetaType:
// Unpin cluster dag and referenced shards
err := c.unpinClusterDag(pin)
err := c.unpinClusterDag(ctx, pin)
if err != nil {
return pin, err
}
@ -1661,10 +1649,7 @@ func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) {
// nodes that it references. It handles the case where multiple parents
// reference the same metadata node, only unpinning those nodes without
// existing references
func (c *Cluster) unpinClusterDag(metaPin api.Pin) error {
ctx, span := trace.StartSpan(c.ctx, "cluster/unpinClusterDag")
defer span.End()
func (c *Cluster) unpinClusterDag(ctx context.Context, metaPin api.Pin) error {
cids, err := c.cidsFromMetaPin(ctx, metaPin.Cid)
if err != nil {
return err
@ -1718,10 +1703,9 @@ func (c *Cluster) PinUpdate(ctx context.Context, from api.Cid, to api.Cid, opts
// PinPath pins an CID resolved from its IPFS Path. It returns the resolved
// Pin object.
func (c *Cluster) PinPath(ctx context.Context, path string, opts api.PinOptions) (api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/PinPath")
ctx, span := trace.StartSpan(ctx, "cluster/PinPath")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
ci, err := c.ipfs.Resolve(ctx, path)
if err != nil {
return api.Pin{}, err
@ -1733,10 +1717,9 @@ func (c *Cluster) PinPath(ctx context.Context, path string, opts api.PinOptions)
// UnpinPath unpins a CID resolved from its IPFS Path. If returns the
// previously pinned Pin object.
func (c *Cluster) UnpinPath(ctx context.Context, path string) (api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/UnpinPath")
ctx, span := trace.StartSpan(ctx, "cluster/UnpinPath")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
ci, err := c.ipfs.Resolve(ctx, path)
if err != nil {
return api.Pin{}, err
@ -1771,9 +1754,8 @@ func (c *Cluster) Version() string {
// Peers returns the IDs of the members of this Cluster on the out channel.
// This method blocks until it has finished.
func (c *Cluster) Peers(ctx context.Context, out chan<- api.ID) {
_, span := trace.StartSpan(ctx, "cluster/Peers")
ctx, span := trace.StartSpan(ctx, "cluster/Peers")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
peers, err := c.consensus.Peers(ctx)
if err != nil {
@ -2251,9 +2233,8 @@ func (c *Cluster) cidsFromMetaPin(ctx context.Context, h api.Cid) ([]api.Cid, er
// RepoGC performs garbage collection sweep on all peers' IPFS repo.
func (c *Cluster) RepoGC(ctx context.Context) (api.GlobalRepoGC, error) {
_, span := trace.StartSpan(ctx, "cluster/RepoGC")
ctx, span := trace.StartSpan(ctx, "cluster/RepoGC")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
members, err := c.consensus.Peers(ctx)
if err != nil {
@ -2286,7 +2267,7 @@ func (c *Cluster) RepoGC(ctx context.Context) (api.GlobalRepoGC, error) {
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, member, err)
pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, member))
pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, member))
globalRepoGC.PeerMap[member.String()] = api.RepoGC{
Peer: member,
@ -2301,9 +2282,8 @@ func (c *Cluster) RepoGC(ctx context.Context) (api.GlobalRepoGC, error) {
// RepoGCLocal performs garbage collection only on the local IPFS deamon.
func (c *Cluster) RepoGCLocal(ctx context.Context) (api.RepoGC, error) {
_, span := trace.StartSpan(ctx, "cluster/RepoGCLocal")
ctx, span := trace.StartSpan(ctx, "cluster/RepoGCLocal")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
resp, err := c.ipfs.RepoGC(ctx)
if err != nil {

View File

@ -251,8 +251,8 @@ func (cc *Consensus) op(ctx context.Context, pin api.Pin, t LogOpType) *LogOp {
// returns true if the operation was redirected to the leader
// note that if the leader just dissappeared, the rpc call will
// fail because we haven't heard that it's gone.
func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, error) {
ctx, span := trace.StartSpan(cc.ctx, "consensus/redirectToLeader")
func (cc *Consensus) redirectToLeader(ctx context.Context, method string, arg interface{}) (bool, error) {
ctx, span := trace.StartSpan(ctx, "consensus/redirectToLeader")
defer span.End()
var finalErr error
@ -336,7 +336,7 @@ func (cc *Consensus) commit(ctx context.Context, op *LogOp, rpcOp string, redire
// try to send it to the leader
// redirectToLeader has it's own retry loop. If this fails
// we're done here.
ok, err := cc.redirectToLeader(rpcOp, redirectArg)
ok, err := cc.redirectToLeader(ctx, rpcOp, redirectArg)
if err != nil || ok {
return err
}
@ -404,7 +404,7 @@ func (cc *Consensus) AddPeer(ctx context.Context, pid peer.ID) error {
if finalErr != nil {
logger.Errorf("retrying to add peer. Attempt #%d failed: %s", i, finalErr)
}
ok, err := cc.redirectToLeader("AddPeer", pid)
ok, err := cc.redirectToLeader(ctx, "AddPeer", pid)
if err != nil || ok {
return err
}
@ -435,7 +435,7 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
if finalErr != nil {
logger.Errorf("retrying to remove peer. Attempt #%d failed: %s", i, finalErr)
}
ok, err := cc.redirectToLeader("RmPeer", pid)
ok, err := cc.redirectToLeader(ctx, "RmPeer", pid)
if err != nil || ok {
return err
}

View File

@ -1198,9 +1198,8 @@ func (ipfs *Connector) shouldUpdateMetric() bool {
// Trigger a broadcast of the local informer metrics.
func (ipfs *Connector) updateInformerMetric(ctx context.Context) error {
_, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/updateInformerMetric")
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/updateInformerMetric")
defer span.End()
ctx = trace.NewContext(ipfs.ctx, span)
if !ipfs.shouldUpdateMetric() {
return nil

View File

@ -123,9 +123,7 @@ func (op *Operation) Context() context.Context {
// Cancel will cancel the context associated to this operation.
func (op *Operation) Cancel() {
_, span := trace.StartSpan(op.ctx, "optracker/Cancel")
op.cancel()
span.End()
}
// Phase returns the Phase.
@ -141,7 +139,6 @@ func (op *Operation) Phase() Phase {
// SetPhase changes the Phase and updates the timestamp.
func (op *Operation) SetPhase(ph Phase) {
_, span := trace.StartSpan(op.ctx, "optracker/SetPhase")
op.mu.Lock()
{
op.tracker.recordMetricUnsafe(op, -1)
@ -150,8 +147,6 @@ func (op *Operation) SetPhase(ph Phase) {
op.tracker.recordMetricUnsafe(op, 1)
}
op.mu.Unlock()
span.End()
}
// AttemptCount returns the number of times that this operation has been in
@ -201,7 +196,6 @@ func (op *Operation) Error() string {
// SetError sets the phase to PhaseError along with
// an error message. It updates the timestamp.
func (op *Operation) SetError(err error) {
_, span := trace.StartSpan(op.ctx, "optracker/SetError")
op.mu.Lock()
{
op.tracker.recordMetricUnsafe(op, -1)
@ -211,7 +205,6 @@ func (op *Operation) SetError(err error) {
op.tracker.recordMetricUnsafe(op, 1)
}
op.mu.Unlock()
span.End()
}
// Type returns the operation Type.
@ -237,9 +230,6 @@ func (op *Operation) Timestamp() time.Time {
// Canceled returns whether the context for this
// operation has been canceled.
func (op *Operation) Canceled() bool {
ctx, span := trace.StartSpan(op.ctx, "optracker/Canceled")
_ = ctx
defer span.End()
select {
case <-op.ctx.Done():
return true

View File

@ -76,8 +76,7 @@ func NewOperationTracker(ctx context.Context, pid peer.ID, peerName string) *Ope
// If an operation exists it is of different type, it is
// canceled and the new one replaces it in the tracker.
func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin, typ OperationType, ph Phase) *Operation {
ctx = trace.NewContext(opt.ctx, trace.FromContext(ctx))
ctx, span := trace.StartSpan(ctx, "optracker/TrackNewOperation")
_, span := trace.StartSpan(ctx, "optracker/TrackNewOperation")
defer span.End()
opt.mu.Lock()
@ -95,8 +94,9 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin,
op.tracker.recordMetric(op, -1)
op.Cancel() // cancel ongoing operation and replace it
}
op2 := newOperation(ctx, pin, typ, ph, opt)
// IMPORTANT: the operations must have the OperationTracker context,
// as otherwise their context would be canceled after being added.
op2 := newOperation(opt.ctx, pin, typ, ph, opt)
if ok && op.Type() == typ {
// Carry over the attempt count when doing an operation of the
// same type. The old operation exists and was canceled.