pintracker: take care of tests

Simplify the tests, remove things that are not used at all, align the
behaviour of the mocks, add methods to test the correct behaviour of Status
etc.
This commit is contained in:
Hector Sanjuan 2019-12-13 12:03:01 +01:00
parent 8b6fd1fabe
commit 09d933fde1
5 changed files with 246 additions and 216 deletions

View File

@ -28,8 +28,8 @@ var DefaultListenAddrs = []string{"/ip4/0.0.0.0/tcp/9096", "/ip4/0.0.0.0/udp/909
// Configuration defaults // Configuration defaults
const ( const (
DefaultEnableRelayHop = true DefaultEnableRelayHop = true
DefaultStateSyncInterval = 600 * time.Second DefaultStateSyncInterval = 5 * time.Minute
DefaultPinRecoverInterval = 1 * time.Hour DefaultPinRecoverInterval = 5 * time.Minute
DefaultMonitorPingInterval = 15 * time.Second DefaultMonitorPingInterval = 15 * time.Second
DefaultPeerWatchInterval = 5 * time.Second DefaultPeerWatchInterval = 5 * time.Second
DefaultReplicationFactor = -1 DefaultReplicationFactor = -1

View File

@ -31,6 +31,7 @@ var testingClusterCfg = []byte(`{
"grace_period": "2m0s" "grace_period": "2m0s"
}, },
"state_sync_interval": "1m0s", "state_sync_interval": "1m0s",
"pin_recover_interval": "1m0s",
"replication_factor": -1, "replication_factor": -1,
"monitor_ping_interval": "1s", "monitor_ping_interval": "1s",
"peer_watch_interval": "1s", "peer_watch_interval": "1s",

View File

@ -1,10 +1,12 @@
// Package pintracker_test tests the multiple implementations // Package pintracker_test tests the multiple implementations
// of the PinTracker interface. // of the PinTracker interface.
//
// These tests are legacy from the time when there were several
// pintracker implementations.
package pintracker_test package pintracker_test
import ( import (
"context" "context"
"errors"
"sort" "sort"
"testing" "testing"
"time" "time"
@ -19,88 +21,33 @@ import (
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer" peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
) )
var ( var (
pinCancelCid = test.Cid3 pinOpts = api.PinOptions{
unpinCancelCid = test.Cid2
ErrPinCancelCid = errors.New("should not have received rpc.IPFSPin operation")
ErrUnpinCancelCid = errors.New("should not have received rpc.IPFSUnpin operation")
pinOpts = api.PinOptions{
ReplicationFactorMax: -1, ReplicationFactorMax: -1,
ReplicationFactorMin: -1, ReplicationFactorMin: -1,
} }
) )
type mockIPFS struct{}
func mockRPCClient(t testing.TB) *rpc.Client {
s := rpc.NewServer(nil, "mock")
c := rpc.NewClientWithServer(nil, "mock", s)
err := s.RegisterName("IPFSConnector", &mockIPFS{})
if err != nil {
t.Fatal(err)
}
return c
}
func (mock *mockIPFS) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
c := in.Cid
switch c.String() {
case test.SlowCid1.String():
time.Sleep(3 * time.Second)
case pinCancelCid.String():
return ErrPinCancelCid
}
return nil
}
func (mock *mockIPFS) PinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPinStatus) error {
switch in.String() {
case test.Cid1.String(), test.Cid2.String():
*out = api.IPFSPinStatusRecursive
case test.Cid4.String():
*out = api.IPFSPinStatusError
return errors.New("an ipfs error")
default:
*out = api.IPFSPinStatusUnpinned
}
return nil
}
func (mock *mockIPFS) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid.String() {
case test.SlowCid1.String():
time.Sleep(3 * time.Second)
case unpinCancelCid.String():
return ErrUnpinCancelCid
}
return nil
}
func (mock *mockIPFS) PinLs(ctx context.Context, in string, out *map[string]api.IPFSPinStatus) error {
m := map[string]api.IPFSPinStatus{
test.Cid1.String(): api.IPFSPinStatusRecursive,
}
*out = m
return nil
}
var sortPinInfoByCid = func(p []*api.PinInfo) { var sortPinInfoByCid = func(p []*api.PinInfo) {
sort.Slice(p, func(i, j int) bool { sort.Slice(p, func(i, j int) bool {
return p[i].Cid.String() < p[j].Cid.String() return p[i].Cid.String() < p[j].Cid.String()
}) })
} }
// prefilledState return a state instance with some pins. // prefilledState return a state instance with some pins:
// - Cid1 - pin everywhere
// - Cid2 - weird / remote // replication factor set to 0, no allocations
// - Cid3 - remote - this pin is on ipfs
// - Cid4 - pin everywhere - this pin is not on ipfs
func prefilledState(context.Context) (state.ReadOnly, error) { func prefilledState(context.Context) (state.ReadOnly, error) {
st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle()) st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
if err != nil { if err != nil {
return nil, err return nil, err
} }
remote := api.PinWithOpts(test.Cid4, api.PinOptions{ remote := api.PinWithOpts(test.Cid3, api.PinOptions{
ReplicationFactorMax: 1, ReplicationFactorMax: 1,
ReplicationFactorMin: 1, ReplicationFactorMin: 1,
}) })
@ -109,8 +56,8 @@ func prefilledState(context.Context) (state.ReadOnly, error) {
pins := []*api.Pin{ pins := []*api.Pin{
api.PinWithOpts(test.Cid1, pinOpts), api.PinWithOpts(test.Cid1, pinOpts),
api.PinCid(test.Cid2), api.PinCid(test.Cid2),
api.PinWithOpts(test.Cid3, pinOpts),
remote, remote,
api.PinWithOpts(test.Cid4, pinOpts),
} }
ctx := context.Background() ctx := context.Background()
@ -123,16 +70,6 @@ func prefilledState(context.Context) (state.ReadOnly, error) {
return st, nil return st, nil
} }
func testSlowStatelessPinTracker(t testing.TB) *stateless.Tracker {
t.Helper()
cfg := &stateless.Config{}
cfg.Default()
spt := stateless.New(cfg, test.PeerID1, test.PeerName1, prefilledState)
spt.SetClient(mockRPCClient(t))
return spt
}
func testStatelessPinTracker(t testing.TB) *stateless.Tracker { func testStatelessPinTracker(t testing.TB) *stateless.Tracker {
t.Helper() t.Helper()
@ -255,32 +192,12 @@ func TestPinTracker_StatusAll(t *testing.T) {
}, },
{ {
Cid: test.Cid3, Cid: test.Cid3,
Status: api.TrackerStatusPinned, Status: api.TrackerStatusRemote,
}, },
{ {
// in state but not on IPFS
Cid: test.Cid4, Cid: test.Cid4,
Status: api.TrackerStatusRemote, Status: api.TrackerStatusPinError,
},
},
},
{
"slow stateless statusall",
args{
api.PinWithOpts(test.Cid1, pinOpts),
testSlowStatelessPinTracker(t),
},
[]*api.PinInfo{
{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
{
Cid: test.Cid2,
Status: api.TrackerStatusRemote,
},
{
Cid: test.Cid4,
Status: api.TrackerStatusRemote,
}, },
}, },
}, },
@ -290,7 +207,7 @@ func TestPinTracker_StatusAll(t *testing.T) {
if err := tt.args.tracker.Track(context.Background(), tt.args.c); err != nil { if err := tt.args.tracker.Track(context.Background(), tt.args.c); err != nil {
t.Errorf("PinTracker.Track() error = %v", err) t.Errorf("PinTracker.Track() error = %v", err)
} }
time.Sleep(1 * time.Second) time.Sleep(200 * time.Millisecond)
got := tt.args.tracker.StatusAll(context.Background()) got := tt.args.tracker.StatusAll(context.Background())
if len(got) != len(tt.want) { if len(got) != len(tt.want) {
for _, pi := range got { for _, pi := range got {
@ -304,7 +221,7 @@ func TestPinTracker_StatusAll(t *testing.T) {
sortPinInfoByCid(tt.want) sortPinInfoByCid(tt.want)
for i := range tt.want { for i := range tt.want {
if got[i].Cid.String() != tt.want[i].Cid.String() { if got[i].Cid != tt.want[i].Cid {
t.Errorf("got: %v\nwant: %v", got, tt.want) t.Errorf("got: %v\nwant: %v", got, tt.want)
} }
if got[i].Status != tt.want[i].Status { if got[i].Status != tt.want[i].Status {
@ -372,23 +289,12 @@ func TestPinTracker_Status(t *testing.T) {
Status: api.TrackerStatusUnpinned, Status: api.TrackerStatusUnpinned,
}, },
}, },
{
"slow stateless status",
args{
test.Cid1,
testSlowStatelessPinTracker(t),
},
api.PinInfo{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
got := tt.args.tracker.Status(context.Background(), tt.args.c) got := tt.args.tracker.Status(context.Background(), tt.args.c)
if got.Cid.String() != tt.want.Cid.String() { if got.Cid != tt.want.Cid {
t.Errorf("PinTracker.Status() = %v, want %v", got.Cid, tt.want.Cid) t.Errorf("PinTracker.Status() = %v, want %v", got.Cid, tt.want.Cid)
} }
@ -425,11 +331,15 @@ func TestPinTracker_RecoverAll(t *testing.T) {
}, },
{ {
Cid: test.Cid3, Cid: test.Cid3,
Status: api.TrackerStatusPinned, Status: api.TrackerStatusRemote,
}, },
{ {
// This will recover and status
// is ignored as it could come back as
// queued, pinning or error.
Cid: test.Cid4, Cid: test.Cid4,
Status: api.TrackerStatusRemote, Status: api.TrackerStatusPinError,
}, },
}, },
false, false,
@ -447,18 +357,22 @@ func TestPinTracker_RecoverAll(t *testing.T) {
for _, pi := range got { for _, pi := range got {
t.Logf("pinfo: %v", pi) t.Logf("pinfo: %v", pi)
} }
t.Errorf("got len = %d, want = %d", len(got), len(tt.want)) t.Fatalf("got len = %d, want = %d", len(got), len(tt.want))
t.FailNow()
} }
sortPinInfoByCid(got) sortPinInfoByCid(got)
sortPinInfoByCid(tt.want) sortPinInfoByCid(tt.want)
for i := range tt.want { for i := range tt.want {
if got[i].Cid.String() != tt.want[i].Cid.String() { if got[i].Cid != tt.want[i].Cid {
t.Errorf("\ngot: %v,\nwant: %v", got[i].Cid, tt.want[i].Cid) t.Errorf("\ngot: %v,\nwant: %v", got[i].Cid, tt.want[i].Cid)
} }
// Cid4 needs to be recovered, we do not care
// on what status it finds itself.
if got[i].Cid == test.Cid4 {
continue
}
if got[i].Status != tt.want[i].Status { if got[i].Status != tt.want[i].Status {
t.Errorf("for cid: %v:\ngot: %v,\nwant: %v", tt.want[i].Cid, got[i].Status, tt.want[i].Status) t.Errorf("for cid: %v:\ngot: %v,\nwant: %v", tt.want[i].Cid, got[i].Status, tt.want[i].Status)
} }
@ -499,7 +413,7 @@ func TestPinTracker_Recover(t *testing.T) {
return return
} }
if got.Cid.String() != tt.want.Cid.String() { if got.Cid != tt.want.Cid {
t.Errorf("PinTracker.Recover() = %v, want %v", got, tt.want) t.Errorf("PinTracker.Recover() = %v, want %v", got, tt.want)
} }
}) })
@ -537,7 +451,7 @@ func TestUntrackTrack(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(time.Second / 2) time.Sleep(200 * time.Millisecond)
err = tt.args.tracker.Untrack(context.Background(), tt.args.c) err = tt.args.tracker.Untrack(context.Background(), tt.args.c)
if err != nil { if err != nil {
@ -559,10 +473,10 @@ func TestTrackUntrackWithCancel(t *testing.T) {
wantErr bool wantErr bool
}{ }{
{ {
"slow stateless tracker untrack w/ cancel", "stateless tracker untrack w/ cancel",
args{ args{
test.SlowCid1, test.SlowCid1,
testSlowStatelessPinTracker(t), testStatelessPinTracker(t),
}, },
api.PinInfo{ api.PinInfo{
Cid: test.SlowCid1, Cid: test.SlowCid1,
@ -579,7 +493,7 @@ func TestTrackUntrackWithCancel(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(100 * time.Millisecond) // let pinning start time.Sleep(200 * time.Millisecond) // let pinning start
pInfo := tt.args.tracker.Status(context.Background(), tt.args.c) pInfo := tt.args.tracker.Status(context.Background(), tt.args.c)
if pInfo.Status == api.TrackerStatusUnpinned { if pInfo.Status == api.TrackerStatusUnpinned {
@ -614,7 +528,7 @@ func TestTrackUntrackWithCancel(t *testing.T) {
func TestPinTracker_RemoteIgnoresError(t *testing.T) { func TestPinTracker_RemoteIgnoresError(t *testing.T) {
ctx := context.Background() ctx := context.Background()
testF := func(t *testing.T, pt ipfscluster.PinTracker) { testF := func(t *testing.T, pt ipfscluster.PinTracker) {
remoteCid := test.Cid4 remoteCid := test.Cid3
remote := api.PinWithOpts(remoteCid, pinOpts) remote := api.PinWithOpts(remoteCid, pinOpts)
remote.Allocations = []peer.ID{test.PeerID2} remote.Allocations = []peer.ID{test.PeerID2}
@ -628,12 +542,12 @@ func TestPinTracker_RemoteIgnoresError(t *testing.T) {
pi := pt.Status(ctx, remoteCid) pi := pt.Status(ctx, remoteCid)
if pi.Status != api.TrackerStatusRemote || pi.Error != "" { if pi.Status != api.TrackerStatusRemote || pi.Error != "" {
t.Error("Remote pin should not be in error") t.Error("Remote pin should not be in error", pi.Status, pi.Error)
} }
} }
t.Run("stateless pintracker", func(t *testing.T) { t.Run("stateless pintracker", func(t *testing.T) {
pt := testSlowStatelessPinTracker(t) pt := testStatelessPinTracker(t)
testF(t, pt) testF(t, pt)
}) })
} }

View File

@ -20,17 +20,69 @@ import (
var ( var (
pinCancelCid = test.Cid3 pinCancelCid = test.Cid3
unpinCancelCid = test.Cid2 unpinCancelCid = test.Cid2
ErrPinCancelCid = errors.New("should not have received rpc.IPFSPin operation") errPinCancelCid = errors.New("should not have received rpc.IPFSPin operation")
ErrUnpinCancelCid = errors.New("should not have received rpc.IPFSUnpin operation") errUnpinCancelCid = errors.New("should not have received rpc.IPFSUnpin operation")
pinOpts = api.PinOptions{ pinOpts = api.PinOptions{
ReplicationFactorMax: -1, ReplicationFactorMax: -1,
ReplicationFactorMin: -1, ReplicationFactorMin: -1,
} }
) )
type mockIPFS struct{} // func TestMain(m *testing.M) {
// logging.SetLogLevel("pintracker", "debug")
// os.Exit(m.Run())
// }
// Overwrite Pin and Unpin methods on the normal mock in order to return
// special errors when unwanted operations have been triggered.
type mockIPFS struct {
}
func (mock *mockIPFS) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid {
case pinCancelCid:
return errPinCancelCid
case test.SlowCid1:
time.Sleep(time.Second)
}
return nil
}
func (mock *mockIPFS) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid {
case unpinCancelCid:
return errUnpinCancelCid
case test.SlowCid1:
time.Sleep(time.Second)
}
return nil
}
func (mock *mockIPFS) PinLs(ctx context.Context, in string, out *map[string]api.IPFSPinStatus) error {
// Must be consistent with PinLsCid
m := map[string]api.IPFSPinStatus{
test.Cid1.String(): api.IPFSPinStatusRecursive,
test.Cid2.String(): api.IPFSPinStatusRecursive,
}
*out = m
return nil
}
func (mock *mockIPFS) PinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPinStatus) error {
switch in {
case test.Cid1, test.Cid2:
*out = api.IPFSPinStatusRecursive
default:
*out = api.IPFSPinStatusUnpinned
return nil
}
return nil
}
func mockRPCClient(t testing.TB) *rpc.Client {
t.Helper()
func mockRPCClient(t *testing.T) *rpc.Client {
s := rpc.NewServer(nil, "mock") s := rpc.NewServer(nil, "mock")
c := rpc.NewClientWithServer(nil, "mock", s) c := rpc.NewClientWithServer(nil, "mock", s)
@ -41,80 +93,38 @@ func mockRPCClient(t *testing.T) *rpc.Client {
return c return c
} }
func (mock *mockIPFS) Pin(ctx context.Context, in *api.Pin, out *struct{}) error { func getStateFunc(t testing.TB, items ...*api.Pin) func(context.Context) (state.ReadOnly, error) {
switch in.Cid.String() { t.Helper()
case test.SlowCid1.String(): ctx := context.Background()
time.Sleep(2 * time.Second)
case pinCancelCid.String(): st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
return ErrPinCancelCid if err != nil {
t.Fatal(err)
} }
return nil
for _, item := range items {
err := st.Add(ctx, item)
if err != nil {
t.Fatal(err)
}
}
return func(ctx context.Context) (state.ReadOnly, error) {
return st, nil
}
} }
func (mock *mockIPFS) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error { func testStatelessPinTracker(t testing.TB, pins ...*api.Pin) *Tracker {
switch in.Cid.String() {
case test.SlowCid1.String():
time.Sleep(2 * time.Second)
case unpinCancelCid.String():
return ErrUnpinCancelCid
}
return nil
}
func (mock *mockIPFS) PinLs(ctx context.Context, in string, out *map[string]api.IPFSPinStatus) error {
m := map[string]api.IPFSPinStatus{
test.Cid1.String(): api.IPFSPinStatusRecursive,
}
*out = m
return nil
}
func (mock *mockIPFS) PinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPinStatus) error {
switch in.String() {
case test.Cid1.String(), test.Cid2.String():
*out = api.IPFSPinStatusRecursive
default:
*out = api.IPFSPinStatusUnpinned
}
return nil
}
func testSlowStatelessPinTracker(t *testing.T) *Tracker {
t.Helper() t.Helper()
cfg := &Config{} cfg := &Config{}
cfg.Default() cfg.Default()
cfg.ConcurrentPins = 1 cfg.ConcurrentPins = 1
st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle()) spt := New(cfg, test.PeerID1, test.PeerName1, getStateFunc(t, pins...))
if err != nil {
t.Fatal(err)
}
getState := func(ctx context.Context) (state.ReadOnly, error) {
return st, nil
}
spt := New(cfg, test.PeerID1, test.PeerName1, getState)
spt.SetClient(mockRPCClient(t)) spt.SetClient(mockRPCClient(t))
return spt return spt
} }
func testStatelessPinTracker(t testing.TB) *Tracker {
t.Helper()
cfg := &Config{}
cfg.Default()
cfg.ConcurrentPins = 1
st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
if err != nil {
t.Fatal(err)
}
getState := func(ctx context.Context) (state.ReadOnly, error) {
return st, nil
}
spt := New(cfg, test.PeerID1, test.PeerName1, getState)
spt.SetClient(test.NewMockRPCClient(t))
return spt
}
func TestStatelessPinTracker_New(t *testing.T) { func TestStatelessPinTracker_New(t *testing.T) {
ctx := context.Background() ctx := context.Background()
spt := testStatelessPinTracker(t) spt := testStatelessPinTracker(t)
@ -159,7 +169,7 @@ func TestUntrackTrack(t *testing.T) {
func TestTrackUntrackWithCancel(t *testing.T) { func TestTrackUntrackWithCancel(t *testing.T) {
ctx := context.Background() ctx := context.Background()
spt := testSlowStatelessPinTracker(t) spt := testStatelessPinTracker(t)
defer spt.Shutdown(ctx) defer spt.Shutdown(ctx)
slowPinCid := test.SlowCid1 slowPinCid := test.SlowCid1
@ -167,27 +177,27 @@ func TestTrackUntrackWithCancel(t *testing.T) {
// LocalPin // LocalPin
slowPin := api.PinWithOpts(slowPinCid, pinOpts) slowPin := api.PinWithOpts(slowPinCid, pinOpts)
err := spt.Track(context.Background(), slowPin) err := spt.Track(ctx, slowPin)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(100 * time.Millisecond) // let pinning start time.Sleep(100 * time.Millisecond) // let pinning start
pInfo := spt.optracker.Get(context.Background(), slowPin.Cid) pInfo := spt.optracker.Get(ctx, slowPin.Cid)
if pInfo.Status == api.TrackerStatusUnpinned { if pInfo.Status == api.TrackerStatusUnpinned {
t.Fatal("slowPin should be tracked") t.Fatal("slowPin should be tracked")
} }
if pInfo.Status == api.TrackerStatusPinning { if pInfo.Status == api.TrackerStatusPinning {
go func() { go func() {
err = spt.Untrack(context.Background(), slowPinCid) err = spt.Untrack(ctx, slowPinCid)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
}() }()
select { select {
case <-spt.optracker.OpContext(context.Background(), slowPinCid).Done(): case <-spt.optracker.OpContext(ctx, slowPinCid).Done():
return return
case <-time.Tick(100 * time.Millisecond): case <-time.Tick(100 * time.Millisecond):
t.Errorf("operation context should have been cancelled by now") t.Errorf("operation context should have been cancelled by now")
@ -204,7 +214,7 @@ func TestTrackUntrackWithCancel(t *testing.T) {
// cancelling of the pinning operation happens (unlike on WithCancel). // cancelling of the pinning operation happens (unlike on WithCancel).
func TestTrackUntrackWithNoCancel(t *testing.T) { func TestTrackUntrackWithNoCancel(t *testing.T) {
ctx := context.Background() ctx := context.Background()
spt := testSlowStatelessPinTracker(t) spt := testStatelessPinTracker(t)
defer spt.Shutdown(ctx) defer spt.Shutdown(ctx)
slowPinCid := test.SlowCid1 slowPinCid := test.SlowCid1
@ -216,7 +226,7 @@ func TestTrackUntrackWithNoCancel(t *testing.T) {
// LocalPin // LocalPin
fastPin := api.PinWithOpts(fastPinCid, pinOpts) fastPin := api.PinWithOpts(fastPinCid, pinOpts)
err := spt.Track(context.Background(), slowPin) err := spt.Track(ctx, slowPin)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -224,18 +234,18 @@ func TestTrackUntrackWithNoCancel(t *testing.T) {
// Otherwise fails when running with -race // Otherwise fails when running with -race
time.Sleep(300 * time.Millisecond) time.Sleep(300 * time.Millisecond)
err = spt.Track(context.Background(), fastPin) err = spt.Track(ctx, fastPin)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// fastPin should be queued because slow pin is pinning // fastPin should be queued because slow pin is pinning
fastPInfo := spt.optracker.Get(context.Background(), fastPin.Cid) fastPInfo := spt.optracker.Get(ctx, fastPin.Cid)
if fastPInfo.Status == api.TrackerStatusUnpinned { if fastPInfo.Status == api.TrackerStatusUnpinned {
t.Fatal("fastPin should be tracked") t.Fatal("fastPin should be tracked")
} }
if fastPInfo.Status == api.TrackerStatusPinQueued { if fastPInfo.Status == api.TrackerStatusPinQueued {
err = spt.Untrack(context.Background(), fastPinCid) err = spt.Untrack(ctx, fastPinCid)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -247,7 +257,7 @@ func TestTrackUntrackWithNoCancel(t *testing.T) {
t.Errorf("fastPin should be queued to pin but is %s", fastPInfo.Status) t.Errorf("fastPin should be queued to pin but is %s", fastPInfo.Status)
} }
pi := spt.optracker.Get(context.Background(), fastPin.Cid) pi := spt.optracker.Get(ctx, fastPin.Cid)
if pi.Cid == cid.Undef { if pi.Cid == cid.Undef {
t.Error("fastPin should have been removed from tracker") t.Error("fastPin should have been removed from tracker")
} }
@ -255,7 +265,7 @@ func TestTrackUntrackWithNoCancel(t *testing.T) {
func TestUntrackTrackWithCancel(t *testing.T) { func TestUntrackTrackWithCancel(t *testing.T) {
ctx := context.Background() ctx := context.Background()
spt := testSlowStatelessPinTracker(t) spt := testStatelessPinTracker(t)
defer spt.Shutdown(ctx) defer spt.Shutdown(ctx)
slowPinCid := test.SlowCid1 slowPinCid := test.SlowCid1
@ -263,7 +273,7 @@ func TestUntrackTrackWithCancel(t *testing.T) {
// LocalPin // LocalPin
slowPin := api.PinWithOpts(slowPinCid, pinOpts) slowPin := api.PinWithOpts(slowPinCid, pinOpts)
err := spt.Track(context.Background(), slowPin) err := spt.Track(ctx, slowPin)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -272,27 +282,27 @@ func TestUntrackTrackWithCancel(t *testing.T) {
// Untrack should cancel the ongoing request // Untrack should cancel the ongoing request
// and unpin right away // and unpin right away
err = spt.Untrack(context.Background(), slowPinCid) err = spt.Untrack(ctx, slowPinCid)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
pi := spt.optracker.Get(context.Background(), slowPin.Cid) pi := spt.optracker.Get(ctx, slowPin.Cid)
if pi.Cid == cid.Undef { if pi.Cid == cid.Undef {
t.Fatal("expected slowPin to be tracked") t.Fatal("expected slowPin to be tracked")
} }
if pi.Status == api.TrackerStatusUnpinning { if pi.Status == api.TrackerStatusUnpinning {
go func() { go func() {
err = spt.Track(context.Background(), slowPin) err = spt.Track(ctx, slowPin)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
}() }()
select { select {
case <-spt.optracker.OpContext(context.Background(), slowPinCid).Done(): case <-spt.optracker.OpContext(ctx, slowPinCid).Done():
return return
case <-time.Tick(100 * time.Millisecond): case <-time.Tick(100 * time.Millisecond):
t.Errorf("operation context should have been cancelled by now") t.Errorf("operation context should have been cancelled by now")
@ -317,35 +327,35 @@ func TestUntrackTrackWithNoCancel(t *testing.T) {
// LocalPin // LocalPin
fastPin := api.PinWithOpts(fastPinCid, pinOpts) fastPin := api.PinWithOpts(fastPinCid, pinOpts)
err := spt.Track(context.Background(), slowPin) err := spt.Track(ctx, slowPin)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = spt.Track(context.Background(), fastPin) err = spt.Track(ctx, fastPin)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
err = spt.Untrack(context.Background(), slowPin.Cid) err = spt.Untrack(ctx, slowPin.Cid)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = spt.Untrack(context.Background(), fastPin.Cid) err = spt.Untrack(ctx, fastPin.Cid)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
pi := spt.optracker.Get(context.Background(), fastPin.Cid) pi := spt.optracker.Get(ctx, fastPin.Cid)
if pi.Cid == cid.Undef { if pi.Cid == cid.Undef {
t.Fatal("c untrack operation should be tracked") t.Fatal("c untrack operation should be tracked")
} }
if pi.Status == api.TrackerStatusUnpinQueued { if pi.Status == api.TrackerStatusUnpinQueued {
err = spt.Track(context.Background(), fastPin) err = spt.Track(ctx, fastPin)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -359,6 +369,102 @@ func TestUntrackTrackWithNoCancel(t *testing.T) {
} }
} }
// TestStatusAll checks that StatusAll correctly reports tracked
// items and mismatches between what's on IPFS and on the state.
func TestStatusAll(t *testing.T) {
ctx := context.Background()
normalPin := api.PinWithOpts(test.Cid1, pinOpts)
normalPin2 := api.PinWithOpts(test.Cid4, pinOpts)
// - Build a state with one pins (Cid1,Cid4)
// - The IPFS Mock reports Cid1 and Cid2
// - Track a SlowCid additionally
spt := testStatelessPinTracker(t, normalPin, normalPin2)
defer spt.Shutdown(ctx)
slowPin := api.PinWithOpts(test.SlowCid1, pinOpts)
err := spt.Track(ctx, slowPin)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second / 2)
// Needs to return:
// * A slow CID pinning
// * Cid1 is pinned
// * Cid4 should be in PinError (it's in the state but not on IPFS)
stAll := spt.StatusAll(ctx)
if len(stAll) != 3 {
t.Errorf("wrong status length. Expected 3, got: %d", len(stAll))
}
for _, pi := range stAll {
switch pi.Cid {
case test.Cid1:
if pi.Status != api.TrackerStatusPinned {
t.Error("cid1 should be pinned")
}
case test.Cid4:
if pi.Status != api.TrackerStatusPinError {
t.Error("cid2 should be in pin_error status")
}
case test.SlowCid1:
if pi.Status != api.TrackerStatusPinning {
t.Error("slowCid1 should be pinning")
}
default:
t.Error("Unexpected pin:", pi.Cid)
}
}
}
// TestStatus checks that the Status calls correctly reports tracked
// items and mismatches between what's on IPFS and on the state.
func TestStatus(t *testing.T) {
ctx := context.Background()
normalPin := api.PinWithOpts(test.Cid1, pinOpts)
normalPin2 := api.PinWithOpts(test.Cid4, pinOpts)
// - Build a state with one pins (Cid1,Cid4)
// - The IPFS Mock reports Cid1 and Cid2
// - Track a SlowCid additionally
spt := testStatelessPinTracker(t, normalPin, normalPin2)
defer spt.Shutdown(ctx)
slowPin := api.PinWithOpts(test.SlowCid1, pinOpts)
err := spt.Track(ctx, slowPin)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second / 2)
// Status needs to return:
// * For slowCid1: A slow CID pinning
// * For Cid1: pinned
// * For Cid4: pin error
st := spt.Status(ctx, test.Cid1)
if st.Status != api.TrackerStatusPinned {
t.Error("cid1 should be pinned")
}
st = spt.Status(ctx, test.Cid4)
if st.Status != api.TrackerStatusPinError {
t.Error("cid2 should be in pin_error status")
}
st = spt.Status(ctx, test.SlowCid1)
if st.Status != api.TrackerStatusPinning {
t.Error("slowCid1 should be pinning")
}
}
var sortPinInfoByCid = func(p []*api.PinInfo) { var sortPinInfoByCid = func(p []*api.PinInfo) {
sort.Slice(p, func(i, j int) bool { sort.Slice(p, func(i, j int) bool {
return p[i].Cid.String() < p[j].Cid.String() return p[i].Cid.String() < p[j].Cid.String()
@ -367,8 +473,9 @@ var sortPinInfoByCid = func(p []*api.PinInfo) {
func BenchmarkTracker_localStatus(b *testing.B) { func BenchmarkTracker_localStatus(b *testing.B) {
tracker := testStatelessPinTracker(b) tracker := testStatelessPinTracker(b)
ctx := context.Background()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
tracker.localStatus(context.Background(), true) tracker.localStatus(ctx, true)
} }
} }

View File

@ -432,10 +432,18 @@ func (mock *mockPeerMonitor) MetricNames(ctx context.Context, in struct{}, out *
/* IPFSConnector methods */ /* IPFSConnector methods */
func (mock *mockIPFSConnector) Pin(ctx context.Context, in *api.Pin, out *struct{}) error { func (mock *mockIPFSConnector) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid {
case SlowCid1:
time.Sleep(2 * time.Second)
}
return nil return nil
} }
func (mock *mockIPFSConnector) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error { func (mock *mockIPFSConnector) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid {
case SlowCid1:
time.Sleep(2 * time.Second)
}
return nil return nil
} }