From 4549282cbad716cf0a77edba13581a4dd86e4bc6 Mon Sep 17 00:00:00 2001
From: Hector Sanjuan
Date: Fri, 12 Jan 2018 18:04:46 +0100
Subject: [PATCH] Fix #277: Introduce maximum and minimum replication factor

This PR replaces ReplicationFactor with ReplicationFactorMax and
ReplicationFactorMin. This allows a CID to remain pinned even though the
desired replication factor (max) is not reached, and prevents triggering
re-pinnings when the replication factor has not crossed the lower
threshold (min).

License: MIT
Signed-off-by: Hector Sanjuan
---
 .codeclimate.yml                         |   2 +-
 allocate.go                              | 195 +++++++++++++++++++++++
 api/types.go                             |  44 ++---
 api/types_test.go                        |  10 +-
 cluster.go                               | 129 +++-------------
 cluster_config.go                        | 116 ++++++++++----
 cluster_config_test.go                   |  65 ++++++--
 cluster_test.go                          |   4 +-
 consensus/raft/consensus_test.go         |  10 +-
 consensus/raft/log_op_test.go            |   2 +-
 ipfscluster_test.go                      |  14 +-
 peer_manager_test.go                     |   3 +-
 pintracker/maptracker/maptracker.go      |   2 +-
 pintracker/maptracker/maptracker_test.go |  78 ++++++---
 state/mapstate/map_state.go              |   2 +-
 state/mapstate/map_state_test.go         |  15 +-
 state/mapstate/migrate.go                |  55 +++++--
 util.go                                  |   7 +
 18 files changed, 524 insertions(+), 229 deletions(-)
 create mode 100644 allocate.go

diff --git a/.codeclimate.yml b/.codeclimate.yml
index c546a4a7..2813e27b 100644
--- a/.codeclimate.yml
+++ b/.codeclimate.yml
@@ -8,7 +8,7 @@ checks:
       threshold: 500
   method-complexity:
     config:
-      threshold: 8
+      threshold: 12
   method-lines:
     config:
       threshold: 60
diff --git a/allocate.go b/allocate.go
new file mode 100644
index 00000000..bf5afd48
--- /dev/null
+++ b/allocate.go
@@ -0,0 +1,195 @@
+package ipfscluster
+
+import (
+	"errors"
+	"fmt"
+
+	cid "github.com/ipfs/go-cid"
+	peer "github.com/libp2p/go-libp2p-peer"
+
+	"github.com/ipfs/ipfs-cluster/api"
+)
+
+// This file gathers allocation logic used when pinning or re-pinning
+// to find which peers should be allocated to a Cid. Allocation is constrained
+// by the ReplicationFactorMin and ReplicationFactorMax parameters obtained
+// from the Pin object. The allocation process has several steps:
+//
+// * Find which peers are pinning the CID.
+// * Obtain the last values for the configured informer metrics from the
+//   monitor component.
+// * Divide the metrics between "current" (peers already pinning the CID)
+//   and "candidates" (peers that could pin the CID), as long as their
+//   metrics are valid.
+// * Given the candidates:
+//   * Check if we are overpinning the item.
+//   * Check if there are not enough candidates for the "needed" replication
+//     factor.
+// * If there are enough candidates:
+//   * Call the configured allocator, which sorts the candidates (and
+//     may veto some depending on the allocation strategy).
+//   * The allocator returns a list of final candidate peers sorted by
+//     order of preference.
+//   * Take as many final candidates from the list as we can, until
+//     ReplicationFactorMax is reached. Error if there are fewer than
+//     ReplicationFactorMin.
+
+// allocate finds peers to allocate a hash, using the informer and the
+// monitor. It should only be used with valid replication factors (rplMin
+// and rplMax positive and rplMin <= rplMax).
+// It only returns new allocations when they are needed; nil, nil means the
+// current ones are ok.
+func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.ID) ([]peer.ID, error) { + // Figure out who is holding the CID + currentAllocs := c.getCurrentAllocations(hash) + metrics, err := c.getInformerMetrics() + if err != nil { + return nil, err + } + + currentMetrics := make(map[peer.ID]api.Metric) + candidatesMetrics := make(map[peer.ID]api.Metric) + + // Divide metrics between current and candidates. + for _, m := range metrics { + switch { + case m.Discard() || containsPeer(blacklist, m.Peer): + // discard peers with invalid metrics and + // those in the blacklist + continue + case containsPeer(currentAllocs, m.Peer): + currentMetrics[m.Peer] = m + default: + candidatesMetrics[m.Peer] = m + } + } + + return c.obtainAllocations(hash, + rplMin, + rplMax, + currentMetrics, + candidatesMetrics) +} + +// getCurrentAllocations returns the list of peers allocated to a Cid. +func (c *Cluster) getCurrentAllocations(h *cid.Cid) []peer.ID { + var allocs []peer.ID + st, err := c.consensus.State() + if err != nil { + // no state we assume it is empty. If there was other + // problem, we would fail to commit anyway. + allocs = []peer.ID{} + } else { + pin := st.Get(h) + allocs = pin.Allocations + } + return allocs +} + +// getInformerMetrics returns the MonitorLastMetrics() for the +// configured informer. +func (c *Cluster) getInformerMetrics() ([]api.Metric, error) { + var metrics []api.Metric + metricName := c.informer.Name() + l, err := c.consensus.Leader() + if err != nil { + return nil, errors.New("cannot determine leading Monitor") + } + + err = c.rpcClient.Call(l, + "Cluster", "PeerMonitorLastMetrics", + metricName, + &metrics) + if err != nil { + return nil, err + } + return metrics, nil +} + +// allocationError logs an allocation error +func allocationError(hash *cid.Cid, needed, wanted int, candidatesValid []peer.ID) error { + logger.Errorf("Not enough candidates to allocate %s:", hash) + logger.Errorf(" Needed: %d", needed) + logger.Errorf(" Wanted: %d", wanted) + logger.Errorf(" Valid candidates: %d:", len(candidatesValid)) + for _, c := range candidatesValid { + logger.Errorf(" - %s", c.Pretty()) + } + errorMsg := "not enough peers to allocate CID. " + errorMsg += fmt.Sprintf("Needed at least: %d. ", needed) + errorMsg += fmt.Sprintf("Wanted at most: %d. ", wanted) + errorMsg += fmt.Sprintf("Valid candidates: %d. ", len(candidatesValid)) + errorMsg += "See logs for more info." + return errors.New(errorMsg) +} + +func (c *Cluster) obtainAllocations( + hash *cid.Cid, + rplMin, rplMax int, + currentValidMetrics, candidatesMetrics map[peer.ID]api.Metric) ([]peer.ID, error) { + + // The list of peers in current + validAllocations := make([]peer.ID, 0, len(currentValidMetrics)) + for k := range currentValidMetrics { + validAllocations = append(validAllocations, k) + } + + nCurrentValid := len(validAllocations) + nCandidatesValid := len(candidatesMetrics) + needed := rplMin - nCurrentValid // The minimum we need + wanted := rplMax - nCurrentValid // The maximum we want + + logger.Debugf("obtainAllocations: current valid: %d", nCurrentValid) + logger.Debugf("obtainAllocations: candidates valid: %d", nCandidatesValid) + logger.Debugf("obtainAllocations: Needed: %d", needed) + logger.Debugf("obtainAllocations: Wanted: %d", wanted) + + // Reminder: rplMin <= rplMax AND >0 + + if wanted <= 0 { // alocations above maximum threshold: drop some + // This could be done more intelligently by dropping them + // according to the allocator order (i.e. 
free-ing peers + // with most used space first). + return validAllocations[0 : len(validAllocations)+wanted], nil + } + + if needed <= 0 { // allocations are above minimal threshold + // We keep things as they are. Avoid any changes to the pin set. + return nil, nil + } + + if nCandidatesValid < needed { // not enough candidates + candidatesValid := []peer.ID{} + for k := range candidatesMetrics { + candidatesValid = append(candidatesValid, k) + } + return nil, allocationError(hash, needed, wanted, candidatesValid) + } + + // We can allocate from this point. Use the allocator to decide + // on the priority of candidates grab as many as "wanted" + + // the allocator returns a list of peers ordered by priority + finalAllocs, err := c.allocator.Allocate( + hash, currentValidMetrics, candidatesMetrics) + if err != nil { + return nil, logError(err.Error()) + } + + logger.Debugf("obtainAllocations: allocate(): %s", finalAllocs) + + // check that we have enough as the allocator may have returned + // less candidates than provided. + if got := len(finalAllocs); got < needed { + return nil, allocationError(hash, needed, wanted, finalAllocs) + } + + allocationsToUse := minInt(wanted, len(finalAllocs)) + + // the final result is the currently valid allocations + // along with the ones provided by the allocator + return append(validAllocations, finalAllocs[0:allocationsToUse]...), nil +} diff --git a/api/types.go b/api/types.go index 134f18aa..b3c09736 100644 --- a/api/types.go +++ b/api/types.go @@ -422,10 +422,11 @@ func StringsToPeers(strs []string) []peer.ID { // Pin is an argument that carries a Cid. It may carry more things in the // future. type Pin struct { - Cid *cid.Cid - Name string - Allocations []peer.ID - ReplicationFactor int + Cid *cid.Cid + Name string + Allocations []peer.ID + ReplicationFactorMin int + ReplicationFactorMax int } // PinCid is a shorcut to create a Pin only with a Cid. @@ -437,11 +438,12 @@ func PinCid(c *cid.Cid) Pin { // PinSerial is a serializable version of Pin type PinSerial struct { - Cid string `json:"cid"` - Name string `json:"name"` - Allocations []string `json:"allocations"` - Everywhere bool `json:"everywhere,omitempty"` // legacy - ReplicationFactor int `json:"replication_factor"` + Cid string `json:"cid"` + Name string `json:"name"` + Allocations []string `json:"allocations"` + ReplicationFactor int `json:"replication_factor,omitempty"` //legacy + ReplicationFactorMin int `json:"replication_factor_min"` + ReplicationFactorMax int `json:"replication_factor_max"` } // ToSerial converts a Pin to PinSerial. 
@@ -453,13 +455,13 @@ func (pin Pin) ToSerial() PinSerial { n := pin.Name allocs := PeersToStrings(pin.Allocations) - rpl := pin.ReplicationFactor return PinSerial{ - Cid: c, - Name: n, - Allocations: allocs, - ReplicationFactor: rpl, + Cid: c, + Name: n, + Allocations: allocs, + ReplicationFactorMin: pin.ReplicationFactorMin, + ReplicationFactorMax: pin.ReplicationFactorMax, } } @@ -471,15 +473,17 @@ func (pins PinSerial) ToPin() Pin { } // legacy format management - if pins.ReplicationFactor == 0 && pins.Everywhere { - pins.ReplicationFactor = -1 + if rf := pins.ReplicationFactor; rf != 0 { + pins.ReplicationFactorMin = rf + pins.ReplicationFactorMax = rf } return Pin{ - Cid: c, - Name: pins.Name, - Allocations: StringsToPeers(pins.Allocations), - ReplicationFactor: pins.ReplicationFactor, + Cid: c, + Name: pins.Name, + Allocations: StringsToPeers(pins.Allocations), + ReplicationFactorMin: pins.ReplicationFactorMin, + ReplicationFactorMax: pins.ReplicationFactorMax, } } diff --git a/api/types_test.go b/api/types_test.go index 9845ef39..d9aef5ae 100644 --- a/api/types_test.go +++ b/api/types_test.go @@ -148,15 +148,17 @@ func TestPinConv(t *testing.T) { }() c := Pin{ - Cid: testCid1, - Allocations: []peer.ID{testPeerID1}, - ReplicationFactor: -1, + Cid: testCid1, + Allocations: []peer.ID{testPeerID1}, + ReplicationFactorMax: -1, + ReplicationFactorMin: -1, } newc := c.ToSerial().ToPin() if c.Cid.String() != newc.Cid.String() || c.Allocations[0] != newc.Allocations[0] || - c.ReplicationFactor != newc.ReplicationFactor { + c.ReplicationFactorMin != newc.ReplicationFactorMin || + c.ReplicationFactorMax != newc.ReplicationFactorMax { t.Error("mismatch") } } diff --git a/cluster.go b/cluster.go index 7082416d..47108503 100644 --- a/cluster.go +++ b/cluster.go @@ -970,25 +970,36 @@ func (c *Cluster) Pin(pin api.Pin) error { // pin performs the actual pinning and supports a blacklist to be // able to evacuate a node. func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID) error { - rpl := pin.ReplicationFactor - if rpl == 0 { - rpl = c.config.ReplicationFactor - pin.ReplicationFactor = rpl + rplMin := pin.ReplicationFactorMin + rplMax := pin.ReplicationFactorMax + if rplMin == 0 { + rplMin = c.config.ReplicationFactorMin + pin.ReplicationFactorMin = rplMin } + if rplMax == 0 { + rplMax = c.config.ReplicationFactorMax + pin.ReplicationFactorMax = rplMax + } + + if err := isReplicationFactorValid(rplMin, rplMax); err != nil { + return err + } + switch { - case rpl == 0: - return errors.New("replication factor is 0") - case rpl < 0: + case rplMin == -1 && rplMax == -1: pin.Allocations = []peer.ID{} logger.Infof("IPFS cluster pinning %s everywhere:", pin.Cid) - case rpl > 0: - allocs, err := c.allocate(pin.Cid, pin.ReplicationFactor, blacklist) + default: + allocs, err := c.allocate(pin.Cid, rplMin, rplMax, blacklist) if err != nil { return err } + if allocs == nil { + logger.Infof("Skipping repinning of %s. 
Replication factor is within thresholds", pin.Cid) + return nil + } pin.Allocations = allocs logger.Infof("IPFS cluster pinning %s on %s:", pin.Cid, pin.Allocations) - } err := c.consensus.LogPin(pin) @@ -1276,104 +1287,6 @@ func (c *Cluster) getIDForPeer(pid peer.ID) (api.ID, error) { return id, err } -// allocate finds peers to allocate a hash using the informer and the monitor -// it should only be used with a positive replication factor -func (c *Cluster) allocate(hash *cid.Cid, repl int, blacklist []peer.ID) ([]peer.ID, error) { - if repl <= 0 { - return nil, errors.New("cannot decide allocation for replication factor <= 0") - } - - // Figure out who is currently holding this - var pinAllocations []peer.ID - st, err := c.consensus.State() - if err != nil { - // no state we assume it is empty. If there was other - // problem, we would fail to commit anyway. - pinAllocations = []peer.ID{} - } else { - pin := st.Get(hash) - pinAllocations = pin.Allocations - } - - // Get the LastMetrics from the leading monitor. They are the last - // valid metrics from current cluster peers - var metrics []api.Metric - metricName := c.informer.Name() - l, err := c.consensus.Leader() - if err != nil { - return nil, errors.New("cannot determine leading Monitor") - } - - err = c.rpcClient.Call(l, - "Cluster", "PeerMonitorLastMetrics", - metricName, - &metrics) - if err != nil { - return nil, err - } - - // We must divide the metrics between current and candidates - current := make(map[peer.ID]api.Metric) - candidates := make(map[peer.ID]api.Metric) - validAllocations := make([]peer.ID, 0, len(pinAllocations)) - for _, m := range metrics { - if m.Discard() || containsPeer(blacklist, m.Peer) { - // blacklisted peers do not exist for us - continue - } else if containsPeer(pinAllocations, m.Peer) { - current[m.Peer] = m - validAllocations = append(validAllocations, m.Peer) - } else { - candidates[m.Peer] = m - } - } - - currentValid := len(validAllocations) - candidatesValid := len(candidates) - needed := repl - currentValid - - logger.Debugf("allocate: Valid allocations: %d", currentValid) - logger.Debugf("allocate: Valid candidates: %d", candidatesValid) - logger.Debugf("allocate: Needed: %d", needed) - - // If needed == 0, we don't need anything. If needed < 0, we are - // reducing the replication factor - switch { - - case needed <= 0: // set the allocations to the needed ones - return validAllocations[0 : len(validAllocations)+needed], nil - case candidatesValid < needed: - candidatesIds := []peer.ID{} - for k := range candidates { - candidatesIds = append(candidatesIds, k) - } - err = logError( - "not enough candidates to allocate %s. Needed: %d. Got: %d (%s)", - hash, needed, candidatesValid, candidatesIds) - return nil, err - default: - // this will return candidate peers in order of - // preference according to the allocator. - candidateAllocs, err := c.allocator.Allocate(hash, current, candidates) - if err != nil { - return nil, logError(err.Error()) - } - - logger.Debugf("allocate: candidate allocations: %s", candidateAllocs) - - // we don't have enough peers to pin - if got := len(candidateAllocs); got < needed { - err = logError( - "cannot find enough allocations for %s. Needed: %d. 
Got: %d (%s)",
-				hash, needed, got, candidateAllocs)
-			return nil, err
-		}
-
-		// the new allocations = the valid ones we had + the needed ones
-		return append(validAllocations, candidateAllocs[0:needed]...), nil
-	}
-}
-
 // diffPeers returns the peerIDs added and removed from peers2 in relation to
 // peers1
 func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) {
diff --git a/cluster_config.go b/cluster_config.go
index ff572ca5..5fcae726 100644
--- a/cluster_config.go
+++ b/cluster_config.go
@@ -87,11 +87,23 @@ type Config struct {
 	// large.
 	IPFSSyncInterval time.Duration
 
-	// ReplicationFactor indicates the number of nodes that must pin content.
-	// For exampe, a replication_factor of 2 will prompt cluster to choose
-	// two nodes for each pinned hash. A replication_factor -1 will
-	// use every available node for each pin.
-	ReplicationFactor int
+	// ReplicationFactorMax indicates the target number of nodes
+	// that should pin content. For example, a replication_factor_max
+	// of 3 will have the cluster allocate each pinned hash to 3 peers
+	// if possible.
+	// See also ReplicationFactorMin. A ReplicationFactorMax of -1
+	// will allocate to every available node.
+	ReplicationFactorMax int
+
+	// ReplicationFactorMin indicates the minimum number of healthy
+	// nodes pinning content. If the number of nodes available to pin
+	// is below this threshold, an error is returned.
+	// When peer health issues push an item below this threshold, its
+	// content is re-allocated.
+	// For example, a ReplicationFactorMin of 2 will allocate at least
+	// two peers to hold content, and return an error if this is not
+	// possible.
+	ReplicationFactorMin int
 
 	// MonitorPingInterval is frequency by which a cluster peer pings the
 	// monitoring component. The ping metric has a TTL set to the double
@@ -103,18 +115,20 @@
 	// saved using JSON. Most configuration keys are converted into simple types
 	// like strings, and key names aim to be self-explanatory for the user.
type configJSON struct { - ID string `json:"id"` - Peername string `json:"peername"` - PrivateKey string `json:"private_key"` - Secret string `json:"secret"` - Peers []string `json:"peers"` - Bootstrap []string `json:"bootstrap"` - LeaveOnShutdown bool `json:"leave_on_shutdown"` - ListenMultiaddress string `json:"listen_multiaddress"` - StateSyncInterval string `json:"state_sync_interval"` - IPFSSyncInterval string `json:"ipfs_sync_interval"` - ReplicationFactor int `json:"replication_factor"` - MonitorPingInterval string `json:"monitor_ping_interval"` + ID string `json:"id"` + Peername string `json:"peername"` + PrivateKey string `json:"private_key"` + Secret string `json:"secret"` + Peers []string `json:"peers"` + Bootstrap []string `json:"bootstrap"` + LeaveOnShutdown bool `json:"leave_on_shutdown"` + ListenMultiaddress string `json:"listen_multiaddress"` + StateSyncInterval string `json:"state_sync_interval"` + IPFSSyncInterval string `json:"ipfs_sync_interval"` + ReplicationFactor int `json:"replication_factor,omitempty"` // legacy + ReplicationFactorMin int `json:"replication_factor_min"` + ReplicationFactorMax int `json:"replication_factor_max"` + MonitorPingInterval string `json:"monitor_ping_interval"` } // ConfigKey returns a human-readable string to identify @@ -190,10 +204,34 @@ func (cfg *Config) Validate() error { return errors.New("cluster.monitoring_interval is invalid") } - if cfg.ReplicationFactor < -1 { - return errors.New("cluster.replication_factor is invalid") + rfMax := cfg.ReplicationFactorMax + rfMin := cfg.ReplicationFactorMin + + return isReplicationFactorValid(rfMin, rfMax) +} + +func isReplicationFactorValid(rplMin, rplMax int) error { + // check Max and Min are correct + if rplMin == 0 || rplMax == 0 { + return errors.New("cluster.replication_factor_min and max must be set") } + if rplMin > rplMax { + return errors.New("cluster.replication_factor_min is larger than max") + } + + if rplMin < -1 { + return errors.New("cluster.replication_factor_min is wrong") + } + + if rplMax < -1 { + return errors.New("cluster.replication_factor_max is wrong") + } + + if (rplMin == -1 && rplMax != -1) || + (rplMin != -1 && rplMax == -1) { + return errors.New("cluster.replication_factor_min and max must be -1 when one of them is") + } return nil } @@ -212,7 +250,8 @@ func (cfg *Config) setDefaults() { cfg.LeaveOnShutdown = DefaultLeaveOnShutdown cfg.StateSyncInterval = DefaultStateSyncInterval cfg.IPFSSyncInterval = DefaultIPFSSyncInterval - cfg.ReplicationFactor = DefaultReplicationFactor + cfg.ReplicationFactorMin = DefaultReplicationFactor + cfg.ReplicationFactorMax = DefaultReplicationFactor cfg.MonitorPingInterval = DefaultMonitorPingInterval } @@ -230,6 +269,14 @@ func (cfg *Config) LoadJSON(raw []byte) error { // Make sure all non-defined keys have good values. cfg.setDefaults() + parseDuration := func(txt string) time.Duration { + d, _ := time.ParseDuration(txt) + if txt != "" && d == 0 { + logger.Warningf("%s is not a valid duration. 
Default will be used", txt) + } + return d + } + id, err := peer.IDB58Decode(jcfg.ID) if err != nil { err = fmt.Errorf("error decoding cluster ID: %s", err) @@ -291,22 +338,22 @@ func (cfg *Config) LoadJSON(raw []byte) error { } cfg.ListenAddr = clusterAddr - if rf := jcfg.ReplicationFactor; rf == 0 { - logger.Warning("Replication factor set to -1 (pin everywhere)") - cfg.ReplicationFactor = -1 - } else { - cfg.ReplicationFactor = rf + rplMin := jcfg.ReplicationFactorMin + rplMax := jcfg.ReplicationFactorMax + if jcfg.ReplicationFactor != 0 { // read min and max + rplMin = jcfg.ReplicationFactor + rplMax = rplMin } + config.SetIfNotDefault(rplMin, &cfg.ReplicationFactorMin) + config.SetIfNotDefault(rplMax, &cfg.ReplicationFactorMax) - // Validation will detect problems here - interval, _ := time.ParseDuration(jcfg.StateSyncInterval) - cfg.StateSyncInterval = interval + stateSyncInterval := parseDuration(jcfg.StateSyncInterval) + ipfsSyncInterval := parseDuration(jcfg.IPFSSyncInterval) + monitorPingInterval := parseDuration(jcfg.MonitorPingInterval) - interval, _ = time.ParseDuration(jcfg.IPFSSyncInterval) - cfg.IPFSSyncInterval = interval - - interval, _ = time.ParseDuration(jcfg.MonitorPingInterval) - cfg.MonitorPingInterval = interval + config.SetIfNotDefault(stateSyncInterval, &cfg.StateSyncInterval) + config.SetIfNotDefault(ipfsSyncInterval, &cfg.IPFSSyncInterval) + config.SetIfNotDefault(monitorPingInterval, &cfg.MonitorPingInterval) cfg.LeaveOnShutdown = jcfg.LeaveOnShutdown @@ -350,7 +397,8 @@ func (cfg *Config) ToJSON() (raw []byte, err error) { jcfg.Secret = EncodeClusterSecret(cfg.Secret) jcfg.Peers = clusterPeers jcfg.Bootstrap = bootstrap - jcfg.ReplicationFactor = cfg.ReplicationFactor + jcfg.ReplicationFactorMin = cfg.ReplicationFactorMin + jcfg.ReplicationFactorMax = cfg.ReplicationFactorMax jcfg.LeaveOnShutdown = cfg.LeaveOnShutdown jcfg.ListenMultiaddress = cfg.ListenAddr.String() jcfg.StateSyncInterval = cfg.StateSyncInterval.String() diff --git a/cluster_config_test.go b/cluster_config_test.go index 4c9e5072..8b69bd02 100644 --- a/cluster_config_test.go +++ b/cluster_config_test.go @@ -21,7 +21,8 @@ var ccfgTestJSON = []byte(` "listen_multiaddress": "/ip4/127.0.0.1/tcp/10000", "state_sync_interval": "1m0s", "ipfs_sync_interval": "2m10s", - "replication_factor": 5, + "replication_factor_min": 5, + "replication_factor_max": 5, "monitor_ping_interval": "2s" } `) @@ -41,8 +42,8 @@ func TestLoadJSON(t *testing.T) { t.Error("expected 1 peer and 1 bootstrap") } - if cfg.ReplicationFactor != 5 { - t.Error("expected replication factor 5") + if cfg.ReplicationFactorMin != 5 { + t.Error("expected replication factor min == 5") } j := &configJSON{} @@ -111,20 +112,51 @@ func TestLoadJSON(t *testing.T) { j = &configJSON{} json.Unmarshal(ccfgTestJSON, j) - j.StateSyncInterval = "" + j.ReplicationFactor = 0 + j.ReplicationFactorMin = 0 + j.ReplicationFactorMax = 0 tst, _ = json.Marshal(j) - err = cfg.LoadJSON(tst) - if err == nil { - t.Error("expected error state_sync_interval") + cfg.LoadJSON(tst) + if cfg.ReplicationFactorMin != -1 || cfg.ReplicationFactorMax != -1 { + t.Error("expected default replication factor") } j = &configJSON{} json.Unmarshal(ccfgTestJSON, j) - j.ReplicationFactor = 0 + j.ReplicationFactor = 3 tst, _ = json.Marshal(j) cfg.LoadJSON(tst) - if cfg.ReplicationFactor != -1 { - t.Error("expected default replication factor") + if cfg.ReplicationFactorMin != 3 || cfg.ReplicationFactorMax != 3 { + t.Error("expected replicationFactor Min/Max override") + } + + j = 
&configJSON{} + json.Unmarshal(ccfgTestJSON, j) + j.ReplicationFactorMin = -1 + tst, _ = json.Marshal(j) + err = cfg.LoadJSON(tst) + if err == nil { + t.Error("expected error when only one replication factor is -1") + } + + j = &configJSON{} + json.Unmarshal(ccfgTestJSON, j) + j.ReplicationFactorMin = 5 + j.ReplicationFactorMax = 4 + tst, _ = json.Marshal(j) + err = cfg.LoadJSON(tst) + if err == nil { + t.Error("expected error when only rplMin > rplMax") + } + + j = &configJSON{} + json.Unmarshal(ccfgTestJSON, j) + j.ReplicationFactorMin = 0 + j.ReplicationFactorMax = 0 + tst, _ = json.Marshal(j) + err = cfg.LoadJSON(tst) + if cfg.ReplicationFactorMin != -1 || cfg.ReplicationFactorMax != -1 { + t.Error("expected default replication factors") } } @@ -163,4 +195,17 @@ func TestValidate(t *testing.T) { if cfg.Validate() == nil { t.Fatal("expected error validating") } + + cfg.Default() + cfg.ReplicationFactorMin = 10 + cfg.ReplicationFactorMax = 5 + if cfg.Validate() == nil { + t.Fatal("expected error validating") + } + + cfg.Default() + cfg.ReplicationFactorMin = 0 + if cfg.Validate() == nil { + t.Fatal("expected error validating") + } } diff --git a/cluster_test.go b/cluster_test.go index f1cd5587..96ee9573 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -219,7 +219,7 @@ func TestClusterPins(t *testing.T) { if len(pins) != 1 { t.Fatal("pin should be part of the state") } - if !pins[0].Cid.Equals(c) || pins[0].ReplicationFactor != -1 { + if !pins[0].Cid.Equals(c) || pins[0].ReplicationFactorMin != -1 || pins[0].ReplicationFactorMax != -1 { t.Error("the Pin does not look as expected") } } @@ -239,7 +239,7 @@ func TestClusterPinGet(t *testing.T) { if err != nil { t.Fatal(err) } - if !pin.Cid.Equals(c) || pin.ReplicationFactor != -1 { + if !pin.Cid.Equals(c) || pin.ReplicationFactorMax != -1 || pin.ReplicationFactorMax != -1 { t.Error("the Pin does not look as expected") } diff --git a/consensus/raft/consensus_test.go b/consensus/raft/consensus_test.go index 10919865..4af7c887 100644 --- a/consensus/raft/consensus_test.go +++ b/consensus/raft/consensus_test.go @@ -96,7 +96,7 @@ func TestConsensusPin(t *testing.T) { defer cc.Shutdown() c, _ := cid.Decode(test.TestCid1) - err := cc.LogPin(api.Pin{Cid: c, ReplicationFactor: -1}) + err := cc.LogPin(api.Pin{Cid: c, ReplicationFactorMin: -1, ReplicationFactorMax: -1}) if err != nil { t.Error("the operation did not make it to the log:", err) } @@ -186,7 +186,7 @@ func TestConsensusRmPeer(t *testing.T) { cc.raft.WaitForLeader(ctx) c, _ := cid.Decode(test.TestCid1) - err = cc.LogPin(api.Pin{Cid: c, ReplicationFactor: -1}) + err = cc.LogPin(api.Pin{Cid: c, ReplicationFactorMin: -1, ReplicationFactorMax: -1}) if err != nil { t.Error("could not pin after adding peer:", err) } @@ -234,7 +234,7 @@ func TestRaftLatestSnapshot(t *testing.T) { // Make pin 1 c1, _ := cid.Decode(test.TestCid1) - err := cc.LogPin(api.Pin{Cid: c1, ReplicationFactor: -1}) + err := cc.LogPin(api.Pin{Cid: c1, ReplicationFactorMin: -1, ReplicationFactorMax: -1}) if err != nil { t.Error("the first pin did not make it to the log:", err) } @@ -247,7 +247,7 @@ func TestRaftLatestSnapshot(t *testing.T) { // Make pin 2 c2, _ := cid.Decode(test.TestCid2) - err = cc.LogPin(api.Pin{Cid: c2, ReplicationFactor: -1}) + err = cc.LogPin(api.Pin{Cid: c2, ReplicationFactorMin: -1, ReplicationFactorMax: -1}) if err != nil { t.Error("the second pin did not make it to the log:", err) } @@ -260,7 +260,7 @@ func TestRaftLatestSnapshot(t *testing.T) { // Make pin 3 c3, _ := cid.Decode(test.TestCid3) - 
err = cc.LogPin(api.Pin{Cid: c3, ReplicationFactor: -1}) + err = cc.LogPin(api.Pin{Cid: c3, ReplicationFactorMin: -1, ReplicationFactorMax: -1}) if err != nil { t.Error("the third pin did not make it to the log:", err) } diff --git a/consensus/raft/log_op_test.go b/consensus/raft/log_op_test.go index 1c639dcc..713faef0 100644 --- a/consensus/raft/log_op_test.go +++ b/consensus/raft/log_op_test.go @@ -40,7 +40,7 @@ func TestApplyToUnpin(t *testing.T) { st := mapstate.NewMapState() c, _ := cid.Decode(test.TestCid1) - st.Add(api.Pin{Cid: c, ReplicationFactor: -1}) + st.Add(api.Pin{Cid: c, ReplicationFactorMin: -1, ReplicationFactorMax: -1}) op.ApplyTo(st) pins := st.List() if len(pins) != 0 { diff --git a/ipfscluster_test.go b/ipfscluster_test.go index 37ed122b..c29068a6 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -737,7 +737,8 @@ func TestClustersReplication(t *testing.T) { clusters, mock := createClusters(t) defer shutdownClusters(t, clusters, mock) for _, c := range clusters { - c.config.ReplicationFactor = nClusters - 1 + c.config.ReplicationFactorMin = nClusters - 1 + c.config.ReplicationFactorMax = nClusters - 1 } // Why is replication factor nClusters - 1? @@ -836,7 +837,8 @@ func TestClustersReplicationRealloc(t *testing.T) { clusters, mock := createClusters(t) defer shutdownClusters(t, clusters, mock) for _, c := range clusters { - c.config.ReplicationFactor = nClusters - 1 + c.config.ReplicationFactorMin = nClusters - 1 + c.config.ReplicationFactorMax = nClusters - 1 } j := rand.Intn(nClusters) @@ -936,7 +938,8 @@ func TestClustersReplicationNotEnoughPeers(t *testing.T) { clusters, mock := createClusters(t) defer shutdownClusters(t, clusters, mock) for _, c := range clusters { - c.config.ReplicationFactor = nClusters - 1 + c.config.ReplicationFactorMin = nClusters - 1 + c.config.ReplicationFactorMax = nClusters - 1 } j := rand.Intn(nClusters) @@ -959,7 +962,7 @@ func TestClustersReplicationNotEnoughPeers(t *testing.T) { if err == nil { t.Fatal("expected an error") } - if !strings.Contains(err.Error(), "not enough candidates") { + if !strings.Contains(err.Error(), "not enough peers to allocate") { t.Error("different error than expected") t.Error(err) } @@ -974,7 +977,8 @@ func TestClustersRebalanceOnPeerDown(t *testing.T) { clusters, mock := createClusters(t) defer shutdownClusters(t, clusters, mock) for _, c := range clusters { - c.config.ReplicationFactor = nClusters - 1 + c.config.ReplicationFactorMin = nClusters - 1 + c.config.ReplicationFactorMax = nClusters - 1 } // pin something diff --git a/peer_manager_test.go b/peer_manager_test.go index cb8d90b9..30bd123d 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -292,7 +292,8 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) { // Adjust the replication factor for re-allocation for _, c := range clusters { - c.config.ReplicationFactor = nClusters - 1 + c.config.ReplicationFactorMin = nClusters - 1 + c.config.ReplicationFactorMax = nClusters - 1 } // We choose to remove the leader, to make things even more interesting diff --git a/pintracker/maptracker/maptracker.go b/pintracker/maptracker/maptracker.go index 84800b18..e40d9d06 100644 --- a/pintracker/maptracker/maptracker.go +++ b/pintracker/maptracker/maptracker.go @@ -181,7 +181,7 @@ func (mpt *MapPinTracker) unsafeSetError(c *cid.Cid, err error) { } func (mpt *MapPinTracker) isRemote(c api.Pin) bool { - if c.ReplicationFactor < 0 { + if c.ReplicationFactorMin < 0 { return false } diff --git a/pintracker/maptracker/maptracker_test.go 
b/pintracker/maptracker/maptracker_test.go index 1f4ac7a7..07ae7ef8 100644 --- a/pintracker/maptracker/maptracker_test.go +++ b/pintracker/maptracker/maptracker_test.go @@ -44,9 +44,10 @@ func TestTrack(t *testing.T) { // Let's tart with a local pin c := api.Pin{ - Cid: h, - Allocations: []peer.ID{}, - ReplicationFactor: -1, + Cid: h, + Allocations: []peer.ID{}, + ReplicationFactorMin: -1, + ReplicationFactorMax: -1, } err := mpt.Track(c) @@ -63,9 +64,10 @@ func TestTrack(t *testing.T) { // Unpin and set remote c = api.Pin{ - Cid: h, - Allocations: []peer.ID{test.TestPeerID2}, - ReplicationFactor: 1, + Cid: h, + Allocations: []peer.ID{test.TestPeerID2}, + ReplicationFactorMin: 1, + ReplicationFactorMax: 1, } err = mpt.Track(c) if err != nil { @@ -89,9 +91,10 @@ func TestUntrack(t *testing.T) { // LocalPin c := api.Pin{ - Cid: h1, - Allocations: []peer.ID{}, - ReplicationFactor: -1, + Cid: h1, + Allocations: []peer.ID{}, + ReplicationFactorMin: -1, + ReplicationFactorMax: -1, } err := mpt.Track(c) @@ -101,9 +104,10 @@ func TestUntrack(t *testing.T) { // Remote pin c = api.Pin{ - Cid: h2, - Allocations: []peer.ID{test.TestPeerID2}, - ReplicationFactor: 1, + Cid: h2, + Allocations: []peer.ID{test.TestPeerID2}, + ReplicationFactorMin: 1, + ReplicationFactorMax: 1, } err = mpt.Track(c) if err != nil { @@ -146,9 +150,19 @@ func TestStatusAll(t *testing.T) { h2, _ := cid.Decode(test.TestCid2) // LocalPin - c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1} + c := api.Pin{ + Cid: h1, + Allocations: []peer.ID{}, + ReplicationFactorMin: -1, + ReplicationFactorMax: -1, + } mpt.Track(c) - c = api.Pin{Cid: h2, Allocations: []peer.ID{test.TestPeerID2}, ReplicationFactor: 1} + c = api.Pin{ + Cid: h2, + Allocations: []peer.ID{}, + ReplicationFactorMin: 1, + ReplicationFactorMax: 1, + } mpt.Track(c) time.Sleep(100 * time.Millisecond) @@ -176,9 +190,19 @@ func TestSyncAndRecover(t *testing.T) { h1, _ := cid.Decode(test.TestCid1) h2, _ := cid.Decode(test.TestCid2) - c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1} + c := api.Pin{ + Cid: h1, + Allocations: []peer.ID{}, + ReplicationFactorMin: -1, + ReplicationFactorMax: -1, + } mpt.Track(c) - c = api.Pin{Cid: h2, Allocations: []peer.ID{}, ReplicationFactor: -1} + c = api.Pin{ + Cid: h2, + Allocations: []peer.ID{}, + ReplicationFactorMin: -1, + ReplicationFactorMax: -1, + } mpt.Track(c) time.Sleep(100 * time.Millisecond) @@ -231,7 +255,12 @@ func TestRecoverAll(t *testing.T) { h1, _ := cid.Decode(test.TestCid1) - c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1} + c := api.Pin{ + Cid: h1, + Allocations: []peer.ID{}, + ReplicationFactorMin: -1, + ReplicationFactorMax: -1, + } mpt.Track(c) time.Sleep(100 * time.Millisecond) mpt.set(h1, api.TrackerStatusPinError) @@ -264,9 +293,20 @@ func TestSyncAll(t *testing.T) { h1, _ := cid.Decode(test.TestCid1) h2, _ := cid.Decode(test.TestCid2) - c := api.Pin{Cid: h1, Allocations: []peer.ID{}, ReplicationFactor: -1} + c := api.Pin{ + Cid: h1, + Allocations: []peer.ID{}, + ReplicationFactorMin: -1, + ReplicationFactorMax: -1, + } + mpt.Track(c) - c = api.Pin{Cid: h2, Allocations: []peer.ID{}, ReplicationFactor: -1} + c = api.Pin{ + Cid: h2, + Allocations: []peer.ID{}, + ReplicationFactorMin: -1, + ReplicationFactorMax: -1, + } mpt.Track(c) time.Sleep(100 * time.Millisecond) diff --git a/state/mapstate/map_state.go b/state/mapstate/map_state.go index fac8c429..12a8eb86 100644 --- a/state/mapstate/map_state.go +++ b/state/mapstate/map_state.go @@ -17,7 +17,7 @@ 
import ( // Version is the map state Version. States with old versions should // perform an upgrade before. -const Version = 2 +const Version = 3 var logger = logging.Logger("mapstate") diff --git a/state/mapstate/map_state_test.go b/state/mapstate/map_state_test.go index 6888e915..98e3db22 100644 --- a/state/mapstate/map_state_test.go +++ b/state/mapstate/map_state_test.go @@ -16,9 +16,10 @@ var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") var testPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") var c = api.Pin{ - Cid: testCid1, - Allocations: []peer.ID{testPeerID1}, - ReplicationFactor: -1, + Cid: testCid1, + Allocations: []peer.ID{testPeerID1}, + ReplicationFactorMax: -1, + ReplicationFactorMin: -1, } func TestAdd(t *testing.T) { @@ -49,7 +50,8 @@ func TestGet(t *testing.T) { get := ms.Get(c.Cid) if get.Cid.String() != c.Cid.String() || get.Allocations[0] != c.Allocations[0] || - get.ReplicationFactor != c.ReplicationFactor { + get.ReplicationFactorMax != c.ReplicationFactorMax || + get.ReplicationFactorMin != c.ReplicationFactorMin { t.Error("returned something different") } } @@ -65,7 +67,8 @@ func TestList(t *testing.T) { list := ms.List() if list[0].Cid.String() != c.Cid.String() || list[0].Allocations[0] != c.Allocations[0] || - list[0].ReplicationFactor != c.ReplicationFactor { + list[0].ReplicationFactorMax != c.ReplicationFactorMax || + list[0].ReplicationFactorMin != c.ReplicationFactorMin { t.Error("returned something different") } } @@ -123,7 +126,7 @@ func TestMigrateFromV1(t *testing.T) { t.Error(err) } get := ms.Get(c.Cid) - if get.ReplicationFactor != -1 || !get.Cid.Equals(c.Cid) { + if get.ReplicationFactorMax != -1 || get.ReplicationFactorMin != -1 || !get.Cid.Equals(c.Cid) { t.Error("expected something different") t.Logf("%+v", get) } diff --git a/state/mapstate/migrate.go b/state/mapstate/migrate.go index 972bdc39..13679465 100644 --- a/state/mapstate/migrate.go +++ b/state/mapstate/migrate.go @@ -36,9 +36,9 @@ func (st *mapStateV1) unmarshal(bs []byte) error { // Migrate from v1 to v2 func (st *mapStateV1) next() migrateable { var mst2 mapStateV2 - mst2.PinMap = make(map[string]api.PinSerial) + mst2.PinMap = make(map[string]pinSerialV2) for k := range st.PinMap { - mst2.PinMap[k] = api.PinSerial{ + mst2.PinMap[k] = pinSerialV2{ Cid: k, Allocations: []string{}, ReplicationFactor: -1, @@ -47,8 +47,15 @@ func (st *mapStateV1) next() migrateable { return &mst2 } +type pinSerialV2 struct { + Cid string `json:"cid"` + Name string `json:"name"` + Allocations []string `json:"allocations"` + ReplicationFactor int `json:"replication_factor"` +} + type mapStateV2 struct { - PinMap map[string]api.PinSerial + PinMap map[string]pinSerialV2 Version int } @@ -58,14 +65,39 @@ func (st *mapStateV2) unmarshal(bs []byte) error { return dec.Decode(st) } -// No migration possible, v2 is the latest state func (st *mapStateV2) next() migrateable { + var mst3 mapStateV3 + mst3.PinMap = make(map[string]api.PinSerial) + for k, v := range st.PinMap { + mst3.PinMap[k] = api.PinSerial{ + Cid: v.Cid, + Name: v.Name, + Allocations: v.Allocations, + ReplicationFactorMin: v.ReplicationFactor, + ReplicationFactorMax: v.ReplicationFactor, + } + } + return &mst3 +} + +type mapStateV3 struct { + PinMap map[string]api.PinSerial + Version int +} + +func (st *mapStateV3) unmarshal(bs []byte) error { + buf := bytes.NewBuffer(bs) + dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf) + return dec.Decode(st) +} + +func (st 
*mapStateV3) next() migrateable { return nil } -func finalCopy(st *MapState, internal *mapStateV2) { - for k := range internal.PinMap { - st.PinMap[k] = internal.PinMap[k] +func finalCopy(st *MapState, internal *mapStateV3) { + for k, v := range internal.PinMap { + st.PinMap[k] = v } } @@ -75,8 +107,9 @@ func (st *MapState) migrateFrom(version int, snap []byte) error { case 1: var mst1 mapStateV1 m = &mst1 - break - + case 2: + var mst2 mapStateV2 + m = &mst2 default: return errors.New("version migration not supported") } @@ -89,11 +122,11 @@ func (st *MapState) migrateFrom(version int, snap []byte) error { for { next = m.next() if next == nil { - mst2, ok := m.(*mapStateV2) + mst3, ok := m.(*mapStateV3) if !ok { return errors.New("migration ended prematurely") } - finalCopy(st, mst2) + finalCopy(st, mst3) return nil } m = next diff --git a/util.go b/util.go index 16fcca2e..a7cc76ff 100644 --- a/util.go +++ b/util.go @@ -167,3 +167,10 @@ func containsPeer(list []peer.ID, peer peer.ID) bool { } return false } + +func minInt(x, y int) int { + if x < y { + return x + } + return y +}
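
The core of this change is the threshold arithmetic in obtainAllocations: with needed = rplMin - currentValid and wanted = rplMax - currentValid, over-pinned items are trimmed, items sitting between the two factors are left alone, an error is raised when even the minimum cannot be met, and otherwise new peers are allocated up to the maximum. The following self-contained Go sketch (a hypothetical helper, not code from this patch) walks through that decision table under the same assumption the patch enforces, namely 0 < rplMin <= rplMax:

```go
package main

import "fmt"

// allocationDecision is a standalone illustration (not the patch's code) of
// the thresholds used by obtainAllocations.
//   current    - peers with valid metrics that already pin the CID
//   candidates - other peers with valid metrics that could pin it
// Assumes 0 < rplMin <= rplMax, which pin() and Validate() guarantee.
func allocationDecision(current, candidates, rplMin, rplMax int) string {
	needed := rplMin - current // below the minimum we must add peers
	wanted := rplMax - current // above the maximum we are over-pinning

	switch {
	case wanted <= 0:
		// Over the maximum threshold: drop surplus allocations.
		return fmt.Sprintf("over-pinned: drop %d allocation(s)", -wanted)
	case needed <= 0:
		// Between min and max: no re-pinning is triggered (nil, nil).
		return "within thresholds: keep current allocations"
	case candidates < needed:
		// Not even the minimum can be satisfied: error out.
		return fmt.Sprintf("error: need %d more peer(s), only %d candidate(s)", needed, candidates)
	default:
		// Allocate as many candidates as possible, capped at the maximum.
		extra := wanted
		if candidates < extra {
			extra = candidates
		}
		return fmt.Sprintf("allocate %d new peer(s)", extra)
	}
}

func main() {
	fmt.Println(allocationDecision(1, 2, 2, 3)) // allocate 2 new peer(s)
	fmt.Println(allocationDecision(3, 5, 1, 2)) // over-pinned: drop 1 allocation(s)
	fmt.Println(allocationDecision(2, 0, 2, 3)) // within thresholds: keep current allocations
}
```

The "within thresholds" branch is what lets pin() log "Skipping repinning" and return without touching the shared state, which is how the PR avoids re-pinning while an item still sits between the two factors.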
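
The configuration side adds two rules: both factors must be set, and -1 ("pin everywhere") is only valid when used for both, with min never exceeding max; the old single replication_factor key, when present, is mapped onto both values. The sketch below mirrors those rules as I read them from isReplicationFactorValid and the loaders (illustrative helpers only; the rule ordering and error messages differ from the patch):

```go
package main

import (
	"errors"
	"fmt"
)

// checkReplicationFactors mirrors the rules enforced by the patch's
// isReplicationFactorValid: both values set, -1 only valid for both at once,
// and the minimum never larger than the maximum.
func checkReplicationFactors(rplMin, rplMax int) error {
	switch {
	case rplMin == 0 || rplMax == 0:
		return errors.New("replication_factor_min and replication_factor_max must be set")
	case rplMin < -1 || rplMax < -1:
		return errors.New("replication factors must be -1 or positive")
	case (rplMin == -1) != (rplMax == -1):
		return errors.New("replication factors must both be -1 when one of them is")
	case rplMin > rplMax:
		return errors.New("replication_factor_min is larger than replication_factor_max")
	default:
		return nil
	}
}

// fromLegacy shows roughly how the loaders treat the old single
// replication_factor key: a non-zero legacy value becomes both the minimum
// and the maximum, otherwise the defaults stand (illustrative helper).
func fromLegacy(legacy, defaultMin, defaultMax int) (rplMin, rplMax int) {
	if legacy != 0 {
		return legacy, legacy
	}
	return defaultMin, defaultMax
}

func main() {
	fmt.Println(checkReplicationFactors(2, 3))   // <nil>
	fmt.Println(checkReplicationFactors(-1, -1)) // <nil> (pin everywhere)
	fmt.Println(checkReplicationFactors(-1, 3))  // rejected: -1 must apply to both
	fmt.Println(checkReplicationFactors(5, 4))   // rejected: min larger than max
	fmt.Println(fromLegacy(3, -1, -1))           // 3 3
	fmt.Println(fromLegacy(0, -1, -1))           // -1 -1
}
```

Both configJSON and PinSerial keep the legacy replication_factor field (marked omitempty), so existing configurations and serialized pins keep loading after the upgrade.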
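
Because the state version bumps from 2 to 3, migrate.go now chains the upgrades v1 → v2 → v3: each version's next() hands over the following format until nil signals the latest one, and v2's single ReplicationFactor is mapped onto both the new minimum and maximum. A minimal sketch of that pattern, using illustrative stand-in types rather than the real mapstate structs:

```go
package main

import "fmt"

// migrateable captures the chaining idea behind state/mapstate/migrate.go:
// every legacy state knows how to produce the next version, returning nil
// once it is the latest.
type migrateable interface {
	next() migrateable
}

type stateV1 struct{}
type stateV2 struct{ repl int }
type stateV3 struct{ min, max int }

// v1 carried no replication factor at all: it becomes "pin everywhere" (-1).
func (s *stateV1) next() migrateable { return &stateV2{repl: -1} }

// v2's single factor becomes both the minimum and the maximum in v3.
func (s *stateV2) next() migrateable { return &stateV3{min: s.repl, max: s.repl} }

// v3 is the latest version, so the chain stops here.
func (s *stateV3) next() migrateable { return nil }

// migrate walks the chain from whatever version was snapshotted.
func migrate(m migrateable) (*stateV3, error) {
	for {
		n := m.next()
		if n == nil {
			latest, ok := m.(*stateV3)
			if !ok {
				return nil, fmt.Errorf("migration ended prematurely at %T", m)
			}
			return latest, nil
		}
		m = n
	}
}

func main() {
	v3, err := migrate(&stateV1{})
	fmt.Println(v3, err) // &{-1 -1} <nil>

	v3, err = migrate(&stateV2{repl: 3})
	fmt.Println(v3, err) // &{3 3} <nil>
}
```

A hypothetical future v4 would follow the same pattern: a new versioned struct with unmarshal and next methods, plus updates to finalCopy and the migrateFrom switch.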