Compare commits

...

6 Commits

Author SHA1 Message Date
james-prysm
34857ce004 moving s.registeredNetworkEntry registration 2025-09-29 13:49:02 -05:00
james-prysm
c3027862b2 fixing naming 2025-09-29 11:58:31 -05:00
james-prysm
7e2b730a04 Merge branch 'develop' into fix-subscriber-race 2025-09-29 11:54:55 -05:00
james-prysm
0cded0c1d6 gaz 2025-09-29 11:54:37 -05:00
james-prysm
f08ae428b2 fixing test 2025-09-29 08:32:16 -05:00
james-prysm
94ef8d780d first attempt 2025-09-28 23:39:20 -05:00
4 changed files with 253 additions and 72 deletions

View File

@@ -13,7 +13,13 @@ import (
// it will be in charge of subscribing/unsubscribing the relevant topics at the fork boundaries.
func (s *Service) forkWatcher() {
<-s.initialSyncComplete
slotTicker := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
genesisTime := s.cfg.clock.GenesisTime()
currentEpoch := slots.ToEpoch(slots.CurrentSlot(genesisTime))
// Initialize registeredNetworkEntry to the current network schedule entry to avoid
// duplicate subscriber registration on the first forkWatcher tick when the next
// epoch has the same digest.
s.registeredNetworkEntry = params.GetNetworkScheduleEntry(currentEpoch)
slotTicker := slots.NewSlotTicker(genesisTime, params.BeaconConfig().SecondsPerSlot)
for {
select {
// In the event of a node restart, we will still end up subscribing to the correct

View File

@@ -85,12 +85,14 @@ type subnetTracker struct {
subscribeParameters
mu sync.RWMutex
subscriptions map[uint64]*pubsub.Subscription
trackedTopics map[uint64]string // Maps subnet to full topic string for unsubscribing
}
func newSubnetTracker(p subscribeParameters) *subnetTracker {
return &subnetTracker{
subscribeParameters: p,
subscriptions: make(map[uint64]*pubsub.Subscription),
trackedTopics: make(map[uint64]string),
}
}
@@ -117,31 +119,84 @@ func (t *subnetTracker) missing(wanted map[uint64]bool) []uint64 {
missing = append(missing, subnet)
}
}
if len(missing) > 0 {
log.WithFields(logrus.Fields{
"wanted": len(wanted),
"tracked": len(t.subscriptions),
"missing": missing,
"trackedSubnets": func() []uint64 {
var tracked []uint64
for k := range t.subscriptions {
tracked = append(tracked, k)
}
return tracked
}(),
}).Debug("Subnet tracker missing analysis")
}
return missing
}
// tryReserveSubnets atomically checks for missing subnets and reserves them for subscription.
// Returns a list of subnets that were successfully reserved by this call.
func (t *subnetTracker) tryReserveSubnets(wanted map[uint64]bool) []uint64 {
t.mu.Lock()
defer t.mu.Unlock()
var reserved []uint64
for subnet := range wanted {
if _, ok := t.subscriptions[subnet]; !ok {
// Reserve by marking as nil (pending subscription)
t.subscriptions[subnet] = nil
reserved = append(reserved, subnet)
}
}
return reserved
}
// cancelReservation removes a reservation for a subnet that failed to subscribe.
func (t *subnetTracker) cancelReservation(subnet uint64) {
t.mu.Lock()
defer t.mu.Unlock()
if sub, ok := t.subscriptions[subnet]; ok && sub == nil {
delete(t.subscriptions, subnet)
}
}
// cancelSubscription cancels and removes the subscription for a given subnet.
func (t *subnetTracker) cancelSubscription(subnet uint64) {
func (t *subnetTracker) cancelSubscription(subnet uint64) string {
t.mu.Lock()
defer t.mu.Unlock()
defer delete(t.subscriptions, subnet)
defer delete(t.trackedTopics, subnet)
topic := t.trackedTopics[subnet]
sub := t.subscriptions[subnet]
if sub == nil {
return
// Debug: Check for inconsistent state
if sub != nil && topic == "" {
log.WithField("subnet", subnet).Warn("Found subscription but no tracked topic")
} else if sub == nil && topic != "" {
log.WithField("subnet", subnet).WithField("topic", topic).Debug("Found tracked topic but no subscription object")
}
sub.Cancel()
if sub != nil {
sub.Cancel()
}
return topic
}
// track asks subscriptionTracker to hold on to the subscription for a given subnet so
// that we can remember that it is tracked and cancel its context when it's time to unsubscribe.
func (t *subnetTracker) track(subnet uint64, sub *pubsub.Subscription) {
if sub == nil {
return
}
func (t *subnetTracker) track(subnet uint64, sub *pubsub.Subscription, expectedTopic string) {
t.mu.Lock()
defer t.mu.Unlock()
t.subscriptions[subnet] = sub
// Use the topic from the subscription if available, otherwise use the expected one
// (when sub is nil, it means the subscription already existed)
if sub != nil {
t.trackedTopics[subnet] = sub.Topic()
} else {
t.trackedTopics[subnet] = expectedTopic
}
}
// noopValidator is a no-op that only decodes the message, but does not check its contents.
@@ -200,21 +255,27 @@ func (s *Service) spawn(f func()) {
// Register PubSub subscribers
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.spawn(func() {
s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, digest)
})
s.spawn(func() {
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, digest)
})
s.spawn(func() {
s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, digest)
})
s.spawn(func() {
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, digest)
})
s.spawn(func() {
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, digest)
})
// Idempotent fixed-topic subscriptions: skip if already active.
fixed := []struct {
topicFormat string
val wrappedVal
handle subHandler
}{
{p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber},
{p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber},
{p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber},
{p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber},
{p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber},
}
for _, f := range fixed {
full := s.addDigestToTopic(f.topicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
if s.subHandler.topicExists(full) {
continue
}
s.spawn(func(fmt string, val wrappedVal, handle subHandler) func() {
return func() { s.subscribe(fmt, val, handle, digest) }
}(f.topicFormat, f.val, f.handle))
}
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.AttestationSubnetTopicFormat,
@@ -228,14 +289,20 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
// New gossip topic in Altair
if params.BeaconConfig().AltairForkEpoch <= epoch {
s.spawn(func() {
s.subscribe(
p2p.SyncContributionAndProofSubnetTopicFormat,
s.validateSyncContributionAndProof,
s.syncContributionAndProofSubscriber,
digest,
)
})
// SyncContributionAndProof fixed topic
{
full := s.addDigestToTopic(p2p.SyncContributionAndProofSubnetTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
if !s.subHandler.topicExists(full) {
s.spawn(func() {
s.subscribe(
p2p.SyncContributionAndProofSubnetTopicFormat,
s.validateSyncContributionAndProof,
s.syncContributionAndProofSubscriber,
digest,
)
})
}
}
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
@@ -247,35 +314,49 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
})
if features.Get().EnableLightClient {
s.spawn(func() {
s.subscribe(
p2p.LightClientOptimisticUpdateTopicFormat,
s.validateLightClientOptimisticUpdate,
s.lightClientOptimisticUpdateSubscriber,
digest,
)
})
s.spawn(func() {
s.subscribe(
p2p.LightClientFinalityUpdateTopicFormat,
s.validateLightClientFinalityUpdate,
s.lightClientFinalityUpdateSubscriber,
digest,
)
})
// Light client fixed topics
{
full := s.addDigestToTopic(p2p.LightClientOptimisticUpdateTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
if !s.subHandler.topicExists(full) {
s.spawn(func() {
s.subscribe(
p2p.LightClientOptimisticUpdateTopicFormat,
s.validateLightClientOptimisticUpdate,
s.lightClientOptimisticUpdateSubscriber,
digest,
)
})
}
}
{
full := s.addDigestToTopic(p2p.LightClientFinalityUpdateTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
if !s.subHandler.topicExists(full) {
s.spawn(func() {
s.subscribe(
p2p.LightClientFinalityUpdateTopicFormat,
s.validateLightClientFinalityUpdate,
s.lightClientFinalityUpdateSubscriber,
digest,
)
})
}
}
}
}
// New gossip topic in Capella
if params.BeaconConfig().CapellaForkEpoch <= epoch {
s.spawn(func() {
s.subscribe(
p2p.BlsToExecutionChangeSubnetTopicFormat,
s.validateBlsToExecutionChange,
s.blsToExecutionChangeSubscriber,
digest,
)
})
full := s.addDigestToTopic(p2p.BlsToExecutionChangeSubnetTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
if !s.subHandler.topicExists(full) {
s.spawn(func() {
s.subscribe(
p2p.BlsToExecutionChangeSubnetTopicFormat,
s.validateBlsToExecutionChange,
s.blsToExecutionChangeSubscriber,
digest,
)
})
}
}
// New gossip topic in Deneb, removed in Electra
@@ -337,22 +418,32 @@ func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandle
// Impossible condition as it would mean topic does not exist.
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition.
}
s.subscribeWithBase(s.addDigestToTopic(topic, digest), validator, handle)
fullTopic := s.addDigestToTopic(topic, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
_ = s.subscribeWithBase(fullTopic, validator, handle)
}
// subscribeWithBase subscribes to a topic that already includes the protocol suffix.
func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
topic += s.cfg.p2p.Encoding().ProtocolSuffix()
log := log.WithField("topic", topic)
// Do not resubscribe already seen subscriptions.
ok := s.subHandler.topicExists(topic)
if ok {
// Fast-path: avoid attempting if already actively subscribed
if s.subHandler.topicExists(topic) {
log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
return nil
}
// Atomically check if topic exists and reserve it for subscription if it doesn't
if !s.subHandler.tryReserveTopic(topic) {
// Another goroutine may have just subscribed or reserved it; report and skip
log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
return nil
}
// Register validator after successfully reserving the topic
if err := s.cfg.p2p.PubSub().RegisterTopicValidator(s.wrapAndReportValidation(topic, validator)); err != nil {
log.WithError(err).Error("Could not register validator for topic")
// Cancel the reservation since validator registration failed
s.subHandler.cancelReservation(topic)
return nil
}
@@ -362,6 +453,8 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
// libp2p PubSub library or a subscription request to a topic that fails to match the topic
// subscription filter.
log.WithError(err).Error("Could not subscribe topic")
// Cancel the reservation since subscription failed
s.subHandler.cancelReservation(topic)
return nil
}
@@ -504,8 +597,8 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
// This function mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions.
func (s *Service) pruneSubscriptions(t *subnetTracker, wantedSubnets map[uint64]bool) {
for _, subnet := range t.unwanted(wantedSubnets) {
t.cancelSubscription(subnet)
s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix()))
topic := t.cancelSubscription(subnet)
s.unSubscribeFromTopic(topic)
}
}
@@ -530,10 +623,50 @@ func (s *Service) subscribeToSubnets(t *subnetTracker) error {
subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot())
s.pruneSubscriptions(t, subnetsToJoin)
for _, subnet := range t.missing(subnetsToJoin) {
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
topic := t.fullTopic(subnet, "")
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
// Filter out subnets already globally subscribed and mark them tracked to prevent re-reserve.
// Work on a copy to avoid mutating the original map while iterating later code paths.
wanted := make(map[uint64]bool, len(subnetsToJoin))
for subnet := range subnetsToJoin {
full := t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())
if s.subHandler.topicExists(full) {
t.track(subnet, nil, full)
continue
}
wanted[subnet] = true
}
// Atomically reserve subnets that still need subscription
reservedSubnets := t.tryReserveSubnets(wanted)
if len(reservedSubnets) > 0 {
log.WithFields(logrus.Fields{
"reservedSubnets": reservedSubnets,
"totalWanted": len(subnetsToJoin),
}).Debug("Reserved missing subnets for subscription")
}
for _, subnet := range reservedSubnets {
fullTopic := t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())
// Skip if already subscribed globally (e.g., another goroutine won the race).
// Mark as tracked so we don't repeatedly reserve and skip on subsequent ticks.
if s.subHandler.topicExists(fullTopic) {
t.track(subnet, nil, fullTopic)
continue
}
log.WithFields(logrus.Fields{
"subnet": subnet,
"topic": fullTopic,
}).Debug("Attempting to subscribe to reserved subnet")
sub := s.subscribeWithBase(fullTopic, t.validate, t.handle)
if sub != nil {
t.track(subnet, sub, fullTopic)
} else {
// Subscription failed, cancel the reservation
t.cancelReservation(subnet)
log.WithFields(logrus.Fields{
"subnet": subnet,
"topic": fullTopic,
}).Debug("Subscription failed, canceled reservation")
}
}
return nil

