feat(sync): standardize stage control flow checks (#681)

* feat(sync): standardize stage control flow checks

* fix input parameter for test_eth_chain

* clean up & put behind macro

* Update crates/stages/src/stage.rs

Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>

* address comments

Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>
This commit is contained in:
Roman Krasiuk
2023-01-04 21:03:36 +02:00
committed by GitHub
parent 8c413ad0a9
commit ec88f1deef
8 changed files with 137 additions and 90 deletions

View File

@@ -53,6 +53,8 @@ pub struct StageConfig {
pub bodies: BodiesConfig,
/// Sender recovery stage configuration.
pub sender_recovery: SenderRecoveryConfig,
/// Execution stage configuration.
pub execution: ExecutionConfig,
}
/// Header stage configuration.
@@ -127,3 +129,16 @@ impl Default for SenderRecoveryConfig {
Self { commit_threshold: 5_000, batch_size: 1000 }
}
}
/// Execution stage configuration.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExecutionConfig {
/// The maximum number of blocks to execute before committing progress to the database.
pub commit_threshold: u64,
}
// The default configuration commits progress after at most 5,000 executed blocks.
impl Default for ExecutionConfig {
fn default() -> Self {
Self { commit_threshold: 5_000 }
}
}

View File

@@ -151,7 +151,10 @@ impl Command {
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: config.stages.sender_recovery.commit_threshold,
})
.push(ExecutionStage { config: ExecutorConfig::new_ethereum() });
.push(ExecutionStage {
config: ExecutorConfig::new_ethereum(),
commit_threshold: config.stages.execution.commit_threshold,
});
if let Some(tip) = self.tip {
debug!("Tip manually set: {}", tip);

View File

@@ -15,7 +15,7 @@ use reth_primitives::{
H256, U256,
};
use reth_rlp::Decodable;
use reth_stages::{stages::execution::ExecutionStage, ExecInput, Stage, Transaction};
use reth_stages::{stages::execution::ExecutionStage, ExecInput, Stage, StageId, Transaction};
use std::{
collections::HashMap,
ffi::OsStr,
@@ -115,8 +115,10 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> {
let genesis_block = SealedBlock { header, body: vec![], ommers: vec![] };
reth_provider::insert_canonical_block(&tx, &genesis_block, has_block_reward)?;
let mut last_block = None;
suite.blocks.iter().try_for_each(|block| -> eyre::Result<()> {
let decoded = SealedBlock::decode(&mut block.rlp.as_ref())?;
last_block = Some(decoded.number);
reth_provider::insert_canonical_block(&tx, &decoded, has_block_reward)?;
Ok(())
})?;
@@ -165,11 +167,16 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> {
// Initialize the execution stage
// Hardcode the chain_id to Ethereum 1.
let mut stage =
ExecutionStage::new(reth_executor::Config { chain_id: U256::from(1), spec_upgrades });
let mut stage = ExecutionStage::new(
reth_executor::Config { chain_id: U256::from(1), spec_upgrades },
1000,
);
// Call execution stage
let input = ExecInput::default();
let input = ExecInput {
previous_stage: last_block.map(|b| (StageId(""), b)),
stage_progress: None,
};
{
let mut transaction = Transaction::new(db.as_ref())?;
@@ -197,12 +204,12 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> {
tracing::trace!("Our storage:{:?}", storage);
for (address, test_account) in state.iter() {
// check account
let our_account = tx
.get::<tables::PlainAccountState>(*address)?
.ok_or(eyre!("Account is missing:{address} expected:{:?}", test_account))?;
let our_account = tx.get::<tables::PlainAccountState>(*address)?.ok_or(
eyre!("Account is missing: {address} expected: {:?}", test_account),
)?;
if test_account.balance.0 != our_account.balance {
return Err(eyre!(
"Account {address} balance diff, expected {} got{}",
"Account {address} balance diff, expected {} got {}",
test_account.balance.0,
our_account.balance
))

View File

@@ -1,3 +1,5 @@
use std::ops::RangeInclusive;
use crate::{db::Transaction, error::StageError, id::StageId};
use async_trait::async_trait;
use reth_db::database::Database;
@@ -17,6 +19,29 @@ impl ExecInput {
pub fn previous_stage_progress(&self) -> BlockNumber {
self.previous_stage.as_ref().map(|(_, num)| *num).unwrap_or_default()
}
/// Return the next execution action for the stage.
///
/// The stage is considered to have processed every block up to and including
/// `stage_progress` (zero if it has never run). The end of the range is the progress of the
/// previous stage, optionally limited to at most `max_threshold` blocks past `stage_progress`.
///
/// [ExecAction::Done] is returned if there are no blocks to execute in this stage.
/// [ExecAction::Run] is returned if the stage should proceed with execution; its `capped` flag
/// is set when the threshold cut the range short of the previous stage's progress.
pub fn next_action(&self, max_threshold: Option<u64>) -> ExecAction {
    // Extract information about the stage progress
    let stage_progress = self.stage_progress.unwrap_or_default();
    let previous_stage_progress = self.previous_stage_progress();

    let end_block = match max_threshold {
        // Saturate instead of overflowing so that a very large threshold (e.g. `u64::MAX`
        // used as "no limit") simply leaves the range uncapped.
        Some(threshold) => {
            previous_stage_progress.min(stage_progress.saturating_add(threshold))
        }
        None => previous_stage_progress,
    };

    if stage_progress < end_block {
        // `stage_progress < end_block` guarantees that `stage_progress + 1` cannot overflow.
        let start_block = stage_progress + 1;
        let capped = end_block < previous_stage_progress;
        ExecAction::Run { range: start_block..=end_block, capped }
    } else {
        ExecAction::Done { stage_progress, target: end_block }
    }
}
}
/// Stage unwind input, see [Stage::unwind].
@@ -46,6 +71,26 @@ pub struct UnwindOutput {
pub stage_progress: BlockNumber,
}
/// Controls whether a stage should continue execution or not.
///
/// Produced by `ExecInput::next_action`.
#[derive(Debug)]
pub enum ExecAction {
/// The stage should continue with execution.
Run {
/// The inclusive range of blocks to execute, starting at the block right after the
/// current stage progress.
range: RangeInclusive<BlockNumber>,
/// The flag indicating whether the range was capped
/// by some max blocks parameter, i.e. it ends before the previous stage's progress.
capped: bool,
},
/// The stage should terminate since there are no blocks to execute.
Done {
/// The current stage progress
stage_progress: BlockNumber,
/// The execution target computed from the [ExecInput] (the end of the would-be range).
target: BlockNumber,
},
}
/// A stage is a segmented part of the syncing process of the node.
///
/// Each stage takes care of a well-defined task, such as downloading headers or executing
@@ -79,3 +124,19 @@ pub trait Stage<DB: Database>: Send + Sync {
input: UnwindInput,
) -> Result<UnwindOutput, StageError>;
}
/// Resolve the next [ExecAction] for a stage and unpack it for the caller.
///
/// When the action is [ExecAction::Run], the macro evaluates to
/// `((start_block, end_block), capped)`. When it is [ExecAction::Done], the macro logs the
/// situation under the given log target and makes the *calling function* return
/// `Ok(ExecOutput { stage_progress, done: true })`.
///
/// The call site must have `ExecAction`, `ExecOutput` and `info!` in scope.
macro_rules! exec_or_return {
    ($exec_input: expr, $max_blocks: expr, $target_name: expr) => {
        match $exec_input.next_action(Some($max_blocks)) {
            // Nothing to do: report and leave the enclosing function.
            ExecAction::Done { stage_progress, target } => {
                info!(target: $target_name, stage_progress, target, "Target block already reached");
                return Ok(ExecOutput { stage_progress, done: true })
            }
            // Work to do: hand back the inclusive bounds together with the cap flag.
            ExecAction::Run { range, capped } => {
                let bounds = range.into_inner();
                (bounds, capped)
            }
        }
    };
}
pub(crate) use exec_or_return;

View File

@@ -1,6 +1,6 @@
use crate::{
db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
db::Transaction, exec_or_return, DatabaseIntegrityError, ExecAction, ExecInput, ExecOutput,
Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use futures_util::StreamExt;
use reth_db::{
@@ -78,26 +78,10 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let previous_stage_progress = input.previous_stage_progress();
if previous_stage_progress == 0 {
error!(target: "sync::stages::bodies", "The body stage is running first, no work can be done");
return Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::BlockBody {
number: 0,
}))
}
let ((start_block, end_block), capped) =
exec_or_return!(input, self.commit_threshold, "sync::stages::bodies");
// The block we ended at last sync, and the one we are starting on now
let stage_progress = input.stage_progress.unwrap_or_default();
let starting_block = stage_progress + 1;
// Short circuit in case we already reached the target block
let target = previous_stage_progress.min(starting_block + self.commit_threshold);
if target <= stage_progress {
info!(target: "sync::stages::bodies", stage_progress, target, "Target block already reached");
return Ok(ExecOutput { stage_progress, done: true })
}
let bodies_to_download = self.bodies_to_download::<DB>(tx, starting_block, target)?;
let bodies_to_download = self.bodies_to_download::<DB>(tx, start_block, end_block)?;
// Cursors used to write bodies, ommers and transactions
let mut body_cursor = tx.cursor_mut::<tables::BlockBodies>()?;
@@ -109,13 +93,13 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
let mut tx_transition_cursor = tx.cursor_mut::<tables::TxTransitionIndex>()?;
// Get id for the first transaction and first transition in the block
let (mut current_tx_id, mut transition_id) = tx.get_next_block_ids(starting_block)?;
let (mut current_tx_id, mut transition_id) = tx.get_next_block_ids(start_block)?;
// NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator
// on every iteration of the while loop -_-
let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter());
let mut highest_block = stage_progress;
trace!(target: "sync::stages::bodies", stage_progress, target, start_tx_id = current_tx_id, transition_id, "Commencing sync");
let mut highest_block = input.stage_progress.unwrap_or_default();
debug!(target: "sync::stages::bodies", stage_progress = highest_block, target = end_block, start_tx_id = current_tx_id, transition_id, "Commencing sync");
while let Some(result) = bodies_stream.next().await {
let Ok(response) = result else {
error!(target: "sync::stages::bodies", block = highest_block + 1, error = ?result.unwrap_err(), "Error downloading block");
@@ -188,10 +172,8 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
// The stage is "done" if:
// - We got fewer blocks than our target
// - We reached our target and the target was not limited by the batch size of the stage
let capped = target < previous_stage_progress;
let done = highest_block < target || !capped;
info!(target: "sync::stages::bodies", stage_progress = highest_block, target, done, "Sync iteration finished");
let done = !capped && highest_block == end_block;
info!(target: "sync::stages::bodies", stage_progress = highest_block, target = end_block, done, "Sync iteration finished");
Ok(ExecOutput { stage_progress: highest_block, done })
}
@@ -210,7 +192,6 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
let mut block_transition_cursor = tx.cursor_mut::<tables::BlockTransitionIndex>()?;
let mut tx_transition_cursor = tx.cursor_mut::<tables::TxTransitionIndex>()?;
// let mut entry = tx_count_cursor.last()?;
let mut entry = body_cursor.last()?;
while let Some((key, body)) = entry {
if key.number() <= input.unwind_to {

View File

@@ -1,6 +1,6 @@
use crate::{
db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
db::Transaction, exec_or_return, DatabaseIntegrityError, ExecAction, ExecInput, ExecOutput,
Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
@@ -53,27 +53,26 @@ const EXECUTION: StageId = StageId("Execution");
pub struct ExecutionStage {
/// Executor configuration.
pub config: Config,
/// Commit threshold: the maximum number of blocks handled by a single `execute` call.
/// It is passed to `exec_or_return!` as the cap on the block range.
pub commit_threshold: u64,
}
// Default stage: chain id 1 with the `SpecUpgrades::new_ethereum()` upgrade schedule and a
// commit threshold of 1000 blocks.
impl Default for ExecutionStage {
fn default() -> Self {
Self {
config: Config { chain_id: U256::from(1), spec_upgrades: SpecUpgrades::new_ethereum() },
commit_threshold: 1000,
}
}
}
impl ExecutionStage {
/// Create new execution stage with specified config.
pub fn new(config: Config) -> Self {
Self { config }
pub fn new(config: Config, commit_threshold: u64) -> Self {
Self { config, commit_threshold }
}
}
/// Specify batch sizes of block in execution
/// TODO make this as config
const BATCH_SIZE: u64 = 1000;
#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for ExecutionStage {
/// Return the id of the stage
@@ -87,10 +86,9 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
// none and zero are same as for genesis block (zeroed block) we are making assumption to
// not have transaction.
let ((start_block, end_block), capped) =
exec_or_return!(input, self.commit_threshold, "sync::stages::execution");
let last_block = input.stage_progress.unwrap_or_default();
let start_block = last_block + 1;
// Get next canonical block hashes to execute.
let mut canonicals = tx.cursor::<tables::CanonicalHeaders>()?;
@@ -108,16 +106,10 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
// get canonical blocks (num,hash)
let canonical_batch = canonicals
.walk(start_block)?
.take(BATCH_SIZE as usize) // TODO: commit_threshold
.take_while(|entry| entry.as_ref().map(|e| e.0 <= end_block).unwrap_or_default())
.map(|i| i.map(BlockNumHash))
.collect::<Result<Vec<_>, _>>()?;
// no more canonical blocks, we are done with execution.
if canonical_batch.is_empty() {
info!(target: "sync::stages::execution", stage_progress = last_block, "Target block already reached");
return Ok(ExecOutput { stage_progress: last_block, done: true })
}
// Get block headers and bodies from canonical hashes
let block_batch = canonical_batch
.iter()
@@ -288,10 +280,9 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
}
}
let stage_progress = last_block + canonical_batch.len() as u64;
let done = canonical_batch.len() < BATCH_SIZE as usize;
info!(target: "sync::stages::execution", done, stage_progress, "Sync iteration finished");
Ok(ExecOutput { done, stage_progress })
let done = !capped;
info!(target: "sync::stages::execution", stage_progress = end_block, done, "Sync iteration finished");
Ok(ExecOutput { stage_progress: end_block, done })
}
/// Unwind the stage.
@@ -391,6 +382,8 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
mod tests {
use std::ops::{Deref, DerefMut};
use crate::test_utils::PREV_STAGE_ID;
use super::*;
use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap};
use reth_primitives::{hex_literal::hex, keccak256, Account, SealedBlock, H160, U256};
@@ -404,7 +397,7 @@ mod tests {
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let mut tx = Transaction::new(state_db.as_ref()).unwrap();
let input = ExecInput {
previous_stage: None,
previous_stage: Some((PREV_STAGE_ID, 1)),
/// The progress of this stage the last time it was executed.
stage_progress: None,
};

View File

@@ -1,5 +1,6 @@
use crate::{
db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
db::Transaction, exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
};
use itertools::Itertools;
use rayon::prelude::*;
@@ -59,25 +60,19 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let stage_progress = input.stage_progress.unwrap_or_default();
let previous_stage_progress = input.previous_stage_progress();
let max_block_num = previous_stage_progress.min(stage_progress + self.commit_threshold);
if max_block_num <= stage_progress {
info!(target: "sync::stages::sender_recovery", target = max_block_num, stage_progress, "Target block already reached");
return Ok(ExecOutput { stage_progress, done: true })
}
let ((start_block, end_block), capped) =
exec_or_return!(input, self.commit_threshold, "sync::stages::sender_recovery");
// Look up the start index for the transaction range
let start_tx_index = tx.get_block_body_by_num(stage_progress + 1)?.start_tx_id;
let start_tx_index = tx.get_block_body_by_num(start_block)?.start_tx_id;
// Look up the end index for transaction range (inclusive)
let end_tx_index = tx.get_block_body_by_num(max_block_num)?.last_tx_index();
let end_tx_index = tx.get_block_body_by_num(end_block)?.last_tx_index();
// No transactions to walk over
if start_tx_index > end_tx_index {
info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Target transaction already reached");
return Ok(ExecOutput { stage_progress: max_block_num, done: true })
return Ok(ExecOutput { stage_progress: end_block, done: true })
}
// Acquire the cursor for inserting elements
@@ -110,9 +105,9 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
recovered.into_iter().try_for_each(|(id, sender)| senders_cursor.append(id, sender))?;
}
let done = max_block_num >= previous_stage_progress;
info!(target: "sync::stages::sender_recovery", stage_progress = max_block_num, done, "Sync iteration finished");
Ok(ExecOutput { stage_progress: max_block_num, done })
let done = !capped;
info!(target: "sync::stages::sender_recovery", stage_progress = end_block, done, "Sync iteration finished");
Ok(ExecOutput { stage_progress: end_block, done })
}
/// Unwind the stage.

View File

@@ -1,6 +1,6 @@
use crate::{
db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
db::Transaction, exec_or_return, DatabaseIntegrityError, ExecAction, ExecInput, ExecOutput,
Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
@@ -37,16 +37,8 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let stage_progress = input.stage_progress.unwrap_or_default();
let previous_stage_progress = input.previous_stage_progress();
let start_block = stage_progress + 1;
let end_block = previous_stage_progress.min(start_block + self.commit_threshold);
if start_block > end_block {
info!(target: "sync::stages::total_difficulty", stage_progress, "Target block already reached");
return Ok(ExecOutput { stage_progress, done: true })
}
let ((start_block, end_block), capped) =
exec_or_return!(input, self.commit_threshold, "sync::stages::total_difficulty");
debug!(target: "sync::stages::total_difficulty", start_block, end_block, "Commencing sync");
@@ -55,7 +47,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
let mut cursor_headers = tx.cursor_mut::<tables::Headers>()?;
// Get latest total difficulty
let last_header_key = tx.get_block_numhash(stage_progress)?;
let last_header_key = tx.get_block_numhash(input.stage_progress.unwrap_or_default())?;
let last_entry = cursor_td
.seek_exact(last_header_key)?
.ok_or(DatabaseIntegrityError::TotalDifficulty { number: last_header_key.number() })?;
@@ -74,7 +66,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
cursor_td.append(key, td.into())?;
}
let done = end_block >= previous_stage_progress;
let done = !capped;
info!(target: "sync::stages::total_difficulty", stage_progress = end_block, done, "Sync iteration finished");
Ok(ExecOutput { done, stage_progress: end_block })
}