ipfs-cluster/api/rest/restapi.go
Hector Sanjuan 8f06baa1bf Issue #162: Rework configuration format
The following commit reimplements ipfs-cluster configuration under
the following premises:

  * Each component is initialized with a configuration object
  defined by its module
  * Each component decides what the JSON representation of its
  configuration looks like
  * Each component parses and validates its own configuration
  * Each component exposes its own defaults
  * Component configurations make up the sections of a
  central JSON configuration file (which replaces the current
  JSON format)
  * Component configurations implement a common interface
  (config.ComponentConfig) with a set of common operations
  * The central configuration file is managed by a
  config.ConfigManager which:
    * Registers ComponentConfigs
    * Assigns the corresponding sections from the JSON file to each
    component and delegates the parsing
    * Delegates the JSON generation for each section
    * Can be notified when the configuration is updated and must be
    saved to disk

The new service.json would then look as follows:

```json
{
  "cluster": {
    "id": "QmTVW8NoRxC5wBhV7WtAYtRn7itipEESfozWN5KmXUQnk2",
    "private_key": "<...>",
    "secret": "00224102ae6aaf94f2606abf69a0e278251ecc1d64815b617ff19d6d2841f786",
    "peers": [],
    "bootstrap": [],
    "leave_on_shutdown": false,
    "listen_multiaddress": "/ip4/0.0.0.0/tcp/9096",
    "state_sync_interval": "1m0s",
    "ipfs_sync_interval": "2m10s",
    "replication_factor": -1,
    "monitor_ping_interval": "15s"
  },
  "consensus": {
    "raft": {
      "heartbeat_timeout": "1s",
      "election_timeout": "1s",
      "commit_timeout": "50ms",
      "max_append_entries": 64,
      "trailing_logs": 10240,
      "snapshot_interval": "2m0s",
      "snapshot_threshold": 8192,
      "leader_lease_timeout": "500ms"
    }
  },
  "api": {
    "restapi": {
      "listen_multiaddress": "/ip4/127.0.0.1/tcp/9094",
      "read_timeout": "30s",
      "read_header_timeout": "5s",
      "write_timeout": "1m0s",
      "idle_timeout": "2m0s"
    }
  },
  "ipfs_connector": {
    "ipfshttp": {
      "proxy_listen_multiaddress": "/ip4/127.0.0.1/tcp/9095",
      "node_multiaddress": "/ip4/127.0.0.1/tcp/5001",
      "connect_swarms_delay": "7s",
      "proxy_read_timeout": "10m0s",
      "proxy_read_header_timeout": "5s",
      "proxy_write_timeout": "10m0s",
      "proxy_idle_timeout": "1m0s"
    }
  },
  "monitor": {
    "monbasic": {
      "check_interval": "15s"
    }
  },
  "informer": {
    "disk": {
      "metric_ttl": "30s",
      "metric_type": "freespace"
    },
    "numpin": {
      "metric_ttl": "10s"
    }
  }
}
```

This new format aims to be easily extensible per component. As such,
it already surfaces quite a few new options which were hardcoded
before.

Additionally, since the Go APIs have changed, some redundant methods have
been removed and some small refactoring has been done to take advantage of
the new approach.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-10-18 00:00:12 +02:00

557 lines
11 KiB
Go

