// Package api holds declarations for types used in ipfs-cluster APIs to make
// them re-usable across different tools. This includes RPC API
// "Serial[izable]" versions for types. The Go API uses native types, while
// RPC API, REST APIs etc. use serializable types (i.e. json format).
// Conversion methods exist between types.
//
// Note that all conversion methods ignore any parsing errors. All values must
// be validated first before initializing any of the types defined here.
package api

import (
	"encoding/json"
	"fmt"
	"net/url"
	"sort"
	"strconv"
	"strings"
	"time"

	pb "github.com/ipfs-cluster/ipfs-cluster/api/pb"

	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log/v2"
	peer "github.com/libp2p/go-libp2p-core/peer"
	protocol "github.com/libp2p/go-libp2p-core/protocol"
	multiaddr "github.com/multiformats/go-multiaddr"

	// needed to parse /ws multiaddresses
	_ "github.com/libp2p/go-ws-transport"
	// needed to parse /dns* multiaddresses
	_ "github.com/multiformats/go-multiaddr-dns"

	"github.com/pkg/errors"
	proto "google.golang.org/protobuf/proto"
)

// logger is the package-level logger, registered under the "apitypes" name.
var logger = logging.Logger("apitypes")

// unixZero is the Unix epoch. Times equal to it are treated like zero times
// when (de)serializing and when checking expirations.
var unixZero = time.Unix(0, 0)

func init() {
	// Initialize stringTrackerStatus as the inverse of trackerStatusString,
	// so that status names can be mapped back to their values.
	stringTrackerStatus = make(map[string]TrackerStatus)
	for k, v := range trackerStatusString {
		stringTrackerStatus[v] = k
	}
}

// TrackerStatus values
const (
	// IPFSStatus should never take this value.
	// When used as a filter. It means "all".
	TrackerStatusUndefined TrackerStatus = 0
	// The cluster node is offline or not responding.
	// Note: iota is 1 on this line (second entry of the block), so this
	// value is 1<<1 and every following status is the next single bit.
	TrackerStatusClusterError TrackerStatus = 1 << iota
	// An error occurred pinning
	TrackerStatusPinError
	// An error occurred unpinning
	TrackerStatusUnpinError
	// The IPFS daemon has pinned the item
	TrackerStatusPinned
	// The IPFS daemon is currently pinning the item
	TrackerStatusPinning
	// The IPFS daemon is currently unpinning the item
	TrackerStatusUnpinning
	// The IPFS daemon is not pinning the item
	TrackerStatusUnpinned
	// The IPFS daemon is not pinning the item but it is being tracked
	TrackerStatusRemote
	// The item has been queued for pinning on the IPFS daemon
	TrackerStatusPinQueued
	// The item has been queued for unpinning on the IPFS daemon
	TrackerStatusUnpinQueued
	// The IPFS daemon is not pinning the item through this cid but it is
	// tracked in a cluster dag
	TrackerStatusSharded
	// The item is in the state and should be pinned, but
	// it is however not pinned and not queued/pinning.
	TrackerStatusUnexpectedlyUnpinned
)

