Merge pull request #923 from ipfs/feat/pin-expiration

Pin Expiration
This commit is contained in:
Hector Sanjuan 2019-11-05 12:27:02 +01:00 committed by GitHub
commit e06118277e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 363 additions and 77 deletions

View File

@ -172,9 +172,15 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
}
// ToQueryString returns a url query string (key=value&key2=value2&...)
func (p *AddParams) ToQueryString() string {
pinOptsQuery := p.PinOptions.ToQuery()
query, _ := url.ParseQuery(pinOptsQuery)
func (p *AddParams) ToQueryString() (string, error) {
pinOptsQuery, err := p.PinOptions.ToQuery()
if err != nil {
return "", err
}
query, err := url.ParseQuery(pinOptsQuery)
if err != nil {
return "", err
}
query.Set("shard", fmt.Sprintf("%t", p.Shard))
query.Set("local", fmt.Sprintf("%t", p.Local))
query.Set("recursive", fmt.Sprintf("%t", p.Recursive))
@ -188,7 +194,7 @@ func (p *AddParams) ToQueryString() string {
query.Set("hash", p.HashFun)
query.Set("stream-channels", fmt.Sprintf("%t", p.StreamChannels))
query.Set("nocopy", fmt.Sprintf("%t", p.NoCopy))
return query.Encode()
return query.Encode(), nil
}
// Equals checks if p equals p2.

View File

@ -35,11 +35,13 @@ func TestAddParams_ToQueryString(t *testing.T) {
p.Name = "something"
p.RawLeaves = true
p.ShardSize = 1020
qstr := p.ToQueryString()
qstr, err := p.ToQueryString()
if err != nil {
t.Fatal(err)
}
q, err := url.ParseQuery(qstr)
if err != nil {
t.Fatal()
t.Fatal(err)
}
p2, err := AddParamsFromQuery(q)

View File

@ -140,6 +140,7 @@ type PinOptions struct {
ShardSize uint64 `protobuf:"varint,4,opt,name=ShardSize,proto3" json:"ShardSize,omitempty"`
Metadata map[string]string `protobuf:"bytes,6,rep,name=Metadata,proto3" json:"Metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
PinUpdate []byte `protobuf:"bytes,7,opt,name=PinUpdate,proto3" json:"PinUpdate,omitempty"`
ExpireAt uint64 `protobuf:"varint,8,opt,name=ExpireAt,proto3" json:"ExpireAt,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -212,6 +213,13 @@ func (m *PinOptions) GetPinUpdate() []byte {
return nil
}
func (m *PinOptions) GetExpireAt() uint64 {
if m != nil {
return m.ExpireAt
}
return 0
}
func init() {
proto.RegisterEnum("api.pb.Pin_PinType", Pin_PinType_name, Pin_PinType_value)
proto.RegisterType((*Pin)(nil), "api.pb.Pin")
@ -222,30 +230,31 @@ func init() {
func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) }
var fileDescriptor_d938547f84707355 = []byte{
// 391 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0xcd, 0x8e, 0xd3, 0x30,
0x10, 0xc6, 0x49, 0x9a, 0x34, 0x93, 0xee, 0xaa, 0x3b, 0xec, 0xc1, 0x5a, 0x71, 0xb0, 0x7a, 0x21,
0x07, 0x94, 0x43, 0xb8, 0x20, 0xe0, 0xb2, 0x6c, 0x01, 0x09, 0xa9, 0x50, 0x79, 0xe9, 0x03, 0xb8,
0x8d, 0x51, 0x2d, 0x42, 0x62, 0xa5, 0x2e, 0x6a, 0x78, 0x1b, 0x1e, 0x86, 0xf7, 0x42, 0xb6, 0xfb,
0x87, 0xe8, 0x21, 0xd2, 0x7c, 0xdf, 0xcc, 0xe7, 0x99, 0xf9, 0x32, 0x90, 0x99, 0x5e, 0xcb, 0x4d,
0xa1, 0xbb, 0xd6, 0xb4, 0x18, 0x0b, 0xad, 0x0a, 0xbd, 0x9c, 0xfc, 0x0e, 0x20, 0x9c, 0xab, 0x06,
0xc7, 0x10, 0x3e, 0xa8, 0x8a, 0x12, 0x46, 0xf2, 0x11, 0xb7, 0x21, 0x3e, 0x87, 0xe8, 0x6b, 0xaf,
0x25, 0x0d, 0x18, 0xc9, 0xaf, 0xcb, 0xa7, 0x85, 0x17, 0x14, 0x73, 0xd5, 0xd8, 0xcf, 0xa6, 0xb8,
0x2b, 0x40, 0x06, 0xd9, 0x7d, 0x5d, 0xb7, 0x2b, 0x61, 0x54, 0xdb, 0x6c, 0x68, 0xc8, 0xc2, 0x7c,
0xc4, 0xcf, 0x29, 0xbc, 0x83, 0xe1, 0x4c, 0xec, 0xa6, 0x52, 0x9b, 0x35, 0x8d, 0x18, 0xc9, 0x6f,
0xf8, 0x11, 0xe3, 0x33, 0x48, 0xb9, 0xfc, 0x26, 0x3b, 0xd9, 0xac, 0x24, 0x1d, 0xb8, 0xf6, 0x27,
0x02, 0x5f, 0x40, 0xf2, 0x45, 0xfb, 0x77, 0x63, 0x46, 0xf2, 0xac, 0xc4, 0xb3, 0x39, 0xf6, 0x19,
0x7e, 0x28, 0x99, 0x2c, 0x20, 0xd9, 0x8f, 0x86, 0x19, 0x24, 0xef, 0x44, 0x65, 0xc3, 0xf1, 0x13,
0x1c, 0xc1, 0x70, 0x2a, 0x8c, 0x70, 0x88, 0x58, 0x34, 0x93, 0x7b, 0x14, 0x20, 0xc2, 0xf5, 0x43,
0xbd, 0xdd, 0x18, 0xd9, 0x4d, 0xef, 0x3f, 0x3a, 0x2e, 0xc4, 0x2b, 0x48, 0x1f, 0xd7, 0xa2, 0xf3,
0xf2, 0x68, 0xf2, 0x27, 0x00, 0x38, 0xb5, 0xc3, 0x12, 0x6e, 0xb9, 0xd4, 0xb5, 0xf2, 0xdb, 0x7d,
0x10, 0x2b, 0xd3, 0x76, 0x33, 0xd5, 0x38, 0xef, 0x6e, 0xf8, 0xc5, 0xdc, 0x65, 0x8d, 0xd8, 0x39,
0x73, 0x2f, 0x6a, 0xc4, 0x0e, 0x11, 0xa2, 0xcf, 0xe2, 0x87, 0xa4, 0x21, 0x23, 0x79, 0xca, 0x5d,
0x6c, 0xdd, 0x72, 0x93, 0x3d, 0xaa, 0x5f, 0xd2, 0x59, 0x19, 0xf1, 0x13, 0x81, 0x6f, 0xfd, 0x66,
0x95, 0x30, 0x82, 0xc6, 0x2c, 0xcc, 0xb3, 0x92, 0xfd, 0x6f, 0x57, 0x71, 0x28, 0x79, 0xdf, 0x98,
0xae, 0xe7, 0x47, 0x85, 0x7d, 0x7b, 0xae, 0x9a, 0x85, 0xae, 0x84, 0x91, 0x34, 0xf1, 0x7f, 0xe2,
0x48, 0xdc, 0xbd, 0x81, 0xab, 0x7f, 0x84, 0xf6, 0x62, 0xbe, 0xcb, 0xde, 0x6d, 0x9d, 0x72, 0x1b,
0xe2, 0x2d, 0x0c, 0x7e, 0x8a, 0x7a, 0xeb, 0x4f, 0x26, 0xe5, 0x1e, 0xbc, 0x0e, 0x5e, 0x91, 0x4f,
0xd1, 0x70, 0x30, 0x8e, 0x97, 0xb1, 0x3b, 0xbd, 0x97, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xfe,
0x90, 0x29, 0x9a, 0x89, 0x02, 0x00, 0x00,
// 405 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0xc1, 0x6e, 0xd3, 0x40,
0x10, 0x65, 0x6d, 0xc7, 0xb1, 0xc7, 0x69, 0x95, 0x0e, 0x3d, 0xac, 0x2a, 0x0e, 0xab, 0x5c, 0xf0,
0x01, 0xf9, 0x60, 0x2e, 0x08, 0xb8, 0x84, 0xa6, 0x20, 0x21, 0x05, 0xa2, 0x2d, 0xfd, 0x80, 0x6d,
0xbc, 0xa8, 0x2b, 0x8c, 0xbd, 0x72, 0xb6, 0xc8, 0xe6, 0x6f, 0xf8, 0x34, 0xfe, 0x04, 0xed, 0x6e,
0xea, 0x14, 0x35, 0x07, 0x4b, 0xf3, 0xde, 0xcc, 0x9b, 0x19, 0xbf, 0x1d, 0xc8, 0xcc, 0xa0, 0xe5,
0xae, 0xd0, 0x5d, 0x6b, 0x5a, 0x8c, 0x85, 0x56, 0x85, 0xbe, 0x5d, 0xfc, 0x09, 0x20, 0xdc, 0xa8,
0x06, 0xe7, 0x10, 0x5e, 0xaa, 0x8a, 0x12, 0x46, 0xf2, 0x19, 0xb7, 0x21, 0xbe, 0x84, 0xe8, 0xdb,
0xa0, 0x25, 0x0d, 0x18, 0xc9, 0x4f, 0xcb, 0xe7, 0x85, 0x17, 0x14, 0x1b, 0xd5, 0xd8, 0xcf, 0xa6,
0xb8, 0x2b, 0x40, 0x06, 0xd9, 0xb2, 0xae, 0xdb, 0xad, 0x30, 0xaa, 0x6d, 0x76, 0x34, 0x64, 0x61,
0x3e, 0xe3, 0x8f, 0x29, 0xbc, 0x80, 0x64, 0x2d, 0xfa, 0x95, 0xd4, 0xe6, 0x8e, 0x46, 0x8c, 0xe4,
0x67, 0x7c, 0xc4, 0xf8, 0x02, 0x52, 0x2e, 0xbf, 0xcb, 0x4e, 0x36, 0x5b, 0x49, 0x27, 0x6e, 0xfc,
0x81, 0xc0, 0x57, 0x30, 0xfd, 0xaa, 0x7d, 0xdf, 0x98, 0x91, 0x3c, 0x2b, 0xf1, 0xd1, 0x1e, 0xfb,
0x0c, 0x7f, 0x28, 0x59, 0xdc, 0xc0, 0x74, 0xbf, 0x1a, 0x66, 0x30, 0xfd, 0x20, 0x2a, 0x1b, 0xce,
0x9f, 0xe1, 0x0c, 0x92, 0x95, 0x30, 0xc2, 0x21, 0x62, 0xd1, 0x5a, 0xee, 0x51, 0x80, 0x08, 0xa7,
0x97, 0xf5, 0xfd, 0xce, 0xc8, 0x6e, 0xb5, 0xfc, 0xe4, 0xb8, 0x10, 0x4f, 0x20, 0xbd, 0xbe, 0x13,
0x9d, 0x97, 0x47, 0x8b, 0xbf, 0x01, 0xc0, 0x61, 0x1c, 0x96, 0x70, 0xce, 0xa5, 0xae, 0x95, 0xff,
0xbb, 0x8f, 0x62, 0x6b, 0xda, 0x6e, 0xad, 0x1a, 0xe7, 0xdd, 0x19, 0x3f, 0x9a, 0x3b, 0xae, 0x11,
0xbd, 0x33, 0xf7, 0xa8, 0x46, 0xf4, 0x88, 0x10, 0x7d, 0x11, 0x3f, 0x25, 0x0d, 0x19, 0xc9, 0x53,
0xee, 0x62, 0xeb, 0x96, 0xdb, 0xec, 0x5a, 0xfd, 0x96, 0xce, 0xca, 0x88, 0x1f, 0x08, 0x7c, 0xef,
0xff, 0xac, 0x12, 0x46, 0xd0, 0x98, 0x85, 0x79, 0x56, 0xb2, 0xa7, 0x76, 0x15, 0x0f, 0x25, 0x57,
0x8d, 0xe9, 0x06, 0x3e, 0x2a, 0x6c, 0xef, 0x8d, 0x6a, 0x6e, 0x74, 0x25, 0x8c, 0xa4, 0x53, 0xff,
0x12, 0x23, 0x61, 0xdf, 0xf0, 0xaa, 0xd7, 0xaa, 0x93, 0x4b, 0x43, 0x13, 0x37, 0x78, 0xc4, 0x17,
0xef, 0xe0, 0xe4, 0xbf, 0xa6, 0xf6, 0x9a, 0x7e, 0xc8, 0xc1, 0x39, 0x92, 0x72, 0x1b, 0xe2, 0x39,
0x4c, 0x7e, 0x89, 0xfa, 0xde, 0x9f, 0x53, 0xca, 0x3d, 0x78, 0x1b, 0xbc, 0x21, 0x9f, 0xa3, 0x64,
0x32, 0x8f, 0x6f, 0x63, 0x77, 0x96, 0xaf, 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, 0xaf, 0x59, 0x74,
0x86, 0xa5, 0x02, 0x00, 0x00,
}

View File

@ -25,4 +25,5 @@ message PinOptions {
reserved 5; // reserved for UserAllocations
map<string, string> Metadata = 6;
bytes PinUpdate = 7;
uint64 ExpireAt = 8;
}

View File

@ -78,14 +78,18 @@ func (c *defaultClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions
ctx, span := trace.StartSpan(ctx, "client/Pin")
defer span.End()
query, err := opts.ToQuery()
if err != nil {
return nil, err
}
var pin api.Pin
err := c.do(
err = c.do(
ctx,
"POST",
fmt.Sprintf(
"/pins/%s?%s",
ci.String(),
opts.ToQuery(),
query,
),
nil,
nil,
@ -119,14 +123,17 @@ func (c *defaultClient) PinPath(ctx context.Context, path string, opts api.PinOp
if err != nil {
return nil, err
}
query, err := opts.ToQuery()
if err != nil {
return nil, err
}
err = c.do(
ctx,
"POST",
fmt.Sprintf(
"/pins%s?%s",
ipfspath.String(),
opts.ToQuery(),
query,
),
nil,
nil,
@ -572,7 +579,10 @@ func (c *defaultClient) AddMultiFile(
// This method must run with StreamChannels set.
params.StreamChannels = true
queryStr := params.ToQueryString()
queryStr, err := params.ToQueryString()
if err != nil {
return err
}
// our handler decodes an AddedOutput and puts it
// in the out channel.
@ -589,7 +599,7 @@ func (c *defaultClient) AddMultiFile(
return nil
}
err := c.doStream(ctx,
err = c.doStream(ctx,
"POST",
"/add?"+queryStr,
headers,

View File

@ -601,8 +601,12 @@ type pathCase struct {
expectedCid string
}
func (p *pathCase) WithQuery() string {
return p.path + "?" + p.opts.ToQuery()
func (p *pathCase) WithQuery(t *testing.T) string {
query, err := p.opts.ToQuery()
if err != nil {
t.Fatal(err)
}
return p.path + "?" + query
}
var testPinOpts = api.PinOptions{
@ -610,6 +614,7 @@ var testPinOpts = api.PinOptions{
ReplicationFactorMin: 6,
Name: "hello there",
UserAllocations: []peer.ID{test.PeerID1, test.PeerID2},
ExpireAt: time.Now().Add(30 * time.Second),
}
var pathTestCases = []pathCase{
@ -660,7 +665,8 @@ func TestAPIPinEndpointWithPath(t *testing.T) {
if testCase.wantErr {
errResp := api.Error{}
makePost(t, rest, url(rest)+"/pins"+testCase.WithQuery(), []byte{}, &errResp)
q := testCase.WithQuery(t)
makePost(t, rest, url(rest)+"/pins"+q, []byte{}, &errResp)
if errResp.Code != testCase.code {
t.Errorf(
"status code: expected: %d, got: %d, path: %s\n",
@ -672,7 +678,8 @@ func TestAPIPinEndpointWithPath(t *testing.T) {
continue
}
pin := api.Pin{}
makePost(t, rest, url(rest)+"/pins"+testCase.WithQuery(), []byte{}, &pin)
q := testCase.WithQuery(t)
makePost(t, rest, url(rest)+"/pins"+q, []byte{}, &pin)
if !pin.Equals(resultantPin) {
t.Errorf("pin: expected: %+v", resultantPin)
t.Errorf("pin: got: %+v", pin)

View File

@ -10,7 +10,6 @@ package api
import (
"encoding/json"
"errors"
"fmt"
"net/url"
"sort"
@ -32,10 +31,13 @@ import (
_ "github.com/multiformats/go-multiaddr-dns"
proto "github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
)
var logger = logging.Logger("apitypes")
var unixZero = time.Unix(0, 0)
func init() {
// Use /p2p/ multiaddresses
multiaddr.SwapToP2pMultiaddrs()
@ -464,6 +466,7 @@ type PinOptions struct {
Name string `json:"name" codec:"n,omitempty"`
ShardSize uint64 `json:"shard_size" codec:"s,omitempty"`
UserAllocations []peer.ID `json:"user_allocations" codec:"ua,omitempty"`
ExpireAt time.Time `json:"expire_at" codec:"e,omitempty"`
Metadata map[string]string `json:"metadata" codec:"m,omitempty"`
PinUpdate cid.Cid `json:"pin_update,omitempty" codec:"pu,omitempty"`
}
@ -510,6 +513,10 @@ func (po *PinOptions) Equals(po2 *PinOptions) bool {
return false
}
if !po.ExpireAt.Equal(po2.ExpireAt) {
return false
}
for k, v := range po.Metadata {
v2 := po2.Metadata[k]
if k != "" && v != v2 {
@ -523,13 +530,20 @@ func (po *PinOptions) Equals(po2 *PinOptions) bool {
}
// ToQuery returns the PinOption as query arguments.
func (po *PinOptions) ToQuery() string {
func (po *PinOptions) ToQuery() (string, error) {
q := url.Values{}
q.Set("replication-min", fmt.Sprintf("%d", po.ReplicationFactorMin))
q.Set("replication-max", fmt.Sprintf("%d", po.ReplicationFactorMax))
q.Set("name", po.Name)
q.Set("shard-size", fmt.Sprintf("%d", po.ShardSize))
q.Set("user-allocations", strings.Join(PeersToStrings(po.UserAllocations), ","))
if !po.ExpireAt.IsZero() {
v, err := po.ExpireAt.MarshalText()
if err != nil {
return "", err
}
q.Set("expire-at", string(v))
}
for k, v := range po.Metadata {
if k == "" {
continue
@ -539,7 +553,7 @@ func (po *PinOptions) ToQuery() string {
if po.PinUpdate != cid.Undef {
q.Set("pin-update", po.PinUpdate.String())
}
return q.Encode()
return q.Encode(), nil
}
// FromQuery is the inverse of ToQuery().
@ -573,6 +587,24 @@ func (po *PinOptions) FromQuery(q url.Values) error {
po.UserAllocations = StringsToPeers(strings.Split(allocs, ","))
}
if v := q.Get("expire-at"); v != "" {
var tm time.Time
err := tm.UnmarshalText([]byte(v))
if err != nil {
return errors.Wrap(err, "expire-at cannot be parsed")
}
po.ExpireAt = tm
} else if v = q.Get("expire-in"); v != "" {
d, err := time.ParseDuration(v)
if err != nil {
return errors.Wrap(err, "expire-in cannot be parsed")
}
if d < time.Second {
return errors.New("expire-in duration too short")
}
po.ExpireAt = time.Now().Add(d)
}
po.Metadata = make(map[string]string)
for k := range q {
if !strings.HasPrefix(k, pinOptionsMetaPrefix) {
@ -683,6 +715,12 @@ func (pin *Pin) ProtoMarshal() ([]byte, error) {
allocs[i] = bs
}
var expireAtProto uint64
// Only set the protobuf field with non-zero times.
if !(pin.ExpireAt.IsZero() || pin.ExpireAt.Equal(unixZero)) {
expireAtProto = uint64(pin.ExpireAt.Unix())
}
opts := &pb.PinOptions{
ReplicationFactorMin: int32(pin.ReplicationFactorMin),
ReplicationFactorMax: int32(pin.ReplicationFactorMax),
@ -691,6 +729,7 @@ func (pin *Pin) ProtoMarshal() ([]byte, error) {
// UserAllocations: pin.UserAllocations,
Metadata: pin.Metadata,
PinUpdate: pin.PinUpdate.Bytes(),
ExpireAt: expireAtProto,
}
pbPin := &pb.Pin{
@ -749,8 +788,11 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error {
pin.Name = opts.GetName()
pin.ShardSize = opts.GetShardSize()
// pin.UserAllocations = opts.GetUserAllocations()
t := opts.GetExpireAt()
if t > 0 {
pin.ExpireAt = time.Unix(int64(t), 0)
}
pin.Metadata = opts.GetMetadata()
pinUpdate, err := cid.Cast(opts.GetPinUpdate())
if err == nil {
pin.PinUpdate = pinUpdate
@ -823,6 +865,15 @@ func (pin *Pin) IsRemotePin(pid peer.ID) bool {
return true
}
// ExpiredAt returns whether the pin has expired at the given time.
func (pin *Pin) ExpiredAt(t time.Time) bool {
if pin.ExpireAt.IsZero() || pin.ExpireAt.Equal(unixZero) {
return false
}
return pin.ExpireAt.Before(t)
}
// NodeWithMeta specifies a block of data and a set of optional metadata fields
// carrying information about the encoded ipld node
type NodeWithMeta struct {

View File

@ -184,6 +184,7 @@ func TestPinOptionsQuery(t *testing.T) {
"QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc",
"QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6",
}),
ExpireAt: time.Now().Add(12 * time.Hour),
Metadata: map[string]string{
"hello": "bye",
"hello2": "bye2",
@ -210,7 +211,10 @@ func TestPinOptionsQuery(t *testing.T) {
}
for _, tc := range testcases {
queryStr := tc.ToQuery()
queryStr, err := tc.ToQuery()
if err != nil {
t.Fatal("error converting to query", err)
}
q, err := url.ParseQuery(queryStr)
if err != nil {
t.Error("error parsing query", err)

View File

@ -964,6 +964,7 @@ func (c *Cluster) StateSync(ctx context.Context) error {
}
logger.Debug("syncing state to tracker")
timeNow := time.Now()
clusterPins, err := cState.List(ctx)
if err != nil {
return err
@ -987,9 +988,29 @@ func (c *Cluster) StateSync(ctx context.Context) error {
}
}
isClosest := func(cid.Cid) bool {
return false
}
if !c.config.FollowerMode {
trustedPeers, err := c.getTrustedPeers(ctx)
if err != nil {
return nil
}
checker := distanceChecker{
local: c.id,
otherPeers: trustedPeers,
cache: make(map[peer.ID]distance, len(trustedPeers)+1),
}
isClosest = func(c cid.Cid) bool {
return checker.isClosest(c)
}
}
// a. Untrack items which should not be tracked
// b. Track items which should not be remote as local
// c. Track items which should not be local as remote
// b. Unpin items which have expired
// c. Track items which should not be remote as local
// d. Track items which should not be local as remote
for _, p := range trackedPins {
pCid := p.Cid
currentPin, err := cState.Get(ctx, pCid)
@ -1003,16 +1024,15 @@ func (c *Cluster) StateSync(ctx context.Context) error {
continue
}
allocatedHere := containsPeer(currentPin.Allocations, c.id) || currentPin.ReplicationFactorMin == -1
switch {
case p.Status == api.TrackerStatusRemote && allocatedHere:
logger.Debugf("StateSync: Tracking %s locally (currently remote)", pCid)
err = c.tracker.Track(ctx, currentPin)
case p.Status == api.TrackerStatusPinned && !allocatedHere:
logger.Debugf("StateSync: Tracking %s as remote (currently local)", pCid)
err = c.tracker.Track(ctx, currentPin)
if currentPin.ExpiredAt(timeNow) && isClosest(pCid) {
logger.Infof("Unpinning %s: pin expired at %s", pCid, currentPin.ExpireAt)
if _, err := c.Unpin(ctx, pCid); err != nil {
logger.Error(err)
}
continue
}
err = c.updateRemotePins(ctx, currentPin, p)
if err != nil {
return err
}
@ -1021,6 +1041,22 @@ func (c *Cluster) StateSync(ctx context.Context) error {
return nil
}
func (c *Cluster) updateRemotePins(ctx context.Context, pin *api.Pin, p *api.PinInfo) error {
var err error
allocatedHere := pin.ReplicationFactorMin == -1 || containsPeer(pin.Allocations, c.id)
switch {
case p.Status == api.TrackerStatusRemote && allocatedHere:
logger.Debugf("StateSync: Tracking %s locally (currently remote)", p.Cid)
err = c.tracker.Track(ctx, pin)
case p.Status == api.TrackerStatusPinned && !allocatedHere:
logger.Debugf("StateSync: Tracking %s as remote (currently local)", p.Cid)
err = c.tracker.Track(ctx, pin)
}
return err
}
// StatusAll returns the GlobalPinInfo for all tracked Cids in all peers.
// If an error happens, the slice will contain as much information as
// could be fetched from other peers.
@ -1330,6 +1366,10 @@ func (c *Cluster) setupPin(ctx context.Context, pin *api.Pin) error {
return err
}
if !pin.ExpireAt.IsZero() && pin.ExpireAt.Before(time.Now()) {
return errors.New("pin.ExpireAt set before current time")
}
existing, err := c.PinGet(ctx, pin.Cid)
if err != nil && err != state.ErrNotFound {
return err
@ -1619,6 +1659,25 @@ func (c *Cluster) Peers(ctx context.Context) []*api.ID {
return peers
}
// getTrustedPeers gives listed of trusted peers except the current peer.
func (c *Cluster) getTrustedPeers(ctx context.Context) ([]peer.ID, error) {
peers, err := c.consensus.Peers(ctx)
if err != nil {
return nil, err
}
trustedPeers := make([]peer.ID, 0, len(peers))
for _, p := range peers {
if p == c.id || !c.consensus.IsTrustedPeer(ctx, p) {
continue
}
trustedPeers = append(trustedPeers, p)
}
return trustedPeers, nil
}
func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h cid.Cid) (*api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid")
defer span.End()

View File

@ -300,6 +300,21 @@ func TestClusterPin(t *testing.T) {
}
}
func TestPinExpired(t *testing.T) {
ctx := context.Background()
cl, _, _, _ := testingCluster(t)
defer cleanState()
defer cl.Shutdown(ctx)
c := test.Cid1
_, err := cl.Pin(ctx, c, api.PinOptions{
ExpireAt: time.Now(),
})
if err == nil {
t.Fatal("pin should have errored")
}
}
func TestClusterPinPath(t *testing.T) {
ctx := context.Background()
cl, _, _, _ := testingCluster(t)

View File

@ -359,6 +359,10 @@ content.
Value: defaultAddParams.ReplicationFactorMax,
Usage: "Sets the maximum replication factor for pinning this file",
},
cli.StringFlag{
Name: "expire-in",
Usage: "Duration after which the pin should be unpinned automatically",
},
cli.StringSliceFlag{
Name: "metadata",
Usage: "Pin metadata: key=value. Can be added multiple times",
@ -414,6 +418,12 @@ content.
p := api.DefaultAddParams()
p.ReplicationFactorMin = c.Int("replication-min")
p.ReplicationFactorMax = c.Int("replication-max")
if expireIn := c.String("expire-in"); expireIn != "" {
d, err := time.ParseDuration(expireIn)
checkErr("parsing expire-in", err)
p.ExpireAt = time.Now().Add(d)
}
p.Metadata = parseMetadata(c.StringSlice("metadata"))
p.Name = name
if c.String("allocations") != "" {
@ -540,6 +550,10 @@ would stil be respected.
Value: "",
Usage: "Sets a name for this pin",
},
cli.StringFlag{
Name: "expire-in",
Usage: "Duration after which pin should be unpinned automatically",
},
cli.StringSliceFlag{
Name: "metadata",
Usage: "Pin metadata: key=value. Can be added multiple times",
@ -579,12 +593,19 @@ would stil be respected.
checkErr("decoding allocations", errors.New("some peer IDs could not be decoded"))
}
}
var expireAt time.Time
if expireIn := c.String("expire-in"); expireIn != "" {
d, err := time.ParseDuration(expireIn)
checkErr("parsing expire-in", err)
expireAt = time.Now().Add(d)
}
opts := api.PinOptions{
ReplicationFactorMin: rplMin,
ReplicationFactorMax: rplMax,
Name: c.String("name"),
UserAllocations: userAllocs,
ExpireAt: expireAt,
Metadata: parseMetadata(c.StringSlice("metadata")),
}

1
go.mod
View File

@ -67,6 +67,7 @@ require (
github.com/urfave/cli v1.22.1
github.com/zenground0/go-dot v0.0.0-20180912213407-94a425d4984e
go.opencensus.io v0.22.1
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392
gonum.org/v1/gonum v0.0.0-20190926113837-94b2bbd8ac13
gonum.org/v1/plot v0.0.0-20190615073203-9aa86143727f
)

View File

@ -2161,3 +2161,57 @@ func TestClustersFollowerMode(t *testing.T) {
}
})
}
func TestClusterPinsWithExpiration(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
ttlDelay()
cl := clusters[rand.Intn(nClusters)] // choose a random cluster peer to query
c := test.Cid1
expireIn := 1 * time.Second
opts := api.PinOptions{
ExpireAt: time.Now().Add(expireIn),
}
_, err := cl.Pin(ctx, c, opts)
if err != nil {
t.Fatal("pin should have worked:", err)
}
pinDelay()
pins, err := cl.Pins(ctx)
if err != nil {
t.Fatal(err)
}
if len(pins) != 1 {
t.Error("pin should be part of the state")
}
// wait till expiry time
time.Sleep(expireIn)
// manually call state sync on all peers, so we don't have to wait till
// state sync interval
for _, c := range clusters {
err = c.StateSync(ctx)
if err != nil {
t.Error(err)
}
}
pinDelay()
// state sync should have unpinned expired pin
pins, err = cl.Pins(ctx)
if err != nil {
t.Fatal(err)
}
if len(pins) != 0 {
t.Error("pin should not be part of the state")
}
}

View File

@ -15,7 +15,6 @@ import (
"time"
logging "github.com/ipfs/go-log"
utils "github.com/ipfs/ipfs-cluster/utils"
host "github.com/libp2p/go-libp2p-core/host"
net "github.com/libp2p/go-libp2p-core/network"
peer "github.com/libp2p/go-libp2p-core/peer"
@ -143,7 +142,7 @@ func (pm *Manager) filteredPeerAddrs(p peer.ID) []ma.Multiaddr {
return peerDNSAddrs
}
sort.Sort(utils.ByString(peerAddrs))
sort.Sort(byString(peerAddrs))
return peerAddrs
}
@ -397,3 +396,10 @@ func (ps *peerSort) Swap(i, j int) {
ps.pinfos[i] = pinfo2
ps.pinfos[j] = pinfo1
}
// byString can sort multiaddresses by its string
type byString []ma.Multiaddr
func (m byString) Len() int { return len(m) }
func (m byString) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m byString) Less(i, j int) bool { return m[i].String() < m[j].String() }

View File

@ -2,7 +2,7 @@
// IPFS Cluster must satisfy.
package state
// State represents the shared state of the cluster and it
// State represents the shared state of the cluster
import (
"context"
"errors"
@ -17,7 +17,7 @@ import (
var ErrNotFound = errors.New("pin is not part of the pinset")
// State is a wrapper to the Cluster shared state so that Pin objects can
// easily read, written and queried. The state can be marshaled and
// be easily read, written and queried. The state can be marshaled and
// unmarshaled. Implementation should be thread-safe.
type State interface {
ReadOnly
@ -29,10 +29,9 @@ type State interface {
Marshal(io.Writer) error
// Unmarshal deserializes the state from marshaled bytes.
Unmarshal(io.Reader) error
// Commit writes any batched operations.
}
// ReadOnly represents a the read side of a State.
// ReadOnly represents the read side of a State.
type ReadOnly interface {
// List lists all the pins in the state.
List(context.Context) ([]*api.Pin, error)

53
util.go
View File

@ -1,9 +1,12 @@
package ipfscluster
import (
"bytes"
"errors"
"fmt"
blake2b "golang.org/x/crypto/blake2b"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
@ -93,3 +96,53 @@ func minInt(x, y int) int {
// pin.Parents.Add(c)
// }
// }
type distance [blake2b.Size256]byte
type distanceChecker struct {
local peer.ID
otherPeers []peer.ID
cache map[peer.ID]distance
}
func (dc distanceChecker) isClosest(ci cid.Cid) bool {
ciHash := convertKey(ci.KeyString())
localPeerHash := dc.convertPeerID(dc.local)
myDistance := xor(ciHash, localPeerHash)
for _, p := range dc.otherPeers {
peerHash := dc.convertPeerID(p)
distance := xor(peerHash, ciHash)
// if myDistance is larger than for other peers...
if bytes.Compare(myDistance[:], distance[:]) > 0 {
return false
}
}
return true
}
// convertPeerID hashes a Peer ID (Multihash).
func (dc distanceChecker) convertPeerID(id peer.ID) distance {
hash, ok := dc.cache[id]
if ok {
return hash
}
hashBytes := convertKey(string(id))
dc.cache[id] = hashBytes
return hashBytes
}
// convertKey hashes a key.
func convertKey(id string) distance {
return blake2b.Sum256([]byte(id))
}
func xor(a, b distance) distance {
var c distance
for i := 0; i < len(c); i++ {
c[i] = a[i] ^ b[i]
}
return c
}

View File

@ -1,12 +0,0 @@
package utils
import (
ma "github.com/multiformats/go-multiaddr"
)
// ByString can sort multiaddresses by its string
type ByString []ma.Multiaddr
func (m ByString) Len() int { return len(m) }
func (m ByString) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m ByString) Less(i, j int) bool { return m[i].String() < m[j].String() }