ToPin call and priority pinning

License: MIT
Signed-off-by: Wyatt Daviau <wdaviau@cs.stanford.edu>
Wyatt Daviau 2018-03-01 14:52:10 -05:00
parent 4e1f59062d
commit 0a34f3382b
8 changed files with 46 additions and 16 deletions

View File

@@ -45,7 +45,7 @@ import (
 // into account if the given CID was previously in a "pin everywhere" mode,
 // and will consider such Pins as currently unallocated ones, providing
 // new allocations as available.
-func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.ID) ([]peer.ID, error) {
+func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) {
 	// Figure out who is holding the CID
 	currentPin, _ := c.getCurrentPin(hash)
 	currentAllocs := currentPin.Allocations
@@ -56,6 +56,7 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I
 	currentMetrics := make(map[peer.ID]api.Metric)
 	candidatesMetrics := make(map[peer.ID]api.Metric)
+	priorityMetrics := make(map[peer.ID]api.Metric)
 	// Divide metrics between current and candidates.
 	for _, m := range metrics {
@@ -66,6 +67,8 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I
 			continue
 		case containsPeer(currentAllocs, m.Peer):
 			currentMetrics[m.Peer] = m
+		case containsPeer(prioritylist, m.Peer):
+			priorityMetrics[m.Peer] = m
 		default:
 			candidatesMetrics[m.Peer] = m
 		}
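The placement of the new case matters: it comes after the blacklist and current-allocation checks, so a peer on the prioritylist only counts as a priority candidate when it is neither blacklisted nor already pinning the content. A minimal, self-contained sketch of that precedence, with toy string peers and a bare struct standing in for peer.ID and api.Metric:

package main

import "fmt"

// metric is a toy stand-in for api.Metric.
type metric struct{ value float64 }

func containsPeer(list []string, p string) bool {
	for _, x := range list {
		if x == p {
			return true
		}
	}
	return false
}

func main() {
	blacklist := []string{"pA"}
	currentAllocs := []string{"pB"}
	prioritylist := []string{"pC"}

	current := map[string]metric{}
	priority := map[string]metric{}
	candidates := map[string]metric{}

	for _, p := range []string{"pA", "pB", "pC", "pD"} {
		m := metric{1.0}
		switch {
		case containsPeer(blacklist, p):
			continue // blacklisted peers are never considered
		case containsPeer(currentAllocs, p):
			current[p] = m // already pinning: not a candidate at all
		case containsPeer(prioritylist, p):
			priority[p] = m
		default:
			candidates[p] = m
		}
	}
	fmt.Println(current, priority, candidates)
	// map[pB:{1}] map[pC:{1}] map[pD:{1}]
}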
@@ -75,7 +78,8 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I
 		rplMin,
 		rplMax,
 		currentMetrics,
-		candidatesMetrics)
+		candidatesMetrics,
+		priorityMetrics)
 	if err != nil {
 		return newAllocs, err
 	}
@@ -136,7 +140,8 @@ func allocationError(hash *cid.Cid, needed, wanted int, candidatesValid []peer.I
 func (c *Cluster) obtainAllocations(
 	hash *cid.Cid,
 	rplMin, rplMax int,
-	currentValidMetrics, candidatesMetrics map[peer.ID]api.Metric) ([]peer.ID, error) {
+	currentValidMetrics, candidatesMetrics map[peer.ID]api.Metric,
+	priorityMetrics map[peer.ID]api.Metric) ([]peer.ID, error) {
 	// The list of peers in current
 	validAllocations := make([]peer.ID, 0, len(currentValidMetrics))
@@ -145,7 +150,7 @@ func (c *Cluster) obtainAllocations(
 	}
 	nCurrentValid := len(validAllocations)
-	nCandidatesValid := len(candidatesMetrics)
+	nCandidatesValid := len(candidatesMetrics) + len(priorityMetrics)
 	needed := rplMin - nCurrentValid // The minimum we need
 	wanted := rplMax - nCurrentValid // The maximum we want
@@ -170,6 +175,9 @@ func (c *Cluster) obtainAllocations(
 	if nCandidatesValid < needed { // not enough candidates
 		candidatesValid := []peer.ID{}
+		for k := range priorityMetrics {
+			candidatesValid = append(candidatesValid, k)
+		}
 		for k := range candidatesMetrics {
 			candidatesValid = append(candidatesValid, k)
 		}
@@ -181,7 +189,7 @@ func (c *Cluster) obtainAllocations(
 	// the allocator returns a list of peers ordered by priority
 	finalAllocs, err := c.allocator.Allocate(
-		hash, currentValidMetrics, candidatesMetrics)
+		hash, currentValidMetrics, candidatesMetrics, priorityMetrics)
 	if err != nil {
 		return nil, logError(err.Error())
 	}
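The needed/wanted arithmetic is untouched; the behavioral change in obtainAllocations is that priority peers now count toward nCandidatesValid and are handed to the allocator separately. A small worked example of those lines, assuming rplMin=2, rplMax=4, one valid current allocation, one priority peer and one ordinary candidate:

package main

import "fmt"

func main() {
	rplMin, rplMax := 2, 4
	nCurrentValid := 1        // peers already pinning, with valid metrics
	nCandidatesValid := 1 + 1 // ordinary candidates + priority peers

	needed := rplMin - nCurrentValid // 1: the minimum we still need
	wanted := rplMax - nCurrentValid // 3: the maximum we would take

	fmt.Println(needed, wanted, nCandidatesValid < needed)
	// Output: 1 3 false -- enough candidates, so allocation proceeds
}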

View File

@@ -34,7 +34,10 @@ func (alloc AscendAllocator) Shutdown() error { return nil }
 // carry a numeric value such as "used disk". We do not pay attention to
 // the metrics of the currently allocated peers and we just sort the
 // candidates based on their metric values (smallest to largest).
-func (alloc AscendAllocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) {
+func (alloc AscendAllocator) Allocate(c *cid.Cid, current,
+	candidates, priority map[peer.ID]api.Metric) ([]peer.ID, error) {
 	// sort our metrics
-	return util.SortNumeric(candidates, false), nil
+	first := util.SortNumeric(priority, false)
+	last := util.SortNumeric(candidates, false)
+	return append(first, last...), nil
 }
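The contract here is ordering: every priority peer precedes every ordinary candidate in the returned slice, and each group is sorted by its own metrics. A self-contained sketch of that behavior, with sortNumeric as a stand-in for util.SortNumeric (plain strings and float64 values, ascending as in this allocator):

package main

import (
	"fmt"
	"sort"
)

// sortNumeric mimics the shape of util.SortNumeric for this sketch:
// it returns the map's keys ordered by value, ascending.
func sortNumeric(m map[string]float64) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return m[keys[i]] < m[keys[j]] })
	return keys
}

func main() {
	priority := map[string]float64{"pC": 90, "pD": 10}
	candidates := map[string]float64{"pA": 5, "pB": 50}

	first := sortNumeric(priority)
	last := sortNumeric(candidates)
	fmt.Println(append(first, last...))
	// [pD pC pA pB]: pD and pC outrank pA even though pA has the
	// smallest metric overall; priority membership beats metric value.
}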

View File

@@ -99,7 +99,7 @@ func Test(t *testing.T) {
 	alloc := &AscendAllocator{}
 	for i, tc := range testCases {
 		t.Logf("Test case %d", i)
-		res, err := alloc.Allocate(testCid, tc.current, tc.candidates)
+		res, err := alloc.Allocate(testCid, tc.current, tc.candidates, nil)
 		if err != nil {
 			t.Fatal(err)
 		}

View File

@@ -34,7 +34,9 @@ func (alloc DescendAllocator) Shutdown() error { return nil }
 // carry a numeric value such as "used disk". We do not pay attention to
 // the metrics of the currently allocated peers and we just sort the
 // candidates based on their metric values (largest to smallest).
-func (alloc DescendAllocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) {
+func (alloc DescendAllocator) Allocate(c *cid.Cid, current, candidates, priority map[peer.ID]api.Metric) ([]peer.ID, error) {
 	// sort our metrics
-	return util.SortNumeric(candidates, true), nil
+	first := util.SortNumeric(priority, true)
+	last := util.SortNumeric(candidates, true)
+	return append(first, last...), nil
 }

View File

@@ -99,7 +99,7 @@ func Test(t *testing.T) {
 	alloc := &DescendAllocator{}
 	for i, tc := range testCases {
 		t.Logf("Test case %d", i)
-		res, err := alloc.Allocate(testCid, tc.current, tc.candidates)
+		res, err := alloc.Allocate(testCid, tc.current, tc.candidates, nil)
 		if err != nil {
 			t.Fatal(err)
 		}

View File

@@ -405,7 +405,7 @@ func (c *Cluster) repinFromPeer(p peer.ID) {
 	list := cState.List()
 	for _, pin := range list {
 		if containsPeer(pin.Allocations, p) {
-			ok, err := c.pin(pin, []peer.ID{p}) // pin blacklisting this peer
+			ok, err := c.pin(pin, []peer.ID{p}, []peer.ID{}) // pin blacklisting this peer
 			if ok && err == nil {
 				logger.Infof("repinned %s out of %s", pin.Cid, p.Pretty())
 			}
@@ -983,7 +983,19 @@ func (c *Cluster) PinGet(h *cid.Cid) (api.Pin, error) {
 // to the global state. Pin does not reflect the success or failure
 // of underlying IPFS daemon pinning operations.
 func (c *Cluster) Pin(pin api.Pin) error {
-	_, err := c.pin(pin, []peer.ID{})
+	_, err := c.pin(pin, []peer.ID{}, []peer.ID{})
 	return err
 }
+
+// PinTo makes the cluster Pin a Cid as in Pin. PinTo's argument already
+// contains a set of peers that should perform the pin. If the max repl
+// factor is less than the size of the specified peerset, then peers are
+// chosen from this set in allocation order. If the min repl factor is
+// greater than the size of the specified peerset, then peers are allocated
+// as in Pin. PinTo is best effort: if the peers selected for pinning are
+// unavailable, PinTo will attempt to allocate other peers and will not
+// register an error.
+func (c *Cluster) PinTo(pin api.Pin) error {
+	_, err := c.pin(pin, []peer.ID{}, pin.Allocations)
+	return err
+}
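Pin and PinTo therefore differ only in what they forward as the prioritylist: Pin passes an empty list, while PinTo reuses the Pin's own Allocations field. A toy restatement of that contract (hypothetical names; the real path runs through Cluster.pin, allocate and the allocator):

package main

import "fmt"

type pin struct {
	cid         string
	allocations []string
}

// doPin stands in for Cluster.pin: it just shows what each entry
// point forwards.
func doPin(p pin, blacklist, prioritylist []string) {
	fmt.Printf("pin %s blacklist=%v priority=%v\n", p.cid, blacklist, prioritylist)
}

func Pin(p pin)   { doPin(p, []string{}, []string{}) }
func PinTo(p pin) { doPin(p, []string{}, p.allocations) }

func main() {
	p := pin{cid: "QmFoo", allocations: []string{"peer1", "peer2"}}
	Pin(p)   // pin QmFoo blacklist=[] priority=[]
	PinTo(p) // pin QmFoo blacklist=[] priority=[peer1 peer2]
}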
@@ -991,7 +1003,7 @@ func (c *Cluster) Pin(pin api.Pin) error {
 // able to evacuate a node and returns whether the pin was submitted
 // to the consensus layer or skipped (due to error or to the fact
 // that it was already valid).
-func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID) (bool, error) {
+func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID) (bool, error) {
 	if pin.Cid == nil {
 		return false, errors.New("bad pin object")
 	}
@@ -1014,7 +1026,7 @@ func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID) (bool, error) {
 	case rplMin == -1 && rplMax == -1:
 		pin.Allocations = []peer.ID{}
 	default:
-		allocs, err := c.allocate(pin.Cid, rplMin, rplMax, blacklist)
+		allocs, err := c.allocate(pin.Cid, rplMin, rplMax, blacklist, prioritylist)
 		if err != nil {
 			return false, err
 		}

View File

@@ -147,7 +147,7 @@ type PinAllocator interface {
 	// which are currently pinning the content. The candidates map
 	// contains the metrics for all peers which are eligible for pinning
 	// the content.
-	Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error)
+	Allocate(c *cid.Cid, current, candidates, priority map[peer.ID]api.Metric) ([]peer.ID, error)
 }

 // PeerMonitor is a component in charge of monitoring the peers in the cluster
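Any third-party PinAllocator must now accept the extra priority map and, by the convention set in this commit, should place priority peers ahead of ordinary candidates. A toy implementation of the same shape (strings and float64s instead of peer.ID and api.Metric) that ignores metric values entirely:

package main

import "fmt"

type firstComeAllocator struct{}

// Allocate returns priority peers first, then candidates, paying no
// attention to current allocations or metric values.
func (a firstComeAllocator) Allocate(c string, current, candidates,
	priority map[string]float64) ([]string, error) {
	out := []string{}
	for p := range priority {
		out = append(out, p)
	}
	for p := range candidates {
		out = append(out, p)
	}
	return out, nil
}

func main() {
	a := firstComeAllocator{}
	res, _ := a.Allocate("QmFoo",
		nil,
		map[string]float64{"pA": 1},
		map[string]float64{"pB": 2})
	fmt.Println(res) // [pB pA]
}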

View File

@@ -35,6 +35,11 @@ func (rpcapi *RPCAPI) Pin(in api.PinSerial, out *struct{}) error {
 	return rpcapi.c.Pin(in.ToPin())
 }
+
+// PinTo runs Cluster.PinTo().
+func (rpcapi *RPCAPI) PinTo(in api.PinSerial, out *struct{}) error {
+	return rpcapi.c.PinTo(in.ToPin())
+}

 // Unpin runs Cluster.Unpin().
 func (rpcapi *RPCAPI) Unpin(in api.PinSerial, out *struct{}) error {
 	c := in.ToPin().Cid
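For completeness, a sketch of what a remote call site for the new RPC might look like, mirroring how the existing Pin RPC is invoked. The gorpc client, the "Cluster" service name and the PinSerial round-trip are assumptions based on the surrounding codebase of this era, not part of this commit:

package rpcexample

import (
	rpc "github.com/hsanjuan/go-libp2p-gorpc"
	peer "github.com/libp2p/go-libp2p-peer"

	"github.com/ipfs/ipfs-cluster/api"
)

// pinTo asks a remote cluster peer to run Cluster.PinTo on the given
// pin. Sketch only: import paths and the Call signature are assumed.
func pinTo(c *rpc.Client, dest peer.ID, pin api.Pin) error {
	return c.Call(dest, "Cluster", "PinTo", pin.ToSerial(), &struct{}{})
}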