feat(test): rewrite test_engine_tree_buffered_blocks_are_eventually_connected using e2e framework (#16830)

This commit is contained in:
Federico Gimenez
2025-06-18 23:25:48 +02:00
committed by GitHub
parent c0c2eeaa36
commit 57281834ec
7 changed files with 659 additions and 140 deletions

View File

@@ -114,6 +114,35 @@ pub async fn setup_engine<N>(
TaskManager,
Wallet,
)>
where
N: NodeBuilderHelper,
LocalPayloadAttributesBuilder<N::ChainSpec>:
PayloadAttributesBuilder<<N::Payload as PayloadTypes>::PayloadAttributes>,
{
setup_engine_with_connection::<N>(
num_nodes,
chain_spec,
is_dev,
tree_config,
attributes_generator,
true,
)
.await
}
/// Creates the initial setup with `num_nodes` started and optionally interconnected.
pub async fn setup_engine_with_connection<N>(
num_nodes: usize,
chain_spec: Arc<N::ChainSpec>,
is_dev: bool,
tree_config: reth_node_api::TreeConfig,
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
connect_nodes: bool,
) -> eyre::Result<(
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)>
where
N: NodeBuilderHelper,
LocalPayloadAttributesBuilder<N::ChainSpec>:
@@ -165,15 +194,17 @@ where
let genesis = node.block_hash(0);
node.update_forkchoice(genesis, genesis).await?;
// Connect each node in a chain.
if let Some(previous_node) = nodes.last_mut() {
previous_node.connect(&mut node).await;
}
// Connect each node in a chain if requested.
if connect_nodes {
if let Some(previous_node) = nodes.last_mut() {
previous_node.connect(&mut node).await;
}
// Connect last node with the first if there are more than two
if idx + 1 == num_nodes && num_nodes > 2 {
if let Some(first_node) = nodes.first_mut() {
node.connect(first_node).await;
// Connect last node with the first if there are more than two
if idx + 1 == num_nodes && num_nodes > 2 {
if let Some(first_node) = nodes.first_mut() {
node.connect(first_node).await;
}
}
}

View File

@@ -0,0 +1,350 @@
//! Engine API specific actions for testing.
use crate::testsuite::{Action, Environment};
use alloy_primitives::B256;
use alloy_rpc_types_engine::{
ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, PayloadStatusEnum,
};
use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction};
use eyre::Result;
use futures_util::future::BoxFuture;
use reth_node_api::{EngineTypes, PayloadTypes};
use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
use std::marker::PhantomData;
use tracing::debug;
/// Action that sends a newPayload request to a specific node.
#[derive(Debug)]
pub struct SendNewPayload<Engine>
where
Engine: EngineTypes,
{
/// The node index to send to
pub node_idx: usize,
/// The block number to send
pub block_number: u64,
/// The source node to get the block from
pub source_node_idx: usize,
/// Expected payload status
pub expected_status: ExpectedPayloadStatus,
_phantom: PhantomData<Engine>,
}
/// Expected status for a payload
#[derive(Debug, Clone)]
pub enum ExpectedPayloadStatus {
/// Expect the payload to be valid
Valid,
/// Expect the payload to be invalid
Invalid,
/// Expect the payload to be syncing or accepted (buffered)
SyncingOrAccepted,
}
impl<Engine> SendNewPayload<Engine>
where
Engine: EngineTypes,
{
/// Create a new `SendNewPayload` action
pub fn new(
node_idx: usize,
block_number: u64,
source_node_idx: usize,
expected_status: ExpectedPayloadStatus,
) -> Self {
Self {
node_idx,
block_number,
source_node_idx,
expected_status,
_phantom: Default::default(),
}
}
}
impl<Engine> Action<Engine> for SendNewPayload<Engine>
where
Engine: EngineTypes + PayloadTypes,
{
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
if self.node_idx >= env.node_clients.len() {
return Err(eyre::eyre!("Target node index out of bounds: {}", self.node_idx));
}
if self.source_node_idx >= env.node_clients.len() {
return Err(eyre::eyre!(
"Source node index out of bounds: {}",
self.source_node_idx
));
}
// Get the block from the source node with retries
let source_rpc = &env.node_clients[self.source_node_idx].rpc;
let mut block = None;
let mut retries = 0;
const MAX_RETRIES: u32 = 5;
while retries < MAX_RETRIES {
match EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
source_rpc,
alloy_eips::BlockNumberOrTag::Number(self.block_number),
true, // include transactions
)
.await
{
Ok(Some(b)) => {
block = Some(b);
break;
}
Ok(None) => {
debug!(
"Block {} not found on source node {} (attempt {}/{})",
self.block_number,
self.source_node_idx,
retries + 1,
MAX_RETRIES
);
retries += 1;
if retries < MAX_RETRIES {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
Err(e) => return Err(e.into()),
}
}
let block = block.ok_or_else(|| {
eyre::eyre!(
"Block {} not found on source node {} after {} retries",
self.block_number,
self.source_node_idx,
MAX_RETRIES
)
})?;
// Convert block to ExecutionPayloadV3
let payload = block_to_payload_v3(block.clone());
// Send the payload to the target node
let target_engine = env.node_clients[self.node_idx].engine.http_client();
let result = EngineApiClient::<Engine>::new_payload_v3(
&target_engine,
payload,
vec![],
B256::ZERO, // parent_beacon_block_root
)
.await?;
debug!(
"Node {}: new_payload for block {} response - status: {:?}, latest_valid_hash: {:?}",
self.node_idx, self.block_number, result.status, result.latest_valid_hash
);
// Validate the response based on expectations
match (&result.status, &self.expected_status) {
(PayloadStatusEnum::Valid, ExpectedPayloadStatus::Valid) => {
debug!(
"Node {}: Block {} marked as VALID as expected",
self.node_idx, self.block_number
);
Ok(())
}
(
PayloadStatusEnum::Invalid { validation_error },
ExpectedPayloadStatus::Invalid,
) => {
debug!(
"Node {}: Block {} marked as INVALID as expected: {:?}",
self.node_idx, self.block_number, validation_error
);
Ok(())
}
(
PayloadStatusEnum::Syncing | PayloadStatusEnum::Accepted,
ExpectedPayloadStatus::SyncingOrAccepted,
) => {
debug!(
"Node {}: Block {} marked as SYNCING/ACCEPTED as expected (buffered)",
self.node_idx, self.block_number
);
Ok(())
}
(status, expected) => Err(eyre::eyre!(
"Node {}: Unexpected payload status for block {}. Got {:?}, expected {:?}",
self.node_idx,
self.block_number,
status,
expected
)),
}
})
}
}
/// Action that sends multiple blocks to a node in a specific order.
#[derive(Debug)]
pub struct SendNewPayloads<Engine>
where
Engine: EngineTypes,
{
/// The node index to send to
target_node: Option<usize>,
/// The source node to get the blocks from
source_node: Option<usize>,
/// The starting block number
start_block: Option<u64>,
/// The total number of blocks to send
total_blocks: Option<u64>,
/// Whether to send in reverse order
reverse_order: bool,
/// Custom block numbers to send (if not using `start_block` + `total_blocks`)
custom_block_numbers: Option<Vec<u64>>,
_phantom: PhantomData<Engine>,
}
impl<Engine> SendNewPayloads<Engine>
where
Engine: EngineTypes,
{
/// Create a new `SendNewPayloads` action builder
pub fn new() -> Self {
Self {
target_node: None,
source_node: None,
start_block: None,
total_blocks: None,
reverse_order: false,
custom_block_numbers: None,
_phantom: Default::default(),
}
}
/// Set the target node index
pub const fn with_target_node(mut self, node_idx: usize) -> Self {
self.target_node = Some(node_idx);
self
}
/// Set the source node index
pub const fn with_source_node(mut self, node_idx: usize) -> Self {
self.source_node = Some(node_idx);
self
}
/// Set the starting block number
pub const fn with_start_block(mut self, block_num: u64) -> Self {
self.start_block = Some(block_num);
self
}
/// Set the total number of blocks to send
pub const fn with_total_blocks(mut self, count: u64) -> Self {
self.total_blocks = Some(count);
self
}
/// Send blocks in reverse order (useful for testing buffering)
pub const fn in_reverse_order(mut self) -> Self {
self.reverse_order = true;
self
}
/// Set custom block numbers to send
pub fn with_block_numbers(mut self, block_numbers: Vec<u64>) -> Self {
self.custom_block_numbers = Some(block_numbers);
self
}
}
impl<Engine> Default for SendNewPayloads<Engine>
where
Engine: EngineTypes,
{
fn default() -> Self {
Self::new()
}
}
impl<Engine> Action<Engine> for SendNewPayloads<Engine>
where
Engine: EngineTypes + PayloadTypes,
{
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
// Validate required fields
let target_node =
self.target_node.ok_or_else(|| eyre::eyre!("Target node not specified"))?;
let source_node =
self.source_node.ok_or_else(|| eyre::eyre!("Source node not specified"))?;
// Determine block numbers to send
let block_numbers = if let Some(custom_numbers) = &self.custom_block_numbers {
custom_numbers.clone()
} else {
let start =
self.start_block.ok_or_else(|| eyre::eyre!("Start block not specified"))?;
let count =
self.total_blocks.ok_or_else(|| eyre::eyre!("Total blocks not specified"))?;
if self.reverse_order {
// Send blocks in reverse order (e.g., for count=2, start=1: [2, 1])
(0..count).map(|i| start + count - 1 - i).collect()
} else {
// Send blocks in normal order
(0..count).map(|i| start + i).collect()
}
};
for &block_number in &block_numbers {
// For the first block in reverse order, expect buffering
// For subsequent blocks, they might connect immediately
let expected_status =
if self.reverse_order && block_number == *block_numbers.first().unwrap() {
ExpectedPayloadStatus::SyncingOrAccepted
} else {
ExpectedPayloadStatus::Valid
};
let mut action = SendNewPayload::<Engine>::new(
target_node,
block_number,
source_node,
expected_status,
);
action.execute(env).await?;
}
Ok(())
})
}
}
/// Helper function to convert a block to `ExecutionPayloadV3`
fn block_to_payload_v3(block: Block) -> ExecutionPayloadV3 {
use alloy_primitives::U256;
ExecutionPayloadV3 {
payload_inner: ExecutionPayloadV2 {
payload_inner: ExecutionPayloadV1 {
parent_hash: block.header.inner.parent_hash,
fee_recipient: block.header.inner.beneficiary,
state_root: block.header.inner.state_root,
receipts_root: block.header.inner.receipts_root,
logs_bloom: block.header.inner.logs_bloom,
prev_randao: block.header.inner.mix_hash,
block_number: block.header.inner.number,
gas_limit: block.header.inner.gas_limit,
gas_used: block.header.inner.gas_used,
timestamp: block.header.inner.timestamp,
extra_data: block.header.inner.extra_data.clone(),
base_fee_per_gas: U256::from(block.header.inner.base_fee_per_gas.unwrap_or(0)),
block_hash: block.header.hash,
transactions: vec![], // No transactions needed for buffering tests
},
withdrawals: block.withdrawals.unwrap_or_default().to_vec(),
},
blob_gas_used: block.header.inner.blob_gas_used.unwrap_or(0),
excess_blob_gas: block.header.inner.excess_blob_gas.unwrap_or(0),
}
}

