ipfs-cluster/pintracker/maptracker/maptracker.go
Hector Sanjuan 73aabfa8ec Fix #408: Race condition when StateSync runs
When StateSync() runs and triggers Untrack() on items
that have just been removed from the state but on which
Untrack() is underway, the operation tracker would be
reset to phase queued and in some cases stay so.

Also happened for Track()

This caused failures of TestClustersPin as SyncStatus()
is triggered regularly while Tracks() and Untracks() happen.

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
2018-05-12 10:49:45 +02:00

524 lines
13 KiB
Go

// Package maptracker implements a PinTracker component for IPFS Cluster. It
// uses a map to keep track of the state of tracked pins.
package maptracker
import (
"context"
"errors"
"sync"
"time"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
)
// logger is the component-wide logger for the pintracker.
var logger = logging.Logger("pintracker")

// Sentinel errors used when transitioning tracked items into error
// states (see syncStatus and the pin/unpin timeout handling).
var (
	errUnpinningTimeout = errors.New("unpinning operation is taking too long")
	errPinningTimeout   = errors.New("pinning operation is taking too long")
	errPinned           = errors.New("the item is unexpectedly pinned on IPFS")
	errUnpinned         = errors.New("the item is unexpectedly not pinned on IPFS")
)
// MapPinTracker is a PinTracker implementation which uses a Go map
// to store the status of the tracked Cids. This component is thread-safe.
type MapPinTracker struct {
	mux    sync.RWMutex           // guards status
	status map[string]api.PinInfo // Cid.String() -> last known PinInfo

	config    *Config           // tracker configuration (queue sizes, concurrency)
	optracker *operationTracker // tracks in-flight pin/unpin operations per Cid

	ctx    context.Context // root context for workers and RPC calls
	cancel func()          // cancels ctx on Shutdown

	rpcClient *rpc.Client   // set via SetClient before RPC use
	rpcReady  chan struct{} // buffered (cap 1); signalled when rpcClient is set
	peerID    peer.ID       // this peer's ID, recorded in every PinInfo

	pinCh   chan api.Pin // queue consumed by pinWorker goroutines
	unpinCh chan api.Pin // queue consumed by the unpinWorker goroutine

	shutdownLock sync.Mutex // protects shutdown flag
	shutdown     bool       // true once Shutdown has completed
	wg           sync.WaitGroup
}
// NewMapPinTracker returns a new object which has been correctly
// initialized with the given configuration.
func NewMapPinTracker(cfg *Config, pid peer.ID) *MapPinTracker {
	ctx, cancel := context.WithCancel(context.Background())

	tracker := &MapPinTracker{
		ctx:       ctx,
		cancel:    cancel,
		status:    make(map[string]api.PinInfo),
		config:    cfg,
		optracker: newOperationTracker(ctx),
		rpcReady:  make(chan struct{}, 1),
		peerID:    pid,
		pinCh:     make(chan api.Pin, cfg.MaxPinQueueSize),
		unpinCh:   make(chan api.Pin, cfg.MaxPinQueueSize),
	}

	// Launch the configured number of concurrent pin workers and a
	// single unpin worker. They run until ctx is cancelled.
	for w := 0; w < cfg.ConcurrentPins; w++ {
		go tracker.pinWorker()
	}
	go tracker.unpinWorker()
	return tracker
}
// pinWorker consumes the pin queue, issuing pin calls to the IPFS
// daemon one by one until the tracker's context is cancelled.
func (mpt *MapPinTracker) pinWorker() {
	for {
		select {
		case <-mpt.ctx.Done():
			return
		case next := <-mpt.pinCh:
			// Only act if a pin operation is still being tracked
			// for this Cid (it may have been cancelled meanwhile).
			opc, tracked := mpt.optracker.get(next.Cid)
			if !tracked || opc.op != operationPin {
				continue
			}
			mpt.optracker.updateOperationPhase(next.Cid, phaseInProgress)
			mpt.pin(next)
		}
	}
}
// unpinWorker consumes the unpin queue, issuing unpin calls to the
// IPFS daemon one by one until the tracker's context is cancelled.
func (mpt *MapPinTracker) unpinWorker() {
	for {
		select {
		case <-mpt.ctx.Done():
			return
		case next := <-mpt.unpinCh:
			// Only act if an unpin operation is still being tracked
			// for this Cid (it may have been cancelled meanwhile).
			opc, tracked := mpt.optracker.get(next.Cid)
			if !tracked || opc.op != operationUnpin {
				continue
			}
			mpt.optracker.updateOperationPhase(next.Cid, phaseInProgress)
			mpt.unpin(next)
		}
	}
}
// Shutdown finishes the services provided by the MapPinTracker and cancels
// any active context.
func (mpt *MapPinTracker) Shutdown() error {
	mpt.shutdownLock.Lock()
	defer mpt.shutdownLock.Unlock()

	if !mpt.shutdown {
		logger.Info("stopping MapPinTracker")
		// Cancel workers, signal RPC consumers and wait for any
		// outstanding goroutines before flagging shutdown complete.
		mpt.cancel()
		close(mpt.rpcReady)
		mpt.wg.Wait()
		mpt.shutdown = true
		return nil
	}

	logger.Debug("already shutdown")
	return nil
}
// set records status s for Cid c, taking the write lock.
func (mpt *MapPinTracker) set(c *cid.Cid, s api.TrackerStatus) {
	mpt.mux.Lock()
	mpt.unsafeSet(c, s)
	mpt.mux.Unlock()
}
// unsafeSet records status s for Cid c. Callers must hold mpt.mux.
// An unpinned status removes the entry entirely.
func (mpt *MapPinTracker) unsafeSet(c *cid.Cid, s api.TrackerStatus) {
	key := c.String()
	if s == api.TrackerStatusUnpinned {
		delete(mpt.status, key)
		return
	}

	mpt.status[key] = api.PinInfo{
		Cid:    c,
		Peer:   mpt.peerID,
		Status: s,
		TS:     time.Now(),
		Error:  "",
	}
}
// get returns the PinInfo for Cid c, taking the read lock.
func (mpt *MapPinTracker) get(c *cid.Cid) api.PinInfo {
	mpt.mux.RLock()
	info := mpt.unsafeGet(c)
	mpt.mux.RUnlock()
	return info
}
// unsafeGet returns the PinInfo for Cid c. Callers must hold mpt.mux.
// Unknown Cids are reported as unpinned.
func (mpt *MapPinTracker) unsafeGet(c *cid.Cid) api.PinInfo {
	if info, found := mpt.status[c.String()]; found {
		return info
	}
	return api.PinInfo{
		Cid:    c,
		Peer:   mpt.peerID,
		Status: api.TrackerStatusUnpinned,
		TS:     time.Now(),
		Error:  "",
	}
}
// setError places Cid c in an error state, taking the write lock.
func (mpt *MapPinTracker) setError(c *cid.Cid, err error) {
	mpt.mux.Lock()
	mpt.unsafeSetError(c, err)
	mpt.mux.Unlock()
}
// unsafeSetError transitions Cid c to PinError or UnpinError depending
// on its current status, recording the error message. Callers must hold
// mpt.mux. Statuses outside the two groups below (e.g. remote) are left
// untouched.
func (mpt *MapPinTracker) unsafeSetError(c *cid.Cid, err error) {
	var errStatus api.TrackerStatus
	switch mpt.unsafeGet(c).Status {
	case api.TrackerStatusPinned, api.TrackerStatusPinning, api.TrackerStatusPinError:
		errStatus = api.TrackerStatusPinError
	case api.TrackerStatusUnpinned, api.TrackerStatusUnpinning, api.TrackerStatusUnpinError:
		errStatus = api.TrackerStatusUnpinError
	default:
		return
	}

	mpt.status[c.String()] = api.PinInfo{
		Cid:    c,
		Peer:   mpt.peerID,
		Status: errStatus,
		TS:     time.Now(),
		Error:  err.Error(),
	}
}
// isRemote reports whether this peer is NOT among the pin's allocations.
// A negative max replication factor means "pin everywhere", which is
// never remote.
func (mpt *MapPinTracker) isRemote(c api.Pin) bool {
	switch {
	case c.ReplicationFactorMax < 0:
		return false
	case c.ReplicationFactorMax == 0:
		logger.Errorf("Pin with replication factor 0! %+v", c)
	}

	for _, alloc := range c.Allocations {
		if alloc == mpt.peerID {
			return false
		}
	}
	return true
}
// pin issues a pin call to the IPFS daemon for the given pin and
// updates the tracked status accordingly. When an operation is being
// tracked for the Cid, its context is used so the call can be cancelled
// along with the operation.
func (mpt *MapPinTracker) pin(c api.Pin) error {
	logger.Debugf("issuing pin call for %s", c.Cid)
	mpt.set(c.Cid, api.TrackerStatusPinning)

	ctx := mpt.ctx
	if opc, tracked := mpt.optracker.get(c.Cid); tracked {
		ctx = opc.ctx
	} else {
		logger.Debug("pin operation wasn't being tracked")
	}

	err := mpt.rpcClient.CallContext(
		ctx,
		"",
		"Cluster",
		"IPFSPin",
		c.ToSerial(),
		&struct{}{},
	)
	if err != nil {
		mpt.setError(c.Cid, err)
		return err
	}

	mpt.set(c.Cid, api.TrackerStatusPinned)
	mpt.optracker.finish(c.Cid)
	return nil
}
// unpin issues an unpin call to the IPFS daemon for the given pin and
// updates the tracked status accordingly. When an operation is being
// tracked for the Cid, its context is used so the call can be cancelled
// along with the operation; otherwise the tracker's root context is used.
func (mpt *MapPinTracker) unpin(c api.Pin) error {
	logger.Debugf("issuing unpin call for %s", c.Cid)
	mpt.set(c.Cid, api.TrackerStatusUnpinning)
	var ctx context.Context
	opc, ok := mpt.optracker.get(c.Cid)
	if !ok {
		// This is the unpin path (the previous message was a
		// copy-paste of the pin path's).
		logger.Debug("unpin operation wasn't being tracked")
		ctx = mpt.ctx
	} else {
		ctx = opc.ctx
	}
	err := mpt.rpcClient.CallContext(
		ctx,
		"",
		"Cluster",
		"IPFSUnpin",
		c.ToSerial(),
		&struct{}{},
	)
	if err != nil {
		mpt.setError(c.Cid, err)
		return err
	}
	mpt.set(c.Cid, api.TrackerStatusUnpinned)
	mpt.optracker.finish(c.Cid)
	return nil
}
// Track tells the MapPinTracker to start managing a Cid,
// possibly triggering Pin operations on the IPFS daemon.
func (mpt *MapPinTracker) Track(c api.Pin) error {
	logger.Debugf("tracking %s", c.Cid)

	// If an operation is already tracked for this Cid, resolve it
	// before registering a new pin. This avoids resetting an ongoing
	// operation back to "queued" when Track races with StateSync-
	// triggered calls.
	if opc, ok := mpt.optracker.get(c.Cid); ok {
		switch {
		case opc.op == operationPin:
			return nil // already ongoing
		case opc.op == operationUnpin && opc.phase == phaseQueued:
			mpt.optracker.finish(c.Cid)
			return nil // cancelled while in queue, all done
		case opc.op == operationUnpin && opc.phase == phaseInProgress:
			mpt.optracker.finish(c.Cid)
			// cancelled while unpinning: continue and trigger pin
		}
	}

	// Remote pins are not held locally: if we currently show the item
	// as pinned here, unpin it synchronously, then mark it remote.
	if mpt.isRemote(c) {
		if mpt.get(c.Cid).Status == api.TrackerStatusPinned {
			mpt.optracker.trackNewOperation(
				mpt.ctx,
				c.Cid,
				operationUnpin,
			)
			mpt.unpin(c) // best-effort; errors are reflected in status
		}
		mpt.set(c.Cid, api.TrackerStatusRemote)
		return nil
	}

	// Register the pin operation and hand it to a pinWorker via a
	// non-blocking send; the queue is bounded by MaxPinQueueSize.
	mpt.optracker.trackNewOperation(mpt.ctx, c.Cid, operationPin)
	mpt.set(c.Cid, api.TrackerStatusPinQueued)
	select {
	case mpt.pinCh <- c:
	default:
		err := errors.New("pin queue is full")
		mpt.setError(c.Cid, err)
		mpt.optracker.finish(c.Cid)
		logger.Error(err.Error())
		return err
	}
	return nil
}
// Untrack tells the MapPinTracker to stop managing a Cid.
// If the Cid is pinned locally, it will be unpinned.
func (mpt *MapPinTracker) Untrack(c *cid.Cid) error {
	logger.Debugf("untracking %s", c)

	// Mirror of Track(): resolve any tracked operation for this Cid
	// first, so Untrack racing with Track/StateSync cannot reset an
	// in-flight operation back to "queued".
	if opc, ok := mpt.optracker.get(c); ok {
		switch {
		case opc.op == operationUnpin:
			return nil // already ongoing
		case opc.op == operationPin && opc.phase == phaseQueued:
			mpt.optracker.finish(c)
			return nil // cancelled while in queue, all done
		case opc.op == operationPin && opc.phase == phaseInProgress:
			mpt.optracker.finish(c)
			// cancelled while pinning: continue and trigger unpin
		}
	}

	// Nothing to do if we do not hold the item.
	if pinStatus := mpt.get(c); pinStatus.Status == api.TrackerStatusUnpinned {
		return nil
	}

	// Register the unpin operation and hand it to the unpinWorker via
	// a non-blocking send; the queue is bounded by MaxPinQueueSize.
	mpt.optracker.trackNewOperation(mpt.ctx, c, operationUnpin)
	mpt.set(c, api.TrackerStatusUnpinQueued)
	select {
	case mpt.unpinCh <- api.PinCid(c):
	default:
		err := errors.New("unpin queue is full")
		mpt.setError(c, err)
		mpt.optracker.finish(c)
		logger.Error(err.Error())
		return err
	}
	return nil
}
// Status returns information for a Cid tracked by this
// MapPinTracker. Unknown Cids are reported as unpinned.
func (mpt *MapPinTracker) Status(c *cid.Cid) api.PinInfo {
	return mpt.get(c)
}
// StatusAll returns information for all Cids tracked by this
// MapPinTracker.
func (mpt *MapPinTracker) StatusAll() []api.PinInfo {
	// This only reads the status map, so take the read lock instead of
	// the write lock: concurrent Status()/get() calls are not blocked.
	mpt.mux.RLock()
	defer mpt.mux.RUnlock()
	pins := make([]api.PinInfo, 0, len(mpt.status))
	for _, v := range mpt.status {
		pins = append(pins, v)
	}
	return pins
}
// Sync verifies that the status of a Cid matches that of
// the IPFS daemon. If not, it will be transitioned
// to PinError or UnpinError.
//
// Sync returns the updated local status for the given Cid.
// Pins in error states can be recovered with Recover().
// An error is returned if we are unable to contact
// the IPFS daemon.
func (mpt *MapPinTracker) Sync(c *cid.Cid) (api.PinInfo, error) {
	var ips api.IPFSPinStatus
	// Ask the IPFS connector for this Cid's actual pin status.
	if err := mpt.rpcClient.Call(
		"",
		"Cluster",
		"IPFSPinLsCid",
		api.PinCid(c).ToSerial(),
		&ips,
	); err != nil {
		mpt.setError(c, err)
		return mpt.get(c), err
	}
	return mpt.syncStatus(c, ips), nil
}
// SyncAll verifies that the statuses of all tracked Cids match the
// one reported by the IPFS daemon. If not, they will be transitioned
// to PinError or UnpinError.
//
// SyncAll returns the list of local status for all tracked Cids which
// were updated or have errors. Cids in error states can be recovered
// with Recover().
// An error is returned if we are unable to contact the IPFS daemon.
func (mpt *MapPinTracker) SyncAll() ([]api.PinInfo, error) {
	var ipsMap map[string]api.IPFSPinStatus
	var pInfos []api.PinInfo
	// Fetch the daemon's full recursive pin list in one call.
	err := mpt.rpcClient.Call(
		"",
		"Cluster",
		"IPFSPinLs",
		"recursive",
		&ipsMap,
	)
	if err != nil {
		// Daemon unreachable: flag every tracked item with the error
		// and return them all, holding the write lock for the batch.
		mpt.mux.Lock()
		for k := range mpt.status {
			// Keys come from Cid.String(), so decoding should not
			// fail; the error is deliberately ignored.
			c, _ := cid.Decode(k)
			mpt.unsafeSetError(c, err)
			pInfos = append(pInfos, mpt.unsafeGet(c))
		}
		mpt.mux.Unlock()
		return pInfos, err
	}

	// Reconcile each tracked item against the daemon's list and
	// collect those whose status changed or is an error state.
	status := mpt.StatusAll()
	for _, pInfoOrig := range status {
		var pInfoNew api.PinInfo
		c := pInfoOrig.Cid
		ips, ok := ipsMap[c.String()]
		if !ok {
			// Absent from the daemon's list: reconcile as unpinned.
			pInfoNew = mpt.syncStatus(c, api.IPFSPinStatusUnpinned)
		} else {
			pInfoNew = mpt.syncStatus(c, ips)
		}

		if pInfoOrig.Status != pInfoNew.Status ||
			pInfoNew.Status == api.TrackerStatusUnpinError ||
			pInfoNew.Status == api.TrackerStatusPinError {
			pInfos = append(pInfos, pInfoNew)
		}
	}
	return pInfos, nil
}
// syncStatus reconciles the locally tracked status of c with the pin
// status reported by the IPFS daemon (ips) and returns the (possibly
// updated) PinInfo. In-progress states (pinning/unpinning) and existing
// error states are left alone; contradictions between the local view
// and the daemon become PinError/UnpinError via setError.
func (mpt *MapPinTracker) syncStatus(c *cid.Cid, ips api.IPFSPinStatus) api.PinInfo {
	p := mpt.get(c)
	if ips.IsPinned() {
		// Daemon says pinned.
		switch p.Status {
		case api.TrackerStatusPinned: // nothing
		case api.TrackerStatusPinning, api.TrackerStatusPinError:
			// Pin evidently completed: promote to pinned.
			mpt.set(c, api.TrackerStatusPinned)
		case api.TrackerStatusUnpinning: // nothing
		case api.TrackerStatusUnpinned:
			// We think it's unpinned but the daemon holds it.
			mpt.setError(c, errPinned)
		case api.TrackerStatusUnpinError: // nothing, keep error as it was
		default: //remote
		}
	} else {
		// Daemon says not pinned.
		switch p.Status {
		case api.TrackerStatusPinned:
			// We think it's pinned but the daemon dropped it.
			mpt.setError(c, errUnpinned)
		case api.TrackerStatusPinError: // nothing, keep error as it was
		case api.TrackerStatusPinning: // nothing
		case api.TrackerStatusUnpinning, api.TrackerStatusUnpinError:
			// Unpin evidently completed: clear the entry.
			mpt.set(c, api.TrackerStatusUnpinned)
		case api.TrackerStatusUnpinned: // nothing
		default: // remote
		}
	}
	return mpt.get(c)
}
// Recover will re-track or re-untrack a Cid in error state,
// possibly retriggering an IPFS pinning operation and returning
// only when it is done. The pinning/unpinning operation happens
// synchronously, jumping the queues.
func (mpt *MapPinTracker) Recover(c *cid.Cid) (api.PinInfo, error) {
	pInfo := mpt.get(c)
	logger.Infof("Attempting to recover %s", c)

	var recoverErr error
	switch pInfo.Status {
	case api.TrackerStatusPinError:
		// FIXME: This always recovers recursive == true
		// but sharding will bring direct-pin objects
		recoverErr = mpt.pin(api.PinCid(c))
	case api.TrackerStatusUnpinError:
		recoverErr = mpt.unpin(api.PinCid(c))
	default:
		// Not in an error state: nothing to recover.
		logger.Warningf("%s does not need recovery. Try syncing first", c)
		return pInfo, nil
	}

	if recoverErr != nil {
		logger.Errorf("error recovering %s: %s", c, recoverErr)
	}
	return mpt.get(c), recoverErr
}
// RecoverAll attempts to recover all items tracked by this peer.
// It stops and returns on the first recovery error.
func (mpt *MapPinTracker) RecoverAll() ([]api.PinInfo, error) {
	recovered := make([]api.PinInfo, 0)
	for _, st := range mpt.StatusAll() {
		pInfo, err := mpt.Recover(st.Cid)
		if err != nil {
			return recovered, err
		}
		recovered = append(recovered, pInfo)
	}
	return recovered, nil
}
// SetClient makes the MapPinTracker ready to perform RPC requests to
// other components.
func (mpt *MapPinTracker) SetClient(c *rpc.Client) {
	mpt.rpcClient = c
	// Signal that RPC is usable. rpcReady is buffered with capacity 1
	// (see NewMapPinTracker) so this send does not block; presumably a
	// consumer elsewhere in this file waits on it — verify if calling
	// SetClient more than once.
	mpt.rpcReady <- struct{}{}
}