Support replication factor as a pin parameter

This adds a replication_factor query argument to the API
endpoint, which makes it possible to set a replication factor per Pin.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2017-03-08 18:28:43 +01:00
parent 9b652bcfb3
commit 01d65a1595
13 changed files with 89 additions and 78 deletions
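
For reference, a minimal client-side sketch of the new query argument. It assumes the REST API pin endpoint is reachable as POST /pins/{cid} on the default listen address (http://127.0.0.1:9094); the helper name and URL are illustrative, not part of this commit. A replication_factor of -1 requests pinning everywhere, and omitting it (or sending 0) falls back to the cluster-wide default:

package main

import (
	"fmt"
	"net/http"
)

// pinWithReplication pins cidStr through the cluster REST API, passing the
// replication_factor query argument introduced by this commit.
func pinWithReplication(baseURL, cidStr string, factor int) error {
	u := fmt.Sprintf("%s/pins/%s?replication_factor=%d", baseURL, cidStr, factor)
	resp, err := http.Post(u, "application/json", nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return nil
}

func main() {
	err := pinWithReplication("http://127.0.0.1:9094",
		"QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq", 2)
	fmt.Println(err)
}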

View File

@@ -337,7 +337,6 @@ func (addrsS MultiaddrsSerial) ToMultiaddrs() []ma.Multiaddr {
 type Pin struct {
 	Cid               *cid.Cid
 	Allocations       []peer.ID
-	Everywhere        bool
 	ReplicationFactor int
 }
@@ -350,9 +349,10 @@ func PinCid(c *cid.Cid) Pin {
 // PinSerial is a serializable version of Pin
 type PinSerial struct {
-	Cid         string   `json:"cid"`
-	Allocations []string `json:"allocations"`
-	Everywhere  bool     `json:"everywhere"`
+	Cid               string   `json:"cid"`
+	Allocations       []string `json:"allocations"`
+	Everywhere        bool     `json:"everywhere,omitempty"` // legacy
+	ReplicationFactor int      `json:"replication_factor"`
 }
 
 // ToSerial converts a Pin to PinSerial.
@@ -364,9 +364,9 @@ func (pin Pin) ToSerial() PinSerial {
 	}
 	return PinSerial{
-		Cid:         pin.Cid.String(),
-		Allocations: allocs,
-		Everywhere:  pin.Everywhere,
+		Cid:               pin.Cid.String(),
+		Allocations:       allocs,
+		ReplicationFactor: pin.ReplicationFactor,
 	}
 }
@@ -378,10 +378,16 @@ func (pins PinSerial) ToPin() Pin {
 	for i, p := range pins.Allocations {
 		allocs[i], _ = peer.IDB58Decode(p)
 	}
+
+	// legacy format management
+	if pins.ReplicationFactor == 0 && pins.Everywhere {
+		pins.ReplicationFactor = -1
+	}
+
 	return Pin{
-		Cid:         c,
-		Allocations: allocs,
-		Everywhere:  pins.Everywhere,
+		Cid:               c,
+		Allocations:       allocs,
+		ReplicationFactor: pins.ReplicationFactor,
 	}
 }
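
The legacy branch in ToPin() above keeps old peers compatible: a PinSerial that still carries everywhere=true and no replication_factor decodes to a replication factor of -1. A small illustrative test, not part of this commit, written against the testCid1 helper used in the api package tests below, sketches that behaviour:

func TestLegacyEverywhereToPin(t *testing.T) {
	// A serialized pin from an older peer: Everywhere set, no ReplicationFactor.
	legacy := PinSerial{
		Cid:        testCid1.String(),
		Everywhere: true,
	}
	pin := legacy.ToPin()
	if pin.ReplicationFactor != -1 {
		t.Errorf("expected replication factor -1, got %d", pin.ReplicationFactor)
	}
}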

View File

@@ -141,15 +141,15 @@ func TestPinConv(t *testing.T) {
 	}()
 
 	c := Pin{
-		Cid:         testCid1,
-		Allocations: []peer.ID{testPeerID1},
-		Everywhere:  true,
+		Cid:               testCid1,
+		Allocations:       []peer.ID{testPeerID1},
+		ReplicationFactor: -1,
 	}
 
 	newc := c.ToSerial().ToPin()
 	if c.Cid.String() != newc.Cid.String() ||
 		c.Allocations[0] != newc.Allocations[0] ||
-		c.Everywhere != newc.Everywhere {
+		c.ReplicationFactor != newc.ReplicationFactor {
 		t.Error("mismatch")
 	}
 }

View File

@@ -262,11 +262,6 @@ func (c *Cluster) alertsHandler() {
 		case <-c.ctx.Done():
 			return
 		case alrt := <-c.monitor.Alerts():
-			// no point in repinning when pinning everywhere
-			if c.config.ReplicationFactor == -1 {
-				break
-			}
 			leader, _ := c.consensus.Leader()
 			// discard while not leaders as our monitor is not
 			// getting metrics in that case
@@ -289,11 +284,11 @@ func (c *Cluster) repinFromPeer(p peer.ID) {
 		return
 	}
 	list := cState.List()
-	for _, cidInfo := range list {
-		for _, alloc := range cidInfo.Allocations {
-			if alloc == p {
-				logger.Infof("repinning %s out of %s", cidInfo.Cid, p.Pretty())
-				c.Pin(cidInfo.Cid)
+	for _, pin := range list {
+		for _, alloc := range pin.Allocations {
+			if alloc == p { // found pin allocated to node
+				logger.Infof("repinning %s out of %s", pin.Cid, p.Pretty())
+				c.Pin(pin)
 			}
 		}
 	}
@@ -724,26 +719,24 @@ func (c *Cluster) Pins() []api.Pin {
 // Pin returns an error if the operation could not be persisted
 // to the global state. Pin does not reflect the success or failure
 // of underlying IPFS daemon pinning operations.
-func (c *Cluster) Pin(h *cid.Cid) error {
-	pin := api.Pin{
-		Cid: h,
+func (c *Cluster) Pin(pin api.Pin) error {
+	rpl := pin.ReplicationFactor
+	if rpl == 0 {
+		rpl = c.config.ReplicationFactor
+		pin.ReplicationFactor = rpl
 	}
-	rpl := c.config.ReplicationFactor
 	switch {
 	case rpl == 0:
 		return errors.New("replication factor is 0")
 	case rpl < 0:
-		pin.Everywhere = true
-		logger.Infof("IPFS cluster pinning %s everywhere:", h)
+		logger.Infof("IPFS cluster pinning %s everywhere:", pin.Cid)
 	case rpl > 0:
-		allocs, err := c.allocate(h)
+		allocs, err := c.allocate(pin.Cid, pin.ReplicationFactor)
 		if err != nil {
 			return err
 		}
 		pin.Allocations = allocs
-		logger.Infof("IPFS cluster pinning %s on %s:", h, pin.Allocations)
+		logger.Infof("IPFS cluster pinning %s on %s:", pin.Cid, pin.Allocations)
 	}
@@ -969,8 +962,8 @@ func (c *Cluster) getIDForPeer(pid peer.ID) (api.ID, error) {
 // allocate finds peers to allocate a hash using the informer and the monitor
 // it should only be used with a positive replication factor
-func (c *Cluster) allocate(hash *cid.Cid) ([]peer.ID, error) {
-	if c.config.ReplicationFactor <= 0 {
+func (c *Cluster) allocate(hash *cid.Cid, repl int) ([]peer.ID, error) {
+	if repl <= 0 {
 		return nil, errors.New("cannot decide allocation for replication factor <= 0")
 	}
@@ -1040,7 +1033,7 @@ func (c *Cluster) allocate(hash *cid.Cid) ([]peer.ID, error) {
 	// how many allocations do we need (note we will re-allocate if we did
 	// not receive good metrics for currently allocated peeers)
-	needed := c.config.ReplicationFactor - len(currentlyAllocatedPeersMetrics)
+	needed := repl - len(currentlyAllocatedPeersMetrics)
 
 	// if we are already good (note invalid metrics would trigger
 	// re-allocations as they are not included in currentAllocMetrics)
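
With the signature change above, callers pass an api.Pin instead of a bare cid, so a per-pin factor travels with the request. A hedged helper inside the ipfscluster package could look like this (pinWithFactor is a hypothetical name used only for illustration):

// pinWithFactor pins h with an explicit per-pin replication factor.
// 0 keeps the cluster-wide default (see Pin above), -1 pins on every peer.
func pinWithFactor(c *Cluster, h *cid.Cid, factor int) error {
	pin := api.PinCid(h)
	pin.ReplicationFactor = factor
	return c.Pin(pin)
}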

View File

@@ -124,7 +124,7 @@ func TestClusterStateSync(t *testing.T) {
 	}
 
 	c, _ := cid.Decode(test.TestCid1)
-	err = cl.Pin(c)
+	err = cl.Pin(api.PinCid(c))
 	if err != nil {
 		t.Fatal("pin should have worked:", err)
 	}
@@ -168,14 +168,14 @@ func TestClusterPin(t *testing.T) {
 	defer cl.Shutdown()
 
 	c, _ := cid.Decode(test.TestCid1)
-	err := cl.Pin(c)
+	err := cl.Pin(api.PinCid(c))
 	if err != nil {
 		t.Fatal("pin should have worked:", err)
 	}
 
 	// test an error case
 	cl.consensus.Shutdown()
-	err = cl.Pin(c)
+	err = cl.Pin(api.PinCid(c))
 	if err == nil {
 		t.Error("expected an error but things worked")
 	}

View File

@@ -59,7 +59,7 @@ func TestConsensusPin(t *testing.T) {
 	defer cc.Shutdown()
 
 	c, _ := cid.Decode(test.TestCid1)
-	err := cc.LogPin(api.Pin{Cid: c, Everywhere: true})
+	err := cc.LogPin(api.Pin{Cid: c, ReplicationFactor: -1})
 	if err != nil {
 		t.Error("the operation did not make it to the log:", err)
 	}

View File

@@ -107,7 +107,7 @@ func textFormatPrintVersion(obj *api.Version) {
 func textFormatPrintPin(obj *api.PinSerial) {
 	fmt.Printf("%s | Allocations: ", obj.Cid)
-	if obj.Everywhere {
+	if obj.ReplicationFactor < 0 {
 		fmt.Printf("[everywhere]\n")
 	} else {
 		fmt.Printf("%s\n", obj.Allocations)

View File

@@ -267,12 +267,12 @@ func TestClustersPin(t *testing.T) {
 		j := rand.Intn(nClusters)           // choose a random cluster peer
 		h, err := prefix.Sum(randomBytes()) // create random cid
 		checkErr(t, err)
-		err = clusters[j].Pin(h)
+		err = clusters[j].Pin(api.PinCid(h))
 		if err != nil {
 			t.Errorf("error pinning %s: %s", h, err)
 		}
 		// Test re-pin
-		err = clusters[j].Pin(h)
+		err = clusters[j].Pin(api.PinCid(h))
 		if err != nil {
 			t.Errorf("error repinning %s: %s", h, err)
 		}
@@ -325,7 +325,7 @@ func TestClustersStatusAll(t *testing.T) {
 	clusters, mock := createClusters(t)
 	defer shutdownClusters(t, clusters, mock)
 	h, _ := cid.Decode(test.TestCid1)
-	clusters[0].Pin(h)
+	clusters[0].Pin(api.PinCid(h))
 	delay()
 	// Global status
 	f := func(t *testing.T, c *Cluster) {
@@ -370,8 +370,8 @@ func TestClustersSyncAllLocal(t *testing.T) {
 	defer shutdownClusters(t, clusters, mock)
 	h, _ := cid.Decode(test.ErrorCid) // This cid always fails
 	h2, _ := cid.Decode(test.TestCid2)
-	clusters[0].Pin(h)
-	clusters[0].Pin(h2)
+	clusters[0].Pin(api.PinCid(h))
+	clusters[0].Pin(api.PinCid(h2))
 	delay()
 	f := func(t *testing.T, c *Cluster) {
 		// Sync bad ID
@@ -398,8 +398,8 @@ func TestClustersSyncLocal(t *testing.T) {
 	defer shutdownClusters(t, clusters, mock)
 	h, _ := cid.Decode(test.ErrorCid) // This cid always fails
 	h2, _ := cid.Decode(test.TestCid2)
-	clusters[0].Pin(h)
-	clusters[0].Pin(h2)
+	clusters[0].Pin(api.PinCid(h))
+	clusters[0].Pin(api.PinCid(h2))
 	delay()
 
 	f := func(t *testing.T, c *Cluster) {
@@ -429,8 +429,8 @@ func TestClustersSyncAll(t *testing.T) {
 	defer shutdownClusters(t, clusters, mock)
 	h, _ := cid.Decode(test.ErrorCid) // This cid always fails
 	h2, _ := cid.Decode(test.TestCid2)
-	clusters[0].Pin(h)
-	clusters[0].Pin(h2)
+	clusters[0].Pin(api.PinCid(h))
+	clusters[0].Pin(api.PinCid(h2))
 	delay()
 
 	j := rand.Intn(nClusters) // choose a random cluster peer
@@ -460,8 +460,8 @@ func TestClustersSync(t *testing.T) {
 	defer shutdownClusters(t, clusters, mock)
 	h, _ := cid.Decode(test.ErrorCid) // This cid always fails
 	h2, _ := cid.Decode(test.TestCid2)
-	clusters[0].Pin(h)
-	clusters[0].Pin(h2)
+	clusters[0].Pin(api.PinCid(h))
+	clusters[0].Pin(api.PinCid(h2))
 	delay()
 
 	j := rand.Intn(nClusters)
@@ -521,8 +521,8 @@ func TestClustersRecoverLocal(t *testing.T) {
 	defer shutdownClusters(t, clusters, mock)
 	h, _ := cid.Decode(test.ErrorCid) // This cid always fails
 	h2, _ := cid.Decode(test.TestCid2)
-	clusters[0].Pin(h)
-	clusters[0].Pin(h2)
+	clusters[0].Pin(api.PinCid(h))
+	clusters[0].Pin(api.PinCid(h2))
 
 	delay()
@@ -553,8 +553,8 @@ func TestClustersRecover(t *testing.T) {
 	defer shutdownClusters(t, clusters, mock)
 	h, _ := cid.Decode(test.ErrorCid) // This cid always fails
 	h2, _ := cid.Decode(test.TestCid2)
-	clusters[0].Pin(h)
-	clusters[0].Pin(h2)
+	clusters[0].Pin(api.PinCid(h))
+	clusters[0].Pin(api.PinCid(h2))
 
 	delay()
@@ -647,7 +647,7 @@ func TestClustersReplication(t *testing.T) {
 		j := rand.Intn(nClusters)           // choose a random cluster peer
 		h, err := prefix.Sum(randomBytes()) // create random cid
 		checkErr(t, err)
-		err = clusters[j].Pin(h)
+		err = clusters[j].Pin(api.PinCid(h))
 		if err != nil {
 			t.Error(err)
 		}
@@ -737,7 +737,7 @@ func TestClustersReplicationRealloc(t *testing.T) {
 	j := rand.Intn(nClusters)
 	h, _ := cid.Decode(test.TestCid1)
-	err := clusters[j].Pin(h)
+	err := clusters[j].Pin(api.PinCid(h))
 	if err != nil {
 		t.Error(err)
 	}
@@ -746,7 +746,7 @@
 	time.Sleep(time.Second / 2)
 
 	// Re-pin should fail as it is allocated already
-	err = clusters[j].Pin(h)
+	err = clusters[j].Pin(api.PinCid(h))
 	if err == nil {
 		t.Fatal("expected an error")
 	}
@@ -767,7 +767,7 @@
 	time.Sleep(2 * time.Second)
 
 	// now pin should succeed
-	err = clusters[j].Pin(h)
+	err = clusters[j].Pin(api.PinCid(h))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -806,7 +806,7 @@ func TestClustersReplicationNotEnoughPeers(t *testing.T) {
 	j := rand.Intn(nClusters)
 	h, _ := cid.Decode(test.TestCid1)
-	err := clusters[j].Pin(h)
+	err := clusters[j].Pin(api.PinCid(h))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -836,7 +836,7 @@ loop:
 		}
 	}
 
-	err = clusters[2].Pin(h)
+	err = clusters[2].Pin(api.PinCid(h))
 	if err == nil {
 		t.Fatal("expected an error")
 	}
@@ -862,7 +862,7 @@ func TestClustersRebalanceOnPeerDown(t *testing.T) {
 	time.Sleep(time.Second)
 
 	// pin something
 	h, _ := cid.Decode(test.TestCid1)
-	clusters[0].Pin(h)
+	clusters[0].Pin(api.PinCid(h))
 	time.Sleep(time.Second)
 	pinLocal := 0
 	pinRemote := 0

View File

@@ -37,7 +37,7 @@ func TestApplyToUnpin(t *testing.T) {
 	st := mapstate.NewMapState()
 	c, _ := cid.Decode(test.TestCid1)
-	st.Add(api.Pin{Cid: c, Everywhere: true})
+	st.Add(api.Pin{Cid: c, ReplicationFactor: -1})
 	op.ApplyTo(st)
 	pins := st.List()
 	if len(pins) != 0 {

View File

@@ -187,7 +187,7 @@ func (mpt *MapPinTracker) unsafeSetError(c *cid.Cid, err error) {
 }
 
 func (mpt *MapPinTracker) isRemote(c api.Pin) bool {
-	if c.Everywhere {
+	if c.ReplicationFactor < 0 {
 		return false
 	}

View File

@@ -6,6 +6,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/ipfs/ipfs-cluster/api"
 	"github.com/ipfs/ipfs-cluster/test"
 
 	cid "github.com/ipfs/go-cid"
@@ -56,7 +57,7 @@ func TestClustersPeerAdd(t *testing.T) {
 	}
 
 	h, _ := cid.Decode(test.TestCid1)
-	err := clusters[1].Pin(h)
+	err := clusters[1].Pin(api.PinCid(h))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -220,7 +221,7 @@ func TestClustersPeerJoin(t *testing.T) {
 		}
 	}
 	hash, _ := cid.Decode(test.TestCid1)
-	clusters[0].Pin(hash)
+	clusters[0].Pin(api.PinCid(hash))
 	delay()
 
 	f := func(t *testing.T, c *Cluster) {
@@ -253,7 +254,7 @@ func TestClustersPeerJoinAllAtOnce(t *testing.T) {
 	runF(t, clusters[1:], f)
 
 	hash, _ := cid.Decode(test.TestCid1)
-	clusters[0].Pin(hash)
+	clusters[0].Pin(api.PinCid(hash))
 	delay()
 
 	f2 := func(t *testing.T, c *Cluster) {
@@ -295,7 +296,7 @@ func TestClustersPeerJoinAllAtOnceWithRandomBootstrap(t *testing.T) {
 	runF(t, clusters[2:], f)
 
 	hash, _ := cid.Decode(test.TestCid1)
-	clusters[0].Pin(hash)
+	clusters[0].Pin(api.PinCid(hash))
 	delay()
 
 	f2 := func(t *testing.T, c *Cluster) {

View File

@@ -421,12 +421,24 @@ func (rest *RESTAPI) recoverHandler(w http.ResponseWriter, r *http.Request) {
 func parseCidOrError(w http.ResponseWriter, r *http.Request) api.PinSerial {
 	vars := mux.Vars(r)
 	hash := vars["hash"]
+
 	_, err := cid.Decode(hash)
 	if err != nil {
 		sendErrorResponse(w, 400, "error decoding Cid: "+err.Error())
 		return api.PinSerial{Cid: ""}
 	}
-	return api.PinSerial{Cid: hash}
+
+	pin := api.PinSerial{
+		Cid: hash,
+	}
+
+	queryValues := r.URL.Query()
+	rplStr := queryValues.Get("replication_factor")
+	if rpl, err := strconv.Atoi(rplStr); err == nil {
+		pin.ReplicationFactor = rpl
+	}
+
+	return pin
 }
 
 func parsePidOrError(w http.ResponseWriter, r *http.Request) peer.ID {
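
Note the fallback in the handler above: when replication_factor is absent or not an integer, strconv.Atoi returns an error and ReplicationFactor stays at 0, which Cluster.Pin later replaces with the configured default. A standalone sketch of that parsing decision (names are illustrative):

package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// parseReplicationFactor mirrors the query handling in parseCidOrError:
// missing or malformed values fall back to 0, meaning "use the cluster default".
func parseReplicationFactor(rawQuery string) int {
	values, _ := url.ParseQuery(rawQuery)
	rpl, err := strconv.Atoi(values.Get("replication_factor"))
	if err != nil {
		return 0
	}
	return rpl
}

func main() {
	fmt.Println(parseReplicationFactor("replication_factor=2"))  // 2
	fmt.Println(parseReplicationFactor(""))                      // 0 -> default
	fmt.Println(parseReplicationFactor("replication_factor=-1")) // -1 -> everywhere
}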

View File

@@ -32,8 +32,7 @@ func (rpcapi *RPCAPI) ID(in struct{}, out *api.IDSerial) error {
 
 // Pin runs Cluster.Pin().
 func (rpcapi *RPCAPI) Pin(in api.PinSerial, out *struct{}) error {
-	c := in.ToPin().Cid
-	return rpcapi.c.Pin(c)
+	return rpcapi.c.Pin(in.ToPin())
 }
 
 // Unpin runs Cluster.Unpin().

View File

@@ -13,9 +13,9 @@ var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
 var testPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
 
 var c = api.Pin{
-	Cid:         testCid1,
-	Allocations: []peer.ID{testPeerID1},
-	Everywhere:  false,
+	Cid:               testCid1,
+	Allocations:       []peer.ID{testPeerID1},
+	ReplicationFactor: -1,
 }
 
 func TestAdd(t *testing.T) {
@@ -46,7 +46,7 @@ func TestGet(t *testing.T) {
 	get := ms.Get(c.Cid)
 	if get.Cid.String() != c.Cid.String() ||
 		get.Allocations[0] != c.Allocations[0] ||
-		get.Everywhere != c.Everywhere {
+		get.ReplicationFactor != c.ReplicationFactor {
 		t.Error("returned something different")
 	}
 }
@@ -62,7 +62,7 @@ func TestList(t *testing.T) {
 	list := ms.List()
 	if list[0].Cid.String() != c.Cid.String() ||
 		list[0].Allocations[0] != c.Allocations[0] ||
-		list[0].Everywhere != c.Everywhere {
+		list[0].ReplicationFactor != c.ReplicationFactor {
 		t.Error("returned something different")
 	}
 }