diff --git a/add.go b/add.go
new file mode 100644
index 00000000..2aabf162
--- /dev/null
+++ b/add.go
@@ -0,0 +1,235 @@
+package ipfscluster
+
+import (
+	"context"
+	"errors"
+	"mime/multipart"
+	"net/url"
+	"strconv"
+
+	"github.com/ipfs/ipfs-cluster/api"
+	"github.com/ipfs/ipfs-cluster/importer"
+
+	"github.com/ipfs/go-ipfs-cmdkit/files"
+)
+
+// consumeLocalAdd stores an imported ipld node in the local ipfs daemon.
+func (c *Cluster) consumeLocalAdd(
+	args map[string]string,
+	outObj *api.NodeWithMeta,
+	replMin, replMax int,
+) error {
+	//TODO: when ipfs add starts supporting formats other than
+	// v0 (v1.cbor, v1.protobuf) we'll need to update this
+	outObj.Format = ""
+	args["cid"] = outObj.Cid // root node stored on last call
+	var hash string
+	err := c.rpcClient.Call(
+		"",
+		"Cluster",
+		"IPFSBlockPut",
+		*outObj,
+		&hash)
+	if outObj.Cid != hash {
+		logger.Warningf("mismatch. node cid: %s\nrpc cid: %s", outObj.Cid, hash)
+	}
+	return err
+}
+
+// finishLocalAdd pins the root cid recorded in args by consumeLocalAdd.
+func (c *Cluster) finishLocalAdd(
+	args map[string]string,
+	replMin, replMax int,
+) error {
+	rootCid, ok := args["cid"]
+	if !ok {
+		return errors.New("no record of root to pin")
+	}
+
+	pinS := api.PinSerial{
+		Cid:                  rootCid,
+		Type:                 api.DataType,
+		Recursive:            true,
+		ReplicationFactorMin: replMin,
+		ReplicationFactorMax: replMax,
+	}
+	return c.rpcClient.Call(
+		"",
+		"Cluster",
+		"Pin",
+		pinS,
+		&struct{}{},
+	)
+}
+
+// consumeShardAdd forwards an imported ipld node to the sharding component.
+func (c *Cluster) consumeShardAdd(
+	args map[string]string,
+	outObj *api.NodeWithMeta,
+	replMin, replMax int,
+) error {
+
+	// the shard session id is empty on the first call; the RPC response supplies it
+	shardID, ok := args["id"]
+	outObj.ID = shardID
+	outObj.ReplMax = replMax
+	outObj.ReplMin = replMin
+	var retStr string
+	err := c.rpcClient.Call(
+		"",
+		"Cluster",
+		"SharderAddNode",
+		*outObj,
+		&retStr)
+	if !ok {
+		args["id"] = retStr
+	}
+	return err
+}
+
+// finishShardAdd finalizes the sharding session recorded in args.
+func (c *Cluster) finishShardAdd(
+	args map[string]string,
+	replMin, replMax int,
+) error {
+	shardID, ok := args["id"]
+	if !ok {
+		return errors.New("bad state: shardID passed incorrectly")
+	}
+	err := c.rpcClient.Call(
+		"",
+		"Cluster",
+		"SharderFinalize",
+		shardID,
+		&struct{}{},
+	)
+	return err
+}
+
+// consumeImport drains the importer channels, delegating each ipld node to
+// consume, and runs finish once all three channels have closed.
+func (c *Cluster) consumeImport(ctx context.Context,
+	outChan <-chan *api.NodeWithMeta,
+	printChan <-chan *api.AddedOutput,
+	errChan <-chan error,
+	consume func(map[string]string, *api.NodeWithMeta, int, int) error,
+	finish func(map[string]string, int, int) error,
+	replMin int, replMax int,
+) ([]api.AddedOutput, error) {
+	var err error
+	openChs := 3
+	toPrint := make([]api.AddedOutput, 0)
+	args := make(map[string]string)
+
+	for {
+		if openChs == 0 {
+			break
+		}
+
+		// Consume signals from importer; an importer error ends the session.
+		select {
+		// Ensure we terminate reading from importer after cancellation
+		// but do not block
+		case <-ctx.Done():
+			err = errors.New("context timeout terminated add")
+			return nil, err
+		// Terminate session when importer throws an error
+		case err, ok := <-errChan:
+			if !ok {
+				openChs--
+				errChan = nil
+				continue
+			}
+			return nil, err
+
+		// Send status information to client for user output
+		case printObj, ok := <-printChan:
+			//TODO: if we support progress bar we must update this
+			if !ok {
+				openChs--
+				printChan = nil
+				continue
+			}
+			toPrint = append(toPrint, *printObj)
+		// Consume ipld node output by importer
+		case outObj, ok := <-outChan:
+			if !ok {
+				openChs--
+				outChan = nil
+				continue
+			}
+
+			if err := consume(args, outObj, replMin, replMax); err != nil {
+				return nil, err
+			}
+		}
+	}
+
+	if err := finish(args, replMin, replMax); err != nil {
+		return nil, err
+	}
+	logger.Debugf("finished sharding import")
+	return toPrint, nil
+}
+
+// AddFile adds the content of a multipart file to the cluster, sharded or
+// pinned whole on a single ipfs daemon according to the request parameters.
+func (c *Cluster) AddFile(
+	reader *multipart.Reader,
+	params url.Values,
+) ([]api.AddedOutput, error) {
+	layout := params.Get("layout")
+	trickle := false
+	if layout == "trickle" {
+		trickle = true
+	}
+	chunker := params.Get("chunker")
+	raw, _ := strconv.ParseBool(params.Get("raw"))
+	wrap, _ := strconv.ParseBool(params.Get("wrap"))
+	progress, _ := strconv.ParseBool(params.Get("progress"))
+	hidden, _ := strconv.ParseBool(params.Get("hidden"))
+	silent, _ := strconv.ParseBool(params.Get("silent"))
+
+	f := &files.MultipartFile{
+		Mediatype: "multipart/form-data",
+		Reader:    reader,
+	}
+	ctx, cancel := context.WithCancel(c.ctx)
+	defer cancel()
+	printChan, outChan, errChan := importer.ToChannel(
+		ctx,
+		f,
+		progress,
+		hidden,
+		trickle,
+		raw,
+		silent,
+		wrap,
+		chunker,
+	)
+
+	shard := params.Get("shard")
+	replMin, _ := strconv.Atoi(params.Get("repl_min"))
+	replMax, _ := strconv.Atoi(params.Get("repl_max"))
+
+	var consume func(map[string]string, *api.NodeWithMeta, int, int) error
+	var finish func(map[string]string, int, int) error
+	if shard == "true" {
+		consume = c.consumeShardAdd
+		finish = c.finishShardAdd
+	} else {
+		consume = c.consumeLocalAdd
+		finish = c.finishLocalAdd
+
+	}
+	return c.consumeImport(
+		ctx,
+		outChan,
+		printChan,
+		errChan,
+		consume,
+		finish,
+		replMin,
+		replMax,
+	)
+}
diff --git a/api/rest/client/methods.go b/api/rest/client/methods.go
index 20157c72..12d352c7 100644
--- a/api/rest/client/methods.go
+++ b/api/rest/client/methods.go
@@ -326,7 +326,9 @@ func (c *Client) AddMultiFile(
 	fmtStr3 := "repl_min=%d&repl_max=%d"
 	url := fmt.Sprintf(fmtStr1+fmtStr2+fmtStr3, shard, quiet,
 		silent, layout, chunker, raw, wrap, progress, hidden, replMin, replMax)
-	return c.doStream("POST", url, multiFileR, headers, nil)
+	output := make([]api.AddedOutput, 0)
+	err := c.doStream("POST", url, multiFileR, headers, &output)
+	return output, err
 }
 
 // Eventually an Add(io.Reader) method for adding raw readers as a multifile should be here.
