Tests: Add require.Eventually and fix a few test flakes (#16217)

**What type of PR is this?**

Other

**What does this PR do? Why is it needed?**

This is a better way to wait for a test condition to hit, rather than
time.Sleep.

**Which issues(s) does this PR fix?**


**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
This commit is contained in:
Preston Van Loon
2026-01-06 12:20:27 -06:00
committed by GitHub
parent ffad861e2c
commit 27c009e7ff
12 changed files with 167 additions and 104 deletions

View File

@@ -17,6 +17,7 @@ import (
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
ethpbv1 "github.com/OffchainLabs/prysm/v7/proto/eth/v1"
@@ -130,12 +131,10 @@ func TestService_ReceiveBlock(t *testing.T) {
block: genFullBlock(t, util.DefaultBlockGenConfig(), 1 /*slot*/),
},
check: func(t *testing.T, s *Service) {
// Hacky sleep, should use a better way to be able to resolve the race
// between event being sent out and processed.
time.Sleep(100 * time.Millisecond)
if recvd := len(s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier).ReceivedEvents()); recvd < 1 {
t.Errorf("Received %d state notifications, expected at least 1", recvd)
}
notifier := s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier)
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) >= 1
}, 2*time.Second, 10*time.Millisecond, "Expected at least 1 state notification")
},
},
{
@@ -222,10 +221,10 @@ func TestService_ReceiveBlockUpdateHead(t *testing.T) {
require.NoError(t, s.ReceiveBlock(ctx, wsb, root, nil))
})
wg.Wait()
time.Sleep(100 * time.Millisecond)
if recvd := len(s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier).ReceivedEvents()); recvd < 1 {
t.Errorf("Received %d state notifications, expected at least 1", recvd)
}
notifier := s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier)
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) >= 1
}, 2*time.Second, 10*time.Millisecond, "Expected at least 1 state notification")
// Verify fork choice has processed the block. (Genesis block and the new block)
assert.Equal(t, 2, s.cfg.ForkChoiceStore.NodeCount())
}
@@ -265,10 +264,10 @@ func TestService_ReceiveBlockBatch(t *testing.T) {
block: genFullBlock(t, util.DefaultBlockGenConfig(), 1 /*slot*/),
},
check: func(t *testing.T, s *Service) {
time.Sleep(100 * time.Millisecond)
if recvd := len(s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier).ReceivedEvents()); recvd < 1 {
t.Errorf("Received %d state notifications, expected at least 1", recvd)
}
notifier := s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier)
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) >= 1
}, 2*time.Second, 10*time.Millisecond, "Expected at least 1 state notification")
},
},
}
@@ -512,8 +511,9 @@ func Test_executePostFinalizationTasks(t *testing.T) {
s.cfg.StateNotifier = notifier
s.executePostFinalizationTasks(s.ctx, headState)
time.Sleep(1 * time.Second) // sleep for a second because event is in a separate go routine
require.Equal(t, 1, len(notifier.ReceivedEvents()))
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) == 1
}, 5*time.Second, 50*time.Millisecond, "Expected exactly 1 state notification")
e := notifier.ReceivedEvents()[0]
assert.Equal(t, statefeed.FinalizedCheckpoint, int(e.Type))
fc, ok := e.Data.(*ethpbv1.EventFinalizedCheckpoint)
@@ -552,8 +552,9 @@ func Test_executePostFinalizationTasks(t *testing.T) {
s.cfg.StateNotifier = notifier
s.executePostFinalizationTasks(s.ctx, headState)
time.Sleep(1 * time.Second) // sleep for a second because event is in a separate go routine
require.Equal(t, 1, len(notifier.ReceivedEvents()))
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) == 1
}, 5*time.Second, 50*time.Millisecond, "Expected exactly 1 state notification")
e := notifier.ReceivedEvents()[0]
assert.Equal(t, statefeed.FinalizedCheckpoint, int(e.Type))
fc, ok := e.Data.(*ethpbv1.EventFinalizedCheckpoint)
@@ -596,13 +597,13 @@ func TestProcessLightClientBootstrap(t *testing.T) {
s.executePostFinalizationTasks(s.ctx, l.AttestedState)
// wait for the goroutine to finish processing
time.Sleep(1 * time.Second)
// Check that the light client bootstrap is saved
b, err := s.lcStore.LightClientBootstrap(ctx, [32]byte(cp.Root))
require.NoError(t, err)
require.NotNil(t, b)
// Wait for the light client bootstrap to be saved (runs in goroutine)
var b interfaces.LightClientBootstrap
require.Eventually(t, func() bool {
var err error
b, err = s.lcStore.LightClientBootstrap(ctx, [32]byte(cp.Root))
return err == nil && b != nil
}, 5*time.Second, 50*time.Millisecond, "Light client bootstrap was not saved within timeout")
btst, err := lightClient.NewLightClientBootstrapFromBeaconState(ctx, l.FinalizedState.Slot(), l.FinalizedState, l.FinalizedBlock)
require.NoError(t, err)

View File

@@ -75,7 +75,6 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
p2p := p2pTesting.NewTestP2P(t)
lcStore := NewLightClientStore(p2p, new(event.Feed), testDB.SetupDB(t))
timeForGoroutinesToFinish := 20 * time.Microsecond
// update 0 with basic data and no supermajority following an empty lastFinalityUpdate - should save and broadcast
l0 := util.NewTestLightClient(t, version.Altair)
update0, err := NewLightClientFinalityUpdateFromBeaconState(l0.Ctx, l0.State, l0.Block, l0.AttestedState, l0.AttestedBlock, l0.FinalizedBlock)
@@ -87,8 +86,9 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update0, true)
require.Equal(t, update0, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after setting a new last finality update when previous is nil")
require.Eventually(t, func() bool {
return p2p.BroadcastCalled.Load()
}, time.Second, 10*time.Millisecond, "Broadcast should have been called after setting a new last finality update when previous is nil")
p2p.BroadcastCalled.Store(false) // Reset for next test
// update 1 with same finality slot, increased attested slot, and no supermajority - should save but not broadcast
@@ -102,7 +102,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update1, true)
require.Equal(t, update1, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called after setting a new last finality update without supermajority")
p2p.BroadcastCalled.Store(false) // Reset for next test
@@ -117,8 +117,9 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update2, true)
require.Equal(t, update2, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after setting a new last finality update with supermajority")
require.Eventually(t, func() bool {
return p2p.BroadcastCalled.Load()
}, time.Second, 10*time.Millisecond, "Broadcast should have been called after setting a new last finality update with supermajority")
p2p.BroadcastCalled.Store(false) // Reset for next test
// update 3 with same finality slot, increased attested slot, and supermajority - should save but not broadcast
@@ -132,7 +133,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update3, true)
require.Equal(t, update3, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been when previous was already broadcast")
// update 4 with increased finality slot, increased attested slot, and supermajority - should save and broadcast
@@ -146,8 +147,9 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update4, true)
require.Equal(t, update4, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after a new finality update with increased finality slot")
require.Eventually(t, func() bool {
return p2p.BroadcastCalled.Load()
}, time.Second, 10*time.Millisecond, "Broadcast should have been called after a new finality update with increased finality slot")
p2p.BroadcastCalled.Store(false) // Reset for next test
// update 5 with the same new finality slot, increased attested slot, and supermajority - should save but not broadcast
@@ -161,7 +163,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update5, true)
require.Equal(t, update5, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called when previous was already broadcast with supermajority")
// update 6 with the same new finality slot, increased attested slot, and no supermajority - should save but not broadcast
@@ -175,7 +177,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update6, true)
require.Equal(t, update6, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called when previous was already broadcast with supermajority")
}

