From edb38b2830757534dec20ea97eed81354b4b29ac Mon Sep 17 00:00:00 2001 From: Wyatt Daviau Date: Mon, 26 Feb 2018 20:27:53 -0500 Subject: [PATCH] fixing make check errors License: MIT Signed-off-by: Wyatt Daviau --- api/types.go | 2 + cluster_test.go | 17 ++++---- config_test.go | 17 +++++--- ipfscluster_test.go | 22 +++++++---- ipfsconn/ipfshttp/ipfshttp_test.go | 5 ++- ipld-importer/import_test.go | 2 +- shard/clusterdag.go | 23 ++++++----- shard/sharder.go | 8 ++-- shard/sharder_test.go | 62 ++++++++++++++++++------------ 9 files changed, 96 insertions(+), 62 deletions(-) diff --git a/api/types.go b/api/types.go index 84de301e..f9afd6d5 100644 --- a/api/types.go +++ b/api/types.go @@ -643,6 +643,8 @@ type AllocateInfo struct { Candidates map[peer.ID]Metric } +// GetCid decodes the cid string within AllocateInfo. If the cid string is "" +// then GetCid returns nil func (aI *AllocateInfo) GetCid() *cid.Cid { if aI.Cid == "" { return nil diff --git a/cluster_test.go b/cluster_test.go index 2f6cb292..ae5cb1b6 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -13,6 +13,7 @@ import ( "github.com/ipfs/ipfs-cluster/consensus/raft" "github.com/ipfs/ipfs-cluster/informer/numpin" "github.com/ipfs/ipfs-cluster/pintracker/maptracker" + "github.com/ipfs/ipfs-cluster/shard" "github.com/ipfs/ipfs-cluster/state/mapstate" "github.com/ipfs/ipfs-cluster/test" @@ -85,14 +86,14 @@ func (ipfs *mockConnector) SwarmPeers() (api.SwarmPeers, error) { return []peer.ID{test.TestPeerID4, test.TestPeerID5}, nil } -func (ipfs *mockConnector) ConnectSwarms() error { return nil } -func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil } -func (ipfs *mockConnector) FreeSpace() (uint64, error) { return 100, nil } -func (ipfs *mockConnector) RepoSize() (uint64, error) { return 0, nil } -func (ipfs *mockConnector) BlockPut(data []byte) (string, error) { return "", nil } +func (ipfs *mockConnector) ConnectSwarms() error { return nil } +func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil } +func (ipfs *mockConnector) FreeSpace() (uint64, error) { return 100, nil } +func (ipfs *mockConnector) RepoSize() (uint64, error) { return 0, nil } +func (ipfs *mockConnector) BlockPut(bwf api.BlockWithFormat) (string, error) { return "", nil } func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate.MapState, *maptracker.MapPinTracker) { - clusterCfg, _, _, consensusCfg, trackerCfg, bmonCfg, psmonCfg, _ := testingConfigs() + clusterCfg, _, _, consensusCfg, trackerCfg, bmonCfg, psmonCfg, _, sharderCfg := testingConfigs() host, err := NewClusterHost(context.Background(), clusterCfg) if err != nil { @@ -114,6 +115,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate numpinCfg := &numpin.Config{} numpinCfg.Default() inf, _ := numpin.NewInformer(numpinCfg) + sharder, _ := shard.NewSharder(sharderCfg) ReadyTimeout = consensusCfg.WaitForLeaderTimeout + 1*time.Second @@ -127,7 +129,8 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate tracker, mon, alloc, - inf) + inf, + sharder) if err != nil { t.Fatal("cannot create cluster:", err) } diff --git a/config_test.go b/config_test.go index e80e6609..4ecbd94e 100644 --- a/config_test.go +++ b/config_test.go @@ -8,6 +8,7 @@ import ( "github.com/ipfs/ipfs-cluster/monitor/basic" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/pintracker/maptracker" + "github.com/ipfs/ipfs-cluster/shard" ) var testingClusterSecret, _ = DecodeClusterSecret("2588b80d5cb05374fa142aed6cbb047d1f4ef8ef15e37eba68c65b9d30df67ed") @@ -81,8 +82,12 @@ var testingDiskInfCfg = []byte(`{ "metric_type": "freespace" }`) -func testingConfigs() (*Config, *rest.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *basic.Config, *pubsubmon.Config, *disk.Config) { - clusterCfg, apiCfg, ipfsCfg, consensusCfg, trackerCfg, basicmonCfg, pubsubmonCfg, diskInfCfg := testingEmptyConfigs() +var testingSharderCfg = []byte(`{ + "alloc_size": 5000000 +}`) + +func testingConfigs() (*Config, *rest.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *basic.Config, *pubsubmon.Config, *disk.Config, *shard.Config) { + clusterCfg, apiCfg, ipfsCfg, consensusCfg, trackerCfg, basicmonCfg, pubsubmonCfg, diskInfCfg, sharderCfg := testingEmptyConfigs() clusterCfg.LoadJSON(testingClusterCfg) apiCfg.LoadJSON(testingAPICfg) ipfsCfg.LoadJSON(testingIpfsCfg) @@ -91,11 +96,12 @@ func testingConfigs() (*Config, *rest.Config, *ipfshttp.Config, *raft.Config, *m basicmonCfg.LoadJSON(testingMonCfg) pubsubmonCfg.LoadJSON(testingMonCfg) diskInfCfg.LoadJSON(testingDiskInfCfg) + sharderCfg.LoadJSON(testingSharderCfg) - return clusterCfg, apiCfg, ipfsCfg, consensusCfg, trackerCfg, basicmonCfg, pubsubmonCfg, diskInfCfg + return clusterCfg, apiCfg, ipfsCfg, consensusCfg, trackerCfg, basicmonCfg, pubsubmonCfg, diskInfCfg, sharderCfg } -func testingEmptyConfigs() (*Config, *rest.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *basic.Config, *pubsubmon.Config, *disk.Config) { +func testingEmptyConfigs() (*Config, *rest.Config, *ipfshttp.Config, *raft.Config, *maptracker.Config, *basic.Config, *pubsubmon.Config, *disk.Config, *shard.Config) { clusterCfg := &Config{} apiCfg := &rest.Config{} ipfshttpCfg := &ipfshttp.Config{} @@ -104,7 +110,8 @@ func testingEmptyConfigs() (*Config, *rest.Config, *ipfshttp.Config, *raft.Confi basicmonCfg := &basic.Config{} pubsubmonCfg := &pubsubmon.Config{} diskInfCfg := &disk.Config{} - return clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, basicmonCfg, pubsubmonCfg, diskInfCfg + sharderCfg := &shard.Config{} + return clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, basicmonCfg, pubsubmonCfg, diskInfCfg, sharderCfg } // func TestConfigDefault(t *testing.T) { diff --git a/ipfscluster_test.go b/ipfscluster_test.go index 6b46cbf6..90faf934 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -21,6 +21,7 @@ import ( "github.com/ipfs/ipfs-cluster/monitor/basic" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/pintracker/maptracker" + "github.com/ipfs/ipfs-cluster/shard" "github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/state/mapstate" "github.com/ipfs/ipfs-cluster/test" @@ -83,7 +84,7 @@ func randomBytes() []byte { return bs } -func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) (host.Host, *Config, *raft.Consensus, API, IPFSConnector, state.State, PinTracker, PeerMonitor, PinAllocator, Informer, *test.IpfsMock) { +func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) (host.Host, *Config, *raft.Consensus, API, IPFSConnector, state.State, PinTracker, PeerMonitor, PinAllocator, Informer, Sharder, *test.IpfsMock) { mock := test.NewIpfsMock() // //clusterAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", clusterPort+i)) @@ -102,7 +103,7 @@ func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) ( checkErr(t, err) peername := fmt.Sprintf("peer_%d", i) - clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, bmonCfg, psmonCfg, diskInfCfg := testingConfigs() + clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, bmonCfg, psmonCfg, diskInfCfg, sharderCfg := testingConfigs() clusterCfg.ID = pid clusterCfg.Peername = peername @@ -137,7 +138,10 @@ func createComponents(t *testing.T, i int, clusterSecret []byte, staging bool) ( raftCon, err := raft.NewConsensus(host, consensusCfg, state, staging) checkErr(t, err) - return host, clusterCfg, raftCon, api, ipfs, state, tracker, mon, alloc, inf, mock + sharder, err := shard.NewSharder(sharderCfg) + checkErr(t, err) + + return host, clusterCfg, raftCon, api, ipfs, state, tracker, mon, alloc, inf, sharder, mock } func makeMonitor(t *testing.T, h host.Host, bmonCfg *basic.Config, psmonCfg *pubsubmon.Config) PeerMonitor { @@ -162,8 +166,8 @@ func createCluster(t *testing.T, host host.Host, clusterCfg *Config, raftCons *r } func createOnePeerCluster(t *testing.T, nth int, clusterSecret []byte) (*Cluster, *test.IpfsMock) { - host, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, mock := createComponents(t, nth, clusterSecret, false) - cl := createCluster(t, host, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf) + host, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, sharder, mock := createComponents(t, nth, clusterSecret, false) + cl := createCluster(t, host, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, sharder) <-cl.Ready() return cl, mock } @@ -179,6 +183,7 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) { mons := make([]PeerMonitor, nClusters, nClusters) allocs := make([]PinAllocator, nClusters, nClusters) infs := make([]Informer, nClusters, nClusters) + sharders := make([]Sharder, nClusters, nClusters) ipfsMocks := make([]*test.IpfsMock, nClusters, nClusters) hosts := make([]host.Host, nClusters, nClusters) @@ -189,7 +194,7 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) { for i := 0; i < nClusters; i++ { // staging = true for all except first (i==0) - host, clusterCfg, raftCon, api, ipfs, state, tracker, mon, alloc, inf, mock := createComponents(t, i, testingClusterSecret, i != 0) + host, clusterCfg, raftCon, api, ipfs, state, tracker, mon, alloc, inf, sharder, mock := createComponents(t, i, testingClusterSecret, i != 0) hosts[i] = host cfgs[i] = clusterCfg raftCons[i] = raftCon @@ -200,6 +205,7 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) { mons[i] = mon allocs[i] = alloc infs[i] = inf + sharders[i] = sharder ipfsMocks[i] = mock } @@ -218,13 +224,13 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) { } // Start first node - clusters[0] = createCluster(t, hosts[0], cfgs[0], raftCons[0], apis[0], ipfss[0], states[0], trackers[0], mons[0], allocs[0], infs[0]) + clusters[0] = createCluster(t, hosts[0], cfgs[0], raftCons[0], apis[0], ipfss[0], states[0], trackers[0], mons[0], allocs[0], infs[0], sharders[0]) <-clusters[0].Ready() bootstrapAddr, _ := ma.NewMultiaddr(fmt.Sprintf("%s/ipfs/%s", clusters[0].host.Addrs()[0], clusters[0].id.Pretty())) // Start the rest and join for i := 1; i < nClusters; i++ { - clusters[i] = createCluster(t, hosts[i], cfgs[i], raftCons[i], apis[i], ipfss[i], states[i], trackers[i], mons[i], allocs[i], infs[i]) + clusters[i] = createCluster(t, hosts[i], cfgs[i], raftCons[i], apis[i], ipfss[i], states[i], trackers[i], mons[i], allocs[i], infs[i], sharders[i]) err := clusters[i].Join(bootstrapAddr) if err != nil { logger.Error(err) diff --git a/ipfsconn/ipfshttp/ipfshttp_test.go b/ipfsconn/ipfshttp/ipfshttp_test.go index 75e42679..52e00e3c 100644 --- a/ipfsconn/ipfshttp/ipfshttp_test.go +++ b/ipfsconn/ipfshttp/ipfshttp_test.go @@ -712,7 +712,10 @@ func TestBlockPut(t *testing.T) { defer ipfs.Shutdown() data := []byte(test.TestCid4Data) - resp, err := ipfs.BlockPut(data) + resp, err := ipfs.BlockPut(api.BlockWithFormat{ + Data: data, + Format: "protobuf", + }) if err != nil { t.Fatal(err) } diff --git a/ipld-importer/import_test.go b/ipld-importer/import_test.go index 0bc560b5..9129e1ea 100644 --- a/ipld-importer/import_test.go +++ b/ipld-importer/import_test.go @@ -64,7 +64,7 @@ func TestToChannel(t *testing.T) { t.Fatal(err) } - outChan, err := ToChannel(file, context.Background()) + outChan, err := ToChannel(context.Background(), file) if err != nil { t.Fatal(err) } diff --git a/shard/clusterdag.go b/shard/clusterdag.go index 8c610c69..bdaf90f1 100644 --- a/shard/clusterdag.go +++ b/shard/clusterdag.go @@ -22,8 +22,9 @@ import ( mh "github.com/multiformats/go-multihash" ) +// MaxLinks is the max number of links that, when serialized fit into a block const MaxLinks = 5984 -const FixedPerLink = 40 +const fixedPerLink = 40 // makeDAG parses a shardObj tracking links in a shardNode to a shardNode that // carries links to all data nodes that this shard tracks. In general the @@ -85,7 +86,7 @@ func byteCount(obj shardObj) uint64 { // 35 bytes for each cid count := 1 for key := range obj { - count += FixedPerLink + count += fixedPerLink count += len(key) } return uint64(count) + indirectCount(len(obj)) @@ -113,25 +114,23 @@ func deltaByteCount(obj shardObj) uint64 { linkNum := len(obj) q1 := linkNum / MaxLinks q2 := (linkNum + 1) / MaxLinks - count := uint64(FixedPerLink) + count := uint64(fixedPerLink) count += uint64(len(fmt.Sprintf("%d", len(obj)))) - // no new nodes created by adding a link - if q1 == q2 { - return count - } else { + // new shard nodes created by adding link + if q1 != q2 { // first new leaf node created, i.e. indirect created too if q2 == 1 { - count += 1 // map overhead of indirect node - count += 1 + FixedPerLink // FixedPerLink + len("0") + count++ // map overhead of indirect node + count += 1 + fixedPerLink // fixedPerLink + len("0") } // added to indirect node - count += FixedPerLink + count += fixedPerLink count += uint64(len(fmt.Sprintf("%d", q2))) // overhead of new leaf node - count += 1 - return count + count++ } + return count } diff --git a/shard/sharder.go b/shard/sharder.go index 0712bed9..422eee71 100644 --- a/shard/sharder.go +++ b/shard/sharder.go @@ -121,9 +121,9 @@ func (s *Sharder) getAssignment() (peer.ID, uint64, error) { // between this function and allocate.go functions. Not too bad // for now though. allocInfo := api.AllocateInfo{ - "", - nil, - candidates, + Cid: "", + Current: nil, + Candidates: candidates, } allocs := make([]peer.ID, 0) @@ -237,7 +237,7 @@ func (s *Sharder) Finalize(id string) error { s.currentID = id session, ok := s.idToSession[id] if !ok { - return errors.New("Cannot finalize untracked id") + return errors.New("cannot finalize untracked id") } // call flush if err := s.flush(); err != nil { diff --git a/shard/sharder_test.go b/shard/sharder_test.go index 4ddf5dd2..45350550 100644 --- a/shard/sharder_test.go +++ b/shard/sharder_test.go @@ -70,14 +70,14 @@ func makeTestingHost() host.Host { ps.AddPubKey(pid, pub) ps.AddPrivKey(pid, priv) ps.AddAddr(pid, maddr, peerstore.PermanentAddrTTL) - mock_network, _ := swarm.NewNetwork(context.Background(), + mockNetwork, _ := swarm.NewNetwork(context.Background(), []ma.Multiaddr{maddr}, pid, ps, nil, ) - return basichost.New(mock_network) + return basichost.New(mockNetwork) } // GetInformerMetrics does nothing as mock allocator does not check metrics @@ -142,7 +142,7 @@ func TestAddAndFinalizeShard(t *testing.T) { if err != nil { t.Fatal(err) } - err = sharder.AddNode(size, nodes[i].RawData(), nodes[i].Cid().String(), sessionID) + err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID) if err != nil { t.Fatal(err) } @@ -160,23 +160,16 @@ func TestAddAndFinalizeShard(t *testing.T) { shardNode := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)]) // Traverse shard node to verify all expected links are there - for i, link := range shardNode.Links() { - // TODO remove dependence on link order, and make use of the - // link number info that exists somewhere within the cbor object - // but apparently not the ipld links (is this a bug in ipld cbor?) - /*i, err := strconv.Atoi(link.Name) - if err != nil || i >= 3 { - t.Errorf("Unexpected link name :%s:", link.Name) - continue - }*/ - if link.Cid.String() != cids[i] { - t.Errorf("Link %d should point to %s. Instead points to %s", i, cids[i], link.Cid.String()) + links := shardNode.Links() + for _, c := range cids { + if !linksContain(links, c) { + t.Errorf("expected cid %s not in shard node", c) } } rootNode := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)+1]) // Verify that clusterDAG root points to shard node - links := rootNode.Links() + links = rootNode.Links() if len(links) != 1 { t.Fatalf("Expected 1 link in root got %d", len(links)) } @@ -186,6 +179,16 @@ func TestAddAndFinalizeShard(t *testing.T) { } } +// helper function determining whether a cid is referenced in a slice of links +func linksContain(links []*ipld.Link, c string) bool { + for _, link := range links { + if link.Cid.String() == c { + return true + } + } + return false +} + // verifyNodePuts takes in a slice of byte slices containing the underlying data // of added nodes, an ordered slice of the cids of these nodes, a map between // IPFSBlockPut call order and arguments, and a slice determining which @@ -304,11 +307,13 @@ func TestInterleaveSessions(t *testing.T) { // verify clusterDAG for session 1 shardNode1 := cborDataToNode(t, mockRPC.orderedPuts[6]) - for i, link := range shardNode1.Links() { - if link.Cid.String() != cids1[i] { - t.Errorf("Link %d should point to %s. Instead points to %s", i, cids1[i], link.Cid.String()) + links1 := shardNode1.Links() + for _, c := range cids1 { + if !linksContain(links1, c) { + t.Errorf("expected cid %s not in links of shard node of session 1", c) } } + rootNode1 := cborDataToNode(t, mockRPC.orderedPuts[7]) links := rootNode1.Links() if len(links) != 1 { @@ -321,11 +326,14 @@ func TestInterleaveSessions(t *testing.T) { // verify clusterDAG for session 2 shardNode2 := cborDataToNode(t, mockRPC.orderedPuts[8]) - for i, link := range shardNode2.Links() { - if link.Cid.String() != cids2[i] { - t.Errorf("Link %d should point to %s. Instead points to %s", i, cids2[i], link.Cid.String()) + // Traverse shard node to verify all expected links are there + links2 := shardNode2.Links() + for _, c := range cids2 { + if !linksContain(links2, c) { + t.Errorf("expected cid %s not in links of shard node of session 2", c) } } + rootNode2 := cborDataToNode(t, mockRPC.orderedPuts[9]) links = rootNode2.Links() if len(links) != 1 { @@ -415,9 +423,9 @@ func TestManyLinks(t *testing.T) { if len(links) != 3 { t.Fatalf("Expected 3 links in indirect got %d", len(links)) } - if links[0].Cid.String() != shardNodeLeaf1.Cid().String() || - links[1].Cid.String() != shardNodeLeaf2.Cid().String() || - links[2].Cid.String() != shardNodeLeaf3.Cid().String() { + if !linksContain(links, shardNodeLeaf1.Cid().String()) || + !linksContain(links, shardNodeLeaf2.Cid().String()) || + !linksContain(links, shardNodeLeaf3.Cid().String()) { t.Errorf("Unexpected shard leaf nodes in shard root node") } @@ -435,3 +443,9 @@ func TestManyLinks(t *testing.T) { } // Test that by adding in enough nodes multiple shard nodes will be created +/*func TestMultipleShards(t *testing.T) { + sharder, mockRPC := testNewSharder(t) + sharder.allocSize = IPFSChunkSize + IPFSChunkSize / 2 + + +}*/