Compare commits

...

13 Commits

Author SHA1 Message Date
james-prysm
93b2445d83 another refactor for readability 2026-01-06 20:45:13 -06:00
james-prysm
fe5d67496a refactoring code for readability 2026-01-06 20:30:29 -06:00
james-prysm
a7eb4add27 more cleanup fixing edge cases for checkpoint sync vs restart with head 2026-01-06 19:31:30 -06:00
james-prysm
0f0fced955 handle non checkpoint case on restart with increased custody count 2026-01-06 15:58:44 -06:00
james-prysm
56477f944f Merge branch 'develop' into fix-backward-earliest-slot 2026-01-06 12:22:49 -08:00
james-prysm
1a6252ade4 changing isHealthy to isReady (#16167)

**What type of PR is this?**

 Bug fix

**What does this PR do? Why is it needed?**

Validator fallbacks shouldn't use nodes that are still syncing, since many of the tasks validators perform require the node to be fully synced. The readiness check interprets the node's health status codes as follows (see the sketch after this list):

- 206 (or any other non-200 code) is interpreted as "not ready"
- 200 is interpreted as "ready"
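
A minimal standalone sketch of this interpretation, using the standard beacon API health endpoint (the path and client wiring here are illustrative, not Prysm's fallback implementation):

```go
// Sketch only: treat HTTP 200 as "ready"; 206 (syncing) or anything else as "not ready".
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// isReady reports whether the beacon node answered 200 OK on its health endpoint.
func isReady(ctx context.Context, baseURL string) (bool, error) {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL+"/eth/v1/node/health", nil)
	if err != nil {
		return false, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	// 200 => ready; 206 (syncing) or any other code => not ready.
	return resp.StatusCode == http.StatusOK, nil
}

func main() {
	ready, err := isReady(context.Background(), "http://localhost:3500")
	fmt.Println("node ready:", ready, "err:", err)
}
```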

**Which issue(s) does this PR fix?**
 
continuation of https://github.com/OffchainLabs/prysm/pull/15401

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-01-06 18:58:12 +00:00
Preston Van Loon
27c009e7ff Tests: Add require.Eventually and fix a few test flakes (#16217)
**What type of PR is this?**

Other

**What does this PR do? Why is it needed?**

`require.Eventually` polls until a test condition is met, which is a more reliable way to wait than a fixed `time.Sleep`. A standalone sketch of the pattern follows below.
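
A minimal standalone sketch of the pattern (using testify's `require`, which matches the signature seen in the diffs below; not code from this PR):

```go
// Sketch only: poll for an asynchronous condition instead of sleeping a fixed duration.
package example

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestWaitForAsyncWork(t *testing.T) {
	var done atomic.Bool
	go func() {
		time.Sleep(20 * time.Millisecond) // simulated async work
		done.Store(true)
	}()

	// Instead of a fixed time.Sleep followed by a one-shot assertion,
	// poll the condition until it holds or the timeout expires.
	require.Eventually(t, func() bool {
		return done.Load()
	}, 2*time.Second, 10*time.Millisecond, "expected async work to complete")
}
```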

**Which issue(s) does this PR fix?**


**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-01-06 18:20:27 +00:00
Jonny Rhea
ffad861e2c WithMaxExportBatchSize is specified twice (#16211)

**What type of PR is this?**

> Bug fix


**What does this PR do? Why is it needed?**

It's a simple fix: while looking at how Prysm uses OpenTelemetry, I noticed that `WithMaxExportBatchSize` was specified twice in the tracing setup, so the duplicate option is removed (see the sketch below).
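
A hypothetical illustration of the duplication, using the OpenTelemetry Go SDK's batch span processor options (not Prysm's exact call site):

```go
// Sketch only: passing WithMaxExportBatchSize twice is redundant; the later value
// overrides the earlier one, so the fix keeps a single occurrence.
package example

import (
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func newSpanProcessor(exp sdktrace.SpanExporter) sdktrace.SpanProcessor {
	// Before (duplicated option):
	//   sdktrace.NewBatchSpanProcessor(exp,
	//       sdktrace.WithMaxExportBatchSize(512),
	//       sdktrace.WithMaxExportBatchSize(512))

	// After (specified once):
	return sdktrace.NewBatchSpanProcessor(exp,
		sdktrace.WithMaxExportBatchSize(512),
	)
}
```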

**Which issue(s) does this PR fix?**

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-01-06 16:22:20 +00:00
james-prysm
5069057354 Update beacon-chain/db/kv/custody.go
Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
2026-01-06 09:33:47 -06:00
james-prysm
30340f632c Merge branch 'develop' into fix-backward-earliest-slot 2025-12-17 08:18:16 -08:00
james-prysm
6bfbbdbd1b fixing unit tests and updating changelog 2025-12-08 21:55:34 -06:00
james-prysm
d30c1cbb8a optimization for earliest slot 2025-12-08 16:57:53 -06:00
james-prysm
e9c7a62b97 attempt to fix earliest slot going backwards 2025-12-08 16:51:46 -06:00
30 changed files with 851 additions and 269 deletions

View File

@@ -17,6 +17,7 @@ import (
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
ethpbv1 "github.com/OffchainLabs/prysm/v7/proto/eth/v1"
@@ -130,12 +131,10 @@ func TestService_ReceiveBlock(t *testing.T) {
block: genFullBlock(t, util.DefaultBlockGenConfig(), 1 /*slot*/),
},
check: func(t *testing.T, s *Service) {
// Hacky sleep, should use a better way to be able to resolve the race
// between event being sent out and processed.
time.Sleep(100 * time.Millisecond)
if recvd := len(s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier).ReceivedEvents()); recvd < 1 {
t.Errorf("Received %d state notifications, expected at least 1", recvd)
}
notifier := s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier)
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) >= 1
}, 2*time.Second, 10*time.Millisecond, "Expected at least 1 state notification")
},
},
{
@@ -222,10 +221,10 @@ func TestService_ReceiveBlockUpdateHead(t *testing.T) {
require.NoError(t, s.ReceiveBlock(ctx, wsb, root, nil))
})
wg.Wait()
time.Sleep(100 * time.Millisecond)
if recvd := len(s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier).ReceivedEvents()); recvd < 1 {
t.Errorf("Received %d state notifications, expected at least 1", recvd)
}
notifier := s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier)
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) >= 1
}, 2*time.Second, 10*time.Millisecond, "Expected at least 1 state notification")
// Verify fork choice has processed the block. (Genesis block and the new block)
assert.Equal(t, 2, s.cfg.ForkChoiceStore.NodeCount())
}
@@ -265,10 +264,10 @@ func TestService_ReceiveBlockBatch(t *testing.T) {
block: genFullBlock(t, util.DefaultBlockGenConfig(), 1 /*slot*/),
},
check: func(t *testing.T, s *Service) {
time.Sleep(100 * time.Millisecond)
if recvd := len(s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier).ReceivedEvents()); recvd < 1 {
t.Errorf("Received %d state notifications, expected at least 1", recvd)
}
notifier := s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier)
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) >= 1
}, 2*time.Second, 10*time.Millisecond, "Expected at least 1 state notification")
},
},
}
@@ -512,8 +511,9 @@ func Test_executePostFinalizationTasks(t *testing.T) {
s.cfg.StateNotifier = notifier
s.executePostFinalizationTasks(s.ctx, headState)
time.Sleep(1 * time.Second) // sleep for a second because event is in a separate go routine
require.Equal(t, 1, len(notifier.ReceivedEvents()))
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) == 1
}, 5*time.Second, 50*time.Millisecond, "Expected exactly 1 state notification")
e := notifier.ReceivedEvents()[0]
assert.Equal(t, statefeed.FinalizedCheckpoint, int(e.Type))
fc, ok := e.Data.(*ethpbv1.EventFinalizedCheckpoint)
@@ -552,8 +552,9 @@ func Test_executePostFinalizationTasks(t *testing.T) {
s.cfg.StateNotifier = notifier
s.executePostFinalizationTasks(s.ctx, headState)
time.Sleep(1 * time.Second) // sleep for a second because event is in a separate go routine
require.Equal(t, 1, len(notifier.ReceivedEvents()))
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) == 1
}, 5*time.Second, 50*time.Millisecond, "Expected exactly 1 state notification")
e := notifier.ReceivedEvents()[0]
assert.Equal(t, statefeed.FinalizedCheckpoint, int(e.Type))
fc, ok := e.Data.(*ethpbv1.EventFinalizedCheckpoint)
@@ -596,13 +597,13 @@ func TestProcessLightClientBootstrap(t *testing.T) {
s.executePostFinalizationTasks(s.ctx, l.AttestedState)
// wait for the goroutine to finish processing
time.Sleep(1 * time.Second)
// Check that the light client bootstrap is saved
b, err := s.lcStore.LightClientBootstrap(ctx, [32]byte(cp.Root))
require.NoError(t, err)
require.NotNil(t, b)
// Wait for the light client bootstrap to be saved (runs in goroutine)
var b interfaces.LightClientBootstrap
require.Eventually(t, func() bool {
var err error
b, err = s.lcStore.LightClientBootstrap(ctx, [32]byte(cp.Root))
return err == nil && b != nil
}, 5*time.Second, 50*time.Millisecond, "Light client bootstrap was not saved within timeout")
btst, err := lightClient.NewLightClientBootstrapFromBeaconState(ctx, l.FinalizedState.Slot(), l.FinalizedState, l.FinalizedBlock)
require.NoError(t, err)

View File

@@ -295,11 +295,20 @@ func (s *Service) StartFromSavedState(saved state.BeaconState) error {
return nil
}
earliestAvailableSlot, custodySubnetCount, err := s.updateCustodyInfoInDB(saved.Slot())
// Track supernode subscription status for logging purposes.
isSupernode := flags.Get().Supernode
wasSupernode, err := s.cfg.BeaconDB.UpdateSubscribedToAllDataSubnets(s.ctx, isSupernode)
if err != nil {
return errors.Wrap(err, "update subscribed to all data subnets")
}
earliestAvailableSlot, custodySubnetCount, err := s.updateCustodyInfoInDB()
if err != nil {
return errors.Wrap(err, "could not get and save custody group count")
}
logCustodyStatus(wasSupernode, custodySubnetCount)
if _, _, err := s.cfg.P2P.UpdateCustodyInfo(earliestAvailableSlot, custodySubnetCount); err != nil {
return errors.Wrap(err, "update custody info")
}
@@ -468,58 +477,100 @@ func (s *Service) removeStartupState() {
s.cfg.FinalizedStateAtStartUp = nil
}
// UpdateCustodyInfoInDB updates the custody information in the database.
// It returns the (potentially updated) custody group count and the earliest available slot.
func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot, uint64, error) {
isSupernode := flags.Get().Supernode
isSemiSupernode := flags.Get().SemiSupernode
cfg := params.BeaconConfig()
custodyRequirement := cfg.CustodyRequirement
// Check if the node was previously subscribed to all data subnets, and if so,
// store the new status accordingly.
wasSupernode, err := s.cfg.BeaconDB.UpdateSubscribedToAllDataSubnets(s.ctx, isSupernode)
// updateCustodyInfoInDB updates the custody information in the database.
// It returns the (potentially updated) earliest available slot and custody group count.
func (s *Service) updateCustodyInfoInDB() (primitives.Slot, uint64, error) {
targetCustodyGroupCount, err := computeTargetCustodyGroupCount()
if err != nil {
return 0, 0, errors.Wrap(err, "update subscribed to all data subnets")
return 0, 0, err
}
// Compute the target custody group count based on current flag configuration.
targetCustodyGroupCount := custodyRequirement
// Query current custody info.
storedEarliestSlot, storedCustodyCount, err := s.cfg.BeaconDB.CustodyInfo(s.ctx)
if err != nil {
return 0, 0, errors.Wrap(err, "custody info")
}
// Supernode: custody all groups (either currently set or previously enabled)
// If custody was already initialized and no increase needed, return stored values.
if storedCustodyCount > 0 && targetCustodyGroupCount <= storedCustodyCount {
return storedEarliestSlot, storedCustodyCount, nil
}
earliestAvailableSlot, err := s.earliestAvailableSlotForCustodyIncrease(storedCustodyCount)
if err != nil {
return 0, 0, err
}
storedEarliestSlot, actualCustodyGroupCount, err := s.cfg.BeaconDB.UpdateCustodyInfo(
s.ctx, earliestAvailableSlot, targetCustodyGroupCount)
if err != nil {
return 0, 0, errors.Wrap(err, "update custody info")
}
return storedEarliestSlot, actualCustodyGroupCount, nil
}
// earliestAvailableSlotForCustodyIncrease determines the earliest available slot when custody is increasing.
// - Pre-fulu: use earliest block slot (no data column sharding)
// - Post-fulu checkpoint sync (storedCustodyCount == 0): use headSlot
// - Post-fulu restart with custody increase: use headSlot + 1
//
// (we don't have data columns for new custody groups at the current head)
func (s *Service) earliestAvailableSlotForCustodyIncrease(storedCustodyCount uint64) (primitives.Slot, error) {
fuluSlot, err := fuluForkSlot()
if err != nil {
return 0, errors.Wrap(err, "fulu fork slot")
}
headSlot := s.HeadSlot()
if headSlot >= fuluSlot {
if storedCustodyCount > 0 {
// Restart with custody increase: new groups only have data from headSlot + 1
return headSlot + 1, nil
}
// Checkpoint sync: we download data columns for the checkpoint block
return headSlot, nil
}
// Pre-fulu: use earliest block slot
earliestAvailableSlot, err := s.cfg.BeaconDB.EarliestSlot(s.ctx)
if err != nil {
return 0, errors.Wrap(err, "earliest slot")
}
return earliestAvailableSlot, nil
}
// computeTargetCustodyGroupCount returns the custody group count based on current flag configuration.
func computeTargetCustodyGroupCount() (uint64, error) {
isSupernode := flags.Get().Supernode
isSemiSupernode := flags.Get().SemiSupernode
cfg := params.BeaconConfig()
// Default to custody requirement
targetCustodyGroupCount := cfg.CustodyRequirement
// Supernode: custody all groups
if isSupernode {
targetCustodyGroupCount = cfg.NumberOfCustodyGroups
return cfg.NumberOfCustodyGroups, nil
}
// Semi-supernode: custody minimum needed for reconstruction, or custody requirement if higher
if isSemiSupernode {
semiSupernodeCustody, err := peerdas.MinimumCustodyGroupCountToReconstruct()
if err != nil {
return 0, 0, errors.Wrap(err, "minimum custody group count")
return 0, errors.Wrap(err, "minimum custody group count")
}
targetCustodyGroupCount = max(custodyRequirement, semiSupernodeCustody)
targetCustodyGroupCount = max(cfg.CustodyRequirement, semiSupernodeCustody)
}
// Safely compute the fulu fork slot.
fuluForkSlot, err := fuluForkSlot()
if err != nil {
return 0, 0, errors.Wrap(err, "fulu fork slot")
}
return targetCustodyGroupCount, nil
}
// If slot is before the fulu fork slot, then use the earliest stored slot as the reference slot.
if slot < fuluForkSlot {
slot, err = s.cfg.BeaconDB.EarliestSlot(s.ctx)
if err != nil {
return 0, 0, errors.Wrap(err, "earliest slot")
}
}
earliestAvailableSlot, actualCustodyGroupCount, err := s.cfg.BeaconDB.UpdateCustodyInfo(s.ctx, slot, targetCustodyGroupCount)
if err != nil {
return 0, 0, errors.Wrap(err, "update custody info")
}
// logCustodyStatus logs information about the custody configuration.
func logCustodyStatus(wasSupernode bool, actualCustodyGroupCount uint64) {
isSupernode := flags.Get().Supernode
cfg := params.BeaconConfig()
if isSupernode {
log.WithFields(logrus.Fields{
@@ -531,8 +582,6 @@ func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot,
if wasSupernode && !isSupernode {
log.Warningf("Because the `--%s` flag was previously used, the node will continue to act as a super node.", flags.Supernode.Name)
}
return earliestAvailableSlot, actualCustodyGroupCount, nil
}
func spawnCountdownIfPreGenesis(ctx context.Context, genesisTime time.Time, db db.HeadAccessDatabase) {

View File

@@ -621,37 +621,54 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
roBlock, err := blocks.NewROBlock(signedBeaconBlock)
require.NoError(t, err)
fuluForkSlot := fuluForkEpoch * primitives.Slot(cfg.SlotsPerEpoch)
// Helper to set up forkchoice and head at a specific slot.
// Returns the block root so it can be used as parent for subsequent calls.
setupHead := func(t *testing.T, service *Service, slot primitives.Slot, parentRoot [32]byte) [32]byte {
fc := &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
blockRoot := [32]byte{byte(slot)}
headState, headBlock, err := prepareForkchoiceState(ctx, slot, blockRoot, parentRoot, params.BeaconConfig().ZeroHash, fc, fc)
require.NoError(t, err)
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, headState, headBlock))
require.NoError(t, service.setHead(&head{
root: headBlock.Root(),
block: headBlock,
state: headState,
slot: slot,
}))
return blockRoot
}
t.Run("CGC increases before fulu", func(t *testing.T) {
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
// Before Fulu
// -----------
actualEas, actualCgc, err := service.updateCustodyInfoInDB(15)
// Set head to pre-fulu slot
preFuluSlot := primitives.Slot(15)
setupHead(t, service, preFuluSlot, params.BeaconConfig().ZeroHash)
// Before Fulu - should use earliest stored slot
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)
actualEas, actualCgc, err = service.updateCustodyInfoInDB(17)
// Call again to ensure idempotency
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)
// Enable supernode while still pre-fulu
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)
actualEas, actualCgc, err = service.updateCustodyInfoInDB(19)
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, numberOfCustodyGroups, actualCgc)
// After Fulu
// ----------
actualEas, actualCgc, err = service.updateCustodyInfoInDB(fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, numberOfCustodyGroups, actualCgc)
@@ -662,35 +679,36 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
// Before Fulu
// -----------
actualEas, actualCgc, err := service.updateCustodyInfoInDB(15)
// Start pre-fulu
preFuluSlot := primitives.Slot(15)
preFuluRoot := setupHead(t, service, preFuluSlot, params.BeaconConfig().ZeroHash)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)
actualEas, actualCgc, err = service.updateCustodyInfoInDB(17)
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)
// After Fulu
// ----------
// Move to post-fulu and enable supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)
slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot)
postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, preFuluRoot)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot+1, actualEas) // head + 1
require.Equal(t, numberOfCustodyGroups, actualCgc)
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
// Call again - earliest available slot shouldn't change
laterSlot := postFuluSlot + 10
setupHead(t, service, laterSlot, postFuluRoot)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot+1, actualEas) // Still the original EAS
require.Equal(t, numberOfCustodyGroups, actualCgc)
})
@@ -699,16 +717,19 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)
// Enable supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
require.Equal(t, numberOfCustodyGroups, actualCgc)
// Try to downgrade by removing flag
@@ -716,10 +737,14 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
flags.Init(gFlags)
defer flags.Init(resetFlags)
// Should still be supernode
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
// Move head forward
laterSlot := postFuluSlot + 10
setupHead(t, service, laterSlot, postFuluRoot)
// Should still be supernode (custody count shouldn't decrease)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot, actualEas) // Original EAS preserved
require.Equal(t, numberOfCustodyGroups, actualCgc) // Still 64, not downgraded
})
@@ -728,29 +753,36 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)
// Enable semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SemiSupernode = true
flags.Init(gFlags)
slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies 64 groups
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 32
require.Equal(t, semiSupernodeCustody, actualCgc)
// Try to downgrade by removing flag
gFlags.SemiSupernode = false
flags.Init(gFlags)
defer flags.Init(resetFlags)
// UpdateCustodyInfo should prevent downgrade - custody count should remain at 64
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
// Move head forward
laterSlot := postFuluSlot + 10
setupHead(t, service, laterSlot, postFuluRoot)
// UpdateCustodyInfo should prevent downgrade
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, semiSupernodeCustody, actualCgc) // Still 64 due to downgrade prevention by UpdateCustodyInfo
require.Equal(t, postFuluSlot, actualEas) // Original EAS preserved
require.Equal(t, semiSupernodeCustody, actualCgc) // Still 32
})
t.Run("Semi-supernode to supernode upgrade allowed", func(t *testing.T) {
@@ -758,18 +790,21 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)
// Start with semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SemiSupernode = true
flags.Init(gFlags)
slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies 64 groups
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
semiSupernodeCustody := numberOfCustodyGroups / 2
require.Equal(t, semiSupernodeCustody, actualCgc)
// Upgrade to full supernode
gFlags.SemiSupernode = false
@@ -777,12 +812,15 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
flags.Init(gFlags)
defer flags.Init(resetFlags)
// Should upgrade to full supernode
upgradeSlot := slot + 2
actualEas, actualCgc, err = service.updateCustodyInfoInDB(upgradeSlot)
// Move head forward for upgrade
upgradeSlot := postFuluSlot + 10
setupHead(t, service, upgradeSlot, postFuluRoot)
// Second call - restart with custody increase
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, upgradeSlot, actualEas) // Earliest slot updates when upgrading
require.Equal(t, numberOfCustodyGroups, actualCgc) // Upgraded to 128
require.Equal(t, upgradeSlot+1, actualEas) // headSlot + 1 (restart with custody increase)
require.Equal(t, numberOfCustodyGroups, actualCgc) // Upgraded to 64
})
t.Run("Semi-supernode with high validator requirements uses higher custody", func(t *testing.T) {
@@ -790,6 +828,9 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
postFuluSlot := fuluForkSlot + 1
setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)
// Enable semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
@@ -797,17 +838,92 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
flags.Init(gFlags)
defer flags.Init(resetFlags)
// Mock a high custody requirement (simulating many validators)
// We need to override the custody requirement calculation
// For this test, we'll verify the logic by checking if custodyRequirement > 64
// Since custodyRequirement in minimalTestService is 4, we can't test the high case here
// This would require a different test setup with actual validators
slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
// With low validator requirements (4), should use semi-supernode minimum (64)
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
semiSupernodeCustody := numberOfCustodyGroups / 2
// With low validator requirements (4), should use semi-supernode minimum (32)
require.Equal(t, semiSupernodeCustody, actualCgc)
})
t.Run("Restart with higher custody uses head slot plus one", func(t *testing.T) {
// This test simulates a restart scenario where:
// 1. Node was running with lower custody count
// 2. Node stops and waits (new blocks are produced)
// 3. Node restarts with higher custody count
// In this case, we should use head + 1 as the earliest available slot.
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
// First, establish existing custody in the DB (simulates previous run)
initialSlot := fuluForkSlot + 10
initialRoot := setupHead(t, service, initialSlot, params.BeaconConfig().ZeroHash)
// Initial call with default custody (simulates first run checkpoint sync)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, initialSlot, actualEas) // First time = headSlot
require.Equal(t, custodyRequirement, actualCgc)
// Verify existing custody is stored (calling with 0 reads without modifying)
_, storedCount, err := requirements.db.UpdateCustodyInfo(ctx, 0, 0)
require.NoError(t, err)
require.Equal(t, custodyRequirement, storedCount)
// Now simulate restart at a higher head slot with increased custody
headSlot := fuluForkSlot + 100
setupHead(t, service, headSlot, initialRoot)
require.Equal(t, headSlot, service.HeadSlot())
// Enable supernode (simulating restart with increased custody)
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
// The earliest available slot should be head + 1 (restart scenario)
expectedEas := headSlot + 1
require.Equal(t, expectedEas, actualEas, "Expected head + 1 for restart with custody increase")
require.Equal(t, numberOfCustodyGroups, actualCgc)
})
t.Run("Checkpoint sync uses head slot without plus one", func(t *testing.T) {
// This test verifies checkpoint sync behavior:
// When syncing from a checkpoint with no existing custody info,
// the earliest available slot should be the checkpoint slot itself
// (not +1), because we download the checkpoint block's data columns.
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
// Set up head at checkpoint slot (simulates checkpoint sync)
checkpointSlot := fuluForkSlot + 50
setupHead(t, service, checkpointSlot, params.BeaconConfig().ZeroHash)
require.Equal(t, checkpointSlot, service.HeadSlot())
// Verify no existing custody in DB (calling with 0 reads without modifying)
_, storedCount, err := requirements.db.UpdateCustodyInfo(ctx, 0, 0)
require.NoError(t, err)
require.Equal(t, uint64(0), storedCount)
// Enable supernode for checkpoint sync
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
// For checkpoint sync, EAS should be the checkpoint slot (not +1)
require.Equal(t, checkpointSlot, actualEas, "Checkpoint sync should use headSlot, not headSlot+1")
require.Equal(t, numberOfCustodyGroups, actualCgc)
})
}

View File

@@ -128,6 +128,7 @@ type NoHeadAccessDatabase interface {
BackfillFinalizedIndex(ctx context.Context, blocks []blocks.ROBlock, finalizedChildRoot [32]byte) error
// Custody operations.
CustodyInfo(ctx context.Context) (primitives.Slot, uint64, error)
UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot primitives.Slot, custodyGroupCount uint64) (primitives.Slot, uint64, error)
UpdateEarliestAvailableSlot(ctx context.Context, earliestAvailableSlot primitives.Slot) error
UpdateSubscribedToAllDataSubnets(ctx context.Context, subscribed bool) (bool, error)

View File

@@ -14,9 +14,38 @@ import (
bolt "go.etcd.io/bbolt"
)
// CustodyInfo returns the stored earliest available slot and custody group count.
func (s *Store) CustodyInfo(ctx context.Context) (primitives.Slot, uint64, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.CustodyInfo")
defer span.End()
var storedGroupCount uint64
var storedEarliestAvailableSlot primitives.Slot
if err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(custodyBucket)
if bucket == nil {
return nil
}
if b := bucket.Get(groupCountKey); len(b) != 0 {
storedGroupCount = bytesutil.BytesToUint64BigEndian(b)
}
if b := bucket.Get(earliestAvailableSlotKey); len(b) != 0 {
storedEarliestAvailableSlot = primitives.Slot(bytesutil.BytesToUint64BigEndian(b))
}
return nil
}); err != nil {
return 0, 0, err
}
return storedEarliestAvailableSlot, storedGroupCount, nil
}
// UpdateCustodyInfo atomically updates the custody group count only if it is greater than the stored one.
// In this case, it also updates the earliest available slot with the provided value.
// It returns the (potentially updated) custody group count and earliest available slot.
// The earliest available slot is only updated if the new value is higher than the stored one,
// preventing loss of availability for data we already have (e.g., when switching from normal to supernode).
// It returns the (potentially updated) earliest available slot and custody group count.
func (s *Store) UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot primitives.Slot, custodyGroupCount uint64) (primitives.Slot, uint64, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.UpdateCustodyInfo")
defer span.End()
@@ -46,20 +75,23 @@ func (s *Store) UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot pri
return nil
}
storedGroupCount, storedEarliestAvailableSlot = custodyGroupCount, earliestAvailableSlot
// Store the earliest available slot.
bytes := bytesutil.Uint64ToBytesBigEndian(uint64(earliestAvailableSlot))
if err := bucket.Put(earliestAvailableSlotKey, bytes); err != nil {
return errors.Wrap(err, "put earliest available slot")
}
// Store the custody group count.
bytes = bytesutil.Uint64ToBytesBigEndian(custodyGroupCount)
// Update the custody group count.
storedGroupCount = custodyGroupCount
bytes := bytesutil.Uint64ToBytesBigEndian(custodyGroupCount)
if err := bucket.Put(groupCountKey, bytes); err != nil {
return errors.Wrap(err, "put custody group count")
}
// Only update earliestAvailableSlot if the new value is higher.
// This prevents losing availability for data we already have.
if earliestAvailableSlot > storedEarliestAvailableSlot {
storedEarliestAvailableSlot = earliestAvailableSlot
bytes = bytesutil.Uint64ToBytesBigEndian(uint64(earliestAvailableSlot))
if err := bucket.Put(earliestAvailableSlotKey, bytes); err != nil {
return errors.Wrap(err, "put earliest available slot")
}
}
return nil
}); err != nil {
return 0, 0, err

View File

@@ -89,7 +89,7 @@ func TestUpdateCustodyInfo(t *testing.T) {
require.Equal(t, groupCount, storedCount)
})
t.Run("update with higher group count", func(t *testing.T) {
t.Run("update with higher group count and higher slot", func(t *testing.T) {
const (
initialSlot = primitives.Slot(100)
initialCount = uint64(5)
@@ -112,6 +112,150 @@ func TestUpdateCustodyInfo(t *testing.T) {
require.Equal(t, groupCount, storedCount)
})
t.Run("update with higher group count and lower slot should preserve higher slot", func(t *testing.T) {
// This is the bug scenario: when switching from normal mode to semi-supernode,
// the incoming slot might be lower than the stored slot, but we should preserve
// the higher stored slot to avoid advertising that we can serve data we don't have.
const (
initialSlot = primitives.Slot(1835523) // Higher stored slot
initialCount = uint64(10)
earliestSlot = primitives.Slot(1835456) // Lower incoming slot (e.g., from head slot)
groupCount = uint64(64) // Increasing custody (e.g., semi-supernode)
)
db := setupDB(t)
_, _, err := db.UpdateCustodyInfo(ctx, initialSlot, initialCount)
require.NoError(t, err)
// When custody count increases but slot is lower, the higher slot should be preserved
slot, count, err := db.UpdateCustodyInfo(ctx, earliestSlot, groupCount)
require.NoError(t, err)
require.Equal(t, initialSlot, slot, "earliestAvailableSlot should not decrease when custody group count increases")
require.Equal(t, groupCount, count)
// Verify in the database
storedSlot, storedCount := getCustodyInfoFromDB(t, db)
require.Equal(t, initialSlot, storedSlot, "stored slot should be the higher value")
require.Equal(t, groupCount, storedCount)
})
t.Run("pre-fulu scenario: checkpoint sync before fork, restart with semi-supernode", func(t *testing.T) {
// This test covers the pre-Fulu bug scenario:
// 1. Node starts with checkpoint sync BEFORE Fulu fork - uses EarliestSlot() (checkpoint block slot)
// 2. Validators connect after Fulu activates - maintainCustodyInfo() updates to head slot (higher)
// 3. Node restarts with --semi-supernode - updateCustodyInfoInDB uses EarliestSlot() again
// The bug was that step 3 would overwrite the higher slot from step 2.
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 100
params.OverrideBeaconConfig(cfg)
fuluForkSlot, err := slots.EpochStart(cfg.FuluForkEpoch)
require.NoError(t, err)
// Derive slot values relative to Fulu fork
checkpointBlockSlot := fuluForkSlot - 10 // Checkpoint sync happened before Fulu
headSlot := fuluForkSlot + 5 // Head slot after Fulu activates
defaultCustody := cfg.CustodyRequirement // Default custody from config
validatorCustody := cfg.CustodyRequirement + 6 // Custody after validators connect
semiSupernodeCustody := cfg.NumberOfCustodyGroups // Semi-supernode custodies all groups
// Verify our test setup: checkpoint is pre-Fulu, head is post-Fulu
require.Equal(t, true, checkpointBlockSlot < fuluForkSlot, "checkpoint must be before Fulu fork")
require.Equal(t, true, headSlot >= fuluForkSlot, "head must be at or after Fulu fork")
db := setupDB(t)
// Step 1: Node starts with checkpoint sync (pre-Fulu)
// updateCustodyInfoInDB sees saved.Slot() < fuluForkSlot, so uses EarliestSlot()
slot, count, err := db.UpdateCustodyInfo(ctx, checkpointBlockSlot, defaultCustody)
require.NoError(t, err)
require.Equal(t, checkpointBlockSlot, slot)
require.Equal(t, defaultCustody, count)
// Step 2: Validators connect after Fulu activates, maintainCustodyInfo() runs
// Uses headSlot which is higher than checkpointBlockSlot
slot, count, err = db.UpdateCustodyInfo(ctx, headSlot, validatorCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot, "should update to head slot")
require.Equal(t, validatorCustody, count)
// Verify step 2 stored correctly
storedSlot, storedCount := getCustodyInfoFromDB(t, db)
require.Equal(t, headSlot, storedSlot)
require.Equal(t, validatorCustody, storedCount)
// Step 3: Restart with --semi-supernode
// updateCustodyInfoInDB sees saved.Slot() < fuluForkSlot, so uses EarliestSlot() again
slot, count, err = db.UpdateCustodyInfo(ctx, checkpointBlockSlot, semiSupernodeCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot, "earliestAvailableSlot should NOT decrease back to checkpoint slot")
require.Equal(t, semiSupernodeCustody, count)
// Verify the database preserved the higher slot
storedSlot, storedCount = getCustodyInfoFromDB(t, db)
require.Equal(t, headSlot, storedSlot, "stored slot should remain at head slot, not checkpoint slot")
require.Equal(t, semiSupernodeCustody, storedCount)
})
t.Run("post-fulu scenario: finalized slot lower than stored head slot", func(t *testing.T) {
// This test covers the post-Fulu bug scenario:
// Post-fork, updateCustodyInfoInDB uses saved.Slot() (finalized slot) directly,
// not EarliestSlot(). But the same bug can occur because:
// - maintainCustodyInfo() stores headSlot (higher)
// - Restart uses finalized slot (lower than head)
// Our fix ensures earliestAvailableSlot never decreases.
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 100
params.OverrideBeaconConfig(cfg)
fuluForkSlot, err := slots.EpochStart(cfg.FuluForkEpoch)
require.NoError(t, err)
// Derive slot values relative to Fulu fork - all slots are AFTER Fulu
finalizedSlotAtStart := fuluForkSlot + 100 // Finalized slot at first start (post-Fulu)
headSlot := fuluForkSlot + 200 // Head slot when validators connect
finalizedSlotRestart := fuluForkSlot + 150 // Finalized slot at restart (< headSlot)
defaultCustody := cfg.CustodyRequirement // Default custody from config
validatorCustody := cfg.CustodyRequirement + 6 // Custody after validators connect
semiSupernodeCustody := cfg.NumberOfCustodyGroups // Semi-supernode custodies all groups
// Verify our test setup: all slots are post-Fulu
require.Equal(t, true, finalizedSlotAtStart >= fuluForkSlot, "finalized slot must be at or after Fulu fork")
require.Equal(t, true, headSlot >= fuluForkSlot, "head slot must be at or after Fulu fork")
require.Equal(t, true, finalizedSlotRestart >= fuluForkSlot, "restart finalized slot must be at or after Fulu fork")
require.Equal(t, true, finalizedSlotRestart < headSlot, "restart finalized slot must be less than head slot")
db := setupDB(t)
// Step 1: Node starts post-Fulu
// updateCustodyInfoInDB sees saved.Slot() >= fuluForkSlot, so uses saved.Slot() directly
slot, count, err := db.UpdateCustodyInfo(ctx, finalizedSlotAtStart, defaultCustody)
require.NoError(t, err)
require.Equal(t, finalizedSlotAtStart, slot)
require.Equal(t, defaultCustody, count)
// Step 2: Validators connect, maintainCustodyInfo() uses head slot
slot, count, err = db.UpdateCustodyInfo(ctx, headSlot, validatorCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot)
require.Equal(t, validatorCustody, count)
// Step 3: Restart with --semi-supernode
// updateCustodyInfoInDB uses finalized slot which is lower than stored head slot
slot, count, err = db.UpdateCustodyInfo(ctx, finalizedSlotRestart, semiSupernodeCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot, "earliestAvailableSlot should NOT decrease to finalized slot")
require.Equal(t, semiSupernodeCustody, count)
// Verify database preserved the higher slot
storedSlot, storedCount := getCustodyInfoFromDB(t, db)
require.Equal(t, headSlot, storedSlot)
require.Equal(t, semiSupernodeCustody, storedCount)
})
t.Run("update with lower group count should not update", func(t *testing.T) {
const (
initialSlot = primitives.Slot(200)

View File

@@ -75,7 +75,6 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
p2p := p2pTesting.NewTestP2P(t)
lcStore := NewLightClientStore(p2p, new(event.Feed), testDB.SetupDB(t))
timeForGoroutinesToFinish := 20 * time.Microsecond
// update 0 with basic data and no supermajority following an empty lastFinalityUpdate - should save and broadcast
l0 := util.NewTestLightClient(t, version.Altair)
update0, err := NewLightClientFinalityUpdateFromBeaconState(l0.Ctx, l0.State, l0.Block, l0.AttestedState, l0.AttestedBlock, l0.FinalizedBlock)
@@ -87,8 +86,9 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update0, true)
require.Equal(t, update0, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after setting a new last finality update when previous is nil")
require.Eventually(t, func() bool {
return p2p.BroadcastCalled.Load()
}, time.Second, 10*time.Millisecond, "Broadcast should have been called after setting a new last finality update when previous is nil")
p2p.BroadcastCalled.Store(false) // Reset for next test
// update 1 with same finality slot, increased attested slot, and no supermajority - should save but not broadcast
@@ -102,7 +102,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update1, true)
require.Equal(t, update1, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called after setting a new last finality update without supermajority")
p2p.BroadcastCalled.Store(false) // Reset for next test
@@ -117,8 +117,9 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update2, true)
require.Equal(t, update2, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after setting a new last finality update with supermajority")
require.Eventually(t, func() bool {
return p2p.BroadcastCalled.Load()
}, time.Second, 10*time.Millisecond, "Broadcast should have been called after setting a new last finality update with supermajority")
p2p.BroadcastCalled.Store(false) // Reset for next test
// update 3 with same finality slot, increased attested slot, and supermajority - should save but not broadcast
@@ -132,7 +133,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update3, true)
require.Equal(t, update3, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been when previous was already broadcast")
// update 4 with increased finality slot, increased attested slot, and supermajority - should save and broadcast
@@ -146,8 +147,9 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update4, true)
require.Equal(t, update4, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after a new finality update with increased finality slot")
require.Eventually(t, func() bool {
return p2p.BroadcastCalled.Load()
}, time.Second, 10*time.Millisecond, "Broadcast should have been called after a new finality update with increased finality slot")
p2p.BroadcastCalled.Store(false) // Reset for next test
// update 5 with the same new finality slot, increased attested slot, and supermajority - should save but not broadcast
@@ -161,7 +163,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update5, true)
require.Equal(t, update5, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called when previous was already broadcast with supermajority")
// update 6 with the same new finality slot, increased attested slot, and no supermajority - should save but not broadcast
@@ -175,7 +177,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
lcStore.SetLastFinalityUpdate(update6, true)
require.Equal(t, update6, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called when previous was already broadcast with supermajority")
}

View File

@@ -72,7 +72,10 @@ func TestService_Broadcast(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
@@ -186,7 +189,10 @@ func TestService_BroadcastAttestation(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
@@ -375,7 +381,15 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
_, err = tpHandle.Subscribe()
require.NoError(t, err)
time.Sleep(500 * time.Millisecond) // libp2p fails without this delay...
// This test specifically tests discovery-based peer finding, which requires
// time for nodes to discover each other. Using a fixed sleep here is intentional
// as we're testing the discovery timing behavior.
time.Sleep(500 * time.Millisecond)
// Verify mesh establishment after discovery
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0 && len(p2.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
nodePeers := p.pubsub.ListPeers(topic)
nodePeers2 := p2.pubsub.ListPeers(topic)
@@ -444,7 +458,10 @@ func TestService_BroadcastSyncCommittee(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
@@ -521,7 +538,10 @@ func TestService_BroadcastBlob(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
@@ -584,7 +604,10 @@ func TestService_BroadcastLightClientOptimisticUpdate(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
@@ -660,7 +683,10 @@ func TestService_BroadcastLightClientFinalityUpdate(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
@@ -771,8 +797,10 @@ func TestService_BroadcastDataColumn(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
// libp2p fails without this delay
time.Sleep(50 * time.Millisecond)
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(service.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Broadcast to peers and wait.
err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar})

View File

@@ -482,12 +482,12 @@ func TestStaticPeering_PeersAreAdded(t *testing.T) {
s.Start()
<-exitRoutine
}()
time.Sleep(50 * time.Millisecond)
time.Sleep(50 * time.Millisecond) // Wait for service initialization
var vr [32]byte
require.NoError(t, cs.SetClock(startup.NewClock(time.Now(), vr)))
time.Sleep(4 * time.Second)
ps := s.host.Network().Peers()
assert.Equal(t, 5, len(ps), "Not all peers added to peerstore")
require.Eventually(t, func() bool {
return len(s.host.Network().Peers()) == 5
}, 10*time.Second, 100*time.Millisecond, "Not all peers added to peerstore")
require.NoError(t, s.Stop())
exitRoutine <- true
}

View File

@@ -80,8 +80,9 @@ func TestService_Start_OnlyStartsOnce(t *testing.T) {
}()
var vr [32]byte
require.NoError(t, cs.SetClock(startup.NewClock(time.Now(), vr)))
time.Sleep(time.Second * 2)
assert.Equal(t, true, s.started, "Expected service to be started")
require.Eventually(t, func() bool {
return s.started
}, 5*time.Second, 100*time.Millisecond, "Expected service to be started")
s.Start()
require.LogsContain(t, hook, "Attempted to start p2p service when it was already started")
require.NoError(t, s.Stop())
@@ -260,17 +261,9 @@ func TestListenForNewNodes(t *testing.T) {
err = cs.SetClock(startup.NewClock(genesisTime, gvr))
require.NoError(t, err, "Could not set clock in service")
actualPeerCount := len(s.host.Network().Peers())
for range 40 {
if actualPeerCount == peerCount {
break
}
time.Sleep(100 * time.Millisecond)
actualPeerCount = len(s.host.Network().Peers())
}
assert.Equal(t, peerCount, actualPeerCount, "Not all peers added to peerstore")
require.Eventually(t, func() bool {
return len(s.host.Network().Peers()) == peerCount
}, 5*time.Second, 100*time.Millisecond, "Not all peers added to peerstore")
err = s.Stop()
require.NoError(t, err, "Failed to stop service")

View File

@@ -657,8 +657,9 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
require.Eventually(t, func() bool {
return s.AttestationsPool.UnaggregatedAttestationCount() == 1
}, time.Second, 10*time.Millisecond, "Expected 1 attestation in pool")
})
t.Run("multiple", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
@@ -677,8 +678,9 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, 2, broadcaster.NumAttestations())
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
require.Eventually(t, func() bool {
return s.AttestationsPool.UnaggregatedAttestationCount() == 2
}, time.Second, 10*time.Millisecond, "Expected 2 attestations in pool")
})
t.Run("phase0 att post electra", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
@@ -798,8 +800,9 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
require.Eventually(t, func() bool {
return s.AttestationsPool.UnaggregatedAttestationCount() == 1
}, time.Second, 10*time.Millisecond, "Expected 1 attestation in pool")
})
t.Run("multiple", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
@@ -818,8 +821,9 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, 2, broadcaster.NumAttestations())
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
require.Eventually(t, func() bool {
return s.AttestationsPool.UnaggregatedAttestationCount() == 2
}, time.Second, 10*time.Millisecond, "Expected 2 attestations in pool")
})
t.Run("no body", func(t *testing.T) {
request := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
@@ -1375,9 +1379,9 @@ func TestSubmitSignedBLSToExecutionChanges_Ok(t *testing.T) {
writer.Body = &bytes.Buffer{}
s.SubmitBLSToExecutionChanges(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
time.Sleep(100 * time.Millisecond) // Delay to let the routine start
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, numValidators, len(broadcaster.BroadcastMessages))
require.Eventually(t, func() bool {
return broadcaster.BroadcastCalled.Load() && len(broadcaster.BroadcastMessages) == numValidators
}, time.Second, 10*time.Millisecond, "Broadcast should be called with all messages")
poolChanges, err := s.BLSChangesPool.PendingBLSToExecChanges()
require.Equal(t, len(poolChanges), len(signedChanges))
@@ -1591,10 +1595,10 @@ func TestSubmitSignedBLSToExecutionChanges_Failures(t *testing.T) {
s.SubmitBLSToExecutionChanges(writer, request)
assert.Equal(t, http.StatusBadRequest, writer.Code)
time.Sleep(10 * time.Millisecond) // Delay to allow the routine to start
require.StringContains(t, "One or more messages failed validation", writer.Body.String())
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, numValidators, len(broadcaster.BroadcastMessages)+1)
require.Eventually(t, func() bool {
return broadcaster.BroadcastCalled.Load() && len(broadcaster.BroadcastMessages)+1 == numValidators
}, time.Second, 10*time.Millisecond, "Broadcast should be called with expected messages")
poolChanges, err := s.BLSChangesPool.PendingBLSToExecChanges()
require.Equal(t, len(poolChanges)+1, len(signedChanges))

View File

@@ -70,7 +70,6 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
topic := "/eth2/%x/beacon_block"
go r.startDiscoveryAndSubscriptions()
time.Sleep(100 * time.Millisecond)
var vr [32]byte
require.NoError(t, gs.SetClock(startup.NewClock(time.Now(), vr)))
@@ -83,9 +82,11 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
msg.Block.ParentRoot = util.Random32Bytes(t)
msg.Signature = sk.Sign([]byte("data")).Marshal()
p2p.ReceivePubSub(topic, msg)
// wait for chainstart to be sent
time.Sleep(400 * time.Millisecond)
require.Equal(t, true, r.chainStarted.IsSet(), "Did not receive chain start event.")
// Wait for chainstart event to be processed
require.Eventually(t, func() bool {
return r.chainStarted.IsSet()
}, 5*time.Second, 50*time.Millisecond, "Did not receive chain start event.")
}
func TestSyncHandlers_WaitForChainStart(t *testing.T) {
@@ -217,20 +218,18 @@ func TestSyncService_StopCleanly(t *testing.T) {
p2p.Digest, err = r.currentForkDigest()
require.NoError(t, err)
// wait for chainstart to be sent
time.Sleep(2 * time.Second)
require.Equal(t, true, r.chainStarted.IsSet(), "Did not receive chain start event.")
require.NotEqual(t, 0, len(r.cfg.p2p.PubSub().GetTopics()))
require.NotEqual(t, 0, len(r.cfg.p2p.Host().Mux().Protocols()))
// Wait for chainstart and topics to be registered
require.Eventually(t, func() bool {
return r.chainStarted.IsSet() && len(r.cfg.p2p.PubSub().GetTopics()) > 0 && len(r.cfg.p2p.Host().Mux().Protocols()) > 0
}, 5*time.Second, 50*time.Millisecond, "Did not receive chain start event or topics not registered.")
// Both pubsub and rpc topics should be unsubscribed.
require.NoError(t, r.Stop())
// Sleep to allow pubsub topics to be deregistered.
time.Sleep(1 * time.Second)
require.Equal(t, 0, len(r.cfg.p2p.PubSub().GetTopics()))
require.Equal(t, 0, len(r.cfg.p2p.Host().Mux().Protocols()))
// Wait for pubsub topics to be deregistered.
require.Eventually(t, func() bool {
return len(r.cfg.p2p.PubSub().GetTopics()) == 0 && len(r.cfg.p2p.Host().Mux().Protocols()) == 0
}, 5*time.Second, 50*time.Millisecond, "Pubsub topics were not deregistered")
}
func TestService_Stop_SendsGoodbyeMessages(t *testing.T) {

View File

@@ -614,11 +614,10 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
},
}
time.Sleep(10 * time.Millisecond) // Wait for cached value to pass through buffers.
if res, err := r.validateAggregateAndProof(t.Context(), "", msg); res == pubsub.ValidationAccept {
_ = err
t.Fatal("Validated status is true")
}
require.Eventually(t, func() bool {
res, _ := r.validateAggregateAndProof(t.Context(), "", msg)
return res != pubsub.ValidationAccept
}, time.Second, 10*time.Millisecond, "Expected validation to reject duplicate aggregate")
}
func TestValidateAggregateAndProof_BadBlock(t *testing.T) {

View File

@@ -992,7 +992,6 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
// Mark the proposer/slot as seen
r.setSeenBlockIndexSlot(msg.Block.Slot, msg.Block.ProposerIndex)
time.Sleep(10 * time.Millisecond) // Wait for cached value to pass through buffers
// Prepare and validate the second message (clone)
buf := new(bytes.Buffer)
@@ -1010,9 +1009,11 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
}
// Since this is not an equivocation (same signature), it should be ignored
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
assert.NoError(t, err)
assert.Equal(t, pubsub.ValidationIgnore, res, "block with same signature should be ignored")
// Wait for the cached value to propagate through buffers
require.Eventually(t, func() bool {
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
return err == nil && res == pubsub.ValidationIgnore
}, time.Second, 10*time.Millisecond, "block with same signature should be ignored")
// Verify no slashings were created
assert.Equal(t, 0, len(slashingPool.PendingPropSlashings), "Expected no slashings for same signature")

View File

@@ -0,0 +1,3 @@
### Fixed
- Fixed an issue where the earliest available slot could move backwards when setting the semi-supernode or supernode flags.
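The code for this fix is not shown in this view. Purely to illustrate the stated invariant, a guarded setter might look like the sketch below; custodyStore, setEarliestAvailableSlot, and the plain uint64 slot type are hypothetical stand-ins, not Prysm's actual types.

package custody

// custodyStore is a hypothetical holder for the advertised earliest slot.
type custodyStore struct {
	earliestAvailableSlot uint64
}

// setEarliestAvailableSlot keeps the invariant described in the changelog:
// raising the custody count (semi-supernode or supernode flags) must never
// move the earliest available slot to an earlier value.
func (s *custodyStore) setEarliestAvailableSlot(candidate uint64) {
	if candidate < s.earliestAvailableSlot {
		return // never go backwards
	}
	s.earliestAvailableSlot = candidate
}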

View File

@@ -0,0 +1,3 @@
### Changed
- Changed the validator client's IsHealthy check to IsReady when interpreting /eth/v1/node/health: a 206 response now returns false because the node is still syncing.
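As a minimal sketch of that interpretation, assuming a plain net/http client rather than Prysm's REST handler, readiness maps onto the health endpoint's status codes like this:

package health

import (
	"context"
	"net/http"
)

// isNodeReady treats only 200 OK from /eth/v1/node/health as "ready";
// 206 Partial Content (node still syncing) and any other status or
// transport error map to false.
func isNodeReady(ctx context.Context, baseURL string) bool {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL+"/eth/v1/node/health", nil)
	if err != nil {
		return false
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK
}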

View File

@@ -0,0 +1,3 @@
### Fixed
- Don't call trace.WithMaxExportBatchSize(trace.DefaultMaxExportBatchSize) twice.

View File

@@ -0,0 +1,3 @@
### Changed
- Replaced `time.Sleep` with `require.Eventually` polling in tests to fix flaky behavior caused by race conditions between goroutines and assertions.
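The before/after shape of that change, as applied in the test hunks above, is roughly the following fragment (pool stands in for whatever asynchronously updated state the test inspects):

// Before: sleep a fixed amount and hope the background goroutine finished.
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 1, pool.UnaggregatedAttestationCount())

// After: poll every 10ms until the condition holds or the 1s deadline expires.
require.Eventually(t, func() bool {
	return pool.UnaggregatedAttestationCount() == 1
}, time.Second, 10*time.Millisecond, "Expected 1 attestation in pool")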

View File

@@ -45,7 +45,6 @@ func Setup(ctx context.Context, serviceName, processName, endpoint string, sampl
exporter,
trace.WithMaxExportBatchSize(trace.DefaultMaxExportBatchSize),
trace.WithBatchTimeout(trace.DefaultScheduleDelay*time.Millisecond),
trace.WithMaxExportBatchSize(trace.DefaultMaxExportBatchSize),
),
trace.WithResource(
resource.NewWithAttributes(

View File

@@ -8,6 +8,7 @@ import (
"runtime"
"sort"
"strings"
"time"
"github.com/OffchainLabs/prysm/v7/encoding/ssz/equality"
"github.com/d4l3k/messagediff"
@@ -138,12 +139,21 @@ func StringContains(loggerFn assertionLoggerFn, expected, actual string, flag bo
// NoError asserts that error is nil.
func NoError(loggerFn assertionLoggerFn, err error, msg ...any) {
// reflect.ValueOf is needed for nil instances of custom types implementing Error
if err != nil && !reflect.ValueOf(err).IsNil() {
errMsg := parseMsg("Unexpected error", msg...)
_, file, line, _ := runtime.Caller(2)
loggerFn("%s:%d %s: %v", filepath.Base(file), line, errMsg, err)
if err == nil {
return
}
// reflect.ValueOf is needed for nil instances of custom types implementing Error.
// Only check IsNil for types that support it to avoid panics on struct types.
v := reflect.ValueOf(err)
switch v.Kind() {
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
if v.IsNil() {
return
}
}
errMsg := parseMsg("Unexpected error", msg...)
_, file, line, _ := runtime.Caller(2)
loggerFn("%s:%d %s: %v", filepath.Base(file), line, errMsg, err)
}
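A standalone illustration of why the Kind check matters (this example is not part of the assertions package): a typed nil pointer stored in an error interface is non-nil at the interface level but nil-checkable via reflect, while a struct-typed error would make reflect's IsNil panic.

package main

import (
	"fmt"
	"reflect"
)

type ptrErr struct{}

func (*ptrErr) Error() string { return "ptr" }

type structErr struct{}

func (structErr) Error() string { return "struct" }

func main() {
	var e error = (*ptrErr)(nil)
	fmt.Println(e != nil)                   // true: the interface holds a typed nil
	fmt.Println(reflect.ValueOf(e).IsNil()) // true: NoError should still pass

	var s error = structErr{}
	fmt.Println(reflect.ValueOf(s).Kind()) // struct: calling IsNil here would panic,
	// which is exactly what the Kind switch above avoids.
}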
// ErrorIs uses Errors.Is to recursively unwrap err looking for target in the chain.
@@ -341,3 +351,18 @@ func (tb *TBMock) Errorf(format string, args ...any) {
func (tb *TBMock) Fatalf(format string, args ...any) {
tb.FatalfMsg = fmt.Sprintf(format, args...)
}
// Eventually asserts that given condition will be met within waitFor time,
// periodically checking target function each tick.
func Eventually(loggerFn assertionLoggerFn, condition func() bool, waitFor, tick time.Duration, msg ...any) {
deadline := time.Now().Add(waitFor)
for time.Now().Before(deadline) {
if condition() {
return
}
time.Sleep(tick)
}
errMsg := parseMsg("Condition never satisfied", msg...)
_, file, line, _ := runtime.Caller(2)
loggerFn("%s:%d %s (waited %v)", filepath.Base(file), line, errMsg, waitFor)
}
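A brief usage sketch of the new helper, with imports omitted and the worker goroutine and atomic flag serving only as illustrative stand-ins for asynchronous test state:

func TestWorkerSetsFlag(t *testing.T) {
	var done atomic.Bool
	go func() {
		time.Sleep(20 * time.Millisecond) // simulated asynchronous work
		done.Store(true)
	}()
	// Polls done.Load every 10ms for up to one second and fails the test
	// via Fatalf if the condition is never satisfied in that window.
	require.Eventually(t, done.Load, time.Second, 10*time.Millisecond, "worker never finished")
}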

View File

@@ -1,6 +1,8 @@
package require
import (
"time"
"github.com/OffchainLabs/prysm/v7/testing/assertions"
"github.com/sirupsen/logrus/hooks/test"
)
@@ -87,3 +89,9 @@ func ErrorIs(tb assertions.AssertionTestingTB, err, target error, msg ...any) {
func StringContains(tb assertions.AssertionTestingTB, expected, actual string, msg ...any) {
assertions.StringContains(tb.Fatalf, expected, actual, true, msg)
}
// Eventually asserts that given condition will be met within waitFor time,
// periodically checking target function each tick.
func Eventually(tb assertions.AssertionTestingTB, condition func() bool, waitFor, tick time.Duration, msg ...any) {
assertions.Eventually(tb.Fatalf, condition, waitFor, tick, msg...)
}

View File

@@ -56,18 +56,18 @@ func (mr *MockNodeClientMockRecorder) Genesis(arg0, arg1 any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Genesis", reflect.TypeOf((*MockNodeClient)(nil).Genesis), arg0, arg1)
}
// IsHealthy mocks base method.
func (m *MockNodeClient) IsHealthy(arg0 context.Context) bool {
// IsReady mocks base method.
func (m *MockNodeClient) IsReady(arg0 context.Context) bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IsHealthy", arg0)
ret := m.ctrl.Call(m, "IsReady", arg0)
ret0, _ := ret[0].(bool)
return ret0
}
// IsHealthy indicates an expected call of IsHealthy.
func (mr *MockNodeClientMockRecorder) IsHealthy(arg0 any) *gomock.Call {
// IsReady indicates an expected call of IsReady.
func (mr *MockNodeClientMockRecorder) IsReady(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsHealthy", reflect.TypeOf((*MockNodeClient)(nil).IsHealthy), arg0)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsReady", reflect.TypeOf((*MockNodeClient)(nil).IsReady), arg0)
}
// Peers mocks base method.

View File

@@ -2,6 +2,7 @@ package beacon_api
import (
"context"
"net/http"
"strconv"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
@@ -101,12 +102,17 @@ func (c *beaconApiNodeClient) Peers(ctx context.Context, in *empty.Empty) (*ethp
return nil, errors.New("beaconApiNodeClient.Peers is not implemented. To use a fallback client, pass a fallback client as the last argument of NewBeaconApiNodeClientWithFallback.")
}
func (c *beaconApiNodeClient) IsHealthy(ctx context.Context) bool {
if err := c.jsonRestHandler.Get(ctx, "/eth/v1/node/health", nil); err != nil {
// IsReady returns true only if the node is fully synced (200 OK).
// A 206 Partial Content response indicates the node is syncing and not ready.
func (c *beaconApiNodeClient) IsReady(ctx context.Context) bool {
statusCode, err := c.jsonRestHandler.GetStatusCode(ctx, "/eth/v1/node/health")
if err != nil {
log.WithError(err).Error("failed to get health of node")
return false
}
return true
// Only 200 OK means the node is fully synced and ready.
// 206 Partial Content means syncing, 503 means unavailable.
return statusCode == http.StatusOK
}
func NewNodeClientWithFallback(jsonRestHandler RestHandler, fallbackClient iface.NodeClient) iface.NodeClient {

View File

@@ -2,6 +2,7 @@ package beacon_api
import (
"errors"
"net/http"
"testing"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
@@ -287,3 +288,59 @@ func TestGetVersion(t *testing.T) {
})
}
}
func TestIsReady(t *testing.T) {
const healthEndpoint = "/eth/v1/node/health"
testCases := []struct {
name string
statusCode int
err error
expectedResult bool
}{
{
name: "returns true for 200 OK (fully synced)",
statusCode: http.StatusOK,
expectedResult: true,
},
{
name: "returns false for 206 Partial Content (syncing)",
statusCode: http.StatusPartialContent,
expectedResult: false,
},
{
name: "returns false for 503 Service Unavailable",
statusCode: http.StatusServiceUnavailable,
expectedResult: false,
},
{
name: "returns false for 500 Internal Server Error",
statusCode: http.StatusInternalServerError,
expectedResult: false,
},
{
name: "returns false on error",
err: errors.New("request failed"),
expectedResult: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := t.Context()
jsonRestHandler := mock.NewMockJsonRestHandler(ctrl)
jsonRestHandler.EXPECT().GetStatusCode(
gomock.Any(),
healthEndpoint,
).Return(tc.statusCode, tc.err)
nodeClient := &beaconApiNodeClient{jsonRestHandler: jsonRestHandler}
result := nodeClient.IsReady(ctx)
assert.Equal(t, tc.expectedResult, result)
})
}
}

View File

@@ -1,9 +1,9 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: validator/client/beacon-api/json_rest_handler.go
// Source: validator/client/beacon-api/rest_handler_client.go
//
// Generated by this command:
//
// mockgen -package=mock -source=validator/client/beacon-api/json_rest_handler.go -destination=validator/client/beacon-api/mock/json_rest_handler_mock.go
// mockgen -package=mock -source=validator/client/beacon-api/rest_handler_client.go -destination=validator/client/beacon-api/mock/json_rest_handler_mock.go RestHandler
//
// Package mock is a generated GoMock package.
@@ -18,32 +18,37 @@ import (
gomock "go.uber.org/mock/gomock"
)
// MockJsonRestHandler is a mock of JsonRestHandler interface.
type MockJsonRestHandler struct {
// Backward compatibility aliases for the renamed mock type.
type MockJsonRestHandler = MockRestHandler
type MockJsonRestHandlerMockRecorder = MockRestHandlerMockRecorder
var NewMockJsonRestHandler = NewMockRestHandler
// MockRestHandler is a mock of RestHandler interface.
type MockRestHandler struct {
ctrl *gomock.Controller
recorder *MockJsonRestHandlerMockRecorder
isgomock struct{}
recorder *MockRestHandlerMockRecorder
}
// MockJsonRestHandlerMockRecorder is the mock recorder for MockJsonRestHandler.
type MockJsonRestHandlerMockRecorder struct {
mock *MockJsonRestHandler
// MockRestHandlerMockRecorder is the mock recorder for MockRestHandler.
type MockRestHandlerMockRecorder struct {
mock *MockRestHandler
}
// NewMockJsonRestHandler creates a new mock instance.
func NewMockJsonRestHandler(ctrl *gomock.Controller) *MockJsonRestHandler {
mock := &MockJsonRestHandler{ctrl: ctrl}
mock.recorder = &MockJsonRestHandlerMockRecorder{mock}
// NewMockRestHandler creates a new mock instance.
func NewMockRestHandler(ctrl *gomock.Controller) *MockRestHandler {
mock := &MockRestHandler{ctrl: ctrl}
mock.recorder = &MockRestHandlerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockJsonRestHandler) EXPECT() *MockJsonRestHandlerMockRecorder {
func (m *MockRestHandler) EXPECT() *MockRestHandlerMockRecorder {
return m.recorder
}
// Get mocks base method.
func (m *MockJsonRestHandler) Get(ctx context.Context, endpoint string, resp any) error {
func (m *MockRestHandler) Get(ctx context.Context, endpoint string, resp any) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Get", ctx, endpoint, resp)
ret0, _ := ret[0].(error)
@@ -51,13 +56,13 @@ func (m *MockJsonRestHandler) Get(ctx context.Context, endpoint string, resp any
}
// Get indicates an expected call of Get.
func (mr *MockJsonRestHandlerMockRecorder) Get(ctx, endpoint, resp any) *gomock.Call {
func (mr *MockRestHandlerMockRecorder) Get(ctx, endpoint, resp any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockJsonRestHandler)(nil).Get), ctx, endpoint, resp)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockRestHandler)(nil).Get), ctx, endpoint, resp)
}
// GetSSZ mocks base method.
func (m *MockJsonRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
func (m *MockRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetSSZ", ctx, endpoint)
ret0, _ := ret[0].([]byte)
@@ -67,13 +72,28 @@ func (m *MockJsonRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]by
}
// GetSSZ indicates an expected call of GetSSZ.
func (mr *MockJsonRestHandlerMockRecorder) GetSSZ(ctx, endpoint any) *gomock.Call {
func (mr *MockRestHandlerMockRecorder) GetSSZ(ctx, endpoint any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSSZ", reflect.TypeOf((*MockJsonRestHandler)(nil).GetSSZ), ctx, endpoint)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSSZ", reflect.TypeOf((*MockRestHandler)(nil).GetSSZ), ctx, endpoint)
}
// GetStatusCode mocks base method.
func (m *MockRestHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetStatusCode", ctx, endpoint)
ret0, _ := ret[0].(int)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetStatusCode indicates an expected call of GetStatusCode.
func (mr *MockRestHandlerMockRecorder) GetStatusCode(ctx, endpoint any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStatusCode", reflect.TypeOf((*MockRestHandler)(nil).GetStatusCode), ctx, endpoint)
}
// Host mocks base method.
func (m *MockJsonRestHandler) Host() string {
func (m *MockRestHandler) Host() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Host")
ret0, _ := ret[0].(string)
@@ -81,13 +101,13 @@ func (m *MockJsonRestHandler) Host() string {
}
// Host indicates an expected call of Host.
func (mr *MockJsonRestHandlerMockRecorder) Host() *gomock.Call {
func (mr *MockRestHandlerMockRecorder) Host() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Host", reflect.TypeOf((*MockJsonRestHandler)(nil).Host))
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Host", reflect.TypeOf((*MockRestHandler)(nil).Host))
}
// HttpClient mocks base method.
func (m *MockJsonRestHandler) HttpClient() *http.Client {
func (m *MockRestHandler) HttpClient() *http.Client {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HttpClient")
ret0, _ := ret[0].(*http.Client)
@@ -95,13 +115,13 @@ func (m *MockJsonRestHandler) HttpClient() *http.Client {
}
// HttpClient indicates an expected call of HttpClient.
func (mr *MockJsonRestHandlerMockRecorder) HttpClient() *gomock.Call {
func (mr *MockRestHandlerMockRecorder) HttpClient() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HttpClient", reflect.TypeOf((*MockJsonRestHandler)(nil).HttpClient))
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HttpClient", reflect.TypeOf((*MockRestHandler)(nil).HttpClient))
}
// Post mocks base method.
func (m *MockJsonRestHandler) Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp any) error {
func (m *MockRestHandler) Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp any) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Post", ctx, endpoint, headers, data, resp)
ret0, _ := ret[0].(error)
@@ -109,13 +129,13 @@ func (m *MockJsonRestHandler) Post(ctx context.Context, endpoint string, headers
}
// Post indicates an expected call of Post.
func (mr *MockJsonRestHandlerMockRecorder) Post(ctx, endpoint, headers, data, resp any) *gomock.Call {
func (mr *MockRestHandlerMockRecorder) Post(ctx, endpoint, headers, data, resp any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Post", reflect.TypeOf((*MockJsonRestHandler)(nil).Post), ctx, endpoint, headers, data, resp)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Post", reflect.TypeOf((*MockRestHandler)(nil).Post), ctx, endpoint, headers, data, resp)
}
// Post mocks base method.
func (m *MockJsonRestHandler) PostSSZ(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer) ([]byte, http.Header, error) {
// PostSSZ mocks base method.
func (m *MockRestHandler) PostSSZ(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer) ([]byte, http.Header, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PostSSZ", ctx, endpoint, headers, data)
ret0, _ := ret[0].([]byte)
@@ -124,20 +144,20 @@ func (m *MockJsonRestHandler) PostSSZ(ctx context.Context, endpoint string, head
return ret0, ret1, ret2
}
// Post indicates an expected call of Post.
func (mr *MockJsonRestHandlerMockRecorder) PostSSZ(ctx, endpoint, headers, data any) *gomock.Call {
// PostSSZ indicates an expected call of PostSSZ.
func (mr *MockRestHandlerMockRecorder) PostSSZ(ctx, endpoint, headers, data any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostSSZ", reflect.TypeOf((*MockJsonRestHandler)(nil).PostSSZ), ctx, endpoint, headers, data)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostSSZ", reflect.TypeOf((*MockRestHandler)(nil).PostSSZ), ctx, endpoint, headers, data)
}
// SetHost mocks base method.
func (m *MockJsonRestHandler) SetHost(host string) {
func (m *MockRestHandler) SetHost(host string) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetHost", host)
}
// SetHost indicates an expected call of SetHost.
func (mr *MockJsonRestHandlerMockRecorder) SetHost(host any) *gomock.Call {
func (mr *MockRestHandlerMockRecorder) SetHost(host any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHost", reflect.TypeOf((*MockJsonRestHandler)(nil).SetHost), host)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHost", reflect.TypeOf((*MockRestHandler)(nil).SetHost), host)
}

View File

@@ -23,6 +23,7 @@ type reqOption func(*http.Request)
type RestHandler interface {
Get(ctx context.Context, endpoint string, resp any) error
GetStatusCode(ctx context.Context, endpoint string) (int, error)
GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error)
Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp any) error
PostSSZ(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer) ([]byte, http.Header, error)
@@ -90,6 +91,28 @@ func (c *BeaconApiRestHandler) Get(ctx context.Context, endpoint string, resp an
return decodeResp(httpResp, resp)
}
// GetStatusCode sends a GET request and returns only the HTTP status code.
// This is useful for endpoints like /eth/v1/node/health that communicate status via HTTP codes
// (200 = ready, 206 = syncing, 503 = unavailable) rather than response bodies.
func (c *BeaconApiRestHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return 0, errors.Wrapf(err, "failed to create request for endpoint %s", url)
}
req.Header.Set("User-Agent", version.BuildData())
httpResp, err := c.client.Do(req)
if err != nil {
return 0, errors.Wrapf(err, "failed to perform request for endpoint %s", url)
}
defer func() {
if err := httpResp.Body.Close(); err != nil {
return
}
}()
return httpResp.StatusCode, nil
}
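A short usage sketch of the new method from inside the package (the handler value is assumed to be configured elsewhere; the switch simply spells out the status codes named in the comment above):

code, err := handler.GetStatusCode(ctx, "/eth/v1/node/health")
if err != nil {
	// Transport-level failure: treat the node as unavailable.
	return false
}
switch code {
case http.StatusOK:
	return true // fully synced
case http.StatusPartialContent:
	return false // still syncing
default:
	return false // unavailable or unexpected status
}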
func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)

View File

@@ -359,3 +359,66 @@ func Test_decodeResp(t *testing.T) {
assert.ErrorContains(t, "HTTP request unsuccessful (500: foo)", err)
})
}
func TestGetStatusCode(t *testing.T) {
ctx := t.Context()
const endpoint = "/eth/v1/node/health"
testCases := []struct {
name string
serverStatusCode int
expectedStatusCode int
}{
{
name: "returns 200 OK",
serverStatusCode: http.StatusOK,
expectedStatusCode: http.StatusOK,
},
{
name: "returns 206 Partial Content",
serverStatusCode: http.StatusPartialContent,
expectedStatusCode: http.StatusPartialContent,
},
{
name: "returns 503 Service Unavailable",
serverStatusCode: http.StatusServiceUnavailable,
expectedStatusCode: http.StatusServiceUnavailable,
},
{
name: "returns 500 Internal Server Error",
serverStatusCode: http.StatusInternalServerError,
expectedStatusCode: http.StatusInternalServerError,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, version.BuildData(), r.Header.Get("User-Agent"))
w.WriteHeader(tc.serverStatusCode)
})
server := httptest.NewServer(mux)
defer server.Close()
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}
statusCode, err := jsonRestHandler.GetStatusCode(ctx, endpoint)
require.NoError(t, err)
assert.Equal(t, tc.expectedStatusCode, statusCode)
})
}
t.Run("returns error on connection failure", func(t *testing.T) {
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Millisecond * 100},
host: "http://localhost:99999", // Invalid port
}
_, err := jsonRestHandler.GetStatusCode(ctx, endpoint)
require.ErrorContains(t, "failed to perform request", err)
})
}

View File

@@ -33,7 +33,7 @@ func (c *grpcNodeClient) Peers(ctx context.Context, in *empty.Empty) (*ethpb.Pee
return c.nodeClient.ListPeers(ctx, in)
}
func (c *grpcNodeClient) IsHealthy(ctx context.Context) bool {
func (c *grpcNodeClient) IsReady(ctx context.Context) bool {
_, err := c.nodeClient.GetHealth(ctx, &ethpb.HealthRequest{})
if err != nil {
log.WithError(err).Error("Failed to get health of node")

View File

@@ -12,5 +12,5 @@ type NodeClient interface {
Genesis(ctx context.Context, in *empty.Empty) (*ethpb.Genesis, error)
Version(ctx context.Context, in *empty.Empty) (*ethpb.Version, error)
Peers(ctx context.Context, in *empty.Empty) (*ethpb.Peers, error)
IsHealthy(ctx context.Context) bool
IsReady(ctx context.Context) bool
}

View File

@@ -1274,7 +1274,7 @@ func (v *validator) FindHealthyHost(ctx context.Context) bool {
// Tail-recursive closure keeps retry count private.
var check func(remaining int) bool
check = func(remaining int) bool {
if v.nodeClient.IsHealthy(ctx) { // healthy → done
if v.nodeClient.IsReady(ctx) { // ready → done
return true
}
if len(v.beaconNodeHosts) == 1 && features.Get().EnableBeaconRESTApi {