Addressing second round of comments
License: MIT Signed-off-by: Wyatt Daviau <wdaviau@cs.stanford.edu>
This commit is contained in:
parent
0d6b2a42de
commit
3e9b08ba60
|
@ -33,7 +33,6 @@ func ToPrint(f files.File) error {
|
|||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := fileAdder.AddFile(file); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -51,16 +50,16 @@ func ToPrint(f files.File) error {
|
|||
|
||||
// ToChannel imports file to ipfs ipld nodes, outputting nodes on the
|
||||
// provided channel
|
||||
func ToChannel(f files.File, ctx context.Context) <-chan *ipld.Node {
|
||||
func ToChannel(f files.File, ctx context.Context) (<-chan *ipld.Node, error) {
|
||||
outChan := make(chan *ipld.Node)
|
||||
dserv := &outDAGService{
|
||||
membership: make(map[string]bool),
|
||||
membership: make(map[string]struct{}),
|
||||
outChan: outChan,
|
||||
}
|
||||
|
||||
fileAdder, err := coreunix.NewAdder(ctx, nil, nil, dserv)
|
||||
if err != nil {
|
||||
return err
|
||||
return outChan, err
|
||||
}
|
||||
fileAdder.Pin = false
|
||||
|
||||
|
@ -72,18 +71,22 @@ func ToChannel(f files.File, ctx context.Context) <-chan *ipld.Node {
|
|||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if err := fileAdder.AddFile(file); err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
_, err = fileAdder.Finalize()
|
||||
if !strings.Contains(err.Error(), "dagservice: block not found") {
|
||||
return err
|
||||
return
|
||||
}
|
||||
}()
|
||||
return outChan
|
||||
return outChan, nil
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/ipfs/go-ipfs-cmdkit/files"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
)
|
||||
|
||||
const testDir = "testingData"
|
||||
|
@ -39,7 +38,7 @@ func TestToPrint(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
var orderedCids = [18]string{"QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn",
|
||||
var cids = [18]string{"QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn",
|
||||
"Qmbp4C4KkyjVzTpZ327Ub555FEHizhJS4M17f2zCCrQMAz",
|
||||
"QmYz38urZ99eVCxSccM63bGtDv54UWtBDWJdTxGch23khA",
|
||||
"QmUwG2mfhhfBEzVtvyEvva1kc8kU4CrXphdMCdAiFNWdxy",
|
||||
|
@ -49,10 +48,10 @@ var orderedCids = [18]string{"QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn",
|
|||
"QmcUBApwNSDg2Q2J4NXzks1HewVGFohNpPyEud7bZfo5tE",
|
||||
"QmaKaB735eydQwnaJNuYbXRU1gVb4MJdzHp1rpUZJN67G6",
|
||||
"QmQ6n82dRtEJVHMDLW5BSrJ6bRDXkFXj2i7Vjxy4B2PH82",
|
||||
"QmbDaWrwxk93RfWSBL8ajTproNKbE2EghBLSk8199EBm1m",
|
||||
"QmXDmRPxV9KWYLhN7bqGn143GgiNxhy9Hqc3ZPJW63xgok",
|
||||
"QmPZLJ3CZYgxH4K1w5jdbAdxJynXn5TCB4kHy7u8uHC3fy",
|
||||
"QmUqmcdJJodX7mPoUv9u71HoNAiegBCiCqdUZd5ZrCyLbs",
|
||||
"QmbDaWrwxk93RfWSBL8ajTproNKbE2EghBLSk8199EBm1m",
|
||||
"QmXDmRPxV9KWYLhN7bqGn143GgiNxhy9Hqc3ZPJW63xgok",
|
||||
"QmaytnNarHzDp1ipGyC7zd7Hw2AePmtSpLLaygQ2e9Yvqe",
|
||||
"QmZwab9h6ADw3tv8pzXVF2yndgJpTWKrrMjJQqRkgYmCRH",
|
||||
"QmaNfMZDZjfqjHrFCc6tZwmkqbXs1fnY9AXZ81WUztFeXm",
|
||||
|
@ -65,19 +64,35 @@ func TestToChannel(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
outChan := make(chan *ipld.Node)
|
||||
go func() {
|
||||
i := 0
|
||||
for node := range outChan {
|
||||
cid := (*node).String()
|
||||
if cid != orderedCids[i] {
|
||||
t.Error("unexpected cid: %s instead of %s", cid, orderedCids[i])
|
||||
}
|
||||
i += 1
|
||||
}
|
||||
}()
|
||||
err = ToChannel(file, outChan, context.Background())
|
||||
outChan, err := ToChannel(file, context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
check := make(map[string]struct{})
|
||||
for node := range outChan {
|
||||
cid := (*node).String()
|
||||
if _, ok := check[cid]; ok {
|
||||
t.Fatalf("Duplicate cid %s", cid)
|
||||
}
|
||||
check[cid] = struct{}{}
|
||||
}
|
||||
if len(check) != len(cids) {
|
||||
t.Fatalf("Witnessed cids: %v\nExpected cids: %v", check, cids)
|
||||
}
|
||||
cidsS := cids[:]
|
||||
for cid := range check {
|
||||
if !contains(cidsS, cid) {
|
||||
t.Fatalf("Unexpected cid: %s", cid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func contains(slice []string, s string) bool {
|
||||
for _, a := range slice {
|
||||
if a == s {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@ var errUninit = errors.New("DAGService output channel uninitialized")
|
|||
|
||||
// outDAGService will "add" a node by sending through the outChan
|
||||
type outDAGService struct {
|
||||
membership map[string]bool
|
||||
membership map[string]struct{}
|
||||
outChan chan<- *ipld.Node
|
||||
}
|
||||
|
||||
|
@ -24,11 +24,9 @@ func (ods *outDAGService) Get(ctx context.Context, key *cid.Cid) (ipld.Node, err
|
|||
|
||||
// GetMany returns an output channel that always emits an error
|
||||
func (ods *outDAGService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *ipld.NodeOption {
|
||||
out := make(chan *ipld.NodeOption, len(keys))
|
||||
go func() {
|
||||
out <- &ipld.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
|
||||
return
|
||||
}()
|
||||
out := make(chan *ipld.NodeOption, 1)
|
||||
out <- &ipld.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
|
||||
close(out)
|
||||
return out
|
||||
}
|
||||
|
||||
|
@ -39,7 +37,7 @@ func (ods *outDAGService) Add(ctx context.Context, node ipld.Node) error {
|
|||
if ok { // already added don't add again
|
||||
return nil
|
||||
}
|
||||
ods.membership[id] = true
|
||||
ods.membership[id] = struct{}{}
|
||||
|
||||
// Send node on output channel
|
||||
if ods.outChan == nil {
|
||||
|
|
Loading…
Reference in New Issue
Block a user