Merge pull request #523 from jglukasik/master

First pass at creating a client interface ClientIface
This commit is contained in:
Hector Sanjuan 2018-09-28 21:07:16 +02:00 committed by GitHub
commit ef31db6548
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 186 additions and 110 deletions

View File

@ -9,9 +9,14 @@ import (
"net/http"
"time"
"github.com/ipfs/go-ipfs-cmdkit/files"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
shell "github.com/ipfs/go-ipfs-api"
logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr-net"
@ -30,6 +35,71 @@ var (
var loggingFacility = "apiclient"
var logger = logging.Logger(loggingFacility)
// Client interface defines the interface to be used by API clients to
// interact with the ipfs-cluster-service
type Client interface {
// ID returns information about the cluster Peer.
ID() (api.ID, error)
// Peers requests ID information for all cluster peers.
Peers() ([]api.ID, error)
// PeerAdd adds a new peer to the cluster.
PeerAdd(pid peer.ID) (api.ID, error)
// PeerRm removes a current peer from the cluster
PeerRm(id peer.ID) error
// Add imports files to the cluster from the given paths.
Add(paths []string, params *api.AddParams, out chan<- *api.AddedOutput) error
// AddMultiFile imports new files from a MultiFileReader.
AddMultiFile(multiFileR *files.MultiFileReader, params *api.AddParams, out chan<- *api.AddedOutput) error
// Pin tracks a Cid with the given replication factor and a name for
// human-friendliness.
Pin(ci *cid.Cid, replicationFactorMin, replicationFactorMax int, name string) error
// Unpin untracks a Cid from cluster.
Unpin(ci *cid.Cid) error
// Allocations returns the consensus state listing all tracked items
// and the peers that should be pinning them.
Allocations(filter api.PinType) ([]api.Pin, error)
// Allocation returns the current allocations for a given Cid.
Allocation(ci *cid.Cid) (api.Pin, error)
// Status returns the current ipfs state for a given Cid. If local is true,
// the information affects only the current peer, otherwise the information
// is fetched from all cluster peers.
Status(ci *cid.Cid, local bool) (api.GlobalPinInfo, error)
// StatusAll gathers Status() for all tracked items.
StatusAll(local bool) ([]api.GlobalPinInfo, error)
// Sync makes sure the state of a Cid corresponds to the state reported
// by the ipfs daemon, and returns it. If local is true, this operation
// only happens on the current peer, otherwise it happens on every
// cluster peer.
Sync(ci *cid.Cid, local bool) (api.GlobalPinInfo, error)
// SyncAll triggers Sync() operations for all tracked items. It only
// returns informations for items that were de-synced or have an error
// state. If local is true, the operation is limited to the current
// peer. Otherwise it happens on every cluster peer.
SyncAll(local bool) ([]api.GlobalPinInfo, error)
// Recover retriggers pin or unpin ipfs operations for a Cid in error
// state. If local is true, the operation is limited to the current
// peer, otherwise it happens on every cluster peer.
Recover(ci *cid.Cid, local bool) (api.GlobalPinInfo, error)
// RecoverAll triggers Recover() operations on all tracked items. If
// local is true, the operation is limited to the current peer.
// Otherwise, it happens everywhere.
RecoverAll(local bool) ([]api.GlobalPinInfo, error)
// Version returns the ipfs-cluster peer's version.
Version() (api.Version, error)
// GetConnectGraph returns an ipfs-cluster connection graph. The
// serialized version, strings instead of pids, is returned
GetConnectGraph() (api.ConnectGraphSerial, error)
}
// Config allows to configure the parameters to connect
// to the ipfs-cluster REST API.
type Config struct {
@ -79,9 +149,9 @@ type Config struct {
LogLevel string
}
// Client provides methods to interact with the ipfs-cluster API. Use
// NewClient() to create one.
type Client struct {
// DefaultClient provides methods to interact with the ipfs-cluster API. Use
// NewDefaultClient() to create one.
type defaultClient struct {
ctx context.Context
cancel func()
config *Config
@ -92,10 +162,10 @@ type Client struct {
p2p host.Host
}
// NewClient initializes a client given a Config.
func NewClient(cfg *Config) (*Client, error) {
// NewDefaultClient initializes a client given a Config.
func NewDefaultClient(cfg *Config) (Client, error) {
ctx := context.Background()
client := &Client{
client := &defaultClient{
ctx: ctx,
config: cfg,
}
@ -142,7 +212,7 @@ func NewClient(cfg *Config) (*Client, error) {
return client, nil
}
func (c *Client) setupAPIAddr() error {
func (c *defaultClient) setupAPIAddr() error {
var addr ma.Multiaddr
var err error
if c.config.APIAddr == nil {
@ -159,7 +229,7 @@ func (c *Client) setupAPIAddr() error {
return nil
}
func (c *Client) resolveAPIAddr() error {
func (c *defaultClient) resolveAPIAddr() error {
resolveCtx, cancel := context.WithTimeout(c.ctx, ResolveTimeout)
defer cancel()
resolved, err := madns.Resolve(resolveCtx, c.config.APIAddr)
@ -175,7 +245,7 @@ func (c *Client) resolveAPIAddr() error {
return nil
}
func (c *Client) setupHTTPClient() error {
func (c *defaultClient) setupHTTPClient() error {
var err error
switch {
@ -198,7 +268,7 @@ func (c *Client) setupHTTPClient() error {
return nil
}
func (c *Client) setupHostname() error {
func (c *defaultClient) setupHostname() error {
// Extract host:port form APIAddr or use Host:Port.
// For libp2p, hostname is set in enableLibp2p()
if !IsPeerAddress(c.config.APIAddr) {
@ -211,12 +281,12 @@ func (c *Client) setupHostname() error {
return nil
}
func (c *Client) setupProxy() error {
func (c *defaultClient) setupProxy() error {
if c.config.ProxyAddr != nil {
return nil
}
// Guess location from APIAddr
// Guess location from APIAddr
port, err := ma.NewMultiaddr(fmt.Sprintf("/tcp/%d", DefaultProxyPort))
if err != nil {
return err
@ -229,7 +299,7 @@ func (c *Client) setupProxy() error {
// configured ProxyAddr (or to the default ipfs-cluster's IPFS proxy port).
// It re-uses this Client's HTTP client, thus will be constrained by
// the same configurations affecting it (timeouts...).
func (c *Client) IPFS() *shell.Shell {
func (c *defaultClient) IPFS() *shell.Shell {
return shell.NewShellWithClient(c.config.ProxyAddr.String(), c.client)
}

View File

@ -64,34 +64,34 @@ func peerMAddr(a *rest.API) ma.Multiaddr {
return listenAddr.Encapsulate(ipfsAddr)
}
func testClientHTTP(t *testing.T, api *rest.API) *Client {
func testClientHTTP(t *testing.T, api *rest.API) *defaultClient {
cfg := &Config{
APIAddr: apiMAddr(api),
DisableKeepAlives: true,
}
c, err := NewClient(cfg)
c, err := NewDefaultClient(cfg)
if err != nil {
t.Fatal(err)
}
return c
return c.(*defaultClient)
}
func testClientLibp2p(t *testing.T, api *rest.API) *Client {
func testClientLibp2p(t *testing.T, api *rest.API) *defaultClient {
cfg := &Config{
APIAddr: peerMAddr(api),
ProtectorKey: make([]byte, 32),
DisableKeepAlives: true,
}
c, err := NewClient(cfg)
c, err := NewDefaultClient(cfg)
if err != nil {
t.Fatal(err)
}
return c
return c.(*defaultClient)
}
func TestNewClient(t *testing.T) {
func TestNewDefaultClient(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
@ -111,15 +111,16 @@ func TestDefaultAddress(t *testing.T) {
APIAddr: nil,
DisableKeepAlives: true,
}
c, err := NewClient(cfg)
c, err := NewDefaultClient(cfg)
if err != nil {
t.Fatal(err)
}
if c.hostname != "127.0.0.1:9094" {
dc := c.(*defaultClient)
if dc.hostname != "127.0.0.1:9094" {
t.Error("default should be used")
}
if c.config.ProxyAddr == nil || c.config.ProxyAddr.String() != "/ip4/127.0.0.1/tcp/9095" {
if dc.config.ProxyAddr == nil || dc.config.ProxyAddr.String() != "/ip4/127.0.0.1/tcp/9095" {
t.Error("proxy address was not guessed correctly")
}
}
@ -132,15 +133,16 @@ func TestMultiaddressPrecedence(t *testing.T) {
Port: "9094",
DisableKeepAlives: true,
}
c, err := NewClient(cfg)
c, err := NewDefaultClient(cfg)
if err != nil {
t.Fatal(err)
}
if c.hostname != "1.2.3.4:1234" {
dc := c.(*defaultClient)
if dc.hostname != "1.2.3.4:1234" {
t.Error("APIAddr should be used")
}
if c.config.ProxyAddr == nil || c.config.ProxyAddr.String() != "/ip4/1.2.3.4/tcp/9095" {
if dc.config.ProxyAddr == nil || dc.config.ProxyAddr.String() != "/ip4/1.2.3.4/tcp/9095" {
t.Error("proxy address was not guessed correctly")
}
}
@ -152,15 +154,16 @@ func TestHostPort(t *testing.T) {
Port: "9094",
DisableKeepAlives: true,
}
c, err := NewClient(cfg)
c, err := NewDefaultClient(cfg)
if err != nil {
t.Fatal(err)
}
if c.hostname != "3.3.1.1:9094" {
dc := c.(*defaultClient)
if dc.hostname != "3.3.1.1:9094" {
t.Error("Host Port should be used")
}
if c.config.ProxyAddr == nil || c.config.ProxyAddr.String() != "/ip4/3.3.1.1/tcp/9095" {
if dc.config.ProxyAddr == nil || dc.config.ProxyAddr.String() != "/ip4/3.3.1.1/tcp/9095" {
t.Error("proxy address was not guessed correctly")
}
}
@ -173,15 +176,16 @@ func TestDNSMultiaddress(t *testing.T) {
Port: "9094",
DisableKeepAlives: true,
}
c, err := NewClient(cfg)
c, err := NewDefaultClient(cfg)
if err != nil {
t.Fatal(err)
}
if c.hostname != "127.0.0.1:1234" {
dc := c.(*defaultClient)
if dc.hostname != "127.0.0.1:1234" {
t.Error("bad resolved address")
}
if c.config.ProxyAddr == nil || c.config.ProxyAddr.String() != "/ip4/127.0.0.1/tcp/9095" {
if dc.config.ProxyAddr == nil || dc.config.ProxyAddr.String() != "/ip4/127.0.0.1/tcp/9095" {
t.Error("proxy address was not guessed correctly")
}
}
@ -194,15 +198,16 @@ func TestPeerAddress(t *testing.T) {
Port: "9094",
DisableKeepAlives: true,
}
c, err := NewClient(cfg)
c, err := NewDefaultClient(cfg)
if err != nil {
t.Fatal(err)
}
if c.hostname != "QmP7R7gWEnruNePxmCa9GBa4VmUNexLVnb1v47R8Gyo3LP" || c.net != "libp2p" {
dc := c.(*defaultClient)
if dc.hostname != "QmP7R7gWEnruNePxmCa9GBa4VmUNexLVnb1v47R8Gyo3LP" || dc.net != "libp2p" {
t.Error("bad resolved address")
}
if c.config.ProxyAddr == nil || c.config.ProxyAddr.String() != "/ip4/127.0.0.1/tcp/9095" {
if dc.config.ProxyAddr == nil || dc.config.ProxyAddr.String() != "/ip4/127.0.0.1/tcp/9095" {
t.Error("proxy address was not guessed correctly")
}
}
@ -213,12 +218,12 @@ func TestProxyAddress(t *testing.T) {
DisableKeepAlives: true,
ProxyAddr: addr,
}
c, err := NewClient(cfg)
c, err := NewDefaultClient(cfg)
if err != nil {
t.Fatal(err)
}
if c.config.ProxyAddr.String() != addr.String() {
dc := c.(*defaultClient)
if dc.config.ProxyAddr.String() != addr.String() {
t.Error("proxy address was replaced")
}
}
@ -239,12 +244,12 @@ func TestIPFS(t *testing.T) {
ProxyAddr: proxyAddr,
}
c, err := NewClient(cfg)
c, err := NewDefaultClient(cfg)
if err != nil {
t.Fatal(err)
}
ipfs := c.IPFS()
dc := c.(*defaultClient)
ipfs := dc.IPFS()
err = ipfs.Pin(test.TestCid1)
if err != nil {

View File

@ -20,14 +20,14 @@ import (
)
// ID returns information about the cluster Peer.
func (c *Client) ID() (api.ID, error) {
func (c *defaultClient) ID() (api.ID, error) {
var id api.IDSerial
err := c.do("GET", "/id", nil, nil, &id)
return id.ToID(), err
}
// Peers requests ID information for all cluster peers.
func (c *Client) Peers() ([]api.ID, error) {
func (c *defaultClient) Peers() ([]api.ID, error) {
var ids []api.IDSerial
err := c.do("GET", "/peers", nil, nil, &ids)
result := make([]api.ID, len(ids))
@ -42,7 +42,7 @@ type peerAddBody struct {
}
// PeerAdd adds a new peer to the cluster.
func (c *Client) PeerAdd(pid peer.ID) (api.ID, error) {
func (c *defaultClient) PeerAdd(pid peer.ID) (api.ID, error) {
pidStr := peer.IDB58Encode(pid)
body := peerAddBody{pidStr}
@ -56,13 +56,13 @@ func (c *Client) PeerAdd(pid peer.ID) (api.ID, error) {
}
// PeerRm removes a current peer from the cluster
func (c *Client) PeerRm(id peer.ID) error {
func (c *defaultClient) PeerRm(id peer.ID) error {
return c.do("DELETE", fmt.Sprintf("/peers/%s", id.Pretty()), nil, nil, nil)
}
// Pin tracks a Cid with the given replication factor and a name for
// human-friendliness.
func (c *Client) Pin(ci cid.Cid, replicationFactorMin, replicationFactorMax int, name string) error {
func (c *defaultClient) Pin(ci *cid.Cid, replicationFactorMin, replicationFactorMax int, name string) error {
escName := url.QueryEscape(name)
err := c.do(
"POST",
@ -81,13 +81,13 @@ func (c *Client) Pin(ci cid.Cid, replicationFactorMin, replicationFactorMax int,
}
// Unpin untracks a Cid from cluster.
func (c *Client) Unpin(ci cid.Cid) error {
func (c *defaultClient) Unpin(ci *cid.Cid) error {
return c.do("DELETE", fmt.Sprintf("/pins/%s", ci.String()), nil, nil, nil)
}
// Allocations returns the consensus state listing all tracked items and
// the peers that should be pinning them.
func (c *Client) Allocations(filter api.PinType) ([]api.Pin, error) {
func (c *defaultClient) Allocations(filter api.PinType) ([]api.Pin, error) {
var pins []api.PinSerial
types := []api.PinType{
@ -118,7 +118,7 @@ func (c *Client) Allocations(filter api.PinType) ([]api.Pin, error) {
}
// Allocation returns the current allocations for a given Cid.
func (c *Client) Allocation(ci cid.Cid) (api.Pin, error) {
func (c *defaultClient) Allocation(ci *cid.Cid) (api.Pin, error) {
var pin api.PinSerial
err := c.do("GET", fmt.Sprintf("/allocations/%s", ci.String()), nil, nil, &pin)
return pin.ToPin(), err
@ -127,14 +127,14 @@ func (c *Client) Allocation(ci cid.Cid) (api.Pin, error) {
// Status returns the current ipfs state for a given Cid. If local is true,
// the information affects only the current peer, otherwise the information
// is fetched from all cluster peers.
func (c *Client) Status(ci cid.Cid, local bool) (api.GlobalPinInfo, error) {
func (c *defaultClient) Status(ci *cid.Cid, local bool) (api.GlobalPinInfo, error) {
var gpi api.GlobalPinInfoSerial
err := c.do("GET", fmt.Sprintf("/pins/%s?local=%t", ci.String(), local), nil, nil, &gpi)
return gpi.ToGlobalPinInfo(), err
}
// StatusAll gathers Status() for all tracked items.
func (c *Client) StatusAll(local bool) ([]api.GlobalPinInfo, error) {
func (c *defaultClient) StatusAll(local bool) ([]api.GlobalPinInfo, error) {
var gpis []api.GlobalPinInfoSerial
err := c.do("GET", fmt.Sprintf("/pins?local=%t", local), nil, nil, &gpis)
result := make([]api.GlobalPinInfo, len(gpis))
@ -147,7 +147,7 @@ func (c *Client) StatusAll(local bool) ([]api.GlobalPinInfo, error) {
// Sync makes sure the state of a Cid corresponds to the state reported by
// the ipfs daemon, and returns it. If local is true, this operation only
// happens on the current peer, otherwise it happens on every cluster peer.
func (c *Client) Sync(ci cid.Cid, local bool) (api.GlobalPinInfo, error) {
func (c *defaultClient) Sync(ci *cid.Cid, local bool) (api.GlobalPinInfo, error) {
var gpi api.GlobalPinInfoSerial
err := c.do("POST", fmt.Sprintf("/pins/%s/sync?local=%t", ci.String(), local), nil, nil, &gpi)
return gpi.ToGlobalPinInfo(), err
@ -157,7 +157,7 @@ func (c *Client) Sync(ci cid.Cid, local bool) (api.GlobalPinInfo, error) {
// informations for items that were de-synced or have an error state. If
// local is true, the operation is limited to the current peer. Otherwise
// it happens on every cluster peer.
func (c *Client) SyncAll(local bool) ([]api.GlobalPinInfo, error) {
func (c *defaultClient) SyncAll(local bool) ([]api.GlobalPinInfo, error) {
var gpis []api.GlobalPinInfoSerial
err := c.do("POST", fmt.Sprintf("/pins/sync?local=%t", local), nil, nil, &gpis)
result := make([]api.GlobalPinInfo, len(gpis))
@ -170,7 +170,7 @@ func (c *Client) SyncAll(local bool) ([]api.GlobalPinInfo, error) {
// Recover retriggers pin or unpin ipfs operations for a Cid in error state.
// If local is true, the operation is limited to the current peer, otherwise
// it happens on every cluster peer.
func (c *Client) Recover(ci cid.Cid, local bool) (api.GlobalPinInfo, error) {
func (c *defaultClient) Recover(ci *cid.Cid, local bool) (api.GlobalPinInfo, error) {
var gpi api.GlobalPinInfoSerial
err := c.do("POST", fmt.Sprintf("/pins/%s/recover?local=%t", ci.String(), local), nil, nil, &gpi)
return gpi.ToGlobalPinInfo(), err
@ -179,7 +179,7 @@ func (c *Client) Recover(ci cid.Cid, local bool) (api.GlobalPinInfo, error) {
// RecoverAll triggers Recover() operations on all tracked items. If local is
// true, the operation is limited to the current peer. Otherwise, it happens
// everywhere.
func (c *Client) RecoverAll(local bool) ([]api.GlobalPinInfo, error) {
func (c *defaultClient) RecoverAll(local bool) ([]api.GlobalPinInfo, error) {
var gpis []api.GlobalPinInfoSerial
err := c.do("POST", fmt.Sprintf("/pins/recover?local=%t", local), nil, nil, &gpis)
result := make([]api.GlobalPinInfo, len(gpis))
@ -190,7 +190,7 @@ func (c *Client) RecoverAll(local bool) ([]api.GlobalPinInfo, error) {
}
// Version returns the ipfs-cluster peer's version.
func (c *Client) Version() (api.Version, error) {
func (c *defaultClient) Version() (api.Version, error) {
var ver api.Version
err := c.do("GET", "/version", nil, nil, &ver)
return ver, err
@ -198,7 +198,7 @@ func (c *Client) Version() (api.Version, error) {
// GetConnectGraph returns an ipfs-cluster connection graph.
// The serialized version, strings instead of pids, is returned
func (c *Client) GetConnectGraph() (api.ConnectGraphSerial, error) {
func (c *defaultClient) GetConnectGraph() (api.ConnectGraphSerial, error) {
var graphS api.ConnectGraphSerial
err := c.do("GET", "/health/graph", nil, nil, &graphS)
return graphS, err
@ -207,7 +207,7 @@ func (c *Client) GetConnectGraph() (api.ConnectGraphSerial, error) {
// WaitFor is a utility function that allows for a caller to
// wait for a paticular status for a CID. It returns a channel
// upon which the caller can wait for the targetStatus.
func (c *Client) WaitFor(ctx context.Context, fp StatusFilterParams) (api.GlobalPinInfo, error) {
func WaitFor(ctx context.Context, c Client, fp StatusFilterParams) (api.GlobalPinInfo, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -285,7 +285,7 @@ func (sf *statusFilter) filter(ctx context.Context, fp StatusFilterParams) {
}
}
func (sf *statusFilter) pollStatus(ctx context.Context, c *Client, fp StatusFilterParams) {
func (sf *statusFilter) pollStatus(ctx context.Context, c Client, fp StatusFilterParams) {
ticker := time.NewTicker(fp.CheckFreq)
defer ticker.Stop()
@ -295,7 +295,7 @@ func (sf *statusFilter) pollStatus(ctx context.Context, c *Client, fp StatusFilt
sf.Err <- ctx.Err()
return
case <-ticker.C:
gblPinInfo, err := c.Status(fp.Cid, fp.Local)
gblPinInfo, err := c.Status(&fp.Cid, fp.Local)
if err != nil {
sf.Err <- err
return
@ -365,7 +365,7 @@ func makeSerialFile(fpath string, params *api.AddParams) (files.File, error) {
// sharding the resulting DAG across the IPFS daemons of multiple cluster
// peers. The output channel will receive regular updates as the adding
// process progresses.
func (c *Client) Add(
func (c *defaultClient) Add(
paths []string,
params *api.AddParams,
out chan<- *api.AddedOutput,
@ -400,7 +400,7 @@ func (c *Client) Add(
}
// AddMultiFile imports new files from a MultiFileReader. See Add().
func (c *Client) AddMultiFile(
func (c *defaultClient) AddMultiFile(
multiFileR *files.MultiFileReader,
params *api.AddParams,
out chan<- *api.AddedOutput,

View File

@ -18,7 +18,7 @@ import (
"github.com/ipfs/ipfs-cluster/test"
)
func testClients(t *testing.T, api *rest.API, f func(*testing.T, *Client)) {
func testClients(t *testing.T, api *rest.API, f func(*testing.T, Client)) {
t.Run("in-parallel", func(t *testing.T) {
t.Run("libp2p", func(t *testing.T) {
t.Parallel()
@ -35,7 +35,7 @@ func TestVersion(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
v, err := c.Version()
if err != nil || v.Version == "" {
t.Logf("%+v", v)
@ -51,7 +51,7 @@ func TestID(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
id, err := c.ID()
if err != nil {
t.Fatal(err)
@ -68,7 +68,7 @@ func TestPeers(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
ids, err := c.Peers()
if err != nil {
t.Fatal(err)
@ -85,9 +85,9 @@ func TestPeersWithError(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/44444")
c, _ = NewClient(&Config{APIAddr: addr, DisableKeepAlives: true})
c, _ = NewDefaultClient(&Config{APIAddr: addr, DisableKeepAlives: true})
ids, err := c.Peers()
if err == nil {
t.Fatal("expected error")
@ -104,7 +104,7 @@ func TestPeerAdd(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
id, err := c.PeerAdd(test.TestPeerID1)
if err != nil {
t.Fatal(err)
@ -121,7 +121,7 @@ func TestPeerRm(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
err := c.PeerRm(test.TestPeerID1)
if err != nil {
t.Fatal(err)
@ -135,9 +135,9 @@ func TestPin(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
ci, _ := cid.Decode(test.TestCid1)
err := c.Pin(ci, 6, 7, "hello")
err := c.Pin(&ci, 6, 7, "hello")
if err != nil {
t.Fatal(err)
}
@ -150,9 +150,9 @@ func TestUnpin(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
ci, _ := cid.Decode(test.TestCid1)
err := c.Unpin(ci)
err := c.Unpin(&ci)
if err != nil {
t.Fatal(err)
}
@ -165,7 +165,7 @@ func TestAllocations(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
pins, err := c.Allocations(types.DataType | types.MetaType)
if err != nil {
t.Fatal(err)
@ -182,9 +182,9 @@ func TestAllocation(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
ci, _ := cid.Decode(test.TestCid1)
pin, err := c.Allocation(ci)
pin, err := c.Allocation(&ci)
if err != nil {
t.Fatal(err)
}
@ -200,9 +200,9 @@ func TestStatus(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
ci, _ := cid.Decode(test.TestCid1)
pin, err := c.Status(ci, false)
pin, err := c.Status(&ci, false)
if err != nil {
t.Fatal(err)
}
@ -218,7 +218,7 @@ func TestStatusAll(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
pins, err := c.StatusAll(false)
if err != nil {
t.Fatal(err)
@ -236,9 +236,9 @@ func TestSync(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
ci, _ := cid.Decode(test.TestCid1)
pin, err := c.Sync(ci, false)
pin, err := c.Sync(&ci, false)
if err != nil {
t.Fatal(err)
}
@ -254,7 +254,7 @@ func TestSyncAll(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
pins, err := c.SyncAll(false)
if err != nil {
t.Fatal(err)
@ -272,9 +272,9 @@ func TestRecover(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
ci, _ := cid.Decode(test.TestCid1)
pin, err := c.Recover(ci, false)
pin, err := c.Recover(&ci, false)
if err != nil {
t.Fatal(err)
}
@ -290,7 +290,7 @@ func TestRecoverAll(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
_, err := c.RecoverAll(true)
if err != nil {
t.Fatal(err)
@ -304,7 +304,7 @@ func TestGetConnectGraph(t *testing.T) {
api := testAPI(t)
defer shutdown(api)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
cg, err := c.GetConnectGraph()
if err != nil {
t.Fatal(err)
@ -388,7 +388,7 @@ func TestWaitFor(t *testing.T) {
tapi.SetClient(rpcC)
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
ci, _ := cid.Decode(test.TestCid1)
var wg sync.WaitGroup
@ -406,7 +406,7 @@ func TestWaitFor(t *testing.T) {
}
start := time.Now()
st, err := c.WaitFor(ctx, fp)
st, err := WaitFor(ctx, c, fp)
if err != nil {
t.Fatal(err)
}
@ -420,7 +420,7 @@ func TestWaitFor(t *testing.T) {
}
}
}()
err := c.Pin(ci, 0, 0, "test")
err := c.Pin(&ci, 0, 0, "test")
if err != nil {
t.Fatal(err)
}
@ -434,7 +434,7 @@ func TestAddMultiFile(t *testing.T) {
api := testAPI(t)
defer api.Shutdown()
testF := func(t *testing.T, c *Client) {
testF := func(t *testing.T, c Client) {
sth := test.NewShardingTestHelper()
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()

View File

@ -12,7 +12,7 @@ import (
type responseDecoder func(d *json.Decoder) error
func (c *Client) do(
func (c *defaultClient) do(
method, path string,
headers map[string]string,
body io.Reader,
@ -26,7 +26,7 @@ func (c *Client) do(
return c.handleResponse(resp, obj)
}
func (c *Client) doStream(
func (c *defaultClient) doStream(
method, path string,
headers map[string]string,
body io.Reader,
@ -40,7 +40,7 @@ func (c *Client) doStream(
return c.handleStreamResponse(resp, outHandler)
}
func (c *Client) doRequest(
func (c *defaultClient) doRequest(
method, path string,
headers map[string]string,
body io.Reader,
@ -73,7 +73,7 @@ func (c *Client) doRequest(
return c.client.Do(r)
}
func (c *Client) handleResponse(resp *http.Response, obj interface{}) error {
func (c *defaultClient) handleResponse(resp *http.Response, obj interface{}) error {
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
@ -110,7 +110,7 @@ func (c *Client) handleResponse(resp *http.Response, obj interface{}) error {
return nil
}
func (c *Client) handleStreamResponse(resp *http.Response, handler responseDecoder) error {
func (c *defaultClient) handleStreamResponse(resp *http.Response, handler responseDecoder) error {
if resp.StatusCode > 399 && resp.StatusCode < 600 {
return c.handleResponse(resp, nil)
}

View File

@ -22,7 +22,7 @@ import (
// with it since it's a global variable, and we don't know who else uses
// it, so we create our own.
// TODO: Allow more configuration options.
func (c *Client) defaultTransport() {
func (c *defaultClient) defaultTransport() {
c.transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
@ -38,7 +38,7 @@ func (c *Client) defaultTransport() {
c.net = "http"
}
func (c *Client) enableLibp2p() error {
func (c *defaultClient) enableLibp2p() error {
c.defaultTransport()
pid, addr, err := api.Libp2pMultiaddrSplit(c.config.APIAddr)
@ -80,7 +80,7 @@ func (c *Client) enableLibp2p() error {
return nil
}
func (c *Client) enableTLS() error {
func (c *defaultClient) enableTLS() error {
c.defaultTransport()
// based on https://github.com/denji/golang-tls
c.transport.TLSClientConfig = &tls.Config{

View File

@ -40,7 +40,7 @@ var (
var logger = logging.Logger("cluster-ctl")
var globalClient *client.Client
var globalClient client.Client
// Description provides a short summary of the functionality of this tool
var Description = fmt.Sprintf(`
@ -172,7 +172,7 @@ requires authorization. implies --https, which you can disable with --force-http
checkErr("", errors.New("unsupported encoding"))
}
globalClient, err = client.NewClient(cfg)
globalClient, err = client.NewDefaultClient(cfg)
checkErr("creating API client", err)
return nil
}
@ -494,7 +494,7 @@ peers should pin this content.
rplMax = rpl
}
cerr := globalClient.Pin(ci, rplMin, rplMax, c.String("name"))
cerr := globalClient.Pin(&ci, rplMin, rplMax, c.String("name"))
if cerr != nil {
formatResponse(c, nil, cerr)
return nil
@ -539,7 +539,7 @@ although unpinning operations in the cluster may take longer or fail.
cidStr := c.Args().First()
ci, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
cerr := globalClient.Unpin(ci)
cerr := globalClient.Unpin(&ci)
if cerr != nil {
formatResponse(c, nil, cerr)
return nil
@ -583,7 +583,7 @@ The filter only takes effect when listing all pins. The possible values are:
if cidStr != "" {
ci, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
resp, cerr := globalClient.Allocation(ci)
resp, cerr := globalClient.Allocation(&ci)
formatResponse(c, resp, cerr)
} else {
var filter api.PinType
@ -624,7 +624,7 @@ contacted cluster peer. By default, status will be fetched from all peers.
if cidStr != "" {
ci, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
resp, cerr := globalClient.Status(ci, c.Bool("local"))
resp, cerr := globalClient.Status(&ci, c.Bool("local"))
formatResponse(c, resp, cerr)
} else {
resp, cerr := globalClient.StatusAll(c.Bool("local"))
@ -660,7 +660,7 @@ operations on the contacted peer. By default, all peers will sync.
if cidStr != "" {
ci, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
resp, cerr := globalClient.Sync(ci, c.Bool("local"))
resp, cerr := globalClient.Sync(&ci, c.Bool("local"))
formatResponse(c, resp, cerr)
} else {
resp, cerr := globalClient.SyncAll(c.Bool("local"))
@ -692,7 +692,7 @@ operations on the contacted peer (as opposed to on every peer).
if cidStr != "" {
ci, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
resp, cerr := globalClient.Recover(ci, c.Bool("local"))
resp, cerr := globalClient.Recover(&ci, c.Bool("local"))
formatResponse(c, resp, cerr)
} else {
resp, cerr := globalClient.RecoverAll(c.Bool("local"))
@ -877,7 +877,7 @@ func handlePinResponseFormatFlags(
if status.Cid == cid.Undef { // no status from "wait"
time.Sleep(time.Second)
status, cerr = globalClient.Status(ci, false)
status, cerr = globalClient.Status(&ci, false)
}
formatResponse(c, status, cerr)
}
@ -903,5 +903,5 @@ func waitFor(
CheckFreq: defaultWaitCheckFreq,
}
return globalClient.WaitFor(ctx, fp)
return client.WaitFor(ctx, globalClient, fp)
}

View File

@ -4,7 +4,8 @@
"url": "https://github.com/ipfs/ipfs-cluster"
},
"gx": {
"dvcsimport": "github.com/ipfs/ipfs-cluster"
"dvcsimport": "github.com/ipfs/ipfs-cluster",
"goversion": "1.10"
},
"gxDependencies": [
{