diff --git a/.codeclimate.yml b/.codeclimate.yml
index 869d5a30..fc7d4a16 100644
--- a/.codeclimate.yml
+++ b/.codeclimate.yml
@@ -8,7 +8,7 @@ checks:
       threshold: 500
   method-complexity:
     config:
-      threshold: 12
+      threshold: 15
   method-lines:
     config:
       threshold: 80
@@ -16,7 +16,10 @@ checks:
     enabled: false
   return-statements:
     config:
-      threshold: 10
+      threshold: 10
+  argument-count:
+    config:
+      threshold: 6
 
 engines:
   fixme:
diff --git a/adder/importer.go b/adder/importer.go
index 9fe8ec87..185cba57 100644
--- a/adder/importer.go
+++ b/adder/importer.go
@@ -74,6 +74,46 @@ func (imp *Importer) start() bool {
 	return !retVal
 }
 
+func (imp *Importer) addFile(ipfsAdder *ipfsadd.Adder) error {
+	f, err := imp.files.NextFile()
+	if err != nil {
+		return err
+	}
+
+	logger.Debugf("ipfsAdder AddFile(%s)", f.FullPath())
+	return ipfsAdder.AddFile(f)
+}
+
+func (imp *Importer) addFiles(ctx context.Context, ipfsAdder *ipfsadd.Adder) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	defer close(imp.output)
+	defer close(imp.blocks)
+	defer close(imp.errors)
+
+	for {
+		select {
+		case <-ctx.Done():
+			imp.errors <- ctx.Err()
+			return
+		default:
+			err := imp.addFile(ipfsAdder)
+			if err != nil {
+				if err == io.EOF {
+					goto FINALIZE
+				}
+				imp.errors <- err
+				return
+			}
+		}
+	}
+FINALIZE:
+	_, err := ipfsAdder.Finalize()
+	if err != nil {
+		imp.errors <- err
+	}
+}
+
 // Go starts a goroutine which reads the blocks as outputted by the
 // ipfsadd module called with the parameters of this importer. The blocks,
 // errors and output are placed in the respective importer channels for
@@ -98,41 +138,7 @@ func (imp *Importer) Go(ctx context.Context) error {
 	ipfsAdder.Chunker = imp.params.Chunker
 	ipfsAdder.Out = imp.output
 
-	go func() {
-		ctx, cancel := context.WithCancel(ctx)
-		defer cancel()
-		defer close(imp.output)
-		defer close(imp.blocks)
-		defer close(imp.errors)
-
-		for {
-			select {
-			case <-ctx.Done():
-				imp.errors <- ctx.Err()
-				return
-			default:
-				f, err := imp.files.NextFile()
-				if err != nil {
-					if err == io.EOF {
-						goto FINALIZE // time to finalize
-					}
-					imp.errors <- err
-					return
-				}
-
-				logger.Debugf("ipfsAdder AddFile(%s)", f.FullPath())
-				if err := ipfsAdder.AddFile(f); err != nil {
-					imp.errors <- err
-					return
-				}
-			}
-		}
-	FINALIZE:
-		_, err := ipfsAdder.Finalize()
-		if err != nil {
-			imp.errors <- err
-		}
-	}()
+	go imp.addFiles(ctx, ipfsAdder)
 
 	return nil
 }
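The `addFiles` extraction above keeps the original behavior: one goroutine owns the output channels, closes them on exit, and checks the context between files. A minimal, self-contained sketch of that pattern, not part of the patch (`produce` and `next` are hypothetical names):

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// produce mirrors the shape of Importer.addFiles: it owns its
// channels, closes them when done, and stops early if the context
// is cancelled or the source fails.
func produce(ctx context.Context, next func() (int, error), out chan<- int, errs chan<- error) {
	defer close(out)
	defer close(errs)

	for {
		select {
		case <-ctx.Done():
			errs <- ctx.Err()
			return
		default:
			v, err := next()
			if err == io.EOF {
				return // source exhausted: clean shutdown
			}
			if err != nil {
				errs <- err
				return
			}
			out <- v
		}
	}
}

func main() {
	items, i := []int{1, 2, 3}, 0
	next := func() (int, error) {
		if i == len(items) {
			return 0, io.EOF
		}
		i++
		return items[i-1], nil
	}

	out := make(chan int)
	errs := make(chan error, 1)
	go produce(context.Background(), next, out, errs)

	for v := range out {
		fmt.Println(v)
	}
	if err := <-errs; err != nil {
		fmt.Println("error:", err)
	}
}
```

Because the producer closes both channels, the consumer can simply range over `out` and then drain `errs`, exactly as `Importer.Go`'s callers do with the importer channels.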
diff --git a/adder/params.go b/adder/params.go
index 98de298c..ad428822 100644
--- a/adder/params.go
+++ b/adder/params.go
@@ -2,6 +2,7 @@ package adder
 
 import (
 	"errors"
+	"fmt"
 	"net/url"
 	"strconv"
 
@@ -40,6 +41,28 @@ func DefaultParams() *Params {
 	}
 }
 
+func parseBoolParam(q url.Values, name string, dest *bool) error {
+	if v := q.Get(name); v != "" {
+		b, err := strconv.ParseBool(v)
+		if err != nil {
+			return fmt.Errorf("parameter %s invalid", name)
+		}
+		*dest = b
+	}
+	return nil
+}
+
+func parseIntParam(q url.Values, name string, dest *int) error {
+	if v := q.Get(name); v != "" {
+		i, err := strconv.Atoi(v)
+		if err != nil {
+			return fmt.Errorf("parameter %s invalid", name)
+		}
+		*dest = i
+	}
+	return nil
+}
+
 // ParamsFromQuery parses the Params object from
 // a URL.Query().
 func ParamsFromQuery(query url.Values) (*Params, error) {
@@ -61,44 +84,25 @@ func ParamsFromQuery(query url.Values) (*Params, error) {
 	name := query.Get("name")
 	params.Name = name
 
-	if v := query.Get("raw"); v != "" {
-		raw, err := strconv.ParseBool(v)
-		if err != nil {
-			return nil, errors.New("parameter raw invalid")
-		}
-		params.RawLeaves = raw
+	err := parseBoolParam(query, "raw", &params.RawLeaves)
+	if err != nil {
+		return nil, err
 	}
-
-	if v := query.Get("hidden"); v != "" {
-		hidden, err := strconv.ParseBool(v)
-		if err != nil {
-			return nil, errors.New("parameter hidden invalid")
-		}
-		params.Hidden = hidden
+	err = parseBoolParam(query, "hidden", &params.Hidden)
+	if err != nil {
+		return nil, err
 	}
-
-	if v := query.Get("shard"); v != "" {
-		shard, err := strconv.ParseBool(v)
-		if err != nil {
-			return nil, errors.New("parameter shard invalid")
-		}
-		params.Shard = shard
+	err = parseBoolParam(query, "shard", &params.Shard)
+	if err != nil {
+		return nil, err
 	}
-
-	if v := query.Get("repl_min"); v != "" {
-		replMin, err := strconv.Atoi(v)
-		if err != nil || replMin < -1 {
-			return nil, errors.New("parameter repl_min invalid")
-		}
-		params.ReplicationFactorMin = replMin
+	err = parseIntParam(query, "repl_min", &params.ReplicationFactorMin)
+	if err != nil {
+		return nil, err
 	}
-
-	if v := query.Get("repl_max"); v != "" {
-		replMax, err := strconv.Atoi(v)
-		if err != nil || replMax < -1 {
-			return nil, errors.New("parameter repl_max invalid")
-		}
-		params.ReplicationFactorMax = replMax
+	err = parseIntParam(query, "repl_max", &params.ReplicationFactorMax)
+	if err != nil {
+		return nil, err
 	}
 
 	if v := query.Get("shard_size"); v != "" {
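One behavioral difference worth noting: the old `repl_min`/`repl_max` branches also rejected values below -1 (`err != nil || replMin < -1`), while `parseIntParam` only checks that the string parses as an integer, presumably deferring range validation to `isReplicationFactorValid` at pin time (see the cluster.go changes below). A standalone sketch of how the helper behaves, with an illustrative copy of `parseIntParam`:

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// Illustrative copy of the patch's parseIntParam: a missing key
// leaves *dest untouched; a present but malformed value errors.
func parseIntParam(q url.Values, name string, dest *int) error {
	if v := q.Get(name); v != "" {
		i, err := strconv.Atoi(v)
		if err != nil {
			return fmt.Errorf("parameter %s invalid", name)
		}
		*dest = i
	}
	return nil
}

func main() {
	q, _ := url.ParseQuery("repl_min=2&repl_max=oops")

	min, max := -1, -1 // defaults survive absent keys
	fmt.Println(parseIntParam(q, "repl_min", &min), min) // <nil> 2
	fmt.Println(parseIntParam(q, "repl_max", &max), max) // parameter repl_max invalid -1
	fmt.Println(parseIntParam(q, "missing", &max), max)  // <nil> -1
}
```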
diff --git a/adder/sharding/cluster_dag_builder.go b/adder/sharding/cluster_dag_builder.go
index de48dda7..ee8772c4 100644
--- a/adder/sharding/cluster_dag_builder.go
+++ b/adder/sharding/cluster_dag_builder.go
@@ -188,32 +188,39 @@ func (cdb *clusterDAGBuilder) finalize() error {
 	return nil
 }
 
+// handleBlock returns the value for cont in ingestBlocks(): false
+// stops the loop.
+func (cdb *clusterDAGBuilder) handleBlock(n *api.NodeWithMeta, more bool) bool {
+	if !more {
+		err := cdb.finalize()
+		if err != nil {
+			logger.Error(err)
+			cdb.error = err
+		}
+		return false
+	}
+	err := cdb.ingestBlock(n)
+	if err != nil {
+		logger.Error(err)
+		cdb.error = err
+		return false
+	}
+	return true
+}
+
 func (cdb *clusterDAGBuilder) ingestBlocks() {
 	// if this function returns, it means we are Done().
 	// we auto-cancel ourselves in that case.
 	// if it was due to an error, it will be in Err().
 	defer cdb.Cancel()
 
-	for {
+	cont := true
+
+	for cont {
 		select {
 		case <-cdb.ctx.Done():
 			// cancelled from outside
 			return
 		case n, ok := <-cdb.blocks:
-			if !ok {
-				err := cdb.finalize()
-				if err != nil {
-					logger.Error(err)
-					cdb.error = err
-				}
-				return // will cancel on defer
-			}
-			err := cdb.ingestBlock(n)
-			if err != nil {
-				logger.Error(err)
-				cdb.error = err
-				return // will cancel on defer
-			}
-			// continue with next block
+			cont = cdb.handleBlock(n, ok)
 		}
 	}
 }
diff --git a/adder/sharding/dag.go b/adder/sharding/dag.go
index 6553853e..b7fb1163 100644
--- a/adder/sharding/dag.go
+++ b/adder/sharding/dag.go
@@ -65,6 +65,17 @@ func CborDataToNode(raw []byte, format string) (ipld.Node, error) {
 	return shardNode, nil
 }
 
+func makeDAGSimple(dagObj map[string]*cid.Cid) (ipld.Node, error) {
+	node, err := cbor.WrapObject(
+		dagObj,
+		hashFn, mh.DefaultLengths[hashFn],
+	)
+	if err != nil {
+		return nil, err
+	}
+	return node, err
+}
+
 // makeDAG parses a shardObj which stores all of the node-links a shardDAG
 // is responsible for tracking. In general a single node of links may exceed
 // the capacity of an ipfs block. In this case an indirect node in the
@@ -75,14 +86,8 @@ func CborDataToNode(raw []byte, format string) (ipld.Node, error) {
 func makeDAG(dagObj map[string]*cid.Cid) ([]ipld.Node, error) {
 	// No indirect node
 	if len(dagObj) <= MaxLinks {
-		node, err := cbor.WrapObject(
-			dagObj,
-			hashFn, mh.DefaultLengths[hashFn],
-		)
-		if err != nil {
-			return nil, err
-		}
-		return []ipld.Node{node}, err
+		n, err := makeDAGSimple(dagObj)
+		return []ipld.Node{n}, err
 	}
 	// Indirect node required
 	leafNodes := make([]ipld.Node, 0) // shardNodes with links to data
@@ -100,16 +105,14 @@ func makeDAG(dagObj map[string]*cid.Cid) ([]ipld.Node, error) {
 			}
 			leafObj[fmt.Sprintf("%d", j)] = c
 		}
-		leafNode, err := cbor.WrapObject(leafObj, hashFn,
-			mh.DefaultLengths[hashFn])
+		leafNode, err := makeDAGSimple(leafObj)
 		if err != nil {
 			return nil, err
 		}
 		indirectObj[fmt.Sprintf("%d", i)] = leafNode.Cid()
 		leafNodes = append(leafNodes, leafNode)
 	}
-	indirectNode, err := cbor.WrapObject(indirectObj, hashFn,
-		mh.DefaultLengths[hashFn])
+	indirectNode, err := makeDAGSimple(indirectObj)
 	if err != nil {
 		return nil, err
 	}
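`handleBlock` folds the two exit paths (channel drained, ingest error) into a single boolean, so `ingestBlocks` keeps one `select` with no early returns and the deferred `Cancel` still runs either way. A small self-contained sketch of the same loop shape (`consume` and `handle` are hypothetical names):

```go
package main

import (
	"context"
	"fmt"
)

// consume mirrors ingestBlocks: the per-message logic lives in a
// helper whose return value decides whether the loop continues.
func consume(ctx context.Context, ch <-chan int, handle func(int, bool) bool) {
	cont := true
	for cont {
		select {
		case <-ctx.Done():
			return
		case v, ok := <-ch:
			cont = handle(v, ok)
		}
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch)

	consume(context.Background(), ch, func(v int, ok bool) bool {
		if !ok {
			fmt.Println("drained; finalizing")
			return false
		}
		fmt.Println("got", v)
		return true
	})
}
```

Two minor observations on the dag.go change: `makeDAGSimple`'s final `return node, err` is only reached when `err` is nil, so `return node, nil` would say the same more plainly; and in `makeDAG`'s simple path, `return []ipld.Node{n}, err` hands back a slice containing a nil node on failure, so callers must check the error before touching the slice.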
diff --git a/adder/sharding/verify.go b/adder/sharding/verify.go
index 83fd5546..257a508b 100644
--- a/adder/sharding/verify.go
+++ b/adder/sharding/verify.go
@@ -22,7 +22,8 @@ type MockBlockStore interface {
 }
 
 // VerifyShards checks that a sharded CID has been correctly formed and stored.
-// This is a helper function for testing.
+// This is a helper function for testing. It returns a map with all the blocks
+// from all shards.
 func VerifyShards(t *testing.T, rootCid *cid.Cid, pins MockPinStore, ipfs MockBlockStore, expectedShards int) (map[string]struct{}, error) {
 	metaPin, err := pins.PinGet(rootCid)
 	if err != nil {
diff --git a/api/rest/client/request.go b/api/rest/client/request.go
index 96cf6820..b7957210 100644
--- a/api/rest/client/request.go
+++ b/api/rest/client/request.go
@@ -40,7 +40,7 @@ func (c *Client) doRequest(method, path string, body io.Reader) (*http.Response,
 // eventually we may want to trigger streaming with a boolean flag in
 // a single doRequest function to prevent code duplication (same for do)
 func (c *Client) doStreamRequest(method, path string, body io.Reader, headers map[string]string) (*http.Response, error) {
-	urlpath := c.net + "://" + c.hostname + "/" + strings.TrimPrefix(path, "/")
+	urlpath := c.net + "://" + c.hostname + "/" + strings.TrimPrefix(path, "/")
 	logger.Debugf("%s: %s", method, urlpath)
 
 	r, err := http.NewRequest(method, urlpath, body)
@@ -65,7 +65,7 @@ func (c *Client) doStreamRequest(method, path string, body io.Reader, headers ma
 	return c.client.Do(r)
 }
 
-func (c *Client) doStream(method, path string, body io.Reader, headers map[string]string, obj interface{}) (error) {
+func (c *Client) doStream(method, path string, body io.Reader, headers map[string]string, obj interface{}) error {
 	resp, err := c.doStreamRequest(method, path, body, headers)
 	if err != nil {
 		return &api.Error{Code: 0, Message: err.Error()}
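The request.go changes are gofmt-style cleanups: the `urlpath` -/+ pair differs only in whitespace, and `doStream` drops the redundant parentheses around its lone `error` return type. The `urlpath` line itself is the usual scheme://host/path join; a standalone sketch with hypothetical values standing in for `c.net` and `c.hostname`:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical client fields: c.net and c.hostname.
	net, hostname := "http", "127.0.0.1:9094"

	// TrimPrefix makes the join insensitive to a leading "/" in path.
	for _, path := range []string{"/add", "add"} {
		urlpath := net + "://" + hostname + "/" + strings.TrimPrefix(path, "/")
		fmt.Println(urlpath) // http://127.0.0.1:9094/add in both cases
	}
}
```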
diff --git a/cluster.go b/cluster.go
index 6e8afc21..44532121 100644
--- a/cluster.go
+++ b/cluster.go
@@ -917,11 +917,8 @@ func (c *Cluster) Pin(pin api.Pin) error {
 	return err
 }
 
-// setupPin ensures that the Pin object is fit for pinning. We check
-// and set the replication factors and ensure that the pinType matches the
-// metadata consistently.
-func (c *Cluster) setupPin(pin *api.Pin) error {
-	// Determine repl factors
+// sets the default replication factors in a pin when they are set to 0
+func (c *Cluster) setupReplicationFactor(pin *api.Pin) error {
 	rplMin := pin.ReplicationFactorMin
 	rplMax := pin.ReplicationFactorMax
 	if rplMin == 0 {
@@ -933,22 +930,11 @@ func (c *Cluster) setupPin(pin *api.Pin) error {
 		pin.ReplicationFactorMax = rplMax
 	}
 
-	if err := isReplicationFactorValid(rplMin, rplMax); err != nil {
-		return err
-	}
-
-	// We ensure that if the given pin exists already, it is not of
-	// different type (i.e. sharding and already locally pinned item)
-	var existing *api.Pin
-	cState, err := c.consensus.State()
-	if err == nil && pin.Cid != nil && cState.Has(pin.Cid) {
-		pinTmp := cState.Get(pin.Cid)
-		existing = &pinTmp
-		if existing.Type != pin.Type {
-			return errors.New("cannot repin CID with different tracking method, clear state with pin rm to proceed")
-		}
-	}
+	return isReplicationFactorValid(rplMin, rplMax)
+}
 
+// basic checks on the pin to verify it is well-formed for its type
+func checkPinType(pin *api.Pin) error {
 	switch pin.Type {
 	case api.DataType:
 		if pin.Reference != nil {
@@ -958,24 +944,13 @@ func (c *Cluster) setupPin(pin *api.Pin) error {
 		if pin.MaxDepth != 1 {
 			return errors.New("must pin shards go depth 1")
 		}
-		//if pin.Reference != nil {
-		//	return errors.New("shard pin should not reference cdag")
+		// FIXME: indirect shard pins could have max-depth 2
+		// FIXME: repinning a shard type will overwrite replication
+		// factor from previous:
+		// if existing.ReplicationFactorMin != rplMin ||
+		//	existing.ReplicationFactorMax != rplMax {
+		//	return errors.New("shard update with wrong repl factors")
 		//}
-		if existing == nil {
-			return nil
-		}
-
-		// State already tracks pin's CID
-		// For now all repins of the same shard must use the same
-		// replmax and replmin. It is unclear what the best UX is here
-		// especially if the same Shard is referenced in multiple
-		// clusterdags. This simplistic policy avoids complexity and
-		// suits existing needs for shard pins.
-		// Safest idea: use the largest min and max
-		if existing.ReplicationFactorMin != rplMin ||
-			existing.ReplicationFactorMax != rplMax {
-			return errors.New("shard update with wrong repl factors")
-		}
 	case api.ClusterDAGType:
 		if pin.MaxDepth != 0 {
 			return errors.New("must pin roots directly")
 		}
@@ -997,6 +972,30 @@ func (c *Cluster) setupPin(pin *api.Pin) error {
 	return nil
 }
 
+// setupPin ensures that the Pin object is fit for pinning. We check
+// and set the replication factors and ensure that the pinType matches the
+// metadata consistently.
+func (c *Cluster) setupPin(pin *api.Pin) error {
+	err := c.setupReplicationFactor(pin)
+	if err != nil {
+		return err
+	}
+
+	// We ensure that if the given pin exists already, it is not of
+	// different type (i.e. sharding and already locally pinned item)
+	var existing *api.Pin
+	cState, err := c.consensus.State()
+	if err == nil && pin.Cid != nil && cState.Has(pin.Cid) {
+		pinTmp := cState.Get(pin.Cid)
+		existing = &pinTmp
+		if existing.Type != pin.Type {
+			return errors.New("cannot repin CID with different tracking method, clear state with pin rm to proceed")
+		}
+	}
+
+	return checkPinType(pin)
+}
+
 // pin performs the actual pinning and supports a blacklist to be
 // able to evacuate a node and returns whether the pin was submitted
 // to the consensus layer or skipped (due to error or to the fact
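After this refactor, `setupPin` reads as a three-step pipeline: default and validate the replication factors, reject repins that change the tracking type, then run the per-type checks. A minimal sketch of the first step, assuming ipfs-cluster's convention that -1 means "pin everywhere" and 0 means "use the configured default"; the `isReplicationFactorValid` here is a hedged re-statement of the obvious invariants, and the real function in cluster.go may enforce more:

```go
package main

import (
	"errors"
	"fmt"
)

// setupReplicationFactor mirrors the defaulting step: zero values
// are replaced with the configured defaults before validation.
// defMin and defMax stand in for the cluster config fields.
func setupReplicationFactor(min, max *int, defMin, defMax int) error {
	if *min == 0 {
		*min = defMin
	}
	if *max == 0 {
		*max = defMax
	}
	return isReplicationFactorValid(*min, *max)
}

// Hypothetical validation: factors must be -1 or non-negative,
// and min must not exceed a bounded max.
func isReplicationFactorValid(min, max int) error {
	if min < -1 || max < -1 || (max != -1 && min > max) {
		return errors.New("invalid replication factors")
	}
	return nil
}

func main() {
	min, max := 0, 0
	if err := setupReplicationFactor(&min, &max, -1, -1); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(min, max) // -1 -1: replicate everywhere
}
```

Splitting the function this way also isolates the `existing`-pin lookup in `setupPin`, which is why the removed shard-repin checks that referenced `existing` now survive only as the FIXME comments in `checkPinType`.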