Compare commits

..

5 Commits

Author SHA1 Message Date
Manu NALEPA
94d35f7d52 Add seen cache for aggregated attestations
Add a seen cache that stores aggregated attestations when they are deleted from
the aggregatedAtt or blockAtt caches. This allows HasAggregatedAttestation
to still find recently pruned attestations, preventing duplicate processing.

The seen cache is populated on delete (not save) and is pruned along with
the other attestation caches. A metric tracks when the seen cache prevents
duplicate attestation processing.
2026-02-18 22:33:28 +01:00
Manu NALEPA
fc16bfb8f6 Refactor attestation cache lookup functions
Extract hasAggregatedAtt and hasBlockAtt helper functions from
HasAggregatedAttestation for better code organization and reuse.
Also improve code style with clearer variable names and error wrapping.
2026-02-18 13:30:21 +01:00
terence
8ee28394ab Add Gloas attestation committee index validation (#16359)
Adds Gloas fork attestation validation rules for gossip processing. This
implements the new committee index validation requirements introduced in
the Gloas fork.

## Changes
- Uses attestation epoch to determine if Gloas rules apply
- **Committee index validation**: 
  - Committee index must be < 2 (0 or 1 only)
- Same-slot attestations (where `attestation.data.slot == block.slot`)
must use committee index 0
  - Different-slot attestations can use either committee index 0 or 1
2026-02-17 18:00:32 +00:00
kasey
b31e2ffe51 avoid copying the finalized state when computing cgc (#16355)
Reviewing some (unrelated) sync code today I noticed that we are using a
stategen accessor for the finalized state which copies the entire state
object to look up validator balances to compute the custody_group_count.
This excess memory allocation is likely causing GC pressure and
increasing memory utilization.

This PR avoids state copying for this purpose by making the following
changes:
- Adds a new method to the `ReadOnlyBalances` state interface:
`EffectiveBalances([]primitives.ValidatorIndex) (uint64, []uint64,
error)`. This method computes returns the sum of the effective balances
of the given list of validator indices, a list with the individual
effective balance of each requested index (where the i-th element in the
return corresponds to the i-th element of the parameter), and an error -
which is necessary due to index bounds checks and quirks of multi-value
slice that can apparently result in the state being unusable for such
lookups if not correctly initialized.
- Adds a new method to the stategen interface
`FinalizedReadOnlyBalances`, which returns the finalized state asserted
to the `ReadOnlyBalances` interface.
- Switches the peerdas code to use the sum given by `EffectiveBalances`.

There was some existing nil checking code in the peerdas package that I
didn't want to modify, so I added a new compound interface in stategen
to allow the returned state to also expose the `IsNil` method.

fixes https://github.com/OffchainLabs/prysm/issues/16354

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2026-02-17 08:50:58 +00:00
terence
22e77add54 Refactor function ProcessExecutionPayload with ApplyExecutionPayload (#16356)
This PR refactors `ProcessExecutionPayload` with `ApplyExecutionPayload`
so caller can use Apply method to calculate post state root. Note that
validations are not required for Apply function. We really need the
state mutation lines that's:
```
1. Ensure latest_block_header.state_root is set (if zero, set it to the pre‑payload HashTreeRoot)...
2. processExecutionRequests()
3. QueueBuilderPayment()
4. SetExecutionPayloadAvailability(state.Slot(), true)
5. SetLatestBlockHash(payload.BlockHash())
```
I decided to keep them there because a. it's cheap b. it makes refactor
cleaner to reason c. API/caller may want to validate envelope and bid
consistency (ex: beacon api has option to validate consensus)
2026-02-13 15:51:22 +00:00
33 changed files with 695 additions and 419 deletions

View File

@@ -46,7 +46,6 @@ type ForkchoiceFetcher interface {
HighestReceivedBlockSlot() primitives.Slot
ReceivedBlocksLastEpoch() (uint64, error)
InsertNode(context.Context, state.BeaconState, consensus_blocks.ROBlock) error
InsertPayload(interfaces.ROExecutionPayloadEnvelope) error
ForkChoiceDump(context.Context) (*forkchoice.Dump, error)
NewSlot(context.Context, primitives.Slot) error
ProposerBoost() [32]byte

View File

@@ -7,7 +7,6 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
consensus_blocks "github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/forkchoice"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/runtime/version"
@@ -56,13 +55,6 @@ func (s *Service) InsertNode(ctx context.Context, st state.BeaconState, block co
return s.cfg.ForkChoiceStore.InsertNode(ctx, st, block)
}
// InsertPayload is a wrapper for payload insertion which is self locked
func (s *Service) InsertPayload(pe interfaces.ROExecutionPayloadEnvelope) error {
s.cfg.ForkChoiceStore.Lock()
defer s.cfg.ForkChoiceStore.Unlock()
return s.cfg.ForkChoiceStore.InsertPayload(pe)
}
// ForkChoiceDump returns the corresponding value from forkchoice
func (s *Service) ForkChoiceDump(ctx context.Context) (*forkchoice.Dump, error) {
s.cfg.ForkChoiceStore.RLock()

View File

@@ -700,14 +700,6 @@ func (s *ChainService) InsertNode(ctx context.Context, st state.BeaconState, blo
return nil
}
// InsertPayload mocks the same method in the chain service
func (s *ChainService) InsertPayload(pe interfaces.ROExecutionPayloadEnvelope) error {
if s.ForkChoiceStore != nil {
return s.ForkChoiceStore.InsertPayload(pe)
}
return nil
}
// ForkChoiceDump mocks the same method in the chain service
func (s *ChainService) ForkChoiceDump(ctx context.Context) (*forkchoice2.Dump, error) {
if s.ForkChoiceStore != nil {

View File

@@ -112,6 +112,34 @@ func ProcessExecutionPayload(
return errors.Wrap(err, "signature verification failed")
}
envelope, err := signedEnvelope.Envelope()
if err != nil {
return errors.Wrap(err, "could not get envelope from signed envelope")
}
if err := ApplyExecutionPayload(ctx, st, envelope); err != nil {
return err
}
r, err := st.HashTreeRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get hash tree root")
}
if r != envelope.StateRoot() {
return fmt.Errorf("state root mismatch: expected %#x, got %#x", envelope.StateRoot(), r)
}
return nil
}
// ApplyExecutionPayload applies the execution payload envelope to the state and performs the same
// consistency checks as the full processing path. This keeps the post-payload state root computation
// on a shared code path, even though some bid/payload checks are not strictly required for the root itself.
func ApplyExecutionPayload(
ctx context.Context,
st state.BeaconState,
envelope interfaces.ROExecutionPayloadEnvelope,
) error {
latestHeader := st.LatestBlockHeader()
if len(latestHeader.StateRoot) == 0 || bytes.Equal(latestHeader.StateRoot, make([]byte, 32)) {
previousStateRoot, err := st.HashTreeRoot(ctx)
@@ -128,10 +156,6 @@ func ProcessExecutionPayload(
if err != nil {
return errors.Wrap(err, "could not compute block header root")
}
envelope, err := signedEnvelope.Envelope()
if err != nil {
return errors.Wrap(err, "could not get envelope from signed envelope")
}
beaconBlockRoot := envelope.BeaconBlockRoot()
if !bytes.Equal(beaconBlockRoot[:], blockHeaderRoot[:]) {
@@ -217,14 +241,6 @@ func ProcessExecutionPayload(
return errors.Wrap(err, "could not set latest block hash")
}
r, err := st.HashTreeRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get hash tree root")
}
if r != envelope.StateRoot() {
return fmt.Errorf("state root mismatch: expected %#x, got %#x", envelope.StateRoot(), r)
}
return nil
}

View File

@@ -73,23 +73,22 @@ func PopulateFromSidecar(sidecar blocks.VerifiedRODataColumn) *SidecarReconstruc
// ValidatorsCustodyRequirement returns the number of custody groups regarding the validator indices attached to the beacon node.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#validator-custody
func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
totalNodeBalance := uint64(0)
func ValidatorsCustodyRequirement(st beaconState.ReadOnlyBalances, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
cfg := params.BeaconConfig()
idxs := make([]primitives.ValidatorIndex, 0, len(validatorsIndex))
for index := range validatorsIndex {
validator, err := state.ValidatorAtIndexReadOnly(index)
if err != nil {
return 0, errors.Wrapf(err, "validator at index %v", index)
}
totalNodeBalance += validator.EffectiveBalance()
idxs = append(idxs, index)
}
totalBalance, err := st.EffectiveBalanceSum(idxs)
if err != nil {
return 0, errors.Wrap(err, "effective balances")
}
cfg := params.BeaconConfig()
numberOfCustodyGroups := cfg.NumberOfCustodyGroups
validatorCustodyRequirement := cfg.ValidatorCustodyRequirement
balancePerAdditionalCustodyGroup := cfg.BalancePerAdditionalCustodyGroup
count := totalNodeBalance / balancePerAdditionalCustodyGroup
count := totalBalance / balancePerAdditionalCustodyGroup
return min(max(count, validatorCustodyRequirement), numberOfCustodyGroups), nil
}

View File

@@ -20,7 +20,6 @@ go_library(
"//config/fieldparams:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/forkchoice:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],

View File

@@ -52,7 +52,6 @@ go_test(
srcs = [
"ffg_update_test.go",
"forkchoice_test.go",
"gloas_test.go",
"no_vote_test.go",
"node_test.go",
"on_tick_test.go",
@@ -72,7 +71,6 @@ go_test(
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/forkchoice:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//crypto/hash:go_default_library",
"//encoding/bytesutil:go_default_library",

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"context"
"slices"
"time"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -288,29 +287,3 @@ func (s *Store) nodeTreeDump(ctx context.Context, n *Node, nodes []*forkchoice2.
}
return nodes, nil
}
// InsertPayload inserts a full node into forkchoice after the Gloas fork.
func (f *ForkChoice) InsertPayload(pe interfaces.ROExecutionPayloadEnvelope) error {
if pe.IsNil() {
return errors.New("cannot insert nil payload")
}
s := f.store
root := pe.BeaconBlockRoot()
en := s.emptyNodeByRoot[root]
if en == nil {
return errors.Wrap(ErrNilNode, "cannot insert full node without an empty one")
}
if _, ok := s.fullNodeByRoot[root]; ok {
// We don't import two payloads for the same root
return nil
}
fn := &PayloadNode{
node: en.node,
optimistic: true,
timestamp: time.Now(),
full: true,
children: make([]*Node, 0),
}
s.fullNodeByRoot[root] = fn
return nil
}

View File

@@ -1,299 +0,0 @@
package doublylinkedtree
import (
"context"
"testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/OffchainLabs/prysm/v7/testing/util"
)
func prepareGloasForkchoiceState(
_ context.Context,
slot primitives.Slot,
blockRoot [32]byte,
parentRoot [32]byte,
blockHash [32]byte,
parentBlockHash [32]byte,
justifiedEpoch primitives.Epoch,
finalizedEpoch primitives.Epoch,
) (state.BeaconState, blocks.ROBlock, error) {
blockHeader := &ethpb.BeaconBlockHeader{
ParentRoot: parentRoot[:],
}
justifiedCheckpoint := &ethpb.Checkpoint{
Epoch: justifiedEpoch,
}
finalizedCheckpoint := &ethpb.Checkpoint{
Epoch: finalizedEpoch,
}
builderPendingPayments := make([]*ethpb.BuilderPendingPayment, 64)
for i := range builderPendingPayments {
builderPendingPayments[i] = &ethpb.BuilderPendingPayment{
Withdrawal: &ethpb.BuilderPendingWithdrawal{
FeeRecipient: make([]byte, 20),
},
}
}
base := &ethpb.BeaconStateGloas{
Slot: slot,
RandaoMixes: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector),
CurrentJustifiedCheckpoint: justifiedCheckpoint,
FinalizedCheckpoint: finalizedCheckpoint,
LatestBlockHeader: blockHeader,
LatestExecutionPayloadBid: &ethpb.ExecutionPayloadBid{
BlockHash: blockHash[:],
ParentBlockHash: parentBlockHash[:],
ParentBlockRoot: make([]byte, 32),
PrevRandao: make([]byte, 32),
FeeRecipient: make([]byte, 20),
BlobKzgCommitments: [][]byte{make([]byte, 48)},
},
Builders: make([]*ethpb.Builder, 0),
BuilderPendingPayments: builderPendingPayments,
ExecutionPayloadAvailability: make([]byte, 1024),
LatestBlockHash: make([]byte, 32),
PayloadExpectedWithdrawals: make([]*enginev1.Withdrawal, 0),
ProposerLookahead: make([]uint64, 64),
}
st, err := state_native.InitializeFromProtoUnsafeGloas(base)
if err != nil {
return nil, blocks.ROBlock{}, err
}
bid := util.HydrateSignedExecutionPayloadBid(&ethpb.SignedExecutionPayloadBid{
Message: &ethpb.ExecutionPayloadBid{
BlockHash: blockHash[:],
ParentBlockHash: parentBlockHash[:],
},
})
blk := util.HydrateSignedBeaconBlockGloas(&ethpb.SignedBeaconBlockGloas{
Block: &ethpb.BeaconBlockGloas{
Slot: slot,
ParentRoot: parentRoot[:],
Body: &ethpb.BeaconBlockBodyGloas{
SignedExecutionPayloadBid: bid,
},
},
})
signed, err := blocks.NewSignedBeaconBlock(blk)
if err != nil {
return nil, blocks.ROBlock{}, err
}
roblock, err := blocks.NewROBlockWithRoot(signed, blockRoot)
return st, roblock, err
}
func prepareGloasForkchoicePayload(
blockRoot [32]byte,
) (interfaces.ROExecutionPayloadEnvelope, error) {
env := &ethpb.ExecutionPayloadEnvelope{
BeaconBlockRoot: blockRoot[:],
Payload: &enginev1.ExecutionPayloadDeneb{},
}
return blocks.WrappedROExecutionPayloadEnvelope(env)
}
func TestInsertGloasBlock_EmptyNodeOnly(t *testing.T) {
f := setup(0, 0)
ctx := t.Context()
root := indexToHash(1)
blockHash := indexToHash(100)
st, roblock, err := prepareGloasForkchoiceState(ctx, 1, root, params.BeaconConfig().ZeroHash, blockHash, params.BeaconConfig().ZeroHash, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, roblock))
// Empty node should exist.
en := f.store.emptyNodeByRoot[root]
require.NotNil(t, en)
// Full node should NOT exist.
_, hasFull := f.store.fullNodeByRoot[root]
assert.Equal(t, false, hasFull)
// Parent should be the genesis full node.
genesisRoot := params.BeaconConfig().ZeroHash
genesisFull := f.store.fullNodeByRoot[genesisRoot]
require.NotNil(t, genesisFull)
assert.Equal(t, genesisFull, en.node.parent)
}
func TestInsertPayload_CreatesFullNode(t *testing.T) {
f := setup(0, 0)
ctx := t.Context()
root := indexToHash(1)
blockHash := indexToHash(100)
st, roblock, err := prepareGloasForkchoiceState(ctx, 1, root, params.BeaconConfig().ZeroHash, blockHash, params.BeaconConfig().ZeroHash, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, roblock))
require.Equal(t, 2, len(f.store.emptyNodeByRoot))
require.Equal(t, 1, len(f.store.fullNodeByRoot))
pe, err := prepareGloasForkchoicePayload(root)
require.NoError(t, err)
require.NoError(t, f.InsertPayload(pe))
require.Equal(t, 2, len(f.store.fullNodeByRoot))
fn := f.store.fullNodeByRoot[root]
require.NotNil(t, fn)
en := f.store.emptyNodeByRoot[root]
require.NotNil(t, en)
// Empty and full share the same *Node.
assert.Equal(t, en.node, fn.node)
assert.Equal(t, true, fn.optimistic)
assert.Equal(t, true, fn.full)
}
func TestInsertPayload_DuplicateIsNoop(t *testing.T) {
f := setup(0, 0)
ctx := t.Context()
root := indexToHash(1)
blockHash := indexToHash(100)
st, roblock, err := prepareGloasForkchoiceState(ctx, 1, root, params.BeaconConfig().ZeroHash, blockHash, params.BeaconConfig().ZeroHash, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, roblock))
pe, err := prepareGloasForkchoicePayload(root)
require.NoError(t, err)
require.NoError(t, f.InsertPayload(pe))
require.Equal(t, 2, len(f.store.fullNodeByRoot))
fn := f.store.fullNodeByRoot[root]
require.NotNil(t, fn)
// Insert again — should be a no-op.
require.NoError(t, f.InsertPayload(pe))
assert.Equal(t, fn, f.store.fullNodeByRoot[root])
require.Equal(t, 2, len(f.store.fullNodeByRoot))
}
func TestInsertPayload_WithoutEmptyNode_Errors(t *testing.T) {
f := setup(0, 0)
root := indexToHash(99)
pe, err := prepareGloasForkchoicePayload(root)
require.NoError(t, err)
err = f.InsertPayload(pe)
require.ErrorContains(t, ErrNilNode.Error(), err)
}
func TestGloasBlock_ChildBuildsOnEmpty(t *testing.T) {
f := setup(0, 0)
ctx := t.Context()
// Insert Gloas block A (empty only).
rootA := indexToHash(1)
blockHashA := indexToHash(100)
st, roblock, err := prepareGloasForkchoiceState(ctx, 1, rootA, params.BeaconConfig().ZeroHash, blockHashA, params.BeaconConfig().ZeroHash, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, roblock))
// Insert Gloas block B as child of (A, empty)
rootB := indexToHash(2)
blockHashB := indexToHash(200)
nonMatchingParentHash := indexToHash(999)
st, roblock, err = prepareGloasForkchoiceState(ctx, 2, rootB, rootA, blockHashB, nonMatchingParentHash, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, roblock))
emptyA := f.store.emptyNodeByRoot[rootA]
require.NotNil(t, emptyA)
nodeB := f.store.emptyNodeByRoot[rootB]
require.NotNil(t, nodeB)
require.Equal(t, emptyA, nodeB.node.parent)
}
func TestGloasBlock_ChildrenOfEmptyAndFull(t *testing.T) {
f := setup(0, 0)
ctx := t.Context()
// Insert Gloas block A (empty only).
rootA := indexToHash(1)
blockHashA := indexToHash(100)
st, roblock, err := prepareGloasForkchoiceState(ctx, 1, rootA, params.BeaconConfig().ZeroHash, blockHashA, params.BeaconConfig().ZeroHash, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, roblock))
// Insert payload for A
pe, err := prepareGloasForkchoicePayload(rootA)
require.NoError(t, err)
require.NoError(t, f.InsertPayload(pe))
// Insert Gloas block B as child of (A, empty)
rootB := indexToHash(2)
blockHashB := indexToHash(200)
nonMatchingParentHash := indexToHash(999)
st, roblock, err = prepareGloasForkchoiceState(ctx, 2, rootB, rootA, blockHashB, nonMatchingParentHash, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, roblock))
// Insert Gloas block C as child of (A, full)
rootC := indexToHash(3)
blockHashC := indexToHash(201)
st, roblock, err = prepareGloasForkchoiceState(ctx, 3, rootC, rootA, blockHashC, blockHashA, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, roblock))
emptyA := f.store.emptyNodeByRoot[rootA]
require.NotNil(t, emptyA)
nodeB := f.store.emptyNodeByRoot[rootB]
require.NotNil(t, nodeB)
require.Equal(t, emptyA, nodeB.node.parent)
nodeC := f.store.emptyNodeByRoot[rootC]
require.NotNil(t, nodeC)
fullA := f.store.fullNodeByRoot[rootA]
require.NotNil(t, fullA)
require.Equal(t, fullA, nodeC.node.parent)
}
func TestGloasBlock_ChildBuildsOnFull(t *testing.T) {
f := setup(0, 0)
ctx := t.Context()
// Insert Gloas block A (empty only).
rootA := indexToHash(1)
blockHashA := indexToHash(100)
st, roblock, err := prepareGloasForkchoiceState(ctx, 1, rootA, params.BeaconConfig().ZeroHash, blockHashA, params.BeaconConfig().ZeroHash, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, roblock))
// Insert payload for A → creates the full node.
pe, err := prepareGloasForkchoicePayload(rootA)
require.NoError(t, err)
require.NoError(t, f.InsertPayload(pe))
fullA := f.store.fullNodeByRoot[rootA]
require.NotNil(t, fullA)
// Child for (A, full)
rootB := indexToHash(2)
blockHashB := indexToHash(200)
st, roblock, err = prepareGloasForkchoiceState(ctx, 2, rootB, rootA, blockHashB, blockHashA, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, roblock))
nodeB := f.store.emptyNodeByRoot[rootB]
require.NotNil(t, nodeB)
assert.Equal(t, fullA, nodeB.node.parent)
}

