ipfs-cluster/allocate.go

package ipfscluster

import (
	"errors"
	"fmt"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-peer"

	"github.com/ipfs/ipfs-cluster/api"
)

// This file gathers allocation logic used when pinning or re-pinning
// to find which peers should be allocated to a Cid. Allocation is constrained
// by the ReplicationFactorMin and ReplicationFactorMax parameters obtained
// from the Pin object.
//
// The allocation process has several steps:
//
// * Find which peers are pinning a CID.
// * Obtain the last values for the configured informer metrics from the
//   monitor component.
// * Divide the metrics between "current" (peers already pinning the CID)
//   and "candidates" (peers that could pin the CID), as long as their metrics
//   are valid.
// * Given the candidates:
//   * Check if we are overpinning an item.
//   * Check if there are not enough candidates for the "needed" replication
//     factor.
//   * If there are enough candidates:
//     * Call the configured allocator, which sorts the candidates (and
//       may veto some, depending on the allocation strategy).
//     * The allocator returns a list of final candidate peers sorted by
//       order of preference.
//     * Take as many final candidates from the list as we can, until
//       ReplicationFactorMax is reached. Error if there are fewer than
//       ReplicationFactorMin.
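//
// As an illustrative example (not part of the original file): if peers A
// and B already pin the CID with valid metrics, C and D are valid
// candidates, and the Pin requires ReplicationFactorMin=3 and
// ReplicationFactorMax=4, then at least one of C or D must be allocated,
// and at most both of them will be.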

// allocate finds peers to allocate a hash, using the informer and the
// monitor. It should only be used with valid replication factors (rplMin
// and rplMax positive, with rplMin <= rplMax).
// It always returns allocations, but if no new allocations are needed,
// it will return the current ones. Note that allocate() does not take
// into account if the given CID was previously in a "pin everywhere" mode,
// and will consider such Pins as currently unallocated ones, providing
// new allocations as available.
func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.ID) ([]peer.ID, error) {
	// Figure out who is holding the CID
	currentPin, _ := c.getCurrentPin(hash)
	currentAllocs := currentPin.Allocations

	metrics, err := c.getInformerMetrics()
	if err != nil {
		return nil, err
	}

	currentMetrics := make(map[peer.ID]api.Metric)
	candidatesMetrics := make(map[peer.ID]api.Metric)

	// Divide metrics between current and candidates.
	for _, m := range metrics {
		switch {
		case m.Discard() || containsPeer(blacklist, m.Peer):
			// discard peers with invalid metrics and
			// those in the blacklist
			continue
		case containsPeer(currentAllocs, m.Peer):
			currentMetrics[m.Peer] = m
		default:
			candidatesMetrics[m.Peer] = m
		}
	}

	newAllocs, err := c.obtainAllocations(hash,
		rplMin,
		rplMax,
		currentMetrics,
		candidatesMetrics)
	if err != nil {
		return newAllocs, err
	}
	if newAllocs == nil {
		newAllocs = currentAllocs
	}
	return newAllocs, nil
}
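
// The following is an illustrative sketch and not part of the original file:
// it shows how a caller might feed a Pin's replication factors into
// allocate(). The exampleAllocateForPin name is hypothetical; the Cid,
// ReplicationFactorMin and ReplicationFactorMax fields are those described
// in the header comment of this file.
func (c *Cluster) exampleAllocateForPin(pin api.Pin) ([]peer.ID, error) {
	// This sketch assumes positive replication factors; "pin everywhere"
	// Pins would not normally go through allocate() at all.
	return c.allocate(
		pin.Cid,
		pin.ReplicationFactorMin,
		pin.ReplicationFactorMax,
		nil, // no blacklisted peers in this example
	)
}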

// getCurrentPin returns the Pin object for h, if we can find one,
// or builds an empty one.
func (c *Cluster) getCurrentPin(h *cid.Cid) (api.Pin, bool) {
	st, err := c.consensus.State()
	if err != nil {
		return api.PinCid(h), false
	}

	ok := st.Has(h)
	return st.Get(h), ok
}

// getInformerMetrics returns the last metrics for the configured informer,
// obtained from the leader's monitor component via the
// PeerMonitorLastMetrics RPC.
func (c *Cluster) getInformerMetrics() ([]api.Metric, error) {
	var metrics []api.Metric
	metricName := c.informer.Name()

	l, err := c.consensus.Leader()
	if err != nil {
		return nil, errors.New("cannot determine leading Monitor")
	}

	err = c.rpcClient.Call(l,
		"Cluster", "PeerMonitorLastMetrics",
		metricName,
		&metrics)
	if err != nil {
		return nil, err
	}
	return metrics, nil
}
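
// The sketch below is illustrative and not part of the original file: it
// combines getInformerMetrics() with Metric.Discard() (as allocate() does)
// to log which peers currently report valid metrics. The function name is
// hypothetical.
func (c *Cluster) exampleLogValidMetricPeers() {
	metrics, err := c.getInformerMetrics()
	if err != nil {
		logger.Error(err)
		return
	}
	for _, m := range metrics {
		if m.Discard() { // expired or otherwise invalid metric
			continue
		}
		logger.Debugf("valid metric reported by %s", m.Peer.Pretty())
	}
}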

// allocationError logs the details of a failed allocation and returns
// a summarized error.
func allocationError(hash *cid.Cid, needed, wanted int, candidatesValid []peer.ID) error {
	logger.Errorf("Not enough candidates to allocate %s:", hash)
	logger.Errorf("  Needed: %d", needed)
	logger.Errorf("  Wanted: %d", wanted)
	logger.Errorf("  Valid candidates: %d:", len(candidatesValid))
	for _, c := range candidatesValid {
		logger.Errorf("  - %s", c.Pretty())
	}

	errorMsg := "not enough peers to allocate CID. "
	errorMsg += fmt.Sprintf("Needed at least: %d. ", needed)
	errorMsg += fmt.Sprintf("Wanted at most: %d. ", wanted)
	errorMsg += fmt.Sprintf("Valid candidates: %d. ", len(candidatesValid))
	errorMsg += "See logs for more info."
	return errors.New(errorMsg)
}

// obtainAllocations returns the final allocations for hash, given the
// metrics for the peers currently pinning it, the metrics for the candidate
// peers, and the replication factor bounds.
func (c *Cluster) obtainAllocations(
	hash *cid.Cid,
	rplMin, rplMax int,
	currentValidMetrics, candidatesMetrics map[peer.ID]api.Metric) ([]peer.ID, error) {

	// The list of peers in "current" with valid metrics
	validAllocations := make([]peer.ID, 0, len(currentValidMetrics))
	for k := range currentValidMetrics {
		validAllocations = append(validAllocations, k)
	}

	nCurrentValid := len(validAllocations)
	nCandidatesValid := len(candidatesMetrics)
	needed := rplMin - nCurrentValid // The minimum we need
	wanted := rplMax - nCurrentValid // The maximum we want

	logger.Debugf("obtainAllocations: current valid: %d", nCurrentValid)
	logger.Debugf("obtainAllocations: candidates valid: %d", nCandidatesValid)
	logger.Debugf("obtainAllocations: Needed: %d", needed)
	logger.Debugf("obtainAllocations: Wanted: %d", wanted)

	// Reminder: rplMin <= rplMax AND both > 0

	if wanted < 0 { // allocations above the maximum threshold: drop some
		// This could be done more intelligently by dropping them
		// according to the allocator order (i.e. freeing peers
		// with the most used space first).
		return validAllocations[0 : len(validAllocations)+wanted], nil
	}

	if needed <= 0 { // allocations are above the minimal threshold
		// We don't provide any new allocations
		return nil, nil
	}

	if nCandidatesValid < needed { // not enough candidates
		candidatesValid := []peer.ID{}
		for k := range candidatesMetrics {
			candidatesValid = append(candidatesValid, k)
		}
		return nil, allocationError(hash, needed, wanted, candidatesValid)
	}

	// We can allocate from this point. Use the allocator to decide
	// on the priority of the candidates and grab as many as "wanted".
	// The allocator returns a list of peers ordered by priority.
	finalAllocs, err := c.allocator.Allocate(
		hash, currentValidMetrics, candidatesMetrics)
	if err != nil {
		return nil, logError(err.Error())
	}

	logger.Debugf("obtainAllocations: allocate(): %s", finalAllocs)

	// Check that we have enough, as the allocator may have returned
	// fewer candidates than provided.
	if got := len(finalAllocs); got < needed {
		return nil, allocationError(hash, needed, wanted, finalAllocs)
	}

	allocationsToUse := minInt(wanted, len(finalAllocs))

	// The final result is the currently valid allocations
	// along with the ones provided by the allocator.
	return append(validAllocations, finalAllocs[0:allocationsToUse]...), nil
}
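
// exampleReplicationMath is an illustrative sketch, not part of the original
// file: it spells out the needed/wanted arithmetic used by
// obtainAllocations. With rplMin=2, rplMax=4 and one currently valid
// allocation, one new peer is needed and up to three more will be taken
// from the allocator's sorted result.
func exampleReplicationMath() {
	rplMin, rplMax := 2, 4
	nCurrentValid := 1
	needed := rplMin - nCurrentValid // 1: must find at least this many candidates
	wanted := rplMax - nCurrentValid // 3: take at most this many candidates
	logger.Debugf("example: needed=%d, wanted=%d", needed, wanted)
}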