Workaround tests failing randomly
Tracked down reason to: https://github.com/libp2p/go-libp2p-swarm/issues/15 License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
parent
74c494eb1e
commit
84a7fa663d
30
cluster.go
30
cluster.go
|
@ -3,7 +3,9 @@ package ipfscluster
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
rpc "github.com/hsanjuan/go-libp2p-rpc"
|
rpc "github.com/hsanjuan/go-libp2p-rpc"
|
||||||
cid "github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
|
@ -76,6 +78,9 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Workaround for https://github.com/libp2p/go-libp2p-swarm/issues/15
|
||||||
|
cluster.openConns()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
tracker.SetClient(rpcClient)
|
tracker.SetClient(rpcClient)
|
||||||
ipfs.SetClient(rpcClient)
|
ipfs.SetClient(rpcClient)
|
||||||
|
@ -434,7 +439,7 @@ func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (GlobalPinInfo, er
|
||||||
var errorMsgs string
|
var errorMsgs string
|
||||||
for i, r := range replies {
|
for i, r := range replies {
|
||||||
if e := errs[i]; e != nil {
|
if e := errs[i]; e != nil {
|
||||||
logger.Error(e)
|
logger.Errorf("%s: error in broadcast response from %s: %s ", c.host.ID(), members[i], e)
|
||||||
errorMsgs += e.Error() + "\n"
|
errorMsgs += e.Error() + "\n"
|
||||||
}
|
}
|
||||||
pin.Status[r.Peer] = r
|
pin.Status[r.Peer] = r
|
||||||
|
@ -479,7 +484,7 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) {
|
||||||
var errorMsgs string
|
var errorMsgs string
|
||||||
for i, r := range replies {
|
for i, r := range replies {
|
||||||
if e := errs[i]; e != nil {
|
if e := errs[i]; e != nil {
|
||||||
logger.Error("error in broadcast response: ", e)
|
logger.Errorf("%s: error in broadcast response from %s: %s ", c.host.ID(), members[i], e)
|
||||||
errorMsgs += e.Error() + "\n"
|
errorMsgs += e.Error() + "\n"
|
||||||
}
|
}
|
||||||
mergePins(r)
|
mergePins(r)
|
||||||
|
@ -494,3 +499,24 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) {
|
||||||
}
|
}
|
||||||
return infos, errors.New(errorMsgs)
|
return infos, errors.New(errorMsgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// openConns is a workaround for
|
||||||
|
// https://github.com/libp2p/go-libp2p-swarm/issues/15
|
||||||
|
// It runs when consensus is initialized so we can assume
|
||||||
|
// that the cluster is more or less up.
|
||||||
|
// It should open connections for peers where they haven't
|
||||||
|
// yet been opened. By randomly sleeping we reduce the
|
||||||
|
// chance that members will open 2 connections simultaneously.
|
||||||
|
func (c *Cluster) openConns() {
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
|
||||||
|
peers := c.host.Peerstore().Peers()
|
||||||
|
for _, p := range peers {
|
||||||
|
peerInfo := c.host.Peerstore().PeerInfo(p)
|
||||||
|
if p == c.host.ID() {
|
||||||
|
continue // do not connect to ourselves
|
||||||
|
}
|
||||||
|
// ignore any errors here
|
||||||
|
c.host.Connect(c.ctx, peerInfo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -41,6 +41,9 @@ func SetLogLevel(l string) {
|
||||||
DEBUG
|
DEBUG
|
||||||
*/
|
*/
|
||||||
logging.SetLogLevel("cluster", l)
|
logging.SetLogLevel("cluster", l)
|
||||||
|
//logging.SetLogLevel("libp2p-rpc", l)
|
||||||
|
//logging.SetLogLevel("swarm2", l)
|
||||||
|
//logging.SetLogLevel("libp2p-raft", l)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IPFSStatus values
|
// IPFSStatus values
|
||||||
|
|
Loading…
Reference in New Issue
Block a user