Fix: handling allocations
* pin() should not allocate if allocations are already provided. * pin() should not skip pinning if the exact same pin exists. * Additionally, that skip check was unreliable: pin() allocated before comparing against the existing pin, so the pin may have existed while its allocations had been artificially changed. * pin() re-uses the existing pin when the pin options are the same and thus avoids changing the allocations of a pin. As a side effect, this fixes re-allocations, which were broken: peers called `shouldPeerRepinCid()` and, instead of repinning that single CID, proceeded to repin the full state, and did so for every pin. Additionally, tests have been adapted. Some re-alloc tests may have been very unreliable because of the problems above.
This commit is contained in:
parent
d9a5e17974
commit
96752e4e58
|
@ -478,6 +478,10 @@ func (po *PinOptions) Equals(po2 *PinOptions) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
if po.Name != po2.Name {
|
||||
return false
|
||||
}
|
||||
|
||||
if po.ReplicationFactorMax != po2.ReplicationFactorMax {
|
||||
return false
|
||||
}
|
||||
|
@ -766,10 +770,6 @@ func (pin *Pin) Equals(pin2 *Pin) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
if pin.Name != pin2.Name {
|
||||
return false
|
||||
}
|
||||
|
||||
if pin.Type != pin2.Type {
|
||||
return false
|
||||
}
|
||||
|
|
66
cluster.go
66
cluster.go
|
@ -381,6 +381,11 @@ func (c *Cluster) alertsHandler() {
|
|||
continue // only handle ping alerts
|
||||
}
|
||||
|
||||
if c.config.DisableRepinning {
|
||||
logger.Debugf("repinning is disabled. Will not re-allocate pins on alerts")
|
||||
return
|
||||
}
|
||||
|
||||
cState, err := c.consensus.State(c.ctx)
|
||||
if err != nil {
|
||||
logger.Warning(err)
|
||||
|
@ -398,7 +403,7 @@ func (c *Cluster) alertsHandler() {
|
|||
continue
|
||||
}
|
||||
if c.shouldPeerRepinCid(alrt.Peer, pin) {
|
||||
c.repinFromPeer(c.ctx, alrt.Peer)
|
||||
c.repinFromPeer(c.ctx, alrt.Peer, pin)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -435,7 +440,7 @@ func (c *Cluster) watchPeers() {
|
|||
case <-c.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
logger.Debugf("%s watching peers", c.id)
|
||||
// logger.Debugf("%s watching peers", c.id)
|
||||
hasMe := false
|
||||
peers, err := c.consensus.Peers(c.ctx)
|
||||
if err != nil {
|
||||
|
@ -482,8 +487,8 @@ func (c *Cluster) reBootstrap() {
|
|||
}
|
||||
|
||||
// find all Cids pinned to a given peer and triggers re-pins on them.
|
||||
func (c *Cluster) repinFromPeer(ctx context.Context, p peer.ID) {
|
||||
ctx, span := trace.StartSpan(ctx, "cluster/repinFromPeer")
|
||||
func (c *Cluster) vacatePeer(ctx context.Context, p peer.ID) {
|
||||
ctx, span := trace.StartSpan(ctx, "cluster/vacatePeer")
|
||||
defer span.End()
|
||||
|
||||
if c.config.DisableRepinning {
|
||||
|
@ -503,12 +508,22 @@ func (c *Cluster) repinFromPeer(ctx context.Context, p peer.ID) {
|
|||
}
|
||||
for _, pin := range list {
|
||||
if containsPeer(pin.Allocations, p) {
|
||||
_, ok, err := c.pin(ctx, pin, []peer.ID{p}) // pin blacklisting this peer
|
||||
c.repinFromPeer(ctx, p, pin)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// repinFromPeer triggers a repin on a given pin object blacklisting one of the
|
||||
// allocations.
|
||||
func (c *Cluster) repinFromPeer(ctx context.Context, p peer.ID, pin *api.Pin) {
|
||||
ctx, span := trace.StartSpan(ctx, "cluster/repinFromPeer")
|
||||
defer span.End()
|
||||
|
||||
pin.Allocations = nil // force re-allocations
|
||||
_, ok, err := c.pin(ctx, pin, []peer.ID{p})
|
||||
if ok && err == nil {
|
||||
logger.Infof("repinned %s out of %s", pin.Cid, p.Pretty())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// run launches some go-routines which live throughout the cluster's life
|
||||
|
@ -837,7 +852,7 @@ func (c *Cluster) PeerRemove(ctx context.Context, pid peer.ID) error {
|
|||
// We need to repin before removing the peer, otherwise, it won't
|
||||
// be able to submit the pins.
|
||||
logger.Infof("re-allocating all CIDs directly associated to %s", pid)
|
||||
c.repinFromPeer(ctx, pid)
|
||||
c.vacatePeer(ctx, pid)
|
||||
|
||||
err := c.consensus.RmPeer(ctx, pid)
|
||||
if err != nil {
|
||||
|
@ -1125,6 +1140,10 @@ func (c *Cluster) SyncLocal(ctx context.Context, h cid.Cid) (pInfo *api.PinInfo,
|
|||
|
||||
// RecoverAllLocal triggers a RecoverLocal operation for all Cids tracked
|
||||
// by this peer.
|
||||
//
|
||||
// Recover operations ask IPFS to pin or unpin items in error state. Recover
|
||||
// is faster than calling Pin on the same CID as it avoids committing an
|
||||
// identical pin to the consensus layer.
|
||||
func (c *Cluster) RecoverAllLocal(ctx context.Context) ([]*api.PinInfo, error) {
|
||||
_, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")
|
||||
defer span.End()
|
||||
|
@ -1135,6 +1154,10 @@ func (c *Cluster) RecoverAllLocal(ctx context.Context) ([]*api.PinInfo, error) {
|
|||
|
||||
// Recover triggers a recover operation for a given Cid in all
|
||||
// cluster peers.
|
||||
//
|
||||
// Recover operations ask IPFS to pin or unpin items in error state. Recover
|
||||
// is faster than calling Pin on the same CID as it avoids committing an
|
||||
// identical pin to the consensus layer.
|
||||
func (c *Cluster) Recover(ctx context.Context, h cid.Cid) (*api.GlobalPinInfo, error) {
|
||||
_, span := trace.StartSpan(ctx, "cluster/Recover")
|
||||
defer span.End()
|
||||
|
@ -1145,6 +1168,10 @@ func (c *Cluster) Recover(ctx context.Context, h cid.Cid) (*api.GlobalPinInfo, e
|
|||
|
||||
// RecoverLocal triggers a recover operation for a given Cid in this peer only.
|
||||
// It returns the updated PinInfo, after recovery.
|
||||
//
|
||||
// Recover operations ask IPFS to pin or unpin items in error state. Recover
|
||||
// is faster than calling Pin on the same CID as it avoids committing an
|
||||
// identical pin to the consensus layer.
|
||||
func (c *Cluster) RecoverLocal(ctx context.Context, h cid.Cid) (pInfo *api.PinInfo, err error) {
|
||||
_, span := trace.StartSpan(ctx, "cluster/RecoverLocal")
|
||||
defer span.End()
|
||||
|
@ -1343,6 +1370,23 @@ func (c *Cluster) pin(
|
|||
return pin, true, c.consensus.LogPin(ctx, pin)
|
||||
}
|
||||
|
||||
// We did not change ANY options and the pin exists so we just repin
|
||||
// what there is without doing new allocations. While this submits
|
||||
// pins to the consensus layer even if they are unchanged, this should trigger the
|
||||
// pin tracker and allows users to get re-pin operations by re-adding
|
||||
// without having to use recover, which is naturally expected.
|
||||
existing, err := c.PinGet(ctx, pin.Cid)
|
||||
if err == nil &&
|
||||
pin.PinOptions.Equals(&existing.PinOptions) &&
|
||||
len(blacklist) == 0 {
|
||||
pin = existing
|
||||
}
|
||||
|
||||
// Usually allocations are unset when pinning normally, however, the
|
||||
// allocations may have been preset by the adder in which case they
|
||||
// need to be respected. Whenever allocations are set, we don't
|
||||
// re-allocate.
|
||||
if len(pin.Allocations) == 0 {
|
||||
allocs, err := c.allocate(
|
||||
ctx,
|
||||
pin.Cid,
|
||||
|
@ -1355,12 +1399,6 @@ func (c *Cluster) pin(
|
|||
return pin, false, err
|
||||
}
|
||||
pin.Allocations = allocs
|
||||
|
||||
// Equals can handle nil objects.
|
||||
if curr, _ := c.PinGet(ctx, pin.Cid); curr.Equals(pin) {
|
||||
// skip pinning
|
||||
logger.Debugf("pinning %s skipped: already correctly allocated", pin.Cid)
|
||||
return pin, false, nil
|
||||
}
|
||||
|
||||
if len(pin.Allocations) == 0 {
|
||||
|
|
|
@ -1665,8 +1665,7 @@ func TestClustersReplicationNotEnoughPeers(t *testing.T) {
|
|||
ttlDelay()
|
||||
|
||||
j := rand.Intn(nClusters)
|
||||
h := test.Cid1
|
||||
_, err := clusters[j].Pin(ctx, h, api.PinOptions{})
|
||||
_, err := clusters[j].Pin(ctx, test.Cid1, api.PinOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -1679,7 +1678,7 @@ func TestClustersReplicationNotEnoughPeers(t *testing.T) {
|
|||
|
||||
waitForLeaderAndMetrics(t, clusters)
|
||||
|
||||
_, err = clusters[2].Pin(ctx, h, api.PinOptions{})
|
||||
_, err = clusters[2].Pin(ctx, test.Cid2, api.PinOptions{})
|
||||
if err == nil {
|
||||
t.Fatal("expected an error")
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package ipfscluster
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -395,15 +396,13 @@ func TestClustersPeerRemoveLeader(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestClustersPeerRemoveReallocsPins(t *testing.T) {
|
||||
// This test is testing that the peers are vacated upon
|
||||
// removal.
|
||||
|
||||
ctx := context.Background()
|
||||
clusters, mocks := createClusters(t)
|
||||
defer shutdownClusters(t, clusters, mocks)
|
||||
|
||||
if consensus == "crdt" {
|
||||
t.Log("FIXME when re-alloc changes come through")
|
||||
return
|
||||
}
|
||||
|
||||
if len(clusters) < 3 {
|
||||
t.Skip("test needs at least 3 clusters")
|
||||
}
|
||||
|
@ -415,31 +414,33 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
|
|||
}
|
||||
|
||||
// We choose to remove the leader, to make things even more interesting
|
||||
leaderID, err := clusters[0].consensus.Leader(ctx)
|
||||
chosenID, err := clusters[0].consensus.Leader(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
// choose a random peer
|
||||
i := rand.Intn(nClusters)
|
||||
chosenID = clusters[i].host.ID()
|
||||
}
|
||||
|
||||
var leader *Cluster
|
||||
var leaderi int
|
||||
var chosen *Cluster
|
||||
var chosenIndex int
|
||||
for i, cl := range clusters {
|
||||
if id := cl.ID(ctx).ID; id == leaderID {
|
||||
leader = cl
|
||||
leaderi = i
|
||||
if id := cl.ID(ctx).ID; id == chosenID {
|
||||
chosen = cl
|
||||
chosenIndex = i
|
||||
break
|
||||
}
|
||||
}
|
||||
if leader == nil {
|
||||
t.Fatal("did not find a leader?")
|
||||
if chosen == nil {
|
||||
t.Fatal("did not get to choose a peer?")
|
||||
}
|
||||
|
||||
leaderMock := mocks[leaderi]
|
||||
chosenMock := mocks[chosenIndex]
|
||||
|
||||
// Remove leader from set
|
||||
clusters = append(clusters[:leaderi], clusters[leaderi+1:]...)
|
||||
mocks = append(mocks[:leaderi], mocks[leaderi+1:]...)
|
||||
defer leader.Shutdown(ctx)
|
||||
defer leaderMock.Close()
|
||||
// Remove the chosen peer from set
|
||||
clusters = append(clusters[:chosenIndex], clusters[chosenIndex+1:]...)
|
||||
mocks = append(mocks[:chosenIndex], mocks[chosenIndex+1:]...)
|
||||
defer chosen.Shutdown(ctx)
|
||||
defer chosenMock.Close()
|
||||
|
||||
prefix := test.Cid1.Prefix()
|
||||
|
||||
|
@ -448,7 +449,7 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
|
|||
for i := 0; i < nClusters; i++ {
|
||||
h, err := prefix.Sum(randomBytes())
|
||||
checkErr(t, err)
|
||||
_, err = leader.Pin(ctx, h, api.PinOptions{})
|
||||
_, err = chosen.Pin(ctx, h, api.PinOptions{})
|
||||
checkErr(t, err)
|
||||
ttlDelay()
|
||||
}
|
||||
|
@ -457,10 +458,10 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
|
|||
|
||||
// At this point, all peers must have nClusters -1 pins
|
||||
// associated to them.
|
||||
// Find out which pins are associated to the leader.
|
||||
// Find out which pins are associated to the chosen peer.
|
||||
interestingCids := []cid.Cid{}
|
||||
|
||||
pins, err := leader.Pins(ctx)
|
||||
pins, err := chosen.Pins(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -468,23 +469,20 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
|
|||
t.Fatal("expected number of tracked pins to be nClusters")
|
||||
}
|
||||
for _, p := range pins {
|
||||
if containsPeer(p.Allocations, leaderID) {
|
||||
//t.Logf("%s pins %s", leaderID, p.Cid)
|
||||
if containsPeer(p.Allocations, chosenID) {
|
||||
//t.Logf("%s pins %s", chosenID, p.Cid)
|
||||
interestingCids = append(interestingCids, p.Cid)
|
||||
}
|
||||
}
|
||||
|
||||
if len(interestingCids) != nClusters-1 {
|
||||
//t.Fatal("The number of allocated Cids is not expected")
|
||||
t.Fatalf("Expected %d allocated CIDs but got %d", nClusters-1,
|
||||
len(interestingCids))
|
||||
}
|
||||
|
||||
// Now the leader removes itself
|
||||
err = leader.PeerRemove(ctx, leaderID)
|
||||
if err != nil {
|
||||
t.Fatal("error removing peer:", err)
|
||||
}
|
||||
// Now the chosen removes itself. Ignoring errors as they will
|
||||
// be caught below and crdt does error here.
|
||||
chosen.PeerRemove(ctx, chosenID)
|
||||
|
||||
delay()
|
||||
waitForLeaderAndMetrics(t, clusters)
|
||||
|
@ -496,7 +494,7 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal("error getting the new allocations for", icid)
|
||||
}
|
||||
if containsPeer(newPin.Allocations, leaderID) {
|
||||
if containsPeer(newPin.Allocations, chosenID) {
|
||||
t.Fatal("pin should not be allocated to the removed peer")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user