mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 13:28:01 -05:00
Compare commits
6 Commits
hdiff_migr
...
7185c4fb39
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7185c4fb39 | ||
|
|
0db74365e0 | ||
|
|
6f90101364 | ||
|
|
49e1763ec2 | ||
|
|
c2527c82cd | ||
|
|
d4ea8fafd6 |
@@ -1053,40 +1053,3 @@ func TestKZGCommitmentToVersionedHashes(t *testing.T) {
|
||||
require.Equal(t, vhs[0].String(), vh0)
|
||||
require.Equal(t, vhs[1].String(), vh1)
|
||||
}
|
||||
|
||||
func TestComputePayloadAttribute(t *testing.T) {
|
||||
service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache()))
|
||||
ctx := tr.ctx
|
||||
|
||||
st, _ := util.DeterministicGenesisStateBellatrix(t, 1)
|
||||
|
||||
service.cfg.TrackedValidatorsCache.Set(cache.TrackedValidator{Active: true, Index: 0})
|
||||
// Cache hit, advance state, no fee recipient
|
||||
slot := primitives.Slot(1)
|
||||
service.cfg.PayloadIDCache.Set(slot, [32]byte{}, [8]byte{})
|
||||
blk := util.NewBeaconBlockBellatrix()
|
||||
signed, err := consensusblocks.NewSignedBeaconBlock(blk)
|
||||
require.NoError(t, err)
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(signed, [32]byte{'a'})
|
||||
require.NoError(t, err)
|
||||
cfg := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
}
|
||||
fcu := &fcuConfig{
|
||||
headState: st,
|
||||
proposingSlot: slot,
|
||||
headRoot: [32]byte{},
|
||||
}
|
||||
require.NoError(t, service.computePayloadAttributes(cfg, fcu))
|
||||
require.Equal(t, false, fcu.attributes.IsEmpty())
|
||||
require.Equal(t, params.BeaconConfig().EthBurnAddressHex, common.BytesToAddress(fcu.attributes.SuggestedFeeRecipient()).String())
|
||||
|
||||
// Cache hit, advance state, has fee recipient
|
||||
suggestedAddr := common.HexToAddress("123")
|
||||
service.cfg.TrackedValidatorsCache.Set(cache.TrackedValidator{Active: true, FeeRecipient: primitives.ExecutionAddress(suggestedAddr), Index: 0})
|
||||
service.cfg.PayloadIDCache.Set(slot, [32]byte{}, [8]byte{})
|
||||
require.NoError(t, service.computePayloadAttributes(cfg, fcu))
|
||||
require.Equal(t, false, fcu.attributes.IsEmpty())
|
||||
require.Equal(t, suggestedAddr, common.BytesToAddress(fcu.attributes.SuggestedFeeRecipient()))
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
payloadattribute "github.com/OffchainLabs/prysm/v7/consensus-types/payload-attribute"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
|
||||
"github.com/OffchainLabs/prysm/v7/runtime/version"
|
||||
"github.com/OffchainLabs/prysm/v7/time/slots"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -53,58 +54,53 @@ type fcuConfig struct {
|
||||
}
|
||||
|
||||
// sendFCU handles the logic to notify the engine of a forckhoice update
|
||||
// for the first time when processing an incoming block during regular sync. It
|
||||
// always updates the shuffling caches and handles epoch transitions when the
|
||||
// incoming block is late, preparing payload attributes in this case while it
|
||||
// only sends a message with empty attributes for early blocks.
|
||||
func (s *Service) sendFCU(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error {
|
||||
if !s.isNewHead(cfg.headRoot) {
|
||||
return nil
|
||||
// when processing an incoming block during regular sync. It
|
||||
// always updates the shuffling caches and handles epoch transitions .
|
||||
func (s *Service) sendFCU(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) {
|
||||
if cfg.postState.Version() < version.Fulu {
|
||||
// update the caches to compute the right proposer index
|
||||
// this function is called under a forkchoice lock which we need to release.
|
||||
s.ForkChoicer().Unlock()
|
||||
s.updateCachesPostBlockProcessing(cfg)
|
||||
s.ForkChoicer().Lock()
|
||||
}
|
||||
if err := s.getFCUArgs(cfg, fcuArgs); err != nil {
|
||||
log.WithError(err).Error("Could not get forkchoice update argument")
|
||||
return
|
||||
}
|
||||
// If head has not been updated and attributes are nil, we can skip the FCU.
|
||||
if !s.isNewHead(cfg.headRoot) && (fcuArgs.attributes == nil || fcuArgs.attributes.IsEmpty()) {
|
||||
return
|
||||
}
|
||||
// If we are proposing and we aim to reorg the block, we have already sent FCU with attributes on lateBlockTasks
|
||||
if fcuArgs.attributes != nil && !fcuArgs.attributes.IsEmpty() && s.shouldOverrideFCU(cfg.headRoot, s.CurrentSlot()+1) {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
if s.inRegularSync() {
|
||||
go s.forkchoiceUpdateWithExecution(cfg.ctx, fcuArgs)
|
||||
}
|
||||
return s.forkchoiceUpdateWithExecution(cfg.ctx, fcuArgs)
|
||||
}
|
||||
|
||||
// sendFCUWithAttributes computes the payload attributes and sends an FCU message
|
||||
// to the engine if needed
|
||||
func (s *Service) sendFCUWithAttributes(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) {
|
||||
slotCtx, cancel := context.WithTimeout(context.Background(), slotDeadline)
|
||||
defer cancel()
|
||||
cfg.ctx = slotCtx
|
||||
s.cfg.ForkChoiceStore.RLock()
|
||||
defer s.cfg.ForkChoiceStore.RUnlock()
|
||||
if err := s.computePayloadAttributes(cfg, fcuArgs); err != nil {
|
||||
log.WithError(err).Error("Could not compute payload attributes")
|
||||
return
|
||||
}
|
||||
if fcuArgs.attributes.IsEmpty() {
|
||||
return
|
||||
}
|
||||
if _, err := s.notifyForkchoiceUpdate(cfg.ctx, fcuArgs); err != nil {
|
||||
log.WithError(err).Error("Could not update forkchoice with payload attributes for proposal")
|
||||
if s.isNewHead(fcuArgs.headRoot) {
|
||||
if err := s.saveHead(cfg.ctx, fcuArgs.headRoot, fcuArgs.headBlock, fcuArgs.headState); err != nil {
|
||||
log.WithError(err).Error("Could not save head")
|
||||
}
|
||||
s.pruneAttsFromPool(s.ctx, fcuArgs.headState, fcuArgs.headBlock)
|
||||
}
|
||||
}
|
||||
|
||||
// fockchoiceUpdateWithExecution is a wrapper around notifyForkchoiceUpdate. It decides whether a new call to FCU should be made.
|
||||
func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuConfig) error {
|
||||
// fockchoiceUpdateWithExecution is a wrapper around notifyForkchoiceUpdate. It gets a forkchoice lock and calls the engine.
|
||||
// The caller of this function should NOT have a lock in forkchoice store.
|
||||
func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuConfig) {
|
||||
_, span := trace.StartSpan(ctx, "beacon-chain.blockchain.forkchoiceUpdateWithExecution")
|
||||
defer span.End()
|
||||
// Note: Use the service context here to avoid the parent context being ended during a forkchoice update.
|
||||
ctx = trace.NewContext(s.ctx, span)
|
||||
s.ForkChoicer().Lock()
|
||||
defer s.ForkChoicer().Unlock()
|
||||
_, err := s.notifyForkchoiceUpdate(ctx, args)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not notify forkchoice update")
|
||||
log.WithError(err).Error("Could not notify forkchoice update")
|
||||
}
|
||||
|
||||
if err := s.saveHead(ctx, args.headRoot, args.headBlock, args.headState); err != nil {
|
||||
log.WithError(err).Error("Could not save head")
|
||||
}
|
||||
|
||||
// Only need to prune attestations from pool if the head has changed.
|
||||
s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock)
|
||||
return nil
|
||||
}
|
||||
|
||||
// shouldOverrideFCU checks whether the incoming block is still subject to being
|
||||
|
||||
@@ -97,7 +97,7 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
|
||||
headBlock: wsb,
|
||||
proposingSlot: service.CurrentSlot() + 1,
|
||||
}
|
||||
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, args))
|
||||
service.forkchoiceUpdateWithExecution(ctx, args)
|
||||
|
||||
payloadID, has := service.cfg.PayloadIDCache.PayloadID(2, [32]byte{2})
|
||||
require.Equal(t, true, has)
|
||||
@@ -151,7 +151,7 @@ func TestService_forkchoiceUpdateWithExecution_SameHeadRootNewProposer(t *testin
|
||||
headRoot: r,
|
||||
proposingSlot: service.CurrentSlot() + 1,
|
||||
}
|
||||
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, args))
|
||||
service.forkchoiceUpdateWithExecution(ctx, args)
|
||||
}
|
||||
|
||||
func TestShouldOverrideFCU(t *testing.T) {
|
||||
|
||||
@@ -66,9 +66,6 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {
|
||||
startTime := time.Now()
|
||||
fcuArgs := &fcuConfig{}
|
||||
|
||||
if s.inRegularSync() {
|
||||
defer s.handleSecondFCUCall(cfg, fcuArgs)
|
||||
}
|
||||
if features.Get().EnableLightClient && slots.ToEpoch(s.CurrentSlot()) >= params.BeaconConfig().AltairForkEpoch {
|
||||
defer s.processLightClientUpdates(cfg)
|
||||
}
|
||||
@@ -105,14 +102,17 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {
|
||||
s.logNonCanonicalBlockReceived(cfg.roblock.Root(), cfg.headRoot)
|
||||
return nil
|
||||
}
|
||||
if err := s.getFCUArgs(cfg, fcuArgs); err != nil {
|
||||
log.WithError(err).Error("Could not get forkchoice update argument")
|
||||
return nil
|
||||
}
|
||||
if err := s.sendFCU(cfg, fcuArgs); err != nil {
|
||||
return errors.Wrap(err, "could not send FCU to engine")
|
||||
}
|
||||
s.sendFCU(cfg, fcuArgs)
|
||||
|
||||
// Pre-Fulu the caches are updated when computing the payload attributes
|
||||
if cfg.postState.Version() >= version.Fulu {
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(s.ctx, slotDeadline)
|
||||
defer cancel()
|
||||
cfg.ctx = ctx
|
||||
s.updateCachesPostBlockProcessing(cfg)
|
||||
}()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -322,6 +322,7 @@ func (s *Service) areSidecarsAvailable(ctx context.Context, avs das.Availability
|
||||
return nil
|
||||
}
|
||||
|
||||
// the caller of this function must not hold a lock in forkchoice store.
|
||||
func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.BeaconState) error {
|
||||
e := coreTime.CurrentEpoch(st)
|
||||
if err := helpers.UpdateCommitteeCache(ctx, st, e); err != nil {
|
||||
@@ -351,7 +352,9 @@ func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.Beacon
|
||||
if e > 0 {
|
||||
e = e - 1
|
||||
}
|
||||
s.ForkChoicer().RLock()
|
||||
target, err := s.cfg.ForkChoiceStore.TargetRootForEpoch(r, e)
|
||||
s.ForkChoicer().RUnlock()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not update proposer index state-root map")
|
||||
return nil
|
||||
@@ -364,7 +367,7 @@ func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.Beacon
|
||||
}
|
||||
|
||||
// Epoch boundary tasks: it copies the headState and updates the epoch boundary
|
||||
// caches.
|
||||
// caches. The caller of this function must not hold a lock in forkchoice store.
|
||||
func (s *Service) handleEpochBoundary(ctx context.Context, slot primitives.Slot, headState state.BeaconState, blockRoot []byte) error {
|
||||
ctx, span := trace.StartSpan(ctx, "blockChain.handleEpochBoundary")
|
||||
defer span.End()
|
||||
@@ -904,8 +907,6 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
|
||||
if currentSlot == s.HeadSlot() {
|
||||
return
|
||||
}
|
||||
s.cfg.ForkChoiceStore.RLock()
|
||||
defer s.cfg.ForkChoiceStore.RUnlock()
|
||||
// return early if we are in init sync
|
||||
if !s.inRegularSync() {
|
||||
return
|
||||
@@ -918,14 +919,32 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
|
||||
if lastState == nil {
|
||||
lastRoot, lastState = headRoot[:], headState
|
||||
}
|
||||
// Copy all the field tries in our cached state in the event of late
|
||||
// blocks.
|
||||
lastState.CopyAllTries()
|
||||
if err := transition.UpdateNextSlotCache(ctx, lastRoot, lastState); err != nil {
|
||||
log.WithError(err).Debug("Could not update next slot state cache")
|
||||
}
|
||||
if err := s.handleEpochBoundary(ctx, currentSlot, headState, headRoot[:]); err != nil {
|
||||
log.WithError(err).Error("Could not update epoch boundary caches")
|
||||
// Before Fulu we need to process the next slot to find out if we are proposing.
|
||||
if lastState.Version() < version.Fulu {
|
||||
// Copy all the field tries in our cached state in the event of late
|
||||
// blocks.
|
||||
lastState.CopyAllTries()
|
||||
if err := transition.UpdateNextSlotCache(ctx, lastRoot, lastState); err != nil {
|
||||
log.WithError(err).Debug("Could not update next slot state cache")
|
||||
}
|
||||
if err := s.handleEpochBoundary(ctx, currentSlot, headState, headRoot[:]); err != nil {
|
||||
log.WithError(err).Error("Could not update epoch boundary caches")
|
||||
}
|
||||
} else {
|
||||
// After Fulu, we can update the caches asynchronously after sending FCU to the engine
|
||||
defer func() {
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(s.ctx, slotDeadline)
|
||||
defer cancel()
|
||||
lastState.CopyAllTries()
|
||||
if err := transition.UpdateNextSlotCache(ctx, lastRoot, lastState); err != nil {
|
||||
log.WithError(err).Debug("Could not update next slot state cache")
|
||||
}
|
||||
if err := s.handleEpochBoundary(ctx, currentSlot, headState, headRoot[:]); err != nil {
|
||||
log.WithError(err).Error("Could not update epoch boundary caches")
|
||||
}
|
||||
}()
|
||||
}()
|
||||
}
|
||||
// return early if we already started building a block for the current
|
||||
// head root
|
||||
@@ -955,6 +974,8 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
|
||||
headBlock: headBlock,
|
||||
attributes: attribute,
|
||||
}
|
||||
s.cfg.ForkChoiceStore.Lock()
|
||||
defer s.cfg.ForkChoiceStore.Unlock()
|
||||
_, err = s.notifyForkchoiceUpdate(ctx, fcuArgs)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("could not perform late block tasks: failed to update forkchoice with engine")
|
||||
|
||||
@@ -42,14 +42,8 @@ func (s *Service) getFCUArgs(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) er
|
||||
if err := s.getFCUArgsEarlyBlock(cfg, fcuArgs); err != nil {
|
||||
return err
|
||||
}
|
||||
if !s.inRegularSync() {
|
||||
return nil
|
||||
}
|
||||
slot := cfg.roblock.Block().Slot()
|
||||
if slots.WithinVotingWindow(s.genesisTime, slot) {
|
||||
return nil
|
||||
}
|
||||
return s.computePayloadAttributes(cfg, fcuArgs)
|
||||
fcuArgs.attributes = s.getPayloadAttribute(cfg.ctx, fcuArgs.headState, fcuArgs.proposingSlot, cfg.headRoot[:])
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) getFCUArgsEarlyBlock(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error {
|
||||
@@ -173,26 +167,19 @@ func (s *Service) processLightClientUpdates(cfg *postBlockProcessConfig) {
|
||||
|
||||
// updateCachesPostBlockProcessing updates the next slot cache and handles the epoch
|
||||
// boundary in order to compute the right proposer indices after processing
|
||||
// state transition. This function is called on late blocks while still locked,
|
||||
// before sending FCU to the engine.
|
||||
func (s *Service) updateCachesPostBlockProcessing(cfg *postBlockProcessConfig) error {
|
||||
// state transition. The caller of this function must not hold a lock in forkchoice store.
|
||||
func (s *Service) updateCachesPostBlockProcessing(cfg *postBlockProcessConfig) {
|
||||
slot := cfg.postState.Slot()
|
||||
root := cfg.roblock.Root()
|
||||
if err := transition.UpdateNextSlotCache(cfg.ctx, root[:], cfg.postState); err != nil {
|
||||
return errors.Wrap(err, "could not update next slot state cache")
|
||||
log.WithError(err).Error("Could not update next slot state cache")
|
||||
return
|
||||
}
|
||||
if !slots.IsEpochEnd(slot) {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
return s.handleEpochBoundary(cfg.ctx, slot, cfg.postState, root[:])
|
||||
}
|
||||
|
||||
// handleSecondFCUCall handles a second call to FCU when syncing a new block.
|
||||
// This is useful when proposing in the next block and we want to defer the
|
||||
// computation of the next slot shuffling.
|
||||
func (s *Service) handleSecondFCUCall(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) {
|
||||
if (fcuArgs.attributes == nil || fcuArgs.attributes.IsEmpty()) && cfg.headRoot == cfg.roblock.Root() {
|
||||
go s.sendFCUWithAttributes(cfg, fcuArgs)
|
||||
if err := s.handleEpochBoundary(cfg.ctx, slot, cfg.postState, root[:]); err != nil {
|
||||
log.WithError(err).Error("Could not handle epoch boundary")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -202,20 +189,6 @@ func reportProcessingTime(startTime time.Time) {
|
||||
onBlockProcessingTime.Observe(float64(time.Since(startTime).Milliseconds()))
|
||||
}
|
||||
|
||||
// computePayloadAttributes modifies the passed FCU arguments to
|
||||
// contain the right payload attributes with the tracked proposer. It gets
|
||||
// called on blocks that arrive after the attestation voting window, or in a
|
||||
// background routine after syncing early blocks.
|
||||
func (s *Service) computePayloadAttributes(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error {
|
||||
if cfg.roblock.Root() == cfg.headRoot {
|
||||
if err := s.updateCachesPostBlockProcessing(cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
fcuArgs.attributes = s.getPayloadAttribute(cfg.ctx, fcuArgs.headState, fcuArgs.proposingSlot, cfg.headRoot[:])
|
||||
return nil
|
||||
}
|
||||
|
||||
// getBlockPreState returns the pre state of an incoming block. It uses the parent root of the block
|
||||
// to retrieve the state in DB. It verifies the pre state's validity and the incoming block
|
||||
// is in the correct time window.
|
||||
|
||||
@@ -738,7 +738,9 @@ func TestOnBlock_CanFinalize_WithOnTick(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, r, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, r)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
require.NoError(t, service.updateJustificationOnBlock(ctx, preState, postState, currStoreJustifiedEpoch))
|
||||
_, err = service.updateFinalizationOnBlock(ctx, preState, postState, currStoreFinalizedEpoch)
|
||||
require.NoError(t, err)
|
||||
@@ -788,7 +790,9 @@ func TestOnBlock_CanFinalize(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, r, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, r)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
require.NoError(t, service.updateJustificationOnBlock(ctx, preState, postState, currStoreJustifiedEpoch))
|
||||
_, err = service.updateFinalizationOnBlock(ctx, preState, postState, currStoreFinalizedEpoch)
|
||||
require.NoError(t, err)
|
||||
@@ -816,7 +820,9 @@ func TestOnBlock_NilBlock(t *testing.T) {
|
||||
service, tr := minimalTestService(t)
|
||||
signed := &consensusblocks.SignedBeaconBlock{}
|
||||
roblock := consensusblocks.ROBlock{ReadOnlySignedBeaconBlock: signed}
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
err := service.postBlockProcess(&postBlockProcessConfig{tr.ctx, roblock, [32]byte{}, nil, true})
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
require.Equal(t, true, IsInvalidBlock(err))
|
||||
}
|
||||
|
||||
@@ -848,7 +854,9 @@ func TestOnBlock_CallNewPayloadAndForkchoiceUpdated(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, r, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, r)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
testState, err = service.cfg.StateGen.StateByRoot(ctx, r)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
@@ -1321,7 +1329,9 @@ func TestOnBlock_ProcessBlocksParallel(t *testing.T) {
|
||||
lock.Lock()
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb1, r1)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
lock.Unlock()
|
||||
wg.Done()
|
||||
}()
|
||||
@@ -1333,7 +1343,9 @@ func TestOnBlock_ProcessBlocksParallel(t *testing.T) {
|
||||
lock.Lock()
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb2, r2)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
lock.Unlock()
|
||||
wg.Done()
|
||||
}()
|
||||
@@ -1345,7 +1357,9 @@ func TestOnBlock_ProcessBlocksParallel(t *testing.T) {
|
||||
lock.Lock()
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb3, r3)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
lock.Unlock()
|
||||
wg.Done()
|
||||
}()
|
||||
@@ -1357,7 +1371,9 @@ func TestOnBlock_ProcessBlocksParallel(t *testing.T) {
|
||||
lock.Lock()
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb4, r4)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
lock.Unlock()
|
||||
wg.Done()
|
||||
}()
|
||||
@@ -1382,197 +1398,6 @@ func Test_verifyBlkFinalizedSlot_invalidBlock(t *testing.T) {
|
||||
require.Equal(t, true, IsInvalidBlock(err))
|
||||
}
|
||||
|
||||
// See the description in #10777 and #10782 for the full setup
|
||||
// We sync optimistically a chain of blocks. Block 17 is the last block in Epoch
|
||||
// 2. Block 18 justifies block 12 (the first in Epoch 2) and Block 19 returns
|
||||
// INVALID from FCU, with LVH block 17. No head is viable. We check
|
||||
// that the node is optimistic and that we can actually import a block on top of
|
||||
// 17 and recover.
|
||||
func TestStore_NoViableHead_FCU(t *testing.T) {
|
||||
params.SetupTestConfigCleanup(t)
|
||||
config := params.BeaconConfig()
|
||||
config.SlotsPerEpoch = 6
|
||||
config.AltairForkEpoch = 1
|
||||
config.BellatrixForkEpoch = 2
|
||||
params.OverrideBeaconConfig(config)
|
||||
|
||||
mockEngine := &mockExecution.EngineClient{ErrNewPayload: execution.ErrAcceptedSyncingPayloadStatus, ErrForkchoiceUpdated: execution.ErrAcceptedSyncingPayloadStatus}
|
||||
service, tr := minimalTestService(t, WithExecutionEngineCaller(mockEngine))
|
||||
ctx := tr.ctx
|
||||
|
||||
st, keys := util.DeterministicGenesisState(t, 64)
|
||||
stateRoot, err := st.HashTreeRoot(ctx)
|
||||
require.NoError(t, err, "Could not hash genesis state")
|
||||
|
||||
require.NoError(t, service.saveGenesisData(ctx, st))
|
||||
|
||||
genesis := blocks.NewGenesisBlock(stateRoot[:])
|
||||
wsb, err := consensusblocks.NewSignedBeaconBlock(genesis)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wsb), "Could not save genesis block")
|
||||
|
||||
parentRoot, err := genesis.Block.HashTreeRoot()
|
||||
require.NoError(t, err, "Could not get signing root")
|
||||
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, st, parentRoot), "Could not save genesis state")
|
||||
require.NoError(t, service.cfg.BeaconDB.SaveHeadBlockRoot(ctx, parentRoot), "Could not save genesis state")
|
||||
|
||||
for i := 1; i < 6; i++ {
|
||||
driftGenesisTime(service, primitives.Slot(i), 0)
|
||||
st, err := service.HeadState(ctx)
|
||||
require.NoError(t, err)
|
||||
b, err := util.GenerateFullBlock(st, keys, util.DefaultBlockGenConfig(), primitives.Slot(i))
|
||||
require.NoError(t, err)
|
||||
wsb, err := consensusblocks.NewSignedBeaconBlock(b)
|
||||
require.NoError(t, err)
|
||||
root, err := b.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
|
||||
preState, err := service.getBlockPreState(ctx, wsb.Block())
|
||||
require.NoError(t, err)
|
||||
postState, err := service.validateStateTransition(ctx, preState, wsb)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
}
|
||||
|
||||
for i := 6; i < 12; i++ {
|
||||
driftGenesisTime(service, primitives.Slot(i), 0)
|
||||
st, err := service.HeadState(ctx)
|
||||
require.NoError(t, err)
|
||||
b, err := util.GenerateFullBlockAltair(st, keys, util.DefaultBlockGenConfig(), primitives.Slot(i))
|
||||
require.NoError(t, err)
|
||||
wsb, err := consensusblocks.NewSignedBeaconBlock(b)
|
||||
require.NoError(t, err)
|
||||
root, err := b.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
preState, err := service.getBlockPreState(ctx, wsb.Block())
|
||||
require.NoError(t, err)
|
||||
postState, err := service.validateStateTransition(ctx, preState, wsb)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
for i := 12; i < 18; i++ {
|
||||
driftGenesisTime(service, primitives.Slot(i), 0)
|
||||
st, err := service.HeadState(ctx)
|
||||
require.NoError(t, err)
|
||||
b, err := util.GenerateFullBlockBellatrix(st, keys, util.DefaultBlockGenConfig(), primitives.Slot(i))
|
||||
require.NoError(t, err)
|
||||
wsb, err := consensusblocks.NewSignedBeaconBlock(b)
|
||||
require.NoError(t, err)
|
||||
root, err := b.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
preState, err := service.getBlockPreState(ctx, wsb.Block())
|
||||
require.NoError(t, err)
|
||||
postState, err := service.validateStateTransition(ctx, preState, wsb)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
// Check that we haven't justified the second epoch yet
|
||||
jc := service.cfg.ForkChoiceStore.JustifiedCheckpoint()
|
||||
require.Equal(t, primitives.Epoch(0), jc.Epoch)
|
||||
|
||||
// import a block that justifies the second epoch
|
||||
driftGenesisTime(service, 18, 0)
|
||||
validHeadState, err := service.HeadState(ctx)
|
||||
require.NoError(t, err)
|
||||
b, err := util.GenerateFullBlockBellatrix(validHeadState, keys, util.DefaultBlockGenConfig(), 18)
|
||||
require.NoError(t, err)
|
||||
wsb, err = consensusblocks.NewSignedBeaconBlock(b)
|
||||
require.NoError(t, err)
|
||||
firstInvalidRoot, err := b.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
preState, err := service.getBlockPreState(ctx, wsb.Block())
|
||||
require.NoError(t, err)
|
||||
postState, err := service.validateStateTransition(ctx, preState, wsb)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, service.savePostStateInfo(ctx, firstInvalidRoot, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, firstInvalidRoot)
|
||||
require.NoError(t, err)
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
|
||||
require.NoError(t, err)
|
||||
jc = service.cfg.ForkChoiceStore.JustifiedCheckpoint()
|
||||
require.Equal(t, primitives.Epoch(2), jc.Epoch)
|
||||
|
||||
sjc := validHeadState.CurrentJustifiedCheckpoint()
|
||||
require.Equal(t, primitives.Epoch(0), sjc.Epoch)
|
||||
lvh := b.Block.Body.ExecutionPayload.ParentHash
|
||||
// check our head
|
||||
require.Equal(t, firstInvalidRoot, service.cfg.ForkChoiceStore.CachedHeadRoot())
|
||||
|
||||
// import another block to find out that it was invalid
|
||||
mockEngine = &mockExecution.EngineClient{ErrNewPayload: execution.ErrAcceptedSyncingPayloadStatus, ErrForkchoiceUpdated: execution.ErrInvalidPayloadStatus, ForkChoiceUpdatedResp: lvh}
|
||||
service.cfg.ExecutionEngineCaller = mockEngine
|
||||
driftGenesisTime(service, 19, 0)
|
||||
st, err = service.HeadState(ctx)
|
||||
require.NoError(t, err)
|
||||
b, err = util.GenerateFullBlockBellatrix(st, keys, util.DefaultBlockGenConfig(), 19)
|
||||
require.NoError(t, err)
|
||||
wsb, err = consensusblocks.NewSignedBeaconBlock(b)
|
||||
require.NoError(t, err)
|
||||
root, err := b.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
preState, err = service.getBlockPreState(ctx, wsb.Block())
|
||||
require.NoError(t, err)
|
||||
postState, err = service.validateStateTransition(ctx, preState, wsb)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err = consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
|
||||
require.ErrorContains(t, "received an INVALID payload from execution engine", err)
|
||||
// Check that forkchoice's head is the last invalid block imported. The
|
||||
// store's headroot is the previous head (since the invalid block did
|
||||
// not finish importing) one and that the node is optimistic
|
||||
require.Equal(t, root, service.cfg.ForkChoiceStore.CachedHeadRoot())
|
||||
headRoot, err := service.HeadRoot(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, firstInvalidRoot, bytesutil.ToBytes32(headRoot))
|
||||
optimistic, err := service.IsOptimistic(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, true, optimistic)
|
||||
|
||||
// import another block based on the last valid head state
|
||||
mockEngine = &mockExecution.EngineClient{}
|
||||
service.cfg.ExecutionEngineCaller = mockEngine
|
||||
driftGenesisTime(service, 20, 0)
|
||||
b, err = util.GenerateFullBlockBellatrix(validHeadState, keys, &util.BlockGenConfig{}, 20)
|
||||
require.NoError(t, err)
|
||||
wsb, err = consensusblocks.NewSignedBeaconBlock(b)
|
||||
require.NoError(t, err)
|
||||
root, err = b.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
|
||||
preState, err = service.getBlockPreState(ctx, wsb.Block())
|
||||
require.NoError(t, err)
|
||||
postState, err = service.validateStateTransition(ctx, preState, wsb)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err = consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true})
|
||||
require.NoError(t, err)
|
||||
// Check the newly imported block is head, it justified the right
|
||||
// checkpoint and the node is no longer optimistic
|
||||
require.Equal(t, root, service.cfg.ForkChoiceStore.CachedHeadRoot())
|
||||
sjc = service.CurrentJustifiedCheckpt()
|
||||
require.Equal(t, jc.Epoch, sjc.Epoch)
|
||||
require.Equal(t, jc.Root, bytesutil.ToBytes32(sjc.Root))
|
||||
optimistic, err = service.IsOptimistic(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, false, optimistic)
|
||||
}
|
||||
|
||||
// See the description in #10777 and #10782 for the full setup
|
||||
// We sync optimistically a chain of blocks. Block 17 is the last block in Epoch
|
||||
// 2. Block 18 justifies block 12 (the first in Epoch 2) and Block 19 returns
|
||||
@@ -1624,7 +1449,9 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
}
|
||||
|
||||
for i := 6; i < 12; i++ {
|
||||
@@ -1644,8 +1471,9 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
}
|
||||
|
||||
for i := 12; i < 18; i++ {
|
||||
@@ -1666,8 +1494,9 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
}
|
||||
// Check that we haven't justified the second epoch yet
|
||||
jc := service.cfg.ForkChoiceStore.JustifiedCheckpoint()
|
||||
@@ -1690,7 +1519,9 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, firstInvalidRoot, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, firstInvalidRoot)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
require.NoError(t, err)
|
||||
jc = service.cfg.ForkChoiceStore.JustifiedCheckpoint()
|
||||
require.Equal(t, primitives.Epoch(2), jc.Epoch)
|
||||
@@ -1700,6 +1531,10 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
|
||||
lvh := b.Block.Body.ExecutionPayload.ParentHash
|
||||
// check our head
|
||||
require.Equal(t, firstInvalidRoot, service.cfg.ForkChoiceStore.CachedHeadRoot())
|
||||
isBlock18OptimisticAfterImport, err := service.IsOptimisticForRoot(ctx, firstInvalidRoot)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, true, isBlock18OptimisticAfterImport)
|
||||
time.Sleep(20 * time.Millisecond) // wait for async forkchoice update to be processed
|
||||
|
||||
// import another block to find out that it was invalid
|
||||
mockEngine = &mockExecution.EngineClient{ErrNewPayload: execution.ErrInvalidPayloadStatus, NewPayloadResp: lvh}
|
||||
@@ -1750,7 +1585,9 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err = consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true})
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
require.NoError(t, err)
|
||||
// Check the newly imported block is head, it justified the right
|
||||
// checkpoint and the node is no longer optimistic
|
||||
@@ -1817,7 +1654,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
}
|
||||
|
||||
for i := 6; i < 12; i++ {
|
||||
@@ -1838,8 +1677,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
}
|
||||
|
||||
// import the merge block
|
||||
@@ -1859,7 +1699,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, lastValidRoot, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, lastValidRoot)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
require.NoError(t, err)
|
||||
// save the post state and the payload Hash of this block since it will
|
||||
// be the LVH
|
||||
@@ -1888,8 +1730,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, invalidRoots[i-13], wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, invalidRoots[i-13])
|
||||
require.NoError(t, err)
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
}
|
||||
// Check that we have justified the second epoch
|
||||
jc := service.cfg.ForkChoiceStore.JustifiedCheckpoint()
|
||||
@@ -1957,7 +1800,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err = consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
// Check that the head is still INVALID and the node is still optimistic
|
||||
require.Equal(t, invalidHeadRoot, service.cfg.ForkChoiceStore.CachedHeadRoot())
|
||||
optimistic, err = service.IsOptimistic(ctx)
|
||||
@@ -1982,7 +1827,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true})
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
require.NoError(t, err)
|
||||
st, err = service.cfg.StateGen.StateByRoot(ctx, root)
|
||||
require.NoError(t, err)
|
||||
@@ -2010,7 +1857,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err = consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true})
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, root, service.cfg.ForkChoiceStore.CachedHeadRoot())
|
||||
sjc = service.CurrentJustifiedCheckpt()
|
||||
@@ -2054,7 +1903,6 @@ func TestNoViableHead_Reboot(t *testing.T) {
|
||||
require.NoError(t, service.cfg.BeaconDB.SaveGenesisBlockRoot(ctx, genesisRoot), "Could not save genesis state")
|
||||
|
||||
for i := 1; i < 6; i++ {
|
||||
t.Log(i)
|
||||
driftGenesisTime(service, primitives.Slot(i), 0)
|
||||
st, err := service.HeadState(ctx)
|
||||
require.NoError(t, err)
|
||||
@@ -2071,7 +1919,9 @@ func TestNoViableHead_Reboot(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
}
|
||||
|
||||
for i := 6; i < 12; i++ {
|
||||
@@ -2091,8 +1941,9 @@ func TestNoViableHead_Reboot(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
}
|
||||
|
||||
// import the merge block
|
||||
@@ -2112,7 +1963,9 @@ func TestNoViableHead_Reboot(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, lastValidRoot, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, lastValidRoot)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
require.NoError(t, err)
|
||||
// save the post state and the payload Hash of this block since it will
|
||||
// be the LVH
|
||||
@@ -2143,7 +1996,9 @@ func TestNoViableHead_Reboot(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
require.NoError(t, service.updateJustificationOnBlock(ctx, preState, postState, currStoreJustifiedEpoch))
|
||||
_, err = service.updateFinalizationOnBlock(ctx, preState, postState, currStoreFinalizedEpoch)
|
||||
require.NoError(t, err)
|
||||
@@ -2264,7 +2119,9 @@ func TestOnBlock_HandleBlockAttestations(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
|
||||
st, err = service.HeadState(ctx)
|
||||
require.NoError(t, err)
|
||||
@@ -2330,7 +2187,9 @@ func TestOnBlock_HandleBlockAttestations(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
|
||||
st, err = service.HeadState(ctx)
|
||||
require.NoError(t, err)
|
||||
@@ -2613,7 +2472,10 @@ func TestRollbackBlock(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// Rollback block insertion into db and caches.
|
||||
require.ErrorContains(t, fmt.Sprintf("could not insert block %d to fork choice store", roblock.Block().Slot()), service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
require.ErrorContains(t, fmt.Sprintf("could not insert block %d to fork choice store", roblock.Block().Slot()), err)
|
||||
|
||||
// The block should no longer exist.
|
||||
require.Equal(t, false, service.cfg.BeaconDB.HasBlock(ctx, root))
|
||||
@@ -2714,7 +2576,9 @@ func TestRollbackBlock_ContextDeadline(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
|
||||
b, err = util.GenerateFullBlock(postState, keys, util.DefaultBlockGenConfig(), 34)
|
||||
require.NoError(t, err)
|
||||
@@ -2748,7 +2612,10 @@ func TestRollbackBlock_ContextDeadline(t *testing.T) {
|
||||
require.NoError(t, postState.SetFinalizedCheckpoint(cj))
|
||||
|
||||
// Rollback block insertion into db and caches.
|
||||
require.ErrorContains(t, "context canceled", service.postBlockProcess(&postBlockProcessConfig{cancCtx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
err = service.postBlockProcess(&postBlockProcessConfig{cancCtx, roblock, [32]byte{}, postState, false})
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
require.ErrorContains(t, "context canceled", err)
|
||||
|
||||
// The block should no longer exist.
|
||||
require.Equal(t, false, service.cfg.BeaconDB.HasBlock(ctx, root))
|
||||
@@ -3244,7 +3111,9 @@ func Test_postBlockProcess_EventSending(t *testing.T) {
|
||||
}
|
||||
|
||||
// Execute postBlockProcess
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
err = service.postBlockProcess(cfg)
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
|
||||
// Check error expectation
|
||||
if tt.expectError {
|
||||
|
||||
@@ -156,13 +156,15 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot)
|
||||
}
|
||||
if s.inRegularSync() {
|
||||
fcuArgs.attributes = s.getPayloadAttribute(ctx, headState, proposingSlot, newHeadRoot[:])
|
||||
if fcuArgs.attributes != nil && s.shouldOverrideFCU(newHeadRoot, proposingSlot) {
|
||||
return
|
||||
}
|
||||
go s.forkchoiceUpdateWithExecution(s.ctx, fcuArgs)
|
||||
}
|
||||
if fcuArgs.attributes != nil && s.shouldOverrideFCU(newHeadRoot, proposingSlot) {
|
||||
return
|
||||
}
|
||||
if err := s.forkchoiceUpdateWithExecution(s.ctx, fcuArgs); err != nil {
|
||||
log.WithError(err).Error("Could not update forkchoice")
|
||||
if err := s.saveHead(s.ctx, fcuArgs.headRoot, fcuArgs.headBlock, fcuArgs.headState); err != nil {
|
||||
log.WithError(err).Error("Could not save head")
|
||||
}
|
||||
s.pruneAttsFromPool(s.ctx, fcuArgs.headState, fcuArgs.headBlock)
|
||||
}
|
||||
|
||||
// This processes fork choice attestations from the pool to account for validator votes and fork choice.
|
||||
|
||||
@@ -117,7 +117,9 @@ func TestService_ProcessAttestationsAndUpdateHead(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, tRoot, wsb, postState))
|
||||
roblock, err := blocks.NewROBlockWithRoot(wsb, tRoot)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
copied, err = service.cfg.StateGen.StateByRoot(ctx, tRoot)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, fcs.NodeCount())
|
||||
@@ -177,7 +179,9 @@ func TestService_UpdateHead_NoAtts(t *testing.T) {
|
||||
require.NoError(t, service.savePostStateInfo(ctx, tRoot, wsb, postState))
|
||||
roblock, err := blocks.NewROBlockWithRoot(wsb, tRoot)
|
||||
require.NoError(t, err)
|
||||
service.cfg.ForkChoiceStore.Lock()
|
||||
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
|
||||
service.cfg.ForkChoiceStore.Unlock()
|
||||
require.Equal(t, 2, fcs.NodeCount())
|
||||
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wsb))
|
||||
require.Equal(t, tRoot, service.head.root)
|
||||
|
||||
@@ -142,6 +142,18 @@ func ProcessSlot(ctx context.Context, state state.BeaconState) (state.BeaconStat
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Spec v1.6.1 (pseudocode):
|
||||
// # [New in Gloas:EIP7732]
|
||||
// # Unset the next payload availability
|
||||
// state.execution_payload_availability[(state.slot + 1) % SLOTS_PER_HISTORICAL_ROOT] = 0b0
|
||||
if state.Version() >= version.Gloas {
|
||||
index := uint64((state.Slot() + 1) % params.BeaconConfig().SlotsPerHistoricalRoot)
|
||||
if err := state.UpdateExecutionPayloadAvailabilityAtIndex(index, 0x0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return state, nil
|
||||
}
|
||||
|
||||
|
||||
100
beacon-chain/core/transition/transition_gloas_test.go
Normal file
100
beacon-chain/core/transition/transition_gloas_test.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package transition
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
|
||||
"github.com/OffchainLabs/prysm/v7/config/params"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/runtime/version"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type trackingState struct {
|
||||
state.BeaconState
|
||||
slot primitives.Slot
|
||||
version int
|
||||
header *ethpb.BeaconBlockHeader
|
||||
epaCalls int
|
||||
epaIndex uint64
|
||||
epaValue byte
|
||||
epaErr error
|
||||
}
|
||||
|
||||
func (s *trackingState) Slot() primitives.Slot {
|
||||
return s.slot
|
||||
}
|
||||
|
||||
func (s *trackingState) Version() int {
|
||||
return s.version
|
||||
}
|
||||
|
||||
func (s *trackingState) HashTreeRoot(_ context.Context) ([32]byte, error) {
|
||||
return [32]byte{0x01}, nil
|
||||
}
|
||||
|
||||
func (s *trackingState) UpdateStateRootAtIndex(_ uint64, _ [32]byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *trackingState) LatestBlockHeader() *ethpb.BeaconBlockHeader {
|
||||
return s.header
|
||||
}
|
||||
|
||||
func (s *trackingState) SetLatestBlockHeader(header *ethpb.BeaconBlockHeader) error {
|
||||
s.header = header
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *trackingState) UpdateBlockRootAtIndex(_ uint64, _ [32]byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *trackingState) UpdateExecutionPayloadAvailabilityAtIndex(idx uint64, val byte) error {
|
||||
s.epaCalls++
|
||||
s.epaIndex = idx
|
||||
s.epaValue = val
|
||||
if s.epaErr != nil {
|
||||
return s.epaErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestProcessSlot_GloasClearsNextPayloadAvailability(t *testing.T) {
|
||||
st := &trackingState{
|
||||
slot: 10,
|
||||
version: version.Gloas,
|
||||
header: testBeaconBlockHeader(),
|
||||
}
|
||||
|
||||
_, err := ProcessSlot(context.Background(), st)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, st.epaCalls)
|
||||
require.Equal(t, uint64((st.slot+1)%params.BeaconConfig().SlotsPerHistoricalRoot), st.epaIndex)
|
||||
require.Equal(t, byte(0x0), st.epaValue)
|
||||
}
|
||||
|
||||
func TestProcessSlot_GloasAvailabilityUpdateError(t *testing.T) {
|
||||
updateErr := errors.New("update failed")
|
||||
st := &trackingState{
|
||||
slot: 7,
|
||||
version: version.Gloas,
|
||||
header: testBeaconBlockHeader(),
|
||||
epaErr: updateErr,
|
||||
}
|
||||
|
||||
_, err := ProcessSlot(context.Background(), st)
|
||||
require.ErrorIs(t, err, updateErr)
|
||||
require.Equal(t, 1, st.epaCalls)
|
||||
}
|
||||
|
||||
func testBeaconBlockHeader() *ethpb.BeaconBlockHeader {
|
||||
return ðpb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
}
|
||||
}
|
||||
@@ -38,6 +38,7 @@ go_library(
|
||||
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@com_github_spf13_afero//:go_default_library",
|
||||
"@org_golang_x_sync//errgroup:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v7/time/slots"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/spf13/afero"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -185,73 +186,162 @@ func (dcs *DataColumnStorage) WarmCache() {
|
||||
|
||||
highestStoredEpoch := primitives.Epoch(0)
|
||||
|
||||
// Walk the data column filesystem to warm up the cache.
|
||||
if err := afero.Walk(dcs.fs, ".", func(path string, info os.FileInfo, fileErr error) (err error) {
|
||||
if fileErr != nil {
|
||||
return fileErr
|
||||
}
|
||||
|
||||
// If not a leaf, skip.
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Extract metadata from the file path.
|
||||
fileMetadata, err := extractFileMetadata(path)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error encountered while extracting file metadata")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Open the data column filesystem file.
|
||||
f, err := dcs.fs.Open(path)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error encountered while opening data column filesystem file")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close the file.
|
||||
defer func() {
|
||||
// Overwrite the existing error only if it is nil, since the close error is less important.
|
||||
closeErr := f.Close()
|
||||
if closeErr != nil && err == nil {
|
||||
err = closeErr
|
||||
}
|
||||
}()
|
||||
|
||||
// Read the metadata of the file.
|
||||
metadata, err := dcs.metadata(f)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error encountered while reading metadata from data column filesystem file")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check the indices.
|
||||
indices := metadata.indices.all()
|
||||
if len(indices) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Build the ident.
|
||||
dataColumnsIdent := DataColumnsIdent{Root: fileMetadata.blockRoot, Epoch: fileMetadata.epoch, Indices: indices}
|
||||
|
||||
// Update the highest stored epoch.
|
||||
highestStoredEpoch = max(highestStoredEpoch, fileMetadata.epoch)
|
||||
|
||||
// Set the ident in the cache.
|
||||
if err := dcs.cache.set(dataColumnsIdent); err != nil {
|
||||
log.WithError(err).Error("Error encountered while ensuring data column filesystem cache")
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
log.WithError(err).Error("Error encountered while walking data column filesystem.")
|
||||
// List all period directories
|
||||
periodFileInfos, err := afero.ReadDir(dcs.fs, ".")
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error reading top directory during warm cache")
|
||||
return
|
||||
}
|
||||
|
||||
// Prune the cache and the filesystem.
|
||||
// Iterate through periods
|
||||
for _, periodFileInfo := range periodFileInfos {
|
||||
if !periodFileInfo.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
periodPath := periodFileInfo.Name()
|
||||
|
||||
// List all epoch directories in this period
|
||||
epochFileInfos, err := afero.ReadDir(dcs.fs, periodPath)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("period", periodPath).Error("Error reading period directory during warm cache")
|
||||
continue
|
||||
}
|
||||
|
||||
// Iterate through epochs
|
||||
for _, epochFileInfo := range epochFileInfos {
|
||||
if !epochFileInfo.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
epochPath := path.Join(periodPath, epochFileInfo.Name())
|
||||
|
||||
// List all .sszs files in this epoch
|
||||
files, err := listEpochFiles(dcs.fs, epochPath)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("epoch", epochPath).Error("Error listing epoch files during warm cache")
|
||||
continue
|
||||
}
|
||||
|
||||
if len(files) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Process all files in this epoch in parallel
|
||||
epochHighest, err := dcs.processEpochFiles(files)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("epoch", epochPath).Error("Error processing epoch files during warm cache")
|
||||
}
|
||||
|
||||
highestStoredEpoch = max(highestStoredEpoch, epochHighest)
|
||||
}
|
||||
}
|
||||
|
||||
// Prune the cache and the filesystem
|
||||
dcs.prune()
|
||||
|
||||
log.WithField("elapsed", time.Since(start)).Info("Data column filesystem cache warm-up complete")
|
||||
totalElapsed := time.Since(start)
|
||||
|
||||
// Log summary
|
||||
log.WithField("elapsed", totalElapsed).Info("Data column filesystem cache warm-up complete")
|
||||
}
|
||||
|
||||
// listEpochFiles lists all .sszs files in an epoch directory.
|
||||
func listEpochFiles(fs afero.Fs, epochPath string) ([]string, error) {
|
||||
fileInfos, err := afero.ReadDir(fs, epochPath)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "read epoch directory")
|
||||
}
|
||||
|
||||
files := make([]string, 0, len(fileInfos))
|
||||
for _, fileInfo := range fileInfos {
|
||||
if fileInfo.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
fileName := fileInfo.Name()
|
||||
if strings.HasSuffix(fileName, "."+dataColumnsFileExtension) {
|
||||
files = append(files, path.Join(epochPath, fileName))
|
||||
}
|
||||
}
|
||||
|
||||
return files, nil
|
||||
}
|
||||
|
||||
// processEpochFiles processes all .sszs files in an epoch directory in parallel.
|
||||
func (dcs *DataColumnStorage) processEpochFiles(files []string) (primitives.Epoch, error) {
|
||||
var (
|
||||
eg errgroup.Group
|
||||
mu sync.Mutex
|
||||
)
|
||||
|
||||
highestEpoch := primitives.Epoch(0)
|
||||
for _, filePath := range files {
|
||||
eg.Go(func() error {
|
||||
epoch, err := dcs.processFile(filePath)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("file", filePath).Error("Error processing file during warm cache")
|
||||
return nil
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
highestEpoch = max(highestEpoch, epoch)
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if err := eg.Wait(); err != nil {
|
||||
return highestEpoch, err
|
||||
}
|
||||
|
||||
return highestEpoch, nil
|
||||
}
|
||||
|
||||
// processFile processes a single .sszs file.
|
||||
func (dcs *DataColumnStorage) processFile(filePath string) (primitives.Epoch, error) {
|
||||
// Extract metadata from the file path
|
||||
fileMetadata, err := extractFileMetadata(filePath)
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "extract file metadata")
|
||||
}
|
||||
|
||||
// Open the file (each goroutine gets its own FD)
|
||||
f, err := dcs.fs.Open(filePath)
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "open file")
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := f.Close(); closeErr != nil {
|
||||
log.WithError(closeErr).WithField("file", filePath).Error("Error closing file during warm cache")
|
||||
}
|
||||
}()
|
||||
|
||||
// Read metadata
|
||||
metadata, err := dcs.metadata(f)
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "read metadata")
|
||||
}
|
||||
|
||||
// Extract indices
|
||||
indices := metadata.indices.all()
|
||||
if len(indices) == 0 {
|
||||
return fileMetadata.epoch, nil // No indices, skip
|
||||
}
|
||||
|
||||
// Build ident and set in cache (thread-safe)
|
||||
dataColumnsIdent := DataColumnsIdent{
|
||||
Root: fileMetadata.blockRoot,
|
||||
Epoch: fileMetadata.epoch,
|
||||
Indices: indices,
|
||||
}
|
||||
|
||||
if err := dcs.cache.set(dataColumnsIdent); err != nil {
|
||||
return 0, errors.Wrap(err, "cache set")
|
||||
}
|
||||
|
||||
return fileMetadata.epoch, nil
|
||||
}
|
||||
|
||||
// Summary returns the DataColumnStorageSummary.
|
||||
|
||||
@@ -5,6 +5,7 @@ go_library(
|
||||
srcs = [
|
||||
"error.go",
|
||||
"interfaces.go",
|
||||
"interfaces_gloas.go",
|
||||
"prometheus.go",
|
||||
],
|
||||
importpath = "github.com/OffchainLabs/prysm/v7/beacon-chain/state",
|
||||
|
||||
@@ -98,6 +98,7 @@ type WriteOnlyBeaconState interface {
|
||||
WriteOnlyWithdrawals
|
||||
WriteOnlyDeposits
|
||||
WriteOnlyProposerLookahead
|
||||
WriteOnlyGloas
|
||||
SetGenesisTime(val time.Time) error
|
||||
SetGenesisValidatorsRoot(val []byte) error
|
||||
SetSlot(val primitives.Slot) error
|
||||
|
||||
6
beacon-chain/state/interfaces_gloas.go
Normal file
6
beacon-chain/state/interfaces_gloas.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package state
|
||||
|
||||
// WriteOnlyGloas defines a struct which only has write access to Gloas field methods.
|
||||
type WriteOnlyGloas interface {
|
||||
UpdateExecutionPayloadAvailabilityAtIndex(idx uint64, val byte) error
|
||||
}
|
||||
@@ -36,6 +36,7 @@ go_library(
|
||||
"setters_deposit_requests.go",
|
||||
"setters_deposits.go",
|
||||
"setters_eth1.go",
|
||||
"setters_gloas.go",
|
||||
"setters_misc.go",
|
||||
"setters_participation.go",
|
||||
"setters_payload_header.go",
|
||||
|
||||
@@ -721,3 +721,13 @@ func ProtobufBeaconStateFulu(s any) (*ethpb.BeaconStateFulu, error) {
|
||||
}
|
||||
return pbState, nil
|
||||
}
|
||||
|
||||
// ProtobufBeaconStateGloas transforms an input into beacon state Gloas in the form of protobuf.
|
||||
// Error is returned if the input is not type protobuf beacon state.
|
||||
func ProtobufBeaconStateGloas(s any) (*ethpb.BeaconStateGloas, error) {
|
||||
pbState, ok := s.(*ethpb.BeaconStateGloas)
|
||||
if !ok {
|
||||
return nil, errors.New("input is not type pb.BeaconStateGloas")
|
||||
}
|
||||
return pbState, nil
|
||||
}
|
||||
|
||||
30
beacon-chain/state/state-native/setters_gloas.go
Normal file
30
beacon-chain/state/state-native/setters_gloas.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package state_native
|
||||
|
||||
import (
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native/types"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// UpdateExecutionPayloadAvailabilityAtIndex updates the execution payload availability bit at a specific index.
|
||||
func (b *BeaconState) UpdateExecutionPayloadAvailabilityAtIndex(idx uint64, val byte) error {
|
||||
b.lock.Lock()
|
||||
defer b.lock.Unlock()
|
||||
|
||||
byteIndex := idx / 8
|
||||
bitIndex := idx % 8
|
||||
|
||||
if byteIndex >= uint64(len(b.executionPayloadAvailability)) {
|
||||
return errors.Errorf("bit index %d (byte index %d) out of range for execution payload availability length %d", idx, byteIndex, len(b.executionPayloadAvailability))
|
||||
}
|
||||
|
||||
if val != 0 {
|
||||
// Set the bit
|
||||
b.executionPayloadAvailability[byteIndex] |= (1 << bitIndex)
|
||||
} else {
|
||||
// Clear the bit
|
||||
b.executionPayloadAvailability[byteIndex] &^= (1 << bitIndex)
|
||||
}
|
||||
|
||||
b.markFieldAsDirty(types.ExecutionPayloadAvailability)
|
||||
return nil
|
||||
}
|
||||
50
beacon-chain/state/state-native/setters_gloas_test.go
Normal file
50
beacon-chain/state/state-native/setters_gloas_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package state_native
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestUpdateExecutionPayloadAvailabilityAtIndex_SetAndClear(t *testing.T) {
|
||||
st := newGloasStateWithAvailability(t, make([]byte, 1024))
|
||||
|
||||
otherIdx := uint64(8) // byte 1, bit 0
|
||||
idx := uint64(9) // byte 1, bit 1
|
||||
|
||||
require.NoError(t, st.UpdateExecutionPayloadAvailabilityAtIndex(otherIdx, 1))
|
||||
require.Equal(t, byte(0x01), st.executionPayloadAvailability[1])
|
||||
|
||||
require.NoError(t, st.UpdateExecutionPayloadAvailabilityAtIndex(idx, 1))
|
||||
require.Equal(t, byte(0x03), st.executionPayloadAvailability[1])
|
||||
|
||||
require.NoError(t, st.UpdateExecutionPayloadAvailabilityAtIndex(idx, 0))
|
||||
require.Equal(t, byte(0x01), st.executionPayloadAvailability[1])
|
||||
}
|
||||
|
||||
func TestUpdateExecutionPayloadAvailabilityAtIndex_OutOfRange(t *testing.T) {
|
||||
st := newGloasStateWithAvailability(t, make([]byte, 1024))
|
||||
|
||||
idx := uint64(len(st.executionPayloadAvailability)) * 8
|
||||
err := st.UpdateExecutionPayloadAvailabilityAtIndex(idx, 1)
|
||||
require.Error(t, err)
|
||||
require.ErrorContains(t, err, "out of range")
|
||||
|
||||
for _, b := range st.executionPayloadAvailability {
|
||||
if b != 0 {
|
||||
t.Fatalf("execution payload availability mutated on error")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newGloasStateWithAvailability(t *testing.T, availability []byte) *BeaconState {
|
||||
t.Helper()
|
||||
|
||||
st, err := InitializeFromProtoUnsafeGloas(ðpb.BeaconStateGloas{
|
||||
ExecutionPayloadAvailability: availability,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
return st.(*BeaconState)
|
||||
}
|
||||
@@ -3,11 +3,12 @@ package sync
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
|
||||
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
|
||||
@@ -192,19 +193,13 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
|
||||
dataColumnSidecarArrivalGossipSummary.Observe(float64(sinceSlotStartTime.Milliseconds()))
|
||||
dataColumnSidecarVerificationGossipHistogram.Observe(float64(validationTime.Milliseconds()))
|
||||
|
||||
peerGossipScore := s.cfg.p2p.Peers().Scorers().GossipScorer().Score(pid)
|
||||
|
||||
select {
|
||||
case s.dataColumnLogCh <- dataColumnLogEntry{
|
||||
Slot: roDataColumn.Slot(),
|
||||
ColIdx: roDataColumn.Index,
|
||||
PropIdx: roDataColumn.ProposerIndex(),
|
||||
BlockRoot: roDataColumn.BlockRoot(),
|
||||
ParentRoot: roDataColumn.ParentRoot(),
|
||||
PeerSuffix: pid.String()[len(pid.String())-6:],
|
||||
PeerGossipScore: peerGossipScore,
|
||||
validationTime: validationTime,
|
||||
sinceStartTime: sinceSlotStartTime,
|
||||
slot: roDataColumn.Slot(),
|
||||
index: roDataColumn.Index,
|
||||
root: roDataColumn.BlockRoot(),
|
||||
validationTime: validationTime,
|
||||
sinceStartTime: sinceSlotStartTime,
|
||||
}:
|
||||
default:
|
||||
log.WithField("slot", roDataColumn.Slot()).Warn("Failed to send data column log entry")
|
||||
@@ -249,68 +244,69 @@ func computeCacheKey(slot primitives.Slot, proposerIndex primitives.ValidatorInd
|
||||
}
|
||||
|
||||
type dataColumnLogEntry struct {
|
||||
Slot primitives.Slot
|
||||
ColIdx uint64
|
||||
PropIdx primitives.ValidatorIndex
|
||||
BlockRoot [32]byte
|
||||
ParentRoot [32]byte
|
||||
PeerSuffix string
|
||||
PeerGossipScore float64
|
||||
validationTime time.Duration
|
||||
sinceStartTime time.Duration
|
||||
slot primitives.Slot
|
||||
index uint64
|
||||
root [32]byte
|
||||
validationTime time.Duration
|
||||
sinceStartTime time.Duration
|
||||
}
|
||||
|
||||
func (s *Service) processDataColumnLogs() {
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
slotStats := make(map[primitives.Slot][fieldparams.NumberOfColumns]dataColumnLogEntry)
|
||||
slotStats := make(map[[fieldparams.RootLength]byte][]dataColumnLogEntry)
|
||||
|
||||
for {
|
||||
select {
|
||||
case entry := <-s.dataColumnLogCh:
|
||||
cols := slotStats[entry.Slot]
|
||||
cols[entry.ColIdx] = entry
|
||||
slotStats[entry.Slot] = cols
|
||||
case col := <-s.dataColumnLogCh:
|
||||
cols := slotStats[col.root]
|
||||
cols = append(cols, col)
|
||||
slotStats[col.root] = cols
|
||||
case <-ticker.C:
|
||||
for slot, columns := range slotStats {
|
||||
var (
|
||||
colIndices = make([]uint64, 0, fieldparams.NumberOfColumns)
|
||||
peers = make([]string, 0, fieldparams.NumberOfColumns)
|
||||
gossipScores = make([]float64, 0, fieldparams.NumberOfColumns)
|
||||
validationTimes = make([]string, 0, fieldparams.NumberOfColumns)
|
||||
sinceStartTimes = make([]string, 0, fieldparams.NumberOfColumns)
|
||||
)
|
||||
for root, columns := range slotStats {
|
||||
indices := make([]uint64, 0, fieldparams.NumberOfColumns)
|
||||
minValidationTime, maxValidationTime, sumValidationTime := time.Duration(0), time.Duration(0), time.Duration(0)
|
||||
minSinceStartTime, maxSinceStartTime, sumSinceStartTime := time.Duration(0), time.Duration(0), time.Duration(0)
|
||||
|
||||
totalReceived := 0
|
||||
for _, entry := range columns {
|
||||
if entry.PeerSuffix == "" {
|
||||
for _, column := range columns {
|
||||
indices = append(indices, column.index)
|
||||
|
||||
sumValidationTime += column.validationTime
|
||||
sumSinceStartTime += column.sinceStartTime
|
||||
|
||||
if totalReceived == 0 {
|
||||
minValidationTime, maxValidationTime = column.validationTime, column.validationTime
|
||||
minSinceStartTime, maxSinceStartTime = column.sinceStartTime, column.sinceStartTime
|
||||
totalReceived++
|
||||
continue
|
||||
}
|
||||
colIndices = append(colIndices, entry.ColIdx)
|
||||
peers = append(peers, entry.PeerSuffix)
|
||||
gossipScores = append(gossipScores, roundFloat(entry.PeerGossipScore, 2))
|
||||
validationTimes = append(validationTimes, fmt.Sprintf("%.2fms", float64(entry.validationTime.Milliseconds())))
|
||||
sinceStartTimes = append(sinceStartTimes, fmt.Sprintf("%.2fms", float64(entry.sinceStartTime.Milliseconds())))
|
||||
|
||||
minValidationTime, maxValidationTime = min(minValidationTime, column.validationTime), max(maxValidationTime, column.validationTime)
|
||||
minSinceStartTime, maxSinceStartTime = min(minSinceStartTime, column.sinceStartTime), max(maxSinceStartTime, column.sinceStartTime)
|
||||
totalReceived++
|
||||
}
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"slot": slot,
|
||||
"receivedCount": totalReceived,
|
||||
"columnIndices": colIndices,
|
||||
"peers": peers,
|
||||
"gossipScores": gossipScores,
|
||||
"validationTimes": validationTimes,
|
||||
"sinceStartTimes": sinceStartTimes,
|
||||
}).Debug("Accepted data column sidecars summary")
|
||||
if totalReceived > 0 {
|
||||
slices.Sort(indices)
|
||||
avgValidationTime := sumValidationTime / time.Duration(totalReceived)
|
||||
avgSinceStartTime := sumSinceStartTime / time.Duration(totalReceived)
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"slot": columns[0].slot,
|
||||
"root": fmt.Sprintf("%#x", root),
|
||||
"count": totalReceived,
|
||||
"indices": helpers.PrettySlice(indices),
|
||||
"validationTime": prettyMinMaxAverage(minValidationTime, maxValidationTime, avgValidationTime),
|
||||
"sinceStartTime": prettyMinMaxAverage(minSinceStartTime, maxSinceStartTime, avgSinceStartTime),
|
||||
}).Debug("Accepted data column sidecars summary")
|
||||
}
|
||||
}
|
||||
slotStats = make(map[primitives.Slot][fieldparams.NumberOfColumns]dataColumnLogEntry)
|
||||
|
||||
slotStats = make(map[[fieldparams.RootLength]byte][]dataColumnLogEntry)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func roundFloat(f float64, decimals int) float64 {
|
||||
mult := math.Pow(10, float64(decimals))
|
||||
return math.Round(f*mult) / mult
|
||||
func prettyMinMaxAverage(min, max, average time.Duration) string {
|
||||
return fmt.Sprintf("[min: %v, avg: %v, max: %v]", min, average, max)
|
||||
}
|
||||
|
||||
@@ -687,6 +687,12 @@ func sbrNotFound(t *testing.T, expectedRoot [32]byte) *mockStateByRooter {
|
||||
}}
|
||||
}
|
||||
|
||||
func sbrReturnsState(st state.BeaconState) *mockStateByRooter {
|
||||
return &mockStateByRooter{sbr: func(_ context.Context, _ [32]byte) (state.BeaconState, error) {
|
||||
return st, nil
|
||||
}}
|
||||
}
|
||||
|
||||
func sbrForValOverride(idx primitives.ValidatorIndex, val *ethpb.Validator) *mockStateByRooter {
|
||||
return sbrForValOverrideWithT(nil, idx, val)
|
||||
}
|
||||
|
||||
@@ -11,12 +11,10 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
|
||||
forkchoicetypes "github.com/OffchainLabs/prysm/v7/beacon-chain/forkchoice/types"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
|
||||
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
|
||||
"github.com/OffchainLabs/prysm/v7/config/params"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
|
||||
"github.com/OffchainLabs/prysm/v7/runtime/logging"
|
||||
"github.com/OffchainLabs/prysm/v7/time/slots"
|
||||
@@ -484,88 +482,19 @@ func (dv *RODataColumnsVerifier) SidecarProposerExpected(ctx context.Context) (e
|
||||
|
||||
defer dv.recordResult(RequireSidecarProposerExpected, &err)
|
||||
|
||||
type slotParentRoot struct {
|
||||
slot primitives.Slot
|
||||
parentRoot [fieldparams.RootLength]byte
|
||||
}
|
||||
|
||||
targetRootBySlotParentRoot := make(map[slotParentRoot][fieldparams.RootLength]byte)
|
||||
|
||||
var targetRootFromCache = func(slot primitives.Slot, parentRoot [fieldparams.RootLength]byte) ([fieldparams.RootLength]byte, error) {
|
||||
// Use cached values if available.
|
||||
slotParentRoot := slotParentRoot{slot: slot, parentRoot: parentRoot}
|
||||
if root, ok := targetRootBySlotParentRoot[slotParentRoot]; ok {
|
||||
return root, nil
|
||||
}
|
||||
|
||||
// Compute the epoch of the data column slot.
|
||||
dataColumnEpoch := slots.ToEpoch(slot)
|
||||
if dataColumnEpoch > 0 {
|
||||
dataColumnEpoch = dataColumnEpoch - 1
|
||||
}
|
||||
|
||||
// Compute the target root for the epoch.
|
||||
targetRoot, err := dv.fc.TargetRootForEpoch(parentRoot, dataColumnEpoch)
|
||||
if err != nil {
|
||||
return [fieldparams.RootLength]byte{}, columnErrBuilder(errors.Wrap(err, "target root from epoch"))
|
||||
}
|
||||
|
||||
// Store the target root in the cache.
|
||||
targetRootBySlotParentRoot[slotParentRoot] = targetRoot
|
||||
|
||||
return targetRoot, nil
|
||||
}
|
||||
|
||||
for _, dataColumn := range dv.dataColumns {
|
||||
// Extract the slot of the data column.
|
||||
dataColumnSlot := dataColumn.Slot()
|
||||
|
||||
// Extract the root of the parent block corresponding to the data column.
|
||||
parentRoot := dataColumn.ParentRoot()
|
||||
|
||||
// Compute the target root for the data column.
|
||||
targetRoot, err := targetRootFromCache(dataColumnSlot, parentRoot)
|
||||
// Get the verifying state, it is guaranteed to have the correct proposer in the lookahead.
|
||||
verifyingState, err := dv.getVerifyingState(ctx, dataColumn)
|
||||
if err != nil {
|
||||
return columnErrBuilder(errors.Wrap(err, "target root"))
|
||||
return columnErrBuilder(errors.Wrap(err, "verifying state"))
|
||||
}
|
||||
|
||||
// Compute the epoch of the data column slot.
|
||||
dataColumnEpoch := slots.ToEpoch(dataColumnSlot)
|
||||
if dataColumnEpoch > 0 {
|
||||
dataColumnEpoch = dataColumnEpoch - 1
|
||||
}
|
||||
|
||||
// Create a checkpoint for the target root.
|
||||
checkpoint := &forkchoicetypes.Checkpoint{Root: targetRoot, Epoch: dataColumnEpoch}
|
||||
|
||||
// Try to extract the proposer index from the data column in the cache.
|
||||
idx, cached := dv.pc.Proposer(checkpoint, dataColumnSlot)
|
||||
|
||||
if !cached {
|
||||
parentRoot := dataColumn.ParentRoot()
|
||||
// Ensure the expensive index computation is only performed once for
|
||||
// concurrent requests for the same signature data.
|
||||
idxAny, err, _ := dv.sg.Do(concatRootSlot(parentRoot, dataColumnSlot), func() (any, error) {
|
||||
verifyingState, err := dv.getVerifyingState(ctx, dataColumn)
|
||||
if err != nil {
|
||||
return nil, columnErrBuilder(errors.Wrap(err, "verifying state"))
|
||||
}
|
||||
|
||||
idx, err = helpers.BeaconProposerIndexAtSlot(ctx, verifyingState, dataColumnSlot)
|
||||
if err != nil {
|
||||
return nil, columnErrBuilder(errors.Wrap(err, "compute proposer"))
|
||||
}
|
||||
|
||||
return idx, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var ok bool
|
||||
if idx, ok = idxAny.(primitives.ValidatorIndex); !ok {
|
||||
return columnErrBuilder(errors.New("type assertion to ValidatorIndex failed"))
|
||||
}
|
||||
// Use proposer lookahead directly
|
||||
idx, err := helpers.BeaconProposerIndexAtSlot(ctx, verifyingState, dataColumnSlot)
|
||||
if err != nil {
|
||||
return columnErrBuilder(errors.Wrap(err, "proposer from lookahead"))
|
||||
}
|
||||
|
||||
if idx != dataColumn.ProposerIndex() {
|
||||
@@ -626,7 +555,3 @@ func inclusionProofKey(c blocks.RODataColumn) ([32]byte, error) {
|
||||
|
||||
return sha256.Sum256(unhashedKey), nil
|
||||
}
|
||||
|
||||
func concatRootSlot(root [fieldparams.RootLength]byte, slot primitives.Slot) string {
|
||||
return string(root[:]) + fmt.Sprintf("%d", slot)
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package verification
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -795,87 +794,90 @@ func TestDataColumnsSidecarProposerExpected(t *testing.T) {
|
||||
blobCount = 1
|
||||
)
|
||||
|
||||
parentRoot := [fieldparams.RootLength]byte{}
|
||||
columns := GenerateTestDataColumns(t, parentRoot, columnSlot, blobCount)
|
||||
firstColumn := columns[0]
|
||||
ctx := t.Context()
|
||||
testCases := []struct {
|
||||
name string
|
||||
stateByRooter StateByRooter
|
||||
proposerCache proposerCache
|
||||
columns []blocks.RODataColumn
|
||||
error string
|
||||
}{
|
||||
{
|
||||
name: "Cached, matches",
|
||||
stateByRooter: nil,
|
||||
proposerCache: &mockProposerCache{
|
||||
ProposerCB: pcReturnsIdx(firstColumn.ProposerIndex()),
|
||||
},
|
||||
columns: columns,
|
||||
},
|
||||
{
|
||||
name: "Cached, does not match",
|
||||
stateByRooter: nil,
|
||||
proposerCache: &mockProposerCache{
|
||||
ProposerCB: pcReturnsIdx(firstColumn.ProposerIndex() + 1),
|
||||
},
|
||||
columns: columns,
|
||||
error: errSidecarUnexpectedProposer.Error(),
|
||||
},
|
||||
{
|
||||
name: "Not cached, state lookup failure",
|
||||
stateByRooter: sbrNotFound(t, firstColumn.ParentRoot()),
|
||||
proposerCache: &mockProposerCache{
|
||||
ProposerCB: pcReturnsNotFound(),
|
||||
},
|
||||
columns: columns,
|
||||
error: "verifying state",
|
||||
},
|
||||
}
|
||||
parentRoot := [fieldparams.RootLength]byte{}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
initializer := Initializer{
|
||||
shared: &sharedResources{
|
||||
sr: tc.stateByRooter,
|
||||
pc: tc.proposerCache,
|
||||
hsp: &mockHeadStateProvider{},
|
||||
fc: &mockForkchoicer{
|
||||
TargetRootForEpochCB: fcReturnsTargetRoot([fieldparams.RootLength]byte{}),
|
||||
},
|
||||
// Create a Fulu state to get the expected proposer from the lookahead.
|
||||
fuluState, _ := util.DeterministicGenesisStateFulu(t, 32)
|
||||
expectedProposer, err := fuluState.ProposerLookahead()
|
||||
require.NoError(t, err)
|
||||
expectedProposerIdx := primitives.ValidatorIndex(expectedProposer[columnSlot])
|
||||
|
||||
// Generate data columns with the expected proposer index.
|
||||
matchingColumns := generateTestDataColumnsWithProposer(t, parentRoot, columnSlot, blobCount, expectedProposerIdx)
|
||||
// Generate data columns with wrong proposer index.
|
||||
wrongColumns := generateTestDataColumnsWithProposer(t, parentRoot, columnSlot, blobCount, expectedProposerIdx+1)
|
||||
|
||||
t.Run("Proposer matches", func(t *testing.T) {
|
||||
initializer := Initializer{
|
||||
shared: &sharedResources{
|
||||
sr: sbrReturnsState(fuluState),
|
||||
hsp: &mockHeadStateProvider{
|
||||
headRoot: parentRoot[:],
|
||||
headSlot: columnSlot, // Same epoch so HeadStateReadOnly is used
|
||||
headStateReadOnly: fuluState,
|
||||
},
|
||||
}
|
||||
fc: &mockForkchoicer{},
|
||||
},
|
||||
}
|
||||
|
||||
verifier := initializer.NewDataColumnsVerifier(tc.columns, GossipDataColumnSidecarRequirements)
|
||||
var wg sync.WaitGroup
|
||||
verifier := initializer.NewDataColumnsVerifier(matchingColumns, GossipDataColumnSidecarRequirements)
|
||||
err := verifier.SidecarProposerExpected(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, true, verifier.results.executed(RequireSidecarProposerExpected))
|
||||
require.NoError(t, verifier.results.result(RequireSidecarProposerExpected))
|
||||
})
|
||||
|
||||
var err1, err2 error
|
||||
wg.Go(func() {
|
||||
err1 = verifier.SidecarProposerExpected(ctx)
|
||||
})
|
||||
wg.Go(func() {
|
||||
err2 = verifier.SidecarProposerExpected(ctx)
|
||||
})
|
||||
wg.Wait()
|
||||
t.Run("Proposer does not match", func(t *testing.T) {
|
||||
initializer := Initializer{
|
||||
shared: &sharedResources{
|
||||
sr: sbrReturnsState(fuluState),
|
||||
hsp: &mockHeadStateProvider{
|
||||
headRoot: parentRoot[:],
|
||||
headSlot: columnSlot, // Same epoch so HeadStateReadOnly is used
|
||||
headStateReadOnly: fuluState,
|
||||
},
|
||||
fc: &mockForkchoicer{},
|
||||
},
|
||||
}
|
||||
|
||||
require.Equal(t, true, verifier.results.executed(RequireSidecarProposerExpected))
|
||||
verifier := initializer.NewDataColumnsVerifier(wrongColumns, GossipDataColumnSidecarRequirements)
|
||||
err := verifier.SidecarProposerExpected(ctx)
|
||||
require.ErrorContains(t, errSidecarUnexpectedProposer.Error(), err)
|
||||
require.Equal(t, true, verifier.results.executed(RequireSidecarProposerExpected))
|
||||
require.NotNil(t, verifier.results.result(RequireSidecarProposerExpected))
|
||||
})
|
||||
|
||||
if len(tc.error) > 0 {
|
||||
require.ErrorContains(t, tc.error, err1)
|
||||
require.ErrorContains(t, tc.error, err2)
|
||||
require.NotNil(t, verifier.results.result(RequireSidecarProposerExpected))
|
||||
return
|
||||
}
|
||||
t.Run("State lookup failure", func(t *testing.T) {
|
||||
columns := GenerateTestDataColumns(t, parentRoot, columnSlot, blobCount)
|
||||
initializer := Initializer{
|
||||
shared: &sharedResources{
|
||||
sr: sbrNotFound(t, columns[0].ParentRoot()),
|
||||
hsp: &mockHeadStateProvider{},
|
||||
fc: &mockForkchoicer{},
|
||||
},
|
||||
}
|
||||
|
||||
require.NoError(t, err1)
|
||||
require.NoError(t, err2)
|
||||
require.NoError(t, verifier.results.result(RequireSidecarProposerExpected))
|
||||
verifier := initializer.NewDataColumnsVerifier(columns, GossipDataColumnSidecarRequirements)
|
||||
err := verifier.SidecarProposerExpected(ctx)
|
||||
require.ErrorContains(t, "verifying state", err)
|
||||
require.Equal(t, true, verifier.results.executed(RequireSidecarProposerExpected))
|
||||
require.NotNil(t, verifier.results.result(RequireSidecarProposerExpected))
|
||||
})
|
||||
}
|
||||
|
||||
err := verifier.SidecarProposerExpected(ctx)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
func generateTestDataColumnsWithProposer(t *testing.T, parent [fieldparams.RootLength]byte, slot primitives.Slot, blobCount int, proposer primitives.ValidatorIndex) []blocks.RODataColumn {
|
||||
roBlock, roBlobs := util.GenerateTestDenebBlockWithSidecar(t, parent, slot, blobCount, util.WithProposer(proposer))
|
||||
blobs := make([]kzg.Blob, 0, len(roBlobs))
|
||||
for i := range roBlobs {
|
||||
blobs = append(blobs, kzg.Blob(roBlobs[i].Blob))
|
||||
}
|
||||
|
||||
cellsPerBlob, proofsPerBlob := util.GenerateCellsAndProofs(t, blobs)
|
||||
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsPerBlob, proofsPerBlob, peerdas.PopulateFromBlock(roBlock))
|
||||
require.NoError(t, err)
|
||||
|
||||
return roDataColumnSidecars
|
||||
}
|
||||
|
||||
func TestColumnRequirementSatisfaction(t *testing.T) {
|
||||
@@ -922,12 +924,3 @@ func TestColumnRequirementSatisfaction(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestConcatRootSlot(t *testing.T) {
|
||||
root := [fieldparams.RootLength]byte{1, 2, 3}
|
||||
const slot = primitives.Slot(3210)
|
||||
|
||||
const expected = "\x01\x02\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x003210"
|
||||
|
||||
actual := concatRootSlot(root, slot)
|
||||
require.Equal(t, expected, actual)
|
||||
}
|
||||
|
||||
3
changelog/manu-cache-warmup.md
Normal file
3
changelog/manu-cache-warmup.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Changed
|
||||
|
||||
- Data column sidecars cache warmup: Process in parallel all sidecars for a given epoch.
|
||||
3
changelog/manu-log.md
Normal file
3
changelog/manu-log.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Changed
|
||||
|
||||
- Summarize DEBUG log corresponding to incoming via gossip data column sidecar.
|
||||
2
changelog/potuz_dcs_pc_removal.md
Normal file
2
changelog/potuz_dcs_pc_removal.md
Normal file
@@ -0,0 +1,2 @@
|
||||
### Changed
|
||||
- Use lookahead to validate data column sidecar proposer index.
|
||||
3
changelog/potuz_dont_lock_fcu.md
Normal file
3
changelog/potuz_dont_lock_fcu.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Changed
|
||||
|
||||
- Notify the engine about forkchoice updates in the background.
|
||||
2
changelog/potuz_fcu_ctx.md
Normal file
2
changelog/potuz_fcu_ctx.md
Normal file
@@ -0,0 +1,2 @@
|
||||
### Changed
|
||||
- Use a separate context when updating the slot cache.
|
||||
@@ -200,6 +200,7 @@ go_test(
|
||||
"fulu__sanity__blocks_test.go",
|
||||
"fulu__sanity__slots_test.go",
|
||||
"fulu__ssz_static__ssz_static_test.go",
|
||||
"gloas__sanity__slots_test.go",
|
||||
"gloas__ssz_static__ssz_static_test.go",
|
||||
"phase0__epoch_processing__effective_balance_updates_test.go",
|
||||
"phase0__epoch_processing__epoch_processing_test.go",
|
||||
@@ -278,6 +279,7 @@ go_test(
|
||||
"//testing/spectest/shared/fulu/rewards:go_default_library",
|
||||
"//testing/spectest/shared/fulu/sanity:go_default_library",
|
||||
"//testing/spectest/shared/fulu/ssz_static:go_default_library",
|
||||
"//testing/spectest/shared/gloas/sanity:go_default_library",
|
||||
"//testing/spectest/shared/gloas/ssz_static:go_default_library",
|
||||
"//testing/spectest/shared/phase0/epoch_processing:go_default_library",
|
||||
"//testing/spectest/shared/phase0/finality:go_default_library",
|
||||
|
||||
11
testing/spectest/mainnet/gloas__sanity__slots_test.go
Normal file
11
testing/spectest/mainnet/gloas__sanity__slots_test.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package mainnet
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v7/testing/spectest/shared/gloas/sanity"
|
||||
)
|
||||
|
||||
func TestMainnet_Gloas_Sanity_Slots(t *testing.T) {
|
||||
sanity.RunSlotProcessingTests(t, "mainnet")
|
||||
}
|
||||
@@ -206,6 +206,7 @@ go_test(
|
||||
"fulu__sanity__blocks_test.go",
|
||||
"fulu__sanity__slots_test.go",
|
||||
"fulu__ssz_static__ssz_static_test.go",
|
||||
"gloas__sanity__slots_test.go",
|
||||
"gloas__ssz_static__ssz_static_test.go",
|
||||
"phase0__epoch_processing__effective_balance_updates_test.go",
|
||||
"phase0__epoch_processing__epoch_processing_test.go",
|
||||
@@ -288,6 +289,7 @@ go_test(
|
||||
"//testing/spectest/shared/fulu/rewards:go_default_library",
|
||||
"//testing/spectest/shared/fulu/sanity:go_default_library",
|
||||
"//testing/spectest/shared/fulu/ssz_static:go_default_library",
|
||||
"//testing/spectest/shared/gloas/sanity:go_default_library",
|
||||
"//testing/spectest/shared/gloas/ssz_static:go_default_library",
|
||||
"//testing/spectest/shared/phase0/epoch_processing:go_default_library",
|
||||
"//testing/spectest/shared/phase0/finality:go_default_library",
|
||||
|
||||
11
testing/spectest/minimal/gloas__sanity__slots_test.go
Normal file
11
testing/spectest/minimal/gloas__sanity__slots_test.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package minimal
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v7/testing/spectest/shared/gloas/sanity"
|
||||
)
|
||||
|
||||
func TestMinimal_Gloas_Sanity_Slots(t *testing.T) {
|
||||
sanity.RunSlotProcessingTests(t, "minimal")
|
||||
}
|
||||
19
testing/spectest/shared/gloas/sanity/BUILD.bazel
Normal file
19
testing/spectest/shared/gloas/sanity/BUILD.bazel
Normal file
@@ -0,0 +1,19 @@
|
||||
load("@prysm//tools/go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
testonly = True,
|
||||
srcs = ["slot_processing.go"],
|
||||
importpath = "github.com/OffchainLabs/prysm/v7/testing/spectest/shared/gloas/sanity",
|
||||
visibility = ["//testing/spectest:__subpackages__"],
|
||||
deps = [
|
||||
"//beacon-chain/core/transition:go_default_library",
|
||||
"//beacon-chain/state/state-native:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
"//testing/spectest/utils:go_default_library",
|
||||
"//testing/util:go_default_library",
|
||||
"@com_github_golang_snappy//:go_default_library",
|
||||
"@org_golang_google_protobuf//proto:go_default_library",
|
||||
],
|
||||
)
|
||||
61
testing/spectest/shared/gloas/sanity/slot_processing.go
Normal file
61
testing/spectest/shared/gloas/sanity/slot_processing.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package sanity
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
|
||||
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/require"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/spectest/utils"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/util"
|
||||
"github.com/golang/snappy"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func init() {
|
||||
transition.SkipSlotCache.Disable()
|
||||
}
|
||||
|
||||
// RunSlotProcessingTests executes "sanity/slots" tests.
|
||||
func RunSlotProcessingTests(t *testing.T, config string) {
|
||||
require.NoError(t, utils.SetConfig(t, config))
|
||||
|
||||
testFolders, testsFolderPath := utils.TestFolders(t, config, "gloas", "sanity/slots/pyspec_tests")
|
||||
|
||||
for _, folder := range testFolders {
|
||||
t.Run(folder.Name(), func(t *testing.T) {
|
||||
preBeaconStateFile, err := util.BazelFileBytes(testsFolderPath, folder.Name(), "pre.ssz_snappy")
|
||||
require.NoError(t, err)
|
||||
preBeaconStateSSZ, err := snappy.Decode(nil /* dst */, preBeaconStateFile)
|
||||
require.NoError(t, err, "Failed to decompress")
|
||||
base := ðpb.BeaconStateGloas{}
|
||||
require.NoError(t, base.UnmarshalSSZ(preBeaconStateSSZ), "Failed to unmarshal")
|
||||
beaconState, err := state_native.InitializeFromProtoGloas(base)
|
||||
require.NoError(t, err)
|
||||
|
||||
file, err := util.BazelFileBytes(testsFolderPath, folder.Name(), "slots.yaml")
|
||||
require.NoError(t, err)
|
||||
fileStr := string(file)
|
||||
slotsCount, err := strconv.ParseUint(fileStr[:len(fileStr)-5], 10, 64)
|
||||
require.NoError(t, err)
|
||||
|
||||
postBeaconStateFile, err := util.BazelFileBytes(testsFolderPath, folder.Name(), "post.ssz_snappy")
|
||||
require.NoError(t, err)
|
||||
postBeaconStateSSZ, err := snappy.Decode(nil /* dst */, postBeaconStateFile)
|
||||
require.NoError(t, err, "Failed to decompress")
|
||||
postBeaconState := ðpb.BeaconStateGloas{}
|
||||
require.NoError(t, postBeaconState.UnmarshalSSZ(postBeaconStateSSZ), "Failed to unmarshal")
|
||||
postState, err := transition.ProcessSlots(context.Background(), beaconState, beaconState.Slot().Add(slotsCount))
|
||||
require.NoError(t, err)
|
||||
|
||||
pbState, err := state_native.ProtobufBeaconStateGloas(postState.ToProto())
|
||||
require.NoError(t, err)
|
||||
if !proto.Equal(pbState, postBeaconState) {
|
||||
t.Fatal("Did not receive expected post state")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -44,6 +44,13 @@ func WithProposerSigning(idx primitives.ValidatorIndex, sk bls.SecretKey, valRoo
|
||||
}
|
||||
}
|
||||
|
||||
// WithProposer sets the proposer index for the generated block without signing.
|
||||
func WithProposer(idx primitives.ValidatorIndex) DenebBlockGeneratorOption {
|
||||
return func(g *denebBlockGenerator) {
|
||||
g.proposer = idx
|
||||
}
|
||||
}
|
||||
|
||||
func WithPayloadSetter(p *enginev1.ExecutionPayloadDeneb) DenebBlockGeneratorOption {
|
||||
return func(g *denebBlockGenerator) {
|
||||
g.payload = p
|
||||
|
||||
Reference in New Issue
Block a user