Merge pull request #714 from ipfs/feat/monitor-accrual

Monitoring and re-allocation revamp: Accrual failure detection
Hector Sanjuan 2019-05-16 13:46:02 +01:00 committed by GitHub
commit 5be1b6532f
28 changed files with 1121 additions and 99 deletions

View File

@ -248,6 +248,16 @@ type GlobalPinInfo struct {
PeerMap map[string]*PinInfo `json:"peer_map" codec:"pm,omitempty"`
}
// String returns the string representation of a GlobalPinInfo.
func (gpi *GlobalPinInfo) String() string {
str := fmt.Sprintf("Cid: %v\n", gpi.Cid.String())
str = str + "Peer:\n"
for _, p := range gpi.PeerMap {
str = str + fmt.Sprintf("\t%+v\n", p)
}
return str
}
// PinInfo holds information about local pins.
type PinInfo struct {
Cid cid.Cid `json:"cid" codec:"c"`
@ -581,6 +591,19 @@ type Pin struct {
Reference *cid.Cid `json:"reference" codec:"r,omitempty"`
}
// String is a string representation of a Pin.
func (pin *Pin) String() string {
var b strings.Builder
fmt.Fprintf(&b, "cid: %s\n", pin.Cid.String())
fmt.Fprintf(&b, "type: %s\n", pin.Type)
fmt.Fprintf(&b, "allocations: %v\n", pin.Allocations)
fmt.Fprintf(&b, "maxdepth: %d\n", pin.MaxDepth)
if pin.Reference != nil {
fmt.Fprintf(&b, "reference: %s\n", pin.Reference)
}
return b.String()
}
// PinPath is a wrapper for holding pin options and path of the content.
type PinPath struct {
PinOptions

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"mime/multipart"
"sort"
"sync"
"time"
@ -83,6 +84,7 @@ type Cluster struct {
// this call returns (consensus may still be bootstrapping). Use Cluster.Ready()
// if you need to wait until the peer is fully up.
func NewCluster(
ctx context.Context,
host host.Host,
dht *dht.IpfsDHT,
cfg *Config,
@ -105,7 +107,7 @@ func NewCluster(
return nil, errors.New("cluster host is nil")
}
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
listenAddrs := ""
for _, addr := range host.Addrs() {
@ -297,15 +299,23 @@ func (c *Cluster) alertsHandler() {
case <-c.ctx.Done():
return
case alrt := <-c.monitor.Alerts():
// only the leader handles alerts
leader, err := c.consensus.Leader(c.ctx)
if err == nil && leader == c.id {
logger.Warningf(
"Peer %s received alert for %s in %s",
c.id, alrt.MetricName, alrt.Peer,
)
switch alrt.MetricName {
case pingMetricName:
cState, err := c.consensus.State(c.ctx)
if err != nil {
logger.Warning(err)
return
}
list, err := cState.List(c.ctx)
if err != nil {
logger.Warning(err)
return
}
for _, pin := range list {
if len(pin.Allocations) == 1 && containsPeer(pin.Allocations, alrt.Peer) {
logger.Warning("a pin with only one allocation cannot be repinned")
logger.Warning("to make repinning possible, pin with a replication factor of 2+")
continue
}
if c.shouldPeerRepinCid(alrt.Peer, pin) {
c.repinFromPeer(c.ctx, alrt.Peer)
}
}
@ -313,6 +323,25 @@ func (c *Cluster) alertsHandler() {
}
}
// shouldPeerRepinCid returns true if the current peer is at the top of the
// allocs list. The failed peer is ignored, i.e. if the current peer is
// second and the failed peer is first, the function will still
// return true.
func (c *Cluster) shouldPeerRepinCid(failed peer.ID, pin *api.Pin) bool {
if containsPeer(pin.Allocations, failed) && containsPeer(pin.Allocations, c.id) {
allocs := peer.IDSlice(pin.Allocations)
sort.Sort(allocs)
if allocs[0] == c.id {
return true
}
if allocs[1] == c.id && allocs[0] == failed {
return true
}
}
return false
}
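For orientation, a minimal standalone sketch (an editor's illustration, not part of this diff; the IDs below are plain strings rather than real multihashes) of the deterministic selection that shouldPeerRepinCid performs over a sorted allocation list:

package main

import (
	"fmt"
	"sort"

	peer "github.com/libp2p/go-libp2p-peer"
)

func main() {
	// peer.ID is a string type, so a peer.IDSlice sorts lexicographically.
	me := peer.ID("QmC")
	failed := peer.ID("QmA")
	allocs := peer.IDSlice{peer.ID("QmC"), peer.ID("QmA"), peer.ID("QmB")}

	sort.Sort(allocs) // -> [QmA QmB QmC]

	// The first allocation repins; if that is the failed peer itself,
	// responsibility falls to the second one.
	responsible := allocs[0]
	if responsible == failed {
		responsible = allocs[1]
	}
	fmt.Println(responsible == me) // false: QmB would repin in this example
}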
// detects any changes in the peerset and saves the configuration. When it
// detects that we have been removed from the peerset, it shuts down this peer.
func (c *Cluster) watchPeers() {

View File

@ -140,8 +140,9 @@ type mockTracer struct {
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracker) {
clusterCfg, _, _, _, _, raftCfg, _, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs()
ctx := context.Background()
host, pubsub, dht, err := NewClusterHost(context.Background(), clusterCfg)
host, pubsub, dht, err := NewClusterHost(ctx, clusterCfg)
if err != nil {
t.Fatal(err)
}
@ -156,7 +157,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke
raftcon, _ := raft.NewConsensus(host, raftCfg, store, false)
psmonCfg.CheckInterval = 2 * time.Second
mon, err := pubsubmon.New(psmonCfg, pubsub, raftcon.Peers)
mon, err := pubsubmon.New(ctx, psmonCfg, pubsub, raftcon.Peers)
if err != nil {
t.Fatal(err)
}
@ -169,6 +170,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke
ReadyTimeout = raftCfg.WaitForLeaderTimeout + 1*time.Second
cl, err := NewCluster(
ctx,
host,
dht,
clusterCfg,

View File

@ -22,6 +22,7 @@ import (
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
"github.com/ipfs/ipfs-cluster/pstoremgr"
"go.opencensus.io/tag"
ds "github.com/ipfs/go-datastore"
host "github.com/libp2p/go-libp2p-host"
@ -103,6 +104,9 @@ func createCluster(
host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgs.clusterCfg)
checkErr("creating libP2P Host", err)
ctx, err = tag.New(ctx, tag.Upsert(observations.HostKey, host.ID().Pretty()))
checkErr("tag context with host id", err)
peerstoreMgr := pstoremgr.New(host, cfgs.clusterCfg.GetPeerstorePath())
// Import peers but do not connect. We cannot connect to peers until
// everything has been created (dht, pubsub, bitswap). Otherwise things
@ -164,13 +168,14 @@ func createCluster(
peersF = cons.Peers
}
mon, err := pubsubmon.New(cfgs.pubsubmonCfg, pubsub, peersF)
mon, err := pubsubmon.New(ctx, cfgs.pubsubmonCfg, pubsub, peersF)
if err != nil {
store.Close()
checkErr("setting up PeerMonitor", err)
}
return ipfscluster.NewCluster(
ctx,
host,
dht,
cfgs.clusterCfg,

View File

@ -102,7 +102,8 @@ var testingTrackerCfg = []byte(`
`)
var testingMonCfg = []byte(`{
"check_interval": "300ms"
"check_interval": "300ms",
"failure_threshold": 5
}`)
var testingDiskInfCfg = []byte(`{
@ -111,7 +112,10 @@ var testingDiskInfCfg = []byte(`{
}`)
var testingTracerCfg = []byte(`{
"enable_tracing": false
"enable_tracing": false,
"jaeger_agent_endpoint": "/ip4/0.0.0.0/udp/6831",
"sampling_prob": 1,
"service_name": "cluster-daemon"
}`)
func testingConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *maptracker.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {

2
go.mod
View File

@ -87,5 +87,7 @@ require (
golang.org/x/sync v0.0.0-20190412183630-56d357773e84 // indirect
golang.org/x/sys v0.0.0-20190416152802-12500544f89f // indirect
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373 // indirect
gonum.org/v1/gonum v0.0.0-20190321072728-ca4d35bc590a
gonum.org/v1/plot v0.0.0-20190410204940-3a5f52653745
google.golang.org/grpc v1.19.1 // indirect
)

25
go.sum
View File

@ -14,6 +14,8 @@ github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMx
github.com/Stebalien/go-bitfield v0.0.0-20180330043415-076a62f9ce6e h1:2Z+EBRrOJsA3psnUPcEWMIH2EIga1xHflQcr/EZslx8=
github.com/Stebalien/go-bitfield v0.0.0-20180330043415-076a62f9ce6e/go.mod h1:3oM7gXIttpYDAJXpVNnSCiUMYBLIZ6cb1t+Ip982MRo=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af h1:wVe6/Ea46ZMeNkQjjBW6xcqyQA/j5e0D6GytH95g0gQ=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/apache/thrift v0.12.0 h1:pODnxUFNcjP9UTLZGTdeh+j16A8lJbRvD3rOtrk/7bs=
@ -69,6 +71,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5/go.mod h1:JpoxHjuQauoxiFMl1ie8Xc/7TfLuMZ5eOCONd1sUBHg=
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90 h1:WXb3TSNmHp2vHoCroCIB1foO/yQ36swABL8aOVeDpgg=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@ -81,6 +85,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E=
@ -255,6 +261,8 @@ github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVY
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5 h1:PJr+ZMXIecYc1Ey2zucXdR73SMBtgjPgwa31099IMv0=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/kelseyhightower/envconfig v1.3.0 h1:IvRS4f2VcIQy6j4ORGIf9145T/AsUB+oY8LyvN8BXNM=
github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
@ -567,7 +575,13 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90Pveol
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190417170229-92d88b081a49 h1:mE9V9RMa141kxdQR3pfZM3mkg0MPyw+FOPpnciBXkbE=
golang.org/x/crypto v0.0.0-20190417170229-92d88b081a49/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2 h1:y102fOLFqhV41b+4GPiJoa0k/x+pJcEi2/HB1Y5T6fU=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81 h1:00VmoueYNlNz/aHIilyyQz/MHSqGoWJzpFv/HW8xpzI=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181217174547-8f45f776aaf1/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@ -622,9 +636,11 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 h1:z99zHgr7hKfrUcX/KsoJk5FJfjTceCKIp96+biqP4To=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181219222714-6e267b5cc78e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c h1:vamGzbGri8IKo20MQncCuljcQ5uAO6kaCeawQPVblAI=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
@ -633,6 +649,13 @@ golang.org/x/xerrors v0.0.0-20190212162355-a5947ffaace3 h1:P6iTFmrTQqWrqLZPX1VMz
golang.org/x/xerrors v0.0.0-20190212162355-a5947ffaace3/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373 h1:PPwnA7z1Pjf7XYaBP9GL1VAMZmcIWyFz7QCMSIIa3Bg=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.0.0-20190321072728-ca4d35bc590a h1:XffIu/i+IJIC+M8WoBEmJm8N/YYbA8Pvh748YgzU7kI=
gonum.org/v1/gonum v0.0.0-20190321072728-ca4d35bc590a/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0 h1:OE9mWmgKkjJyEmDAAtGMPjXu+YNeGvK9VTSHY6+Qihc=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/plot v0.0.0-20190410204940-3a5f52653745 h1:Xaq5xR1I2KM/MWp1vwZxOosUPa1U8wtNN8zRbVko0ZY=
gonum.org/v1/plot v0.0.0-20190410204940-3a5f52653745/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
google.golang.org/api v0.0.0-20181220000619-583d854617af h1:iQMS7JKv/0w/iiWf1M49Cg3dmOkBoBZT5KheqPDpaac=
google.golang.org/api v0.0.0-20181220000619-583d854617af/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.3.1 h1:oJra/lMfmtm13/rgY/8i3MzjFWYXvQIAKjQ3HqofMk8=
@ -667,3 +690,5 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
rsc.io/pdf v0.1.1 h1:k1MczvYDUvJBe93bYd7wrZLLUEcLZAuF824/I4e5Xr4=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=

View File

@ -56,6 +56,7 @@ func TestDefault(t *testing.T) {
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
}
func TestApplyEnvVars(t *testing.T) {

View File

@ -134,7 +134,26 @@ func randomBytes() []byte {
return bs
}
func createComponents(t *testing.T, host host.Host, pubsub *pubsub.PubSub, dht *dht.IpfsDHT, i int, staging bool) (*Config, ds.Datastore, Consensus, []API, IPFSConnector, PinTracker, PeerMonitor, PinAllocator, Informer, Tracer, *test.IpfsMock) {
func createComponents(
t *testing.T,
host host.Host,
pubsub *pubsub.PubSub,
dht *dht.IpfsDHT,
i int,
staging bool,
) (
*Config,
ds.Datastore,
Consensus,
[]API,
IPFSConnector,
PinTracker,
PeerMonitor,
PinAllocator,
Informer,
Tracer,
*test.IpfsMock,
) {
ctx := context.Background()
mock := test.NewIpfsMock(t)
@ -187,9 +206,10 @@ func createComponents(t *testing.T, host host.Host, pubsub *pubsub.PubSub, dht *
if consensus == "raft" {
peersF = cons.Peers
}
mon, err := pubsubmon.New(psmonCfg, pubsub, peersF)
mon, err := pubsubmon.New(ctx, psmonCfg, pubsub, peersF)
checkErr(t, err)
tracingCfg.ServiceName = peername
tracer, err := observations.SetupTracing(tracingCfg)
checkErr(t, err)
@ -236,7 +256,7 @@ func makePinTracker(t *testing.T, pid peer.ID, mptCfg *maptracker.Config, sptCfg
}
func createCluster(t *testing.T, host host.Host, dht *dht.IpfsDHT, clusterCfg *Config, store ds.Datastore, consensus Consensus, apis []API, ipfs IPFSConnector, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer, tracer Tracer) *Cluster {
cl, err := NewCluster(host, dht, clusterCfg, store, consensus, apis, ipfs, tracker, mon, alloc, inf, tracer)
cl, err := NewCluster(context.Background(), host, dht, clusterCfg, store, consensus, apis, ipfs, tracker, mon, alloc, inf, tracer)
checkErr(t, err)
return cl
}
@ -452,6 +472,34 @@ loop:
}
}
func waitForClustersHealthy(t *testing.T, clusters []*Cluster) {
t.Helper()
if len(clusters) == 0 {
return
}
timer := time.NewTimer(15 * time.Second)
for {
ttlDelay()
metrics := clusters[0].monitor.LatestMetrics(context.Background(), clusters[0].informer.Name())
healthy := 0
for _, m := range metrics {
if !m.Expired() {
healthy++
}
}
if len(clusters) == healthy {
return
}
select {
case <-timer.C:
t.Fatal("timed out waiting for clusters to be healthy")
default:
}
}
}
/////////////////////////////////////////
func TestClustersVersion(t *testing.T) {
@ -999,7 +1047,7 @@ func TestClustersShutdown(t *testing.T) {
runF(t, clusters, f)
}
func TestClustersReplication(t *testing.T) {
func TestClustersReplicationOverall(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
@ -1008,7 +1056,8 @@ func TestClustersReplication(t *testing.T) {
c.config.ReplicationFactorMax = nClusters - 1
}
ttlDelay()
// wait for clusters to stabilise
waitForClustersHealthy(t, clusters)
// Why is replication factor nClusters - 1?
// Because that way we know that pinning nCluster
@ -1029,7 +1078,7 @@ func TestClustersReplication(t *testing.T) {
}
pinDelay()
// check that it is held by exactly nClusters -1 peers
// check that it is held by exactly nClusters - 1 peers
gpi, err := clusters[j].Status(ctx, h)
if err != nil {
t.Fatal(err)
@ -1045,8 +1094,11 @@ func TestClustersReplication(t *testing.T) {
}
}
if numLocal != nClusters-1 {
t.Errorf("We wanted replication %d but it's only %d",
nClusters-1, numLocal)
t.Errorf(
"We wanted replication %d but it's only %d",
nClusters-1,
numLocal,
)
}
if numRemote != 1 {
@ -1056,10 +1108,12 @@ func TestClustersReplication(t *testing.T) {
}
f := func(t *testing.T, c *Cluster) {
// confirm that the pintracker state matches the current global state
pinfos := c.tracker.StatusAll(ctx)
if len(pinfos) != nClusters {
t.Error("Pinfos does not have the expected pins")
}
numRemote := 0
numLocal := 0
for _, pi := range pinfos {
@ -1072,12 +1126,11 @@ func TestClustersReplication(t *testing.T) {
}
}
if numLocal != nClusters-1 {
t.Errorf("Expected %d local pins but got %d", nClusters-1, numLocal)
t.Error(pinfos)
t.Errorf("%s: Expected %d local pins but got %d", c.id.String(), nClusters-1, numLocal)
}
if numRemote != 1 {
t.Errorf("Expected 1 remote pin but got %d", numRemote)
t.Errorf("%s: Expected 1 remote pin but got %d", c.id.String(), numRemote)
}
pins, err := c.Pins(ctx)

View File

@ -297,7 +297,7 @@ func (ipfs *Connector) Pin(ctx context.Context, hash cid.Cid, maxDepth int) erro
return err
}
logger.Debugf("Refs for %s sucessfully fetched", hash)
stats.Record(ctx, observations.PinCountMetric.M(1))
stats.Record(ctx, observations.Pins.M(1))
}
path := fmt.Sprintf("pin/add?arg=%s&%s", hash, pinArgs)
@ -329,7 +329,7 @@ func (ipfs *Connector) Unpin(ctx context.Context, hash cid.Cid) error {
return err
}
logger.Info("IPFS Unpin request succeeded:", hash)
stats.Record(ctx, observations.PinCountMetric.M(-1))
stats.Record(ctx, observations.Pins.M(-1))
}
logger.Debug("IPFS object is already unpinned: ", hash)

View File

@ -6,6 +6,9 @@ import (
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/observations"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
peer "github.com/libp2p/go-libp2p-peer"
)
@ -19,27 +22,37 @@ var ErrAlertChannelFull = errors.New("alert channel is full")
// Checker provides utilities to find expired metrics
// for a given peerset and send alerts if it proceeds to do so.
type Checker struct {
alertCh chan *api.Alert
metrics *Store
ctx context.Context
alertCh chan *api.Alert
metrics *Store
threshold float64
}
// NewChecker creates a Checker using the given
// MetricsStore.
func NewChecker(metrics *Store) *Checker {
// MetricsStore. The threshold value indicates when a
// monitored component should be considered to have failed.
// The greater the threshold value the more leniency is granted.
//
// A value between 2.0 and 4.0 is suggested for the threshold.
func NewChecker(ctx context.Context, metrics *Store, threshold float64) *Checker {
return &Checker{
alertCh: make(chan *api.Alert, AlertChannelCap),
metrics: metrics,
ctx: ctx,
alertCh: make(chan *api.Alert, AlertChannelCap),
metrics: metrics,
threshold: threshold,
}
}
// CheckPeers will trigger alerts all latest metrics from the given peerset
// CheckPeers will trigger alerts based on the latest metrics from the given peerset
// when they have expired and no alert has been sent before.
func (mc *Checker) CheckPeers(peers []peer.ID) error {
for _, peer := range peers {
for _, metric := range mc.metrics.PeerMetrics(peer) {
err := mc.alertIfExpired(metric)
if err != nil {
return err
if mc.FailedMetric(metric.Name, peer) {
err := mc.alert(peer, metric.Name)
if err != nil {
return err
}
}
}
}
@ -50,9 +63,11 @@ func (mc *Checker) CheckPeers(peers []peer.ID) error {
// and no alert has been sent before.
func (mc Checker) CheckAll() error {
for _, metric := range mc.metrics.AllMetrics() {
err := mc.alertIfExpired(metric)
if err != nil {
return err
if mc.FailedMetric(metric.Name, metric.Peer) {
err := mc.alert(metric.Peer, metric.Name)
if err != nil {
return err
}
}
}
@ -80,6 +95,11 @@ func (mc *Checker) alert(pid peer.ID, metricName string) error {
}
select {
case mc.alertCh <- alrt:
stats.RecordWithTags(
mc.ctx,
[]tag.Mutator{tag.Upsert(observations.RemotePeerKey, pid.Pretty())},
observations.Alerts.M(1),
)
default:
return ErrAlertChannelFull
}
@ -114,3 +134,40 @@ func (mc *Checker) Watch(ctx context.Context, peersF func(context.Context) ([]pe
}
}
}
// Failed returns true if a peer has potentially failed.
// Peers that are not present in the metrics store will return
// as failed.
func (mc *Checker) Failed(pid peer.ID) bool {
_, _, _, result := mc.failed("ping", pid)
return result
}
// FailedMetric is the same as Failed but can use any metric type,
// not just ping.
func (mc *Checker) FailedMetric(metric string, pid peer.ID) bool {
_, _, _, result := mc.failed(metric, pid)
return result
}
// failed returns all the values involved in making the decision
// as to whether a peer has failed or not. This is mainly for debugging
// purposes.
func (mc *Checker) failed(metric string, pid peer.ID) (float64, []float64, float64, bool) {
latest := mc.metrics.PeerLatest(metric, pid)
if latest == nil {
return 0.0, nil, 0.0, true
}
v := time.Now().UnixNano() - latest.ReceivedAt
dv := mc.metrics.Distribution(metric, pid)
// one metric isn't enough to calculate a distribution
// alerting/failure detection will fall back to the metric-expiring
// method
switch {
case len(dv) < 5 && !latest.Expired():
return float64(v), dv, 0.0, false
default:
phiv := phi(float64(v), dv)
return float64(v), dv, phiv, phiv >= mc.threshold
}
}

View File

@ -2,21 +2,27 @@ package metrics
import (
"context"
"fmt"
"math"
"testing"
"time"
peer "github.com/libp2p/go-libp2p-peer"
"gonum.org/v1/plot"
"gonum.org/v1/plot/plotter"
"gonum.org/v1/plot/plotutil"
"gonum.org/v1/plot/vg"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
)
func TestChecker(t *testing.T) {
func TestCheckPeers(t *testing.T) {
metrics := NewStore()
checker := NewChecker(metrics)
checker := NewChecker(context.Background(), metrics, 2.0)
metr := &api.Metric{
Name: "test",
Name: "ping",
Peer: test.PeerID1,
Value: "1",
Valid: true,
@ -52,12 +58,12 @@ func TestChecker(t *testing.T) {
}
}
func TestCheckerWatch(t *testing.T) {
func TestChecker_Watch(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
metrics := NewStore()
checker := NewChecker(metrics)
checker := NewChecker(context.Background(), metrics, 2.0)
metr := &api.Metric{
Name: "test",
@ -81,3 +87,302 @@ func TestCheckerWatch(t *testing.T) {
t.Fatal("should have received an alert")
}
}
func TestChecker_Failed(t *testing.T) {
t.Run("standard failure check", func(t *testing.T) {
metrics := NewStore()
checker := NewChecker(context.Background(), metrics, 2.0)
for i := 0; i < 10; i++ {
metrics.Add(makePeerMetric(test.PeerID1, "1"))
time.Sleep(time.Duration(2) * time.Millisecond)
}
for i := 0; i < 10; i++ {
metrics.Add(makePeerMetric(test.PeerID1, "1"))
got := checker.Failed(test.PeerID1)
// the magic number 17 represents the point at which
// the time between metrics addition has gotten
// so large that the probability that the service
// has failed goes over the threshold.
if i >= 17 && !got {
t.Fatal("threshold should have been passed by now")
}
time.Sleep(time.Duration(i) * time.Millisecond)
}
})
}
//////////////////
// HELPER TESTS //
//////////////////
func TestThresholdValues(t *testing.T) {
t.Log("TestThresholdValues is useful for testing out different threshold values")
t.Log("It doesn't actually perform any 'tests', so it is skipped by default")
t.SkipNow()
thresholds := []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.8, 0.9, 1.3, 1.4, 1.5, 1.8, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0, 20.0}
t.Run("linear threshold test", func(t *testing.T) {
dists := make([]timeseries, 0)
phivs := make([]timeseries, 0)
for _, v := range thresholds {
metrics := NewStore()
checker := NewChecker(context.Background(), metrics, v)
tsName := fmt.Sprintf("%f", v)
distTS := newTS(tsName)
phivTS := newTS(tsName)
var failed bool
output := false
check := func(i int) bool {
inputv, dist, phiv, got := checker.failed("ping", test.PeerID1)
if output {
fmt.Println(i)
fmt.Printf("phiv: %f\n", phiv)
fmt.Printf("threshold: %f\n", v)
fmt.Printf("latest: %f\n", inputv)
fmt.Printf("distribution: %v\n", dist)
}
phivTS.record(phiv)
failed = got
if failed {
distTS.record(float64(i))
}
return got
}
// set baseline static distribution of ping intervals
for i := 0; i < 10; i++ {
check(i)
distTS.record(float64(10))
metrics.Add(makePeerMetric(test.PeerID1, "1"))
time.Sleep(time.Duration(10) * time.Millisecond)
}
// start linearly increasing the interval values
for i := 10; i < 100 && !check(i); i++ {
distTS.record(float64(i))
metrics.Add(makePeerMetric(test.PeerID1, "1"))
time.Sleep(time.Duration(i) * time.Millisecond)
}
if failed {
dists = append(dists, *distTS)
phivs = append(phivs, *phivTS)
continue
}
dists = append(dists, *distTS)
phivs = append(phivs, *phivTS)
// if a threshold has made it to this point, all greater
// thresholds will make it here so print the threshold and skip
t.Log("threshold that was not meet by linear increase: ", v)
break
}
plotDistsPhivs("linear_", dists, phivs)
})
t.Run("cubic threshold test", func(t *testing.T) {
dists := make([]timeseries, 0)
phivs := make([]timeseries, 0)
for _, v := range thresholds {
metrics := NewStore()
checker := NewChecker(context.Background(), metrics, v)
tsName := fmt.Sprintf("%f", v)
distTS := newTS(tsName)
phivTS := newTS(tsName)
var failed bool
output := false
check := func(i int) bool {
inputv, dist, phiv, got := checker.failed("ping", test.PeerID1)
if output {
fmt.Println(i)
fmt.Printf("phiv: %f\n", phiv)
fmt.Printf("threshold: %f\n", v)
fmt.Printf("latest: %f\n", inputv)
fmt.Printf("distribution: %v\n", dist)
}
phivTS.record(phiv)
failed = got
if failed {
diff := math.Pow(float64(i), float64(i))
distTS.record(diff)
}
return got
}
// set baseline static distribution of ping intervals
for i := 0; i < 10; i++ {
check(i)
distTS.record(float64(8))
metrics.Add(makePeerMetric(test.PeerID1, "1"))
time.Sleep(time.Duration(8) * time.Millisecond)
}
for i := 2; !check(i) && i < 20; i++ {
diff := math.Pow(float64(i), 3)
distTS.record(diff)
metrics.Add(makePeerMetric(test.PeerID1, "1"))
time.Sleep(time.Duration(diff) * time.Millisecond)
}
if failed {
dists = append(dists, *distTS)
phivs = append(phivs, *phivTS)
continue
}
dists = append(dists, *distTS)
phivs = append(phivs, *phivTS)
// if a threshold has made it to this point, all greater
// thresholds will make it here so print the threshold and skip
t.Log("threshold that was not meet by cubic increase: ", v)
break
}
plotDistsPhivs("cubic_", dists, phivs)
})
t.Run("14x threshold test", func(t *testing.T) {
dists := make([]timeseries, 0)
phivs := make([]timeseries, 0)
for _, v := range thresholds {
metrics := NewStore()
checker := NewChecker(context.Background(), metrics, v)
tsName := fmt.Sprintf("%f", v)
distTS := newTS(tsName)
phivTS := newTS(tsName)
var failed bool
output := false
check := func(i int) bool {
inputv, dist, phiv, got := checker.failed("ping", test.PeerID1)
if output {
fmt.Println(i)
fmt.Printf("phiv: %f\n", phiv)
fmt.Printf("threshold: %f\n", v)
fmt.Printf("latest: %f\n", inputv)
fmt.Printf("distribution: %v\n", dist)
}
phivTS.record(phiv)
failed = got
if failed {
diff := i * 50
distTS.record(float64(diff))
}
return got
}
for i := 0; i < 10; i++ {
check(i)
distTS.record(float64(i))
metrics.Add(makePeerMetric(test.PeerID1, "1"))
time.Sleep(time.Duration(i) * time.Millisecond)
}
for i := 10; !check(i) && i < 30; i++ {
diff := i * 50
distTS.record(float64(diff))
metrics.Add(makePeerMetric(test.PeerID1, "1"))
time.Sleep(time.Duration(diff) * time.Millisecond)
}
if failed {
dists = append(dists, *distTS)
phivs = append(phivs, *phivTS)
continue
}
dists = append(dists, *distTS)
phivs = append(phivs, *phivTS)
// if a threshold has made it to this point, all greater
// thresholds will make it here so print the threshold and skip
t.Log("threshold that was not meet by 14x increase: ", v)
break
}
plotDistsPhivs("14x_", dists, phivs)
})
}
func makePeerMetric(pid peer.ID, value string) *api.Metric {
metr := &api.Metric{
Name: "ping",
Peer: pid,
Value: value,
Valid: true,
}
return metr
}
type timeseries struct {
name string
values []float64
}
func newTS(name string) *timeseries {
v := make([]float64, 0)
return &timeseries{name: name, values: v}
}
func (ts *timeseries) record(v float64) {
ts.values = append(ts.values, v)
}
func (ts *timeseries) ToXYs() plotter.XYs {
pts := make(plotter.XYs, len(ts.values))
for i, v := range ts.values {
pts[i].X = float64(i)
if math.IsInf(v, 0) {
pts[i].Y = -5
} else {
pts[i].Y = v
}
}
return pts
}
func toPlotLines(ts []timeseries) ([]string, [][]plot.Plotter) {
labels := make([]string, 0)
plotters := make([][]plot.Plotter, 0)
for i, t := range ts {
l, s, err := plotter.NewLinePoints(t.ToXYs())
if err != nil {
panic(err)
}
l.Width = vg.Points(2)
l.Color = plotutil.Color(i)
l.Dashes = plotutil.Dashes(i)
s.Color = plotutil.Color(i)
s.Shape = plotutil.Shape(i)
labels = append(labels, t.name)
plotters = append(plotters, []plot.Plotter{l, s})
}
return labels, plotters
}
func plotDistsPhivs(prefix string, dists, phivs []timeseries) {
plotTS(prefix+"dists", dists)
plotTS(prefix+"phivs", phivs)
}
func plotTS(name string, ts []timeseries) {
p, err := plot.New()
if err != nil {
panic(err)
}
p.Title.Text = name
p.Add(plotter.NewGrid())
labels, plotters := toPlotLines(ts)
for i := range labels {
l, pts := plotters[i][0], plotters[i][1]
p.Add(l, pts)
p.Legend.Add(labels[i], l.(*plotter.Line), pts.(*plotter.Scatter))
}
if err := p.Save(20*vg.Inch, 15*vg.Inch, name+".png"); err != nil {
panic(err)
}
}

80
monitor/metrics/prob.go Normal file
View File

@ -0,0 +1,80 @@
/*
Copyright (©) 2015 Timothée Peignier <timothee.peignier@tryphon.org>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
Reference of what was originally copied: https://ipfs.io/ipfs/QmeJDSL6g6u4NSuzUqRxZWhyKPJ6wJAqieQpqX5eXPCjj5
*/
package metrics
import (
"math"
"gonum.org/v1/gonum/floats"
)
// phi returns the φ-failure for the given value and distribution.
// Two edge cases that are dealt with in phi:
// 1. phi == math.+Inf
// 2. phi == math.NaN
//
// Edge case 1. is most certainly a failure: the value of v is so large
// in comparison to the distribution that the cdf function returns a 1,
// which equates to a math.Log10(0), one of its special cases, i.e. it
// returns -Inf. In this case, phi() will return the math.+Inf value, as it
// will be a valid comparison against the threshold value in checker.Failed.
//
// Edge case 2. could be a failure but may not be. phi() will return NaN
// when the standard deviation of the distribution is 0, i.e. the entire
// distribution is the same number, {1,1,1,1,1}. Considering that we are
// using UnixNano timestamps this would be highly unlikely, but just in case
// phi() will return a -1 value, indicating that the caller should retry.
func phi(v float64, d []float64) float64 {
u, o := meanStdDev(d)
phi := -math.Log10(1 - cdf(u, o, v))
if math.IsNaN(phi) {
return -1
}
return phi
}
// cdf returns the cumulative distribution function of the given
// normal distribution, for the given value.
func cdf(u, o, v float64) float64 {
return ((1.0 / 2.0) * (1 + math.Erf((v-u)/(o*math.Sqrt2))))
}
func meanStdDev(v []float64) (m, sd float64) {
var variance float64
m, variance = meanVariance(v)
sd = math.Sqrt(variance)
return
}
func meanVariance(values []float64) (m, v float64) {
if len(values) == 0 {
return 0.0, 0.0
}
m = floats.Sum(values) / float64(len(values))
floats.AddConst(-m, values)
floats.Mul(values, values)
v = floats.Sum(values) / float64(len(values))
return
}

View File

@ -0,0 +1,317 @@
package metrics
import (
"math"
"math/rand"
"testing"
"time"
)
// NOTE: Test_phi and Test_cdf contain float64 want values that look 'precise',
// as if they were golden test data. They ARE NOT: they have been calculated
// using Wolfram Alpha. The following links provide examples of calculating
// the phi value:
// - standardDeviation: https://www.wolframalpha.com/input/?i=population+standard+deviation+-2,+-4,+-4,+-4,+-5,+-5,+-7,+-9
// - mean: https://www.wolframalpha.com/input/?i=mean+-2,+-4,+-4,+-4,+-5,+-5,+-7,+-9
// - cdf: https://www.wolframalpha.com/input/?i=(((1.0+%2F+2.0)+*+(1+%2B+Erf((-4--5)%2F(2*Sqrt2)))))
// - phi: https://www.wolframalpha.com/input/?i=-log10(1+-+0.691462461274013103637704610608337739883602175554577936)
//
// The output from each calculation needs to be copy-pasted over. Look at the phi source code
// to understand where each variable goes in the cdf calculation.
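As a worked instance of that calculation (an editor's sketch using the "decreasing values" case from the tests below, where the distribution is {-2, -4, -4, -4, -5, -5, -7, -9} and v = -4):

\mu = -5, \qquad \sigma = 2

\mathrm{cdf}(\mu, \sigma, v) = \tfrac{1}{2}\Bigl(1 + \operatorname{erf}\bigl(\tfrac{v - \mu}{\sigma\sqrt{2}}\bigr)\Bigr) = \tfrac{1}{2}\bigl(1 + \operatorname{erf}(0.35355)\bigr) \approx 0.6914625

\varphi = -\log_{10}(1 - 0.6914625) \approx 0.5106920

which matches the want values in Test_cdf and Test_phi.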
func Test_phi(t *testing.T) {
type args struct {
v float64
d []float64
}
tests := []struct {
name string
args args
want float64
}{
{
"infinity edge case",
args{
10,
[]float64{0, 0, 0, 0, 1, 1, 1, 1, 2, 2},
},
math.Inf(1),
},
{
"NaN edge case",
args{10000001, []float64{10000001, 10000001}},
-1, // phi() replaces math.NaN with -1 so it is still comparable
},
{
"increasing values",
args{
4,
[]float64{2, 4, 4, 4, 5, 5, 7, 9},
},
0.160231392277849,
},
{
"decreasing values",
args{
-4,
[]float64{-2, -4, -4, -4, -5, -5, -7, -9},
},
0.5106919892652407,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := phi(tt.args.v, tt.args.d)
if got != tt.want && !math.IsNaN(got) {
t.Errorf("phi() = %v, want %v", got, tt.want)
}
})
}
}
func Test_cdf(t *testing.T) {
type args struct {
values []float64
v float64
}
tests := []struct {
name string
args args
want float64
}{
{
"zero values",
args{[]float64{0}, 0},
math.NaN(),
},
{
"increasing values",
args{
[]float64{2, 4, 4, 4, 5, 5, 7, 9},
4,
},
0.3085375387259869,
},
{
"decreasing values",
args{
[]float64{-2, -4, -4, -4, -5, -5, -7, -9},
-4,
},
0.6914624612740131,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m, sd := meanStdDev(tt.args.values)
got := cdf(m, sd, tt.args.v)
if got != tt.want && !math.IsNaN(got) {
t.Errorf("cdf() = %v, want %v", got, tt.want)
}
})
}
}
func Test_meanVariance(t *testing.T) {
type args struct {
values []float64
}
tests := []struct {
name string
args args
wantMean float64
wantVariance float64
}{
{
"zero values",
args{[]float64{}},
0,
0,
},
{
"increasing values",
args{[]float64{2, 4, 4, 4, 5, 5, 7, 9}},
5,
4,
},
{
"decreasing values",
args{[]float64{-2, -4, -4, -4, -5, -5, -7, -9}},
-5,
4,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m, v := meanVariance(tt.args.values)
if m != tt.wantMean {
t.Errorf("mean() = %v, want %v", m, tt.wantMean)
}
if v != tt.wantVariance {
t.Errorf("variance() = %v, want %v", v, tt.wantVariance)
}
})
}
}
func Test_meanStdDev(t *testing.T) {
type args struct {
v []float64
}
tests := []struct {
name string
args args
want float64
}{
{
"zero values",
args{[]float64{}},
0,
},
{
"increasing values",
args{[]float64{2, 4, 4, 4, 5, 5, 7, 9}},
2,
},
{
"decreasing values",
args{[]float64{-2, -4, -4, -4, -5, -5, -7, -9}},
2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// ignore mean value as it was tested in Test_meanVariance
// it is the same underlying implementation.
if _, got := meanStdDev(tt.args.v); got != tt.want {
t.Errorf("standardDeviation() = %v, want %v", got, tt.want)
}
})
}
}
func Benchmark_prob_phi(b *testing.B) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
b.Run("distribution size 10", func(b *testing.B) {
d := makeRandSlice(10)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
phi(v, d)
}
})
b.Run("distribution size 50", func(b *testing.B) {
d := makeRandSlice(50)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
phi(v, d)
}
})
b.Run("distribution size 1000", func(b *testing.B) {
d := makeRandSlice(1000)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
phi(v, d)
}
})
}
func Benchmark_prob_cdf(b *testing.B) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
b.Run("distribution size 10", func(b *testing.B) {
d := makeRandSlice(10)
u, o := meanStdDev(d)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
cdf(u, o, v)
}
})
b.Run("distribution size 50", func(b *testing.B) {
d := makeRandSlice(50)
u, o := meanStdDev(d)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
cdf(u, o, v)
}
})
b.Run("distribution size 1000", func(b *testing.B) {
d := makeRandSlice(1000)
u, o := meanStdDev(d)
v := float64(r.Int63n(25))
b.ResetTimer()
for i := 0; i < b.N; i++ {
cdf(u, o, v)
}
})
}
func Benchmark_prob_meanVariance(b *testing.B) {
b.Run("distribution size 10", func(b *testing.B) {
d := makeRandSlice(10)
b.ResetTimer()
for i := 0; i < b.N; i++ {
meanVariance(d)
}
})
b.Run("distribution size 50", func(b *testing.B) {
d := makeRandSlice(50)
b.ResetTimer()
for i := 0; i < b.N; i++ {
meanVariance(d)
}
})
b.Run("distribution size 1000", func(b *testing.B) {
d := makeRandSlice(1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
meanVariance(d)
}
})
}
func Benchmark_prob_meanStdDev(b *testing.B) {
b.Run("distribution size 10", func(b *testing.B) {
d := makeRandSlice(10)
b.ResetTimer()
for i := 0; i < b.N; i++ {
meanStdDev(d)
}
})
b.Run("distribution size 50", func(b *testing.B) {
d := makeRandSlice(50)
b.ResetTimer()
for i := 0; i < b.N; i++ {
meanStdDev(d)
}
})
b.Run("distribution size 1000", func(b *testing.B) {
d := makeRandSlice(1000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
meanStdDev(d)
}
})
}
func makeRandSlice(size int) []float64 {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
s := make([]float64, size, size)
for i := 0; i < size-1; i++ {
s[i] = float64(r.Int63n(25)) + r.Float64()
}
return s
}

View File

@ -61,6 +61,7 @@ func (mtrs *Store) LatestValid(name string) []*api.Metric {
metrics := make([]*api.Metric, 0, len(byPeer))
for _, window := range byPeer {
m, err := window.Latest()
// TODO(ajl): for accrual, does it matter if a ping has expired?
if err != nil || m.Discard() {
continue
}
@ -110,3 +111,45 @@ func (mtrs *Store) PeerMetrics(pid peer.ID) []*api.Metric {
}
return result
}
// PeerLatest returns the latest of a particular metric for a
// particular peer. It may return an expired metric.
func (mtrs *Store) PeerLatest(name string, pid peer.ID) *api.Metric {
mtrs.mux.RLock()
defer mtrs.mux.RUnlock()
byPeer, ok := mtrs.byName[name]
if !ok {
return nil
}
window, ok := byPeer[pid]
if !ok {
return nil
}
m, err := window.Latest()
if err != nil {
// ignoring error, as nil metric is indicative enough
return nil
}
return m
}
// Distribution returns the distribution of a particular metric
// for a particular peer.
func (mtrs *Store) Distribution(name string, pid peer.ID) []float64 {
mtrs.mux.RLock()
defer mtrs.mux.RUnlock()
byPeer, ok := mtrs.byName[name]
if !ok {
return nil
}
window, ok := byPeer[pid]
if !ok {
return nil
}
return window.Distribution()
}

View File

@ -99,14 +99,14 @@ func (mw *Window) All() []*api.Metric {
// values contained in the current window. This will
// only return values if the api.Metric.Type() is "ping",
// which are used for accrual failure detection.
func (mw *Window) Distribution() []int64 {
func (mw *Window) Distribution() []float64 {
ms := mw.All()
dist := make([]int64, 0, len(ms)-1)
dist := make([]float64, 0, len(ms)-1)
// the last value can't be used to calculate a delta
for i, v := range ms[:len(ms)-1] {
// All() provides an ordered slice, where ms[i] is younger than ms[i+1]
delta := v.ReceivedAt - ms[i+1].ReceivedAt
dist = append(dist, delta)
dist = append(dist, float64(delta))
}
return dist

View File

@ -354,30 +354,30 @@ func BenchmarkWindow_All(b *testing.B) {
func TestWindow_Distribution(t *testing.T) {
var tests = []struct {
name string
heartbeats []int64
want []int64
heartbeats []float64
want []float64
}{
{
"even 1 sec distribution",
[]int64{1, 1, 1, 1},
[]int64{1, 1, 1, 1},
[]float64{1, 1, 1, 1},
[]float64{1, 1, 1, 1},
},
{
"increasing latency distribution",
[]int64{1, 1, 2, 2, 3, 3, 4},
[]int64{4, 3, 3, 2, 2, 1, 1},
[]float64{1, 1, 2, 2, 3, 3, 4},
[]float64{4, 3, 3, 2, 2, 1, 1},
},
{
"random latency distribution",
[]int64{4, 1, 3, 9, 7, 8, 11, 18},
[]int64{18, 11, 8, 7, 9, 3, 1, 4},
[]float64{4, 1, 3, 9, 7, 8, 11, 18},
[]float64{18, 11, 8, 7, 9, 3, 1, 4},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mw := NewWindow(len(tt.heartbeats) + 1)
for i, v := range tt.heartbeats {
mw.Add(makeMetric(string(v * 10)))
mw.Add(makeMetric(string(int64(v * 10))))
// time.Sleep at the single-millisecond level is
// susceptible to scheduler variance. Hence we
// multiply the input by 10 and this combined with
@ -396,10 +396,11 @@ func TestWindow_Distribution(t *testing.T) {
t.Errorf("want len: %v, got len: %v", len(tt.want), len(got))
}
var gotseconds []int64
var gotseconds []float64
for _, v := range got {
// truncate nanoseconds to seconds for testing purposes
gotseconds = append(gotseconds, v/10000000)
// also truncate decimal places by converting to int and then back
gotseconds = append(gotseconds, float64(int64(v/10000000)))
}
for i, s := range gotseconds {

View File

@ -14,7 +14,8 @@ const envConfigKey = "cluster_pubsubmon"
// Default values for this Config.
const (
DefaultCheckInterval = 15 * time.Second
DefaultCheckInterval = 15 * time.Second
DefaultFailureThreshold = 3.0
)
// Config allows to initialize a Monitor and customize some parameters.
@ -22,10 +23,15 @@ type Config struct {
config.Saver
CheckInterval time.Duration
// FailureThreshold indicates when a peer should be considered failed.
// The greater the threshold value the more leniency is granted.
// A value between 2.0 and 4.0 is suggested for the threshold.
FailureThreshold float64
}
type jsonConfig struct {
CheckInterval string `json:"check_interval"`
CheckInterval string `json:"check_interval"`
FailureThreshold *float64 `json:"failure_threshold"`
}
// ConfigKey provides a human-friendly identifier for this type of Config.
@ -36,6 +42,7 @@ func (cfg *Config) ConfigKey() string {
// Default sets the fields of this Config to sensible values.
func (cfg *Config) Default() error {
cfg.CheckInterval = DefaultCheckInterval
cfg.FailureThreshold = DefaultFailureThreshold
return nil
}
@ -58,6 +65,11 @@ func (cfg *Config) Validate() error {
if cfg.CheckInterval <= 0 {
return errors.New("pubsubmon.check_interval too low")
}
if cfg.FailureThreshold <= 0 {
return errors.New("pubsubmon.failure_threshold too low")
}
return nil
}
@ -79,6 +91,9 @@ func (cfg *Config) LoadJSON(raw []byte) error {
func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
interval, _ := time.ParseDuration(jcfg.CheckInterval)
cfg.CheckInterval = interval
if jcfg.FailureThreshold != nil {
cfg.FailureThreshold = *jcfg.FailureThreshold
}
return cfg.Validate()
}
@ -92,6 +107,7 @@ func (cfg *Config) ToJSON() ([]byte, error) {
func (cfg *Config) toJSONConfig() *jsonConfig {
return &jsonConfig{
CheckInterval: cfg.CheckInterval.String(),
CheckInterval: cfg.CheckInterval.String(),
FailureThreshold: &cfg.FailureThreshold,
}
}
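For reference, a hedged sketch of where the new option would sit in a cluster service.json (the enclosing "monitor"/"pubsubmon" section names follow the usual ipfs-cluster layout and are not part of this diff):

"monitor": {
  "pubsubmon": {
    "check_interval": "15s",
    "failure_threshold": 3.0
  }
}

The CLUSTER_PUBSUBMON_FAILURETHRESHOLD environment variable exercised in the config test below overrides the same field.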

View File

@ -9,7 +9,8 @@ import (
var cfgJSON = []byte(`
{
"check_interval": "15s"
"check_interval": "15s",
"failure_threshold": 3.0
}
`)
@ -53,6 +54,7 @@ func TestDefault(t *testing.T) {
}
cfg.CheckInterval = 0
cfg.FailureThreshold = -0.1
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
@ -60,10 +62,14 @@ func TestDefault(t *testing.T) {
func TestApplyEnvVars(t *testing.T) {
os.Setenv("CLUSTER_PUBSUBMON_CHECKINTERVAL", "22s")
os.Setenv("CLUSTER_PUBSUBMON_FAILURETHRESHOLD", "4.0")
cfg := &Config{}
cfg.ApplyEnvVars()
if cfg.CheckInterval != 22*time.Second {
t.Fatal("failed to override check_interval with env var")
}
if cfg.FailureThreshold != 4.0 {
t.Fatal("failed to override failure_threshold with env var")
}
}

View File

@ -56,6 +56,7 @@ type PeersFunc func(context.Context) ([]peer.ID, error)
// PeersFunc. The PeersFunc can be nil. In this case, no metric filtering is
// done based on peers (any peer is considered part of the peerset).
func New(
ctx context.Context,
cfg *Config,
psub *pubsub.PubSub,
peers PeersFunc,
@ -65,10 +66,10 @@ func New(
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
mtrs := metrics.NewStore()
checker := metrics.NewChecker(mtrs)
checker := metrics.NewChecker(ctx, mtrs, cfg.FailureThreshold)
subscription, err := psub.Subscribe(PubsubTopic)
if err != nil {

View File

@ -86,7 +86,7 @@ func testPeerMonitor(t *testing.T) (*Monitor, host.Host, func()) {
cfg := &Config{}
cfg.Default()
cfg.CheckInterval = 2 * time.Second
mon, err := New(cfg, psub, peers)
mon, err := New(ctx, cfg, psub, peers)
if err != nil {
t.Fatal(err)
}

View File

@ -12,54 +12,65 @@ var logger = logging.Logger("observations")
var (
// taken from ocgrpc (https://github.com/census-instrumentation/opencensus-go/blob/master/plugin/ocgrpc/stats_common.go)
latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576)
latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576)
messageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
)
// opencensus attributes
// attributes
var (
ClientIPAttribute = "http.client.ip"
)
// opencensus keys
// keys
var (
HostKey = makeKey("host")
HostKey = makeKey("host")
RemotePeerKey = makeKey("remote_peer")
)
// opencensus metrics
// metrics
var (
// PinCountMetric counts the number of pins ipfs-cluster is tracking.
PinCountMetric = stats.Int64("cluster/pin_count", "Number of pins", stats.UnitDimensionless)
// TrackerPinCountMetric counts the number of pins the local peer is tracking.
TrackerPinCountMetric = stats.Int64("pintracker/pin_count", "Number of pins", stats.UnitDimensionless)
// PeerCountMetric counts the number of ipfs-cluster peers are currently in the cluster.
PeerCountMetric = stats.Int64("cluster/peer_count", "Number of cluster peers", stats.UnitDimensionless)
// Pins counts the number of pins ipfs-cluster is tracking.
Pins = stats.Int64("cluster/pin_count", "Number of pins", stats.UnitDimensionless)
// TrackerPins counts the number of pins the local peer is tracking.
TrackerPins = stats.Int64("pintracker/pin_count", "Number of pins", stats.UnitDimensionless)
// Peers counts the number of ipfs-cluster peers currently in the cluster.
Peers = stats.Int64("cluster/peers", "Number of cluster peers", stats.UnitDimensionless)
// Alerts is the number of alerts that have been sent due to peers not sending "ping" heartbeats in time.
Alerts = stats.Int64("cluster/alerts", "Number of alerts triggered", stats.UnitDimensionless)
)
// opencensus views, which is just the aggregation of the metrics
// views, which are just the aggregations of the metrics
var (
PinCountView = &view.View{
Measure: PinCountMetric,
Aggregation: view.Sum(),
PinsView = &view.View{
Measure: Pins,
TagKeys: []tag.Key{HostKey},
Aggregation: view.LastValue(),
}
TrackerPinCountView = &view.View{
Measure: TrackerPinCountMetric,
TrackerPinsView = &view.View{
Measure: TrackerPins,
TagKeys: []tag.Key{HostKey},
Aggregation: view.Sum(),
Aggregation: view.LastValue(),
}
PeerCountView = &view.View{
Measure: PeerCountMetric,
PeersView = &view.View{
Measure: Peers,
TagKeys: []tag.Key{HostKey},
Aggregation: view.Count(),
Aggregation: view.LastValue(),
}
AlertsView = &view.View{
Measure: Alerts,
TagKeys: []tag.Key{HostKey, RemotePeerKey},
Aggregation: messageCountDistribution,
}
DefaultViews = []*view.View{
PinCountView,
TrackerPinCountView,
PeerCountView,
PinsView,
TrackerPinsView,
PeersView,
AlertsView,
}
)

View File

@ -69,7 +69,7 @@ func setupMetrics(cfg *MetricsConfig) error {
procCollector := prom.NewProcessCollector(prom.ProcessCollectorOpts{})
registry.MustRegister(goCollector, procCollector)
pe, err := prometheus.NewExporter(prometheus.Options{
Namespace: "cluster",
Namespace: "ipfscluster",
Registry: registry,
})
if err != nil {

View File

@ -70,6 +70,7 @@ func (cfg *Config) Validate() error {
if cfg.ConcurrentPins <= 0 {
return errors.New("maptracker.concurrent_pins is too low")
}
return nil
}

View File

@ -2,6 +2,8 @@ package optracker
import (
"context"
"fmt"
"strings"
"sync"
"time"
@ -83,6 +85,24 @@ func NewOperation(ctx context.Context, pin *api.Pin, typ OperationType, ph Phase
}
}
// String returns a string representation of an Operation.
func (op *Operation) String() string {
var b strings.Builder
fmt.Fprintf(&b, "type: %s\n", op.Type().String())
fmt.Fprint(&b, "pin:\n")
pinstr := op.Pin().String()
pinstrs := strings.Split(pinstr, "\n")
for _, s := range pinstrs {
fmt.Fprintf(&b, "\t%s\n", s)
}
fmt.Fprintf(&b, "phase: %s\n", op.Phase().String())
fmt.Fprintf(&b, "error: %s\n", op.Error())
fmt.Fprintf(&b, "timestamp: %s\n", op.Timestamp().String())
return b.String()
}
// Cid returns the Cid associated to this operation.
func (op *Operation) Cid() cid.Cid {
op.mu.RLock()

View File

@ -7,6 +7,8 @@ package optracker
import (
"context"
"fmt"
"strings"
"sync"
"time"
@ -30,6 +32,24 @@ type OperationTracker struct {
operations map[string]*Operation
}
func (opt *OperationTracker) String() string {
var b strings.Builder
fmt.Fprintf(&b, "pid: %v\n", opt.pid)
fmt.Fprintf(&b, "name: %s\n", opt.peerName)
fmt.Fprint(&b, "operations:\n")
opt.mu.RLock()
defer opt.mu.RUnlock()
for _, op := range opt.operations {
opstr := op.String()
opstrs := strings.Split(opstr, "\n")
for _, s := range opstrs {
fmt.Fprintf(&b, "\t%s\n", s)
}
}
return b.String()
}
// NewOperationTracker creates a new OperationTracker.
func NewOperationTracker(ctx context.Context, pid peer.ID, peerName string) *OperationTracker {
return &OperationTracker{
@ -170,7 +190,7 @@ func (opt *OperationTracker) GetExists(ctx context.Context, c cid.Cid) (*api.Pin
return &pInfo, true
}
// GetAll returns PinInfo objets for all known operations.
// GetAll returns PinInfo objects for all known operations.
func (opt *OperationTracker) GetAll(ctx context.Context) []*api.PinInfo {
ctx, span := trace.StartSpan(ctx, "optracker/GetAll")
defer span.End()

View File

@ -4,9 +4,9 @@ package optracker
import "strconv"
const _OperationType_name = "OperationUnknownOperationPinOperationUnpin"
const _OperationType_name = "OperationUnknownOperationPinOperationUnpinOperationRemoteOperationShard"
var _OperationType_index = [...]uint8{0, 16, 28, 42}
var _OperationType_index = [...]uint8{0, 16, 28, 42, 57, 71}
func (i OperationType) String() string {
if i < 0 || i >= OperationType(len(_OperationType_index)-1) {

View File

@ -4,9 +4,9 @@ package optracker
import "strconv"
const _Phase_name = "PhaseErrorPhaseQueuedPhaseInProgress"
const _Phase_name = "PhaseErrorPhaseQueuedPhaseInProgressPhaseDone"
var _Phase_index = [...]uint8{0, 10, 21, 36}
var _Phase_index = [...]uint8{0, 10, 21, 36, 45}
func (i Phase) String() string {
if i < 0 || i >= Phase(len(_Phase_index)-1) {