Compare commits

...

5 Commits

Author SHA1 Message Date
nisdas
6a8ac948a2 Preston's Review 2024-05-28 21:40:19 +08:00
nisdas
7a9a026cef Add Flag To Configure Dials 2024-05-28 20:49:32 +08:00
nisdas
f52c29489d Slow Down Lookups 2024-05-27 17:12:25 +08:00
nisdas
3c6ea738f4 Handle backoff in Iterator 2024-05-23 19:07:19 +08:00
nisdas
4ce1261197 Fix Excessive Subnet Dials 2024-05-21 21:18:14 +08:00
9 changed files with 94 additions and 25 deletions

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"crypto/ecdsa"
"net"
"sync"
"time"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -15,9 +16,11 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa"
"github.com/prysmaticlabs/prysm/v5/math"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)
@@ -106,7 +109,7 @@ func (s *Service) RefreshENR() {
// listen for new nodes watches for new nodes in the network and adds them to the peerstore.
func (s *Service) listenForNewNodes() {
iterator := enode.Filter(s.dv5Listener.RandomNodes(), s.filterPeer)
iterator := filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
defer iterator.Close()
for {
@@ -122,29 +125,47 @@ func (s *Service) listenForNewNodes() {
time.Sleep(pollingPeriod)
continue
}
if exists := iterator.Next(); !exists {
break
}
node := iterator.Node()
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
wantedCount := s.wantedPeerDials()
if wantedCount == 0 {
log.Trace("Not looking for peers, at peer limit")
time.Sleep(pollingPeriod)
continue
}
if peerInfo == nil {
continue
}
// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
s.Peers().RandomizeBackOff(peerInfo.ID)
go func(info *peer.AddrInfo) {
if err := s.connectWithPeer(s.ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
// Restrict dials if limit is applied.
if flags.MaxDialIsActive() {
var err error
wantedICount := math.Min(uint64(wantedCount), uint64(flags.Get().MaxConcurrentDials))
wantedCount, err = math.Int(wantedICount)
if err != nil {
log.WithError(err).Error("Could not get wanted count")
continue
}
}(peerInfo)
}
wantedNodes := enode.ReadNodes(iterator, wantedCount)
wg := new(sync.WaitGroup)
for i := 0; i < len(wantedNodes); i++ {
node := wantedNodes[i]
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
continue
}
if peerInfo == nil {
continue
}
// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
s.Peers().RandomizeBackOff(peerInfo.ID)
wg.Add(1)
go func(info *peer.AddrInfo) {
if err := s.connectWithPeer(s.ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
wg.Done()
}(peerInfo)
}
wg.Wait()
}
}
@@ -384,6 +405,17 @@ func (s *Service) isPeerAtLimit(inbound bool) bool {
return activePeers >= maxPeers || numOfConns >= maxPeers
}
func (s *Service) wantedPeerDials() int {
maxPeers := int(s.cfg.MaxPeers)
activePeers := len(s.Peers().Active())
wantedCount := 0
if maxPeers > activePeers {
wantedCount = maxPeers - activePeers
}
return wantedCount
}
// PeersFromStringAddrs converts peer raw ENRs into multiaddrs for p2p.
func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) {
var allAddrs []ma.Multiaddr

View File

@@ -2,10 +2,14 @@ package p2p
import (
"context"
"runtime"
"time"
"github.com/ethereum/go-ethereum/p2p/enode"
)
const backOffCounter = 50
// filterNodes wraps an iterator such that Next only returns nodes for which
// the 'check' function returns true. This custom implementation also
// checks for context deadlines so that in the event the parent context has
@@ -24,13 +28,21 @@ type filterIter struct {
// Next looks up for the next valid node according to our
// filter criteria.
func (f *filterIter) Next() bool {
lookupCounter := 0
for f.Iterator.Next() {
// Do not excessively perform lookups if we constantly receive non-viable peers.
if lookupCounter > backOffCounter {
lookupCounter = 0
runtime.Gosched()
time.Sleep(30 * time.Second)
}
if f.Context.Err() != nil {
return false
}
if f.check(f.Node()) {
return true
}
lookupCounter++
}
return false
}

View File

@@ -87,7 +87,18 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+
"only %d out of %d peers were able to be found", topic, currNum, threshold)
}
nodes := enode.ReadNodes(iterator, int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch))
nodeCount := int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch)
// Restrict dials if limit is applied.
if flags.MaxDialIsActive() {
var err error
wantedICount := mathutil.Min(uint64(nodeCount), uint64(flags.Get().MaxConcurrentDials))
nodeCount, err = mathutil.Int(wantedICount)
if err != nil {
log.WithError(err).Error("Could not get wanted count")
continue
}
}
nodes := enode.ReadNodes(iterator, nodeCount)
for _, node := range nodes {
info, _, err := convertToAddrInfo(node)
if err != nil {

View File

@@ -227,6 +227,12 @@ var (
Usage: "Sets the minimum number of peers that a node will attempt to peer with that are subscribed to a subnet.",
Value: 6,
}
// MaxConcurrentDials defines a flag to set the maximum number of peers that a node will attempt to dial with from discovery.
MaxConcurrentDials = &cli.Uint64Flag{
Name: "max-concurrent-dials",
Usage: "Sets the maximum number of peers that a node will attempt to dial with from discovery. By default we will dials as" +
"many peers as possible.",
}
// SuggestedFeeRecipient specifies the fee recipient for the transaction fees.
SuggestedFeeRecipient = &cli.StringFlag{
Name: "suggested-fee-recipient",

View File

@@ -11,6 +11,7 @@ type GlobalFlags struct {
SubscribeToAllSubnets bool
MinimumSyncPeers int
MinimumPeersPerSubnet int
MaxConcurrentDials int
BlockBatchLimit int
BlockBatchLimitBurstFactor int
BlobBatchLimit int
@@ -45,11 +46,17 @@ func ConfigureGlobalFlags(ctx *cli.Context) {
cfg.BlobBatchLimit = ctx.Int(BlobBatchLimit.Name)
cfg.BlobBatchLimitBurstFactor = ctx.Int(BlobBatchLimitBurstFactor.Name)
cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name)
cfg.MaxConcurrentDials = ctx.Int(MaxConcurrentDials.Name)
configureMinimumPeers(ctx, cfg)
Init(cfg)
}
// MaxDialIsActive checks if the user has enabled the max dial flag.
func MaxDialIsActive() bool {
return Get().MaxConcurrentDials > 0
}
func configureMinimumPeers(ctx *cli.Context, cfg *GlobalFlags) {
cfg.MinimumSyncPeers = ctx.Int(MinSyncPeers.Name)
maxPeers := ctx.Int(cmd.P2PMaxPeers.Name)

View File

@@ -72,6 +72,7 @@ var appFlags = []cli.Flag{
flags.WeakSubjectivityCheckpoint,
flags.Eth1HeaderReqLimit,
flags.MinPeersPerSubnet,
flags.MaxConcurrentDials,
flags.SuggestedFeeRecipient,
flags.TerminalTotalDifficultyOverride,
flags.TerminalBlockHashOverride,

View File

@@ -124,6 +124,7 @@ var appHelpFlagGroups = []flagGroup{
flags.WeakSubjectivityCheckpoint,
flags.Eth1HeaderReqLimit,
flags.MinPeersPerSubnet,
flags.MaxConcurrentDials,
flags.MevRelayEndpoint,
flags.MaxBuilderEpochMissedSlots,
flags.MaxBuilderConsecutiveMissedSlots,

View File

@@ -21,8 +21,8 @@ import (
// MockNodeClient is a mock of NodeClient interface.
type MockNodeClient struct {
ctrl *gomock.Controller
recorder *MockNodeClientMockRecorder
ctrl *gomock.Controller
recorder *MockNodeClientMockRecorder
healthTracker *beacon.NodeHealthTracker
}

View File

@@ -76,4 +76,3 @@ func (mr *MockJsonRestHandlerMockRecorder) Post(ctx, endpoint, headers, data, re
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Post", reflect.TypeOf((*MockJsonRestHandler)(nil).Post), ctx, endpoint, headers, data, resp)
}