Merge pull request #1753 from ipfs-cluster/fix/1706-fix-block-warning

ipfshttp: Fix "seen blocks" tracking and blockPut metrics
This commit is contained in:
Hector Sanjuan 2022-09-09 16:32:09 +02:00 committed by GitHub
commit 06f0cac9c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -963,7 +963,7 @@ type chanIterator struct {
err error
seenMu sync.Mutex
seen *multihash.Set
seen map[string]int
}
func (ci *chanIterator) Name() string {
@ -974,27 +974,31 @@ func (ci *chanIterator) Name() string {
}
// return NewBytesFile.
// This function might and is actually called multiple times for the same node
// by the multifile Reader to send the multipart.
func (ci *chanIterator) Node() files.Node {
if !ci.current.Cid.Defined() {
return nil
}
logger.Debugf("it.node(): %s", ci.current.Cid)
ci.seenMu.Lock()
ci.seen.Add(ci.current.Cid.Hash())
ci.seenMu.Unlock()
stats.Record(ci.ctx, observations.BlocksAdded.M(1))
stats.Record(ci.ctx, observations.BlocksAddedSize.M(int64(len(ci.current.Data))))
logger.Debugf("it.Node(): %s", ci.current.Cid)
return files.NewBytesFile(ci.current.Data)
}
// Seen returns whether we have seen a multihash. It keeps count so it will
// return true as many times as we have seen it.
func (ci *chanIterator) Seen(c api.Cid) bool {
ci.seenMu.Lock()
has := ci.seen.Has(c.Cid.Hash())
ci.seen.Remove(c.Cid.Hash())
n, ok := ci.seen[string(c.Cid.Hash())]
logger.Debugf("Seen(): %s, %d, %t", c, n, ok)
if ok {
if n == 1 {
delete(ci.seen, string(c.Cid.Hash()))
} else {
ci.seen[string(c.Cid.Hash())] = n - 1
}
}
ci.seenMu.Unlock()
return has
return ok
}
func (ci *chanIterator) Done() bool {
@ -1024,9 +1028,20 @@ func (ci *chanIterator) Next() bool {
if ci.done {
return false
}
seeBlock := func(b api.NodeWithMeta) {
ci.seenMu.Lock()
ci.seen[string(b.Cid.Hash())]++
ci.seenMu.Unlock()
stats.Record(ci.ctx, observations.BlocksAdded.M(1))
stats.Record(ci.ctx, observations.BlocksAddedSize.M(int64(len(b.Data))))
}
if ci.peeked.Cid.Defined() {
ci.current = ci.peeked
ci.peeked = api.NodeWithMeta{}
seeBlock(ci.current)
return true
}
select {
@ -1039,8 +1054,12 @@ func (ci *chanIterator) Next() bool {
ci.done = true
return false
}
// Record that we have seen this block. This has to be done
// here, not in Node() as Node() is called multiple times per
// block received.
logger.Debugf("it.Next() %s", next.Cid)
ci.current = next
seeBlock(ci.current)
return true
}
}
@ -1083,7 +1102,7 @@ func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWi
it := &chanIterator{
ctx: ctx,
blocks: blocks,
seen: multihash.NewSet(),
seen: make(map[string]int),
}
dir := &chanDirectory{
iterator: it,