diff --git a/add/add.go b/add/add.go deleted file mode 100644 index c5d74241..00000000 --- a/add/add.go +++ /dev/null @@ -1,227 +0,0 @@ -package ipfscluster - -import ( - "context" - "errors" - "mime/multipart" - - "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/importer" - - rpc "github.com/hsanjuan/go-libp2p-gorpc" - "github.com/ipfs/go-ipfs-cmdkit/files" - logging "github.com/ipfs/go-log" -) - -// AddSession stores utilities of the calling component needed for the add -type AddSession struct { - rpcClient *rpc.Client - logger logging.EventLogger -} - -// NewAddSession creates a new AddSession for adding a file -func NewAddSession(rpcClient *rpc.Client, logger logging.EventLogger) *AddSession { - return &AddSession{ - rpcClient: rpcClient, - logger: logger, - } -} - -func (a *AddSession) consumeLocalAdd( - arg string, - outObj *api.NodeWithMeta, - replMin, replMax int, -) (string, error) { - //TODO: when ipfs add starts supporting formats other than - // v0 (v1.cbor, v1.protobuf) we'll need to update this - outObj.Format = "" - err := a.rpcClient.Call( - "", - "Cluster", - "IPFSBlockPut", - *outObj, - &struct{}{}, - ) - - return outObj.Cid, err // root node returned in case this is last call -} - -func (a *AddSession) finishLocalAdd(rootCid string, replMin, replMax int) error { - if rootCid == "" { - return errors.New("no record of root to pin") - } - - pinS := api.PinSerial{ - Cid: rootCid, - Type: int(api.DataType), - Recursive: true, - ReplicationFactorMin: replMin, - ReplicationFactorMax: replMax, - } - return a.rpcClient.Call( - "", - "Cluster", - "Pin", - pinS, - &struct{}{}, - ) -} - -func (a *AddSession) consumeShardAdd( - shardID string, - outObj *api.NodeWithMeta, - replMin, replMax int, -) (string, error) { - outObj.ID = shardID - outObj.ReplMax = replMax - outObj.ReplMin = replMin - var retStr string - err := a.rpcClient.Call( - "", - "Cluster", - "SharderAddNode", - *outObj, - &retStr, - ) - - return retStr, err -} - -func (a *AddSession) finishShardAdd( - shardID string, - replMin, replMax int, -) error { - if shardID == "" { - return errors.New("bad state: shardID passed incorrectly") - } - return a.rpcClient.Call( - "", - "Cluster", - "SharderFinalize", - shardID, - &struct{}{}, - ) -} - -func (a *AddSession) consumeImport(ctx context.Context, - outChan <-chan *api.NodeWithMeta, - printChan <-chan *api.AddedOutput, - errChan <-chan error, - consume func(string, *api.NodeWithMeta, int, int) (string, error), - finish func(string, int, int) error, - replMin int, replMax int, -) ([]api.AddedOutput, error) { - var err error - openChs := 3 - toPrint := make([]api.AddedOutput, 0) - arg := "" - - for { - if openChs == 0 { - break - } - - // Consume signals from importer. 
Errors resulting from - select { - // Ensure we terminate reading from importer after cancellation - // but do not block - case <-ctx.Done(): - err = errors.New("context timeout terminated add") - return nil, err - // Terminate session when importer throws an error - case err, ok := <-errChan: - if !ok { - openChs-- - errChan = nil - continue - } - return nil, err - - // Send status information to client for user output - case printObj, ok := <-printChan: - //TODO: if we support progress bar we must update this - if !ok { - openChs-- - printChan = nil - continue - } - toPrint = append(toPrint, *printObj) - // Consume ipld node output by importer - case outObj, ok := <-outChan: - if !ok { - openChs-- - outChan = nil - continue - } - - arg, err = consume(arg, outObj, replMin, replMax) - if err != nil { - return nil, err - } - } - } - - if err := finish(arg, replMin, replMax); err != nil { - return nil, err - } - a.logger.Debugf("succeeding file import") - return toPrint, nil -} - -// AddFile adds a file to the ipfs daemons of the cluster. The ipfs importer -// pipeline is used to DAGify the file. Depending on input parameters this -// DAG can be added locally to the calling cluster peer's ipfs repo, or -// sharded across the entire cluster. -func (a *AddSession) AddFile(ctx context.Context, - reader *multipart.Reader, - params api.AddParams, -) ([]api.AddedOutput, error) { - layout := params.Layout - trickle := false - if layout == "trickle" { - trickle = true - } - chunker := params.Chunker - raw := params.Raw - hidden := params.Hidden - - f := &files.MultipartFile{ - Mediatype: "multipart/form-data", - Reader: reader, - } - ctx, cancel := context.WithCancel(ctx) - defer cancel() - printChan, outChan, errChan := importer.ToChannel( - ctx, - f, - hidden, - trickle, - raw, - chunker, - ) - - shard := params.Shard - replMin := params.Rmin - replMax := params.Rmax - - var consume func(string, *api.NodeWithMeta, int, int) (string, error) - var finish func(string, int, int) error - if shard { - consume = a.consumeShardAdd - finish = a.finishShardAdd - } else { - consume = a.consumeLocalAdd - finish = a.finishLocalAdd - - } - return a.consumeImport( - ctx, - outChan, - printChan, - errChan, - consume, - finish, - replMin, - replMax, - ) -} diff --git a/adder/adder.go b/adder/adder.go new file mode 100644 index 00000000..86dfd46a --- /dev/null +++ b/adder/adder.go @@ -0,0 +1,14 @@ +package adder + +import ( + "context" + "mime/multipart" + + logging "github.com/ipfs/go-log" +) + +var logger = logging.Logger("adder") + +type Adder interface { + FromMultipart(context.Context, *multipart.Reader, *Params) error +} diff --git a/adder/dagservice.go b/adder/dagservice.go new file mode 100644 index 00000000..f30fabcd --- /dev/null +++ b/adder/dagservice.go @@ -0,0 +1,123 @@ +package adder + +import ( + "context" + "errors" + "fmt" + + "github.com/ipfs/ipfs-cluster/api" + + cid "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" +) + +var errNotFound = errors.New("dagservice: block not found") + +func isNotFound(err error) bool { + return err == errNotFound +} + +// adderDAGService implemengs a DAG Service and +// outputs any nodes added using this service to an Added. 
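A minimal wiring sketch for this channel-backed service, assuming the unexported names defined in this file; the helper function itself is hypothetical and only shows the intended flow, not code in this change:

```go
package adder

import (
	"fmt"

	"github.com/ipfs/ipfs-cluster/api"

	ipld "github.com/ipfs/go-ipld-format"
)

// exampleWireDAGService shows the intended wiring: Add() calls made through
// the returned DAGService end up on the channel, where a consumer goroutine
// can block-put them, hand them to a sharder, or simply print them.
func exampleWireDAGService() ipld.DAGService {
	ch := make(chan *api.NodeWithMeta)
	svc := newAdderDAGService(ch)
	go func() {
		for n := range ch {
			fmt.Println(n.Cid, n.Size) // each exported block shows up here
		}
	}()
	return svc
}
```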
+type adderDAGService struct { + addedSet *cid.Set + addedChan chan<- *api.NodeWithMeta +} + +func newAdderDAGService(ch chan *api.NodeWithMeta) ipld.DAGService { + set := cid.NewSet() + + return &adderDAGService{ + addedSet: set, + addedChan: ch, + } +} + +// Add passes the provided node through the output channel +func (dag *adderDAGService) Add(ctx context.Context, node ipld.Node) error { + // FIXME ? This set will grow in memory. + // Maybe better to use a bloom filter + ok := dag.addedSet.Visit(node.Cid()) + if !ok { + // don't re-add + return nil + } + + size, err := node.Size() + if err != nil { + return err + } + nodeSerial := api.NodeWithMeta{ + Cid: node.Cid().String(), + Data: node.RawData(), + Size: size, + } + + if uint64(len(nodeSerial.Data)) != size { + logger.Warningf("fixme: node size doesnt match raw data length") + } + + select { + case dag.addedChan <- &nodeSerial: + return nil + case <-ctx.Done(): + close(dag.addedChan) + return errors.New("canceled context preempted dagservice add") + } +} + +// AddMany passes the provided nodes through the output channel +func (dag *adderDAGService) AddMany(ctx context.Context, nodes []ipld.Node) error { + for _, node := range nodes { + err := dag.Add(ctx, node) + if err != nil { + return err + } + } + return nil +} + +// Get always returns errNotFound +func (dag *adderDAGService) Get(ctx context.Context, key *cid.Cid) (ipld.Node, error) { + return nil, errNotFound +} + +// GetMany returns an output channel that always emits an error +func (dag *adderDAGService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *ipld.NodeOption { + out := make(chan *ipld.NodeOption, 1) + out <- &ipld.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")} + close(out) + return out +} + +// Remove is a nop +func (dag *adderDAGService) Remove(ctx context.Context, key *cid.Cid) error { + return nil +} + +// RemoveMany is a nop +func (dag *adderDAGService) RemoveMany(ctx context.Context, keys []*cid.Cid) error { + return nil +} + +// printDAGService will "add" a node by printing it. printDAGService cannot Get nodes +// that have already been seen and calls to Remove are noops. Nodes are +// recorded after being added so that they will only be printed once. +type printDAGService struct { + ads ipld.DAGService +} + +func newPDagService() *printDAGService { + ch := make(chan *api.NodeWithMeta) + ads := newAdderDAGService(ch) + + go func() { + for n := range ch { + fmt.Printf(n.Cid, " | ", n.Size) + } + }() + + return &printDAGService{ + ads: ads, + } +} diff --git a/adder/importer.go b/adder/importer.go new file mode 100644 index 00000000..f302a931 --- /dev/null +++ b/adder/importer.go @@ -0,0 +1,176 @@ +package adder + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + + "github.com/ipfs/go-ipfs-cmdkit/files" + + "github.com/ipfs/ipfs-cluster/adder/ipfsadd" + "github.com/ipfs/ipfs-cluster/api" +) + +// BlockHandler is a function used to process a block as is received by the +// Importer. Used in Importer.Run(). +type BlockHandler func(ctx context.Context, n *api.NodeWithMeta) (string, error) + +// Importer facilitates converting a file into a stream +// of chunked blocks. +type Importer struct { + startedMux sync.Mutex + started bool + + files files.File + params *Params + + output chan *api.AddedOutput + blocks chan *api.NodeWithMeta + errors chan error +} + +// NewImporter sets up an Importer ready to Go(). 
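For orientation, a hedged sketch of one way to drive the Importer: call Go() and drain the channels until the importer closes them. The helper name is hypothetical; Go, Blocks, Output and Errors are the methods defined below.

```go
package adder

import (
	"context"
	"fmt"

	files "github.com/ipfs/go-ipfs-cmdkit/files"
)

// exampleChannels consumes the Importer's channels directly. Output is
// drained in its own goroutine so that neither channel can back up while
// blocks are being read.
func exampleChannels(ctx context.Context, f files.File, p *Params) error {
	imp, err := NewImporter(f, p)
	if err != nil {
		return err
	}
	if err := imp.Go(ctx); err != nil {
		return err
	}
	go func() {
		for o := range imp.Output() {
			fmt.Println("added:", o.Hash)
		}
	}()
	for n := range imp.Blocks() {
		fmt.Println("block:", n.Cid, n.Size)
	}
	return <-imp.Errors() // nil when the errors channel closes cleanly
}
```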
+func NewImporter( + f files.File, + p *Params, +) (*Importer, error) { + output := make(chan *api.AddedOutput, 1) + blocks := make(chan *api.NodeWithMeta, 1) + errors := make(chan error, 1) + + return &Importer{ + started: false, + files: f, + params: p, + output: output, + blocks: blocks, + errors: errors, + }, nil +} + +// Output returns a channel where information about each +// added block is sent. +func (imp *Importer) Output() <-chan *api.AddedOutput { + return imp.output +} + +// Blocks returns a channel where each imported block is sent. +func (imp *Importer) Blocks() <-chan *api.NodeWithMeta { + return imp.blocks +} + +// Errors returns a channel to which any errors during the import +// process are sent. +func (imp *Importer) Errors() <-chan error { + return imp.errors +} + +func (imp *Importer) start() bool { + imp.startedMux.Lock() + defer imp.startedMux.Unlock() + retVal := imp.started + imp.started = true + return !retVal +} + +// ImportFile chunks a File and sends the results (blocks) to the +// importer channels. +func (imp *Importer) Go(ctx context.Context) error { + if !imp.start() { + return errors.New("importing process already started or finished.") + } + + dagsvc := newAdderDAGService(imp.blocks) + + ipfsAdder, err := ipfsadd.NewAdder(ctx, dagsvc) + if err != nil { + return err + } + + ipfsAdder.Hidden = imp.params.Hidden + ipfsAdder.Trickle = imp.params.Layout == "trickle" + ipfsAdder.RawLeaves = imp.params.RawLeaves + ipfsAdder.Wrap = true + ipfsAdder.Chunker = imp.params.Chunker + ipfsAdder.Out = imp.output + + go func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer close(imp.output) + defer close(imp.blocks) + defer close(imp.errors) + + for { + select { + case <-ctx.Done(): + imp.errors <- ctx.Err() + return + } + + f, err := imp.files.NextFile() + if err != nil { + if err == io.EOF { + break // time to finalize + } + imp.errors <- err + return + } + + if err := ipfsAdder.AddFile(f); err != nil { + imp.errors <- err + return + } + } + + _, err := ipfsAdder.Finalize() + if err != nil { + if !isNotFound(err) { + fmt.Println("fixme importer.go", err) + } else { + imp.errors <- err + } + } + }() + return nil +} + +// Run calls the given BlockHandler every node read from the importer. +// It returns the value returned by the last-called BlockHandler. 
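A usage sketch of the Run pattern documented above, modeled on how adder/local uses it later in this changeset; the helper name is hypothetical:

```go
package adder

import (
	"context"

	"github.com/ipfs/ipfs-cluster/api"

	files "github.com/ipfs/go-ipfs-cmdkit/files"
)

// exampleRun imports a file and returns the CID of the last block handled,
// which for this importer is the root of the resulting DAG.
func exampleRun(ctx context.Context, f files.File, p *Params) (string, error) {
	imp, err := NewImporter(f, p)
	if err != nil {
		return "", err
	}
	handler := func(ctx context.Context, n *api.NodeWithMeta) (string, error) {
		// process the block here (e.g. an IPFSBlockPut RPC) and return
		// its CID so that Run's final return value is the DAG root.
		return n.Cid, nil
	}
	return imp.Run(ctx, handler)
}
```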
+func (imp *Importer) Run(ctx context.Context, blockF BlockHandler) (string, error) { + var retVal string + + errors := imp.Errors() + blocks := imp.Blocks() + + err := imp.Go(ctx) + if err != nil { + return retVal, err + } + + for { + select { + case <-ctx.Done(): + return retVal, ctx.Err() + case err, ok := <-errors: + if ok { + return retVal, err + } + case node, ok := <-blocks: + if !ok { + break // finished importing + } + retVal, err := blockF(ctx, node) + if err != nil { + return retVal, err + } + } + } + + // grab any last errors from errors if necessary + // (this waits for errors to be closed) + err = <-errors + return retVal, err +} diff --git a/importer/import_test.go b/adder/importer_test.go similarity index 97% rename from importer/import_test.go rename to adder/importer_test.go index c6fb9e40..2f1a10ad 100644 --- a/importer/import_test.go +++ b/adder/importer_test.go @@ -1,11 +1,11 @@ -package importer +package adder import ( "context" "testing" - "github.com/ipfs/ipfs-cluster/test" "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/test" ) // import and receive all blocks @@ -76,8 +76,6 @@ func listenErrCh(t *testing.T, errChan <-chan error) { t.Fatal(err) } } - - // testChannelOutput is a utility for shared functionality of output and print // channel testing @@ -85,7 +83,7 @@ func testChannelOutput(t *testing.T, objs []interface{}, expected []string) { check := make(map[string]struct{}) for _, obj := range objs { var cid string - switch obj := obj.(type){ + switch obj := obj.(type) { case *api.AddedOutput: cid = obj.Hash case *api.NodeWithMeta: diff --git a/importer/add.go b/adder/ipfsadd/add.go similarity index 92% rename from importer/add.go rename to adder/ipfsadd/add.go index 0df888f1..6b9e0b5c 100644 --- a/importer/add.go +++ b/adder/ipfsadd/add.go @@ -1,4 +1,5 @@ -package importer +// The ipfsadd package is a simplified copy of go-ipfs/core/coreunix/add.go +package ipfsadd import ( "context" @@ -9,26 +10,31 @@ import ( "github.com/ipfs/ipfs-cluster/api" - offline "github.com/ipfs/go-ipfs-exchange-offline" - bserv "github.com/ipfs/go-ipfs/blockservice" + cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-ipfs-chunker" + files "github.com/ipfs/go-ipfs-cmdkit/files" + posinfo "github.com/ipfs/go-ipfs-posinfo" balanced "github.com/ipfs/go-ipfs/importer/balanced" ihelper "github.com/ipfs/go-ipfs/importer/helpers" trickle "github.com/ipfs/go-ipfs/importer/trickle" dag "github.com/ipfs/go-ipfs/merkledag" mfs "github.com/ipfs/go-ipfs/mfs" unixfs "github.com/ipfs/go-ipfs/unixfs" - - cid "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - syncds "github.com/ipfs/go-datastore/sync" - bstore "github.com/ipfs/go-ipfs-blockstore" - "github.com/ipfs/go-ipfs-chunker" - files "github.com/ipfs/go-ipfs-cmdkit/files" - posinfo "github.com/ipfs/go-ipfs-posinfo" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" ) +// FIXME +// // error used to i +// var errNotFound = errors.New("dagservice: block not found") + +// func shouldIgnore(err error) bool { +// if err == errNotFound { +// return true +// } +// return false +// } + var log = logging.Logger("coreunix") // how many bytes of progress to wait before sending a progress update message @@ -53,10 +59,9 @@ func (e *ignoreFileError) Error() string { } // NewAdder Returns a new Adder used for a file add operation. 
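For reference, a sketch of how the cluster importer (adder/importer.go above) drives this Adder now that the blockstore argument is gone. The helper and the option values are illustrative only:

```go
package ipfsadd

import (
	"context"

	"github.com/ipfs/ipfs-cluster/api"

	files "github.com/ipfs/go-ipfs-cmdkit/files"
	ipld "github.com/ipfs/go-ipld-format"
)

// exampleAdd mirrors what adder/importer.go does: build an Adder over a
// DAGService, tune the options, feed a file and Finalize to obtain the root.
func exampleAdd(ctx context.Context, ds ipld.DAGService, f files.File, out chan *api.AddedOutput) (ipld.Node, error) {
	a, err := NewAdder(ctx, ds)
	if err != nil {
		return nil, err
	}
	a.Hidden = false
	a.Trickle = false // balanced layout
	a.RawLeaves = false
	a.Wrap = true // files added in one session share a single root
	a.Chunker = "" // default chunker
	a.Out = out
	if err := a.AddFile(f); err != nil {
		return nil, err
	}
	return a.Finalize()
}
```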
-func NewAdder(ctx context.Context, bs bstore.GCBlockstore, ds ipld.DAGService) (*Adder, error) { +func NewAdder(ctx context.Context, ds ipld.DAGService) (*Adder, error) { return &Adder{ ctx: ctx, - blockstore: bs, dagService: ds, Progress: false, Hidden: true, @@ -69,7 +74,6 @@ func NewAdder(ctx context.Context, bs bstore.GCBlockstore, ds ipld.DAGService) ( // Adder holds the switches passed to the `add` command. type Adder struct { ctx context.Context - blockstore bstore.GCBlockstore dagService ipld.DAGService Out chan *api.AddedOutput Progress bool @@ -228,9 +232,12 @@ func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error { // an fsn file. Outgoing DAGservice does not // store blocks. We recognize it as a file and // keep traversing the directory - if shouldIgnore(err) { - continue - } + //if shouldIgnore(err) { + // continue + //} + // FIXME + fmt.Println("fixme:", err) + continue return err } @@ -408,14 +415,6 @@ func outputDagnode(out chan *api.AddedOutput, name string, dn ipld.Node) error { return nil } -// NewMemoryDagService builds and returns a new mem-datastore. -func NewMemoryDagService() ipld.DAGService { - // build mem-datastore for editor's intermediary nodes - bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) - bsrv := bserv.New(bs, offline.Exchange(bs)) - return dag.NewDAGService(bsrv) -} - type progressReader struct { file files.File out chan *api.AddedOutput diff --git a/adder/local/adder.go b/adder/local/adder.go new file mode 100644 index 00000000..8cb6c47d --- /dev/null +++ b/adder/local/adder.go @@ -0,0 +1,81 @@ +// Package local implements an ipfs-cluster Adder that chunks and adds content +// to a local peer, before pinning it. +package local + +import ( + "context" + "mime/multipart" + + "github.com/ipfs/ipfs-cluster/adder" + "github.com/ipfs/ipfs-cluster/api" + + rpc "github.com/hsanjuan/go-libp2p-gorpc" + "github.com/ipfs/go-ipfs-cmdkit/files" + logging "github.com/ipfs/go-log" +) + +var logger = logging.Logger("addlocal") + +type Adder struct { + rpcClient *rpc.Client +} + +func New(rpc *rpc.Client) *Adder { + return &Adder{ + rpcClient: rpc, + } +} + +func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader, p *adder.Params) error { + f := &files.MultipartFile{ + Mediatype: "multipart/form-data", + Reader: r, + } + + // TODO: it should send it to the best allocation + localBlockPut := func(ctx context.Context, n *api.NodeWithMeta) (string, error) { + retVal := n.Cid + err := a.rpcClient.CallContext( + ctx, + "", + "Cluster", + "IPFSBlockPut", + *n, + &struct{}{}, + ) + return retVal, err + } + + importer, err := adder.NewImporter(f, p) + if err != nil { + return err + } + + lastCid, err := importer.Run(ctx, localBlockPut) + if err != nil { + return err + } + + if lastCid == "" { + panic("nothing imported") + } + + // Finally, cluster pin the result + pinS := api.PinSerial{ + Cid: lastCid, + Type: int(api.DataType), + MaxDepth: -1, + PinOptions: api.PinOptions{ + ReplicationFactorMin: p.ReplicationFactorMin, + ReplicationFactorMax: p.ReplicationFactorMax, + Name: p.Name, + }, + } + return a.rpcClient.Call( + "", + "Cluster", + "Pin", + pinS, + &struct{}{}, + ) +} diff --git a/adder/params.go b/adder/params.go new file mode 100644 index 00000000..df568061 --- /dev/null +++ b/adder/params.go @@ -0,0 +1,94 @@ +package adder + +import ( + "errors" + "net/url" + "strconv" + + "github.com/ipfs/ipfs-cluster/api" +) + +// Params contains all of the configurable parameters needed to specify the +// importing process of a file 
being added to an ipfs-cluster +type Params struct { + api.PinOptions + + Layout string + Chunker string + RawLeaves bool + Hidden bool + Shard bool +} + +// DefaultParams returns a Params object with standard defaults +func DefaultParams() *Params { + return &Params{ + Layout: "", // corresponds to balanced layout + Chunker: "", + RawLeaves: false, + Hidden: false, + Shard: false, + PinOptions: api.PinOptions{ + ReplicationFactorMin: -1, + ReplicationFactorMax: -1, + Name: "", + ShardSize: 100 * 1024 * 1024, // FIXME + }, + } +} + +// ParamsFromQuery parses the Params object from +// a URL.Query(). +// FIXME? Defaults? +func ParamsFromQuery(query url.Values) (*Params, error) { + layout := query.Get("layout") + switch layout { + case "trickle": + case "balanced": + default: + return nil, errors.New("parameter trickle invalid") + } + + chunker := query.Get("chunker") + name := query.Get("name") + raw, err := strconv.ParseBool(query.Get("raw")) + if err != nil { + return nil, errors.New("parameter raw invalid") + } + hidden, err := strconv.ParseBool(query.Get("hidden")) + if err != nil { + return nil, errors.New("parameter hidden invalid") + } + shard, err := strconv.ParseBool(query.Get("shard")) + if err != nil { + return nil, errors.New("parameter shard invalid") + } + replMin, err := strconv.Atoi(query.Get("repl_min")) + if err != nil || replMin < -1 { + return nil, errors.New("parameter repl_min invalid") + } + replMax, err := strconv.Atoi(query.Get("repl_max")) + if err != nil || replMax < -1 { + return nil, errors.New("parameter repl_max invalid") + } + + shardSize, err := strconv.ParseUint(query.Get("shard_size"), 10, 64) + if err != nil { + return nil, errors.New("parameter shard_size is invalid") + } + + params := &Params{ + Layout: layout, + Chunker: chunker, + RawLeaves: raw, + Hidden: hidden, + Shard: shard, + PinOptions: api.PinOptions{ + ReplicationFactorMin: replMin, + ReplicationFactorMax: replMax, + Name: name, + ShardSize: shardSize, + }, + } + return params, nil +} diff --git a/adder/sharding/adder.go b/adder/sharding/adder.go new file mode 100644 index 00000000..e97121e0 --- /dev/null +++ b/adder/sharding/adder.go @@ -0,0 +1,69 @@ +// package sharding implements a sharding adder that chunks and +// shards content while it's added, creating Cluster DAGs and +// pinning them. 
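Callers choose between this package and adder/local through Params.Shard; a caller-side sketch of that selection, mirroring what restapi.go and cluster.go do further down in this changeset (the helper and its package are hypothetical):

```go
package example

import (
	"github.com/ipfs/ipfs-cluster/adder"
	"github.com/ipfs/ipfs-cluster/adder/local"
	"github.com/ipfs/ipfs-cluster/adder/sharding"

	rpc "github.com/hsanjuan/go-libp2p-gorpc"
)

// newAdder returns the sharding adder when requested and the single-peer
// local adder otherwise. Both satisfy the adder.Adder interface.
func newAdder(rpcClient *rpc.Client, p *adder.Params) adder.Adder {
	if p.Shard {
		return sharding.New(rpcClient)
	}
	return local.New(rpcClient)
}
```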
+package sharding + +import ( + "context" + "mime/multipart" + + "github.com/ipfs/ipfs-cluster/adder" + "github.com/ipfs/ipfs-cluster/api" + + rpc "github.com/hsanjuan/go-libp2p-gorpc" + "github.com/ipfs/go-ipfs-cmdkit/files" + logging "github.com/ipfs/go-log" +) + +var logger = logging.Logger("addshard") + +type Adder struct { + rpcClient *rpc.Client +} + +func New(rpc *rpc.Client) *Adder { + return &Adder{ + rpcClient: rpc, + } +} + +func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader, p *adder.Params) error { + f := &files.MultipartFile{ + Mediatype: "multipart/form-data", + Reader: r, + } + + pinOpts := api.PinOptions{ + ReplicationFactorMin: p.ReplicationFactorMin, + ReplicationFactorMax: p.ReplicationFactorMax, + Name: p.Name, + ShardSize: p.ShardSize, + } + + dagBuilder := newClusterDAGBuilder(a.rpcClient, pinOpts) + // Always stop the builder + defer dagBuilder.Cancel() + + blockSink := dagBuilder.Blocks() + + blockHandle := func(ctx context.Context, n *api.NodeWithMeta) (string, error) { + blockSink <- n + return "", nil + } + + importer, err := adder.NewImporter(f, p) + if err != nil { + return err + } + + _, err = importer.Run(ctx, blockHandle) + if err != nil { + return err + } + + // Trigger shard finalize + close(blockSink) + + <-dagBuilder.Done() // wait for the builder to finish + return nil +} diff --git a/adder/sharding/cluster_dag_builder.go b/adder/sharding/cluster_dag_builder.go new file mode 100644 index 00000000..0a1de93b --- /dev/null +++ b/adder/sharding/cluster_dag_builder.go @@ -0,0 +1,245 @@ +package sharding + +import ( + "context" + "errors" + "fmt" + + "github.com/ipfs/ipfs-cluster/api" + + rpc "github.com/hsanjuan/go-libp2p-gorpc" + cid "github.com/ipfs/go-cid" + peer "github.com/libp2p/go-libp2p-peer" +) + +// A clusterDAGBuilder is in charge of creating a full cluster dag upon receiving +// a stream of blocks (NodeWithMeta) +type clusterDAGBuilder struct { + ctx context.Context + cancel context.CancelFunc + + pinOpts api.PinOptions + + rpc *rpc.Client + + blocks chan *api.NodeWithMeta + + // Current shard being built + currentShard *shard + + // shard tracking + shards map[string]*cid.Cid +} + +func newClusterDAGBuilder(rpc *rpc.Client, opts api.PinOptions) *clusterDAGBuilder { + ctx, cancel := context.WithCancel(context.Background()) + + // By caching one node don't block sending something + // to the channel. + blocks := make(chan *api.NodeWithMeta, 1) + + cdb := &clusterDAGBuilder{ + ctx: ctx, + cancel: cancel, + rpc: rpc, + blocks: blocks, + pinOpts: opts, + shards: make(map[string]*cid.Cid), + } + go cdb.ingestBlocks() + return cdb +} + +// Blocks returns a channel on which to send blocks to be processed by this +// clusterDAGBuilder (ingested). Close channel when done. +func (cdb *clusterDAGBuilder) Blocks() chan<- *api.NodeWithMeta { + return cdb.blocks +} + +func (cdb *clusterDAGBuilder) Done() <-chan struct{} { + return cdb.ctx.Done() +} + +func (cdb *clusterDAGBuilder) Cancel() { + cdb.cancel() +} + +// shortcut to pin something +func (cdb *clusterDAGBuilder) pin(p api.Pin) error { + return cdb.rpc.CallContext( + cdb.ctx, + "", + "Cluster", + "Pin", + p.ToSerial(), + &struct{}{}, + ) +} + +// finalize is used to signal that we need to wrap up this clusterDAG +//. It is called when the Blocks() channel is closed. 
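Condensed, the lifecycle that leads to finalize looks like the sketch below (hypothetical helper; the names are those defined in this file and exercised by the Adder above):

```go
package sharding

import (
	"github.com/ipfs/ipfs-cluster/api"

	rpc "github.com/hsanjuan/go-libp2p-gorpc"
)

// exampleBuild streams blocks into a clusterDAGBuilder and then closes the
// channel, which is what triggers finalize(): the last shard is flushed and
// the meta and clusterDAG pins are committed.
func exampleBuild(rpcClient *rpc.Client, opts api.PinOptions, nodes []*api.NodeWithMeta) {
	cdb := newClusterDAGBuilder(rpcClient, opts)
	blocks := cdb.Blocks()
	for _, n := range nodes {
		blocks <- n
	}
	close(blocks)
	<-cdb.Done() // the builder cancels its context once finalize completes
}
```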
+func (cdb *clusterDAGBuilder) finalize() error { + lastShard := cdb.currentShard + if lastShard == nil { + return errors.New("cannot finalize a ClusterDAG with no shards") + } + + rootCid, err := lastShard.Flush(cdb.ctx, cdb.pinOpts, len(cdb.shards)) + if err != nil { + return err + } + + // Do not forget this shard + cdb.shards[fmt.Sprintf("%d", len(cdb.shards))] = rootCid + + shardNodes, err := makeDAG(cdb.shards) + if err != nil { + return err + } + + err = putDAG(cdb.ctx, cdb.rpc, shardNodes, "") + if err != nil { + return err + } + + dataRootCid := lastShard.LastLink() + clusterDAG := shardNodes[0].Cid() + + // Pin the META pin + metaPin := api.PinWithOpts(dataRootCid, cdb.pinOpts) + metaPin.Type = api.MetaType + metaPin.ClusterDAG = clusterDAG + metaPin.MaxDepth = 0 // irrelevant. Meta-pins are not pinned + err = cdb.pin(metaPin) + if err != nil { + return err + } + + // Pin the ClusterDAG + clusterDAGPin := api.PinCid(clusterDAG) + clusterDAGPin.ReplicationFactorMin = -1 + clusterDAGPin.ReplicationFactorMax = -1 + clusterDAGPin.MaxDepth = 0 // pin direct + clusterDAGPin.Name = fmt.Sprintf("%s-clusterDAG", cdb.pinOpts.Name) + clusterDAGPin.Type = api.ClusterDAGType + clusterDAGPin.Parents = cid.NewSet() + clusterDAGPin.Parents.Add(dataRootCid) + err = cdb.pin(clusterDAGPin) + if err != nil { + return err + } + + // Consider doing this? Seems like overkill + // + // // Ammend ShardPins to reference clusterDAG root hash as a Parent + // shardParents := cid.NewSet() + // shardParents.Add(clusterDAG) + // for shardN, shard := range cdb.shardNodes { + // pin := api.PinWithOpts(shard, cdb.pinOpts) + // pin.Name := fmt.Sprintf("%s-shard-%s", pin.Name, shardN) + // pin.Type = api.ShardType + // pin.Parents = shardParents + // // FIXME: We don't know anymore the shard pin maxDepth + // err := cdb.pin(pin) + // if err != nil { + // return err + // } + // } + + cdb.cancel() // auto-cancel the builder. We're done. + return nil +} + +func (cdb *clusterDAGBuilder) ingestBlocks() { + // TODO: handle errors somehow + for { + select { + case <-cdb.ctx.Done(): + return + case n, ok := <-cdb.blocks: + if !ok { + err := cdb.finalize() + if err != nil { + logger.Error(err) + // TODO: handle + } + return + } + err := cdb.ingestBlock(n) + if err != nil { + logger.Error(err) + // TODO: handle + } + } + } +} + +// ingests a block to the current shard. If it get's full, it +// Flushes the shard and retries with a new one. +func (cdb *clusterDAGBuilder) ingestBlock(n *api.NodeWithMeta) error { + shard := cdb.currentShard + + // if we have no currentShard, create one + if shard == nil { + var err error + shard, err = newShard(cdb.ctx, cdb.rpc, cdb.pinOpts.ShardSize) + if err != nil { + return err + } + cdb.currentShard = shard + } + + c, err := cid.Decode(n.Cid) + if err != nil { + return err + } + + // add the block to it if it fits and return + if shard.Size()+n.Size < shard.Limit() { + shard.AddLink(c, n.Size) + return cdb.putBlock(n, shard.DestPeer()) + } + + // block doesn't fit in shard + + // if shard is empty, error + if shard.Size() == 0 { + return errors.New("block doesn't fit in empty shard") + } + + // otherwise, shard considered full. 
Flush and pin result + rootCid, err := shard.Flush(cdb.ctx, cdb.pinOpts, len(cdb.shards)) + if err != nil { + return err + } + // Do not forget this shard + cdb.shards[fmt.Sprintf("%d", len(cdb.shards))] = rootCid + cdb.currentShard = nil + return cdb.ingestBlock(n) // <-- retry ingest +} + +// performs an IPFSBlockPut +func (cdb *clusterDAGBuilder) putBlock(n *api.NodeWithMeta, dest peer.ID) error { + c, err := cid.Decode(n.Cid) + if err != nil { + return err + } + + format, ok := cid.CodecToStr[c.Type()] + if !ok { + format = "" + logger.Warning("unsupported cid type, treating as v0") + } + if c.Prefix().Version == 0 { + format = "v0" + } + n.Format = format + return cdb.rpc.CallContext( + cdb.ctx, + dest, + "Cluster", + "IPFSBlockPut", + n, + &struct{}{}, + ) +} diff --git a/sharder/clusterdag.go b/adder/sharding/dag.go similarity index 85% rename from sharder/clusterdag.go rename to adder/sharding/dag.go index 8ad0dcc5..bbf1296a 100644 --- a/sharder/clusterdag.go +++ b/adder/sharding/dag.go @@ -1,6 +1,6 @@ -package sharder +package sharding -// clusterdag.go defines functions for constructing and parsing ipld-cbor nodes +// dag.go defines functions for constructing and parsing ipld-cbor nodes // of the clusterDAG used to track sharded DAGs in ipfs-cluster // Most logic goes into handling the edge cases in which clusterDAG @@ -17,13 +17,18 @@ package sharder // multiple levels of indirection. import ( + "context" "fmt" + "github.com/ipfs/ipfs-cluster/api" + + rpc "github.com/hsanjuan/go-libp2p-gorpc" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" dag "github.com/ipfs/go-ipfs/merkledag" cbor "github.com/ipfs/go-ipld-cbor" ipld "github.com/ipfs/go-ipld-format" + peer "github.com/libp2p/go-libp2p-peer" mh "github.com/multiformats/go-multihash" ) @@ -66,11 +71,13 @@ func CborDataToNode(raw []byte, format string) (ipld.Node, error) { // carry links to the data nodes being tracked. The head of the output slice // is always the root of the shardDAG, i.e. 
the ipld node that should be // recursively pinned to track the shard -func makeDAG(obj shardObj) ([]ipld.Node, error) { +func makeDAG(dagObj map[string]*cid.Cid) ([]ipld.Node, error) { // No indirect node - if len(obj) <= MaxLinks { - node, err := cbor.WrapObject(obj, hashFn, - mh.DefaultLengths[hashFn]) + if len(dagObj) <= MaxLinks { + node, err := cbor.WrapObject( + dagObj, + hashFn, mh.DefaultLengths[hashFn], + ) if err != nil { return nil, err } @@ -79,11 +86,11 @@ func makeDAG(obj shardObj) ([]ipld.Node, error) { // Indirect node required leafNodes := make([]ipld.Node, 0) // shardNodes with links to data indirectObj := make(map[string]*cid.Cid) // shardNode with links to shardNodes - numFullLeaves := len(obj) / MaxLinks + numFullLeaves := len(dagObj) / MaxLinks for i := 0; i <= numFullLeaves; i++ { leafObj := make(map[string]*cid.Cid) for j := 0; j < MaxLinks; j++ { - c, ok := obj[fmt.Sprintf("%d", i*MaxLinks+j)] + c, ok := dagObj[fmt.Sprintf("%d", i*MaxLinks+j)] if !ok { // finished with this leaf before filling all the way if i != numFullLeaves { panic("bad state, should never be here") @@ -109,6 +116,29 @@ func makeDAG(obj shardObj) ([]ipld.Node, error) { return nodes, nil } +func putDAG(ctx context.Context, rpcC *rpc.Client, nodes []ipld.Node, dest peer.ID) error { + for _, n := range nodes { + logger.Debugf("The dag cbor Node Links: %v", n.Links()) + b := api.NodeWithMeta{ + Data: n.RawData(), + Format: "cbor", + } + logger.Debugf("Here is the serialized ipld: %x", b.Data) + err := rpcC.CallContext( + ctx, + dest, + "Cluster", + "IPFSBlockPut", + b, + &struct{}{}, + ) + if err != nil { + return err + } + } + return nil +} + //TODO: decide whether this is worth including. Is precision important for // most usecases? Is being a little over the shard size a serious problem? // Is precision worth the cost to maintain complex accounting for metadata diff --git a/adder/sharding/shard.go b/adder/sharding/shard.go new file mode 100644 index 00000000..f1fae07c --- /dev/null +++ b/adder/sharding/shard.go @@ -0,0 +1,130 @@ +package sharding + +import ( + "context" + "errors" + "fmt" + + "github.com/ipfs/ipfs-cluster/api" + + rpc "github.com/hsanjuan/go-libp2p-gorpc" + cid "github.com/ipfs/go-cid" + peer "github.com/libp2p/go-libp2p-peer" +) + +// a shard represents a set of blocks (or bucket) which have been assigned +// a peer to be block-put and will be part of the same shard in the +// cluster DAG. +type shard struct { + rpc *rpc.Client + destPeer peer.ID + // dagNode represents a node with links and will be converted + // to Cbor. + dagNode map[string]*cid.Cid + currentSize uint64 + sizeLimit uint64 +} + +func newShard(ctx context.Context, rpc *rpc.Client, sizeLimit uint64) (*shard, error) { + var allocs []string + // TODO: before it figured out how much freespace there is in the node + // and set the maximum shard size to that. + // I have dropped that functionality. + // It would involve getting metrics manually. 
+ err := rpc.CallContext( + ctx, + "", + "Cluster", + "Allocate", + api.PinSerial{ + Cid: "", + PinOptions: api.PinOptions{ + ReplicationFactorMin: 1, + ReplicationFactorMax: int(^uint(0) >> 1), //max int + }, + }, + &allocs, + ) + if err != nil { + return nil, err + } + + if len(allocs) < 1 { // redundant + return nil, errors.New("cannot allocate new shard") + } + + return &shard{ + rpc: rpc, + destPeer: api.StringsToPeers(allocs)[0], + dagNode: make(map[string]*cid.Cid), + currentSize: 0, + sizeLimit: sizeLimit, + }, nil +} + +// AddLink tries to add a new block to this shard if it's not full. +// Returns true if the block was added +func (sh *shard) AddLink(c *cid.Cid, s uint64) { + linkN := len(sh.dagNode) + linkName := fmt.Sprintf("%d", linkN) + sh.dagNode[linkName] = c + sh.currentSize += s +} + +// DestPeer returns the peer ID on which blocks are put for this shard. +func (sh *shard) DestPeer() peer.ID { + return sh.destPeer +} + +// Flush completes the allocation of this shard by building a CBOR node +// and adding it to IPFS, then pinning it in cluster. It returns the Cid of the +// shard. +func (sh *shard) Flush(ctx context.Context, opts api.PinOptions, shardN int) (*cid.Cid, error) { + logger.Debug("flushing shard") + nodes, err := makeDAG(sh.dagNode) + if err != nil { + return nil, err + } + + putDAG(ctx, sh.rpc, nodes, sh.destPeer) + + rootCid := nodes[0].Cid() + pin := api.PinWithOpts(rootCid, opts) + pin.Name = fmt.Sprintf("%s-shard-%d", opts.Name, shardN) + // this sets dest peer as priority allocation + pin.Allocations = []peer.ID{sh.destPeer} + pin.Type = api.ShardType + pin.MaxDepth = 1 + if len(nodes) > len(sh.dagNode)+1 { // using an indirect graph + pin.MaxDepth = 2 + } + + err = sh.rpc.Call( + sh.destPeer, + "Cluster", + "Pin", + pin.ToSerial(), + &struct{}{}, + ) + return rootCid, err +} + +// Size returns this shard's current size. +func (sh *shard) Size() uint64 { + return sh.currentSize +} + +// Size returns this shard's size limit. +func (sh *shard) Limit() uint64 { + return sh.sizeLimit +} + +// Last returns the last added link. When finishing sharding, +// the last link of the last shard is the data root for the +// full sharded DAG (the CID that would have resulted from +// adding the content to a single IPFS daemon). +func (sh *shard) LastLink() *cid.Cid { + l := len(sh.dagNode) + lastLink := fmt.Sprintf("%d", l-1) + return sh.dagNode[lastLink] +} diff --git a/allocate.go b/allocate.go index 7b8b3eca..a700a25c 100644 --- a/allocate.go +++ b/allocate.go @@ -37,14 +37,22 @@ import ( // ReplicationFactorMin. // allocate finds peers to allocate a hash using the informer and the monitor -// it should only be used with valid replicationFactors (rplMin and rplMax -// which are positive and rplMin <= rplMax). +// it should only be used with valid replicationFactors (if rplMin and rplMax +// are > 0, then rplMin <= rplMax). // It always returns allocations, but if no new allocations are needed, // it will return the current ones. Note that allocate() does not take // into account if the given CID was previously in a "pin everywhere" mode, // and will consider such Pins as currently unallocated ones, providing // new allocations as available. 
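An illustration of the new guards at the top of allocate (hypothetical helper; the expected results follow directly from the code below):

```go
package ipfscluster

import (
	"fmt"

	cid "github.com/ipfs/go-cid"
)

// exampleAllocate illustrates the new guards: -1/-1 means "pin everywhere"
// and yields an empty allocation list, while a 0/0 pair is rejected.
func exampleAllocate(c *Cluster, h *cid.Cid) {
	everywhere, err := c.allocate(h, -1, -1, nil, nil)
	fmt.Println(len(everywhere) == 0, err == nil) // true true

	_, err = c.allocate(h, 0, 0, nil, nil)
	fmt.Println(err != nil) // true: bad replication factors 0/0

	allocs, err := c.allocate(h, 1, 3, nil, nil) // normal case: 1 to 3 peers
	fmt.Println(allocs, err)
}
```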
func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) { + if (rplMin + rplMax) == 0 { + return nil, fmt.Errorf("bad replication factors: %d/%d", rplMin, rplMax) + } + + if rplMin < 0 && rplMax < 0 { // allocate everywhere + return []peer.ID{}, nil + } + // Figure out who is holding the CID currentPin, _ := c.getCurrentPin(hash) currentAllocs := currentPin.Allocations diff --git a/api/rest/restapi.go b/api/rest/restapi.go index a377676e..f8d6aa22 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -21,7 +21,9 @@ import ( "sync" "time" - add "github.com/ipfs/ipfs-cluster/add" + "github.com/ipfs/ipfs-cluster/adder" + "github.com/ipfs/ipfs-cluster/adder/local" + "github.com/ipfs/ipfs-cluster/adder/sharding" types "github.com/ipfs/ipfs-cluster/api" mux "github.com/gorilla/mux" @@ -319,6 +321,12 @@ func (api *API) routes() []route { "/allocations/{hash}", api.allocationHandler, }, + { + "Allocate", + "POST", + "/allocations", + api.addHandler, + }, { "StatusAll", "GET", @@ -373,12 +381,6 @@ func (api *API) routes() []route { "/health/graph", api.graphHandler, }, - { - "FilesAdd", - "POST", - "/allocations", - api.addFileHandler, - }, } } @@ -500,62 +502,29 @@ func (api *API) graphHandler(w http.ResponseWriter, r *http.Request) { sendResponse(w, err, graph) } -func (api *API) addFileHandler(w http.ResponseWriter, r *http.Request) { +func (api *API) addHandler(w http.ResponseWriter, r *http.Request) { reader, err := r.MultipartReader() if err != nil { sendErrorResponse(w, 400, err.Error()) return } - urlParams := r.URL.Query() - layout := urlParams.Get("layout") - if layout != "" && layout != "trickle" && layout != "balanced" { - sendErrorResponse(w, 400, "parameter trickle invalid") - return - } - chunker := urlParams.Get("chunker") - raw, err := strconv.ParseBool(urlParams.Get("raw")) + params, err := adder.ParamsFromQuery(r.URL.Query()) if err != nil { - sendErrorResponse(w, 400, "parameter raw invalid") + sendErrorResponse(w, 400, err.Error()) return } - hidden, err := strconv.ParseBool(urlParams.Get("hidden")) - if err != nil { - sendErrorResponse(w, 400, "parameter hidden invalid") - return - } - shard, err := strconv.ParseBool(urlParams.Get("shard")) - if err != nil { - sendErrorResponse(w, 400, "parameter shard invalid") - return - } - replMin, err := strconv.Atoi(urlParams.Get("repl_min")) - if err != nil || replMin < -1 { - sendErrorResponse(w, 400, "parameter replMin invalid") - return - } - replMax, err := strconv.Atoi(urlParams.Get("repl_max")) - if err != nil || replMax < -1 { - sendErrorResponse(w, 400, "parameter replMax invalid") - return - } - params := types.AddParams{ - Layout: layout, - Chunker: chunker, - Raw: raw, - Hidden: hidden, - Shard: shard, - Rmin: replMin, - Rmax: replMax, - } - addSess := add.NewAddSession(api.rpcClient, logger) - toPrint, err := addSess.AddFile(api.ctx, reader, params) - if err != nil { - sendErrorResponse(w, 500, err.Error()) - return + var add adder.Adder + if params.Shard { + add = sharding.New(api.rpcClient) + } else { + add = local.New(api.rpcClient) } - sendJSONResponse(w, 200, toPrint) + + err = add.FromMultipart(api.ctx, reader, params) + // TODO: progress? 
+ sendEmptyResponse(w, err) } func (api *API) peerListHandler(w http.ResponseWriter, r *http.Request) { @@ -835,7 +804,7 @@ func parseCidOrError(w http.ResponseWriter, r *http.Request) types.PinSerial { queryValues := r.URL.Query() name := queryValues.Get("name") pin.Name = name - pin.Recursive = true // For now all CLI pins are recursive + pin.MaxDepth = -1 // For now, all pins are recursive rplStr := queryValues.Get("replication_factor") rplStrMin := queryValues.Get("replication_factor_min") rplStrMax := queryValues.Get("replication_factor_max") diff --git a/api/types.go b/api/types.go index 2e38d477..d5fc8f77 100644 --- a/api/types.go +++ b/api/types.go @@ -92,6 +92,7 @@ func TrackerStatusFromString(str string) TrackerStatus { } // IPFSPinStatus values +// FIXME include maxdepth const ( IPFSPinStatusBug IPFSPinStatus = iota IPFSPinStatusError @@ -107,25 +108,37 @@ type IPFSPinStatus int // IPFSPinStatusFromString parses a string and returns the matching // IPFSPinStatus. func IPFSPinStatusFromString(t string) IPFSPinStatus { - // Since indirect statuses are of the form "indirect through ", use a regexp to match + // Since indirect statuses are of the form "indirect through ", + // use a regexp to match var ind, _ = regexp.MatchString("^indirect", t) + var rec, _ = regexp.MatchString("^recursive", t) // TODO: This is only used in the http_connector to parse // ipfs-daemon-returned values. Maybe it should be extended. switch { case ind: return IPFSPinStatusIndirect + case rec: + // FIXME: Maxdepth? + return IPFSPinStatusRecursive case t == "direct": return IPFSPinStatusDirect - case t == "recursive": - return IPFSPinStatusRecursive default: return IPFSPinStatusBug } } -// IsPinned returns true if the status is Direct or Recursive -func (ips IPFSPinStatus) IsPinned() bool { - return ips == IPFSPinStatusDirect || ips == IPFSPinStatusRecursive +// IsPinned returns true if the item is pinned as expected by the +// maxDepth parameter. +func (ips IPFSPinStatus) IsPinned(maxDepth int) bool { + switch { + case maxDepth < 0: + return ips == IPFSPinStatusRecursive + case maxDepth == 0: + return ips == IPFSPinStatusDirect + case maxDepth > 0: // FIXME + return ips == IPFSPinStatusRecursive + } + return false } // ToTrackerStatus converts the IPFSPinStatus value to the @@ -540,29 +553,52 @@ func StringsToCidSet(strs []string) *cid.Set { return cids } -// PinType values +// PinType values. See PinType documentation for further explanation. const ( + // BadType type showing up anywhere indicates a bug BadType PinType = iota + // DataType is a regular, non-sharded pin. It has no parents + // and no ClusterDAG associated. It is pinned recursively + // in allocated peers. DataType + // MetaType tracks the original CID of a sharded DAG and points + // to its ClusterDAG. It has no parents. MetaType - CdagType + // ClusterDAGType pins carry the CID if the root node that points to + // all the shard-root-nodes of the shards in which a DAG has been + // divided. It has a parent (the MetaType). + // ClusterDAGType pins are pinned directly everywhere. + ClusterDAGType + // ShardType pins carry the root CID of a shard, which points + // to individual blocks on the original DAG that the user is adding, + // which has been sharded. + // Their parent is the ClusterDAGType pin. + // ShardTypes are pinned with MaxDepth=1 (root and + // direct children only). 
ShardType ) // AllType is a PinType used for filtering all pin types -const AllType = -1 +const AllType PinType = -1 -// PinType specifies which of four possible interpretations a pin represents. -// DataType pins are the simplest and represent a pin in the pinset used to -// directly track user data. ShardType pins are metadata pins that track -// many nodes in a user's data DAG. ShardType pins have a parent pin, and in -// general can have many parents. ClusterDAG, or Cdag for short, pins are also -// metadata pins that do not directly track user data DAGs but rather other -// metadata pins. CdagType pins have at least one parent. Finally MetaType -// pins always track the cid of the root of a user-tracked data DAG. However -// MetaType pins are not stored directly in the ipfs pinset. Instead the -// underlying DAG is tracked via the metadata pins underneath the root of a -// CdagType pin +// PinType specifies which sort of Pin object we are dealing with. +// In practice, the PinType decides how a Pin object is treated by the +// PinTracker. +// See descriptions above. +// A sharded Pin would look like: +// +// [ Meta ] (not pinned on IPFS, only present in cluster state) +// | +// v +// [ Cluster DAG ] (pinned everywhere in "direct") +// | .. | +// v v +// [Shard1] .. [ShardN] (allocated to peers and pinned with max-depth=2 +// | | .. | | | .. | +// v v .. v v v .. v +// [][]..[] [][]..[] Blocks (indirectly pinned on ipfs, not tracked in cluster) +// +// type PinType int // PinTypeFromString is the inverse of String. It returns the PinType value @@ -574,7 +610,7 @@ func PinTypeFromString(str string) PinType { case "meta-pin": return MetaType case "clusterdag-pin": - return CdagType + return ClusterDAGType case "shard-pin": return ShardType case "all": @@ -591,7 +627,7 @@ func (pT *PinType) String() string { return "pin" case MetaType: return "meta-pin" - case CdagType: + case ClusterDAGType: return "clusterdag-pin" case ShardType: return "shard-pin" @@ -602,18 +638,36 @@ func (pT *PinType) String() string { } } -// Pin is an argument that carries a Cid. It may carry more things in the -// future. +// PinOptions wraps user-defined options for Pins +type PinOptions struct { + ReplicationFactorMin int `json:"replication_factor_min"` + ReplicationFactorMax int `json:"replication_factor_max"` + Name string `json:"name"` + ShardSize uint64 `json:"shard_size"` +} + +// Pin carries all the information associated to a CID that is pinned +// in IPFS Cluster. type Pin struct { - Cid *cid.Cid - Name string - Type PinType - Allocations []peer.ID - ReplicationFactorMin int - ReplicationFactorMax int - Recursive bool - Parents *cid.Set - Clusterdag *cid.Cid + PinOptions + + Cid *cid.Cid + + // See PinType comments + Type PinType + + // The peers to which this pin is allocated + Allocations []peer.ID + + // MaxDepth associated to this pin. -1 means + // recursive. + MaxDepth int + + // For certain types of pin, a pointer to parents. + Parents *cid.Set + + // For MetaType pins, a pointer to the Cluster DAG (sharded tree) + ClusterDAG *cid.Cid } // PinCid is a shortcut to create a Pin only with a Cid. 
Default is for pin to @@ -623,21 +677,29 @@ func PinCid(c *cid.Cid) Pin { Cid: c, Type: DataType, Allocations: []peer.ID{}, - Recursive: true, + MaxDepth: -1, } } +func PinWithOpts(c *cid.Cid, opts PinOptions) Pin { + p := PinCid(c) + p.ReplicationFactorMin = opts.ReplicationFactorMin + p.ReplicationFactorMax = opts.ReplicationFactorMax + p.Name = opts.Name + p.ShardSize = opts.ShardSize + return p +} + // PinSerial is a serializable version of Pin type PinSerial struct { - Cid string `json:"cid"` - Name string `json:"name"` - Type int `json:"type"` - Allocations []string `json:"allocations"` - ReplicationFactorMin int `json:"replication_factor_min"` - ReplicationFactorMax int `json:"replication_factor_max"` - Recursive bool `json:"recursive"` - Parents []string `json:"parents"` - Clusterdag string `json:"clusterdag"` + PinOptions + + Cid string `json:"cid"` + Type int `json:"type"` + Allocations []string `json:"allocations"` + MaxDepth int `json:"max_depth"` + Parents []string `json:"parents"` + ClusterDAG string `json:"clusterdag"` } // ToSerial converts a Pin to PinSerial. @@ -647,8 +709,8 @@ func (pin Pin) ToSerial() PinSerial { c = pin.Cid.String() } cdag := "" - if pin.Clusterdag != nil { - cdag = pin.Clusterdag.String() + if pin.ClusterDAG != nil { + cdag = pin.ClusterDAG.String() } n := pin.Name @@ -659,15 +721,18 @@ func (pin Pin) ToSerial() PinSerial { } return PinSerial{ - Cid: c, - Name: n, - Allocations: allocs, - Type: int(pin.Type), - ReplicationFactorMin: pin.ReplicationFactorMin, - ReplicationFactorMax: pin.ReplicationFactorMax, - Recursive: pin.Recursive, - Parents: parents, - Clusterdag: cdag, + Cid: c, + Allocations: allocs, + Type: int(pin.Type), + MaxDepth: pin.MaxDepth, + Parents: parents, + ClusterDAG: cdag, + PinOptions: PinOptions{ + Name: n, + ReplicationFactorMin: pin.ReplicationFactorMin, + ReplicationFactorMax: pin.ReplicationFactorMax, + ShardSize: pin.ShardSize, + }, } } @@ -690,7 +755,11 @@ func (pin Pin) Equals(pin2 Pin) bool { return false } - if pin1s.Recursive != pin2s.Recursive { + if pin1s.MaxDepth != pin2s.MaxDepth { + return false + } + + if pin1s.ShardSize != pin2s.ShardSize { return false } @@ -709,7 +778,7 @@ func (pin Pin) Equals(pin2 Pin) bool { return false } - if pin1s.Clusterdag != pin2s.Clusterdag { + if pin1s.ClusterDAG != pin2s.ClusterDAG { return false } @@ -730,48 +799,26 @@ func (pins PinSerial) ToPin() Pin { logger.Debug(pins.Cid, err) } var cdag *cid.Cid - if pins.Clusterdag != "" { - cdag, err = cid.Decode(pins.Clusterdag) + if pins.ClusterDAG != "" { + cdag, err = cid.Decode(pins.ClusterDAG) if err != nil { - logger.Error(pins.Clusterdag, err) + logger.Error(pins.ClusterDAG, err) } } return Pin{ - Cid: c, - Name: pins.Name, - Allocations: StringsToPeers(pins.Allocations), - Type: PinType(pins.Type), - ReplicationFactorMin: pins.ReplicationFactorMin, - ReplicationFactorMax: pins.ReplicationFactorMax, - Recursive: pins.Recursive, - Parents: StringsToCidSet(pins.Parents), - Clusterdag: cdag, - } -} - -// AddParams contains all of the configurable parameters needed to specify the -// importing process of a file being added to an ipfs-cluster -type AddParams struct { - Layout string - Chunker string - Raw bool - Hidden bool - Shard bool - Rmin int - Rmax int -} - -// DefaultAddParams returns the default AddParams value -func DefaultAddParams() AddParams { - return AddParams{ - Layout: "", // corresponds to balanced layout - Chunker: "", - Raw: false, - Hidden: false, - Shard: false, - Rmin: -1, - Rmax: -1, + Cid: c, + Allocations: 
StringsToPeers(pins.Allocations), + Type: PinType(pins.Type), + MaxDepth: pins.MaxDepth, + Parents: StringsToCidSet(pins.Parents), + ClusterDAG: cdag, + PinOptions: PinOptions{ + Name: pins.Name, + ReplicationFactorMin: pins.ReplicationFactorMin, + ReplicationFactorMax: pins.ReplicationFactorMax, + ShardSize: pins.ShardSize, + }, } } @@ -789,13 +836,12 @@ type AddedOutput struct { // NodeWithMeta specifies a block of data and a set of optional metadata fields // carrying information about the encoded ipld node type NodeWithMeta struct { - Data []byte - Cid string - Size uint64 - ID string - Format string - ReplMin int - ReplMax int + ShardingSession int + + Data []byte + Cid string + Size uint64 + Format string } // AllocateInfo transports the information necessary to call an allocator's diff --git a/cluster.go b/cluster.go index ae87373c..d63a964d 100644 --- a/cluster.go +++ b/cluster.go @@ -8,11 +8,12 @@ import ( "sync" "time" - add "github.com/ipfs/ipfs-cluster/add" + "github.com/ipfs/ipfs-cluster/adder" + "github.com/ipfs/ipfs-cluster/adder/local" + "github.com/ipfs/ipfs-cluster/adder/sharding" "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/pstoremgr" "github.com/ipfs/ipfs-cluster/rpcutil" - "github.com/ipfs/ipfs-cluster/sharder" "github.com/ipfs/ipfs-cluster/state" rpc "github.com/hsanjuan/go-libp2p-gorpc" @@ -50,7 +51,6 @@ type Cluster struct { monitor PeerMonitor allocator PinAllocator informer Informer - sharder Sharder shutdownLock sync.Mutex shutdownB bool @@ -80,7 +80,7 @@ func NewCluster( monitor PeerMonitor, allocator PinAllocator, informer Informer, - sharder Sharder) (*Cluster, error) { +) (*Cluster, error) { err := cfg.Validate() if err != nil { return nil, err @@ -118,7 +118,6 @@ func NewCluster( monitor: monitor, allocator: allocator, informer: informer, - sharder: sharder, peerManager: peerManager, shutdownB: false, removed: false, @@ -161,7 +160,6 @@ func (c *Cluster) setupRPCClients() { c.monitor.SetClient(c.rpcClient) c.allocator.SetClient(c.rpcClient) c.informer.SetClient(c.rpcClient) - c.sharder.SetClient(c.rpcClient) } // syncWatcher loops and triggers StateSync and SyncAllLocal from time to time @@ -920,104 +918,10 @@ func (c *Cluster) Pin(pin api.Pin) error { return err } -// validate pin ensures that the metadata accompanying the cid is -// self-consistent. This amounts to verifying that the data structure matches -// the expected form of the pinType carried in the pin. -func (c *Cluster) validatePin(pin api.Pin, rplMin, rplMax int) error { - // In general validation requires access to the existing state. 
- // Multiple clusterdags may reference the same shard, sharder sessions - // update a shard pin's metadata and the same cid should not be - // tracked by different pin types - cState, err := c.consensus.State() - if err != nil && err != p2praft.ErrNoState { - return err - } - cPinExists := err != p2praft.ErrNoState && cState.Has(pin.Cid) - if cPinExists { - existing := cState.Get(pin.Cid) - if existing.Type != pin.Type { - return errors.New("cannot repin CID with different tracking method, clear state with pin rm to proceed") - } - } - - switch pin.Type { - case api.DataType: - if pin.Clusterdag != nil || - (pin.Parents != nil && pin.Parents.Len() != 0) { - return errors.New("data pins should not reference other pins") - } - case api.ShardType: - if !pin.Recursive { - return errors.New("must pin shards recursively") - } - if pin.Clusterdag != nil { - return errors.New("shard pin should not reference cdag") - } - if !cPinExists { - return nil - } - - // State already tracks pin's CID - existing := cState.Get(pin.Cid) - // For now all repins of the same shard must use the same - // replmax and replmin. It is unclear what the best UX is here - // especially if the same Shard is referenced in multiple - // clusterdags. This simplistic policy avoids complexity and - // suits existing needs for shard pins. - if existing.ReplicationFactorMin != rplMin || - existing.ReplicationFactorMax != rplMax { - return errors.New("shard update with wrong repl factors") - } - case api.CdagType: - if pin.Recursive { - return errors.New("must pin roots directly") - } - if pin.Parents == nil || pin.Parents.Len() != 1 { - return errors.New("cdag nodes are referenced once") - } - case api.MetaType: - if len(pin.Allocations) != 0 { - return errors.New("meta pin should not specify allocations") - } - if pin.Clusterdag == nil { - return errors.New("roots must reference a dag") - } - - default: - return errors.New("unrecognized pin type") - } - return nil -} - -// updatePinParents modifies the api.Pin input to give it the correct parents -// so that previous additions to the pins parents are maintained after this -// pin is committed to consensus. If this pin carries new parents they are -// merged with those already existing for this CID -func (c *Cluster) updatePinParents(pin *api.Pin) error { - cState, err := c.consensus.State() - if err != nil && err != p2praft.ErrNoState { - return err - } - // first pin of this cid, nothing to update - if err == p2praft.ErrNoState || !cState.Has(pin.Cid) { - return nil - } - existing := cState.Get(pin.Cid) - // no existing parents this pin is up to date - if existing.Parents == nil || len(existing.Parents.Keys()) == 0 { - return nil - } - for _, c := range existing.Parents.Keys() { - pin.Parents.Add(c) - } - return nil -} - -// pin performs the actual pinning and supports a blacklist to be -// able to evacuate a node and returns whether the pin was submitted -// to the consensus layer or skipped (due to error or to the fact -// that it was already valid). -func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID) (bool, error) { +// setupPin ensures that the Pin object is fit for pinning. We check +// and set the replication factors and ensure that the pinType matches the +// metadata consistently. 
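As a concrete example of the per-type rules enforced below, this is roughly the ShardType pin that adder/sharding's shard.Flush produces and that setupPin accepts (helper name hypothetical):

```go
package ipfscluster

import (
	"github.com/ipfs/ipfs-cluster/api"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-peer"
)

// exampleShardPin builds a ShardType pin: MaxDepth must be 1 and it must not
// reference a ClusterDAG, while the shard's destination peer is recorded as
// a priority allocation.
func exampleShardPin(shardCid *cid.Cid, dest peer.ID, opts api.PinOptions) api.Pin {
	p := api.PinWithOpts(shardCid, opts)
	p.Type = api.ShardType
	p.MaxDepth = 1 // root and direct children only
	p.Allocations = []peer.ID{dest}
	return p
}
```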
+func (c *Cluster) setupPin(pin *api.Pin) error { // Determine repl factors rplMin := pin.ReplicationFactorMin rplMax := pin.ReplicationFactorMax @@ -1030,14 +934,88 @@ func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID) pin.ReplicationFactorMax = rplMax } - // Validate pin + if err := isReplicationFactorValid(rplMin, rplMax); err != nil { + return err + } + + // In general validation requires access to the existing state. + // Multiple clusterdags may reference the same shard, sharder sessions + // update a shard pin's metadata and the same cid should not be + // tracked by different pin types + cState, err := c.consensus.State() + if err != nil && err != p2praft.ErrNoState { + return err + } + cPinExists := err != p2praft.ErrNoState && cState.Has(pin.Cid) + existing := cState.Get(pin.Cid) + if cPinExists { + if existing.Type != pin.Type { + return errors.New("cannot repin CID with different tracking method, clear state with pin rm to proceed") + } + + updatePinParents(pin, existing) + } + + switch pin.Type { + case api.DataType: + if pin.ClusterDAG != nil || + (pin.Parents != nil && pin.Parents.Len() != 0) { + return errors.New("data pins should not reference other pins") + } + case api.ShardType: + if pin.MaxDepth != 1 { + return errors.New("must pin shards go depth 1") + } + if pin.ClusterDAG != nil { + return errors.New("shard pin should not reference cdag") + } + if !cPinExists { + return nil + } + + // State already tracks pin's CID + // For now all repins of the same shard must use the same + // replmax and replmin. It is unclear what the best UX is here + // especially if the same Shard is referenced in multiple + // clusterdags. This simplistic policy avoids complexity and + // suits existing needs for shard pins. + // Safest idea: use the largest min and max + if existing.ReplicationFactorMin != rplMin || + existing.ReplicationFactorMax != rplMax { + return errors.New("shard update with wrong repl factors") + } + case api.ClusterDAGType: + if pin.MaxDepth != 0 { + return errors.New("must pin roots directly") + } + if pin.Parents == nil || pin.Parents.Len() != 1 { + return errors.New("cdag nodes are referenced once") + } + case api.MetaType: + if pin.Allocations != nil && len(pin.Allocations) != 0 { + return errors.New("meta pin should not specify allocations") + } + if pin.ClusterDAG == nil { + return errors.New("roots must reference a dag") + } + + default: + return errors.New("unrecognized pin type") + } + return nil +} + +// pin performs the actual pinning and supports a blacklist to be +// able to evacuate a node and returns whether the pin was submitted +// to the consensus layer or skipped (due to error or to the fact +// that it was already valid). 
+func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID) (bool, error) { if pin.Cid == nil { return false, errors.New("bad pin object") } - if err := isReplicationFactorValid(rplMin, rplMax); err != nil { - return false, err - } - err := c.validatePin(pin, rplMin, rplMax) + + // setup pin might produce some side-effects to our pin + err := c.setupPin(&pin) if err != nil { return false, err } @@ -1045,22 +1023,16 @@ func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID) return true, c.consensus.LogPin(pin) } - // Ensure parents do not overwrite existing and merge non-intersecting - err = c.updatePinParents(&pin) + allocs, err := c.allocate( + pin.Cid, + pin.ReplicationFactorMin, + pin.ReplicationFactorMax, + blacklist, + prioritylist) if err != nil { return false, err } - - switch { - case rplMin == -1 && rplMax == -1: - pin.Allocations = []peer.ID{} - default: - allocs, err := c.allocate(pin.Cid, rplMin, rplMax, blacklist, prioritylist) - if err != nil { - return false, err - } - pin.Allocations = allocs - } + pin.Allocations = allocs if curr, _ := c.getCurrentPin(pin.Cid); curr.Equals(pin) { // skip pinning @@ -1108,7 +1080,7 @@ func (c *Cluster) Unpin(h *cid.Cid) error { return err } return c.consensus.LogUnpin(pin) - case api.CdagType: + case api.ClusterDAGType: err := "unpinning cluster dag root CID before unpinning parent" return errors.New(err) default: @@ -1121,29 +1093,29 @@ func (c *Cluster) Unpin(h *cid.Cid) error { // reference the same metadata node, only unpinning those nodes without // existing references func (c *Cluster) unpinClusterDag(metaPin api.Pin) error { - if metaPin.Clusterdag == nil { + if metaPin.ClusterDAG == nil { return errors.New("metaPin not linked to clusterdag") } - cdagBytes, err := c.ipfs.BlockGet(metaPin.Clusterdag) + cdagBytes, err := c.ipfs.BlockGet(metaPin.ClusterDAG) if err != nil { return err } - cdag, err := sharder.CborDataToNode(cdagBytes, "cbor") + cdag, err := sharding.CborDataToNode(cdagBytes, "cbor") if err != nil { return err } // traverse all shards of cdag for _, shardLink := range cdag.Links() { - err = c.unpinShard(metaPin.Clusterdag, shardLink.Cid) + err = c.unpinShard(metaPin.ClusterDAG, shardLink.Cid) if err != nil { return err } } // by invariant in Pin cdag has only one parent and can be unpinned - cdagWrap := api.PinCid(metaPin.Clusterdag) + cdagWrap := api.PinCid(metaPin.ClusterDAG) return c.consensus.LogUnpin(cdagWrap) } @@ -1178,9 +1150,14 @@ func (c *Cluster) unpinShard(cdagCid, shardCid *cid.Cid) error { // pipeline is used to DAGify the file. Depending on input parameters this // DAG can be added locally to the calling cluster peer's ipfs repo, or // sharded across the entire cluster. -func (c *Cluster) AddFile(reader *multipart.Reader, params api.AddParams) ([]api.AddedOutput, error) { - addSess := add.NewAddSession(c.rpcClient, logger) - return addSess.AddFile(c.ctx, reader, params) +func (c *Cluster) AddFile(reader *multipart.Reader, params *adder.Params) error { + var add adder.Adder + if params.Shard { + add = sharding.New(c.rpcClient) + } else { + add = local.New(c.rpcClient) + } + return add.FromMultipart(c.ctx, reader, params) } // Version returns the current IPFS Cluster version. 
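Editor's note: the AddFile rewrite above drops the old AddSession/importer pipeline and picks an adder.Adder based on params.Shard. Below is a minimal caller-side sketch of the new entry point. It assumes the import paths github.com/ipfs/ipfs-cluster and github.com/ipfs/ipfs-cluster/adder and that zero values are acceptable for the remaining adder.Params fields (only Shard appears in this change); the helper name and file contents are hypothetical.

package clusterexample

import (
	"bytes"
	"mime/multipart"

	ipfscluster "github.com/ipfs/ipfs-cluster"
	"github.com/ipfs/ipfs-cluster/adder"
)

// addOneFile builds a single-part multipart body, which is what the HTTP API
// normally hands to Cluster.AddFile, and performs a local (non-sharded) add.
func addOneFile(c *ipfscluster.Cluster) error {
	var body bytes.Buffer
	w := multipart.NewWriter(&body)

	part, err := w.CreateFormFile("file", "hello.txt")
	if err != nil {
		return err
	}
	if _, err := part.Write([]byte("hello from ipfs-cluster")); err != nil {
		return err
	}
	if err := w.Close(); err != nil {
		return err
	}

	reader := multipart.NewReader(&body, w.Boundary())

	// Shard=false selects the local adder; Shard=true would select the
	// sharding adder, mirroring the switch in Cluster.AddFile above.
	params := &adder.Params{Shard: false}
	return c.AddFile(reader, params)
}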
diff --git a/importer/import.go b/importer/import.go deleted file mode 100644 index 20ee1755..00000000 --- a/importer/import.go +++ /dev/null @@ -1,81 +0,0 @@ -package importer - -import ( - "context" - "io" - - "github.com/ipfs/ipfs-cluster/api" - - "github.com/ipfs/go-ipfs-cmdkit/files" -) - -func shouldIgnore(err error) bool { - if err == errNotFound { - return true - } - return false -} - -// ToChannel imports file to ipfs ipld nodes, outputting nodes on the -// provided channel -func ToChannel(ctx context.Context, f files.File, hidden bool, - trickle bool, raw bool, chunker string) (<-chan *api.AddedOutput, - <-chan *api.NodeWithMeta, <-chan error) { - - printChan := make(chan *api.AddedOutput) - errChan := make(chan error) - outChan := make(chan *api.NodeWithMeta) - - dserv := &outDAGService{ - membership: make(map[string]struct{}), - outChan: outChan, - } - - fileAdder, err := NewAdder(ctx, nil, dserv) - if err != nil { - go func() { - errChan <- err - }() - return printChan, outChan, errChan - } - fileAdder.Hidden = hidden - fileAdder.Trickle = trickle - fileAdder.RawLeaves = raw - // Files added in one session are wrapped. This is because if files - // are sharded together then the share one logical clusterDAG root hash - fileAdder.Wrap = true - fileAdder.Chunker = chunker - fileAdder.Out = printChan - - go func() { - defer close(printChan) - defer close(outChan) - defer close(errChan) - // add all files under the root, as in ipfs - for { - file, err := f.NextFile() - - if err == io.EOF { - break - } else if err != nil { - errChan <- err - return - } - select { - case <-ctx.Done(): - return - default: - } - if err := fileAdder.AddFile(file); err != nil { - errChan <- err - return - } - } - - _, err := fileAdder.Finalize() - if err != nil && !shouldIgnore(err) { - errChan <- err - } - }() - return printChan, outChan, errChan -} diff --git a/importer/outgoing-dagservice.go b/importer/outgoing-dagservice.go deleted file mode 100644 index 8cf134e7..00000000 --- a/importer/outgoing-dagservice.go +++ /dev/null @@ -1,85 +0,0 @@ -package importer - -import ( - "context" - "errors" - "fmt" - - "github.com/ipfs/ipfs-cluster/api" - - cid "github.com/ipfs/go-cid" - ipld "github.com/ipfs/go-ipld-format" -) - -var errUninit = errors.New("DAGService output channel uninitialized") - -// outDAGService will "add" a node by sending through the outChan -type outDAGService struct { - membership map[string]struct{} - outChan chan<- *api.NodeWithMeta -} - -// Get always returns errNotFound -func (ods *outDAGService) Get(ctx context.Context, key *cid.Cid) (ipld.Node, error) { - return nil, errNotFound -} - -// GetMany returns an output channel that always emits an error -func (ods *outDAGService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *ipld.NodeOption { - out := make(chan *ipld.NodeOption, 1) - out <- &ipld.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")} - close(out) - return out -} - -// Add passes the provided node through the output channel -func (ods *outDAGService) Add(ctx context.Context, node ipld.Node) error { - id := node.Cid().String() - _, ok := ods.membership[id] - if ok { // already added don't add again - return nil - } - ods.membership[id] = struct{}{} - - // Send node on output channel - if ods.outChan == nil { - return errUninit - } - size, err := node.Size() - if err != nil { - return err - } - nodeSerial := api.NodeWithMeta{ - Cid: id, - Data: node.RawData(), - Size: size, - } - select { - case ods.outChan <- &nodeSerial: - return nil - case <-ctx.Done(): - 
close(ods.outChan) - return errors.New("canceled context preempted dagservice add") - } -} - -// AddMany passes the provided nodes through the output channel -func (ods *outDAGService) AddMany(ctx context.Context, nodes []ipld.Node) error { - for _, node := range nodes { - err := ods.Add(ctx, node) - if err != nil { - return err - } - } - return nil -} - -// Remove is a nop -func (ods *outDAGService) Remove(ctx context.Context, key *cid.Cid) error { - return nil -} - -// RemoveMany is a nop -func (ods *outDAGService) RemoveMany(ctx context.Context, keys []*cid.Cid) error { - return nil -} diff --git a/importer/printing-dagservice.go b/importer/printing-dagservice.go deleted file mode 100644 index 086c55f4..00000000 --- a/importer/printing-dagservice.go +++ /dev/null @@ -1,63 +0,0 @@ -package importer - -import ( - "context" - "errors" - "fmt" - - cid "github.com/ipfs/go-cid" - ipld "github.com/ipfs/go-ipld-format" -) - -var errNotFound = errors.New("dagservice: block not found") - -// pDAGService will "add" a node by printing it. pDAGService cannot Get nodes -// that have already been seen and calls to Remove are noops. Nodes are -// recorded after being added so that they will only be printed once -type pDAGService struct { - membership map[string]bool -} - -func (pds *pDAGService) Get(ctx context.Context, key *cid.Cid) (ipld.Node, error) { - return nil, errNotFound -} - -func (pds *pDAGService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *ipld.NodeOption { - out := make(chan *ipld.NodeOption, len(keys)) - go func() { - out <- &ipld.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")} - return - }() - return out -} - -func (pds *pDAGService) Add(ctx context.Context, node ipld.Node) error { - id := node.Cid().String() - _, ok := pds.membership[id] - if ok { // already added don't add again - return nil - } - pds.membership[id] = true - fmt.Printf("%s\n", node.String()) - return nil -} - -func (pds *pDAGService) AddMany(ctx context.Context, nodes []ipld.Node) error { - for _, node := range nodes { - err := pds.Add(ctx, node) - if err != nil { - return err - } - } - return nil -} - -// Remove is a nop -func (pds *pDAGService) Remove(ctx context.Context, key *cid.Cid) error { - return nil -} - -// RemoveMany is a nop -func (pds *pDAGService) RemoveMany(ctx context.Context, keys []*cid.Cid) error { - return nil -} diff --git a/ipfs-cluster-ctl/formatters.go b/ipfs-cluster-ctl/formatters.go index 69e59a10..1feafaf9 100644 --- a/ipfs-cluster-ctl/formatters.go +++ b/ipfs-cluster-ctl/formatters.go @@ -182,11 +182,15 @@ func textFormatPrintPin(obj *api.PinSerial) { sortAlloc) } var recStr string - if obj.Recursive { + switch obj.MaxDepth { + case 0: + recStr = "Direct" + case -1: recStr = "Recursive" - } else { - recStr = "Non-recursive" + default: + recStr = fmt.Sprintf("Recursive-%d", obj.MaxDepth) } + fmt.Printf("| %s | ", recStr) pinType := obj.ToPin().Type @@ -196,8 +200,8 @@ func textFormatPrintPin(obj *api.PinSerial) { case api.DataType: infoStr = typeStr case api.MetaType: - infoStr = fmt.Sprintf("%s-- clusterdag=%s",typeStr, obj.Clusterdag) - case api.CdagType, api.ShardType: + infoStr = fmt.Sprintf("%s-- clusterdag=%s", typeStr, obj.ClusterDAG) + case api.ClusterDAGType, api.ShardType: infoStr = typeStr default: infoStr = "" @@ -209,10 +213,10 @@ func textFormatPrintAddedOutput(obj *api.AddedOutput) { if obj.Hash != "" { if obj.Quiet { fmt.Printf("%s\n", obj.Hash) - } else{ + } else { fmt.Printf("adding %s %s\n", obj.Hash, obj.Name) } - } + } } func textFormatPrintError(obj 
*api.Error) { diff --git a/ipfs-cluster-service/configs.go b/ipfs-cluster-service/configs.go index c453a053..add3ed8b 100644 --- a/ipfs-cluster-service/configs.go +++ b/ipfs-cluster-service/configs.go @@ -15,7 +15,6 @@ import ( "github.com/ipfs/ipfs-cluster/monitor/basic" "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/pintracker/maptracker" - "github.com/ipfs/ipfs-cluster/sharder" ) type cfgs struct { @@ -28,7 +27,6 @@ type cfgs struct { pubsubmonCfg *pubsubmon.Config diskInfCfg *disk.Config numpinInfCfg *numpin.Config - sharderCfg *sharder.Config } func makeConfigs() (*config.Manager, *cfgs) { @@ -42,7 +40,6 @@ func makeConfigs() (*config.Manager, *cfgs) { pubsubmonCfg := &pubsubmon.Config{} diskInfCfg := &disk.Config{} numpinInfCfg := &numpin.Config{} - sharderCfg := &sharder.Config{} cfg.RegisterComponent(config.Cluster, clusterCfg) cfg.RegisterComponent(config.API, apiCfg) cfg.RegisterComponent(config.IPFSConn, ipfshttpCfg) @@ -52,8 +49,17 @@ func makeConfigs() (*config.Manager, *cfgs) { cfg.RegisterComponent(config.Monitor, pubsubmonCfg) cfg.RegisterComponent(config.Informer, diskInfCfg) cfg.RegisterComponent(config.Informer, numpinInfCfg) - cfg.RegisterComponent(config.Sharder, sharderCfg) - return cfg, &cfgs{clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, pubsubmonCfg, diskInfCfg, numpinInfCfg, sharderCfg} + return cfg, &cfgs{ + clusterCfg, + apiCfg, + ipfshttpCfg, + consensusCfg, + trackerCfg, + monCfg, + pubsubmonCfg, + diskInfCfg, + numpinInfCfg, + } } func saveConfig(cfg *config.Manager, force bool) { diff --git a/ipfs-cluster-service/daemon.go b/ipfs-cluster-service/daemon.go index e1c7fad6..38044124 100644 --- a/ipfs-cluster-service/daemon.go +++ b/ipfs-cluster-service/daemon.go @@ -23,7 +23,6 @@ import ( "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" "github.com/ipfs/ipfs-cluster/pintracker/maptracker" "github.com/ipfs/ipfs-cluster/pstoremgr" - "github.com/ipfs/ipfs-cluster/sharder" "github.com/ipfs/ipfs-cluster/state/mapstate" ma "github.com/multiformats/go-multiaddr" @@ -132,9 +131,6 @@ func createCluster( ipfscluster.ReadyTimeout = cfgs.consensusCfg.WaitForLeaderTimeout + 5*time.Second - sharder, err := sharder.New(cfgs.sharderCfg) - checkErr("creating Sharder component", err) - return ipfscluster.NewCluster( host, cfgs.clusterCfg, @@ -146,7 +142,6 @@ func createCluster( mon, alloc, informer, - sharder, ) } diff --git a/ipfscluster.go b/ipfscluster.go index 49af7515..389ae4be 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -72,7 +72,7 @@ type API interface { type IPFSConnector interface { Component ID() (api.IPFSID, error) - Pin(context.Context, *cid.Cid, bool) error + Pin(context.Context, *cid.Cid, int) error Unpin(context.Context, *cid.Cid) error PinLsCid(context.Context, *cid.Cid) (api.IPFSPinStatus, error) PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error) @@ -179,12 +179,3 @@ type PeerMonitor interface { // to trigger self-healing measures or re-pinnings of content. Alerts() <-chan api.Alert } - -// Sharder controls the aggregation of ipfs file nodes into shards. 
File -// shards are grouped together and referenced by a cluster DAG node and -// distributed among the cluster -type Sharder interface { - Component - AddNode(size uint64, data []byte, c string, id string, min int, max int) (string, error) - Finalize(id string) error -} diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 0c5e15f1..d2de6180 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -298,7 +298,7 @@ func (ipfs *Connector) pinOpHandler(op string, w http.ResponseWriter, r *http.Re ipfsErrorResponder(w, "Error: bad argument") return } - _, err := cid.Decode(arg) + c, err := cid.Decode(arg) if err != nil { ipfsErrorResponder(w, "Error parsing CID: "+err.Error()) return @@ -308,9 +308,7 @@ func (ipfs *Connector) pinOpHandler(op string, w http.ResponseWriter, r *http.Re "", "Cluster", op, - api.PinSerial{ - Cid: arg, - }, + api.PinCid(c).ToSerial(), &struct{}{}, ) if err != nil { @@ -625,33 +623,45 @@ func (ipfs *Connector) ID() (api.IPFSID, error) { // Pin performs a pin request against the configured IPFS // daemon. -func (ipfs *Connector) Pin(ctx context.Context, hash *cid.Cid, recursive bool) error { +func (ipfs *Connector) Pin(ctx context.Context, hash *cid.Cid, maxDepth int) error { ctx, cancel := context.WithTimeout(ctx, ipfs.config.PinTimeout) defer cancel() pinStatus, err := ipfs.PinLsCid(ctx, hash) if err != nil { return err } - if !pinStatus.IsPinned() { - switch ipfs.config.PinMethod { - case "refs": - path := fmt.Sprintf("refs?arg=%s&recursive=%t", hash, recursive) - err := ipfs.postDiscardBodyCtx(ctx, path) - if err != nil { - return err - } - logger.Debugf("Refs for %s sucessfully fetched", hash) - } - path := fmt.Sprintf("pin/add?arg=%s&recursive=%t", hash, recursive) - _, err = ipfs.postCtx(ctx, path, "", nil) - if err == nil { - logger.Info("IPFS Pin request succeeded: ", hash) - } - return err + if pinStatus.IsPinned(maxDepth) { + logger.Debug("IPFS object is already pinned: ", hash) + return nil } - logger.Debug("IPFS object is already pinned: ", hash) - return nil + + var pinArgs string + switch { + case maxDepth < 0: + pinArgs = "recursive=true" + case maxDepth == 0: + pinArgs = "recursive=false" + default: + pinArgs = fmt.Sprintf("recursive=true&max-depth=%d", maxDepth) + } + + switch ipfs.config.PinMethod { + case "refs": // do refs -r first + path := fmt.Sprintf("refs?arg=%s&%s", hash, pinArgs) + err := ipfs.postDiscardBodyCtx(ctx, path) + if err != nil { + return err + } + logger.Debugf("Refs for %s sucessfully fetched", hash) + } + + path := fmt.Sprintf("pin/add?arg=%s&%s", hash, pinArgs) + _, err = ipfs.postCtx(ctx, path, "", nil) + if err == nil { + logger.Info("IPFS Pin request succeeded: ", hash) + } + return err } // Unpin performs an unpin request against the configured IPFS @@ -663,7 +673,7 @@ func (ipfs *Connector) Unpin(ctx context.Context, hash *cid.Cid) error { if err != nil { return err } - if pinStatus.IsPinned() { + if pinStatus.IsPinned(-1) { path := fmt.Sprintf("pin/rm?arg=%s", hash) _, err := ipfs.postCtx(ctx, path, "", nil) if err == nil { diff --git a/logging.go b/logging.go index 1f313b1e..4210abbc 100644 --- a/logging.go +++ b/logging.go @@ -18,7 +18,8 @@ var LoggingFacilities = map[string]string{ "diskinfo": "INFO", "apitypes": "INFO", "config": "INFO", - "sharder": "INFO", + "addshard": "INFO", + "addlocal": "INFO", } // LoggingFacilitiesExtra provides logging identifiers diff --git a/pintracker/maptracker/maptracker.go b/pintracker/maptracker/maptracker.go index 
baaa46e5..edde8d2c 100644 --- a/pintracker/maptracker/maptracker.go +++ b/pintracker/maptracker/maptracker.go @@ -181,6 +181,20 @@ func (mpt *MapPinTracker) enqueue(c api.Pin, typ optracker.OperationType, ch cha func (mpt *MapPinTracker) Track(c api.Pin) error { logger.Debugf("tracking %s", c.Cid) + // TODO: Fix this for sharding + // FIXME: Fix this for sharding + // The problem is remote/unpin operation won't be cancelled + // but I don't know how bad is that + // Also, this is dup code + + // Sharded pins are never pinned. A sharded pin cannot turn into + // something else or viceversa like it happens with Remote pins. + // Thus we just mark as sharded + if c.Type == api.MetaType { + mpt.optracker.TrackNewOperation(c, optracker.OperationSharded, optracker.PhaseDone) + return nil + } + // Trigger unpin whenever something remote is tracked // Note, IPFSConn checks with pin/ls before triggering // pin/rm. @@ -199,27 +213,6 @@ func (mpt *MapPinTracker) Track(c api.Pin) error { return nil } - // TODO: Fix this for sharding - // FIXME: Fix this for sharding - // The problem is remote/unpin operation won't be cancelled - // but I don't know how bad is that - // Also, this is dup code - if c.Type == api.ShardType { - // cancel any other ops - op := mpt.optracker.TrackNewOperation(c, optracker.OperationSharded, optracker.PhaseInProgress) - if op == nil { - return nil - } - err := mpt.unpin(op) - op.Cancel() - if err != nil { - op.SetError(err) - } else { - op.SetPhase(optracker.PhaseDone) - } - return nil - } - return mpt.enqueue(c, optracker.OperationPin, mpt.pinCh) } @@ -322,7 +315,8 @@ func (mpt *MapPinTracker) syncStatus(c *cid.Cid, ips api.IPFSPinStatus) api.PinI status = api.TrackerStatusUnpinned } - if ips.IsPinned() { + if ips.IsPinned(-1) { // FIXME FIXME FIXME: how much do we want to check + // that something is pinned as EXPECTED (with right max depth). switch status { case api.TrackerStatusPinError: // If an item that we wanted to pin is pinned, we mark it so diff --git a/pintracker/optracker/operation.go b/pintracker/optracker/operation.go index d3144a11..bef8159c 100644 --- a/pintracker/optracker/operation.go +++ b/pintracker/optracker/operation.go @@ -25,8 +25,8 @@ const ( OperationUnpin // OperationRemote represents an noop operation OperationRemote - // OperationSharded represents a pin which points to shard - // FIXME + // OperationSharded represents a meta pin. We don't + // pin these. OperationSharded ) @@ -192,6 +192,8 @@ func (op *Operation) ToTrackerStatus() api.TrackerStatus { } case OperationRemote: return api.TrackerStatusRemote + case OperationSharded: + return api.TrackerStatusSharded default: return api.TrackerStatusBug } diff --git a/rpc_api.go b/rpc_api.go index 2fd6b680..765aded3 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -192,6 +192,28 @@ func (rpcapi *RPCAPI) RecoverLocal(ctx context.Context, in api.PinSerial, out *a return err } +// Allocate returns allocations for the given Pin. 
+func (rpcapi *RPCAPI) Allocate(ctx context.Context, in api.PinSerial, out *[]string) error { + pin := in.ToPin() + err := rpcapi.c.setupPin(&pin) + if err != nil { + return err + } + allocs, err := rpcapi.c.allocate( + pin.Cid, + pin.ReplicationFactorMin, + pin.ReplicationFactorMax, + []peer.ID{}, // blacklist + []peer.ID{}, // prio list + ) + if err != nil { + return err + } + + *out = api.PeersToStrings(allocs) + return nil +} + /* Tracker component methods */ @@ -221,7 +243,7 @@ func (rpcapi *RPCAPI) TrackerStatus(ctx context.Context, in api.PinSerial, out * return nil } -// TrackerRecoverAll runs PinTracker.RecoverAll(). +// TrackerRecoverAll runs PinTracker.RecoverAll().f func (rpcapi *RPCAPI) TrackerRecoverAll(ctx context.Context, in struct{}, out *[]api.PinInfoSerial) error { pinfos, err := rpcapi.c.tracker.RecoverAll() *out = pinInfoSliceToSerial(pinfos) @@ -243,8 +265,8 @@ func (rpcapi *RPCAPI) TrackerRecover(ctx context.Context, in api.PinSerial, out // IPFSPin runs IPFSConnector.Pin(). func (rpcapi *RPCAPI) IPFSPin(ctx context.Context, in api.PinSerial, out *struct{}) error { c := in.ToPin().Cid - r := in.ToPin().Recursive - return rpcapi.c.ipfs.Pin(ctx, c, r) + depth := in.ToPin().MaxDepth + return rpcapi.c.ipfs.Pin(ctx, c, depth) } // IPFSUnpin runs IPFSConnector.Unpin(). @@ -348,29 +370,6 @@ func (rpcapi *RPCAPI) ConsensusPeers(ctx context.Context, in struct{}, out *[]pe return err } -/* - Sharder methods -*/ - -// SharderAddNode runs Sharder.AddNode(node). -func (rpcapi *RPCAPI) SharderAddNode(ctx context.Context, in api.NodeWithMeta, out *string) error { - shardID, err := rpcapi.c.sharder.AddNode( - in.Size, - in.Data, - in.Cid, - in.ID, - in.ReplMin, - in.ReplMax, - ) - *out = shardID - return err -} - -// SharderFinalize runs Sharder.Finalize(). -func (rpcapi *RPCAPI) SharderFinalize(ctx context.Context, in string, out *struct{}) error { - return rpcapi.c.sharder.Finalize(in) -} - /* Peer Manager methods */ diff --git a/sharder/config.go b/sharder/config.go deleted file mode 100644 index cb0540be..00000000 --- a/sharder/config.go +++ /dev/null @@ -1,73 +0,0 @@ -package sharder - -import ( - "encoding/json" - "errors" - - "github.com/ipfs/ipfs-cluster/config" -) - -const configKey = "sharder" - -// Default values for Config. -const ( - DefaultAllocSize = 5000000 // 5 MB, approx 20 standard ipfs chunks - IPFSChunkSize = 262158 -) - -// Config allows to initialize a Sharder and customize some parameters -type Config struct { - config.Saver - - // Bytes allocated in one sharding allocation round - AllocSize uint64 -} - -type jsonConfig struct { - AllocSize uint64 `json:"alloc_size"` -} - -// ConfigKey provides a human-friendly identifer for this type of Config. -func (cfg *Config) ConfigKey() string { - return configKey -} - -// Default sets the fields of this Config to sensible default values. -func (cfg *Config) Default() error { - cfg.AllocSize = DefaultAllocSize - return nil -} - -// Validate checks that the fields of this Config have working values -func (cfg *Config) Validate() error { - if cfg.AllocSize <= IPFSChunkSize { - return errors.New("sharder.alloc_size lower than single file chunk") - } - return nil -} - -// LoadJSON sets the fields of this Config to the values defined by the JSON -// representation. 
-func (cfg *Config) LoadJSON(raw []byte) error { - jcfg := &jsonConfig{} - err := json.Unmarshal(raw, jcfg) - if err != nil { - logger.Error("Error unmarshaling sharder config") - return err - } - - cfg.Default() - - config.SetIfNotDefault(jcfg.AllocSize, &cfg.AllocSize) - - return cfg.Validate() -} - -// ToJSON generates a human-friendly JSON representation of this Config. -func (cfg *Config) ToJSON() ([]byte, error) { - jcfg := &jsonConfig{} - - jcfg.AllocSize = cfg.AllocSize - - return config.DefaultJSONMarshal(jcfg) -} diff --git a/sharder/config_test.go b/sharder/config_test.go deleted file mode 100644 index 44146177..00000000 --- a/sharder/config_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package sharder - -import ( - "encoding/json" - "testing" -) - -var cfgJSON = []byte(` -{ - "alloc_size": 6000000 -} -`) - -func TestLoadJSON(t *testing.T) { - cfg := &Config{} - err := cfg.LoadJSON(cfgJSON) - if err != nil { - t.Fatal(err) - } - - j := &jsonConfig{} - json.Unmarshal(cfgJSON, j) - j.AllocSize = 0 - tst, _ := json.Marshal(j) - err = cfg.LoadJSON(tst) - if err != nil { - t.Error("did not expect an error") - } - if cfg.AllocSize != DefaultAllocSize { - t.Error("expected default alloc_size") - } -} - -func TestToJSON(t *testing.T) { - cfg := &Config{} - cfg.LoadJSON(cfgJSON) - newjson, err := cfg.ToJSON() - if err != nil { - t.Fatal(err) - } - cfg = &Config{} - err = cfg.LoadJSON(newjson) - if err != nil { - t.Fatal(err) - } -} - -func TestDefault(t *testing.T) { - cfg := &Config{} - cfg.Default() - if cfg.Validate() != nil { - t.Fatal("error validating") - } - - cfg.AllocSize = 100 - if cfg.Validate() == nil { - t.Fatal("expecting error validating") - } -} diff --git a/sharder/sharder.go b/sharder/sharder.go deleted file mode 100644 index da2064e4..00000000 --- a/sharder/sharder.go +++ /dev/null @@ -1,394 +0,0 @@ -package sharder - -import ( - "errors" - "fmt" - - "github.com/ipfs/ipfs-cluster/api" - - rpc "github.com/hsanjuan/go-libp2p-gorpc" - cid "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" - peer "github.com/libp2p/go-libp2p-peer" - - uuid "github.com/satori/go.uuid" -) - -var logger = logging.Logger("sharder") - -type sessionState struct { - // State of current shard - assignedPeer peer.ID - currentShard shardObj - byteCount uint64 - byteThreshold uint64 - - // Global session state - shardNodes shardObj - replMin int - replMax int - dataRoot *cid.Cid -} - -// Sharder aggregates incident ipfs file dag nodes into a shard, or group of -// nodes. The Sharder builds a reference node in the ipfs-cluster DAG to -// reference the nodes in a shard. This component distributes shards among -// cluster peers based on the decisions of the cluster allocator -type Sharder struct { - rpcClient *rpc.Client - idToSession map[string]*sessionState - allocSize uint64 - currentID string -} - -// currentShard: make(map[string]*cid.Cid), - -// New returns a new sharder for use by an ipfs-cluster. 
In the future -// this may take in a shard-config -func New(cfg *Config) (*Sharder, error) { - logger.Debugf("The alloc size provided: %d", cfg.AllocSize) - return &Sharder{ - allocSize: cfg.AllocSize, - idToSession: make(map[string]*sessionState), - }, nil -} - -// SetClient registers the rpcClient used by the Sharder to communicate with -// other components -func (s *Sharder) SetClient(c *rpc.Client) { - s.rpcClient = c -} - -// Shutdown shuts down the sharding component -func (s *Sharder) Shutdown() error { - return nil -} - -// Temporary storage of links to be serialized to ipld cbor once allocation is -// complete -type shardObj map[string]*cid.Cid - -// getAssignment returns the pid of a cluster peer that will allocate space -// for a new shard, along with a byte threshold determining the max size of -// this shard. -func (s *Sharder) getAssignment() (peer.ID, uint64, error) { - // RPC to get metrics - var metrics []api.Metric - err := s.rpcClient.Call("", - "Cluster", - "GetInformerMetrics", - struct{}{}, - &metrics) - if err != nil { - return peer.ID(""), 0, err - } - candidates := make(map[peer.ID]api.Metric) - for _, m := range metrics { - switch { - case m.Discard(): - // discard invalid metrics - continue - default: - if m.Name != "freespace" { - logger.Warningf("Metric type not freespace but %s", m.Name) - } - candidates[m.Peer] = m - } - } - - // RPC to get Allocations based on this - allocInfo := api.AllocateInfo{ - Cid: "", - Current: nil, - Candidates: candidates, - Priority: nil, - } - - allocs := make([]peer.ID, 0) - err = s.rpcClient.Call("", - "Cluster", - "Allocate", - allocInfo, - &allocs) - if err != nil { - return peer.ID(""), 0, err - } - if len(allocs) == 0 { - return peer.ID(""), 0, errors.New("allocation failed") - } - - return allocs[0], s.allocSize, nil -} - -// initShard gets a new allocation and updates sharder state accordingly -func (s *Sharder) initShard() error { - var err error - session, ok := s.idToSession[s.currentID] - if !ok { - return errors.New("session ID not set on entry call") - } - session.assignedPeer, session.byteThreshold, err = s.getAssignment() - if err != nil { - return err - } - session.byteCount = 0 - session.currentShard = make(map[string]*cid.Cid) - return nil -} - -// AddNode includes the provided node into a shard in the cluster DAG -// that tracks this node's graph -func (s *Sharder) AddNode( - size uint64, - data []byte, - cidserial string, - id string, - replMin, replMax int) (string, error) { - if id == "" { - u, err := uuid.NewV4() - if err != nil { - return id, err - } - id = u.String() - } - s.currentID = id - c, err := cid.Decode(cidserial) - if err != nil { - return id, err - } - blockSize := uint64(len(data)) - - logger.Debug("adding node to shard") - // Sharding session for this file is uninit - session, ok := s.idToSession[id] - if !ok { - logger.Debugf("Initializing sharding session for id: %s", id) - s.idToSession[id] = &sessionState{ - shardNodes: make(map[string]*cid.Cid), - replMin: replMin, - replMax: replMax, - } - logger.Debug("Initializing first shard") - if err := s.initShard(); err != nil { - logger.Debug("Error initializing shard") - delete(s.idToSession, id) // never map to uninit session - return id, err - } - session = s.idToSession[id] - } else { // Data exceeds shard threshold, flush and start a new shard - if session.byteCount+blockSize > session.byteThreshold { - logger.Debug("shard at capacity, pin cluster DAG node") - if err := s.flush(); err != nil { - return id, err - } - if err := 
s.initShard(); err != nil { - return id, err - } - } - } - - // Shard is initialized and can accommodate node by config-enforced - // invariant that shard size is always greater than the ipfs block - // max chunk size - logger.Debugf("Adding size: %d to byteCount at: %d", size, session.byteCount) - session.byteCount += size - - key := fmt.Sprintf("%d", len(session.currentShard)) - session.currentShard[key] = c - format, ok := cid.CodecToStr[c.Type()] - if !ok { - format = "" - logger.Warning("unsupported cid type, treating as v0") - } - if c.Prefix().Version == 0 { - format = "v0" - } - b := api.NodeWithMeta{ - Data: data, - Format: format, - } - return id, s.rpcClient.Call( - session.assignedPeer, - "Cluster", - "IPFSBlockPut", - b, - &struct{}{}, - ) -} - -// Finalize completes a sharding session. It flushes the final shard to the -// cluster, stores the root of the cluster DAG referencing all shard node roots -// and frees session state -func (s *Sharder) Finalize(id string) error { - s.currentID = id - session, ok := s.idToSession[id] - if !ok { - return errors.New("cannot finalize untracked id") - } - // call flush - if len(session.currentShard) > 0 { - if err := s.flush(); err != nil { - return err - } - } - if session.dataRoot == nil { - return errors.New("finalize called before adding any data") - } - - // construct cluster DAG root - shardRootNodes, err := makeDAG(session.shardNodes) - if err != nil { - return err - } - - for _, shardRoot := range shardRootNodes { - // block put the cluster DAG root nodes in local node - b := api.NodeWithMeta{ - Data: shardRoot.RawData(), - Format: "cbor", - } - logger.Debugf("The serialized shard root cid: %s", shardRoot.Cid().String()) - err = s.rpcClient.Call("", "Cluster", "IPFSBlockPut", b, &struct{}{}) - if err != nil { - return err - } - } - - // Link dataDAG hash to clusterDAG root hash - cdagCid := shardRootNodes[0].Cid() - metaPinS := api.Pin{ - Cid: session.dataRoot, - ReplicationFactorMin: session.replMin, - ReplicationFactorMax: session.replMax, - Type: api.MetaType, - Clusterdag: cdagCid, - }.ToSerial() - err = s.rpcClient.Call( - "", - "Cluster", - "Pin", - metaPinS, - &struct{}{}, - ) - if err != nil { - return err - } - - // Pin root node of rootDAG everywhere non recursively - cdagParents := cid.NewSet() - cdagParents.Add(session.dataRoot) - cdagPinS := api.Pin{ - Cid: cdagCid, - ReplicationFactorMin: -1, - ReplicationFactorMax: -1, - Type: api.CdagType, - Recursive: false, - Parents: cdagParents, - }.ToSerial() - err = s.rpcClient.Call( - "", - "Cluster", - "Pin", - cdagPinS, - &struct{}{}, - ) - if err != nil { - return err - } - - // Ammend ShardPins to reference clusterDAG root hash as a Parent - shardParents := cid.NewSet() - shardParents.Add(cdagCid) - for _, c := range session.shardNodes { - pinS := api.Pin{ - Cid: c, - ReplicationFactorMin: session.replMin, - ReplicationFactorMax: session.replMax, - Type: api.ShardType, - Recursive: true, - Parents: shardParents, - }.ToSerial() - - err = s.rpcClient.Call( - "", - "Cluster", - "Pin", - pinS, - &struct{}{}, - ) - if err != nil { - return err - } - } - - // clear session state from sharder component - delete(s.idToSession, s.currentID) - return nil -} - -// flush completes the allocation of the current shard of a session by adding the -// clusterDAG shard node block to IPFS. 
It must only be called by an entrypoint -// that has already set the currentID -func (s *Sharder) flush() error { - // Serialize shard node and reset state - session, ok := s.idToSession[s.currentID] - if !ok { - return errors.New("session ID not set on entry call") - } - logger.Debugf("flushing the current shard %v", session.currentShard) - shardNodes, err := makeDAG(session.currentShard) - if err != nil { - return err - } - // Track latest data hash - key := fmt.Sprintf("%d", len(session.currentShard)-1) - session.dataRoot = session.currentShard[key] - - targetPeer := session.assignedPeer - session.currentShard = make(map[string]*cid.Cid) - session.assignedPeer = peer.ID("") - session.byteThreshold = 0 - session.byteCount = 0 - - for _, shardNode := range shardNodes { - logger.Debugf("The dag cbor Node Links: %v", shardNode.Links()) - b := api.NodeWithMeta{ - Data: shardNode.RawData(), - Format: "cbor", - } - logger.Debugf("Here is the serialized ipld: %x", b.Data) - err = s.rpcClient.Call( - targetPeer, - "Cluster", - "IPFSBlockPut", - b, - &struct{}{}, - ) - if err != nil { - return err - } - } - - // Track shardNodeDAG root within clusterDAG - key = fmt.Sprintf("%d", len(session.shardNodes)) - c := shardNodes[0].Cid() - session.shardNodes[key] = c - - // Track shard node as a shard pin within cluster state - pinS := api.Pin{ - Cid: c, - Allocations: []peer.ID{targetPeer}, - ReplicationFactorMin: session.replMin, - ReplicationFactorMax: session.replMax, - Type: api.ShardType, - Recursive: true, - }.ToSerial() - - return s.rpcClient.Call( - targetPeer, - "Cluster", - "Pin", - pinS, - &struct{}{}, - ) -} diff --git a/sharder/sharder_test.go b/sharder/sharder_test.go deleted file mode 100644 index 239f5cbf..00000000 --- a/sharder/sharder_test.go +++ /dev/null @@ -1,481 +0,0 @@ -package sharder - -import ( - "bytes" - "context" - "encoding/binary" - "testing" - - rpc "github.com/hsanjuan/go-libp2p-gorpc" - blocks "github.com/ipfs/go-block-format" - cid "github.com/ipfs/go-cid" - dag "github.com/ipfs/go-ipfs/merkledag" - ipld "github.com/ipfs/go-ipld-format" - "github.com/ipfs/ipfs-cluster/api" - crypto "github.com/libp2p/go-libp2p-crypto" - host "github.com/libp2p/go-libp2p-host" - peer "github.com/libp2p/go-libp2p-peer" - peerstore "github.com/libp2p/go-libp2p-peerstore" - swarm "github.com/libp2p/go-libp2p-swarm" - basichost "github.com/libp2p/go-libp2p/p2p/host/basic" - ma "github.com/multiformats/go-multiaddr" -) - -var nodeDataSet1 = [][]byte{[]byte(`Dag Node 1`), []byte(`Dag Node 2`), []byte(`Dag Node 3`)} -var nodeDataSet2 = [][]byte{[]byte(`Dag Node A`), []byte(`Dag Node B`), []byte(`Dag Node C`)} - -// mockRPC simulates the sharder's connection with the rest of the cluster. -// It keeps track of an ordered list of ipfs block puts for use by tests -// that verify a sequence of dag-nodes is correctly added with the right -// metadata. -type mockRPC struct { - orderedPuts map[int]api.NodeWithMeta - Host host.Host -} - -// NewMockRPCClient creates a mock ipfs-cluster RPC server and returns a client -// to it. A testing host is created so that the server can be called by the -// pid returned in Allocate -func NewMockRPCClient(t *testing.T, mock *mockRPC) *rpc.Client { - h := makeTestingHost() - return NewMockRPCClientWithHost(t, h, mock) -} - -// NewMockRPCClientWithHost returns a mock ipfs-cluster RPC client initialized -// with a given host. 
-func NewMockRPCClientWithHost(t *testing.T, h host.Host, mock *mockRPC) *rpc.Client { - s := rpc.NewServer(h, "sharder-mock") - c := rpc.NewClientWithServer(h, "sharder-mock", s) - mock.Host = h - err := s.RegisterName("Cluster", mock) - if err != nil { - t.Fatal(err) - } - return c -} - -func makeTestingHost() host.Host { - priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048) - pid, _ := peer.IDFromPublicKey(pub) - maddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") - ps := peerstore.NewPeerstore() - ps.AddPubKey(pid, pub) - ps.AddPrivKey(pid, priv) - ps.AddAddr(pid, maddr, peerstore.PermanentAddrTTL) - mockNetwork, _ := swarm.NewNetwork(context.Background(), - []ma.Multiaddr{maddr}, - pid, - ps, - nil, - ) - - return basichost.New(mockNetwork) -} - -// GetInformerMetrics does nothing as mock allocator does not check metrics -func (mock *mockRPC) GetInformerMetrics(ctx context.Context, in struct{}, out *[]api.Metric) error { - return nil -} - -// All pins get allocated to the mockRPC's server host -func (mock *mockRPC) Allocate(ctx context.Context, in api.AllocateInfo, out *[]peer.ID) error { - *out = []peer.ID{mock.Host.ID()} - return nil -} - -// Record the ordered sequence of BlockPut calls for later validation -func (mock *mockRPC) IPFSBlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error { - mock.orderedPuts[len(mock.orderedPuts)] = in - return nil -} - -// Tests don't currently check Pin calls. For now this is a NOP. -// TODO: once the sharder Pinning is stabalized (support for pinning to -// specific peers and non-recursive pinning through RPC) we should validate -// pinning calls alongside block put calls -func (mock *mockRPC) Pin(ctx context.Context, in api.PinSerial, out *struct{}) error { - return nil -} - -// Check that allocations go to the correct server. This will require setting -// up more than one server and doing some libp2p trickery. 
TODO after single -// host tests are complete - -// Create a new sharder and register a mock RPC for testing -func testNewSharder(t *testing.T) (*Sharder, *mockRPC) { - mockRPC := &mockRPC{} - mockRPC.orderedPuts = make(map[int]api.NodeWithMeta) - client := NewMockRPCClient(t, mockRPC) - cfg := &Config{ - AllocSize: DefaultAllocSize, - } - sharder, err := NewSharder(cfg) - if err != nil { - t.Fatal(err) - } - sharder.SetClient(client) - return sharder, mockRPC -} - -// Simply test that 3 input nodes are added and that the shard node -// and clusterDAG root take the correct form -func TestAddAndFinalizeShard(t *testing.T) { - sharder, mockRPC := testNewSharder(t) - // Create 3 ipld protobuf nodes and add to sharding session - nodes := make([]*dag.ProtoNode, 3) - cids := make([]string, 3) - sessionID := "" - for i, data := range nodeDataSet1 { - nodes[i] = dag.NodeWithData(data) - nodes[i].SetPrefix(nil) - cids[i] = nodes[i].Cid().String() - logger.Debugf("Cid of node%d: %s", i, cids[i]) - size, err := nodes[i].Size() - if err != nil { - t.Fatal(err) - } - sessionID, err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID, 1, 1) - if err != nil { - t.Fatal(err) - } - } - err := sharder.Finalize(sessionID) - if err != nil { - t.Fatal(err) - } - - if len(mockRPC.orderedPuts) != len(nodes)+2 { //data nodes, 1 shard node 1 shard root - t.Errorf("unexpected number of block puts called: %d", len(mockRPC.orderedPuts)) - } - // Verify correct node data sent to ipfs - verifyNodePuts(t, nodeDataSet1, cids, mockRPC.orderedPuts, []int{0, 1, 2}) - - shardNode := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)]) - // Traverse shard node to verify all expected links are there - links := shardNode.Links() - for _, c := range cids { - if !linksContain(links, c) { - t.Errorf("expected cid %s not in shard node", c) - } - } - - rootNode := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)+1]) - // Verify that clusterDAG root points to shard node - links = rootNode.Links() - if len(links) != 1 { - t.Fatalf("Expected 1 link in root got %d", len(links)) - } - if links[0].Cid.String() != shardNode.Cid().String() { - t.Errorf("clusterDAG expected to link to %s, instead links to %s", - shardNode.Cid().String(), links[0].Cid.String()) - } -} - -// helper function determining whether a cid is referenced in a slice of links -func linksContain(links []*ipld.Link, c string) bool { - for _, link := range links { - if link.Cid.String() == c { - return true - } - } - return false -} - -// verifyNodePuts takes in a slice of byte slices containing the underlying data -// of added nodes, an ordered slice of the cids of these nodes, a map between -// IPFSBlockPut call order and arguments, and a slice determining which -// IPFSBlockPut calls to verify. 
-func verifyNodePuts(t *testing.T, - dataSet [][]byte, - cids []string, - orderedPuts map[int]api.NodeWithMeta, - toVerify []int) { - if len(cids) != len(toVerify) || len(dataSet) != len(toVerify) { - t.Error("Malformed verifyNodePuts arguments") - return - } - for j, i := range toVerify { - if orderedPuts[i].Format != "v0" { - t.Errorf("Expecting blocks in v0 format, instead: %s", orderedPuts[i].Format) - continue - } - data := orderedPuts[i].Data - c, err := cid.Decode(cids[j]) - if err != nil { - t.Error(err) - continue - } - blk, err := blocks.NewBlockWithCid(data, c) - if err != nil { - t.Error(err) - continue - } - dataNode, err := ipld.Decode(blk) - if err != nil { - t.Error(err) - continue - } - if bytes.Equal(dataNode.RawData(), dataSet[j]) { - t.Error(err) - } - } -} - -func cborDataToNode(t *testing.T, putInfo api.NodeWithMeta) ipld.Node { - shardNode, err := CborDataToNode(putInfo.Data, putInfo.Format) - if err != nil { - t.Fatal(err) - } - return shardNode -} - -// Interleave two shard sessions to test isolation between concurrent file -// shards. -func TestInterleaveSessions(t *testing.T) { - // Make sharder and add data - sharder, mockRPC := testNewSharder(t) - nodes1 := make([]*dag.ProtoNode, 3) - cids1 := make([]string, 3) - nodes2 := make([]*dag.ProtoNode, 3) - cids2 := make([]string, 3) - - sessionID1 := "" - sessionID2 := "" - for i := 0; i < 6; i++ { - j := i / 2 - if i%2 == 0 { // session 1 - nodes1[j] = dag.NodeWithData(nodeDataSet1[j]) - nodes1[j].SetPrefix(nil) - cids1[j] = nodes1[j].Cid().String() - size, err := nodes1[j].Size() - if err != nil { - t.Fatal(err) - } - sessionID1, err = sharder.AddNode(size, nodes1[j].RawData(), cids1[j], sessionID1, 1, 1) - if err != nil { - t.Fatal(err) - } - } else { // session 2 - nodes2[j] = dag.NodeWithData(nodeDataSet2[j]) - nodes2[j].SetPrefix(nil) - cids2[j] = nodes2[j].Cid().String() - size, err := nodes2[j].Size() - if err != nil { - t.Fatal(err) - } - sessionID2, err = sharder.AddNode(size, nodes2[j].RawData(), cids2[j], sessionID2, 1, 1) - if err != nil { - t.Fatal(err) - } - } - } - err := sharder.Finalize(sessionID1) - if err != nil { - t.Fatal(err) - } - err = sharder.Finalize(sessionID2) - if err != nil { - t.Fatal(err) - } - - if len(mockRPC.orderedPuts) != len(nodes1)+len(nodes2)+4 { - t.Errorf("Unexpected number of block puts called: %d", len(mockRPC.orderedPuts)) - } - verifyNodePuts(t, nodeDataSet1, cids1, mockRPC.orderedPuts, []int{0, 2, 4}) - verifyNodePuts(t, nodeDataSet2, cids2, mockRPC.orderedPuts, []int{1, 3, 5}) - - // verify clusterDAG for session 1 - shardNode1 := cborDataToNode(t, mockRPC.orderedPuts[6]) - links1 := shardNode1.Links() - for _, c := range cids1 { - if !linksContain(links1, c) { - t.Errorf("expected cid %s not in links of shard node of session 1", c) - } - } - - rootNode1 := cborDataToNode(t, mockRPC.orderedPuts[7]) - links := rootNode1.Links() - if len(links) != 1 { - t.Errorf("Expected 1 link in root got %d", len(links)) - } - if links[0].Cid.String() != shardNode1.Cid().String() { - t.Errorf("clusterDAG expected to link to %s, instead links to %s", - shardNode1.Cid().String(), links[0].Cid.String()) - } - - // verify clusterDAG for session 2 - shardNode2 := cborDataToNode(t, mockRPC.orderedPuts[8]) - // Traverse shard node to verify all expected links are there - links2 := shardNode2.Links() - for _, c := range cids2 { - if !linksContain(links2, c) { - t.Errorf("expected cid %s not in links of shard node of session 2", c) - } - } - - rootNode2 := cborDataToNode(t, 
mockRPC.orderedPuts[9]) - links = rootNode2.Links() - if len(links) != 1 { - t.Errorf("Expected 1 link in root got %d", len(links)) - } - if links[0].Cid.String() != shardNode2.Cid().String() { - t.Errorf("clusterDAG expected to link to %s, instead links to %s", - shardNode2.Cid().String(), links[0].Cid.String()) - } -} - -func getManyLinksDataSet(t *testing.T) [][]byte { - numberData := 2*MaxLinks + MaxLinks/2 - dataSet := make([][]byte, numberData) - for i := range dataSet { - buf := new(bytes.Buffer) - err := binary.Write(buf, binary.LittleEndian, uint16(i)) - if err != nil { - t.Fatal(err) - } - dataSet[i] = buf.Bytes() - } - return dataSet -} - -// Test many tiny dag nodes so that a shard node is too big to fit all links -// and itself must be broken down into a tree of shard nodes. -func TestManyLinks(t *testing.T) { - sharder, mockRPC := testNewSharder(t) - dataSet := getManyLinksDataSet(t) - nodes := make([]*dag.ProtoNode, len(dataSet)) - cids := make([]string, len(dataSet)) - sessionID := "" - - for i, data := range dataSet { - nodes[i] = dag.NodeWithData(data) - nodes[i].SetPrefix(nil) - cids[i] = nodes[i].Cid().String() - size, err := nodes[i].Size() - if err != nil { - t.Fatal(err) - } - sessionID, err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID, 1, 1) - if err != nil { - t.Fatal(err) - } - - } - err := sharder.Finalize(sessionID) - if err != nil { - t.Fatal(err) - } - index := make([]int, len(nodes)) - for i := range index { - index[i] = i - } - // data nodes, 3 shard dag leaves, 1 shard dag root, 1 root dag node - if len(mockRPC.orderedPuts) != len(nodes)+5 { - t.Errorf("unexpected number of block puts called: %d", len(mockRPC.orderedPuts)) - } - verifyNodePuts(t, dataSet, cids, mockRPC.orderedPuts, index) - - // all shardNode leaves but the last should be filled to capacity - shardNodeLeaf1 := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)+1]) - leafLinks1 := shardNodeLeaf1.Links() - shardNodeLeaf2 := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)+2]) - leafLinks2 := shardNodeLeaf2.Links() - shardNodeLeaf3 := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)+3]) - leafLinks3 := shardNodeLeaf3.Links() - - if len(leafLinks1) != MaxLinks { - t.Errorf("First leaf should have max links, not %d", - len(leafLinks1)) - } - if len(leafLinks2) != MaxLinks { - t.Errorf("Second leaf should have max links, not %d", - len(leafLinks2)) - } - if len(leafLinks3) != MaxLinks/2 { - t.Errorf("Third leaf should have half max links, not %d", - len(leafLinks3)) - } - - // shardNodeRoot must point to shardNode leaves - shardNodeIndirect := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)]) - links := shardNodeIndirect.Links() - if len(links) != 3 { - t.Fatalf("Expected 3 links in indirect got %d", len(links)) - } - if !linksContain(links, shardNodeLeaf1.Cid().String()) || - !linksContain(links, shardNodeLeaf2.Cid().String()) || - !linksContain(links, shardNodeLeaf3.Cid().String()) { - t.Errorf("Unexpected shard leaf nodes in shard root node") - } - - // clusterDAG root should only point to shardNode root - clusterRoot := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)+4]) - links = clusterRoot.Links() - if len(links) != 1 { - t.Fatalf("Expected 1 link in root got %d", len(links)) - } - if links[0].Cid.String() != shardNodeIndirect.Cid().String() { - t.Errorf("clusterDAG expected to link to %s, instead links to %s", - shardNodeIndirect.Cid().String(), - links[0].Cid.String()) - } -} - -// Test that by adding in enough nodes multiple shard nodes will be created -func 
TestMultipleShards(t *testing.T) { - sharder, mockRPC := testNewSharder(t) - sharder.allocSize = IPFSChunkSize + IPFSChunkSize/2 - sharder.allocSize = uint64(300000) - nodes := make([]*dag.ProtoNode, 4) - cids := make([]string, 4) - sessionID := "" - - for i := range nodes { - data := repeatData(90000, i) - nodes[i] = dag.NodeWithData(data) - nodes[i].SetPrefix(nil) - cids[i] = nodes[i].Cid().String() - size, err := nodes[i].Size() - if err != nil { - t.Fatal(err) - } - sessionID, err = sharder.AddNode(size, nodes[i].RawData(), cids[i], sessionID, 1, 1) - if err != nil { - t.Fatal(err) - } - } - err := sharder.Finalize(sessionID) - if err != nil { - t.Fatal(err) - } - - if len(mockRPC.orderedPuts) != len(nodes)+3 { //data nodes, 2 shard nodes, 1 root - t.Errorf("unexpected number of block puts called: %d", len(mockRPC.orderedPuts)) - } - - // First shard node contains first 3 cids - shardNode1 := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)-1]) - links := shardNode1.Links() - for _, c := range cids[:len(cids)-1] { - if !linksContain(links, c) { - t.Errorf("expected cid %s not in shard node", c) - } - } - - // Second shard node only points to final cid - shardNode2 := cborDataToNode(t, mockRPC.orderedPuts[len(nodes)+1]) - links = shardNode2.Links() - if len(links) != 1 || links[0].Cid.String() != cids[len(nodes)-1] { - t.Errorf("unexpected links in second shard node") - } -} - -// repeatData takes in a byte value and a number of times this value should be -// repeated and returns a byte slice of this value repeated -func repeatData(byteCount int, digit int) []byte { - data := make([]byte, byteCount) - for i := range data { - data[i] = byte(digit) - } - return data -} diff --git a/state/mapstate/migrate.go b/state/mapstate/migrate.go index 875c72fb..05780ebb 100644 --- a/state/mapstate/migrate.go +++ b/state/mapstate/migrate.go @@ -110,15 +110,16 @@ func (st *mapStateV3) next() migrateable { mst4.PinMap = make(map[string]api.PinSerial) for k, v := range st.PinMap { mst4.PinMap[k] = api.PinSerial{ - Cid: v.Cid, - Name: v.Name, - Allocations: v.Allocations, - ReplicationFactorMin: v.ReplicationFactorMin, - ReplicationFactorMax: v.ReplicationFactorMax, - Recursive: true, - Type: int(api.DataType), - Parents: nil, - Clusterdag: "", + Cid: v.Cid, + Allocations: v.Allocations, + Type: int(api.DataType), + Parents: nil, + ClusterDAG: "", + PinOptions: api.PinOptions{ + ReplicationFactorMin: v.ReplicationFactorMin, + ReplicationFactorMax: v.ReplicationFactorMax, + Name: v.Name, + }, } } return &mst4 diff --git a/util.go b/util.go index 3208531a..6e76611e 100644 --- a/util.go +++ b/util.go @@ -110,3 +110,17 @@ func minInt(x, y int) int { } return y } + +// updatePinParents modifies the api.Pin input to give it the correct parents +// so that previous additions to the pins parents are maintained after this +// pin is committed to consensus. If this pin carries new parents they are +// merged with those already existing for this CID. +func updatePinParents(pin *api.Pin, existing api.Pin) { + // no existing parents this pin is up to date + if existing.Parents == nil || len(existing.Parents.Keys()) == 0 { + return + } + for _, c := range existing.Parents.Keys() { + pin.Parents.Add(c) + } +}
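Editor's note: util.go now carries the parent-merging helper that setupPin applies when a CID is re-pinned. The following is a hedged, test-style sketch of that merge behaviour, written as if it lived next to util.go in the root package; the test itself is not part of this changeset and the CID strings are arbitrary placeholders assumed to decode.

package ipfscluster

import (
	"testing"

	cid "github.com/ipfs/go-cid"
	"github.com/ipfs/ipfs-cluster/api"
)

// mustCid decodes a CID string or fails the test.
func mustCid(t *testing.T, s string) *cid.Cid {
	c, err := cid.Decode(s)
	if err != nil {
		t.Fatal(err)
	}
	return c
}

// TestUpdatePinParentsMerge checks that re-pinning a CID keeps the parents
// recorded by an earlier pin of the same CID, so links from previously
// committed cluster DAGs are not lost.
func TestUpdatePinParentsMerge(t *testing.T) {
	child := mustCid(t, "QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG")
	oldParent := mustCid(t, "QmQPeNsJPyVWPFDVHb77w8G42Fvo15z4bG2X8D2GhfbSXc")
	newParent := mustCid(t, "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn")

	// Pin already present in the shared state, linked to an older cluster DAG.
	existing := api.PinCid(child)
	existing.Parents = cid.NewSet()
	existing.Parents.Add(oldParent)

	// Incoming re-pin of the same CID, arriving from a new cluster DAG.
	incoming := api.PinCid(child)
	incoming.Parents = cid.NewSet()
	incoming.Parents.Add(newParent)

	updatePinParents(&incoming, existing)

	if !incoming.Parents.Has(oldParent) || !incoming.Parents.Has(newParent) {
		t.Error("expected both the old and the new parent after the merge")
	}
}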