feat(test): add apply_with_import method to e2e Setup (#17263)

This commit is contained in:
Federico Gimenez
2025-07-08 11:26:27 +02:00
committed by GitHub
parent 5645659d59
commit 557836b93d
10 changed files with 592 additions and 284 deletions

1
Cargo.lock generated
View File

@@ -7787,6 +7787,7 @@ dependencies = [
"alloy-eips",
"alloy-network",
"alloy-primitives",
"alloy-provider",
"alloy-rlp",
"alloy-rpc-types-engine",
"alloy-rpc-types-eth",

View File

@@ -56,17 +56,18 @@ url.workspace = true
alloy-primitives.workspace = true
alloy-eips.workspace = true
alloy-rlp.workspace = true
futures-util.workspace = true
eyre.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
serde_json.workspace = true
alloy-signer.workspace = true
alloy-signer-local = { workspace = true, features = ["mnemonic"] }
alloy-rpc-types-eth.workspace = true
alloy-rpc-types-engine.workspace = true
alloy-network.workspace = true
alloy-consensus = { workspace = true, features = ["kzg"] }
alloy-provider = { workspace = true, features = ["reqwest"] }
futures-util.workspace = true
eyre.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
serde_json.workspace = true
tracing.workspace = true
derive_more.workspace = true

View File

@@ -42,6 +42,9 @@ mod network;
/// Helper for rpc operations
mod rpc;
/// Utilities for creating and writing RLP test data
pub mod test_rlp_utils;
/// Creates the initial setup with `num_nodes` started and interconnected.
pub async fn setup<N>(
num_nodes: usize,

View File

@@ -21,7 +21,6 @@ use tempfile::TempDir;
use tracing::{debug, info, span, Level};
/// Setup result containing nodes and temporary directories that must be kept alive
#[allow(missing_debug_implementations)]
pub struct ChainImportResult {
/// The nodes that were created
pub nodes: Vec<NodeHelperType<EthereumNode>>,
@@ -33,6 +32,16 @@ pub struct ChainImportResult {
pub _temp_dirs: Vec<TempDir>,
}
impl std::fmt::Debug for ChainImportResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ChainImportResult")
.field("nodes", &self.nodes.len())
.field("wallet", &self.wallet)
.field("temp_dirs", &self._temp_dirs.len())
.finish()
}
}
/// Creates a test setup with Ethereum nodes that have pre-imported chain data from RLP files.
///
/// This function:
@@ -40,6 +49,10 @@ pub struct ChainImportResult {
/// 2. Imports the specified RLP chain data into the datadir
/// 3. Starts the nodes with the pre-populated database
/// 4. Returns the running nodes ready for testing
///
/// Note: This function is currently specific to `EthereumNode` because the import process
/// uses Ethereum-specific consensus and block format. It can be made generic in the future
/// by abstracting the import process.
pub async fn setup_engine_with_chain_import(
num_nodes: usize,
chain_spec: Arc<ChainSpec>,
@@ -103,9 +116,8 @@ pub async fn setup_engine_with_chain_import(
// Create a provider factory with the initialized database (use regular DB, not
// TempDatabase) We need to specify the node types properly for the adapter
type ImportNodeTypes = reth_node_ethereum::EthereumNode;
let provider_factory = ProviderFactory::<
NodeTypesWithDBAdapter<ImportNodeTypes, Arc<DatabaseEnv>>,
NodeTypesWithDBAdapter<EthereumNode, Arc<DatabaseEnv>>,
>::new(
db.clone(),
chain_spec.clone(),
@@ -226,7 +238,7 @@ pub async fn setup_engine_with_chain_import(
Ok(ChainImportResult {
nodes,
task_manager: tasks,
wallet: crate::Wallet::default().with_chain_id(chain_spec.chain().into()),
wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
_temp_dirs: temp_dirs,
})
}
@@ -256,193 +268,15 @@ pub fn load_forkchoice_state(path: &Path) -> eyre::Result<alloy_rpc_types_engine
#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::{constants::EMPTY_WITHDRAWALS, BlockHeader, Header};
use alloy_eips::eip4895::Withdrawals;
use alloy_primitives::{Address, B256, B64, U256};
use reth_chainspec::{ChainSpecBuilder, EthereumHardforks, MAINNET};
use crate::test_rlp_utils::{create_fcu_json, generate_test_blocks, write_blocks_to_rlp};
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_db::mdbx::DatabaseArguments;
use reth_ethereum_primitives::{Block, BlockBody};
use reth_payload_builder::EthPayloadBuilderAttributes;
use reth_primitives::SealedBlock;
use reth_primitives_traits::Block as BlockTrait;
use reth_provider::{
test_utils::MockNodeTypesWithDB, BlockHashReader, BlockNumReader, BlockReaderIdExt,
};
use std::io::Write;
/// Generate test blocks for a given chain spec
fn generate_test_blocks(chain_spec: &ChainSpec, count: u64) -> Vec<SealedBlock> {
let mut blocks: Vec<SealedBlock> = Vec::new();
let genesis_header = chain_spec.sealed_genesis_header();
let mut parent_hash = genesis_header.hash();
let mut parent_number = genesis_header.number();
let mut parent_base_fee = genesis_header.base_fee_per_gas;
let mut parent_gas_limit = genesis_header.gas_limit;
debug!(target: "e2e::import",
"Genesis header base fee: {:?}, gas limit: {}, state root: {:?}",
parent_base_fee,
parent_gas_limit,
genesis_header.state_root()
);
for i in 1..=count {
// Create a simple header
let mut header = Header {
parent_hash,
number: parent_number + 1,
gas_limit: parent_gas_limit, // Use parent's gas limit
gas_used: 0, // Empty blocks use no gas
timestamp: genesis_header.timestamp() + i * 12, // 12 second blocks
beneficiary: Address::ZERO,
receipts_root: alloy_consensus::constants::EMPTY_RECEIPTS,
logs_bloom: Default::default(),
difficulty: U256::from(1), // Will be overridden for post-merge
// Use the same state root as parent for now (empty state changes)
state_root: if i == 1 {
genesis_header.state_root()
} else {
blocks.last().unwrap().state_root
},
transactions_root: alloy_consensus::constants::EMPTY_TRANSACTIONS,
ommers_hash: alloy_consensus::constants::EMPTY_OMMER_ROOT_HASH,
mix_hash: B256::ZERO,
nonce: B64::from(0u64),
extra_data: Default::default(),
base_fee_per_gas: None,
withdrawals_root: None,
blob_gas_used: None,
excess_blob_gas: None,
parent_beacon_block_root: None,
requests_hash: None,
};
// Set required fields based on chain spec
if chain_spec.is_london_active_at_block(header.number) {
// Calculate base fee based on parent block
if let Some(parent_fee) = parent_base_fee {
// For the first block, we need to use the exact expected base fee
// The consensus rules expect it to be calculated from the genesis
let (parent_gas_used, parent_gas_limit) = if i == 1 {
// Genesis block parameters
(genesis_header.gas_used, genesis_header.gas_limit)
} else {
let last_block = blocks.last().unwrap();
(last_block.gas_used, last_block.gas_limit)
};
header.base_fee_per_gas = Some(alloy_eips::calc_next_block_base_fee(
parent_gas_used,
parent_gas_limit,
parent_fee,
chain_spec.base_fee_params_at_timestamp(header.timestamp),
));
debug!(target: "e2e::import", "Block {} calculated base fee: {:?} (parent gas used: {}, parent gas limit: {}, parent base fee: {})",
i, header.base_fee_per_gas, parent_gas_used, parent_gas_limit, parent_fee);
parent_base_fee = header.base_fee_per_gas;
}
}
// For post-merge blocks
if chain_spec.is_paris_active_at_block(header.number) {
header.difficulty = U256::ZERO;
header.nonce = B64::ZERO;
}
// For post-shanghai blocks
if chain_spec.is_shanghai_active_at_timestamp(header.timestamp) {
header.withdrawals_root = Some(EMPTY_WITHDRAWALS);
}
// For post-cancun blocks
if chain_spec.is_cancun_active_at_timestamp(header.timestamp) {
header.blob_gas_used = Some(0);
header.excess_blob_gas = Some(0);
header.parent_beacon_block_root = Some(B256::ZERO);
}
// Create an empty block body
let body = BlockBody {
transactions: vec![],
ommers: vec![],
withdrawals: header.withdrawals_root.is_some().then(Withdrawals::default),
};
// Create the block
let block = Block { header: header.clone(), body: body.clone() };
let sealed_block = BlockTrait::seal_slow(block);
debug!(target: "e2e::import",
"Generated block {} with hash {:?}",
sealed_block.number(),
sealed_block.hash()
);
debug!(target: "e2e::import",
" Body has {} transactions, {} ommers, withdrawals: {}",
body.transactions.len(),
body.ommers.len(),
body.withdrawals.is_some()
);
// Update parent for next iteration
parent_hash = sealed_block.hash();
parent_number = sealed_block.number();
parent_gas_limit = sealed_block.gas_limit;
if header.base_fee_per_gas.is_some() {
parent_base_fee = header.base_fee_per_gas;
}
blocks.push(sealed_block);
}
blocks
}
/// Write blocks to RLP file
fn write_blocks_to_rlp(blocks: &[SealedBlock], path: &Path) -> std::io::Result<()> {
use alloy_rlp::Encodable;
let mut file = std::fs::File::create(path)?;
let mut total_bytes = 0;
for (i, block) in blocks.iter().enumerate() {
// Convert SealedBlock to Block before encoding
let block_for_encoding = block.clone().unseal();
let mut buf = Vec::new();
block_for_encoding.encode(&mut buf);
debug!(target: "e2e::import",
"Block {} has {} transactions, encoded to {} bytes",
i,
block.body().transactions.len(),
buf.len()
);
// Debug: check what's in the encoded data
debug!(target: "e2e::import", "Block {} encoded to {} bytes", i, buf.len());
if buf.len() < 20 {
debug!(target: "e2e::import", " Raw bytes: {:?}", &buf);
} else {
debug!(target: "e2e::import", " First 20 bytes: {:?}", &buf[..20]);
}
total_bytes += buf.len();
file.write_all(&buf)?;
}
file.flush()?;
debug!(target: "e2e::import", "Total RLP bytes written: {total_bytes}");
Ok(())
}
/// Create FCU JSON for the tip of the chain
fn create_fcu_json(tip: &SealedBlock) -> serde_json::Value {
serde_json::json!({
"forkchoiceState": {
"headBlockHash": format!("0x{:x}", tip.hash()),
"safeBlockHash": format!("0x{:x}", tip.hash()),
"finalizedBlockHash": format!("0x{:x}", tip.hash()),
}
})
}
use std::path::PathBuf;
#[tokio::test]
async fn test_stage_checkpoints_persistence() {
@@ -595,7 +429,7 @@ mod tests {
chain_spec: &ChainSpec,
block_count: u64,
temp_dir: &Path,
) -> (Vec<SealedBlock>, std::path::PathBuf) {
) -> (Vec<SealedBlock>, PathBuf) {
let test_blocks = generate_test_blocks(chain_spec, block_count);
assert_eq!(
test_blocks.len(),

View File

@@ -0,0 +1,185 @@
//! Utilities for creating and writing RLP test data
use alloy_consensus::{constants::EMPTY_WITHDRAWALS, BlockHeader, Header};
use alloy_eips::eip4895::Withdrawals;
use alloy_primitives::{Address, B256, B64, U256};
use alloy_rlp::Encodable;
use reth_chainspec::{ChainSpec, EthereumHardforks};
use reth_ethereum_primitives::{Block, BlockBody};
use reth_primitives::SealedBlock;
use reth_primitives_traits::Block as BlockTrait;
use std::{io::Write, path::Path};
use tracing::debug;
/// Generate test blocks for a given chain spec
pub fn generate_test_blocks(chain_spec: &ChainSpec, count: u64) -> Vec<SealedBlock> {
let mut blocks: Vec<SealedBlock> = Vec::new();
let genesis_header = chain_spec.sealed_genesis_header();
let mut parent_hash = genesis_header.hash();
let mut parent_number = genesis_header.number();
let mut parent_base_fee = genesis_header.base_fee_per_gas;
let mut parent_gas_limit = genesis_header.gas_limit;
debug!(target: "e2e::import",
"Genesis header base fee: {:?}, gas limit: {}, state root: {:?}",
parent_base_fee,
parent_gas_limit,
genesis_header.state_root()
);
for i in 1..=count {
// Create a simple header
let mut header = Header {
parent_hash,
number: parent_number + 1,
gas_limit: parent_gas_limit, // Use parent's gas limit
gas_used: 0, // Empty blocks use no gas
timestamp: genesis_header.timestamp() + i * 12, // 12 second blocks
beneficiary: Address::ZERO,
receipts_root: alloy_consensus::constants::EMPTY_RECEIPTS,
logs_bloom: Default::default(),
difficulty: U256::from(1), // Will be overridden for post-merge
// Use the same state root as parent for now (empty state changes)
state_root: if i == 1 {
genesis_header.state_root()
} else {
blocks.last().unwrap().state_root
},
transactions_root: alloy_consensus::constants::EMPTY_TRANSACTIONS,
ommers_hash: alloy_consensus::constants::EMPTY_OMMER_ROOT_HASH,
mix_hash: B256::ZERO,
nonce: B64::from(0u64),
extra_data: Default::default(),
base_fee_per_gas: None,
withdrawals_root: None,
blob_gas_used: None,
excess_blob_gas: None,
parent_beacon_block_root: None,
requests_hash: None,
};
// Set required fields based on chain spec
if chain_spec.is_london_active_at_block(header.number) {
// Calculate base fee based on parent block
if let Some(parent_fee) = parent_base_fee {
// For the first block, we need to use the exact expected base fee
// The consensus rules expect it to be calculated from the genesis
let (parent_gas_used, parent_gas_limit) = if i == 1 {
// Genesis block parameters
(genesis_header.gas_used, genesis_header.gas_limit)
} else {
let last_block = blocks.last().unwrap();
(last_block.gas_used, last_block.gas_limit)
};
header.base_fee_per_gas = Some(alloy_eips::calc_next_block_base_fee(
parent_gas_used,
parent_gas_limit,
parent_fee,
chain_spec.base_fee_params_at_timestamp(header.timestamp),
));
debug!(target: "e2e::import", "Block {} calculated base fee: {:?} (parent gas used: {}, parent gas limit: {}, parent base fee: {})",
i, header.base_fee_per_gas, parent_gas_used, parent_gas_limit, parent_fee);
parent_base_fee = header.base_fee_per_gas;
}
}
// For post-merge blocks
if chain_spec.is_paris_active_at_block(header.number) {
header.difficulty = U256::ZERO;
header.nonce = B64::ZERO;
}
// For post-shanghai blocks
if chain_spec.is_shanghai_active_at_timestamp(header.timestamp) {
header.withdrawals_root = Some(EMPTY_WITHDRAWALS);
}
// For post-cancun blocks
if chain_spec.is_cancun_active_at_timestamp(header.timestamp) {
header.blob_gas_used = Some(0);
header.excess_blob_gas = Some(0);
header.parent_beacon_block_root = Some(B256::ZERO);
}
// Create an empty block body
let body = BlockBody {
transactions: vec![],
ommers: vec![],
withdrawals: header.withdrawals_root.is_some().then(Withdrawals::default),
};
// Create the block
let block = Block { header: header.clone(), body: body.clone() };
let sealed_block = BlockTrait::seal_slow(block);
debug!(target: "e2e::import",
"Generated block {} with hash {:?}",
sealed_block.number(),
sealed_block.hash()
);
debug!(target: "e2e::import",
" Body has {} transactions, {} ommers, withdrawals: {}",
body.transactions.len(),
body.ommers.len(),
body.withdrawals.is_some()
);
// Update parent for next iteration
parent_hash = sealed_block.hash();
parent_number = sealed_block.number();
parent_gas_limit = sealed_block.gas_limit;
if header.base_fee_per_gas.is_some() {
parent_base_fee = header.base_fee_per_gas;
}
blocks.push(sealed_block);
}
blocks
}
/// Write blocks to RLP file
pub fn write_blocks_to_rlp(blocks: &[SealedBlock], path: &Path) -> std::io::Result<()> {
let mut file = std::fs::File::create(path)?;
let mut total_bytes = 0;
for (i, block) in blocks.iter().enumerate() {
// Convert SealedBlock to Block before encoding
let block_for_encoding = block.clone().unseal();
let mut buf = Vec::new();
block_for_encoding.encode(&mut buf);
debug!(target: "e2e::import",
"Block {} has {} transactions, encoded to {} bytes",
i,
block.body().transactions.len(),
buf.len()
);
// Debug: check what's in the encoded data
debug!(target: "e2e::import", "Block {} encoded to {} bytes", i, buf.len());
if buf.len() < 20 {
debug!(target: "e2e::import", " Raw bytes: {:?}", &buf);
} else {
debug!(target: "e2e::import", " First 20 bytes: {:?}", &buf[..20]);
}
total_bytes += buf.len();
file.write_all(&buf)?;
}
file.flush()?;
debug!(target: "e2e::import", "Total RLP bytes written: {total_bytes}");
Ok(())
}
/// Create FCU JSON for the tip of the chain
pub fn create_fcu_json(tip: &SealedBlock) -> serde_json::Value {
serde_json::json!({
"forkchoiceState": {
"headBlockHash": format!("0x{:x}", tip.hash()),
"safeBlockHash": format!("0x{:x}", tip.hash()),
"finalizedBlockHash": format!("0x{:x}", tip.hash()),
}
})
}

View File

@@ -18,7 +18,8 @@ pub mod reorg;
pub use engine_api::{ExpectedPayloadStatus, SendNewPayload, SendNewPayloads};
pub use fork::{CreateFork, ForkBase, SetForkBase, SetForkBaseFromBlockInfo, ValidateFork};
pub use node_ops::{
CaptureBlockOnNode, CompareNodeChainTips, SelectActiveNode, ValidateBlockTag, WaitForSync,
AssertChainTip, CaptureBlockOnNode, CompareNodeChainTips, SelectActiveNode, ValidateBlockTag,
WaitForSync,
};
pub use produce_blocks::{
AssertMineBlock, BroadcastLatestForkchoice, BroadcastNextNewPayload, CheckPayloadAccepted,

View File

@@ -338,3 +338,45 @@ where
})
}
}
/// Action to assert the current chain tip is at a specific block number.
#[derive(Debug)]
pub struct AssertChainTip {
/// Expected block number
pub expected_block_number: u64,
}
impl AssertChainTip {
/// Create a new `AssertChainTip` action
pub const fn new(expected_block_number: u64) -> Self {
Self { expected_block_number }
}
}
impl<Engine> Action<Engine> for AssertChainTip
where
Engine: EngineTypes,
{
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let current_block = env
.current_block_info()
.ok_or_else(|| eyre::eyre!("No current block information available"))?;
if current_block.number != self.expected_block_number {
return Err(eyre::eyre!(
"Expected chain tip to be at block {}, but found block {}",
self.expected_block_number,
current_block.number
));
}
debug!(
"Chain tip verified at block {} (hash: {})",
current_block.number, current_block.hash
);
Ok(())
})
}
}

View File

@@ -2,8 +2,9 @@
use crate::testsuite::{
actions::{
AssertMineBlock, CaptureBlock, CaptureBlockOnNode, CompareNodeChainTips, CreateFork,
MakeCanonical, ProduceBlocks, ReorgTo, SelectActiveNode,
Action, AssertChainTip, AssertMineBlock, CaptureBlock, CaptureBlockOnNode,
CompareNodeChainTips, CreateFork, MakeCanonical, ProduceBlocks, ReorgTo, SelectActiveNode,
UpdateBlockInfo,
},
setup::{NetworkSetup, Setup},
TestBuilder,
@@ -15,6 +16,107 @@ use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_node_api::TreeConfig;
use reth_node_ethereum::{EthEngineTypes, EthereumNode};
use std::sync::Arc;
use tracing::debug;
#[tokio::test]
async fn test_apply_with_import() -> Result<()> {
use crate::test_rlp_utils::{generate_test_blocks, write_blocks_to_rlp};
use tempfile::TempDir;
reth_tracing::init_test_tracing();
// Create test chain spec
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("assets/genesis.json")).unwrap())
.london_activated()
.shanghai_activated()
.cancun_activated()
.build(),
);
// Generate test blocks
let test_blocks = generate_test_blocks(&chain_spec, 10);
// Write blocks to RLP file
let temp_dir = TempDir::new()?;
let rlp_path = temp_dir.path().join("test_chain.rlp");
write_blocks_to_rlp(&test_blocks, &rlp_path)?;
// Create setup with imported chain
let mut setup =
Setup::default().with_chain_spec(chain_spec).with_network(NetworkSetup::single_node());
// Create environment and apply setup with import
let mut env = crate::testsuite::Environment::<EthEngineTypes>::default();
setup.apply_with_import::<EthereumNode>(&mut env, &rlp_path).await?;
// Now run test actions on the environment with imported chain
// First check what block we're at after import
debug!("Current block info after import: {:?}", env.current_block_info());
// Update block info to sync environment state with the node
let mut update_block_info = UpdateBlockInfo::default();
update_block_info.execute(&mut env).await?;
// Make the imported chain canonical first
let mut make_canonical = MakeCanonical::new();
make_canonical.execute(&mut env).await?;
// Wait for the pipeline to finish processing all stages
debug!("Waiting for pipeline to finish processing imported blocks...");
let start = std::time::Instant::now();
loop {
// Check if we can get the block from RPC (indicates pipeline finished)
let client = &env.node_clients[0];
let block_result = reth_rpc_api::clients::EthApiClient::<
alloy_rpc_types_eth::TransactionRequest,
alloy_rpc_types_eth::Transaction,
alloy_rpc_types_eth::Block,
alloy_rpc_types_eth::Receipt,
alloy_rpc_types_eth::Header,
>::block_by_number(
&client.rpc,
alloy_eips::BlockNumberOrTag::Number(10),
true, // Include full transaction details
)
.await;
if let Ok(Some(block)) = block_result {
if block.header.number == 10 {
debug!("Pipeline finished, block 10 is fully available");
break;
}
}
if start.elapsed() > std::time::Duration::from_secs(10) {
return Err(eyre::eyre!("Timeout waiting for pipeline to finish"));
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
// Update block info again after making canonical
let mut update_block_info_2 = UpdateBlockInfo::default();
update_block_info_2.execute(&mut env).await?;
// Assert we're at block 10 after import
let mut assert_tip = AssertChainTip::new(10);
assert_tip.execute(&mut env).await?;
debug!("Successfully imported chain to block 10");
// Produce 5 more blocks
let mut produce_blocks = ProduceBlocks::<EthEngineTypes>::new(5);
produce_blocks.execute(&mut env).await?;
// Assert we're now at block 15
let mut assert_new_tip = AssertChainTip::new(15);
assert_new_tip.execute(&mut env).await?;
Ok(())
}
#[tokio::test]
async fn test_testsuite_assert_mine_block() -> Result<()> {

View File

@@ -14,25 +14,58 @@ use std::{collections::HashMap, marker::PhantomData};
pub mod actions;
pub mod setup;
use crate::testsuite::setup::Setup;
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
use reth_rpc_builder::auth::AuthServerHandle;
use std::sync::Arc;
use url::Url;
#[cfg(test)]
mod examples;
/// Client handles for both regular RPC and Engine API endpoints
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct NodeClient {
/// Regular JSON-RPC client
pub rpc: HttpClient,
/// Engine API client
pub engine: AuthServerHandle,
/// Alloy provider for interacting with the node
provider: Arc<dyn Provider + Send + Sync>,
}
impl NodeClient {
/// Instantiates a new [`NodeClient`] with the given handles
pub const fn new(rpc: HttpClient, engine: AuthServerHandle) -> Self {
Self { rpc, engine }
/// Instantiates a new [`NodeClient`] with the given handles and RPC URL
pub fn new(rpc: HttpClient, engine: AuthServerHandle, url: Url) -> Self {
let provider =
Arc::new(ProviderBuilder::new().connect_http(url)) as Arc<dyn Provider + Send + Sync>;
Self { rpc, engine, provider }
}
/// Get a block by number using the alloy provider
pub async fn get_block_by_number(
&self,
number: alloy_eips::BlockNumberOrTag,
) -> Result<Option<alloy_rpc_types_eth::Block>> {
self.provider
.get_block_by_number(number)
.await
.map_err(|e| eyre::eyre!("Failed to get block by number: {}", e))
}
/// Check if the node is ready by attempting to get the latest block
pub async fn is_ready(&self) -> bool {
self.get_block_by_number(alloy_eips::BlockNumberOrTag::Latest).await.is_ok()
}
}
impl std::fmt::Debug for NodeClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NodeClient")
.field("rpc", &self.rpc)
.field("engine", &self.engine)
.field("provider", &"<Provider>")
.finish()
}
}

View File

@@ -7,7 +7,6 @@ use crate::{
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::B256;
use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
use alloy_rpc_types_eth::{Block as RpcBlock, Header, Receipt, Transaction, TransactionRequest};
use eyre::{eyre, Result};
use reth_chainspec::ChainSpec;
use reth_engine_local::LocalPayloadAttributesBuilder;
@@ -15,14 +14,13 @@ use reth_ethereum_primitives::Block;
use reth_node_api::{EngineTypes, NodeTypes, PayloadTypes, TreeConfig};
use reth_node_core::primitives::RecoveredBlock;
use reth_payload_builder::EthPayloadBuilderAttributes;
use reth_rpc_api::clients::EthApiClient;
use revm::state::EvmState;
use std::{marker::PhantomData, sync::Arc};
use std::{marker::PhantomData, path::Path, sync::Arc};
use tokio::{
sync::mpsc,
time::{sleep, Duration},
};
use tracing::{debug, error};
use tracing::debug;
/// Configuration for setting up test environment
#[derive(Debug)]
@@ -45,6 +43,9 @@ pub struct Setup<I> {
pub is_dev: bool,
/// Tracks instance generic.
_phantom: PhantomData<I>,
/// Holds the import result to keep nodes alive when using imported chain
/// This is stored as an option to avoid lifetime issues with `tokio::spawn`
import_result_holder: Option<crate::setup_import::ChainImportResult>,
}
impl<I> Default for Setup<I> {
@@ -59,6 +60,7 @@ impl<I> Default for Setup<I> {
shutdown_tx: None,
is_dev: true,
_phantom: Default::default(),
import_result_holder: None,
}
}
}
@@ -129,6 +131,41 @@ where
self
}
/// Apply setup using pre-imported chain data from RLP file
pub async fn apply_with_import<N>(
&mut self,
env: &mut Environment<I>,
rlp_path: &Path,
) -> Result<()>
where
N: NodeBuilderHelper,
LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
<<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
>,
{
// Create nodes with imported chain data
let import_result = self.create_nodes_with_import::<N>(rlp_path).await?;
// Extract node clients
let mut node_clients = Vec::new();
let nodes = &import_result.nodes;
for node in nodes {
let rpc = node
.rpc_client()
.ok_or_else(|| eyre!("Failed to create HTTP RPC client for node"))?;
let auth = node.auth_server_handle();
let url = node.rpc_url();
node_clients.push(crate::testsuite::NodeClient::new(rpc, auth, url));
}
// Store the import result to keep nodes alive
// They will be dropped when the Setup is dropped
self.import_result_holder = Some(import_result);
// Finalize setup - this will wait for nodes and initialize states
self.finalize_setup(env, node_clients, true).await
}
/// Apply the setup to the environment
pub async fn apply<N>(&mut self, env: &mut Environment<I>) -> Result<()>
where
@@ -141,24 +178,12 @@ where
self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
self.shutdown_tx = Some(shutdown_tx);
let is_dev = self.is_dev;
let node_count = self.network.node_count;
let attributes_generator = move |timestamp| {
let attributes = PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,
suggested_fee_recipient: alloy_primitives::Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: Some(B256::ZERO),
};
<<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes::from(
EthPayloadBuilderAttributes::new(B256::ZERO, attributes),
)
};
let attributes_generator = self.create_attributes_generator::<N>();
let result = setup_engine_with_connection::<N>(
node_count,
@@ -179,8 +204,9 @@ where
.rpc_client()
.ok_or_else(|| eyre!("Failed to create HTTP RPC client for node"))?;
let auth = node.auth_server_handle();
let url = node.rpc_url();
node_clients.push(crate::testsuite::NodeClient::new(rpc, auth));
node_clients.push(crate::testsuite::NodeClient::new(rpc, auth, url));
}
// spawn a separate task just to handle the shutdown
@@ -194,100 +220,180 @@ where
});
}
Err(e) => {
error!("Failed to setup nodes: {}", e);
return Err(eyre!("Failed to setup nodes: {}", e));
}
}
// Finalize setup
self.finalize_setup(env, node_clients, false).await
}
/// Create nodes with imported chain data
///
/// Note: Currently this only supports `EthereumNode` due to the import process
/// being Ethereum-specific. The generic parameter N is kept for consistency
/// with other methods but is not used.
async fn create_nodes_with_import<N>(
&self,
rlp_path: &Path,
) -> Result<crate::setup_import::ChainImportResult>
where
N: NodeBuilderHelper,
LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
<<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
>,
{
let chain_spec =
self.chain_spec.clone().ok_or_else(|| eyre!("Chain specification is required"))?;
let attributes_generator = move |timestamp| {
let attributes = PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,
suggested_fee_recipient: alloy_primitives::Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: Some(B256::ZERO),
};
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
};
crate::setup_import::setup_engine_with_chain_import(
self.network.node_count,
chain_spec,
self.is_dev,
self.tree_config.clone(),
rlp_path,
attributes_generator,
)
.await
}
/// Create the attributes generator function
fn create_attributes_generator<N>(
&self,
) -> impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Copy
where
N: NodeBuilderHelper,
LocalPayloadAttributesBuilder<N::ChainSpec>: PayloadAttributesBuilder<
<<N as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
>,
{
move |timestamp| {
let attributes = PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,
suggested_fee_recipient: alloy_primitives::Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: Some(B256::ZERO),
};
<<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes::from(
EthPayloadBuilderAttributes::new(B256::ZERO, attributes),
)
}
}
/// Common finalization logic for both apply methods
async fn finalize_setup(
&self,
env: &mut Environment<I>,
node_clients: Vec<crate::testsuite::NodeClient>,
use_latest_block: bool,
) -> Result<()> {
if node_clients.is_empty() {
return Err(eyre!("No nodes were created"));
}
// wait for all nodes to be ready to accept RPC requests before proceeding
for (idx, client) in node_clients.iter().enumerate() {
let mut retry_count = 0;
const MAX_RETRIES: usize = 5;
let mut last_error = None;
while retry_count < MAX_RETRIES {
match EthApiClient::<TransactionRequest, Transaction, RpcBlock, Receipt, Header>::block_by_number(
&client.rpc,
BlockNumberOrTag::Latest,
false,
)
.await
{
Ok(_) => {
debug!("Node {idx} RPC endpoint is ready");
break;
}
Err(e) => {
last_error = Some(e);
retry_count += 1;
debug!(
"Node {idx} RPC endpoint not ready, retry {retry_count}/{MAX_RETRIES}"
);
sleep(Duration::from_millis(500)).await;
}
}
}
if retry_count == MAX_RETRIES {
return Err(eyre!("Failed to connect to node {idx} RPC endpoint after {MAX_RETRIES} retries: {:?}", last_error));
}
}
// Wait for all nodes to be ready
self.wait_for_nodes_ready(&node_clients).await?;
env.node_clients = node_clients;
env.initialize_node_states(self.network.node_count);
// Initialize per-node states for all nodes
env.initialize_node_states(node_count);
// Initialize each node's state with genesis block information
let genesis_block_info = {
let first_client = &env.node_clients[0];
let genesis_block = EthApiClient::<
TransactionRequest,
Transaction,
RpcBlock,
Receipt,
Header,
>::block_by_number(
&first_client.rpc, BlockNumberOrTag::Number(0), false
)
.await?
.ok_or_else(|| eyre!("Genesis block not found"))?;
crate::testsuite::BlockInfo {
hash: genesis_block.header.hash,
number: genesis_block.header.number,
timestamp: genesis_block.header.timestamp,
}
// Get initial block info (genesis or latest depending on use_latest_block)
let (initial_block_info, genesis_block_info) = if use_latest_block {
// For imported chain, get both latest and genesis
let latest =
self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Latest).await?;
let genesis =
self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
(latest, genesis)
} else {
// For fresh chain, both are genesis
let genesis =
self.get_block_info(&env.node_clients[0], BlockNumberOrTag::Number(0)).await?;
(genesis, genesis)
};
// Initialize all node states with the same genesis block
// Initialize all node states
for (node_idx, node_state) in env.node_states.iter_mut().enumerate() {
node_state.current_block_info = Some(genesis_block_info);
node_state.latest_header_time = genesis_block_info.timestamp;
node_state.current_block_info = Some(initial_block_info);
node_state.latest_header_time = initial_block_info.timestamp;
node_state.latest_fork_choice_state = ForkchoiceState {
head_block_hash: genesis_block_info.hash,
safe_block_hash: genesis_block_info.hash,
head_block_hash: initial_block_info.hash,
safe_block_hash: initial_block_info.hash,
finalized_block_hash: genesis_block_info.hash,
};
debug!(
"Node {} initialized with genesis block {} (hash: {})",
node_idx, genesis_block_info.number, genesis_block_info.hash
"Node {} initialized with block {} (hash: {})",
node_idx, initial_block_info.number, initial_block_info.hash
);
}
debug!(
"Environment initialized with {} nodes, all starting from genesis block {} (hash: {})",
node_count, genesis_block_info.number, genesis_block_info.hash
"Environment initialized with {} nodes, starting from block {} (hash: {})",
self.network.node_count, initial_block_info.number, initial_block_info.hash
);
// TODO: For each block in self.blocks, replay it on the node
Ok(())
}
/// Wait for all nodes to be ready to accept RPC requests
async fn wait_for_nodes_ready(
&self,
node_clients: &[crate::testsuite::NodeClient],
) -> Result<()> {
for (idx, client) in node_clients.iter().enumerate() {
let mut retry_count = 0;
const MAX_RETRIES: usize = 10;
while retry_count < MAX_RETRIES {
if client.is_ready().await {
debug!("Node {idx} RPC endpoint is ready");
break;
}
retry_count += 1;
debug!("Node {idx} RPC endpoint not ready, retry {retry_count}/{MAX_RETRIES}");
sleep(Duration::from_millis(500)).await;
}
if retry_count == MAX_RETRIES {
return Err(eyre!(
"Failed to connect to node {idx} RPC endpoint after {MAX_RETRIES} retries"
));
}
}
Ok(())
}
/// Get block info for a given block number or tag
async fn get_block_info(
&self,
client: &crate::testsuite::NodeClient,
block: BlockNumberOrTag,
) -> Result<crate::testsuite::BlockInfo> {
let block = client
.get_block_by_number(block)
.await?
.ok_or_else(|| eyre!("Block {:?} not found", block))?;
Ok(crate::testsuite::BlockInfo {
hash: block.header.hash,
number: block.header.number,
timestamp: block.header.timestamp,
})
}
}
/// Genesis block configuration