30fd5ee6a0
This introduces the possiblity of running Cluster with multiple informer components. The first one in the list is the used for "allocations". This is just groundwork for working with several informers in the future.
207 lines
6.7 KiB
Go
207 lines
6.7 KiB
Go
package ipfscluster
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
cid "github.com/ipfs/go-cid"
|
|
peer "github.com/libp2p/go-libp2p-core/peer"
|
|
|
|
"go.opencensus.io/trace"
|
|
|
|
"github.com/ipfs/ipfs-cluster/api"
|
|
)
|
|
|
|
// This file gathers allocation logic used when pinning or re-pinning
|
|
// to find which peers should be allocated to a Cid. Allocation is constrained
|
|
// by ReplicationFactorMin and ReplicationFactorMax parameters obtained
|
|
// from the Pin object.
|
|
|
|
// The allocation process has several steps:
|
|
//
|
|
// * Find which peers are pinning a CID
|
|
// * Obtain the last values for the configured informer metrics from the
|
|
// monitor component
|
|
// * Divide the metrics between "current" (peers already pinning the CID)
|
|
// and "candidates" (peers that could pin the CID), as long as their metrics
|
|
// are valid.
|
|
// * Given the candidates:
|
|
// * Check if we are overpinning an item
|
|
// * Check if there are not enough candidates for the "needed" replication
|
|
// factor.
|
|
// * If there are enough candidates:
|
|
// * Call the configured allocator, which sorts the candidates (and
|
|
// may veto some depending on the allocation strategy.
|
|
// * The allocator returns a list of final candidate peers sorted by
|
|
// order of preference.
|
|
// * Take as many final candidates from the list as we can, until
|
|
// ReplicationFactorMax is reached. Error if there are less than
|
|
// ReplicationFactorMin.
|
|
|
|
// allocate finds peers to allocate a hash using the informer and the monitor
|
|
// it should only be used with valid replicationFactors (if rplMin and rplMax
|
|
// are > 0, then rplMin <= rplMax).
|
|
// It always returns allocations, but if no new allocations are needed,
|
|
// it will return the current ones. Note that allocate() does not take
|
|
// into account if the given CID was previously in a "pin everywhere" mode,
|
|
// and will consider such Pins as currently unallocated ones, providing
|
|
// new allocations as available.
|
|
func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/allocate")
|
|
defer span.End()
|
|
|
|
if (rplMin + rplMax) == 0 {
|
|
return nil, fmt.Errorf("bad replication factors: %d/%d", rplMin, rplMax)
|
|
}
|
|
|
|
if rplMin < 0 && rplMax < 0 { // allocate everywhere
|
|
return []peer.ID{}, nil
|
|
}
|
|
|
|
// Figure out who is holding the CID
|
|
var currentAllocs []peer.ID
|
|
currentPin, err := c.PinGet(ctx, hash)
|
|
if err == nil {
|
|
currentAllocs = currentPin.Allocations
|
|
}
|
|
metrics := c.monitor.LatestMetrics(ctx, c.informers[0].Name())
|
|
|
|
currentMetrics := make(map[peer.ID]*api.Metric)
|
|
candidatesMetrics := make(map[peer.ID]*api.Metric)
|
|
priorityMetrics := make(map[peer.ID]*api.Metric)
|
|
|
|
// Divide metrics between current and candidates.
|
|
// All metrics in metrics are valid (at least the
|
|
// moment they were compiled by the monitor)
|
|
for _, m := range metrics {
|
|
switch {
|
|
case containsPeer(blacklist, m.Peer):
|
|
// discard blacklisted peers
|
|
continue
|
|
case containsPeer(currentAllocs, m.Peer):
|
|
currentMetrics[m.Peer] = m
|
|
case containsPeer(prioritylist, m.Peer):
|
|
priorityMetrics[m.Peer] = m
|
|
default:
|
|
candidatesMetrics[m.Peer] = m
|
|
}
|
|
}
|
|
|
|
newAllocs, err := c.obtainAllocations(
|
|
ctx,
|
|
hash,
|
|
rplMin,
|
|
rplMax,
|
|
currentMetrics,
|
|
candidatesMetrics,
|
|
priorityMetrics,
|
|
)
|
|
if err != nil {
|
|
return newAllocs, err
|
|
}
|
|
if newAllocs == nil {
|
|
newAllocs = currentAllocs
|
|
}
|
|
return newAllocs, nil
|
|
}
|
|
|
|
// allocationError logs an allocation error
|
|
func allocationError(hash cid.Cid, needed, wanted int, candidatesValid []peer.ID) error {
|
|
logger.Errorf("Not enough candidates to allocate %s:", hash)
|
|
logger.Errorf(" Needed: %d", needed)
|
|
logger.Errorf(" Wanted: %d", wanted)
|
|
logger.Errorf(" Valid candidates: %d:", len(candidatesValid))
|
|
for _, c := range candidatesValid {
|
|
logger.Errorf(" - %s", c.Pretty())
|
|
}
|
|
errorMsg := "not enough peers to allocate CID. "
|
|
errorMsg += fmt.Sprintf("Needed at least: %d. ", needed)
|
|
errorMsg += fmt.Sprintf("Wanted at most: %d. ", wanted)
|
|
errorMsg += fmt.Sprintf("Valid candidates: %d. ", len(candidatesValid))
|
|
errorMsg += "See logs for more info."
|
|
return errors.New(errorMsg)
|
|
}
|
|
|
|
func (c *Cluster) obtainAllocations(
|
|
ctx context.Context,
|
|
hash cid.Cid,
|
|
rplMin, rplMax int,
|
|
currentValidMetrics map[peer.ID]*api.Metric,
|
|
candidatesMetrics map[peer.ID]*api.Metric,
|
|
priorityMetrics map[peer.ID]*api.Metric,
|
|
) ([]peer.ID, error) {
|
|
ctx, span := trace.StartSpan(ctx, "cluster/obtainAllocations")
|
|
defer span.End()
|
|
|
|
// The list of peers in current
|
|
validAllocations := make([]peer.ID, 0, len(currentValidMetrics))
|
|
for k := range currentValidMetrics {
|
|
validAllocations = append(validAllocations, k)
|
|
}
|
|
|
|
nCurrentValid := len(validAllocations)
|
|
nCandidatesValid := len(candidatesMetrics) + len(priorityMetrics)
|
|
needed := rplMin - nCurrentValid // The minimum we need
|
|
wanted := rplMax - nCurrentValid // The maximum we want
|
|
|
|
logger.Debugf("obtainAllocations: current valid: %d", nCurrentValid)
|
|
logger.Debugf("obtainAllocations: candidates valid: %d", nCandidatesValid)
|
|
logger.Debugf("obtainAllocations: Needed: %d", needed)
|
|
logger.Debugf("obtainAllocations: Wanted: %d", wanted)
|
|
|
|
// Reminder: rplMin <= rplMax AND >0
|
|
|
|
if wanted < 0 { // allocations above maximum threshold: drop some
|
|
// This could be done more intelligently by dropping them
|
|
// according to the allocator order (i.e. free-ing peers
|
|
// with most used space first).
|
|
return validAllocations[0 : len(validAllocations)+wanted], nil
|
|
}
|
|
|
|
if needed <= 0 { // allocations are above minimal threshold
|
|
// We don't provide any new allocations
|
|
return nil, nil
|
|
}
|
|
|
|
if nCandidatesValid < needed { // not enough candidates
|
|
candidatesValid := []peer.ID{}
|
|
for k := range priorityMetrics {
|
|
candidatesValid = append(candidatesValid, k)
|
|
}
|
|
for k := range candidatesMetrics {
|
|
candidatesValid = append(candidatesValid, k)
|
|
}
|
|
return nil, allocationError(hash, needed, wanted, candidatesValid)
|
|
}
|
|
|
|
// We can allocate from this point. Use the allocator to decide
|
|
// on the priority of candidates grab as many as "wanted"
|
|
|
|
// the allocator returns a list of peers ordered by priority
|
|
finalAllocs, err := c.allocator.Allocate(
|
|
ctx,
|
|
hash,
|
|
currentValidMetrics,
|
|
candidatesMetrics,
|
|
priorityMetrics,
|
|
)
|
|
if err != nil {
|
|
return nil, logError(err.Error())
|
|
}
|
|
|
|
logger.Debugf("obtainAllocations: allocate(): %s", finalAllocs)
|
|
|
|
// check that we have enough as the allocator may have returned
|
|
// less candidates than provided.
|
|
if got := len(finalAllocs); got < needed {
|
|
return nil, allocationError(hash, needed, wanted, finalAllocs)
|
|
}
|
|
|
|
allocationsToUse := minInt(wanted, len(finalAllocs))
|
|
|
|
// the final result is the currently valid allocations
|
|
// along with the ones provided by the allocator
|
|
return append(validAllocations, finalAllocs[0:allocationsToUse]...), nil
|
|
}
|