From 577899bfec0ff52d4db9fdb564140770435553fc Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Fri, 31 Oct 2025 10:25:18 -0500 Subject: [PATCH] P2p active val count lock (#15955) * Add a lock for p2p computation of active validator count and limit only to topics that need it. * Changelog fragment * Update gossip_scoring_params.go Wrap errors --- beacon-chain/p2p/gossip_scoring_params.go | 19 ++++++-- beacon-chain/p2p/service.go | 59 ++++++++++++----------- changelog/pvl-active-val-count-lock.md | 4 ++ 3 files changed, 49 insertions(+), 33 deletions(-) create mode 100644 changelog/pvl-active-val-count-lock.md diff --git a/beacon-chain/p2p/gossip_scoring_params.go b/beacon-chain/p2p/gossip_scoring_params.go index 84c80d4f1a..69a853566a 100644 --- a/beacon-chain/p2p/gossip_scoring_params.go +++ b/beacon-chain/p2p/gossip_scoring_params.go @@ -2,6 +2,7 @@ package p2p import ( "context" + "fmt" "math" "net" "reflect" @@ -106,18 +107,26 @@ func peerScoringParams(colocationWhitelist []*net.IPNet) (*pubsub.PeerScoreParam } func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, error) { - activeValidators, err := s.retrieveActiveValidators() - if err != nil { - return nil, err - } switch { case strings.Contains(topic, GossipBlockMessage): return defaultBlockTopicParams(), nil case strings.Contains(topic, GossipAggregateAndProofMessage): + activeValidators, err := s.retrieveActiveValidators() + if err != nil { + return nil, fmt.Errorf("failed to compute active validator count for topic %s: %w", GossipAggregateAndProofMessage, err) + } return defaultAggregateTopicParams(activeValidators), nil case strings.Contains(topic, GossipAttestationMessage): + activeValidators, err := s.retrieveActiveValidators() + if err != nil { + return nil, fmt.Errorf("failed to compute active validator count for topic %s: %w", GossipAttestationMessage, err) + } return defaultAggregateSubnetTopicParams(activeValidators), nil case strings.Contains(topic, GossipSyncCommitteeMessage): + activeValidators, err := s.retrieveActiveValidators() + if err != nil { + return nil, fmt.Errorf("failed to compute active validator count for topic %s: %w", GossipSyncCommitteeMessage, err) + } return defaultSyncSubnetTopicParams(activeValidators), nil case strings.Contains(topic, GossipContributionAndProofMessage): return defaultSyncContributionTopicParams(), nil @@ -142,6 +151,8 @@ func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, erro } func (s *Service) retrieveActiveValidators() (uint64, error) { + s.activeValidatorCountLock.Lock() + defer s.activeValidatorCountLock.Unlock() if s.activeValidatorCount != 0 { return s.activeValidatorCount, nil } diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 000f65ce88..40cff9e35f 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -65,35 +65,36 @@ var ( // Service for managing peer to peer (p2p) networking. type Service struct { - started bool - isPreGenesis bool - pingMethod func(ctx context.Context, id peer.ID) error - pingMethodLock sync.RWMutex - cancel context.CancelFunc - cfg *Config - peers *peers.Status - addrFilter *multiaddr.Filters - ipLimiter *leakybucket.Collector - privKey *ecdsa.PrivateKey - metaData metadata.Metadata - pubsub *pubsub.PubSub - joinedTopics map[string]*pubsub.Topic - joinedTopicsLock sync.RWMutex - subnetsLock map[uint64]*sync.RWMutex - subnetsLockLock sync.Mutex // Lock access to subnetsLock - initializationLock sync.Mutex - dv5Listener ListenerRebooter - startupErr error - ctx context.Context - host host.Host - genesisTime time.Time - genesisValidatorsRoot []byte - activeValidatorCount uint64 - peerDisconnectionTime *cache.Cache - custodyInfo *custodyInfo - custodyInfoLock sync.RWMutex // Lock access to custodyInfo - custodyInfoSet chan struct{} - allForkDigests map[[4]byte]struct{} + started bool + isPreGenesis bool + pingMethod func(ctx context.Context, id peer.ID) error + pingMethodLock sync.RWMutex + cancel context.CancelFunc + cfg *Config + peers *peers.Status + addrFilter *multiaddr.Filters + ipLimiter *leakybucket.Collector + privKey *ecdsa.PrivateKey + metaData metadata.Metadata + pubsub *pubsub.PubSub + joinedTopics map[string]*pubsub.Topic + joinedTopicsLock sync.RWMutex + subnetsLock map[uint64]*sync.RWMutex + subnetsLockLock sync.Mutex // Lock access to subnetsLock + initializationLock sync.Mutex + dv5Listener ListenerRebooter + startupErr error + ctx context.Context + host host.Host + genesisTime time.Time + genesisValidatorsRoot []byte + activeValidatorCount uint64 + activeValidatorCountLock sync.Mutex + peerDisconnectionTime *cache.Cache + custodyInfo *custodyInfo + custodyInfoLock sync.RWMutex // Lock access to custodyInfo + custodyInfoSet chan struct{} + allForkDigests map[[4]byte]struct{} } type custodyInfo struct { diff --git a/changelog/pvl-active-val-count-lock.md b/changelog/pvl-active-val-count-lock.md new file mode 100644 index 0000000000..68a9b3b546 --- /dev/null +++ b/changelog/pvl-active-val-count-lock.md @@ -0,0 +1,4 @@ +### Fixed + +- Changed the behavior of topic subscriptions such that only topics that require the active validator count will compute that value. +- Added a Mutex to the computation of active validator count during topic subscription to avoid a race condition where multiple goroutines are computing the same work.