ipfs-cluster/map_pin_tracker.go
Hector Sanjuan fa0a0d2caf Missing return
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2016-12-19 19:58:01 +01:00

446 lines
8.6 KiB
Go

package ipfscluster
import (
"context"
"errors"
"fmt"
"sync"
"time"
peer "github.com/libp2p/go-libp2p-peer"
cid "github.com/ipfs/go-cid"
)
// A Pin or Unpin operation will be considered failed
// if the Cid has stayed in Pinning or Unpinning state
// for longer than these values.
var (
PinningTimeout = 15 * time.Minute
UnpinningTimeout = 10 * time.Second
)
const (
PinError = iota
UnpinError
Pinned
Pinning
Unpinning
Unpinned
RemotePin
)
type GlobalPinInfo struct {
Cid *cid.Cid
Status map[peer.ID]PinInfo
}
// PinInfo holds information about local pins. PinInfo is
// serialized when requesting the Global status, therefore
// we cannot use *cid.Cid.
type PinInfo struct {
CidStr string
Peer peer.ID
IPFS IPFSStatus
TS time.Time
}
type IPFSStatus int
func (st IPFSStatus) String() string {
switch st {
case PinError:
return "pin_error"
case UnpinError:
return "unpin_error"
case Pinned:
return "pinned"
case Pinning:
return "pinning"
case Unpinning:
return "unpinning"
case Unpinned:
return "unpinned"
}
return ""
}
type MapPinTracker struct {
mux sync.Mutex
status map[string]PinInfo
ctx context.Context
rpcCh chan RPC
peerID peer.ID
shutdownLock sync.Mutex
shutdown bool
shutdownCh chan struct{}
wg sync.WaitGroup
}
func NewMapPinTracker(cfg *Config) *MapPinTracker {
ctx := context.Background()
pID, err := peer.IDB58Decode(cfg.ID)
if err != nil {
panic(err)
}
mpt := &MapPinTracker{
ctx: ctx,
status: make(map[string]PinInfo),
rpcCh: make(chan RPC, RPCMaxQueue),
peerID: pID,
shutdownCh: make(chan struct{}),
}
logger.Info("starting MapPinTracker")
mpt.run()
return mpt
}
func (mpt *MapPinTracker) run() {
mpt.wg.Add(1)
go func() {
defer mpt.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mpt.ctx = ctx
<-mpt.shutdownCh
}()
}
func (mpt *MapPinTracker) Shutdown() error {
mpt.shutdownLock.Lock()
defer mpt.shutdownLock.Unlock()
if mpt.shutdown {
logger.Debug("already shutdown")
return nil
}
logger.Info("stopping MapPinTracker")
mpt.shutdownCh <- struct{}{}
mpt.wg.Wait()
mpt.shutdown = true
return nil
}
func (mpt *MapPinTracker) unsafeSet(c *cid.Cid, s IPFSStatus) {
if s == Unpinned {
delete(mpt.status, c.String())
return
}
mpt.status[c.String()] = PinInfo{
// cid: c,
CidStr: c.String(),
Peer: mpt.peerID,
IPFS: s,
TS: time.Now(),
}
}
func (mpt *MapPinTracker) set(c *cid.Cid, s IPFSStatus) {
mpt.mux.Lock()
defer mpt.mux.Unlock()
mpt.unsafeSet(c, s)
}
func (mpt *MapPinTracker) get(c *cid.Cid) PinInfo {
mpt.mux.Lock()
defer mpt.mux.Unlock()
p, ok := mpt.status[c.String()]
if !ok {
return PinInfo{
// cid: c,
CidStr: c.String(),
Peer: mpt.peerID,
IPFS: Unpinned,
TS: time.Now(),
}
}
return p
}
func (mpt *MapPinTracker) pin(c *cid.Cid) error {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
mpt.set(c, Pinning)
resp := MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSPinRPC, c), true)
if resp.Error != nil {
mpt.set(c, PinError)
return resp.Error
}
mpt.set(c, Pinned)
return nil
}
func (mpt *MapPinTracker) unpin(c *cid.Cid) error {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
mpt.set(c, Unpinning)
resp := MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSUnpinRPC, c), true)
if resp.Error != nil {
mpt.set(c, PinError)
return resp.Error
}
mpt.set(c, Unpinned)
return nil
}
func (mpt *MapPinTracker) Track(c *cid.Cid) error {
return mpt.pin(c)
}
func (mpt *MapPinTracker) Untrack(c *cid.Cid) error {
return mpt.unpin(c)
}
func (mpt *MapPinTracker) LocalStatusCid(c *cid.Cid) PinInfo {
return mpt.get(c)
}
func (mpt *MapPinTracker) LocalStatus() []PinInfo {
mpt.mux.Lock()
defer mpt.mux.Unlock()
pins := make([]PinInfo, 0, len(mpt.status))
for _, v := range mpt.status {
pins = append(pins, v)
}
return pins
}
func (mpt *MapPinTracker) GlobalStatus() ([]GlobalPinInfo, error) {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
rpc := NewRPC(TrackerLocalStatusRPC, nil)
wrpc := NewRPC(BroadcastRPC, rpc)
resp := MakeRPC(ctx, mpt.rpcCh, wrpc, true)
if resp.Error != nil {
return nil, resp.Error
}
responses, ok := resp.Data.([]RPCResponse)
if !ok {
return nil, errors.New("unexpected responses format")
}
fullMap := make(map[string]GlobalPinInfo)
mergePins := func(pins []PinInfo) {
for _, p := range pins {
item, ok := fullMap[p.CidStr]
c, _ := cid.Decode(p.CidStr)
if !ok {
fullMap[p.CidStr] = GlobalPinInfo{
Cid: c,
Status: map[peer.ID]PinInfo{
p.Peer: p,
},
}
} else {
item.Status[p.Peer] = p
}
}
}
for _, r := range responses {
if r.Error != nil {
logger.Error("error in one of the broadcast responses: ", r.Error)
continue
}
pins, ok := r.Data.([]PinInfo)
if !ok {
return nil, fmt.Errorf("unexpected response format: %+v", r.Data)
}
mergePins(pins)
}
status := make([]GlobalPinInfo, 0, len(fullMap))
for _, v := range fullMap {
status = append(status, v)
}
return status, nil
}
func (mpt *MapPinTracker) GlobalStatusCid(c *cid.Cid) (GlobalPinInfo, error) {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
pin := GlobalPinInfo{
Cid: c,
Status: make(map[peer.ID]PinInfo),
}
rpc := NewRPC(TrackerLocalStatusCidRPC, c)
wrpc := NewRPC(BroadcastRPC, rpc)
resp := MakeRPC(ctx, mpt.rpcCh, wrpc, true)
if resp.Error != nil {
return pin, resp.Error
}
responses, ok := resp.Data.([]RPCResponse)
if !ok {
return pin, errors.New("unexpected responses format")
}
for _, r := range responses {
if r.Error != nil {
return pin, r.Error
}
info, ok := r.Data.(PinInfo)
if !ok {
return pin, errors.New("unexpected response format")
}
pin.Status[info.Peer] = info
}
return pin, nil
}
func (mpt *MapPinTracker) Sync(c *cid.Cid) bool {
ctx, cancel := context.WithCancel(mpt.ctx)
defer cancel()
p := mpt.get(c)
// We assume errors will need a Recover() so we return true
if p.IPFS == PinError || p.IPFS == UnpinError {
return true
}
resp := MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSIsPinnedRPC, c), true)
if resp.Error != nil {
if p.IPFS == Pinned || p.IPFS == Pinning {
mpt.set(c, PinError)
return true
}
if p.IPFS == Unpinned || p.IPFS == Unpinning {
mpt.set(c, UnpinError)
return true
}
return false
}
ipfsPinned, ok := resp.Data.(bool)
if !ok {
logger.Error("wrong type of IPFSIsPinnedRPC response")
return false
}
if ipfsPinned {
switch p.IPFS {
case Pinned:
return false
case Pinning:
mpt.set(c, Pinned)
return true
case Unpinning:
if time.Since(p.TS) > UnpinningTimeout {
mpt.set(c, UnpinError)
return true
}
return false
case Unpinned:
mpt.set(c, UnpinError)
return true
}
} else {
switch p.IPFS {
case Pinned:
mpt.set(c, PinError)
return true
case Pinning:
if time.Since(p.TS) > PinningTimeout {
mpt.set(c, PinError)
return true
}
return false
case Unpinning:
mpt.set(c, Unpinned)
return true
case Unpinned:
return false
}
}
return false
}
func (mpt *MapPinTracker) Recover(c *cid.Cid) error {
p := mpt.get(c)
if p.IPFS != PinError && p.IPFS != UnpinError {
return nil
}
if p.IPFS == PinError {
mpt.pin(c)
}
if p.IPFS == UnpinError {
mpt.unpin(c)
}
return nil
}
func (mpt *MapPinTracker) SyncAll() []PinInfo {
var changedPins []PinInfo
pins := mpt.LocalStatus()
for _, p := range pins {
c, _ := cid.Decode(p.CidStr)
changed := mpt.Sync(c)
if changed {
changedPins = append(changedPins, p)
}
}
return changedPins
}
func (mpt *MapPinTracker) SyncState(cState State) []*cid.Cid {
clusterPins := cState.ListPins()
clusterMap := make(map[string]struct{})
// Make a map for faster lookup
for _, c := range clusterPins {
var a struct{}
clusterMap[c.String()] = a
}
var toRemove []*cid.Cid
var toAdd []*cid.Cid
var changed []*cid.Cid
mpt.mux.Lock()
// Collect items in the State not in the tracker
for _, c := range clusterPins {
_, ok := mpt.status[c.String()]
if !ok {
toAdd = append(toAdd, c)
}
}
// Collect items in the tracker not in the State
for _, p := range mpt.status {
c, _ := cid.Decode(p.CidStr)
_, ok := clusterMap[p.CidStr]
if !ok {
toRemove = append(toRemove, c)
}
}
// Update new items and mark them as pinning error
for _, c := range toAdd {
mpt.unsafeSet(c, PinError)
changed = append(changed, c)
}
// Mark items that need to be removed as unpin error
for _, c := range toRemove {
mpt.unsafeSet(c, UnpinError)
changed = append(changed, c)
}
mpt.mux.Unlock()
return changed
}
func (mpt *MapPinTracker) RpcChan() <-chan RPC {
return mpt.rpcCh
}