diff --git a/allocate.go b/allocate.go index 289693c3..123eea16 100644 --- a/allocate.go +++ b/allocate.go @@ -45,7 +45,7 @@ import ( // into account if the given CID was previously in a "pin everywhere" mode, // and will consider such Pins as currently unallocated ones, providing // new allocations as available. -func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.ID) ([]peer.ID, error) { +func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) { // Figure out who is holding the CID currentPin, _ := c.getCurrentPin(hash) currentAllocs := currentPin.Allocations @@ -56,6 +56,7 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I currentMetrics := make(map[peer.ID]api.Metric) candidatesMetrics := make(map[peer.ID]api.Metric) + priorityMetrics := make(map[peer.ID]api.Metric) // Divide metrics between current and candidates. for _, m := range metrics { @@ -66,6 +67,8 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I continue case containsPeer(currentAllocs, m.Peer): currentMetrics[m.Peer] = m + case containsPeer(prioritylist, m.Peer): + priorityMetrics[m.Peer] = m default: candidatesMetrics[m.Peer] = m } @@ -75,7 +78,8 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I rplMin, rplMax, currentMetrics, - candidatesMetrics) + candidatesMetrics, + priorityMetrics) if err != nil { return newAllocs, err } @@ -136,7 +140,8 @@ func allocationError(hash *cid.Cid, needed, wanted int, candidatesValid []peer.I func (c *Cluster) obtainAllocations( hash *cid.Cid, rplMin, rplMax int, - currentValidMetrics, candidatesMetrics map[peer.ID]api.Metric) ([]peer.ID, error) { + currentValidMetrics, candidatesMetrics map[peer.ID]api.Metric, + priorityMetrics map[peer.ID]api.Metric) ([]peer.ID, error) { // The list of peers in current validAllocations := make([]peer.ID, 0, len(currentValidMetrics)) @@ -145,7 +150,7 @@ func (c *Cluster) obtainAllocations( } nCurrentValid := len(validAllocations) - nCandidatesValid := len(candidatesMetrics) + nCandidatesValid := len(candidatesMetrics) + len(priorityMetrics) needed := rplMin - nCurrentValid // The minimum we need wanted := rplMax - nCurrentValid // The maximum we want @@ -170,6 +175,9 @@ func (c *Cluster) obtainAllocations( if nCandidatesValid < needed { // not enough candidates candidatesValid := []peer.ID{} + for k := range priorityMetrics { + candidatesValid = append(candidatesValid, k) + } for k := range candidatesMetrics { candidatesValid = append(candidatesValid, k) } @@ -181,7 +189,7 @@ func (c *Cluster) obtainAllocations( // the allocator returns a list of peers ordered by priority finalAllocs, err := c.allocator.Allocate( - hash, currentValidMetrics, candidatesMetrics) + hash, currentValidMetrics, candidatesMetrics, priorityMetrics) if err != nil { return nil, logError(err.Error()) } diff --git a/allocator/ascendalloc/ascendalloc.go b/allocator/ascendalloc/ascendalloc.go index eb3fd72f..b1c9c10a 100644 --- a/allocator/ascendalloc/ascendalloc.go +++ b/allocator/ascendalloc/ascendalloc.go @@ -34,7 +34,10 @@ func (alloc AscendAllocator) Shutdown() error { return nil } // carry a numeric value such as "used disk". We do not pay attention to // the metrics of the currently allocated peers and we just sort the // candidates based on their metric values (smallest to largest). -func (alloc AscendAllocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) { +func (alloc AscendAllocator) Allocate(c *cid.Cid, current, + candidates, priority map[peer.ID]api.Metric) ([]peer.ID, error) { // sort our metrics - return util.SortNumeric(candidates, false), nil + first := util.SortNumeric(priority, false) + last := util.SortNumeric(candidates, false) + return append(first, last...), nil } diff --git a/allocator/ascendalloc/ascendalloc_test.go b/allocator/ascendalloc/ascendalloc_test.go index 8787710c..379f88e9 100644 --- a/allocator/ascendalloc/ascendalloc_test.go +++ b/allocator/ascendalloc/ascendalloc_test.go @@ -99,7 +99,7 @@ func Test(t *testing.T) { alloc := &AscendAllocator{} for i, tc := range testCases { t.Logf("Test case %d", i) - res, err := alloc.Allocate(testCid, tc.current, tc.candidates) + res, err := alloc.Allocate(testCid, tc.current, tc.candidates, nil) if err != nil { t.Fatal(err) } diff --git a/allocator/descendalloc/descendalloc.go b/allocator/descendalloc/descendalloc.go index be072e95..ef4ed28b 100644 --- a/allocator/descendalloc/descendalloc.go +++ b/allocator/descendalloc/descendalloc.go @@ -34,7 +34,9 @@ func (alloc DescendAllocator) Shutdown() error { return nil } // carry a numeric value such as "used disk". We do not pay attention to // the metrics of the currently allocated peers and we just sort the // candidates based on their metric values (largest to smallest). -func (alloc DescendAllocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) { +func (alloc DescendAllocator) Allocate(c *cid.Cid, current, candidates, priority map[peer.ID]api.Metric) ([]peer.ID, error) { // sort our metrics - return util.SortNumeric(candidates, true), nil + first := util.SortNumeric(priority, true) + last := util.SortNumeric(candidates, true) + return append(first, last...), nil } diff --git a/allocator/descendalloc/descendalloc_test.go b/allocator/descendalloc/descendalloc_test.go index b1463498..b79b4699 100644 --- a/allocator/descendalloc/descendalloc_test.go +++ b/allocator/descendalloc/descendalloc_test.go @@ -99,7 +99,7 @@ func Test(t *testing.T) { alloc := &DescendAllocator{} for i, tc := range testCases { t.Logf("Test case %d", i) - res, err := alloc.Allocate(testCid, tc.current, tc.candidates) + res, err := alloc.Allocate(testCid, tc.current, tc.candidates, nil) if err != nil { t.Fatal(err) } diff --git a/cluster.go b/cluster.go index 089333d7..7f6df6e3 100644 --- a/cluster.go +++ b/cluster.go @@ -405,7 +405,7 @@ func (c *Cluster) repinFromPeer(p peer.ID) { list := cState.List() for _, pin := range list { if containsPeer(pin.Allocations, p) { - ok, err := c.pin(pin, []peer.ID{p}) // pin blacklisting this peer + ok, err := c.pin(pin, []peer.ID{p}, []peer.ID{}) // pin blacklisting this peer if ok && err == nil { logger.Infof("repinned %s out of %s", pin.Cid, p.Pretty()) } @@ -983,7 +983,19 @@ func (c *Cluster) PinGet(h *cid.Cid) (api.Pin, error) { // to the global state. Pin does not reflect the success or failure // of underlying IPFS daemon pinning operations. func (c *Cluster) Pin(pin api.Pin) error { - _, err := c.pin(pin, []peer.ID{}) + _, err := c.pin(pin, []peer.ID{}, []peer.ID{}) + return err +} + +// PinTo makes the cluster Pin a Cid as in Pin. PinTo's argument already +// contains a set of peers that should perform the pin. If the max repl factor +// is less than the size of the specified peerset then peers are chosen from +// this set in allocation order. If the min repl factor is greater than the +// specified peerset then peers are allocated as in Pin. PinTo is best effort. +// If the peers selected for pinning are unavailable then PinTo will attempt to +// allocate other peers and will not register an error. +func (c *Cluster) PinTo(pin api.Pin) error { + _, err := c.pin(pin, []peer.ID{}, pin.Allocations) return err } @@ -991,7 +1003,7 @@ func (c *Cluster) Pin(pin api.Pin) error { // able to evacuate a node and returns whether the pin was submitted // to the consensus layer or skipped (due to error or to the fact // that it was already valid). -func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID) (bool, error) { +func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID) (bool, error) { if pin.Cid == nil { return false, errors.New("bad pin object") } @@ -1014,7 +1026,7 @@ func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID) (bool, error) { case rplMin == -1 && rplMax == -1: pin.Allocations = []peer.ID{} default: - allocs, err := c.allocate(pin.Cid, rplMin, rplMax, blacklist) + allocs, err := c.allocate(pin.Cid, rplMin, rplMax, blacklist, prioritylist) if err != nil { return false, err } diff --git a/ipfscluster.go b/ipfscluster.go index 7b9c267d..5d5b5fff 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -147,7 +147,7 @@ type PinAllocator interface { // which are currently pinning the content. The candidates map // contains the metrics for all peers which are eligible for pinning // the content. - Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) + Allocate(c *cid.Cid, current, candidates, priority map[peer.ID]api.Metric) ([]peer.ID, error) } // PeerMonitor is a component in charge of monitoring the peers in the cluster diff --git a/rpc_api.go b/rpc_api.go index 07428f9d..d13c1e24 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -35,6 +35,11 @@ func (rpcapi *RPCAPI) Pin(in api.PinSerial, out *struct{}) error { return rpcapi.c.Pin(in.ToPin()) } +// PinTo runs Cluster.PinTo(). +func (rpcapi *RPCAPI) PinTo(in api.PinSerial, out *struct{}) error { + return rpcapi.c.PinTo(in.ToPin()) +} + // Unpin runs Cluster.Unpin(). func (rpcapi *RPCAPI) Unpin(in api.PinSerial, out *struct{}) error { c := in.ToPin().Cid