View File

@@ -1,24 +1,27 @@
//! Actions that can be performed in tests.
use crate::testsuite::Environment;
use alloy_rpc_types_engine::{ForkchoiceUpdated, PayloadStatusEnum};
use alloy_rpc_types_engine::{ForkchoiceState, ForkchoiceUpdated, PayloadStatusEnum};
use eyre::Result;
use futures_util::future::BoxFuture;
use reth_node_api::EngineTypes;
use reth_rpc_api::clients::EngineApiClient;
use std::future::Future;
use tracing::debug;
pub mod engine_api;
pub mod fork;
pub mod node_ops;
pub mod produce_blocks;
pub mod reorg;
pub use engine_api::{ExpectedPayloadStatus, SendNewPayload, SendNewPayloads};
pub use fork::{CreateFork, ForkBase, SetForkBase, SetForkBaseFromBlockInfo, ValidateFork};
pub use node_ops::{CaptureBlockOnNode, CompareNodeChainTips, SelectActiveNode, ValidateBlockTag};
pub use produce_blocks::{
AssertMineBlock, BroadcastLatestForkchoice, BroadcastNextNewPayload, CheckPayloadAccepted,
ExpectFcuStatus, GenerateNextPayload, GeneratePayloadAttributes, PickNextBlockProducer,
ProduceBlocks, ProduceInvalidBlocks, TestFcuToTag, UpdateBlockInfo,
ProduceBlocks, ProduceBlocksLocally, ProduceInvalidBlocks, TestFcuToTag, UpdateBlockInfo,
UpdateBlockInfoToLatestPayload, ValidateCanonicalTag,
};
pub use reorg::{ReorgTarget, ReorgTo, SetReorgTarget};
@@ -102,12 +105,20 @@ where
/// Action that makes the current latest block canonical by broadcasting a forkchoice update
#[derive(Debug, Default)]
pub struct MakeCanonical {}
pub struct MakeCanonical {
/// If true, only send to the active node. If false, broadcast to all nodes.
active_node_only: bool,
}
impl MakeCanonical {
/// Create a new `MakeCanonical` action
pub const fn new() -> Self {
Self {}
Self { active_node_only: false }
}
/// Create a new `MakeCanonical` action that only applies to the active node
pub const fn with_active_node() -> Self {
Self { active_node_only: true }
}
}
@@ -120,23 +131,59 @@ where
{
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let mut actions: Vec<Box<dyn Action<Engine>>> = vec![
Box::new(BroadcastLatestForkchoice::default()),
Box::new(UpdateBlockInfo::default()),
];
if self.active_node_only {
// Only update the active node
let latest_block = env
.current_block_info()
.ok_or_else(|| eyre::eyre!("No latest block information available"))?;
// if we're on a fork, validate it now that it's canonical
if let Ok(active_state) = env.active_node_state() {
if let Some(fork_base) = active_state.current_fork_base {
debug!("MakeCanonical: Adding fork validation from base block {}", fork_base);
actions.push(Box::new(ValidateFork::new(fork_base)));
// clear the fork base since we're now canonical
env.active_node_state_mut()?.current_fork_base = None;
let fork_choice_state = ForkchoiceState {
head_block_hash: latest_block.hash,
safe_block_hash: latest_block.hash,
finalized_block_hash: latest_block.hash,
};
let active_idx = env.active_node_idx;
let engine = env.node_clients[active_idx].engine.http_client();
let fcu_response = EngineApiClient::<Engine>::fork_choice_updated_v3(
&engine,
fork_choice_state,
None,
)
.await?;
debug!(
"Active node {}: Forkchoice update status: {:?}",
active_idx, fcu_response.payload_status.status
);
validate_fcu_response(&fcu_response, &format!("Active node {active_idx}"))?;
Ok(())
} else {
// Original broadcast behavior
let mut actions: Vec<Box<dyn Action<Engine>>> = vec![
Box::new(BroadcastLatestForkchoice::default()),
Box::new(UpdateBlockInfo::default()),
];
// if we're on a fork, validate it now that it's canonical
if let Ok(active_state) = env.active_node_state() {
if let Some(fork_base) = active_state.current_fork_base {
debug!(
"MakeCanonical: Adding fork validation from base block {}",
fork_base
);
actions.push(Box::new(ValidateFork::new(fork_base)));
// clear the fork base since we're now canonical
env.active_node_state_mut()?.current_fork_base = None;
}
}
}
let mut sequence = Sequence::new(actions);
sequence.execute(env).await
let mut sequence = Sequence::new(actions);
sequence.execute(env).await
}
})
}
}

