mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 05:47:59 -05:00
Compare commits
8 Commits
go-1.24.7
...
fix-subscr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
34857ce004 | ||
|
|
c3027862b2 | ||
|
|
7e2b730a04 | ||
|
|
0cded0c1d6 | ||
|
|
6139d58fa5 | ||
|
|
f08ae428b2 | ||
|
|
94ef8d780d | ||
|
|
0ea5e2cf9d |
@@ -27,5 +27,7 @@ go_test(
|
||||
"//testing/require:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -132,6 +132,10 @@ func convertValueForJSON(v reflect.Value, tag string) interface{} {
|
||||
}
|
||||
return m
|
||||
|
||||
// ===== String =====
|
||||
case reflect.String:
|
||||
return v.String()
|
||||
|
||||
// ===== Default =====
|
||||
default:
|
||||
log.WithFields(log.Fields{
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"math"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/api/server/structs"
|
||||
@@ -17,6 +18,8 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
log "github.com/sirupsen/logrus"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
)
|
||||
|
||||
func TestGetDepositContract(t *testing.T) {
|
||||
@@ -715,3 +718,35 @@ func TestGetSpec_BlobSchedule_NotFulu(t *testing.T) {
|
||||
_, exists := data["BLOB_SCHEDULE"]
|
||||
require.Equal(t, false, exists)
|
||||
}
|
||||
|
||||
func TestConvertValueForJSON_NoErrorLogsForStrings(t *testing.T) {
|
||||
logHook := logTest.NewLocal(log.StandardLogger())
|
||||
defer logHook.Reset()
|
||||
|
||||
stringTestCases := []struct {
|
||||
tag string
|
||||
value string
|
||||
}{
|
||||
{"CONFIG_NAME", "mainnet"},
|
||||
{"PRESET_BASE", "mainnet"},
|
||||
{"DEPOSIT_CONTRACT_ADDRESS", "0x00000000219ab540356cBB839Cbe05303d7705Fa"},
|
||||
{"TERMINAL_TOTAL_DIFFICULTY", "58750000000000000000000"},
|
||||
}
|
||||
|
||||
for _, tc := range stringTestCases {
|
||||
t.Run(tc.tag, func(t *testing.T) {
|
||||
logHook.Reset()
|
||||
|
||||
// Convert the string value
|
||||
v := reflect.ValueOf(tc.value)
|
||||
result := convertValueForJSON(v, tc.tag)
|
||||
|
||||
// Verify the result is correct
|
||||
require.Equal(t, tc.value, result)
|
||||
|
||||
// Verify NO error was logged about unsupported field kind
|
||||
require.LogsDoNotContain(t, logHook, "Unsupported config field kind")
|
||||
require.LogsDoNotContain(t, logHook, "kind=string")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,13 @@ import (
|
||||
// it will be in charge of subscribing/unsubscribing the relevant topics at the fork boundaries.
|
||||
func (s *Service) forkWatcher() {
|
||||
<-s.initialSyncComplete
|
||||
slotTicker := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
|
||||
genesisTime := s.cfg.clock.GenesisTime()
|
||||
currentEpoch := slots.ToEpoch(slots.CurrentSlot(genesisTime))
|
||||
// Initialize registeredNetworkEntry to the current network schedule entry to avoid
|
||||
// duplicate subscriber registration on the first forkWatcher tick when the next
|
||||
// epoch has the same digest.
|
||||
s.registeredNetworkEntry = params.GetNetworkScheduleEntry(currentEpoch)
|
||||
slotTicker := slots.NewSlotTicker(genesisTime, params.BeaconConfig().SecondsPerSlot)
|
||||
for {
|
||||
select {
|
||||
// In the event of a node restart, we will still end up subscribing to the correct
|
||||
|
||||
@@ -85,12 +85,14 @@ type subnetTracker struct {
|
||||
subscribeParameters
|
||||
mu sync.RWMutex
|
||||
subscriptions map[uint64]*pubsub.Subscription
|
||||
trackedTopics map[uint64]string // Maps subnet to full topic string for unsubscribing
|
||||
}
|
||||
|
||||
func newSubnetTracker(p subscribeParameters) *subnetTracker {
|
||||
return &subnetTracker{
|
||||
subscribeParameters: p,
|
||||
subscriptions: make(map[uint64]*pubsub.Subscription),
|
||||
trackedTopics: make(map[uint64]string),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,31 +119,84 @@ func (t *subnetTracker) missing(wanted map[uint64]bool) []uint64 {
|
||||
missing = append(missing, subnet)
|
||||
}
|
||||
}
|
||||
if len(missing) > 0 {
|
||||
log.WithFields(logrus.Fields{
|
||||
"wanted": len(wanted),
|
||||
"tracked": len(t.subscriptions),
|
||||
"missing": missing,
|
||||
"trackedSubnets": func() []uint64 {
|
||||
var tracked []uint64
|
||||
for k := range t.subscriptions {
|
||||
tracked = append(tracked, k)
|
||||
}
|
||||
return tracked
|
||||
}(),
|
||||
}).Debug("Subnet tracker missing analysis")
|
||||
}
|
||||
return missing
|
||||
}
|
||||
|
||||
// tryReserveSubnets atomically checks for missing subnets and reserves them for subscription.
|
||||
// Returns a list of subnets that were successfully reserved by this call.
|
||||
func (t *subnetTracker) tryReserveSubnets(wanted map[uint64]bool) []uint64 {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
var reserved []uint64
|
||||
for subnet := range wanted {
|
||||
if _, ok := t.subscriptions[subnet]; !ok {
|
||||
// Reserve by marking as nil (pending subscription)
|
||||
t.subscriptions[subnet] = nil
|
||||
reserved = append(reserved, subnet)
|
||||
}
|
||||
}
|
||||
return reserved
|
||||
}
|
||||
|
||||
// cancelReservation removes a reservation for a subnet that failed to subscribe.
|
||||
func (t *subnetTracker) cancelReservation(subnet uint64) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if sub, ok := t.subscriptions[subnet]; ok && sub == nil {
|
||||
delete(t.subscriptions, subnet)
|
||||
}
|
||||
}
|
||||
|
||||
// cancelSubscription cancels and removes the subscription for a given subnet.
|
||||
func (t *subnetTracker) cancelSubscription(subnet uint64) {
|
||||
func (t *subnetTracker) cancelSubscription(subnet uint64) string {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
defer delete(t.subscriptions, subnet)
|
||||
defer delete(t.trackedTopics, subnet)
|
||||
|
||||
topic := t.trackedTopics[subnet]
|
||||
sub := t.subscriptions[subnet]
|
||||
if sub == nil {
|
||||
return
|
||||
|
||||
// Debug: Check for inconsistent state
|
||||
if sub != nil && topic == "" {
|
||||
log.WithField("subnet", subnet).Warn("Found subscription but no tracked topic")
|
||||
} else if sub == nil && topic != "" {
|
||||
log.WithField("subnet", subnet).WithField("topic", topic).Debug("Found tracked topic but no subscription object")
|
||||
}
|
||||
sub.Cancel()
|
||||
|
||||
if sub != nil {
|
||||
sub.Cancel()
|
||||
}
|
||||
return topic
|
||||
}
|
||||
|
||||
// track asks subscriptionTracker to hold on to the subscription for a given subnet so
|
||||
// that we can remember that it is tracked and cancel its context when it's time to unsubscribe.
|
||||
func (t *subnetTracker) track(subnet uint64, sub *pubsub.Subscription) {
|
||||
if sub == nil {
|
||||
return
|
||||
}
|
||||
func (t *subnetTracker) track(subnet uint64, sub *pubsub.Subscription, expectedTopic string) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.subscriptions[subnet] = sub
|
||||
// Use the topic from the subscription if available, otherwise use the expected one
|
||||
// (when sub is nil, it means the subscription already existed)
|
||||
if sub != nil {
|
||||
t.trackedTopics[subnet] = sub.Topic()
|
||||
} else {
|
||||
t.trackedTopics[subnet] = expectedTopic
|
||||
}
|
||||
}
|
||||
|
||||
// noopValidator is a no-op that only decodes the message, but does not check its contents.
|
||||
@@ -200,21 +255,27 @@ func (s *Service) spawn(f func()) {
|
||||
|
||||
// Register PubSub subscribers
|
||||
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
||||
s.spawn(func() {
|
||||
s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, digest)
|
||||
})
|
||||
s.spawn(func() {
|
||||
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, digest)
|
||||
})
|
||||
s.spawn(func() {
|
||||
s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, digest)
|
||||
})
|
||||
s.spawn(func() {
|
||||
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, digest)
|
||||
})
|
||||
s.spawn(func() {
|
||||
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, digest)
|
||||
})
|
||||
// Idempotent fixed-topic subscriptions: skip if already active.
|
||||
fixed := []struct {
|
||||
topicFormat string
|
||||
val wrappedVal
|
||||
handle subHandler
|
||||
}{
|
||||
{p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber},
|
||||
{p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber},
|
||||
{p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber},
|
||||
{p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber},
|
||||
{p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber},
|
||||
}
|
||||
for _, f := range fixed {
|
||||
full := s.addDigestToTopic(f.topicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
if s.subHandler.topicExists(full) {
|
||||
continue
|
||||
}
|
||||
s.spawn(func(fmt string, val wrappedVal, handle subHandler) func() {
|
||||
return func() { s.subscribe(fmt, val, handle, digest) }
|
||||
}(f.topicFormat, f.val, f.handle))
|
||||
}
|
||||
s.spawn(func() {
|
||||
s.subscribeWithParameters(subscribeParameters{
|
||||
topicFormat: p2p.AttestationSubnetTopicFormat,
|
||||
@@ -228,14 +289,20 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
||||
|
||||
// New gossip topic in Altair
|
||||
if params.BeaconConfig().AltairForkEpoch <= epoch {
|
||||
s.spawn(func() {
|
||||
s.subscribe(
|
||||
p2p.SyncContributionAndProofSubnetTopicFormat,
|
||||
s.validateSyncContributionAndProof,
|
||||
s.syncContributionAndProofSubscriber,
|
||||
digest,
|
||||
)
|
||||
})
|
||||
// SyncContributionAndProof fixed topic
|
||||
{
|
||||
full := s.addDigestToTopic(p2p.SyncContributionAndProofSubnetTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
if !s.subHandler.topicExists(full) {
|
||||
s.spawn(func() {
|
||||
s.subscribe(
|
||||
p2p.SyncContributionAndProofSubnetTopicFormat,
|
||||
s.validateSyncContributionAndProof,
|
||||
s.syncContributionAndProofSubscriber,
|
||||
digest,
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
s.spawn(func() {
|
||||
s.subscribeWithParameters(subscribeParameters{
|
||||
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
|
||||
@@ -247,35 +314,49 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
||||
})
|
||||
|
||||
if features.Get().EnableLightClient {
|
||||
s.spawn(func() {
|
||||
s.subscribe(
|
||||
p2p.LightClientOptimisticUpdateTopicFormat,
|
||||
s.validateLightClientOptimisticUpdate,
|
||||
s.lightClientOptimisticUpdateSubscriber,
|
||||
digest,
|
||||
)
|
||||
})
|
||||
s.spawn(func() {
|
||||
s.subscribe(
|
||||
p2p.LightClientFinalityUpdateTopicFormat,
|
||||
s.validateLightClientFinalityUpdate,
|
||||
s.lightClientFinalityUpdateSubscriber,
|
||||
digest,
|
||||
)
|
||||
})
|
||||
// Light client fixed topics
|
||||
{
|
||||
full := s.addDigestToTopic(p2p.LightClientOptimisticUpdateTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
if !s.subHandler.topicExists(full) {
|
||||
s.spawn(func() {
|
||||
s.subscribe(
|
||||
p2p.LightClientOptimisticUpdateTopicFormat,
|
||||
s.validateLightClientOptimisticUpdate,
|
||||
s.lightClientOptimisticUpdateSubscriber,
|
||||
digest,
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
{
|
||||
full := s.addDigestToTopic(p2p.LightClientFinalityUpdateTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
if !s.subHandler.topicExists(full) {
|
||||
s.spawn(func() {
|
||||
s.subscribe(
|
||||
p2p.LightClientFinalityUpdateTopicFormat,
|
||||
s.validateLightClientFinalityUpdate,
|
||||
s.lightClientFinalityUpdateSubscriber,
|
||||
digest,
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// New gossip topic in Capella
|
||||
if params.BeaconConfig().CapellaForkEpoch <= epoch {
|
||||
s.spawn(func() {
|
||||
s.subscribe(
|
||||
p2p.BlsToExecutionChangeSubnetTopicFormat,
|
||||
s.validateBlsToExecutionChange,
|
||||
s.blsToExecutionChangeSubscriber,
|
||||
digest,
|
||||
)
|
||||
})
|
||||
full := s.addDigestToTopic(p2p.BlsToExecutionChangeSubnetTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
if !s.subHandler.topicExists(full) {
|
||||
s.spawn(func() {
|
||||
s.subscribe(
|
||||
p2p.BlsToExecutionChangeSubnetTopicFormat,
|
||||
s.validateBlsToExecutionChange,
|
||||
s.blsToExecutionChangeSubscriber,
|
||||
digest,
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// New gossip topic in Deneb, removed in Electra
|
||||
@@ -337,22 +418,32 @@ func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandle
|
||||
// Impossible condition as it would mean topic does not exist.
|
||||
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition.
|
||||
}
|
||||
s.subscribeWithBase(s.addDigestToTopic(topic, digest), validator, handle)
|
||||
fullTopic := s.addDigestToTopic(topic, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
_ = s.subscribeWithBase(fullTopic, validator, handle)
|
||||
}
|
||||
|
||||
// subscribeWithBase subscribes to a topic that already includes the protocol suffix.
|
||||
func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
|
||||
topic += s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
log := log.WithField("topic", topic)
|
||||
|
||||
// Do not resubscribe already seen subscriptions.
|
||||
ok := s.subHandler.topicExists(topic)
|
||||
if ok {
|
||||
// Fast-path: avoid attempting if already actively subscribed
|
||||
if s.subHandler.topicExists(topic) {
|
||||
log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Atomically check if topic exists and reserve it for subscription if it doesn't
|
||||
if !s.subHandler.tryReserveTopic(topic) {
|
||||
// Another goroutine may have just subscribed or reserved it; report and skip
|
||||
log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Register validator after successfully reserving the topic
|
||||
if err := s.cfg.p2p.PubSub().RegisterTopicValidator(s.wrapAndReportValidation(topic, validator)); err != nil {
|
||||
log.WithError(err).Error("Could not register validator for topic")
|
||||
// Cancel the reservation since validator registration failed
|
||||
s.subHandler.cancelReservation(topic)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -362,6 +453,8 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
|
||||
// libp2p PubSub library or a subscription request to a topic that fails to match the topic
|
||||
// subscription filter.
|
||||
log.WithError(err).Error("Could not subscribe topic")
|
||||
// Cancel the reservation since subscription failed
|
||||
s.subHandler.cancelReservation(topic)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -504,8 +597,8 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
|
||||
// This function mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions.
|
||||
func (s *Service) pruneSubscriptions(t *subnetTracker, wantedSubnets map[uint64]bool) {
|
||||
for _, subnet := range t.unwanted(wantedSubnets) {
|
||||
t.cancelSubscription(subnet)
|
||||
s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix()))
|
||||
topic := t.cancelSubscription(subnet)
|
||||
s.unSubscribeFromTopic(topic)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -530,10 +623,50 @@ func (s *Service) subscribeToSubnets(t *subnetTracker) error {
|
||||
|
||||
subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot())
|
||||
s.pruneSubscriptions(t, subnetsToJoin)
|
||||
for _, subnet := range t.missing(subnetsToJoin) {
|
||||
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
|
||||
topic := t.fullTopic(subnet, "")
|
||||
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
|
||||
|
||||
// Filter out subnets already globally subscribed and mark them tracked to prevent re-reserve.
|
||||
// Work on a copy to avoid mutating the original map while iterating later code paths.
|
||||
wanted := make(map[uint64]bool, len(subnetsToJoin))
|
||||
for subnet := range subnetsToJoin {
|
||||
full := t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())
|
||||
if s.subHandler.topicExists(full) {
|
||||
t.track(subnet, nil, full)
|
||||
continue
|
||||
}
|
||||
wanted[subnet] = true
|
||||
}
|
||||
|
||||
// Atomically reserve subnets that still need subscription
|
||||
reservedSubnets := t.tryReserveSubnets(wanted)
|
||||
if len(reservedSubnets) > 0 {
|
||||
log.WithFields(logrus.Fields{
|
||||
"reservedSubnets": reservedSubnets,
|
||||
"totalWanted": len(subnetsToJoin),
|
||||
}).Debug("Reserved missing subnets for subscription")
|
||||
}
|
||||
for _, subnet := range reservedSubnets {
|
||||
fullTopic := t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())
|
||||
// Skip if already subscribed globally (e.g., another goroutine won the race).
|
||||
// Mark as tracked so we don't repeatedly reserve and skip on subsequent ticks.
|
||||
if s.subHandler.topicExists(fullTopic) {
|
||||
t.track(subnet, nil, fullTopic)
|
||||
continue
|
||||
}
|
||||
log.WithFields(logrus.Fields{
|
||||
"subnet": subnet,
|
||||
"topic": fullTopic,
|
||||
}).Debug("Attempting to subscribe to reserved subnet")
|
||||
sub := s.subscribeWithBase(fullTopic, t.validate, t.handle)
|
||||
if sub != nil {
|
||||
t.track(subnet, sub, fullTopic)
|
||||
} else {
|
||||
// Subscription failed, cancel the reservation
|
||||
t.cancelReservation(subnet)
|
||||
log.WithFields(logrus.Fields{
|
||||
"subnet": subnet,
|
||||
"topic": fullTopic,
|
||||
}).Debug("Subscription failed, canceled reservation")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -317,7 +317,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
|
||||
require.NoError(t, r.cfg.p2p.PubSub().RegisterTopicValidator(fullTopic, topVal))
|
||||
sub1, err := r.cfg.p2p.SubscribeToTopic(fullTopic)
|
||||
require.NoError(t, err)
|
||||
tracker.track(c1, sub1)
|
||||
tracker.track(c1, sub1, fullTopic)
|
||||
|
||||
// committee index 2
|
||||
c2 := uint64(2)
|
||||
@@ -327,7 +327,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
sub2, err := r.cfg.p2p.SubscribeToTopic(fullTopic)
|
||||
require.NoError(t, err)
|
||||
tracker.track(c2, sub2)
|
||||
tracker.track(c2, sub2, fullTopic)
|
||||
|
||||
r.pruneSubscriptions(tracker, map[uint64]bool{c2: true})
|
||||
require.LogsDoNotContain(t, hook, "Could not unregister topic validator")
|
||||
|
||||
@@ -26,20 +26,31 @@ func newSubTopicHandler() *subTopicHandler {
|
||||
func (s *subTopicHandler) addTopic(topic string, sub *pubsub.Subscription) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
// Check if this topic was previously reserved (nil value means reserved)
|
||||
prevSub, wasReserved := s.subTopics[topic]
|
||||
s.subTopics[topic] = sub
|
||||
digest, err := p2p.ExtractGossipDigest(topic)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not retrieve digest")
|
||||
return
|
||||
}
|
||||
s.digestMap[digest] += 1
|
||||
// Only increment digest count if this is truly a new subscription:
|
||||
// - Topic didn't exist before (!wasReserved)
|
||||
// - Topic was reserved (nil) and now getting a real subscription
|
||||
if !wasReserved || prevSub == nil {
|
||||
s.digestMap[digest] += 1
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subTopicHandler) topicExists(topic string) bool {
|
||||
if s == nil {
|
||||
return false
|
||||
}
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
_, ok := s.subTopics[topic]
|
||||
return ok
|
||||
sub, ok := s.subTopics[topic]
|
||||
// Topic exists if it's in the map and has a real subscription (not just reserved)
|
||||
return ok && sub != nil
|
||||
}
|
||||
|
||||
func (s *subTopicHandler) removeTopic(topic string) {
|
||||
@@ -88,3 +99,34 @@ func (s *subTopicHandler) subForTopic(topic string) *pubsub.Subscription {
|
||||
defer s.RUnlock()
|
||||
return s.subTopics[topic]
|
||||
}
|
||||
|
||||
// tryReserveTopic atomically checks if a topic has an active subscription or reservation and reserves it if not.
|
||||
// Returns true if the topic was successfully reserved, false if it already has an active subscription or reservation.
|
||||
// If true is returned, the caller is responsible for calling addTopic with the actual subscription.
|
||||
func (s *subTopicHandler) tryReserveTopic(topic string) bool {
|
||||
if s == nil {
|
||||
return false
|
||||
}
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
_, exists := s.subTopics[topic]
|
||||
// Reject if topic already exists (either reserved with nil or has real subscription)
|
||||
if exists {
|
||||
return false
|
||||
}
|
||||
// Reserve the topic by adding a nil entry to prevent other goroutines from registering
|
||||
s.subTopics[topic] = nil
|
||||
return true
|
||||
}
|
||||
|
||||
// cancelReservation removes a topic reservation if the subscription failed.
|
||||
func (s *subTopicHandler) cancelReservation(topic string) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
if sub, exists := s.subTopics[topic]; exists && sub == nil {
|
||||
delete(s.subTopics, topic)
|
||||
}
|
||||
}
|
||||
|
||||
3
changelog/james-prysm_fix-config-parsing.md
Normal file
3
changelog/james-prysm_fix-config-parsing.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Fixed
|
||||
|
||||
- Fixing Unsupported config field kind; value forwarded verbatim errors for type string.
|
||||
3
changelog/sahil-4555-refactor-to-reflect-typefor.md
Normal file
3
changelog/sahil-4555-refactor-to-reflect-typefor.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Changed
|
||||
|
||||
- Replaced reflect.TypeOf with reflect.TypeFor
|
||||
@@ -320,6 +320,6 @@ func IsProto(item interface{}) bool {
|
||||
return ok
|
||||
}
|
||||
elemTyp := typ.Elem()
|
||||
modelType := reflect.TypeOf((*proto.Message)(nil)).Elem()
|
||||
modelType := reflect.TypeFor[proto.Message]()
|
||||
return elemTyp.Implements(modelType)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user