Fix #732: Introduce native pin/update

This introduces a pin/update operation which allows pinning a new item to the
cluster while indicating that said pin is an update to an already-existing pin.

When this is the case, all the configuration of the existing pin is copied to
the new one (including allocations). The IPFS connector will then trigger
pin/update directly in IPFS, allowing efficient pinning based on
DAG differences. Since the allocations are the same for both pins,
the pin/update can proceed.

PinUpdate does not unpin the previous pin (unlike in IPFS, this cannot be done
atomically in cluster). The user can do it manually once the pin/update is
done.

Internally, after much deliberation about the optimal approach, I opted for
adding a `PinUpdate` option to the `PinOptions` type (it carries the CID to
update from). In order to carry this option from the REST API to the IPFS
Connector, it is serialized in the protobuf (and stored in the datastore).
There is no simpler way to do this, since the Pin object is the piece of
information that gets sent around.

Additionally, making it a PinOption plays well with the Pin/PinPath APIs, which
need few changes. Effectively, you are pinning a new thing; you are just
indicating that it should be configured from an existing one.
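
As an illustration, a client-side call would look roughly like this (it
mirrors what the new "ipfs-cluster-ctl pin update" subcommand does; the
variable names are invented for the example):

    opts := api.PinOptions{
        PinUpdate: fromCid, // CID of the existing pin to copy options from
    }
    pin, err := client.PinPath(ctx, toPath, opts)
    // The existing pin is NOT removed automatically. Unpin it manually
    // once the update has completed, if desired.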

Fixes #732
Hector Sanjuan 2019-07-12 16:40:29 +02:00
parent 00e78a6b6d
commit 1eade4ae58
16 changed files with 365 additions and 84 deletions


@ -445,38 +445,17 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) {
return
}
-	// Get existing FROM pin, and send error if not present.
-	var fromPin api.Pin
-	err = proxy.rpcClient.CallContext(
-		ctx,
-		"",
-		"Cluster",
-		"PinGet",
-		fromCid,
-		&fromPin,
-	)
-	if err != nil {
-		ipfsErrorResponder(w, err.Error(), -1)
-		return
-	}
+	// Do a PinPath setting PinUpdate
+	pinPath := &api.PinPath{Path: pTo.String()}
+	pinPath.PinUpdate = fromCid
-	// Prepare to pin the TO argument with the options from the FROM pin
-	// and the allocations of the FROM pin.
-	toPath := &api.PinPath{
-		Path:       pTo.String(),
-		PinOptions: fromPin.PinOptions,
-	}
-	toPath.PinOptions.UserAllocations = fromPin.Allocations
-	// Pin the TO pin.
-	var toPin api.Pin
-	err = proxy.rpcClient.CallContext(
-		ctx,
+	var pin api.Pin
+	err = proxy.rpcClient.Call(
 		"",
 		"Cluster",
 		"PinPath",
-		toPath,
-		&toPin,
+		pinPath,
+		&pin,
 	)
if err != nil {
ipfsErrorResponder(w, err.Error(), -1)
@ -492,7 +471,7 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) {
"",
"Cluster",
"Unpin",
-		&fromPin,
+		api.PinCid(fromCid),
&pinObj,
)
if err != nil {
@ -502,7 +481,7 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) {
}
res := ipfsPinOpResp{
-		Pins: []string{fromCid.String(), toPin.Cid.String()},
+		Pins: []string{fromCid.String(), pin.Cid.String()},
}
resBytes, _ := json.Marshal(res)
w.WriteHeader(http.StatusOK)

api/pb/Makefile (new file, +2)

@ -0,0 +1,2 @@
all:
	protoc -I=. --go_out=. types.proto


@ -139,6 +139,7 @@ type PinOptions struct {
Name string `protobuf:"bytes,3,opt,name=Name,proto3" json:"Name,omitempty"`
ShardSize uint64 `protobuf:"varint,4,opt,name=ShardSize,proto3" json:"ShardSize,omitempty"`
Metadata map[string]string `protobuf:"bytes,6,rep,name=Metadata,proto3" json:"Metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
PinUpdate []byte `protobuf:"bytes,7,opt,name=PinUpdate,proto3" json:"PinUpdate,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -204,6 +205,13 @@ func (m *PinOptions) GetMetadata() map[string]string {
return nil
}
func (m *PinOptions) GetPinUpdate() []byte {
if m != nil {
return m.PinUpdate
}
return nil
}
func init() {
proto.RegisterEnum("api.pb.Pin_PinType", Pin_PinType_name, Pin_PinType_value)
proto.RegisterType((*Pin)(nil), "api.pb.Pin")
@ -214,29 +222,30 @@ func init() {
func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) }
var fileDescriptor_d938547f84707355 = []byte{
	// 377 bytes of a gzipped FileDescriptorProto (previous descriptor, removed)
	// 391 bytes of a gzipped FileDescriptorProto (regenerated by protoc;
	// raw byte values omitted)
}


@ -24,4 +24,5 @@ message PinOptions {
uint64 ShardSize = 4;
reserved 5; // reserved for UserAllocations
map<string, string> Metadata = 6;
bytes PinUpdate = 7;
}


@ -461,6 +461,7 @@ type PinOptions struct {
ShardSize uint64 `json:"shard_size" codec:"s,omitempty"`
UserAllocations []peer.ID `json:"user_allocations" codec:"ua,omitempty"`
Metadata map[string]string `json:"metadata" codec:"m,omitempty"`
PinUpdate cid.Cid `json:"pin_update,omitempty" codec:"pu,omitempty"`
}
// Equals returns true if two PinOption objects are equivalent. po and po2 may
@ -507,6 +508,9 @@ func (po *PinOptions) Equals(po2 *PinOptions) bool {
return false
}
}
// deliberately ignore PinUpdate
return true
}
@ -524,6 +528,7 @@ func (po *PinOptions) ToQuery() string {
}
q.Set(fmt.Sprintf("%s%s", pinOptionsMetaPrefix, k), v)
}
q.Set("pin-update", po.PinUpdate.String())
return q.Encode()
}
@ -563,10 +568,20 @@ func (po *PinOptions) FromQuery(q url.Values) {
}
po.Metadata[metaKey] = q.Get(k)
}
updateStr := q.Get("pin-update")
if updateStr != "" {
updateCid, err := cid.Decode(updateStr)
if err != nil {
logger.Error("error decoding update option parameter: ", err)
}
po.PinUpdate = updateCid
}
}
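
// Illustrative note (not part of the diff): with ToQuery/FromQuery above, the
// update source travels as a plain query parameter, so a REST pin request can
// look roughly like: POST /pins/<new-cid>?pin-update=<existing-cid>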
// Pin carries all the information associated to a CID that is pinned
-// in IPFS Cluster.
+// in IPFS Cluster. It also carries transient information (that may not
+// get protobuffed, like UserAllocations).
type Pin struct {
PinOptions
@ -657,7 +672,8 @@ func (pin *Pin) ProtoMarshal() ([]byte, error) {
Name: pin.Name,
ShardSize: pin.ShardSize,
// UserAllocations: pin.UserAllocations,
-		Metadata: pin.Metadata,
+		Metadata:  pin.Metadata,
+		PinUpdate: pin.PinUpdate.Bytes(),
}
pbPin := &pb.Pin{
@ -717,6 +733,11 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error {
pin.ShardSize = opts.GetShardSize()
// pin.UserAllocations = opts.GetUserAllocations()
pin.Metadata = opts.GetMetadata()
pinUpdate, err := cid.Cast(opts.GetPinUpdate())
if err == nil {
pin.PinUpdate = pinUpdate
}
return nil
}
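
// Note (not part of the diff): cid.Cast errors on empty input, so pins that
// were serialized without a PinUpdate simply keep the zero value (cid.Undef).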


@ -1184,12 +1184,16 @@ func (c *Cluster) PinGet(ctx context.Context, h cid.Cid) (*api.Pin, error) {
// rest of the cluster. Priority allocations are best effort. If any priority
// peers are unavailable then Pin will simply allocate from the rest of the
// cluster.
//
// If the Update option is set, the pin options (including allocations) will
// be copied from an existing one. This is equivalent to running PinUpdate.
func (c *Cluster) Pin(ctx context.Context, h cid.Cid, opts api.PinOptions) (*api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/Pin")
defer span.End()
ctx = trace.NewContext(c.ctx, span)
pin := api.PinWithOpts(h, opts)
result, _, err := c.pin(ctx, pin, []peer.ID{})
return result, err
}
@ -1280,6 +1284,8 @@ func (c *Cluster) setupPin(ctx context.Context, pin *api.Pin) error {
// evacuate a node and returns the pin object that it tried to pin, whether
// the pin was submitted to the consensus layer or skipped (due to error or to
// the fact that it was already valid) and error.
//
// This is the method called by the Cluster.Pin RPC endpoint.
func (c *Cluster) pin(
ctx context.Context,
pin *api.Pin,
@ -1296,6 +1302,12 @@ func (c *Cluster) pin(
return pin, false, errors.New("bad pin object")
}
// Handle pin updates when the option is set
if update := pin.PinUpdate; update != cid.Undef && !update.Equals(pin.Cid) {
pin, err := c.PinUpdate(ctx, update, pin.Cid, pin.PinOptions)
return pin, true, err
}
// setup pin might produce some side-effects to our pin
err := c.setupPin(ctx, pin)
if err != nil {
@ -1400,6 +1412,37 @@ func (c *Cluster) unpinClusterDag(metaPin *api.Pin) error {
return nil
}
// PinUpdate pins a new CID based on an existing cluster Pin. The allocations
// and most pin options (replication factors) are copied from the existing
// Pin. The options object can be used to set the Name for the new pin and
// might support additional options in the future.
//
// The from pin is NOT unpinned upon completion. The new pin might take
// advantage of the efficient pin/update operation on the IPFS side (if the
// IPFSConnector supports it; the default one does). This may offer a
// significant speedup when pinning items which are similar to previously
// pinned content.
func (c *Cluster) PinUpdate(ctx context.Context, from cid.Cid, to cid.Cid, opts api.PinOptions) (*api.Pin, error) {
existing, err := c.PinGet(ctx, from)
if err != nil { // including when the existing pin is not found
return nil, err
}
// Hector: I am not sure it makes sense to update something
// like a MetaType.
if existing.Type != api.DataType {
return nil, errors.New("this pin type cannot be updated")
}
existing.Cid = to
existing.PinUpdate = from
if opts.Name != "" {
existing.Name = opts.Name
}
return existing, c.consensus.LogPin(ctx, existing)
}
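
// Illustrative usage (not part of the diff; oldCid/newCid are placeholders):
//
//   pin, err := c.PinUpdate(ctx, oldCid, newCid, api.PinOptions{Name: "v2"})
//
// Everything except the Name is copied from the existing pin at oldCid.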
// PinPath pins a CID resolved from its IPFS Path. It returns the resolved
// Pin object.
func (c *Cluster) PinPath(ctx context.Context, path string, opts api.PinOptions) (*api.Pin, error) {


@ -61,8 +61,8 @@ func (ipfs *mockConnector) ID(ctx context.Context) (*api.IPFSID, error) {
}, nil
}
-func (ipfs *mockConnector) Pin(ctx context.Context, c cid.Cid, maxDepth int) error {
-	ipfs.pins.Store(c.String(), maxDepth)
+func (ipfs *mockConnector) Pin(ctx context.Context, pin *api.Pin) error {
+	ipfs.pins.Store(pin.Cid.String(), pin.MaxDepth)
return nil
}


@ -473,7 +473,7 @@ cluster "pin add".
Subcommands: []cli.Command{
{
Name: "add",
-	Usage: "Cluster Pin",
+	Usage: "Pin an item in the cluster",
Description: `
This command tells IPFS Cluster to start managing a CID. Depending on
the pinning strategy, this will trigger IPFS pin requests. The CID will
@ -491,7 +491,7 @@ comma-separated list of peer IDs on which we want to pin. Peers in allocations
are prioritized over automatically-determined ones, but replication factors
would still be respected.
`,
-	ArgsUsage: "<CID>",
+	ArgsUsage: "<CID|Path>",
Flags: []cli.Flag{
cli.IntFlag{
Name: "replication, r",
@ -569,7 +569,7 @@ would still be respected.
},
{
Name: "rm",
-	Usage: "Cluster Unpin",
+	Usage: "Unpin an item from the cluster",
Description: `
This command tells IPFS Cluster to no longer manage a CID. This will
trigger unpinning operations in all the IPFS nodes holding the content.
@ -578,7 +578,7 @@ When the request has succeeded, the command returns the status of the CID
in the cluster. The CID should disappear from the list offered by "pin ls",
although unpinning operations in the cluster may take longer or fail.
`,
-	ArgsUsage: "<CID>",
+	ArgsUsage: "<CID|Path>",
Flags: []cli.Flag{
cli.BoolFlag{
Name: "no-status, ns",
@ -610,6 +610,63 @@ although unpinning operations in the cluster may take longer or fail.
return nil
},
},
{
Name: "update",
Usage: "Pin a new item based on an existing one",
Description: `
This command will add a new pin to the cluster taking all the options from an
existing one, including name. This means that the new pin will bypass the
allocation process and will be allocated to the same peers as the existing
one.
The cluster peers will try to Pin the new item on IPFS using the "pin update"
command. This is especially efficient when the contents of the two pins (their
DAGs) are similar.
Unlike the "pin update" command in the ipfs daemon, this will not unpin the
existing item from the cluster. Please run "pin rm" for that.
`,
ArgsUsage: "<existing-CID> <new-CID|Path>",
Flags: []cli.Flag{
cli.BoolFlag{
Name: "no-status, ns",
Usage: "Prevents fetching pin status after unpinning (faster, quieter)",
},
cli.BoolFlag{
Name: "wait, w",
Usage: "Wait for all nodes to report a status of pinned before returning",
},
cli.DurationFlag{
Name: "wait-timeout, wt",
Value: 0,
Usage: "How long to --wait (in seconds), default is indefinitely",
},
},
Action: func(c *cli.Context) error {
from := c.Args().Get(0)
to := c.Args().Get(1)
fromCid, err := cid.Decode(from)
checkErr("parsing from Cid", err)
opts := api.PinOptions{
PinUpdate: fromCid,
}
pin, cerr := globalClient.PinPath(ctx, to, opts)
if cerr != nil {
formatResponse(c, nil, cerr)
return nil
}
handlePinResponseFormatFlags(
ctx,
c,
pin,
api.TrackerStatusPinned,
)
return nil
},
},
{
Name: "ls",
Usage: "List items in the cluster pinset",


@ -264,7 +264,6 @@ func (cfg *Manager) ApplyEnvVars() error {
return err
}
}
return nil
}
@ -453,7 +452,6 @@ func (cfg *Manager) LoadJSON(bs []byte) error {
return err
}
}
return cfg.Validate()
}


@ -37,11 +37,11 @@ type Consensus interface {
// allowing the main component to wait for it during start.
Ready(context.Context) <-chan struct{}
// Logs a pin operation.
-	LogPin(ctx context.Context, c *api.Pin) error
+	LogPin(context.Context, *api.Pin) error
// Logs an unpin operation.
-	LogUnpin(ctx context.Context, c *api.Pin) error
-	AddPeer(ctx context.Context, p peer.ID) error
-	RmPeer(ctx context.Context, p peer.ID) error
+	LogUnpin(context.Context, *api.Pin) error
+	AddPeer(context.Context, peer.ID) error
+	RmPeer(context.Context, peer.ID) error
State(context.Context) (state.ReadOnly, error)
// Provide a node which is responsible to perform
// specific tasks which must only run in 1 cluster peer.
@ -75,7 +75,7 @@ type API interface {
type IPFSConnector interface {
Component
ID(context.Context) (*api.IPFSID, error)
-	Pin(context.Context, cid.Cid, int) error
+	Pin(context.Context, *api.Pin) error
Unpin(context.Context, cid.Cid) error
PinLsCid(context.Context, cid.Cid) (api.IPFSPinStatus, error)
PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error)


@ -28,6 +28,7 @@ import (
"github.com/ipfs/ipfs-cluster/observations"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/test"
"github.com/ipfs/ipfs-cluster/version"
@ -642,6 +643,64 @@ func TestClustersPin(t *testing.T) {
runF(t, clusters, funpinned)
}
func TestClustersPinUpdate(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
prefix := test.Cid1.Prefix()
ttlDelay()
h, err := prefix.Sum(randomBytes()) // create random cid
h2, err := prefix.Sum(randomBytes()) // create random cid
_, err = clusters[0].PinUpdate(ctx, h, h2, api.PinOptions{})
if err == nil || err != state.ErrNotFound {
t.Fatal("pin update should fail when from is not pinned")
}
_, err = clusters[0].Pin(ctx, h, api.PinOptions{})
if err != nil {
t.Errorf("error pinning %s: %s", h, err)
}
pinDelay()
opts2 := api.PinOptions{
UserAllocations: []peer.ID{clusters[0].host.ID()}, // should not be used
PinUpdate: h,
Name: "new name",
}
_, err = clusters[0].Pin(ctx, h2, opts2) // should call PinUpdate
if err != nil {
t.Errorf("error pin-updating %s: %s", h2, err)
}
pinDelay()
f := func(t *testing.T, c *Cluster) {
pinget, err := c.PinGet(ctx, h2)
if err != nil {
t.Fatal(err)
}
if len(pinget.Allocations) != 0 {
t.Error("new pin should be allocated everywhere like pin1")
}
if pinget.MaxDepth != -1 {
t.Error("updated pin should be recursive like pin1")
}
if pinget.Name != "new name" {
t.Error("name should be kept")
}
}
runF(t, clusters, f)
}
func TestClustersStatusAll(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)


@ -293,10 +293,13 @@ func pinArgs(maxDepth int) string {
// Pin performs a pin request against the configured IPFS
// daemon.
-func (ipfs *Connector) Pin(ctx context.Context, hash cid.Cid, maxDepth int) error {
+func (ipfs *Connector) Pin(ctx context.Context, pin *api.Pin) error {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Pin")
defer span.End()
hash := pin.Cid
maxDepth := pin.MaxDepth
pinStatus, err := ipfs.PinLsCid(ctx, hash)
if err != nil {
return err
@ -312,7 +315,24 @@ func (ipfs *Connector) Pin(ctx context.Context, hash cid.Cid, maxDepth int) erro
ctx, cancelRequest := context.WithCancel(ctx)
defer cancelRequest()
-	switch ipfs.config.PinMethod {
+	pinMethod := ipfs.config.PinMethod
+
+	// If we have a pin-update, and the old object
+	// is pinned recursively, then do pin/update.
+	// Otherwise do a normal pin.
+	if from := pin.PinUpdate; from != cid.Undef {
+		pinStatus, _ := ipfs.PinLsCid(ctx, from)
+		if pinStatus.IsPinned(-1) { // pinned recursively as update
+			pinMethod = "update"
+			// as a side note, if PinUpdate == pin.Cid, we are
+			// somehow pinning an already pinned thing and we'd better
+			// use update for that
+		}
+	}
+
+	switch pinMethod {
+	case "update":
+		return ipfs.pinUpdate(ctx, pin.PinUpdate, pin.Cid)
case "refs":
// do refs -r first and timeout if we don't get at least
// one ref per pin timeout
@ -482,6 +502,20 @@ func (ipfs *Connector) pinProgress(ctx context.Context, hash cid.Cid, maxDepth i
}
}
func (ipfs *Connector) pinUpdate(ctx context.Context, from, to cid.Cid) error {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/pinUpdate")
defer span.End()
path := fmt.Sprintf("pin/update?arg=%s&arg=%s&unpin=false", from, to)
_, err := ipfs.postCtx(ctx, path, "", nil)
if err != nil {
return err
}
logger.Infof("IPFS Pin Update request succeeded. %s -> %s (unpin=false)", from, to)
stats.Record(ctx, observations.Pins.M(1))
return nil
}
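
// Note (not part of the diff): this is the HTTP equivalent of running
// "ipfs pin update --unpin=false <from> <to>" against the daemon. Unpin is
// always disabled here because cluster handles unpinning of the old item
// separately, as explained in the commit message.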
// Unpin performs an unpin request against the configured IPFS
// daemon.
func (ipfs *Connector) Unpin(ctx context.Context, hash cid.Cid) error {


@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"testing"
"time"
@ -77,7 +78,7 @@ func testPin(t *testing.T, method string) {
ipfs.config.PinMethod = method
c := test.Cid1
-	err := ipfs.Pin(ctx, c, -1)
+	err := ipfs.Pin(ctx, api.PinCid(c))
if err != nil {
t.Error("expected success pinning cid:", err)
}
@ -90,7 +91,7 @@ func testPin(t *testing.T, method string) {
}
c2 := test.ErrorCid
-	err = ipfs.Pin(ctx, c2, -1)
+	err = ipfs.Pin(ctx, api.PinCid(c2))
if err == nil {
t.Error("expected error pinning cid")
}
@ -99,14 +100,14 @@ func testPin(t *testing.T, method string) {
case "refs":
ipfs.config.PinTimeout = 1 * time.Second
c3 := test.SlowCid1
-		err = ipfs.Pin(ctx, c3, -1)
+		err = ipfs.Pin(ctx, api.PinCid(c3))
if err == nil {
t.Error("expected error pinning cid")
}
case "pin":
ipfs.config.PinTimeout = 5 * time.Second
c4 := test.SlowCid1
-		err = ipfs.Pin(ctx, c4, -1)
+		err = ipfs.Pin(ctx, api.PinCid(c4))
if err == nil {
t.Error("expected error pinning cid")
}
@ -114,9 +115,42 @@ func testPin(t *testing.T, method string) {
}
}
func testPinUpdate(t *testing.T) {
ctx := context.Background()
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown(ctx)
pin := api.PinCid(test.Cid1)
pin.PinUpdate = test.Cid1
// enforce pin/update even though it would be skipped
ipfs.config.PinMethod = "update"
err := ipfs.Pin(ctx, pin)
if err == nil {
t.Fatal("expected an error")
}
if !strings.HasSuffix(err.Error(), "recursively pinned already") {
t.Fatal("expected error about from not being pinned")
}
ipfs.config.PinMethod = "pin"
err = ipfs.Pin(ctx, pin)
if err != nil {
t.Fatal(err)
}
// This should trigger the pin/update path
pin.Cid = test.Cid2
err = ipfs.Pin(ctx, pin)
if err != nil {
t.Fatal(err)
}
}
func TestIPFSPin(t *testing.T) {
t.Run("method=pin", func(t *testing.T) { testPin(t, "pin") })
t.Run("method=refs", func(t *testing.T) { testPin(t, "refs") })
t.Run("method=update", func(t *testing.T) { testPinUpdate(t) })
}
func TestIPFSUnpin(t *testing.T) {
@ -129,7 +163,7 @@ func TestIPFSUnpin(t *testing.T) {
if err != nil {
t.Error("expected success unpinning non-pinned cid")
}
-	ipfs.Pin(ctx, c, -1)
+	ipfs.Pin(ctx, api.PinCid(c))
err = ipfs.Unpin(ctx, c)
if err != nil {
t.Error("expected success unpinning pinned cid")
@ -142,7 +176,7 @@ func TestIPFSUnpinDisabled(t *testing.T) {
defer mock.Close()
defer ipfs.Shutdown(ctx)
ipfs.config.UnpinDisable = true
-	err := ipfs.Pin(ctx, test.Cid1, -1)
+	err := ipfs.Pin(ctx, api.PinCid(test.Cid1))
if err != nil {
t.Fatal(err)
}
@ -161,7 +195,7 @@ func TestIPFSPinLsCid(t *testing.T) {
c := test.Cid1
c2 := test.Cid2
-	ipfs.Pin(ctx, c, -1)
+	ipfs.Pin(ctx, api.PinCid(c))
ips, err := ipfs.PinLsCid(ctx, c)
if err != nil {
t.Error(err)
@ -184,7 +218,7 @@ func TestIPFSPinLsCid_DifferentEncoding(t *testing.T) {
defer ipfs.Shutdown(ctx)
c := test.Cid4 // ipfs mock treats this specially
-	ipfs.Pin(ctx, c, -1)
+	ipfs.Pin(ctx, api.PinCid(c))
ips, err := ipfs.PinLsCid(ctx, c)
if err != nil {
t.Error(err)
@ -203,8 +237,8 @@ func TestIPFSPinLs(t *testing.T) {
c := test.Cid1
c2 := test.Cid2
-	ipfs.Pin(ctx, c, -1)
-	ipfs.Pin(ctx, c2, -1)
+	ipfs.Pin(ctx, api.PinCid(c))
+	ipfs.Pin(ctx, api.PinCid(c2))
ipsMap, err := ipfs.PinLs(ctx, "")
if err != nil {
t.Error("should not error")
@ -330,7 +364,7 @@ func TestRepoStat(t *testing.T) {
}
c := test.Cid1
-	err = ipfs.Pin(ctx, c, -1)
+	err = ipfs.Pin(ctx, api.PinCid(c))
if err != nil {
t.Error("expected success pinning cid")
}


@ -482,7 +482,7 @@ func (rpcapi *PinTrackerRPCAPI) Recover(ctx context.Context, in cid.Cid, out *ap
func (rpcapi *IPFSConnectorRPCAPI) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
ctx, span := trace.StartSpan(ctx, "rpc/ipfsconn/IPFSPin")
defer span.End()
-	return rpcapi.ipfs.Pin(ctx, in.Cid, in.MaxDepth)
+	return rpcapi.ipfs.Pin(ctx, in)
}
// Unpin runs IPFSConnector.Unpin().


@ -88,6 +88,14 @@ test_expect_success IPFS,CLUSTER "pin data to cluster with user allocations" '
echo $allocations | grep -q ${pid}
'
test_expect_success IPFS,CLUSTER "pin update a pin" '
cid1=`docker exec ipfs sh -c "echo test | ipfs add -q"`
ipfs-cluster-ctl pin add "$cid1"
cid2=`docker exec ipfs sh -c "echo test2 | ipfs add -q"`
ipfs-cluster-ctl pin update $cid1 $cid2 &&
ipfs-cluster-ctl pin ls $cid2
'
test_clean_ipfs
test_clean_cluster


@ -194,6 +194,42 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
resp := mockPinResp{
Pins: []string{arg},
}
j, _ := json.Marshal(resp)
w.Write(j)
case "pin/update":
args := r.URL.Query()["arg"]
if len(args) != 2 {
goto ERROR
}
fromStr := args[0]
toStr := args[1]
from, err := cid.Decode(fromStr)
if err != nil {
goto ERROR
}
to, err := cid.Decode(toStr)
if err != nil {
goto ERROR
}
pin, err := m.pinMap.Get(ctx, from)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
resp := ipfsErr{0, "'from' cid was not recursively pinned already"}
j, _ := json.Marshal(resp)
w.Write(j)
return
}
pin.Cid = to
err = m.pinMap.Add(ctx, pin)
if err != nil {
goto ERROR
}
resp := mockPinResp{
Pins: []string{from.String(), to.String()},
}
j, _ := json.Marshal(resp)
w.Write(j)
case "pin/ls":