ipfscluster tool. A CLI app wrapping the Cluster API.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2016-12-22 17:14:15 +01:00
parent 0b5a300568
commit b92b598db1
10 changed files with 484 additions and 33 deletions

.gitignore vendored (+1)
View File

@ -1,5 +1,6 @@
coverage.out
ipfscluster-server/ipfscluster-server
ipfscluster/ipfscluster
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o

View File

@ -11,6 +11,8 @@ install:
- make all
script:
- make test
- make server
- make client
- "$GOPATH/bin/goveralls -coverprofile=coverage.out -service=travis-ci -repotoken $COVERALLS_TOKEN"
env:
global:

View File

@ -1,10 +1,16 @@
all: install
clean:
all: server client
clean: rwundo
$(MAKE) -C ipfscluster-server clean
$(MAKE) -C ipfscluster clean
install: deps
$(MAKE) -C ipfscluster-server install
$(MAKE) -C ipfscluster install
server: deps
$(MAKE) -C ipfscluster-server ipfscluster-server
client: deps
$(MAKE) -C ipfscluster ipfscluster
gx:
go get github.com/whyrusleeping/gx
go get github.com/whyrusleeping/gx-go

View File

@ -37,13 +37,13 @@ Current functionality only allows pinning in all cluster members, but more strat
## Background
Since the start of IPFS it was clear that a tool to coordinate a number of different nodes (and the content they are supposed to store) would add a great value to the IPFS ecosystem. Naïve approaches are possible, but they present some weaknesses, specially at dealing with error handling and incorporating multiple pinning strategies.
Since the start of IPFS it was clear that a tool to coordinate a number of different nodes (and the content they are supposed to store) would add great value to the IPFS ecosystem. Naïve approaches are possible, but they present weaknesses, especially when dealing with error handling, recovery and the implementation of advanced pinning strategies.
`ipfs-cluster` aims to address this issues by providing a IPFS node wrapper which coordinates multiple cluster members via a consensus algorithm. This ensures that the desired state of the system is always agreed upon and can be easily maintained by the members of the cluster. Thus, every cluster member has an overview of where each hash is pinned, and the cluster can react to any contingencies, like IPFS nodes dying, by redistributing the storage load to others.
`ipfs-cluster` aims to address these issues by providing an IPFS node wrapper which coordinates multiple cluster members via a consensus algorithm. This ensures that the desired state of the system is always agreed upon and can be easily maintained by the members of the cluster. Thus, every cluster member knows which content is tracked, can decide whether to ask IPFS to pin it, and can react to contingencies like server reboots.
## Install
In order to install `ipfs-cluster` simply download this repository and run `make` as follows:
In order to install the `ipfscluster-server` and the `ipfscluster` tools, simply download this repository and run `make` as follows:
```
$ go get -u -d github.com/ipfs/ipfs-cluster
@ -51,7 +51,7 @@ $ cd $GOPATH/src/github.com/ipfs/ipfs-cluster
$ make install
```
This will install the ipfs-cluster executables (`ipfscluster-server` and `ipfscluster`) in your `$GOPATH/bin` folder.
This will install `ipfscluster-server` and `ipfscluster` in your `$GOPATH/bin` folder.
## Usage
@ -70,7 +70,7 @@ Before running `ipfscluster-server` for the first time, initialize a configurati
$ ipfscluster-server -init
```
The configuration will be placed in `~/.ipfs-cluster/server.json`.
The configuration will be placed in `~/.ipfs-cluster/server.json` by default.
You can add the multiaddresses for the other members of the cluster in the `cluster_peers` variable.
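As an illustration only, a `server.json` fragment with two other members might look like the following (the commit only names the `cluster_peers` key; the addresses, port and peer IDs below are invented):

```
{
  "cluster_peers": [
    "/ip4/192.168.1.2/tcp/9096/ipfs/QmcZrBqWBYV3RGsPuhQX11QzpKAQ8SYfMYEe2E6qGA4aws",
    "/ip4/192.168.1.3/tcp/9096/ipfs/QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6"
  ]
}
```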
@ -79,7 +79,18 @@ You can add the multiaddresses for the other members of the cluster in the `clus
`ipfscluster` is the client application to manage the cluster servers and perform actions. `ipfscluster` uses the HTTP API provided by the server nodes.
TODO: This is not done yet
After installing, you can run `ipfscluster --help` to display a general description and the available options, or `ipfscluster help [cmd]` to display information about a supported command.
In summary, it works as follows:
```
$ ipfscluster member ls # list cluster members
$ ipfscluster pin add Qma4Lid2T1F68E3Xa3CpE6vVJDLwxXLD8RfiB9g1Tmqp58 # pins a Cid in the cluster
$ ipfscluster pin rm Qma4Lid2T1F68E3Xa3CpE6vVJDLwxXLD8RfiB9g1Tmqp58 # unpins a Cid from the cluster
$ ipfscluster status # display information about tracked Cids
$ ipfscluster sync Qma4Lid2T1F68E3Xa3CpE6vVJDLwxXLD8RfiB9g1Tmqp58 # sync state and recover a Cid in error status
```
### Go
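This section is still empty in this commit. As a rough sketch only, wiring the components from Go might look like the following. Only `Config`, `NewRESTAPI` and the `NewCluster(cfg, api, ipfs, state, tracker)` signature are visible in this diff; `LoadConfig`, `NewIPFSHTTPConnector`, `NewMapState` and `NewMapPinTracker` are assumed names for the remaining pieces:

```go
package main

import (
	ipfscluster "github.com/ipfs/ipfs-cluster"
)

func main() {
	// LoadConfig is an assumed helper; the server tool reads
	// ~/.ipfs-cluster/server.json through its own loadConfig().
	cfg, err := ipfscluster.LoadConfig("server.json")
	checkErr(err)

	// NewRESTAPI appears in ipfscluster-server/main.go in this commit.
	api, err := ipfscluster.NewRESTAPI(cfg)
	checkErr(err)

	// Assumed constructors for the IPFS connector, shared state and
	// pin tracker expected by NewCluster.
	ipfs, err := ipfscluster.NewIPFSHTTPConnector(cfg)
	checkErr(err)
	state := ipfscluster.NewMapState()
	tracker := ipfscluster.NewMapPinTracker(cfg)

	cluster, err := ipfscluster.NewCluster(cfg, api, ipfs, state, tracker)
	checkErr(err)
	_ = cluster // a real program would block here and shut down on exit
}

func checkErr(err error) {
	if err != nil {
		panic(err)
	}
}
```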

View File

@ -13,7 +13,7 @@ Things that need to be done:
* ~~ipfscluster-server tool~~
* ~~Allow custom configuration path~~
* ~~allow --init~~
* ipfscluster tool
* ~~ipfscluster tool~~
* Peer-aware components. Modify cluster members on the fly.
* ~~Recover(). Also vs. Sync()~~
* go-fmt

View File

@ -73,7 +73,7 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P
shutdownCh: make(chan struct{}),
}
logger.Info("starting IPFS Cluster")
logger.Infof("starting IPFS Cluster v%s", Version)
cluster.run()
return cluster, nil

View File

@ -13,12 +13,12 @@ import (
ipfscluster "github.com/ipfs/ipfs-cluster"
)
// Name of this application
const name = `ipfscluster-server`
// programName is the name of this application
const programName = `ipfscluster-server`
// Description provides a short summary of the functionality of this tool
var Description = fmt.Sprintf(`
%s runs an IPFS Cluster member.
%s runs an IPFS Cluster member (version %s).
A member is a server node which participates in the cluster consensus, follows
a distributed log of pinning and unpinning operations and manages pinning
@ -33,11 +33,13 @@ independent from IPFS and includes its own LibP2P key-pair. It can be
initialized with -init and its default location is
~/%s/%s.
For any additional information, visit https://github.com/ipfs/ipfs-cluster.
For feedback, bug reports or any additional information, visit
https://github.com/ipfs/ipfs-cluster.
`,
name,
name,
programName,
ipfscluster.Version,
programName,
DefaultPath,
DefaultConfigFile)
@ -63,6 +65,7 @@ var (
configFlag string
debugFlag bool
logLevelFlag string
versionFlag bool
)
func init() {
@ -81,7 +84,7 @@ func init() {
dataPath = filepath.Join(DefaultPath, DefaultDataFolder)
flag.Usage = func() {
out("Usage: %s [options]\n", name)
out("Usage: %s [options]\n", programName)
out(Description)
out("Options:\n")
flag.PrintDefaults()
@ -95,8 +98,21 @@ func init() {
"enable full debug logs of ipfs cluster and consensus layers")
flag.StringVar(&logLevelFlag, "loglevel", "info",
"set the loglevel [critical, error, warning, notice, info, debug]")
flag.BoolVar(&versionFlag, "version", false,
fmt.Sprintf("display %s version", programName))
flag.Parse()
configPath = configFlag
setupLogging()
setupDebug()
if versionFlag {
fmt.Println(ipfscluster.Version)
os.Exit(0)
}
if initFlag {
err := initConfig()
checkErr("creating configuration", err)
os.Exit(0)
}
}
func out(m string, a ...interface{}) {
@ -108,14 +124,6 @@ func main() {
signalChan := make(chan os.Signal)
signal.Notify(signalChan, os.Interrupt)
setupLogging()
setupDebug()
if initFlag {
err := initConfig()
checkErr("creating configuration", err)
os.Exit(0)
}
cfg, err := loadConfig()
checkErr("loading configuration", err)
api, err := ipfscluster.NewRESTAPI(cfg)
@ -145,12 +153,12 @@ func checkErr(doing string, err error) {
}
func setupLogging() {
logging.SetLogLevel("ipfscluster", logLevelFlag)
logging.SetLogLevel("cluster", logLevelFlag)
}
func setupDebug() {
if debugFlag {
logging.SetLogLevel("ipfscluster", "debug")
logging.SetLogLevel("cluster", "debug")
logging.SetLogLevel("libp2p-raft", "debug")
ipfscluster.SilentRaft = false
}
@ -171,7 +179,7 @@ func initConfig() error {
return err
}
out("%s configuration written to %s",
name, configPath)
programName, configPath)
return nil
}

View File

@ -2,9 +2,9 @@
// allows to orchestrate a number of tasks between several IPFS nodes.
//
// IPFS Cluster uses a consensus algorithm and libP2P to keep a shared
// state between the different members of the cluster. This state is
// primarily used to keep track of pinned items, and ensure that an
// item is pinned in different places.
// state between the different members of the cluster, and provides
// components to interact with the IPFS daemon and to expose public
// and internal APIs.
package ipfscluster
import (
@ -18,7 +18,7 @@ import (
peer "github.com/libp2p/go-libp2p-peer"
)
var logger = logging.Logger("ipfscluster")
var logger = logging.Logger("cluster")
// Current Cluster version.
const Version = "0.0.1"
@ -44,7 +44,7 @@ func SetLogLevel(l string) {
INFO
DEBUG
*/
logging.SetLogLevel("ipfs-cluster", l)
logging.SetLogLevel("cluster", l)
}
// ClusterComponent represents a piece of ipfscluster. Cluster components

ipfscluster/Makefile Normal file (+12)
View File

@ -0,0 +1,12 @@
all: ipfscluster
ipfscluster:
go build
install:
go install
clean:
rm -f ipfscluster
.PHONY: clean install

ipfscluster/main.go Normal file (+411)
View File

@ -0,0 +1,411 @@
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net/http"
"os"
"sort"
"strings"
"time"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
)
const programName = `ipfscluster`
var (
defaultHost = fmt.Sprintf("127.0.0.1:%d", 9094)
defaultTimeout = 60
defaultProtocol = "http"
)
var logger = logging.Logger("ipfscluster")
// Description provides a short summary of the functionality of this tool
var Description = fmt.Sprintf(`
%s is a tool to manage IPFS Cluster server nodes.
Use "%s help" to list all available commands and "%s help <command>"
to get usage information for a specific one.
%s uses the IPFS Cluster API to perform requests and display
responses in a user-readable format. The location of the IPFS
Cluster server is assumed to be %s, but can be
configured with the -host option.
For feedback, bug reports or any additional information, visit
https://github.com/ipfs/ipfs-cluster.
`,
programName,
programName,
programName,
programName,
defaultHost)
// Command line flags
var (
hostFlag string
protocolFlag string
timeoutFlag int
versionFlag bool
debugFlag bool
)
type errorResp struct {
Code int `json:"code"`
Message string `json:"message"`
}
type cmd struct {
Name string
Path string
Method string
Subcmd bool
ShortDesc string
LongDesc string
}
var cmds = map[string]cmd{
"help": cmd{
Name: "help",
ShortDesc: "Shows this help",
LongDesc: `
Usage: help <cmd>
This command shows detailed usage instructions for other commands.
`},
"member": cmd{
Name: "member",
Subcmd: true,
ShortDesc: "List and manage cluster members",
LongDesc: `
Usage: member ls
This command can be used to list and manage IPFS Cluster members.
`},
"member ls": cmd{
Name: "memberList",
Path: "/members",
Method: "GET",
ShortDesc: "List cluster members",
LongDesc: `
Usage: member ls
This command lists the nodes participating in the IPFS Cluster.
`},
"pin": cmd{
Name: "pin",
Path: "",
Subcmd: true,
ShortDesc: "Manage tracked CIDs",
LongDesc: `
Usage: pin add|rm|ls [cid]
This command allows adding, removing or listing items managed (pinned) by
the Cluster.
`},
"pin add": cmd{
Name: "pinAdd",
Path: "/pins/{param0}",
Method: "POST",
ShortDesc: "Track a CID (pin)",
LongDesc: `
Usage: pin add <cid>
This command tells IPFS Cluster to start managing a CID. Depending on
the pinning strategy, this will trigger IPFS pin requests. The CID will
become part of the Cluster's state and will be tracked from this point on.
`},
"pin rm": cmd{
Name: "pinRm",
Path: "/pins/{param0}",
Method: "DELETE",
Subcmd: false,
ShortDesc: "Stop tracking a CID (unpin)",
LongDesc: `
Usage: pin rm <cid>
This command tells IPFS Cluster to no longer manage a CID. This will
trigger unpinning operations in all the IPFS nodes holding the content.
`},
"pin ls": cmd{
Name: "pinLs",
Path: "/pins",
Method: "GET",
ShortDesc: "List tracked CIDs",
LongDesc: `
Usage: pin ls
This command will list the CIDs which are tracked by IPFS Cluster. This
list does not include information about tracking status or location, it
merely represents the list of pins which are part of the global state of
the cluster. For specific information, use "status".
`},
"status": cmd{
Name: "status",
Path: "/status/{param0}",
Method: "GET",
Subcmd: false,
ShortDesc: "Retrieve status of tracked items",
LongDesc: `
Usage: status [cid]
This command retrieves the status of the CIDs tracked by IPFS
Cluster, including which member is pinning them and any errors.
If a CID is provided, the status is fetched only for that item.
`},
"sync": cmd{
Name: "sync",
Path: "/status/{param0}",
Method: "POST",
Subcmd: false,
ShortDesc: "Sync status and/or recover tracked items",
LongDesc: `
Usage: sync [cid]
This command verifies that the current status of tracked CIDs is accurate by
triggering queries to the IPFS daemons that pin them. When the CID is in
error state, either because pinning or unpinning failed, IPFS Cluster will
attempt to retry the operation. If a CID is provided, the sync and recover
operations will be limited to that single item.
`},
"version": cmd{
Name: "version",
Path: "/version",
Method: "GET",
ShortDesc: "Retrieve cluster version",
LongDesc: `
Usage: version
This command retrieves the IPFS Cluster version, which should match
the one reported by this tool's -version flag.
`},
}
func init() {
flag.Usage = func() {
out("Usage: %s [options] <cmd> [cmd_options]\n", programName)
out(Description)
out("Options:\n")
flag.PrintDefaults()
out("\n")
}
flag.StringVar(&hostFlag, "host", defaultHost,
"host:port of the IPFS Cluster Server API")
flag.StringVar(&protocolFlag, "protocol", defaultProtocol,
"protocol used by the API: usually http or https")
flag.IntVar(&timeoutFlag, "timeout", defaultTimeout,
"how many seconds before timing out a Cluster API request")
flag.BoolVar(&versionFlag, "version", false,
fmt.Sprintf("display %s version", programName))
// flag.BoolVar(&debugFlag, "debug", false,
// "set debug log level")
flag.Parse()
defaultHost = hostFlag
defaultProtocol = protocolFlag
defaultTimeout = timeoutFlag
if versionFlag {
fmt.Println(ipfscluster.Version)
os.Exit(0)
}
if debugFlag {
logging.SetLogLevel("ipfscluster", "debug")
}
}
func out(m string, a ...interface{}) {
fmt.Fprintf(os.Stderr, m, a...)
}
func checkErr(doing string, err error) {
if err != nil {
out("error %s: %s\n", doing, err)
os.Exit(1)
}
}
func main() {
cmd := getCmd(0, "")
switch cmd.Name {
case "help":
cmdName := flag.Arg(0)
if cmdName == "help" {
cmdName = flag.Arg(1)
}
usage, ok := cmds[cmdName]
if !ok {
keys := make([]string, 0, len(cmds))
for k := range cmds {
keys = append(keys, k)
}
sort.Strings(keys)
out("%s - available commands:\n\n", programName)
for _, k := range keys {
if !cmds[k].Subcmd {
out("%s - %s\n", k, cmds[k].ShortDesc)
}
}
} else {
out(usage.LongDesc + "\n")
}
case "memberList":
resp := request(cmd.Method, cmd.Path)
formatResponse(resp)
case "pinAdd":
cidStr := flag.Arg(2)
_, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
resp := request(cmd.Method, cmd.Path, cidStr)
formatResponse(resp)
case "pinRm":
cidStr := flag.Arg(2)
_, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
resp := request(cmd.Method, cmd.Path, cidStr)
formatResponse(resp)
case "pinLs":
resp := request(cmd.Method, cmd.Path)
formatResponse(resp)
case "status":
cidStr := flag.Arg(1)
if cidStr != "" {
_, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
}
resp := request(cmd.Method, cmd.Path, cidStr)
formatResponse(resp)
case "sync":
cidStr := flag.Arg(1)
if cidStr != "" {
_, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
}
resp := request(cmd.Method, cmd.Path, cidStr)
formatResponse(resp)
case "version":
resp := request(cmd.Method, cmd.Path)
formatResponse(resp)
default:
out("error: unknown command. Try \"%s help\" for help\n", programName)
os.Exit(1)
}
}
func getCmd(nArg int, prefix string) cmd {
arg := flag.Arg(nArg)
cmdStr := arg
if prefix != "" {
cmdStr = prefix + " " + arg
}
cmd, ok := cmds[cmdStr]
if !ok {
if arg != "" {
out("error: command not found\n\n")
}
return cmds["help"]
}
if cmd.Subcmd {
return getCmd(nArg+1, cmdStr)
}
return cmd
}
func request(method, path string, args ...string) *http.Response {
ctx, cancel := context.WithTimeout(context.Background(),
time.Duration(defaultTimeout)*time.Second)
defer cancel()
u := defaultProtocol + "://" + defaultHost + path
// turn /a/{param0}/{param1} into /a/this/that
for i, a := range args {
p := fmt.Sprintf("{param%d}", i)
u = strings.Replace(u, p, a, 1)
}
logger.Debugf("%s: %s", method, u)
r, err := http.NewRequest(method, u, nil)
checkErr("creating request", err)
r = r.WithContext(ctx)
client := &http.Client{}
resp, err := client.Do(r)
checkErr(fmt.Sprintf("performing request to %s", defaultHost), err)
return resp
}
func formatResponse(r *http.Response) {
defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
checkErr("reading body", err)
logger.Debugf("Body: %s", body)
if r.StatusCode > 399 {
var e errorResp
err = json.Unmarshal(body, &e)
checkErr("decoding error response", err)
out("Error %d: %s", e.Code, e.Message)
} else if r.StatusCode == http.StatusAccepted {
out("Request accepted")
} else {
var resp interface{}
err = json.Unmarshal(body, &resp)
checkErr("decoding response", err)
prettyPrint(resp, 0)
}
}
func prettyPrint(obj interface{}, indent int) {
ind := func() string {
var str string
for i := 0; i < indent; i++ {
str += " "
}
return str
}
switch obj.(type) {
case []interface{}:
slice := obj.([]interface{})
for _, elem := range slice {
prettyPrint(elem, indent+2)
}
case map[string]interface{}:
m := obj.(map[string]interface{})
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
v := m[k]
fmt.Printf(ind()+"%s: ", k)
switch v.(type) {
case []interface{}, map[string]interface{}:
fmt.Println()
prettyPrint(v, indent+4)
default:
prettyPrint(v, indent)
}
}
default:
fmt.Println(obj)
}
}
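For reference, the `{paramN}` substitution performed by `request` can be exercised in isolation. This standalone sketch reproduces the same logic (`buildURL` is a name introduced here, not part of the commit):

```go
package main

import (
	"fmt"
	"strings"
)

// buildURL expands an "/a/{param0}/{param1}" style path template into a
// full URL, exactly as the loop in request() does.
func buildURL(protocol, host, path string, args ...string) string {
	u := protocol + "://" + host + path
	for i, a := range args {
		placeholder := fmt.Sprintf("{param%d}", i)
		u = strings.Replace(u, placeholder, a, 1)
	}
	return u
}

func main() {
	fmt.Println(buildURL("http", "127.0.0.1:9094", "/pins/{param0}",
		"Qma4Lid2T1F68E3Xa3CpE6vVJDLwxXLD8RfiB9g1Tmqp58"))
	// prints: http://127.0.0.1:9094/pins/Qma4Lid2T1F68E3Xa3CpE6vVJDLwxXLD8RfiB9g1Tmqp58
}
```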