Compare commits

..

1 Commits

Author SHA1 Message Date
james-prysm
0a49546598 payload attestation pool 2026-01-30 15:45:17 -06:00
15 changed files with 606 additions and 294 deletions

View File

@@ -0,0 +1,32 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["pool.go"],
importpath = "github.com/OffchainLabs/prysm/v7/beacon-chain/operations/payloadattestation",
visibility = ["//visibility:public"],
deps = [
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//crypto/hash:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["pool_test.go"],
embed = [":go_default_library"],
deps = [
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
],
)

View File

@@ -0,0 +1,179 @@
package payloadattestation
import (
"sync"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/crypto/bls"
"github.com/OffchainLabs/prysm/v7/crypto/hash"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
)
var errNilPayloadAttestationMessage = errors.New("nil payload attestation message")
// PoolManager maintains pending payload attestations.
// This pool is used by proposers to insert payload attestations into new blocks.
type PoolManager interface {
// PendingPayloadAttestations returns all pending aggregated payload attestations.
// If a slot is provided, only attestations for that slot are returned.
PendingPayloadAttestations(slot ...primitives.Slot) []*ethpb.PayloadAttestation
// InsertPayloadAttestation inserts or aggregates a payload attestation
// message into the pool. The idx parameter is the PTC committee index
// of the validator (position in the bitvector).
InsertPayloadAttestation(msg *ethpb.PayloadAttestationMessage, idx uint64) error
// Seen returns true if the PTC committee index has already been seen
// for the given PayloadAttestationData.
Seen(data *ethpb.PayloadAttestationData, idx uint64) bool
// MarkIncluded removes the attestation matching the given data from the pool.
MarkIncluded(att *ethpb.PayloadAttestation)
}
// Pool is a concrete implementation of PoolManager.
// Keyed by hash of PayloadAttestationData; stores aggregated PayloadAttestation.
type Pool struct {
lock sync.RWMutex
pending map[[32]byte]*ethpb.PayloadAttestation
}
// NewPool returns an initialized pool.
func NewPool() *Pool {
return &Pool{
pending: make(map[[32]byte]*ethpb.PayloadAttestation),
}
}
// PendingPayloadAttestations returns all pending payload attestations.
// If a slot filter is provided, only attestations for that slot are returned.
func (p *Pool) PendingPayloadAttestations(slot ...primitives.Slot) []*ethpb.PayloadAttestation {
p.lock.RLock()
defer p.lock.RUnlock()
result := make([]*ethpb.PayloadAttestation, 0, len(p.pending))
for _, att := range p.pending {
if len(slot) > 0 && att.Data.Slot != slot[0] {
continue
}
result = append(result, att)
}
return result
}
// InsertPayloadAttestation inserts a payload attestation message into the pool,
// aggregating it with any existing attestation that shares the same PayloadAttestationData.
// The idx parameter is the PTC committee index used to set the aggregation bit.
func (p *Pool) InsertPayloadAttestation(msg *ethpb.PayloadAttestationMessage, idx uint64) error {
if msg == nil || msg.Data == nil {
return errNilPayloadAttestationMessage
}
key, err := dataKey(msg.Data)
if err != nil {
return errors.Wrap(err, "could not compute data key")
}
p.lock.Lock()
defer p.lock.Unlock()
existing, ok := p.pending[key]
if !ok {
p.pending[key] = messageToPayloadAttestation(msg, idx)
return nil
}
if existing.AggregationBits.BitAt(idx) {
return nil
}
sig, err := aggregateSigFromMessage(existing, msg)
if err != nil {
return errors.Wrap(err, "could not aggregate signatures")
}
existing.Signature = sig
existing.AggregationBits.SetBitAt(idx, true)
return nil
}
// Seen returns true if the PTC committee index has already been seen
// for the given PayloadAttestationData.
func (p *Pool) Seen(data *ethpb.PayloadAttestationData, idx uint64) bool {
if data == nil {
return false
}
key, err := dataKey(data)
if err != nil {
return false
}
p.lock.RLock()
defer p.lock.RUnlock()
existing, ok := p.pending[key]
if !ok {
return false
}
return existing.AggregationBits.BitAt(idx)
}
// MarkIncluded removes the attestation with matching data from the pool.
func (p *Pool) MarkIncluded(att *ethpb.PayloadAttestation) {
if att == nil || att.Data == nil {
return
}
key, err := dataKey(att.Data)
if err != nil {
return
}
p.lock.Lock()
defer p.lock.Unlock()
delete(p.pending, key)
}
// messageToPayloadAttestation creates a PayloadAttestation with a single
// aggregated bit from the passed PayloadAttestationMessage.
func messageToPayloadAttestation(msg *ethpb.PayloadAttestationMessage, idx uint64) *ethpb.PayloadAttestation {
bits := bitfield.NewBitvector512()
bits.SetBitAt(idx, true)
data := &ethpb.PayloadAttestationData{
BeaconBlockRoot: bytesutil.SafeCopyBytes(msg.Data.BeaconBlockRoot),
Slot: msg.Data.Slot,
PayloadPresent: msg.Data.PayloadPresent,
BlobDataAvailable: msg.Data.BlobDataAvailable,
}
return &ethpb.PayloadAttestation{
AggregationBits: bits,
Data: data,
Signature: bytesutil.SafeCopyBytes(msg.Signature),
}
}
// aggregateSigFromMessage returns the aggregated signature by combining the
// existing aggregated signature with the message's signature.
func aggregateSigFromMessage(aggregated *ethpb.PayloadAttestation, message *ethpb.PayloadAttestationMessage) ([]byte, error) {
aggSig, err := bls.SignatureFromBytesNoValidation(aggregated.Signature)
if err != nil {
return nil, err
}
sig, err := bls.SignatureFromBytesNoValidation(message.Signature)
if err != nil {
return nil, err
}
return bls.AggregateSignatures([]bls.Signature{aggSig, sig}).Marshal(), nil
}
// dataKey computes a deterministic key for PayloadAttestationData
// by hashing its serialized form.
func dataKey(data *ethpb.PayloadAttestationData) ([32]byte, error) {
enc, err := proto.Marshal(data)
if err != nil {
return [32]byte{}, err
}
return hash.Hash(enc), nil
}

