2017-03-14 15:37:29 +00:00
|
|
|
// Package ipfshttp implements an IPFS Cluster IPFSConnector component. It
|
|
|
|
// uses the IPFS HTTP API to communicate with IPFS.
|
2017-03-10 14:29:11 +00:00
|
|
|
package ipfshttp
|
2016-12-02 18:33:39 +00:00
|
|
|
|
|
|
|
import (
|
2017-03-29 20:52:13 +00:00
|
|
|
"bytes"
|
2016-12-02 18:33:39 +00:00
|
|
|
"context"
|
|
|
|
"encoding/json"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"io/ioutil"
|
2016-12-05 15:24:41 +00:00
|
|
|
"net"
|
2016-12-02 18:33:39 +00:00
|
|
|
"net/http"
|
2017-03-29 20:52:13 +00:00
|
|
|
"net/url"
|
|
|
|
"path/filepath"
|
2016-12-02 18:33:39 +00:00
|
|
|
"strings"
|
2016-12-15 13:07:19 +00:00
|
|
|
"sync"
|
2016-12-22 10:31:09 +00:00
|
|
|
"time"
|
2016-12-02 18:33:39 +00:00
|
|
|
|
2017-02-08 17:04:08 +00:00
|
|
|
"github.com/ipfs/ipfs-cluster/api"
|
|
|
|
|
2017-10-13 16:44:43 +00:00
|
|
|
rpc "github.com/hsanjuan/go-libp2p-gorpc"
|
|
|
|
cid "github.com/ipfs/go-cid"
|
|
|
|
logging "github.com/ipfs/go-log"
|
|
|
|
peer "github.com/libp2p/go-libp2p-peer"
|
|
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
|
|
manet "github.com/multiformats/go-multiaddr-net"
|
2016-12-02 18:33:39 +00:00
|
|
|
)
|
|
|
|
|
2017-03-10 14:29:11 +00:00
|
|
|
// logger is the package-level logger, created under the "ipfshttp" name.
var logger = logging.Logger("ipfshttp")
|
|
|
|
|
2017-03-14 15:37:29 +00:00
|
|
|
// Connector implements the IPFSConnector interface
|
2016-12-02 18:33:39 +00:00
|
|
|
// and provides a component which does two tasks:
|
|
|
|
//
|
|
|
|
// On one side, it proxies HTTP requests to the configured IPFS
|
|
|
|
// daemon. It is able to intercept these requests though, and
|
|
|
|
// perform extra operations on them.
|
|
|
|
//
|
|
|
|
// On the other side, it is used to perform on-demand requests
|
|
|
|
// against the configured IPFS daemom (such as a pin request).
|
2017-03-14 15:37:29 +00:00
|
|
|
type Connector struct {
|
2017-03-02 12:57:37 +00:00
|
|
|
ctx context.Context
|
|
|
|
cancel func()
|
|
|
|
|
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
|
|
|
config *Config
|
|
|
|
nodeAddr string
|
2016-12-23 18:35:37 +00:00
|
|
|
|
|
|
|
handlers map[string]func(http.ResponseWriter, *http.Request)
|
|
|
|
|
|
|
|
rpcClient *rpc.Client
|
|
|
|
rpcReady chan struct{}
|
2016-12-05 15:24:41 +00:00
|
|
|
|
2016-12-09 19:54:46 +00:00
|
|
|
listener net.Listener
|
|
|
|
server *http.Server
|
|
|
|
|
2016-12-15 13:07:19 +00:00
|
|
|
shutdownLock sync.Mutex
|
|
|
|
shutdown bool
|
|
|
|
wg sync.WaitGroup
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Shapes of the JSON objects exchanged with clients of the proxy and with
// the IPFS daemon. Field names are exported and untagged, so they map to
// JSON keys of the same name.
type (
	// ipfsError is the body written by ipfsErrorResponder.
	ipfsError struct {
		Message string
	}

	// ipfsPinType describes the kind of a pin (handlers in this file
	// always report "recursive").
	ipfsPinType struct {
		Type string
	}

	// ipfsPinLsResp is the pin/ls response: pin types keyed by CID string.
	ipfsPinLsResp struct {
		Keys map[string]ipfsPinType
	}

	// ipfsPinOpResp is the pin/add and pin/rm response, listing the
	// affected CIDs.
	ipfsPinOpResp struct {
		Pins []string
	}

	// ipfsIDResp carries an ID and a list of addresses.
	// NOTE(review): presumably the daemon's "id" response; its use is not
	// visible in this section.
	ipfsIDResp struct {
		ID        string
		Addresses []string
	}

	// ipfsRepoStatResp carries repository size figures.
	// NOTE(review): presumably the daemon's "repo/stat" response; its use
	// is not visible in this section.
	ipfsRepoStatResp struct {
		RepoSize   uint64
		StorageMax uint64
		NumObjects uint64
	}

	// ipfsAddResp is one object of the streamed add response. Objects
	// with a non-zero Bytes value are progress notifications.
	ipfsAddResp struct {
		Name  string
		Hash  string
		Bytes uint64
	}
)
|
|
|
|
|
2017-03-14 15:37:29 +00:00
|
|
|
// NewConnector creates the component and leaves it ready to be started
|
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
|
|
|
func NewConnector(cfg *Config) (*Connector, error) {
|
|
|
|
err := cfg.Validate()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
_, nodeAddr, err := manet.DialArgs(cfg.NodeAddr)
|
2017-01-23 17:38:59 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
|
|
|
proxyNet, proxyAddr, err := manet.DialArgs(cfg.ProxyAddr)
|
2017-01-23 17:38:59 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2017-03-16 14:51:24 +00:00
|
|
|
l, err := net.Listen(proxyNet, proxyAddr)
|
2016-12-05 15:24:41 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2016-12-09 19:54:46 +00:00
|
|
|
|
|
|
|
smux := http.NewServeMux()
|
|
|
|
s := &http.Server{
|
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
|
|
|
ReadTimeout: cfg.ProxyReadTimeout,
|
|
|
|
WriteTimeout: cfg.ProxyWriteTimeout,
|
|
|
|
ReadHeaderTimeout: cfg.ProxyReadHeaderTimeout,
|
|
|
|
IdleTimeout: cfg.ProxyIdleTimeout,
|
|
|
|
Handler: smux,
|
2016-12-09 19:54:46 +00:00
|
|
|
}
|
|
|
|
s.SetKeepAlivesEnabled(true) // A reminder that this can be changed
|
|
|
|
|
2017-03-02 12:57:37 +00:00
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
2017-03-14 15:37:29 +00:00
|
|
|
ipfs := &Connector{
|
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
|
|
|
ctx: ctx,
|
|
|
|
config: cfg,
|
|
|
|
cancel: cancel,
|
|
|
|
nodeAddr: nodeAddr,
|
|
|
|
handlers: make(map[string]func(http.ResponseWriter, *http.Request)),
|
|
|
|
rpcReady: make(chan struct{}, 1),
|
|
|
|
listener: l,
|
|
|
|
server: s,
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
|
2016-12-09 19:54:46 +00:00
|
|
|
smux.HandleFunc("/", ipfs.handle)
|
2017-01-26 21:49:53 +00:00
|
|
|
ipfs.handlers["/api/v0/pin/add"] = ipfs.pinHandler
|
|
|
|
ipfs.handlers["/api/v0/pin/rm"] = ipfs.unpinHandler
|
|
|
|
ipfs.handlers["/api/v0/pin/ls"] = ipfs.pinLsHandler
|
2017-03-29 20:52:13 +00:00
|
|
|
ipfs.handlers["/api/v0/add"] = ipfs.addHandler
|
2016-12-09 19:54:46 +00:00
|
|
|
|
2017-03-23 18:34:33 +00:00
|
|
|
go ipfs.run()
|
2016-12-02 18:33:39 +00:00
|
|
|
return ipfs, nil
|
|
|
|
}
|
|
|
|
|
2017-03-23 18:34:33 +00:00
|
|
|
// launches proxy and connects all ipfs daemons when
|
|
|
|
// we receive the rpcReady signal.
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) run() {
|
2017-03-23 18:34:33 +00:00
|
|
|
<-ipfs.rpcReady
|
|
|
|
|
2017-01-26 21:49:53 +00:00
|
|
|
// This launches the proxy
|
|
|
|
ipfs.wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer ipfs.wg.Done()
|
|
|
|
logger.Infof("IPFS Proxy: %s -> %s",
|
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
|
|
|
ipfs.config.ProxyAddr,
|
|
|
|
ipfs.config.NodeAddr)
|
2017-03-23 18:34:33 +00:00
|
|
|
err := ipfs.server.Serve(ipfs.listener) // hangs here
|
2017-01-26 21:49:53 +00:00
|
|
|
if err != nil && !strings.Contains(err.Error(), "closed network connection") {
|
|
|
|
logger.Error(err)
|
|
|
|
}
|
|
|
|
}()
|
2017-03-23 18:34:33 +00:00
|
|
|
|
|
|
|
// This runs ipfs swarm connect to the daemons of other cluster members
|
|
|
|
ipfs.wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer ipfs.wg.Done()
|
|
|
|
|
|
|
|
// It does not hurt to wait a little bit. i.e. think cluster
|
|
|
|
// peers which are started at the same time as the ipfs
|
|
|
|
// daemon...
|
Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:
* Each component is initialized with a configuration object
defined by its module
* Each component decides how the JSON representation of its
configuration looks like
* Each component parses and validates its own configuration
* Each component exposes its own defaults
* Component configurations are make the sections of a
central JSON configuration file (which replaces the current
JSON format)
* Component configurations implement a common interface
(config.ComponentConfig) with a set of common operations
* The central configuration file is managed by a
config.ConfigManager which:
* Registers ComponentConfigs
* Assigns the correspondent sections from the JSON file to each
component and delegates the parsing
* Delegates the JSON generation for each section
* Can be notified when the configuration is updated and must be
saved to disk
The new service.json would then look as follows:
```json
{
"cluster": {
"id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
"private_key": "<...>",
"secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
"peers": [],
"bootstrap": [],
"leave_on_shutdown": false,
"listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"replication_factor": -1,
"monitor_ping_interval": "15s"
},
"consensus": {
"raft": {
"heartbeat_timeout": "1s",
"election_timeout": "1s",
"commit_timeout": "50ms",
"max_append_entries": 64,
"trailing_logs": 10240,
"snapshot_interval": "2m0s",
"snapshot_threshold": 8192,
"leader_lease_timeout": "500ms"
}
},
"api": {
"restapi": {
"listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
"read_timeout": "30s",
"read_header_timeout": "5s",
"write_timeout": "1m0s",
"idle_timeout": "2m0s"
}
},
"ipfs_connector": {
"ipfshttp": {
"proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
"node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
"connect_swarms_delay": "7s",
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s"
}
},
"informer": {
"disk": {
"metric_ttl": "30s",
"metric_type": "freespace"
},
"numpin": {
"metric_ttl": "10s"
}
}
}
```
This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.
Additionally, since Go API have changed, some redundant methods have been
removed and small refactoring has happened to take advantage of the new
way.
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-11 18:23:03 +00:00
|
|
|
tmr := time.NewTimer(ipfs.config.ConnectSwarmsDelay)
|
2017-03-23 18:34:33 +00:00
|
|
|
defer tmr.Stop()
|
|
|
|
select {
|
|
|
|
case <-tmr.C:
|
|
|
|
ipfs.ConnectSwarms()
|
|
|
|
case <-ipfs.ctx.Done():
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}()
|
2017-01-26 21:49:53 +00:00
|
|
|
}
|
|
|
|
|
2016-12-02 18:33:39 +00:00
|
|
|
// This will run a custom handler if we have one for a URL.Path, or
|
|
|
|
// otherwise just proxy the requests.
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) handle(w http.ResponseWriter, r *http.Request) {
|
2016-12-02 18:33:39 +00:00
|
|
|
if customHandler, ok := ipfs.handlers[r.URL.Path]; ok {
|
|
|
|
customHandler(w, r)
|
|
|
|
} else {
|
|
|
|
ipfs.defaultHandler(w, r)
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2017-03-29 20:52:13 +00:00
|
|
|
func (ipfs *Connector) proxyRequest(r *http.Request) (*http.Response, error) {
|
2016-12-02 18:33:39 +00:00
|
|
|
newURL := *r.URL
|
2017-03-16 14:51:24 +00:00
|
|
|
newURL.Host = ipfs.nodeAddr
|
2016-12-02 18:33:39 +00:00
|
|
|
newURL.Scheme = "http"
|
|
|
|
|
|
|
|
proxyReq, err := http.NewRequest(r.Method, newURL.String(), r.Body)
|
|
|
|
if err != nil {
|
|
|
|
logger.Error("error creating proxy request: ", err)
|
2017-03-29 20:52:13 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
for k, v := range r.Header {
|
|
|
|
for _, s := range v {
|
|
|
|
proxyReq.Header.Add(k, s)
|
|
|
|
}
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
|
2017-03-29 20:52:13 +00:00
|
|
|
res, err := http.DefaultTransport.RoundTrip(proxyReq)
|
2016-12-02 18:33:39 +00:00
|
|
|
if err != nil {
|
|
|
|
logger.Error("error forwarding request: ", err)
|
2017-03-29 20:52:13 +00:00
|
|
|
return nil, err
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
2017-03-29 20:52:13 +00:00
|
|
|
return res, nil
|
|
|
|
}
|
2016-12-02 18:33:39 +00:00
|
|
|
|
2017-03-29 20:52:13 +00:00
|
|
|
// Writes a response to a ResponseWriter using the given body
|
|
|
|
// (which maybe resp.Body or a copy if it was already used).
|
|
|
|
func (ipfs *Connector) proxyResponse(w http.ResponseWriter, res *http.Response, body io.Reader) {
|
2016-12-02 18:33:39 +00:00
|
|
|
// Set response headers
|
2017-03-29 20:52:13 +00:00
|
|
|
for k, v := range res.Header {
|
2016-12-02 18:33:39 +00:00
|
|
|
for _, s := range v {
|
|
|
|
w.Header().Add(k, s)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-03-29 20:52:13 +00:00
|
|
|
w.WriteHeader(res.StatusCode)
|
|
|
|
|
|
|
|
// And copy body
|
|
|
|
io.Copy(w, body)
|
|
|
|
}
|
|
|
|
|
|
|
|
// defaultHandler just proxies the requests.
|
|
|
|
func (ipfs *Connector) defaultHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
res, err := ipfs.proxyRequest(r)
|
|
|
|
if err != nil {
|
|
|
|
http.Error(w, "error forwarding request: "+err.Error(), 500)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
ipfs.proxyResponse(w, res, res.Body)
|
|
|
|
res.Body.Close()
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
|
2017-01-26 21:49:53 +00:00
|
|
|
func ipfsErrorResponder(w http.ResponseWriter, errMsg string) {
|
2017-03-29 20:52:13 +00:00
|
|
|
res := ipfsError{errMsg}
|
|
|
|
resBytes, _ := json.Marshal(res)
|
2017-04-06 02:27:02 +00:00
|
|
|
w.Header().Add("Content-Type", "application/json")
|
2017-01-26 21:49:53 +00:00
|
|
|
w.WriteHeader(http.StatusInternalServerError)
|
2017-03-29 20:52:13 +00:00
|
|
|
w.Write(resBytes)
|
2017-01-26 21:49:53 +00:00
|
|
|
return
|
|
|
|
}
|
2016-12-23 18:35:37 +00:00
|
|
|
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) pinOpHandler(op string, w http.ResponseWriter, r *http.Request) {
|
2017-01-26 21:49:53 +00:00
|
|
|
argA := r.URL.Query()["arg"]
|
|
|
|
if len(argA) == 0 {
|
|
|
|
ipfsErrorResponder(w, "Error: bad argument")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
arg := argA[0]
|
|
|
|
_, err := cid.Decode(arg)
|
|
|
|
if err != nil {
|
|
|
|
ipfsErrorResponder(w, "Error parsing CID: "+err.Error())
|
|
|
|
return
|
|
|
|
}
|
2016-12-23 18:35:37 +00:00
|
|
|
|
2017-01-26 21:49:53 +00:00
|
|
|
err = ipfs.rpcClient.Call("",
|
|
|
|
"Cluster",
|
|
|
|
op,
|
2017-03-08 15:57:27 +00:00
|
|
|
api.PinSerial{
|
2017-02-13 15:46:53 +00:00
|
|
|
Cid: arg,
|
|
|
|
},
|
2017-01-26 21:49:53 +00:00
|
|
|
&struct{}{})
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
ipfsErrorResponder(w, err.Error())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2017-03-29 20:52:13 +00:00
|
|
|
res := ipfsPinOpResp{
|
2017-01-26 21:49:53 +00:00
|
|
|
Pins: []string{arg},
|
|
|
|
}
|
2017-03-29 20:52:13 +00:00
|
|
|
resBytes, _ := json.Marshal(res)
|
2017-04-06 02:27:02 +00:00
|
|
|
w.Header().Add("Content-Type", "application/json")
|
2017-01-26 21:49:53 +00:00
|
|
|
w.WriteHeader(http.StatusOK)
|
2017-03-29 20:52:13 +00:00
|
|
|
w.Write(resBytes)
|
2017-01-26 21:49:53 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) pinHandler(w http.ResponseWriter, r *http.Request) {
|
2017-01-26 21:49:53 +00:00
|
|
|
ipfs.pinOpHandler("Pin", w, r)
|
|
|
|
}
|
|
|
|
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) unpinHandler(w http.ResponseWriter, r *http.Request) {
|
2017-01-26 21:49:53 +00:00
|
|
|
ipfs.pinOpHandler("Unpin", w, r)
|
|
|
|
}
|
|
|
|
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) pinLsHandler(w http.ResponseWriter, r *http.Request) {
|
2017-01-26 21:49:53 +00:00
|
|
|
pinLs := ipfsPinLsResp{}
|
|
|
|
pinLs.Keys = make(map[string]ipfsPinType)
|
|
|
|
|
2017-04-06 02:27:02 +00:00
|
|
|
q := r.URL.Query()
|
|
|
|
arg := q.Get("arg")
|
|
|
|
if arg != "" {
|
|
|
|
c, err := cid.Decode(arg)
|
|
|
|
if err != nil {
|
|
|
|
ipfsErrorResponder(w, err.Error())
|
2017-01-26 21:49:53 +00:00
|
|
|
return
|
|
|
|
}
|
2017-04-06 02:27:02 +00:00
|
|
|
var pin api.PinSerial
|
|
|
|
err = ipfs.rpcClient.Call("",
|
|
|
|
"Cluster",
|
|
|
|
"PinGet",
|
|
|
|
api.PinCid(c).ToSerial(),
|
|
|
|
&pin)
|
|
|
|
if err != nil {
|
2017-01-26 21:49:53 +00:00
|
|
|
ipfsErrorResponder(w, fmt.Sprintf(
|
|
|
|
"Error: path '%s' is not pinned",
|
|
|
|
arg))
|
|
|
|
return
|
|
|
|
}
|
2017-04-06 02:27:02 +00:00
|
|
|
pinLs.Keys[pin.Cid] = ipfsPinType{
|
|
|
|
Type: "recursive",
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
var pins []api.PinSerial
|
|
|
|
err := ipfs.rpcClient.Call("",
|
|
|
|
"Cluster",
|
|
|
|
"Pins",
|
|
|
|
struct{}{},
|
|
|
|
&pins)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
ipfsErrorResponder(w, err.Error())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, pin := range pins {
|
|
|
|
pinLs.Keys[pin.Cid] = ipfsPinType{
|
|
|
|
Type: "recursive",
|
|
|
|
}
|
|
|
|
}
|
2017-01-26 21:49:53 +00:00
|
|
|
}
|
|
|
|
|
2017-03-29 20:52:13 +00:00
|
|
|
resBytes, _ := json.Marshal(pinLs)
|
2017-04-06 02:27:02 +00:00
|
|
|
w.Header().Add("Content-Type", "application/json")
|
2017-01-26 21:49:53 +00:00
|
|
|
w.WriteHeader(http.StatusOK)
|
2017-03-29 20:52:13 +00:00
|
|
|
w.Write(resBytes)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ipfs *Connector) addHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
// Handle some request options
|
|
|
|
q := r.URL.Query()
|
|
|
|
// Remember if the user does not want cluster/ipfs to pin
|
|
|
|
doNotPin := q.Get("pin") == "false"
|
|
|
|
// make sure the local peer does not pin.
|
|
|
|
// Cluster will decide where to pin based on metrics and current
|
|
|
|
// allocations.
|
|
|
|
q.Set("pin", "false")
|
|
|
|
r.URL.RawQuery = q.Encode()
|
|
|
|
|
|
|
|
res, err := ipfs.proxyRequest(r)
|
|
|
|
if err != nil {
|
|
|
|
http.Error(w, "error forwarding request: "+err.Error(), 500)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
defer res.Body.Close()
|
|
|
|
|
|
|
|
// Shortcut some cases where there is nothing else to do
|
|
|
|
if scode := res.StatusCode; scode != http.StatusOK {
|
|
|
|
logger.Warningf("proxy /add request returned %d", scode)
|
|
|
|
ipfs.proxyResponse(w, res, res.Body)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if doNotPin {
|
|
|
|
logger.Debug("proxy /add requests has pin==false")
|
|
|
|
ipfs.proxyResponse(w, res, res.Body)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// The ipfs-add response is a streaming-like body where
|
|
|
|
// { "Name" : "filename", "Hash": "cid" } objects are provided
|
|
|
|
// for every added object.
|
|
|
|
|
|
|
|
// We will need to re-read the response in order to re-play it to
|
|
|
|
// the client at the end, therefore we make a copy in bodyCopy
|
|
|
|
// while decoding.
|
|
|
|
bodyCopy := new(bytes.Buffer)
|
|
|
|
bodyReader := io.TeeReader(res.Body, bodyCopy)
|
|
|
|
|
|
|
|
ipfsAddResps := []ipfsAddResp{}
|
|
|
|
dec := json.NewDecoder(bodyReader)
|
|
|
|
for dec.More() {
|
|
|
|
var addResp ipfsAddResp
|
|
|
|
err := dec.Decode(&addResp)
|
|
|
|
if err != nil {
|
|
|
|
http.Error(w, "error decoding response: "+err.Error(), 502)
|
|
|
|
return
|
|
|
|
}
|
2017-11-13 12:52:33 +00:00
|
|
|
if addResp.Bytes != 0 {
|
|
|
|
// This is a progress notification, so we ignore it
|
|
|
|
continue
|
|
|
|
}
|
2017-03-29 20:52:13 +00:00
|
|
|
ipfsAddResps = append(ipfsAddResps, addResp)
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(ipfsAddResps) == 0 {
|
|
|
|
logger.Warning("proxy /add request response was OK but empty")
|
|
|
|
ipfs.proxyResponse(w, res, bodyCopy)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// An ipfs-add call can add multiple files and pin multiple items.
|
|
|
|
// The go-ipfs api is not perfectly behaved here (i.e. when passing in
|
|
|
|
// two directories to pin). There is no easy way to know for sure what
|
|
|
|
// has been pinned recursively and what not.
|
|
|
|
// Usually when pinning a directory, the recursive pin comes last.
|
|
|
|
// But we may just be pinning different files and no directories.
|
|
|
|
// In that case, we need to recursively pin them separately.
|
|
|
|
// decideRecursivePins() takes a conservative approach. It
|
|
|
|
// works on the regular use-cases. Otherwise, it might pin
|
|
|
|
// more things than it should.
|
|
|
|
pinHashes := decideRecursivePins(ipfsAddResps, r.URL.Query())
|
|
|
|
|
|
|
|
logger.Debugf("proxy /add request and will pin %s", pinHashes)
|
|
|
|
for _, pin := range pinHashes {
|
|
|
|
err := ipfs.rpcClient.Call("",
|
|
|
|
"Cluster",
|
|
|
|
"Pin",
|
|
|
|
api.PinSerial{
|
|
|
|
Cid: pin,
|
|
|
|
},
|
|
|
|
&struct{}{})
|
|
|
|
if err != nil {
|
|
|
|
// we need to fail the operation and make sure the
|
|
|
|
// user knows about it.
|
|
|
|
msg := "add operation was successful but "
|
2017-12-06 14:00:01 +00:00
|
|
|
msg += "an error occurred performing the cluster "
|
2017-03-29 20:52:13 +00:00
|
|
|
msg += "pin operation: " + err.Error()
|
|
|
|
logger.Error(msg)
|
|
|
|
http.Error(w, msg, 500)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Finally, send the original response back
|
|
|
|
ipfs.proxyResponse(w, res, bodyCopy)
|
|
|
|
}
|
|
|
|
|
|
|
|
// decideRecursivePins takes the answers from ipfsAddResp and
|
|
|
|
// figures out which of the pinned items need to be pinned
|
|
|
|
// recursively in cluster. That is, it guesses which items
|
|
|
|
// ipfs would have pinned recursively.
|
|
|
|
// When adding multiple files+directories, it may end up
|
|
|
|
// pinning more than it should because ipfs API does not
|
|
|
|
// behave well in these cases.
|
|
|
|
// It should work well for regular usecases: pin 1 file,
|
|
|
|
// pin 1 directory, pin several files.
|
|
|
|
func decideRecursivePins(added []ipfsAddResp, q url.Values) []string {
|
|
|
|
// When wrap-in-directory, return last element only.
|
|
|
|
_, ok := q["wrap-in-directory"]
|
|
|
|
if ok && q.Get("wrap-in-directory") == "true" {
|
|
|
|
return []string{
|
|
|
|
added[len(added)-1].Hash,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
toPin := []string{}
|
|
|
|
baseFolders := make(map[string]struct{})
|
|
|
|
// Guess base folder names
|
|
|
|
baseFolder := func(path string) string {
|
|
|
|
slashed := filepath.ToSlash(path)
|
|
|
|
parts := strings.Split(slashed, "/")
|
|
|
|
if len(parts) == 0 {
|
|
|
|
return ""
|
|
|
|
}
|
|
|
|
if parts[0] == "" && len(parts) > 1 {
|
|
|
|
return parts[1]
|
|
|
|
}
|
|
|
|
return parts[0]
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, add := range added {
|
|
|
|
if add.Hash == "" {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
b := baseFolder(add.Name)
|
|
|
|
if b != "" {
|
|
|
|
baseFolders[b] = struct{}{}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for _, add := range added {
|
|
|
|
if add.Hash == "" {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
_, ok := baseFolders[add.Name]
|
|
|
|
if ok { // it's a base folder, pin it
|
|
|
|
toPin = append(toPin, add.Hash)
|
|
|
|
} else { // otherwise, pin if there is no
|
|
|
|
// basefolder to it.
|
|
|
|
b := baseFolder(add.Name)
|
|
|
|
_, ok := baseFolders[b]
|
|
|
|
if !ok {
|
|
|
|
toPin = append(toPin, add.Hash)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return toPin
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
|
2016-12-23 18:35:37 +00:00
|
|
|
// SetClient makes the component ready to perform RPC
|
|
|
|
// requests.
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) SetClient(c *rpc.Client) {
|
2016-12-23 18:35:37 +00:00
|
|
|
ipfs.rpcClient = c
|
|
|
|
ipfs.rpcReady <- struct{}{}
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Shutdown stops any listeners and stops the component from taking
|
|
|
|
// any requests.
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) Shutdown() error {
|
2016-12-15 13:07:19 +00:00
|
|
|
ipfs.shutdownLock.Lock()
|
|
|
|
defer ipfs.shutdownLock.Unlock()
|
|
|
|
|
|
|
|
if ipfs.shutdown {
|
|
|
|
logger.Debug("already shutdown")
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-12-15 13:19:41 +00:00
|
|
|
logger.Info("stopping IPFS Proxy")
|
2016-12-15 13:07:19 +00:00
|
|
|
|
2017-03-02 12:57:37 +00:00
|
|
|
ipfs.cancel()
|
2016-12-23 18:35:37 +00:00
|
|
|
close(ipfs.rpcReady)
|
2016-12-09 19:54:46 +00:00
|
|
|
ipfs.server.SetKeepAlivesEnabled(false)
|
2016-12-05 15:24:41 +00:00
|
|
|
ipfs.listener.Close()
|
2016-12-15 13:07:19 +00:00
|
|
|
|
|
|
|
ipfs.wg.Wait()
|
|
|
|
ipfs.shutdown = true
|
2016-12-02 18:33:39 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-01-26 18:59:31 +00:00
|
|
|
// ID performs an ID request against the configured
|
|
|
|
// IPFS daemon. It returns the fetched information.
|
|
|
|
// If the request fails, or the parsing fails, it
|
|
|
|
// returns an error and an empty IPFSID which also
|
|
|
|
// contains the error message.
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) ID() (api.IPFSID, error) {
|
2017-02-08 17:04:08 +00:00
|
|
|
id := api.IPFSID{}
|
2017-10-13 16:38:18 +00:00
|
|
|
body, err := ipfs.post("id")
|
2017-01-26 18:59:31 +00:00
|
|
|
if err != nil {
|
|
|
|
id.Error = err.Error()
|
|
|
|
return id, err
|
|
|
|
}
|
|
|
|
|
2017-03-29 20:52:13 +00:00
|
|
|
var res ipfsIDResp
|
|
|
|
err = json.Unmarshal(body, &res)
|
2017-01-26 18:59:31 +00:00
|
|
|
if err != nil {
|
|
|
|
id.Error = err.Error()
|
|
|
|
return id, err
|
|
|
|
}
|
|
|
|
|
2017-03-29 20:52:13 +00:00
|
|
|
pID, err := peer.IDB58Decode(res.ID)
|
2017-01-26 18:59:31 +00:00
|
|
|
if err != nil {
|
|
|
|
id.Error = err.Error()
|
|
|
|
return id, err
|
|
|
|
}
|
|
|
|
id.ID = pID
|
|
|
|
|
2017-03-29 20:52:13 +00:00
|
|
|
mAddrs := make([]ma.Multiaddr, len(res.Addresses), len(res.Addresses))
|
|
|
|
for i, strAddr := range res.Addresses {
|
2017-01-26 18:59:31 +00:00
|
|
|
mAddr, err := ma.NewMultiaddr(strAddr)
|
|
|
|
if err != nil {
|
|
|
|
id.Error = err.Error()
|
|
|
|
return id, err
|
|
|
|
}
|
|
|
|
mAddrs[i] = mAddr
|
|
|
|
}
|
|
|
|
id.Addresses = mAddrs
|
|
|
|
return id, nil
|
|
|
|
}
|
|
|
|
|
2016-12-02 18:33:39 +00:00
|
|
|
// Pin performs a pin request against the configured IPFS
|
|
|
|
// daemon.
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) Pin(hash *cid.Cid) error {
|
2017-01-25 17:07:19 +00:00
|
|
|
pinStatus, err := ipfs.PinLsCid(hash)
|
2016-12-05 14:30:11 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-01-25 17:07:19 +00:00
|
|
|
if !pinStatus.IsPinned() {
|
2016-12-02 18:33:39 +00:00
|
|
|
path := fmt.Sprintf("pin/add?arg=%s", hash)
|
2017-10-13 16:38:18 +00:00
|
|
|
_, err = ipfs.post(path)
|
2016-12-19 17:35:24 +00:00
|
|
|
if err == nil {
|
|
|
|
logger.Info("IPFS Pin request succeeded: ", hash)
|
|
|
|
}
|
2016-12-02 18:33:39 +00:00
|
|
|
return err
|
|
|
|
}
|
2017-01-23 22:58:04 +00:00
|
|
|
logger.Debug("IPFS object is already pinned: ", hash)
|
2016-12-02 18:33:39 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-12-28 15:25:24 +00:00
|
|
|
// Unpin performs an unpin request against the configured IPFS
|
2016-12-02 18:33:39 +00:00
|
|
|
// daemon.
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) Unpin(hash *cid.Cid) error {
|
2017-01-25 17:07:19 +00:00
|
|
|
pinStatus, err := ipfs.PinLsCid(hash)
|
2016-12-05 14:30:11 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-01-25 17:07:19 +00:00
|
|
|
if pinStatus.IsPinned() {
|
2016-12-02 18:33:39 +00:00
|
|
|
path := fmt.Sprintf("pin/rm?arg=%s", hash)
|
2017-10-13 16:38:18 +00:00
|
|
|
_, err := ipfs.post(path)
|
2016-12-19 17:35:24 +00:00
|
|
|
if err == nil {
|
|
|
|
logger.Info("IPFS Unpin request succeeded:", hash)
|
|
|
|
}
|
2016-12-02 18:33:39 +00:00
|
|
|
return err
|
|
|
|
}
|
2016-12-07 16:21:29 +00:00
|
|
|
|
2017-01-23 22:58:04 +00:00
|
|
|
logger.Debug("IPFS object is already unpinned: ", hash)
|
2016-12-02 18:33:39 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-02-09 15:29:17 +00:00
|
|
|
// PinLs performs a "pin ls --type typeFilter" request against the configured
|
|
|
|
// IPFS daemon and returns a map of cid strings and their status.
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error) {
|
2017-10-13 16:38:18 +00:00
|
|
|
body, err := ipfs.post("pin/ls?type=" + typeFilter)
|
2017-01-25 17:07:19 +00:00
|
|
|
|
|
|
|
// Some error talking to the daemon
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2017-03-29 20:52:13 +00:00
|
|
|
var res ipfsPinLsResp
|
|
|
|
err = json.Unmarshal(body, &res)
|
2016-12-07 16:21:29 +00:00
|
|
|
if err != nil {
|
2017-01-25 17:07:19 +00:00
|
|
|
logger.Error("parsing pin/ls response")
|
|
|
|
logger.Error(string(body))
|
|
|
|
return nil, err
|
2016-12-07 16:21:29 +00:00
|
|
|
}
|
|
|
|
|
2017-02-08 17:04:08 +00:00
|
|
|
statusMap := make(map[string]api.IPFSPinStatus)
|
2017-03-29 20:52:13 +00:00
|
|
|
for k, v := range res.Keys {
|
2017-02-08 17:04:08 +00:00
|
|
|
statusMap[k] = api.IPFSPinStatusFromString(v.Type)
|
2016-12-07 16:21:29 +00:00
|
|
|
}
|
2017-01-25 17:07:19 +00:00
|
|
|
return statusMap, nil
|
2016-12-07 16:21:29 +00:00
|
|
|
}
|
|
|
|
|
2017-03-21 11:57:09 +00:00
|
|
|
// PinLsCid performs a "pin ls --type=recursive <hash> "request and returns
|
|
|
|
// an api.IPFSPinStatus for that hash.
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) PinLsCid(hash *cid.Cid) (api.IPFSPinStatus, error) {
|
2017-03-21 11:57:09 +00:00
|
|
|
lsPath := fmt.Sprintf("pin/ls?arg=%s&type=recursive", hash)
|
2017-10-13 16:38:18 +00:00
|
|
|
body, err := ipfs.post(lsPath)
|
2016-12-05 14:30:11 +00:00
|
|
|
|
|
|
|
// Network error, daemon down
|
|
|
|
if body == nil && err != nil {
|
2017-02-08 17:04:08 +00:00
|
|
|
return api.IPFSPinStatusError, err
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
|
2016-12-05 14:30:11 +00:00
|
|
|
// Pin not found likely here
|
|
|
|
if err != nil { // Not pinned
|
2017-02-08 17:04:08 +00:00
|
|
|
return api.IPFSPinStatusUnpinned, nil
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
|
2017-03-29 20:52:13 +00:00
|
|
|
var res ipfsPinLsResp
|
|
|
|
err = json.Unmarshal(body, &res)
|
2016-12-02 18:33:39 +00:00
|
|
|
if err != nil {
|
2017-01-25 17:07:19 +00:00
|
|
|
logger.Error("parsing pin/ls?arg=cid response:")
|
2016-12-08 16:24:38 +00:00
|
|
|
logger.Error(string(body))
|
2017-02-08 17:04:08 +00:00
|
|
|
return api.IPFSPinStatusError, err
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
2017-03-29 20:52:13 +00:00
|
|
|
pinObj, ok := res.Keys[hash.String()]
|
2016-12-02 18:33:39 +00:00
|
|
|
if !ok {
|
2017-02-08 17:04:08 +00:00
|
|
|
return api.IPFSPinStatusError, errors.New("expected to find the pin in the response")
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
2017-01-25 17:07:19 +00:00
|
|
|
|
2017-02-08 17:04:08 +00:00
|
|
|
return api.IPFSPinStatusFromString(pinObj.Type), nil
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
|
2017-10-13 16:38:18 +00:00
|
|
|
// post performs the heavy lifting of a post request against
|
2016-12-02 18:33:39 +00:00
|
|
|
// the IPFS daemon.
|
2017-10-13 16:38:18 +00:00
|
|
|
func (ipfs *Connector) post(path string) ([]byte, error) {
|
|
|
|
logger.Debugf("posting %s", path)
|
2016-12-02 18:33:39 +00:00
|
|
|
url := fmt.Sprintf("%s/%s",
|
|
|
|
ipfs.apiURL(),
|
|
|
|
path)
|
|
|
|
|
2017-10-13 16:38:18 +00:00
|
|
|
res, err := http.Post(url, "", nil)
|
2016-12-02 18:33:39 +00:00
|
|
|
if err != nil {
|
2016-12-15 13:19:41 +00:00
|
|
|
logger.Error("error getting:", err)
|
2016-12-02 18:33:39 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
2017-03-29 20:52:13 +00:00
|
|
|
defer res.Body.Close()
|
|
|
|
body, err := ioutil.ReadAll(res.Body)
|
2016-12-02 18:33:39 +00:00
|
|
|
if err != nil {
|
2016-12-15 13:19:41 +00:00
|
|
|
logger.Errorf("error reading response body: %s", err)
|
2016-12-02 18:33:39 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var ipfsErr ipfsError
|
|
|
|
decodeErr := json.Unmarshal(body, &ipfsErr)
|
|
|
|
|
2017-03-29 20:52:13 +00:00
|
|
|
if res.StatusCode != http.StatusOK {
|
2016-12-02 18:33:39 +00:00
|
|
|
var msg string
|
|
|
|
if decodeErr == nil {
|
2016-12-15 13:07:19 +00:00
|
|
|
msg = fmt.Sprintf("IPFS unsuccessful: %d: %s",
|
2017-03-29 20:52:13 +00:00
|
|
|
res.StatusCode, ipfsErr.Message)
|
2016-12-02 18:33:39 +00:00
|
|
|
} else {
|
2017-02-13 15:46:53 +00:00
|
|
|
msg = fmt.Sprintf("IPFS-get '%s' unsuccessful: %d: %s",
|
2017-03-29 20:52:13 +00:00
|
|
|
path, res.StatusCode, body)
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
2017-03-14 16:32:00 +00:00
|
|
|
|
2016-12-05 14:30:11 +00:00
|
|
|
return body, errors.New(msg)
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
return body, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// apiURL is a short-hand for building the url of the IPFS
|
|
|
|
// daemon API.
|
2017-03-14 15:37:29 +00:00
|
|
|
func (ipfs *Connector) apiURL() string {
|
2017-03-16 14:51:24 +00:00
|
|
|
return fmt.Sprintf("http://%s/api/v0", ipfs.nodeAddr)
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
2017-03-23 18:34:33 +00:00
|
|
|
|
|
|
|
// ConnectSwarms requests the ipfs addresses of other peers and
|
|
|
|
// triggers ipfs swarm connect requests
|
2017-03-27 13:07:12 +00:00
|
|
|
func (ipfs *Connector) ConnectSwarms() error {
|
2017-03-23 18:34:33 +00:00
|
|
|
var idsSerial []api.IDSerial
|
|
|
|
err := ipfs.rpcClient.Call("",
|
|
|
|
"Cluster",
|
|
|
|
"Peers",
|
|
|
|
struct{}{},
|
|
|
|
&idsSerial)
|
|
|
|
if err != nil {
|
|
|
|
logger.Error(err)
|
2017-03-27 13:07:12 +00:00
|
|
|
return err
|
2017-03-23 18:34:33 +00:00
|
|
|
}
|
|
|
|
logger.Debugf("%+v", idsSerial)
|
|
|
|
|
|
|
|
for _, idSerial := range idsSerial {
|
|
|
|
ipfsID := idSerial.IPFS
|
|
|
|
for _, addr := range ipfsID.Addresses {
|
|
|
|
// This is a best effort attempt
|
|
|
|
// We ignore errors which happens
|
|
|
|
// when passing in a bunch of addresses
|
2017-10-13 16:38:18 +00:00
|
|
|
_, err := ipfs.post(
|
2017-03-23 18:34:33 +00:00
|
|
|
fmt.Sprintf("swarm/connect?arg=%s", addr))
|
|
|
|
if err != nil {
|
|
|
|
logger.Debug(err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
logger.Debugf("ipfs successfully connected to %s", addr)
|
|
|
|
}
|
|
|
|
}
|
2017-03-27 13:07:12 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// ConfigKey fetches the IPFS daemon configuration and retrieves the value for
|
|
|
|
// a given configuration key. For example, "Datastore/StorageMax" will return
|
|
|
|
// the value for StorageMax in the Datastore configuration object.
|
|
|
|
func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) {
|
2017-10-13 16:38:18 +00:00
|
|
|
res, err := ipfs.post("config/show")
|
2017-03-27 13:07:12 +00:00
|
|
|
if err != nil {
|
|
|
|
logger.Error(err)
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var cfg map[string]interface{}
|
2017-03-29 20:52:13 +00:00
|
|
|
err = json.Unmarshal(res, &cfg)
|
2017-03-27 13:07:12 +00:00
|
|
|
if err != nil {
|
|
|
|
logger.Error(err)
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
path := strings.SplitN(keypath, "/", 2)
|
|
|
|
if len(path) == 0 {
|
|
|
|
return nil, errors.New("cannot lookup without a path")
|
|
|
|
}
|
|
|
|
|
|
|
|
return getConfigValue(path, cfg)
|
|
|
|
}
|
|
|
|
|
|
|
|
// getConfigValue resolves path inside cfg, descending one nested object per
// path component, and returns the value found at the last component.
//
// It returns an error when path is empty, when a component is not present in
// the object being inspected ("key not found in configuration"), or when a
// non-final component resolves to something which is not an object
// ("invalid path").
func getConfigValue(path []string, cfg map[string]interface{}) (interface{}, error) {
	if len(path) == 0 {
		return nil, errors.New("cannot lookup without a path")
	}

	var value interface{} = cfg
	for _, key := range path {
		obj, ok := value.(map[string]interface{})
		if !ok {
			// There are components left but nothing to descend into.
			return nil, errors.New("invalid path")
		}
		if value, ok = obj[key]; !ok {
			return nil, errors.New("key not found in configuration")
		}
	}
	return value, nil
}
|
|
|
|
|
2017-08-02 22:49:25 +00:00
|
|
|
// FreeSpace returns the amount of unused space in the ipfs repository. This
|
|
|
|
// value is derived from the RepoSize and StorageMax values given by "repo
|
|
|
|
// stats". The value is in bytes.
|
2017-10-26 11:45:33 +00:00
|
|
|
func (ipfs *Connector) FreeSpace() (uint64, error) {
|
2017-10-13 16:38:18 +00:00
|
|
|
res, err := ipfs.post("repo/stat")
|
2017-08-02 22:49:25 +00:00
|
|
|
if err != nil {
|
|
|
|
logger.Error(err)
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var stats ipfsRepoStatResp
|
|
|
|
err = json.Unmarshal(res, &stats)
|
|
|
|
if err != nil {
|
|
|
|
logger.Error(err)
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
return stats.StorageMax - stats.RepoSize, nil
|
|
|
|
}
|
|
|
|
|
2017-03-27 13:07:12 +00:00
|
|
|
// RepoSize returns the current repository size of the ipfs daemon as
|
|
|
|
// provided by "repo stats". The value is in bytes.
|
2017-10-26 11:45:33 +00:00
|
|
|
func (ipfs *Connector) RepoSize() (uint64, error) {
|
2017-10-13 16:38:18 +00:00
|
|
|
res, err := ipfs.post("repo/stat")
|
2017-03-27 13:07:12 +00:00
|
|
|
if err != nil {
|
|
|
|
logger.Error(err)
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var stats ipfsRepoStatResp
|
2017-03-29 20:52:13 +00:00
|
|
|
err = json.Unmarshal(res, &stats)
|
2017-03-27 13:07:12 +00:00
|
|
|
if err != nil {
|
|
|
|
logger.Error(err)
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
return stats.RepoSize, nil
|
2017-03-23 18:34:33 +00:00
|
|
|
}
|