From 66848d98ef33b9bcbf18808afafbb8e82275af41 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Mon, 16 Sep 2024 14:49:24 +0100 Subject: [PATCH] feat: use `DbProvider` on `UnifiedStorageWriter` (#10933) --- crates/stages/stages/src/stages/execution.rs | 2 +- crates/storage/db-common/src/init.rs | 2 +- .../src/providers/database/provider.rs | 26 +++++- crates/storage/provider/src/writer/mod.rs | 84 ++++++++++++------- .../storage-api/src/database_provider.rs | 4 + 5 files changed, 83 insertions(+), 35 deletions(-) diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index d09343d00c..dddb892f1c 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -354,7 +354,7 @@ where let time = Instant::now(); // write output - let mut writer = UnifiedStorageWriter::new(provider, static_file_producer); + let mut writer = UnifiedStorageWriter::new(&provider, static_file_producer); writer.write_to_storage(state, OriginalValuesKnown::Yes)?; let db_write_duration = time.elapsed(); diff --git a/crates/storage/db-common/src/init.rs b/crates/storage/db-common/src/init.rs index d74f36f53e..2fa2026b84 100644 --- a/crates/storage/db-common/src/init.rs +++ b/crates/storage/db-common/src/init.rs @@ -208,7 +208,7 @@ pub fn insert_state<'a, 'b, DB: Database>( Vec::new(), ); - let mut storage_writer = UnifiedStorageWriter::from_database(provider); + let mut storage_writer = UnifiedStorageWriter::from_database(&provider); storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?; trace!(target: "reth::cli", "Inserted state"); diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 91d27ac707..b7bc0cd809 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -92,6 +92,12 @@ impl DerefMut for DatabaseProviderRW { } } +impl AsRef::TXMut>> for DatabaseProviderRW { + fn as_ref(&self) -> &DatabaseProvider<::TXMut> { + &self.0 + } +} + impl DatabaseProviderRW { /// Commit database transaction and static file if it exists. pub fn commit(self) -> ProviderResult { @@ -104,6 +110,12 @@ impl DatabaseProviderRW { } } +impl From> for DatabaseProvider<::TXMut> { + fn from(provider: DatabaseProviderRW) -> Self { + provider.0 + } +} + /// A provider struct that fetches data from the database. /// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`] #[derive(Debug)] @@ -142,6 +154,12 @@ impl DatabaseProvider { } } +impl AsRef for DatabaseProvider { + fn as_ref(&self) -> &Self { + self + } +} + impl TryIntoHistoricalStateProvider for DatabaseProvider { fn try_into_history_at_block( self, @@ -3120,7 +3138,7 @@ impl StateReader for DatabaseProvider { } } -impl BlockExecutionWriter for DatabaseProvider { +impl BlockExecutionWriter for DatabaseProvider { fn take_block_and_execution_range( &self, range: RangeInclusive, @@ -3298,7 +3316,7 @@ impl BlockExecutionWriter for DatabaseProvider { } } -impl BlockWriter for DatabaseProvider { +impl BlockWriter for DatabaseProvider { /// Inserts the block into the database, always modifying the following tables: /// * [`CanonicalHeaders`](tables::CanonicalHeaders) /// * [`Headers`](tables::Headers) @@ -3591,6 +3609,10 @@ impl DBProvider for DatabaseProvider { fn into_tx(self) -> Self::Tx { self.tx } + + fn prune_modes_ref(&self) -> &PruneModes { + self.prune_modes_ref() + } } /// Helper method to recover senders for any blocks in the db which do not have senders. This diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index 7e02b1f7ba..1af5423827 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -1,8 +1,7 @@ use crate::{ providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter as SfWriter}, writer::static_file::StaticFileWriter, - BlockExecutionWriter, BlockWriter, DatabaseProvider, DatabaseProviderRW, HistoryWriter, - StateChangeWriter, StateWriter, TrieWriter, + BlockExecutionWriter, BlockWriter, HistoryWriter, StateChangeWriter, StateWriter, TrieWriter, }; use reth_chain_state::ExecutedBlock; use reth_db::{ @@ -10,7 +9,6 @@ use reth_db::{ models::CompactU256, tables, transaction::{DbTx, DbTxMut}, - Database, }; use reth_errors::{ProviderError, ProviderResult}; use reth_execution_types::ExecutionOutcome; @@ -19,7 +17,7 @@ use reth_primitives::{ }; use reth_stages_types::{StageCheckpoint, StageId}; use reth_storage_api::{ - BlockNumReader, HeaderProvider, ReceiptWriter, StageCheckpointWriter, TransactionsProviderExt, + DBProvider, HeaderProvider, ReceiptWriter, StageCheckpointWriter, TransactionsProviderExt, }; use reth_storage_errors::writer::UnifiedStorageWriterError; use revm::db::OriginalValuesKnown; @@ -38,29 +36,38 @@ enum StorageType { /// [`UnifiedStorageWriter`] is responsible for managing the writing to storage with both database /// and static file providers. #[derive(Debug)] -pub struct UnifiedStorageWriter<'a, TX, SF> { - database: &'a DatabaseProvider, - static_file: Option, +pub struct UnifiedStorageWriter<'a, ProviderDB, ProviderSF> { + database: &'a ProviderDB, + static_file: Option, } -impl<'a, TX, SF> UnifiedStorageWriter<'a, TX, SF> { +impl<'a, ProviderDB, ProviderSF> UnifiedStorageWriter<'a, ProviderDB, ProviderSF> { /// Creates a new instance of [`UnifiedStorageWriter`]. /// /// # Parameters /// - `database`: An optional reference to a database provider. /// - `static_file`: An optional mutable reference to a static file instance. - pub const fn new(database: &'a DatabaseProvider, static_file: Option) -> Self { - Self { database, static_file } + pub fn new

(database: &'a P, static_file: Option) -> Self + where + P: AsRef, + { + Self { database: database.as_ref(), static_file } } /// Creates a new instance of [`UnifiedStorageWriter`] from a database provider and a static /// file instance. - pub const fn from(database: &'a DatabaseProvider, static_file: SF) -> Self { + pub fn from

(database: &'a P, static_file: ProviderSF) -> Self + where + P: AsRef, + { Self::new(database, Some(static_file)) } /// Creates a new instance of [`UnifiedStorageWriter`] from a database provider. - pub const fn from_database(database: &'a DatabaseProvider) -> Self { + pub fn from_database

(database: &'a P) -> Self + where + P: AsRef, + { Self::new(database, None) } @@ -68,7 +75,7 @@ impl<'a, TX, SF> UnifiedStorageWriter<'a, TX, SF> { /// /// # Panics /// If the database provider is not set. - const fn database(&self) -> &DatabaseProvider { + const fn database(&self) -> &ProviderDB { self.database } @@ -76,7 +83,7 @@ impl<'a, TX, SF> UnifiedStorageWriter<'a, TX, SF> { /// /// # Panics /// If the static file instance is not set. - fn static_file(&self) -> &SF { + fn static_file(&self) -> &ProviderSF { self.static_file.as_ref().expect("should exist") } @@ -84,7 +91,7 @@ impl<'a, TX, SF> UnifiedStorageWriter<'a, TX, SF> { /// /// # Panics /// If the static file instance is not set. - fn static_file_mut(&mut self) -> &mut SF { + fn static_file_mut(&mut self) -> &mut ProviderSF { self.static_file.as_mut().expect("should exist") } @@ -111,12 +118,15 @@ impl UnifiedStorageWriter<'_, (), ()> { /// start-up. /// /// NOTE: If unwinding data from storage, use `commit_unwind` instead! - pub fn commit( - database: DatabaseProviderRW, + pub fn commit

( + database: impl Into

+ AsRef

, static_file: StaticFileProvider, - ) -> ProviderResult<()> { + ) -> ProviderResult<()> + where + P: DBProvider, + { static_file.commit()?; - database.commit()?; + database.into().into_tx().commit()?; Ok(()) } @@ -128,19 +138,30 @@ impl UnifiedStorageWriter<'_, (), ()> { /// checkpoints on the next start-up. /// /// NOTE: Should only be used after unwinding data from storage! - pub fn commit_unwind( - database: DatabaseProviderRW, + pub fn commit_unwind

( + database: impl Into

+ AsRef

, static_file: StaticFileProvider, - ) -> ProviderResult<()> { - database.commit()?; + ) -> ProviderResult<()> + where + P: DBProvider, + { + database.into().into_tx().commit()?; static_file.commit()?; Ok(()) } } -impl<'a, 'b, TX> UnifiedStorageWriter<'a, TX, &'b StaticFileProvider> +impl<'a, 'b, ProviderDB> UnifiedStorageWriter<'a, ProviderDB, &'b StaticFileProvider> where - TX: DbTxMut + DbTx, + ProviderDB: DBProvider + + BlockWriter + + TransactionsProviderExt + + StateChangeWriter + + TrieWriter + + HistoryWriter + + StageCheckpointWriter + + BlockExecutionWriter + + AsRef, { /// Writes executed blocks and receipts to storage. pub fn save_blocks(&self, blocks: &[ExecutedBlock]) -> ProviderResult<()> { @@ -296,9 +317,9 @@ where } } -impl<'a, 'b, TX> UnifiedStorageWriter<'a, TX, StaticFileProviderRWRefMut<'b>> +impl<'a, 'b, ProviderDB> UnifiedStorageWriter<'a, ProviderDB, StaticFileProviderRWRefMut<'b>> where - TX: DbTx, + ProviderDB: DBProvider + HeaderProvider, { /// Ensures that the static file writer is set and of the right [`StaticFileSegment`] variant. /// @@ -407,9 +428,9 @@ where } } -impl<'a, 'b, TX> UnifiedStorageWriter<'a, TX, StaticFileProviderRWRefMut<'b>> +impl<'a, 'b, ProviderDB> UnifiedStorageWriter<'a, ProviderDB, StaticFileProviderRWRefMut<'b>> where - TX: DbTxMut + DbTx, + ProviderDB: DBProvider + HeaderProvider, { /// Appends receipts block by block. /// @@ -488,9 +509,10 @@ where } } -impl<'a, 'b, TX> StateWriter for UnifiedStorageWriter<'a, TX, StaticFileProviderRWRefMut<'b>> +impl<'a, 'b, ProviderDB> StateWriter + for UnifiedStorageWriter<'a, ProviderDB, StaticFileProviderRWRefMut<'b>> where - TX: DbTxMut + DbTx, + ProviderDB: DBProvider + StateChangeWriter + HeaderProvider, { /// Write the data and receipts to the database or static files if `static_file_producer` is /// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts. diff --git a/crates/storage/storage-api/src/database_provider.rs b/crates/storage/storage-api/src/database_provider.rs index b3e233b0f3..6a463ed01e 100644 --- a/crates/storage/storage-api/src/database_provider.rs +++ b/crates/storage/storage-api/src/database_provider.rs @@ -1,4 +1,5 @@ use reth_db_api::{database::Database, transaction::DbTx}; +use reth_prune_types::PruneModes; use reth_storage_errors::provider::ProviderResult; /// Database provider. @@ -30,6 +31,9 @@ pub trait DBProvider: Send + Sync + Sized + 'static { fn commit(self) -> ProviderResult { Ok(self.into_tx().commit()?) } + + /// Returns a reference to prune modes. + fn prune_modes_ref(&self) -> &PruneModes; } /// Database provider factory.