From 112b2332e33c074018080041337d066be60cd296 Mon Sep 17 00:00:00 2001
From: joshieDo <93316087+joshieDo@users.noreply.github.com>
Date: Thu, 11 Jul 2024 20:56:09 +0200
Subject: [PATCH] chore: use `DatabaseProviderRW` instead of `TX` on `stages`
 (#9451)

---
 crates/stages/stages/src/stages/headers.rs    |  7 +++---
 .../src/stages/index_account_history.rs       |  4 ++--
 .../src/stages/index_storage_history.rs       |  4 ++--
 .../stages/src/stages/sender_recovery.rs      | 22 +++++++++----------
 crates/stages/stages/src/stages/utils.rs      | 21 +++++++++---------
 5 files changed, 30 insertions(+), 28 deletions(-)

diff --git a/crates/stages/stages/src/stages/headers.rs b/crates/stages/stages/src/stages/headers.rs
index 1065869915..4b23d66179 100644
--- a/crates/stages/stages/src/stages/headers.rs
+++ b/crates/stages/stages/src/stages/headers.rs
@@ -90,7 +90,7 @@ where
     /// database table.
     fn write_headers<DB: Database>(
         &mut self,
-        tx: &<DB as Database>::TXMut,
+        provider: &DatabaseProviderRW<DB>,
         static_file_provider: StaticFileProvider,
     ) -> Result<BlockNumber, StageError> {
         let total_headers = self.header_collector.len();
@@ -143,7 +143,8 @@ where

         info!(target: "sync::stages::headers", total = total_headers, "Writing headers hash index");

-        let mut cursor_header_numbers = tx.cursor_write::<RawTable<tables::HeaderNumbers>>()?;
+        let mut cursor_header_numbers =
+            provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
         let mut first_sync = false;

         // If we only have the genesis block hash, then we are at first sync, and we can remove it,
@@ -281,7 +282,7 @@ where
         // Write the headers and related tables to DB from ETL space
         let to_be_processed = self.hash_collector.len() as u64;
         let last_header_number =
-            self.write_headers::<DB>(provider.tx_ref(), provider.static_file_provider().clone())?;
+            self.write_headers(provider, provider.static_file_provider().clone())?;

         // Clear ETL collectors
         self.hash_collector.clear();
diff --git a/crates/stages/stages/src/stages/index_account_history.rs b/crates/stages/stages/src/stages/index_account_history.rs
index 7f83185277..ba8aac3e60 100644
--- a/crates/stages/stages/src/stages/index_account_history.rs
+++ b/crates/stages/stages/src/stages/index_account_history.rs
@@ -103,7 +103,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
         info!(target: "sync::stages::index_account_history::exec", ?first_sync, "Collecting indices");
         let collector =
             collect_history_indices::<_, tables::AccountChangeSets, tables::AccountsHistory, _>(
-                provider.tx_ref(),
+                provider,
                 range.clone(),
                 ShardedKey::new,
                 |(index, value)| (index, value.address),
@@ -112,7 +112,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {

         info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
         load_history_indices::<_, tables::AccountsHistory, _>(
-            provider.tx_ref(),
+            provider,
             collector,
             first_sync,
             ShardedKey::new,
diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs
index 23332f2d6e..8f3829e2b8 100644
--- a/crates/stages/stages/src/stages/index_storage_history.rs
+++ b/crates/stages/stages/src/stages/index_storage_history.rs
@@ -106,7 +106,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
         info!(target: "sync::stages::index_storage_history::exec", ?first_sync, "Collecting indices");
         let collector =
             collect_history_indices::<_, tables::StorageChangeSets, tables::StoragesHistory, _>(
-                provider.tx_ref(),
+                provider,
                 BlockNumberAddress::range(range.clone()),
                 |AddressStorageKey((address, storage_key)), highest_block_number| {
                     StorageShardedKey::new(address, storage_key, highest_block_number)
                 },
@@ -117,7 +117,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {

         info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database");
         load_history_indices::<_, tables::StoragesHistory, _>(
-            provider.tx_ref(),
+            provider,
             collector,
             first_sync,
             |AddressStorageKey((address, storage_key)), highest_block_number| {
diff --git a/crates/stages/stages/src/stages/sender_recovery.rs b/crates/stages/stages/src/stages/sender_recovery.rs
index 41df7795c3..cfb6538d81 100644
--- a/crates/stages/stages/src/stages/sender_recovery.rs
+++ b/crates/stages/stages/src/stages/sender_recovery.rs
@@ -84,10 +84,8 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
             })
         }

-        let tx = provider.tx_ref();
-
         // Acquire the cursor for inserting elements
-        let mut senders_cursor = tx.cursor_write::<tables::TransactionSenders>()?;
+        let mut senders_cursor = provider.tx_ref().cursor_write::<tables::TransactionSenders>()?;

         info!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders");

@@ -98,7 +96,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
             .collect::<Vec<Range<TxNumber>>>();

         for range in batch {
-            recover_range(range, provider, tx, &mut senders_cursor)?;
+            recover_range(range, provider, &mut senders_cursor)?;
         }

         Ok(ExecOutput {
@@ -130,14 +128,15 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
     }
 }

-fn recover_range<DB: Database>(
+fn recover_range<DB, CURSOR>(
     tx_range: Range<u64>,
     provider: &DatabaseProviderRW<DB>,
-    tx: &<DB as Database>::TXMut,
-    senders_cursor: &mut <<DB as Database>::TXMut as DbTxMut>::CursorMut<
-        tables::TransactionSenders,
-    >,
-) -> Result<(), StageError> {
+    senders_cursor: &mut CURSOR,
+) -> Result<(), StageError>
+where
+    DB: Database,
+    CURSOR: DbCursorRW<tables::TransactionSenders>,
+{
     debug!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders batch");

     // Preallocate channels
@@ -193,7 +192,8 @@ fn recover_range<DB: Database>(
             return match *error {
                 SenderRecoveryStageError::FailedRecovery(err) => {
                     // get the block number for the bad transaction
-                    let block_number = tx
+                    let block_number = provider
+                        .tx_ref()
                         .get::<tables::TransactionBlocks>(err.tx)?
                         .ok_or(ProviderError::BlockNumberForTransactionIndexNotFound)?;

diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs
index 6665d2278c..3b623c358e 100644
--- a/crates/stages/stages/src/stages/utils.rs
+++ b/crates/stages/stages/src/stages/utils.rs
@@ -1,6 +1,6 @@
 //! Utils for `stages`.
 use reth_config::config::EtlConfig;
-use reth_db::BlockNumberList;
+use reth_db::{BlockNumberList, Database};
 use reth_db_api::{
     cursor::{DbCursorRO, DbCursorRW},
     models::sharded_key::NUM_OF_INDICES_IN_SHARD,
@@ -10,6 +10,7 @@ use reth_db_api::{
 };
 use reth_etl::Collector;
 use reth_primitives::BlockNumber;
+use reth_provider::DatabaseProviderRW;
 use reth_stages_api::StageError;
 use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
 use tracing::info;
@@ -34,20 +35,20 @@ const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;
 ///
 /// As a result, the `Collector` will contain entries such as `(Address1.3, [1,2,3])` and
 /// `(Address1.300, [100,300])`. The entries may be stored across one or more files.
-pub(crate) fn collect_history_indices<TX, CS, H, P>(
-    tx: &TX,
+pub(crate) fn collect_history_indices<DB, CS, H, P>(
+    provider: &DatabaseProviderRW<DB>,
     range: impl RangeBounds<CS::Key>,
     sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key,
     partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P),
     etl_config: &EtlConfig,
 ) -> Result<Collector<H::Key, H::Value>, StageError>
 where
-    TX: DbTxMut + DbTx,
+    DB: Database,
     CS: Table,
     H: Table<Value = BlockNumberList>,
     P: Copy + Eq + Hash,
 {
-    let mut changeset_cursor = tx.cursor_read::<CS>()?;
+    let mut changeset_cursor = provider.tx_ref().cursor_read::<CS>()?;
     let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
     let mut cache: HashMap<P, Vec<u64>> = HashMap::new();

@@ -64,7 +65,7 @@ where
     };

     // observability
-    let total_changesets = tx.entries::<CS>()?;
+    let total_changesets = provider.tx_ref().entries::<CS>()?;
     let interval = (total_changesets / 1000).max(1);

     let mut flush_counter = 0;
@@ -101,8 +102,8 @@ where
 /// `Address.StorageKey`). It flushes indices to disk when reaching a shard's max length
 /// (`NUM_OF_INDICES_IN_SHARD`) or when the partial key changes, ensuring the last previous partial
 /// key shard is stored.
-pub(crate) fn load_history_indices<TX, H, P>(
-    tx: &TX,
+pub(crate) fn load_history_indices<DB, H, P>(
+    provider: &DatabaseProviderRW<DB>,
     mut collector: Collector<H::Key, H::Value>,
     append_only: bool,
     sharded_key_factory: impl Clone + Fn(P, u64) -> <H as Table>::Key,
@@ -110,11 +111,11 @@ pub(crate) fn load_history_indices<TX, H, P>(
     get_partial: impl Fn(<H as Table>::Key) -> P,
 ) -> Result<(), StageError>
 where
-    TX: DbTxMut + DbTx,
+    DB: Database,
     H: Table<Value = BlockNumberList>,
     P: Copy + Default + Eq,
 {
-    let mut write_cursor = tx.cursor_write::<H>()?;
+    let mut write_cursor = provider.tx_ref().cursor_write::<H>()?;
     let mut current_partial = P::default();
     let mut current_list = Vec::<u64>::new();
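
Note on the shape of the `recover_range` change above: instead of spelling out the
concrete cursor type through the `<<DB as Database>::TXMut as DbTxMut>::CursorMut<...>`
projection, the helper is now generic over any `CURSOR: DbCursorRW<tables::TransactionSenders>`,
and the transaction is borrowed only where needed via `provider.tx_ref()`. Below is a
minimal self-contained sketch of that pattern using toy stand-in types, not reth's own
(`Provider`, `VecCursor`, and `write_range` are illustrative names, not reth APIs):

    use std::ops::Range;

    // Toy write-cursor trait, standing in for reth's `DbCursorRW<T>` (illustrative only).
    trait DbCursorRW<T> {
        fn append(&mut self, key: u64, value: T);
    }

    // Toy cursor that records appended entries in a Vec.
    struct VecCursor(Vec<(u64, String)>);

    impl DbCursorRW<String> for VecCursor {
        fn append(&mut self, key: u64, value: String) {
            self.0.push((key, value));
        }
    }

    // Toy provider owning a transaction handle, standing in for `DatabaseProviderRW<DB>`.
    struct Provider<TX>(TX);

    impl<TX> Provider<TX> {
        // Borrow the underlying transaction, like `DatabaseProviderRW::tx_ref()`.
        fn tx_ref(&self) -> &TX {
            &self.0
        }
    }

    // Generic over the cursor, so the signature names no concrete `CursorMut`
    // associated type -- the same move the patch makes for `recover_range`.
    fn write_range<TX, CURSOR>(provider: &Provider<TX>, cursor: &mut CURSOR, range: Range<u64>)
    where
        CURSOR: DbCursorRW<String>,
    {
        let _tx = provider.tx_ref(); // still available for ad-hoc lookups
        for key in range {
            cursor.append(key, format!("value-{key}"));
        }
    }

    fn main() {
        let provider = Provider(());
        let mut cursor = VecCursor(Vec::new());
        write_range(&provider, &mut cursor, 0..3);
        assert_eq!(cursor.0.len(), 3);
    }

Callers then pass a single `&DatabaseProviderRW<DB>` plus the cursor, and any cursor
implementing the write trait for the target table satisfies the bound, which is what
lets the separate `tx` parameter be dropped from the signatures throughout `stages`.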