From ffef2cea0a022c75eaf142aa6f065d3412d71317 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 26 Sep 2022 19:33:46 +0200 Subject: [PATCH] service: add crdt info, mark-dirty, mark-clean commands Provide access to some crdt-internal operations --- cmd/ipfs-cluster-service/main.go | 151 ++++++++++++++++++++++--------- go.mod | 2 +- go.sum | 4 + 3 files changed, 114 insertions(+), 43 deletions(-) diff --git a/cmd/ipfs-cluster-service/main.go b/cmd/ipfs-cluster-service/main.go index 6b6f3fe0..1a4bb1d8 100644 --- a/cmd/ipfs-cluster-service/main.go +++ b/cmd/ipfs-cluster-service/main.go @@ -476,6 +476,29 @@ the peer IDs in the given multiaddresses. }, Subcommands: []cli.Command{ + { + Name: "info", + Usage: "Print information about the CRDT store", + Description: ` +This commands prints basic information: current heads, dirty flag etc. +`, + Flags: []cli.Flag{}, + Action: func(c *cli.Context) error { + locker.lock() + defer locker.tryUnlock() + + crdt := getCrdt() + info := crdt.InternalStats() + fmt.Printf( + "Number of heads: %d. Current max-height: %d. Dirty: %t\nHeads: %s", + len(info.Heads), + info.MaxHeight, + crdt.IsDirty(), + info.Heads, + ) + return nil + }, + }, { Name: "dot", Usage: "Write the CRDT-DAG as DOT file", @@ -498,49 +521,9 @@ Use with caution! locker.lock() defer locker.tryUnlock() - // Load all the configurations and identity - cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath) - checkErr("loading configurations", err) - defer cfgHelper.Manager().Shutdown() - - // Get a state manager and the datastore - mgr, err := cmdutils.NewStateManagerWithHelper(cfgHelper) - checkErr("creating state manager", err) - store, err := mgr.GetStore() - checkErr("opening datastore", err) - batching, ok := store.(datastore.Batching) - if !ok { - checkErr("", errors.New("no batching store")) - } - - crdtNs := cfgHelper.Configs().Crdt.DatastoreNamespace - - var blocksDatastore datastore.Batching = namespace.Wrap( - batching, - datastore.NewKey(crdtNs).ChildString(crdt.BlocksNs), - ) - - ipfs, err := ipfslite.New( - context.Background(), - blocksDatastore, - nil, - nil, - &ipfslite.Config{ - Offline: true, - }, - ) - checkErr("creating ipfs-lite offline node", err) - - opts := dscrdt.DefaultOptions() - crdt, err := dscrdt.New( - batching, - datastore.NewKey(crdtNs), - ipfs, - nil, - opts, - ) - checkErr("creating crdt node", err) + crdt := getCrdt() + var err error var w io.WriteCloser outputPath := c.String("file") if outputPath == "" { @@ -563,6 +546,43 @@ Use with caution! }, }, + { + Name: "mark-dirty", + Usage: "Marks the CRDT-store as dirty", + Description: ` +Marking the CRDT store as dirty will force-run a Repair operation on the next +run (i.e. next time the cluster peer is started). +`, + Flags: []cli.Flag{}, + Action: func(c *cli.Context) error { + locker.lock() + defer locker.tryUnlock() + + crdt := getCrdt() + crdt.MarkDirty() + fmt.Println("Datastore marked 'dirty'") + return nil + }, + }, + { + Name: "mark-clean", + Usage: "Marks the CRDT-store as clean", + Description: ` +This command remove the dirty-mark on the CRDT-store, which means no +DAG operations will be run. +`, + Flags: []cli.Flag{}, + Action: func(c *cli.Context) error { + locker.lock() + defer locker.tryUnlock() + + crdt := getCrdt() + crdt.MarkClean() + fmt.Println("Datastore marked 'clean'") + return nil + + }, + }, }, }, { @@ -858,3 +878,50 @@ func getStateManager() cmdutils.StateManager { checkErr("creating state manager", err) return mgr } + +func getCrdt() *dscrdt.Datastore { + // Load all the configurations and identity + cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath) + checkErr("loading configurations", err) + defer cfgHelper.Manager().Shutdown() + + // Get a state manager and the datastore + mgr, err := cmdutils.NewStateManagerWithHelper(cfgHelper) + checkErr("creating state manager", err) + store, err := mgr.GetStore() + checkErr("opening datastore", err) + batching, ok := store.(datastore.Batching) + if !ok { + checkErr("", errors.New("no batching store")) + } + + crdtNs := cfgHelper.Configs().Crdt.DatastoreNamespace + + var blocksDatastore datastore.Batching = namespace.Wrap( + batching, + datastore.NewKey(crdtNs).ChildString(crdt.BlocksNs), + ) + + ipfs, err := ipfslite.New( + context.Background(), + blocksDatastore, + nil, + nil, + &ipfslite.Config{ + Offline: true, + }, + ) + checkErr("creating ipfs-lite offline node", err) + + opts := dscrdt.DefaultOptions() + opts.RepairInterval = 0 + crdt, err := dscrdt.New( + batching, + datastore.NewKey(crdtNs), + ipfs, + nil, + opts, + ) + checkErr("creating crdt node", err) + return crdt +} diff --git a/go.mod b/go.mod index a7a46949..1b45fc36 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/ipfs/go-cid v0.3.2 github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-badger v0.3.0 - github.com/ipfs/go-ds-crdt v0.3.7 + github.com/ipfs/go-ds-crdt v0.3.9 github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-fs-lock v0.0.7 github.com/ipfs/go-ipfs-api v0.3.0 diff --git a/go.sum b/go.sum index eb9f2325..94b662e4 100644 --- a/go.sum +++ b/go.sum @@ -502,6 +502,10 @@ github.com/ipfs/go-ds-badger v0.3.0 h1:xREL3V0EH9S219kFFueOYJJTcjgNSZ2HY1iSvN7U1 github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupVCGm4QUIek= github.com/ipfs/go-ds-crdt v0.3.7 h1:LVOxRa6rOUPYhDN+tFrQrE4pu7dHTuDqKT57NUWjl1Y= github.com/ipfs/go-ds-crdt v0.3.7/go.mod h1:h2hPQ3njd7DztdvUCOuV33Aq1QYRFwHXJdz+Z5oo2A0= +github.com/ipfs/go-ds-crdt v0.3.8 h1:znCPI1XjCj++hckxYa7YP+udWxWkRrxZIBlEU5Ao6a0= +github.com/ipfs/go-ds-crdt v0.3.8/go.mod h1:h2hPQ3njd7DztdvUCOuV33Aq1QYRFwHXJdz+Z5oo2A0= +github.com/ipfs/go-ds-crdt v0.3.9 h1:hZl67DynkBWGz2YwTQjR7d/dYNMEHFpVX2bz9Dyu6k8= +github.com/ipfs/go-ds-crdt v0.3.9/go.mod h1:h2hPQ3njd7DztdvUCOuV33Aq1QYRFwHXJdz+Z5oo2A0= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=