Compare commits

...

8 Commits

Author SHA1 Message Date
james-prysm
34857ce004 moving s.registeredNetworkEntry registration 2025-09-29 13:49:02 -05:00
james-prysm
c3027862b2 fixing naming 2025-09-29 11:58:31 -05:00
james-prysm
7e2b730a04 Merge branch 'develop' into fix-subscriber-race 2025-09-29 11:54:55 -05:00
james-prysm
0cded0c1d6 gaz 2025-09-29 11:54:37 -05:00
james-prysm
6139d58fa5 fixing config string parsing regression (#15773)
* adding string parsing and test

* gaz
2025-09-29 16:15:01 +00:00
james-prysm
f08ae428b2 fixing test 2025-09-29 08:32:16 -05:00
james-prysm
94ef8d780d first attempt 2025-09-28 23:39:20 -05:00
Sahil Sojitra
0ea5e2cf9d refactor to use reflect.TypeFor (#15627)
* refactor to use reflect.TypeFor

* added changelog fragment file

* update changelog

---------

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
2025-09-26 17:26:44 +00:00
10 changed files with 301 additions and 73 deletions

View File

@@ -27,5 +27,7 @@ go_test(
"//testing/require:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

View File

@@ -132,6 +132,10 @@ func convertValueForJSON(v reflect.Value, tag string) interface{} {
}
return m
// ===== String =====
case reflect.String:
return v.String()
// ===== Default =====
default:
log.WithFields(log.Fields{

View File

@@ -8,6 +8,7 @@ import (
"math"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"github.com/OffchainLabs/prysm/v6/api/server/structs"
@@ -17,6 +18,8 @@ import (
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
log "github.com/sirupsen/logrus"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestGetDepositContract(t *testing.T) {
@@ -715,3 +718,35 @@ func TestGetSpec_BlobSchedule_NotFulu(t *testing.T) {
_, exists := data["BLOB_SCHEDULE"]
require.Equal(t, false, exists)
}
func TestConvertValueForJSON_NoErrorLogsForStrings(t *testing.T) {
logHook := logTest.NewLocal(log.StandardLogger())
defer logHook.Reset()
stringTestCases := []struct {
tag string
value string
}{
{"CONFIG_NAME", "mainnet"},
{"PRESET_BASE", "mainnet"},
{"DEPOSIT_CONTRACT_ADDRESS", "0x00000000219ab540356cBB839Cbe05303d7705Fa"},
{"TERMINAL_TOTAL_DIFFICULTY", "58750000000000000000000"},
}
for _, tc := range stringTestCases {
t.Run(tc.tag, func(t *testing.T) {
logHook.Reset()
// Convert the string value
v := reflect.ValueOf(tc.value)
result := convertValueForJSON(v, tc.tag)
// Verify the result is correct
require.Equal(t, tc.value, result)
// Verify NO error was logged about unsupported field kind
require.LogsDoNotContain(t, logHook, "Unsupported config field kind")
require.LogsDoNotContain(t, logHook, "kind=string")
})
}
}

View File

@@ -13,7 +13,13 @@ import (
// it will be in charge of subscribing/unsubscribing the relevant topics at the fork boundaries.
func (s *Service) forkWatcher() {
<-s.initialSyncComplete
slotTicker := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
genesisTime := s.cfg.clock.GenesisTime()
currentEpoch := slots.ToEpoch(slots.CurrentSlot(genesisTime))
// Initialize registeredNetworkEntry to the current network schedule entry to avoid
// duplicate subscriber registration on the first forkWatcher tick when the next
// epoch has the same digest.
s.registeredNetworkEntry = params.GetNetworkScheduleEntry(currentEpoch)
slotTicker := slots.NewSlotTicker(genesisTime, params.BeaconConfig().SecondsPerSlot)
for {
select {
// In the event of a node restart, we will still end up subscribing to the correct

View File

@@ -85,12 +85,14 @@ type subnetTracker struct {
subscribeParameters
mu sync.RWMutex
subscriptions map[uint64]*pubsub.Subscription
trackedTopics map[uint64]string // Maps subnet to full topic string for unsubscribing
}
func newSubnetTracker(p subscribeParameters) *subnetTracker {
return &subnetTracker{
subscribeParameters: p,
subscriptions: make(map[uint64]*pubsub.Subscription),
trackedTopics: make(map[uint64]string),
}
}
@@ -117,31 +119,84 @@ func (t *subnetTracker) missing(wanted map[uint64]bool) []uint64 {
missing = append(missing, subnet)
}
}
if len(missing) > 0 {
log.WithFields(logrus.Fields{
"wanted": len(wanted),
"tracked": len(t.subscriptions),
"missing": missing,
"trackedSubnets": func() []uint64 {
var tracked []uint64
for k := range t.subscriptions {
tracked = append(tracked, k)
}
return tracked
}(),
}).Debug("Subnet tracker missing analysis")
}
return missing
}
// tryReserveSubnets atomically checks for missing subnets and reserves them for subscription.
// Returns a list of subnets that were successfully reserved by this call.
func (t *subnetTracker) tryReserveSubnets(wanted map[uint64]bool) []uint64 {
t.mu.Lock()
defer t.mu.Unlock()
var reserved []uint64
for subnet := range wanted {
if _, ok := t.subscriptions[subnet]; !ok {
// Reserve by marking as nil (pending subscription)
t.subscriptions[subnet] = nil
reserved = append(reserved, subnet)
}
}
return reserved
}
// cancelReservation removes a reservation for a subnet that failed to subscribe.
func (t *subnetTracker) cancelReservation(subnet uint64) {
t.mu.Lock()
defer t.mu.Unlock()
if sub, ok := t.subscriptions[subnet]; ok && sub == nil {
delete(t.subscriptions, subnet)
}
}
// cancelSubscription cancels and removes the subscription for a given subnet.
func (t *subnetTracker) cancelSubscription(subnet uint64) {
func (t *subnetTracker) cancelSubscription(subnet uint64) string {
t.mu.Lock()
defer t.mu.Unlock()
defer delete(t.subscriptions, subnet)
defer delete(t.trackedTopics, subnet)
topic := t.trackedTopics[subnet]
sub := t.subscriptions[subnet]
if sub == nil {
return
// Debug: Check for inconsistent state
if sub != nil && topic == "" {
log.WithField("subnet", subnet).Warn("Found subscription but no tracked topic")
} else if sub == nil && topic != "" {
log.WithField("subnet", subnet).WithField("topic", topic).Debug("Found tracked topic but no subscription object")
}
sub.Cancel()
if sub != nil {
sub.Cancel()
}
return topic
}
// track asks subscriptionTracker to hold on to the subscription for a given subnet so
// that we can remember that it is tracked and cancel its context when it's time to unsubscribe.
func (t *subnetTracker) track(subnet uint64, sub *pubsub.Subscription) {
if sub == nil {
return
}
func (t *subnetTracker) track(subnet uint64, sub *pubsub.Subscription, expectedTopic string) {
t.mu.Lock()
defer t.mu.Unlock()
t.subscriptions[subnet] = sub
// Use the topic from the subscription if available, otherwise use the expected one
// (when sub is nil, it means the subscription already existed)
if sub != nil {
t.trackedTopics[subnet] = sub.Topic()
} else {
t.trackedTopics[subnet] = expectedTopic
}
}
// noopValidator is a no-op that only decodes the message, but does not check its contents.
@@ -200,21 +255,27 @@ func (s *Service) spawn(f func()) {
// Register PubSub subscribers
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.spawn(func() {
s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, digest)
})
s.spawn(func() {
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, digest)
})
s.spawn(func() {
s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, digest)
})
s.spawn(func() {
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, digest)
})
s.spawn(func() {
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, digest)
})
// Idempotent fixed-topic subscriptions: skip if already active.
fixed := []struct {
topicFormat string
val wrappedVal
handle subHandler
}{
{p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber},
{p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber},
{p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber},
{p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber},
{p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber},
}
for _, f := range fixed {
full := s.addDigestToTopic(f.topicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
if s.subHandler.topicExists(full) {
continue
}
s.spawn(func(fmt string, val wrappedVal, handle subHandler) func() {
return func() { s.subscribe(fmt, val, handle, digest) }
}(f.topicFormat, f.val, f.handle))
}
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.AttestationSubnetTopicFormat,
@@ -228,14 +289,20 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
// New gossip topic in Altair
if params.BeaconConfig().AltairForkEpoch <= epoch {
s.spawn(func() {
s.subscribe(
p2p.SyncContributionAndProofSubnetTopicFormat,
s.validateSyncContributionAndProof,
s.syncContributionAndProofSubscriber,
digest,
)
})
// SyncContributionAndProof fixed topic
{
full := s.addDigestToTopic(p2p.SyncContributionAndProofSubnetTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
if !s.subHandler.topicExists(full) {
s.spawn(func() {
s.subscribe(
p2p.SyncContributionAndProofSubnetTopicFormat,
s.validateSyncContributionAndProof,
s.syncContributionAndProofSubscriber,
digest,
)
})
}
}
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
@@ -247,35 +314,49 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
})
if features.Get().EnableLightClient {
s.spawn(func() {
s.subscribe(
p2p.LightClientOptimisticUpdateTopicFormat,
s.validateLightClientOptimisticUpdate,
s.lightClientOptimisticUpdateSubscriber,
digest,
)
})
s.spawn(func() {
s.subscribe(
p2p.LightClientFinalityUpdateTopicFormat,
s.validateLightClientFinalityUpdate,
s.lightClientFinalityUpdateSubscriber,
digest,
)
})
// Light client fixed topics
{
full := s.addDigestToTopic(p2p.LightClientOptimisticUpdateTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
if !s.subHandler.topicExists(full) {
s.spawn(func() {
s.subscribe(
p2p.LightClientOptimisticUpdateTopicFormat,
s.validateLightClientOptimisticUpdate,
s.lightClientOptimisticUpdateSubscriber,
digest,
)
})
}
}
{
full := s.addDigestToTopic(p2p.LightClientFinalityUpdateTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
if !s.subHandler.topicExists(full) {
s.spawn(func() {
s.subscribe(
p2p.LightClientFinalityUpdateTopicFormat,
s.validateLightClientFinalityUpdate,
s.lightClientFinalityUpdateSubscriber,
digest,
)
})
}
}
}
}
// New gossip topic in Capella
if params.BeaconConfig().CapellaForkEpoch <= epoch {
s.spawn(func() {
s.subscribe(
p2p.BlsToExecutionChangeSubnetTopicFormat,
s.validateBlsToExecutionChange,
s.blsToExecutionChangeSubscriber,
digest,
)
})
full := s.addDigestToTopic(p2p.BlsToExecutionChangeSubnetTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
if !s.subHandler.topicExists(full) {
s.spawn(func() {
s.subscribe(
p2p.BlsToExecutionChangeSubnetTopicFormat,
s.validateBlsToExecutionChange,
s.blsToExecutionChangeSubscriber,
digest,
)
})
}
}
// New gossip topic in Deneb, removed in Electra
@@ -337,22 +418,32 @@ func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandle
// Impossible condition as it would mean topic does not exist.
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition.
}
s.subscribeWithBase(s.addDigestToTopic(topic, digest), validator, handle)
fullTopic := s.addDigestToTopic(topic, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
_ = s.subscribeWithBase(fullTopic, validator, handle)
}
// subscribeWithBase subscribes to a topic that already includes the protocol suffix.
func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
topic += s.cfg.p2p.Encoding().ProtocolSuffix()
log := log.WithField("topic", topic)
// Do not resubscribe already seen subscriptions.
ok := s.subHandler.topicExists(topic)
if ok {
// Fast-path: avoid attempting if already actively subscribed
if s.subHandler.topicExists(topic) {
log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
return nil
}
// Atomically check if topic exists and reserve it for subscription if it doesn't
if !s.subHandler.tryReserveTopic(topic) {
// Another goroutine may have just subscribed or reserved it; report and skip
log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
return nil
}
// Register validator after successfully reserving the topic
if err := s.cfg.p2p.PubSub().RegisterTopicValidator(s.wrapAndReportValidation(topic, validator)); err != nil {
log.WithError(err).Error("Could not register validator for topic")
// Cancel the reservation since validator registration failed
s.subHandler.cancelReservation(topic)
return nil
}
@@ -362,6 +453,8 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
// libp2p PubSub library or a subscription request to a topic that fails to match the topic
// subscription filter.
log.WithError(err).Error("Could not subscribe topic")
// Cancel the reservation since subscription failed
s.subHandler.cancelReservation(topic)
return nil
}
@@ -504,8 +597,8 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
// This function mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions.
func (s *Service) pruneSubscriptions(t *subnetTracker, wantedSubnets map[uint64]bool) {
for _, subnet := range t.unwanted(wantedSubnets) {
t.cancelSubscription(subnet)
s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix()))
topic := t.cancelSubscription(subnet)
s.unSubscribeFromTopic(topic)
}
}
@@ -530,10 +623,50 @@ func (s *Service) subscribeToSubnets(t *subnetTracker) error {
subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot())
s.pruneSubscriptions(t, subnetsToJoin)
for _, subnet := range t.missing(subnetsToJoin) {
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
topic := t.fullTopic(subnet, "")
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
// Filter out subnets already globally subscribed and mark them tracked to prevent re-reserve.
// Work on a copy to avoid mutating the original map while iterating later code paths.
wanted := make(map[uint64]bool, len(subnetsToJoin))
for subnet := range subnetsToJoin {
full := t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())
if s.subHandler.topicExists(full) {
t.track(subnet, nil, full)
continue
}
wanted[subnet] = true
}
// Atomically reserve subnets that still need subscription
reservedSubnets := t.tryReserveSubnets(wanted)
if len(reservedSubnets) > 0 {
log.WithFields(logrus.Fields{
"reservedSubnets": reservedSubnets,
"totalWanted": len(subnetsToJoin),
}).Debug("Reserved missing subnets for subscription")
}
for _, subnet := range reservedSubnets {
fullTopic := t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())
// Skip if already subscribed globally (e.g., another goroutine won the race).
// Mark as tracked so we don't repeatedly reserve and skip on subsequent ticks.
if s.subHandler.topicExists(fullTopic) {
t.track(subnet, nil, fullTopic)
continue
}
log.WithFields(logrus.Fields{
"subnet": subnet,
"topic": fullTopic,
}).Debug("Attempting to subscribe to reserved subnet")
sub := s.subscribeWithBase(fullTopic, t.validate, t.handle)
if sub != nil {
t.track(subnet, sub, fullTopic)
} else {
// Subscription failed, cancel the reservation
t.cancelReservation(subnet)
log.WithFields(logrus.Fields{
"subnet": subnet,
"topic": fullTopic,
}).Debug("Subscription failed, canceled reservation")
}
}
return nil

View File

@@ -317,7 +317,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
require.NoError(t, r.cfg.p2p.PubSub().RegisterTopicValidator(fullTopic, topVal))
sub1, err := r.cfg.p2p.SubscribeToTopic(fullTopic)
require.NoError(t, err)
tracker.track(c1, sub1)
tracker.track(c1, sub1, fullTopic)
// committee index 2
c2 := uint64(2)
@@ -327,7 +327,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
require.NoError(t, err)
sub2, err := r.cfg.p2p.SubscribeToTopic(fullTopic)
require.NoError(t, err)
tracker.track(c2, sub2)
tracker.track(c2, sub2, fullTopic)
r.pruneSubscriptions(tracker, map[uint64]bool{c2: true})
require.LogsDoNotContain(t, hook, "Could not unregister topic validator")

View File

@@ -26,20 +26,31 @@ func newSubTopicHandler() *subTopicHandler {
func (s *subTopicHandler) addTopic(topic string, sub *pubsub.Subscription) {
s.Lock()
defer s.Unlock()
// Check if this topic was previously reserved (nil value means reserved)
prevSub, wasReserved := s.subTopics[topic]
s.subTopics[topic] = sub
digest, err := p2p.ExtractGossipDigest(topic)
if err != nil {
log.WithError(err).Error("Could not retrieve digest")
return
}
s.digestMap[digest] += 1
// Only increment digest count if this is truly a new subscription:
// - Topic didn't exist before (!wasReserved)
// - Topic was reserved (nil) and now getting a real subscription
if !wasReserved || prevSub == nil {
s.digestMap[digest] += 1
}
}
func (s *subTopicHandler) topicExists(topic string) bool {
if s == nil {
return false
}
s.RLock()
defer s.RUnlock()
_, ok := s.subTopics[topic]
return ok
sub, ok := s.subTopics[topic]
// Topic exists if it's in the map and has a real subscription (not just reserved)
return ok && sub != nil
}
func (s *subTopicHandler) removeTopic(topic string) {
@@ -88,3 +99,34 @@ func (s *subTopicHandler) subForTopic(topic string) *pubsub.Subscription {
defer s.RUnlock()
return s.subTopics[topic]
}
// tryReserveTopic atomically checks if a topic has an active subscription or reservation and reserves it if not.
// Returns true if the topic was successfully reserved, false if it already has an active subscription or reservation.
// If true is returned, the caller is responsible for calling addTopic with the actual subscription.
func (s *subTopicHandler) tryReserveTopic(topic string) bool {
if s == nil {
return false
}
s.Lock()
defer s.Unlock()
_, exists := s.subTopics[topic]
// Reject if topic already exists (either reserved with nil or has real subscription)
if exists {
return false
}
// Reserve the topic by adding a nil entry to prevent other goroutines from registering
s.subTopics[topic] = nil
return true
}
// cancelReservation removes a topic reservation if the subscription failed.
func (s *subTopicHandler) cancelReservation(topic string) {
if s == nil {
return
}
s.Lock()
defer s.Unlock()
if sub, exists := s.subTopics[topic]; exists && sub == nil {
delete(s.subTopics, topic)
}
}

View File

@@ -0,0 +1,3 @@
### Fixed
- Fixing Unsupported config field kind; value forwarded verbatim errors for type string.

View File

@@ -0,0 +1,3 @@
### Changed
- Replaced reflect.TypeOf with reflect.TypeFor

View File

@@ -320,6 +320,6 @@ func IsProto(item interface{}) bool {
return ok
}
elemTyp := typ.Elem()
modelType := reflect.TypeOf((*proto.Message)(nil)).Elem()
modelType := reflect.TypeFor[proto.Message]()
return elemTyp.Implements(modelType)
}