// Package sharding implements a sharding ClusterDAGService which places
// content in different shards while it's being added, creating
// a final Cluster DAG and pinning it.
package sharding
import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/ipfs/ipfs-cluster/adder"
	"github.com/ipfs/ipfs-cluster/api"

	humanize "github.com/dustin/go-humanize"
	cid "github.com/ipfs/go-cid"
	ipld "github.com/ipfs/go-ipld-format"
	logging "github.com/ipfs/go-log/v2"
	peer "github.com/libp2p/go-libp2p-core/peer"
	rpc "github.com/libp2p/go-libp2p-gorpc"
)
// logger is the package-level logger for the sharding DAG service.
var logger = logging.Logger("shardingdags")
// DAGService is an implementation of a ClusterDAGService which
// shards content while adding among several IPFS Cluster peers,
// creating a Cluster DAG to track and pin that content selectively
// in the IPFS daemons allocated to it.
type DAGService struct {
	adder.BaseDAGService

	// rpcClient is used to perform Allocate, IPFSBlockPut and Pin
	// requests against other cluster components.
	rpcClient *rpc.Client

	// addParams carries the options for this add session; output (may
	// be nil) receives a notification per flushed shard and for the
	// final cluster DAG.
	addParams api.AddParams
	output    chan<- *api.AddedOutput

	// addedSet deduplicates incoming blocks by CID.
	addedSet *cid.Set

	// Current shard being built
	currentShard *shard
	// Last flushed shard CID
	previousShard cid.Cid

	// shard tracking: shard number (formatted as a string) -> shard CID.
	shards map[string]cid.Cid

	// startTime and totalSize feed the final ingest-rate statistics.
	startTime time.Time
	totalSize uint64
}
// New returns a new ClusterDAGService, which uses the given rpc client to perform
// Allocate, IPFSBlockPut and Pin requests to other cluster components.
2022-02-28 18:44:04 +00:00
func New ( rpc * rpc . Client , opts api . AddParams , out chan <- * api . AddedOutput ) * DAGService {
2020-04-21 20:59:14 +00:00
// use a default value for this regardless of what is provided.
opts . Mode = api . PinModeRecursive
2018-08-07 18:01:02 +00:00
return & DAGService {
rpcClient : rpc ,
2022-02-28 18:44:04 +00:00
addParams : opts ,
2018-08-07 18:01:02 +00:00
output : out ,
addedSet : cid . NewSet ( ) ,
2018-09-22 01:00:10 +00:00
shards : make ( map [ string ] cid . Cid ) ,
2018-08-07 18:01:02 +00:00
startTime : time . Now ( ) ,
}
}
// Add puts the given node in its corresponding shard and sends it to the
// destination peers.
2018-08-09 11:22:47 +00:00
func ( dgs * DAGService ) Add ( ctx context . Context , node ipld . Node ) error {
2018-08-07 18:01:02 +00:00
// FIXME: This will grow in memory
2018-08-09 11:22:47 +00:00
if ! dgs . addedSet . Visit ( node . Cid ( ) ) {
2018-08-07 18:01:02 +00:00
return nil
}
2019-08-05 05:01:07 +00:00
return dgs . ingestBlock ( ctx , node )
2018-08-07 18:01:02 +00:00
}
// Finalize finishes sharding, creates the cluster DAG and pins it along
// with the meta pin for the root node of the content.
//
// It returns the CID of the data root (dataRoot) on success; on error it
// returns the most useful CID available at that point along with the error.
func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid, error) {
	// Flush whatever is left in the shard being built.
	lastCid, err := dgs.flushCurrentShard(ctx)
	if err != nil {
		return lastCid, err
	}

	if !lastCid.Equals(dataRoot) {
		logger.Warnf("the last added CID (%s) is not the IPFS data root (%s). This is only normal when adding a single file without wrapping in directory.", lastCid, dataRoot)
	}

	// Build the cluster DAG nodes that reference every flushed shard.
	clusterDAGNodes, err := makeDAG(ctx, dgs.shards)
	if err != nil {
		return dataRoot, err
	}

	// PutDAG to ourselves (the empty peer.ID targets the local peer —
	// NOTE(review): presumed from usage; confirm against BlockAdder docs).
	err = adder.NewBlockAdder(dgs.rpcClient, []peer.ID{""}).AddMany(ctx, clusterDAGNodes)
	if err != nil {
		return dataRoot, err
	}

	// The first node is the cluster DAG root.
	clusterDAG := clusterDAGNodes[0].Cid()

	dgs.sendOutput(&api.AddedOutput{
		Name: fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name),
		Cid:  clusterDAG,
		Size: dgs.totalSize,
	})

	// Pin the ClusterDAG: replication factors of -1 pin it everywhere,
	// and MaxDepth 0 pins it directly (no recursion).
	clusterDAGPin := api.PinWithOpts(clusterDAG, dgs.addParams.PinOptions)
	clusterDAGPin.ReplicationFactorMin = -1
	clusterDAGPin.ReplicationFactorMax = -1
	clusterDAGPin.MaxDepth = 0 // pin direct
	clusterDAGPin.Name = fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name)
	clusterDAGPin.Type = api.ClusterDAGType
	clusterDAGPin.Reference = &dataRoot
	err = adder.Pin(ctx, dgs.rpcClient, clusterDAGPin)
	if err != nil {
		return dataRoot, err
	}

	// Pin the META pin, which represents the content root and references
	// the cluster DAG that tracks its shards.
	metaPin := api.PinWithOpts(dataRoot, dgs.addParams.PinOptions)
	metaPin.Type = api.MetaType
	metaPin.Reference = &clusterDAG
	metaPin.MaxDepth = 0 // irrelevant. Meta-pins are not pinned
	err = adder.Pin(ctx, dgs.rpcClient, metaPin)
	if err != nil {
		return dataRoot, err
	}

	// Log some stats
	dgs.logStats(metaPin.Cid, clusterDAGPin.Cid)

	// Consider doing this? Seems like overkill
	//
	// // Amend ShardPins to reference clusterDAG root hash as a Parent
	// shardParents := cid.NewSet()
	// shardParents.Add(clusterDAG)
	// for shardN, shard := range dgs.shardNodes {
	// 	pin := api.PinWithOpts(shard, dgs.addParams)
	// 	pin.Name := fmt.Sprintf("%s-shard-%s", pin.Name, shardN)
	// 	pin.Type = api.ShardType
	// 	pin.Parents = shardParents
	// 	// FIXME: We don't know anymore the shard pin maxDepth
	// 	// so we'd need to get the pin first.
	// 	err := dgs.pin(pin)
	// 	if err != nil {
	// 		return err
	// 	}
	// }

	return dataRoot, nil
}
// ingests a block to the current shard. If it gets full, it
// flushes the shard and retries with a new one.
func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error {
	shard := dgs.currentShard

	// if we have no currentShard, create one
	if shard == nil {
		logger.Infof("new shard for '%s': #%d", dgs.addParams.Name, len(dgs.shards))
		var err error
		shard, err = newShard(ctx, dgs.rpcClient, dgs.addParams.PinOptions)
		if err != nil {
			return err
		}
		dgs.currentShard = shard
	}

	logger.Debugf("ingesting block %s in shard %d (%s)", n.Cid(), len(dgs.shards), dgs.addParams.Name)

	// this is not same as n.Size(): we count the raw serialized bytes
	// of the block against the shard limit.
	size := uint64(len(n.RawData()))

	// add the block to it if it fits and return
	if shard.Size()+size < shard.Limit() {
		shard.AddLink(ctx, n.Cid(), size)
		return dgs.currentShard.ba.Add(ctx, n)
	}

	logger.Debugf("shard %d full: block: %d. shard: %d. limit: %d",
		len(dgs.shards),
		size,
		shard.Size(),
		shard.Limit(),
	)

	// -------
	// Below: block DOES NOT fit in shard
	// Flush and retry

	// if shard is empty, error: the block alone exceeds the shard
	// limit, so retrying with a fresh shard would loop forever.
	if shard.Size() == 0 {
		return errors.New("block doesn't fit in empty shard: shard size too small?")
	}

	// flushCurrentShard resets dgs.currentShard to nil, so the retry
	// below allocates a fresh shard for this block.
	_, err := dgs.flushCurrentShard(ctx)
	if err != nil {
		return err
	}
	return dgs.ingestBlock(ctx, n) // <-- retry ingest
}
// logStats logs a summary of the finished sharding session: the meta pin
// CID, the cluster DAG CID, shard count, total size, duration and the
// average ingest rate.
func (dgs *DAGService) logStats(metaPin, clusterDAGPin cid.Cid) {
	duration := time.Since(dgs.startTime)
	seconds := uint64(duration) / uint64(time.Second)
	var rate string
	if seconds == 0 {
		// Sub-second session: avoid dividing by zero.
		rate = "∞ B"
	} else {
		rate = humanize.Bytes(dgs.totalSize / seconds)
	}

	statsFmt := `sharding session successful:
CID: %s
ClusterDAG: %s
Total shards: %d
Total size: %s
Total time: %s
Ingest Rate: %s/s
`

	logger.Infof(
		statsFmt,
		metaPin,
		clusterDAGPin,
		len(dgs.shards),
		humanize.Bytes(dgs.totalSize),
		duration,
		rate,
	)
}
2018-08-09 11:22:47 +00:00
func ( dgs * DAGService ) sendOutput ( ao * api . AddedOutput ) {
if dgs . output != nil {
dgs . output <- ao
2018-08-07 18:01:02 +00:00
}
}
// flushes the dgs.currentShard and returns the LastLink()
func (dgs *DAGService) flushCurrentShard(ctx context.Context) (cid.Cid, error) {
	shard := dgs.currentShard
	if shard == nil {
		return cid.Undef, errors.New("cannot flush a nil shard")
	}

	// The shard number equals the count of shards flushed so far.
	lens := len(dgs.shards)

	shardCid, err := shard.Flush(ctx, lens, dgs.previousShard)
	if err != nil {
		return shardCid, err
	}

	// Record the flushed shard and clear currentShard so the next
	// ingested block starts a fresh one.
	dgs.totalSize += shard.Size()
	dgs.shards[fmt.Sprintf("%d", lens)] = shardCid
	dgs.previousShard = shardCid
	dgs.currentShard = nil

	// Notify listeners about the flushed shard.
	dgs.sendOutput(&api.AddedOutput{
		Name: fmt.Sprintf("shard-%d", lens),
		Cid:  shardCid,
		Size: shard.Size(),
	})
	return shard.LastLink(), nil
}
// AddMany calls Add for every given node.
2018-08-09 11:22:47 +00:00
func ( dgs * DAGService ) AddMany ( ctx context . Context , nodes [ ] ipld . Node ) error {
2018-08-07 18:01:02 +00:00
for _ , node := range nodes {
2018-08-09 11:22:47 +00:00
err := dgs . Add ( ctx , node )
2018-08-07 18:01:02 +00:00
if err != nil {
return err
}
}
return nil
}