Reset the timer less often

This commit is contained in:
Kishan Mohanbhai Sagathiya 2019-04-24 15:44:13 +05:30 committed by Hector Sanjuan
parent 7cc09e0efd
commit 1b90d871c2

View File

@ -14,6 +14,7 @@ import (
"net/url" "net/url"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
gopath "github.com/ipfs/go-path" gopath "github.com/ipfs/go-path"
@ -305,7 +306,7 @@ func (ipfs *Connector) Pin(ctx context.Context, hash cid.Cid, maxDepth int) erro
defer timer.Stop() defer timer.Stop()
reset := make(chan int) reset := make(chan int)
checkTimeout(ctx, cancel, timer, ipfs.config.PinTimeout, reset) go checkTimeout(ctx, cancel, timer, ipfs.config.PinTimeout, reset)
switch ipfs.config.PinMethod { switch ipfs.config.PinMethod {
case "refs": // do refs -r first case "refs": // do refs -r first
@ -484,6 +485,27 @@ func checkResponse(path string, code int, body []byte) error {
} }
func handleRefsProgress(dec *json.Decoder, reset chan int) error { func handleRefsProgress(dec *json.Decoder, reset chan int) error {
ticker := time.NewTicker(5 * time.Second)
var val atomic.Value
var done bool
go func() {
for {
select {
case <-ticker.C:
{
if val.Load().(bool) {
reset <- 1
}
val.Store(false)
}
}
if done {
break
}
}
}()
var last string var last string
for { for {
var ref ipfsRefsResp var ref ipfsRefsResp
@ -499,7 +521,7 @@ func handleRefsProgress(dec *json.Decoder, reset chan int) error {
if ref.Ref != last { if ref.Ref != last {
last = ref.Ref last = ref.Ref
reset <- 1 val.Store(true)
} }
} }
@ -507,45 +529,19 @@ func handleRefsProgress(dec *json.Decoder, reset chan int) error {
} }
func handlePinsProgress(dec *json.Decoder, reset chan int) error { func handlePinsProgress(dec *json.Decoder, reset chan int) error {
ticker := time.NewTicker(5 * time.Second)
var val atomic.Value
var progress int var progress int
for {
var pins ipfsPinsResp
if err := dec.Decode(&pins); err == io.EOF {
break
} else if err != nil {
return err
}
//logger.Infof("Progress: %d", pins.Progress)
if pins.Progress > progress {
progress = pins.Progress
reset <- 1
}
}
return nil
}
func checkTimeout(ctx context.Context, cancel context.CancelFunc, timer *time.Timer, timeout time.Duration, reset chan int) {
var done bool var done bool
go func() { go func() {
for { for {
select { select {
case <-reset: case <-ticker.C:
{ {
if timer.Stop() { if val.Load().(bool) {
timer.Reset(timeout) reset <- 1
} }
} val.Store(false)
case <-timer.C:
{
done = true
cancel()
break
}
case <-ctx.Done():
{
done = true
break
} }
} }
@ -554,6 +550,55 @@ func checkTimeout(ctx context.Context, cancel context.CancelFunc, timer *time.Ti
} }
} }
}() }()
for {
var pins ipfsPinsResp
if err := dec.Decode(&pins); err == io.EOF {
done = true
break
} else if err != nil {
done = true
return err
}
//logger.Infof("Progress: %d", pins.Progress)
if pins.Progress > progress {
progress = pins.Progress
val.Store(true)
}
}
return nil
}
func checkTimeout(ctx context.Context, cancel context.CancelFunc, timer *time.Timer, timeout time.Duration, reset chan int) {
var done bool
for {
select {
case <-reset:
{
if timer.Stop() {
timer.Reset(timeout)
}
}
case <-timer.C:
{
done = true
cancel()
break
}
case <-ctx.Done():
{
done = true
break
}
}
if done {
break
}
}
} }
// postCtx makes a POST request against // postCtx makes a POST request against