Add tests: fix tests for adder BlockPutHelper

commit 111cc29fc6 (parent 5c2af68459)
Author: Hector Sanjuan
Date:   2019-08-13 16:03:55 +02:00


@@ -9,12 +9,11 @@ import (
     "testing"
     "time"
 
-    cid "github.com/ipfs/go-cid"
     files "github.com/ipfs/go-ipfs-files"
+    "github.com/ipfs/ipfs-cluster/adder"
     "github.com/ipfs/ipfs-cluster/api"
     "github.com/ipfs/ipfs-cluster/test"
     peer "github.com/libp2p/go-libp2p-core/peer"
-    rpc "github.com/libp2p/go-libp2p-gorpc"
 )
 
 func TestAdd(t *testing.T) {
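
The two dropped imports (cid and rpc) were only needed by the mockPinAllocator deleted at the bottom of this diff; the new adder import is what lets the tests match on adder.ErrBlockAdder below. As a minimal sketch of the pattern, a sentinel error like this is declared once in the adder package and compared by identity (the message here is hypothetical; the real declaration lives in the adder package):

package adder

import "errors"

// ErrBlockAdder is a sentinel error: tests compare against it with
// ==/!=, so the adder must return it unwrapped.
// (Hypothetical message; see the adder package for the real one.)
var ErrBlockAdder = errors.New("all block puts failed")

Because the test below checks err != adder.ErrBlockAdder, wrapping the error anywhere on the way up would break the comparison.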
@@ -109,7 +108,6 @@ func TestAddPeerDown(t *testing.T) {
 }
 
 func TestAddOnePeerFails(t *testing.T) {
-    ctx := context.Background()
     clusters, mock := createClusters(t)
     defer shutdownClusters(t, clusters, mock)
     sth := test.NewShardingTestHelper()
@@ -121,36 +119,31 @@ func TestAddOnePeerFails(t *testing.T) {
         params := api.DefaultAddParams()
         params.Shard = false
         params.Name = "testlocal"
-        lg, closer := sth.GetRandFileReader(t, 50000) // 50 MB
+        lg, closer := sth.GetRandFileReader(t, 100000) // 100 MB
         defer closer.Close()
         mr := files.NewMultiFileReader(lg, true)
         r := multipart.NewReader(mr, mr.Boundary())
 
-        var err error
         var wg sync.WaitGroup
         wg.Add(1)
         go func() {
             defer wg.Done()
-            _, err = clusters[0].AddFile(r, params)
+            _, err := clusters[0].AddFile(r, params)
             if err != nil {
                 t.Fatal(err)
             }
         }()
 
-        err = clusters[1].Shutdown(ctx)
-        if err != nil {
-            t.Fatal(err)
-        }
+        // Shutdown 1 cluster (the last). Things should keep working.
+        // Important that we shut down the hosts, otherwise the RPC
+        // Servers keep working along with BlockPuts.
+        time.Sleep(100 * time.Millisecond)
+        shutdownClusters(t, clusters[nClusters-1:], mock[nClusters-1:])
         wg.Wait()
     })
 }
 
-// Kishan: Uncomment after https://github.com/ipfs/ipfs-cluster/issues/761
-// is resolved. This test would pass, but not for the reason we want it to.
-// Add fails waiting for the leader.
 func TestAddAllPeersFail(t *testing.T) {
     ctx := context.Background()
     clusters, mock := createClusters(t)
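
Both fixes hinge on the same point, spelled out in the comments above: calling clusters[i].Shutdown(ctx) stops the Cluster component but leaves the peer's libp2p host up, so its RPC server keeps answering BlockPuts and the failure never surfaces. The tests therefore use the shutdownClusters helper, which also tears down hosts and mocks. A rough sketch of the shape such a helper needs (illustrative only; the real helper lives in this repo's test setup):

// shutdownClusters (sketch): shutting down the Cluster component
// alone is not enough; the host must be closed too, or the RPC
// server bound to it keeps serving BlockPut requests.
func shutdownClusters(t *testing.T, clusters []*Cluster, mocks []*test.IpfsMock) {
    for i, c := range clusters {
        if err := c.Shutdown(context.Background()); err != nil {
            t.Error(err)
        }
        c.host.Close() // stops the RPC server on this host
        mocks[i].Close()
    }
}

The 100ms sleep before the teardown gives the AddFile goroutine time to start streaming blocks before the peers disappear.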
@@ -161,69 +154,49 @@ func TestAddAllPeersFail(t *testing.T) {
     waitForLeaderAndMetrics(t, clusters)
 
     t.Run("local", func(t *testing.T) {
+        // Prevent added content to be allocated to cluster 0
+        // as it is already going to have something.
+        _, err := clusters[0].Pin(ctx, test.Cid1, api.PinOptions{
+            ReplicationFactorMin: 1,
+            ReplicationFactorMax: 1,
+            UserAllocations:      []peer.ID{clusters[0].host.ID()},
+        })
+        if err != nil {
+            t.Fatal(err)
+        }
+
+        ttlDelay()
+
         params := api.DefaultAddParams()
         params.Shard = false
         params.Name = "testlocal"
-        lg, closer := sth.GetRandFileReader(t, 50000) // 50 MB
+        // Allocate to every peer except 0 (which already has a pin)
+        params.PinOptions.ReplicationFactorMax = nClusters - 1
+        params.PinOptions.ReplicationFactorMin = nClusters - 1
+
+        lg, closer := sth.GetRandFileReader(t, 100000) // 100 MB
         defer closer.Close()
         mr := files.NewMultiFileReader(lg, true)
         r := multipart.NewReader(mr, mr.Boundary())
 
-        params.PinOptions.ReplicationFactorMax = 2
-        params.PinOptions.ReplicationFactorMin = 2
-
-        clusters[0].allocator = &mockPinAllocator{
-            peers: []peer.ID{clusters[1].id, clusters[2].id},
-        }
-
-        var err error
         // var cid cid.Cid
         var wg sync.WaitGroup
         wg.Add(1)
         go func() {
             defer wg.Done()
-            _, err = clusters[0].AddFile(r, params)
-            if err == nil {
-                t.Fatalf("expected error")
+            _, err := clusters[0].AddFile(r, params)
+            if err != adder.ErrBlockAdder {
+                t.Fatal("expected ErrBlockAdder. Got: ", err)
             }
         }()
 
-        for i := 1; i < 3; i++ {
-            err = clusters[i].Shutdown(ctx)
-            if err != nil {
-                t.Fatal(err)
-            }
-        }
+        time.Sleep(100 * time.Millisecond)
+
+        // Shutdown all clusters except 0 to see the right error.
+        // Important that we shut down the hosts, otherwise
+        // the RPC Servers keep working along with BlockPuts.
+        // Note that this kills raft.
+        shutdownClusters(t, clusters[1:], mock[1:])
         wg.Wait()
-
-        // pinDelay()
-        // pini, err := clusters[0].Status(ctx, cid)
-        // if err != nil {
-        //     t.Error(err)
-        // }
-        // fmt.Println(pini.String())
-        // pin, err := clusters[0].PinGet(ctx, cid)
-        // if err != nil {
-        //     t.Fatal(err)
-        // }
-        // fmt.Println(pin.Allocations)
     })
 }
-
-type mockPinAllocator struct {
-    peers []peer.ID
-}
-
-// SetClient does nothing in this allocator
-func (alloc mockPinAllocator) SetClient(c *rpc.Client) {}
-
-// Shutdown does nothing in this allocator
-func (alloc mockPinAllocator) Shutdown(_ context.Context) error { return nil }
-
-func (alloc mockPinAllocator) Allocate(ctx context.Context, c cid.Cid, current, candidates, priority map[peer.ID]*api.Metric) ([]peer.ID, error) {
-    return alloc.peers, nil
-}
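
The deleted mockPinAllocator was a stub for ipfs-cluster's PinAllocator component, which is also why the cid and rpc imports could go. Reconstructed from the mock's method set above, the interface it satisfied looks roughly like this (the canonical definition lives in the main package and may differ in detail):

// PinAllocator (sketch, reconstructed from the removed mock):
// decides which peers should pin a CID, given metrics for the
// current holders and for candidate peers.
type PinAllocator interface {
    SetClient(c *rpc.Client)            // attach the cluster RPC client
    Shutdown(ctx context.Context) error // release resources
    Allocate(ctx context.Context, c cid.Cid, current, candidates, priority map[peer.ID]*api.Metric) ([]peer.ID, error)
}

Rather than swapping in a fake allocator, the test now steers the real one: test.Cid1 is pinned to peer 0 via UserAllocations, and the add runs with replication factor nClusters-1, so the new content is allocated to every peer except 0; shutting those peers down then guarantees the BlockPuts fail with adder.ErrBlockAdder.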