Merge pull request #72 from ipfs/connect-ipfs-nodes
Fixes #16: trigger ipfs swarm connect to other ipfs nodes in the cluster
This commit is contained in:
commit
e8ba1b7dad
10
cluster.go
10
cluster.go
|
@ -535,6 +535,16 @@ func (c *Cluster) PeerAdd(addr ma.Multiaddr) (api.ID, error) {
|
||||||
logger.Error(err)
|
logger.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ask the new peer to connect its IPFS daemon to the rest
|
||||||
|
err = c.rpcClient.Call(pid,
|
||||||
|
"Cluster",
|
||||||
|
"IPFSConnectSwarms",
|
||||||
|
struct{}{},
|
||||||
|
&struct{}{})
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
id, err := c.getIDForPeer(pid)
|
id, err := c.getIDForPeer(pid)
|
||||||
return id, nil
|
return id, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,6 +77,8 @@ func (ipfs *mockConnector) PinLs(filter string) (map[string]api.IPFSPinStatus, e
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ipfs *mockConnector) ConnectSwarms() {}
|
||||||
|
|
||||||
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate.MapState, *maptracker.MapPinTracker) {
|
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate.MapState, *maptracker.MapPinTracker) {
|
||||||
api := &mockAPI{}
|
api := &mockAPI{}
|
||||||
ipfs := &mockConnector{}
|
ipfs := &mockConnector{}
|
||||||
|
|
11
coverage.sh
11
coverage.sh
|
@ -1,10 +1,5 @@
|
||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
|
||||||
if [ -z $COVERALLS_TOKEN ]
|
|
||||||
then
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
echo "mode: count" > fullcov.out
|
echo "mode: count" > fullcov.out
|
||||||
dirs=$(find ./* -maxdepth 10 -type d )
|
dirs=$(find ./* -maxdepth 10 -type d )
|
||||||
dirs=". $dirs"
|
dirs=". $dirs"
|
||||||
|
@ -23,6 +18,10 @@ do
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
done
|
done
|
||||||
$HOME/gopath/bin/goveralls -coverprofile=fullcov.out -service=travis-ci -repotoken $COVERALLS_TOKEN
|
|
||||||
|
if [ -n $COVERALLS_TOKEN ];
|
||||||
|
then
|
||||||
|
$HOME/gopath/bin/goveralls -coverprofile=fullcov.out -service=travis-ci -repotoken $COVERALLS_TOKEN
|
||||||
|
fi
|
||||||
rm -rf ./profile.out
|
rm -rf ./profile.out
|
||||||
rm -rf ./fullcov.out
|
rm -rf ./fullcov.out
|
||||||
|
|
|
@ -71,6 +71,9 @@ type IPFSConnector interface {
|
||||||
Unpin(*cid.Cid) error
|
Unpin(*cid.Cid) error
|
||||||
PinLsCid(*cid.Cid) (api.IPFSPinStatus, error)
|
PinLsCid(*cid.Cid) (api.IPFSPinStatus, error)
|
||||||
PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error)
|
PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error)
|
||||||
|
// ConnectSwarms make sure this peer's IPFS daemon is connected to
|
||||||
|
// other peers IPFS daemons.
|
||||||
|
ConnectSwarms()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Peered represents a component which needs to be aware of the peers
|
// Peered represents a component which needs to be aware of the peers
|
||||||
|
|
|
@ -681,7 +681,7 @@ func TestClustersReplication(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
time.Sleep(time.Second / 2)
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
// check that it is held by exactly nClusters -1 peers
|
// check that it is held by exactly nClusters -1 peers
|
||||||
gpi, err := clusters[j].Status(h)
|
gpi, err := clusters[j].Status(h)
|
||||||
|
|
|
@ -27,6 +27,11 @@ import (
|
||||||
|
|
||||||
var logger = logging.Logger("ipfshttp")
|
var logger = logging.Logger("ipfshttp")
|
||||||
|
|
||||||
|
// ConnectSwarmsDelay specifies how long to wait after startup before attempting
|
||||||
|
// to open connections from this peer's IPFS daemon to the IPFS daemons
|
||||||
|
// of other peers.
|
||||||
|
var ConnectSwarmsDelay = 7 * time.Second
|
||||||
|
|
||||||
// IPFS Proxy settings
|
// IPFS Proxy settings
|
||||||
var (
|
var (
|
||||||
// maximum duration before timing out read of the request
|
// maximum duration before timing out read of the request
|
||||||
|
@ -134,26 +139,45 @@ func NewConnector(ipfsNodeMAddr ma.Multiaddr, ipfsProxyMAddr ma.Multiaddr) (*Con
|
||||||
ipfs.handlers["/api/v0/pin/rm"] = ipfs.unpinHandler
|
ipfs.handlers["/api/v0/pin/rm"] = ipfs.unpinHandler
|
||||||
ipfs.handlers["/api/v0/pin/ls"] = ipfs.pinLsHandler
|
ipfs.handlers["/api/v0/pin/ls"] = ipfs.pinLsHandler
|
||||||
|
|
||||||
ipfs.run()
|
go ipfs.run()
|
||||||
return ipfs, nil
|
return ipfs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// set cancellable context. launch proxy
|
// launches proxy and connects all ipfs daemons when
|
||||||
|
// we receive the rpcReady signal.
|
||||||
func (ipfs *Connector) run() {
|
func (ipfs *Connector) run() {
|
||||||
|
<-ipfs.rpcReady
|
||||||
|
|
||||||
// This launches the proxy
|
// This launches the proxy
|
||||||
ipfs.wg.Add(1)
|
ipfs.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer ipfs.wg.Done()
|
defer ipfs.wg.Done()
|
||||||
<-ipfs.rpcReady
|
|
||||||
|
|
||||||
logger.Infof("IPFS Proxy: %s -> %s",
|
logger.Infof("IPFS Proxy: %s -> %s",
|
||||||
ipfs.proxyMAddr,
|
ipfs.proxyMAddr,
|
||||||
ipfs.nodeMAddr)
|
ipfs.nodeMAddr)
|
||||||
err := ipfs.server.Serve(ipfs.listener)
|
err := ipfs.server.Serve(ipfs.listener) // hangs here
|
||||||
if err != nil && !strings.Contains(err.Error(), "closed network connection") {
|
if err != nil && !strings.Contains(err.Error(), "closed network connection") {
|
||||||
logger.Error(err)
|
logger.Error(err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// This runs ipfs swarm connect to the daemons of other cluster members
|
||||||
|
ipfs.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer ipfs.wg.Done()
|
||||||
|
|
||||||
|
// It does not hurt to wait a little bit. i.e. think cluster
|
||||||
|
// peers which are started at the same time as the ipfs
|
||||||
|
// daemon...
|
||||||
|
tmr := time.NewTimer(ConnectSwarmsDelay)
|
||||||
|
defer tmr.Stop()
|
||||||
|
select {
|
||||||
|
case <-tmr.C:
|
||||||
|
ipfs.ConnectSwarms()
|
||||||
|
case <-ipfs.ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// This will run a custom handler if we have one for a URL.Path, or
|
// This will run a custom handler if we have one for a URL.Path, or
|
||||||
|
@ -363,7 +387,6 @@ func (ipfs *Connector) ID() (api.IPFSID, error) {
|
||||||
mAddrs[i] = mAddr
|
mAddrs[i] = mAddr
|
||||||
}
|
}
|
||||||
id.Addresses = mAddrs
|
id.Addresses = mAddrs
|
||||||
|
|
||||||
return id, nil
|
return id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -505,3 +528,35 @@ func (ipfs *Connector) get(path string) ([]byte, error) {
|
||||||
func (ipfs *Connector) apiURL() string {
|
func (ipfs *Connector) apiURL() string {
|
||||||
return fmt.Sprintf("http://%s/api/v0", ipfs.nodeAddr)
|
return fmt.Sprintf("http://%s/api/v0", ipfs.nodeAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConnectSwarms requests the ipfs addresses of other peers and
|
||||||
|
// triggers ipfs swarm connect requests
|
||||||
|
func (ipfs *Connector) ConnectSwarms() {
|
||||||
|
var idsSerial []api.IDSerial
|
||||||
|
err := ipfs.rpcClient.Call("",
|
||||||
|
"Cluster",
|
||||||
|
"Peers",
|
||||||
|
struct{}{},
|
||||||
|
&idsSerial)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger.Debugf("%+v", idsSerial)
|
||||||
|
|
||||||
|
for _, idSerial := range idsSerial {
|
||||||
|
ipfsID := idSerial.IPFS
|
||||||
|
for _, addr := range ipfsID.Addresses {
|
||||||
|
// This is a best effort attempt
|
||||||
|
// We ignore errors which happens
|
||||||
|
// when passing in a bunch of addresses
|
||||||
|
_, err := ipfs.get(
|
||||||
|
fmt.Sprintf("swarm/connect?arg=%s", addr))
|
||||||
|
if err != nil {
|
||||||
|
logger.Debug(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logger.Debugf("ipfs successfully connected to %s", addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -6,15 +6,22 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/ipfs-cluster/api"
|
"github.com/ipfs/ipfs-cluster/api"
|
||||||
"github.com/ipfs/ipfs-cluster/test"
|
"github.com/ipfs/ipfs-cluster/test"
|
||||||
|
|
||||||
cid "github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
|
logging "github.com/ipfs/go-log"
|
||||||
ma "github.com/multiformats/go-multiaddr"
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
manet "github.com/multiformats/go-multiaddr-net"
|
manet "github.com/multiformats/go-multiaddr-net"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
_ = logging.Logger
|
||||||
|
ConnectSwarmsDelay = 0
|
||||||
|
}
|
||||||
|
|
||||||
func testIPFSConnector(t *testing.T) (*Connector, *test.IpfsMock) {
|
func testIPFSConnector(t *testing.T) (*Connector, *test.IpfsMock) {
|
||||||
mock := test.NewIpfsMock()
|
mock := test.NewIpfsMock()
|
||||||
nodeMAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d",
|
nodeMAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d",
|
||||||
|
@ -326,6 +333,18 @@ func TestIPFSShutdown(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConnectSwarms(t *testing.T) {
|
||||||
|
// In order to interactively test uncomment the following.
|
||||||
|
// Otherwise there is no good way to test this with the
|
||||||
|
// ipfs mock
|
||||||
|
// logging.SetDebugLogging()
|
||||||
|
|
||||||
|
ipfs, mock := testIPFSConnector(t)
|
||||||
|
defer mock.Close()
|
||||||
|
defer ipfs.Shutdown()
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
func proxyURL(c *Connector) string {
|
func proxyURL(c *Connector) string {
|
||||||
_, addr, _ := manet.DialArgs(c.proxyMAddr)
|
_, addr, _ := manet.DialArgs(c.proxyMAddr)
|
||||||
return fmt.Sprintf("http://%s/api/v0", addr)
|
return fmt.Sprintf("http://%s/api/v0", addr)
|
||||||
|
|
|
@ -219,6 +219,12 @@ func (rpcapi *RPCAPI) IPFSPinLs(in string, out *map[string]api.IPFSPinStatus) er
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConnectSwarms runs IPFSConnector.ConnectSwarms().
|
||||||
|
func (rpcapi *RPCAPI) ConnectSwarms(in struct{}, out *struct{}) error {
|
||||||
|
rpcapi.c.ipfs.ConnectSwarms()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Consensus component methods
|
Consensus component methods
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -150,6 +150,22 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
|
||||||
j, _ := json.Marshal(resp)
|
j, _ := json.Marshal(resp)
|
||||||
w.Write(j)
|
w.Write(j)
|
||||||
}
|
}
|
||||||
|
case "swarm/connect":
|
||||||
|
query := r.URL.Query()
|
||||||
|
arg, ok := query["arg"]
|
||||||
|
if !ok {
|
||||||
|
goto ERROR
|
||||||
|
}
|
||||||
|
addr := arg[0]
|
||||||
|
splits := strings.Split(addr, "/")
|
||||||
|
pid := splits[len(splits)-1]
|
||||||
|
resp := struct {
|
||||||
|
Strings []string
|
||||||
|
}{
|
||||||
|
Strings: []string{fmt.Sprintf("connect %s success", pid)},
|
||||||
|
}
|
||||||
|
j, _ := json.Marshal(resp)
|
||||||
|
w.Write(j)
|
||||||
case "version":
|
case "version":
|
||||||
w.Write([]byte("{\"Version\":\"m.o.c.k\"}"))
|
w.Write([]byte("{\"Version\":\"m.o.c.k\"}"))
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -63,14 +63,17 @@ func (mock *mockService) ID(in struct{}, out *api.IDSerial) error {
|
||||||
//_, pubkey, _ := crypto.GenerateKeyPair(
|
//_, pubkey, _ := crypto.GenerateKeyPair(
|
||||||
// DefaultConfigCrypto,
|
// DefaultConfigCrypto,
|
||||||
// DefaultConfigKeyLength)
|
// DefaultConfigKeyLength)
|
||||||
*out = api.ID{
|
*out = api.IDSerial{
|
||||||
ID: TestPeerID1,
|
ID: TestPeerID1.Pretty(),
|
||||||
//PublicKey: pubkey,
|
//PublicKey: pubkey,
|
||||||
Version: "0.0.mock",
|
Version: "0.0.mock",
|
||||||
IPFS: api.IPFSID{
|
IPFS: api.IPFSIDSerial{
|
||||||
ID: TestPeerID1,
|
ID: TestPeerID1.Pretty(),
|
||||||
|
Addresses: api.MultiaddrsSerial{
|
||||||
|
api.MultiaddrSerial("/ip4/127.0.0.1/tcp/4001/ipfs/" + TestPeerID1.Pretty()),
|
||||||
},
|
},
|
||||||
}.ToSerial()
|
},
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,3 +240,7 @@ func (mock *mockService) IPFSPinLs(in string, out *map[string]api.IPFSPinStatus)
|
||||||
*out = m
|
*out = m
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mock *mockService) ConnectSwarms(in struct{}, out *struct{}) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user