uses new gorpc method to distinguish err type

License: MIT
Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
This commit is contained in:
Adrian Lanzafame 2018-06-25 13:33:17 +10:00 committed by Hector Sanjuan
parent 33f56e8867
commit c6eada9db5
11 changed files with 358 additions and 163 deletions

View File

@ -888,14 +888,3 @@ type Error struct {
func (e *Error) Error() string {
return fmt.Sprintf("%s (%d)", e.Message, e.Code)
}
// CidNotInGlobalStateError allows for the differentiation of network
// and rpc errors from an error indicating that the Pin isn't in the
// global state.
type CidNotInGlobalStateError struct {
Cid *cid.Cid
}
func (c *CidNotInGlobalStateError) Error() string {
return fmt.Sprintf("cid is not part of the global state: %s", c.Cid)
}

View File

@ -866,7 +866,7 @@ func (c *Cluster) PinGet(h *cid.Cid) (api.Pin, error) {
}
pin, ok := st.Get(h)
if !ok {
return pin, &api.CidNotInGlobalStateError{Cid: h}
return pin, errors.New("cid is not part of the global state")
}
return pin, nil
}

View File

@ -780,7 +780,7 @@ func TestClusterRecoverAllLocal(t *testing.T) {
defer cl.Shutdown()
c, _ := cid.Decode(test.TestCid1)
err := cl.Pin(api.Pin{Cid: c, ReplicationFactorMax: -1})
err := cl.Pin(api.PinCid(c))
if err != nil {
t.Fatal("pin should have worked:", err)
}

View File

@ -400,6 +400,7 @@ func (mpt *MapPinTracker) SetClient(c *rpc.Client) {
}
// OpContext exports the internal optracker's OpContext method.
// For testing purposes only.
func (mpt *MapPinTracker) OpContext(c *cid.Cid) context.Context {
return mpt.optracker.OpContext(c)
}

View File

@ -199,3 +199,30 @@ func (op *Operation) ToTrackerStatus() api.TrackerStatus {
}
}
// TrackerStatusToOperationPhase takes an api.TrackerStatus and
// converts it to an OpType and Phase.
func TrackerStatusToOperationPhase(status api.TrackerStatus) (OperationType, Phase) {
switch status {
case api.TrackerStatusPinError:
return OperationPin, PhaseError
case api.TrackerStatusPinQueued:
return OperationPin, PhaseQueued
case api.TrackerStatusPinning:
return OperationPin, PhaseInProgress
case api.TrackerStatusPinned:
return OperationPin, PhaseDone
case api.TrackerStatusUnpinError:
return OperationUnpin, PhaseError
case api.TrackerStatusUnpinQueued:
return OperationUnpin, PhaseQueued
case api.TrackerStatusUnpinning:
return OperationUnpin, PhaseInProgress
case api.TrackerStatusUnpinned:
return OperationUnpin, PhaseDone
case api.TrackerStatusRemote:
return OperationRemote, PhaseDone
default:
return OperationUnknown, PhaseError
}
}

View File

