use crate::{ engine_api::EngineApiTestContext, network::NetworkTestContext, payload::PayloadTestContext, rpc::RpcTestContext, traits::PayloadEnvelopeExt, }; use alloy_consensus::BlockHeader; use alloy_eips::BlockId; use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B256}; use alloy_rpc_types_engine::PayloadStatusEnum; use alloy_rpc_types_eth::BlockNumberOrTag; use eyre::Ok; use futures_util::Future; use reth_chainspec::EthereumHardforks; use reth_network_api::test_utils::PeersHandleProvider; use reth_node_api::{Block, BlockBody, BlockTy, EngineTypes, FullNodeComponents, PrimitivesTy}; use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes, NodeTypesWithEngine}; use reth_node_core::primitives::SignedTransaction; use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes}; use reth_provider::{ BlockReader, BlockReaderIdExt, CanonStateSubscriptions, StageCheckpointReader, }; use reth_rpc_eth_api::helpers::{EthApiSpec, EthTransactions, TraceExt}; use reth_stages_types::StageId; use std::{marker::PhantomData, pin::Pin}; use tokio_stream::StreamExt; use url::Url; /// An helper struct to handle node actions #[expect(missing_debug_implementations, clippy::complexity)] pub struct NodeTestContext where Node: FullNodeComponents, AddOns: RethRpcAddOns, { /// The core structure representing the full node. pub inner: FullNode, /// Context for testing payload-related features. pub payload: PayloadTestContext<::Engine>, /// Context for testing network functionalities. pub network: NetworkTestContext, /// Context for testing the Engine API. pub engine_api: EngineApiTestContext< ::Engine, ::ChainSpec, PrimitivesTy, >, /// Context for testing RPC features. pub rpc: RpcTestContext, } impl NodeTestContext where Engine: EngineTypes, Node: FullNodeComponents, Node::Types: NodeTypesWithEngine, Node::Network: PeersHandleProvider, AddOns: RethRpcAddOns, { /// Creates a new test node pub async fn new( node: FullNode, attributes_generator: impl Fn(u64) -> Engine::PayloadBuilderAttributes + 'static, ) -> eyre::Result { Ok(Self { inner: node.clone(), payload: PayloadTestContext::new( node.payload_builder_handle.clone(), attributes_generator, ) .await?, network: NetworkTestContext::new(node.network.clone()), engine_api: EngineApiTestContext { chain_spec: node.chain_spec(), engine_api_client: node.auth_server_handle().http_client(), canonical_stream: node.provider.canonical_state_stream(), _marker: PhantomData::, }, rpc: RpcTestContext { inner: node.add_ons_handle.rpc_registry }, }) } /// Establish a connection to the node pub async fn connect(&mut self, node: &mut Self) { self.network.add_peer(node.network.record()).await; node.network.next_session_established().await; self.network.next_session_established().await; } /// Advances the chain `length` blocks. /// /// Returns the added chain as a Vec of block hashes. pub async fn advance( &mut self, length: u64, tx_generator: impl Fn(u64) -> Pin>>, ) -> eyre::Result> where Engine::ExecutionPayloadEnvelopeV3: From + PayloadEnvelopeExt, Engine::ExecutionPayloadEnvelopeV4: From + PayloadEnvelopeExt, AddOns::EthApi: EthApiSpec>> + EthTransactions + TraceExt, { let mut chain = Vec::with_capacity(length as usize); for i in 0..length { let raw_tx = tx_generator(i).await; let tx_hash = self.rpc.inject_tx(raw_tx).await?; let (payload, eth_attr) = self.advance_block().await?; let block_hash = payload.block().hash(); let block_number = payload.block().number(); self.assert_new_block(tx_hash, block_hash, block_number).await?; chain.push((payload, eth_attr)); } Ok(chain) } /// Creates a new payload from given attributes generator /// expects a payload attribute event and waits until the payload is built. /// /// It triggers the resolve payload via engine api and expects the built payload event. pub async fn new_payload( &mut self, ) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)> where ::ExecutionPayloadEnvelopeV3: From + PayloadEnvelopeExt, { // trigger new payload building draining the pool let eth_attr = self.payload.new_payload().await.unwrap(); // first event is the payload attributes self.payload.expect_attr_event(eth_attr.clone()).await?; // wait for the payload builder to have finished building self.payload.wait_for_built_payload(eth_attr.payload_id()).await; // trigger resolve payload via engine api self.engine_api.get_payload_v3_value(eth_attr.payload_id()).await?; // ensure we're also receiving the built payload as event Ok((self.payload.expect_built_payload().await?, eth_attr)) } /// Triggers payload building job and submits it to the engine. pub async fn build_and_submit_payload( &mut self, ) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)> where ::ExecutionPayloadEnvelopeV3: From + PayloadEnvelopeExt, ::ExecutionPayloadEnvelopeV4: From + PayloadEnvelopeExt, { let (payload, eth_attr) = self.new_payload().await?; self.engine_api .submit_payload(payload.clone(), eth_attr.clone(), PayloadStatusEnum::Valid) .await?; Ok((payload, eth_attr)) } /// Advances the node forward one block pub async fn advance_block( &mut self, ) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)> where ::ExecutionPayloadEnvelopeV3: From + PayloadEnvelopeExt, ::ExecutionPayloadEnvelopeV4: From + PayloadEnvelopeExt, { let (payload, eth_attr) = self.build_and_submit_payload().await?; // trigger forkchoice update via engine api to commit the block to the blockchain self.engine_api.update_forkchoice(payload.block().hash(), payload.block().hash()).await?; Ok((payload, eth_attr)) } /// Waits for block to be available on node. pub async fn wait_block( &self, number: BlockNumber, expected_block_hash: BlockHash, wait_finish_checkpoint: bool, ) -> eyre::Result<()> { let mut check = !wait_finish_checkpoint; loop { tokio::time::sleep(std::time::Duration::from_millis(20)).await; if !check && wait_finish_checkpoint { if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Finish)? { if checkpoint.block_number >= number { check = true } } } if check { if let Some(latest_block) = self.inner.provider.block_by_number(number)? { assert_eq!(latest_block.header().hash_slow(), expected_block_hash); break } assert!( !wait_finish_checkpoint, "Finish checkpoint matches, but could not fetch block." ); } } Ok(()) } /// Waits for the node to unwind to the given block number pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> { loop { tokio::time::sleep(std::time::Duration::from_millis(10)).await; if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? { if checkpoint.block_number == number { break } } } Ok(()) } /// Asserts that a new block has been added to the blockchain /// and the tx has been included in the block. /// /// Does NOT work for pipeline since there's no stream notification! pub async fn assert_new_block( &mut self, tip_tx_hash: B256, block_hash: B256, block_number: BlockNumber, ) -> eyre::Result<()> { // get head block from notifications stream and verify the tx has been pushed to the // pool is actually present in the canonical block let head = self.engine_api.canonical_stream.next().await.unwrap(); let tx = head.tip().body().transactions().first(); assert_eq!(tx.unwrap().tx_hash().as_slice(), tip_tx_hash.as_slice()); loop { // wait for the block to commit tokio::time::sleep(std::time::Duration::from_millis(20)).await; if let Some(latest_block) = self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)? { if latest_block.header().number() == block_number { // make sure the block hash we submitted via FCU engine api is the new latest // block using an RPC call assert_eq!(latest_block.header().hash_slow(), block_hash); break } } } Ok(()) } /// Gets block hash by number. pub fn block_hash(&self, number: u64) -> BlockHash { self.inner .provider .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(number)) .unwrap() .unwrap() .hash() } /// Sends FCU and waits for the node to sync to the given block. pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> { let start = std::time::Instant::now(); while self .inner .provider .sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))? .is_none_or(|h| h.hash() != block) { tokio::time::sleep(std::time::Duration::from_millis(100)).await; self.engine_api.update_forkchoice(block, block).await?; assert!(start.elapsed() <= std::time::Duration::from_secs(40), "timed out"); } // Hack to make sure that all components have time to process canonical state update. // Otherwise, this might result in e.g "nonce too low" errors when advancing chain further, // making tests flaky. tokio::time::sleep(std::time::Duration::from_millis(1000)).await; Ok(()) } /// Returns the RPC URL. pub fn rpc_url(&self) -> Url { let addr = self.inner.rpc_server_handle().http_local_addr().unwrap(); format!("http://{}", addr).parse().unwrap() } }