Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-30 23:58:23 -05:00)

Compare commits: e2e-debugg...topic-bug (6 commits)
| Author | SHA1 | Date |
|---|---|---|
| | c603321733 | |
| | 8f816d6f49 | |
| | 0a8603268b | |
| | 6fd2d5f268 | |
| | 8056f55522 | |
| | 9ab42d18da | |
```diff
@@ -196,6 +196,7 @@ go_test(
         "subscriber_beacon_aggregate_proof_test.go",
         "subscriber_beacon_blocks_test.go",
         "subscriber_data_column_sidecar_test.go",
+        "subscriber_race_test.go",
         "subscriber_test.go",
         "subscription_topic_handler_test.go",
         "sync_fuzz_test.go",
```
```diff
@@ -416,6 +416,11 @@ func (s *Service) startDiscoveryAndSubscriptions() {
     // Register respective pubsub handlers at state synced event.
     s.registerSubscribers(currentEpoch, forkDigest)
 
+    // Initialize registeredNetworkEntry to the current network schedule entry to avoid
+    // duplicate subscriber registration on the first forkWatcher tick when the next
+    // epoch has the same digest.
+    s.registeredNetworkEntry = params.GetNetworkScheduleEntry(currentEpoch)
+
     // Start the fork watcher.
     go s.forkWatcher()
 }
```
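The initialization added above seeds the fork watcher's comparison state before its first tick, so an unchanged digest does not trigger a second registration. As a rough, self-contained sketch (the `scheduleEntry` type, `entryForEpoch` helper, and `watcher` struct below are hypothetical stand-ins, not Prysm's `params` API), a watcher that only re-registers when the upcoming epoch maps to a different schedule entry could look like this:

```go
package main

import (
    "fmt"
    "time"
)

// Hypothetical stand-ins for the network schedule machinery; the real code uses
// params.GetNetworkScheduleEntry and the Service's forkWatcher loop.
type scheduleEntry struct {
    Epoch  uint64
    Digest [4]byte
}

// entryForEpoch pretends the digest only changes every 10 epochs (i.e. at a fork).
func entryForEpoch(epoch uint64) scheduleEntry {
    return scheduleEntry{Epoch: epoch - epoch%10, Digest: [4]byte{byte(epoch / 10)}}
}

type watcher struct {
    registered scheduleEntry // last entry we registered subscribers for
}

// tick re-registers subscribers only when the upcoming epoch maps to a new
// schedule entry. Seeding `registered` with the current entry at startup is
// what avoids a duplicate registration on the very first tick.
func (w *watcher) tick(currentEpoch uint64) {
    next := entryForEpoch(currentEpoch + 1)
    if next == w.registered {
        return // same digest, nothing to do
    }
    fmt.Printf("re-registering subscribers for schedule entry at epoch %d\n", next.Epoch)
    w.registered = next
}

func main() {
    current := uint64(9)
    w := &watcher{registered: entryForEpoch(current)} // seeded at startup, as in the diff

    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()
    for i := 0; i < 3; i++ {
        <-ticker.C
        w.tick(current)
        current++
    }
}
```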
```diff
@@ -344,15 +344,23 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
     topic += s.cfg.p2p.Encoding().ProtocolSuffix()
     log := log.WithField("topic", topic)
 
-    // Do not resubscribe already seen subscriptions.
-    ok := s.subHandler.topicExists(topic)
-    if ok {
+    // 1) Fast-path bail if it already exists.
+    if s.subHandler.topicExists(topic) {
+        log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
+        return nil
+    }
+
+    // 2) Otherwise, atomically reserve to block concurrent goroutines.
+    if !s.subHandler.tryReserveTopic(topic) {
+        // Someone else reserved first.
         log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
         return nil
     }
 
     if err := s.cfg.p2p.PubSub().RegisterTopicValidator(s.wrapAndReportValidation(topic, validator)); err != nil {
         log.WithError(err).Error("Could not register validator for topic")
+        // Clean up the reservation since we're not proceeding
+        s.subHandler.removeTopic(topic)
         return nil
     }
```
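The reworked flow above splits "is this topic already handled?" into a cheap existence check followed by an atomic reservation, because a bare check-then-subscribe sequence lets two goroutines both pass the check before either records the topic. A minimal sketch of that pattern, assuming a hypothetical `topicRegistry` type rather than Prysm's `subHandler`:

```go
package main

import (
    "fmt"
    "sync"
)

// topicRegistry demonstrates why the check and the insert must happen under
// the same lock: with a separate "exists?" call, two goroutines can both see
// "not present" and both proceed to subscribe.
type topicRegistry struct {
    mu     sync.Mutex
    topics map[string]bool
}

// tryReserve performs the existence check and the insertion atomically,
// so at most one caller wins for a given topic.
func (r *topicRegistry) tryReserve(topic string) bool {
    r.mu.Lock()
    defer r.mu.Unlock()
    if _, ok := r.topics[topic]; ok {
        return false
    }
    r.topics[topic] = true
    return true
}

func main() {
    reg := &topicRegistry{topics: make(map[string]bool)}
    var wg sync.WaitGroup
    winners := make(chan int, 10)

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            if reg.tryReserve("/eth2/beacon_block") {
                winners <- id // only one goroutine ever gets here
            }
        }(i)
    }
    wg.Wait()
    close(winners)
    fmt.Println("winners:", len(winners)) // always 1
}
```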
```diff
@@ -362,9 +370,12 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
         // libp2p PubSub library or a subscription request to a topic that fails to match the topic
         // subscription filter.
         log.WithError(err).Error("Could not subscribe topic")
+        // Clean up the reservation since we're not proceeding
+        s.subHandler.removeTopic(topic)
         return nil
     }
 
+    // Update the reservation with the actual subscription
     s.subHandler.addTopic(sub.Topic(), sub)
 
     // Pipeline decodes the incoming subscription data, runs the validation, and handles the
```
```diff
@@ -414,6 +425,8 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
             // Cancel subscription in the event of an error, as we are
             // now exiting topic event loop.
             sub.Cancel()
+            // Remove topic from our tracking to allow resubscription.
+            s.subHandler.removeTopic(topic)
             return
         }
 
```
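The `removeTopic` call added next to `sub.Cancel()` is what lets a topic be re-subscribed later: without it, the tracking map keeps a stale entry and every future attempt is rejected as a duplicate. A small illustration of that cleanup-on-exit idea, using hypothetical `registry`/`startLoop` names rather than the real service:

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// registry is a toy stand-in for the topic tracking map.
type registry struct {
    mu     sync.Mutex
    topics map[string]struct{}
}

func (r *registry) add(t string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.topics[t] = struct{}{}
}

func (r *registry) remove(t string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    delete(r.topics, t)
}

func (r *registry) has(t string) bool {
    r.mu.Lock()
    defer r.mu.Unlock()
    _, ok := r.topics[t]
    return ok
}

// startLoop registers the topic and removes it again when the message loop
// exits; without the remove step the stale entry would block resubscription.
func startLoop(ctx context.Context, reg *registry, topic string) {
    reg.add(topic)
    go func() {
        <-ctx.Done()      // stand-in for "message loop hit an error / shut down"
        reg.remove(topic) // cleanup that makes a later subscribe possible
    }()
}

func main() {
    reg := &registry{topics: make(map[string]struct{})}
    ctx, cancel := context.WithCancel(context.Background())

    startLoop(ctx, reg, "/eth2/voluntary_exit")
    fmt.Println("tracked after subscribe:", reg.has("/eth2/voluntary_exit")) // true

    cancel()
    time.Sleep(50 * time.Millisecond)
    fmt.Println("tracked after loop exit:", reg.has("/eth2/voluntary_exit")) // false
}
```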
```diff
@@ -533,7 +546,15 @@ func (s *Service) subscribeToSubnets(t *subnetTracker) error {
     for _, subnet := range t.missing(subnetsToJoin) {
         // TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
         topic := t.fullTopic(subnet, "")
-        t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
+        sub := s.subscribeWithBase(topic, t.validate, t.handle)
+        // Even if sub is nil (topic already exists), we need to track the subnet
+        // to avoid repeated subscription attempts every slot.
+        if sub == nil {
+            // Topic already exists, get the existing subscription for tracking
+            fullTopic := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
+            sub = s.subHandler.subForTopic(fullTopic)
+        }
+        t.track(subnet, sub)
     }
 
     return nil
```
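The change above tracks a subnet even when `subscribeWithBase` returns nil for an already-active topic, falling back to the existing subscription so the same subnet is not retried every slot. A toy sketch of that fallback, with illustrative `handler`/`tracker` types in place of Prysm's `subnetTracker`:

```go
package main

import "fmt"

// Illustrative-only types; not the real subnetTracker or subTopicHandler.
type subscription struct{ topic string }

type handler struct{ active map[string]*subscription }

func (h *handler) subForTopic(topic string) *subscription { return h.active[topic] }

type tracker struct{ subnets map[uint64]*subscription }

func (t *tracker) track(subnet uint64, sub *subscription) { t.subnets[subnet] = sub }

// subscribeOnce returns nil when the topic is already active, mirroring
// subscribeWithBase's behaviour for duplicate topics.
func subscribeOnce(h *handler, topic string) *subscription {
    if _, ok := h.active[topic]; ok {
        return nil
    }
    s := &subscription{topic: topic}
    h.active[topic] = s
    return s
}

func main() {
    h := &handler{active: map[string]*subscription{}}
    tr := &tracker{subnets: map[uint64]*subscription{}}

    for _, subnet := range []uint64{3, 3} { // second attempt hits the duplicate path
        topic := fmt.Sprintf("/eth2/attnets/%d", subnet)
        sub := subscribeOnce(h, topic)
        if sub == nil {
            sub = h.subForTopic(topic) // reuse the live subscription for tracking
        }
        tr.track(subnet, sub)
    }
    fmt.Println("tracked subnets:", len(tr.subnets), "with live sub:", tr.subnets[3] != nil)
}
```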
beacon-chain/sync/subscriber_race_test.go (new file, 347 lines)
```go
package sync

import (
    "context"
    "runtime"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    "github.com/OffchainLabs/prysm/v6/async/abool"
    mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
    p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
    "github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
    mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
    "github.com/OffchainLabs/prysm/v6/testing/require"
    "google.golang.org/protobuf/proto"
)

// TestSubscriptionCleanup_MissingRemoveTopic tests the following bug:
// When a subscription's message loop fails and sub.Cancel() is called,
// removeTopic() is NOT called, leaving stale entries in subTopics map.
// This likely causes memory leaks and prevents resubscription (missed attestations).
func TestSubscriptionCleanup_MissingRemoveTopic(t *testing.T) {
    t.Run("memory leak with repeated failures", func(t *testing.T) {
        // This test verifies that removeTopic() is called when subscription fails
        // Fresh setup for this subtest
        p2pService := p2ptest.NewTestP2P(t)
        gt := time.Now()
        vr := [32]byte{'A'}

        r := &Service{
            ctx: context.Background(),
            cfg: &config{
                p2p:         p2pService,
                initialSync: &mockSync.Sync{IsSyncing: false},
                chain: &mockChain.ChainService{
                    ValidatorsRoot: vr,
                    Genesis:        gt,
                },
                clock: startup.NewClock(gt, vr),
            },
            subHandler:   newSubTopicHandler(),
            chainStarted: abool.New(),
        }
        markInitSyncComplete(t, r)

        digest, err := r.currentForkDigest()
        require.NoError(t, err)
        p2pService.Digest = digest

        getMapSize := func() int {
            r.subHandler.RLock()
            defer r.subHandler.RUnlock()
            return len(r.subHandler.subTopics)
        }

        baseTopic := "/eth2/%x/voluntary_exit"

        // Do one cycle: subscribe, cancel, check cleanup
        iterCtx, iterCancel := context.WithCancel(context.Background())
        r.ctx = iterCtx

        handler := func(ctx context.Context, msg proto.Message) error {
            return nil
        }

        r.markForChainStart()

        // Subscribe
        sub := r.subscribeWithBase(baseTopic, r.noopValidator, handler)
        require.NotNil(t, sub, "First subscription should succeed")

        // Verify subscribed
        sizeAfterSubscribe := getMapSize()
        require.Equal(t, 1, sizeAfterSubscribe, "Should have 1 entry after subscribe")

        // Cancel to simulate failure
        iterCancel()
        time.Sleep(300 * time.Millisecond)

        // Check cleanup happened - this is the core fix verification
        sizeAfterCancel := getMapSize()
        if sizeAfterCancel != 0 {
            t.Errorf("After context cancellation, subTopics has %d entries (expected 0). "+
                "removeTopic() should have been called at line 420.",
                sizeAfterCancel)
        } else {
            t.Logf("SUCCESS: Cleanup working correctly - map size is 0 after cancellation")
        }
    })
}

// TestConcurrentSubscription_RaceCondition tests the following bug:
// Multiple goroutines can pass topicExists() check simultaneously
// before any calls addTopic(), causing duplicate subscriptions.
func TestConcurrentSubscription_RaceCondition(t *testing.T) {
    tests := []struct {
        name          string
        numGoroutines int
        iterations    int
        useBarrier    bool
    }{
        {
            name:          "two concurrent",
            numGoroutines: 2,
            iterations:    20,
            useBarrier:    true,
        },
        {
            name:          "five concurrent",
            numGoroutines: 5,
            iterations:    15,
            useBarrier:    true,
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            duplicateDetected := 0

            for iter := 0; iter < tt.iterations; iter++ {
                // Fresh setup for each iteration
                p2pService := p2ptest.NewTestP2P(t)
                gt := time.Now()
                vr := [32]byte{'A'}

                ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

                r := &Service{
                    ctx: ctx,
                    cfg: &config{
                        p2p:         p2pService,
                        initialSync: &mockSync.Sync{IsSyncing: false},
                        chain: &mockChain.ChainService{
                            ValidatorsRoot: vr,
                            Genesis:        gt,
                        },
                        clock: startup.NewClock(gt, vr),
                    },
                    subHandler:   newSubTopicHandler(),
                    chainStarted: abool.New(),
                }
                markInitSyncComplete(t, r)

                digest, err := r.currentForkDigest()
                require.NoError(t, err)
                p2pService.Digest = digest

                baseTopic := "/eth2/%x/voluntary_exit"

                r.markForChainStart()

                // Track successful subscriptions
                successfulSubs := atomic.Int32{}
                checksPassed := atomic.Int32{}

                // Barrier to synchronize goroutine starts
                var barrier sync.WaitGroup
                if tt.useBarrier {
                    barrier.Add(tt.numGoroutines)
                }
                startSignal := make(chan struct{})

                var wg sync.WaitGroup

                // Launch concurrent subscription attempts
                for i := 0; i < tt.numGoroutines; i++ {
                    wg.Add(1)
                    go func() {
                        defer wg.Done()

                        if tt.useBarrier {
                            barrier.Done()
                            barrier.Wait()
                        }

                        <-startSignal

                        // Attempt subscription
                        // ideally only one goroutine should get a non-nil subscription
                        handler := func(ctx context.Context, msg proto.Message) error {
                            return nil
                        }

                        sub := r.subscribeWithBase(baseTopic, r.noopValidator, handler)
                        if sub != nil {
                            successfulSubs.Add(1)
                        }
                        // Count how many goroutines attempted (for stats)
                        checksPassed.Add(1)
                    }()
                }

                // Wait for all goroutines to be ready
                if tt.useBarrier {
                    barrier.Wait()
                }
                time.Sleep(10 * time.Millisecond)

                // Start all goroutines simultaneously
                close(startSignal)

                // Wait for completion
                wg.Wait()
                time.Sleep(200 * time.Millisecond)

                // Check results
                subs := successfulSubs.Load()
                attempts := checksPassed.Load()

                r.subHandler.RLock()
                finalMapSize := len(r.subHandler.subTopics)
                r.subHandler.RUnlock()

                // ideally only ONE goroutine should successfully subscribe
                // If more than one succeeds, a race condition exists
                if subs > 1 {
                    duplicateDetected++
                    t.Logf("Iteration %d: RACE DETECTED - %d goroutines attempted, "+
                        "%d successful subscriptions (expected 1), final map size: %d",
                        iter, attempts, subs, finalMapSize)
                }

                // The map should have exactly 0 or 1 entry
                if finalMapSize > 1 {
                    t.Errorf("Iteration %d: INCONSISTENT STATE - map has %d entries (expected 0-1). "+
                        "This indicates multiple goroutines subscribed concurrently.",
                        iter, finalMapSize)
                }

                // Cleanup
                cancel()
                r.subHandler.Lock()
                for topic := range r.subHandler.subTopics {
                    sub := r.subHandler.subTopics[topic]
                    if sub != nil {
                        sub.Cancel()
                    }
                    delete(r.subHandler.subTopics, topic)
                }
                r.subHandler.Unlock()
            }

            if duplicateDetected > 0 {
                racePercentage := float64(duplicateDetected) / float64(tt.iterations) * 100
                t.Errorf("RACE CONDITION EXISTS in %d/%d iterations (%.1f%%). "+
                    "Multiple goroutines successfully subscribed (only 1 expected). ",
                    duplicateDetected, tt.iterations, racePercentage)
            } else {
                t.Logf("SUCCESS: No Race condition! Only 1 subscription succeeded in all %d iterations", tt.iterations)
            }
        })
    }
}

// TestMemoryGrowth_SubscriptionFailures demonstrates memory growth over time
func TestMemoryGrowth_SubscriptionFailures(t *testing.T) {
    p2pService := p2ptest.NewTestP2P(t)
    gt := time.Now()
    vr := [32]byte{'A'}

    r := &Service{
        ctx: context.Background(),
        cfg: &config{
            p2p:         p2pService,
            initialSync: &mockSync.Sync{IsSyncing: false},
            chain: &mockChain.ChainService{
                ValidatorsRoot: vr,
                Genesis:        gt,
            },
            clock: startup.NewClock(gt, vr),
        },
        subHandler:   newSubTopicHandler(),
        chainStarted: abool.New(),
    }
    markInitSyncComplete(t, r)

    digest, err := r.currentForkDigest()
    require.NoError(t, err)
    p2pService.Digest = digest

    baseTopic := "/eth2/%x/voluntary_exit"

    getMapSize := func() int {
        r.subHandler.RLock()
        defer r.subHandler.RUnlock()
        return len(r.subHandler.subTopics)
    }

    failures := 50
    var memStats runtime.MemStats

    runtime.ReadMemStats(&memStats)
    startAlloc := memStats.Alloc

    for i := 0; i < failures; i++ {
        ctx, cancel := context.WithCancel(context.Background())
        r.ctx = ctx
        r.markForChainStart()

        handler := func(ctx context.Context, msg proto.Message) error {
            return nil
        }

        sub := r.subscribeWithBase(baseTopic, r.noopValidator, handler)
        if sub != nil {
            // Cancel immediately to simulate failure
            cancel()
            time.Sleep(100 * time.Millisecond)
        }

        if i%10 == 0 {
            runtime.ReadMemStats(&memStats)
            currentAlloc := memStats.Alloc
            growth := currentAlloc - startAlloc
            t.Logf("After %d failures: subTopics size=%d, heap growth=%d KB",
                i, getMapSize(), growth/1024)
        }
    }

    finalSize := getMapSize()
    runtime.ReadMemStats(&memStats)
    finalAlloc := memStats.Alloc

    t.Logf("Final results: %d subscription failures", failures)
    t.Logf("  subTopics map size: %d entries", finalSize)
    t.Logf("  Start heap: %d KB, Final heap: %d KB", startAlloc/1024, finalAlloc/1024)

    // With the bug, even one stale entry is a problem because it prevents resubscription
    if finalSize > 0 {
        t.Errorf("MEMORY LEAK / STALE ENTRY: After %d failures, %d stale entries remain in subTopics map (expected 0). "+
            "Even 1 stale entry prevents resubscription, causing missed attestations in production.",
            failures, finalSize)
    }

    // Check if heap grew significantly (handle wraparound by checking if finalAlloc >= startAlloc)
    if finalAlloc >= startAlloc {
        totalGrowth := finalAlloc - startAlloc
        if totalGrowth > 50*1024 { // 50 KB threshold
            t.Logf("NOTE: Heap grew by %d KB over %d failures. ",
                totalGrowth/1024, failures)
        }
    } else {
        t.Logf("NOTE: Heap decreased (GC ran), cannot measure growth accurately")
    }
}
```
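TestConcurrentSubscription_RaceCondition lines its goroutines up behind a WaitGroup barrier and a shared start channel so they all hit subscribeWithBase at nearly the same instant, which is what makes the check-then-act race reproducible under `go test -race`. The scheduling trick in isolation (a generic sketch, not Prysm code):

```go
package main

import (
    "fmt"
    "sync"
)

// Park n goroutines at a barrier, then release them all at once so they
// contend for the same critical section simultaneously.
func main() {
    const n = 5
    var ready, done sync.WaitGroup
    start := make(chan struct{})

    ready.Add(n)
    done.Add(n)
    for i := 0; i < n; i++ {
        go func(id int) {
            defer done.Done()
            ready.Done() // signal "I'm parked at the barrier"
            <-start      // ...and wait for the simultaneous release
            fmt.Println("goroutine", id, "entered the contended section")
        }(i)
    }

    ready.Wait() // everyone is parked
    close(start) // release them all at once
    done.Wait()
}
```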
```diff
@@ -26,13 +26,22 @@ func newSubTopicHandler() *subTopicHandler {
 func (s *subTopicHandler) addTopic(topic string, sub *pubsub.Subscription) {
     s.Lock()
     defer s.Unlock()
+
+    // Check if this is updating a reserved entry (nil subscription)
+    existingSub, exists := s.subTopics[topic]
+    wasReserved := exists && existingSub == nil
+
     s.subTopics[topic] = sub
     digest, err := p2p.ExtractGossipDigest(topic)
     if err != nil {
         log.WithError(err).Error("Could not retrieve digest")
         return
     }
-    s.digestMap[digest] += 1
+
+    // Only increment digest count if this is a new topic (not just updating a reservation)
+    if !wasReserved {
+        s.digestMap[digest] += 1
+    }
 }
 
 func (s *subTopicHandler) topicExists(topic string) bool {
```
```diff
@@ -42,25 +51,57 @@ func (s *subTopicHandler) topicExists(topic string) bool {
     return ok
 }
 
+// tryReserveTopic atomically checks if a topic exists and reserves it if not.
+// Returns true if the topic was successfully reserved (didn't exist before),
+// false if the topic already exists or is reserved.
+// This prevents the race condition where multiple goroutines check topicExists()
+// simultaneously and both proceed to subscribe.
+func (s *subTopicHandler) tryReserveTopic(topic string) bool {
+    s.Lock()
+    defer s.Unlock()
+
+    // Check if topic already exists or is reserved
+    if _, exists := s.subTopics[topic]; exists {
+        return false
+    }
+
+    // Reserve the topic with a nil placeholder
+    // This will be updated with the actual subscription later
+    s.subTopics[topic] = nil
+    return true
+}
+
 func (s *subTopicHandler) removeTopic(topic string) {
     s.Lock()
     defer s.Unlock()
+
+    // Check if topic exists and whether it was just a reservation (nil)
+    existingSub, exists := s.subTopics[topic]
+    if !exists {
+        return
+    }
+    wasReserved := existingSub == nil
+
     delete(s.subTopics, topic)
-    digest, err := p2p.ExtractGossipDigest(topic)
-    if err != nil {
-        log.WithError(err).Error("Could not retrieve digest")
-        return
-    }
-    currAmt, ok := s.digestMap[digest]
-    // Should never be possible, is a
-    // defensive check.
-    if !ok || currAmt <= 0 {
-        delete(s.digestMap, digest)
-        return
-    }
-    s.digestMap[digest] -= 1
-    if s.digestMap[digest] == 0 {
-        delete(s.digestMap, digest)
-    }
+
+    // Only decrement digest count if this wasn't just a reservation
+    if !wasReserved {
+        digest, err := p2p.ExtractGossipDigest(topic)
+        if err != nil {
+            log.WithError(err).Error("Could not retrieve digest")
+            return
+        }
+        currAmt, ok := s.digestMap[digest]
+        // Should never be possible, is a
+        // defensive check.
+        if !ok || currAmt <= 0 {
+            delete(s.digestMap, digest)
+            return
+        }
+        s.digestMap[digest] -= 1
+        if s.digestMap[digest] == 0 {
+            delete(s.digestMap, digest)
+        }
+    }
 }
```
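tryReserveTopic uses a nil map value as a "reserved, subscription pending" placeholder that addTopic later overwrites and removeTopic can roll back. A compact sketch of that lifecycle with stand-in types (`topicTable`/`fakeSub` below are illustrative, not the real subTopicHandler):

```go
package main

import (
    "fmt"
    "sync"
)

// fakeSub stands in for a live pubsub subscription.
type fakeSub struct{ topic string }

// topicTable: nil value means "reserved, subscription pending",
// non-nil means committed; removal works for both states.
type topicTable struct {
    mu     sync.Mutex
    topics map[string]*fakeSub
}

// tryReserve mirrors tryReserveTopic: check and insert under one lock.
func (t *topicTable) tryReserve(topic string) bool {
    t.mu.Lock()
    defer t.mu.Unlock()
    if _, exists := t.topics[topic]; exists {
        return false
    }
    t.topics[topic] = nil // placeholder reservation
    return true
}

// commit replaces the placeholder with the live subscription (addTopic analogue).
func (t *topicTable) commit(topic string, sub *fakeSub) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.topics[topic] = sub
}

// remove rolls back a reservation or drops a live subscription (removeTopic analogue).
func (t *topicTable) remove(topic string) {
    t.mu.Lock()
    defer t.mu.Unlock()
    delete(t.topics, topic)
}

func main() {
    tt := &topicTable{topics: map[string]*fakeSub{}}

    fmt.Println(tt.tryReserve("/eth2/attnets/7")) // true: first caller wins
    fmt.Println(tt.tryReserve("/eth2/attnets/7")) // false: the reservation blocks duplicates

    // Failure path: roll the reservation back so a later attempt can succeed.
    tt.remove("/eth2/attnets/7")
    fmt.Println(tt.tryReserve("/eth2/attnets/7")) // true again

    // Success path: replace the placeholder with the real subscription.
    tt.commit("/eth2/attnets/7", &fakeSub{topic: "/eth2/attnets/7"})
}
```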