View File

@@ -136,7 +136,6 @@ func (s *Store) insert(ctx context.Context,
node: n,
optimistic: optimistic,
timestamp: time.Now(),
children: make([]*Node, 0),
}
s.emptyNodeByRoot[root] = pn
ret = pn

View File

@@ -9,7 +9,6 @@ import (
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
consensus_blocks "github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
forkchoice2 "github.com/OffchainLabs/prysm/v7/consensus-types/forkchoice"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
)
@@ -24,7 +23,6 @@ type ForkChoicer interface {
Unlock()
HeadRetriever // to compute head.
BlockProcessor // to track new block for fork choice.
PayloadProcessor // to track new payloads for fork choice.
AttestationProcessor // to track new attestation for fork choice.
Getter // to retrieve fork choice information.
Setter // to set fork choice information.
@@ -49,11 +47,6 @@ type BlockProcessor interface {
InsertChain(context.Context, []*forkchoicetypes.BlockAndCheckpoints) error
}
// PayloadProcessor processes a payload envelope
type PayloadProcessor interface {
InsertPayload(interfaces.ROExecutionPayloadEnvelope) error
}
// AttestationProcessor processes the attestation that's used for accounting fork choice.
type AttestationProcessor interface {
ProcessAttestation(context.Context, []uint64, [32]byte, primitives.Epoch)

View File

@@ -7,6 +7,7 @@ go_library(
"block.go",
"kv.go",
"log.go",
"metrics.go",
"seen_bits.go",
"unaggregated.go",
],
@@ -24,6 +25,8 @@ go_library(
"//runtime/version:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],

View File

@@ -2,6 +2,7 @@ package kv
import (
"context"
"fmt"
"runtime"
"sync"
@@ -273,18 +274,29 @@ func (c *AttCaches) DeleteAggregatedAttestation(att ethpb.Att) error {
filtered := make([]ethpb.Att, 0)
for _, a := range attList {
if c, err := att.GetAggregationBits().Contains(a.GetAggregationBits()); err != nil {
return err
} else if !c {
filtered = append(filtered, a)
contains, err := att.GetAggregationBits().Contains(a.GetAggregationBits())
if err != nil {
return fmt.Errorf("aggregation bits contain: %w", err)
}
}
if len(filtered) == 0 {
delete(c.aggregatedAtt, id)
} else {
c.aggregatedAtt[id] = filtered
if contains {
if err := c.insertSeenAggregatedAtt(a); err != nil {
return fmt.Errorf("insert aggregated att: %w", err)
}
continue
}
// If the attestation in the cache doesn't contain the bits of the attestation to delete, we keep it in the cache.
filtered = append(filtered, a)
}
if len(filtered) == 0 {
delete(c.aggregatedAtt, id)
return nil
}
c.aggregatedAtt[id] = filtered
return nil
}
@@ -294,32 +306,118 @@ func (c *AttCaches) HasAggregatedAttestation(att ethpb.Att) (bool, error) {
return false, err
}
has, err := c.hasAggregatedAtt(att)
if err != nil {
return false, fmt.Errorf("has aggregated att: %w", err)
}
if has {
return true, nil
}
has, err = c.hasBlockAtt(att)
if err != nil {
return false, fmt.Errorf("has block att: %w", err)
}
if has {
return true, nil
}
has, err = c.hasSeenAggregatedAtt(att)
if err != nil {
return false, fmt.Errorf("has seen aggregated att: %w", err)
}
if has {
savedBySeenAggregatedCache.Inc()
return true, nil
}
return false, nil
}
// hasAggregatedAtt checks if the attestation bits are contained in the aggregated attestation cache.
func (c *AttCaches) hasAggregatedAtt(att ethpb.Att) (bool, error) {
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return false, errors.Wrap(err, "could not create attestation ID")
return false, fmt.Errorf("could not create attestation ID: %w", err)
}
c.aggregatedAttLock.RLock()
defer c.aggregatedAttLock.RUnlock()
if atts, ok := c.aggregatedAtt[id]; ok {
for _, a := range atts {
if c, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
return false, err
} else if c {
return true, nil
}
cacheAtts, ok := c.aggregatedAtt[id]
if !ok {
return false, nil
}
for _, cacheAtt := range cacheAtts {
contains, err := cacheAtt.GetAggregationBits().Contains(att.GetAggregationBits())
if err != nil {
return false, fmt.Errorf("aggregation bits contains: %w", err)
}
if contains {
return true, nil
}
}
return false, nil
}
// hasBlockAtt checks if the attestation bits are contained in the block attestation cache.
func (c *AttCaches) hasBlockAtt(att ethpb.Att) (bool, error) {
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return false, fmt.Errorf("could not create attestation ID: %w", err)
}
c.blockAttLock.RLock()
defer c.blockAttLock.RUnlock()
if atts, ok := c.blockAtt[id]; ok {
for _, a := range atts {
if c, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
return false, err
} else if c {
return true, nil
}
cacheAtts, ok := c.blockAtt[id]
if !ok {
return false, nil
}
for _, cacheAtt := range cacheAtts {
contains, err := cacheAtt.GetAggregationBits().Contains(att.GetAggregationBits())
if err != nil {
return false, fmt.Errorf("aggregation bits contains: %w", err)
}
if contains {
return true, nil
}
}
return false, nil
}
// hasSeenAggregatedAtt checks if the attestation bits are contained in the seen aggregated cache.
func (c *AttCaches) hasSeenAggregatedAtt(att ethpb.Att) (bool, error) {
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return false, fmt.Errorf("could not create attestation ID: %w", err)
}
c.seenAggregatedAttLock.RLock()
defer c.seenAggregatedAttLock.RUnlock()
cacheAtts, ok := c.seenAggregatedAtt[id]
if !ok {
return false, nil
}
for _, cacheAtt := range cacheAtts {
contains, err := cacheAtt.GetAggregationBits().Contains(att.GetAggregationBits())
if err != nil {
return false, fmt.Errorf("aggregation bits contains: %w", err)
}
if contains {
return true, nil
}
}
@@ -332,3 +430,93 @@ func (c *AttCaches) AggregatedAttestationCount() int {
defer c.aggregatedAttLock.RUnlock()
return len(c.aggregatedAtt)
}
// insertSeenAggregatedAtt inserts an attestation into the seen aggregated cache.
func (c *AttCaches) insertSeenAggregatedAtt(att ethpb.Att) error {
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return fmt.Errorf("new ID: %w", err)
}
c.seenAggregatedAttLock.Lock()
defer c.seenAggregatedAttLock.Unlock()
cacheAtts, ok := c.seenAggregatedAtt[id]
if !ok {
c.seenAggregatedAtt[id] = []ethpb.Att{att.Clone()}
return nil
}
// Check if attestation is already contained
for _, cacheAtt := range cacheAtts {
contains, err := cacheAtt.GetAggregationBits().Contains(att.GetAggregationBits())
if err != nil {
return fmt.Errorf("aggregation bits contains: %w", err)
}
if contains {
return nil
}
}
c.seenAggregatedAtt[id] = append(cacheAtts, att.Clone())
return nil
}
// SeenAggregatedAttestationCount returns the number of keys in the seen aggregated cache.
func (c *AttCaches) SeenAggregatedAttestationCount() int {
c.seenAggregatedAttLock.RLock()
defer c.seenAggregatedAttLock.RUnlock()
return len(c.seenAggregatedAtt)
}
// SeenAggregatedAttestations returns all attestations from the seen cache.
func (c *AttCaches) SeenAggregatedAttestations() []ethpb.Att {
c.seenAggregatedAttLock.RLock()
defer c.seenAggregatedAttLock.RUnlock()
atts := make([]ethpb.Att, 0)
for _, a := range c.seenAggregatedAtt {
atts = append(atts, a...)
}
return atts
}
// DeleteSeenAggregatedAttestation deletes an attestation from the seen cache.
func (c *AttCaches) DeleteSeenAggregatedAttestation(att ethpb.Att) error {
if att == nil || att.IsNil() {
return nil
}
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return fmt.Errorf("could not create attestation ID: %w", err)
}
c.seenAggregatedAttLock.Lock()
defer c.seenAggregatedAttLock.Unlock()
cacheAtts, ok := c.seenAggregatedAtt[id]
if !ok {
return nil
}
filtered := make([]ethpb.Att, 0)
for _, cacheAtt := range cacheAtts {
contains, err := att.GetAggregationBits().Contains(cacheAtt.GetAggregationBits())
if err != nil {
return fmt.Errorf("aggregation bits contains: %w", err)
}
if !contains {
filtered = append(filtered, cacheAtt)
}
}
if len(filtered) == 0 {
delete(c.seenAggregatedAtt, id)
return nil
}
c.seenAggregatedAtt[id] = filtered
return nil
}

View File

@@ -544,3 +544,204 @@ func TestKV_Aggregated_AggregatedAttestationsBySlotIndexElectra(t *testing.T) {
returned = cache.AggregatedAttestationsBySlotIndexElectra(ctx, 2, 1)
assert.DeepEqual(t, []*ethpb.AttestationElectra{att3}, returned)
}
func TestKV_SeenAggregated_Cache(t *testing.T) {
t.Run("insert on delete from aggregated cache", func(t *testing.T) {
cache := NewAttCaches()
att1 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}})
att2 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}})
// Save attestations
require.NoError(t, cache.SaveAggregatedAttestation(att1))
require.NoError(t, cache.SaveAggregatedAttestation(att2))
// Seen aggregated cache should be empty before deletion
assert.Equal(t, 0, cache.SeenAggregatedAttestationCount(), "seen aggregated cache should be empty before deletion")
// Delete one attestation
require.NoError(t, cache.DeleteAggregatedAttestation(att1))
// Seen aggregated cache should now contain the deleted attestation
assert.Equal(t, 1, cache.SeenAggregatedAttestationCount(), "seen aggregated cache should have one entry after deletion")
// The deleted attestation should be found via HasAggregatedAttestation (through seen aggregated cache)
has, err := cache.HasAggregatedAttestation(att1)
require.NoError(t, err)
assert.Equal(t, true, has, "Deleted attestation should be found in seen aggregated cache")
})
t.Run("has aggregated attestation via seen aggregated cache", func(t *testing.T) {
cache := NewAttCaches()
att := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}})
// Save and delete attestation to populate seen aggregated cache
require.NoError(t, cache.SaveAggregatedAttestation(att))
require.NoError(t, cache.DeleteAggregatedAttestation(att))
// Attestation should not be in aggregated cache
assert.Equal(t, 0, cache.AggregatedAttestationCount(), "Aggregated cache should be empty")
// But should still be found via HasAggregatedAttestation (through seen aggregated cache)
has, err := cache.HasAggregatedAttestation(att)
require.NoError(t, err)
assert.Equal(t, true, has, "Attestation should be found in seen aggregated cache")
// Subset of bits should also be found
attSubset := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1100}})
has, err = cache.HasAggregatedAttestation(attSubset)
require.NoError(t, err)
assert.Equal(t, true, has, "Subset attestation should be found in seen aggregated cache")
})
t.Run("delete from seen aggregated cache", func(t *testing.T) {
cache := NewAttCaches()
att1 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}})
att2 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}})
// Save and delete attestations to populate seen aggregated cache
require.NoError(t, cache.SaveAggregatedAttestation(att1))
require.NoError(t, cache.SaveAggregatedAttestation(att2))
require.NoError(t, cache.DeleteAggregatedAttestation(att1))
require.NoError(t, cache.DeleteAggregatedAttestation(att2))
assert.Equal(t, 2, cache.SeenAggregatedAttestationCount(), "seen aggregated cache should have two entries")
// Delete from seen aggregated cache
require.NoError(t, cache.DeleteSeenAggregatedAttestation(att1))
assert.Equal(t, 1, cache.SeenAggregatedAttestationCount(), "seen aggregated cache should have one entry after deletion")
// att1 should no longer be found
has, err := cache.HasAggregatedAttestation(att1)
require.NoError(t, err)
assert.Equal(t, false, has, "Deleted seen aggregated attestation should not be found")
// att2 should still be found
has, err = cache.HasAggregatedAttestation(att2)
require.NoError(t, err)
assert.Equal(t, true, has, "Non-deleted seen aggregated attestation should still be found")
})
t.Run("get all seen aggregated attestations", func(t *testing.T) {
cache := NewAttCaches()
att1 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}})
att2 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}})
// Save and delete attestations to populate seen aggregated cache
require.NoError(t, cache.SaveAggregatedAttestation(att1))
require.NoError(t, cache.SaveAggregatedAttestation(att2))
require.NoError(t, cache.DeleteAggregatedAttestation(att1))
require.NoError(t, cache.DeleteAggregatedAttestation(att2))
// Get all seen aggregated attestations
ttlAtts := cache.SeenAggregatedAttestations()
assert.Equal(t, 2, len(ttlAtts), "Should return all seen aggregated attestations")
})
t.Run("insert on delete from block cache", func(t *testing.T) {
cache := NewAttCaches()
att := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}})
// Save as block attestation
require.NoError(t, cache.SaveBlockAttestation(att))
// Seen aggregated cache should be empty before deletion
assert.Equal(t, 0, cache.SeenAggregatedAttestationCount(), "seen aggregated cache should be empty before deletion")
// Delete block attestation
require.NoError(t, cache.DeleteBlockAttestation(att))
// Seen aggregated cache should now contain the deleted attestation
assert.Equal(t, 1, cache.SeenAggregatedAttestationCount(), "seen aggregated cache should have one entry after block attestation deletion")
// The deleted attestation should be found via HasAggregatedAttestation (through seen aggregated cache)
has, err := cache.HasAggregatedAttestation(att)
require.NoError(t, err)
assert.Equal(t, true, has, "Deleted block attestation should be found in seen aggregated cache")
})
t.Run("no duplicates in seen aggregated cache", func(t *testing.T) {
cache := NewAttCaches()
att := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}})
// Save and delete the same attestation multiple times
require.NoError(t, cache.SaveAggregatedAttestation(att))
require.NoError(t, cache.DeleteAggregatedAttestation(att))
require.NoError(t, cache.SaveAggregatedAttestation(att))
require.NoError(t, cache.DeleteAggregatedAttestation(att))
// Seen aggregated cache should only have one entry (no duplicates)
ttlAtts := cache.SeenAggregatedAttestations()
assert.Equal(t, 1, len(ttlAtts), "seen aggregated cache should not have duplicates")
})
t.Run("multiple attestations with different bits for same data", func(t *testing.T) {
cache := NewAttCaches()
// Create attestations with the same data but non-overlapping aggregation bits
att1 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b10011}})
att2 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b11100}})
// Directly insert into seen aggregated cache to test the append path
require.NoError(t, cache.insertSeenAggregatedAtt(att1))
require.NoError(t, cache.insertSeenAggregatedAtt(att2))
// Seen aggregated cache should have one key but two attestations (since bits don't overlap)
assert.Equal(t, 1, cache.SeenAggregatedAttestationCount(), "seen aggregated cache should have one key")
ttlAtts := cache.SeenAggregatedAttestations()
assert.Equal(t, 2, len(ttlAtts), "seen aggregated cache should have two attestations under the same key")
// Both should be found
has, err := cache.HasAggregatedAttestation(att1)
require.NoError(t, err)
assert.Equal(t, true, has, "First attestation should be found in seen aggregated cache")
has, err = cache.HasAggregatedAttestation(att2)
require.NoError(t, err)
assert.Equal(t, true, has, "Second attestation should be found in seen aggregated cache")
// A subset of att1 should be found
attSubset := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b10001}})
has, err = cache.HasAggregatedAttestation(attSubset)
require.NoError(t, err)
assert.Equal(t, true, has, "Subset of first attestation should be found in seen aggregated cache")
})
t.Run("partial delete from seen aggregated cache with same key", func(t *testing.T) {
cache := NewAttCaches()
// Create attestations with the same data but non-overlapping aggregation bits
att1 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b10011}})
att2 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b11100}})
// Insert both into seen aggregated cache
require.NoError(t, cache.insertSeenAggregatedAtt(att1))
require.NoError(t, cache.insertSeenAggregatedAtt(att2))
// Verify both are in the cache
assert.Equal(t, 1, cache.SeenAggregatedAttestationCount(), "seen aggregated cache should have one key")
assert.Equal(t, 2, len(cache.SeenAggregatedAttestations()), "seen aggregated cache should have two attestations")
// Delete only att1 from seen aggregated cache
require.NoError(t, cache.DeleteSeenAggregatedAttestation(att1))
// Key should still exist with one attestation remaining
assert.Equal(t, 1, cache.SeenAggregatedAttestationCount(), "seen aggregated cache should still have one key")
assert.Equal(t, 1, len(cache.SeenAggregatedAttestations()), "seen aggregated cache should have one attestation remaining")
// att1 should no longer be found
has, err := cache.HasAggregatedAttestation(att1)
require.NoError(t, err)
assert.Equal(t, false, has, "Deleted attestation should not be found")
// att2 should still be found
has, err = cache.HasAggregatedAttestation(att2)
require.NoError(t, err)
assert.Equal(t, true, has, "Remaining attestation should still be found")
})
}

