ipfs-cluster/cmd/ipfs-cluster-follow/commands.go
Hector Sanjuan a97ed10d0b Adopt api.Cid type - replaces cid.Cid everywhere.
This commit introduces an api.Cid type and replaces the usage of cid.Cid
everywhere.

The main motivation here is to override MarshalJSON so that Cids are
JSON-ified as '"Qm...."' instead of '{ "/": "Qm....." }', as this "ipld"
representation of IDs is horrible to work with, and our APIs are not issuing
IPLD objects to start with.

Unfortunately, there is no way to do this cleanly, and the best way is to just
switch everything to our own type.
2022-04-07 14:27:39 +02:00

556 lines
15 KiB
Go

package main
import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"strings"
"time"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/balanced"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/cmdutils"
"github.com/ipfs/ipfs-cluster/config"
"github.com/ipfs/ipfs-cluster/consensus/crdt"
"github.com/ipfs/ipfs-cluster/datastore/badger"
"github.com/ipfs/ipfs-cluster/datastore/leveldb"
"github.com/ipfs/ipfs-cluster/informer/disk"
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/observations"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
cli "github.com/urfave/cli/v2"
)
// printFirstStart prints a getting-started message with example invocations
// of this program. It is shown when no follower configurations are found.
func printFirstStart() {
	const firstStartHelp = `
No clusters configured yet!
If this is the first time you are running %s,
be sure to check out the usage documentation. Here are some
examples to get you going:
$ %s --help - general description and usage help
$ %s <clusterName> --help - Help and subcommands for the <clusterName>'s follower peer
$ %s <clusterName> info --help - Help for the "info" subcommand (same for others).
`
	name := programName
	fmt.Printf(firstStartHelp, name, name, name, name)
}
// printNotInitialized tells the user that the follower peer for clusterName
// has not been initialized and suggests the "init" invocation to run first.
func printNotInitialized(clusterName string) {
	const notInitializedHelp = `
This cluster peer has not been initialized.
Try running "%s %s init <config-url>" first.
`
	fmt.Printf(notInitializedHelp, programName, clusterName)
}
// setLogLevels applies the log level lvl to every entry ranged over in
// ipfscluster.LoggingFacilities and then in
// ipfscluster.LoggingFacilitiesExtra.
func setLogLevels(lvl string) {
	for facility := range ipfscluster.LoggingFacilities {
		ipfscluster.SetFacilityLogLevel(facility, lvl)
	}
	for extraFacility := range ipfscluster.LoggingFacilitiesExtra {
		ipfscluster.SetFacilityLogLevel(extraFacility, lvl)
	}
}
// isInitialized reports whether the configuration folder at absPath exists.
// Any error from os.Stat (not only "does not exist") yields false.
func isInitialized(absPath string) bool {
	if _, err := os.Stat(absPath); err != nil {
		return false
	}
	return true
}
// listClustersCmd lists the follower peers that have a configuration on
// disk.
//
// It scans the base configuration folder (as returned by buildPaths for an
// empty cluster name) and keeps every sub-directory that contains a
// DefaultConfigFile. When the folder does not exist, or no sub-directory
// qualifies, the first-start help is printed instead. Any other I/O problem
// is returned as a cli exit error with code 1.
func listClustersCmd(c *cli.Context) error {
	absPath, _, _ := buildPaths(c, "")
	f, err := os.Open(absPath)
	if os.IsNotExist(err) {
		printFirstStart()
		return nil
	}
	if err != nil {
		return cli.Exit(err, 1)
	}
	// The handle is only needed to enumerate the entries: release it on
	// every return path instead of leaking the descriptor.
	defer f.Close()

	dirs, err := f.Readdir(-1)
	if err != nil {
		return cli.Exit(errors.Wrapf(err, "reading %s", absPath), 1)
	}

	var filteredDirs []string
	for _, d := range dirs {
		if !d.IsDir() {
			continue
		}
		// Only folders holding a configuration file count as follower peers.
		configPath := filepath.Join(absPath, d.Name(), DefaultConfigFile)
		if _, err := os.Stat(configPath); err == nil {
			filteredDirs = append(filteredDirs, d.Name())
		}
	}

	if len(filteredDirs) == 0 {
		printFirstStart()
		return nil
	}

	fmt.Printf("Configurations found for %d follower peers. For info and help, try running:\n\n", len(filteredDirs))
	for _, d := range filteredDirs {
		fmt.Printf("%s \"%s\"\n", programName, d)
	}
	fmt.Printf("\nTip: \"%s --help\" for help and examples.\n", programName)
	return nil
}
// infoCmd prints a summary for the follower peer of the selected cluster:
// its configuration folder, the state of its configuration source, and
// whether the cluster peer and the IPFS peer answer requests.
//
// A failure to fetch the remote configuration source is tolerated: it is
// reported in the output and a default ipfshttp configuration is used for
// the IPFS check. Any other configuration loading error aborts the command.
func infoCmd(c *cli.Context) error {
	clusterName := c.String("clusterName")

	// Avoid pollution of the screen
	setLogLevels("critical")

	absPath, configPath, identityPath := buildPaths(c, clusterName)
	if !isInitialized(absPath) {
		printNotInitialized(clusterName)
		return cli.Exit("", 1)
	}

	cfgHelper, loadErr := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
	var sourceStatus string
	switch {
	case loadErr == nil:
		sourceStatus = fmt.Sprintf("Available (%s)", cfgHelper.Manager().Source)
	case config.IsErrFetchingSource(loadErr):
		sourceStatus = fmt.Sprintf(
			"failed retrieving configuration source (%s)",
			cfgHelper.Manager().Source,
		)
		// Without the remote configuration, fall back to the default
		// IPFS connector settings.
		defaultIpfsCfg := ipfshttp.Config{}
		defaultIpfsCfg.Default()
		cfgHelper.Configs().Ipfshttp = &defaultIpfsCfg
	default:
		return cli.Exit(errors.Wrapf(loadErr, "reading the configurations in %s", absPath), 1)
	}
	cfgHelper.Manager().Shutdown()

	fmt.Printf("Information about follower peer for Cluster \"%s\":\n\n", clusterName)
	fmt.Printf("Config folder: %s\n", absPath)
	fmt.Printf("Config source URL: %s\n", sourceStatus)

	ctx := context.Background()
	client, err := getClient(absPath, clusterName)
	if err != nil {
		return cli.Exit(errors.Wrap(err, "error creating client"), 1)
	}
	// The peer counts as online when the version request succeeds.
	_, versionErr := client.Version(ctx)
	fmt.Printf("Cluster Peer online: %t\n", versionErr == nil)

	// Either we loaded a valid config, or we are using a default. Worth
	// applying env vars in the second case.
	ipfsConnCfg := cfgHelper.Configs().Ipfshttp
	if envErr := ipfsConnCfg.ApplyEnvVars(); envErr != nil {
		return cli.Exit(errors.Wrap(envErr, "applying environment variables to ipfshttp config"), 1)
	}
	ipfsConnCfg.ConnectSwarmsDelay = 0

	// The IPFS peer counts as online when the connector can be created
	// and its ID request succeeds.
	connector, err := ipfshttp.NewConnector(ipfsConnCfg)
	if err == nil {
		_, err = connector.ID(ctx)
	}
	fmt.Printf("IPFS peer online: %t\n", err == nil)

	// NOTE(review): an empty command name presumably means no explicit
	// subcommand was given; the application help is appended in that case.
	if c.Command.Name != "" {
		return nil
	}
	fmt.Printf("Additional help:\n\n")
	fmt.Printf("-------------------------------------------------\n\n")
	return cli.ShowAppHelp(c)
}
// initCmd initializes a follower peer from the configuration URL given as
// the first positional argument. It returns a cli exit error with code 1
// when no argument is present. An existing configuration is not tolerated
// (ignoreReinit is false).
func initCmd(c *cli.Context) error {
	args := c.Args()
	if args.Present() {
		return initCluster(c, false, args.First())
	}
	return cli.Exit("configuration URL not provided", 1)
}
// initCluster writes the identity and configuration files for the follower
// peer selected by the cluster-name flag, using cfgURL as the configuration
// source.
//
// If the configuration folder already exists nothing is written: with
// ignoreReinit set, a notice is printed and nil is returned; otherwise the
// problem is reported and a cli exit error with code 1 is returned. A cfgURL
// lacking an http:// or https:// prefix is rewritten into an /ipns/ URL on
// the gateway taken from the "gateway" flag.
func initCluster(c *cli.Context, ignoreReinit bool, cfgURL string) error {
	clusterName := c.String(clusterNameFlag)
	absPath, configPath, identityPath := buildPaths(c, clusterName)

	if isInitialized(absPath) {
		if !ignoreReinit {
			cmdutils.ErrorOut("Configuration for this cluster already exists.\n")
			cmdutils.ErrorOut("Please delete %s if you wish to re-initialize.", absPath)
			return cli.Exit("", 1)
		}
		fmt.Println("Configuration for this cluster already exists. Skipping initialization.")
		fmt.Printf("If you wish to re-initialize, simply delete %s\n\n", absPath)
		return nil
	}

	gw := c.String("gateway")
	hasScheme := strings.HasPrefix(cfgURL, "http://") || strings.HasPrefix(cfgURL, "https://")
	if !hasScheme {
		fmt.Printf("%s will be assumed to be an DNSLink-powered address: /ipns/%s.\n", cfgURL, cfgURL)
		fmt.Printf("It will be resolved using the local IPFS daemon's gateway (%s).\n", gw)
		fmt.Println("If this is not the case, specify the full url starting with http:// or https://.")
		fmt.Println("(You can override the gateway URL by setting IPFS_GATEWAY)")
		fmt.Println()
		cfgURL = fmt.Sprintf("http://%s/ipns/%s", gw, cfgURL)
	}

	// Setting the datastore here is useless, as we initialize with remote
	// config and we will have an empty service.json with the source only.
	// That source will decide which datastore is actually used.
	cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath, "crdt", "")
	mgr := cfgHelper.Manager()
	mgr.Shutdown()
	mgr.Source = cfgURL
	if err := mgr.Default(); err != nil {
		return cli.Exit(errors.Wrap(err, "error generating default config"), 1)
	}

	// Generate the identity, let the environment override it, and
	// persist it before the configuration itself.
	ident := cfgHelper.Identity()
	if err := ident.Default(); err != nil {
		return cli.Exit(errors.Wrap(err, "error generating identity"), 1)
	}
	if err := ident.ApplyEnvVars(); err != nil {
		return cli.Exit(errors.Wrap(err, "error applying environment variables to the identity"), 1)
	}
	if err := cfgHelper.SaveIdentityToDisk(); err != nil {
		return cli.Exit(errors.Wrapf(err, "error saving %s", identityPath), 1)
	}
	fmt.Printf("Identity written to %s.\n", identityPath)

	if err := cfgHelper.SaveConfigToDisk(); err != nil {
		return cli.Exit(errors.Wrapf(err, "saving %s", configPath), 1)
	}
	fmt.Printf("Configuration written to %s.\n", configPath)

	fmt.Printf("Cluster \"%s\" follower peer initialized.\n\n", clusterName)
	fmt.Printf("You can now use \"%s %s run\" to start a follower peer for this cluster.\n", programName, clusterName)
	fmt.Println("(Remember to start your IPFS daemon before)")
	return nil
}
// runCmd starts the follower peer for the selected cluster and returns
// whatever cmdutils.HandleSignals returns.
//
// When the "init" flag carries a configuration URL the peer is initialized
// first (an existing configuration is tolerated). The command then waits up
// to two minutes for IPFS, loads the configuration, builds every cluster
// component and hands them to cmdutils.HandleSignals.
//
// Every failure is returned as a cli exit error with code 1. Once the
// datastore has been opened, every failure path closes it before returning.
func runCmd(c *cli.Context) error {
	clusterName := c.String(clusterNameFlag)

	if cfgURL := c.String("init"); cfgURL != "" {
		err := initCluster(c, true, cfgURL)
		if err != nil {
			return err
		}
	}

	absPath, configPath, identityPath := buildPaths(c, clusterName)
	if !isInitialized(absPath) {
		printNotInitialized(clusterName)
		return cli.Exit("", 1)
	}

	fmt.Printf("Starting the IPFS Cluster follower peer for \"%s\".\nCTRL-C to stop it.\n", clusterName)
	fmt.Println("Checking if IPFS is online (will wait for 2 minutes)...")
	ctxIpfs, cancelIpfs := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancelIpfs()
	err := cmdutils.WaitForIPFS(ctxIpfs)
	if err != nil {
		return cli.Exit("timed out waiting for IPFS to be available", 1)
	}

	setLogLevels(logLevel) // set to "info" by default.
	// Avoid API logs polluting the screen every time we
	// run some "list" command.
	ipfscluster.SetFacilityLogLevel("restapilog", "error")

	cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
	if err != nil {
		return cli.Exit(errors.Wrapf(err, "reading the configurations in %s", absPath), 1)
	}
	cfgHelper.Manager().Shutdown()
	cfgs := cfgHelper.Configs()

	stmgr, err := cmdutils.NewStateManager(cfgHelper.GetConsensus(), cfgHelper.GetDatastore(), cfgHelper.Identity(), cfgs)
	if err != nil {
		return cli.Exit(errors.Wrap(err, "creating state manager"), 1)
	}

	store, err := stmgr.GetStore()
	if err != nil {
		return cli.Exit(errors.Wrap(err, "creating datastore"), 1)
	}

	// From here on the datastore is open. exitClosingStore releases it
	// (best effort, the Close result is not inspected) and builds the
	// exit error, so that no failure path leaves the store open.
	exitClosingStore := func(cause error) error {
		store.Close()
		return cli.Exit(cause, 1)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgHelper.Identity(), cfgs.Cluster, store)
	if err != nil {
		return exitClosingStore(errors.Wrap(err, "error creating libp2p components"))
	}

	// Always run followers in follower mode.
	cfgs.Cluster.FollowerMode = true
	// Do not let trusted peers GC this peer
	// Defaults to Trusted otherwise.
	cfgs.Cluster.RPCPolicy["Cluster.RepoGCLocal"] = ipfscluster.RPCClosed

	// Discard API configurations and create our own
	apiCfg := rest.NewConfig()
	cfgs.Restapi = apiCfg
	_ = apiCfg.Default()
	listenSocket, err := socketAddress(absPath, clusterName)
	if err != nil {
		return exitClosingStore(err)
	}
	apiCfg.HTTPListenAddr = []multiaddr.Multiaddr{listenSocket}
	// Allow customization via env vars
	err = apiCfg.ApplyEnvVars()
	if err != nil {
		return exitClosingStore(errors.Wrap(err, "error applying environmental variables to restapi configuration"))
	}

	// Named restAPI so that the imported rest package is not shadowed.
	restAPI, err := rest.NewAPI(ctx, apiCfg)
	if err != nil {
		return exitClosingStore(errors.Wrap(err, "creating REST API component"))
	}

	connector, err := ipfshttp.NewConnector(cfgs.Ipfshttp)
	if err != nil {
		return exitClosingStore(errors.Wrap(err, "creating IPFS Connector component"))
	}

	informer, err := disk.NewInformer(cfgs.Diskinf)
	if err != nil {
		return exitClosingStore(errors.Wrap(err, "creating disk informer"))
	}

	alloc, err := balanced.New(cfgs.BalancedAlloc)
	if err != nil {
		return exitClosingStore(errors.Wrap(err, "creating metrics allocator"))
	}

	crdtcons, err := crdt.New(
		host,
		dht,
		pubsub,
		cfgs.Crdt,
		store,
	)
	if err != nil {
		return exitClosingStore(errors.Wrap(err, "creating CRDT component"))
	}

	tracker := stateless.New(cfgs.Statelesstracker, host.ID(), cfgs.Cluster.Peername, crdtcons.State)

	mon, err := pubsubmon.New(ctx, cfgs.Pubsubmon, pubsub, nil)
	if err != nil {
		return exitClosingStore(errors.Wrap(err, "setting up PeerMonitor"))
	}

	// Hardcode disabled tracing and metrics to avoid mistakenly
	// exposing any user data.
	tracerCfg := observations.TracingConfig{}
	_ = tracerCfg.Default()
	tracerCfg.EnableTracing = false
	cfgs.Tracing = &tracerCfg
	tracer, err := observations.SetupTracing(&tracerCfg)
	if err != nil {
		return exitClosingStore(errors.Wrap(err, "error setting up tracer"))
	}

	// This does nothing since we are not calling SetupMetrics anyways
	// But stays just to be explicit.
	metricsCfg := observations.MetricsConfig{}
	_ = metricsCfg.Default()
	metricsCfg.EnableStats = false
	cfgs.Metrics = &metricsCfg

	// We are going to run a cluster peer and should do an
	// orderly shutdown if we are interrupted: cancel default
	// signal handling and leave things to HandleSignals.
	signal.Stop(signalChan)
	close(signalChan)

	cluster, err := ipfscluster.NewCluster(
		ctx,
		host,
		dht,
		cfgs.Cluster,
		store,
		crdtcons,
		[]ipfscluster.API{restAPI},
		connector,
		tracker,
		mon,
		alloc,
		[]ipfscluster.Informer{informer},
		tracer,
	)
	if err != nil {
		return exitClosingStore(errors.Wrap(err, "error creating cluster peer"))
	}

	return cmdutils.HandleSignals(ctx, cancel, cluster, host, dht, store)
}
// listCmd prints the pinset of the follower peer of the selected cluster.
//
// It first queries the running peer (printStatusOnline). If that fails with
// an *api.Error carrying a non-zero code, the peer is considered to be
// running and the error is reported. On any other failure the pinset is read
// directly from the local datastore (printStatusOffline), choosing between
// badger and leveldb by looking at which folder exists under the
// configuration path.
func listCmd(c *cli.Context) error {
	clusterName := c.String("clusterName")

	absPath, configPath, identityPath := buildPaths(c, clusterName)
	if !isInitialized(absPath) {
		printNotInitialized(clusterName)
		return cli.Exit("", 1)
	}

	err := printStatusOnline(absPath, clusterName)
	if err == nil {
		return nil
	}

	// There was an error. Try offline status
	apiErr, ok := err.(*api.Error)
	if ok && apiErr.Code != 0 {
		return cli.Exit(
			errors.Wrapf(
				err,
				"The Peer API seems to be running but returned with code %d",
				apiErr.Code,
			), 1)
	}

	// We are on offline mode so we cannot rely on IPFS being
	// running and most probably our configuration is remote and
	// to be loaded from IPFS. Thus we need to find a different
	// way to decide whether to load badger/leveldb, and once we
	// know, do it with the default settings.
	lDBCfg := &leveldb.Config{}
	lDBCfg.SetBaseDir(absPath)
	lDBCfg.Default()
	levelDBInfo, levelDBErr := os.Stat(lDBCfg.GetFolder())
	hasLevelDB := levelDBErr == nil && levelDBInfo.IsDir()

	badgerCfg := &badger.Config{}
	badgerCfg.SetBaseDir(absPath)
	badgerCfg.Default()
	badgerInfo, badgerErr := os.Stat(badgerCfg.GetFolder())
	hasBadger := badgerErr == nil && badgerInfo.IsDir()

	if hasLevelDB && hasBadger {
		// No underlying error exists at this point (both stats
		// succeeded), so a fresh error is created. Wrapping a nil
		// error would yield nil and the message would be lost.
		return cli.Exit(errors.Errorf("found both leveldb (%s) and badger (%s) folders: cannot determine which to use in offline mode", lDBCfg.GetFolder(), badgerCfg.GetFolder()), 1)
	}

	// Since things were initialized, assume there is one at least.
	dstoreType := "leveldb"
	if hasBadger {
		dstoreType = "badger"
	}

	cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath, "crdt", dstoreType)
	cfgHelper.Manager().Shutdown() // not needed
	cfgHelper.Configs().Badger.SetBaseDir(absPath)
	cfgHelper.Configs().LevelDB.SetBaseDir(absPath)
	// we have a default crdt config with either leveldb or badger registered.
	if err := cfgHelper.Manager().Default(); err != nil {
		return cli.Exit(errors.Wrap(err, "error generating default config"), 1)
	}
	if err := cfgHelper.Manager().ApplyEnvVars(); err != nil {
		return cli.Exit(errors.Wrap(err, "error applying environment variables to the configuration"), 1)
	}

	err = printStatusOffline(cfgHelper)
	if err != nil {
		return cli.Exit(errors.Wrap(err, "error obtaining the pinset"), 1)
	}
	return nil
}
// printStatusOnline asks the running peer for the status of all pins and
// prints one line per pin as results arrive.
//
// A client creation failure is returned as a cli exit error with code 1;
// otherwise the result of client.StatusAll is returned as is.
func printStatusOnline(absPath, clusterName string) error {
	client, err := getClient(absPath, clusterName)
	if err != nil {
		return cli.Exit(errors.Wrap(err, "error creating client"), 1)
	}

	ctx := context.Background()
	statuses := make(chan api.GlobalPinInfo, 1024)
	done := make(chan error, 1)
	// Results are produced in the background and printed below. The
	// loop ends when statuses is closed (presumably by StatusAll).
	go func() {
		defer close(done)
		done <- client.StatusAll(ctx, 0, true, statuses)
	}()

	var peerID string
	for info := range statuses {
		// PeerMap will only have one key: pick it up once and reuse
		// it for all following entries.
		if peerID == "" {
			for key := range info.PeerMap {
				peerID = key
				break
			}
		}
		peerInfo := info.PeerMap[peerID]
		printPin(info.Cid, peerInfo.Status.String(), info.Name, peerInfo.Error)
	}

	return <-done
}
// printStatusOffline reads the pinset from the local datastore described by
// cfgHelper and prints every pin with the status "offline".
//
// Errors from creating the state manager, opening the store, loading the
// offline state or listing the pins are returned unchanged. The store is
// closed when the function returns.
func printStatusOffline(cfgHelper *cmdutils.ConfigHelper) error {
	mgr, err := cmdutils.NewStateManagerWithHelper(cfgHelper)
	if err != nil {
		return err
	}

	store, err := mgr.GetStore()
	if err != nil {
		return err
	}
	defer store.Close()

	state, err := mgr.GetOfflineState(store)
	if err != nil {
		return err
	}

	pins := make(chan api.Pin, 1024)
	listDone := make(chan error, 1)
	// Pins are listed in the background and printed below. The loop
	// ends when pins is closed (presumably by List).
	go func() {
		defer close(listDone)
		listDone <- state.List(context.Background(), pins)
	}()

	for p := range pins {
		printPin(p.Cid, "offline", p.Name, "")
	}

	return <-listDone
}
// printPin writes one line describing a pin: the status left-aligned in a
// 20-character column, then the CID and the name. A non-empty err is
// appended to the name in parentheses.
func printPin(c api.Cid, status, name, err string) {
	label := name
	if err != "" {
		label += " (" + err + ")"
	}
	fmt.Printf("%-20s %s %s\n", status, c, label)
}