diff --git a/adder/adderutils/adderutils.go b/adder/adderutils/adderutils.go index fb80c012..353e2de5 100644 --- a/adder/adderutils/adderutils.go +++ b/adder/adderutils/adderutils.go @@ -56,59 +56,59 @@ func AddMultipartHTTPHandler( w.Header().Set("Connection", "close") var wg sync.WaitGroup - if params.StreamChannels { - // handle stream-adding - // this should be the default. - - // https://github.com/ipfs-shipyard/ipfs-companion/issues/600 - w.Header().Set("X-Chunked-Output", "1") - // Used by go-ipfs to signal errors half-way through the stream. - w.Header().Set("Trailer", "X-Stream-Error") - w.WriteHeader(http.StatusOK) + if !params.StreamChannels { + // in this case we buffer responses in memory and + // return them as a valid JSON array. wg.Add(1) + var bufOutput []interface{} // a slice of transformed AddedOutput go func() { defer wg.Done() - streamOutput(w, output, outputTransform) + bufOutput = buildOutput(output, outputTransform) }() + + enc := json.NewEncoder(w) add := adder.New(dags, params, output) root, err := add.FromMultipart(ctx, reader) - if err != nil { + if err != nil { // Send an error logger.Error(err) - // Set trailer with error - w.Header().Set("X-Stream-Error", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + errorResp := api.Error{ + Code: http.StatusInternalServerError, + Message: err.Error(), + } + + if err := enc.Encode(errorResp); err != nil { + logger.Error(err) + } + wg.Wait() + return root, err } wg.Wait() + w.WriteHeader(http.StatusOK) + enc.Encode(bufOutput) return root, err } - // Add buffering the AddedOutput (StreamChannels=false) + // handle stream-adding. This should be the default. + + // https://github.com/ipfs-shipyard/ipfs-companion/issues/600 + w.Header().Set("X-Chunked-Output", "1") + // Used by go-ipfs to signal errors half-way through the stream. + w.Header().Set("Trailer", "X-Stream-Error") + w.WriteHeader(http.StatusOK) wg.Add(1) - var bufOutput []interface{} // a slice of transformed AddedOutput go func() { defer wg.Done() - bufOutput = buildOutput(w, output, outputTransform) + streamOutput(w, output, outputTransform) }() - - enc := json.NewEncoder(w) add := adder.New(dags, params, output) root, err := add.FromMultipart(ctx, reader) - if err != nil { // Send an error + if err != nil { logger.Error(err) - w.WriteHeader(http.StatusInternalServerError) - errorResp := api.Error{ - Code: http.StatusInternalServerError, - Message: err.Error(), - } - - if err := enc.Encode(errorResp); err != nil { - logger.Error(err) - } - wg.Wait() - return root, err + // Set trailer with error + w.Header().Set("X-Stream-Error", err.Error()) } wg.Wait() - w.WriteHeader(http.StatusOK) - enc.Encode(bufOutput) return root, err } @@ -127,7 +127,7 @@ func streamOutput(w http.ResponseWriter, output chan *api.AddedOutput, transform } } -func buildOutput(w http.ResponseWriter, output chan *api.AddedOutput, transform func(*api.AddedOutput) interface{}) []interface{} { +func buildOutput(output chan *api.AddedOutput, transform func(*api.AddedOutput) interface{}) []interface{} { var finalOutput []interface{} for v := range output { finalOutput = append(finalOutput, transform(v))