Mirror of https://github.com/OffchainLabs/prysm.git, synced 2026-02-04 10:05:17 -05:00

Compare commits: deflake-ev...gloas-even (1 commit)

Commit b3454853b5
@@ -540,6 +540,12 @@ type PayloadAttestation struct {
 	Signature string `json:"signature"`
 }
 
+type PayloadAttestationMessage struct {
+	ValidatorIndex string                   `json:"validator_index"`
+	Data           *PayloadAttestationData `json:"data"`
+	Signature      string                   `json:"signature"`
+}
+
 type BeaconBlockBodyGloas struct {
 	RandaoReveal string    `json:"randao_reveal"`
 	Eth1Data     *Eth1Data `json:"eth1_data"`
@@ -2971,6 +2971,14 @@ func PayloadAttestationDataFromConsensus(d *eth.PayloadAttestationData) *Payload
 	}
 }
 
+func PayloadAttestationMessageFromConsensus(m *eth.PayloadAttestationMessage) *PayloadAttestationMessage {
+	return &PayloadAttestationMessage{
+		ValidatorIndex: fmt.Sprintf("%d", m.ValidatorIndex),
+		Data:           PayloadAttestationDataFromConsensus(m.Data),
+		Signature:      hexutil.Encode(m.Signature),
+	}
+}
+
 func (b *SignedBeaconBlockGloas) ToConsensus() (*eth.SignedBeaconBlockGloas, error) {
 	if b == nil {
 		return nil, errNilValue
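As a usage sketch of the conversion (input values are borrowed from the test fixtures added later in this diff; the snippet itself is illustrative, not part of the change):

	msg := &eth.PayloadAttestationMessage{
		ValidatorIndex: 123,
		Data: &eth.PayloadAttestationData{
			BeaconBlockRoot: make([]byte, 32),
			Slot:            10,
			PayloadPresent:  true,
		},
		Signature: make([]byte, 96),
	}
	out := PayloadAttestationMessageFromConsensus(msg)
	// out.ValidatorIndex == "123"; out.Signature is "0x" followed by 192 zero hex digits.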
@@ -112,3 +112,8 @@ type LightClientOptimisticUpdateEvent struct {
 	Version string                       `json:"version"`
 	Data    *LightClientOptimisticUpdate `json:"data"`
 }
+
+type ExecutionPayloadAvailableEvent struct {
+	Slot      string `json:"slot"`
+	BlockRoot string `json:"block_root"`
+}
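On the wire these fields are framed as a server-sent event, so a subscriber to this topic would see something like the following (values taken from the test fixture later in this diff; the event/data framing is the standard SSE format the existing topics already use):

	event: execution_payload_available
	data: {"slot":"10","block_root":"0x9a00000000000000000000000000000000000000000000000000000000000000"}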
@@ -46,6 +46,14 @@ const (
 
 	// DataColumnReceived is sent after a data column has been seen after gossip validation rules.
 	DataColumnReceived = 12
 
+	// ExecutionPayloadBidReceived is sent after a signed execution payload bid is received from gossip or API
+	// that passes gossip validation on the execution_payload_bid topic.
+	ExecutionPayloadBidReceived = 13
+
+	// PayloadAttestationMessageReceived is sent after a payload attestation message is received
+	// that passes validation rules of the payload_attestation_message topic.
+	PayloadAttestationMessageReceived = 14
 )
 
 // UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events.
@@ -114,3 +122,13 @@ type DataColumnReceivedData struct {
 	BlockRoot      [32]byte
 	KzgCommitments [][]byte
 }
+
+// ExecutionPayloadBidReceivedData is the data sent with ExecutionPayloadBidReceived events.
+type ExecutionPayloadBidReceivedData struct {
+	SignedBid *ethpb.SignedExecutionPayloadBid
+}
+
+// PayloadAttestationMessageReceivedData is the data sent with PayloadAttestationMessageReceived events.
+type PayloadAttestationMessageReceivedData struct {
+	PayloadAttestationMessage *ethpb.PayloadAttestationMessage
+}
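A sketch of how a downstream consumer might subscribe to the operation feed and handle the two new event types; the notifier variable and handler functions are assumptions for illustration, while the event and data types follow the feed pattern used in the tests below:

	// Sketch: subscribing to the operation feed (notifier, handleBid and
	// handleAttestation are hypothetical; the feed pattern mirrors existing usage).
	ch := make(chan *feed.Event, 16)
	sub := notifier.OperationFeed().Subscribe(ch)
	defer sub.Unsubscribe()
	for ev := range ch {
		switch d := ev.Data.(type) {
		case *operation.ExecutionPayloadBidReceivedData:
			handleBid(d.SignedBid)
		case *operation.PayloadAttestationMessageReceivedData:
			handleAttestation(d.PayloadAttestationMessage)
		}
	}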
@@ -33,6 +33,9 @@ const (
 	LightClientOptimisticUpdate
 	// PayloadAttributes events are fired upon a missed slot or new head.
 	PayloadAttributes
+	// ExecutionPayloadAvailable is sent when the node has verified that the execution payload
+	// and blobs for a block are available and ready for payload attestation.
+	ExecutionPayloadAvailable
 )
 
 // BlockProcessedData is the data sent with BlockProcessed events.
@@ -72,3 +75,11 @@ type InitializedData struct {
 	// GenesisValidatorsRoot represents state.validators.HashTreeRoot().
 	GenesisValidatorsRoot []byte
 }
+
+// ExecutionPayloadAvailableData is the data sent with ExecutionPayloadAvailable events.
+type ExecutionPayloadAvailableData struct {
+	// Slot is the slot of the block whose execution payload became available.
+	Slot primitives.Slot
+	// BlockRoot is the root of the block whose execution payload became available.
+	BlockRoot [32]byte
+}
@@ -74,6 +74,12 @@ const (
 	LightClientOptimisticUpdateTopic = "light_client_optimistic_update"
 	// DataColumnTopic represents a data column sidecar event topic
 	DataColumnTopic = "data_column_sidecar"
+	// ExecutionPayloadAvailableTopic represents an event indicating execution payload and blobs are available.
+	ExecutionPayloadAvailableTopic = "execution_payload_available"
+	// ExecutionPayloadBidTopic represents an event for a signed execution payload bid passing gossip validation.
+	ExecutionPayloadBidTopic = "execution_payload_bid"
+	// PayloadAttestationMessageTopic represents an event for a payload attestation message passing validation.
+	PayloadAttestationMessageTopic = "payload_attestation_message"
 )
 
 var (
@@ -108,6 +114,8 @@ var opsFeedEventTopics = map[feed.EventType]string{
 	operation.ProposerSlashingReceived:          ProposerSlashingTopic,
 	operation.BlockGossipReceived:               BlockGossipTopic,
 	operation.DataColumnReceived:                DataColumnTopic,
+	operation.ExecutionPayloadBidReceived:       ExecutionPayloadBidTopic,
+	operation.PayloadAttestationMessageReceived: PayloadAttestationMessageTopic,
 }
 
 var stateFeedEventTopics = map[feed.EventType]string{
@@ -118,6 +126,7 @@ var stateFeedEventTopics = map[feed.EventType]string{
 	statefeed.Reorg:                     ChainReorgTopic,
 	statefeed.BlockProcessed:            BlockTopic,
 	statefeed.PayloadAttributes:         PayloadAttributesTopic,
+	statefeed.ExecutionPayloadAvailable: ExecutionPayloadAvailableTopic,
 }
 
 var topicsForStateFeed = topicsForFeed(stateFeedEventTopics)
@@ -466,6 +475,12 @@ func topicForEvent(event *feed.Event) string {
 		return PayloadAttributesTopic
 	case *operation.DataColumnReceivedData:
 		return DataColumnTopic
+	case *operation.ExecutionPayloadBidReceivedData:
+		return ExecutionPayloadBidTopic
+	case *operation.PayloadAttestationMessageReceivedData:
+		return PayloadAttestationMessageTopic
+	case *statefeed.ExecutionPayloadAvailableData:
+		return ExecutionPayloadAvailableTopic
 	default:
 		return InvalidTopic
 	}
@@ -638,6 +653,21 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi
 			}
 			return jsonMarshalReader(eventName, blk)
 		}, nil
+	case *statefeed.ExecutionPayloadAvailableData:
+		return func() io.Reader {
+			return jsonMarshalReader(eventName, &structs.ExecutionPayloadAvailableEvent{
+				Slot:      fmt.Sprintf("%d", v.Slot),
+				BlockRoot: hexutil.Encode(v.BlockRoot[:]),
+			})
+		}, nil
+	case *operation.ExecutionPayloadBidReceivedData:
+		return func() io.Reader {
+			return jsonMarshalReader(eventName, structs.SignedExecutionPayloadBidFromConsensus(v.SignedBid))
+		}, nil
+	case *operation.PayloadAttestationMessageReceivedData:
+		return func() io.Reader {
+			return jsonMarshalReader(eventName, structs.PayloadAttestationMessageFromConsensus(v.PayloadAttestationMessage))
+		}, nil
 	default:
 		return nil, errors.Wrapf(errUnhandledEventData, "event data type %T unsupported", v)
 	}
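Each case returns a lazy reader, so JSON marshaling only happens when the SSE writer actually drains the event. For completeness, a self-contained client-side sketch that consumes the resulting stream; the endpoint path follows the standard beacon events API and the port is a typical default gateway port, both assumptions rather than part of this diff:

	package main

	import (
		"bufio"
		"fmt"
		"net/http"
	)

	func main() {
		// Subscribe to two of the new topics via the standard events endpoint.
		url := "http://localhost:3500/eth/v1/events" +
			"?topics=execution_payload_available&topics=payload_attestation_message"
		req, err := http.NewRequest(http.MethodGet, url, nil)
		if err != nil {
			panic(err)
		}
		req.Header.Set("Accept", "text/event-stream")
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()
		// SSE frames arrive as "event: <topic>" / "data: <json>" line pairs.
		sc := bufio.NewScanner(resp.Body)
		for sc.Scan() {
			if line := sc.Text(); line != "" {
				fmt.Println(line)
			}
		}
	}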
@@ -123,6 +123,8 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
 		ProposerSlashingTopic,
 		BlockGossipTopic,
 		DataColumnTopic,
+		ExecutionPayloadBidTopic,
+		PayloadAttestationMessageTopic,
 	})
 	require.NoError(t, err)
 	ro, err := blocks.NewROBlob(util.HydrateBlobSidecar(&eth.BlobSidecar{}))
@@ -312,6 +314,42 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
 				KzgCommitments: [][]byte{{'a'}, {'b'}, {'c'}},
 			},
 		},
+		{
+			Type: operation.ExecutionPayloadBidReceived,
+			Data: &operation.ExecutionPayloadBidReceivedData{
+				SignedBid: &eth.SignedExecutionPayloadBid{
+					Message: &eth.ExecutionPayloadBid{
+						ParentBlockHash:        make([]byte, 32),
+						ParentBlockRoot:        make([]byte, 32),
+						BlockHash:              make([]byte, 32),
+						PrevRandao:             make([]byte, 32),
+						FeeRecipient:           make([]byte, 20),
+						GasLimit:               30000000,
+						BuilderIndex:           42,
+						Slot:                   10,
+						Value:                  1000000000,
+						ExecutionPayment:       0,
+						BlobKzgCommitmentsRoot: make([]byte, 32),
+					},
+					Signature: make([]byte, 96),
+				},
+			},
+		},
+		{
+			Type: operation.PayloadAttestationMessageReceived,
+			Data: &operation.PayloadAttestationMessageReceivedData{
+				PayloadAttestationMessage: &eth.PayloadAttestationMessage{
+					ValidatorIndex: 123,
+					Data: &eth.PayloadAttestationData{
+						BeaconBlockRoot:   make([]byte, 32),
+						Slot:              10,
+						PayloadPresent:    true,
+						BlobDataAvailable: true,
+					},
+					Signature: make([]byte, 96),
+				},
+			},
+		},
 	}
 }
@@ -393,6 +431,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
 		FinalizedCheckpointTopic,
 		ChainReorgTopic,
 		BlockTopic,
+		ExecutionPayloadAvailableTopic,
 	})
 	require.NoError(t, err)
 	request := topics.testHttpRequest(testSync.ctx, t)
@@ -445,6 +484,13 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
 				ExecutionOptimistic: false,
 			},
 		},
+		{
+			Type: statefeed.ExecutionPayloadAvailable,
+			Data: &statefeed.ExecutionPayloadAvailableData{
+				Slot:      10,
+				BlockRoot: [32]byte{0x9a},
+			},
+		},
 	}
 
 	go func() {
@@ -721,7 +767,7 @@ func TestStuckReaderScenarios(t *testing.T) {
 
 func wedgedWriterTestCase(t *testing.T, queueDepth func([]*feed.Event) int) {
 	topics, events := operationEventsFixtures(t)
-	require.Equal(t, 12, len(events))
+	require.Equal(t, 14, len(events))
 
 	// set eventFeedDepth to a number lower than the events we intend to send to force the server to drop the reader.
 	stn := mockChain.NewEventFeedWrapper()
Deleted changelog file:
@@ -1,3 +0,0 @@
-### Ignored
-
-- adding some short retries for some end to end evaluators in an attempt to deflake tests.

changelog/james-prysm_gloas-events.md (new file):
@@ -0,0 +1,3 @@
+### Added
+
+- The following events are available at the gloas fork: `execution_payload_available`, `execution_payload_bid`, and `payload_attestation_message`.
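With these changes, the new topics can be requested alongside existing ones through the standard events endpoint; for example (host and port are deployment-specific, not part of this diff):

	GET /eth/v1/events?topics=execution_payload_available&topics=execution_payload_bid&topics=payload_attestation_message
	Accept: text/event-stream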
@@ -40,7 +40,6 @@ type TransactionGenerator struct {
 	cancel        context.CancelFunc
 	paused        bool
 	useLargeBlobs bool // Use large blob transactions (6 blobs per tx) for BPO testing
-	blobTxCount   int  // Number of blob transactions per slot (0 means default of 5)
 }
 
 func (t *TransactionGenerator) UnderlyingProcess() *os.Process {
@@ -49,8 +48,8 @@ func (t *TransactionGenerator) UnderlyingProcess() *os.Process {
 	return &os.Process{}
 }
 
-func NewTransactionGenerator(keystore string, seed int64, useLargeBlobs bool, blobTxCount int) *TransactionGenerator {
-	return &TransactionGenerator{keystore: keystore, seed: seed, useLargeBlobs: useLargeBlobs, blobTxCount: blobTxCount}
+func NewTransactionGenerator(keystore string, seed int64, useLargeBlobs bool) *TransactionGenerator {
+	return &TransactionGenerator{keystore: keystore, seed: seed, useLargeBlobs: useLargeBlobs}
 }
 
 func (t *TransactionGenerator) Start(ctx context.Context) error {
@@ -115,7 +114,7 @@ func (t *TransactionGenerator) Start(ctx context.Context) error {
 				continue
 			}
 			backend := ethclient.NewClient(client)
-			err = SendTransaction(client, mineKey.PrivateKey, gasPrice, mineKey.Address.String(), txCount, backend, false, t.useLargeBlobs, t.blobTxCount)
+			err = SendTransaction(client, mineKey.PrivateKey, gasPrice, mineKey.Address.String(), txCount, backend, false, t.useLargeBlobs)
 			if err != nil {
 				return err
 			}
@@ -129,7 +128,7 @@ func (s *TransactionGenerator) Started() <-chan struct{} {
 	return s.started
 }
 
-func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.Int, addr string, txCount uint64, backend *ethclient.Client, al bool, useLargeBlobs bool, blobTxCount int) error {
+func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.Int, addr string, txCount uint64, backend *ethclient.Client, al bool, useLargeBlobs bool) error {
 	sender := common.HexToAddress(addr)
 	nonce, err := backend.PendingNonceAt(context.Background(), fundedAccount.Address)
 	if err != nil {
@@ -151,19 +150,14 @@ func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.In
 	clock := startup.NewClock(e2e.TestParams.CLGenesisTime, [32]byte{})
 	isPostFulu := clock.CurrentEpoch() >= params.BeaconConfig().FuluForkEpoch
 
-	// Default to 5 blob transactions per slot if not configured.
-	numBlobTxs := blobTxCount
-	if numBlobTxs <= 0 {
-		numBlobTxs = 5
-	}
-
 	g, _ := errgroup.WithContext(context.Background())
-	txs := make([]*types.Transaction, numBlobTxs)
+	txs := make([]*types.Transaction, 10)
 
 	// Send blob transactions - use different versions pre/post Fulu
 	if isPostFulu {
 		logrus.Info("Sending blob transactions with cell proofs")
-		for index := range uint64(numBlobTxs) {
+		// Reduced from 10 to 5 to reduce load and prevent builder/EL timeouts
+		for index := range uint64(5) {
 
 			g.Go(func() error {
 				tx, err := RandomBlobCellTx(client, fundedAccount.Address, nonce+index, gasPrice, chainid, al, useLargeBlobs)
@@ -182,7 +176,8 @@ func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.In
 		}
 	} else {
 		logrus.Info("Sending blob transactions with sidecars")
-		for index := range uint64(numBlobTxs) {
+		// Reduced from 10 to 5 to reduce load and prevent builder/EL timeouts
+		for index := range uint64(5) {
 
 			g.Go(func() error {
 				tx, err := RandomBlobTx(client, fundedAccount.Address, nonce+index, gasPrice, chainid, al, useLargeBlobs)
@@ -252,7 +252,7 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
 }
 
 func (r *testRunner) testTxGeneration(ctx context.Context, g *errgroup.Group, keystorePath string, requiredNodes []e2etypes.ComponentRunner) {
-	txGenerator := eth1.NewTransactionGenerator(keystorePath, r.config.Seed, r.config.UseLargeBlobs, r.config.BlobTxCount)
+	txGenerator := eth1.NewTransactionGenerator(keystorePath, r.config.Seed, r.config.UseLargeBlobs)
 	r.comHandler.txGen = txGenerator
 	g.Go(func() error {
 		if err := helpers.ComponentsStarted(ctx, requiredNodes); err != nil {
@@ -156,9 +156,19 @@ func waitForMidEpoch(conn *grpc.ClientConn) error {
 	}
 }
 
-// getHeadEpochs fetches the head epoch from all beacon nodes concurrently.
-func getHeadEpochs(conns []*grpc.ClientConn) ([]primitives.Epoch, error) {
-	epochs := make([]primitives.Epoch, len(conns))
+func allNodesHaveSameHead(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) error {
+	// Wait until we're at least halfway into the epoch to avoid race conditions
+	// at epoch boundaries where nodes may report different epochs.
+	if err := waitForMidEpoch(conns[0]); err != nil {
+		return errors.Wrap(err, "failed waiting for mid-epoch")
+	}
+
+	headEpochs := make([]primitives.Epoch, len(conns))
+	headBlockRoots := make([][]byte, len(conns))
+	justifiedRoots := make([][]byte, len(conns))
+	prevJustifiedRoots := make([][]byte, len(conns))
+	finalizedRoots := make([][]byte, len(conns))
+	chainHeads := make([]*eth.ChainHead, len(conns))
 	g, _ := errgroup.WithContext(context.Background())
 
 	for i, conn := range conns {
@@ -170,145 +180,63 @@ func getHeadEpochs(conns []*grpc.ClientConn) ([]primitives.Epoch, error) {
 			if err != nil {
 				return errors.Wrapf(err, "connection number=%d", conIdx)
 			}
-			epochs[conIdx] = chainHead.HeadEpoch
+			headEpochs[conIdx] = chainHead.HeadEpoch
+			headBlockRoots[conIdx] = chainHead.HeadBlockRoot
+			justifiedRoots[conIdx] = chainHead.JustifiedBlockRoot
+			prevJustifiedRoots[conIdx] = chainHead.PreviousJustifiedBlockRoot
+			finalizedRoots[conIdx] = chainHead.FinalizedBlockRoot
+			chainHeads[conIdx] = chainHead
 			return nil
 		})
 	}
 	if err := g.Wait(); err != nil {
-		return nil, err
+		return err
 	}
-	return epochs, nil
-}
-
-func allNodesHaveSameHead(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) error {
-	// Wait until we're at least halfway into the epoch to avoid race conditions
-	// at epoch boundaries where nodes may report different epochs.
-	if err := waitForMidEpoch(conns[0]); err != nil {
-		return errors.Wrap(err, "failed waiting for mid-epoch")
-	}
-
-	// First, wait for all nodes to reach the same epoch. Sync nodes may be
-	// behind and need time to catch up. We poll every 2 seconds with a
-	// 60 second timeout - this adapts to actual sync progress rather than
-	// using fixed delays.
-	const epochTimeout = 60 * time.Second
-	const epochPollInterval = 2 * time.Second
-	epochDeadline := time.Now().Add(epochTimeout)
-
-	for time.Now().Before(epochDeadline) {
-		epochs, err := getHeadEpochs(conns)
-		if err != nil {
-			return err
-		}
-		allSame := true
-		for i := 1; i < len(epochs); i++ {
-			if epochs[0] != epochs[i] {
-				allSame = false
-				break
-			}
-		}
-		if allSame {
-			break
-		}
-		time.Sleep(epochPollInterval)
-	}
-
-	// Now that epochs match (or timeout reached), do detailed head comparison
-	// with a few retries to handle block propagation delays.
-	const maxRetries = 5
-	const retryDelay = 1 * time.Second
-	var lastErr error
-
-	for attempt := range maxRetries {
-		if attempt > 0 {
-			time.Sleep(retryDelay)
-		}
-
-		headEpochs := make([]primitives.Epoch, len(conns))
-		headBlockRoots := make([][]byte, len(conns))
-		justifiedRoots := make([][]byte, len(conns))
-		prevJustifiedRoots := make([][]byte, len(conns))
-		finalizedRoots := make([][]byte, len(conns))
-		chainHeads := make([]*eth.ChainHead, len(conns))
-		g, _ := errgroup.WithContext(context.Background())
-
-		for i, conn := range conns {
-			conIdx := i
-			currConn := conn
-			g.Go(func() error {
-				beaconClient := eth.NewBeaconChainClient(currConn)
-				chainHead, err := beaconClient.GetChainHead(context.Background(), &emptypb.Empty{})
-				if err != nil {
-					return errors.Wrapf(err, "connection number=%d", conIdx)
-				}
-				headEpochs[conIdx] = chainHead.HeadEpoch
-				headBlockRoots[conIdx] = chainHead.HeadBlockRoot
-				justifiedRoots[conIdx] = chainHead.JustifiedBlockRoot
-				prevJustifiedRoots[conIdx] = chainHead.PreviousJustifiedBlockRoot
-				finalizedRoots[conIdx] = chainHead.FinalizedBlockRoot
-				chainHeads[conIdx] = chainHead
-				return nil
-			})
-		}
-		if err := g.Wait(); err != nil {
-			return err
-		}
-
-		lastErr = nil
-		for i := range conns {
-			if headEpochs[0] != headEpochs[i] {
-				lastErr = fmt.Errorf(
-					"received conflicting head epochs on node %d, expected %d, received %d",
-					i,
-					headEpochs[0],
-					headEpochs[i],
-				)
-				break
-			}
-			if !bytes.Equal(headBlockRoots[0], headBlockRoots[i]) {
-				lastErr = fmt.Errorf(
-					"received conflicting head block roots on node %d, expected %#x, received %#x",
-					i,
-					headBlockRoots[0],
-					headBlockRoots[i],
-				)
-				break
-			}
-			if !bytes.Equal(justifiedRoots[0], justifiedRoots[i]) {
-				lastErr = fmt.Errorf(
-					"received conflicting justified block roots on node %d, expected %#x, received %#x: %s and %s",
-					i,
-					justifiedRoots[0],
-					justifiedRoots[i],
-					chainHeads[0].String(),
-					chainHeads[i].String(),
-				)
-				break
-			}
-			if !bytes.Equal(prevJustifiedRoots[0], prevJustifiedRoots[i]) {
-				lastErr = fmt.Errorf(
-					"received conflicting previous justified block roots on node %d, expected %#x, received %#x",
-					i,
-					prevJustifiedRoots[0],
-					prevJustifiedRoots[i],
-				)
-				break
-			}
-			if !bytes.Equal(finalizedRoots[0], finalizedRoots[i]) {
-				lastErr = fmt.Errorf(
-					"received conflicting finalized epoch roots on node %d, expected %#x, received %#x",
-					i,
-					finalizedRoots[0],
-					finalizedRoots[i],
-				)
-				break
-			}
-		}
-
-		if lastErr == nil {
-			return nil
-		}
-	}
-
-	return lastErr
+
+	for i := range conns {
+		if headEpochs[0] != headEpochs[i] {
+			return fmt.Errorf(
+				"received conflicting head epochs on node %d, expected %d, received %d",
+				i,
+				headEpochs[0],
+				headEpochs[i],
+			)
+		}
+		if !bytes.Equal(headBlockRoots[0], headBlockRoots[i]) {
+			return fmt.Errorf(
+				"received conflicting head block roots on node %d, expected %#x, received %#x",
+				i,
+				headBlockRoots[0],
+				headBlockRoots[i],
+			)
+		}
+		if !bytes.Equal(justifiedRoots[0], justifiedRoots[i]) {
+			return fmt.Errorf(
+				"received conflicting justified block roots on node %d, expected %#x, received %#x: %s and %s",
+				i,
+				justifiedRoots[0],
+				justifiedRoots[i],
+				chainHeads[0].String(),
+				chainHeads[i].String(),
+			)
+		}
+		if !bytes.Equal(prevJustifiedRoots[0], prevJustifiedRoots[i]) {
+			return fmt.Errorf(
+				"received conflicting previous justified block roots on node %d, expected %#x, received %#x",
+				i,
+				prevJustifiedRoots[0],
+				prevJustifiedRoots[i],
+			)
+		}
+		if !bytes.Equal(finalizedRoots[0], finalizedRoots[i]) {
+			return fmt.Errorf(
+				"received conflicting finalized epoch roots on node %d, expected %#x, received %#x",
+				i,
+				finalizedRoots[0],
+				finalizedRoots[i],
+			)
+		}
+	}
+
+	return nil
 }
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"net/http"
 	"strconv"
-	"time"
 
 	"github.com/OffchainLabs/prysm/v7/api/server/structs"
 	"github.com/OffchainLabs/prysm/v7/beacon-chain/core/altair"
@@ -124,25 +123,6 @@ func validatorsAreActive(ec *types.EvaluationContext, conns ...*grpc.ClientConn)
 
 // validatorsParticipating ensures the validators have an acceptable participation rate.
 func validatorsParticipating(_ *types.EvaluationContext, conns ...*grpc.ClientConn) error {
-	// Retry up to 3 times with 2 second delays to handle timing flakes where
-	// attestations haven't been fully processed yet due to block propagation delays.
-	const maxRetries = 3
-	const retryDelay = 2 * time.Second
-	var lastErr error
-
-	for attempt := range maxRetries {
-		if attempt > 0 {
-			time.Sleep(retryDelay)
-		}
-		lastErr = checkValidatorsParticipating(conns)
-		if lastErr == nil {
-			return nil
-		}
-	}
-	return lastErr
-}
-
-func checkValidatorsParticipating(conns []*grpc.ClientConn) error {
 	conn := conns[0]
 	client := ethpb.NewBeaconChainClient(conn)
 	validatorRequest := &ethpb.GetValidatorParticipationRequest{}
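The retry wrapper removed here (and its twin in the next hunk) follows one generic shape; as a standalone sketch, not code from this diff (assumes Go 1.22+ for integer range):

	import "time"

	// retry runs f up to n times, sleeping delay between attempts, and
	// returns the last error if every attempt fails.
	func retry(n int, delay time.Duration, f func() error) error {
		var lastErr error
		for attempt := range n {
			if attempt > 0 {
				time.Sleep(delay)
			}
			if lastErr = f(); lastErr == nil {
				return nil
			}
		}
		return lastErr
	}

The removed body above is equivalent to retry(3, 2*time.Second, func() error { return checkValidatorsParticipating(conns) }).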
@@ -254,25 +234,6 @@ func checkValidatorsParticipating(conns []*grpc.ClientConn) error {
 // validatorsSyncParticipation ensures the validators have an acceptable participation rate for
 // sync committee assignments.
 func validatorsSyncParticipation(_ *types.EvaluationContext, conns ...*grpc.ClientConn) error {
-	// Retry up to 3 times with 2 second delays to handle timing flakes where
-	// sync committee messages haven't fully propagated yet.
-	const maxRetries = 3
-	const retryDelay = 2 * time.Second
-	var lastErr error
-
-	for attempt := range maxRetries {
-		if attempt > 0 {
-			time.Sleep(retryDelay)
-		}
-		lastErr = checkSyncParticipation(conns)
-		if lastErr == nil {
-			return nil
-		}
-	}
-	return lastErr
-}
-
-func checkSyncParticipation(conns []*grpc.ClientConn) error {
 	conn := conns[0]
 	client := ethpb.NewNodeClient(conn)
 	altairClient := ethpb.NewBeaconChainClient(conn)
@@ -311,9 +272,9 @@ func checkSyncParticipation(conns []*grpc.ClientConn) error {
 			// Skip fork slot.
 			continue
 		}
-		// Skip early slots at genesis - validators need time to ramp up after chain start
+		// Skip slots 1-2 at genesis - validators need time to ramp up after chain start
 		// due to doppelganger protection. This is a startup timing issue, not a fork transition issue.
-		if b.Block().Slot() < 5 {
+		if b.Block().Slot() < 3 {
 			continue
 		}
 		expectedParticipation := expectedSyncParticipation
@@ -328,11 +289,6 @@ func checkSyncParticipation(conns []*grpc.ClientConn) error {
 		if err != nil {
 			return err
 		}
-		// Skip blocks with zero sync bits - these are typically empty/anomalous blocks
-		// where the proposer didn't receive sync committee contributions in time.
-		if syncAgg.SyncCommitteeBits.Count() == 0 {
-			continue
-		}
 		threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedParticipation)
 		if syncAgg.SyncCommitteeBits.Count() < threshold {
 			return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())
@@ -387,11 +343,6 @@ func checkSyncParticipation(conns []*grpc.ClientConn) error {
 		if err != nil {
 			return err
 		}
-		// Skip blocks with zero sync bits - these are typically empty/anomalous blocks
-		// where the proposer didn't receive sync committee contributions in time.
-		if syncAgg.SyncCommitteeBits.Count() == 0 {
-			continue
-		}
 		threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedSyncParticipation)
 		if syncAgg.SyncCommitteeBits.Count() < threshold {
 			return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())
@@ -9,11 +9,11 @@ import (
 )
 
 func TestEndToEnd_MinimalConfig_WithBuilder(t *testing.T) {
-	r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder(), types.WithBlobTxCount(2))
+	r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder())
 	r.run()
 }
 
 func TestEndToEnd_MinimalConfig_WithBuilder_ValidatorRESTApi(t *testing.T) {
-	r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder(), types.WithValidatorRESTApi(), types.WithBlobTxCount(2))
+	r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder(), types.WithValidatorRESTApi())
 	r.run()
 }
@@ -68,14 +68,6 @@ func WithLargeBlobs() E2EConfigOpt {
 	}
 }
 
-// WithBlobTxCount sets the number of blob transactions sent per slot.
-// Default is 5 when not specified.
-func WithBlobTxCount(n int) E2EConfigOpt {
-	return func(cfg *E2EConfig) {
-		cfg.BlobTxCount = n
-	}
-}
-
 func WithSSZOnly() E2EConfigOpt {
 	return func(cfg *E2EConfig) {
 		if err := os.Setenv(params.EnvNameOverrideAccept, api.OctetStreamMediaType); err != nil {
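WithBlobTxCount was a functional option on E2EConfig; for reference, a minimal sketch of how such options are typically applied (the applyOpts helper is illustrative, based on the E2EConfigOpt signature visible in this hunk, not code from this repository):

	// Options are plain functions that mutate the config and compose in order.
	func applyOpts(cfg *E2EConfig, opts ...E2EConfigOpt) {
		for _, o := range opts {
			o(cfg)
		}
	}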
@@ -116,7 +108,6 @@ type E2EConfig struct {
 	UseBeaconRestApi bool
 	UseBuilder       bool
 	UseLargeBlobs    bool // Use large blob transactions (6 blobs per tx) for BPO testing
-	BlobTxCount      int  // Number of blob transactions per slot (0 means default of 5)
 	EpochsToRun      uint64
 	ExitEpoch        primitives.Epoch // Custom epoch for voluntary exit submission (0 means use default)
 	Seed             int64