Fixes #16: trigger ipfs swarm connect to other ipfs nodes in the cluster.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
Hector Sanjuan 2017-03-23 19:34:33 +01:00
parent 98043ca675
commit 4bb30cd24a
9 changed files with 130 additions and 12 deletions

View File

@ -535,6 +535,16 @@ func (c *Cluster) PeerAdd(addr ma.Multiaddr) (api.ID, error) {
logger.Error(err)
}
// Ask the new peer to connect its IPFS daemon to the rest
err = c.rpcClient.Call(pid,
"Cluster",
"IPFSConnectSwarms",
struct{}{},
&struct{}{})
if err != nil {
logger.Error(err)
}
id, err := c.getIDForPeer(pid)
return id, nil
}

View File

@ -77,6 +77,8 @@ func (ipfs *mockConnector) PinLs(filter string) (map[string]api.IPFSPinStatus, e
return m, nil
}
func (ipfs *mockConnector) ConnectSwarms() {}
func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate.MapState, *maptracker.MapPinTracker) {
api := &mockAPI{}
ipfs := &mockConnector{}

View File

@ -71,6 +71,9 @@ type IPFSConnector interface {
Unpin(*cid.Cid) error
PinLsCid(*cid.Cid) (api.IPFSPinStatus, error)
PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error)
// ConnectSwarms make sure this peer's IPFS daemon is connected to
// other peers IPFS daemons.
ConnectSwarms()
}
// Peered represents a component which needs to be aware of the peers

View File

