From 0151f5e312ddcc50763e7a2155e7cd4d2bd9a606 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Wed, 8 Aug 2018 21:11:26 +0200 Subject: [PATCH] Fixes for adding: set default timeouts to 0. Improve flags and param names. License: MIT Signed-off-by: Hector Sanjuan --- api/add.go | 25 +++++-- api/rest/client/client.go | 11 +-- api/rest/client/methods.go | 2 +- api/rest/config.go | 4 +- api/rest/restapi.go | 19 +++-- cluster.go | 4 +- config_test.go | 8 +- ipfs-cluster-ctl/formatters.go | 10 +-- ipfs-cluster-ctl/main.go | 130 ++++++++++++++++++++++----------- ipfsconn/ipfshttp/config.go | 4 +- ipfsconn/ipfshttp/ipfshttp.go | 1 + rpc_api.go | 16 +++- test/rpc_api_mock.go | 2 +- 13 files changed, 152 insertions(+), 84 deletions(-) diff --git a/api/add.go b/api/add.go index 0eb9cbce..b58e31fe 100644 --- a/api/add.go +++ b/api/add.go @@ -30,6 +30,7 @@ type AddParams struct { Chunker string RawLeaves bool Hidden bool + Wrap bool Shard bool } @@ -40,6 +41,7 @@ func DefaultAddParams() *AddParams { Chunker: "size-262144", RawLeaves: false, Hidden: false, + Wrap: false, Shard: false, PinOptions: PinOptions{ ReplicationFactorMin: 0, @@ -93,7 +95,7 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) { name := query.Get("name") params.Name = name - err := parseBoolParam(query, "raw", ¶ms.RawLeaves) + err := parseBoolParam(query, "raw-leaves", ¶ms.RawLeaves) if err != nil { return nil, err } @@ -101,20 +103,24 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) { if err != nil { return nil, err } + err = parseBoolParam(query, "wrap-with-directory", ¶ms.Wrap) + if err != nil { + return nil, err + } err = parseBoolParam(query, "shard", ¶ms.Shard) if err != nil { return nil, err } - err = parseIntParam(query, "repl_min", ¶ms.ReplicationFactorMin) + err = parseIntParam(query, "replication-min", ¶ms.ReplicationFactorMin) if err != nil { return nil, err } - err = parseIntParam(query, "repl_max", ¶ms.ReplicationFactorMax) + err = parseIntParam(query, "replication-max", ¶ms.ReplicationFactorMax) if err != nil { return nil, err } - if v := query.Get("shard_size"); v != "" { + if v := query.Get("shard-size"); v != "" { shardSize, err := strconv.ParseUint(v, 10, 64) if err != nil { return nil, errors.New("parameter shard_size is invalid") @@ -127,9 +133,10 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) { // ToQueryString returns a url query string (key=value&key2=value2&...) func (p *AddParams) ToQueryString() string { - fmtStr := "repl_min=%d&repl_max=%d&name=%s&" - fmtStr += "shard=%t&shard_size=%d&" - fmtStr += "layout=%s&chunker=%s&raw=%t&hidden=%t" + fmtStr := "replication-min=%d&replication-max=%d&name=%s&" + fmtStr += "shard=%t&shard-size=%d&" + fmtStr += "layout=%s&chunker=%s&raw-leaves=%t&hidden=%t&" + fmtStr += "wrap-with-directory=%t" query := fmt.Sprintf( fmtStr, p.ReplicationFactorMin, @@ -141,6 +148,7 @@ func (p *AddParams) ToQueryString() string { p.Chunker, p.RawLeaves, p.Hidden, + p.Wrap, ) return query } @@ -155,5 +163,6 @@ func (p *AddParams) Equals(p2 *AddParams) bool { p.Layout == p2.Layout && p.Chunker == p2.Chunker && p.RawLeaves == p2.RawLeaves && - p.Hidden == p2.Hidden + p.Hidden == p2.Hidden && + p.Wrap == p2.Wrap } diff --git a/api/rest/client/client.go b/api/rest/client/client.go index f22a4c3e..9e839b39 100644 --- a/api/rest/client/client.go +++ b/api/rest/client/client.go @@ -17,10 +17,11 @@ import ( // Configuration defaults var ( - DefaultTimeout = 120 * time.Second + DefaultTimeout = 0 DefaultAPIAddr = "/ip4/127.0.0.1/tcp/9094" DefaultLogLevel = "info" DefaultProxyPort = 9095 + ResolveTimeout = 30 * time.Second ) var loggingFacility = "apiclient" @@ -96,10 +97,6 @@ func NewClient(cfg *Config) (*Client, error) { config: cfg, } - if client.config.Timeout == 0 { - client.config.Timeout = DefaultTimeout - } - err := client.setupHTTPClient() if err != nil { return nil, err @@ -165,7 +162,7 @@ func (c *Client) setupHostname() error { // Taken care of in setupHTTPClient case c.config.APIAddr != nil: // Resolve multiaddress just in case and extract host:port - resolveCtx, cancel := context.WithTimeout(c.ctx, c.config.Timeout) + resolveCtx, cancel := context.WithTimeout(c.ctx, ResolveTimeout) defer cancel() resolved, err := madns.Resolve(resolveCtx, c.config.APIAddr) if err != nil { @@ -209,7 +206,7 @@ func (c *Client) setupProxy() error { return errors.New("cannot find proxy address") } - ctx, cancel := context.WithTimeout(c.ctx, c.config.Timeout) + ctx, cancel := context.WithTimeout(c.ctx, ResolveTimeout) defer cancel() resolved, err := madns.Resolve(ctx, paddr) if err != nil { diff --git a/api/rest/client/methods.go b/api/rest/client/methods.go index 97b52725..6f2fd28e 100644 --- a/api/rest/client/methods.go +++ b/api/rest/client/methods.go @@ -64,7 +64,7 @@ func (c *Client) Pin(ci *cid.Cid, replicationFactorMin, replicationFactorMax int err := c.do( "POST", fmt.Sprintf( - "/pins/%s?replication_factor_min=%d&replication_factor_max=%d&name=%s", + "/pins/%s?replication-min=%d&replication-max=%d&name=%s", ci.String(), replicationFactorMin, replicationFactorMax, diff --git a/api/rest/config.go b/api/rest/config.go index 20a16a46..5902d1e6 100644 --- a/api/rest/config.go +++ b/api/rest/config.go @@ -21,9 +21,9 @@ const configKey = "restapi" // These are the default values for Config const ( DefaultHTTPListenAddr = "/ip4/127.0.0.1/tcp/9094" - DefaultReadTimeout = 30 * time.Second + DefaultReadTimeout = 0 DefaultReadHeaderTimeout = 5 * time.Second - DefaultWriteTimeout = 60 * time.Second + DefaultWriteTimeout = 0 DefaultIdleTimeout = 120 * time.Second ) diff --git a/api/rest/restapi.go b/api/rest/restapi.go index ea83dc90..9542162a 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -539,8 +539,8 @@ func (api *API) addHandler(w http.ResponseWriter, r *http.Request) { } }() - add := adder.New(api.ctx, dags, params, output) - c, err := add.FromMultipart(reader) + add := adder.New(dags, params, output) + c, err := add.FromMultipart(api.ctx, reader) _ = c wg.Wait() @@ -838,9 +838,18 @@ func parseCidOrError(w http.ResponseWriter, r *http.Request) types.PinSerial { name := queryValues.Get("name") pin.Name = name pin.MaxDepth = -1 // For now, all pins are recursive - rplStr := queryValues.Get("replication_factor") - rplStrMin := queryValues.Get("replication_factor_min") - rplStrMax := queryValues.Get("replication_factor_max") + rplStr := queryValues.Get("replication") + if rplStr == "" { // compat <= 0.4.0 + rplStr = queryValues.Get("replication_factor") + } + rplStrMin := queryValues.Get("replication-min") + if rplStrMin == "" { // compat <= 0.4.0 + rplStrMin = queryValues.Get("replication_factor_min") + } + rplStrMax := queryValues.Get("replication-max") + if rplStrMax == "" { // compat <= 0.4.0 + rplStrMax = queryValues.Get("replication_factor_max") + } if rplStr != "" { // override rplStrMin = rplStr rplStrMax = rplStr diff --git a/cluster.go b/cluster.go index cfd2e2aa..5c90ab2d 100644 --- a/cluster.go +++ b/cluster.go @@ -1119,8 +1119,8 @@ func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (*cid } else { dags = local.New(c.rpcClient, params.PinOptions) } - add := adder.New(c.ctx, dags, params, nil) - return add.FromMultipart(reader) + add := adder.New(dags, params, nil) + return add.FromMultipart(c.ctx, reader) } // Version returns the current IPFS Cluster version. diff --git a/config_test.go b/config_test.go index e80e6609..0d96528f 100644 --- a/config_test.go +++ b/config_test.go @@ -46,9 +46,9 @@ var testingRaftCfg = []byte(`{ var testingAPICfg = []byte(`{ "http_listen_multiaddress": "/ip4/127.0.0.1/tcp/10002", - "read_timeout": "30s", + "read_timeout": "0", "read_header_timeout": "5s", - "write_timeout": "1m0s", + "write_timeout": "0", "idle_timeout": "2m0s" }`) @@ -56,9 +56,9 @@ var testingIpfsCfg = []byte(`{ "proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/10001", "node_multiaddress": "/ip4/127.0.0.1/tcp/5001", "connect_swarms_delay": "7s", - "proxy_read_timeout": "10m0s", + "proxy_read_timeout": "0", "proxy_read_header_timeout": "10m0s", - "proxy_write_timeout": "10m0s", + "proxy_write_timeout": "0", "proxy_idle_timeout": "1m0s", "pin_method": "pin", "pin_timeout": "30s", diff --git a/ipfs-cluster-ctl/formatters.go b/ipfs-cluster-ctl/formatters.go index c3d4a45a..5c0c3f42 100644 --- a/ipfs-cluster-ctl/formatters.go +++ b/ipfs-cluster-ctl/formatters.go @@ -210,13 +210,11 @@ func textFormatPrintPin(obj *api.PinSerial) { } func textFormatPrintAddedOutput(obj *api.AddedOutput) { - if obj.Hash != "" { - if obj.Quiet { - fmt.Printf("%s\n", obj.Hash) - } else { - fmt.Printf("adding %s %s\n", obj.Hash, obj.Name) - } + if obj.Error.Message != "" { + fmt.Println(obj.Error.Error()) + return } + fmt.Printf("added %s %s\n", obj.Hash, obj.Name) } func textFormatPrintError(obj *api.Error) { diff --git a/ipfs-cluster-ctl/main.go b/ipfs-cluster-ctl/main.go index cfeb546d..38650c8a 100644 --- a/ipfs-cluster-ctl/main.go +++ b/ipfs-cluster-ctl/main.go @@ -8,6 +8,7 @@ import ( "io" "os" "strings" + "sync" "time" "github.com/ipfs/ipfs-cluster/api" @@ -29,7 +30,7 @@ const Version = "0.4.0" var ( defaultHost = "/ip4/127.0.0.1/tcp/9094" - defaultTimeout = 120 + defaultTimeout = 0 defaultUsername = "" defaultPassword = "" defaultWaitCheckFreq = time.Second @@ -244,23 +245,25 @@ cluster peers. Usage: "Add a file or directory to ipfs and pin it in the cluster", ArgsUsage: "", Description: ` -Adds allows to add and replicate content to several ipfs daemons, performing +Add allows to add and replicate content to several ipfs daemons, performing a Cluster Pin operation on success. Cluster Add is equivalent to "ipfs add" in terms of DAG building, and supports the same options for adjusting the chunker, the DAG layout etc. It will, -however, send send the content directly to the destination ipfs daemons -to which it is allocated. This may not be the local daemon (depends on the -allocator). Once the adding process is finished, the content has been fully +allocate the cluster pin first and then send the content directly to the +allocated peers. + +This may not be the local daemon (depends on the allocator). Once the + adding process is finished, the content has been fully added to all allocations and pinned in them. This makes cluster add slower -than a local ipfs add. +than a local ipfs add, but the result is a fully replicated CID on completion. Cluster Add supports handling huge files and sharding the resulting DAG among -several ipfs daemons (--shard). In this case, a single ipfs daemon will not +several ipfs daemons (--shard). In this case, a single ipfs daemon will not contain the full dag, but only parts of it (shards). Desired shard size can be provided with the --shard-size flag. -We recommend setting a --name for sharded pins. Otherwise, it will be +We recommend setting a --name for sharded pins. Otherwise, it will be automatically generated. `, Flags: []cli.Flag{ @@ -268,6 +271,36 @@ automatically generated. Name: "recursive, r", Usage: "Add directory paths recursively", }, + cli.BoolFlag{ + Name: "quiet, q", + Usage: "Write only hashes to output (one per line)", + }, + cli.BoolFlag{ + Name: "quieter, Q", + Usage: "Write only final hash to output", + }, + cli.StringFlag{ + Name: "layout", + Value: defaultAddParams.Layout, + Usage: "Dag layout to use for dag generation: balanced or trickle", + }, + cli.BoolFlag{ + Name: "wrap-with-directory, w", + Usage: "Wrap files with a directory object", + }, + cli.BoolFlag{ + Name: "hidden, H", + Usage: "Include files that are hidden. Only takes effect on recursive add", + }, + cli.StringFlag{ + Name: "chunker, s", + Usage: "'size-' or 'rabin---'", + Value: defaultAddParams.Chunker, + }, + cli.BoolFlag{ + Name: "raw-leaves", + Usage: "Use raw blocks for leaves (experimental)", + }, cli.StringFlag{ Name: "name, n", Value: defaultAddParams.Name, @@ -283,41 +316,22 @@ automatically generated. Value: defaultAddParams.ReplicationFactorMax, Usage: "Sets the maximum replication factor for pinning this file", }, - cli.BoolFlag{ - Name: "shard", - Usage: "Break the file into pieces (shards) and distributed among peers", - }, - cli.Uint64Flag{ - Name: "shard-size", - Value: defaultAddParams.ShardSize, - Usage: "Sets the maximum replication factor for pinning this file", - }, + // TODO: Uncomment when sharding is supported. // cli.BoolFlag{ - // Name: "only-hashes", - // Usage: "Write newline separated list of hashes to output", + // Name: "shard", + // Usage: "Break the file into pieces (shards) and distributed among peers", // }, - cli.StringFlag{ - Name: "layout, L", - Value: defaultAddParams.Layout, - Usage: "Dag layout to use for dag generation: balanced or trickle", - }, - cli.StringFlag{ - Name: "chunker, s", - Usage: "Chunker selection. Fixed block size: 'size-', or rabin chunker: 'rabin---'", - Value: defaultAddParams.Chunker, - }, - cli.BoolFlag{ - Name: "raw-leaves", - Usage: "Use raw blocks for leaves (experimental)", - }, + // cli.Uint64Flag{ + // Name: "shard-size", + // Value: defaultAddParams.ShardSize, + // Usage: "Sets the maximum replication factor for pinning this file", + // }, + // TODO: Figure progress over total bar. // cli.BoolFlag{ - // Name: "progress, p", - // Usage: "Stream progress data", + // Name: "progress, p", + // Usage: "Stream progress data", // }, - cli.BoolFlag{ - Name: "hidden, H", - Usage: "Include files that are hidden. Only takes effect on recursive add", - }, + }, Action: func(c *cli.Context) error { shard := c.Bool("shard") @@ -347,17 +361,45 @@ automatically generated. p.ReplicationFactorMin = c.Int("replication-min") p.ReplicationFactorMax = c.Int("replication-max") p.Name = name - p.Shard = shard - p.ShardSize = c.Uint64("shard-size") + //p.Shard = shard + //p.ShardSize = c.Uint64("shard-size") + p.Shard = false p.Layout = c.String("layout") p.Chunker = c.String("chunker") p.RawLeaves = c.Bool("raw-leaves") p.Hidden = c.Bool("hidden") + p.Wrap = c.Bool("wrap-with-directory") || len(paths) > 1 out := make(chan *api.AddedOutput, 1) + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() + var last string for v := range out { - formatResponse(c, v, nil) + // Print everything when doing json + if c.GlobalString("encoding") != "text" { + formatResponse(c, *v, nil) + continue + } + + // Print last hash only + if c.Bool("quieter") { + last = v.Hash + continue + } + + // Print hashes only + if c.Bool("quiet") { + fmt.Println(v.Hash) + continue + } + + // Format normal text representation of AddedOutput + formatResponse(c, *v, nil) + } + if last != "" { + fmt.Println(last) } }() @@ -366,9 +408,9 @@ automatically generated. p, out, ) - + wg.Wait() formatResponse(c, nil, cerr) - return nil + return cerr }, }, { diff --git a/ipfsconn/ipfshttp/config.go b/ipfsconn/ipfshttp/config.go index 37fca0ff..ce050b74 100644 --- a/ipfsconn/ipfshttp/config.go +++ b/ipfsconn/ipfshttp/config.go @@ -18,9 +18,9 @@ const ( DefaultProxyAddr = "/ip4/127.0.0.1/tcp/9095" DefaultNodeAddr = "/ip4/127.0.0.1/tcp/5001" DefaultConnectSwarmsDelay = 30 * time.Second - DefaultProxyReadTimeout = 10 * time.Minute + DefaultProxyReadTimeout = 0 DefaultProxyReadHeaderTimeout = 5 * time.Second - DefaultProxyWriteTimeout = 10 * time.Minute + DefaultProxyWriteTimeout = 0 DefaultProxyIdleTimeout = 60 * time.Second DefaultPinMethod = "refs" DefaultIPFSRequestTimeout = 5 * time.Minute diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 42a2ee6a..ab8eb8d9 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -991,6 +991,7 @@ func (ipfs *Connector) SwarmPeers() (api.SwarmPeers, error) { // BlockPut triggers an ipfs block put on the given data, inserting the block // into the ipfs daemon's repo. func (ipfs *Connector) BlockPut(b api.NodeWithMeta) error { + logger.Debugf("putting block to IPFS: %s", b.Cid) ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) defer cancel() defer ipfs.updateInformerMetric() diff --git a/rpc_api.go b/rpc_api.go index 8e86a86e..937b2c26 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -192,13 +192,25 @@ func (rpcapi *RPCAPI) RecoverLocal(ctx context.Context, in api.PinSerial, out *a return err } -// Allocate returns allocations for the given Pin. -func (rpcapi *RPCAPI) Allocate(ctx context.Context, in api.PinSerial, out *[]string) error { +// Allocate returns allocations for blocks. This is used in the adders. +// It's different from pin allocations when ReplicationFactor < 0. +func (rpcapi *RPCAPI) BlockAllocate(ctx context.Context, in api.PinSerial, out *[]string) error { pin := in.ToPin() err := rpcapi.c.setupPin(&pin) if err != nil { return err } + + // Return the current peer list. + if pin.ReplicationFactorMin < 0 { + peers, err := rpcapi.c.consensus.Peers() + if err != nil { + return err + } + *out = api.PeersToStrings(peers) + return nil + } + allocs, err := rpcapi.c.allocate( pin.Cid, pin.ReplicationFactorMin, diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index b8587da0..56468ff6 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -238,7 +238,7 @@ func (mock *mockService) RecoverLocal(ctx context.Context, in api.PinSerial, out return mock.TrackerRecover(ctx, in, out) } -func (mock *mockService) Allocate(ctx context.Context, in api.PinSerial, out *[]string) error { +func (mock *mockService) BlockAllocate(ctx context.Context, in api.PinSerial, out *[]string) error { if in.ReplicationFactorMin > 1 { return errors.New("replMin too high: can only mock-allocate to 1") }