Merge pull request #869 from ipfs/fix/732-pin-update-the-good-way

Fix #732: Introduce native pin/update
This commit is contained in:
Hector Sanjuan 2019-08-12 12:15:30 +02:00 committed by GitHub
commit d8c20adc4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 376 additions and 88 deletions

View File

@ -24,7 +24,7 @@ jobs:
- stage: "Testing stage"
name: "Tests (all modules) + Coverage"
script:
- go test -v -failfast -timeout 15m -coverprofile=coverage.txt -covermode=atomic .
- go test -v -failfast -timeout 15m -coverprofile=coverage.txt -covermode=atomic ./...
after_success:
- bash <(curl -s https://codecov.io/bash)
- name: "Main Tests with crdt consensus"

View File

@ -137,7 +137,7 @@ func TestAdder_ContextCancelled(t *testing.T) {
}
t.Log(err)
}()
time.Sleep(200 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
cancel()
wg.Wait()
}

View File

@ -28,8 +28,9 @@ func (rpcs *testIPFSRPC) BlockPut(ctx context.Context, in *api.NodeWithMeta, out
return nil
}
func (rpcs *testClusterRPC) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
func (rpcs *testClusterRPC) Pin(ctx context.Context, in *api.Pin, out *api.Pin) error {
rpcs.pins.Store(in.Cid.String(), in)
*out = *in
return nil
}

View File

@ -32,8 +32,9 @@ func (rpcs *testRPC) BlockPut(ctx context.Context, in *api.NodeWithMeta, out *st
return nil
}
func (rpcs *testRPC) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
func (rpcs *testRPC) Pin(ctx context.Context, in *api.Pin, out *api.Pin) error {
rpcs.pins.Store(in.Cid.String(), in)
*out = *in
return nil
}

View File

