Merge pull request #1774 from ipfs-cluster/fix/raft-test
Fix: repinning does not re-allocate as needed
commit 9cc97690bf

cluster.go | 18
@@ -638,6 +638,8 @@ func (c *Cluster) repinFromPeer(ctx context.Context, p peer.ID, pin api.Pin) {
 	ctx, span := trace.StartSpan(ctx, "cluster/repinFromPeer")
 	defer span.End()
 
+	logger.Debugf("repinning %s from peer %s", pin.Cid, p)
+
 	pin.Allocations = nil // force re-allocations
 	// note that pin() should not result in different allocations
 	// if we are not under the replication-factor min.
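The key line is pin.Allocations = nil: when the pin is resubmitted, the allocator sees an empty allocation list and picks fresh peers instead of reusing the set that still contains the departed peer. Below is a minimal, self-contained sketch of that idea; the pin struct and allocate function are illustrative stand-ins, not the real api.Pin or Cluster.allocate.

// Illustrative stand-ins only; not the real ipfs-cluster types.
package main

import "fmt"

type pin struct {
	Cid         string
	Allocations []string // peers currently holding the pin
}

// allocate mimics the behaviour described in the comments above: preset
// allocations are respected, and new peers are only chosen when the list
// is empty.
func allocate(p pin, candidates []string, want int) []string {
	if len(p.Allocations) > 0 {
		return p.Allocations
	}
	if want > len(candidates) {
		want = len(candidates)
	}
	return candidates[:want]
}

func main() {
	p := pin{Cid: "bafy-example", Allocations: []string{"deadPeer", "peerB"}}

	// Without clearing: the stale allocation (including the dead peer) wins.
	fmt.Println(allocate(p, []string{"peerC", "peerD"}, 2)) // [deadPeer peerB]

	// repinFromPeer clears Allocations first, forcing a fresh choice.
	p.Allocations = nil
	fmt.Println(allocate(p, []string{"peerC", "peerD"}, 2)) // [peerC peerD]
}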
@@ -1554,22 +1556,6 @@ func (c *Cluster) pin(
 		return pin, true, c.consensus.LogPin(ctx, pin)
 	}
 
-	// We did not change ANY options and the pin exists so we just repin
-	// what there is without doing new allocations. While this submits
-	// pins to the consensus layer even if they are, this should trigger the
-	// pin tracker and allows users to get re-pin operations by re-adding
-	// without having to use recover, which is naturally expected.
-	//
-	// blacklist is set on repinFromPeer having any blacklisted peers
-	// means we are repinning and need to trigger allocate(), therefore we
-	// can't overwrite the incoming pin (which has Allocations set to
-	// nil).
-	if existing.Defined() &&
-		pin.PinOptions.Equals(existing.PinOptions) &&
-		len(blacklist) == 0 {
-		pin = existing
-	}
-
 	// Usually allocations are unset when pinning normally, however, the
 	// allocations may have been preset by the adder in which case they
 	// need to be respected. Whenever allocations are set. We don't
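This removed block is where repinning went wrong: when the options were unchanged and nothing was blacklisted, the incoming pin, whose Allocations had just been cleared by repinFromPeer, was replaced wholesale by the existing pin, so the stale allocations came back and allocate() never ran. A rough sketch of the old versus new flow, using simplified stand-ins (optionsChanged and plain []string allocations instead of PinOptions.Equals and api.Pin); this is a fragment for illustration, not code from the repository.

// Simplified sketch of the removed short-circuit.
func effectiveAllocations(incoming, existing []string, exists, optionsChanged bool) []string {
	if exists && !optionsChanged {
		// Old behaviour: reuse the stored pin verbatim, wiping out the
		// deliberately-cleared allocations of a repin request.
		return existing
	}
	// New behaviour: keep whatever the caller passed in; nil forces
	// re-allocation further down in pin().
	return incoming
}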
@@ -275,10 +275,13 @@ func (cc *Consensus) redirectToLeader(ctx context.Context, method string, arg in
 		// means we timed out waiting for a leader
 		// we don't retry in this case
 		if err != nil {
-			return false, fmt.Errorf("timed out waiting for leader: %s", err)
+			err = fmt.Errorf("timed out waiting for leader: %w", err)
+			logger.Error(err)
+			return false, err
 		}
 		leader, err = peer.Decode(pidstr)
 		if err != nil {
+			logger.Error(err)
 			return false, err
 		}
 	}
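Besides logging the failure before returning, switching from %s to %w keeps the original error in the chain, so callers can still inspect the underlying cause with errors.Is or errors.As. A standalone example of the difference; errTimeout and waitForLeader are invented for illustration and are not part of the consensus package.

package main

import (
	"errors"
	"fmt"
	"log"
)

var errTimeout = errors.New("context deadline exceeded")

func waitForLeader() error { return errTimeout }

func main() {
	if err := waitForLeader(); err != nil {
		// %w (rather than %s) keeps the original error in the chain.
		err = fmt.Errorf("timed out waiting for leader: %w", err)
		log.Println(err)

		// Callers can still test for the underlying cause.
		fmt.Println(errors.Is(err, errTimeout)) // true
	}
}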
@@ -1383,6 +1383,7 @@ func TestClustersReplicationFactorMaxLower(t *testing.T) {
 	for _, c := range clusters {
 		c.config.ReplicationFactorMin = 1
 		c.config.ReplicationFactorMax = nClusters
+		c.config.DisableRepinning = true
 	}
 
 	ttlDelay() // make sure we have places to pin
@@ -1438,6 +1439,7 @@ func TestClustersReplicationFactorInBetween(t *testing.T) {
 	for _, c := range clusters {
 		c.config.ReplicationFactorMin = 1
 		c.config.ReplicationFactorMax = nClusters
+		c.config.DisableRepinning = true
 	}
 
 	ttlDelay()
@@ -1493,6 +1495,7 @@ func TestClustersReplicationFactorMin(t *testing.T) {
 	for _, c := range clusters {
 		c.config.ReplicationFactorMin = nClusters - 1
 		c.config.ReplicationFactorMax = nClusters
+		c.config.DisableRepinning = true
 	}
 
 	// Shutdown two peers
@@ -1525,6 +1528,7 @@ func TestClustersReplicationMinMaxNoRealloc(t *testing.T) {
 	for _, c := range clusters {
 		c.config.ReplicationFactorMin = 1
 		c.config.ReplicationFactorMax = nClusters
+		c.config.DisableRepinning = true
 	}
 
 	ttlDelay()
@@ -1579,6 +1583,7 @@ func TestClustersReplicationMinMaxRealloc(t *testing.T) {
 	for _, c := range clusters {
 		c.config.ReplicationFactorMin = 3
 		c.config.ReplicationFactorMax = 4
+		c.config.DisableRepinning = true
 	}
 
 	ttlDelay() // make sure metrics are in
@@ -1667,6 +1672,7 @@ func TestClustersReplicationRealloc(t *testing.T) {
 	for _, c := range clusters {
 		c.config.ReplicationFactorMin = nClusters - 1
 		c.config.ReplicationFactorMax = nClusters - 1
+		c.config.DisableRepinning = true
 	}
 
 	ttlDelay()
@@ -1750,8 +1756,8 @@ func TestClustersReplicationRealloc(t *testing.T) {
 			continue
 		}
 		pinfo := c.tracker.Status(ctx, h)
+		t.Log(pinfo.Peer.Pretty(), pinfo.Status)
 		if pinfo.Status == api.TrackerStatusPinned {
-			//t.Log(pinfo.Peer.Pretty())
 			numPinned++
 		}
 	}
@@ -1774,6 +1780,7 @@ func TestClustersReplicationNotEnoughPeers(t *testing.T) {
 	for _, c := range clusters {
 		c.config.ReplicationFactorMin = nClusters - 1
 		c.config.ReplicationFactorMax = nClusters - 1
+		c.config.DisableRepinning = true
 	}
 
 	ttlDelay()
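Every replication test now sets c.config.DisableRepinning = true, so allocations only change when the test triggers them explicitly; before this, automatic repinning could kick in mid-test (for example when a peer is shut down) and change the allocations the assertions expect. If the repeated setup were factored into a helper, it could look roughly like the following; this helper is hypothetical and not part of the test suite, and it assumes only the config fields already shown in the hunks above.

// Hypothetical helper: applies the replication settings used by the tests
// above to every peer before anything is pinned.
func configureReplication(clusters []*Cluster, min, max int) {
	for _, c := range clusters {
		c.config.ReplicationFactorMin = min
		c.config.ReplicationFactorMax = max
		// Keep allocations stable during the test; re-allocation is only
		// exercised where the test asks for it.
		c.config.DisableRepinning = true
	}
}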
@@ -380,6 +380,8 @@ func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus, out
 		select {
 		case <-ctx.Done():
 			return false
+		case <-spt.ctx.Done():
+			return false
 		case out <- info:
 			return true
 		}
@@ -389,6 +391,7 @@ func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus, out
 	for p := range statePins {
 		select {
 		case <-ctx.Done():
+		case <-spt.ctx.Done():
 		default:
 		}
 
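The extra case <-spt.ctx.Done() matters because the send on out could otherwise block forever if the tracker shuts down while the caller's context is still alive, leaking the sending goroutine. A small standalone sketch of the two-context select pattern; the names here are invented for the example and are not the tracker's real API.

package main

import (
	"context"
	"fmt"
	"time"
)

// trySend blocks until the value is delivered or either context is cancelled,
// so a stuck consumer cannot leak the sending goroutine.
func trySend(callerCtx, ownerCtx context.Context, out chan<- int, v int) bool {
	select {
	case <-callerCtx.Done():
		return false
	case <-ownerCtx.Done(): // component shutting down: stop sending
		return false
	case out <- v:
		return true
	}
}

func main() {
	owner, cancel := context.WithCancel(context.Background())
	out := make(chan int) // unbuffered and never read: would block forever

	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel() // simulate the tracker shutting down
	}()

	fmt.Println(trySend(context.Background(), owner, out, 42)) // false
}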