mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 05:47:59 -05:00
Fix CPU usage in small devnets (#14446)
* `CustodyCountFromRemotePeer`: Set happy path in the outer scope. * `FindPeersWithSubnet`: Improve logging. * `listenForNewNodes`: Avoid infinite loop in a small subnet. * Address Nishant's comment. * FIx Nishant's comment.
This commit is contained in:
@@ -18,7 +18,6 @@ go_library(
|
||||
"handshake.go",
|
||||
"info.go",
|
||||
"interfaces.go",
|
||||
"iterator.go",
|
||||
"log.go",
|
||||
"message_id.go",
|
||||
"monitoring.go",
|
||||
|
||||
@@ -230,11 +230,11 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer bootListener.Close()
|
||||
|
||||
// Use shorter period for testing.
|
||||
currentPeriod := pollingPeriod
|
||||
pollingPeriod = 1 * time.Second
|
||||
// Use smaller batch size for testing.
|
||||
currentBatchSize := batchSize
|
||||
batchSize = 2
|
||||
defer func() {
|
||||
pollingPeriod = currentPeriod
|
||||
batchSize = currentBatchSize
|
||||
}()
|
||||
|
||||
bootNode := bootListener.Self()
|
||||
|
||||
@@ -72,26 +72,10 @@ loop:
|
||||
return validPeers, nil
|
||||
}
|
||||
|
||||
// CustodyCountFromRemotePeer retrieves the custody count from a remote peer.
|
||||
func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
|
||||
func (s *Service) custodyCountFromRemotePeerEnr(pid peer.ID) uint64 {
|
||||
// By default, we assume the peer custodies the minimum number of subnets.
|
||||
custodyRequirement := params.BeaconConfig().CustodyRequirement
|
||||
|
||||
// First, try to get the custody count from the peer's metadata.
|
||||
metadata, err := s.peers.Metadata(pid)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("peerID", pid).Debug("Failed to retrieve metadata for peer, defaulting to the ENR value")
|
||||
}
|
||||
|
||||
if metadata != nil {
|
||||
custodyCount := metadata.CustodySubnetCount()
|
||||
if custodyCount > 0 {
|
||||
return custodyCount
|
||||
}
|
||||
}
|
||||
|
||||
log.WithField("peerID", pid).Debug("Failed to retrieve custody count from metadata for peer, defaulting to the ENR value")
|
||||
|
||||
// Retrieve the ENR of the peer.
|
||||
record, err := s.peers.ENR(pid)
|
||||
if err != nil {
|
||||
@@ -116,3 +100,30 @@ func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
|
||||
|
||||
return custodyCount
|
||||
}
|
||||
|
||||
// CustodyCountFromRemotePeer retrieves the custody count from a remote peer.
|
||||
func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
|
||||
// Try to get the custody count from the peer's metadata.
|
||||
metadata, err := s.peers.Metadata(pid)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("peerID", pid).Debug("Failed to retrieve metadata for peer, defaulting to the ENR value")
|
||||
return s.custodyCountFromRemotePeerEnr(pid)
|
||||
}
|
||||
|
||||
// If the metadata is nil, default to the ENR value.
|
||||
if metadata == nil {
|
||||
log.WithField("peerID", pid).Debug("Metadata is nil, defaulting to the ENR value")
|
||||
return s.custodyCountFromRemotePeerEnr(pid)
|
||||
}
|
||||
|
||||
// Get the custody subnets count from the metadata.
|
||||
custodyCount := metadata.CustodySubnetCount()
|
||||
|
||||
// If the custody count is null, default to the ENR value.
|
||||
if custodyCount == 0 {
|
||||
log.WithField("peerID", pid).Debug("The custody count extracted from the metadata equals to 0, defaulting to the ENR value")
|
||||
return s.custodyCountFromRemotePeerEnr(pid)
|
||||
}
|
||||
|
||||
return custodyCount
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
|
||||
@@ -191,7 +192,26 @@ func (s *Service) RefreshPersistentSubnets() {
|
||||
|
||||
// listen for new nodes watches for new nodes in the network and adds them to the peerstore.
|
||||
func (s *Service) listenForNewNodes() {
|
||||
iterator := filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
|
||||
const minLogInterval = 1 * time.Minute
|
||||
|
||||
peersSummary := func(threshold uint) (uint, uint) {
|
||||
// Retrieve how many active peers we have.
|
||||
activePeers := s.Peers().Active()
|
||||
activePeerCount := uint(len(activePeers))
|
||||
|
||||
// Compute how many peers we are missing to reach the threshold.
|
||||
if activePeerCount >= threshold {
|
||||
return activePeerCount, 0
|
||||
}
|
||||
|
||||
missingPeerCount := threshold - activePeerCount
|
||||
|
||||
return activePeerCount, missingPeerCount
|
||||
}
|
||||
|
||||
var lastLogTime time.Time
|
||||
|
||||
iterator := s.dv5Listener.RandomNodes()
|
||||
defer iterator.Close()
|
||||
|
||||
for {
|
||||
@@ -207,17 +227,35 @@ func (s *Service) listenForNewNodes() {
|
||||
time.Sleep(pollingPeriod)
|
||||
continue
|
||||
}
|
||||
wantedCount := s.wantedPeerDials()
|
||||
if wantedCount == 0 {
|
||||
|
||||
// Compute the number of new peers we want to dial.
|
||||
activePeerCount, missingPeerCount := peersSummary(s.cfg.MaxPeers)
|
||||
|
||||
fields := logrus.Fields{
|
||||
"currentPeerCount": activePeerCount,
|
||||
"targetPeerCount": s.cfg.MaxPeers,
|
||||
}
|
||||
|
||||
if missingPeerCount == 0 {
|
||||
log.Trace("Not looking for peers, at peer limit")
|
||||
time.Sleep(pollingPeriod)
|
||||
continue
|
||||
}
|
||||
|
||||
if time.Since(lastLogTime) > minLogInterval {
|
||||
lastLogTime = time.Now()
|
||||
log.WithFields(fields).Debug("Searching for new active peers")
|
||||
}
|
||||
|
||||
// Restrict dials if limit is applied.
|
||||
if flags.MaxDialIsActive() {
|
||||
wantedCount = min(wantedCount, flags.Get().MaxConcurrentDials)
|
||||
maxConcurrentDials := uint(flags.Get().MaxConcurrentDials)
|
||||
missingPeerCount = min(missingPeerCount, maxConcurrentDials)
|
||||
}
|
||||
wantedNodes := enode.ReadNodes(iterator, wantedCount)
|
||||
|
||||
// Search for new peers.
|
||||
wantedNodes := searchForPeers(iterator, batchSize, missingPeerCount, s.filterPeer)
|
||||
|
||||
wg := new(sync.WaitGroup)
|
||||
for i := 0; i < len(wantedNodes); i++ {
|
||||
node := wantedNodes[i]
|
||||
@@ -487,17 +525,6 @@ func (s *Service) isPeerAtLimit(inbound bool) bool {
|
||||
return activePeers >= maxPeers || numOfConns >= maxPeers
|
||||
}
|
||||
|
||||
func (s *Service) wantedPeerDials() int {
|
||||
maxPeers := int(s.cfg.MaxPeers)
|
||||
|
||||
activePeers := len(s.Peers().Active())
|
||||
wantedCount := 0
|
||||
if maxPeers > activePeers {
|
||||
wantedCount = maxPeers - activePeers
|
||||
}
|
||||
return wantedCount
|
||||
}
|
||||
|
||||
// PeersFromStringAddrs converts peer raw ENRs into multiaddrs for p2p.
|
||||
func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) {
|
||||
var allAddrs []ma.Multiaddr
|
||||
|
||||
@@ -1,36 +0,0 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
)
|
||||
|
||||
// filterNodes wraps an iterator such that Next only returns nodes for which
|
||||
// the 'check' function returns true. This custom implementation also
|
||||
// checks for context deadlines so that in the event the parent context has
|
||||
// expired, we do exit from the search rather than perform more network
|
||||
// lookups for additional peers.
|
||||
func filterNodes(ctx context.Context, it enode.Iterator, check func(*enode.Node) bool) enode.Iterator {
|
||||
return &filterIter{ctx, it, check}
|
||||
}
|
||||
|
||||
type filterIter struct {
|
||||
context.Context
|
||||
enode.Iterator
|
||||
check func(*enode.Node) bool
|
||||
}
|
||||
|
||||
// Next looks up for the next valid node according to our
|
||||
// filter criteria.
|
||||
func (f *filterIter) Next() bool {
|
||||
for f.Iterator.Next() {
|
||||
if f.Context.Err() != nil {
|
||||
return false
|
||||
}
|
||||
if f.check(f.Node()) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -43,6 +43,10 @@ var _ runtime.Service = (*Service)(nil)
|
||||
// defined below.
|
||||
var pollingPeriod = 6 * time.Second
|
||||
|
||||
// When looking for new nodes, if not enough nodes are found,
|
||||
// we stop after this amount of iterations.
|
||||
var batchSize = 2_000
|
||||
|
||||
// Refresh rate of ENR set at twice per slot.
|
||||
var refreshRate = slots.DivideSlotBy(2)
|
||||
|
||||
|
||||
@@ -201,11 +201,11 @@ func TestListenForNewNodes(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer bootListener.Close()
|
||||
|
||||
// Use shorter period for testing.
|
||||
currentPeriod := pollingPeriod
|
||||
pollingPeriod = 1 * time.Second
|
||||
// Use shorter batch size for testing.
|
||||
currentBatchSize := batchSize
|
||||
batchSize = 5
|
||||
defer func() {
|
||||
pollingPeriod = currentPeriod
|
||||
batchSize = currentBatchSize
|
||||
}()
|
||||
|
||||
bootNode := bootListener.Self()
|
||||
|
||||
@@ -76,11 +76,11 @@ func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node)
|
||||
func searchForPeers(
|
||||
iterator enode.Iterator,
|
||||
batchSize int,
|
||||
peersToFindCount int,
|
||||
peersToFindCount uint,
|
||||
filter func(node *enode.Node) bool,
|
||||
) []*enode.Node {
|
||||
nodeFromNodeID := make(map[enode.ID]*enode.Node, batchSize)
|
||||
for i := 0; i < batchSize && len(nodeFromNodeID) <= peersToFindCount && iterator.Next(); i++ {
|
||||
for i := 0; i < batchSize && uint(len(nodeFromNodeID)) <= peersToFindCount && iterator.Next(); i++ {
|
||||
node := iterator.Node()
|
||||
|
||||
// Filter out nodes that do not meet the criteria.
|
||||
@@ -141,7 +141,7 @@ func (s *Service) FindPeersWithSubnet(
|
||||
index uint64,
|
||||
threshold int,
|
||||
) (bool, error) {
|
||||
const batchSize = 2000
|
||||
const minLogInterval = 1 * time.Minute
|
||||
|
||||
ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
|
||||
defer span.End()
|
||||
@@ -180,19 +180,17 @@ func (s *Service) FindPeersWithSubnet(
|
||||
return true, nil
|
||||
}
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"topic": topic,
|
||||
"currentPeerCount": peerCountForTopic,
|
||||
"targetPeerCount": threshold,
|
||||
}).Debug("Searching for new peers in the network - Start")
|
||||
log := log.WithFields(logrus.Fields{
|
||||
"topic": topic,
|
||||
"targetPeerCount": threshold,
|
||||
})
|
||||
|
||||
log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - start")
|
||||
|
||||
lastLogTime := time.Now()
|
||||
|
||||
wg := new(sync.WaitGroup)
|
||||
for {
|
||||
// If we have enough peers, we can exit the loop. This is the happy path.
|
||||
if missingPeerCountForTopic == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
// If the context is done, we can exit the loop. This is the unhappy path.
|
||||
if err := ctx.Err(); err != nil {
|
||||
return false, errors.Errorf(
|
||||
@@ -202,7 +200,7 @@ func (s *Service) FindPeersWithSubnet(
|
||||
}
|
||||
|
||||
// Search for new peers in the network.
|
||||
nodes := searchForPeers(iterator, batchSize, missingPeerCountForTopic, filter)
|
||||
nodes := searchForPeers(iterator, batchSize, uint(missingPeerCountForTopic), filter)
|
||||
|
||||
// Restrict dials if limit is applied.
|
||||
maxConcurrentDials := math.MaxInt
|
||||
@@ -221,10 +219,20 @@ func (s *Service) FindPeersWithSubnet(
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
_, missingPeerCountForTopic = peersSummary(topic, threshold)
|
||||
peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold)
|
||||
|
||||
// If we have enough peers, we can exit the loop. This is the happy path.
|
||||
if missingPeerCountForTopic == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
if time.Since(lastLogTime) > minLogInterval {
|
||||
lastLogTime = time.Now()
|
||||
log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - continue")
|
||||
}
|
||||
}
|
||||
|
||||
log.WithField("topic", topic).Debug("Searching for new peers in the network - Success")
|
||||
log.WithField("currentPeerCount", threshold).Debug("Searching for new peers for a subnet - success")
|
||||
return true, nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user