// Package single implements a ClusterDAGService that chunks and adds content // to cluster without sharding, before pinning it. package single import ( "context" "sync" adder "github.com/ipfs-cluster/ipfs-cluster/adder" "github.com/ipfs-cluster/ipfs-cluster/api" cid "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p/core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" ) var logger = logging.Logger("singledags") var _ = logger // otherwise unused // DAGService is an implementation of an adder.ClusterDAGService which // puts the added blocks directly in the peers allocated to them (without // sharding). type DAGService struct { adder.BaseDAGService ctx context.Context rpcClient *rpc.Client dests []peer.ID addParams api.AddParams local bool bs *adder.BlockStreamer blocks chan api.NodeWithMeta closeBlocksOnce sync.Once recentBlocks *recentBlocks } // New returns a new Adder with the given rpc Client. The client is used // to perform calls to IPFS.BlockStream and Pin content on Cluster. func New(ctx context.Context, rpc *rpc.Client, opts api.AddParams, local bool) *DAGService { // ensure don't Add something and pin it in direct mode. opts.Mode = api.PinModeRecursive return &DAGService{ ctx: ctx, rpcClient: rpc, dests: nil, addParams: opts, local: local, blocks: make(chan api.NodeWithMeta, 256), recentBlocks: &recentBlocks{}, } } // Add puts the given node in the destination peers. func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { // Avoid adding the same node multiple times in a row. // This is done by the ipfsadd-er, because some nodes are added // via dagbuilder, then via MFS, and root nodes once more. if dgs.recentBlocks.Has(node) { return nil } // FIXME: can't this happen on initialization? Perhaps the point here // is the adder only allocates and starts streaming when the first // block arrives and not on creation. if dgs.dests == nil { dests, err := adder.BlockAllocate(ctx, dgs.rpcClient, dgs.addParams.PinOptions) if err != nil { return err } hasLocal := false localPid := dgs.rpcClient.ID() for i, d := range dests { if d == localPid || d == "" { hasLocal = true // ensure our allocs do not carry an empty peer // mostly an issue with testing mocks dests[i] = localPid } } dgs.dests = dests if dgs.local { // If this is a local pin, make sure that the local // peer is among the allocations.. // UNLESS user-allocations are defined! if !hasLocal && localPid != "" && len(dgs.addParams.UserAllocations) == 0 { // replace last allocation with local peer dgs.dests[len(dgs.dests)-1] = localPid } dgs.bs = adder.NewBlockStreamer(dgs.ctx, dgs.rpcClient, []peer.ID{localPid}, dgs.blocks) } else { dgs.bs = adder.NewBlockStreamer(dgs.ctx, dgs.rpcClient, dgs.dests, dgs.blocks) } } select { case <-ctx.Done(): return ctx.Err() case <-dgs.ctx.Done(): return ctx.Err() case dgs.blocks <- adder.IpldNodeToNodeWithMeta(node): dgs.recentBlocks.Add(node) return nil } } // Close cleans up the DAGService. func (dgs *DAGService) Close() error { dgs.closeBlocksOnce.Do(func() { close(dgs.blocks) }) return nil } // Finalize pins the last Cid added to this DAGService. func (dgs *DAGService) Finalize(ctx context.Context, root api.Cid) (api.Cid, error) { // Close the blocks channel dgs.Close() // Wait for the BlockStreamer to finish. select { case <-dgs.ctx.Done(): return root, ctx.Err() case <-ctx.Done(): return root, ctx.Err() case <-dgs.bs.Done(): } // If the streamer failed to put blocks. if err := dgs.bs.Err(); err != nil { return root, err } // Do not pin, just block put. // Why? Because some people are uploading CAR files with partial DAGs // and ideally they should be pinning only when the last partial CAR // is uploaded. This gives them that option. if dgs.addParams.NoPin { return root, nil } // Cluster pin the result rootPin := api.PinWithOpts(root, dgs.addParams.PinOptions) rootPin.Allocations = dgs.dests return root, adder.Pin(ctx, dgs.rpcClient, rootPin) } // Allocations returns the add destinations decided by the DAGService. func (dgs *DAGService) Allocations() []peer.ID { // using rpc clients without a host results in an empty peer // which cannot be parsed to peer.ID on deserialization. if len(dgs.dests) == 1 && dgs.dests[0] == "" { return nil } return dgs.dests } // AddMany calls Add for every given node. func (dgs *DAGService) AddMany(ctx context.Context, nodes []ipld.Node) error { for _, node := range nodes { err := dgs.Add(ctx, node) if err != nil { return err } } return nil } type recentBlocks struct { blocks [2]cid.Cid cur int } func (rc *recentBlocks) Add(n ipld.Node) { rc.blocks[rc.cur] = n.Cid() rc.cur = (rc.cur + 1) % 2 } func (rc *recentBlocks) Has(n ipld.Node) bool { c := n.Cid() return rc.blocks[0].Equals(c) || rc.blocks[1].Equals(c) }