From faa755f43a204eae555535342f06a8139b52485c Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 3 Jul 2017 17:45:22 +0200 Subject: [PATCH] Re-allocate pins on peer removal PeerRm now triggers re-pinning of all the Cids allocated to the removed peer. License: MIT Signed-off-by: Hector Sanjuan --- cluster.go | 102 +++++++++++++++------------------- docs/ipfs-cluster-guide.md | 5 +- monitor/basic/peer_monitor.go | 23 +++++++- peer_manager_test.go | 70 +++++++++++++++++++++++ util.go | 9 +++ 5 files changed, 149 insertions(+), 60 deletions(-) diff --git a/cluster.go b/cluster.go index b3d94dcb..04c7a65d 100644 --- a/cluster.go +++ b/cluster.go @@ -305,7 +305,7 @@ func (c *Cluster) alertsHandler() { } } -// find all Cids pinned to a given peer and triggers re-pins on them +// find all Cids pinned to a given peer and triggers re-pins on them. func (c *Cluster) repinFromPeer(p peer.ID) { cState, err := c.consensus.State() if err != nil { @@ -317,7 +317,7 @@ func (c *Cluster) repinFromPeer(p peer.ID) { for _, alloc := range pin.Allocations { if alloc == p { // found pin allocated to node logger.Infof("repinning %s out of %s", pin.Cid, p.Pretty()) - c.Pin(pin) + c.pin(pin, []peer.ID{p}) // pin blacklisting this peer } } } @@ -565,6 +565,11 @@ func (c *Cluster) PeerRemove(pid peer.ID) error { return fmt.Errorf("%s is not a peer", pid.Pretty()) } + // The peer is no longer among the peer set, so we can re-allocate + // any CIDs associated to it. + logger.Infof("Re-allocating all CIDs directly associated to %s", pid) + c.repinFromPeer(pid) + err := c.consensus.LogRmPeer(pid) if err != nil { logger.Error(err) @@ -780,6 +785,12 @@ func (c *Cluster) PinGet(h *cid.Cid) (api.Pin, error) { // to the global state. Pin does not reflect the success or failure // of underlying IPFS daemon pinning operations. func (c *Cluster) Pin(pin api.Pin) error { + return c.pin(pin, []peer.ID{}) +} + +// pin performs the actual pinning and supports a blacklist to be +// able to evacuate a node. +func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID) error { rpl := pin.ReplicationFactor if rpl == 0 { rpl = c.config.ReplicationFactor @@ -792,7 +803,7 @@ func (c *Cluster) Pin(pin api.Pin) error { pin.Allocations = []peer.ID{} logger.Infof("IPFS cluster pinning %s everywhere:", pin.Cid) case rpl > 0: - allocs, err := c.allocate(pin.Cid, pin.ReplicationFactor) + allocs, err := c.allocate(pin.Cid, pin.ReplicationFactor, blacklist) if err != nil { return err } @@ -1037,38 +1048,32 @@ func (c *Cluster) getIDForPeer(pid peer.ID) (api.ID, error) { // allocate finds peers to allocate a hash using the informer and the monitor // it should only be used with a positive replication factor -func (c *Cluster) allocate(hash *cid.Cid, repl int) ([]peer.ID, error) { +func (c *Cluster) allocate(hash *cid.Cid, repl int, blacklist []peer.ID) ([]peer.ID, error) { if repl <= 0 { return nil, errors.New("cannot decide allocation for replication factor <= 0") } // Figure out who is currently holding this - var currentlyAllocatedPeers []peer.ID + var pinAllocations []peer.ID st, err := c.consensus.State() if err != nil { // no state we assume it is empty. If there was other // problem, we would fail to commit anyway. - currentlyAllocatedPeers = []peer.ID{} + pinAllocations = []peer.ID{} } else { pin := st.Get(hash) - currentlyAllocatedPeers = pin.Allocations + pinAllocations = pin.Allocations } - // initialize a candidate metrics map with all current clusterPeers - // (albeit with invalid metrics) - clusterPeers := c.peerManager.peers() - metricsMap := make(map[peer.ID]api.Metric) - for _, cp := range clusterPeers { - metricsMap[cp] = api.Metric{Valid: false} - } - - // Request latest metrics logged by informers from the leader + // Get the LastMetrics from the leading monitor. They are the last + // valid metrics from current cluster peers + var metrics []api.Metric metricName := c.informer.Name() l, err := c.consensus.Leader() if err != nil { return nil, errors.New("cannot determine leading Monitor") } - var metrics []api.Metric + err = c.rpcClient.Call(l, "Cluster", "PeerMonitorLastMetrics", metricName, @@ -1077,56 +1082,40 @@ func (c *Cluster) allocate(hash *cid.Cid, repl int) ([]peer.ID, error) { return nil, err } - // put metrics in the metricsMap if we have an entry for the peer - // (means it's a current cluster peer) + // We must divide the metrics between current and candidates + current := make(map[peer.ID]api.Metric) + candidates := make(map[peer.ID]api.Metric) + validAllocations := make([]peer.ID, 0, len(pinAllocations)) for _, m := range metrics { - _, ok := metricsMap[m.Peer] - if !ok { + if m.Discard() || containsPeer(blacklist, m.Peer) { + // blacklisted peers do not exist for us continue - } - metricsMap[m.Peer] = m - } - - // Remove any invalid metric. This will clear any cluster peers - // for which we did not receive metrics. - for p, m := range metricsMap { - if m.Discard() { - delete(metricsMap, p) + } else if containsPeer(pinAllocations, m.Peer) { + current[m.Peer] = m + validAllocations = append(validAllocations, m.Peer) + } else { + candidates[m.Peer] = m } } - // Move metrics from currentlyAllocatedPeers to a new map - // and record which peers have valid allocations - currentlyAllocatedPeersMetrics := make(map[peer.ID]api.Metric) - validAllocations := make([]peer.ID, 0) - for _, p := range currentlyAllocatedPeers { - m, ok := metricsMap[p] - if !ok { - continue - } - currentlyAllocatedPeersMetrics[p] = m - delete(metricsMap, p) - validAllocations = append(validAllocations, p) + currentValid := len(validAllocations) + candidatesValid := len(candidates) + needed := repl - currentValid - } - - // how many allocations do we need (note we will re-allocate if we did - // not receive good metrics for currently allocated peeers) - needed := repl - len(validAllocations) - - logger.Debugf("allocate: Valid allocations: %s", validAllocations) + logger.Debugf("allocate: Valid allocations: %d", currentValid) + logger.Debugf("allocate: Valid candidates: %d", candidatesValid) logger.Debugf("allocate: Needed: %d", needed) // If needed == 0, we don't need anything. If needed < 0, we are // reducing the replication factor switch { - // set the allocations to the needed ones - case needed <= 0: + + case needed <= 0: // set the allocations to the needed ones return validAllocations[0 : len(validAllocations)+needed], nil default: - // Allocate is called with currentAllocMetrics which contains - // only currentlyAllocatedPeers when they have provided valid metrics. - candidateAllocs, err := c.allocator.Allocate(hash, currentlyAllocatedPeersMetrics, metricsMap) + // this will return candidate peers in order of + // preference according to the allocator. + candidateAllocs, err := c.allocator.Allocate(hash, current, candidates) if err != nil { return nil, logError(err.Error()) } @@ -1135,8 +1124,9 @@ func (c *Cluster) allocate(hash *cid.Cid, repl int) ([]peer.ID, error) { // we don't have enough peers to pin if len(candidateAllocs) < needed { - err = logError("cannot find enough allocations for this CID: needed: %d. Got: %s", - needed, candidateAllocs) + err = logError( + "cannot find enough allocations for %s. Needed: %d. Got: %s", + hash, needed, candidateAllocs) return nil, err } diff --git a/docs/ipfs-cluster-guide.md b/docs/ipfs-cluster-guide.md index 5cdbb9eb..a0a47bed 100644 --- a/docs/ipfs-cluster-guide.md +++ b/docs/ipfs-cluster-guide.md @@ -126,9 +126,8 @@ Static clusters expect every member peer to be up and responding. Otherwise, the We call a dynamic cluster, that in which the set of `cluster_peers` changes. Nodes are bootstrapped to existing cluster peers, the "peer add" and "peer rm" operations are used and/or the `leave_on_shutdown` configuration option is enabled. This option allows a node to abandon the consensus membership when shutting down. Thus reducing the cluster size by one. -Dynamic clusters allow greater flexibility at the cost of stablity. Join and leave operations are tricky as they change the consensus membership and they are likely to create bad situations in unhealthy clusters. +Dynamic clusters allow greater flexibility at the cost of stablity. Join and leave operations are tricky as they change the consensus membership and they are likely to create bad situations in unhealthy clusters. Also, bear in mind than removing a peer from the cluster will trigger a re-allocation of the pins that were associated to it. If the replication factor was 1, it is recommended to keep the ipfs daemon running so the content can actually be copied out to a daemon managed by a different peer. -Also, *currently*, pins allocated to a peer that left the cluster are not re-allocated to new peers. The best way to diagnose and fix a broken cluster membership issue is to: @@ -142,6 +141,8 @@ wrong peer count, stop them, fix `cluster_peers` manually and restart them. * In cases were no Leader can be elected, then manual stop and editing of `cluster_peers` is necessary. + + ## Pinning an item `ipfs-cluster-ctl pin add ` will tell ipfs-cluster to pin a CID. diff --git a/monitor/basic/peer_monitor.go b/monitor/basic/peer_monitor.go index f0356a74..5362f544 100644 --- a/monitor/basic/peer_monitor.go +++ b/monitor/basic/peer_monitor.go @@ -208,8 +208,21 @@ func (mon *StdPeerMonitor) LogMetric(m api.Metric) { // return metric // } -// LastMetrics returns last known VALID metrics of a given type +// LastMetrics returns last known VALID metrics of a given type. A metric +// is only valid if it has not expired and belongs to a current cluster peer. func (mon *StdPeerMonitor) LastMetrics(name string) []api.Metric { + // Ger current list of peers + var peers []peer.ID + err := mon.rpcClient.Call("", + "Cluster", + "PeerManagerPeers", + struct{}{}, + &peers) + if err != nil { + logger.Errorf("LastMetrics could not list peers: %s", err) + return []api.Metric{} + } + mon.metricsMux.RLock() defer mon.metricsMux.RUnlock() @@ -221,12 +234,18 @@ func (mon *StdPeerMonitor) LastMetrics(name string) []api.Metric { metrics := make([]api.Metric, 0, len(mbyp)) - for _, peerMetrics := range mbyp { + // only show metrics for current set of peers + for _, peer := range peers { + peerMetrics, ok := mbyp[peer] + if !ok { + continue + } last, err := peerMetrics.latest() if err != nil || last.Discard() { continue } metrics = append(metrics, last) + } return metrics } diff --git a/peer_manager_test.go b/peer_manager_test.go index c609fcad..abc37997 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -206,6 +206,76 @@ func TestClusterPeerRemoveSelf(t *testing.T) { } } +func TestClusterPeerRemoveReallocsPins(t *testing.T) { + clusters, mocks := createClusters(t) + defer shutdownClusters(t, clusters, mocks) + + if len(clusters) < 3 { + t.Skip("test needs at least 3 clusters") + } + + // Adjust the replication factor for re-allocation + for _, c := range clusters { + c.config.ReplicationFactor = nClusters - 1 + } + + cpeer := clusters[0] + clusterID := cpeer.ID().ID + + tmpCid, _ := cid.Decode(test.TestCid1) + prefix := tmpCid.Prefix() + + // Pin nCluster random pins. This ensures each peer will + // pin the same number of Cids. + for i := 0; i < nClusters; i++ { + h, err := prefix.Sum(randomBytes()) + checkErr(t, err) + err = cpeer.Pin(api.PinCid(h)) + checkErr(t, err) + time.Sleep(time.Second) + } + + delay() + + // At this point, all peers must have 1 pin associated to them. + // Find out which pin is associated to cpeer. + interestingCids := []*cid.Cid{} + + pins := cpeer.Pins() + if len(pins) != nClusters { + t.Fatal("expected number of tracked pins to be nClusters") + } + for _, p := range pins { + if containsPeer(p.Allocations, clusterID) { + //t.Logf("%s pins %s", clusterID, p.Cid) + interestingCids = append(interestingCids, p.Cid) + } + } + + if len(interestingCids) != nClusters-1 { + t.Fatal("The number of allocated Cids is not expected") + } + + // Now remove cluster peer + err := clusters[0].PeerRemove(clusterID) + if err != nil { + t.Fatal("error removing peer:", err) + } + + delay() + + for _, icid := range interestingCids { + // Now check that the allocations are new. + newPin, err := clusters[0].PinGet(icid) + if err != nil { + t.Fatal("error getting the new allocations for", icid) + } + if containsPeer(newPin.Allocations, clusterID) { + t.Fatal("pin should not be allocated to the removed peer") + } + } +} + func TestClustersPeerJoin(t *testing.T) { clusters, mocks := peerManagerClusters(t) defer shutdownClusters(t, clusters, mocks) diff --git a/util.go b/util.go index 05e4851f..193b1df2 100644 --- a/util.go +++ b/util.go @@ -144,3 +144,12 @@ func logError(fmtstr string, args ...interface{}) error { logger.Error(msg) return errors.New(msg) } + +func containsPeer(list []peer.ID, peer peer.ID) bool { + for _, p := range list { + if p == peer { + return true + } + } + return false +}