package ipfscluster

import (
	"context"
	"errors"
	"fmt"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"

	"go.opencensus.io/trace"

	"github.com/ipfs/ipfs-cluster/api"
)

// This file gathers allocation logic used when pinning or re-pinning
// to find which peers should be allocated to a Cid. Allocation is constrained
// by the ReplicationFactorMin and ReplicationFactorMax parameters obtained
// from the Pin object.

// The allocation process has several steps:
//
// * Find which peers are pinning a CID.
// * Obtain the last values for the configured informer metrics from the
//   monitor component.
// * Divide the metrics between "current" (peers already pinning the CID)
//   and "candidates" (peers that could pin the CID), as long as their metrics
//   are valid.
// * Given the candidates:
//   * Check if we are overpinning an item.
//   * Check if there are not enough candidates for the "needed" replication
//     factor.
//   * If there are enough candidates:
//     * Call the configured allocator, which sorts the candidates (and
//       may veto some depending on the allocation strategy).
//     * The allocator returns a list of final candidate peers sorted by
//       order of preference.
//     * Take as many final candidates from the list as we can, until
//       ReplicationFactorMax is reached. Error if there are fewer than
//       ReplicationFactorMin.

// allocate finds peers to allocate a hash using the informer and the monitor.
// It should only be used with valid replication factors (if rplMin and rplMax
// are > 0, then rplMin <= rplMax).
// It always returns allocations, but if no new allocations are needed,
// it will return the current ones. Note that allocate() does not take
// into account if the given CID was previously in a "pin everywhere" mode,
// and will consider such Pins as currently unallocated ones, providing
// new allocations as available.
func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) {
	ctx, span := trace.StartSpan(ctx, "cluster/allocate")
	defer span.End()

	if (rplMin + rplMax) == 0 {
		return nil, fmt.Errorf("bad replication factors: %d/%d", rplMin, rplMax)
	}

	if rplMin < 0 && rplMax < 0 { // allocate everywhere
		return []peer.ID{}, nil
	}

	// Figure out who is holding the CID
	var currentAllocs []peer.ID
	currentPin, err := c.PinGet(ctx, hash)
	if err == nil {
		currentAllocs = currentPin.Allocations
	}
	metrics := c.monitor.LatestMetrics(ctx, c.informers[0].Name())
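	// Note: only the metric reported by the first configured informer
	// drives allocation here; metrics from any additional informers are
	// not consulted in this path.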

	currentMetrics := make(map[peer.ID]*api.Metric)
	candidatesMetrics := make(map[peer.ID]*api.Metric)
	priorityMetrics := make(map[peer.ID]*api.Metric)

	// Divide metrics between current and candidates.
	// All metrics in metrics are valid (at least the
	// moment they were compiled by the monitor).
	for _, m := range metrics {
		switch {
		case containsPeer(blacklist, m.Peer):
			// discard blacklisted peers
			continue
		case containsPeer(currentAllocs, m.Peer):
			currentMetrics[m.Peer] = m
		case containsPeer(prioritylist, m.Peer):
			priorityMetrics[m.Peer] = m
		default:
			candidatesMetrics[m.Peer] = m
		}
	}

	newAllocs, err := c.obtainAllocations(
		ctx,
		hash,
		rplMin,
		rplMax,
		currentMetrics,
		candidatesMetrics,
		priorityMetrics,
	)
	if err != nil {
		return newAllocs, err
	}
	if newAllocs == nil {
		newAllocs = currentAllocs
	}
	return newAllocs, nil
}

// allocationError logs an allocation error
func allocationError(hash cid.Cid, needed, wanted int, candidatesValid []peer.ID) error {
	logger.Errorf("Not enough candidates to allocate %s:", hash)
	logger.Errorf("  Needed: %d", needed)
	logger.Errorf("  Wanted: %d", wanted)
	logger.Errorf("  Valid candidates: %d:", len(candidatesValid))
	for _, c := range candidatesValid {
		logger.Errorf("    - %s", c.Pretty())
	}
	errorMsg := "not enough peers to allocate CID. "
	errorMsg += fmt.Sprintf("Needed at least: %d. ", needed)
	errorMsg += fmt.Sprintf("Wanted at most: %d. ", wanted)
	errorMsg += fmt.Sprintf("Valid candidates: %d. ", len(candidatesValid))
	errorMsg += "See logs for more info."
	return errors.New(errorMsg)
}

func (c *Cluster) obtainAllocations(
	ctx context.Context,
	hash cid.Cid,
	rplMin, rplMax int,
	currentValidMetrics map[peer.ID]*api.Metric,
	candidatesMetrics map[peer.ID]*api.Metric,
	priorityMetrics map[peer.ID]*api.Metric,
) ([]peer.ID, error) {
	ctx, span := trace.StartSpan(ctx, "cluster/obtainAllocations")
	defer span.End()

	// The list of peers in current
	validAllocations := make([]peer.ID, 0, len(currentValidMetrics))
	for k := range currentValidMetrics {
		validAllocations = append(validAllocations, k)
	}

	nCurrentValid := len(validAllocations)
	nCandidatesValid := len(candidatesMetrics) + len(priorityMetrics)
	needed := rplMin - nCurrentValid // The minimum we need
	wanted := rplMax - nCurrentValid // The maximum we want

	logger.Debugf("obtainAllocations: current valid: %d", nCurrentValid)
	logger.Debugf("obtainAllocations: candidates valid: %d", nCandidatesValid)
	logger.Debugf("obtainAllocations: Needed: %d", needed)
	logger.Debugf("obtainAllocations: Wanted: %d", wanted)
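
	// Illustrative example (hypothetical numbers): with rplMin=2, rplMax=4
	// and one peer already pinning with a valid metric, needed = 2-1 = 1
	// and wanted = 4-1 = 3, so at least one and at most three new peers
	// will be allocated below.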

	// Reminder: rplMin <= rplMax AND > 0.

	if wanted < 0 { // allocations above maximum threshold: drop some
		// This could be done more intelligently by dropping them
		// according to the allocator order (i.e. freeing peers
		// with the most used space first).
		return validAllocations[0 : len(validAllocations)+wanted], nil
	}

	if needed <= 0 { // allocations are above the minimal threshold
		// We don't provide any new allocations
		return nil, nil
	}

	if nCandidatesValid < needed { // not enough candidates
		candidatesValid := []peer.ID{}
		for k := range priorityMetrics {
			candidatesValid = append(candidatesValid, k)
		}
		for k := range candidatesMetrics {
			candidatesValid = append(candidatesValid, k)
		}
		return nil, allocationError(hash, needed, wanted, candidatesValid)
	}

	// We can allocate from this point. Use the allocator to decide
	// on the order of the candidates and grab as many as "wanted".
	// The allocator returns a list of peers ordered by preference.
	finalAllocs, err := c.allocator.Allocate(
		ctx,
		hash,
		currentValidMetrics,
		candidatesMetrics,
		priorityMetrics,
	)
	if err != nil {
		return nil, logError(err.Error())
	}

	logger.Debugf("obtainAllocations: allocate(): %s", finalAllocs)

	// Check that we have enough, as the allocator may have returned
	// fewer candidates than provided.
	if got := len(finalAllocs); got < needed {
		return nil, allocationError(hash, needed, wanted, finalAllocs)
	}

	allocationsToUse := minInt(wanted, len(finalAllocs))

	// The final result is the currently valid allocations
	// along with the ones provided by the allocator.
	return append(validAllocations, finalAllocs[0:allocationsToUse]...), nil
}
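
// exampleAllocationOrder is an illustrative sketch only and is not called by
// the cluster: it shows one plausible ordering that an allocator could build
// from the maps prepared above, preferring peers from the priority set over
// regular candidates. Real allocators typically also sort each group by the
// informer metric value (and Go map iteration order is random), so this is
// not a substitute for the configured allocator used in obtainAllocations.
func exampleAllocationOrder(priority, candidates map[peer.ID]*api.Metric) []peer.ID {
	out := make([]peer.ID, 0, len(priority)+len(candidates))
	for p := range priority {
		// priority peers come first in the returned list
		out = append(out, p)
	}
	for p := range candidates {
		// remaining candidates follow
		out = append(out, p)
	}
	return out
}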