@ -445,38 +445,17 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) {
return
}
// Get existing FROM pin, and send error if not present.
var fromPin api.Pin
err = proxy.rpcClient.CallContext(
ctx,
"",
"Cluster",
"PinGet",
fromCid,
&fromPin,
)
if err != nil {
ipfsErrorResponder(w, err.Error(), -1)
return
}
// Do a PinPath setting PinUpdate
pinPath := &api.PinPath{Path: pTo.String()}
pinPath.PinUpdate = fromCid
// Prepare to pin the TO argument with the options from the FROM pin
// and the allocations of the FROM pin.
toPath := &api.PinPath{
Path: pTo.String(),
PinOptions: fromPin.PinOptions,
}
toPath.PinOptions.UserAllocations = fromPin.Allocations
// Pin the TO pin.
var toPin api.Pin
err = proxy.rpcClient.CallContext(
ctx,
var pin api.Pin
err = proxy.rpcClient.Call(
"",
"Cluster",
"PinPath",
toPath,
&toPin,
pinPath,
&pin,
)
if err != nil {
ipfsErrorResponder(w, err.Error(), -1)
@ -492,7 +471,7 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) {
"",
"Cluster",
"Unpin",
&fromPin,
api.PinCid(fromCid),
&pinObj,
)
if err != nil {
@ -502,7 +481,7 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) {
}
res := ipfsPinOpResp{
Pins: []string{fromCid.String(), toPin.Cid.String()},
Pins: []string{fromCid.String(), pin.Cid.String()},
}
resBytes, _ := json.Marshal(res)
w.WriteHeader(http.StatusOK)

7
api/pb/generate.go Normal file
View File

@ -0,0 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
//
// Actually, the above line is just to avoid golint complains.
// Run go:generate to generate the protobuf types in this module.
//
//go:generate protoc -I=. --go_out=. types.proto
package api_pb

View File

@ -139,6 +139,7 @@ type PinOptions struct {
Name string `protobuf:"bytes,3,opt,name=Name,proto3" json:"Name,omitempty"`
ShardSize uint64 `protobuf:"varint,4,opt,name=ShardSize,proto3" json:"ShardSize,omitempty"`
Metadata map[string]string `protobuf:"bytes,6,rep,name=Metadata,proto3" json:"Metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
PinUpdate []byte `protobuf:"bytes,7,opt,name=PinUpdate,proto3" json:"PinUpdate,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -204,6 +205,13 @@ func (m *PinOptions) GetMetadata() map[string]string {
return nil
}
func (m *PinOptions) GetPinUpdate() []byte {
if m != nil {
return m.PinUpdate
}
return nil
}
func init() {
proto.RegisterEnum("api.pb.Pin_PinType", Pin_PinType_name, Pin_PinType_value)
proto.RegisterType((*Pin)(nil), "api.pb.Pin")
@ -214,29 +222,30 @@ func init() {
func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) }
var fileDescriptor_d938547f84707355 = []byte{
// 377 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0x4d, 0x6f, 0xda, 0x40,
0x10, 0xed, 0xda, 0xc6, 0xe0, 0x31, 0x20, 0x98, 0x72, 0xb0, 0x50, 0x0f, 0x2b, 0x2e, 0xf5, 0xa1,
0xf2, 0xc1, 0xbd, 0x54, 0x6d, 0x2f, 0x14, 0xda, 0x4a, 0x95, 0x68, 0xd1, 0xd2, 0xfc, 0x80, 0x05,
0x36, 0x62, 0x15, 0xc7, 0x5e, 0x99, 0x25, 0xc2, 0xf9, 0x37, 0x39, 0xe5, 0x6f, 0x46, 0xbb, 0xe6,
0x2b, 0x0a, 0x07, 0x4b, 0xf3, 0xde, 0x9b, 0xe7, 0x9d, 0x79, 0x1a, 0x08, 0x75, 0xa5, 0xc4, 0x36,
0x51, 0x65, 0xa1, 0x0b, 0xf4, 0xb9, 0x92, 0x89, 0x5a, 0x8e, 0x9e, 0x1c, 0x70, 0xe7, 0x32, 0xc7,
0x1e, 0xb8, 0x13, 0xb9, 0x8e, 0x08, 0x25, 0x71, 0x9b, 0x99, 0x12, 0x3f, 0x82, 0xf7, 0xbf, 0x52,
0x22, 0x72, 0x28, 0x89, 0xbb, 0xe9, 0xfb, 0xa4, 0x36, 0x24, 0x73, 0x99, 0x9b, 0xcf, 0x48, 0xcc,
0x36, 0x20, 0x85, 0x70, 0x9c, 0x65, 0xc5, 0x8a, 0x6b, 0x59, 0xe4, 0xdb, 0xc8, 0xa5, 0x6e, 0xdc,
0x66, 0x97, 0x14, 0x0e, 0xa1, 0x35, 0xe3, 0xfb, 0xa9, 0x50, 0x7a, 0x13, 0x79, 0x94, 0xc4, 0x7d,
0x76, 0xc2, 0xf8, 0x01, 0x02, 0x26, 0x6e, 0x45, 0x29, 0xf2, 0x95, 0x88, 0x1a, 0xf6, 0xf9, 0x33,
0x81, 0x9f, 0xa0, 0xf9, 0x4f, 0xd5, 0xff, 0xf5, 0x29, 0x89, 0xc3, 0x14, 0x2f, 0xe6, 0x38, 0x28,
0xec, 0xd8, 0x32, 0xba, 0x81, 0xe6, 0x61, 0x34, 0x0c, 0xa1, 0xf9, 0x83, 0xaf, 0x4d, 0xd9, 0x7b,
0x87, 0x6d, 0x68, 0x4d, 0xb9, 0xe6, 0x16, 0x11, 0x83, 0x66, 0xe2, 0x80, 0x1c, 0x44, 0xe8, 0x4e,
0xb2, 0xdd, 0x56, 0x8b, 0x72, 0x3a, 0xfe, 0x6d, 0x39, 0x17, 0x3b, 0x10, 0x2c, 0x36, 0xbc, 0xac,
0xed, 0xde, 0xe8, 0xd9, 0x01, 0x38, 0x3f, 0x87, 0x29, 0x0c, 0x98, 0x50, 0x99, 0xac, 0xb7, 0xfb,
0xc5, 0x57, 0xba, 0x28, 0x67, 0x32, 0xb7, 0xd9, 0xf5, 0xd9, 0x55, 0xed, 0xba, 0x87, 0xef, 0x6d,
0xb8, 0x57, 0x3d, 0x7c, 0x8f, 0x08, 0xde, 0x5f, 0x7e, 0x2f, 0x22, 0x97, 0x92, 0x38, 0x60, 0xb6,
0x36, 0x69, 0xd9, 0xc9, 0x16, 0xf2, 0x51, 0xd8, 0x28, 0x3d, 0x76, 0x26, 0xf0, 0x7b, 0xbd, 0xd9,
0x9a, 0x6b, 0x1e, 0xf9, 0xd4, 0x8d, 0xc3, 0x94, 0xbe, 0x8d, 0x2b, 0x39, 0xb6, 0xfc, 0xcc, 0x75,
0x59, 0xb1, 0x93, 0x63, 0xf8, 0x0d, 0x3a, 0xaf, 0x24, 0x73, 0x13, 0x77, 0xa2, 0xb2, 0x7b, 0x05,
0xcc, 0x94, 0x38, 0x80, 0xc6, 0x03, 0xcf, 0x76, 0xf5, 0x51, 0x04, 0xac, 0x06, 0x5f, 0x9d, 0x2f,
0xe4, 0x8f, 0xd7, 0x6a, 0xf4, 0xfc, 0xa5, 0x6f, 0x8f, 0xeb, 0xf3, 0x4b, 0x00, 0x00, 0x00, 0xff,
0xff, 0xaf, 0xb8, 0xbd, 0x88, 0x6b, 0x02, 0x00, 0x00,
// 391 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0xcd, 0x8e, 0xd3, 0x30,
0x10, 0xc6, 0x49, 0x9a, 0x34, 0x93, 0xee, 0xaa, 0x3b, 0xec, 0xc1, 0x5a, 0x71, 0xb0, 0x7a, 0x21,
0x07, 0x94, 0x43, 0xb8, 0x20, 0xe0, 0xb2, 0x6c, 0x01, 0x09, 0xa9, 0x50, 0x79, 0xe9, 0x03, 0xb8,
0x8d, 0x51, 0x2d, 0x42, 0x62, 0xa5, 0x2e, 0x6a, 0x78, 0x1b, 0x1e, 0x86, 0xf7, 0x42, 0xb6, 0xfb,
0x87, 0xe8, 0x21, 0xd2, 0x7c, 0xdf, 0xcc, 0xe7, 0x99, 0xf9, 0x32, 0x90, 0x99, 0x5e, 0xcb, 0x4d,
0xa1, 0xbb, 0xd6, 0xb4, 0x18, 0x0b, 0xad, 0x0a, 0xbd, 0x9c, 0xfc, 0x0e, 0x20, 0x9c, 0xab, 0x06,
0xc7, 0x10, 0x3e, 0xa8, 0x8a, 0x12, 0x46, 0xf2, 0x11, 0xb7, 0x21, 0x3e, 0x87, 0xe8, 0x6b, 0xaf,
0x25, 0x0d, 0x18, 0xc9, 0xaf, 0xcb, 0xa7, 0x85, 0x17, 0x14, 0x73, 0xd5, 0xd8, 0xcf, 0xa6, 0xb8,
0x2b, 0x40, 0x06, 0xd9, 0x7d, 0x5d, 0xb7, 0x2b, 0x61, 0x54, 0xdb, 0x6c, 0x68, 0xc8, 0xc2, 0x7c,
0xc4, 0xcf, 0x29, 0xbc, 0x83, 0xe1, 0x4c, 0xec, 0xa6, 0x52, 0x9b, 0x35, 0x8d, 0x18, 0xc9, 0x6f,
0xf8, 0x11, 0xe3, 0x33, 0x48, 0xb9, 0xfc, 0x26, 0x3b, 0xd9, 0xac, 0x24, 0x1d, 0xb8, 0xf6, 0x27,
0x02, 0x5f, 0x40, 0xf2, 0x45, 0xfb, 0x77, 0x63, 0x46, 0xf2, 0xac, 0xc4, 0xb3, 0x39, 0xf6, 0x19,
0x7e, 0x28, 0x99, 0x2c, 0x20, 0xd9, 0x8f, 0x86, 0x19, 0x24, 0xef, 0x44, 0x65, 0xc3, 0xf1, 0x13,
0x1c, 0xc1, 0x70, 0x2a, 0x8c, 0x70, 0x88, 0x58, 0x34, 0x93, 0x7b, 0x14, 0x20, 0xc2, 0xf5, 0x43,
0xbd, 0xdd, 0x18, 0xd9, 0x4d, 0xef, 0x3f, 0x3a, 0x2e, 0xc4, 0x2b, 0x48, 0x1f, 0xd7, 0xa2, 0xf3,
0xf2, 0x68, 0xf2, 0x27, 0x00, 0x38, 0xb5, 0xc3, 0x12, 0x6e, 0xb9, 0xd4, 0xb5, 0xf2, 0xdb, 0x7d,
0x10, 0x2b, 0xd3, 0x76, 0x33, 0xd5, 0x38, 0xef, 0x6e, 0xf8, 0xc5, 0xdc, 0x65, 0x8d, 0xd8, 0x39,
0x73, 0x2f, 0x6a, 0xc4, 0x0e, 0x11, 0xa2, 0xcf, 0xe2, 0x87, 0xa4, 0x21, 0x23, 0x79, 0xca, 0x5d,
0x6c, 0xdd, 0x72, 0x93, 0x3d, 0xaa, 0x5f, 0xd2, 0x59, 0x19, 0xf1, 0x13, 0x81, 0x6f, 0xfd, 0x66,
0x95, 0x30, 0x82, 0xc6, 0x2c, 0xcc, 0xb3, 0x92, 0xfd, 0x6f, 0x57, 0x71, 0x28, 0x79, 0xdf, 0x98,
0xae, 0xe7, 0x47, 0x85, 0x7d, 0x7b, 0xae, 0x9a, 0x85, 0xae, 0x84, 0x91, 0x34, 0xf1, 0x7f, 0xe2,
0x48, 0xdc, 0xbd, 0x81, 0xab, 0x7f, 0x84, 0xf6, 0x62, 0xbe, 0xcb, 0xde, 0x6d, 0x9d, 0x72, 0x1b,
0xe2, 0x2d, 0x0c, 0x7e, 0x8a, 0x7a, 0xeb, 0x4f, 0x26, 0xe5, 0x1e, 0xbc, 0x0e, 0x5e, 0x91, 0x4f,
0xd1, 0x70, 0x30, 0x8e, 0x97, 0xb1, 0x3b, 0xbd, 0x97, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xfe,
0x90, 0x29, 0x9a, 0x89, 0x02, 0x00, 0x00,
}

View File

@ -24,4 +24,5 @@ message PinOptions {
uint64 ShardSize = 4;
reserved 5; // reserved for UserAllocations
map<string, string> Metadata = 6;
bytes PinUpdate = 7;
}

View File

@ -461,6 +461,7 @@ type PinOptions struct {
ShardSize uint64 `json:"shard_size" codec:"s,omitempty"`
UserAllocations []peer.ID `json:"user_allocations" codec:"ua,omitempty"`
Metadata map[string]string `json:"metadata" codec:"m,omitempty"`
PinUpdate cid.Cid `json:"pin_update,omitempty" codec:"pu,omitempty"`
}
// Equals returns true if two PinOption objects are equivalent. po and po2 may
@ -507,6 +508,9 @@ func (po *PinOptions) Equals(po2 *PinOptions) bool {
return false
}
}
// deliberately ignore Update
return true
}
@ -524,6 +528,7 @@ func (po *PinOptions) ToQuery() string {
}
q.Set(fmt.Sprintf("%s%s", pinOptionsMetaPrefix, k), v)
}
q.Set("pin-update", po.PinUpdate.String())
return q.Encode()
}
@ -563,10 +568,20 @@ func (po *PinOptions) FromQuery(q url.Values) {
}
po.Metadata[metaKey] = q.Get(k)
}
updateStr := q.Get("pin-update")
if updateStr != "" {
updateCid, err := cid.Decode(updateStr)
if err != nil {
logger.Error("error decoding update option parameter: ", err)
}
po.PinUpdate = updateCid
}
}
// Pin carries all the information associated to a CID that is pinned
// in IPFS Cluster.
// in IPFS Cluster. It also carries transient information (that may not
// get protobuffed, like UserAllocations).
type Pin struct {
PinOptions
@ -657,7 +672,8 @@ func (pin *Pin) ProtoMarshal() ([]byte, error) {
Name: pin.Name,
ShardSize: pin.ShardSize,
// UserAllocations: pin.UserAllocations,
Metadata: pin.Metadata,
Metadata: pin.Metadata,
PinUpdate: pin.PinUpdate.Bytes(),
}
pbPin := &pb.Pin{
@ -717,6 +733,11 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error {
pin.ShardSize = opts.GetShardSize()
// pin.UserAllocations = opts.GetUserAllocations()
pin.Metadata = opts.GetMetadata()
pinUpdate, err := cid.Cast(opts.GetPinUpdate())
if err == nil {
pin.PinUpdate = pinUpdate
}
return nil
}

View File

@ -1184,12 +1184,16 @@ func (c *Cluster) PinGet(ctx context.Context, h cid.Cid) (*api.Pin, error) {
// rest of the cluster. Priority allocations are best effort. If any priority
// peers are unavailable then Pin will simply allocate from the rest of the
// cluster.
//
// If the Update option is set, the pin options (including allocations) will
// be copied from an existing one. This is equivalent to running PinUpdate.
func (c *Cluster) Pin(ctx context.Context, h cid.Cid, opts api.PinOptions) (*api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/Pin")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
pin := api.PinWithOpts(h, opts)
result, _, err := c.pin(ctx, pin, []peer.ID{})
return result, err
}
@ -1280,6 +1284,8 @@ func (c *Cluster) setupPin(ctx context.Context, pin *api.Pin) error {
// evacuate a node and returns the pin object that it tried to pin, whether
// the pin was submitted to the consensus layer or skipped (due to error or to
// the fact that it was already valid) and error.
//
// This is the method called by the Cluster.Pin RPC endpoint.
func (c *Cluster) pin(
ctx context.Context,
pin *api.Pin,
@ -1296,6 +1302,12 @@ func (c *Cluster) pin(
return pin, false, errors.New("bad pin object")
}
// Handle pin updates when the option is set
if update := pin.PinUpdate; update != cid.Undef && !update.Equals(pin.Cid) {
pin, err := c.PinUpdate(ctx, update, pin.Cid, pin.PinOptions)
return pin, true, err
}
// setup pin might produce some side-effects to our pin
err := c.setupPin(ctx, pin)
if err != nil {
@ -1400,6 +1412,37 @@ func (c *Cluster) unpinClusterDag(metaPin *api.Pin) error {
return nil
}
// PinUpdate pins a new CID based on an existing cluster Pin. The allocations
// and most pin options (replication factors) are copied from the existing
// Pin. The options object can be used to set the Name for the new pin and
// might support additional options in the future.
//
// The from pin is NOT unpinned upon completion. The new pin might take
// advantage of efficient pin/update operation on IPFS-side (if the
// IPFSConnector supports it - the default one does). This may offer
// significant speed when pinning items which are similar to previously pinned
// content.
func (c *Cluster) PinUpdate(ctx context.Context, from cid.Cid, to cid.Cid, opts api.PinOptions) (*api.Pin, error) {
existing, err := c.PinGet(ctx, from)
if err != nil { // including when the existing pin is not found
return nil, err
}
// Hector: I am not sure whether it has any point to update something
// like a MetaType.
if existing.Type != api.DataType {
return nil, errors.New("this pin type cannot be updated")
}
existing.Cid = to
existing.PinUpdate = from
if opts.Name != "" {
existing.Name = opts.Name
}
return existing, c.consensus.LogPin(ctx, existing)
}
// PinPath pins an CID resolved from its IPFS Path. It returns the resolved
// Pin object.
func (c *Cluster) PinPath(ctx context.Context, path string, opts api.PinOptions) (*api.Pin, error) {

View File

@ -61,8 +61,8 @@ func (ipfs *mockConnector) ID(ctx context.Context) (*api.IPFSID, error) {
}, nil
}
func (ipfs *mockConnector) Pin(ctx context.Context, c cid.Cid, maxDepth int) error {
ipfs.pins.Store(c.String(), maxDepth)
func (ipfs *mockConnector) Pin(ctx context.Context, pin *api.Pin) error {
ipfs.pins.Store(pin.Cid.String(), pin.MaxDepth)
return nil
}

View File

@ -473,7 +473,7 @@ cluster "pin add".
Subcommands: []cli.Command{
{
Name: "add",
Usage: "Cluster Pin",
Usage: "Pin an item in the cluster",
Description: `
This command tells IPFS Cluster to start managing a CID. Depending on
the pinning strategy, this will trigger IPFS pin requests. The CID will
@ -491,7 +491,7 @@ comma-separated list of peer IDs on which we want to pin. Peers in allocations
are prioritized over automatically-determined ones, but replication factors
would stil be respected.
`,
ArgsUsage: "<CID>",
ArgsUsage: "<CID|Path>",
Flags: []cli.Flag{
cli.IntFlag{
Name: "replication, r",
@ -576,7 +576,7 @@ would stil be respected.
},
{
Name: "rm",
Usage: "Cluster Unpin",
Usage: "Unpin an item from the cluster",
Description: `
This command tells IPFS Cluster to no longer manage a CID. This will
trigger unpinning operations in all the IPFS nodes holding the content.
@ -585,7 +585,7 @@ When the request has succeeded, the command returns the status of the CID
in the cluster. The CID should disappear from the list offered by "pin ls",
although unpinning operations in the cluster may take longer or fail.
`,
ArgsUsage: "<CID>",
ArgsUsage: "<CID|Path>",
Flags: []cli.Flag{
cli.BoolFlag{
Name: "no-status, ns",
@ -617,6 +617,63 @@ although unpinning operations in the cluster may take longer or fail.
return nil
},
},
{
Name: "update",
Usage: "Pin a new item based on an existing one",
Description: `
This command will add a new pin to the cluster taking all the options from an
existing one, including name. This means that the new pin will bypass the
allocation process and will be allocated to the same peers as the existing
one.
The cluster peers will try to Pin the new item on IPFS using the "pin update"
command. This is especially efficient when the content of two pins (their DAGs)
are similar.
Unlike the "pin update" command in the ipfs daemon, this will not unpin the
existing item from the cluster. Please run "pin rm" for that.
`,
ArgsUsage: "<existing-CID> <new-CID|Path>",
Flags: []cli.Flag{
cli.BoolFlag{
Name: "no-status, ns",
Usage: "Prevents fetching pin status after unpinning (faster, quieter)",
},
cli.BoolFlag{
Name: "wait, w",
Usage: "Wait for all nodes to report a status of pinned before returning",
},
cli.DurationFlag{
Name: "wait-timeout, wt",
Value: 0,
Usage: "How long to --wait (in seconds), default is indefinitely",
},
},
Action: func(c *cli.Context) error {
from := c.Args().Get(0)
to := c.Args().Get(1)
fromCid, err := cid.Decode(from)
checkErr("parsing from Cid", err)
opts := api.PinOptions{
PinUpdate: fromCid,
}
pin, cerr := globalClient.PinPath(ctx, to, opts)
if cerr != nil {
formatResponse(c, nil, cerr)
return nil
}
handlePinResponseFormatFlags(
ctx,
c,
pin,
api.TrackerStatusPinned,
)
return nil
},
},
{
Name: "ls",
Usage: "List items in the cluster pinset",

View File

@ -264,7 +264,6 @@ func (cfg *Manager) ApplyEnvVars() error {
return err
}
}
return nil
}
@ -453,7 +452,6 @@ func (cfg *Manager) LoadJSON(bs []byte) error {
return err
}
}
return cfg.Validate()
}

View File

@ -37,11 +37,11 @@ type Consensus interface {
// allowing the main component to wait for it during start.
Ready(context.Context) <-chan struct{}
// Logs a pin operation.
LogPin(ctx context.Context, c *api.Pin) error
LogPin(context.Context, *api.Pin) error
// Logs an unpin operation.
LogUnpin(ctx context.Context, c *api.Pin) error
AddPeer(ctx context.Context, p peer.ID) error
RmPeer(ctx context.Context, p peer.ID) error
LogUnpin(context.Context, *api.Pin) error
AddPeer(context.Context, peer.ID) error
RmPeer(context.Context, peer.ID) error
State(context.Context) (state.ReadOnly, error)
// Provide a node which is responsible to perform
// specific tasks which must only run in 1 cluster peer.
@ -75,7 +75,7 @@ type API interface {
type IPFSConnector interface {
Component
ID(context.Context) (*api.IPFSID, error)
Pin(context.Context, cid.Cid, int) error
Pin(context.Context, *api.Pin) error
Unpin(context.Context, cid.Cid) error
PinLsCid(context.Context, cid.Cid) (api.IPFSPinStatus, error)
PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error)

View File

@ -28,6 +28,7 @@ import (
"github.com/ipfs/ipfs-cluster/observations"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/test"
"github.com/ipfs/ipfs-cluster/version"
@ -642,6 +643,64 @@ func TestClustersPin(t *testing.T) {
runF(t, clusters, funpinned)
}
func TestClustersPinUpdate(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
prefix := test.Cid1.Prefix()
ttlDelay()
h, err := prefix.Sum(randomBytes()) // create random cid
h2, err := prefix.Sum(randomBytes()) // create random cid
_, err = clusters[0].PinUpdate(ctx, h, h2, api.PinOptions{})
if err == nil || err != state.ErrNotFound {
t.Fatal("pin update should fail when from is not pinned")
}
_, err = clusters[0].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Errorf("error pinning %s: %s", h, err)
}
pinDelay()
opts2 := api.PinOptions{
UserAllocations: []peer.ID{clusters[0].host.ID()}, // should not be used
PinUpdate: h,
Name: "new name",
}
_, err = clusters[0].Pin(ctx, h2, opts2) // should call PinUpdate
if err != nil {
t.Errorf("error pin-updating %s: %s", h2, err)
}
pinDelay()
f := func(t *testing.T, c *Cluster) {
pinget, err := c.PinGet(ctx, h2)
if err != nil {
t.Fatal(err)
}
if len(pinget.Allocations) != 0 {
t.Error("new pin should be allocated everywhere like pin1")
}
if pinget.MaxDepth != -1 {
t.Error("updated pin should be recursive like pin1")
}
if pinget.Name != "new name" {
t.Error("name should be kept")
}
}
runF(t, clusters, f)
}
func TestClustersStatusAll(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)

View File

@ -293,10 +293,13 @@ func pinArgs(maxDepth int) string {
// Pin performs a pin request against the configured IPFS
// daemon.
func (ipfs *Connector) Pin(ctx context.Context, hash cid.Cid, maxDepth int) error {
func (ipfs *Connector) Pin(ctx context.Context, pin *api.Pin) error {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Pin")
defer span.End()
hash := pin.Cid
maxDepth := pin.MaxDepth
pinStatus, err := ipfs.PinLsCid(ctx, hash)
if err != nil {
return err
@ -312,7 +315,24 @@ func (ipfs *Connector) Pin(ctx context.Context, hash cid.Cid, maxDepth int) erro
ctx, cancelRequest := context.WithCancel(ctx)
defer cancelRequest()
switch ipfs.config.PinMethod {
pinMethod := ipfs.config.PinMethod
// If we have a pin-update, and the old object
// is pinned recursively, then do pin/update.
// Otherwise do a normal pin.
if from := pin.PinUpdate; from != cid.Undef {
pinStatus, _ := ipfs.PinLsCid(ctx, from)
if pinStatus.IsPinned(-1) { // pinned recursively as update
pinMethod = "update"
// as a side note, if PinUpdate == pin.Cid, we are
// somehow pinning an already pinned thing and we'd better
// use update for that
}
}
switch pinMethod {
case "update":
return ipfs.pinUpdate(ctx, pin.PinUpdate, pin.Cid)
case "refs":
// do refs -r first and timeout if we don't get at least
// one ref per pin timeout
@ -482,6 +502,20 @@ func (ipfs *Connector) pinProgress(ctx context.Context, hash cid.Cid, maxDepth i
}
}
func (ipfs *Connector) pinUpdate(ctx context.Context, from, to cid.Cid) error {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/pinUpdate")
defer span.End()
path := fmt.Sprintf("pin/update?arg=%s&arg=%s&unpin=false", from, to)
_, err := ipfs.postCtx(ctx, path, "", nil)
if err != nil {
return err
}
logger.Infof("IPFS Pin Update request succeeded. %s -> %s (unpin=false)", from, to)
stats.Record(ctx, observations.Pins.M(1))
return nil
}
// Unpin performs an unpin request against the configured IPFS
// daemon.
func (ipfs *Connector) Unpin(ctx context.Context, hash cid.Cid) error {

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"testing"
"time"
@ -77,7 +78,7 @@ func testPin(t *testing.T, method string) {
ipfs.config.PinMethod = method
c := test.Cid1
err := ipfs.Pin(ctx, c, -1)
err := ipfs.Pin(ctx, api.PinCid(c))
if err != nil {
t.Error("expected success pinning cid:", err)
}
@ -90,7 +91,7 @@ func testPin(t *testing.T, method string) {
}
c2 := test.ErrorCid
err = ipfs.Pin(ctx, c2, -1)
err = ipfs.Pin(ctx, api.PinCid(c2))
if err == nil {
t.Error("expected error pinning cid")
}
@ -99,14 +100,14 @@ func testPin(t *testing.T, method string) {
case "refs":
ipfs.config.PinTimeout = 1 * time.Second
c3 := test.SlowCid1
err = ipfs.Pin(ctx, c3, -1)
err = ipfs.Pin(ctx, api.PinCid(c3))
if err == nil {
t.Error("expected error pinning cid")
}
case "pin":
ipfs.config.PinTimeout = 5 * time.Second
c4 := test.SlowCid1
err = ipfs.Pin(ctx, c4, -1)
err = ipfs.Pin(ctx, api.PinCid(c4))
if err == nil {
t.Error("expected error pinning cid")
}
@ -114,9 +115,42 @@ func testPin(t *testing.T, method string) {
}
}
func testPinUpdate(t *testing.T) {
ctx := context.Background()
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown(ctx)
pin := api.PinCid(test.Cid1)
pin.PinUpdate = test.Cid1
// enforce pin/update even though it would be skipped
ipfs.config.PinMethod = "update"
err := ipfs.Pin(ctx, pin)
if err == nil {
t.Fatal("expected an error")
}
if !strings.HasSuffix(err.Error(), "recursively pinned already") {
t.Fatal("expected error about from not being pinned")
}
ipfs.config.PinMethod = "pin"
err = ipfs.Pin(ctx, pin)
if err != nil {
t.Fatal(err)
}
// This should trigger the pin/update path
pin.Cid = test.Cid2
err = ipfs.Pin(ctx, pin)
if err != nil {
t.Fatal(err)
}
}
func TestIPFSPin(t *testing.T) {
t.Run("method=pin", func(t *testing.T) { testPin(t, "pin") })
t.Run("method=refs", func(t *testing.T) { testPin(t, "refs") })
t.Run("method=update", func(t *testing.T) { testPinUpdate(t) })
}
func TestIPFSUnpin(t *testing.T) {
@ -129,7 +163,7 @@ func TestIPFSUnpin(t *testing.T) {
if err != nil {
t.Error("expected success unpinning non-pinned cid")
}
ipfs.Pin(ctx, c, -1)
ipfs.Pin(ctx, api.PinCid(c))
err = ipfs.Unpin(ctx, c)
if err != nil {
t.Error("expected success unpinning pinned cid")
@ -142,7 +176,7 @@ func TestIPFSUnpinDisabled(t *testing.T) {
defer mock.Close()
defer ipfs.Shutdown(ctx)
ipfs.config.UnpinDisable = true
err := ipfs.Pin(ctx, test.Cid1, -1)
err := ipfs.Pin(ctx, api.PinCid(test.Cid1))
if err != nil {
t.Fatal(err)
}
@ -161,7 +195,7 @@ func TestIPFSPinLsCid(t *testing.T) {
c := test.Cid1
c2 := test.Cid2
ipfs.Pin(ctx, c, -1)
ipfs.Pin(ctx, api.PinCid(c))
ips, err := ipfs.PinLsCid(ctx, c)
if err != nil {
t.Error(err)
@ -184,7 +218,7 @@ func TestIPFSPinLsCid_DifferentEncoding(t *testing.T) {
defer ipfs.Shutdown(ctx)
c := test.Cid4 // ipfs mock treats this specially
ipfs.Pin(ctx, c, -1)
ipfs.Pin(ctx, api.PinCid(c))
ips, err := ipfs.PinLsCid(ctx, c)
if err != nil {
t.Error(err)
@ -203,8 +237,8 @@ func TestIPFSPinLs(t *testing.T) {
c := test.Cid1
c2 := test.Cid2
ipfs.Pin(ctx, c, -1)
ipfs.Pin(ctx, c2, -1)
ipfs.Pin(ctx, api.PinCid(c))
ipfs.Pin(ctx, api.PinCid(c2))
ipsMap, err := ipfs.PinLs(ctx, "")
if err != nil {
t.Error("should not error")
@ -330,7 +364,7 @@ func TestRepoStat(t *testing.T) {
}
c := test.Cid1
err = ipfs.Pin(ctx, c, -1)
err = ipfs.Pin(ctx, api.PinCid(c))
if err != nil {
t.Error("expected success pinning cid")
}

View File

@ -482,7 +482,7 @@ func (rpcapi *PinTrackerRPCAPI) Recover(ctx context.Context, in cid.Cid, out *ap
func (rpcapi *IPFSConnectorRPCAPI) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
ctx, span := trace.StartSpan(ctx, "rpc/ipfsconn/IPFSPin")
defer span.End()
return rpcapi.ipfs.Pin(ctx, in.Cid, in.MaxDepth)
return rpcapi.ipfs.Pin(ctx, in)
}
// Unpin runs IPFSConnector.Unpin().

View File

@ -88,6 +88,14 @@ test_expect_success IPFS,CLUSTER "pin data to cluster with user allocations" '
echo $allocations | grep -q ${pid}
'
test_expect_success IPFS,CLUSTER "pin update a pin" '
cid1=`docker exec ipfs sh -c "echo test | ipfs add -q"`
ipfs-cluster-ctl pin add "$cid1"
cid2=`docker exec ipfs sh -c "echo test2 | ipfs add -q"`
ipfs-cluster-ctl pin update $cid1 $cid2 &&
ipfs-cluster-ctl pin ls $cid2
'
test_clean_ipfs
test_clean_cluster

View File

@ -194,6 +194,42 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
resp := mockPinResp{
Pins: []string{arg},
}
j, _ := json.Marshal(resp)
w.Write(j)
case "pin/update":
args := r.URL.Query()["arg"]
if len(args) != 2 {
goto ERROR
}
fromStr := args[0]
toStr := args[1]
from, err := cid.Decode(fromStr)
if err != nil {
goto ERROR
}
to, err := cid.Decode(toStr)
if err != nil {
goto ERROR
}
pin, err := m.pinMap.Get(ctx, from)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
resp := ipfsErr{0, fmt.Sprintf("'from' cid was not recursively pinned already")}
j, _ := json.Marshal(resp)
w.Write(j)
return
}
pin.Cid = to
err = m.pinMap.Add(ctx, pin)
if err != nil {
goto ERROR
}
resp := mockPinResp{
Pins: []string{from.String(), to.String()},
}
j, _ := json.Marshal(resp)
w.Write(j)
case "pin/ls":