ipfsconn/ipfshttp: Pass ctx through from rpc_api
to the ipfscluster.IPFSConnector interface and then to the implementation of that interface in ipfsconn/ipfshttp. This allows calls from MapPinTracker to cancel requests made to the local IPFS node. License: MIT Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
This commit is contained in:
parent
ab2a883a3d
commit
9e20e4e3b2
|
@ -1,6 +1,7 @@
|
|||
package ipfscluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -51,28 +52,28 @@ func (ipfs *mockConnector) ID() (api.IPFSID, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (ipfs *mockConnector) Pin(c *cid.Cid, b bool) error {
|
||||
func (ipfs *mockConnector) Pin(ctx context.Context, c *cid.Cid, b bool) error {
|
||||
if ipfs.returnError {
|
||||
return errors.New("")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ipfs *mockConnector) Unpin(c *cid.Cid) error {
|
||||
func (ipfs *mockConnector) Unpin(ctx context.Context, c *cid.Cid) error {
|
||||
if ipfs.returnError {
|
||||
return errors.New("")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ipfs *mockConnector) PinLsCid(c *cid.Cid) (api.IPFSPinStatus, error) {
|
||||
func (ipfs *mockConnector) PinLsCid(ctx context.Context, c *cid.Cid) (api.IPFSPinStatus, error) {
|
||||
if ipfs.returnError {
|
||||
return api.IPFSPinStatusError, errors.New("")
|
||||
}
|
||||
return api.IPFSPinStatusRecursive, nil
|
||||
}
|
||||
|
||||
func (ipfs *mockConnector) PinLs(filter string) (map[string]api.IPFSPinStatus, error) {
|
||||
func (ipfs *mockConnector) PinLs(ctx context.Context, filter string) (map[string]api.IPFSPinStatus, error) {
|
||||
if ipfs.returnError {
|
||||
return nil, errors.New("")
|
||||
}
|
||||
|
|
|
@ -9,6 +9,8 @@
|
|||
package ipfscluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ipfs/ipfs-cluster/api"
|
||||
"github.com/ipfs/ipfs-cluster/state"
|
||||
|
||||
|
@ -70,10 +72,10 @@ type API interface {
|
|||
type IPFSConnector interface {
|
||||
Component
|
||||
ID() (api.IPFSID, error)
|
||||
Pin(*cid.Cid, bool) error
|
||||
Unpin(*cid.Cid) error
|
||||
PinLsCid(*cid.Cid) (api.IPFSPinStatus, error)
|
||||
PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error)
|
||||
Pin(context.Context, *cid.Cid, bool) error
|
||||
Unpin(context.Context, *cid.Cid) error
|
||||
PinLsCid(context.Context, *cid.Cid) (api.IPFSPinStatus, error)
|
||||
PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error)
|
||||
// ConnectSwarms make sure this peer's IPFS daemon is connected to
|
||||
// other peers IPFS daemons.
|
||||
ConnectSwarms() error
|
||||
|
|
|
@ -607,8 +607,8 @@ func (ipfs *Connector) ID() (api.IPFSID, error) {
|
|||
|
||||
// Pin performs a pin request against the configured IPFS
|
||||
// daemon.
|
||||
func (ipfs *Connector) Pin(hash *cid.Cid, recursive bool) error {
|
||||
pinStatus, err := ipfs.PinLsCid(hash)
|
||||
func (ipfs *Connector) Pin(ctx context.Context, hash *cid.Cid, recursive bool) error {
|
||||
pinStatus, err := ipfs.PinLsCid(ctx, hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -636,8 +636,8 @@ func (ipfs *Connector) Pin(hash *cid.Cid, recursive bool) error {
|
|||
|
||||
// Unpin performs an unpin request against the configured IPFS
|
||||
// daemon.
|
||||
func (ipfs *Connector) Unpin(hash *cid.Cid) error {
|
||||
pinStatus, err := ipfs.PinLsCid(hash)
|
||||
func (ipfs *Connector) Unpin(ctx context.Context, hash *cid.Cid) error {
|
||||
pinStatus, err := ipfs.PinLsCid(ctx, hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -656,7 +656,7 @@ func (ipfs *Connector) Unpin(hash *cid.Cid) error {
|
|||
|
||||
// PinLs performs a "pin ls --type typeFilter" request against the configured
|
||||
// IPFS daemon and returns a map of cid strings and their status.
|
||||
func (ipfs *Connector) PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error) {
|
||||
func (ipfs *Connector) PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error) {
|
||||
body, err := ipfs.post("pin/ls?type=" + typeFilter)
|
||||
|
||||
// Some error talking to the daemon
|
||||
|
@ -681,7 +681,7 @@ func (ipfs *Connector) PinLs(typeFilter string) (map[string]api.IPFSPinStatus, e
|
|||
|
||||
// PinLsCid performs a "pin ls --type=recursive <hash> "request and returns
|
||||
// an api.IPFSPinStatus for that hash.
|
||||
func (ipfs *Connector) PinLsCid(hash *cid.Cid) (api.IPFSPinStatus, error) {
|
||||
func (ipfs *Connector) PinLsCid(ctx context.Context, hash *cid.Cid) (api.IPFSPinStatus, error) {
|
||||
lsPath := fmt.Sprintf("pin/ls?arg=%s&type=recursive", hash)
|
||||
body, err := ipfs.post(lsPath)
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package ipfshttp
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
|
@ -76,6 +77,7 @@ func TestIPFSID(t *testing.T) {
|
|||
}
|
||||
|
||||
func testPin(t *testing.T, method string) {
|
||||
ctx := context.Background()
|
||||
ipfs, mock := testIPFSConnector(t)
|
||||
defer mock.Close()
|
||||
defer ipfs.Shutdown()
|
||||
|
@ -83,11 +85,11 @@ func testPin(t *testing.T, method string) {
|
|||
ipfs.config.PinMethod = method
|
||||
|
||||
c, _ := cid.Decode(test.TestCid1)
|
||||
err := ipfs.Pin(c, true)
|
||||
err := ipfs.Pin(ctx, c, true)
|
||||
if err != nil {
|
||||
t.Error("expected success pinning cid")
|
||||
}
|
||||
pinSt, err := ipfs.PinLsCid(c)
|
||||
pinSt, err := ipfs.PinLsCid(ctx, c)
|
||||
if err != nil {
|
||||
t.Fatal("expected success doing ls")
|
||||
}
|
||||
|
@ -96,7 +98,7 @@ func testPin(t *testing.T, method string) {
|
|||
}
|
||||
|
||||
c2, _ := cid.Decode(test.ErrorCid)
|
||||
err = ipfs.Pin(c2, true)
|
||||
err = ipfs.Pin(ctx, c2, true)
|
||||
if err == nil {
|
||||
t.Error("expected error pinning cid")
|
||||
}
|
||||
|
@ -108,50 +110,53 @@ func TestIPFSPin(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestIPFSUnpin(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ipfs, mock := testIPFSConnector(t)
|
||||
defer mock.Close()
|
||||
defer ipfs.Shutdown()
|
||||
c, _ := cid.Decode(test.TestCid1)
|
||||
err := ipfs.Unpin(c)
|
||||
err := ipfs.Unpin(ctx, c)
|
||||
if err != nil {
|
||||
t.Error("expected success unpinning non-pinned cid")
|
||||
}
|
||||
ipfs.Pin(c, true)
|
||||
err = ipfs.Unpin(c)
|
||||
ipfs.Pin(ctx, c, true)
|
||||
err = ipfs.Unpin(ctx, c)
|
||||
if err != nil {
|
||||
t.Error("expected success unpinning pinned cid")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIPFSPinLsCid(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ipfs, mock := testIPFSConnector(t)
|
||||
defer mock.Close()
|
||||
defer ipfs.Shutdown()
|
||||
c, _ := cid.Decode(test.TestCid1)
|
||||
c2, _ := cid.Decode(test.TestCid2)
|
||||
|
||||
ipfs.Pin(c, true)
|
||||
ips, err := ipfs.PinLsCid(c)
|
||||
ipfs.Pin(ctx, c, true)
|
||||
ips, err := ipfs.PinLsCid(ctx, c)
|
||||
if err != nil || !ips.IsPinned() {
|
||||
t.Error("c should appear pinned")
|
||||
}
|
||||
|
||||
ips, err = ipfs.PinLsCid(c2)
|
||||
ips, err = ipfs.PinLsCid(ctx, c2)
|
||||
if err != nil || ips != api.IPFSPinStatusUnpinned {
|
||||
t.Error("c2 should appear unpinned")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIPFSPinLs(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ipfs, mock := testIPFSConnector(t)
|
||||
defer mock.Close()
|
||||
defer ipfs.Shutdown()
|
||||
c, _ := cid.Decode(test.TestCid1)
|
||||
c2, _ := cid.Decode(test.TestCid2)
|
||||
|
||||
ipfs.Pin(c, true)
|
||||
ipfs.Pin(c2, true)
|
||||
ipsMap, err := ipfs.PinLs("")
|
||||
ipfs.Pin(ctx, c, true)
|
||||
ipfs.Pin(ctx, c2, true)
|
||||
ipsMap, err := ipfs.PinLs(ctx, "")
|
||||
if err != nil {
|
||||
t.Error("should not error")
|
||||
}
|
||||
|
@ -702,6 +707,7 @@ func TestSwarmPeers(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestRepoSize(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ipfs, mock := testIPFSConnector(t)
|
||||
defer mock.Close()
|
||||
defer ipfs.Shutdown()
|
||||
|
@ -716,7 +722,7 @@ func TestRepoSize(t *testing.T) {
|
|||
}
|
||||
|
||||
c, _ := cid.Decode(test.TestCid1)
|
||||
err = ipfs.Pin(c, true)
|
||||
err = ipfs.Pin(ctx, c, true)
|
||||
if err != nil {
|
||||
t.Error("expected success pinning cid")
|
||||
}
|
||||
|
|
|
@ -251,26 +251,26 @@ func (rpcapi *RPCAPI) TrackerRecover(ctx context.Context, in api.PinSerial, out
|
|||
func (rpcapi *RPCAPI) IPFSPin(ctx context.Context, in api.PinSerial, out *struct{}) error {
|
||||
c := in.ToPin().Cid
|
||||
r := in.ToPin().Recursive
|
||||
return rpcapi.c.ipfs.Pin(c, r)
|
||||
return rpcapi.c.ipfs.Pin(ctx, c, r)
|
||||
}
|
||||
|
||||
// IPFSUnpin runs IPFSConnector.Unpin().
|
||||
func (rpcapi *RPCAPI) IPFSUnpin(ctx context.Context, in api.PinSerial, out *struct{}) error {
|
||||
c := in.ToPin().Cid
|
||||
return rpcapi.c.ipfs.Unpin(c)
|
||||
return rpcapi.c.ipfs.Unpin(ctx, c)
|
||||
}
|
||||
|
||||
// IPFSPinLsCid runs IPFSConnector.PinLsCid().
|
||||
func (rpcapi *RPCAPI) IPFSPinLsCid(ctx context.Context, in api.PinSerial, out *api.IPFSPinStatus) error {
|
||||
c := in.ToPin().Cid
|
||||
b, err := rpcapi.c.ipfs.PinLsCid(c)
|
||||
b, err := rpcapi.c.ipfs.PinLsCid(ctx, c)
|
||||
*out = b
|
||||
return err
|
||||
}
|
||||
|
||||
// IPFSPinLs runs IPFSConnector.PinLs().
|
||||
func (rpcapi *RPCAPI) IPFSPinLs(ctx context.Context, in string, out *map[string]api.IPFSPinStatus) error {
|
||||
m, err := rpcapi.c.ipfs.PinLs(in)
|
||||
m, err := rpcapi.c.ipfs.PinLs(ctx, in)
|
||||
*out = m
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user