Refactor add & general cleanup

addFile function is now a Cluster method accessed by RPC
residue from attempting to stream responses removed
ipfs-cluster-ctl ls bug fixed
problem with importer/add not printing resolved
new test now checks for this

License: MIT
Signed-off-by: Wyatt Daviau <wdaviau@cs.stanford.edu>
This commit is contained in:
Wyatt Daviau 2018-04-20 11:41:15 -04:00 committed by Hector Sanjuan
parent 1c8c4604c1
commit 1704295331
11 changed files with 353 additions and 125 deletions

227
add.go Normal file
View File

@ -0,0 +1,227 @@
package ipfscluster
import (
"context"
"errors"
"mime/multipart"
"net/url"
"strconv"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/importer"
"github.com/ipfs/go-ipfs-cmdkit/files"
)
func (c *Cluster) consumeLocalAdd(
args map[string]string,
outObj *api.NodeWithMeta,
replMin, replMax int,
) error {
//TODO: when ipfs add starts supporting formats other than
// v0 (v1.cbor, v1.protobuf) we'll need to update this
outObj.Format = ""
args["cid"] = outObj.Cid // root node stored on last call
var hash string
err := c.rpcClient.Call(
"",
"Cluster",
"IPFSBlockPut",
*outObj,
&hash)
if outObj.Cid != hash {
logger.Warningf("mismatch. node cid: %s\nrpc cid: %s", outObj.Cid, hash)
}
return err
}
func (c *Cluster) finishLocalAdd(
args map[string]string,
replMin, replMax int,
) error {
rootCid, ok := args["cid"]
if !ok {
return errors.New("no record of root to pin")
}
pinS := api.PinSerial{
Cid: rootCid,
Type: api.DataType,
Recursive: true,
ReplicationFactorMin: replMin,
ReplicationFactorMax: replMax,
}
return c.rpcClient.Call(
"",
"Cluster",
"Pin",
pinS,
&struct{}{},
)
}
func (c *Cluster) consumeShardAdd(
args map[string]string,
outObj *api.NodeWithMeta,
replMin, replMax int,
) error {
var shardID string
shardID, ok := args["id"]
outObj.ID = shardID
outObj.ReplMax = replMax
outObj.ReplMin = replMin
var retStr string
err := c.rpcClient.Call(
"",
"Cluster",
"SharderAddNode",
*outObj,
&retStr)
if !ok {
args["id"] = retStr
}
return err
}
func (c *Cluster) finishShardAdd(
args map[string]string,
replMin, replMax int,
) error {
shardID, ok := args["id"]
if !ok {
return errors.New("bad state: shardID passed incorrectly")
}
err := c.rpcClient.Call(
"",
"Cluster",
"SharderFinalize",
shardID,
&struct{}{},
)
return err
}
func (c *Cluster) consumeImport(ctx context.Context,
outChan <-chan *api.NodeWithMeta,
printChan <-chan *api.AddedOutput,
errChan <-chan error,
consume func(map[string]string, *api.NodeWithMeta, int, int) error,
finish func(map[string]string, int, int) error,
replMin int, replMax int,
) ([]api.AddedOutput, error) {
var err error
openChs := 3
toPrint := make([]api.AddedOutput, 0)
args := make(map[string]string)
for {
if openChs == 0 {
break
}
// Consume signals from importer. Errors resulting from
select {
// Ensure we terminate reading from importer after cancellation
// but do not block
case <-ctx.Done():
err = errors.New("context timeout terminated add")
return nil, err
// Terminate session when importer throws an error
case err, ok := <-errChan:
if !ok {
openChs--
errChan = nil
continue
}
return nil, err
// Send status information to client for user output
case printObj, ok := <-printChan:
//TODO: if we support progress bar we must update this
if !ok {
openChs--
printChan = nil
continue
}
toPrint = append(toPrint, *printObj)
// Consume ipld node output by importer
case outObj, ok := <-outChan:
if !ok {
openChs--
outChan = nil
continue
}
if err := consume(args, outObj, replMin, replMax); err != nil {
return nil, err
}
}
}
if err := finish(args, replMin, replMax); err != nil {
return nil, err
}
logger.Debugf("succeeding sharding import")
return toPrint, nil
}
func (c *Cluster) AddFile(
reader *multipart.Reader,
params url.Values,
) ([]api.AddedOutput, error) {
layout := params.Get("layout")
trickle := false
if layout == "trickle" {
trickle = true
}
chunker := params.Get("chunker")
raw, _ := strconv.ParseBool(params.Get("raw"))
wrap, _ := strconv.ParseBool(params.Get("wrap"))
progress, _ := strconv.ParseBool(params.Get("progress"))
hidden, _ := strconv.ParseBool(params.Get("hidden"))
silent, _ := strconv.ParseBool(params.Get("silent"))
f := &files.MultipartFile{
Mediatype: "multipart/form-data",
Reader: reader,
}
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
printChan, outChan, errChan := importer.ToChannel(
ctx,
f,
progress,
hidden,
trickle,
raw,
silent,
wrap,
chunker,
)
shard := params.Get("shard")
replMin, _ := strconv.Atoi(params.Get("repl_min"))
replMax, _ := strconv.Atoi(params.Get("repl_max"))
var consume func(map[string]string, *api.NodeWithMeta, int, int) error
var finish func(map[string]string, int, int) error
if shard == "true" {
consume = c.consumeShardAdd
finish = c.finishShardAdd
} else {
consume = c.consumeLocalAdd
finish = c.finishLocalAdd
}
return c.consumeImport(
ctx,
outChan,
printChan,
errChan,
consume,
finish,
replMin,
replMax,
)
}

