Merge pull request #875 from ipfs/fix/pin-methods
ipfshttp: Remove "refs" method
This commit is contained in:
commit
f9889e712f
|
@ -98,7 +98,6 @@ var testingProxyCfg = []byte(`{
|
|||
var testingIpfsCfg = []byte(`{
|
||||
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
|
||||
"connect_swarms_delay": "7s",
|
||||
"pin_method": "pin",
|
||||
"pin_timeout": "30s",
|
||||
"unpin_timeout": "15s"
|
||||
}`)
|
||||
|
|
|
@ -20,7 +20,6 @@ const envConfigKey = "cluster_ipfshttp"
|
|||
const (
|
||||
DefaultNodeAddr = "/ip4/127.0.0.1/tcp/5001"
|
||||
DefaultConnectSwarmsDelay = 30 * time.Second
|
||||
DefaultPinMethod = "refs"
|
||||
DefaultIPFSRequestTimeout = 5 * time.Minute
|
||||
DefaultPinTimeout = 24 * time.Hour
|
||||
DefaultUnpinTimeout = 3 * time.Hour
|
||||
|
@ -40,11 +39,6 @@ type Config struct {
|
|||
// IPFS daemons of other peers.
|
||||
ConnectSwarmsDelay time.Duration
|
||||
|
||||
// "pin" or "refs". "pin" uses a "pin/add" call. "refs" uses a
|
||||
// "refs -r" call followed by "pin/add". "refs" allows fetching in
|
||||
// parallel but should be used with GC disabled.
|
||||
PinMethod string
|
||||
|
||||
// IPFS Daemon HTTP Client POST timeout
|
||||
IPFSRequestTimeout time.Duration
|
||||
|
||||
|
@ -64,7 +58,6 @@ type Config struct {
|
|||
type jsonConfig struct {
|
||||
NodeMultiaddress string `json:"node_multiaddress"`
|
||||
ConnectSwarmsDelay string `json:"connect_swarms_delay"`
|
||||
PinMethod string `json:"pin_method"`
|
||||
IPFSRequestTimeout string `json:"ipfs_request_timeout"`
|
||||
PinTimeout string `json:"pin_timeout"`
|
||||
UnpinTimeout string `json:"unpin_timeout"`
|
||||
|
@ -81,7 +74,6 @@ func (cfg *Config) Default() error {
|
|||
node, _ := ma.NewMultiaddr(DefaultNodeAddr)
|
||||
cfg.NodeAddr = node
|
||||
cfg.ConnectSwarmsDelay = DefaultConnectSwarmsDelay
|
||||
cfg.PinMethod = DefaultPinMethod
|
||||
cfg.IPFSRequestTimeout = DefaultIPFSRequestTimeout
|
||||
cfg.PinTimeout = DefaultPinTimeout
|
||||
cfg.UnpinTimeout = DefaultUnpinTimeout
|
||||
|
@ -118,12 +110,6 @@ func (cfg *Config) Validate() error {
|
|||
err = errors.New("ipfshttp.connect_swarms_delay is invalid")
|
||||
}
|
||||
|
||||
switch cfg.PinMethod {
|
||||
case "refs", "pin":
|
||||
default:
|
||||
err = errors.New("ipfshttp.pin_method invalid value")
|
||||
}
|
||||
|
||||
if cfg.IPFSRequestTimeout < 0 {
|
||||
err = errors.New("ipfshttp.ipfs_request_timeout invalid")
|
||||
}
|
||||
|
@ -173,8 +159,6 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
|
|||
return err
|
||||
}
|
||||
|
||||
config.SetIfNotDefault(jcfg.PinMethod, &cfg.PinMethod)
|
||||
|
||||
return cfg.Validate()
|
||||
}
|
||||
|
||||
|
@ -202,7 +186,6 @@ func (cfg *Config) toJSONConfig() (jcfg *jsonConfig, err error) {
|
|||
// Set all configuration fields
|
||||
jcfg.NodeMultiaddress = cfg.NodeAddr.String()
|
||||
jcfg.ConnectSwarmsDelay = cfg.ConnectSwarmsDelay.String()
|
||||
jcfg.PinMethod = cfg.PinMethod
|
||||
jcfg.IPFSRequestTimeout = cfg.IPFSRequestTimeout.String()
|
||||
jcfg.PinTimeout = cfg.PinTimeout.String()
|
||||
jcfg.UnpinTimeout = cfg.UnpinTimeout.String()
|
||||
|
|
|
@ -11,7 +11,6 @@ var cfgJSON = []byte(`
|
|||
{
|
||||
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
|
||||
"connect_swarms_delay": "7s",
|
||||
"pin_method": "pin",
|
||||
"ipfs_request_timeout": "5m0s",
|
||||
"pin_timeout": "24h",
|
||||
"unpin_timeout": "3h"
|
||||
|
|
|
@ -315,56 +315,19 @@ func (ipfs *Connector) Pin(ctx context.Context, pin *api.Pin) error {
|
|||
ctx, cancelRequest := context.WithCancel(ctx)
|
||||
defer cancelRequest()
|
||||
|
||||
pinMethod := ipfs.config.PinMethod
|
||||
|
||||
// If we have a pin-update, and the old object
|
||||
// is pinned recursively, then do pin/update.
|
||||
// Otherwise do a normal pin.
|
||||
if from := pin.PinUpdate; from != cid.Undef {
|
||||
pinStatus, _ := ipfs.PinLsCid(ctx, from)
|
||||
if pinStatus.IsPinned(-1) { // pinned recursively as update
|
||||
pinMethod = "update"
|
||||
// as a side note, if PinUpdate == pin.Cid, we are
|
||||
// somehow pinning an already pinned thing and we'd better
|
||||
// use update for that
|
||||
if pinStatus.IsPinned(-1) { // pinned recursively.
|
||||
// As a side note, if PinUpdate == pin.Cid, we are
|
||||
// somehow pinning an already pinned thing and we'd
|
||||
// better use update for that
|
||||
return ipfs.pinUpdate(ctx, from, pin.Cid)
|
||||
}
|
||||
}
|
||||
|
||||
switch pinMethod {
|
||||
case "update":
|
||||
return ipfs.pinUpdate(ctx, pin.PinUpdate, pin.Cid)
|
||||
case "refs":
|
||||
// do refs -r first and timeout if we don't get at least
|
||||
// one ref per pin timeout
|
||||
outRefs := make(chan string)
|
||||
go func() {
|
||||
lastRefTime := time.Now()
|
||||
ticker := time.NewTicker(progressTick)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if time.Since(lastRefTime) >= ipfs.config.PinTimeout {
|
||||
cancelRequest() // timeout
|
||||
return
|
||||
}
|
||||
case <-outRefs:
|
||||
lastRefTime = time.Now()
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
err := ipfs.refsProgress(ctx, hash, maxDepth, outRefs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Debugf("Refs for %s sucessfully fetched", hash)
|
||||
|
||||
}
|
||||
|
||||
// Pin request and timeout if there is no progress
|
||||
outPins := make(chan int)
|
||||
go func() {
|
||||
|
@ -406,56 +369,6 @@ func (ipfs *Connector) Pin(ctx context.Context, pin *api.Pin) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// refsProgress fetches refs and puts them on a channel. Blocks until done or
|
||||
// error. refsProgress will always close the out channel. refsProgres will
|
||||
// not block on sending to the channel if it is full.
|
||||
func (ipfs *Connector) refsProgress(ctx context.Context, hash cid.Cid, maxDepth int, out chan<- string) error {
|
||||
defer close(out)
|
||||
|
||||
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/refsProgress")
|
||||
defer span.End()
|
||||
|
||||
path := fmt.Sprintf("refs?arg=%s&%s", hash, pinArgs(maxDepth))
|
||||
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, "", nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
_, err = checkResponse(path, res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dec := json.NewDecoder(res.Body)
|
||||
for {
|
||||
var ref ipfsRefsResp
|
||||
if err := dec.Decode(&ref); err != nil {
|
||||
// If we cancelled the request we should tell the user
|
||||
// (in case dec.Decode() exited cleanly with an EOF).
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
if err == io.EOF {
|
||||
return nil // clean exit
|
||||
}
|
||||
return err // error decoding
|
||||
}
|
||||
}
|
||||
|
||||
// We have a Ref!
|
||||
if errStr := ref.Err; errStr != "" {
|
||||
logger.Error(errStr)
|
||||
}
|
||||
|
||||
select { // do not lock
|
||||
case out <- ref.Ref:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pinProgress pins an item and sends fetched node's progress on a
|
||||
// channel. Blocks until done or error. pinProgress will always close the out
|
||||
// channel. pinProgress will not block on sending to the channel if it is full.
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -69,14 +68,12 @@ func TestIPFSID(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testPin(t *testing.T, method string) {
|
||||
func TestPin(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ipfs, mock := testIPFSConnector(t)
|
||||
defer mock.Close()
|
||||
defer ipfs.Shutdown(ctx)
|
||||
|
||||
ipfs.config.PinMethod = method
|
||||
|
||||
c := test.Cid1
|
||||
err := ipfs.Pin(ctx, api.PinCid(c))
|
||||
if err != nil {
|
||||
|
@ -96,26 +93,15 @@ func testPin(t *testing.T, method string) {
|
|||
t.Error("expected error pinning cid")
|
||||
}
|
||||
|
||||
switch method {
|
||||
case "refs":
|
||||
ipfs.config.PinTimeout = 1 * time.Second
|
||||
c3 := test.SlowCid1
|
||||
err = ipfs.Pin(ctx, api.PinCid(c3))
|
||||
if err == nil {
|
||||
t.Error("expected error pinning cid")
|
||||
}
|
||||
case "pin":
|
||||
ipfs.config.PinTimeout = 5 * time.Second
|
||||
c4 := test.SlowCid1
|
||||
err = ipfs.Pin(ctx, api.PinCid(c4))
|
||||
if err == nil {
|
||||
t.Error("expected error pinning cid")
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func testPinUpdate(t *testing.T) {
|
||||
func TestPinUpdate(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ipfs, mock := testIPFSConnector(t)
|
||||
defer mock.Close()
|
||||
|
@ -123,17 +109,11 @@ func testPinUpdate(t *testing.T) {
|
|||
|
||||
pin := api.PinCid(test.Cid1)
|
||||
pin.PinUpdate = test.Cid1
|
||||
// enforce pin/update even though it would be skipped
|
||||
ipfs.config.PinMethod = "update"
|
||||
err := ipfs.Pin(ctx, pin)
|
||||
if err == nil {
|
||||
t.Fatal("expected an error")
|
||||
}
|
||||
if !strings.HasSuffix(err.Error(), "recursively pinned already") {
|
||||
t.Fatal("expected error about from not being pinned")
|
||||
if err != nil {
|
||||
t.Error("pin update should have worked even if not pinned")
|
||||
}
|
||||
|
||||
ipfs.config.PinMethod = "pin"
|
||||
err = ipfs.Pin(ctx, pin)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -145,12 +125,14 @@ func testPinUpdate(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIPFSPin(t *testing.T) {
|
||||
t.Run("method=pin", func(t *testing.T) { testPin(t, "pin") })
|
||||
t.Run("method=refs", func(t *testing.T) { testPin(t, "refs") })
|
||||
t.Run("method=update", func(t *testing.T) { testPinUpdate(t) })
|
||||
if mock.GetCount("pin/update") != 1 {
|
||||
t.Error("pin/update should have been called once")
|
||||
}
|
||||
|
||||
if mock.GetCount("pin/add") != 1 {
|
||||
t.Error("pin/add should have been called once")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIPFSUnpin(t *testing.T) {
|
||||
|
|
|
@ -63,7 +63,6 @@
|
|||
"ipfshttp": {
|
||||
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
|
||||
"connect_swarms_delay": "30s",
|
||||
"pin_method": "refs",
|
||||
"ipfs_request_timeout": "5m0s",
|
||||
"pin_timeout": "24h0m0s",
|
||||
"unpin_timeout": "3h0m0s"
|
||||
|
|
|
@ -63,7 +63,6 @@
|
|||
"ipfshttp": {
|
||||
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
|
||||
"connect_swarms_delay": "30s",
|
||||
"pin_method": "refs",
|
||||
"ipfs_request_timeout": "5m0s",
|
||||
"pin_timeout": "24h0m0s",
|
||||
"unpin_timeout": "3h0m0s"
|
||||
|
|
|
@ -62,7 +62,6 @@
|
|||
"ipfshttp": {
|
||||
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
|
||||
"connect_swarms_delay": "30s",
|
||||
"pin_method": "refs",
|
||||
"ipfs_request_timeout": "5m0s",
|
||||
"pin_timeout": "24h0m0s",
|
||||
"unpin_timeout": "3h0m0s"
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -29,6 +30,7 @@ const (
|
|||
IpfsTimeHeaderName = "X-Time-Now"
|
||||
IpfsCustomHeaderValue = "42"
|
||||
IpfsACAOrigin = "myorigin"
|
||||
IpfsErrFromNotPinned = "'from' cid was not recursively pinned already"
|
||||
)
|
||||
|
||||
// IpfsMock is an ipfs daemon mock which should sustain the functionality used by ipfscluster.
|
||||
|
@ -38,6 +40,11 @@ type IpfsMock struct {
|
|||
Port int
|
||||
pinMap state.State
|
||||
BlockStore map[string][]byte
|
||||
reqCounts map[string]int
|
||||
reqCounter chan string
|
||||
|
||||
closeMux sync.Mutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
type mockPinResp struct {
|
||||
|
@ -105,12 +112,16 @@ func NewIpfsMock(t *testing.T) *IpfsMock {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
blocks := make(map[string][]byte)
|
||||
|
||||
m := &IpfsMock{
|
||||
pinMap: st,
|
||||
BlockStore: blocks,
|
||||
BlockStore: make(map[string][]byte),
|
||||
reqCounts: make(map[string]int),
|
||||
reqCounter: make(chan string, 100),
|
||||
}
|
||||
|
||||
go m.countRequests()
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", m.handler)
|
||||
|
||||
|
@ -135,6 +146,18 @@ func NewIpfsMock(t *testing.T) *IpfsMock {
|
|||
|
||||
}
|
||||
|
||||
func (m *IpfsMock) countRequests() {
|
||||
for str := range m.reqCounter {
|
||||
m.reqCounts[str]++
|
||||
}
|
||||
}
|
||||
|
||||
// GetCount allows to get the number of times and endpoint was called.
|
||||
// Do not use concurrently to requests happening.
|
||||
func (m *IpfsMock) GetCount(path string) int {
|
||||
return m.reqCounts[path]
|
||||
}
|
||||
|
||||
// FIXME: what if IPFS API changes?
|
||||
func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := context.Background()
|
||||
|
@ -143,6 +166,9 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
|
|||
w.Header().Set("Server", "ipfs-mock")
|
||||
w.Header().Set(IpfsTimeHeaderName, fmt.Sprintf("%d", time.Now().Unix()))
|
||||
endp := strings.TrimPrefix(p, "/api/v0/")
|
||||
|
||||
m.reqCounter <- endp
|
||||
|
||||
switch endp {
|
||||
case "id":
|
||||
resp := mockIDResp{
|
||||
|
@ -215,7 +241,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
|
|||
pin, err := m.pinMap.Get(ctx, from)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
resp := ipfsErr{0, fmt.Sprintf("'from' cid was not recursively pinned already")}
|
||||
resp := ipfsErr{0, IpfsErrFromNotPinned}
|
||||
j, _ := json.Marshal(resp)
|
||||
w.Write(j)
|
||||
return
|
||||
|
@ -413,7 +439,13 @@ ERROR:
|
|||
// Close closes the mock server. It's important to call after each test or
|
||||
// the listeners are left hanging around.
|
||||
func (m *IpfsMock) Close() {
|
||||
m.closeMux.Lock()
|
||||
defer m.closeMux.Unlock()
|
||||
if !m.closed {
|
||||
m.closed = true
|
||||
close(m.reqCounter)
|
||||
m.server.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// extractCid extracts the cid argument from a url.URL, either via
|
||||
|
|
Loading…
Reference in New Issue
Block a user