Improve PinLsCid to check for pinned items
Receive the full pin object so that it can decide whether to check for recursive or direct pins directly. Additionally, unpin will not check for the pin presence anymore and simply trigger unpins (ignoring errors)
This commit is contained in:
parent
99eb29a7d6
commit
fa762d78cf
|
@ -71,8 +71,8 @@ func (ipfs *mockConnector) Unpin(ctx context.Context, c cid.Cid) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ipfs *mockConnector) PinLsCid(ctx context.Context, c cid.Cid) (api.IPFSPinStatus, error) {
|
||||
dI, ok := ipfs.pins.Load(c.String())
|
||||
func (ipfs *mockConnector) PinLsCid(ctx context.Context, pin *api.Pin) (api.IPFSPinStatus, error) {
|
||||
dI, ok := ipfs.pins.Load(pin.Cid.String())
|
||||
if !ok {
|
||||
return api.IPFSPinStatusUnpinned, nil
|
||||
}
|
||||
|
|
1
go.mod
1
go.mod
|
@ -26,6 +26,7 @@ require (
|
|||
github.com/ipfs/go-ipfs-config v0.5.2
|
||||
github.com/ipfs/go-ipfs-ds-help v1.0.0
|
||||
github.com/ipfs/go-ipfs-files v0.0.8
|
||||
github.com/ipfs/go-ipfs-pinner v0.0.4
|
||||
github.com/ipfs/go-ipfs-posinfo v0.0.1
|
||||
github.com/ipfs/go-ipld-cbor v0.0.4
|
||||
github.com/ipfs/go-ipld-format v0.2.0
|
||||
|
|
4
go.sum
4
go.sum
|
@ -299,6 +299,7 @@ github.com/ipfs/go-datastore v0.1.0 h1:TOxI04l8CmO4zGtesENhzm4PwkFwJXY3rKiYaaMf9
|
|||
github.com/ipfs/go-datastore v0.1.0/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
|
||||
github.com/ipfs/go-datastore v0.1.1 h1:F4k0TkTAZGLFzBOrVKDAvch6JZtuN4NHkfdcEZL50aI=
|
||||
github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
|
||||
github.com/ipfs/go-datastore v0.3.0/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
|
||||
github.com/ipfs/go-datastore v0.3.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
|
||||
github.com/ipfs/go-datastore v0.4.0/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
|
||||
github.com/ipfs/go-datastore v0.4.1/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
|
||||
|
@ -362,6 +363,8 @@ github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjN
|
|||
github.com/ipfs/go-ipfs-files v0.0.6/go.mod h1:lVYE6sgAdtZN5825beJjSAHibw7WOBNPDWz5LaJeukg=
|
||||
github.com/ipfs/go-ipfs-files v0.0.8 h1:8o0oFJkJ8UkO/ABl8T6ac6tKF3+NIpj67aAB6ZpusRg=
|
||||
github.com/ipfs/go-ipfs-files v0.0.8/go.mod h1:wiN/jSG8FKyk7N0WyctKSvq3ljIa2NNTiZB55kpTdOs=
|
||||
github.com/ipfs/go-ipfs-pinner v0.0.4 h1:EmxhS3vDsCK/rZrsgxX0Le9m2drBcGlUd7ah/VyFYVE=
|
||||
github.com/ipfs/go-ipfs-pinner v0.0.4/go.mod h1:s4kFZWLWGDudN8Jyd/GTpt222A12C2snA2+OTdy/7p8=
|
||||
github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6OtpRs=
|
||||
github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A=
|
||||
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
|
||||
|
@ -410,6 +413,7 @@ github.com/ipfs/go-merkledag v0.1.0 h1:CAEXjRFEDPvealQj3TgEjV1IJckwjvmxAqtq5QSXJ
|
|||
github.com/ipfs/go-merkledag v0.1.0/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
|
||||
github.com/ipfs/go-merkledag v0.2.3 h1:aMdkK9G1hEeNvn3VXfiEMLY0iJnbiQQUHnM0HFJREsE=
|
||||
github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
|
||||
github.com/ipfs/go-merkledag v0.3.0/go.mod h1:4pymaZLhSLNVuiCITYrpViD6vmfZ/Ws4n/L9tfNv3S4=
|
||||
github.com/ipfs/go-merkledag v0.3.1 h1:3UqWINBEr3/N+r6OwgFXAddDP/8zpQX/8J7IGVOCqRQ=
|
||||
github.com/ipfs/go-merkledag v0.3.1/go.mod h1:fvkZNNZixVW6cKSZ/JfLlON5OlgTXNdRLz0p6QG/I2M=
|
||||
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
|
||||
|
|
|
@ -77,7 +77,7 @@ type IPFSConnector interface {
|
|||
ID(context.Context) (*api.IPFSID, error)
|
||||
Pin(context.Context, *api.Pin) error
|
||||
Unpin(context.Context, cid.Cid) error
|
||||
PinLsCid(context.Context, cid.Cid) (api.IPFSPinStatus, error)
|
||||
PinLsCid(context.Context, *api.Pin) (api.IPFSPinStatus, error)
|
||||
PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error)
|
||||
// ConnectSwarms make sure this peer's IPFS daemon is connected to
|
||||
// other peers IPFS daemons.
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
files "github.com/ipfs/go-ipfs-files"
|
||||
pinner "github.com/ipfs/go-ipfs-pinner"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
gopath "github.com/ipfs/go-path"
|
||||
peer "github.com/libp2p/go-libp2p-core/peer"
|
||||
|
@ -70,9 +71,20 @@ type Connector struct {
|
|||
}
|
||||
|
||||
type ipfsError struct {
|
||||
path string
|
||||
code int
|
||||
Message string
|
||||
}
|
||||
|
||||
func (ie ipfsError) Error() string {
|
||||
return fmt.Sprintf(
|
||||
"IPFS request unsuccessful (%s). Code: %d. Message: %s",
|
||||
ie.path,
|
||||
ie.code,
|
||||
ie.Message,
|
||||
)
|
||||
}
|
||||
|
||||
type ipfsPinType struct {
|
||||
Type string
|
||||
}
|
||||
|
@ -298,7 +310,7 @@ func (ipfs *Connector) Pin(ctx context.Context, pin *api.Pin) error {
|
|||
hash := pin.Cid
|
||||
maxDepth := pin.MaxDepth
|
||||
|
||||
pinStatus, err := ipfs.PinLsCid(ctx, hash)
|
||||
pinStatus, err := ipfs.PinLsCid(ctx, pin)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -317,7 +329,8 @@ func (ipfs *Connector) Pin(ctx context.Context, pin *api.Pin) error {
|
|||
// is pinned recursively, then do pin/update.
|
||||
// Otherwise do a normal pin.
|
||||
if from := pin.PinUpdate; from != cid.Undef {
|
||||
pinStatus, _ := ipfs.PinLsCid(ctx, from)
|
||||
fromPin := api.PinWithOpts(from, pin.PinOptions)
|
||||
pinStatus, _ := ipfs.PinLsCid(ctx, fromPin)
|
||||
if pinStatus.IsPinned(-1) { // pinned recursively.
|
||||
// As a side note, if PinUpdate == pin.Cid, we are
|
||||
// somehow pinning an already pinned thing and we'd
|
||||
|
@ -433,31 +446,30 @@ func (ipfs *Connector) Unpin(ctx context.Context, hash cid.Cid) error {
|
|||
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Unpin")
|
||||
defer span.End()
|
||||
|
||||
pinStatus, err := ipfs.PinLsCid(ctx, hash)
|
||||
if err != nil {
|
||||
return err
|
||||
if ipfs.config.UnpinDisable {
|
||||
return errors.New("ipfs unpinning is disallowed by configuration on this peer")
|
||||
}
|
||||
|
||||
if pinStatus.IsPinned(-1) {
|
||||
if ipfs.config.UnpinDisable {
|
||||
return errors.New("ipfs unpinning is disallowed by configuration on this peer")
|
||||
}
|
||||
defer ipfs.updateInformerMetric(ctx)
|
||||
|
||||
defer ipfs.updateInformerMetric(ctx)
|
||||
path := fmt.Sprintf("pin/rm?arg=%s", hash)
|
||||
path := fmt.Sprintf("pin/rm?arg=%s", hash)
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, ipfs.config.UnpinTimeout)
|
||||
defer cancel()
|
||||
ctx, cancel := context.WithTimeout(ctx, ipfs.config.UnpinTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := ipfs.postCtx(ctx, path, "", nil)
|
||||
if err != nil {
|
||||
// We will call unpin in any case, if the CID is not pinned,
|
||||
// then we ignore the error (although this is a bit flaky).
|
||||
_, err := ipfs.postCtx(ctx, path, "", nil)
|
||||
if err != nil {
|
||||
ipfsErr, ok := err.(ipfsError)
|
||||
if !ok || ipfsErr.Message != pinner.ErrNotPinned.Error() {
|
||||
return err
|
||||
}
|
||||
logger.Info("IPFS Unpin request succeeded:", hash)
|
||||
stats.Record(ctx, observations.Pins.M(-1))
|
||||
logger.Debug("IPFS object is already unpinned: ", hash)
|
||||
}
|
||||
|
||||
logger.Debug("IPFS object is already unpinned: ", hash)
|
||||
logger.Info("IPFS Unpin request succeeded:", hash)
|
||||
stats.Record(ctx, observations.Pins.M(-1))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -491,34 +503,21 @@ func (ipfs *Connector) PinLs(ctx context.Context, typeFilter string) (map[string
|
|||
return statusMap, nil
|
||||
}
|
||||
|
||||
// PinLsCid performs a "pin ls <hash>" request. It first tries with
|
||||
// "type=recursive" and then, if not found, with "type=direct". It returns an
|
||||
// api.IPFSPinStatus for that hash.
|
||||
func (ipfs *Connector) PinLsCid(ctx context.Context, hash cid.Cid) (api.IPFSPinStatus, error) {
|
||||
// PinLsCid performs a "pin ls <hash>" request. It will use "type=recursive" or
|
||||
// "type=direct" (or other) depending on the given pin's MaxDepth setting.
|
||||
// It returns an api.IPFSPinStatus for that hash.
|
||||
func (ipfs *Connector) PinLsCid(ctx context.Context, pin *api.Pin) (api.IPFSPinStatus, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/PinLsCid")
|
||||
defer span.End()
|
||||
|
||||
pinLsType := func(pinType string) ([]byte, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
|
||||
defer cancel()
|
||||
lsPath := fmt.Sprintf("pin/ls?arg=%s&type=%s", hash, pinType)
|
||||
return ipfs.postCtx(ctx, lsPath, "", nil)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var body []byte
|
||||
var err error
|
||||
// FIXME: Sharding may need to check more pin types here.
|
||||
for _, pinType := range []string{"recursive", "direct"} {
|
||||
body, err = pinLsType(pinType)
|
||||
// Network error, daemon down
|
||||
if body == nil && err != nil {
|
||||
return api.IPFSPinStatusError, err
|
||||
}
|
||||
|
||||
// Pin found. Do not keep looking.
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
pinType := pin.MaxDepth.ToPinMode().String()
|
||||
lsPath := fmt.Sprintf("pin/ls?arg=%s&type=%s", pin.Cid, pinType)
|
||||
body, err := ipfs.postCtx(ctx, lsPath, "", nil)
|
||||
if body == nil && err != nil { // Network error, daemon down
|
||||
return api.IPFSPinStatusError, err
|
||||
}
|
||||
|
||||
if err != nil { // we could not find the pin
|
||||
|
@ -537,7 +536,7 @@ func (ipfs *Connector) PinLsCid(ctx context.Context, hash cid.Cid) (api.IPFSPinS
|
|||
// we parse as CID. There should only be one returned key.
|
||||
for k, pinObj := range res.Keys {
|
||||
c, err := cid.Decode(k)
|
||||
if err != nil || !c.Equals(hash) {
|
||||
if err != nil || !c.Equals(pin.Cid) {
|
||||
continue
|
||||
}
|
||||
return api.IPFSPinStatusFromString(pinObj.Type), nil
|
||||
|
@ -575,12 +574,9 @@ func checkResponse(path string, res *http.Response) ([]byte, error) {
|
|||
if err == nil {
|
||||
var ipfsErr ipfsError
|
||||
if err := json.Unmarshal(body, &ipfsErr); err == nil {
|
||||
return body, fmt.Errorf(
|
||||
"IPFS request unsuccessful (%s). Code: %d. Message: %s",
|
||||
path,
|
||||
res.StatusCode,
|
||||
ipfsErr.Message,
|
||||
)
|
||||
ipfsErr.code = res.StatusCode
|
||||
ipfsErr.path = path
|
||||
return body, ipfsErr
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -75,12 +75,12 @@ func TestPin(t *testing.T) {
|
|||
defer mock.Close()
|
||||
defer ipfs.Shutdown(ctx)
|
||||
|
||||
c := test.Cid1
|
||||
err := ipfs.Pin(ctx, api.PinCid(c))
|
||||
pin := api.PinCid(test.Cid1)
|
||||
err := ipfs.Pin(ctx, pin)
|
||||
if err != nil {
|
||||
t.Error("expected success pinning cid:", err)
|
||||
}
|
||||
pinSt, err := ipfs.PinLsCid(ctx, c)
|
||||
pinSt, err := ipfs.PinLsCid(ctx, pin)
|
||||
if err != nil {
|
||||
t.Fatal("expected success doing ls:", err)
|
||||
}
|
||||
|
@ -88,8 +88,8 @@ func TestPin(t *testing.T) {
|
|||
t.Error("cid should have been pinned")
|
||||
}
|
||||
|
||||
c2 := test.ErrorCid
|
||||
err = ipfs.Pin(ctx, api.PinCid(c2))
|
||||
pin2 := api.PinCid(test.ErrorCid)
|
||||
err = ipfs.Pin(ctx, pin2)
|
||||
if err == nil {
|
||||
t.Error("expected error pinning cid")
|
||||
}
|
||||
|
@ -178,8 +178,9 @@ func TestIPFSPinLsCid(t *testing.T) {
|
|||
c := test.Cid1
|
||||
c2 := test.Cid2
|
||||
|
||||
ipfs.Pin(ctx, api.PinCid(c))
|
||||
ips, err := ipfs.PinLsCid(ctx, c)
|
||||
pin := api.PinCid(c)
|
||||
ipfs.Pin(ctx, pin)
|
||||
ips, err := ipfs.PinLsCid(ctx, pin)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
@ -188,7 +189,7 @@ func TestIPFSPinLsCid(t *testing.T) {
|
|||
t.Error("c should appear pinned")
|
||||
}
|
||||
|
||||
ips, err = ipfs.PinLsCid(ctx, c2)
|
||||
ips, err = ipfs.PinLsCid(ctx, api.PinCid(c2))
|
||||
if err != nil || ips != api.IPFSPinStatusUnpinned {
|
||||
t.Error("c2 should appear unpinned")
|
||||
}
|
||||
|
@ -201,8 +202,9 @@ func TestIPFSPinLsCid_DifferentEncoding(t *testing.T) {
|
|||
defer ipfs.Shutdown(ctx)
|
||||
c := test.Cid4 // ipfs mock treats this specially
|
||||
|
||||
ipfs.Pin(ctx, api.PinCid(c))
|
||||
ips, err := ipfs.PinLsCid(ctx, c)
|
||||
pin := api.PinCid(c)
|
||||
ipfs.Pin(ctx, pin)
|
||||
ips, err := ipfs.PinLsCid(ctx, pin)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
|
|
@ -352,7 +352,7 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
|
|||
"",
|
||||
"IPFSConnector",
|
||||
"PinLsCid",
|
||||
c,
|
||||
gpin,
|
||||
&ips,
|
||||
)
|
||||
if err != nil {
|
||||
|
|
|
@ -68,8 +68,8 @@ func (mock *mockIPFS) PinLs(ctx context.Context, in string, out *map[string]api.
|
|||
return nil
|
||||
}
|
||||
|
||||
func (mock *mockIPFS) PinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPinStatus) error {
|
||||
switch in {
|
||||
func (mock *mockIPFS) PinLsCid(ctx context.Context, in *api.Pin, out *api.IPFSPinStatus) error {
|
||||
switch in.Cid {
|
||||
case test.Cid1, test.Cid2:
|
||||
*out = api.IPFSPinStatusRecursive
|
||||
default:
|
||||
|
|
|
@ -499,7 +499,7 @@ func (rpcapi *IPFSConnectorRPCAPI) Unpin(ctx context.Context, in *api.Pin, out *
|
|||
}
|
||||
|
||||
// PinLsCid runs IPFSConnector.PinLsCid().
|
||||
func (rpcapi *IPFSConnectorRPCAPI) PinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPinStatus) error {
|
||||
func (rpcapi *IPFSConnectorRPCAPI) PinLsCid(ctx context.Context, in *api.Pin, out *api.IPFSPinStatus) error {
|
||||
b, err := rpcapi.ipfs.PinLsCid(ctx, in)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -447,8 +447,8 @@ func (mock *mockIPFSConnector) Unpin(ctx context.Context, in *api.Pin, out *stru
|
|||
return nil
|
||||
}
|
||||
|
||||
func (mock *mockIPFSConnector) PinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPinStatus) error {
|
||||
if in.Equals(Cid1) || in.Equals(Cid3) {
|
||||
func (mock *mockIPFSConnector) PinLsCid(ctx context.Context, in *api.Pin, out *api.IPFSPinStatus) error {
|
||||
if in.Cid.Equals(Cid1) || in.Cid.Equals(Cid3) {
|
||||
*out = api.IPFSPinStatusRecursive
|
||||
} else {
|
||||
*out = api.IPFSPinStatusUnpinned
|
||||
|
|
Loading…
Reference in New Issue
Block a user