ipfscluster-server executable

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
Hector Sanjuan 2016-12-21 19:37:25 +01:00
parent f785c90de0
commit 34720465cd
9 changed files with 278 additions and 65 deletions

27
.gitignore vendored Normal file
View File

@ -0,0 +1,27 @@
coverage.out
ipfscluster-server/ipfscluster-server
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test
*.prof

View File

@ -1,6 +1,6 @@
The MIT License (MIT)
Copyright (c) 2016 IPFS
Copyright (c) 2017 Protocol Labs, Inc
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@ -1,4 +1,6 @@
all: deps
server: deps
$(MAKE) -C ipfscluster-server
gx:
go get github.com/whyrusleeping/gx
go get github.com/whyrusleeping/gx-go

View File

@ -1,9 +1,26 @@
package ipfscluster
import (
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
crypto "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
)
// Default parameters for the configuration
const (
DefaultConfigCrypto = crypto.RSA
DefaultConfigKeyLength = 2048
DefaultAPIAddr = "127.0.0.1"
DefaultAPIPort = 9094
DefaultIPFSAPIAddr = "127.0.0.1"
DefaultIPFSAPIPort = 9095
DefaultIPFSAddr = "127.0.0.1"
DefaultIPFSPort = 5001
DefaultClusterAddr = "0.0.0.0"
DefaultClusterPort = 9096
)
type Config struct {
@ -39,13 +56,55 @@ type Config struct {
}
func LoadConfig(path string) (*Config, error) {
fmt.Println(path)
config := &Config{}
file, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
json.Unmarshal(file, config)
fmt.Printf("%+v", config)
return config, nil
}
// NewDefaultConfig returns a default configuration object with a randomly
// generated ID and private key.
func NewDefaultConfig() (*Config, error) {
priv, pub, err := crypto.GenerateKeyPair(
DefaultConfigCrypto,
DefaultConfigKeyLength)
if err != nil {
return nil, err
}
pid, err := peer.IDFromPublicKey(pub)
if err != nil {
return nil, err
}
privBytes, err := priv.Bytes()
if err != nil {
return nil, err
}
b64priv := base64.StdEncoding.EncodeToString(privBytes)
return &Config{
ID: peer.IDB58Encode(pid),
PrivateKey: b64priv,
ClusterPeers: []string{},
ClusterAddr: DefaultClusterAddr,
ClusterPort: DefaultClusterPort,
ConsensusDataFolder: "ipfscluster-data",
APIAddr: DefaultAPIAddr,
APIPort: DefaultAPIPort,
IPFSAPIAddr: DefaultIPFSAPIAddr,
IPFSAPIPort: DefaultIPFSAPIPort,
IPFSAddr: DefaultIPFSAddr,
IPFSPort: DefaultIPFSPort,
}, nil
}
func (c *Config) Save(path string) error {
json, err := json.MarshalIndent(c, "", " ")
if err != nil {
return err
}
err = ioutil.WriteFile(path, json, 0600)
return err
}

View File

@ -163,7 +163,7 @@ func (cc *Consensus) run() {
for {
lai := cc.p2pRaft.raft.AppliedIndex()
li := cc.p2pRaft.raft.LastIndex()
logger.Infof("current Raft index: %d/%d", lai, li)
logger.Debugf("current Raft index: %d/%d", lai, li)
if lai == li {
upToDate <- struct{}{}
break
@ -172,7 +172,6 @@ func (cc *Consensus) run() {
}
}()
logger.Info("consensus state is catching up")
timer := time.NewTimer(FirstSyncDelay)
quitLoop := false
for !quitLoop {
@ -180,6 +179,7 @@ func (cc *Consensus) run() {
case <-timer.C: // Make a first sync
MakeRPC(ctx, cc.rpcCh, NewRPC(StateSyncRPC, nil), false)
case <-upToDate:
logger.Info("consensus is up to date. Triggering state sync.")
MakeRPC(ctx, cc.rpcCh, NewRPC(StateSyncRPC, nil), false)
quitLoop = true
}

View File

@ -1,58 +0,0 @@
package main
import (
"fmt"
"os"
"os/signal"
"os/user"
"path/filepath"
logging "github.com/ipfs/go-log"
ipfscluster "github.com/ipfs/ipfs-cluster"
)
func main() {
logging.SetLogLevel("ipfs-cluster", "debug")
signalChan := make(chan os.Signal, 1)
cleanup := make(chan bool)
signal.Notify(signalChan, os.Interrupt)
usr, _ := user.Current()
home := usr.HomeDir
clusterCfg, err := ipfscluster.LoadConfig(filepath.Join(home, "ipfs-cluster.json"))
if err != nil {
fmt.Println(err)
return
}
api, err := ipfscluster.NewRESTAPI(clusterCfg)
if err != nil {
fmt.Println(err)
return
}
proxy, err := ipfscluster.NewIPFSHTTPConnector(clusterCfg)
if err != nil {
fmt.Println(err)
return
}
state := ipfscluster.NewMapState()
tracker := ipfscluster.NewMapPinTracker(clusterCfg)
remote := ipfscluster.NewLibp2pRemote()
cluster, err := ipfscluster.NewCluster(clusterCfg, api, proxy, state, tracker, remote)
if err != nil {
fmt.Println(err)
return
}
go func() {
<-signalChan
fmt.Println("caught signal")
cluster.Shutdown()
cleanup <- true
}()
<-cleanup
}

View File

@ -0,0 +1,4 @@
all: ipfscluster-server
ipfscluster-server:
go build

179
ipfscluster-server/main.go Normal file
View File

@ -0,0 +1,179 @@
package main
import (
"flag"
"fmt"
"os"
"os/signal"
"os/user"
"path/filepath"
logging "github.com/ipfs/go-log"
ipfscluster "github.com/ipfs/ipfs-cluster"
)
// Name of this application
const name = `ipfscluster-server`
// Description provides a short summary of the functionality of this tool
var Description = fmt.Sprintf(`
%s runs an IPFS Cluster member.
A member is a server node which participates in the cluster consensus, follows
a distributed log of pinning and unpinning operations and manages pinning
operations to a configured IPFS daemon.
This server also provides an API for cluster management, an IPFS Proxy API which
forwards requests to IPFS and a number of components for internal communication
using LibP2P.
%s needs a valid configuration to run. This configuration is
independent from IPFS and includes its own LibP2P key-pair. It can be
initialized with -init and its default location is
~/%s/%s.
For any additional information, visit https://github.com/ipfs/ipfs-cluster.
`,
name,
name,
DefaultPath,
DefaultConfigFile)
// Default location for the configurations and data
var (
// DefaultPath is initialized to something like ~/.ipfs-cluster/server.json
// and holds all the ipfs-cluster data
DefaultPath = ".ipfs-cluster"
// The name of the configuration file inside DefaultPath
DefaultConfigFile = "server.json"
// The name of the data folder inside DefaultPath
DefaultDataFolder = "data"
)
var (
configPath string
dataPath string
)
// Command line flags
var (
initFlag bool
configFlag string
debugFlag bool
logLevelFlag string
)
func init() {
if path := os.Getenv("IPFSCLUSTER_PATH"); path != "" {
DefaultPath = path
} else {
usr, err := user.Current()
if err != nil {
panic("cannot guess the current user")
}
DefaultPath = filepath.Join(
usr.HomeDir,
".ipfs-cluster")
}
configPath = filepath.Join(DefaultPath, DefaultConfigFile)
dataPath = filepath.Join(DefaultPath, DefaultDataFolder)
flag.Usage = func() {
out("Usage: %s [options]\n", name)
out(Description)
out("Options:\n")
flag.PrintDefaults()
out("\n")
}
flag.BoolVar(&initFlag, "init", false,
"create a default configuration and exit")
flag.StringVar(&configFlag, "config", configPath,
"path to the ipfscluster-server configuration file")
flag.BoolVar(&debugFlag, "debug", false,
"enable full debug logs of ipfs cluster and consensus layers")
flag.StringVar(&logLevelFlag, "loglevel", "info",
"set the loglevel [critical, error, warning, notice, info, debug]")
flag.Parse()
}
func out(m string, a ...interface{}) {
fmt.Fprintf(os.Stderr, m, a...)
}
func main() {
// Catch SIGINT as a way to exit
signalChan := make(chan os.Signal)
signal.Notify(signalChan, os.Interrupt)
setupLogging()
setupDebug()
if initFlag {
err := initConfig()
checkErr("creating configuration", err)
os.Exit(0)
}
cfg, err := loadConfig()
checkErr("loading configuration", err)
api, err := ipfscluster.NewRESTAPI(cfg)
checkErr("creating REST API component", err)
proxy, err := ipfscluster.NewIPFSHTTPConnector(cfg)
checkErr("creating IPFS Connector component", err)
state := ipfscluster.NewMapState()
tracker := ipfscluster.NewMapPinTracker(cfg)
remote := ipfscluster.NewLibp2pRemote()
cluster, err := ipfscluster.NewCluster(cfg,
api, proxy, state, tracker, remote)
checkErr("creating IPFS Cluster", err)
// Wait until we are told to exit by a signal
<-signalChan
fmt.Println("aa")
err = cluster.Shutdown()
checkErr("shutting down IPFS Cluster", err)
os.Exit(0)
}
func checkErr(doing string, err error) {
if err != nil {
out("error %s: %s\n", doing, err)
os.Exit(1)
}
}
func setupLogging() {
logging.SetLogLevel("ipfscluster", logLevelFlag)
}
func setupDebug() {
if debugFlag {
logging.SetLogLevel("ipfscluster", "debug")
logging.SetLogLevel("libp2p-raft", "debug")
ipfscluster.SilentRaft = false
}
}
func initConfig() error {
if _, err := os.Stat(configPath); err == nil {
return fmt.Errorf("%s exists. Try deleting it first", configPath)
}
cfg, err := ipfscluster.NewDefaultConfig()
if err != nil {
return err
}
cfg.ConsensusDataFolder = dataPath
err = os.MkdirAll(DefaultPath, 0700)
err = cfg.Save(configPath)
if err != nil {
return err
}
out("%s configuration written to %s",
name, configPath)
return nil
}
func loadConfig() (*ipfscluster.Config, error) {
return ipfscluster.LoadConfig(configPath)
}

View File

@ -18,7 +18,7 @@ import (
peer "github.com/libp2p/go-libp2p-peer"
)
var logger = logging.Logger("ipfs-cluster")
var logger = logging.Logger("ipfscluster")
// Current Cluster version.
const Version = "0.0.1"