diff --git a/api/rest/client/request.go b/api/rest/client/request.go
index 28c1b38b..d5daa608 100644
--- a/api/rest/client/request.go
+++ b/api/rest/client/request.go
@@ -2,8 +2,6 @@ package client
 
 import (
 	"encoding/json"
-	"errors"
-	"fmt"
 	"io"
 	"io/ioutil"
 	"net/http"
@@ -69,12 +67,12 @@ func (c *Client) doStreamRequest(method, path string, body io.Reader, headers ma
 	return c.client.Do(r)
 }
 
-func (c *Client) doStream(method, path string, body io.Reader, headers map[string]string, obj interface{}) ([]api.AddedOutput, error) {
+func (c *Client) doStream(method, path string, body io.Reader, headers map[string]string, obj interface{}) error {
 	resp, err := c.doStreamRequest(method, path, body, headers)
 	if err != nil {
-		return nil, &api.Error{Code: 0, Message: err.Error()}
+		return &api.Error{Code: 0, Message: err.Error()}
 	}
-	return c.handleStreamResponse(resp, obj)
+	return c.handleResponse(resp, obj)
 }
 
 func (c *Client) handleResponse(resp *http.Response, obj interface{}) error {
@@ -113,30 +111,3 @@ func (c *Client) handleResponse(resp *http.Response, obj interface{}) error {
 	}
 	return nil
 }
-
-func (c *Client) handleStreamResponse(resp *http.Response, obj interface{}) ([]api.AddedOutput, error) {
-	// Read body until a termination signal (status code != 0)
-	outputs := make([]api.AddedOutput, 0)
-	dec := json.NewDecoder(resp.Body)
-	for {
-		var output api.AddedOutput
-		err := dec.Decode(&output)
-		outputs = append(outputs, output)
-		if err != nil {
-			logger.Debugf("error on decode")
-			return outputs, err
-		}
-		if output.Code == 1 {
-			return outputs, errors.New(output.Message)
-		} else if output.Code == 2 {
-			// All done
-			logger.Debugf("add output transfer complete")
-			return outputs, nil
-		} else if output.Code == 0 {
-			// TODO more mature handling of printing: e.g. in another function / combine with existing printers
-			continue
-		} else {
-			return outputs, fmt.Errorf("unexpected error code: %d", output.Code)
-		}
-	}
-}
diff --git a/api/rest/restapi.go b/api/rest/restapi.go
index 519ca2bb..1b70eeb7 100644
--- a/api/rest/restapi.go
+++ b/api/rest/restapi.go
@@ -22,14 +22,12 @@ import (
 	"time"
 
 	types "github.com/ipfs/ipfs-cluster/api"
-	"github.com/ipfs/ipfs-cluster/importer"
 
 	mux "github.com/gorilla/mux"
 	rpc "github.com/hsanjuan/go-libp2p-gorpc"
 	gostream "github.com/hsanjuan/go-libp2p-gostream"
 	p2phttp "github.com/hsanjuan/go-libp2p-http"
 	cid "github.com/ipfs/go-cid"
-	"github.com/ipfs/go-ipfs-cmdkit/files"
 	logging "github.com/ipfs/go-log"
 	libp2p "github.com/libp2p/go-libp2p"
 	host "github.com/libp2p/go-libp2p-host"
@@ -666,87 +664,36 @@ func (api *API) consumeImport(ctx context.Context,
 	return enc.Encode(types.Error{Code: 2, Message: "success"})
 }
 
-// Get a random string of length n. Used to generate sharding id
-func randStringRunes(n int) string {
-	b := make([]rune, n)
-	for i := range b {
-		b[i] = letterRunes[rand.Intn(len(letterRunes))]
-	}
-	return string(b)
-}
-
 func (api *API) addFileHandler(w http.ResponseWriter, r *http.Request) {
 	contentType := r.Header.Get("Content-Type")
 	mediatype, _, _ := mime.ParseMediaType(contentType)
-
-	var f files.File
-	if mediatype == "multipart/form-data" {
-		reader, err := r.MultipartReader()
-		if err != nil {
-			sendErrorResponse(w, 400, err.Error())
-			return
-		}
-		f = &files.MultipartFile{
-			Mediatype: mediatype,
-			Reader:    reader,
-		}
-	} else {
+	if mediatype != "multipart/form-data" {
 		sendErrorResponse(w, 415, "unsupported media type")
 		return
 	}
-
-	ctx, cancel := context.WithCancel(api.ctx)
-	defer cancel()
+	reader, err := r.MultipartReader()
+	if err != nil {
+		sendErrorResponse(w, 400, err.Error())
+		return
+	}
 
 	queryValues := r.URL.Query()
-	layout := queryValues.Get("layout")
-	trickle := false
-	if layout == "trickle" {
-		trickle = true
+	fI := types.FileInfo{
+		Reader: reader,
+		Params: queryValues,
 	}
-	chunker := queryValues.Get("chunker")
-	raw, _ := strconv.ParseBool(queryValues.Get("raw"))
-	wrap, _ := strconv.ParseBool(queryValues.Get("wrap"))
-	progress, _ := strconv.ParseBool(queryValues.Get("progress"))
-	hidden, _ := strconv.ParseBool(queryValues.Get("hidden"))
-	silent, _ := strconv.ParseBool(queryValues.Get("silent")) // just print root hash
-	printChan, outChan, errChan := importer.ToChannel(ctx, f, progress,
-		hidden, trickle, raw, silent, wrap, chunker)
-
-	shard := queryValues.Get("shard")
-	// quiet := queryValues.Get("quiet") // just print hashes, no meta data
-	replMin, _ := strconv.Atoi(queryValues.Get("repl_min"))
-	replMax, _ := strconv.Atoi(queryValues.Get("repl_max"))
-
-	if shard == "true" {
-		if err := api.consumeImport(
-			ctx,
-			outChan,
-			printChan,
-			errChan,
-			w,
-			api.consumeShardAdd,
-			api.finishShardAdd,
-			replMin,
-			replMax,
-		); err != nil {
-			panic(err)
-		}
-	} else {
-		if err := api.consumeImport(
-			ctx,
-			outChan,
-			printChan,
-			errChan,
-			w,
-			api.consumeLocalAdd,
-			api.finishLocalAdd,
-			replMin,
-			replMax,
-		); err != nil {
-			panic(err)
-		}
+	var toPrint []types.AddedOutput
+	err = api.rpcClient.Call("",
+		"Cluster",
+		"AddFile",
+		fI,
+		&toPrint)
+	if err != nil {
+		sendErrorResponse(w, 500, err.Error())
+		return
 	}
+	sendJSONResponse(w, 200, toPrint)
 }
 
 func (api *API) peerListHandler(w http.ResponseWriter, r *http.Request) {
@@ -845,13 +791,14 @@ func (api *API) allocationsHandler(w http.ResponseWriter, r *http.Request) {
 		struct{}{},
 		&pins,
 	)
 
-	for i, pinS := range pins {
-		if api.filterOutPin(filter, pinS.ToPin()) {
-			// remove this pin from output
-			pins = append(pins[:i], pins[i+1:]...)
+	outPins := make([]types.PinSerial, 0)
+	for _, pinS := range pins {
+		if !api.filterOutPin(filter, pinS.ToPin()) {
+			// add this pin to output
+			outPins = append(outPins, pinS)
 		}
 	}
 
-	sendResponse(w, err, pins)
+	sendResponse(w, err, outPins)
 }
diff --git a/api/types.go b/api/types.go
index 383a2ace..79d351f4 100644
--- a/api/types.go
+++ b/api/types.go
@@ -10,6 +10,7 @@ package api
 
 import (
 	"fmt"
+	"mime/multipart"
 	"regexp"
 	"sort"
 	"strings"
@@ -773,6 +774,13 @@ func (pins PinSerial) ToPin() Pin {
 	}
 }
 
+// FileInfo carries a reader pointing to a file and the parameters specified
+// for adding the file to ipfs-cluster
+type FileInfo struct {
+	Reader *multipart.Reader
+	Params map[string][]string
+}
+
 // AddedOutput carries information for displaying the standard ipfs output
 // indicating a node of a file has been added.
 type AddedOutput struct {
diff --git a/cluster.go b/cluster.go
index 56fcd939..490a99c5 100644
--- a/cluster.go
+++ b/cluster.go
@@ -958,9 +958,6 @@ func (c *Cluster) validatePin(pin api.Pin, rplMin, rplMax int) error {
 		if pin.Recursive {
 			return errors.New("must pin roots directly")
 		}
-		if pin.Clusterdag == nil {
-			return errors.New("roots must reference a dag")
-		}
 		if pin.Parents.Len() > 1 {
 			return errors.New("cdag nodes are referenced once")
 		}
@@ -968,6 +965,10 @@ func (c *Cluster) validatePin(pin api.Pin, rplMin, rplMax int) error {
 		if len(pin.Allocations) != 0 {
 			return errors.New("meta pin should not specify allocations")
 		}
+		if pin.Clusterdag == nil {
+			return errors.New("roots must reference a dag")
+		}
+
 	default:
 		return errors.New("unrecognized pin type")
 	}
diff --git a/importer/add.go b/importer/add.go
index 3f777890..9a4987d4 100644
--- a/importer/add.go
+++ b/importer/add.go
@@ -226,6 +226,14 @@ func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
 		for _, name := range names {
 			child, err := fsn.Child(name)
 			if err != nil {
+				// It is ok if adder can't fetch block to make
+				// an fsn file. Outgoing DAGservice does not
+				// store blocks. We recognize it as a file and
+				// keep traversing the directory
+				if shouldIgnore(err) {
+					continue
+				}
+
 				return err
 			}
 
@@ -241,7 +249,6 @@ func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
 		if err != nil {
 			return err
 		}
-
 		return outputDagnode(adder.Out, path, nd)
 	default:
 		return fmt.Errorf("unrecognized fsn type: %#v", fsn)
diff --git a/importer/import.go b/importer/import.go
index b5597e7b..842f8afb 100644
--- a/importer/import.go
+++ b/importer/import.go
@@ -10,6 +10,12 @@ import (
 	"github.com/ipfs/go-ipfs-cmdkit/files"
 )
 
+// shouldIgnore reports whether err is the expected "block not found"
+// raised because the outgoing DAGService does not store blocks.
+func shouldIgnore(err error) bool {
+	return strings.Contains(err.Error(), "dagservice: block not found")
+}
+
 // ToChannel imports file to ipfs ipld nodes, outputting nodes on the
 // provided channel
 func ToChannel(ctx context.Context, f files.File, progress bool, hidden bool,
@@ -67,7 +73,7 @@ func ToChannel(ctx context.Context, f files.File, progress bool, hidden bool,
 		}
 
 		_, err := fileAdder.Finalize()
-		if err != nil && !strings.Contains(err.Error(), "dagservice: block not found") {
+		if err != nil && !shouldIgnore(err) {
 			errChan <- err
 		}
 	}()
diff --git a/importer/import_test.go b/importer/import_test.go
index dc29aee1..eae410d5 100644
--- a/importer/import_test.go
+++ b/importer/import_test.go
@@ -32,9 +32,6 @@ var cids = [18]string{"QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn",
 	"QmUwG2mfhhfBEzVtvyEvva1kc8kU4CrXphdMCdAiFNWdxy",
 	"QmRgkP4x7tXW9PyiPPxe3aqALQEs22nifkwzrm7wickdfr",
 	"QmNpCHs9zrzP4aArBzRQgjNSbMC5hYqJa1ksmbyorSu44b",
-	"QmQbg4sHm4zVHnqCS14YzNnQFVMqjZpu5XjkF7vtbzqkFW",
-	"QmcUBApwNSDg2Q2J4NXzks1HewVGFohNpPyEud7bZfo5tE",
-	"QmaKaB735eydQwnaJNuYbXRU1gVb4MJdzHp1rpUZJN67G6",
 	"QmQ6n82dRtEJVHMDLW5BSrJ6bRDXkFXj2i7Vjxy4B2PH82",
 	"QmPZLJ3CZYgxH4K1w5jdbAdxJynXn5TCB4kHy7u8uHC3fy",
 	"QmUqmcdJJodX7mPoUv9u71HoNAiegBCiCqdUZd5ZrCyLbs",
@@ -43,10 +40,15 @@ var cids = [18]string{"QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn",
 	"QmaytnNarHzDp1ipGyC7zd7Hw2AePmtSpLLaygQ2e9Yvqe",
 	"QmZwab9h6ADw3tv8pzXVF2yndgJpTWKrrMjJQqRkgYmCRH",
 	"QmaNfMZDZjfqjHrFCc6tZwmkqbXs1fnY9AXZ81WUztFeXm",
-	"QmY4qt6WG12qYwWeeTNhggqaNMJWp2NouuTSf79ukoobw8"}
+	// Hashes that are not printed out (chunks of files and wrapper dir)
+	"QmY4qt6WG12qYwWeeTNhggqaNMJWp2NouuTSf79ukoobw8",
+	"QmQbg4sHm4zVHnqCS14YzNnQFVMqjZpu5XjkF7vtbzqkFW",
+	"QmcUBApwNSDg2Q2J4NXzks1HewVGFohNpPyEud7bZfo5tE",
+	"QmaKaB735eydQwnaJNuYbXRU1gVb4MJdzHp1rpUZJN67G6",
+}
 
 // import and receive all blocks
-func TestToChannel(t *testing.T) {
+func TestToChannelOutput(t *testing.T) {
 	file, err := getTestingDir()
 	if err != nil {
 		t.Fatal(err)
@@ -95,6 +97,55 @@ func TestToChannel(t *testing.T) {
 	}
 }
 
+func TestToChannelPrint(t *testing.T) {
+	file, err := getTestingDir()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	printChan, outChan, errChan := ToChannel(context.Background(), file,
+		false, false, false, false, false, false, "")
+
+	go func() { // listen on outChan so progress can be made
+		for {
+			_, ok := <-outChan
+			if !ok {
+				// channel closed, safe to exit
+				return
+			}
+		}
+	}()
+
+	go func() { // listen for errors
+		for {
+			err, ok := <-errChan
+			if !ok {
+				// channel closed, safe to exit
+				return
+			}
+			t.Error(err)
+		}
+	}()
+
+	check := make(map[string]struct{})
+	for obj := range printChan {
+		cid := obj.Hash
+		if _, ok := check[cid]; ok {
+			t.Fatalf("Duplicate cid %s", cid)
+		}
+		check[cid] = struct{}{}
+	}
+	if len(check) != len(cids[:14]) {
+		t.Fatalf("Witnessed cids: %v\nExpected cids: %v", check, cids)
+	}
+	cidsS := cids[:]
+	for cid := range check {
+		if !contains(cidsS, cid) {
+			t.Fatalf("Unexpected cid: %s", cid)
+		}
+	}
+}
+
 func contains(slice []string, s string) bool {
 	for _, a := range slice {
 		if a == s {
diff --git a/ipfs-cluster-ctl/formatters.go b/ipfs-cluster-ctl/formatters.go
index d80d79f3..2803f633 100644
--- a/ipfs-cluster-ctl/formatters.go
+++ b/ipfs-cluster-ctl/formatters.go
@@ -173,11 +173,11 @@ func textFormatPrintPin(obj *api.PinSerial) {
 	fmt.Printf("%s | %s | ", obj.Cid, obj.Name)
 
 	if obj.ReplicationFactorMin < 0 {
-		fmt.Printf("Repl. Factor: -1 | Allocations: [everywhere]\n")
+		fmt.Printf("Repl. Factor: -1 | Allocations: [everywhere]")
 	} else {
 		var sortAlloc sort.StringSlice = obj.Allocations
 		sortAlloc.Sort()
-		fmt.Printf("Repl. Factor: %d--%d | Allocations: %s\n",
+		fmt.Printf("Repl. Factor: %d--%d | Allocations: %s",
 			obj.ReplicationFactorMin, obj.ReplicationFactorMax,
 			sortAlloc)
 	}
@@ -196,13 +196,13 @@ func textFormatPrintPin(obj *api.PinSerial) {
 	var infoStr string
 	switch obj.Type {
 	case api.DataType:
 		infoStr = typeStr
 	case api.MetaType:
-		infoStr = fmt.Sprintf("%s-- clusterDAG=%s",typeStr, obj.Clusterdag)
+		infoStr = fmt.Sprintf("%s-- clusterdag=%s", typeStr, obj.Clusterdag)
 	case api.CdagType, api.ShardType:
 		infoStr = typeStr
 	default:
 		infoStr = ""
 	}
-	fmt.Printf("| %s ", infoStr)
+	fmt.Printf("%s \n", infoStr)
 }
 
 func textFormatPrintAddedOutput(obj *api.AddedOutput) {
diff --git a/rpc_api.go b/rpc_api.go
index e00cfaca..c04bd009 100644
--- a/rpc_api.go
+++ b/rpc_api.go
@@ -53,6 +53,13 @@ func (rpcapi *RPCAPI) Pins(ctx context.Context, in struct{}, out *[]api.PinSeria
 	return nil
 }
 
+// AddFile runs Cluster.AddFile().
+func (rpcapi *RPCAPI) AddFile(ctx context.Context, in api.FileInfo, out *[]api.AddedOutput) error {
+	toPrint, err := rpcapi.c.AddFile(in.Reader, in.Params)
+	*out = toPrint
+	return err
+}
+
 // PinGet runs Cluster.PinGet().
 func (rpcapi *RPCAPI) PinGet(ctx context.Context, in api.PinSerial, out *api.PinSerial) error {
 	cidarg := in.ToPin()