diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 42fca330..ba87070a 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -964,7 +964,7 @@ type chanIterator struct { err error seenMu sync.Mutex - seen *multihash.Set + seen map[string]int } func (ci *chanIterator) Name() string { @@ -975,27 +975,31 @@ func (ci *chanIterator) Name() string { } // return NewBytesFile. +// This function might and is actually called multiple times for the same node +// by the multifile Reader to send the multipart. func (ci *chanIterator) Node() files.Node { if !ci.current.Cid.Defined() { return nil } - logger.Debugf("it.node(): %s", ci.current.Cid) - ci.seenMu.Lock() - ci.seen.Add(ci.current.Cid.Hash()) - ci.seenMu.Unlock() - - stats.Record(ci.ctx, observations.BlocksAdded.M(1)) - stats.Record(ci.ctx, observations.BlocksAddedSize.M(int64(len(ci.current.Data)))) - + logger.Debugf("it.Node(): %s", ci.current.Cid) return files.NewBytesFile(ci.current.Data) } +// Seen returns whether we have seen a multihash. It keeps count so it will +// return true as many times as we have seen it. func (ci *chanIterator) Seen(c api.Cid) bool { ci.seenMu.Lock() - has := ci.seen.Has(c.Cid.Hash()) - ci.seen.Remove(c.Cid.Hash()) + n, ok := ci.seen[string(c.Cid.Hash())] + logger.Debugf("Seen(): %s, %d, %t", c, n, ok) + if ok { + if n == 1 { + delete(ci.seen, string(c.Cid.Hash())) + } else { + ci.seen[string(c.Cid.Hash())] = n - 1 + } + } ci.seenMu.Unlock() - return has + return ok } func (ci *chanIterator) Done() bool { @@ -1025,9 +1029,20 @@ func (ci *chanIterator) Next() bool { if ci.done { return false } + + seeBlock := func(b api.NodeWithMeta) { + ci.seenMu.Lock() + ci.seen[string(b.Cid.Hash())]++ + ci.seenMu.Unlock() + stats.Record(ci.ctx, observations.BlocksAdded.M(1)) + stats.Record(ci.ctx, observations.BlocksAddedSize.M(int64(len(b.Data)))) + + } + if ci.peeked.Cid.Defined() { ci.current = ci.peeked ci.peeked = api.NodeWithMeta{} + seeBlock(ci.current) return true } select { @@ -1040,8 +1055,12 @@ func (ci *chanIterator) Next() bool { ci.done = true return false } + // Record that we have seen this block. This has to be done + // here, not in Node() as Node() is called multiple times per + // block received. logger.Debugf("it.Next() %s", next.Cid) ci.current = next + seeBlock(ci.current) return true } } @@ -1084,7 +1103,7 @@ func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWi it := &chanIterator{ ctx: ctx, blocks: blocks, - seen: multihash.NewSet(), + seen: make(map[string]int), } dir := &chanDirectory{ iterator: it,