Fix #481: Pin expiration

This adds a new PinOption: ExpireAt.

The StateSync ticker will check and unpin expired pins from the Cluster.

ipfs-cluster-ctl supports an "expire-in" which gives a duration.
This commit is contained in:
Kishan Mohanbhai Sagathiya 2019-09-18 21:35:55 +05:30 committed by Hector Sanjuan
parent 295915272b
commit e4e1cbea6e
16 changed files with 345 additions and 76 deletions

View File

@ -172,9 +172,15 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
} }
// ToQueryString returns a url query string (key=value&key2=value2&...) // ToQueryString returns a url query string (key=value&key2=value2&...)
func (p *AddParams) ToQueryString() string { func (p *AddParams) ToQueryString() (string, error) {
pinOptsQuery := p.PinOptions.ToQuery() pinOptsQuery, err := p.PinOptions.ToQuery()
query, _ := url.ParseQuery(pinOptsQuery) if err != nil {
return "", err
}
query, err := url.ParseQuery(pinOptsQuery)
if err != nil {
return "", err
}
query.Set("shard", fmt.Sprintf("%t", p.Shard)) query.Set("shard", fmt.Sprintf("%t", p.Shard))
query.Set("local", fmt.Sprintf("%t", p.Local)) query.Set("local", fmt.Sprintf("%t", p.Local))
query.Set("recursive", fmt.Sprintf("%t", p.Recursive)) query.Set("recursive", fmt.Sprintf("%t", p.Recursive))
@ -188,7 +194,7 @@ func (p *AddParams) ToQueryString() string {
query.Set("hash", p.HashFun) query.Set("hash", p.HashFun)
query.Set("stream-channels", fmt.Sprintf("%t", p.StreamChannels)) query.Set("stream-channels", fmt.Sprintf("%t", p.StreamChannels))
query.Set("nocopy", fmt.Sprintf("%t", p.NoCopy)) query.Set("nocopy", fmt.Sprintf("%t", p.NoCopy))
return query.Encode() return query.Encode(), nil
} }
// Equals checks if p equals p2. // Equals checks if p equals p2.

View File

@ -35,11 +35,13 @@ func TestAddParams_ToQueryString(t *testing.T) {
p.Name = "something" p.Name = "something"
p.RawLeaves = true p.RawLeaves = true
p.ShardSize = 1020 p.ShardSize = 1020
qstr := p.ToQueryString() qstr, err := p.ToQueryString()
if err != nil {
t.Fatal(err)
}
q, err := url.ParseQuery(qstr) q, err := url.ParseQuery(qstr)
if err != nil { if err != nil {
t.Fatal() t.Fatal(err)
} }
p2, err := AddParamsFromQuery(q) p2, err := AddParamsFromQuery(q)

View File

@ -140,6 +140,7 @@ type PinOptions struct {
ShardSize uint64 `protobuf:"varint,4,opt,name=ShardSize,proto3" json:"ShardSize,omitempty"` ShardSize uint64 `protobuf:"varint,4,opt,name=ShardSize,proto3" json:"ShardSize,omitempty"`
Metadata map[string]string `protobuf:"bytes,6,rep,name=Metadata,proto3" json:"Metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` Metadata map[string]string `protobuf:"bytes,6,rep,name=Metadata,proto3" json:"Metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
PinUpdate []byte `protobuf:"bytes,7,opt,name=PinUpdate,proto3" json:"PinUpdate,omitempty"` PinUpdate []byte `protobuf:"bytes,7,opt,name=PinUpdate,proto3" json:"PinUpdate,omitempty"`
ExpireAt uint64 `protobuf:"varint,8,opt,name=ExpireAt,proto3" json:"ExpireAt,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -212,6 +213,13 @@ func (m *PinOptions) GetPinUpdate() []byte {
return nil return nil
} }
func (m *PinOptions) GetExpireAt() uint64 {
if m != nil {
return m.ExpireAt
}
return 0
}
func init() { func init() {
proto.RegisterEnum("api.pb.Pin_PinType", Pin_PinType_name, Pin_PinType_value) proto.RegisterEnum("api.pb.Pin_PinType", Pin_PinType_name, Pin_PinType_value)
proto.RegisterType((*Pin)(nil), "api.pb.Pin") proto.RegisterType((*Pin)(nil), "api.pb.Pin")
@ -222,30 +230,31 @@ func init() {
func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) } func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) }
var fileDescriptor_d938547f84707355 = []byte{ var fileDescriptor_d938547f84707355 = []byte{
// 391 bytes of a gzipped FileDescriptorProto // 405 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0xcd, 0x8e, 0xd3, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0xc1, 0x6e, 0xd3, 0x40,
0x10, 0xc6, 0x49, 0x9a, 0x34, 0x93, 0xee, 0xaa, 0x3b, 0xec, 0xc1, 0x5a, 0x71, 0xb0, 0x7a, 0x21, 0x10, 0x65, 0x6d, 0xc7, 0xb1, 0xc7, 0x69, 0x95, 0x0e, 0x3d, 0xac, 0x2a, 0x0e, 0xab, 0x5c, 0xf0,
0x07, 0x94, 0x43, 0xb8, 0x20, 0xe0, 0xb2, 0x6c, 0x01, 0x09, 0xa9, 0x50, 0x79, 0xe9, 0x03, 0xb8, 0x01, 0xf9, 0x60, 0x2e, 0x08, 0xb8, 0x84, 0xa6, 0x20, 0x21, 0x05, 0xa2, 0x2d, 0xfd, 0x80, 0x6d,
0x8d, 0x51, 0x2d, 0x42, 0x62, 0xa5, 0x2e, 0x6a, 0x78, 0x1b, 0x1e, 0x86, 0xf7, 0x42, 0xb6, 0xfb, 0xbc, 0xa8, 0x2b, 0x8c, 0xbd, 0x72, 0xb6, 0xc8, 0xe6, 0x6f, 0xf8, 0x34, 0xfe, 0x04, 0xed, 0x6e,
0x87, 0xe8, 0x21, 0xd2, 0x7c, 0xdf, 0xcc, 0xe7, 0x99, 0xf9, 0x32, 0x90, 0x99, 0x5e, 0xcb, 0x4d, 0xea, 0x14, 0x35, 0x07, 0x4b, 0xf3, 0xde, 0xcc, 0x9b, 0x19, 0xbf, 0x1d, 0xc8, 0xcc, 0xa0, 0xe5,
0xa1, 0xbb, 0xd6, 0xb4, 0x18, 0x0b, 0xad, 0x0a, 0xbd, 0x9c, 0xfc, 0x0e, 0x20, 0x9c, 0xab, 0x06, 0xae, 0xd0, 0x5d, 0x6b, 0x5a, 0x8c, 0x85, 0x56, 0x85, 0xbe, 0x5d, 0xfc, 0x09, 0x20, 0xdc, 0xa8,
0xc7, 0x10, 0x3e, 0xa8, 0x8a, 0x12, 0x46, 0xf2, 0x11, 0xb7, 0x21, 0x3e, 0x87, 0xe8, 0x6b, 0xaf, 0x06, 0xe7, 0x10, 0x5e, 0xaa, 0x8a, 0x12, 0x46, 0xf2, 0x19, 0xb7, 0x21, 0xbe, 0x84, 0xe8, 0xdb,
0x25, 0x0d, 0x18, 0xc9, 0xaf, 0xcb, 0xa7, 0x85, 0x17, 0x14, 0x73, 0xd5, 0xd8, 0xcf, 0xa6, 0xb8, 0xa0, 0x25, 0x0d, 0x18, 0xc9, 0x4f, 0xcb, 0xe7, 0x85, 0x17, 0x14, 0x1b, 0xd5, 0xd8, 0xcf, 0xa6,
0x2b, 0x40, 0x06, 0xd9, 0x7d, 0x5d, 0xb7, 0x2b, 0x61, 0x54, 0xdb, 0x6c, 0x68, 0xc8, 0xc2, 0x7c, 0xb8, 0x2b, 0x40, 0x06, 0xd9, 0xb2, 0xae, 0xdb, 0xad, 0x30, 0xaa, 0x6d, 0x76, 0x34, 0x64, 0x61,
0xc4, 0xcf, 0x29, 0xbc, 0x83, 0xe1, 0x4c, 0xec, 0xa6, 0x52, 0x9b, 0x35, 0x8d, 0x18, 0xc9, 0x6f, 0x3e, 0xe3, 0x8f, 0x29, 0xbc, 0x80, 0x64, 0x2d, 0xfa, 0x95, 0xd4, 0xe6, 0x8e, 0x46, 0x8c, 0xe4,
0xf8, 0x11, 0xe3, 0x33, 0x48, 0xb9, 0xfc, 0x26, 0x3b, 0xd9, 0xac, 0x24, 0x1d, 0xb8, 0xf6, 0x27, 0x67, 0x7c, 0xc4, 0xf8, 0x02, 0x52, 0x2e, 0xbf, 0xcb, 0x4e, 0x36, 0x5b, 0x49, 0x27, 0x6e, 0xfc,
0x02, 0x5f, 0x40, 0xf2, 0x45, 0xfb, 0x77, 0x63, 0x46, 0xf2, 0xac, 0xc4, 0xb3, 0x39, 0xf6, 0x19, 0x81, 0xc0, 0x57, 0x30, 0xfd, 0xaa, 0x7d, 0xdf, 0x98, 0x91, 0x3c, 0x2b, 0xf1, 0xd1, 0x1e, 0xfb,
0x7e, 0x28, 0x99, 0x2c, 0x20, 0xd9, 0x8f, 0x86, 0x19, 0x24, 0xef, 0x44, 0x65, 0xc3, 0xf1, 0x13, 0x0c, 0x7f, 0x28, 0x59, 0xdc, 0xc0, 0x74, 0xbf, 0x1a, 0x66, 0x30, 0xfd, 0x20, 0x2a, 0x1b, 0xce,
0x1c, 0xc1, 0x70, 0x2a, 0x8c, 0x70, 0x88, 0x58, 0x34, 0x93, 0x7b, 0x14, 0x20, 0xc2, 0xf5, 0x43, 0x9f, 0xe1, 0x0c, 0x92, 0x95, 0x30, 0xc2, 0x21, 0x62, 0xd1, 0x5a, 0xee, 0x51, 0x80, 0x08, 0xa7,
0xbd, 0xdd, 0x18, 0xd9, 0x4d, 0xef, 0x3f, 0x3a, 0x2e, 0xc4, 0x2b, 0x48, 0x1f, 0xd7, 0xa2, 0xf3, 0x97, 0xf5, 0xfd, 0xce, 0xc8, 0x6e, 0xb5, 0xfc, 0xe4, 0xb8, 0x10, 0x4f, 0x20, 0xbd, 0xbe, 0x13,
0xf2, 0x68, 0xf2, 0x27, 0x00, 0x38, 0xb5, 0xc3, 0x12, 0x6e, 0xb9, 0xd4, 0xb5, 0xf2, 0xdb, 0x7d, 0x9d, 0x97, 0x47, 0x8b, 0xbf, 0x01, 0xc0, 0x61, 0x1c, 0x96, 0x70, 0xce, 0xa5, 0xae, 0x95, 0xff,
0x10, 0x2b, 0xd3, 0x76, 0x33, 0xd5, 0x38, 0xef, 0x6e, 0xf8, 0xc5, 0xdc, 0x65, 0x8d, 0xd8, 0x39, 0xbb, 0x8f, 0x62, 0x6b, 0xda, 0x6e, 0xad, 0x1a, 0xe7, 0xdd, 0x19, 0x3f, 0x9a, 0x3b, 0xae, 0x11,
0x73, 0x2f, 0x6a, 0xc4, 0x0e, 0x11, 0xa2, 0xcf, 0xe2, 0x87, 0xa4, 0x21, 0x23, 0x79, 0xca, 0x5d, 0xbd, 0x33, 0xf7, 0xa8, 0x46, 0xf4, 0x88, 0x10, 0x7d, 0x11, 0x3f, 0x25, 0x0d, 0x19, 0xc9, 0x53,
0x6c, 0xdd, 0x72, 0x93, 0x3d, 0xaa, 0x5f, 0xd2, 0x59, 0x19, 0xf1, 0x13, 0x81, 0x6f, 0xfd, 0x66, 0xee, 0x62, 0xeb, 0x96, 0xdb, 0xec, 0x5a, 0xfd, 0x96, 0xce, 0xca, 0x88, 0x1f, 0x08, 0x7c, 0xef,
0x95, 0x30, 0x82, 0xc6, 0x2c, 0xcc, 0xb3, 0x92, 0xfd, 0x6f, 0x57, 0x71, 0x28, 0x79, 0xdf, 0x98, 0xff, 0xac, 0x12, 0x46, 0xd0, 0x98, 0x85, 0x79, 0x56, 0xb2, 0xa7, 0x76, 0x15, 0x0f, 0x25, 0x57,
0xae, 0xe7, 0x47, 0x85, 0x7d, 0x7b, 0xae, 0x9a, 0x85, 0xae, 0x84, 0x91, 0x34, 0xf1, 0x7f, 0xe2, 0x8d, 0xe9, 0x06, 0x3e, 0x2a, 0x6c, 0xef, 0x8d, 0x6a, 0x6e, 0x74, 0x25, 0x8c, 0xa4, 0x53, 0xff,
0x48, 0xdc, 0xbd, 0x81, 0xab, 0x7f, 0x84, 0xf6, 0x62, 0xbe, 0xcb, 0xde, 0x6d, 0x9d, 0x72, 0x1b, 0x12, 0x23, 0x61, 0xdf, 0xf0, 0xaa, 0xd7, 0xaa, 0x93, 0x4b, 0x43, 0x13, 0x37, 0x78, 0xc4, 0x17,
0xe2, 0x2d, 0x0c, 0x7e, 0x8a, 0x7a, 0xeb, 0x4f, 0x26, 0xe5, 0x1e, 0xbc, 0x0e, 0x5e, 0x91, 0x4f, 0xef, 0xe0, 0xe4, 0xbf, 0xa6, 0xf6, 0x9a, 0x7e, 0xc8, 0xc1, 0x39, 0x92, 0x72, 0x1b, 0xe2, 0x39,
0xd1, 0x70, 0x30, 0x8e, 0x97, 0xb1, 0x3b, 0xbd, 0x97, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xfe, 0x4c, 0x7e, 0x89, 0xfa, 0xde, 0x9f, 0x53, 0xca, 0x3d, 0x78, 0x1b, 0xbc, 0x21, 0x9f, 0xa3, 0x64,
0x90, 0x29, 0x9a, 0x89, 0x02, 0x00, 0x00, 0x32, 0x8f, 0x6f, 0x63, 0x77, 0x96, 0xaf, 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, 0xaf, 0x59, 0x74,
0x86, 0xa5, 0x02, 0x00, 0x00,
} }

