Finishing feedback and adding new test
Also updating Makefile to fetch sharness more robustly License: MIT Signed-off-by: Wyatt Daviau <wdaviau@cs.stanford.edu>
This commit is contained in:
parent
9f74f6f47d
commit
6b8bcdfbc3
2
Makefile
2
Makefile
|
@ -78,7 +78,7 @@ test_problem: deps
|
|||
|
||||
$(sharness):
|
||||
@echo "Downloading sharness"
|
||||
@wget -q -O sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/master.tar.gz
|
||||
@curl -L -s -o sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/master.tar.gz
|
||||
@cd sharness/lib; tar -zxf sharness.tar.gz; cd ../..
|
||||
@mv sharness/lib/sharness-master sharness/lib/sharness
|
||||
@rm sharness/lib/sharness.tar.gz
|
||||
|
|
|
@ -139,5 +139,4 @@ func (c *Client) handleStreamResponse(resp *http.Response, obj interface{}) ([]a
|
|||
return outputs, fmt.Errorf("unexpected error code: %d", output.Code)
|
||||
}
|
||||
}
|
||||
return outputs, nil
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ import (
|
|||
"time"
|
||||
|
||||
types "github.com/ipfs/ipfs-cluster/api"
|
||||
importer "github.com/ipfs/ipfs-cluster/ipld-importer"
|
||||
"github.com/ipfs/ipfs-cluster/importer"
|
||||
|
||||
mux "github.com/gorilla/mux"
|
||||
rpc "github.com/hsanjuan/go-libp2p-gorpc"
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
"github.com/ipfs/ipfs-cluster/consensus/raft"
|
||||
"github.com/ipfs/ipfs-cluster/informer/numpin"
|
||||
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
|
||||
"github.com/ipfs/ipfs-cluster/shard"
|
||||
"github.com/ipfs/ipfs-cluster/sharder"
|
||||
"github.com/ipfs/ipfs-cluster/state/mapstate"
|
||||
"github.com/ipfs/ipfs-cluster/test"
|
||||
|
||||
|
@ -90,7 +90,7 @@ func (ipfs *mockConnector) ConnectSwarms() error { r
|
|||
func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil }
|
||||
func (ipfs *mockConnector) FreeSpace() (uint64, error) { return 100, nil }
|
||||
func (ipfs *mockConnector) RepoSize() (uint64, error) { return 0, nil }
|
||||
func (ipfs *mockConnector) BlockPut(bwf api.BlockWithFormat) (string, error) { return "", nil }
|
||||
func (ipfs *mockConnector) BlockPut(bwf api.NodeWithMeta) (string, error) { return "", nil }
|
||||
|
||||
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate.MapState, *maptracker.MapPinTracker) {
|
||||
clusterCfg, _, _, consensusCfg, trackerCfg, bmonCfg, psmonCfg, _, sharderCfg := testingConfigs()
|
||||
|
@ -115,7 +115,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate
|
|||
numpinCfg := &numpin.Config{}
|
||||
numpinCfg.Default()
|
||||
inf, _ := numpin.NewInformer(numpinCfg)
|
||||
sharder, _ := shard.NewSharder(sharderCfg)
|
||||
sharder, _ := sharder.NewSharder(sharderCfg)
|
||||
|
||||
ReadyTimeout = consensusCfg.WaitForLeaderTimeout + 1*time.Second
|
||||
|
||||
|
|
|
@ -186,11 +186,12 @@ requires authorization. implies --https, which you can disable with --force-http
|
|||
Name: "add",
|
||||
Usage: "ipfs-cluster-ctl add <path> ... add a file to ipfs via cluster",
|
||||
Description: `
|
||||
Only works with file paths, no directories. Recurisive adding not yet supported. --shard flag not
|
||||
yet supported. Eventually users would use this endpoint if they want the file to be sharded across the cluster.
|
||||
This is useful in the case several ipfs peers want to ingest the file and combined have enough space
|
||||
to host but no single peer's repo has the capacity for the entire file. No stdin reading yet either, that
|
||||
is also TODO
|
||||
Only works with file paths, no directories. Recurisive adding not yet
|
||||
supported. --shard flag not yet supported. Eventually users would use this
|
||||
endpoint if they want the file to be sharded across the cluster. This is useful
|
||||
in the case several ipfs peers want to ingest the file and combined have enough
|
||||
space to host but no single peer's repo has the capacity for the entire file.
|
||||
No stdin reading yet either, that is also TODO
|
||||
`,
|
||||
Flags: []cli.Flag{
|
||||
cli.BoolFlag{
|
||||
|
@ -211,11 +212,13 @@ is also TODO
|
|||
},
|
||||
cli.StringFlag{
|
||||
Name: "layout, L",
|
||||
Usage: "Dag layout to use for dag generation. Currently 'trickle' is the only option supported",
|
||||
Usage: `Dag layout to use for dag generation. Currently 'trickle' is the only option
|
||||
supported`,
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "chunker, s",
|
||||
Usage: "Chunking algorithm to use. Either fixed block size: 'size-<size>', or rabin chunker: 'rabin-<min>-<avg>-<max>'. Default is 'size-262144'",
|
||||
Usage: `Chunking algorithm to use. Either fixed block size: 'size-<size>', or rabin
|
||||
chunker: 'rabin-<min>-<avg>-<max>'. Default is 'size-262144'`,
|
||||
},
|
||||
cli.BoolFlag{
|
||||
Name: "raw-leaves",
|
||||
|
|
|
@ -712,7 +712,7 @@ func TestBlockPut(t *testing.T) {
|
|||
defer ipfs.Shutdown()
|
||||
|
||||
data := []byte(test.TestCid4Data)
|
||||
resp, err := ipfs.BlockPut(api.BlockWithFormat{
|
||||
resp, err := ipfs.BlockPut(api.NodeWithMeta{
|
||||
Data: data,
|
||||
Format: "protobuf",
|
||||
})
|
||||
|
|
|
@ -37,7 +37,7 @@ var nodeDataSet2 = [][]byte{[]byte(`Dag Node A`), []byte(`Dag Node B`), []byte(`
|
|||
// that verify a sequence of dag-nodes is correctly added with the right
|
||||
// metadata.
|
||||
type mockRPC struct {
|
||||
orderedPuts map[int]api.BlockWithFormat
|
||||
orderedPuts map[int]api.NodeWithMeta
|
||||
Host host.Host
|
||||
}
|
||||
|
||||
|
@ -92,7 +92,7 @@ func (mock *mockRPC) Allocate(in api.AllocateInfo, out *[]peer.ID) error {
|
|||
}
|
||||
|
||||
// Record the ordered sequence of BlockPut calls for later validation
|
||||
func (mock *mockRPC) IPFSBlockPut(in api.BlockWithFormat, out *string) error {
|
||||
func (mock *mockRPC) IPFSBlockPut(in api.NodeWithMeta, out *string) error {
|
||||
mock.orderedPuts[len(mock.orderedPuts)] = in
|
||||
return nil
|
||||
}
|
||||
|
@ -112,7 +112,7 @@ func (mock *mockRPC) Pin(in api.PinSerial, out *struct{}) error {
|
|||
// Create a new sharder and register a mock RPC for testing
|
||||
func testNewSharder(t *testing.T) (*Sharder, *mockRPC) {
|
||||
mockRPC := &mockRPC{}
|
||||
mockRPC.orderedPuts = make(map[int]api.BlockWithFormat)
|
||||
mockRPC.orderedPuts = make(map[int]api.NodeWithMeta)
|
||||
client := NewMockRPCClient(t, mockRPC)
|
||||
cfg := &Config{
|
||||
AllocSize: DefaultAllocSize,
|
||||
|
@ -132,7 +132,7 @@ func TestAddAndFinalizeShard(t *testing.T) {
|
|||
// Create 3 ipld protobuf nodes and add to sharding session
|
||||
nodes := make([]*dag.ProtoNode, 3)
|
||||
cids := make([]string, 3)
|
||||
sessionID := "testAddShard"
|
||||
sessionID := ""
|
||||
for i, data := range nodeDataSet1 {
|
||||
nodes[i] = dag.NodeWithData(data)
|
||||
nodes[i].SetPrefix(nil)
|
||||
|
@ -142,7 +142,7 @@ func TestAddAndFinalizeShard(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID)
|
||||
sessionID, err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -196,7 +196,7 @@ func linksContain(links []*ipld.Link, c string) bool {
|
|||
func verifyNodePuts(t *testing.T,
|
||||
dataSet [][]byte,
|
||||
cids []string,
|
||||
orderedPuts map[int]api.BlockWithFormat,
|
||||
orderedPuts map[int]api.NodeWithMeta,
|
||||
toVerify []int) {
|
||||
if len(cids) != len(toVerify) || len(dataSet) != len(toVerify) {
|
||||
t.Error("Malformed verifyNodePuts arguments")
|
||||
|
@ -229,7 +229,7 @@ func verifyNodePuts(t *testing.T,
|
|||
}
|
||||
}
|
||||
|
||||
func cborDataToNode(t *testing.T, putInfo api.BlockWithFormat) ipld.Node {
|
||||
func cborDataToNode(t *testing.T, putInfo api.NodeWithMeta) ipld.Node {
|
||||
if putInfo.Format != "cbor" {
|
||||
t.Fatalf("Unexpected shard node format %s", putInfo.Format)
|
||||
}
|
||||
|
@ -258,37 +258,35 @@ func TestInterleaveSessions(t *testing.T) {
|
|||
nodes2 := make([]*dag.ProtoNode, 3)
|
||||
cids2 := make([]string, 3)
|
||||
|
||||
sessionID1 := "interleave1"
|
||||
sessionID2 := "interleave2"
|
||||
sessionID1 := ""
|
||||
sessionID2 := ""
|
||||
for i := 0; i < 6; i++ {
|
||||
var nodes []*dag.ProtoNode
|
||||
var cids []string
|
||||
var dataSet [][]byte
|
||||
var sessionID string
|
||||
if i%2 == 0 { // Add to session 1
|
||||
nodes = nodes1
|
||||
cids = cids1
|
||||
dataSet = nodeDataSet1
|
||||
sessionID = sessionID1
|
||||
} else {
|
||||
nodes = nodes2
|
||||
cids = cids2
|
||||
dataSet = nodeDataSet2
|
||||
sessionID = sessionID2
|
||||
}
|
||||
j := i / 2
|
||||
nodes[j] = dag.NodeWithData(dataSet[j])
|
||||
nodes[j].SetPrefix(nil)
|
||||
cids[j] = nodes[j].Cid().String()
|
||||
size, err := nodes[j].Size()
|
||||
if i%2 == 0 { // session 1
|
||||
nodes1[j] = dag.NodeWithData(nodeDataSet1[j])
|
||||
nodes1[j].SetPrefix(nil)
|
||||
cids1[j] = nodes1[j].Cid().String()
|
||||
size, err := nodes1[j].Size()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = sharder.AddNode(size, nodes[j].RawData(), cids[j],
|
||||
sessionID)
|
||||
sessionID1, err = sharder.AddNode(size, nodes1[j].RawData(), cids1[j], sessionID1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
} else { // session 2
|
||||
nodes2[j] = dag.NodeWithData(nodeDataSet2[j])
|
||||
nodes2[j].SetPrefix(nil)
|
||||
cids2[j] = nodes2[j].Cid().String()
|
||||
size, err := nodes2[j].Size()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
sessionID2, err = sharder.AddNode(size, nodes2[j].RawData(), cids2[j], sessionID2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
err := sharder.Finalize(sessionID1)
|
||||
if err != nil {
|
||||
|
@ -366,7 +364,7 @@ func TestManyLinks(t *testing.T) {
|
|||
dataSet := getManyLinksDataSet(t)
|
||||
nodes := make([]*dag.ProtoNode, len(dataSet))
|
||||
cids := make([]string, len(dataSet))
|
||||
sessionID := "testManyLinks"
|
||||
sessionID := ""
|
||||
|
||||
for i, data := range dataSet {
|
||||
nodes[i] = dag.NodeWithData(data)
|
||||
|
@ -376,7 +374,7 @@ func TestManyLinks(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID)
|
||||
sessionID, err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -443,9 +441,60 @@ func TestManyLinks(t *testing.T) {
|
|||
}
|
||||
|
||||
// Test that by adding in enough nodes multiple shard nodes will be created
|
||||
/*func TestMultipleShards(t *testing.T) {
|
||||
func TestMultipleShards(t *testing.T) {
|
||||
sharder, mockRPC := testNewSharder(t)
|
||||
sharder.allocSize = IPFSChunkSize + IPFSChunkSize/2
|
||||
sharder.allocSize = uint64(300000)
|
||||
nodes := make([]*dag.ProtoNode, 4)
|
||||
cids := make([]string, 4)
|
||||
sessionID := ""
|
||||
|
||||
for i := range nodes {
|
||||
data := repeatData(90000, i)
|
||||
nodes[i] = dag.NodeWithData(data)
|
||||
nodes[i].SetPrefix(nil)
|
||||
cids[i] = nodes[i].Cid().String()
|
||||
size, err := nodes[i].Size()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
sessionID, err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
err := sharder.Finalize(sessionID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
}*/
|
||||
if len(mockRPC.orderedPuts) != len(nodes)+3 { //data nodes, 2 shard nodes, 1 root
|
||||
t.Errorf("unexpected number of block puts called: %d", len(mockRPC.orderedPuts))
|
||||
}
|
||||
|
||||
// First shard node contains first 3 cids
|
||||
shardNode1 := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)-1])
|
||||
links := shardNode1.Links()
|
||||
for _, c := range cids[:len(cids)-1] {
|
||||
if !linksContain(links, c) {
|
||||
t.Errorf("expected cid %s not in shard node", c)
|
||||
}
|
||||
}
|
||||
|
||||
// Second shard node only points to final cid
|
||||
shardNode2 := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)+1])
|
||||
links = shardNode2.Links()
|
||||
if len(links) != 1 || links[0].Cid.String() != cids[len(nodes)-1] {
|
||||
t.Errorf("unexpected links in second shard node")
|
||||
}
|
||||
}
|
||||
|
||||
// repeatData takes in a byte value and a number of times this value should be
|
||||
// repeated and returns a byte slice of this value repeated
|
||||
func repeatData(byteCount int, digit int) []byte {
|
||||
data := make([]byte, byteCount)
|
||||
for i := range data {
|
||||
data[i] = byte(digit)
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user