Fixes for adding: set default timeouts to 0. Improve flags and param names.

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
Hector Sanjuan 2018-08-08 21:11:26 +02:00
parent 0ed14a4b39
commit 0151f5e312
13 changed files with 152 additions and 84 deletions


@@ -30,6 +30,7 @@ type AddParams struct {
Chunker string
RawLeaves bool
Hidden bool
Wrap bool
Shard bool
}
@@ -40,6 +41,7 @@ func DefaultAddParams() *AddParams {
Chunker: "size-262144",
RawLeaves: false,
Hidden: false,
Wrap: false,
Shard: false,
PinOptions: PinOptions{
ReplicationFactorMin: 0,
@@ -93,7 +95,7 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
name := query.Get("name")
params.Name = name
err := parseBoolParam(query, "raw", &params.RawLeaves)
err := parseBoolParam(query, "raw-leaves", &params.RawLeaves)
if err != nil {
return nil, err
}
@@ -101,20 +103,24 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
if err != nil {
return nil, err
}
err = parseBoolParam(query, "wrap-with-directory", &params.Wrap)
if err != nil {
return nil, err
}
err = parseBoolParam(query, "shard", &params.Shard)
if err != nil {
return nil, err
}
err = parseIntParam(query, "repl_min", &params.ReplicationFactorMin)
err = parseIntParam(query, "replication-min", &params.ReplicationFactorMin)
if err != nil {
return nil, err
}
err = parseIntParam(query, "repl_max", &params.ReplicationFactorMax)
err = parseIntParam(query, "replication-max", &params.ReplicationFactorMax)
if err != nil {
return nil, err
}
if v := query.Get("shard_size"); v != "" {
if v := query.Get("shard-size"); v != "" {
shardSize, err := strconv.ParseUint(v, 10, 64)
if err != nil {
return nil, errors.New("parameter shard_size is invalid")
@@ -127,9 +133,10 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
// ToQueryString returns a url query string (key=value&key2=value2&...)
func (p *AddParams) ToQueryString() string {
fmtStr := "repl_min=%d&repl_max=%d&name=%s&"
fmtStr += "shard=%t&shard_size=%d&"
fmtStr += "layout=%s&chunker=%s&raw=%t&hidden=%t"
fmtStr := "replication-min=%d&replication-max=%d&name=%s&"
fmtStr += "shard=%t&shard-size=%d&"
fmtStr += "layout=%s&chunker=%s&raw-leaves=%t&hidden=%t&"
fmtStr += "wrap-with-directory=%t"
query := fmt.Sprintf(
fmtStr,
p.ReplicationFactorMin,
@@ -141,6 +148,7 @@ func (p *AddParams) ToQueryString() string {
p.Chunker,
p.RawLeaves,
p.Hidden,
p.Wrap,
)
return query
}
@@ -155,5 +163,6 @@ func (p *AddParams) Equals(p2 *AddParams) bool {
p.Layout == p2.Layout &&
p.Chunker == p2.Chunker &&
p.RawLeaves == p2.RawLeaves &&
p.Hidden == p2.Hidden
p.Hidden == p2.Hidden &&
p.Wrap == p2.Wrap
}
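To illustrate the renamed add parameters above, here is a minimal sketch (not part of this commit) of building the query string that AddParamsFromQuery now expects; the values are only examples:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	q := url.Values{}
	q.Set("replication-min", "2")        // was "repl_min"
	q.Set("replication-max", "3")        // was "repl_max"
	q.Set("raw-leaves", "false")         // was "raw"
	q.Set("shard-size", "104857600")     // was "shard_size"
	q.Set("wrap-with-directory", "true") // new key backing AddParams.Wrap
	fmt.Println(q.Encode())
}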


@@ -17,10 +17,11 @@ import (
// Configuration defaults
var (
DefaultTimeout = 120 * time.Second
DefaultTimeout = 0
DefaultAPIAddr = "/ip4/127.0.0.1/tcp/9094"
DefaultLogLevel = "info"
DefaultProxyPort = 9095
ResolveTimeout = 30 * time.Second
)
var loggingFacility = "apiclient"
@@ -96,10 +97,6 @@ func NewClient(cfg *Config) (*Client, error) {
config: cfg,
}
if client.config.Timeout == 0 {
client.config.Timeout = DefaultTimeout
}
err := client.setupHTTPClient()
if err != nil {
return nil, err
@@ -165,7 +162,7 @@ func (c *Client) setupHostname() error {
// Taken care of in setupHTTPClient
case c.config.APIAddr != nil:
// Resolve multiaddress just in case and extract host:port
resolveCtx, cancel := context.WithTimeout(c.ctx, c.config.Timeout)
resolveCtx, cancel := context.WithTimeout(c.ctx, ResolveTimeout)
defer cancel()
resolved, err := madns.Resolve(resolveCtx, c.config.APIAddr)
if err != nil {
@@ -209,7 +206,7 @@ func (c *Client) setupProxy() error {
return errors.New("cannot find proxy address")
}
ctx, cancel := context.WithTimeout(c.ctx, c.config.Timeout)
ctx, cancel := context.WithTimeout(c.ctx, ResolveTimeout)
defer cancel()
resolved, err := madns.Resolve(ctx, paddr)
if err != nil {
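For reference, Go's http.Client treats a zero Timeout as "no timeout", which is why NewClient no longer overrides a zero value and only multiaddress resolution keeps a bounded deadline. A rough, self-contained sketch of that idea (not the client code itself):

package main

import (
	"context"
	"net/http"
	"time"
)

func main() {
	// Zero means "no timeout" in net/http, so long-running adds are not cut off.
	client := &http.Client{Timeout: 0}

	// DNS/multiaddress resolution still gets its own bounded context.
	resolveCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	_ = client
	_ = resolveCtx
}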


@@ -64,7 +64,7 @@ func (c *Client) Pin(ci *cid.Cid, replicationFactorMin, replicationFactorMax int
err := c.do(
"POST",
fmt.Sprintf(
"/pins/%s?replication_factor_min=%d&replication_factor_max=%d&name=%s",
"/pins/%s?replication-min=%d&replication-max=%d&name=%s",
ci.String(),
replicationFactorMin,
replicationFactorMax,


@@ -21,9 +21,9 @@ const configKey = "restapi"
// These are the default values for Config
const (
DefaultHTTPListenAddr = "/ip4/127.0.0.1/tcp/9094"
DefaultReadTimeout = 30 * time.Second
DefaultReadTimeout = 0
DefaultReadHeaderTimeout = 5 * time.Second
DefaultWriteTimeout = 60 * time.Second
DefaultWriteTimeout = 0
DefaultIdleTimeout = 120 * time.Second
)
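The same reasoning applies server-side: net/http treats a zero ReadTimeout or WriteTimeout as "no limit", so the new defaults simply disable those timeouts while keeping the header and idle limits. An illustrative sketch (not the restapi code):

package main

import (
	"net/http"
	"time"
)

func main() {
	srv := &http.Server{
		Addr:              "127.0.0.1:9094",
		ReadTimeout:       0,               // 0 disables the read timeout
		ReadHeaderTimeout: 5 * time.Second, // headers must still arrive promptly
		WriteTimeout:      0,               // 0 disables the write timeout
		IdleTimeout:       120 * time.Second,
	}
	_ = srv
}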


@@ -539,8 +539,8 @@ func (api *API) addHandler(w http.ResponseWriter, r *http.Request) {
}
}()
add := adder.New(api.ctx, dags, params, output)
c, err := add.FromMultipart(reader)
add := adder.New(dags, params, output)
c, err := add.FromMultipart(api.ctx, reader)
_ = c
wg.Wait()
@@ -838,9 +838,18 @@ func parseCidOrError(w http.ResponseWriter, r *http.Request) types.PinSerial {
name := queryValues.Get("name")
pin.Name = name
pin.MaxDepth = -1 // For now, all pins are recursive
rplStr := queryValues.Get("replication_factor")
rplStrMin := queryValues.Get("replication_factor_min")
rplStrMax := queryValues.Get("replication_factor_max")
rplStr := queryValues.Get("replication")
if rplStr == "" { // compat <= 0.4.0
rplStr = queryValues.Get("replication_factor")
}
rplStrMin := queryValues.Get("replication-min")
if rplStrMin == "" { // compat <= 0.4.0
rplStrMin = queryValues.Get("replication_factor_min")
}
rplStrMax := queryValues.Get("replication-max")
if rplStrMax == "" { // compat <= 0.4.0
rplStrMax = queryValues.Get("replication_factor_max")
}
if rplStr != "" { // override
rplStrMin = rplStr
rplStrMax = rplStr
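The compatibility fallbacks above could be expressed as a small helper; the following is a hypothetical sketch, not code from the commit:

package main

import (
	"fmt"
	"net/url"
)

// getWithCompat prefers the new hyphenated key and falls back to the
// <= 0.4.0 spelling, mirroring the lookups in parseCidOrError.
func getWithCompat(q url.Values, key, legacyKey string) string {
	if v := q.Get(key); v != "" {
		return v
	}
	return q.Get(legacyKey)
}

func main() {
	q, _ := url.ParseQuery("replication_factor_min=2")
	fmt.Println(getWithCompat(q, "replication-min", "replication_factor_min")) // prints 2
}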


@@ -1119,8 +1119,8 @@ func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (*cid
} else {
dags = local.New(c.rpcClient, params.PinOptions)
}
add := adder.New(c.ctx, dags, params, nil)
return add.FromMultipart(reader)
add := adder.New(dags, params, nil)
return add.FromMultipart(c.ctx, reader)
}
// Version returns the current IPFS Cluster version.


@@ -46,9 +46,9 @@ var testingRaftCfg = []byte(`{
var testingAPICfg = []byte(`{
"http_listen_multiaddress": "/ip4/127.0.0.1/tcp/10002",
"read_timeout": "30s",
"read_timeout": "0",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"write_timeout": "0",
"idle_timeout": "2m0s"
}`)
@@ -56,9 +56,9 @@ var testingIpfsCfg = []byte(`{
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/10001",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_timeout": "0",
"proxy_read_header_timeout": "10m0s",
"proxy_write_timeout": "10m0s",
"proxy_write_timeout": "0",
"proxy_idle_timeout": "1m0s",
"pin_method": "pin",
"pin_timeout": "30s",


@@ -210,13 +210,11 @@ func textFormatPrintPin(obj *api.PinSerial) {
}
func textFormatPrintAddedOutput(obj *api.AddedOutput) {
if obj.Hash != "" {
if obj.Quiet {
fmt.Printf("%s\n", obj.Hash)
} else {
fmt.Printf("adding %s %s\n", obj.Hash, obj.Name)
}
if obj.Error.Message != "" {
fmt.Println(obj.Error.Error())
return
}
fmt.Printf("added %s %s\n", obj.Hash, obj.Name)
}
func textFormatPrintError(obj *api.Error) {


@@ -8,6 +8,7 @@ import (
"io"
"os"
"strings"
"sync"
"time"
"github.com/ipfs/ipfs-cluster/api"
@@ -29,7 +30,7 @@ const Version = "0.4.0"
var (
defaultHost = "/ip4/127.0.0.1/tcp/9094"
defaultTimeout = 120
defaultTimeout = 0
defaultUsername = ""
defaultPassword = ""
defaultWaitCheckFreq = time.Second
@@ -244,23 +245,25 @@ cluster peers.
Usage: "Add a file or directory to ipfs and pin it in the cluster",
ArgsUsage: "<path>",
Description: `
Adds allows to add and replicate content to several ipfs daemons, performing
Add allows to add and replicate content to several ipfs daemons, performing
a Cluster Pin operation on success.
Cluster Add is equivalent to "ipfs add" in terms of DAG building, and supports
the same options for adjusting the chunker, the DAG layout etc. It will,
however, send send the content directly to the destination ipfs daemons
to which it is allocated. This may not be the local daemon (depends on the
allocator). Once the adding process is finished, the content has been fully
allocate the cluster pin first and then send the content directly to the
allocated peers.
This may not be the local daemon (depends on the allocator). Once the
adding process is finished, the content has been fully
added to all allocations and pinned in them. This makes cluster add slower
than a local ipfs add.
than a local ipfs add, but the result is a fully replicated CID on completion.
Cluster Add supports handling huge files and sharding the resulting DAG among
several ipfs daemons (--shard). In this case, a single ipfs daemon will not
contain the full dag, but only parts of it (shards). Desired shard size can
be provided with the --shard-size flag.
We recommend setting a --name for sharded pins. Otherwise, it will be
automatically generated.
`,
Flags: []cli.Flag{
@@ -268,6 +271,36 @@ automatically generated.
Name: "recursive, r",
Usage: "Add directory paths recursively",
},
cli.BoolFlag{
Name: "quiet, q",
Usage: "Write only hashes to output (one per line)",
},
cli.BoolFlag{
Name: "quieter, Q",
Usage: "Write only final hash to output",
},
cli.StringFlag{
Name: "layout",
Value: defaultAddParams.Layout,
Usage: "Dag layout to use for dag generation: balanced or trickle",
},
cli.BoolFlag{
Name: "wrap-with-directory, w",
Usage: "Wrap files with a directory object",
},
cli.BoolFlag{
Name: "hidden, H",
Usage: "Include files that are hidden. Only takes effect on recursive add",
},
cli.StringFlag{
Name: "chunker, s",
Usage: "'size-<size>' or 'rabin-<min>-<avg>-<max>'",
Value: defaultAddParams.Chunker,
},
cli.BoolFlag{
Name: "raw-leaves",
Usage: "Use raw blocks for leaves (experimental)",
},
cli.StringFlag{
Name: "name, n",
Value: defaultAddParams.Name,
@@ -283,41 +316,22 @@ automatically generated.
Value: defaultAddParams.ReplicationFactorMax,
Usage: "Sets the maximum replication factor for pinning this file",
},
cli.BoolFlag{
Name: "shard",
Usage: "Break the file into pieces (shards) and distributed among peers",
},
cli.Uint64Flag{
Name: "shard-size",
Value: defaultAddParams.ShardSize,
Usage: "Sets the maximum replication factor for pinning this file",
},
// TODO: Uncomment when sharding is supported.
// cli.BoolFlag{
// Name: "only-hashes",
// Usage: "Write newline separated list of hashes to output",
// Name: "shard",
// Usage: "Break the file into pieces (shards) and distributed among peers",
// },
cli.StringFlag{
Name: "layout, L",
Value: defaultAddParams.Layout,
Usage: "Dag layout to use for dag generation: balanced or trickle",
},
cli.StringFlag{
Name: "chunker, s",
Usage: "Chunker selection. Fixed block size: 'size-<size>', or rabin chunker: 'rabin-<min>-<avg>-<max>'",
Value: defaultAddParams.Chunker,
},
cli.BoolFlag{
Name: "raw-leaves",
Usage: "Use raw blocks for leaves (experimental)",
},
// cli.Uint64Flag{
// Name: "shard-size",
// Value: defaultAddParams.ShardSize,
// Usage: "Sets the maximum replication factor for pinning this file",
// },
// TODO: Figure progress over total bar.
// cli.BoolFlag{
// Name: "progress, p",
// Usage: "Stream progress data",
// },
cli.BoolFlag{
Name: "hidden, H",
Usage: "Include files that are hidden. Only takes effect on recursive add",
},
},
Action: func(c *cli.Context) error {
shard := c.Bool("shard")
@@ -347,17 +361,45 @@ automatically generated.
p.ReplicationFactorMin = c.Int("replication-min")
p.ReplicationFactorMax = c.Int("replication-max")
p.Name = name
p.Shard = shard
p.ShardSize = c.Uint64("shard-size")
//p.Shard = shard
//p.ShardSize = c.Uint64("shard-size")
p.Shard = false
p.Layout = c.String("layout")
p.Chunker = c.String("chunker")
p.RawLeaves = c.Bool("raw-leaves")
p.Hidden = c.Bool("hidden")
p.Wrap = c.Bool("wrap-with-directory") || len(paths) > 1
out := make(chan *api.AddedOutput, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
var last string
for v := range out {
formatResponse(c, v, nil)
// Print everything when doing json
if c.GlobalString("encoding") != "text" {
formatResponse(c, *v, nil)
continue
}
// Print last hash only
if c.Bool("quieter") {
last = v.Hash
continue
}
// Print hashes only
if c.Bool("quiet") {
fmt.Println(v.Hash)
continue
}
// Format normal text representation of AddedOutput
formatResponse(c, *v, nil)
}
if last != "" {
fmt.Println(last)
}
}()
@@ -366,9 +408,9 @@ automatically generated.
p,
out,
)
wg.Wait()
formatResponse(c, nil, cerr)
return nil
return cerr
},
},
{
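Assuming the flag names above match what the Action reads, a typical invocation after this change might look like:

    ipfs-cluster-ctl add -r --name my-site --replication-min 2 --replication-max 3 ./public

With --quiet only the added hashes are printed (one per line), while --quieter prints only the final root hash once the add finishes.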


@@ -18,9 +18,9 @@ const (
DefaultProxyAddr = "/ip4/127.0.0.1/tcp/9095"
DefaultNodeAddr = "/ip4/127.0.0.1/tcp/5001"
DefaultConnectSwarmsDelay = 30 * time.Second
DefaultProxyReadTimeout = 10 * time.Minute
DefaultProxyReadTimeout = 0
DefaultProxyReadHeaderTimeout = 5 * time.Second
DefaultProxyWriteTimeout = 10 * time.Minute
DefaultProxyWriteTimeout = 0
DefaultProxyIdleTimeout = 60 * time.Second
DefaultPinMethod = "refs"
DefaultIPFSRequestTimeout = 5 * time.Minute


@@ -991,6 +991,7 @@ func (ipfs *Connector) SwarmPeers() (api.SwarmPeers, error) {
// BlockPut triggers an ipfs block put on the given data, inserting the block
// into the ipfs daemon's repo.
func (ipfs *Connector) BlockPut(b api.NodeWithMeta) error {
logger.Debugf("putting block to IPFS: %s", b.Cid)
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
defer ipfs.updateInformerMetric()


@@ -192,13 +192,25 @@ func (rpcapi *RPCAPI) RecoverLocal(ctx context.Context, in api.PinSerial, out *a
return err
}
// Allocate returns allocations for the given Pin.
func (rpcapi *RPCAPI) Allocate(ctx context.Context, in api.PinSerial, out *[]string) error {
// Allocate returns allocations for blocks. This is used in the adders.
// It's different from pin allocations when ReplicationFactor < 0.
func (rpcapi *RPCAPI) BlockAllocate(ctx context.Context, in api.PinSerial, out *[]string) error {
pin := in.ToPin()
err := rpcapi.c.setupPin(&pin)
if err != nil {
return err
}
// Return the current peer list.
if pin.ReplicationFactorMin < 0 {
peers, err := rpcapi.c.consensus.Peers()
if err != nil {
return err
}
*out = api.PeersToStrings(peers)
return nil
}
allocs, err := rpcapi.c.allocate(
pin.Cid,
pin.ReplicationFactorMin,


@@ -238,7 +238,7 @@ func (mock *mockService) RecoverLocal(ctx context.Context, in api.PinSerial, out
return mock.TrackerRecover(ctx, in, out)
}
func (mock *mockService) Allocate(ctx context.Context, in api.PinSerial, out *[]string) error {
func (mock *mockService) BlockAllocate(ctx context.Context, in api.PinSerial, out *[]string) error {
if in.ReplicationFactorMin > 1 {
return errors.New("replMin too high: can only mock-allocate to 1")
}