ipfs-cluster/api/types.go
Hector Sanjuan d1700dbe81 Fixes #1319: Status wrongly shows pins as REMOTE
The Allocations of a pin that has been added with default replication factor
are kept even when the replication factor turns out to be -1.

This resulted in the Status(cid) code skipping calls to a number of peers
and setting the pin directly as REMOTE.

The fix, on one side makes sure Allocations is always nil when the replication
factor is -1. On the other size, lets the globalPinInfoCid method check the
replication factor value, rather than the number of allocations to decide if
any nodes are bound to be remote.

On the plus side, the pin tracker used the IsRemotePin method, which uses the
replication factor, so things were pinned even if the Status(cid) method shows
them as remote.
2021-03-24 00:47:15 +01:00

1119 lines
32 KiB
Go

// Package api holds declarations for types used in ipfs-cluster APIs to make
// them re-usable across differen tools. This include RPC API "Serial[izable]"
// versions for types. The Go API uses natives types, while RPC API,
// REST APIs etc use serializable types (i.e. json format). Conversion methods
// exists between types.
//
// Note that all conversion methods ignore any parsing errors. All values must
// be validated first before initializing any of the types defined here.
package api
import (
"encoding/json"
"fmt"
"net/url"
"sort"
"strconv"
"strings"
"time"
pb "github.com/ipfs/ipfs-cluster/api/pb"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p-core/peer"
protocol "github.com/libp2p/go-libp2p-core/protocol"
multiaddr "github.com/multiformats/go-multiaddr"
// needed to parse /ws multiaddresses
_ "github.com/libp2p/go-ws-transport"
// needed to parse /dns* multiaddresses
_ "github.com/multiformats/go-multiaddr-dns"
"github.com/pkg/errors"
proto "google.golang.org/protobuf/proto"
)
var logger = logging.Logger("apitypes")
var unixZero = time.Unix(0, 0)
func init() {
// Use /p2p/ multiaddresses
multiaddr.SwapToP2pMultiaddrs()
// intialize trackerStatusString
stringTrackerStatus = make(map[string]TrackerStatus)
for k, v := range trackerStatusString {
stringTrackerStatus[v] = k
}
}
// TrackerStatus values
const (
// IPFSStatus should never take this value.
// When used as a filter. It means "all".
TrackerStatusUndefined TrackerStatus = 0
// The cluster node is offline or not responding
TrackerStatusClusterError TrackerStatus = 1 << iota
// An error occurred pinning
TrackerStatusPinError
// An error occurred unpinning
TrackerStatusUnpinError
// The IPFS daemon has pinned the item
TrackerStatusPinned
// The IPFS daemon is currently pinning the item
TrackerStatusPinning
// The IPFS daemon is currently unpinning the item
TrackerStatusUnpinning
// The IPFS daemon is not pinning the item
TrackerStatusUnpinned
// The IPFS daemon is not pinning the item but it is being tracked
TrackerStatusRemote
// The item has been queued for pinning on the IPFS daemon
TrackerStatusPinQueued
// The item has been queued for unpinning on the IPFS daemon
TrackerStatusUnpinQueued
// The IPFS daemon is not pinning the item through this cid but it is
// tracked in a cluster dag
TrackerStatusSharded
)
// Composite TrackerStatus.
const (
TrackerStatusError = TrackerStatusClusterError | TrackerStatusPinError | TrackerStatusUnpinError
TrackerStatusQueued = TrackerStatusPinQueued | TrackerStatusUnpinQueued
)
// TrackerStatus represents the status of a tracked Cid in the PinTracker
type TrackerStatus int
var trackerStatusString = map[TrackerStatus]string{
TrackerStatusUndefined: "undefined",
TrackerStatusClusterError: "cluster_error",
TrackerStatusPinError: "pin_error",
TrackerStatusUnpinError: "unpin_error",
TrackerStatusError: "error",
TrackerStatusPinned: "pinned",
TrackerStatusPinning: "pinning",
TrackerStatusUnpinning: "unpinning",
TrackerStatusUnpinned: "unpinned",
TrackerStatusRemote: "remote",
TrackerStatusPinQueued: "pin_queued",
TrackerStatusUnpinQueued: "unpin_queued",
TrackerStatusQueued: "queued",
}
// values autofilled in init()
var stringTrackerStatus map[string]TrackerStatus
// String converts a TrackerStatus into a readable string.
// If the given TrackerStatus is a filter (with several
// bits set), it will return a comma-separated list.
func (st TrackerStatus) String() string {
var values []string
// simple and known composite values
if v, ok := trackerStatusString[st]; ok {
return v
}
// other filters
for k, v := range trackerStatusString {
if st&k > 0 {
values = append(values, v)
}
}
return strings.Join(values, ",")
}
// Match returns true if the tracker status matches the given filter.
// For example TrackerStatusPinError will match TrackerStatusPinError
// and TrackerStatusError
func (st TrackerStatus) Match(filter TrackerStatus) bool {
return filter == 0 || st&filter > 0
}
// MarshalJSON uses the string representation of TrackerStatus for JSON
// encoding.
func (st TrackerStatus) MarshalJSON() ([]byte, error) {
return json.Marshal(st.String())
}
// UnmarshalJSON sets a tracker status from its JSON representation.
func (st *TrackerStatus) UnmarshalJSON(data []byte) error {
var v string
err := json.Unmarshal(data, &v)
if err != nil {
return err
}
*st = TrackerStatusFromString(v)
return nil
}
// TrackerStatusFromString parses a string and returns the matching
// TrackerStatus value. The string can be a comma-separated list
// representing a TrackerStatus filter. Unknown status names are
// ignored.
func TrackerStatusFromString(str string) TrackerStatus {
values := strings.Split(strings.Replace(str, " ", "", -1), ",")
var status TrackerStatus
for _, v := range values {
st, ok := stringTrackerStatus[v]
if ok {
status |= st
}
}
return status
}
// TrackerStatusAll all known TrackerStatus values.
func TrackerStatusAll() []TrackerStatus {
var list []TrackerStatus
for k := range trackerStatusString {
if k != TrackerStatusUndefined {
list = append(list, k)
}
}
return list
}
// IPFSPinStatus values
// FIXME include maxdepth
const (
IPFSPinStatusBug IPFSPinStatus = iota
IPFSPinStatusError
IPFSPinStatusDirect
IPFSPinStatusRecursive
IPFSPinStatusIndirect
IPFSPinStatusUnpinned
)
// IPFSPinStatus represents the status of a pin in IPFS (direct, recursive etc.)
type IPFSPinStatus int
// IPFSPinStatusFromString parses a string and returns the matching
// IPFSPinStatus.
func IPFSPinStatusFromString(t string) IPFSPinStatus {
// Since indirect statuses are of the form "indirect through <cid>"
// use a prefix match
switch {
case strings.HasPrefix(t, "indirect"):
return IPFSPinStatusIndirect
case strings.HasPrefix(t, "recursive"):
// FIXME: Maxdepth?
return IPFSPinStatusRecursive
case t == "direct":
return IPFSPinStatusDirect
default:
return IPFSPinStatusBug
}
}
// IsPinned returns true if the item is pinned as expected by the
// maxDepth parameter.
func (ips IPFSPinStatus) IsPinned(maxDepth PinDepth) bool {
switch {
case maxDepth < 0:
return ips == IPFSPinStatusRecursive
case maxDepth == 0:
return ips == IPFSPinStatusDirect
case maxDepth > 0:
// FIXME: when we know how ipfs returns partial pins.
return ips == IPFSPinStatusRecursive
}
return false
}
// ToTrackerStatus converts the IPFSPinStatus value to the
// appropriate TrackerStatus value.
func (ips IPFSPinStatus) ToTrackerStatus() TrackerStatus {
return ipfsPinStatus2TrackerStatusMap[ips]
}
var ipfsPinStatus2TrackerStatusMap = map[IPFSPinStatus]TrackerStatus{
IPFSPinStatusDirect: TrackerStatusPinned,
IPFSPinStatusRecursive: TrackerStatusPinned,
IPFSPinStatusIndirect: TrackerStatusUnpinned,
IPFSPinStatusUnpinned: TrackerStatusUnpinned,
IPFSPinStatusBug: TrackerStatusUndefined,
IPFSPinStatusError: TrackerStatusClusterError, //TODO(ajl): check suitability
}
// GlobalPinInfo contains cluster-wide status information about a tracked Cid,
// indexed by cluster peer.
type GlobalPinInfo struct {
Cid cid.Cid `json:"cid" codec:"c"`
Name string `json:"name" codec:"n"`
// https://github.com/golang/go/issues/28827
// Peer IDs are of string Kind(). We can't use peer IDs here
// as Go ignores TextMarshaler.
PeerMap map[string]*PinInfoShort `json:"peer_map" codec:"pm,omitempty"`
}
// String returns the string representation of a GlobalPinInfo.
func (gpi *GlobalPinInfo) String() string {
str := fmt.Sprintf("Cid: %v\n", gpi.Cid.String())
str = str + "Peer:\n"
for _, p := range gpi.PeerMap {
str = str + fmt.Sprintf("\t%+v\n", p)
}
return str
}
// Add adds a PinInfo object to a GlobalPinInfo
func (gpi *GlobalPinInfo) Add(pi *PinInfo) {
if !gpi.Cid.Defined() {
gpi.Cid = pi.Cid
gpi.Name = pi.Name
}
if gpi.PeerMap == nil {
gpi.PeerMap = make(map[string]*PinInfoShort)
}
gpi.PeerMap[peer.Encode(pi.Peer)] = &pi.PinInfoShort
}
// PinInfoShort is a subset of PinInfo which is embedded in GlobalPinInfo
// objects and does not carry redundant information as PinInfo would.
type PinInfoShort struct {
PeerName string `json:"peername" codec:"pn,omitempty"`
Status TrackerStatus `json:"status" codec:"st,omitempty"`
TS time.Time `json:"timestamp" codec:"ts,omitempty"`
Error string `json:"error" codec:"e,omitempty"`
}
// PinInfo holds information about local pins. This is used by the Pin
// Trackers.
type PinInfo struct {
Cid cid.Cid `json:"cid" codec:"c"`
Name string `json:"name" codec:"m,omitempty"`
Peer peer.ID `json:"Peer" codec:"p,omitempty"`
PinInfoShort
}
// ToGlobal converts a PinInfo object to a GlobalPinInfo with
// a single peer corresponding to the given PinInfo.
func (pi *PinInfo) ToGlobal() *GlobalPinInfo {
gpi := GlobalPinInfo{}
gpi.Add(pi)
return &gpi
}
// Version holds version information
type Version struct {
Version string `json:"version" codec:"v"`
}
// ConnectGraph holds information about the connectivity of the cluster To
// read, traverse the keys of ClusterLinks. Each such id is one of the peers
// of the "ClusterID" peer running the query. ClusterLinks[id] in turn lists
// the ids that peer "id" sees itself connected to. It is possible that id is
// a peer of ClusterID, but ClusterID can not reach id over rpc, in which case
// ClusterLinks[id] == [], as id's view of its connectivity can not be
// retrieved.
//
// Iff there was an error reading the IPFSID of the peer then id will not be a
// key of ClustertoIPFS or IPFSLinks. Finally iff id is a key of ClustertoIPFS
// then id will be a key of IPFSLinks. In the event of a SwarmPeers error
// IPFSLinks[id] == [].
type ConnectGraph struct {
ClusterID peer.ID `json:"cluster_id" codec:"id"`
IDtoPeername map[string]string `json:"id_to_peername" codec:"ip,omitempty"`
// ipfs to ipfs links
IPFSLinks map[string][]peer.ID `json:"ipfs_links" codec:"il,omitempty"`
// cluster to cluster links
ClusterLinks map[string][]peer.ID `json:"cluster_links" codec:"cl,omitempty"`
// cluster trust links
ClusterTrustLinks map[string]bool `json:"cluster_trust_links" codec:"ctl,omitempty"`
// cluster to ipfs links
ClustertoIPFS map[string]peer.ID `json:"cluster_to_ipfs" codec:"ci,omitempty"`
}
// Multiaddr is a concrete type to wrap a Multiaddress so that it knows how to
// serialize and deserialize itself.
type Multiaddr struct {
multiaddr.Multiaddr
}
// NewMultiaddr returns a cluster Multiaddr wrapper creating the
// multiaddr.Multiaddr with the given string.
func NewMultiaddr(mstr string) (Multiaddr, error) {
m, err := multiaddr.NewMultiaddr(mstr)
return Multiaddr{Multiaddr: m}, err
}
// NewMultiaddrWithValue returns a new cluster Multiaddr wrapper using the
// given multiaddr.Multiaddr.
func NewMultiaddrWithValue(ma multiaddr.Multiaddr) Multiaddr {
return Multiaddr{Multiaddr: ma}
}
// MarshalJSON returns a JSON-formatted multiaddress.
func (maddr Multiaddr) MarshalJSON() ([]byte, error) {
return maddr.Multiaddr.MarshalJSON()
}
// UnmarshalJSON parses a cluster Multiaddr from the JSON representation.
func (maddr *Multiaddr) UnmarshalJSON(data []byte) error {
maddr.Multiaddr, _ = multiaddr.NewMultiaddr("/ip4/127.0.0.1") // null multiaddresses not allowed
return maddr.Multiaddr.UnmarshalJSON(data)
}
// MarshalBinary returs the bytes of the wrapped multiaddress.
func (maddr Multiaddr) MarshalBinary() ([]byte, error) {
return maddr.Multiaddr.MarshalBinary()
}
// UnmarshalBinary casts some bytes as a multiaddress wraps it with
// the given cluster Multiaddr.
func (maddr *Multiaddr) UnmarshalBinary(data []byte) error {
datacopy := make([]byte, len(data)) // This is super important
copy(datacopy, data)
maddr.Multiaddr, _ = multiaddr.NewMultiaddr("/ip4/127.0.0.1") // null multiaddresses not allowed
return maddr.Multiaddr.UnmarshalBinary(datacopy)
}
// Value returns the wrapped multiaddr.Multiaddr.
func (maddr Multiaddr) Value() multiaddr.Multiaddr {
return maddr.Multiaddr
}
// ID holds information about the Cluster peer
type ID struct {
ID peer.ID `json:"id" codec:"i,omitempty"`
Addresses []Multiaddr `json:"addresses" codec:"a,omitempty"`
ClusterPeers []peer.ID `json:"cluster_peers" codec:"cp,omitempty"`
ClusterPeersAddresses []Multiaddr `json:"cluster_peers_addresses" codec:"cpa,omitempty"`
Version string `json:"version" codec:"v,omitempty"`
Commit string `json:"commit" codec:"c,omitempty"`
RPCProtocolVersion protocol.ID `json:"rpc_protocol_version" codec:"rv,omitempty"`
Error string `json:"error" codec:"e,omitempty"`
IPFS *IPFSID `json:"ipfs,omitempty" codec:"ip,omitempty"`
Peername string `json:"peername" codec:"pn,omitempty"`
//PublicKey crypto.PubKey
}
// IPFSID is used to store information about the underlying IPFS daemon
type IPFSID struct {
ID peer.ID `json:"id,omitempty" codec:"i,omitempty"`
Addresses []Multiaddr `json:"addresses" codec:"a,omitempty"`
Error string `json:"error" codec:"e,omitempty"`
}
// PinType specifies which sort of Pin object we are dealing with.
// In practice, the PinType decides how a Pin object is treated by the
// PinTracker.
// See descriptions above.
// A sharded Pin would look like:
//
// [ Meta ] (not pinned on IPFS, only present in cluster state)
// |
// v
// [ Cluster DAG ] (pinned everywhere in "direct")
// | .. |
// v v
// [Shard1] .. [ShardN] (allocated to peers and pinned with max-depth=1
// | | .. | | | .. |
// v v .. v v v .. v
// [][]..[] [][]..[] Blocks (indirectly pinned on ipfs, not tracked in cluster)
//
//
type PinType uint64
// PinType values. See PinType documentation for further explanation.
const (
// BadType type showing up anywhere indicates a bug
BadType PinType = 1 << iota
// DataType is a regular, non-sharded pin. It is pinned recursively.
// It has no associated reference.
DataType
// MetaType tracks the original CID of a sharded DAG. Its Reference
// points to the Cluster DAG CID.
MetaType
// ClusterDAGType pins carry the CID of the root node that points to
// all the shard-root-nodes of the shards in which a DAG has been
// divided. Its Reference carries the MetaType CID.
// ClusterDAGType pins are pinned directly everywhere.
ClusterDAGType
// ShardType pins carry the root CID of a shard, which points
// to individual blocks on the original DAG that the user is adding,
// which has been sharded.
// They carry a Reference to the previous shard.
// ShardTypes are pinned with MaxDepth=1 (root and
// direct children only).
ShardType
)
// AllType is a PinType used for filtering all pin types
const AllType PinType = DataType | MetaType | ClusterDAGType | ShardType
// PinTypeFromString is the inverse of String. It returns the PinType value
// corresponding to the input string
func PinTypeFromString(str string) PinType {
switch str {
case "pin":
return DataType
case "meta-pin":
return MetaType
case "clusterdag-pin":
return ClusterDAGType
case "shard-pin":
return ShardType
case "all":
return AllType
case "":
return AllType
default:
return BadType
}
}
// String returns a printable value to identify the PinType
func (pT PinType) String() string {
switch pT {
case DataType:
return "pin"
case MetaType:
return "meta-pin"
case ClusterDAGType:
return "clusterdag-pin"
case ShardType:
return "shard-pin"
case AllType:
return "all"
default:
return "bad-type"
}
}
var pinOptionsMetaPrefix = "meta-"
// PinMode is a PinOption that indicates how to pin something on IPFS,
// recursively or direct.
type PinMode int
// PinMode values
const (
PinModeRecursive PinMode = 0
PinModeDirect PinMode = 1
)
// PinModeFromString converst a string to PinMode.
func PinModeFromString(s string) PinMode {
switch s {
case "recursive", "":
return PinModeRecursive
case "direct":
return PinModeDirect
default:
logger.Warnf("unknown pin mode %s. Defaulting to recursive", s)
return PinModeRecursive
}
}
// String returns a human-readable value for PinMode.
func (pm PinMode) String() string {
switch pm {
case PinModeRecursive:
return "recursive"
case PinModeDirect:
return "direct"
default:
return "recursive"
}
}
// MarshalJSON converts the PinMode into a readable string in JSON.
func (pm PinMode) MarshalJSON() ([]byte, error) {
return json.Marshal(pm.String())
}
// UnmarshalJSON takes a JSON value and parses it into PinMode.
func (pm *PinMode) UnmarshalJSON(b []byte) error {
var s string
err := json.Unmarshal(b, &s)
if err != nil {
return err
}
*pm = PinModeFromString(s)
return nil
}
// ToPinDepth converts the Mode to Depth.
func (pm PinMode) ToPinDepth() PinDepth {
switch pm {
case PinModeRecursive:
return -1
case PinModeDirect:
return 0
default:
logger.Warn("unknown pin mode %d. Defaulting to -1 depth", pm)
return -1
}
}
// PinOptions wraps user-defined options for Pins
type PinOptions struct {
ReplicationFactorMin int `json:"replication_factor_min" codec:"rn,omitempty"`
ReplicationFactorMax int `json:"replication_factor_max" codec:"rx,omitempty"`
Name string `json:"name" codec:"n,omitempty"`
Mode PinMode `json:"mode" codec:"o,omitempty"`
ShardSize uint64 `json:"shard_size" codec:"s,omitempty"`
UserAllocations []peer.ID `json:"user_allocations" codec:"ua,omitempty"`
ExpireAt time.Time `json:"expire_at" codec:"e,omitempty"`
Metadata map[string]string `json:"metadata" codec:"m,omitempty"`
PinUpdate cid.Cid `json:"pin_update,omitempty" codec:"pu,omitempty"`
}
// Equals returns true if two PinOption objects are equivalent. po and po2 may
// be nil.
func (po *PinOptions) Equals(po2 *PinOptions) bool {
if po == nil && po2 != nil || po2 == nil && po != nil {
return false
}
if po == po2 { // same as pin.Equals()
return false
}
if po.Name != po2.Name {
return false
}
if po.Mode != po2.Mode {
return false
}
if po.ReplicationFactorMax != po2.ReplicationFactorMax {
return false
}
if po.ReplicationFactorMin != po2.ReplicationFactorMin {
return false
}
if po.ShardSize != po2.ShardSize {
return false
}
lenAllocs1 := len(po.UserAllocations)
lenAllocs2 := len(po2.UserAllocations)
if lenAllocs1 != lenAllocs2 {
return false
}
// avoid side effects in the original objects
allocs1 := PeersToStrings(po.UserAllocations)
allocs2 := PeersToStrings(po2.UserAllocations)
sort.Strings(allocs1)
sort.Strings(allocs2)
if strings.Join(allocs1, ",") != strings.Join(allocs2, ",") {
return false
}
if !po.ExpireAt.Equal(po2.ExpireAt) {
return false
}
for k, v := range po.Metadata {
v2 := po2.Metadata[k]
if k != "" && v != v2 {
return false
}
}
// deliberately ignore Update
return true
}
// ToQuery returns the PinOption as query arguments.
func (po *PinOptions) ToQuery() (string, error) {
q := url.Values{}
q.Set("replication-min", fmt.Sprintf("%d", po.ReplicationFactorMin))
q.Set("replication-max", fmt.Sprintf("%d", po.ReplicationFactorMax))
q.Set("name", po.Name)
q.Set("mode", po.Mode.String())
q.Set("shard-size", fmt.Sprintf("%d", po.ShardSize))
q.Set("user-allocations", strings.Join(PeersToStrings(po.UserAllocations), ","))
if !po.ExpireAt.IsZero() {
v, err := po.ExpireAt.MarshalText()
if err != nil {
return "", err
}
q.Set("expire-at", string(v))
}
for k, v := range po.Metadata {
if k == "" {
continue
}
q.Set(fmt.Sprintf("%s%s", pinOptionsMetaPrefix, k), v)
}
if po.PinUpdate != cid.Undef {
q.Set("pin-update", po.PinUpdate.String())
}
return q.Encode(), nil
}
// FromQuery is the inverse of ToQuery().
func (po *PinOptions) FromQuery(q url.Values) error {
po.Name = q.Get("name")
po.Mode = PinModeFromString(q.Get("mode"))
rplStr := q.Get("replication")
if rplStr != "" { // override
q.Set("replication-min", rplStr)
q.Set("replication-max", rplStr)
}
err := parseIntParam(q, "replication-min", &po.ReplicationFactorMin)
if err != nil {
return err
}
err = parseIntParam(q, "replication-max", &po.ReplicationFactorMax)
if err != nil {
return err
}
if v := q.Get("shard-size"); v != "" {
shardSize, err := strconv.ParseUint(v, 10, 64)
if err != nil {
return errors.New("parameter shard_size is invalid")
}
po.ShardSize = shardSize
}
if allocs := q.Get("user-allocations"); allocs != "" {
po.UserAllocations = StringsToPeers(strings.Split(allocs, ","))
}
if v := q.Get("expire-at"); v != "" {
var tm time.Time
err := tm.UnmarshalText([]byte(v))
if err != nil {
return errors.Wrap(err, "expire-at cannot be parsed")
}
po.ExpireAt = tm
} else if v = q.Get("expire-in"); v != "" {
d, err := time.ParseDuration(v)
if err != nil {
return errors.Wrap(err, "expire-in cannot be parsed")
}
if d < time.Second {
return errors.New("expire-in duration too short")
}
po.ExpireAt = time.Now().Add(d)
}
po.Metadata = make(map[string]string)
for k := range q {
if !strings.HasPrefix(k, pinOptionsMetaPrefix) {
continue
}
metaKey := strings.TrimPrefix(k, pinOptionsMetaPrefix)
if metaKey == "" {
continue
}
po.Metadata[metaKey] = q.Get(k)
}
updateStr := q.Get("pin-update")
if updateStr != "" {
updateCid, err := cid.Decode(updateStr)
if err != nil {
return fmt.Errorf("error decoding update option parameter: %s", err)
}
po.PinUpdate = updateCid
}
return nil
}
// PinDepth indicates how deep a pin should be pinned, with
// -1 meaning "to the bottom", or "recursive".
type PinDepth int
// ToPinMode converts PinDepth to PinMode
func (pd PinDepth) ToPinMode() PinMode {
switch pd {
case -1:
return PinModeRecursive
case 0:
return PinModeDirect
default:
logger.Warnf("bad pin depth: %d", pd)
return PinModeRecursive
}
}
// Pin carries all the information associated to a CID that is pinned
// in IPFS Cluster. It also carries transient information (that may not
// get protobuffed, like UserAllocations).
type Pin struct {
PinOptions
Cid cid.Cid `json:"cid" codec:"c"`
// See PinType comments
Type PinType `json:"type" codec:"t,omitempty"`
// The peers to which this pin is allocated
Allocations []peer.ID `json:"allocations" codec:"a,omitempty"`
// MaxDepth associated to this pin. -1 means
// recursive.
MaxDepth PinDepth `json:"max_depth" codec:"d,omitempty"`
// We carry a reference CID to this pin. For
// ClusterDAGs, it is the MetaPin CID. For the
// MetaPin it is the ClusterDAG CID. For Shards,
// it is the previous shard CID.
// When not needed the pointer is nil
Reference *cid.Cid `json:"reference" codec:"r,omitempty"`
}
// String is a string representation of a Pin.
func (pin *Pin) String() string {
var b strings.Builder
fmt.Fprintf(&b, "cid: %s\n", pin.Cid.String())
fmt.Fprintf(&b, "type: %s\n", pin.Type)
fmt.Fprintf(&b, "allocations: %v\n", pin.Allocations)
fmt.Fprintf(&b, "maxdepth: %d\n", pin.MaxDepth)
if pin.Reference != nil {
fmt.Fprintf(&b, "reference: %s\n", pin.Reference)
}
return b.String()
}
// IsPinEverywhere returns when the both replication factors are set to -1.
func (pin *Pin) IsPinEverywhere() bool {
return pin.ReplicationFactorMin == -1 && pin.ReplicationFactorMax == -1
}
// PinPath is a wrapper for holding pin options and path of the content.
type PinPath struct {
PinOptions
Path string `json:"path"`
}
// PinCid is a shortcut to create a Pin only with a Cid. Default is for pin to
// be recursive and the pin to be of DataType.
func PinCid(c cid.Cid) *Pin {
return &Pin{
Cid: c,
Type: DataType,
Allocations: []peer.ID{},
MaxDepth: -1, // Recursive
}
}
// PinWithOpts creates a new Pin calling PinCid(c) and then sets its
// PinOptions fields with the given options. Pin fields that are linked to
// options are set accordingly (MaxDepth from Mode).
func PinWithOpts(c cid.Cid, opts PinOptions) *Pin {
p := PinCid(c)
p.PinOptions = opts
p.MaxDepth = p.Mode.ToPinDepth()
return p
}
func convertPinType(t PinType) pb.Pin_PinType {
var i pb.Pin_PinType
for t != 1 {
if t == 0 {
return pb.Pin_BadType
}
t = t >> 1
i++
}
return i
}
// ProtoMarshal marshals this Pin using probobuf.
func (pin *Pin) ProtoMarshal() ([]byte, error) {
allocs := make([][]byte, len(pin.Allocations))
for i, pid := range pin.Allocations {
bs, err := pid.Marshal()
if err != nil {
return nil, err
}
allocs[i] = bs
}
var expireAtProto uint64
// Only set the protobuf field with non-zero times.
if !(pin.ExpireAt.IsZero() || pin.ExpireAt.Equal(unixZero)) {
expireAtProto = uint64(pin.ExpireAt.Unix())
}
opts := &pb.PinOptions{
ReplicationFactorMin: int32(pin.ReplicationFactorMin),
ReplicationFactorMax: int32(pin.ReplicationFactorMax),
Name: pin.Name,
ShardSize: pin.ShardSize,
Metadata: pin.Metadata,
PinUpdate: pin.PinUpdate.Bytes(),
ExpireAt: expireAtProto,
// Mode: pin.Mode,
// UserAllocations: pin.UserAllocations,
}
pbPin := &pb.Pin{
Cid: pin.Cid.Bytes(),
Type: convertPinType(pin.Type),
Allocations: allocs,
MaxDepth: int32(pin.MaxDepth),
Options: opts,
}
if ref := pin.Reference; ref != nil {
pbPin.Reference = ref.Bytes()
}
return proto.Marshal(pbPin)
}
// ProtoUnmarshal unmarshals this fields from protobuf-encoded bytes.
func (pin *Pin) ProtoUnmarshal(data []byte) error {
pbPin := pb.Pin{}
err := proto.Unmarshal(data, &pbPin)
if err != nil {
return err
}
ci, err := cid.Cast(pbPin.GetCid())
if err != nil {
pin.Cid = cid.Undef
} else {
pin.Cid = ci
}
pin.Type = 1 << uint64(pbPin.GetType())
pbAllocs := pbPin.GetAllocations()
lenAllocs := len(pbAllocs)
allocs := make([]peer.ID, lenAllocs)
for i, pidb := range pbAllocs {
pid, err := peer.IDFromBytes(pidb)
if err != nil {
return err
}
allocs[i] = pid
}
pin.Allocations = allocs
pin.MaxDepth = PinDepth(pbPin.GetMaxDepth())
ref, err := cid.Cast(pbPin.GetReference())
if err != nil {
pin.Reference = nil
} else {
pin.Reference = &ref
}
opts := pbPin.GetOptions()
pin.ReplicationFactorMin = int(opts.GetReplicationFactorMin())
pin.ReplicationFactorMax = int(opts.GetReplicationFactorMax())
pin.Name = opts.GetName()
pin.ShardSize = opts.GetShardSize()
// pin.UserAllocations = opts.GetUserAllocations()
t := opts.GetExpireAt()
if t > 0 {
pin.ExpireAt = time.Unix(int64(t), 0)
}
pin.Metadata = opts.GetMetadata()
pinUpdate, err := cid.Cast(opts.GetPinUpdate())
if err == nil {
pin.PinUpdate = pinUpdate
}
// We do not store the PinMode option but we can
// derive it from the MaxDepth setting.
pin.Mode = pin.MaxDepth.ToPinMode()
return nil
}
// Equals checks if two pins are the same (with the same allocations).
// If allocations are the same but in different order, they are still
// considered equivalent.
// pin or pin2 may be nil. If both are nil, Equals returns false.
func (pin *Pin) Equals(pin2 *Pin) bool {
if pin == nil && pin2 != nil || pin2 == nil && pin != nil {
return false
}
if pin == pin2 {
// ask @lanzafame why this is not true
// in any case, this is anomalous and we should
// not be using this with two nils.
return false
}
if !pin.Cid.Equals(pin2.Cid) {
return false
}
if pin.Type != pin2.Type {
return false
}
if pin.MaxDepth != pin2.MaxDepth {
return false
}
if pin.Reference != nil && pin2.Reference == nil ||
pin.Reference == nil && pin2.Reference != nil {
return false
}
if pin.Reference != nil && pin2.Reference != nil &&
!pin.Reference.Equals(*pin2.Reference) {
return false
}
allocs1 := PeersToStrings(pin.Allocations)
sort.Strings(allocs1)
allocs2 := PeersToStrings(pin2.Allocations)
sort.Strings(allocs2)
if strings.Join(allocs1, ",") != strings.Join(allocs2, ",") {
return false
}
return pin.PinOptions.Equals(&pin2.PinOptions)
}
// IsRemotePin determines whether a Pin's ReplicationFactor has
// been met, so as to either pin or unpin it from the peer.
func (pin *Pin) IsRemotePin(pid peer.ID) bool {
if pin.IsPinEverywhere() {
return false
}
for _, p := range pin.Allocations {
if p == pid {
return false
}
}
return true
}
// ExpiredAt returns whether the pin has expired at the given time.
func (pin *Pin) ExpiredAt(t time.Time) bool {
if pin.ExpireAt.IsZero() || pin.ExpireAt.Equal(unixZero) {
return false
}
return pin.ExpireAt.Before(t)
}
// NodeWithMeta specifies a block of data and a set of optional metadata fields
// carrying information about the encoded ipld node
type NodeWithMeta struct {
Data []byte `codec:"d,omitempty"`
Cid cid.Cid `codec:"c,omitempty"`
CumSize uint64 `codec:"s,omitempty"` // Cumulative size
}
// Size returns how big is the block. It is different from CumSize, which
// records the size of the underlying tree.
func (n *NodeWithMeta) Size() uint64 {
return uint64(len(n.Data))
}
// Metric transports information about a peer.ID. It is used to decide
// pin allocations by a PinAllocator. IPFS cluster is agnostic to
// the Value, which should be interpreted by the PinAllocator.
// The ReceivedAt value is a timestamp representing when a peer has received
// the metric value.
type Metric struct {
Name string `json:"name" codec:"n,omitempty"`
Peer peer.ID `json:"peer" codec:"p,omitempty"`
Value string `json:"value" codec:"v,omitempty"`
Expire int64 `json:"expire" codec:"e,omitempty"`
Valid bool `json:"valid" codec:"d,omitempty"`
ReceivedAt int64 `json:"received_at" codec:"t,omitempty"` // ReceivedAt contains a UnixNano timestamp
}
// SetTTL sets Metric to expire after the given time.Duration
func (m *Metric) SetTTL(d time.Duration) {
exp := time.Now().Add(d)
m.Expire = exp.UnixNano()
}
// GetTTL returns the time left before the Metric expires
func (m *Metric) GetTTL() time.Duration {
expDate := time.Unix(0, m.Expire)
return time.Until(expDate)
}
// Expired returns if the Metric has expired
func (m *Metric) Expired() bool {
expDate := time.Unix(0, m.Expire)
return time.Now().After(expDate)
}
// Discard returns if the metric not valid or has expired
func (m *Metric) Discard() bool {
return !m.Valid || m.Expired()
}
// MetricSlice is a sortable Metric array.
type MetricSlice []*Metric
func (es MetricSlice) Len() int { return len(es) }
func (es MetricSlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
func (es MetricSlice) Less(i, j int) bool {
if es[i].Peer == es[j].Peer {
return es[i].Expire < es[j].Expire
}
return es[i].Peer < es[j].Peer
}
// Alert carries alerting information about a peer.
type Alert struct {
Metric
TriggeredAt time.Time `json:"triggered_at" codec:"r,omitempty"`
}
// Error can be used by APIs to return errors.
type Error struct {
Code int `json:"code" codec:"o,omitempty"`
Message string `json:"message" codec:"m,omitempty"`
}
// Error implements the error interface and returns the error's message.
func (e *Error) Error() string {
return fmt.Sprintf("%s (%d)", e.Message, e.Code)
}
// IPFSRepoStat wraps information about the IPFS repository.
type IPFSRepoStat struct {
RepoSize uint64 `codec:"r,omitempty"`
StorageMax uint64 `codec:"s, omitempty"`
}
// IPFSRepoGC represents the streaming response sent from repo gc API of IPFS.
type IPFSRepoGC struct {
Key cid.Cid `json:"key,omitempty" codec:"k,omitempty"`
Error string `json:"error,omitempty" codec:"e,omitempty"`
}
// RepoGC contains garbage collected CIDs from a cluster peer's IPFS daemon.
type RepoGC struct {
Peer peer.ID `json:"peer" codec:"p,omitempty"` // the Cluster peer ID
Peername string `json:"peername" codec:"pn,omitempty"`
Keys []IPFSRepoGC `json:"keys" codec:"k"`
Error string `json:"error,omitempty" codec:"e,omitempty"`
}
// GlobalRepoGC contains cluster-wide information about garbage collected CIDs
// from IPFS.
type GlobalRepoGC struct {
PeerMap map[string]*RepoGC `json:"peer_map" codec:"pm,omitempty"`
}