addressing feedback

License: MIT
Signed-off-by: Wyatt Daviau <wdaviau@cs.stanford.edu>
This commit is contained in:
Wyatt Daviau 2018-04-26 19:40:32 -04:00 committed by Hector Sanjuan
parent f7c3dcce5b
commit 62eb45279b
4 changed files with 25 additions and 45 deletions

View File

@ -30,33 +30,29 @@ func NewAddSession(rpcClient *rpc.Client, logger logging.EventLogger) *AddSessio
}
func (a *AddSession) consumeLocalAdd(
args map[string]string,
arg string,
outObj *api.NodeWithMeta,
replMin, replMax int,
) error {
) (string, error) {
//TODO: when ipfs add starts supporting formats other than
// v0 (v1.cbor, v1.protobuf) we'll need to update this
outObj.Format = ""
args["cid"] = outObj.Cid // root node stored on last call
var hash string
err := a.rpcClient.Call(
"",
"Cluster",
"IPFSBlockPut",
*outObj,
&hash)
&hash,
)
if outObj.Cid != hash {
a.logger.Warningf("mismatch. node cid: %s\nrpc cid: %s", outObj.Cid, hash)
}
return err
return outObj.Cid, err // root node returned in case this is last call
}
func (a *AddSession) finishLocalAdd(
args map[string]string,
replMin, replMax int,
) error {
rootCid, ok := args["cid"]
if !ok {
func (a *AddSession) finishLocalAdd(rootCid string, replMin, replMax int) error {
if rootCid == "" {
return errors.New("no record of root to pin")
}
@ -77,13 +73,10 @@ func (a *AddSession) finishLocalAdd(
}
func (a *AddSession) consumeShardAdd(
args map[string]string,
shardID string,
outObj *api.NodeWithMeta,
replMin, replMax int,
) error {
var shardID string
shardID, ok := args["id"]
) (string, error) {
outObj.ID = shardID
outObj.ReplMax = replMax
outObj.ReplMin = replMin
@ -93,43 +86,40 @@ func (a *AddSession) consumeShardAdd(
"Cluster",
"SharderAddNode",
*outObj,
&retStr)
if !ok {
args["id"] = retStr
}
return err
&retStr,
)
return retStr, err
}
func (a *AddSession) finishShardAdd(
args map[string]string,
shardID string,
replMin, replMax int,
) error {
shardID, ok := args["id"]
if !ok {
if shardID == "" {
return errors.New("bad state: shardID passed incorrectly")
}
err := a.rpcClient.Call(
return a.rpcClient.Call(
"",
"Cluster",
"SharderFinalize",
shardID,
&struct{}{},
)
return err
}
func (a *AddSession) consumeImport(ctx context.Context,
outChan <-chan *api.NodeWithMeta,
printChan <-chan *api.AddedOutput,
errChan <-chan error,
consume func(map[string]string, *api.NodeWithMeta, int, int) error,
finish func(map[string]string, int, int) error,
consume func(string, *api.NodeWithMeta, int, int) (string, error),
finish func(string, int, int) error,
replMin int, replMax int,
) ([]api.AddedOutput, error) {
var err error
openChs := 3
toPrint := make([]api.AddedOutput, 0)
args := make(map[string]string)
arg := ""
for {
if openChs == 0 {
@ -169,13 +159,14 @@ func (a *AddSession) consumeImport(ctx context.Context,
continue
}
if err := consume(args, outObj, replMin, replMax); err != nil {
arg, err = consume(arg, outObj, replMin, replMax)
if err != nil {
return nil, err
}
}
}
if err := finish(args, replMin, replMax); err != nil {
if err := finish(arg, replMin, replMax); err != nil {
return nil, err
}
a.logger.Debugf("succeeding sharding import")
@ -224,8 +215,8 @@ func (a *AddSession) AddFile(ctx context.Context,
replMin, _ := strconv.Atoi(params.Get("repl_min"))
replMax, _ := strconv.Atoi(params.Get("repl_max"))
var consume func(map[string]string, *api.NodeWithMeta, int, int) error
var finish func(map[string]string, int, int) error
var consume func(string, *api.NodeWithMeta, int, int) (string, error)
var finish func(string, int, int) error
if shard == "true" {
consume = a.consumeShardAdd
finish = a.finishShardAdd

View File

@ -10,7 +10,6 @@ package api
import (
"fmt"
"mime/multipart"
"regexp"
"sort"
"strings"
@ -774,13 +773,6 @@ func (pins PinSerial) ToPin() Pin {
}
}
// FileInfo carries a reader pointing to a file and the parameters specified
// for adding the file to ipfs-cluster
type FileInfo struct {
Reader *multipart.Reader
Params map[string][]string
}
// AddedOutput carries information for displaying the standard ipfs output
// indicating a node of a file has been added.
type AddedOutput struct {

View File

@ -199,7 +199,6 @@ func (adder *Adder) Finalize() (ipld.Node, error) {
return nil, err
}
}
fmt.Printf("Output Dirs reached \n")
err = adder.outputDirs(name, root)
if err != nil {
return nil, err
@ -209,7 +208,6 @@ func (adder *Adder) Finalize() (ipld.Node, error) {
if err != nil {
return nil, err
}
return root.GetNode()
}

View File

@ -3,7 +3,6 @@ package importer
import (
"context"
"io"
"strings"
"github.com/ipfs/ipfs-cluster/api"
@ -11,7 +10,7 @@ import (
)
func shouldIgnore(err error) bool {
if strings.Contains(err.Error(), "dagservice: block not found") {
if err == errNotFound {
return true
}
return false