Improve pin/unpin method signatures (#843)

* Improve pin/unpin method signatures:

This changes the following Cluster Go API methods:

* -> Cluster.Pin(ctx, cid, options) (pin, error)
* -> Cluster.Unpin(ctx, cid) (pin, error)
* -> Cluster.PinPath(ctx, path, opts) (pin,error)

Pin and Unpin now return the pinned object.

The signature of these methods now matches that of the API Client, makes it
clearer which options the user can set, and aligns with PinPath and UnpinPath,
which already returned Pin objects.
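A minimal sketch of the new call sites (illustrative only; "cluster", "ctx"
and the cid "c" are assumed to exist):

    opts := api.PinOptions{
        ReplicationFactorMin: 1,
        ReplicationFactorMax: 2,
        Name:                 "example",
    }
    pin, err := cluster.Pin(ctx, c, opts)
    if err != nil {
        return err
    }
    logger.Infof("pinned %s (%s)", pin.Cid, pin.Name)

    removed, err := cluster.Unpin(ctx, c)
    if err != nil {
        return err
    }
    logger.Infof("unpinned %s", removed.Cid)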

The REST API now returns the pinned/unpinned object with 200 OK rather than
202 Accepted.

This was necessary for a cleaner pin/update approach, which I'm working on in
another branch.

Most of the changes here update tests and internal callers to the new
signatures.
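The recurring mechanical change in internal callers is that the RPC response
argument becomes an api.Pin instead of &struct{}{}; for example (pattern as in
the adder diff below, where the rpcClient name varies per component):

    var pinResp api.Pin
    err := rpcClient.CallContext(
        ctx,
        "", // use ourself to pin
        "Cluster",
        "Pin",
        pin,
        &pinResp,
    )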

* Adapt load-balancing client to new Pin/Unpin signatures

* cluster.go: Fix typo

Co-Authored-By: Kishan Sagathiya <kishansagathiya@gmail.com>

* cluster.go: Fix typo

Co-Authored-By: Kishan Sagathiya <kishansagathiya@gmail.com>
Hector Sanjuan 2019-07-22 15:39:11 +02:00 committed by GitHub
parent b6ba67804f
commit 7c636061bd
15 changed files with 193 additions and 144 deletions

View File

@ -73,13 +73,14 @@ func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, err
dgs.dests = nil
var pinResp api.Pin
return root, dgs.rpcClient.CallContext(
ctx,
"",
"Cluster",
"Pin",
rootPin,
&struct{}{},
&pinResp,
)
}

View File

@ -62,13 +62,14 @@ func Pin(ctx context.Context, rpc *rpc.Client, pin *api.Pin) error {
pin.Allocations = []peer.ID{}
}
logger.Debugf("adder pinning %+v", pin)
var pinResp api.Pin
return rpc.CallContext(
ctx,
"", // use ourself to pin
"Cluster",
"Pin",
pin,
&struct{}{},
&pinResp,
)
}

View File

@ -485,6 +485,7 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) {
// If unpin != "false", unpin the FROM argument
// (it was already resolved).
var pinObj api.Pin
if unpin {
err = proxy.rpcClient.CallContext(
ctx,
@ -492,7 +493,7 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) {
"Cluster",
"Unpin",
&fromPin,
&struct{}{},
&pinObj,
)
if err != nil {
ipfsErrorResponder(w, err.Error(), -1)
@ -571,13 +572,14 @@ func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
// Unpin because the user doesn't want to pin
time.Sleep(100 * time.Millisecond)
var pinObj api.Pin
err = proxy.rpcClient.CallContext(
proxy.ctx,
"",
"Cluster",
"Unpin",
root,
&struct{}{},
&pinObj,
)
if err != nil {
w.Header().Set("X-Stream-Error", err.Error())

View File

@ -63,9 +63,9 @@ type Client interface {
// Pin tracks a Cid with the given replication factor and a name for
// human-friendliness.
Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions) error
Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions) (*api.Pin, error)
// Unpin untracks a Cid from cluster.
Unpin(ctx context.Context, ci cid.Cid) error
Unpin(ctx context.Context, ci cid.Cid) (*api.Pin, error)
// PinPath resolves given path into a cid and performs the pin operation.
PinPath(ctx context.Context, path string, opts api.PinOptions) (*api.Pin, error)

View File

@ -157,21 +157,29 @@ func (lc *loadBalancingClient) PeerRm(ctx context.Context, id peer.ID) error {
// Pin tracks a Cid with the given replication factor and a name for
// human-friendliness.
func (lc *loadBalancingClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions) error {
func (lc *loadBalancingClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions) (*api.Pin, error) {
var pin *api.Pin
call := func(c Client) error {
return c.Pin(ctx, ci, opts)
var err error
pin, err = c.Pin(ctx, ci, opts)
return err
}
return lc.retry(0, call)
err := lc.retry(0, call)
return pin, err
}
// Unpin untracks a Cid from cluster.
func (lc *loadBalancingClient) Unpin(ctx context.Context, ci cid.Cid) error {
func (lc *loadBalancingClient) Unpin(ctx context.Context, ci cid.Cid) (*api.Pin, error) {
var pin *api.Pin
call := func(c Client) error {
return c.Unpin(ctx, ci)
var err error
pin, err = c.Unpin(ctx, ci)
return err
}
return lc.retry(0, call)
err := lc.retry(0, call)
return pin, err
}
// PinPath allows pinning an element by the given IPFS path.

View File

@ -34,9 +34,9 @@ func TestLBClient(t *testing.T) {
// With Failover strategy, it would go through first 5 empty clients
// and then 6th working client. Thus, all requests should always succeed.
testRunManyRequestsConcurrently(t, cfgs, &Failover{}, 1000, 6, true)
testRunManyRequestsConcurrently(t, cfgs, &Failover{}, 200, 6, true)
// First 5 clients are empty. Thus, all requests should fail.
testRunManyRequestsConcurrently(t, cfgs, &Failover{}, 1000, 5, false)
testRunManyRequestsConcurrently(t, cfgs, &Failover{}, 200, 5, false)
}
func testRunManyRequestsConcurrently(t *testing.T, cfgs []*Config, strategy LBStrategy, requests int, retries int, pass bool) {

View File

@ -74,10 +74,11 @@ func (c *defaultClient) PeerRm(ctx context.Context, id peer.ID) error {
// Pin tracks a Cid with the given replication factor and a name for
// human-friendliness.
func (c *defaultClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions) error {
func (c *defaultClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions) (*api.Pin, error) {
ctx, span := trace.StartSpan(ctx, "client/Pin")
defer span.End()
var pin api.Pin
err := c.do(
ctx,
"POST",
@ -88,16 +89,24 @@ func (c *defaultClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions
),
nil,
nil,
nil,
&pin,
)
return err
if err != nil {
return nil, err
}
return &pin, nil
}
// Unpin untracks a Cid from cluster.
func (c *defaultClient) Unpin(ctx context.Context, ci cid.Cid) error {
func (c *defaultClient) Unpin(ctx context.Context, ci cid.Cid) (*api.Pin, error) {
ctx, span := trace.StartSpan(ctx, "client/Unpin")
defer span.End()
return c.do(ctx, "DELETE", fmt.Sprintf("/pins/%s", ci.String()), nil, nil, nil)
var pin api.Pin
err := c.do(ctx, "DELETE", fmt.Sprintf("/pins/%s", ci.String()), nil, nil, &pin)
if err != nil {
return nil, err
}
return &pin, nil
}
// PinPath allows pinning an element by the given IPFS path.

View File

@ -147,7 +147,7 @@ func TestPin(t *testing.T) {
ReplicationFactorMax: 7,
Name: "hello there",
}
err := c.Pin(ctx, test.Cid1, opts)
_, err := c.Pin(ctx, test.Cid1, opts)
if err != nil {
t.Fatal(err)
}
@ -162,7 +162,7 @@ func TestUnpin(t *testing.T) {
defer shutdown(api)
testF := func(t *testing.T, c Client) {
err := c.Unpin(ctx, test.Cid1)
_, err := c.Unpin(ctx, test.Cid1)
if err != nil {
t.Fatal(err)
}
@ -489,10 +489,11 @@ type waitService struct {
pinStart time.Time
}
func (wait *waitService) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
func (wait *waitService) Pin(ctx context.Context, in *api.Pin, out *api.Pin) error {
wait.l.Lock()
defer wait.l.Unlock()
wait.pinStart = time.Now()
*out = *in
return nil
}
@ -585,7 +586,7 @@ func TestWaitFor(t *testing.T) {
}
}
}()
err := c.Pin(ctx, test.Cid1, types.PinOptions{ReplicationFactorMin: 0, ReplicationFactorMax: 0, Name: "test", ShardSize: 0})
_, err := c.Pin(ctx, test.Cid1, types.PinOptions{ReplicationFactorMin: 0, ReplicationFactorMax: 0, Name: "test", ShardSize: 0})
if err != nil {
t.Fatal(err)
}

View File

@ -672,15 +672,16 @@ func (api *API) pinHandler(w http.ResponseWriter, r *http.Request) {
if pin := api.parseCidOrError(w, r); pin != nil {
logger.Debugf("rest api pinHandler: %s", pin.Cid)
// span.AddAttributes(trace.StringAttribute("cid", pin.Cid))
var pinObj types.Pin
err := api.rpcClient.CallContext(
r.Context(),
"",
"Cluster",
"Pin",
pin,
&struct{}{},
&pinObj,
)
api.sendResponse(w, http.StatusAccepted, err, nil)
api.sendResponse(w, http.StatusOK, err, pinObj)
logger.Debug("rest api pinHandler done")
}
}
@ -689,15 +690,16 @@ func (api *API) unpinHandler(w http.ResponseWriter, r *http.Request) {
if pin := api.parseCidOrError(w, r); pin != nil {
logger.Debugf("rest api unpinHandler: %s", pin.Cid)
// span.AddAttributes(trace.StringAttribute("cid", pin.Cid))
var pinObj types.Pin
err := api.rpcClient.CallContext(
r.Context(),
"",
"Cluster",
"Unpin",
pin,
&struct{}{},
&pinObj,
)
api.sendResponse(w, http.StatusAccepted, err, nil)
api.sendResponse(w, http.StatusOK, err, pinObj)
logger.Debug("rest api unpinHandler done")
}
}

View File

@ -485,7 +485,7 @@ func (c *Cluster) repinFromPeer(ctx context.Context, p peer.ID) {
}
for _, pin := range list {
if containsPeer(pin.Allocations, p) {
_, ok, err := c.pin(ctx, pin, []peer.ID{p}, []peer.ID{}) // pin blacklisting this peer
_, ok, err := c.pin(ctx, pin, []peer.ID{p}) // pin blacklisting this peer
if ok && err == nil {
logger.Infof("repinned %s out of %s", pin.Cid, p.Pretty())
}
@ -1166,23 +1166,26 @@ func (c *Cluster) PinGet(ctx context.Context, h cid.Cid) (*api.Pin, error) {
// pinning strategy, the PinTracker may then request the IPFS daemon
// to pin the Cid.
//
// Pin returns an error if the operation could not be persisted
// to the global state. Pin does not reflect the success or failure
// of underlying IPFS daemon pinning operations.
// Pin returns the Pin as stored in the global state (with the given
// allocations) and an error if the operation could not be persisted. Pin does
// not reflect the success or failure of underlying IPFS daemon pinning
// operations, which happen in async fashion.
//
// If the argument's allocations are non-empty then these peers are pinned with
// priority over other peers in the cluster. If the max repl factor is less
// than the size of the specified peerset then peers are chosen from this set
// in allocation order. If the min repl factor is greater than the size of
// this set then the remaining peers are allocated in order from the rest of
// the cluster. Priority allocations are best effort. If any priority peers
// are unavailable then Pin will simply allocate from the rest of the cluster.
func (c *Cluster) Pin(ctx context.Context, pin *api.Pin) error {
// If the options UserAllocations are non-empty then these peers are pinned
// with priority over other peers in the cluster. If the max repl factor is
// less than the size of the specified peerset then peers are chosen from this
// set in allocation order. If the minimum repl factor is greater than the
// size of this set then the remaining peers are allocated in order from the
// rest of the cluster. Priority allocations are best effort. If any priority
// peers are unavailable then Pin will simply allocate from the rest of the
// cluster.
func (c *Cluster) Pin(ctx context.Context, h cid.Cid, opts api.PinOptions) (*api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/Pin")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
_, _, err := c.pin(ctx, pin, []peer.ID{}, pin.UserAllocations)
return err
pin := api.PinWithOpts(h, opts)
result, _, err := c.pin(ctx, pin, []peer.ID{})
return result, err
}
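// Illustrative sketch, not part of this patch: priority allocations are now
// requested through the options, e.g. with hypothetical peer IDs p1, p2:
//
//   pin, err := c.Pin(ctx, h, api.PinOptions{
//       ReplicationFactorMin: 1,
//       ReplicationFactorMax: 2,
//       UserAllocations:      []peer.ID{p1, p2},
//   })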
// sets the default replication factor in a pin when it's set to 0
@ -1267,11 +1270,15 @@ func (c *Cluster) setupPin(ctx context.Context, pin *api.Pin) error {
return checkPinType(pin)
}
// pin performs the actual pinning and supports a blacklist to be
// able to evacuate a node and returns the pin object that it tried to pin, whether the pin was submitted
// to the consensus layer or skipped (due to error or to the fact
// that it was already valid) and errror.
func (c *Cluster) pin(ctx context.Context, pin *api.Pin, blacklist []peer.ID, prioritylist []peer.ID) (*api.Pin, bool, error) {
// pin performs the actual pinning and supports a blacklist to be able to
// evacuate a node and returns the pin object that it tried to pin, whether
// the pin was submitted to the consensus layer or skipped (due to error or to
// the fact that it was already valid) and error.
func (c *Cluster) pin(
ctx context.Context,
pin *api.Pin,
blacklist []peer.ID,
) (*api.Pin, bool, error) {
ctx, span := trace.StartSpan(ctx, "cluster/pin")
defer span.End()
@ -1294,7 +1301,7 @@ func (c *Cluster) pin(ctx context.Context, pin *api.Pin, blacklist []peer.ID, pr
pin.ReplicationFactorMin,
pin.ReplicationFactorMax,
blacklist,
prioritylist,
pin.UserAllocations,
)
if err != nil {
return pin, false, err
@ -1317,8 +1324,14 @@ func (c *Cluster) pin(ctx context.Context, pin *api.Pin, blacklist []peer.ID, pr
return pin, true, c.consensus.LogPin(ctx, pin)
}
func (c *Cluster) unpin(ctx context.Context, h cid.Cid) (*api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/unpin")
// Unpin removes a previously pinned Cid from Cluster. It returns
// the global state Pin object as it was stored before removal, or
// an error if it was not possible to update the global state.
//
// Unpin does not reflect the success or failure of underlying IPFS daemon
// unpinning operations, which happen in async fashion.
func (c *Cluster) Unpin(ctx context.Context, h cid.Cid) (*api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/Unpin")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
@ -1349,20 +1362,6 @@ func (c *Cluster) unpin(ctx context.Context, h cid.Cid) (*api.Pin, error) {
}
}
// Unpin makes the cluster Unpin a Cid. This implies adding the Cid
// to the IPFS Cluster peers shared-state.
//
// Unpin returns an error if the operation could not be persisted
// to the global state. Unpin does not reflect the success or failure
// of underlying IPFS daemon unpinning operations.
func (c *Cluster) Unpin(ctx context.Context, h cid.Cid) error {
_, span := trace.StartSpan(ctx, "cluster/Unpin")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
_, err := c.unpin(ctx, h)
return err
}
// unpinClusterDag unpins the clusterDAG metadata node and the shard metadata
// nodes that it references. It handles the case where multiple parents
// reference the same metadata node, only unpinning those nodes without
@ -1389,20 +1388,17 @@ func (c *Cluster) unpinClusterDag(metaPin *api.Pin) error {
// PinPath pins a CID resolved from its IPFS Path. It returns the resolved
// Pin object.
func (c *Cluster) PinPath(ctx context.Context, path *api.PinPath) (*api.Pin, error) {
func (c *Cluster) PinPath(ctx context.Context, path string, opts api.PinOptions) (*api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/PinPath")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
ci, err := c.ipfs.Resolve(ctx, path.Path)
ci, err := c.ipfs.Resolve(ctx, path)
if err != nil {
return nil, err
}
p := api.PinCid(ci)
p.PinOptions = path.PinOptions
p, _, err = c.pin(ctx, p, []peer.ID{}, p.UserAllocations)
return p, err
return c.Pin(ctx, ci, opts)
}
// UnpinPath unpins a CID resolved from its IPFS Path. It returns the
@ -1417,7 +1413,7 @@ func (c *Cluster) UnpinPath(ctx context.Context, path string) (*api.Pin, error)
return nil, err
}
return c.unpin(ctx, ci)
return c.Unpin(ctx, ci)
}
// AddFile adds a file to the ipfs daemons of the cluster. The ipfs importer

View File

@ -224,7 +224,7 @@ func TestClusterStateSync(t *testing.T) {
defer cl.Shutdown(ctx)
c := test.Cid1
err := cl.Pin(ctx, api.PinCid(c))
_, err := cl.Pin(ctx, c, api.PinOptions{})
if err != nil {
t.Fatal("pin should have worked:", err)
}
@ -274,21 +274,26 @@ func TestClusterPin(t *testing.T) {
defer cl.Shutdown(ctx)
c := test.Cid1
err := cl.Pin(ctx, api.PinCid(c))
res, err := cl.Pin(ctx, c, api.PinOptions{})
if err != nil {
t.Fatal("pin should have worked:", err)
}
if res.Type != api.DataType {
t.Error("unexpected pin type")
}
switch consensus {
case "crdt":
return
case "raft":
// test an error case
cl.consensus.Shutdown(ctx)
pin := api.PinCid(c)
pin.ReplicationFactorMax = 1
pin.ReplicationFactorMin = 1
err = cl.Pin(ctx, pin)
opts := api.PinOptions{
ReplicationFactorMax: 1,
ReplicationFactorMin: 1,
}
_, err = cl.Pin(ctx, c, opts)
if err == nil {
t.Error("expected an error but things worked")
}
@ -301,7 +306,7 @@ func TestClusterPinPath(t *testing.T) {
defer cleanState()
defer cl.Shutdown(ctx)
pin, err := cl.PinPath(ctx, &api.PinPath{Path: test.PathIPFS2})
pin, err := cl.PinPath(ctx, test.PathIPFS2, api.PinOptions{})
if err != nil {
t.Fatal("pin should have worked:", err)
}
@ -310,7 +315,7 @@ func TestClusterPinPath(t *testing.T) {
}
// test an error case
_, err = cl.PinPath(ctx, &api.PinPath{Path: test.InvalidPath1})
_, err = cl.PinPath(ctx, test.InvalidPath1, api.PinOptions{})
if err == nil {
t.Error("expected an error but things worked")
}
@ -413,7 +418,7 @@ func TestUnpinShard(t *testing.T) {
}
t.Run("unpin clusterdag should fail", func(t *testing.T) {
err := cl.Unpin(ctx, cDag.Cid)
_, err := cl.Unpin(ctx, cDag.Cid)
if err == nil {
t.Fatal("should not allow unpinning the cluster DAG directly")
}
@ -421,7 +426,7 @@ func TestUnpinShard(t *testing.T) {
})
t.Run("unpin shard should fail", func(t *testing.T) {
err := cl.Unpin(ctx, cDagNode.Links()[0].Cid)
_, err := cl.Unpin(ctx, cDagNode.Links()[0].Cid)
if err == nil {
t.Fatal("should not allow unpinning shards directly")
}
@ -429,11 +434,15 @@ func TestUnpinShard(t *testing.T) {
})
t.Run("normal unpin", func(t *testing.T) {
err := cl.Unpin(ctx, root)
res, err := cl.Unpin(ctx, root)
if err != nil {
t.Fatal(err)
}
if res.Type != api.MetaType {
t.Fatal("unexpected root pin type")
}
pinDelay()
for _, c := range pinnedCids {
@ -745,7 +754,7 @@ func TestClusterPins(t *testing.T) {
defer cl.Shutdown(ctx)
c := test.Cid1
err := cl.Pin(ctx, api.PinCid(c))
_, err := cl.Pin(ctx, c, api.PinOptions{})
if err != nil {
t.Fatal("pin should have worked:", err)
}
@ -771,7 +780,7 @@ func TestClusterPinGet(t *testing.T) {
defer cl.Shutdown(ctx)
c := test.Cid1
err := cl.Pin(ctx, api.PinCid(c))
_, err := cl.Pin(ctx, c, api.PinOptions{})
if err != nil {
t.Fatal("pin should have worked:", err)
}
@ -798,24 +807,28 @@ func TestClusterUnpin(t *testing.T) {
c := test.Cid1
// Unpin should error without pin being committed to state
err := cl.Unpin(ctx, c)
_, err := cl.Unpin(ctx, c)
if err == nil {
t.Error("unpin should have failed")
}
// Unpin after pin should succeed
err = cl.Pin(ctx, api.PinCid(c))
_, err = cl.Pin(ctx, c, api.PinOptions{})
if err != nil {
t.Fatal("pin should have worked:", err)
}
err = cl.Unpin(ctx, c)
res, err := cl.Unpin(ctx, c)
if err != nil {
t.Error("unpin should have worked:", err)
}
if res.Type != api.DataType {
t.Error("unexpected pin type returned")
}
// test another error case
cl.consensus.Shutdown(ctx)
err = cl.Unpin(ctx, c)
_, err = cl.Unpin(ctx, c)
if err == nil {
t.Error("expected an error but things worked")
}
@ -834,7 +847,7 @@ func TestClusterUnpinPath(t *testing.T) {
}
// Unpin after pin should succeed
pin, err := cl.PinPath(ctx, &api.PinPath{Path: test.PathIPFS2})
pin, err := cl.PinPath(ctx, test.PathIPFS2, api.PinOptions{})
if err != nil {
t.Fatal("pin with should have worked:", err)
}
@ -890,7 +903,7 @@ func TestClusterRecoverAllLocal(t *testing.T) {
defer cleanState()
defer cl.Shutdown(ctx)
err := cl.Pin(ctx, api.PinCid(test.Cid1))
_, err := cl.Pin(ctx, test.Cid1, api.PinOptions{})
if err != nil {
t.Fatal("pin should have worked:", err)
}

View File

@ -580,7 +580,7 @@ func TestClustersPin(t *testing.T) {
j := rand.Intn(nClusters) // choose a random cluster peer
h, err := prefix.Sum(randomBytes()) // create random cid
checkErr(t, err)
err = clusters[j].Pin(ctx, api.PinCid(h))
_, err = clusters[j].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Errorf("error pinning %s: %s", h, err)
}
@ -622,7 +622,7 @@ func TestClustersPin(t *testing.T) {
for i := 0; i < len(pinList); i++ {
// test re-unpin fails
j := rand.Intn(nClusters) // choose a random cluster peer
err := clusters[j].Unpin(ctx, pinList[i].Cid)
_, err := clusters[j].Unpin(ctx, pinList[i].Cid)
if err != nil {
t.Errorf("error unpinning %s: %s", pinList[i].Cid, err)
}
@ -630,7 +630,7 @@ func TestClustersPin(t *testing.T) {
delay()
for i := 0; i < len(pinList); i++ {
j := rand.Intn(nClusters) // choose a random cluster peer
err := clusters[j].Unpin(ctx, pinList[i].Cid)
_, err := clusters[j].Unpin(ctx, pinList[i].Cid)
if err == nil {
t.Errorf("expected error re-unpinning %s", pinList[i].Cid)
}
@ -651,7 +651,7 @@ func TestClustersStatusAll(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
h := test.Cid1
clusters[0].Pin(ctx, api.PinCid(h))
clusters[0].Pin(ctx, h, api.PinOptions{})
pinDelay()
// Global status
f := func(t *testing.T, c *Cluster) {
@ -698,7 +698,7 @@ func TestClustersStatusAllWithErrors(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
h := test.Cid1
clusters[0].Pin(ctx, api.PinCid(h))
clusters[0].Pin(ctx, h, api.PinOptions{})
pinDelay()
// shutdown 1 cluster peer
@ -777,8 +777,8 @@ func TestClustersSyncAllLocal(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
clusters[0].Pin(ctx, api.PinCid(test.ErrorCid)) // This cid always fails
clusters[0].Pin(ctx, api.PinCid(test.Cid2))
clusters[0].Pin(ctx, test.ErrorCid, api.PinOptions{}) // This cid always fails
clusters[0].Pin(ctx, test.Cid2, api.PinOptions{})
pinDelay()
pinDelay()
@ -808,8 +808,8 @@ func TestClustersSyncLocal(t *testing.T) {
defer shutdownClusters(t, clusters, mock)
h := test.ErrorCid
h2 := test.Cid2
clusters[0].Pin(ctx, api.PinCid(h))
clusters[0].Pin(ctx, api.PinCid(h2))
clusters[0].Pin(ctx, h, api.PinOptions{})
clusters[0].Pin(ctx, h2, api.PinOptions{})
pinDelay()
pinDelay()
@ -839,8 +839,8 @@ func TestClustersSyncAll(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
clusters[0].Pin(ctx, api.PinCid(test.ErrorCid))
clusters[0].Pin(ctx, api.PinCid(test.Cid2))
clusters[0].Pin(ctx, test.ErrorCid, api.PinOptions{})
clusters[0].Pin(ctx, test.Cid2, api.PinOptions{})
pinDelay()
pinDelay()
@ -872,8 +872,8 @@ func TestClustersSync(t *testing.T) {
defer shutdownClusters(t, clusters, mock)
h := test.ErrorCid // This cid always fails
h2 := test.Cid2
clusters[0].Pin(ctx, api.PinCid(h))
clusters[0].Pin(ctx, api.PinCid(h2))
clusters[0].Pin(ctx, h, api.PinOptions{})
clusters[0].Pin(ctx, h2, api.PinOptions{})
pinDelay()
pinDelay()
@ -938,8 +938,8 @@ func TestClustersRecoverLocal(t *testing.T) {
ttlDelay()
clusters[0].Pin(ctx, api.PinCid(h))
clusters[0].Pin(ctx, api.PinCid(h2))
clusters[0].Pin(ctx, h, api.PinOptions{})
clusters[0].Pin(ctx, h2, api.PinOptions{})
pinDelay()
pinDelay()
@ -978,8 +978,8 @@ func TestClustersRecover(t *testing.T) {
ttlDelay()
clusters[0].Pin(ctx, api.PinCid(h))
clusters[0].Pin(ctx, api.PinCid(h2))
clusters[0].Pin(ctx, h, api.PinOptions{})
clusters[0].Pin(ctx, h2, api.PinOptions{})
pinDelay()
pinDelay()
@ -1080,7 +1080,7 @@ func TestClustersReplicationOverall(t *testing.T) {
j := rand.Intn(nClusters) // choose a random cluster peer
h, err := prefix.Sum(randomBytes()) // create random cid
checkErr(t, err)
err = clusters[j].Pin(ctx, api.PinCid(h))
_, err = clusters[j].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Error(err)
}
@ -1182,7 +1182,7 @@ func TestClustersReplicationFactorMax(t *testing.T) {
ttlDelay()
h := test.Cid1
err := clusters[0].Pin(ctx, api.PinCid(h))
_, err := clusters[0].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
@ -1228,7 +1228,7 @@ func TestClustersReplicationFactorMaxLower(t *testing.T) {
ttlDelay() // make sure we have places to pin
h := test.Cid1
err := clusters[0].Pin(ctx, api.PinCid(h))
_, err := clusters[0].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
@ -1244,10 +1244,11 @@ func TestClustersReplicationFactorMaxLower(t *testing.T) {
t.Fatal("allocations should be nClusters")
}
pin := api.PinCid(h)
pin.ReplicationFactorMin = 1
pin.ReplicationFactorMax = 2
err = clusters[0].Pin(ctx, pin)
opts := api.PinOptions{
ReplicationFactorMin: 1,
ReplicationFactorMax: 2,
}
_, err = clusters[0].Pin(ctx, h, opts)
if err != nil {
t.Fatal(err)
}
@ -1288,7 +1289,7 @@ func TestClustersReplicationFactorInBetween(t *testing.T) {
waitForLeaderAndMetrics(t, clusters)
h := test.Cid1
err := clusters[0].Pin(ctx, api.PinCid(h))
_, err := clusters[0].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
@ -1341,7 +1342,7 @@ func TestClustersReplicationFactorMin(t *testing.T) {
waitForLeaderAndMetrics(t, clusters)
h := test.Cid1
err := clusters[0].Pin(ctx, api.PinCid(h))
_, err := clusters[0].Pin(ctx, h, api.PinOptions{})
if err == nil {
t.Error("Pin should have failed as rplMin cannot be satisfied")
}
@ -1369,7 +1370,7 @@ func TestClustersReplicationMinMaxNoRealloc(t *testing.T) {
ttlDelay()
h := test.Cid1
err := clusters[0].Pin(ctx, api.PinCid(h))
_, err := clusters[0].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
@ -1382,7 +1383,7 @@ func TestClustersReplicationMinMaxNoRealloc(t *testing.T) {
clusters[nClusters-2].Shutdown(ctx)
waitForLeaderAndMetrics(t, clusters)
err = clusters[0].Pin(ctx, api.PinCid(h))
_, err = clusters[0].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
@ -1422,7 +1423,7 @@ func TestClustersReplicationMinMaxRealloc(t *testing.T) {
ttlDelay() // make sure metrics are in
h := test.Cid1
err := clusters[0].Pin(ctx, api.PinCid(h))
_, err := clusters[0].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
@ -1452,7 +1453,7 @@ func TestClustersReplicationMinMaxRealloc(t *testing.T) {
waitForLeaderAndMetrics(t, clusters)
// Repin - (although this might have been taken care of if there was an alert
err = safePeer.Pin(ctx, api.PinCid(h))
_, err = safePeer.Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
@ -1503,7 +1504,7 @@ func TestClustersReplicationRealloc(t *testing.T) {
j := rand.Intn(nClusters)
h := test.Cid1
err := clusters[j].Pin(ctx, api.PinCid(h))
_, err := clusters[j].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
@ -1522,7 +1523,7 @@ func TestClustersReplicationRealloc(t *testing.T) {
// Re-pin should work and be allocated to the same
// nodes
err = clusters[j].Pin(ctx, api.PinCid(h))
_, err = clusters[j].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
@ -1567,7 +1568,7 @@ func TestClustersReplicationRealloc(t *testing.T) {
}
// now pin should succeed
err = clusters[j].Pin(ctx, api.PinCid(h))
_, err = clusters[j].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
@ -1610,7 +1611,7 @@ func TestClustersReplicationNotEnoughPeers(t *testing.T) {
j := rand.Intn(nClusters)
h := test.Cid1
err := clusters[j].Pin(ctx, api.PinCid(h))
_, err := clusters[j].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
@ -1623,7 +1624,7 @@ func TestClustersReplicationNotEnoughPeers(t *testing.T) {
waitForLeaderAndMetrics(t, clusters)
err = clusters[2].Pin(ctx, api.PinCid(h))
_, err = clusters[2].Pin(ctx, h, api.PinOptions{})
if err == nil {
t.Fatal("expected an error")
}
@ -1649,7 +1650,7 @@ func TestClustersRebalanceOnPeerDown(t *testing.T) {
// pin something
h := test.Cid1
clusters[0].Pin(ctx, api.PinCid(h))
clusters[0].Pin(ctx, h, api.PinOptions{})
pinDelay()
pinLocal := 0
pinRemote := 0
@ -1855,7 +1856,7 @@ func TestClustersDisabledRepinning(t *testing.T) {
j := rand.Intn(nClusters)
h := test.Cid1
err := clusters[j].Pin(ctx, api.PinCid(h))
_, err := clusters[j].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Fatal(err)
}

View File

@ -103,7 +103,7 @@ func TestClustersPeerAdd(t *testing.T) {
}
h := test.Cid1
err := clusters[1].Pin(ctx, api.PinCid(h))
_, err := clusters[1].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
@ -448,7 +448,7 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
for i := 0; i < nClusters; i++ {
h, err := prefix.Sum(randomBytes())
checkErr(t, err)
err = leader.Pin(ctx, api.PinCid(h))
_, err = leader.Pin(ctx, h, api.PinOptions{})
checkErr(t, err)
ttlDelay()
}
@ -519,8 +519,8 @@ func TestClustersPeerJoin(t *testing.T) {
}
}
hash := test.Cid1
clusters[0].Pin(ctx, api.PinCid(hash))
h := test.Cid1
clusters[0].Pin(ctx, h, api.PinOptions{})
pinDelay()
for _, p := range clusters {
@ -541,7 +541,7 @@ func TestClustersPeerJoin(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
if len(pins) != 1 || !pins[0].Cid.Equals(h) {
t.Error("all peers should have pinned the cid")
}
}
@ -566,8 +566,8 @@ func TestClustersPeerJoinAllAtOnce(t *testing.T) {
}
runF(t, clusters[1:], f)
hash := test.Cid1
clusters[0].Pin(ctx, api.PinCid(hash))
h := test.Cid1
clusters[0].Pin(ctx, h, api.PinOptions{})
pinDelay()
f2 := func(t *testing.T, c *Cluster) {
@ -579,7 +579,7 @@ func TestClustersPeerJoinAllAtOnce(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
if len(pins) != 1 || !pins[0].Cid.Equals(h) {
t.Error("all peers should have pinned the cid")
}
}
@ -644,7 +644,7 @@ func TestClustersPeerRejoin(t *testing.T) {
// pin something in c0
pin1 := test.Cid1
err := clusters[0].Pin(ctx, api.PinCid(pin1))
_, err := clusters[0].Pin(ctx, pin1, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
@ -684,7 +684,7 @@ func TestClustersPeerRejoin(t *testing.T) {
// Pin something on the rest
pin2 := test.Cid2
err = clusters[1].Pin(ctx, api.PinCid(pin2))
_, err = clusters[1].Pin(ctx, pin2, api.PinOptions{})
if err != nil {
t.Fatal(err)
}

View File

@ -154,19 +154,32 @@ func (rpcapi *ClusterRPCAPI) ID(ctx context.Context, in struct{}, out *api.ID) e
return nil
}
// Pin runs Cluster.Pin().
func (rpcapi *ClusterRPCAPI) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
return rpcapi.c.Pin(ctx, in)
// Pin runs Cluster.pin().
func (rpcapi *ClusterRPCAPI) Pin(ctx context.Context, in *api.Pin, out *api.Pin) error {
// we do not call the Pin method directly since that method does not
// allow pinning anything other than regular DataType pins. The adder will
// however send Meta, Shard and ClusterDAG pins.
pin, _, err := rpcapi.c.pin(ctx, in, []peer.ID{})
if err != nil {
return err
}
*out = *pin
return nil
}
// Unpin runs Cluster.Unpin().
func (rpcapi *ClusterRPCAPI) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error {
return rpcapi.c.Unpin(ctx, in.Cid)
func (rpcapi *ClusterRPCAPI) Unpin(ctx context.Context, in *api.Pin, out *api.Pin) error {
pin, err := rpcapi.c.Unpin(ctx, in.Cid)
if err != nil {
return err
}
*out = *pin
return nil
}
// PinPath resolves path into a cid and runs Cluster.Pin().
func (rpcapi *ClusterRPCAPI) PinPath(ctx context.Context, in *api.PinPath, out *api.Pin) error {
pin, err := rpcapi.c.PinPath(ctx, in)
pin, err := rpcapi.c.PinPath(ctx, in.Path, in.PinOptions)
if err != nil {
return err
}

View File

@ -61,17 +61,19 @@ type mockIPFSConnector struct{}
type mockConsensus struct{}
type mockPeerMonitor struct{}
func (mock *mockCluster) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
func (mock *mockCluster) Pin(ctx context.Context, in *api.Pin, out *api.Pin) error {
if in.Cid.Equals(ErrorCid) {
return ErrBadCid
}
*out = *in
return nil
}
func (mock *mockCluster) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error {
func (mock *mockCluster) Unpin(ctx context.Context, in *api.Pin, out *api.Pin) error {
if in.Cid.Equals(ErrorCid) {
return ErrBadCid
}
*out = *in
return nil
}