diff --git a/ipld-importer/import.go b/ipld-importer/import.go index bb407310..47154c76 100644 --- a/ipld-importer/import.go +++ b/ipld-importer/import.go @@ -33,7 +33,6 @@ func ToPrint(f files.File) error { } else if err != nil { return err } - if err := fileAdder.AddFile(file); err != nil { return err } @@ -51,16 +50,16 @@ func ToPrint(f files.File) error { // ToChannel imports file to ipfs ipld nodes, outputting nodes on the // provided channel -func ToChannel(f files.File, ctx context.Context) <-chan *ipld.Node { +func ToChannel(f files.File, ctx context.Context) (<-chan *ipld.Node, error) { outChan := make(chan *ipld.Node) dserv := &outDAGService{ - membership: make(map[string]bool), + membership: make(map[string]struct{}), outChan: outChan, } fileAdder, err := coreunix.NewAdder(ctx, nil, nil, dserv) if err != nil { - return err + return outChan, err } fileAdder.Pin = false @@ -72,18 +71,22 @@ func ToChannel(f files.File, ctx context.Context) <-chan *ipld.Node { if err == io.EOF { break } else if err != nil { - return err + return + } + select { + case <-ctx.Done(): + return + default: } - if err := fileAdder.AddFile(file); err != nil { - return err + return } } _, err = fileAdder.Finalize() if !strings.Contains(err.Error(), "dagservice: block not found") { - return err + return } }() - return outChan + return outChan, nil } diff --git a/ipld-importer/import_test.go b/ipld-importer/import_test.go index d3104516..0bc560b5 100644 --- a/ipld-importer/import_test.go +++ b/ipld-importer/import_test.go @@ -8,7 +8,6 @@ import ( "testing" "github.com/ipfs/go-ipfs-cmdkit/files" - ipld "github.com/ipfs/go-ipld-format" ) const testDir = "testingData" @@ -39,7 +38,7 @@ func TestToPrint(t *testing.T) { } } -var orderedCids = [18]string{"QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn", +var cids = [18]string{"QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn", "Qmbp4C4KkyjVzTpZ327Ub555FEHizhJS4M17f2zCCrQMAz", "QmYz38urZ99eVCxSccM63bGtDv54UWtBDWJdTxGch23khA", "QmUwG2mfhhfBEzVtvyEvva1kc8kU4CrXphdMCdAiFNWdxy", @@ -49,10 +48,10 @@ var orderedCids = [18]string{"QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn", "QmcUBApwNSDg2Q2J4NXzks1HewVGFohNpPyEud7bZfo5tE", "QmaKaB735eydQwnaJNuYbXRU1gVb4MJdzHp1rpUZJN67G6", "QmQ6n82dRtEJVHMDLW5BSrJ6bRDXkFXj2i7Vjxy4B2PH82", - "QmbDaWrwxk93RfWSBL8ajTproNKbE2EghBLSk8199EBm1m", - "QmXDmRPxV9KWYLhN7bqGn143GgiNxhy9Hqc3ZPJW63xgok", "QmPZLJ3CZYgxH4K1w5jdbAdxJynXn5TCB4kHy7u8uHC3fy", "QmUqmcdJJodX7mPoUv9u71HoNAiegBCiCqdUZd5ZrCyLbs", + "QmbDaWrwxk93RfWSBL8ajTproNKbE2EghBLSk8199EBm1m", + "QmXDmRPxV9KWYLhN7bqGn143GgiNxhy9Hqc3ZPJW63xgok", "QmaytnNarHzDp1ipGyC7zd7Hw2AePmtSpLLaygQ2e9Yvqe", "QmZwab9h6ADw3tv8pzXVF2yndgJpTWKrrMjJQqRkgYmCRH", "QmaNfMZDZjfqjHrFCc6tZwmkqbXs1fnY9AXZ81WUztFeXm", @@ -65,19 +64,35 @@ func TestToChannel(t *testing.T) { t.Fatal(err) } - outChan := make(chan *ipld.Node) - go func() { - i := 0 - for node := range outChan { - cid := (*node).String() - if cid != orderedCids[i] { - t.Error("unexpected cid: %s instead of %s", cid, orderedCids[i]) - } - i += 1 - } - }() - err = ToChannel(file, outChan, context.Background()) + outChan, err := ToChannel(file, context.Background()) if err != nil { t.Fatal(err) } + + check := make(map[string]struct{}) + for node := range outChan { + cid := (*node).String() + if _, ok := check[cid]; ok { + t.Fatalf("Duplicate cid %s", cid) + } + check[cid] = struct{}{} + } + if len(check) != len(cids) { + t.Fatalf("Witnessed cids: %v\nExpected cids: %v", check, cids) + } + cidsS := cids[:] + for cid := range check { + if !contains(cidsS, cid) { + t.Fatalf("Unexpected cid: %s", cid) + } + } +} + +func contains(slice []string, s string) bool { + for _, a := range slice { + if a == s { + return true + } + } + return false } diff --git a/ipld-importer/outgoing-dagservice.go b/ipld-importer/outgoing-dagservice.go index 4c4f60e0..4f44ab37 100644 --- a/ipld-importer/outgoing-dagservice.go +++ b/ipld-importer/outgoing-dagservice.go @@ -13,7 +13,7 @@ var errUninit = errors.New("DAGService output channel uninitialized") // outDAGService will "add" a node by sending through the outChan type outDAGService struct { - membership map[string]bool + membership map[string]struct{} outChan chan<- *ipld.Node } @@ -24,11 +24,9 @@ func (ods *outDAGService) Get(ctx context.Context, key *cid.Cid) (ipld.Node, err // GetMany returns an output channel that always emits an error func (ods *outDAGService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *ipld.NodeOption { - out := make(chan *ipld.NodeOption, len(keys)) - go func() { - out <- &ipld.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")} - return - }() + out := make(chan *ipld.NodeOption, 1) + out <- &ipld.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")} + close(out) return out } @@ -39,7 +37,7 @@ func (ods *outDAGService) Add(ctx context.Context, node ipld.Node) error { if ok { // already added don't add again return nil } - ods.membership[id] = true + ods.membership[id] = struct{}{} // Send node on output channel if ods.outChan == nil {