Hector Sanjuan 6ee0f3bead Issue #45: Detect expired metrics and trigger re-pins
An initial, simple approach to this. The PeerMonitor will
check its metrics, compare them to the current set of peers and put
an alert in the alerts channel if the metrics for a peer have expired.

Cluster reads this channel looking for "ping" alerts. The leader
is in charge of triggering re-pins for all the Cids allocated to
the peer that raised the alert.
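
The alert handling can be sketched roughly as follows. This is only an
illustration under assumptions: Alert, handleAlerts, cidsAllocatedTo, the
isLeader callback and the allocations map (Cid to allocated peers) are
hypothetical names, not the real Cluster API.

```go
package main

import "sort"

// Alert is a hypothetical notification that a peer's metric has expired.
type Alert struct {
	Peer       string
	MetricName string
}

// cidsAllocatedTo returns, in sorted order, every Cid whose allocation list
// contains the given peer. allocations maps a Cid to its allocated peers.
func cidsAllocatedTo(peer string, allocations map[string][]string) []string {
	var cids []string
	for cid, peers := range allocations {
		for _, p := range peers {
			if p == peer {
				cids = append(cids, cid)
				break
			}
		}
	}
	sort.Strings(cids)
	return cids
}

// handleAlerts reads alerts until the channel is closed. Only "ping" alerts
// are acted upon, and only when isLeader reports that this peer is the
// leader; in that case repin is called for each Cid allocated to the peer.
func handleAlerts(alerts <-chan Alert, isLeader func() bool, allocations map[string][]string, repin func(cid string)) {
	for a := range alerts {
		if a.MetricName != "ping" || !isLeader() {
			continue
		}
		for _, cid := range cidsAllocatedTo(a.Peer, allocations) {
			repin(cid)
		}
	}
}
```

Leadership is checked when the alert is handled rather than when it is
raised, so the alerting component never needs to know who the leader is.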

Also, metrics are now broadcast to the whole cluster instead of being pushed
only to the leader. Since they are sent every few seconds, this should scale
acceptably. The main problem was that, if the leader is the node going down,
the new leader will not know about it: it has no metrics for that node, so it
won't trigger an alert. Keeping metrics on the leader only would require
either the component to know that it is the leader, or cluster to handle
alerts in complicated ways when leadership changes. Detecting leadership
changes, or letting a component know who the leader is, would be another
dependency on the consensus algorithm that should be avoided. Therefore
we broadcast, for the moment.
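
To make the trade-off concrete, here is a small sketch comparing the two
delivery strategies. PeerStore, pushToLeader and broadcast are hypothetical
names used only for this illustration; the real code sends metrics over RPC.

```go
package main

// Metric is a hypothetical, simplified metric sent by a peer.
type Metric struct {
	Name  string
	Peer  string
	Value string
}

// PeerStore is a hypothetical per-peer store of the latest metric received
// from every other peer.
type PeerStore struct {
	ID      string
	metrics map[string]Metric // keyed by the peer that produced the metric
}

// NewPeerStore returns an empty store for the peer with the given ID.
func NewPeerStore(id string) *PeerStore {
	return &PeerStore{ID: id, metrics: make(map[string]Metric)}
}

// LogMetric records m as the latest metric from m.Peer.
func (s *PeerStore) LogMetric(m Metric) {
	s.metrics[m.Peer] = m
}

// Knows reports whether this store holds any metric from the given peer.
func (s *PeerStore) Knows(peer string) bool {
	_, ok := s.metrics[peer]
	return ok
}

// pushToLeader delivers the metric to the leader only (the old behaviour).
func pushToLeader(m Metric, leader *PeerStore) {
	leader.LogMetric(m)
}

// broadcast delivers the metric to every peer (the new behaviour), so any
// peer that later becomes leader already has metrics to check for expiry.
func broadcast(m Metric, cluster []*PeerStore) {
	for _, p := range cluster {
		p.LogMetric(m)
	}
}
```

With pushToLeader, a peer that takes over leadership starts with an empty
store and cannot notice that the old leader's metrics have expired; with
broadcast, every peer can raise the alert on its own.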

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-03-02 14:59:45 +01:00

304 lines
7.6 KiB

package main
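import (
	"errors"
	"fmt"
	"os"
	"os/signal"
	"os/user"
	"path/filepath"

	logging "github.com/ipfs/go-log"
	ma "github.com/multiformats/go-multiaddr"

	ipfscluster "github.com/ipfs/ipfs-cluster"

	// NOTE: the import paths below are inferred from the identifiers used
	// in this file (cli, numpinalloc, numpin, mapstate).
	"github.com/ipfs/ipfs-cluster/allocator/numpinalloc"
	"github.com/ipfs/ipfs-cluster/informer/numpin"
	"github.com/ipfs/ipfs-cluster/state/mapstate"
	"github.com/urfave/cli"
)
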
// programName is the name of this application
const programName = `ipfs-cluster-service`
// We store a commit id here
var commit string
// Description provides a short summary of the functionality of this tool
var Description = fmt.Sprintf(`
%s runs an IPFS Cluster node.
A node participates in the cluster consensus, follows a distributed log
of pinning and unpinning requests and manages pinning operations to a
configured IPFS daemon.
This node also provides an API for cluster management, an IPFS Proxy API which
forwards requests to IPFS and a number of components for internal communication
using LibP2P. This is a simplified view of the components:
              +------------------+
              | ipfs-cluster-ctl |
              +--------+---------+
                       |
ipfs-cluster-service   | HTTP
+----------+--------+--v--+----------------------+      +-------------+
| RPC/Raft | Peer 1 | API | IPFS Connector/Proxy +------> IPFS daemon |
+----^-----+--------+-----+----------------------+      +-------------+
     | libp2p
+----v-----+--------+-----+----------------------+      +-------------+
| RPC/Raft | Peer 2 | API | IPFS Connector/Proxy +------> IPFS daemon |
+----^-----+--------+-----+----------------------+      +-------------+
     | libp2p
+----v-----+--------+-----+----------------------+      +-------------+
| RPC/Raft | Peer 3 | API | IPFS Connector/Proxy +------> IPFS daemon |
+----------+--------+-----+----------------------+      +-------------+
%s needs a valid configuration to run. This configuration is
independent from IPFS and includes its own LibP2P key-pair. It can be
initialized with "init" and its default location is
For feedback, bug reports or any additional information, visit
Initial configuration:
$ ipfs-cluster-service init
Launch a cluster:
$ ipfs-cluster-service
Launch a peer and join existing cluster:
$ ipfs-cluster-service --bootstrap /ip4/
`, programName, programName)

var logger = logging.Logger("service")
// Default location for the configurations and data
var (
	// DefaultPath is initialized to something like ~/.ipfs-cluster
	// and holds all the ipfs-cluster data
	DefaultPath = ".ipfs-cluster"
	// DefaultConfigFile is the name of the configuration file inside DefaultPath
	DefaultConfigFile = "service.json"
	// DefaultDataFolder is the name of the data folder inside DefaultPath
	DefaultDataFolder = "data"
)

var (
	configPath string
	dataPath   string
)

func init() {
// The only way I could make this work
ipfscluster.Commit = commit
usr, err := user.Current()
if err != nil {
panic("cannot guess the current user")
DefaultPath = filepath.Join(
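// out prints a formatted message to stderr.
func out(m string, a ...interface{}) {
	fmt.Fprintf(os.Stderr, m, a...)
}

// checkErr reports the error, if any, and terminates the program.
func checkErr(doing string, err error) {
	if err != nil {
		out("error %s: %s\n", doing, err)
		os.Exit(1)
	}
}
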
func main() {
	app := cli.NewApp()
	app.Name = programName
	app.Usage = "IPFS Cluster node"
	app.Description = Description
	//app.Copyright = "© Protocol Labs, Inc."
	app.Version = ipfscluster.Version
	app.Flags = []cli.Flag{
		cli.BoolFlag{
			Name:   "init",
			Usage:  "create a default configuration and exit",
			Hidden: true,
		},
		cli.StringFlag{
			Name:  "config, c",
			Value: DefaultPath,
			Usage: "path to the configuration and data `FOLDER`",
		},
		cli.BoolFlag{
			Name:  "force, f",
			Usage: "forcefully proceed with some actions, e.g. overwriting configuration",
		},
		cli.StringFlag{
			Name:  "bootstrap, j",
			Usage: "join a cluster providing an existing peer's `multiaddress`. Overrides the \"bootstrap\" values from the configuration",
		},
		cli.BoolFlag{
			Name:   "leave, x",
			Usage:  "remove peer from cluster on exit. Overrides \"leave_on_shutdown\"",
			Hidden: true,
		},
		cli.BoolFlag{
			Name:  "debug, d",
			Usage: "enable full debug logging (very verbose)",
		},
		cli.StringFlag{
			Name:  "loglevel, l",
			Value: "info",
			Usage: "set the loglevel for cluster only [critical, error, warning, info, debug]",
		},
	}

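	app.Commands = []cli.Command{
		{
			Name:  "init",
			Usage: "create a default configuration and exit",
			Action: func(c *cli.Context) error {
				// "force" is a global flag, so it is read with GlobalBool.
				initConfig(c.GlobalBool("force"))
				return nil
			},
		},
		{
			Name:   "run",
			Usage:  "run the IPFS Cluster peer (default)",
			Action: run,
		},
	}
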
	app.Before = func(c *cli.Context) error {
		absPath, err := filepath.Abs(c.String("config"))
		if err != nil {
			return err
		}

		configPath = filepath.Join(absPath, DefaultConfigFile)
		dataPath = filepath.Join(absPath, DefaultDataFolder)

		setupLogging(c.String("loglevel"))
		if c.Bool("debug") {
			setupDebug()
		}
		return nil
	}

	app.Action = run

	app.Run(os.Args)
}

func run(c *cli.Context) error {
	if c.Bool("init") {
		initConfig(c.Bool("force"))
		return nil
	}

	cfg, err := loadConfig()
	checkErr("loading configuration", err)

	if a := c.String("bootstrap"); a != "" {
		if len(cfg.ClusterPeers) > 0 && !c.Bool("force") {
			return errors.New("the configuration provides ClusterPeers. Use -f to ignore and proceed bootstrapping")
		}
		joinAddr, err := ma.NewMultiaddr(a)
		if err != nil {
			return fmt.Errorf("error parsing multiaddress: %s", err)
		}
		// Bootstrapping replaces any peers listed in the configuration.
		cfg.Bootstrap = []ma.Multiaddr{joinAddr}
		cfg.ClusterPeers = []ma.Multiaddr{}
	}

	if c.Bool("leave") {
		cfg.LeaveOnShutdown = true
	}

	api, err := ipfscluster.NewRESTAPI(cfg)
	checkErr("creating REST API component", err)

	proxy, err := ipfscluster.NewIPFSHTTPConnector(cfg)
	checkErr("creating IPFS Connector component", err)

	state := mapstate.NewMapState()
	tracker := ipfscluster.NewMapPinTracker(cfg)
	mon := ipfscluster.NewStdPeerMonitor(cfg)
	informer := numpin.NewInformer()
	alloc := numpinalloc.NewAllocator()

cluster, err := ipfscluster.NewCluster(
checkErr("starting cluster", err)

	signalChan := make(chan os.Signal, 20)
	signal.Notify(signalChan, os.Interrupt)
	for {
		select {
		case <-signalChan:
			err = cluster.Shutdown()
			checkErr("shutting down cluster", err)
		case <-cluster.Done():
			return nil
		case <-cluster.Ready():
		}
	}
}

func setupLogging(lvl string) {
	ipfscluster.SetFacilityLogLevel("service", lvl)
	ipfscluster.SetFacilityLogLevel("cluster", lvl)
	//ipfscluster.SetFacilityLogLevel("raft", lvl)
}

func setupDebug() {
	l := "DEBUG"
	ipfscluster.SetFacilityLogLevel("cluster", l)
	ipfscluster.SetFacilityLogLevel("raft", l)
	ipfscluster.SetFacilityLogLevel("p2p-gorpc", l)
	//SetFacilityLogLevel("swarm2", l)
	//SetFacilityLogLevel("libp2p-raft", l)
}

func initConfig(force bool) {
	// Refuse to overwrite an existing configuration unless forced.
	if _, err := os.Stat(configPath); err == nil && !force {
		err := fmt.Errorf("%s exists. Try running with -f", configPath)
		checkErr("", err)
	}

	cfg, err := ipfscluster.NewDefaultConfig()
	checkErr("creating default configuration", err)
	cfg.ConsensusDataFolder = dataPath
	err = os.MkdirAll(filepath.Dir(configPath), 0700)
	// Previously this error was silently overwritten by the Save call below.
	checkErr("creating configuration folder", err)
	err = cfg.Save(configPath)
	checkErr("saving new configuration", err)
	out("%s configuration written to %s\n",
		programName, configPath)
}

func loadConfig() (*ipfscluster.Config, error) {
	return ipfscluster.LoadConfig(configPath)
}