AddFile tests
tests cover local and sharded adds of files ipfs mock and ipfs block put/get calls cleaned up License: MIT Signed-off-by: Wyatt Daviau <wdaviau@cs.stanford.edu>
This commit is contained in:
parent
fc35bf449c
commit
fabf191883
|
@ -37,17 +37,15 @@ func (a *AddSession) consumeLocalAdd(
|
||||||
//TODO: when ipfs add starts supporting formats other than
|
//TODO: when ipfs add starts supporting formats other than
|
||||||
// v0 (v1.cbor, v1.protobuf) we'll need to update this
|
// v0 (v1.cbor, v1.protobuf) we'll need to update this
|
||||||
outObj.Format = ""
|
outObj.Format = ""
|
||||||
var hash string
|
|
||||||
err := a.rpcClient.Call(
|
err := a.rpcClient.Call(
|
||||||
"",
|
"",
|
||||||
"Cluster",
|
"Cluster",
|
||||||
"IPFSBlockPut",
|
"IPFSBlockPut",
|
||||||
*outObj,
|
*outObj,
|
||||||
&hash,
|
&struct{}{},
|
||||||
)
|
)
|
||||||
if outObj.Cid != hash {
|
|
||||||
a.logger.Warningf("mismatch. node cid: %s\nrpc cid: %s", outObj.Cid, hash)
|
|
||||||
}
|
|
||||||
return outObj.Cid, err // root node returned in case this is last call
|
return outObj.Cid, err // root node returned in case this is last call
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -323,7 +323,7 @@ func TestAPIAddFileEndpoint(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
errResp := api.Error{}
|
errResp := api.Error{}
|
||||||
makePostRaw(t, successUrl, body, "text/html", &errResp)
|
makePostRaw(t, successURL, body, "text/html", &errResp)
|
||||||
if errResp.Code != 415 {
|
if errResp.Code != 415 {
|
||||||
t.Error("expected error with bad content-type")
|
t.Error("expected error with bad content-type")
|
||||||
}
|
}
|
||||||
|
@ -332,14 +332,14 @@ func TestAPIAddFileEndpoint(t *testing.T) {
|
||||||
mpContentType := "multipart/form-data; boundary=" + body.Boundary()
|
mpContentType := "multipart/form-data; boundary=" + body.Boundary()
|
||||||
fmtStr1Bad := "/allocations?shard=true&quiet=false&silent=false&"
|
fmtStr1Bad := "/allocations?shard=true&quiet=false&silent=false&"
|
||||||
failURL := apiURL(rest) + fmtStr1Bad + fmtStr2 + fmtStr3
|
failURL := apiURL(rest) + fmtStr1Bad + fmtStr2 + fmtStr3
|
||||||
makePostRaw(t, failUrl, body, mpContentType, &errResp)
|
makePostRaw(t, failURL, body, mpContentType, &errResp)
|
||||||
if errResp.Code != 500 {
|
if errResp.Code != 500 {
|
||||||
t.Error("expected error with params causing mockrpc AddFile fail")
|
t.Error("expected error with params causing mockrpc AddFile fail")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test with a correct input
|
// Test with a correct input
|
||||||
resp := []api.AddedOutput{}
|
resp := []api.AddedOutput{}
|
||||||
makePostRaw(t, successUrl, body, mpContentType, &resp)
|
makePostRaw(t, successURL, body, mpContentType, &resp)
|
||||||
if len(resp) != 1 || resp[0].Hash != test.TestCid1 {
|
if len(resp) != 1 || resp[0].Hash != test.TestCid1 {
|
||||||
t.Fatal("unexpected addedoutput from mock rpc on api")
|
t.Fatal("unexpected addedoutput from mock rpc on api")
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,7 +90,7 @@ func (ipfs *mockConnector) ConnectSwarms() error { retu
|
||||||
func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil }
|
func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil }
|
||||||
func (ipfs *mockConnector) FreeSpace() (uint64, error) { return 100, nil }
|
func (ipfs *mockConnector) FreeSpace() (uint64, error) { return 100, nil }
|
||||||
func (ipfs *mockConnector) RepoSize() (uint64, error) { return 0, nil }
|
func (ipfs *mockConnector) RepoSize() (uint64, error) { return 0, nil }
|
||||||
func (ipfs *mockConnector) BlockPut(nwm api.NodeWithMeta) (string, error) { return "", nil }
|
func (ipfs *mockConnector) BlockPut(nwm api.NodeWithMeta) error { return nil }
|
||||||
|
|
||||||
func (ipfs *mockConnector) BlockGet(c *cid.Cid) ([]byte, error) {
|
func (ipfs *mockConnector) BlockGet(c *cid.Cid) ([]byte, error) {
|
||||||
switch c.String() {
|
switch c.String() {
|
||||||
|
|
|
@ -91,7 +91,7 @@ type IPFSConnector interface {
|
||||||
// by "repo stat".
|
// by "repo stat".
|
||||||
RepoSize() (uint64, error)
|
RepoSize() (uint64, error)
|
||||||
// BlockPut directly adds a block of data to the IPFS repo
|
// BlockPut directly adds a block of data to the IPFS repo
|
||||||
BlockPut(api.NodeWithMeta) (string, error)
|
BlockPut(api.NodeWithMeta) error
|
||||||
// BlockGet retrieves the raw data of an IPFS block
|
// BlockGet retrieves the raw data of an IPFS block
|
||||||
BlockGet(*cid.Cid) ([]byte, error)
|
BlockGet(*cid.Cid) ([]byte, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"mime/multipart"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -1647,3 +1648,202 @@ func TestClustersDisabledRepinning(t *testing.T) {
|
||||||
t.Errorf("expected %d replicas for pin, got %d", nClusters-2, numPinned)
|
t.Errorf("expected %d replicas for pin, got %d", nClusters-2, numPinned)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClustersAddFileLocal(t *testing.T) {
|
||||||
|
clusters, mock := createClusters(t)
|
||||||
|
defer shutdownClusters(t, clusters, mock)
|
||||||
|
|
||||||
|
// Add the testing directory locally to a random cluster node with
|
||||||
|
// default parameters
|
||||||
|
j := rand.Intn(nClusters)
|
||||||
|
multiFileR, err := test.GetTestingDirMultiReader()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
multiPartR := multipart.NewReader(multiFileR, multiFileR.Boundary())
|
||||||
|
params := make(map[string][]string)
|
||||||
|
params["shard"] = []string{"false"}
|
||||||
|
params["quiet"] = []string{"false"}
|
||||||
|
params["silent"] = []string{"false"}
|
||||||
|
params["layout"] = []string{""}
|
||||||
|
params["chunker"] = []string{""}
|
||||||
|
params["raw"] = []string{"false"}
|
||||||
|
params["hidden"] = []string{"false"}
|
||||||
|
params["replMin"] = []string{"-1"}
|
||||||
|
params["replMax"] = []string{"-1"}
|
||||||
|
|
||||||
|
output, err := clusters[j].AddFile(multiPartR, params)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
delay()
|
||||||
|
if len(output) != test.NumTestDirPrints {
|
||||||
|
t.Error("Unexpected number of lines output")
|
||||||
|
}
|
||||||
|
rootCid, _ := cid.Decode(test.TestDirBalancedRootCID)
|
||||||
|
|
||||||
|
fPinnedThunk := func(pinnedCid *cid.Cid, numPins int) func(*testing.T, *Cluster) {
|
||||||
|
return func(t *testing.T, c *Cluster) {
|
||||||
|
v := c.tracker.Status(pinnedCid)
|
||||||
|
if v.Status != api.TrackerStatusPinned {
|
||||||
|
t.Errorf("%s should have been pinned but it is %s",
|
||||||
|
v.Cid,
|
||||||
|
v.Status.String())
|
||||||
|
}
|
||||||
|
allStatus := c.tracker.StatusAll()
|
||||||
|
if len(allStatus) != numPins {
|
||||||
|
t.Errorf("Expected %d pins from local file add but got %d", numPins, len(allStatus))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
runF(t, clusters, fPinnedThunk(rootCid, 1))
|
||||||
|
|
||||||
|
// Add the same file but as a trickle DAG
|
||||||
|
params["layout"] = []string{"trickle"}
|
||||||
|
multiFileR2, err := test.GetTestingDirMultiReader()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
multiPartR2 := multipart.NewReader(multiFileR2, multiFileR2.Boundary())
|
||||||
|
_, err = clusters[j].AddFile(multiPartR2, params)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
delay()
|
||||||
|
|
||||||
|
trickleRoot, _ := cid.Decode(test.TestDirTrickleRootCID)
|
||||||
|
runF(t, clusters, fPinnedThunk(trickleRoot, 2))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClustersAddFileShard(t *testing.T) {
|
||||||
|
clusters, mock := createClusters(t)
|
||||||
|
defer shutdownClusters(t, clusters, mock)
|
||||||
|
|
||||||
|
// Add the testing directory sharded across nodes
|
||||||
|
j := rand.Intn(nClusters)
|
||||||
|
multiFileR, err := test.GetTestingDirMultiReader()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
multiPartR := multipart.NewReader(multiFileR, multiFileR.Boundary())
|
||||||
|
params := make(map[string][]string)
|
||||||
|
params["shard"] = []string{"true"}
|
||||||
|
params["quiet"] = []string{"false"}
|
||||||
|
params["silent"] = []string{"false"}
|
||||||
|
params["layout"] = []string{""}
|
||||||
|
params["chunker"] = []string{""}
|
||||||
|
params["raw"] = []string{"false"}
|
||||||
|
params["hidden"] = []string{"false"}
|
||||||
|
params["replMin"] = []string{"-1"}
|
||||||
|
params["replMax"] = []string{"-1"}
|
||||||
|
|
||||||
|
_, err = clusters[j].AddFile(multiPartR, params)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
delay()
|
||||||
|
|
||||||
|
rootCid, _ := cid.Decode(test.TestDirBalancedRootCID)
|
||||||
|
m, err := mockFromCluster(clusters[j], clusters, mock)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fShardsPinned := func(t *testing.T, c *Cluster) {
|
||||||
|
// Verify that consensus state has the expected pinset
|
||||||
|
pins := c.Pins()
|
||||||
|
iRoot := -1
|
||||||
|
for i, pin := range pins {
|
||||||
|
if pin.Cid.Equals(rootCid) {
|
||||||
|
iRoot = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if iRoot == -1 {
|
||||||
|
t.Fatalf("data root not tracked in pinset")
|
||||||
|
}
|
||||||
|
if pins[iRoot].Type != api.MetaType {
|
||||||
|
t.Errorf("unexpected data root pin type: %s",
|
||||||
|
pins[iRoot].Type.String())
|
||||||
|
}
|
||||||
|
if pins[iRoot].Recursive {
|
||||||
|
t.Errorf("unexected recursively pinned data root")
|
||||||
|
}
|
||||||
|
if pins[iRoot].Parents.Len() != 0 {
|
||||||
|
t.Errorf("unexpected parents of data root")
|
||||||
|
}
|
||||||
|
cdag := pins[iRoot].Clusterdag
|
||||||
|
if cdag == nil {
|
||||||
|
t.Fatalf("no clusterdag pinned for data root")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the clusterdag pointed to by data root is pinned
|
||||||
|
iCdag := -1
|
||||||
|
for i, pin := range pins {
|
||||||
|
if pin.Cid.Equals(cdag) {
|
||||||
|
iCdag = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if iCdag == -1 {
|
||||||
|
t.Fatalf("clusterDAG not tracked in pinset")
|
||||||
|
}
|
||||||
|
if pins[iCdag].Type != api.CdagType {
|
||||||
|
t.Errorf("unexpected clusterdag root pin type: %s", pins[iCdag].Type.String())
|
||||||
|
}
|
||||||
|
if pins[iCdag].Recursive {
|
||||||
|
t.Errorf("unexected recursively pinned clusterdag root")
|
||||||
|
}
|
||||||
|
if pins[iCdag].Parents.Len() != 1 {
|
||||||
|
t.Errorf("unexpected parent set size of %d", pins[iCdag].Parents.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gather shards from clusterDAG data
|
||||||
|
cdagBytes, ok := m.BlockStore[cdag.String()]
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("ipfs does not store cdag data")
|
||||||
|
}
|
||||||
|
cdagNode, err := sharder.CborDataToNode(cdagBytes, "cbor")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("cdag bytes cannot be parsed to ipld-cbor node")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that all shards in the clusterDAG are pinned
|
||||||
|
for _, shardLink := range cdagNode.Links() {
|
||||||
|
iShard := -1
|
||||||
|
for i, pin := range pins {
|
||||||
|
if pin.Cid.Equals(shardLink.Cid) {
|
||||||
|
iShard = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if iShard == -1 {
|
||||||
|
t.Fatalf("at least one shard node not tracked in pinset")
|
||||||
|
}
|
||||||
|
if pins[iShard].Type != api.ShardType {
|
||||||
|
t.Errorf("unexpected shard pin type: %s", pins[iShard].Type.String())
|
||||||
|
}
|
||||||
|
if pins[iShard].Parents.Len() != 1 {
|
||||||
|
t.Errorf("unexpected parent set size of %d", pins[iShard].Parents.Len())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that these are the only pins tracked
|
||||||
|
numPins := len(cdagNode.Links()) + 2
|
||||||
|
allStatus := c.tracker.StatusAll()
|
||||||
|
if len(allStatus) != numPins {
|
||||||
|
t.Errorf("Expected %d pins from local file add but got %d", numPins, len(allStatus))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
runF(t, clusters, fShardsPinned)
|
||||||
|
}
|
||||||
|
|
||||||
|
func mockFromCluster(c *Cluster, cs []*Cluster, ms []*test.IpfsMock) (*test.IpfsMock, error) {
|
||||||
|
targetID := peer.IDB58Encode(c.ID().ID)
|
||||||
|
for i, c := range cs {
|
||||||
|
if peer.IDB58Encode(c.ID().ID) == targetID {
|
||||||
|
return ms[i], nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("cluster not in clusters")
|
||||||
|
}
|
||||||
|
|
|
@ -110,10 +110,6 @@ type ipfsStream struct {
|
||||||
Protocol string
|
Protocol string
|
||||||
}
|
}
|
||||||
|
|
||||||
type ipfsBlockPutResp struct {
|
|
||||||
Key string
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewConnector creates the component and leaves it ready to be started
|
// NewConnector creates the component and leaves it ready to be started
|
||||||
func NewConnector(cfg *Config) (*Connector, error) {
|
func NewConnector(cfg *Config) (*Connector, error) {
|
||||||
err := cfg.Validate()
|
err := cfg.Validate()
|
||||||
|
@ -992,7 +988,7 @@ func extractArgument(u *url.URL) (string, bool) {
|
||||||
|
|
||||||
// BlockPut triggers an ipfs block put on the given data, inserting the block
|
// BlockPut triggers an ipfs block put on the given data, inserting the block
|
||||||
// into the ipfs daemon's repo.
|
// into the ipfs daemon's repo.
|
||||||
func (ipfs *Connector) BlockPut(b api.NodeWithMeta) (string, error) {
|
func (ipfs *Connector) BlockPut(b api.NodeWithMeta) error {
|
||||||
r := ioutil.NopCloser(bytes.NewReader(b.Data))
|
r := ioutil.NopCloser(bytes.NewReader(b.Data))
|
||||||
rFile := files.NewReaderFile("", "", r, nil)
|
rFile := files.NewReaderFile("", "", r, nil)
|
||||||
sliceFile := files.NewSliceFile("", "", []files.File{rFile}) // IPFS reqs require a wrapping directory
|
sliceFile := files.NewSliceFile("", "", []files.File{rFile}) // IPFS reqs require a wrapping directory
|
||||||
|
@ -1002,16 +998,8 @@ func (ipfs *Connector) BlockPut(b api.NodeWithMeta) (string, error) {
|
||||||
}
|
}
|
||||||
url := "block/put?f=" + b.Format
|
url := "block/put?f=" + b.Format
|
||||||
contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
|
contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
|
||||||
res, err := ipfs.post(url, contentType, multiFileR)
|
_, err := ipfs.post(url, contentType, multiFileR)
|
||||||
if err != nil {
|
return err
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
var keyRaw ipfsBlockPutResp
|
|
||||||
err = json.Unmarshal(res, &keyRaw)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return keyRaw.Key, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlockGet retrieves an ipfs block with the given cid
|
// BlockGet retrieves an ipfs block with the given cid
|
||||||
|
|
|
@ -712,27 +712,45 @@ func TestBlockPut(t *testing.T) {
|
||||||
defer ipfs.Shutdown()
|
defer ipfs.Shutdown()
|
||||||
|
|
||||||
data := []byte(test.TestCid4Data)
|
data := []byte(test.TestCid4Data)
|
||||||
resp, err := ipfs.BlockPut(api.NodeWithMeta{
|
err := ipfs.BlockPut(api.NodeWithMeta{
|
||||||
Data: data,
|
Data: data,
|
||||||
Format: "protobuf",
|
Cid: test.TestCid4,
|
||||||
|
Format: "raw",
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if resp != test.TestCid4 {
|
|
||||||
t.Fatal("Unexpected resulting cid")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBlockGet(t *testing.T) {
|
func TestBlockGet(t *testing.T) {
|
||||||
ipfs, mock := testIPFSConnector(t)
|
ipfs, mock := testIPFSConnector(t)
|
||||||
defer mock.Close()
|
defer mock.Close()
|
||||||
defer ipfs.Shutdown()
|
defer ipfs.Shutdown()
|
||||||
|
|
||||||
shardCid, err := cid.Decode(test.TestShardCid)
|
shardCid, err := cid.Decode(test.TestShardCid)
|
||||||
data, err := ipfs.BlockGet(shardCid)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
// Fail when getting before putting
|
||||||
|
_, err = ipfs.BlockGet(shardCid)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected to fail getting unput block")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put and then successfully get
|
||||||
|
err = ipfs.BlockPut(api.NodeWithMeta{
|
||||||
|
Data: test.TestShardData,
|
||||||
|
Cid: test.TestShardCid,
|
||||||
|
Format: "cbor",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := ipfs.BlockGet(shardCid)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
if !bytes.Equal(data, test.TestShardData) {
|
if !bytes.Equal(data, test.TestShardData) {
|
||||||
t.Fatal("unexpected data returned")
|
t.Fatal("unexpected data returned")
|
||||||
}
|
}
|
||||||
|
|
|
@ -303,10 +303,8 @@ func (rpcapi *RPCAPI) IPFSSwarmPeers(ctx context.Context, in struct{}, out *api.
|
||||||
}
|
}
|
||||||
|
|
||||||
// IPFSBlockPut runs IPFSConnector.BlockPut().
|
// IPFSBlockPut runs IPFSConnector.BlockPut().
|
||||||
func (rpcapi *RPCAPI) IPFSBlockPut(ctx context.Context, in api.NodeWithMeta, out *string) error {
|
func (rpcapi *RPCAPI) IPFSBlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error {
|
||||||
res, err := rpcapi.c.ipfs.BlockPut(in)
|
return rpcapi.c.ipfs.BlockPut(in)
|
||||||
*out = res
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// IPFSBlockGet runs IPFSConnector.BlockGet().
|
// IPFSBlockGet runs IPFSConnector.BlockGet().
|
||||||
|
|
|
@ -206,13 +206,12 @@ func (s *Sharder) AddNode(
|
||||||
Data: data,
|
Data: data,
|
||||||
Format: format,
|
Format: format,
|
||||||
}
|
}
|
||||||
var retStr string
|
|
||||||
return id, s.rpcClient.Call(
|
return id, s.rpcClient.Call(
|
||||||
session.assignedPeer,
|
session.assignedPeer,
|
||||||
"Cluster",
|
"Cluster",
|
||||||
"IPFSBlockPut",
|
"IPFSBlockPut",
|
||||||
b,
|
b,
|
||||||
&retStr,
|
&struct{}{},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,8 +247,7 @@ func (s *Sharder) Finalize(id string) error {
|
||||||
Format: "cbor",
|
Format: "cbor",
|
||||||
}
|
}
|
||||||
logger.Debugf("The serialized shard root cid: %s", shardRoot.Cid().String())
|
logger.Debugf("The serialized shard root cid: %s", shardRoot.Cid().String())
|
||||||
var retStr string
|
err = s.rpcClient.Call("", "Cluster", "IPFSBlockPut", b, &struct{}{})
|
||||||
err = s.rpcClient.Call("", "Cluster", "IPFSBlockPut", b, &retStr)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -353,7 +351,6 @@ func (s *Sharder) flush() error {
|
||||||
|
|
||||||
for _, shardNode := range shardNodes {
|
for _, shardNode := range shardNodes {
|
||||||
logger.Debugf("The dag cbor Node Links: %v", shardNode.Links())
|
logger.Debugf("The dag cbor Node Links: %v", shardNode.Links())
|
||||||
var retStr string
|
|
||||||
b := api.NodeWithMeta{
|
b := api.NodeWithMeta{
|
||||||
Data: shardNode.RawData(),
|
Data: shardNode.RawData(),
|
||||||
Format: "cbor",
|
Format: "cbor",
|
||||||
|
@ -364,7 +361,7 @@ func (s *Sharder) flush() error {
|
||||||
"Cluster",
|
"Cluster",
|
||||||
"IPFSBlockPut",
|
"IPFSBlockPut",
|
||||||
b,
|
b,
|
||||||
&retStr,
|
&struct{}{},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -84,7 +84,7 @@ func (mock *mockRPC) Allocate(ctx context.Context, in api.AllocateInfo, out *[]p
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record the ordered sequence of BlockPut calls for later validation
|
// Record the ordered sequence of BlockPut calls for later validation
|
||||||
func (mock *mockRPC) IPFSBlockPut(ctx context.Context, in api.NodeWithMeta, out *string) error {
|
func (mock *mockRPC) IPFSBlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error {
|
||||||
mock.orderedPuts[len(mock.orderedPuts)] = in
|
mock.orderedPuts[len(mock.orderedPuts)] = in
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
77
test/cids.go
77
test/cids.go
|
@ -34,44 +34,51 @@ var (
|
||||||
TestPeerID6, _ = peer.IDB58Decode("QmR8Vu6kZk7JvAN2rWVWgiduHatgBq2bb15Yyq8RRhYSbx")
|
TestPeerID6, _ = peer.IDB58Decode("QmR8Vu6kZk7JvAN2rWVWgiduHatgBq2bb15Yyq8RRhYSbx")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Hashes of test files generated in this module through calls to GetTestingDir*
|
|
||||||
// and ipfs add with default chunking and layout
|
|
||||||
var TestDirCids = [29]string{
|
|
||||||
"QmdWLMAyMF23KyBfAXbNzy7sDu3zGGu27eGQRkQTPMqfoE",
|
|
||||||
"QmbiPudqP8264Ccia1iyTebFrtGmG3JCW85YmT5Gds1Wt9",
|
|
||||||
"QmesCMDCHRETdDYfuGjUaGVZGSE2nGxGETPoBUgwitnsCT",
|
|
||||||
"Qmbiz4Y6ByNTrzEwE2veqy7S8gUBJNNvNqxAy6bBShpvr4",
|
|
||||||
"QmNtq6PD9Ef6V1UtikhemHBcupjsvr2sDu7bu2DrBSoHWw",
|
|
||||||
"QmVorRQhT4DbND8JyhAW7HkNPd7bUzqof8XJKcfGcGmvHF",
|
|
||||||
"QmanFw3Fn96DkMc9XSuhZdvXWDk37cpLrKA6L54MniGL9Z",
|
|
||||||
"QmRVFNBFwtUKpE7u3Bbd6Nj1QsnyHgryZSTM86bBuphPAn",
|
|
||||||
"QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn",
|
|
||||||
"QmPZLJ3CZYgxH4K1w5jdbAdxJynXn5TCB4kHy7u8uHC3fy",
|
|
||||||
"QmQntQGk13CkPgr1b3RAseJFaTpVMqQu8zgh21ox1RnwBf",
|
|
||||||
"QmbR4x5HwcQLiipfyHazhKYA1Z2yN9burWWdAKJBhoZnK3",
|
|
||||||
"QmYdHmrwb9Wd8kkjLg4yKr7EPqKNGx5vHuREU5HNc7sxnk",
|
|
||||||
"QmVMmDqWhdH8d4QWyhkkVHYvQYara6ijgCg3PNpvRhbmHo",
|
|
||||||
"QmX2Erb4SBNfcv8X5MFa4z1jGPfaaYA1snMUhQyYVdDqTA",
|
|
||||||
// Hashes that are not printed out (chunks of files)
|
|
||||||
"QmavW3cdGuSfYMEQiBDfobwVtPEjUnML2Ry1q8w8X3Q8Wj",
|
|
||||||
"QmfPHRbeerRWgbu5BzxwK7UhmJGqGvZNxuFoMCUFTuhG3H",
|
|
||||||
"QmaYNfhw7L7KWX7LYpwWt1bh6Gq2p7z1tic35PnDRnqyBf",
|
|
||||||
"QmWWwH1GKMh6GmFQunjq7CHjr4g4z6Q4xHyDVfuZGX7MyU",
|
|
||||||
"QmVpHQGMF5PLsvfgj8bGo9q2YyLRPMvfu1uTb3DgREFtUc",
|
|
||||||
"QmUrdAn4Mx4kNioX9juLgwQotwFfxeo5doUNnLJrQynBEN",
|
|
||||||
"QmdJ86B7J8mfGq6SjQy8Jz7r5x1cLcXc9M2a7T7NmSMVZx",
|
|
||||||
"QmS77cTMdyx8P7rP2Gij6azgYPpjp2J34EVYuhB6mfjrQh",
|
|
||||||
"QmbsBsDspFcqi7xJ4xPxcNYnduzQ5UQDw9y6trQWZGoEHq",
|
|
||||||
"QmakAXHMeyE6fHHaeqicSKVMM2QyuGbS2g8dgUA7ns8gSY",
|
|
||||||
"QmTC6vGbH9ABkpXfrMmYkXbxEqH12jEVGpvGzibGZEDVHK",
|
|
||||||
"QmRLF116yZdLLw7bLvjnHxXVHrjB2snNoJ1itpQxi8NkZP",
|
|
||||||
"QmZ2iUT3W7jh8QNnpWSiMZ1QYgpommCSQFZiPY5VdoCHyv",
|
|
||||||
"QmR5mq8smc6zCvo3eRhS47ScgEwKpPw7b1joExysyqgbee",
|
|
||||||
}
|
|
||||||
|
|
||||||
// MustDecodeCid provides a test helper that ignores
|
// MustDecodeCid provides a test helper that ignores
|
||||||
// errors from cid.Decode.
|
// errors from cid.Decode.
|
||||||
func MustDecodeCid(v string) *cid.Cid {
|
func MustDecodeCid(v string) *cid.Cid {
|
||||||
c, _ := cid.Decode(v)
|
c, _ := cid.Decode(v)
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Variables related to adding the testing directory generated by tests
|
||||||
|
var (
|
||||||
|
NumTestDirPrints = 15
|
||||||
|
TestDirBalancedRootCID = "QmX2Erb4SBNfcv8X5MFa4z1jGPfaaYA1snMUhQyYVdDqTA"
|
||||||
|
TestDirTrickleRootCID = "QmQnpSRdGDhUoGiCXQLMaAWmJKgkZbBeVi1pzsQh1xuTGz"
|
||||||
|
|
||||||
|
// Hashes of test files generated in this module through calls to GetTestingDir*
|
||||||
|
// and ipfs add with default chunking and layout
|
||||||
|
TestDirCids = [29]string{
|
||||||
|
"QmdWLMAyMF23KyBfAXbNzy7sDu3zGGu27eGQRkQTPMqfoE",
|
||||||
|
"QmbiPudqP8264Ccia1iyTebFrtGmG3JCW85YmT5Gds1Wt9",
|
||||||
|
"QmesCMDCHRETdDYfuGjUaGVZGSE2nGxGETPoBUgwitnsCT",
|
||||||
|
"Qmbiz4Y6ByNTrzEwE2veqy7S8gUBJNNvNqxAy6bBShpvr4",
|
||||||
|
"QmNtq6PD9Ef6V1UtikhemHBcupjsvr2sDu7bu2DrBSoHWw",
|
||||||
|
"QmVorRQhT4DbND8JyhAW7HkNPd7bUzqof8XJKcfGcGmvHF",
|
||||||
|
"QmanFw3Fn96DkMc9XSuhZdvXWDk37cpLrKA6L54MniGL9Z",
|
||||||
|
"QmRVFNBFwtUKpE7u3Bbd6Nj1QsnyHgryZSTM86bBuphPAn",
|
||||||
|
"QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn",
|
||||||
|
"QmPZLJ3CZYgxH4K1w5jdbAdxJynXn5TCB4kHy7u8uHC3fy",
|
||||||
|
"QmQntQGk13CkPgr1b3RAseJFaTpVMqQu8zgh21ox1RnwBf",
|
||||||
|
"QmbR4x5HwcQLiipfyHazhKYA1Z2yN9burWWdAKJBhoZnK3",
|
||||||
|
"QmYdHmrwb9Wd8kkjLg4yKr7EPqKNGx5vHuREU5HNc7sxnk",
|
||||||
|
"QmVMmDqWhdH8d4QWyhkkVHYvQYara6ijgCg3PNpvRhbmHo",
|
||||||
|
"QmX2Erb4SBNfcv8X5MFa4z1jGPfaaYA1snMUhQyYVdDqTA",
|
||||||
|
// Hashes that are not printed out (chunks of files)
|
||||||
|
"QmavW3cdGuSfYMEQiBDfobwVtPEjUnML2Ry1q8w8X3Q8Wj",
|
||||||
|
"QmfPHRbeerRWgbu5BzxwK7UhmJGqGvZNxuFoMCUFTuhG3H",
|
||||||
|
"QmaYNfhw7L7KWX7LYpwWt1bh6Gq2p7z1tic35PnDRnqyBf",
|
||||||
|
"QmWWwH1GKMh6GmFQunjq7CHjr4g4z6Q4xHyDVfuZGX7MyU",
|
||||||
|
"QmVpHQGMF5PLsvfgj8bGo9q2YyLRPMvfu1uTb3DgREFtUc",
|
||||||
|
"QmUrdAn4Mx4kNioX9juLgwQotwFfxeo5doUNnLJrQynBEN",
|
||||||
|
"QmdJ86B7J8mfGq6SjQy8Jz7r5x1cLcXc9M2a7T7NmSMVZx",
|
||||||
|
"QmS77cTMdyx8P7rP2Gij6azgYPpjp2J34EVYuhB6mfjrQh",
|
||||||
|
"QmbsBsDspFcqi7xJ4xPxcNYnduzQ5UQDw9y6trQWZGoEHq",
|
||||||
|
"QmakAXHMeyE6fHHaeqicSKVMM2QyuGbS2g8dgUA7ns8gSY",
|
||||||
|
"QmTC6vGbH9ABkpXfrMmYkXbxEqH12jEVGpvGzibGZEDVHK",
|
||||||
|
"QmRLF116yZdLLw7bLvjnHxXVHrjB2snNoJ1itpQxi8NkZP",
|
||||||
|
"QmZ2iUT3W7jh8QNnpWSiMZ1QYgpommCSQFZiPY5VdoCHyv",
|
||||||
|
"QmR5mq8smc6zCvo3eRhS47ScgEwKpPw7b1joExysyqgbee",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
|
@ -19,10 +19,11 @@ import (
|
||||||
|
|
||||||
// IpfsMock is an ipfs daemon mock which should sustain the functionality used by ipfscluster.
|
// IpfsMock is an ipfs daemon mock which should sustain the functionality used by ipfscluster.
|
||||||
type IpfsMock struct {
|
type IpfsMock struct {
|
||||||
server *httptest.Server
|
server *httptest.Server
|
||||||
Addr string
|
Addr string
|
||||||
Port int
|
Port int
|
||||||
pinMap *mapstate.MapState
|
pinMap *mapstate.MapState
|
||||||
|
BlockStore map[string][]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockPinResp struct {
|
type mockPinResp struct {
|
||||||
|
@ -85,8 +86,10 @@ type mockBlockPutResp struct {
|
||||||
// NewIpfsMock returns a new mock.
|
// NewIpfsMock returns a new mock.
|
||||||
func NewIpfsMock() *IpfsMock {
|
func NewIpfsMock() *IpfsMock {
|
||||||
st := mapstate.NewMapState()
|
st := mapstate.NewMapState()
|
||||||
|
blocks := make(map[string][]byte)
|
||||||
m := &IpfsMock{
|
m := &IpfsMock{
|
||||||
pinMap: st,
|
pinMap: st,
|
||||||
|
BlockStore: blocks,
|
||||||
}
|
}
|
||||||
ts := httptest.NewServer(http.HandlerFunc(m.handler))
|
ts := httptest.NewServer(http.HandlerFunc(m.handler))
|
||||||
m.server = ts
|
m.server = ts
|
||||||
|
@ -246,10 +249,25 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
goto ERROR
|
goto ERROR
|
||||||
}
|
}
|
||||||
c := cid.NewCidV1(cid.Raw, u.Hash(data)).String()
|
// Parse cid from data and format and add to mock block-store
|
||||||
if c != TestCid4 {
|
query := r.URL.Query()
|
||||||
|
format, ok := query["f"]
|
||||||
|
if !ok || len(format) != 1 {
|
||||||
goto ERROR
|
goto ERROR
|
||||||
}
|
}
|
||||||
|
var c string
|
||||||
|
hash := u.Hash(data)
|
||||||
|
codec, ok := cid.Codecs[format[0]]
|
||||||
|
if !ok {
|
||||||
|
goto ERROR
|
||||||
|
}
|
||||||
|
if format[0] == "v0" {
|
||||||
|
c = cid.NewCidV0(hash).String()
|
||||||
|
} else {
|
||||||
|
c = cid.NewCidV1(codec, hash).String()
|
||||||
|
}
|
||||||
|
m.BlockStore[c] = data
|
||||||
|
|
||||||
resp := mockBlockPutResp{
|
resp := mockBlockPutResp{
|
||||||
Key: c,
|
Key: c,
|
||||||
}
|
}
|
||||||
|
@ -264,14 +282,11 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
|
||||||
if len(arg) != 1 {
|
if len(arg) != 1 {
|
||||||
goto ERROR
|
goto ERROR
|
||||||
}
|
}
|
||||||
switch arg[0] {
|
data, ok := m.BlockStore[arg[0]]
|
||||||
case TestShardCid:
|
if !ok {
|
||||||
w.Write(TestShardData)
|
|
||||||
case TestCdagCid:
|
|
||||||
w.Write(TestCdagData)
|
|
||||||
default:
|
|
||||||
goto ERROR
|
goto ERROR
|
||||||
}
|
}
|
||||||
|
w.Write(data)
|
||||||
case "repo/stat":
|
case "repo/stat":
|
||||||
len := len(m.pinMap.List())
|
len := len(m.pinMap.List())
|
||||||
resp := mockRepoStatResp{
|
resp := mockRepoStatResp{
|
||||||
|
|
Loading…
Reference in New Issue
Block a user