View File

@@ -317,7 +317,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
require.NoError(t, r.cfg.p2p.PubSub().RegisterTopicValidator(fullTopic, topVal))
sub1, err := r.cfg.p2p.SubscribeToTopic(fullTopic)
require.NoError(t, err)
tracker.track(c1, sub1)
tracker.track(c1, sub1, fullTopic)
// committee index 2
c2 := uint64(2)
@@ -327,7 +327,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
require.NoError(t, err)
sub2, err := r.cfg.p2p.SubscribeToTopic(fullTopic)
require.NoError(t, err)
tracker.track(c2, sub2)
tracker.track(c2, sub2, fullTopic)
r.pruneSubscriptions(tracker, map[uint64]bool{c2: true})
require.LogsDoNotContain(t, hook, "Could not unregister topic validator")

View File

@@ -26,20 +26,31 @@ func newSubTopicHandler() *subTopicHandler {
func (s *subTopicHandler) addTopic(topic string, sub *pubsub.Subscription) {
s.Lock()
defer s.Unlock()
// Check if this topic was previously reserved (nil value means reserved)
prevSub, wasReserved := s.subTopics[topic]
s.subTopics[topic] = sub
digest, err := p2p.ExtractGossipDigest(topic)
if err != nil {
log.WithError(err).Error("Could not retrieve digest")
return
}
s.digestMap[digest] += 1
// Only increment digest count if this is truly a new subscription:
// - Topic didn't exist before (!wasReserved)
// - Topic was reserved (nil) and now getting a real subscription
if !wasReserved || prevSub == nil {
s.digestMap[digest] += 1
}
}
func (s *subTopicHandler) topicExists(topic string) bool {
if s == nil {
return false
}
s.RLock()
defer s.RUnlock()
_, ok := s.subTopics[topic]
return ok
sub, ok := s.subTopics[topic]
// Topic exists if it's in the map and has a real subscription (not just reserved)
return ok && sub != nil
}
func (s *subTopicHandler) removeTopic(topic string) {
@@ -88,3 +99,34 @@ func (s *subTopicHandler) subForTopic(topic string) *pubsub.Subscription {
defer s.RUnlock()
return s.subTopics[topic]
}
// tryReserveTopic atomically checks if a topic has an active subscription or reservation and reserves it if not.
// Returns true if the topic was successfully reserved, false if it already has an active subscription or reservation.
// If true is returned, the caller is responsible for calling addTopic with the actual subscription.
func (s *subTopicHandler) tryReserveTopic(topic string) bool {
if s == nil {
return false
}
s.Lock()
defer s.Unlock()
_, exists := s.subTopics[topic]
// Reject if topic already exists (either reserved with nil or has real subscription)
if exists {
return false
}
// Reserve the topic by adding a nil entry to prevent other goroutines from registering
s.subTopics[topic] = nil
return true
}
// cancelReservation removes a topic reservation if the subscription failed.
func (s *subTopicHandler) cancelReservation(topic string) {
if s == nil {
return
}
s.Lock()
defer s.Unlock()
if sub, exists := s.subTopics[topic]; exists && sub == nil {
delete(s.subTopics, topic)
}
}