Compare commits

...

12 Commits

Author SHA1 Message Date
james-prysm
c690412c61 fixing test 2025-10-02 16:22:01 -05:00
james-prysm
595728b0bb Merge changes and resolve conflicts
Resolved conflict in beacon-chain/blockchain/process_block.go where both
branches modified the postBlockProcess function. Kept the blockProcessed
flag logic while incorporating upstream changes.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-authored-by: Sculptor <sculptor@imbue.com>
2025-10-02 19:46:21 +00:00
james-prysm
f379b37d25 state channel handling
Co-authored-by: Sculptor <sculptor@imbue.com>
2025-10-02 19:44:30 +00:00
james-prysm
f4fee5e26d init 2025-10-02 14:42:38 -05:00
Manu NALEPA
b24fe0d23a requestAndSaveMissingDataColumnSidecars: Fix log (#15794) 2025-10-02 16:38:00 +00:00
Bastin
cbe50269de Change LC p2p validation rules (#15783)
* compare incoming lc message with locally computed object

* fix logs

* add comment
2025-10-02 15:04:33 +00:00
Manu NALEPA
4ed2953fcf inclusionProofKey: Include the commitments in the key. (#15795)
* `inclusionProofKey`: Include the commitments in the key.

* Fix Potuz's comment.

* Update beacon-chain/verification/data_column.go

Co-authored-by: Potuz <potuz@prysmaticlabs.com>

* Fix Potuz's comment.

---------

Co-authored-by: Potuz <potuz@prysmaticlabs.com>
2025-10-02 13:17:11 +00:00
Potuz
915837d059 Process pending on block (#15791)
* Process pending attestations with block insertion

* fix tests

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Terence's review

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-10-02 00:19:04 +00:00
Potuz
26b276660f Avoid unnecessary calls to ExitInformation() (#15764)
* Avoid unnecessary calls to ExitInformation()

ExitInformation runs a loop over the whole validator set. This is needed
in case there are slashings or exits to be processed in a block (we
could be caching this, or avoiding it entirely post-Electra). This PR
removes calls to this function on the normal state transition path. h/t to
@terencechain for finding this bug.

In addition, when processing withdrawal requests and registry updates, we
kept recomputing the exit information even though the function that updates
the state already takes care of tracking and updating the right exit
information. So this PR removes the calls that recompute this exit
information in a loop. Notice that this bug has been present even before we
had the function `ExitInformation()`, so I will document it here to help the
reviewer.

Our previous behavior was to do this in a loop:

```
st, err = validators.InitiateValidatorExit(ctx, st, vIdx, validators.ExitInformation(st))
```

This is a bit problematic since `ExitInformation` loops over the whole validator set to compute the exit information (and the total active balance), and then `InitiateValidatorExit` recomputes the total active balance, looping again over the whole validator set and overwriting the pointer returned by `ExitInformation`.

On the other hand, the function `InitiateValidatorExit` does mutate the state `st` itself, so each call to `ExitInformation(st)` may actually return a different pointer.

The function `ExitInformation` computes it as follows:

```
	err := s.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
		e := val.ExitEpoch()
		if e != farFutureEpoch {
			if e > exitInfo.HighestExitEpoch {
				exitInfo.HighestExitEpoch = e
				exitInfo.Churn = 1
			} else if e == exitInfo.HighestExitEpoch {
				exitInfo.Churn++
			}
```

So it simply increases the churn for each validator whose exit epoch equals the highest exit epoch.

The function `InitiateValidatorExit` mutates this pointer in the following way:

If the state is post-Electra, it completely disregards this pointer, recomputes the highest exit epoch, and updates the churn unconditionally, so `exitInfo.HighestExitEpoch` always ends up with the right value and does not even need to be computed beforehand; we could even skip the first loop entirely. If the state is pre-Electra, the function itself correctly updates the exit info for the next iteration.
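
To make this concrete, here is a minimal standalone sketch of the compute-once-and-mutate pattern, assuming toy `validator`/`exitInfo` types (not Prysm's actual API) and ignoring the activation-exit-epoch floor for brevity:

```go
package main

import "fmt"

const farFutureEpoch = ^uint64(0)

// Toy stand-ins; the real ExitInfo lives in beacon-chain/core/validators.
type validator struct{ exitEpoch uint64 }

type exitInfo struct {
	highestExitEpoch uint64
	churn            uint64
}

// computeExitInfo mirrors ExitInformation: a single O(n) pass over every validator.
func computeExitInfo(vals []validator) *exitInfo {
	info := &exitInfo{}
	for _, v := range vals {
		if v.exitEpoch == farFutureEpoch {
			continue
		}
		switch {
		case v.exitEpoch > info.highestExitEpoch:
			info.highestExitEpoch = v.exitEpoch
			info.churn = 1
		case v.exitEpoch == info.highestExitEpoch:
			info.churn++
		}
	}
	return info
}

// initiateExit loosely mirrors the pre-Electra InitiateValidatorExit behaviour:
// it mutates the shared pointer so the next call sees the updated churn,
// which is why one computeExitInfo call per block is enough.
func initiateExit(vals []validator, idx int, info *exitInfo, churnLimit uint64) {
	exitEpoch := info.highestExitEpoch
	if info.churn >= churnLimit {
		exitEpoch++
		info.highestExitEpoch = exitEpoch
		info.churn = 0
	}
	info.churn++
	vals[idx].exitEpoch = exitEpoch
}

func main() {
	vals := []validator{{farFutureEpoch}, {farFutureEpoch}, {10}, {farFutureEpoch}}
	info := computeExitInfo(vals) // one pass for the whole block
	for _, idx := range []int{0, 1, 3} {
		initiateExit(vals, idx, info, 2) // reuses and updates the same pointer
	}
	fmt.Printf("exit epochs: %v, churn: %d, highest: %d\n",
		[]uint64{vals[0].exitEpoch, vals[1].exitEpoch, vals[3].exitEpoch},
		info.churn, info.highestExitEpoch)
}
```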

* Only care about exits pre-Electra

* Update beacon-chain/core/transition/transition_no_verify_sig.go

Co-authored-by: terence <terence@prysmaticlabs.com>

* Radek's review

---------

Co-authored-by: terence <terence@prysmaticlabs.com>
2025-10-02 00:17:39 +00:00
james-prysm
580509f2f4 attempting to improve duties v2 (#15784)
* attempting to improve duties v2

* removing go routine

* changelog

* unnecessary variable

* fixing test

* small optimization exiting early in the CommitteeAssignments function

* fixing small bug

* fixes performance issues with duties v2

* fixed changelog

* gofmt
2025-10-01 20:40:14 +00:00
Potuz
be144da099 fix test race conditions (#15792)
Fix race condition where svc.verifierWaiter was being set after
svc.Start() was already running, causing a nil pointer dereference.
2025-10-01 19:29:20 +00:00
Manu NALEPA
cc2565a422 Update c-kzg-4844 to v2.1.5 (#15708)
* Sort sidecars by index before calling `RecoverCellsAndKZGProofs`.

Reason: Starting at `c-kzg-4844 v2.1.2`, the library requires its input to be sorted.

* Update `c-kzg-4844` to `v2.1.3`

* Update `c-kzg-4844` to `v2.1.5`
2025-10-01 14:24:22 +00:00
44 changed files with 560 additions and 612 deletions

View File

@@ -109,6 +109,7 @@ func VerifyCellKZGProofBatch(commitmentsBytes []Bytes48, cellIndices []uint64, c
}
// RecoverCellsAndKZGProofs recovers the complete cells and KZG proofs from a given set of cell indices and partial cells.
// Note: `len(cellIndices)` must be equal to `len(partialCells)` and `cellIndices` must be sorted in ascending order.
func RecoverCellsAndKZGProofs(cellIndices []uint64, partialCells []Cell) (CellsAndProofs, error) {
// Convert `Cell` type to `ckzg4844.Cell`
ckzgPartialCells := make([]ckzg4844.Cell, len(partialCells))
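
A minimal standalone sketch of the sorted-input contract described in the doc comment above, using toy `sidecar`/`cell` stand-ins rather than the real `blocks.VerifiedRODataColumn` and `kzg.Cell` types:

```go
package main

import (
	"fmt"
	"sort"
)

// Toy stand-ins for the real sidecar and cell types.
type cell [2]byte

type sidecar struct {
	Index  uint64
	Column []cell // one cell per blob
}

func main() {
	sidecars := []sidecar{
		{Index: 64, Column: []cell{{64}}},
		{Index: 3, Column: []cell{{3}}},
		{Index: 17, Column: []cell{{17}}},
	}

	// Sort in place by column index, as ReconstructDataColumnSidecars now does,
	// so the (index, cell) pairs handed to RecoverCellsAndKZGProofs are ascending.
	sort.Slice(sidecars, func(i, j int) bool { return sidecars[i].Index < sidecars[j].Index })

	blobIndex := 0
	cellIndices := make([]uint64, 0, len(sidecars))
	cells := make([]cell, 0, len(sidecars))
	for _, sc := range sidecars {
		cellIndices = append(cellIndices, sc.Index)
		cells = append(cells, sc.Column[blobIndex])
	}
	fmt.Println(cellIndices) // [3 17 64] -- satisfies the sortedness requirement
	_ = cells
}
```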

View File

@@ -72,7 +72,13 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {
if features.Get().EnableLightClient && slots.ToEpoch(s.CurrentSlot()) >= params.BeaconConfig().AltairForkEpoch {
defer s.processLightClientUpdates(cfg)
}
defer s.sendStateFeedOnBlock(cfg)
// Track whether block processing succeeded to only send event on success
var blockProcessed bool
defer func() {
if blockProcessed {
s.sendStateFeedOnBlock(cfg)
}
}()
defer reportProcessingTime(startTime)
defer reportAttestationInclusion(cfg.roblock.Block())
@@ -101,16 +107,22 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {
newBlockHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
if cfg.headRoot != cfg.roblock.Root() {
s.logNonCanonicalBlockReceived(cfg.roblock.Root(), cfg.headRoot)
// Mark as processed even for non-canonical blocks that succeed
blockProcessed = true
return nil
}
if err := s.getFCUArgs(cfg, fcuArgs); err != nil {
log.WithError(err).Error("Could not get forkchoice update argument")
// Mark as processed - block was successfully inserted even if FCU args failed
blockProcessed = true
return nil
}
if err := s.sendFCU(cfg, fcuArgs); err != nil {
return errors.Wrap(err, "could not send FCU to engine")
}
// Block successfully processed
blockProcessed = true
return nil
}
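
For reference, a minimal standalone sketch of the deferred success-flag pattern used above (hypothetical `process` function, not the real service code):

```go
package main

import "fmt"

// process defers the event send but gates it on a flag that is only set on
// the success paths, so early error returns skip the notification.
func process(fail bool) error {
	var processed bool
	defer func() {
		if processed {
			fmt.Println("BlockProcessed event sent")
		}
	}()
	if fail {
		return fmt.Errorf("insert node failed") // no event
	}
	processed = true // all success paths set the flag before returning
	return nil
}

func main() {
	_ = process(true)  // prints nothing
	_ = process(false) // prints "BlockProcessed event sent"
}
```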

View File

@@ -9,8 +9,10 @@ import (
"testing"
"time"
mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/blocks"
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/signing"
@@ -3462,3 +3464,156 @@ func TestProcessLightClientFinalityUpdate(t *testing.T) {
}
}
}
// Test_postBlockProcess_EventSending tests that block processed events are only sent
// when block processing succeeds according to the decision tree:
//
// Block Processing Flow:
// ├─ InsertNode FAILS (fork choice timeout)
// │ └─ blockProcessed = false ❌ NO EVENT
// │
// ├─ InsertNode succeeds
// │ ├─ handleBlockAttestations FAILS
// │ │ └─ blockProcessed = false ❌ NO EVENT
// │ │
// │ ├─ Block is NON-CANONICAL (not head)
// │ │ └─ blockProcessed = true ✅ SEND EVENT (Line 111)
// │ │
// │ ├─ Block IS CANONICAL (new head)
// │ │ ├─ getFCUArgs FAILS
// │ │ │ └─ blockProcessed = true ✅ SEND EVENT (Line 117)
// │ │ │
// │ │ ├─ sendFCU FAILS
// │ │ │ └─ blockProcessed = false ❌ NO EVENT
// │ │ │
// │ │ └─ Full success
// │ │ └─ blockProcessed = true ✅ SEND EVENT (Line 125)
func Test_postBlockProcess_EventSending(t *testing.T) {
ctx := context.Background()
// Helper to create a minimal valid block and state
createTestBlockAndState := func(t *testing.T, slot primitives.Slot, parentRoot [32]byte) (consensusblocks.ROBlock, state.BeaconState) {
st, _ := util.DeterministicGenesisState(t, 64)
require.NoError(t, st.SetSlot(slot))
stateRoot, err := st.HashTreeRoot(ctx)
require.NoError(t, err)
blk := util.NewBeaconBlock()
blk.Block.Slot = slot
blk.Block.ProposerIndex = 0
blk.Block.ParentRoot = parentRoot[:]
blk.Block.StateRoot = stateRoot[:]
signed := util.HydrateSignedBeaconBlock(blk)
roBlock, err := consensusblocks.NewSignedBeaconBlock(signed)
require.NoError(t, err)
roBlk, err := consensusblocks.NewROBlock(roBlock)
require.NoError(t, err)
return roBlk, st
}
tests := []struct {
name string
setupService func(*Service, [32]byte)
expectEvent bool
expectError bool
errorContains string
}{
{
name: "Block successfully processed - sends event",
setupService: func(s *Service, blockRoot [32]byte) {
// Default setup should work
},
expectEvent: true,
expectError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create service with required options
opts := testServiceOptsWithDB(t)
service, err := NewService(ctx, opts...)
require.NoError(t, err)
// Initialize fork choice with genesis block
st, _ := util.DeterministicGenesisState(t, 64)
require.NoError(t, st.SetSlot(0))
genesisBlock := util.NewBeaconBlock()
genesisBlock.Block.StateRoot = bytesutil.PadTo([]byte("genesisState"), 32)
signedGenesis := util.HydrateSignedBeaconBlock(genesisBlock)
block, err := consensusblocks.NewSignedBeaconBlock(signedGenesis)
require.NoError(t, err)
genesisRoot, err := block.Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, block))
require.NoError(t, service.cfg.BeaconDB.SaveGenesisBlockRoot(ctx, genesisRoot))
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, st, genesisRoot))
genesisROBlock, err := consensusblocks.NewROBlock(block)
require.NoError(t, err)
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, st, genesisROBlock))
// Create test block and state with genesis as parent
roBlock, postSt := createTestBlockAndState(t, 100, genesisRoot)
// Apply additional service setup if provided
if tt.setupService != nil {
tt.setupService(service, roBlock.Root())
}
// Create post block process config
cfg := &postBlockProcessConfig{
ctx: ctx,
roblock: roBlock,
postState: postSt,
isValidPayload: true,
}
// Execute postBlockProcess
err = service.postBlockProcess(cfg)
// Check error expectation
if tt.expectError {
require.NotNil(t, err)
if tt.errorContains != "" {
require.ErrorContains(t, tt.errorContains, err)
}
} else {
require.NoError(t, err)
}
// Give a moment for deferred functions to execute
time.Sleep(10 * time.Millisecond)
// Check event expectation
notifier := service.cfg.StateNotifier.(*mock.MockStateNotifier)
events := notifier.ReceivedEvents()
if tt.expectEvent {
require.NotEqual(t, 0, len(events), "Expected event to be sent but none were received")
// Verify it's a BlockProcessed event
foundBlockProcessed := false
for _, evt := range events {
if evt.Type == statefeed.BlockProcessed {
foundBlockProcessed = true
data, ok := evt.Data.(*statefeed.BlockProcessedData)
require.Equal(t, true, ok, "Event data should be BlockProcessedData")
require.Equal(t, roBlock.Root(), data.BlockRoot, "Event should contain correct block root")
break
}
}
require.Equal(t, true, foundBlockProcessed, "Expected BlockProcessed event type")
} else {
// For no-event cases, verify no BlockProcessed events were sent
for _, evt := range events {
require.NotEqual(t, statefeed.BlockProcessed, evt.Type,
"Expected no BlockProcessed event but one was sent")
}
}
})
}
}

View File

@@ -42,6 +42,9 @@ func ProcessAttesterSlashings(
slashings []ethpb.AttSlashing,
exitInfo *validators.ExitInfo,
) (state.BeaconState, error) {
if exitInfo == nil && len(slashings) > 0 {
return nil, errors.New("exit info required to process attester slashings")
}
var err error
for _, slashing := range slashings {
beaconState, err = ProcessAttesterSlashing(ctx, beaconState, slashing, exitInfo)
@@ -59,6 +62,9 @@ func ProcessAttesterSlashing(
slashing ethpb.AttSlashing,
exitInfo *validators.ExitInfo,
) (state.BeaconState, error) {
if exitInfo == nil {
return nil, errors.New("exit info is required to process attester slashing")
}
if err := VerifyAttesterSlashing(ctx, beaconState, slashing); err != nil {
return nil, errors.Wrap(err, "could not verify attester slashing")
}

View File

@@ -55,6 +55,9 @@ func ProcessVoluntaryExits(
if len(exits) == 0 {
return beaconState, nil
}
if exitInfo == nil {
return nil, errors.New("exit info required to process voluntary exits")
}
for idx, exit := range exits {
if exit == nil || exit.Exit == nil {
return nil, errors.New("nil voluntary exit in block body")

View File

@@ -51,6 +51,9 @@ func ProcessProposerSlashings(
slashings []*ethpb.ProposerSlashing,
exitInfo *validators.ExitInfo,
) (state.BeaconState, error) {
if exitInfo == nil && len(slashings) > 0 {
return nil, errors.New("exit info required to process proposer slashings")
}
var err error
for _, slashing := range slashings {
beaconState, err = ProcessProposerSlashing(ctx, beaconState, slashing, exitInfo)
@@ -75,6 +78,9 @@ func ProcessProposerSlashing(
if err = VerifyProposerSlashing(beaconState, slashing); err != nil {
return nil, errors.Wrap(err, "could not verify proposer slashing")
}
if exitInfo == nil {
return nil, errors.New("exit info is required to process proposer slashing")
}
beaconState, err = validators.SlashValidator(ctx, beaconState, slashing.Header_1.Header.ProposerIndex, exitInfo)
if err != nil {
return nil, errors.Wrapf(err, "could not slash proposer index %d", slashing.Header_1.Header.ProposerIndex)

View File

@@ -53,9 +53,15 @@ func ProcessOperations(ctx context.Context, st state.BeaconState, block interfac
// 6110 validations are in VerifyOperationLengths
bb := block.Body()
// Electra extends the altair operations.
exitInfo := v.ExitInformation(st)
if err := helpers.UpdateTotalActiveBalanceCache(st, exitInfo.TotalActiveBalance); err != nil {
return nil, errors.Wrap(err, "could not update total active balance cache")
var exitInfo *v.ExitInfo
hasSlashings := len(bb.ProposerSlashings()) > 0 || len(bb.AttesterSlashings()) > 0
hasExits := len(bb.VoluntaryExits()) > 0
if hasSlashings || hasExits {
// ExitInformation is expensive to compute, only do it if we need it.
exitInfo = v.ExitInformation(st)
if err := helpers.UpdateTotalActiveBalanceCache(st, exitInfo.TotalActiveBalance); err != nil {
return nil, errors.Wrap(err, "could not update total active balance cache")
}
}
st, err = ProcessProposerSlashings(ctx, st, bb.ProposerSlashings(), exitInfo)
if err != nil {

View File

@@ -13,6 +13,7 @@ import (
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
enginev1 "github.com/OffchainLabs/prysm/v6/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
@@ -91,6 +92,18 @@ func ProcessWithdrawalRequests(ctx context.Context, st state.BeaconState, wrs []
ctx, span := trace.StartSpan(ctx, "electra.ProcessWithdrawalRequests")
defer span.End()
currentEpoch := slots.ToEpoch(st.Slot())
if len(wrs) == 0 {
return st, nil
}
// It is correct to compute exitInfo once for all withdrawals in the block, as the ExitInfo pointer is
// updated within InitiateValidatorExit which is the only function that uses it.
var exitInfo *validators.ExitInfo
if st.Version() < version.Electra {
exitInfo = validators.ExitInformation(st)
} else {
// After Electra, the function InitiateValidatorExit ignores the exitInfo passed to it and recomputes it anyway.
exitInfo = &validators.ExitInfo{}
}
for _, wr := range wrs {
if wr == nil {
return nil, errors.New("nil execution layer withdrawal request")
@@ -148,7 +161,8 @@ func ProcessWithdrawalRequests(ctx context.Context, st state.BeaconState, wrs []
// Only exit validator if it has no pending withdrawals in the queue
if pendingBalanceToWithdraw == 0 {
var err error
st, err = validators.InitiateValidatorExit(ctx, st, vIdx, validators.ExitInformation(st))
// exitInfo is updated within InitiateValidatorExit
st, err = validators.InitiateValidatorExit(ctx, st, vIdx, exitInfo)
if err != nil {
return nil, err
}

View File

@@ -96,12 +96,17 @@ func ProcessRegistryUpdates(ctx context.Context, st state.BeaconState) (state.Be
}
// Process validators eligible for ejection.
for _, idx := range eligibleForEjection {
// Here is fine to do a quadratic loop since this should
// barely happen
st, err = validators.InitiateValidatorExit(ctx, st, idx, validators.ExitInformation(st))
if err != nil && !errors.Is(err, validators.ErrValidatorAlreadyExited) {
return nil, errors.Wrapf(err, "could not initiate exit for validator %d", idx)
if len(eligibleForEjection) > 0 {
// It is safe to compute exitInfo once for all ejections in the epoch, as the ExitInfo pointer is
// updated within InitiateValidatorExit which is the only function that uses it.
exitInfo := validators.ExitInformation(st)
for _, idx := range eligibleForEjection {
// Here is fine to do a quadratic loop since this should
// barely happen
st, err = validators.InitiateValidatorExit(ctx, st, idx, exitInfo)
if err != nil && !errors.Is(err, validators.ErrValidatorAlreadyExited) {
return nil, errors.Wrapf(err, "could not initiate exit for validator %d", idx)
}
}
}

View File

@@ -399,7 +399,6 @@ func CommitteeAssignments(ctx context.Context, state state.BeaconState, epoch pr
ctx, span := trace.StartSpan(ctx, "helpers.CommitteeAssignments")
defer span.End()
// Verify if the epoch is valid for assignment based on the provided state.
if err := VerifyAssignmentEpoch(epoch, state); err != nil {
return nil, err
}
@@ -407,12 +406,15 @@ func CommitteeAssignments(ctx context.Context, state state.BeaconState, epoch pr
if err != nil {
return nil, err
}
vals := make(map[primitives.ValidatorIndex]struct{})
// Deduplicate and make set for O(1) membership checks.
vals := make(map[primitives.ValidatorIndex]struct{}, len(validators))
for _, v := range validators {
vals[v] = struct{}{}
}
assignments := make(map[primitives.ValidatorIndex]*CommitteeAssignment)
// Compute committee assignments for each slot in the epoch.
remaining := len(vals)
assignments := make(map[primitives.ValidatorIndex]*CommitteeAssignment, len(vals))
for slot := startSlot; slot < startSlot+params.BeaconConfig().SlotsPerEpoch; slot++ {
committees, err := BeaconCommittees(ctx, state, slot)
if err != nil {
@@ -420,7 +422,7 @@ func CommitteeAssignments(ctx context.Context, state state.BeaconState, epoch pr
}
for j, committee := range committees {
for _, vIndex := range committee {
if _, ok := vals[vIndex]; !ok { // Skip if the validator is not in the provided validators slice.
if _, ok := vals[vIndex]; !ok {
continue
}
if _, ok := assignments[vIndex]; !ok {
@@ -429,6 +431,11 @@ func CommitteeAssignments(ctx context.Context, state state.BeaconState, epoch pr
assignments[vIndex].Committee = committee
assignments[vIndex].AttesterSlot = slot
assignments[vIndex].CommitteeIndex = primitives.CommitteeIndex(j)
delete(vals, vIndex)
remaining--
if remaining == 0 {
return assignments, nil // early exit
}
}
}
}

View File

@@ -1,6 +1,8 @@
package peerdas
import (
"sort"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
@@ -28,7 +30,8 @@ func MinimumColumnCountToReconstruct() uint64 {
// ReconstructDataColumnSidecars reconstructs all the data column sidecars from the given input data column sidecars.
// All input sidecars must be committed to the same block.
// `inVerifiedRoSidecars` should contain enough (unique) sidecars to reconstruct the missing columns.
// `inVerifiedRoSidecars` should contain enough sidecars to reconstruct the missing columns, and should not contain any duplicate.
// WARNING: This function sorts inplace `verifiedRoSidecars` by index.
func ReconstructDataColumnSidecars(verifiedRoSidecars []blocks.VerifiedRODataColumn) ([]blocks.VerifiedRODataColumn, error) {
// Check if there is at least one input sidecar.
if len(verifiedRoSidecars) == 0 {
@@ -51,18 +54,17 @@ func ReconstructDataColumnSidecars(verifiedRoSidecars []blocks.VerifiedRODataCol
}
}
// Deduplicate sidecars.
sidecarByIndex := make(map[uint64]blocks.VerifiedRODataColumn, len(verifiedRoSidecars))
for _, inVerifiedRoSidecar := range verifiedRoSidecars {
sidecarByIndex[inVerifiedRoSidecar.Index] = inVerifiedRoSidecar
}
// Check if there is enough sidecars to reconstruct the missing columns.
sidecarCount := len(sidecarByIndex)
sidecarCount := len(verifiedRoSidecars)
if uint64(sidecarCount) < MinimumColumnCountToReconstruct() {
return nil, ErrNotEnoughDataColumnSidecars
}
// Sort the input sidecars by index.
sort.Slice(verifiedRoSidecars, func(i, j int) bool {
return verifiedRoSidecars[i].Index < verifiedRoSidecars[j].Index
})
// Recover cells and compute proofs in parallel.
var wg errgroup.Group
cellsAndProofs := make([]kzg.CellsAndProofs, blobCount)
@@ -71,10 +73,10 @@ func ReconstructDataColumnSidecars(verifiedRoSidecars []blocks.VerifiedRODataCol
cellsIndices := make([]uint64, 0, sidecarCount)
cells := make([]kzg.Cell, 0, sidecarCount)
for columnIndex, sidecar := range sidecarByIndex {
for _, sidecar := range verifiedRoSidecars {
cell := sidecar.Column[blobIndex]
cells = append(cells, kzg.Cell(cell))
cellsIndices = append(cellsIndices, columnIndex)
cellsIndices = append(cellsIndices, sidecar.Index)
}
// Recover the cells and proofs for the corresponding blob

View File

@@ -10,6 +10,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/electra"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/transition/interop"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/validators"
v "github.com/OffchainLabs/prysm/v6/beacon-chain/core/validators"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
@@ -378,9 +379,16 @@ func ProcessBlockForStateRoot(
func altairOperations(ctx context.Context, st state.BeaconState, beaconBlock interfaces.ReadOnlyBeaconBlock) (state.BeaconState, error) {
var err error
exitInfo := v.ExitInformation(st)
if err := helpers.UpdateTotalActiveBalanceCache(st, exitInfo.TotalActiveBalance); err != nil {
return nil, errors.Wrap(err, "could not update total active balance cache")
hasSlashings := len(beaconBlock.Body().ProposerSlashings()) > 0 || len(beaconBlock.Body().AttesterSlashings()) > 0
// exitInfo is only needed for voluntary exits pre Electra.
hasExits := st.Version() < version.Electra && len(beaconBlock.Body().VoluntaryExits()) > 0
exitInfo := &validators.ExitInfo{}
if hasSlashings || hasExits {
// ExitInformation is expensive to compute, only do it if we need it.
exitInfo = v.ExitInformation(st)
if err := helpers.UpdateTotalActiveBalanceCache(st, exitInfo.TotalActiveBalance); err != nil {
return nil, errors.Wrap(err, "could not update total active balance cache")
}
}
st, err = b.ProcessProposerSlashings(ctx, st, beaconBlock.Body().ProposerSlashings(), exitInfo)
if err != nil {
@@ -407,10 +415,15 @@ func altairOperations(ctx context.Context, st state.BeaconState, beaconBlock int
// This calls phase 0 block operations.
func phase0Operations(ctx context.Context, st state.BeaconState, beaconBlock interfaces.ReadOnlyBeaconBlock) (state.BeaconState, error) {
var err error
exitInfo := v.ExitInformation(st)
if err := helpers.UpdateTotalActiveBalanceCache(st, exitInfo.TotalActiveBalance); err != nil {
return nil, errors.Wrap(err, "could not update total active balance cache")
hasSlashings := len(beaconBlock.Body().ProposerSlashings()) > 0 || len(beaconBlock.Body().AttesterSlashings()) > 0
hasExits := len(beaconBlock.Body().VoluntaryExits()) > 0
var exitInfo *v.ExitInfo
if hasSlashings || hasExits {
// ExitInformation is expensive to compute, only do it if we need it.
exitInfo = v.ExitInformation(st)
if err := helpers.UpdateTotalActiveBalanceCache(st, exitInfo.TotalActiveBalance); err != nil {
return nil, errors.Wrap(err, "could not update total active balance cache")
}
}
st, err = b.ProcessProposerSlashings(ctx, st, beaconBlock.Body().ProposerSlashings(), exitInfo)
if err != nil {

View File

@@ -98,7 +98,9 @@ func InitiateValidatorExit(
if validator.ExitEpoch != params.BeaconConfig().FarFutureEpoch {
return s, ErrValidatorAlreadyExited
}
if exitInfo == nil {
return nil, errors.New("exit info is required to process validator exit")
}
// Compute exit queue epoch.
if s.Version() < version.Electra {
if err = initiateValidatorExitPreElectra(ctx, s, exitInfo); err != nil {
@@ -177,6 +179,9 @@ func initiateValidatorExitPreElectra(ctx context.Context, s state.BeaconState, e
// if exit_queue_churn >= get_validator_churn_limit(state):
// exit_queue_epoch += Epoch(1)
exitableEpoch := helpers.ActivationExitEpoch(time.CurrentEpoch(s))
if exitInfo == nil {
return errors.New("exit info is required to process validator exit")
}
if exitableEpoch > exitInfo.HighestExitEpoch {
exitInfo.HighestExitEpoch = exitableEpoch
exitInfo.Churn = 0
@@ -235,7 +240,9 @@ func SlashValidator(
exitInfo *ExitInfo,
) (state.BeaconState, error) {
var err error
if exitInfo == nil {
return nil, errors.New("exit info is required to slash validator")
}
s, err = InitiateValidatorExitForTotalBal(ctx, s, slashedIdx, exitInfo, primitives.Gwei(exitInfo.TotalActiveBalance))
if err != nil && !errors.Is(err, ErrValidatorAlreadyExited) {
return nil, errors.Wrapf(err, "could not initiate validator %d exit", slashedIdx)

View File

@@ -254,6 +254,7 @@ func (s *Store) getCacheUpdatesByPeriod(headBlock interfaces.ReadOnlySignedBeaco
return updatesByPeriod, nil
}
// SetLastFinalityUpdate should be used only for testing.
func (s *Store) SetLastFinalityUpdate(update interfaces.LightClientFinalityUpdate, broadcast bool) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -283,6 +284,7 @@ func (s *Store) LastFinalityUpdate() interfaces.LightClientFinalityUpdate {
return s.lastFinalityUpdate
}
// SetLastOptimisticUpdate should be used only for testing.
func (s *Store) SetLastOptimisticUpdate(update interfaces.LightClientOptimisticUpdate, broadcast bool) {
s.mu.Lock()
defer s.mu.Unlock()

View File

@@ -68,7 +68,11 @@ func (rs *BlockRewardService) GetBlockRewardsData(ctx context.Context, blk inter
Code: http.StatusInternalServerError,
}
}
exitInfo := validators.ExitInformation(st)
var exitInfo *validators.ExitInfo
if len(blk.Body().ProposerSlashings()) > 0 || len(blk.Body().AttesterSlashings()) > 0 {
// ExitInformation is expensive to compute, only do it if we need it.
exitInfo = validators.ExitInformation(st)
}
st, err = coreblocks.ProcessAttesterSlashings(ctx, st, blk.Body().AttesterSlashings(), exitInfo)
if err != nil {
return nil, &httputil.DefaultJsonError{

View File

@@ -17,13 +17,6 @@ import (
"google.golang.org/grpc/status"
)
const (
// validatorLookupThreshold determines when to use full assignment map vs cached linear search.
// For requests with fewer validators, we use cached linear search to avoid the overhead
// of building a complete assignment map for all validators in the epoch.
validatorLookupThreshold = 3000
)
// GetDutiesV2 returns the duties assigned to a list of validators specified
// in the request object.
//
@@ -60,7 +53,26 @@ func (vs *Server) dutiesv2(ctx context.Context, req *ethpb.DutiesRequest) (*ethp
span.SetAttributes(trace.Int64Attribute("num_pubkeys", int64(len(req.PublicKeys))))
defer span.End()
meta, err := loadDutiesMetadata(ctx, s, req.Epoch, len(req.PublicKeys))
// Collect validator indices from public keys and cache the lookups
type validatorInfo struct {
index primitives.ValidatorIndex
found bool
}
validatorLookup := make(map[string]validatorInfo, len(req.PublicKeys))
requestIndices := make([]primitives.ValidatorIndex, 0, len(req.PublicKeys))
for _, pubKey := range req.PublicKeys {
key := string(pubKey)
if _, exists := validatorLookup[key]; !exists {
idx, ok := s.ValidatorIndexByPubkey(bytesutil.ToBytes48(pubKey))
validatorLookup[key] = validatorInfo{index: idx, found: ok}
if ok {
requestIndices = append(requestIndices, idx)
}
}
}
meta, err := loadDutiesMetadata(ctx, s, req.Epoch, requestIndices)
if err != nil {
return nil, err
}
@@ -68,14 +80,14 @@ func (vs *Server) dutiesv2(ctx context.Context, req *ethpb.DutiesRequest) (*ethp
validatorAssignments := make([]*ethpb.DutiesV2Response_Duty, 0, len(req.PublicKeys))
nextValidatorAssignments := make([]*ethpb.DutiesV2Response_Duty, 0, len(req.PublicKeys))
// start loop for assignments for current and next epochs
// Build duties using cached validator index lookups
for _, pubKey := range req.PublicKeys {
if ctx.Err() != nil {
return nil, status.Errorf(codes.Aborted, "Could not continue fetching assignments: %v", ctx.Err())
}
validatorIndex, ok := s.ValidatorIndexByPubkey(bytesutil.ToBytes48(pubKey))
if !ok {
info := validatorLookup[string(pubKey)]
if !info.found {
unknownDuty := &ethpb.DutiesV2Response_Duty{
PublicKey: pubKey,
Status: ethpb.ValidatorStatus_UNKNOWN_STATUS,
@@ -85,16 +97,15 @@ func (vs *Server) dutiesv2(ctx context.Context, req *ethpb.DutiesRequest) (*ethp
continue
}
meta.current.liteAssignment = vs.getValidatorAssignment(meta.current, validatorIndex)
currentAssignment := vs.getValidatorAssignment(meta.current, info.index)
nextAssignment := vs.getValidatorAssignment(meta.next, info.index)
meta.next.liteAssignment = vs.getValidatorAssignment(meta.next, validatorIndex)
assignment, nextAssignment, err := vs.buildValidatorDuty(pubKey, validatorIndex, s, req.Epoch, meta)
assignment, nextDuty, err := vs.buildValidatorDuty(pubKey, info.index, s, req.Epoch, meta, currentAssignment, nextAssignment)
if err != nil {
return nil, err
}
validatorAssignments = append(validatorAssignments, assignment)
nextValidatorAssignments = append(nextValidatorAssignments, nextAssignment)
nextValidatorAssignments = append(nextValidatorAssignments, nextDuty)
}
// Dependent roots for fork choice
@@ -147,18 +158,15 @@ type dutiesMetadata struct {
}
type metadata struct {
committeesAtSlot uint64
proposalSlots map[primitives.ValidatorIndex][]primitives.Slot
startSlot primitives.Slot
committeesBySlot [][][]primitives.ValidatorIndex
validatorAssignmentMap map[primitives.ValidatorIndex]*helpers.LiteAssignment
liteAssignment *helpers.LiteAssignment
committeesAtSlot uint64
proposalSlots map[primitives.ValidatorIndex][]primitives.Slot
committeeAssignments map[primitives.ValidatorIndex]*helpers.CommitteeAssignment
}
func loadDutiesMetadata(ctx context.Context, s state.BeaconState, reqEpoch primitives.Epoch, numValidators int) (*dutiesMetadata, error) {
func loadDutiesMetadata(ctx context.Context, s state.BeaconState, reqEpoch primitives.Epoch, requestIndices []primitives.ValidatorIndex) (*dutiesMetadata, error) {
meta := &dutiesMetadata{}
var err error
meta.current, err = loadMetadata(ctx, s, reqEpoch, numValidators)
meta.current, err = loadMetadata(ctx, s, reqEpoch, requestIndices)
if err != nil {
return nil, err
}
@@ -168,14 +176,14 @@ func loadDutiesMetadata(ctx context.Context, s state.BeaconState, reqEpoch primi
return nil, status.Errorf(codes.Internal, "Could not compute proposer slots: %v", err)
}
meta.next, err = loadMetadata(ctx, s, reqEpoch+1, numValidators)
meta.next, err = loadMetadata(ctx, s, reqEpoch+1, requestIndices)
if err != nil {
return nil, err
}
return meta, nil
}
func loadMetadata(ctx context.Context, s state.BeaconState, reqEpoch primitives.Epoch, numValidators int) (*metadata, error) {
func loadMetadata(ctx context.Context, s state.BeaconState, reqEpoch primitives.Epoch, requestIndices []primitives.ValidatorIndex) (*metadata, error) {
meta := &metadata{}
if err := helpers.VerifyAssignmentEpoch(reqEpoch, s); err != nil {
@@ -188,56 +196,36 @@ func loadMetadata(ctx context.Context, s state.BeaconState, reqEpoch primitives.
}
meta.committeesAtSlot = helpers.SlotCommitteeCount(activeValidatorCount)
meta.startSlot, err = slots.EpochStart(reqEpoch)
// Use CommitteeAssignments which only computes committees for requested validators
meta.committeeAssignments, err = helpers.CommitteeAssignments(ctx, s, reqEpoch, requestIndices)
if err != nil {
return nil, err
}
meta.committeesBySlot, err = helpers.PrecomputeCommittees(ctx, s, meta.startSlot)
if err != nil {
return nil, err
}
if numValidators >= validatorLookupThreshold {
meta.validatorAssignmentMap = buildValidatorAssignmentMap(meta.committeesBySlot, meta.startSlot)
return nil, status.Errorf(codes.Internal, "Could not compute committee assignments: %v", err)
}
return meta, nil
}
// buildValidatorAssignmentMap creates a map from validator index to assignment for O(1) lookup.
func buildValidatorAssignmentMap(
bySlot [][][]primitives.ValidatorIndex,
startSlot primitives.Slot,
) map[primitives.ValidatorIndex]*helpers.LiteAssignment {
validatorToAssignment := make(map[primitives.ValidatorIndex]*helpers.LiteAssignment)
for relativeSlot, committees := range bySlot {
for cIdx, committee := range committees {
for pos, vIdx := range committee {
validatorToAssignment[vIdx] = &helpers.LiteAssignment{
AttesterSlot: startSlot + primitives.Slot(relativeSlot),
CommitteeIndex: primitives.CommitteeIndex(cIdx),
CommitteeLength: uint64(len(committee)),
ValidatorCommitteeIndex: uint64(pos),
}
}
// findValidatorIndexInCommittee finds the position of a validator in a committee.
func findValidatorIndexInCommittee(committee []primitives.ValidatorIndex, validatorIndex primitives.ValidatorIndex) uint64 {
for i, vIdx := range committee {
if vIdx == validatorIndex {
return uint64(i)
}
}
return validatorToAssignment
return 0
}
// getValidatorAssignment retrieves the assignment for a validator using either
// the pre-built assignment map (for large requests) or linear search (for small requests).
// getValidatorAssignment retrieves the assignment for a validator from CommitteeAssignments.
func (vs *Server) getValidatorAssignment(meta *metadata, validatorIndex primitives.ValidatorIndex) *helpers.LiteAssignment {
if meta.validatorAssignmentMap != nil {
if assignment, exists := meta.validatorAssignmentMap[validatorIndex]; exists {
return assignment
if assignment, exists := meta.committeeAssignments[validatorIndex]; exists {
return &helpers.LiteAssignment{
AttesterSlot: assignment.AttesterSlot,
CommitteeIndex: assignment.CommitteeIndex,
CommitteeLength: uint64(len(assignment.Committee)),
ValidatorCommitteeIndex: findValidatorIndexInCommittee(assignment.Committee, validatorIndex),
}
return &helpers.LiteAssignment{}
}
return helpers.AssignmentForValidator(meta.committeesBySlot, meta.startSlot, validatorIndex)
return &helpers.LiteAssignment{}
}
// buildValidatorDuty builds both currentepoch and nextepoch V2 duty objects
@@ -248,21 +236,23 @@ func (vs *Server) buildValidatorDuty(
s state.BeaconState,
reqEpoch primitives.Epoch,
meta *dutiesMetadata,
currentAssignment *helpers.LiteAssignment,
nextAssignment *helpers.LiteAssignment,
) (*ethpb.DutiesV2Response_Duty, *ethpb.DutiesV2Response_Duty, error) {
assignment := &ethpb.DutiesV2Response_Duty{PublicKey: pubKey}
nextAssignment := &ethpb.DutiesV2Response_Duty{PublicKey: pubKey}
nextDuty := &ethpb.DutiesV2Response_Duty{PublicKey: pubKey}
statusEnum := assignmentStatus(s, idx)
assignment.ValidatorIndex = idx
assignment.Status = statusEnum
assignment.CommitteesAtSlot = meta.current.committeesAtSlot
assignment.ProposerSlots = meta.current.proposalSlots[idx]
populateCommitteeFields(assignment, meta.current.liteAssignment)
populateCommitteeFields(assignment, currentAssignment)
nextAssignment.ValidatorIndex = idx
nextAssignment.Status = statusEnum
nextAssignment.CommitteesAtSlot = meta.next.committeesAtSlot
populateCommitteeFields(nextAssignment, meta.next.liteAssignment)
nextDuty.ValidatorIndex = idx
nextDuty.Status = statusEnum
nextDuty.CommitteesAtSlot = meta.next.committeesAtSlot
populateCommitteeFields(nextDuty, nextAssignment)
// Sync committee flags
if coreTime.HigherEqualThanAltairVersionAndEpoch(s, reqEpoch) {
@@ -271,7 +261,7 @@ func (vs *Server) buildValidatorDuty(
return nil, nil, status.Errorf(codes.Internal, "Could not determine current epoch sync committee: %v", err)
}
assignment.IsSyncCommittee = inSync
nextAssignment.IsSyncCommittee = inSync
nextDuty.IsSyncCommittee = inSync
if inSync {
if err := core.RegisterSyncSubnetCurrentPeriodProto(s, reqEpoch, pubKey, statusEnum); err != nil {
return nil, nil, status.Errorf(codes.Internal, "Could not register sync subnet current period: %v", err)
@@ -290,18 +280,16 @@ func (vs *Server) buildValidatorDuty(
if err != nil {
return nil, nil, status.Errorf(codes.Internal, "Could not determine next epoch sync committee: %v", err)
}
nextAssignment.IsSyncCommittee = nextInSync
nextDuty.IsSyncCommittee = nextInSync
if nextInSync {
go func() {
if err := core.RegisterSyncSubnetNextPeriodProto(s, reqEpoch, pubKey, statusEnum); err != nil {
log.WithError(err).Warn("Could not register sync subnet next period")
}
}()
if err := core.RegisterSyncSubnetNextPeriodProto(s, reqEpoch, pubKey, statusEnum); err != nil {
log.WithError(err).Warn("Could not register sync subnet next period")
}
}
}
}
return assignment, nextAssignment, nil
return assignment, nextDuty, nil
}
func populateCommitteeFields(duty *ethpb.DutiesV2Response_Duty, la *helpers.LiteAssignment) {

View File

@@ -560,105 +560,20 @@ func TestGetDutiesV2_SyncNotReady(t *testing.T) {
assert.ErrorContains(t, "Syncing to latest head", err)
}
func TestBuildValidatorAssignmentMap(t *testing.T) {
start := primitives.Slot(200)
bySlot := [][][]primitives.ValidatorIndex{
{{1, 2, 3}}, // slot 200, committee 0
{{7, 8, 9}}, // slot 201, committee 0
{{4, 5}, {10, 11}}, // slot 202, committee 0 & 1
}
assignmentMap := buildValidatorAssignmentMap(bySlot, start)
// Test validator 8 assignment (slot 201, committee 0, position 1)
vIdx := primitives.ValidatorIndex(8)
got, exists := assignmentMap[vIdx]
assert.Equal(t, true, exists)
require.NotNil(t, got)
assert.Equal(t, start+1, got.AttesterSlot)
assert.Equal(t, primitives.CommitteeIndex(0), got.CommitteeIndex)
assert.Equal(t, uint64(3), got.CommitteeLength)
assert.Equal(t, uint64(1), got.ValidatorCommitteeIndex)
// Test validator 1 assignment (slot 200, committee 0, position 0)
vIdx1 := primitives.ValidatorIndex(1)
got1, exists1 := assignmentMap[vIdx1]
assert.Equal(t, true, exists1)
require.NotNil(t, got1)
assert.Equal(t, start, got1.AttesterSlot)
assert.Equal(t, primitives.CommitteeIndex(0), got1.CommitteeIndex)
assert.Equal(t, uint64(3), got1.CommitteeLength)
assert.Equal(t, uint64(0), got1.ValidatorCommitteeIndex)
// Test validator 10 assignment (slot 202, committee 1, position 0)
vIdx10 := primitives.ValidatorIndex(10)
got10, exists10 := assignmentMap[vIdx10]
assert.Equal(t, true, exists10)
require.NotNil(t, got10)
assert.Equal(t, start+2, got10.AttesterSlot)
assert.Equal(t, primitives.CommitteeIndex(1), got10.CommitteeIndex)
assert.Equal(t, uint64(2), got10.CommitteeLength)
assert.Equal(t, uint64(0), got10.ValidatorCommitteeIndex)
// Test non-existent validator
_, exists99 := assignmentMap[primitives.ValidatorIndex(99)]
assert.Equal(t, false, exists99)
// Verify that we get the same results as the linear search
for _, committees := range bySlot {
for _, committee := range committees {
for _, validatorIdx := range committee {
linearResult := helpers.AssignmentForValidator(bySlot, start, validatorIdx)
mapResult, mapExists := assignmentMap[validatorIdx]
assert.Equal(t, true, mapExists)
require.DeepEqual(t, linearResult, mapResult)
}
}
}
}
func TestGetValidatorAssignment_WithAssignmentMap(t *testing.T) {
func TestGetValidatorAssignment(t *testing.T) {
start := primitives.Slot(100)
bySlot := [][][]primitives.ValidatorIndex{
{{1, 2, 3}},
{{4, 5, 6}},
// Test using CommitteeAssignments
committeeAssignments := map[primitives.ValidatorIndex]*helpers.CommitteeAssignment{
5: {
Committee: []primitives.ValidatorIndex{4, 5, 6},
AttesterSlot: start + 1,
CommitteeIndex: primitives.CommitteeIndex(0),
},
}
// Test with pre-built assignment map (large request scenario)
meta := &metadata{
startSlot: start,
committeesBySlot: bySlot,
validatorAssignmentMap: buildValidatorAssignmentMap(bySlot, start),
}
vs := &Server{}
// Test existing validator (validator 2 is at position 1 in the committee, not position 2)
assignment := vs.getValidatorAssignment(meta, primitives.ValidatorIndex(2))
require.NotNil(t, assignment)
assert.Equal(t, start, assignment.AttesterSlot)
assert.Equal(t, primitives.CommitteeIndex(0), assignment.CommitteeIndex)
assert.Equal(t, uint64(1), assignment.ValidatorCommitteeIndex)
// Test non-existent validator should return empty assignment
assignment = vs.getValidatorAssignment(meta, primitives.ValidatorIndex(99))
require.NotNil(t, assignment)
assert.Equal(t, primitives.Slot(0), assignment.AttesterSlot)
assert.Equal(t, primitives.CommitteeIndex(0), assignment.CommitteeIndex)
}
func TestGetValidatorAssignment_WithoutAssignmentMap(t *testing.T) {
start := primitives.Slot(100)
bySlot := [][][]primitives.ValidatorIndex{
{{1, 2, 3}},
{{4, 5, 6}},
}
// Test without assignment map (small request scenario)
meta := &metadata{
startSlot: start,
committeesBySlot: bySlot,
validatorAssignmentMap: nil, // No map - should use linear search
committeeAssignments: committeeAssignments,
}
vs := &Server{}
@@ -676,53 +591,3 @@ func TestGetValidatorAssignment_WithoutAssignmentMap(t *testing.T) {
assert.Equal(t, primitives.Slot(0), assignment.AttesterSlot)
assert.Equal(t, primitives.CommitteeIndex(0), assignment.CommitteeIndex)
}
func TestLoadMetadata_ThresholdBehavior(t *testing.T) {
state, _ := util.DeterministicGenesisState(t, 128)
epoch := primitives.Epoch(0)
tests := []struct {
name string
numValidators int
expectAssignmentMap bool
}{
{
name: "Small request - below threshold",
numValidators: 100,
expectAssignmentMap: false,
},
{
name: "Large request - at threshold",
numValidators: validatorLookupThreshold,
expectAssignmentMap: true,
},
{
name: "Large request - above threshold",
numValidators: validatorLookupThreshold + 1000,
expectAssignmentMap: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
meta, err := loadMetadata(t.Context(), state, epoch, tt.numValidators)
require.NoError(t, err)
require.NotNil(t, meta)
if tt.expectAssignmentMap {
require.NotNil(t, meta.validatorAssignmentMap, "Expected assignment map to be built for large requests")
assert.Equal(t, true, len(meta.validatorAssignmentMap) > 0, "Assignment map should not be empty")
} else {
// For small requests, the map should be nil (not initialized)
if meta.validatorAssignmentMap != nil {
t.Errorf("Expected no assignment map for small requests, got: %v", meta.validatorAssignmentMap)
}
}
// Common fields should always be set
assert.Equal(t, true, meta.committeesAtSlot > 0)
require.NotNil(t, meta.committeesBySlot)
assert.Equal(t, true, len(meta.committeesBySlot) > 0)
})
}
}

View File

@@ -12,13 +12,18 @@ import (
func (vs *Server) getSlashings(ctx context.Context, head state.BeaconState) ([]*ethpb.ProposerSlashing, []ethpb.AttSlashing) {
var err error
proposerSlashings := vs.SlashingsPool.PendingProposerSlashings(ctx, head, false /*noLimit*/)
attSlashings := vs.SlashingsPool.PendingAttesterSlashings(ctx, head, false /*noLimit*/)
validProposerSlashings := make([]*ethpb.ProposerSlashing, 0, len(proposerSlashings))
validAttSlashings := make([]ethpb.AttSlashing, 0, len(attSlashings))
if len(proposerSlashings) == 0 && len(attSlashings) == 0 {
return validProposerSlashings, validAttSlashings
}
// ExitInformation is expensive to compute, only do it if we need it.
exitInfo := v.ExitInformation(head)
if err := helpers.UpdateTotalActiveBalanceCache(head, exitInfo.TotalActiveBalance); err != nil {
log.WithError(err).Warn("Could not update total active balance cache")
}
proposerSlashings := vs.SlashingsPool.PendingProposerSlashings(ctx, head, false /*noLimit*/)
validProposerSlashings := make([]*ethpb.ProposerSlashing, 0, len(proposerSlashings))
for _, slashing := range proposerSlashings {
_, err = blocks.ProcessProposerSlashing(ctx, head, slashing, exitInfo)
if err != nil {
@@ -27,8 +32,6 @@ func (vs *Server) getSlashings(ctx context.Context, head state.BeaconState) ([]*
}
validProposerSlashings = append(validProposerSlashings, slashing)
}
attSlashings := vs.SlashingsPool.PendingAttesterSlashings(ctx, head, false /*noLimit*/)
validAttSlashings := make([]ethpb.AttSlashing, 0, len(attSlashings))
for _, slashing := range attSlashings {
_, err = blocks.ProcessAttesterSlashing(ctx, head, slashing, exitInfo)
if err != nil {

View File

@@ -45,7 +45,6 @@ go_library(
"subscriber_bls_to_execution_change.go",
"subscriber_data_column_sidecar.go",
"subscriber_handlers.go",
"subscriber_light_client.go",
"subscriber_sync_committee_message.go",
"subscriber_sync_contribution_proof.go",
"subscription_topic_handler.go",
@@ -113,7 +112,6 @@ go_library(
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/light-client:go_default_library",
"//consensus-types/primitives:go_default_library",
"//consensus-types/wrapper:go_default_library",
"//container/leaky-bucket:go_default_library",

View File

@@ -4,9 +4,8 @@ import (
"bytes"
"context"
"encoding/hex"
"sync"
"fmt"
"github.com/OffchainLabs/prysm/v6/async"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation"
@@ -24,70 +23,51 @@ import (
"github.com/sirupsen/logrus"
)
// This defines how often a node cleans up and processes pending attestations in the queue.
var processPendingAttsPeriod = slots.DivideSlotBy(2 /* twice per slot */)
var pendingAttsLimit = 10000
var pendingAttsLimit = 32768
// This processes pending attestation queues on every processPendingAttsPeriod.
func (s *Service) runPendingAttsQueue() {
// Prevents multiple queue processing goroutines (invoked by RunEvery) from contending for data.
mutex := new(sync.Mutex)
async.RunEvery(s.ctx, processPendingAttsPeriod, func() {
mutex.Lock()
if err := s.processPendingAtts(s.ctx); err != nil {
log.WithError(err).Debug("Could not process pending attestation")
}
mutex.Unlock()
})
}
// This defines how pending attestations are processed. It contains features:
// 1. Clean up invalid pending attestations from the queue.
// 2. Check if pending attestations can be processed when the block has arrived.
// 3. Request block from a random peer if unable to proceed step 2.
func (s *Service) processPendingAtts(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "processPendingAtts")
// This method processes pending attestations as a "known" block as arrived. With validations,
// the valid attestations get saved into the operation mem pool, and the invalid attestations gets deleted
// from the sync pending pool.
func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "processPendingAttsForBlock")
defer span.End()
// Confirm that the pending attestation's missing block arrived and the node processed the block.
if !s.cfg.beaconDB.HasBlock(ctx, bRoot) || !(s.cfg.beaconDB.HasState(ctx, bRoot) || s.cfg.beaconDB.HasStateSummary(ctx, bRoot)) || !s.cfg.chain.InForkchoice(bRoot) {
return fmt.Errorf("could not process unknown block root %#x", bRoot)
}
// Before a node processes pending attestations queue, it verifies
// the attestations in the queue are still valid. Attestations will
// be deleted from the queue if invalid (i.e. getting stalled from falling too many slots behind).
s.validatePendingAtts(ctx, s.cfg.clock.CurrentSlot())
s.pendingAttsLock.RLock()
roots := make([][32]byte, 0, len(s.blkRootToPendingAtts))
for br := range s.blkRootToPendingAtts {
roots = append(roots, br)
}
attestations := s.blkRootToPendingAtts[bRoot]
s.pendingAttsLock.RUnlock()
var pendingRoots [][32]byte
if len(attestations) > 0 {
s.processAttestations(ctx, attestations)
log.WithFields(logrus.Fields{
"blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
"pendingAttsCount": len(attestations),
}).Debug("Verified and saved pending attestations to pool")
}
randGen := rand.NewGenerator()
for _, bRoot := range roots {
s.pendingAttsLock.RLock()
attestations := s.blkRootToPendingAtts[bRoot]
s.pendingAttsLock.RUnlock()
// has the pending attestation's missing block arrived and the node processed block yet?
if s.cfg.beaconDB.HasBlock(ctx, bRoot) && (s.cfg.beaconDB.HasState(ctx, bRoot) || s.cfg.beaconDB.HasStateSummary(ctx, bRoot)) && s.cfg.chain.InForkchoice(bRoot) {
s.processAttestations(ctx, attestations)
log.WithFields(logrus.Fields{
"blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
"pendingAttsCount": len(attestations),
}).Debug("Verified and saved pending attestations to pool")
// Delete the missing block root key from pending attestation queue so a node will not request for the block again.
s.pendingAttsLock.Lock()
delete(s.blkRootToPendingAtts, bRoot)
s.pendingAttsLock.Unlock()
} else {
s.pendingQueueLock.RLock()
seen := s.seenPendingBlocks[bRoot]
s.pendingQueueLock.RUnlock()
if !seen {
pendingRoots = append(pendingRoots, bRoot)
}
// Delete the missing block root key from pending attestation queue so a node will not request for the block again.
s.pendingAttsLock.Lock()
delete(s.blkRootToPendingAtts, bRoot)
pendingRoots := make([][32]byte, 0, len(s.blkRootToPendingAtts))
s.pendingQueueLock.RLock()
for r := range s.blkRootToPendingAtts {
if !s.seenPendingBlocks[r] {
pendingRoots = append(pendingRoots, r)
}
}
s.pendingQueueLock.RUnlock()
s.pendingAttsLock.Unlock()
// Request the blocks for the pending attestations that could not be processed.
return s.sendBatchRootRequest(ctx, pendingRoots, randGen)
}

View File

@@ -53,16 +53,47 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
p1.Peers().SetConnectionState(p2.PeerID(), peers.Connected)
p1.Peers().SetChainState(p2.PeerID(), &ethpb.StatusV2{})
chain := &mock.ChainService{Genesis: prysmTime.Now(), FinalizedCheckPoint: &ethpb.Checkpoint{}}
// Create and save block 'A' to DB
blockA := util.NewBeaconBlock()
util.SaveBlock(t, t.Context(), db, blockA)
rootA, err := blockA.Block.HashTreeRoot()
require.NoError(t, err)
// Save state for block 'A'
stateA, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, db.SaveState(t.Context(), stateA, rootA))
// Setup chain service with block 'A' in forkchoice
chain := &mock.ChainService{
Genesis: prysmTime.Now(),
FinalizedCheckPoint: &ethpb.Checkpoint{},
// NotFinalized: false means InForkchoice returns true
}
r := &Service{
cfg: &config{p2p: p1, beaconDB: db, chain: chain, clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot)},
blkRootToPendingAtts: make(map[[32]byte][]any),
seenPendingBlocks: make(map[[32]byte]bool),
chainStarted: abool.New(),
}
a := &ethpb.Attestation{Data: &ethpb.AttestationData{Target: &ethpb.Checkpoint{Root: make([]byte, 32)}}}
r.blkRootToPendingAtts[[32]byte{'A'}] = []any{a}
require.NoError(t, r.processPendingAtts(t.Context()))
// Add pending attestations for OTHER block roots (not block A)
// These are blocks we don't have yet, so they should be requested
attB := &ethpb.Attestation{Data: &ethpb.AttestationData{
BeaconBlockRoot: bytesutil.PadTo([]byte{'B'}, 32),
Target: &ethpb.Checkpoint{Root: make([]byte, 32)},
}}
attC := &ethpb.Attestation{Data: &ethpb.AttestationData{
BeaconBlockRoot: bytesutil.PadTo([]byte{'C'}, 32),
Target: &ethpb.Checkpoint{Root: make([]byte, 32)},
}}
r.blkRootToPendingAtts[[32]byte{'B'}] = []any{attB}
r.blkRootToPendingAtts[[32]byte{'C'}] = []any{attC}
// Process block A (which exists and has no pending attestations)
// This should skip processing attestations for A and request blocks B and C
require.NoError(t, r.processPendingAttsForBlock(t.Context(), rootA))
require.LogsContain(t, hook, "Requesting block by root")
}
@@ -141,7 +172,7 @@ func TestProcessPendingAtts_HasBlockSaveUnaggregatedAtt(t *testing.T) {
require.NoError(t, r.cfg.beaconDB.SaveState(t.Context(), s, root))
r.blkRootToPendingAtts[root] = []any{att}
require.NoError(t, r.processPendingAtts(t.Context()))
require.NoError(t, r.processPendingAttsForBlock(t.Context(), root))
var wg sync.WaitGroup
wg.Add(1)
@@ -235,7 +266,7 @@ func TestProcessPendingAtts_HasBlockSaveUnaggregatedAttElectra(t *testing.T) {
require.NoError(t, r.cfg.beaconDB.SaveState(t.Context(), s, root))
r.blkRootToPendingAtts[root] = []any{att}
require.NoError(t, r.processPendingAtts(t.Context()))
require.NoError(t, r.processPendingAttsForBlock(t.Context(), root))
var wg sync.WaitGroup
wg.Add(1)
go func() {
@@ -360,7 +391,7 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAttElectra_VerifyAlreadySeen
r.blkRootToPendingAtts[root] = []any{
att,
}
require.NoError(t, r.processPendingAtts(t.Context()))
require.NoError(t, r.processPendingAttsForBlock(t.Context(), root))
// Verify that the event feed receives the expected attestation.
var wg sync.WaitGroup
@@ -471,7 +502,7 @@ func TestProcessPendingAtts_NoBroadcastWithBadSignature(t *testing.T) {
}
s.blkRootToPendingAtts[r32] = []any{&ethpb.SignedAggregateAttestationAndProof{Message: a, Signature: make([]byte, fieldparams.BLSSignatureLength)}}
require.NoError(t, s.processPendingAtts(t.Context()))
require.NoError(t, s.processPendingAttsForBlock(t.Context(), r32))
assert.Equal(t, false, p2p.BroadcastCalled.Load(), "Broadcasted bad aggregate")
@@ -510,7 +541,7 @@ func TestProcessPendingAtts_NoBroadcastWithBadSignature(t *testing.T) {
require.NoError(t, err)
s.blkRootToPendingAtts[r32] = []any{&ethpb.SignedAggregateAttestationAndProof{Message: aggregateAndProof, Signature: aggreSig}}
require.NoError(t, s.processPendingAtts(t.Context()))
require.NoError(t, s.processPendingAttsForBlock(t.Context(), r32))
assert.Equal(t, true, p2p.BroadcastCalled.Load(), "The good aggregate was not broadcasted")
@@ -601,7 +632,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) {
require.NoError(t, r.cfg.beaconDB.SaveState(t.Context(), s, root))
r.blkRootToPendingAtts[root] = []any{&ethpb.SignedAggregateAttestationAndProof{Message: aggregateAndProof, Signature: aggreSig}}
require.NoError(t, r.processPendingAtts(t.Context()))
require.NoError(t, r.processPendingAttsForBlock(t.Context(), root))
assert.Equal(t, 1, len(r.cfg.attPool.AggregatedAttestations()), "Did not save aggregated att")
assert.DeepEqual(t, att, r.cfg.attPool.AggregatedAttestations()[0], "Incorrect saved att")
@@ -696,7 +727,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAttElectra(t *testing.T) {
require.NoError(t, r.cfg.beaconDB.SaveState(t.Context(), s, root))
r.blkRootToPendingAtts[root] = []any{&ethpb.SignedAggregateAttestationAndProofElectra{Message: aggregateAndProof, Signature: aggreSig}}
require.NoError(t, r.processPendingAtts(t.Context()))
require.NoError(t, r.processPendingAttsForBlock(t.Context(), root))
assert.Equal(t, 1, len(r.cfg.attPool.AggregatedAttestations()), "Did not save aggregated att")
assert.DeepEqual(t, att, r.cfg.attPool.AggregatedAttestations()[0], "Incorrect saved att")
@@ -780,8 +811,8 @@ func TestProcessPendingAtts_BlockNotInForkChoice(t *testing.T) {
// Add pending attestation
r.blkRootToPendingAtts[root] = []any{&ethpb.SignedAggregateAttestationAndProof{Message: aggregateAndProof}}
// Process pending attestations - should not process because block is not in fork choice
require.NoError(t, r.processPendingAtts(t.Context()))
// Process pending attestations - should return error because block is not in fork choice
require.ErrorContains(t, "could not process unknown block root", r.processPendingAttsForBlock(t.Context(), root))
// Verify attestations were not processed (should still be pending)
assert.Equal(t, 1, len(r.blkRootToPendingAtts[root]), "Attestations should still be pending")

View File

@@ -97,7 +97,7 @@ func (s *Service) requestAndSaveMissingDataColumnSidecars(blks []blocks.ROBlock)
custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount()
if err != nil {
return errors.Wrap(err, "fetch custody group count from peer")
return errors.Wrap(err, "custody group count")
}
samplingSize := max(custodyGroupCount, samplesPerSlot)

View File

@@ -274,7 +274,6 @@ func (s *Service) Start() {
s.cfg.p2p.AddPingMethod(s.sendPingRequest)
s.processPendingBlocksQueue()
s.runPendingAttsQueue()
s.maintainPeerStatuses()
if params.FuluEnabled() {

View File

@@ -44,6 +44,11 @@ type wrappedVal func(context.Context, peer.ID, *pubsub.Message) (pubsub.Validati
// subHandler represents handler for a given subscription.
type subHandler func(context.Context, proto.Message) error
// noopHandler is used for subscriptions that do not require anything to be done.
var noopHandler subHandler = func(ctx context.Context, msg proto.Message) error {
return nil
}
// subscribeParameters holds the parameters that are needed to construct a set of subscriptions topics for a given
// set of gossipsub subnets.
type subscribeParameters struct {
@@ -251,7 +256,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.subscribe(
p2p.LightClientOptimisticUpdateTopicFormat,
s.validateLightClientOptimisticUpdate,
s.lightClientOptimisticUpdateSubscriber,
noopHandler,
digest,
)
})
@@ -259,7 +264,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.subscribe(
p2p.LightClientFinalityUpdateTopicFormat,
s.validateLightClientFinalityUpdate,
s.lightClientFinalityUpdateSubscriber,
noopHandler,
digest,
)
})

View File

@@ -68,7 +68,10 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
}
return err
}
return err
if err := s.processPendingAttsForBlock(ctx, root); err != nil {
return errors.Wrap(err, "process pending atts for block")
}
return nil
}
// processSidecarsFromExecutionFromBlock retrieves (if available) sidecars data from the execution client,

View File

@@ -1,66 +0,0 @@
package sync
import (
"context"
"fmt"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
lightclientTypes "github.com/OffchainLabs/prysm/v6/consensus-types/light-client"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
func (s *Service) lightClientOptimisticUpdateSubscriber(_ context.Context, msg proto.Message) error {
update, err := lightclientTypes.NewWrappedOptimisticUpdate(msg)
if err != nil {
return err
}
attestedHeaderRoot, err := update.AttestedHeader().Beacon().HashTreeRoot()
if err != nil {
return err
}
log.WithFields(logrus.Fields{
"attestedSlot": fmt.Sprintf("%d", update.AttestedHeader().Beacon().Slot),
"signatureSlot": fmt.Sprintf("%d", update.SignatureSlot()),
"attestedHeaderRoot": fmt.Sprintf("%x", attestedHeaderRoot),
}).Debug("Saving newly received light client optimistic update.")
s.lcStore.SetLastOptimisticUpdate(update, false)
s.cfg.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.LightClientOptimisticUpdate,
Data: update,
})
return nil
}
func (s *Service) lightClientFinalityUpdateSubscriber(_ context.Context, msg proto.Message) error {
update, err := lightclientTypes.NewWrappedFinalityUpdate(msg)
if err != nil {
return err
}
attestedHeaderRoot, err := update.AttestedHeader().Beacon().HashTreeRoot()
if err != nil {
return err
}
log.WithFields(logrus.Fields{
"attestedSlot": fmt.Sprintf("%d", update.AttestedHeader().Beacon().Slot),
"signatureSlot": fmt.Sprintf("%d", update.SignatureSlot()),
"attestedHeaderRoot": fmt.Sprintf("%x", attestedHeaderRoot),
}).Debug("Saving newly received light client finality update.")
s.lcStore.SetLastFinalityUpdate(update, false)
s.cfg.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.LightClientFinalityUpdate,
Data: update,
})
return nil
}

View File

@@ -9,12 +9,10 @@ import (
"time"
"github.com/OffchainLabs/prysm/v6/async/abool"
"github.com/OffchainLabs/prysm/v6/async/event"
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/signing"
db "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/light-client"
"github.com/OffchainLabs/prysm/v6/beacon-chain/operations/slashings"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
@@ -28,7 +26,6 @@ import (
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
@@ -676,143 +673,3 @@ func createPeer(t *testing.T, topics ...string) *p2ptest.TestP2P {
}
return p
}
func TestSubscribe_ReceivesLCOptimisticUpdate(t *testing.T) {
origNC := params.BeaconConfig()
// restore network config after test completes
defer func() {
params.OverrideBeaconConfig(origNC)
}()
params.SetupTestConfigCleanup(t)
p2pService := p2ptest.NewTestP2P(t)
ctx := t.Context()
cfg := params.BeaconConfig().Copy()
cfg.AltairForkEpoch = 1
cfg.ForkVersionSchedule[[4]byte{1, 0, 0, 0}] = 1
params.OverrideBeaconConfig(cfg)
secondsPerSlot := int(params.BeaconConfig().SecondsPerSlot)
slotIntervals := int(params.BeaconConfig().IntervalsPerSlot)
slotsPerEpoch := int(params.BeaconConfig().SlotsPerEpoch)
genesisDrift := slotsPerEpoch*secondsPerSlot + 2*secondsPerSlot + secondsPerSlot/slotIntervals
chainService := &mockChain.ChainService{
ValidatorsRoot: [32]byte{'A'},
Genesis: time.Unix(time.Now().Unix()-int64(genesisDrift), 0),
}
d := db.SetupDB(t)
lcStore := lightClient.NewLightClientStore(&p2ptest.FakeP2P{}, new(event.Feed), d)
r := Service{
ctx: ctx,
cfg: &config{
p2p: p2pService,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService,
beaconDB: d,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
stateNotifier: &mockChain.MockStateNotifier{},
},
chainStarted: abool.New(),
lcStore: lcStore,
subHandler: newSubTopicHandler(),
}
markInitSyncComplete(t, &r)
topic := p2p.LightClientOptimisticUpdateTopicFormat
var wg sync.WaitGroup
wg.Add(1)
var err error
p2pService.Digest, err = r.currentForkDigest()
require.NoError(t, err)
r.subscribe(topic, r.validateLightClientOptimisticUpdate, func(ctx context.Context, msg proto.Message) error {
require.NoError(t, r.lightClientOptimisticUpdateSubscriber(ctx, msg))
wg.Done()
return nil
}, p2pService.Digest)
r.markForChainStart()
l := util.NewTestLightClient(t, version.Altair, util.WithSupermajority(0))
update, err := lightClient.NewLightClientOptimisticUpdateFromBeaconState(l.Ctx, l.State, l.Block, l.AttestedState, l.AttestedBlock)
require.NoError(t, err, "Error generating light client optimistic update")
p2pService.ReceivePubSub(topic, update.Proto())
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive PubSub in 1 second")
}
u := r.lcStore.LastOptimisticUpdate()
assert.DeepEqual(t, update.Proto(), u.Proto())
}
func TestSubscribe_ReceivesLCFinalityUpdate(t *testing.T) {
origNC := params.BeaconConfig()
// restore network config after test completes
defer func() {
params.OverrideBeaconConfig(origNC)
}()
params.SetupTestConfigCleanup(t)
p2pService := p2ptest.NewTestP2P(t)
ctx := t.Context()
cfg := params.BeaconConfig().Copy()
cfg.AltairForkEpoch = 1
cfg.ForkVersionSchedule[[4]byte{1, 0, 0, 0}] = 1
params.OverrideBeaconConfig(cfg)
secondsPerSlot := int(params.BeaconConfig().SecondsPerSlot)
slotIntervals := int(params.BeaconConfig().IntervalsPerSlot)
slotsPerEpoch := int(params.BeaconConfig().SlotsPerEpoch)
genesisDrift := slotsPerEpoch*secondsPerSlot + 2*secondsPerSlot + secondsPerSlot/slotIntervals
chainService := &mockChain.ChainService{
ValidatorsRoot: [32]byte{'A'},
Genesis: time.Unix(time.Now().Unix()-int64(genesisDrift), 0),
}
d := db.SetupDB(t)
lcStore := lightClient.NewLightClientStore(&p2ptest.FakeP2P{}, new(event.Feed), d)
r := Service{
ctx: ctx,
cfg: &config{
p2p: p2pService,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService,
beaconDB: d,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
stateNotifier: &mockChain.MockStateNotifier{},
},
chainStarted: abool.New(),
lcStore: lcStore,
subHandler: newSubTopicHandler(),
}
markInitSyncComplete(t, &r)
topic := p2p.LightClientFinalityUpdateTopicFormat
var wg sync.WaitGroup
wg.Add(1)
var err error
p2pService.Digest, err = r.currentForkDigest()
require.NoError(t, err)
r.subscribe(topic, r.validateLightClientFinalityUpdate, func(ctx context.Context, msg proto.Message) error {
require.NoError(t, r.lightClientFinalityUpdateSubscriber(ctx, msg))
wg.Done()
return nil
}, p2pService.Digest)
r.markForChainStart()
l := util.NewTestLightClient(t, version.Altair, util.WithSupermajority(0))
update, err := lightClient.NewLightClientFinalityUpdateFromBeaconState(l.Ctx, l.State, l.Block, l.AttestedState, l.AttestedBlock, l.FinalizedBlock)
require.NoError(t, err, "Error generating light client finality update")
p2pService.ReceivePubSub(topic, update.Proto())
if util.WaitTimeout(&wg, time.Second) {
t.Fatal("Did not receive PubSub in 1 second")
}
u := r.lcStore.LastFinalityUpdate()
assert.DeepEqual(t, update.Proto(), u.Proto())
}

View File

@@ -429,12 +429,12 @@ func TestService_ValidateBlsToExecutionChange(t *testing.T) {
svc := NewService(ctx, append(opts, tt.svcopts...)...)
markInitSyncComplete(t, svc)
svc, tt.args.topic = tt.setupSvc(svc, tt.args.msg, tt.args.topic)
go svc.Start()
if tt.clock == nil {
tt.clock = startup.NewClock(time.Now(), [32]byte{})
}
require.NoError(t, cw.SetClock(tt.clock))
svc.verifierWaiter = verification.NewInitializerWaiter(cw, chainService.ForkChoiceStore, svc.cfg.stateGen)
go svc.Start()
marshalledObj, err := tt.args.msg.MarshalSSZ()
assert.NoError(t, err)

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"time"
lightclient "github.com/OffchainLabs/prysm/v6/beacon-chain/light-client"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing"
@@ -14,6 +13,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
func (s *Service) validateLightClientOptimisticUpdate(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
@@ -31,6 +31,12 @@ func (s *Service) validateLightClientOptimisticUpdate(ctx context.Context, pid p
_, span := trace.StartSpan(ctx, "sync.validateLightClientOptimisticUpdate")
defer span.End()
currentUpdate := s.lcStore.LastOptimisticUpdate()
if currentUpdate == nil {
log.Debug("No existing optimistic update to compare against. Ignoring.")
return pubsub.ValidationIgnore, nil
}
m, err := s.decodePubsubMessage(msg)
if err != nil {
tracing.AnnotateError(span, err)
@@ -64,12 +70,12 @@ func (s *Service) validateLightClientOptimisticUpdate(ctx context.Context, pid p
return pubsub.ValidationIgnore, nil
}
if !lightclient.IsBetterOptimisticUpdate(newUpdate, s.lcStore.LastOptimisticUpdate()) {
if !proto.Equal(newUpdate.Proto(), currentUpdate.Proto()) {
log.WithFields(logrus.Fields{
"attestedSlot": fmt.Sprintf("%d", newUpdate.AttestedHeader().Beacon().Slot),
"signatureSlot": fmt.Sprintf("%d", newUpdate.SignatureSlot()),
"attestedHeaderRoot": fmt.Sprintf("%x", attestedHeaderRoot),
}).Debug("Newly received light client optimistic update ignored. current update is better.")
}).Debug("Received light client optimistic update is different from the local one. Ignoring.")
return pubsub.ValidationIgnore, nil
}
@@ -98,6 +104,12 @@ func (s *Service) validateLightClientFinalityUpdate(ctx context.Context, pid pee
_, span := trace.StartSpan(ctx, "sync.validateLightClientFinalityUpdate")
defer span.End()
currentUpdate := s.lcStore.LastFinalityUpdate()
if currentUpdate == nil {
log.Debug("No existing finality update to compare against. Ignoring.")
return pubsub.ValidationIgnore, nil
}
m, err := s.decodePubsubMessage(msg)
if err != nil {
tracing.AnnotateError(span, err)
@@ -131,12 +143,12 @@ func (s *Service) validateLightClientFinalityUpdate(ctx context.Context, pid pee
return pubsub.ValidationIgnore, nil
}
if !lightclient.IsFinalityUpdateValidForBroadcast(newUpdate, s.lcStore.LastFinalityUpdate()) {
if !proto.Equal(newUpdate.Proto(), currentUpdate.Proto()) {
log.WithFields(logrus.Fields{
"attestedSlot": fmt.Sprintf("%d", newUpdate.AttestedHeader().Beacon().Slot),
"signatureSlot": fmt.Sprintf("%d", newUpdate.SignatureSlot()),
"attestedHeaderRoot": fmt.Sprintf("%x", attestedHeaderRoot),
}).Debug("Newly received light client finality update ignored. current update is better.")
}).Debug("Received light client finality update is different from the local one. ignoring.")
return pubsub.ValidationIgnore, nil
}

View File

@@ -26,9 +26,13 @@ func TestValidateLightClientOptimisticUpdate_NilMessageOrTopic(t *testing.T) {
params.SetupTestConfigCleanup(t)
ctx := t.Context()
p := p2ptest.NewTestP2P(t)
s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}}}
lcStore := lightClient.NewLightClientStore(&p2ptest.FakeP2P{}, new(event.Feed), testDB.SetupDB(t))
s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}}, lcStore: lcStore}
mockUpdate, err := util.MockOptimisticUpdate()
require.NoError(t, err)
s.lcStore.SetLastOptimisticUpdate(mockUpdate, false)
_, err := s.validateLightClientOptimisticUpdate(ctx, "", nil)
_, err = s.validateLightClientOptimisticUpdate(ctx, "", nil)
require.ErrorIs(t, err, errNilPubsubMessage)
_, err = s.validateLightClientOptimisticUpdate(ctx, "", &pubsub.Message{Message: &pb.Message{}})
@@ -72,27 +76,27 @@ func TestValidateLightClientOptimisticUpdate(t *testing.T) {
name: "no previous update",
oldUpdateOptions: nil,
newUpdateOptions: []util.LightClientOption{},
expectedResult: pubsub.ValidationAccept,
expectedResult: pubsub.ValidationIgnore,
},
{
name: "not enough time passed",
genesisDrift: -secondsPerSlot / slotIntervals,
oldUpdateOptions: nil,
newUpdateOptions: []util.LightClientOption{},
expectedResult: pubsub.ValidationIgnore,
},
{
name: "new update has no age advantage",
oldUpdateOptions: []util.LightClientOption{},
newUpdateOptions: []util.LightClientOption{},
expectedResult: pubsub.ValidationIgnore,
},
{
name: "new update is better - younger",
name: "new update is the same",
oldUpdateOptions: []util.LightClientOption{},
newUpdateOptions: []util.LightClientOption{},
expectedResult: pubsub.ValidationAccept,
},
{
name: "new update is different",
genesisDrift: secondsPerSlot,
oldUpdateOptions: []util.LightClientOption{},
newUpdateOptions: []util.LightClientOption{util.WithIncreasedAttestedSlot(1)},
expectedResult: pubsub.ValidationAccept,
expectedResult: pubsub.ValidationIgnore,
},
}
@@ -149,9 +153,13 @@ func TestValidateLightClientFinalityUpdate_NilMessageOrTopic(t *testing.T) {
params.SetupTestConfigCleanup(t)
ctx := t.Context()
p := p2ptest.NewTestP2P(t)
s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}}}
lcStore := lightClient.NewLightClientStore(&p2ptest.FakeP2P{}, new(event.Feed), testDB.SetupDB(t))
s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}}, lcStore: lcStore}
mockUpdate, err := util.MockFinalityUpdate()
require.NoError(t, err)
s.lcStore.SetLastFinalityUpdate(mockUpdate, false)
_, err := s.validateLightClientFinalityUpdate(ctx, "", nil)
_, err = s.validateLightClientFinalityUpdate(ctx, "", nil)
require.ErrorIs(t, err, errNilPubsubMessage)
_, err = s.validateLightClientFinalityUpdate(ctx, "", &pubsub.Message{Message: &pb.Message{}})
@@ -195,44 +203,26 @@ func TestValidateLightClientFinalityUpdate(t *testing.T) {
name: "no previous update",
oldUpdateOptions: nil,
newUpdateOptions: []util.LightClientOption{},
expectedResult: pubsub.ValidationAccept,
expectedResult: pubsub.ValidationIgnore,
},
{
name: "not enough time passed",
genesisDrift: -secondsPerSlot / slotIntervals,
oldUpdateOptions: nil,
newUpdateOptions: []util.LightClientOption{},
expectedResult: pubsub.ValidationIgnore,
},
{
name: "new update has no advantage",
oldUpdateOptions: []util.LightClientOption{},
newUpdateOptions: []util.LightClientOption{},
expectedResult: pubsub.ValidationIgnore,
},
{
name: "new update is better - age",
name: "new update is the same",
oldUpdateOptions: []util.LightClientOption{},
newUpdateOptions: []util.LightClientOption{},
expectedResult: pubsub.ValidationAccept,
},
{
name: "new update is different",
genesisDrift: secondsPerSlot,
oldUpdateOptions: []util.LightClientOption{},
newUpdateOptions: []util.LightClientOption{util.WithIncreasedFinalizedSlot(1)},
expectedResult: pubsub.ValidationAccept,
},
{
name: "new update is better - supermajority",
oldUpdateOptions: []util.LightClientOption{},
newUpdateOptions: []util.LightClientOption{util.WithSupermajority(0)},
expectedResult: pubsub.ValidationAccept,
},
{
name: "old update is better - supermajority",
oldUpdateOptions: []util.LightClientOption{util.WithSupermajority(0)},
newUpdateOptions: []util.LightClientOption{},
expectedResult: pubsub.ValidationIgnore,
},
{
name: "old update is better - age",
oldUpdateOptions: []util.LightClientOption{util.WithIncreasedAttestedSlot(1)},
newUpdateOptions: []util.LightClientOption{},
expectedResult: pubsub.ValidationIgnore,
},
}

View File

@@ -410,9 +410,9 @@ func TestService_ValidateSyncCommitteeMessage(t *testing.T) {
var clock *startup.Clock
svc, tt.args.topic, clock = tt.setupSvc(svc, tt.args.msg, tt.args.topic)
markInitSyncComplete(t, svc)
go svc.Start()
require.NoError(t, cw.SetClock(clock))
svc.verifierWaiter = verification.NewInitializerWaiter(cw, chainService.ForkChoiceStore, svc.cfg.stateGen)
go svc.Start()
marshalledObj, err := tt.args.msg.MarshalSSZ()
assert.NoError(t, err)

View File

@@ -2,6 +2,7 @@ package verification
import (
"context"
"crypto/sha256"
"fmt"
"strings"
"time"
@@ -524,24 +525,39 @@ func columnErrBuilder(baseErr error) error {
return errors.Wrap(baseErr, errColumnsInvalid.Error())
}
func inclusionProofKey(c blocks.RODataColumn) ([160]byte, error) {
var key [160]byte
if len(c.KzgCommitmentsInclusionProof) != 4 {
// inclusionProofKey computes a unique key based on the KZG commitments,
// the KZG commitments inclusion proof, and the signed block header root.
func inclusionProofKey(c blocks.RODataColumn) ([32]byte, error) {
const (
commsIncProofLen = 4
commsIncProofByteCount = commsIncProofLen * 32
)
if len(c.KzgCommitmentsInclusionProof) != commsIncProofLen {
// This should already be enforced by SSZ unmarshaling; still check so we don't panic on array bounds.
return key, columnErrBuilder(ErrSidecarInclusionProofInvalid)
return [32]byte{}, columnErrBuilder(ErrSidecarInclusionProofInvalid)
}
commsByteCount := len(c.KzgCommitments) * fieldparams.KzgCommitmentSize
unhashedKey := make([]byte, 0, commsIncProofByteCount+fieldparams.RootLength+commsByteCount)
// Include the commitments inclusion proof in the key.
for _, proof := range c.KzgCommitmentsInclusionProof {
unhashedKey = append(unhashedKey, proof...)
}
// Include the block root in the key.
root, err := c.SignedBlockHeader.HashTreeRoot()
if err != nil {
return [160]byte{}, columnErrBuilder(errors.Wrap(err, "hash tree root"))
return [32]byte{}, columnErrBuilder(errors.Wrap(err, "hash tree root"))
}
for i := range c.KzgCommitmentsInclusionProof {
if copy(key[32*i:32*i+32], c.KzgCommitmentsInclusionProof[i]) != 32 {
return key, columnErrBuilder(ErrSidecarInclusionProofInvalid)
}
unhashedKey = append(unhashedKey, root[:]...)
// Include the commitments in the key.
for _, commitment := range c.KzgCommitments {
unhashedKey = append(unhashedKey, commitment...)
}
copy(key[128:], root[:])
return key, nil
return sha256.Sum256(unhashedKey), nil
}

View File

@@ -0,0 +1,4 @@
### Changed
- Compare LC messages received over gossipsub with locally computed objects before forwarding. Updates received from
gossipsub are no longer saved; they are only validated and forwarded.
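
A minimal sketch of the comparison rule described above, assuming a store that exposes the locally computed update as a `proto.Message`; `validateAgainstLocal` and its arguments are illustrative names, not the Prysm API.

```go
// Sketch only: forward a received update only when it matches the locally
// computed object; anything else (including "no local update yet") is ignored.
package lcgossip

import (
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"google.golang.org/protobuf/proto"
)

func validateAgainstLocal(local, received proto.Message) pubsub.ValidationResult {
	if local == nil {
		// Nothing to compare against yet.
		return pubsub.ValidationIgnore
	}
	if !proto.Equal(received, local) {
		// Differs from the locally computed object; do not forward.
		return pubsub.ValidationIgnore
	}
	return pubsub.ValidationAccept
}
```

Mismatches are ignored rather than rejected, so peers relaying an update we have not (yet) computed locally are not penalized.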

View File

@@ -0,0 +1,3 @@
### Fixed
- Improve getduties v2 by replacing the expensive `helpers.PrecomputeCommittees()` call with `CommitteeAssignments`.

View File

@@ -0,0 +1,3 @@
### Ignored
- `requestAndSaveMissingDataColumnSidecars`: Fix log

View File

@@ -0,0 +1,3 @@
### Fixed
- `inclusionProofKey`: Include the commitments in the key.
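
To illustrate why the commitments belong in the key, here is a small, self-contained sketch; the byte slices stand in for the real SSZ fields, and the `key` helper is hypothetical rather than the actual verification code.

```go
// Illustrative only: two sidecars with the same inclusion proof and block root
// but different commitments must not share a cache key.
package main

import (
	"crypto/sha256"
	"fmt"
)

func key(proof [][]byte, root [32]byte, commitments [][]byte) [32]byte {
	buf := make([]byte, 0, 256)
	for _, p := range proof {
		buf = append(buf, p...)
	}
	buf = append(buf, root[:]...)
	for _, c := range commitments {
		buf = append(buf, c...) // omitting this loop is exactly the collision the fix addresses
	}
	return sha256.Sum256(buf)
}

func main() {
	proof := [][]byte{make([]byte, 32), make([]byte, 32), make([]byte, 32), make([]byte, 32)}
	root := [32]byte{1}
	a := key(proof, root, [][]byte{{0xaa}})
	b := key(proof, root, [][]byte{{0xbb}})
	fmt.Println(a != b) // true: commitments now disambiguate the key
}
```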

View File

@@ -0,0 +1,2 @@
### Changed
- `c-kzg-4844`: Update from `v2.1.1` to `v2.1.5`

View File

@@ -0,0 +1,3 @@
### Fixed
- Avoid unnecessary calls to `ExitInformation()`.

View File

@@ -0,0 +1,3 @@
### Ignored
- Fix races in tests that cause nil panics.
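
The test diffs in this change reorder `go svc.Start()` to run after the service's fields are assigned. A generic illustration of that pattern, with made-up names:

```go
// Sketch only: a goroutine must not be started before every field it reads is
// assigned, otherwise it can race ahead and dereference a nil pointer.
package main

import (
	"fmt"
	"sync"
)

type service struct {
	waiter *string
	wg     sync.WaitGroup
}

func (s *service) start() {
	defer s.wg.Done()
	fmt.Println(*s.waiter) // would nil-panic if start() ran before the assignment below
}

func main() {
	s := &service{}
	s.wg.Add(1)
	w := "verifier waiter initialized"
	s.waiter = &w // assign everything the goroutine reads...
	go s.start()  // ...and only then start it
	s.wg.Wait()
}
```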

View File

@@ -0,0 +1,3 @@
### Changed
- Process pending attestations as soon as the block arrives.
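
A minimal sketch of the queue-draining pattern this entry refers to; `pendingAtts`, `onBlockProcessed`, and `processAtt` are illustrative names, not the actual Prysm functions.

```go
// Sketch only: attestations queued against a block root are processed as soon
// as that block is inserted, instead of waiting for a periodic sweep.
package main

import "fmt"

type root [32]byte
type att struct{ slot uint64 }

var pendingAtts = map[root][]att{}

func onBlockProcessed(r root, processAtt func(att) error) error {
	for _, a := range pendingAtts[r] {
		if err := processAtt(a); err != nil {
			return err
		}
	}
	delete(pendingAtts, r) // clear the queue once drained
	return nil
}

func main() {
	r := root{1}
	pendingAtts[r] = []att{{slot: 10}}
	_ = onBlockProcessed(r, func(a att) error {
		fmt.Println("processed att at slot", a.slot)
		return nil
	})
}
```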

View File

@@ -776,8 +776,8 @@ def prysm_deps():
importpath = "github.com/ethereum/c-kzg-4844/v2",
patch_args = ["-p1"],
patches = ["//third_party:com_github_ethereum_c_kzg_4844.patch"],
sum = "h1:KhzBVjmURsfr1+S3k/VE35T02+AW2qU9t9gr4R6YpSo=",
version = "v2.1.1",
sum = "h1:aVtoLK5xwJ6c5RiqO8g8ptJ5KU+2Hdquf6G3aXiHh5s=",
version = "v2.1.5",
)
go_repository(
name = "com_github_ethereum_go_ethereum",
@@ -3318,8 +3318,8 @@ def prysm_deps():
importpath = "github.com/supranational/blst",
patch_args = ["-p1"],
patches = ["//third_party:com_github_supranational_blst.patch"],
sum = "h1:xNMoHRJOTwMn63ip6qoWJ2Ymgvj7E2b9jY2FAwY+qRo=",
version = "v0.3.14",
sum = "h1:nbdqkIGOGfUAD54q1s2YBcBz/WcsxCO9HUQ4aGV5hUw=",
version = "v0.3.16-0.20250831170142-f48500c1fdbe",
)
go_repository(
name = "com_github_syndtr_goleveldb",

go.mod (4 changes)
View File

@@ -14,7 +14,7 @@ require (
github.com/dgraph-io/ristretto/v2 v2.2.0
github.com/dustin/go-humanize v1.0.1
github.com/emicklei/dot v0.11.0
github.com/ethereum/c-kzg-4844/v2 v2.1.1
github.com/ethereum/c-kzg-4844/v2 v2.1.5
github.com/ethereum/go-ethereum v1.15.9
github.com/fsnotify/fsnotify v1.6.0
github.com/ghodss/yaml v1.0.0
@@ -70,7 +70,7 @@ require (
github.com/spf13/afero v1.10.0
github.com/status-im/keycard-go v0.2.0
github.com/stretchr/testify v1.10.0
github.com/supranational/blst v0.3.14
github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe
github.com/thomaso-mirodin/intmath v0.0.0-20160323211736-5dc6d854e46e
github.com/trailofbits/go-mutexasserts v0.0.0-20250212181730-4c2b8e9e784b
github.com/tyler-smith/go-bip39 v1.1.0

go.sum (8 changes)
View File

@@ -234,8 +234,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ethereum/c-kzg-4844 v1.0.0 h1:0X1LBXxaEtYD9xsyj9B9ctQEZIpnvVDeoBx8aHEwTNA=
github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0=
github.com/ethereum/c-kzg-4844/v2 v2.1.1 h1:KhzBVjmURsfr1+S3k/VE35T02+AW2qU9t9gr4R6YpSo=
github.com/ethereum/c-kzg-4844/v2 v2.1.1/go.mod h1:TC48kOKjJKPbN7C++qIgt0TJzZ70QznYR7Ob+WXl57E=
github.com/ethereum/c-kzg-4844/v2 v2.1.5 h1:aVtoLK5xwJ6c5RiqO8g8ptJ5KU+2Hdquf6G3aXiHh5s=
github.com/ethereum/c-kzg-4844/v2 v2.1.5/go.mod h1:u59hRTTah4Co6i9fDWtiCjTrblJv0UwsqZKCc0GfgUs=
github.com/ethereum/go-ethereum v1.15.9 h1:bRra1zi+/q+qyXZ6fylZOrlaF8kDdnlTtzNTmNHfX+g=
github.com/ethereum/go-ethereum v1.15.9/go.mod h1:+S9k+jFzlyVTNcYGvqFhzN/SFhI6vA+aOY4T5tLSPL0=
github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8=
@@ -1021,8 +1021,8 @@ github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/supranational/blst v0.3.14 h1:xNMoHRJOTwMn63ip6qoWJ2Ymgvj7E2b9jY2FAwY+qRo=
github.com/supranational/blst v0.3.14/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw=
github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe h1:nbdqkIGOGfUAD54q1s2YBcBz/WcsxCO9HUQ4aGV5hUw=
github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=

View File

@@ -230,7 +230,7 @@ func (v *ValidatorService) Start() {
distributed: v.distributed,
disableDutiesPolling: v.disableDutiesPolling,
accountsChangedChannel: make(chan [][fieldparams.BLSPubkeyLength]byte, 1),
eventsChannel: make(chan *eventClient.Event, 1),
eventsChannel: make(chan *eventClient.Event, 100), // a buffer of 100 gives some headroom if the validator is slow to process events
}
hm := newHealthMonitor(v.ctx, v.cancel, v.maxHealthChecks, v.validator)