Compare commits

...

5 Commits

Author SHA1 Message Date
nisdas
6a8ac948a2 Preston's Review 2024-05-28 21:40:19 +08:00
nisdas
7a9a026cef Add Flag To Configure Dials 2024-05-28 20:49:32 +08:00
nisdas
f52c29489d Slow Down Lookups 2024-05-27 17:12:25 +08:00
nisdas
3c6ea738f4 Handle backoff in Iterator 2024-05-23 19:07:19 +08:00
nisdas
4ce1261197 Fix Excessive Subnet Dials 2024-05-21 21:18:14 +08:00
9 changed files with 94 additions and 25 deletions

View File

@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"crypto/ecdsa" "crypto/ecdsa"
"net" "net"
"sync"
"time" "time"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
@@ -15,9 +16,11 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache" "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/features" "github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/config/params"
ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa" ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa"
"github.com/prysmaticlabs/prysm/v5/math"
"github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots" "github.com/prysmaticlabs/prysm/v5/time/slots"
) )
@@ -106,7 +109,7 @@ func (s *Service) RefreshENR() {
// listen for new nodes watches for new nodes in the network and adds them to the peerstore. // listen for new nodes watches for new nodes in the network and adds them to the peerstore.
func (s *Service) listenForNewNodes() { func (s *Service) listenForNewNodes() {
iterator := enode.Filter(s.dv5Listener.RandomNodes(), s.filterPeer) iterator := filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
defer iterator.Close() defer iterator.Close()
for { for {
@@ -122,29 +125,47 @@ func (s *Service) listenForNewNodes() {
time.Sleep(pollingPeriod) time.Sleep(pollingPeriod)
continue continue
} }
wantedCount := s.wantedPeerDials()
if exists := iterator.Next(); !exists { if wantedCount == 0 {
break log.Trace("Not looking for peers, at peer limit")
} time.Sleep(pollingPeriod)
node := iterator.Node()
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
continue continue
} }
// Restrict dials if limit is applied.
if peerInfo == nil { if flags.MaxDialIsActive() {
continue var err error
} wantedICount := math.Min(uint64(wantedCount), uint64(flags.Get().MaxConcurrentDials))
wantedCount, err = math.Int(wantedICount)
// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period. if err != nil {
s.Peers().RandomizeBackOff(peerInfo.ID) log.WithError(err).Error("Could not get wanted count")
go func(info *peer.AddrInfo) { continue
if err := s.connectWithPeer(s.ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
} }
}(peerInfo) }
wantedNodes := enode.ReadNodes(iterator, wantedCount)
wg := new(sync.WaitGroup)
for i := 0; i < len(wantedNodes); i++ {
node := wantedNodes[i]
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
continue
}
if peerInfo == nil {
continue
}
// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
s.Peers().RandomizeBackOff(peerInfo.ID)
wg.Add(1)
go func(info *peer.AddrInfo) {
if err := s.connectWithPeer(s.ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
wg.Done()
}(peerInfo)
}
wg.Wait()
} }
} }
@@ -384,6 +405,17 @@ func (s *Service) isPeerAtLimit(inbound bool) bool {
return activePeers >= maxPeers || numOfConns >= maxPeers return activePeers >= maxPeers || numOfConns >= maxPeers
} }
func (s *Service) wantedPeerDials() int {
maxPeers := int(s.cfg.MaxPeers)
activePeers := len(s.Peers().Active())
wantedCount := 0
if maxPeers > activePeers {
wantedCount = maxPeers - activePeers
}
return wantedCount
}
// PeersFromStringAddrs converts peer raw ENRs into multiaddrs for p2p. // PeersFromStringAddrs converts peer raw ENRs into multiaddrs for p2p.
func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) { func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) {
var allAddrs []ma.Multiaddr var allAddrs []ma.Multiaddr

View File

@@ -2,10 +2,14 @@ package p2p
import ( import (
"context" "context"
"runtime"
"time"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
) )
const backOffCounter = 50
// filterNodes wraps an iterator such that Next only returns nodes for which // filterNodes wraps an iterator such that Next only returns nodes for which
// the 'check' function returns true. This custom implementation also // the 'check' function returns true. This custom implementation also
// checks for context deadlines so that in the event the parent context has // checks for context deadlines so that in the event the parent context has
@@ -24,13 +28,21 @@ type filterIter struct {
// Next looks up for the next valid node according to our // Next looks up for the next valid node according to our
// filter criteria. // filter criteria.
func (f *filterIter) Next() bool { func (f *filterIter) Next() bool {
lookupCounter := 0
for f.Iterator.Next() { for f.Iterator.Next() {
// Do not excessively perform lookups if we constantly receive non-viable peers.
if lookupCounter > backOffCounter {
lookupCounter = 0
runtime.Gosched()
time.Sleep(30 * time.Second)
}
if f.Context.Err() != nil { if f.Context.Err() != nil {
return false return false
} }
if f.check(f.Node()) { if f.check(f.Node()) {
return true return true
} }
lookupCounter++
} }
return false return false
} }

View File

@@ -87,7 +87,18 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+ return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+
"only %d out of %d peers were able to be found", topic, currNum, threshold) "only %d out of %d peers were able to be found", topic, currNum, threshold)
} }
nodes := enode.ReadNodes(iterator, int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch)) nodeCount := int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch)
// Restrict dials if limit is applied.
if flags.MaxDialIsActive() {
var err error
wantedICount := mathutil.Min(uint64(nodeCount), uint64(flags.Get().MaxConcurrentDials))
nodeCount, err = mathutil.Int(wantedICount)
if err != nil {
log.WithError(err).Error("Could not get wanted count")
continue
}
}
nodes := enode.ReadNodes(iterator, nodeCount)
for _, node := range nodes { for _, node := range nodes {
info, _, err := convertToAddrInfo(node) info, _, err := convertToAddrInfo(node)
if err != nil { if err != nil {

View File

@@ -227,6 +227,12 @@ var (
Usage: "Sets the minimum number of peers that a node will attempt to peer with that are subscribed to a subnet.", Usage: "Sets the minimum number of peers that a node will attempt to peer with that are subscribed to a subnet.",
Value: 6, Value: 6,
} }
// MaxConcurrentDials defines a flag to set the maximum number of peers that a node will attempt to dial with from discovery.
MaxConcurrentDials = &cli.Uint64Flag{
Name: "max-concurrent-dials",
Usage: "Sets the maximum number of peers that a node will attempt to dial with from discovery. By default we will dials as" +
"many peers as possible.",
}
// SuggestedFeeRecipient specifies the fee recipient for the transaction fees. // SuggestedFeeRecipient specifies the fee recipient for the transaction fees.
SuggestedFeeRecipient = &cli.StringFlag{ SuggestedFeeRecipient = &cli.StringFlag{
Name: "suggested-fee-recipient", Name: "suggested-fee-recipient",

View File

@@ -11,6 +11,7 @@ type GlobalFlags struct {
SubscribeToAllSubnets bool SubscribeToAllSubnets bool
MinimumSyncPeers int MinimumSyncPeers int
MinimumPeersPerSubnet int MinimumPeersPerSubnet int
MaxConcurrentDials int
BlockBatchLimit int BlockBatchLimit int
BlockBatchLimitBurstFactor int BlockBatchLimitBurstFactor int
BlobBatchLimit int BlobBatchLimit int
@@ -45,11 +46,17 @@ func ConfigureGlobalFlags(ctx *cli.Context) {
cfg.BlobBatchLimit = ctx.Int(BlobBatchLimit.Name) cfg.BlobBatchLimit = ctx.Int(BlobBatchLimit.Name)
cfg.BlobBatchLimitBurstFactor = ctx.Int(BlobBatchLimitBurstFactor.Name) cfg.BlobBatchLimitBurstFactor = ctx.Int(BlobBatchLimitBurstFactor.Name)
cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name) cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name)
cfg.MaxConcurrentDials = ctx.Int(MaxConcurrentDials.Name)
configureMinimumPeers(ctx, cfg) configureMinimumPeers(ctx, cfg)
Init(cfg) Init(cfg)
} }
// MaxDialIsActive checks if the user has enabled the max dial flag.
func MaxDialIsActive() bool {
return Get().MaxConcurrentDials > 0
}
func configureMinimumPeers(ctx *cli.Context, cfg *GlobalFlags) { func configureMinimumPeers(ctx *cli.Context, cfg *GlobalFlags) {
cfg.MinimumSyncPeers = ctx.Int(MinSyncPeers.Name) cfg.MinimumSyncPeers = ctx.Int(MinSyncPeers.Name)
maxPeers := ctx.Int(cmd.P2PMaxPeers.Name) maxPeers := ctx.Int(cmd.P2PMaxPeers.Name)

View File

@@ -72,6 +72,7 @@ var appFlags = []cli.Flag{
flags.WeakSubjectivityCheckpoint, flags.WeakSubjectivityCheckpoint,
flags.Eth1HeaderReqLimit, flags.Eth1HeaderReqLimit,
flags.MinPeersPerSubnet, flags.MinPeersPerSubnet,
flags.MaxConcurrentDials,
flags.SuggestedFeeRecipient, flags.SuggestedFeeRecipient,
flags.TerminalTotalDifficultyOverride, flags.TerminalTotalDifficultyOverride,
flags.TerminalBlockHashOverride, flags.TerminalBlockHashOverride,

View File

@@ -124,6 +124,7 @@ var appHelpFlagGroups = []flagGroup{
flags.WeakSubjectivityCheckpoint, flags.WeakSubjectivityCheckpoint,
flags.Eth1HeaderReqLimit, flags.Eth1HeaderReqLimit,
flags.MinPeersPerSubnet, flags.MinPeersPerSubnet,
flags.MaxConcurrentDials,
flags.MevRelayEndpoint, flags.MevRelayEndpoint,
flags.MaxBuilderEpochMissedSlots, flags.MaxBuilderEpochMissedSlots,
flags.MaxBuilderConsecutiveMissedSlots, flags.MaxBuilderConsecutiveMissedSlots,

View File

@@ -21,8 +21,8 @@ import (
// MockNodeClient is a mock of NodeClient interface. // MockNodeClient is a mock of NodeClient interface.
type MockNodeClient struct { type MockNodeClient struct {
ctrl *gomock.Controller ctrl *gomock.Controller
recorder *MockNodeClientMockRecorder recorder *MockNodeClientMockRecorder
healthTracker *beacon.NodeHealthTracker healthTracker *beacon.NodeHealthTracker
} }

View File

@@ -76,4 +76,3 @@ func (mr *MockJsonRestHandlerMockRecorder) Post(ctx, endpoint, headers, data, re
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Post", reflect.TypeOf((*MockJsonRestHandler)(nil).Post), ctx, endpoint, headers, data, resp) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Post", reflect.TypeOf((*MockJsonRestHandler)(nil).Post), ctx, endpoint, headers, data, resp)
} }