View File

@@ -1,6 +1,8 @@
package kv
import (
"fmt"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1/attestation"
"github.com/pkg/errors"
@@ -63,6 +65,16 @@ func (c *AttCaches) DeleteBlockAttestation(att ethpb.Att) error {
c.blockAttLock.Lock()
defer c.blockAttLock.Unlock()
// Insert all attestations into the seen aggregated cache before deleting
if cacheAtts, ok := c.blockAtt[id]; ok {
for _, cacheAtt := range cacheAtts {
if err := c.insertSeenAggregatedAtt(cacheAtt); err != nil {
return fmt.Errorf("insert seen aggregated att: %w", err)
}
}
}
delete(c.blockAtt, id)
return nil

View File

@@ -18,14 +18,16 @@ import (
// These caches are KV store for various attestations
// such are unaggregated, aggregated or attestations within a block.
type AttCaches struct {
aggregatedAttLock sync.RWMutex
aggregatedAtt map[attestation.Id][]ethpb.Att
unAggregateAttLock sync.RWMutex
unAggregatedAtt map[attestation.Id]ethpb.Att
forkchoiceAtt *attmap.Attestations
blockAttLock sync.RWMutex
blockAtt map[attestation.Id][]ethpb.Att
seenAtt *cache.Cache
aggregatedAttLock sync.RWMutex
aggregatedAtt map[attestation.Id][]ethpb.Att
unAggregateAttLock sync.RWMutex
unAggregatedAtt map[attestation.Id]ethpb.Att
forkchoiceAtt *attmap.Attestations
blockAttLock sync.RWMutex
blockAtt map[attestation.Id][]ethpb.Att
seenAtt *cache.Cache
seenAggregatedAttLock sync.RWMutex
seenAggregatedAtt map[attestation.Id][]ethpb.Att
}
// NewAttCaches initializes a new attestation pool consists of multiple KV store in cache for
@@ -34,11 +36,12 @@ func NewAttCaches() *AttCaches {
secsInEpoch := time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
c := cache.New(2*secsInEpoch*time.Second, 2*secsInEpoch*time.Second)
pool := &AttCaches{
unAggregatedAtt: make(map[attestation.Id]ethpb.Att),
aggregatedAtt: make(map[attestation.Id][]ethpb.Att),
forkchoiceAtt: attmap.New(),
blockAtt: make(map[attestation.Id][]ethpb.Att),
seenAtt: c,
unAggregatedAtt: make(map[attestation.Id]ethpb.Att),
aggregatedAtt: make(map[attestation.Id][]ethpb.Att),
forkchoiceAtt: attmap.New(),
blockAtt: make(map[attestation.Id][]ethpb.Att),
seenAtt: c,
seenAggregatedAtt: make(map[attestation.Id][]ethpb.Att),
}
return pool

View File

@@ -0,0 +1,11 @@
package kv
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var savedBySeenAggregatedCache = promauto.NewCounter(prometheus.CounterOpts{
Name: "attestation_saved_by_seen_aggregated_cache_total",
Help: "The number of times an attestation was found only in the seen aggregated cache and not in the regular caches.",
})

View File

@@ -30,6 +30,16 @@ var (
Name: "expired_block_atts_total",
Help: "The number of expired and deleted block attestations in the pool.",
})
seenAggregatedAttsCount = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "seen_aggregated_attestations_in_pool_total",
Help: "The number of aggregated attestations in the seen cache.",
},
)
expiredSeenAggregatedAtts = promauto.NewCounter(prometheus.CounterOpts{
Name: "expired_seen_aggregated_atts_total",
Help: "The number of expired and deleted aggregated attestations from the seen cache.",
})
attCount = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "attestations_in_pool_total",
@@ -59,6 +69,7 @@ var (
func (s *Service) updateMetrics() {
aggregatedAttsCount.Set(float64(s.cfg.Pool.AggregatedAttestationCount()))
unaggregatedAttsCount.Set(float64(s.cfg.Pool.UnaggregatedAttestationCount()))
seenAggregatedAttsCount.Set(float64(s.cfg.Pool.SeenAggregatedAttestationCount()))
}
func (s *Service) updateMetricsExperimental(numExpired uint64) {

View File

@@ -68,6 +68,21 @@ func (*PoolMock) AggregatedAttestationCount() int {
panic("implement me")
}
// SeenAggregatedAttestations --
func (*PoolMock) SeenAggregatedAttestations() []ethpb.Att {
panic("implement me")
}
// DeleteSeenAggregatedAttestation --
func (*PoolMock) DeleteSeenAggregatedAttestation(_ ethpb.Att) error {
panic("implement me")
}
// SeenAggregatedAttestationCount --
func (*PoolMock) SeenAggregatedAttestationCount() int {
panic("implement me")
}
// SaveUnaggregatedAttestation --
func (*PoolMock) SaveUnaggregatedAttestation(_ ethpb.Att) error {
panic("implement me")

View File

@@ -23,6 +23,10 @@ type Pool interface {
DeleteAggregatedAttestation(att ethpb.Att) error
HasAggregatedAttestation(att ethpb.Att) (bool, error)
AggregatedAttestationCount() int
// Seen aggregated attestations.
SeenAggregatedAttestations() []ethpb.Att
DeleteSeenAggregatedAttestation(att ethpb.Att) error
SeenAggregatedAttestationCount() int
// For unaggregated attestations.
SaveUnaggregatedAttestation(att ethpb.Att) error
SaveUnaggregatedAttestations(atts []ethpb.Att) error

View File

@@ -81,6 +81,16 @@ func (s *Service) pruneExpiredAtts() {
expiredBlockAtts.Inc()
}
}
ttlAtts := s.cfg.Pool.SeenAggregatedAttestations()
for _, att := range ttlAtts {
if s.expired(att.GetData().Slot) {
if err := s.cfg.Pool.DeleteSeenAggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not delete expired seen aggregated attestation")
}
expiredSeenAggregatedAtts.Inc()
}
}
}
// Return true if the input slot has been expired.

View File

@@ -152,6 +152,7 @@ type ReadOnlyBalances interface {
Balances() []uint64
BalanceAtIndex(idx primitives.ValidatorIndex) (uint64, error)
BalancesLength() int
EffectiveBalanceSum([]primitives.ValidatorIndex) (uint64, error)
}
// ReadOnlyCheckpoint defines a struct which only has read access to checkpoint methods.

View File

@@ -8,6 +8,7 @@ import (
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/pkg/errors"
)
// Validators participating in consensus on the beacon chain.
@@ -80,6 +81,25 @@ func (b *BeaconState) ValidatorAtIndex(idx primitives.ValidatorIndex) (*ethpb.Va
return b.validatorAtIndex(idx)
}
// EffectiveBalances returns the sum of the effective balances of the given list of validator indices, the eb of each given validator, or an
// error if one of the indices is out of bounds, or the state wasn't correctly initialized.
func (b *BeaconState) EffectiveBalanceSum(idxs []primitives.ValidatorIndex) (uint64, error) {
b.lock.RLock()
defer b.lock.RUnlock()
var sum uint64
for i := range idxs {
if b.validatorsMultiValue == nil {
return 0, errors.Wrap(state.ErrNilValidatorsInState, "nil validators multi-value slice")
}
v, err := b.validatorsMultiValue.At(b, uint64(idxs[i]))
if err != nil {
return 0, errors.Wrap(err, "validators multi value at index")
}
sum += v.EffectiveBalance
}
return sum, nil
}
func (b *BeaconState) validatorAtIndex(idx primitives.ValidatorIndex) (*ethpb.Validator, error) {
if b.validatorsMultiValue == nil {
return &ethpb.Validator{}, nil

View File

@@ -5,6 +5,7 @@ import (
"context"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stategen"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
)
@@ -14,6 +15,8 @@ type StateManager struct {
StatesBySlot map[primitives.Slot]state.BeaconState
}
var _ stategen.StateManager = (*StateManager)(nil)
// NewService --
func NewService() *StateManager {
return &StateManager{
@@ -101,3 +104,8 @@ func (m *StateManager) AddStateForSlot(state state.BeaconState, slot primitives.
func (m *StateManager) DeleteStateFromCaches(context.Context, [32]byte) error {
return nil
}
// FinalizedReadOnlyBalances --
func (m *StateManager) FinalizedReadOnlyBalances() stategen.NilCheckableReadOnlyBalances {
panic("unimplemented")
}

View File

@@ -27,6 +27,13 @@ var defaultHotStateDBInterval primitives.Slot = 128
var populatePubkeyCacheOnce sync.Once
// NilCheckableReadOnlyBalances adds the IsNil method to ReadOnlyBalances
// to allow checking if the underlying state value is nil.
type NilCheckableReadOnlyBalances interface {
state.ReadOnlyBalances
IsNil() bool
}
// StateManager represents a management object that handles the internal
// logic of maintaining both hot and cold states in DB.
type StateManager interface {
@@ -43,6 +50,7 @@ type StateManager interface {
ActiveNonSlashedBalancesByRoot(context.Context, [32]byte) ([]uint64, error)
StateByRootIfCachedNoCopy(blockRoot [32]byte) state.BeaconState
StateByRootInitialSync(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error)
FinalizedReadOnlyBalances() NilCheckableReadOnlyBalances
}
// State is a concrete implementation of StateManager.
@@ -201,3 +209,8 @@ func (s *State) FinalizedState() state.BeaconState {
defer s.finalizedInfo.lock.RUnlock()
return s.finalizedInfo.state.Copy()
}
// Returns the finalized state as a ReadOnlyBalances so that it can be used read-only without copying.
func (s *State) FinalizedReadOnlyBalances() NilCheckableReadOnlyBalances {
return s.finalizedInfo.state
}

View File

@@ -185,7 +185,7 @@ func (s *Service) validatorsCustodyRequirement() (uint64, error) {
}
// Retrieve the finalized state.
finalizedState := s.cfg.stateGen.FinalizedState()
finalizedState := s.cfg.stateGen.FinalizedReadOnlyBalances()
if finalizedState == nil || finalizedState.IsNil() {
return 0, nilFinalizedStateError
}

View File

@@ -268,10 +268,23 @@ func (s *Service) validateCommitteeIndexAndCount(
a eth.Att,
bs state.ReadOnlyBeaconState,
) (primitives.CommitteeIndex, uint64, pubsub.ValidationResult, error) {
// - [REJECT] attestation.data.index == 0
if a.Version() >= version.Electra && a.GetData().CommitteeIndex != 0 {
return 0, 0, pubsub.ValidationReject, errors.New("attestation data's committee index must be 0")
// Validate committee index based on fork.
if a.Version() >= version.Electra {
data := a.GetData()
attEpoch := slots.ToEpoch(data.Slot)
postGloas := attEpoch >= params.BeaconConfig().GloasForkEpoch
if postGloas {
if result, err := s.validateGloasCommitteeIndex(data); result != pubsub.ValidationAccept {
return 0, 0, result, err
}
} else {
// [REJECT] attestation.data.index == 0 (New in Electra, removed in Gloas)
if data.CommitteeIndex != 0 {
return 0, 0, pubsub.ValidationReject, errors.New("attestation data's committee index must be 0")
}
}
}
valCount, err := helpers.ActiveValidatorCount(ctx, bs, slots.ToEpoch(a.GetData().Slot))
if err != nil {
return 0, 0, pubsub.ValidationIgnore, err
@@ -356,6 +369,29 @@ func validateAttestingIndex(
return pubsub.ValidationAccept, nil
}
// validateGloasCommitteeIndex validates committee index rules for Gloas fork.
// [REJECT] attestation.data.index < 2. (New in Gloas)
// [REJECT] attestation.data.index == 0 if block.slot == attestation.data.slot. (New in Gloas)
func (s *Service) validateGloasCommitteeIndex(data *eth.AttestationData) (pubsub.ValidationResult, error) {
if data.CommitteeIndex >= 2 {
return pubsub.ValidationReject, errors.New("attestation data's committee index must be < 2")
}
// Same-slot attestations must use committee index 0
if data.CommitteeIndex != 0 {
blockRoot := bytesutil.ToBytes32(data.BeaconBlockRoot)
slot, err := s.cfg.chain.RecentBlockSlot(blockRoot)
if err != nil {
return pubsub.ValidationIgnore, err
}
if slot == data.Slot {
return pubsub.ValidationReject, errors.New("same slot attestations must use committee index 0")
}
}
return pubsub.ValidationAccept, nil
}
// generateUnaggregatedAttCacheKey generates the cache key for unaggregated attestation tracking.
func generateUnaggregatedAttCacheKey(att eth.Att) (string, error) {
var attester uint64

View File

@@ -684,3 +684,75 @@ func Test_validateCommitteeIndexAndCount_Boundary(t *testing.T) {
require.ErrorContains(t, "committee index", err)
require.Equal(t, pubsub.ValidationReject, res)
}
func Test_validateGloasCommitteeIndex(t *testing.T) {
tests := []struct {
name string
committeeIndex primitives.CommitteeIndex
attestationSlot primitives.Slot
blockSlot primitives.Slot
wantResult pubsub.ValidationResult
wantErr string
}{
{
name: "committee index >= 2 should reject",
committeeIndex: 2,
attestationSlot: 10,
blockSlot: 10,
wantResult: pubsub.ValidationReject,
wantErr: "committee index must be < 2",
},
{
name: "committee index 0 should accept",
committeeIndex: 0,
attestationSlot: 10,
blockSlot: 10,
wantResult: pubsub.ValidationAccept,
wantErr: "",
},
{
name: "committee index 1 different-slot should accept",
committeeIndex: 1,
attestationSlot: 10,
blockSlot: 9,
wantResult: pubsub.ValidationAccept,
wantErr: "",
},
{
name: "committee index 1 same-slot should reject",
committeeIndex: 1,
attestationSlot: 10,
blockSlot: 10,
wantResult: pubsub.ValidationReject,
wantErr: "same slot attestations must use committee index 0",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockChain := &mockChain.ChainService{
BlockSlot: tt.blockSlot,
}
s := &Service{
cfg: &config{
chain: mockChain,
},
}
data := &ethpb.AttestationData{
Slot: tt.attestationSlot,
CommitteeIndex: tt.committeeIndex,
BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot"), 32),
}
result, err := s.validateGloasCommitteeIndex(data)
require.Equal(t, tt.wantResult, result)
if tt.wantErr != "" {
require.ErrorContains(t, tt.wantErr, err)
} else {
require.NoError(t, err)
}
})
}
}

View File

@@ -0,0 +1,2 @@
### Fixed
- Avoid copying the full finalized state every time we compute cgc.

View File

@@ -0,0 +1,2 @@
### Added
- TTL cache for aggregated attestations to prevent re-processing of recently deleted attestations.

View File

@@ -1,2 +0,0 @@
### Added
- Added an InsertPayload method to allow full node insertion after gloas.

View File

@@ -0,0 +1,3 @@
### Added
- Add gossip beacon attestation validation conditions for Gloas fork

View File

@@ -0,0 +1,2 @@
### Ignored
- Refactor ProcessExecutionPayload to ApplyExecutionPayload