@ -681,7 +681,7 @@ func TestClustersReplication(t *testing.T) {
if err != nil {
t.Error(err)
}
time.Sleep(time.Second / 2)
time.Sleep(time.Second)
// check that it is held by exactly nClusters -1 peers
gpi, err := clusters[j].Status(h)

View File

@ -27,6 +27,11 @@ import (
var logger = logging.Logger("ipfshttp")
// ConnectSwarmsDelay specifies how long to wait after startup before attempting
// to open connections from this peer's IPFS daemon to the IPFS daemons
// of other peers.
var ConnectSwarmsDelay = 7 * time.Second
// IPFS Proxy settings
var (
// maximum duration before timing out read of the request
@ -134,26 +139,45 @@ func NewConnector(ipfsNodeMAddr ma.Multiaddr, ipfsProxyMAddr ma.Multiaddr) (*Con
ipfs.handlers["/api/v0/pin/rm"] = ipfs.unpinHandler
ipfs.handlers["/api/v0/pin/ls"] = ipfs.pinLsHandler
ipfs.run()
go ipfs.run()
return ipfs, nil
}
// set cancellable context. launch proxy
// launches proxy and connects all ipfs daemons when
// we receive the rpcReady signal.
func (ipfs *Connector) run() {
<-ipfs.rpcReady
// This launches the proxy
ipfs.wg.Add(1)
go func() {
defer ipfs.wg.Done()
<-ipfs.rpcReady
logger.Infof("IPFS Proxy: %s -> %s",
ipfs.proxyMAddr,
ipfs.nodeMAddr)
err := ipfs.server.Serve(ipfs.listener)
err := ipfs.server.Serve(ipfs.listener) // hangs here
if err != nil && !strings.Contains(err.Error(), "closed network connection") {
logger.Error(err)
}
}()
// This runs ipfs swarm connect to the daemons of other cluster members
ipfs.wg.Add(1)
go func() {
defer ipfs.wg.Done()
// It does not hurt to wait a little bit. i.e. think cluster
// peers which are started at the same time as the ipfs
// daemon...
tmr := time.NewTimer(ConnectSwarmsDelay)
defer tmr.Stop()
select {
case <-tmr.C:
ipfs.ConnectSwarms()
case <-ipfs.ctx.Done():
return
}
}()
}
// This will run a custom handler if we have one for a URL.Path, or
@ -363,7 +387,6 @@ func (ipfs *Connector) ID() (api.IPFSID, error) {
mAddrs[i] = mAddr
}
id.Addresses = mAddrs
return id, nil
}
@ -505,3 +528,35 @@ func (ipfs *Connector) get(path string) ([]byte, error) {
func (ipfs *Connector) apiURL() string {
return fmt.Sprintf("http://%s/api/v0", ipfs.nodeAddr)
}
// ConnectSwarms requests the ipfs addresses of other peers and
// triggers ipfs swarm connect requests
func (ipfs *Connector) ConnectSwarms() {
var idsSerial []api.IDSerial
err := ipfs.rpcClient.Call("",
"Cluster",
"Peers",
struct{}{},
&idsSerial)
if err != nil {
logger.Error(err)
return
}
logger.Debugf("%+v", idsSerial)
for _, idSerial := range idsSerial {
ipfsID := idSerial.IPFS
for _, addr := range ipfsID.Addresses {
// This is a best effort attempt
// We ignore errors which happens
// when passing in a bunch of addresses
_, err := ipfs.get(
fmt.Sprintf("swarm/connect?arg=%s", addr))
if err != nil {
logger.Debug(err)
continue
}
logger.Debugf("ipfs successfully connected to %s", addr)
}
}
}

View File

@ -6,15 +6,22 @@ import (
"io/ioutil"
"net/http"
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
func init() {
_ = logging.Logger
ConnectSwarmsDelay = 0
}
func testIPFSConnector(t *testing.T) (*Connector, *test.IpfsMock) {
mock := test.NewIpfsMock()
nodeMAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d",
@ -326,6 +333,18 @@ func TestIPFSShutdown(t *testing.T) {
}
}
func TestConnectSwarms(t *testing.T) {
// In order to interactively test uncomment the following.
// Otherwise there is no good way to test this with the
// ipfs mock
// logging.SetDebugLogging()
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
defer ipfs.Shutdown()
time.Sleep(time.Second)
}
func proxyURL(c *Connector) string {
_, addr, _ := manet.DialArgs(c.proxyMAddr)
return fmt.Sprintf("http://%s/api/v0", addr)

View File

@ -219,6 +219,12 @@ func (rpcapi *RPCAPI) IPFSPinLs(in string, out *map[string]api.IPFSPinStatus) er
return err
}
// ConnectSwarms runs IPFSConnector.ConnectSwarms().
func (rpcapi *RPCAPI) ConnectSwarms(in struct{}, out *struct{}) error {
rpcapi.c.ipfs.ConnectSwarms()
return nil
}
/*
Consensus component methods
*/

View File

@ -150,6 +150,22 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
j, _ := json.Marshal(resp)
w.Write(j)
}
case "swarm/connect":
query := r.URL.Query()
arg, ok := query["arg"]
if !ok {
goto ERROR
}
addr := arg[0]
splits := strings.Split(addr, "/")
pid := splits[len(splits)-1]
resp := struct {
Strings []string
}{
Strings: []string{fmt.Sprintf("connect %s success", pid)},
}
j, _ := json.Marshal(resp)
w.Write(j)
case "version":
w.Write([]byte("{\"Version\":\"m.o.c.k\"}"))
default:

View File

@ -63,14 +63,17 @@ func (mock *mockService) ID(in struct{}, out *api.IDSerial) error {
//_, pubkey, _ := crypto.GenerateKeyPair(
// DefaultConfigCrypto,
// DefaultConfigKeyLength)
*out = api.ID{
ID: TestPeerID1,
*out = api.IDSerial{
ID: TestPeerID1.Pretty(),
//PublicKey: pubkey,
Version: "0.0.mock",
IPFS: api.IPFSID{
ID: TestPeerID1,
IPFS: api.IPFSIDSerial{
ID: TestPeerID1.Pretty(),
Addresses: api.MultiaddrsSerial{
api.MultiaddrSerial("/ip4/127.0.0.1/tcp/4001/ipfs/" + TestPeerID1.Pretty()),
},
},
}.ToSerial()
}
return nil
}
@ -237,3 +240,7 @@ func (mock *mockService) IPFSPinLs(in string, out *map[string]api.IPFSPinStatus)
*out = m
return nil
}
func (mock *mockService) ConnectSwarms(in struct{}, out *struct{}) error {
return nil
}