mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-11 14:28:09 -05:00
Compare commits
7 Commits
topic-bug
...
agent-stri
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6e7d4cefa8 | ||
|
|
28a6138b86 | ||
|
|
ebbf9f0efc | ||
|
|
69b0f7d734 | ||
|
|
1432867c92 | ||
|
|
18efd620dc | ||
|
|
6139d58fa5 |
1
.bazelrc
1
.bazelrc
@@ -34,6 +34,7 @@ build:minimal --@io_bazel_rules_go//go/config:tags=minimal
|
||||
build:release --compilation_mode=opt
|
||||
build:release --stamp
|
||||
build:release --define pgo_enabled=1
|
||||
build:release --strip=always
|
||||
|
||||
# Build binary with cgo symbolizer for debugging / profiling.
|
||||
build:cgo_symbolizer --copt=-g
|
||||
|
||||
2
.github/workflows/changelog.yml
vendored
2
.github/workflows/changelog.yml
vendored
@@ -9,7 +9,7 @@ on:
|
||||
|
||||
jobs:
|
||||
run-changelog-check:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-4
|
||||
steps:
|
||||
- name: Checkout source code
|
||||
uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0
|
||||
|
||||
2
.github/workflows/check-specrefs.yml
vendored
2
.github/workflows/check-specrefs.yml
vendored
@@ -3,7 +3,7 @@ on: [push, pull_request]
|
||||
|
||||
jobs:
|
||||
check-specrefs:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-4
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
|
||||
2
.github/workflows/clang-format.yml
vendored
2
.github/workflows/clang-format.yml
vendored
@@ -10,7 +10,7 @@ on:
|
||||
|
||||
jobs:
|
||||
clang-format-checking:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-4
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
# Is this step failing for you?
|
||||
|
||||
4
.github/workflows/fuzz.yml
vendored
4
.github/workflows/fuzz.yml
vendored
@@ -10,7 +10,7 @@ permissions:
|
||||
|
||||
jobs:
|
||||
list:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-4
|
||||
timeout-minutes: 180
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
@@ -25,7 +25,7 @@ jobs:
|
||||
fuzz-tests: ${{steps.list.outputs.fuzz-tests}}
|
||||
|
||||
fuzz:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-4
|
||||
timeout-minutes: 360
|
||||
needs: list
|
||||
strategy:
|
||||
|
||||
8
.github/workflows/go.yml
vendored
8
.github/workflows/go.yml
vendored
@@ -11,7 +11,7 @@ on:
|
||||
jobs:
|
||||
formatting:
|
||||
name: Formatting
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-4
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
@@ -22,7 +22,7 @@ jobs:
|
||||
|
||||
gosec:
|
||||
name: Gosec scan
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-4
|
||||
env:
|
||||
GO111MODULE: on
|
||||
steps:
|
||||
@@ -40,7 +40,7 @@ jobs:
|
||||
|
||||
lint:
|
||||
name: Lint
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-4
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
@@ -59,7 +59,7 @@ jobs:
|
||||
|
||||
build:
|
||||
name: Build
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-4
|
||||
steps:
|
||||
- name: Set up Go 1.25.1
|
||||
uses: actions/setup-go@v4
|
||||
|
||||
4
.github/workflows/horusec.yaml
vendored
4
.github/workflows/horusec.yaml
vendored
@@ -8,7 +8,7 @@ on:
|
||||
jobs:
|
||||
Horusec_Scan:
|
||||
name: horusec-Scan
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-4
|
||||
if: github.ref == 'refs/heads/develop'
|
||||
steps:
|
||||
- name: Check out code
|
||||
@@ -19,4 +19,4 @@ jobs:
|
||||
- name: Running Security Scan
|
||||
run: |
|
||||
curl -fsSL https://raw.githubusercontent.com/ZupIT/horusec/main/deployments/scripts/install.sh | bash -s latest
|
||||
horusec start -t="10000" -p="./" -e="true" -i="**/crypto/bls/herumi/**, **/**/*_test.go, **/third_party/afl/**, **/crypto/keystore/key.go"
|
||||
horusec start -t="10000" -p="./" -e="true" -i="**/crypto/bls/herumi/**, **/**/*_test.go, **/third_party/afl/**, **/crypto/keystore/key.go"
|
||||
|
||||
@@ -130,6 +130,7 @@ func (s *Service) logCheckSubscribableError(pid peer.ID) func(string) bool {
|
||||
log.WithError(err).WithFields(logrus.Fields{
|
||||
"peerID": pid,
|
||||
"topic": topic,
|
||||
"agent": agentString(pid, s.Host()),
|
||||
}).Debug("Peer subscription rejected")
|
||||
}
|
||||
return false
|
||||
|
||||
@@ -27,5 +27,7 @@ go_test(
|
||||
"//testing/require:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -132,6 +132,10 @@ func convertValueForJSON(v reflect.Value, tag string) interface{} {
|
||||
}
|
||||
return m
|
||||
|
||||
// ===== String =====
|
||||
case reflect.String:
|
||||
return v.String()
|
||||
|
||||
// ===== Default =====
|
||||
default:
|
||||
log.WithFields(log.Fields{
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"math"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/api/server/structs"
|
||||
@@ -17,6 +18,8 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
log "github.com/sirupsen/logrus"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
)
|
||||
|
||||
func TestGetDepositContract(t *testing.T) {
|
||||
@@ -715,3 +718,35 @@ func TestGetSpec_BlobSchedule_NotFulu(t *testing.T) {
|
||||
_, exists := data["BLOB_SCHEDULE"]
|
||||
require.Equal(t, false, exists)
|
||||
}
|
||||
|
||||
func TestConvertValueForJSON_NoErrorLogsForStrings(t *testing.T) {
|
||||
logHook := logTest.NewLocal(log.StandardLogger())
|
||||
defer logHook.Reset()
|
||||
|
||||
stringTestCases := []struct {
|
||||
tag string
|
||||
value string
|
||||
}{
|
||||
{"CONFIG_NAME", "mainnet"},
|
||||
{"PRESET_BASE", "mainnet"},
|
||||
{"DEPOSIT_CONTRACT_ADDRESS", "0x00000000219ab540356cBB839Cbe05303d7705Fa"},
|
||||
{"TERMINAL_TOTAL_DIFFICULTY", "58750000000000000000000"},
|
||||
}
|
||||
|
||||
for _, tc := range stringTestCases {
|
||||
t.Run(tc.tag, func(t *testing.T) {
|
||||
logHook.Reset()
|
||||
|
||||
// Convert the string value
|
||||
v := reflect.ValueOf(tc.value)
|
||||
result := convertValueForJSON(v, tc.tag)
|
||||
|
||||
// Verify the result is correct
|
||||
require.Equal(t, tc.value, result)
|
||||
|
||||
// Verify NO error was logged about unsupported field kind
|
||||
require.LogsDoNotContain(t, logHook, "Unsupported config field kind")
|
||||
require.LogsDoNotContain(t, logHook, "kind=string")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,7 +196,6 @@ go_test(
|
||||
"subscriber_beacon_aggregate_proof_test.go",
|
||||
"subscriber_beacon_blocks_test.go",
|
||||
"subscriber_data_column_sidecar_test.go",
|
||||
"subscriber_race_test.go",
|
||||
"subscriber_test.go",
|
||||
"subscription_topic_handler_test.go",
|
||||
"sync_fuzz_test.go",
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
@@ -12,7 +14,6 @@ import (
|
||||
// Is a background routine that observes for new incoming forks. Depending on the epoch
|
||||
// it will be in charge of subscribing/unsubscribing the relevant topics at the fork boundaries.
|
||||
func (s *Service) forkWatcher() {
|
||||
<-s.initialSyncComplete
|
||||
slotTicker := slots.NewSlotTicker(s.cfg.clock.GenesisTime(), params.BeaconConfig().SecondsPerSlot)
|
||||
for {
|
||||
select {
|
||||
@@ -46,6 +47,7 @@ func (s *Service) registerForUpcomingFork(currentEpoch primitives.Epoch) error {
|
||||
}
|
||||
|
||||
if s.subHandler.digestExists(nextEntry.ForkDigest) {
|
||||
log.WithField("digest", fmt.Sprintf("%#x", nextEntry.ForkDigest)).Debug("Already subscribed to fork")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -413,14 +413,11 @@ func (s *Service) startDiscoveryAndSubscriptions() {
|
||||
return
|
||||
}
|
||||
|
||||
s.registeredNetworkEntry = params.GetNetworkScheduleEntry(currentEpoch)
|
||||
|
||||
// Register respective pubsub handlers at state synced event.
|
||||
s.registerSubscribers(currentEpoch, forkDigest)
|
||||
|
||||
// Initialize registeredNetworkEntry to the current network schedule entry to avoid
|
||||
// duplicate subscriber registration on the first forkWatcher tick when the next
|
||||
// epoch has the same digest.
|
||||
s.registeredNetworkEntry = params.GetNetworkScheduleEntry(currentEpoch)
|
||||
|
||||
// Start the fork watcher.
|
||||
go s.forkWatcher()
|
||||
}
|
||||
|
||||
@@ -344,23 +344,15 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
|
||||
topic += s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
log := log.WithField("topic", topic)
|
||||
|
||||
// 1) Fast-path bail if it already exists.
|
||||
if s.subHandler.topicExists(topic) {
|
||||
log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
|
||||
return nil
|
||||
}
|
||||
|
||||
// 2) Otherwise, atomically reserve to block concurrent goroutines.
|
||||
if !s.subHandler.tryReserveTopic(topic) {
|
||||
// Someone else reserved first.
|
||||
// Do not resubscribe already seen subscriptions.
|
||||
ok := s.subHandler.topicExists(topic)
|
||||
if ok {
|
||||
log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := s.cfg.p2p.PubSub().RegisterTopicValidator(s.wrapAndReportValidation(topic, validator)); err != nil {
|
||||
log.WithError(err).Error("Could not register validator for topic")
|
||||
// Clean up the reservation since we're not proceeding
|
||||
s.subHandler.removeTopic(topic)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -370,12 +362,9 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
|
||||
// libp2p PubSub library or a subscription request to a topic that fails to match the topic
|
||||
// subscription filter.
|
||||
log.WithError(err).Error("Could not subscribe topic")
|
||||
// Clean up the reservation since we're not proceeding
|
||||
s.subHandler.removeTopic(topic)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update the reservation with the actual subscription
|
||||
s.subHandler.addTopic(sub.Topic(), sub)
|
||||
|
||||
// Pipeline decodes the incoming subscription data, runs the validation, and handles the
|
||||
@@ -425,8 +414,6 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
|
||||
// Cancel subscription in the event of an error, as we are
|
||||
// now exiting topic event loop.
|
||||
sub.Cancel()
|
||||
// Remove topic from our tracking to allow resubscription.
|
||||
s.subHandler.removeTopic(topic)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -546,15 +533,7 @@ func (s *Service) subscribeToSubnets(t *subnetTracker) error {
|
||||
for _, subnet := range t.missing(subnetsToJoin) {
|
||||
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
|
||||
topic := t.fullTopic(subnet, "")
|
||||
sub := s.subscribeWithBase(topic, t.validate, t.handle)
|
||||
// Even if sub is nil (topic already exists), we need to track the subnet
|
||||
// to avoid repeated subscription attempts every slot.
|
||||
if sub == nil {
|
||||
// Topic already exists, get the existing subscription for tracking
|
||||
fullTopic := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
sub = s.subHandler.subForTopic(fullTopic)
|
||||
}
|
||||
t.track(subnet, sub)
|
||||
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -1,347 +0,0 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/async/abool"
|
||||
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
|
||||
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
// TestSubscriptionCleanup_MissingRemoveTopic tests the following bug:
|
||||
// When a subscription's message loop fails and sub.Cancel() is called,
|
||||
// removeTopic() is NOT called, leaving stale entries in subTopics map.
|
||||
// This likely causes memory leaks and prevents resubscription (missed attestations).
|
||||
func TestSubscriptionCleanup_MissingRemoveTopic(t *testing.T) {
|
||||
t.Run("memory leak with repeated failures", func(t *testing.T) {
|
||||
// This test verifies that removeTopic() is called when subscription fails
|
||||
// Fresh setup for this subtest
|
||||
p2pService := p2ptest.NewTestP2P(t)
|
||||
gt := time.Now()
|
||||
vr := [32]byte{'A'}
|
||||
|
||||
r := &Service{
|
||||
ctx: context.Background(),
|
||||
cfg: &config{
|
||||
p2p: p2pService,
|
||||
initialSync: &mockSync.Sync{IsSyncing: false},
|
||||
chain: &mockChain.ChainService{
|
||||
ValidatorsRoot: vr,
|
||||
Genesis: gt,
|
||||
},
|
||||
clock: startup.NewClock(gt, vr),
|
||||
},
|
||||
subHandler: newSubTopicHandler(),
|
||||
chainStarted: abool.New(),
|
||||
}
|
||||
markInitSyncComplete(t, r)
|
||||
|
||||
digest, err := r.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
p2pService.Digest = digest
|
||||
|
||||
getMapSize := func() int {
|
||||
r.subHandler.RLock()
|
||||
defer r.subHandler.RUnlock()
|
||||
return len(r.subHandler.subTopics)
|
||||
}
|
||||
|
||||
baseTopic := "/eth2/%x/voluntary_exit"
|
||||
|
||||
// Do one cycle: subscribe, cancel, check cleanup
|
||||
iterCtx, iterCancel := context.WithCancel(context.Background())
|
||||
r.ctx = iterCtx
|
||||
|
||||
handler := func(ctx context.Context, msg proto.Message) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
r.markForChainStart()
|
||||
|
||||
// Subscribe
|
||||
sub := r.subscribeWithBase(baseTopic, r.noopValidator, handler)
|
||||
require.NotNil(t, sub, "First subscription should succeed")
|
||||
|
||||
// Verify subscribed
|
||||
sizeAfterSubscribe := getMapSize()
|
||||
require.Equal(t, 1, sizeAfterSubscribe, "Should have 1 entry after subscribe")
|
||||
|
||||
// Cancel to simulate failure
|
||||
iterCancel()
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
// Check cleanup happened - this is the core fix verification
|
||||
sizeAfterCancel := getMapSize()
|
||||
if sizeAfterCancel != 0 {
|
||||
t.Errorf("After context cancellation, subTopics has %d entries (expected 0). "+
|
||||
"removeTopic() should have been called at line 420.",
|
||||
sizeAfterCancel)
|
||||
} else {
|
||||
t.Logf("SUCCESS: Cleanup working correctly - map size is 0 after cancellation")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TestConcurrentSubscription_RaceCondition tests the following bug:
|
||||
// Multiple goroutines can pass topicExists() check simultaneously
|
||||
// before any calls addTopic(), causing duplicate subscriptions.
|
||||
func TestConcurrentSubscription_RaceCondition(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
numGoroutines int
|
||||
iterations int
|
||||
useBarrier bool
|
||||
}{
|
||||
{
|
||||
name: "two concurrent",
|
||||
numGoroutines: 2,
|
||||
iterations: 20,
|
||||
useBarrier: true,
|
||||
},
|
||||
{
|
||||
name: "five concurrent",
|
||||
numGoroutines: 5,
|
||||
iterations: 15,
|
||||
useBarrier: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
duplicateDetected := 0
|
||||
|
||||
for iter := 0; iter < tt.iterations; iter++ {
|
||||
// Fresh setup for each iteration
|
||||
p2pService := p2ptest.NewTestP2P(t)
|
||||
gt := time.Now()
|
||||
vr := [32]byte{'A'}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
|
||||
r := &Service{
|
||||
ctx: ctx,
|
||||
cfg: &config{
|
||||
p2p: p2pService,
|
||||
initialSync: &mockSync.Sync{IsSyncing: false},
|
||||
chain: &mockChain.ChainService{
|
||||
ValidatorsRoot: vr,
|
||||
Genesis: gt,
|
||||
},
|
||||
clock: startup.NewClock(gt, vr),
|
||||
},
|
||||
subHandler: newSubTopicHandler(),
|
||||
chainStarted: abool.New(),
|
||||
}
|
||||
markInitSyncComplete(t, r)
|
||||
|
||||
digest, err := r.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
p2pService.Digest = digest
|
||||
|
||||
baseTopic := "/eth2/%x/voluntary_exit"
|
||||
|
||||
r.markForChainStart()
|
||||
|
||||
// Track successful subscriptions
|
||||
successfulSubs := atomic.Int32{}
|
||||
checksPassed := atomic.Int32{}
|
||||
|
||||
// Barrier to synchronize goroutine starts
|
||||
var barrier sync.WaitGroup
|
||||
if tt.useBarrier {
|
||||
barrier.Add(tt.numGoroutines)
|
||||
}
|
||||
startSignal := make(chan struct{})
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Launch concurrent subscription attempts
|
||||
for i := 0; i < tt.numGoroutines; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
if tt.useBarrier {
|
||||
barrier.Done()
|
||||
barrier.Wait()
|
||||
}
|
||||
|
||||
<-startSignal
|
||||
|
||||
// Attempt subscription
|
||||
// ideally only one goroutine should get a non-nil subscription
|
||||
handler := func(ctx context.Context, msg proto.Message) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
sub := r.subscribeWithBase(baseTopic, r.noopValidator, handler)
|
||||
if sub != nil {
|
||||
successfulSubs.Add(1)
|
||||
}
|
||||
// Count how many goroutines attempted (for stats)
|
||||
checksPassed.Add(1)
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait for all goroutines to be ready
|
||||
if tt.useBarrier {
|
||||
barrier.Wait()
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Start all goroutines simultaneously
|
||||
close(startSignal)
|
||||
|
||||
// Wait for completion
|
||||
wg.Wait()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Check results
|
||||
subs := successfulSubs.Load()
|
||||
attempts := checksPassed.Load()
|
||||
|
||||
r.subHandler.RLock()
|
||||
finalMapSize := len(r.subHandler.subTopics)
|
||||
r.subHandler.RUnlock()
|
||||
|
||||
// ideally only ONE goroutine should successfully subscribe
|
||||
// If more than one succeeds, a race condition exists
|
||||
if subs > 1 {
|
||||
duplicateDetected++
|
||||
t.Logf("Iteration %d: RACE DETECTED - %d goroutines attempted, "+
|
||||
"%d successful subscriptions (expected 1), final map size: %d",
|
||||
iter, attempts, subs, finalMapSize)
|
||||
}
|
||||
|
||||
// The map should have exactly 0 or 1 entry
|
||||
if finalMapSize > 1 {
|
||||
t.Errorf("Iteration %d: INCONSISTENT STATE - map has %d entries (expected 0-1). "+
|
||||
"This indicates multiple goroutines subscribed concurrently.",
|
||||
iter, finalMapSize)
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
cancel()
|
||||
r.subHandler.Lock()
|
||||
for topic := range r.subHandler.subTopics {
|
||||
sub := r.subHandler.subTopics[topic]
|
||||
if sub != nil {
|
||||
sub.Cancel()
|
||||
}
|
||||
delete(r.subHandler.subTopics, topic)
|
||||
}
|
||||
r.subHandler.Unlock()
|
||||
}
|
||||
|
||||
if duplicateDetected > 0 {
|
||||
racePercentage := float64(duplicateDetected) / float64(tt.iterations) * 100
|
||||
t.Errorf("RACE CONDITION EXISTS in %d/%d iterations (%.1f%%). "+
|
||||
"Multiple goroutines successfully subscribed (only 1 expected). ",
|
||||
duplicateDetected, tt.iterations, racePercentage)
|
||||
} else {
|
||||
t.Logf("SUCCESS: No Race condition! Only 1 subscription succeeded in all %d iterations", tt.iterations)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestMemoryGrowth_SubscriptionFailures demonstrates memory growth over time
|
||||
func TestMemoryGrowth_SubscriptionFailures(t *testing.T) {
|
||||
p2pService := p2ptest.NewTestP2P(t)
|
||||
gt := time.Now()
|
||||
vr := [32]byte{'A'}
|
||||
|
||||
r := &Service{
|
||||
ctx: context.Background(),
|
||||
cfg: &config{
|
||||
p2p: p2pService,
|
||||
initialSync: &mockSync.Sync{IsSyncing: false},
|
||||
chain: &mockChain.ChainService{
|
||||
ValidatorsRoot: vr,
|
||||
Genesis: gt,
|
||||
},
|
||||
clock: startup.NewClock(gt, vr),
|
||||
},
|
||||
subHandler: newSubTopicHandler(),
|
||||
chainStarted: abool.New(),
|
||||
}
|
||||
markInitSyncComplete(t, r)
|
||||
|
||||
digest, err := r.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
p2pService.Digest = digest
|
||||
|
||||
baseTopic := "/eth2/%x/voluntary_exit"
|
||||
|
||||
getMapSize := func() int {
|
||||
r.subHandler.RLock()
|
||||
defer r.subHandler.RUnlock()
|
||||
return len(r.subHandler.subTopics)
|
||||
}
|
||||
|
||||
failures := 50
|
||||
var memStats runtime.MemStats
|
||||
|
||||
runtime.ReadMemStats(&memStats)
|
||||
startAlloc := memStats.Alloc
|
||||
|
||||
for i := 0; i < failures; i++ {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
r.ctx = ctx
|
||||
r.markForChainStart()
|
||||
|
||||
handler := func(ctx context.Context, msg proto.Message) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
sub := r.subscribeWithBase(baseTopic, r.noopValidator, handler)
|
||||
if sub != nil {
|
||||
// Cancel immediately to simulate failure
|
||||
cancel()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
if i%10 == 0 {
|
||||
runtime.ReadMemStats(&memStats)
|
||||
currentAlloc := memStats.Alloc
|
||||
growth := currentAlloc - startAlloc
|
||||
t.Logf("After %d failures: subTopics size=%d, heap growth=%d KB",
|
||||
i, getMapSize(), growth/1024)
|
||||
}
|
||||
}
|
||||
|
||||
finalSize := getMapSize()
|
||||
runtime.ReadMemStats(&memStats)
|
||||
finalAlloc := memStats.Alloc
|
||||
|
||||
t.Logf("Final results: %d subscription failures", failures)
|
||||
t.Logf(" subTopics map size: %d entries", finalSize)
|
||||
t.Logf(" Start heap: %d KB, Final heap: %d KB", startAlloc/1024, finalAlloc/1024)
|
||||
|
||||
// With the bug, even one stale entry is a problem because it prevents resubscription
|
||||
if finalSize > 0 {
|
||||
t.Errorf("MEMORY LEAK / STALE ENTRY: After %d failures, %d stale entries remain in subTopics map (expected 0). "+
|
||||
"Even 1 stale entry prevents resubscription, causing missed attestations in production.",
|
||||
failures, finalSize)
|
||||
}
|
||||
|
||||
// Check if heap grew significantly (handle wraparound by checking if finalAlloc >= startAlloc)
|
||||
if finalAlloc >= startAlloc {
|
||||
totalGrowth := finalAlloc - startAlloc
|
||||
if totalGrowth > 50*1024 { // 50 KB threshold
|
||||
t.Logf("NOTE: Heap grew by %d KB over %d failures. ",
|
||||
totalGrowth/1024, failures)
|
||||
}
|
||||
} else {
|
||||
t.Logf("NOTE: Heap decreased (GC ran), cannot measure growth accurately")
|
||||
}
|
||||
}
|
||||
@@ -26,22 +26,13 @@ func newSubTopicHandler() *subTopicHandler {
|
||||
func (s *subTopicHandler) addTopic(topic string, sub *pubsub.Subscription) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
// Check if this is updating a reserved entry (nil subscription)
|
||||
existingSub, exists := s.subTopics[topic]
|
||||
wasReserved := exists && existingSub == nil
|
||||
|
||||
s.subTopics[topic] = sub
|
||||
digest, err := p2p.ExtractGossipDigest(topic)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not retrieve digest")
|
||||
return
|
||||
}
|
||||
|
||||
// Only increment digest count if this is a new topic (not just updating a reservation)
|
||||
if !wasReserved {
|
||||
s.digestMap[digest] += 1
|
||||
}
|
||||
s.digestMap[digest] += 1
|
||||
}
|
||||
|
||||
func (s *subTopicHandler) topicExists(topic string) bool {
|
||||
@@ -51,57 +42,25 @@ func (s *subTopicHandler) topicExists(topic string) bool {
|
||||
return ok
|
||||
}
|
||||
|
||||
// tryReserveTopic atomically checks if a topic exists and reserves it if not.
|
||||
// Returns true if the topic was successfully reserved (didn't exist before),
|
||||
// false if the topic already exists or is reserved.
|
||||
// This prevents the race condition where multiple goroutines check topicExists()
|
||||
// simultaneously and both proceed to subscribe.
|
||||
func (s *subTopicHandler) tryReserveTopic(topic string) bool {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
// Check if topic already exists or is reserved
|
||||
if _, exists := s.subTopics[topic]; exists {
|
||||
return false
|
||||
}
|
||||
|
||||
// Reserve the topic with a nil placeholder
|
||||
// This will be updated with the actual subscription later
|
||||
s.subTopics[topic] = nil
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *subTopicHandler) removeTopic(topic string) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
// Check if topic exists and whether it was just a reservation (nil)
|
||||
existingSub, exists := s.subTopics[topic]
|
||||
if !exists {
|
||||
delete(s.subTopics, topic)
|
||||
digest, err := p2p.ExtractGossipDigest(topic)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not retrieve digest")
|
||||
return
|
||||
}
|
||||
wasReserved := existingSub == nil
|
||||
|
||||
delete(s.subTopics, topic)
|
||||
|
||||
// Only decrement digest count if this wasn't just a reservation
|
||||
if !wasReserved {
|
||||
digest, err := p2p.ExtractGossipDigest(topic)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not retrieve digest")
|
||||
return
|
||||
}
|
||||
currAmt, ok := s.digestMap[digest]
|
||||
// Should never be possible, is a
|
||||
// defensive check.
|
||||
if !ok || currAmt <= 0 {
|
||||
delete(s.digestMap, digest)
|
||||
return
|
||||
}
|
||||
s.digestMap[digest] -= 1
|
||||
if s.digestMap[digest] == 0 {
|
||||
delete(s.digestMap, digest)
|
||||
}
|
||||
currAmt, ok := s.digestMap[digest]
|
||||
// Should never be possible, is a
|
||||
// defensive check.
|
||||
if !ok || currAmt <= 0 {
|
||||
delete(s.digestMap, digest)
|
||||
return
|
||||
}
|
||||
s.digestMap[digest] -= 1
|
||||
if s.digestMap[digest] == 0 {
|
||||
delete(s.digestMap, digest)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
3
changelog/james-prysm_fix-config-parsing.md
Normal file
3
changelog/james-prysm_fix-config-parsing.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Fixed
|
||||
|
||||
- Fixed spurious `Unsupported config field kind; value forwarded verbatim` errors being logged for config fields of type string.
|
||||
2
changelog/manu-fix-#15738.md
Normal file
2
changelog/manu-fix-#15738.md
Normal file
@@ -0,0 +1,2 @@
|
||||
### Fixed
|
||||
- [#15738](https://github.com/OffchainLabs/prysm/issues/15738)
|
||||
3
changelog/pvl-gh-runners.md
Normal file
3
changelog/pvl-gh-runners.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Ignored
|
||||
|
||||
- Changed GitHub Actions runners from `ubuntu-latest` to `ubuntu-4`.
|
||||
3
changelog/pvl-strip.md
Normal file
3
changelog/pvl-strip.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Changed
|
||||
|
||||
- Bazel builds with `--config=release` now properly apply `--strip=always` to strip debug symbols from the release assets.
|
||||
Reference in New Issue
Block a user