View File

@@ -599,7 +599,17 @@ where
/// Action that broadcasts the next new payload
#[derive(Debug, Default)]
pub struct BroadcastNextNewPayload {}
pub struct BroadcastNextNewPayload {
/// If true, only send to the active node. If false, broadcast to all nodes.
active_node_only: bool,
}
impl BroadcastNextNewPayload {
/// Create a new `BroadcastNextNewPayload` action that only sends to the active node
pub const fn with_active_node() -> Self {
Self { active_node_only: true }
}
}
impl<Engine> Action<Engine> for BroadcastNextNewPayload
where
@@ -630,14 +640,11 @@ where
let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = payload_envelope.into();
let execution_payload = execution_payload_envelope.execution_payload;
// Loop through all clients and broadcast the next new payload
let mut broadcast_results = Vec::new();
let mut first_valid_seen = false;
if self.active_node_only {
// Send only to the active node
let active_idx = env.active_node_idx;
let engine = env.node_clients[active_idx].engine.http_client();
for (idx, client) in env.node_clients.iter().enumerate() {
let engine = client.engine.http_client();
// Broadcast the execution payload
let result = EngineApiClient::<Engine>::new_payload_v3(
&engine,
execution_payload.clone(),
@@ -646,35 +653,70 @@ where
)
.await?;
broadcast_results.push((idx, result.status.clone()));
debug!("Node {}: new_payload broadcast status: {:?}", idx, result.status);
debug!("Active node {}: new_payload status: {:?}", active_idx, result.status);
// Check if this node accepted the payload
if result.status == PayloadStatusEnum::Valid && !first_valid_seen {
first_valid_seen = true;
} else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
debug!(
"Node {}: Invalid payload status returned from broadcast: {:?}",
idx, validation_error
);
// Validate the response
match result.status {
PayloadStatusEnum::Valid => {
env.active_node_state_mut()?.latest_payload_executed =
Some(next_new_payload);
Ok(())
}
other => Err(eyre::eyre!(
"Active node {}: Unexpected payload status: {:?}",
active_idx,
other
)),
}
} else {
// Loop through all clients and broadcast the next new payload
let mut broadcast_results = Vec::new();
let mut first_valid_seen = false;
for (idx, client) in env.node_clients.iter().enumerate() {
let engine = client.engine.http_client();
// Broadcast the execution payload
let result = EngineApiClient::<Engine>::new_payload_v3(
&engine,
execution_payload.clone(),
vec![],
parent_beacon_block_root,
)
.await?;
broadcast_results.push((idx, result.status.clone()));
debug!("Node {}: new_payload broadcast status: {:?}", idx, result.status);
// Check if this node accepted the payload
if result.status == PayloadStatusEnum::Valid && !first_valid_seen {
first_valid_seen = true;
} else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
debug!(
"Node {}: Invalid payload status returned from broadcast: {:?}",
idx, validation_error
);
}
}
// Update the executed payload state after broadcasting to all nodes
if first_valid_seen {
env.active_node_state_mut()?.latest_payload_executed = Some(next_new_payload);
}
// Check if at least one node accepted the payload
let any_valid =
broadcast_results.iter().any(|(_, status)| *status == PayloadStatusEnum::Valid);
if !any_valid {
return Err(eyre::eyre!(
"Failed to successfully broadcast payload to any client"
));
}
debug!("Broadcast complete. Results: {:?}", broadcast_results);
Ok(())
}
// Update the executed payload state after broadcasting to all nodes
if first_valid_seen {
env.active_node_state_mut()?.latest_payload_executed = Some(next_new_payload);
}
// Check if at least one node accepted the payload
let any_valid =
broadcast_results.iter().any(|(_, status)| *status == PayloadStatusEnum::Valid);
if !any_valid {
return Err(eyre::eyre!("Failed to successfully broadcast payload to any client"));
}
debug!("Broadcast complete. Results: {:?}", broadcast_results);
Ok(())
})
}
}
@@ -873,6 +915,60 @@ where
}
}
/// Action that produces blocks locally without broadcasting to other nodes
/// This sends the payload only to the active node to ensure it's available locally
#[derive(Debug)]
pub struct ProduceBlocksLocally<Engine> {
/// Number of blocks to produce
pub num_blocks: u64,
/// Tracks engine type
_phantom: PhantomData<Engine>,
}
impl<Engine> ProduceBlocksLocally<Engine> {
/// Create a new `ProduceBlocksLocally` action
pub fn new(num_blocks: u64) -> Self {
Self { num_blocks, _phantom: Default::default() }
}
}
impl<Engine> Default for ProduceBlocksLocally<Engine> {
fn default() -> Self {
Self::new(0)
}
}
impl<Engine> Action<Engine> for ProduceBlocksLocally<Engine>
where
Engine: EngineTypes + PayloadTypes,
Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
{
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
// Remember the active node to ensure all blocks are produced on the same node
let producer_idx = env.active_node_idx;
for _ in 0..self.num_blocks {
// Ensure we always use the same producer
env.last_producer_idx = Some(producer_idx);
// create a sequence that produces blocks and sends only to active node
let mut sequence = Sequence::new(vec![
// Skip PickNextBlockProducer to maintain the same producer
Box::new(GeneratePayloadAttributes::default()),
Box::new(GenerateNextPayload::default()),
// Send payload only to the active node to make it available
Box::new(BroadcastNextNewPayload::with_active_node()),
Box::new(UpdateBlockInfoToLatestPayload::default()),
]);
sequence.execute(env).await?;
}
Ok(())
})
}
}
/// Action that produces a sequence of blocks where some blocks are intentionally invalid
#[derive(Debug)]
pub struct ProduceInvalidBlocks<Engine> {

View File

@@ -1,6 +1,9 @@
//! Test setup utilities for configuring the initial state.
use crate::{setup_engine, testsuite::Environment, NodeBuilderHelper, PayloadAttributesBuilder};
use crate::{
setup_engine_with_connection, testsuite::Environment, NodeBuilderHelper,
PayloadAttributesBuilder,
};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::B256;
use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
@@ -157,12 +160,13 @@ where
)
};
let result = setup_engine::<N>(
let result = setup_engine_with_connection::<N>(
node_count,
Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
is_dev,
self.tree_config.clone(),
attributes_generator,
self.network.connect_nodes,
)
.await;
@@ -292,16 +296,23 @@ pub struct Genesis {}
pub struct NetworkSetup {
/// Number of nodes to create
pub node_count: usize,
/// Whether nodes should be connected to each other
pub connect_nodes: bool,
}
impl NetworkSetup {
/// Create a new network setup with a single node
pub const fn single_node() -> Self {
Self { node_count: 1 }
Self { node_count: 1, connect_nodes: true }
}
/// Create a new network setup with multiple nodes
/// Create a new network setup with multiple nodes (connected)
pub const fn multi_node(count: usize) -> Self {
Self { node_count: count }
Self { node_count: count, connect_nodes: true }
}
/// Create a new network setup with multiple nodes (disconnected)
pub const fn multi_node_unconnected(count: usize) -> Self {
Self { node_count: count, connect_nodes: false }
}
}

View File

@@ -5,8 +5,9 @@ use eyre::Result;
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_e2e_test_utils::testsuite::{
actions::{
CaptureBlock, CreateFork, ExpectFcuStatus, MakeCanonical, ProduceBlocks,
ProduceInvalidBlocks, ReorgTo, ValidateCanonicalTag,
CaptureBlock, CompareNodeChainTips, CreateFork, ExpectFcuStatus, MakeCanonical,
ProduceBlocks, ProduceBlocksLocally, ProduceInvalidBlocks, ReorgTo, SelectActiveNode,
SendNewPayloads, UpdateBlockInfo, ValidateCanonicalTag,
},
setup::{NetworkSetup, Setup},
TestBuilder,
@@ -185,3 +186,58 @@ async fn test_engine_tree_reorg_with_missing_ancestor_expecting_valid_e2e() -> R
Ok(())
}
/// Test that verifies buffered blocks are eventually connected when sent in reverse order.
#[tokio::test]
async fn test_engine_tree_buffered_blocks_are_eventually_connected_e2e() -> Result<()> {
reth_tracing::init_test_tracing();
let test = TestBuilder::new()
.with_setup(
Setup::default()
.with_chain_spec(Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(
serde_json::from_str(include_str!(
"../../../../e2e-test-utils/src/testsuite/assets/genesis.json"
))
.unwrap(),
)
.cancun_activated()
.build(),
))
.with_network(NetworkSetup::multi_node_unconnected(2)) // Need 2 disconnected nodes
.with_tree_config(
TreeConfig::default()
.with_legacy_state_root(false)
.with_has_enough_parallelism(true),
),
)
// node 0 produces blocks 1 and 2 locally without broadcasting
.with_action(SelectActiveNode::new(0))
.with_action(ProduceBlocksLocally::<EthEngineTypes>::new(2))
// make the blocks canonical on node 0 so they're available via RPC
.with_action(MakeCanonical::with_active_node())
// send blocks in reverse order (2, then 1) from node 0 to node 1
.with_action(
SendNewPayloads::<EthEngineTypes>::new()
.with_target_node(1)
.with_source_node(0)
.with_start_block(1)
.with_total_blocks(2)
.in_reverse_order(),
)
// update node 1's view to recognize the new blocks
.with_action(SelectActiveNode::new(1))
// get the latest block from node 1's RPC and update environment
.with_action(UpdateBlockInfo::default())
// make block 2 canonical on node 1 with a forkchoice update
.with_action(MakeCanonical::with_active_node())
// verify both nodes eventually have the same chain tip
.with_action(CompareNodeChainTips::expect_same(0, 1));
test.run::<EthereumNode>().await?;
Ok(())
}

