From 516acefa3491b1b5d0b6ae6b7df7384649d5608d Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 19 Jul 2024 07:59:30 +0100 Subject: [PATCH] chore(exex): organize backfill modules (#9631) --- crates/exex/exex/Cargo.toml | 26 +- crates/exex/exex/src/backfill/factory.rs | 65 +++ crates/exex/exex/src/backfill/job.rs | 267 +++++++++- crates/exex/exex/src/backfill/mod.rs | 515 +------------------- crates/exex/exex/src/backfill/stream.rs | 74 ++- crates/exex/exex/src/backfill/test_utils.rs | 162 ++++++ 6 files changed, 570 insertions(+), 539 deletions(-) create mode 100644 crates/exex/exex/src/backfill/factory.rs create mode 100644 crates/exex/exex/src/backfill/test_utils.rs diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 8db8c3c942..48e658c408 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -14,40 +14,40 @@ workspace = true [dependencies] ## reth reth-config.workspace = true +reth-evm.workspace = true reth-exex-types.workspace = true reth-metrics.workspace = true +reth-network.workspace = true reth-node-api.workspace = true reth-node-core.workspace = true -reth-primitives.workspace = true -reth-primitives-traits.workspace = true -reth-provider.workspace = true -reth-tasks.workspace = true -reth-tracing.workspace = true -reth-network.workspace = true reth-payload-builder.workspace = true -reth-evm.workspace = true +reth-primitives-traits.workspace = true +reth-primitives.workspace = true +reth-provider.workspace = true reth-prune-types.workspace = true reth-revm.workspace = true reth-stages-api.workspace = true +reth-tasks.workspace = true +reth-tracing.workspace = true ## async -tokio.workspace = true -tokio-util.workspace = true futures.workspace = true +tokio-util.workspace = true +tokio.workspace = true ## misc eyre.workspace = true metrics.workspace = true [dev-dependencies] -reth-chainspec.workspace = true -reth-evm-ethereum.workspace = true -reth-testing-utils.workspace = true reth-blockchain-tree.workspace = true +reth-chainspec.workspace = true +reth-db-api.workspace = true reth-db-common.workspace = true +reth-evm-ethereum.workspace = true reth-node-api.workspace = true reth-provider = { workspace = true, features = ["test-utils"] } -reth-db-api.workspace = true +reth-testing-utils.workspace = true secp256k1.workspace = true diff --git a/crates/exex/exex/src/backfill/factory.rs b/crates/exex/exex/src/backfill/factory.rs new file mode 100644 index 0000000000..3190fcaae0 --- /dev/null +++ b/crates/exex/exex/src/backfill/factory.rs @@ -0,0 +1,65 @@ +use crate::BackfillJob; +use std::ops::RangeInclusive; + +use reth_node_api::FullNodeComponents; +use reth_primitives::BlockNumber; +use reth_prune_types::PruneModes; +use reth_stages_api::ExecutionStageThresholds; + +/// Factory for creating new backfill jobs. +#[derive(Debug, Clone)] +pub struct BackfillJobFactory { + executor: E, + provider: P, + prune_modes: PruneModes, + thresholds: ExecutionStageThresholds, +} + +impl BackfillJobFactory { + /// Creates a new [`BackfillJobFactory`]. 
+ pub fn new(executor: E, provider: P) -> Self { + Self { + executor, + provider, + prune_modes: PruneModes::none(), + thresholds: ExecutionStageThresholds::default(), + } + } + + /// Sets the prune modes + pub fn with_prune_modes(mut self, prune_modes: PruneModes) -> Self { + self.prune_modes = prune_modes; + self + } + + /// Sets the thresholds + pub const fn with_thresholds(mut self, thresholds: ExecutionStageThresholds) -> Self { + self.thresholds = thresholds; + self + } +} + +impl BackfillJobFactory { + /// Creates a new backfill job for the given range. + pub fn backfill(&self, range: RangeInclusive) -> BackfillJob { + BackfillJob { + executor: self.executor.clone(), + provider: self.provider.clone(), + prune_modes: self.prune_modes.clone(), + range, + thresholds: self.thresholds.clone(), + } + } +} + +impl BackfillJobFactory<(), ()> { + /// Creates a new [`BackfillJobFactory`] from [`FullNodeComponents`]. + pub fn new_from_components( + components: Node, + ) -> BackfillJobFactory { + BackfillJobFactory::<_, _>::new( + components.block_executor().clone(), + components.provider().clone(), + ) + } +} diff --git a/crates/exex/exex/src/backfill/job.rs b/crates/exex/exex/src/backfill/job.rs index fa8695740b..64bb3a2642 100644 --- a/crates/exex/exex/src/backfill/job.rs +++ b/crates/exex/exex/src/backfill/job.rs @@ -1,15 +1,156 @@ -use reth_evm::execute::{ - BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor, +use crate::BackFillJobStream; +use std::{ + ops::RangeInclusive, + time::{Duration, Instant}, }; -use reth_primitives::{BlockNumber, BlockWithSenders, Receipt}; -use reth_provider::{ - BlockReader, HeaderProvider, ProviderError, StateProviderFactory, TransactionVariant, -}; -use reth_revm::database::StateProviderDatabase; -use reth_tracing::tracing::trace; -use std::ops::RangeInclusive; -use crate::BackfillJob; +use reth_evm::execute::{ + BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor, +}; +use reth_primitives::{Block, BlockNumber, BlockWithSenders, Receipt}; +use reth_primitives_traits::format_gas_throughput; +use reth_provider::{ + BlockReader, Chain, HeaderProvider, ProviderError, StateProviderFactory, TransactionVariant, +}; +use reth_prune_types::PruneModes; +use reth_revm::database::StateProviderDatabase; +use reth_stages_api::ExecutionStageThresholds; +use reth_tracing::tracing::{debug, trace}; + +/// Backfill job started for a specific range. 
+/// +/// It implements [`Iterator`] that executes blocks in batches according to the provided thresholds +/// and yields [`Chain`] +#[derive(Debug)] +pub struct BackfillJob { + pub(crate) executor: E, + pub(crate) provider: P, + pub(crate) prune_modes: PruneModes, + pub(crate) thresholds: ExecutionStageThresholds, + pub(crate) range: RangeInclusive, +} + +impl Iterator for BackfillJob +where + E: BlockExecutorProvider, + P: HeaderProvider + BlockReader + StateProviderFactory, +{ + type Item = Result; + + fn next(&mut self) -> Option { + if self.range.is_empty() { + return None + } + + Some(self.execute_range()) + } +} + +impl BackfillJob +where + E: BlockExecutorProvider, + P: BlockReader + HeaderProvider + StateProviderFactory, +{ + fn execute_range(&mut self) -> Result { + let mut executor = self.executor.batch_executor(StateProviderDatabase::new( + self.provider.history_by_block_number(self.range.start().saturating_sub(1))?, + )); + executor.set_prune_modes(self.prune_modes.clone()); + + let mut fetch_block_duration = Duration::default(); + let mut execution_duration = Duration::default(); + let mut cumulative_gas = 0; + let batch_start = Instant::now(); + + let mut blocks = Vec::new(); + for block_number in self.range.clone() { + // Fetch the block + let fetch_block_start = Instant::now(); + + let td = self + .provider + .header_td_by_number(block_number)? + .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?; + + // we need the block's transactions along with their hashes + let block = self + .provider + .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)? + .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?; + + fetch_block_duration += fetch_block_start.elapsed(); + + cumulative_gas += block.gas_used; + + // Configure the executor to use the current state. + trace!(target: "exex::backfill", number = block_number, txs = block.body.len(), "Executing block"); + + // Execute the block + let execute_start = Instant::now(); + + // Unseal the block for execution + let (block, senders) = block.into_components(); + let (unsealed_header, hash) = block.header.split(); + let block = Block { + header: unsealed_header, + body: block.body, + ommers: block.ommers, + withdrawals: block.withdrawals, + requests: block.requests, + } + .with_senders_unchecked(senders); + + executor.execute_and_verify_one((&block, td).into())?; + execution_duration += execute_start.elapsed(); + + // TODO(alexey): report gas metrics using `block.header.gas_used` + + // Seal the block back and save it + blocks.push(block.seal(hash)); + + // Check if we should commit now + let bundle_size_hint = executor.size_hint().unwrap_or_default() as u64; + if self.thresholds.is_end_of_batch( + block_number - *self.range.start(), + bundle_size_hint, + cumulative_gas, + batch_start.elapsed(), + ) { + break + } + } + + let last_block_number = blocks.last().expect("blocks should not be empty").number; + debug!( + target: "exex::backfill", + range = ?*self.range.start()..=last_block_number, + block_fetch = ?fetch_block_duration, + execution = ?execution_duration, + throughput = format_gas_throughput(cumulative_gas, execution_duration), + "Finished executing block range" + ); + self.range = last_block_number + 1..=*self.range.end(); + + let chain = Chain::new(blocks, executor.finalize(), None); + Ok(chain) + } +} + +impl BackfillJob { + /// Converts the backfill job into a single block backfill job. 
+ pub fn into_single_blocks(self) -> SingleBlockBackfillJob { + self.into() + } + + /// Converts the backfill job into a backfill job stream. + pub fn into_stream(self) -> BackFillJobStream + where + E: BlockExecutorProvider + Clone + 'static, + P: HeaderProvider + BlockReader + StateProviderFactory + Clone + 'static, + { + BackFillJobStream::new(self.into_single_blocks()) + } +} /// Single block Backfill job started for a specific range. /// @@ -72,3 +213,109 @@ impl From> for SingleBlockBackfillJob { Self { executor: value.executor, provider: value.provider, range: value.range } } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::{ + backfill::test_utils::{blocks_and_execution_outputs, chain_spec, to_execution_outcome}, + BackfillJobFactory, + }; + use reth_blockchain_tree::noop::NoopBlockchainTree; + use reth_db_common::init::init_genesis; + use reth_evm_ethereum::execute::EthExecutorProvider; + use reth_primitives::public_key_to_address; + use reth_provider::{ + providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec, + }; + use reth_testing_utils::generators; + use secp256k1::Keypair; + + #[test] + fn test_backfill() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + // Create a key pair for the sender + let key_pair = Keypair::new_global(&mut generators::rng()); + let address = public_key_to_address(key_pair.public_key()); + + let chain_spec = chain_spec(address); + + let executor = EthExecutorProvider::ethereum(chain_spec.clone()); + let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); + init_genesis(provider_factory.clone())?; + let blockchain_db = BlockchainProvider::new( + provider_factory.clone(), + Arc::new(NoopBlockchainTree::default()), + )?; + + let blocks_and_execution_outputs = + blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?; + let (block, block_execution_output) = blocks_and_execution_outputs.first().unwrap(); + let execution_outcome = to_execution_outcome(block.number, block_execution_output); + + // Backfill the first block + let factory = BackfillJobFactory::new(executor, blockchain_db); + let job = factory.backfill(1..=1); + let chains = job.collect::, _>>()?; + + // Assert that the backfill job produced the same chain as we got before when we were + // executing only the first block + assert_eq!(chains.len(), 1); + let mut chain = chains.into_iter().next().unwrap(); + chain.execution_outcome_mut().bundle.reverts.sort(); + assert_eq!(chain.blocks(), &[(1, block.clone())].into()); + assert_eq!(chain.execution_outcome(), &execution_outcome); + + Ok(()) + } + + #[test] + fn test_single_block_backfill() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + // Create a key pair for the sender + let key_pair = Keypair::new_global(&mut generators::rng()); + let address = public_key_to_address(key_pair.public_key()); + + let chain_spec = chain_spec(address); + + let executor = EthExecutorProvider::ethereum(chain_spec.clone()); + let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); + init_genesis(provider_factory.clone())?; + let blockchain_db = BlockchainProvider::new( + provider_factory.clone(), + Arc::new(NoopBlockchainTree::default()), + )?; + + let blocks_and_execution_outcomes = + blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?; + + // Backfill the first block + let factory = BackfillJobFactory::new(executor, blockchain_db); + let job = factory.backfill(1..=1); + let single_job = 
job.into_single_blocks(); + let block_execution_it = single_job.into_iter(); + + // Assert that the backfill job only produces a single block + let blocks_and_outcomes = block_execution_it.collect::>(); + assert_eq!(blocks_and_outcomes.len(), 1); + + // Assert that the backfill job single block iterator produces the expected output for each + // block + for (i, res) in blocks_and_outcomes.into_iter().enumerate() { + let (block, mut execution_output) = res?; + execution_output.state.reverts.sort(); + + let sealed_block_with_senders = blocks_and_execution_outcomes[i].0.clone(); + let expected_block = sealed_block_with_senders.unseal(); + let expected_output = &blocks_and_execution_outcomes[i].1; + + assert_eq!(block, expected_block); + assert_eq!(&execution_output, expected_output); + } + + Ok(()) + } +} diff --git a/crates/exex/exex/src/backfill/mod.rs b/crates/exex/exex/src/backfill/mod.rs index 9eca4665aa..51f126223f 100644 --- a/crates/exex/exex/src/backfill/mod.rs +++ b/crates/exex/exex/src/backfill/mod.rs @@ -1,514 +1,9 @@ -use job::SingleBlockBackfillJob; -use reth_evm::execute::{BatchExecutor, BlockExecutionError, BlockExecutorProvider}; -use reth_node_api::FullNodeComponents; -use reth_primitives::{Block, BlockNumber}; -use reth_primitives_traits::format_gas_throughput; -use reth_provider::{ - BlockReader, Chain, HeaderProvider, ProviderError, StateProviderFactory, TransactionVariant, -}; -use reth_prune_types::PruneModes; -use reth_revm::database::StateProviderDatabase; -use reth_stages_api::ExecutionStageThresholds; -use reth_tracing::tracing::{debug, trace}; -use std::{ - ops::RangeInclusive, - time::{Duration, Instant}, -}; -use stream::BackFillJobStream; - +mod factory; mod job; mod stream; - -/// Factory for creating new backfill jobs. -#[derive(Debug, Clone)] -pub struct BackfillJobFactory { - executor: E, - provider: P, - prune_modes: PruneModes, - thresholds: ExecutionStageThresholds, -} - -impl BackfillJobFactory { - /// Creates a new [`BackfillJobFactory`]. - pub fn new(executor: E, provider: P) -> Self { - Self { - executor, - provider, - prune_modes: PruneModes::none(), - thresholds: ExecutionStageThresholds::default(), - } - } - - /// Sets the prune modes - pub fn with_prune_modes(mut self, prune_modes: PruneModes) -> Self { - self.prune_modes = prune_modes; - self - } - - /// Sets the thresholds - pub const fn with_thresholds(mut self, thresholds: ExecutionStageThresholds) -> Self { - self.thresholds = thresholds; - self - } -} - -impl BackfillJobFactory { - /// Creates a new backfill job for the given range. - pub fn backfill(&self, range: RangeInclusive) -> BackfillJob { - BackfillJob { - executor: self.executor.clone(), - provider: self.provider.clone(), - prune_modes: self.prune_modes.clone(), - range, - thresholds: self.thresholds.clone(), - } - } -} - -impl BackfillJobFactory<(), ()> { - /// Creates a new [`BackfillJobFactory`] from [`FullNodeComponents`]. - pub fn new_from_components( - components: Node, - ) -> BackfillJobFactory { - BackfillJobFactory::<_, _>::new( - components.block_executor().clone(), - components.provider().clone(), - ) - } -} - -/// Backfill job started for a specific range. 
-/// -/// It implements [`Iterator`] that executes blocks in batches according to the provided thresholds -/// and yields [`Chain`] -#[derive(Debug)] -pub struct BackfillJob { - executor: E, - provider: P, - prune_modes: PruneModes, - thresholds: ExecutionStageThresholds, - range: RangeInclusive, -} - -impl Iterator for BackfillJob -where - E: BlockExecutorProvider, - P: HeaderProvider + BlockReader + StateProviderFactory, -{ - type Item = Result; - - fn next(&mut self) -> Option { - if self.range.is_empty() { - return None - } - - Some(self.execute_range()) - } -} - -impl BackfillJob -where - E: BlockExecutorProvider, - P: BlockReader + HeaderProvider + StateProviderFactory, -{ - fn execute_range(&mut self) -> Result { - let mut executor = self.executor.batch_executor(StateProviderDatabase::new( - self.provider.history_by_block_number(self.range.start().saturating_sub(1))?, - )); - executor.set_prune_modes(self.prune_modes.clone()); - - let mut fetch_block_duration = Duration::default(); - let mut execution_duration = Duration::default(); - let mut cumulative_gas = 0; - let batch_start = Instant::now(); - - let mut blocks = Vec::new(); - for block_number in self.range.clone() { - // Fetch the block - let fetch_block_start = Instant::now(); - - let td = self - .provider - .header_td_by_number(block_number)? - .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?; - - // we need the block's transactions along with their hashes - let block = self - .provider - .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)? - .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?; - - fetch_block_duration += fetch_block_start.elapsed(); - - cumulative_gas += block.gas_used; - - // Configure the executor to use the current state. - trace!(target: "exex::backfill", number = block_number, txs = block.body.len(), "Executing block"); - - // Execute the block - let execute_start = Instant::now(); - - // Unseal the block for execution - let (block, senders) = block.into_components(); - let (unsealed_header, hash) = block.header.split(); - let block = Block { - header: unsealed_header, - body: block.body, - ommers: block.ommers, - withdrawals: block.withdrawals, - requests: block.requests, - } - .with_senders_unchecked(senders); - - executor.execute_and_verify_one((&block, td).into())?; - execution_duration += execute_start.elapsed(); - - // TODO(alexey): report gas metrics using `block.header.gas_used` - - // Seal the block back and save it - blocks.push(block.seal(hash)); - - // Check if we should commit now - let bundle_size_hint = executor.size_hint().unwrap_or_default() as u64; - if self.thresholds.is_end_of_batch( - block_number - *self.range.start(), - bundle_size_hint, - cumulative_gas, - batch_start.elapsed(), - ) { - break - } - } - - let last_block_number = blocks.last().expect("blocks should not be empty").number; - debug!( - target: "exex::backfill", - range = ?*self.range.start()..=last_block_number, - block_fetch = ?fetch_block_duration, - execution = ?execution_duration, - throughput = format_gas_throughput(cumulative_gas, execution_duration), - "Finished executing block range" - ); - self.range = last_block_number + 1..=*self.range.end(); - - let chain = Chain::new(blocks, executor.finalize(), None); - Ok(chain) - } -} - -impl BackfillJob { - /// Converts the backfill job into a single block backfill job. 
- pub fn into_single_blocks(self) -> SingleBlockBackfillJob { - self.into() - } - - /// Converts the backfill job into a backfill job stream. - pub fn into_stream(self) -> BackFillJobStream - where - E: BlockExecutorProvider + Clone + 'static, - P: HeaderProvider + BlockReader + StateProviderFactory + Clone + 'static, - { - BackFillJobStream::new(self.into_single_blocks()) - } -} - #[cfg(test)] -mod tests { - use crate::BackfillJobFactory; - use eyre::OptionExt; - use futures::StreamExt; - use reth_blockchain_tree::noop::NoopBlockchainTree; - use reth_chainspec::{ChainSpec, ChainSpecBuilder, EthereumHardfork, MAINNET}; - use reth_db_common::init::init_genesis; - use reth_evm::execute::{ - BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor, - }; - use reth_evm_ethereum::execute::EthExecutorProvider; - use reth_primitives::{ - b256, constants::ETH_TO_WEI, public_key_to_address, Address, Block, BlockWithSenders, - Genesis, GenesisAccount, Header, Receipt, Requests, SealedBlockWithSenders, Transaction, - TxEip2930, TxKind, U256, - }; - use reth_provider::{ - providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec, - BlockWriter, ExecutionOutcome, LatestStateProviderRef, ProviderFactory, - }; - use reth_revm::database::StateProviderDatabase; - use reth_testing_utils::generators::{self, sign_tx_with_key_pair}; - use secp256k1::Keypair; - use std::sync::Arc; +mod test_utils; - fn to_execution_outcome( - block_number: u64, - block_execution_output: &BlockExecutionOutput, - ) -> ExecutionOutcome { - ExecutionOutcome { - bundle: block_execution_output.state.clone(), - receipts: block_execution_output.receipts.clone().into(), - first_block: block_number, - requests: vec![Requests(block_execution_output.requests.clone())], - } - } - - fn chain_spec(address: Address) -> Arc { - // Create a chain spec with a genesis state that contains the - // provided sender - Arc::new( - ChainSpecBuilder::default() - .chain(MAINNET.chain) - .genesis(Genesis { - alloc: [( - address, - GenesisAccount { balance: U256::from(ETH_TO_WEI), ..Default::default() }, - )] - .into(), - ..MAINNET.genesis.clone() - }) - .paris_activated() - .build(), - ) - } - - fn execute_block_and_commit_to_database( - provider_factory: &ProviderFactory, - chain_spec: Arc, - block: &BlockWithSenders, - ) -> eyre::Result> - where - DB: reth_db_api::database::Database, - { - let provider = provider_factory.provider()?; - - // Execute the block to produce a block execution output - let mut block_execution_output = EthExecutorProvider::ethereum(chain_spec) - .executor(StateProviderDatabase::new(LatestStateProviderRef::new( - provider.tx_ref(), - provider.static_file_provider().clone(), - ))) - .execute(BlockExecutionInput { block, total_difficulty: U256::ZERO })?; - block_execution_output.state.reverts.sort(); - - // Convert the block execution output to an execution outcome for committing to the database - let execution_outcome = to_execution_outcome(block.number, &block_execution_output); - - // Commit the block's execution outcome to the database - let provider_rw = provider_factory.provider_rw()?; - let block = block.clone().seal_slow(); - provider_rw.append_blocks_with_state( - vec![block], - execution_outcome, - Default::default(), - Default::default(), - )?; - provider_rw.commit()?; - - Ok(block_execution_output) - } - - fn blocks_and_execution_outputs( - provider_factory: ProviderFactory, - chain_spec: Arc, - key_pair: Keypair, - ) -> eyre::Result)>> - where - DB: 
reth_db_api::database::Database, - { - // First block has a transaction that transfers some ETH to zero address - let block1 = Block { - header: Header { - parent_hash: chain_spec.genesis_hash(), - receipts_root: b256!( - "d3a6acf9a244d78b33831df95d472c4128ea85bf079a1d41e32ed0b7d2244c9e" - ), - difficulty: chain_spec.fork(EthereumHardfork::Paris).ttd().expect("Paris TTD"), - number: 1, - gas_limit: 21000, - gas_used: 21000, - ..Default::default() - }, - body: vec![sign_tx_with_key_pair( - key_pair, - Transaction::Eip2930(TxEip2930 { - chain_id: chain_spec.chain.id(), - nonce: 0, - gas_limit: 21000, - gas_price: 1_500_000_000, - to: TxKind::Call(Address::ZERO), - value: U256::from(0.1 * ETH_TO_WEI as f64), - ..Default::default() - }), - )], - ..Default::default() - } - .with_recovered_senders() - .ok_or_eyre("failed to recover senders")?; - - // Second block resends the same transaction with increased nonce - let block2 = Block { - header: Header { - parent_hash: block1.header.hash_slow(), - receipts_root: b256!( - "d3a6acf9a244d78b33831df95d472c4128ea85bf079a1d41e32ed0b7d2244c9e" - ), - difficulty: chain_spec.fork(EthereumHardfork::Paris).ttd().expect("Paris TTD"), - number: 2, - gas_limit: 21000, - gas_used: 21000, - ..Default::default() - }, - body: vec![sign_tx_with_key_pair( - key_pair, - Transaction::Eip2930(TxEip2930 { - chain_id: chain_spec.chain.id(), - nonce: 1, - gas_limit: 21000, - gas_price: 1_500_000_000, - to: TxKind::Call(Address::ZERO), - value: U256::from(0.1 * ETH_TO_WEI as f64), - ..Default::default() - }), - )], - ..Default::default() - } - .with_recovered_senders() - .ok_or_eyre("failed to recover senders")?; - - let block_output1 = - execute_block_and_commit_to_database(&provider_factory, chain_spec.clone(), &block1)?; - let block_output2 = - execute_block_and_commit_to_database(&provider_factory, chain_spec, &block2)?; - - let block1 = block1.seal_slow(); - let block2 = block2.seal_slow(); - - Ok(vec![(block1, block_output1), (block2, block_output2)]) - } - - #[test] - fn test_backfill() -> eyre::Result<()> { - reth_tracing::init_test_tracing(); - - // Create a key pair for the sender - let key_pair = Keypair::new_global(&mut generators::rng()); - let address = public_key_to_address(key_pair.public_key()); - - let chain_spec = chain_spec(address); - - let executor = EthExecutorProvider::ethereum(chain_spec.clone()); - let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); - init_genesis(provider_factory.clone())?; - let blockchain_db = BlockchainProvider::new( - provider_factory.clone(), - Arc::new(NoopBlockchainTree::default()), - )?; - - let blocks_and_execution_outputs = - blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?; - let (block, block_execution_output) = blocks_and_execution_outputs.first().unwrap(); - let execution_outcome = to_execution_outcome(block.number, block_execution_output); - - // Backfill the first block - let factory = BackfillJobFactory::new(executor, blockchain_db); - let job = factory.backfill(1..=1); - let chains = job.collect::, _>>()?; - - // Assert that the backfill job produced the same chain as we got before when we were - // executing only the first block - assert_eq!(chains.len(), 1); - let mut chain = chains.into_iter().next().unwrap(); - chain.execution_outcome_mut().bundle.reverts.sort(); - assert_eq!(chain.blocks(), &[(1, block.clone())].into()); - assert_eq!(chain.execution_outcome(), &execution_outcome); - - Ok(()) - } - - #[test] - fn test_single_block_backfill() -> 
eyre::Result<()> { - reth_tracing::init_test_tracing(); - - // Create a key pair for the sender - let key_pair = Keypair::new_global(&mut generators::rng()); - let address = public_key_to_address(key_pair.public_key()); - - let chain_spec = chain_spec(address); - - let executor = EthExecutorProvider::ethereum(chain_spec.clone()); - let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); - init_genesis(provider_factory.clone())?; - let blockchain_db = BlockchainProvider::new( - provider_factory.clone(), - Arc::new(NoopBlockchainTree::default()), - )?; - - let blocks_and_execution_outcomes = - blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?; - - // Backfill the first block - let factory = BackfillJobFactory::new(executor, blockchain_db); - let job = factory.backfill(1..=1); - let single_job = job.into_single_blocks(); - let block_execution_it = single_job.into_iter(); - - // Assert that the backfill job only produces a single block - let blocks_and_outcomes = block_execution_it.collect::>(); - assert_eq!(blocks_and_outcomes.len(), 1); - - // Assert that the backfill job single block iterator produces the expected output for each - // block - for (i, res) in blocks_and_outcomes.into_iter().enumerate() { - let (block, mut execution_output) = res?; - execution_output.state.reverts.sort(); - - let sealed_block_with_senders = blocks_and_execution_outcomes[i].0.clone(); - let expected_block = sealed_block_with_senders.unseal(); - let expected_output = &blocks_and_execution_outcomes[i].1; - - assert_eq!(block, expected_block); - assert_eq!(&execution_output, expected_output); - } - - Ok(()) - } - - #[tokio::test] - async fn test_async_backfill() -> eyre::Result<()> { - reth_tracing::init_test_tracing(); - - // Create a key pair for the sender - let key_pair = Keypair::new_global(&mut generators::rng()); - let address = public_key_to_address(key_pair.public_key()); - - let chain_spec = chain_spec(address); - - let executor = EthExecutorProvider::ethereum(chain_spec.clone()); - let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); - init_genesis(provider_factory.clone())?; - let blockchain_db = BlockchainProvider::new( - provider_factory.clone(), - Arc::new(NoopBlockchainTree::default()), - )?; - - // Create first 2 blocks - let blocks_and_execution_outcomes = - blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?; - - // Backfill the first block - let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone()); - let mut backfill_stream = factory.backfill(1..=1).into_stream(); - - // execute first block - let (block, mut execution_output) = backfill_stream.next().await.unwrap().unwrap(); - execution_output.state.reverts.sort(); - let sealed_block_with_senders = blocks_and_execution_outcomes[0].0.clone(); - let expected_block = sealed_block_with_senders.unseal(); - let expected_output = &blocks_and_execution_outcomes[0].1; - assert_eq!(block, expected_block); - assert_eq!(&execution_output, expected_output); - - // expect no more blocks - assert!(backfill_stream.next().await.is_none()); - - Ok(()) - } -} +pub use factory::BackfillJobFactory; +pub use job::{BackfillJob, SingleBlockBackfillJob}; +pub use stream::BackFillJobStream; diff --git a/crates/exex/exex/src/backfill/stream.rs b/crates/exex/exex/src/backfill/stream.rs index 25022e42ef..cd0141c6bb 100644 --- a/crates/exex/exex/src/backfill/stream.rs +++ b/crates/exex/exex/src/backfill/stream.rs @@ -1,4 +1,10 @@ -use 
super::job::SingleBlockBackfillJob; +use crate::SingleBlockBackfillJob; +use std::{ + ops::RangeInclusive, + pin::Pin, + task::{ready, Context, Poll}, +}; + use futures::{ stream::{FuturesOrdered, Stream}, StreamExt, @@ -6,11 +12,6 @@ use futures::{ use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider}; use reth_primitives::{BlockNumber, BlockWithSenders, Receipt}; use reth_provider::{BlockReader, HeaderProvider, StateProviderFactory}; -use std::{ - ops::RangeInclusive, - pin::Pin, - task::{ready, Context, Poll}, -}; use tokio::task::JoinHandle; type BackfillTasks = FuturesOrdered< @@ -97,3 +98,64 @@ where } } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::{ + backfill::test_utils::{blocks_and_execution_outputs, chain_spec}, + BackfillJobFactory, + }; + use futures::StreamExt; + use reth_blockchain_tree::noop::NoopBlockchainTree; + use reth_db_common::init::init_genesis; + use reth_evm_ethereum::execute::EthExecutorProvider; + use reth_primitives::public_key_to_address; + use reth_provider::{ + providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec, + }; + use reth_testing_utils::generators; + use secp256k1::Keypair; + + #[tokio::test] + async fn test_async_backfill() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + // Create a key pair for the sender + let key_pair = Keypair::new_global(&mut generators::rng()); + let address = public_key_to_address(key_pair.public_key()); + + let chain_spec = chain_spec(address); + + let executor = EthExecutorProvider::ethereum(chain_spec.clone()); + let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); + init_genesis(provider_factory.clone())?; + let blockchain_db = BlockchainProvider::new( + provider_factory.clone(), + Arc::new(NoopBlockchainTree::default()), + )?; + + // Create first 2 blocks + let blocks_and_execution_outcomes = + blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?; + + // Backfill the first block + let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone()); + let mut backfill_stream = factory.backfill(1..=1).into_stream(); + + // execute first block + let (block, mut execution_output) = backfill_stream.next().await.unwrap().unwrap(); + execution_output.state.reverts.sort(); + let sealed_block_with_senders = blocks_and_execution_outcomes[0].0.clone(); + let expected_block = sealed_block_with_senders.unseal(); + let expected_output = &blocks_and_execution_outcomes[0].1; + assert_eq!(block, expected_block); + assert_eq!(&execution_output, expected_output); + + // expect no more blocks + assert!(backfill_stream.next().await.is_none()); + + Ok(()) + } +} diff --git a/crates/exex/exex/src/backfill/test_utils.rs b/crates/exex/exex/src/backfill/test_utils.rs new file mode 100644 index 0000000000..05b41cd2b9 --- /dev/null +++ b/crates/exex/exex/src/backfill/test_utils.rs @@ -0,0 +1,162 @@ +use std::sync::Arc; + +use eyre::OptionExt; +use reth_chainspec::{ChainSpec, ChainSpecBuilder, EthereumHardfork, MAINNET}; +use reth_evm::execute::{ + BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor, +}; +use reth_evm_ethereum::execute::EthExecutorProvider; +use reth_primitives::{ + b256, constants::ETH_TO_WEI, Address, Block, BlockWithSenders, Genesis, GenesisAccount, Header, + Receipt, Requests, SealedBlockWithSenders, Transaction, TxEip2930, TxKind, U256, +}; +use reth_provider::{BlockWriter as _, ExecutionOutcome, LatestStateProviderRef, 
ProviderFactory}; +use reth_revm::database::StateProviderDatabase; +use reth_testing_utils::generators::sign_tx_with_key_pair; +use secp256k1::Keypair; + +pub(crate) fn to_execution_outcome( + block_number: u64, + block_execution_output: &BlockExecutionOutput, +) -> ExecutionOutcome { + ExecutionOutcome { + bundle: block_execution_output.state.clone(), + receipts: block_execution_output.receipts.clone().into(), + first_block: block_number, + requests: vec![Requests(block_execution_output.requests.clone())], + } +} + +pub(crate) fn chain_spec(address: Address) -> Arc { + // Create a chain spec with a genesis state that contains the + // provided sender + Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(Genesis { + alloc: [( + address, + GenesisAccount { balance: U256::from(ETH_TO_WEI), ..Default::default() }, + )] + .into(), + ..MAINNET.genesis.clone() + }) + .paris_activated() + .build(), + ) +} + +pub(crate) fn execute_block_and_commit_to_database( + provider_factory: &ProviderFactory, + chain_spec: Arc, + block: &BlockWithSenders, +) -> eyre::Result> +where + DB: reth_db_api::database::Database, +{ + let provider = provider_factory.provider()?; + + // Execute the block to produce a block execution output + let mut block_execution_output = EthExecutorProvider::ethereum(chain_spec) + .executor(StateProviderDatabase::new(LatestStateProviderRef::new( + provider.tx_ref(), + provider.static_file_provider().clone(), + ))) + .execute(BlockExecutionInput { block, total_difficulty: U256::ZERO })?; + block_execution_output.state.reverts.sort(); + + // Convert the block execution output to an execution outcome for committing to the database + let execution_outcome = to_execution_outcome(block.number, &block_execution_output); + + // Commit the block's execution outcome to the database + let provider_rw = provider_factory.provider_rw()?; + let block = block.clone().seal_slow(); + provider_rw.append_blocks_with_state( + vec![block], + execution_outcome, + Default::default(), + Default::default(), + )?; + provider_rw.commit()?; + + Ok(block_execution_output) +} + +pub(crate) fn blocks_and_execution_outputs( + provider_factory: ProviderFactory, + chain_spec: Arc, + key_pair: Keypair, +) -> eyre::Result)>> +where + DB: reth_db_api::database::Database, +{ + // First block has a transaction that transfers some ETH to zero address + let block1 = Block { + header: Header { + parent_hash: chain_spec.genesis_hash(), + receipts_root: b256!( + "d3a6acf9a244d78b33831df95d472c4128ea85bf079a1d41e32ed0b7d2244c9e" + ), + difficulty: chain_spec.fork(EthereumHardfork::Paris).ttd().expect("Paris TTD"), + number: 1, + gas_limit: 21000, + gas_used: 21000, + ..Default::default() + }, + body: vec![sign_tx_with_key_pair( + key_pair, + Transaction::Eip2930(TxEip2930 { + chain_id: chain_spec.chain.id(), + nonce: 0, + gas_limit: 21000, + gas_price: 1_500_000_000, + to: TxKind::Call(Address::ZERO), + value: U256::from(0.1 * ETH_TO_WEI as f64), + ..Default::default() + }), + )], + ..Default::default() + } + .with_recovered_senders() + .ok_or_eyre("failed to recover senders")?; + + // Second block resends the same transaction with increased nonce + let block2 = Block { + header: Header { + parent_hash: block1.header.hash_slow(), + receipts_root: b256!( + "d3a6acf9a244d78b33831df95d472c4128ea85bf079a1d41e32ed0b7d2244c9e" + ), + difficulty: chain_spec.fork(EthereumHardfork::Paris).ttd().expect("Paris TTD"), + number: 2, + gas_limit: 21000, + gas_used: 21000, + ..Default::default() + }, + body: 
vec![sign_tx_with_key_pair( + key_pair, + Transaction::Eip2930(TxEip2930 { + chain_id: chain_spec.chain.id(), + nonce: 1, + gas_limit: 21000, + gas_price: 1_500_000_000, + to: TxKind::Call(Address::ZERO), + value: U256::from(0.1 * ETH_TO_WEI as f64), + ..Default::default() + }), + )], + ..Default::default() + } + .with_recovered_senders() + .ok_or_eyre("failed to recover senders")?; + + let block_output1 = + execute_block_and_commit_to_database(&provider_factory, chain_spec.clone(), &block1)?; + let block_output2 = + execute_block_and_commit_to_database(&provider_factory, chain_spec, &block2)?; + + let block1 = block1.seal_slow(); + let block2 = block2.seal_slow(); + + Ok(vec![(block1, block_output1), (block2, block_output2)]) +}
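
For context, here is a minimal usage sketch of the API this patch reorganizes, written against the re-exports added in crates/exex/exex/src/backfill/mod.rs above (BackfillJobFactory, BackfillJob, BackFillJobStream). The generic executor/provider parameters, the function name backfill_example, the reth_exex:: import path and the 1..=100 block range are illustrative assumptions, not part of the patch:

use futures::StreamExt;
use reth_evm::execute::{BlockExecutionError, BlockExecutorProvider};
use reth_exex::BackfillJobFactory;
use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};

/// Sketch: backfill blocks 1..=100, first in batches, then block by block.
async fn backfill_example<E, P>(executor: E, provider: P) -> Result<(), BlockExecutionError>
where
    E: BlockExecutorProvider + Clone + 'static,
    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + 'static,
{
    // The factory only captures the executor, provider, prune modes and thresholds,
    // so it can mint any number of jobs over different ranges.
    let factory = BackfillJobFactory::new(executor, provider);

    // Batched backfill: `BackfillJob` is an `Iterator` yielding one `Chain` per batch,
    // with batch boundaries decided by `ExecutionStageThresholds`.
    let chains: Vec<Chain> = factory.backfill(1..=100).collect::<Result<_, _>>()?;
    let _ = chains;

    // Streamed backfill: single-block jobs run on spawned tokio tasks and are yielded
    // in order as `(BlockWithSenders, BlockExecutionOutput<Receipt>)` pairs.
    let mut stream = factory.backfill(1..=100).into_stream();
    while let Some(res) = stream.next().await {
        // The stream's error type is not asserted here; `expect` keeps the sketch simple.
        let (_block, _output) = res.expect("block execution failed");
    }

    Ok(())
}

The same factory can also hand out a SingleBlockBackfillJob via into_single_blocks() when synchronous per-block iteration is enough, as exercised by the tests in job.rs above.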