From 111cc29fc66ff79ef278f3626aa85bbb235c9f5b Mon Sep 17 00:00:00 2001
From: Hector Sanjuan
Date: Tue, 13 Aug 2019 16:03:55 +0200
Subject: [PATCH] Add tests: fix tests for adder BlockPutHelper

---
 add_test.go | 97 +++++++++++++++++++----------------------------------
 1 file changed, 35 insertions(+), 62 deletions(-)

diff --git a/add_test.go b/add_test.go
index 007f732a..4335414a 100644
--- a/add_test.go
+++ b/add_test.go
@@ -9,12 +9,11 @@ import (
 	"testing"
 	"time"
 
-	cid "github.com/ipfs/go-cid"
 	files "github.com/ipfs/go-ipfs-files"
+	"github.com/ipfs/ipfs-cluster/adder"
 	"github.com/ipfs/ipfs-cluster/api"
 	"github.com/ipfs/ipfs-cluster/test"
 	peer "github.com/libp2p/go-libp2p-core/peer"
-	rpc "github.com/libp2p/go-libp2p-gorpc"
 )
 
 func TestAdd(t *testing.T) {
@@ -109,7 +108,6 @@ func TestAddPeerDown(t *testing.T) {
 }
 
 func TestAddOnePeerFails(t *testing.T) {
-	ctx := context.Background()
 	clusters, mock := createClusters(t)
 	defer shutdownClusters(t, clusters, mock)
 	sth := test.NewShardingTestHelper()
@@ -121,36 +119,31 @@ func TestAddOnePeerFails(t *testing.T) {
 		params := api.DefaultAddParams()
 		params.Shard = false
 		params.Name = "testlocal"
-		lg, closer := sth.GetRandFileReader(t, 50000) // 50 MB
+		lg, closer := sth.GetRandFileReader(t, 100000) // 100 MB
 		defer closer.Close()
 		mr := files.NewMultiFileReader(lg, true)
 		r := multipart.NewReader(mr, mr.Boundary())
-		var err error
 		var wg sync.WaitGroup
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			_, err = clusters[0].AddFile(r, params)
+			_, err := clusters[0].AddFile(r, params)
 			if err != nil {
 				t.Fatal(err)
 			}
 		}()
 
-		err = clusters[1].Shutdown(ctx)
-		if err != nil {
-			t.Fatal(err)
-		}
-
+		// Shut down one cluster (the last one). Things should keep working.
+		// It is important that we shut down the hosts; otherwise the RPC
+		// servers keep working along with BlockPuts.
+		time.Sleep(100 * time.Millisecond)
+		shutdownClusters(t, clusters[nClusters-1:], mock[nClusters-1:])
 		wg.Wait()
 	})
 }
 
-// Kishan: Uncomment after https://github.com/ipfs/ipfs-cluster/issues/761
-// is resolved. This test would pass, but not for the reason we want it to.
-// Add fails waiting for the leader.
 func TestAddAllPeersFail(t *testing.T) {
 	ctx := context.Background()
 	clusters, mock := createClusters(t)
@@ -161,69 +154,49 @@ func TestAddAllPeersFail(t *testing.T) {
 	waitForLeaderAndMetrics(t, clusters)
 
 	t.Run("local", func(t *testing.T) {
+		// Prevent added content from being allocated to cluster 0,
+		// as it is already going to have something.
+		_, err := clusters[0].Pin(ctx, test.Cid1, api.PinOptions{
+			ReplicationFactorMin: 1,
+			ReplicationFactorMax: 1,
+			UserAllocations:      []peer.ID{clusters[0].host.ID()},
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		ttlDelay()
+
 		params := api.DefaultAddParams()
 		params.Shard = false
 		params.Name = "testlocal"
-		lg, closer := sth.GetRandFileReader(t, 50000) // 50 MB
+		// Allocate to every peer except 0 (which already has a pin)
+		params.PinOptions.ReplicationFactorMax = nClusters - 1
+		params.PinOptions.ReplicationFactorMin = nClusters - 1
+
+		lg, closer := sth.GetRandFileReader(t, 100000) // 100 MB
 		defer closer.Close()
 		mr := files.NewMultiFileReader(lg, true)
 		r := multipart.NewReader(mr, mr.Boundary())
-		params.PinOptions.ReplicationFactorMax = 2
-		params.PinOptions.ReplicationFactorMin = 2
-		clusters[0].allocator = &mockPinAllocator{
-			peers: []peer.ID{clusters[1].id, clusters[2].id},
-		}
-		var err error
-		// var cid cid.Cid
+
 		var wg sync.WaitGroup
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			_, err = clusters[0].AddFile(r, params)
-			if err == nil {
-				t.Fatalf("expected error")
+			_, err := clusters[0].AddFile(r, params)
+			if err != adder.ErrBlockAdder {
+				t.Fatal("expected ErrBlockAdder. Got: ", err)
 			}
 		}()
 
-		for i := 1; i < 3; i++ {
-			err = clusters[i].Shutdown(ctx)
-			if err != nil {
-				t.Fatal(err)
-			}
-		}
+		time.Sleep(100 * time.Millisecond)
+
+		// Shut down all clusters except 0 to see the right error.
+		// It is important that we shut down the hosts; otherwise
+		// the RPC servers keep working along with BlockPuts.
+		// Note that this kills raft.
+		shutdownClusters(t, clusters[1:], mock[1:])
 		wg.Wait()
-
-		// pinDelay()
-
-		// pini, err := clusters[0].Status(ctx, cid)
-		// if err != nil {
-		// 	t.Error(err)
-		// }
-		// fmt.Println(pini.String())
-
-		// pin, err := clusters[0].PinGet(ctx, cid)
-		// if err != nil {
-		// 	t.Fatal(err)
-		// }
-
-		// fmt.Println(pin.Allocations)
 	})
 }
-
-type mockPinAllocator struct {
-	peers []peer.ID
-}
-
-// SetClient does nothing in this allocator
-func (alloc mockPinAllocator) SetClient(c *rpc.Client) {}
-
-// Shutdown does nothing in this allocator
-func (alloc mockPinAllocator) Shutdown(_ context.Context) error { return nil }
-
-func (alloc mockPinAllocator) Allocate(ctx context.Context, c cid.Cid, current, candidates, priority map[peer.ID]*api.Metric) ([]peer.ID, error) {
-	return alloc.peers, nil
-}
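
Note for reviewers: both touched tests share the same shape. AddFile runs in
a goroutine while the test goroutine tears peers down underneath it, and the
goroutine's result is compared directly against the sentinel error value
exported by the adder package (adder.ErrBlockAdder). Below is a minimal,
self-contained sketch of that pattern using only the standard library; every
name in it (errNoDestinations, addBlocks, the destinationsGone channel) is
illustrative and not part of this patch:

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errNoDestinations stands in for a sentinel like adder.ErrBlockAdder:
// a package-level error value that callers compare against directly.
var errNoDestinations = errors.New("no block destinations left")

// addBlocks stands in for Cluster.AddFile: it keeps sending blocks until
// its destinations disappear (the closed channel plays the role of the
// shut-down RPC servers).
func addBlocks(destinationsGone <-chan struct{}) error {
	for i := 0; i < 50; i++ {
		select {
		case <-destinationsGone:
			return errNoDestinations
		case <-time.After(10 * time.Millisecond):
			// block i "sent" successfully
		}
	}
	return nil
}

func main() {
	destinationsGone := make(chan struct{})

	var wg sync.WaitGroup
	var err error
	wg.Add(1)
	go func() {
		defer wg.Done()
		err = addBlocks(destinationsGone)
	}()

	// Give the add a head start, then remove its destinations,
	// mirroring the time.Sleep + shutdownClusters calls in the tests.
	time.Sleep(100 * time.Millisecond)
	close(destinationsGone)

	wg.Wait()
	// Sentinel comparison, as in `err != adder.ErrBlockAdder` above.
	if err != errNoDestinations {
		fmt.Println("unexpected result:", err)
		return
	}
	fmt.Println("got the expected sentinel error")
}

To exercise just the touched tests from the repository root, something like
go test -run 'TestAddOnePeerFails|TestAddAllPeersFail' . should work (they
need the full cluster test harness in this package).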