From 27c009e7ff1caf170e67e5a8b4fc2b35cb25e4af Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Tue, 6 Jan 2026 12:20:27 -0600 Subject: [PATCH] Tests: Add require.Eventually and fix a few test flakes (#16217) **What type of PR is this?** Other **What does this PR do? Why is it needed?** This is a better way to wait for a test condition to hit, rather than time.Sleep. **Which issues(s) does this PR fix?** **Other notes for review** **Acknowledgements** - [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md). - [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd). - [x] I have added a description with sufficient context for reviewers to understand this PR. - [x] I have tested that my changes work as expected and I added a testing plan to the PR description (if applicable). --- beacon-chain/blockchain/receive_block_test.go | 51 ++++++++++--------- beacon-chain/light-client/store_test.go | 24 +++++---- beacon-chain/p2p/broadcaster_test.go | 46 +++++++++++++---- beacon-chain/p2p/discovery_test.go | 8 +-- beacon-chain/p2p/service_test.go | 19 +++---- .../rpc/eth/beacon/handlers_pool_test.go | 32 +++++++----- beacon-chain/sync/service_test.go | 27 +++++----- .../sync/validate_aggregate_proof_test.go | 9 ++-- .../sync/validate_beacon_blocks_test.go | 9 ++-- changelog/pvl_fix-flaky-tests-polling.md | 3 ++ testing/assertions/assertions.go | 35 +++++++++++-- testing/require/requires.go | 8 +++ 12 files changed, 167 insertions(+), 104 deletions(-) create mode 100644 changelog/pvl_fix-flaky-tests-polling.md diff --git a/beacon-chain/blockchain/receive_block_test.go b/beacon-chain/blockchain/receive_block_test.go index 95577054f2..dbbff19240 100644 --- a/beacon-chain/blockchain/receive_block_test.go +++ b/beacon-chain/blockchain/receive_block_test.go @@ -17,6 +17,7 @@ import ( fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams" "github.com/OffchainLabs/prysm/v7/config/params" "github.com/OffchainLabs/prysm/v7/consensus-types/blocks" + "github.com/OffchainLabs/prysm/v7/consensus-types/interfaces" "github.com/OffchainLabs/prysm/v7/consensus-types/primitives" "github.com/OffchainLabs/prysm/v7/encoding/bytesutil" ethpbv1 "github.com/OffchainLabs/prysm/v7/proto/eth/v1" @@ -130,12 +131,10 @@ func TestService_ReceiveBlock(t *testing.T) { block: genFullBlock(t, util.DefaultBlockGenConfig(), 1 /*slot*/), }, check: func(t *testing.T, s *Service) { - // Hacky sleep, should use a better way to be able to resolve the race - // between event being sent out and processed. - time.Sleep(100 * time.Millisecond) - if recvd := len(s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier).ReceivedEvents()); recvd < 1 { - t.Errorf("Received %d state notifications, expected at least 1", recvd) - } + notifier := s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier) + require.Eventually(t, func() bool { + return len(notifier.ReceivedEvents()) >= 1 + }, 2*time.Second, 10*time.Millisecond, "Expected at least 1 state notification") }, }, { @@ -222,10 +221,10 @@ func TestService_ReceiveBlockUpdateHead(t *testing.T) { require.NoError(t, s.ReceiveBlock(ctx, wsb, root, nil)) }) wg.Wait() - time.Sleep(100 * time.Millisecond) - if recvd := len(s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier).ReceivedEvents()); recvd < 1 { - t.Errorf("Received %d state notifications, expected at least 1", recvd) - } + notifier := s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier) + require.Eventually(t, func() bool { + return len(notifier.ReceivedEvents()) >= 1 + }, 2*time.Second, 10*time.Millisecond, "Expected at least 1 state notification") // Verify fork choice has processed the block. (Genesis block and the new block) assert.Equal(t, 2, s.cfg.ForkChoiceStore.NodeCount()) } @@ -265,10 +264,10 @@ func TestService_ReceiveBlockBatch(t *testing.T) { block: genFullBlock(t, util.DefaultBlockGenConfig(), 1 /*slot*/), }, check: func(t *testing.T, s *Service) { - time.Sleep(100 * time.Millisecond) - if recvd := len(s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier).ReceivedEvents()); recvd < 1 { - t.Errorf("Received %d state notifications, expected at least 1", recvd) - } + notifier := s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier) + require.Eventually(t, func() bool { + return len(notifier.ReceivedEvents()) >= 1 + }, 2*time.Second, 10*time.Millisecond, "Expected at least 1 state notification") }, }, } @@ -512,8 +511,9 @@ func Test_executePostFinalizationTasks(t *testing.T) { s.cfg.StateNotifier = notifier s.executePostFinalizationTasks(s.ctx, headState) - time.Sleep(1 * time.Second) // sleep for a second because event is in a separate go routine - require.Equal(t, 1, len(notifier.ReceivedEvents())) + require.Eventually(t, func() bool { + return len(notifier.ReceivedEvents()) == 1 + }, 5*time.Second, 50*time.Millisecond, "Expected exactly 1 state notification") e := notifier.ReceivedEvents()[0] assert.Equal(t, statefeed.FinalizedCheckpoint, int(e.Type)) fc, ok := e.Data.(*ethpbv1.EventFinalizedCheckpoint) @@ -552,8 +552,9 @@ func Test_executePostFinalizationTasks(t *testing.T) { s.cfg.StateNotifier = notifier s.executePostFinalizationTasks(s.ctx, headState) - time.Sleep(1 * time.Second) // sleep for a second because event is in a separate go routine - require.Equal(t, 1, len(notifier.ReceivedEvents())) + require.Eventually(t, func() bool { + return len(notifier.ReceivedEvents()) == 1 + }, 5*time.Second, 50*time.Millisecond, "Expected exactly 1 state notification") e := notifier.ReceivedEvents()[0] assert.Equal(t, statefeed.FinalizedCheckpoint, int(e.Type)) fc, ok := e.Data.(*ethpbv1.EventFinalizedCheckpoint) @@ -596,13 +597,13 @@ func TestProcessLightClientBootstrap(t *testing.T) { s.executePostFinalizationTasks(s.ctx, l.AttestedState) - // wait for the goroutine to finish processing - time.Sleep(1 * time.Second) - - // Check that the light client bootstrap is saved - b, err := s.lcStore.LightClientBootstrap(ctx, [32]byte(cp.Root)) - require.NoError(t, err) - require.NotNil(t, b) + // Wait for the light client bootstrap to be saved (runs in goroutine) + var b interfaces.LightClientBootstrap + require.Eventually(t, func() bool { + var err error + b, err = s.lcStore.LightClientBootstrap(ctx, [32]byte(cp.Root)) + return err == nil && b != nil + }, 5*time.Second, 50*time.Millisecond, "Light client bootstrap was not saved within timeout") btst, err := lightClient.NewLightClientBootstrapFromBeaconState(ctx, l.FinalizedState.Slot(), l.FinalizedState, l.FinalizedBlock) require.NoError(t, err) diff --git a/beacon-chain/light-client/store_test.go b/beacon-chain/light-client/store_test.go index 2f399ecaa6..10b5e44900 100644 --- a/beacon-chain/light-client/store_test.go +++ b/beacon-chain/light-client/store_test.go @@ -75,7 +75,6 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) { p2p := p2pTesting.NewTestP2P(t) lcStore := NewLightClientStore(p2p, new(event.Feed), testDB.SetupDB(t)) - timeForGoroutinesToFinish := 20 * time.Microsecond // update 0 with basic data and no supermajority following an empty lastFinalityUpdate - should save and broadcast l0 := util.NewTestLightClient(t, version.Altair) update0, err := NewLightClientFinalityUpdateFromBeaconState(l0.Ctx, l0.State, l0.Block, l0.AttestedState, l0.AttestedBlock, l0.FinalizedBlock) @@ -87,8 +86,9 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) { lcStore.SetLastFinalityUpdate(update0, true) require.Equal(t, update0, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value") - time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish - require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after setting a new last finality update when previous is nil") + require.Eventually(t, func() bool { + return p2p.BroadcastCalled.Load() + }, time.Second, 10*time.Millisecond, "Broadcast should have been called after setting a new last finality update when previous is nil") p2p.BroadcastCalled.Store(false) // Reset for next test // update 1 with same finality slot, increased attested slot, and no supermajority - should save but not broadcast @@ -102,7 +102,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) { lcStore.SetLastFinalityUpdate(update1, true) require.Equal(t, update1, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value") - time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish + time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called after setting a new last finality update without supermajority") p2p.BroadcastCalled.Store(false) // Reset for next test @@ -117,8 +117,9 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) { lcStore.SetLastFinalityUpdate(update2, true) require.Equal(t, update2, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value") - time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish - require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after setting a new last finality update with supermajority") + require.Eventually(t, func() bool { + return p2p.BroadcastCalled.Load() + }, time.Second, 10*time.Millisecond, "Broadcast should have been called after setting a new last finality update with supermajority") p2p.BroadcastCalled.Store(false) // Reset for next test // update 3 with same finality slot, increased attested slot, and supermajority - should save but not broadcast @@ -132,7 +133,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) { lcStore.SetLastFinalityUpdate(update3, true) require.Equal(t, update3, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value") - time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish + time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been when previous was already broadcast") // update 4 with increased finality slot, increased attested slot, and supermajority - should save and broadcast @@ -146,8 +147,9 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) { lcStore.SetLastFinalityUpdate(update4, true) require.Equal(t, update4, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value") - time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish - require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after a new finality update with increased finality slot") + require.Eventually(t, func() bool { + return p2p.BroadcastCalled.Load() + }, time.Second, 10*time.Millisecond, "Broadcast should have been called after a new finality update with increased finality slot") p2p.BroadcastCalled.Store(false) // Reset for next test // update 5 with the same new finality slot, increased attested slot, and supermajority - should save but not broadcast @@ -161,7 +163,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) { lcStore.SetLastFinalityUpdate(update5, true) require.Equal(t, update5, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value") - time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish + time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called when previous was already broadcast with supermajority") // update 6 with the same new finality slot, increased attested slot, and no supermajority - should save but not broadcast @@ -175,7 +177,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) { lcStore.SetLastFinalityUpdate(update6, true) require.Equal(t, update6, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value") - time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish + time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called when previous was already broadcast with supermajority") } diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go index e02e067438..e001e5da52 100644 --- a/beacon-chain/p2p/broadcaster_test.go +++ b/beacon-chain/p2p/broadcaster_test.go @@ -72,7 +72,10 @@ func TestService_Broadcast(t *testing.T) { sub, err := p2.SubscribeToTopic(topic) require.NoError(t, err) - time.Sleep(50 * time.Millisecond) // libp2p fails without this delay... + // Wait for libp2p mesh to establish + require.Eventually(t, func() bool { + return len(p.pubsub.ListPeers(topic)) > 0 + }, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish") // Async listen for the pubsub, must be before the broadcast. var wg sync.WaitGroup @@ -186,7 +189,10 @@ func TestService_BroadcastAttestation(t *testing.T) { sub, err := p2.SubscribeToTopic(topic) require.NoError(t, err) - time.Sleep(50 * time.Millisecond) // libp2p fails without this delay... + // Wait for libp2p mesh to establish + require.Eventually(t, func() bool { + return len(p.pubsub.ListPeers(topic)) > 0 + }, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish") // Async listen for the pubsub, must be before the broadcast. var wg sync.WaitGroup @@ -375,7 +381,15 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) { _, err = tpHandle.Subscribe() require.NoError(t, err) - time.Sleep(500 * time.Millisecond) // libp2p fails without this delay... + // This test specifically tests discovery-based peer finding, which requires + // time for nodes to discover each other. Using a fixed sleep here is intentional + // as we're testing the discovery timing behavior. + time.Sleep(500 * time.Millisecond) + + // Verify mesh establishment after discovery + require.Eventually(t, func() bool { + return len(p.pubsub.ListPeers(topic)) > 0 && len(p2.pubsub.ListPeers(topic)) > 0 + }, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish") nodePeers := p.pubsub.ListPeers(topic) nodePeers2 := p2.pubsub.ListPeers(topic) @@ -444,7 +458,10 @@ func TestService_BroadcastSyncCommittee(t *testing.T) { sub, err := p2.SubscribeToTopic(topic) require.NoError(t, err) - time.Sleep(50 * time.Millisecond) // libp2p fails without this delay... + // Wait for libp2p mesh to establish + require.Eventually(t, func() bool { + return len(p.pubsub.ListPeers(topic)) > 0 + }, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish") // Async listen for the pubsub, must be before the broadcast. var wg sync.WaitGroup @@ -521,7 +538,10 @@ func TestService_BroadcastBlob(t *testing.T) { sub, err := p2.SubscribeToTopic(topic) require.NoError(t, err) - time.Sleep(50 * time.Millisecond) // libp2p fails without this delay... + // Wait for libp2p mesh to establish + require.Eventually(t, func() bool { + return len(p.pubsub.ListPeers(topic)) > 0 + }, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish") // Async listen for the pubsub, must be before the broadcast. var wg sync.WaitGroup @@ -584,7 +604,10 @@ func TestService_BroadcastLightClientOptimisticUpdate(t *testing.T) { sub, err := p2.SubscribeToTopic(topic) require.NoError(t, err) - time.Sleep(50 * time.Millisecond) // libp2p fails without this delay... + // Wait for libp2p mesh to establish + require.Eventually(t, func() bool { + return len(p.pubsub.ListPeers(topic)) > 0 + }, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish") // Async listen for the pubsub, must be before the broadcast. var wg sync.WaitGroup @@ -660,7 +683,10 @@ func TestService_BroadcastLightClientFinalityUpdate(t *testing.T) { sub, err := p2.SubscribeToTopic(topic) require.NoError(t, err) - time.Sleep(50 * time.Millisecond) // libp2p fails without this delay... + // Wait for libp2p mesh to establish + require.Eventually(t, func() bool { + return len(p.pubsub.ListPeers(topic)) > 0 + }, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish") // Async listen for the pubsub, must be before the broadcast. var wg sync.WaitGroup @@ -771,8 +797,10 @@ func TestService_BroadcastDataColumn(t *testing.T) { sub, err := p2.SubscribeToTopic(topic) require.NoError(t, err) - // libp2p fails without this delay - time.Sleep(50 * time.Millisecond) + // Wait for libp2p mesh to establish + require.Eventually(t, func() bool { + return len(service.pubsub.ListPeers(topic)) > 0 + }, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish") // Broadcast to peers and wait. err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar}) diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index a18a649618..3e9bea5ee8 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -482,12 +482,12 @@ func TestStaticPeering_PeersAreAdded(t *testing.T) { s.Start() <-exitRoutine }() - time.Sleep(50 * time.Millisecond) + time.Sleep(50 * time.Millisecond) // Wait for service initialization var vr [32]byte require.NoError(t, cs.SetClock(startup.NewClock(time.Now(), vr))) - time.Sleep(4 * time.Second) - ps := s.host.Network().Peers() - assert.Equal(t, 5, len(ps), "Not all peers added to peerstore") + require.Eventually(t, func() bool { + return len(s.host.Network().Peers()) == 5 + }, 10*time.Second, 100*time.Millisecond, "Not all peers added to peerstore") require.NoError(t, s.Stop()) exitRoutine <- true } diff --git a/beacon-chain/p2p/service_test.go b/beacon-chain/p2p/service_test.go index c6d68dd508..0a89fc97a9 100644 --- a/beacon-chain/p2p/service_test.go +++ b/beacon-chain/p2p/service_test.go @@ -80,8 +80,9 @@ func TestService_Start_OnlyStartsOnce(t *testing.T) { }() var vr [32]byte require.NoError(t, cs.SetClock(startup.NewClock(time.Now(), vr))) - time.Sleep(time.Second * 2) - assert.Equal(t, true, s.started, "Expected service to be started") + require.Eventually(t, func() bool { + return s.started + }, 5*time.Second, 100*time.Millisecond, "Expected service to be started") s.Start() require.LogsContain(t, hook, "Attempted to start p2p service when it was already started") require.NoError(t, s.Stop()) @@ -260,17 +261,9 @@ func TestListenForNewNodes(t *testing.T) { err = cs.SetClock(startup.NewClock(genesisTime, gvr)) require.NoError(t, err, "Could not set clock in service") - actualPeerCount := len(s.host.Network().Peers()) - for range 40 { - if actualPeerCount == peerCount { - break - } - - time.Sleep(100 * time.Millisecond) - actualPeerCount = len(s.host.Network().Peers()) - } - - assert.Equal(t, peerCount, actualPeerCount, "Not all peers added to peerstore") + require.Eventually(t, func() bool { + return len(s.host.Network().Peers()) == peerCount + }, 5*time.Second, 100*time.Millisecond, "Not all peers added to peerstore") err = s.Stop() require.NoError(t, err, "Failed to stop service") diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool_test.go b/beacon-chain/rpc/eth/beacon/handlers_pool_test.go index 0601428371..c49a69eb51 100644 --- a/beacon-chain/rpc/eth/beacon/handlers_pool_test.go +++ b/beacon-chain/rpc/eth/beacon/handlers_pool_test.go @@ -657,8 +657,9 @@ func TestSubmitAttestationsV2(t *testing.T) { assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch) assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root)) assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch) - time.Sleep(100 * time.Millisecond) // Wait for async pool save - assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount()) + require.Eventually(t, func() bool { + return s.AttestationsPool.UnaggregatedAttestationCount() == 1 + }, time.Second, 10*time.Millisecond, "Expected 1 attestation in pool") }) t.Run("multiple", func(t *testing.T) { broadcaster := &p2pMock.MockBroadcaster{} @@ -677,8 +678,9 @@ func TestSubmitAttestationsV2(t *testing.T) { assert.Equal(t, http.StatusOK, writer.Code) assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) assert.Equal(t, 2, broadcaster.NumAttestations()) - time.Sleep(100 * time.Millisecond) // Wait for async pool save - assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount()) + require.Eventually(t, func() bool { + return s.AttestationsPool.UnaggregatedAttestationCount() == 2 + }, time.Second, 10*time.Millisecond, "Expected 2 attestations in pool") }) t.Run("phase0 att post electra", func(t *testing.T) { params.SetupTestConfigCleanup(t) @@ -798,8 +800,9 @@ func TestSubmitAttestationsV2(t *testing.T) { assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch) assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root)) assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch) - time.Sleep(100 * time.Millisecond) // Wait for async pool save - assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount()) + require.Eventually(t, func() bool { + return s.AttestationsPool.UnaggregatedAttestationCount() == 1 + }, time.Second, 10*time.Millisecond, "Expected 1 attestation in pool") }) t.Run("multiple", func(t *testing.T) { broadcaster := &p2pMock.MockBroadcaster{} @@ -818,8 +821,9 @@ func TestSubmitAttestationsV2(t *testing.T) { assert.Equal(t, http.StatusOK, writer.Code) assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) assert.Equal(t, 2, broadcaster.NumAttestations()) - time.Sleep(100 * time.Millisecond) // Wait for async pool save - assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount()) + require.Eventually(t, func() bool { + return s.AttestationsPool.UnaggregatedAttestationCount() == 2 + }, time.Second, 10*time.Millisecond, "Expected 2 attestations in pool") }) t.Run("no body", func(t *testing.T) { request := httptest.NewRequest(http.MethodPost, "http://example.com", nil) @@ -1375,9 +1379,9 @@ func TestSubmitSignedBLSToExecutionChanges_Ok(t *testing.T) { writer.Body = &bytes.Buffer{} s.SubmitBLSToExecutionChanges(writer, request) assert.Equal(t, http.StatusOK, writer.Code) - time.Sleep(100 * time.Millisecond) // Delay to let the routine start - assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) - assert.Equal(t, numValidators, len(broadcaster.BroadcastMessages)) + require.Eventually(t, func() bool { + return broadcaster.BroadcastCalled.Load() && len(broadcaster.BroadcastMessages) == numValidators + }, time.Second, 10*time.Millisecond, "Broadcast should be called with all messages") poolChanges, err := s.BLSChangesPool.PendingBLSToExecChanges() require.Equal(t, len(poolChanges), len(signedChanges)) @@ -1591,10 +1595,10 @@ func TestSubmitSignedBLSToExecutionChanges_Failures(t *testing.T) { s.SubmitBLSToExecutionChanges(writer, request) assert.Equal(t, http.StatusBadRequest, writer.Code) - time.Sleep(10 * time.Millisecond) // Delay to allow the routine to start require.StringContains(t, "One or more messages failed validation", writer.Body.String()) - assert.Equal(t, true, broadcaster.BroadcastCalled.Load()) - assert.Equal(t, numValidators, len(broadcaster.BroadcastMessages)+1) + require.Eventually(t, func() bool { + return broadcaster.BroadcastCalled.Load() && len(broadcaster.BroadcastMessages)+1 == numValidators + }, time.Second, 10*time.Millisecond, "Broadcast should be called with expected messages") poolChanges, err := s.BLSChangesPool.PendingBLSToExecChanges() require.Equal(t, len(poolChanges)+1, len(signedChanges)) diff --git a/beacon-chain/sync/service_test.go b/beacon-chain/sync/service_test.go index fefc0dd7f2..f3b8c7255c 100644 --- a/beacon-chain/sync/service_test.go +++ b/beacon-chain/sync/service_test.go @@ -70,7 +70,6 @@ func TestSyncHandlers_WaitToSync(t *testing.T) { topic := "/eth2/%x/beacon_block" go r.startDiscoveryAndSubscriptions() - time.Sleep(100 * time.Millisecond) var vr [32]byte require.NoError(t, gs.SetClock(startup.NewClock(time.Now(), vr))) @@ -83,9 +82,11 @@ func TestSyncHandlers_WaitToSync(t *testing.T) { msg.Block.ParentRoot = util.Random32Bytes(t) msg.Signature = sk.Sign([]byte("data")).Marshal() p2p.ReceivePubSub(topic, msg) - // wait for chainstart to be sent - time.Sleep(400 * time.Millisecond) - require.Equal(t, true, r.chainStarted.IsSet(), "Did not receive chain start event.") + + // Wait for chainstart event to be processed + require.Eventually(t, func() bool { + return r.chainStarted.IsSet() + }, 5*time.Second, 50*time.Millisecond, "Did not receive chain start event.") } func TestSyncHandlers_WaitForChainStart(t *testing.T) { @@ -217,20 +218,18 @@ func TestSyncService_StopCleanly(t *testing.T) { p2p.Digest, err = r.currentForkDigest() require.NoError(t, err) - // wait for chainstart to be sent - time.Sleep(2 * time.Second) - require.Equal(t, true, r.chainStarted.IsSet(), "Did not receive chain start event.") - - require.NotEqual(t, 0, len(r.cfg.p2p.PubSub().GetTopics())) - require.NotEqual(t, 0, len(r.cfg.p2p.Host().Mux().Protocols())) + // Wait for chainstart and topics to be registered + require.Eventually(t, func() bool { + return r.chainStarted.IsSet() && len(r.cfg.p2p.PubSub().GetTopics()) > 0 && len(r.cfg.p2p.Host().Mux().Protocols()) > 0 + }, 5*time.Second, 50*time.Millisecond, "Did not receive chain start event or topics not registered.") // Both pubsub and rpc topics should be unsubscribed. require.NoError(t, r.Stop()) - // Sleep to allow pubsub topics to be deregistered. - time.Sleep(1 * time.Second) - require.Equal(t, 0, len(r.cfg.p2p.PubSub().GetTopics())) - require.Equal(t, 0, len(r.cfg.p2p.Host().Mux().Protocols())) + // Wait for pubsub topics to be deregistered. + require.Eventually(t, func() bool { + return len(r.cfg.p2p.PubSub().GetTopics()) == 0 && len(r.cfg.p2p.Host().Mux().Protocols()) == 0 + }, 5*time.Second, 50*time.Millisecond, "Pubsub topics were not deregistered") } func TestService_Stop_SendsGoodbyeMessages(t *testing.T) { diff --git a/beacon-chain/sync/validate_aggregate_proof_test.go b/beacon-chain/sync/validate_aggregate_proof_test.go index f501efaa25..e52dc289d8 100644 --- a/beacon-chain/sync/validate_aggregate_proof_test.go +++ b/beacon-chain/sync/validate_aggregate_proof_test.go @@ -614,11 +614,10 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) { }, } - time.Sleep(10 * time.Millisecond) // Wait for cached value to pass through buffers. - if res, err := r.validateAggregateAndProof(t.Context(), "", msg); res == pubsub.ValidationAccept { - _ = err - t.Fatal("Validated status is true") - } + require.Eventually(t, func() bool { + res, _ := r.validateAggregateAndProof(t.Context(), "", msg) + return res != pubsub.ValidationAccept + }, time.Second, 10*time.Millisecond, "Expected validation to reject duplicate aggregate") } func TestValidateAggregateAndProof_BadBlock(t *testing.T) { diff --git a/beacon-chain/sync/validate_beacon_blocks_test.go b/beacon-chain/sync/validate_beacon_blocks_test.go index d56b2dfc73..aa9deeb290 100644 --- a/beacon-chain/sync/validate_beacon_blocks_test.go +++ b/beacon-chain/sync/validate_beacon_blocks_test.go @@ -992,7 +992,6 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) { // Mark the proposer/slot as seen r.setSeenBlockIndexSlot(msg.Block.Slot, msg.Block.ProposerIndex) - time.Sleep(10 * time.Millisecond) // Wait for cached value to pass through buffers // Prepare and validate the second message (clone) buf := new(bytes.Buffer) @@ -1010,9 +1009,11 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) { } // Since this is not an equivocation (same signature), it should be ignored - res, err := r.validateBeaconBlockPubSub(ctx, "", m) - assert.NoError(t, err) - assert.Equal(t, pubsub.ValidationIgnore, res, "block with same signature should be ignored") + // Wait for the cached value to propagate through buffers + require.Eventually(t, func() bool { + res, err := r.validateBeaconBlockPubSub(ctx, "", m) + return err == nil && res == pubsub.ValidationIgnore + }, time.Second, 10*time.Millisecond, "block with same signature should be ignored") // Verify no slashings were created assert.Equal(t, 0, len(slashingPool.PendingPropSlashings), "Expected no slashings for same signature") diff --git a/changelog/pvl_fix-flaky-tests-polling.md b/changelog/pvl_fix-flaky-tests-polling.md new file mode 100644 index 0000000000..82b97b1981 --- /dev/null +++ b/changelog/pvl_fix-flaky-tests-polling.md @@ -0,0 +1,3 @@ +### Changed + +- Replaced `time.Sleep` with `require.Eventually` polling in tests to fix flaky behavior caused by race conditions between goroutines and assertions. diff --git a/testing/assertions/assertions.go b/testing/assertions/assertions.go index 27a2570e3f..4dbf46d45e 100644 --- a/testing/assertions/assertions.go +++ b/testing/assertions/assertions.go @@ -8,6 +8,7 @@ import ( "runtime" "sort" "strings" + "time" "github.com/OffchainLabs/prysm/v7/encoding/ssz/equality" "github.com/d4l3k/messagediff" @@ -138,12 +139,21 @@ func StringContains(loggerFn assertionLoggerFn, expected, actual string, flag bo // NoError asserts that error is nil. func NoError(loggerFn assertionLoggerFn, err error, msg ...any) { - // reflect.ValueOf is needed for nil instances of custom types implementing Error - if err != nil && !reflect.ValueOf(err).IsNil() { - errMsg := parseMsg("Unexpected error", msg...) - _, file, line, _ := runtime.Caller(2) - loggerFn("%s:%d %s: %v", filepath.Base(file), line, errMsg, err) + if err == nil { + return } + // reflect.ValueOf is needed for nil instances of custom types implementing Error. + // Only check IsNil for types that support it to avoid panics on struct types. + v := reflect.ValueOf(err) + switch v.Kind() { + case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer: + if v.IsNil() { + return + } + } + errMsg := parseMsg("Unexpected error", msg...) + _, file, line, _ := runtime.Caller(2) + loggerFn("%s:%d %s: %v", filepath.Base(file), line, errMsg, err) } // ErrorIs uses Errors.Is to recursively unwrap err looking for target in the chain. @@ -341,3 +351,18 @@ func (tb *TBMock) Errorf(format string, args ...any) { func (tb *TBMock) Fatalf(format string, args ...any) { tb.FatalfMsg = fmt.Sprintf(format, args...) } + +// Eventually asserts that given condition will be met within waitFor time, +// periodically checking target function each tick. +func Eventually(loggerFn assertionLoggerFn, condition func() bool, waitFor, tick time.Duration, msg ...any) { + deadline := time.Now().Add(waitFor) + for time.Now().Before(deadline) { + if condition() { + return + } + time.Sleep(tick) + } + errMsg := parseMsg("Condition never satisfied", msg...) + _, file, line, _ := runtime.Caller(2) + loggerFn("%s:%d %s (waited %v)", filepath.Base(file), line, errMsg, waitFor) +} diff --git a/testing/require/requires.go b/testing/require/requires.go index a4f5caa2d3..6620670a97 100644 --- a/testing/require/requires.go +++ b/testing/require/requires.go @@ -1,6 +1,8 @@ package require import ( + "time" + "github.com/OffchainLabs/prysm/v7/testing/assertions" "github.com/sirupsen/logrus/hooks/test" ) @@ -87,3 +89,9 @@ func ErrorIs(tb assertions.AssertionTestingTB, err, target error, msg ...any) { func StringContains(tb assertions.AssertionTestingTB, expected, actual string, msg ...any) { assertions.StringContains(tb.Fatalf, expected, actual, true, msg) } + +// Eventually asserts that given condition will be met within waitFor time, +// periodically checking target function each tick. +func Eventually(tb assertions.AssertionTestingTB, condition func() bool, waitFor, tick time.Duration, msg ...any) { + assertions.Eventually(tb.Fatalf, condition, waitFor, tick, msg...) +}