From 57281834ec700ea72b8fbc135e63e9d8073e89b5 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Wed, 18 Jun 2025 23:25:48 +0200 Subject: [PATCH] feat(test): rewrite test_engine_tree_buffered_blocks_are_eventually_connected using e2e framework (#16830) --- crates/e2e-test-utils/src/lib.rs | 47 ++- .../src/testsuite/actions/engine_api.rs | 350 ++++++++++++++++++ .../src/testsuite/actions/mod.rs | 83 ++++- .../src/testsuite/actions/produce_blocks.rs | 164 ++++++-- crates/e2e-test-utils/src/testsuite/setup.rs | 21 +- crates/engine/tree/src/tree/e2e_tests.rs | 60 ++- crates/engine/tree/src/tree/tests.rs | 74 +--- 7 files changed, 659 insertions(+), 140 deletions(-) create mode 100644 crates/e2e-test-utils/src/testsuite/actions/engine_api.rs diff --git a/crates/e2e-test-utils/src/lib.rs b/crates/e2e-test-utils/src/lib.rs index 99d48c9373..0a2aa467e7 100644 --- a/crates/e2e-test-utils/src/lib.rs +++ b/crates/e2e-test-utils/src/lib.rs @@ -114,6 +114,35 @@ pub async fn setup_engine( TaskManager, Wallet, )> +where + N: NodeBuilderHelper, + LocalPayloadAttributesBuilder: + PayloadAttributesBuilder<::PayloadAttributes>, +{ + setup_engine_with_connection::( + num_nodes, + chain_spec, + is_dev, + tree_config, + attributes_generator, + true, + ) + .await +} + +/// Creates the initial setup with `num_nodes` started and optionally interconnected. +pub async fn setup_engine_with_connection( + num_nodes: usize, + chain_spec: Arc, + is_dev: bool, + tree_config: reth_node_api::TreeConfig, + attributes_generator: impl Fn(u64) -> <::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static, + connect_nodes: bool, +) -> eyre::Result<( + Vec>>>, + TaskManager, + Wallet, +)> where N: NodeBuilderHelper, LocalPayloadAttributesBuilder: @@ -165,15 +194,17 @@ where let genesis = node.block_hash(0); node.update_forkchoice(genesis, genesis).await?; - // Connect each node in a chain. - if let Some(previous_node) = nodes.last_mut() { - previous_node.connect(&mut node).await; - } + // Connect each node in a chain if requested. + if connect_nodes { + if let Some(previous_node) = nodes.last_mut() { + previous_node.connect(&mut node).await; + } - // Connect last node with the first if there are more than two - if idx + 1 == num_nodes && num_nodes > 2 { - if let Some(first_node) = nodes.first_mut() { - node.connect(first_node).await; + // Connect last node with the first if there are more than two + if idx + 1 == num_nodes && num_nodes > 2 { + if let Some(first_node) = nodes.first_mut() { + node.connect(first_node).await; + } } } diff --git a/crates/e2e-test-utils/src/testsuite/actions/engine_api.rs b/crates/e2e-test-utils/src/testsuite/actions/engine_api.rs new file mode 100644 index 0000000000..655a6d723a --- /dev/null +++ b/crates/e2e-test-utils/src/testsuite/actions/engine_api.rs @@ -0,0 +1,350 @@ +//! Engine API specific actions for testing. + +use crate::testsuite::{Action, Environment}; +use alloy_primitives::B256; +use alloy_rpc_types_engine::{ + ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, PayloadStatusEnum, +}; +use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction}; +use eyre::Result; +use futures_util::future::BoxFuture; +use reth_node_api::{EngineTypes, PayloadTypes}; +use reth_rpc_api::clients::{EngineApiClient, EthApiClient}; +use std::marker::PhantomData; +use tracing::debug; + +/// Action that sends a newPayload request to a specific node. +#[derive(Debug)] +pub struct SendNewPayload +where + Engine: EngineTypes, +{ + /// The node index to send to + pub node_idx: usize, + /// The block number to send + pub block_number: u64, + /// The source node to get the block from + pub source_node_idx: usize, + /// Expected payload status + pub expected_status: ExpectedPayloadStatus, + _phantom: PhantomData, +} + +/// Expected status for a payload +#[derive(Debug, Clone)] +pub enum ExpectedPayloadStatus { + /// Expect the payload to be valid + Valid, + /// Expect the payload to be invalid + Invalid, + /// Expect the payload to be syncing or accepted (buffered) + SyncingOrAccepted, +} + +impl SendNewPayload +where + Engine: EngineTypes, +{ + /// Create a new `SendNewPayload` action + pub fn new( + node_idx: usize, + block_number: u64, + source_node_idx: usize, + expected_status: ExpectedPayloadStatus, + ) -> Self { + Self { + node_idx, + block_number, + source_node_idx, + expected_status, + _phantom: Default::default(), + } + } +} + +impl Action for SendNewPayload +where + Engine: EngineTypes + PayloadTypes, +{ + fn execute<'a>(&'a mut self, env: &'a mut Environment) -> BoxFuture<'a, Result<()>> { + Box::pin(async move { + if self.node_idx >= env.node_clients.len() { + return Err(eyre::eyre!("Target node index out of bounds: {}", self.node_idx)); + } + if self.source_node_idx >= env.node_clients.len() { + return Err(eyre::eyre!( + "Source node index out of bounds: {}", + self.source_node_idx + )); + } + + // Get the block from the source node with retries + let source_rpc = &env.node_clients[self.source_node_idx].rpc; + let mut block = None; + let mut retries = 0; + const MAX_RETRIES: u32 = 5; + + while retries < MAX_RETRIES { + match EthApiClient::::block_by_number( + source_rpc, + alloy_eips::BlockNumberOrTag::Number(self.block_number), + true, // include transactions + ) + .await + { + Ok(Some(b)) => { + block = Some(b); + break; + } + Ok(None) => { + debug!( + "Block {} not found on source node {} (attempt {}/{})", + self.block_number, + self.source_node_idx, + retries + 1, + MAX_RETRIES + ); + retries += 1; + if retries < MAX_RETRIES { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + } + Err(e) => return Err(e.into()), + } + } + + let block = block.ok_or_else(|| { + eyre::eyre!( + "Block {} not found on source node {} after {} retries", + self.block_number, + self.source_node_idx, + MAX_RETRIES + ) + })?; + + // Convert block to ExecutionPayloadV3 + let payload = block_to_payload_v3(block.clone()); + + // Send the payload to the target node + let target_engine = env.node_clients[self.node_idx].engine.http_client(); + let result = EngineApiClient::::new_payload_v3( + &target_engine, + payload, + vec![], + B256::ZERO, // parent_beacon_block_root + ) + .await?; + + debug!( + "Node {}: new_payload for block {} response - status: {:?}, latest_valid_hash: {:?}", + self.node_idx, self.block_number, result.status, result.latest_valid_hash + ); + + // Validate the response based on expectations + match (&result.status, &self.expected_status) { + (PayloadStatusEnum::Valid, ExpectedPayloadStatus::Valid) => { + debug!( + "Node {}: Block {} marked as VALID as expected", + self.node_idx, self.block_number + ); + Ok(()) + } + ( + PayloadStatusEnum::Invalid { validation_error }, + ExpectedPayloadStatus::Invalid, + ) => { + debug!( + "Node {}: Block {} marked as INVALID as expected: {:?}", + self.node_idx, self.block_number, validation_error + ); + Ok(()) + } + ( + PayloadStatusEnum::Syncing | PayloadStatusEnum::Accepted, + ExpectedPayloadStatus::SyncingOrAccepted, + ) => { + debug!( + "Node {}: Block {} marked as SYNCING/ACCEPTED as expected (buffered)", + self.node_idx, self.block_number + ); + Ok(()) + } + (status, expected) => Err(eyre::eyre!( + "Node {}: Unexpected payload status for block {}. Got {:?}, expected {:?}", + self.node_idx, + self.block_number, + status, + expected + )), + } + }) + } +} + +/// Action that sends multiple blocks to a node in a specific order. +#[derive(Debug)] +pub struct SendNewPayloads +where + Engine: EngineTypes, +{ + /// The node index to send to + target_node: Option, + /// The source node to get the blocks from + source_node: Option, + /// The starting block number + start_block: Option, + /// The total number of blocks to send + total_blocks: Option, + /// Whether to send in reverse order + reverse_order: bool, + /// Custom block numbers to send (if not using `start_block` + `total_blocks`) + custom_block_numbers: Option>, + _phantom: PhantomData, +} + +impl SendNewPayloads +where + Engine: EngineTypes, +{ + /// Create a new `SendNewPayloads` action builder + pub fn new() -> Self { + Self { + target_node: None, + source_node: None, + start_block: None, + total_blocks: None, + reverse_order: false, + custom_block_numbers: None, + _phantom: Default::default(), + } + } + + /// Set the target node index + pub const fn with_target_node(mut self, node_idx: usize) -> Self { + self.target_node = Some(node_idx); + self + } + + /// Set the source node index + pub const fn with_source_node(mut self, node_idx: usize) -> Self { + self.source_node = Some(node_idx); + self + } + + /// Set the starting block number + pub const fn with_start_block(mut self, block_num: u64) -> Self { + self.start_block = Some(block_num); + self + } + + /// Set the total number of blocks to send + pub const fn with_total_blocks(mut self, count: u64) -> Self { + self.total_blocks = Some(count); + self + } + + /// Send blocks in reverse order (useful for testing buffering) + pub const fn in_reverse_order(mut self) -> Self { + self.reverse_order = true; + self + } + + /// Set custom block numbers to send + pub fn with_block_numbers(mut self, block_numbers: Vec) -> Self { + self.custom_block_numbers = Some(block_numbers); + self + } +} + +impl Default for SendNewPayloads +where + Engine: EngineTypes, +{ + fn default() -> Self { + Self::new() + } +} + +impl Action for SendNewPayloads +where + Engine: EngineTypes + PayloadTypes, +{ + fn execute<'a>(&'a mut self, env: &'a mut Environment) -> BoxFuture<'a, Result<()>> { + Box::pin(async move { + // Validate required fields + let target_node = + self.target_node.ok_or_else(|| eyre::eyre!("Target node not specified"))?; + let source_node = + self.source_node.ok_or_else(|| eyre::eyre!("Source node not specified"))?; + + // Determine block numbers to send + let block_numbers = if let Some(custom_numbers) = &self.custom_block_numbers { + custom_numbers.clone() + } else { + let start = + self.start_block.ok_or_else(|| eyre::eyre!("Start block not specified"))?; + let count = + self.total_blocks.ok_or_else(|| eyre::eyre!("Total blocks not specified"))?; + + if self.reverse_order { + // Send blocks in reverse order (e.g., for count=2, start=1: [2, 1]) + (0..count).map(|i| start + count - 1 - i).collect() + } else { + // Send blocks in normal order + (0..count).map(|i| start + i).collect() + } + }; + + for &block_number in &block_numbers { + // For the first block in reverse order, expect buffering + // For subsequent blocks, they might connect immediately + let expected_status = + if self.reverse_order && block_number == *block_numbers.first().unwrap() { + ExpectedPayloadStatus::SyncingOrAccepted + } else { + ExpectedPayloadStatus::Valid + }; + + let mut action = SendNewPayload::::new( + target_node, + block_number, + source_node, + expected_status, + ); + + action.execute(env).await?; + } + + Ok(()) + }) + } +} + +/// Helper function to convert a block to `ExecutionPayloadV3` +fn block_to_payload_v3(block: Block) -> ExecutionPayloadV3 { + use alloy_primitives::U256; + + ExecutionPayloadV3 { + payload_inner: ExecutionPayloadV2 { + payload_inner: ExecutionPayloadV1 { + parent_hash: block.header.inner.parent_hash, + fee_recipient: block.header.inner.beneficiary, + state_root: block.header.inner.state_root, + receipts_root: block.header.inner.receipts_root, + logs_bloom: block.header.inner.logs_bloom, + prev_randao: block.header.inner.mix_hash, + block_number: block.header.inner.number, + gas_limit: block.header.inner.gas_limit, + gas_used: block.header.inner.gas_used, + timestamp: block.header.inner.timestamp, + extra_data: block.header.inner.extra_data.clone(), + base_fee_per_gas: U256::from(block.header.inner.base_fee_per_gas.unwrap_or(0)), + block_hash: block.header.hash, + transactions: vec![], // No transactions needed for buffering tests + }, + withdrawals: block.withdrawals.unwrap_or_default().to_vec(), + }, + blob_gas_used: block.header.inner.blob_gas_used.unwrap_or(0), + excess_blob_gas: block.header.inner.excess_blob_gas.unwrap_or(0), + } +} diff --git a/crates/e2e-test-utils/src/testsuite/actions/mod.rs b/crates/e2e-test-utils/src/testsuite/actions/mod.rs index 7f09c28356..a5ed75ed44 100644 --- a/crates/e2e-test-utils/src/testsuite/actions/mod.rs +++ b/crates/e2e-test-utils/src/testsuite/actions/mod.rs @@ -1,24 +1,27 @@ //! Actions that can be performed in tests. use crate::testsuite::Environment; -use alloy_rpc_types_engine::{ForkchoiceUpdated, PayloadStatusEnum}; +use alloy_rpc_types_engine::{ForkchoiceState, ForkchoiceUpdated, PayloadStatusEnum}; use eyre::Result; use futures_util::future::BoxFuture; use reth_node_api::EngineTypes; +use reth_rpc_api::clients::EngineApiClient; use std::future::Future; use tracing::debug; +pub mod engine_api; pub mod fork; pub mod node_ops; pub mod produce_blocks; pub mod reorg; +pub use engine_api::{ExpectedPayloadStatus, SendNewPayload, SendNewPayloads}; pub use fork::{CreateFork, ForkBase, SetForkBase, SetForkBaseFromBlockInfo, ValidateFork}; pub use node_ops::{CaptureBlockOnNode, CompareNodeChainTips, SelectActiveNode, ValidateBlockTag}; pub use produce_blocks::{ AssertMineBlock, BroadcastLatestForkchoice, BroadcastNextNewPayload, CheckPayloadAccepted, ExpectFcuStatus, GenerateNextPayload, GeneratePayloadAttributes, PickNextBlockProducer, - ProduceBlocks, ProduceInvalidBlocks, TestFcuToTag, UpdateBlockInfo, + ProduceBlocks, ProduceBlocksLocally, ProduceInvalidBlocks, TestFcuToTag, UpdateBlockInfo, UpdateBlockInfoToLatestPayload, ValidateCanonicalTag, }; pub use reorg::{ReorgTarget, ReorgTo, SetReorgTarget}; @@ -102,12 +105,20 @@ where /// Action that makes the current latest block canonical by broadcasting a forkchoice update #[derive(Debug, Default)] -pub struct MakeCanonical {} +pub struct MakeCanonical { + /// If true, only send to the active node. If false, broadcast to all nodes. + active_node_only: bool, +} impl MakeCanonical { /// Create a new `MakeCanonical` action pub const fn new() -> Self { - Self {} + Self { active_node_only: false } + } + + /// Create a new `MakeCanonical` action that only applies to the active node + pub const fn with_active_node() -> Self { + Self { active_node_only: true } } } @@ -120,23 +131,59 @@ where { fn execute<'a>(&'a mut self, env: &'a mut Environment) -> BoxFuture<'a, Result<()>> { Box::pin(async move { - let mut actions: Vec>> = vec![ - Box::new(BroadcastLatestForkchoice::default()), - Box::new(UpdateBlockInfo::default()), - ]; + if self.active_node_only { + // Only update the active node + let latest_block = env + .current_block_info() + .ok_or_else(|| eyre::eyre!("No latest block information available"))?; - // if we're on a fork, validate it now that it's canonical - if let Ok(active_state) = env.active_node_state() { - if let Some(fork_base) = active_state.current_fork_base { - debug!("MakeCanonical: Adding fork validation from base block {}", fork_base); - actions.push(Box::new(ValidateFork::new(fork_base))); - // clear the fork base since we're now canonical - env.active_node_state_mut()?.current_fork_base = None; + let fork_choice_state = ForkchoiceState { + head_block_hash: latest_block.hash, + safe_block_hash: latest_block.hash, + finalized_block_hash: latest_block.hash, + }; + + let active_idx = env.active_node_idx; + let engine = env.node_clients[active_idx].engine.http_client(); + + let fcu_response = EngineApiClient::::fork_choice_updated_v3( + &engine, + fork_choice_state, + None, + ) + .await?; + + debug!( + "Active node {}: Forkchoice update status: {:?}", + active_idx, fcu_response.payload_status.status + ); + + validate_fcu_response(&fcu_response, &format!("Active node {active_idx}"))?; + + Ok(()) + } else { + // Original broadcast behavior + let mut actions: Vec>> = vec![ + Box::new(BroadcastLatestForkchoice::default()), + Box::new(UpdateBlockInfo::default()), + ]; + + // if we're on a fork, validate it now that it's canonical + if let Ok(active_state) = env.active_node_state() { + if let Some(fork_base) = active_state.current_fork_base { + debug!( + "MakeCanonical: Adding fork validation from base block {}", + fork_base + ); + actions.push(Box::new(ValidateFork::new(fork_base))); + // clear the fork base since we're now canonical + env.active_node_state_mut()?.current_fork_base = None; + } } - } - let mut sequence = Sequence::new(actions); - sequence.execute(env).await + let mut sequence = Sequence::new(actions); + sequence.execute(env).await + } }) } } diff --git a/crates/e2e-test-utils/src/testsuite/actions/produce_blocks.rs b/crates/e2e-test-utils/src/testsuite/actions/produce_blocks.rs index dfeb5a8fa8..a6ceec2eee 100644 --- a/crates/e2e-test-utils/src/testsuite/actions/produce_blocks.rs +++ b/crates/e2e-test-utils/src/testsuite/actions/produce_blocks.rs @@ -599,7 +599,17 @@ where /// Action that broadcasts the next new payload #[derive(Debug, Default)] -pub struct BroadcastNextNewPayload {} +pub struct BroadcastNextNewPayload { + /// If true, only send to the active node. If false, broadcast to all nodes. + active_node_only: bool, +} + +impl BroadcastNextNewPayload { + /// Create a new `BroadcastNextNewPayload` action that only sends to the active node + pub const fn with_active_node() -> Self { + Self { active_node_only: true } + } +} impl Action for BroadcastNextNewPayload where @@ -630,14 +640,11 @@ where let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = payload_envelope.into(); let execution_payload = execution_payload_envelope.execution_payload; - // Loop through all clients and broadcast the next new payload - let mut broadcast_results = Vec::new(); - let mut first_valid_seen = false; + if self.active_node_only { + // Send only to the active node + let active_idx = env.active_node_idx; + let engine = env.node_clients[active_idx].engine.http_client(); - for (idx, client) in env.node_clients.iter().enumerate() { - let engine = client.engine.http_client(); - - // Broadcast the execution payload let result = EngineApiClient::::new_payload_v3( &engine, execution_payload.clone(), @@ -646,35 +653,70 @@ where ) .await?; - broadcast_results.push((idx, result.status.clone())); - debug!("Node {}: new_payload broadcast status: {:?}", idx, result.status); + debug!("Active node {}: new_payload status: {:?}", active_idx, result.status); - // Check if this node accepted the payload - if result.status == PayloadStatusEnum::Valid && !first_valid_seen { - first_valid_seen = true; - } else if let PayloadStatusEnum::Invalid { validation_error } = result.status { - debug!( - "Node {}: Invalid payload status returned from broadcast: {:?}", - idx, validation_error - ); + // Validate the response + match result.status { + PayloadStatusEnum::Valid => { + env.active_node_state_mut()?.latest_payload_executed = + Some(next_new_payload); + Ok(()) + } + other => Err(eyre::eyre!( + "Active node {}: Unexpected payload status: {:?}", + active_idx, + other + )), } + } else { + // Loop through all clients and broadcast the next new payload + let mut broadcast_results = Vec::new(); + let mut first_valid_seen = false; + + for (idx, client) in env.node_clients.iter().enumerate() { + let engine = client.engine.http_client(); + + // Broadcast the execution payload + let result = EngineApiClient::::new_payload_v3( + &engine, + execution_payload.clone(), + vec![], + parent_beacon_block_root, + ) + .await?; + + broadcast_results.push((idx, result.status.clone())); + debug!("Node {}: new_payload broadcast status: {:?}", idx, result.status); + + // Check if this node accepted the payload + if result.status == PayloadStatusEnum::Valid && !first_valid_seen { + first_valid_seen = true; + } else if let PayloadStatusEnum::Invalid { validation_error } = result.status { + debug!( + "Node {}: Invalid payload status returned from broadcast: {:?}", + idx, validation_error + ); + } + } + + // Update the executed payload state after broadcasting to all nodes + if first_valid_seen { + env.active_node_state_mut()?.latest_payload_executed = Some(next_new_payload); + } + + // Check if at least one node accepted the payload + let any_valid = + broadcast_results.iter().any(|(_, status)| *status == PayloadStatusEnum::Valid); + if !any_valid { + return Err(eyre::eyre!( + "Failed to successfully broadcast payload to any client" + )); + } + + debug!("Broadcast complete. Results: {:?}", broadcast_results); + + Ok(()) } - - // Update the executed payload state after broadcasting to all nodes - if first_valid_seen { - env.active_node_state_mut()?.latest_payload_executed = Some(next_new_payload); - } - - // Check if at least one node accepted the payload - let any_valid = - broadcast_results.iter().any(|(_, status)| *status == PayloadStatusEnum::Valid); - if !any_valid { - return Err(eyre::eyre!("Failed to successfully broadcast payload to any client")); - } - - debug!("Broadcast complete. Results: {:?}", broadcast_results); - - Ok(()) }) } } @@ -873,6 +915,60 @@ where } } +/// Action that produces blocks locally without broadcasting to other nodes +/// This sends the payload only to the active node to ensure it's available locally +#[derive(Debug)] +pub struct ProduceBlocksLocally { + /// Number of blocks to produce + pub num_blocks: u64, + /// Tracks engine type + _phantom: PhantomData, +} + +impl ProduceBlocksLocally { + /// Create a new `ProduceBlocksLocally` action + pub fn new(num_blocks: u64) -> Self { + Self { num_blocks, _phantom: Default::default() } + } +} + +impl Default for ProduceBlocksLocally { + fn default() -> Self { + Self::new(0) + } +} + +impl Action for ProduceBlocksLocally +where + Engine: EngineTypes + PayloadTypes, + Engine::PayloadAttributes: From + Clone, + Engine::ExecutionPayloadEnvelopeV3: Into, +{ + fn execute<'a>(&'a mut self, env: &'a mut Environment) -> BoxFuture<'a, Result<()>> { + Box::pin(async move { + // Remember the active node to ensure all blocks are produced on the same node + let producer_idx = env.active_node_idx; + + for _ in 0..self.num_blocks { + // Ensure we always use the same producer + env.last_producer_idx = Some(producer_idx); + + // create a sequence that produces blocks and sends only to active node + let mut sequence = Sequence::new(vec![ + // Skip PickNextBlockProducer to maintain the same producer + Box::new(GeneratePayloadAttributes::default()), + Box::new(GenerateNextPayload::default()), + // Send payload only to the active node to make it available + Box::new(BroadcastNextNewPayload::with_active_node()), + Box::new(UpdateBlockInfoToLatestPayload::default()), + ]); + sequence.execute(env).await?; + } + Ok(()) + }) + } +} + /// Action that produces a sequence of blocks where some blocks are intentionally invalid #[derive(Debug)] pub struct ProduceInvalidBlocks { diff --git a/crates/e2e-test-utils/src/testsuite/setup.rs b/crates/e2e-test-utils/src/testsuite/setup.rs index 9198851af5..c51fd3c0bf 100644 --- a/crates/e2e-test-utils/src/testsuite/setup.rs +++ b/crates/e2e-test-utils/src/testsuite/setup.rs @@ -1,6 +1,9 @@ //! Test setup utilities for configuring the initial state. -use crate::{setup_engine, testsuite::Environment, NodeBuilderHelper, PayloadAttributesBuilder}; +use crate::{ + setup_engine_with_connection, testsuite::Environment, NodeBuilderHelper, + PayloadAttributesBuilder, +}; use alloy_eips::BlockNumberOrTag; use alloy_primitives::B256; use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes}; @@ -157,12 +160,13 @@ where ) }; - let result = setup_engine::( + let result = setup_engine_with_connection::( node_count, Arc::::new((*chain_spec).clone().into()), is_dev, self.tree_config.clone(), attributes_generator, + self.network.connect_nodes, ) .await; @@ -292,16 +296,23 @@ pub struct Genesis {} pub struct NetworkSetup { /// Number of nodes to create pub node_count: usize, + /// Whether nodes should be connected to each other + pub connect_nodes: bool, } impl NetworkSetup { /// Create a new network setup with a single node pub const fn single_node() -> Self { - Self { node_count: 1 } + Self { node_count: 1, connect_nodes: true } } - /// Create a new network setup with multiple nodes + /// Create a new network setup with multiple nodes (connected) pub const fn multi_node(count: usize) -> Self { - Self { node_count: count } + Self { node_count: count, connect_nodes: true } + } + + /// Create a new network setup with multiple nodes (disconnected) + pub const fn multi_node_unconnected(count: usize) -> Self { + Self { node_count: count, connect_nodes: false } } } diff --git a/crates/engine/tree/src/tree/e2e_tests.rs b/crates/engine/tree/src/tree/e2e_tests.rs index ec74eecc20..0bbd92b8df 100644 --- a/crates/engine/tree/src/tree/e2e_tests.rs +++ b/crates/engine/tree/src/tree/e2e_tests.rs @@ -5,8 +5,9 @@ use eyre::Result; use reth_chainspec::{ChainSpecBuilder, MAINNET}; use reth_e2e_test_utils::testsuite::{ actions::{ - CaptureBlock, CreateFork, ExpectFcuStatus, MakeCanonical, ProduceBlocks, - ProduceInvalidBlocks, ReorgTo, ValidateCanonicalTag, + CaptureBlock, CompareNodeChainTips, CreateFork, ExpectFcuStatus, MakeCanonical, + ProduceBlocks, ProduceBlocksLocally, ProduceInvalidBlocks, ReorgTo, SelectActiveNode, + SendNewPayloads, UpdateBlockInfo, ValidateCanonicalTag, }, setup::{NetworkSetup, Setup}, TestBuilder, @@ -185,3 +186,58 @@ async fn test_engine_tree_reorg_with_missing_ancestor_expecting_valid_e2e() -> R Ok(()) } + +/// Test that verifies buffered blocks are eventually connected when sent in reverse order. +#[tokio::test] +async fn test_engine_tree_buffered_blocks_are_eventually_connected_e2e() -> Result<()> { + reth_tracing::init_test_tracing(); + + let test = TestBuilder::new() + .with_setup( + Setup::default() + .with_chain_spec(Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis( + serde_json::from_str(include_str!( + "../../../../e2e-test-utils/src/testsuite/assets/genesis.json" + )) + .unwrap(), + ) + .cancun_activated() + .build(), + )) + .with_network(NetworkSetup::multi_node_unconnected(2)) // Need 2 disconnected nodes + .with_tree_config( + TreeConfig::default() + .with_legacy_state_root(false) + .with_has_enough_parallelism(true), + ), + ) + // node 0 produces blocks 1 and 2 locally without broadcasting + .with_action(SelectActiveNode::new(0)) + .with_action(ProduceBlocksLocally::::new(2)) + // make the blocks canonical on node 0 so they're available via RPC + .with_action(MakeCanonical::with_active_node()) + // send blocks in reverse order (2, then 1) from node 0 to node 1 + .with_action( + SendNewPayloads::::new() + .with_target_node(1) + .with_source_node(0) + .with_start_block(1) + .with_total_blocks(2) + .in_reverse_order(), + ) + // update node 1's view to recognize the new blocks + .with_action(SelectActiveNode::new(1)) + // get the latest block from node 1's RPC and update environment + .with_action(UpdateBlockInfo::default()) + // make block 2 canonical on node 1 with a forkchoice update + .with_action(MakeCanonical::with_active_node()) + // verify both nodes eventually have the same chain tip + .with_action(CompareNodeChainTips::expect_same(0, 1)); + + test.run::().await?; + + Ok(()) +} diff --git a/crates/engine/tree/src/tree/tests.rs b/crates/engine/tree/src/tree/tests.rs index 68db670754..43891c6fb7 100644 --- a/crates/engine/tree/src/tree/tests.rs +++ b/crates/engine/tree/src/tree/tests.rs @@ -6,10 +6,7 @@ use alloy_primitives::{ Bytes, B256, }; use alloy_rlp::Decodable; -use alloy_rpc_types_engine::{ - CancunPayloadFields, ExecutionData, ExecutionPayloadSidecar, ExecutionPayloadV1, - ExecutionPayloadV3, -}; +use alloy_rpc_types_engine::{ExecutionData, ExecutionPayloadSidecar, ExecutionPayloadV1}; use assert_matches::assert_matches; use reth_chain_state::{test_utils::TestBlockBuilder, BlockState}; use reth_chainspec::{ChainSpec, HOLESKY, MAINNET}; @@ -289,22 +286,6 @@ impl TestHarness { } } - async fn send_new_payload(&mut self, block: RecoveredBlock) { - let payload = ExecutionPayloadV3::from_block_unchecked( - block.hash(), - &block.clone_sealed_block().into_block(), - ); - self.tree - .on_new_payload(ExecutionData { - payload: payload.into(), - sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields { - parent_beacon_block_root: block.parent_beacon_block_root.unwrap(), - versioned_hashes: vec![], - }), - }) - .unwrap(); - } - async fn insert_chain( &mut self, chain: impl IntoIterator> + Clone, @@ -349,18 +330,6 @@ impl TestHarness { } } - async fn check_block_received(&mut self, hash: B256) { - let event = self.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::BlockReceived( - num_hash, - )) => { - assert_eq!(num_hash.hash, hash); - } - _ => panic!("Unexpected event: {event:#?}"), - } - } - fn persist_blocks(&self, blocks: Vec>) { let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len()); let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len()); @@ -1106,44 +1075,3 @@ async fn test_engine_tree_live_sync_fcu_extends_canon_chain() { test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await; test_harness.check_canon_head(main_last_hash); } - -#[tokio::test] -async fn test_engine_tree_buffered_blocks_are_eventually_connected() { - let chain_spec = MAINNET.clone(); - let mut test_harness = TestHarness::new(chain_spec.clone()); - - let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); - test_harness = test_harness.with_blocks(base_chain.clone()); - - // side chain consisting of two blocks, the last will be inserted first - // so that we force it to be buffered - let side_chain = - test_harness.block_builder.create_fork(base_chain.last().unwrap().recovered_block(), 2); - - // buffer last block of side chain - let buffered_block = side_chain.last().unwrap(); - let buffered_block_hash = buffered_block.hash(); - - test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]); - test_harness.send_new_payload(buffered_block.clone()).await; - - assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some()); - - let non_buffered_block = side_chain.first().unwrap(); - let non_buffered_block_hash = non_buffered_block.hash(); - - // insert block that continues the canon chain, should not be buffered - test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]); - test_harness.send_new_payload(non_buffered_block.clone()).await; - assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none()); - - // the previously buffered block should be connected now - assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none()); - - // both blocks are added to the canon chain in order - // note that the buffered block is received first, but added last - test_harness.check_block_received(buffered_block_hash).await; - test_harness.check_block_received(non_buffered_block_hash).await; - test_harness.check_canon_block_added(non_buffered_block_hash).await; - test_harness.check_canon_block_added(buffered_block_hash).await; -}