adder/single: do not re-add the same node twice

The ipfsadder actually ends up DAG-putting some nodes several times
(i.e. non-leafs, root)... but usually one after the other. This prevents that
and avoids sending the same data multiple times over the wire (not a good
thing to 3x a small payload because of this).
This commit is contained in:
Hector Sanjuan 2022-03-28 20:05:01 +02:00
parent f07c1e6552
commit e287bd1189
2 changed files with 35 additions and 12 deletions

View File

@ -31,8 +31,9 @@ type DAGService struct {
addParams api.AddParams
local bool
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
recentBlocks *recentBlocks
}
// New returns a new Adder with the given rpc Client. The client is used
@ -41,17 +42,24 @@ func New(ctx context.Context, rpc *rpc.Client, opts api.AddParams, local bool) *
// ensure don't Add something and pin it in direct mode.
opts.Mode = api.PinModeRecursive
return &DAGService{
ctx: ctx,
rpcClient: rpc,
dests: nil,
addParams: opts,
local: local,
blocks: make(chan api.NodeWithMeta, 256),
ctx: ctx,
rpcClient: rpc,
dests: nil,
addParams: opts,
local: local,
blocks: make(chan api.NodeWithMeta, 256),
recentBlocks: &recentBlocks{},
}
}
// Add puts the given node in the destination peers.
func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
// Avoid adding the same node multiple times in a row.
// This is done by the ipfsadd-er, because some nodes are added
// via dagbuilder, then via MFS, and root nodes once more.
if dgs.recentBlocks.Has(node) {
return nil
}
// FIXME: can't this happen on initialization? Perhaps the point here
// is the adder only allocates and starts streaming when the first
@ -96,6 +104,7 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
case <-dgs.ctx.Done():
return ctx.Err()
case dgs.blocks <- adder.IpldNodeToNodeWithMeta(node):
dgs.recentBlocks.Add(node)
return nil
}
}
@ -152,3 +161,18 @@ func (dgs *DAGService) AddMany(ctx context.Context, nodes []ipld.Node) error {
}
return nil
}
type recentBlocks struct {
blocks [2]cid.Cid
cur int
}
func (rc *recentBlocks) Add(n ipld.Node) {
rc.blocks[rc.cur] = n.Cid()
rc.cur = (rc.cur + 1) % 2
}
func (rc *recentBlocks) Has(n ipld.Node) bool {
c := n.Cid()
return rc.blocks[0].Equals(c) || rc.blocks[1].Equals(c)
}

View File

@ -596,7 +596,7 @@ func (ipfs *Connector) PinLsCid(ctx context.Context, pin api.Pin) (api.IPFSPinSt
}
func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) {
logger.Debugf("posting %s", path)
logger.Debugf("posting /%s", path)
urlstr := fmt.Sprintf("%s/%s", apiURL, path)
req, err := http.NewRequest("POST", urlstr, postBody)
@ -950,9 +950,7 @@ func (ci *chanIterator) Node() files.Node {
return nil
}
ci.seenMu.Lock()
if ci.seen.Visit(ci.current.Cid) {
logger.Debugf("block %s", ci.current.Cid)
}
ci.seen.Add(ci.current.Cid)
ci.seenMu.Unlock()
return files.NewBytesFile(ci.current.Data)
}
@ -1007,6 +1005,7 @@ func (ci *chanIterator) Next() bool {
ci.done = true
return false
}
logger.Debugf("block %s", next.Cid)
ci.current = next
return true
}