From 99eb29a7d62485d39d1d664a585aa5b51e4c6445 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 20 Apr 2020 15:26:03 +0200 Subject: [PATCH] cluster: do not allow to repin recursive pins as something else. i.e. a direct pin can be repinned as recursive, but a recursive pin cannot be pinned as direct (this fails in IPFS too). Additionally, save a couple of calls to the datastore by obtaining the existing pin only once. --- allocate.go | 5 ++--- cluster.go | 29 ++++++++++++++++++++--------- rpc_api.go | 9 ++++++++- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/allocate.go b/allocate.go index c3eea302..f4680b88 100644 --- a/allocate.go +++ b/allocate.go @@ -47,7 +47,7 @@ import ( // into account if the given CID was previously in a "pin everywhere" mode, // and will consider such Pins as currently unallocated ones, providing // new allocations as available. -func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) { +func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin *api.Pin, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) { ctx, span := trace.StartSpan(ctx, "cluster/allocate") defer span.End() @@ -61,8 +61,7 @@ func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, rplMin, rplMax int // Figure out who is holding the CID var currentAllocs []peer.ID - currentPin, err := c.PinGet(ctx, hash) - if err == nil { + if currentPin != nil { currentAllocs = currentPin.Allocations } metrics := c.monitor.LatestMetrics(ctx, c.informers[0].Name()) diff --git a/cluster.go b/cluster.go index 19a732bd..127db27d 100644 --- a/cluster.go +++ b/cluster.go @@ -1308,8 +1308,8 @@ func checkPinType(pin *api.Pin) error { // setupPin ensures that the Pin object is fit for pinning. We check // and set the replication factors and ensure that the pinType matches the -// metadata consistently. -func (c *Cluster) setupPin(ctx context.Context, pin *api.Pin) error { +// metadata consistently. Returns the existing Pin object for this Cid if it exists. +func (c *Cluster) setupPin(ctx context.Context, pin, existing *api.Pin) error { ctx, span := trace.StartSpan(ctx, "cluster/setupPin") defer span.End() @@ -1322,18 +1322,24 @@ func (c *Cluster) setupPin(ctx context.Context, pin *api.Pin) error { return errors.New("pin.ExpireAt set before current time") } - existing, err := c.PinGet(ctx, pin.Cid) - if err != nil && err != state.ErrNotFound { - return err + if existing == nil { + return nil } - if existing != nil && existing.Type != pin.Type { + // If an pin CID is already pin, we do a couple more checks + if existing.Type != pin.Type { msg := "cannot repin CID with different tracking method, " msg += "clear state with pin rm to proceed. " msg += "New: %s. Was: %s" return fmt.Errorf(msg, pin.Type, existing.Type) } + if existing.Mode == api.PinModeRecursive && pin.Mode != api.PinModeRecursive { + msg := "cannot repin a CID which is already pinned in " + msg += "recursive mode (new pin is pinned as %s). Unpin it first." + return fmt.Errorf(msg, pin.Mode) + } + return checkPinType(pin) } @@ -1365,8 +1371,13 @@ func (c *Cluster) pin( return pin, true, err } + existing, err := c.PinGet(ctx, pin.Cid) + if err != nil && err != state.ErrNotFound { + return pin, false, err + } + // setup pin might produce some side-effects to our pin - err := c.setupPin(ctx, pin) + err = c.setupPin(ctx, pin, existing) if err != nil { return pin, false, err } @@ -1379,8 +1390,7 @@ func (c *Cluster) pin( // pins to the consensus layer even if they are, this should trigger the // pin tracker and allows users to get re-pin operations by re-adding // without having to use recover, which is naturally expected. - existing, err := c.PinGet(ctx, pin.Cid) - if err == nil && + if existing != nil && pin.PinOptions.Equals(&existing.PinOptions) && len(blacklist) == 0 { pin = existing @@ -1396,6 +1406,7 @@ func (c *Cluster) pin( allocs, err := c.allocate( ctx, pin.Cid, + existing, pin.ReplicationFactorMin, pin.ReplicationFactorMax, blacklist, diff --git a/rpc_api.go b/rpc_api.go index 9223f393..ea3cea30 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -4,6 +4,7 @@ import ( "context" "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/version" cid "github.com/ipfs/go-cid" @@ -342,7 +343,12 @@ func (rpcapi *ClusterRPCAPI) BlockAllocate(ctx context.Context, in *api.Pin, out return errFollowerMode } - err := rpcapi.c.setupPin(ctx, in) + existing, err := rpcapi.c.PinGet(ctx, in.Cid) + if err != nil && err != state.ErrNotFound { + return err + } + + err = rpcapi.c.setupPin(ctx, in, existing) if err != nil { return err } @@ -364,6 +370,7 @@ func (rpcapi *ClusterRPCAPI) BlockAllocate(ctx context.Context, in *api.Pin, out allocs, err := rpcapi.c.allocate( ctx, in.Cid, + existing, in.ReplicationFactorMin, in.ReplicationFactorMax, []peer.ID{}, // blacklist