diff --git a/adder/single/dag_service.go b/adder/single/dag_service.go index 865bfc26..13b5b9cb 100644 --- a/adder/single/dag_service.go +++ b/adder/single/dag_service.go @@ -31,8 +31,9 @@ type DAGService struct { addParams api.AddParams local bool - bs *adder.BlockStreamer - blocks chan api.NodeWithMeta + bs *adder.BlockStreamer + blocks chan api.NodeWithMeta + recentBlocks *recentBlocks } // New returns a new Adder with the given rpc Client. The client is used @@ -41,17 +42,24 @@ func New(ctx context.Context, rpc *rpc.Client, opts api.AddParams, local bool) * // ensure don't Add something and pin it in direct mode. opts.Mode = api.PinModeRecursive return &DAGService{ - ctx: ctx, - rpcClient: rpc, - dests: nil, - addParams: opts, - local: local, - blocks: make(chan api.NodeWithMeta, 256), + ctx: ctx, + rpcClient: rpc, + dests: nil, + addParams: opts, + local: local, + blocks: make(chan api.NodeWithMeta, 256), + recentBlocks: &recentBlocks{}, } } // Add puts the given node in the destination peers. func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { + // Avoid adding the same node multiple times in a row. + // This is done by the ipfsadd-er, because some nodes are added + // via dagbuilder, then via MFS, and root nodes once more. + if dgs.recentBlocks.Has(node) { + return nil + } // FIXME: can't this happen on initialization? Perhaps the point here // is the adder only allocates and starts streaming when the first @@ -96,6 +104,7 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { case <-dgs.ctx.Done(): return ctx.Err() case dgs.blocks <- adder.IpldNodeToNodeWithMeta(node): + dgs.recentBlocks.Add(node) return nil } } @@ -152,3 +161,18 @@ func (dgs *DAGService) AddMany(ctx context.Context, nodes []ipld.Node) error { } return nil } + +type recentBlocks struct { + blocks [2]cid.Cid + cur int +} + +func (rc *recentBlocks) Add(n ipld.Node) { + rc.blocks[rc.cur] = n.Cid() + rc.cur = (rc.cur + 1) % 2 +} + +func (rc *recentBlocks) Has(n ipld.Node) bool { + c := n.Cid() + return rc.blocks[0].Equals(c) || rc.blocks[1].Equals(c) +} diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 1d0513ff..c1129834 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -596,7 +596,7 @@ func (ipfs *Connector) PinLsCid(ctx context.Context, pin api.Pin) (api.IPFSPinSt } func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) { - logger.Debugf("posting %s", path) + logger.Debugf("posting /%s", path) urlstr := fmt.Sprintf("%s/%s", apiURL, path) req, err := http.NewRequest("POST", urlstr, postBody) @@ -950,9 +950,7 @@ func (ci *chanIterator) Node() files.Node { return nil } ci.seenMu.Lock() - if ci.seen.Visit(ci.current.Cid) { - logger.Debugf("block %s", ci.current.Cid) - } + ci.seen.Add(ci.current.Cid) ci.seenMu.Unlock() return files.NewBytesFile(ci.current.Data) } @@ -1007,6 +1005,7 @@ func (ci *chanIterator) Next() bool { ci.done = true return false } + logger.Debugf("block %s", next.Cid) ci.current = next return true }