d19c7facff
It has been observed that some peers have a growing number of goroutines, usually stuck in the go-libp2p-gorpc MultiStream() function, which is waiting to read items from the arguments channel. We suspect this is due to aborted /add requests. When an add request is aborted or fails, Finalize() is never called and the blocks channel stays open, so MultiStream() can never exit and the BlockStreamer can never stop streaming. As a fix, we added the requirement to call Close() whenever we stop using a ClusterDAGService (error or not). This should ensure that the blocks channel is always closed, and not just on Finalize().
137 lines
3.6 KiB
Go
137 lines
3.6 KiB
Go
// Package adderutils provides some utilities for adding content to cluster.
|
|
package adderutils
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"sync"
|
|
|
|
"github.com/ipfs-cluster/ipfs-cluster/adder"
|
|
"github.com/ipfs-cluster/ipfs-cluster/adder/sharding"
|
|
"github.com/ipfs-cluster/ipfs-cluster/adder/single"
|
|
"github.com/ipfs-cluster/ipfs-cluster/api"
|
|
|
|
logging "github.com/ipfs/go-log/v2"
|
|
rpc "github.com/libp2p/go-libp2p-gorpc"
|
|
)
|
|
|
|
// logger is the package-level logger, registered under the "adder" name.
var logger = logging.Logger("adder")
|
|
|
|
// AddMultipartHTTPHandler is a helper function to add content
|
|
// uploaded using a multipart request. The outputTransform parameter
|
|
// allows to customize the http response output format to something
|
|
// else than api.AddedOutput objects.
|
|
func AddMultipartHTTPHandler(
|
|
ctx context.Context,
|
|
rpc *rpc.Client,
|
|
params api.AddParams,
|
|
reader *multipart.Reader,
|
|
w http.ResponseWriter,
|
|
outputTransform func(api.AddedOutput) interface{},
|
|
) (api.Cid, error) {
|
|
var dags adder.ClusterDAGService
|
|
output := make(chan api.AddedOutput, 200)
|
|
|
|
if params.Shard {
|
|
dags = sharding.New(ctx, rpc, params, output)
|
|
} else {
|
|
dags = single.New(ctx, rpc, params, params.Local)
|
|
}
|
|
defer dags.Close()
|
|
|
|
if outputTransform == nil {
|
|
outputTransform = func(in api.AddedOutput) interface{} { return in }
|
|
}
|
|
|
|
// This must be application/json otherwise go-ipfs client
|
|
// will break.
|
|
w.Header().Set("Content-Type", "application/json")
|
|
// Browsers should not cache these responses.
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
// We need to ask the clients to close the connection
|
|
// (no keep-alive) of things break badly when adding.
|
|
// https://github.com/ipfs/go-ipfs-cmds/pull/116
|
|
w.Header().Set("Connection", "close")
|
|
|
|
var wg sync.WaitGroup
|
|
if !params.StreamChannels {
|
|
// in this case we buffer responses in memory and
|
|
// return them as a valid JSON array.
|
|
wg.Add(1)
|
|
var bufOutput []interface{} // a slice of transformed AddedOutput
|
|
go func() {
|
|
defer wg.Done()
|
|
bufOutput = buildOutput(output, outputTransform)
|
|
}()
|
|
|
|
enc := json.NewEncoder(w)
|
|
add := adder.New(dags, params, output)
|
|
root, err := add.FromMultipart(ctx, reader)
|
|
if err != nil { // Send an error
|
|
logger.Error(err)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
errorResp := api.Error{
|
|
Code: http.StatusInternalServerError,
|
|
Message: err.Error(),
|
|
}
|
|
|
|
if err := enc.Encode(errorResp); err != nil {
|
|
logger.Error(err)
|
|
}
|
|
wg.Wait()
|
|
return root, err
|
|
}
|
|
wg.Wait()
|
|
w.WriteHeader(http.StatusOK)
|
|
enc.Encode(bufOutput)
|
|
return root, err
|
|
}
|
|
|
|
// handle stream-adding. This should be the default.
|
|
|
|
// https://github.com/ipfs-shipyard/ipfs-companion/issues/600
|
|
w.Header().Set("X-Chunked-Output", "1")
|
|
// Used by go-ipfs to signal errors half-way through the stream.
|
|
w.Header().Set("Trailer", "X-Stream-Error")
|
|
w.WriteHeader(http.StatusOK)
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
streamOutput(w, output, outputTransform)
|
|
}()
|
|
add := adder.New(dags, params, output)
|
|
root, err := add.FromMultipart(ctx, reader)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
// Set trailer with error
|
|
w.Header().Set("X-Stream-Error", err.Error())
|
|
}
|
|
wg.Wait()
|
|
return root, err
|
|
}
|
|
|
|
func streamOutput(w http.ResponseWriter, output chan api.AddedOutput, transform func(api.AddedOutput) interface{}) {
|
|
flusher, flush := w.(http.Flusher)
|
|
enc := json.NewEncoder(w)
|
|
for v := range output {
|
|
err := enc.Encode(transform(v))
|
|
if err != nil {
|
|
logger.Error(err)
|
|
break
|
|
}
|
|
if flush {
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
}
|
|
|
|
func buildOutput(output chan api.AddedOutput, transform func(api.AddedOutput) interface{}) []interface{} {
|
|
var finalOutput []interface{}
|
|
for v := range output {
|
|
finalOutput = append(finalOutput, transform(v))
|
|
}
|
|
return finalOutput
|
|
}
|