View File

@ -326,7 +326,9 @@ func (c *Client) AddMultiFile(
fmtStr3 := "repl_min=%d&repl_max=%d"
url := fmt.Sprintf(fmtStr1+fmtStr2+fmtStr3, shard, quiet, silent, layout, chunker,
raw, wrap, progress, hidden, replMin, replMax)
return c.doStream("POST", url, multiFileR, headers, nil)
output := make([]api.AddedOutput, 0)
err := c.doStream("POST", url, multiFileR, headers, &output)
return output, err
}
// Eventually an Add(io.Reader) method for adding raw readers as a multifile should be here.

View File

@ -2,8 +2,6 @@ package client
import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
@ -69,12 +67,12 @@ func (c *Client) doStreamRequest(method, path string, body io.Reader, headers ma
return c.client.Do(r)
}
func (c *Client) doStream(method, path string, body io.Reader, headers map[string]string, obj interface{}) ([]api.AddedOutput, error) {
func (c *Client) doStream(method, path string, body io.Reader, headers map[string]string, obj interface{}) (error) {
resp, err := c.doStreamRequest(method, path, body, headers)
if err != nil {
return nil, &api.Error{Code: 0, Message: err.Error()}
return &api.Error{Code: 0, Message: err.Error()}
}
return c.handleStreamResponse(resp, obj)
return c.handleResponse(resp, obj)
}
func (c *Client) handleResponse(resp *http.Response, obj interface{}) error {
@ -113,30 +111,3 @@ func (c *Client) handleResponse(resp *http.Response, obj interface{}) error {
}
return nil
}
func (c *Client) handleStreamResponse(resp *http.Response, obj interface{}) ([]api.AddedOutput, error) {
// Read body until a termination signal (status code != 0)
outputs := make([]api.AddedOutput, 0)
dec := json.NewDecoder(resp.Body)
for {
var output api.AddedOutput
err := dec.Decode(&output)
outputs = append(outputs, output)
if err != nil {
logger.Debugf("error on decode")
return outputs, err
}
if output.Code == 1 {
return outputs, errors.New(output.Message)
} else if output.Code == 2 {
// All done
logger.Debugf("add output transfer complete")
return outputs, nil
} else if output.Code == 0 {
// TODO more mature handling of printing: e.g. in another function / combine with existing printers
continue
} else {
return outputs, fmt.Errorf("unexpected error code: %d", output.Code)
}
}
}

View File

@ -22,14 +22,12 @@ import (
"time"
types "github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/importer"
mux "github.com/gorilla/mux"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
gostream "github.com/hsanjuan/go-libp2p-gostream"
p2phttp "github.com/hsanjuan/go-libp2p-http"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-cmdkit/files"
logging "github.com/ipfs/go-log"
libp2p "github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
@ -666,87 +664,35 @@ func (api *API) consumeImport(ctx context.Context,
return enc.Encode(types.Error{Code: 2, Message: "success"})
}
// Get a random string of length n. Used to generate sharding id
func randStringRunes(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
func (api *API) addFileHandler(w http.ResponseWriter, r *http.Request) {
contentType := r.Header.Get("Content-Type")
mediatype, _, _ := mime.ParseMediaType(contentType)
var f files.File
if mediatype == "multipart/form-data" {
reader, err := r.MultipartReader()
if err != nil {
sendErrorResponse(w, 400, err.Error())
return
}
f = &files.MultipartFile{
Mediatype: mediatype,
Reader: reader,
}
} else {
if mediatype != "multipart/form-data" {
sendErrorResponse(w, 415, "unsupported media type")
return
}
ctx, cancel := context.WithCancel(api.ctx)
defer cancel()
reader, err := r.MultipartReader()
if err != nil {
sendErrorResponse(w, 400, err.Error())
return
}
queryValues := r.URL.Query()
layout := queryValues.Get("layout")
trickle := false
if layout == "trickle" {
trickle = true
fI := types.FileInfo{
Reader: reader,
Params: queryValues,
}
chunker := queryValues.Get("chunker")
raw, _ := strconv.ParseBool(queryValues.Get("raw"))
wrap, _ := strconv.ParseBool(queryValues.Get("wrap"))
progress, _ := strconv.ParseBool(queryValues.Get("progress"))
hidden, _ := strconv.ParseBool(queryValues.Get("hidden"))
silent, _ := strconv.ParseBool(queryValues.Get("silent")) // just print root hash
printChan, outChan, errChan := importer.ToChannel(ctx, f, progress,
hidden, trickle, raw, silent, wrap, chunker)
shard := queryValues.Get("shard")
// quiet := queryValues.Get("quiet") // just print hashes, no meta data
replMin, _ := strconv.Atoi(queryValues.Get("repl_min"))
replMax, _ := strconv.Atoi(queryValues.Get("repl_max"))
if shard == "true" {
if err := api.consumeImport(
ctx,
outChan,
printChan,
errChan,
w,
api.consumeShardAdd,
api.finishShardAdd,
replMin,
replMax,
); err != nil {
panic(err)
}
} else {
if err := api.consumeImport(
ctx,
outChan,
printChan,
errChan,
w,
api.consumeLocalAdd,
api.finishLocalAdd,
replMin,
replMax,
); err != nil {
panic(err)
}
var toPrint []types.AddedOutput
err = api.rpcClient.Call("",
"Cluster",
"AddFile",
fI,
&toPrint)
if err != nil {
sendErrorResponse(w, 500, err.Error())
}
sendJSONResponse(w, 200, toPrint)
}
func (api *API) peerListHandler(w http.ResponseWriter, r *http.Request) {
@ -845,13 +791,14 @@ func (api *API) allocationsHandler(w http.ResponseWriter, r *http.Request) {
struct{}{},
&pins,
)
for i, pinS := range pins {
if api.filterOutPin(filter, pinS.ToPin()) {
// remove this pin from output
pins = append(pins[:i], pins[i+1:]...)
outPins := make([]types.PinSerial, 0)
for _, pinS := range pins {
if !api.filterOutPin(filter, pinS.ToPin()) {
// add this pin to output
outPins = append(outPins, pinS)
}
}
sendResponse(w, err, pins)
sendResponse(w, err, outPins)
}
func (api *API) allocationHandler(w http.ResponseWriter, r *http.Request) {

View File

@ -10,6 +10,7 @@ package api
import (
"fmt"
"mime/multipart"
"regexp"
"sort"
"strings"
@ -773,6 +774,13 @@ func (pins PinSerial) ToPin() Pin {
}
}
// FileInfo carries a reader pointing to a file and the parameters specified
// for adding the file to ipfs-cluster
type FileInfo struct {
Reader *multipart.Reader
Params map[string][]string
}
// AddedOutput carries information for displaying the standard ipfs output
// indicating a node of a file has been added.
type AddedOutput struct {

View File

@ -958,9 +958,6 @@ func (c *Cluster) validatePin(pin api.Pin, rplMin, rplMax int) error {
if pin.Recursive {
return errors.New("must pin roots directly")
}
if pin.Clusterdag == nil {
return errors.New("roots must reference a dag")
}
if pin.Parents.Len() > 1 {
return errors.New("cdag nodes are referenced once")
}
@ -968,6 +965,10 @@ func (c *Cluster) validatePin(pin api.Pin, rplMin, rplMax int) error {
if len(pin.Allocations) != 0 {
return errors.New("meta pin should not specify allocations")
}
if pin.Clusterdag == nil {
return errors.New("roots must reference a dag")
}
default:
return errors.New("unrecognized pin type")
}

View File

@ -199,7 +199,7 @@ func (adder *Adder) Finalize() (ipld.Node, error) {
return nil, err
}
}
fmt.Printf("Output Dirs reached \n")
err = adder.outputDirs(name, root)
if err != nil {
return nil, err
@ -226,6 +226,14 @@ func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
for _, name := range names {
child, err := fsn.Child(name)
if err != nil {
// It is ok if adder can't fetch block to make
// an fsn file. Outgoing DAGservice does not
// store blocks. We recognize it as a file and
// keep traversing the directory
if shouldIgnore(err) {
continue
}
return err
}
@ -241,7 +249,6 @@ func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
if err != nil {
return err
}
return outputDagnode(adder.Out, path, nd)
default:
return fmt.Errorf("unrecognized fsn type: %#v", fsn)

View File

@ -10,6 +10,13 @@ import (
"github.com/ipfs/go-ipfs-cmdkit/files"
)
func shouldIgnore(err error) bool {
if strings.Contains(err.Error(), "dagservice: block not found") {
return true
}
return false
}
// ToChannel imports file to ipfs ipld nodes, outputting nodes on the
// provided channel
func ToChannel(ctx context.Context, f files.File, progress bool, hidden bool,
@ -67,7 +74,7 @@ func ToChannel(ctx context.Context, f files.File, progress bool, hidden bool,
}
_, err := fileAdder.Finalize()
if err != nil && !strings.Contains(err.Error(), "dagservice: block not found") {
if err != nil && !shouldIgnore(err) {
errChan <- err
}
}()

View File

@ -32,9 +32,6 @@ var cids = [18]string{"QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn",
"QmUwG2mfhhfBEzVtvyEvva1kc8kU4CrXphdMCdAiFNWdxy",
"QmRgkP4x7tXW9PyiPPxe3aqALQEs22nifkwzrm7wickdfr",
"QmNpCHs9zrzP4aArBzRQgjNSbMC5hYqJa1ksmbyorSu44b",
"QmQbg4sHm4zVHnqCS14YzNnQFVMqjZpu5XjkF7vtbzqkFW",
"QmcUBApwNSDg2Q2J4NXzks1HewVGFohNpPyEud7bZfo5tE",
"QmaKaB735eydQwnaJNuYbXRU1gVb4MJdzHp1rpUZJN67G6",
"QmQ6n82dRtEJVHMDLW5BSrJ6bRDXkFXj2i7Vjxy4B2PH82",
"QmPZLJ3CZYgxH4K1w5jdbAdxJynXn5TCB4kHy7u8uHC3fy",
"QmUqmcdJJodX7mPoUv9u71HoNAiegBCiCqdUZd5ZrCyLbs",
@ -43,10 +40,15 @@ var cids = [18]string{"QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn",
"QmaytnNarHzDp1ipGyC7zd7Hw2AePmtSpLLaygQ2e9Yvqe",
"QmZwab9h6ADw3tv8pzXVF2yndgJpTWKrrMjJQqRkgYmCRH",
"QmaNfMZDZjfqjHrFCc6tZwmkqbXs1fnY9AXZ81WUztFeXm",
"QmY4qt6WG12qYwWeeTNhggqaNMJWp2NouuTSf79ukoobw8"}
// Hashes that are not printed out (chunks of files and wrapper dir)
"QmY4qt6WG12qYwWeeTNhggqaNMJWp2NouuTSf79ukoobw8",
"QmQbg4sHm4zVHnqCS14YzNnQFVMqjZpu5XjkF7vtbzqkFW",
"QmcUBApwNSDg2Q2J4NXzks1HewVGFohNpPyEud7bZfo5tE",
"QmaKaB735eydQwnaJNuYbXRU1gVb4MJdzHp1rpUZJN67G6",
}
// import and receive all blocks
func TestToChannel(t *testing.T) {
func TestToChannelOutput(t *testing.T) {
file, err := getTestingDir()
if err != nil {
t.Fatal(err)
@ -95,6 +97,55 @@ func TestToChannel(t *testing.T) {
}
}
func TestToChannelPrint(t *testing.T) {
file, err := getTestingDir()
if err != nil {
t.Fatal(err)
}
printChan, outChan, errChan := ToChannel(context.Background(), file,
false, false, false, false, false, false, "")
go func() { // listen on outChan so progress can be made
for {
_, ok := <-outChan
if !ok {
// channel closed, safe to exit
return
}
}
}()
go func() { // listen for errors
for {
err, ok := <-errChan
if !ok {
// channel closed, safe to exit
return
}
t.Fatal(err)
}
}()
check := make(map[string]struct{})
for obj := range printChan {
cid := obj.Hash
if _, ok := check[cid]; ok {
t.Fatalf("Duplicate cid %s", cid)
}
check[cid] = struct{}{}
}
if len(check) != len(cids[:14]) {
t.Fatalf("Witnessed cids: %v\nExpected cids: %v", check, cids)
}
cidsS := cids[:]
for cid := range check {
if !contains(cidsS, cid) {
t.Fatalf("Unexpected cid: %s", cid)
}
}
}
func contains(slice []string, s string) bool {
for _, a := range slice {
if a == s {

View File

@ -173,11 +173,11 @@ func textFormatPrintPin(obj *api.PinSerial) {
fmt.Printf("%s | %s | ", obj.Cid, obj.Name)
if obj.ReplicationFactorMin < 0 {
fmt.Printf("Repl. Factor: -1 | Allocations: [everywhere]\n")
fmt.Printf("Repl. Factor: -1 | Allocations: [everywhere]")
} else {
var sortAlloc sort.StringSlice = obj.Allocations
sortAlloc.Sort()
fmt.Printf("Repl. Factor: %d--%d | Allocations: %s\n",
fmt.Printf("Repl. Factor: %d--%d | Allocations: %s",
obj.ReplicationFactorMin, obj.ReplicationFactorMax,
sortAlloc)
}
@ -196,13 +196,13 @@ func textFormatPrintPin(obj *api.PinSerial) {
case api.DataType:
infoStr = typeStr
case api.MetaType:
infoStr = fmt.Sprintf("%s-- clusterDAG=%s",typeStr, obj.Clusterdag)
infoStr = fmt.Sprintf("%s-- clusterdag=%s",typeStr, obj.Clusterdag)
case api.CdagType, api.ShardType:
infoStr = typeStr
default:
infoStr = ""
}
fmt.Printf("| %s ", infoStr)
fmt.Printf("%s \n", infoStr)
}
func textFormatPrintAddedOutput(obj *api.AddedOutput) {

View File

@ -53,6 +53,13 @@ func (rpcapi *RPCAPI) Pins(ctx context.Context, in struct{}, out *[]api.PinSeria
return nil
}
// AddFile runs Cluster.AddFile
func (rpcapi *RPCAPI) AddFile(ctx context.Context, in api.FileInfo, out *[]api.AddedOutput) error {
toPrint, err := rpcapi.c.AddFile(in.Reader, in.Params)
*out = toPrint
return err
}
// PinGet runs Cluster.PinGet().
func (rpcapi *RPCAPI) PinGet(ctx context.Context, in api.PinSerial, out *api.PinSerial) error {
cidarg := in.ToPin()