// Composite TrackerStatus.
const ( TrackerStatusError = TrackerStatusClusterError | TrackerStatusPinError | TrackerStatusUnpinError TrackerStatusQueued = TrackerStatusPinQueued | TrackerStatusUnpinQueued ) // TrackerStatus represents the status of a tracked Cid in the PinTracker type TrackerStatus int var trackerStatusString = map[TrackerStatus]string{ TrackerStatusUndefined: "undefined", TrackerStatusClusterError: "cluster_error", TrackerStatusPinError: "pin_error", TrackerStatusUnpinError: "unpin_error", TrackerStatusError: "error", TrackerStatusPinned: "pinned", TrackerStatusPinning: "pinning", TrackerStatusUnpinning: "unpinning", TrackerStatusUnpinned: "unpinned", TrackerStatusRemote: "remote", TrackerStatusPinQueued: "pin_queued", TrackerStatusUnpinQueued: "unpin_queued", TrackerStatusQueued: "queued", TrackerStatusSharded: "sharded", TrackerStatusUnexpectedlyUnpinned: "unexpectedly_unpinned", } // values autofilled in init() var stringTrackerStatus map[string]TrackerStatus // String converts a TrackerStatus into a readable string. // If the given TrackerStatus is a filter (with several // bits set), it will return a comma-separated list. func (st TrackerStatus) String() string { var values []string // simple and known composite values if v, ok := trackerStatusString[st]; ok { return v } // other filters for k, v := range trackerStatusString { if st&k > 0 { values = append(values, v) } } return strings.Join(values, ",") } // Match returns true if the tracker status matches the given filter. // For example TrackerStatusPinError will match TrackerStatusPinError // and TrackerStatusError. func (st TrackerStatus) Match(filter TrackerStatus) bool { return filter == TrackerStatusUndefined || st == TrackerStatusUndefined || st&filter > 0 } // MarshalJSON uses the string representation of TrackerStatus for JSON // encoding. func (st TrackerStatus) MarshalJSON() ([]byte, error) { return json.Marshal(st.String()) } // UnmarshalJSON sets a tracker status from its JSON representation. 
func (st *TrackerStatus) UnmarshalJSON(data []byte) error { var v string err := json.Unmarshal(data, &v) if err != nil { return err } *st = TrackerStatusFromString(v) return nil } // TrackerStatusFromString parses a string and returns the matching // TrackerStatus value. The string can be a comma-separated list // representing a TrackerStatus filter. Unknown status names are // ignored. func TrackerStatusFromString(str string) TrackerStatus { values := strings.Split(strings.Replace(str, " ", "", -1), ",") var status TrackerStatus for _, v := range values { st, ok := stringTrackerStatus[v] if ok { status |= st } } return status } // TrackerStatusAll all known TrackerStatus values. func TrackerStatusAll() []TrackerStatus { var list []TrackerStatus for k := range trackerStatusString { if k != TrackerStatusUndefined { list = append(list, k) } } return list } // IPFSPinStatus values // FIXME include maxdepth const ( IPFSPinStatusBug IPFSPinStatus = iota IPFSPinStatusError IPFSPinStatusDirect IPFSPinStatusRecursive IPFSPinStatusIndirect IPFSPinStatusUnpinned ) // IPFSPinStatus represents the status of a pin in IPFS (direct, recursive etc.) type IPFSPinStatus int // IPFSPinStatusFromString parses a string and returns the matching // IPFSPinStatus. func IPFSPinStatusFromString(t string) IPFSPinStatus { // Since indirect statuses are of the form "indirect through " // use a prefix match switch { case strings.HasPrefix(t, "indirect"): return IPFSPinStatusIndirect case strings.HasPrefix(t, "recursive"): // FIXME: Maxdepth? return IPFSPinStatusRecursive case t == "direct": return IPFSPinStatusDirect default: return IPFSPinStatusBug } } // String returns the string form of the status as written by IPFS. 
func (ips IPFSPinStatus) String() string { switch ips { case IPFSPinStatusDirect: return "direct" case IPFSPinStatusRecursive: return "recursive" case IPFSPinStatusIndirect: return "indirect" default: return "" } } // UnmarshalJSON parses a status from JSON func (ips *IPFSPinStatus) UnmarshalJSON(b []byte) error { var str string err := json.Unmarshal(b, &str) if err != nil { return err } *ips = IPFSPinStatusFromString(str) return nil } // MarshalJSON converts a status to JSON. func (ips IPFSPinStatus) MarshalJSON() ([]byte, error) { return json.Marshal(ips.String()) } // IsPinned returns true if the item is pinned as expected by the // maxDepth parameter. func (ips IPFSPinStatus) IsPinned(maxDepth PinDepth) bool { switch { case maxDepth < 0: return ips == IPFSPinStatusRecursive case maxDepth == 0: return ips == IPFSPinStatusDirect case maxDepth > 0: // FIXME: when we know how ipfs returns partial pins. return ips == IPFSPinStatusRecursive } return false } // ToTrackerStatus converts the IPFSPinStatus value to the // appropriate TrackerStatus value. func (ips IPFSPinStatus) ToTrackerStatus() TrackerStatus { return ipfsPinStatus2TrackerStatusMap[ips] } var ipfsPinStatus2TrackerStatusMap = map[IPFSPinStatus]TrackerStatus{ IPFSPinStatusDirect: TrackerStatusPinned, IPFSPinStatusRecursive: TrackerStatusPinned, IPFSPinStatusIndirect: TrackerStatusUnpinned, IPFSPinStatusUnpinned: TrackerStatusUnpinned, IPFSPinStatusBug: TrackerStatusUndefined, IPFSPinStatusError: TrackerStatusClusterError, //TODO(ajl): check suitability } // Cid embeds a cid.Cid with the MarshalJSON/UnmarshalJSON methods overwritten. type Cid struct { cid.Cid } // CidUndef is an Undefined CID. var CidUndef = Cid{cid.Undef} // NewCid wraps a cid.Cid in a Cid. func NewCid(c cid.Cid) Cid { return Cid{ Cid: c, } } // DecodeCid parses a CID from its string form. func DecodeCid(str string) (Cid, error) { c, err := cid.Decode(str) return Cid{c}, err } // CastCid returns a CID from its bytes. 
func CastCid(bs []byte) (Cid, error) { c, err := cid.Cast(bs) return Cid{c}, err } // MarshalJSON marshals a CID as JSON as a normal CID string. func (c Cid) MarshalJSON() ([]byte, error) { if !c.Defined() { return []byte("null"), nil } return []byte(`"` + c.String() + `"`), nil } // UnmarshalJSON reads a CID from its representation as JSON string. func (c *Cid) UnmarshalJSON(b []byte) error { if string(b) == "null" { *c = CidUndef return nil } var cidStr string err := json.Unmarshal(b, &cidStr) if err != nil { return err } cc, err := DecodeCid(cidStr) if err != nil { return err } *c = cc return nil } // Equals returns true if two Cids are equal. func (c Cid) Equals(c2 Cid) bool { return c.Cid.Equals(c2.Cid) } // IPFSPinInfo represents an IPFS Pin, which only has a CID and type. // Its JSON form is what IPFS returns when querying a pinset. type IPFSPinInfo struct { Cid Cid `json:"Cid" codec:"c"` Type IPFSPinStatus `json:"Type" codec:"t"` } // GlobalPinInfo contains cluster-wide status information about a tracked Cid, // indexed by cluster peer. type GlobalPinInfo struct { Cid Cid `json:"cid" codec:"c"` Name string `json:"name" codec:"n"` Allocations []peer.ID `json:"allocations" codec:"a,omitempty"` Origins []Multiaddr `json:"origins" codec:"g,omitempty"` Created time.Time `json:"created" codec:"t,omitempty"` Metadata map[string]string `json:"metadata" codec:"m,omitempty"` // https://github.com/golang/go/issues/28827 // Peer IDs are of string Kind(). We can't use peer IDs here // as Go ignores TextMarshaler. PeerMap map[string]PinInfoShort `json:"peer_map" codec:"pm,omitempty"` } // String returns the string representation of a GlobalPinInfo. 
func (gpi GlobalPinInfo) String() string { str := fmt.Sprintf("Cid: %s\n", gpi.Cid) str = str + "Peers:\n" for pid, p := range gpi.PeerMap { str = str + fmt.Sprintf("\t%s: %+v\n", pid, p) } return str } // Add adds a PinInfo object to a GlobalPinInfo func (gpi *GlobalPinInfo) Add(pi PinInfo) { if !gpi.Cid.Defined() || !pi.Status.Match(TrackerStatusClusterError) { gpi.Cid = pi.Cid gpi.Name = pi.Name gpi.Allocations = pi.Allocations gpi.Origins = pi.Origins gpi.Created = pi.Created gpi.Metadata = pi.Metadata } if gpi.PeerMap == nil { gpi.PeerMap = make(map[string]PinInfoShort) } gpi.PeerMap[peer.Encode(pi.Peer)] = pi.PinInfoShort } // Defined returns if the object is not empty. func (gpi *GlobalPinInfo) Defined() bool { return gpi.Cid.Defined() } // Match returns true if one of the statuses in GlobalPinInfo matches // the given filter. func (gpi GlobalPinInfo) Match(filter TrackerStatus) bool { for _, pi := range gpi.PeerMap { if pi.Status.Match(filter) { return true } } return false } // PinInfoShort is a subset of PinInfo which is embedded in GlobalPinInfo // objects and does not carry redundant information as PinInfo would. type PinInfoShort struct { PeerName string `json:"peername" codec:"pn,omitempty"` IPFS peer.ID `json:"ipfs_peer_id,omitempty" codec:"i,omitempty"` IPFSAddresses []Multiaddr `json:"ipfs_peer_addresses,omitempty" codec:"ia,omitempty"` Status TrackerStatus `json:"status" codec:"st,omitempty"` TS time.Time `json:"timestamp" codec:"ts,omitempty"` Error string `json:"error" codec:"e,omitempty"` AttemptCount int `json:"attempt_count" codec:"a,omitempty"` PriorityPin bool `json:"priority_pin" codec:"y,omitempty"` } // String provides a string representation of PinInfoShort. 
func (pis PinInfoShort) String() string { var b strings.Builder fmt.Fprintf(&b, "status: %s\n", pis.Status) fmt.Fprintf(&b, "peername: %s\n", pis.PeerName) fmt.Fprintf(&b, "ipfs: %s\n", pis.IPFS) fmt.Fprintf(&b, "ipfsAddresses: %v\n", pis.IPFSAddresses) fmt.Fprintf(&b, "error: %s\n", pis.Error) fmt.Fprintf(&b, "attemptCount: %d\n", pis.AttemptCount) fmt.Fprintf(&b, "priority: %t\n", pis.PriorityPin) return b.String() } // PinInfo holds information about local pins. This is used by the Pin // Trackers. type PinInfo struct { Cid Cid `json:"cid" codec:"c"` Name string `json:"name" codec:"m,omitempty"` Peer peer.ID `json:"peer" codec:"p,omitempty"` Allocations []peer.ID `json:"allocations" codec:"o,omitempty"` Origins []Multiaddr `json:"origins" codec:"g,omitempty"` Created time.Time `json:"created" codec:"t,omitempty"` Metadata map[string]string `json:"metadata" codec:"md,omitempty"` PinInfoShort } // ToGlobal converts a PinInfo object to a GlobalPinInfo with // a single peer corresponding to the given PinInfo. func (pi PinInfo) ToGlobal() GlobalPinInfo { gpi := GlobalPinInfo{} gpi.Add(pi) return gpi } // Defined retuns if the PinInfo is not zero. func (pi PinInfo) Defined() bool { return pi.Cid.Defined() } // String provides a string representation of PinInfo. func (pi PinInfo) String() string { var b strings.Builder fmt.Fprintf(&b, "cid: %s\n", pi.Cid) fmt.Fprintf(&b, "name: %s\n", pi.Name) fmt.Fprintf(&b, "peer: %s\n", pi.Peer) fmt.Fprintf(&b, "allocations: %v\n", pi.Allocations) fmt.Fprintf(&b, "%s\n", pi.PinInfoShort) return b.String() } // Version holds version information type Version struct { Version string `json:"version" codec:"v"` } // ConnectGraph holds information about the connectivity of the cluster To // read, traverse the keys of ClusterLinks. Each such id is one of the peers // of the "ClusterID" peer running the query. ClusterLinks[id] in turn lists // the ids that peer "id" sees itself connected to. 
It is possible that id is // a peer of ClusterID, but ClusterID can not reach id over rpc, in which case // ClusterLinks[id] == [], as id's view of its connectivity can not be // retrieved. // // Iff there was an error reading the IPFSID of the peer then id will not be a // key of ClustertoIPFS or IPFSLinks. Finally iff id is a key of ClustertoIPFS // then id will be a key of IPFSLinks. In the event of a SwarmPeers error // IPFSLinks[id] == []. type ConnectGraph struct { ClusterID peer.ID `json:"cluster_id" codec:"id"` IDtoPeername map[string]string `json:"id_to_peername" codec:"ip,omitempty"` // ipfs to ipfs links IPFSLinks map[string][]peer.ID `json:"ipfs_links" codec:"il,omitempty"` // cluster to cluster links ClusterLinks map[string][]peer.ID `json:"cluster_links" codec:"cl,omitempty"` // cluster trust links ClusterTrustLinks map[string]bool `json:"cluster_trust_links" codec:"ctl,omitempty"` // cluster to ipfs links ClustertoIPFS map[string]peer.ID `json:"cluster_to_ipfs" codec:"ci,omitempty"` } // Multiaddr is a concrete type to wrap a Multiaddress so that it knows how to // serialize and deserialize itself. type Multiaddr struct { multiaddr.Multiaddr } // NewMultiaddr returns a cluster Multiaddr wrapper creating the // multiaddr.Multiaddr with the given string. func NewMultiaddr(mstr string) (Multiaddr, error) { m, err := multiaddr.NewMultiaddr(mstr) return Multiaddr{Multiaddr: m}, err } // NewMultiaddrWithValue returns a new cluster Multiaddr wrapper using the // given multiaddr.Multiaddr. func NewMultiaddrWithValue(ma multiaddr.Multiaddr) Multiaddr { return Multiaddr{Multiaddr: ma} } // MarshalJSON returns a JSON-formatted multiaddress. func (maddr Multiaddr) MarshalJSON() ([]byte, error) { return maddr.Multiaddr.MarshalJSON() } // UnmarshalJSON parses a cluster Multiaddr from the JSON representation. 
func (maddr *Multiaddr) UnmarshalJSON(data []byte) error { maddr.Multiaddr, _ = multiaddr.NewMultiaddr("/ip4/127.0.0.1") // null multiaddresses not allowed return maddr.Multiaddr.UnmarshalJSON(data) } // MarshalBinary returs the bytes of the wrapped multiaddress. func (maddr Multiaddr) MarshalBinary() ([]byte, error) { return maddr.Multiaddr.MarshalBinary() } // UnmarshalBinary casts some bytes as a multiaddress wraps it with // the given cluster Multiaddr. func (maddr *Multiaddr) UnmarshalBinary(data []byte) error { datacopy := make([]byte, len(data)) // This is super important copy(datacopy, data) maddr.Multiaddr, _ = multiaddr.NewMultiaddr("/ip4/127.0.0.1") // null multiaddresses not allowed return maddr.Multiaddr.UnmarshalBinary(datacopy) } // Value returns the wrapped multiaddr.Multiaddr. func (maddr Multiaddr) Value() multiaddr.Multiaddr { return maddr.Multiaddr } // ID holds information about the Cluster peer type ID struct { ID peer.ID `json:"id" codec:"i,omitempty"` Addresses []Multiaddr `json:"addresses" codec:"a,omitempty"` ClusterPeers []peer.ID `json:"cluster_peers" codec:"cp,omitempty"` ClusterPeersAddresses []Multiaddr `json:"cluster_peers_addresses" codec:"cpa,omitempty"` Version string `json:"version" codec:"v,omitempty"` Commit string `json:"commit" codec:"c,omitempty"` RPCProtocolVersion protocol.ID `json:"rpc_protocol_version" codec:"rv,omitempty"` Error string `json:"error" codec:"e,omitempty"` IPFS IPFSID `json:"ipfs,omitempty" codec:"ip,omitempty"` Peername string `json:"peername" codec:"pn,omitempty"` //PublicKey crypto.PubKey } // IPFSID is used to store information about the underlying IPFS daemon type IPFSID struct { ID peer.ID `json:"id,omitempty" codec:"i,omitempty"` Addresses []Multiaddr `json:"addresses" codec:"a,omitempty"` Error string `json:"error" codec:"e,omitempty"` } // PinType specifies which sort of Pin object we are dealing with. // In practice, the PinType decides how a Pin object is treated by the // PinTracker. 
// See the descriptions of the PinType values below.
// A sharded Pin would look like:
//
// [ Meta ] (not pinned on IPFS, only present in cluster state)
//   |
//   v
// [ Cluster DAG ] (pinned everywhere in "direct")
//   |      ..  |
//   v          v
// [Shard1] .. [ShardN] (allocated to peers and pinned with max-depth=1
// | | .. |    | | .. |
// v v .. v    v v .. v
// [][]..[]    [][]..[] Blocks (indirectly pinned on ipfs, not tracked in cluster)
//
//
type PinType uint64

// PinType values. See PinType documentation for further explanation.
// Each value is a single bit (1 << iota, starting at 1 for BadType), so
// that types can be OR-ed together into a filter such as AllType.
const (
	// BadType type showing up anywhere indicates a bug
	BadType PinType = 1 << iota
	// DataType is a regular, non-sharded pin. It is pinned recursively.
	// It has no associated reference.
	DataType
	// MetaType tracks the original CID of a sharded DAG. Its Reference
	// points to the Cluster DAG CID.
	MetaType
	// ClusterDAGType pins carry the CID of the root node that points to
	// all the shard-root-nodes of the shards in which a DAG has been
	// divided. Its Reference carries the MetaType CID.
	// ClusterDAGType pins are pinned directly everywhere.
	ClusterDAGType
	// ShardType pins carry the root CID of a shard, which points
	// to individual blocks on the original DAG that the user is adding,
	// which has been sharded.
	// They carry a Reference to the previous shard.
	// ShardTypes are pinned with MaxDepth=1 (root and
	// direct children only).
	ShardType
)

// AllType is a PinType used for filtering all pin types. It combines every
// type except BadType.
const AllType PinType = DataType | MetaType | ClusterDAGType | ShardType

// PinTypeFromString is the inverse of String.
It returns the PinType value // corresponding to the input string func PinTypeFromString(str string) PinType { switch str { case "pin": return DataType case "meta-pin": return MetaType case "clusterdag-pin": return ClusterDAGType case "shard-pin": return ShardType case "all": return AllType case "": return AllType default: return BadType } } // String returns a printable value to identify the PinType func (pT PinType) String() string { switch pT { case DataType: return "pin" case MetaType: return "meta-pin" case ClusterDAGType: return "clusterdag-pin" case ShardType: return "shard-pin" case AllType: return "all" default: return "bad-type" } } // MarshalJSON provides json-representation of the pin type. func (pT PinType) MarshalJSON() ([]byte, error) { return json.Marshal(pT.String()) } // UnmarshalJSON provides json-representation of the pin type. func (pT *PinType) UnmarshalJSON(b []byte) error { var str string err := json.Unmarshal(b, &str) if err != nil { return err } t := PinTypeFromString(str) *pT = t return nil } var pinOptionsMetaPrefix = "meta-" // PinMode is a PinOption that indicates how to pin something on IPFS, // recursively or direct. type PinMode int // PinMode values const ( PinModeRecursive PinMode = 0 PinModeDirect PinMode = 1 ) // PinModeFromString converst a string to PinMode. func PinModeFromString(s string) PinMode { switch s { case "recursive", "": return PinModeRecursive case "direct": return PinModeDirect default: logger.Warnf("unknown pin mode %s. Defaulting to recursive", s) return PinModeRecursive } } // String returns a human-readable value for PinMode. func (pm PinMode) String() string { switch pm { case PinModeRecursive: return "recursive" case PinModeDirect: return "direct" default: return "recursive" } } // ToIPFSPinStatus converts a PinMode to IPFSPinStatus. 
func (pm PinMode) ToIPFSPinStatus() IPFSPinStatus { if pm == PinModeDirect { return IPFSPinStatusDirect } if pm == PinModeRecursive { return IPFSPinStatusRecursive } return IPFSPinStatusBug } // MarshalJSON converts the PinMode into a readable string in JSON. func (pm PinMode) MarshalJSON() ([]byte, error) { return json.Marshal(pm.String()) } // UnmarshalJSON takes a JSON value and parses it into PinMode. func (pm *PinMode) UnmarshalJSON(b []byte) error { var s string err := json.Unmarshal(b, &s) if err != nil { return err } *pm = PinModeFromString(s) return nil } // ToPinDepth converts the Mode to Depth. func (pm PinMode) ToPinDepth() PinDepth { switch pm { case PinModeRecursive: return -1 case PinModeDirect: return 0 default: logger.Warn("unknown pin mode %d. Defaulting to -1 depth", pm) return -1 } } // PinOptions wraps user-defined options for Pins type PinOptions struct { ReplicationFactorMin int `json:"replication_factor_min" codec:"rn,omitempty"` ReplicationFactorMax int `json:"replication_factor_max" codec:"rx,omitempty"` Name string `json:"name" codec:"n,omitempty"` Mode PinMode `json:"mode" codec:"o,omitempty"` ShardSize uint64 `json:"shard_size" codec:"s,omitempty"` UserAllocations []peer.ID `json:"user_allocations" codec:"ua,omitempty"` ExpireAt time.Time `json:"expire_at" codec:"e,omitempty"` Metadata map[string]string `json:"metadata" codec:"m,omitempty"` PinUpdate Cid `json:"pin_update,omitempty" codec:"pu,omitempty"` Origins []Multiaddr `json:"origins" codec:"g,omitempty"` } // Equals returns true if two PinOption objects are equivalent. po and po2 may // be nil. 
func (po PinOptions) Equals(po2 PinOptions) bool { if po.Name != po2.Name { return false } if po.Mode != po2.Mode { return false } if po.ReplicationFactorMax != po2.ReplicationFactorMax { return false } if po.ReplicationFactorMin != po2.ReplicationFactorMin { return false } if po.ShardSize != po2.ShardSize { return false } lenAllocs1 := len(po.UserAllocations) lenAllocs2 := len(po2.UserAllocations) if lenAllocs1 != lenAllocs2 { return false } // avoid side effects in the original objects allocs1 := PeersToStrings(po.UserAllocations) allocs2 := PeersToStrings(po2.UserAllocations) sort.Strings(allocs1) sort.Strings(allocs2) if strings.Join(allocs1, ",") != strings.Join(allocs2, ",") { return false } if !po.ExpireAt.Equal(po2.ExpireAt) { return false } for k, v := range po.Metadata { v2 := po2.Metadata[k] if k != "" && v != v2 { return false } } // deliberately ignore Update lenOrigins1 := len(po.Origins) lenOrigins2 := len(po2.Origins) if lenOrigins1 != lenOrigins2 { return false } for _, o1 := range po.Origins { found := false for _, o2 := range po2.Origins { if o1.Value().Equal(o2.Value()) { found = true } } if !found { return false } } return true } // ToQuery returns the PinOption as query arguments. 
func (po PinOptions) ToQuery() (string, error) { q := url.Values{} q.Set("replication-min", fmt.Sprintf("%d", po.ReplicationFactorMin)) q.Set("replication-max", fmt.Sprintf("%d", po.ReplicationFactorMax)) q.Set("name", po.Name) q.Set("mode", po.Mode.String()) q.Set("shard-size", fmt.Sprintf("%d", po.ShardSize)) q.Set("user-allocations", strings.Join(PeersToStrings(po.UserAllocations), ",")) if !po.ExpireAt.IsZero() { v, err := po.ExpireAt.MarshalText() if err != nil { return "", err } q.Set("expire-at", string(v)) } for k, v := range po.Metadata { if k == "" { continue } q.Set(fmt.Sprintf("%s%s", pinOptionsMetaPrefix, k), v) } if po.PinUpdate.Defined() { q.Set("pin-update", po.PinUpdate.String()) } if len(po.Origins) > 0 { origins := make([]string, len(po.Origins)) for i, o := range po.Origins { origins[i] = o.String() } q.Set("origins", strings.Join(origins, ",")) } return q.Encode(), nil } // FromQuery is the inverse of ToQuery(). func (po *PinOptions) FromQuery(q url.Values) error { po.Name = q.Get("name") po.Mode = PinModeFromString(q.Get("mode")) rplStr := q.Get("replication") if rplStr != "" { // override q.Set("replication-min", rplStr) q.Set("replication-max", rplStr) } err := parseIntParam(q, "replication-min", &po.ReplicationFactorMin) if err != nil { return err } err = parseIntParam(q, "replication-max", &po.ReplicationFactorMax) if err != nil { return err } if v := q.Get("shard-size"); v != "" { shardSize, err := strconv.ParseUint(v, 10, 64) if err != nil { return errors.New("parameter shard_size is invalid") } po.ShardSize = shardSize } if allocs := q.Get("user-allocations"); allocs != "" { po.UserAllocations = StringsToPeers(strings.Split(allocs, ",")) } if v := q.Get("expire-at"); v != "" { var tm time.Time err := tm.UnmarshalText([]byte(v)) if err != nil { return errors.Wrap(err, "expire-at cannot be parsed") } po.ExpireAt = tm } else if v = q.Get("expire-in"); v != "" { d, err := time.ParseDuration(v) if err != nil { return errors.Wrap(err, 
"expire-in cannot be parsed") } if d < time.Second { return errors.New("expire-in duration too short") } po.ExpireAt = time.Now().Add(d) } po.Metadata = make(map[string]string) for k := range q { if !strings.HasPrefix(k, pinOptionsMetaPrefix) { continue } metaKey := strings.TrimPrefix(k, pinOptionsMetaPrefix) if metaKey == "" { continue } po.Metadata[metaKey] = q.Get(k) } updateStr := q.Get("pin-update") if updateStr != "" { updateCid, err := DecodeCid(updateStr) if err != nil { return fmt.Errorf("error decoding update option parameter: %s", err) } po.PinUpdate = updateCid } originsStr := q.Get("origins") if originsStr != "" { origins := strings.Split(originsStr, ",") maOrigins := make([]Multiaddr, len(origins)) for i, ostr := range origins { maOrig, err := NewMultiaddr(ostr) if err != nil { return fmt.Errorf("error decoding multiaddress: %w", err) } _, err = maOrig.ValueForProtocol(multiaddr.P_P2P) if err != nil { return fmt.Errorf("multiaddress does not contain peer ID: %w", err) } maOrigins[i] = maOrig } po.Origins = maOrigins } return nil } // PinDepth indicates how deep a pin should be pinned, with // -1 meaning "to the bottom", or "recursive". type PinDepth int // ToPinMode converts PinDepth to PinMode func (pd PinDepth) ToPinMode() PinMode { switch pd { case -1: return PinModeRecursive case 0: return PinModeDirect default: logger.Warnf("bad pin depth: %d", pd) return PinModeRecursive } } // Pin carries all the information associated to a CID that is pinned // in IPFS Cluster. It also carries transient information (that may not // get protobuffed, like UserAllocations). type Pin struct { PinOptions Cid Cid `json:"cid" codec:"c"` // See PinType comments Type PinType `json:"type" codec:"t,omitempty"` // The peers to which this pin is allocated Allocations []peer.ID `json:"allocations" codec:"a,omitempty"` // MaxDepth associated to this pin. -1 means // recursive. MaxDepth PinDepth `json:"max_depth" codec:"d,omitempty"` // We carry a reference CID to this pin. 
For // ClusterDAGs, it is the MetaPin CID. For the // MetaPin it is the ClusterDAG CID. For Shards, // it is the previous shard CID. // When not needed the pointer is nil Reference *Cid `json:"reference" codec:"r,omitempty"` // The time that the pin was submitted to the consensus layer. Timestamp time.Time `json:"timestamp" codec:"i,omitempty"` } // String is a string representation of a Pin. func (pin Pin) String() string { var b strings.Builder fmt.Fprintf(&b, "cid: %s\n", pin.Cid.String()) fmt.Fprintf(&b, "type: %s\n", pin.Type) fmt.Fprintf(&b, "allocations: %v\n", pin.Allocations) fmt.Fprintf(&b, "maxdepth: %d\n", pin.MaxDepth) if pin.Reference != nil { fmt.Fprintf(&b, "reference: %s\n", pin.Reference) } return b.String() } // IsPinEverywhere returns when the both replication factors are set to -1. func (pin Pin) IsPinEverywhere() bool { return pin.ReplicationFactorMin == -1 && pin.ReplicationFactorMax == -1 } // PinPath is a wrapper for holding pin options and path of the content. type PinPath struct { PinOptions Path string `json:"path"` } // Defined returns if the path has a value. func (pp PinPath) Defined() bool { return pp.Path != "" } // PinCid is a shortcut to create a Pin only with a Cid. Default is for pin to // be recursive and the pin to be of DataType. func PinCid(c Cid) Pin { return Pin{ Cid: c, Type: DataType, Allocations: []peer.ID{}, MaxDepth: -1, // Recursive Timestamp: time.Now(), } } // PinWithOpts creates a new Pin calling PinCid(c) and then sets its // PinOptions fields with the given options. Pin fields that are linked to // options are set accordingly (MaxDepth from Mode). func PinWithOpts(c Cid, opts PinOptions) Pin { p := PinCid(c) p.PinOptions = opts p.MaxDepth = p.Mode.ToPinDepth() return p } func convertPinType(t PinType) pb.Pin_PinType { var i pb.Pin_PinType for t != 1 { if t == 0 { return pb.Pin_BadType } t = t >> 1 i++ } return i } // ProtoMarshal marshals this Pin using probobuf. 
func (pin Pin) ProtoMarshal() ([]byte, error) { allocs := make([][]byte, len(pin.Allocations)) for i, pid := range pin.Allocations { bs, err := pid.Marshal() if err != nil { return nil, err } allocs[i] = bs } // Cursory google search says len=0 slices will be // decoded as null, which is fine. origins := make([][]byte, len(pin.Origins)) for i, orig := range pin.Origins { origins[i] = orig.Bytes() } var expireAtProto uint64 // Only set the protobuf field with non-zero times. if !(pin.ExpireAt.IsZero() || pin.ExpireAt.Equal(unixZero)) { expireAtProto = uint64(pin.ExpireAt.Unix()) } var timestampProto uint64 // Only set the protobuf field with non-zero times. if !(pin.Timestamp.IsZero() || pin.Timestamp.Equal(unixZero)) { timestampProto = uint64(pin.Timestamp.Unix()) } // Our metadata needs to always be seralized in exactly the same way, // and that is why we use an array sorted by key and deprecated using // a protobuf map. var sortedMetadata []*pb.Metadata var metaKeys []string for k := range pin.Metadata { metaKeys = append(metaKeys, k) } sort.Strings(metaKeys) for _, k := range metaKeys { metadata := &pb.Metadata{ Key: k, Value: pin.Metadata[k], } sortedMetadata = append(sortedMetadata, metadata) } opts := &pb.PinOptions{ ReplicationFactorMin: int32(pin.ReplicationFactorMin), ReplicationFactorMax: int32(pin.ReplicationFactorMax), Name: pin.Name, ShardSize: pin.ShardSize, // Metadata: pin.Metadata, PinUpdate: pin.PinUpdate.Bytes(), ExpireAt: expireAtProto, // Mode: pin.Mode, // UserAllocations: pin.UserAllocations, Origins: origins, SortedMetadata: sortedMetadata, } pbPin := &pb.Pin{ Cid: pin.Cid.Bytes(), Type: convertPinType(pin.Type), Allocations: allocs, MaxDepth: int32(pin.MaxDepth), Options: opts, Timestamp: timestampProto, } if ref := pin.Reference; ref != nil { pbPin.Reference = ref.Bytes() } return proto.Marshal(pbPin) } // ProtoUnmarshal unmarshals this fields from protobuf-encoded bytes. 
func (pin *Pin) ProtoUnmarshal(data []byte) error { pbPin := pb.Pin{} err := proto.Unmarshal(data, &pbPin) if err != nil { return err } ci, err := CastCid(pbPin.GetCid()) if err != nil { pin.Cid = CidUndef } else { pin.Cid = ci } pin.Type = 1 << uint64(pbPin.GetType()) pbAllocs := pbPin.GetAllocations() lenAllocs := len(pbAllocs) allocs := make([]peer.ID, lenAllocs) for i, pidb := range pbAllocs { pid, err := peer.IDFromBytes(pidb) if err != nil { return err } allocs[i] = pid } pin.Allocations = allocs pin.MaxDepth = PinDepth(pbPin.GetMaxDepth()) ref, err := CastCid(pbPin.GetReference()) if err != nil { pin.Reference = nil } else { pin.Reference = &ref } ts := pbPin.GetTimestamp() if ts > 0 { pin.Timestamp = time.Unix(int64(ts), 0) } opts := pbPin.GetOptions() pin.ReplicationFactorMin = int(opts.GetReplicationFactorMin()) pin.ReplicationFactorMax = int(opts.GetReplicationFactorMax()) pin.Name = opts.GetName() pin.ShardSize = opts.GetShardSize() // pin.UserAllocations = opts.GetUserAllocations() exp := opts.GetExpireAt() if exp > 0 { pin.ExpireAt = time.Unix(int64(exp), 0) } // Use whatever metadata is available. //lint:ignore SA1019 we keed to keep backwards compat pin.Metadata = opts.GetMetadata() sortedMetadata := opts.GetSortedMetadata() if len(sortedMetadata) > 0 && pin.Metadata == nil { pin.Metadata = make(map[string]string, len(sortedMetadata)) } for _, md := range opts.GetSortedMetadata() { pin.Metadata[md.Key] = md.Value } pinUpdate, err := CastCid(opts.GetPinUpdate()) if err == nil { pin.PinUpdate = pinUpdate } // We do not store the PinMode option but we can // derive it from the MaxDepth setting. 
pin.Mode = pin.MaxDepth.ToPinMode() pbOrigins := opts.GetOrigins() origins := make([]Multiaddr, len(pbOrigins)) for i, orig := range pbOrigins { maOrig, err := multiaddr.NewMultiaddrBytes(orig) if err != nil { return err } origins[i] = NewMultiaddrWithValue(maOrig) } pin.Origins = origins return nil } // Equals checks if two pins are the same (with the same allocations). // If allocations are the same but in different order, they are still // considered equivalent. func (pin Pin) Equals(pin2 Pin) bool { if !pin.Cid.Equals(pin2.Cid) { return false } if pin.Type != pin2.Type { return false } if pin.MaxDepth != pin2.MaxDepth { return false } if pin.Reference != nil && pin2.Reference == nil || pin.Reference == nil && pin2.Reference != nil { return false } if pin.Reference != nil && pin2.Reference != nil && !pin.Reference.Equals(*pin2.Reference) { return false } allocs1 := PeersToStrings(pin.Allocations) sort.Strings(allocs1) allocs2 := PeersToStrings(pin2.Allocations) sort.Strings(allocs2) if strings.Join(allocs1, ",") != strings.Join(allocs2, ",") { return false } return pin.PinOptions.Equals(pin2.PinOptions) } // IsRemotePin determines whether a Pin's ReplicationFactor has // been met, so as to either pin or unpin it from the peer. func (pin Pin) IsRemotePin(pid peer.ID) bool { if pin.IsPinEverywhere() { return false } for _, p := range pin.Allocations { if p == pid { return false } } return true } // ExpiredAt returns whether the pin has expired at the given time. func (pin Pin) ExpiredAt(t time.Time) bool { if pin.ExpireAt.IsZero() || pin.ExpireAt.Equal(unixZero) { return false } return pin.ExpireAt.Before(t) } // Defined returns true if this is not a zero-object pin (the CID must be set). 
func (pin Pin) Defined() bool { return pin.Cid.Defined() } // NodeWithMeta specifies a block of data and a set of optional metadata fields // carrying information about the encoded ipld node type NodeWithMeta struct { Data []byte `codec:"d,omitempty"` Cid Cid `codec:"c,omitempty"` CumSize uint64 `codec:"s,omitempty"` // Cumulative size } // Size returns how big is the block. It is different from CumSize, which // records the size of the underlying tree. func (n *NodeWithMeta) Size() uint64 { return uint64(len(n.Data)) } // MetricsSet is a map to carry slices of metrics indexed by type. type MetricsSet map[string][]Metric // Metric transports information about a peer.ID. It is used to decide // pin allocations by a PinAllocator. IPFS cluster is agnostic to // the Value, which should be interpreted by the PinAllocator. // The ReceivedAt value is a timestamp representing when a peer has received // the metric value. type Metric struct { Name string `json:"name" codec:"n,omitempty"` Peer peer.ID `json:"peer" codec:"p,omitempty"` Value string `json:"value" codec:"v,omitempty"` Expire int64 `json:"expire" codec:"e,omitempty"` Valid bool `json:"valid" codec:"d,omitempty"` Weight int64 `json:"weight" codec:"w,omitempty"` Partitionable bool `json:"partitionable" codec:"o,omitempty"` ReceivedAt int64 `json:"received_at" codec:"t,omitempty"` // ReceivedAt contains a UnixNano timestamp } func (m Metric) String() string { return fmt.Sprintf("%s | %s | %s | Recv: %d | Exp: %d | W: %d | Part: %t | Valid: %t", m.Name, m.Peer, m.Value, m.ReceivedAt, m.Expire, m.Weight, m.Partitionable, m.Valid, ) } // Defined returns true if the metric name is set. 
func (m Metric) Defined() bool {
	return m.Name != ""
}

// SetTTL sets the Metric to expire after the given time.Duration, counted
// from the moment of the call. Expire is stored as a UnixNano timestamp.
func (m *Metric) SetTTL(d time.Duration) {
	exp := time.Now().Add(d)
	m.Expire = exp.UnixNano()
}

// GetTTL returns the time left before the Metric expires. The result is
// never negative: a Metric that has already expired has a TTL of 0.
func (m Metric) GetTTL() time.Duration {
	expDate := time.Unix(0, m.Expire)
	ttl := time.Until(expDate)
	if ttl < 0 {
		ttl = 0
	}
	return ttl
}

// Expired returns true if the Metric's expiration time (a UnixNano
// timestamp) is in the past.
func (m Metric) Expired() bool {
	expDate := time.Unix(0, m.Expire)
	return time.Now().After(expDate)
}

// Discard returns true if the metric is not valid or has expired.
func (m Metric) Discard() bool {
	return !m.Valid || m.Expired()
}

// GetWeight returns the weight of the metric.
// This is for compatibility.
func (m Metric) GetWeight() int64 {
	return m.Weight
}

// MetricSlice is a sortable Metric array. Its Len, Swap and Less methods
// provide the method set that sort.Sort expects.
type MetricSlice []Metric

// Len returns the number of metrics in the slice.
func (es MetricSlice) Len() int { return len(es) }

// Swap exchanges the metrics at positions i and j.
func (es MetricSlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }

// Less orders metrics by Peer first. Metrics from the same peer are ordered
// by their Expire value, earliest first.
func (es MetricSlice) Less(i, j int) bool {
	if es[i].Peer == es[j].Peer {
		return es[i].Expire < es[j].Expire
	}
	return es[i].Peer < es[j].Peer
}

// Alert carries alerting information about a peer. It embeds the Metric
// that the alert refers to.
type Alert struct {
	Metric
	TriggeredAt time.Time `json:"triggered_at" codec:"r,omitempty"`
}

// Error can be used by APIs to return errors.
type Error struct {
	Code    int    `json:"code" codec:"o,omitempty"`
	Message string `json:"message" codec:"m,omitempty"`
}

// Error implements the error interface and returns the error's message
// followed by its code in parentheses.
func (e Error) Error() string {
	return fmt.Sprintf("%s (%d)", e.Message, e.Code)
}

// IPFSRepoStat wraps information about the IPFS repository.
type IPFSRepoStat struct {
	RepoSize uint64 `codec:"r,omitempty"`
	// The tag must not contain a space after the comma: " omitempty" is
	// not recognized as the omitempty option.
	StorageMax uint64 `codec:"s,omitempty"`
}

// IPFSRepoGC represents the streaming response sent from repo gc API of IPFS.
type IPFSRepoGC struct {
	// Key is the CID carried by this response entry; it is omitted from
	// JSON when empty.
	Key Cid `json:"key,omitempty" codec:"k,omitempty"`
	// Error is an error message attached to this entry; it is omitted
	// when empty.
	Error string `json:"error,omitempty" codec:"e,omitempty"`
}

// RepoGC contains garbage collected CIDs from a cluster peer's IPFS daemon.
type RepoGC struct {
	Peer     peer.ID `json:"peer" codec:"p,omitempty"` // the Cluster peer ID
	Peername string  `json:"peername" codec:"pn,omitempty"`
	// Keys lists the individual IPFSRepoGC entries. Unlike the other
	// fields, its codec tag does not set omitempty.
	Keys []IPFSRepoGC `json:"keys" codec:"k"`
	// Error is an error message for the operation as a whole; it is
	// omitted when empty.
	Error string `json:"error,omitempty" codec:"e,omitempty"`
}

// GlobalRepoGC contains cluster-wide information about garbage collected CIDs
// from IPFS.
type GlobalRepoGC struct {
	// PeerMap holds one RepoGC result per string key.
	// NOTE(review): keys are presumably string-encoded peer IDs — confirm
	// against the code that fills this map.
	PeerMap map[string]RepoGC `json:"peer_map" codec:"pm,omitempty"`
}