Mirror of https://github.com/OffchainLabs/prysm.git, synced 2026-01-09 21:38:05 -05:00

Compare commits: 5fb96df47e ... fix-backwa (13 commits)

Commits (SHA1, author and date not preserved in this mirror):
- 93b2445d83
- fe5d67496a
- a7eb4add27
- 0f0fced955
- 56477f944f
- 1a6252ade4
- 27c009e7ff
- ffad861e2c
- 5069057354
- 30340f632c
- 6bfbbdbd1b
- d30c1cbb8a
- e9c7a62b97
@@ -17,6 +17,7 @@ import (
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
ethpbv1 "github.com/OffchainLabs/prysm/v7/proto/eth/v1"

@@ -130,12 +131,10 @@ func TestService_ReceiveBlock(t *testing.T) {
block: genFullBlock(t, util.DefaultBlockGenConfig(), 1 /*slot*/),
},
check: func(t *testing.T, s *Service) {
// Hacky sleep, should use a better way to be able to resolve the race
// between event being sent out and processed.
time.Sleep(100 * time.Millisecond)
if recvd := len(s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier).ReceivedEvents()); recvd < 1 {
t.Errorf("Received %d state notifications, expected at least 1", recvd)
}
notifier := s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier)
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) >= 1
}, 2*time.Second, 10*time.Millisecond, "Expected at least 1 state notification")
},
},
{

@@ -222,10 +221,10 @@ func TestService_ReceiveBlockUpdateHead(t *testing.T) {
require.NoError(t, s.ReceiveBlock(ctx, wsb, root, nil))
})
wg.Wait()
time.Sleep(100 * time.Millisecond)
if recvd := len(s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier).ReceivedEvents()); recvd < 1 {
t.Errorf("Received %d state notifications, expected at least 1", recvd)
}
notifier := s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier)
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) >= 1
}, 2*time.Second, 10*time.Millisecond, "Expected at least 1 state notification")
// Verify fork choice has processed the block. (Genesis block and the new block)
assert.Equal(t, 2, s.cfg.ForkChoiceStore.NodeCount())
}

@@ -265,10 +264,10 @@ func TestService_ReceiveBlockBatch(t *testing.T) {
block: genFullBlock(t, util.DefaultBlockGenConfig(), 1 /*slot*/),
},
check: func(t *testing.T, s *Service) {
time.Sleep(100 * time.Millisecond)
if recvd := len(s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier).ReceivedEvents()); recvd < 1 {
t.Errorf("Received %d state notifications, expected at least 1", recvd)
}
notifier := s.cfg.StateNotifier.(*blockchainTesting.MockStateNotifier)
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) >= 1
}, 2*time.Second, 10*time.Millisecond, "Expected at least 1 state notification")
},
},
}

@@ -512,8 +511,9 @@ func Test_executePostFinalizationTasks(t *testing.T) {
s.cfg.StateNotifier = notifier
s.executePostFinalizationTasks(s.ctx, headState)

time.Sleep(1 * time.Second) // sleep for a second because event is in a separate go routine
require.Equal(t, 1, len(notifier.ReceivedEvents()))
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) == 1
}, 5*time.Second, 50*time.Millisecond, "Expected exactly 1 state notification")
e := notifier.ReceivedEvents()[0]
assert.Equal(t, statefeed.FinalizedCheckpoint, int(e.Type))
fc, ok := e.Data.(*ethpbv1.EventFinalizedCheckpoint)

@@ -552,8 +552,9 @@ func Test_executePostFinalizationTasks(t *testing.T) {
s.cfg.StateNotifier = notifier
s.executePostFinalizationTasks(s.ctx, headState)

time.Sleep(1 * time.Second) // sleep for a second because event is in a separate go routine
require.Equal(t, 1, len(notifier.ReceivedEvents()))
require.Eventually(t, func() bool {
return len(notifier.ReceivedEvents()) == 1
}, 5*time.Second, 50*time.Millisecond, "Expected exactly 1 state notification")
e := notifier.ReceivedEvents()[0]
assert.Equal(t, statefeed.FinalizedCheckpoint, int(e.Type))
fc, ok := e.Data.(*ethpbv1.EventFinalizedCheckpoint)

@@ -596,13 +597,13 @@ func TestProcessLightClientBootstrap(t *testing.T) {

s.executePostFinalizationTasks(s.ctx, l.AttestedState)

// wait for the goroutine to finish processing
time.Sleep(1 * time.Second)

// Check that the light client bootstrap is saved
b, err := s.lcStore.LightClientBootstrap(ctx, [32]byte(cp.Root))
require.NoError(t, err)
require.NotNil(t, b)
// Wait for the light client bootstrap to be saved (runs in goroutine)
var b interfaces.LightClientBootstrap
require.Eventually(t, func() bool {
var err error
b, err = s.lcStore.LightClientBootstrap(ctx, [32]byte(cp.Root))
return err == nil && b != nil
}, 5*time.Second, 50*time.Millisecond, "Light client bootstrap was not saved within timeout")

btst, err := lightClient.NewLightClientBootstrapFromBeaconState(ctx, l.FinalizedState.Slot(), l.FinalizedState, l.FinalizedBlock)
require.NoError(t, err)
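The recurring change in the hunks above is replacing a fixed `time.Sleep` plus a one-shot check with a polling assertion. Below is a minimal, self-contained sketch of that pattern using a testify-style `require.Eventually` (same argument shape as in the diff: condition, overall deadline, poll interval, message); the `MockNotifier` type is a hypothetical stand-in for Prysm's `MockStateNotifier`:

```go
package example

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// MockNotifier is a hypothetical stand-in for a notifier that records events
// delivered asynchronously by a goroutine under test.
type MockNotifier struct{ events chan struct{} }

func (m *MockNotifier) ReceivedEvents() int { return len(m.events) }

func TestEventuallyPattern(t *testing.T) {
	n := &MockNotifier{events: make(chan struct{}, 1)}
	go func() { n.events <- struct{}{} }() // async producer under test

	// Instead of time.Sleep(100 * time.Millisecond) followed by a single check,
	// poll until the condition holds or the deadline expires.
	require.Eventually(t, func() bool {
		return n.ReceivedEvents() >= 1
	}, 2*time.Second, 10*time.Millisecond, "expected at least 1 event")
}
```

The deadline bounds how long a flaky run can hang, while the short tick lets the test return as soon as the event arrives.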
@@ -295,11 +295,20 @@ func (s *Service) StartFromSavedState(saved state.BeaconState) error {
return nil
}

earliestAvailableSlot, custodySubnetCount, err := s.updateCustodyInfoInDB(saved.Slot())
// Track supernode subscription status for logging purposes.
isSupernode := flags.Get().Supernode
wasSupernode, err := s.cfg.BeaconDB.UpdateSubscribedToAllDataSubnets(s.ctx, isSupernode)
if err != nil {
return errors.Wrap(err, "update subscribed to all data subnets")
}

earliestAvailableSlot, custodySubnetCount, err := s.updateCustodyInfoInDB()
if err != nil {
return errors.Wrap(err, "could not get and save custody group count")
}

logCustodyStatus(wasSupernode, custodySubnetCount)

if _, _, err := s.cfg.P2P.UpdateCustodyInfo(earliestAvailableSlot, custodySubnetCount); err != nil {
return errors.Wrap(err, "update custody info")
}
@@ -468,58 +477,100 @@ func (s *Service) removeStartupState() {
s.cfg.FinalizedStateAtStartUp = nil
}

// UpdateCustodyInfoInDB updates the custody information in the database.
// It returns the (potentially updated) custody group count and the earliest available slot.
func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot, uint64, error) {
isSupernode := flags.Get().Supernode
isSemiSupernode := flags.Get().SemiSupernode

cfg := params.BeaconConfig()
custodyRequirement := cfg.CustodyRequirement

// Check if the node was previously subscribed to all data subnets, and if so,
// store the new status accordingly.
wasSupernode, err := s.cfg.BeaconDB.UpdateSubscribedToAllDataSubnets(s.ctx, isSupernode)
// updateCustodyInfoInDB updates the custody information in the database.
// It returns the (potentially updated) earliest available slot and custody group count.
func (s *Service) updateCustodyInfoInDB() (primitives.Slot, uint64, error) {
targetCustodyGroupCount, err := computeTargetCustodyGroupCount()
if err != nil {
return 0, 0, errors.Wrap(err, "update subscribed to all data subnets")
return 0, 0, err
}

// Compute the target custody group count based on current flag configuration.
targetCustodyGroupCount := custodyRequirement
// Query current custody info.
storedEarliestSlot, storedCustodyCount, err := s.cfg.BeaconDB.CustodyInfo(s.ctx)
if err != nil {
return 0, 0, errors.Wrap(err, "custody info")
}

// Supernode: custody all groups (either currently set or previously enabled)
// If custody was already initialized and no increase needed, return stored values.
if storedCustodyCount > 0 && targetCustodyGroupCount <= storedCustodyCount {
return storedEarliestSlot, storedCustodyCount, nil
}

earliestAvailableSlot, err := s.earliestAvailableSlotForCustodyIncrease(storedCustodyCount)
if err != nil {
return 0, 0, err
}

storedEarliestSlot, actualCustodyGroupCount, err := s.cfg.BeaconDB.UpdateCustodyInfo(
s.ctx, earliestAvailableSlot, targetCustodyGroupCount)
if err != nil {
return 0, 0, errors.Wrap(err, "update custody info")
}

return storedEarliestSlot, actualCustodyGroupCount, nil
}

// earliestAvailableSlotForCustodyIncrease determines the earliest available slot when custody is increasing.
// - Pre-fulu: use earliest block slot (no data column sharding)
// - Post-fulu checkpoint sync (storedCustodyCount == 0): use headSlot
// - Post-fulu restart with custody increase: use headSlot + 1
//
// (we don't have data columns for new custody groups at the current head)
func (s *Service) earliestAvailableSlotForCustodyIncrease(storedCustodyCount uint64) (primitives.Slot, error) {
fuluSlot, err := fuluForkSlot()
if err != nil {
return 0, errors.Wrap(err, "fulu fork slot")
}

headSlot := s.HeadSlot()

if headSlot >= fuluSlot {
if storedCustodyCount > 0 {
// Restart with custody increase: new groups only have data from headSlot + 1
return headSlot + 1, nil
}
// Checkpoint sync: we download data columns for the checkpoint block
return headSlot, nil
}

// Pre-fulu: use earliest block slot
earliestAvailableSlot, err := s.cfg.BeaconDB.EarliestSlot(s.ctx)
if err != nil {
return 0, errors.Wrap(err, "earliest slot")
}
return earliestAvailableSlot, nil
}

// computeTargetCustodyGroupCount returns the custody group count based on current flag configuration.
func computeTargetCustodyGroupCount() (uint64, error) {
isSupernode := flags.Get().Supernode
isSemiSupernode := flags.Get().SemiSupernode
cfg := params.BeaconConfig()

// Default to custody requirement
targetCustodyGroupCount := cfg.CustodyRequirement

// Supernode: custody all groups
if isSupernode {
targetCustodyGroupCount = cfg.NumberOfCustodyGroups
return cfg.NumberOfCustodyGroups, nil
}

// Semi-supernode: custody minimum needed for reconstruction, or custody requirement if higher
if isSemiSupernode {
semiSupernodeCustody, err := peerdas.MinimumCustodyGroupCountToReconstruct()
if err != nil {
return 0, 0, errors.Wrap(err, "minimum custody group count")
return 0, errors.Wrap(err, "minimum custody group count")
}

targetCustodyGroupCount = max(custodyRequirement, semiSupernodeCustody)
targetCustodyGroupCount = max(cfg.CustodyRequirement, semiSupernodeCustody)
}

// Safely compute the fulu fork slot.
fuluForkSlot, err := fuluForkSlot()
if err != nil {
return 0, 0, errors.Wrap(err, "fulu fork slot")
}
return targetCustodyGroupCount, nil
}

// If slot is before the fulu fork slot, then use the earliest stored slot as the reference slot.
if slot < fuluForkSlot {
slot, err = s.cfg.BeaconDB.EarliestSlot(s.ctx)
if err != nil {
return 0, 0, errors.Wrap(err, "earliest slot")
}
}

earliestAvailableSlot, actualCustodyGroupCount, err := s.cfg.BeaconDB.UpdateCustodyInfo(s.ctx, slot, targetCustodyGroupCount)
if err != nil {
return 0, 0, errors.Wrap(err, "update custody info")
}
// logCustodyStatus logs information about the custody configuration.
func logCustodyStatus(wasSupernode bool, actualCustodyGroupCount uint64) {
isSupernode := flags.Get().Supernode
cfg := params.BeaconConfig()

if isSupernode {
log.WithFields(logrus.Fields{
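For quick reference, the three branches of `earliestAvailableSlotForCustodyIncrease` in the hunk above (summarized from its doc comment and body) select the earliest available slot as follows:

| Scenario | Condition | Earliest available slot |
|---|---|---|
| Pre-Fulu | `headSlot < fuluSlot` | `BeaconDB.EarliestSlot()` (earliest stored block slot, no data column sharding yet) |
| Post-Fulu checkpoint sync | `headSlot >= fuluSlot` and `storedCustodyCount == 0` | `headSlot` (data columns for the checkpoint block are downloaded) |
| Post-Fulu restart with custody increase | `headSlot >= fuluSlot` and `storedCustodyCount > 0` | `headSlot + 1` (no data columns exist yet for the new groups at the current head) |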
@@ -531,8 +582,6 @@ func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot,
if wasSupernode && !isSupernode {
log.Warningf("Because the `--%s` flag was previously used, the node will continue to act as a super node.", flags.Supernode.Name)
}

return earliestAvailableSlot, actualCustodyGroupCount, nil
}

func spawnCountdownIfPreGenesis(ctx context.Context, genesisTime time.Time, db db.HeadAccessDatabase) {
@@ -621,37 +621,54 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
roBlock, err := blocks.NewROBlock(signedBeaconBlock)
require.NoError(t, err)

fuluForkSlot := fuluForkEpoch * primitives.Slot(cfg.SlotsPerEpoch)

// Helper to set up forkchoice and head at a specific slot.
// Returns the block root so it can be used as parent for subsequent calls.
setupHead := func(t *testing.T, service *Service, slot primitives.Slot, parentRoot [32]byte) [32]byte {
fc := &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
blockRoot := [32]byte{byte(slot)}
headState, headBlock, err := prepareForkchoiceState(ctx, slot, blockRoot, parentRoot, params.BeaconConfig().ZeroHash, fc, fc)
require.NoError(t, err)
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, headState, headBlock))
require.NoError(t, service.setHead(&head{
root: headBlock.Root(),
block: headBlock,
state: headState,
slot: slot,
}))
return blockRoot
}

t.Run("CGC increases before fulu", func(t *testing.T) {
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

// Before Fulu
// -----------
actualEas, actualCgc, err := service.updateCustodyInfoInDB(15)
// Set head to pre-fulu slot
preFuluSlot := primitives.Slot(15)
setupHead(t, service, preFuluSlot, params.BeaconConfig().ZeroHash)

// Before Fulu - should use earliest stored slot
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)

actualEas, actualCgc, err = service.updateCustodyInfoInDB(17)
// Call again to ensure idempotency
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)

// Enable supernode while still pre-fulu
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)

actualEas, actualCgc, err = service.updateCustodyInfoInDB(19)
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, numberOfCustodyGroups, actualCgc)

// After Fulu
// ----------
actualEas, actualCgc, err = service.updateCustodyInfoInDB(fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, numberOfCustodyGroups, actualCgc)

@@ -662,35 +679,36 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

// Before Fulu
// -----------
actualEas, actualCgc, err := service.updateCustodyInfoInDB(15)
// Start pre-fulu
preFuluSlot := primitives.Slot(15)
preFuluRoot := setupHead(t, service, preFuluSlot, params.BeaconConfig().ZeroHash)

actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)

actualEas, actualCgc, err = service.updateCustodyInfoInDB(17)
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)

// After Fulu
// ----------
// Move to post-fulu and enable supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)

slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot)
postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, preFuluRoot)

actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot+1, actualEas) // head + 1
require.Equal(t, numberOfCustodyGroups, actualCgc)

actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
// Call again - earliest available slot shouldn't change
laterSlot := postFuluSlot + 10
setupHead(t, service, laterSlot, postFuluRoot)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot+1, actualEas) // Still the original EAS
require.Equal(t, numberOfCustodyGroups, actualCgc)
})

@@ -699,16 +717,19 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)

// Enable supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)

slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
require.Equal(t, numberOfCustodyGroups, actualCgc)

// Try to downgrade by removing flag

@@ -716,10 +737,14 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
flags.Init(gFlags)
defer flags.Init(resetFlags)

// Should still be supernode
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
// Move head forward
laterSlot := postFuluSlot + 10
setupHead(t, service, laterSlot, postFuluRoot)

// Should still be supernode (custody count shouldn't decrease)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot, actualEas) // Original EAS preserved
require.Equal(t, numberOfCustodyGroups, actualCgc) // Still 64, not downgraded
})

@@ -728,29 +753,36 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)

// Enable semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SemiSupernode = true
flags.Init(gFlags)

slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies 64 groups
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 32
require.Equal(t, semiSupernodeCustody, actualCgc)

// Try to downgrade by removing flag
gFlags.SemiSupernode = false
flags.Init(gFlags)
defer flags.Init(resetFlags)

// UpdateCustodyInfo should prevent downgrade - custody count should remain at 64
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
// Move head forward
laterSlot := postFuluSlot + 10
setupHead(t, service, laterSlot, postFuluRoot)

// UpdateCustodyInfo should prevent downgrade
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, semiSupernodeCustody, actualCgc) // Still 64 due to downgrade prevention by UpdateCustodyInfo
require.Equal(t, postFuluSlot, actualEas) // Original EAS preserved
require.Equal(t, semiSupernodeCustody, actualCgc) // Still 32
})

t.Run("Semi-supernode to supernode upgrade allowed", func(t *testing.T) {

@@ -758,18 +790,21 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)

// Start with semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SemiSupernode = true
flags.Init(gFlags)

slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies 64 groups
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
semiSupernodeCustody := numberOfCustodyGroups / 2
require.Equal(t, semiSupernodeCustody, actualCgc)

// Upgrade to full supernode
gFlags.SemiSupernode = false

@@ -777,12 +812,15 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
flags.Init(gFlags)
defer flags.Init(resetFlags)

// Should upgrade to full supernode
upgradeSlot := slot + 2
actualEas, actualCgc, err = service.updateCustodyInfoInDB(upgradeSlot)
// Move head forward for upgrade
upgradeSlot := postFuluSlot + 10
setupHead(t, service, upgradeSlot, postFuluRoot)

// Second call - restart with custody increase
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, upgradeSlot, actualEas) // Earliest slot updates when upgrading
require.Equal(t, numberOfCustodyGroups, actualCgc) // Upgraded to 128
require.Equal(t, upgradeSlot+1, actualEas) // headSlot + 1 (restart with custody increase)
require.Equal(t, numberOfCustodyGroups, actualCgc) // Upgraded to 64
})

t.Run("Semi-supernode with high validator requirements uses higher custody", func(t *testing.T) {

@@ -790,6 +828,9 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

postFuluSlot := fuluForkSlot + 1
setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)

// Enable semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)

@@ -797,17 +838,92 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
flags.Init(gFlags)
defer flags.Init(resetFlags)

// Mock a high custody requirement (simulating many validators)
// We need to override the custody requirement calculation
// For this test, we'll verify the logic by checking if custodyRequirement > 64
// Since custodyRequirement in minimalTestService is 4, we can't test the high case here
// This would require a different test setup with actual validators
slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
// With low validator requirements (4), should use semi-supernode minimum (64)
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
semiSupernodeCustody := numberOfCustodyGroups / 2
// With low validator requirements (4), should use semi-supernode minimum (32)
require.Equal(t, semiSupernodeCustody, actualCgc)
})

t.Run("Restart with higher custody uses head slot plus one", func(t *testing.T) {
// This test simulates a restart scenario where:
// 1. Node was running with lower custody count
// 2. Node stops and waits (new blocks are produced)
// 3. Node restarts with higher custody count
// In this case, we should use head + 1 as the earliest available slot.
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

// First, establish existing custody in the DB (simulates previous run)
initialSlot := fuluForkSlot + 10
initialRoot := setupHead(t, service, initialSlot, params.BeaconConfig().ZeroHash)

// Initial call with default custody (simulates first run checkpoint sync)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, initialSlot, actualEas) // First time = headSlot
require.Equal(t, custodyRequirement, actualCgc)

// Verify existing custody is stored (calling with 0 reads without modifying)
_, storedCount, err := requirements.db.UpdateCustodyInfo(ctx, 0, 0)
require.NoError(t, err)
require.Equal(t, custodyRequirement, storedCount)

// Now simulate restart at a higher head slot with increased custody
headSlot := fuluForkSlot + 100
setupHead(t, service, headSlot, initialRoot)
require.Equal(t, headSlot, service.HeadSlot())

// Enable supernode (simulating restart with increased custody)
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)

actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)

// The earliest available slot should be head + 1 (restart scenario)
expectedEas := headSlot + 1
require.Equal(t, expectedEas, actualEas, "Expected head + 1 for restart with custody increase")
require.Equal(t, numberOfCustodyGroups, actualCgc)
})

t.Run("Checkpoint sync uses head slot without plus one", func(t *testing.T) {
// This test verifies checkpoint sync behavior:
// When syncing from a checkpoint with no existing custody info,
// the earliest available slot should be the checkpoint slot itself
// (not +1), because we download the checkpoint block's data columns.
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

// Set up head at checkpoint slot (simulates checkpoint sync)
checkpointSlot := fuluForkSlot + 50
setupHead(t, service, checkpointSlot, params.BeaconConfig().ZeroHash)
require.Equal(t, checkpointSlot, service.HeadSlot())

// Verify no existing custody in DB (calling with 0 reads without modifying)
_, storedCount, err := requirements.db.UpdateCustodyInfo(ctx, 0, 0)
require.NoError(t, err)
require.Equal(t, uint64(0), storedCount)

// Enable supernode for checkpoint sync
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)

actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)

// For checkpoint sync, EAS should be the checkpoint slot (not +1)
require.Equal(t, checkpointSlot, actualEas, "Checkpoint sync should use headSlot, not headSlot+1")
require.Equal(t, numberOfCustodyGroups, actualCgc)
})
}
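The custody tests above repeatedly save, override, and restore the global node flags around each sub-test. A minimal sketch of that pattern as a reusable helper follows; `flags.Get`, `flags.Init`, and `flags.GlobalFlags` are the Prysm helpers used verbatim in the hunks above, while the helper itself and the import path are illustrative assumptions, not part of the PR:

```go
package blockchain_test

import (
	"testing"

	// Import path assumed for illustration; use the flags package referenced by the tests above.
	"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
)

// withSupernode is a hypothetical helper: it installs a flag set with Supernode
// enabled, runs fn, and restores the previous flags so the override cannot leak
// into other tests.
func withSupernode(t *testing.T, fn func()) {
	t.Helper()
	reset := flags.Get()
	defer flags.Init(reset)

	g := new(flags.GlobalFlags)
	g.Supernode = true
	flags.Init(g)

	fn()
}
```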
@@ -128,6 +128,7 @@ type NoHeadAccessDatabase interface {
BackfillFinalizedIndex(ctx context.Context, blocks []blocks.ROBlock, finalizedChildRoot [32]byte) error

// Custody operations.
CustodyInfo(ctx context.Context) (primitives.Slot, uint64, error)
UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot primitives.Slot, custodyGroupCount uint64) (primitives.Slot, uint64, error)
UpdateEarliestAvailableSlot(ctx context.Context, earliestAvailableSlot primitives.Slot) error
UpdateSubscribedToAllDataSubnets(ctx context.Context, subscribed bool) (bool, error)
@@ -14,9 +14,38 @@ import (
bolt "go.etcd.io/bbolt"
)

// CustodyInfo returns the stored earliest available slot and custody group count.
func (s *Store) CustodyInfo(ctx context.Context) (primitives.Slot, uint64, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.CustodyInfo")
defer span.End()

var storedGroupCount uint64
var storedEarliestAvailableSlot primitives.Slot

if err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(custodyBucket)
if bucket == nil {
return nil
}

if b := bucket.Get(groupCountKey); len(b) != 0 {
storedGroupCount = bytesutil.BytesToUint64BigEndian(b)
}
if b := bucket.Get(earliestAvailableSlotKey); len(b) != 0 {
storedEarliestAvailableSlot = primitives.Slot(bytesutil.BytesToUint64BigEndian(b))
}
return nil
}); err != nil {
return 0, 0, err
}

return storedEarliestAvailableSlot, storedGroupCount, nil
}

// UpdateCustodyInfo atomically updates the custody group count only if it is greater than the stored one.
// In this case, it also updates the earliest available slot with the provided value.
// It returns the (potentially updated) custody group count and earliest available slot.
// The earliest available slot is only updated if the new value is higher than the stored one,
// preventing loss of availability for data we already have (e.g., when switching from normal to supernode).
// It returns the (potentially updated) earliest available slot and custody group count.
func (s *Store) UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot primitives.Slot, custodyGroupCount uint64) (primitives.Slot, uint64, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.UpdateCustodyInfo")
defer span.End()

@@ -46,20 +75,23 @@ func (s *Store) UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot pri
return nil
}

storedGroupCount, storedEarliestAvailableSlot = custodyGroupCount, earliestAvailableSlot

// Store the earliest available slot.
bytes := bytesutil.Uint64ToBytesBigEndian(uint64(earliestAvailableSlot))
if err := bucket.Put(earliestAvailableSlotKey, bytes); err != nil {
return errors.Wrap(err, "put earliest available slot")
}

// Store the custody group count.
bytes = bytesutil.Uint64ToBytesBigEndian(custodyGroupCount)
// Update the custody group count.
storedGroupCount = custodyGroupCount
bytes := bytesutil.Uint64ToBytesBigEndian(custodyGroupCount)
if err := bucket.Put(groupCountKey, bytes); err != nil {
return errors.Wrap(err, "put custody group count")
}

// Only update earliestAvailableSlot if the new value is higher.
// This prevents losing availability for data we already have.
if earliestAvailableSlot > storedEarliestAvailableSlot {
storedEarliestAvailableSlot = earliestAvailableSlot
bytes = bytesutil.Uint64ToBytesBigEndian(uint64(earliestAvailableSlot))
if err := bucket.Put(earliestAvailableSlotKey, bytes); err != nil {
return errors.Wrap(err, "put earliest available slot")
}
}

return nil
}); err != nil {
return 0, 0, err
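Read together with `CustodyInfo` above, `UpdateCustodyInfo` is monotonic: the stored group count never decreases, and the stored earliest available slot only moves forward, and only when the count actually increases. The following self-contained sketch models just that rule in memory (an illustration of the semantics inferred from this hunk and the tests that follow, not the bbolt-backed store itself):

```go
package main

import "fmt"

// custodyInfo models the monotonic update rule: the group count never
// decreases, and the earliest available slot only moves forward, and only
// when the count actually increases.
type custodyInfo struct {
	earliestSlot uint64
	groupCount   uint64
}

func (c *custodyInfo) update(earliestSlot, groupCount uint64) (uint64, uint64) {
	if groupCount <= c.groupCount {
		return c.earliestSlot, c.groupCount // equal or lower count: nothing changes
	}
	c.groupCount = groupCount
	if earliestSlot > c.earliestSlot {
		c.earliestSlot = earliestSlot // never decrease the stored slot
	}
	return c.earliestSlot, c.groupCount
}

func main() {
	var c custodyInfo
	fmt.Println(c.update(100, 5)) // 100 5: fresh store, both values persisted
	fmt.Println(c.update(90, 64)) // 100 64: count raised, higher stored slot preserved
	fmt.Println(c.update(200, 4)) // 100 64: lower count, stored values returned as-is
}
```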
@@ -89,7 +89,7 @@ func TestUpdateCustodyInfo(t *testing.T) {
require.Equal(t, groupCount, storedCount)
})

t.Run("update with higher group count", func(t *testing.T) {
t.Run("update with higher group count and higher slot", func(t *testing.T) {
const (
initialSlot = primitives.Slot(100)
initialCount = uint64(5)

@@ -112,6 +112,150 @@ func TestUpdateCustodyInfo(t *testing.T) {
require.Equal(t, groupCount, storedCount)
})

t.Run("update with higher group count and lower slot should preserve higher slot", func(t *testing.T) {
// This is the bug scenario: when switching from normal mode to semi-supernode,
// the incoming slot might be lower than the stored slot, but we should preserve
// the higher stored slot to avoid advertising that we can serve data we don't have.
const (
initialSlot = primitives.Slot(1835523) // Higher stored slot
initialCount = uint64(10)
earliestSlot = primitives.Slot(1835456) // Lower incoming slot (e.g., from head slot)
groupCount = uint64(64) // Increasing custody (e.g., semi-supernode)
)

db := setupDB(t)

_, _, err := db.UpdateCustodyInfo(ctx, initialSlot, initialCount)
require.NoError(t, err)

// When custody count increases but slot is lower, the higher slot should be preserved
slot, count, err := db.UpdateCustodyInfo(ctx, earliestSlot, groupCount)
require.NoError(t, err)
require.Equal(t, initialSlot, slot, "earliestAvailableSlot should not decrease when custody group count increases")
require.Equal(t, groupCount, count)

// Verify in the database
storedSlot, storedCount := getCustodyInfoFromDB(t, db)
require.Equal(t, initialSlot, storedSlot, "stored slot should be the higher value")
require.Equal(t, groupCount, storedCount)
})

t.Run("pre-fulu scenario: checkpoint sync before fork, restart with semi-supernode", func(t *testing.T) {
// This test covers the pre-Fulu bug scenario:
// 1. Node starts with checkpoint sync BEFORE Fulu fork - uses EarliestSlot() (checkpoint block slot)
// 2. Validators connect after Fulu activates - maintainCustodyInfo() updates to head slot (higher)
// 3. Node restarts with --semi-supernode - updateCustodyInfoInDB uses EarliestSlot() again
// The bug was that step 3 would overwrite the higher slot from step 2.
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 100
params.OverrideBeaconConfig(cfg)

fuluForkSlot, err := slots.EpochStart(cfg.FuluForkEpoch)
require.NoError(t, err)

// Derive slot values relative to Fulu fork
checkpointBlockSlot := fuluForkSlot - 10 // Checkpoint sync happened before Fulu
headSlot := fuluForkSlot + 5 // Head slot after Fulu activates
defaultCustody := cfg.CustodyRequirement // Default custody from config
validatorCustody := cfg.CustodyRequirement + 6 // Custody after validators connect
semiSupernodeCustody := cfg.NumberOfCustodyGroups // Semi-supernode custodies all groups

// Verify our test setup: checkpoint is pre-Fulu, head is post-Fulu
require.Equal(t, true, checkpointBlockSlot < fuluForkSlot, "checkpoint must be before Fulu fork")
require.Equal(t, true, headSlot >= fuluForkSlot, "head must be at or after Fulu fork")

db := setupDB(t)

// Step 1: Node starts with checkpoint sync (pre-Fulu)
// updateCustodyInfoInDB sees saved.Slot() < fuluForkSlot, so uses EarliestSlot()
slot, count, err := db.UpdateCustodyInfo(ctx, checkpointBlockSlot, defaultCustody)
require.NoError(t, err)
require.Equal(t, checkpointBlockSlot, slot)
require.Equal(t, defaultCustody, count)

// Step 2: Validators connect after Fulu activates, maintainCustodyInfo() runs
// Uses headSlot which is higher than checkpointBlockSlot
slot, count, err = db.UpdateCustodyInfo(ctx, headSlot, validatorCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot, "should update to head slot")
require.Equal(t, validatorCustody, count)

// Verify step 2 stored correctly
storedSlot, storedCount := getCustodyInfoFromDB(t, db)
require.Equal(t, headSlot, storedSlot)
require.Equal(t, validatorCustody, storedCount)

// Step 3: Restart with --semi-supernode
// updateCustodyInfoInDB sees saved.Slot() < fuluForkSlot, so uses EarliestSlot() again
slot, count, err = db.UpdateCustodyInfo(ctx, checkpointBlockSlot, semiSupernodeCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot, "earliestAvailableSlot should NOT decrease back to checkpoint slot")
require.Equal(t, semiSupernodeCustody, count)

// Verify the database preserved the higher slot
storedSlot, storedCount = getCustodyInfoFromDB(t, db)
require.Equal(t, headSlot, storedSlot, "stored slot should remain at head slot, not checkpoint slot")
require.Equal(t, semiSupernodeCustody, storedCount)
})

t.Run("post-fulu scenario: finalized slot lower than stored head slot", func(t *testing.T) {
// This test covers the post-Fulu bug scenario:
// Post-fork, updateCustodyInfoInDB uses saved.Slot() (finalized slot) directly,
// not EarliestSlot(). But the same bug can occur because:
// - maintainCustodyInfo() stores headSlot (higher)
// - Restart uses finalized slot (lower than head)
// Our fix ensures earliestAvailableSlot never decreases.
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 100
params.OverrideBeaconConfig(cfg)

fuluForkSlot, err := slots.EpochStart(cfg.FuluForkEpoch)
require.NoError(t, err)

// Derive slot values relative to Fulu fork - all slots are AFTER Fulu
finalizedSlotAtStart := fuluForkSlot + 100 // Finalized slot at first start (post-Fulu)
headSlot := fuluForkSlot + 200 // Head slot when validators connect
finalizedSlotRestart := fuluForkSlot + 150 // Finalized slot at restart (< headSlot)
defaultCustody := cfg.CustodyRequirement // Default custody from config
validatorCustody := cfg.CustodyRequirement + 6 // Custody after validators connect
semiSupernodeCustody := cfg.NumberOfCustodyGroups // Semi-supernode custodies all groups

// Verify our test setup: all slots are post-Fulu
require.Equal(t, true, finalizedSlotAtStart >= fuluForkSlot, "finalized slot must be at or after Fulu fork")
require.Equal(t, true, headSlot >= fuluForkSlot, "head slot must be at or after Fulu fork")
require.Equal(t, true, finalizedSlotRestart >= fuluForkSlot, "restart finalized slot must be at or after Fulu fork")
require.Equal(t, true, finalizedSlotRestart < headSlot, "restart finalized slot must be less than head slot")

db := setupDB(t)

// Step 1: Node starts post-Fulu
// updateCustodyInfoInDB sees saved.Slot() >= fuluForkSlot, so uses saved.Slot() directly
slot, count, err := db.UpdateCustodyInfo(ctx, finalizedSlotAtStart, defaultCustody)
require.NoError(t, err)
require.Equal(t, finalizedSlotAtStart, slot)
require.Equal(t, defaultCustody, count)

// Step 2: Validators connect, maintainCustodyInfo() uses head slot
slot, count, err = db.UpdateCustodyInfo(ctx, headSlot, validatorCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot)
require.Equal(t, validatorCustody, count)

// Step 3: Restart with --semi-supernode
// updateCustodyInfoInDB uses finalized slot which is lower than stored head slot
slot, count, err = db.UpdateCustodyInfo(ctx, finalizedSlotRestart, semiSupernodeCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot, "earliestAvailableSlot should NOT decrease to finalized slot")
require.Equal(t, semiSupernodeCustody, count)

// Verify database preserved the higher slot
storedSlot, storedCount := getCustodyInfoFromDB(t, db)
require.Equal(t, headSlot, storedSlot)
require.Equal(t, semiSupernodeCustody, storedCount)
})

t.Run("update with lower group count should not update", func(t *testing.T) {
const (
initialSlot = primitives.Slot(200)
@@ -75,7 +75,6 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {
p2p := p2pTesting.NewTestP2P(t)
lcStore := NewLightClientStore(p2p, new(event.Feed), testDB.SetupDB(t))

timeForGoroutinesToFinish := 20 * time.Microsecond
// update 0 with basic data and no supermajority following an empty lastFinalityUpdate - should save and broadcast
l0 := util.NewTestLightClient(t, version.Altair)
update0, err := NewLightClientFinalityUpdateFromBeaconState(l0.Ctx, l0.State, l0.Block, l0.AttestedState, l0.AttestedBlock, l0.FinalizedBlock)

@@ -87,8 +86,9 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {

lcStore.SetLastFinalityUpdate(update0, true)
require.Equal(t, update0, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after setting a new last finality update when previous is nil")
require.Eventually(t, func() bool {
return p2p.BroadcastCalled.Load()
}, time.Second, 10*time.Millisecond, "Broadcast should have been called after setting a new last finality update when previous is nil")
p2p.BroadcastCalled.Store(false) // Reset for next test

// update 1 with same finality slot, increased attested slot, and no supermajority - should save but not broadcast

@@ -102,7 +102,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {

lcStore.SetLastFinalityUpdate(update1, true)
require.Equal(t, update1, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called after setting a new last finality update without supermajority")
p2p.BroadcastCalled.Store(false) // Reset for next test

@@ -117,8 +117,9 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {

lcStore.SetLastFinalityUpdate(update2, true)
require.Equal(t, update2, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after setting a new last finality update with supermajority")
require.Eventually(t, func() bool {
return p2p.BroadcastCalled.Load()
}, time.Second, 10*time.Millisecond, "Broadcast should have been called after setting a new last finality update with supermajority")
p2p.BroadcastCalled.Store(false) // Reset for next test

// update 3 with same finality slot, increased attested slot, and supermajority - should save but not broadcast

@@ -132,7 +133,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {

lcStore.SetLastFinalityUpdate(update3, true)
require.Equal(t, update3, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been when previous was already broadcast")

// update 4 with increased finality slot, increased attested slot, and supermajority - should save and broadcast

@@ -146,8 +147,9 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {

lcStore.SetLastFinalityUpdate(update4, true)
require.Equal(t, update4, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
require.Equal(t, true, p2p.BroadcastCalled.Load(), "Broadcast should have been called after a new finality update with increased finality slot")
require.Eventually(t, func() bool {
return p2p.BroadcastCalled.Load()
}, time.Second, 10*time.Millisecond, "Broadcast should have been called after a new finality update with increased finality slot")
p2p.BroadcastCalled.Store(false) // Reset for next test

// update 5 with the same new finality slot, increased attested slot, and supermajority - should save but not broadcast

@@ -161,7 +163,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {

lcStore.SetLastFinalityUpdate(update5, true)
require.Equal(t, update5, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called when previous was already broadcast with supermajority")

// update 6 with the same new finality slot, increased attested slot, and no supermajority - should save but not broadcast

@@ -175,7 +177,7 @@ func TestLightClientStore_SetLastFinalityUpdate(t *testing.T) {

lcStore.SetLastFinalityUpdate(update6, true)
require.Equal(t, update6, lcStore.LastFinalityUpdate(), "lastFinalityUpdate should match the set value")
time.Sleep(timeForGoroutinesToFinish) // give some time for the broadcast goroutine to finish
time.Sleep(50 * time.Millisecond) // Wait briefly to verify broadcast is not called
require.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcast should not have been called when previous was already broadcast with supermajority")
}
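Note the asymmetry in the hunks above: assertions that a broadcast did happen switch to polling with `require.Eventually`, while assertions that a broadcast did not happen keep a short fixed sleep, because the absence of an event can only be bounded in time, not polled to completion. A self-contained sketch of both shapes (testify-style; testify also offers `require.Never` for the negative case, if the assertion package in use supports it):

```go
package example

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestBroadcastAssertions(t *testing.T) {
	var broadcastCalled atomic.Bool
	go broadcastCalled.Store(true) // async broadcaster under test

	// Positive case: poll until the flag flips, or fail at the deadline.
	require.Eventually(t, func() bool { return broadcastCalled.Load() },
		time.Second, 10*time.Millisecond, "expected a broadcast")

	// Negative case: the absence of an event cannot be polled to completion,
	// so wait a short, bounded time and then assert it never flipped
	// (or use testify's require.Never to express the same intent directly).
	broadcastCalled.Store(false)
	time.Sleep(50 * time.Millisecond)
	require.Equal(t, false, broadcastCalled.Load(), "expected no further broadcast")
}
```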
@@ -72,7 +72,10 @@ func TestService_Broadcast(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)

time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")

// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup

@@ -186,7 +189,10 @@ func TestService_BroadcastAttestation(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)

time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")

// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup

@@ -375,7 +381,15 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
_, err = tpHandle.Subscribe()
require.NoError(t, err)

time.Sleep(500 * time.Millisecond) // libp2p fails without this delay...
// This test specifically tests discovery-based peer finding, which requires
// time for nodes to discover each other. Using a fixed sleep here is intentional
// as we're testing the discovery timing behavior.
time.Sleep(500 * time.Millisecond)

// Verify mesh establishment after discovery
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0 && len(p2.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")

nodePeers := p.pubsub.ListPeers(topic)
nodePeers2 := p2.pubsub.ListPeers(topic)

@@ -444,7 +458,10 @@ func TestService_BroadcastSyncCommittee(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)

time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")

// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup

@@ -521,7 +538,10 @@ func TestService_BroadcastBlob(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)

time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")

// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup

@@ -584,7 +604,10 @@ func TestService_BroadcastLightClientOptimisticUpdate(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)

time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")

// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup

@@ -660,7 +683,10 @@ func TestService_BroadcastLightClientFinalityUpdate(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)

time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(p.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")

// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup

@@ -771,8 +797,10 @@ func TestService_BroadcastDataColumn(t *testing.T) {
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)

// libp2p fails without this delay
time.Sleep(50 * time.Millisecond)
// Wait for libp2p mesh to establish
require.Eventually(t, func() bool {
return len(service.pubsub.ListPeers(topic)) > 0
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")

// Broadcast to peers and wait.
err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar})
@@ -482,12 +482,12 @@ func TestStaticPeering_PeersAreAdded(t *testing.T) {
s.Start()
<-exitRoutine
}()
time.Sleep(50 * time.Millisecond)
time.Sleep(50 * time.Millisecond) // Wait for service initialization
var vr [32]byte
require.NoError(t, cs.SetClock(startup.NewClock(time.Now(), vr)))
time.Sleep(4 * time.Second)
ps := s.host.Network().Peers()
assert.Equal(t, 5, len(ps), "Not all peers added to peerstore")
require.Eventually(t, func() bool {
return len(s.host.Network().Peers()) == 5
}, 10*time.Second, 100*time.Millisecond, "Not all peers added to peerstore")
require.NoError(t, s.Stop())
exitRoutine <- true
}
@@ -80,8 +80,9 @@ func TestService_Start_OnlyStartsOnce(t *testing.T) {
}()
var vr [32]byte
require.NoError(t, cs.SetClock(startup.NewClock(time.Now(), vr)))
time.Sleep(time.Second * 2)
assert.Equal(t, true, s.started, "Expected service to be started")
require.Eventually(t, func() bool {
return s.started
}, 5*time.Second, 100*time.Millisecond, "Expected service to be started")
s.Start()
require.LogsContain(t, hook, "Attempted to start p2p service when it was already started")
require.NoError(t, s.Stop())

@@ -260,17 +261,9 @@ func TestListenForNewNodes(t *testing.T) {
err = cs.SetClock(startup.NewClock(genesisTime, gvr))
require.NoError(t, err, "Could not set clock in service")

actualPeerCount := len(s.host.Network().Peers())
for range 40 {
if actualPeerCount == peerCount {
break
}

time.Sleep(100 * time.Millisecond)
actualPeerCount = len(s.host.Network().Peers())
}

assert.Equal(t, peerCount, actualPeerCount, "Not all peers added to peerstore")
require.Eventually(t, func() bool {
return len(s.host.Network().Peers()) == peerCount
}, 5*time.Second, 100*time.Millisecond, "Not all peers added to peerstore")

err = s.Stop()
require.NoError(t, err, "Failed to stop service")
@@ -657,8 +657,9 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
require.Eventually(t, func() bool {
return s.AttestationsPool.UnaggregatedAttestationCount() == 1
}, time.Second, 10*time.Millisecond, "Expected 1 attestation in pool")
})
t.Run("multiple", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
@@ -677,8 +678,9 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, 2, broadcaster.NumAttestations())
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
require.Eventually(t, func() bool {
return s.AttestationsPool.UnaggregatedAttestationCount() == 2
}, time.Second, 10*time.Millisecond, "Expected 2 attestations in pool")
})
t.Run("phase0 att post electra", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
@@ -798,8 +800,9 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
require.Eventually(t, func() bool {
return s.AttestationsPool.UnaggregatedAttestationCount() == 1
}, time.Second, 10*time.Millisecond, "Expected 1 attestation in pool")
})
t.Run("multiple", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
@@ -818,8 +821,9 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, 2, broadcaster.NumAttestations())
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
require.Eventually(t, func() bool {
return s.AttestationsPool.UnaggregatedAttestationCount() == 2
}, time.Second, 10*time.Millisecond, "Expected 2 attestations in pool")
})
t.Run("no body", func(t *testing.T) {
request := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
@@ -1375,9 +1379,9 @@ func TestSubmitSignedBLSToExecutionChanges_Ok(t *testing.T) {
writer.Body = &bytes.Buffer{}
s.SubmitBLSToExecutionChanges(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
time.Sleep(100 * time.Millisecond) // Delay to let the routine start
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, numValidators, len(broadcaster.BroadcastMessages))
require.Eventually(t, func() bool {
return broadcaster.BroadcastCalled.Load() && len(broadcaster.BroadcastMessages) == numValidators
}, time.Second, 10*time.Millisecond, "Broadcast should be called with all messages")

poolChanges, err := s.BLSChangesPool.PendingBLSToExecChanges()
require.Equal(t, len(poolChanges), len(signedChanges))
@@ -1591,10 +1595,10 @@ func TestSubmitSignedBLSToExecutionChanges_Failures(t *testing.T) {

s.SubmitBLSToExecutionChanges(writer, request)
assert.Equal(t, http.StatusBadRequest, writer.Code)
time.Sleep(10 * time.Millisecond) // Delay to allow the routine to start
require.StringContains(t, "One or more messages failed validation", writer.Body.String())
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, numValidators, len(broadcaster.BroadcastMessages)+1)
require.Eventually(t, func() bool {
return broadcaster.BroadcastCalled.Load() && len(broadcaster.BroadcastMessages)+1 == numValidators
}, time.Second, 10*time.Millisecond, "Broadcast should be called with expected messages")

poolChanges, err := s.BLSChangesPool.PendingBLSToExecChanges()
require.Equal(t, len(poolChanges)+1, len(signedChanges))
@@ -70,7 +70,6 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {

topic := "/eth2/%x/beacon_block"
go r.startDiscoveryAndSubscriptions()
time.Sleep(100 * time.Millisecond)

var vr [32]byte
require.NoError(t, gs.SetClock(startup.NewClock(time.Now(), vr)))
@@ -83,9 +82,11 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
msg.Block.ParentRoot = util.Random32Bytes(t)
msg.Signature = sk.Sign([]byte("data")).Marshal()
p2p.ReceivePubSub(topic, msg)
// wait for chainstart to be sent
time.Sleep(400 * time.Millisecond)
require.Equal(t, true, r.chainStarted.IsSet(), "Did not receive chain start event.")

// Wait for chainstart event to be processed
require.Eventually(t, func() bool {
return r.chainStarted.IsSet()
}, 5*time.Second, 50*time.Millisecond, "Did not receive chain start event.")
}

func TestSyncHandlers_WaitForChainStart(t *testing.T) {
@@ -217,20 +218,18 @@ func TestSyncService_StopCleanly(t *testing.T) {
p2p.Digest, err = r.currentForkDigest()
require.NoError(t, err)

// wait for chainstart to be sent
time.Sleep(2 * time.Second)
require.Equal(t, true, r.chainStarted.IsSet(), "Did not receive chain start event.")

require.NotEqual(t, 0, len(r.cfg.p2p.PubSub().GetTopics()))
require.NotEqual(t, 0, len(r.cfg.p2p.Host().Mux().Protocols()))
// Wait for chainstart and topics to be registered
require.Eventually(t, func() bool {
return r.chainStarted.IsSet() && len(r.cfg.p2p.PubSub().GetTopics()) > 0 && len(r.cfg.p2p.Host().Mux().Protocols()) > 0
}, 5*time.Second, 50*time.Millisecond, "Did not receive chain start event or topics not registered.")

// Both pubsub and rpc topics should be unsubscribed.
require.NoError(t, r.Stop())

// Sleep to allow pubsub topics to be deregistered.
time.Sleep(1 * time.Second)
require.Equal(t, 0, len(r.cfg.p2p.PubSub().GetTopics()))
require.Equal(t, 0, len(r.cfg.p2p.Host().Mux().Protocols()))
// Wait for pubsub topics to be deregistered.
require.Eventually(t, func() bool {
return len(r.cfg.p2p.PubSub().GetTopics()) == 0 && len(r.cfg.p2p.Host().Mux().Protocols()) == 0
}, 5*time.Second, 50*time.Millisecond, "Pubsub topics were not deregistered")
}

func TestService_Stop_SendsGoodbyeMessages(t *testing.T) {
@@ -614,11 +614,10 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
},
}

time.Sleep(10 * time.Millisecond) // Wait for cached value to pass through buffers.
if res, err := r.validateAggregateAndProof(t.Context(), "", msg); res == pubsub.ValidationAccept {
_ = err
t.Fatal("Validated status is true")
}
require.Eventually(t, func() bool {
res, _ := r.validateAggregateAndProof(t.Context(), "", msg)
return res != pubsub.ValidationAccept
}, time.Second, 10*time.Millisecond, "Expected validation to reject duplicate aggregate")
}

func TestValidateAggregateAndProof_BadBlock(t *testing.T) {

@@ -992,7 +992,6 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {

// Mark the proposer/slot as seen
r.setSeenBlockIndexSlot(msg.Block.Slot, msg.Block.ProposerIndex)
time.Sleep(10 * time.Millisecond) // Wait for cached value to pass through buffers

// Prepare and validate the second message (clone)
buf := new(bytes.Buffer)
@@ -1010,9 +1009,11 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
}

// Since this is not an equivocation (same signature), it should be ignored
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
assert.NoError(t, err)
assert.Equal(t, pubsub.ValidationIgnore, res, "block with same signature should be ignored")
// Wait for the cached value to propagate through buffers
require.Eventually(t, func() bool {
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
return err == nil && res == pubsub.ValidationIgnore
}, time.Second, 10*time.Millisecond, "block with same signature should be ignored")

// Verify no slashings were created
assert.Equal(t, 0, len(slashingPool.PendingPropSlashings), "Expected no slashings for same signature")
changelog/james-prysm_fix-backward-earliest-slot.md (new file)
@@ -0,0 +1,3 @@
### Fixed

- Fixed the earliest slot moving backwards when the semi-supernode or supernode flags are set.
changelog/james-prysm_is-ready.md (new file)
@@ -0,0 +1,3 @@
### Changed

- Changed the validator client's IsHealthy check to IsReady when interpreting /eth/v1/node/health; a 206 response now returns false because the node is still syncing.
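For context, the practical difference is that the old IsHealthy treated any successful /eth/v1/node/health response as healthy, while IsReady only accepts 200 OK. A minimal standalone sketch of that interpretation, assuming a plain net/http client and a hypothetical beacon API address (illustrative, not part of the diff):

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Hypothetical beacon API host; the endpoint and status-code semantics come from the diff.
	resp, err := http.Get("http://localhost:3500/eth/v1/node/health")
	if err != nil {
		fmt.Println("ready: false (request failed)")
		return
	}
	defer resp.Body.Close()
	// 200 = synced and ready, 206 = syncing (not ready), 503 = unavailable.
	fmt.Println("ready:", resp.StatusCode == http.StatusOK)
}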
changelog/jrhea_duplicate_tracer_provider_setting.md (new file)
@@ -0,0 +1,3 @@
### Fixed

- Don't call trace.WithMaxExportBatchSize(trace.DefaultMaxExportBatchSize) twice.
changelog/pvl_fix-flaky-tests-polling.md (new file)
@@ -0,0 +1,3 @@
### Changed

- Replaced `time.Sleep` with `require.Eventually` polling in tests to fix flaky behavior caused by race conditions between goroutines and assertions.
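The shape of that change, as a standalone sketch: a fixed sleep races against the goroutine being observed, while polling retries the check until it holds or the deadline expires. The counter and test here are hypothetical; the require.Eventually signature matches the helper added later in this diff.

package example

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/OffchainLabs/prysm/v7/testing/require"
)

func TestAsyncCounter(t *testing.T) {
	var count atomic.Int64
	go count.Add(1) // work finishes at an unpredictable time

	// Before: time.Sleep(100 * time.Millisecond) followed by a one-shot assertion.
	// After: poll until the condition is observed or the deadline expires.
	require.Eventually(t, func() bool {
		return count.Load() == 1
	}, 2*time.Second, 10*time.Millisecond, "expected counter to reach 1")
}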
@@ -45,7 +45,6 @@ func Setup(ctx context.Context, serviceName, processName, endpoint string, sampl
exporter,
trace.WithMaxExportBatchSize(trace.DefaultMaxExportBatchSize),
trace.WithBatchTimeout(trace.DefaultScheduleDelay*time.Millisecond),
trace.WithMaxExportBatchSize(trace.DefaultMaxExportBatchSize),
),
trace.WithResource(
resource.NewWithAttributes(
@@ -8,6 +8,7 @@ import (
"runtime"
"sort"
"strings"
"time"

"github.com/OffchainLabs/prysm/v7/encoding/ssz/equality"
"github.com/d4l3k/messagediff"
@@ -138,12 +139,21 @@ func StringContains(loggerFn assertionLoggerFn, expected, actual string, flag bo

// NoError asserts that error is nil.
func NoError(loggerFn assertionLoggerFn, err error, msg ...any) {
// reflect.ValueOf is needed for nil instances of custom types implementing Error
if err != nil && !reflect.ValueOf(err).IsNil() {
errMsg := parseMsg("Unexpected error", msg...)
_, file, line, _ := runtime.Caller(2)
loggerFn("%s:%d %s: %v", filepath.Base(file), line, errMsg, err)
if err == nil {
return
}
// reflect.ValueOf is needed for nil instances of custom types implementing Error.
// Only check IsNil for types that support it to avoid panics on struct types.
v := reflect.ValueOf(err)
switch v.Kind() {
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
if v.IsNil() {
return
}
}
errMsg := parseMsg("Unexpected error", msg...)
_, file, line, _ := runtime.Caller(2)
loggerFn("%s:%d %s: %v", filepath.Base(file), line, errMsg, err)
}

// ErrorIs uses Errors.Is to recursively unwrap err looking for target in the chain.
@@ -341,3 +351,18 @@ func (tb *TBMock) Errorf(format string, args ...any) {
func (tb *TBMock) Fatalf(format string, args ...any) {
tb.FatalfMsg = fmt.Sprintf(format, args...)
}

// Eventually asserts that given condition will be met within waitFor time,
// periodically checking target function each tick.
func Eventually(loggerFn assertionLoggerFn, condition func() bool, waitFor, tick time.Duration, msg ...any) {
deadline := time.Now().Add(waitFor)
for time.Now().Before(deadline) {
if condition() {
return
}
time.Sleep(tick)
}
errMsg := parseMsg("Condition never satisfied", msg...)
_, file, line, _ := runtime.Caller(2)
loggerFn("%s:%d %s (waited %v)", filepath.Base(file), line, errMsg, waitFor)
}
@@ -1,6 +1,8 @@
package require

import (
"time"

"github.com/OffchainLabs/prysm/v7/testing/assertions"
"github.com/sirupsen/logrus/hooks/test"
)
@@ -87,3 +89,9 @@ func ErrorIs(tb assertions.AssertionTestingTB, err, target error, msg ...any) {
func StringContains(tb assertions.AssertionTestingTB, expected, actual string, msg ...any) {
assertions.StringContains(tb.Fatalf, expected, actual, true, msg)
}

// Eventually asserts that given condition will be met within waitFor time,
// periodically checking target function each tick.
func Eventually(tb assertions.AssertionTestingTB, condition func() bool, waitFor, tick time.Duration, msg ...any) {
assertions.Eventually(tb.Fatalf, condition, waitFor, tick, msg...)
}
testing/validator-mock/node_client_mock.go (generated)
@@ -56,18 +56,18 @@ func (mr *MockNodeClientMockRecorder) Genesis(arg0, arg1 any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Genesis", reflect.TypeOf((*MockNodeClient)(nil).Genesis), arg0, arg1)
}

// IsHealthy mocks base method.
func (m *MockNodeClient) IsHealthy(arg0 context.Context) bool {
// IsReady mocks base method.
func (m *MockNodeClient) IsReady(arg0 context.Context) bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IsHealthy", arg0)
ret := m.ctrl.Call(m, "IsReady", arg0)
ret0, _ := ret[0].(bool)
return ret0
}

// IsHealthy indicates an expected call of IsHealthy.
func (mr *MockNodeClientMockRecorder) IsHealthy(arg0 any) *gomock.Call {
// IsReady indicates an expected call of IsReady.
func (mr *MockNodeClientMockRecorder) IsReady(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsHealthy", reflect.TypeOf((*MockNodeClient)(nil).IsHealthy), arg0)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsReady", reflect.TypeOf((*MockNodeClient)(nil).IsReady), arg0)
}

// Peers mocks base method.
@@ -2,6 +2,7 @@ package beacon_api

import (
"context"
"net/http"
"strconv"

"github.com/OffchainLabs/prysm/v7/api/server/structs"
@@ -101,12 +102,17 @@ func (c *beaconApiNodeClient) Peers(ctx context.Context, in *empty.Empty) (*ethp
return nil, errors.New("beaconApiNodeClient.Peers is not implemented. To use a fallback client, pass a fallback client as the last argument of NewBeaconApiNodeClientWithFallback.")
}

func (c *beaconApiNodeClient) IsHealthy(ctx context.Context) bool {
if err := c.jsonRestHandler.Get(ctx, "/eth/v1/node/health", nil); err != nil {
// IsReady returns true only if the node is fully synced (200 OK).
// A 206 Partial Content response indicates the node is syncing and not ready.
func (c *beaconApiNodeClient) IsReady(ctx context.Context) bool {
statusCode, err := c.jsonRestHandler.GetStatusCode(ctx, "/eth/v1/node/health")
if err != nil {
log.WithError(err).Error("failed to get health of node")
return false
}
return true
// Only 200 OK means the node is fully synced and ready.
// 206 Partial Content means syncing, 503 means unavailable.
return statusCode == http.StatusOK
}

func NewNodeClientWithFallback(jsonRestHandler RestHandler, fallbackClient iface.NodeClient) iface.NodeClient {
@@ -2,6 +2,7 @@ package beacon_api

import (
"errors"
"net/http"
"testing"

"github.com/OffchainLabs/prysm/v7/api/server/structs"
@@ -287,3 +288,59 @@ func TestGetVersion(t *testing.T) {
})
}
}

func TestIsReady(t *testing.T) {
const healthEndpoint = "/eth/v1/node/health"

testCases := []struct {
name string
statusCode int
err error
expectedResult bool
}{
{
name: "returns true for 200 OK (fully synced)",
statusCode: http.StatusOK,
expectedResult: true,
},
{
name: "returns false for 206 Partial Content (syncing)",
statusCode: http.StatusPartialContent,
expectedResult: false,
},
{
name: "returns false for 503 Service Unavailable",
statusCode: http.StatusServiceUnavailable,
expectedResult: false,
},
{
name: "returns false for 500 Internal Server Error",
statusCode: http.StatusInternalServerError,
expectedResult: false,
},
{
name: "returns false on error",
err: errors.New("request failed"),
expectedResult: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := t.Context()

jsonRestHandler := mock.NewMockJsonRestHandler(ctrl)
jsonRestHandler.EXPECT().GetStatusCode(
gomock.Any(),
healthEndpoint,
).Return(tc.statusCode, tc.err)

nodeClient := &beaconApiNodeClient{jsonRestHandler: jsonRestHandler}
result := nodeClient.IsReady(ctx)

assert.Equal(t, tc.expectedResult, result)
})
}
}
@@ -1,9 +1,9 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: validator/client/beacon-api/json_rest_handler.go
// Source: validator/client/beacon-api/rest_handler_client.go
//
// Generated by this command:
//
// mockgen -package=mock -source=validator/client/beacon-api/json_rest_handler.go -destination=validator/client/beacon-api/mock/json_rest_handler_mock.go
// mockgen -package=mock -source=validator/client/beacon-api/rest_handler_client.go -destination=validator/client/beacon-api/mock/json_rest_handler_mock.go RestHandler
//

// Package mock is a generated GoMock package.
@@ -18,32 +18,37 @@ import (
gomock "go.uber.org/mock/gomock"
)

// MockJsonRestHandler is a mock of JsonRestHandler interface.
type MockJsonRestHandler struct {
// Backward compatibility aliases for the renamed mock type.
type MockJsonRestHandler = MockRestHandler
type MockJsonRestHandlerMockRecorder = MockRestHandlerMockRecorder

var NewMockJsonRestHandler = NewMockRestHandler

// MockRestHandler is a mock of RestHandler interface.
type MockRestHandler struct {
ctrl *gomock.Controller
recorder *MockJsonRestHandlerMockRecorder
isgomock struct{}
recorder *MockRestHandlerMockRecorder
}

// MockJsonRestHandlerMockRecorder is the mock recorder for MockJsonRestHandler.
type MockJsonRestHandlerMockRecorder struct {
mock *MockJsonRestHandler
// MockRestHandlerMockRecorder is the mock recorder for MockRestHandler.
type MockRestHandlerMockRecorder struct {
mock *MockRestHandler
}

// NewMockJsonRestHandler creates a new mock instance.
func NewMockJsonRestHandler(ctrl *gomock.Controller) *MockJsonRestHandler {
mock := &MockJsonRestHandler{ctrl: ctrl}
mock.recorder = &MockJsonRestHandlerMockRecorder{mock}
// NewMockRestHandler creates a new mock instance.
func NewMockRestHandler(ctrl *gomock.Controller) *MockRestHandler {
mock := &MockRestHandler{ctrl: ctrl}
mock.recorder = &MockRestHandlerMockRecorder{mock}
return mock
}

// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockJsonRestHandler) EXPECT() *MockJsonRestHandlerMockRecorder {
func (m *MockRestHandler) EXPECT() *MockRestHandlerMockRecorder {
return m.recorder
}

// Get mocks base method.
func (m *MockJsonRestHandler) Get(ctx context.Context, endpoint string, resp any) error {
func (m *MockRestHandler) Get(ctx context.Context, endpoint string, resp any) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Get", ctx, endpoint, resp)
ret0, _ := ret[0].(error)
@@ -51,13 +56,13 @@ func (m *MockJsonRestHandler) Get(ctx context.Context, endpoint string, resp any
}

// Get indicates an expected call of Get.
func (mr *MockJsonRestHandlerMockRecorder) Get(ctx, endpoint, resp any) *gomock.Call {
func (mr *MockRestHandlerMockRecorder) Get(ctx, endpoint, resp any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockJsonRestHandler)(nil).Get), ctx, endpoint, resp)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockRestHandler)(nil).Get), ctx, endpoint, resp)
}

// GetSSZ mocks base method.
func (m *MockJsonRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
func (m *MockRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetSSZ", ctx, endpoint)
ret0, _ := ret[0].([]byte)
@@ -67,13 +72,28 @@ func (m *MockJsonRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]by
}

// GetSSZ indicates an expected call of GetSSZ.
func (mr *MockJsonRestHandlerMockRecorder) GetSSZ(ctx, endpoint any) *gomock.Call {
func (mr *MockRestHandlerMockRecorder) GetSSZ(ctx, endpoint any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSSZ", reflect.TypeOf((*MockJsonRestHandler)(nil).GetSSZ), ctx, endpoint)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSSZ", reflect.TypeOf((*MockRestHandler)(nil).GetSSZ), ctx, endpoint)
}
// GetStatusCode mocks base method.
func (m *MockRestHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetStatusCode", ctx, endpoint)
ret0, _ := ret[0].(int)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// GetStatusCode indicates an expected call of GetStatusCode.
func (mr *MockRestHandlerMockRecorder) GetStatusCode(ctx, endpoint any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStatusCode", reflect.TypeOf((*MockRestHandler)(nil).GetStatusCode), ctx, endpoint)
}

// Host mocks base method.
func (m *MockJsonRestHandler) Host() string {
func (m *MockRestHandler) Host() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Host")
ret0, _ := ret[0].(string)
@@ -81,13 +101,13 @@ func (m *MockJsonRestHandler) Host() string {
}

// Host indicates an expected call of Host.
func (mr *MockJsonRestHandlerMockRecorder) Host() *gomock.Call {
func (mr *MockRestHandlerMockRecorder) Host() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Host", reflect.TypeOf((*MockJsonRestHandler)(nil).Host))
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Host", reflect.TypeOf((*MockRestHandler)(nil).Host))
}

// HttpClient mocks base method.
func (m *MockJsonRestHandler) HttpClient() *http.Client {
func (m *MockRestHandler) HttpClient() *http.Client {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HttpClient")
ret0, _ := ret[0].(*http.Client)
@@ -95,13 +115,13 @@ func (m *MockJsonRestHandler) HttpClient() *http.Client {
}

// HttpClient indicates an expected call of HttpClient.
func (mr *MockJsonRestHandlerMockRecorder) HttpClient() *gomock.Call {
func (mr *MockRestHandlerMockRecorder) HttpClient() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HttpClient", reflect.TypeOf((*MockJsonRestHandler)(nil).HttpClient))
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HttpClient", reflect.TypeOf((*MockRestHandler)(nil).HttpClient))
}

// Post mocks base method.
func (m *MockJsonRestHandler) Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp any) error {
func (m *MockRestHandler) Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp any) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Post", ctx, endpoint, headers, data, resp)
ret0, _ := ret[0].(error)
@@ -109,13 +129,13 @@ func (m *MockJsonRestHandler) Post(ctx context.Context, endpoint string, headers
}

// Post indicates an expected call of Post.
func (mr *MockJsonRestHandlerMockRecorder) Post(ctx, endpoint, headers, data, resp any) *gomock.Call {
func (mr *MockRestHandlerMockRecorder) Post(ctx, endpoint, headers, data, resp any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Post", reflect.TypeOf((*MockJsonRestHandler)(nil).Post), ctx, endpoint, headers, data, resp)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Post", reflect.TypeOf((*MockRestHandler)(nil).Post), ctx, endpoint, headers, data, resp)
}

// Post mocks base method.
func (m *MockJsonRestHandler) PostSSZ(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer) ([]byte, http.Header, error) {
// PostSSZ mocks base method.
func (m *MockRestHandler) PostSSZ(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer) ([]byte, http.Header, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PostSSZ", ctx, endpoint, headers, data)
ret0, _ := ret[0].([]byte)
@@ -124,20 +144,20 @@ func (m *MockJsonRestHandler) PostSSZ(ctx context.Context, endpoint string, head
return ret0, ret1, ret2
}

// Post indicates an expected call of Post.
func (mr *MockJsonRestHandlerMockRecorder) PostSSZ(ctx, endpoint, headers, data any) *gomock.Call {
// PostSSZ indicates an expected call of PostSSZ.
func (mr *MockRestHandlerMockRecorder) PostSSZ(ctx, endpoint, headers, data any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostSSZ", reflect.TypeOf((*MockJsonRestHandler)(nil).PostSSZ), ctx, endpoint, headers, data)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostSSZ", reflect.TypeOf((*MockRestHandler)(nil).PostSSZ), ctx, endpoint, headers, data)
}

// SetHost mocks base method.
func (m *MockJsonRestHandler) SetHost(host string) {
func (m *MockRestHandler) SetHost(host string) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetHost", host)
}

// SetHost indicates an expected call of SetHost.
func (mr *MockJsonRestHandlerMockRecorder) SetHost(host any) *gomock.Call {
func (mr *MockRestHandlerMockRecorder) SetHost(host any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHost", reflect.TypeOf((*MockJsonRestHandler)(nil).SetHost), host)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHost", reflect.TypeOf((*MockRestHandler)(nil).SetHost), host)
}
@@ -23,6 +23,7 @@ type reqOption func(*http.Request)

type RestHandler interface {
Get(ctx context.Context, endpoint string, resp any) error
GetStatusCode(ctx context.Context, endpoint string) (int, error)
GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error)
Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp any) error
PostSSZ(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer) ([]byte, http.Header, error)
@@ -90,6 +91,28 @@ func (c *BeaconApiRestHandler) Get(ctx context.Context, endpoint string, resp an
return decodeResp(httpResp, resp)
}

// GetStatusCode sends a GET request and returns only the HTTP status code.
// This is useful for endpoints like /eth/v1/node/health that communicate status via HTTP codes
// (200 = ready, 206 = syncing, 503 = unavailable) rather than response bodies.
func (c *BeaconApiRestHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return 0, errors.Wrapf(err, "failed to create request for endpoint %s", url)
}
req.Header.Set("User-Agent", version.BuildData())
httpResp, err := c.client.Do(req)
if err != nil {
return 0, errors.Wrapf(err, "failed to perform request for endpoint %s", url)
}
defer func() {
if err := httpResp.Body.Close(); err != nil {
return
}
}()
return httpResp.StatusCode, nil
}

func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
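For orientation, a minimal same-package sketch of how GetStatusCode is meant to be consumed (illustrative, not part of the diff: the nodeIsReady helper name and host value are hypothetical, and the usual context/net/http/time imports are assumed; the readiness interpretation mirrors IsReady above):

// Illustrative sketch within validator/client/beacon-api; not part of the diff.
func nodeIsReady(ctx context.Context, host string) bool {
	handler := BeaconApiRestHandler{
		client: http.Client{Timeout: 5 * time.Second},
		host:   host, // e.g. a hypothetical http://localhost:3500
	}
	code, err := handler.GetStatusCode(ctx, "/eth/v1/node/health")
	// Only 200 counts as ready; 206 means the node is still syncing.
	return err == nil && code == http.StatusOK
}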
@@ -359,3 +359,66 @@ func Test_decodeResp(t *testing.T) {
assert.ErrorContains(t, "HTTP request unsuccessful (500: foo)", err)
})
}

func TestGetStatusCode(t *testing.T) {
ctx := t.Context()
const endpoint = "/eth/v1/node/health"

testCases := []struct {
name string
serverStatusCode int
expectedStatusCode int
}{
{
name: "returns 200 OK",
serverStatusCode: http.StatusOK,
expectedStatusCode: http.StatusOK,
},
{
name: "returns 206 Partial Content",
serverStatusCode: http.StatusPartialContent,
expectedStatusCode: http.StatusPartialContent,
},
{
name: "returns 503 Service Unavailable",
serverStatusCode: http.StatusServiceUnavailable,
expectedStatusCode: http.StatusServiceUnavailable,
},
{
name: "returns 500 Internal Server Error",
serverStatusCode: http.StatusInternalServerError,
expectedStatusCode: http.StatusInternalServerError,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, version.BuildData(), r.Header.Get("User-Agent"))
w.WriteHeader(tc.serverStatusCode)
})
server := httptest.NewServer(mux)
defer server.Close()

jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}

statusCode, err := jsonRestHandler.GetStatusCode(ctx, endpoint)
require.NoError(t, err)
assert.Equal(t, tc.expectedStatusCode, statusCode)
})
}

t.Run("returns error on connection failure", func(t *testing.T) {
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Millisecond * 100},
host: "http://localhost:99999", // Invalid port
}

_, err := jsonRestHandler.GetStatusCode(ctx, endpoint)
require.ErrorContains(t, "failed to perform request", err)
})
}
@@ -33,7 +33,7 @@ func (c *grpcNodeClient) Peers(ctx context.Context, in *empty.Empty) (*ethpb.Pee
return c.nodeClient.ListPeers(ctx, in)
}

func (c *grpcNodeClient) IsHealthy(ctx context.Context) bool {
func (c *grpcNodeClient) IsReady(ctx context.Context) bool {
_, err := c.nodeClient.GetHealth(ctx, &ethpb.HealthRequest{})
if err != nil {
log.WithError(err).Error("Failed to get health of node")

@@ -12,5 +12,5 @@ type NodeClient interface {
Genesis(ctx context.Context, in *empty.Empty) (*ethpb.Genesis, error)
Version(ctx context.Context, in *empty.Empty) (*ethpb.Version, error)
Peers(ctx context.Context, in *empty.Empty) (*ethpb.Peers, error)
IsHealthy(ctx context.Context) bool
IsReady(ctx context.Context) bool
}
@@ -1274,7 +1274,7 @@ func (v *validator) FindHealthyHost(ctx context.Context) bool {
// Tail-recursive closure keeps retry count private.
var check func(remaining int) bool
check = func(remaining int) bool {
if v.nodeClient.IsHealthy(ctx) { // healthy → done
if v.nodeClient.IsReady(ctx) { // ready → done
return true
}
if len(v.beaconNodeHosts) == 1 && features.Get().EnableBeaconRESTApi {