Testing and polishing connection graph

Added go tests
Refactored cluster connect graph to new file
Refactored dot file printing to new repo
Fixed code climate issues
Added sharness test

License: MIT
Signed-off-by: Wyatt Daviau <wdaviau@cs.stanford.edu>

parent e712c87570
commit d2ef32f48f
@@ -164,10 +164,10 @@ func (c *Client) Version() (api.Version, error) {
 	return ver, err
 }
 
-// GetConnectionGraph returns an ipfs-cluster connection graph.
+// GetConnectGraph returns an ipfs-cluster connection graph.
 // The serialized version, strings instead of pids, is returned
 func (c *Client) GetConnectGraph() (api.ConnectGraphSerial, error) {
 	var graphS api.ConnectGraphSerial
-	err := c.do("GET", "/graph", nil, &graphS)
+	err := c.do("GET", "/health/graph", nil, &graphS)
 	return graphS, err
 }
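A minimal usage sketch of the renamed client method, assuming an already-constructed *Client named c (client construction is unchanged by this commit); the printed summary is illustrative only:

	// Fetch the serialized connection graph from the new /health/graph endpoint.
	cg, err := c.GetConnectGraph()
	if err != nil {
		// handle the HTTP/RPC error
	}
	// ConnectGraphSerial keys its link maps by string-encoded peer IDs.
	for pid, links := range cg.ClusterLinks {
		fmt.Printf("%s sees %d cluster peers\n", pid, len(links))
	}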
@@ -210,3 +210,17 @@ func TestRecoverAll(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+func TestGetConnectGraph(t *testing.T) {
+	c, api := testClient(t)
+	defer api.Shutdown()
+
+	cg, err := c.GetConnectGraph()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(cg.IPFSLinks) != 3 || len(cg.ClusterLinks) != 3 ||
+		len(cg.ClustertoIPFS) != 3 {
+		t.Fatal("Bad graph")
+	}
+}
@@ -263,7 +263,7 @@ func (api *API) routes() []route {
 		{
 			"ConnectionGraph",
 			"GET",
-			"/graph",
+			"/health/graph",
 			api.graphHandler,
 		},
 	}
@@ -335,9 +335,9 @@ func (api *API) versionHandler(w http.ResponseWriter, r *http.Request) {
 	sendResponse(w, err, v)
 }
 
-func (rest *API) graphHandler(w http.ResponseWriter, r *http.Request) {
+func (api *API) graphHandler(w http.ResponseWriter, r *http.Request) {
 	var graph types.ConnectGraphSerial
-	err := rest.rpcClient.Call("",
+	err := api.rpcClient.Call("",
 		"Cluster",
 		"ConnectGraph",
 		struct{}{},
@@ -155,6 +155,34 @@ func TestAPIPeerRemoveEndpoint(t *testing.T) {
 	makeDelete(t, "/peers/"+test.TestPeerID1.Pretty(), &struct{}{})
 }
 
+func TestConnectGraphEndpoint(t *testing.T) {
+	rest := testAPI(t)
+	defer rest.Shutdown()
+	var cg api.ConnectGraphSerial
+	makeGet(t, "/health/graph", &cg)
+	if cg.ClusterID != test.TestPeerID1.Pretty() {
+		t.Error("unexpected cluster id")
+	}
+	if len(cg.IPFSLinks) != 3 {
+		t.Error("unexpected number of ipfs peers")
+	}
+	if len(cg.ClusterLinks) != 3 {
+		t.Error("unexpected number of cluster peers")
+	}
+	if len(cg.ClustertoIPFS) != 3 {
+		t.Error("unexpected number of cluster to ipfs links")
+	}
+	// test a few link values
+	pid1 := test.TestPeerID1.Pretty()
+	pid4 := test.TestPeerID4.Pretty()
+	if _, ok := cg.ClustertoIPFS[pid1]; !ok {
+		t.Fatal("missing cluster peer 1 from cluster to peer links map")
+	}
+	if cg.ClustertoIPFS[pid1] != pid4 {
+		t.Error("unexpected ipfs peer mapped to cluster peer 1 in graph")
+	}
+}
+
 func TestAPIPinEndpoint(t *testing.T) {
 	rest := testAPI(t)
 	defer rest.Shutdown()
api/types.go
@@ -295,8 +295,8 @@ type ConnectGraphSerial struct {
 
 // ToSerial converts a ConnectGraph to its Go-serializable version
 func (cg ConnectGraph) ToSerial() ConnectGraphSerial {
-	IPFSLinksSerial := SerializeLinkMap(cg.IPFSLinks)
-	ClusterLinksSerial := SerializeLinkMap(cg.ClusterLinks)
+	IPFSLinksSerial := serializeLinkMap(cg.IPFSLinks)
+	ClusterLinksSerial := serializeLinkMap(cg.ClusterLinks)
 	ClustertoIPFSSerial := make(map[string]string)
 	for k, v := range cg.ClustertoIPFS {
 		ClustertoIPFSSerial[peer.IDB58Encode(k)] = peer.IDB58Encode(v)
@@ -309,7 +309,24 @@ func (cg ConnectGraph) ToSerial() ConnectGraphSerial {
 	}
 }
 
-func SerializeLinkMap(Links map[peer.ID][]peer.ID) map[string][]string {
+// ToConnectGraph converts a ConnectGraphSerial to a ConnectGraph
+func (cgs ConnectGraphSerial) ToConnectGraph() ConnectGraph {
+	ClustertoIPFS := make(map[peer.ID]peer.ID)
+	for k, v := range cgs.ClustertoIPFS {
+		pid1, _ := peer.IDB58Decode(k)
+		pid2, _ := peer.IDB58Decode(v)
+		ClustertoIPFS[pid1] = pid2
+	}
+	pid, _ := peer.IDB58Decode(cgs.ClusterID)
+	return ConnectGraph{
+		ClusterID:     pid,
+		IPFSLinks:     deserializeLinkMap(cgs.IPFSLinks),
+		ClusterLinks:  deserializeLinkMap(cgs.ClusterLinks),
+		ClustertoIPFS: ClustertoIPFS,
+	}
+}
+
+func serializeLinkMap(Links map[peer.ID][]peer.ID) map[string][]string {
 	LinksSerial := make(map[string][]string)
 	for k, v := range Links {
 		kS := peer.IDB58Encode(k)
@@ -318,28 +335,29 @@ func SerializeLinkMap(Links map[peer.ID][]peer.ID) map[string][]string {
 	return LinksSerial
 }
 
+func deserializeLinkMap(LinksSerial map[string][]string) map[peer.ID][]peer.ID {
+	Links := make(map[peer.ID][]peer.ID)
+	for k, v := range LinksSerial {
+		pid, _ := peer.IDB58Decode(k)
+		Links[pid] = StringsToPeers(v)
+	}
+	return Links
+}
+
 // SwarmPeers lists an ipfs daemon's peers
-type SwarmPeers struct {
-	Peers []peer.ID
-}
+type SwarmPeers []peer.ID
 
 // SwarmPeersSerial is the serialized form of SwarmPeers for RPC use
-type SwarmPeersSerial struct {
-	Peers []string `json:"peers"`
-}
+type SwarmPeersSerial []string
 
 // ToSerial converts SwarmPeers to its Go-serializable version
 func (swarm SwarmPeers) ToSerial() SwarmPeersSerial {
-	return SwarmPeersSerial{
-		Peers: PeersToStrings(swarm.Peers),
-	}
+	return PeersToStrings(swarm)
 }
 
 // ToSwarmPeers converts a SwarmPeersSerial object to SwarmPeers.
 func (swarmS SwarmPeersSerial) ToSwarmPeers() SwarmPeers {
-	return SwarmPeers{
-		Peers: StringsToPeers(swarmS.Peers),
-	}
+	return StringsToPeers(swarmS)
 }
 
 // ID holds information about the Cluster peer
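With deserializeLinkMap and ToConnectGraph in place the serial form round-trips, which TestConnectGraphConv below depends on. A short sketch of the intended symmetry, assuming cg is an api.ConnectGraph populated with valid base58 peer IDs:

	cgS := cg.ToSerial()        // peer.ID keys become strings for JSON/RPC transport
	cg2 := cgS.ToConnectGraph() // strings decoded back into peer.IDs
	// For well-formed IDs, reflect.DeepEqual(cg, cg2) should hold.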
@@ -1,6 +1,7 @@
 package api
 
 import (
+	"reflect"
 	"testing"
 	"time"
 
@@ -16,6 +17,10 @@ var testMAddr3, _ = ma.NewMultiaddr("/ip4/127.0.0.1/tcp/8081/ws/ipfs/QmSoLer265N
 var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
 var testPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
 var testPeerID2, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabd")
+var testPeerID3, _ = peer.IDB58Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
+var testPeerID4, _ = peer.IDB58Decode("QmZ8naDy5mEz4GLuQwjWt9MPYqHTBbsm8tQBrNSjiq6zBc")
+var testPeerID5, _ = peer.IDB58Decode("QmZVAo3wd8s5eTTy2kPYs34J9PvfxpKPuYsePPYGjgRRjg")
+var testPeerID6, _ = peer.IDB58Decode("QmR8Vu6kZk7JvAN2rWVWgiduHatgBq2bb15Yyq8RRhYSbx")
 
 func TestTrackerFromString(t *testing.T) {
 	testcases := []string{"bug", "cluster_error", "pin_error", "unpin_error", "pinned", "pinning", "unpinning", "unpinned", "remote"}
@@ -127,6 +132,37 @@ func TestIDConv(t *testing.T) {
 	}
 }
 
+func TestConnectGraphConv(t *testing.T) {
+	defer func() {
+		if r := recover(); r != nil {
+			t.Fatal("paniced")
+		}
+	}()
+	cg := ConnectGraph{
+		ClusterID: testPeerID1,
+		IPFSLinks: map[peer.ID][]peer.ID{
+			testPeerID4: []peer.ID{testPeerID5, testPeerID6},
+			testPeerID5: []peer.ID{testPeerID4, testPeerID6},
+			testPeerID6: []peer.ID{testPeerID4, testPeerID5},
+		},
+		ClusterLinks: map[peer.ID][]peer.ID{
+			testPeerID1: []peer.ID{testPeerID2, testPeerID3},
+			testPeerID2: []peer.ID{testPeerID1, testPeerID3},
+			testPeerID3: []peer.ID{testPeerID1, testPeerID2},
+		},
+		ClustertoIPFS: map[peer.ID]peer.ID{
+			testPeerID1: testPeerID4,
+			testPeerID2: testPeerID5,
+			testPeerID3: testPeerID6,
+		},
+	}
+
+	cgNew := cg.ToSerial().ToConnectGraph()
+	if !reflect.DeepEqual(cg, cgNew) {
+		t.Fatal("The new connect graph should be equivalent to the old")
+	}
+}
+
 func TestMultiaddrConv(t *testing.T) {
 	defer func() {
 		if r := recover(); r != nil {
cluster.go
@@ -1051,83 +1051,6 @@ func (c *Cluster) Peers() []api.ID {
 	return peers
 }
 
-// ConnectGraph returns a description of which cluster peers and ipfs
-// daemons are connected to each other
-func (c *Cluster) ConnectGraph() (api.ConnectGraph, error) {
-	ipfsLinks := make(map[peer.ID][]peer.ID)
-	clusterLinks := make(map[peer.ID][]peer.ID)
-	clusterToIpfs := make(map[peer.ID]peer.ID)
-
-	members, err := c.consensus.Peers()
-	if err != nil {
-		return api.ConnectGraph{}, err
-	}
-	for _, p := range members {
-		// Cluster connections
-		clusterLinks[p] = make([]peer.ID, 0)
-		var sPeers []api.IDSerial
-		var pId api.ID
-		err = c.rpcClient.Call(p,
-			"Cluster",
-			"Peers",
-			struct{}{},
-			&sPeers,
-		)
-		if err != nil { // Only setting cluster connections when no error occurs
-			logger.Debugf("RPC error reaching cluster peer %s: %s", p.Pretty(), err.Error())
-			continue
-		}
-
-		selfConnection := false
-		for _, sId := range sPeers {
-			id := sId.ToID()
-			if id.Error != "" {
-				logger.Debugf("Peer %s errored connecting to its peer %s", p.Pretty(), id.ID.Pretty())
-				continue
-			}
-			if id.ID == p {
-				selfConnection = true
-				pId = id
-			} else {
-				clusterLinks[p] = append(clusterLinks[p], id.ID)
-			}
-		}
-
-		// IPFS connections
-		if !selfConnection {
-			logger.Debugf("cluster peer %s not its own peer. No ipfs info ", p.Pretty())
-			continue
-		}
-
-		ipfsId := pId.IPFS.ID
-		if pId.IPFS.Error != "" { // Only setting ipfs connections when no error occurs
-			logger.Debugf("ipfs id: %s has error: %s", ipfsId.Pretty(), pId.IPFS.Error)
-			continue
-		}
-		clusterToIpfs[p] = ipfsId
-		ipfsLinks[ipfsId] = make([]peer.ID, 0)
-		var swarmPeersS api.SwarmPeersSerial
-		err = c.rpcClient.Call(p,
-			"Cluster",
-			"IPFSSwarmPeers",
-			struct{}{},
-			&swarmPeersS,
-		)
-		if err != nil {
-			continue
-		}
-		swarmPeers := swarmPeersS.ToSwarmPeers()
-		ipfsLinks[ipfsId] = swarmPeers.Peers
-	}
-
-	return api.ConnectGraph{
-		ClusterID:     c.id,
-		IPFSLinks:     ipfsLinks,
-		ClusterLinks:  clusterLinks,
-		ClustertoIPFS: clusterToIpfs,
-	}, nil
-}
-
 // makeHost makes a libp2p-host.
 func makeHost(ctx context.Context, cfg *Config) (host.Host, error) {
 	ps := peerstore.NewPeerstore()
@@ -17,6 +17,7 @@ import (
 
 	rpc "github.com/hsanjuan/go-libp2p-gorpc"
 	cid "github.com/ipfs/go-cid"
+	peer "github.com/libp2p/go-libp2p-peer"
 )
 
 type mockComponent struct {
@@ -79,6 +80,10 @@ func (ipfs *mockConnector) PinLs(filter string) (map[string]api.IPFSPinStatus, e
 	return m, nil
 }
 
+func (ipfs *mockConnector) SwarmPeers() (api.SwarmPeers, error) {
+	return []peer.ID{test.TestPeerID4, test.TestPeerID5}, nil
+}
+
 func (ipfs *mockConnector) ConnectSwarms() error                          { return nil }
 func (ipfs *mockConnector) ConfigKey(keypath string) (interface{}, error) { return nil, nil }
 func (ipfs *mockConnector) FreeSpace() (uint64, error)                    { return 100, nil }
connect_graph.go (new file)
@@ -0,0 +1,89 @@
+package ipfscluster
+
+import (
+	peer "github.com/libp2p/go-libp2p-peer"
+
+	"github.com/ipfs/ipfs-cluster/api"
+)
+
+// ConnectGraph returns a description of which cluster peers and ipfs
+// daemons are connected to each other
+func (c *Cluster) ConnectGraph() (api.ConnectGraph, error) {
+	cg := api.ConnectGraph{
+		IPFSLinks:     make(map[peer.ID][]peer.ID),
+		ClusterLinks:  make(map[peer.ID][]peer.ID),
+		ClustertoIPFS: make(map[peer.ID]peer.ID),
+	}
+	members, err := c.consensus.Peers()
+	if err != nil {
+		return cg, err
+	}
+
+	peersSerials := make([][]api.IDSerial, len(members), len(members))
+	errs := c.multiRPC(members, "Cluster", "Peers", struct{}{},
+		copyIDSerialSliceToIfaces(peersSerials))
+
+	for i, err := range errs {
+		p := members[i]
+		cg.ClusterLinks[p] = make([]peer.ID, 0)
+		if err != nil { // Only setting cluster connections when no error occurs
+			logger.Debugf("RPC error reaching cluster peer %s: %s", p.Pretty(), err.Error())
+			continue
+		}
+
+		selfConnection, pID := c.recordClusterLinks(&cg, p, peersSerials[i])
+
+		// IPFS connections
+		if !selfConnection {
+			logger.Warningf("cluster peer %s not its own peer. No ipfs info ", p.Pretty())
+			continue
+		}
+		c.recordIPFSLinks(&cg, pID)
+	}
+
+	return cg, nil
+}
+
+func (c *Cluster) recordClusterLinks(cg *api.ConnectGraph, p peer.ID, sPeers []api.IDSerial) (bool, api.ID) {
+	selfConnection := false
+	var pID api.ID
+	for _, sID := range sPeers {
+		id := sID.ToID()
+		if id.Error != "" {
+			logger.Debugf("Peer %s errored connecting to its peer %s", p.Pretty(), id.ID.Pretty())
+			continue
+		}
+		if id.ID == p {
+			selfConnection = true
+			pID = id
+		} else {
+			cg.ClusterLinks[p] = append(cg.ClusterLinks[p], id.ID)
+		}
+	}
+	return selfConnection, pID
+}
+
+func (c *Cluster) recordIPFSLinks(cg *api.ConnectGraph, pID api.ID) {
+	ipfsID := pID.IPFS.ID
+	if pID.IPFS.Error != "" { // Only setting ipfs connections when no error occurs
+		logger.Warningf("ipfs id: %s has error: %s. Skipping swarm connections", ipfsID.Pretty(), pID.IPFS.Error)
+		return
+	}
+	if _, ok := cg.IPFSLinks[pID.ID]; ok {
+		logger.Warningf("ipfs id: %s already recorded, one ipfs daemon in use by multiple cluster peers", ipfsID.Pretty())
+	}
+	cg.ClustertoIPFS[pID.ID] = ipfsID
+	cg.IPFSLinks[ipfsID] = make([]peer.ID, 0)
+	var swarmPeersS api.SwarmPeersSerial
+	err := c.rpcClient.Call(pID.ID,
+		"Cluster",
+		"IPFSSwarmPeers",
+		struct{}{},
+		&swarmPeersS,
+	)
+	if err != nil {
+		return
+	}
+	swarmPeers := swarmPeersS.ToSwarmPeers()
+	cg.IPFSLinks[ipfsID] = swarmPeers
+}
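Compared with the version removed from cluster.go above, which issued one rpcClient.Call per member in a loop, the new file fans the Peers queries out through a single multiRPC call and then walks the per-member error slice. The destination-slice pattern it relies on (multiRPC and the copy helper are existing pieces of this repo; the helper is added in the util.go hunk at the end of this commit):

	// One result slot per member; multiRPC fills the slots in parallel and
	// returns one error per member, index-aligned with the input slice.
	peersSerials := make([][]api.IDSerial, len(members), len(members))
	errs := c.multiRPC(members, "Cluster", "Peers", struct{}{},
		copyIDSerialSliceToIfaces(peersSerials))
	for i, err := range errs {
		// members[i] answered with peersSerials[i] unless err != nil
	}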
@@ -2,17 +2,37 @@ package main
 
 import (
 	"errors"
 	"fmt"
 	"io"
+	"sort"
 
+	peer "github.com/libp2p/go-libp2p-peer"
+	dot "github.com/zenground0/go-dot"
+
 	"github.com/ipfs/ipfs-cluster/api"
 )
 
+/*
+These functions are used to write an IPFS Cluster connectivity graph to a
+graphviz-style dot file. Input an api.ConnectGraphSerial object, makeDot
+does some preprocessing and then passes all 3 link maps to a
+cluster-dotWriter which handles iterating over the link maps and writing
+dot file node and edge statements to make a dot-file graph. Nodes are
+labeled with the go-libp2p-peer shortened peer id. IPFS nodes are rendered
+with gold boundaries, Cluster nodes with blue. Currently preprocessing
+consists of moving IPFS swarm peers not connected to any cluster peer to
+the IPFSLinks map in the event that the function was invoked with the
+allIpfs flag. This allows all IPFS peers connected to the cluster to be
+rendered as nodes in the final graph.
+*/
+
 // nodeType specifies the type of node being represented in the dot file:
 // either IPFS or Cluster
 type nodeType int
 
 const (
-	tCluster nodeType = iota
-	tIpfs
+	tCluster nodeType = iota // The cluster node type
+	tIpfs                    // The IPFS node type
 )
 
 var errUnfinishedWrite = errors.New("could not complete write of line to output")
@@ -21,7 +41,7 @@ var errCorruptOrdering = errors.New("expected pid to have an ordering within dot
 
 func makeDot(cg api.ConnectGraphSerial, w io.Writer, allIpfs bool) error {
 	ipfsEdges := make(map[string][]string)
-	for k,v := range cg.IPFSLinks {
+	for k, v := range cg.IPFSLinks {
 		ipfsEdges[k] = make([]string, 0)
 		for _, id := range v {
 			if _, ok := cg.IPFSLinks[id]; ok || allIpfs {
@@ -37,202 +57,132 @@ func makeDot(cg api.ConnectGraphSerial, w io.Writer, allIpfs bool) error {
 		}
 	}
 
-	dW := dotWriter {
+	dW := dotWriter{
 		w:                w,
+		dotGraph:         dot.NewGraph("cluster"),
 		ipfsEdges:        ipfsEdges,
 		clusterEdges:     cg.ClusterLinks,
 		clusterIpfsEdges: cg.ClustertoIPFS,
-		clusterOrder:     make(map[string]int, 0),
-		ipfsOrder:        make(map[string]int, 0),
+		clusterNodes:     make(map[string]*dot.VertexDescription),
+		ipfsNodes:        make(map[string]*dot.VertexDescription),
 	}
 	return dW.print()
 }
 
 type dotWriter struct {
-	clusterOrder map[string]int
-	ipfsOrder    map[string]int
+	clusterNodes map[string]*dot.VertexDescription
+	ipfsNodes    map[string]*dot.VertexDescription
 
-	w io.Writer
+	dotGraph dot.DotGraph
+
+	w io.Writer
 
 	ipfsEdges        map[string][]string
 	clusterEdges     map[string][]string
 	clusterIpfsEdges map[string]string
 }
 
-func (dW dotWriter) writeComment(comment string) error {
-	final := fmt.Sprintf("/* %s */\n", comment)
-	n, err := io.WriteString(dW.w, final)
-	if err == nil && n != len([]byte(final)) {
-		err = errUnfinishedWrite
-	}
-	return err
-}
-
-// precondition: id has already been processed and id's ordering
-// has been recorded by dW
-func (dW dotWriter) getString(id string, idT nodeType) (string, error) {
-	switch idT {
-	case tCluster:
-		number, ok := dW.clusterOrder[id]
-		if !ok {
-			return "", errCorruptOrdering
-		}
-		return fmt.Sprintf("C%d", number), nil
-	case tIpfs:
-		number, ok := dW.ipfsOrder[id]
-		if !ok {
-			return "", errCorruptOrdering
-		}
-		return fmt.Sprintf("I%d", number), nil
-	default:
-		return "", errUnknownNodeType
-	}
-	return "", nil
-}
-
-func (dW dotWriter) writeEdge(from string, fT nodeType, to string, tT nodeType) error {
-	fromStr, err := dW.getString(from, fT)
-	if err != nil {
-		return err
-	}
-	toStr, err := dW.getString(to, tT)
-	if err != nil {
-		return err
-	}
-	edgeStr := fmt.Sprintf("%s -> %s\n", fromStr, toStr)
-	n, err := io.WriteString(dW.w, edgeStr)
-	if err == nil && n != len([]byte(edgeStr)) {
-		err = errUnfinishedWrite
-	}
-	return err
-}
-
-// writes nodes to dot file output and creates and stores an ordering over nodes
-func (dW *dotWriter) writeNode(id string, nT nodeType) error {
-	var nodeStr string
+func (dW *dotWriter) addNode(id string, nT nodeType) error {
+	var node dot.VertexDescription
+	node.Label = shortID(id)
 	switch nT {
 	case tCluster:
-		nodeStr = fmt.Sprintf("C%d [label=\"%s\" color=\"blue2\"]\n", len(dW.clusterOrder), shortId(id))
-		dW.clusterOrder[id] = len(dW.clusterOrder)
+		node.ID = fmt.Sprintf("C%d", len(dW.clusterNodes))
+		node.Color = "blue2"
+		dW.clusterNodes[id] = &node
 	case tIpfs:
-		nodeStr = fmt.Sprintf("I%d [label=\"%s\" color=\"goldenrod\"]\n", len(dW.ipfsOrder), shortId(id))
-		dW.ipfsOrder[id] = len(dW.ipfsOrder)
+		node.ID = fmt.Sprintf("I%d", len(dW.ipfsNodes))
+		node.Color = "goldenrod"
+		dW.ipfsNodes[id] = &node
 	default:
 		return errUnknownNodeType
 	}
-	n, err := io.WriteString(dW.w, nodeStr)
-	if err == nil && n != len([]byte(nodeStr)) {
-		err = errUnfinishedWrite
-	}
-	return err
+	dW.dotGraph.AddVertex(&node)
+	return nil
 }
 
-func (dW *dotWriter) print() error {
-	_, err := io.WriteString(dW.w, "digraph cluster {\n\n")
-	err = dW.writeComment("The nodes of the connectivity graph")
-	if err != nil {
-		return err
-	}
-	err = dW.writeComment("The cluster-service peers")
-	if err != nil {
-		return err
-	}
-	// Write cluster nodes
-	for k := range dW.clusterEdges {
-		err = dW.writeNode(k, tCluster)
-		if err != nil {
-			return err
-		}
-	}
-	_, err = io.WriteString(dW.w, "\n")
-	if err != nil {
-		return err
-	}
-
-	err = dW.writeComment("The ipfs peers")
-	if err != nil {
-		return err
-	}
-	// Write ipfs nodes
-	for k := range dW.ipfsEdges {
-		err = dW.writeNode(k, tIpfs)
-		if err != nil {
-			return err
-		}
-	}
-	_, err = io.WriteString(dW.w, "\n")
-	if err != nil {
-		return err
-	}
-
-	err = dW.writeComment("Edges representing active connections in the cluster")
-	if err != nil {
-		return err
-	}
-	err = dW.writeComment("The connections among cluster-service peers")
-	// Write cluster edges
-	for k, v := range dW.clusterEdges {
-		for _, id := range v {
-			err = dW.writeEdge(k,tCluster,id,tCluster)
-			if err != nil {
-				return err
-			}
-		}
-	}
-	_, err = io.WriteString(dW.w, "\n")
-	if err != nil {
-		return err
-	}
-
-	err = dW.writeComment("The connections between cluster peers and their ipfs daemons")
-	if err != nil {
-		return err
-	}
-	// Write cluster to ipfs edges
-	for k, id := range dW.clusterIpfsEdges {
-		err = dW.writeEdge(k,tCluster,id,tIpfs)
-		if err != nil {
-			return err
-		}
-	}
-	_, err = io.WriteString(dW.w, "\n")
-	if err != nil {
-		return err
-	}
-
-	err = dW.writeComment("The swarm peer connections among ipfs daemons in the cluster")
-	if err != nil {
-		return err
-	}
-	// Write ipfs edges
-	for k, v := range dW.ipfsEdges {
-		for _, id := range v {
-			err = dW.writeEdge(k,tIpfs,id,tIpfs)
-			if err != nil {
-				return err
-			}
-		}
-	}
-	_, err = io.WriteString(dW.w, "\n }")
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
+func (dW *dotWriter) print() error {
+	dW.dotGraph.AddComment("The nodes of the connectivity graph")
+	dW.dotGraph.AddComment("The cluster-service peers")
+	// Write cluster nodes, use sorted order for consistent labels
+	for _, k := range sortedKeys(dW.clusterEdges) {
+		err := dW.addNode(k, tCluster)
+		if err != nil {
+			return err
+		}
+	}
+	dW.dotGraph.AddNewLine()
+
+	dW.dotGraph.AddComment("The ipfs peers")
+	// Write ipfs nodes, use sorted order for consistent labels
+	for _, k := range sortedKeys(dW.ipfsEdges) {
+		err := dW.addNode(k, tIpfs)
+		if err != nil {
+			return err
+		}
+	}
+	dW.dotGraph.AddNewLine()
+
+	dW.dotGraph.AddComment("Edges representing active connections in the cluster")
+	dW.dotGraph.AddComment("The connections among cluster-service peers")
+	// Write cluster edges
+	for k, v := range dW.clusterEdges {
+		for _, id := range v {
+			toNode := dW.clusterNodes[k]
+			fromNode := dW.clusterNodes[id]
+			dW.dotGraph.AddEdge(toNode, fromNode, true)
+		}
+	}
+	dW.dotGraph.AddNewLine()
+
+	dW.dotGraph.AddComment("The connections between cluster peers and their ipfs daemons")
+	// Write cluster to ipfs edges
+	for k, id := range dW.clusterIpfsEdges {
+		toNode := dW.clusterNodes[k]
+		fromNode := dW.ipfsNodes[id]
+		dW.dotGraph.AddEdge(toNode, fromNode, true)
+	}
+	dW.dotGraph.AddNewLine()
+
+	dW.dotGraph.AddComment("The swarm peer connections among ipfs daemons in the cluster")
+	// Write ipfs edges
+	for k, v := range dW.ipfsEdges {
+		for _, id := range v {
+			toNode := dW.ipfsNodes[k]
+			fromNode := dW.ipfsNodes[id]
+			dW.dotGraph.AddEdge(toNode, fromNode, true)
+		}
+	}
+	return dW.dotGraph.WriteDot(dW.w)
+}
+
+func sortedKeys(dict map[string][]string) []string {
+	keys := make([]string, len(dict), len(dict))
+	i := 0
+	for k := range dict {
+		keys[i] = k
+		i++
+	}
+	sort.Strings(keys)
+	return keys
+}
 
 // truncate the provided peer id string to the 3 last characters. Odds of
 // pairwise collisions are less than 1 in 200,000 so with 70 cluster peers
 // the chances of a collision are still less than 1 in 100 (birthday paradox).
 // As clusters grow bigger than this we can provide a flag for including
 // more characters.
-func shortId(peerString string) string {
-	if len(peerString) < 3 {
-		return peerString
-	}
-	start := len(peerString) - 3
-	end := len(peerString)
-	return peerString[start:end]
+func shortID(peerString string) string {
+	pid, err := peer.IDB58Decode(peerString)
+	if err != nil {
+		// We'll truncate ourselves
+		// Should never get here but try to match with peer:String()
+		if len(peerString) < 6 {
+			return fmt.Sprintf("<peer.ID %s>", peerString)
+		}
+		return fmt.Sprintf("<peer.ID %s>", peerString[:6])
+	}
+	return pid.String()
 }
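The rewrite above delegates all dot-file output to the newly vendored go-dot package instead of hand-formatting node and edge statements. A minimal sketch of the calls involved, using the go-dot API exactly as it appears in this diff (not verified beyond that):

	g := dot.NewGraph("cluster")
	g.AddComment("one cluster node linked to one ipfs node")
	c0 := dot.VertexDescription{ID: "C0", Label: "<peer.ID abc>", Color: "blue2"}
	i0 := dot.VertexDescription{ID: "I0", Label: "<peer.ID def>", Color: "goldenrod"}
	g.AddVertex(&c0)
	g.AddVertex(&i0)
	g.AddEdge(&c0, &i0, true) // true: directed edge (C0 -> I0)
	if err := g.WriteDot(os.Stdout); err != nil {
		// handle write error; output resembles the fixtures in graph_test.go
	}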
ipfs-cluster-ctl/graph_test.go (new file)
@@ -0,0 +1,217 @@
+package main
+
+import (
+	"bytes"
+	"fmt"
+	"sort"
+	"strings"
+	"testing"
+
+	"github.com/ipfs/ipfs-cluster/api"
+)
+
+func verifyOutput(t *testing.T, outStr string, trueStr string) {
+	// Because values are printed in the order of map keys we cannot enforce
+	// an exact ordering. Instead we split both strings and compare line by
+	// line.
+	outLines := strings.Split(outStr, "\n")
+	trueLines := strings.Split(trueStr, "\n")
+	sort.Strings(outLines)
+	sort.Strings(trueLines)
+	if len(outLines) != len(trueLines) {
+		fmt.Printf("expected: %s\n actual: %s", trueStr, outStr)
+		t.Fatal("Number of output lines does not match expectation")
+	}
+	for i := range outLines {
+		if outLines[i] != trueLines[i] {
+			t.Errorf("Difference in sorted outputs: %s vs %s", outLines[i], trueLines[i])
+		}
+	}
+}
+
+var simpleIpfs = `digraph cluster {
+
+/* The nodes of the connectivity graph */
+/* The cluster-service peers */
+C0 [label="<peer.ID UBuxVH>" color="blue2"]
+C1 [label="<peer.ID V35Ljb>" color="blue2"]
+C2 [label="<peer.ID Z2ckU7>" color="blue2"]
+
+/* The ipfs peers */
+I0 [label="<peer.ID PFKAGZ>" color="goldenrod"]
+I1 [label="<peer.ID XbiVZd>" color="goldenrod"]
+I2 [label="<peer.ID bU7273>" color="goldenrod"]
+
+/* Edges representing active connections in the cluster */
+/* The connections among cluster-service peers */
+C0 -> C1
+C0 -> C2
+C1 -> C0
+C1 -> C2
+C2 -> C0
+C2 -> C1
+
+/* The connections between cluster peers and their ipfs daemons */
+C0 -> I1
+C1 -> I0
+C2 -> I2
+
+/* The swarm peer connections among ipfs daemons in the cluster */
+I0 -> I1
+I0 -> I2
+I1 -> I0
+I1 -> I2
+I2 -> I0
+I2 -> I1
+
+}`
+
+func TestSimpleIpfsGraphs(t *testing.T) {
+	cg := api.ConnectGraphSerial{
+		ClusterID: "QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD",
+		ClusterLinks: map[string][]string{
+			"QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD": []string{
+				"QmV35LjbEGPfN7KfMAJp43VV2enwXqqQf5esx4vUcgHDQJ",
+				"QmZ2ckU7G35MYyJgMTwMUnicsGqSy3YUxGBX7qny6MQmJu",
+			},
+			"QmV35LjbEGPfN7KfMAJp43VV2enwXqqQf5esx4vUcgHDQJ": []string{
+				"QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD",
+				"QmZ2ckU7G35MYyJgMTwMUnicsGqSy3YUxGBX7qny6MQmJu",
+			},
+			"QmZ2ckU7G35MYyJgMTwMUnicsGqSy3YUxGBX7qny6MQmJu": []string{
+				"QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD",
+				"QmV35LjbEGPfN7KfMAJp43VV2enwXqqQf5esx4vUcgHDQJ",
+			},
+		},
+		IPFSLinks: map[string][]string{
+			"QmXbiVZd93SLiu9TAm21F2y9JwGiFLydbEVkPBaMR3DZDV": []string{
+				"QmPFKAGZbUjdzt8BBx8VTWCe9UeUQVcoqHFehSPzN5LSsq",
+				"QmbU7273zH6jxwDe2nqRmEm2rp5PpqP2xeQr2xCmwbBsuL",
+			},
+			"QmPFKAGZbUjdzt8BBx8VTWCe9UeUQVcoqHFehSPzN5LSsq": []string{
+				"QmXbiVZd93SLiu9TAm21F2y9JwGiFLydbEVkPBaMR3DZDV",
+				"QmbU7273zH6jxwDe2nqRmEm2rp5PpqP2xeQr2xCmwbBsuL",
+			},
+			"QmbU7273zH6jxwDe2nqRmEm2rp5PpqP2xeQr2xCmwbBsuL": []string{
+				"QmXbiVZd93SLiu9TAm21F2y9JwGiFLydbEVkPBaMR3DZDV",
+				"QmPFKAGZbUjdzt8BBx8VTWCe9UeUQVcoqHFehSPzN5LSsq",
+			},
+		},
+		ClustertoIPFS: map[string]string{
+			"QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD": "QmXbiVZd93SLiu9TAm21F2y9JwGiFLydbEVkPBaMR3DZDV",
+			"QmV35LjbEGPfN7KfMAJp43VV2enwXqqQf5esx4vUcgHDQJ": "QmPFKAGZbUjdzt8BBx8VTWCe9UeUQVcoqHFehSPzN5LSsq",
+			"QmZ2ckU7G35MYyJgMTwMUnicsGqSy3YUxGBX7qny6MQmJu": "QmbU7273zH6jxwDe2nqRmEm2rp5PpqP2xeQr2xCmwbBsuL",
+		},
+	}
+	buf := new(bytes.Buffer)
+	err := makeDot(cg, buf, false)
+	if err != nil {
+		t.Fatal(err)
+	}
+	verifyOutput(t, buf.String(), simpleIpfs)
+}
+
+var allIpfs = `digraph cluster {
+
+/* The nodes of the connectivity graph */
+/* The cluster-service peers */
+C0 [label="<peer.ID UBuxVH>" color="blue2"]
+C1 [label="<peer.ID V35Ljb>" color="blue2"]
+C2 [label="<peer.ID Z2ckU7>" color="blue2"]
+
+/* The ipfs peers */
+I0 [label="<peer.ID PFKAGZ>" color="goldenrod"]
+I1 [label="<peer.ID VV2enw>" color="goldenrod"]
+I2 [label="<peer.ID XbiVZd>" color="goldenrod"]
+I3 [label="<peer.ID bU7273>" color="goldenrod"]
+I4 [label="<peer.ID Qmp43V>" color="goldenrod"]
+I5 [label="<peer.ID QmqRmE>" color="goldenrod"]
+
+/* Edges representing active connections in the cluster */
+/* The connections among cluster-service peers */
+C2 -> C0
+C2 -> C1
+C0 -> C1
+C0 -> C2
+C1 -> C0
+C1 -> C2
+
+/* The connections between cluster peers and their ipfs daemons */
+C0 -> I2
+C1 -> I0
+C2 -> I3
+
+/* The swarm peer connections among ipfs daemons in the cluster */
+I0 -> I1
+I0 -> I2
+I0 -> I3
+I0 -> I4
+I0 -> I5
+I2 -> I0
+I2 -> I1
+I2 -> I3
+I2 -> I4
+I2 -> I5
+I3 -> I0
+I3 -> I1
+I3 -> I2
+I3 -> I4
+I3 -> I5
+
+}`
+
+func TestIpfsAllGraphs(t *testing.T) {
+	cg := api.ConnectGraphSerial{
+		ClusterID: "QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD",
+		ClusterLinks: map[string][]string{
+			"QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD": []string{
+				"QmV35LjbEGPfN7KfMAJp43VV2enwXqqQf5esx4vUcgHDQJ",
+				"QmZ2ckU7G35MYyJgMTwMUnicsGqSy3YUxGBX7qny6MQmJu",
+			},
+			"QmV35LjbEGPfN7KfMAJp43VV2enwXqqQf5esx4vUcgHDQJ": []string{
+				"QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD",
+				"QmZ2ckU7G35MYyJgMTwMUnicsGqSy3YUxGBX7qny6MQmJu",
+			},
+			"QmZ2ckU7G35MYyJgMTwMUnicsGqSy3YUxGBX7qny6MQmJu": []string{
+				"QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD",
+				"QmV35LjbEGPfN7KfMAJp43VV2enwXqqQf5esx4vUcgHDQJ",
+			},
+		},
+		IPFSLinks: map[string][]string{
+			"QmXbiVZd93SLiu9TAm21F2y9JwGiFLydbEVkPBaMR3DZDV": []string{
+				"QmPFKAGZbUjdzt8BBx8VTWCe9UeUQVcoqHFehSPzN5LSsq",
+				"QmbU7273zH6jxwDe2nqRmEm2rp5PpqP2xeQr2xCmwbBsuL",
+				"QmqRmEm2rp5Ppqy9JwGiFLiu9TAm21F2y9fu8aaaaaaDBq",
+				"QmVV2enwXqqQf5esx4v36UeaFQvFehSPzNfi8aaaaaanM8",
+				"Qmp43VV2enwXp43VV2enwXNjfmNpTaff774yyQuu99akzp",
+			},
+			"QmPFKAGZbUjdzt8BBx8VTWCe9UeUQVcoqHFehSPzN5LSsq": []string{
+				"QmXbiVZd93SLiu9TAm21F2y9JwGiFLydbEVkPBaMR3DZDV",
+				"QmbU7273zH6jxwDe2nqRmEm2rp5PpqP2xeQr2xCmwbBsuL",
+				"QmqRmEm2rp5Ppqy9JwGiFLiu9TAm21F2y9fu8aaaaaaDBq",
+				"QmVV2enwXqqQf5esx4v36UeaFQvFehSPzNfi8aaaaaanM8",
+				"Qmp43VV2enwXp43VV2enwXNjfmNpTaff774yyQuu99akzp",
+			},
+			"QmbU7273zH6jxwDe2nqRmEm2rp5PpqP2xeQr2xCmwbBsuL": []string{
+				"QmXbiVZd93SLiu9TAm21F2y9JwGiFLydbEVkPBaMR3DZDV",
+				"QmPFKAGZbUjdzt8BBx8VTWCe9UeUQVcoqHFehSPzN5LSsq",
+				"QmqRmEm2rp5Ppqy9JwGiFLiu9TAm21F2y9fu8aaaaaaDBq",
+				"QmVV2enwXqqQf5esx4v36UeaFQvFehSPzNfi8aaaaaanM8",
+				"Qmp43VV2enwXp43VV2enwXNjfmNpTaff774yyQuu99akzp",
+			},
+		},
+		ClustertoIPFS: map[string]string{
+			"QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD": "QmXbiVZd93SLiu9TAm21F2y9JwGiFLydbEVkPBaMR3DZDV",
+			"QmV35LjbEGPfN7KfMAJp43VV2enwXqqQf5esx4vUcgHDQJ": "QmPFKAGZbUjdzt8BBx8VTWCe9UeUQVcoqHFehSPzN5LSsq",
+			"QmZ2ckU7G35MYyJgMTwMUnicsGqSy3YUxGBX7qny6MQmJu": "QmbU7273zH6jxwDe2nqRmEm2rp5PpqP2xeQr2xCmwbBsuL",
+		},
+	}
+
+	buf := new(bytes.Buffer)
+	err := makeDot(cg, buf, true)
+	if err != nil {
+		t.Fatal(err)
+	}
+	verifyOutput(t, buf.String(), allIpfs)
+}
@@ -225,46 +225,6 @@ cluster peers.
 				return nil
 			},
 		},
-		{
-			Name:  "graph",
-			Usage: "display connectivity of cluster nodes",
-			Description: `
-This command queries all connected cluster peers and their ipfs nodes to generate a
-graph of the connections. Output is a dot file encoding the cluster's connection state
-`,
-			Flags: []cli.Flag{
-				cli.StringFlag{
-					Name:  "file, f",
-					Value: "",
-					Usage: "sets an output dot-file for the connectivity graph",
-				},
-				cli.BoolFlag{
-					Name:  "all-ipfs-peers",
-					Usage: "causes the graph to mark nodes for ipfs peers not directly in the cluster",
-				},
-			},
-			Action: func(c *cli.Context) error {
-				resp, cerr := globalClient.GetConnectGraph()
-				if cerr != nil {
-					formatResponse(c, resp, cerr)
-					return nil
-				}
-				var w io.WriteCloser
-				var err error
-				outputPath := c.String("file")
-				if outputPath == "" {
-					w = os.Stdout
-				} else {
-					w, err = os.Create(outputPath)
-					checkErr("creating output file", err)
-				}
-				defer w.Close()
-				err = makeDot(resp, w, c.Bool("all-ipfs-peers"))
-				checkErr("printing graph", err)
-
-				return nil
-			},
-		},
 		{
@@ -471,6 +431,7 @@ operations on the contacted peer (as opposed to on every peer).
 				return nil
 			},
 		},
+
 		{
 			Name:  "version",
 			Usage: "Retrieve cluster version",
@@ -486,6 +447,52 @@ to check that it matches the CLI version (shown by -v).
 				return nil
 			},
 		},
+		{
+			Name:        "health",
+			Description: "Display information on cluster health",
+			Subcommands: []cli.Command{
+				{
+					Name:  "graph",
+					Usage: "display connectivity of cluster nodes",
+					Description: `
+This command queries all connected cluster peers and their ipfs nodes to generate a
+graph of the connections. Output is a dot file encoding the cluster's connection state.
+`,
+					Flags: []cli.Flag{
+						cli.StringFlag{
+							Name:  "file, f",
+							Value: "",
+							Usage: "sets an output dot-file for the connectivity graph",
+						},
+						cli.BoolFlag{
+							Name:  "all-ipfs-peers",
+							Usage: "causes the graph to mark nodes for ipfs peers not directly in the cluster",
+						},
+					},
+					Action: func(c *cli.Context) error {
+						resp, cerr := globalClient.GetConnectGraph()
+						if cerr != nil {
+							formatResponse(c, resp, cerr)
+							return nil
+						}
+						var w io.WriteCloser
+						var err error
+						outputPath := c.String("file")
+						if outputPath == "" {
+							w = os.Stdout
+						} else {
+							w, err = os.Create(outputPath)
+							checkErr("creating output file", err)
+						}
+						defer w.Close()
+						err = makeDot(resp, w, c.Bool("all-ipfs-peers"))
+						checkErr("printing graph", err)
+
+						return nil
+					},
+				},
+			},
+		},
 		{
 			Name:  "commands",
 			Usage: "List all commands",
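With the command now nested under the new health group, a typical invocation would be something like `ipfs-cluster-ctl health graph -f cluster.dot` (the -f flag as defined above; stdout is used when it is omitted), and the resulting file can then be rendered with the standard Graphviz dot tool. The sharness test later in this commit exercises the same path by grepping the output for a "C0 -> I0" edge.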
|
@ -1005,3 +1005,140 @@ func TestClustersRebalanceOnPeerDown(t *testing.T) {
|
|||
t.Errorf("it should be pinned and is %s", s)
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function for verifying cluster graph. Will only pass if exactly the
|
||||
// peers in clusterIDs are fully connected to each other and the expected ipfs
|
||||
// mock connectivity exists. Cluster peers not in clusterIDs are assumed to
|
||||
// be disconnected and the graph should reflect this
|
||||
func validateClusterGraph(t *testing.T, graph api.ConnectGraph, clusterIDs map[peer.ID]struct{}) {
|
||||
// Check that all cluster peers see each other as peers
|
||||
for id1, peers := range graph.ClusterLinks {
|
||||
if _, ok := clusterIDs[id1]; !ok {
|
||||
if len(peers) != 0 {
|
||||
t.Errorf("disconnected peer %s is still connected in graph", id1)
|
||||
}
|
||||
continue
|
||||
}
|
||||
fmt.Printf("id: %s, peers: %v\n", id1, peers)
|
||||
if len(peers) > len(clusterIDs)-1 {
|
||||
t.Errorf("More peers recorded in graph than expected")
|
||||
}
|
||||
// Make lookup index for peers connected to id1
|
||||
peerIndex := make(map[peer.ID]struct{})
|
||||
for _, peer := range peers {
|
||||
peerIndex[peer] = struct{}{}
|
||||
}
|
||||
for id2 := range clusterIDs {
|
||||
if _, ok := peerIndex[id2]; id1 != id2 && !ok {
|
||||
t.Errorf("Expected graph to see peer %s connected to peer %s", id1, id2)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(graph.ClusterLinks) != nClusters {
|
||||
t.Errorf("Unexpected number of cluster nodes in graph")
|
||||
}
|
||||
|
||||
// Check that all cluster peers are recorded as nodes in the graph
|
||||
for id := range clusterIDs {
|
||||
if _, ok := graph.ClusterLinks[id]; !ok {
|
||||
t.Errorf("Expected graph to record peer %s as a node", id)
|
||||
}
|
||||
}
|
||||
|
||||
// Check that the mocked ipfs swarm is recorded
|
||||
if len(graph.IPFSLinks) != 1 {
|
||||
t.Error("Expected exactly one ipfs peer for all cluster nodes, the mocked peer")
|
||||
}
|
||||
links, ok := graph.IPFSLinks[test.TestPeerID1]
|
||||
if !ok {
|
||||
t.Error("Expected the mocked ipfs peer to be a node in the graph")
|
||||
} else {
|
||||
if len(links) != 2 || links[0] != test.TestPeerID4 ||
|
||||
links[1] != test.TestPeerID5 {
|
||||
t.Error("Swarm peers of mocked ipfs are not those expected")
|
||||
}
|
||||
}
|
||||
|
||||
// Check that the cluster to ipfs connections are all recorded
|
||||
for id := range clusterIDs {
|
||||
if ipfsID, ok := graph.ClustertoIPFS[id]; !ok {
|
||||
t.Errorf("Expected graph to record peer %s's ipfs connection", id)
|
||||
} else {
|
||||
if ipfsID != test.TestPeerID1 {
|
||||
t.Errorf("Unexpected error %s", ipfsID)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(graph.ClustertoIPFS) > len(clusterIDs) {
|
||||
t.Error("More cluster to ipfs links recorded in graph than expected")
|
||||
}
|
||||
}
|
||||
|
||||
// In this test we get a cluster graph report from a random peer in a healthy
|
||||
// fully connected cluster and verify that it is formed as expected.
|
||||
func TestClustersGraphConnected(t *testing.T) {
|
||||
clusters, mock := createClusters(t)
|
||||
defer shutdownClusters(t, clusters, mock)
|
||||
delay()
|
||||
delay()
|
||||
|
||||
j := rand.Intn(nClusters) // choose a random cluster peer to query
|
||||
graph, err := clusters[j].ConnectGraph()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
clusterIDs := make(map[peer.ID]struct{})
|
||||
for _, c := range clusters {
|
||||
id := c.ID().ID
|
||||
clusterIDs[id] = struct{}{}
|
||||
}
|
||||
validateClusterGraph(t, graph, clusterIDs)
|
||||
}
|
||||
|
||||
// Similar to the previous test we get a cluster graph report from a peer.
|
||||
// However now 2 peers have been shutdown and so we do not expect to see
|
||||
// them in the graph
|
||||
func TestClustersGraphUnhealthy(t *testing.T) {
|
||||
clusters, mock := createClusters(t)
|
||||
defer shutdownClusters(t, clusters, mock)
|
||||
if nClusters < 5 {
|
||||
t.Skip("Need at least 5 peers")
|
||||
}
|
||||
|
||||
j := rand.Intn(nClusters) // choose a random cluster peer to query
|
||||
// chose the clusters to shutdown
|
||||
discon1 := -1
|
||||
discon2 := -1
|
||||
for i := range clusters {
|
||||
if i != j {
|
||||
if discon1 == -1 {
|
||||
discon1 = i
|
||||
} else {
|
||||
discon2 = i
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
clusters[discon1].Shutdown()
|
||||
clusters[discon2].Shutdown()
|
||||
delay()
|
||||
waitForLeader(t, clusters)
|
||||
delay()
|
||||
|
||||
graph, err := clusters[j].ConnectGraph()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
clusterIDs := make(map[peer.ID]struct{})
|
||||
for i, c := range clusters {
|
||||
if i == discon1 || i == discon2 {
|
||||
continue
|
||||
}
|
||||
id := c.ID().ID
|
||||
clusterIDs[id] = struct{}{}
|
||||
}
|
||||
validateClusterGraph(t, graph, clusterIDs)
|
||||
}
|
||||
|
|
|
@@ -97,11 +97,7 @@ type ipfsSwarmPeersResp struct {
 }
 
 type ipfsPeer struct {
-	Addr    string
-	Peer    string
-	Latency string
-	Muxer   string
-	Streams []ipfsStream
+	Peer string
 }
 
 type ipfsStream struct {
@@ -867,14 +863,14 @@ func (ipfs *Connector) SwarmPeers() (api.SwarmPeers, error) {
 		return swarm, err
 	}
 
-	swarm.Peers = make([]peer.ID, len(peersRaw.Peers))
+	swarm = make([]peer.ID, len(peersRaw.Peers))
 	for i, p := range peersRaw.Peers {
 		pID, err := peer.IDB58Decode(p.Peer)
 		if err != nil {
 			logger.Error(err)
 			return swarm, err
 		}
-		swarm.Peers[i] = pID
+		swarm[i] = pID
 	}
 	return swarm, nil
 }
|
@ -540,6 +540,26 @@ func TestConnectSwarms(t *testing.T) {
|
|||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
func TestSwarmPeers(t *testing.T) {
|
||||
ipfs, mock := testIPFSConnector(t)
|
||||
defer mock.Close()
|
||||
defer ipfs.Shutdown()
|
||||
|
||||
swarmPeers, err := ipfs.SwarmPeers()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(swarmPeers) != 2 {
|
||||
t.Fatal("expected 2 swarm peers")
|
||||
}
|
||||
if swarmPeers[0] != test.TestPeerID4 {
|
||||
t.Error("unexpected swarm peer")
|
||||
}
|
||||
if swarmPeers[1] != test.TestPeerID5 {
|
||||
t.Error("unexpected swarm peer")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRepoSize(t *testing.T) {
|
||||
ipfs, mock := testIPFSConnector(t)
|
||||
defer mock.Close()
|
||||
|
|
|
@ -66,6 +66,12 @@
|
|||
"hash": "QmWi28zbQG6B1xfaaWx5cYoLn3kBFU6pQ6GWQNRV5P6dNe",
|
||||
"name": "lock",
|
||||
"version": "0.0.0"
|
||||
},
|
||||
{
|
||||
"author": "ZenGround0",
|
||||
"hash": "QmXoatUMzVryJXW1iucE9H4BayxAKzHSyRVZPxhWwEuX8y",
|
||||
"name": "go-dot",
|
||||
"version": "0.0.0"
|
||||
}
|
||||
],
|
||||
"gxVersion": "0.11.0",
|
||||
|
|
|
@@ -5,9 +5,7 @@ test_description="Test ctl's status reporting functionality. Test errors on inc
 . lib/test-lib.sh
 
 test_ipfs_init
-
+cleanup test_clean_ipfs
 test_cluster_init
-
+cleanup test_clean_cluster
 
 test_expect_success IPFS,CLUSTER,JQ "cluster-ctl can read id" '
 	id=`cluster_id`
@@ -59,4 +57,12 @@ test_expect_success IPFS,CLUSTER "pin ls on invalid CID fails" '
 	test_must_fail ipfs-cluster-ctl pin ls XXXinvalid-CIDXXX
 '
 
+test_expect_success IPFS,CLUSTER "health graph succeeds and prints as expected" '
+	ipfs-cluster-ctl health graph | grep -q "C0 -> I0"
+
+'
+
+test_clean_ipfs
+test_clean_cluster
+
 test_done
|
@ -13,4 +13,7 @@ var (
|
|||
TestPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
|
||||
TestPeerID2, _ = peer.IDB58Decode("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6")
|
||||
TestPeerID3, _ = peer.IDB58Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
|
||||
TestPeerID4, _ = peer.IDB58Decode("QmZ8naDy5mEz4GLuQwjWt9MPYqHTBbsm8tQBrNSjiq6zBc")
|
||||
TestPeerID5, _ = peer.IDB58Decode("QmZVAo3wd8s5eTTy2kPYs34J9PvfxpKPuYsePPYGjgRRjg")
|
||||
TestPeerID6, _ = peer.IDB58Decode("QmR8Vu6kZk7JvAN2rWVWgiduHatgBq2bb15Yyq8RRhYSbx")
|
||||
)
|
||||
|
|
|
@ -63,6 +63,14 @@ type mockAddResp struct {
|
|||
Bytes uint64
|
||||
}
|
||||
|
||||
type mockSwarmPeersResp struct {
|
||||
Peers []mockIpfsPeer
|
||||
}
|
||||
|
||||
type mockIpfsPeer struct {
|
||||
Peer string
|
||||
}
|
||||
|
||||
// NewIpfsMock returns a new mock.
|
||||
func NewIpfsMock() *IpfsMock {
|
||||
st := mapstate.NewMapState()
|
||||
|
@@ -211,6 +219,18 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
 		}
 		j, _ := json.Marshal(resp)
 		w.Write(j)
+	case "swarm/peers":
+		peer1 := mockIpfsPeer{
+			Peer: TestPeerID4.Pretty(),
+		}
+		peer2 := mockIpfsPeer{
+			Peer: TestPeerID5.Pretty(),
+		}
+		resp := mockSwarmPeersResp{
+			Peers: []mockIpfsPeer{peer1, peer2},
+		}
+		j, _ := json.Marshal(resp)
+		w.Write(j)
 	case "repo/stat":
 		len := len(m.pinMap.List())
 		resp := mockRepoStatResp{
@@ -118,6 +118,28 @@ func (mock *mockService) PeerRemove(in peer.ID, out *struct{}) error {
 	return nil
 }
 
+func (mock *mockService) ConnectGraph(in struct{}, out *api.ConnectGraphSerial) error {
+	*out = api.ConnectGraphSerial{
+		ClusterID: TestPeerID1.Pretty(),
+		IPFSLinks: map[string][]string{
+			TestPeerID4.Pretty(): []string{TestPeerID5.Pretty(), TestPeerID6.Pretty()},
+			TestPeerID5.Pretty(): []string{TestPeerID4.Pretty(), TestPeerID6.Pretty()},
+			TestPeerID6.Pretty(): []string{TestPeerID4.Pretty(), TestPeerID5.Pretty()},
+		},
+		ClusterLinks: map[string][]string{
+			TestPeerID1.Pretty(): []string{TestPeerID2.Pretty(), TestPeerID3.Pretty()},
+			TestPeerID2.Pretty(): []string{TestPeerID1.Pretty(), TestPeerID3.Pretty()},
+			TestPeerID3.Pretty(): []string{TestPeerID1.Pretty(), TestPeerID2.Pretty()},
+		},
+		ClustertoIPFS: map[string]string{
+			TestPeerID1.Pretty(): TestPeerID4.Pretty(),
+			TestPeerID2.Pretty(): TestPeerID5.Pretty(),
+			TestPeerID3.Pretty(): TestPeerID6.Pretty(),
+		},
+	}
+	return nil
+}
+
 func (mock *mockService) StatusAll(in struct{}, out *[]api.GlobalPinInfoSerial) error {
 	c1, _ := cid.Decode(TestCid1)
 	c2, _ := cid.Decode(TestCid2)
@@ -320,6 +342,11 @@ func (mock *mockService) IPFSConnectSwarms(in struct{}, out *struct{}) error {
 	return nil
 }
 
+func (mock *mockService) IPFSSwarmPeers(in struct{}, out *api.SwarmPeersSerial) error {
+	*out = []string{TestPeerID2.Pretty(), TestPeerID3.Pretty()}
+	return nil
+}
+
 func (mock *mockService) IPFSConfigKey(in string, out *interface{}) error {
 	switch in {
 	case "Datastore/StorageMax":
util.go
@@ -44,6 +44,14 @@ func copyPinInfoSerialSliceToIfaces(in [][]api.PinInfoSerial) []interface{} {
 	return ifaces
 }
 
+func copyIDSerialSliceToIfaces(in [][]api.IDSerial) []interface{} {
+	ifaces := make([]interface{}, len(in), len(in))
+	for i := range in {
+		ifaces[i] = &in[i]
+	}
+	return ifaces
+}
+
 func copyEmptyStructToIfaces(in []struct{}) []interface{} {
 	ifaces := make([]interface{}, len(in), len(in))
 	for i := range in {