Issue #49: Add disk informer

The disk informer uses "ipfs repo stat" to fetch the RepoSize value and
reports it as a metric.

The numpinalloc allocator has been generalized into ascendalloc, which
sorts metrics in ascending order and returns the peers with the lowest
values.
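
For reference, the ordering applied by the new allocator boils down to an
ascending sort on the numeric metric values. The snippet below is a minimal,
standalone sketch of that behaviour (plain strings stand in for peer IDs and
the values are made up; it is not code from this change):

    package main

    import (
        "fmt"
        "sort"
        "strconv"
    )

    func main() {
        // Metric values as an informer would report them (e.g. repo size in bytes).
        metrics := map[string]string{"peerA": "3000", "peerB": "1000", "peerC": "2000"}
        peers := make([]string, 0, len(metrics))
        for p := range metrics {
            peers = append(peers, p)
        }
        // Ascending order: peers with the smallest metric values come first.
        sort.Slice(peers, func(i, j int) bool {
            vi, _ := strconv.Atoi(metrics[peers[i]])
            vj, _ := strconv.Atoi(metrics[peers[j]])
            return vi < vj
        })
        fmt.Println(peers) // [peerB peerC peerA]
    }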

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2017-03-27 15:07:12 +02:00
parent e8ba1b7dad
commit 2bbbea79cc
16 changed files with 438 additions and 54 deletions


@ -1,13 +1,14 @@
// Package numpinalloc implements an ipfscluster.Allocator based on the "numpin"
// Informer. It is a simple example on how an allocator is implemented.
package numpinalloc
// Package ascendalloc implements an ipfscluster.Allocator that returns
// allocations based on sorting the metrics in ascending order. Thus, peers
// with the smallest metrics are first in the list. This allocator can be used
// with a number of informers, as long as they provide a numeric metric value.
package ascendalloc
import (
"sort"
"strconv"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/informer/numpin"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
@ -15,7 +16,7 @@ import (
peer "github.com/libp2p/go-libp2p-peer"
)
var logger = logging.Logger("numpinalloc")
var logger = logging.Logger("ascendalloc")
// Allocator implements ipfscluster.Allocate.
type Allocator struct{}
@ -31,14 +32,15 @@ func (alloc *Allocator) SetClient(c *rpc.Client) {}
// Shutdown does nothing in this allocator
func (alloc *Allocator) Shutdown() error { return nil }
// Allocate returns where to allocate a pin request based on "numpin"-Informer
// metrics. In this simple case, we do not pay attention to the metrics
// of the current, we just need to sort the candidates by number of pins.
// Allocate returns where to allocate a pin request based on metrics which
// carry a numeric value such as "used disk". We do not pay attention to
// the metrics of the currently allocated peers and we just sort the candidates
// based on their metric values (from smallest to largest).
func (alloc *Allocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) {
// sort our metrics
numpins := newMetricsSorter(candidates)
sort.Sort(numpins)
return numpins.peers, nil
sortable := newMetricsSorter(candidates)
sort.Sort(sortable)
return sortable.peers, nil
}
// metricsSorter attaches sort.Interface methods to our metrics and sorts
@ -52,7 +54,7 @@ func newMetricsSorter(m map[peer.ID]api.Metric) *metricsSorter {
vMap := make(map[peer.ID]int)
peers := make([]peer.ID, 0, len(m))
for k, v := range m {
if v.Name != numpin.MetricName || v.Discard() {
if v.Discard() {
continue
}
val, err := strconv.Atoi(v.Value)


@ -1,4 +1,4 @@
package numpinalloc
package ascendalloc
import (
"testing"
@ -76,24 +76,6 @@ var testCases = []testcase{
current: map[peer.ID]api.Metric{},
expected: []peer.ID{peer1},
},
{ // filter bad metric name
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: "lalala",
Value: "1",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: numpin.MetricName,
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]api.Metric{},
expected: []peer.ID{peer1},
},
{ // filter bad value
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{


@ -5,7 +5,7 @@ import (
"os"
"testing"
"github.com/ipfs/ipfs-cluster/allocator/numpinalloc"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/monitor/basic"
@ -77,7 +77,9 @@ func (ipfs *mockConnector) PinLs(filter string) (map[string]api.IPFSPinStatus, e
return m, nil
}
func (ipfs *mockConnector) ConnectSwarms() {}
func (ipfs *mockConnector) ConnectSwarms() error { return nil }
func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil }
func (ipfs *mockConnector) RepoSize() (int, error) { return 0, nil }
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate.MapState, *maptracker.MapPinTracker) {
api := &mockAPI{}
@ -86,7 +88,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate
st := mapstate.NewMapState()
tracker := maptracker.NewMapPinTracker(cfg.ID)
mon := basic.NewStdPeerMonitor(2)
alloc := numpinalloc.NewAllocator()
alloc := ascendalloc.NewAllocator()
inf := numpin.NewInformer()
cl, err := NewCluster(
@ -106,7 +108,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate
}
func cleanRaft() {
os.RemoveAll(".raftFolderFromTests")
os.RemoveAll("raftFolderFromTests")
}
func testClusterShutdown(t *testing.T) {


@ -77,6 +77,10 @@ type Config struct {
// pass before a peer can be detected as down.
MonitoringIntervalSeconds int
// AllocationStrategy is used to decide on the
// Informer/Allocator implementation to use.
AllocationStrategy string
// if a config has been loaded from disk, track the path
// so it can be saved to the same place.
path string
@ -143,6 +147,11 @@ type JSONConfig struct {
// Number of seconds between monitoring checks which detect
// if a peer is down and consequently trigger a rebalance
MonitoringIntervalSeconds int `json:"monitoring_interval_seconds"`
// AllocationStrategy is used to set how pins are allocated to
// different Cluster peers. Currently supports "reposize" and "pincount"
// values.
AllocationStrategy string `json:"allocation_strategy"`
}
// ToJSONConfig converts a Config object to its JSON representation which
@ -184,6 +193,7 @@ func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) {
StateSyncSeconds: cfg.StateSyncSeconds,
ReplicationFactor: cfg.ReplicationFactor,
MonitoringIntervalSeconds: cfg.MonitoringIntervalSeconds,
AllocationStrategy: cfg.AllocationStrategy,
}
return
}
@ -265,6 +275,10 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) {
jcfg.MonitoringIntervalSeconds = DefaultMonitoringIntervalSeconds
}
if jcfg.AllocationStrategy == "" {
jcfg.AllocationStrategy = "reposize"
}
c = &Config{
ID: id,
PrivateKey: pKey,
@ -279,6 +293,7 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) {
StateSyncSeconds: jcfg.StateSyncSeconds,
ReplicationFactor: jcfg.ReplicationFactor,
MonitoringIntervalSeconds: jcfg.MonitoringIntervalSeconds,
AllocationStrategy: jcfg.AllocationStrategy,
}
return
}
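
In practice the strategy comes either from the "allocation_strategy" field in
the cluster configuration ("reposize" or "pincount"), or from the --alloc
command-line flag added to ipfs-cluster-service further below, which takes
precedence over the configured value.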


@ -10,7 +10,7 @@ func testingConfig() *Config {
ClusterListenMultiaddress: "/ip4/127.0.0.1/tcp/10000",
APIListenMultiaddress: "/ip4/127.0.0.1/tcp/10002",
IPFSProxyListenMultiaddress: "/ip4/127.0.0.1/tcp/10001",
ConsensusDataFolder: "./raftFolderFromTests",
ConsensusDataFolder: "raftFolderFromTests",
LeaveOnShutdown: true,
MonitoringIntervalSeconds: 2,
}

informer/disk/disk.go Normal file

@ -0,0 +1,81 @@
// Package disk implements an ipfs-cluster informer which determines
// the current RepoSize of the ipfs daemon datastore and returns it as an
// api.Metric.
package disk
import (
"fmt"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
logging "github.com/ipfs/go-log"
"github.com/ipfs/ipfs-cluster/api"
)
var logger = logging.Logger("diskinfo")
// MetricTTL specifies, in seconds, how long our reported metric is valid.
var MetricTTL = 30
// MetricName specifies the name of our metric
var MetricName = "disk"
// Informer is a simple object to implement the ipfscluster.Informer
// and Component interfaces.
type Informer struct {
rpcClient *rpc.Client
}
// NewInformer returns an initialized Informer.
func NewInformer() *Informer {
return &Informer{}
}
// SetClient provides us with an rpc.Client which allows
// contacting other components in the cluster.
func (disk *Informer) SetClient(c *rpc.Client) {
disk.rpcClient = c
}
// Shutdown is called on cluster shutdown. We just invalidate
// any metrics from this point.
func (disk *Informer) Shutdown() error {
disk.rpcClient = nil
return nil
}
// Name returns the name of this informer.
func (disk *Informer) Name() string {
return MetricName
}
// GetMetric uses the IPFSConnector to fetch the current
// repository size and returns it in a metric.
func (disk *Informer) GetMetric() api.Metric {
if disk.rpcClient == nil {
return api.Metric{
Valid: false,
}
}
var repoSize int
valid := true
err := disk.rpcClient.Call("",
"Cluster",
"IPFSRepoSize",
struct{}{},
&repoSize)
if err != nil {
logger.Error(err)
valid = false
}
m := api.Metric{
Name: MetricName,
Value: fmt.Sprintf("%d", repoSize),
Valid: valid,
}
m.SetTTL(MetricTTL)
return m
}


@ -0,0 +1,81 @@
package disk
import (
"errors"
"testing"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
"github.com/ipfs/ipfs-cluster/test"
)
type badRPCService struct {
nthCall int
}
func badRPCClient(t *testing.T) *rpc.Client {
s := rpc.NewServer(nil, "mock")
c := rpc.NewClientWithServer(nil, "mock", s)
err := s.RegisterName("Cluster", &badRPCService{})
if err != nil {
t.Fatal(err)
}
return c
}
// func (mock *badRPCService) IPFSConfigKey(in string, out *interface{}) error {
// mock.nthCall++
// switch mock.nthCall {
// case 1:
// return errors.New("fake error the first time you use this mock")
// case 2:
// // don't set out
// return nil
// case 3:
// // don't set to string
// *out = 3
// case 4:
// // non parseable string
// *out = "abc"
// default:
// *out = "10KB"
// }
// return nil
// }
func (mock *badRPCService) IPFSRepoSize(in struct{}, out *int) error {
*out = 2
mock.nthCall++
return errors.New("fake error")
}
func Test(t *testing.T) {
inf := NewInformer()
defer inf.Shutdown()
if inf.Name() != "disk" {
t.Error("careful when changing the name of an informer")
}
m := inf.GetMetric()
if m.Valid {
t.Error("metric should be invalid")
}
inf.SetClient(test.NewMockRPCClient(t))
m = inf.GetMetric()
if !m.Valid {
t.Error("metric should be valid")
}
// The mock client reports 2 pins of 1 KB each, so the repo size is 2000 bytes
if m.Value != "2000" {
t.Error("bad metric value")
}
}
func TestWithErrors(t *testing.T) {
inf := NewInformer()
defer inf.Shutdown()
inf.SetClient(badRPCClient(t))
m := inf.GetMetric()
if m.Valid {
t.Errorf("metric should be invalid")
}
}


@ -15,8 +15,9 @@ import (
"github.com/urfave/cli"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/numpinalloc"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
"github.com/ipfs/ipfs-cluster/api/restapi"
"github.com/ipfs/ipfs-cluster/informer/disk"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/basic"
@ -177,6 +178,11 @@ func main() {
Value: "info",
Usage: "set the loglevel for cluster only [critical, error, warning, info, debug]",
},
cli.StringFlag{
Name: "alloc, a",
Value: "disk",
Usage: "allocation strategy [reposize, pincount]. Overrides the \"allocation_strategy\" value from the configuration",
},
}
app.Commands = []cli.Command{
@ -241,6 +247,10 @@ func run(c *cli.Context) error {
cfg.LeaveOnShutdown = true
}
if a := c.String("alloc"); a != "" {
cfg.AllocationStrategy = a
}
api, err := restapi.NewRESTAPI(cfg.APIAddr)
checkErr("creating REST API component", err)
@ -251,8 +261,7 @@ func run(c *cli.Context) error {
state := mapstate.NewMapState()
tracker := maptracker.NewMapPinTracker(cfg.ID)
mon := basic.NewStdPeerMonitor(cfg.MonitoringIntervalSeconds)
informer := numpin.NewInformer()
alloc := numpinalloc.NewAllocator()
informer, alloc := setupAllocation(cfg.AllocationStrategy)
cluster, err := ipfscluster.NewCluster(
cfg,
@ -288,6 +297,8 @@ var facilities = []string{
"monitor",
"consensus",
"pintracker",
"ascendalloc",
"diskinfo",
}
func setupLogging(lvl string) {
@ -296,6 +307,19 @@ func setupLogging(lvl string) {
}
}
func setupAllocation(strategy string) (ipfscluster.Informer, ipfscluster.PinAllocator) {
switch strategy {
case "disk", "reposize":
return disk.NewInformer(), ascendalloc.NewAllocator()
case "numpin", "pincount":
return numpin.NewInformer(), ascendalloc.NewAllocator()
default:
err := errors.New("unknown allocation strategy")
checkErr("", err)
return nil, nil
}
}
func setupDebug() {
l := "DEBUG"
for _, f := range facilities {


@ -73,7 +73,13 @@ type IPFSConnector interface {
PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error)
// ConnectSwarms make sure this peer's IPFS daemon is connected to
// other peers IPFS daemons.
ConnectSwarms()
ConnectSwarms() error
// ConfigKey returns the value for a configuration key.
// Subobjects are reached with keypaths as "Parent/Child/GrandChild...".
ConfigKey(keypath string) (interface{}, error)
// RepoSize returns the current repository size as expressed
// by "repo stat".
RepoSize() (int, error)
}
// Peered represents a component which needs to be aware of the peers


@ -10,10 +10,10 @@ import (
"testing"
"time"
"github.com/ipfs/ipfs-cluster/allocator/numpinalloc"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/api/restapi"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/informer/disk"
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/basic"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
@ -93,9 +93,9 @@ func createComponents(t *testing.T, i int) (*Config, API, IPFSConnector, state.S
state := mapstate.NewMapState()
tracker := maptracker.NewMapPinTracker(cfg.ID)
mon := basic.NewStdPeerMonitor(cfg.MonitoringIntervalSeconds)
alloc := numpinalloc.NewAllocator()
numpin.MetricTTL = 1 // second
inf := numpin.NewInformer()
alloc := ascendalloc.NewAllocator()
disk.MetricTTL = 1 // second
inf := disk.NewInformer()
return cfg, api, ipfs, state, tracker, mon, alloc, inf, mock
}
@ -664,8 +664,7 @@ func TestClustersReplication(t *testing.T) {
// Why is replication factor nClusters - 1?
// Because that way we know that pinning nCluster
// pins with an strategy like numpins (which tries
// to make everyone pin the same number of things),
// pins with a strategy like numpins/disk
// will result in each peer holding locally exactly
// nCluster pins.


@ -94,6 +94,11 @@ type ipfsIDResp struct {
Addresses []string
}
type ipfsRepoStatResp struct {
RepoSize int
NumObjects int
}
// NewConnector creates the component and leaves it ready to be started
func NewConnector(ipfsNodeMAddr ma.Multiaddr, ipfsProxyMAddr ma.Multiaddr) (*Connector, error) {
_, nodeAddr, err := manet.DialArgs(ipfsNodeMAddr)
@ -531,7 +536,7 @@ func (ipfs *Connector) apiURL() string {
// ConnectSwarms requests the ipfs addresses of other peers and
// triggers ipfs swarm connect requests
func (ipfs *Connector) ConnectSwarms() {
func (ipfs *Connector) ConnectSwarms() error {
var idsSerial []api.IDSerial
err := ipfs.rpcClient.Call("",
"Cluster",
@ -540,7 +545,7 @@ func (ipfs *Connector) ConnectSwarms() {
&idsSerial)
if err != nil {
logger.Error(err)
return
return err
}
logger.Debugf("%+v", idsSerial)
@ -559,4 +564,67 @@ func (ipfs *Connector) ConnectSwarms() {
logger.Debugf("ipfs successfully connected to %s", addr)
}
}
return nil
}
// ConfigKey fetches the IPFS daemon configuration and retrieves the value for
// a given configuration key. For example, "Datastore/StorageMax" will return
// the value for StorageMax in the Datastore configuration object.
func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) {
resp, err := ipfs.get("config/show")
if err != nil {
logger.Error(err)
return nil, err
}
var cfg map[string]interface{}
err = json.Unmarshal(resp, &cfg)
if err != nil {
logger.Error(err)
return nil, err
}
path := strings.SplitN(keypath, "/", 2)
if len(path) == 0 {
return nil, errors.New("cannot lookup without a path")
}
return getConfigValue(path, cfg)
}
func getConfigValue(path []string, cfg map[string]interface{}) (interface{}, error) {
value, ok := cfg[path[0]]
if !ok {
return nil, errors.New("key not found in configuration")
}
if len(path) == 1 {
return value, nil
}
switch value.(type) {
case map[string]interface{}:
v := value.(map[string]interface{})
return getConfigValue(path[1:], v)
default:
return nil, errors.New("invalid path")
}
}
// RepoSize returns the current repository size of the ipfs daemon as
// provided by "repo stats". The value is in bytes.
func (ipfs *Connector) RepoSize() (int, error) {
resp, err := ipfs.get("repo/stat")
if err != nil {
logger.Error(err)
return 0, err
}
var stats ipfsRepoStatResp
err = json.Unmarshal(resp, &stats)
if err != nil {
logger.Error(err)
return 0, err
}
return stats.RepoSize, nil
}
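For illustration, the keypath traversal performed by ConfigKey/getConfigValue
can be reproduced on a plain map. The snippet below is a standalone sketch
(the cfg literal is made up and lookup is a hypothetical helper, written
iteratively rather than recursively):

    package main

    import (
        "errors"
        "fmt"
        "strings"
    )

    // lookup walks a decoded JSON object following a "Parent/Child" keypath.
    func lookup(keypath string, cfg map[string]interface{}) (interface{}, error) {
        var value interface{} = cfg
        for _, key := range strings.Split(keypath, "/") {
            obj, ok := value.(map[string]interface{})
            if !ok {
                return nil, errors.New("invalid path")
            }
            value, ok = obj[key]
            if !ok {
                return nil, errors.New("key not found in configuration")
            }
        }
        return value, nil
    }

    func main() {
        cfg := map[string]interface{}{
            "Datastore": map[string]interface{}{"StorageMax": "10G"},
        }
        v, _ := lookup("Datastore/StorageMax", cfg)
        fmt.Println(v) // 10G
    }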


@ -345,6 +345,69 @@ func TestConnectSwarms(t *testing.T) {
time.Sleep(time.Second)
}
func TestRepoSize(t *testing.T) {
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown()
s, err := ipfs.RepoSize()
if err != nil {
t.Fatal(err)
}
// See the ipfs mock implementation
if s != 0 {
t.Error("expected 0 bytes of size")
}
c, _ := cid.Decode(test.TestCid1)
err = ipfs.Pin(c)
if err != nil {
t.Error("expected success pinning cid")
}
s, err = ipfs.RepoSize()
if err != nil {
t.Fatal(err)
}
if s != 1000 {
t.Error("expected 1000 bytes of size")
}
}
func TestConfigKey(t *testing.T) {
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown()
v, err := ipfs.ConfigKey("Datastore/StorageMax")
if err != nil {
t.Fatal(err)
}
sto, ok := v.(string)
if !ok {
t.Fatal("error converting to string")
}
if sto != "10G" {
t.Error("StorageMax shouold be 10G")
}
v, err = ipfs.ConfigKey("Datastore")
_, ok = v.(map[string]interface{})
if !ok {
t.Error("should have returned the whole Datastore config object")
}
_, err = ipfs.ConfigKey("")
if err == nil {
t.Error("should not work with an empty path")
}
_, err = ipfs.ConfigKey("Datastore/abc")
if err == nil {
t.Error("should not work with a bad path")
}
}
func proxyURL(c *Connector) string {
_, addr, _ := manet.DialArgs(c.proxyMAddr)
return fmt.Sprintf("http://%s/api/v0", addr)


@ -12,6 +12,8 @@ var facilities = []string{
"consensus",
"raft",
"pintracker",
"ascendalloc",
"diskinfo",
}
// SetFacilityLogLevel sets the log level for a given module


@ -219,10 +219,24 @@ func (rpcapi *RPCAPI) IPFSPinLs(in string, out *map[string]api.IPFSPinStatus) er
return err
}
// ConnectSwarms runs IPFSConnector.ConnectSwarms().
func (rpcapi *RPCAPI) ConnectSwarms(in struct{}, out *struct{}) error {
rpcapi.c.ipfs.ConnectSwarms()
return nil
// IPFSConnectSwarms runs IPFSConnector.ConnectSwarms().
func (rpcapi *RPCAPI) IPFSConnectSwarms(in struct{}, out *struct{}) error {
err := rpcapi.c.ipfs.ConnectSwarms()
return err
}
// IPFSConfigKey runs IPFSConnector.ConfigKey().
func (rpcapi *RPCAPI) IPFSConfigKey(in string, out *interface{}) error {
res, err := rpcapi.c.ipfs.ConfigKey(in)
*out = res
return err
}
// IPFSRepoSize runs IPFSConnector.RepoSize().
func (rpcapi *RPCAPI) IPFSRepoSize(in struct{}, out *int) error {
res, err := rpcapi.c.ipfs.RepoSize()
*out = res
return err
}
/*


@ -45,6 +45,17 @@ type idResp struct {
Addresses []string
}
type repoStatResp struct {
RepoSize int
RepoCount int
}
type configResp struct {
Datastore struct {
StorageMax string
}
}
// NewIpfsMock returns a new mock.
func NewIpfsMock() *IpfsMock {
st := mapstate.NewMapState()
@ -166,6 +177,24 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
}
j, _ := json.Marshal(resp)
w.Write(j)
case "repo/stat":
len := len(m.pinMap.List())
resp := repoStatResp{
RepoSize: len * 1000,
RepoCount: len,
}
j, _ := json.Marshal(resp)
w.Write(j)
case "config/show":
resp := configResp{
Datastore: struct {
StorageMax string
}{
StorageMax: "10G",
},
}
j, _ := json.Marshal(resp)
w.Write(j)
case "version":
w.Write([]byte("{\"Version\":\"m.o.c.k\"}"))
default:


@ -241,6 +241,22 @@ func (mock *mockService) IPFSPinLs(in string, out *map[string]api.IPFSPinStatus)
return nil
}
func (mock *mockService) ConnectSwarms(in struct{}, out *struct{}) error {
func (mock *mockService) IPFSConnectSwarms(in struct{}, out *struct{}) error {
return nil
}
func (mock *mockService) IPFSConfigKey(in string, out *interface{}) error {
switch in {
case "Datastore/StorageMax":
*out = "100KB"
default:
return errors.New("configuration key not found")
}
return nil
}
func (mock *mockService) IPFSRepoSize(in struct{}, out *int) error {
// Since we have two pins, assume each is 1KB.
*out = 2000
return nil
}