Merge pull request #505 from ipfs/fix/495-proxy-add

Fix #495: Hijack proxy /add correctly. Disable keep-alives.
This commit is contained in:
Hector Sanjuan 2018-08-20 10:29:20 +02:00 committed by GitHub
commit 63fa1ba213
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 203 additions and 335 deletions

View File

@ -111,6 +111,7 @@ func (a *Adder) FromFiles(ctx context.Context, f files.File) (*cid.Cid, error) {
ipfsAdder.Wrap = a.params.Wrap
ipfsAdder.Chunker = a.params.Chunker
ipfsAdder.Out = a.output
ipfsAdder.Progress = a.params.Progress
for {
select {

View File

@ -0,0 +1,68 @@
// Package adderutils provides some utilities for adding content to cluster.
package adderutils
import (
"context"
"encoding/json"
"mime/multipart"
"net/http"
"sync"
"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/adder/local"
"github.com/ipfs/ipfs-cluster/adder/sharding"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
)
var logger = logging.Logger("adder")
// AddMultipartHTTPHandler is a helper function to add content
// uploaded using a multipart request.
// AddMultipartHTTPHandler is a helper function to add content
// uploaded using a multipart request. Results are streamed to w as
// JSON-encoded api.AddedOutput objects while the add proceeds. It
// returns the root Cid of the added content, or an error.
//
// Note: a 200 status header is written before adding starts, so
// errors during the add cannot change the HTTP status code; the
// caller is responsible for reporting the returned error in-band.
func AddMultipartHTTPHandler(
	ctx context.Context,
	rpcClient *rpc.Client, // renamed from `rpc`, which shadowed the rpc package import
	params *api.AddParams,
	reader *multipart.Reader,
	w http.ResponseWriter,
) (*cid.Cid, error) {
	// Pick the DAG service: sharding spreads the content among
	// cluster peers, local adds to a single IPFS daemon.
	var dags adder.ClusterDAGService
	// Buffered so the adder is not blocked while output is encoded.
	output := make(chan *api.AddedOutput, 200)
	// Flush after each output object when the writer supports it,
	// so clients see streamed progress promptly.
	flusher, canFlush := w.(http.Flusher)
	if params.Shard {
		dags = sharding.New(rpcClient, params.PinOptions, output)
	} else {
		dags = local.New(rpcClient, params.PinOptions)
	}

	enc := json.NewEncoder(w)
	// This must be application/json otherwise go-ipfs client
	// will break.
	w.Header().Add("Content-Type", "application/json")
	// Browsers should not cache when streaming content.
	w.Header().Add("Cache-Control", "no-cache")
	w.WriteHeader(http.StatusOK)

	// Stream every AddedOutput to the client as it arrives. The
	// goroutine exits when the adder closes the output channel.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for v := range output {
			if err := enc.Encode(v); err != nil {
				logger.Error(err)
			}
			if canFlush {
				flusher.Flush()
			}
		}
	}()

	add := adder.New(dags, params, output)
	root, err := add.FromMultipart(ctx, reader)
	// Wait until all pending output has been written to w before
	// returning, so the caller may safely write further data.
	wg.Wait()
	return root, err
}

View File

@ -31,6 +31,7 @@ type AddParams struct {
Hidden bool
Wrap bool
Shard bool
Progress bool
}
// DefaultAddParams returns a AddParams object with standard defaults
@ -42,6 +43,7 @@ func DefaultAddParams() *AddParams {
Hidden: false,
Wrap: false,
Shard: false,
Progress: false,
PinOptions: PinOptions{
ReplicationFactorMin: 0,
ReplicationFactorMax: 0,
@ -108,6 +110,12 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
if err != nil {
return nil, err
}
err = parseBoolParam(query, "progress", &params.Progress)
if err != nil {
return nil, err
}
err = parseIntParam(query, "replication-min", &params.ReplicationFactorMin)
if err != nil {
return nil, err
@ -133,7 +141,7 @@ func (p *AddParams) ToQueryString() string {
fmtStr := "replication-min=%d&replication-max=%d&name=%s&"
fmtStr += "shard=%t&shard-size=%d&"
fmtStr += "layout=%s&chunker=%s&raw-leaves=%t&hidden=%t&"
fmtStr += "wrap-with-directory=%t"
fmtStr += "wrap-with-directory=%t&progress=%t"
query := fmt.Sprintf(
fmtStr,
p.ReplicationFactorMin,
@ -146,6 +154,7 @@ func (p *AddParams) ToQueryString() string {
p.RawLeaves,
p.Hidden,
p.Wrap,
p.Progress,
)
return query
}

View File

@ -21,9 +21,7 @@ import (
"sync"
"time"
"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/adder/local"
"github.com/ipfs/ipfs-cluster/adder/sharding"
"github.com/ipfs/ipfs-cluster/adder/adderutils"
types "github.com/ipfs/ipfs-cluster/api"
mux "github.com/gorilla/mux"
@ -115,7 +113,9 @@ func NewAPIWithHost(cfg *Config, h host.Host) (*API, error) {
IdleTimeout: cfg.IdleTimeout,
Handler: router,
}
s.SetKeepAlivesEnabled(true) // A reminder that this can be changed
// See: https://github.com/ipfs/go-ipfs/issues/5168
s.SetKeepAlivesEnabled(false)
ctx, cancel := context.WithCancel(context.Background())
@ -514,36 +514,13 @@ func (api *API) addHandler(w http.ResponseWriter, r *http.Request) {
return
}
output := make(chan *types.AddedOutput, 200)
var dags adder.ClusterDAGService
if params.Shard {
dags = sharding.New(api.rpcClient, params.PinOptions, output)
} else {
dags = local.New(api.rpcClient, params.PinOptions)
}
enc := json.NewEncoder(w)
w.Header().Add("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for v := range output {
err := enc.Encode(v)
if err != nil {
logger.Error(err)
}
}
}()
add := adder.New(dags, params, output)
c, err := add.FromMultipart(api.ctx, reader)
_ = c
wg.Wait()
_, err = adderutils.AddMultipartHTTPHandler(
api.ctx,
api.rpcClient,
params,
reader,
w,
)
if err != nil {
errorResp := types.AddedOutput{
@ -552,6 +529,8 @@ func (api *API) addHandler(w http.ResponseWriter, r *http.Request) {
Message: err.Error(),
},
}
enc := json.NewEncoder(w)
if err := enc.Encode(errorResp); err != nil {
logger.Error(err)
}

View File

@ -13,11 +13,11 @@ import (
"net"
"net/http"
"net/url"
"path/filepath"
"strings"
"sync"
"time"
"github.com/ipfs/ipfs-cluster/adder/adderutils"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
@ -160,7 +160,9 @@ func NewConnector(cfg *Config) (*Connector, error) {
IdleTimeout: cfg.ProxyIdleTimeout,
Handler: smux,
}
s.SetKeepAlivesEnabled(true) // A reminder that this can be changed
// See: https://github.com/ipfs/go-ipfs/issues/5168
s.SetKeepAlivesEnabled(false) // A reminder that this can be changed
c := &http.Client{} // timeouts are handled by context timeouts
@ -283,7 +285,7 @@ func (ipfs *Connector) proxyResponse(w http.ResponseWriter, res *http.Response,
func (ipfs *Connector) defaultHandler(w http.ResponseWriter, r *http.Request) {
res, err := ipfs.proxyRequest(r)
if err != nil {
http.Error(w, "error forwarding request: "+err.Error(), 500)
ipfsErrorResponder(w, "error forwarding request: "+err.Error())
return
}
ipfs.proxyResponse(w, res, res.Body)
@ -395,165 +397,74 @@ func (ipfs *Connector) pinLsHandler(w http.ResponseWriter, r *http.Request) {
}
func (ipfs *Connector) addHandler(w http.ResponseWriter, r *http.Request) {
// Handle some request options
q := r.URL.Query()
// Remember if the user does not want cluster/ipfs to pin
doNotPin := q.Get("pin") == "false"
// make sure the local peer does not pin.
// Cluster will decide where to pin based on metrics and current
// allocations.
q.Set("pin", "false")
r.URL.RawQuery = q.Encode()
res, err := ipfs.proxyRequest(r)
reader, err := r.MultipartReader()
if err != nil {
http.Error(w, "error forwarding request: "+err.Error(), 500)
return
}
defer res.Body.Close()
// Shortcut some cases where there is nothing else to do
if scode := res.StatusCode; scode != http.StatusOK {
logger.Warningf("proxy /add request returned %d", scode)
ipfs.proxyResponse(w, res, res.Body)
ipfsErrorResponder(w, "error reading request: "+err.Error())
return
}
if doNotPin {
logger.Debug("proxy /add requests has pin==false")
ipfs.proxyResponse(w, res, res.Body)
q := r.URL.Query()
if q.Get("only-hash") == "true" {
ipfsErrorResponder(w, "only-hash is not supported when adding to cluster")
}
unpin := q.Get("pin") == "false"
// Luckily, most IPFS add query params are compatible with cluster's
// /add params. We can parse most of them directly from the query.
params, err := api.AddParamsFromQuery(q)
if err != nil {
ipfsErrorResponder(w, "error parsing options:"+err.Error())
return
}
trickle := q.Get("trickle")
if trickle == "true" {
params.Layout = "trickle"
}
logger.Warningf("Proxy/add does not support all IPFS params. Current options: %+v", params)
sendAddingError := func(err error) {
errorResp := ipfsError{
Message: err.Error(),
}
enc := json.NewEncoder(w)
if err := enc.Encode(errorResp); err != nil {
logger.Error(err)
}
}
root, err := adderutils.AddMultipartHTTPHandler(
ipfs.ctx,
ipfs.rpcClient,
params,
reader,
w,
)
if err != nil {
sendAddingError(err)
return
}
// The ipfs-add response is a streaming-like body where
// { "Name" : "filename", "Hash": "cid" } objects are provided
// for every added object.
// We will need to re-read the response in order to re-play it to
// the client at the end, therefore we make a copy in bodyCopy
// while decoding.
bodyCopy := new(bytes.Buffer)
bodyReader := io.TeeReader(res.Body, bodyCopy)
ipfsAddResps := []ipfsAddResp{}
dec := json.NewDecoder(bodyReader)
for dec.More() {
var addResp ipfsAddResp
err := dec.Decode(&addResp)
if err != nil {
http.Error(w, "error decoding response: "+err.Error(), 502)
return
}
if addResp.Bytes != 0 {
// This is a progress notification, so we ignore it
continue
}
ipfsAddResps = append(ipfsAddResps, addResp)
}
if len(ipfsAddResps) == 0 {
logger.Warning("proxy /add request response was OK but empty")
ipfs.proxyResponse(w, res, bodyCopy)
if !unpin {
return
}
// An ipfs-add call can add multiple files and pin multiple items.
// The go-ipfs api is not perfectly behaved here (i.e. when passing in
// two directories to pin). There is no easy way to know for sure what
// has been pinned recursively and what not.
// Usually when pinning a directory, the recursive pin comes last.
// But we may just be pinning different files and no directories.
// In that case, we need to recursively pin them separately.
// decideRecursivePins() takes a conservative approach. It
// works on the regular use-cases. Otherwise, it might pin
// more things than it should.
pinHashes := decideRecursivePins(ipfsAddResps, r.URL.Query())
logger.Debugf("proxy /add request and will pin %s", pinHashes)
for _, pin := range pinHashes {
err := ipfs.rpcClient.Call(
"",
"Cluster",
"Pin",
api.PinSerial{
Cid: pin,
},
&struct{}{},
)
if err != nil {
// we need to fail the operation and make sure the
// user knows about it.
msg := "add operation was successful but "
msg += "an error occurred performing the cluster "
msg += "pin operation: " + err.Error()
logger.Error(msg)
http.Error(w, msg, 500)
return
}
// Unpin because the user doesn't want to pin
time.Sleep(100 * time.Millisecond)
err = ipfs.rpcClient.CallContext(
ipfs.ctx,
"",
"Cluster",
"Unpin",
api.PinCid(root).ToSerial(),
&struct{}{},
)
if err != nil {
sendAddingError(err)
return
}
// Finally, send the original response back
ipfs.proxyResponse(w, res, bodyCopy)
}
// decideRecursivePins takes the answers from ipfsAddResp and
// figures out which of the pinned items need to be pinned
// recursively in cluster. That is, it guesses which items
// ipfs would have pinned recursively.
// When adding multiple files+directories, it may end up
// pinning more than it should because ipfs API does not
// behave well in these cases.
// It should work well for regular usecases: pin 1 file,
// pin 1 directory, pin several files.
// decideRecursivePins takes the answers from ipfsAddResp and
// figures out which of the pinned items need to be pinned
// recursively in cluster. That is, it guesses which items
// ipfs would have pinned recursively.
// When adding multiple files+directories, it may end up
// pinning more than it should because ipfs API does not
// behave well in these cases.
// It should work well for regular usecases: pin 1 file,
// pin 1 directory, pin several files.
func decideRecursivePins(added []ipfsAddResp, q url.Values) []string {
	// Nothing added: nothing to pin. This guards the
	// wrap-in-directory indexing below, which would otherwise
	// panic on an empty slice.
	if len(added) == 0 {
		return []string{}
	}

	// When wrap-in-directory, the wrapping directory is reported
	// last: pin only that element. (q.Get returns "" when the key
	// is absent, so the extra presence check is unnecessary.)
	if q.Get("wrap-in-directory") == "true" {
		return []string{
			added[len(added)-1].Hash,
		}
	}

	toPin := []string{}
	baseFolders := make(map[string]struct{})

	// baseFolder guesses the top-level folder component of a path,
	// skipping an empty leading component for absolute paths.
	baseFolder := func(path string) string {
		slashed := filepath.ToSlash(path)
		parts := strings.Split(slashed, "/")
		if len(parts) == 0 {
			return ""
		}
		if parts[0] == "" && len(parts) > 1 {
			return parts[1]
		}
		return parts[0]
	}

	// First pass: collect the set of base folder names seen.
	for _, add := range added {
		if add.Hash == "" {
			continue
		}
		if b := baseFolder(add.Name); b != "" {
			baseFolders[b] = struct{}{}
		}
	}

	// Second pass: pin items that are themselves base folders, and
	// items not covered by any base folder.
	for _, add := range added {
		if add.Hash == "" {
			continue
		}
		if _, ok := baseFolders[add.Name]; ok {
			// it's a base folder, pin it
			toPin = append(toPin, add.Hash)
			continue
		}
		// otherwise, pin only if there is no basefolder to it.
		if _, ok := baseFolders[baseFolder(add.Name)]; !ok {
			toPin = append(toPin, add.Hash)
		}
	}
	return toPin
}
// SetClient makes the component ready to perform RPC

View File

@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"mime/multipart"
"net/http"
"net/url"
"testing"
@ -40,6 +39,8 @@ func testIPFSConnector(t *testing.T) (*Connector, *test.IpfsMock) {
if err != nil {
t.Fatal("creating an IPFSConnector should work: ", err)
}
ipfs.server.SetKeepAlivesEnabled(false)
ipfs.SetClient(test.NewMockRPCClient(t))
return ipfs, mock
}
@ -489,44 +490,51 @@ func TestIPFSProxyPinLs(t *testing.T) {
}
func TestProxyAdd(t *testing.T) {
// TODO: find a way to ensure that the calls to
// rpc-api "Pin" happened.
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown()
urlQueries := []string{
"",
"pin=false",
"progress=true",
"wrap-with-directory",
type testcase struct {
query string
expectedCid string
}
reqs := make([]*http.Request, len(urlQueries), len(urlQueries))
testcases := []testcase{
testcase{
query: "",
expectedCid: test.ShardingDirBalancedRootCID,
},
testcase{
query: "progress=true",
expectedCid: test.ShardingDirBalancedRootCID,
},
testcase{
query: "wrap-with-directory=true",
expectedCid: test.ShardingDirBalancedRootCIDWrapped,
},
testcase{
query: "trickle=true",
expectedCid: test.ShardingDirTrickleRootCID,
},
}
for i := 0; i < len(urlQueries); i++ {
body := new(bytes.Buffer)
w := multipart.NewWriter(body)
part, err := w.CreateFormFile("file", "testfile")
if err != nil {
t.Fatal(err)
}
_, err = part.Write([]byte("this is a multipart file"))
if err != nil {
t.Fatal(err)
}
err = w.Close()
if err != nil {
t.Fatal(err)
}
url := fmt.Sprintf("%s/add?"+urlQueries[i], proxyURL(ipfs))
req, _ := http.NewRequest("POST", url, body)
req.Header.Set("Content-Type", w.FormDataContentType())
reqs := make([]*http.Request, len(testcases), len(testcases))
sth := test.NewShardingTestHelper()
defer sth.Clean(t)
for i, tc := range testcases {
mr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
cType := "multipart/form-data; boundary=" + mr.Boundary()
url := fmt.Sprintf("%s/add?"+tc.query, proxyURL(ipfs))
req, _ := http.NewRequest("POST", url, mr)
req.Header.Set("Content-Type", cType)
reqs[i] = req
}
for i := 0; i < len(urlQueries); i++ {
t.Run(urlQueries[i], func(t *testing.T) {
for i, tc := range testcases {
t.Run(tc.query, func(t *testing.T) {
res, err := http.DefaultClient.Do(reqs[i])
if err != nil {
t.Fatal("should have succeeded: ", err)
@ -536,32 +544,18 @@ func TestProxyAdd(t *testing.T) {
t.Fatalf("Bad response status: got = %d, want = %d", res.StatusCode, http.StatusOK)
}
var hash ipfsAddResp
// We might return a progress notification, so we do it
// like this to ignore it easily
var resp api.AddedOutput
dec := json.NewDecoder(res.Body)
for dec.More() {
var resp ipfsAddResp
err := dec.Decode(&resp)
if err != nil {
t.Fatal(err)
}
if resp.Bytes != 0 {
continue
} else {
hash = resp
}
}
if hash.Hash != test.TestCid3 {
t.Logf("%+v", hash)
t.Error("expected TestCid1 as it is hardcoded in ipfs mock")
}
if hash.Name != "testfile" {
t.Logf("%+v", hash)
t.Error("expected testfile for hash name")
if resp.Hash != tc.expectedCid {
t.Logf("%+v", resp.Hash)
t.Error("expected CID does not match")
}
})
}
@ -581,73 +575,6 @@ func TestProxyAddError(t *testing.T) {
}
}
func TestDecideRecursivePins(t *testing.T) {
type testcases struct {
addResps []ipfsAddResp
query url.Values
expect []string
}
tcs := []testcases{
{
[]ipfsAddResp{{"a", "cida", 0}},
url.Values{},
[]string{"cida"},
},
{
[]ipfsAddResp{{"a/b", "cidb", 0}, {"a", "cida", 0}},
url.Values{},
[]string{"cida"},
},
{
[]ipfsAddResp{{"a/b", "cidb", 0}, {"c", "cidc", 0}, {"a", "cida", 0}},
url.Values{},
[]string{"cidc", "cida"},
},
{
[]ipfsAddResp{{"/a", "cida", 0}},
url.Values{},
[]string{"cida"},
},
{
[]ipfsAddResp{{"a/b/c/d", "cidd", 0}},
url.Values{},
[]string{"cidd"},
},
{
[]ipfsAddResp{{"a", "cida", 0}, {"b", "cidb", 0}, {"c", "cidc", 0}, {"d", "cidd", 0}},
url.Values{},
[]string{"cida", "cidb", "cidc", "cidd"},
},
{
[]ipfsAddResp{{"a", "cida", 0}, {"b", "cidb", 0}, {"", "cidwrap", 0}},
url.Values{
"wrap-in-directory": []string{"true"},
},
[]string{"cidwrap"},
},
{
[]ipfsAddResp{{"b", "", 0}, {"a", "cida", 0}},
url.Values{},
[]string{"cida"},
},
}
for i, tc := range tcs {
r := decideRecursivePins(tc.addResps, tc.query)
for j, ritem := range r {
if len(r) != len(tc.expect) {
t.Errorf("testcase %d failed", i)
break
}
if tc.expect[j] != ritem {
t.Errorf("testcase %d failed for item %d", i, j)
}
}
}
}
func TestProxyError(t *testing.T) {
ipfs, mock := testIPFSConnector(t)
defer mock.Close()

View File

@ -69,9 +69,9 @@
},
{
"author": "hsanjuan",
"hash": "QmSMWoH8wKuViGUe2ZDr3kDijbzkk81nY71zV1ApibFLxF",
"hash": "QmeyKL7WDSPhnhozCB3oC51j5pDs7DnCGWPyVaxgwpncA6",
"name": "go-libp2p-http",
"version": "1.0.8"
"version": "1.1.0"
},
{
"author": "ipfs",
@ -150,3 +150,4 @@
"releaseCmd": "git commit -S -a -m \"gx publish $VERSION\"",
"version": "0.4.0"
}

View File

@ -6,7 +6,7 @@ statuses=0
for i in t0*.sh;
do
echo "*** $i ***"
./$i
./$i --verbose
status=$?
statuses=$((statuses + $status))
if [ $status -ne 0 ]; then

View File

@ -118,33 +118,6 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
}
j, _ := json.Marshal(resp)
w.Write(j)
case "add":
c, _ := cid.Decode(TestCid3)
// add also pins
m.pinMap.Add(api.PinCid(c))
_, fheader, err := r.FormFile("file")
if err != nil {
http.Error(w, "no file in /add", 500)
return
}
query := r.URL.Query()
progress, ok := query["progress"]
if ok && len(progress) > 0 && progress[0] != "false" {
progressResp := mockAddResp{
Name: fheader.Filename,
Bytes: 4,
}
j, _ := json.Marshal(progressResp)
w.Write(j)
}
resp := mockAddResp{
Name: fheader.Filename,
Hash: TestCid3,
}
j, _ := json.Marshal(resp)
w.Write(j)
case "pin/add":
arg, ok := extractCid(r.URL)
if !ok {

View File

@ -17,11 +17,6 @@ const shardingTestFile = "testFile"
// Variables related to adding the testing directory generated by tests
var (
// Shard and Cdag Cids
TestShardCid = "zdpuAoiNm1ntWx6jpgcReTiCWFHJSTpvTw4bAAn9p6yDnznqh"
TestShardData, _ = hex.DecodeString("a16130d82a58230012209273fd63ec94bed5abb219b2d9cb010cabe4af7b0177292d4335eff50464060a")
ShardingDirBalancedRootCID = "QmdHXJgxeCFf6qDZqYYmMesV2DbZCVPEdEhj2oVTxP1y7Y"
ShardingDirBalancedRootCIDWrapped = "QmbfGRPTUd7L1xsAZZ1A3kUFP1zkEZ9kHdb6AGaajBzGGX"
ShardingDirTrickleRootCID = "QmYMbx56GFNBDAaAMchtjmWjDTdqNKCSGuFxtRosiPgJL6"
@ -62,6 +57,10 @@ var (
"QmZ2iUT3W7jh8QNnpWSiMZ1QYgpommCSQFZiPY5VdoCHyv",
"QmdmUbN9JS3BK3nvcycyzFUBJqXip5zf7bdKbYM3p14e9h",
}
// Used for testing blockput/blockget
TestShardCid = "zdpuAoiNm1ntWx6jpgcReTiCWFHJSTpvTw4bAAn9p6yDnznqh"
TestShardData, _ = hex.DecodeString("a16130d82a58230012209273fd63ec94bed5abb219b2d9cb010cabe4af7b0177292d4335eff50464060a")
)
// ShardingTestHelper helps generating files and folders to test adding and