use crate::{network::NetworkTestContext, payload::PayloadTestContext, rpc::RpcTestContext}; use alloy_consensus::BlockHeader; use alloy_eips::BlockId; use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B256}; use alloy_rpc_types_engine::ForkchoiceState; use alloy_rpc_types_eth::BlockNumberOrTag; use eyre::Ok; use futures_util::Future; use jsonrpsee::http_client::HttpClient; use reth_chainspec::EthereumHardforks; use reth_network_api::test_utils::PeersHandleProvider; use reth_node_api::{ Block, BlockBody, BlockTy, EngineApiMessageVersion, FullNodeComponents, PayloadTypes, PrimitivesTy, }; use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes}; use reth_node_core::primitives::SignedTransaction; use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes}; use reth_provider::{ BlockReader, BlockReaderIdExt, CanonStateNotificationStream, CanonStateSubscriptions, StageCheckpointReader, }; use reth_rpc_builder::auth::AuthServerHandle; use reth_rpc_eth_api::helpers::{EthApiSpec, EthTransactions, TraceExt}; use reth_stages_types::StageId; use std::pin::Pin; use tokio_stream::StreamExt; use url::Url; /// An helper struct to handle node actions #[expect(missing_debug_implementations)] pub struct NodeTestContext where Node: FullNodeComponents, AddOns: RethRpcAddOns, { /// The core structure representing the full node. pub inner: FullNode, /// Context for testing payload-related features. pub payload: PayloadTestContext<::Payload>, /// Context for testing network functionalities. pub network: NetworkTestContext, /// Context for testing RPC features. pub rpc: RpcTestContext, /// Canonical state events. pub canonical_stream: CanonStateNotificationStream>, } impl NodeTestContext where Payload: PayloadTypes, Node: FullNodeComponents, Node::Types: NodeTypes, Node::Network: PeersHandleProvider, AddOns: RethRpcAddOns, { /// Creates a new test node pub async fn new( node: FullNode, attributes_generator: impl Fn(u64) -> Payload::PayloadBuilderAttributes + Send + Sync + 'static, ) -> eyre::Result { Ok(Self { inner: node.clone(), payload: PayloadTestContext::new( node.payload_builder_handle.clone(), attributes_generator, ) .await?, network: NetworkTestContext::new(node.network.clone()), rpc: RpcTestContext { inner: node.add_ons_handle.rpc_registry }, canonical_stream: node.provider.canonical_state_stream(), }) } /// Establish a connection to the node pub async fn connect(&mut self, node: &mut Self) { self.network.add_peer(node.network.record()).await; node.network.next_session_established().await; self.network.next_session_established().await; } /// Advances the chain `length` blocks. /// /// Returns the added chain as a Vec of block hashes. pub async fn advance( &mut self, length: u64, tx_generator: impl Fn(u64) -> Pin>>, ) -> eyre::Result> where AddOns::EthApi: EthApiSpec>> + EthTransactions + TraceExt, { let mut chain = Vec::with_capacity(length as usize); for i in 0..length { let raw_tx = tx_generator(i).await; let tx_hash = self.rpc.inject_tx(raw_tx).await?; let payload = self.advance_block().await?; let block_hash = payload.block().hash(); let block_number = payload.block().number(); self.assert_new_block(tx_hash, block_hash, block_number).await?; chain.push(payload); } Ok(chain) } /// Creates a new payload from given attributes generator /// expects a payload attribute event and waits until the payload is built. /// /// It triggers the resolve payload via engine api and expects the built payload event. pub async fn new_payload(&mut self) -> eyre::Result { // trigger new payload building draining the pool let eth_attr = self.payload.new_payload().await.unwrap(); // first event is the payload attributes self.payload.expect_attr_event(eth_attr.clone()).await?; // wait for the payload builder to have finished building self.payload.wait_for_built_payload(eth_attr.payload_id()).await; // ensure we're also receiving the built payload as event Ok(self.payload.expect_built_payload().await?) } /// Triggers payload building job and submits it to the engine. pub async fn build_and_submit_payload(&mut self) -> eyre::Result { let payload = self.new_payload().await?; self.submit_payload(payload.clone()).await?; Ok(payload) } /// Advances the node forward one block pub async fn advance_block(&mut self) -> eyre::Result { let payload = self.build_and_submit_payload().await?; // trigger forkchoice update via engine api to commit the block to the blockchain self.update_forkchoice(payload.block().hash(), payload.block().hash()).await?; Ok(payload) } /// Waits for block to be available on node. pub async fn wait_block( &self, number: BlockNumber, expected_block_hash: BlockHash, wait_finish_checkpoint: bool, ) -> eyre::Result<()> { let mut check = !wait_finish_checkpoint; loop { tokio::time::sleep(std::time::Duration::from_millis(20)).await; if !check && wait_finish_checkpoint { if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Finish)? { if checkpoint.block_number >= number { check = true } } } if check { if let Some(latest_block) = self.inner.provider.block_by_number(number)? { assert_eq!(latest_block.header().hash_slow(), expected_block_hash); break } assert!( !wait_finish_checkpoint, "Finish checkpoint matches, but could not fetch block." ); } } Ok(()) } /// Waits for the node to unwind to the given block number pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> { loop { tokio::time::sleep(std::time::Duration::from_millis(10)).await; if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? { if checkpoint.block_number == number { break } } } Ok(()) } /// Asserts that a new block has been added to the blockchain /// and the tx has been included in the block. /// /// Does NOT work for pipeline since there's no stream notification! pub async fn assert_new_block( &mut self, tip_tx_hash: B256, block_hash: B256, block_number: BlockNumber, ) -> eyre::Result<()> { // get head block from notifications stream and verify the tx has been pushed to the // pool is actually present in the canonical block let head = self.canonical_stream.next().await.unwrap(); let tx = head.tip().body().transactions().first(); assert_eq!(tx.unwrap().tx_hash().as_slice(), tip_tx_hash.as_slice()); loop { // wait for the block to commit tokio::time::sleep(std::time::Duration::from_millis(20)).await; if let Some(latest_block) = self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)? { if latest_block.header().number() == block_number { // make sure the block hash we submitted via FCU engine api is the new latest // block using an RPC call assert_eq!(latest_block.header().hash_slow(), block_hash); break } } } Ok(()) } /// Gets block hash by number. pub fn block_hash(&self, number: u64) -> BlockHash { self.inner .provider .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(number)) .unwrap() .unwrap() .hash() } /// Sends FCU and waits for the node to sync to the given block. pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> { let start = std::time::Instant::now(); while self .inner .provider .sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))? .is_none_or(|h| h.hash() != block) { tokio::time::sleep(std::time::Duration::from_millis(100)).await; self.update_forkchoice(block, block).await?; assert!(start.elapsed() <= std::time::Duration::from_secs(40), "timed out"); } // Hack to make sure that all components have time to process canonical state update. // Otherwise, this might result in e.g "nonce too low" errors when advancing chain further, // making tests flaky. tokio::time::sleep(std::time::Duration::from_millis(1000)).await; Ok(()) } /// Sends a forkchoice update message to the engine. pub async fn update_forkchoice(&self, current_head: B256, new_head: B256) -> eyre::Result<()> { self.inner .add_ons_handle .beacon_engine_handle .fork_choice_updated( ForkchoiceState { head_block_hash: new_head, safe_block_hash: current_head, finalized_block_hash: current_head, }, None, EngineApiMessageVersion::default(), ) .await?; Ok(()) } /// Sends forkchoice update to the engine api with a zero finalized hash pub async fn update_optimistic_forkchoice(&self, hash: B256) -> eyre::Result<()> { self.update_forkchoice(B256::ZERO, hash).await } /// Submits a payload to the engine. pub async fn submit_payload(&self, payload: Payload::BuiltPayload) -> eyre::Result { let block_hash = payload.block().hash(); self.inner .add_ons_handle .beacon_engine_handle .new_payload(Payload::block_to_payload(payload.block().clone())) .await?; Ok(block_hash) } /// Returns the RPC URL. pub fn rpc_url(&self) -> Url { let addr = self.inner.rpc_server_handle().http_local_addr().unwrap(); format!("http://{addr}").parse().unwrap() } /// Returns an RPC client. pub fn rpc_client(&self) -> Option { self.inner.rpc_server_handle().http_client() } /// Returns an Engine API client. pub fn auth_server_handle(&self) -> AuthServerHandle { self.inner.auth_server_handle().clone() } }