ipfshttp: Fix "seen blocks" tracking and blockPut metrics

This fixes two bugs. First, the "blockPut response CID does not match the
multihash" warning was coming up when it shouldn't. Particularly, the
multipart reader called Node() several times for the same block, resulting in
CIDs being removed from the Seen set, and causing the warning when there were
several blocks (usually the empty dir block).

This also means we were counting BlockPuts (and total data added) wrong in the
metrics, double-counting some blocks as these were recorded in Node() calls.

The fix moves the tracking to Next(), which is only called once for each
block. To avoid timing issues between Block reads from the channel and
blockput responses, the Seen set now stores how many times we have seen a
block. Thus a duplicated block that will get two BlockPut responses will not
trigger a warning regardless of the time when those responses arrive.

Fixes #1706.
This commit is contained in:
Hector Sanjuan 2022-09-05 18:01:39 +02:00
parent 71bda2d658
commit ed54c665b8

View File

@ -964,7 +964,7 @@ type chanIterator struct {
err error
seenMu sync.Mutex
seen *multihash.Set
seen map[string]int
}
func (ci *chanIterator) Name() string {
@ -975,27 +975,31 @@ func (ci *chanIterator) Name() string {
}
// return NewBytesFile.
// This function might and is actually called multiple times for the same node
// by the multifile Reader to send the multipart.
func (ci *chanIterator) Node() files.Node {
if !ci.current.Cid.Defined() {
return nil
}
logger.Debugf("it.node(): %s", ci.current.Cid)
ci.seenMu.Lock()
ci.seen.Add(ci.current.Cid.Hash())
ci.seenMu.Unlock()
stats.Record(ci.ctx, observations.BlocksAdded.M(1))
stats.Record(ci.ctx, observations.BlocksAddedSize.M(int64(len(ci.current.Data))))
logger.Debugf("it.Node(): %s", ci.current.Cid)
return files.NewBytesFile(ci.current.Data)
}
// Seen returns whether we have seen a multihash. It keeps count so it will
// return true as many times as we have seen it.
func (ci *chanIterator) Seen(c api.Cid) bool {
ci.seenMu.Lock()
has := ci.seen.Has(c.Cid.Hash())
ci.seen.Remove(c.Cid.Hash())
n, ok := ci.seen[string(c.Cid.Hash())]
logger.Debugf("Seen(): %s, %d, %t", c, n, ok)
if ok {
if n == 1 {
delete(ci.seen, string(c.Cid.Hash()))
} else {
ci.seen[string(c.Cid.Hash())] = n - 1
}
}
ci.seenMu.Unlock()
return has
return ok
}
func (ci *chanIterator) Done() bool {
@ -1025,9 +1029,20 @@ func (ci *chanIterator) Next() bool {
if ci.done {
return false
}
seeBlock := func(b api.NodeWithMeta) {
ci.seenMu.Lock()
ci.seen[string(b.Cid.Hash())]++
ci.seenMu.Unlock()
stats.Record(ci.ctx, observations.BlocksAdded.M(1))
stats.Record(ci.ctx, observations.BlocksAddedSize.M(int64(len(b.Data))))
}
if ci.peeked.Cid.Defined() {
ci.current = ci.peeked
ci.peeked = api.NodeWithMeta{}
seeBlock(ci.current)
return true
}
select {
@ -1040,8 +1055,12 @@ func (ci *chanIterator) Next() bool {
ci.done = true
return false
}
// Record that we have seen this block. This has to be done
// here, not in Node() as Node() is called multiple times per
// block received.
logger.Debugf("it.Next() %s", next.Cid)
ci.current = next
seeBlock(ci.current)
return true
}
}
@ -1084,7 +1103,7 @@ func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWi
it := &chanIterator{
ctx: ctx,
blocks: blocks,
seen: multihash.NewSet(),
seen: make(map[string]int),
}
dir := &chanDirectory{
iterator: it,