Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-09 21:38:05 -05:00)

Compare commits: 10 commits (`process-ex` ... `fix-backwa`)
| SHA1 |
|---|
| 93b2445d83 |
| fe5d67496a |
| a7eb4add27 |
| 0f0fced955 |
| 56477f944f |
| 5069057354 |
| 30340f632c |
| 6bfbbdbd1b |
| d30c1cbb8a |
| e9c7a62b97 |
@@ -295,11 +295,20 @@ func (s *Service) StartFromSavedState(saved state.BeaconState) error {
return nil
}

earliestAvailableSlot, custodySubnetCount, err := s.updateCustodyInfoInDB(saved.Slot())
// Track supernode subscription status for logging purposes.
isSupernode := flags.Get().Supernode
wasSupernode, err := s.cfg.BeaconDB.UpdateSubscribedToAllDataSubnets(s.ctx, isSupernode)
if err != nil {
return errors.Wrap(err, "update subscribed to all data subnets")
}

earliestAvailableSlot, custodySubnetCount, err := s.updateCustodyInfoInDB()
if err != nil {
return errors.Wrap(err, "could not get and save custody group count")
}

logCustodyStatus(wasSupernode, custodySubnetCount)

if _, _, err := s.cfg.P2P.UpdateCustodyInfo(earliestAvailableSlot, custodySubnetCount); err != nil {
return errors.Wrap(err, "update custody info")
}
@@ -468,58 +477,100 @@ func (s *Service) removeStartupState() {
s.cfg.FinalizedStateAtStartUp = nil
}

// UpdateCustodyInfoInDB updates the custody information in the database.
// It returns the (potentially updated) custody group count and the earliest available slot.
func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot, uint64, error) {
isSupernode := flags.Get().Supernode
isSemiSupernode := flags.Get().SemiSupernode

cfg := params.BeaconConfig()
custodyRequirement := cfg.CustodyRequirement

// Check if the node was previously subscribed to all data subnets, and if so,
// store the new status accordingly.
wasSupernode, err := s.cfg.BeaconDB.UpdateSubscribedToAllDataSubnets(s.ctx, isSupernode)
// updateCustodyInfoInDB updates the custody information in the database.
// It returns the (potentially updated) earliest available slot and custody group count.
func (s *Service) updateCustodyInfoInDB() (primitives.Slot, uint64, error) {
targetCustodyGroupCount, err := computeTargetCustodyGroupCount()
if err != nil {
return 0, 0, errors.Wrap(err, "update subscribed to all data subnets")
return 0, 0, err
}

// Compute the target custody group count based on current flag configuration.
targetCustodyGroupCount := custodyRequirement
// Query current custody info.
storedEarliestSlot, storedCustodyCount, err := s.cfg.BeaconDB.CustodyInfo(s.ctx)
if err != nil {
return 0, 0, errors.Wrap(err, "custody info")
}

// Supernode: custody all groups (either currently set or previously enabled)
// If custody was already initialized and no increase needed, return stored values.
if storedCustodyCount > 0 && targetCustodyGroupCount <= storedCustodyCount {
return storedEarliestSlot, storedCustodyCount, nil
}

earliestAvailableSlot, err := s.earliestAvailableSlotForCustodyIncrease(storedCustodyCount)
if err != nil {
return 0, 0, err
}

storedEarliestSlot, actualCustodyGroupCount, err := s.cfg.BeaconDB.UpdateCustodyInfo(
s.ctx, earliestAvailableSlot, targetCustodyGroupCount)
if err != nil {
return 0, 0, errors.Wrap(err, "update custody info")
}

return storedEarliestSlot, actualCustodyGroupCount, nil
}

// earliestAvailableSlotForCustodyIncrease determines the earliest available slot when custody is increasing.
// - Pre-fulu: use earliest block slot (no data column sharding)
// - Post-fulu checkpoint sync (storedCustodyCount == 0): use headSlot
// - Post-fulu restart with custody increase: use headSlot + 1
//
// (we don't have data columns for new custody groups at the current head)
func (s *Service) earliestAvailableSlotForCustodyIncrease(storedCustodyCount uint64) (primitives.Slot, error) {
fuluSlot, err := fuluForkSlot()
if err != nil {
return 0, errors.Wrap(err, "fulu fork slot")
}

headSlot := s.HeadSlot()

if headSlot >= fuluSlot {
if storedCustodyCount > 0 {
// Restart with custody increase: new groups only have data from headSlot + 1
return headSlot + 1, nil
}
// Checkpoint sync: we download data columns for the checkpoint block
return headSlot, nil
}

// Pre-fulu: use earliest block slot
earliestAvailableSlot, err := s.cfg.BeaconDB.EarliestSlot(s.ctx)
if err != nil {
return 0, errors.Wrap(err, "earliest slot")
}
return earliestAvailableSlot, nil
}

// computeTargetCustodyGroupCount returns the custody group count based on current flag configuration.
func computeTargetCustodyGroupCount() (uint64, error) {
isSupernode := flags.Get().Supernode
isSemiSupernode := flags.Get().SemiSupernode
cfg := params.BeaconConfig()

// Default to custody requirement
targetCustodyGroupCount := cfg.CustodyRequirement

// Supernode: custody all groups
if isSupernode {
targetCustodyGroupCount = cfg.NumberOfCustodyGroups
return cfg.NumberOfCustodyGroups, nil
}

// Semi-supernode: custody minimum needed for reconstruction, or custody requirement if higher
if isSemiSupernode {
semiSupernodeCustody, err := peerdas.MinimumCustodyGroupCountToReconstruct()
if err != nil {
return 0, 0, errors.Wrap(err, "minimum custody group count")
return 0, errors.Wrap(err, "minimum custody group count")
}

targetCustodyGroupCount = max(custodyRequirement, semiSupernodeCustody)
targetCustodyGroupCount = max(cfg.CustodyRequirement, semiSupernodeCustody)
}

// Safely compute the fulu fork slot.
fuluForkSlot, err := fuluForkSlot()
if err != nil {
return 0, 0, errors.Wrap(err, "fulu fork slot")
}
return targetCustodyGroupCount, nil
}

// If slot is before the fulu fork slot, then use the earliest stored slot as the reference slot.
if slot < fuluForkSlot {
slot, err = s.cfg.BeaconDB.EarliestSlot(s.ctx)
if err != nil {
return 0, 0, errors.Wrap(err, "earliest slot")
}
}

earliestAvailableSlot, actualCustodyGroupCount, err := s.cfg.BeaconDB.UpdateCustodyInfo(s.ctx, slot, targetCustodyGroupCount)
if err != nil {
return 0, 0, errors.Wrap(err, "update custody info")
}
// logCustodyStatus logs information about the custody configuration.
func logCustodyStatus(wasSupernode bool, actualCustodyGroupCount uint64) {
isSupernode := flags.Get().Supernode
cfg := params.BeaconConfig()

if isSupernode {
log.WithFields(logrus.Fields{
@@ -531,8 +582,6 @@ func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot,
if wasSupernode && !isSupernode {
log.Warningf("Because the `--%s` flag was previously used, the node will continue to act as a super node.", flags.Supernode.Name)
}

return earliestAvailableSlot, actualCustodyGroupCount, nil
}

func spawnCountdownIfPreGenesis(ctx context.Context, genesisTime time.Time, db db.HeadAccessDatabase) {
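The new `earliestAvailableSlotForCustodyIncrease` helper above calls a `fuluForkSlot()` function that is not included in this diff. As a rough orientation only, here is a minimal sketch of what such a helper could look like, assuming the `slots.EpochStart` utility and the `FuluForkEpoch` config field that the tests below also use; the actual implementation in the repository may differ.

```go
// Hypothetical sketch, not the code from this commit: the first slot of the
// Fulu fork epoch, derived the same way the tests below derive fuluForkSlot.
func fuluForkSlot() (primitives.Slot, error) {
    return slots.EpochStart(params.BeaconConfig().FuluForkEpoch)
}
```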
@@ -621,37 +621,54 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
roBlock, err := blocks.NewROBlock(signedBeaconBlock)
require.NoError(t, err)

fuluForkSlot := fuluForkEpoch * primitives.Slot(cfg.SlotsPerEpoch)

// Helper to set up forkchoice and head at a specific slot.
// Returns the block root so it can be used as parent for subsequent calls.
setupHead := func(t *testing.T, service *Service, slot primitives.Slot, parentRoot [32]byte) [32]byte {
fc := &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
blockRoot := [32]byte{byte(slot)}
headState, headBlock, err := prepareForkchoiceState(ctx, slot, blockRoot, parentRoot, params.BeaconConfig().ZeroHash, fc, fc)
require.NoError(t, err)
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, headState, headBlock))
require.NoError(t, service.setHead(&head{
root: headBlock.Root(),
block: headBlock,
state: headState,
slot: slot,
}))
return blockRoot
}

t.Run("CGC increases before fulu", func(t *testing.T) {
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

// Before Fulu
// -----------
actualEas, actualCgc, err := service.updateCustodyInfoInDB(15)
// Set head to pre-fulu slot
preFuluSlot := primitives.Slot(15)
setupHead(t, service, preFuluSlot, params.BeaconConfig().ZeroHash)

// Before Fulu - should use earliest stored slot
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)

actualEas, actualCgc, err = service.updateCustodyInfoInDB(17)
// Call again to ensure idempotency
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)

// Enable supernode while still pre-fulu
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)

actualEas, actualCgc, err = service.updateCustodyInfoInDB(19)
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, numberOfCustodyGroups, actualCgc)

// After Fulu
// ----------
actualEas, actualCgc, err = service.updateCustodyInfoInDB(fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, numberOfCustodyGroups, actualCgc)
@@ -662,35 +679,36 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

// Before Fulu
// -----------
actualEas, actualCgc, err := service.updateCustodyInfoInDB(15)
// Start pre-fulu
preFuluSlot := primitives.Slot(15)
preFuluRoot := setupHead(t, service, preFuluSlot, params.BeaconConfig().ZeroHash)

actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)

actualEas, actualCgc, err = service.updateCustodyInfoInDB(17)
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)

// After Fulu
// ----------
// Move to post-fulu and enable supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)

slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot)
postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, preFuluRoot)

actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot+1, actualEas) // head + 1
require.Equal(t, numberOfCustodyGroups, actualCgc)

actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
// Call again - earliest available slot shouldn't change
laterSlot := postFuluSlot + 10
setupHead(t, service, laterSlot, postFuluRoot)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot+1, actualEas) // Still the original EAS
require.Equal(t, numberOfCustodyGroups, actualCgc)
})

@@ -699,16 +717,19 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)

// Enable supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)

slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
require.Equal(t, numberOfCustodyGroups, actualCgc)

// Try to downgrade by removing flag
@@ -716,10 +737,14 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
flags.Init(gFlags)
defer flags.Init(resetFlags)

// Should still be supernode
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
// Move head forward
laterSlot := postFuluSlot + 10
setupHead(t, service, laterSlot, postFuluRoot)

// Should still be supernode (custody count shouldn't decrease)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot, actualEas) // Original EAS preserved
require.Equal(t, numberOfCustodyGroups, actualCgc) // Still 64, not downgraded
})

@@ -728,29 +753,36 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)

// Enable semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SemiSupernode = true
flags.Init(gFlags)

slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies 64 groups
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 32
require.Equal(t, semiSupernodeCustody, actualCgc)

// Try to downgrade by removing flag
gFlags.SemiSupernode = false
flags.Init(gFlags)
defer flags.Init(resetFlags)

// UpdateCustodyInfo should prevent downgrade - custody count should remain at 64
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
// Move head forward
laterSlot := postFuluSlot + 10
setupHead(t, service, laterSlot, postFuluRoot)

// UpdateCustodyInfo should prevent downgrade
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, semiSupernodeCustody, actualCgc) // Still 64 due to downgrade prevention by UpdateCustodyInfo
require.Equal(t, postFuluSlot, actualEas) // Original EAS preserved
require.Equal(t, semiSupernodeCustody, actualCgc) // Still 32
})

t.Run("Semi-supernode to supernode upgrade allowed", func(t *testing.T) {
@@ -758,18 +790,21 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)

// Start with semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SemiSupernode = true
flags.Init(gFlags)

slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies 64 groups
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
semiSupernodeCustody := numberOfCustodyGroups / 2
require.Equal(t, semiSupernodeCustody, actualCgc)

// Upgrade to full supernode
gFlags.SemiSupernode = false
@@ -777,12 +812,15 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
flags.Init(gFlags)
defer flags.Init(resetFlags)

// Should upgrade to full supernode
upgradeSlot := slot + 2
actualEas, actualCgc, err = service.updateCustodyInfoInDB(upgradeSlot)
// Move head forward for upgrade
upgradeSlot := postFuluSlot + 10
setupHead(t, service, upgradeSlot, postFuluRoot)

// Second call - restart with custody increase
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, upgradeSlot, actualEas) // Earliest slot updates when upgrading
require.Equal(t, numberOfCustodyGroups, actualCgc) // Upgraded to 128
require.Equal(t, upgradeSlot+1, actualEas) // headSlot + 1 (restart with custody increase)
require.Equal(t, numberOfCustodyGroups, actualCgc) // Upgraded to 64
})

t.Run("Semi-supernode with high validator requirements uses higher custody", func(t *testing.T) {
@@ -790,6 +828,9 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

postFuluSlot := fuluForkSlot + 1
setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)

// Enable semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
@@ -797,17 +838,92 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
flags.Init(gFlags)
defer flags.Init(resetFlags)

// Mock a high custody requirement (simulating many validators)
// We need to override the custody requirement calculation
// For this test, we'll verify the logic by checking if custodyRequirement > 64
// Since custodyRequirement in minimalTestService is 4, we can't test the high case here
// This would require a different test setup with actual validators
slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
// With low validator requirements (4), should use semi-supernode minimum (64)
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
semiSupernodeCustody := numberOfCustodyGroups / 2
// With low validator requirements (4), should use semi-supernode minimum (32)
require.Equal(t, semiSupernodeCustody, actualCgc)
})

t.Run("Restart with higher custody uses head slot plus one", func(t *testing.T) {
// This test simulates a restart scenario where:
// 1. Node was running with lower custody count
// 2. Node stops and waits (new blocks are produced)
// 3. Node restarts with higher custody count
// In this case, we should use head + 1 as the earliest available slot.
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

// First, establish existing custody in the DB (simulates previous run)
initialSlot := fuluForkSlot + 10
initialRoot := setupHead(t, service, initialSlot, params.BeaconConfig().ZeroHash)

// Initial call with default custody (simulates first run checkpoint sync)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, initialSlot, actualEas) // First time = headSlot
require.Equal(t, custodyRequirement, actualCgc)

// Verify existing custody is stored (calling with 0 reads without modifying)
_, storedCount, err := requirements.db.UpdateCustodyInfo(ctx, 0, 0)
require.NoError(t, err)
require.Equal(t, custodyRequirement, storedCount)

// Now simulate restart at a higher head slot with increased custody
headSlot := fuluForkSlot + 100
setupHead(t, service, headSlot, initialRoot)
require.Equal(t, headSlot, service.HeadSlot())

// Enable supernode (simulating restart with increased custody)
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)

actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)

// The earliest available slot should be head + 1 (restart scenario)
expectedEas := headSlot + 1
require.Equal(t, expectedEas, actualEas, "Expected head + 1 for restart with custody increase")
require.Equal(t, numberOfCustodyGroups, actualCgc)
})

t.Run("Checkpoint sync uses head slot without plus one", func(t *testing.T) {
// This test verifies checkpoint sync behavior:
// When syncing from a checkpoint with no existing custody info,
// the earliest available slot should be the checkpoint slot itself
// (not +1), because we download the checkpoint block's data columns.
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

// Set up head at checkpoint slot (simulates checkpoint sync)
checkpointSlot := fuluForkSlot + 50
setupHead(t, service, checkpointSlot, params.BeaconConfig().ZeroHash)
require.Equal(t, checkpointSlot, service.HeadSlot())

// Verify no existing custody in DB (calling with 0 reads without modifying)
_, storedCount, err := requirements.db.UpdateCustodyInfo(ctx, 0, 0)
require.NoError(t, err)
require.Equal(t, uint64(0), storedCount)

// Enable supernode for checkpoint sync
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)

actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)

// For checkpoint sync, EAS should be the checkpoint slot (not +1)
require.Equal(t, checkpointSlot, actualEas, "Checkpoint sync should use headSlot, not headSlot+1")
require.Equal(t, numberOfCustodyGroups, actualCgc)
})
}
@@ -128,6 +128,7 @@ type NoHeadAccessDatabase interface {
BackfillFinalizedIndex(ctx context.Context, blocks []blocks.ROBlock, finalizedChildRoot [32]byte) error

// Custody operations.
CustodyInfo(ctx context.Context) (primitives.Slot, uint64, error)
UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot primitives.Slot, custodyGroupCount uint64) (primitives.Slot, uint64, error)
UpdateEarliestAvailableSlot(ctx context.Context, earliestAvailableSlot primitives.Slot) error
UpdateSubscribedToAllDataSubnets(ctx context.Context, subscribed bool) (bool, error)
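The two custody methods added to this interface are meant to be used together: read first, then raise. A purely illustrative sketch of a caller follows; the function and variable names below are invented for this example and are not part of the interface.

```go
// Illustrative only: bring stored custody up to a target count, relying on
// UpdateCustodyInfo never lowering the stored count or the earliest available slot.
func ensureCustody(ctx context.Context, db NoHeadAccessDatabase, eas primitives.Slot, target uint64) (primitives.Slot, uint64, error) {
    storedSlot, storedCount, err := db.CustodyInfo(ctx)
    if err != nil {
        return 0, 0, err
    }
    // Already custodying at least the target: nothing to write.
    if storedCount >= target {
        return storedSlot, storedCount, nil
    }
    return db.UpdateCustodyInfo(ctx, eas, target)
}
```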
@@ -14,9 +14,38 @@ import (
bolt "go.etcd.io/bbolt"
)

// CustodyInfo returns the stored earliest available slot and custody group count.
func (s *Store) CustodyInfo(ctx context.Context) (primitives.Slot, uint64, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.CustodyInfo")
defer span.End()

var storedGroupCount uint64
var storedEarliestAvailableSlot primitives.Slot

if err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(custodyBucket)
if bucket == nil {
return nil
}

if b := bucket.Get(groupCountKey); len(b) != 0 {
storedGroupCount = bytesutil.BytesToUint64BigEndian(b)
}
if b := bucket.Get(earliestAvailableSlotKey); len(b) != 0 {
storedEarliestAvailableSlot = primitives.Slot(bytesutil.BytesToUint64BigEndian(b))
}
return nil
}); err != nil {
return 0, 0, err
}

return storedEarliestAvailableSlot, storedGroupCount, nil
}

// UpdateCustodyInfo atomically updates the custody group count only if it is greater than the stored one.
// In this case, it also updates the earliest available slot with the provided value.
// It returns the (potentially updated) custody group count and earliest available slot.
// The earliest available slot is only updated if the new value is higher than the stored one,
// preventing loss of availability for data we already have (e.g., when switching from normal to supernode).
// It returns the (potentially updated) earliest available slot and custody group count.
func (s *Store) UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot primitives.Slot, custodyGroupCount uint64) (primitives.Slot, uint64, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.UpdateCustodyInfo")
defer span.End()
@@ -46,20 +75,23 @@ func (s *Store) UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot pri
return nil
}

storedGroupCount, storedEarliestAvailableSlot = custodyGroupCount, earliestAvailableSlot

// Store the earliest available slot.
bytes := bytesutil.Uint64ToBytesBigEndian(uint64(earliestAvailableSlot))
if err := bucket.Put(earliestAvailableSlotKey, bytes); err != nil {
return errors.Wrap(err, "put earliest available slot")
}

// Store the custody group count.
bytes = bytesutil.Uint64ToBytesBigEndian(custodyGroupCount)
// Update the custody group count.
storedGroupCount = custodyGroupCount
bytes := bytesutil.Uint64ToBytesBigEndian(custodyGroupCount)
if err := bucket.Put(groupCountKey, bytes); err != nil {
return errors.Wrap(err, "put custody group count")
}

// Only update earliestAvailableSlot if the new value is higher.
// This prevents losing availability for data we already have.
if earliestAvailableSlot > storedEarliestAvailableSlot {
storedEarliestAvailableSlot = earliestAvailableSlot
bytes = bytesutil.Uint64ToBytesBigEndian(uint64(earliestAvailableSlot))
if err := bucket.Put(earliestAvailableSlotKey, bytes); err != nil {
return errors.Wrap(err, "put earliest available slot")
}
}

return nil
}); err != nil {
return 0, 0, err
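To make the semantics above concrete, here is a small illustration with made-up values, assuming `store` is a `*Store` such as the one returned by the `setupDB` test helper; the tests that follow exercise the same behavior.

```go
// First write initializes both values.
slot, count, _ := store.UpdateCustodyInfo(ctx, 100, 8) // stored: slot=100, count=8

// Higher count with a lower slot: the count grows, the slot does not move backwards.
slot, count, _ = store.UpdateCustodyInfo(ctx, 90, 64) // stored: slot=100, count=64

// Lower count: nothing is overwritten.
slot, count, _ = store.UpdateCustodyInfo(ctx, 120, 32) // stored: slot=100, count=64

_, _ = slot, count // stored state is described in the comments above
```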
@@ -89,7 +89,7 @@ func TestUpdateCustodyInfo(t *testing.T) {
require.Equal(t, groupCount, storedCount)
})

t.Run("update with higher group count", func(t *testing.T) {
t.Run("update with higher group count and higher slot", func(t *testing.T) {
const (
initialSlot = primitives.Slot(100)
initialCount = uint64(5)
@@ -112,6 +112,150 @@ func TestUpdateCustodyInfo(t *testing.T) {
require.Equal(t, groupCount, storedCount)
})

t.Run("update with higher group count and lower slot should preserve higher slot", func(t *testing.T) {
// This is the bug scenario: when switching from normal mode to semi-supernode,
// the incoming slot might be lower than the stored slot, but we should preserve
// the higher stored slot to avoid advertising that we can serve data we don't have.
const (
initialSlot = primitives.Slot(1835523) // Higher stored slot
initialCount = uint64(10)
earliestSlot = primitives.Slot(1835456) // Lower incoming slot (e.g., from head slot)
groupCount = uint64(64) // Increasing custody (e.g., semi-supernode)
)

db := setupDB(t)

_, _, err := db.UpdateCustodyInfo(ctx, initialSlot, initialCount)
require.NoError(t, err)

// When custody count increases but slot is lower, the higher slot should be preserved
slot, count, err := db.UpdateCustodyInfo(ctx, earliestSlot, groupCount)
require.NoError(t, err)
require.Equal(t, initialSlot, slot, "earliestAvailableSlot should not decrease when custody group count increases")
require.Equal(t, groupCount, count)

// Verify in the database
storedSlot, storedCount := getCustodyInfoFromDB(t, db)
require.Equal(t, initialSlot, storedSlot, "stored slot should be the higher value")
require.Equal(t, groupCount, storedCount)
})

t.Run("pre-fulu scenario: checkpoint sync before fork, restart with semi-supernode", func(t *testing.T) {
// This test covers the pre-Fulu bug scenario:
// 1. Node starts with checkpoint sync BEFORE Fulu fork - uses EarliestSlot() (checkpoint block slot)
// 2. Validators connect after Fulu activates - maintainCustodyInfo() updates to head slot (higher)
// 3. Node restarts with --semi-supernode - updateCustodyInfoInDB uses EarliestSlot() again
// The bug was that step 3 would overwrite the higher slot from step 2.
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 100
params.OverrideBeaconConfig(cfg)

fuluForkSlot, err := slots.EpochStart(cfg.FuluForkEpoch)
require.NoError(t, err)

// Derive slot values relative to Fulu fork
checkpointBlockSlot := fuluForkSlot - 10 // Checkpoint sync happened before Fulu
headSlot := fuluForkSlot + 5 // Head slot after Fulu activates
defaultCustody := cfg.CustodyRequirement // Default custody from config
validatorCustody := cfg.CustodyRequirement + 6 // Custody after validators connect
semiSupernodeCustody := cfg.NumberOfCustodyGroups // Semi-supernode custodies all groups

// Verify our test setup: checkpoint is pre-Fulu, head is post-Fulu
require.Equal(t, true, checkpointBlockSlot < fuluForkSlot, "checkpoint must be before Fulu fork")
require.Equal(t, true, headSlot >= fuluForkSlot, "head must be at or after Fulu fork")

db := setupDB(t)

// Step 1: Node starts with checkpoint sync (pre-Fulu)
// updateCustodyInfoInDB sees saved.Slot() < fuluForkSlot, so uses EarliestSlot()
slot, count, err := db.UpdateCustodyInfo(ctx, checkpointBlockSlot, defaultCustody)
require.NoError(t, err)
require.Equal(t, checkpointBlockSlot, slot)
require.Equal(t, defaultCustody, count)

// Step 2: Validators connect after Fulu activates, maintainCustodyInfo() runs
// Uses headSlot which is higher than checkpointBlockSlot
slot, count, err = db.UpdateCustodyInfo(ctx, headSlot, validatorCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot, "should update to head slot")
require.Equal(t, validatorCustody, count)

// Verify step 2 stored correctly
storedSlot, storedCount := getCustodyInfoFromDB(t, db)
require.Equal(t, headSlot, storedSlot)
require.Equal(t, validatorCustody, storedCount)

// Step 3: Restart with --semi-supernode
// updateCustodyInfoInDB sees saved.Slot() < fuluForkSlot, so uses EarliestSlot() again
slot, count, err = db.UpdateCustodyInfo(ctx, checkpointBlockSlot, semiSupernodeCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot, "earliestAvailableSlot should NOT decrease back to checkpoint slot")
require.Equal(t, semiSupernodeCustody, count)

// Verify the database preserved the higher slot
storedSlot, storedCount = getCustodyInfoFromDB(t, db)
require.Equal(t, headSlot, storedSlot, "stored slot should remain at head slot, not checkpoint slot")
require.Equal(t, semiSupernodeCustody, storedCount)
})

t.Run("post-fulu scenario: finalized slot lower than stored head slot", func(t *testing.T) {
// This test covers the post-Fulu bug scenario:
// Post-fork, updateCustodyInfoInDB uses saved.Slot() (finalized slot) directly,
// not EarliestSlot(). But the same bug can occur because:
// - maintainCustodyInfo() stores headSlot (higher)
// - Restart uses finalized slot (lower than head)
// Our fix ensures earliestAvailableSlot never decreases.
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 100
params.OverrideBeaconConfig(cfg)

fuluForkSlot, err := slots.EpochStart(cfg.FuluForkEpoch)
require.NoError(t, err)

// Derive slot values relative to Fulu fork - all slots are AFTER Fulu
finalizedSlotAtStart := fuluForkSlot + 100 // Finalized slot at first start (post-Fulu)
headSlot := fuluForkSlot + 200 // Head slot when validators connect
finalizedSlotRestart := fuluForkSlot + 150 // Finalized slot at restart (< headSlot)
defaultCustody := cfg.CustodyRequirement // Default custody from config
validatorCustody := cfg.CustodyRequirement + 6 // Custody after validators connect
semiSupernodeCustody := cfg.NumberOfCustodyGroups // Semi-supernode custodies all groups

// Verify our test setup: all slots are post-Fulu
require.Equal(t, true, finalizedSlotAtStart >= fuluForkSlot, "finalized slot must be at or after Fulu fork")
require.Equal(t, true, headSlot >= fuluForkSlot, "head slot must be at or after Fulu fork")
require.Equal(t, true, finalizedSlotRestart >= fuluForkSlot, "restart finalized slot must be at or after Fulu fork")
require.Equal(t, true, finalizedSlotRestart < headSlot, "restart finalized slot must be less than head slot")

db := setupDB(t)

// Step 1: Node starts post-Fulu
// updateCustodyInfoInDB sees saved.Slot() >= fuluForkSlot, so uses saved.Slot() directly
slot, count, err := db.UpdateCustodyInfo(ctx, finalizedSlotAtStart, defaultCustody)
require.NoError(t, err)
require.Equal(t, finalizedSlotAtStart, slot)
require.Equal(t, defaultCustody, count)

// Step 2: Validators connect, maintainCustodyInfo() uses head slot
slot, count, err = db.UpdateCustodyInfo(ctx, headSlot, validatorCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot)
require.Equal(t, validatorCustody, count)

// Step 3: Restart with --semi-supernode
// updateCustodyInfoInDB uses finalized slot which is lower than stored head slot
slot, count, err = db.UpdateCustodyInfo(ctx, finalizedSlotRestart, semiSupernodeCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot, "earliestAvailableSlot should NOT decrease to finalized slot")
require.Equal(t, semiSupernodeCustody, count)

// Verify database preserved the higher slot
storedSlot, storedCount := getCustodyInfoFromDB(t, db)
require.Equal(t, headSlot, storedSlot)
require.Equal(t, semiSupernodeCustody, storedCount)
})

t.Run("update with lower group count should not update", func(t *testing.T) {
const (
initialSlot = primitives.Slot(200)
changelog/james-prysm_fix-backward-earliest-slot.md (new file, 3 lines)

@@ -0,0 +1,3 @@
### Fixed

- Fixes a bug where the earliest available slot could go backwards when setting the semi-supernode or supernode flags.
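The changelog entry summarizes the behavioral change. Distilled into a standalone sketch (the function below is illustrative, not part of the patch), the rule the store now enforces when custody grows is simply:

```go
// Illustrative distillation of the fix: a proposed earliest available slot is
// only accepted if it does not move backwards relative to what is stored.
func nextEarliestAvailableSlot(stored, proposed primitives.Slot) primitives.Slot {
    if proposed > stored {
        return proposed
    }
    return stored
}
```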