Compare commits

...

10 Commits

Author SHA1 Message Date
james-prysm
93b2445d83 another refactor for readability 2026-01-06 20:45:13 -06:00
james-prysm
fe5d67496a refactoring code for readability 2026-01-06 20:30:29 -06:00
james-prysm
a7eb4add27 more cleanup fixing edge cases for checkpoint sync vs restart with head 2026-01-06 19:31:30 -06:00
james-prysm
0f0fced955 handle non checkpoint case on restart with increased custody count 2026-01-06 15:58:44 -06:00
james-prysm
56477f944f Merge branch 'develop' into fix-backward-earliest-slot 2026-01-06 12:22:49 -08:00
james-prysm
5069057354 Update beacon-chain/db/kv/custody.go
Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
2026-01-06 09:33:47 -06:00
james-prysm
30340f632c Merge branch 'develop' into fix-backward-earliest-slot 2025-12-17 08:18:16 -08:00
james-prysm
6bfbbdbd1b fixing unit tests and updating changelog 2025-12-08 21:55:34 -06:00
james-prysm
d30c1cbb8a optimization for earliest slot 2025-12-08 16:57:53 -06:00
james-prysm
e9c7a62b97 attempt to fix earliest slot going backwards 2025-12-08 16:51:46 -06:00
6 changed files with 460 additions and 115 deletions

View File

@@ -295,11 +295,20 @@ func (s *Service) StartFromSavedState(saved state.BeaconState) error {
return nil
}
earliestAvailableSlot, custodySubnetCount, err := s.updateCustodyInfoInDB(saved.Slot())
// Track supernode subscription status for logging purposes.
isSupernode := flags.Get().Supernode
wasSupernode, err := s.cfg.BeaconDB.UpdateSubscribedToAllDataSubnets(s.ctx, isSupernode)
if err != nil {
return errors.Wrap(err, "update subscribed to all data subnets")
}
earliestAvailableSlot, custodySubnetCount, err := s.updateCustodyInfoInDB()
if err != nil {
return errors.Wrap(err, "could not get and save custody group count")
}
logCustodyStatus(wasSupernode, custodySubnetCount)
if _, _, err := s.cfg.P2P.UpdateCustodyInfo(earliestAvailableSlot, custodySubnetCount); err != nil {
return errors.Wrap(err, "update custody info")
}
@@ -468,58 +477,100 @@ func (s *Service) removeStartupState() {
s.cfg.FinalizedStateAtStartUp = nil
}
// UpdateCustodyInfoInDB updates the custody information in the database.
// It returns the (potentially updated) custody group count and the earliest available slot.
func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot, uint64, error) {
isSupernode := flags.Get().Supernode
isSemiSupernode := flags.Get().SemiSupernode
cfg := params.BeaconConfig()
custodyRequirement := cfg.CustodyRequirement
// Check if the node was previously subscribed to all data subnets, and if so,
// store the new status accordingly.
wasSupernode, err := s.cfg.BeaconDB.UpdateSubscribedToAllDataSubnets(s.ctx, isSupernode)
// updateCustodyInfoInDB updates the custody information in the database.
// It returns the (potentially updated) earliest available slot and custody group count.
func (s *Service) updateCustodyInfoInDB() (primitives.Slot, uint64, error) {
targetCustodyGroupCount, err := computeTargetCustodyGroupCount()
if err != nil {
return 0, 0, errors.Wrap(err, "update subscribed to all data subnets")
return 0, 0, err
}
// Compute the target custody group count based on current flag configuration.
targetCustodyGroupCount := custodyRequirement
// Query current custody info.
storedEarliestSlot, storedCustodyCount, err := s.cfg.BeaconDB.CustodyInfo(s.ctx)
if err != nil {
return 0, 0, errors.Wrap(err, "custody info")
}
// Supernode: custody all groups (either currently set or previously enabled)
// If custody was already initialized and no increase needed, return stored values.
if storedCustodyCount > 0 && targetCustodyGroupCount <= storedCustodyCount {
return storedEarliestSlot, storedCustodyCount, nil
}
earliestAvailableSlot, err := s.earliestAvailableSlotForCustodyIncrease(storedCustodyCount)
if err != nil {
return 0, 0, err
}
storedEarliestSlot, actualCustodyGroupCount, err := s.cfg.BeaconDB.UpdateCustodyInfo(
s.ctx, earliestAvailableSlot, targetCustodyGroupCount)
if err != nil {
return 0, 0, errors.Wrap(err, "update custody info")
}
return storedEarliestSlot, actualCustodyGroupCount, nil
}
// earliestAvailableSlotForCustodyIncrease determines the earliest available slot when custody is increasing.
// - Pre-fulu: use earliest block slot (no data column sharding)
// - Post-fulu checkpoint sync (storedCustodyCount == 0): use headSlot
// - Post-fulu restart with custody increase: use headSlot + 1
//   (we don't have data columns for new custody groups at the current head)
func (s *Service) earliestAvailableSlotForCustodyIncrease(storedCustodyCount uint64) (primitives.Slot, error) {
fuluSlot, err := fuluForkSlot()
if err != nil {
return 0, errors.Wrap(err, "fulu fork slot")
}
headSlot := s.HeadSlot()
if headSlot >= fuluSlot {
if storedCustodyCount > 0 {
// Restart with custody increase: new groups only have data from headSlot + 1
return headSlot + 1, nil
}
// Checkpoint sync: we download data columns for the checkpoint block
return headSlot, nil
}
// Pre-fulu: use earliest block slot
earliestAvailableSlot, err := s.cfg.BeaconDB.EarliestSlot(s.ctx)
if err != nil {
return 0, errors.Wrap(err, "earliest slot")
}
return earliestAvailableSlot, nil
}
// computeTargetCustodyGroupCount returns the custody group count based on current flag configuration.
func computeTargetCustodyGroupCount() (uint64, error) {
isSupernode := flags.Get().Supernode
isSemiSupernode := flags.Get().SemiSupernode
cfg := params.BeaconConfig()
// Default to custody requirement
targetCustodyGroupCount := cfg.CustodyRequirement
// Supernode: custody all groups
if isSupernode {
targetCustodyGroupCount = cfg.NumberOfCustodyGroups
return cfg.NumberOfCustodyGroups, nil
}
// Semi-supernode: custody minimum needed for reconstruction, or custody requirement if higher
if isSemiSupernode {
semiSupernodeCustody, err := peerdas.MinimumCustodyGroupCountToReconstruct()
if err != nil {
return 0, 0, errors.Wrap(err, "minimum custody group count")
return 0, errors.Wrap(err, "minimum custody group count")
}
targetCustodyGroupCount = max(custodyRequirement, semiSupernodeCustody)
targetCustodyGroupCount = max(cfg.CustodyRequirement, semiSupernodeCustody)
}
// Safely compute the fulu fork slot.
fuluForkSlot, err := fuluForkSlot()
if err != nil {
return 0, 0, errors.Wrap(err, "fulu fork slot")
}
return targetCustodyGroupCount, nil
}
// If slot is before the fulu fork slot, then use the earliest stored slot as the reference slot.
if slot < fuluForkSlot {
slot, err = s.cfg.BeaconDB.EarliestSlot(s.ctx)
if err != nil {
return 0, 0, errors.Wrap(err, "earliest slot")
}
}
earliestAvailableSlot, actualCustodyGroupCount, err := s.cfg.BeaconDB.UpdateCustodyInfo(s.ctx, slot, targetCustodyGroupCount)
if err != nil {
return 0, 0, errors.Wrap(err, "update custody info")
}
// logCustodyStatus logs information about the custody configuration.
func logCustodyStatus(wasSupernode bool, actualCustodyGroupCount uint64) {
isSupernode := flags.Get().Supernode
cfg := params.BeaconConfig()
if isSupernode {
log.WithFields(logrus.Fields{
@@ -531,8 +582,6 @@ func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot,
if wasSupernode && !isSupernode {
log.Warningf("Because the `--%s` flag was previously used, the node will continue to act as a super node.", flags.Supernode.Name)
}
return earliestAvailableSlot, actualCustodyGroupCount, nil
}
func spawnCountdownIfPreGenesis(ctx context.Context, genesisTime time.Time, db db.HeadAccessDatabase) {
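
Illustrative note (not part of the diff above): the earliest-slot selection in earliestAvailableSlotForCustodyIncrease reduces to three cases. Below is a minimal, self-contained sketch with plain parameters standing in for the service's real dependencies (HeadSlot, the Fulu fork slot, and BeaconDB.EarliestSlot); it is an illustration of the decision, not the implementation.

// Sketch only: simplified, pure-function version of the decision above.
package main

import "fmt"

type Slot uint64

func earliestSlotForCustodyIncrease(headSlot, fuluForkSlot, earliestStoredBlockSlot Slot, storedCustodyCount uint64) Slot {
	if headSlot < fuluForkSlot {
		// Pre-Fulu: no data columns exist yet, so fall back to the earliest stored block slot.
		return earliestStoredBlockSlot
	}
	if storedCustodyCount == 0 {
		// Post-Fulu checkpoint sync: data columns for the checkpoint block are downloaded,
		// so the head slot itself is available.
		return headSlot
	}
	// Post-Fulu restart with a custody increase: the newly custodied groups only have
	// data from the next slot onward.
	return headSlot + 1
}

func main() {
	fmt.Println(earliestSlotForCustodyIncrease(500, 800, 12, 0))  // 12   (pre-Fulu)
	fmt.Println(earliestSlotForCustodyIncrease(1000, 800, 12, 0)) // 1000 (checkpoint sync)
	fmt.Println(earliestSlotForCustodyIncrease(1000, 800, 12, 4)) // 1001 (restart, custody increase)
}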

View File

@@ -621,37 +621,54 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
roBlock, err := blocks.NewROBlock(signedBeaconBlock)
require.NoError(t, err)
fuluForkSlot := fuluForkEpoch * primitives.Slot(cfg.SlotsPerEpoch)
// Helper to set up forkchoice and head at a specific slot.
// Returns the block root so it can be used as parent for subsequent calls.
setupHead := func(t *testing.T, service *Service, slot primitives.Slot, parentRoot [32]byte) [32]byte {
fc := &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
blockRoot := [32]byte{byte(slot)}
headState, headBlock, err := prepareForkchoiceState(ctx, slot, blockRoot, parentRoot, params.BeaconConfig().ZeroHash, fc, fc)
require.NoError(t, err)
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, headState, headBlock))
require.NoError(t, service.setHead(&head{
root: headBlock.Root(),
block: headBlock,
state: headState,
slot: slot,
}))
return blockRoot
}
t.Run("CGC increases before fulu", func(t *testing.T) {
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
// Before Fulu
// -----------
actualEas, actualCgc, err := service.updateCustodyInfoInDB(15)
// Set head to pre-fulu slot
preFuluSlot := primitives.Slot(15)
setupHead(t, service, preFuluSlot, params.BeaconConfig().ZeroHash)
// Before Fulu - should use earliest stored slot
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)
actualEas, actualCgc, err = service.updateCustodyInfoInDB(17)
// Call again to ensure idempotency
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)
// Enable supernode while still pre-fulu
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)
actualEas, actualCgc, err = service.updateCustodyInfoInDB(19)
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, numberOfCustodyGroups, actualCgc)
// After Fulu
// ----------
actualEas, actualCgc, err = service.updateCustodyInfoInDB(fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, numberOfCustodyGroups, actualCgc)
@@ -662,35 +679,36 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
// Before Fulu
// -----------
actualEas, actualCgc, err := service.updateCustodyInfoInDB(15)
// Start pre-fulu
preFuluSlot := primitives.Slot(15)
preFuluRoot := setupHead(t, service, preFuluSlot, params.BeaconConfig().ZeroHash)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)
actualEas, actualCgc, err = service.updateCustodyInfoInDB(17)
require.NoError(t, err)
require.Equal(t, earliestStoredSlot, actualEas)
require.Equal(t, custodyRequirement, actualCgc)
// After Fulu
// ----------
// Move to post-fulu and enable supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)
slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot)
postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, preFuluRoot)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot+1, actualEas) // head + 1
require.Equal(t, numberOfCustodyGroups, actualCgc)
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
// Call again - earliest available slot shouldn't change
laterSlot := postFuluSlot + 10
setupHead(t, service, laterSlot, postFuluRoot)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot+1, actualEas) // Still the original EAS
require.Equal(t, numberOfCustodyGroups, actualCgc)
})
@@ -699,16 +717,19 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)
// Enable supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
require.Equal(t, numberOfCustodyGroups, actualCgc)
// Try to downgrade by removing flag
@@ -716,10 +737,14 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
flags.Init(gFlags)
defer flags.Init(resetFlags)
// Should still be supernode
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
// Move head forward
laterSlot := postFuluSlot + 10
setupHead(t, service, laterSlot, postFuluRoot)
// Should still be supernode (custody count shouldn't decrease)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, postFuluSlot, actualEas) // Original EAS preserved
require.Equal(t, numberOfCustodyGroups, actualCgc) // Still 64, not downgraded
})
@@ -728,29 +753,36 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)
// Enable semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SemiSupernode = true
flags.Init(gFlags)
slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies 64 groups
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 32
require.Equal(t, semiSupernodeCustody, actualCgc)
// Try to downgrade by removing flag
gFlags.SemiSupernode = false
flags.Init(gFlags)
defer flags.Init(resetFlags)
// UpdateCustodyInfo should prevent downgrade - custody count should remain at 64
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
// Move head forward
laterSlot := postFuluSlot + 10
setupHead(t, service, laterSlot, postFuluRoot)
// UpdateCustodyInfo should prevent downgrade
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, semiSupernodeCustody, actualCgc) // Still 64 due to downgrade prevention by UpdateCustodyInfo
require.Equal(t, postFuluSlot, actualEas) // Original EAS preserved
require.Equal(t, semiSupernodeCustody, actualCgc) // Still 32
})
t.Run("Semi-supernode to supernode upgrade allowed", func(t *testing.T) {
@@ -758,18 +790,21 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
postFuluSlot := fuluForkSlot + 1
postFuluRoot := setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)
// Start with semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SemiSupernode = true
flags.Init(gFlags)
slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies 64 groups
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
semiSupernodeCustody := numberOfCustodyGroups / 2
require.Equal(t, semiSupernodeCustody, actualCgc)
// Upgrade to full supernode
gFlags.SemiSupernode = false
@@ -777,12 +812,15 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
flags.Init(gFlags)
defer flags.Init(resetFlags)
// Should upgrade to full supernode
upgradeSlot := slot + 2
actualEas, actualCgc, err = service.updateCustodyInfoInDB(upgradeSlot)
// Move head forward for upgrade
upgradeSlot := postFuluSlot + 10
setupHead(t, service, upgradeSlot, postFuluRoot)
// Second call - restart with custody increase
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, upgradeSlot, actualEas) // Earliest slot updates when upgrading
require.Equal(t, numberOfCustodyGroups, actualCgc) // Upgraded to 128
require.Equal(t, upgradeSlot+1, actualEas) // headSlot + 1 (restart with custody increase)
require.Equal(t, numberOfCustodyGroups, actualCgc) // Upgraded to 64
})
t.Run("Semi-supernode with high validator requirements uses higher custody", func(t *testing.T) {
@@ -790,6 +828,9 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
postFuluSlot := fuluForkSlot + 1
setupHead(t, service, postFuluSlot, params.BeaconConfig().ZeroHash)
// Enable semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
@@ -797,17 +838,92 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
flags.Init(gFlags)
defer flags.Init(resetFlags)
// Mock a high custody requirement (simulating many validators)
// We need to override the custody requirement calculation
// For this test, we'll verify the logic by checking if custodyRequirement > 64
// Since custodyRequirement in minimalTestService is 4, we can't test the high case here
// This would require a different test setup with actual validators
slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
// First call - checkpoint sync scenario (no existing custody)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
// With low validator requirements (4), should use semi-supernode minimum (64)
require.Equal(t, postFuluSlot, actualEas) // headSlot (checkpoint sync)
semiSupernodeCustody := numberOfCustodyGroups / 2
// With low validator requirements (4), should use semi-supernode minimum (32)
require.Equal(t, semiSupernodeCustody, actualCgc)
})
t.Run("Restart with higher custody uses head slot plus one", func(t *testing.T) {
// This test simulates a restart scenario where:
// 1. Node was running with lower custody count
// 2. Node stops and waits (new blocks are produced)
// 3. Node restarts with higher custody count
// In this case, we should use head + 1 as the earliest available slot.
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
// First, establish existing custody in the DB (simulates previous run)
initialSlot := fuluForkSlot + 10
initialRoot := setupHead(t, service, initialSlot, params.BeaconConfig().ZeroHash)
// Initial call with default custody (simulates first run checkpoint sync)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
require.Equal(t, initialSlot, actualEas) // First time = headSlot
require.Equal(t, custodyRequirement, actualCgc)
// Verify existing custody is stored (calling with 0 reads without modifying)
_, storedCount, err := requirements.db.UpdateCustodyInfo(ctx, 0, 0)
require.NoError(t, err)
require.Equal(t, custodyRequirement, storedCount)
// Now simulate restart at a higher head slot with increased custody
headSlot := fuluForkSlot + 100
setupHead(t, service, headSlot, initialRoot)
require.Equal(t, headSlot, service.HeadSlot())
// Enable supernode (simulating restart with increased custody)
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)
actualEas, actualCgc, err = service.updateCustodyInfoInDB()
require.NoError(t, err)
// The earliest available slot should be head + 1 (restart scenario)
expectedEas := headSlot + 1
require.Equal(t, expectedEas, actualEas, "Expected head + 1 for restart with custody increase")
require.Equal(t, numberOfCustodyGroups, actualCgc)
})
t.Run("Checkpoint sync uses head slot without plus one", func(t *testing.T) {
// This test verifies checkpoint sync behavior:
// When syncing from a checkpoint with no existing custody info,
// the earliest available slot should be the checkpoint slot itself
// (not +1), because we download the checkpoint block's data columns.
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)
// Set up head at checkpoint slot (simulates checkpoint sync)
checkpointSlot := fuluForkSlot + 50
setupHead(t, service, checkpointSlot, params.BeaconConfig().ZeroHash)
require.Equal(t, checkpointSlot, service.HeadSlot())
// Verify no existing custody in DB (calling with 0 reads without modifying)
_, storedCount, err := requirements.db.UpdateCustodyInfo(ctx, 0, 0)
require.NoError(t, err)
require.Equal(t, uint64(0), storedCount)
// Enable supernode for checkpoint sync
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)
actualEas, actualCgc, err := service.updateCustodyInfoInDB()
require.NoError(t, err)
// For checkpoint sync, EAS should be the checkpoint slot (not +1)
require.Equal(t, checkpointSlot, actualEas, "Checkpoint sync should use headSlot, not headSlot+1")
require.Equal(t, numberOfCustodyGroups, actualCgc)
})
}

View File

@@ -128,6 +128,7 @@ type NoHeadAccessDatabase interface {
BackfillFinalizedIndex(ctx context.Context, blocks []blocks.ROBlock, finalizedChildRoot [32]byte) error
// Custody operations.
CustodyInfo(ctx context.Context) (primitives.Slot, uint64, error)
UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot primitives.Slot, custodyGroupCount uint64) (primitives.Slot, uint64, error)
UpdateEarliestAvailableSlot(ctx context.Context, earliestAvailableSlot primitives.Slot) error
UpdateSubscribedToAllDataSubnets(ctx context.Context, subscribed bool) (bool, error)

View File

@@ -14,9 +14,38 @@ import (
bolt "go.etcd.io/bbolt"
)
// CustodyInfo returns the stored earliest available slot and custody group count.
func (s *Store) CustodyInfo(ctx context.Context) (primitives.Slot, uint64, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.CustodyInfo")
defer span.End()
var storedGroupCount uint64
var storedEarliestAvailableSlot primitives.Slot
if err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(custodyBucket)
if bucket == nil {
return nil
}
if b := bucket.Get(groupCountKey); len(b) != 0 {
storedGroupCount = bytesutil.BytesToUint64BigEndian(b)
}
if b := bucket.Get(earliestAvailableSlotKey); len(b) != 0 {
storedEarliestAvailableSlot = primitives.Slot(bytesutil.BytesToUint64BigEndian(b))
}
return nil
}); err != nil {
return 0, 0, err
}
return storedEarliestAvailableSlot, storedGroupCount, nil
}
// UpdateCustodyInfo atomically updates the custody group count only if it is greater than the stored one.
// In this case, it also updates the earliest available slot with the provided value.
// It returns the (potentially updated) custody group count and earliest available slot.
// The earliest available slot is only updated if the new value is higher than the stored one,
// preventing loss of availability for data we already have (e.g., when switching from normal to supernode).
// It returns the (potentially updated) earliest available slot and custody group count.
func (s *Store) UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot primitives.Slot, custodyGroupCount uint64) (primitives.Slot, uint64, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.UpdateCustodyInfo")
defer span.End()
@@ -46,20 +75,23 @@ func (s *Store) UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot pri
return nil
}
storedGroupCount, storedEarliestAvailableSlot = custodyGroupCount, earliestAvailableSlot
// Store the earliest available slot.
bytes := bytesutil.Uint64ToBytesBigEndian(uint64(earliestAvailableSlot))
if err := bucket.Put(earliestAvailableSlotKey, bytes); err != nil {
return errors.Wrap(err, "put earliest available slot")
}
// Store the custody group count.
bytes = bytesutil.Uint64ToBytesBigEndian(custodyGroupCount)
// Update the custody group count.
storedGroupCount = custodyGroupCount
bytes := bytesutil.Uint64ToBytesBigEndian(custodyGroupCount)
if err := bucket.Put(groupCountKey, bytes); err != nil {
return errors.Wrap(err, "put custody group count")
}
// Only update earliestAvailableSlot if the new value is higher.
// This prevents losing availability for data we already have.
if earliestAvailableSlot > storedEarliestAvailableSlot {
storedEarliestAvailableSlot = earliestAvailableSlot
bytes = bytesutil.Uint64ToBytesBigEndian(uint64(earliestAvailableSlot))
if err := bucket.Put(earliestAvailableSlotKey, bytes); err != nil {
return errors.Wrap(err, "put earliest available slot")
}
}
return nil
}); err != nil {
return 0, 0, err
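
Illustrative note (not part of the diff above): the invariant UpdateCustodyInfo now enforces can be summarized as a pure function. This is a sketch only, assuming the early return (shown as context above) when the incoming custody group count does not exceed the stored one.

// Sketch only: the monotonic update rule applied inside the bolt transaction.
// The custody group count may only grow, and the earliest available slot may only
// move forward; a lower incoming slot never overwrites a higher stored one.
func applyCustodyUpdate(storedSlot, newSlot, storedCount, newCount uint64) (slot, count uint64) {
	if newCount <= storedCount {
		// No custody increase requested: keep the stored values unchanged.
		return storedSlot, storedCount
	}
	count = newCount
	slot = storedSlot
	if newSlot > storedSlot {
		// Only advance the earliest available slot, never move it backwards.
		slot = newSlot
	}
	return slot, count
}

This mirrors the scenarios in the tests above: a restart that recomputes a lower candidate slot (for example after enabling --semi-supernode) raises the count but leaves the stored earliest available slot where it was.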

View File

@@ -89,7 +89,7 @@ func TestUpdateCustodyInfo(t *testing.T) {
require.Equal(t, groupCount, storedCount)
})
t.Run("update with higher group count", func(t *testing.T) {
t.Run("update with higher group count and higher slot", func(t *testing.T) {
const (
initialSlot = primitives.Slot(100)
initialCount = uint64(5)
@@ -112,6 +112,150 @@ func TestUpdateCustodyInfo(t *testing.T) {
require.Equal(t, groupCount, storedCount)
})
t.Run("update with higher group count and lower slot should preserve higher slot", func(t *testing.T) {
// This is the bug scenario: when switching from normal mode to semi-supernode,
// the incoming slot might be lower than the stored slot, but we should preserve
// the higher stored slot to avoid advertising that we can serve data we don't have.
const (
initialSlot = primitives.Slot(1835523) // Higher stored slot
initialCount = uint64(10)
earliestSlot = primitives.Slot(1835456) // Lower incoming slot (e.g., from head slot)
groupCount = uint64(64) // Increasing custody (e.g., semi-supernode)
)
db := setupDB(t)
_, _, err := db.UpdateCustodyInfo(ctx, initialSlot, initialCount)
require.NoError(t, err)
// When custody count increases but slot is lower, the higher slot should be preserved
slot, count, err := db.UpdateCustodyInfo(ctx, earliestSlot, groupCount)
require.NoError(t, err)
require.Equal(t, initialSlot, slot, "earliestAvailableSlot should not decrease when custody group count increases")
require.Equal(t, groupCount, count)
// Verify in the database
storedSlot, storedCount := getCustodyInfoFromDB(t, db)
require.Equal(t, initialSlot, storedSlot, "stored slot should be the higher value")
require.Equal(t, groupCount, storedCount)
})
t.Run("pre-fulu scenario: checkpoint sync before fork, restart with semi-supernode", func(t *testing.T) {
// This test covers the pre-Fulu bug scenario:
// 1. Node starts with checkpoint sync BEFORE Fulu fork - uses EarliestSlot() (checkpoint block slot)
// 2. Validators connect after Fulu activates - maintainCustodyInfo() updates to head slot (higher)
// 3. Node restarts with --semi-supernode - updateCustodyInfoInDB uses EarliestSlot() again
// The bug was that step 3 would overwrite the higher slot from step 2.
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 100
params.OverrideBeaconConfig(cfg)
fuluForkSlot, err := slots.EpochStart(cfg.FuluForkEpoch)
require.NoError(t, err)
// Derive slot values relative to Fulu fork
checkpointBlockSlot := fuluForkSlot - 10 // Checkpoint sync happened before Fulu
headSlot := fuluForkSlot + 5 // Head slot after Fulu activates
defaultCustody := cfg.CustodyRequirement // Default custody from config
validatorCustody := cfg.CustodyRequirement + 6 // Custody after validators connect
semiSupernodeCustody := cfg.NumberOfCustodyGroups // Semi-supernode custodies all groups
// Verify our test setup: checkpoint is pre-Fulu, head is post-Fulu
require.Equal(t, true, checkpointBlockSlot < fuluForkSlot, "checkpoint must be before Fulu fork")
require.Equal(t, true, headSlot >= fuluForkSlot, "head must be at or after Fulu fork")
db := setupDB(t)
// Step 1: Node starts with checkpoint sync (pre-Fulu)
// updateCustodyInfoInDB sees saved.Slot() < fuluForkSlot, so uses EarliestSlot()
slot, count, err := db.UpdateCustodyInfo(ctx, checkpointBlockSlot, defaultCustody)
require.NoError(t, err)
require.Equal(t, checkpointBlockSlot, slot)
require.Equal(t, defaultCustody, count)
// Step 2: Validators connect after Fulu activates, maintainCustodyInfo() runs
// Uses headSlot which is higher than checkpointBlockSlot
slot, count, err = db.UpdateCustodyInfo(ctx, headSlot, validatorCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot, "should update to head slot")
require.Equal(t, validatorCustody, count)
// Verify step 2 stored correctly
storedSlot, storedCount := getCustodyInfoFromDB(t, db)
require.Equal(t, headSlot, storedSlot)
require.Equal(t, validatorCustody, storedCount)
// Step 3: Restart with --semi-supernode
// updateCustodyInfoInDB sees saved.Slot() < fuluForkSlot, so uses EarliestSlot() again
slot, count, err = db.UpdateCustodyInfo(ctx, checkpointBlockSlot, semiSupernodeCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot, "earliestAvailableSlot should NOT decrease back to checkpoint slot")
require.Equal(t, semiSupernodeCustody, count)
// Verify the database preserved the higher slot
storedSlot, storedCount = getCustodyInfoFromDB(t, db)
require.Equal(t, headSlot, storedSlot, "stored slot should remain at head slot, not checkpoint slot")
require.Equal(t, semiSupernodeCustody, storedCount)
})
t.Run("post-fulu scenario: finalized slot lower than stored head slot", func(t *testing.T) {
// This test covers the post-Fulu bug scenario:
// Post-fork, updateCustodyInfoInDB uses saved.Slot() (finalized slot) directly,
// not EarliestSlot(). But the same bug can occur because:
// - maintainCustodyInfo() stores headSlot (higher)
// - Restart uses finalized slot (lower than head)
// Our fix ensures earliestAvailableSlot never decreases.
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 100
params.OverrideBeaconConfig(cfg)
fuluForkSlot, err := slots.EpochStart(cfg.FuluForkEpoch)
require.NoError(t, err)
// Derive slot values relative to Fulu fork - all slots are AFTER Fulu
finalizedSlotAtStart := fuluForkSlot + 100 // Finalized slot at first start (post-Fulu)
headSlot := fuluForkSlot + 200 // Head slot when validators connect
finalizedSlotRestart := fuluForkSlot + 150 // Finalized slot at restart (< headSlot)
defaultCustody := cfg.CustodyRequirement // Default custody from config
validatorCustody := cfg.CustodyRequirement + 6 // Custody after validators connect
semiSupernodeCustody := cfg.NumberOfCustodyGroups // Semi-supernode custodies all groups
// Verify our test setup: all slots are post-Fulu
require.Equal(t, true, finalizedSlotAtStart >= fuluForkSlot, "finalized slot must be at or after Fulu fork")
require.Equal(t, true, headSlot >= fuluForkSlot, "head slot must be at or after Fulu fork")
require.Equal(t, true, finalizedSlotRestart >= fuluForkSlot, "restart finalized slot must be at or after Fulu fork")
require.Equal(t, true, finalizedSlotRestart < headSlot, "restart finalized slot must be less than head slot")
db := setupDB(t)
// Step 1: Node starts post-Fulu
// updateCustodyInfoInDB sees saved.Slot() >= fuluForkSlot, so uses saved.Slot() directly
slot, count, err := db.UpdateCustodyInfo(ctx, finalizedSlotAtStart, defaultCustody)
require.NoError(t, err)
require.Equal(t, finalizedSlotAtStart, slot)
require.Equal(t, defaultCustody, count)
// Step 2: Validators connect, maintainCustodyInfo() uses head slot
slot, count, err = db.UpdateCustodyInfo(ctx, headSlot, validatorCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot)
require.Equal(t, validatorCustody, count)
// Step 3: Restart with --semi-supernode
// updateCustodyInfoInDB uses finalized slot which is lower than stored head slot
slot, count, err = db.UpdateCustodyInfo(ctx, finalizedSlotRestart, semiSupernodeCustody)
require.NoError(t, err)
require.Equal(t, headSlot, slot, "earliestAvailableSlot should NOT decrease to finalized slot")
require.Equal(t, semiSupernodeCustody, count)
// Verify database preserved the higher slot
storedSlot, storedCount := getCustodyInfoFromDB(t, db)
require.Equal(t, headSlot, storedSlot)
require.Equal(t, semiSupernodeCustody, storedCount)
})
t.Run("update with lower group count should not update", func(t *testing.T) {
const (
initialSlot = primitives.Slot(200)

View File

@@ -0,0 +1,3 @@
### Fixed
- Fixed the earliest available slot going backwards when setting the semi-supernode or supernode flags
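
Illustrative note (not part of the changelog file): the flags mentioned in this entry map to different target custody group counts, mirroring computeTargetCustodyGroupCount from the first diff. The numeric values in the comment are assumptions based on a mainnet-like configuration, not values taken from this change.

// Sketch only: target custody group count per flag combination.
// Assuming custodyRequirement=4, numberOfGroups=128, minToReconstruct=64:
//   default          -> 4
//   --semi-supernode -> 64
//   --supernode      -> 128
func targetCustodyGroupCount(supernode, semiSupernode bool, custodyRequirement, numberOfGroups, minToReconstruct uint64) uint64 {
	if supernode {
		return numberOfGroups // custody every group
	}
	if semiSupernode {
		// Enough groups to reconstruct all data, but never below the base requirement.
		return max(custodyRequirement, minToReconstruct)
	}
	return custodyRequirement
}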