Feat #277: Avoid re-pinning log entries when the new pin is the same

This ensures that we don't re-pin something which is already correctly
pinned with the same allocations.

It also ensures that we do re-pin something when the replication
factor associated to it changes.

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
Hector Sanjuan 2018-01-16 11:21:34 +01:00
parent b013850f94
commit ae1afe3af8
4 changed files with 82 additions and 35 deletions

View File

@ -40,11 +40,11 @@ import (
// allocate finds peers to allocate a hash using the informer and the monitor
// it should only be used with valid replicationFactors (rplMin and rplMax
// which are positive and rplMin <= rplMax).
// It only returns new allocations when needed. nil, nil means current
// are ok.
// It always returns allocations, but if no new allocations are needed,
// it will return the current ones.
func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.ID) ([]peer.ID, error) {
// Figure out who is holding the CID
currentAllocs := c.getCurrentAllocations(hash)
currentAllocs := c.getCurrentPin(hash).Allocations
metrics, err := c.getInformerMetrics()
if err != nil {
return nil, err
@ -67,26 +67,28 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I
}
}
return c.obtainAllocations(hash,
newAllocs, err := c.obtainAllocations(hash,
rplMin,
rplMax,
currentMetrics,
candidatesMetrics)
if err != nil {
return newAllocs, err
}
if newAllocs == nil {
newAllocs = currentAllocs
}
return newAllocs, nil
}
// getCurrentAllocations returns the list of peers allocated to a Cid.
func (c *Cluster) getCurrentAllocations(h *cid.Cid) []peer.ID {
var allocs []peer.ID
// getCurrentPin returns the Pin object for h, if we can find one
// or builds an empty one.
func (c *Cluster) getCurrentPin(h *cid.Cid) api.Pin {
st, err := c.consensus.State()
if err != nil {
// no state we assume it is empty. If there was other
// problem, we would fail to commit anyway.
allocs = []peer.ID{}
} else {
pin := st.Get(h)
allocs = pin.Allocations
return api.PinCid(h)
}
return allocs
return st.Get(h)
}
// getInformerMetrics returns the MonitorLastMetrics() for the
@ -149,7 +151,7 @@ func (c *Cluster) obtainAllocations(
// Reminder: rplMin <= rplMax AND >0
if wanted <= 0 { // alocations above maximum threshold: drop some
if wanted < 0 { // alocations above maximum threshold: drop some
// This could be done more intelligently by dropping them
// according to the allocator order (i.e. free-ing peers
// with most used space first).
@ -157,7 +159,7 @@ func (c *Cluster) obtainAllocations(
}
if needed <= 0 { // allocations are above minimal threshold
// We keep things as they are. Avoid any changes to the pin set.
// We don't provide any new allocations
return nil, nil
}

View File

@ -10,6 +10,8 @@ package api
import (
"fmt"
"sort"
"strings"
"time"
cid "github.com/ipfs/go-cid"
@ -433,6 +435,7 @@ type Pin struct {
func PinCid(c *cid.Cid) Pin {
return Pin{
Cid: c,
Allocations: []peer.ID{},
}
}
@ -464,6 +467,38 @@ func (pin Pin) ToSerial() PinSerial {
}
}
// Equals checks if two pins are the same (with the same allocations).
// If allocations are the same but in different order, they are still
// considered equivalent.
func (pin Pin) Equals(pin2 Pin) bool {
pin1s := pin.ToSerial()
pin2s := pin2.ToSerial()
if pin1s.Cid != pin2s.Cid {
return false
}
if pin1s.Name != pin2s.Name {
return false
}
sort.Strings(pin1s.Allocations)
sort.Strings(pin2s.Allocations)
if strings.Join(pin1s.Allocations, ",") != strings.Join(pin2s.Allocations, ",") {
return false
}
if pin1s.ReplicationFactorMax != pin2s.ReplicationFactorMax {
return false
}
if pin1s.ReplicationFactorMin != pin2s.ReplicationFactorMin {
return false
}
return true
}
// ToPin converts a PinSerial to its native form.
func (pins PinSerial) ToPin() Pin {
c, err := cid.Decode(pins.Cid)

View File

@ -401,8 +401,10 @@ func (c *Cluster) repinFromPeer(p peer.ID) {
list := cState.List()
for _, pin := range list {
if containsPeer(pin.Allocations, p) {
logger.Infof("repinning %s out of %s", pin.Cid, p.Pretty())
c.pin(pin, []peer.ID{p}) // pin blacklisting this peer
ok, err := c.pin(pin, []peer.ID{p}) // pin blacklisting this peer
if ok && err == nil {
logger.Infof("repinned %s out of %s", pin.Cid, p.Pretty())
}
}
}
}
@ -959,12 +961,18 @@ func (c *Cluster) PinGet(h *cid.Cid) (api.Pin, error) {
// to the global state. Pin does not reflect the success or failure
// of underlying IPFS daemon pinning operations.
func (c *Cluster) Pin(pin api.Pin) error {
return c.pin(pin, []peer.ID{})
_, err := c.pin(pin, []peer.ID{})
return err
}
// pin performs the actual pinning and supports a blacklist to be
// able to evacuate a node.
func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID) error {
// able to evacuate a node and returns whether the pin was submitted
// to the consensus layer or skipped (due to error or to the fact
// that it was already valid).
func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID) (bool, error) {
if pin.Cid == nil {
return false, errors.New("bad pin object")
}
rplMin := pin.ReplicationFactorMin
rplMax := pin.ReplicationFactorMax
if rplMin == 0 {
@ -977,31 +985,33 @@ func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID) error {
}
if err := isReplicationFactorValid(rplMin, rplMax); err != nil {
return err
return false, err
}
switch {
case rplMin == -1 && rplMax == -1:
pin.Allocations = []peer.ID{}
logger.Infof("IPFS cluster pinning %s everywhere:", pin.Cid)
default:
allocs, err := c.allocate(pin.Cid, rplMin, rplMax, blacklist)
if err != nil {
return err
}
if allocs == nil {
logger.Infof("Skipping repinning of %s. Replication factor is within thresholds", pin.Cid)
return nil
return false, err
}
pin.Allocations = allocs
}
if c.getCurrentPin(pin.Cid).Equals(pin) {
// skip pinning
logger.Debugf("pinning %s skipped: already correctly allocated", pin.Cid)
return false, nil
}
if len(pin.Allocations) == 0 {
logger.Infof("IPFS cluster pinning %s everywhere:", pin.Cid)
} else {
logger.Infof("IPFS cluster pinning %s on %s:", pin.Cid, pin.Allocations)
}
err := c.consensus.LogPin(pin)
if err != nil {
return err
}
return nil
return true, c.consensus.LogPin(pin)
}
// Unpin makes the cluster Unpin a Cid. This implies adding the Cid

View File

@ -59,7 +59,7 @@ func (st *MapState) Get(c *cid.Cid) api.Pin {
defer st.pinMux.RUnlock()
pins, ok := st.PinMap[c.String()]
if !ok { // make sure no panics
return api.Pin{}
return api.PinCid(c)
}
return pins.ToPin()
}