diff --git a/Makefile b/Makefile index 145f0af2..10b8a14a 100644 --- a/Makefile +++ b/Makefile @@ -78,7 +78,7 @@ test_problem: deps $(sharness): @echo "Downloading sharness" - @wget -q -O sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/master.tar.gz + @curl -L -s -o sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/master.tar.gz @cd sharness/lib; tar -zxf sharness.tar.gz; cd ../.. @mv sharness/lib/sharness-master sharness/lib/sharness @rm sharness/lib/sharness.tar.gz diff --git a/api/rest/client/request.go b/api/rest/client/request.go index 43ce3423..28c1b38b 100644 --- a/api/rest/client/request.go +++ b/api/rest/client/request.go @@ -139,5 +139,4 @@ func (c *Client) handleStreamResponse(resp *http.Response, obj interface{}) ([]a return outputs, fmt.Errorf("unexpected error code: %d", output.Code) } } - return outputs, nil } diff --git a/api/rest/restapi.go b/api/rest/restapi.go index ab3093eb..af86d173 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -22,7 +22,7 @@ import ( "time" types "github.com/ipfs/ipfs-cluster/api" - importer "github.com/ipfs/ipfs-cluster/ipld-importer" + "github.com/ipfs/ipfs-cluster/importer" mux "github.com/gorilla/mux" rpc "github.com/hsanjuan/go-libp2p-gorpc" diff --git a/cluster_test.go b/cluster_test.go index ae5cb1b6..074ea446 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -13,7 +13,7 @@ import ( "github.com/ipfs/ipfs-cluster/consensus/raft" "github.com/ipfs/ipfs-cluster/informer/numpin" "github.com/ipfs/ipfs-cluster/pintracker/maptracker" - "github.com/ipfs/ipfs-cluster/shard" + "github.com/ipfs/ipfs-cluster/sharder" "github.com/ipfs/ipfs-cluster/state/mapstate" "github.com/ipfs/ipfs-cluster/test" @@ -86,11 +86,11 @@ func (ipfs *mockConnector) SwarmPeers() (api.SwarmPeers, error) { return []peer.ID{test.TestPeerID4, test.TestPeerID5}, nil } -func (ipfs *mockConnector) ConnectSwarms() error { return nil } -func (ipfs *mockConnector) ConfigKey(keypath string) 
(interface{}, error) { return nil, nil } -func (ipfs *mockConnector) FreeSpace() (uint64, error) { return 100, nil } -func (ipfs *mockConnector) RepoSize() (uint64, error) { return 0, nil } -func (ipfs *mockConnector) BlockPut(bwf api.BlockWithFormat) (string, error) { return "", nil } +func (ipfs *mockConnector) ConnectSwarms() error { return nil } +func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil } +func (ipfs *mockConnector) FreeSpace() (uint64, error) { return 100, nil } +func (ipfs *mockConnector) RepoSize() (uint64, error) { return 0, nil } +func (ipfs *mockConnector) BlockPut(bwf api.NodeWithMeta) (string, error) { return "", nil } func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate.MapState, *maptracker.MapPinTracker) { clusterCfg, _, _, consensusCfg, trackerCfg, bmonCfg, psmonCfg, _, sharderCfg := testingConfigs() @@ -115,7 +115,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate numpinCfg := &numpin.Config{} numpinCfg.Default() inf, _ := numpin.NewInformer(numpinCfg) - sharder, _ := shard.NewSharder(sharderCfg) + sharder, _ := sharder.NewSharder(sharderCfg) ReadyTimeout = consensusCfg.WaitForLeaderTimeout + 1*time.Second diff --git a/ipld-importer/add.go b/importer/add.go similarity index 100% rename from ipld-importer/add.go rename to importer/add.go diff --git a/ipld-importer/import.go b/importer/import.go similarity index 100% rename from ipld-importer/import.go rename to importer/import.go diff --git a/ipld-importer/import_test.go b/importer/import_test.go similarity index 100% rename from ipld-importer/import_test.go rename to importer/import_test.go diff --git a/ipld-importer/outgoing-dagservice.go b/importer/outgoing-dagservice.go similarity index 100% rename from ipld-importer/outgoing-dagservice.go rename to importer/outgoing-dagservice.go diff --git a/ipld-importer/printing-dagservice.go b/importer/printing-dagservice.go similarity index 
100% rename from ipld-importer/printing-dagservice.go rename to importer/printing-dagservice.go diff --git a/ipld-importer/testingData/A/alpha/contents-overview.txt b/importer/testingData/A/alpha/contents-overview.txt similarity index 100% rename from ipld-importer/testingData/A/alpha/contents-overview.txt rename to importer/testingData/A/alpha/contents-overview.txt diff --git a/ipld-importer/testingData/A/beta/lorem-ipsum-explained.io b/importer/testingData/A/beta/lorem-ipsum-explained.io similarity index 100% rename from ipld-importer/testingData/A/beta/lorem-ipsum-explained.io rename to importer/testingData/A/beta/lorem-ipsum-explained.io diff --git a/ipld-importer/testingData/A/file.txt b/importer/testingData/A/file.txt similarity index 100% rename from ipld-importer/testingData/A/file.txt rename to importer/testingData/A/file.txt diff --git a/ipld-importer/testingData/A/gamma/don-quixote.txt b/importer/testingData/A/gamma/don-quixote.txt similarity index 100% rename from ipld-importer/testingData/A/gamma/don-quixote.txt rename to importer/testingData/A/gamma/don-quixote.txt diff --git a/ipld-importer/testingData/B/emerson-essays.txt b/importer/testingData/B/emerson-essays.txt similarity index 100% rename from ipld-importer/testingData/B/emerson-essays.txt rename to importer/testingData/B/emerson-essays.txt diff --git a/ipld-importer/testingData/B/poor-richard.txt b/importer/testingData/B/poor-richard.txt similarity index 100% rename from ipld-importer/testingData/B/poor-richard.txt rename to importer/testingData/B/poor-richard.txt diff --git a/ipfs-cluster-ctl/main.go b/ipfs-cluster-ctl/main.go index 5ecd4b90..d3f6820a 100644 --- a/ipfs-cluster-ctl/main.go +++ b/ipfs-cluster-ctl/main.go @@ -186,11 +186,12 @@ requires authorization. implies --https, which you can disable with --force-http Name: "add", Usage: "ipfs-cluster-ctl add ... add a file to ipfs via cluster", Description: ` -Only works with file paths, no directories. Recurisive adding not yet supported. 
--shard flag not -yet supported. Eventually users would use this endpoint if they want the file to be sharded across the cluster. -This is useful in the case several ipfs peers want to ingest the file and combined have enough space -to host but no single peer's repo has the capacity for the entire file. No stdin reading yet either, that -is also TODO +Only works with file paths, no directories. Recursive adding not yet +supported. --shard flag not yet supported. Eventually users would use this +endpoint if they want the file to be sharded across the cluster. This is useful +in the case several ipfs peers want to ingest the file and combined have enough +space to host but no single peer's repo has the capacity for the entire file. +No stdin reading yet either, that is also TODO `, Flags: []cli.Flag{ cli.BoolFlag{ @@ -210,12 +211,14 @@ is also TODO Usage: "write no output", }, cli.StringFlag{ - Name: "layout, L", - Usage: "Dag layout to use for dag generation. Currently 'trickle' is the only option supported", + Name: "layout, L", + Usage: `Dag layout to use for dag generation. Currently 'trickle' is the only option +supported`, }, cli.StringFlag{ - Name: "chunker, s", - Usage: "Chunking algorithm to use. Either fixed block size: 'size-', or rabin chunker: 'rabin---'. Default is 'size-262144'", + Name: "chunker, s", + Usage: `Chunking algorithm to use. Either fixed block size: 'size-', or rabin +chunker: 'rabin---'. 
Default is 'size-262144'`, }, cli.BoolFlag{ Name: "raw-leaves", diff --git a/ipfsconn/ipfshttp/ipfshttp_test.go b/ipfsconn/ipfshttp/ipfshttp_test.go index 52e00e3c..d32e829e 100644 --- a/ipfsconn/ipfshttp/ipfshttp_test.go +++ b/ipfsconn/ipfshttp/ipfshttp_test.go @@ -712,7 +712,7 @@ func TestBlockPut(t *testing.T) { defer ipfs.Shutdown() data := []byte(test.TestCid4Data) - resp, err := ipfs.BlockPut(api.BlockWithFormat{ + resp, err := ipfs.BlockPut(api.NodeWithMeta{ Data: data, Format: "protobuf", }) diff --git a/sharder/sharder_test.go b/sharder/sharder_test.go index e7e3981e..3a0e603a 100644 --- a/sharder/sharder_test.go +++ b/sharder/sharder_test.go @@ -37,7 +37,7 @@ var nodeDataSet2 = [][]byte{[]byte(`Dag Node A`), []byte(`Dag Node B`), []byte(` // that verify a sequence of dag-nodes is correctly added with the right // metadata. type mockRPC struct { - orderedPuts map[int]api.BlockWithFormat + orderedPuts map[int]api.NodeWithMeta Host host.Host } @@ -92,7 +92,7 @@ func (mock *mockRPC) Allocate(in api.AllocateInfo, out *[]peer.ID) error { } // Record the ordered sequence of BlockPut calls for later validation -func (mock *mockRPC) IPFSBlockPut(in api.BlockWithFormat, out *string) error { +func (mock *mockRPC) IPFSBlockPut(in api.NodeWithMeta, out *string) error { mock.orderedPuts[len(mock.orderedPuts)] = in return nil } @@ -112,7 +112,7 @@ func (mock *mockRPC) Pin(in api.PinSerial, out *struct{}) error { // Create a new sharder and register a mock RPC for testing func testNewSharder(t *testing.T) (*Sharder, *mockRPC) { mockRPC := &mockRPC{} - mockRPC.orderedPuts = make(map[int]api.BlockWithFormat) + mockRPC.orderedPuts = make(map[int]api.NodeWithMeta) client := NewMockRPCClient(t, mockRPC) cfg := &Config{ AllocSize: DefaultAllocSize, @@ -132,7 +132,7 @@ func TestAddAndFinalizeShard(t *testing.T) { // Create 3 ipld protobuf nodes and add to sharding session nodes := make([]*dag.ProtoNode, 3) cids := make([]string, 3) - sessionID := "testAddShard" + sessionID := 
"" for i, data := range nodeDataSet1 { nodes[i] = dag.NodeWithData(data) nodes[i].SetPrefix(nil) @@ -142,7 +142,7 @@ func TestAddAndFinalizeShard(t *testing.T) { if err != nil { t.Fatal(err) } - err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID) + sessionID, err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID) if err != nil { t.Fatal(err) } @@ -196,7 +196,7 @@ func linksContain(links []*ipld.Link, c string) bool { func verifyNodePuts(t *testing.T, dataSet [][]byte, cids []string, - orderedPuts map[int]api.BlockWithFormat, + orderedPuts map[int]api.NodeWithMeta, toVerify []int) { if len(cids) != len(toVerify) || len(dataSet) != len(toVerify) { t.Error("Malformed verifyNodePuts arguments") @@ -229,7 +229,7 @@ func verifyNodePuts(t *testing.T, } } -func cborDataToNode(t *testing.T, putInfo api.BlockWithFormat) ipld.Node { +func cborDataToNode(t *testing.T, putInfo api.NodeWithMeta) ipld.Node { if putInfo.Format != "cbor" { t.Fatalf("Unexpected shard node format %s", putInfo.Format) } @@ -258,36 +258,34 @@ func TestInterleaveSessions(t *testing.T) { nodes2 := make([]*dag.ProtoNode, 3) cids2 := make([]string, 3) - sessionID1 := "interleave1" - sessionID2 := "interleave2" + sessionID1 := "" + sessionID2 := "" for i := 0; i < 6; i++ { - var nodes []*dag.ProtoNode - var cids []string - var dataSet [][]byte - var sessionID string - if i%2 == 0 { // Add to session 1 - nodes = nodes1 - cids = cids1 - dataSet = nodeDataSet1 - sessionID = sessionID1 - } else { - nodes = nodes2 - cids = cids2 - dataSet = nodeDataSet2 - sessionID = sessionID2 - } j := i / 2 - nodes[j] = dag.NodeWithData(dataSet[j]) - nodes[j].SetPrefix(nil) - cids[j] = nodes[j].Cid().String() - size, err := nodes[j].Size() - if err != nil { - t.Fatal(err) - } - err = sharder.AddNode(size, nodes[j].RawData(), cids[j], - sessionID) - if err != nil { - t.Fatal(err) + if i%2 == 0 { // session 1 + nodes1[j] = dag.NodeWithData(nodeDataSet1[j]) + nodes1[j].SetPrefix(nil) + cids1[j] = 
nodes1[j].Cid().String() + size, err := nodes1[j].Size() + if err != nil { + t.Fatal(err) + } + sessionID1, err = sharder.AddNode(size, nodes1[j].RawData(), cids1[j], sessionID1) + if err != nil { + t.Fatal(err) + } + } else { // session 2 + nodes2[j] = dag.NodeWithData(nodeDataSet2[j]) + nodes2[j].SetPrefix(nil) + cids2[j] = nodes2[j].Cid().String() + size, err := nodes2[j].Size() + if err != nil { + t.Fatal(err) + } + sessionID2, err = sharder.AddNode(size, nodes2[j].RawData(), cids2[j], sessionID2) + if err != nil { + t.Fatal(err) + } } } err := sharder.Finalize(sessionID1) @@ -366,7 +364,7 @@ func TestManyLinks(t *testing.T) { dataSet := getManyLinksDataSet(t) nodes := make([]*dag.ProtoNode, len(dataSet)) cids := make([]string, len(dataSet)) - sessionID := "testManyLinks" + sessionID := "" for i, data := range dataSet { nodes[i] = dag.NodeWithData(data) @@ -376,7 +374,7 @@ func TestManyLinks(t *testing.T) { if err != nil { t.Fatal(err) } - err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID) + sessionID, err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID) if err != nil { t.Fatal(err) } @@ -443,9 +441,60 @@ func TestManyLinks(t *testing.T) { } // Test that by adding in enough nodes multiple shard nodes will be created -/*func TestMultipleShards(t *testing.T) { +func TestMultipleShards(t *testing.T) { sharder, mockRPC := testNewSharder(t) - sharder.allocSize = IPFSChunkSize + IPFSChunkSize / 2 + sharder.allocSize = IPFSChunkSize + IPFSChunkSize/2 + sharder.allocSize = uint64(300000) + nodes := make([]*dag.ProtoNode, 4) + cids := make([]string, 4) + sessionID := "" + for i := range nodes { + data := repeatData(90000, i) + nodes[i] = dag.NodeWithData(data) + nodes[i].SetPrefix(nil) + cids[i] = nodes[i].Cid().String() + size, err := nodes[i].Size() + if err != nil { + t.Fatal(err) + } + sessionID, err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID) + if err != nil { + t.Fatal(err) + } + } + err := 
sharder.Finalize(sessionID) + if err != nil { + t.Fatal(err) + } -}*/ + if len(mockRPC.orderedPuts) != len(nodes)+3 { //data nodes, 2 shard nodes, 1 root + t.Errorf("unexpected number of block puts called: %d", len(mockRPC.orderedPuts)) + } + + // First shard node contains first 3 cids + shardNode1 := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)-1]) + links := shardNode1.Links() + for _, c := range cids[:len(cids)-1] { + if !linksContain(links, c) { + t.Errorf("expected cid %s not in shard node", c) + } + } + + // Second shard node only points to final cid + shardNode2 := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)+1]) + links = shardNode2.Links() + if len(links) != 1 || links[0].Cid.String() != cids[len(nodes)-1] { + t.Errorf("unexpected links in second shard node") + } +} + +// repeatData takes in a byte value and a number of times this value should be +// repeated and returns a byte slice of this value repeated +func repeatData(byteCount int, digit int) []byte { + data := make([]byte, byteCount) + for i := range data { + data[i] = byte(digit) + } + return data +}