// Package rest implements an IPFS Cluster API component. It provides
// a REST-ish API to interact with Cluster over HTTP.
package rest
import (
"context"
"crypto/tls"
"encoding/json"
"net"
"net/http"
"strconv"
"strings"
"sync"
types "github.com/ipfs/ipfs-cluster/api"
mux "github.com/gorilla/mux"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
// logger is the package-wide logger, created under the name "restapi".
var logger = logging.Logger("restapi")
// API is the REST API component for Cluster. It bundles the HTTP server,
// its listener and router, and the RPC client that the request handlers
// use to perform calls against the "Cluster" RPC service.
type API struct {
// ctx is created in newAPI; cancel is invoked by Shutdown.
ctx context.Context
cancel func()
// config is the configuration this component was created with.
config *Config
// rpcClient is stored by SetClient and used by every request handler.
rpcClient *rpc.Client
// rpcReady is signalled by SetClient and closed by Shutdown; the
// goroutine started by run blocks on it before serving.
rpcReady chan struct{}
// router holds the registered routes (see addRoutes).
router *mux.Router
// listener is the socket handed to server.Serve.
listener net.Listener
server *http.Server
// shutdownLock guards shutdown.
shutdownLock sync.Mutex
shutdown bool
// wg tracks the serving goroutine so that Shutdown can wait for it.
wg sync.WaitGroup
}
// route describes a single API endpoint: a name for the route, the HTTP
// method and URL path pattern it is registered under, and the handler
// that serves it.
type route struct {
Name string
Method string
Pattern string
HandlerFunc http.HandlerFunc
}
// peerAddBody is the JSON request body decoded by peerAddHandler. The
// multiaddress of the peer to add travels in the "peer_multiaddress" key.
type peerAddBody struct {
PeerMultiaddr string `json:"peer_multiaddress"`
}
// NewAPI creates a new REST API component from the given configuration.
// The configuration is validated first; a listener is then opened on
// cfg.ListenAddr, wrapped in TLS when cfg.TLS is set. Any failure along
// the way is returned as-is and no component is created.
func NewAPI(cfg *Config) (*API, error) {
	if err := cfg.Validate(); err != nil {
		return nil, err
	}

	network, address, err := manet.DialArgs(cfg.ListenAddr)
	if err != nil {
		return nil, err
	}

	var lst net.Listener
	switch {
	case cfg.TLS == nil:
		lst, err = net.Listen(network, address)
	default:
		lst, err = tls.Listen(network, address, cfg.TLS)
	}
	if err != nil {
		return nil, err
	}

	return newAPI(cfg, lst)
}
// newAPI assembles an API around an already-open listener: it builds the
// HTTP server with the timeouts taken from cfg, registers all routes and
// launches the serving goroutine (which waits for SetClient before it
// starts accepting requests). The returned error is always nil.
func newAPI(cfg *Config, l net.Listener) (*API, error) {
	ctx, cancel := context.WithCancel(context.Background())
	router := mux.NewRouter().StrictSlash(true)

	srv := &http.Server{
		Handler:           router,
		ReadTimeout:       cfg.ReadTimeout,
		ReadHeaderTimeout: cfg.ReadHeaderTimeout,
		WriteTimeout:      cfg.WriteTimeout,
		IdleTimeout:       cfg.IdleTimeout,
	}
	// Keep-alives are on by default; the explicit call is a reminder that
	// this is a knob which can be changed.
	srv.SetKeepAlivesEnabled(true)

	api := &API{
		ctx:      ctx,
		cancel:   cancel,
		config:   cfg,
		server:   srv,
		listener: l,
		rpcReady: make(chan struct{}, 1),
	}

	api.addRoutes(router)
	api.run()
	return api, nil
}
// addRoutes registers every entry returned by routes() on router and
// stores the router in the API. When basic-auth credentials are
// configured, each handler is wrapped with basicAuth before registration.
func (api *API) addRoutes(router *mux.Router) {
	creds := api.config.BasicAuthCreds
	for _, rt := range api.routes() {
		handler := rt.HandlerFunc
		if creds != nil {
			handler = basicAuth(handler, creds)
		}
		router.Methods(rt.Method).Path(rt.Pattern).Name(rt.Name).Handler(handler)
	}
	api.router = router
}
// basicAuth wraps h so that it only runs for requests carrying HTTP Basic
// credentials that match an entry of credentials (username -> password).
// Every other request is answered with a 401 and never reaches h.
//
// The WWW-Authenticate challenge header is set on all responses, including
// successful ones.
func basicAuth(h http.HandlerFunc, credentials map[string]string) http.HandlerFunc {
	// deny writes the 401 response. If the JSON error body cannot be
	// produced the request is still rejected (with a plain-text body)
	// rather than being left unanswered, which net/http would turn into
	// an implicit "200 OK".
	deny := func(w http.ResponseWriter) {
		resp, err := unauthorizedResp()
		if err != nil {
			logger.Error(err)
			resp = http.StatusText(http.StatusUnauthorized)
		}
		http.Error(w, resp, http.StatusUnauthorized)
	}

	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`)

		username, password, ok := r.BasicAuth()
		if !ok {
			deny(w)
			return
		}

		// NOTE(review): this is a plain string comparison; consider
		// crypto/subtle.ConstantTimeCompare to avoid leaking timing
		// information about the stored password.
		expected, known := credentials[username]
		if !known || expected != password {
			deny(w)
			return
		}

		h.ServeHTTP(w, r)
	}
}
// unauthorizedResp returns the JSON-encoded error body (code 401, message
// "Unauthorized") sent to clients that fail authentication, together with
// any error reported by the JSON encoder.
func unauthorizedResp() (string, error) {
	body, err := json.Marshal(types.Error{Code: 401, Message: "Unauthorized"})
	return string(body), err
}
// routes returns the table of endpoints served by the API: for each one
// its name, HTTP method, path pattern and the handler bound to this API
// instance.
func (api *API) routes() []route {
	return []route{
		// Cluster information.
		{Name: "ID", Method: "GET", Pattern: "/id", HandlerFunc: api.idHandler},
		{Name: "Version", Method: "GET", Pattern: "/version", HandlerFunc: api.versionHandler},

		// Peer management.
		{Name: "Peers", Method: "GET", Pattern: "/peers", HandlerFunc: api.peerListHandler},
		{Name: "PeerAdd", Method: "POST", Pattern: "/peers", HandlerFunc: api.peerAddHandler},
		{Name: "PeerRemove", Method: "DELETE", Pattern: "/peers/{peer}", HandlerFunc: api.peerRemoveHandler},

		// Allocations.
		{Name: "Allocations", Method: "GET", Pattern: "/allocations", HandlerFunc: api.allocationsHandler},
		{Name: "Allocation", Method: "GET", Pattern: "/allocations/{hash}", HandlerFunc: api.allocationHandler},

		// Pins.
		{Name: "StatusAll", Method: "GET", Pattern: "/pins", HandlerFunc: api.statusAllHandler},
		{Name: "SyncAll", Method: "POST", Pattern: "/pins/sync", HandlerFunc: api.syncAllHandler},
		{Name: "Status", Method: "GET", Pattern: "/pins/{hash}", HandlerFunc: api.statusHandler},
		{Name: "Pin", Method: "POST", Pattern: "/pins/{hash}", HandlerFunc: api.pinHandler},
		{Name: "Unpin", Method: "DELETE", Pattern: "/pins/{hash}", HandlerFunc: api.unpinHandler},
		{Name: "Sync", Method: "POST", Pattern: "/pins/{hash}/sync", HandlerFunc: api.syncHandler},
		{Name: "Recover", Method: "POST", Pattern: "/pins/{hash}/recover", HandlerFunc: api.recoverHandler},
	}
}
// run starts the goroutine that serves HTTP requests on the API listener.
// Serving does not begin until rpcReady is signalled, so that no handler
// can run before an RPC client is available. The goroutine is tracked by
// api.wg.
func (api *API) run() {
	api.wg.Add(1)
	go func() {
		defer api.wg.Done()

		<-api.rpcReady
		// rpcReady is also closed on shutdown. If the context is already
		// cancelled we were woken up only to exit: do not announce the
		// API nor start serving on a listener that is being closed.
		if api.ctx.Err() != nil {
			return
		}

		logger.Infof("REST API: %s", api.config.ListenAddr)
		err := api.server.Serve(api.listener)
		// Closing the listener is how the server gets stopped, so the
		// resulting "closed network connection" error is expected.
		if err != nil && !strings.Contains(err.Error(), "closed network connection") {
			logger.Error(err)
		}
	}()
}
// Shutdown stops the API: it cancels the component context, releases the
// goroutine started by run, closes the listener and waits for that
// goroutine to finish. Calling it again after a completed shutdown is a
// no-op. It always returns nil.
func (api *API) Shutdown() error {
// The lock serializes concurrent Shutdown calls and guards api.shutdown.
api.shutdownLock.Lock()
defer api.shutdownLock.Unlock()
if api.shutdown {
logger.Debug("already shutdown")
return nil
}
logger.Info("stopping Cluster API")
api.cancel()
// Closing rpcReady unblocks the goroutine started by run in case it is
// still waiting for SetClient.
close(api.rpcReady)
// Cancel any outstanding ops
api.server.SetKeepAlivesEnabled(false)
// NOTE(review): the error returned by Close is discarded.
api.listener.Close()
// Wait for the serving goroutine to return.
api.wg.Wait()
api.shutdown = true
return nil
}
// SetClient hands the component the RPC client used by its handlers and
// signals the serving goroutine that it may start accepting requests.
func (api *API) SetClient(c *rpc.Client) {
	// The client must be stored before the signal is sent: handlers may
	// run as soon as the serving goroutine is released.
	api.rpcClient = c

	var ready struct{}
	api.rpcReady <- ready
}
// idHandler serves GET /id: it performs the "ID" RPC call and sends its
// result back as JSON.
func (api *API) idHandler(w http.ResponseWriter, r *http.Request) {
	var id types.IDSerial
	err := api.rpcClient.Call("", "Cluster", "ID", struct{}{}, &id)
	sendResponse(w, err, id)
}
// versionHandler serves GET /version: it performs the "Version" RPC call
// and sends its result back as JSON.
func (api *API) versionHandler(w http.ResponseWriter, r *http.Request) {
	var version types.Version
	err := api.rpcClient.Call("", "Cluster", "Version", struct{}{}, &version)
	sendResponse(w, err, version)
}
// peerListHandler serves GET /peers: it performs the "Peers" RPC call and
// sends the resulting list back as JSON.
func (api *API) peerListHandler(w http.ResponseWriter, r *http.Request) {
	var peers []types.IDSerial
	err := api.rpcClient.Call("", "Cluster", "Peers", struct{}{}, &peers)
	sendResponse(w, err, peers)
}
// peerAddHandler serves POST /peers. The request body must be a JSON
// object with a "peer_multiaddress" key; a body that cannot be decoded or
// an invalid multiaddress is answered with a 400. Otherwise the "PeerAdd"
// RPC call is performed and its result sent back as JSON.
func (api *API) peerAddHandler(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()

	var body peerAddBody
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		sendErrorResponse(w, 400, "error decoding request body")
		return
	}

	addr, err := ma.NewMultiaddr(body.PeerMultiaddr)
	if err != nil {
		sendErrorResponse(w, 400, "error decoding peer_multiaddress")
		return
	}

	var added types.IDSerial
	err = api.rpcClient.Call("", "Cluster", "PeerAdd", types.MultiaddrToSerial(addr), &added)
	sendResponse(w, err, added)
}
// peerRemoveHandler serves DELETE /peers/{peer}: it performs the
// "PeerRemove" RPC call for the peer ID in the path and replies with an
// empty response on success. An undecodable peer ID has already been
// answered by parsePidOrError.
func (api *API) peerRemoveHandler(w http.ResponseWriter, r *http.Request) {
	pid := parsePidOrError(w, r)
	if pid == "" {
		return
	}
	err := api.rpcClient.Call("", "Cluster", "PeerRemove", pid, &struct{}{})
	sendEmptyResponse(w, err)
}
// pinHandler serves POST /pins/{hash}: it performs the "Pin" RPC call for
// the Cid in the path and replies with 202 Accepted on success. An
// undecodable Cid has already been answered by parseCidOrError.
func (api *API) pinHandler(w http.ResponseWriter, r *http.Request) {
	pin := parseCidOrError(w, r)
	if pin.Cid == "" {
		return
	}
	err := api.rpcClient.Call("", "Cluster", "Pin", pin, &struct{}{})
	sendAcceptedResponse(w, err)
}
// unpinHandler serves DELETE /pins/{hash}: it performs the "Unpin" RPC
// call for the Cid in the path and replies with 202 Accepted on success.
// An undecodable Cid has already been answered by parseCidOrError.
func (api *API) unpinHandler(w http.ResponseWriter, r *http.Request) {
	pin := parseCidOrError(w, r)
	if pin.Cid == "" {
		return
	}
	err := api.rpcClient.Call("", "Cluster", "Unpin", pin, &struct{}{})
	sendAcceptedResponse(w, err)
}
// allocationsHandler serves GET /allocations: it performs the "Pins" RPC
// call and sends the resulting list back as JSON.
func (api *API) allocationsHandler(w http.ResponseWriter, r *http.Request) {
	var allocations []types.PinSerial
	err := api.rpcClient.Call("", "Cluster", "Pins", struct{}{}, &allocations)
	sendResponse(w, err, allocations)
}
// allocationHandler serves GET /allocations/{hash}: it performs the
// "PinGet" RPC call for the Cid in the path. Unlike the other handlers,
// an RPC error is reported to the client as a 404 rather than a 500.
func (api *API) allocationHandler(w http.ResponseWriter, r *http.Request) {
	c := parseCidOrError(w, r)
	if c.Cid == "" {
		return
	}

	var pin types.PinSerial
	if err := api.rpcClient.Call("", "Cluster", "PinGet", c, &pin); err != nil {
		// errors here are 404s
		sendErrorResponse(w, 404, err.Error())
		return
	}
	sendJSONResponse(w, 200, pin)
}
// statusAllHandler serves GET /pins: it performs the "StatusAll" RPC call
// and sends the resulting list back as JSON.
func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
	var statuses []types.GlobalPinInfoSerial
	err := api.rpcClient.Call("", "Cluster", "StatusAll", struct{}{}, &statuses)
	sendResponse(w, err, statuses)
}
// statusHandler serves GET /pins/{hash}: it performs the "Status" RPC
// call for the Cid in the path and sends the result back as JSON. An
// undecodable Cid has already been answered by parseCidOrError.
func (api *API) statusHandler(w http.ResponseWriter, r *http.Request) {
	c := parseCidOrError(w, r)
	if c.Cid == "" {
		return
	}

	var status types.GlobalPinInfoSerial
	err := api.rpcClient.Call("", "Cluster", "Status", c, &status)
	sendResponse(w, err, status)
}
// syncAllHandler serves POST /pins/sync: it performs the "SyncAll" RPC
// call and sends the resulting list back as JSON.
func (api *API) syncAllHandler(w http.ResponseWriter, r *http.Request) {
	var synced []types.GlobalPinInfoSerial
	err := api.rpcClient.Call("", "Cluster", "SyncAll", struct{}{}, &synced)
	sendResponse(w, err, synced)
}
// syncHandler serves POST /pins/{hash}/sync: it performs the "Sync" RPC
// call for the Cid in the path and sends the result back as JSON. An
// undecodable Cid has already been answered by parseCidOrError.
func (api *API) syncHandler(w http.ResponseWriter, r *http.Request) {
	c := parseCidOrError(w, r)
	if c.Cid == "" {
		return
	}

	var synced types.GlobalPinInfoSerial
	err := api.rpcClient.Call("", "Cluster", "Sync", c, &synced)
	sendResponse(w, err, synced)
}
// recoverHandler serves POST /pins/{hash}/recover: it performs the
// "Recover" RPC call for the Cid in the path and sends the result back as
// JSON. An undecodable Cid has already been answered by parseCidOrError.
func (api *API) recoverHandler(w http.ResponseWriter, r *http.Request) {
	c := parseCidOrError(w, r)
	if c.Cid == "" {
		return
	}

	var recovered types.GlobalPinInfoSerial
	err := api.rpcClient.Call("", "Cluster", "Recover", c, &recovered)
	sendResponse(w, err, recovered)
}
// parseCidOrError builds a PinSerial from the request: the Cid comes from
// the {hash} path variable and the replication factor from the optional
// "replication_factor" query parameter (left untouched when the parameter
// is missing or not an integer).
//
// If the hash is not a valid Cid, a 400 response is written to w and a
// PinSerial with an empty Cid is returned; callers must check for that
// and stop processing.
func parseCidOrError(w http.ResponseWriter, r *http.Request) types.PinSerial {
	hash := mux.Vars(r)["hash"]
	if _, err := cid.Decode(hash); err != nil {
		sendErrorResponse(w, 400, "error decoding Cid: "+err.Error())
		return types.PinSerial{}
	}

	pin := types.PinSerial{Cid: hash}

	rpl, err := strconv.Atoi(r.URL.Query().Get("replication_factor"))
	if err == nil {
		pin.ReplicationFactor = rpl
	}
	return pin
}
// parsePidOrError decodes the base58 peer ID found in the {peer} path
// variable. If decoding fails, a 400 response is written to w and the
// empty peer ID is returned; callers must check for that and stop
// processing.
func parsePidOrError(w http.ResponseWriter, r *http.Request) peer.ID {
	pid, err := peer.IDB58Decode(mux.Vars(r)["peer"])
	if err == nil {
		return pid
	}
	sendErrorResponse(w, 400, "error decoding Peer ID: "+err.Error())
	return ""
}
// sendResponse replies to an RPC-backed request: when rpcErr is set the
// standard error response is sent, otherwise resp is sent as JSON with a
// 200 status.
func sendResponse(w http.ResponseWriter, rpcErr error, resp interface{}) {
	if !checkRPCErr(w, rpcErr) {
		return
	}
	sendJSONResponse(w, 200, resp)
}
// checkRPCErr sends the standard error response (a 500 carrying the error
// message) when err is non-nil. It reports true when everything is OK and
// no response was written, and false when the error has been handled.
func checkRPCErr(w http.ResponseWriter, err error) bool {
	if err == nil {
		return true
	}
	sendErrorResponse(w, 500, err.Error())
	return false
}
// sendEmptyResponse replies with 204 No Content, unless rpcErr is set, in
// which case the standard error response is sent instead.
func sendEmptyResponse(w http.ResponseWriter, rpcErr error) {
	if ok := checkRPCErr(w, rpcErr); !ok {
		return
	}
	w.WriteHeader(http.StatusNoContent)
}
// sendAcceptedResponse replies with 202 Accepted, unless rpcErr is set,
// in which case the standard error response is sent instead.
func sendAcceptedResponse(w http.ResponseWriter, rpcErr error) {
	if ok := checkRPCErr(w, rpcErr); !ok {
		return
	}
	w.WriteHeader(http.StatusAccepted)
}
func sendJSONResponse(w http.ResponseWriter, code int, resp interface{}) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(code)
if err := json.NewEncoder(w).Encode(resp); err != nil {
panic(err)
}
}
func sendErrorResponse(w http.ResponseWriter, code int, msg string) {
errorResp := types.Error{
Code: code,
Message: msg,
}
logger.Errorf("sending error response: %d: %s", code, msg)
sendJSONResponse(w, code, errorResp)
}