Pin data structure updated to support sharding

4 PinTypes specify how a CID is pinned
Changes to Pin and Unpin to handle the different PinTypes
Tests for the different PinTypes
Migration to the new state format using the new Pin data structures
PinTypes used internally are hidden from listings by default (the four types are sketched below)

License: MIT
Signed-off-by: Wyatt Daviau <wdaviau@cs.stanford.edu>
Wyatt Daviau 2018-03-18 15:29:02 -04:00 committed by Hector Sanjuan
parent 2d47e09d7c
commit 238f3726f3
24 changed files with 748 additions and 68 deletions
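Before the per-file diffs, a hedged sketch of how the four PinTypes described in the commit message relate to one another when a sharded file is pinned. Field names follow the api.Pin struct added in this commit; the CIDs, package name and helper function are illustrative placeholders, not part of the change.

// Sketch only: assumes the api.Pin fields and PinType constants from this diff.
package example

import (
	cid "github.com/ipfs/go-cid"
	"github.com/ipfs/ipfs-cluster/api"
)

// shardedPins builds the metadata pins for one sharded file:
// shard -> clusterDAG node -> user-facing metadata root.
// An ordinary user pin is simply api.PinCid(c): DataType and recursive.
func shardedPins(root, cdag, shard *cid.Cid) []api.Pin {
	shardParents := cid.NewSet()
	shardParents.Add(cdag) // the clusterDAG node references this shard
	cdagParents := cid.NewSet()
	cdagParents.Add(root) // the metadata root references the clusterDAG node
	return []api.Pin{
		// ShardType: recursively pinned chunk of the user's DAG.
		{Cid: shard, Type: api.ShardType, Recursive: true, Parents: shardParents,
			ReplicationFactorMin: -1, ReplicationFactorMax: -1},
		// CdagType: non-recursive metadata node linking the shards.
		{Cid: cdag, Type: api.CdagType, Recursive: false, Parents: cdagParents,
			Clusterdag: shard, ReplicationFactorMin: -1, ReplicationFactorMax: -1},
		// MetaType: tracks the user's root cid without entering the ipfs pinset.
		{Cid: root, Type: api.MetaType, Clusterdag: cdag},
	}
}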


@ -83,9 +83,9 @@ func (c *Client) Unpin(ci *cid.Cid) error {
// Allocations returns the consensus state listing all tracked items and
// the peers that should be pinning them.
func (c *Client) Allocations() ([]api.Pin, error) {
func (c *Client) Allocations(pinType api.PinType) ([]api.Pin, error) {
var pins []api.PinSerial
err := c.do("GET", "/allocations", nil, &pins)
err := c.do("GET", fmt.Sprintf("/allocations?pintype=%s", pinType.String()), nil, &pins)
result := make([]api.Pin, len(pins))
for i, p := range pins {
result[i] = p.ToPin()


@ -8,6 +8,9 @@ import (
rpc "github.com/hsanjuan/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
cid "github.com/ipfs/go-cid"
types "github.com/ipfs/ipfs-cluster/api"
ma "github.com/multiformats/go-multiaddr"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/api/rest"
@ -166,7 +169,7 @@ func TestAllocations(t *testing.T) {
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
pins, err := c.Allocations()
pins, err := c.Allocations(types.PinType(types.AllType))
if err != nil {
t.Fatal(err)
}


@ -776,13 +776,33 @@ func (api *API) unpinHandler(w http.ResponseWriter, r *http.Request) {
}
}
// filterOutPin returns true if the given pin should be filtered out according
// to the input filter type
func (api *API) filterOutPin(filter types.PinType, pin types.Pin) bool {
if filter == types.AllType {
return false
}
return pin.Type != filter
}
func (api *API) allocationsHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
pintype := queryValues.Get("pintype")
filter := types.PinTypeFromString(pintype)
var pins []types.PinSerial
err := api.rpcClient.Call("",
err := api.rpcClient.Call(
"",
"Cluster",
"Pins",
struct{}{},
&pins)
&pins,
)
// keep only pins that pass the filter; removing entries in place while
// ranging over the same slice would skip elements
filtered := pins[:0]
for _, pinS := range pins {
if !api.filterOutPin(filter, pinS.ToPin()) {
filtered = append(filtered, pinS)
}
}
pins = filtered
sendResponse(w, err, pins)
}
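A hedged usage sketch from the client side of this change: the filter travels as the pintype query parameter, exactly as the handler above expects. The client variable c and the error handling are placeholders.

// Sketch: list only user data pins, or everything including metadata pins.
dataPins, err := c.Allocations(api.DataType) // GET /allocations?pintype=pin
if err != nil {
	// handle error
}
allPins, err := c.Allocations(api.AllType) // GET /allocations?pintype=all
_, _ = dataPins, allPins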
@ -951,6 +971,7 @@ func parseCidOrError(w http.ResponseWriter, r *http.Request) types.PinSerial {
pin := types.PinSerial{
Cid: hash,
Type: types.DataType,
}
queryValues := r.URL.Query()


@ -398,7 +398,7 @@ func TestAPIAllocationsEndpoint(t *testing.T) {
tf := func(t *testing.T, url urlF) {
var resp []api.PinSerial
makeGet(t, rest, url(rest)+"/allocations", &resp)
makeGet(t, rest, url(rest)+"/allocations?a=false", &resp)
if len(resp) != 3 ||
resp[0].Cid != test.TestCid1 || resp[1].Cid != test.TestCid2 ||
resp[2].Cid != test.TestCid3 {


@ -53,6 +53,9 @@ const (
TrackerStatusPinQueued
// The item has been queued for unpinning on the IPFS daemon
TrackerStatusUnpinQueued
// The IPFS daemon is not pinning the item through this cid but it is
// tracked in a cluster dag
TrackerStatusSharded
)
// TrackerStatus represents the status of a tracked Cid in the PinTracker
@ -515,22 +518,133 @@ func (addrsS MultiaddrsSerial) ToMultiaddrs() []ma.Multiaddr {
return addrs
}
// PeersToStrings IDB58Encodes a list of peers.
func PeersToStrings(peers []peer.ID) []string {
strs := make([]string, len(peers))
for i, p := range peers {
if p != "" {
strs[i] = peer.IDB58Encode(p)
}
}
return strs
}
// StringsToPeers decodes peer.IDs from strings.
func StringsToPeers(strs []string) []peer.ID {
peers := make([]peer.ID, len(strs))
for i, p := range strs {
var err error
peers[i], err = peer.IDB58Decode(p)
if err != nil {
logger.Error(p, err)
}
}
return peers
}
// CidsToStrings encodes cid.Cids to strings.
func CidsToStrings(cids []*cid.Cid) []string {
strs := make([]string, len(cids))
for i, c := range cids {
strs[i] = c.String()
}
return strs
}
// StringsToCidSet decodes cid.Cids from strings.
func StringsToCidSet(strs []string) *cid.Set {
cids := cid.NewSet()
for _, str := range strs {
c, err := cid.Decode(str)
if err != nil {
logger.Error(str, err)
}
cids.Add(c)
}
return cids
}
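A small hedged sketch of the round-trips these helpers are meant to provide (it would live next to them in package api; inputs come from elsewhere and decode failures are only logged, as above).

// Sketch: round-trips used by the Pin serialization code further down.
func roundTripHelpers(cids []*cid.Cid, peers []peer.ID) (*cid.Set, []peer.ID) {
	set := StringsToCidSet(CidsToStrings(cids))      // same members, as a *cid.Set
	decoded := StringsToPeers(PeersToStrings(peers)) // same peer.IDs back again
	return set, decoded
}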
// PinType values
const (
DataType PinType = iota + 1
MetaType
CdagType
ShardType
)
// AllType is a PinType used for filtering: it matches every pin type.
const AllType = -1
// PinType specifies which of four possible interpretations a pin represents.
// DataType pins are the simplest: they track user data directly in the ipfs
// pinset. ShardType pins are metadata pins that track many nodes of a user's
// data DAG; a shard has a parent pin and, in general, may have many parents.
// ClusterDAG pins (CdagType, "Cdag" for short) are also metadata pins, but
// they track other metadata pins rather than user data DAGs; a Cdag pin has
// at least one parent. Finally, MetaType pins track the cid of the root of a
// user's data DAG. MetaType pins are not stored directly in the ipfs pinset;
// instead the underlying DAG is tracked through the metadata pins referenced
// beneath the root's CdagType pin.
type PinType int
// PinTypeFromString is the inverse of String. It returns the PinType value
// corresponding to the input string
func PinTypeFromString(str string) PinType {
switch str {
case "pin":
return DataType
case "meta-pin":
return MetaType
case "clusterdag-pin":
return CdagType
case "shard-pin":
return ShardType
case "all":
return AllType
default:
return PinType(0) // invalid string
}
}
// String returns a printable value to identify the PinType
func (pT *PinType) String() string {
switch *pT {
case DataType:
return "pin"
case MetaType:
return "meta-pin"
case CdagType:
return "clusterdag-pin"
case ShardType:
return "shard-pin"
case AllType:
return "all"
default:
panic("String() called on invalid shard type")
}
}
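A small hedged sketch of the intended relationship between the two functions above (assumed to run inside package api; the function name is illustrative).

// Sketch: PinTypeFromString and String should be inverses for known values.
func checkPinTypeRoundTrip() {
	for _, s := range []string{"pin", "meta-pin", "clusterdag-pin", "shard-pin", "all"} {
		pt := PinTypeFromString(s)
		if pt.String() != s {
			panic("PinType string round-trip mismatch: " + s)
		}
	}
}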
// Pin is an argument that carries a Cid. It may carry more things in the
// future.
type Pin struct {
Cid *cid.Cid
Name string
Type PinType
Allocations []peer.ID
ReplicationFactorMin int
ReplicationFactorMax int
Recursive bool
Parents *cid.Set
Clusterdag *cid.Cid
}
// PinCid is a shortcut to create a Pin only with a Cid. Default is for pin to
// be recursive
// be recursive and the pin to be of DataType
func PinCid(c *cid.Cid) Pin {
return Pin{
Cid: c,
Type: DataType,
Allocations: []peer.ID{},
Recursive: true,
}
@ -540,10 +654,13 @@ func PinCid(c *cid.Cid) Pin {
type PinSerial struct {
Cid string `json:"cid"`
Name string `json:"name"`
Type int `json:"type"`
Allocations []string `json:"allocations"`
ReplicationFactorMin int `json:"replication_factor_min"`
ReplicationFactorMax int `json:"replication_factor_max"`
Recursive bool `json:"recursive"`
Parents []string `json:"parents"`
Clusterdag string `json:"clusterdag"`
}
// ToSerial converts a Pin to PinSerial.
@ -552,17 +669,28 @@ func (pin Pin) ToSerial() PinSerial {
if pin.Cid != nil {
c = pin.Cid.String()
}
cdag := ""
if pin.Clusterdag != nil {
cdag = pin.Clusterdag.String()
}
n := pin.Name
allocs := PeersToStrings(pin.Allocations)
var parents []string
if pin.Parents != nil {
parents = CidsToStrings(pin.Parents.Keys())
}
return PinSerial{
Cid: c,
Name: n,
Allocations: allocs,
Type: int(pin.Type),
ReplicationFactorMin: pin.ReplicationFactorMin,
ReplicationFactorMax: pin.ReplicationFactorMax,
Recursive: pin.Recursive,
Parents: parents,
Clusterdag: cdag,
}
}
@ -581,6 +709,10 @@ func (pin Pin) Equals(pin2 Pin) bool {
return false
}
if pin1s.Type != pin2s.Type {
return false
}
if pin1s.Recursive != pin2s.Recursive {
return false
}
@ -599,6 +731,18 @@ func (pin Pin) Equals(pin2 Pin) bool {
if pin1s.ReplicationFactorMin != pin2s.ReplicationFactorMin {
return false
}
if pin1s.Clusterdag != pin2s.Clusterdag {
return false
}
sort.Strings(pin1s.Parents)
sort.Strings(pin2s.Parents)
if strings.Join(pin1s.Parents, ",") != strings.Join(pin2s.Parents, ",") {
return false
}
return true
}
@ -608,14 +752,24 @@ func (pins PinSerial) ToPin() Pin {
if err != nil {
logger.Debug(pins.Cid, err)
}
var cdag *cid.Cid
if pins.Clusterdag != "" {
cdag, err = cid.Decode(pins.Clusterdag)
if err != nil {
logger.Error(pins.Clusterdag, err)
}
}
return Pin{
Cid: c,
Name: pins.Name,
Allocations: StringsToPeers(pins.Allocations),
Type: PinType(pins.Type),
ReplicationFactorMin: pins.ReplicationFactorMin,
ReplicationFactorMax: pins.ReplicationFactorMax,
Recursive: pins.Recursive,
Parents: StringsToCidSet(pins.Parents),
Clusterdag: cdag,
}
}
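For orientation, a hedged sketch of the wire shape the PinSerial struct above produces; the snippet assumes encoding/json is imported and the CID strings are placeholders.

// Sketch: marshalling a PinSerial with the new type/parents/clusterdag fields.
ps := PinSerial{
	Cid:                  "<root-cid>",
	Name:                 "example",
	Type:                 int(MetaType), // 2
	Allocations:          []string{},
	ReplicationFactorMin: -1,
	ReplicationFactorMax: -1,
	Recursive:            false,
	Parents:              []string{"<parent-cid>"},
	Clusterdag:           "<clusterdag-cid>",
}
out, _ := json.Marshal(ps)
// {"cid":"<root-cid>","name":"example","type":2,"allocations":[],
//  "replication_factor_min":-1,"replication_factor_max":-1,"recursive":false,
//  "parents":["<parent-cid>"],"clusterdag":"<clusterdag-cid>"}
_ = out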


@ -1,6 +1,7 @@
package api
import (
"fmt"
"reflect"
"testing"
"time"
@ -15,6 +16,9 @@ var testMAddr, _ = ma.NewMultiaddr("/ip4/1.2.3.4")
var testMAddr2, _ = ma.NewMultiaddr("/dns4/a.b.c.d")
var testMAddr3, _ = ma.NewMultiaddr("/ip4/127.0.0.1/tcp/8081/ws/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd")
var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
var testCid2, _ = cid.Decode("QmYCLpFCj9Av8NFjkQogvtXspnTDFWaizLpVFEijHTH4eV")
var testCid3, _ = cid.Decode("QmZmdA3UZKuHuy9FrWsxJ82q21nbEh97NUnxTzF5EHxZia")
var testCid4, _ = cid.Decode("QmZbNfi13Sb2WUDMjiW1ZNhnds5KDk6mJB5hP9B5h9m5CJ")
var testPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
var testPeerID2, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabd")
var testPeerID3, _ = peer.IDB58Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
@ -183,19 +187,37 @@ func TestPinConv(t *testing.T) {
}
}()
parents := cid.NewSet()
parents.Add(testCid2)
c := Pin{
Cid: testCid1,
Allocations: []peer.ID{testPeerID1},
ReplicationFactorMax: -1,
ReplicationFactorMin: -1,
Recursive: true,
Parents: parents,
Name: "A test pin",
Type: CdagType,
Clusterdag: testCid4,
}
newc := c.ToSerial().ToPin()
if c.Cid.String() != newc.Cid.String() ||
c.Allocations[0] != newc.Allocations[0] ||
c.ReplicationFactorMin != newc.ReplicationFactorMin ||
c.ReplicationFactorMax != newc.ReplicationFactorMax {
t.Error("mismatch")
c.ReplicationFactorMax != newc.ReplicationFactorMax ||
c.Recursive != newc.Recursive ||
c.Parents.Len() != newc.Parents.Len() ||
c.Parents.Keys()[0].String() != newc.Parents.Keys()[0].String() ||
c.Name != newc.Name || c.Type != newc.Type ||
c.Clusterdag.String() != newc.Clusterdag.String() {
fmt.Printf("c: %v\ncnew: %v\n", c, newc)
t.Fatal("mismatch")
}
if !c.Equals(newc) {
t.Error("all pin fields are equal but Equals returns false")
}
}


@ -10,12 +10,14 @@ import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/pstoremgr"
"github.com/ipfs/ipfs-cluster/rpcutil"
"github.com/ipfs/ipfs-cluster/sharder"
"github.com/ipfs/ipfs-cluster/state"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
p2praft "github.com/libp2p/go-libp2p-raft"
ma "github.com/multiformats/go-multiaddr"
)
@ -877,8 +879,8 @@ func (c *Cluster) Pins() []api.Pin {
logger.Error(err)
return []api.Pin{}
}
return cState.List()
}
// PinGet returns information for a single Cid managed by Cluster.
@ -916,14 +918,92 @@ func (c *Cluster) Pin(pin api.Pin) error {
return err
}
// validatePin ensures that the metadata accompanying the cid is
// self-consistent. This amounts to verifying that the data structure matches
// the expected form of the pinType carried in the pin.
func (c *Cluster) validatePin(pin api.Pin, rplMin, rplMax int) error {
switch pin.Type {
case api.DataType:
if pin.Clusterdag != nil ||
(pin.Parents != nil && pin.Parents.Len() != 0) {
return errors.New("data pins should not reference other pins")
}
case api.ShardType:
if !pin.Recursive {
return errors.New("must pin shards recursively")
}
// In general multiple clusterdags may reference the same shard
// and sharder sessions typically update a shard pin's metadata.
// Hence we check for an existing shard and carefully update.
cState, err := c.consensus.State()
if err != nil && err != p2praft.ErrNoState {
return err
}
if err == p2praft.ErrNoState || !cState.Has(pin.Cid) {
break
}
// State already tracks pin's CID
existing := cState.Get(pin.Cid)
// For now all repins of the same shard must use the same
// replmax and replmin. It is unclear what the best UX is here
// especially if the same Shard is referenced in multiple
// clusterdags. This simplistic policy avoids complexity and
// suits existing needs for shard pins.
if existing.ReplicationFactorMin != rplMin ||
existing.ReplicationFactorMax != rplMax {
return errors.New("shard update with wrong repl factors")
}
case api.CdagType:
if pin.Recursive {
return errors.New("must pin roots directly")
}
if pin.Clusterdag == nil {
return errors.New("roots must reference a dag")
}
if pin.Parents.Len() > 1 {
return errors.New("cdag nodes are referenced once")
}
case api.MetaType:
if len(pin.Allocations) != 0 {
return errors.New("meta pin should not specify allocations")
}
default:
return errors.New("unrecognized pin type")
}
return nil
}
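A hedged sketch of how the checks above classify a few inputs; the CID variables are placeholders and only the fields relevant to validation are set.

// Sketch: pins validatePin would accept or reject.
good := api.Pin{Cid: c1, Type: api.DataType, Recursive: true}
badData := api.Pin{Cid: c1, Type: api.DataType, Clusterdag: c2}     // data pins should not reference other pins
badShard := api.Pin{Cid: c3, Type: api.ShardType, Recursive: false} // must pin shards recursively
badCdag := api.Pin{Cid: c4, Type: api.CdagType, Recursive: false}   // roots must reference a dag (nil Clusterdag)
_, _, _, _ = good, badData, badShard, badCdag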
// updatePinParents modifies the api.Pin input to give it the correct parents
// so that previous additions to the pin's parents are maintained after this
// pin is committed to consensus. If this pin carries new parents, they are
// merged with those already existing for this CID.
func (c *Cluster) updatePinParents(pin *api.Pin) error {
cState, err := c.consensus.State()
if err != nil && err != p2praft.ErrNoState {
return err
}
// first pin of this cid, nothing to update
if err == p2praft.ErrNoState || !cState.Has(pin.Cid) {
return nil
}
existing := cState.Get(pin.Cid)
// no existing parents; this pin is up to date
if existing.Parents == nil || len(existing.Parents.Keys()) == 0 {
return nil
}
for _, c := range existing.Parents.Keys() {
pin.Parents.Add(c)
}
return nil
}
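A hedged sketch of the merge semantics (CIDs are placeholders): repinning a shard that already has parent A while carrying a new parent B leaves it with both.

// Sketch: state already holds this shard with Parents = {parentA};
// a new pin request arrives carrying Parents = {parentB}.
incoming := api.Pin{Cid: shardCid, Type: api.ShardType, Recursive: true, Parents: cid.NewSet()}
incoming.Parents.Add(parentB)
// After c.updatePinParents(&incoming), incoming.Parents contains both
// parentA (recovered from state) and parentB (from the request).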
// pin performs the actual pinning and supports a blacklist so that a node can
// be evacuated. It returns whether the pin was submitted to the consensus
// layer or skipped (due to an error or because it was already valid).
func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID) (bool, error) {
if pin.Cid == nil {
return false, errors.New("bad pin object")
}
// Determine repl factors
rplMin := pin.ReplicationFactorMin
rplMax := pin.ReplicationFactorMax
if rplMin == 0 {
@ -935,9 +1015,26 @@ func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID)
pin.ReplicationFactorMax = rplMax
}
// Validate pin
if pin.Cid == nil {
return false, errors.New("bad pin object")
}
if err := isReplicationFactorValid(rplMin, rplMax); err != nil {
return false, err
}
err := c.validatePin(pin, rplMin, rplMax)
if err != nil {
return false, err
}
if pin.Type == api.MetaType {
return true, c.consensus.LogPin(pin)
}
// Ensure parents do not overwrite existing and merge non-intersecting
err = c.updatePinParents(&pin)
if err != nil {
return false, err
}
switch {
case rplMin == -1 && rplMax == -1:
@ -973,16 +1070,93 @@ func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID)
// of underlying IPFS daemon unpinning operations.
func (c *Cluster) Unpin(h *cid.Cid) error {
logger.Info("IPFS cluster unpinning:", h)
pin := api.Pin{
Cid: h,
}
err := c.consensus.LogUnpin(pin)
cState, err := c.consensus.State()
if err != nil {
return err
}
return nil
if !cState.Has(h) {
return errors.New("cannot unpin pin uncommitted to state")
}
pin := cState.Get(h)
switch pin.Type {
case api.DataType:
return c.consensus.LogUnpin(pin)
case api.ShardType:
err := "unpinning shard cid %s before unpinning parent"
return errors.New(err)
case api.MetaType:
// Unpin cluster dag and referenced shards
err := c.unpinClusterDag(pin)
if err != nil {
return err
}
return c.consensus.LogUnpin(pin)
case api.CdagType:
err := "unpinning cluster dag root %s before unpinning parent"
return errors.New(err)
default:
return errors.New("unrecognized pin type")
}
}
// unpinClusterDag unpins the clusterDAG metadata node and the shard metadata
// nodes that it references. It handles the case where multiple parents
// reference the same metadata node, only unpinning those nodes without
// existing references
func (c *Cluster) unpinClusterDag(metaPin api.Pin) error {
if metaPin.Clusterdag == nil {
return errors.New("metaPin not linked to clusterdag")
}
cdagBytes, err := c.ipfs.BlockGet(metaPin.Clusterdag)
if err != nil {
return err
}
cdag, err := sharder.CborDataToNode(cdagBytes, "cbor")
if err != nil {
return err
}
// traverse all shards of cdag
for _, shardLink := range cdag.Links() {
err = c.unpinShard(metaPin.Clusterdag, shardLink.Cid)
if err != nil {
return err
}
}
// by invariant in Pin cdag has only one parent and can be unpinned
cdagWrap := api.PinCid(metaPin.Clusterdag)
return c.consensus.LogUnpin(cdagWrap)
}
func (c *Cluster) unpinShard(cdagCid, shardCid *cid.Cid) error {
cState, err := c.consensus.State()
if err != nil {
return err
}
if !cState.Has(cdagCid) || !cState.Has(shardCid) {
return errors.New("nodes of the clusterdag are not committed to the state")
}
shardPin := cState.Get(shardCid)
if shardPin.Parents == nil || !shardPin.Parents.Has(cdagCid) {
return errors.New("clusterdag references shard node but shard node does not reference clusterdag as parent")
}
// Remove the parent from the shardPin
for _, c := range shardPin.Parents.Keys() { //parents non-nil by check above
if c.String() == cdagCid.String() {
shardPin.Parents.Remove(c)
break
}
}
// Recommit state if other references exist
if shardPin.Parents.Len() > 0 {
return c.consensus.LogPin(shardPin)
}
return c.consensus.LogUnpin(shardPin)
}
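A hedged sketch of the resulting behaviour for a sharded file, mirroring the cluster tests later in this commit (cl, cShard and cMeta are placeholders).

// Sketch: shards and clusterDAG nodes cannot be unpinned directly;
// unpinning the user-facing metadata root cascades through unpinClusterDag.
if err := cl.Unpin(cShard); err == nil {
	// unexpected: shard unpin must be rejected while its parent exists
}
if err := cl.Unpin(cMeta); err != nil {
	// unexpected: unpinning the meta root also removes the cdag and its shards
}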
// Version returns the current IPFS Cluster version.


@ -90,7 +90,18 @@ func (ipfs *mockConnector) ConnectSwarms() error { retu
func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil }
func (ipfs *mockConnector) FreeSpace() (uint64, error) { return 100, nil }
func (ipfs *mockConnector) RepoSize() (uint64, error) { return 0, nil }
func (ipfs *mockConnector) BlockPut(bwf api.NodeWithMeta) (string, error) { return "", nil }
func (ipfs *mockConnector) BlockPut(nwm api.NodeWithMeta) (string, error) { return "", nil }
func (ipfs *mockConnector) BlockGet(c *cid.Cid) ([]byte, error) {
switch c.String() {
case test.TestShardCid:
return test.TestShardData, nil
case test.TestCdagCid:
return test.TestCdagData, nil
default:
return nil, errors.New("block not found")
}
}
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate.MapState, *maptracker.MapPinTracker) {
clusterCfg, _, _, consensusCfg, trackerCfg, bmonCfg, psmonCfg, _, sharderCfg := testingConfigs()
@ -231,6 +242,103 @@ func TestClusterPin(t *testing.T) {
}
}
func pinDirectShard(t *testing.T, cl *Cluster) {
cShard, _ := cid.Decode(test.TestShardCid)
cCdag, _ := cid.Decode(test.TestCdagCid)
cMeta, _ := cid.Decode(test.TestMetaRootCid)
parents := cid.NewSet()
parents.Add(cCdag)
shardPin := api.Pin{
Cid: cShard,
Type: api.ShardType,
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,
Recursive: true,
Parents: parents,
}
err := cl.Pin(shardPin)
if err != nil {
t.Fatal("pin should have worked:", err)
}
parents = cid.NewSet()
parents.Add(cMeta)
cdagPin := api.Pin{
Cid: cCdag,
Type: api.CdagType,
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,
Recursive: false,
Parents: parents,
Clusterdag: cShard,
}
err = cl.Pin(cdagPin)
if err != nil {
t.Fatal("pin should have worked:", err)
}
metaPin := api.Pin{
Cid: cMeta,
Type: api.MetaType,
Clusterdag: cCdag,
}
err = cl.Pin(metaPin)
if err != nil {
t.Fatal("pin should have worked:", err)
}
}
func TestClusterPinMeta(t *testing.T) {
cl, _, _, _, _ := testingCluster(t)
defer cleanRaft()
defer cl.Shutdown()
pinDirectShard(t, cl)
}
func TestClusterUnpinShardFail(t *testing.T) {
cl, _, _, _, _ := testingCluster(t)
defer cleanRaft()
defer cl.Shutdown()
pinDirectShard(t, cl)
// verify pins
if len(cl.Pins()) != 3 {
t.Fatal("should have 3 pins")
}
// Unpinning metadata should fail
cShard, _ := cid.Decode(test.TestShardCid)
cCdag, _ := cid.Decode(test.TestCdagCid)
err := cl.Unpin(cShard)
if err == nil {
t.Error("should error when unpinning shard")
}
err = cl.Unpin(cCdag)
if err == nil {
t.Error("should error when unpinning cluster dag")
}
}
func TestClusterUnpinMeta(t *testing.T) {
cl, _, _, _, _ := testingCluster(t)
defer cleanRaft()
defer cl.Shutdown()
pinDirectShard(t, cl)
// verify pins
if len(cl.Pins()) != 3 {
t.Fatal("should have 3 pins")
}
// Unpinning from root should work
cMeta, _ := cid.Decode(test.TestMetaRootCid)
err := cl.Unpin(cMeta)
if err != nil {
t.Error(err)
}
}
func TestClusterPins(t *testing.T) {
cl, _, _, _, _ := testingCluster(t)
defer cleanRaft()
@ -283,12 +391,23 @@ func TestClusterUnpin(t *testing.T) {
defer cl.Shutdown()
c, _ := cid.Decode(test.TestCid1)
// Unpin should error without pin being committed to state
err := cl.Unpin(c)
if err == nil {
t.Error("unpin should have failed")
}
// Unpin after pin should succeed
err = cl.Pin(api.PinCid(c))
if err != nil {
t.Fatal("pin should have worked:", err)
}
err = cl.Unpin(c)
if err != nil {
t.Error("unpin should have worked:", err)
}
// test an error case
// test another error case
cl.consensus.Shutdown()
err = cl.Unpin(c)
if err == nil {


@ -119,6 +119,51 @@ func TestConsensusUnpin(t *testing.T) {
}
}
func TestConsensusUpdate(t *testing.T) {
cc := testingConsensus(t, 1)
defer cleanRaft(1)
defer cc.Shutdown()
// Pin first
c1, _ := cid.Decode(test.TestCid1)
pin := api.Pin{
Cid: c1,
Type: api.ShardType,
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,
Parents: nil,
}
err := cc.LogPin(pin)
if err != nil {
t.Fatal("the initial operation did not make it to the log:", err)
}
time.Sleep(250 * time.Millisecond)
// Update pin
c2, _ := cid.Decode(test.TestCid2)
pin.Parents = cid.NewSet()
pin.Parents.Add(c2)
err = cc.LogPin(pin)
if err != nil {
t.Error("the update op did not make it to the log:", err)
}
time.Sleep(250 * time.Millisecond)
st, err := cc.State()
if err != nil {
t.Fatal("error getting state:", err)
}
pins := st.List()
if len(pins) != 1 || pins[0].Cid.String() != test.TestCid1 {
t.Error("the added pin should be in the state")
}
if pins[0].Parents == nil || pins[0].Parents.Len() != 1 ||
!pins[0].Parents.Has(c2) {
t.Error("pin updated incorrectly")
}
}
func TestConsensusAddPeer(t *testing.T) {
cc := testingConsensus(t, 1)
cc2 := testingConsensus(t, 2)


@ -61,7 +61,6 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
op.Cid,
&struct{}{},
nil)
default:
logger.Error("unknown LogOp type. Ignoring")
}


@ -181,6 +181,28 @@ func textFormatPrintPin(obj *api.PinSerial) {
obj.ReplicationFactorMin, obj.ReplicationFactorMax,
sortAlloc)
}
var recStr string
if obj.Recursive {
recStr = "Recursive"
} else {
recStr = "Non-recursive"
}
fmt.Printf("| %s | ", recStr)
pinType := obj.ToPin().Type
typeStr := pinType.String()
var infoStr string
switch pinType {
case api.DataType:
infoStr = typeStr
case api.MetaType:
infoStr = fmt.Sprintf("%s-- clusterDAG=%s", typeStr, obj.Clusterdag)
case api.CdagType, api.ShardType:
infoStr = typeStr
default:
infoStr = ""
}
fmt.Printf("| %s ", infoStr)
}
func textFormatPrintAddedOutput(obj *api.AddedOutput) {


@ -446,9 +446,16 @@ which peers they are currently allocated. This list does not include
any monitoring information about the IPFS status of the CIDs, it
merely represents the list of pins which are part of the shared state of
the cluster. For IPFS-status information about the pins, use "status".
Metadata CIDs used to track sharded files are hidden by default. To view
all CIDs, use the -a flag.
`,
ArgsUsage: "[CID]",
Flags: []cli.Flag{},
Flags: []cli.Flag{
cli.BoolFlag{
Name: "all, a",
Usage: "display hidden CIDs",
},
},
Action: func(c *cli.Context) error {
cidStr := c.Args().First()
if cidStr != "" {
@ -457,7 +464,11 @@ the cluster. For IPFS-status information about the pins, use "status".
resp, cerr := globalClient.Allocation(ci)
formatResponse(c, resp, cerr)
} else {
resp, cerr := globalClient.Allocations()
filter := api.PinType(api.DataType)
if c.Bool("all") {
filter = api.PinType(api.AllType)
}
resp, cerr := globalClient.Allocations(filter)
formatResponse(c, resp, cerr)
}
return nil
@ -472,7 +483,7 @@ the cluster. For IPFS-status information about the pins, use "status".
This command retrieves the status of the CIDs tracked by IPFS
Cluster, including which member is pinning them and any errors.
If a CID is provided, the status will be only fetched for a single
item.
item. Metadata CIDs are included in the status response.
The status of a CID may not be accurate. A manual sync can be triggered
with "sync".


@ -92,6 +92,8 @@ type IPFSConnector interface {
RepoSize() (uint64, error)
// BlockPut directly adds a block of data to the IPFS repo
BlockPut(api.NodeWithMeta) (string, error)
// BlockGet retrieves the raw data of an IPFS block
BlockGet(*cid.Cid) ([]byte, error)
}
// Peered represents a component which needs to be aware of the peers


@ -426,18 +426,22 @@ func TestClustersPin(t *testing.T) {
pinList := clusters[0].Pins()
for i := 0; i < len(pinList); i++ {
// test re-unpin fails
j := rand.Intn(nClusters) // choose a random cluster peer
err := clusters[j].Unpin(pinList[i].Cid)
if err != nil {
t.Errorf("error unpinning %s: %s", pinList[i].Cid, err)
}
// test re-unpin
err = clusters[j].Unpin(pinList[i].Cid)
if err != nil {
t.Errorf("error re-unpinning %s: %s", pinList[i].Cid, err)
}
delay()
for i := 0; i < nPins; i++ {
j := rand.Intn(nClusters) // choose a random cluster peer
err := clusters[j].Unpin(pinList[i].Cid)
if err == nil {
t.Errorf("expected error re-unpinning %s: %s", pinList[i].Cid, err)
}
}
}
delay()
funpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.StatusAll()
@ -1017,6 +1021,7 @@ func TestClustersReplicationFactorMaxLower(t *testing.T) {
Cid: h,
ReplicationFactorMax: 2,
ReplicationFactorMin: 1,
Type: api.DataType,
})
if err != nil {
t.Fatal(err)


@ -1013,3 +1013,9 @@ func (ipfs *Connector) BlockPut(b api.NodeWithMeta) (string, error) {
}
return keyRaw.Key, nil
}
// BlockGet retrieves an ipfs block with the given cid
func (ipfs *Connector) BlockGet(c *cid.Cid) ([]byte, error) {
url := "block/get?arg=" + c.String()
return ipfs.post(url, "", nil)
}
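A hedged usage sketch matching how the Unpin path in cluster.go consumes this method; the function, its receiver argument, and the import of the sharder package are illustrative assumptions.

// Sketch: fetch the raw clusterDAG block from the IPFS daemon and parse it.
func listShards(ipfs *Connector, cdagCid *cid.Cid) error {
	raw, err := ipfs.BlockGet(cdagCid) // hits the daemon's block/get endpoint, as above
	if err != nil {
		return err
	}
	node, err := sharder.CborDataToNode(raw, "cbor")
	if err != nil {
		return err
	}
	for _, link := range node.Links() {
		_ = link.Cid // each link is a shard tracked by the clusterDAG
	}
	return nil
}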


@ -724,6 +724,20 @@ func TestBlockPut(t *testing.T) {
}
}
func TestBlockGet(t *testing.T) {
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown()
shardCid, err := cid.Decode(test.TestShardCid)
data, err := ipfs.BlockGet(shardCid)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(data, test.TestShardData) {
t.Fatal("unexpected data returned")
}
}
func TestRepoSize(t *testing.T) {
ctx := context.Background()
ipfs, mock := testIPFSConnector(t)


@ -199,6 +199,27 @@ func (mpt *MapPinTracker) Track(c api.Pin) error {
return nil
}
// TODO: Fix this for sharding
// FIXME: Fix this for sharding
// The problem is remote/unpin operation won't be cancelled
// but I don't know how bad that is
// Also, this is duplicated code
if c.Type == api.ShardType {
// cancel any other ops
op := mpt.optracker.TrackNewOperation(c, optracker.OperationSharded, optracker.PhaseInProgress)
if op == nil {
return nil
}
err := mpt.unpin(op)
op.Cancel()
if err != nil {
op.SetError(err)
} else {
op.SetPhase(optracker.PhaseDone)
}
return nil
}
return mpt.enqueue(c, optracker.OperationPin, mpt.pinCh)
}


@ -303,12 +303,20 @@ func (rpcapi *RPCAPI) IPFSSwarmPeers(ctx context.Context, in struct{}, out *api.
}
// IPFSBlockPut runs IPFSConnector.BlockPut().
func (rpcapi *RPCAPI) IPFSBlockPut(in api.NodeWithMeta, out *string) error {
func (rpcapi *RPCAPI) IPFSBlockPut(ctx context.Context, in api.NodeWithMeta, out *string) error {
res, err := rpcapi.c.ipfs.BlockPut(in)
*out = res
return err
}
// IPFSBlockGet runs IPFSConnector.BlockGet().
func (rpcapi *RPCAPI) IPFSBlockGet(ctx context.Context, in api.PinSerial, out *[]byte) error {
c := in.ToPin().Cid
res, err := rpcapi.c.ipfs.BlockGet(c)
*out = res
return err
}
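A hedged sketch of calling it through the cluster RPC client, using the same "Cluster" service name seen in the REST API handler earlier in this diff; the rpcClient and blockCid variables are placeholders.

// Sketch: fetch a block's raw bytes over RPC; the PinSerial only carries the cid.
var data []byte
err := rpcClient.Call(
	"",
	"Cluster",
	"IPFSBlockGet",
	api.PinCid(blockCid).ToSerial(),
	&data,
)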
/*
Consensus component methods
*/
@ -347,14 +355,14 @@ func (rpcapi *RPCAPI) ConsensusPeers(ctx context.Context, in struct{}, out *[]pe
*/
// SharderAddNode runs Sharder.AddNode(node).
func (rpcapi *RPCAPI) SharderAddNode(in api.NodeWithMeta, out *string) error {
func (rpcapi *RPCAPI) SharderAddNode(ctx context.Context, in api.NodeWithMeta, out *string) error {
shardID, err := rpcapi.c.sharder.AddNode(in.Size, in.Data, in.Cid, in.ID)
*out = shardID
return err
}
// SharderFinalize runs Sharder.Finalize().
func (rpcapi *RPCAPI) SharderFinalize(in string, out *struct{}) error {
func (rpcapi *RPCAPI) SharderFinalize(ctx context.Context, in string, out *struct{}) error {
return rpcapi.c.sharder.Finalize(in)
}


@ -1,6 +1,9 @@
package sharder
// clusterdag.go defines functions for handling edge cases where clusterDAG
// clusterdag.go defines functions for constructing and parsing ipld-cbor nodes
// of the clusterDAG used to track sharded DAGs in ipfs-cluster
// Most logic goes into handling the edge cases in which clusterDAG
// metadata for a single shard cannot fit within a single shard node. We
// make the following simplifying assumption: a single shard will not track
// more than 35,808,256 links (~2^25). This is the limit at which the current
@ -16,15 +19,45 @@ package sharder
import (
"fmt"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
dag "github.com/ipfs/go-ipfs/merkledag"
cbor "github.com/ipfs/go-ipld-cbor"
ipld "github.com/ipfs/go-ipld-format"
mh "github.com/multiformats/go-multihash"
)
func init() {
ipld.Register(cid.DagProtobuf, dag.DecodeProtobufBlock)
ipld.Register(cid.Raw, dag.DecodeRawBlock)
ipld.Register(cid.DagCBOR, cbor.DecodeBlock) // need to decode CBOR
}
// MaxLinks is the max number of links that, when serialized, fit into a block
const MaxLinks = 5984
const fixedPerLink = 40
const hashFn = mh.SHA2_256
// CborDataToNode parses cbor data into a clusterDAG node while making a few
// checks
func CborDataToNode(raw []byte, format string) (ipld.Node, error) {
if format != "cbor" {
return nil, fmt.Errorf("unexpected shard node format %s", format)
}
shardCid, err := cid.NewPrefixV1(cid.DagCBOR, hashFn).Sum(raw)
if err != nil {
return nil, err
}
shardBlk, err := blocks.NewBlockWithCid(raw, shardCid)
if err != nil {
return nil, err
}
shardNode, err := ipld.Decode(shardBlk)
if err != nil {
return nil, err
}
return shardNode, nil
}
// makeDAG parses a shardObj which stores all of the node-links a shardDAG
// is responsible for tracking. In general a single node of links may exceed
@ -36,8 +69,8 @@ const fixedPerLink = 40
func makeDAG(obj shardObj) ([]ipld.Node, error) {
// No indirect node
if len(obj) <= MaxLinks {
node, err := cbor.WrapObject(obj, mh.SHA2_256,
mh.DefaultLengths[mh.SHA2_256])
node, err := cbor.WrapObject(obj, hashFn,
mh.DefaultLengths[hashFn])
if err != nil {
return nil, err
}
@ -59,16 +92,16 @@ func makeDAG(obj shardObj) ([]ipld.Node, error) {
}
leafObj[fmt.Sprintf("%d", j)] = c
}
leafNode, err := cbor.WrapObject(leafObj, mh.SHA2_256,
mh.DefaultLengths[mh.SHA2_256])
leafNode, err := cbor.WrapObject(leafObj, hashFn,
mh.DefaultLengths[hashFn])
if err != nil {
return nil, err
}
indirectObj[fmt.Sprintf("%d", i)] = leafNode.Cid()
leafNodes = append(leafNodes, leafNode)
}
indirectNode, err := cbor.WrapObject(indirectObj, mh.SHA2_256,
mh.DefaultLengths[mh.SHA2_256])
indirectNode, err := cbor.WrapObject(indirectObj, hashFn,
mh.DefaultLengths[hashFn])
if err != nil {
return nil, err
}
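A hedged arithmetic check of the limit quoted in the package comment at the top of this file: with one level of indirection, an indirect node holding up to MaxLinks leaf links, each leaf holding up to MaxLinks shard links, gives MaxLinks squared tracked links.

// Sketch: the per-shard link ceiling implied by the two-level layout above.
const maxLinksPerShard = MaxLinks * MaxLinks // 5984 * 5984 = 35808256 (~2^25)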


@ -10,7 +10,6 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
dag "github.com/ipfs/go-ipfs/merkledag"
cbor "github.com/ipfs/go-ipld-cbor"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/ipfs-cluster/api"
crypto "github.com/libp2p/go-libp2p-crypto"
@ -20,15 +19,8 @@ import (
swarm "github.com/libp2p/go-libp2p-swarm"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
ma "github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"
)
func init() {
ipld.Register(cid.DagProtobuf, dag.DecodeProtobufBlock)
ipld.Register(cid.Raw, dag.DecodeRawBlock)
ipld.Register(cid.DagCBOR, cbor.DecodeBlock) // need to decode CBOR
}
var nodeDataSet1 = [][]byte{[]byte(`Dag Node 1`), []byte(`Dag Node 2`), []byte(`Dag Node 3`)}
var nodeDataSet2 = [][]byte{[]byte(`Dag Node A`), []byte(`Dag Node B`), []byte(`Dag Node C`)}
@ -81,18 +73,18 @@ func makeTestingHost() host.Host {
}
// GetInformerMetrics does nothing as mock allocator does not check metrics
func (mock *mockRPC) GetInformerMetrics(in struct{}, out *[]api.Metric) error {
func (mock *mockRPC) GetInformerMetrics(ctx context.Context, in struct{}, out *[]api.Metric) error {
return nil
}
// All pins get allocated to the mockRPC's server host
func (mock *mockRPC) Allocate(in api.AllocateInfo, out *[]peer.ID) error {
func (mock *mockRPC) Allocate(ctx context.Context, in api.AllocateInfo, out *[]peer.ID) error {
*out = []peer.ID{mock.Host.ID()}
return nil
}
// Record the ordered sequence of BlockPut calls for later validation
func (mock *mockRPC) IPFSBlockPut(in api.NodeWithMeta, out *string) error {
func (mock *mockRPC) IPFSBlockPut(ctx context.Context, in api.NodeWithMeta, out *string) error {
mock.orderedPuts[len(mock.orderedPuts)] = in
return nil
}
@ -101,7 +93,7 @@ func (mock *mockRPC) IPFSBlockPut(in api.NodeWithMeta, out *string) error {
// TODO: once the sharder Pinning is stabilized (support for pinning to
// specific peers and non-recursive pinning through RPC) we should validate
// pinning calls alongside block put calls
func (mock *mockRPC) Pin(in api.PinSerial, out *struct{}) error {
func (mock *mockRPC) Pin(ctx context.Context, in api.PinSerial, out *struct{}) error {
return nil
}
@ -230,18 +222,7 @@ func verifyNodePuts(t *testing.T,
}
func cborDataToNode(t *testing.T, putInfo api.NodeWithMeta) ipld.Node {
if putInfo.Format != "cbor" {
t.Fatalf("Unexpected shard node format %s", putInfo.Format)
}
shardCid, err := cid.NewPrefixV1(cid.DagCBOR, mh.SHA2_256).Sum(putInfo.Data)
if err != nil {
t.Fatal(err)
}
shardBlk, err := blocks.NewBlockWithCid(putInfo.Data, shardCid)
if err != nil {
t.Fatal(err)
}
shardNode, err := ipld.Decode(shardBlk)
shardNode, err := CborDataToNode(putInfo.Data, putInfo.Format)
if err != nil {
t.Fatal(err)
}


@ -116,6 +116,9 @@ func (st *mapStateV3) next() migrateable {
ReplicationFactorMin: v.ReplicationFactorMin,
ReplicationFactorMax: v.ReplicationFactorMax,
Recursive: true,
Type: api.DataType,
Parents: nil,
Clusterdag: "",
}
}
return &mst4


@ -1,6 +1,8 @@
package test
import (
"encoding/hex"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
)
@ -16,6 +18,14 @@ var (
// ErrorCid is meant to be used as a Cid which causes errors. i.e. the
// ipfs mock fails when pinning this CID.
ErrorCid = "QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmc"
// Shard and Cdag Cids
TestShardCid = "zdpuAoiNm1ntWx6jpgcReTiCWFHJSTpvTw4bAAn9p6yDnznqh"
TestCdagCid = "zdpuApF6HZBu8rscHSVJ7ra3VSYWc5dJnnxt42bGKyZ1a4qPo"
TestMetaRootCid = "QmYCLpFCj9Av8NFjkQogvtXspnTDFWaizLpVFEijHTH4eV"
TestShardData, _ = hex.DecodeString("a16130d82a58230012209273fd63ec94bed5abb219b2d9cb010cabe4af7b0177292d4335eff50464060a")
TestCdagData, _ = hex.DecodeString("a16130d82a5825000171122030e9b9b4f1bc4b5a3759a93b4e77983cd053f84174e1b0cd628dc6c32fb0da14")
TestPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
TestPeerID2, _ = peer.IDB58Decode("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6")
TestPeerID3, _ = peer.IDB58Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")


@ -255,6 +255,23 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
}
j, _ := json.Marshal(resp)
w.Write(j)
case "block/get":
query := r.URL.Query()
arg, ok := query["arg"]
if !ok {
goto ERROR
}
if len(arg) != 1 {
goto ERROR
}
switch arg[0] {
case TestShardCid:
w.Write(TestShardData)
case TestCdagCid:
w.Write(TestCdagData)
default:
goto ERROR
}
case "repo/stat":
len := len(m.pinMap.List())
resp := mockRepoStatResp{

util.go

@ -6,6 +6,7 @@ import (
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
@ -94,6 +95,15 @@ func containsPeer(list []peer.ID, peer peer.ID) bool {
return false
}
func containsCid(list []*cid.Cid, ci *cid.Cid) bool {
for _, c := range list {
if c.String() == ci.String() {
return true
}
}
return false
}
func minInt(x, y int) int {
if x < y {
return x