Merge pull request #1127 from ipfs/fix/1064-repin-followers
Fix #1064: Make the peer closest to the CID in charge of repinning
This commit is contained in:
commit
b0dcfe68c7
69
cluster.go
69
cluster.go
|
@ -5,7 +5,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"mime/multipart"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -418,13 +417,15 @@ func (c *Cluster) alertsHandler() {
|
|||
logger.Warn(err)
|
||||
return
|
||||
}
|
||||
|
||||
distance, err := c.distances(c.ctx, alrt.Peer)
|
||||
if err != nil {
|
||||
logger.Warn(err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, pin := range list {
|
||||
if len(pin.Allocations) == 1 && containsPeer(pin.Allocations, alrt.Peer) {
|
||||
logger.Warn("a pin with only one allocation cannot be repinned")
|
||||
logger.Warn("to make repinning possible, pin with a replication factor of 2+")
|
||||
continue
|
||||
}
|
||||
if c.shouldPeerRepinCid(alrt.Peer, pin) {
|
||||
if containsPeer(pin.Allocations, alrt.Peer) && distance.isClosest(pin.Cid) {
|
||||
c.repinFromPeer(c.ctx, alrt.Peer, pin)
|
||||
}
|
||||
}
|
||||
|
@ -432,25 +433,6 @@ func (c *Cluster) alertsHandler() {
|
|||
}
|
||||
}
|
||||
|
||||
// shouldPeerRepinCid returns true if the current peer is the top of the
|
||||
// allocs list. The failed peer is ignored, i.e. if current peer is
|
||||
// second and the failed peer is first, the function will still
|
||||
// return true.
|
||||
func (c *Cluster) shouldPeerRepinCid(failed peer.ID, pin *api.Pin) bool {
|
||||
if containsPeer(pin.Allocations, failed) && containsPeer(pin.Allocations, c.id) {
|
||||
allocs := peer.IDSlice(pin.Allocations)
|
||||
sort.Sort(allocs)
|
||||
if allocs[0] == c.id {
|
||||
return true
|
||||
}
|
||||
|
||||
if allocs[1] == c.id && allocs[0] == failed {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// detects any changes in the peerset and saves the configuration. When it
|
||||
// detects that we have been removed from the peerset, it shuts down this peer.
|
||||
func (c *Cluster) watchPeers() {
|
||||
|
@ -1007,6 +989,22 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Distances returns a distance checker using current trusted peers.
|
||||
// It can optionally receive a peer ID to exclude from the checks.
|
||||
func (c *Cluster) distances(ctx context.Context, exclude peer.ID) (*distanceChecker, error) {
|
||||
trustedPeers, err := c.getTrustedPeers(ctx, exclude)
|
||||
if err != nil {
|
||||
logger.Error("could not get trusted peers:", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &distanceChecker{
|
||||
local: c.id,
|
||||
otherPeers: trustedPeers,
|
||||
cache: make(map[peer.ID]distance, len(trustedPeers)+1),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// StateSync performs maintenance tasks on the global state that require
|
||||
// looping through all the items. It is triggered automatically on
|
||||
// StateSyncInterval. Currently it:
|
||||
|
@ -1036,20 +1034,14 @@ func (c *Cluster) StateSync(ctx context.Context) error {
|
|||
// other trusted peers. We cannot know if our peer ID is trusted by
|
||||
// other peers in the Cluster. This assumes yes. Setting FollowerMode
|
||||
// is a way to assume the opposite and skip this completely.
|
||||
trustedPeers, err := c.getTrustedPeers(ctx)
|
||||
distance, err := c.distances(ctx, "")
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
checker := distanceChecker{
|
||||
local: c.id,
|
||||
otherPeers: trustedPeers,
|
||||
cache: make(map[peer.ID]distance, len(trustedPeers)+1),
|
||||
return err // could not list peers
|
||||
}
|
||||
|
||||
// Unpin expired items when we are the closest peer to them.
|
||||
for _, p := range clusterPins {
|
||||
if p.ExpiredAt(timeNow) && checker.isClosest(p.Cid) {
|
||||
if p.ExpiredAt(timeNow) && distance.isClosest(p.Cid) {
|
||||
logger.Infof("Unpinning %s: pin expired at %s", p.Cid, p.ExpireAt)
|
||||
if _, err := c.Unpin(ctx, p.Cid); err != nil {
|
||||
logger.Error(err)
|
||||
|
@ -1627,8 +1619,9 @@ func (c *Cluster) Peers(ctx context.Context) []*api.ID {
|
|||
return peers
|
||||
}
|
||||
|
||||
// getTrustedPeers gives listed of trusted peers except the current peer.
|
||||
func (c *Cluster) getTrustedPeers(ctx context.Context) ([]peer.ID, error) {
|
||||
// getTrustedPeers gives listed of trusted peers except the current peer and
|
||||
// the excluded peer if provided.
|
||||
func (c *Cluster) getTrustedPeers(ctx context.Context, exclude peer.ID) ([]peer.ID, error) {
|
||||
peers, err := c.consensus.Peers(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1637,7 +1630,7 @@ func (c *Cluster) getTrustedPeers(ctx context.Context) ([]peer.ID, error) {
|
|||
trustedPeers := make([]peer.ID, 0, len(peers))
|
||||
|
||||
for _, p := range peers {
|
||||
if p == c.id || !c.consensus.IsTrustedPeer(ctx, p) {
|
||||
if p == c.id || p == exclude || !c.consensus.IsTrustedPeer(ctx, p) {
|
||||
continue
|
||||
}
|
||||
trustedPeers = append(trustedPeers, p)
|
||||
|
|
Loading…
Reference in New Issue
Block a user