diff --git a/api/add.go b/api/add.go index 26e361e5..298d1f56 100644 --- a/api/add.go +++ b/api/add.go @@ -172,9 +172,15 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) { } // ToQueryString returns a url query string (key=value&key2=value2&...) -func (p *AddParams) ToQueryString() string { - pinOptsQuery := p.PinOptions.ToQuery() - query, _ := url.ParseQuery(pinOptsQuery) +func (p *AddParams) ToQueryString() (string, error) { + pinOptsQuery, err := p.PinOptions.ToQuery() + if err != nil { + return "", err + } + query, err := url.ParseQuery(pinOptsQuery) + if err != nil { + return "", err + } query.Set("shard", fmt.Sprintf("%t", p.Shard)) query.Set("local", fmt.Sprintf("%t", p.Local)) query.Set("recursive", fmt.Sprintf("%t", p.Recursive)) @@ -188,7 +194,7 @@ func (p *AddParams) ToQueryString() string { query.Set("hash", p.HashFun) query.Set("stream-channels", fmt.Sprintf("%t", p.StreamChannels)) query.Set("nocopy", fmt.Sprintf("%t", p.NoCopy)) - return query.Encode() + return query.Encode(), nil } // Equals checks if p equals p2. diff --git a/api/add_test.go b/api/add_test.go index e8d38ee8..d302d787 100644 --- a/api/add_test.go +++ b/api/add_test.go @@ -35,11 +35,13 @@ func TestAddParams_ToQueryString(t *testing.T) { p.Name = "something" p.RawLeaves = true p.ShardSize = 1020 - qstr := p.ToQueryString() - + qstr, err := p.ToQueryString() + if err != nil { + t.Fatal(err) + } q, err := url.ParseQuery(qstr) if err != nil { - t.Fatal() + t.Fatal(err) } p2, err := AddParamsFromQuery(q) diff --git a/api/pb/types.pb.go b/api/pb/types.pb.go index 432d8654..a76a1544 100644 --- a/api/pb/types.pb.go +++ b/api/pb/types.pb.go @@ -140,6 +140,7 @@ type PinOptions struct { ShardSize uint64 `protobuf:"varint,4,opt,name=ShardSize,proto3" json:"ShardSize,omitempty"` Metadata map[string]string `protobuf:"bytes,6,rep,name=Metadata,proto3" json:"Metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` PinUpdate []byte `protobuf:"bytes,7,opt,name=PinUpdate,proto3" json:"PinUpdate,omitempty"` + ExpireAt uint64 `protobuf:"varint,8,opt,name=ExpireAt,proto3" json:"ExpireAt,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -212,6 +213,13 @@ func (m *PinOptions) GetPinUpdate() []byte { return nil } +func (m *PinOptions) GetExpireAt() uint64 { + if m != nil { + return m.ExpireAt + } + return 0 +} + func init() { proto.RegisterEnum("api.pb.Pin_PinType", Pin_PinType_name, Pin_PinType_value) proto.RegisterType((*Pin)(nil), "api.pb.Pin") @@ -222,30 +230,31 @@ func init() { func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) } var fileDescriptor_d938547f84707355 = []byte{ - // 391 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0xcd, 0x8e, 0xd3, 0x30, - 0x10, 0xc6, 0x49, 0x9a, 0x34, 0x93, 0xee, 0xaa, 0x3b, 0xec, 0xc1, 0x5a, 0x71, 0xb0, 0x7a, 0x21, - 0x07, 0x94, 0x43, 0xb8, 0x20, 0xe0, 0xb2, 0x6c, 0x01, 0x09, 0xa9, 0x50, 0x79, 0xe9, 0x03, 0xb8, - 0x8d, 0x51, 0x2d, 0x42, 0x62, 0xa5, 0x2e, 0x6a, 0x78, 0x1b, 0x1e, 0x86, 0xf7, 0x42, 0xb6, 0xfb, - 0x87, 0xe8, 0x21, 0xd2, 0x7c, 0xdf, 0xcc, 0xe7, 0x99, 0xf9, 0x32, 0x90, 0x99, 0x5e, 0xcb, 0x4d, - 0xa1, 0xbb, 0xd6, 0xb4, 0x18, 0x0b, 0xad, 0x0a, 0xbd, 0x9c, 0xfc, 0x0e, 0x20, 0x9c, 0xab, 0x06, - 0xc7, 0x10, 0x3e, 0xa8, 0x8a, 0x12, 0x46, 0xf2, 0x11, 0xb7, 0x21, 0x3e, 0x87, 0xe8, 0x6b, 0xaf, - 0x25, 0x0d, 0x18, 0xc9, 0xaf, 0xcb, 0xa7, 0x85, 0x17, 0x14, 0x73, 0xd5, 0xd8, 0xcf, 0xa6, 0xb8, - 0x2b, 0x40, 0x06, 0xd9, 0x7d, 0x5d, 0xb7, 0x2b, 0x61, 0x54, 0xdb, 0x6c, 0x68, 0xc8, 0xc2, 0x7c, - 0xc4, 0xcf, 0x29, 0xbc, 0x83, 0xe1, 0x4c, 0xec, 0xa6, 0x52, 0x9b, 0x35, 0x8d, 0x18, 0xc9, 0x6f, - 0xf8, 0x11, 0xe3, 0x33, 0x48, 0xb9, 0xfc, 0x26, 0x3b, 0xd9, 0xac, 0x24, 0x1d, 0xb8, 0xf6, 0x27, - 0x02, 0x5f, 0x40, 0xf2, 0x45, 0xfb, 0x77, 0x63, 0x46, 0xf2, 0xac, 0xc4, 0xb3, 0x39, 0xf6, 0x19, - 0x7e, 0x28, 0x99, 0x2c, 0x20, 0xd9, 0x8f, 0x86, 0x19, 0x24, 0xef, 0x44, 0x65, 0xc3, 0xf1, 0x13, - 0x1c, 0xc1, 0x70, 0x2a, 0x8c, 0x70, 0x88, 0x58, 0x34, 0x93, 0x7b, 0x14, 0x20, 0xc2, 0xf5, 0x43, - 0xbd, 0xdd, 0x18, 0xd9, 0x4d, 0xef, 0x3f, 0x3a, 0x2e, 0xc4, 0x2b, 0x48, 0x1f, 0xd7, 0xa2, 0xf3, - 0xf2, 0x68, 0xf2, 0x27, 0x00, 0x38, 0xb5, 0xc3, 0x12, 0x6e, 0xb9, 0xd4, 0xb5, 0xf2, 0xdb, 0x7d, - 0x10, 0x2b, 0xd3, 0x76, 0x33, 0xd5, 0x38, 0xef, 0x6e, 0xf8, 0xc5, 0xdc, 0x65, 0x8d, 0xd8, 0x39, - 0x73, 0x2f, 0x6a, 0xc4, 0x0e, 0x11, 0xa2, 0xcf, 0xe2, 0x87, 0xa4, 0x21, 0x23, 0x79, 0xca, 0x5d, - 0x6c, 0xdd, 0x72, 0x93, 0x3d, 0xaa, 0x5f, 0xd2, 0x59, 0x19, 0xf1, 0x13, 0x81, 0x6f, 0xfd, 0x66, - 0x95, 0x30, 0x82, 0xc6, 0x2c, 0xcc, 0xb3, 0x92, 0xfd, 0x6f, 0x57, 0x71, 0x28, 0x79, 0xdf, 0x98, - 0xae, 0xe7, 0x47, 0x85, 0x7d, 0x7b, 0xae, 0x9a, 0x85, 0xae, 0x84, 0x91, 0x34, 0xf1, 0x7f, 0xe2, - 0x48, 0xdc, 0xbd, 0x81, 0xab, 0x7f, 0x84, 0xf6, 0x62, 0xbe, 0xcb, 0xde, 0x6d, 0x9d, 0x72, 0x1b, - 0xe2, 0x2d, 0x0c, 0x7e, 0x8a, 0x7a, 0xeb, 0x4f, 0x26, 0xe5, 0x1e, 0xbc, 0x0e, 0x5e, 0x91, 0x4f, - 0xd1, 0x70, 0x30, 0x8e, 0x97, 0xb1, 0x3b, 0xbd, 0x97, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xfe, - 0x90, 0x29, 0x9a, 0x89, 0x02, 0x00, 0x00, + // 405 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0xc1, 0x6e, 0xd3, 0x40, + 0x10, 0x65, 0x6d, 0xc7, 0xb1, 0xc7, 0x69, 0x95, 0x0e, 0x3d, 0xac, 0x2a, 0x0e, 0xab, 0x5c, 0xf0, + 0x01, 0xf9, 0x60, 0x2e, 0x08, 0xb8, 0x84, 0xa6, 0x20, 0x21, 0x05, 0xa2, 0x2d, 0xfd, 0x80, 0x6d, + 0xbc, 0xa8, 0x2b, 0x8c, 0xbd, 0x72, 0xb6, 0xc8, 0xe6, 0x6f, 0xf8, 0x34, 0xfe, 0x04, 0xed, 0x6e, + 0xea, 0x14, 0x35, 0x07, 0x4b, 0xf3, 0xde, 0xcc, 0x9b, 0x19, 0xbf, 0x1d, 0xc8, 0xcc, 0xa0, 0xe5, + 0xae, 0xd0, 0x5d, 0x6b, 0x5a, 0x8c, 0x85, 0x56, 0x85, 0xbe, 0x5d, 0xfc, 0x09, 0x20, 0xdc, 0xa8, + 0x06, 0xe7, 0x10, 0x5e, 0xaa, 0x8a, 0x12, 0x46, 0xf2, 0x19, 0xb7, 0x21, 0xbe, 0x84, 0xe8, 0xdb, + 0xa0, 0x25, 0x0d, 0x18, 0xc9, 0x4f, 0xcb, 0xe7, 0x85, 0x17, 0x14, 0x1b, 0xd5, 0xd8, 0xcf, 0xa6, + 0xb8, 0x2b, 0x40, 0x06, 0xd9, 0xb2, 0xae, 0xdb, 0xad, 0x30, 0xaa, 0x6d, 0x76, 0x34, 0x64, 0x61, + 0x3e, 0xe3, 0x8f, 0x29, 0xbc, 0x80, 0x64, 0x2d, 0xfa, 0x95, 0xd4, 0xe6, 0x8e, 0x46, 0x8c, 0xe4, + 0x67, 0x7c, 0xc4, 0xf8, 0x02, 0x52, 0x2e, 0xbf, 0xcb, 0x4e, 0x36, 0x5b, 0x49, 0x27, 0x6e, 0xfc, + 0x81, 0xc0, 0x57, 0x30, 0xfd, 0xaa, 0x7d, 0xdf, 0x98, 0x91, 0x3c, 0x2b, 0xf1, 0xd1, 0x1e, 0xfb, + 0x0c, 0x7f, 0x28, 0x59, 0xdc, 0xc0, 0x74, 0xbf, 0x1a, 0x66, 0x30, 0xfd, 0x20, 0x2a, 0x1b, 0xce, + 0x9f, 0xe1, 0x0c, 0x92, 0x95, 0x30, 0xc2, 0x21, 0x62, 0xd1, 0x5a, 0xee, 0x51, 0x80, 0x08, 0xa7, + 0x97, 0xf5, 0xfd, 0xce, 0xc8, 0x6e, 0xb5, 0xfc, 0xe4, 0xb8, 0x10, 0x4f, 0x20, 0xbd, 0xbe, 0x13, + 0x9d, 0x97, 0x47, 0x8b, 0xbf, 0x01, 0xc0, 0x61, 0x1c, 0x96, 0x70, 0xce, 0xa5, 0xae, 0x95, 0xff, + 0xbb, 0x8f, 0x62, 0x6b, 0xda, 0x6e, 0xad, 0x1a, 0xe7, 0xdd, 0x19, 0x3f, 0x9a, 0x3b, 0xae, 0x11, + 0xbd, 0x33, 0xf7, 0xa8, 0x46, 0xf4, 0x88, 0x10, 0x7d, 0x11, 0x3f, 0x25, 0x0d, 0x19, 0xc9, 0x53, + 0xee, 0x62, 0xeb, 0x96, 0xdb, 0xec, 0x5a, 0xfd, 0x96, 0xce, 0xca, 0x88, 0x1f, 0x08, 0x7c, 0xef, + 0xff, 0xac, 0x12, 0x46, 0xd0, 0x98, 0x85, 0x79, 0x56, 0xb2, 0xa7, 0x76, 0x15, 0x0f, 0x25, 0x57, + 0x8d, 0xe9, 0x06, 0x3e, 0x2a, 0x6c, 0xef, 0x8d, 0x6a, 0x6e, 0x74, 0x25, 0x8c, 0xa4, 0x53, 0xff, + 0x12, 0x23, 0x61, 0xdf, 0xf0, 0xaa, 0xd7, 0xaa, 0x93, 0x4b, 0x43, 0x13, 0x37, 0x78, 0xc4, 0x17, + 0xef, 0xe0, 0xe4, 0xbf, 0xa6, 0xf6, 0x9a, 0x7e, 0xc8, 0xc1, 0x39, 0x92, 0x72, 0x1b, 0xe2, 0x39, + 0x4c, 0x7e, 0x89, 0xfa, 0xde, 0x9f, 0x53, 0xca, 0x3d, 0x78, 0x1b, 0xbc, 0x21, 0x9f, 0xa3, 0x64, + 0x32, 0x8f, 0x6f, 0x63, 0x77, 0x96, 0xaf, 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, 0xaf, 0x59, 0x74, + 0x86, 0xa5, 0x02, 0x00, 0x00, } diff --git a/api/pb/types.proto b/api/pb/types.proto index 70ad76fe..bc73b5d6 100644 --- a/api/pb/types.proto +++ b/api/pb/types.proto @@ -25,4 +25,5 @@ message PinOptions { reserved 5; // reserved for UserAllocations map Metadata = 6; bytes PinUpdate = 7; + uint64 ExpireAt = 8; } \ No newline at end of file diff --git a/api/rest/client/methods.go b/api/rest/client/methods.go index a435866d..59d18c51 100644 --- a/api/rest/client/methods.go +++ b/api/rest/client/methods.go @@ -78,14 +78,18 @@ func (c *defaultClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions ctx, span := trace.StartSpan(ctx, "client/Pin") defer span.End() + query, err := opts.ToQuery() + if err != nil { + return nil, err + } var pin api.Pin - err := c.do( + err = c.do( ctx, "POST", fmt.Sprintf( "/pins/%s?%s", ci.String(), - opts.ToQuery(), + query, ), nil, nil, @@ -119,14 +123,17 @@ func (c *defaultClient) PinPath(ctx context.Context, path string, opts api.PinOp if err != nil { return nil, err } - + query, err := opts.ToQuery() + if err != nil { + return nil, err + } err = c.do( ctx, "POST", fmt.Sprintf( "/pins%s?%s", ipfspath.String(), - opts.ToQuery(), + query, ), nil, nil, @@ -572,7 +579,10 @@ func (c *defaultClient) AddMultiFile( // This method must run with StreamChannels set. params.StreamChannels = true - queryStr := params.ToQueryString() + queryStr, err := params.ToQueryString() + if err != nil { + return err + } // our handler decodes an AddedOutput and puts it // in the out channel. @@ -589,7 +599,7 @@ func (c *defaultClient) AddMultiFile( return nil } - err := c.doStream(ctx, + err = c.doStream(ctx, "POST", "/add?"+queryStr, headers, diff --git a/api/rest/restapi_test.go b/api/rest/restapi_test.go index 745057af..605f4615 100644 --- a/api/rest/restapi_test.go +++ b/api/rest/restapi_test.go @@ -601,8 +601,12 @@ type pathCase struct { expectedCid string } -func (p *pathCase) WithQuery() string { - return p.path + "?" + p.opts.ToQuery() +func (p *pathCase) WithQuery(t *testing.T) string { + query, err := p.opts.ToQuery() + if err != nil { + t.Fatal(err) + } + return p.path + "?" + query } var testPinOpts = api.PinOptions{ @@ -610,6 +614,7 @@ var testPinOpts = api.PinOptions{ ReplicationFactorMin: 6, Name: "hello there", UserAllocations: []peer.ID{test.PeerID1, test.PeerID2}, + ExpireAt: time.Now().Add(30 * time.Second), } var pathTestCases = []pathCase{ @@ -660,7 +665,8 @@ func TestAPIPinEndpointWithPath(t *testing.T) { if testCase.wantErr { errResp := api.Error{} - makePost(t, rest, url(rest)+"/pins"+testCase.WithQuery(), []byte{}, &errResp) + q := testCase.WithQuery(t) + makePost(t, rest, url(rest)+"/pins"+q, []byte{}, &errResp) if errResp.Code != testCase.code { t.Errorf( "status code: expected: %d, got: %d, path: %s\n", @@ -672,7 +678,8 @@ func TestAPIPinEndpointWithPath(t *testing.T) { continue } pin := api.Pin{} - makePost(t, rest, url(rest)+"/pins"+testCase.WithQuery(), []byte{}, &pin) + q := testCase.WithQuery(t) + makePost(t, rest, url(rest)+"/pins"+q, []byte{}, &pin) if !pin.Equals(resultantPin) { t.Errorf("pin: expected: %+v", resultantPin) t.Errorf("pin: got: %+v", pin) diff --git a/api/types.go b/api/types.go index 303867f6..8ad7f383 100644 --- a/api/types.go +++ b/api/types.go @@ -10,7 +10,6 @@ package api import ( "encoding/json" - "errors" "fmt" "net/url" "sort" @@ -32,10 +31,13 @@ import ( _ "github.com/multiformats/go-multiaddr-dns" proto "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" ) var logger = logging.Logger("apitypes") +var unixZero = time.Unix(0, 0) + func init() { // Use /p2p/ multiaddresses multiaddr.SwapToP2pMultiaddrs() @@ -464,6 +466,7 @@ type PinOptions struct { Name string `json:"name" codec:"n,omitempty"` ShardSize uint64 `json:"shard_size" codec:"s,omitempty"` UserAllocations []peer.ID `json:"user_allocations" codec:"ua,omitempty"` + ExpireAt time.Time `json:"expire_at" codec:"e,omitempty"` Metadata map[string]string `json:"metadata" codec:"m,omitempty"` PinUpdate cid.Cid `json:"pin_update,omitempty" codec:"pu,omitempty"` } @@ -510,6 +513,10 @@ func (po *PinOptions) Equals(po2 *PinOptions) bool { return false } + if !po.ExpireAt.Equal(po2.ExpireAt) { + return false + } + for k, v := range po.Metadata { v2 := po2.Metadata[k] if k != "" && v != v2 { @@ -523,13 +530,20 @@ func (po *PinOptions) Equals(po2 *PinOptions) bool { } // ToQuery returns the PinOption as query arguments. -func (po *PinOptions) ToQuery() string { +func (po *PinOptions) ToQuery() (string, error) { q := url.Values{} q.Set("replication-min", fmt.Sprintf("%d", po.ReplicationFactorMin)) q.Set("replication-max", fmt.Sprintf("%d", po.ReplicationFactorMax)) q.Set("name", po.Name) q.Set("shard-size", fmt.Sprintf("%d", po.ShardSize)) q.Set("user-allocations", strings.Join(PeersToStrings(po.UserAllocations), ",")) + if !po.ExpireAt.IsZero() { + v, err := po.ExpireAt.MarshalText() + if err != nil { + return "", err + } + q.Set("expire-at", string(v)) + } for k, v := range po.Metadata { if k == "" { continue @@ -539,7 +553,7 @@ func (po *PinOptions) ToQuery() string { if po.PinUpdate != cid.Undef { q.Set("pin-update", po.PinUpdate.String()) } - return q.Encode() + return q.Encode(), nil } // FromQuery is the inverse of ToQuery(). @@ -573,6 +587,24 @@ func (po *PinOptions) FromQuery(q url.Values) error { po.UserAllocations = StringsToPeers(strings.Split(allocs, ",")) } + if v := q.Get("expire-at"); v != "" { + var tm time.Time + err := tm.UnmarshalText([]byte(v)) + if err != nil { + return errors.Wrap(err, "expire-at cannot be parsed") + } + po.ExpireAt = tm + } else if v = q.Get("expire-in"); v != "" { + d, err := time.ParseDuration(v) + if err != nil { + return errors.Wrap(err, "expire-in cannot be parsed") + } + if d < time.Second { + return errors.New("expire-in duration too short") + } + po.ExpireAt = time.Now().Add(d) + } + po.Metadata = make(map[string]string) for k := range q { if !strings.HasPrefix(k, pinOptionsMetaPrefix) { @@ -683,6 +715,12 @@ func (pin *Pin) ProtoMarshal() ([]byte, error) { allocs[i] = bs } + var expireAtProto uint64 + // Only set the protobuf field with non-zero times. + if !(pin.ExpireAt.IsZero() || pin.ExpireAt.Equal(unixZero)) { + expireAtProto = uint64(pin.ExpireAt.Unix()) + } + opts := &pb.PinOptions{ ReplicationFactorMin: int32(pin.ReplicationFactorMin), ReplicationFactorMax: int32(pin.ReplicationFactorMax), @@ -691,6 +729,7 @@ func (pin *Pin) ProtoMarshal() ([]byte, error) { // UserAllocations: pin.UserAllocations, Metadata: pin.Metadata, PinUpdate: pin.PinUpdate.Bytes(), + ExpireAt: expireAtProto, } pbPin := &pb.Pin{ @@ -749,8 +788,11 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error { pin.Name = opts.GetName() pin.ShardSize = opts.GetShardSize() // pin.UserAllocations = opts.GetUserAllocations() + t := opts.GetExpireAt() + if t > 0 { + pin.ExpireAt = time.Unix(int64(t), 0) + } pin.Metadata = opts.GetMetadata() - pinUpdate, err := cid.Cast(opts.GetPinUpdate()) if err == nil { pin.PinUpdate = pinUpdate @@ -823,6 +865,15 @@ func (pin *Pin) IsRemotePin(pid peer.ID) bool { return true } +// ExpiredAt returns whether the pin has expired at the given time. +func (pin *Pin) ExpiredAt(t time.Time) bool { + if pin.ExpireAt.IsZero() || pin.ExpireAt.Equal(unixZero) { + return false + } + + return pin.ExpireAt.Before(t) +} + // NodeWithMeta specifies a block of data and a set of optional metadata fields // carrying information about the encoded ipld node type NodeWithMeta struct { diff --git a/api/types_test.go b/api/types_test.go index 184fe386..fad8c8c9 100644 --- a/api/types_test.go +++ b/api/types_test.go @@ -184,6 +184,7 @@ func TestPinOptionsQuery(t *testing.T) { "QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc", "QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6", }), + ExpireAt: time.Now().Add(12 * time.Hour), Metadata: map[string]string{ "hello": "bye", "hello2": "bye2", @@ -210,7 +211,10 @@ func TestPinOptionsQuery(t *testing.T) { } for _, tc := range testcases { - queryStr := tc.ToQuery() + queryStr, err := tc.ToQuery() + if err != nil { + t.Fatal("error converting to query", err) + } q, err := url.ParseQuery(queryStr) if err != nil { t.Error("error parsing query", err) diff --git a/cluster.go b/cluster.go index 82cbd7fd..fc5a7c14 100644 --- a/cluster.go +++ b/cluster.go @@ -964,6 +964,7 @@ func (c *Cluster) StateSync(ctx context.Context) error { } logger.Debug("syncing state to tracker") + timeNow := time.Now() clusterPins, err := cState.List(ctx) if err != nil { return err @@ -987,9 +988,29 @@ func (c *Cluster) StateSync(ctx context.Context) error { } } + isClosest := func(cid.Cid) bool { + return false + } + + if !c.config.FollowerMode { + trustedPeers, err := c.getTrustedPeers(ctx) + if err != nil { + return nil + } + checker := distanceChecker{ + local: c.id, + otherPeers: trustedPeers, + cache: make(map[peer.ID]distance, len(trustedPeers)+1), + } + isClosest = func(c cid.Cid) bool { + return checker.isClosest(c) + } + } + // a. Untrack items which should not be tracked - // b. Track items which should not be remote as local - // c. Track items which should not be local as remote + // b. Unpin items which have expired + // c. Track items which should not be remote as local + // d. Track items which should not be local as remote for _, p := range trackedPins { pCid := p.Cid currentPin, err := cState.Get(ctx, pCid) @@ -1003,16 +1024,15 @@ func (c *Cluster) StateSync(ctx context.Context) error { continue } - allocatedHere := containsPeer(currentPin.Allocations, c.id) || currentPin.ReplicationFactorMin == -1 - - switch { - case p.Status == api.TrackerStatusRemote && allocatedHere: - logger.Debugf("StateSync: Tracking %s locally (currently remote)", pCid) - err = c.tracker.Track(ctx, currentPin) - case p.Status == api.TrackerStatusPinned && !allocatedHere: - logger.Debugf("StateSync: Tracking %s as remote (currently local)", pCid) - err = c.tracker.Track(ctx, currentPin) + if currentPin.ExpiredAt(timeNow) && isClosest(pCid) { + logger.Infof("Unpinning %s: pin expired at %s", pCid, currentPin.ExpireAt) + if _, err := c.Unpin(ctx, pCid); err != nil { + logger.Error(err) + } + continue } + + err = c.updateRemotePins(ctx, currentPin, p) if err != nil { return err } @@ -1021,6 +1041,22 @@ func (c *Cluster) StateSync(ctx context.Context) error { return nil } +func (c *Cluster) updateRemotePins(ctx context.Context, pin *api.Pin, p *api.PinInfo) error { + var err error + allocatedHere := pin.ReplicationFactorMin == -1 || containsPeer(pin.Allocations, c.id) + + switch { + case p.Status == api.TrackerStatusRemote && allocatedHere: + logger.Debugf("StateSync: Tracking %s locally (currently remote)", p.Cid) + err = c.tracker.Track(ctx, pin) + case p.Status == api.TrackerStatusPinned && !allocatedHere: + logger.Debugf("StateSync: Tracking %s as remote (currently local)", p.Cid) + err = c.tracker.Track(ctx, pin) + } + + return err +} + // StatusAll returns the GlobalPinInfo for all tracked Cids in all peers. // If an error happens, the slice will contain as much information as // could be fetched from other peers. @@ -1330,6 +1366,10 @@ func (c *Cluster) setupPin(ctx context.Context, pin *api.Pin) error { return err } + if !pin.ExpireAt.IsZero() && pin.ExpireAt.Before(time.Now()) { + return errors.New("pin.ExpireAt set before current time") + } + existing, err := c.PinGet(ctx, pin.Cid) if err != nil && err != state.ErrNotFound { return err @@ -1619,6 +1659,25 @@ func (c *Cluster) Peers(ctx context.Context) []*api.ID { return peers } +// getTrustedPeers gives listed of trusted peers except the current peer. +func (c *Cluster) getTrustedPeers(ctx context.Context) ([]peer.ID, error) { + peers, err := c.consensus.Peers(ctx) + if err != nil { + return nil, err + } + + trustedPeers := make([]peer.ID, 0, len(peers)) + + for _, p := range peers { + if p == c.id || !c.consensus.IsTrustedPeer(ctx, p) { + continue + } + trustedPeers = append(trustedPeers, p) + } + + return trustedPeers, nil +} + func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h cid.Cid) (*api.GlobalPinInfo, error) { ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid") defer span.End() diff --git a/cluster_test.go b/cluster_test.go index 562317cd..78504e06 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -300,6 +300,21 @@ func TestClusterPin(t *testing.T) { } } +func TestPinExpired(t *testing.T) { + ctx := context.Background() + cl, _, _, _ := testingCluster(t) + defer cleanState() + defer cl.Shutdown(ctx) + + c := test.Cid1 + _, err := cl.Pin(ctx, c, api.PinOptions{ + ExpireAt: time.Now(), + }) + if err == nil { + t.Fatal("pin should have errored") + } +} + func TestClusterPinPath(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) diff --git a/cmd/ipfs-cluster-ctl/main.go b/cmd/ipfs-cluster-ctl/main.go index 3b420740..f670960d 100644 --- a/cmd/ipfs-cluster-ctl/main.go +++ b/cmd/ipfs-cluster-ctl/main.go @@ -359,6 +359,10 @@ content. Value: defaultAddParams.ReplicationFactorMax, Usage: "Sets the maximum replication factor for pinning this file", }, + cli.StringFlag{ + Name: "expire-in", + Usage: "Duration after which the pin should be unpinned automatically", + }, cli.StringSliceFlag{ Name: "metadata", Usage: "Pin metadata: key=value. Can be added multiple times", @@ -414,6 +418,12 @@ content. p := api.DefaultAddParams() p.ReplicationFactorMin = c.Int("replication-min") p.ReplicationFactorMax = c.Int("replication-max") + if expireIn := c.String("expire-in"); expireIn != "" { + d, err := time.ParseDuration(expireIn) + checkErr("parsing expire-in", err) + p.ExpireAt = time.Now().Add(d) + } + p.Metadata = parseMetadata(c.StringSlice("metadata")) p.Name = name if c.String("allocations") != "" { @@ -540,6 +550,10 @@ would stil be respected. Value: "", Usage: "Sets a name for this pin", }, + cli.StringFlag{ + Name: "expire-in", + Usage: "Duration after which pin should be unpinned automatically", + }, cli.StringSliceFlag{ Name: "metadata", Usage: "Pin metadata: key=value. Can be added multiple times", @@ -579,12 +593,19 @@ would stil be respected. checkErr("decoding allocations", errors.New("some peer IDs could not be decoded")) } } + var expireAt time.Time + if expireIn := c.String("expire-in"); expireIn != "" { + d, err := time.ParseDuration(expireIn) + checkErr("parsing expire-in", err) + expireAt = time.Now().Add(d) + } opts := api.PinOptions{ ReplicationFactorMin: rplMin, ReplicationFactorMax: rplMax, Name: c.String("name"), UserAllocations: userAllocs, + ExpireAt: expireAt, Metadata: parseMetadata(c.StringSlice("metadata")), } diff --git a/go.mod b/go.mod index 4693f89a..3218636d 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/urfave/cli v1.22.1 github.com/zenground0/go-dot v0.0.0-20180912213407-94a425d4984e go.opencensus.io v0.22.1 + golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 gonum.org/v1/gonum v0.0.0-20190926113837-94b2bbd8ac13 gonum.org/v1/plot v0.0.0-20190615073203-9aa86143727f ) diff --git a/ipfscluster_test.go b/ipfscluster_test.go index b42fe5a5..26b550b2 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -2161,3 +2161,57 @@ func TestClustersFollowerMode(t *testing.T) { } }) } + +func TestClusterPinsWithExpiration(t *testing.T) { + ctx := context.Background() + + clusters, mock := createClusters(t) + defer shutdownClusters(t, clusters, mock) + + ttlDelay() + + cl := clusters[rand.Intn(nClusters)] // choose a random cluster peer to query + + c := test.Cid1 + expireIn := 1 * time.Second + opts := api.PinOptions{ + ExpireAt: time.Now().Add(expireIn), + } + _, err := cl.Pin(ctx, c, opts) + if err != nil { + t.Fatal("pin should have worked:", err) + } + + pinDelay() + + pins, err := cl.Pins(ctx) + if err != nil { + t.Fatal(err) + } + if len(pins) != 1 { + t.Error("pin should be part of the state") + } + + // wait till expiry time + time.Sleep(expireIn) + + // manually call state sync on all peers, so we don't have to wait till + // state sync interval + for _, c := range clusters { + err = c.StateSync(ctx) + if err != nil { + t.Error(err) + } + } + + pinDelay() + + // state sync should have unpinned expired pin + pins, err = cl.Pins(ctx) + if err != nil { + t.Fatal(err) + } + if len(pins) != 0 { + t.Error("pin should not be part of the state") + } +} diff --git a/pstoremgr/pstoremgr.go b/pstoremgr/pstoremgr.go index e0f77fb3..2d41ea59 100644 --- a/pstoremgr/pstoremgr.go +++ b/pstoremgr/pstoremgr.go @@ -15,7 +15,6 @@ import ( "time" logging "github.com/ipfs/go-log" - utils "github.com/ipfs/ipfs-cluster/utils" host "github.com/libp2p/go-libp2p-core/host" net "github.com/libp2p/go-libp2p-core/network" peer "github.com/libp2p/go-libp2p-core/peer" @@ -143,7 +142,7 @@ func (pm *Manager) filteredPeerAddrs(p peer.ID) []ma.Multiaddr { return peerDNSAddrs } - sort.Sort(utils.ByString(peerAddrs)) + sort.Sort(byString(peerAddrs)) return peerAddrs } @@ -397,3 +396,10 @@ func (ps *peerSort) Swap(i, j int) { ps.pinfos[i] = pinfo2 ps.pinfos[j] = pinfo1 } + +// byString can sort multiaddresses by its string +type byString []ma.Multiaddr + +func (m byString) Len() int { return len(m) } +func (m byString) Swap(i, j int) { m[i], m[j] = m[j], m[i] } +func (m byString) Less(i, j int) bool { return m[i].String() < m[j].String() } diff --git a/state/interface.go b/state/interface.go index 17c56d1c..db8aa1c2 100644 --- a/state/interface.go +++ b/state/interface.go @@ -2,7 +2,7 @@ // IPFS Cluster must satisfy. package state -// State represents the shared state of the cluster and it +// State represents the shared state of the cluster import ( "context" "errors" @@ -17,7 +17,7 @@ import ( var ErrNotFound = errors.New("pin is not part of the pinset") // State is a wrapper to the Cluster shared state so that Pin objects can -// easily read, written and queried. The state can be marshaled and +// be easily read, written and queried. The state can be marshaled and // unmarshaled. Implementation should be thread-safe. type State interface { ReadOnly @@ -29,10 +29,9 @@ type State interface { Marshal(io.Writer) error // Unmarshal deserializes the state from marshaled bytes. Unmarshal(io.Reader) error - // Commit writes any batched operations. } -// ReadOnly represents a the read side of a State. +// ReadOnly represents the read side of a State. type ReadOnly interface { // List lists all the pins in the state. List(context.Context) ([]*api.Pin, error) diff --git a/util.go b/util.go index 389a575d..77d919aa 100644 --- a/util.go +++ b/util.go @@ -1,9 +1,12 @@ package ipfscluster import ( + "bytes" "errors" "fmt" + blake2b "golang.org/x/crypto/blake2b" + cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" @@ -93,3 +96,53 @@ func minInt(x, y int) int { // pin.Parents.Add(c) // } // } + +type distance [blake2b.Size256]byte + +type distanceChecker struct { + local peer.ID + otherPeers []peer.ID + cache map[peer.ID]distance +} + +func (dc distanceChecker) isClosest(ci cid.Cid) bool { + ciHash := convertKey(ci.KeyString()) + localPeerHash := dc.convertPeerID(dc.local) + myDistance := xor(ciHash, localPeerHash) + + for _, p := range dc.otherPeers { + peerHash := dc.convertPeerID(p) + distance := xor(peerHash, ciHash) + + // if myDistance is larger than for other peers... + if bytes.Compare(myDistance[:], distance[:]) > 0 { + return false + } + } + return true +} + +// convertPeerID hashes a Peer ID (Multihash). +func (dc distanceChecker) convertPeerID(id peer.ID) distance { + hash, ok := dc.cache[id] + if ok { + return hash + } + + hashBytes := convertKey(string(id)) + dc.cache[id] = hashBytes + return hashBytes +} + +// convertKey hashes a key. +func convertKey(id string) distance { + return blake2b.Sum256([]byte(id)) +} + +func xor(a, b distance) distance { + var c distance + for i := 0; i < len(c); i++ { + c[i] = a[i] ^ b[i] + } + return c +} diff --git a/utils/utils.go b/utils/utils.go deleted file mode 100644 index d0872402..00000000 --- a/utils/utils.go +++ /dev/null @@ -1,12 +0,0 @@ -package utils - -import ( - ma "github.com/multiformats/go-multiaddr" -) - -// ByString can sort multiaddresses by its string -type ByString []ma.Multiaddr - -func (m ByString) Len() int { return len(m) } -func (m ByString) Swap(i, j int) { m[i], m[j] = m[j], m[i] } -func (m ByString) Less(i, j int) bool { return m[i].String() < m[j].String() }