From 0bf360e460715496e1a0bec6f62f5d5d0f81a710 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Tue, 6 Jun 2023 23:51:19 +0300 Subject: [PATCH] feat(stages): normalized sender recovery commits (#3013) Co-authored-by: Georgios Konstantopoulos --- crates/config/src/config.rs | 4 +- crates/stages/src/stage.rs | 51 ++++++- crates/stages/src/stages/execution.rs | 2 +- crates/stages/src/stages/hashing_account.rs | 2 +- crates/stages/src/stages/hashing_storage.rs | 2 +- .../src/stages/index_account_history.rs | 5 +- .../src/stages/index_storage_history.rs | 5 +- crates/stages/src/stages/sender_recovery.rs | 128 +++++++++++------- crates/stages/src/stages/tx_lookup.rs | 2 +- crates/stages/src/test_utils/test_db.rs | 1 + 10 files changed, 132 insertions(+), 70 deletions(-) diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index 7f4b720944..a8faafaf74 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -178,13 +178,13 @@ impl From for BodiesDownloaderBuilder { #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)] #[serde(default)] pub struct SenderRecoveryConfig { - /// The maximum number of blocks to process before committing progress to the database. + /// The maximum number of transactions to process before committing progress to the database. 
pub commit_threshold: u64, } impl Default for SenderRecoveryConfig { fn default() -> Self { - Self { commit_threshold: 5_000 } + Self { commit_threshold: 50_000 } } } diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 64b462d571..0abad4d9f4 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -1,11 +1,11 @@ use crate::error::StageError; use async_trait::async_trait; -use reth_db::database::Database; +use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx}; use reth_primitives::{ stage::{StageCheckpoint, StageId}, - BlockNumber, + BlockNumber, TxNumber, }; -use reth_provider::Transaction; +use reth_provider::{ProviderError, Transaction}; use std::{ cmp::{max, min}, ops::RangeInclusive, @@ -26,6 +26,18 @@ impl ExecInput { self.checkpoint.unwrap_or_default() } + /// Return the next block number after the current + /// +1 is needed to skip the present block and always start from block number 1, not 0. + pub fn next_block(&self) -> BlockNumber { + let current_block = self.checkpoint(); + current_block.block_number + 1 + } + + /// Returns `true` if the target block number has already been reached. + pub fn target_reached(&self) -> bool { + self.checkpoint().block_number >= self.previous_stage_checkpoint_block_number() + } + /// Return the progress of the previous stage or default. pub fn previous_stage_checkpoint_block_number(&self) -> BlockNumber { self.previous_stage.map(|(_, block_number)| block_number).unwrap_or_default() @@ -49,7 +61,6 @@ impl ExecInput { threshold: u64, ) -> (RangeInclusive, bool) { let current_block = self.checkpoint(); - // +1 is to skip present block and always start from block number 1, not 0. 
let start = current_block.block_number + 1; let target = self.previous_stage_checkpoint_block_number(); @@ -58,6 +69,38 @@ impl ExecInput { let is_final_range = end == target; (start..=end, is_final_range) } + + /// Return the next block range determined by the number of transactions within it. + /// This function walks the block indices until either the end of the range is reached or + /// the number of transactions exceeds the threshold. + pub fn next_block_range_with_transaction_threshold( + &self, + tx: &Transaction<'_, DB>, + tx_threshold: u64, + ) -> Result<(RangeInclusive, RangeInclusive, bool), StageError> { + let start_block = self.next_block(); + let start_block_body = tx + .get::(start_block)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(start_block))?; + + let target_block = self.previous_stage_checkpoint_block_number(); + + let first_tx_number = start_block_body.first_tx_num(); + let mut last_tx_number = start_block_body.last_tx_num(); + let mut end_block_number = start_block; + let mut body_indices_cursor = tx.cursor_read::()?; + for entry in body_indices_cursor.walk_range(start_block..=target_block)? { + let (block, body) = entry?; + last_tx_number = body.last_tx_num(); + end_block_number = block; + let tx_count = (first_tx_number..=last_tx_number).count() as u64; + if tx_count > tx_threshold { + break + } + } + let is_final_range = end_block_number >= target_block; + Ok((first_tx_number..=last_tx_number, start_block..=end_block_number, is_final_range)) + } } /// Stage unwind input, see [Stage::unwind]. 
diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 6021c1a404..63bb89efd7 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -138,7 +138,7 @@ impl ExecutionStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let start_block = input.checkpoint().block_number + 1; + let start_block = input.next_block(); let max_block = input.previous_stage_checkpoint_block_number(); // Build executor diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 82d1f68235..39def8647e 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -552,7 +552,7 @@ mod tests { output: Option, ) -> Result<(), TestRunnerError> { if let Some(output) = output { - let start_block = input.checkpoint().block_number + 1; + let start_block = input.next_block(); let end_block = output.checkpoint.block_number; if start_block > end_block { return Ok(()) diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index c25cd94385..fc19a20506 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -486,7 +486,7 @@ mod tests { type Seed = Vec; fn seed_execution(&mut self, input: ExecInput) -> Result { - let stage_progress = input.checkpoint().block_number + 1; + let stage_progress = input.next_block(); let end = input.previous_stage_checkpoint_block_number(); let n_accounts = 31; diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index b85695774b..a435f99b24 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -216,10 +216,7 @@ mod tests { ExecOutput { checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint( IndexHistoryCheckpoint { - block_range: 
CheckpointBlockRange { - from: input.checkpoint().block_number + 1, - to: run_to - }, + block_range: CheckpointBlockRange { from: input.next_block(), to: run_to }, progress: EntitiesCheckpoint { processed: 2, total: 2 } } ), diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index 19d5427ae1..6bb328d0ba 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -229,10 +229,7 @@ mod tests { ExecOutput { checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint( IndexHistoryCheckpoint { - block_range: CheckpointBlockRange { - from: input.checkpoint().block_number + 1, - to: run_to - }, + block_range: CheckpointBlockRange { from: input.next_block(), to: run_to }, progress: EntitiesCheckpoint { processed: 2, total: 2 } } ), diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 3cc0f12153..a3cf8c22ac 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -38,7 +38,7 @@ impl SenderRecoveryStage { impl Default for SenderRecoveryStage { fn default() -> Self { - Self { commit_threshold: 500_000 } + Self { commit_threshold: 50_000 } } } @@ -59,21 +59,17 @@ impl Stage for SenderRecoveryStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); - if range.is_empty() { - return Ok(ExecOutput::done(*range.end())) + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint().block_number)) } - let (start_block, end_block) = range.clone().into_inner(); - // Look up the start index for the transaction range - let first_tx_num = tx.block_body_indices(start_block)?.first_tx_num(); - - // Look up the end index for transaction range (inclusive) - let last_tx_num = tx.block_body_indices(end_block)?.last_tx_num(); + let 
(tx_range, block_range, is_final_range) = + input.next_block_range_with_transaction_threshold(tx, self.commit_threshold)?; + let end_block = *block_range.end(); // No transactions to walk over - if first_tx_num > last_tx_num { - info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Target transaction already reached"); + if tx_range.is_empty() { + info!(target: "sync::stages::sender_recovery", ?tx_range, "Target transaction already reached"); return Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block) .with_entities_stage_checkpoint(stage_checkpoint(tx)?), @@ -87,11 +83,11 @@ impl Stage for SenderRecoveryStage { // Acquire the cursor over the transactions let mut tx_cursor = tx.cursor_read::>()?; // Walk the transactions from start to end index (inclusive) - let tx_walker = - tx_cursor.walk_range(RawKey::new(first_tx_num)..=RawKey::new(last_tx_num))?; + let raw_tx_range = RawKey::new(*tx_range.start())..=RawKey::new(*tx_range.end()); + let tx_walker = tx_cursor.walk_range(raw_tx_range)?; // Iterate over transactions in chunks - info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Recovering senders"); + info!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders"); // channels used to return result of sender recovery. let mut channels = Vec::new(); @@ -102,12 +98,11 @@ impl Stage for SenderRecoveryStage { // We try to evenly divide the transactions to recover across all threads in the threadpool. // Chunks are submitted instead of individual transactions to reduce the overhead of work // stealing in the threadpool workers. 
- for chunk in - &tx_walker.chunks(self.commit_threshold as usize / rayon::current_num_threads()) - { + let chunk_size = self.commit_threshold as usize / rayon::current_num_threads(); + for chunk in &tx_walker.chunks(chunk_size) { // An _unordered_ channel to receive results from a rayon job - let (tx, rx) = mpsc::unbounded_channel(); - channels.push(rx); + let (recovered_senders_tx, recovered_senders_rx) = mpsc::unbounded_channel(); + channels.push(recovered_senders_rx); // Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send) let chunk: Vec<_> = chunk.collect(); @@ -140,7 +135,8 @@ impl Stage for SenderRecoveryStage { let mut rlp_buf = Vec::with_capacity(128); for entry in chunk { rlp_buf.clear(); - let _ = tx.send(recover(entry, &mut rlp_buf)); + let recovery_result = recover_sender(entry, &mut rlp_buf); + let _ = recovered_senders_tx.send(recovery_result); } }); } @@ -204,6 +200,24 @@ impl Stage for SenderRecoveryStage { } } +fn recover_sender( + entry: Result<(RawKey, RawValue), DatabaseError>, + rlp_buf: &mut Vec, +) -> Result<(u64, H160), Box> { + let (tx_id, transaction) = entry.map_err(|e| Box::new(e.into()))?; + let tx_id = tx_id.key().expect("key to be formated"); + + let tx = transaction.value().expect("value to be formated"); + tx.transaction.encode_without_signature(rlp_buf); + + let sender = tx + .signature + .recover_signer(keccak256(rlp_buf)) + .ok_or(StageError::from(SenderRecoveryStageError::SenderRecovery { tx: tx_id }))?; + + Ok((tx_id, sender)) +} + fn stage_checkpoint( tx: &Transaction<'_, DB>, ) -> Result { @@ -289,53 +303,63 @@ mod tests { /// Execute the stage twice with input range that exceeds the commit threshold #[tokio::test] async fn execute_intermediate_commit() { - let threshold = 50; + let threshold = 10; let mut runner = SenderRecoveryTestRunner::default(); runner.set_threshold(threshold); let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold + + // Manually seed once with 
full input range + let seed = random_block_range(stage_progress + 1..=previous_stage, H256::zero(), 0..4); // set tx count range high enough to hit the threshold + runner.tx.insert_blocks(seed.iter(), None).expect("failed to seed execution"); + + let total_transactions = runner.tx.table::().unwrap().len() as u64; + let first_input = ExecInput { previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; - // Seed only once with full input range - runner.seed_execution(first_input).expect("failed to seed execution"); - - let total_transactions = runner.tx.table::().unwrap().len() as u64; - // Execute first time let result = runner.execute(first_input).await.unwrap(); - let expected_progress = stage_progress + threshold; - assert_matches!( - result, - Ok(ExecOutput { checkpoint: StageCheckpoint { - block_number, - stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { - processed, - total - })) - }, done: false }) if block_number == expected_progress && - processed == runner.tx.table::().unwrap().len() as u64 && - total == total_transactions + let mut tx_count = 0; + let expected_progress = seed + .iter() + .find(|x| { + tx_count += x.body.len(); + tx_count as u64 > threshold + }) + .map(|x| x.number) + .unwrap_or(previous_stage); + assert_matches!(result, Ok(_)); + assert_eq!( + result.unwrap(), + ExecOutput { + checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint( + EntitiesCheckpoint { + processed: runner.tx.table::().unwrap().len() as u64, + total: total_transactions + } + ), + done: false + } ); - // Execute second time + // Execute second time to completion + runner.set_threshold(u64::MAX); let second_input = ExecInput { previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(expected_progress)), }; let result = runner.execute(second_input).await.unwrap(); - assert_matches!( - result, - Ok(ExecOutput { checkpoint: 
StageCheckpoint { - block_number, - stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { - processed, - total - })) - }, done: true }) if block_number == previous_stage && - processed == runner.tx.table::().unwrap().len() as u64 && - total == total_transactions + assert_matches!(result, Ok(_)); + assert_eq!( + result.as_ref().unwrap(), + &ExecOutput { + checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint( + EntitiesCheckpoint { processed: total_transactions, total: total_transactions } + ), + done: true + } ); assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed"); @@ -410,7 +434,7 @@ mod tests { ) -> Result<(), TestRunnerError> { match output { Some(output) => self.tx.query(|tx| { - let start_block = input.checkpoint().block_number + 1; + let start_block = input.next_block(); let end_block = output.checkpoint.block_number; if start_block > end_block { diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index b575de9ac7..2471000127 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -389,7 +389,7 @@ mod tests { ) -> Result<(), TestRunnerError> { match output { Some(output) => self.tx.query(|tx| { - let start_block = input.checkpoint().block_number + 1; + let start_block = input.next_block(); let end_block = output.checkpoint.block_number; if start_block > end_block { diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index eee758cc9c..41a0f49569 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -19,6 +19,7 @@ use reth_provider::Transaction; use std::{ borrow::Borrow, collections::BTreeMap, + ops::RangeInclusive, path::{Path, PathBuf}, sync::Arc, };