Move adder.Params to api.AddParams. Re-use in other modules
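
The relocated type gives the REST client, the REST API and the adders a single shared definition. Below is a minimal sketch of the intended round trip, assuming the api package and the functions introduced in this commit; field values are illustrative:

    package main

    import (
        "fmt"
        "net/url"

        "github.com/ipfs/ipfs-cluster/api"
    )

    func main() {
        // Start from the new defaults and adjust only what differs.
        p := api.DefaultAddParams()
        p.Name = "myfile" // illustrative name
        p.Shard = true

        // One struct now drives both sides: the client serializes it into
        // the /allocations query string and the REST API parses it back.
        q, err := url.ParseQuery(p.ToQueryString())
        if err != nil {
            panic(err)
        }
        p2, err := api.AddParamsFromQuery(q)
        if err != nil {
            panic(err)
        }
        fmt.Println(p.Equals(p2)) // true: the round trip preserves all fields
    }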

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
Hector Sanjuan 2018-08-06 12:44:44 +02:00
parent 7c08177d6f
commit 8f1a15b279
15 changed files with 145 additions and 94 deletions

View File

@ -4,6 +4,8 @@ import (
"context"
"mime/multipart"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
)
@ -14,5 +16,5 @@ var logger = logging.Logger("adder")
type Adder interface {
// FromMultipart adds from a multipart reader and returns
// the resulting CID.
FromMultipart(context.Context, *multipart.Reader, *Params) (*cid.Cid, error)
FromMultipart(context.Context, *multipart.Reader, *api.AddParams) (*cid.Cid, error)
}

View File

@ -6,10 +6,10 @@ import (
"io"
"sync"
"github.com/ipfs/go-ipfs-cmdkit/files"
"github.com/ipfs/ipfs-cluster/adder/ipfsadd"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/go-ipfs-cmdkit/files"
)
// BlockHandler is a function used to process a block as is received by the
@ -23,7 +23,7 @@ type Importer struct {
started bool
files files.File
params *Params
params *api.AddParams
output chan *api.AddedOutput
blocks chan *api.NodeWithMeta
@ -31,7 +31,7 @@ type Importer struct {
}
// NewImporter sets up an Importer ready to Go().
func NewImporter(f files.File, p *Params) (*Importer, error) {
func NewImporter(f files.File, p *api.AddParams) (*Importer, error) {
output := make(chan *api.AddedOutput, 1)
blocks := make(chan *api.NodeWithMeta, 1)
errors := make(chan error, 1)

View File

@ -13,7 +13,7 @@ func TestImporter(t *testing.T) {
defer sth.Clean()
f := sth.GetTreeSerialFile(t)
p := DefaultParams()
p := api.DefaultAddParams()
imp, err := NewImporter(f, p)
if err != nil {
@ -59,7 +59,7 @@ func TestImporter_DoubleStart(t *testing.T) {
defer sth.Clean()
f := sth.GetTreeSerialFile(t)
p := DefaultParams()
p := api.DefaultAddParams()
imp, err := NewImporter(f, p)
if err != nil {

View File

@ -7,16 +7,15 @@ import (
"errors"
"mime/multipart"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
"github.com/ipfs/go-ipfs-cmdkit/files"
cid "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-cmdkit/files"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
)
var logger = logging.Logger("addlocal")
@ -69,7 +68,7 @@ func (a *Adder) putBlock(ctx context.Context, n *api.NodeWithMeta, dests []peer.
}
// FromMultipart allows adding a file encoded as multipart.
func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader, p *adder.Params) (*cid.Cid, error) {
func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader, p *api.AddParams) (*cid.Cid, error) {
f := &files.MultipartFile{
Mediatype: "multipart/form-data",
Reader: r,

View File

@ -7,7 +7,6 @@ import (
"sync"
"testing"
"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
@ -53,9 +52,9 @@ func TestFromMultipart(t *testing.T) {
defer sth.Clean()
mr := sth.GetTreeMultiReader(t)
r := multipart.NewReader(mr, mr.Boundary())
params := adder.DefaultParams()
params := api.DefaultAddParams()
params.ShardSize = 0
rootCid, err := add.FromMultipart(context.Background(), r, adder.DefaultParams())
rootCid, err := add.FromMultipart(context.Background(), r, api.DefaultAddParams())
if err != nil {
t.Fatal(err)
}
@ -92,7 +91,7 @@ func TestFromMultipart(t *testing.T) {
defer sth.Clean()
mr := sth.GetTreeMultiReader(t)
r := multipart.NewReader(mr, mr.Boundary())
p := adder.DefaultParams()
p := api.DefaultAddParams()
p.Layout = "trickle"
rootCid, err := add.FromMultipart(context.Background(), r, p)

View File

@ -8,13 +8,12 @@ import (
"fmt"
"mime/multipart"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
"github.com/ipfs/go-ipfs-cmdkit/files"
cid "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-cmdkit/files"
logging "github.com/ipfs/go-log"
)
@ -37,7 +36,7 @@ func New(rpc *rpc.Client) *Adder {
}
// FromMultipart allows adding (and sharding) a file encoded as multipart.
func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader, p *adder.Params) (*cid.Cid, error) {
func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader, p *api.AddParams) (*cid.Cid, error) {
logger.Debugf("adding from multipart with params: %+v", p)
f := &files.MultipartFile{

View File

@ -7,7 +7,6 @@ import (
"sync"
"testing"
"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
@ -86,7 +85,7 @@ func TestFromMultipart(t *testing.T) {
add, rpcObj, r := makeAdder(t, sth.GetTreeMultiReader)
_ = rpcObj
p := adder.DefaultParams()
p := api.DefaultAddParams()
// Total data is about
p.ShardSize = 1024 * 300 // 300kB
p.Name = "testingFile"
@ -147,7 +146,7 @@ func TestFromMultipart(t *testing.T) {
add, rpcObj, r := makeAdder(t, mrF)
_ = rpcObj
p := adder.DefaultParams()
p := api.DefaultAddParams()
// Total data is about
p.ShardSize = 1024 * 1024 * 2 // 2MB
p.Name = "testingFile"
@ -172,13 +171,13 @@ func TestFromMultipart(t *testing.T) {
func TestFromMultipart_Errors(t *testing.T) {
type testcase struct {
name string
params *adder.Params
params *api.AddParams
}
tcs := []*testcase{
&testcase{
name: "bad chunker",
params: &adder.Params{
params: &api.AddParams{
Layout: "",
Chunker: "aweee",
RawLeaves: false,
@ -194,7 +193,7 @@ func TestFromMultipart_Errors(t *testing.T) {
},
&testcase{
name: "shard size too small",
params: &adder.Params{
params: &api.AddParams{
Layout: "",
Chunker: "",
RawLeaves: false,
@ -210,7 +209,7 @@ func TestFromMultipart_Errors(t *testing.T) {
},
&testcase{
name: "replication too high",
params: &adder.Params{
params: &api.AddParams{
Layout: "",
Chunker: "",
RawLeaves: false,
@ -242,7 +241,7 @@ func TestFromMultipart_Errors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
add, _, r := makeAdder(t, sth.GetTreeMultiReader)
_, err := add.FromMultipart(ctx, r, adder.DefaultParams())
_, err := add.FromMultipart(ctx, r, api.DefaultAddParams())
if err != ctx.Err() {
t.Error("expected context error:", err)
}

View File

@ -1,12 +1,10 @@
package adder
package api
import (
"errors"
"fmt"
"net/url"
"strconv"
"github.com/ipfs/ipfs-cluster/api"
)
// DefaultShardSize is the shard size for params objects created with DefaultParams().
@ -14,8 +12,8 @@ var DefaultShardSize = uint64(100 * 1024 * 1024) // 100 MB
// Params contains all of the configurable parameters needed to specify the
// importing process of a file being added to an ipfs-cluster
type Params struct {
api.PinOptions
type AddParams struct {
PinOptions
Layout string
Chunker string
@ -24,15 +22,15 @@ type Params struct {
Shard bool
}
// DefaultParams returns a Params object with standard defaults
func DefaultParams() *Params {
return &Params{
// DefaultAddParams returns an AddParams object with standard defaults
func DefaultAddParams() *AddParams {
return &AddParams{
Layout: "", // corresponds to balanced layout
Chunker: "",
Chunker: "size-262144",
RawLeaves: false,
Hidden: false,
Shard: false,
PinOptions: api.PinOptions{
PinOptions: PinOptions{
ReplicationFactorMin: 0,
ReplicationFactorMax: 0,
Name: "",
@ -63,10 +61,10 @@ func parseIntParam(q url.Values, name string, dest *int) error {
return nil
}
// ParamsFromQuery parses the Params object from
// AddParamsFromQuery parses the AddParams object from
// a URL.Query().
func ParamsFromQuery(query url.Values) (*Params, error) {
params := DefaultParams()
func AddParamsFromQuery(query url.Values) (*AddParams, error) {
params := DefaultAddParams()
layout := query.Get("layout")
switch layout {
@ -115,3 +113,35 @@ func ParamsFromQuery(query url.Values) (*Params, error) {
return params, nil
}
// ToQueryString returns a url query string (key=value&key2=value2&...)
func (p *AddParams) ToQueryString() string {
fmtStr := "repl_min=%d&repl_max=%d&name=%s&"
fmtStr += "shard=%t&shard_size=%d&"
fmtStr += "layout=%s&chunker=%s&raw=%t&hidden=%t"
query := fmt.Sprintf(
fmtStr,
p.ReplicationFactorMin,
p.ReplicationFactorMax,
p.Name,
p.Shard,
p.ShardSize,
p.Layout,
p.Chunker,
p.RawLeaves,
p.Hidden,
)
return query
}
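// Equals checks whether p and p2 carry the same values in every field.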
func (p *AddParams) Equals(p2 *AddParams) bool {
return p.ReplicationFactorMin == p2.ReplicationFactorMin &&
p.ReplicationFactorMax == p2.ReplicationFactorMax &&
p.Name == p2.Name &&
p.Shard == p2.Shard &&
p.ShardSize == p2.ShardSize &&
p.Layout == p2.Layout &&
p.Chunker == p2.Chunker &&
p.RawLeaves == p2.RawLeaves &&
p.Hidden == p2.Hidden
}

View File

@ -1,11 +1,11 @@
package adder
package api
import (
"net/url"
"testing"
)
func TestParamsFromQuery(t *testing.T) {
func TestAddParams_FromQuery(t *testing.T) {
qStr := "layout=balanced&chunker=size-262144&name=test&raw=true&hidden=true&shard=true&repl_min=2&repl_max=4&shard_size=1"
q, err := url.ParseQuery(qStr)
@ -13,7 +13,7 @@ func TestParamsFromQuery(t *testing.T) {
t.Fatal(err)
}
p, err := ParamsFromQuery(q)
p, err := AddParamsFromQuery(q)
if err != nil {
t.Fatal(err)
}
@ -27,3 +27,27 @@ func TestParamsFromQuery(t *testing.T) {
t.Fatal("did not parse the query correctly")
}
}
func TestAddParams_ToQueryString(t *testing.T) {
p := DefaultAddParams()
p.ReplicationFactorMin = 3
p.ReplicationFactorMax = 6
p.Name = "something"
p.RawLeaves = true
p.ShardSize = 1020
qstr := p.ToQueryString()
q, err := url.ParseQuery(qstr)
if err != nil {
t.Fatal(err)
}
p2, err := AddParamsFromQuery(q)
if err != nil {
t.Fatal(err)
}
if !p.Equals(p2) {
t.Error("generated and parsed params should be equal")
}
}

View File

@ -308,36 +308,19 @@ func statusReached(target api.TrackerStatus, gblPinInfo api.GlobalPinInfo) (bool
// sharding underlying dags across the ipfs repos of multiple cluster peers.
func (c *Client) AddMultiFile(
multiFileR *files.MultiFileReader,
replicationFactorMin int,
replicationFactorMax int,
name string,
shard bool,
shardSize int,
layout string,
chunker string,
raw bool,
hidden bool,
params *api.AddParams,
) error {
headers := make(map[string]string)
headers["Content-Type"] = "multipart/form-data; boundary=" + multiFileR.Boundary()
fmtStr := "/allocations?repl_min=%d&repl_max=%d&name=%s&"
fmtStr += "shard=%t&shard-size=%d&"
fmtStr += "layout=%s&chunker=%s&raw=%t&hidden=%t"
url := fmt.Sprintf(
fmtStr,
replicationFactorMin,
replicationFactorMax,
name,
shard,
shardSize,
layout,
chunker,
raw,
hidden,
)
queryStr := params.ToQueryString()
output := make([]api.AddedOutput, 0)
err := c.doStream("POST", url, multiFileR, headers, &output)
err := c.doStream(
"POST",
"/allocations?"+queryStr,
multiFileR,
headers,
&output,
)
// TODO: handle output
return err
}

View File

@ -438,7 +438,22 @@ func TestAddMultiFile(t *testing.T) {
testF := func(t *testing.T, c *Client) {
sth := test.NewShardingTestHelper()
mfr := sth.GetTreeMultiReader(t)
err := c.AddMultiFile(mfr, -1, -1, "test", false, 1024, "", "", false, false)
p := &types.AddParams{
PinOptions: types.PinOptions{
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,
Name: "test",
ShardSize: 1024,
},
Shard: false,
Layout: "",
Chunker: "",
RawLeaves: false,
Hidden: false,
}
err := c.AddMultiFile(mfr, p)
if err != nil {
t.Fatal(err)
}

View File

@ -509,7 +509,7 @@ func (api *API) addHandler(w http.ResponseWriter, r *http.Request) {
return
}
params, err := adder.ParamsFromQuery(r.URL.Query())
params, err := types.AddParamsFromQuery(r.URL.Query())
if err != nil {
sendErrorResponse(w, 400, err.Error())
return

View File

@ -1123,7 +1123,7 @@ func (c *Cluster) unpinShard(cdagCid, shardCid *cid.Cid) error {
// pipeline is used to DAGify the file. Depending on input parameters this
// DAG can be added locally to the calling cluster peer's ipfs repo, or
// sharded across the entire cluster.
func (c *Cluster) AddFile(reader *multipart.Reader, params *adder.Params) (*cid.Cid, error) {
func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (*cid.Cid, error) {
var add adder.Adder
if params.Shard {
add = sharding.New(c.rpcClient)

View File

@ -10,7 +10,6 @@ import (
"testing"
"time"
"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/adder/sharding"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
"github.com/ipfs/ipfs-cluster/api"
@ -262,7 +261,7 @@ func TestAddFile(t *testing.T) {
defer sth.Clean()
t.Run("local", func(t *testing.T) {
params := adder.DefaultParams()
params := api.DefaultAddParams()
params.Shard = false
params.Name = "testlocal"
mfr := sth.GetTreeMultiReader(t)
@ -290,7 +289,7 @@ func TestAddFile(t *testing.T) {
})
t.Run("shard", func(t *testing.T) {
params := adder.DefaultParams()
params := api.DefaultAddParams()
params.Shard = true
params.Name = "testshard"
mfr := sth.GetTreeMultiReader(t)
@ -318,7 +317,7 @@ func TestUnpinShard(t *testing.T) {
sth := test.NewShardingTestHelper()
defer sth.Clean()
params := adder.DefaultParams()
params := api.DefaultAddParams()
params.Shard = true
params.Name = "testshard"
mfr := sth.GetTreeMultiReader(t)

View File

@ -33,7 +33,7 @@ var (
defaultUsername = ""
defaultPassword = ""
defaultWaitCheckFreq = time.Second
defaultShardSize = 100 * 1024 * 1024
defaultAddParams = api.DefaultAddParams()
)
var logger = logging.Logger("cluster-ctl")
@ -270,26 +270,26 @@ automatically generated.
},
cli.StringFlag{
Name: "name, n",
Value: "",
Value: defaultAddParams.Name,
Usage: "Sets a name for this pin",
},
cli.IntFlag{
Name: "replication-min, rmin",
Value: 0,
Value: defaultAddParams.ReplicationFactorMin,
Usage: "Sets the minimum replication factor for pinning this file",
},
cli.IntFlag{
Name: "replication-max, rmax",
Value: 0,
Value: defaultAddParams.ReplicationFactorMax,
Usage: "Sets the maximum replication factor for pinning this file",
},
cli.BoolFlag{
Name: "shard",
Usage: "Break the file into pieces (shards) and distributed among peers",
},
cli.IntFlag{
cli.Uint64Flag{
Name: "shard-size",
Value: defaultShardSize,
Value: defaultAddParams.ShardSize,
Usage: "Sets the maximum replication factor for pinning this file",
},
// cli.BoolFlag{
@ -298,13 +298,13 @@ automatically generated.
// },
cli.StringFlag{
Name: "layout, L",
Value: defaultAddParams.Layout,
Usage: "Dag layout to use for dag generation: balanced or trickle",
Value: "balanced",
},
cli.StringFlag{
Name: "chunker, s",
Usage: "Chunker selection. Fixed block size: 'size-<size>', or rabin chunker: 'rabin-<min>-<avg>-<max>'",
Value: "size-262144",
Value: defaultAddParams.Chunker,
},
cli.BoolFlag{
Name: "raw-leaves",
@ -343,17 +343,19 @@ automatically generated.
// Files are all opened but not read until they are sent.
multiFileR, err := parseFileArgs(paths, c.Bool("recursive"), c.Bool("hidden"))
checkErr("serializing all files", err)
p := api.DefaultAddParams()
p.ReplicationFactorMin = c.Int("replication-min")
p.ReplicationFactorMax = c.Int("replication-max")
p.Name = name
p.Shard = shard
p.ShardSize = c.Uint64("shard-size")
p.Layout = c.String("layout")
p.Chunker = c.String("chunker")
p.RawLeaves = c.Bool("raw-leaves")
p.Hidden = c.Bool("hidden")
cerr := globalClient.AddMultiFile(
multiFileR,
c.Int("replication-min"),
c.Int("replication-max"),
name,
shard,
c.Int("shard-size"),
c.String("layout"),
c.String("chunker"),
c.Bool("raw-leaves"),
c.Bool("hidden"),
p,
)
// TODO: output control
// if c.Bool("only-hashes") {