Compare commits

...

5 Commits

Author SHA1 Message Date
Manu NALEPA
b49c35f37a Add TTL cache for aggregated attestations
Add a TTL cache that stores attestations when they are deleted from
the aggregatedAtt or blockAtt caches. This allows HasAggregatedAttestation
to still find recently pruned attestations, preventing duplicate processing.

The TTL cache is populated on delete (not save) and is pruned along with
the other attestation caches. A metric tracks when the TTL cache prevents
duplicate attestation processing.
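A self-contained toy sketch of the idea, for illustration only (the identifiers below are simplified stand-ins, not the real Prysm types shown in the diff further down): entries leave the live cache but are remembered in the TTL cache, so a later `HasAggregatedAttestation`-style check still succeeds.

```go
package main

import "fmt"

// pool is a toy stand-in for AttCaches: a live cache plus a TTL cache that is
// populated only when entries are deleted from the live cache.
type pool struct {
	live map[string]bool // attestations currently in the aggregated cache
	ttl  map[string]bool // recently deleted attestations, pruned on an epoch cadence
}

func (p *pool) delete(id string) {
	if p.live[id] {
		p.ttl[id] = true // populated on delete, not on save
		delete(p.live, id)
	}
}

func (p *pool) has(id string) bool {
	// Live cache first, then the TTL cache; a TTL-cache hit is what the new metric counts.
	return p.live[id] || p.ttl[id]
}

func main() {
	p := &pool{live: map[string]bool{"att-1": true}, ttl: map[string]bool{}}
	p.delete("att-1")
	fmt.Println(p.has("att-1")) // true: still found after deletion, via the TTL cache
}
```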
2026-02-18 14:03:33 +01:00
Manu NALEPA
fc16bfb8f6 Refactor attestation cache lookup functions
Extract hasAggregatedAtt and hasBlockAtt helper functions from
HasAggregatedAttestation for better code organization and reuse.
Also improve code style with clearer variable names and error wrapping.
2026-02-18 13:30:21 +01:00
terence
8ee28394ab Add Gloas attestation committee index validation (#16359)
Adds Gloas fork attestation validation rules for gossip processing. This
implements the new committee index validation requirements introduced in
the Gloas fork.

## Changes
- Uses attestation epoch to determine if Gloas rules apply
- **Committee index validation**:
  - Committee index must be < 2 (0 or 1 only)
  - Same-slot attestations (where `attestation.data.slot == block.slot`) must use committee index 0
  - Different-slot attestations can use either committee index 0 or 1
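A standalone sketch of these rules, for illustration only (the real validation, shown in the diff further down, also handles block lookup failures and gossip result codes):

```go
package main

import (
	"errors"
	"fmt"
)

// checkGloasCommitteeIndex is an illustrative stand-in for the Gloas gossip rules:
// the committee index must be 0 or 1, and same-slot attestations must use index 0.
func checkGloasCommitteeIndex(committeeIndex, attSlot, blockSlot uint64) error {
	if committeeIndex >= 2 {
		return errors.New("committee index must be < 2")
	}
	if committeeIndex != 0 && attSlot == blockSlot {
		return errors.New("same slot attestations must use committee index 0")
	}
	return nil
}

func main() {
	fmt.Println(checkGloasCommitteeIndex(2, 10, 10)) // reject: index >= 2
	fmt.Println(checkGloasCommitteeIndex(1, 10, 10)) // reject: same slot, index 1
	fmt.Println(checkGloasCommitteeIndex(1, 10, 9))  // accept: different slot, index 1
	fmt.Println(checkGloasCommitteeIndex(0, 10, 10)) // accept: index 0
}
```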
2026-02-17 18:00:32 +00:00
kasey
b31e2ffe51 avoid copying the finalized state when computing cgc (#16355)
Reviewing some (unrelated) sync code today I noticed that we are using a
stategen accessor for the finalized state which copies the entire state
object to look up validator balances to compute the custody_group_count.
This excess memory allocation is likely causing GC pressure and
increasing memory utilization.

This PR avoids state copying for this purpose by making the following
changes:
- Adds a new method to the `ReadOnlyBalances` state interface:
`EffectiveBalances([]primitives.ValidatorIndex) (uint64, []uint64, error)`.
This method computes and returns the sum of the effective balances of the
given validator indices, a list with the individual effective balance of each
requested index (the i-th element of the return corresponds to the i-th
element of the parameter), and an error, which is necessary because of index
bounds checks and quirks of the multi-value slice that can apparently leave
the state unusable for such lookups if not correctly initialized.
- Adds a new method to the stategen interface
`FinalizedReadOnlyBalances`, which returns the finalized state asserted
to the `ReadOnlyBalances` interface.
- Switches the peerdas code to use the sum given by `EffectiveBalances`.

There was some existing nil checking code in the peerdas package that I
didn't want to modify, so I added a new compound interface in stategen
to allow the returned state to also expose the `IsNil` method.
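A minimal sketch of the access pattern this enables, with simplified stand-in types (the real interfaces are `ReadOnlyBalances` and the stategen `StateManager`, and the real method also returns per-index balances as described above):

```go
package main

import "fmt"

// readOnlyBalances and finalizedState are toy stand-ins: balances are read in
// place through a read-only view instead of copying the whole state object.
type readOnlyBalances interface {
	EffectiveBalanceSum(idxs []uint64) (uint64, error)
}

type finalizedState struct {
	effectiveBalances []uint64 // backing slice, never copied
}

func (s *finalizedState) EffectiveBalanceSum(idxs []uint64) (uint64, error) {
	var sum uint64
	for _, i := range idxs {
		if i >= uint64(len(s.effectiveBalances)) {
			return 0, fmt.Errorf("validator index %d out of bounds", i)
		}
		sum += s.effectiveBalances[i]
	}
	return sum, nil
}

func main() {
	var st readOnlyBalances = &finalizedState{
		effectiveBalances: []uint64{32_000_000_000, 32_000_000_000, 31_000_000_000},
	}
	sum, err := st.EffectiveBalanceSum([]uint64{0, 2})
	fmt.Println(sum, err) // 63000000000 <nil>
}
```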

fixes https://github.com/OffchainLabs/prysm/issues/16354

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2026-02-17 08:50:58 +00:00
terence
22e77add54 Refactor function ProcessExecutionPayload with ApplyExecutionPayload (#16356)
This PR refactors `ProcessExecutionPayload` by extracting `ApplyExecutionPayload`,
so callers can use the Apply method to calculate the post state root. Note that
the validations are not required for the Apply function; what we really need are
the state mutation steps:
```
1. Ensure latest_block_header.state_root is set (if zero, set it to the pre‑payload HashTreeRoot)...
2. processExecutionRequests()
3. QueueBuilderPayment()
4. SetExecutionPayloadAvailability(state.Slot(), true)
5. SetLatestBlockHash(payload.BlockHash())
```
I decided to keep the validations there because (a) they are cheap, (b) it makes
the refactor cleaner to reason about, and (c) an API caller may want to validate
envelope and bid consistency (e.g. the beacon API has an option to validate consensus).
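A self-contained toy model of the split described above, for illustration only (everything here is a simplified stand-in for the real Prysm functions shown in the diff below):

```go
package main

import (
	"crypto/sha256"
	"errors"
	"fmt"
)

// state and envelope are toy stand-ins for the beacon state and the execution
// payload envelope.
type state struct{ data string }

func (s *state) hashTreeRoot() [32]byte { return sha256.Sum256([]byte(s.data)) }

type envelope struct {
	payload   string
	stateRoot [32]byte // expected post-payload root, checked only in the full path
}

// applyExecutionPayload performs the state mutation only; no validations.
func applyExecutionPayload(st *state, env envelope) {
	st.data += env.payload
}

// processExecutionPayload wraps apply with the consistency check that stays in
// the full processing path (the state root comparison).
func processExecutionPayload(st *state, env envelope) error {
	applyExecutionPayload(st, env)
	if st.hashTreeRoot() != env.stateRoot {
		return errors.New("state root mismatch")
	}
	return nil
}

func main() {
	// A caller that only needs the post state root (e.g. when building an
	// envelope) can use apply alone and hash afterwards.
	st := &state{data: "pre"}
	applyExecutionPayload(st, envelope{payload: "+payload"})
	fmt.Printf("post state root: %x\n", st.hashTreeRoot())
}
```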
2026-02-13 15:51:22 +00:00
21 changed files with 488 additions and 56 deletions

View File

@@ -112,6 +112,34 @@ func ProcessExecutionPayload(
return errors.Wrap(err, "signature verification failed")
}
envelope, err := signedEnvelope.Envelope()
if err != nil {
return errors.Wrap(err, "could not get envelope from signed envelope")
}
if err := ApplyExecutionPayload(ctx, st, envelope); err != nil {
return err
}
r, err := st.HashTreeRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get hash tree root")
}
if r != envelope.StateRoot() {
return fmt.Errorf("state root mismatch: expected %#x, got %#x", envelope.StateRoot(), r)
}
return nil
}
// ApplyExecutionPayload applies the execution payload envelope to the state and performs the same
// consistency checks as the full processing path. This keeps the post-payload state root computation
// on a shared code path, even though some bid/payload checks are not strictly required for the root itself.
func ApplyExecutionPayload(
ctx context.Context,
st state.BeaconState,
envelope interfaces.ROExecutionPayloadEnvelope,
) error {
latestHeader := st.LatestBlockHeader()
if len(latestHeader.StateRoot) == 0 || bytes.Equal(latestHeader.StateRoot, make([]byte, 32)) {
previousStateRoot, err := st.HashTreeRoot(ctx)
@@ -128,10 +156,6 @@ func ProcessExecutionPayload(
if err != nil {
return errors.Wrap(err, "could not compute block header root")
}
envelope, err := signedEnvelope.Envelope()
if err != nil {
return errors.Wrap(err, "could not get envelope from signed envelope")
}
beaconBlockRoot := envelope.BeaconBlockRoot()
if !bytes.Equal(beaconBlockRoot[:], blockHeaderRoot[:]) {
@@ -217,14 +241,6 @@ func ProcessExecutionPayload(
return errors.Wrap(err, "could not set latest block hash")
}
r, err := st.HashTreeRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get hash tree root")
}
if r != envelope.StateRoot() {
return fmt.Errorf("state root mismatch: expected %#x, got %#x", envelope.StateRoot(), r)
}
return nil
}

View File

@@ -73,23 +73,22 @@ func PopulateFromSidecar(sidecar blocks.VerifiedRODataColumn) *SidecarReconstruc
// ValidatorsCustodyRequirement returns the number of custody groups regarding the validator indices attached to the beacon node.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#validator-custody
func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
totalNodeBalance := uint64(0)
func ValidatorsCustodyRequirement(st beaconState.ReadOnlyBalances, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
cfg := params.BeaconConfig()
idxs := make([]primitives.ValidatorIndex, 0, len(validatorsIndex))
for index := range validatorsIndex {
validator, err := state.ValidatorAtIndexReadOnly(index)
if err != nil {
return 0, errors.Wrapf(err, "validator at index %v", index)
}
totalNodeBalance += validator.EffectiveBalance()
idxs = append(idxs, index)
}
totalBalance, err := st.EffectiveBalanceSum(idxs)
if err != nil {
return 0, errors.Wrap(err, "effective balances")
}
cfg := params.BeaconConfig()
numberOfCustodyGroups := cfg.NumberOfCustodyGroups
validatorCustodyRequirement := cfg.ValidatorCustodyRequirement
balancePerAdditionalCustodyGroup := cfg.BalancePerAdditionalCustodyGroup
count := totalNodeBalance / balancePerAdditionalCustodyGroup
count := totalBalance / balancePerAdditionalCustodyGroup
return min(max(count, validatorCustodyRequirement), numberOfCustodyGroups), nil
}
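A worked example of the clamped count computed by this function, using illustrative config values (the real ones come from `params.BeaconConfig()`):

```go
package main

import "fmt"

func main() {
	// Illustrative config values, not necessarily the network's.
	const (
		balancePerAdditionalCustodyGroup = uint64(32_000_000_000) // Gwei
		validatorCustodyRequirement      = uint64(8)
		numberOfCustodyGroups            = uint64(128)
	)
	// Suppose the node is attached to 20 validators with 32 ETH effective balance each.
	totalBalance := uint64(20) * 32_000_000_000
	count := totalBalance / balancePerAdditionalCustodyGroup // 20
	// Clamp between the per-validator floor and the total number of groups.
	cgc := min(max(count, validatorCustodyRequirement), numberOfCustodyGroups)
	fmt.Println(cgc) // 20
}
```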

View File

@@ -7,6 +7,7 @@ go_library(
"block.go",
"kv.go",
"log.go",
"metrics.go",
"seen_bits.go",
"unaggregated.go",
],
@@ -24,6 +25,8 @@ go_library(
"//runtime/version:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],

View File

@@ -2,6 +2,7 @@ package kv
import (
"context"
"fmt"
"runtime"
"sync"
@@ -273,18 +274,30 @@ func (c *AttCaches) DeleteAggregatedAttestation(att ethpb.Att) error {
filtered := make([]ethpb.Att, 0)
for _, a := range attList {
if c, err := att.GetAggregationBits().Contains(a.GetAggregationBits()); err != nil {
return err
} else if !c {
filtered = append(filtered, a)
contains, err := att.GetAggregationBits().Contains(a.GetAggregationBits())
if err != nil {
return fmt.Errorf("aggregation bits contain: %w", err)
}
}
if len(filtered) == 0 {
delete(c.aggregatedAtt, id)
} else {
c.aggregatedAtt[id] = filtered
if contains {
// Insert into TTL cache before removing from aggregated cache
if err := c.insertAggregatedAttTTL(a); err != nil {
return errors.Wrap(err, "could not insert into TTL cache")
}
continue
}
// If the attestation in the cache doesn't contain the bits of the attestation to delete, we keep it in the cache.
filtered = append(filtered, a)
}
if len(filtered) == 0 {
delete(c.aggregatedAtt, id)
return nil
}
c.aggregatedAtt[id] = filtered
return nil
}
@@ -294,32 +307,118 @@ func (c *AttCaches) HasAggregatedAttestation(att ethpb.Att) (bool, error) {
return false, err
}
has, err := c.hasAggregatedAtt(att)
if err != nil {
return false, fmt.Errorf("has aggregated att: %w", err)
}
if has {
return true, nil
}
has, err = c.hasBlockAtt(att)
if err != nil {
return false, fmt.Errorf("has block att: %w", err)
}
if has {
return true, nil
}
has, err = c.hasAggregatedAttTTL(att)
if err != nil {
return false, fmt.Errorf("has aggregated att TTL: %w", err)
}
if has {
savedByTTLCache.Inc()
return true, nil
}
return false, nil
}
// hasAggregatedAtt checks if the attestation bits are contained in the aggregated attestation cache.
func (c *AttCaches) hasAggregatedAtt(att ethpb.Att) (bool, error) {
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return false, errors.Wrap(err, "could not create attestation ID")
return false, fmt.Errorf("could not create attestation ID: %w", err)
}
c.aggregatedAttLock.RLock()
defer c.aggregatedAttLock.RUnlock()
if atts, ok := c.aggregatedAtt[id]; ok {
for _, a := range atts {
if c, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
return false, err
} else if c {
return true, nil
}
cacheAtts, ok := c.aggregatedAtt[id]
if !ok {
return false, nil
}
for _, cacheAtt := range cacheAtts {
contains, err := cacheAtt.GetAggregationBits().Contains(att.GetAggregationBits())
if err != nil {
return false, fmt.Errorf("aggregation bits contains: %w", err)
}
if contains {
return true, nil
}
}
return false, nil
}
// hasBlockAtt checks if the attestation bits are contained in the block attestation cache.
func (c *AttCaches) hasBlockAtt(att ethpb.Att) (bool, error) {
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return false, fmt.Errorf("could not create attestation ID: %w", err)
}
c.blockAttLock.RLock()
defer c.blockAttLock.RUnlock()
if atts, ok := c.blockAtt[id]; ok {
for _, a := range atts {
if c, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
return false, err
} else if c {
return true, nil
}
cacheAtts, ok := c.blockAtt[id]
if !ok {
return false, nil
}
for _, cacheAtt := range cacheAtts {
contains, err := cacheAtt.GetAggregationBits().Contains(att.GetAggregationBits())
if err != nil {
return false, fmt.Errorf("aggregation bits contains: %w", err)
}
if contains {
return true, nil
}
}
return false, nil
}
// hasAggregatedAttTTL checks if the attestation bits are contained in the TTL cache.
func (c *AttCaches) hasAggregatedAttTTL(att ethpb.Att) (bool, error) {
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return false, fmt.Errorf("could not create attestation ID: %w", err)
}
c.aggregatedAttTTLLock.RLock()
defer c.aggregatedAttTTLLock.RUnlock()
cacheAtts, ok := c.aggregatedAttTTL[id]
if !ok {
return false, nil
}
for _, cacheAtt := range cacheAtts {
contains, err := cacheAtt.GetAggregationBits().Contains(att.GetAggregationBits())
if err != nil {
return false, fmt.Errorf("aggregation bits contains: %w", err)
}
if contains {
return true, nil
}
}
@@ -332,3 +431,93 @@ func (c *AttCaches) AggregatedAttestationCount() int {
defer c.aggregatedAttLock.RUnlock()
return len(c.aggregatedAtt)
}
// insertAggregatedAttTTL inserts an attestation into the TTL cache.
func (c *AttCaches) insertAggregatedAttTTL(att ethpb.Att) error {
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return fmt.Errorf("new ID: %w", err)
}
c.aggregatedAttTTLLock.Lock()
defer c.aggregatedAttTTLLock.Unlock()
cacheAtts, ok := c.aggregatedAttTTL[id]
if !ok {
c.aggregatedAttTTL[id] = []ethpb.Att{att.Clone()}
return nil
}
// Check if attestation is already contained
for _, cacheAtt := range cacheAtts {
contains, err := cacheAtt.GetAggregationBits().Contains(att.GetAggregationBits())
if err != nil {
return fmt.Errorf("aggregation bits contains: %w", err)
}
if contains {
return nil
}
}
c.aggregatedAttTTL[id] = append(cacheAtts, att.Clone())
return nil
}
// AggregatedAttestationTTLCount returns the number of keys in the TTL cache.
func (c *AttCaches) AggregatedAttestationTTLCount() int {
c.aggregatedAttTTLLock.RLock()
defer c.aggregatedAttTTLLock.RUnlock()
return len(c.aggregatedAttTTL)
}
// AggregatedAttestationsTTL returns all attestations from the TTL cache.
func (c *AttCaches) AggregatedAttestationsTTL() []ethpb.Att {
c.aggregatedAttTTLLock.RLock()
defer c.aggregatedAttTTLLock.RUnlock()
atts := make([]ethpb.Att, 0)
for _, a := range c.aggregatedAttTTL {
atts = append(atts, a...)
}
return atts
}
// DeleteAggregatedAttestationTTL deletes an attestation from the TTL cache.
func (c *AttCaches) DeleteAggregatedAttestationTTL(att ethpb.Att) error {
if att == nil || att.IsNil() {
return nil
}
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return fmt.Errorf("could not create attestation ID: %w", err)
}
c.aggregatedAttTTLLock.Lock()
defer c.aggregatedAttTTLLock.Unlock()
cacheAtts, ok := c.aggregatedAttTTL[id]
if !ok {
return nil
}
filtered := make([]ethpb.Att, 0)
for _, cacheAtt := range cacheAtts {
contains, err := att.GetAggregationBits().Contains(cacheAtt.GetAggregationBits())
if err != nil {
return fmt.Errorf("aggregation bits contains: %w", err)
}
if !contains {
filtered = append(filtered, cacheAtt)
}
}
if len(filtered) == 0 {
delete(c.aggregatedAttTTL, id)
return nil
}
c.aggregatedAttTTL[id] = filtered
return nil
}

View File

@@ -1,6 +1,8 @@
package kv
import (
"fmt"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1/attestation"
"github.com/pkg/errors"
@@ -63,6 +65,16 @@ func (c *AttCaches) DeleteBlockAttestation(att ethpb.Att) error {
c.blockAttLock.Lock()
defer c.blockAttLock.Unlock()
// Insert all attestations into TTL cache before deleting
if cacheAtts, ok := c.blockAtt[id]; ok {
for _, cacheAtt := range cacheAtts {
if err := c.insertAggregatedAttTTL(cacheAtt); err != nil {
return fmt.Errorf("insert aggregated att TTL: %w", err)
}
}
}
delete(c.blockAtt, id)
return nil

View File

@@ -25,7 +25,11 @@ type AttCaches struct {
forkchoiceAtt *attmap.Attestations
blockAttLock sync.RWMutex
blockAtt map[attestation.Id][]ethpb.Att
seenAtt *cache.Cache
seenAtt *cache.Cache
// TTL cache for aggregated attestations, used for fast lookup in HasAggregatedAttestation.
// Entries expire after 1 epoch.
aggregatedAttTTLLock sync.RWMutex
aggregatedAttTTL map[attestation.Id][]ethpb.Att
}
// NewAttCaches initializes a new attestation pool consists of multiple KV store in cache for
@@ -34,11 +38,12 @@ func NewAttCaches() *AttCaches {
secsInEpoch := time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
c := cache.New(2*secsInEpoch*time.Second, 2*secsInEpoch*time.Second)
pool := &AttCaches{
unAggregatedAtt: make(map[attestation.Id]ethpb.Att),
aggregatedAtt: make(map[attestation.Id][]ethpb.Att),
forkchoiceAtt: attmap.New(),
blockAtt: make(map[attestation.Id][]ethpb.Att),
seenAtt: c,
unAggregatedAtt: make(map[attestation.Id]ethpb.Att),
aggregatedAtt: make(map[attestation.Id][]ethpb.Att),
forkchoiceAtt: attmap.New(),
blockAtt: make(map[attestation.Id][]ethpb.Att),
seenAtt: c,
aggregatedAttTTL: make(map[attestation.Id][]ethpb.Att),
}
return pool

View File

@@ -0,0 +1,11 @@
package kv
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var savedByTTLCache = promauto.NewCounter(prometheus.CounterOpts{
Name: "attestation_saved_by_ttl_cache_total",
Help: "The number of times an attestation was found only in the TTL cache and not in the regular caches.",
})

View File

@@ -30,6 +30,16 @@ var (
Name: "expired_block_atts_total",
Help: "The number of expired and deleted block attestations in the pool.",
})
aggregatedAttsTTLCount = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "aggregated_attestations_ttl_in_pool_total",
Help: "The number of aggregated attestations in the TTL cache.",
},
)
expiredAggregatedAttsTTL = promauto.NewCounter(prometheus.CounterOpts{
Name: "expired_aggregated_atts_ttl_total",
Help: "The number of expired and deleted aggregated attestations from the TTL cache.",
})
attCount = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "attestations_in_pool_total",
@@ -59,6 +69,7 @@ var (
func (s *Service) updateMetrics() {
aggregatedAttsCount.Set(float64(s.cfg.Pool.AggregatedAttestationCount()))
unaggregatedAttsCount.Set(float64(s.cfg.Pool.UnaggregatedAttestationCount()))
aggregatedAttsTTLCount.Set(float64(s.cfg.Pool.AggregatedAttestationTTLCount()))
}
func (s *Service) updateMetricsExperimental(numExpired uint64) {

View File

@@ -68,6 +68,21 @@ func (*PoolMock) AggregatedAttestationCount() int {
panic("implement me")
}
// AggregatedAttestationsTTL --
func (*PoolMock) AggregatedAttestationsTTL() []ethpb.Att {
panic("implement me")
}
// DeleteAggregatedAttestationTTL --
func (*PoolMock) DeleteAggregatedAttestationTTL(_ ethpb.Att) error {
panic("implement me")
}
// AggregatedAttestationTTLCount --
func (*PoolMock) AggregatedAttestationTTLCount() int {
panic("implement me")
}
// SaveUnaggregatedAttestation --
func (*PoolMock) SaveUnaggregatedAttestation(_ ethpb.Att) error {
panic("implement me")

View File

@@ -23,6 +23,10 @@ type Pool interface {
DeleteAggregatedAttestation(att ethpb.Att) error
HasAggregatedAttestation(att ethpb.Att) (bool, error)
AggregatedAttestationCount() int
// TTL cache methods for aggregated attestations
AggregatedAttestationsTTL() []ethpb.Att
DeleteAggregatedAttestationTTL(att ethpb.Att) error
AggregatedAttestationTTLCount() int
// For unaggregated attestations.
SaveUnaggregatedAttestation(att ethpb.Att) error
SaveUnaggregatedAttestations(atts []ethpb.Att) error

View File

@@ -81,6 +81,16 @@ func (s *Service) pruneExpiredAtts() {
expiredBlockAtts.Inc()
}
}
ttlAtts := s.cfg.Pool.AggregatedAttestationsTTL()
for _, att := range ttlAtts {
if s.expired(att.GetData().Slot) {
if err := s.cfg.Pool.DeleteAggregatedAttestationTTL(att); err != nil {
log.WithError(err).Error("Could not delete expired TTL attestation")
}
expiredAggregatedAttsTTL.Inc()
}
}
}
// Return true if the input slot has been expired.

View File

@@ -152,6 +152,7 @@ type ReadOnlyBalances interface {
Balances() []uint64
BalanceAtIndex(idx primitives.ValidatorIndex) (uint64, error)
BalancesLength() int
EffectiveBalanceSum([]primitives.ValidatorIndex) (uint64, error)
}
// ReadOnlyCheckpoint defines a struct which only has read access to checkpoint methods.

View File

@@ -8,6 +8,7 @@ import (
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/pkg/errors"
)
// Validators participating in consensus on the beacon chain.
@@ -80,6 +81,25 @@ func (b *BeaconState) ValidatorAtIndex(idx primitives.ValidatorIndex) (*ethpb.Va
return b.validatorAtIndex(idx)
}
// EffectiveBalanceSum returns the sum of the effective balances of the given list of validator indices, or an
// error if one of the indices is out of bounds or the state wasn't correctly initialized.
func (b *BeaconState) EffectiveBalanceSum(idxs []primitives.ValidatorIndex) (uint64, error) {
b.lock.RLock()
defer b.lock.RUnlock()
var sum uint64
for i := range idxs {
if b.validatorsMultiValue == nil {
return 0, errors.Wrap(state.ErrNilValidatorsInState, "nil validators multi-value slice")
}
v, err := b.validatorsMultiValue.At(b, uint64(idxs[i]))
if err != nil {
return 0, errors.Wrap(err, "validators multi value at index")
}
sum += v.EffectiveBalance
}
return sum, nil
}
func (b *BeaconState) validatorAtIndex(idx primitives.ValidatorIndex) (*ethpb.Validator, error) {
if b.validatorsMultiValue == nil {
return &ethpb.Validator{}, nil

View File

@@ -5,6 +5,7 @@ import (
"context"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stategen"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
)
@@ -14,6 +15,8 @@ type StateManager struct {
StatesBySlot map[primitives.Slot]state.BeaconState
}
var _ stategen.StateManager = (*StateManager)(nil)
// NewService --
func NewService() *StateManager {
return &StateManager{
@@ -101,3 +104,8 @@ func (m *StateManager) AddStateForSlot(state state.BeaconState, slot primitives.
func (m *StateManager) DeleteStateFromCaches(context.Context, [32]byte) error {
return nil
}
// FinalizedReadOnlyBalances --
func (m *StateManager) FinalizedReadOnlyBalances() stategen.NilCheckableReadOnlyBalances {
panic("unimplemented")
}

View File

@@ -27,6 +27,13 @@ var defaultHotStateDBInterval primitives.Slot = 128
var populatePubkeyCacheOnce sync.Once
// NilCheckableReadOnlyBalances adds the IsNil method to ReadOnlyBalances
// to allow checking if the underlying state value is nil.
type NilCheckableReadOnlyBalances interface {
state.ReadOnlyBalances
IsNil() bool
}
// StateManager represents a management object that handles the internal
// logic of maintaining both hot and cold states in DB.
type StateManager interface {
@@ -43,6 +50,7 @@ type StateManager interface {
ActiveNonSlashedBalancesByRoot(context.Context, [32]byte) ([]uint64, error)
StateByRootIfCachedNoCopy(blockRoot [32]byte) state.BeaconState
StateByRootInitialSync(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error)
FinalizedReadOnlyBalances() NilCheckableReadOnlyBalances
}
// State is a concrete implementation of StateManager.
@@ -201,3 +209,8 @@ func (s *State) FinalizedState() state.BeaconState {
defer s.finalizedInfo.lock.RUnlock()
return s.finalizedInfo.state.Copy()
}
// FinalizedReadOnlyBalances returns the finalized state as a ReadOnlyBalances so that it can be used read-only without copying.
func (s *State) FinalizedReadOnlyBalances() NilCheckableReadOnlyBalances {
return s.finalizedInfo.state
}

View File

@@ -185,7 +185,7 @@ func (s *Service) validatorsCustodyRequirement() (uint64, error) {
}
// Retrieve the finalized state.
finalizedState := s.cfg.stateGen.FinalizedState()
finalizedState := s.cfg.stateGen.FinalizedReadOnlyBalances()
if finalizedState == nil || finalizedState.IsNil() {
return 0, nilFinalizedStateError
}

View File

@@ -268,10 +268,23 @@ func (s *Service) validateCommitteeIndexAndCount(
a eth.Att,
bs state.ReadOnlyBeaconState,
) (primitives.CommitteeIndex, uint64, pubsub.ValidationResult, error) {
// - [REJECT] attestation.data.index == 0
if a.Version() >= version.Electra && a.GetData().CommitteeIndex != 0 {
return 0, 0, pubsub.ValidationReject, errors.New("attestation data's committee index must be 0")
// Validate committee index based on fork.
if a.Version() >= version.Electra {
data := a.GetData()
attEpoch := slots.ToEpoch(data.Slot)
postGloas := attEpoch >= params.BeaconConfig().GloasForkEpoch
if postGloas {
if result, err := s.validateGloasCommitteeIndex(data); result != pubsub.ValidationAccept {
return 0, 0, result, err
}
} else {
// [REJECT] attestation.data.index == 0 (New in Electra, removed in Gloas)
if data.CommitteeIndex != 0 {
return 0, 0, pubsub.ValidationReject, errors.New("attestation data's committee index must be 0")
}
}
}
valCount, err := helpers.ActiveValidatorCount(ctx, bs, slots.ToEpoch(a.GetData().Slot))
if err != nil {
return 0, 0, pubsub.ValidationIgnore, err
@@ -356,6 +369,29 @@ func validateAttestingIndex(
return pubsub.ValidationAccept, nil
}
// validateGloasCommitteeIndex validates committee index rules for Gloas fork.
// [REJECT] attestation.data.index < 2. (New in Gloas)
// [REJECT] attestation.data.index == 0 if block.slot == attestation.data.slot. (New in Gloas)
func (s *Service) validateGloasCommitteeIndex(data *eth.AttestationData) (pubsub.ValidationResult, error) {
if data.CommitteeIndex >= 2 {
return pubsub.ValidationReject, errors.New("attestation data's committee index must be < 2")
}
// Same-slot attestations must use committee index 0
if data.CommitteeIndex != 0 {
blockRoot := bytesutil.ToBytes32(data.BeaconBlockRoot)
slot, err := s.cfg.chain.RecentBlockSlot(blockRoot)
if err != nil {
return pubsub.ValidationIgnore, err
}
if slot == data.Slot {
return pubsub.ValidationReject, errors.New("same slot attestations must use committee index 0")
}
}
return pubsub.ValidationAccept, nil
}
// generateUnaggregatedAttCacheKey generates the cache key for unaggregated attestation tracking.
func generateUnaggregatedAttCacheKey(att eth.Att) (string, error) {
var attester uint64

View File

@@ -684,3 +684,75 @@ func Test_validateCommitteeIndexAndCount_Boundary(t *testing.T) {
require.ErrorContains(t, "committee index", err)
require.Equal(t, pubsub.ValidationReject, res)
}
func Test_validateGloasCommitteeIndex(t *testing.T) {
tests := []struct {
name string
committeeIndex primitives.CommitteeIndex
attestationSlot primitives.Slot
blockSlot primitives.Slot
wantResult pubsub.ValidationResult
wantErr string
}{
{
name: "committee index >= 2 should reject",
committeeIndex: 2,
attestationSlot: 10,
blockSlot: 10,
wantResult: pubsub.ValidationReject,
wantErr: "committee index must be < 2",
},
{
name: "committee index 0 should accept",
committeeIndex: 0,
attestationSlot: 10,
blockSlot: 10,
wantResult: pubsub.ValidationAccept,
wantErr: "",
},
{
name: "committee index 1 different-slot should accept",
committeeIndex: 1,
attestationSlot: 10,
blockSlot: 9,
wantResult: pubsub.ValidationAccept,
wantErr: "",
},
{
name: "committee index 1 same-slot should reject",
committeeIndex: 1,
attestationSlot: 10,
blockSlot: 10,
wantResult: pubsub.ValidationReject,
wantErr: "same slot attestations must use committee index 0",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockChain := &mockChain.ChainService{
BlockSlot: tt.blockSlot,
}
s := &Service{
cfg: &config{
chain: mockChain,
},
}
data := &ethpb.AttestationData{
Slot: tt.attestationSlot,
CommitteeIndex: tt.committeeIndex,
BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot"), 32),
}
result, err := s.validateGloasCommitteeIndex(data)
require.Equal(t, tt.wantResult, result)
if tt.wantErr != "" {
require.ErrorContains(t, tt.wantErr, err)
} else {
require.NoError(t, err)
}
})
}
}

View File

@@ -0,0 +1,2 @@
### Fixed
- Avoid copying the full finalized state every time we compute cgc.

View File

@@ -0,0 +1,3 @@
### Added
- Add gossip beacon attestation validation conditions for Gloas fork

View File

@@ -0,0 +1,2 @@
### Ignored
- Refactor ProcessExecutionPayload to ApplyExecutionPayload