From 1c2726290207c4f1740e966cc4870dfe6cb4e30e Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Thu, 11 Jul 2024 22:27:33 +0200 Subject: [PATCH] chore: move `get` methods to `DatabaseProvider` (#9454) --- .../src/providers/database/provider.rs | 485 +++++++++--------- crates/storage/provider/src/traits/block.rs | 13 +- 2 files changed, 252 insertions(+), 246 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 393256d8b6..caa8c16297 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -5,10 +5,10 @@ use crate::{ traits::{ AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter, }, - AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter, - EvmEnvProvider, FinalizedBlockReader, FinalizedBlockWriter, HashingWriter, HeaderProvider, - HeaderSyncGap, HeaderSyncGapProvider, HistoricalStateProvider, HistoryWriter, - LatestStateProvider, OriginalValuesKnown, ProviderError, PruneCheckpointReader, + AccountReader, BlockExecutionReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, + BlockReader, BlockWriter, EvmEnvProvider, FinalizedBlockReader, FinalizedBlockWriter, + HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HistoricalStateProvider, + HistoryWriter, LatestStateProvider, OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RequestsProvider, StageCheckpointReader, StateProviderBox, StateWriter, StatsReader, StorageReader, TransactionVariant, TransactionsProvider, TransactionsProviderExt, WithdrawalsProvider, @@ -306,6 +306,15 @@ impl DatabaseProvider { .collect::, DatabaseError>>() } + /// Return a list of entries from the table, based on the given range. 
+ #[inline] + pub fn get( + &self, + range: impl RangeBounds, + ) -> Result>, DatabaseError> { + self.tx.cursor_read::()?.walk_range(range)?.collect::, _>>() + } + /// Iterates over read only values in the given table and collects them into a vector. /// /// Early-returns if the range is empty, without opening a cursor transaction. @@ -577,12 +586,226 @@ impl DatabaseProvider { assemble_block(header, body, ommers, withdrawals, requests, senders) }) } -} -impl DatabaseProvider { - /// Commit database transaction. - pub fn commit(self) -> ProviderResult { - Ok(self.tx.commit()?) + /// Get requested blocks transaction with senders + pub(crate) fn get_block_transaction_range( + &self, + range: impl RangeBounds + Clone, + ) -> ProviderResult)>> { + // Read range of block bodies to get all transaction ids of this range. + let block_bodies = self.get::(range)?; + + if block_bodies.is_empty() { + return Ok(Vec::new()) + } + + // Compute the first and last tx ID in the range + let first_transaction = block_bodies.first().expect("If we have headers").1.first_tx_num(); + let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_num(); + + // If this is the case then all of the blocks in the range are empty + if last_transaction < first_transaction { + return Ok(block_bodies.into_iter().map(|(n, _)| (n, Vec::new())).collect()) + } + + // Get transactions and senders + let transactions = self + .get::(first_transaction..=last_transaction)? + .into_iter() + .map(|(id, tx)| (id, tx.into())) + .collect::>(); + + let mut senders = + self.get::(first_transaction..=last_transaction)?; + + // Recover senders manually if not found in db + // NOTE: Transactions are always guaranteed to be in the database whereas + // senders might be pruned. 
+ if senders.len() != transactions.len() { + if senders.len() > transactions.len() { + error!(target: "providers::db", senders=%senders.len(), transactions=%transactions.len(), + first_tx=%first_transaction, last_tx=%last_transaction, + "unexpected senders and transactions mismatch"); + } + let missing = transactions.len().saturating_sub(senders.len()); + senders.reserve(missing); + // Find all missing senders, their corresponding tx numbers and indexes to the original + // `senders` vector at which the recovered senders will be inserted. + let mut missing_senders = Vec::with_capacity(missing); + { + let mut senders = senders.iter().peekable(); + + // `transactions` contain all entries. `senders` contain _some_ of the senders for + // these transactions. Both are sorted and indexed by `TxNumber`. + // + // The general idea is to iterate on both `transactions` and `senders`, and advance + // the `senders` iteration only if it matches the current `transactions` entry's + // `TxNumber`. Otherwise, add the transaction to the list of missing senders. + for (i, (tx_number, transaction)) in transactions.iter().enumerate() { + if let Some((sender_tx_number, _)) = senders.peek() { + if sender_tx_number == tx_number { + // If current sender's `TxNumber` matches current transaction's + // `TxNumber`, advance the senders iterator. + senders.next(); + } else { + // If current sender's `TxNumber` doesn't match current transaction's + // `TxNumber`, add it to missing senders. 
+ missing_senders.push((i, tx_number, transaction)); + } + } else { + // If there's no more senders left, but we're still iterating over + // transactions, add them to missing senders + missing_senders.push((i, tx_number, transaction)); + } + } + } + + // Recover senders + let recovered_senders = TransactionSigned::recover_signers( + missing_senders.iter().map(|(_, _, tx)| *tx).collect::>(), + missing_senders.len(), + ) + .ok_or(ProviderError::SenderRecoveryError)?; + + // Insert recovered senders along with tx numbers at the corresponding indexes to the + // original `senders` vector + for ((i, tx_number, _), sender) in missing_senders.into_iter().zip(recovered_senders) { + // Insert will put recovered senders at necessary positions and shift the rest + senders.insert(i, (*tx_number, sender)); + } + + // Debug assertions which are triggered during the test to ensure that all senders are + // present and sorted + debug_assert_eq!(senders.len(), transactions.len(), "missing one or more senders"); + debug_assert!( + senders.iter().tuple_windows().all(|(a, b)| a.0 < b.0), + "senders not sorted" + ); + } + + // Merge transaction into blocks + let mut block_tx = Vec::with_capacity(block_bodies.len()); + let mut senders = senders.into_iter(); + let mut transactions = transactions.into_iter(); + for (block_number, block_body) in block_bodies { + let mut one_block_tx = Vec::with_capacity(block_body.tx_count as usize); + for _ in block_body.tx_num_range() { + let tx = transactions.next(); + let sender = senders.next(); + + let recovered = match (tx, sender) { + (Some((tx_id, tx)), Some((sender_tx_id, sender))) => { + if tx_id != sender_tx_id { + Err(ProviderError::MismatchOfTransactionAndSenderId { tx_id }) + } else { + Ok(TransactionSignedEcRecovered::from_signed_transaction(tx, sender)) + } + } + (Some((tx_id, _)), _) | (_, Some((tx_id, _))) => { + Err(ProviderError::MismatchOfTransactionAndSenderId { tx_id }) + } + (None, None) => 
Err(ProviderError::BlockBodyTransactionCount), + }?; + one_block_tx.push(recovered) + } + block_tx.push((block_number, one_block_tx)); + } + + Ok(block_tx) + } + + /// Get the given range of blocks. + pub fn get_block_range( + &self, + range: impl RangeBounds + Clone, + ) -> ProviderResult> { + // For blocks we need: + // + // - Headers + // - Bodies (transactions) + // - Uncles/ommers + // - Withdrawals + // - Requests + // - Signers + + let block_headers = self.get::(range.clone())?; + if block_headers.is_empty() { + return Ok(Vec::new()) + } + + let block_header_hashes = self.get::(range.clone())?; + let block_ommers = self.get::(range.clone())?; + let block_withdrawals = self.get::(range.clone())?; + let block_requests = self.get::(range.clone())?; + + let block_tx = self.get_block_transaction_range(range)?; + + // merge all into block + let block_header_iter = block_headers.into_iter(); + let block_header_hashes_iter = block_header_hashes.into_iter(); + let block_tx_iter = block_tx.into_iter(); + + // Ommers can be empty for some blocks + let mut block_ommers_iter = block_ommers.into_iter(); + let mut block_withdrawals_iter = block_withdrawals.into_iter(); + let mut block_requests_iter = block_requests.into_iter(); + let mut block_ommers = block_ommers_iter.next(); + let mut block_withdrawals = block_withdrawals_iter.next(); + let mut block_requests = block_requests_iter.next(); + + let mut blocks = Vec::new(); + for ((main_block_number, header), (_, header_hash), (_, tx)) in + izip!(block_header_iter.into_iter(), block_header_hashes_iter, block_tx_iter) + { + let header = header.seal(header_hash); + + let (body, senders) = tx.into_iter().map(|tx| tx.to_components()).unzip(); + + // Ommers can be missing + let mut ommers = Vec::new(); + if let Some((block_number, _)) = block_ommers.as_ref() { + if *block_number == main_block_number { + ommers = block_ommers.take().unwrap().1.ommers; + block_ommers = block_ommers_iter.next(); + } + }; + + // withdrawal can be 
missing + let shanghai_is_active = + self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp); + let mut withdrawals = Some(Withdrawals::default()); + if shanghai_is_active { + if let Some((block_number, _)) = block_withdrawals.as_ref() { + if *block_number == main_block_number { + withdrawals = Some(block_withdrawals.take().unwrap().1.withdrawals); + block_withdrawals = block_withdrawals_iter.next(); + } + } + } else { + withdrawals = None + } + + // requests can be missing + let prague_is_active = self.chain_spec.is_prague_active_at_timestamp(header.timestamp); + let mut requests = Some(Requests::default()); + if prague_is_active { + if let Some((block_number, _)) = block_requests.as_ref() { + if *block_number == main_block_number { + requests = Some(block_requests.take().unwrap().1); + block_requests = block_requests_iter.next(); + } + } + } else { + requests = None; + } + + blocks.push(SealedBlockWithSenders { + block: SealedBlock { header, body, ommers, withdrawals, requests }, + senders, + }) + } + + Ok(blocks) } /// Return the last N blocks of state, recreating the [`ExecutionOutcome`]. @@ -637,8 +860,8 @@ impl DatabaseProvider { // state of end range. We should rename the functions or add support to access // History state. Accessing history state can be tricky but we are not gaining // anything. - let mut plain_accounts_cursor = self.tx.cursor_write::()?; - let mut plain_storage_cursor = self.tx.cursor_dup_write::()?; + let mut plain_accounts_cursor = self.tx.cursor_read::()?; + let mut plain_storage_cursor = self.tx.cursor_dup_read::()?; let mut reverts: RevertsInit = HashMap::new(); @@ -719,7 +942,13 @@ impl DatabaseProvider { Vec::new(), )) } +} +impl DatabaseProvider { + /// Commit database transaction. + pub fn commit(self) -> ProviderResult { + Ok(self.tx.commit()?) + } /// Take the last N blocks of state, recreating the [`ExecutionOutcome`]. 
/// /// The latest state will be unwound and returned back with all the blocks @@ -905,15 +1134,6 @@ impl DatabaseProvider { Ok(entries) } - /// Return a list of entries from the table, based on the given range. - #[inline] - pub fn get( - &self, - range: impl RangeBounds, - ) -> Result>, DatabaseError> { - self.tx.cursor_read::()?.walk_range(range)?.collect::, _>>() - } - /// Return a list of entries from the table, and remove them, based on the given range. #[inline] pub fn take( @@ -930,133 +1150,6 @@ impl DatabaseProvider { Ok(items) } - /// Get requested blocks transaction with senders - pub(crate) fn get_block_transaction_range( - &self, - range: impl RangeBounds + Clone, - ) -> ProviderResult)>> { - // Raad range of block bodies to get all transactions id's of this range. - let block_bodies = self.get::(range)?; - - if block_bodies.is_empty() { - return Ok(Vec::new()) - } - - // Compute the first and last tx ID in the range - let first_transaction = block_bodies.first().expect("If we have headers").1.first_tx_num(); - let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_num(); - - // If this is the case then all of the blocks in the range are empty - if last_transaction < first_transaction { - return Ok(block_bodies.into_iter().map(|(n, _)| (n, Vec::new())).collect()) - } - - // Get transactions and senders - let transactions = self - .get::(first_transaction..=last_transaction)? - .into_iter() - .map(|(id, tx)| (id, tx.into())) - .collect::>(); - - let mut senders = - self.get::(first_transaction..=last_transaction)?; - - // Recover senders manually if not found in db - // NOTE: Transactions are always guaranteed to be in the database whereas - // senders might be pruned. 
- if senders.len() != transactions.len() { - if senders.len() > transactions.len() { - error!(target: "providers::db", senders=%senders.len(), transactions=%transactions.len(), - first_tx=%first_transaction, last_tx=%last_transaction, - "unexpected senders and transactions mismatch"); - } - let missing = transactions.len().saturating_sub(senders.len()); - senders.reserve(missing); - // Find all missing senders, their corresponding tx numbers and indexes to the original - // `senders` vector at which the recovered senders will be inserted. - let mut missing_senders = Vec::with_capacity(missing); - { - let mut senders = senders.iter().peekable(); - - // `transactions` contain all entries. `senders` contain _some_ of the senders for - // these transactions. Both are sorted and indexed by `TxNumber`. - // - // The general idea is to iterate on both `transactions` and `senders`, and advance - // the `senders` iteration only if it matches the current `transactions` entry's - // `TxNumber`. Otherwise, add the transaction to the list of missing senders. - for (i, (tx_number, transaction)) in transactions.iter().enumerate() { - if let Some((sender_tx_number, _)) = senders.peek() { - if sender_tx_number == tx_number { - // If current sender's `TxNumber` matches current transaction's - // `TxNumber`, advance the senders iterator. - senders.next(); - } else { - // If current sender's `TxNumber` doesn't match current transaction's - // `TxNumber`, add it to missing senders. 
- missing_senders.push((i, tx_number, transaction)); - } - } else { - // If there's no more senders left, but we're still iterating over - // transactions, add them to missing senders - missing_senders.push((i, tx_number, transaction)); - } - } - } - - // Recover senders - let recovered_senders = TransactionSigned::recover_signers( - missing_senders.iter().map(|(_, _, tx)| *tx).collect::>(), - missing_senders.len(), - ) - .ok_or(ProviderError::SenderRecoveryError)?; - - // Insert recovered senders along with tx numbers at the corresponding indexes to the - // original `senders` vector - for ((i, tx_number, _), sender) in missing_senders.into_iter().zip(recovered_senders) { - // Insert will put recovered senders at necessary positions and shift the rest - senders.insert(i, (*tx_number, sender)); - } - - // Debug assertions which are triggered during the test to ensure that all senders are - // present and sorted - debug_assert_eq!(senders.len(), transactions.len(), "missing one or more senders"); - debug_assert!( - senders.iter().tuple_windows().all(|(a, b)| a.0 < b.0), - "senders not sorted" - ); - } - - // Merge transaction into blocks - let mut block_tx = Vec::with_capacity(block_bodies.len()); - let mut senders = senders.into_iter(); - let mut transactions = transactions.into_iter(); - for (block_number, block_body) in block_bodies { - let mut one_block_tx = Vec::with_capacity(block_body.tx_count as usize); - for _ in block_body.tx_num_range() { - let tx = transactions.next(); - let sender = senders.next(); - - let recovered = match (tx, sender) { - (Some((tx_id, tx)), Some((sender_tx_id, sender))) => { - if tx_id != sender_tx_id { - Err(ProviderError::MismatchOfTransactionAndSenderId { tx_id }) - } else { - Ok(TransactionSignedEcRecovered::from_signed_transaction(tx, sender)) - } - } - (Some((tx_id, _)), _) | (_, Some((tx_id, _))) => { - Err(ProviderError::MismatchOfTransactionAndSenderId { tx_id }) - } - (None, None) => 
Err(ProviderError::BlockBodyTransactionCount), - }?; - one_block_tx.push(recovered) - } - block_tx.push((block_number, one_block_tx)); - } - - Ok(block_tx) - } - /// Remove requested block transactions, without returning them. /// /// This will remove block data for the given range from the following tables: @@ -1294,100 +1387,6 @@ impl DatabaseProvider { Ok(()) } - /// Get the given range of blocks. - pub fn get_block_range( - &self, - range: impl RangeBounds + Clone, - ) -> ProviderResult> { - // For blocks we need: - // - // - Headers - // - Bodies (transactions) - // - Uncles/ommers - // - Withdrawals - // - Requests - // - Signers - - let block_headers = self.get::(range.clone())?; - if block_headers.is_empty() { - return Ok(Vec::new()) - } - - let block_header_hashes = self.get::(range.clone())?; - let block_ommers = self.get::(range.clone())?; - let block_withdrawals = self.get::(range.clone())?; - let block_requests = self.get::(range.clone())?; - - let block_tx = self.get_block_transaction_range(range)?; - - // merge all into block - let block_header_iter = block_headers.into_iter(); - let block_header_hashes_iter = block_header_hashes.into_iter(); - let block_tx_iter = block_tx.into_iter(); - - // Ommers can be empty for some blocks - let mut block_ommers_iter = block_ommers.into_iter(); - let mut block_withdrawals_iter = block_withdrawals.into_iter(); - let mut block_requests_iter = block_requests.into_iter(); - let mut block_ommers = block_ommers_iter.next(); - let mut block_withdrawals = block_withdrawals_iter.next(); - let mut block_requests = block_requests_iter.next(); - - let mut blocks = Vec::new(); - for ((main_block_number, header), (_, header_hash), (_, tx)) in - izip!(block_header_iter.into_iter(), block_header_hashes_iter, block_tx_iter) - { - let header = header.seal(header_hash); - - let (body, senders) = tx.into_iter().map(|tx| tx.to_components()).unzip(); - - // Ommers can be missing - let mut ommers = Vec::new(); - if let 
Some((block_number, _)) = block_ommers.as_ref() { - if *block_number == main_block_number { - ommers = block_ommers.take().unwrap().1.ommers; - block_ommers = block_ommers_iter.next(); - } - }; - - // withdrawal can be missing - let shanghai_is_active = - self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp); - let mut withdrawals = Some(Withdrawals::default()); - if shanghai_is_active { - if let Some((block_number, _)) = block_withdrawals.as_ref() { - if *block_number == main_block_number { - withdrawals = Some(block_withdrawals.take().unwrap().1.withdrawals); - block_withdrawals = block_withdrawals_iter.next(); - } - } - } else { - withdrawals = None - } - - // requests can be missing - let prague_is_active = self.chain_spec.is_prague_active_at_timestamp(header.timestamp); - let mut requests = Some(Requests::default()); - if prague_is_active { - if let Some((block_number, _)) = block_requests.as_ref() { - if *block_number == main_block_number { - requests = Some(block_requests.take().unwrap().1); - block_requests = block_requests_iter.next(); - } - } - } else { - requests = None; - } - - blocks.push(SealedBlockWithSenders { - block: SealedBlock { header, body, ommers, withdrawals, requests }, - senders, - }) - } - - Ok(blocks) - } - /// Remove the given range of blocks, and return them. 
/// /// This will remove block data for the given range from the following tables: @@ -2990,7 +2989,7 @@ impl HistoryWriter for DatabaseProvider { } } -impl BlockExecutionWriter for DatabaseProviderRW { +impl BlockExecutionReader for DatabaseProvider { fn get_block_and_execution_range( &self, range: RangeInclusive, @@ -3003,7 +3002,9 @@ impl BlockExecutionWriter for DatabaseProviderRW { Ok(Chain::new(blocks, execution_state, None)) } +} +impl BlockExecutionWriter for DatabaseProviderRW { fn take_block_and_execution_range( &self, range: RangeInclusive, diff --git a/crates/storage/provider/src/traits/block.rs b/crates/storage/provider/src/traits/block.rs index a4052d49b6..09ca8d3a7a 100644 --- a/crates/storage/provider/src/traits/block.rs +++ b/crates/storage/provider/src/traits/block.rs @@ -1,6 +1,7 @@ use reth_db_api::models::StoredBlockBodyIndices; use reth_execution_types::{Chain, ExecutionOutcome}; use reth_primitives::{BlockNumber, SealedBlockWithSenders}; +use reth_storage_api::BlockReader; use reth_storage_errors::provider::ProviderResult; use reth_trie::{updates::TrieUpdates, HashedPostState}; use std::ops::RangeInclusive; @@ -8,14 +9,18 @@ use std::ops::RangeInclusive; /// BlockExecution Writer #[auto_impl::auto_impl(&, Arc, Box)] pub trait BlockExecutionWriter: BlockWriter + Send + Sync { - /// Get range of blocks and its execution result - fn get_block_and_execution_range( + /// Take range of blocks and its execution result + fn take_block_and_execution_range( &self, range: RangeInclusive, ) -> ProviderResult; +} - /// Take range of blocks and its execution result - fn take_block_and_execution_range( +/// BlockExecution Reader +#[auto_impl::auto_impl(&, Arc, Box)] +pub trait BlockExecutionReader: BlockReader + Send + Sync { + /// Get range of blocks and its execution result + fn get_block_and_execution_range( &self, range: RangeInclusive, ) -> ProviderResult;