diff --git a/api/rest/client/request.go b/api/rest/client/request.go
index a08e6109..43fc87ec 100644
--- a/api/rest/client/request.go
+++ b/api/rest/client/request.go
@@ -82,7 +82,6 @@ func (c *defaultClient) doRequest(
 		r.ContentLength = -1 // this lets go use "chunked".
 	}
 
-	ctx = trace.NewContext(ctx, span)
 	r = r.WithContext(ctx)
 
 	return c.client.Do(r)
diff --git a/cluster.go b/cluster.go
index d4bfea29..d999a426 100644
--- a/cluster.go
+++ b/cluster.go
@@ -86,7 +86,7 @@ type Cluster struct {
 	paMux sync.Mutex
 
 	// shutdown function and related variables
-	shutdownLock sync.Mutex
+	shutdownLock sync.RWMutex
 	shutdownB    bool
 	removed      bool
@@ -778,9 +778,8 @@ func (c *Cluster) Ready() <-chan struct{} {
 // Shutdown does not close the libp2p host, the DHT, the datastore or
 // generally anything that Cluster did not create.
 func (c *Cluster) Shutdown(ctx context.Context) error {
-	_, span := trace.StartSpan(ctx, "cluster/Shutdown")
+	ctx, span := trace.StartSpan(ctx, "cluster/Shutdown")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	c.shutdownLock.Lock()
 	defer c.shutdownLock.Unlock()
@@ -889,9 +888,8 @@ func (c *Cluster) Done() <-chan struct{} {
 
 // ID returns information about the Cluster peer
 func (c *Cluster) ID(ctx context.Context) api.ID {
-	_, span := trace.StartSpan(ctx, "cluster/ID")
+	ctx, span := trace.StartSpan(ctx, "cluster/ID")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	// ignore error since it is included in response object
 	ipfsID, err := c.ipfs.ID(ctx)
@@ -957,9 +955,14 @@ func (c *Cluster) ID(ctx context.Context) api.ID {
 // The new peer ID will be passed to the consensus
 // component to be added to the peerset.
 func (c *Cluster) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, error) {
-	_, span := trace.StartSpan(ctx, "cluster/PeerAdd")
+	ctx, span := trace.StartSpan(ctx, "cluster/PeerAdd")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
+
+	c.shutdownLock.RLock()
+	defer c.shutdownLock.RUnlock()
+	if c.shutdownB {
+		return nil, errors.New("cluster is shutdown")
+	}
 
 	// starting 10 nodes on the same box for testing
 	// causes deadlock and a global lock here
@@ -992,9 +995,8 @@ func (c *Cluster) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, error) {
 // The peer will be removed from the consensus peerset.
 // This may first trigger repinnings for all content if not disabled.
 func (c *Cluster) PeerRemove(ctx context.Context, pid peer.ID) error {
-	_, span := trace.StartSpan(ctx, "cluster/PeerRemove")
+	ctx, span := trace.StartSpan(ctx, "cluster/PeerRemove")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	// We need to repin before removing the peer, otherwise, it won't
 	// be able to submit the pins.
@@ -1015,9 +1017,8 @@ func (c *Cluster) PeerRemove(ctx context.Context, pid peer.ID) error {
 // cluster and making sure that the new peer is ready to discover and contact
 // the rest.
 func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
-	_, span := trace.StartSpan(ctx, "cluster/Join")
+	ctx, span := trace.StartSpan(ctx, "cluster/Join")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	logger.Debugf("Join(%s)", addr)
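
The recurring fix in this patch: `_, span := trace.StartSpan(ctx, ...)` discards the context that carries the new span, and the removed `ctx = trace.NewContext(c.ctx, span)` then grafted the span onto the component's root context, dropping the caller's deadline and cancellation. Rebinding ctx keeps span and caller context together. A minimal sketch of the corrected pattern with go.opencensus.io/trace (the parent/child names are illustrative):

package main

import (
	"context"

	"go.opencensus.io/trace"
)

func child(ctx context.Context) {
	// Becomes a child span of the caller's span, because the caller
	// passed down the context returned by trace.StartSpan.
	_, span := trace.StartSpan(ctx, "example/child")
	defer span.End()
}

func parent(ctx context.Context) {
	// Fixed pattern: rebind ctx so that callees see the new span.
	ctx, span := trace.StartSpan(ctx, "example/parent")
	defer span.End()

	// Broken pattern (what the diff removes):
	//   _, span := trace.StartSpan(ctx, "example/parent")
	//   ctx = trace.NewContext(c.ctx, span)
	// The second line re-parents work onto the component's long-lived
	// context, losing the caller's deadline and cancellation.

	child(ctx)
}

func main() {
	parent(context.Background())
}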
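
cluster.go also switches shutdownLock to sync.RWMutex, so PeerAdd can verify that the peer is not shutting down under a cheap read lock while Shutdown keeps the exclusive write lock. A hedged sketch of the idiom (the service type and fields are illustrative, not the actual Cluster struct):

package main

import (
	"errors"
	"fmt"
	"sync"
)

type service struct {
	shutdownLock sync.RWMutex
	shutdownB    bool
}

// doWork takes the read lock: many calls may run concurrently, but
// none can interleave with Shutdown flipping the flag.
func (s *service) doWork() error {
	s.shutdownLock.RLock()
	defer s.shutdownLock.RUnlock()
	if s.shutdownB {
		return errors.New("service is shutdown")
	}
	// ... operate knowing shutdown cannot complete while we hold the lock
	return nil
}

// Shutdown takes the write lock, excluding all readers.
func (s *service) Shutdown() {
	s.shutdownLock.Lock()
	defer s.shutdownLock.Unlock()
	s.shutdownB = true
}

func main() {
	s := &service{}
	fmt.Println(s.doWork()) // <nil>
	s.Shutdown()
	fmt.Println(s.doWork()) // service is shutdown
}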
@@ -1100,7 +1101,7 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
 	// ConnectSwarms in the background after a while, when we have likely
 	// received some metrics.
 	time.AfterFunc(c.config.MonitorPingInterval, func() {
-		c.ipfs.ConnectSwarms(ctx)
+		c.ipfs.ConnectSwarms(c.ctx)
 	})
 
 	// wait for leader and for state to catch up
@@ -1118,7 +1119,7 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
 		for range out {
 		}
 	}()
-	go c.RecoverAllLocal(ctx, out)
+	go c.RecoverAllLocal(c.ctx, out)
 
 	logger.Infof("%s: joined %s's cluster", c.id.Pretty(), pid.Pretty())
 
 	return nil
@@ -1146,11 +1147,10 @@ func (c *Cluster) distances(ctx context.Context, exclude peer.ID) (*distanceChec
 // - Sends unpin for expired items for which this peer is "closest"
 //   (skipped for follower peers)
 func (c *Cluster) StateSync(ctx context.Context) error {
-	_, span := trace.StartSpan(ctx, "cluster/StateSync")
+	ctx, span := trace.StartSpan(ctx, "cluster/StateSync")
 	defer span.End()
-	logger.Debug("StateSync")
-	ctx = trace.NewContext(c.ctx, span)
+	logger.Debug("StateSync")
 
 	if c.config.FollowerMode {
 		return nil
@@ -1198,9 +1198,8 @@ func (c *Cluster) StateSync(ctx context.Context) error {
 // an error happens, it is returned. This method blocks until it finishes. The
 // operation can be aborted by canceling the context.
 func (c *Cluster) StatusAll(ctx context.Context, filter api.TrackerStatus, out chan<- api.GlobalPinInfo) error {
-	_, span := trace.StartSpan(ctx, "cluster/StatusAll")
+	ctx, span := trace.StartSpan(ctx, "cluster/StatusAll")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	in := make(chan api.TrackerStatus, 1)
 	in <- filter
@@ -1211,9 +1210,8 @@ func (c *Cluster) StatusAll(ctx context.Context, filter api.TrackerStatus, out c
 // StatusAllLocal returns the PinInfo for all the tracked Cids in this peer on
 // the out channel. It blocks until finished.
 func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus, out chan<- api.PinInfo) error {
-	_, span := trace.StartSpan(ctx, "cluster/StatusAllLocal")
+	ctx, span := trace.StartSpan(ctx, "cluster/StatusAllLocal")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	return c.tracker.StatusAll(ctx, filter, out)
 }
@@ -1222,18 +1220,16 @@ func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus,
 // current peers. If an error happens, the GlobalPinInfo should contain
 // as much information as could be fetched from the other peers.
 func (c *Cluster) Status(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) {
-	_, span := trace.StartSpan(ctx, "cluster/Status")
+	ctx, span := trace.StartSpan(ctx, "cluster/Status")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	return c.globalPinInfoCid(ctx, "PinTracker", "Status", h)
 }
 
 // StatusLocal returns this peer's PinInfo for a given Cid.
 func (c *Cluster) StatusLocal(ctx context.Context, h api.Cid) api.PinInfo {
-	_, span := trace.StartSpan(ctx, "cluster/StatusLocal")
+	ctx, span := trace.StartSpan(ctx, "cluster/StatusLocal")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	return c.tracker.Status(ctx, h)
 }
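
Two changes in Join go the opposite direction: the time.AfterFunc callback and the RecoverAllLocal goroutine outlive the Join call, so they switch from the request-scoped ctx, which is canceled when the request finishes, to the long-lived c.ctx. A sketch of the difference, with hypothetical component/background names:

package main

import (
	"context"
	"fmt"
	"time"
)

type component struct {
	ctx context.Context // lives as long as the component
}

func (c *component) handle(reqCtx context.Context) {
	// Wrong: reqCtx is usually canceled as soon as the request finishes,
	// so the scheduled work would be aborted immediately:
	//
	//   time.AfterFunc(100*time.Millisecond, func() { c.background(reqCtx) })
	//
	// Right: detach background work onto the component context.
	time.AfterFunc(100*time.Millisecond, func() { c.background(c.ctx) })
}

func (c *component) background(ctx context.Context) {
	select {
	case <-ctx.Done():
		fmt.Println("aborted:", ctx.Err())
	case <-time.After(50 * time.Millisecond):
		fmt.Println("background task ran")
	}
}

func main() {
	c := &component{ctx: context.Background()}
	reqCtx, cancel := context.WithCancel(context.Background())
	c.handle(reqCtx)
	cancel() // the request ends; the background task proceeds anyway
	time.Sleep(300 * time.Millisecond)
}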
@@ -1268,9 +1264,8 @@ func (c *Cluster) localPinInfoOp(
 // GlobalPinInfo objects for all recovered items. This method blocks until
 // finished. Operation can be aborted by canceling the context.
 func (c *Cluster) RecoverAll(ctx context.Context, out chan<- api.GlobalPinInfo) error {
-	_, span := trace.StartSpan(ctx, "cluster/RecoverAll")
+	ctx, span := trace.StartSpan(ctx, "cluster/RecoverAll")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	return c.globalPinInfoStream(ctx, "Cluster", "RecoverAllLocal", nil, out)
 }
@@ -1287,9 +1282,8 @@ func (c *Cluster) RecoverAll(ctx context.Context, out chan<- api.GlobalPinInfo)
 //
 // RecoverAllLocal is called automatically every PinRecoverInterval.
 func (c *Cluster) RecoverAllLocal(ctx context.Context, out chan<- api.PinInfo) error {
-	_, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")
+	ctx, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	return c.tracker.RecoverAll(ctx, out)
 }
@@ -1301,9 +1295,8 @@ func (c *Cluster) RecoverAllLocal(ctx context.Context, out chan<- api.PinInfo) e
 // is faster than calling Pin on the same CID as it avoids committing an
 // identical pin to the consensus layer.
 func (c *Cluster) Recover(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) {
-	_, span := trace.StartSpan(ctx, "cluster/Recover")
+	ctx, span := trace.StartSpan(ctx, "cluster/Recover")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	return c.globalPinInfoCid(ctx, "PinTracker", "Recover", h)
 }
@@ -1315,9 +1308,8 @@ func (c *Cluster) Recover(ctx context.Context, h api.Cid) (api.GlobalPinInfo, er
 // is faster than calling Pin on the same CID as it avoids committing an
 // identical pin to the consensus layer.
 func (c *Cluster) RecoverLocal(ctx context.Context, h api.Cid) (api.PinInfo, error) {
-	_, span := trace.StartSpan(ctx, "cluster/RecoverLocal")
+	ctx, span := trace.StartSpan(ctx, "cluster/RecoverLocal")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	return c.localPinInfoOp(ctx, h, c.tracker.Recover)
 }
@@ -1330,9 +1322,8 @@ func (c *Cluster) RecoverLocal(ctx context.Context, h api.Cid) (api.PinInfo, err
 // The operation can be aborted by canceling the context. This method blocks
 // until the operation has completed.
 func (c *Cluster) Pins(ctx context.Context, out chan<- api.Pin) error {
-	_, span := trace.StartSpan(ctx, "cluster/Pins")
+	ctx, span := trace.StartSpan(ctx, "cluster/Pins")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	cState, err := c.consensus.State(ctx)
 	if err != nil {
@@ -1370,9 +1361,8 @@ func (c *Cluster) pinsSlice(ctx context.Context) ([]api.Pin, error) {
 // the item is successfully pinned. For that, use Status(). PinGet
 // returns an error if the given Cid is not part of the global state.
 func (c *Cluster) PinGet(ctx context.Context, h api.Cid) (api.Pin, error) {
-	_, span := trace.StartSpan(ctx, "cluster/PinGet")
+	ctx, span := trace.StartSpan(ctx, "cluster/PinGet")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	st, err := c.consensus.State(ctx)
 	if err != nil {
@@ -1407,10 +1397,9 @@ func (c *Cluster) PinGet(ctx context.Context, h api.Cid) (api.Pin, error) {
 // If the Update option is set, the pin options (including allocations) will
 // be copied from an existing one. This is equivalent to running PinUpdate.
 func (c *Cluster) Pin(ctx context.Context, h api.Cid, opts api.PinOptions) (api.Pin, error) {
-	_, span := trace.StartSpan(ctx, "cluster/Pin")
+	ctx, span := trace.StartSpan(ctx, "cluster/Pin")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	pin := api.PinWithOpts(h, opts)
 
 	result, _, err := c.pin(ctx, pin, []peer.ID{})
@@ -1622,9 +1611,8 @@ func (c *Cluster) pin(
 // Unpin does not reflect the success or failure of underlying IPFS daemon
 // unpinning operations, which happen in async fashion.
 func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) {
-	_, span := trace.StartSpan(ctx, "cluster/Unpin")
+	ctx, span := trace.StartSpan(ctx, "cluster/Unpin")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	if c.config.FollowerMode {
 		return api.Pin{}, errFollowerMode
@@ -1644,7 +1632,7 @@ func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) {
 		return pin, errors.New(err)
 	case api.MetaType:
 		// Unpin cluster dag and referenced shards
-		err := c.unpinClusterDag(pin)
+		err := c.unpinClusterDag(ctx, pin)
 		if err != nil {
 			return pin, err
 		}
@@ -1661,10 +1649,7 @@ func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) {
 // nodes that it references. It handles the case where multiple parents
 // reference the same metadata node, only unpinning those nodes without
 // existing references
-func (c *Cluster) unpinClusterDag(metaPin api.Pin) error {
-	ctx, span := trace.StartSpan(c.ctx, "cluster/unpinClusterDag")
-	defer span.End()
-
+func (c *Cluster) unpinClusterDag(ctx context.Context, metaPin api.Pin) error {
 	cids, err := c.cidsFromMetaPin(ctx, metaPin.Cid)
 	if err != nil {
 		return err
@@ -1718,10 +1703,9 @@ func (c *Cluster) PinUpdate(ctx context.Context, from api.Cid, to api.Cid, opts
 // PinPath pins a CID resolved from its IPFS Path. It returns the resolved
 // Pin object.
 func (c *Cluster) PinPath(ctx context.Context, path string, opts api.PinOptions) (api.Pin, error) {
-	_, span := trace.StartSpan(ctx, "cluster/PinPath")
+	ctx, span := trace.StartSpan(ctx, "cluster/PinPath")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	ci, err := c.ipfs.Resolve(ctx, path)
 	if err != nil {
 		return api.Pin{}, err
@@ -1733,10 +1717,9 @@ func (c *Cluster) PinPath(ctx context.Context, path string, opts api.PinOptions)
 // UnpinPath unpins a CID resolved from its IPFS Path. It returns the
 // previously pinned Pin object.
 func (c *Cluster) UnpinPath(ctx context.Context, path string) (api.Pin, error) {
-	_, span := trace.StartSpan(ctx, "cluster/UnpinPath")
+	ctx, span := trace.StartSpan(ctx, "cluster/UnpinPath")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	ci, err := c.ipfs.Resolve(ctx, path)
 	if err != nil {
 		return api.Pin{}, err
@@ -1771,9 +1754,8 @@ func (c *Cluster) Version() string {
 // Peers returns the IDs of the members of this Cluster on the out channel.
 // This method blocks until it has finished.
 func (c *Cluster) Peers(ctx context.Context, out chan<- api.ID) {
-	_, span := trace.StartSpan(ctx, "cluster/Peers")
+	ctx, span := trace.StartSpan(ctx, "cluster/Peers")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	peers, err := c.consensus.Peers(ctx)
 	if err != nil {
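
unpinClusterDag now receives the caller's context instead of starting a span from the struct's c.ctx, and redirectToLeader in consensus/raft below gets the same treatment. This follows the usual Go guideline that contexts travel as arguments, not struct fields, so per-call cancellation and tracing metadata propagate. A before/after sketch under that assumption (the cluster/collect names are hypothetical stand-ins):

package main

import "context"

type cluster struct {
	ctx context.Context
}

// Before: the method ignored its caller and derived all work from the
// long-lived struct context, so a single call could not be canceled
// and the caller's trace span was dropped.
func (c *cluster) unpinDagBefore() error {
	ctx := c.ctx
	return c.collect(ctx)
}

// After: the context is a parameter; canceling the caller cancels this
// work, and the active span flows through.
func (c *cluster) unpinDagAfter(ctx context.Context) error {
	return c.collect(ctx)
}

func (c *cluster) collect(ctx context.Context) error {
	// placeholder for the real traversal of the cluster DAG
	return ctx.Err()
}

func main() {
	c := &cluster{ctx: context.Background()}
	_ = c.unpinDagBefore()
	_ = c.unpinDagAfter(context.Background())
}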
@@ -2251,9 +2233,8 @@ func (c *Cluster) cidsFromMetaPin(ctx context.Context, h api.Cid) ([]api.Cid, er
 
 // RepoGC performs garbage collection sweep on all peers' IPFS repo.
 func (c *Cluster) RepoGC(ctx context.Context) (api.GlobalRepoGC, error) {
-	_, span := trace.StartSpan(ctx, "cluster/RepoGC")
+	ctx, span := trace.StartSpan(ctx, "cluster/RepoGC")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	members, err := c.consensus.Peers(ctx)
 	if err != nil {
@@ -2286,7 +2267,7 @@ func (c *Cluster) RepoGC(ctx context.Context) (api.GlobalRepoGC, error) {
 			logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, member, err)
 
-			pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, member))
+			pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, member))
 
 			globalRepoGC.PeerMap[member.String()] = api.RepoGC{
 				Peer: member,
@@ -2301,9 +2282,8 @@ func (c *Cluster) RepoGC(ctx context.Context) (api.GlobalRepoGC, error) {
 
 // RepoGCLocal performs garbage collection only on the local IPFS daemon.
 func (c *Cluster) RepoGCLocal(ctx context.Context) (api.RepoGC, error) {
-	_, span := trace.StartSpan(ctx, "cluster/RepoGCLocal")
+	ctx, span := trace.StartSpan(ctx, "cluster/RepoGCLocal")
 	defer span.End()
-	ctx = trace.NewContext(c.ctx, span)
 
 	resp, err := c.ipfs.RepoGC(ctx)
 	if err != nil {
diff --git a/consensus/raft/consensus.go b/consensus/raft/consensus.go
index bea88dce..9f4a7ee5 100644
--- a/consensus/raft/consensus.go
+++ b/consensus/raft/consensus.go
@@ -251,8 +251,8 @@ func (cc *Consensus) op(ctx context.Context, pin api.Pin, t LogOpType) *LogOp {
 // returns true if the operation was redirected to the leader
 // note that if the leader just disappeared, the rpc call will
 // fail because we haven't heard that it's gone.
-func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, error) {
-	ctx, span := trace.StartSpan(cc.ctx, "consensus/redirectToLeader")
+func (cc *Consensus) redirectToLeader(ctx context.Context, method string, arg interface{}) (bool, error) {
+	ctx, span := trace.StartSpan(ctx, "consensus/redirectToLeader")
 	defer span.End()
 
 	var finalErr error
@@ -336,7 +336,7 @@ func (cc *Consensus) commit(ctx context.Context, op *LogOp, rpcOp string, redire
 	// try to send it to the leader
 	// redirectToLeader has its own retry loop. If this fails
 	// we're done here.
-	ok, err := cc.redirectToLeader(rpcOp, redirectArg)
+	ok, err := cc.redirectToLeader(ctx, rpcOp, redirectArg)
 	if err != nil || ok {
 		return err
 	}
@@ -404,7 +404,7 @@ func (cc *Consensus) AddPeer(ctx context.Context, pid peer.ID) error {
 		if finalErr != nil {
 			logger.Errorf("retrying to add peer. Attempt #%d failed: %s", i, finalErr)
 		}
-		ok, err := cc.redirectToLeader("AddPeer", pid)
+		ok, err := cc.redirectToLeader(ctx, "AddPeer", pid)
 		if err != nil || ok {
 			return err
 		}
@@ -435,7 +435,7 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
 		if finalErr != nil {
 			logger.Errorf("retrying to remove peer. Attempt #%d failed: %s", i, finalErr)
 		}
-		ok, err := cc.redirectToLeader("RmPeer", pid)
+		ok, err := cc.redirectToLeader(ctx, "RmPeer", pid)
 		if err != nil || ok {
 			return err
 		}
diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go
index 7515d96f..66e5f10b 100644
--- a/ipfsconn/ipfshttp/ipfshttp.go
+++ b/ipfsconn/ipfshttp/ipfshttp.go
@@ -1198,9 +1198,8 @@ func (ipfs *Connector) shouldUpdateMetric() bool {
 
 // Trigger a broadcast of the local informer metrics.
 func (ipfs *Connector) updateInformerMetric(ctx context.Context) error {
-	_, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/updateInformerMetric")
+	ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/updateInformerMetric")
 	defer span.End()
-	ctx = trace.NewContext(ipfs.ctx, span)
 
 	if !ipfs.shouldUpdateMetric() {
 		return nil
diff --git a/pintracker/optracker/operation.go b/pintracker/optracker/operation.go
index 748c18f8..1196316c 100644
--- a/pintracker/optracker/operation.go
+++ b/pintracker/optracker/operation.go
@@ -123,9 +123,7 @@ func (op *Operation) Context() context.Context {
 
 // Cancel will cancel the context associated to this operation.
 func (op *Operation) Cancel() {
-	_, span := trace.StartSpan(op.ctx, "optracker/Cancel")
 	op.cancel()
-	span.End()
 }
 
 // Phase returns the Phase.
@@ -141,7 +139,6 @@ func (op *Operation) Phase() Phase {
 
 // SetPhase changes the Phase and updates the timestamp.
 func (op *Operation) SetPhase(ph Phase) {
-	_, span := trace.StartSpan(op.ctx, "optracker/SetPhase")
 	op.mu.Lock()
 	{
 		op.tracker.recordMetricUnsafe(op, -1)
@@ -150,8 +147,6 @@ func (op *Operation) SetPhase(ph Phase) {
 		op.tracker.recordMetricUnsafe(op, 1)
 	}
 	op.mu.Unlock()
-
-	span.End()
 }
 
 // AttemptCount returns the number of times that this operation has been in
@@ -201,7 +196,6 @@ func (op *Operation) Error() string {
 // SetError sets the phase to PhaseError along with
 // an error message. It updates the timestamp.
 func (op *Operation) SetError(err error) {
-	_, span := trace.StartSpan(op.ctx, "optracker/SetError")
 	op.mu.Lock()
 	{
 		op.tracker.recordMetricUnsafe(op, -1)
@@ -211,7 +205,6 @@ func (op *Operation) SetError(err error) {
 		op.tracker.recordMetricUnsafe(op, 1)
 	}
 	op.mu.Unlock()
-	span.End()
 }
 
 // Type returns the operation Type.
@@ -237,9 +230,6 @@ func (op *Operation) Timestamp() time.Time {
 // Canceled returns whether the context for this
 // operation has been canceled.
 func (op *Operation) Canceled() bool {
-	ctx, span := trace.StartSpan(op.ctx, "optracker/Canceled")
-	_ = ctx
-	defer span.End()
 	select {
 	case <-op.ctx.Done():
 		return true
diff --git a/pintracker/optracker/operationtracker.go b/pintracker/optracker/operationtracker.go
index 92c1e2f8..514e0625 100644
--- a/pintracker/optracker/operationtracker.go
+++ b/pintracker/optracker/operationtracker.go
@@ -76,8 +76,7 @@ func NewOperationTracker(ctx context.Context, pid peer.ID, peerName string) *Ope
 // If an operation exists and is of a different type, it is
 // canceled and the new one replaces it in the tracker.
 func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin, typ OperationType, ph Phase) *Operation {
-	ctx = trace.NewContext(opt.ctx, trace.FromContext(ctx))
-	ctx, span := trace.StartSpan(ctx, "optracker/TrackNewOperation")
+	_, span := trace.StartSpan(ctx, "optracker/TrackNewOperation")
 	defer span.End()
 
 	opt.mu.Lock()
@@ -95,8 +94,9 @@ func (opt *OperationTracker) TrackNewOperation(ctx context.Context, pin api.Pin,
 		op.tracker.recordMetric(op, -1)
 		op.Cancel() // cancel ongoing operation and replace it
 	}
-
-	op2 := newOperation(ctx, pin, typ, ph, opt)
+	// IMPORTANT: the operations must have the OperationTracker context,
+	// as otherwise their context would be canceled after being added.
+	op2 := newOperation(opt.ctx, pin, typ, ph, opt)
 	if ok && op.Type() == typ {
 		// Carry over the attempt count when doing an operation of the
 		// same type. The old operation exists and was canceled.
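
The optracker changes drop per-call spans from hot accessors like SetPhase, SetError and Canceled, and TrackNewOperation no longer re-parents the incoming ctx onto opt.ctx; the IMPORTANT comment records why newOperation must still use the tracker's context: an operation parented on the request context would be canceled as soon as the request that created it returned. A reduced sketch of that failure mode (the tracker/operation types are hypothetical):

package main

import (
	"context"
	"fmt"
)

type operation struct {
	ctx    context.Context
	cancel context.CancelFunc
}

type tracker struct {
	ctx context.Context // long-lived tracker context
}

func (t *tracker) track(reqCtx context.Context, useTrackerCtx bool) *operation {
	parent := reqCtx
	if useTrackerCtx {
		// The fix: parent long-running operations on the tracker's
		// context so they survive the request that created them.
		parent = t.ctx
	}
	ctx, cancel := context.WithCancel(parent)
	return &operation{ctx: ctx, cancel: cancel}
}

func main() {
	t := &tracker{ctx: context.Background()}

	reqCtx, endRequest := context.WithCancel(context.Background())
	buggy := t.track(reqCtx, false)
	fixed := t.track(reqCtx, true)

	endRequest() // the request finishes

	fmt.Println("buggy op canceled:", buggy.ctx.Err() != nil) // true
	fmt.Println("fixed op canceled:", fixed.ctx.Err() != nil) // false
	buggy.cancel()
	fixed.cancel()
}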