View File

@ -25,4 +25,5 @@ message PinOptions {
reserved 5; // reserved for UserAllocations reserved 5; // reserved for UserAllocations
map<string, string> Metadata = 6; map<string, string> Metadata = 6;
bytes PinUpdate = 7; bytes PinUpdate = 7;
uint64 ExpireAt = 8;
} }

View File

@ -78,14 +78,18 @@ func (c *defaultClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions
ctx, span := trace.StartSpan(ctx, "client/Pin") ctx, span := trace.StartSpan(ctx, "client/Pin")
defer span.End() defer span.End()
query, err := opts.ToQuery()
if err != nil {
return nil, err
}
var pin api.Pin var pin api.Pin
err := c.do( err = c.do(
ctx, ctx,
"POST", "POST",
fmt.Sprintf( fmt.Sprintf(
"/pins/%s?%s", "/pins/%s?%s",
ci.String(), ci.String(),
opts.ToQuery(), query,
), ),
nil, nil,
nil, nil,
@ -119,14 +123,17 @@ func (c *defaultClient) PinPath(ctx context.Context, path string, opts api.PinOp
if err != nil { if err != nil {
return nil, err return nil, err
} }
query, err := opts.ToQuery()
if err != nil {
return nil, err
}
err = c.do( err = c.do(
ctx, ctx,
"POST", "POST",
fmt.Sprintf( fmt.Sprintf(
"/pins%s?%s", "/pins%s?%s",
ipfspath.String(), ipfspath.String(),
opts.ToQuery(), query,
), ),
nil, nil,
nil, nil,
@ -572,7 +579,10 @@ func (c *defaultClient) AddMultiFile(
// This method must run with StreamChannels set. // This method must run with StreamChannels set.
params.StreamChannels = true params.StreamChannels = true
queryStr := params.ToQueryString() queryStr, err := params.ToQueryString()
if err != nil {
return err
}
// our handler decodes an AddedOutput and puts it // our handler decodes an AddedOutput and puts it
// in the out channel. // in the out channel.
@ -589,7 +599,7 @@ func (c *defaultClient) AddMultiFile(
return nil return nil
} }
err := c.doStream(ctx, err = c.doStream(ctx,
"POST", "POST",
"/add?"+queryStr, "/add?"+queryStr,
headers, headers,

View File

@ -601,8 +601,12 @@ type pathCase struct {
expectedCid string expectedCid string
} }
func (p *pathCase) WithQuery() string { func (p *pathCase) WithQuery(t *testing.T) string {
return p.path + "?" + p.opts.ToQuery() query, err := p.opts.ToQuery()
if err != nil {
t.Fatal(err)
}
return p.path + "?" + query
} }
var testPinOpts = api.PinOptions{ var testPinOpts = api.PinOptions{
@ -610,6 +614,7 @@ var testPinOpts = api.PinOptions{
ReplicationFactorMin: 6, ReplicationFactorMin: 6,
Name: "hello there", Name: "hello there",
UserAllocations: []peer.ID{test.PeerID1, test.PeerID2}, UserAllocations: []peer.ID{test.PeerID1, test.PeerID2},
ExpireAt: time.Now().Add(30 * time.Second),
} }
var pathTestCases = []pathCase{ var pathTestCases = []pathCase{
@ -660,7 +665,8 @@ func TestAPIPinEndpointWithPath(t *testing.T) {
if testCase.wantErr { if testCase.wantErr {
errResp := api.Error{} errResp := api.Error{}
makePost(t, rest, url(rest)+"/pins"+testCase.WithQuery(), []byte{}, &errResp) q := testCase.WithQuery(t)
makePost(t, rest, url(rest)+"/pins"+q, []byte{}, &errResp)
if errResp.Code != testCase.code { if errResp.Code != testCase.code {
t.Errorf( t.Errorf(
"status code: expected: %d, got: %d, path: %s\n", "status code: expected: %d, got: %d, path: %s\n",
@ -672,7 +678,8 @@ func TestAPIPinEndpointWithPath(t *testing.T) {
continue continue
} }
pin := api.Pin{} pin := api.Pin{}
makePost(t, rest, url(rest)+"/pins"+testCase.WithQuery(), []byte{}, &pin) q := testCase.WithQuery(t)
makePost(t, rest, url(rest)+"/pins"+q, []byte{}, &pin)
if !pin.Equals(resultantPin) { if !pin.Equals(resultantPin) {
t.Errorf("pin: expected: %+v", resultantPin) t.Errorf("pin: expected: %+v", resultantPin)
t.Errorf("pin: got: %+v", pin) t.Errorf("pin: got: %+v", pin)

View File

@ -36,6 +36,8 @@ import (
var logger = logging.Logger("apitypes") var logger = logging.Logger("apitypes")
var unixZero = time.Unix(0, 0)
func init() { func init() {
// Use /p2p/ multiaddresses // Use /p2p/ multiaddresses
multiaddr.SwapToP2pMultiaddrs() multiaddr.SwapToP2pMultiaddrs()
@ -464,6 +466,7 @@ type PinOptions struct {
Name string `json:"name" codec:"n,omitempty"` Name string `json:"name" codec:"n,omitempty"`
ShardSize uint64 `json:"shard_size" codec:"s,omitempty"` ShardSize uint64 `json:"shard_size" codec:"s,omitempty"`
UserAllocations []peer.ID `json:"user_allocations" codec:"ua,omitempty"` UserAllocations []peer.ID `json:"user_allocations" codec:"ua,omitempty"`
ExpireAt time.Time `json:"expire_at" codec:"e,omitempty"`
Metadata map[string]string `json:"metadata" codec:"m,omitempty"` Metadata map[string]string `json:"metadata" codec:"m,omitempty"`
PinUpdate cid.Cid `json:"pin_update,omitempty" codec:"pu,omitempty"` PinUpdate cid.Cid `json:"pin_update,omitempty" codec:"pu,omitempty"`
} }
@ -510,6 +513,10 @@ func (po *PinOptions) Equals(po2 *PinOptions) bool {
return false return false
} }
if !po.ExpireAt.Equal(po2.ExpireAt) {
return false
}
for k, v := range po.Metadata { for k, v := range po.Metadata {
v2 := po2.Metadata[k] v2 := po2.Metadata[k]
if k != "" && v != v2 { if k != "" && v != v2 {
@ -523,13 +530,20 @@ func (po *PinOptions) Equals(po2 *PinOptions) bool {
} }
// ToQuery returns the PinOption as query arguments. // ToQuery returns the PinOption as query arguments.
func (po *PinOptions) ToQuery() string { func (po *PinOptions) ToQuery() (string, error) {
q := url.Values{} q := url.Values{}
q.Set("replication-min", fmt.Sprintf("%d", po.ReplicationFactorMin)) q.Set("replication-min", fmt.Sprintf("%d", po.ReplicationFactorMin))
q.Set("replication-max", fmt.Sprintf("%d", po.ReplicationFactorMax)) q.Set("replication-max", fmt.Sprintf("%d", po.ReplicationFactorMax))
q.Set("name", po.Name) q.Set("name", po.Name)
q.Set("shard-size", fmt.Sprintf("%d", po.ShardSize)) q.Set("shard-size", fmt.Sprintf("%d", po.ShardSize))
q.Set("user-allocations", strings.Join(PeersToStrings(po.UserAllocations), ",")) q.Set("user-allocations", strings.Join(PeersToStrings(po.UserAllocations), ","))
if !po.ExpireAt.IsZero() {
v, err := po.ExpireAt.MarshalText()
if err != nil {
return "", err
}
q.Set("expire-at", string(v))
}
for k, v := range po.Metadata { for k, v := range po.Metadata {
if k == "" { if k == "" {
continue continue
@ -539,7 +553,7 @@ func (po *PinOptions) ToQuery() string {
if po.PinUpdate != cid.Undef { if po.PinUpdate != cid.Undef {
q.Set("pin-update", po.PinUpdate.String()) q.Set("pin-update", po.PinUpdate.String())
} }
return q.Encode() return q.Encode(), nil
} }
// FromQuery is the inverse of ToQuery(). // FromQuery is the inverse of ToQuery().
@ -573,6 +587,30 @@ func (po *PinOptions) FromQuery(q url.Values) error {
po.UserAllocations = StringsToPeers(strings.Split(allocs, ",")) po.UserAllocations = StringsToPeers(strings.Split(allocs, ","))
} }
if v := q.Get("expire-at"); v != "" {
var tm time.Time
err := tm.UnmarshalText([]byte(v))
if err != nil {
return fmt.Errorf("parameter expire-at invalid")
}
if tm.Before(time.Now()) {
logger.Warning("expiry time is before current time")
} else {
po.ExpireAt = tm
}
} else if v = q.Get("expire-in"); v != "" {
d, err := time.ParseDuration(v)
if err != nil {
return errors.New("parameter expire-in invalid")
}
if d < 1*time.Second {
logger.Warning("expire-in duration too small, less than 1 sec")
po.ExpireAt = unixZero
} else {
po.ExpireAt = time.Now().Add(d)
}
}
po.Metadata = make(map[string]string) po.Metadata = make(map[string]string)
for k := range q { for k := range q {
if !strings.HasPrefix(k, pinOptionsMetaPrefix) { if !strings.HasPrefix(k, pinOptionsMetaPrefix) {
@ -683,6 +721,12 @@ func (pin *Pin) ProtoMarshal() ([]byte, error) {
allocs[i] = bs allocs[i] = bs
} }
var expireAtProto uint64
// Only set the protobuf field with non-zero times.
if !pin.ExpireAt.IsZero() {
expireAtProto = uint64(pin.ExpireAt.Unix())
}
opts := &pb.PinOptions{ opts := &pb.PinOptions{
ReplicationFactorMin: int32(pin.ReplicationFactorMin), ReplicationFactorMin: int32(pin.ReplicationFactorMin),
ReplicationFactorMax: int32(pin.ReplicationFactorMax), ReplicationFactorMax: int32(pin.ReplicationFactorMax),
@ -691,6 +735,7 @@ func (pin *Pin) ProtoMarshal() ([]byte, error) {
// UserAllocations: pin.UserAllocations, // UserAllocations: pin.UserAllocations,
Metadata: pin.Metadata, Metadata: pin.Metadata,
PinUpdate: pin.PinUpdate.Bytes(), PinUpdate: pin.PinUpdate.Bytes(),
ExpireAt: expireAtProto,
} }
pbPin := &pb.Pin{ pbPin := &pb.Pin{
@ -749,8 +794,11 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error {
pin.Name = opts.GetName() pin.Name = opts.GetName()
pin.ShardSize = opts.GetShardSize() pin.ShardSize = opts.GetShardSize()
// pin.UserAllocations = opts.GetUserAllocations() // pin.UserAllocations = opts.GetUserAllocations()
t := opts.GetExpireAt()
if t > 0 {
pin.ExpireAt = time.Unix(int64(t), 0)
}
pin.Metadata = opts.GetMetadata() pin.Metadata = opts.GetMetadata()
pinUpdate, err := cid.Cast(opts.GetPinUpdate()) pinUpdate, err := cid.Cast(opts.GetPinUpdate())
if err == nil { if err == nil {
pin.PinUpdate = pinUpdate pin.PinUpdate = pinUpdate
@ -823,6 +871,11 @@ func (pin *Pin) IsRemotePin(pid peer.ID) bool {
return true return true
} }
// ExpiredAt returns whether the pin has expired at the given time.
func (pin *Pin) ExpiredAt(t time.Time) bool {
return !(pin.ExpireAt.IsZero() || pin.ExpireAt.Equal(unixZero) || pin.ExpireAt.After(t))
}
// NodeWithMeta specifies a block of data and a set of optional metadata fields // NodeWithMeta specifies a block of data and a set of optional metadata fields
// carrying information about the encoded ipld node // carrying information about the encoded ipld node
type NodeWithMeta struct { type NodeWithMeta struct {

View File

@ -184,6 +184,7 @@ func TestPinOptionsQuery(t *testing.T) {
"QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc", "QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc",
"QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6", "QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6",
}), }),
ExpireAt: time.Now().Add(12 * time.Hour),
Metadata: map[string]string{ Metadata: map[string]string{
"hello": "bye", "hello": "bye",
"hello2": "bye2", "hello2": "bye2",
@ -210,7 +211,10 @@ func TestPinOptionsQuery(t *testing.T) {
} }
for _, tc := range testcases { for _, tc := range testcases {
queryStr := tc.ToQuery() queryStr, err := tc.ToQuery()
if err != nil {
t.Fatal("error converting to query", err)
}
q, err := url.ParseQuery(queryStr) q, err := url.ParseQuery(queryStr)
if err != nil { if err != nil {
t.Error("error parsing query", err) t.Error("error parsing query", err)

View File

@ -964,6 +964,7 @@ func (c *Cluster) StateSync(ctx context.Context) error {
} }
logger.Debug("syncing state to tracker") logger.Debug("syncing state to tracker")
timeNow := time.Now()
clusterPins, err := cState.List(ctx) clusterPins, err := cState.List(ctx)
if err != nil { if err != nil {
return err return err
@ -987,9 +988,29 @@ func (c *Cluster) StateSync(ctx context.Context) error {
} }
} }
isClosest := func(cid.Cid) bool {
return false
}
if !c.config.FollowerMode {
trustedPeers, err := c.getTrustedPeers(ctx)
if err != nil {
return nil
}
checker := distanceChecker{
local: c.id,
otherPeers: trustedPeers,
cache: make(map[peer.ID]distance, len(trustedPeers)+1),
}
isClosest = func(c cid.Cid) bool {
return checker.isClosest(c)
}
}
// a. Untrack items which should not be tracked // a. Untrack items which should not be tracked
// b. Track items which should not be remote as local // b. Unpin items which have expired
// c. Track items which should not be local as remote // c. Track items which should not be remote as local
// d. Track items which should not be local as remote
for _, p := range trackedPins { for _, p := range trackedPins {
pCid := p.Cid pCid := p.Cid
currentPin, err := cState.Get(ctx, pCid) currentPin, err := cState.Get(ctx, pCid)
@ -1003,16 +1024,15 @@ func (c *Cluster) StateSync(ctx context.Context) error {
continue continue
} }
allocatedHere := containsPeer(currentPin.Allocations, c.id) || currentPin.ReplicationFactorMin == -1 if currentPin.ExpiredAt(timeNow) && isClosest(pCid) {
logger.Infof("Unpinning %s: pin expired at %s", pCid, currentPin.ExpireAt)
switch { if _, err := c.Unpin(ctx, pCid); err != nil {
case p.Status == api.TrackerStatusRemote && allocatedHere: logger.Error(err)
logger.Debugf("StateSync: Tracking %s locally (currently remote)", pCid)
err = c.tracker.Track(ctx, currentPin)
case p.Status == api.TrackerStatusPinned && !allocatedHere:
logger.Debugf("StateSync: Tracking %s as remote (currently local)", pCid)
err = c.tracker.Track(ctx, currentPin)
} }
continue
}
err = c.updateRemotePins(ctx, currentPin, p)
if err != nil { if err != nil {
return err return err
} }
@ -1021,6 +1041,22 @@ func (c *Cluster) StateSync(ctx context.Context) error {
return nil return nil
} }
func (c *Cluster) updateRemotePins(ctx context.Context, pin *api.Pin, p *api.PinInfo) error {
var err error
allocatedHere := pin.ReplicationFactorMin == -1 || containsPeer(pin.Allocations, c.id)
switch {
case p.Status == api.TrackerStatusRemote && allocatedHere:
logger.Debugf("StateSync: Tracking %s locally (currently remote)", p.Cid)
err = c.tracker.Track(ctx, pin)
case p.Status == api.TrackerStatusPinned && !allocatedHere:
logger.Debugf("StateSync: Tracking %s as remote (currently local)", p.Cid)
err = c.tracker.Track(ctx, pin)
}
return err
}
// StatusAll returns the GlobalPinInfo for all tracked Cids in all peers. // StatusAll returns the GlobalPinInfo for all tracked Cids in all peers.
// If an error happens, the slice will contain as much information as // If an error happens, the slice will contain as much information as
// could be fetched from other peers. // could be fetched from other peers.
@ -1619,6 +1655,25 @@ func (c *Cluster) Peers(ctx context.Context) []*api.ID {
return peers return peers
} }
// getTrustedPeers gives listed of trusted peers except the current peer.
func (c *Cluster) getTrustedPeers(ctx context.Context) ([]peer.ID, error) {
peers, err := c.consensus.Peers(ctx)
if err != nil {
return nil, err
}
trustedPeers := make([]peer.ID, 0, len(peers))
for _, p := range peers {
if p == c.id || !c.consensus.IsTrustedPeer(ctx, p) {
continue
}
trustedPeers = append(trustedPeers, p)
}
return trustedPeers, nil
}
func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h cid.Cid) (*api.GlobalPinInfo, error) { func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h cid.Cid) (*api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid") ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid")
defer span.End() defer span.End()

View File

@ -359,6 +359,10 @@ content.
Value: defaultAddParams.ReplicationFactorMax, Value: defaultAddParams.ReplicationFactorMax,
Usage: "Sets the maximum replication factor for pinning this file", Usage: "Sets the maximum replication factor for pinning this file",
}, },
cli.StringFlag{
Name: "expire-in",
Usage: "Duration after which pin should be unpinned automatically",
},
cli.StringSliceFlag{ cli.StringSliceFlag{
Name: "metadata", Name: "metadata",
Usage: "Pin metadata: key=value. Can be added multiple times", Usage: "Pin metadata: key=value. Can be added multiple times",
@ -414,6 +418,12 @@ content.
p := api.DefaultAddParams() p := api.DefaultAddParams()
p.ReplicationFactorMin = c.Int("replication-min") p.ReplicationFactorMin = c.Int("replication-min")
p.ReplicationFactorMax = c.Int("replication-max") p.ReplicationFactorMax = c.Int("replication-max")
if expireIn := c.String("expire-in"); expireIn != "" {
d, err := time.ParseDuration(expireIn)
checkErr("parsing expire-in", err)
p.ExpireAt = time.Now().Add(d)
}
p.Metadata = parseMetadata(c.StringSlice("metadata")) p.Metadata = parseMetadata(c.StringSlice("metadata"))
p.Name = name p.Name = name
if c.String("allocations") != "" { if c.String("allocations") != "" {
@ -540,6 +550,10 @@ would stil be respected.
Value: "", Value: "",
Usage: "Sets a name for this pin", Usage: "Sets a name for this pin",
}, },
cli.StringFlag{
Name: "expire-in",
Usage: "Duration after which pin should be unpinned automatically",
},
cli.StringSliceFlag{ cli.StringSliceFlag{
Name: "metadata", Name: "metadata",
Usage: "Pin metadata: key=value. Can be added multiple times", Usage: "Pin metadata: key=value. Can be added multiple times",
@ -579,12 +593,19 @@ would stil be respected.
checkErr("decoding allocations", errors.New("some peer IDs could not be decoded")) checkErr("decoding allocations", errors.New("some peer IDs could not be decoded"))
} }
} }
var expireAt time.Time
if expireIn := c.String("expire-in"); expireIn != "" {
d, err := time.ParseDuration(expireIn)
checkErr("parsing expire-in", err)
expireAt = time.Now().Add(d)
}
opts := api.PinOptions{ opts := api.PinOptions{
ReplicationFactorMin: rplMin, ReplicationFactorMin: rplMin,
ReplicationFactorMax: rplMax, ReplicationFactorMax: rplMax,
Name: c.String("name"), Name: c.String("name"),
UserAllocations: userAllocs, UserAllocations: userAllocs,
ExpireAt: expireAt,
Metadata: parseMetadata(c.StringSlice("metadata")), Metadata: parseMetadata(c.StringSlice("metadata")),
} }

1
go.mod
View File

@ -67,6 +67,7 @@ require (
github.com/urfave/cli v1.22.1 github.com/urfave/cli v1.22.1
github.com/zenground0/go-dot v0.0.0-20180912213407-94a425d4984e github.com/zenground0/go-dot v0.0.0-20180912213407-94a425d4984e
go.opencensus.io v0.22.1 go.opencensus.io v0.22.1
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392
gonum.org/v1/gonum v0.0.0-20190926113837-94b2bbd8ac13 gonum.org/v1/gonum v0.0.0-20190926113837-94b2bbd8ac13
gonum.org/v1/plot v0.0.0-20190615073203-9aa86143727f gonum.org/v1/plot v0.0.0-20190615073203-9aa86143727f
) )

View File

@ -2161,3 +2161,57 @@ func TestClustersFollowerMode(t *testing.T) {
} }
}) })
} }
func TestClusterPinsWithExpiration(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
ttlDelay()
cl := clusters[rand.Intn(nClusters)] // choose a random cluster peer to query
c := test.Cid1
expireIn := 1 * time.Second
opts := api.PinOptions{
ExpireAt: time.Now().Add(expireIn),
}
_, err := cl.Pin(ctx, c, opts)
if err != nil {
t.Fatal("pin should have worked:", err)
}
pinDelay()
pins, err := cl.Pins(ctx)
if err != nil {
t.Fatal(err)
}
if len(pins) != 1 {
t.Error("pin should be part of the state")
}
// wait till expiry time
time.Sleep(expireIn)
// manually call state sync on all peers, so we don't have to wait till
// state sync interval
for _, c := range clusters {
err = c.StateSync(ctx)
if err != nil {
t.Error(err)
}
}
pinDelay()
// state sync should have unpinned expired pin
pins, err = cl.Pins(ctx)
if err != nil {
t.Fatal(err)
}
if len(pins) != 0 {
t.Error("pin should not be part of the state")
}
}

View File

@ -15,7 +15,6 @@ import (
"time" "time"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
utils "github.com/ipfs/ipfs-cluster/utils"
host "github.com/libp2p/go-libp2p-core/host" host "github.com/libp2p/go-libp2p-core/host"
net "github.com/libp2p/go-libp2p-core/network" net "github.com/libp2p/go-libp2p-core/network"
peer "github.com/libp2p/go-libp2p-core/peer" peer "github.com/libp2p/go-libp2p-core/peer"
@ -143,7 +142,7 @@ func (pm *Manager) filteredPeerAddrs(p peer.ID) []ma.Multiaddr {
return peerDNSAddrs return peerDNSAddrs
} }
sort.Sort(utils.ByString(peerAddrs)) sort.Sort(byString(peerAddrs))
return peerAddrs return peerAddrs
} }
@ -397,3 +396,10 @@ func (ps *peerSort) Swap(i, j int) {
ps.pinfos[i] = pinfo2 ps.pinfos[i] = pinfo2
ps.pinfos[j] = pinfo1 ps.pinfos[j] = pinfo1
} }
// byString can sort multiaddresses by its string
type byString []ma.Multiaddr
func (m byString) Len() int { return len(m) }
func (m byString) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m byString) Less(i, j int) bool { return m[i].String() < m[j].String() }

View File

@ -2,7 +2,7 @@
// IPFS Cluster must satisfy. // IPFS Cluster must satisfy.
package state package state
// State represents the shared state of the cluster and it // State represents the shared state of the cluster
import ( import (
"context" "context"
"errors" "errors"
@ -17,7 +17,7 @@ import (
var ErrNotFound = errors.New("pin is not part of the pinset") var ErrNotFound = errors.New("pin is not part of the pinset")
// State is a wrapper to the Cluster shared state so that Pin objects can // State is a wrapper to the Cluster shared state so that Pin objects can
// easily read, written and queried. The state can be marshaled and // be easily read, written and queried. The state can be marshaled and
// unmarshaled. Implementation should be thread-safe. // unmarshaled. Implementation should be thread-safe.
type State interface { type State interface {
ReadOnly ReadOnly
@ -29,10 +29,9 @@ type State interface {
Marshal(io.Writer) error Marshal(io.Writer) error
// Unmarshal deserializes the state from marshaled bytes. // Unmarshal deserializes the state from marshaled bytes.
Unmarshal(io.Reader) error Unmarshal(io.Reader) error
// Commit writes any batched operations.
} }
// ReadOnly represents a the read side of a State. // ReadOnly represents the read side of a State.
type ReadOnly interface { type ReadOnly interface {
// List lists all the pins in the state. // List lists all the pins in the state.
List(context.Context) ([]*api.Pin, error) List(context.Context) ([]*api.Pin, error)

53
util.go
View File

@ -1,9 +1,12 @@
package ipfscluster package ipfscluster
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
blake2b "golang.org/x/crypto/blake2b"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer" peer "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
@ -93,3 +96,53 @@ func minInt(x, y int) int {
// pin.Parents.Add(c) // pin.Parents.Add(c)
// } // }
// } // }
type distance [blake2b.Size256]byte
type distanceChecker struct {
local peer.ID
otherPeers []peer.ID
cache map[peer.ID]distance
}
func (dc distanceChecker) isClosest(ci cid.Cid) bool {
ciHash := convertKey(ci.KeyString())
localPeerHash := dc.convertPeerID(dc.local)
myDistance := xor(ciHash, localPeerHash)
for _, p := range dc.otherPeers {
peerHash := dc.convertPeerID(p)
distance := xor(peerHash, ciHash)
// if myDistance is larger than for other peers...
if bytes.Compare(myDistance[:], distance[:]) > 0 {
return false
}
}
return true
}
// convertPeerID hashes a Peer ID (Multihash).
func (dc distanceChecker) convertPeerID(id peer.ID) distance {
hash, ok := dc.cache[id]
if ok {
return hash
}
hashBytes := convertKey(string(id))
dc.cache[id] = hashBytes
return hashBytes
}
// convertKey hashes a key.
func convertKey(id string) distance {
return blake2b.Sum256([]byte(id))
}
func xor(a, b distance) distance {
var c distance
for i := 0; i < len(c); i++ {
c[i] = a[i] ^ b[i]
}
return c
}

View File

@ -1,12 +0,0 @@
package utils
import (
ma "github.com/multiformats/go-multiaddr"
)
// ByString can sort multiaddresses by its string
type ByString []ma.Multiaddr
func (m ByString) Len() int { return len(m) }
func (m ByString) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m ByString) Less(i, j int) bool { return m[i].String() < m[j].String() }