Make sure sync and recover operations receive all cids in a clusterDAG.

Clean up some code and FIXMEs.

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
Hector Sanjuan 2018-08-06 14:31:45 +02:00
parent 8f1a15b279
commit c81f61eeea
8 changed files with 126 additions and 93 deletions
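
In short, a condensed sketch of the new flow (identifiers taken from the hunks below, control flow simplified; not a verbatim excerpt): a sharded pin is expanded into every Cid that Cluster handles for it, and local sync and recover walk that whole list instead of touching only the root.

// cidsFromMetaPin(h) yields the shard Cids, then the ClusterDAG, then the
// MetaPin itself, or just [h] for anything that is not a known MetaPin.
// localPinInfoOp applies the tracker operation to each element in that
// order and returns the last PinInfo, i.e. the root's.
func (c *Cluster) SyncLocal(h *cid.Cid) (api.PinInfo, error) {
	return c.localPinInfoOp(h, c.tracker.Sync)
}

func (c *Cluster) RecoverLocal(h *cid.Cid) (api.PinInfo, error) {
	return c.localPinInfoOp(h, c.tracker.Recover)
}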

View File

@ -54,7 +54,7 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I
}
// Figure out who is holding the CID
currentPin, _ := c.getCurrentPin(hash)
currentPin, _ := c.PinGet(hash)
currentAllocs := currentPin.Allocations
metrics := c.monitor.LatestMetrics(c.informer.Name())
@ -96,17 +96,6 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I
return newAllocs, nil
}
// getCurrentPin returns the Pin object for h, if we can find one
// or builds an empty one.
func (c *Cluster) getCurrentPin(h *cid.Cid) (api.Pin, bool) {
st, err := c.consensus.State()
if err != nil {
return api.PinCid(h), false
}
ok := st.Has(h)
return st.Get(h), ok
}
// allocationError logs an allocation error
func allocationError(hash *cid.Cid, needed, wanted int, candidatesValid []peer.ID) error {
logger.Errorf("Not enough candidates to allocate %s:", hash)

View File

@ -10,7 +10,7 @@ import (
// DefaultShardSize is the shard size for params objects created with DefaultParams().
var DefaultShardSize = uint64(100 * 1024 * 1024) // 100 MB
// Params contains all of the configurable parameters needed to specify the
// AddParams contains all of the configurable parameters needed to specify the
// importing process of a file being added to an ipfs-cluster
type AddParams struct {
PinOptions
@ -134,6 +134,7 @@ func (p *AddParams) ToQueryString() string {
return query
}
// Equals checks if p equals p2.
func (p *AddParams) Equals(p2 *AddParams) bool {
return p.ReplicationFactorMin == p2.ReplicationFactorMin &&
p.ReplicationFactorMax == p2.ReplicationFactorMax &&

View File

@ -135,7 +135,8 @@ func (ips IPFSPinStatus) IsPinned(maxDepth int) bool {
return ips == IPFSPinStatusRecursive
case maxDepth == 0:
return ips == IPFSPinStatusDirect
case maxDepth > 0: // FIXME
case maxDepth > 0:
// FIXME: when we know how ipfs returns partial pins.
return ips == IPFSPinStatusRecursive
}
return false
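
A brief usage sketch of the depth semantics above; a hypothetical standalone snippet that assumes this repository's api package and the IPFSPinStatus constants referenced in the hunk:

package main

import (
	"fmt"

	"github.com/ipfs/ipfs-cluster/api"
)

func main() {
	// maxDepth < 0 means "pinned at any depth": only recursive qualifies.
	fmt.Println(api.IPFSPinStatusRecursive.IsPinned(-1)) // true
	fmt.Println(api.IPFSPinStatusDirect.IsPinned(-1))    // false

	// maxDepth == 0 asks for a direct pin.
	fmt.Println(api.IPFSPinStatusDirect.IsPinned(0))    // true
	fmt.Println(api.IPFSPinStatusRecursive.IsPinned(0)) // false

	// maxDepth > 0 is treated as recursive for now (see the FIXME above).
	fmt.Println(api.IPFSPinStatusRecursive.IsPinned(1)) // true
}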

View File

@ -761,8 +761,7 @@ func (c *Cluster) StateSync() error {
// c. Track items which should not be local as remote
for _, p := range trackedPins {
pCid := p.Cid
currentPin := cState.Get(pCid)
has := cState.Has(pCid)
currentPin, has := cState.Get(pCid)
allocatedHere := containsPeer(currentPin.Allocations, c.id) || currentPin.ReplicationFactorMin == -1
switch {
@ -835,19 +834,34 @@ func (c *Cluster) Sync(h *cid.Cid) (api.GlobalPinInfo, error) {
return c.globalPinInfoCid("SyncLocal", h)
}
// used for RecoverLocal and SyncLocal.
func (c *Cluster) localPinInfoOp(
h *cid.Cid,
f func(*cid.Cid) (api.PinInfo, error),
) (pInfo api.PinInfo, err error) {
cids, err := c.cidsFromMetaPin(h)
if err != nil {
return api.PinInfo{}, err
}
for _, ci := range cids {
pInfo, err = f(ci)
if err != nil {
logger.Error("tracker.SyncCid() returned with error: ", err)
logger.Error("Is the ipfs daemon running?")
break
}
}
// return the last pInfo/err, should be the root Cid if everything ok
return pInfo, err
}
// SyncLocal performs a local sync operation for the given Cid. This will
// tell the tracker to verify the status of the Cid against the IPFS daemon.
// It returns the updated PinInfo for the Cid.
func (c *Cluster) SyncLocal(h *cid.Cid) (api.PinInfo, error) {
var err error
pInfo, err := c.tracker.Sync(h)
// Despite errors, trackers provides an updated PinInfo so
// we just log it.
if err != nil {
logger.Error("tracker.SyncCid() returned with error: ", err)
logger.Error("Is the ipfs daemon running?")
}
return pInfo, err
func (c *Cluster) SyncLocal(h *cid.Cid) (pInfo api.PinInfo, err error) {
return c.localPinInfoOp(h, c.tracker.Sync)
}
// RecoverAllLocal triggers a RecoverLocal operation for all Cids tracked
@ -864,8 +878,8 @@ func (c *Cluster) Recover(h *cid.Cid) (api.GlobalPinInfo, error) {
// RecoverLocal triggers a recover operation for a given Cid in this peer only.
// It returns the updated PinInfo, after recovery.
func (c *Cluster) RecoverLocal(h *cid.Cid) (api.PinInfo, error) {
return c.tracker.Recover(h)
func (c *Cluster) RecoverLocal(h *cid.Cid) (pInfo api.PinInfo, err error) {
return c.localPinInfoOp(h, c.tracker.Recover)
}
// Pins returns the list of Cids managed by Cluster and which are part
@ -889,7 +903,11 @@ func (c *Cluster) Pins() []api.Pin {
// the item is successfully pinned. For that, use Status(). PinGet
// returns an error if the given Cid is not part of the global state.
func (c *Cluster) PinGet(h *cid.Cid) (api.Pin, error) {
pin, ok := c.getCurrentPin(h)
st, err := c.consensus.State()
if err != nil {
return api.PinCid(h), err
}
pin, ok := st.Get(h)
if !ok {
return pin, errors.New("cid is not part of the global state")
}
@ -981,18 +999,10 @@ func (c *Cluster) setupPin(pin *api.Pin) error {
return err
}
// We ensure that if the given pin exists already, it is not of
// different type (i.e. sharding and already locally pinned item)
var existing *api.Pin
cState, err := c.consensus.State()
if err == nil && pin.Cid != nil && cState.Has(pin.Cid) {
pinTmp := cState.Get(pin.Cid)
existing = &pinTmp
if existing.Type != pin.Type {
return errors.New("cannot repin CID with different tracking method, clear state with pin rm to proceed")
}
existing, err := c.PinGet(pin.Cid)
if err == nil && existing.Type != pin.Type { // it exists
return errors.New("cannot repin CID with different tracking method, clear state with pin rm to proceed")
}
return checkPinType(pin)
}
@ -1025,7 +1035,7 @@ func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID)
}
pin.Allocations = allocs
if curr, _ := c.getCurrentPin(pin.Cid); curr.Equals(pin) {
if curr, _ := c.PinGet(pin.Cid); curr.Equals(pin) {
// skip pinning
logger.Debugf("pinning %s skipped: already correctly allocated", pin.Cid)
return false, nil
@ -1048,16 +1058,11 @@ func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID)
// of underlying IPFS daemon unpinning operations.
func (c *Cluster) Unpin(h *cid.Cid) error {
logger.Info("IPFS cluster unpinning:", h)
cState, err := c.consensus.State()
pin, err := c.PinGet(h)
if err != nil {
return err
return fmt.Errorf("cannot unpin pin uncommitted to state: %s", err)
}
if !cState.Has(h) {
return errors.New("cannot unpin pin uncommitted to state")
}
pin := cState.Get(h)
switch pin.Type {
case api.DataType:
return c.consensus.LogUnpin(pin)
@ -1084,39 +1089,20 @@ func (c *Cluster) Unpin(h *cid.Cid) error {
// reference the same metadata node, only unpinning those nodes without
// existing references
func (c *Cluster) unpinClusterDag(metaPin api.Pin) error {
cDAG := metaPin.Reference
if cDAG == nil {
return errors.New("metaPin not linked to ClusterDAG")
}
cdagBytes, err := c.ipfs.BlockGet(cDAG)
if err != nil {
return err
}
cdag, err := sharding.CborDataToNode(cdagBytes, "cbor")
cids, err := c.cidsFromMetaPin(metaPin.Cid)
if err != nil {
return err
}
// traverse all shards of cdag
for _, shardLink := range cdag.Links() {
err = c.unpinShard(cDAG, shardLink.Cid)
// TODO: FIXME: potentially unpinning shards which are referenced
// by other clusterDAGs.
for _, ci := range cids {
err = c.consensus.LogUnpin(api.PinCid(ci))
if err != nil {
return err
}
}
// by invariant in Pin cdag has only one parent and can be unpinned
cdagWrap := api.PinCid(cDAG)
return c.consensus.LogUnpin(cdagWrap)
}
func (c *Cluster) unpinShard(cdagCid, shardCid *cid.Cid) error {
shardPin := api.PinCid(shardCid)
return c.consensus.LogUnpin(shardPin)
// TODO: FIXME: verify this shard is not referenced by any other
// clusterDAGs.
return nil
}
// AddFile adds a file to the ipfs daemons of the cluster. The ipfs importer
@ -1325,6 +1311,51 @@ func (c *Cluster) getIDForPeer(pid peer.ID) (api.ID, error) {
return id, err
}
// cidsFromMetaPin expands a meta-pin and returns a list of Cids that
// Cluster handles for it: the ShardPins, the ClusterDAG and the MetaPin, in
// that order (the MetaPin is the last element).
// It returns a slice with only the given Cid if it's not a known Cid or not a
// MetaPin.
func (c *Cluster) cidsFromMetaPin(h *cid.Cid) ([]*cid.Cid, error) {
cState, err := c.consensus.State()
if err != nil {
return nil, err
}
list := make([]*cid.Cid, 0)
list = append(list, h)
pin, ok := cState.Get(h)
if !ok {
return list, nil
}
if pin.Type != api.MetaType {
return list, nil
}
list = append([]*cid.Cid{pin.Reference}, list...)
clusterDagPin, err := c.PinGet(pin.Reference)
if err != nil {
return list, fmt.Errorf("could not get clusterDAG pin from state. Malformed pin?: %s", err)
}
clusterDagBlock, err := c.ipfs.BlockGet(clusterDagPin.Cid)
if err != nil {
return list, fmt.Errorf("error reading clusterDAG block from ipfs: %s", err)
}
clusterDagNode, err := sharding.CborDataToNode(clusterDagBlock, "cbor")
if err != nil {
return list, fmt.Errorf("error parsing clusterDAG block: %s", err)
}
for _, l := range clusterDagNode.Links() {
list = append([]*cid.Cid{l.Cid}, list...)
}
return list, nil
}
// diffPeers returns the peerIDs added and removed from peers2 in relation to
// peers1
func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) {
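
To make the ordering contract of cidsFromMetaPin concrete, a small standalone sketch (placeholder strings instead of real Cids) that mirrors how the list is assembled above: elements are prepended one by one, so the shards come first and the MetaPin stays last.

package main

import "fmt"

func main() {
	// Mirror of cidsFromMetaPin: start with the MetaPin, prepend its
	// ClusterDAG reference, then prepend each shard link of the ClusterDAG.
	list := []string{"metaPin"}
	list = append([]string{"clusterDAG"}, list...)
	for _, shard := range []string{"shard1", "shard2"} {
		list = append([]string{shard}, list...)
	}
	fmt.Println(list) // [shard2 shard1 clusterDAG metaPin]: root last
}

localPinInfoOp and unpinClusterDag consume that slice directly, so the root MetaPin is always the last element they touch.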

View File

@ -181,15 +181,9 @@ func (mpt *MapPinTracker) enqueue(c api.Pin, typ optracker.OperationType, ch cha
func (mpt *MapPinTracker) Track(c api.Pin) error {
logger.Debugf("tracking %s", c.Cid)
// TODO: Fix this for sharding
// FIXME: Fix this for sharding
// The problem is remote/unpin operation won't be cancelled
// but I don't know how bad is that
// Also, this is dup code
// Sharded pins are never pinned. A sharded pin cannot turn into
// something else or viceversa like it happens with Remote pins.
// Thus we just mark as sharded
// something else or viceversa like it happens with Remote pins so
// we just track them.
if c.Type == api.MetaType {
mpt.optracker.TrackNewOperation(c, optracker.OperationSharded, optracker.PhaseDone)
return nil
@ -197,13 +191,14 @@ func (mpt *MapPinTracker) Track(c api.Pin) error {
// Trigger unpin whenever something remote is tracked
// Note, IPFSConn checks with pin/ls before triggering
// pin/rm.
// pin/rm, so this actually does not always trigger unpin
// to ipfs.
if util.IsRemotePin(c, mpt.peerID) {
op := mpt.optracker.TrackNewOperation(c, optracker.OperationRemote, optracker.PhaseInProgress)
if op == nil {
return nil // ongoing unpin
return nil // Ongoing operationRemote / PhaseInProgress
}
err := mpt.unpin(op)
err := mpt.unpin(op) // unpin all the time, even if not pinned
op.Cancel()
if err != nil {
op.SetError(err)
@ -315,8 +310,21 @@ func (mpt *MapPinTracker) syncStatus(c *cid.Cid, ips api.IPFSPinStatus) api.PinI
status = api.TrackerStatusUnpinned
}
if ips.IsPinned(-1) { // FIXME FIXME FIXME: how much do we want to check
// that something is pinned as EXPECTED (with right max depth).
// TODO(hector): for sharding, we may need to check that a shard
// is pinned to the right depth. For now, we assumed that if it's pinned
// in some way, then it must be right (including direct).
pinned := func(i api.IPFSPinStatus) bool {
switch i {
case api.IPFSPinStatusRecursive:
return i.IsPinned(-1)
case api.IPFSPinStatusDirect:
return i.IsPinned(0)
default:
return i.IsPinned(1) // Pinned with depth 1 or more.
}
}
if pinned(ips) {
switch status {
case api.TrackerStatusPinError:
// If an item that we wanted to pin is pinned, we mark it so

View File

@ -22,7 +22,7 @@ type State interface {
// Has returns true if the state is holding information for a Cid
Has(*cid.Cid) bool
// Get returns the information attached to this pin
Get(*cid.Cid) api.Pin
Get(*cid.Cid) (api.Pin, bool)
// Migrate restores the serialized format of an outdated state to the current version
Migrate(r io.Reader) error
// Return the version of this state

View File

@ -60,14 +60,17 @@ func (st *MapState) Rm(c *cid.Cid) error {
// fields initialized, regardless of the
// presence of the provided Cid in the state.
// To check the presence, use MapState.Has(*cid.Cid).
func (st *MapState) Get(c *cid.Cid) api.Pin {
func (st *MapState) Get(c *cid.Cid) (api.Pin, bool) {
if c == nil {
return api.PinCid(c), false
}
st.pinMux.RLock()
defer st.pinMux.RUnlock()
pins, ok := st.PinMap[c.String()]
if !ok { // make sure no panics
return api.PinCid(c)
return api.PinCid(c), false
}
return pins.ToPin()
return pins.ToPin(), true
}
// Has returns true if the Cid belongs to the State.
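
A caller-side sketch of the new two-value Get (hypothetical fragment, written as if inside a caller that already holds a state st and a *cid.Cid h; the tests below use the same call with the bool ignored):

// Before this commit callers had to pair Has() with Get():
//   if st.Has(h) { pin := st.Get(h); ... }
// Now a single call reports presence and still returns a usable value:
pin, ok := st.Get(h)
if !ok {
	// h (or a nil Cid) is not in the state; pin is a placeholder
	// equivalent to api.PinCid(h), so it is still safe to read.
}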

View File

@ -51,7 +51,7 @@ func TestGet(t *testing.T) {
}()
ms := NewMapState()
ms.Add(c)
get := ms.Get(c.Cid)
get, _ := ms.Get(c.Cid)
if get.Cid.String() != c.Cid.String() ||
get.Allocations[0] != c.Allocations[0] ||
get.ReplicationFactorMax != c.ReplicationFactorMax ||
@ -92,7 +92,7 @@ func TestMarshalUnmarshal(t *testing.T) {
if ms.Version != ms2.Version {
t.Fatal(err)
}
get := ms2.Get(c.Cid)
get, _ := ms2.Get(c.Cid)
if get.Allocations[0] != testPeerID1 {
t.Error("expected different peer id")
}
@ -129,7 +129,7 @@ func TestMigrateFromV1(t *testing.T) {
if err != nil {
t.Error(err)
}
get := ms.Get(c.Cid)
get, _ := ms.Get(c.Cid)
if get.ReplicationFactorMax != -1 || get.ReplicationFactorMin != -1 || !get.Cid.Equals(c.Cid) {
t.Error("expected something different")
t.Logf("%+v", get)