diff --git a/allocate.go b/allocate.go index a700a25c..1f5c3948 100644 --- a/allocate.go +++ b/allocate.go @@ -54,7 +54,7 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I } // Figure out who is holding the CID - currentPin, _ := c.getCurrentPin(hash) + currentPin, _ := c.PinGet(hash) currentAllocs := currentPin.Allocations metrics := c.monitor.LatestMetrics(c.informer.Name()) @@ -96,17 +96,6 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I return newAllocs, nil } -// getCurrentPin returns the Pin object for h, if we can find one -// or builds an empty one. -func (c *Cluster) getCurrentPin(h *cid.Cid) (api.Pin, bool) { - st, err := c.consensus.State() - if err != nil { - return api.PinCid(h), false - } - ok := st.Has(h) - return st.Get(h), ok -} - // allocationError logs an allocation error func allocationError(hash *cid.Cid, needed, wanted int, candidatesValid []peer.ID) error { logger.Errorf("Not enough candidates to allocate %s:", hash) diff --git a/api/add.go b/api/add.go index 157f3cfa..55681282 100644 --- a/api/add.go +++ b/api/add.go @@ -10,7 +10,7 @@ import ( // DefaultShardSize is the shard size for params objects created with DefaultParams(). var DefaultShardSize = uint64(100 * 1024 * 1024) // 100 MB -// Params contains all of the configurable parameters needed to specify the +// AddParams contains all of the configurable parameters needed to specify the // importing process of a file being added to an ipfs-cluster type AddParams struct { PinOptions @@ -134,6 +134,7 @@ func (p *AddParams) ToQueryString() string { return query } +// Equals checks if p equals p2. func (p *AddParams) Equals(p2 *AddParams) bool { return p.ReplicationFactorMin == p2.ReplicationFactorMin && p.ReplicationFactorMax == p2.ReplicationFactorMax && diff --git a/api/types.go b/api/types.go index 2f93460c..b865db90 100644 --- a/api/types.go +++ b/api/types.go @@ -135,7 +135,8 @@ func (ips IPFSPinStatus) IsPinned(maxDepth int) bool { return ips == IPFSPinStatusRecursive case maxDepth == 0: return ips == IPFSPinStatusDirect - case maxDepth > 0: // FIXME + case maxDepth > 0: + // FIXME: when we know how ipfs returns partial pins. return ips == IPFSPinStatusRecursive } return false diff --git a/cluster.go b/cluster.go index 79f43ad3..2c1ce19f 100644 --- a/cluster.go +++ b/cluster.go @@ -761,8 +761,7 @@ func (c *Cluster) StateSync() error { // c. Track items which should not be local as remote for _, p := range trackedPins { pCid := p.Cid - currentPin := cState.Get(pCid) - has := cState.Has(pCid) + currentPin, has := cState.Get(pCid) allocatedHere := containsPeer(currentPin.Allocations, c.id) || currentPin.ReplicationFactorMin == -1 switch { @@ -835,19 +834,34 @@ func (c *Cluster) Sync(h *cid.Cid) (api.GlobalPinInfo, error) { return c.globalPinInfoCid("SyncLocal", h) } +// used for RecoverLocal and SyncLocal. +func (c *Cluster) localPinInfoOp( + h *cid.Cid, + f func(*cid.Cid) (api.PinInfo, error), +) (pInfo api.PinInfo, err error) { + cids, err := c.cidsFromMetaPin(h) + if err != nil { + return api.PinInfo{}, err + } + + for _, ci := range cids { + pInfo, err = f(ci) + if err != nil { + logger.Error("tracker.SyncCid() returned with error: ", err) + logger.Error("Is the ipfs daemon running?") + break + } + } + // return the last pInfo/err, should be the root Cid if everything ok + return pInfo, err + +} + // SyncLocal performs a local sync operation for the given Cid. 
This will // tell the tracker to verify the status of the Cid against the IPFS daemon. // It returns the updated PinInfo for the Cid. -func (c *Cluster) SyncLocal(h *cid.Cid) (api.PinInfo, error) { - var err error - pInfo, err := c.tracker.Sync(h) - // Despite errors, trackers provides an updated PinInfo so - // we just log it. - if err != nil { - logger.Error("tracker.SyncCid() returned with error: ", err) - logger.Error("Is the ipfs daemon running?") - } - return pInfo, err +func (c *Cluster) SyncLocal(h *cid.Cid) (pInfo api.PinInfo, err error) { + return c.localPinInfoOp(h, c.tracker.Sync) } // RecoverAllLocal triggers a RecoverLocal operation for all Cids tracked @@ -864,8 +878,8 @@ func (c *Cluster) Recover(h *cid.Cid) (api.GlobalPinInfo, error) { // RecoverLocal triggers a recover operation for a given Cid in this peer only. // It returns the updated PinInfo, after recovery. -func (c *Cluster) RecoverLocal(h *cid.Cid) (api.PinInfo, error) { - return c.tracker.Recover(h) +func (c *Cluster) RecoverLocal(h *cid.Cid) (pInfo api.PinInfo, err error) { + return c.localPinInfoOp(h, c.tracker.Recover) } // Pins returns the list of Cids managed by Cluster and which are part @@ -889,7 +903,11 @@ func (c *Cluster) Pins() []api.Pin { // the item is successfully pinned. For that, use Status(). PinGet // returns an error if the given Cid is not part of the global state. func (c *Cluster) PinGet(h *cid.Cid) (api.Pin, error) { - pin, ok := c.getCurrentPin(h) + st, err := c.consensus.State() + if err != nil { + return api.PinCid(h), err + } + pin, ok := st.Get(h) if !ok { return pin, errors.New("cid is not part of the global state") } @@ -981,18 +999,10 @@ func (c *Cluster) setupPin(pin *api.Pin) error { return err } - // We ensure that if the given pin exists already, it is not of - // different type (i.e. sharding and already locally pinned item) - var existing *api.Pin - cState, err := c.consensus.State() - if err == nil && pin.Cid != nil && cState.Has(pin.Cid) { - pinTmp := cState.Get(pin.Cid) - existing = &pinTmp - if existing.Type != pin.Type { - return errors.New("cannot repin CID with different tracking method, clear state with pin rm to proceed") - } + existing, err := c.PinGet(pin.Cid) + if err == nil && existing.Type != pin.Type { // it exists + return errors.New("cannot repin CID with different tracking method, clear state with pin rm to proceed") } - return checkPinType(pin) } @@ -1025,7 +1035,7 @@ func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID) } pin.Allocations = allocs - if curr, _ := c.getCurrentPin(pin.Cid); curr.Equals(pin) { + if curr, _ := c.PinGet(pin.Cid); curr.Equals(pin) { // skip pinning logger.Debugf("pinning %s skipped: already correctly allocated", pin.Cid) return false, nil @@ -1048,16 +1058,11 @@ func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID) // of underlying IPFS daemon unpinning operations. 
func (c *Cluster) Unpin(h *cid.Cid) error { logger.Info("IPFS cluster unpinning:", h) - cState, err := c.consensus.State() + pin, err := c.PinGet(h) if err != nil { - return err + return fmt.Errorf("cannot unpin pin uncommitted to state: %s", err) } - if !cState.Has(h) { - return errors.New("cannot unpin pin uncommitted to state") - } - pin := cState.Get(h) - switch pin.Type { case api.DataType: return c.consensus.LogUnpin(pin) @@ -1084,39 +1089,20 @@ func (c *Cluster) Unpin(h *cid.Cid) error { // reference the same metadata node, only unpinning those nodes without // existing references func (c *Cluster) unpinClusterDag(metaPin api.Pin) error { - cDAG := metaPin.Reference - if cDAG == nil { - return errors.New("metaPin not linked to ClusterDAG") - } - - cdagBytes, err := c.ipfs.BlockGet(cDAG) - if err != nil { - return err - } - cdag, err := sharding.CborDataToNode(cdagBytes, "cbor") + cids, err := c.cidsFromMetaPin(metaPin.Cid) if err != nil { return err } - // traverse all shards of cdag - for _, shardLink := range cdag.Links() { - err = c.unpinShard(cDAG, shardLink.Cid) + // TODO: FIXME: potentially unpinning shards which are referenced + // by other clusterDAGs. + for _, ci := range cids { + err = c.consensus.LogUnpin(api.PinCid(ci)) if err != nil { return err } } - - // by invariant in Pin cdag has only one parent and can be unpinned - cdagWrap := api.PinCid(cDAG) - return c.consensus.LogUnpin(cdagWrap) -} - -func (c *Cluster) unpinShard(cdagCid, shardCid *cid.Cid) error { - shardPin := api.PinCid(shardCid) - return c.consensus.LogUnpin(shardPin) - - // TODO: FIXME: verify this shard is not referenced by any other - // clusterDAGs. + return nil } // AddFile adds a file to the ipfs daemons of the cluster. The ipfs importer @@ -1325,6 +1311,51 @@ func (c *Cluster) getIDForPeer(pid peer.ID) (api.ID, error) { return id, err } +// cidsFromMetaPin expands a meta-pin and returns a list of Cids that +// Cluster handles for it: the ShardPins, the ClusterDAG and the MetaPin, in +// that order (the MetaPin is the last element). +// It returns a slice with only the given Cid if it's not a known Cid or not a +// MetaPin. +func (c *Cluster) cidsFromMetaPin(h *cid.Cid) ([]*cid.Cid, error) { + cState, err := c.consensus.State() + if err != nil { + return nil, err + } + + list := make([]*cid.Cid, 0) + list = append(list, h) + + pin, ok := cState.Get(h) + if !ok { + return list, nil + } + + if pin.Type != api.MetaType { + return list, nil + } + + list = append([]*cid.Cid{pin.Reference}, list...) + clusterDagPin, err := c.PinGet(pin.Reference) + if err != nil { + return list, fmt.Errorf("could not get clusterDAG pin from state. Malformed pin?: %s", err) + } + + clusterDagBlock, err := c.ipfs.BlockGet(clusterDagPin.Cid) + if err != nil { + return list, fmt.Errorf("error reading clusterDAG block from ipfs: %s", err) + } + + clusterDagNode, err := sharding.CborDataToNode(clusterDagBlock, "cbor") + if err != nil { + return list, fmt.Errorf("error parsing clusterDAG block: %s", err) + } + for _, l := range clusterDagNode.Links() { + list = append([]*cid.Cid{l.Cid}, list...) 
+ } + + return list, nil +} + // diffPeers returns the peerIDs added and removed from peers2 in relation to // peers1 func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) { diff --git a/pintracker/maptracker/maptracker.go b/pintracker/maptracker/maptracker.go index edde8d2c..9242b85a 100644 --- a/pintracker/maptracker/maptracker.go +++ b/pintracker/maptracker/maptracker.go @@ -181,15 +181,9 @@ func (mpt *MapPinTracker) enqueue(c api.Pin, typ optracker.OperationType, ch cha func (mpt *MapPinTracker) Track(c api.Pin) error { logger.Debugf("tracking %s", c.Cid) - // TODO: Fix this for sharding - // FIXME: Fix this for sharding - // The problem is remote/unpin operation won't be cancelled - // but I don't know how bad is that - // Also, this is dup code - // Sharded pins are never pinned. A sharded pin cannot turn into - // something else or viceversa like it happens with Remote pins. - // Thus we just mark as sharded + // something else or viceversa like it happens with Remote pins so + // we just track them. if c.Type == api.MetaType { mpt.optracker.TrackNewOperation(c, optracker.OperationSharded, optracker.PhaseDone) return nil @@ -197,13 +191,14 @@ func (mpt *MapPinTracker) Track(c api.Pin) error { // Trigger unpin whenever something remote is tracked // Note, IPFSConn checks with pin/ls before triggering - // pin/rm. + // pin/rm, so this actually does not always trigger unpin + // to ipfs. if util.IsRemotePin(c, mpt.peerID) { op := mpt.optracker.TrackNewOperation(c, optracker.OperationRemote, optracker.PhaseInProgress) if op == nil { - return nil // ongoing unpin + return nil // Ongoing operationRemote / PhaseInProgress } - err := mpt.unpin(op) + err := mpt.unpin(op) // unpin all the time, even if not pinned op.Cancel() if err != nil { op.SetError(err) @@ -315,8 +310,21 @@ func (mpt *MapPinTracker) syncStatus(c *cid.Cid, ips api.IPFSPinStatus) api.PinI status = api.TrackerStatusUnpinned } - if ips.IsPinned(-1) { // FIXME FIXME FIXME: how much do we want to check - // that something is pinned as EXPECTED (with right max depth). + // TODO(hector): for sharding, we may need to check that a shard + // is pinned to the right depth. For now, we assumed that if it's pinned + // in some way, then it must be right (including direct). + pinned := func(i api.IPFSPinStatus) bool { + switch i { + case api.IPFSPinStatusRecursive: + return i.IsPinned(-1) + case api.IPFSPinStatusDirect: + return i.IsPinned(0) + default: + return i.IsPinned(1) // Pinned with depth 1 or more. + } + } + + if pinned(ips) { switch status { case api.TrackerStatusPinError: // If an item that we wanted to pin is pinned, we mark it so diff --git a/state/interface.go b/state/interface.go index 79cfde20..8cd19ecc 100644 --- a/state/interface.go +++ b/state/interface.go @@ -22,7 +22,7 @@ type State interface { // Has returns true if the state is holding information for a Cid Has(*cid.Cid) bool // Get returns the information attacthed to this pin - Get(*cid.Cid) api.Pin + Get(*cid.Cid) (api.Pin, bool) // Migrate restores the serialized format of an outdated state to the current version Migrate(r io.Reader) error // Return the version of this state diff --git a/state/mapstate/map_state.go b/state/mapstate/map_state.go index 45e091b1..cb17d967 100644 --- a/state/mapstate/map_state.go +++ b/state/mapstate/map_state.go @@ -60,14 +60,17 @@ func (st *MapState) Rm(c *cid.Cid) error { // fields initialized, regardless of the // presence of the provided Cid in the state. // To check the presence, use MapState.Has(*cid.Cid). 
-func (st *MapState) Get(c *cid.Cid) api.Pin { +func (st *MapState) Get(c *cid.Cid) (api.Pin, bool) { + if c == nil { + return api.PinCid(c), false + } st.pinMux.RLock() defer st.pinMux.RUnlock() pins, ok := st.PinMap[c.String()] if !ok { // make sure no panics - return api.PinCid(c) + return api.PinCid(c), false } - return pins.ToPin() + return pins.ToPin(), true } // Has returns true if the Cid belongs to the State. diff --git a/state/mapstate/map_state_test.go b/state/mapstate/map_state_test.go index 84b4d1c6..722d1e66 100644 --- a/state/mapstate/map_state_test.go +++ b/state/mapstate/map_state_test.go @@ -51,7 +51,7 @@ func TestGet(t *testing.T) { }() ms := NewMapState() ms.Add(c) - get := ms.Get(c.Cid) + get, _ := ms.Get(c.Cid) if get.Cid.String() != c.Cid.String() || get.Allocations[0] != c.Allocations[0] || get.ReplicationFactorMax != c.ReplicationFactorMax || @@ -92,7 +92,7 @@ func TestMarshalUnmarshal(t *testing.T) { if ms.Version != ms2.Version { t.Fatal(err) } - get := ms2.Get(c.Cid) + get, _ := ms2.Get(c.Cid) if get.Allocations[0] != testPeerID1 { t.Error("expected different peer id") } @@ -129,7 +129,7 @@ func TestMigrateFromV1(t *testing.T) { if err != nil { t.Error(err) } - get := ms.Get(c.Cid) + get, _ := ms.Get(c.Cid) if get.ReplicationFactorMax != -1 || get.ReplicationFactorMin != -1 || !get.Cid.Equals(c.Cid) { t.Error("expected something different") t.Logf("%+v", get)
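
Notes on the changes above, with small illustrative sketches. The sketches use simplified stand-in types (plain strings instead of *cid.Cid, trimmed structs instead of the api types) and are stand-alone programs, not code taken from the patch.

state.State.Get now returns (api.Pin, bool) instead of a bare api.Pin, so a single lookup tells callers both the pin and whether it exists, replacing the previous Has()-then-Get() pairing (see the StateSync and MapState.Get hunks). A minimal sketch of the comma-ok pattern, including a nil-key guard mirroring the one added to MapState.Get:

```go
// Comma-ok Get: a simplified stand-in for the new state.State.Get signature.
// Keys are strings instead of *cid.Cid; Pin is a trimmed stand-in for api.Pin.
package main

import (
	"fmt"
	"sync"
)

type Pin struct {
	Cid string
}

type MapState struct {
	pinMux sync.RWMutex
	pinMap map[string]Pin
}

// Get returns the pin for c and whether it is part of the state, so callers
// no longer need a separate Has() before reading.
func (st *MapState) Get(c string) (Pin, bool) {
	if c == "" { // mirrors the nil-Cid guard added to MapState.Get
		return Pin{Cid: c}, false
	}
	st.pinMux.RLock()
	defer st.pinMux.RUnlock()
	p, ok := st.pinMap[c]
	if !ok {
		return Pin{Cid: c}, false // empty pin, like api.PinCid(c)
	}
	return p, true
}

func main() {
	st := &MapState{pinMap: map[string]Pin{"QmTracked": {Cid: "QmTracked"}}}
	if pin, ok := st.Get("QmTracked"); ok {
		fmt.Println("tracked:", pin.Cid)
	}
	if _, ok := st.Get("QmMissing"); !ok {
		fmt.Println("not part of the state")
	}
}
```

Returning an empty pin on a miss keeps the old behaviour of never handing back a zero value, while the boolean now carries the presence information.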
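
With the two-value Get in place, the unexported getCurrentPin helper is removed and callers go through the public Cluster.PinGet, which signals absence via its error. A sketch of how the Unpin-style and setupPin-style call sites read after the patch, again with simplified stand-ins for api.Pin and the consensus state:

```go
// PinGet as the single accessor: absence is reported through the error, and
// setupPin only compares pin types when the pin already exists. Simplified
// stand-ins for api.Pin and the consensus state.
package main

import (
	"errors"
	"fmt"
)

type Pin struct {
	Cid  string
	Type string
}

type Cluster struct {
	state map[string]Pin // stand-in for the consensus state
}

// PinGet returns the pin for h or an error if it is not part of the state.
func (c *Cluster) PinGet(h string) (Pin, error) {
	pin, ok := c.state[h]
	if !ok {
		return Pin{Cid: h}, errors.New("cid is not part of the global state")
	}
	return pin, nil
}

// setupPinCheck mirrors the simplified repin guard in setupPin.
func (c *Cluster) setupPinCheck(pin Pin) error {
	existing, err := c.PinGet(pin.Cid)
	if err == nil && existing.Type != pin.Type { // it exists with another type
		return errors.New("cannot repin CID with different tracking method")
	}
	return nil
}

func main() {
	c := &Cluster{state: map[string]Pin{"QmData": {Cid: "QmData", Type: "data"}}}
	fmt.Println(c.setupPinCheck(Pin{Cid: "QmData", Type: "meta"})) // rejected: type mismatch
	fmt.Println(c.setupPinCheck(Pin{Cid: "QmNew", Type: "data"}))  // nil: not in state, pin normally
}
```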
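
unpinClusterDag, SyncLocal and RecoverLocal all now go through cidsFromMetaPin, which expands a sharded (meta) pin into every Cid that Cluster handles for it. The ordering matters: the MetaPin is appended first and each expansion step prepends, so iteration visits the shards first, then the ClusterDAG, and the MetaPin last. A small sketch of that construction, with strings standing in for *cid.Cid:

```go
// Ordering produced by cidsFromMetaPin: the MetaPin goes in first and every
// expansion step prepends, so the slice reads shards..., ClusterDAG, MetaPin.
// Strings stand in for *cid.Cid.
package main

import "fmt"

func cidsFromMetaPin(meta, clusterDAG string, shards []string) []string {
	list := []string{meta}                       // MetaPin is appended first...
	list = append([]string{clusterDAG}, list...) // ...the ClusterDAG is prepended...
	for _, sh := range shards {
		list = append([]string{sh}, list...) // ...and the shards end up at the front.
	}
	return list
}

func main() {
	fmt.Println(cidsFromMetaPin("QmMeta", "QmClusterDAG", []string{"QmShard1", "QmShard2"}))
	// Output: [QmShard2 QmShard1 QmClusterDAG QmMeta]
}
```

Operating on the shards before the MetaPin is what lets unpinClusterDag log its unpins bottom-up and lets localPinInfoOp return the root pin's info as its final result.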
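
localPinInfoOp is the shared helper behind SyncLocal and RecoverLocal: it runs a per-Cid tracker operation (tracker.Sync or tracker.Recover) over the expanded Cid list and returns the last PinInfo, which belongs to the root Cid when every call succeeds. A sketch of the shape of that helper, with a trimmed stand-in for api.PinInfo and a plain function in place of the tracker methods:

```go
// Shape of localPinInfoOp: run a per-Cid tracker operation over the expanded
// Cid list and return the last result, which belongs to the root (MetaPin)
// when everything succeeds. PinInfo is a trimmed stand-in for api.PinInfo.
package main

import (
	"errors"
	"fmt"
)

type PinInfo struct{ Cid string }

func localPinInfoOp(cids []string, f func(string) (PinInfo, error)) (pInfo PinInfo, err error) {
	for _, ci := range cids {
		pInfo, err = f(ci)
		if err != nil {
			// stop at the first failure; the caller sees the failing Cid's info
			break
		}
	}
	return pInfo, err
}

func main() {
	syncOp := func(c string) (PinInfo, error) {
		if c == "QmBroken" {
			return PinInfo{Cid: c}, errors.New("is the ipfs daemon running?")
		}
		return PinInfo{Cid: c}, nil
	}
	// shards first, root last, as produced by cidsFromMetaPin
	info, err := localPinInfoOp([]string{"QmShard1", "QmClusterDAG", "QmMeta"}, syncOp)
	fmt.Println(info.Cid, err) // QmMeta <nil>
}
```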
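
Finally, MapPinTracker.syncStatus replaces the blanket IsPinned(-1) call with a depth-aware check: recursive pins are checked as recursive, direct pins as direct, and anything else falls into the maxDepth > 0 branch of api.IPFSPinStatus.IsPinned, which, per the remaining FIXME in api/types.go, still treats partial pins as recursive until ipfs reports them distinctly. A sketch of both pieces with a simplified status type:

```go
// Depth-aware pin check: a simplified IPFSPinStatus with the IsPinned(maxDepth)
// semantics from api/types.go and the pinned() closure from syncStatus. The
// maxDepth > 0 branch still accepts recursive pins only (the remaining FIXME).
package main

import "fmt"

type IPFSPinStatus int

const (
	IPFSPinStatusUnpinned IPFSPinStatus = iota
	IPFSPinStatusDirect
	IPFSPinStatusRecursive
)

// IsPinned reports whether the status satisfies a pin of the given max depth:
// -1 means recursive, 0 means direct, >0 means depth-limited.
func (ips IPFSPinStatus) IsPinned(maxDepth int) bool {
	switch {
	case maxDepth < 0:
		return ips == IPFSPinStatusRecursive
	case maxDepth == 0:
		return ips == IPFSPinStatusDirect
	default:
		// partial pins are not distinguished by ipfs yet
		return ips == IPFSPinStatusRecursive
	}
}

// pinned mirrors the closure in syncStatus: accept whatever kind of pin ipfs
// reports, checked against the matching depth.
func pinned(i IPFSPinStatus) bool {
	switch i {
	case IPFSPinStatusRecursive:
		return i.IsPinned(-1)
	case IPFSPinStatusDirect:
		return i.IsPinned(0)
	default:
		return i.IsPinned(1)
	}
}

func main() {
	fmt.Println(pinned(IPFSPinStatusRecursive)) // true
	fmt.Println(pinned(IPFSPinStatusDirect))    // true
	fmt.Println(pinned(IPFSPinStatusUnpinned))  // false
}
```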