ipfshttp: Remove "refs" method

go-ipfs no longer takes a global lock when two simultaneous pin requests
happen, so the "refs -r" pre-fetch workaround (the "refs" pin method) is
no longer needed. This allows us to remove a good amount of code.
Hector Sanjuan 2019-08-09 16:45:54 +02:00
parent d8c20adc4e
commit 850b57e62e
4 changed files with 57 additions and 147 deletions
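For illustration of the premise only (not part of this commit): a minimal sketch, assuming a local daemon on the default API address, of two pin/add requests issued at once. With the global pin lock gone they can make progress in parallel instead of serializing inside go-ipfs. The CIDs are placeholders.

```go
package main

import (
	"fmt"
	"net/http"
	"sync"
)

func main() {
	// Placeholder CIDs; substitute real ones when running.
	cids := []string{"QmPlaceholderCid1", "QmPlaceholderCid2"}
	var wg sync.WaitGroup
	for _, c := range cids {
		wg.Add(1)
		go func(c string) {
			defer wg.Done()
			// With the global pin lock removed, simultaneous pin/add
			// calls no longer serialize inside the daemon.
			url := "http://127.0.0.1:5001/api/v0/pin/add?arg=" + c
			res, err := http.Post(url, "", nil)
			if err != nil {
				fmt.Println("pin failed:", err)
				return
			}
			res.Body.Close()
		}(c)
	}
	wg.Wait()
}
```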


@@ -20,7 +20,6 @@ const envConfigKey = "cluster_ipfshttp"
const (
DefaultNodeAddr = "/ip4/127.0.0.1/tcp/5001"
DefaultConnectSwarmsDelay = 30 * time.Second
DefaultPinMethod = "refs"
DefaultIPFSRequestTimeout = 5 * time.Minute
DefaultPinTimeout = 24 * time.Hour
DefaultUnpinTimeout = 3 * time.Hour
@@ -40,11 +39,6 @@ type Config struct {
// IPFS daemons of other peers.
ConnectSwarmsDelay time.Duration
// "pin" or "refs". "pin" uses a "pin/add" call. "refs" uses a
// "refs -r" call followed by "pin/add". "refs" allows fetching in
// parallel but should be used with GC disabled.
PinMethod string
// IPFS Daemon HTTP Client POST timeout
IPFSRequestTimeout time.Duration
@@ -64,7 +58,6 @@ type Config struct {
type jsonConfig struct {
NodeMultiaddress string `json:"node_multiaddress"`
ConnectSwarmsDelay string `json:"connect_swarms_delay"`
PinMethod string `json:"pin_method"`
IPFSRequestTimeout string `json:"ipfs_request_timeout"`
PinTimeout string `json:"pin_timeout"`
UnpinTimeout string `json:"unpin_timeout"`
@@ -81,7 +74,6 @@ func (cfg *Config) Default() error {
node, _ := ma.NewMultiaddr(DefaultNodeAddr)
cfg.NodeAddr = node
cfg.ConnectSwarmsDelay = DefaultConnectSwarmsDelay
cfg.PinMethod = DefaultPinMethod
cfg.IPFSRequestTimeout = DefaultIPFSRequestTimeout
cfg.PinTimeout = DefaultPinTimeout
cfg.UnpinTimeout = DefaultUnpinTimeout
@@ -118,12 +110,6 @@ func (cfg *Config) Validate() error {
err = errors.New("ipfshttp.connect_swarms_delay is invalid")
}
switch cfg.PinMethod {
case "refs", "pin":
default:
err = errors.New("ipfshttp.pin_method invalid value")
}
if cfg.IPFSRequestTimeout < 0 {
err = errors.New("ipfshttp.ipfs_request_timeout invalid")
}
@@ -173,8 +159,6 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
return err
}
config.SetIfNotDefault(jcfg.PinMethod, &cfg.PinMethod)
return cfg.Validate()
}
@@ -202,7 +186,6 @@ func (cfg *Config) toJSONConfig() (jcfg *jsonConfig, err error) {
// Set all configuration fields
jcfg.NodeMultiaddress = cfg.NodeAddr.String()
jcfg.ConnectSwarmsDelay = cfg.ConnectSwarmsDelay.String()
jcfg.PinMethod = cfg.PinMethod
jcfg.IPFSRequestTimeout = cfg.IPFSRequestTimeout.String()
jcfg.PinTimeout = cfg.PinTimeout.String()
jcfg.UnpinTimeout = cfg.UnpinTimeout.String()


@@ -315,56 +315,19 @@ func (ipfs *Connector) Pin(ctx context.Context, pin *api.Pin) error {
ctx, cancelRequest := context.WithCancel(ctx)
defer cancelRequest()
pinMethod := ipfs.config.PinMethod
// If we have a pin-update, and the old object
// is pinned recursively, then do pin/update.
// Otherwise do a normal pin.
if from := pin.PinUpdate; from != cid.Undef {
pinStatus, _ := ipfs.PinLsCid(ctx, from)
if pinStatus.IsPinned(-1) { // pinned recursively as update
pinMethod = "update"
// as a side note, if PinUpdate == pin.Cid, we are
// somehow pinning an already pinned thing and we'd better
// use update for that
if pinStatus.IsPinned(-1) { // pinned recursively.
// As a side note, if PinUpdate == pin.Cid, we are
// somehow pinning an already pinned thing and we'd
// better use update for that
return ipfs.pinUpdate(ctx, from, pin.Cid)
}
}
switch pinMethod {
case "update":
return ipfs.pinUpdate(ctx, pin.PinUpdate, pin.Cid)
case "refs":
// do refs -r first, and time out if we do not receive at
// least one ref within the pin timeout
outRefs := make(chan string)
go func() {
lastRefTime := time.Now()
ticker := time.NewTicker(progressTick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if time.Since(lastRefTime) >= ipfs.config.PinTimeout {
cancelRequest() // timeout
return
}
case <-outRefs:
lastRefTime = time.Now()
case <-ctx.Done():
return
}
}
}()
err := ipfs.refsProgress(ctx, hash, maxDepth, outRefs)
if err != nil {
return err
}
logger.Debugf("Refs for %s sucessfully fetched", hash)
}
// Issue the pin request, and time out if there is no progress
outPins := make(chan int)
go func() {
@@ -406,56 +369,6 @@ func (ipfs *Connector) Pin(ctx context.Context, pin *api.Pin) error {
return nil
}
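With the refs branch gone, pinning is a single pin/add call whose response body streams progress objects, decoded the same way pinProgress does below. A self-contained sketch of that streaming-decode idiom, using a stand-in server rather than a real daemon (only the endpoint shape follows the go-ipfs HTTP API; all other names are illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// pinProgressResp mirrors the shape of the objects go-ipfs streams
// from pin/add when progress=true is set.
type pinProgressResp struct {
	Progress int
	Pins     []string
}

func main() {
	// Stand-in for the IPFS API: stream progress objects, then the result.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		enc := json.NewEncoder(w)
		for i := 1; i <= 3; i++ {
			enc.Encode(pinProgressResp{Progress: i})
		}
		enc.Encode(pinProgressResp{Pins: []string{"QmExample"}})
	}))
	defer srv.Close()

	res, err := http.Post(srv.URL+"/api/v0/pin/add?arg=QmExample&progress=true", "", nil)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()

	dec := json.NewDecoder(res.Body)
	for {
		var p pinProgressResp
		if err := dec.Decode(&p); err != nil {
			if err == io.EOF {
				break // clean end of stream
			}
			panic(err)
		}
		fmt.Printf("%+v\n", p)
	}
}
```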
// refsProgress fetches refs and puts them on a channel. Blocks until done or
error. refsProgress will always close the out channel. refsProgress will
// not block on sending to the channel if it is full.
func (ipfs *Connector) refsProgress(ctx context.Context, hash cid.Cid, maxDepth int, out chan<- string) error {
defer close(out)
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/refsProgress")
defer span.End()
path := fmt.Sprintf("refs?arg=%s&%s", hash, pinArgs(maxDepth))
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, "", nil)
if err != nil {
return err
}
defer res.Body.Close()
_, err = checkResponse(path, res)
if err != nil {
return err
}
dec := json.NewDecoder(res.Body)
for {
var ref ipfsRefsResp
if err := dec.Decode(&ref); err != nil {
// If we cancelled the request we should tell the user
// (in case dec.Decode() exited cleanly with an EOF).
select {
case <-ctx.Done():
return ctx.Err()
default:
if err == io.EOF {
return nil // clean exit
}
return err // error decoding
}
}
// We have a Ref!
if errStr := ref.Err; errStr != "" {
logger.Error(errStr)
}
select { // do not block
case out <- ref.Ref:
default:
}
}
}
// pinProgress pins an item and sends the number of fetched nodes as
// progress on a channel. Blocks until done or error. pinProgress will
// always close the out channel and will not block on sending to it if
// it is full.
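The surviving pin path (and, previously, the refs branch above) watches for stalled progress with the same idiom: a ticker compares the time since the last progress event against the pin timeout and cancels the in-flight request when it is exceeded. A minimal sketch with illustrative names, not the actual ipfs-cluster code:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watchProgress cancels the request context when no progress event has
// arrived within the timeout. This is the shape of the ticker loops in
// Pin above; the names are illustrative.
func watchProgress(ctx context.Context, cancel context.CancelFunc, progress <-chan struct{}, timeout time.Duration) {
	last := time.Now()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if time.Since(last) >= timeout {
				cancel() // progress stalled: abort the request
				return
			}
		case <-progress:
			last = time.Now() // progress seen: reset the clock
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	progress := make(chan struct{})
	go watchProgress(ctx, cancel, progress, 300*time.Millisecond)
	progress <- struct{}{} // one progress event arrives...
	<-ctx.Done()           // ...then nothing: the watchdog cancels
	fmt.Println("request cancelled after progress stalled")
}
```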


@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"strings"
"testing"
"time"
@@ -69,14 +68,12 @@ func TestIPFSID(t *testing.T) {
}
}
func testPin(t *testing.T, method string) {
func TestPin(t *testing.T) {
ctx := context.Background()
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown(ctx)
ipfs.config.PinMethod = method
c := test.Cid1
err := ipfs.Pin(ctx, api.PinCid(c))
if err != nil {
@@ -96,26 +93,15 @@ func testPin(t *testing.T, method string) {
t.Error("expected error pinning cid")
}
switch method {
case "refs":
ipfs.config.PinTimeout = 1 * time.Second
c3 := test.SlowCid1
err = ipfs.Pin(ctx, api.PinCid(c3))
if err == nil {
t.Error("expected error pinning cid")
}
case "pin":
ipfs.config.PinTimeout = 5 * time.Second
c4 := test.SlowCid1
err = ipfs.Pin(ctx, api.PinCid(c4))
if err == nil {
t.Error("expected error pinning cid")
}
default:
ipfs.config.PinTimeout = 5 * time.Second
c4 := test.SlowCid1
err = ipfs.Pin(ctx, api.PinCid(c4))
if err == nil {
t.Error("expected error pinning cid")
}
}
func testPinUpdate(t *testing.T) {
func TestPinUpdate(t *testing.T) {
ctx := context.Background()
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
@@ -123,17 +109,11 @@ func testPinUpdate(t *testing.T) {
pin := api.PinCid(test.Cid1)
pin.PinUpdate = test.Cid1
// enforce pin/update even though it would be skipped
ipfs.config.PinMethod = "update"
err := ipfs.Pin(ctx, pin)
if err == nil {
t.Fatal("expected an error")
}
if !strings.HasSuffix(err.Error(), "recursively pinned already") {
t.Fatal("expected error about from not being pinned")
if err != nil {
t.Error("pin update should have worked even if not pinned")
}
ipfs.config.PinMethod = "pin"
err = ipfs.Pin(ctx, pin)
if err != nil {
t.Fatal(err)
@@ -145,12 +125,14 @@ func testPinUpdate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
}
func TestIPFSPin(t *testing.T) {
t.Run("method=pin", func(t *testing.T) { testPin(t, "pin") })
t.Run("method=refs", func(t *testing.T) { testPin(t, "refs") })
t.Run("method=update", func(t *testing.T) { testPinUpdate(t) })
if mock.GetCount("pin/update") != 1 {
t.Error("pin/update should have been called once")
}
if mock.GetCount("pin/add") != 1 {
t.Error("pin/add should have been called once")
}
}
func TestIPFSUnpin(t *testing.T) {


@@ -10,6 +10,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"testing"
"time"
@@ -29,6 +30,7 @@ const (
IpfsTimeHeaderName = "X-Time-Now"
IpfsCustomHeaderValue = "42"
IpfsACAOrigin = "myorigin"
IpfsErrFromNotPinned = "'from' cid was not recursively pinned already"
)
// IpfsMock is an ipfs daemon mock which should sustain the functionality used by ipfscluster.
@@ -38,6 +40,11 @@ type IpfsMock struct {
Port int
pinMap state.State
BlockStore map[string][]byte
reqCounts map[string]int
reqCounter chan string
closeMux sync.Mutex
closed bool
}
type mockPinResp struct {
@@ -105,12 +112,16 @@ func NewIpfsMock(t *testing.T) *IpfsMock {
if err != nil {
t.Fatal(err)
}
blocks := make(map[string][]byte)
m := &IpfsMock{
pinMap: st,
BlockStore: blocks,
BlockStore: make(map[string][]byte),
reqCounts: make(map[string]int),
reqCounter: make(chan string, 100),
}
go m.countRequests()
mux := http.NewServeMux()
mux.HandleFunc("/", m.handler)
@@ -135,6 +146,18 @@ func NewIpfsMock(t *testing.T) *IpfsMock {
}
func (m *IpfsMock) countRequests() {
for str := range m.reqCounter {
m.reqCounts[str]++
}
}
// GetCount returns the number of times an endpoint was called. Do not
// call it concurrently with in-flight requests: reads of the counters
// are not synchronized with the counting goroutine.
func (m *IpfsMock) GetCount(path string) int {
return m.reqCounts[path]
}
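A condensed sketch of how a test consumes the counter (hypothetical test name; NewIpfsMock and GetCount as defined in this file):

```go
func TestPinAddCounted(t *testing.T) {
	mock := NewIpfsMock(t)
	defer mock.Close()
	// ... drive pin requests through the connector under test here ...
	if n := mock.GetCount("pin/add"); n != 1 {
		t.Errorf("pin/add called %d times, want 1", n)
	}
}
```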
// FIXME: what if IPFS API changes?
func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
@@ -143,6 +166,9 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "ipfs-mock")
w.Header().Set(IpfsTimeHeaderName, fmt.Sprintf("%d", time.Now().Unix()))
endp := strings.TrimPrefix(p, "/api/v0/")
m.reqCounter <- endp
switch endp {
case "id":
resp := mockIDResp{
@@ -215,7 +241,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
pin, err := m.pinMap.Get(ctx, from)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
resp := ipfsErr{0, fmt.Sprintf("'from' cid was not recursively pinned already")}
resp := ipfsErr{0, IpfsErrFromNotPinned}
j, _ := json.Marshal(resp)
w.Write(j)
return
@@ -413,7 +439,13 @@ ERROR:
// Close closes the mock server. It's important to call it after each
// test, or the listeners are left hanging around.
func (m *IpfsMock) Close() {
m.server.Close()
m.closeMux.Lock()
defer m.closeMux.Unlock()
if !m.closed {
m.closed = true
close(m.reqCounter)
m.server.Close()
}
}
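The mutex-and-flag guard makes Close idempotent, so tests can always defer it. An equivalent design, sketched here with an illustrative type (assuming the sync and net/http/httptest imports already present in this file), would use sync.Once:

```go
// Sketch only: an alternative, equivalent shape for idempotent Close.
type ipfsMockSketch struct {
	closeOnce  sync.Once
	reqCounter chan string
	server     *httptest.Server
}

func (m *ipfsMockSketch) Close() {
	m.closeOnce.Do(func() {
		close(m.reqCounter) // stop the counting goroutine
		m.server.Close()
	})
}
```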
// extractCid extracts the cid argument from a url.URL, either via