View File

@@ -0,0 +1,291 @@
package payloadattestation
import (
"testing"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/crypto/bls"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
func TestPool_PendingPayloadAttestations(t *testing.T) {
t.Run("empty pool", func(t *testing.T) {
pool := NewPool()
atts := pool.PendingPayloadAttestations()
assert.Equal(t, 0, len(atts))
})
t.Run("non-empty pool", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
msg1 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
msg2 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 1,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 2,
PayloadPresent: false,
BlobDataAvailable: true,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg1, 0))
require.NoError(t, pool.InsertPayloadAttestation(msg2, 1))
atts := pool.PendingPayloadAttestations()
assert.Equal(t, 2, len(atts))
})
t.Run("filter by slot", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
msg1 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
msg2 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 1,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 2,
PayloadPresent: false,
BlobDataAvailable: true,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg1, 0))
require.NoError(t, pool.InsertPayloadAttestation(msg2, 1))
atts := pool.PendingPayloadAttestations(primitives.Slot(1))
assert.Equal(t, 1, len(atts))
assert.Equal(t, primitives.Slot(1), atts[0].Data.Slot)
atts = pool.PendingPayloadAttestations(primitives.Slot(2))
assert.Equal(t, 1, len(atts))
assert.Equal(t, primitives.Slot(2), atts[0].Data.Slot)
atts = pool.PendingPayloadAttestations(primitives.Slot(99))
assert.Equal(t, 0, len(atts))
})
}
func TestPool_InsertPayloadAttestation(t *testing.T) {
t.Run("nil message", func(t *testing.T) {
pool := NewPool()
err := pool.InsertPayloadAttestation(nil, 0)
require.ErrorContains(t, "nil payload attestation message", err)
})
t.Run("nil data", func(t *testing.T) {
pool := NewPool()
err := pool.InsertPayloadAttestation(&ethpb.PayloadAttestationMessage{}, 0)
require.ErrorContains(t, "nil payload attestation message", err)
})
t.Run("insert creates new entry with correct aggregation bit", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
idx := uint64(5)
msg := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg, idx))
atts := pool.PendingPayloadAttestations()
require.Equal(t, 1, len(atts))
assert.Equal(t, true, atts[0].AggregationBits.BitAt(idx))
assert.Equal(t, false, atts[0].AggregationBits.BitAt(idx+1))
})
t.Run("duplicate index is no-op", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
idx := uint64(3)
msg := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg, idx))
firstSig := bytesutil.SafeCopyBytes(pool.PendingPayloadAttestations()[0].Signature)
require.NoError(t, pool.InsertPayloadAttestation(msg, idx))
atts := pool.PendingPayloadAttestations()
require.Equal(t, 1, len(atts))
assert.DeepEqual(t, firstSig, atts[0].Signature)
})
t.Run("aggregates different indices", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
root := make([]byte, 32)
root[0] = 'r'
data := &ethpb.PayloadAttestationData{
BeaconBlockRoot: root,
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
}
msg1 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: data,
Signature: sig,
}
msg2 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 1,
Data: data,
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg1, 5))
require.NoError(t, pool.InsertPayloadAttestation(msg2, 7))
atts := pool.PendingPayloadAttestations()
require.Equal(t, 1, len(atts))
assert.Equal(t, true, atts[0].AggregationBits.BitAt(5))
assert.Equal(t, true, atts[0].AggregationBits.BitAt(7))
assert.Equal(t, false, atts[0].AggregationBits.BitAt(6))
})
t.Run("different data creates separate entries", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
msg1 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
msg2 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 1,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: false, // different
BlobDataAvailable: false,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg1, 0))
require.NoError(t, pool.InsertPayloadAttestation(msg2, 1))
atts := pool.PendingPayloadAttestations()
assert.Equal(t, 2, len(atts))
})
}
func TestPool_Seen(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
data := &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
}
assert.Equal(t, false, pool.Seen(data, 5))
msg := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: data,
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg, 5))
assert.Equal(t, true, pool.Seen(data, 5))
assert.Equal(t, false, pool.Seen(data, 6))
assert.Equal(t, false, pool.Seen(nil, 5))
}
func TestPool_MarkIncluded(t *testing.T) {
t.Run("mark included removes from pool", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
msg := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg, 0))
assert.Equal(t, 1, len(pool.PendingPayloadAttestations()))
pool.MarkIncluded(&ethpb.PayloadAttestation{
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
})
assert.Equal(t, 0, len(pool.PendingPayloadAttestations()))
})
t.Run("mark included with non-matching data does nothing", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
msg := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg, 0))
pool.MarkIncluded(&ethpb.PayloadAttestation{
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 999,
PayloadPresent: true,
BlobDataAvailable: false,
},
})
assert.Equal(t, 1, len(pool.PendingPayloadAttestations()))
})
t.Run("mark included with nil is safe", func(t *testing.T) {
pool := NewPool()
pool.MarkIncluded(nil)
pool.MarkIncluded(&ethpb.PayloadAttestation{})
})
}

