Fix: repinning does not re-allocate as needed
Long story: Since #1768 there has been a recurring repinning test failure with Raft consensus. Per the test, if a pin is allocated to a peer that has been shut down, submitting the pin again should re-allocate it to a peer that is still running. Investigating why this test fails, and why it fails only with Raft, led to the realization that this and other similar tests were passing by chance. The needed re-allocations were made not by the new submission of the pin, but by the automatic-repinning feature. The actual resubmitted pin was carrying the same allocations (one of them being the peer that was down), but it was silently failing because the RedirectToLeader() code path was using cc.ctx and hitting the peer that had been shut down, which caused it to error. Fixing the context propagation meant that we would overwrite the pin with the old allocations again, so the actual behaviour did not pass the test. So, on one side, this fixes a number of tests that had not disabled automatic repinning, which was probably getting in the way of things. On the other side, this removes a condition that prevented re-allocation of pins when they exist and their options have not changed. I don't fully understand why this was there, though, since the Allocate() code does return the old allocations anyway when they are sufficient, so it should not re-allocate randomly. I suspect this was preventing some misbehaviour in the Allocate() code from the time before it was improved with multiple allocators, etc.
This commit is contained in:
parent
53bec20807
commit
11124ee224
18
cluster.go
18
cluster.go
|
@ -638,6 +638,8 @@ func (c *Cluster) repinFromPeer(ctx context.Context, p peer.ID, pin api.Pin) {
|
|||
ctx, span := trace.StartSpan(ctx, "cluster/repinFromPeer")
|
||||
defer span.End()
|
||||
|
||||
logger.Debugf("repinning %s from peer %s", pin.Cid, p)
|
||||
|
||||
pin.Allocations = nil // force re-allocations
|
||||
// note that pin() should not result in different allocations
|
||||
// if we are not under the replication-factor min.
|
||||
|
@ -1554,22 +1556,6 @@ func (c *Cluster) pin(
|
|||
return pin, true, c.consensus.LogPin(ctx, pin)
|
||||
}
|
||||
|
||||
// We did not change ANY options and the pin exists so we just repin
|
||||
// what there is without doing new allocations. While this submits
|
||||
// pins to the consensus layer even if they are, this should trigger the
|
||||
// pin tracker and allows users to get re-pin operations by re-adding
|
||||
// without having to use recover, which is naturally expected.
|
||||
//
|
||||
// blacklist is set on repinFromPeer having any blacklisted peers
|
||||
// means we are repinning and need to trigger allocate(), therefore we
|
||||
// can't overwrite the incoming pin (which has Allocations set to
|
||||
// nil).
|
||||
if existing.Defined() &&
|
||||
pin.PinOptions.Equals(existing.PinOptions) &&
|
||||
len(blacklist) == 0 {
|
||||
pin = existing
|
||||
}
|
||||
|
||||
// Usually allocations are unset when pinning normally, however, the
|
||||
// allocations may have been preset by the adder in which case they
|
||||
// need to be respected. Whenever allocations are set. We don't
|
||||
|
|
|
@ -275,10 +275,13 @@ func (cc *Consensus) redirectToLeader(ctx context.Context, method string, arg in
|
|||
// means we timed out waiting for a leader
|
||||
// we don't retry in this case
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("timed out waiting for leader: %s", err)
|
||||
err = fmt.Errorf("timed out waiting for leader: %w", err)
|
||||
logger.Error(err)
|
||||
return false, err
|
||||
}
|
||||
leader, err = peer.Decode(pidstr)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1383,6 +1383,7 @@ func TestClustersReplicationFactorMaxLower(t *testing.T) {
|
|||
for _, c := range clusters {
|
||||
c.config.ReplicationFactorMin = 1
|
||||
c.config.ReplicationFactorMax = nClusters
|
||||
c.config.DisableRepinning = true
|
||||
}
|
||||
|
||||
ttlDelay() // make sure we have places to pin
|
||||
|
@ -1438,6 +1439,7 @@ func TestClustersReplicationFactorInBetween(t *testing.T) {
|
|||
for _, c := range clusters {
|
||||
c.config.ReplicationFactorMin = 1
|
||||
c.config.ReplicationFactorMax = nClusters
|
||||
c.config.DisableRepinning = true
|
||||
}
|
||||
|
||||
ttlDelay()
|
||||
|
@ -1493,6 +1495,7 @@ func TestClustersReplicationFactorMin(t *testing.T) {
|
|||
for _, c := range clusters {
|
||||
c.config.ReplicationFactorMin = nClusters - 1
|
||||
c.config.ReplicationFactorMax = nClusters
|
||||
c.config.DisableRepinning = true
|
||||
}
|
||||
|
||||
// Shutdown two peers
|
||||
|
@ -1525,6 +1528,7 @@ func TestClustersReplicationMinMaxNoRealloc(t *testing.T) {
|
|||
for _, c := range clusters {
|
||||
c.config.ReplicationFactorMin = 1
|
||||
c.config.ReplicationFactorMax = nClusters
|
||||
c.config.DisableRepinning = true
|
||||
}
|
||||
|
||||
ttlDelay()
|
||||
|
@ -1579,6 +1583,7 @@ func TestClustersReplicationMinMaxRealloc(t *testing.T) {
|
|||
for _, c := range clusters {
|
||||
c.config.ReplicationFactorMin = 3
|
||||
c.config.ReplicationFactorMax = 4
|
||||
c.config.DisableRepinning = true
|
||||
}
|
||||
|
||||
ttlDelay() // make sure metrics are in
|
||||
|
@ -1667,6 +1672,7 @@ func TestClustersReplicationRealloc(t *testing.T) {
|
|||
for _, c := range clusters {
|
||||
c.config.ReplicationFactorMin = nClusters - 1
|
||||
c.config.ReplicationFactorMax = nClusters - 1
|
||||
c.config.DisableRepinning = true
|
||||
}
|
||||
|
||||
ttlDelay()
|
||||
|
@ -1750,8 +1756,8 @@ func TestClustersReplicationRealloc(t *testing.T) {
|
|||
continue
|
||||
}
|
||||
pinfo := c.tracker.Status(ctx, h)
|
||||
t.Log(pinfo.Peer.Pretty(), pinfo.Status)
|
||||
if pinfo.Status == api.TrackerStatusPinned {
|
||||
//t.Log(pinfo.Peer.Pretty())
|
||||
numPinned++
|
||||
}
|
||||
}
|
||||
|
@ -1774,6 +1780,7 @@ func TestClustersReplicationNotEnoughPeers(t *testing.T) {
|
|||
for _, c := range clusters {
|
||||
c.config.ReplicationFactorMin = nClusters - 1
|
||||
c.config.ReplicationFactorMax = nClusters - 1
|
||||
c.config.DisableRepinning = true
|
||||
}
|
||||
|
||||
ttlDelay()
|
||||
|
|
|
@ -380,6 +380,8 @@ func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus, out
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case <-spt.ctx.Done():
|
||||
return false
|
||||
case out <- info:
|
||||
return true
|
||||
}
|
||||
|
@ -389,6 +391,7 @@ func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus, out
|
|||
for p := range statePins {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-spt.ctx.Done():
|
||||
default:
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user