From a29d8bdbdfe38bbcb96574429a3a29305ac17d5c Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Wed, 10 Jul 2024 12:10:41 -0400 Subject: [PATCH] feat: remove get or take usage (#9412) --- crates/stages/stages/src/stages/execution.rs | 4 +- .../provider/src/providers/database/mod.rs | 6 +- .../src/providers/database/provider.rs | 776 ++++++++++++++---- crates/storage/provider/src/traits/block.rs | 12 +- 4 files changed, 609 insertions(+), 189 deletions(-) diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index d3dcdd1bb2..4e2f249a87 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -410,7 +410,7 @@ where // Unwind account and storage changesets, as well as receipts. // // This also updates `PlainStorageState` and `PlainAccountState`. - let bundle_state_with_receipts = provider.unwind_or_peek_state::(range.clone())?; + let bundle_state_with_receipts = provider.take_state(range.clone())?; // Prepare the input for post unwind commit hook, where an `ExExNotification` will be sent. if self.exex_manager_handle.has_exexs() { @@ -444,7 +444,7 @@ where // files do not support filters. // // If we hit this case, the receipts have already been unwound by the call to - // `unwind_or_peek_state`. + // `take_state`. } // Update the checkpoint. diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index c8d97a6e98..3faaf3f1d2 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -703,7 +703,7 @@ mod tests { } #[test] - fn get_take_block_transaction_range_recover_senders() { + fn take_block_transaction_range_recover_senders() { let factory = create_test_provider_factory(); let mut rng = generators::rng(); @@ -718,7 +718,7 @@ mod tests { Ok(_) ); - let senders = provider.get_or_take::(range.clone()); + let senders = provider.take::(range.clone()); assert_eq!( senders, Ok(range @@ -733,7 +733,7 @@ mod tests { let db_senders = provider.senders_by_tx_range(range); assert_eq!(db_senders, Ok(vec![])); - let result = provider.get_take_block_transaction_range::(0..=0); + let result = provider.take_block_transaction_range(0..=0); assert_eq!( result, Ok(vec![( diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index fba739cca4..910df75833 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -585,13 +585,7 @@ impl DatabaseProvider { Ok(self.tx.commit()?) } - // TODO(joshie) TEMPORARY should be moved to trait providers - /// Unwind or peek at last N blocks of state recreating the [`ExecutionOutcome`]. - /// - /// If UNWIND it set to true tip and latest state will be unwind - /// and returned back with all the blocks - /// - /// If UNWIND is false we will just read the state/blocks and return them. + /// Return the last N blocks of state, recreating the [`ExecutionOutcome`]. /// /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the /// transaction ids. @@ -610,7 +604,7 @@ impl DatabaseProvider { /// 1. Take the old value from the changeset /// 2. Take the new value from the local state /// 3. Set the local state to the value in the changeset - pub fn unwind_or_peek_state( + pub fn get_state( &self, range: RangeInclusive, ) -> ProviderResult { @@ -620,7 +614,7 @@ impl DatabaseProvider { let start_block_number = *range.start(); // We are not removing block meta as it is used to get block changesets. - let block_bodies = self.get_or_take::(range.clone())?; + let block_bodies = self.get::(range.clone())?; // get transaction receipts let from_transaction_num = @@ -630,9 +624,8 @@ impl DatabaseProvider { let storage_range = BlockNumberAddress::range(range.clone()); - let storage_changeset = - self.get_or_take::(storage_range)?; - let account_changeset = self.get_or_take::(range)?; + let storage_changeset = self.get::(storage_range)?; + let account_changeset = self.get::(range)?; // iterate previous value and get plain state value to create changeset // Double option around Account represent if Account state is know (first option) and @@ -701,45 +694,178 @@ impl DatabaseProvider { .push(old_storage); } - if TAKE { - // iterate over local plain state remove all account and all storages. - for (address, (old_account, new_account, storage)) in &state { - // revert account if needed. - if old_account != new_account { - let existing_entry = plain_accounts_cursor.seek_exact(*address)?; - if let Some(account) = old_account { - plain_accounts_cursor.upsert(*address, *account)?; - } else if existing_entry.is_some() { - plain_accounts_cursor.delete_current()?; - } + // iterate over block body and create ExecutionResult + let mut receipt_iter = + self.get::(from_transaction_num..=to_transaction_num)?.into_iter(); + + let mut receipts = Vec::new(); + // loop break if we are at the end of the blocks. + for (_, block_body) in block_bodies { + let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize); + for _ in block_body.tx_num_range() { + if let Some((_, receipt)) = receipt_iter.next() { + block_receipts.push(Some(receipt)); + } + } + receipts.push(block_receipts); + } + + Ok(ExecutionOutcome::new_init( + state, + reverts, + Vec::new(), + receipts.into(), + start_block_number, + Vec::new(), + )) + } + + /// Take the last N blocks of state, recreating the [`ExecutionOutcome`]. + /// + /// The latest state will be unwound and returned back with all the blocks + /// + /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the + /// transaction ids. + /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the + /// [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct + /// the changesets. + /// - In order to have both the old and new values in the changesets, we also access the + /// plain state tables. + /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot, + /// we: + /// 1. Take the old value from the changeset + /// 2. Take the new value from the plain state + /// 3. Save the old value to the local state + /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we + /// have seen before we: + /// 1. Take the old value from the changeset + /// 2. Take the new value from the local state + /// 3. Set the local state to the value in the changeset + pub fn take_state( + &self, + range: RangeInclusive, + ) -> ProviderResult { + if range.is_empty() { + return Ok(ExecutionOutcome::default()) + } + let start_block_number = *range.start(); + + // We are not removing block meta as it is used to get block changesets. + let block_bodies = self.get::(range.clone())?; + + // get transaction receipts + let from_transaction_num = + block_bodies.first().expect("already checked if there are blocks").1.first_tx_num(); + let to_transaction_num = + block_bodies.last().expect("already checked if there are blocks").1.last_tx_num(); + + let storage_range = BlockNumberAddress::range(range.clone()); + + let storage_changeset = self.take::(storage_range)?; + let account_changeset = self.take::(range)?; + + // iterate previous value and get plain state value to create changeset + // Double option around Account represent if Account state is know (first option) and + // account is removed (Second Option) + + let mut state: BundleStateInit = HashMap::new(); + + // This is not working for blocks that are not at tip. as plain state is not the last + // state of end range. We should rename the functions or add support to access + // History state. Accessing history state can be tricky but we are not gaining + // anything. + let mut plain_accounts_cursor = self.tx.cursor_write::()?; + let mut plain_storage_cursor = self.tx.cursor_dup_write::()?; + + let mut reverts: RevertsInit = HashMap::new(); + + // add account changeset changes + for (block_number, account_before) in account_changeset.into_iter().rev() { + let AccountBeforeTx { info: old_info, address } = account_before; + match state.entry(address) { + hash_map::Entry::Vacant(entry) => { + let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1); + entry.insert((old_info, new_info, HashMap::new())); + } + hash_map::Entry::Occupied(mut entry) => { + // overwrite old account state. + entry.get_mut().0 = old_info; + } + } + // insert old info into reverts. + reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info); + } + + // add storage changeset changes + for (block_and_address, old_storage) in storage_changeset.into_iter().rev() { + let BlockNumberAddress((block_number, address)) = block_and_address; + // get account state or insert from plain state. + let account_state = match state.entry(address) { + hash_map::Entry::Vacant(entry) => { + let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1); + entry.insert((present_info, present_info, HashMap::new())) + } + hash_map::Entry::Occupied(entry) => entry.into_mut(), + }; + + // match storage. + match account_state.2.entry(old_storage.key) { + hash_map::Entry::Vacant(entry) => { + let new_storage = plain_storage_cursor + .seek_by_key_subkey(address, old_storage.key)? + .filter(|storage| storage.key == old_storage.key) + .unwrap_or_default(); + entry.insert((old_storage.value, new_storage.value)); + } + hash_map::Entry::Occupied(mut entry) => { + entry.get_mut().0 = old_storage.value; + } + }; + + reverts + .entry(block_number) + .or_default() + .entry(address) + .or_default() + .1 + .push(old_storage); + } + + // iterate over local plain state remove all account and all storages. + for (address, (old_account, new_account, storage)) in &state { + // revert account if needed. + if old_account != new_account { + let existing_entry = plain_accounts_cursor.seek_exact(*address)?; + if let Some(account) = old_account { + plain_accounts_cursor.upsert(*address, *account)?; + } else if existing_entry.is_some() { + plain_accounts_cursor.delete_current()?; + } + } + + // revert storages + for (storage_key, (old_storage_value, _new_storage_value)) in storage { + let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value }; + // delete previous value + // TODO: This does not use dupsort features + if plain_storage_cursor + .seek_by_key_subkey(*address, *storage_key)? + .filter(|s| s.key == *storage_key) + .is_some() + { + plain_storage_cursor.delete_current()? } - // revert storages - for (storage_key, (old_storage_value, _new_storage_value)) in storage { - let storage_entry = - StorageEntry { key: *storage_key, value: *old_storage_value }; - // delete previous value - // TODO: This does not use dupsort features - if plain_storage_cursor - .seek_by_key_subkey(*address, *storage_key)? - .filter(|s| s.key == *storage_key) - .is_some() - { - plain_storage_cursor.delete_current()? - } - - // insert value if needed - if *old_storage_value != U256::ZERO { - plain_storage_cursor.upsert(*address, storage_entry)?; - } + // insert value if needed + if *old_storage_value != U256::ZERO { + plain_storage_cursor.upsert(*address, storage_entry)?; } } } // iterate over block body and create ExecutionResult - let mut receipt_iter = self - .get_or_take::(from_transaction_num..=to_transaction_num)? - .into_iter(); + let mut receipt_iter = + self.take::(from_transaction_num..=to_transaction_num)?.into_iter(); let mut receipts = Vec::new(); // loop break if we are at the end of the blocks. @@ -779,22 +905,6 @@ impl DatabaseProvider { Ok(entries) } - /// Return a list of entries from the table, based on the given range. - /// - /// If TAKE is true, opened cursor will delete and return the entries for the given range. - /// Otherwise, they will just be returned. - #[inline] - pub fn get_or_take( - &self, - range: impl RangeBounds, - ) -> Result>, DatabaseError> { - if TAKE { - self.take::(range) - } else { - self.get::(range) - } - } - /// Return a list of entries from the table, based on the given range. #[inline] pub fn get( @@ -820,13 +930,13 @@ impl DatabaseProvider { Ok(items) } - /// Get requested blocks transaction with signer - pub(crate) fn get_take_block_transaction_range( + /// Get requested blocks transaction with senders + pub(crate) fn get_block_transaction_range( &self, range: impl RangeBounds + Clone, ) -> ProviderResult)>> { // Raad range of block bodies to get all transactions id's of this range. - let block_bodies = self.get_or_take::(range)?; + let block_bodies = self.get::(range)?; if block_bodies.is_empty() { return Ok(Vec::new()) @@ -843,14 +953,13 @@ impl DatabaseProvider { // Get transactions and senders let transactions = self - .get_or_take::(first_transaction..=last_transaction)? + .get::(first_transaction..=last_transaction)? .into_iter() .map(|(id, tx)| (id, tx.into())) .collect::>(); - let mut senders = self.get_or_take::( - first_transaction..=last_transaction, - )?; + let mut senders = + self.get::(first_transaction..=last_transaction)?; // Recover senders manually if not found in db // NOTE: Transactions are always guaranteed to be in the database whereas @@ -917,22 +1026,207 @@ impl DatabaseProvider { ); } - if TAKE { - // Remove TransactionHashNumbers - let mut tx_hash_cursor = self.tx.cursor_write::()?; - for (_, tx) in &transactions { - if tx_hash_cursor.seek_exact(tx.hash())?.is_some() { - tx_hash_cursor.delete_current()?; + // Merge transaction into blocks + let mut block_tx = Vec::with_capacity(block_bodies.len()); + let mut senders = senders.into_iter(); + let mut transactions = transactions.into_iter(); + for (block_number, block_body) in block_bodies { + let mut one_block_tx = Vec::with_capacity(block_body.tx_count as usize); + for _ in block_body.tx_num_range() { + let tx = transactions.next(); + let sender = senders.next(); + + let recovered = match (tx, sender) { + (Some((tx_id, tx)), Some((sender_tx_id, sender))) => { + if tx_id != sender_tx_id { + Err(ProviderError::MismatchOfTransactionAndSenderId { tx_id }) + } else { + Ok(TransactionSignedEcRecovered::from_signed_transaction(tx, sender)) + } + } + (Some((tx_id, _)), _) | (_, Some((tx_id, _))) => { + Err(ProviderError::MismatchOfTransactionAndSenderId { tx_id }) + } + (None, None) => Err(ProviderError::BlockBodyTransactionCount), + }?; + one_block_tx.push(recovered) + } + block_tx.push((block_number, one_block_tx)); + } + + Ok(block_tx) + } + + /// Remove requested block transactions, without returning them. + /// + /// This will remove block data for the given range from the following tables: + /// * [`BlockBodyIndices`](tables::BlockBodyIndices) + /// * [`Transactions`](tables::Transactions) + /// * [`TransactionSenders`](tables::TransactionSenders) + /// * [`TransactionHashNumbers`](tables::TransactionHashNumbers) + /// * [`TransactionBlocks`](tables::TransactionBlocks) + pub fn remove_block_transaction_range( + &self, + range: impl RangeBounds + Clone, + ) -> ProviderResult<()> { + // Raad range of block bodies to get all transactions id's of this range. + let block_bodies = self.take::(range)?; + + if block_bodies.is_empty() { + return Ok(()) + } + + // Compute the first and last tx ID in the range + let first_transaction = block_bodies.first().expect("If we have headers").1.first_tx_num(); + let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_num(); + + // If this is the case then all of the blocks in the range are empty + if last_transaction < first_transaction { + return Ok(()) + } + + // Get transactions so we can then remove + let transactions = self + .take::(first_transaction..=last_transaction)? + .into_iter() + .map(|(id, tx)| (id, tx.into())) + .collect::>(); + + // remove senders + self.remove::(first_transaction..=last_transaction)?; + + // Remove TransactionHashNumbers + let mut tx_hash_cursor = self.tx.cursor_write::()?; + for (_, tx) in &transactions { + if tx_hash_cursor.seek_exact(tx.hash())?.is_some() { + tx_hash_cursor.delete_current()?; + } + } + + // Remove TransactionBlocks index if there are transaction present + if !transactions.is_empty() { + let tx_id_range = transactions.first().unwrap().0..=transactions.last().unwrap().0; + self.remove::(tx_id_range)?; + } + + Ok(()) + } + + /// Get requested blocks transaction with senders, also removing them from the database + /// + /// This will remove block data for the given range from the following tables: + /// * [`BlockBodyIndices`](tables::BlockBodyIndices) + /// * [`Transactions`](tables::Transactions) + /// * [`TransactionSenders`](tables::TransactionSenders) + /// * [`TransactionHashNumbers`](tables::TransactionHashNumbers) + /// * [`TransactionBlocks`](tables::TransactionBlocks) + pub fn take_block_transaction_range( + &self, + range: impl RangeBounds + Clone, + ) -> ProviderResult)>> { + // Raad range of block bodies to get all transactions id's of this range. + let block_bodies = self.get::(range)?; + + if block_bodies.is_empty() { + return Ok(Vec::new()) + } + + // Compute the first and last tx ID in the range + let first_transaction = block_bodies.first().expect("If we have headers").1.first_tx_num(); + let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_num(); + + // If this is the case then all of the blocks in the range are empty + if last_transaction < first_transaction { + return Ok(block_bodies.into_iter().map(|(n, _)| (n, Vec::new())).collect()) + } + + // Get transactions and senders + let transactions = self + .take::(first_transaction..=last_transaction)? + .into_iter() + .map(|(id, tx)| (id, tx.into())) + .collect::>(); + + let mut senders = + self.take::(first_transaction..=last_transaction)?; + + // Recover senders manually if not found in db + // NOTE: Transactions are always guaranteed to be in the database whereas + // senders might be pruned. + if senders.len() != transactions.len() { + if senders.len() > transactions.len() { + error!(target: "providers::db", senders=%senders.len(), transactions=%transactions.len(), + first_tx=%first_transaction, last_tx=%last_transaction, + "unexpected senders and transactions mismatch"); + } + let missing = transactions.len().saturating_sub(senders.len()); + senders.reserve(missing); + // Find all missing senders, their corresponding tx numbers and indexes to the original + // `senders` vector at which the recovered senders will be inserted. + let mut missing_senders = Vec::with_capacity(missing); + { + let mut senders = senders.iter().peekable(); + + // `transactions` contain all entries. `senders` contain _some_ of the senders for + // these transactions. Both are sorted and indexed by `TxNumber`. + // + // The general idea is to iterate on both `transactions` and `senders`, and advance + // the `senders` iteration only if it matches the current `transactions` entry's + // `TxNumber`. Otherwise, add the transaction to the list of missing senders. + for (i, (tx_number, transaction)) in transactions.iter().enumerate() { + if let Some((sender_tx_number, _)) = senders.peek() { + if sender_tx_number == tx_number { + // If current sender's `TxNumber` matches current transaction's + // `TxNumber`, advance the senders iterator. + senders.next(); + } else { + // If current sender's `TxNumber` doesn't match current transaction's + // `TxNumber`, add it to missing senders. + missing_senders.push((i, tx_number, transaction)); + } + } else { + // If there's no more senders left, but we're still iterating over + // transactions, add them to missing senders + missing_senders.push((i, tx_number, transaction)); + } } } - // Remove TransactionBlocks index if there are transaction present - if !transactions.is_empty() { - let tx_id_range = transactions.first().unwrap().0..=transactions.last().unwrap().0; - // NOTE: we are in this branch because `TAKE` is true, so we can use the `remove` - // method - self.remove::(tx_id_range)?; + // Recover senders + let recovered_senders = TransactionSigned::recover_signers( + missing_senders.iter().map(|(_, _, tx)| *tx).collect::>(), + missing_senders.len(), + ) + .ok_or(ProviderError::SenderRecoveryError)?; + + // Insert recovered senders along with tx numbers at the corresponding indexes to the + // original `senders` vector + for ((i, tx_number, _), sender) in missing_senders.into_iter().zip(recovered_senders) { + // Insert will put recovered senders at necessary positions and shift the rest + senders.insert(i, (*tx_number, sender)); } + + // Debug assertions which are triggered during the test to ensure that all senders are + // present and sorted + debug_assert_eq!(senders.len(), transactions.len(), "missing one or more senders"); + debug_assert!( + senders.iter().tuple_windows().all(|(a, b)| a.0 < b.0), + "senders not sorted" + ); + } + + // Remove TransactionHashNumbers + let mut tx_hash_cursor = self.tx.cursor_write::()?; + for (_, tx) in &transactions { + if tx_hash_cursor.seek_exact(tx.hash())?.is_some() { + tx_hash_cursor.delete_current()?; + } + } + + // Remove TransactionBlocks index if there are transaction present + if !transactions.is_empty() { + let tx_id_range = transactions.first().unwrap().0..=transactions.last().unwrap().0; + self.remove::(tx_id_range)?; } // Merge transaction into blocks @@ -966,8 +1260,42 @@ impl DatabaseProvider { Ok(block_tx) } - /// Get or unwind the given range of blocks. - pub fn get_take_block_range( + /// Remove the given range of blocks, without returning any of the blocks. + /// + /// This will remove block data for the given range from the following tables: + /// * [`HeaderNumbers`](tables::HeaderNumbers) + /// * [`CanonicalHeaders`](tables::CanonicalHeaders) + /// * [`BlockOmmers`](tables::BlockOmmers) + /// * [`BlockWithdrawals`](tables::BlockWithdrawals) + /// * [`BlockRequests`](tables::BlockRequests) + /// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties) + /// + /// This will also remove transaction data according to + /// [`remove_block_transaction_range`](Self::remove_block_transaction_range). + pub fn remove_block_range( + &self, + range: impl RangeBounds + Clone, + ) -> ProviderResult<()> { + let block_headers = self.remove::(range.clone())?; + if block_headers == 0 { + return Ok(()) + } + + self.unwind_table_by_walker::( + range.clone(), + )?; + self.remove::(range.clone())?; + self.remove::(range.clone())?; + self.remove::(range.clone())?; + self.remove::(range.clone())?; + self.remove_block_transaction_range(range.clone())?; + self.remove::(range)?; + + Ok(()) + } + + /// Get the given range of blocks. + pub fn get_block_range( &self, range: impl RangeBounds + Clone, ) -> ProviderResult> { @@ -980,33 +1308,128 @@ impl DatabaseProvider { // - Requests // - Signers - let block_headers = self.get_or_take::(range.clone())?; + let block_headers = self.get::(range.clone())?; if block_headers.is_empty() { return Ok(Vec::new()) } - let block_header_hashes = - self.get_or_take::(range.clone())?; - let block_ommers = self.get_or_take::(range.clone())?; - let block_withdrawals = - self.get_or_take::(range.clone())?; - let block_requests = self.get_or_take::(range.clone())?; + let block_header_hashes = self.get::(range.clone())?; + let block_ommers = self.get::(range.clone())?; + let block_withdrawals = self.get::(range.clone())?; + let block_requests = self.get::(range.clone())?; - let block_tx = self.get_take_block_transaction_range::(range.clone())?; + let block_tx = self.get_block_transaction_range(range)?; - if TAKE { - // rm HeaderTerminalDifficulties - // NOTE: we are in this branch because `TAKE` is true, so we can use the `remove` method - self.remove::(range)?; - // rm HeaderNumbers - let mut header_number_cursor = self.tx.cursor_write::()?; - for (_, hash) in &block_header_hashes { - if header_number_cursor.seek_exact(*hash)?.is_some() { - header_number_cursor.delete_current()?; + // merge all into block + let block_header_iter = block_headers.into_iter(); + let block_header_hashes_iter = block_header_hashes.into_iter(); + let block_tx_iter = block_tx.into_iter(); + + // Ommers can be empty for some blocks + let mut block_ommers_iter = block_ommers.into_iter(); + let mut block_withdrawals_iter = block_withdrawals.into_iter(); + let mut block_requests_iter = block_requests.into_iter(); + let mut block_ommers = block_ommers_iter.next(); + let mut block_withdrawals = block_withdrawals_iter.next(); + let mut block_requests = block_requests_iter.next(); + + let mut blocks = Vec::new(); + for ((main_block_number, header), (_, header_hash), (_, tx)) in + izip!(block_header_iter.into_iter(), block_header_hashes_iter, block_tx_iter) + { + let header = header.seal(header_hash); + + let (body, senders) = tx.into_iter().map(|tx| tx.to_components()).unzip(); + + // Ommers can be missing + let mut ommers = Vec::new(); + if let Some((block_number, _)) = block_ommers.as_ref() { + if *block_number == main_block_number { + ommers = block_ommers.take().unwrap().1.ommers; + block_ommers = block_ommers_iter.next(); } + }; + + // withdrawal can be missing + let shanghai_is_active = + self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp); + let mut withdrawals = Some(Withdrawals::default()); + if shanghai_is_active { + if let Some((block_number, _)) = block_withdrawals.as_ref() { + if *block_number == main_block_number { + withdrawals = Some(block_withdrawals.take().unwrap().1.withdrawals); + block_withdrawals = block_withdrawals_iter.next(); + } + } + } else { + withdrawals = None } + + // requests can be missing + let prague_is_active = self.chain_spec.is_prague_active_at_timestamp(header.timestamp); + let mut requests = Some(Requests::default()); + if prague_is_active { + if let Some((block_number, _)) = block_requests.as_ref() { + if *block_number == main_block_number { + requests = Some(block_requests.take().unwrap().1); + block_requests = block_requests_iter.next(); + } + } + } else { + requests = None; + } + + blocks.push(SealedBlockWithSenders { + block: SealedBlock { header, body, ommers, withdrawals, requests }, + senders, + }) } + Ok(blocks) + } + + /// Remove the given range of blocks, and return them. + /// + /// This will remove block data for the given range from the following tables: + /// * [`HeaderNumbers`](tables::HeaderNumbers) + /// * [`CanonicalHeaders`](tables::CanonicalHeaders) + /// * [`BlockOmmers`](tables::BlockOmmers) + /// * [`BlockWithdrawals`](tables::BlockWithdrawals) + /// * [`BlockRequests`](tables::BlockRequests) + /// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties) + /// + /// This will also remove transaction data according to + /// [`take_block_transaction_range`](Self::take_block_transaction_range). + pub fn take_block_range( + &self, + range: impl RangeBounds + Clone, + ) -> ProviderResult> { + // For blocks we need: + // + // - Headers + // - Bodies (transactions) + // - Uncles/ommers + // - Withdrawals + // - Requests + // - Signers + + let block_headers = self.take::(range.clone())?; + if block_headers.is_empty() { + return Ok(Vec::new()) + } + + self.unwind_table_by_walker::( + range.clone(), + )?; + let block_header_hashes = self.take::(range.clone())?; + let block_ommers = self.take::(range.clone())?; + let block_withdrawals = self.take::(range.clone())?; + let block_requests = self.take::(range.clone())?; + let block_tx = self.take_block_transaction_range(range.clone())?; + + // rm HeaderTerminalDifficulties + self.remove::(range)?; + // merge all into block let block_header_iter = block_headers.into_iter(); let block_header_hashes_iter = block_header_hashes.into_iter(); @@ -2568,95 +2991,102 @@ impl HistoryWriter for DatabaseProvider { } impl BlockExecutionWriter for DatabaseProvider { - /// Return range of blocks and its execution result - fn get_or_take_block_and_execution_range( + fn get_block_and_execution_range( &self, range: RangeInclusive, ) -> ProviderResult { - if TAKE { - let storage_range = BlockNumberAddress::range(range.clone()); + // get blocks + let blocks = self.get_block_range(range.clone())?; - // Unwind account hashes. Add changed accounts to account prefix set. - let hashed_addresses = self.unwind_account_hashing(range.clone())?; - let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len()); - let mut destroyed_accounts = HashSet::default(); - for (hashed_address, account) in hashed_addresses { - account_prefix_set.insert(Nibbles::unpack(hashed_address)); - if account.is_none() { - destroyed_accounts.insert(hashed_address); - } + // get execution res + let execution_state = self.get_state(range)?; + + Ok(Chain::new(blocks, execution_state, None)) + } + + fn take_block_and_execution_range( + &self, + range: RangeInclusive, + ) -> ProviderResult { + let storage_range = BlockNumberAddress::range(range.clone()); + + // Unwind account hashes. Add changed accounts to account prefix set. + let hashed_addresses = self.unwind_account_hashing(range.clone())?; + let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len()); + let mut destroyed_accounts = HashSet::default(); + for (hashed_address, account) in hashed_addresses { + account_prefix_set.insert(Nibbles::unpack(hashed_address)); + if account.is_none() { + destroyed_accounts.insert(hashed_address); } - - // Unwind account history indices. - self.unwind_account_history_indices(range.clone())?; - - // Unwind storage hashes. Add changed account and storage keys to corresponding prefix - // sets. - let mut storage_prefix_sets = HashMap::::default(); - let storage_entries = self.unwind_storage_hashing(storage_range.clone())?; - for (hashed_address, hashed_slots) in storage_entries { - account_prefix_set.insert(Nibbles::unpack(hashed_address)); - let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len()); - for slot in hashed_slots { - storage_prefix_set.insert(Nibbles::unpack(slot)); - } - storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze()); - } - - // Unwind storage history indices. - self.unwind_storage_history_indices(storage_range)?; - - // Calculate the reverted merkle root. - // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets - // are pre-loaded. - let prefix_sets = TriePrefixSets { - account_prefix_set: account_prefix_set.freeze(), - storage_prefix_sets, - destroyed_accounts, - }; - let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx) - .with_prefix_sets(prefix_sets) - .root_with_updates() - .map_err(Into::::into)?; - - let parent_number = range.start().saturating_sub(1); - let parent_state_root = self - .header_by_number(parent_number)? - .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))? - .state_root; - - // state root should be always correct as we are reverting state. - // but for sake of double verification we will check it again. - if new_state_root != parent_state_root { - let parent_hash = self - .block_hash(parent_number)? - .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?; - return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch { - root: GotExpected { got: new_state_root, expected: parent_state_root }, - block_number: parent_number, - block_hash: parent_hash, - }))) - } - trie_updates.write_to_database(&self.tx)?; } + // Unwind account history indices. + self.unwind_account_history_indices(range.clone())?; + + // Unwind storage hashes. Add changed account and storage keys to corresponding prefix + // sets. + let mut storage_prefix_sets = HashMap::::default(); + let storage_entries = self.unwind_storage_hashing(storage_range.clone())?; + for (hashed_address, hashed_slots) in storage_entries { + account_prefix_set.insert(Nibbles::unpack(hashed_address)); + let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len()); + for slot in hashed_slots { + storage_prefix_set.insert(Nibbles::unpack(slot)); + } + storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze()); + } + + // Unwind storage history indices. + self.unwind_storage_history_indices(storage_range)?; + + // Calculate the reverted merkle root. + // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets + // are pre-loaded. + let prefix_sets = TriePrefixSets { + account_prefix_set: account_prefix_set.freeze(), + storage_prefix_sets, + destroyed_accounts, + }; + let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx) + .with_prefix_sets(prefix_sets) + .root_with_updates() + .map_err(Into::::into)?; + + let parent_number = range.start().saturating_sub(1); + let parent_state_root = self + .header_by_number(parent_number)? + .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))? + .state_root; + + // state root should be always correct as we are reverting state. + // but for sake of double verification we will check it again. + if new_state_root != parent_state_root { + let parent_hash = self + .block_hash(parent_number)? + .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?; + return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch { + root: GotExpected { got: new_state_root, expected: parent_state_root }, + block_number: parent_number, + block_hash: parent_hash, + }))) + } + trie_updates.write_to_database(&self.tx)?; + // get blocks - let blocks = self.get_take_block_range::(range.clone())?; + let blocks = self.take_block_range(range.clone())?; let unwind_to = blocks.first().map(|b| b.number.saturating_sub(1)); + // get execution res - let execution_state = self.unwind_or_peek_state::(range.clone())?; + let execution_state = self.take_state(range.clone())?; // remove block bodies it is needed for both get block range and get block execution results // that is why it is deleted afterwards. - if TAKE { - // rm block bodies - // NOTE: we are in this branch because `TAKE` is true, so we can use the `remove` method - self.remove::(range)?; + self.remove::(range)?; - // Update pipeline progress - if let Some(fork_number) = unwind_to { - self.update_pipeline_stages(fork_number, true)?; - } + // Update pipeline progress + if let Some(fork_number) = unwind_to { + self.update_pipeline_stages(fork_number, true)?; } Ok(Chain::new(blocks, execution_state, None)) diff --git a/crates/storage/provider/src/traits/block.rs b/crates/storage/provider/src/traits/block.rs index 3d0cf3c0cb..c7b329ae0b 100644 --- a/crates/storage/provider/src/traits/block.rs +++ b/crates/storage/provider/src/traits/block.rs @@ -13,22 +13,12 @@ pub trait BlockExecutionWriter: BlockWriter + BlockReader + Send + Sync { fn get_block_and_execution_range( &self, range: RangeInclusive, - ) -> ProviderResult { - self.get_or_take_block_and_execution_range::(range) - } + ) -> ProviderResult; /// Take range of blocks and its execution result fn take_block_and_execution_range( &self, range: RangeInclusive, - ) -> ProviderResult { - self.get_or_take_block_and_execution_range::(range) - } - - /// Return range of blocks and its execution result - fn get_or_take_block_and_execution_range( - &self, - range: RangeInclusive, ) -> ProviderResult; }