diff --git a/bin/reth/src/config.rs b/bin/reth/src/config.rs index 69607e1f5e..54105395b7 100644 --- a/bin/reth/src/config.rs +++ b/bin/reth/src/config.rs @@ -53,6 +53,8 @@ pub struct StageConfig { pub bodies: BodiesConfig, /// Sender recovery stage configuration. pub sender_recovery: SenderRecoveryConfig, + /// Execution stage configuration. + pub execution: ExecutionConfig, } /// Header stage configuration. @@ -127,3 +129,16 @@ impl Default for SenderRecoveryConfig { Self { commit_threshold: 5_000, batch_size: 1000 } } } + +/// Execution stage configuration. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ExecutionConfig { + /// The maximum number of blocks to execute before committing progress to the database. + pub commit_threshold: u64, +} + +impl Default for ExecutionConfig { + fn default() -> Self { + Self { commit_threshold: 5_000 } + } +} diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 91ab78422e..515bb82c59 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -151,7 +151,10 @@ impl Command { batch_size: config.stages.sender_recovery.batch_size, commit_threshold: config.stages.sender_recovery.commit_threshold, }) - .push(ExecutionStage { config: ExecutorConfig::new_ethereum() }); + .push(ExecutionStage { + config: ExecutorConfig::new_ethereum(), + commit_threshold: config.stages.execution.commit_threshold, + }); if let Some(tip) = self.tip { debug!("Tip manually set: {}", tip); diff --git a/bin/reth/src/test_eth_chain/runner.rs b/bin/reth/src/test_eth_chain/runner.rs index c8308ae9ea..b7cfa54534 100644 --- a/bin/reth/src/test_eth_chain/runner.rs +++ b/bin/reth/src/test_eth_chain/runner.rs @@ -15,7 +15,7 @@ use reth_primitives::{ H256, U256, }; use reth_rlp::Decodable; -use reth_stages::{stages::execution::ExecutionStage, ExecInput, Stage, Transaction}; +use reth_stages::{stages::execution::ExecutionStage, ExecInput, Stage, StageId, Transaction}; use std::{ collections::HashMap, ffi::OsStr, @@ -115,8 +115,10 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> { let genesis_block = SealedBlock { header, body: vec![], ommers: vec![] }; reth_provider::insert_canonical_block(&tx, &genesis_block, has_block_reward)?; + let mut last_block = None; suite.blocks.iter().try_for_each(|block| -> eyre::Result<()> { let decoded = SealedBlock::decode(&mut block.rlp.as_ref())?; + last_block = Some(decoded.number); reth_provider::insert_canonical_block(&tx, &decoded, has_block_reward)?; Ok(()) })?; @@ -165,11 +167,16 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> { // Initialize the execution stage // Hardcode the chain_id to Ethereum 1. - let mut stage = - ExecutionStage::new(reth_executor::Config { chain_id: U256::from(1), spec_upgrades }); + let mut stage = ExecutionStage::new( + reth_executor::Config { chain_id: U256::from(1), spec_upgrades }, + 1000, + ); // Call execution stage - let input = ExecInput::default(); + let input = ExecInput { + previous_stage: last_block.map(|b| (StageId(""), b)), + stage_progress: None, + }; { let mut transaction = Transaction::new(db.as_ref())?; @@ -197,12 +204,12 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> { tracing::trace!("Our storage:{:?}", storage); for (address, test_account) in state.iter() { // check account - let our_account = tx - .get::(*address)?
- .ok_or(eyre!("Account is missing:{address} expected:{:?}", test_account))?; + let our_account = tx.get::(*address)?.ok_or( + eyre!("Account is missing: {address} expected: {:?}", test_account), + )?; if test_account.balance.0 != our_account.balance { return Err(eyre!( - "Account {address} balance diff, expected {} got{}", + "Account {address} balance diff, expected {} got {}", test_account.balance.0, our_account.balance )) diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index fd463edcb0..f48243d09e 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -1,3 +1,5 @@ +use std::ops::RangeInclusive; + use crate::{db::Transaction, error::StageError, id::StageId}; use async_trait::async_trait; use reth_db::database::Database; @@ -17,6 +19,29 @@ impl ExecInput { pub fn previous_stage_progress(&self) -> BlockNumber { self.previous_stage.as_ref().map(|(_, num)| *num).unwrap_or_default() } + + /// Return next execution action. + /// + /// [ExecAction::Done] is returned if there are no blocks to execute in this stage. + /// [ExecAction::Run] is returned if the stage should proceed with execution. + pub fn next_action(&self, max_threshold: Option) -> ExecAction { + // Extract information about the stage progress + let stage_progress = self.stage_progress.unwrap_or_default(); + let previous_stage_progress = self.previous_stage_progress(); + + let start_block = stage_progress + 1; + let end_block = match max_threshold { + Some(threshold) => previous_stage_progress.min(stage_progress + threshold), + None => previous_stage_progress, + }; + let capped = end_block < previous_stage_progress; + + if start_block <= end_block { + ExecAction::Run { range: start_block..=end_block, capped } + } else { + ExecAction::Done { stage_progress, target: end_block } + } + } } /// Stage unwind input, see [Stage::unwind]. @@ -46,6 +71,26 @@ pub struct UnwindOutput { pub stage_progress: BlockNumber, } +/// Controls whether a stage should continue execution or not. +#[derive(Debug)] +pub enum ExecAction { + /// The stage should continue with execution. + Run { + /// The execution block range + range: RangeInclusive, + /// The flag indicating whether the range was capped + /// by some max blocks parameter + capped: bool, + }, + /// The stage should terminate since there are no blocks to execute. + Done { + /// The current stage progress + stage_progress: BlockNumber, + /// The execution target provided in [ExecInput]. + target: BlockNumber, + }, +} + /// A stage is a segmented part of the syncing process of the node. /// /// Each stage takes care of a well-defined task, such as downloading headers or executing @@ -79,3 +124,19 @@ pub trait Stage: Send + Sync { input: UnwindInput, ) -> Result; } + +/// Get the next execute action for the stage. Return if the stage has no +/// blocks to process. +macro_rules! 
exec_or_return { + ($input: expr, $threshold: expr, $log_target: expr) => { + match $input.next_action(Some($threshold)) { + ExecAction::Run { range, capped } => (range.into_inner(), capped), + ExecAction::Done { stage_progress, target } => { + info!(target: $log_target, stage_progress, target, "Target block already reached"); + return Ok(ExecOutput { stage_progress, done: true }) + } + } + }; +} + +pub(crate) use exec_or_return; diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index fdf74c7a19..9ea0e1b1f4 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -1,6 +1,6 @@ use crate::{ - db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, - UnwindInput, UnwindOutput, + db::Transaction, exec_or_return, DatabaseIntegrityError, ExecAction, ExecInput, ExecOutput, + Stage, StageError, StageId, UnwindInput, UnwindOutput, }; use futures_util::StreamExt; use reth_db::{ @@ -78,26 +78,10 @@ impl Stage for BodyStage, input: ExecInput, ) -> Result { - let previous_stage_progress = input.previous_stage_progress(); - if previous_stage_progress == 0 { - error!(target: "sync::stages::bodies", "The body stage is running first, no work can be done"); - return Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::BlockBody { - number: 0, - })) - } + let ((start_block, end_block), capped) = + exec_or_return!(input, self.commit_threshold, "sync::stages::bodies"); - // The block we ended at last sync, and the one we are starting on now - let stage_progress = input.stage_progress.unwrap_or_default(); - let starting_block = stage_progress + 1; - - // Short circuit in case we already reached the target block - let target = previous_stage_progress.min(starting_block + self.commit_threshold); - if target <= stage_progress { - info!(target: "sync::stages::bodies", stage_progress, target, "Target block already reached"); - return Ok(ExecOutput { stage_progress, done: true }) - } - - let bodies_to_download = self.bodies_to_download::(tx, starting_block, target)?; + let bodies_to_download = self.bodies_to_download::(tx, start_block, end_block)?; // Cursors used to write bodies, ommers and transactions let mut body_cursor = tx.cursor_mut::()?; @@ -109,13 +93,13 @@ impl Stage for BodyStage()?; // Get id for the first transaction and first transition in the block - let (mut current_tx_id, mut transition_id) = tx.get_next_block_ids(starting_block)?; + let (mut current_tx_id, mut transition_id) = tx.get_next_block_ids(start_block)?; // NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator // on every iteration of the while loop -_- let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter()); - let mut highest_block = stage_progress; - trace!(target: "sync::stages::bodies", stage_progress, target, start_tx_id = current_tx_id, transition_id, "Commencing sync"); + let mut highest_block = input.stage_progress.unwrap_or_default(); + debug!(target: "sync::stages::bodies", stage_progress = highest_block, target = end_block, start_tx_id = current_tx_id, transition_id, "Commencing sync"); while let Some(result) = bodies_stream.next().await { let Ok(response) = result else { error!(target: "sync::stages::bodies", block = highest_block + 1, error = ?result.unwrap_err(), "Error downloading block"); @@ -188,10 +172,8 @@ impl Stage for BodyStage Stage for BodyStage()?; let mut tx_transition_cursor = tx.cursor_mut::()?; - // let mut entry = tx_count_cursor.last()?; let 
mut entry = body_cursor.last()?; while let Some((key, body)) = entry { if key.number() <= input.unwind_to { diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 6f09ef281c..270e760858 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -1,6 +1,6 @@ use crate::{ - db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, - UnwindInput, UnwindOutput, + db::Transaction, exec_or_return, DatabaseIntegrityError, ExecAction, ExecInput, ExecOutput, + Stage, StageError, StageId, UnwindInput, UnwindOutput, }; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, @@ -53,27 +53,26 @@ const EXECUTION: StageId = StageId("Execution"); pub struct ExecutionStage { /// Executor configuration. pub config: Config, + /// Commit threshold + pub commit_threshold: u64, } impl Default for ExecutionStage { fn default() -> Self { Self { config: Config { chain_id: U256::from(1), spec_upgrades: SpecUpgrades::new_ethereum() }, + commit_threshold: 1000, } } } impl ExecutionStage { /// Create new execution stage with specified config. - pub fn new(config: Config) -> Self { - Self { config } + pub fn new(config: Config, commit_threshold: u64) -> Self { + Self { config, commit_threshold } } } -/// Specify batch sizes of block in execution -/// TODO make this as config -const BATCH_SIZE: u64 = 1000; - #[async_trait::async_trait] impl Stage for ExecutionStage { /// Return the id of the stage @@ -87,10 +86,9 @@ impl Stage for ExecutionStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - // none and zero are same as for genesis block (zeroed block) we are making assumption to - // not have transaction. + let ((start_block, end_block), capped) = + exec_or_return!(input, self.commit_threshold, "sync::stages::execution"); let last_block = input.stage_progress.unwrap_or_default(); - let start_block = last_block + 1; // Get next canonical block hashes to execute. let mut canonicals = tx.cursor::()?; @@ -108,16 +106,10 @@ impl Stage for ExecutionStage { // get canonical blocks (num,hash) let canonical_batch = canonicals .walk(start_block)? - .take(BATCH_SIZE as usize) // TODO: commit_threshold + .take_while(|entry| entry.as_ref().map(|e| e.0 <= end_block).unwrap_or_default()) .map(|i| i.map(BlockNumHash)) .collect::, _>>()?; - // no more canonical blocks, we are done with execution. - if canonical_batch.is_empty() { - info!(target: "sync::stages::execution", stage_progress = last_block, "Target block already reached"); - return Ok(ExecOutput { stage_progress: last_block, done: true }) - } - // Get block headers and bodies from canonical hashes let block_batch = canonical_batch .iter() @@ -288,10 +280,9 @@ impl Stage for ExecutionStage { } } - let stage_progress = last_block + canonical_batch.len() as u64; - let done = canonical_batch.len() < BATCH_SIZE as usize; - info!(target: "sync::stages::execution", done, stage_progress, "Sync iteration finished"); - Ok(ExecOutput { done, stage_progress }) + let done = !capped; + info!(target: "sync::stages::execution", stage_progress = end_block, done, "Sync iteration finished"); + Ok(ExecOutput { stage_progress: end_block, done }) } /// Unwind the stage. 
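The execution stage above now defers its range bookkeeping to the shared helper. As a reading aid, the `exec_or_return!` call in `ExecutionStage::execute` expands to roughly the following match on `ExecInput::next_action` (a sketch derived from the macro definition in `stage.rs`, not compiler output):

```rust
// Approximate expansion of:
//   exec_or_return!(input, self.commit_threshold, "sync::stages::execution")
// `next_action` caps the range at `stage_progress + commit_threshold` and reports
// whether that cap, rather than the upstream target, ended the range.
let ((start_block, end_block), capped) = match input.next_action(Some(self.commit_threshold)) {
    // There are blocks to execute: unpack the inclusive range and the cap flag.
    ExecAction::Run { range, capped } => (range.into_inner(), capped),
    // Nothing to do: log and report the stage as done at its current progress.
    ExecAction::Done { stage_progress, target } => {
        info!(target: "sync::stages::execution", stage_progress, target, "Target block already reached");
        return Ok(ExecOutput { stage_progress, done: true })
    }
};
```

Because `done` is later derived as `!capped`, a stage that was limited by its commit threshold reports `done: false`, so the pipeline re-invokes it after committing instead of advancing to the next stage.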
@@ -391,6 +382,8 @@ impl Stage for ExecutionStage { mod tests { use std::ops::{Deref, DerefMut}; + use crate::test_utils::PREV_STAGE_ID; + use super::*; use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap}; use reth_primitives::{hex_literal::hex, keccak256, Account, SealedBlock, H160, U256}; @@ -404,7 +397,7 @@ mod tests { let state_db = create_test_db::(EnvKind::RW); let mut tx = Transaction::new(state_db.as_ref()).unwrap(); let input = ExecInput { - previous_stage: None, + previous_stage: Some((PREV_STAGE_ID, 1)), /// The progress of this stage the last time it was executed. stage_progress: None, }; diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index ffe1ded0eb..4923b4adad 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -1,5 +1,6 @@ use crate::{ - db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, + db::Transaction, exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, + UnwindInput, UnwindOutput, }; use itertools::Itertools; use rayon::prelude::*; @@ -59,25 +60,19 @@ impl Stage for SenderRecoveryStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let stage_progress = input.stage_progress.unwrap_or_default(); - let previous_stage_progress = input.previous_stage_progress(); - let max_block_num = previous_stage_progress.min(stage_progress + self.commit_threshold); - - if max_block_num <= stage_progress { - info!(target: "sync::stages::sender_recovery", target = max_block_num, stage_progress, "Target block already reached"); - return Ok(ExecOutput { stage_progress, done: true }) - } + let ((start_block, end_block), capped) = + exec_or_return!(input, self.commit_threshold, "sync::stages::sender_recovery"); // Look up the start index for the transaction range - let start_tx_index = tx.get_block_body_by_num(stage_progress + 1)?.start_tx_id; + let start_tx_index = tx.get_block_body_by_num(start_block)?.start_tx_id; // Look up the end index for transaction range (inclusive) - let end_tx_index = tx.get_block_body_by_num(max_block_num)?.last_tx_index(); + let end_tx_index = tx.get_block_body_by_num(end_block)?.last_tx_index(); // No transactions to walk over if start_tx_index > end_tx_index { info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Target transaction already reached"); - return Ok(ExecOutput { stage_progress: max_block_num, done: true }) + return Ok(ExecOutput { stage_progress: end_block, done: true }) } // Acquire the cursor for inserting elements @@ -110,9 +105,9 @@ impl Stage for SenderRecoveryStage { recovered.into_iter().try_for_each(|(id, sender)| senders_cursor.append(id, sender))?; } - let done = max_block_num >= previous_stage_progress; - info!(target: "sync::stages::sender_recovery", stage_progress = max_block_num, done, "Sync iteration finished"); - Ok(ExecOutput { stage_progress: max_block_num, done }) + let done = !capped; + info!(target: "sync::stages::sender_recovery", stage_progress = end_block, done, "Sync iteration finished"); + Ok(ExecOutput { stage_progress: end_block, done }) } /// Unwind the stage. 
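All of the converted stages now share the same capping arithmetic from `ExecInput::next_action`. The following self-contained sketch re-implements that arithmetic with simplified stand-in types (`Action` and the free function `next_action` are illustrative, not the reth items) to make the `Done`/`Run`/`capped` boundary cases explicit:

```rust
// Simplified stand-ins for the reth types, for illustration only.
type BlockNumber = u64;

#[derive(Debug, PartialEq)]
enum Action {
    Run { start: BlockNumber, end: BlockNumber, capped: bool },
    Done { stage_progress: BlockNumber, target: BlockNumber },
}

fn next_action(
    stage_progress: Option<BlockNumber>,
    previous_stage_progress: BlockNumber,
    max_threshold: Option<u64>,
) -> Action {
    let stage_progress = stage_progress.unwrap_or_default();
    let start = stage_progress + 1;
    // The end block is the upstream target, unless the threshold caps it sooner.
    let end = match max_threshold {
        Some(threshold) => previous_stage_progress.min(stage_progress + threshold),
        None => previous_stage_progress,
    };
    let capped = end < previous_stage_progress;
    if start <= end {
        Action::Run { start, end, capped }
    } else {
        Action::Done { stage_progress, target: end }
    }
}

fn main() {
    // Caught up: progress already equals the previous stage's progress -> Done.
    assert_eq!(
        next_action(Some(100), 100, Some(1000)),
        Action::Done { stage_progress: 100, target: 100 }
    );
    // Threshold caps the range: 5000 blocks behind but only 1000 allowed -> capped.
    assert_eq!(
        next_action(Some(100), 5100, Some(1000)),
        Action::Run { start: 101, end: 1100, capped: true }
    );
    // Remaining work fits inside the threshold -> not capped, stage reports done.
    assert_eq!(
        next_action(Some(100), 500, Some(1000)),
        Action::Run { start: 101, end: 500, capped: false }
    );
}
```

Note that `capped` is computed against the upstream target, so a threshold that exactly reaches the target yields `capped: false` and the stage reports itself done.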
diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index 8c2c031f1b..c3cb067d54 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -1,6 +1,6 @@ use crate::{ - db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, - UnwindInput, UnwindOutput, + db::Transaction, exec_or_return, DatabaseIntegrityError, ExecAction, ExecInput, ExecOutput, + Stage, StageError, StageId, UnwindInput, UnwindOutput, }; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, @@ -37,16 +37,8 @@ impl Stage for TotalDifficultyStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let stage_progress = input.stage_progress.unwrap_or_default(); - let previous_stage_progress = input.previous_stage_progress(); - - let start_block = stage_progress + 1; - let end_block = previous_stage_progress.min(start_block + self.commit_threshold); - - if start_block > end_block { - info!(target: "sync::stages::total_difficulty", stage_progress, "Target block already reached"); - return Ok(ExecOutput { stage_progress, done: true }) - } + let ((start_block, end_block), capped) = + exec_or_return!(input, self.commit_threshold, "sync::stages::total_difficulty"); debug!(target: "sync::stages::total_difficulty", start_block, end_block, "Commencing sync"); @@ -55,7 +47,7 @@ impl Stage for TotalDifficultyStage { let mut cursor_headers = tx.cursor_mut::()?; // Get latest total difficulty - let last_header_key = tx.get_block_numhash(stage_progress)?; + let last_header_key = tx.get_block_numhash(input.stage_progress.unwrap_or_default())?; let last_entry = cursor_td .seek_exact(last_header_key)? .ok_or(DatabaseIntegrityError::TotalDifficulty { number: last_header_key.number() })?; @@ -74,7 +66,7 @@ impl Stage for TotalDifficultyStage { cursor_td.append(key, td.into())?; } - let done = end_block >= previous_stage_progress; + let done = !capped; info!(target: "sync::stages::total_difficulty", stage_progress = end_block, done, "Sync iteration finished"); Ok(ExecOutput { done, stage_progress: end_block }) }
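Taken together, execution now behaves like the bodies, sender recovery, and total difficulty stages: it commits in `commit_threshold`-sized chunks and relies on `done = !capped` to signal when it has caught up. A toy driver loop, with illustrative numbers and no reth types, shows how that contract terminates:

```rust
// A toy driver loop showing how `done = !capped` terminates iteration under a
// commit threshold. Names and numbers are illustrative, not reth's pipeline.
fn main() {
    let commit_threshold: u64 = 1000;
    let previous_stage_progress: u64 = 3500; // target set by the upstream stage
    let mut stage_progress: u64 = 0;
    let mut iterations = 0;

    loop {
        let end_block = previous_stage_progress.min(stage_progress + commit_threshold);
        let capped = end_block < previous_stage_progress;

        // ... execute blocks `stage_progress + 1..=end_block` and commit here ...
        stage_progress = end_block;
        iterations += 1;

        if !capped {
            break; // the stage reports `done: true` and the pipeline moves on
        }
    }

    assert_eq!(iterations, 4); // 1000 + 1000 + 1000 + 500 blocks
    assert_eq!(stage_progress, 3500);
}
```

With the node config wired through, the execution threshold defaults to `5_000` (see `ExecutionConfig::default`), replacing the previously hardcoded `BATCH_SIZE` of 1000 that this diff removes.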