Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-09 15:37:56 -05:00)
Reboot Discovery Listener (#14487)
* Add Current Changes To Routine
* Add In New Test
* Add Feature Flag
* Add Discovery Rebooter feature
* Do Not Export Mutex And Use Zero Value Mutex
* Wrap Error For Better Debugging
* Fix Function Name and Add Specific Test For it
* Manu's Review
@@ -24,6 +24,8 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.
 - fastssz version bump (better error messages).
 - SSE implementation that sheds stuck clients. [pr](https://github.com/prysmaticlabs/prysm/pull/14413)
 - Add Bellatrix tests for light client functions
+- Add Discovery Rebooter Feature
 
 ### Changed
 
@@ -9,7 +9,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/ethereum/go-ethereum/p2p/discover"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/prysmaticlabs/go-bitfield"
@@ -236,7 +235,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
 	bootNode := bootListener.Self()
 	subnet := uint64(5)
 
-	var listeners []*discover.UDPv5
+	var listeners []*listenerWrapper
 	var hosts []host.Host
 	// setup other nodes.
 	cfg = &Config{
@@ -50,7 +50,7 @@ func TestPeer_AtMaxLimit(t *testing.T) {
 	}()
 
 	for i := 0; i < highWatermarkBuffer; i++ {
-		addPeer(t, s.peers, peers.PeerConnected)
+		addPeer(t, s.peers, peers.PeerConnected, false)
 	}
 
 	// create alternate host
@@ -159,7 +159,7 @@ func TestService_RejectInboundPeersBeyondLimit(t *testing.T) {
 	inboundLimit += 1
 	// Add in up to inbound peer limit.
 	for i := 0; i < int(inboundLimit); i++ {
-		addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
+		addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), false)
 	}
 	valid = s.InterceptAccept(&maEndpoints{raddr: multiAddress})
 	if valid {
@@ -24,6 +24,11 @@ import (
 	"github.com/prysmaticlabs/prysm/v5/time/slots"
 )
 
+type ListenerRebooter interface {
+	Listener
+	RebootListener() error
+}
+
 // Listener defines the discovery V5 network interface that is used
 // to communicate with other peers.
 type Listener interface {
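Because ListenerRebooter embeds Listener, the rest of the service keeps the same read-only view of discovery; only code that owns the listener's lifecycle sees the extra RebootListener method. Any existing Listener implementation, such as the mockListener test double further down, satisfies the new interface by adding that single method.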
@@ -47,6 +52,87 @@ type quicProtocol uint16
 // quicProtocol is the "quic" key, which holds the QUIC port of the node.
 func (quicProtocol) ENRKey() string { return "quic" }
 
+type listenerWrapper struct {
+	mu              sync.RWMutex
+	listener        *discover.UDPv5
+	listenerCreator func() (*discover.UDPv5, error)
+}
+
+func newListener(listenerCreator func() (*discover.UDPv5, error)) (*listenerWrapper, error) {
+	rawListener, err := listenerCreator()
+	if err != nil {
+		return nil, errors.Wrap(err, "could not create new listener")
+	}
+	return &listenerWrapper{
+		listener:        rawListener,
+		listenerCreator: listenerCreator,
+	}, nil
+}
+
+func (l *listenerWrapper) Self() *enode.Node {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	return l.listener.Self()
+}
+
+func (l *listenerWrapper) Close() {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	l.listener.Close()
+}
+
+func (l *listenerWrapper) Lookup(id enode.ID) []*enode.Node {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	return l.listener.Lookup(id)
+}
+
+func (l *listenerWrapper) Resolve(node *enode.Node) *enode.Node {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	return l.listener.Resolve(node)
+}
+
+func (l *listenerWrapper) RandomNodes() enode.Iterator {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	return l.listener.RandomNodes()
+}
+
+func (l *listenerWrapper) Ping(node *enode.Node) error {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	return l.listener.Ping(node)
+}
+
+func (l *listenerWrapper) RequestENR(node *enode.Node) (*enode.Node, error) {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	return l.listener.RequestENR(node)
+}
+
+func (l *listenerWrapper) LocalNode() *enode.LocalNode {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	return l.listener.LocalNode()
+}
+
+func (l *listenerWrapper) RebootListener() error {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	// Close current listener
+	l.listener.Close()
+
+	newListener, err := l.listenerCreator()
+	if err != nil {
+		return err
+	}
+
+	l.listener = newListener
+	return nil
+}
+
 // RefreshENR uses an epoch to refresh the enr entry for our node
 // with the tracked committee ids for the epoch, allowing our node
 // to be dynamically discoverable by others given our tracked committee ids.
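The wrapper is a standard Go pattern: guard a swappable resource with a sync.RWMutex so the many read paths stay cheap and only the rare swap takes the exclusive lock. Below is a minimal, self-contained sketch of the pattern; the names (swappable, Use, Reboot) are illustrative and not from the Prysm codebase.

package main

import (
	"fmt"
	"sync"
)

// swappable guards a resource that can be rebuilt at runtime.
// Readers take the shared lock; Reboot takes the exclusive lock,
// so a swap waits for in-flight reads to drain and blocks new ones.
type swappable struct {
	mu      sync.RWMutex
	conn    string                 // stand-in for *discover.UDPv5
	creator func() (string, error) // stand-in for the listener factory
}

func (s *swappable) Use() string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.conn
}

func (s *swappable) Reboot() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	fresh, err := s.creator()
	if err != nil {
		return err // on failure, the old resource stays in place
	}
	s.conn = fresh
	return nil
}

func main() {
	n := 0
	s := &swappable{conn: "conn-0", creator: func() (string, error) {
		n++
		return fmt.Sprintf("conn-%d", n), nil
	}}
	fmt.Println(s.Use()) // conn-0
	_ = s.Reboot()
	fmt.Println(s.Use()) // conn-1
}

One difference worth noting: the sketch builds the replacement before discarding the old resource, so a failed rebuild leaves a working one in place, whereas RebootListener above closes the old listener first. In the caller below that is acceptable, because a reboot failure is only logged and the threshold counter is left alone, so the next connectivity tick retries.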
@@ -110,55 +196,78 @@ func (s *Service) RefreshENR() {
 func (s *Service) listenForNewNodes() {
 	iterator := filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
 	defer iterator.Close()
+	connectivityTicker := time.NewTicker(1 * time.Minute)
+	thresholdCount := 0
+
 	for {
 		// Exit if service's context is canceled.
 		if s.ctx.Err() != nil {
 			break
 		}
-		if s.isPeerAtLimit(false /* inbound */) {
-			// Pause the main loop for a period to stop looking
-			// for new peers.
-			log.Trace("Not looking for peers, at peer limit")
-			time.Sleep(pollingPeriod)
-			continue
-		}
-		wantedCount := s.wantedPeerDials()
-		if wantedCount == 0 {
-			log.Trace("Not looking for peers, at peer limit")
-			time.Sleep(pollingPeriod)
-			continue
-		}
-		// Restrict dials if limit is applied.
-		if flags.MaxDialIsActive() {
-			wantedCount = min(wantedCount, flags.Get().MaxConcurrentDials)
-		}
-		wantedNodes := enode.ReadNodes(iterator, wantedCount)
-		wg := new(sync.WaitGroup)
-		for i := 0; i < len(wantedNodes); i++ {
-			node := wantedNodes[i]
-			peerInfo, _, err := convertToAddrInfo(node)
-			if err != nil {
-				log.WithError(err).Error("Could not convert to peer info")
+		select {
+		case <-s.ctx.Done():
+			return
+		case <-connectivityTicker.C:
+			// Skip the connectivity check if not enabled.
+			if !features.Get().EnableDiscoveryReboot {
 				continue
 			}
-
-			if peerInfo == nil {
+			if !s.isBelowOutboundPeerThreshold() {
+				// Reset counter if we are beyond the threshold
+				thresholdCount = 0
 				continue
 			}
-
-			// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
-			s.Peers().RandomizeBackOff(peerInfo.ID)
-			wg.Add(1)
-			go func(info *peer.AddrInfo) {
-				if err := s.connectWithPeer(s.ctx, *info); err != nil {
-					log.WithError(err).Tracef("Could not connect with peer %s", info.String())
+			thresholdCount++
+			// Reboot listener if connectivity drops
+			if thresholdCount > 5 {
+				log.WithField("outboundConnectionCount", len(s.peers.OutboundConnected())).Warn("Rebooting discovery listener, reached threshold.")
+				if err := s.dv5Listener.RebootListener(); err != nil {
+					log.WithError(err).Error("Could not reboot listener")
+					continue
 				}
-				wg.Done()
-			}(peerInfo)
+				iterator = filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
+				thresholdCount = 0
+			}
+		default:
+			if s.isPeerAtLimit(false /* inbound */) {
+				// Pause the main loop for a period to stop looking
+				// for new peers.
+				log.Trace("Not looking for peers, at peer limit")
+				time.Sleep(pollingPeriod)
+				continue
+			}
+			wantedCount := s.wantedPeerDials()
+			if wantedCount == 0 {
+				log.Trace("Not looking for peers, at peer limit")
+				time.Sleep(pollingPeriod)
+				continue
+			}
+			// Restrict dials if limit is applied.
+			if flags.MaxDialIsActive() {
+				wantedCount = min(wantedCount, flags.Get().MaxConcurrentDials)
+			}
+			wantedNodes := enode.ReadNodes(iterator, wantedCount)
+			wg := new(sync.WaitGroup)
+			for i := 0; i < len(wantedNodes); i++ {
+				node := wantedNodes[i]
+				peerInfo, _, err := convertToAddrInfo(node)
+				if err != nil {
+					log.WithError(err).Error("Could not convert to peer info")
+					continue
+				}
+
+				if peerInfo == nil {
+					continue
+				}
+
+				// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
+				s.Peers().RandomizeBackOff(peerInfo.ID)
+				wg.Add(1)
+				go func(info *peer.AddrInfo) {
+					if err := s.connectWithPeer(s.ctx, *info); err != nil {
+						log.WithError(err).Tracef("Could not connect with peer %s", info.String())
+					}
+					wg.Done()
+				}(peerInfo)
+			}
+			wg.Wait()
 		}
-		wg.Wait()
 	}
 }
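The flow with the flag enabled: the connectivity check fires at most once per minute. Each tick that finds the node below its outbound peer threshold increments thresholdCount; a healthy tick resets it to zero. After more than five consecutive below-threshold ticks (that is, on the sixth), the service logs a warning, reboots the listener, and on success rebuilds the random-node iterator, since the old iterator was produced by the listener that has just been closed. The default branch carries the pre-existing dial loop unchanged, so peer dialing is unaffected while the ticker is idle.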
@@ -299,14 +408,17 @@ func (s *Service) createLocalNode(
 func (s *Service) startDiscoveryV5(
 	addr net.IP,
 	privKey *ecdsa.PrivateKey,
-) (*discover.UDPv5, error) {
-	listener, err := s.createListener(addr, privKey)
+) (*listenerWrapper, error) {
+	createListener := func() (*discover.UDPv5, error) {
+		return s.createListener(addr, privKey)
+	}
+	wrappedListener, err := newListener(createListener)
 	if err != nil {
 		return nil, errors.Wrap(err, "could not create listener")
 	}
-	record := listener.Self()
+	record := wrappedListener.Self()
 	log.WithField("ENR", record.String()).Info("Started discovery v5")
-	return listener, nil
+	return wrappedListener, nil
 }
 
 // filterPeer validates each node that we retrieve from our dht. We
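Note the closure: createListener captures addr and privKey, so every reboot recreates the listener with exactly the parameters the node started with. That is what makes a reboot transparent to peers, and it is the property TestRebootDiscoveryListener exercises below.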
@@ -398,6 +510,22 @@ func (s *Service) isPeerAtLimit(inbound bool) bool {
 	return activePeers >= maxPeers || numOfConns >= maxPeers
 }
 
+// isBelowOutboundPeerThreshold checks if the number of outbound peers that
+// we are connected to satisfies the minimum expected outbound peer count
+// according to our peer limit.
+func (s *Service) isBelowOutboundPeerThreshold() bool {
+	maxPeers := int(s.cfg.MaxPeers)
+	inBoundLimit := s.Peers().InboundLimit()
+	// Impossible Condition
+	if maxPeers < inBoundLimit {
+		return false
+	}
+	outboundFloor := maxPeers - inBoundLimit
+	outBoundThreshold := outboundFloor / 2
+	outBoundCount := len(s.Peers().OutboundConnected())
+	return outBoundCount < outBoundThreshold
+}
+
 func (s *Service) wantedPeerDials() int {
 	maxPeers := int(s.cfg.MaxPeers)
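To make the arithmetic concrete: the reboot threshold is half of the outbound headroom left after reserving the inbound allowance. A hedged walk-through using the MaxPeers value from TestOutboundPeerThreshold below, and assuming InboundLimit() comes out to 24 for a peer limit of 30 (the real value is derived inside the peers package):

package main

import "fmt"

func main() {
	// Walk-through of isBelowOutboundPeerThreshold with MaxPeers = 30
	// and an assumed InboundLimit() of 24. The inbound limit here is an
	// assumption for illustration, not a Prysm constant.
	maxPeers := 30
	inBoundLimit := 24
	outboundFloor := maxPeers - inBoundLimit // 6
	outBoundThreshold := outboundFloor / 2   // 3

	for _, outBoundCount := range []int{2, 5} {
		fmt.Printf("%d outbound peers -> below threshold: %v\n",
			outBoundCount, outBoundCount < outBoundThreshold)
	}
	// Prints:
	// 2 outbound peers -> below threshold: true
	// 5 outbound peers -> below threshold: false
}

These two outcomes match what the test asserts after adding two, and then three more, outbound peers.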
@@ -95,7 +95,7 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) {
 
 	bootNode := bootListener.Self()
 
-	var listeners []*discover.UDPv5
+	var listeners []*listenerWrapper
 	for i := 1; i <= 5; i++ {
 		port = 3000 + i
 		cfg := &Config{
@@ -231,6 +231,37 @@ func TestCreateLocalNode(t *testing.T) {
 	}
 }
 
+func TestRebootDiscoveryListener(t *testing.T) {
+	port := 1024
+	ipAddr, pkey := createAddrAndPrivKey(t)
+	s := &Service{
+		genesisTime:           time.Now(),
+		genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
+		cfg:                   &Config{UDPPort: uint(port)},
+	}
+	createListener := func() (*discover.UDPv5, error) {
+		return s.createListener(ipAddr, pkey)
+	}
+	listener, err := newListener(createListener)
+	require.NoError(t, err)
+	currentPubkey := listener.Self().Pubkey()
+	currentID := listener.Self().ID()
+	currentPort := listener.Self().UDP()
+	currentAddr := listener.Self().IP()
+
+	assert.NoError(t, listener.RebootListener())
+
+	newPubkey := listener.Self().Pubkey()
+	newID := listener.Self().ID()
+	newPort := listener.Self().UDP()
+	newAddr := listener.Self().IP()
+
+	assert.Equal(t, true, currentPubkey.Equal(newPubkey))
+	assert.Equal(t, currentID, newID)
+	assert.Equal(t, currentPort, newPort)
+	assert.Equal(t, currentAddr.String(), newAddr.String())
+}
+
 func TestMultiAddrsConversion_InvalidIPAddr(t *testing.T) {
 	addr := net.ParseIP("invalidIP")
 	_, pkey := createAddrAndPrivKey(t)
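The test pins down the key property of a reboot: because the creator closure reuses the original private key, IP address, and UDP port, the node's identity (public key, node ID, and endpoint) is unchanged across the restart; only the listener's internal state is recreated.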
@@ -347,19 +378,44 @@ func TestInboundPeerLimit(t *testing.T) {
 	}
 
 	for i := 0; i < 30; i++ {
-		_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
+		_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), false)
 	}
 
 	require.Equal(t, true, s.isPeerAtLimit(false), "not at limit for outbound peers")
 	require.Equal(t, false, s.isPeerAtLimit(true), "at limit for inbound peers")
 
 	for i := 0; i < highWatermarkBuffer; i++ {
-		_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
+		_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), false)
 	}
 
 	require.Equal(t, true, s.isPeerAtLimit(true), "not at limit for inbound peers")
 }
 
+func TestOutboundPeerThreshold(t *testing.T) {
+	fakePeer := testp2p.NewTestP2P(t)
+	s := &Service{
+		cfg:       &Config{MaxPeers: 30},
+		ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, 1*time.Second, false),
+		peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
+			PeerLimit:    30,
+			ScorerParams: &scorers.Config{},
+		}),
+		host: fakePeer.BHost,
+	}
+
+	for i := 0; i < 2; i++ {
+		_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), true)
+	}
+
+	require.Equal(t, true, s.isBelowOutboundPeerThreshold(), "not at outbound peer threshold")
+
+	for i := 0; i < 3; i++ {
+		_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), true)
+	}
+
+	require.Equal(t, false, s.isBelowOutboundPeerThreshold(), "still at outbound peer threshold")
+}
+
 func TestUDPMultiAddress(t *testing.T) {
 	port := 6500
 	ipAddr, pkey := createAddrAndPrivKey(t)
@@ -370,7 +426,11 @@ func TestUDPMultiAddress(t *testing.T) {
 		genesisTime:           genesisTime,
 		genesisValidatorsRoot: genesisValidatorsRoot,
 	}
-	listener, err := s.createListener(ipAddr, pkey)
+
+	createListener := func() (*discover.UDPv5, error) {
+		return s.createListener(ipAddr, pkey)
+	}
+	listener, err := newListener(createListener)
 	require.NoError(t, err)
 	defer listener.Close()
 	s.dv5Listener = listener
@@ -417,7 +477,7 @@ func TestCorrectUDPVersion(t *testing.T) {
 }
 
 // addPeer is a helper to add a peer with a given connection state
-func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState) peer.ID {
+func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState, outbound bool) peer.ID {
 	// Set up some peers with different states
 	mhBytes := []byte{0x11, 0x04}
 	idBytes := make([]byte, 4)
@@ -426,7 +486,11 @@ func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState)
 	mhBytes = append(mhBytes, idBytes...)
 	id, err := peer.IDFromBytes(mhBytes)
 	require.NoError(t, err)
-	p.Add(new(enr.Record), id, nil, network.DirInbound)
+	dir := network.DirInbound
+	if outbound {
+		dir = network.DirOutbound
+	}
+	p.Add(new(enr.Record), id, nil, dir)
 	p.SetConnectionState(id, state)
 	p.SetMetadata(id, wrapper.WrappedMetadataV0(&ethpb.MetaDataV0{
 		SeqNumber: 0,
@@ -455,7 +519,10 @@ func TestRefreshENR_ForkBoundaries(t *testing.T) {
 		genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
 		cfg:                   &Config{UDPPort: uint(port)},
 	}
-	listener, err := s.createListener(ipAddr, pkey)
+	createListener := func() (*discover.UDPv5, error) {
+		return s.createListener(ipAddr, pkey)
+	}
+	listener, err := newListener(createListener)
 	assert.NoError(t, err)
 	s.dv5Listener = listener
 	s.metaData = wrapper.WrappedMetadataV0(new(ethpb.MetaDataV0))
@@ -484,7 +551,10 @@ func TestRefreshENR_ForkBoundaries(t *testing.T) {
 		genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
 		cfg:                   &Config{UDPPort: uint(port)},
 	}
-	listener, err := s.createListener(ipAddr, pkey)
+	createListener := func() (*discover.UDPv5, error) {
+		return s.createListener(ipAddr, pkey)
+	}
+	listener, err := newListener(createListener)
 	assert.NoError(t, err)
 	s.dv5Listener = listener
 	s.metaData = wrapper.WrappedMetadataV0(new(ethpb.MetaDataV0))
@@ -506,7 +576,10 @@ func TestRefreshENR_ForkBoundaries(t *testing.T) {
 		genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
 		cfg:                   &Config{UDPPort: uint(port)},
 	}
-	listener, err := s.createListener(ipAddr, pkey)
+	createListener := func() (*discover.UDPv5, error) {
+		return s.createListener(ipAddr, pkey)
+	}
+	listener, err := newListener(createListener)
 	assert.NoError(t, err)
 
 	// Update params
@@ -537,7 +610,10 @@ func TestRefreshENR_ForkBoundaries(t *testing.T) {
 		genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
 		cfg:                   &Config{UDPPort: uint(port)},
 	}
-	listener, err := s.createListener(ipAddr, pkey)
+	createListener := func() (*discover.UDPv5, error) {
+		return s.createListener(ipAddr, pkey)
+	}
+	listener, err := newListener(createListener)
 	assert.NoError(t, err)
 
 	// Update params
@@ -575,7 +651,10 @@ func TestRefreshENR_ForkBoundaries(t *testing.T) {
 		genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
 		cfg:                   &Config{UDPPort: uint(port)},
 	}
-	listener, err := s.createListener(ipAddr, pkey)
+	createListener := func() (*discover.UDPv5, error) {
+		return s.createListener(ipAddr, pkey)
+	}
+	listener, err := newListener(createListener)
 	assert.NoError(t, err)
 
 	// Update params
@@ -9,7 +9,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/ethereum/go-ethereum/p2p/discover"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/enr"
 	ma "github.com/multiformats/go-multiaddr"
@@ -52,7 +51,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
 		StateNotifier: &mock.MockStateNotifier{},
 	}
 
-	var listeners []*discover.UDPv5
+	var listeners []*listenerWrapper
 	for i := 1; i <= 5; i++ {
 		port := 3000 + i
 		cfg.UDPPort = uint(port)
@@ -139,7 +138,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
 		UDPPort: uint(port),
 	}
 
-	var listeners []*discover.UDPv5
+	var listeners []*listenerWrapper
 	for i := 1; i <= 5; i++ {
 		port := 3000 + i
 		cfg.UDPPort = uint(port)
@@ -71,7 +71,7 @@ type Service struct {
 	subnetsLock        map[uint64]*sync.RWMutex
 	subnetsLockLock    sync.Mutex // Lock access to subnetsLock
 	initializationLock sync.Mutex
-	dv5Listener        Listener
+	dv5Listener        ListenerRebooter
 	startupErr         error
 	ctx                context.Context
 	host               host.Host
@@ -8,7 +8,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/ethereum/go-ethereum/p2p/discover"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/libp2p/go-libp2p"
 	"github.com/libp2p/go-libp2p/core/host"
@@ -69,6 +68,8 @@ func (mockListener) RandomNodes() enode.Iterator {
 	panic("implement me")
 }
 
+func (mockListener) RebootListener() error { panic("implement me") }
+
 func createHost(t *testing.T, port int) (host.Host, *ecdsa.PrivateKey, net.IP) {
 	_, pkey := createAddrAndPrivKey(t)
 	ipAddr := net.ParseIP("127.0.0.1")
@@ -210,7 +211,7 @@ func TestListenForNewNodes(t *testing.T) {
 
 	bootNode := bootListener.Self()
 
-	var listeners []*discover.UDPv5
+	var listeners []*listenerWrapper
 	var hosts []host.Host
 	// setup other nodes.
 	cs := startup.NewClockSynchronizer()
@@ -78,6 +78,8 @@ type Flags struct {
 	SaveInvalidBlock bool // SaveInvalidBlock saves invalid block to temp.
 	SaveInvalidBlob  bool // SaveInvalidBlob saves invalid blob to temp.
 
+	EnableDiscoveryReboot bool // EnableDiscoveryReboot allows the node's local discovery listener to be rebooted in the event of discovery issues.
+
 	// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
 	// changed on disk. This feature is for advanced use cases only.
 	KeystoreImportDebounceInterval time.Duration
@@ -260,6 +262,10 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
 		logEnabled(DisableCommitteeAwarePacking)
 		cfg.DisableCommitteeAwarePacking = true
 	}
+	if ctx.IsSet(EnableDiscoveryReboot.Name) {
+		logEnabled(EnableDiscoveryReboot)
+		cfg.EnableDiscoveryReboot = true
+	}
 
 	cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
 	Init(cfg)
@@ -170,6 +170,10 @@ var (
 		Name:  "disable-committee-aware-packing",
 		Usage: "Changes the attestation packing algorithm to one that is not aware of attesting committees.",
 	}
+	EnableDiscoveryReboot = &cli.BoolFlag{
+		Name:  "enable-discovery-reboot",
+		Usage: "Experimental: Enables the discovery listener to be rebooted in the event of connectivity issues.",
+	}
 )
 
 // devModeFlags holds list of flags that are set when development mode is on.
@@ -227,6 +231,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
 	BlobSaveFsync,
 	EnableQUIC,
 	DisableCommitteeAwarePacking,
+	EnableDiscoveryReboot,
 }...)...)
 
 // E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.
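In practice the feature is opt-in: start the beacon node with --enable-discovery-reboot, which sets cfg.EnableDiscoveryReboot and arms the once-a-minute connectivity check in listenForNewNodes. The flag is registered in BeaconChainFlags but, consistent with its experimental label, this change does not add it to the dev-mode or E2E flag sets.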