Fix 466: Hijack repo/stat in the proxy and return aggregates from the cluster.

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
Hector Sanjuan 2018-08-20 20:43:27 +02:00
parent 0b21a404e5
commit 2ffa3d80de
11 changed files with 134 additions and 106 deletions

View File

@ -888,3 +888,9 @@ type Error struct {
func (e *Error) Error() string {
return fmt.Sprintf("%s (%d)", e.Message, e.Code)
}
// IPFSRepoStat wraps information about the IPFS repository.
type IPFSRepoStat struct {
RepoSize uint64
StorageMax uint64
}

View File

@ -98,10 +98,12 @@ func (ipfs *mockConnector) SwarmPeers() (api.SwarmPeers, error) {
return []peer.ID{test.TestPeerID4, test.TestPeerID5}, nil
}
func (ipfs *mockConnector) RepoStat() (api.IPFSRepoStat, error) {
return api.IPFSRepoStat{RepoSize: 100, StorageMax: 1000}, nil
}
func (ipfs *mockConnector) ConnectSwarms() error { return nil }
func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil }
func (ipfs *mockConnector) FreeSpace() (uint64, error) { return 100, nil }
func (ipfs *mockConnector) RepoSize() (uint64, error) { return 0, nil }
func (ipfs *mockConnector) BlockPut(nwm api.NodeWithMeta) error {
ipfs.blocks.Store(nwm.Cid, nwm.Data)

View File

@ -60,7 +60,7 @@ func (cfg *Config) Validate() error {
return errors.New("disk.metric_ttl is invalid")
}
if _, ok := metricToRPC[cfg.Type]; !ok {
if cfg.Type.String() == "" {
return errors.New("disk.metric_type is invalid")
}
return nil

View File

@ -23,12 +23,6 @@ const (
var logger = logging.Logger("diskinfo")
// metricToRPC maps from a specified metric name to the corrresponding RPC call
var metricToRPC = map[MetricType]string{
MetricFreeSpace: "IPFSFreeSpace",
MetricRepoSize: "IPFSRepoSize",
}
// Informer is a simple object to implement the ipfscluster.Informer
// and Component interfaces.
type Informer struct {
@ -76,16 +70,26 @@ func (disk *Informer) GetMetric() api.Metric {
}
}
var repoStat api.IPFSRepoStat
var metric uint64
valid := true
err := disk.rpcClient.Call("",
"Cluster",
metricToRPC[disk.config.Type],
"IPFSRepoStat",
struct{}{},
&metric)
&repoStat)
if err != nil {
logger.Error(err)
valid = false
} else {
switch disk.config.Type {
case MetricFreeSpace:
metric = repoStat.StorageMax - repoStat.RepoSize
case MetricRepoSize:
metric = repoStat.RepoSize
}
}
m := api.Metric{

View File

@ -7,11 +7,11 @@ import (
rpc "github.com/hsanjuan/go-libp2p-gorpc"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
)
type badRPCService struct {
nthCall int
}
func badRPCClient(t *testing.T) *rpc.Client {
@ -24,35 +24,7 @@ func badRPCClient(t *testing.T) *rpc.Client {
return c
}
// func (mock *badRPCService) IPFSConfigKey(in string, out *interface{}) error {
// mock.nthCall++
// switch mock.nthCall {
// case 1:
// return errors.New("fake error the first time you use this mock")
// case 2:
// // don't set out
// return nil
// case 3:
// // don't set to string
// *out = 3
// case 4:
// // non parseable string
// *out = "abc"
// default:
// *out = "10KB"
// }
// return nil
// }
func (mock *badRPCService) IPFSRepoSize(ctx context.Context, in struct{}, out *uint64) error {
*out = 2
mock.nthCall++
return errors.New("fake error")
}
func (mock *badRPCService) IPFSFreeSpace(ctx context.Context, in struct{}, out *uint64) error {
*out = 2
mock.nthCall++
func (mock *badRPCService) IPFSRepoStat(ctx context.Context, in struct{}, out *api.IPFSRepoStat) error {
return errors.New("fake error")
}

View File

@ -84,12 +84,9 @@ type IPFSConnector interface {
// ConfigKey returns the value for a configuration key.
// Subobjects are reached with keypaths as "Parent/Child/GrandChild...".
ConfigKey(keypath string) (interface{}, error)
// FreeSpace returns the amount of remaining space on the repo, calculated from
//"repo stat"
FreeSpace() (uint64, error)
// RepoSize returns the current repository size as expressed
// by "repo stat".
RepoSize() (uint64, error)
// RepoStat returns the current repository size and max limit as
// provided by "repo stat".
RepoStat() (api.IPFSRepoStat, error)
// BlockPut directly adds a block of data to the IPFS repo
BlockPut(api.NodeWithMeta) error
// BlockGet retrieves the raw data of an IPFS block

View File

@ -19,6 +19,7 @@ import (
"github.com/ipfs/ipfs-cluster/adder/adderutils"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
@ -94,11 +95,6 @@ type ipfsIDResp struct {
Addresses []string
}
type ipfsRepoStatResp struct {
RepoSize uint64
StorageMax uint64
}
type ipfsAddResp struct {
Name string
Hash string
@ -189,6 +185,8 @@ func NewConnector(cfg *Config) (*Connector, error) {
smux.HandleFunc("/api/v0/pin/ls/", ipfs.pinLsHandler)
smux.HandleFunc("/api/v0/add", ipfs.addHandler)
smux.HandleFunc("/api/v0/add/", ipfs.addHandler)
smux.HandleFunc("/api/v0/repo/stat", ipfs.repoStatHandler)
smux.HandleFunc("/api/v0/repo/stat/", ipfs.repoStatHandler)
go ipfs.run()
return ipfs, nil
@ -467,6 +465,56 @@ func (ipfs *Connector) addHandler(w http.ResponseWriter, r *http.Request) {
}
}
func (ipfs *Connector) repoStatHandler(w http.ResponseWriter, r *http.Request) {
var peers []peer.ID
err := ipfs.rpcClient.Call(
"",
"Cluster",
"ConsensusPeers",
struct{}{},
&peers,
)
if err != nil {
ipfsErrorResponder(w, err.Error())
return
}
ctxs, cancels := rpcutil.CtxsWithTimeout(ipfs.ctx, len(peers), ipfs.config.IPFSRequestTimeout)
defer rpcutil.MultiCancel(cancels)
repoStats := make([]api.IPFSRepoStat, len(peers), len(peers))
repoStatsIfaces := make([]interface{}, len(repoStats), len(repoStats))
for i := range repoStats {
repoStatsIfaces[i] = &repoStats[i]
}
errs := ipfs.rpcClient.MultiCall(
ctxs,
peers,
"Cluster",
"IPFSRepoStat",
struct{}{},
repoStatsIfaces,
)
totalStats := api.IPFSRepoStat{}
for i, err := range errs {
if err != nil {
logger.Errorf("%s repo/stat errored: %s", peers[i], err)
continue
}
totalStats.RepoSize += repoStats[i].RepoSize
totalStats.StorageMax += repoStats[i].StorageMax
}
resBytes, _ := json.Marshal(totalStats)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resBytes)
return
}
// SetClient makes the component ready to perform RPC
// requests.
func (ipfs *Connector) SetClient(c *rpc.Client) {
@ -841,45 +889,24 @@ func getConfigValue(path []string, cfg map[string]interface{}) (interface{}, err
}
}
// FreeSpace returns the amount of unused space in the ipfs repository. This
// value is derived from the RepoSize and StorageMax values given by "repo
// stats". The value is in bytes.
func (ipfs *Connector) FreeSpace() (uint64, error) {
// RepoStat returns the DiskUsage and StorageMax repo/stat values from the
// ipfs daemon, in bytes, wrapped as an IPFSRepoStat object.
func (ipfs *Connector) RepoStat() (api.IPFSRepoStat, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
res, err := ipfs.postCtx(ctx, "repo/stat?size-only=true", "", nil)
if err != nil {
logger.Error(err)
return 0, err
return api.IPFSRepoStat{}, err
}
var stats ipfsRepoStatResp
var stats api.IPFSRepoStat
err = json.Unmarshal(res, &stats)
if err != nil {
logger.Error(err)
return 0, err
return stats, err
}
return stats.StorageMax - stats.RepoSize, nil
}
// RepoSize returns the current repository size of the ipfs daemon as
// provided by "repo stats". The value is in bytes.
func (ipfs *Connector) RepoSize() (uint64, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
res, err := ipfs.postCtx(ctx, "repo/stat?size-only=true", "", nil)
if err != nil {
logger.Error(err)
return 0, err
}
var stats ipfsRepoStatResp
err = json.Unmarshal(res, &stats)
if err != nil {
logger.Error(err)
return 0, err
}
return stats.RepoSize, nil
return stats, nil
}
// SwarmPeers returns the peers currently connected to this ipfs daemon.

View File

@ -489,6 +489,35 @@ func TestIPFSProxyPinLs(t *testing.T) {
})
}
func TestProxyRepoStat(t *testing.T) {
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown()
res, err := http.Post(fmt.Sprintf("%s/repo/stat", proxyURL(ipfs)), "", nil)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
t.Error("request should have succeeded")
}
resBytes, _ := ioutil.ReadAll(res.Body)
var stat api.IPFSRepoStat
err = json.Unmarshal(resBytes, &stat)
if err != nil {
t.Fatal(err)
}
// The mockRPC returns 3 peers. Since no host is set,
// all calls are local.
if stat.RepoSize != 6000 || stat.StorageMax != 300000 {
t.Errorf("expected different stats: %+v", stat)
}
}
func TestProxyAdd(t *testing.T) {
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
@ -683,18 +712,18 @@ func TestBlockGet(t *testing.T) {
}
}
func TestRepoSize(t *testing.T) {
func TestRepoStat(t *testing.T) {
ctx := context.Background()
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown()
s, err := ipfs.RepoSize()
s, err := ipfs.RepoStat()
if err != nil {
t.Fatal(err)
}
// See the ipfs mock implementation
if s != 0 {
if s.RepoSize != 0 {
t.Error("expected 0 bytes of size")
}
@ -704,11 +733,11 @@ func TestRepoSize(t *testing.T) {
t.Error("expected success pinning cid")
}
s, err = ipfs.RepoSize()
s, err = ipfs.RepoStat()
if err != nil {
t.Fatal(err)
}
if s != 1000 {
if s.RepoSize != 1000 {
t.Error("expected 1000 bytes of size")
}
}

View File

@ -45,9 +45,9 @@
},
{
"author": "hsanjuan",
"hash": "QmSXkfCpd7nWZahenZ544zsJV29VPooaJnQZfKu5qgga8E",
"hash": "QmW56LEPdtdETprHG5h5qcJpp6qNYuytzyqbEctGdbGfSF",
"name": "go-libp2p-gorpc",
"version": "1.0.16"
"version": "1.0.17"
},
{
"author": "libp2p",

View File

@ -322,16 +322,9 @@ func (rpcapi *RPCAPI) IPFSConfigKey(ctx context.Context, in string, out *interfa
return err
}
// IPFSFreeSpace runs IPFSConnector.FreeSpace().
func (rpcapi *RPCAPI) IPFSFreeSpace(ctx context.Context, in struct{}, out *uint64) error {
res, err := rpcapi.c.ipfs.FreeSpace()
*out = res
return err
}
// IPFSRepoSize runs IPFSConnector.RepoSize().
func (rpcapi *RPCAPI) IPFSRepoSize(ctx context.Context, in struct{}, out *uint64) error {
res, err := rpcapi.c.ipfs.RepoSize()
// IPFSRepoStat runs IPFSConnector.RepoStat().
func (rpcapi *RPCAPI) IPFSRepoStat(ctx context.Context, in struct{}, out *api.IPFSRepoStat) error {
res, err := rpcapi.c.ipfs.RepoStat()
*out = res
return err
}

View File

@ -392,15 +392,13 @@ func (mock *mockService) IPFSConfigKey(ctx context.Context, in string, out *inte
return nil
}
func (mock *mockService) IPFSRepoSize(ctx context.Context, in struct{}, out *uint64) error {
// since we have two pins. Assume each is 1KB.
*out = 2000
return nil
func (mock *mockService) IPFSRepoStat(ctx context.Context, in struct{}, out *api.IPFSRepoStat) error {
// since we have two pins. Assume each is 1000B.
stat := api.IPFSRepoStat{
StorageMax: 100000,
RepoSize: 2000,
}
func (mock *mockService) IPFSFreeSpace(ctx context.Context, in struct{}, out *uint64) error {
// RepoSize is 2KB, StorageMax is 100KB
*out = 98000
*out = stat
return nil
}