View File

@@ -6,10 +6,7 @@ use alloy_primitives::{
Bytes, B256,
};
use alloy_rlp::Decodable;
use alloy_rpc_types_engine::{
CancunPayloadFields, ExecutionData, ExecutionPayloadSidecar, ExecutionPayloadV1,
ExecutionPayloadV3,
};
use alloy_rpc_types_engine::{ExecutionData, ExecutionPayloadSidecar, ExecutionPayloadV1};
use assert_matches::assert_matches;
use reth_chain_state::{test_utils::TestBlockBuilder, BlockState};
use reth_chainspec::{ChainSpec, HOLESKY, MAINNET};
@@ -289,22 +286,6 @@ impl TestHarness {
}
}
async fn send_new_payload(&mut self, block: RecoveredBlock<reth_ethereum_primitives::Block>) {
let payload = ExecutionPayloadV3::from_block_unchecked(
block.hash(),
&block.clone_sealed_block().into_block(),
);
self.tree
.on_new_payload(ExecutionData {
payload: payload.into(),
sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
versioned_hashes: vec![],
}),
})
.unwrap();
}
async fn insert_chain(
&mut self,
chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
@@ -349,18 +330,6 @@ impl TestHarness {
}
}
async fn check_block_received(&mut self, hash: B256) {
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::BlockReceived(
num_hash,
)) => {
assert_eq!(num_hash.hash, hash);
}
_ => panic!("Unexpected event: {event:#?}"),
}
}
fn persist_blocks(&self, blocks: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>) {
let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len());
let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len());
@@ -1106,44 +1075,3 @@ async fn test_engine_tree_live_sync_fcu_extends_canon_chain() {
test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
test_harness.check_canon_head(main_last_hash);
}
#[tokio::test]
async fn test_engine_tree_buffered_blocks_are_eventually_connected() {
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec.clone());
let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
test_harness = test_harness.with_blocks(base_chain.clone());
// side chain consisting of two blocks, the last will be inserted first
// so that we force it to be buffered
let side_chain =
test_harness.block_builder.create_fork(base_chain.last().unwrap().recovered_block(), 2);
// buffer last block of side chain
let buffered_block = side_chain.last().unwrap();
let buffered_block_hash = buffered_block.hash();
test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]);
test_harness.send_new_payload(buffered_block.clone()).await;
assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some());
let non_buffered_block = side_chain.first().unwrap();
let non_buffered_block_hash = non_buffered_block.hash();
// insert block that continues the canon chain, should not be buffered
test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]);
test_harness.send_new_payload(non_buffered_block.clone()).await;
assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none());
// the previously buffered block should be connected now
assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none());
// both blocks are added to the canon chain in order
// note that the buffered block is received first, but added last
test_harness.check_block_received(buffered_block_hash).await;
test_harness.check_block_received(non_buffered_block_hash).await;
test_harness.check_canon_block_added(non_buffered_block_hash).await;
test_harness.check_canon_block_added(buffered_block_hash).await;
}