From 850b57e62e2403fcc2e13537efe65444f9f5db3b Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Fri, 9 Aug 2019 16:45:54 +0200 Subject: [PATCH] ipfshttp: Remove "refs" method go-ipfs no longer performs a global lock when two simultaneous requests to pin happen. This allows us to remove a bunch of code. --- ipfsconn/ipfshttp/config.go | 17 ------ ipfsconn/ipfshttp/ipfshttp.go | 97 ++---------------------------- ipfsconn/ipfshttp/ipfshttp_test.go | 50 +++++---------- test/ipfs_mock.go | 40 ++++++++++-- 4 files changed, 57 insertions(+), 147 deletions(-) diff --git a/ipfsconn/ipfshttp/config.go b/ipfsconn/ipfshttp/config.go index d42bdf1e..371aa94d 100644 --- a/ipfsconn/ipfshttp/config.go +++ b/ipfsconn/ipfshttp/config.go @@ -20,7 +20,6 @@ const envConfigKey = "cluster_ipfshttp" const ( DefaultNodeAddr = "/ip4/127.0.0.1/tcp/5001" DefaultConnectSwarmsDelay = 30 * time.Second - DefaultPinMethod = "refs" DefaultIPFSRequestTimeout = 5 * time.Minute DefaultPinTimeout = 24 * time.Hour DefaultUnpinTimeout = 3 * time.Hour @@ -40,11 +39,6 @@ type Config struct { // IPFS daemons of other peers. ConnectSwarmsDelay time.Duration - // "pin" or "refs". "pin" uses a "pin/add" call. "refs" uses a - // "refs -r" call followed by "pin/add". "refs" allows fetching in - // parallel but should be used with GC disabled. - PinMethod string - // IPFS Daemon HTTP Client POST timeout IPFSRequestTimeout time.Duration @@ -64,7 +58,6 @@ type Config struct { type jsonConfig struct { NodeMultiaddress string `json:"node_multiaddress"` ConnectSwarmsDelay string `json:"connect_swarms_delay"` - PinMethod string `json:"pin_method"` IPFSRequestTimeout string `json:"ipfs_request_timeout"` PinTimeout string `json:"pin_timeout"` UnpinTimeout string `json:"unpin_timeout"` @@ -81,7 +74,6 @@ func (cfg *Config) Default() error { node, _ := ma.NewMultiaddr(DefaultNodeAddr) cfg.NodeAddr = node cfg.ConnectSwarmsDelay = DefaultConnectSwarmsDelay - cfg.PinMethod = DefaultPinMethod cfg.IPFSRequestTimeout = DefaultIPFSRequestTimeout cfg.PinTimeout = DefaultPinTimeout cfg.UnpinTimeout = DefaultUnpinTimeout @@ -118,12 +110,6 @@ func (cfg *Config) Validate() error { err = errors.New("ipfshttp.connect_swarms_delay is invalid") } - switch cfg.PinMethod { - case "refs", "pin": - default: - err = errors.New("ipfshttp.pin_method invalid value") - } - if cfg.IPFSRequestTimeout < 0 { err = errors.New("ipfshttp.ipfs_request_timeout invalid") } @@ -173,8 +159,6 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error { return err } - config.SetIfNotDefault(jcfg.PinMethod, &cfg.PinMethod) - return cfg.Validate() } @@ -202,7 +186,6 @@ func (cfg *Config) toJSONConfig() (jcfg *jsonConfig, err error) { // Set all configuration fields jcfg.NodeMultiaddress = cfg.NodeAddr.String() jcfg.ConnectSwarmsDelay = cfg.ConnectSwarmsDelay.String() - jcfg.PinMethod = cfg.PinMethod jcfg.IPFSRequestTimeout = cfg.IPFSRequestTimeout.String() jcfg.PinTimeout = cfg.PinTimeout.String() jcfg.UnpinTimeout = cfg.UnpinTimeout.String() diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index a658c612..184d4b88 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -315,56 +315,19 @@ func (ipfs *Connector) Pin(ctx context.Context, pin *api.Pin) error { ctx, cancelRequest := context.WithCancel(ctx) defer cancelRequest() - pinMethod := ipfs.config.PinMethod - // If we have a pin-update, and the old object // is pinned recursively, then do pin/update. // Otherwise do a normal pin. if from := pin.PinUpdate; from != cid.Undef { pinStatus, _ := ipfs.PinLsCid(ctx, from) - if pinStatus.IsPinned(-1) { // pinned recursively as update - pinMethod = "update" - // as a side note, if PinUpdate == pin.Cid, we are - // somehow pinning an already pinned thing and we'd better - // use update for that + if pinStatus.IsPinned(-1) { // pinned recursively. + // As a side note, if PinUpdate == pin.Cid, we are + // somehow pinning an already pinned thing and we'd + // better use update for that + return ipfs.pinUpdate(ctx, from, pin.Cid) } } - switch pinMethod { - case "update": - return ipfs.pinUpdate(ctx, pin.PinUpdate, pin.Cid) - case "refs": - // do refs -r first and timeout if we don't get at least - // one ref per pin timeout - outRefs := make(chan string) - go func() { - lastRefTime := time.Now() - ticker := time.NewTicker(progressTick) - defer ticker.Stop() - for { - select { - case <-ticker.C: - if time.Since(lastRefTime) >= ipfs.config.PinTimeout { - cancelRequest() // timeout - return - } - case <-outRefs: - lastRefTime = time.Now() - case <-ctx.Done(): - return - } - } - }() - - err := ipfs.refsProgress(ctx, hash, maxDepth, outRefs) - if err != nil { - return err - } - - logger.Debugf("Refs for %s sucessfully fetched", hash) - - } - // Pin request and timeout if there is no progress outPins := make(chan int) go func() { @@ -406,56 +369,6 @@ func (ipfs *Connector) Pin(ctx context.Context, pin *api.Pin) error { return nil } -// refsProgress fetches refs and puts them on a channel. Blocks until done or -// error. refsProgress will always close the out channel. refsProgres will -// not block on sending to the channel if it is full. -func (ipfs *Connector) refsProgress(ctx context.Context, hash cid.Cid, maxDepth int, out chan<- string) error { - defer close(out) - - ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/refsProgress") - defer span.End() - - path := fmt.Sprintf("refs?arg=%s&%s", hash, pinArgs(maxDepth)) - res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, "", nil) - if err != nil { - return err - } - defer res.Body.Close() - - _, err = checkResponse(path, res) - if err != nil { - return err - } - - dec := json.NewDecoder(res.Body) - for { - var ref ipfsRefsResp - if err := dec.Decode(&ref); err != nil { - // If we cancelled the request we should tell the user - // (in case dec.Decode() exited cleanly with an EOF). - select { - case <-ctx.Done(): - return ctx.Err() - default: - if err == io.EOF { - return nil // clean exit - } - return err // error decoding - } - } - - // We have a Ref! - if errStr := ref.Err; errStr != "" { - logger.Error(errStr) - } - - select { // do not lock - case out <- ref.Ref: - default: - } - } -} - // pinProgress pins an item and sends fetched node's progress on a // channel. Blocks until done or error. pinProgress will always close the out // channel. pinProgress will not block on sending to the channel if it is full. diff --git a/ipfsconn/ipfshttp/ipfshttp_test.go b/ipfsconn/ipfshttp/ipfshttp_test.go index a85c3169..836e03de 100644 --- a/ipfsconn/ipfshttp/ipfshttp_test.go +++ b/ipfsconn/ipfshttp/ipfshttp_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "strings" "testing" "time" @@ -69,14 +68,12 @@ func TestIPFSID(t *testing.T) { } } -func testPin(t *testing.T, method string) { +func TestPin(t *testing.T) { ctx := context.Background() ipfs, mock := testIPFSConnector(t) defer mock.Close() defer ipfs.Shutdown(ctx) - ipfs.config.PinMethod = method - c := test.Cid1 err := ipfs.Pin(ctx, api.PinCid(c)) if err != nil { @@ -96,26 +93,15 @@ func testPin(t *testing.T, method string) { t.Error("expected error pinning cid") } - switch method { - case "refs": - ipfs.config.PinTimeout = 1 * time.Second - c3 := test.SlowCid1 - err = ipfs.Pin(ctx, api.PinCid(c3)) - if err == nil { - t.Error("expected error pinning cid") - } - case "pin": - ipfs.config.PinTimeout = 5 * time.Second - c4 := test.SlowCid1 - err = ipfs.Pin(ctx, api.PinCid(c4)) - if err == nil { - t.Error("expected error pinning cid") - } - default: + ipfs.config.PinTimeout = 5 * time.Second + c4 := test.SlowCid1 + err = ipfs.Pin(ctx, api.PinCid(c4)) + if err == nil { + t.Error("expected error pinning cid") } } -func testPinUpdate(t *testing.T) { +func TestPinUpdate(t *testing.T) { ctx := context.Background() ipfs, mock := testIPFSConnector(t) defer mock.Close() @@ -123,17 +109,11 @@ func testPinUpdate(t *testing.T) { pin := api.PinCid(test.Cid1) pin.PinUpdate = test.Cid1 - // enforce pin/update even though it would be skipped - ipfs.config.PinMethod = "update" err := ipfs.Pin(ctx, pin) - if err == nil { - t.Fatal("expected an error") - } - if !strings.HasSuffix(err.Error(), "recursively pinned already") { - t.Fatal("expected error about from not being pinned") + if err != nil { + t.Error("pin update should have worked even if not pinned") } - ipfs.config.PinMethod = "pin" err = ipfs.Pin(ctx, pin) if err != nil { t.Fatal(err) @@ -145,12 +125,14 @@ func testPinUpdate(t *testing.T) { if err != nil { t.Fatal(err) } -} -func TestIPFSPin(t *testing.T) { - t.Run("method=pin", func(t *testing.T) { testPin(t, "pin") }) - t.Run("method=refs", func(t *testing.T) { testPin(t, "refs") }) - t.Run("method=update", func(t *testing.T) { testPinUpdate(t) }) + if mock.GetCount("pin/update") != 1 { + t.Error("pin/update should have been called once") + } + + if mock.GetCount("pin/add") != 1 { + t.Error("pin/add should have been called once") + } } func TestIPFSUnpin(t *testing.T) { diff --git a/test/ipfs_mock.go b/test/ipfs_mock.go index bd9ea455..018da251 100644 --- a/test/ipfs_mock.go +++ b/test/ipfs_mock.go @@ -10,6 +10,7 @@ import ( "net/url" "strconv" "strings" + "sync" "testing" "time" @@ -29,6 +30,7 @@ const ( IpfsTimeHeaderName = "X-Time-Now" IpfsCustomHeaderValue = "42" IpfsACAOrigin = "myorigin" + IpfsErrFromNotPinned = "'from' cid was not recursively pinned already" ) // IpfsMock is an ipfs daemon mock which should sustain the functionality used by ipfscluster. @@ -38,6 +40,11 @@ type IpfsMock struct { Port int pinMap state.State BlockStore map[string][]byte + reqCounts map[string]int + reqCounter chan string + + closeMux sync.Mutex + closed bool } type mockPinResp struct { @@ -105,12 +112,16 @@ func NewIpfsMock(t *testing.T) *IpfsMock { if err != nil { t.Fatal(err) } - blocks := make(map[string][]byte) + m := &IpfsMock{ pinMap: st, - BlockStore: blocks, + BlockStore: make(map[string][]byte), + reqCounts: make(map[string]int), + reqCounter: make(chan string, 100), } + go m.countRequests() + mux := http.NewServeMux() mux.HandleFunc("/", m.handler) @@ -135,6 +146,18 @@ func NewIpfsMock(t *testing.T) *IpfsMock { } +func (m *IpfsMock) countRequests() { + for str := range m.reqCounter { + m.reqCounts[str]++ + } +} + +// GetCount allows to get the number of times and endpoint was called. +// Do not use concurrently to requests happening. +func (m *IpfsMock) GetCount(path string) int { + return m.reqCounts[path] +} + // FIXME: what if IPFS API changes? func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { ctx := context.Background() @@ -143,6 +166,9 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Server", "ipfs-mock") w.Header().Set(IpfsTimeHeaderName, fmt.Sprintf("%d", time.Now().Unix())) endp := strings.TrimPrefix(p, "/api/v0/") + + m.reqCounter <- endp + switch endp { case "id": resp := mockIDResp{ @@ -215,7 +241,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { pin, err := m.pinMap.Get(ctx, from) if err != nil { w.WriteHeader(http.StatusInternalServerError) - resp := ipfsErr{0, fmt.Sprintf("'from' cid was not recursively pinned already")} + resp := ipfsErr{0, IpfsErrFromNotPinned} j, _ := json.Marshal(resp) w.Write(j) return @@ -413,7 +439,13 @@ ERROR: // Close closes the mock server. It's important to call after each test or // the listeners are left hanging around. func (m *IpfsMock) Close() { - m.server.Close() + m.closeMux.Lock() + defer m.closeMux.Unlock() + if !m.closed { + m.closed = true + close(m.reqCounter) + m.server.Close() + } } // extractCid extracts the cid argument from a url.URL, either via