diff --git a/crates/engine/tree/src/database.rs b/crates/engine/tree/src/database.rs index e8670c92b0..e9b62111ab 100644 --- a/crates/engine/tree/src/database.rs +++ b/crates/engine/tree/src/database.rs @@ -8,8 +8,8 @@ use reth_db::database::Database; use reth_errors::ProviderResult; use reth_primitives::B256; use reth_provider::{ - bundle_state::HashedStateChanges, BlockExecutionWriter, BlockNumReader, BlockWriter, - HistoryWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateWriter, + writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, HistoryWriter, + OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateWriter, }; use reth_prune::{Pruner, PrunerOutput}; use reth_stages_types::{StageCheckpoint, StageId}; @@ -82,14 +82,16 @@ impl DatabaseService { // Write state and changesets to the database. // Must be written after blocks because of the receipt lookup. let execution_outcome = block.execution_outcome().clone(); + // TODO: use single storage writer in task when sf / db tasks are combined execution_outcome.write_to_storage(&provider_rw, None, OriginalValuesKnown::No)?; // insert hashes and intermediate merkle nodes { let trie_updates = block.trie_updates().clone(); let hashed_state = block.hashed_state(); - HashedStateChanges(&hashed_state.clone().into_sorted()) - .write_to_db(&provider_rw)?; + // TODO: use single storage writer in task when sf / db tasks are combined + let storage_writer = StorageWriter::new(Some(&provider_rw), None); + storage_writer.write_hashed_state(&hashed_state.clone().into_sorted())?; trie_updates.write_to_database(provider_rw.tx_ref())?; } diff --git a/crates/storage/provider/src/bundle_state/hashed_state_changes.rs b/crates/storage/provider/src/bundle_state/hashed_state_changes.rs deleted file mode 100644 index 42111255c9..0000000000 --- a/crates/storage/provider/src/bundle_state/hashed_state_changes.rs +++ /dev/null @@ -1,123 +0,0 @@ -use crate::DatabaseProviderRW; -use itertools::Itertools; -use reth_db::{tables, Database}; -use reth_db_api::{ - cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, - transaction::DbTxMut, - DatabaseError, -}; -use reth_primitives::{StorageEntry, U256}; -use reth_trie::HashedPostStateSorted; - -/// Changes to the hashed state. -#[derive(Debug)] -pub struct HashedStateChanges<'a>(pub &'a HashedPostStateSorted); - -impl HashedStateChanges<'_> { - /// Write the bundle state to the database. - pub fn write_to_db(self, provider: &DatabaseProviderRW) -> Result<(), DatabaseError> - where - DB: Database, - { - // Write hashed account updates. - let mut hashed_accounts_cursor = - provider.tx_ref().cursor_write::()?; - for (hashed_address, account) in self.0.accounts().accounts_sorted() { - if let Some(account) = account { - hashed_accounts_cursor.upsert(hashed_address, account)?; - } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() { - hashed_accounts_cursor.delete_current()?; - } - } - - // Write hashed storage changes. - let sorted_storages = self.0.account_storages().iter().sorted_by_key(|(key, _)| *key); - let mut hashed_storage_cursor = - provider.tx_ref().cursor_dup_write::()?; - for (hashed_address, storage) in sorted_storages { - if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() { - hashed_storage_cursor.delete_current_duplicates()?; - } - - for (hashed_slot, value) in storage.storage_slots_sorted() { - let entry = StorageEntry { key: hashed_slot, value }; - if let Some(db_entry) = - hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? - { - if db_entry.key == entry.key { - hashed_storage_cursor.delete_current()?; - } - } - - if entry.value != U256::ZERO { - hashed_storage_cursor.upsert(*hashed_address, entry)?; - } - } - } - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::test_utils::create_test_provider_factory; - use reth_db_api::transaction::DbTx; - use reth_primitives::{keccak256, Account, Address, B256}; - use reth_trie::{HashedPostState, HashedStorage}; - - #[test] - fn wiped_entries_are_removed() { - let provider_factory = create_test_provider_factory(); - - let addresses = (0..10).map(|_| Address::random()).collect::>(); - let destroyed_address = *addresses.first().unwrap(); - let destroyed_address_hashed = keccak256(destroyed_address); - let slot = B256::with_last_byte(1); - let hashed_slot = keccak256(slot); - { - let provider_rw = provider_factory.provider_rw().unwrap(); - let mut accounts_cursor = - provider_rw.tx_ref().cursor_write::().unwrap(); - let mut storage_cursor = - provider_rw.tx_ref().cursor_write::().unwrap(); - - for address in addresses { - let hashed_address = keccak256(address); - accounts_cursor - .insert(hashed_address, Account { nonce: 1, ..Default::default() }) - .unwrap(); - storage_cursor - .insert(hashed_address, StorageEntry { key: hashed_slot, value: U256::from(1) }) - .unwrap(); - } - provider_rw.commit().unwrap(); - } - - let mut hashed_state = HashedPostState::default(); - hashed_state.accounts.insert(destroyed_address_hashed, None); - hashed_state.storages.insert(destroyed_address_hashed, HashedStorage::new(true)); - - let provider_rw = provider_factory.provider_rw().unwrap(); - assert_eq!( - HashedStateChanges(&hashed_state.into_sorted()).write_to_db(&provider_rw), - Ok(()) - ); - provider_rw.commit().unwrap(); - - let provider = provider_factory.provider().unwrap(); - assert_eq!( - provider.tx_ref().get::(destroyed_address_hashed), - Ok(None) - ); - assert_eq!( - provider - .tx_ref() - .cursor_read::() - .unwrap() - .seek_by_key_subkey(destroyed_address_hashed, hashed_slot), - Ok(None) - ); - } -} diff --git a/crates/storage/provider/src/bundle_state/mod.rs b/crates/storage/provider/src/bundle_state/mod.rs index eaf3dab43e..3dad9389f6 100644 --- a/crates/storage/provider/src/bundle_state/mod.rs +++ b/crates/storage/provider/src/bundle_state/mod.rs @@ -2,11 +2,9 @@ //! This module contains all the logic related to bundle state. mod execution_outcome; -mod hashed_state_changes; mod state_changes; mod state_reverts; pub use execution_outcome::{AccountRevertInit, BundleStateInit, OriginalValuesKnown, RevertsInit}; -pub use hashed_state_changes::HashedStateChanges; pub use state_changes::StateChanges; pub use state_reverts::{StateReverts, StorageRevertsIter}; diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 4c773bdf70..8024601392 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1,10 +1,11 @@ use crate::{ - bundle_state::{BundleStateInit, HashedStateChanges, RevertsInit}, + bundle_state::{BundleStateInit, RevertsInit}, providers::{database::metrics, static_file::StaticFileWriter, StaticFileProvider}, to_range, traits::{ AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter, }, + writer::StorageWriter, AccountReader, BlockExecutionReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter, EvmEnvProvider, FinalizedBlockReader, FinalizedBlockWriter, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HistoricalStateProvider, @@ -3307,7 +3308,8 @@ impl BlockWriter for DatabaseProviderRW { // insert hashes and intermediate merkle nodes { - HashedStateChanges(&hashed_state).write_to_db(self)?; + let storage_writer = StorageWriter::new(Some(self), None); + storage_writer.write_hashed_state(&hashed_state)?; trie_updates.write_to_database(&self.tx)?; } durations_recorder.record_relative(metrics::Action::InsertHashes); diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index 9bd8b2a192..f5a3554d3b 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -1,14 +1,16 @@ use crate::{providers::StaticFileProviderRWRefMut, DatabaseProviderRW}; +use itertools::Itertools; use reth_db::{ - cursor::DbCursorRO, + cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, tables, transaction::{DbTx, DbTxMut}, Database, }; use reth_errors::{ProviderError, ProviderResult}; -use reth_primitives::BlockNumber; +use reth_primitives::{BlockNumber, StorageEntry, U256}; use reth_storage_api::ReceiptWriter; use reth_storage_errors::writer::StorageWriterError; +use reth_trie::HashedPostStateSorted; use static_file::StaticFileWriter; mod database; @@ -93,6 +95,49 @@ impl<'a, 'b, DB: Database> StorageWriter<'a, 'b, DB> { Ok(()) } + /// Writes the hashed state changes to the database + pub fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> { + self.ensure_database_writer()?; + + // Write hashed account updates. + let mut hashed_accounts_cursor = + self.database_writer().tx_ref().cursor_write::()?; + for (hashed_address, account) in hashed_state.accounts().accounts_sorted() { + if let Some(account) = account { + hashed_accounts_cursor.upsert(hashed_address, account)?; + } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() { + hashed_accounts_cursor.delete_current()?; + } + } + + // Write hashed storage changes. + let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key); + let mut hashed_storage_cursor = + self.database_writer().tx_ref().cursor_dup_write::()?; + for (hashed_address, storage) in sorted_storages { + if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() { + hashed_storage_cursor.delete_current_duplicates()?; + } + + for (hashed_slot, value) in storage.storage_slots_sorted() { + let entry = StorageEntry { key: hashed_slot, value }; + if let Some(db_entry) = + hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? + { + if db_entry.key == entry.key { + hashed_storage_cursor.delete_current()?; + } + } + + if entry.value != U256::ZERO { + hashed_storage_cursor.upsert(*hashed_address, entry)?; + } + } + } + + Ok(()) + } + /// Appends receipts block by block. /// /// ATTENTION: If called from [`StorageWriter`] without a static file producer, it will always @@ -155,3 +200,64 @@ impl<'a, 'b, DB: Database> StorageWriter<'a, 'b, DB> { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::create_test_provider_factory; + use reth_db_api::transaction::DbTx; + use reth_primitives::{keccak256, Account, Address, B256}; + use reth_trie::{HashedPostState, HashedStorage}; + + #[test] + fn wiped_entries_are_removed() { + let provider_factory = create_test_provider_factory(); + + let addresses = (0..10).map(|_| Address::random()).collect::>(); + let destroyed_address = *addresses.first().unwrap(); + let destroyed_address_hashed = keccak256(destroyed_address); + let slot = B256::with_last_byte(1); + let hashed_slot = keccak256(slot); + { + let provider_rw = provider_factory.provider_rw().unwrap(); + let mut accounts_cursor = + provider_rw.tx_ref().cursor_write::().unwrap(); + let mut storage_cursor = + provider_rw.tx_ref().cursor_write::().unwrap(); + + for address in addresses { + let hashed_address = keccak256(address); + accounts_cursor + .insert(hashed_address, Account { nonce: 1, ..Default::default() }) + .unwrap(); + storage_cursor + .insert(hashed_address, StorageEntry { key: hashed_slot, value: U256::from(1) }) + .unwrap(); + } + provider_rw.commit().unwrap(); + } + + let mut hashed_state = HashedPostState::default(); + hashed_state.accounts.insert(destroyed_address_hashed, None); + hashed_state.storages.insert(destroyed_address_hashed, HashedStorage::new(true)); + + let provider_rw = provider_factory.provider_rw().unwrap(); + let storage_writer = StorageWriter::new(Some(&provider_rw), None); + assert_eq!(storage_writer.write_hashed_state(&hashed_state.into_sorted()), Ok(())); + provider_rw.commit().unwrap(); + + let provider = provider_factory.provider().unwrap(); + assert_eq!( + provider.tx_ref().get::(destroyed_address_hashed), + Ok(None) + ); + assert_eq!( + provider + .tx_ref() + .cursor_read::() + .unwrap() + .seek_by_key_subkey(destroyed_address_hashed, hashed_slot), + Ok(None) + ); + } +} diff --git a/crates/trie/parallel/benches/root.rs b/crates/trie/parallel/benches/root.rs index c060428ac7..66d0593da1 100644 --- a/crates/trie/parallel/benches/root.rs +++ b/crates/trie/parallel/benches/root.rs @@ -5,8 +5,7 @@ use proptest_arbitrary_interop::arb; use rayon::ThreadPoolBuilder; use reth_primitives::{Account, B256, U256}; use reth_provider::{ - bundle_state::HashedStateChanges, providers::ConsistentDbView, - test_utils::create_test_provider_factory, + providers::ConsistentDbView, test_utils::create_test_provider_factory, writer::StorageWriter, }; use reth_tasks::pool::BlockingTaskPool; use reth_trie::{ @@ -27,7 +26,8 @@ pub fn calculate_state_root(c: &mut Criterion) { let provider_factory = create_test_provider_factory(); { let provider_rw = provider_factory.provider_rw().unwrap(); - HashedStateChanges(&db_state.into_sorted()).write_to_db(&provider_rw).unwrap(); + let storage_writer = StorageWriter::new(Some(&provider_rw), None); + storage_writer.write_hashed_state(&db_state.into_sorted()).unwrap(); let (_, updates) = StateRoot::from_tx(provider_rw.tx_ref()).root_with_updates().unwrap(); updates.write_to_database(provider_rw.tx_ref()).unwrap();