From 238f3726f3a1252034cb76f78a529cbbbf6975fe Mon Sep 17 00:00:00 2001 From: Wyatt Daviau Date: Sun, 18 Mar 2018 15:29:02 -0400 Subject: [PATCH] Pin datastructure updated to support sharding 4 PinTypes specify how CID is pinned Changes to Pin and Unpin to handle different PinTypes Tests for different PinTypes Migration for new state format using new Pin datastructures Visibility of the PinTypes used internally limited by default License: MIT Signed-off-by: Wyatt Daviau --- api/rest/client/methods.go | 4 +- api/rest/client/methods_test.go | 5 +- api/rest/restapi.go | 27 +++- api/rest/restapi_test.go | 2 +- api/types.go | 156 +++++++++++++++++++++- api/types_test.go | 26 +++- cluster.go | 196 ++++++++++++++++++++++++++-- cluster_test.go | 123 ++++++++++++++++- consensus/raft/consensus_test.go | 45 +++++++ consensus/raft/log_op.go | 1 - ipfs-cluster-ctl/formatters.go | 22 ++++ ipfs-cluster-ctl/main.go | 17 ++- ipfscluster.go | 2 + ipfscluster_test.go | 17 ++- ipfsconn/ipfshttp/ipfshttp.go | 6 + ipfsconn/ipfshttp/ipfshttp_test.go | 14 ++ pintracker/maptracker/maptracker.go | 21 +++ rpc_api.go | 14 +- sharder/clusterdag.go | 47 ++++++- sharder/sharder_test.go | 29 +--- state/mapstate/migrate.go | 3 + test/cids.go | 12 +- test/ipfs_mock.go | 17 +++ util.go | 10 ++ 24 files changed, 748 insertions(+), 68 deletions(-) diff --git a/api/rest/client/methods.go b/api/rest/client/methods.go index d84a3e5b..050dfb51 100644 --- a/api/rest/client/methods.go +++ b/api/rest/client/methods.go @@ -83,9 +83,9 @@ func (c *Client) Unpin(ci *cid.Cid) error { // Allocations returns the consensus state listing all tracked items and // the peers that should be pinning them. -func (c *Client) Allocations() ([]api.Pin, error) { +func (c *Client) Allocations(pinType api.PinType) ([]api.Pin, error) { var pins []api.PinSerial - err := c.do("GET", "/allocations", nil, &pins) + err := c.do("GET", fmt.Sprintf("/allocations?pintype=%s", pinType.String()), nil, &pins) result := make([]api.Pin, len(pins)) for i, p := range pins { result[i] = p.ToPin() diff --git a/api/rest/client/methods_test.go b/api/rest/client/methods_test.go index 59aac11e..63827daf 100644 --- a/api/rest/client/methods_test.go +++ b/api/rest/client/methods_test.go @@ -8,6 +8,9 @@ import ( rpc "github.com/hsanjuan/go-libp2p-gorpc" peer "github.com/libp2p/go-libp2p-peer" + cid "github.com/ipfs/go-cid" + types "github.com/ipfs/ipfs-cluster/api" + ma "github.com/multiformats/go-multiaddr" "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api/rest" @@ -166,7 +169,7 @@ func TestAllocations(t *testing.T) { defer shutdown(api) testF := func(t *testing.T, c *Client) { - pins, err := c.Allocations() + pins, err := c.Allocations(types.PinType(types.AllType)) if err != nil { t.Fatal(err) } diff --git a/api/rest/restapi.go b/api/rest/restapi.go index af86d173..556ff195 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -776,13 +776,33 @@ func (api *API) unpinHandler(w http.ResponseWriter, r *http.Request) { } } +// filterOutPin returns true if the given pin should be filtered out according +// to the input filter type +func (api *API) filterOutPin(filter types.PinType, pin types.Pin) bool { + if filter == types.AllType { + return false + } + return pin.Type != filter +} + func (api *API) allocationsHandler(w http.ResponseWriter, r *http.Request) { + queryValues := r.URL.Query() + pintype := queryValues.Get("pintype") + filter := types.PinTypeFromString(pintype) var pins []types.PinSerial - err := api.rpcClient.Call("", + err := api.rpcClient.Call( + "", "Cluster", "Pins", struct{}{}, - &pins) + &pins, + ) + for i, pinS := range pins { + if api.filterOutPin(filter, pinS.ToPin()) { + // remove this pin from output + pins = append(pins[:i], pins[i+1:]...) + } + } sendResponse(w, err, pins) } @@ -950,7 +970,8 @@ func parseCidOrError(w http.ResponseWriter, r *http.Request) types.PinSerial { } pin := types.PinSerial{ - Cid: hash, + Cid: hash, + Type: types.DataType, } queryValues := r.URL.Query() diff --git a/api/rest/restapi_test.go b/api/rest/restapi_test.go index 995a7fe7..78e8c331 100644 --- a/api/rest/restapi_test.go +++ b/api/rest/restapi_test.go @@ -398,7 +398,7 @@ func TestAPIAllocationsEndpoint(t *testing.T) { tf := func(t *testing.T, url urlF) { var resp []api.PinSerial - makeGet(t, rest, url(rest)+"/allocations", &resp) + makeGet(t, rest, url(rest)+"/allocations?a=false", &resp) if len(resp) != 3 || resp[0].Cid != test.TestCid1 || resp[1].Cid != test.TestCid2 || resp[2].Cid != test.TestCid3 { diff --git a/api/types.go b/api/types.go index c9baeeba..c1e1396f 100644 --- a/api/types.go +++ b/api/types.go @@ -53,6 +53,9 @@ const ( TrackerStatusPinQueued // The item has been queued for unpinning on the IPFS daemon TrackerStatusUnpinQueued + // The IPFS daemon is not pinning the item through this cid but it is + // tracked in a cluster dag + TrackerStatusSharded ) // TrackerStatus represents the status of a tracked Cid in the PinTracker @@ -515,22 +518,133 @@ func (addrsS MultiaddrsSerial) ToMultiaddrs() []ma.Multiaddr { return addrs } +// PeersToStrings IDB58Encodes a list of peers. +func PeersToStrings(peers []peer.ID) []string { + strs := make([]string, len(peers)) + for i, p := range peers { + if p != "" { + strs[i] = peer.IDB58Encode(p) + } + } + return strs +} + +// StringsToPeers decodes peer.IDs from strings. +func StringsToPeers(strs []string) []peer.ID { + peers := make([]peer.ID, len(strs)) + for i, p := range strs { + var err error + peers[i], err = peer.IDB58Decode(p) + if err != nil { + logger.Error(p, err) + } + } + return peers +} + +// CidsToStrings encodes cid.Cids to strings. +func CidsToStrings(cids []*cid.Cid) []string { + strs := make([]string, len(cids)) + for i, c := range cids { + strs[i] = c.String() + } + return strs +} + +// StringsToCidSet decodes cid.Cids from strings. +func StringsToCidSet(strs []string) *cid.Set { + cids := cid.NewSet() + for _, str := range strs { + c, err := cid.Decode(str) + if err != nil { + logger.Error(str, err) + } + cids.Add(c) + } + return cids +} + +// PinType values +const ( + DataType PinType = iota + 1 + MetaType + CdagType + ShardType +) + +// AllType is a PinType used for filtering all pin types +const AllType = -1 + +// PinType specifies which of four possible interpretations a pin represents. +// DataType pins are the simplest and represent a pin in the pinset used to +// directly track user data. ShardType pins are metadata pins that track +// many nodes in a user's data DAG. ShardType pins have a parent pin, and in +// general can have many parents. ClusterDAG, or Cdag for short, pins are also +// metadata pins that do not directly track user data DAGs but rather other +// metadata pins. CdagType pins have at least one parent. Finally MetaType +// pins always track the cid of the root of a user-tracked data DAG. However +// MetaType pins are not stored directly in the ipfs pinset. Instead the +// underlying DAG is tracked via the metadata pins underneath the root of a +// CdagType pin +type PinType int + +// PinTypeFromString is the inverse of String. It returns the PinType value +// corresponding to the input string +func PinTypeFromString(str string) PinType { + switch str { + case "pin": + return DataType + case "meta-pin": + return MetaType + case "clusterdag-pin": + return CdagType + case "shard-pin": + return ShardType + case "all": + return AllType + default: + return PinType(0) // invalid string + } +} + +// String returns a printable value to identify the PinType +func (pT *PinType) String() string { + switch *pT { + case DataType: + return "pin" + case MetaType: + return "meta-pin" + case CdagType: + return "clusterdag-pin" + case ShardType: + return "shard-pin" + case AllType: + return "all" + default: + panic("String() called on invalid shard type") + } +} + // Pin is an argument that carries a Cid. It may carry more things in the // future. type Pin struct { Cid *cid.Cid Name string + Type PinType Allocations []peer.ID ReplicationFactorMin int ReplicationFactorMax int Recursive bool + Parents *cid.Set + Clusterdag *cid.Cid } // PinCid is a shortcut to create a Pin only with a Cid. Default is for pin to -// be recursive +// be recursive and the pin to be of DataType func PinCid(c *cid.Cid) Pin { return Pin{ Cid: c, + Type: DataType, Allocations: []peer.ID{}, Recursive: true, } @@ -540,10 +654,13 @@ func PinCid(c *cid.Cid) Pin { type PinSerial struct { Cid string `json:"cid"` Name string `json:"name"` + Type int `json:"type"` Allocations []string `json:"allocations"` ReplicationFactorMin int `json:"replication_factor_min"` ReplicationFactorMax int `json:"replication_factor_max"` Recursive bool `json:"recursive"` + Parents []string `json:"parents"` + Clusterdag string `json:"clusterdag"` } // ToSerial converts a Pin to PinSerial. @@ -552,17 +669,28 @@ func (pin Pin) ToSerial() PinSerial { if pin.Cid != nil { c = pin.Cid.String() } + cdag := "" + if pin.Clusterdag != nil { + cdag = pin.Clusterdag.String() + } n := pin.Name allocs := PeersToStrings(pin.Allocations) + var parents []string + if pin.Parents != nil { + parents = CidsToStrings(pin.Parents.Keys()) + } return PinSerial{ Cid: c, Name: n, Allocations: allocs, + Type: int(pin.Type), ReplicationFactorMin: pin.ReplicationFactorMin, ReplicationFactorMax: pin.ReplicationFactorMax, Recursive: pin.Recursive, + Parents: parents, + Clusterdag: cdag, } } @@ -581,6 +709,10 @@ func (pin Pin) Equals(pin2 Pin) bool { return false } + if pin1s.Type != pin2s.Type { + return false + } + if pin1s.Recursive != pin2s.Recursive { return false } @@ -599,6 +731,18 @@ func (pin Pin) Equals(pin2 Pin) bool { if pin1s.ReplicationFactorMin != pin2s.ReplicationFactorMin { return false } + + if pin1s.Clusterdag != pin2s.Clusterdag { + return false + } + + sort.Strings(pin1s.Parents) + sort.Strings(pin2s.Parents) + + if strings.Join(pin1s.Parents, ",") != strings.Join(pin2s.Parents, ",") { + return false + } + return true } @@ -608,14 +752,24 @@ func (pins PinSerial) ToPin() Pin { if err != nil { logger.Debug(pins.Cid, err) } + var cdag *cid.Cid + if pins.Clusterdag != "" { + cdag, err = cid.Decode(pins.Clusterdag) + if err != nil { + logger.Error(pins.Clusterdag, err) + } + } return Pin{ Cid: c, Name: pins.Name, Allocations: StringsToPeers(pins.Allocations), + Type: PinType(pins.Type), ReplicationFactorMin: pins.ReplicationFactorMin, ReplicationFactorMax: pins.ReplicationFactorMax, Recursive: pins.Recursive, + Parents: StringsToCidSet(pins.Parents), + Clusterdag: cdag, } } diff --git a/api/types_test.go b/api/types_test.go index eb12667d..7a76cf6c 100644 --- a/api/types_test.go +++ b/api/types_test.go @@ -1,6 +1,7 @@ package api import ( + "fmt" "reflect" "testing" "time" @@ -15,6 +16,9 @@ var testMAddr, _ = ma.NewMultiaddr("/ip4/1.2.3.4") var testMAddr2, _ = ma.NewMultiaddr("/dns4/a.b.c.d") var testMAddr3, _ = ma.NewMultiaddr("/ip4/127.0.0.1/tcp/8081/ws/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd") var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") +var testCid2, _ = cid.Decode("QmYCLpFCj9Av8NFjkQogvtXspnTDFWaizLpVFEijHTH4eV") +var testCid3, _ = cid.Decode("QmZmdA3UZKuHuy9FrWsxJ82q21nbEh97NUnxTzF5EHxZia") +var testCid4, _ = cid.Decode("QmZbNfi13Sb2WUDMjiW1ZNhnds5KDk6mJB5hP9B5h9m5CJ") var testPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") var testPeerID2, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabd") var testPeerID3, _ = peer.IDB58Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa") @@ -183,19 +187,37 @@ func TestPinConv(t *testing.T) { } }() + parents := cid.NewSet() + parents.Add(testCid2) c := Pin{ Cid: testCid1, Allocations: []peer.ID{testPeerID1}, ReplicationFactorMax: -1, ReplicationFactorMin: -1, + Recursive: true, + Parents: parents, + Name: "A test pin", + Type: CdagType, + Clusterdag: testCid4, } newc := c.ToSerial().ToPin() if c.Cid.String() != newc.Cid.String() || c.Allocations[0] != newc.Allocations[0] || c.ReplicationFactorMin != newc.ReplicationFactorMin || - c.ReplicationFactorMax != newc.ReplicationFactorMax { - t.Error("mismatch") + c.ReplicationFactorMax != newc.ReplicationFactorMax || + c.Recursive != newc.Recursive || + c.Parents.Len() != newc.Parents.Len() || + c.Parents.Keys()[0].String() != newc.Parents.Keys()[0].String() || + c.Name != newc.Name || c.Type != newc.Type || + c.Clusterdag.String() != newc.Clusterdag.String() { + + fmt.Printf("c: %v\ncnew: %v\n", c, newc) + t.Fatal("mismatch") + } + + if !c.Equals(newc) { + t.Error("all pin fields are equal but Equals returns false") } } diff --git a/cluster.go b/cluster.go index 398d58bd..56fcd939 100644 --- a/cluster.go +++ b/cluster.go @@ -10,12 +10,14 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/pstoremgr" "github.com/ipfs/ipfs-cluster/rpcutil" + "github.com/ipfs/ipfs-cluster/sharder" "github.com/ipfs/ipfs-cluster/state" rpc "github.com/hsanjuan/go-libp2p-gorpc" cid "github.com/ipfs/go-cid" host "github.com/libp2p/go-libp2p-host" peer "github.com/libp2p/go-libp2p-peer" + p2praft "github.com/libp2p/go-libp2p-raft" ma "github.com/multiformats/go-multiaddr" ) @@ -877,8 +879,8 @@ func (c *Cluster) Pins() []api.Pin { logger.Error(err) return []api.Pin{} } - return cState.List() + } // PinGet returns information for a single Cid managed by Cluster. @@ -916,14 +918,92 @@ func (c *Cluster) Pin(pin api.Pin) error { return err } +// validate pin ensures that the metadata accompanying the cid is +// self-consistent. This amounts to verifying that the data structure matches +// the expected form of the pinType carried in the pin. +func (c *Cluster) validatePin(pin api.Pin, rplMin, rplMax int) error { + switch pin.Type { + case api.DataType: + if pin.Clusterdag != nil || + (pin.Parents != nil && pin.Parents.Len() != 0) { + return errors.New("data pins should not reference other pins") + } + case api.ShardType: + if !pin.Recursive { + return errors.New("must pin shards recursively") + } + // In general multiple clusterdags may reference the same shard + // and sharder sessions typically update a shard pin's metadata. + // Hence we check for an existing shard and carefully update. + cState, err := c.consensus.State() + if err != nil && err != p2praft.ErrNoState { + return err + } + if err == p2praft.ErrNoState || !cState.Has(pin.Cid) { + break + } + + // State already tracks pin's CID + existing := cState.Get(pin.Cid) + // For now all repins of the same shard must use the same + // replmax and replmin. It is unclear what the best UX is here + // especially if the same Shard is referenced in multiple + // clusterdags. This simplistic policy avoids complexity and + // suits existing needs for shard pins. + if existing.ReplicationFactorMin != rplMin || + existing.ReplicationFactorMax != rplMax { + return errors.New("shard update with wrong repl factors") + } + case api.CdagType: + if pin.Recursive { + return errors.New("must pin roots directly") + } + if pin.Clusterdag == nil { + return errors.New("roots must reference a dag") + } + if pin.Parents.Len() > 1 { + return errors.New("cdag nodes are referenced once") + } + case api.MetaType: + if len(pin.Allocations) != 0 { + return errors.New("meta pin should not specify allocations") + } + default: + return errors.New("unrecognized pin type") + } + return nil +} + +// updatePinParents modifies the api.Pin input to give it the correct parents +// so that previous additions to the pins parents are maintained after this +// pin is committed to consensus. If this pin carries new parents they are +// merged with those already existing for this CID +func (c *Cluster) updatePinParents(pin *api.Pin) error { + cState, err := c.consensus.State() + if err != nil && err != p2praft.ErrNoState { + return err + } + // first pin of this cid, nothing to update + if err == p2praft.ErrNoState || !cState.Has(pin.Cid) { + return nil + } + existing := cState.Get(pin.Cid) + // no existing parents this pin is up to date + if existing.Parents == nil || len(existing.Parents.Keys()) == 0 { + return nil + } + for _, c := range existing.Parents.Keys() { + pin.Parents.Add(c) + } + return nil +} + // pin performs the actual pinning and supports a blacklist to be // able to evacuate a node and returns whether the pin was submitted // to the consensus layer or skipped (due to error or to the fact // that it was already valid). func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID) (bool, error) { - if pin.Cid == nil { - return false, errors.New("bad pin object") - } + // Determine repl factors rplMin := pin.ReplicationFactorMin rplMax := pin.ReplicationFactorMax if rplMin == 0 { @@ -935,9 +1015,26 @@ func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID) pin.ReplicationFactorMax = rplMax } + // Validate pin + if pin.Cid == nil { + return false, errors.New("bad pin object") + } if err := isReplicationFactorValid(rplMin, rplMax); err != nil { return false, err } + err := c.validatePin(pin, rplMin, rplMax) + if err != nil { + return false, err + } + if pin.Type == api.MetaType { + return true, c.consensus.LogPin(pin) + } + + // Ensure parents do not overwrite existing and merge non-intersecting + err = c.updatePinParents(&pin) + if err != nil { + return false, err + } switch { case rplMin == -1 && rplMax == -1: @@ -973,16 +1070,93 @@ func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID) // of underlying IPFS daemon unpinning operations. func (c *Cluster) Unpin(h *cid.Cid) error { logger.Info("IPFS cluster unpinning:", h) - - pin := api.Pin{ - Cid: h, - } - - err := c.consensus.LogUnpin(pin) + cState, err := c.consensus.State() if err != nil { return err } - return nil + + if !cState.Has(h) { + return errors.New("cannot unpin pin uncommitted to state") + } + pin := cState.Get(h) + + switch pin.Type { + case api.DataType: + return c.consensus.LogUnpin(pin) + case api.ShardType: + err := "unpinning shard cid %s before unpinning parent" + return errors.New(err) + case api.MetaType: + // Unpin cluster dag and referenced shards + err := c.unpinClusterDag(pin) + if err != nil { + return err + } + return c.consensus.LogUnpin(pin) + case api.CdagType: + err := "unpinning cluster dag root %s before unpinning parent" + return errors.New(err) + default: + return errors.New("unrecognized pin type") + } +} + +// unpinClusterDag unpins the clusterDAG metadata node and the shard metadata +// nodes that it references. It handles the case where multiple parents +// reference the same metadata node, only unpinning those nodes without +// existing references +func (c *Cluster) unpinClusterDag(metaPin api.Pin) error { + if metaPin.Clusterdag == nil { + return errors.New("metaPin not linked to clusterdag") + } + + cdagBytes, err := c.ipfs.BlockGet(metaPin.Clusterdag) + if err != nil { + return err + } + cdag, err := sharder.CborDataToNode(cdagBytes, "cbor") + if err != nil { + return err + } + + // traverse all shards of cdag + for _, shardLink := range cdag.Links() { + err = c.unpinShard(metaPin.Clusterdag, shardLink.Cid) + if err != nil { + return err + } + } + + // by invariant in Pin cdag has only one parent and can be unpinned + cdagWrap := api.PinCid(metaPin.Clusterdag) + return c.consensus.LogUnpin(cdagWrap) +} + +func (c *Cluster) unpinShard(cdagCid, shardCid *cid.Cid) error { + cState, err := c.consensus.State() + if err != nil { + return err + } + if !cState.Has(cdagCid) || !cState.Has(shardCid) { + return errors.New("nodes of the clusterdag are not committed to the state") + } + shardPin := cState.Get(shardCid) + if shardPin.Parents == nil || !shardPin.Parents.Has(cdagCid) { + return errors.New("clusterdag references shard node but shard node does not reference clusterdag as parent") + } + // Remove the parent from the shardPin + for _, c := range shardPin.Parents.Keys() { //parents non-nil by check above + if c.String() == cdagCid.String() { + shardPin.Parents.Remove(c) + break + } + } + + // Recommit state if other references exist + if shardPin.Parents.Len() > 0 { + return c.consensus.LogPin(shardPin) + } + return c.consensus.LogUnpin(shardPin) } // Version returns the current IPFS Cluster version. diff --git a/cluster_test.go b/cluster_test.go index 074ea446..b80a9663 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -90,7 +90,18 @@ func (ipfs *mockConnector) ConnectSwarms() error { retu func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil } func (ipfs *mockConnector) FreeSpace() (uint64, error) { return 100, nil } func (ipfs *mockConnector) RepoSize() (uint64, error) { return 0, nil } -func (ipfs *mockConnector) BlockPut(bwf api.NodeWithMeta) (string, error) { return "", nil } +func (ipfs *mockConnector) BlockPut(nwm api.NodeWithMeta) (string, error) { return "", nil } + +func (ipfs *mockConnector) BlockGet(c *cid.Cid) ([]byte, error) { + switch c.String() { + case test.TestShardCid: + return test.TestShardData, nil + case test.TestCdagCid: + return test.TestCdagData, nil + default: + return nil, errors.New("block not found") + } +} func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate.MapState, *maptracker.MapPinTracker) { clusterCfg, _, _, consensusCfg, trackerCfg, bmonCfg, psmonCfg, _, sharderCfg := testingConfigs() @@ -231,6 +242,103 @@ func TestClusterPin(t *testing.T) { } } +func pinDirectShard(t *testing.T, cl *Cluster) { + cShard, _ := cid.Decode(test.TestShardCid) + cCdag, _ := cid.Decode(test.TestCdagCid) + cMeta, _ := cid.Decode(test.TestMetaRootCid) + parents := cid.NewSet() + parents.Add(cCdag) + shardPin := api.Pin{ + Cid: cShard, + Type: api.ShardType, + ReplicationFactorMin: -1, + ReplicationFactorMax: -1, + Recursive: true, + Parents: parents, + } + err := cl.Pin(shardPin) + if err != nil { + t.Fatal("pin should have worked:", err) + } + + parents = cid.NewSet() + parents.Add(cMeta) + cdagPin := api.Pin{ + Cid: cCdag, + Type: api.CdagType, + ReplicationFactorMin: -1, + ReplicationFactorMax: -1, + Recursive: false, + Parents: parents, + Clusterdag: cShard, + } + err = cl.Pin(cdagPin) + if err != nil { + t.Fatal("pin should have worked:", err) + } + + metaPin := api.Pin{ + Cid: cMeta, + Type: api.MetaType, + Clusterdag: cCdag, + } + err = cl.Pin(metaPin) + if err != nil { + t.Fatal("pin should have worked:", err) + } +} + +func TestClusterPinMeta(t *testing.T) { + cl, _, _, _, _ := testingCluster(t) + defer cleanRaft() + defer cl.Shutdown() + + pinDirectShard(t, cl) +} + +func TestClusterUnpinShardFail(t *testing.T) { + cl, _, _, _, _ := testingCluster(t) + defer cleanRaft() + defer cl.Shutdown() + + pinDirectShard(t, cl) + // verify pins + if len(cl.Pins()) != 3 { + t.Fatal("should have 3 pins") + } + // Unpinning metadata should fail + cShard, _ := cid.Decode(test.TestShardCid) + cCdag, _ := cid.Decode(test.TestCdagCid) + + err := cl.Unpin(cShard) + if err == nil { + t.Error("should error when unpinning shard") + } + err = cl.Unpin(cCdag) + if err == nil { + t.Error("should error when unpinning cluster dag") + } +} + +func TestClusterUnpinMeta(t *testing.T) { + cl, _, _, _, _ := testingCluster(t) + defer cleanRaft() + defer cl.Shutdown() + + pinDirectShard(t, cl) + // verify pins + if len(cl.Pins()) != 3 { + t.Fatal("should have 3 pins") + } + // Unpinning from root should work + cMeta, _ := cid.Decode(test.TestMetaRootCid) + + err := cl.Unpin(cMeta) + if err != nil { + t.Error(err) + } +} + func TestClusterPins(t *testing.T) { cl, _, _, _, _ := testingCluster(t) defer cleanRaft() @@ -283,12 +391,23 @@ func TestClusterUnpin(t *testing.T) { defer cl.Shutdown() c, _ := cid.Decode(test.TestCid1) + // Unpin should error without pin being committed to state err := cl.Unpin(c) + if err == nil { + t.Error("unpin should have failed") + } + + // Unpin after pin should succeed + err = cl.Pin(api.PinCid(c)) if err != nil { t.Fatal("pin should have worked:", err) } + err = cl.Unpin(c) + if err != nil { + t.Error("unpin should have worked:", err) + } - // test an error case + // test another error case cl.consensus.Shutdown() err = cl.Unpin(c) if err == nil { diff --git a/consensus/raft/consensus_test.go b/consensus/raft/consensus_test.go index 843d3f90..310bc8c7 100644 --- a/consensus/raft/consensus_test.go +++ b/consensus/raft/consensus_test.go @@ -119,6 +119,51 @@ func TestConsensusUnpin(t *testing.T) { } } +func TestConsensusUpdate(t *testing.T) { + cc := testingConsensus(t, 1) + defer cleanRaft(1) + defer cc.Shutdown() + + // Pin first + c1, _ := cid.Decode(test.TestCid1) + pin := api.Pin{ + Cid: c1, + Type: api.ShardType, + ReplicationFactorMin: -1, + ReplicationFactorMax: -1, + Parents: nil, + } + err := cc.LogPin(pin) + if err != nil { + t.Fatal("the initial operation did not make it to the log:", err) + } + time.Sleep(250 * time.Millisecond) + + // Update pin + c2, _ := cid.Decode(test.TestCid2) + pin.Parents = cid.NewSet() + pin.Parents.Add(c2) + err = cc.LogPin(pin) + if err != nil { + t.Error("the update op did not make it to the log:", err) + } + + time.Sleep(250 * time.Millisecond) + st, err := cc.State() + if err != nil { + t.Fatal("error getting state:", err) + } + + pins := st.List() + if len(pins) != 1 || pins[0].Cid.String() != test.TestCid1 { + t.Error("the added pin should be in the state") + } + if pins[0].Parents == nil || pins[0].Parents.Len() != 1 || + !pins[0].Parents.Has(c2) { + t.Error("pin updated incorrectly") + } +} + func TestConsensusAddPeer(t *testing.T) { cc := testingConsensus(t, 1) cc2 := testingConsensus(t, 2) diff --git a/consensus/raft/log_op.go b/consensus/raft/log_op.go index c2dc66d5..f4f26af3 100644 --- a/consensus/raft/log_op.go +++ b/consensus/raft/log_op.go @@ -61,7 +61,6 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) { op.Cid, &struct{}{}, nil) - default: logger.Error("unknown LogOp type. Ignoring") } diff --git a/ipfs-cluster-ctl/formatters.go b/ipfs-cluster-ctl/formatters.go index 00dbf35a..d80d79f3 100644 --- a/ipfs-cluster-ctl/formatters.go +++ b/ipfs-cluster-ctl/formatters.go @@ -181,6 +181,28 @@ func textFormatPrintPin(obj *api.PinSerial) { obj.ReplicationFactorMin, obj.ReplicationFactorMax, sortAlloc) } + var recStr string + if obj.Recursive { + recStr = "Recursive" + } else { + recStr = "Non-recursive" + } + fmt.Printf("| %s | ", recStr) + + pinType := obj.ToPin().Type + typeStr := pinType.String() + var infoStr string + switch pinType { + case api.DataType: + infoStr = typeStr + case api.MetaType: + infoStr = fmt.Sprintf("%s-- clusterDAG=%s",typeStr, obj.Clusterdag) + case api.CdagType, api.ShardType: + infoStr = typeStr + default: + infoStr = "" + } + fmt.Printf("| %s ", infoStr) } func textFormatPrintAddedOutput(obj *api.AddedOutput) { diff --git a/ipfs-cluster-ctl/main.go b/ipfs-cluster-ctl/main.go index d3f6820a..980163e1 100644 --- a/ipfs-cluster-ctl/main.go +++ b/ipfs-cluster-ctl/main.go @@ -446,9 +446,16 @@ which peers they are currently allocated. This list does not include any monitoring information about the IPFS status of the CIDs, it merely represents the list of pins which are part of the shared state of the cluster. For IPFS-status information about the pins, use "status". +Metadata CIDs used to track sharded files are hidden by default. To view +all CIDs call with the -a flag. `, ArgsUsage: "[CID]", - Flags: []cli.Flag{}, + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "all, a", + Usage: "display hidden CIDs", + }, + }, Action: func(c *cli.Context) error { cidStr := c.Args().First() if cidStr != "" { @@ -457,7 +464,11 @@ the cluster. For IPFS-status information about the pins, use "status". resp, cerr := globalClient.Allocation(ci) formatResponse(c, resp, cerr) } else { - resp, cerr := globalClient.Allocations() + filter := api.PinType(api.DataType) + if c.Bool("all") { + filter = api.PinType(api.AllType) + } + resp, cerr := globalClient.Allocations(filter) formatResponse(c, resp, cerr) } return nil @@ -472,7 +483,7 @@ the cluster. For IPFS-status information about the pins, use "status". This command retrieves the status of the CIDs tracked by IPFS Cluster, including which member is pinning them and any errors. If a CID is provided, the status will be only fetched for a single -item. +item. Metadata CIDs are included in the status response The status of a CID may not be accurate. A manual sync can be triggered with "sync". diff --git a/ipfscluster.go b/ipfscluster.go index 0f0d4e45..47a9421b 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -92,6 +92,8 @@ type IPFSConnector interface { RepoSize() (uint64, error) // BlockPut directly adds a block of data to the IPFS repo BlockPut(api.NodeWithMeta) (string, error) + // BlockGet retrieves the raw data of an IPFS block + BlockGet(*cid.Cid) ([]byte, error) } // Peered represents a component which needs to be aware of the peers diff --git a/ipfscluster_test.go b/ipfscluster_test.go index 13ed2c66..44b57ad4 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -426,18 +426,22 @@ func TestClustersPin(t *testing.T) { pinList := clusters[0].Pins() for i := 0; i < len(pinList); i++ { + // test re-unpin fails j := rand.Intn(nClusters) // choose a random cluster peer err := clusters[j].Unpin(pinList[i].Cid) if err != nil { t.Errorf("error unpinning %s: %s", pinList[i].Cid, err) } - // test re-unpin - err = clusters[j].Unpin(pinList[i].Cid) - if err != nil { - t.Errorf("error re-unpinning %s: %s", pinList[i].Cid, err) - } - } + delay() + for i := 0; i < nPins; i++ { + j := rand.Intn(nClusters) // choose a random cluster peer + err := clusters[j].Unpin(pinList[i].Cid) + if err == nil { + t.Errorf("expected error re-unpinning %s: %s", pinList[i].Cid, err) + } + } + delay() funpinned := func(t *testing.T, c *Cluster) { status := c.tracker.StatusAll() @@ -1017,6 +1021,7 @@ func TestClustersReplicationFactorMaxLower(t *testing.T) { Cid: h, ReplicationFactorMax: 2, ReplicationFactorMin: 1, + Type: api.DataType, }) if err != nil { t.Fatal(err) diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 5d7f8bd1..fcb63f6c 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -1013,3 +1013,9 @@ func (ipfs *Connector) BlockPut(b api.NodeWithMeta) (string, error) { } return keyRaw.Key, nil } + +// BlockGet retrieves an ipfs block with the given cid +func (ipfs *Connector) BlockGet(c *cid.Cid) ([]byte, error) { + url := "block/get?arg=" + c.String() + return ipfs.post(url, "", nil) +} diff --git a/ipfsconn/ipfshttp/ipfshttp_test.go b/ipfsconn/ipfshttp/ipfshttp_test.go index d32e829e..2ff0c240 100644 --- a/ipfsconn/ipfshttp/ipfshttp_test.go +++ b/ipfsconn/ipfshttp/ipfshttp_test.go @@ -724,6 +724,20 @@ func TestBlockPut(t *testing.T) { } } +func TestBlockGet(t *testing.T) { + ipfs, mock := testIPFSConnector(t) + defer mock.Close() + defer ipfs.Shutdown() + shardCid, err := cid.Decode(test.TestShardCid) + data, err := ipfs.BlockGet(shardCid) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(data, test.TestShardData) { + t.Fatal("unexpected data returned") + } +} + func TestRepoSize(t *testing.T) { ctx := context.Background() ipfs, mock := testIPFSConnector(t) diff --git a/pintracker/maptracker/maptracker.go b/pintracker/maptracker/maptracker.go index bf027d98..0827b889 100644 --- a/pintracker/maptracker/maptracker.go +++ b/pintracker/maptracker/maptracker.go @@ -199,6 +199,27 @@ func (mpt *MapPinTracker) Track(c api.Pin) error { return nil } + // TODO: Fix this for sharding + // FIXME: Fix this for sharding + // The problem is remote/unpin operation won't be cancelled + // but I don't know how bad is that + // Also, this is dup code + if c.Type == ShardType { + // cancel any other ops + op := mpt.optracker.TrackNewOperation(c, optracker.OperationSharded, optracker.PhaseInProgress) + if op == nil { + return nil + } + err := mpt.unpin(op) + op.Cancel() + if err != nil { + op.SetError(err) + } else { + op.SetPhase(optracker.PhaseDone) + } + return nil + } + return mpt.enqueue(c, optracker.OperationPin, mpt.pinCh) } diff --git a/rpc_api.go b/rpc_api.go index 953e4817..2dedf8ce 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -303,12 +303,20 @@ func (rpcapi *RPCAPI) IPFSSwarmPeers(ctx context.Context, in struct{}, out *api. } // IPFSBlockPut runs IPFSConnector.BlockPut(). -func (rpcapi *RPCAPI) IPFSBlockPut(in api.NodeWithMeta, out *string) error { +func (rpcapi *RPCAPI) IPFSBlockPut(ctx context.Context, in api.NodeWithMeta, out *string) error { res, err := rpcapi.c.ipfs.BlockPut(in) *out = res return err } +// IPFSBlockGet runs IPFSConnector.BlockGet(). +func (rpcapi *RPCAPI) IPFSBlockGet(ctx context.Context, in api.PinSerial, out *[]byte) error { + c := in.ToPin().Cid + res, err := rpcapi.c.ipfs.BlockGet(c) + *out = res + return err +} + /* Consensus component methods */ @@ -347,14 +355,14 @@ func (rpcapi *RPCAPI) ConsensusPeers(ctx context.Context, in struct{}, out *[]pe */ // SharderAddNode runs Sharder.AddNode(node). -func (rpcapi *RPCAPI) SharderAddNode(in api.NodeWithMeta, out *string) error { +func (rpcapi *RPCAPI) SharderAddNode(ctx context.Context, in api.NodeWithMeta, out *string) error { shardID, err := rpcapi.c.sharder.AddNode(in.Size, in.Data, in.Cid, in.ID) *out = shardID return err } // SharderFinalize runs Sharder.Finalize(). -func (rpcapi *RPCAPI) SharderFinalize(in string, out *struct{}) error { +func (rpcapi *RPCAPI) SharderFinalize(ctx context.Context, in string, out *struct{}) error { return rpcapi.c.sharder.Finalize(in) } diff --git a/sharder/clusterdag.go b/sharder/clusterdag.go index d728a39f..8ad0dcc5 100644 --- a/sharder/clusterdag.go +++ b/sharder/clusterdag.go @@ -1,6 +1,9 @@ package sharder -// clusterdag.go defines functions for handling edge cases where clusterDAG +// clusterdag.go defines functions for constructing and parsing ipld-cbor nodes +// of the clusterDAG used to track sharded DAGs in ipfs-cluster + +// Most logic goes into handling the edge cases in which clusterDAG // metadata for a single shard cannot fit within a single shard node. We // make the following simplifying assumption: a single shard will not track // more than 35,808,256 links (~2^25). This is the limit at which the current @@ -16,15 +19,45 @@ package sharder import ( "fmt" + blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" + dag "github.com/ipfs/go-ipfs/merkledag" cbor "github.com/ipfs/go-ipld-cbor" ipld "github.com/ipfs/go-ipld-format" mh "github.com/multiformats/go-multihash" ) +func init() { + ipld.Register(cid.DagProtobuf, dag.DecodeProtobufBlock) + ipld.Register(cid.Raw, dag.DecodeRawBlock) + ipld.Register(cid.DagCBOR, cbor.DecodeBlock) // need to decode CBOR +} + // MaxLinks is the max number of links that, when serialized fit into a block const MaxLinks = 5984 const fixedPerLink = 40 +const hashFn = mh.SHA2_256 + +// CborDataToNode parses cbor data into a clusterDAG node while making a few +// checks +func CborDataToNode(raw []byte, format string) (ipld.Node, error) { + if format != "cbor" { + return nil, fmt.Errorf("unexpected shard node format %s", format) + } + shardCid, err := cid.NewPrefixV1(cid.DagCBOR, hashFn).Sum(raw) + if err != nil { + return nil, err + } + shardBlk, err := blocks.NewBlockWithCid(raw, shardCid) + if err != nil { + return nil, err + } + shardNode, err := ipld.Decode(shardBlk) + if err != nil { + return nil, err + } + return shardNode, nil +} // makeDAG parses a shardObj which stores all of the node-links a shardDAG // is responsible for tracking. In general a single node of links may exceed @@ -36,8 +69,8 @@ const fixedPerLink = 40 func makeDAG(obj shardObj) ([]ipld.Node, error) { // No indirect node if len(obj) <= MaxLinks { - node, err := cbor.WrapObject(obj, mh.SHA2_256, - mh.DefaultLengths[mh.SHA2_256]) + node, err := cbor.WrapObject(obj, hashFn, + mh.DefaultLengths[hashFn]) if err != nil { return nil, err } @@ -59,16 +92,16 @@ func makeDAG(obj shardObj) ([]ipld.Node, error) { } leafObj[fmt.Sprintf("%d", j)] = c } - leafNode, err := cbor.WrapObject(leafObj, mh.SHA2_256, - mh.DefaultLengths[mh.SHA2_256]) + leafNode, err := cbor.WrapObject(leafObj, hashFn, + mh.DefaultLengths[hashFn]) if err != nil { return nil, err } indirectObj[fmt.Sprintf("%d", i)] = leafNode.Cid() leafNodes = append(leafNodes, leafNode) } - indirectNode, err := cbor.WrapObject(indirectObj, mh.SHA2_256, - mh.DefaultLengths[mh.SHA2_256]) + indirectNode, err := cbor.WrapObject(indirectObj, hashFn, + mh.DefaultLengths[hashFn]) if err != nil { return nil, err } diff --git a/sharder/sharder_test.go b/sharder/sharder_test.go index 3a0e603a..3e71a564 100644 --- a/sharder/sharder_test.go +++ b/sharder/sharder_test.go @@ -10,7 +10,6 @@ import ( blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" dag "github.com/ipfs/go-ipfs/merkledag" - cbor "github.com/ipfs/go-ipld-cbor" ipld "github.com/ipfs/go-ipld-format" "github.com/ipfs/ipfs-cluster/api" crypto "github.com/libp2p/go-libp2p-crypto" @@ -20,15 +19,8 @@ import ( swarm "github.com/libp2p/go-libp2p-swarm" basichost "github.com/libp2p/go-libp2p/p2p/host/basic" ma "github.com/multiformats/go-multiaddr" - mh "github.com/multiformats/go-multihash" ) -func init() { - ipld.Register(cid.DagProtobuf, dag.DecodeProtobufBlock) - ipld.Register(cid.Raw, dag.DecodeRawBlock) - ipld.Register(cid.DagCBOR, cbor.DecodeBlock) // need to decode CBOR -} - var nodeDataSet1 = [][]byte{[]byte(`Dag Node 1`), []byte(`Dag Node 2`), []byte(`Dag Node 3`)} var nodeDataSet2 = [][]byte{[]byte(`Dag Node A`), []byte(`Dag Node B`), []byte(`Dag Node C`)} @@ -81,18 +73,18 @@ func makeTestingHost() host.Host { } // GetInformerMetrics does nothing as mock allocator does not check metrics -func (mock *mockRPC) GetInformerMetrics(in struct{}, out *[]api.Metric) error { +func (mock *mockRPC) GetInformerMetrics(ctx context.Context, in struct{}, out *[]api.Metric) error { return nil } // All pins get allocated to the mockRPC's server host -func (mock *mockRPC) Allocate(in api.AllocateInfo, out *[]peer.ID) error { +func (mock *mockRPC) Allocate(ctx context.Context, in api.AllocateInfo, out *[]peer.ID) error { *out = []peer.ID{mock.Host.ID()} return nil } // Record the ordered sequence of BlockPut calls for later validation -func (mock *mockRPC) IPFSBlockPut(in api.NodeWithMeta, out *string) error { +func (mock *mockRPC) IPFSBlockPut(ctx context.Context, in api.NodeWithMeta, out *string) error { mock.orderedPuts[len(mock.orderedPuts)] = in return nil } @@ -101,7 +93,7 @@ func (mock *mockRPC) IPFSBlockPut(in api.NodeWithMeta, out *string) error { // TODO: once the sharder Pinning is stabalized (support for pinning to // specific peers and non-recursive pinning through RPC) we should validate // pinning calls alongside block put calls -func (mock *mockRPC) Pin(in api.PinSerial, out *struct{}) error { +func (mock *mockRPC) Pin(ctx context.Context, in api.PinSerial, out *struct{}) error { return nil } @@ -230,18 +222,7 @@ func verifyNodePuts(t *testing.T, } func cborDataToNode(t *testing.T, putInfo api.NodeWithMeta) ipld.Node { - if putInfo.Format != "cbor" { - t.Fatalf("Unexpected shard node format %s", putInfo.Format) - } - shardCid, err := cid.NewPrefixV1(cid.DagCBOR, mh.SHA2_256).Sum(putInfo.Data) - if err != nil { - t.Fatal(err) - } - shardBlk, err := blocks.NewBlockWithCid(putInfo.Data, shardCid) - if err != nil { - t.Fatal(err) - } - shardNode, err := ipld.Decode(shardBlk) + shardNode, err := CborDataToNode(putInfo.Data, putInfo.Format) if err != nil { t.Fatal(err) } diff --git a/state/mapstate/migrate.go b/state/mapstate/migrate.go index c7475e66..b4b263b1 100644 --- a/state/mapstate/migrate.go +++ b/state/mapstate/migrate.go @@ -116,6 +116,9 @@ func (st *mapStateV3) next() migrateable { ReplicationFactorMin: v.ReplicationFactorMin, ReplicationFactorMax: v.ReplicationFactorMax, Recursive: true, + Type: api.DataType, + Parents: nil, + Clusterdag: "", } } return &mst4 diff --git a/test/cids.go b/test/cids.go index f53c6347..fb48cec0 100644 --- a/test/cids.go +++ b/test/cids.go @@ -1,6 +1,8 @@ package test import ( + "encoding/hex" + cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-peer" ) @@ -15,7 +17,15 @@ var ( TestSlowCid1 = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd" // ErrorCid is meant to be used as a Cid which causes errors. i.e. the // ipfs mock fails when pinning this CID. - ErrorCid = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmc" + ErrorCid = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmc" + // Shard and Cdag Cids + TestShardCid = "zdpuAoiNm1ntWx6jpgcReTiCWFHJSTpvTw4bAAn9p6yDnznqh" + TestCdagCid = "zdpuApF6HZBu8rscHSVJ7ra3VSYWc5dJnnxt42bGKyZ1a4qPo" + TestMetaRootCid = "QmYCLpFCj9Av8NFjkQogvtXspnTDFWaizLpVFEijHTH4eV" + + TestShardData, _ = hex.DecodeString("a16130d82a58230012209273fd63ec94bed5abb219b2d9cb010cabe4af7b0177292d4335eff50464060a") + TestCdagData, _ = hex.DecodeString("a16130d82a5825000171122030e9b9b4f1bc4b5a3759a93b4e77983cd053f84174e1b0cd628dc6c32fb0da14") + TestPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") TestPeerID2, _ = peer.IDB58Decode("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6") TestPeerID3, _ = peer.IDB58Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa") diff --git a/test/ipfs_mock.go b/test/ipfs_mock.go index d5f07f0f..2c8932e7 100644 --- a/test/ipfs_mock.go +++ b/test/ipfs_mock.go @@ -255,6 +255,23 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { } j, _ := json.Marshal(resp) w.Write(j) + case "block/get": + query := r.URL.Query() + arg, ok := query["arg"] + if !ok { + goto ERROR + } + if len(arg) != 1 { + goto ERROR + } + switch arg[0] { + case TestShardCid: + w.Write(TestShardData) + case TestCdagCid: + w.Write(TestCdagData) + default: + goto ERROR + } case "repo/stat": len := len(m.pinMap.List()) resp := mockRepoStatResp{ diff --git a/util.go b/util.go index ccb07d48..3208531a 100644 --- a/util.go +++ b/util.go @@ -6,6 +6,7 @@ import ( "github.com/ipfs/ipfs-cluster/api" + cid "github.com/ipfs/go-cid" host "github.com/libp2p/go-libp2p-host" peer "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" @@ -94,6 +95,15 @@ func containsPeer(list []peer.ID, peer peer.ID) bool { return false } +func containsCid(list []*cid.Cid, ci *cid.Cid) bool { + for _, c := range list { + if c.String() == ci.String() { + return true + } + } + return false +} + func minInt(x, y int) int { if x < y { return x