Fix #277: Introduce maximum and minimum replication factor

This PR replaces ReplicationFactor with ReplicationFactorMax
and ReplicationFactorMin.

This allows a CID to be pinned even though the desired
replication factor (max) is not reached, and prevents triggering
re-pinnings when the replication factor has not crossed the
lower threshold (min).
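
For illustration, a hypothetical sketch of the two thresholds (numbers not
taken from this diff):

    rplMin, rplMax := 2, 4
    current := 3               // peers currently pinning the CID
    needed := rplMin - current // -1: min not crossed, no re-pin triggered
    wanted := rplMax - current // 1: max not reached, but the pin still succeeds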

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
Hector Sanjuan 2018-01-12 18:04:46 +01:00
parent 7ea011f35a
commit 4549282cba
18 changed files with 524 additions and 229 deletions


@ -8,7 +8,7 @@ checks:
threshold: 500
method-complexity:
config:
threshold: 8
threshold: 12
method-lines:
config:
threshold: 60

allocate.go (new file, +195 lines)

@ -0,0 +1,195 @@
package ipfscluster
import (
"errors"
"fmt"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/ipfs/ipfs-cluster/api"
)
// This file gathers allocation logic used when pinning or re-pinning
// to find which peers should be allocated to a Cid. Allocation is constrained
// by ReplicationFactorMin and ReplicationFactorMax parameters obtained
// from the Pin object.
// The allocation process has several steps:
//
// * Find which peers are pinning a CID
// * Obtain the last values for the configured informer metrics from the
// monitor component
// * Divide the metrics between "current" (peers already pinning the CID)
// and "candidates" (peers that could pin the CID), as long as their metrics
// are valid.
// * Given the candidates:
// * Check if we are overpinning an item
// * Check if there are not enough candidates for the "needed" replication
// factor.
// * If there are enough candidates:
// * Call the configured allocator, which sorts the candidates (and
// may veto some depending on the allocation strategy).
// * The allocator returns a list of final candidate peers sorted by
// order of preference.
// * Take as many final candidates from the list as we can, until
// ReplicationFactorMax is reached. Error if there are fewer than
// ReplicationFactorMin.
// allocate finds peers to allocate a hash using the informer and the monitor.
// It should only be used with valid replication factors (rplMin and rplMax
// positive and rplMin <= rplMax).
// It only returns new allocations when needed; (nil, nil) means the current
// allocations are within thresholds.
func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.ID) ([]peer.ID, error) {
// Figure out who is holding the CID
currentAllocs := c.getCurrentAllocations(hash)
metrics, err := c.getInformerMetrics()
if err != nil {
return nil, err
}
currentMetrics := make(map[peer.ID]api.Metric)
candidatesMetrics := make(map[peer.ID]api.Metric)
// Divide metrics between current and candidates.
for _, m := range metrics {
switch {
case m.Discard() || containsPeer(blacklist, m.Peer):
// discard peers with invalid metrics and
// those in the blacklist
continue
case containsPeer(currentAllocs, m.Peer):
currentMetrics[m.Peer] = m
default:
candidatesMetrics[m.Peer] = m
}
}
return c.obtainAllocations(hash,
rplMin,
rplMax,
currentMetrics,
candidatesMetrics)
}
// getCurrentAllocations returns the list of peers allocated to a Cid.
func (c *Cluster) getCurrentAllocations(h *cid.Cid) []peer.ID {
var allocs []peer.ID
st, err := c.consensus.State()
if err != nil {
// No state: we assume it is empty. If there was another
// problem, we would fail to commit anyway.
allocs = []peer.ID{}
} else {
pin := st.Get(h)
allocs = pin.Allocations
}
return allocs
}
// getInformerMetrics returns the last metrics (PeerMonitorLastMetrics)
// for the configured informer, as provided by the leading monitor.
func (c *Cluster) getInformerMetrics() ([]api.Metric, error) {
var metrics []api.Metric
metricName := c.informer.Name()
l, err := c.consensus.Leader()
if err != nil {
return nil, errors.New("cannot determine leading Monitor")
}
err = c.rpcClient.Call(l,
"Cluster", "PeerMonitorLastMetrics",
metricName,
&metrics)
if err != nil {
return nil, err
}
return metrics, nil
}
// allocationError logs and returns an allocation error
func allocationError(hash *cid.Cid, needed, wanted int, candidatesValid []peer.ID) error {
logger.Errorf("Not enough candidates to allocate %s:", hash)
logger.Errorf(" Needed: %d", needed)
logger.Errorf(" Wanted: %d", wanted)
logger.Errorf(" Valid candidates: %d:", len(candidatesValid))
for _, c := range candidatesValid {
logger.Errorf(" - %s", c.Pretty())
}
errorMsg := "not enough peers to allocate CID. "
errorMsg += fmt.Sprintf("Needed at least: %d. ", needed)
errorMsg += fmt.Sprintf("Wanted at most: %d. ", wanted)
errorMsg += fmt.Sprintf("Valid candidates: %d. ", len(candidatesValid))
errorMsg += "See logs for more info."
return errors.New(errorMsg)
}
func (c *Cluster) obtainAllocations(
hash *cid.Cid,
rplMin, rplMax int,
currentValidMetrics, candidatesMetrics map[peer.ID]api.Metric) ([]peer.ID, error) {
// The list of peers currently pinning with valid metrics
validAllocations := make([]peer.ID, 0, len(currentValidMetrics))
for k := range currentValidMetrics {
validAllocations = append(validAllocations, k)
}
nCurrentValid := len(validAllocations)
nCandidatesValid := len(candidatesMetrics)
needed := rplMin - nCurrentValid // The minimum we need
wanted := rplMax - nCurrentValid // The maximum we want
logger.Debugf("obtainAllocations: current valid: %d", nCurrentValid)
logger.Debugf("obtainAllocations: candidates valid: %d", nCandidatesValid)
logger.Debugf("obtainAllocations: Needed: %d", needed)
logger.Debugf("obtainAllocations: Wanted: %d", wanted)
// Reminder: rplMin <= rplMax AND both > 0
if wanted <= 0 { // allocations above the maximum threshold: drop some
// This could be done more intelligently by dropping them
// according to the allocator order (i.e. free-ing peers
// with most used space first).
return validAllocations[0 : len(validAllocations)+wanted], nil
}
if needed <= 0 { // allocations are above the minimum threshold
// We keep things as they are. Avoid any changes to the pin set.
return nil, nil
}
if nCandidatesValid < needed { // not enough candidates
candidatesValid := []peer.ID{}
for k := range candidatesMetrics {
candidatesValid = append(candidatesValid, k)
}
return nil, allocationError(hash, needed, wanted, candidatesValid)
}
// We can allocate from this point. Use the allocator to decide
// on the priority of candidates and grab as many as "wanted".
// The allocator returns a list of peers ordered by priority.
finalAllocs, err := c.allocator.Allocate(
hash, currentValidMetrics, candidatesMetrics)
if err != nil {
return nil, logError(err.Error())
}
logger.Debugf("obtainAllocations: allocate(): %s", finalAllocs)
// check that we have enough, as the allocator may have returned
// fewer candidates than provided.
if got := len(finalAllocs); got < needed {
return nil, allocationError(hash, needed, wanted, finalAllocs)
}
allocationsToUse := minInt(wanted, len(finalAllocs))
// the final result is the currently valid allocations
// along with the ones provided by the allocator
return append(validAllocations, finalAllocs[0:allocationsToUse]...), nil
}
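
A hypothetical usage sketch of the contract above (the hash and cluster
values are assumed, not part of this diff):

func repinSketch(c *Cluster, hash *cid.Cid) error {
	allocs, err := c.allocate(hash, 2, 3, nil)
	switch {
	case err != nil:
		// not enough valid candidates to reach rplMin
		return err
	case allocs == nil:
		// current allocations already within [rplMin, rplMax]: no re-pin
		return nil
	default:
		// between rplMin and rplMax peers: the current valid allocations
		// plus the best candidates returned by the allocator
		return nil
	}
}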


@ -425,7 +425,8 @@ type Pin struct {
Cid *cid.Cid
Name string
Allocations []peer.ID
ReplicationFactor int
ReplicationFactorMin int
ReplicationFactorMax int
}
// PinCid is a shortcut to create a Pin only with a Cid.
@ -440,8 +441,9 @@ type PinSerial struct {
Cid string `json:"cid"`
Name string `json:"name"`
Allocations []string `json:"allocations"`
Everywhere bool `json:"everywhere,omitempty"` // legacy
ReplicationFactor int `json:"replication_factor"`
ReplicationFactor int `json:"replication_factor,omitempty"` // legacy
ReplicationFactorMin int `json:"replication_factor_min"`
ReplicationFactorMax int `json:"replication_factor_max"`
}
// ToSerial converts a Pin to PinSerial.
@ -453,13 +455,13 @@ func (pin Pin) ToSerial() PinSerial {
n := pin.Name
allocs := PeersToStrings(pin.Allocations)
rpl := pin.ReplicationFactor
return PinSerial{
Cid: c,
Name: n,
Allocations: allocs,
ReplicationFactor: rpl,
ReplicationFactorMin: pin.ReplicationFactorMin,
ReplicationFactorMax: pin.ReplicationFactorMax,
}
}
@ -471,15 +473,17 @@ func (pins PinSerial) ToPin() Pin {
}
// legacy format management
if pins.ReplicationFactor == 0 && pins.Everywhere {
pins.ReplicationFactor = -1
if rf := pins.ReplicationFactor; rf != 0 {
pins.ReplicationFactorMin = rf
pins.ReplicationFactorMax = rf
}
return Pin{
Cid: c,
Name: pins.Name,
Allocations: StringsToPeers(pins.Allocations),
ReplicationFactor: pins.ReplicationFactor,
ReplicationFactorMin: pins.ReplicationFactorMin,
ReplicationFactorMax: pins.ReplicationFactorMax,
}
}
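
A hedged sketch of the legacy path above: a PinSerial produced by an older
peer carries only replication_factor, which now populates both new fields
(values hypothetical; a real CID string is needed for the Cid to decode):

old := PinSerial{
	Cid:               "Qm...", // placeholder CID string
	ReplicationFactor: 2,       // legacy field
}
pin := old.ToPin()
// pin.ReplicationFactorMin == 2 && pin.ReplicationFactorMax == 2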


@ -150,13 +150,15 @@ func TestPinConv(t *testing.T) {
c := Pin{
Cid: testCid1,
Allocations: []peer.ID{testPeerID1},
ReplicationFactor: -1,
ReplicationFactorMax: -1,
ReplicationFactorMin: -1,
}
newc := c.ToSerial().ToPin()
if c.Cid.String() != newc.Cid.String() ||
c.Allocations[0] != newc.Allocations[0] ||
c.ReplicationFactor != newc.ReplicationFactor {
c.ReplicationFactorMin != newc.ReplicationFactorMin ||
c.ReplicationFactorMax != newc.ReplicationFactorMax {
t.Error("mismatch")
}
}


@ -970,25 +970,36 @@ func (c *Cluster) Pin(pin api.Pin) error {
// pin performs the actual pinning and supports a blacklist to be
// able to evacuate a node.
func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID) error {
rpl := pin.ReplicationFactor
if rpl == 0 {
rpl = c.config.ReplicationFactor
pin.ReplicationFactor = rpl
rplMin := pin.ReplicationFactorMin
rplMax := pin.ReplicationFactorMax
if rplMin == 0 {
rplMin = c.config.ReplicationFactorMin
pin.ReplicationFactorMin = rplMin
}
if rplMax == 0 {
rplMax = c.config.ReplicationFactorMax
pin.ReplicationFactorMax = rplMax
}
if err := isReplicationFactorValid(rplMin, rplMax); err != nil {
return err
}
switch {
case rpl == 0:
return errors.New("replication factor is 0")
case rpl < 0:
case rplMin == -1 && rplMax == -1:
pin.Allocations = []peer.ID{}
logger.Infof("IPFS cluster pinning %s everywhere:", pin.Cid)
case rpl > 0:
allocs, err := c.allocate(pin.Cid, pin.ReplicationFactor, blacklist)
default:
allocs, err := c.allocate(pin.Cid, rplMin, rplMax, blacklist)
if err != nil {
return err
}
if allocs == nil {
logger.Infof("Skipping repinning of %s. Replication factor is within thresholds", pin.Cid)
return nil
}
pin.Allocations = allocs
logger.Infof("IPFS cluster pinning %s on %s:", pin.Cid, pin.Allocations)
}
err := c.consensus.LogPin(pin)
@ -1276,104 +1287,6 @@ func (c *Cluster) getIDForPeer(pid peer.ID) (api.ID, error) {
return id, err
}
// allocate finds peers to allocate a hash using the informer and the monitor
// it should only be used with a positive replication factor
func (c *Cluster) allocate(hash *cid.Cid, repl int, blacklist []peer.ID) ([]peer.ID, error) {
if repl <= 0 {
return nil, errors.New("cannot decide allocation for replication factor <= 0")
}
// Figure out who is currently holding this
var pinAllocations []peer.ID
st, err := c.consensus.State()
if err != nil {
// no state we assume it is empty. If there was other
// problem, we would fail to commit anyway.
pinAllocations = []peer.ID{}
} else {
pin := st.Get(hash)
pinAllocations = pin.Allocations
}
// Get the LastMetrics from the leading monitor. They are the last
// valid metrics from current cluster peers
var metrics []api.Metric
metricName := c.informer.Name()
l, err := c.consensus.Leader()
if err != nil {
return nil, errors.New("cannot determine leading Monitor")
}
err = c.rpcClient.Call(l,
"Cluster", "PeerMonitorLastMetrics",
metricName,
&metrics)
if err != nil {
return nil, err
}
// We must divide the metrics between current and candidates
current := make(map[peer.ID]api.Metric)
candidates := make(map[peer.ID]api.Metric)
validAllocations := make([]peer.ID, 0, len(pinAllocations))
for _, m := range metrics {
if m.Discard() || containsPeer(blacklist, m.Peer) {
// blacklisted peers do not exist for us
continue
} else if containsPeer(pinAllocations, m.Peer) {
current[m.Peer] = m
validAllocations = append(validAllocations, m.Peer)
} else {
candidates[m.Peer] = m
}
}
currentValid := len(validAllocations)
candidatesValid := len(candidates)
needed := repl - currentValid
logger.Debugf("allocate: Valid allocations: %d", currentValid)
logger.Debugf("allocate: Valid candidates: %d", candidatesValid)
logger.Debugf("allocate: Needed: %d", needed)
// If needed == 0, we don't need anything. If needed < 0, we are
// reducing the replication factor
switch {
case needed <= 0: // set the allocations to the needed ones
return validAllocations[0 : len(validAllocations)+needed], nil
case candidatesValid < needed:
candidatesIds := []peer.ID{}
for k := range candidates {
candidatesIds = append(candidatesIds, k)
}
err = logError(
"not enough candidates to allocate %s. Needed: %d. Got: %d (%s)",
hash, needed, candidatesValid, candidatesIds)
return nil, err
default:
// this will return candidate peers in order of
// preference according to the allocator.
candidateAllocs, err := c.allocator.Allocate(hash, current, candidates)
if err != nil {
return nil, logError(err.Error())
}
logger.Debugf("allocate: candidate allocations: %s", candidateAllocs)
// we don't have enough peers to pin
if got := len(candidateAllocs); got < needed {
err = logError(
"cannot find enough allocations for %s. Needed: %d. Got: %d (%s)",
hash, needed, got, candidateAllocs)
return nil, err
}
// the new allocations = the valid ones we had + the needed ones
return append(validAllocations, candidateAllocs[0:needed]...), nil
}
}
// diffPeers returns the peerIDs added and removed from peers2 in relation to
// peers1
func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) {


@ -87,11 +87,23 @@ type Config struct {
// large.
IPFSSyncInterval time.Duration
// ReplicationFactor indicates the number of nodes that must pin content.
// For exampe, a replication_factor of 2 will prompt cluster to choose
// two nodes for each pinned hash. A replication_factor -1 will
// use every available node for each pin.
ReplicationFactor int
// ReplicationFactorMax indicates the target number of nodes
// that should pin content. For example, a replication_factor of
// 3 will have cluster allocate each pinned hash to 3 peers if
// possible.
// See also ReplicationFactorMin. A ReplicationFactorMax of -1
// will allocate to every available node.
ReplicationFactorMax int
// ReplicationFactorMin indicates the minimum number of healthy
// nodes pinning content. If the number of nodes available to pin
// is less than this threshold, an error will be returned.
// In the case of peer health issues, content pinned will be
// re-allocated if the threshold is crossed.
// For example, a ReplicationFactorMin of 2 will allocate at least
// two peers to hold content, and return an error if this is not
// possible.
ReplicationFactorMin int
// MonitorPingInterval is the frequency with which a cluster peer pings the
// monitoring component. The ping metric has a TTL set to the double
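
A minimal configuration sketch under these semantics (Default and Validate
are the helpers exercised by the tests further below; values hypothetical):

cfg := &Config{}
cfg.Default()
cfg.ReplicationFactorMin = 2 // re-pin only when fewer than 2 copies remain
cfg.ReplicationFactorMax = 3 // aim for 3 copies when allocating
if err := cfg.Validate(); err != nil {
	// rejected min/max combination
}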
@ -113,7 +125,9 @@ type configJSON struct {
ListenMultiaddress string `json:"listen_multiaddress"`
StateSyncInterval string `json:"state_sync_interval"`
IPFSSyncInterval string `json:"ipfs_sync_interval"`
ReplicationFactor int `json:"replication_factor"`
ReplicationFactor int `json:"replication_factor,omitempty"` // legacy
ReplicationFactorMin int `json:"replication_factor_min"`
ReplicationFactorMax int `json:"replication_factor_max"`
MonitorPingInterval string `json:"monitor_ping_interval"`
}
@ -190,10 +204,34 @@ func (cfg *Config) Validate() error {
return errors.New("cluster.monitoring_interval is invalid")
}
if cfg.ReplicationFactor < -1 {
return errors.New("cluster.replication_factor is invalid")
rfMax := cfg.ReplicationFactorMax
rfMin := cfg.ReplicationFactorMin
return isReplicationFactorValid(rfMin, rfMax)
}
func isReplicationFactorValid(rplMin, rplMax int) error {
// check Max and Min are correct
if rplMin == 0 || rplMax == 0 {
return errors.New("cluster.replication_factor_min and max must be set")
}
if rplMin > rplMax {
return errors.New("cluster.replication_factor_min is larger than max")
}
if rplMin < -1 {
return errors.New("cluster.replication_factor_min is wrong")
}
if rplMax < -1 {
return errors.New("cluster.replication_factor_max is wrong")
}
if (rplMin == -1 && rplMax != -1) ||
(rplMin != -1 && rplMax == -1) {
return errors.New("cluster.replication_factor_min and max must be -1 when one of them is")
}
return nil
}
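
The rules above, summarized as hypothetical calls:

isReplicationFactorValid(2, 3)   // nil: 0 < min <= max
isReplicationFactorValid(-1, -1) // nil: "pin everywhere"
isReplicationFactorValid(0, 3)   // error: both factors must be set
isReplicationFactorValid(3, 2)   // error: min larger than max
isReplicationFactorValid(-1, 3)  // error: -1 must be set on both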
@ -212,7 +250,8 @@ func (cfg *Config) setDefaults() {
cfg.LeaveOnShutdown = DefaultLeaveOnShutdown
cfg.StateSyncInterval = DefaultStateSyncInterval
cfg.IPFSSyncInterval = DefaultIPFSSyncInterval
cfg.ReplicationFactor = DefaultReplicationFactor
cfg.ReplicationFactorMin = DefaultReplicationFactor
cfg.ReplicationFactorMax = DefaultReplicationFactor
cfg.MonitorPingInterval = DefaultMonitorPingInterval
}
@ -230,6 +269,14 @@ func (cfg *Config) LoadJSON(raw []byte) error {
// Make sure all non-defined keys have good values.
cfg.setDefaults()
parseDuration := func(txt string) time.Duration {
d, _ := time.ParseDuration(txt)
if txt != "" && d == 0 {
logger.Warningf("%s is not a valid duration. Default will be used", txt)
}
return d
}
id, err := peer.IDB58Decode(jcfg.ID)
if err != nil {
err = fmt.Errorf("error decoding cluster ID: %s", err)
@ -291,22 +338,22 @@ func (cfg *Config) LoadJSON(raw []byte) error {
}
cfg.ListenAddr = clusterAddr
if rf := jcfg.ReplicationFactor; rf == 0 {
logger.Warning("Replication factor set to -1 (pin everywhere)")
cfg.ReplicationFactor = -1
} else {
cfg.ReplicationFactor = rf
rplMin := jcfg.ReplicationFactorMin
rplMax := jcfg.ReplicationFactorMax
if jcfg.ReplicationFactor != 0 { // legacy: overrides min and max
rplMin = jcfg.ReplicationFactor
rplMax = rplMin
}
config.SetIfNotDefault(rplMin, &cfg.ReplicationFactorMin)
config.SetIfNotDefault(rplMax, &cfg.ReplicationFactorMax)
// Validation will detect problems here
interval, _ := time.ParseDuration(jcfg.StateSyncInterval)
cfg.StateSyncInterval = interval
stateSyncInterval := parseDuration(jcfg.StateSyncInterval)
ipfsSyncInterval := parseDuration(jcfg.IPFSSyncInterval)
monitorPingInterval := parseDuration(jcfg.MonitorPingInterval)
interval, _ = time.ParseDuration(jcfg.IPFSSyncInterval)
cfg.IPFSSyncInterval = interval
interval, _ = time.ParseDuration(jcfg.MonitorPingInterval)
cfg.MonitorPingInterval = interval
config.SetIfNotDefault(stateSyncInterval, &cfg.StateSyncInterval)
config.SetIfNotDefault(ipfsSyncInterval, &cfg.IPFSSyncInterval)
config.SetIfNotDefault(monitorPingInterval, &cfg.MonitorPingInterval)
cfg.LeaveOnShutdown = jcfg.LeaveOnShutdown
@ -350,7 +397,8 @@ func (cfg *Config) ToJSON() (raw []byte, err error) {
jcfg.Secret = EncodeClusterSecret(cfg.Secret)
jcfg.Peers = clusterPeers
jcfg.Bootstrap = bootstrap
jcfg.ReplicationFactor = cfg.ReplicationFactor
jcfg.ReplicationFactorMin = cfg.ReplicationFactorMin
jcfg.ReplicationFactorMax = cfg.ReplicationFactorMax
jcfg.LeaveOnShutdown = cfg.LeaveOnShutdown
jcfg.ListenMultiaddress = cfg.ListenAddr.String()
jcfg.StateSyncInterval = cfg.StateSyncInterval.String()


@ -21,7 +21,8 @@ var ccfgTestJSON = []byte(`
"listen_multiaddress": "/ip4/127.0.0.1/tcp/10000",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": 5,
"replication_factor_min": 5,
"replication_factor_max": 5,
"monitor_ping_interval": "2s"
}
`)
@ -41,8 +42,8 @@ func TestLoadJSON(t *testing.T) {
t.Error("expected 1 peer and 1 bootstrap")
}
if cfg.ReplicationFactor != 5 {
t.Error("expected replication factor 5")
if cfg.ReplicationFactorMin != 5 {
t.Error("expected replication factor min == 5")
}
j := &configJSON{}
@ -111,20 +112,51 @@ func TestLoadJSON(t *testing.T) {
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.StateSyncInterval = ""
j.ReplicationFactor = 0
j.ReplicationFactorMin = 0
j.ReplicationFactorMax = 0
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error state_sync_interval")
cfg.LoadJSON(tst)
if cfg.ReplicationFactorMin != -1 || cfg.ReplicationFactorMax != -1 {
t.Error("expected default replication factor")
}
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.ReplicationFactor = 0
j.ReplicationFactor = 3
tst, _ = json.Marshal(j)
cfg.LoadJSON(tst)
if cfg.ReplicationFactor != -1 {
t.Error("expected default replication factor")
if cfg.ReplicationFactorMin != 3 || cfg.ReplicationFactorMax != 3 {
t.Error("expected replicationFactor Min/Max override")
}
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.ReplicationFactorMin = -1
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error when only one replication factor is -1")
}
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.ReplicationFactorMin = 5
j.ReplicationFactorMax = 4
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error when only rplMin > rplMax")
}
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.ReplicationFactorMin = 0
j.ReplicationFactorMax = 0
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if cfg.ReplicationFactorMin != -1 || cfg.ReplicationFactorMax != -1 {
t.Error("expected default replication factors")
}
}
@ -163,4 +195,17 @@ func TestValidate(t *testing.T) {
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
cfg.Default()
cfg.ReplicationFactorMin = 10
cfg.ReplicationFactorMax = 5
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
cfg.Default()
cfg.ReplicationFactorMin = 0
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
}


@ -219,7 +219,7 @@ func TestClusterPins(t *testing.T) {
if len(pins) != 1 {
t.Fatal("pin should be part of the state")
}
if !pins[0].Cid.Equals(c) || pins[0].ReplicationFactor != -1 {
if !pins[0].Cid.Equals(c) || pins[0].ReplicationFactorMin != -1 || pins[0].ReplicationFactorMax != -1 {
t.Error("the Pin does not look as expected")
}
}
@ -239,7 +239,7 @@ func TestClusterPinGet(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if !pin.Cid.Equals(c) || pin.ReplicationFactor != -1 {
if !pin.Cid.Equals(c) || pin.ReplicationFactorMin != -1 || pin.ReplicationFactorMax != -1 {
t.Error("the Pin does not look as expected")
}


@ -96,7 +96,7 @@ func TestConsensusPin(t *testing.T) {
defer cc.Shutdown()
c, _ := cid.Decode(test.TestCid1)
err := cc.LogPin(api.Pin{Cid: c, ReplicationFactor: -1})
err := cc.LogPin(api.Pin{Cid: c, ReplicationFactorMin: -1, ReplicationFactorMax: -1})
if err != nil {
t.Error("the operation did not make it to the log:", err)
}
@ -186,7 +186,7 @@ func TestConsensusRmPeer(t *testing.T) {
cc.raft.WaitForLeader(ctx)
c, _ := cid.Decode(test.TestCid1)
err = cc.LogPin(api.Pin{Cid: c, ReplicationFactor: -1})
err = cc.LogPin(api.Pin{Cid: c, ReplicationFactorMin: -1, ReplicationFactorMax: -1})
if err != nil {
t.Error("could not pin after adding peer:", err)
}
@ -234,7 +234,7 @@ func TestRaftLatestSnapshot(t *testing.T) {
// Make pin 1
c1, _ := cid.Decode(test.TestCid1)
err := cc.LogPin(api.Pin{Cid: c1, ReplicationFactor: -1})
err := cc.LogPin(api.Pin{Cid: c1, ReplicationFactorMin: -1, ReplicationFactorMax: -1})
if err != nil {
t.Error("the first pin did not make it to the log:", err)
}
@ -247,7 +247,7 @@ func TestRaftLatestSnapshot(t *testing.T) {
// Make pin 2
c2, _ := cid.Decode(test.TestCid2)
err = cc.LogPin(api.Pin{Cid: c2, ReplicationFactor: -1})
err = cc.LogPin(api.Pin{Cid: c2, ReplicationFactorMin: -1, ReplicationFactorMax: -1})
if err != nil {
t.Error("the second pin did not make it to the log:", err)
}
@ -260,7 +260,7 @@ func TestRaftLatestSnapshot(t *testing.T) {
// Make pin 3
c3, _ := cid.Decode(test.TestCid3)
err = cc.LogPin(api.Pin{Cid: c3, ReplicationFactor: -1})
err = cc.LogPin(api.Pin{Cid: c3, ReplicationFactorMin: -1, ReplicationFactorMax: -1})
if err != nil {
t.Error("the third pin did not make it to the log:", err)
}


@ -40,7 +40,7 @@ func TestApplyToUnpin(t *testing.T) {
st := mapstate.NewMapState()
c, _ := cid.Decode(test.TestCid1)
st.Add(api.Pin{Cid: c, ReplicationFactor: -1})
st.Add(api.Pin{Cid: c, ReplicationFactorMin: -1, ReplicationFactorMax: -1})
op.ApplyTo(st)
pins := st.List()
if len(pins) != 0 {


@ -737,7 +737,8 @@ func TestClustersReplication(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
for _, c := range clusters {
c.config.ReplicationFactor = nClusters - 1
c.config.ReplicationFactorMin = nClusters - 1
c.config.ReplicationFactorMax = nClusters - 1
}
// Why is replication factor nClusters - 1?
@ -836,7 +837,8 @@ func TestClustersReplicationRealloc(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
for _, c := range clusters {
c.config.ReplicationFactor = nClusters - 1
c.config.ReplicationFactorMin = nClusters - 1
c.config.ReplicationFactorMax = nClusters - 1
}
j := rand.Intn(nClusters)
@ -936,7 +938,8 @@ func TestClustersReplicationNotEnoughPeers(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
for _, c := range clusters {
c.config.ReplicationFactor = nClusters - 1
c.config.ReplicationFactorMin = nClusters - 1
c.config.ReplicationFactorMax = nClusters - 1
}
j := rand.Intn(nClusters)
@ -959,7 +962,7 @@ func TestClustersReplicationNotEnoughPeers(t *testing.T) {
if err == nil {
t.Fatal("expected an error")
}
if !strings.Contains(err.Error(), "not enough candidates") {
if !strings.Contains(err.Error(), "not enough peers to allocate") {
t.Error("different error than expected")
t.Error(err)
}
@ -974,7 +977,8 @@ func TestClustersRebalanceOnPeerDown(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
for _, c := range clusters {
c.config.ReplicationFactor = nClusters - 1
c.config.ReplicationFactorMin = nClusters - 1
c.config.ReplicationFactorMax = nClusters - 1
}
// pin something


@ -292,7 +292,8 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
// Adjust the replication factor for re-allocation
for _, c := range clusters {
c.config.ReplicationFactor = nClusters - 1
c.config.ReplicationFactorMin = nClusters - 1
c.config.ReplicationFactorMax = nClusters - 1
}
// We choose to remove the leader, to make things even more interesting


@ -181,7 +181,7 @@ func (mpt *MapPinTracker) unsafeSetError(c *cid.Cid, err error) {
}
func (mpt *MapPinTracker) isRemote(c api.Pin) bool {
if c.ReplicationFactor < 0 {
if c.ReplicationFactorMin < 0 {
return false
}


@ -46,7 +46,8 @@ func TestTrack(t *testing.T) {
c := api.Pin{
Cid: h,
Allocations: []peer.ID{},
ReplicationFactor: -1,
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,
}
err := mpt.Track(c)
@ -65,7 +66,8 @@ func TestTrack(t *testing.T) {
c = api.Pin{
Cid: h,
Allocations: []peer.ID{test.TestPeerID2},
ReplicationFactor: 1,
ReplicationFactorMin: 1,
ReplicationFactorMax: 1,
}
err = mpt.Track(c)
if err != nil {
@ -91,7 +93,8 @@ func TestUntrack(t *testing.T) {
c := api.Pin{
Cid: h1,
Allocations: []peer.ID{},
ReplicationFactor: -1,
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,
}
err := mpt.Track(c)
@ -103,7 +106,8 @@ func TestUntrack(t *testing.T) {
c = api.Pin{
Cid: h2,
Allocations: []peer.ID{test.TestPeerID2},
ReplicationFactor: 1,
ReplicationFactorMin: 1,
ReplicationFactorMax: 1,
}
err = mpt.Track(c)
if err != nil {
@ -146,9 +150,19 @@ func TestStatusAll(t *testing.T) {
h2, _ := cid.Decode(test.TestCid2)
// LocalPin
c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1}
c := api.Pin{
Cid: h1,
Allocations: []peer.ID{},
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,
}
mpt.Track(c)
c = api.Pin{Cid: h2, Allocations: []peer.ID{test.TestPeerID2}, ReplicationFactor: 1}
c = api.Pin{
Cid: h2,
Allocations: []peer.ID{},
ReplicationFactorMin: 1,
ReplicationFactorMax: 1,
}
mpt.Track(c)
time.Sleep(100 * time.Millisecond)
@ -176,9 +190,19 @@ func TestSyncAndRecover(t *testing.T) {
h1, _ := cid.Decode(test.TestCid1)
h2, _ := cid.Decode(test.TestCid2)
c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1}
c := api.Pin{
Cid: h1,
Allocations: []peer.ID{},
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,
}
mpt.Track(c)
c = api.Pin{Cid: h2, Allocations: []peer.ID{}, ReplicationFactor: -1}
c = api.Pin{
Cid: h2,
Allocations: []peer.ID{},
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,
}
mpt.Track(c)
time.Sleep(100 * time.Millisecond)
@ -231,7 +255,12 @@ func TestRecoverAll(t *testing.T) {
h1, _ := cid.Decode(test.TestCid1)
c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1}
c := api.Pin{
Cid: h1,
Allocations: []peer.ID{},
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,
}
mpt.Track(c)
time.Sleep(100 * time.Millisecond)
mpt.set(h1, api.TrackerStatusPinError)
@ -264,9 +293,20 @@ func TestSyncAll(t *testing.T) {
h1, _ := cid.Decode(test.TestCid1)
h2, _ := cid.Decode(test.TestCid2)
c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1}
c := api.Pin{
Cid: h1,
Allocations: []peer.ID{},
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,
}
mpt.Track(c)
c = api.Pin{Cid: h2, Allocations: []peer.ID{}, ReplicationFactor: -1}
c = api.Pin{
Cid: h2,
Allocations: []peer.ID{},
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,
}
mpt.Track(c)
time.Sleep(100 * time.Millisecond)


@ -17,7 +17,7 @@ import (
// Version is the map state Version. States with old versions should
// perform an upgrade before.
const Version = 2
const Version = 3
var logger = logging.Logger("mapstate")


@ -18,7 +18,8 @@ var testPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLe
var c = api.Pin{
Cid: testCid1,
Allocations: []peer.ID{testPeerID1},
ReplicationFactor: -1,
ReplicationFactorMax: -1,
ReplicationFactorMin: -1,
}
func TestAdd(t *testing.T) {
@ -49,7 +50,8 @@ func TestGet(t *testing.T) {
get := ms.Get(c.Cid)
if get.Cid.String() != c.Cid.String() ||
get.Allocations[0] != c.Allocations[0] ||
get.ReplicationFactor != c.ReplicationFactor {
get.ReplicationFactorMax != c.ReplicationFactorMax ||
get.ReplicationFactorMin != c.ReplicationFactorMin {
t.Error("returned something different")
}
}
@ -65,7 +67,8 @@ func TestList(t *testing.T) {
list := ms.List()
if list[0].Cid.String() != c.Cid.String() ||
list[0].Allocations[0] != c.Allocations[0] ||
list[0].ReplicationFactor != c.ReplicationFactor {
list[0].ReplicationFactorMax != c.ReplicationFactorMax ||
list[0].ReplicationFactorMin != c.ReplicationFactorMin {
t.Error("returned something different")
}
}
@ -123,7 +126,7 @@ func TestMigrateFromV1(t *testing.T) {
t.Error(err)
}
get := ms.Get(c.Cid)
if get.ReplicationFactor != -1 || !get.Cid.Equals(c.Cid) {
if get.ReplicationFactorMax != -1 || get.ReplicationFactorMin != -1 || !get.Cid.Equals(c.Cid) {
t.Error("expected something different")
t.Logf("%+v", get)
}


@ -36,9 +36,9 @@ func (st *mapStateV1) unmarshal(bs []byte) error {
// Migrate from v1 to v2
func (st *mapStateV1) next() migrateable {
var mst2 mapStateV2
mst2.PinMap = make(map[string]api.PinSerial)
mst2.PinMap = make(map[string]pinSerialV2)
for k := range st.PinMap {
mst2.PinMap[k] = api.PinSerial{
mst2.PinMap[k] = pinSerialV2{
Cid: k,
Allocations: []string{},
ReplicationFactor: -1,
@ -47,8 +47,15 @@ func (st *mapStateV1) next() migrateable {
return &mst2
}
type pinSerialV2 struct {
Cid string `json:"cid"`
Name string `json:"name"`
Allocations []string `json:"allocations"`
ReplicationFactor int `json:"replication_factor"`
}
type mapStateV2 struct {
PinMap map[string]api.PinSerial
PinMap map[string]pinSerialV2
Version int
}
@ -58,14 +65,39 @@ func (st *mapStateV2) unmarshal(bs []byte) error {
return dec.Decode(st)
}
// No migration possible, v2 is the latest state
func (st *mapStateV2) next() migrateable {
var mst3 mapStateV3
mst3.PinMap = make(map[string]api.PinSerial)
for k, v := range st.PinMap {
mst3.PinMap[k] = api.PinSerial{
Cid: v.Cid,
Name: v.Name,
Allocations: v.Allocations,
ReplicationFactorMin: v.ReplicationFactor,
ReplicationFactorMax: v.ReplicationFactor,
}
}
return &mst3
}
type mapStateV3 struct {
PinMap map[string]api.PinSerial
Version int
}
func (st *mapStateV3) unmarshal(bs []byte) error {
buf := bytes.NewBuffer(bs)
dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf)
return dec.Decode(st)
}
func (st *mapStateV3) next() migrateable {
return nil
}
func finalCopy(st *MapState, internal *mapStateV2) {
for k := range internal.PinMap {
st.PinMap[k] = internal.PinMap[k]
func finalCopy(st *MapState, internal *mapStateV3) {
for k, v := range internal.PinMap {
st.PinMap[k] = v
}
}
@ -75,8 +107,9 @@ func (st *MapState) migrateFrom(version int, snap []byte) error {
case 1:
var mst1 mapStateV1
m = &mst1
break
case 2:
var mst2 mapStateV2
m = &mst2
default:
return errors.New("version migration not supported")
}
@ -89,11 +122,11 @@ func (st *MapState) migrateFrom(version int, snap []byte) error {
for {
next = m.next()
if next == nil {
mst2, ok := m.(*mapStateV2)
mst3, ok := m.(*mapStateV3)
if !ok {
return errors.New("migration ended prematurely")
}
finalCopy(st, mst2)
finalCopy(st, mst3)
return nil
}
m = next


@ -167,3 +167,10 @@ func containsPeer(list []peer.ID, peer peer.ID) bool {
}
return false
}
func minInt(x, y int) int {
if x < y {
return x
}
return y
}