stateless: reduce num of methods

License: MIT
Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
This commit is contained in:
Adrian Lanzafame 2018-06-11 11:57:42 +10:00 committed by Hector Sanjuan
parent bec77546ce
commit 33f56e8867
3 changed files with 68 additions and 70 deletions

View File

@ -83,7 +83,7 @@ func init() {
rand.Seed(time.Now().UnixNano())
if len(customLogLvlFacilities) < 0 {
if len(customLogLvlFacilities) <= 0 {
for f := range LoggingFacilities {
SetFacilityLogLevel(f, logLevel)
}

View File

@ -193,11 +193,22 @@ func (opt *OperationTracker) GetAll() []api.PinInfo {
return pinfos
}
// GetOp returns the operations associated with the provided cid.
func (opt *OperationTracker) GetOp(c *cid.Cid) *Operation {
// CleanError removes the associated Operation, if it is
// in PhaseError.
func (opt *OperationTracker) CleanError(c *cid.Cid) {
opt.mu.RLock()
defer opt.mu.RUnlock()
return opt.operations[c.String()]
errop, ok := opt.operations[c.String()]
if !ok {
return
}
if errop.Phase() != PhaseError {
return
}
opt.Clean(errop)
return
}
// OpContext gets the context of an operation, if any.

View File

@ -75,30 +75,10 @@ func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, opChan chan
for _, doneOp := range doneOps {
spt.optracker.Clean(doneOp)
}
// remoteOps := spt.optracker.Filter(optracker.OperationRemote)
// for _, remoteOp := range remoteOps {
// spt.optracker.Clean(remoteOp)
// }
case op := <-opChan:
if op.Cancelled() {
// operation was cancelled. Move on.
// This saves some time, but not 100% needed.
if cont := applyPinF(pinF, op); cont {
continue
}
op.SetPhase(optracker.PhaseInProgress)
err := pinF(op) // call pin/unpin
if err != nil {
if op.Cancelled() {
// there was an error because
// we were cancelled. Move on.
continue
}
op.SetError(err)
op.Cancel()
continue
}
op.SetPhase(optracker.PhaseDone)
op.Cancel()
switch op.Type() {
case optracker.OperationUnpin, optracker.OperationRemote:
@ -110,6 +90,30 @@ func (spt *Tracker) opWorker(pinF func(*optracker.Operation) error, opChan chan
}
}
// applyPinF returns true if caller should call `continue` inside calling loop.
func applyPinF(pinF func(*optracker.Operation) error, op *optracker.Operation) bool {
if op.Cancelled() {
// operation was cancelled. Move on.
// This saves some time, but not 100% needed.
return true
}
op.SetPhase(optracker.PhaseInProgress)
err := pinF(op) // call pin/unpin
if err != nil {
if op.Cancelled() {
// there was an error because
// we were cancelled. Move on.
return true
}
op.SetError(err)
op.Cancel()
return true
}
op.SetPhase(optracker.PhaseDone)
op.Cancel()
return false
}
func (spt *Tracker) pin(op *optracker.Operation) error {
logger.Debugf("issuing pin call for %s", op.Cid())
err := spt.rpcClient.CallContext(
@ -139,7 +143,6 @@ func (spt *Tracker) unpin(op *optracker.Operation) error {
if err != nil {
return err
}
return nil
}
@ -286,9 +289,23 @@ func (spt *Tracker) Status(c *cid.Cid) api.PinInfo {
}
// else attempt to get status from ipfs node
pi, err := spt.ipfsStatus(c)
var ips api.IPFSPinStatus
err = spt.rpcClient.Call(
"",
"Cluster",
"IPFSPinLsCid",
api.PinCid(c).ToSerial(),
&ips,
)
if err != nil {
logger.Error(err)
return api.PinInfo{}
}
pi := api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: ips.ToTrackerStatus(),
}
return pi
@ -373,26 +390,6 @@ func (spt *Tracker) Recover(c *cid.Cid) (api.PinInfo, error) {
return pi, nil
}
func (spt *Tracker) ipfsStatus(c *cid.Cid) (api.PinInfo, error) {
var ips api.IPFSPinStatus
err := spt.rpcClient.Call(
"",
"Cluster",
"IPFSPinLsCid",
api.PinCid(c).ToSerial(),
&ips,
)
if err != nil {
return api.PinInfo{}, err
}
pi := api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: ips.ToTrackerStatus(),
}
return pi, nil
}
func (spt *Tracker) ipfsStatusAll() (map[string]api.PinInfo, error) {
var ipsMap map[string]api.IPFSPinStatus
err := spt.rpcClient.Call(
@ -423,27 +420,6 @@ func (spt *Tracker) ipfsStatusAll() (map[string]api.PinInfo, error) {
return pins, nil
}
func (spt *Tracker) consensusState() ([]api.Pin, error) {
var ps []api.PinSerial
err := spt.rpcClient.Call(
"",
"Cluster",
"Pins",
struct{}{},
&ps,
)
if err != nil {
logger.Error(err)
return nil, err
}
var pins []api.Pin
for _, p := range ps {
pins = append(pins, p.ToPin())
}
return pins, nil
}
// localStatus returns a joint set of consensusState and ipfsStatus
// marking pins which should be remote and leaving any ipfs pins that
// aren't in the consensusState out.
@ -451,11 +427,22 @@ func (spt *Tracker) localStatus(incRemote bool) (map[string]api.PinInfo, error)
pininfos := make(map[string]api.PinInfo)
// get shared state
cspis, err := spt.consensusState()
var csps []api.PinSerial
err := spt.rpcClient.Call(
"",
"Cluster",
"Pins",
struct{}{},
&csps,
)
if err != nil {
logger.Error(err)
return nil, err
}
var cspis []api.Pin
for _, p := range csps {
cspis = append(cspis, p.ToPin())
}
// get statuses from ipfs node first
localpis, err := spt.ipfsStatusAll()
@ -491,7 +478,7 @@ func (spt *Tracker) getErrorsAll() []api.PinInfo {
}
func (spt *Tracker) removeError(c *cid.Cid) {
spt.optracker.Clean(spt.optracker.GetOp(c))
spt.optracker.CleanError(c)
return
}