From 1eade4ae58bb59ef637ebc746537c89e7a69cd97 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Fri, 12 Jul 2019 16:40:29 +0200 Subject: [PATCH] Fix #732: Introduce native pin/update This introduces a pin/update operation which allows to Pin a new item to cluster indicating that said pin is an update to an already-existing pin. When this is the case, all the configuration for the existing pin is copied to the new one (including allocations). The IPFS connector will then trigger pin/update directly in IPFS, allowing an efficient pinning based on DAG-differences. Since the allocations where the same for both pins, the pin/update can proceed. PinUpdate does not unpin the previous pin (it is not possible to do this atomically in cluster like it happens in IPFS). The user can manually do it after the pin/update is done. Internally, after a lot of deliberations on what the optimal way for this is, I opted for adding a `PinUpdate` option to the `PinOptions` type (carries the CID to update from). In order to carry this option from the REST API to the IPFS Connector, it is serialized in the Protobuf (and stored in the datastore). There is no other way to do this in a simple fashion since the Pin object is piece of information that is sent around. Additionally, making it a PinOption plays well with the Pin/PinPath APIs which need little changes. Effectively, you are pinning a new thing. You are just indicating that it should be configured from an existing one. Fixes #732 --- api/ipfsproxy/ipfsproxy.go | 39 +++++------------- api/pb/Makefile | 2 + api/pb/types.pb.go | 59 +++++++++++++++------------ api/pb/types.proto | 1 + api/types.go | 25 +++++++++++- cluster.go | 43 ++++++++++++++++++++ cluster_test.go | 4 +- cmd/ipfs-cluster-ctl/main.go | 65 ++++++++++++++++++++++++++++-- config/config.go | 2 - ipfscluster.go | 10 ++--- ipfscluster_test.go | 59 +++++++++++++++++++++++++++ ipfsconn/ipfshttp/ipfshttp.go | 38 ++++++++++++++++- ipfsconn/ipfshttp/ipfshttp_test.go | 56 ++++++++++++++++++++----- rpc_api.go | 2 +- sharness/t0030-ctl-pin.sh | 8 ++++ test/ipfs_mock.go | 36 +++++++++++++++++ 16 files changed, 365 insertions(+), 84 deletions(-) create mode 100644 api/pb/Makefile diff --git a/api/ipfsproxy/ipfsproxy.go b/api/ipfsproxy/ipfsproxy.go index 5ce3612d..3d61a96d 100644 --- a/api/ipfsproxy/ipfsproxy.go +++ b/api/ipfsproxy/ipfsproxy.go @@ -445,38 +445,17 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) { return } - // Get existing FROM pin, and send error if not present. - var fromPin api.Pin - err = proxy.rpcClient.CallContext( - ctx, - "", - "Cluster", - "PinGet", - fromCid, - &fromPin, - ) - if err != nil { - ipfsErrorResponder(w, err.Error(), -1) - return - } + // Do a PinPath setting PinUpdate + pinPath := &api.PinPath{Path: pTo.String()} + pinPath.PinUpdate = fromCid - // Prepare to pin the TO argument with the options from the FROM pin - // and the allocations of the FROM pin. - toPath := &api.PinPath{ - Path: pTo.String(), - PinOptions: fromPin.PinOptions, - } - toPath.PinOptions.UserAllocations = fromPin.Allocations - - // Pin the TO pin. - var toPin api.Pin - err = proxy.rpcClient.CallContext( - ctx, + var pin api.Pin + err = proxy.rpcClient.Call( "", "Cluster", "PinPath", - toPath, - &toPin, + pinPath, + &pin, ) if err != nil { ipfsErrorResponder(w, err.Error(), -1) @@ -492,7 +471,7 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) { "", "Cluster", "Unpin", - &fromPin, + api.PinCid(fromCid), &pinObj, ) if err != nil { @@ -502,7 +481,7 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) { } res := ipfsPinOpResp{ - Pins: []string{fromCid.String(), toPin.Cid.String()}, + Pins: []string{fromCid.String(), pin.Cid.String()}, } resBytes, _ := json.Marshal(res) w.WriteHeader(http.StatusOK) diff --git a/api/pb/Makefile b/api/pb/Makefile new file mode 100644 index 00000000..4583b8f7 --- /dev/null +++ b/api/pb/Makefile @@ -0,0 +1,2 @@ +all: + protoc -I=. --go_out=. types.proto diff --git a/api/pb/types.pb.go b/api/pb/types.pb.go index 27ebce0f..432d8654 100644 --- a/api/pb/types.pb.go +++ b/api/pb/types.pb.go @@ -139,6 +139,7 @@ type PinOptions struct { Name string `protobuf:"bytes,3,opt,name=Name,proto3" json:"Name,omitempty"` ShardSize uint64 `protobuf:"varint,4,opt,name=ShardSize,proto3" json:"ShardSize,omitempty"` Metadata map[string]string `protobuf:"bytes,6,rep,name=Metadata,proto3" json:"Metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + PinUpdate []byte `protobuf:"bytes,7,opt,name=PinUpdate,proto3" json:"PinUpdate,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -204,6 +205,13 @@ func (m *PinOptions) GetMetadata() map[string]string { return nil } +func (m *PinOptions) GetPinUpdate() []byte { + if m != nil { + return m.PinUpdate + } + return nil +} + func init() { proto.RegisterEnum("api.pb.Pin_PinType", Pin_PinType_name, Pin_PinType_value) proto.RegisterType((*Pin)(nil), "api.pb.Pin") @@ -214,29 +222,30 @@ func init() { func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) } var fileDescriptor_d938547f84707355 = []byte{ - // 377 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0x4d, 0x6f, 0xda, 0x40, - 0x10, 0xed, 0xda, 0xc6, 0xe0, 0x31, 0x20, 0x98, 0x72, 0xb0, 0x50, 0x0f, 0x2b, 0x2e, 0xf5, 0xa1, - 0xf2, 0xc1, 0xbd, 0x54, 0x6d, 0x2f, 0x14, 0xda, 0x4a, 0x95, 0x68, 0xd1, 0xd2, 0xfc, 0x80, 0x05, - 0x36, 0x62, 0x15, 0xc7, 0x5e, 0x99, 0x25, 0xc2, 0xf9, 0x37, 0x39, 0xe5, 0x6f, 0x46, 0xbb, 0xe6, - 0x2b, 0x0a, 0x07, 0x4b, 0xf3, 0xde, 0x9b, 0xe7, 0x9d, 0x79, 0x1a, 0x08, 0x75, 0xa5, 0xc4, 0x36, - 0x51, 0x65, 0xa1, 0x0b, 0xf4, 0xb9, 0x92, 0x89, 0x5a, 0x8e, 0x9e, 0x1c, 0x70, 0xe7, 0x32, 0xc7, - 0x1e, 0xb8, 0x13, 0xb9, 0x8e, 0x08, 0x25, 0x71, 0x9b, 0x99, 0x12, 0x3f, 0x82, 0xf7, 0xbf, 0x52, - 0x22, 0x72, 0x28, 0x89, 0xbb, 0xe9, 0xfb, 0xa4, 0x36, 0x24, 0x73, 0x99, 0x9b, 0xcf, 0x48, 0xcc, - 0x36, 0x20, 0x85, 0x70, 0x9c, 0x65, 0xc5, 0x8a, 0x6b, 0x59, 0xe4, 0xdb, 0xc8, 0xa5, 0x6e, 0xdc, - 0x66, 0x97, 0x14, 0x0e, 0xa1, 0x35, 0xe3, 0xfb, 0xa9, 0x50, 0x7a, 0x13, 0x79, 0x94, 0xc4, 0x7d, - 0x76, 0xc2, 0xf8, 0x01, 0x02, 0x26, 0x6e, 0x45, 0x29, 0xf2, 0x95, 0x88, 0x1a, 0xf6, 0xf9, 0x33, - 0x81, 0x9f, 0xa0, 0xf9, 0x4f, 0xd5, 0xff, 0xf5, 0x29, 0x89, 0xc3, 0x14, 0x2f, 0xe6, 0x38, 0x28, - 0xec, 0xd8, 0x32, 0xba, 0x81, 0xe6, 0x61, 0x34, 0x0c, 0xa1, 0xf9, 0x83, 0xaf, 0x4d, 0xd9, 0x7b, - 0x87, 0x6d, 0x68, 0x4d, 0xb9, 0xe6, 0x16, 0x11, 0x83, 0x66, 0xe2, 0x80, 0x1c, 0x44, 0xe8, 0x4e, - 0xb2, 0xdd, 0x56, 0x8b, 0x72, 0x3a, 0xfe, 0x6d, 0x39, 0x17, 0x3b, 0x10, 0x2c, 0x36, 0xbc, 0xac, - 0xed, 0xde, 0xe8, 0xd9, 0x01, 0x38, 0x3f, 0x87, 0x29, 0x0c, 0x98, 0x50, 0x99, 0xac, 0xb7, 0xfb, - 0xc5, 0x57, 0xba, 0x28, 0x67, 0x32, 0xb7, 0xd9, 0xf5, 0xd9, 0x55, 0xed, 0xba, 0x87, 0xef, 0x6d, - 0xb8, 0x57, 0x3d, 0x7c, 0x8f, 0x08, 0xde, 0x5f, 0x7e, 0x2f, 0x22, 0x97, 0x92, 0x38, 0x60, 0xb6, - 0x36, 0x69, 0xd9, 0xc9, 0x16, 0xf2, 0x51, 0xd8, 0x28, 0x3d, 0x76, 0x26, 0xf0, 0x7b, 0xbd, 0xd9, - 0x9a, 0x6b, 0x1e, 0xf9, 0xd4, 0x8d, 0xc3, 0x94, 0xbe, 0x8d, 0x2b, 0x39, 0xb6, 0xfc, 0xcc, 0x75, - 0x59, 0xb1, 0x93, 0x63, 0xf8, 0x0d, 0x3a, 0xaf, 0x24, 0x73, 0x13, 0x77, 0xa2, 0xb2, 0x7b, 0x05, - 0xcc, 0x94, 0x38, 0x80, 0xc6, 0x03, 0xcf, 0x76, 0xf5, 0x51, 0x04, 0xac, 0x06, 0x5f, 0x9d, 0x2f, - 0xe4, 0x8f, 0xd7, 0x6a, 0xf4, 0xfc, 0xa5, 0x6f, 0x8f, 0xeb, 0xf3, 0x4b, 0x00, 0x00, 0x00, 0xff, - 0xff, 0xaf, 0xb8, 0xbd, 0x88, 0x6b, 0x02, 0x00, 0x00, + // 391 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0xcd, 0x8e, 0xd3, 0x30, + 0x10, 0xc6, 0x49, 0x9a, 0x34, 0x93, 0xee, 0xaa, 0x3b, 0xec, 0xc1, 0x5a, 0x71, 0xb0, 0x7a, 0x21, + 0x07, 0x94, 0x43, 0xb8, 0x20, 0xe0, 0xb2, 0x6c, 0x01, 0x09, 0xa9, 0x50, 0x79, 0xe9, 0x03, 0xb8, + 0x8d, 0x51, 0x2d, 0x42, 0x62, 0xa5, 0x2e, 0x6a, 0x78, 0x1b, 0x1e, 0x86, 0xf7, 0x42, 0xb6, 0xfb, + 0x87, 0xe8, 0x21, 0xd2, 0x7c, 0xdf, 0xcc, 0xe7, 0x99, 0xf9, 0x32, 0x90, 0x99, 0x5e, 0xcb, 0x4d, + 0xa1, 0xbb, 0xd6, 0xb4, 0x18, 0x0b, 0xad, 0x0a, 0xbd, 0x9c, 0xfc, 0x0e, 0x20, 0x9c, 0xab, 0x06, + 0xc7, 0x10, 0x3e, 0xa8, 0x8a, 0x12, 0x46, 0xf2, 0x11, 0xb7, 0x21, 0x3e, 0x87, 0xe8, 0x6b, 0xaf, + 0x25, 0x0d, 0x18, 0xc9, 0xaf, 0xcb, 0xa7, 0x85, 0x17, 0x14, 0x73, 0xd5, 0xd8, 0xcf, 0xa6, 0xb8, + 0x2b, 0x40, 0x06, 0xd9, 0x7d, 0x5d, 0xb7, 0x2b, 0x61, 0x54, 0xdb, 0x6c, 0x68, 0xc8, 0xc2, 0x7c, + 0xc4, 0xcf, 0x29, 0xbc, 0x83, 0xe1, 0x4c, 0xec, 0xa6, 0x52, 0x9b, 0x35, 0x8d, 0x18, 0xc9, 0x6f, + 0xf8, 0x11, 0xe3, 0x33, 0x48, 0xb9, 0xfc, 0x26, 0x3b, 0xd9, 0xac, 0x24, 0x1d, 0xb8, 0xf6, 0x27, + 0x02, 0x5f, 0x40, 0xf2, 0x45, 0xfb, 0x77, 0x63, 0x46, 0xf2, 0xac, 0xc4, 0xb3, 0x39, 0xf6, 0x19, + 0x7e, 0x28, 0x99, 0x2c, 0x20, 0xd9, 0x8f, 0x86, 0x19, 0x24, 0xef, 0x44, 0x65, 0xc3, 0xf1, 0x13, + 0x1c, 0xc1, 0x70, 0x2a, 0x8c, 0x70, 0x88, 0x58, 0x34, 0x93, 0x7b, 0x14, 0x20, 0xc2, 0xf5, 0x43, + 0xbd, 0xdd, 0x18, 0xd9, 0x4d, 0xef, 0x3f, 0x3a, 0x2e, 0xc4, 0x2b, 0x48, 0x1f, 0xd7, 0xa2, 0xf3, + 0xf2, 0x68, 0xf2, 0x27, 0x00, 0x38, 0xb5, 0xc3, 0x12, 0x6e, 0xb9, 0xd4, 0xb5, 0xf2, 0xdb, 0x7d, + 0x10, 0x2b, 0xd3, 0x76, 0x33, 0xd5, 0x38, 0xef, 0x6e, 0xf8, 0xc5, 0xdc, 0x65, 0x8d, 0xd8, 0x39, + 0x73, 0x2f, 0x6a, 0xc4, 0x0e, 0x11, 0xa2, 0xcf, 0xe2, 0x87, 0xa4, 0x21, 0x23, 0x79, 0xca, 0x5d, + 0x6c, 0xdd, 0x72, 0x93, 0x3d, 0xaa, 0x5f, 0xd2, 0x59, 0x19, 0xf1, 0x13, 0x81, 0x6f, 0xfd, 0x66, + 0x95, 0x30, 0x82, 0xc6, 0x2c, 0xcc, 0xb3, 0x92, 0xfd, 0x6f, 0x57, 0x71, 0x28, 0x79, 0xdf, 0x98, + 0xae, 0xe7, 0x47, 0x85, 0x7d, 0x7b, 0xae, 0x9a, 0x85, 0xae, 0x84, 0x91, 0x34, 0xf1, 0x7f, 0xe2, + 0x48, 0xdc, 0xbd, 0x81, 0xab, 0x7f, 0x84, 0xf6, 0x62, 0xbe, 0xcb, 0xde, 0x6d, 0x9d, 0x72, 0x1b, + 0xe2, 0x2d, 0x0c, 0x7e, 0x8a, 0x7a, 0xeb, 0x4f, 0x26, 0xe5, 0x1e, 0xbc, 0x0e, 0x5e, 0x91, 0x4f, + 0xd1, 0x70, 0x30, 0x8e, 0x97, 0xb1, 0x3b, 0xbd, 0x97, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xfe, + 0x90, 0x29, 0x9a, 0x89, 0x02, 0x00, 0x00, } diff --git a/api/pb/types.proto b/api/pb/types.proto index 8d8d6d19..70ad76fe 100644 --- a/api/pb/types.proto +++ b/api/pb/types.proto @@ -24,4 +24,5 @@ message PinOptions { uint64 ShardSize = 4; reserved 5; // reserved for UserAllocations map Metadata = 6; + bytes PinUpdate = 7; } \ No newline at end of file diff --git a/api/types.go b/api/types.go index 65a19118..6ea85285 100644 --- a/api/types.go +++ b/api/types.go @@ -461,6 +461,7 @@ type PinOptions struct { ShardSize uint64 `json:"shard_size" codec:"s,omitempty"` UserAllocations []peer.ID `json:"user_allocations" codec:"ua,omitempty"` Metadata map[string]string `json:"metadata" codec:"m,omitempty"` + PinUpdate cid.Cid `json:"pin_update,omitempty" codec:"pu,omitempty"` } // Equals returns true if two PinOption objects are equivalent. po and po2 may @@ -507,6 +508,9 @@ func (po *PinOptions) Equals(po2 *PinOptions) bool { return false } } + + // deliberately ignore Update + return true } @@ -524,6 +528,7 @@ func (po *PinOptions) ToQuery() string { } q.Set(fmt.Sprintf("%s%s", pinOptionsMetaPrefix, k), v) } + q.Set("pin-update", po.PinUpdate.String()) return q.Encode() } @@ -563,10 +568,20 @@ func (po *PinOptions) FromQuery(q url.Values) { } po.Metadata[metaKey] = q.Get(k) } + + updateStr := q.Get("pin-update") + if updateStr != "" { + updateCid, err := cid.Decode(updateStr) + if err != nil { + logger.Error("error decoding update option parameter: ", err) + } + po.PinUpdate = updateCid + } } // Pin carries all the information associated to a CID that is pinned -// in IPFS Cluster. +// in IPFS Cluster. It also carries transient information (that may not +// get protobuffed, like UserAllocations). type Pin struct { PinOptions @@ -657,7 +672,8 @@ func (pin *Pin) ProtoMarshal() ([]byte, error) { Name: pin.Name, ShardSize: pin.ShardSize, // UserAllocations: pin.UserAllocations, - Metadata: pin.Metadata, + Metadata: pin.Metadata, + PinUpdate: pin.PinUpdate.Bytes(), } pbPin := &pb.Pin{ @@ -717,6 +733,11 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error { pin.ShardSize = opts.GetShardSize() // pin.UserAllocations = opts.GetUserAllocations() pin.Metadata = opts.GetMetadata() + + pinUpdate, err := cid.Cast(opts.GetPinUpdate()) + if err == nil { + pin.PinUpdate = pinUpdate + } return nil } diff --git a/cluster.go b/cluster.go index 2e6f875b..d16218b0 100644 --- a/cluster.go +++ b/cluster.go @@ -1184,12 +1184,16 @@ func (c *Cluster) PinGet(ctx context.Context, h cid.Cid) (*api.Pin, error) { // rest of the cluster. Priority allocations are best effort. If any priority // peers are unavailable then Pin will simply allocate from the rest of the // cluster. +// +// If the Update option is set, the pin options (including allocations) will +// be copied from an existing one. This is equivalent to running PinUpdate. func (c *Cluster) Pin(ctx context.Context, h cid.Cid, opts api.PinOptions) (*api.Pin, error) { _, span := trace.StartSpan(ctx, "cluster/Pin") defer span.End() ctx = trace.NewContext(c.ctx, span) pin := api.PinWithOpts(h, opts) + result, _, err := c.pin(ctx, pin, []peer.ID{}) return result, err } @@ -1280,6 +1284,8 @@ func (c *Cluster) setupPin(ctx context.Context, pin *api.Pin) error { // evacuate a node and returns the pin object that it tried to pin, whether // the pin was submitted to the consensus layer or skipped (due to error or to // the fact that it was already valid) and error. +// +// This is the method called by the Cluster.Pin RPC endpoint. func (c *Cluster) pin( ctx context.Context, pin *api.Pin, @@ -1296,6 +1302,12 @@ func (c *Cluster) pin( return pin, false, errors.New("bad pin object") } + // Handle pin updates when the option is set + if update := pin.PinUpdate; update != cid.Undef && !update.Equals(pin.Cid) { + pin, err := c.PinUpdate(ctx, update, pin.Cid, pin.PinOptions) + return pin, true, err + } + // setup pin might produce some side-effects to our pin err := c.setupPin(ctx, pin) if err != nil { @@ -1400,6 +1412,37 @@ func (c *Cluster) unpinClusterDag(metaPin *api.Pin) error { return nil } +// PinUpdate pins a new CID based on an existing cluster Pin. The allocations +// and most pin options (replication factors) are copied from the existing +// Pin. The options object can be used to set the Name for the new pin and +// might support additional options in the future. +// +// The from pin is NOT unpinned upon completion. The new pin might take +// advantage of efficient pin/update operation on IPFS-side (if the +// IPFSConnector supports it - the default one does). This may offer +// significant speed when pinning items which are similar to previously pinned +// content. +func (c *Cluster) PinUpdate(ctx context.Context, from cid.Cid, to cid.Cid, opts api.PinOptions) (*api.Pin, error) { + existing, err := c.PinGet(ctx, from) + if err != nil { // including when the existing pin is not found + return nil, err + } + + // Hector: I am not sure whether it has any point to update something + // like a MetaType. + if existing.Type != api.DataType { + return nil, errors.New("this pin type cannot be updated") + } + + existing.Cid = to + existing.PinUpdate = from + if opts.Name != "" { + existing.Name = opts.Name + } + + return existing, c.consensus.LogPin(ctx, existing) +} + // PinPath pins an CID resolved from its IPFS Path. It returns the resolved // Pin object. func (c *Cluster) PinPath(ctx context.Context, path string, opts api.PinOptions) (*api.Pin, error) { diff --git a/cluster_test.go b/cluster_test.go index 1843f9ef..562317cd 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -61,8 +61,8 @@ func (ipfs *mockConnector) ID(ctx context.Context) (*api.IPFSID, error) { }, nil } -func (ipfs *mockConnector) Pin(ctx context.Context, c cid.Cid, maxDepth int) error { - ipfs.pins.Store(c.String(), maxDepth) +func (ipfs *mockConnector) Pin(ctx context.Context, pin *api.Pin) error { + ipfs.pins.Store(pin.Cid.String(), pin.MaxDepth) return nil } diff --git a/cmd/ipfs-cluster-ctl/main.go b/cmd/ipfs-cluster-ctl/main.go index f49df483..2fe10908 100644 --- a/cmd/ipfs-cluster-ctl/main.go +++ b/cmd/ipfs-cluster-ctl/main.go @@ -473,7 +473,7 @@ cluster "pin add". Subcommands: []cli.Command{ { Name: "add", - Usage: "Cluster Pin", + Usage: "Pin an item in the cluster", Description: ` This command tells IPFS Cluster to start managing a CID. Depending on the pinning strategy, this will trigger IPFS pin requests. The CID will @@ -491,7 +491,7 @@ comma-separated list of peer IDs on which we want to pin. Peers in allocations are prioritized over automatically-determined ones, but replication factors would stil be respected. `, - ArgsUsage: "", + ArgsUsage: "", Flags: []cli.Flag{ cli.IntFlag{ Name: "replication, r", @@ -569,7 +569,7 @@ would stil be respected. }, { Name: "rm", - Usage: "Cluster Unpin", + Usage: "Unpin an item from the cluster", Description: ` This command tells IPFS Cluster to no longer manage a CID. This will trigger unpinning operations in all the IPFS nodes holding the content. @@ -578,7 +578,7 @@ When the request has succeeded, the command returns the status of the CID in the cluster. The CID should disappear from the list offered by "pin ls", although unpinning operations in the cluster may take longer or fail. `, - ArgsUsage: "", + ArgsUsage: "", Flags: []cli.Flag{ cli.BoolFlag{ Name: "no-status, ns", @@ -610,6 +610,63 @@ although unpinning operations in the cluster may take longer or fail. return nil }, }, + { + Name: "update", + Usage: "Pin a new item based on an existing one", + Description: ` +This command will add a new pin to the cluster taking all the options from an +existing one, including name. This means that the new pin will bypass the +allocation process and will be allocated to the same peers as the existing +one. + +The cluster peers will try to Pin the new item on IPFS using the "pin update" +command. This is especially efficient when the content of two pins (their DAGs) +are similar. + +Unlike the "pin update" command in the ipfs daemon, this will not unpin the +existing item from the cluster. Please run "pin rm" for that. +`, + ArgsUsage: " ", + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "no-status, ns", + Usage: "Prevents fetching pin status after unpinning (faster, quieter)", + }, + cli.BoolFlag{ + Name: "wait, w", + Usage: "Wait for all nodes to report a status of pinned before returning", + }, + cli.DurationFlag{ + Name: "wait-timeout, wt", + Value: 0, + Usage: "How long to --wait (in seconds), default is indefinitely", + }, + }, + Action: func(c *cli.Context) error { + from := c.Args().Get(0) + to := c.Args().Get(1) + + fromCid, err := cid.Decode(from) + checkErr("parsing from Cid", err) + + opts := api.PinOptions{ + PinUpdate: fromCid, + } + + pin, cerr := globalClient.PinPath(ctx, to, opts) + if cerr != nil { + formatResponse(c, nil, cerr) + return nil + } + handlePinResponseFormatFlags( + ctx, + c, + pin, + api.TrackerStatusPinned, + ) + return nil + }, + }, { Name: "ls", Usage: "List items in the cluster pinset", diff --git a/config/config.go b/config/config.go index c7025a72..469405ed 100644 --- a/config/config.go +++ b/config/config.go @@ -264,7 +264,6 @@ func (cfg *Manager) ApplyEnvVars() error { return err } } - return nil } @@ -453,7 +452,6 @@ func (cfg *Manager) LoadJSON(bs []byte) error { return err } } - return cfg.Validate() } diff --git a/ipfscluster.go b/ipfscluster.go index 1421eee4..55678789 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -37,11 +37,11 @@ type Consensus interface { // allowing the main component to wait for it during start. Ready(context.Context) <-chan struct{} // Logs a pin operation. - LogPin(ctx context.Context, c *api.Pin) error + LogPin(context.Context, *api.Pin) error // Logs an unpin operation. - LogUnpin(ctx context.Context, c *api.Pin) error - AddPeer(ctx context.Context, p peer.ID) error - RmPeer(ctx context.Context, p peer.ID) error + LogUnpin(context.Context, *api.Pin) error + AddPeer(context.Context, peer.ID) error + RmPeer(context.Context, peer.ID) error State(context.Context) (state.ReadOnly, error) // Provide a node which is responsible to perform // specific tasks which must only run in 1 cluster peer. @@ -75,7 +75,7 @@ type API interface { type IPFSConnector interface { Component ID(context.Context) (*api.IPFSID, error) - Pin(context.Context, cid.Cid, int) error + Pin(context.Context, *api.Pin) error Unpin(context.Context, cid.Cid) error PinLsCid(context.Context, cid.Cid) (api.IPFSPinStatus, error) PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error) diff --git a/ipfscluster_test.go b/ipfscluster_test.go index 68a93cee..17fb208b 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -28,6 +28,7 @@ import ( "github.com/ipfs/ipfs-cluster/observations" "github.com/ipfs/ipfs-cluster/pintracker/maptracker" "github.com/ipfs/ipfs-cluster/pintracker/stateless" + "github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/test" "github.com/ipfs/ipfs-cluster/version" @@ -642,6 +643,64 @@ func TestClustersPin(t *testing.T) { runF(t, clusters, funpinned) } +func TestClustersPinUpdate(t *testing.T) { + ctx := context.Background() + clusters, mock := createClusters(t) + defer shutdownClusters(t, clusters, mock) + prefix := test.Cid1.Prefix() + + ttlDelay() + + h, err := prefix.Sum(randomBytes()) // create random cid + h2, err := prefix.Sum(randomBytes()) // create random cid + + _, err = clusters[0].PinUpdate(ctx, h, h2, api.PinOptions{}) + if err == nil || err != state.ErrNotFound { + t.Fatal("pin update should fail when from is not pinned") + } + + _, err = clusters[0].Pin(ctx, h, api.PinOptions{}) + if err != nil { + t.Errorf("error pinning %s: %s", h, err) + } + + pinDelay() + + opts2 := api.PinOptions{ + UserAllocations: []peer.ID{clusters[0].host.ID()}, // should not be used + PinUpdate: h, + Name: "new name", + } + + _, err = clusters[0].Pin(ctx, h2, opts2) // should call PinUpdate + if err != nil { + t.Errorf("error pin-updating %s: %s", h2, err) + } + + pinDelay() + + f := func(t *testing.T, c *Cluster) { + pinget, err := c.PinGet(ctx, h2) + if err != nil { + t.Fatal(err) + } + + if len(pinget.Allocations) != 0 { + t.Error("new pin should be allocated everywhere like pin1") + } + + if pinget.MaxDepth != -1 { + t.Error("updated pin should be recursive like pin1") + } + + if pinget.Name != "new name" { + t.Error("name should be kept") + } + } + runF(t, clusters, f) + +} + func TestClustersStatusAll(t *testing.T) { ctx := context.Background() clusters, mock := createClusters(t) diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 629d478c..a658c612 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -293,10 +293,13 @@ func pinArgs(maxDepth int) string { // Pin performs a pin request against the configured IPFS // daemon. -func (ipfs *Connector) Pin(ctx context.Context, hash cid.Cid, maxDepth int) error { +func (ipfs *Connector) Pin(ctx context.Context, pin *api.Pin) error { ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Pin") defer span.End() + hash := pin.Cid + maxDepth := pin.MaxDepth + pinStatus, err := ipfs.PinLsCid(ctx, hash) if err != nil { return err @@ -312,7 +315,24 @@ func (ipfs *Connector) Pin(ctx context.Context, hash cid.Cid, maxDepth int) erro ctx, cancelRequest := context.WithCancel(ctx) defer cancelRequest() - switch ipfs.config.PinMethod { + pinMethod := ipfs.config.PinMethod + + // If we have a pin-update, and the old object + // is pinned recursively, then do pin/update. + // Otherwise do a normal pin. + if from := pin.PinUpdate; from != cid.Undef { + pinStatus, _ := ipfs.PinLsCid(ctx, from) + if pinStatus.IsPinned(-1) { // pinned recursively as update + pinMethod = "update" + // as a side note, if PinUpdate == pin.Cid, we are + // somehow pinning an already pinned thing and we'd better + // use update for that + } + } + + switch pinMethod { + case "update": + return ipfs.pinUpdate(ctx, pin.PinUpdate, pin.Cid) case "refs": // do refs -r first and timeout if we don't get at least // one ref per pin timeout @@ -482,6 +502,20 @@ func (ipfs *Connector) pinProgress(ctx context.Context, hash cid.Cid, maxDepth i } } +func (ipfs *Connector) pinUpdate(ctx context.Context, from, to cid.Cid) error { + ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/pinUpdate") + defer span.End() + + path := fmt.Sprintf("pin/update?arg=%s&arg=%s&unpin=false", from, to) + _, err := ipfs.postCtx(ctx, path, "", nil) + if err != nil { + return err + } + logger.Infof("IPFS Pin Update request succeeded. %s -> %s (unpin=false)", from, to) + stats.Record(ctx, observations.Pins.M(1)) + return nil +} + // Unpin performs an unpin request against the configured IPFS // daemon. func (ipfs *Connector) Unpin(ctx context.Context, hash cid.Cid) error { diff --git a/ipfsconn/ipfshttp/ipfshttp_test.go b/ipfsconn/ipfshttp/ipfshttp_test.go index 895afc62..a85c3169 100644 --- a/ipfsconn/ipfshttp/ipfshttp_test.go +++ b/ipfsconn/ipfshttp/ipfshttp_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "strings" "testing" "time" @@ -77,7 +78,7 @@ func testPin(t *testing.T, method string) { ipfs.config.PinMethod = method c := test.Cid1 - err := ipfs.Pin(ctx, c, -1) + err := ipfs.Pin(ctx, api.PinCid(c)) if err != nil { t.Error("expected success pinning cid:", err) } @@ -90,7 +91,7 @@ func testPin(t *testing.T, method string) { } c2 := test.ErrorCid - err = ipfs.Pin(ctx, c2, -1) + err = ipfs.Pin(ctx, api.PinCid(c2)) if err == nil { t.Error("expected error pinning cid") } @@ -99,14 +100,14 @@ func testPin(t *testing.T, method string) { case "refs": ipfs.config.PinTimeout = 1 * time.Second c3 := test.SlowCid1 - err = ipfs.Pin(ctx, c3, -1) + err = ipfs.Pin(ctx, api.PinCid(c3)) if err == nil { t.Error("expected error pinning cid") } case "pin": ipfs.config.PinTimeout = 5 * time.Second c4 := test.SlowCid1 - err = ipfs.Pin(ctx, c4, -1) + err = ipfs.Pin(ctx, api.PinCid(c4)) if err == nil { t.Error("expected error pinning cid") } @@ -114,9 +115,42 @@ func testPin(t *testing.T, method string) { } } +func testPinUpdate(t *testing.T) { + ctx := context.Background() + ipfs, mock := testIPFSConnector(t) + defer mock.Close() + defer ipfs.Shutdown(ctx) + + pin := api.PinCid(test.Cid1) + pin.PinUpdate = test.Cid1 + // enforce pin/update even though it would be skipped + ipfs.config.PinMethod = "update" + err := ipfs.Pin(ctx, pin) + if err == nil { + t.Fatal("expected an error") + } + if !strings.HasSuffix(err.Error(), "recursively pinned already") { + t.Fatal("expected error about from not being pinned") + } + + ipfs.config.PinMethod = "pin" + err = ipfs.Pin(ctx, pin) + if err != nil { + t.Fatal(err) + } + + // This should trigger the pin/update path + pin.Cid = test.Cid2 + err = ipfs.Pin(ctx, pin) + if err != nil { + t.Fatal(err) + } +} + func TestIPFSPin(t *testing.T) { t.Run("method=pin", func(t *testing.T) { testPin(t, "pin") }) t.Run("method=refs", func(t *testing.T) { testPin(t, "refs") }) + t.Run("method=update", func(t *testing.T) { testPinUpdate(t) }) } func TestIPFSUnpin(t *testing.T) { @@ -129,7 +163,7 @@ func TestIPFSUnpin(t *testing.T) { if err != nil { t.Error("expected success unpinning non-pinned cid") } - ipfs.Pin(ctx, c, -1) + ipfs.Pin(ctx, api.PinCid(c)) err = ipfs.Unpin(ctx, c) if err != nil { t.Error("expected success unpinning pinned cid") @@ -142,7 +176,7 @@ func TestIPFSUnpinDisabled(t *testing.T) { defer mock.Close() defer ipfs.Shutdown(ctx) ipfs.config.UnpinDisable = true - err := ipfs.Pin(ctx, test.Cid1, -1) + err := ipfs.Pin(ctx, api.PinCid(test.Cid1)) if err != nil { t.Fatal(err) } @@ -161,7 +195,7 @@ func TestIPFSPinLsCid(t *testing.T) { c := test.Cid1 c2 := test.Cid2 - ipfs.Pin(ctx, c, -1) + ipfs.Pin(ctx, api.PinCid(c)) ips, err := ipfs.PinLsCid(ctx, c) if err != nil { t.Error(err) @@ -184,7 +218,7 @@ func TestIPFSPinLsCid_DifferentEncoding(t *testing.T) { defer ipfs.Shutdown(ctx) c := test.Cid4 // ipfs mock treats this specially - ipfs.Pin(ctx, c, -1) + ipfs.Pin(ctx, api.PinCid(c)) ips, err := ipfs.PinLsCid(ctx, c) if err != nil { t.Error(err) @@ -203,8 +237,8 @@ func TestIPFSPinLs(t *testing.T) { c := test.Cid1 c2 := test.Cid2 - ipfs.Pin(ctx, c, -1) - ipfs.Pin(ctx, c2, -1) + ipfs.Pin(ctx, api.PinCid(c)) + ipfs.Pin(ctx, api.PinCid(c2)) ipsMap, err := ipfs.PinLs(ctx, "") if err != nil { t.Error("should not error") @@ -330,7 +364,7 @@ func TestRepoStat(t *testing.T) { } c := test.Cid1 - err = ipfs.Pin(ctx, c, -1) + err = ipfs.Pin(ctx, api.PinCid(c)) if err != nil { t.Error("expected success pinning cid") } diff --git a/rpc_api.go b/rpc_api.go index 3a9e9e65..4eb067d6 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -482,7 +482,7 @@ func (rpcapi *PinTrackerRPCAPI) Recover(ctx context.Context, in cid.Cid, out *ap func (rpcapi *IPFSConnectorRPCAPI) Pin(ctx context.Context, in *api.Pin, out *struct{}) error { ctx, span := trace.StartSpan(ctx, "rpc/ipfsconn/IPFSPin") defer span.End() - return rpcapi.ipfs.Pin(ctx, in.Cid, in.MaxDepth) + return rpcapi.ipfs.Pin(ctx, in) } // Unpin runs IPFSConnector.Unpin(). diff --git a/sharness/t0030-ctl-pin.sh b/sharness/t0030-ctl-pin.sh index e4fded84..bce2f1dd 100755 --- a/sharness/t0030-ctl-pin.sh +++ b/sharness/t0030-ctl-pin.sh @@ -88,6 +88,14 @@ test_expect_success IPFS,CLUSTER "pin data to cluster with user allocations" ' echo $allocations | grep -q ${pid} ' +test_expect_success IPFS,CLUSTER "pin update a pin" ' + cid1=`docker exec ipfs sh -c "echo test | ipfs add -q"` + ipfs-cluster-ctl pin add "$cid1" + cid2=`docker exec ipfs sh -c "echo test2 | ipfs add -q"` + ipfs-cluster-ctl pin update $cid1 $cid2 && + ipfs-cluster-ctl pin ls $cid2 +' + test_clean_ipfs test_clean_cluster diff --git a/test/ipfs_mock.go b/test/ipfs_mock.go index f7795f84..bd9ea455 100644 --- a/test/ipfs_mock.go +++ b/test/ipfs_mock.go @@ -194,6 +194,42 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { resp := mockPinResp{ Pins: []string{arg}, } + j, _ := json.Marshal(resp) + w.Write(j) + case "pin/update": + args := r.URL.Query()["arg"] + if len(args) != 2 { + goto ERROR + } + fromStr := args[0] + toStr := args[1] + from, err := cid.Decode(fromStr) + if err != nil { + goto ERROR + } + to, err := cid.Decode(toStr) + if err != nil { + goto ERROR + } + + pin, err := m.pinMap.Get(ctx, from) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + resp := ipfsErr{0, fmt.Sprintf("'from' cid was not recursively pinned already")} + j, _ := json.Marshal(resp) + w.Write(j) + return + } + pin.Cid = to + err = m.pinMap.Add(ctx, pin) + if err != nil { + goto ERROR + } + + resp := mockPinResp{ + Pins: []string{from.String(), to.String()}, + } + j, _ := json.Marshal(resp) w.Write(j) case "pin/ls":