From 9e20e4e3b230bd11efc8d8db0c238c9045cf916a Mon Sep 17 00:00:00 2001 From: Adrian Lanzafame Date: Mon, 16 Apr 2018 19:01:20 +1000 Subject: [PATCH] ipfsconn/ipfshttp: Pass ctx through from rpc_api to the ipfscluster.IPFSConnector interface and then to the implementation of that interface in ipfsconn/ipfshttp. This allows calls from MapPinTracker to cancel requests made to the local IPFS node. License: MIT Signed-off-by: Adrian Lanzafame --- cluster_test.go | 9 +++++---- ipfscluster.go | 10 ++++++---- ipfsconn/ipfshttp/ipfshttp.go | 12 +++++------ ipfsconn/ipfshttp/ipfshttp_test.go | 32 ++++++++++++++++++------------ rpc_api.go | 8 ++++---- 5 files changed, 40 insertions(+), 31 deletions(-) diff --git a/cluster_test.go b/cluster_test.go index cdb99e40..1fdd768a 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1,6 +1,7 @@ package ipfscluster import ( + "context" "errors" "os" "path/filepath" @@ -51,28 +52,28 @@ func (ipfs *mockConnector) ID() (api.IPFSID, error) { }, nil } -func (ipfs *mockConnector) Pin(c *cid.Cid, b bool) error { +func (ipfs *mockConnector) Pin(ctx context.Context, c *cid.Cid, b bool) error { if ipfs.returnError { return errors.New("") } return nil } -func (ipfs *mockConnector) Unpin(c *cid.Cid) error { +func (ipfs *mockConnector) Unpin(ctx context.Context, c *cid.Cid) error { if ipfs.returnError { return errors.New("") } return nil } -func (ipfs *mockConnector) PinLsCid(c *cid.Cid) (api.IPFSPinStatus, error) { +func (ipfs *mockConnector) PinLsCid(ctx context.Context, c *cid.Cid) (api.IPFSPinStatus, error) { if ipfs.returnError { return api.IPFSPinStatusError, errors.New("") } return api.IPFSPinStatusRecursive, nil } -func (ipfs *mockConnector) PinLs(filter string) (map[string]api.IPFSPinStatus, error) { +func (ipfs *mockConnector) PinLs(ctx context.Context, filter string) (map[string]api.IPFSPinStatus, error) { if ipfs.returnError { return nil, errors.New("") } diff --git a/ipfscluster.go b/ipfscluster.go index da2982cf..dcd6cd45 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -9,6 +9,8 @@ package ipfscluster import ( + "context" + "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/state" @@ -70,10 +72,10 @@ type API interface { type IPFSConnector interface { Component ID() (api.IPFSID, error) - Pin(*cid.Cid, bool) error - Unpin(*cid.Cid) error - PinLsCid(*cid.Cid) (api.IPFSPinStatus, error) - PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error) + Pin(context.Context, *cid.Cid, bool) error + Unpin(context.Context, *cid.Cid) error + PinLsCid(context.Context, *cid.Cid) (api.IPFSPinStatus, error) + PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error) // ConnectSwarms make sure this peer's IPFS daemon is connected to // other peers IPFS daemons. ConnectSwarms() error diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 81d0ff2d..6ea5f942 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -607,8 +607,8 @@ func (ipfs *Connector) ID() (api.IPFSID, error) { // Pin performs a pin request against the configured IPFS // daemon. -func (ipfs *Connector) Pin(hash *cid.Cid, recursive bool) error { - pinStatus, err := ipfs.PinLsCid(hash) +func (ipfs *Connector) Pin(ctx context.Context, hash *cid.Cid, recursive bool) error { + pinStatus, err := ipfs.PinLsCid(ctx, hash) if err != nil { return err } @@ -636,8 +636,8 @@ func (ipfs *Connector) Pin(hash *cid.Cid, recursive bool) error { // Unpin performs an unpin request against the configured IPFS // daemon. -func (ipfs *Connector) Unpin(hash *cid.Cid) error { - pinStatus, err := ipfs.PinLsCid(hash) +func (ipfs *Connector) Unpin(ctx context.Context, hash *cid.Cid) error { + pinStatus, err := ipfs.PinLsCid(ctx, hash) if err != nil { return err } @@ -656,7 +656,7 @@ func (ipfs *Connector) Unpin(hash *cid.Cid) error { // PinLs performs a "pin ls --type typeFilter" request against the configured // IPFS daemon and returns a map of cid strings and their status. -func (ipfs *Connector) PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error) { +func (ipfs *Connector) PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error) { body, err := ipfs.post("pin/ls?type=" + typeFilter) // Some error talking to the daemon @@ -681,7 +681,7 @@ func (ipfs *Connector) PinLs(typeFilter string) (map[string]api.IPFSPinStatus, e // PinLsCid performs a "pin ls --type=recursive "request and returns // an api.IPFSPinStatus for that hash. -func (ipfs *Connector) PinLsCid(hash *cid.Cid) (api.IPFSPinStatus, error) { +func (ipfs *Connector) PinLsCid(ctx context.Context, hash *cid.Cid) (api.IPFSPinStatus, error) { lsPath := fmt.Sprintf("pin/ls?arg=%s&type=recursive", hash) body, err := ipfs.post(lsPath) diff --git a/ipfsconn/ipfshttp/ipfshttp_test.go b/ipfsconn/ipfshttp/ipfshttp_test.go index c5d61d3b..6174e4be 100644 --- a/ipfsconn/ipfshttp/ipfshttp_test.go +++ b/ipfsconn/ipfshttp/ipfshttp_test.go @@ -2,6 +2,7 @@ package ipfshttp import ( "bytes" + "context" "encoding/json" "fmt" "io/ioutil" @@ -76,6 +77,7 @@ func TestIPFSID(t *testing.T) { } func testPin(t *testing.T, method string) { + ctx := context.Background() ipfs, mock := testIPFSConnector(t) defer mock.Close() defer ipfs.Shutdown() @@ -83,11 +85,11 @@ func testPin(t *testing.T, method string) { ipfs.config.PinMethod = method c, _ := cid.Decode(test.TestCid1) - err := ipfs.Pin(c, true) + err := ipfs.Pin(ctx, c, true) if err != nil { t.Error("expected success pinning cid") } - pinSt, err := ipfs.PinLsCid(c) + pinSt, err := ipfs.PinLsCid(ctx, c) if err != nil { t.Fatal("expected success doing ls") } @@ -96,7 +98,7 @@ func testPin(t *testing.T, method string) { } c2, _ := cid.Decode(test.ErrorCid) - err = ipfs.Pin(c2, true) + err = ipfs.Pin(ctx, c2, true) if err == nil { t.Error("expected error pinning cid") } @@ -108,50 +110,53 @@ func TestIPFSPin(t *testing.T) { } func TestIPFSUnpin(t *testing.T) { + ctx := context.Background() ipfs, mock := testIPFSConnector(t) defer mock.Close() defer ipfs.Shutdown() c, _ := cid.Decode(test.TestCid1) - err := ipfs.Unpin(c) + err := ipfs.Unpin(ctx, c) if err != nil { t.Error("expected success unpinning non-pinned cid") } - ipfs.Pin(c, true) - err = ipfs.Unpin(c) + ipfs.Pin(ctx, c, true) + err = ipfs.Unpin(ctx, c) if err != nil { t.Error("expected success unpinning pinned cid") } } func TestIPFSPinLsCid(t *testing.T) { + ctx := context.Background() ipfs, mock := testIPFSConnector(t) defer mock.Close() defer ipfs.Shutdown() c, _ := cid.Decode(test.TestCid1) c2, _ := cid.Decode(test.TestCid2) - ipfs.Pin(c, true) - ips, err := ipfs.PinLsCid(c) + ipfs.Pin(ctx, c, true) + ips, err := ipfs.PinLsCid(ctx, c) if err != nil || !ips.IsPinned() { t.Error("c should appear pinned") } - ips, err = ipfs.PinLsCid(c2) + ips, err = ipfs.PinLsCid(ctx, c2) if err != nil || ips != api.IPFSPinStatusUnpinned { t.Error("c2 should appear unpinned") } } func TestIPFSPinLs(t *testing.T) { + ctx := context.Background() ipfs, mock := testIPFSConnector(t) defer mock.Close() defer ipfs.Shutdown() c, _ := cid.Decode(test.TestCid1) c2, _ := cid.Decode(test.TestCid2) - ipfs.Pin(c, true) - ipfs.Pin(c2, true) - ipsMap, err := ipfs.PinLs("") + ipfs.Pin(ctx, c, true) + ipfs.Pin(ctx, c2, true) + ipsMap, err := ipfs.PinLs(ctx, "") if err != nil { t.Error("should not error") } @@ -702,6 +707,7 @@ func TestSwarmPeers(t *testing.T) { } func TestRepoSize(t *testing.T) { + ctx := context.Background() ipfs, mock := testIPFSConnector(t) defer mock.Close() defer ipfs.Shutdown() @@ -716,7 +722,7 @@ func TestRepoSize(t *testing.T) { } c, _ := cid.Decode(test.TestCid1) - err = ipfs.Pin(c, true) + err = ipfs.Pin(ctx, c, true) if err != nil { t.Error("expected success pinning cid") } diff --git a/rpc_api.go b/rpc_api.go index 4ad32b93..5df88fa0 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -251,26 +251,26 @@ func (rpcapi *RPCAPI) TrackerRecover(ctx context.Context, in api.PinSerial, out func (rpcapi *RPCAPI) IPFSPin(ctx context.Context, in api.PinSerial, out *struct{}) error { c := in.ToPin().Cid r := in.ToPin().Recursive - return rpcapi.c.ipfs.Pin(c, r) + return rpcapi.c.ipfs.Pin(ctx, c, r) } // IPFSUnpin runs IPFSConnector.Unpin(). func (rpcapi *RPCAPI) IPFSUnpin(ctx context.Context, in api.PinSerial, out *struct{}) error { c := in.ToPin().Cid - return rpcapi.c.ipfs.Unpin(c) + return rpcapi.c.ipfs.Unpin(ctx, c) } // IPFSPinLsCid runs IPFSConnector.PinLsCid(). func (rpcapi *RPCAPI) IPFSPinLsCid(ctx context.Context, in api.PinSerial, out *api.IPFSPinStatus) error { c := in.ToPin().Cid - b, err := rpcapi.c.ipfs.PinLsCid(c) + b, err := rpcapi.c.ipfs.PinLsCid(ctx, c) *out = b return err } // IPFSPinLs runs IPFSConnector.PinLs(). func (rpcapi *RPCAPI) IPFSPinLs(ctx context.Context, in string, out *map[string]api.IPFSPinStatus) error { - m, err := rpcapi.c.ipfs.PinLs(in) + m, err := rpcapi.c.ipfs.PinLs(ctx, in) *out = m return err }