@ -138,37 +138,6 @@ func (opt *OperationTracker) Get(c *cid.Cid) api.PinInfo {
return pInfo
}
// FilterGet returns a PinInfo for Cid, only if the Phase or OperationType
// match what the associated Operation has. Note, only supports
// filters of type OperationType or Phase, any other type
// will result in a nil slice being returned.
func (opt *OperationTracker) FilterGet(c *cid.Cid, filter interface{}) (api.PinInfo, bool) {
opt.mu.RLock()
defer opt.mu.RUnlock()
op, ok := opt.operations[c.String()]
if !ok {
return api.PinInfo{}, false
}
var pi api.PinInfo
switch f := filter.(type) {
case OperationType:
if op.Type() != f {
return api.PinInfo{}, false
}
pi = opt.unsafePinInfo(op)
case Phase:
if op.Phase() != f {
return api.PinInfo{}, false
}
pi = opt.unsafePinInfo(op)
default:
return api.PinInfo{}, false
}
return pi, ok
}
// GetExists returns a PinInfo object for a Cid only if there exists
// an associated Operation.
func (opt *OperationTracker) GetExists(c *cid.Cid) (api.PinInfo, bool) {
@ -211,6 +180,17 @@ func (opt *OperationTracker) CleanError(c *cid.Cid) {
return
}
// CleanAllDone deletes any operation from the tracker that is in PhaseDone.
func (opt *OperationTracker) CleanAllDone() {
opt.mu.Lock()
defer opt.mu.Unlock()
for _, op := range opt.operations {
if op.Phase() == PhaseDone {
delete(opt.operations, op.Cid().String())
}
}
}
// OpContext gets the context of an operation, if any.
func (opt *OperationTracker) OpContext(c *cid.Cid) context.Context {
opt.mu.RLock()
@ -237,11 +217,11 @@ func (opt *OperationTracker) Filter(filters ...interface{}) []api.PinInfo {
return pinfos
}
// FilterOps returns a slice that only contains operations
// filterOps returns a slice that only contains operations
// with the matching filter. Note, only supports
// filters of type OperationType or Phase, any other type
// will result in a nil slice being returned.
func (opt *OperationTracker) FilterOps(filters ...interface{}) []*Operation {
func (opt *OperationTracker) filterOps(filters ...interface{}) []*Operation {
var fltops []*Operation
opt.mu.RLock()
defer opt.mu.RUnlock()

View File

@ -5,8 +5,6 @@ import (
"errors"
"testing"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
)
@ -204,7 +202,7 @@ func TestOperationTracker_OpContext(t *testing.T) {
}
}
func TestOperationTracker_FilterOps(t *testing.T) {
func TestOperationTracker_filterOps(t *testing.T) {
ctx := context.Background()
testOpsMap := map[string]*Operation{
test.TestCid1: &Operation{pin: api.PinCid(test.MustDecodeCid(test.TestCid1)), opType: OperationPin, phase: PhaseQueued},
@ -216,7 +214,7 @@ func TestOperationTracker_FilterOps(t *testing.T) {
t.Run("filter ops to pin operations", func(t *testing.T) {
wantLen := 2
wantOp := OperationPin
got := opt.FilterOps(wantOp)
got := opt.filterOps(wantOp)
if len(got) != wantLen {
t.Errorf("want: %d %s operations; got: %d", wantLen, wantOp.String(), len(got))
}
@ -230,7 +228,7 @@ func TestOperationTracker_FilterOps(t *testing.T) {
t.Run("filter ops to in progress phase", func(t *testing.T) {
wantLen := 2
wantPhase := PhaseInProgress
got := opt.FilterOps(PhaseInProgress)
got := opt.filterOps(PhaseInProgress)
if len(got) != wantLen {
t.Errorf("want: %d %s operations; got: %d", wantLen, wantPhase.String(), len(got))
}
@ -245,7 +243,7 @@ func TestOperationTracker_FilterOps(t *testing.T) {
wantLen := 1
wantPhase := PhaseQueued
wantOp := OperationPin
got := opt.FilterOps(OperationPin, PhaseQueued)
got := opt.filterOps(OperationPin, PhaseQueued)
if len(got) != wantLen {
t.Errorf("want: %d %s operations; got: %d", wantLen, wantPhase.String(), len(got))
}
@ -260,60 +258,3 @@ func TestOperationTracker_FilterOps(t *testing.T) {
}
})
}
func TestOperationTracker_FilterGet(t *testing.T) {
ctx := context.Background()
testOpsMap := map[string]*Operation{
test.TestCid1: &Operation{pin: api.PinCid(test.MustDecodeCid(test.TestCid1)), opType: OperationPin, phase: PhaseQueued},
test.TestCid2: &Operation{pin: api.PinCid(test.MustDecodeCid(test.TestCid2)), opType: OperationPin, phase: PhaseInProgress},
test.TestCid3: &Operation{pin: api.PinCid(test.MustDecodeCid(test.TestCid3)), opType: OperationUnpin, phase: PhaseError},
}
opt := &OperationTracker{ctx: ctx, operations: testOpsMap}
type args struct {
c *cid.Cid
filter interface{}
}
tests := []struct {
name string
args args
want api.PinInfo
wantOk bool
}{
{
"filter get to pin operation",
args{
test.MustDecodeCid(test.TestCid1),
OperationPin,
},
opt.unsafePinInfo(testOpsMap[test.TestCid1]),
true,
},
{
"filter get to in progress phase",
args{
test.MustDecodeCid(test.TestCid3),
OperationPin,
},
opt.unsafePinInfo(testOpsMap[test.TestCid3]),
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, ok := opt.FilterGet(tt.args.c, tt.args.filter)
if ok != tt.wantOk {
t.Fatalf("wantOk: %v; got: %v", tt.wantOk, ok)
t.FailNow()
}
if tt.wantOk {
if got.Cid.String() != tt.want.Cid.String() {
t.Errorf("want: %v; got: %v", tt.want.Cid, got.Cid)
}
if got.Status != tt.want.Status {
t.Errorf("want: %v; got: %v", tt.want.Status, got.Status)
}
}
})
}
}

View File

@ -31,7 +31,7 @@ type mockService struct {
rpcClient *rpc.Client
}
func mockRPCClient(t *testing.T) *rpc.Client {
func mockRPCClient(t testing.TB) *rpc.Client {
s := rpc.NewServer(nil, "mock")
c := rpc.NewClientWithServer(nil, "mock", s)
err := s.RegisterName("Cluster", &mockService{})
@ -198,7 +198,7 @@ var sortPinInfoByCid = func(p []api.PinInfo) {
})
}
func testSlowMapPinTracker(t *testing.T) *maptracker.MapPinTracker {
func testSlowMapPinTracker(t testing.TB) *maptracker.MapPinTracker {
cfg := &maptracker.Config{}
cfg.Default()
mpt := maptracker.NewMapPinTracker(cfg, test.TestPeerID1)
@ -206,7 +206,7 @@ func testSlowMapPinTracker(t *testing.T) *maptracker.MapPinTracker {
return mpt
}
func testMapPinTracker(t *testing.T) *maptracker.MapPinTracker {
func testMapPinTracker(t testing.TB) *maptracker.MapPinTracker {
cfg := &maptracker.Config{}
cfg.Default()
mpt := maptracker.NewMapPinTracker(cfg, test.TestPeerID1)
@ -214,7 +214,7 @@ func testMapPinTracker(t *testing.T) *maptracker.MapPinTracker {
return mpt
}
func testSlowStatelessPinTracker(t *testing.T) *stateless.Tracker {
func testSlowStatelessPinTracker(t testing.TB) *stateless.Tracker {
cfg := &stateless.Config{}
cfg.Default()
mpt := stateless.New(cfg, test.TestPeerID1)
@ -222,7 +222,7 @@ func testSlowStatelessPinTracker(t *testing.T) *stateless.Tracker {
return mpt
}
func testStatelessPinTracker(t *testing.T) *stateless.Tracker {
func testStatelessPinTracker(t testing.TB) *stateless.Tracker {
cfg := &stateless.Config{}
cfg.Default()
spt := stateless.New(cfg, test.TestPeerID1)
@ -266,6 +266,42 @@ func TestPinTracker_Track(t *testing.T) {
}
}
func BenchmarkPinTracker_Track(b *testing.B) {
type args struct {
c api.Pin
tracker ipfscluster.PinTracker
}
tests := []struct {
name string
args args
}{
{
"basic stateless track",
args{
api.PinCid(test.MustDecodeCid(test.TestCid1)),
testStatelessPinTracker(b),
},
},
{
"basic map track",
args{
api.PinCid(test.MustDecodeCid(test.TestCid1)),
testMapPinTracker(b),
},
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := tt.args.tracker.Track(tt.args.c); err != nil {
b.Errorf("PinTracker.Track() error = %v", err)
}
}
})
}
}
func TestPinTracker_Untrack(t *testing.T) {
type args struct {
c *cid.Cid
@ -423,6 +459,37 @@ func TestPinTracker_StatusAll(t *testing.T) {
}
}
func BenchmarkPinTracker_StatusAll(b *testing.B) {
type args struct {
tracker ipfscluster.PinTracker
}
tests := []struct {
name string
args args
}{
{
"basic stateless track",
args{
testStatelessPinTracker(b),
},
},
{
"basic map track",
args{
testMapPinTracker(b),
},
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
tt.args.tracker.StatusAll()
}
})
}
}
func TestPinTracker_Status(t *testing.T) {
type args struct {
c *cid.Cid

View File

@ -66,24 +66,18 @@ func New(cfg *Config, pid peer.ID) *Tracker {
// Used for both pinning and unpinning
func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, opChan chan *optracker.Operation) {
logger.Debug("entering opworker")
ticker := time.NewTicker(1 * time.Second) //TODO(ajl): make config var
ticker := time.NewTicker(10 * time.Second) //TODO(ajl): make config var
for {
select {
case <-ticker.C:
// every tick, clear out all Done operations
doneOps := spt.optracker.FilterOps(optracker.PhaseDone)
for _, doneOp := range doneOps {
spt.optracker.Clean(doneOp)
}
spt.optracker.CleanAllDone()
case op := <-opChan:
if cont := applyPinF(pinF, op); cont {
continue
}
switch op.Type() {
case optracker.OperationUnpin, optracker.OperationRemote:
spt.optracker.Clean(op)
}
spt.optracker.Clean(op)
case <-spt.ctx.Done():
return
}
@ -277,7 +271,7 @@ func (spt *Tracker) Status(c *cid.Cid) api.PinInfo {
&gpin,
)
if err != nil {
if _, ok := err.(*api.CidNotInGlobalStateError); ok {
if !rpc.IsRPCError(err) {
return api.PinInfo{}
}
logger.Error(err)
@ -327,15 +321,15 @@ func (spt *Tracker) SyncAll() ([]api.PinInfo, error) {
return nil, err
}
for _, p := range spt.optracker.FilterOps(optracker.OperationPin, optracker.PhaseError) {
if _, ok := localpis[p.Cid().String()]; ok {
spt.optracker.Clean(p)
for _, p := range spt.optracker.Filter(optracker.OperationPin, optracker.PhaseError) {
if _, ok := localpis[p.Cid.String()]; ok {
spt.optracker.CleanError(p.Cid)
}
}
for _, p := range spt.optracker.FilterOps(optracker.OperationUnpin, optracker.PhaseError) {
if _, ok := localpis[p.Cid().String()]; ok {
spt.optracker.Clean(p)
for _, p := range spt.optracker.Filter(optracker.OperationUnpin, optracker.PhaseError) {
if _, ok := localpis[p.Cid.String()]; !ok {
spt.optracker.CleanError(p.Cid)
}
}
@ -344,7 +338,63 @@ func (spt *Tracker) SyncAll() ([]api.PinInfo, error) {
// Sync returns the updated local status for the given Cid.
func (spt *Tracker) Sync(c *cid.Cid) (api.PinInfo, error) {
return spt.Status(c), nil
oppi, ok := spt.optracker.GetExists(c)
if !ok {
return spt.Status(c), nil
}
if oppi.Status == api.TrackerStatusUnpinError {
// check global state to see if cluster should even be caring about
// the provided cid
var gpin api.PinSerial
err := spt.rpcClient.Call(
"",
"Cluster",
"PinGet",
api.PinCid(c).ToSerial(),
&gpin,
)
if err != nil {
// if not rpc error means it isn't in the global state
if !rpc.IsRPCError(err) {
spt.optracker.CleanError(c)
return api.PinInfo{}, nil
}
logger.Error(err)
return api.PinInfo{}, err
}
// check if pin is a remote pin
if gpin.ToPin().IsRemotePin(spt.peerID) {
return api.PinInfo{}, nil
}
}
if oppi.Status == api.TrackerStatusPinError {
// else attempt to get status from ipfs node
var ips api.IPFSPinStatus
err := spt.rpcClient.Call(
"",
"Cluster",
"IPFSPinLsCid",
api.PinCid(c).ToSerial(),
&ips,
)
if err != nil {
logger.Error(err)
return api.PinInfo{}, err
}
if ips.ToTrackerStatus() == api.TrackerStatusPinned {
spt.optracker.CleanError(c)
pi := api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: ips.ToTrackerStatus(),
}
return pi, nil
}
}
return spt.optracker.Get(c), nil
}
// RecoverAll attempts to recover all items tracked by this peer.
@ -366,7 +416,7 @@ func (spt *Tracker) RecoverAll() ([]api.PinInfo, error) {
// only when it is done.
func (spt *Tracker) Recover(c *cid.Cid) (api.PinInfo, error) {
logger.Infof("Attempting to recover %s", c)
pInfo, ok := spt.getError(c)
pInfo, ok := spt.optracker.GetExists(c)
if !ok {
return spt.Status(c), nil
}
@ -382,12 +432,7 @@ func (spt *Tracker) Recover(c *cid.Cid) (api.PinInfo, error) {
return spt.Status(c), err
}
pi, ok := spt.optracker.GetExists(c)
if !ok {
return spt.Status(c), nil
}
return pi, nil
return spt.Status(c), nil
}
func (spt *Tracker) ipfsStatusAll() (map[string]api.PinInfo, error) {
@ -414,6 +459,7 @@ func (spt *Tracker) ipfsStatusAll() (map[string]api.PinInfo, error) {
Cid: c,
Peer: spt.peerID,
Status: ips.ToTrackerStatus(),
TS: time.Now(),
}
pins[cidstr] = p
}
@ -427,21 +473,21 @@ func (spt *Tracker) localStatus(incRemote bool) (map[string]api.PinInfo, error)
pininfos := make(map[string]api.PinInfo)
// get shared state
var csps []api.PinSerial
var statePinsSerial []api.PinSerial
err := spt.rpcClient.Call(
"",
"Cluster",
"Pins",
struct{}{},
&csps,
&statePinsSerial,
)
if err != nil {
logger.Error(err)
return nil, err
}
var cspis []api.Pin
for _, p := range csps {
cspis = append(cspis, p.ToPin())
var statePins []api.Pin
for _, p := range statePinsSerial {
statePins = append(statePins, p.ToPin())
}
// get statuses from ipfs node first
@ -451,10 +497,11 @@ func (spt *Tracker) localStatus(incRemote bool) (map[string]api.PinInfo, error)
return nil, err
}
for _, p := range cspis {
for _, p := range statePins {
pCid := p.Cid.String()
if p.IsRemotePin(spt.peerID) && incRemote {
// add pin to pininfos with a status of remote
pininfos[p.Cid.String()] = api.PinInfo{
pininfos[pCid] = api.PinInfo{
Cid: p.Cid,
Peer: spt.peerID,
Status: api.TrackerStatusRemote,
@ -462,28 +509,19 @@ func (spt *Tracker) localStatus(incRemote bool) (map[string]api.PinInfo, error)
continue
}
// lookup p in localpis
if lp, ok := localpis[p.Cid.String()]; ok {
pininfos[p.Cid.String()] = lp
if lp, ok := localpis[pCid]; ok {
pininfos[pCid] = lp
}
}
return pininfos, nil
}
func (spt *Tracker) getError(c *cid.Cid) (api.PinInfo, bool) {
return spt.optracker.FilterGet(c, optracker.PhaseError)
}
func (spt *Tracker) getErrorsAll() []api.PinInfo {
return spt.optracker.Filter(optracker.PhaseError)
}
func (spt *Tracker) removeError(c *cid.Cid) {
spt.optracker.CleanError(c)
return
}
// OpContext exports the internal optracker's OpContext method.
// Primarily for testing purposes.
// For testing purposes only.
func (spt *Tracker) OpContext(c *cid.Cid) context.Context {
return spt.optracker.OpContext(c)
}

View File

@ -3,14 +3,15 @@ package stateless
import (
"context"
"errors"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
"sort"
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
)
var (
@ -56,6 +57,47 @@ func (mock *mockService) IPFSUnpin(ctx context.Context, in api.PinSerial, out *s
return nil
}
func (mock *mockService) IPFSPinLs(ctx context.Context, in string, out *map[string]api.IPFSPinStatus) error {
m := map[string]api.IPFSPinStatus{
test.TestCid1: api.IPFSPinStatusRecursive,
}
*out = m
return nil
}
func (mock *mockService) IPFSPinLsCid(ctx context.Context, in api.PinSerial, out *api.IPFSPinStatus) error {
switch in.Cid {
case test.TestCid1, test.TestCid2:
*out = api.IPFSPinStatusRecursive
default:
*out = api.IPFSPinStatusUnpinned
}
return nil
}
func (mock *mockService) Pins(ctx context.Context, in struct{}, out *[]api.PinSerial) error {
*out = []api.PinSerial{
{Cid: test.TestCid1, ReplicationFactorMax: -1},
{Cid: test.TestCid3, ReplicationFactorMax: -1},
}
return nil
}
func (mock *mockService) PinGet(ctx context.Context, in api.PinSerial, out *api.PinSerial) error {
switch in.Cid {
case test.ErrorCid:
return errors.New("expected error when using ErrorCid")
case test.TestCid1:
*out = api.Pin{Cid: test.MustDecodeCid(in.Cid), ReplicationFactorMax: -1}.ToSerial()
return nil
case test.TestCid2:
*out = api.Pin{Cid: test.MustDecodeCid(in.Cid), ReplicationFactorMax: -1}.ToSerial()
return nil
}
*out = in
return nil
}
func testSlowStatelessPinTracker(t *testing.T) *Tracker {
cfg := &Config{}
cfg.Default()
@ -64,7 +106,7 @@ func testSlowStatelessPinTracker(t *testing.T) *Tracker {
return mpt
}
func testStatelessPinTracker(t *testing.T) *Tracker {
func testStatelessPinTracker(t testing.TB) *Tracker {
cfg := &Config{}
cfg.Default()
spt := New(cfg, test.TestPeerID1)
@ -335,3 +377,113 @@ func TestUntrackTrackWithNoCancel(t *testing.T) {
t.Error("c should be queued to unpin")
}
}
var sortPinInfoByCid = func(p []api.PinInfo) {
sort.Slice(p, func(i, j int) bool {
return p[i].Cid.String() < p[j].Cid.String()
})
}
func TestStatelessTracker_SyncAll(t *testing.T) {
type args struct {
cs []*cid.Cid
tracker *Tracker
}
tests := []struct {
name string
args args
want []api.PinInfo
wantErr bool
}{
{
"basic stateless syncall",
args{
[]*cid.Cid{
test.MustDecodeCid(test.TestCid1),
test.MustDecodeCid(test.TestCid2),
},
testStatelessPinTracker(t),
},
[]api.PinInfo{
api.PinInfo{
Cid: test.MustDecodeCid(test.TestCid1),
Status: api.TrackerStatusPinned,
},
api.PinInfo{
Cid: test.MustDecodeCid(test.TestCid2),
Status: api.TrackerStatusPinned,
},
},
false,
},
{
"slow stateless syncall",
args{
[]*cid.Cid{
test.MustDecodeCid(test.TestCid1),
test.MustDecodeCid(test.TestCid2),
},
testSlowStatelessPinTracker(t),
},
[]api.PinInfo{
api.PinInfo{
Cid: test.MustDecodeCid(test.TestCid1),
Status: api.TrackerStatusPinned,
},
api.PinInfo{
Cid: test.MustDecodeCid(test.TestCid2),
Status: api.TrackerStatusPinned,
},
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.args.tracker.SyncAll()
if (err != nil) != tt.wantErr {
t.Errorf("PinTracker.SyncAll() error = %v, wantErr %v", err, tt.wantErr)
return
}
if len(got) != 0 {
t.Fatalf("should not have synced anything when it tracks nothing")
}
for _, c := range tt.args.cs {
err := tt.args.tracker.Track(api.Pin{Cid: c, ReplicationFactorMax: -1})
if err != nil {
t.Fatal(err)
}
tt.args.tracker.optracker.SetError(c, errors.New("test error"))
}
got, err = tt.args.tracker.SyncAll()
if (err != nil) != tt.wantErr {
t.Errorf("PinTracker.SyncAll() error = %v, wantErr %v", err, tt.wantErr)
return
}
sortPinInfoByCid(got)
sortPinInfoByCid(tt.want)
for i := range got {
if got[i].Cid.String() != tt.want[i].Cid.String() {
t.Errorf("got: %v\n want %v", got[i].Cid.String(), tt.want[i].Cid.String())
}
if got[i].Status != tt.want[i].Status {
t.Errorf("got: %v\n want %v", got[i].Status, tt.want[i].Status)
}
}
})
}
}
func BenchmarkTracker_localStatus(b *testing.B) {
tracker := testStatelessPinTracker(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
tracker.localStatus(true)
}
}

View File

@ -22,13 +22,13 @@ type mockService struct{}
// NewMockRPCClient creates a mock ipfs-cluster RPC server and returns
// a client to it.
func NewMockRPCClient(t *testing.T) *rpc.Client {
func NewMockRPCClient(t testing.TB) *rpc.Client {
return NewMockRPCClientWithHost(t, nil)
}
// NewMockRPCClientWithHost returns a mock ipfs-cluster RPC server
// initialized with a given host.
func NewMockRPCClientWithHost(t *testing.T, h host.Host) *rpc.Client {
func NewMockRPCClientWithHost(t testing.TB, h host.Host) *rpc.Client {
s := rpc.NewServer(h, "mock")
c := rpc.NewClientWithServer(h, "mock", s)
err := s.RegisterName("Cluster", &mockService{})