df2753dfc6
Also updates to the optracker to make retrieving information easier. License: MIT Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
582 lines
11 KiB
Go
582 lines
11 KiB
Go
package maptracker
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"math/rand"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
rpc "github.com/hsanjuan/go-libp2p-gorpc"
|
|
cid "github.com/ipfs/go-cid"
|
|
peer "github.com/libp2p/go-libp2p-peer"
|
|
|
|
"github.com/ipfs/ipfs-cluster/api"
|
|
"github.com/ipfs/ipfs-cluster/test"
|
|
)
|
|
|
|
var (
|
|
pinCancelCid = test.TestCid3
|
|
unpinCancelCid = test.TestCid2
|
|
ErrPinCancelCid = errors.New("should not have received rpc.IPFSPin operation")
|
|
ErrUnpinCancelCid = errors.New("should not have received rpc.IPFSUnpin operation")
|
|
)
|
|
|
|
type mockService struct {
|
|
rpcClient *rpc.Client
|
|
}
|
|
|
|
func mockRPCClient(t *testing.T) *rpc.Client {
|
|
s := rpc.NewServer(nil, "mock")
|
|
c := rpc.NewClientWithServer(nil, "mock", s)
|
|
err := s.RegisterName("Cluster", &mockService{})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
return c
|
|
}
|
|
|
|
func (mock *mockService) IPFSPin(ctx context.Context, in api.PinSerial, out *struct{}) error {
|
|
c := in.ToPin().Cid
|
|
switch c.String() {
|
|
case test.TestSlowCid1:
|
|
time.Sleep(2 * time.Second)
|
|
case pinCancelCid:
|
|
return ErrPinCancelCid
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (mock *mockService) IPFSUnpin(ctx context.Context, in api.PinSerial, out *struct{}) error {
|
|
c := in.ToPin().Cid
|
|
switch c.String() {
|
|
case test.TestSlowCid1:
|
|
time.Sleep(2 * time.Second)
|
|
case unpinCancelCid:
|
|
return ErrUnpinCancelCid
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func testPin(c *cid.Cid, min, max int, allocs ...peer.ID) api.Pin {
|
|
pin := api.PinCid(c)
|
|
pin.ReplicationFactorMin = min
|
|
pin.ReplicationFactorMax = max
|
|
pin.Allocations = allocs
|
|
return pin
|
|
}
|
|
|
|
func testSlowMapPinTracker(t *testing.T) *MapPinTracker {
|
|
cfg := &Config{}
|
|
cfg.Default()
|
|
cfg.ConcurrentPins = 1
|
|
mpt := NewMapPinTracker(cfg, test.TestPeerID1)
|
|
mpt.SetClient(mockRPCClient(t))
|
|
return mpt
|
|
}
|
|
|
|
func testMapPinTracker(t *testing.T) *MapPinTracker {
|
|
cfg := &Config{}
|
|
cfg.Default()
|
|
cfg.ConcurrentPins = 1
|
|
mpt := NewMapPinTracker(cfg, test.TestPeerID1)
|
|
mpt.SetClient(test.NewMockRPCClient(t))
|
|
return mpt
|
|
}
|
|
|
|
func TestNew(t *testing.T) {
|
|
mpt := testMapPinTracker(t)
|
|
defer mpt.Shutdown()
|
|
}
|
|
|
|
func TestShutdown(t *testing.T) {
|
|
mpt := testMapPinTracker(t)
|
|
err := mpt.Shutdown()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
err = mpt.Shutdown()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
func TestTrack(t *testing.T) {
|
|
mpt := testMapPinTracker(t)
|
|
defer mpt.Shutdown()
|
|
|
|
h, _ := cid.Decode(test.TestCid1)
|
|
|
|
// Let's tart with a local pin
|
|
c := testPin(h, -1, -1)
|
|
|
|
err := mpt.Track(c)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
time.Sleep(200 * time.Millisecond) // let it be pinned
|
|
|
|
st := mpt.Status(h)
|
|
if st.Status != api.TrackerStatusPinned {
|
|
t.Fatalf("cid should be pinned and is %s", st.Status)
|
|
}
|
|
|
|
// Unpin and set remote
|
|
c = testPin(h, 1, 1, test.TestPeerID2)
|
|
err = mpt.Track(c)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
time.Sleep(200 * time.Millisecond) // let it be unpinned
|
|
|
|
st = mpt.Status(h)
|
|
if st.Status != api.TrackerStatusRemote {
|
|
t.Fatalf("cid should be pinned and is %s", st.Status)
|
|
}
|
|
}
|
|
|
|
func TestUntrack(t *testing.T) {
|
|
mpt := testMapPinTracker(t)
|
|
defer mpt.Shutdown()
|
|
|
|
h1, _ := cid.Decode(test.TestCid1)
|
|
h2, _ := cid.Decode(test.TestCid2)
|
|
|
|
// LocalPin
|
|
c := testPin(h1, -1, -1)
|
|
|
|
err := mpt.Track(c)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Remote pin
|
|
c = testPin(h2, 1, 1, test.TestPeerID2)
|
|
err = mpt.Track(c)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
time.Sleep(time.Second / 2)
|
|
|
|
err = mpt.Untrack(h2)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
err = mpt.Untrack(h1)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
err = mpt.Untrack(h1)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
time.Sleep(time.Second / 2)
|
|
|
|
st := mpt.Status(h1)
|
|
if st.Status != api.TrackerStatusUnpinned {
|
|
t.Fatalf("cid should be unpinned and is %s", st.Status)
|
|
}
|
|
|
|
st = mpt.Status(h2)
|
|
if st.Status != api.TrackerStatusUnpinned {
|
|
t.Fatalf("cid should be unpinned and is %s", st.Status)
|
|
}
|
|
}
|
|
|
|
func TestStatusAll(t *testing.T) {
|
|
mpt := testMapPinTracker(t)
|
|
defer mpt.Shutdown()
|
|
|
|
h1, _ := cid.Decode(test.TestCid1)
|
|
h2, _ := cid.Decode(test.TestCid2)
|
|
|
|
// LocalPin
|
|
c := testPin(h1, -1, -1)
|
|
mpt.Track(c)
|
|
c = testPin(h2, 1, 1)
|
|
mpt.Track(c)
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
stAll := mpt.StatusAll()
|
|
if len(stAll) != 2 {
|
|
t.Logf("%+v", stAll)
|
|
t.Fatal("expected 2 pins")
|
|
}
|
|
|
|
for _, st := range stAll {
|
|
if st.Cid.Equals(h1) && st.Status != api.TrackerStatusPinned {
|
|
t.Fatal("expected pinned")
|
|
}
|
|
if st.Cid.Equals(h2) && st.Status != api.TrackerStatusRemote {
|
|
t.Fatal("expected remote")
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSyncAndRecover(t *testing.T) {
|
|
mpt := testMapPinTracker(t)
|
|
defer mpt.Shutdown()
|
|
|
|
h1, _ := cid.Decode(test.TestCid1)
|
|
h2, _ := cid.Decode(test.TestCid2)
|
|
|
|
c := testPin(h1, -1, -1)
|
|
mpt.Track(c)
|
|
c = testPin(h2, -1, -1)
|
|
mpt.Track(c)
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// IPFSPinLS RPC returns unpinned for anything != Cid1 or Cid3
|
|
info, err := mpt.Sync(h2)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if info.Status != api.TrackerStatusPinError {
|
|
t.Error("expected pin_error")
|
|
}
|
|
|
|
info, err = mpt.Sync(h1)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if info.Status != api.TrackerStatusPinned {
|
|
t.Error("expected pinned")
|
|
}
|
|
|
|
info, err = mpt.Recover(h1)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if info.Status != api.TrackerStatusPinned {
|
|
t.Error("expected pinned")
|
|
}
|
|
|
|
_, err = mpt.Recover(h2)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
info = mpt.Status(h2)
|
|
if info.Status != api.TrackerStatusPinned {
|
|
t.Error("expected pinned")
|
|
}
|
|
}
|
|
|
|
func TestRecoverAll(t *testing.T) {
|
|
mpt := testMapPinTracker(t)
|
|
defer mpt.Shutdown()
|
|
|
|
h1, _ := cid.Decode(test.TestCid1)
|
|
|
|
c := testPin(h1, -1, -1)
|
|
mpt.Track(c)
|
|
time.Sleep(100 * time.Millisecond)
|
|
mpt.optracker.SetError(h1, errors.New("fakeerror"))
|
|
pins, err := mpt.RecoverAll()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if len(pins) != 1 {
|
|
t.Fatal("there should be only one pin")
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
info := mpt.Status(h1)
|
|
|
|
if info.Status != api.TrackerStatusPinned {
|
|
t.Error("the pin should have been recovered")
|
|
}
|
|
}
|
|
|
|
func TestSyncAll(t *testing.T) {
|
|
mpt := testMapPinTracker(t)
|
|
defer mpt.Shutdown()
|
|
|
|
synced, err := mpt.SyncAll()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
// This relies on the rpc mock implementation
|
|
|
|
if len(synced) != 0 {
|
|
t.Fatal("should not have synced anything when it tracks nothing")
|
|
}
|
|
|
|
h1, _ := cid.Decode(test.TestCid1)
|
|
h2, _ := cid.Decode(test.TestCid2)
|
|
|
|
c := testPin(h1, -1, -1)
|
|
mpt.Track(c)
|
|
c = testPin(h2, -1, -1)
|
|
mpt.Track(c)
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
synced, err = mpt.SyncAll()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if len(synced) != 1 || synced[0].Status != api.TrackerStatusPinError {
|
|
t.Logf("%+v", synced)
|
|
t.Fatal("should have synced h2")
|
|
}
|
|
}
|
|
|
|
func TestUntrackTrack(t *testing.T) {
|
|
mpt := testMapPinTracker(t)
|
|
defer mpt.Shutdown()
|
|
|
|
h1, _ := cid.Decode(test.TestCid1)
|
|
|
|
// LocalPin
|
|
c := testPin(h1, -1, -1)
|
|
err := mpt.Track(c)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
time.Sleep(time.Second / 2)
|
|
|
|
err = mpt.Untrack(h1)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
func TestTrackUntrackWithCancel(t *testing.T) {
|
|
mpt := testSlowMapPinTracker(t)
|
|
defer mpt.Shutdown()
|
|
|
|
slowPinCid, _ := cid.Decode(test.TestSlowCid1)
|
|
|
|
// LocalPin
|
|
slowPin := testPin(slowPinCid, -1, -1)
|
|
|
|
err := mpt.Track(slowPin)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond) // let pinning start
|
|
|
|
pInfo := mpt.Status(slowPin.Cid)
|
|
if pInfo.Status == api.TrackerStatusUnpinned {
|
|
t.Fatal("slowPin should be tracked")
|
|
}
|
|
|
|
if pInfo.Status == api.TrackerStatusPinning {
|
|
go func() {
|
|
err = mpt.Untrack(slowPinCid)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}()
|
|
select {
|
|
case <-mpt.optracker.OpContext(slowPinCid).Done():
|
|
return
|
|
case <-time.Tick(100 * time.Millisecond):
|
|
t.Errorf("operation context should have been cancelled by now")
|
|
}
|
|
} else {
|
|
t.Error("slowPin should be pinning and is:", pInfo.Status)
|
|
}
|
|
}
|
|
|
|
func TestTrackUntrackWithNoCancel(t *testing.T) {
|
|
mpt := testSlowMapPinTracker(t)
|
|
defer mpt.Shutdown()
|
|
|
|
slowPinCid, _ := cid.Decode(test.TestSlowCid1)
|
|
fastPinCid, _ := cid.Decode(pinCancelCid)
|
|
|
|
// SlowLocalPin
|
|
slowPin := testPin(slowPinCid, -1, -1)
|
|
|
|
// LocalPin
|
|
fastPin := testPin(fastPinCid, -1, -1)
|
|
|
|
err := mpt.Track(slowPin)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
err = mpt.Track(fastPin)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// fastPin should be queued because slow pin is pinning
|
|
pInfo := mpt.Status(fastPinCid)
|
|
if pInfo.Status == api.TrackerStatusPinQueued {
|
|
err = mpt.Untrack(fastPinCid)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
pi := mpt.Status(fastPinCid)
|
|
if pi.Error == ErrPinCancelCid.Error() {
|
|
t.Fatal(ErrPinCancelCid)
|
|
}
|
|
} else {
|
|
t.Error("fastPin should be queued to pin:", pInfo.Status)
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
pInfo = mpt.Status(fastPinCid)
|
|
if pInfo.Status != api.TrackerStatusUnpinned {
|
|
t.Error("fastPin should have been removed from tracker:", pInfo.Status)
|
|
}
|
|
}
|
|
|
|
func TestUntrackTrackWithCancel(t *testing.T) {
|
|
mpt := testSlowMapPinTracker(t)
|
|
defer mpt.Shutdown()
|
|
|
|
slowPinCid, _ := cid.Decode(test.TestSlowCid1)
|
|
|
|
// LocalPin
|
|
slowPin := testPin(slowPinCid, -1, -1)
|
|
|
|
err := mpt.Track(slowPin)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
time.Sleep(time.Second / 2)
|
|
|
|
// Untrack should cancel the ongoing request
|
|
// and unpin right away
|
|
err = mpt.Untrack(slowPinCid)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
pInfo := mpt.Status(slowPinCid)
|
|
if pInfo.Status == api.TrackerStatusUnpinned {
|
|
t.Fatal("expected slowPin to be tracked")
|
|
}
|
|
|
|
if pInfo.Status == api.TrackerStatusUnpinning {
|
|
go func() {
|
|
err = mpt.Track(slowPin)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}()
|
|
select {
|
|
case <-mpt.optracker.OpContext(slowPinCid).Done():
|
|
return
|
|
case <-time.Tick(100 * time.Millisecond):
|
|
t.Errorf("operation context should have been cancelled by now")
|
|
}
|
|
} else {
|
|
t.Error("slowPin should be in unpinning")
|
|
}
|
|
|
|
}
|
|
|
|
func TestUntrackTrackWithNoCancel(t *testing.T) {
|
|
mpt := testSlowMapPinTracker(t)
|
|
defer mpt.Shutdown()
|
|
|
|
slowPinCid, _ := cid.Decode(test.TestSlowCid1)
|
|
fastPinCid, _ := cid.Decode(unpinCancelCid)
|
|
|
|
// SlowLocalPin
|
|
slowPin := testPin(slowPinCid, -1, -1)
|
|
|
|
// LocalPin
|
|
fastPin := testPin(fastPinCid, -1, -1)
|
|
|
|
err := mpt.Track(slowPin)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
err = mpt.Track(fastPin)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
time.Sleep(3 * time.Second)
|
|
|
|
err = mpt.Untrack(slowPin.Cid)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
err = mpt.Untrack(fastPin.Cid)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
pInfo := mpt.Status(fastPinCid)
|
|
if pInfo.Status == api.TrackerStatusUnpinned {
|
|
t.Fatal("c untrack operation should be tracked")
|
|
}
|
|
|
|
if pInfo.Status == api.TrackerStatusUnpinQueued {
|
|
err = mpt.Track(fastPin)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
pi := mpt.Status(fastPinCid)
|
|
if pi.Error == ErrUnpinCancelCid.Error() {
|
|
t.Fatal(ErrUnpinCancelCid)
|
|
}
|
|
} else {
|
|
t.Error("c should be queued to unpin")
|
|
}
|
|
}
|
|
|
|
func TestTrackUntrackConcurrent(t *testing.T) {
|
|
mpt := testMapPinTracker(t)
|
|
defer mpt.Shutdown()
|
|
|
|
h1, _ := cid.Decode(test.TestCid1)
|
|
|
|
// LocalPin
|
|
c := testPin(h1, -1, -1)
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
for i := 0; i < 50; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for j := 0; j < 50; j++ {
|
|
var err error
|
|
op := rand.Intn(2)
|
|
if op == 1 {
|
|
err = mpt.Track(c)
|
|
} else {
|
|
err = mpt.Untrack(c.Cid)
|
|
}
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
st := mpt.Status(h1)
|
|
t.Log(st.Status)
|
|
if st.Status != api.TrackerStatusUnpinned && st.Status != api.TrackerStatusPinned {
|
|
t.Fatal("should be pinned or unpinned")
|
|
}
|
|
}
|