View File

@@ -1,3 +0,0 @@
### Ignored
- adding some short retries for some end to end evaluators in an attempt to deflake tests.

View File

@@ -1,3 +0,0 @@
### Added
- Added README for maintaining specrefs.

View File

@@ -1,3 +0,0 @@
### Added
- The ability to download the nightly reference tests from a specific day.

View File

@@ -1,35 +0,0 @@
# Specification References
This directory contains specification reference tracking files managed by
[ethspecify](https://github.com/jtraglia/ethspecify).
## Installation
Install `ethspecify` with the following command:
```bash
pipx install ethspecify
```
> [!NOTE]
> You can run `ethspecify <cmd>` in the `specrefs` directory or
> `ethspecify <cmd> --path=specrefs` from the project's root directory.
## Maintenance
When adding support for a new specification version, follow these steps:
0. Change directory into the `specrefs` directory.
1. Update the version in `.ethspecify.yml` configuration.
2. Run `ethspecify process` to update/populate specrefs.
3. Run `ethspecify check` to check specrefs.
4. If there are errors, use the error message as a guide to fix the issue. If
there are new specrefs with empty sources, implement/locate each item and
update each specref source list. If you choose not to implement an item,
add an exception to the appropriate section the the `.ethspecify.yml`
configuration.
5. Repeat steps 3 and 4 until `ethspecify check` passes.
6. Run `git diff` to view updated specrefs. If an object/function/etc has
changed, make the necessary updates to the implementation.
7. Lastly, in the project's root directory, run `act -j check-specrefs` to
ensure everything is correct.

View File

@@ -40,7 +40,6 @@ type TransactionGenerator struct {
cancel context.CancelFunc
paused bool
useLargeBlobs bool // Use large blob transactions (6 blobs per tx) for BPO testing
blobTxCount int // Number of blob transactions per slot (0 means default of 5)
}
func (t *TransactionGenerator) UnderlyingProcess() *os.Process {
@@ -49,8 +48,8 @@ func (t *TransactionGenerator) UnderlyingProcess() *os.Process {
return &os.Process{}
}
func NewTransactionGenerator(keystore string, seed int64, useLargeBlobs bool, blobTxCount int) *TransactionGenerator {
return &TransactionGenerator{keystore: keystore, seed: seed, useLargeBlobs: useLargeBlobs, blobTxCount: blobTxCount}
func NewTransactionGenerator(keystore string, seed int64, useLargeBlobs bool) *TransactionGenerator {
return &TransactionGenerator{keystore: keystore, seed: seed, useLargeBlobs: useLargeBlobs}
}
func (t *TransactionGenerator) Start(ctx context.Context) error {
@@ -115,7 +114,7 @@ func (t *TransactionGenerator) Start(ctx context.Context) error {
continue
}
backend := ethclient.NewClient(client)
err = SendTransaction(client, mineKey.PrivateKey, gasPrice, mineKey.Address.String(), txCount, backend, false, t.useLargeBlobs, t.blobTxCount)
err = SendTransaction(client, mineKey.PrivateKey, gasPrice, mineKey.Address.String(), txCount, backend, false, t.useLargeBlobs)
if err != nil {
return err
}
@@ -129,7 +128,7 @@ func (s *TransactionGenerator) Started() <-chan struct{} {
return s.started
}
func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.Int, addr string, txCount uint64, backend *ethclient.Client, al bool, useLargeBlobs bool, blobTxCount int) error {
func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.Int, addr string, txCount uint64, backend *ethclient.Client, al bool, useLargeBlobs bool) error {
sender := common.HexToAddress(addr)
nonce, err := backend.PendingNonceAt(context.Background(), fundedAccount.Address)
if err != nil {
@@ -151,19 +150,14 @@ func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.In
clock := startup.NewClock(e2e.TestParams.CLGenesisTime, [32]byte{})
isPostFulu := clock.CurrentEpoch() >= params.BeaconConfig().FuluForkEpoch
// Default to 5 blob transactions per slot if not configured.
numBlobTxs := blobTxCount
if numBlobTxs <= 0 {
numBlobTxs = 5
}
g, _ := errgroup.WithContext(context.Background())
txs := make([]*types.Transaction, numBlobTxs)
txs := make([]*types.Transaction, 10)
// Send blob transactions - use different versions pre/post Fulu
if isPostFulu {
logrus.Info("Sending blob transactions with cell proofs")
for index := range uint64(numBlobTxs) {
// Reduced from 10 to 5 to reduce load and prevent builder/EL timeouts
for index := range uint64(5) {
g.Go(func() error {
tx, err := RandomBlobCellTx(client, fundedAccount.Address, nonce+index, gasPrice, chainid, al, useLargeBlobs)
@@ -182,7 +176,8 @@ func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.In
}
} else {
logrus.Info("Sending blob transactions with sidecars")
for index := range uint64(numBlobTxs) {
// Reduced from 10 to 5 to reduce load and prevent builder/EL timeouts
for index := range uint64(5) {
g.Go(func() error {
tx, err := RandomBlobTx(client, fundedAccount.Address, nonce+index, gasPrice, chainid, al, useLargeBlobs)

View File

@@ -225,9 +225,9 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{r.depositor}); err != nil {
return errors.Wrap(err, "testDepositsAndTx unable to run, depositor did not Start")
}
go func() {
if r.config.TestDeposits {
log.Info("Running deposit tests")
go func() {
if r.config.TestDeposits {
log.Info("Running deposit tests")
// The validators with an index < minGenesisActiveCount all have deposits already from the chain start.
// Skip all of those chain start validators by seeking to minGenesisActiveCount in the validator list
// for further deposit testing.
@@ -238,12 +238,12 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
r.t.Error(errors.Wrap(err, "depositor.SendAndMine failed"))
}
}
}
// Only generate background transactions when relevant for the test.
if r.config.TestDeposits || r.config.TestFeature || r.config.UseBuilder {
r.testTxGeneration(ctx, g, keystorePath, []e2etypes.ComponentRunner{})
}
}()
}
// Only generate background transactions when relevant for the test.
if r.config.TestDeposits || r.config.TestFeature || r.config.UseBuilder {
r.testTxGeneration(ctx, g, keystorePath, []e2etypes.ComponentRunner{})
}
}()
if r.config.TestDeposits {
return depositCheckValidator.Start(ctx)
}
@@ -252,7 +252,7 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
}
func (r *testRunner) testTxGeneration(ctx context.Context, g *errgroup.Group, keystorePath string, requiredNodes []e2etypes.ComponentRunner) {
txGenerator := eth1.NewTransactionGenerator(keystorePath, r.config.Seed, r.config.UseLargeBlobs, r.config.BlobTxCount)
txGenerator := eth1.NewTransactionGenerator(keystorePath, r.config.Seed, r.config.UseLargeBlobs)
r.comHandler.txGen = txGenerator
g.Go(func() error {
if err := helpers.ComponentsStarted(ctx, requiredNodes); err != nil {

View File

@@ -156,9 +156,19 @@ func waitForMidEpoch(conn *grpc.ClientConn) error {
}
}
// getHeadEpochs fetches the head epoch from all beacon nodes concurrently.
func getHeadEpochs(conns []*grpc.ClientConn) ([]primitives.Epoch, error) {
epochs := make([]primitives.Epoch, len(conns))
func allNodesHaveSameHead(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) error {
// Wait until we're at least halfway into the epoch to avoid race conditions
// at epoch boundaries where nodes may report different epochs.
if err := waitForMidEpoch(conns[0]); err != nil {
return errors.Wrap(err, "failed waiting for mid-epoch")
}
headEpochs := make([]primitives.Epoch, len(conns))
headBlockRoots := make([][]byte, len(conns))
justifiedRoots := make([][]byte, len(conns))
prevJustifiedRoots := make([][]byte, len(conns))
finalizedRoots := make([][]byte, len(conns))
chainHeads := make([]*eth.ChainHead, len(conns))
g, _ := errgroup.WithContext(context.Background())
for i, conn := range conns {
@@ -170,145 +180,63 @@ func getHeadEpochs(conns []*grpc.ClientConn) ([]primitives.Epoch, error) {
if err != nil {
return errors.Wrapf(err, "connection number=%d", conIdx)
}
epochs[conIdx] = chainHead.HeadEpoch
headEpochs[conIdx] = chainHead.HeadEpoch
headBlockRoots[conIdx] = chainHead.HeadBlockRoot
justifiedRoots[conIdx] = chainHead.JustifiedBlockRoot
prevJustifiedRoots[conIdx] = chainHead.PreviousJustifiedBlockRoot
finalizedRoots[conIdx] = chainHead.FinalizedBlockRoot
chainHeads[conIdx] = chainHead
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
return err
}
return epochs, nil
}
func allNodesHaveSameHead(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) error {
// Wait until we're at least halfway into the epoch to avoid race conditions
// at epoch boundaries where nodes may report different epochs.
if err := waitForMidEpoch(conns[0]); err != nil {
return errors.Wrap(err, "failed waiting for mid-epoch")
}
// First, wait for all nodes to reach the same epoch. Sync nodes may be
// behind and need time to catch up. We poll every 2 seconds with a
// 60 second timeout - this adapts to actual sync progress rather than
// using fixed delays.
const epochTimeout = 60 * time.Second
const epochPollInterval = 2 * time.Second
epochDeadline := time.Now().Add(epochTimeout)
for time.Now().Before(epochDeadline) {
epochs, err := getHeadEpochs(conns)
if err != nil {
return err
}
allSame := true
for i := 1; i < len(epochs); i++ {
if epochs[0] != epochs[i] {
allSame = false
break
}
}
if allSame {
break
}
time.Sleep(epochPollInterval)
}
// Now that epochs match (or timeout reached), do detailed head comparison
// with a few retries to handle block propagation delays.
const maxRetries = 5
const retryDelay = 1 * time.Second
var lastErr error
for attempt := range maxRetries {
if attempt > 0 {
time.Sleep(retryDelay)
}
headEpochs := make([]primitives.Epoch, len(conns))
headBlockRoots := make([][]byte, len(conns))
justifiedRoots := make([][]byte, len(conns))
prevJustifiedRoots := make([][]byte, len(conns))
finalizedRoots := make([][]byte, len(conns))
chainHeads := make([]*eth.ChainHead, len(conns))
g, _ := errgroup.WithContext(context.Background())
for i, conn := range conns {
conIdx := i
currConn := conn
g.Go(func() error {
beaconClient := eth.NewBeaconChainClient(currConn)
chainHead, err := beaconClient.GetChainHead(context.Background(), &emptypb.Empty{})
if err != nil {
return errors.Wrapf(err, "connection number=%d", conIdx)
}
headEpochs[conIdx] = chainHead.HeadEpoch
headBlockRoots[conIdx] = chainHead.HeadBlockRoot
justifiedRoots[conIdx] = chainHead.JustifiedBlockRoot
prevJustifiedRoots[conIdx] = chainHead.PreviousJustifiedBlockRoot
finalizedRoots[conIdx] = chainHead.FinalizedBlockRoot
chainHeads[conIdx] = chainHead
return nil
})
}
if err := g.Wait(); err != nil {
return err
}
lastErr = nil
for i := range conns {
if headEpochs[0] != headEpochs[i] {
lastErr = fmt.Errorf(
"received conflicting head epochs on node %d, expected %d, received %d",
i,
headEpochs[0],
headEpochs[i],
)
break
}
if !bytes.Equal(headBlockRoots[0], headBlockRoots[i]) {
lastErr = fmt.Errorf(
"received conflicting head block roots on node %d, expected %#x, received %#x",
i,
headBlockRoots[0],
headBlockRoots[i],
)
break
}
if !bytes.Equal(justifiedRoots[0], justifiedRoots[i]) {
lastErr = fmt.Errorf(
"received conflicting justified block roots on node %d, expected %#x, received %#x: %s and %s",
i,
justifiedRoots[0],
justifiedRoots[i],
chainHeads[0].String(),
chainHeads[i].String(),
)
break
}
if !bytes.Equal(prevJustifiedRoots[0], prevJustifiedRoots[i]) {
lastErr = fmt.Errorf(
"received conflicting previous justified block roots on node %d, expected %#x, received %#x",
i,
prevJustifiedRoots[0],
prevJustifiedRoots[i],
)
break
}
if !bytes.Equal(finalizedRoots[0], finalizedRoots[i]) {
lastErr = fmt.Errorf(
"received conflicting finalized epoch roots on node %d, expected %#x, received %#x",
i,
finalizedRoots[0],
finalizedRoots[i],
)
break
}
}
if lastErr == nil {
return nil
}
}
return lastErr
for i := range conns {
if headEpochs[0] != headEpochs[i] {
return fmt.Errorf(
"received conflicting head epochs on node %d, expected %d, received %d",
i,
headEpochs[0],
headEpochs[i],
)
}
if !bytes.Equal(headBlockRoots[0], headBlockRoots[i]) {
return fmt.Errorf(
"received conflicting head block roots on node %d, expected %#x, received %#x",
i,
headBlockRoots[0],
headBlockRoots[i],
)
}
if !bytes.Equal(justifiedRoots[0], justifiedRoots[i]) {
return fmt.Errorf(
"received conflicting justified block roots on node %d, expected %#x, received %#x: %s and %s",
i,
justifiedRoots[0],
justifiedRoots[i],
chainHeads[0].String(),
chainHeads[i].String(),
)
}
if !bytes.Equal(prevJustifiedRoots[0], prevJustifiedRoots[i]) {
return fmt.Errorf(
"received conflicting previous justified block roots on node %d, expected %#x, received %#x",
i,
prevJustifiedRoots[0],
prevJustifiedRoots[i],
)
}
if !bytes.Equal(finalizedRoots[0], finalizedRoots[i]) {
return fmt.Errorf(
"received conflicting finalized epoch roots on node %d, expected %#x, received %#x",
i,
finalizedRoots[0],
finalizedRoots[i],
)
}
}
return nil
}

View File

@@ -6,7 +6,6 @@ import (
"fmt"
"net/http"
"strconv"
"time"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/altair"
@@ -124,25 +123,6 @@ func validatorsAreActive(ec *types.EvaluationContext, conns ...*grpc.ClientConn)
// validatorsParticipating ensures the validators have an acceptable participation rate.
func validatorsParticipating(_ *types.EvaluationContext, conns ...*grpc.ClientConn) error {
// Retry up to 3 times with 2 second delays to handle timing flakes where
// attestations haven't been fully processed yet due to block propagation delays.
const maxRetries = 3
const retryDelay = 2 * time.Second
var lastErr error
for attempt := range maxRetries {
if attempt > 0 {
time.Sleep(retryDelay)
}
lastErr = checkValidatorsParticipating(conns)
if lastErr == nil {
return nil
}
}
return lastErr
}
func checkValidatorsParticipating(conns []*grpc.ClientConn) error {
conn := conns[0]
client := ethpb.NewBeaconChainClient(conn)
validatorRequest := &ethpb.GetValidatorParticipationRequest{}
@@ -254,25 +234,6 @@ func checkValidatorsParticipating(conns []*grpc.ClientConn) error {
// validatorsSyncParticipation ensures the validators have an acceptable participation rate for
// sync committee assignments.
func validatorsSyncParticipation(_ *types.EvaluationContext, conns ...*grpc.ClientConn) error {
// Retry up to 3 times with 2 second delays to handle timing flakes where
// sync committee messages haven't fully propagated yet.
const maxRetries = 3
const retryDelay = 2 * time.Second
var lastErr error
for attempt := range maxRetries {
if attempt > 0 {
time.Sleep(retryDelay)
}
lastErr = checkSyncParticipation(conns)
if lastErr == nil {
return nil
}
}
return lastErr
}
func checkSyncParticipation(conns []*grpc.ClientConn) error {
conn := conns[0]
client := ethpb.NewNodeClient(conn)
altairClient := ethpb.NewBeaconChainClient(conn)
@@ -311,9 +272,9 @@ func checkSyncParticipation(conns []*grpc.ClientConn) error {
// Skip fork slot.
continue
}
// Skip early slots at genesis - validators need time to ramp up after chain start
// Skip slots 1-2 at genesis - validators need time to ramp up after chain start
// due to doppelganger protection. This is a startup timing issue, not a fork transition issue.
if b.Block().Slot() < 5 {
if b.Block().Slot() < 3 {
continue
}
expectedParticipation := expectedSyncParticipation
@@ -328,11 +289,6 @@ func checkSyncParticipation(conns []*grpc.ClientConn) error {
if err != nil {
return err
}
// Skip blocks with zero sync bits - these are typically empty/anomalous blocks
// where the proposer didn't receive sync committee contributions in time.
if syncAgg.SyncCommitteeBits.Count() == 0 {
continue
}
threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedParticipation)
if syncAgg.SyncCommitteeBits.Count() < threshold {
return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())
@@ -387,11 +343,6 @@ func checkSyncParticipation(conns []*grpc.ClientConn) error {
if err != nil {
return err
}
// Skip blocks with zero sync bits - these are typically empty/anomalous blocks
// where the proposer didn't receive sync committee contributions in time.
if syncAgg.SyncCommitteeBits.Count() == 0 {
continue
}
threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedSyncParticipation)
if syncAgg.SyncCommitteeBits.Count() < threshold {
return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())

View File

@@ -9,11 +9,11 @@ import (
)
func TestEndToEnd_MinimalConfig_WithBuilder(t *testing.T) {
r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder(), types.WithBlobTxCount(2))
r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder())
r.run()
}
func TestEndToEnd_MinimalConfig_WithBuilder_ValidatorRESTApi(t *testing.T) {
r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder(), types.WithValidatorRESTApi(), types.WithBlobTxCount(2))
r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder(), types.WithValidatorRESTApi())
r.run()
}

View File

@@ -68,14 +68,6 @@ func WithLargeBlobs() E2EConfigOpt {
}
}
// WithBlobTxCount sets the number of blob transactions sent per slot.
// Default is 5 when not specified.
func WithBlobTxCount(n int) E2EConfigOpt {
return func(cfg *E2EConfig) {
cfg.BlobTxCount = n
}
}
func WithSSZOnly() E2EConfigOpt {
return func(cfg *E2EConfig) {
if err := os.Setenv(params.EnvNameOverrideAccept, api.OctetStreamMediaType); err != nil {
@@ -116,7 +108,6 @@ type E2EConfig struct {
UseBeaconRestApi bool
UseBuilder bool
UseLargeBlobs bool // Use large blob transactions (6 blobs per tx) for BPO testing
BlobTxCount int // Number of blob transactions per slot (0 means default of 5)
EpochsToRun uint64
ExitEpoch primitives.Epoch // Custom epoch for voluntary exit submission (0 means use default)
Seed int64

View File

@@ -21,14 +21,10 @@ There are tests for mainnet and minimal config, so for each config we will add a
## Running nightly spectests
Since [PR 15312](https://github.com/OffchainLabs/prysm/pull/15312), Prysm has support to download "nightly" spectests from github via a starlark rule configuration by environment variable.
Set `--repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly` or `--repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly-<run_id>` when running spectest to download the "nightly" spectests.
Note: A GITHUB_TOKEN environment variable is required to be set. The github token does not need to be associated with your main account; it can be from a "burner account". And the token does not need to be a fine-grained token; it can be a classic token.
Since [PR 15312](https://github.com/OffchainLabs/prysm/pull/15312), Prysm has support to download "nightly" spectests from github via a starlark rule configuration by environment variable.
Set `--repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly` when running spectest to download the "nightly" spectests.
Note: A GITHUB_TOKEN environment variable is required to be set. The github token must be a [fine grained token](https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens#creating-a-fine-grained-personal-access-token).
```
bazel test //... --test_tag_filters=spectest --repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly
```
```
bazel test //... --test_tag_filters=spectest --repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly-21422848633
```

View File

@@ -1,6 +1,5 @@
# bazel build @consensus_spec_tests//:test_data
# bazel build @consensus_spec_tests//:test_data --repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly
# bazel build @consensus_spec_tests//:test_data --repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly-<run_id>
def _get_redirected_url(repository_ctx, url, headers):
if not repository_ctx.which("curl"):
@@ -25,7 +24,7 @@ def _impl(repository_ctx):
version = repository_ctx.getenv("CONSENSUS_SPEC_TESTS_VERSION") or repository_ctx.attr.version
token = repository_ctx.getenv("GITHUB_TOKEN") or ""
if version == "nightly" or version.startswith("nightly-"):
if version == "nightly":
print("Downloading nightly tests")
if not token:
fail("Error GITHUB_TOKEN is not set")
@@ -35,22 +34,16 @@ def _impl(repository_ctx):
"Accept": "application/vnd.github+json",
}
if version.startswith("nightly-"):
run_id = version.split("nightly-", 1)[1]
if not run_id:
fail("Error invalid run id")
else:
repository_ctx.download(
"https://api.github.com/repos/%s/actions/workflows/%s/runs?branch=%s&status=success&per_page=1"
% (repository_ctx.attr.repo, repository_ctx.attr.workflow, repository_ctx.attr.branch),
headers = headers,
output = "runs.json"
)
repository_ctx.download(
"https://api.github.com/repos/%s/actions/workflows/%s/runs?branch=%s&status=success&per_page=1"
% (repository_ctx.attr.repo, repository_ctx.attr.workflow, repository_ctx.attr.branch),
headers = headers,
output = "runs.json"
)
run_id = json.decode(repository_ctx.read("runs.json"))["workflow_runs"][0]["id"]
repository_ctx.delete("runs.json")
run_id = json.decode(repository_ctx.read("runs.json"))["workflow_runs"][0]["id"]
repository_ctx.delete("runs.json")
print("Run id:", run_id)
repository_ctx.download(
"https://api.github.com/repos/%s/actions/runs/%s/artifacts"
% (repository_ctx.attr.repo, run_id),
@@ -115,8 +108,8 @@ consensus_spec_tests = repository_rule(
"version": attr.string(mandatory = True),
"flavors": attr.string_dict(mandatory = True),
"repo": attr.string(default = "ethereum/consensus-specs"),
"workflow": attr.string(default = "nightly-reftests.yml"),
"branch": attr.string(default = "master"),
"workflow": attr.string(default = "generate_vectors.yml"),
"branch": attr.string(default = "dev"),
"release_url_template": attr.string(default = "https://github.com/ethereum/consensus-specs/releases/download/%s"),
},
)