d4484ec3f1
* build(deps): bump github.com/google/uuid from 1.3.0 to 1.3.1 Bumps [github.com/google/uuid](https://github.com/google/uuid) from 1.3.0 to 1.3.1. - [Release notes](https://github.com/google/uuid/releases) - [Changelog](https://github.com/google/uuid/blob/master/CHANGELOG.md) - [Commits](https://github.com/google/uuid/compare/v1.3.0...v1.3.1) --- updated-dependencies: - dependency-name: github.com/google/uuid dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> * Dependency upgrades * build(deps): bump golang.org/x/crypto from 0.12.0 to 0.13.0 Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.12.0 to 0.13.0. - [Commits](https://github.com/golang/crypto/compare/v0.12.0...v0.13.0) --- updated-dependencies: - dependency-name: golang.org/x/crypto dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/rs/cors from 1.8.3 to 1.10.1 Bumps [github.com/rs/cors](https://github.com/rs/cors) from 1.8.3 to 1.10.1. - [Release notes](https://github.com/rs/cors/releases) - [Commits](https://github.com/rs/cors/compare/v1.8.3...v1.10.1) --- updated-dependencies: - dependency-name: github.com/rs/cors dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ipfs/go-block-format from 0.1.2 to 0.2.0 Bumps [github.com/ipfs/go-block-format](https://github.com/ipfs/go-block-format) from 0.1.2 to 0.2.0. - [Release notes](https://github.com/ipfs/go-block-format/releases) - [Commits](https://github.com/ipfs/go-block-format/compare/v0.1.2...v0.2.0) --- updated-dependencies: - dependency-name: github.com/ipfs/go-block-format dependency-type: direct:production update-type: version-update:semver-minor ... 
Signed-off-by: dependabot[bot] <support@github.com> * build(deps): bump github.com/ipfs/go-ipfs-api from 0.5.0 to 0.7.0 Bumps [github.com/ipfs/go-ipfs-api](https://github.com/ipfs/go-ipfs-api) from 0.5.0 to 0.7.0. - [Release notes](https://github.com/ipfs/go-ipfs-api/releases) - [Commits](https://github.com/ipfs/go-ipfs-api/compare/v0.5.0...v0.7.0) --- updated-dependencies: - dependency-name: github.com/ipfs/go-ipfs-api dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
299 lines
7.5 KiB
Go
299 lines
7.5 KiB
Go
package ipfscluster
|
|
|
|
// This file has tests for Add* using multiple cluster peers.
|
|
|
|
import (
|
|
"context"
|
|
"mime/multipart"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/ipfs-cluster/ipfs-cluster/adder"
|
|
"github.com/ipfs-cluster/ipfs-cluster/api"
|
|
"github.com/ipfs-cluster/ipfs-cluster/test"
|
|
files "github.com/ipfs/boxo/files"
|
|
peer "github.com/libp2p/go-libp2p/core/peer"
|
|
)
|
|
|
|
func TestAdd(t *testing.T) {
|
|
ctx := context.Background()
|
|
clusters, mock := createClusters(t)
|
|
defer shutdownClusters(t, clusters, mock)
|
|
sth := test.NewShardingTestHelper()
|
|
defer sth.Clean(t)
|
|
|
|
waitForLeaderAndMetrics(t, clusters)
|
|
|
|
t.Run("default", func(t *testing.T) {
|
|
params := api.DefaultAddParams()
|
|
params.Shard = false
|
|
params.Name = "testlocal"
|
|
mfr, closer := sth.GetTreeMultiReader(t)
|
|
defer closer.Close()
|
|
r := multipart.NewReader(mfr, mfr.Boundary())
|
|
ci, err := clusters[0].AddFile(context.Background(), r, params)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if ci.String() != test.ShardingDirBalancedRootCID {
|
|
t.Fatal("unexpected root CID for local add")
|
|
}
|
|
|
|
// We need to sleep a lot because it takes time to
|
|
// catch up on a first/single pin on crdts
|
|
time.Sleep(10 * time.Second)
|
|
|
|
f := func(t *testing.T, c *Cluster) {
|
|
pin := c.StatusLocal(ctx, ci)
|
|
if pin.Error != "" {
|
|
t.Error(pin.Error)
|
|
}
|
|
if pin.Status != api.TrackerStatusPinned {
|
|
t.Error("item should be pinned and is", pin.Status)
|
|
}
|
|
}
|
|
|
|
runF(t, clusters, f)
|
|
})
|
|
|
|
t.Run("local_one_allocation", func(t *testing.T) {
|
|
params := api.DefaultAddParams()
|
|
params.Shard = false
|
|
params.Name = "testlocal"
|
|
params.ReplicationFactorMin = 1
|
|
params.ReplicationFactorMax = 1
|
|
params.Local = true
|
|
mfr, closer := sth.GetTreeMultiReader(t)
|
|
defer closer.Close()
|
|
r := multipart.NewReader(mfr, mfr.Boundary())
|
|
ci, err := clusters[2].AddFile(context.Background(), r, params)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if ci.String() != test.ShardingDirBalancedRootCID {
|
|
t.Fatal("unexpected root CID for local add")
|
|
}
|
|
|
|
// We need to sleep a lot because it takes time to
|
|
// catch up on a first/single pin on crdts
|
|
time.Sleep(10 * time.Second)
|
|
|
|
f := func(t *testing.T, c *Cluster) {
|
|
pin := c.StatusLocal(ctx, ci)
|
|
if pin.Error != "" {
|
|
t.Error(pin.Error)
|
|
}
|
|
switch c.id {
|
|
case clusters[2].id:
|
|
if pin.Status != api.TrackerStatusPinned {
|
|
t.Error("item should be pinned and is", pin.Status)
|
|
}
|
|
default:
|
|
if pin.Status != api.TrackerStatusRemote {
|
|
t.Errorf("item should only be allocated to cluster2")
|
|
}
|
|
}
|
|
}
|
|
|
|
runF(t, clusters, f)
|
|
})
|
|
}
|
|
|
|
func TestAddWithUserAllocations(t *testing.T) {
|
|
ctx := context.Background()
|
|
clusters, mock := createClusters(t)
|
|
defer shutdownClusters(t, clusters, mock)
|
|
sth := test.NewShardingTestHelper()
|
|
defer sth.Clean(t)
|
|
|
|
waitForLeaderAndMetrics(t, clusters)
|
|
|
|
t.Run("local", func(t *testing.T) {
|
|
params := api.DefaultAddParams()
|
|
params.ReplicationFactorMin = 2
|
|
params.ReplicationFactorMax = 2
|
|
params.UserAllocations = []peer.ID{clusters[0].id, clusters[1].id}
|
|
params.Shard = false
|
|
params.Name = "testlocal"
|
|
mfr, closer := sth.GetTreeMultiReader(t)
|
|
defer closer.Close()
|
|
r := multipart.NewReader(mfr, mfr.Boundary())
|
|
ci, err := clusters[0].AddFile(context.Background(), r, params)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
pinDelay()
|
|
|
|
f := func(t *testing.T, c *Cluster) {
|
|
if c == clusters[0] || c == clusters[1] {
|
|
pin := c.StatusLocal(ctx, ci)
|
|
if pin.Error != "" {
|
|
t.Error(pin.Error)
|
|
}
|
|
if pin.Status != api.TrackerStatusPinned {
|
|
t.Error("item should be pinned and is", pin.Status)
|
|
}
|
|
} else {
|
|
pin := c.StatusLocal(ctx, ci)
|
|
if pin.Status != api.TrackerStatusRemote {
|
|
t.Error("expected tracker status remote")
|
|
}
|
|
}
|
|
}
|
|
|
|
runF(t, clusters, f)
|
|
})
|
|
}
|
|
|
|
func TestAddPeerDown(t *testing.T) {
|
|
ctx := context.Background()
|
|
clusters, mock := createClusters(t)
|
|
defer shutdownClusters(t, clusters, mock)
|
|
sth := test.NewShardingTestHelper()
|
|
defer sth.Clean(t)
|
|
err := clusters[0].Shutdown(ctx)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
waitForLeaderAndMetrics(t, clusters)
|
|
|
|
t.Run("local", func(t *testing.T) {
|
|
params := api.DefaultAddParams()
|
|
params.Shard = false
|
|
params.Name = "testlocal"
|
|
mfr, closer := sth.GetTreeMultiReader(t)
|
|
defer closer.Close()
|
|
r := multipart.NewReader(mfr, mfr.Boundary())
|
|
ci, err := clusters[1].AddFile(context.Background(), r, params)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if ci.String() != test.ShardingDirBalancedRootCID {
|
|
t.Fatal("unexpected root CID for local add")
|
|
}
|
|
|
|
// We need to sleep a lot because it takes time to
|
|
// catch up on a first/single pin on crdts
|
|
time.Sleep(10 * time.Second)
|
|
|
|
f := func(t *testing.T, c *Cluster) {
|
|
if c.id == clusters[0].id {
|
|
return
|
|
}
|
|
pin := c.StatusLocal(ctx, ci)
|
|
if pin.Error != "" {
|
|
t.Error(pin.Error)
|
|
}
|
|
if pin.Status != api.TrackerStatusPinned {
|
|
t.Error("item should be pinned")
|
|
}
|
|
}
|
|
|
|
runF(t, clusters, f)
|
|
})
|
|
}
|
|
|
|
func TestAddOnePeerFails(t *testing.T) {
|
|
clusters, mock := createClusters(t)
|
|
defer shutdownClusters(t, clusters, mock)
|
|
sth := test.NewShardingTestHelper()
|
|
defer sth.Clean(t)
|
|
|
|
waitForLeaderAndMetrics(t, clusters)
|
|
|
|
t.Run("local", func(t *testing.T) {
|
|
params := api.DefaultAddParams()
|
|
params.Shard = false
|
|
params.Name = "testlocal"
|
|
lg, closer := sth.GetRandFileReader(t, 100000) // 100 MB
|
|
defer closer.Close()
|
|
|
|
mr := files.NewMultiFileReader(lg, true, false)
|
|
r := multipart.NewReader(mr, mr.Boundary())
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
_, err := clusters[0].AddFile(context.Background(), r, params)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
}()
|
|
|
|
// Disconnect 1 cluster (the last). Things should keep working.
|
|
// Important that we close the hosts, otherwise the RPC
|
|
// Servers keep working along with BlockPuts.
|
|
time.Sleep(100 * time.Millisecond)
|
|
c := clusters[nClusters-1]
|
|
c.Shutdown(context.Background())
|
|
c.dht.Close()
|
|
c.host.Close()
|
|
wg.Wait()
|
|
})
|
|
}
|
|
|
|
func TestAddAllPeersFail(t *testing.T) {
|
|
ctx := context.Background()
|
|
clusters, mock := createClusters(t)
|
|
defer shutdownClusters(t, clusters, mock)
|
|
sth := test.NewShardingTestHelper()
|
|
defer sth.Clean(t)
|
|
|
|
waitForLeaderAndMetrics(t, clusters)
|
|
|
|
t.Run("local", func(t *testing.T) {
|
|
// Prevent added content to be allocated to cluster 0
|
|
// as it is already going to have something.
|
|
_, err := clusters[0].Pin(ctx, test.Cid1, api.PinOptions{
|
|
ReplicationFactorMin: 1,
|
|
ReplicationFactorMax: 1,
|
|
UserAllocations: []peer.ID{clusters[0].host.ID()},
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
ttlDelay()
|
|
|
|
params := api.DefaultAddParams()
|
|
params.Shard = false
|
|
params.Name = "testlocal"
|
|
// Allocate to every peer except 0 (which already has a pin)
|
|
params.PinOptions.ReplicationFactorMax = nClusters - 1
|
|
params.PinOptions.ReplicationFactorMin = nClusters - 1
|
|
|
|
lg, closer := sth.GetRandFileReader(t, 100000) // 100 MB
|
|
defer closer.Close()
|
|
mr := files.NewMultiFileReader(lg, true, false)
|
|
r := multipart.NewReader(mr, mr.Boundary())
|
|
|
|
// var cid cid.Cid
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
_, err := clusters[0].AddFile(context.Background(), r, params)
|
|
if err != adder.ErrBlockAdder {
|
|
t.Error("expected ErrBlockAdder. Got: ", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Shutdown all clusters except 0 to see the right error.
|
|
// Important that we shut down the hosts, otherwise
|
|
// the RPC Servers keep working along with BlockPuts.
|
|
// Note that this kills raft.
|
|
runF(t, clusters[1:], func(t *testing.T, c *Cluster) {
|
|
c.Shutdown(ctx)
|
|
c.dht.Close()
|
|
c.host.Close()
|
|
})
|
|
wg.Wait()
|
|
})
|
|
}
|