Error queue is full for stateless pintracker

- increase max pin queue size to 1 million
- hide max_pin_queue_size from configuration
This commit is contained in:
Kishan Mohanbhai Sagathiya 2019-09-11 12:48:34 +07:00
parent cf189799f2
commit 19cde2e8cf
4 changed files with 19 additions and 9 deletions

View File

@ -23,8 +23,6 @@ var logger = logging.Logger("pintracker")
var (
errUnpinned = errors.New("the item is unexpectedly not pinned on IPFS")
// ErrFullQueue is the error used when pin or unpin operation channel is full.
ErrFullQueue = errors.New("pin/unpin operation queue is full (too many operations), increasing max_pin_queue_size would help")
)
// MapPinTracker is a PinTracker implementation which uses a Go map
@ -182,7 +180,7 @@ func (mpt *MapPinTracker) enqueue(ctx context.Context, c *api.Pin, typ optracker
select {
case ch <- op:
default:
err := ErrFullQueue
err := util.ErrFullQueue
op.SetError(err)
op.Cancel()
logger.Error(err.Error())

View File

@ -14,7 +14,7 @@ const envConfigKey = "cluster_stateless"
// Default values for this Config.
const (
DefaultMaxPinQueueSize = 50000
DefaultMaxPinQueueSize = 1000000
DefaultConcurrentPins = 10
)
@ -31,7 +31,7 @@ type Config struct {
}
type jsonConfig struct {
MaxPinQueueSize int `json:"max_pin_queue_size"`
MaxPinQueueSize int `json:"max_pin_queue_size,omitempty"`
ConcurrentPins int `json:"concurrent_pins"`
}
@ -103,8 +103,12 @@ func (cfg *Config) ToJSON() ([]byte, error) {
}
func (cfg *Config) toJSONConfig() *jsonConfig {
return &jsonConfig{
MaxPinQueueSize: cfg.MaxPinQueueSize,
ConcurrentPins: cfg.ConcurrentPins,
jCfg := &jsonConfig{
ConcurrentPins: cfg.ConcurrentPins,
}
if cfg.MaxPinQueueSize != DefaultMaxPinQueueSize {
jCfg.MaxPinQueueSize = cfg.MaxPinQueueSize
}
return jCfg
}

View File

@ -11,6 +11,7 @@ import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/pintracker/optracker"
"github.com/ipfs/ipfs-cluster/pintracker/util"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
@ -176,7 +177,7 @@ func (spt *Tracker) enqueue(ctx context.Context, c *api.Pin, typ optracker.Opera
select {
case ch <- op:
default:
err := errors.New("queue is full")
err := util.ErrFullQueue
op.SetError(err)
op.Cancel()
logger.Error(err.Error())

View File

@ -1,11 +1,18 @@
package util
import (
"errors"
"github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-core/peer"
)
var (
// ErrFullQueue is the error used when pin or unpin operation channel is full.
ErrFullQueue = errors.New("pin/unpin operation queue is full (too many operations), increasing max_pin_queue_size would help")
)
// IsRemotePin determines whether a Pin's ReplicationFactor has
// been met, so as to either pin or unpin it from the peer.
func IsRemotePin(c *api.Pin, pid peer.ID) bool {