// Package ipfshttp implements an IPFS Cluster IPFSConnector component. It
// uses the IPFS HTTP API to communicate with IPFS.
package ipfshttp
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"path/filepath"
"strings"
"sync"
"time"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
var logger = logging.Logger("ipfshttp")
// ConnectSwarmsDelay specifies how long to wait after startup before attempting
// to open connections from this peer's IPFS daemon to the IPFS daemons
// of other peers.
var ConnectSwarmsDelay = 7 * time.Second
// IPFS Proxy settings
var (
// maximum duration before timing out reading a full request
IPFSProxyServerReadTimeout = 10 * time.Minute
// maximum duration before timing out reading the headers of a request
IPFSProxyServerReadHeaderTimeout = 5 * time.Second
// maximum duration before timing out writes of the response
IPFSProxyServerWriteTimeout = 10 * time.Minute
// maximum amount of time a server-side Keep-Alive connection
// is kept idle waiting for the next request before being closed
IPFSProxyServerIdleTimeout = 60 * time.Second
)
// Connector implements the IPFSConnector interface
// and provides a component which does two tasks:
//
// On one side, it proxies HTTP requests to the configured IPFS
// daemon. It can, however, intercept these requests and
// perform extra operations on them.
//
// On the other side, it is used to perform on-demand requests
// against the configured IPFS daemon (such as a pin request).
type Connector struct {
ctx context.Context
cancel func()
nodeMAddr ma.Multiaddr
nodeAddr string
proxyMAddr ma.Multiaddr
handlers map[string]func(http.ResponseWriter, *http.Request)
rpcClient *rpc.Client
rpcReady chan struct{}
listener net.Listener
server *http.Server
shutdownLock sync.Mutex
shutdown bool
wg sync.WaitGroup
}
type ipfsError struct {
Message string
}
type ipfsPinType struct {
Type string
}
type ipfsPinLsResp struct {
Keys map[string]ipfsPinType
}
type ipfsPinOpResp struct {
Pins []string
}
type ipfsIDResp struct {
ID string
Addresses []string
}
type ipfsRepoStatResp struct {
RepoSize int
StorageMax int
NumObjects int
}
type ipfsAddResp struct {
Name string
Hash string
}
// NewConnector creates the component and leaves it ready to be started
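// A minimal usage sketch (the multiaddrs are illustrative):
//
//   nodeMAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5001")
//   proxyMAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/9095")
//   conn, err := NewConnector(nodeMAddr, proxyMAddr)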
func NewConnector(ipfsNodeMAddr ma.Multiaddr, ipfsProxyMAddr ma.Multiaddr) (*Connector, error) {
_, nodeAddr, err := manet.DialArgs(ipfsNodeMAddr)
if err != nil {
return nil, err
}
proxyNet, proxyAddr, err := manet.DialArgs(ipfsProxyMAddr)
if err != nil {
return nil, err
}
l, err := net.Listen(proxyNet, proxyAddr)
if err != nil {
return nil, err
}
smux := http.NewServeMux()
s := &http.Server{
ReadTimeout: IPFSProxyServerReadTimeout,
WriteTimeout: IPFSProxyServerWriteTimeout,
//ReadHeaderTimeout: IPFSProxyServerReadHeaderTimeout, Go 1.8
//IdleTimeout: IPFSProxyServerIdleTimeout, Go 1.8
Handler: smux,
}
s.SetKeepAlivesEnabled(true) // A reminder that this can be changed
ctx, cancel := context.WithCancel(context.Background())
ipfs := &Connector{
ctx: ctx,
cancel: cancel,
nodeMAddr: ipfsNodeMAddr,
nodeAddr: nodeAddr,
proxyMAddr: ipfsProxyMAddr,
handlers: make(map[string]func(http.ResponseWriter, *http.Request)),
rpcReady: make(chan struct{}, 1),
listener: l,
server: s,
}
smux.HandleFunc("/", ipfs.handle)
ipfs.handlers["/api/v0/pin/add"] = ipfs.pinHandler
ipfs.handlers["/api/v0/pin/rm"] = ipfs.unpinHandler
ipfs.handlers["/api/v0/pin/ls"] = ipfs.pinLsHandler
ipfs.handlers["/api/v0/add"] = ipfs.addHandler
go ipfs.run()
return ipfs, nil
}
// run launches the proxy and triggers connections to the ipfs
// daemons of other cluster peers when the rpcReady signal is received.
func (ipfs *Connector) run() {
<-ipfs.rpcReady
// This launches the proxy
ipfs.wg.Add(1)
go func() {
defer ipfs.wg.Done()
logger.Infof("IPFS Proxy: %s -> %s",
ipfs.proxyMAddr,
ipfs.nodeMAddr)
err := ipfs.server.Serve(ipfs.listener) // hangs here
if err != nil && !strings.Contains(err.Error(), "closed network connection") {
logger.Error(err)
}
}()
// This runs ipfs swarm connect to the daemons of other cluster members
ipfs.wg.Add(1)
go func() {
defer ipfs.wg.Done()
// It does not hurt to wait a little bit, e.g. for cluster
// peers which are started at the same time as the ipfs
// daemon...
tmr := time.NewTimer(ConnectSwarmsDelay)
defer tmr.Stop()
select {
case <-tmr.C:
ipfs.ConnectSwarms()
case <-ipfs.ctx.Done():
return
}
}()
}
// handle runs a custom handler if we have one for the URL.Path,
// or otherwise just proxies the request.
func (ipfs *Connector) handle(w http.ResponseWriter, r *http.Request) {
if customHandler, ok := ipfs.handlers[r.URL.Path]; ok {
customHandler(w, r)
} else {
ipfs.defaultHandler(w, r)
}
}
func (ipfs *Connector) proxyRequest(r *http.Request) (*http.Response, error) {
newURL := *r.URL
newURL.Host = ipfs.nodeAddr
newURL.Scheme = "http"
proxyReq, err := http.NewRequest(r.Method, newURL.String(), r.Body)
if err != nil {
logger.Error("error creating proxy request: ", err)
return nil, err
}
for k, v := range r.Header {
for _, s := range v {
proxyReq.Header.Add(k, s)
}
}
res, err := http.DefaultTransport.RoundTrip(proxyReq)
if err != nil {
logger.Error("error forwarding request: ", err)
return nil, err
}
return res, nil
}
// Writes a response to a ResponseWriter using the given body
// (which may be res.Body, or a copy if it was already consumed).
func (ipfs *Connector) proxyResponse(w http.ResponseWriter, res *http.Response, body io.Reader) {
// Set response headers
for k, v := range res.Header {
for _, s := range v {
w.Header().Add(k, s)
}
}
w.WriteHeader(res.StatusCode)
// And copy body
io.Copy(w, body)
}
// defaultHandler just proxies the requests.
func (ipfs *Connector) defaultHandler(w http.ResponseWriter, r *http.Request) {
res, err := ipfs.proxyRequest(r)
if err != nil {
http.Error(w, "error forwarding request: "+err.Error(), 500)
return
}
ipfs.proxyResponse(w, res, res.Body)
res.Body.Close()
}
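// ipfsErrorResponder writes an ipfs-api-style JSON error body
// ({"Message": <errMsg>}) along with a 500 status code.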
func ipfsErrorResponder(w http.ResponseWriter, errMsg string) {
res := ipfsError{errMsg}
resBytes, _ := json.Marshal(res)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
w.Write(resBytes)
}
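// pinOpHandler asks the cluster to perform the given pin operation
// ("Pin" or "Unpin") on the CID passed as the "arg" query parameter
// and replies with an ipfs-style pin response.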
func (ipfs *Connector) pinOpHandler(op string, w http.ResponseWriter, r *http.Request) {
argA := r.URL.Query()["arg"]
if len(argA) == 0 {
ipfsErrorResponder(w, "Error: bad argument")
return
}
arg := argA[0]
_, err := cid.Decode(arg)
if err != nil {
ipfsErrorResponder(w, "Error parsing CID: "+err.Error())
return
}
err = ipfs.rpcClient.Call("",
"Cluster",
op,
api.PinSerial{
Cid: arg,
},
&struct{}{})
if err != nil {
ipfsErrorResponder(w, err.Error())
return
}
res := ipfsPinOpResp{
Pins: []string{arg},
}
resBytes, _ := json.Marshal(res)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resBytes)
}
func (ipfs *Connector) pinHandler(w http.ResponseWriter, r *http.Request) {
ipfs.pinOpHandler("Pin", w, r)
}
func (ipfs *Connector) unpinHandler(w http.ResponseWriter, r *http.Request) {
ipfs.pinOpHandler("Unpin", w, r)
}
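// pinLsHandler answers pin/ls requests with the contents of the
// cluster state (or a single pin from it) instead of proxying to ipfs.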
func (ipfs *Connector) pinLsHandler(w http.ResponseWriter, r *http.Request) {
pinLs := ipfsPinLsResp{}
pinLs.Keys = make(map[string]ipfsPinType)
q := r.URL.Query()
arg := q.Get("arg")
if arg != "" {
c, err := cid.Decode(arg)
if err != nil {
ipfsErrorResponder(w, err.Error())
return
}
var pin api.PinSerial
err = ipfs.rpcClient.Call("",
"Cluster",
"PinGet",
api.PinCid(c).ToSerial(),
&pin)
if err != nil {
ipfsErrorResponder(w, fmt.Sprintf(
"Error: path '%s' is not pinned",
arg))
return
}
pinLs.Keys[pin.Cid] = ipfsPinType{
Type: "recursive",
}
} else {
var pins []api.PinSerial
err := ipfs.rpcClient.Call("",
"Cluster",
"Pins",
struct{}{},
&pins)
if err != nil {
ipfsErrorResponder(w, err.Error())
return
}
for _, pin := range pins {
pinLs.Keys[pin.Cid] = ipfsPinType{
Type: "recursive",
}
}
}
resBytes, _ := json.Marshal(pinLs)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resBytes)
}
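// addHandler intercepts /add requests: it proxies the add to the ipfs
// daemon with pinning disabled and then asks the cluster to pin the
// resulting hashes, so that allocation decisions stay with the cluster.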
func (ipfs *Connector) addHandler(w http.ResponseWriter, r *http.Request) {
// Handle some request options
q := r.URL.Query()
// Remember if the user does not want cluster/ipfs to pin
doNotPin := q.Get("pin") == "false"
// make sure the local peer does not pin.
// Cluster will decide where to pin based on metrics and current
// allocations.
q.Set("pin", "false")
// do not send progress updates. They have no use since we need to
// wait until the end to tell cluster to pin.
q.Set("progress", "false")
r.URL.RawQuery = q.Encode()
res, err := ipfs.proxyRequest(r)
if err != nil {
http.Error(w, "error forwarding request: "+err.Error(), 500)
return
}
defer res.Body.Close()
// Shortcut some cases where there is nothing else to do
if scode := res.StatusCode; scode != http.StatusOK {
logger.Warningf("proxy /add request returned %d", scode)
ipfs.proxyResponse(w, res, res.Body)
return
}
if doNotPin {
logger.Debug("proxy /add requests has pin==false")
ipfs.proxyResponse(w, res, res.Body)
return
}
// The ipfs-add response is a streaming-like body where
// { "Name" : "filename", "Hash": "cid" } objects are provided
// for every added object.
// We will need to re-read the response in order to re-play it to
// the client at the end, therefore we make a copy in bodyCopy
// while decoding.
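// A typical stream looks like (hashes illustrative):
//
//   {"Name":"adir/file.txt","Hash":"Qm..."}
//   {"Name":"adir","Hash":"Qm..."}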
bodyCopy := new(bytes.Buffer)
bodyReader := io.TeeReader(res.Body, bodyCopy)
ipfsAddResps := []ipfsAddResp{}
dec := json.NewDecoder(bodyReader)
for dec.More() {
var addResp ipfsAddResp
err := dec.Decode(&addResp)
if err != nil {
http.Error(w, "error decoding response: "+err.Error(), 502)
return
}
ipfsAddResps = append(ipfsAddResps, addResp)
}
if len(ipfsAddResps) == 0 {
logger.Warning("proxy /add request response was OK but empty")
ipfs.proxyResponse(w, res, bodyCopy)
return
}
// An ipfs-add call can add multiple files and pin multiple items.
// The go-ipfs api is not perfectly behaved here (i.e. when passing in
// two directories to pin). There is no easy way to know for sure what
// has been pinned recursively and what has not.
// Usually when pinning a directory, the recursive pin comes last.
// But we may just be pinning different files and no directories.
// In that case, we need to recursively pin them separately.
// decideRecursivePins() takes a conservative approach: it
// works for the regular use cases, but otherwise it might pin
// more things than it should.
pinHashes := decideRecursivePins(ipfsAddResps, r.URL.Query())
logger.Debugf("proxy /add request and will pin %s", pinHashes)
for _, pin := range pinHashes {
err := ipfs.rpcClient.Call("",
"Cluster",
"Pin",
api.PinSerial{
Cid: pin,
},
&struct{}{})
if err != nil {
// we need to fail the operation and make sure the
// user knows about it.
msg := "add operation was successful but "
msg += "an error ocurred performing the cluster "
msg += "pin operation: " + err.Error()
logger.Error(msg)
http.Error(w, msg, 500)
return
}
}
// Finally, send the original response back
ipfs.proxyResponse(w, res, bodyCopy)
}
// decideRecursivePins takes the answers from ipfsAddResp and
// figures out which of the pinned items need to be pinned
// recursively in cluster. That is, it guesses which items
// ipfs would have pinned recursively.
// When adding multiple files and directories, it may end up
// pinning more than it should because the ipfs API does not
// behave well in these cases.
// It should work well for regular use cases: pin 1 file,
// pin 1 directory, pin several files.
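// For example (names and hashes illustrative), given the additions
// {Name: "adir/file.txt", Hash: "Qm1"} and {Name: "adir", Hash: "Qm2"},
// only "Qm2" is selected: "adir" is a detected base folder and
// "adir/file.txt" falls under it.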
func decideRecursivePins(added []ipfsAddResp, q url.Values) []string {
// When wrap-in-directory, return last element only.
_, ok := q["wrap-in-directory"]
if ok && q.Get("wrap-in-directory") == "true" {
return []string{
added[len(added)-1].Hash,
}
}
toPin := []string{}
baseFolders := make(map[string]struct{})
// Guess base folder names
baseFolder := func(path string) string {
slashed := filepath.ToSlash(path)
parts := strings.Split(slashed, "/")
if len(parts) == 0 {
return ""
}
if parts[0] == "" && len(parts) > 1 {
return parts[1]
}
return parts[0]
}
for _, add := range added {
if add.Hash == "" {
continue
}
b := baseFolder(add.Name)
if b != "" {
baseFolders[b] = struct{}{}
}
}
for _, add := range added {
if add.Hash == "" {
continue
}
_, ok := baseFolders[add.Name]
if ok { // it's a base folder, pin it
toPin = append(toPin, add.Hash)
} else { // otherwise, pin it only if it does
// not fall under a base folder.
b := baseFolder(add.Name)
_, ok := baseFolders[b]
if !ok {
toPin = append(toPin, add.Hash)
}
}
}
return toPin
}
// SetClient makes the component ready to perform RPC
// requests.
func (ipfs *Connector) SetClient(c *rpc.Client) {
ipfs.rpcClient = c
ipfs.rpcReady <- struct{}{}
}
// Shutdown stops any listeners and stops the component from taking
// any requests.
func (ipfs *Connector) Shutdown() error {
ipfs.shutdownLock.Lock()
defer ipfs.shutdownLock.Unlock()
if ipfs.shutdown {
logger.Debug("already shutdown")
return nil
}
logger.Info("stopping IPFS Proxy")
ipfs.cancel()
close(ipfs.rpcReady)
ipfs.server.SetKeepAlivesEnabled(false)
ipfs.listener.Close()
ipfs.wg.Wait()
ipfs.shutdown = true
return nil
}
// ID performs an ID request against the configured
// IPFS daemon. It returns the fetched information.
// If the request fails, or the parsing fails, it
// returns an error and an empty IPFSID which also
// contains the error message.
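// The relevant part of the daemon's response looks like
// (abridged, values illustrative):
//
//   {"ID":"Qm...","Addresses":["/ip4/127.0.0.1/tcp/4001/ipfs/Qm..."]}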
func (ipfs *Connector) ID() (api.IPFSID, error) {
id := api.IPFSID{}
body, err := ipfs.get("id")
if err != nil {
id.Error = err.Error()
return id, err
}
var res ipfsIDResp
err = json.Unmarshal(body, &res)
if err != nil {
id.Error = err.Error()
return id, err
}
pID, err := peer.IDB58Decode(res.ID)
if err != nil {
id.Error = err.Error()
return id, err
}
id.ID = pID
mAddrs := make([]ma.Multiaddr, len(res.Addresses))
for i, strAddr := range res.Addresses {
mAddr, err := ma.NewMultiaddr(strAddr)
if err != nil {
id.Error = err.Error()
return id, err
}
mAddrs[i] = mAddr
}
id.Addresses = mAddrs
return id, nil
}
// Pin performs a pin request against the configured IPFS
// daemon.
func (ipfs *Connector) Pin(hash *cid.Cid) error {
pinStatus, err := ipfs.PinLsCid(hash)
if err != nil {
return err
}
if !pinStatus.IsPinned() {
path := fmt.Sprintf("pin/add?arg=%s", hash)
_, err = ipfs.get(path)
if err == nil {
logger.Info("IPFS Pin request succeeded: ", hash)
}
return err
}
logger.Debug("IPFS object is already pinned: ", hash)
return nil
}
// Unpin performs an unpin request against the configured IPFS
// daemon.
func (ipfs *Connector) Unpin(hash *cid.Cid) error {
pinStatus, err := ipfs.PinLsCid(hash)
if err != nil {
return err
}
if pinStatus.IsPinned() {
path := fmt.Sprintf("pin/rm?arg=%s", hash)
_, err := ipfs.get(path)
if err == nil {
logger.Info("IPFS Unpin request succeeded:", hash)
}
return err
}
logger.Debug("IPFS object is already unpinned: ", hash)
return nil
}
// PinLs performs a "pin ls --type typeFilter" request against the configured
// IPFS daemon and returns a map of cid strings and their status.
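// The expected response shape is (hashes illustrative):
//
//   {"Keys":{"Qm...":{"Type":"recursive"}}}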
func (ipfs *Connector) PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error) {
body, err := ipfs.get("pin/ls?type=" + typeFilter)
// Some error talking to the daemon
if err != nil {
return nil, err
}
var res ipfsPinLsResp
err = json.Unmarshal(body, &res)
if err != nil {
logger.Error("parsing pin/ls response")
logger.Error(string(body))
return nil, err
}
statusMap := make(map[string]api.IPFSPinStatus)
for k, v := range res.Keys {
statusMap[k] = api.IPFSPinStatusFromString(v.Type)
}
return statusMap, nil
}
// PinLsCid performs a "pin ls --type=recursive <hash>" request and returns
// an api.IPFSPinStatus for that hash.
func (ipfs *Connector) PinLsCid(hash *cid.Cid) (api.IPFSPinStatus, error) {
lsPath := fmt.Sprintf("pin/ls?arg=%s&type=recursive", hash)
body, err := ipfs.get(lsPath)
// Network error, daemon down
if body == nil && err != nil {
return api.IPFSPinStatusError, err
}
// Any other error likely means the pin was not found
if err != nil { // Not pinned
return api.IPFSPinStatusUnpinned, nil
}
var res ipfsPinLsResp
err = json.Unmarshal(body, &res)
if err != nil {
logger.Error("parsing pin/ls?arg=cid response:")
logger.Error(string(body))
return api.IPFSPinStatusError, err
}
pinObj, ok := res.Keys[hash.String()]
if !ok {
return api.IPFSPinStatusError, errors.New("expected to find the pin in the response")
}
return api.IPFSPinStatusFromString(pinObj.Type), nil
}
// get performs the heavy lifting of a get request against
// the IPFS daemon.
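// For example (host illustrative), get("repo/stat") issues
// GET http://127.0.0.1:5001/api/v0/repo/stat.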
func (ipfs *Connector) get(path string) ([]byte, error) {
logger.Debugf("getting %s", path)
url := fmt.Sprintf("%s/%s",
ipfs.apiURL(),
path)
res, err := http.Get(url)
if err != nil {
logger.Error("error getting:", err)
return nil, err
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
logger.Errorf("error reading response body: %s", err)
return nil, err
}
var ipfsErr ipfsError
decodeErr := json.Unmarshal(body, &ipfsErr)
if res.StatusCode != http.StatusOK {
var msg string
if decodeErr == nil {
msg = fmt.Sprintf("IPFS unsuccessful: %d: %s",
res.StatusCode, ipfsErr.Message)
} else {
msg = fmt.Sprintf("IPFS-get '%s' unsuccessful: %d: %s",
path, res.StatusCode, body)
}
return body, errors.New(msg)
}
return body, nil
}
// apiURL is a short-hand for building the url of the IPFS
// daemon API.
func (ipfs *Connector) apiURL() string {
return fmt.Sprintf("http://%s/api/v0", ipfs.nodeAddr)
}
// ConnectSwarms requests the ipfs addresses of other peers and
// triggers ipfs swarm connect requests
func (ipfs *Connector) ConnectSwarms() error {
var idsSerial []api.IDSerial
err := ipfs.rpcClient.Call("",
"Cluster",
"Peers",
struct{}{},
&idsSerial)
if err != nil {
logger.Error(err)
return err
}
logger.Debugf("%+v", idsSerial)
for _, idSerial := range idsSerial {
ipfsID := idSerial.IPFS
for _, addr := range ipfsID.Addresses {
// This is a best-effort attempt:
// we ignore the errors which happen
// when passing in a bunch of addresses
_, err := ipfs.get(
fmt.Sprintf("swarm/connect?arg=%s", addr))
if err != nil {
logger.Debug(err)
continue
}
logger.Debugf("ipfs successfully connected to %s", addr)
}
}
return nil
}
// ConfigKey fetches the IPFS daemon configuration and retrieves the value for
// a given configuration key. For example, "Datastore/StorageMax" will return
// the value for StorageMax in the Datastore configuration object.
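// For example:
//
//   max, err := ipfs.ConfigKey("Datastore/StorageMax")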
func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) {
res, err := ipfs.get("config/show")
if err != nil {
logger.Error(err)
return nil, err
}
var cfg map[string]interface{}
err = json.Unmarshal(res, &cfg)
if err != nil {
logger.Error(err)
return nil, err
}
path := strings.Split(keypath, "/")
if len(path) == 0 {
return nil, errors.New("cannot lookup without a path")
}
return getConfigValue(path, cfg)
}
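// getConfigValue recursively walks the parsed configuration following
// the given path of keys and returns the value found at the end of it.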
func getConfigValue(path []string, cfg map[string]interface{}) (interface{}, error) {
value, ok := cfg[path[0]]
if !ok {
return nil, errors.New("key not found in configuration")
}
if len(path) == 1 {
return value, nil
}
switch v := value.(type) {
case map[string]interface{}:
return getConfigValue(path[1:], v)
default:
return nil, errors.New("invalid path")
}
}
// FreeSpace returns the amount of unused space in the ipfs repository. This
// value is derived from the RepoSize and StorageMax values given by "repo
// stats". The value is in bytes.
func (ipfs *Connector) FreeSpace() (int, error) {
res, err := ipfs.get("repo/stat")
if err != nil {
logger.Error(err)
return 0, err
}
var stats ipfsRepoStatResp
err = json.Unmarshal(res, &stats)
if err != nil {
logger.Error(err)
return 0, err
}
return stats.StorageMax - stats.RepoSize, nil
}
// RepoSize returns the current repository size of the ipfs daemon as
// provided by "repo stats". The value is in bytes.
func (ipfs *Connector) RepoSize() (int, error) {
res, err := ipfs.get("repo/stat")
if err != nil {
logger.Error(err)
return 0, err
}
var stats ipfsRepoStatResp
err = json.Unmarshal(res, &stats)
if err != nil {
logger.Error(err)
return 0, err
}
return stats.RepoSize, nil
}