View File

@@ -72,7 +72,10 @@ func TestService_Broadcast(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
@@ -186,7 +189,10 @@ func TestService_BroadcastAttestation(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
@@ -375,7 +381,15 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
_, err = tpHandle.Subscribe()
require.NoError(t, err)
time.Sleep(500 * time.Millisecond) // libp2p fails without this delay...
// This test specifically tests discovery-based peer finding, which requires
// time for nodes to discover each other. Using a fixed sleep here is intentional
// as we're testing the discovery timing behavior.
time.Sleep(500 * time.Millisecond)
// Verify mesh establishment after discovery
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0 && len(p2.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
nodePeers := p.pubsub.ListPeers(topic)
nodePeers2 := p2.pubsub.ListPeers(topic)
@@ -444,7 +458,10 @@ func TestService_BroadcastSyncCommittee(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
@@ -521,7 +538,10 @@ func TestService_BroadcastBlob(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
@@ -584,7 +604,10 @@ func TestService_BroadcastLightClientOptimisticUpdate(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
@@ -660,7 +683,10 @@ func TestService_BroadcastLightClientFinalityUpdate(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
@@ -771,8 +797,10 @@ func TestService_BroadcastDataColumn(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
// libp2p fails without this delay
time.Sleep(50 * time.Millisecond)
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(service.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Broadcast to peers and wait.
err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar})

View File

@@ -482,12 +482,12 @@ func TestStaticPeering_PeersAreAdded(t *testing.T) {
s.Start()
<-exitRoutine
}()
time.Sleep(50 * time.Millisecond)
time.Sleep(50 * time.Millisecond) // Wait for service initialization
var vr [32]byte
require.NoError(t, cs.SetClock(startup.NewClock(time.Now(), vr)))
time.Sleep(4 * time.Second)
ps := s.host.Network().Peers()
assert.Equal(t, 5, len(ps), "Not all peers added to peerstore")
require.Eventually(t, func() bool {
return len(s.host.Network().Peers()) == 5
}, 10*time.Second, 100*time.Millisecond, "Not all peers added to peerstore")
require.NoError(t, s.Stop())
exitRoutine <- true
}

View File

@@ -80,8 +80,9 @@ func TestService_Start_OnlyStartsOnce(t *testing.T) {
}()
var vr [32]byte
require.NoError(t, cs.SetClock(startup.NewClock(time.Now(), vr)))
time.Sleep(time.Second * 2)
assert.Equal(t, true, s.started, "Expected service to be started")
require.Eventually(t, func() bool {
return s.started
}, 5*time.Second, 100*time.Millisecond, "Expected service to be started")
s.Start()
require.LogsContain(t, hook, "Attempted to start p2p service when it was already started")
require.NoError(t, s.Stop())
@@ -260,17 +261,9 @@ func TestListenForNewNodes(t *testing.T) {
err = cs.SetClock(startup.NewClock(genesisTime, gvr))
require.NoError(t, err, "Could not set clock in service")
actualPeerCount := len(s.host.Network().Peers())
for range 40 {
if actualPeerCount == peerCount {
break
}
time.Sleep(100 * time.Millisecond)
actualPeerCount = len(s.host.Network().Peers())
}
assert.Equal(t, peerCount, actualPeerCount, "Not all peers added to peerstore")
require.Eventually(t, func() bool {
return len(s.host.Network().Peers()) == peerCount
}, 5*time.Second, 100*time.Millisecond, "Not all peers added to peerstore")
err = s.Stop()
require.NoError(t, err, "Failed to stop service")

View File

@@ -657,8 +657,9 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
require.Eventually(t, func() bool {
return s.AttestationsPool.UnaggregatedAttestationCount() == 1
}, time.Second, 10*time.Millisecond, "Expected 1 attestation in pool")
})
t.Run("multiple", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
@@ -677,8 +678,9 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, 2, broadcaster.NumAttestations())
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
require.Eventually(t, func() bool {
return s.AttestationsPool.UnaggregatedAttestationCount() == 2
}, time.Second, 10*time.Millisecond, "Expected 2 attestations in pool")
})
t.Run("phase0 att post electra", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
@@ -798,8 +800,9 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
require.Eventually(t, func() bool {
return s.AttestationsPool.UnaggregatedAttestationCount() == 1
}, time.Second, 10*time.Millisecond, "Expected 1 attestation in pool")
})
t.Run("multiple", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
@@ -818,8 +821,9 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, 2, broadcaster.NumAttestations())
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
require.Eventually(t, func() bool {
return s.AttestationsPool.UnaggregatedAttestationCount() == 2
}, time.Second, 10*time.Millisecond, "Expected 2 attestations in pool")
})
t.Run("no body", func(t *testing.T) {
request := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
@@ -1375,9 +1379,9 @@ func TestSubmitSignedBLSToExecutionChanges_Ok(t *testing.T) {
writer.Body = &bytes.Buffer{}
s.SubmitBLSToExecutionChanges(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
time.Sleep(100 * time.Millisecond) // Delay to let the routine start
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, numValidators, len(broadcaster.BroadcastMessages))
require.Eventually(t, func() bool {
return broadcaster.BroadcastCalled.Load() && len(broadcaster.BroadcastMessages) == numValidators
}, time.Second, 10*time.Millisecond, "Broadcast should be called with all messages")
poolChanges, err := s.BLSChangesPool.PendingBLSToExecChanges()
require.Equal(t, len(poolChanges), len(signedChanges))
@@ -1591,10 +1595,10 @@ func TestSubmitSignedBLSToExecutionChanges_Failures(t *testing.T) {
s.SubmitBLSToExecutionChanges(writer, request)
assert.Equal(t, http.StatusBadRequest, writer.Code)
time.Sleep(10 * time.Millisecond) // Delay to allow the routine to start
require.StringContains(t, "One or more messages failed validation", writer.Body.String())
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, numValidators, len(broadcaster.BroadcastMessages)+1)
require.Eventually(t, func() bool {
return broadcaster.BroadcastCalled.Load() && len(broadcaster.BroadcastMessages)+1 == numValidators
}, time.Second, 10*time.Millisecond, "Broadcast should be called with expected messages")
poolChanges, err := s.BLSChangesPool.PendingBLSToExecChanges()
require.Equal(t, len(poolChanges)+1, len(signedChanges))

View File

@@ -70,7 +70,6 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
topic := "/eth2/%x/beacon_block"
go r.startDiscoveryAndSubscriptions()
time.Sleep(100 * time.Millisecond)
var vr [32]byte
require.NoError(t, gs.SetClock(startup.NewClock(time.Now(), vr)))
@@ -83,9 +82,11 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
msg.Block.ParentRoot = util.Random32Bytes(t)
msg.Signature = sk.Sign([]byte("data")).Marshal()
p2p.ReceivePubSub(topic, msg)
// wait for chainstart to be sent
time.Sleep(400 * time.Millisecond)
require.Equal(t, true, r.chainStarted.IsSet(), "Did not receive chain start event.")
// Wait for chainstart event to be processed
require.Eventually(t, func() bool {
return r.chainStarted.IsSet()
}, 5*time.Second, 50*time.Millisecond, "Did not receive chain start event.")
}
func TestSyncHandlers_WaitForChainStart(t *testing.T) {
@@ -217,20 +218,18 @@ func TestSyncService_StopCleanly(t *testing.T) {
p2p.Digest, err = r.currentForkDigest()
require.NoError(t, err)
// wait for chainstart to be sent
time.Sleep(2 * time.Second)
require.Equal(t, true, r.chainStarted.IsSet(), "Did not receive chain start event.")
require.NotEqual(t, 0, len(r.cfg.p2p.PubSub().GetTopics()))
require.NotEqual(t, 0, len(r.cfg.p2p.Host().Mux().Protocols()))
// Wait for chainstart and topics to be registered
require.Eventually(t, func() bool {
return r.chainStarted.IsSet() && len(r.cfg.p2p.PubSub().GetTopics()) > 0 && len(r.cfg.p2p.Host().Mux().Protocols()) > 0
}, 5*time.Second, 50*time.Millisecond, "Did not receive chain start event or topics not registered.")
// Both pubsub and rpc topics should be unsubscribed.
require.NoError(t, r.Stop())
// Sleep to allow pubsub topics to be deregistered.
time.Sleep(1 * time.Second)
require.Equal(t, 0, len(r.cfg.p2p.PubSub().GetTopics()))
require.Equal(t, 0, len(r.cfg.p2p.Host().Mux().Protocols()))
// Wait for pubsub topics to be deregistered.
require.Eventually(t, func() bool {
return len(r.cfg.p2p.PubSub().GetTopics()) == 0 && len(r.cfg.p2p.Host().Mux().Protocols()) == 0
}, 5*time.Second, 50*time.Millisecond, "Pubsub topics were not deregistered")
}
func TestService_Stop_SendsGoodbyeMessages(t *testing.T) {

View File

@@ -614,11 +614,10 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
},
}
time.Sleep(10 * time.Millisecond) // Wait for cached value to pass through buffers.
if res, err := r.validateAggregateAndProof(t.Context(), "", msg); res == pubsub.ValidationAccept {
_ = err
t.Fatal("Validated status is true")
}
require.Eventually(t, func() bool {
res, _ := r.validateAggregateAndProof(t.Context(), "", msg)
return res != pubsub.ValidationAccept
}, time.Second, 10*time.Millisecond, "Expected validation to reject duplicate aggregate")
}
func TestValidateAggregateAndProof_BadBlock(t *testing.T) {

View File

@@ -992,7 +992,6 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
// Mark the proposer/slot as seen
r.setSeenBlockIndexSlot(msg.Block.Slot, msg.Block.ProposerIndex)
time.Sleep(10 * time.Millisecond) // Wait for cached value to pass through buffers
// Prepare and validate the second message (clone)
buf := new(bytes.Buffer)
@@ -1010,9 +1009,11 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
}
// Since this is not an equivocation (same signature), it should be ignored
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
assert.NoError(t, err)
assert.Equal(t, pubsub.ValidationIgnore, res, "block with same signature should be ignored")
// Wait for the cached value to propagate through buffers
require.Eventually(t, func() bool {
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
return err == nil && res == pubsub.ValidationIgnore
}, time.Second, 10*time.Millisecond, "block with same signature should be ignored")
// Verify no slashings were created
assert.Equal(t, 0, len(slashingPool.PendingPropSlashings), "Expected no slashings for same signature")

View File

@@ -0,0 +1,3 @@
### Changed
- Replaced `time.Sleep` with `require.Eventually` polling in tests to fix flaky behavior caused by race conditions between goroutines and assertions.

View File

@@ -8,6 +8,7 @@ import (
"runtime"
"sort"
"strings"
"time"
"github.com/OffchainLabs/prysm/v7/encoding/ssz/equality"
"github.com/d4l3k/messagediff"
@@ -138,12 +139,21 @@ func StringContains(loggerFn assertionLoggerFn, expected, actual string, flag bo
// NoError asserts that error is nil.
func NoError(loggerFn assertionLoggerFn, err error, msg ...any) {
// reflect.ValueOf is needed for nil instances of custom types implementing Error
if err != nil && !reflect.ValueOf(err).IsNil() {
errMsg := parseMsg("Unexpected error", msg...)
_, file, line, _ := runtime.Caller(2)
loggerFn("%s:%d %s: %v", filepath.Base(file), line, errMsg, err)
if err == nil {
return
}
// reflect.ValueOf is needed for nil instances of custom types implementing Error.
// Only check IsNil for types that support it to avoid panics on struct types.
v := reflect.ValueOf(err)
switch v.Kind() {
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
if v.IsNil() {
return
}
}
errMsg := parseMsg("Unexpected error", msg...)
_, file, line, _ := runtime.Caller(2)
loggerFn("%s:%d %s: %v", filepath.Base(file), line, errMsg, err)
}
// ErrorIs uses Errors.Is to recursively unwrap err looking for target in the chain.
@@ -341,3 +351,18 @@ func (tb *TBMock) Errorf(format string, args ...any) {
func (tb *TBMock) Fatalf(format string, args ...any) {
tb.FatalfMsg = fmt.Sprintf(format, args...)
}
// Eventually asserts that given condition will be met within waitFor time,
// periodically checking target function each tick.
func Eventually(loggerFn assertionLoggerFn, condition func() bool, waitFor, tick time.Duration, msg ...any) {
deadline := time.Now().Add(waitFor)
for time.Now().Before(deadline) {
if condition() {
return
}
time.Sleep(tick)
}
errMsg := parseMsg("Condition never satisfied", msg...)
_, file, line, _ := runtime.Caller(2)
loggerFn("%s:%d %s (waited %v)", filepath.Base(file), line, errMsg, waitFor)
}

View File

@@ -1,6 +1,8 @@
package require
import (
"time"
"github.com/OffchainLabs/prysm/v7/testing/assertions"
"github.com/sirupsen/logrus/hooks/test"
)
@@ -87,3 +89,9 @@ func ErrorIs(tb assertions.AssertionTestingTB, err, target error, msg ...any) {
func StringContains(tb assertions.AssertionTestingTB, expected, actual string, msg ...any) {
assertions.StringContains(tb.Fatalf, expected, actual, true, msg)
}
// Eventually asserts that given condition will be met within waitFor time,
// periodically checking target function each tick.
func Eventually(tb assertions.AssertionTestingTB, condition func() bool, waitFor, tick time.Duration, msg ...any) {
assertions.Eventually(tb.Fatalf, condition, waitFor, tick, msg...)
}