cluster: do not allow to repin recursive pins as something else.
i.e. a direct pin can be repinned as recursive, but a recursive pin cannot be pinned as direct (this fails in IPFS too). Additionally, save a couple of calls to the datastore by obtaining the existing pin only once.
This commit is contained in:
parent
ddea1a995c
commit
99eb29a7d6
|
@ -47,7 +47,7 @@ import (
|
|||
// into account if the given CID was previously in a "pin everywhere" mode,
|
||||
// and will consider such Pins as currently unallocated ones, providing
|
||||
// new allocations as available.
|
||||
func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) {
|
||||
func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin *api.Pin, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "cluster/allocate")
|
||||
defer span.End()
|
||||
|
||||
|
@ -61,8 +61,7 @@ func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, rplMin, rplMax int
|
|||
|
||||
// Figure out who is holding the CID
|
||||
var currentAllocs []peer.ID
|
||||
currentPin, err := c.PinGet(ctx, hash)
|
||||
if err == nil {
|
||||
if currentPin != nil {
|
||||
currentAllocs = currentPin.Allocations
|
||||
}
|
||||
metrics := c.monitor.LatestMetrics(ctx, c.informers[0].Name())
|
||||
|
|
29
cluster.go
29
cluster.go
|
@ -1308,8 +1308,8 @@ func checkPinType(pin *api.Pin) error {
|
|||
|
||||
// setupPin ensures that the Pin object is fit for pinning. We check
|
||||
// and set the replication factors and ensure that the pinType matches the
|
||||
// metadata consistently.
|
||||
func (c *Cluster) setupPin(ctx context.Context, pin *api.Pin) error {
|
||||
// metadata consistently. Returns the existing Pin object for this Cid if it exists.
|
||||
func (c *Cluster) setupPin(ctx context.Context, pin, existing *api.Pin) error {
|
||||
ctx, span := trace.StartSpan(ctx, "cluster/setupPin")
|
||||
defer span.End()
|
||||
|
||||
|
@ -1322,18 +1322,24 @@ func (c *Cluster) setupPin(ctx context.Context, pin *api.Pin) error {
|
|||
return errors.New("pin.ExpireAt set before current time")
|
||||
}
|
||||
|
||||
existing, err := c.PinGet(ctx, pin.Cid)
|
||||
if err != nil && err != state.ErrNotFound {
|
||||
return err
|
||||
if existing == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if existing != nil && existing.Type != pin.Type {
|
||||
// If an pin CID is already pin, we do a couple more checks
|
||||
if existing.Type != pin.Type {
|
||||
msg := "cannot repin CID with different tracking method, "
|
||||
msg += "clear state with pin rm to proceed. "
|
||||
msg += "New: %s. Was: %s"
|
||||
return fmt.Errorf(msg, pin.Type, existing.Type)
|
||||
}
|
||||
|
||||
if existing.Mode == api.PinModeRecursive && pin.Mode != api.PinModeRecursive {
|
||||
msg := "cannot repin a CID which is already pinned in "
|
||||
msg += "recursive mode (new pin is pinned as %s). Unpin it first."
|
||||
return fmt.Errorf(msg, pin.Mode)
|
||||
}
|
||||
|
||||
return checkPinType(pin)
|
||||
}
|
||||
|
||||
|
@ -1365,8 +1371,13 @@ func (c *Cluster) pin(
|
|||
return pin, true, err
|
||||
}
|
||||
|
||||
existing, err := c.PinGet(ctx, pin.Cid)
|
||||
if err != nil && err != state.ErrNotFound {
|
||||
return pin, false, err
|
||||
}
|
||||
|
||||
// setup pin might produce some side-effects to our pin
|
||||
err := c.setupPin(ctx, pin)
|
||||
err = c.setupPin(ctx, pin, existing)
|
||||
if err != nil {
|
||||
return pin, false, err
|
||||
}
|
||||
|
@ -1379,8 +1390,7 @@ func (c *Cluster) pin(
|
|||
// pins to the consensus layer even if they are, this should trigger the
|
||||
// pin tracker and allows users to get re-pin operations by re-adding
|
||||
// without having to use recover, which is naturally expected.
|
||||
existing, err := c.PinGet(ctx, pin.Cid)
|
||||
if err == nil &&
|
||||
if existing != nil &&
|
||||
pin.PinOptions.Equals(&existing.PinOptions) &&
|
||||
len(blacklist) == 0 {
|
||||
pin = existing
|
||||
|
@ -1396,6 +1406,7 @@ func (c *Cluster) pin(
|
|||
allocs, err := c.allocate(
|
||||
ctx,
|
||||
pin.Cid,
|
||||
existing,
|
||||
pin.ReplicationFactorMin,
|
||||
pin.ReplicationFactorMax,
|
||||
blacklist,
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
|
||||
"github.com/ipfs/ipfs-cluster/api"
|
||||
"github.com/ipfs/ipfs-cluster/state"
|
||||
"github.com/ipfs/ipfs-cluster/version"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
|
@ -342,7 +343,12 @@ func (rpcapi *ClusterRPCAPI) BlockAllocate(ctx context.Context, in *api.Pin, out
|
|||
return errFollowerMode
|
||||
}
|
||||
|
||||
err := rpcapi.c.setupPin(ctx, in)
|
||||
existing, err := rpcapi.c.PinGet(ctx, in.Cid)
|
||||
if err != nil && err != state.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
|
||||
err = rpcapi.c.setupPin(ctx, in, existing)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -364,6 +370,7 @@ func (rpcapi *ClusterRPCAPI) BlockAllocate(ctx context.Context, in *api.Pin, out
|
|||
allocs, err := rpcapi.c.allocate(
|
||||
ctx,
|
||||
in.Cid,
|
||||
existing,
|
||||
in.ReplicationFactorMin,
|
||||
in.ReplicationFactorMax,
|
||||
[]peer.ID{}, // blacklist
|
||||
|
|
Loading…
Reference in New Issue
Block a user