Align stateless tracker with sharding

Also: Fixes #500

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
Hector Sanjuan 2018-08-14 20:50:43 +02:00
parent 10fa7a13b5
commit 949abb25f0
3 changed files with 84 additions and 17 deletions

View File

@ -275,11 +275,17 @@ func (mpt *MapPinTracker) SyncAll() ([]api.PinInfo, error) {
&ipsMap,
)
if err != nil {
// set everything as error
// set pinning or unpinning ops to error, since we can't
// verify them
pInfos := mpt.optracker.GetAll()
for _, pInfo := range pInfos {
mpt.optracker.SetError(pInfo.Cid, err)
results = append(results, mpt.optracker.Get(pInfo.Cid))
op, _ := optracker.TrackerStatusToOperationPhase(pInfo.Status)
if op == optracker.OperationPin || op == optracker.OperationUnpin {
mpt.optracker.SetError(pInfo.Cid, err)
results = append(results, mpt.optracker.Get(pInfo.Cid))
} else {
results = append(results, pInfo)
}
}
return results, nil
}

View File

@ -222,6 +222,8 @@ func TrackerStatusToOperationPhase(status api.TrackerStatus) (OperationType, Pha
return OperationUnpin, PhaseDone
case api.TrackerStatusRemote:
return OperationRemote, PhaseDone
case api.TrackerStatusSharded:
return OperationShard, PhaseDone
default:
return OperationUnknown, PhaseError
}

View File

@ -202,6 +202,14 @@ func (spt *Tracker) Shutdown() error {
func (spt *Tracker) Track(c api.Pin) error {
logger.Debugf("tracking %s", c.Cid)
// Sharded pins are never pinned. A sharded pin cannot turn into
// something else or viceversa like it happens with Remote pins so
// we just track them.
if c.Type == api.MetaType {
spt.optracker.TrackNewOperation(c, optracker.OperationShard, optracker.PhaseDone)
return nil
}
// Trigger unpin whenever something remote is tracked
// Note, IPFSConn checks with pin/ls before triggering
// pin/rm.
@ -262,19 +270,20 @@ func (spt *Tracker) Status(c *cid.Cid) api.PinInfo {
// check global state to see if cluster should even be caring about
// the provided cid
var gpin api.PinSerial
var gpinS api.PinSerial
err := spt.rpcClient.Call(
"",
"Cluster",
"PinGet",
api.PinCid(c).ToSerial(),
&gpin,
&gpinS,
)
if err != nil {
if rpc.IsRPCError(err) {
logger.Error(err)
return api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusClusterError,
Error: err.Error(),
TS: time.Now(),
@ -283,13 +292,26 @@ func (spt *Tracker) Status(c *cid.Cid) api.PinInfo {
// not part of global state. we should not care about
return api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusUnpinned,
TS: time.Now(),
}
}
gpin := gpinS.ToPin()
// check if pin is a meta pin
if gpin.Type == api.MetaType {
return api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusSharded,
TS: time.Now(),
}
}
// check if pin is a remote pin
if gpin.ToPin().IsRemotePin(spt.peerID) {
if gpin.IsRemotePin(spt.peerID) {
return api.PinInfo{
Cid: c,
Peer: spt.peerID,
@ -316,6 +338,7 @@ func (spt *Tracker) Status(c *cid.Cid) api.PinInfo {
Cid: c,
Peer: spt.peerID,
Status: ips.ToTrackerStatus(),
TS: time.Now(),
}
return pi
@ -371,17 +394,34 @@ func (spt *Tracker) Sync(c *cid.Cid) (api.PinInfo, error) {
&gpin,
)
if err != nil {
// if not rpc error means it isn't in the global state
if !rpc.IsRPCError(err) {
spt.optracker.CleanError(c)
return api.PinInfo{}, nil
if rpc.IsRPCError(err) {
logger.Error(err)
return api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusClusterError,
Error: err.Error(),
TS: time.Now(),
}, err
}
logger.Error(err)
return api.PinInfo{}, err
// it isn't in the global state
spt.optracker.CleanError(c)
return api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusUnpinned,
TS: time.Now(),
}, nil
}
// check if pin is a remote pin
if gpin.ToPin().IsRemotePin(spt.peerID) {
return api.PinInfo{}, nil
spt.optracker.CleanError(c)
return api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusRemote,
TS: time.Now(),
}, nil
}
}
@ -397,7 +437,13 @@ func (spt *Tracker) Sync(c *cid.Cid) (api.PinInfo, error) {
)
if err != nil {
logger.Error(err)
return api.PinInfo{}, err
return api.PinInfo{
Cid: c,
Peer: spt.peerID,
Status: api.TrackerStatusPinError,
TS: time.Now(),
Error: err.Error(),
}, err
}
if ips.ToTrackerStatus() == api.TrackerStatusPinned {
spt.optracker.CleanError(c)
@ -405,6 +451,7 @@ func (spt *Tracker) Sync(c *cid.Cid) (api.PinInfo, error) {
Cid: c,
Peer: spt.peerID,
Status: ips.ToTrackerStatus(),
TS: time.Now(),
}
return pi, nil
}
@ -483,9 +530,9 @@ func (spt *Tracker) ipfsStatusAll() (map[string]api.PinInfo, error) {
}
// localStatus returns a joint set of consensusState and ipfsStatus
// marking pins which should be remote and leaving any ipfs pins that
// marking pins which should be meta or remote and leaving any ipfs pins that
// aren't in the consensusState out.
func (spt *Tracker) localStatus(incRemote bool) (map[string]api.PinInfo, error) {
func (spt *Tracker) localStatus(incExtra bool) (map[string]api.PinInfo, error) {
pininfos := make(map[string]api.PinInfo)
// get shared state
@ -515,12 +562,24 @@ func (spt *Tracker) localStatus(incRemote bool) (map[string]api.PinInfo, error)
for _, p := range statePins {
pCid := p.Cid.String()
if p.IsRemotePin(spt.peerID) && incRemote {
if p.Type == api.MetaType && incExtra {
// add pin to pininfos with sharded status
pininfos[pCid] = api.PinInfo{
Cid: p.Cid,
Peer: spt.peerID,
Status: api.TrackerStatusSharded,
TS: time.Now(),
}
continue
}
if p.IsRemotePin(spt.peerID) && incExtra {
// add pin to pininfos with a status of remote
pininfos[pCid] = api.PinInfo{
Cid: p.Cid,
Peer: spt.peerID,
Status: api.TrackerStatusRemote,
TS: time.Now(),
}
continue
}