From 43ffb8357551b46a2dc24786ab602f130366df7e Mon Sep 17 00:00:00 2001 From: Luca Provini Date: Thu, 18 Jul 2024 11:32:37 +0200 Subject: [PATCH] feat: BackFillJobStream (#9578) Co-authored-by: Matthias Seitz --- Cargo.lock | 1 + crates/exex/exex/Cargo.toml | 1 + crates/exex/exex/src/backfill/job.rs | 74 +++++++++++ .../exex/src/{backfill.rs => backfill/mod.rs} | 120 ++++++++---------- crates/exex/exex/src/backfill/stream.rs | 99 +++++++++++++++ 5 files changed, 231 insertions(+), 64 deletions(-) create mode 100644 crates/exex/exex/src/backfill/job.rs rename crates/exex/exex/src/{backfill.rs => backfill/mod.rs} (87%) create mode 100644 crates/exex/exex/src/backfill/stream.rs diff --git a/Cargo.lock b/Cargo.lock index 8741d4a5c1..9f4b144e23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7269,6 +7269,7 @@ name = "reth-exex" version = "1.0.2" dependencies = [ "eyre", + "futures", "metrics", "reth-blockchain-tree", "reth-chainspec", diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 58a2695b89..8db8c3c942 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -33,6 +33,7 @@ reth-stages-api.workspace = true ## async tokio.workspace = true tokio-util.workspace = true +futures.workspace = true ## misc eyre.workspace = true diff --git a/crates/exex/exex/src/backfill/job.rs b/crates/exex/exex/src/backfill/job.rs new file mode 100644 index 0000000000..fa8695740b --- /dev/null +++ b/crates/exex/exex/src/backfill/job.rs @@ -0,0 +1,74 @@ +use reth_evm::execute::{ + BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor, +}; +use reth_primitives::{BlockNumber, BlockWithSenders, Receipt}; +use reth_provider::{ + BlockReader, HeaderProvider, ProviderError, StateProviderFactory, TransactionVariant, +}; +use reth_revm::database::StateProviderDatabase; +use reth_tracing::tracing::trace; +use std::ops::RangeInclusive; + +use crate::BackfillJob; + +/// Single block Backfill job started for a specific range. +/// +/// It implements [`Iterator`] which executes a block each time the +/// iterator is advanced and yields ([`BlockWithSenders`], [`BlockExecutionOutput`]) +#[derive(Debug, Clone)] +pub struct SingleBlockBackfillJob { + executor: E, + provider: P, + pub(crate) range: RangeInclusive, +} + +impl Iterator for SingleBlockBackfillJob +where + E: BlockExecutorProvider, + P: HeaderProvider + BlockReader + StateProviderFactory, +{ + type Item = Result<(BlockWithSenders, BlockExecutionOutput), BlockExecutionError>; + + fn next(&mut self) -> Option { + self.range.next().map(|block_number| self.execute_block(block_number)) + } +} + +impl SingleBlockBackfillJob +where + E: BlockExecutorProvider, + P: HeaderProvider + BlockReader + StateProviderFactory, +{ + pub(crate) fn execute_block( + &self, + block_number: u64, + ) -> Result<(BlockWithSenders, BlockExecutionOutput), BlockExecutionError> { + let td = self + .provider + .header_td_by_number(block_number)? + .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?; + + // Fetch the block with senders for execution. + let block_with_senders = self + .provider + .block_with_senders(block_number.into(), TransactionVariant::WithHash)? + .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?; + + // Configure the executor to use the previous block's state. + let executor = self.executor.executor(StateProviderDatabase::new( + self.provider.history_by_block_number(block_number.saturating_sub(1))?, + )); + + trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body.len(), "Executing block"); + + let block_execution_output = executor.execute((&block_with_senders, td).into())?; + + Ok((block_with_senders, block_execution_output)) + } +} + +impl From> for SingleBlockBackfillJob { + fn from(value: BackfillJob) -> Self { + Self { executor: value.executor, provider: value.provider, range: value.range } + } +} diff --git a/crates/exex/exex/src/backfill.rs b/crates/exex/exex/src/backfill/mod.rs similarity index 87% rename from crates/exex/exex/src/backfill.rs rename to crates/exex/exex/src/backfill/mod.rs index 36f0057343..9eca4665aa 100644 --- a/crates/exex/exex/src/backfill.rs +++ b/crates/exex/exex/src/backfill/mod.rs @@ -1,8 +1,7 @@ -use reth_evm::execute::{ - BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor, -}; +use job::SingleBlockBackfillJob; +use reth_evm::execute::{BatchExecutor, BlockExecutionError, BlockExecutorProvider}; use reth_node_api::FullNodeComponents; -use reth_primitives::{Block, BlockNumber, BlockWithSenders, Receipt}; +use reth_primitives::{Block, BlockNumber}; use reth_primitives_traits::format_gas_throughput; use reth_provider::{ BlockReader, Chain, HeaderProvider, ProviderError, StateProviderFactory, TransactionVariant, @@ -15,6 +14,10 @@ use std::{ ops::RangeInclusive, time::{Duration, Instant}, }; +use stream::BackFillJobStream; + +mod job; +mod stream; /// Factory for creating new backfill jobs. #[derive(Debug, Clone)] @@ -198,67 +201,14 @@ impl BackfillJob { pub fn into_single_blocks(self) -> SingleBlockBackfillJob { self.into() } -} -impl From> for SingleBlockBackfillJob { - fn from(value: BackfillJob) -> Self { - Self { executor: value.executor, provider: value.provider, range: value.range } - } -} - -/// Single block Backfill job started for a specific range. -/// -/// It implements [`Iterator`] which executes a block each time the -/// iterator is advanced and yields ([`BlockWithSenders`], [`BlockExecutionOutput`]) -#[derive(Debug)] -pub struct SingleBlockBackfillJob { - executor: E, - provider: P, - range: RangeInclusive, -} - -impl Iterator for SingleBlockBackfillJob -where - E: BlockExecutorProvider, - P: HeaderProvider + BlockReader + StateProviderFactory, -{ - type Item = Result<(BlockWithSenders, BlockExecutionOutput), BlockExecutionError>; - - fn next(&mut self) -> Option { - self.range.next().map(|block_number| self.execute_block(block_number)) - } -} - -impl SingleBlockBackfillJob -where - E: BlockExecutorProvider, - P: HeaderProvider + BlockReader + StateProviderFactory, -{ - fn execute_block( - &self, - block_number: u64, - ) -> Result<(BlockWithSenders, BlockExecutionOutput), BlockExecutionError> { - let td = self - .provider - .header_td_by_number(block_number)? - .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?; - - // Fetch the block with senders for execution. - let block_with_senders = self - .provider - .block_with_senders(block_number.into(), TransactionVariant::WithHash)? - .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?; - - // Configure the executor to use the previous block's state. - let executor = self.executor.executor(StateProviderDatabase::new( - self.provider.history_by_block_number(block_number.saturating_sub(1))?, - )); - - trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body.len(), "Executing block"); - - let block_execution_output = executor.execute((&block_with_senders, td).into())?; - - Ok((block_with_senders, block_execution_output)) + /// Converts the backfill job into a backfill job stream. + pub fn into_stream(self) -> BackFillJobStream + where + E: BlockExecutorProvider + Clone + 'static, + P: HeaderProvider + BlockReader + StateProviderFactory + Clone + 'static, + { + BackFillJobStream::new(self.into_single_blocks()) } } @@ -266,6 +216,7 @@ where mod tests { use crate::BackfillJobFactory; use eyre::OptionExt; + use futures::StreamExt; use reth_blockchain_tree::noop::NoopBlockchainTree; use reth_chainspec::{ChainSpec, ChainSpecBuilder, EthereumHardfork, MAINNET}; use reth_db_common::init::init_genesis; @@ -519,4 +470,45 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_async_backfill() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + // Create a key pair for the sender + let key_pair = Keypair::new_global(&mut generators::rng()); + let address = public_key_to_address(key_pair.public_key()); + + let chain_spec = chain_spec(address); + + let executor = EthExecutorProvider::ethereum(chain_spec.clone()); + let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); + init_genesis(provider_factory.clone())?; + let blockchain_db = BlockchainProvider::new( + provider_factory.clone(), + Arc::new(NoopBlockchainTree::default()), + )?; + + // Create first 2 blocks + let blocks_and_execution_outcomes = + blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?; + + // Backfill the first block + let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone()); + let mut backfill_stream = factory.backfill(1..=1).into_stream(); + + // execute first block + let (block, mut execution_output) = backfill_stream.next().await.unwrap().unwrap(); + execution_output.state.reverts.sort(); + let sealed_block_with_senders = blocks_and_execution_outcomes[0].0.clone(); + let expected_block = sealed_block_with_senders.unseal(); + let expected_output = &blocks_and_execution_outcomes[0].1; + assert_eq!(block, expected_block); + assert_eq!(&execution_output, expected_output); + + // expect no more blocks + assert!(backfill_stream.next().await.is_none()); + + Ok(()) + } } diff --git a/crates/exex/exex/src/backfill/stream.rs b/crates/exex/exex/src/backfill/stream.rs new file mode 100644 index 0000000000..25022e42ef --- /dev/null +++ b/crates/exex/exex/src/backfill/stream.rs @@ -0,0 +1,99 @@ +use super::job::SingleBlockBackfillJob; +use futures::{ + stream::{FuturesOrdered, Stream}, + StreamExt, +}; +use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider}; +use reth_primitives::{BlockNumber, BlockWithSenders, Receipt}; +use reth_provider::{BlockReader, HeaderProvider, StateProviderFactory}; +use std::{ + ops::RangeInclusive, + pin::Pin, + task::{ready, Context, Poll}, +}; +use tokio::task::JoinHandle; + +type BackfillTasks = FuturesOrdered< + JoinHandle), BlockExecutionError>>, +>; + +/// The default parallelism for active tasks in [`BackFillJobStream`]. +const DEFAULT_PARALLELISM: usize = 4; + +/// Stream for processing backfill jobs asynchronously. +/// +/// This struct manages the execution of [`SingleBlockBackfillJob`] tasks, allowing blocks to be +/// processed asynchronously but in order within a specified range. +#[derive(Debug)] +pub struct BackFillJobStream { + job: SingleBlockBackfillJob, + tasks: BackfillTasks, + range: RangeInclusive, + parallelism: usize, +} + +impl BackFillJobStream +where + E: BlockExecutorProvider + Clone + Send + 'static, + P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + 'static, +{ + /// Creates a new [`BackFillJobStream`] with the default parallelism. + /// + /// # Parameters + /// - `job`: The [`SingleBlockBackfillJob`] to be executed asynchronously. + /// + /// # Returns + /// A new instance of [`BackFillJobStream`] with the default parallelism. + pub fn new(job: SingleBlockBackfillJob) -> Self { + let range = job.range.clone(); + Self { job, tasks: FuturesOrdered::new(), range, parallelism: DEFAULT_PARALLELISM } + } + + /// Configures the parallelism of the [`BackFillJobStream`] to handle active tasks. + /// + /// # Parameters + /// - `parallelism`: The parallelism to handle active tasks. + /// + /// # Returns + /// The modified instance of [`BackFillJobStream`] with the specified parallelism. + pub const fn with_parallelism(mut self, parallelism: usize) -> Self { + self.parallelism = parallelism; + self + } + + fn spawn_task( + &self, + block_number: BlockNumber, + ) -> JoinHandle), BlockExecutionError>> + { + let job = self.job.clone(); + tokio::task::spawn_blocking(move || job.execute_block(block_number)) + } +} + +impl Stream for BackFillJobStream +where + E: BlockExecutorProvider + Clone + Send + 'static, + P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + 'static + Unpin, +{ + type Item = Result<(BlockWithSenders, BlockExecutionOutput), BlockExecutionError>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + // Spawn new tasks only if we are below the parallelism configured. + while this.tasks.len() < this.parallelism { + if let Some(block_number) = this.range.next() { + let task = this.spawn_task(block_number); + this.tasks.push_back(task); + } else { + break; + } + } + + match ready!(this.tasks.poll_next_unpin(cx)) { + Some(res) => Poll::Ready(Some(res.map_err(|e| BlockExecutionError::Other(e.into()))?)), + None => Poll::Ready(None), + } + } +}