Re-allocate pins on peer removal

PeerRm now triggers re-pinning of all the Cids allocated to
the removed peer.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2017-07-03 17:45:22 +02:00
parent 1d81a1fedc
commit faa755f43a
5 changed files with 149 additions and 60 deletions
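
In outline, the change works like this (a minimal, self-contained sketch with invented toy types — not the real ipfs-cluster API): when a peer is removed, the shared state is walked, every pin whose allocations include that peer is found, and each such pin is re-pinned with the removed peer passed as a blacklist so the allocator cannot choose it again.

```go
package main

import "fmt"

// Toy stand-in for a cluster state entry; the names are illustrative only.
type pin struct {
	cid         string
	allocations []string
}

// repinFromPeer mirrors the idea of this commit: find every pin allocated
// to the removed peer and re-pin it with that peer blacklisted.
func repinFromPeer(state []pin, removed string) {
	for i := range state {
		for _, alloc := range state[i].allocations {
			if alloc == removed {
				rePin(&state[i], []string{removed})
				break
			}
		}
	}
}

// rePin stands in for Cluster.pin(pin, blacklist): blacklisted peers are
// dropped from the allocations, forcing replacements to be chosen.
func rePin(p *pin, blacklist []string) {
	var kept []string
	for _, a := range p.allocations {
		if !contains(blacklist, a) {
			kept = append(kept, a)
		}
	}
	p.allocations = kept
	fmt.Printf("re-pinning %s, surviving allocations: %v\n", p.cid, p.allocations)
}

func contains(list []string, s string) bool {
	for _, x := range list {
		if x == s {
			return true
		}
	}
	return false
}

func main() {
	state := []pin{
		{cid: "QmA", allocations: []string{"peer1", "peer2"}},
		{cid: "QmB", allocations: []string{"peer2", "peer3"}},
	}
	repinFromPeer(state, "peer2") // peer2 is being removed from the cluster
}
```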


@@ -305,7 +305,7 @@ func (c *Cluster) alertsHandler() {
}
}
// find all Cids pinned to a given peer and triggers re-pins on them
// repinFromPeer finds all Cids pinned to a given peer and triggers re-pins on them.
func (c *Cluster) repinFromPeer(p peer.ID) {
cState, err := c.consensus.State()
if err != nil {
@@ -317,7 +317,7 @@ func (c *Cluster) repinFromPeer(p peer.ID) {
for _, alloc := range pin.Allocations {
if alloc == p { // found pin allocated to node
logger.Infof("repinning %s out of %s", pin.Cid, p.Pretty())
c.Pin(pin)
c.pin(pin, []peer.ID{p}) // pin blacklisting this peer
}
}
}
@@ -565,6 +565,11 @@ func (c *Cluster) PeerRemove(pid peer.ID) error {
return fmt.Errorf("%s is not a peer", pid.Pretty())
}
// The peer is no longer among the peer set, so we can re-allocate
// any CIDs associated to it.
logger.Infof("Re-allocating all CIDs directly associated to %s", pid)
c.repinFromPeer(pid)
err := c.consensus.LogRmPeer(pid)
if err != nil {
logger.Error(err)
@@ -780,6 +785,12 @@ func (c *Cluster) PinGet(h *cid.Cid) (api.Pin, error) {
// to the global state. Pin does not reflect the success or failure
// of underlying IPFS daemon pinning operations.
func (c *Cluster) Pin(pin api.Pin) error {
return c.pin(pin, []peer.ID{})
}
// pin performs the actual pinning and supports a blacklist to be
// able to evacuate a node.
func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID) error {
rpl := pin.ReplicationFactor
if rpl == 0 {
rpl = c.config.ReplicationFactor
@@ -792,7 +803,7 @@ func (c *Cluster) Pin(pin api.Pin) error {
pin.Allocations = []peer.ID{}
logger.Infof("IPFS cluster pinning %s everywhere:", pin.Cid)
case rpl > 0:
allocs, err := c.allocate(pin.Cid, pin.ReplicationFactor)
allocs, err := c.allocate(pin.Cid, pin.ReplicationFactor, blacklist)
if err != nil {
return err
}
@@ -1037,38 +1048,32 @@ func (c *Cluster) getIDForPeer(pid peer.ID) (api.ID, error) {
// allocate finds peers to allocate a hash using the informer and the monitor.
// It should only be used with a positive replication factor.
func (c *Cluster) allocate(hash *cid.Cid, repl int) ([]peer.ID, error) {
func (c *Cluster) allocate(hash *cid.Cid, repl int, blacklist []peer.ID) ([]peer.ID, error) {
if repl <= 0 {
return nil, errors.New("cannot decide allocation for replication factor <= 0")
}
// Figure out who is currently holding this
var currentlyAllocatedPeers []peer.ID
var pinAllocations []peer.ID
st, err := c.consensus.State()
if err != nil {
// if there is no state, we assume it is empty. If there was another
// problem, we would fail to commit anyway.
currentlyAllocatedPeers = []peer.ID{}
pinAllocations = []peer.ID{}
} else {
pin := st.Get(hash)
currentlyAllocatedPeers = pin.Allocations
pinAllocations = pin.Allocations
}
// initialize a candidate metrics map with all current clusterPeers
// (albeit with invalid metrics)
clusterPeers := c.peerManager.peers()
metricsMap := make(map[peer.ID]api.Metric)
for _, cp := range clusterPeers {
metricsMap[cp] = api.Metric{Valid: false}
}
// Request latest metrics logged by informers from the leader
// Get the LastMetrics from the leading monitor. They are the last
// valid metrics from current cluster peers
var metrics []api.Metric
metricName := c.informer.Name()
l, err := c.consensus.Leader()
if err != nil {
return nil, errors.New("cannot determine leading Monitor")
}
var metrics []api.Metric
err = c.rpcClient.Call(l,
"Cluster", "PeerMonitorLastMetrics",
metricName,
@@ -1077,56 +1082,40 @@ func (c *Cluster) allocate(hash *cid.Cid, repl int) ([]peer.ID, error) {
return nil, err
}
// put metrics in the metricsMap if we have an entry for the peer
// (means it's a current cluster peer)
// We must divide the metrics between current and candidates
current := make(map[peer.ID]api.Metric)
candidates := make(map[peer.ID]api.Metric)
validAllocations := make([]peer.ID, 0, len(pinAllocations))
for _, m := range metrics {
_, ok := metricsMap[m.Peer]
if !ok {
if m.Discard() || containsPeer(blacklist, m.Peer) {
// blacklisted peers do not exist for us
continue
}
metricsMap[m.Peer] = m
}
// Remove any invalid metric. This will clear any cluster peers
// for which we did not receive metrics.
for p, m := range metricsMap {
if m.Discard() {
delete(metricsMap, p)
} else if containsPeer(pinAllocations, m.Peer) {
current[m.Peer] = m
validAllocations = append(validAllocations, m.Peer)
} else {
candidates[m.Peer] = m
}
}
// Move metrics from currentlyAllocatedPeers to a new map
// and record which peers have valid allocations
currentlyAllocatedPeersMetrics := make(map[peer.ID]api.Metric)
validAllocations := make([]peer.ID, 0)
for _, p := range currentlyAllocatedPeers {
m, ok := metricsMap[p]
if !ok {
continue
}
currentlyAllocatedPeersMetrics[p] = m
delete(metricsMap, p)
validAllocations = append(validAllocations, p)
currentValid := len(validAllocations)
candidatesValid := len(candidates)
needed := repl - currentValid
}
// how many allocations do we need (note we will re-allocate if we did
// not receive good metrics for currently allocated peers)
needed := repl - len(validAllocations)
logger.Debugf("allocate: Valid allocations: %s", validAllocations)
logger.Debugf("allocate: Valid allocations: %d", currentValid)
logger.Debugf("allocate: Valid candidates: %d", candidatesValid)
logger.Debugf("allocate: Needed: %d", needed)
// If needed == 0, we don't need anything. If needed < 0, we are
// reducing the replication factor
switch {
// set the allocations to the needed ones
case needed <= 0:
case needed <= 0: // set the allocations to the needed ones
return validAllocations[0 : len(validAllocations)+needed], nil
default:
// Allocate is called with currentAllocMetrics which contains
// only currentlyAllocatedPeers when they have provided valid metrics.
candidateAllocs, err := c.allocator.Allocate(hash, currentlyAllocatedPeersMetrics, metricsMap)
// this will return candidate peers in order of
// preference according to the allocator.
candidateAllocs, err := c.allocator.Allocate(hash, current, candidates)
if err != nil {
return nil, logError(err.Error())
}
@@ -1135,8 +1124,9 @@ func (c *Cluster) allocate(hash *cid.Cid, repl int) ([]peer.ID, error) {
// we don't have enough peers to pin
if len(candidateAllocs) < needed {
err = logError("cannot find enough allocations for this CID: needed: %d. Got: %s",
needed, candidateAllocs)
err = logError(
"cannot find enough allocations for %s. Needed: %d. Got: %s",
hash, needed, candidateAllocs)
return nil, err
}
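
To make the new allocate() arithmetic concrete, here is a small self-contained sketch (toy string peers and boolean metrics instead of peer.ID and api.Metric; the names are invented for illustration). With a replication factor of 2, a pin currently held by peerA and peerB, and peerB blacklisted because it is being removed, the valid allocations shrink to peerA, needed becomes 1, and the allocator only chooses among the remaining candidates.

```go
package main

import "fmt"

// containsPeer mirrors the helper added in this commit, over plain strings
// instead of peer.ID.
func containsPeer(list []string, p string) bool {
	for _, x := range list {
		if x == p {
			return true
		}
	}
	return false
}

func main() {
	repl := 2                                    // desired replication factor
	pinAllocations := []string{"peerA", "peerB"} // peers currently holding the CID
	blacklist := []string{"peerB"}               // peerB is being removed
	// Latest metrics reported through the monitor; true means "valid".
	metrics := map[string]bool{"peerA": true, "peerB": true, "peerC": true, "peerD": true}

	var current, candidates []string
	for p, valid := range metrics {
		if !valid || containsPeer(blacklist, p) {
			continue // invalid or blacklisted peers do not exist for us
		}
		if containsPeer(pinAllocations, p) {
			current = append(current, p) // keeps its allocation
		} else {
			candidates = append(candidates, p) // may receive a new allocation
		}
	}

	needed := repl - len(current)              // how many extra allocations we must find
	fmt.Println("valid allocations:", current) // [peerA]
	fmt.Println("candidates:", candidates)     // peerC and peerD, in map order
	fmt.Println("needed:", needed)             // 1
}
```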


@@ -126,9 +126,8 @@ Static clusters expect every member peer to be up and responding. Otherwise, the
We call a dynamic cluster one in which the set of `cluster_peers` changes: nodes are bootstrapped to existing cluster peers, the "peer add" and "peer rm" operations are used and/or the `leave_on_shutdown` configuration option is enabled. This option allows a node to abandon the consensus membership when shutting down, thus reducing the cluster size by one.
Dynamic clusters allow greater flexibility at the cost of stablity. Join and leave operations are tricky as they change the consensus membership and they are likely to create bad situations in unhealthy clusters.
Dynamic clusters allow greater flexibility at the cost of stability. Join and leave operations are tricky, as they change the consensus membership and are likely to create bad situations in unhealthy clusters. Also, bear in mind that removing a peer from the cluster will trigger a re-allocation of the pins that were associated to it. If the replication factor was 1, it is recommended to keep the ipfs daemon running so the content can actually be copied out to a daemon managed by a different peer.
Also, *currently*, pins allocated to a peer that left the cluster are not re-allocated to new peers.
The best way to diagnose and fix a broken cluster membership issue is to:
@@ -142,6 +141,8 @@ wrong peer count, stop them, fix `cluster_peers` manually and restart them.
* In cases where no Leader can be elected, a manual stop and editing of `cluster_peers` is necessary.
## Pinning an item
`ipfs-cluster-ctl pin add <cid>` will tell ipfs-cluster to pin a CID.


@@ -208,8 +208,21 @@ func (mon *StdPeerMonitor) LogMetric(m api.Metric) {
// return metric
// }
// LastMetrics returns last known VALID metrics of a given type
// LastMetrics returns last known VALID metrics of a given type. A metric
// is only valid if it has not expired and belongs to a current cluster peer.
func (mon *StdPeerMonitor) LastMetrics(name string) []api.Metric {
// Get the current list of peers
var peers []peer.ID
err := mon.rpcClient.Call("",
"Cluster",
"PeerManagerPeers",
struct{}{},
&peers)
if err != nil {
logger.Errorf("LastMetrics could not list peers: %s", err)
return []api.Metric{}
}
mon.metricsMux.RLock()
defer mon.metricsMux.RUnlock()
@@ -221,12 +234,18 @@ func (mon *StdPeerMonitor) LastMetrics(name string) []api.Metric {
metrics := make([]api.Metric, 0, len(mbyp))
for _, peerMetrics := range mbyp {
// only show metrics for current set of peers
for _, peer := range peers {
peerMetrics, ok := mbyp[peer]
if !ok {
continue
}
last, err := peerMetrics.latest()
if err != nil || last.Discard() {
continue
}
metrics = append(metrics, last)
}
return metrics
}
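
The monitor-side half of the change can be illustrated with a similar self-contained sketch (toy types, not the real StdPeerMonitor or api.Metric): only the latest non-expired metric of each peer that is still in the cluster peer set is returned, so metrics from departed peers are ignored even if they have not yet expired.

```go
package main

import (
	"fmt"
	"time"
)

// metric is a toy stand-in for api.Metric.
type metric struct {
	peer    string
	value   string
	expires time.Time
}

func (m metric) discard() bool { return time.Now().After(m.expires) }

// lastMetrics keeps only valid metrics that belong to a current cluster peer,
// mirroring the filtering that LastMetrics now performs.
func lastMetrics(byPeer map[string]metric, currentPeers []string) []metric {
	var out []metric
	for _, p := range currentPeers {
		m, ok := byPeer[p]
		if !ok || m.discard() {
			continue
		}
		out = append(out, m)
	}
	return out
}

func main() {
	now := time.Now()
	byPeer := map[string]metric{
		"peerA":    {peer: "peerA", value: "freespace:100", expires: now.Add(time.Minute)},
		"departed": {peer: "departed", value: "freespace:50", expires: now.Add(time.Minute)},
		"peerB":    {peer: "peerB", value: "freespace:10", expires: now.Add(-time.Minute)}, // expired
	}
	// Only peerA's metric survives: "departed" is no longer a cluster peer
	// and peerB's metric has expired.
	fmt.Println(lastMetrics(byPeer, []string{"peerA", "peerB"}))
}
```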


@@ -206,6 +206,76 @@ func TestClusterPeerRemoveSelf(t *testing.T) {
}
}
func TestClusterPeerRemoveReallocsPins(t *testing.T) {
clusters, mocks := createClusters(t)
defer shutdownClusters(t, clusters, mocks)
if len(clusters) < 3 {
t.Skip("test needs at least 3 clusters")
}
// Adjust the replication factor for re-allocation
for _, c := range clusters {
c.config.ReplicationFactor = nClusters - 1
}
cpeer := clusters[0]
clusterID := cpeer.ID().ID
tmpCid, _ := cid.Decode(test.TestCid1)
prefix := tmpCid.Prefix()
// Pin nClusters random pins. This ensures each peer will
// pin the same number of Cids.
for i := 0; i < nClusters; i++ {
h, err := prefix.Sum(randomBytes())
checkErr(t, err)
err = cpeer.Pin(api.PinCid(h))
checkErr(t, err)
time.Sleep(time.Second)
}
delay()
// At this point, all peers must have 1 pin associated to them.
// Find out which pin is associated to cpeer.
interestingCids := []*cid.Cid{}
pins := cpeer.Pins()
if len(pins) != nClusters {
t.Fatal("expected number of tracked pins to be nClusters")
}
for _, p := range pins {
if containsPeer(p.Allocations, clusterID) {
//t.Logf("%s pins %s", clusterID, p.Cid)
interestingCids = append(interestingCids, p.Cid)
}
}
if len(interestingCids) != nClusters-1 {
t.Fatal("The number of allocated Cids is not expected")
}
// Now remove cluster peer
err := clusters[0].PeerRemove(clusterID)
if err != nil {
t.Fatal("error removing peer:", err)
}
delay()
for _, icid := range interestingCids {
// Now check that the allocations are new.
newPin, err := clusters[0].PinGet(icid)
if err != nil {
t.Fatal("error getting the new allocations for", icid)
}
if containsPeer(newPin.Allocations, clusterID) {
t.Fatal("pin should not be allocated to the removed peer")
}
}
}
func TestClustersPeerJoin(t *testing.T) {
clusters, mocks := peerManagerClusters(t)
defer shutdownClusters(t, clusters, mocks)


@@ -144,3 +144,12 @@ func logError(fmtstr string, args ...interface{}) error {
logger.Error(msg)
return errors.New(msg)
}
func containsPeer(list []peer.ID, peer peer.ID) bool {
for _, p := range list {
if p == peer {
return true
}
}
return false
}