2018-08-07 18:01:02 +00:00
// Package sharding implements a sharding ClusterDAGService places
// content in different shards while it's being added, creating
// a final Cluster DAG and pinning it.
package sharding
import (
"context"
"errors"
"fmt"
"time"
2022-06-15 09:19:17 +00:00
"github.com/ipfs-cluster/ipfs-cluster/adder"
"github.com/ipfs-cluster/ipfs-cluster/api"
2018-08-07 18:01:02 +00:00
humanize "github.com/dustin/go-humanize"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
2020-03-13 20:40:02 +00:00
logging "github.com/ipfs/go-log/v2"
2022-09-06 14:57:17 +00:00
peer "github.com/libp2p/go-libp2p/core/peer"
2018-10-17 13:28:03 +00:00
rpc "github.com/libp2p/go-libp2p-gorpc"
2018-08-07 18:01:02 +00:00
)
// logger is the package-level logger for the sharding DAG service.
var logger = logging.Logger("shardingdags")
// DAGService is an implementation of a ClusterDAGService which
// shards content while adding among several IPFS Cluster peers,
// creating a Cluster DAG to track and pin that content selectively
// in the IPFS daemons allocated to it.
type DAGService struct {
adder . BaseDAGService
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
ctx context . Context
2018-08-07 18:01:02 +00:00
rpcClient * rpc . Client
2022-02-28 18:44:04 +00:00
addParams api . AddParams
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
output chan <- api . AddedOutput
2018-08-07 18:01:02 +00:00
addedSet * cid . Set
// Current shard being built
currentShard * shard
// Last flushed shard CID
2018-09-22 01:00:10 +00:00
previousShard cid . Cid
2018-08-07 18:01:02 +00:00
// shard tracking
2018-09-22 01:00:10 +00:00
shards map [ string ] cid . Cid
2018-08-07 18:01:02 +00:00
startTime time . Time
totalSize uint64
}
// New returns a new ClusterDAGService, which uses the given rpc client to perform
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
// Allocate, IPFSStream and Pin requests to other cluster components.
func New ( ctx context . Context , rpc * rpc . Client , opts api . AddParams , out chan <- api . AddedOutput ) * DAGService {
2020-04-21 20:59:14 +00:00
// use a default value for this regardless of what is provided.
opts . Mode = api . PinModeRecursive
2018-08-07 18:01:02 +00:00
return & DAGService {
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
ctx : ctx ,
2018-08-07 18:01:02 +00:00
rpcClient : rpc ,
2022-02-28 18:44:04 +00:00
addParams : opts ,
2018-08-07 18:01:02 +00:00
output : out ,
addedSet : cid . NewSet ( ) ,
2018-09-22 01:00:10 +00:00
shards : make ( map [ string ] cid . Cid ) ,
2018-08-07 18:01:02 +00:00
startTime : time . Now ( ) ,
}
}
// Add puts the given node in its corresponding shard and sends it to the
// destination peers.
2018-08-09 11:22:47 +00:00
func ( dgs * DAGService ) Add ( ctx context . Context , node ipld . Node ) error {
2018-08-07 18:01:02 +00:00
// FIXME: This will grow in memory
2018-08-09 11:22:47 +00:00
if ! dgs . addedSet . Visit ( node . Cid ( ) ) {
2018-08-07 18:01:02 +00:00
return nil
}
2019-08-05 05:01:07 +00:00
return dgs . ingestBlock ( ctx , node )
2018-08-07 18:01:02 +00:00
}
2022-07-08 14:15:49 +00:00
// Close performs cleanup and should be called when the DAGService is not
// going to be used anymore.
func ( dgs * DAGService ) Close ( ) error {
if dgs . currentShard != nil {
dgs . currentShard . Close ( )
}
return nil
}
2018-08-07 18:01:02 +00:00
// Finalize finishes sharding, creates the cluster DAG and pins it along
// with the meta pin for the root node of the content.
2022-04-07 11:53:30 +00:00
func ( dgs * DAGService ) Finalize ( ctx context . Context , dataRoot api . Cid ) ( api . Cid , error ) {
2018-08-09 11:22:47 +00:00
lastCid , err := dgs . flushCurrentShard ( ctx )
2018-08-07 18:01:02 +00:00
if err != nil {
2022-04-07 11:53:30 +00:00
return api . NewCid ( lastCid ) , err
2018-08-08 23:16:30 +00:00
}
2022-04-07 11:53:30 +00:00
if ! lastCid . Equals ( dataRoot . Cid ) {
2020-03-13 20:40:02 +00:00
logger . Warnf ( "the last added CID (%s) is not the IPFS data root (%s). This is only normal when adding a single file without wrapping in directory." , lastCid , dataRoot )
2018-08-07 18:01:02 +00:00
}
2018-08-09 11:22:47 +00:00
clusterDAGNodes , err := makeDAG ( ctx , dgs . shards )
2018-08-07 18:01:02 +00:00
if err != nil {
2018-08-08 23:16:30 +00:00
return dataRoot , err
2018-08-07 18:01:02 +00:00
}
// PutDAG to ourselves
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
blocks := make ( chan api . NodeWithMeta , 256 )
go func ( ) {
defer close ( blocks )
for _ , n := range clusterDAGNodes {
select {
case <- ctx . Done ( ) :
logger . Error ( ctx . Err ( ) )
return //abort
case blocks <- adder . IpldNodeToNodeWithMeta ( n ) :
}
}
} ( )
// Stream these blocks and wait until we are done.
bs := adder . NewBlockStreamer ( ctx , dgs . rpcClient , [ ] peer . ID { "" } , blocks )
select {
case <- ctx . Done ( ) :
return dataRoot , ctx . Err ( )
case <- bs . Done ( ) :
}
if err := bs . Err ( ) ; err != nil {
2018-08-08 23:16:30 +00:00
return dataRoot , err
2018-08-07 18:01:02 +00:00
}
clusterDAG := clusterDAGNodes [ 0 ] . Cid ( )
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
dgs . sendOutput ( api . AddedOutput {
2022-03-14 14:45:51 +00:00
Name : fmt . Sprintf ( "%s-clusterDAG" , dgs . addParams . Name ) ,
2022-04-07 11:53:30 +00:00
Cid : api . NewCid ( clusterDAG ) ,
2022-03-14 14:45:51 +00:00
Size : dgs . totalSize ,
Allocations : nil ,
2018-08-07 18:01:02 +00:00
} )
// Pin the ClusterDAG
2022-04-07 11:53:30 +00:00
clusterDAGPin := api . PinWithOpts ( api . NewCid ( clusterDAG ) , dgs . addParams . PinOptions )
2018-08-07 18:01:02 +00:00
clusterDAGPin . ReplicationFactorMin = - 1
clusterDAGPin . ReplicationFactorMax = - 1
clusterDAGPin . MaxDepth = 0 // pin direct
2022-02-28 18:44:04 +00:00
clusterDAGPin . Name = fmt . Sprintf ( "%s-clusterDAG" , dgs . addParams . Name )
2018-08-07 18:01:02 +00:00
clusterDAGPin . Type = api . ClusterDAGType
2019-02-27 17:04:35 +00:00
clusterDAGPin . Reference = & dataRoot
2022-03-14 14:45:51 +00:00
// Update object with response.
2018-08-09 11:22:47 +00:00
err = adder . Pin ( ctx , dgs . rpcClient , clusterDAGPin )
2018-08-07 18:01:02 +00:00
if err != nil {
2018-08-08 23:16:30 +00:00
return dataRoot , err
2018-08-07 18:01:02 +00:00
}
// Pin the META pin
2022-02-28 18:44:04 +00:00
metaPin := api . PinWithOpts ( dataRoot , dgs . addParams . PinOptions )
2018-08-07 18:01:02 +00:00
metaPin . Type = api . MetaType
2022-04-07 11:53:30 +00:00
ref := api . NewCid ( clusterDAG )
metaPin . Reference = & ref
2018-08-07 18:01:02 +00:00
metaPin . MaxDepth = 0 // irrelevant. Meta-pins are not pinned
2018-08-09 11:22:47 +00:00
err = adder . Pin ( ctx , dgs . rpcClient , metaPin )
2018-08-07 18:01:02 +00:00
if err != nil {
2018-08-08 23:16:30 +00:00
return dataRoot , err
2018-08-07 18:01:02 +00:00
}
// Log some stats
2018-08-09 11:22:47 +00:00
dgs . logStats ( metaPin . Cid , clusterDAGPin . Cid )
2018-08-07 18:01:02 +00:00
// Consider doing this? Seems like overkill
//
2020-02-03 09:30:04 +00:00
// // Amend ShardPins to reference clusterDAG root hash as a Parent
2018-08-07 18:01:02 +00:00
// shardParents := cid.NewSet()
// shardParents.Add(clusterDAG)
2018-08-09 11:22:47 +00:00
// for shardN, shard := range dgs.shardNodes {
2022-02-28 18:44:04 +00:00
// pin := api.PinWithOpts(shard, dgs.addParams)
2018-08-07 18:01:02 +00:00
// pin.Name := fmt.Sprintf("%s-shard-%s", pin.Name, shardN)
// pin.Type = api.ShardType
// pin.Parents = shardParents
// // FIXME: We don't know anymore the shard pin maxDepth
// // so we'd need to get the pin first.
2018-08-09 11:22:47 +00:00
// err := dgs.pin(pin)
2018-08-07 18:01:02 +00:00
// if err != nil {
// return err
// }
// }
2018-08-08 23:16:30 +00:00
return dataRoot , nil
2018-08-07 18:01:02 +00:00
}
2022-03-14 14:45:51 +00:00
// Allocations returns the current allocations for the current shard.
func ( dgs * DAGService ) Allocations ( ) [ ] peer . ID {
// FIXME: this is probably not safe in concurrency? However, there is
// no concurrent execution of any code in the DAGService I think.
if dgs . currentShard != nil {
return dgs . currentShard . Allocations ( )
}
return nil
}
2018-08-07 18:01:02 +00:00
// ingests a block to the current shard. If it get's full, it
// Flushes the shard and retries with a new one.
2019-08-05 05:01:07 +00:00
func ( dgs * DAGService ) ingestBlock ( ctx context . Context , n ipld . Node ) error {
2018-08-09 11:22:47 +00:00
shard := dgs . currentShard
2018-08-07 18:01:02 +00:00
// if we have no currentShard, create one
if shard == nil {
2022-02-28 18:44:04 +00:00
logger . Infof ( "new shard for '%s': #%d" , dgs . addParams . Name , len ( dgs . shards ) )
2018-08-07 18:01:02 +00:00
var err error
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
// important: shards use the DAGService context.
shard , err = newShard ( dgs . ctx , ctx , dgs . rpcClient , dgs . addParams . PinOptions )
2018-08-07 18:01:02 +00:00
if err != nil {
return err
}
2018-08-09 11:22:47 +00:00
dgs . currentShard = shard
2018-08-07 18:01:02 +00:00
}
2022-02-28 18:44:04 +00:00
logger . Debugf ( "ingesting block %s in shard %d (%s)" , n . Cid ( ) , len ( dgs . shards ) , dgs . addParams . Name )
2019-08-05 05:01:07 +00:00
// this is not same as n.Size()
size := uint64 ( len ( n . RawData ( ) ) )
2018-08-07 18:01:02 +00:00
// add the block to it if it fits and return
2019-08-05 05:01:07 +00:00
if shard . Size ( ) + size < shard . Limit ( ) {
shard . AddLink ( ctx , n . Cid ( ) , size )
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
return dgs . currentShard . sendBlock ( ctx , n )
2018-08-07 18:01:02 +00:00
}
logger . Debugf ( "shard %d full: block: %d. shard: %d. limit: %d" ,
2018-08-09 11:22:47 +00:00
len ( dgs . shards ) ,
2019-08-05 05:01:07 +00:00
size ,
2018-08-07 18:01:02 +00:00
shard . Size ( ) ,
shard . Limit ( ) ,
)
// -------
// Below: block DOES NOT fit in shard
// Flush and retry
// if shard is empty, error
if shard . Size ( ) == 0 {
return errors . New ( "block doesn't fit in empty shard: shard size too small?" )
}
2019-02-27 17:04:35 +00:00
_ , err := dgs . flushCurrentShard ( ctx )
2018-08-07 18:01:02 +00:00
if err != nil {
return err
}
2018-08-09 11:22:47 +00:00
return dgs . ingestBlock ( ctx , n ) // <-- retry ingest
2018-08-07 18:01:02 +00:00
}
2022-04-07 11:53:30 +00:00
func ( dgs * DAGService ) logStats ( metaPin , clusterDAGPin api . Cid ) {
2018-08-09 11:22:47 +00:00
duration := time . Since ( dgs . startTime )
2018-08-07 18:01:02 +00:00
seconds := uint64 ( duration ) / uint64 ( time . Second )
var rate string
if seconds == 0 {
rate = "∞ B"
} else {
2018-08-09 11:22:47 +00:00
rate = humanize . Bytes ( dgs . totalSize / seconds )
2018-08-07 18:01:02 +00:00
}
2018-08-09 11:22:47 +00:00
2020-02-03 09:30:04 +00:00
statsFmt := ` sharding session successful :
2018-08-07 18:01:02 +00:00
CID : % s
ClusterDAG : % s
Total shards : % d
Total size : % s
Total time : % s
Ingest Rate : % s / s
2018-08-09 11:22:47 +00:00
`
logger . Infof (
statsFmt ,
2018-08-07 18:01:02 +00:00
metaPin ,
clusterDAGPin ,
2018-08-09 11:22:47 +00:00
len ( dgs . shards ) ,
humanize . Bytes ( dgs . totalSize ) ,
2018-08-07 18:01:02 +00:00
duration ,
rate ,
)
}
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
func ( dgs * DAGService ) sendOutput ( ao api . AddedOutput ) {
2018-08-09 11:22:47 +00:00
if dgs . output != nil {
dgs . output <- ao
2018-08-07 18:01:02 +00:00
}
}
2018-08-09 11:22:47 +00:00
// flushes the dgs.currentShard and returns the LastLink()
2018-09-22 01:00:10 +00:00
func ( dgs * DAGService ) flushCurrentShard ( ctx context . Context ) ( cid . Cid , error ) {
2018-08-09 11:22:47 +00:00
shard := dgs . currentShard
2018-08-07 18:01:02 +00:00
if shard == nil {
2018-09-22 01:00:10 +00:00
return cid . Undef , errors . New ( "cannot flush a nil shard" )
2018-08-07 18:01:02 +00:00
}
2018-08-09 11:22:47 +00:00
lens := len ( dgs . shards )
2018-08-07 18:01:02 +00:00
2018-08-09 11:22:47 +00:00
shardCid , err := shard . Flush ( ctx , lens , dgs . previousShard )
2018-08-07 18:01:02 +00:00
if err != nil {
return shardCid , err
}
2018-08-09 11:22:47 +00:00
dgs . totalSize += shard . Size ( )
dgs . shards [ fmt . Sprintf ( "%d" , lens ) ] = shardCid
dgs . previousShard = shardCid
dgs . currentShard = nil
Adders: stream blocks to destinations
This commit fixes #810 and adds block streaming to the final destinations when
adding. This should add major performance gains when adding data to clusters.
Before, everytime cluster issued a block, it was broadcasted individually to
all destinations (new libp2p stream), where it was block/put to IPFS (a single
block/put http roundtrip per block).
Now, blocks are streamed all the way from the adder module to the ipfs daemon,
by making every block as it arrives a single part in a multipart block/put
request.
Before, block-broadcast needed to wait for all destinations to finish in order
to process the next block. Now, buffers allow some destinations to be faster
than others while sending and receiving blocks.
Before, if a block put request failed to be broadcasted everywhere, an error
would happen at that moment.
Now, we keep streaming until the end and only then report any errors. The
operation succeeds as long as at least one stream finished successfully.
Errors block/putting to IPFS will not abort streams. Instead, subsequent
blocks are retried with a new request, although the method will return an
error when the stream finishes if there were errors at any point.
2022-03-24 01:17:10 +00:00
dgs . sendOutput ( api . AddedOutput {
2022-03-14 14:45:51 +00:00
Name : fmt . Sprintf ( "shard-%d" , lens ) ,
2022-04-07 11:53:30 +00:00
Cid : api . NewCid ( shardCid ) ,
2022-03-14 14:45:51 +00:00
Size : shard . Size ( ) ,
Allocations : shard . Allocations ( ) ,
2018-08-07 18:01:02 +00:00
} )
return shard . LastLink ( ) , nil
}
// AddMany calls Add for every given node.
2018-08-09 11:22:47 +00:00
func ( dgs * DAGService ) AddMany ( ctx context . Context , nodes [ ] ipld . Node ) error {
2018-08-07 18:01:02 +00:00
for _ , node := range nodes {
2018-08-09 11:22:47 +00:00
err := dgs . Add ( ctx , node )
2018-08-07 18:01:02 +00:00
if err != nil {
return err
}
}
return nil
}