Fix #632: Handle "stream-channels" in /add endpoints

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
Hector Sanjuan 2019-01-04 19:30:41 +01:00
parent c0cdaa4b3e
commit 16297ced48
2 changed files with 109 additions and 55 deletions

View File

@ -34,58 +34,103 @@ func AddMultipartHTTPHandler(
) (cid.Cid, error) {
var dags adder.ClusterDAGService
output := make(chan *api.AddedOutput, 200)
flusher, flush := w.(http.Flusher)
if params.Shard {
dags = sharding.New(rpc, params.PinOptions, output)
} else {
dags = local.New(rpc, params.PinOptions)
}
enc := json.NewEncoder(w)
// This must be application/json otherwise go-ipfs client
// will break.
w.Header().Set("Content-Type", "application/json")
// Browsers should not cache when streaming content.
w.Header().Set("Cache-Control", "no-cache")
// Custom header which breaks js-ipfs-api if not set
// https://github.com/ipfs-shipyard/ipfs-companion/issues/600
w.Header().Set("X-Chunked-Output", "1")
// Used by go-ipfs to signal errors half-way through the stream.
w.Header().Set("Trailer", "X-Stream-Error")
// We need to ask the clients to close the connection
// (no keep-alive) of things break badly when adding.
// https://github.com/ipfs/go-ipfs-cmds/pull/116
w.Header().Set("Connection", "close")
w.WriteHeader(http.StatusOK)
if outputTransform == nil {
outputTransform = func(in *api.AddedOutput) interface{} { return in }
}
// This must be application/json otherwise go-ipfs client
// will break.
w.Header().Set("Content-Type", "application/json")
// Browsers should not cache these responses.
w.Header().Set("Cache-Control", "no-cache")
// We need to ask the clients to close the connection
// (no keep-alive) of things break badly when adding.
// https://github.com/ipfs/go-ipfs-cmds/pull/116
w.Header().Set("Connection", "close")
var wg sync.WaitGroup
if params.StreamChannels {
// handle stream-adding
// this should be the default.
// https://github.com/ipfs-shipyard/ipfs-companion/issues/600
w.Header().Set("X-Chunked-Output", "1")
// Used by go-ipfs to signal errors half-way through the stream.
w.Header().Set("Trailer", "X-Stream-Error")
w.WriteHeader(http.StatusOK)
wg.Add(1)
go func() {
defer wg.Done()
streamOutput(w, output, outputTransform)
}()
add := adder.New(dags, params, output)
root, err := add.FromMultipart(ctx, reader)
if err != nil {
logger.Error(err)
// Set trailer with error
w.Header().Set("X-Stream-Error", err.Error())
}
wg.Wait()
return root, err
}
// Add buffering the AddedOutput (StreamChannels=false)
wg.Add(1)
var bufOutput []interface{} // a slice of transformed AddedOutput
go func() {
defer wg.Done()
for v := range output {
err := enc.Encode(outputTransform(v))
if err != nil {
logger.Error(err)
break
}
if flush {
flusher.Flush()
}
}
bufOutput = buildOutput(w, output, outputTransform)
}()
enc := json.NewEncoder(w)
add := adder.New(dags, params, output)
root, err := add.FromMultipart(ctx, reader)
if err != nil {
// Set trailer with error
w.Header().Set("X-Stream-Error", err.Error())
if err != nil { // Send an error
logger.Error(err)
w.WriteHeader(http.StatusInternalServerError)
errorResp := api.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
}
if err := enc.Encode(errorResp); err != nil {
logger.Error(err)
}
wg.Wait()
return root, err
}
wg.Wait()
w.WriteHeader(http.StatusOK)
enc.Encode(bufOutput)
return root, err
}
func streamOutput(w http.ResponseWriter, output chan *api.AddedOutput, transform func(*api.AddedOutput) interface{}) {
flusher, flush := w.(http.Flusher)
enc := json.NewEncoder(w)
for v := range output {
err := enc.Encode(transform(v))
if err != nil {
logger.Error(err)
break
}
if flush {
flusher.Flush()
}
}
}
func buildOutput(w http.ResponseWriter, output chan *api.AddedOutput, transform func(*api.AddedOutput) interface{}) []interface{} {
var finalOutput []interface{}
for v := range output {
finalOutput = append(finalOutput, transform(v))
}
return finalOutput
}

View File

@ -24,31 +24,33 @@ type AddedOutput struct {
type AddParams struct {
PinOptions
Recursive bool
Layout string
Chunker string
RawLeaves bool
Hidden bool
Wrap bool
Shard bool
Progress bool
CidVersion int
HashFun string
Recursive bool
Layout string
Chunker string
RawLeaves bool
Hidden bool
Wrap bool
Shard bool
Progress bool
CidVersion int
HashFun string
StreamChannels bool
}
// DefaultAddParams returns a AddParams object with standard defaults
func DefaultAddParams() *AddParams {
return &AddParams{
Recursive: false,
Layout: "", // corresponds to balanced layout
Chunker: "size-262144",
RawLeaves: false,
Hidden: false,
Wrap: false,
Shard: false,
Progress: false,
CidVersion: 0,
HashFun: "sha2-256",
Recursive: false,
Layout: "", // corresponds to balanced layout
Chunker: "size-262144",
RawLeaves: false,
Hidden: false,
Wrap: false,
Shard: false,
Progress: false,
CidVersion: 0,
HashFun: "sha2-256",
StreamChannels: true,
PinOptions: PinOptions{
ReplicationFactorMin: 0,
ReplicationFactorMax: 0,
@ -90,7 +92,7 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
case "trickle", "balanced", "":
// nothing
default:
return nil, errors.New("parameter trickle invalid")
return nil, errors.New("layout parameter invalid")
}
params.Layout = layout
@ -153,6 +155,11 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
params.ShardSize = shardSize
}
err = parseBoolParam(query, "stream-channels", &params.StreamChannels)
if err != nil {
return nil, err
}
return params, nil
}
@ -173,6 +180,7 @@ func (p *AddParams) ToQueryString() string {
query.Set("progress", fmt.Sprintf("%t", p.Progress))
query.Set("cid-version", fmt.Sprintf("%d", p.CidVersion))
query.Set("hash", p.HashFun)
query.Set("stream-channels", fmt.Sprintf("%t", p.StreamChannels))
return query.Encode()
}
@ -190,5 +198,6 @@ func (p *AddParams) Equals(p2 *AddParams) bool {
p.Hidden == p2.Hidden &&
p.Wrap == p2.Wrap &&
p.CidVersion == p2.CidVersion &&
p.HashFun == p2.HashFun
p.HashFun == p2.HashFun &&
p.StreamChannels == p2.StreamChannels
}