From 748f90ed3cbe7033cb0913ddaa5120eefed12141 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Sat, 13 May 2023 02:50:47 +0900 Subject: [PATCH] fix: unwind stages with respect to commit threshold (#2500) --- Cargo.lock | 4 +- bin/reth/src/stage/mod.rs | 184 +++++++++--------- crates/stages/src/pipeline/mod.rs | 3 +- crates/stages/src/stage.rs | 31 +-- crates/stages/src/stages/bodies.rs | 4 +- crates/stages/src/stages/execution.rs | 28 +-- crates/stages/src/stages/hashing_account.rs | 12 +- crates/stages/src/stages/hashing_storage.rs | 9 +- crates/stages/src/stages/headers.rs | 3 +- .../src/stages/index_account_history.rs | 9 +- .../src/stages/index_storage_history.rs | 9 +- crates/stages/src/stages/merkle.rs | 7 +- crates/stages/src/stages/sender_recovery.rs | 12 +- crates/stages/src/stages/total_difficulty.rs | 12 +- crates/stages/src/stages/tx_lookup.rs | 13 +- 15 files changed, 185 insertions(+), 155 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 05389b5962..4ae3bca4dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5547,9 +5547,9 @@ dependencies = [ [[package]] name = "revm-precompile" -version = "2.0.2" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a3eabf08ea9e4063f5531b8735e29344d9d6eaebaa314c58253f6c17fcdf2d" +checksum = "41320af3bd6a65153d38eb1d3638ba89104cc9513c7feedb2d8510e8307dab29" dependencies = [ "k256 0.13.1", "num", diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index fe365d2654..c721e88cc9 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -20,7 +20,7 @@ use reth_stages::{ BodyStage, ExecutionStage, ExecutionStageThresholds, MerkleStage, SenderRecoveryStage, TransactionLookupStage, }, - ExecInput, Stage, StageId, UnwindInput, + ExecInput, ExecOutput, Stage, StageId, UnwindInput, }; use std::{net::SocketAddr, path::PathBuf, sync::Arc}; use tracing::*; @@ -77,6 +77,10 @@ pub struct Command { #[arg(long, short)] to: u64, + /// Batch size for stage execution and unwind + #[arg(long)] + batch_size: Option, + /// Normally, running the stage requires unwinding for stages that already /// have been run, in order to not rewrite to the same database slots. /// @@ -103,13 +107,6 @@ impl Command { let config: Config = confy::load_path(config_path).unwrap_or_default(); info!(target: "reth::cli", "reth {} starting stage {:?}", clap::crate_version!(), self.stage); - let input = ExecInput { - previous_stage: Some((StageId("No Previous Stage"), self.to)), - stage_progress: Some(self.from), - }; - - let unwind = UnwindInput { stage_progress: self.to, unwind_to: self.from, bad_block: None }; - // use the overridden db path if specified let db_path = data_dir.db_path(); @@ -122,102 +119,105 @@ impl Command { prometheus_exporter::initialize_with_db_metrics(listen_addr, Arc::clone(&db)).await?; } - let num_blocks = self.to - self.from + 1; + let batch_size = self.batch_size.unwrap_or(self.to - self.from + 1); - match self.stage { - StageEnum::Bodies => { - let consensus = Arc::new(BeaconConsensus::new(self.chain.clone())); + let (mut exec_stage, mut unwind_stage): (Box>, Option>>) = + match self.stage { + StageEnum::Bodies => { + let consensus = Arc::new(BeaconConsensus::new(self.chain.clone())); - let mut config = config; - config.peers.connect_trusted_nodes_only = self.network.trusted_only; - if !self.network.trusted_peers.is_empty() { - self.network.trusted_peers.iter().for_each(|peer| { - config.peers.trusted_nodes.insert(*peer); - }); - } + let mut config = config; + config.peers.connect_trusted_nodes_only = self.network.trusted_only; + if !self.network.trusted_peers.is_empty() { + self.network.trusted_peers.iter().for_each(|peer| { + config.peers.trusted_nodes.insert(*peer); + }); + } - let network_secret_path = self - .network - .p2p_secret_key - .clone() - .unwrap_or_else(|| data_dir.p2p_secret_path()); - let p2p_secret_key = get_secret_key(&network_secret_path)?; + let network_secret_path = self + .network + .p2p_secret_key + .clone() + .unwrap_or_else(|| data_dir.p2p_secret_path()); + let p2p_secret_key = get_secret_key(&network_secret_path)?; - let default_peers_path = data_dir.known_peers_path(); + let default_peers_path = data_dir.known_peers_path(); - let network = self - .network - .network_config(&config, self.chain.clone(), p2p_secret_key, default_peers_path) - .build(Arc::new(ShareableDatabase::new(db.clone(), self.chain.clone()))) - .start_network() - .await?; - let fetch_client = Arc::new(network.fetch_client().await?); - - let mut stage = BodyStage { - downloader: BodiesDownloaderBuilder::default() - .with_stream_batch_size(num_blocks as usize) - .with_request_limit(config.stages.bodies.downloader_request_limit) - .with_max_buffered_responses( - config.stages.bodies.downloader_max_buffered_responses, + let network = self + .network + .network_config( + &config, + self.chain.clone(), + p2p_secret_key, + default_peers_path, ) - .with_concurrent_requests_range( - config.stages.bodies.downloader_min_concurrent_requests..= - config.stages.bodies.downloader_max_concurrent_requests, - ) - .build(fetch_client.clone(), consensus.clone(), db.clone()), - consensus: consensus.clone(), - }; + .build(Arc::new(ShareableDatabase::new(db.clone(), self.chain.clone()))) + .start_network() + .await?; + let fetch_client = Arc::new(network.fetch_client().await?); - if !self.skip_unwind { - stage.unwind(&mut tx, unwind).await?; + let stage = BodyStage { + downloader: BodiesDownloaderBuilder::default() + .with_stream_batch_size(batch_size as usize) + .with_request_limit(config.stages.bodies.downloader_request_limit) + .with_max_buffered_responses( + config.stages.bodies.downloader_max_buffered_responses, + ) + .with_concurrent_requests_range( + config.stages.bodies.downloader_min_concurrent_requests..= + config.stages.bodies.downloader_max_concurrent_requests, + ) + .build(fetch_client, consensus.clone(), db.clone()), + consensus: consensus.clone(), + }; + + (Box::new(stage), None) } - stage.execute(&mut tx, input).await?; - } - StageEnum::Senders => { - let mut stage = SenderRecoveryStage { commit_threshold: num_blocks }; - - // Unwind first - if !self.skip_unwind { - stage.unwind(&mut tx, unwind).await?; + StageEnum::Senders => { + (Box::new(SenderRecoveryStage { commit_threshold: batch_size }), None) } - stage.execute(&mut tx, input).await?; - } - StageEnum::Execution => { - let factory = reth_revm::Factory::new(self.chain.clone()); - let mut stage = ExecutionStage::new( - factory, - ExecutionStageThresholds { - max_blocks: Some(num_blocks), - max_changes: None, - max_changesets: None, - }, - ); - if !self.skip_unwind { - stage.unwind(&mut tx, unwind).await?; + StageEnum::Execution => { + let factory = reth_revm::Factory::new(self.chain.clone()); + ( + Box::new(ExecutionStage::new( + factory, + ExecutionStageThresholds { + max_blocks: Some(batch_size), + max_changes: None, + max_changesets: None, + }, + )), + None, + ) } - stage.execute(&mut tx, input).await?; + StageEnum::TxLookup => (Box::new(TransactionLookupStage::new(batch_size)), None), + StageEnum::Merkle => ( + Box::new(MerkleStage::default_execution()), + Some(Box::new(MerkleStage::default_unwind())), + ), + _ => return Ok(()), + }; + let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage); + + let mut input = ExecInput { + previous_stage: Some((StageId("No Previous Stage"), self.to)), + stage_progress: Some(self.from), + }; + + let mut unwind = + UnwindInput { stage_progress: self.to, unwind_to: self.from, bad_block: None }; + + if !self.skip_unwind { + while unwind.stage_progress > self.from { + let unwind_output = unwind_stage.unwind(&mut tx, unwind).await?; + unwind.stage_progress = unwind_output.stage_progress; } - StageEnum::TxLookup => { - let mut stage = TransactionLookupStage::new(num_blocks); + } - // Unwind first - if !self.skip_unwind { - stage.unwind(&mut tx, unwind).await?; - } - - stage.execute(&mut tx, input).await?; - } - StageEnum::Merkle => { - let mut stage = MerkleStage::default_execution(); - - // Unwind first - if !self.skip_unwind { - stage.unwind(&mut tx, unwind).await?; - } - - stage.execute(&mut tx, input).await?; - } - _ => {} + while let ExecOutput { stage_progress, done: false } = + exec_stage.execute(&mut tx, input).await? + { + input.stage_progress = Some(stage_progress) } Ok(()) diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index f15b6ef2bc..cf7294c1f9 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -280,6 +280,8 @@ where self.listeners .notify(PipelineEvent::Unwound { stage_id, result: unwind_output }); + + tx.commit()?; } Err(err) => { self.listeners.notify(PipelineEvent::Error { stage_id }); @@ -289,7 +291,6 @@ where } } - tx.commit()?; Ok(()) } diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 2fbdccef75..d1be0945f5 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -3,7 +3,10 @@ use async_trait::async_trait; use reth_db::database::Database; use reth_primitives::BlockNumber; use reth_provider::Transaction; -use std::{cmp::min, ops::RangeInclusive}; +use std::{ + cmp::{max, min}, + ops::RangeInclusive, +}; /// Stage execution input, see [Stage::execute]. #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] @@ -42,8 +45,8 @@ impl ExecInput { &self, threshold: u64, ) -> (RangeInclusive, bool) { - // plus +1 is to skip present block and allways start from block number 1, not 0. let current_block = self.stage_progress.unwrap_or_default(); + // +1 is to skip present block and always start from block number 1, not 0. let start = current_block + 1; let target = self.previous_stage_progress(); @@ -66,20 +69,26 @@ pub struct UnwindInput { } impl UnwindInput { - /// Return next block range that needs to be executed. + /// Return next block range that needs to be unwound. pub fn unwind_block_range(&self) -> RangeInclusive { - self.unwind_block_range_with_threshold(u64::MAX) + self.unwind_block_range_with_threshold(u64::MAX).0 } - /// Return the next block range to execute. - pub fn unwind_block_range_with_threshold(&self, threshold: u64) -> RangeInclusive { - // plus +1 is to skip present block. - let start = self.unwind_to + 1; - let mut end = self.stage_progress; + /// Return the next block range to unwind and the block we're unwinding to. + pub fn unwind_block_range_with_threshold( + &self, + threshold: u64, + ) -> (RangeInclusive, BlockNumber, bool) { + // +1 is to skip the block we're unwinding to + let mut start = self.unwind_to + 1; + let end = self.stage_progress; - end = min(end, start.saturating_add(threshold)); + start = max(start, end.saturating_sub(threshold)); - start..=end + let unwind_to = start - 1; + + let is_final_range = unwind_to == self.unwind_to; + (start..=end, unwind_to, is_final_range) } } diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index f9ea757307..7c52a6621c 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -155,7 +155,7 @@ impl Stage for BodyStage { // - We got fewer blocks than our target // - We reached our target and the target was not limited by the batch size of the stage let done = highest_block == to_block; - info!(target: "sync::stages::bodies", stage_progress = highest_block, target = to_block, done, "Sync iteration finished"); + info!(target: "sync::stages::bodies", stage_progress = highest_block, target = to_block, is_final_range = done, "Stage iteration finished"); Ok(ExecOutput { stage_progress: highest_block, done }) } @@ -165,7 +165,6 @@ impl Stage for BodyStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - info!(target: "sync::stages::bodies", to_block = input.unwind_to, "Unwinding"); // Cursors to unwind bodies, ommers let mut body_cursor = tx.cursor_write::()?; let mut transaction_cursor = tx.cursor_write::()?; @@ -209,6 +208,7 @@ impl Stage for BodyStage { tx.delete::(number, None)?; } + info!(target: "sync::stages::bodies", to_block = input.unwind_to, stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished"); Ok(UnwindOutput { stage_progress: input.unwind_to }) } } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index eaabd96ceb..41b7f73e37 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -141,7 +141,7 @@ impl ExecutionStage { let mut executor = self.executor_factory.with_sp(LatestStateProviderRef::new(&**tx)); // Progress tracking - let mut progress = start_block; + let mut stage_progress = start_block; // Execute block range let mut state = PostState::default(); @@ -164,7 +164,7 @@ impl ExecutionStage { // Merge state changes state.extend(block_state); - progress = block_number; + stage_progress = block_number; // Write history periodically to free up memory if self.thresholds.should_write_history(state.changeset_size() as u64) { @@ -176,7 +176,6 @@ impl ExecutionStage { // Check if we should commit now if self.thresholds.is_end_of_batch(block_number - start_block, state.size() as u64) { - info!(target: "sync::stages::execution", ?block_number, "Threshold hit, committing."); break } } @@ -186,7 +185,10 @@ impl ExecutionStage { trace!(target: "sync::stages::execution", accounts = state.accounts().len(), "Writing updated state to database"); state.write_to_db(&**tx)?; trace!(target: "sync::stages::execution", took = ?Instant::now().duration_since(start), "Wrote state"); - Ok(ExecOutput { stage_progress: progress, done: progress == max_block }) + + let is_final_range = stage_progress == max_block; + info!(target: "sync::stages::execution", stage_progress, is_final_range, "Stage iteration finished"); + Ok(ExecOutput { stage_progress, done: is_final_range }) } } @@ -237,22 +239,21 @@ impl Stage for ExecutionStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - info!(target: "sync::stages::execution", to_block = input.unwind_to, "Unwinding"); - // Acquire changeset cursors let mut account_changeset = tx.cursor_dup_write::()?; let mut storage_changeset = tx.cursor_dup_write::()?; - let block_range = input.unwind_to + 1..=input.stage_progress; + let (range, unwind_to, is_final_range) = + input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX)); - if block_range.is_empty() { + if range.is_empty() { return Ok(UnwindOutput { stage_progress: input.unwind_to }) } // get all batches for account change // Check if walk and walk_dup would do the same thing let account_changeset_batch = - account_changeset.walk_range(block_range.clone())?.collect::, _>>()?; + account_changeset.walk_range(range.clone())?.collect::, _>>()?; // revert all changes to PlainState for (_, changeset) in account_changeset_batch.into_iter().rev() { @@ -265,7 +266,7 @@ impl Stage for ExecutionStage { // get all batches for storage change let storage_changeset_batch = storage_changeset - .walk_range(BlockNumberAddress::range(block_range.clone()))? + .walk_range(BlockNumberAddress::range(range.clone()))? .collect::, _>>()?; // revert all changes to PlainStorage @@ -286,7 +287,7 @@ impl Stage for ExecutionStage { // Discard unwinded changesets let mut rev_acc_changeset_walker = account_changeset.walk_back(None)?; while let Some((block_num, _)) = rev_acc_changeset_walker.next().transpose()? { - if block_num < *block_range.start() { + if block_num <= unwind_to { break } // delete all changesets @@ -295,14 +296,15 @@ impl Stage for ExecutionStage { let mut rev_storage_changeset_walker = storage_changeset.walk_back(None)?; while let Some((key, _)) = rev_storage_changeset_walker.next().transpose()? { - if key.block_number() < *block_range.start() { + if key.block_number() < *range.start() { break } // delete all changesets tx.delete::(key, None)?; } - Ok(UnwindOutput { stage_progress: input.unwind_to }) + info!(target: "sync::stages::execution", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished"); + Ok(UnwindOutput { stage_progress: unwind_to }) } } diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 360cdff546..c66b359257 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -255,6 +255,7 @@ impl Stage for AccountHashingStage { if next_address.is_some() { // from block is correct here as were are iteration over state for this // particular block + info!(target: "sync::stages::hashing_account", stage_progress = input.stage_progress(), is_final_range = false, "Stage iteration finished"); return Ok(ExecOutput { stage_progress: input.stage_progress(), done: false }) } } else { @@ -269,7 +270,7 @@ impl Stage for AccountHashingStage { tx.insert_account_for_hashing(accounts.into_iter())?; } - info!(target: "sync::stages::hashing_account", "Stage finished"); + info!(target: "sync::stages::hashing_account", stage_progress = input.previous_stage_progress(), is_final_range = true, "Stage iteration finished"); Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true }) } @@ -279,15 +280,14 @@ impl Stage for AccountHashingStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - // There is no threshold on account unwind, we will always take changesets and - // apply past values to HashedAccount table. - - let range = input.unwind_block_range(); + let (range, unwind_progress, is_final_range) = + input.unwind_block_range_with_threshold(self.commit_threshold); // Aggregate all transition changesets and and make list of account that have been changed. tx.unwind_account_hashing(range)?; - Ok(UnwindOutput { stage_progress: input.unwind_to }) + info!(target: "sync::stages::hashing_account", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished"); + Ok(UnwindOutput { stage_progress: unwind_progress }) } } diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 8b4d6cc672..19e637b7a7 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -181,6 +181,7 @@ impl Stage for StorageHashingStage { if current_key.is_some() { // `from_block` is correct here as were are iteration over state for this // particular block. + info!(target: "sync::stages::hashing_storage", stage_progress = input.stage_progress(), is_final_range = false, "Stage iteration finished"); return Ok(ExecOutput { stage_progress: input.stage_progress(), done: false }) } } else { @@ -194,7 +195,7 @@ impl Stage for StorageHashingStage { tx.insert_storage_for_hashing(storages.into_iter())?; } - info!(target: "sync::stages::hashing_storage", "Stage finished"); + info!(target: "sync::stages::hashing_storage", stage_progress = input.previous_stage_progress(), is_final_range = true, "Stage iteration finished"); Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true }) } @@ -204,11 +205,13 @@ impl Stage for StorageHashingStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let range = input.unwind_block_range(); + let (range, unwind_progress, is_final_range) = + input.unwind_block_range_with_threshold(self.commit_threshold); tx.unwind_storage_hashing(BlockNumberAddress::range(range))?; - Ok(UnwindOutput { stage_progress: input.unwind_to }) + info!(target: "sync::stages::hashing_storage", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished"); + Ok(UnwindOutput { stage_progress: unwind_progress }) } } diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index b419e72321..3c38985283 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -241,12 +241,13 @@ where input: UnwindInput, ) -> Result { // TODO: handle bad block - info!(target: "sync::stages::headers", to_block = input.unwind_to, "Unwinding"); tx.unwind_table_by_walker::( input.unwind_to + 1, )?; tx.unwind_table_by_num::(input.unwind_to)?; tx.unwind_table_by_num::(input.unwind_to)?; + + info!(target: "sync::stages::headers", to_block = input.unwind_to, stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished"); Ok(UnwindOutput { stage_progress: input.unwind_to }) } } diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index f5e2abd367..d2035e6867 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -46,7 +46,7 @@ impl Stage for IndexAccountHistoryStage { // Insert changeset to history index tx.insert_account_history_index(indices)?; - info!(target: "sync::stages::index_account_history", "Stage finished"); + info!(target: "sync::stages::index_account_history", stage_progress = *range.end(), is_final_range, "Stage iteration finished"); Ok(ExecOutput { stage_progress: *range.end(), done: is_final_range }) } @@ -56,13 +56,14 @@ impl Stage for IndexAccountHistoryStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - info!(target: "sync::stages::index_account_history", to_block = input.unwind_to, "Unwinding"); - let range = input.unwind_block_range(); + let (range, unwind_progress, is_final_range) = + input.unwind_block_range_with_threshold(self.commit_threshold); tx.unwind_account_history_indices(range)?; + info!(target: "sync::stages::index_account_history", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished"); // from HistoryIndex higher than that number. - Ok(UnwindOutput { stage_progress: input.unwind_to }) + Ok(UnwindOutput { stage_progress: unwind_progress }) } } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index c9d5fd4d1b..df1b6719ae 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -46,7 +46,7 @@ impl Stage for IndexStorageHistoryStage { let indices = tx.get_storage_transition_ids_from_changeset(range.clone())?; tx.insert_storage_history_index(indices)?; - info!(target: "sync::stages::index_storage_history", "Stage finished"); + info!(target: "sync::stages::index_storage_history", stage_progress = *range.end(), done = is_final_range, "Stage iteration finished"); Ok(ExecOutput { stage_progress: *range.end(), done: is_final_range }) } @@ -56,12 +56,13 @@ impl Stage for IndexStorageHistoryStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - info!(target: "sync::stages::index_account_history", to_block = input.unwind_to, "Unwinding"); - let range = input.unwind_block_range(); + let (range, unwind_progress, is_final_range) = + input.unwind_block_range_with_threshold(self.commit_threshold); tx.unwind_storage_history_indices(BlockNumberAddress::range(range))?; - Ok(UnwindOutput { stage_progress: input.unwind_to }) + info!(target: "sync::stages::index_storage_history", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished"); + Ok(UnwindOutput { stage_progress: unwind_progress }) } } diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index d050c48e1a..aecd3e63d8 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -226,7 +226,7 @@ impl Stage for MerkleStage { self.validate_state_root(trie_root, block_root, to_block)?; - info!(target: "sync::stages::merkle::exec", "Stage finished"); + info!(target: "sync::stages::merkle::exec", stage_progress = to_block, is_final_range = true, "Stage iteration finished"); Ok(ExecOutput { stage_progress: to_block, done: true }) } @@ -238,13 +238,14 @@ impl Stage for MerkleStage { ) -> Result { let range = input.unwind_block_range(); if matches!(self, MerkleStage::Execution { .. }) { - info!(target: "sync::stages::merkle::exec", "Stage is always skipped"); + info!(target: "sync::stages::merkle::unwind", "Stage is always skipped"); return Ok(UnwindOutput { stage_progress: input.unwind_to }) } if input.unwind_to == 0 { tx.clear::()?; tx.clear::()?; + info!(target: "sync::stages::merkle::unwind", stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished"); return Ok(UnwindOutput { stage_progress: input.unwind_to }) } @@ -264,7 +265,7 @@ impl Stage for MerkleStage { info!(target: "sync::stages::merkle::unwind", "Nothing to unwind"); } - info!(target: "sync::stages::merkle::unwind", "Stage finished"); + info!(target: "sync::stages::merkle::unwind", stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished"); Ok(UnwindOutput { stage_progress: input.unwind_to }) } } diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 2c8d30f7c9..127b64d51d 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -136,7 +136,7 @@ impl Stage for SenderRecoveryStage { } } - info!(target: "sync::stages::sender_recovery", stage_progress = end_block, is_final_range, "Sync iteration finished"); + info!(target: "sync::stages::sender_recovery", stage_progress = end_block, is_final_range, "Stage iteration finished"); Ok(ExecOutput { stage_progress: end_block, done: is_final_range }) } @@ -146,11 +146,15 @@ impl Stage for SenderRecoveryStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - info!(target: "sync::stages::sender_recovery", to_block = input.unwind_to, "Unwinding"); + let (_, unwind_to, is_final_range) = + input.unwind_block_range_with_threshold(self.commit_threshold); + // Lookup latest tx id that we should unwind to - let latest_tx_id = tx.block_body_indices(input.unwind_to)?.last_tx_num(); + let latest_tx_id = tx.block_body_indices(unwind_to)?.last_tx_num(); tx.unwind_table_by_num::(latest_tx_id)?; - Ok(UnwindOutput { stage_progress: input.unwind_to }) + + info!(target: "sync::stages::sender_recovery", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished"); + Ok(UnwindOutput { stage_progress: unwind_to }) } } diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index 88dd0a84d9..4f3c242e10 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -81,7 +81,7 @@ impl Stage for TotalDifficultyStage { .map_err(|error| StageError::Validation { block: header.number, error })?; cursor_td.append(block_number, td.into())?; } - info!(target: "sync::stages::total_difficulty", stage_progress = end_block, is_final_range, "Sync iteration finished"); + info!(target: "sync::stages::total_difficulty", stage_progress = end_block, is_final_range, "Stage iteration finished"); Ok(ExecOutput { stage_progress: end_block, done: is_final_range }) } @@ -91,9 +91,13 @@ impl Stage for TotalDifficultyStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - info!(target: "sync::stages::total_difficulty", to_block = input.unwind_to, "Unwinding"); - tx.unwind_table_by_num::(input.unwind_to)?; - Ok(UnwindOutput { stage_progress: input.unwind_to }) + let (_, unwind_to, is_final_range) = + input.unwind_block_range_with_threshold(self.commit_threshold); + + tx.unwind_table_by_num::(unwind_to)?; + + info!(target: "sync::stages::total_difficulty", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished"); + Ok(UnwindOutput { stage_progress: unwind_to }) } } diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index a88b925886..e7a102d4db 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -143,7 +143,7 @@ impl Stage for TransactionLookupStage { } } - info!(target: "sync::stages::transaction_lookup", stage_progress = end_block, is_final_range, "Sync iteration finished"); + info!(target: "sync::stages::transaction_lookup", stage_progress = end_block, is_final_range, "Stage iteration finished"); Ok(ExecOutput { done: is_final_range, stage_progress: end_block }) } @@ -153,14 +153,16 @@ impl Stage for TransactionLookupStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - info!(target: "sync::stages::transaction_lookup", to_block = input.unwind_to, "Unwinding"); + let (range, unwind_to, is_final_range) = + input.unwind_block_range_with_threshold(self.commit_threshold); + // Cursors to unwind tx hash to number let mut body_cursor = tx.cursor_read::()?; let mut tx_hash_number_cursor = tx.cursor_write::()?; let mut transaction_cursor = tx.cursor_read::()?; - let mut rev_walker = body_cursor.walk_back(None)?; + let mut rev_walker = body_cursor.walk_back(Some(*range.end()))?; while let Some((number, body)) = rev_walker.next().transpose()? { - if number <= input.unwind_to { + if number <= unwind_to { break } @@ -175,7 +177,8 @@ impl Stage for TransactionLookupStage { } } - Ok(UnwindOutput { stage_progress: input.unwind_to }) + info!(target: "sync::stages::transaction_lookup", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished"); + Ok(UnwindOutput { stage_progress: unwind_to }) } }