diff --git a/Cargo.lock b/Cargo.lock index 928af39457..0b2e0b56bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10583,6 +10583,7 @@ dependencies = [ "reth-stages-api", "reth-static-file", "reth-static-file-types", + "reth-storage-api", "reth-storage-errors", "reth-testing-utils", "reth-trie", diff --git a/crates/stages/stages/Cargo.toml b/crates/stages/stages/Cargo.toml index 32114c58e1..462a6d74c7 100644 --- a/crates/stages/stages/Cargo.toml +++ b/crates/stages/stages/Cargo.toml @@ -75,6 +75,7 @@ reth-network-p2p = { workspace = true, features = ["test-utils"] } reth-downloaders.workspace = true reth-static-file.workspace = true reth-stages-api = { workspace = true, features = ["test-utils"] } +reth-storage-api.workspace = true reth-testing-utils.workspace = true reth-trie = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } @@ -116,6 +117,7 @@ test-utils = [ "reth-ethereum-primitives?/test-utils", "reth-evm-ethereum/test-utils", ] +rocksdb = ["reth-provider/rocksdb"] [[bench]] name = "criterion" diff --git a/crates/stages/stages/src/stages/tx_lookup.rs b/crates/stages/stages/src/stages/tx_lookup.rs index daf63828b0..087a040f79 100644 --- a/crates/stages/stages/src/stages/tx_lookup.rs +++ b/crates/stages/stages/src/stages/tx_lookup.rs @@ -3,17 +3,16 @@ use alloy_primitives::{TxHash, TxNumber}; use num_traits::Zero; use reth_config::config::{EtlConfig, TransactionLookupConfig}; use reth_db_api::{ - cursor::{DbCursorRO, DbCursorRW}, - table::Value, + table::{Decode, Decompress, Value}, tables, transaction::DbTxMut, - RawKey, RawValue, }; use reth_etl::Collector; use reth_primitives_traits::{NodePrimitives, SignedTransaction}; use reth_provider::{ - BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter, - StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt, + BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter, + RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache, + TransactionsProvider, TransactionsProviderExt, }; use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment}; use reth_stages_api::{ @@ -65,7 +64,9 @@ where + PruneCheckpointReader + StatsReader + StaticFileProviderFactory> - + TransactionsProviderExt, + + TransactionsProviderExt + + StorageSettingsCache + + RocksDBProviderFactory, { /// Return the id of the stage fn id(&self) -> StageId { @@ -150,16 +151,27 @@ where ); if range_output.is_final_range { - let append_only = - provider.count_entries::()?.is_zero(); - let mut txhash_cursor = provider - .tx_ref() - .cursor_write::>()?; - let total_hashes = hash_collector.len(); let interval = (total_hashes / 10).max(1); + + // Use append mode when table is empty (first sync) - significantly faster + let append_only = + provider.count_entries::()?.is_zero(); + + // Create RocksDB batch if feature is enabled + #[cfg(all(unix, feature = "rocksdb"))] + let rocksdb = provider.rocksdb_provider(); + #[cfg(all(unix, feature = "rocksdb"))] + let rocksdb_batch = rocksdb.batch(); + #[cfg(not(all(unix, feature = "rocksdb")))] + let rocksdb_batch = (); + + // Create writer that routes to either MDBX or RocksDB based on settings + let mut writer = + EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?; + for (index, hash_to_number) in hash_collector.iter()?.enumerate() { - let (hash, number) = hash_to_number?; + let (hash_bytes, number_bytes) = hash_to_number?; if index > 0 && index.is_multiple_of(interval) { info!( target: "sync::stages::transaction_lookup", @@ -169,12 +181,16 @@ where ); } - let key = RawKey::::from_vec(hash); - if append_only { - txhash_cursor.append(key, &RawValue::::from_vec(number))? - } else { - txhash_cursor.insert(key, &RawValue::::from_vec(number))? - } + // Decode from raw ETL bytes + let hash = TxHash::decode(&hash_bytes)?; + let tx_num = TxNumber::decompress(&number_bytes)?; + writer.put_transaction_hash_number(hash, tx_num, append_only)?; + } + + // Extract and register RocksDB batch for commit at provider level + #[cfg(all(unix, feature = "rocksdb"))] + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); } trace!(target: "sync::stages::transaction_lookup", @@ -199,11 +215,19 @@ where provider: &Provider, input: UnwindInput, ) -> Result { - let tx = provider.tx_ref(); let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size); - // Cursor to unwind tx hash to number - let mut tx_hash_number_cursor = tx.cursor_write::()?; + // Create RocksDB batch if feature is enabled + #[cfg(all(unix, feature = "rocksdb"))] + let rocksdb = provider.rocksdb_provider(); + #[cfg(all(unix, feature = "rocksdb"))] + let rocksdb_batch = rocksdb.batch(); + #[cfg(not(all(unix, feature = "rocksdb")))] + let rocksdb_batch = (); + + // Create writer that routes to either MDBX or RocksDB based on settings + let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?; + let static_file_provider = provider.static_file_provider(); let rev_walker = provider .block_body_indices_range(range.clone())? @@ -218,15 +242,18 @@ where // Delete all transactions that belong to this block for tx_id in body.tx_num_range() { - // First delete the transaction and hash to id mapping - if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? && - tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some() - { - tx_hash_number_cursor.delete_current()?; + if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? { + writer.delete_transaction_hash_number(transaction.trie_hash())?; } } } + // Extract and register RocksDB batch for commit at provider level + #[cfg(all(unix, feature = "rocksdb"))] + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); + } + Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) .with_entities_stage_checkpoint(stage_checkpoint(provider)?), @@ -266,7 +293,7 @@ mod tests { }; use alloy_primitives::{BlockNumber, B256}; use assert_matches::assert_matches; - use reth_db_api::transaction::DbTx; + use reth_db_api::{cursor::DbCursorRO, transaction::DbTx}; use reth_ethereum_primitives::Block; use reth_primitives_traits::SealedBlock; use reth_provider::{ @@ -581,4 +608,160 @@ mod tests { self.ensure_no_hash_by_block(input.unwind_to) } } + + #[cfg(all(unix, feature = "rocksdb"))] + mod rocksdb_tests { + use super::*; + use reth_provider::RocksDBProviderFactory; + use reth_storage_api::StorageSettings; + + /// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage + /// writes transaction hash mappings to `RocksDB` instead of MDBX. + #[tokio::test] + async fn execute_writes_to_rocksdb_when_enabled() { + let (previous_stage, stage_progress) = (110, 100); + let mut rng = generators::rng(); + + // Set up the runner + let runner = TransactionLookupTestRunner::default(); + + // Enable RocksDB for transaction hash numbers + runner.db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true), + ); + + let input = ExecInput { + target: Some(previous_stage), + checkpoint: Some(StageCheckpoint::new(stage_progress)), + }; + + // Insert blocks with transactions + let blocks = random_block_range( + &mut rng, + stage_progress + 1..=previous_stage, + BlockRangeParams { + parent: Some(B256::ZERO), + tx_count: 1..3, // Ensure we have transactions + ..Default::default() + }, + ); + runner + .db + .insert_blocks(blocks.iter(), StorageKind::Static) + .expect("failed to insert blocks"); + + // Count expected transactions + let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum(); + assert!(expected_tx_count > 0, "test requires at least one transaction"); + + // Execute the stage + let rx = runner.execute(input); + let result = rx.await.unwrap(); + assert!(result.is_ok(), "stage execution failed: {:?}", result); + + // Verify MDBX table is empty (data should be in RocksDB) + let mdbx_count = runner.db.count_entries::().unwrap(); + assert_eq!( + mdbx_count, 0, + "MDBX TransactionHashNumbers should be empty when RocksDB is enabled" + ); + + // Verify RocksDB has the data + let rocksdb = runner.db.factory.rocksdb_provider(); + let mut rocksdb_count = 0; + for block in &blocks { + for tx in &block.body().transactions { + let hash = *tx.tx_hash(); + let result = rocksdb.get::(hash).unwrap(); + assert!(result.is_some(), "Transaction hash {:?} not found in RocksDB", hash); + rocksdb_count += 1; + } + } + assert_eq!( + rocksdb_count, expected_tx_count, + "RocksDB should contain all transaction hashes" + ); + } + + /// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage + /// unwind deletes transaction hash mappings from `RocksDB` instead of MDBX. + #[tokio::test] + async fn unwind_deletes_from_rocksdb_when_enabled() { + let (previous_stage, stage_progress) = (110, 100); + let mut rng = generators::rng(); + + // Set up the runner + let runner = TransactionLookupTestRunner::default(); + + // Enable RocksDB for transaction hash numbers + runner.db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true), + ); + + // Insert blocks with transactions + let blocks = random_block_range( + &mut rng, + stage_progress + 1..=previous_stage, + BlockRangeParams { + parent: Some(B256::ZERO), + tx_count: 1..3, // Ensure we have transactions + ..Default::default() + }, + ); + runner + .db + .insert_blocks(blocks.iter(), StorageKind::Static) + .expect("failed to insert blocks"); + + // Count expected transactions + let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum(); + assert!(expected_tx_count > 0, "test requires at least one transaction"); + + // Execute the stage first to populate RocksDB + let exec_input = ExecInput { + target: Some(previous_stage), + checkpoint: Some(StageCheckpoint::new(stage_progress)), + }; + let rx = runner.execute(exec_input); + let result = rx.await.unwrap(); + assert!(result.is_ok(), "stage execution failed: {:?}", result); + + // Verify RocksDB has the data before unwind + let rocksdb = runner.db.factory.rocksdb_provider(); + for block in &blocks { + for tx in &block.body().transactions { + let hash = *tx.tx_hash(); + let result = rocksdb.get::(hash).unwrap(); + assert!( + result.is_some(), + "Transaction hash {:?} should exist before unwind", + hash + ); + } + } + + // Now unwind to stage_progress (removing all the blocks we added) + let unwind_input = UnwindInput { + checkpoint: StageCheckpoint::new(previous_stage), + unwind_to: stage_progress, + bad_block: None, + }; + let unwind_result = runner.unwind(unwind_input).await; + assert!(unwind_result.is_ok(), "stage unwind failed: {:?}", unwind_result); + + // Verify RocksDB data is deleted after unwind + let rocksdb = runner.db.factory.rocksdb_provider(); + for block in &blocks { + for tx in &block.body().transactions { + let hash = *tx.tx_hash(); + let result = rocksdb.get::(hash).unwrap(); + assert!( + result.is_none(), + "Transaction hash {:?} should be deleted from RocksDB after unwind", + hash + ); + } + } + } + } } diff --git a/crates/stages/stages/src/test_utils/test_db.rs b/crates/stages/stages/src/test_utils/test_db.rs index e404450379..c137c38826 100644 --- a/crates/stages/stages/src/test_utils/test_db.rs +++ b/crates/stages/stages/src/test_utils/test_db.rs @@ -50,7 +50,7 @@ impl Default for TestStageDB { create_test_rw_db(), MAINNET.clone(), StaticFileProvider::read_write(static_dir_path).unwrap(), - RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(), + RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(), ) .expect("failed to create test provider factory"), } @@ -68,7 +68,7 @@ impl TestStageDB { create_test_rw_db_with_path(path), MAINNET.clone(), StaticFileProvider::read_write(static_dir_path).unwrap(), - RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(), + RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(), ) .expect("failed to create test provider factory"), } diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs index 9e976b057a..80ca829e6e 100644 --- a/crates/storage/provider/src/either_writer.rs +++ b/crates/storage/provider/src/either_writer.rs @@ -187,6 +187,21 @@ impl<'a> EitherWriter<'a, (), ()> { } impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> { + /// Extracts the raw `RocksDB` write batch from this writer, if it contains one. + /// + /// Returns `Some(WriteBatchWithTransaction)` for [`Self::RocksDB`] variant, + /// `None` for other variants. + /// + /// This is used to defer `RocksDB` commits to the provider level, ensuring all + /// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place. + #[cfg(all(unix, feature = "rocksdb"))] + pub fn into_raw_rocksdb_batch(self) -> Option> { + match self { + Self::Database(_) | Self::StaticFile(_) => None, + Self::RocksDB(batch) => Some(batch.into_inner()), + } + } + /// Increment the block number. /// /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`]. @@ -304,13 +319,24 @@ where CURSOR: DbCursorRW + DbCursorRO, { /// Puts a transaction hash number mapping. + /// + /// When `append_only` is true, uses `cursor.append()` which is significantly faster + /// but requires entries to be inserted in order and the table to be empty. + /// When false, uses `cursor.insert()` which handles arbitrary insertion order. pub fn put_transaction_hash_number( &mut self, hash: TxHash, tx_num: TxNumber, + append_only: bool, ) -> ProviderResult<()> { match self { - Self::Database(cursor) => Ok(cursor.upsert(hash, &tx_num)?), + Self::Database(cursor) => { + if append_only { + Ok(cursor.append(hash, &tx_num)?) + } else { + Ok(cursor.insert(hash, &tx_num)?) + } + } Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), #[cfg(all(unix, feature = "rocksdb"))] Self::RocksDB(batch) => batch.put::(hash, &tx_num), @@ -663,12 +689,18 @@ mod tests { #[cfg(all(test, unix, feature = "rocksdb"))] mod rocksdb_tests { - use crate::providers::rocksdb::{RocksDBBuilder, RocksDBProvider}; + use super::*; + use crate::{ + providers::rocksdb::{RocksDBBuilder, RocksDBProvider}, + test_utils::create_test_provider_factory, + RocksDBProviderFactory, + }; use alloy_primitives::{Address, B256}; use reth_db_api::{ models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey}, tables, }; + use reth_storage_api::{DatabaseProviderFactory, StorageSettings}; use tempfile::TempDir; fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) { @@ -682,6 +714,87 @@ mod rocksdb_tests { (temp_dir, provider) } + /// Test that `EitherWriter::new_transaction_hash_numbers` creates a `RocksDB` writer + /// when the storage setting is enabled, and that put operations followed by commit + /// persist the data to `RocksDB`. + #[test] + fn test_either_writer_transaction_hash_numbers_with_rocksdb() { + let factory = create_test_provider_factory(); + + // Enable RocksDB for transaction hash numbers + factory.set_storage_settings_cache( + StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true), + ); + + let hash1 = B256::from([1u8; 32]); + let hash2 = B256::from([2u8; 32]); + let tx_num1 = 100u64; + let tx_num2 = 200u64; + + // Get the RocksDB batch from the provider + let rocksdb = factory.rocksdb_provider(); + let batch = rocksdb.batch(); + + // Create EitherWriter with RocksDB + let provider = factory.database_provider_rw().unwrap(); + let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap(); + + // Verify we got a RocksDB writer + assert!(matches!(writer, EitherWriter::RocksDB(_))); + + // Write transaction hash numbers (append_only=false since we're using RocksDB) + writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap(); + writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap(); + + // Extract the batch and register with provider for commit + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); + } + + // Commit via provider - this commits RocksDB batch too + provider.commit().unwrap(); + + // Verify data was written to RocksDB + let rocksdb = factory.rocksdb_provider(); + assert_eq!(rocksdb.get::(hash1).unwrap(), Some(tx_num1)); + assert_eq!(rocksdb.get::(hash2).unwrap(), Some(tx_num2)); + } + + /// Test that `EitherWriter::delete_transaction_hash_number` works with `RocksDB`. + #[test] + fn test_either_writer_delete_transaction_hash_number_with_rocksdb() { + let factory = create_test_provider_factory(); + + // Enable RocksDB for transaction hash numbers + factory.set_storage_settings_cache( + StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true), + ); + + let hash = B256::from([1u8; 32]); + let tx_num = 100u64; + + // First, write a value directly to RocksDB + let rocksdb = factory.rocksdb_provider(); + rocksdb.put::(hash, &tx_num).unwrap(); + assert_eq!(rocksdb.get::(hash).unwrap(), Some(tx_num)); + + // Now delete using EitherWriter + let batch = rocksdb.batch(); + let provider = factory.database_provider_rw().unwrap(); + let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap(); + writer.delete_transaction_hash_number(hash).unwrap(); + + // Extract the batch and commit via provider + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); + } + provider.commit().unwrap(); + + // Verify deletion + let rocksdb = factory.rocksdb_provider(); + assert_eq!(rocksdb.get::(hash).unwrap(), None); + } + #[test] fn test_rocksdb_batch_transaction_hash_numbers() { let (_temp_dir, provider) = create_rocksdb_provider(); @@ -816,4 +929,65 @@ mod rocksdb_tests { // Verify deletion assert_eq!(provider.get::(key).unwrap(), None); } + + /// Test that `RocksDB` commits happen at `provider.commit()` level, not at writer level. + /// + /// This ensures all storage commits (MDBX, static files, `RocksDB`) happen atomically + /// in a single place, making it easier to reason about commit ordering and consistency. + #[test] + fn test_rocksdb_commits_at_provider_level() { + let factory = create_test_provider_factory(); + + // Enable RocksDB for transaction hash numbers + factory.set_storage_settings_cache( + StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true), + ); + + let hash1 = B256::from([1u8; 32]); + let hash2 = B256::from([2u8; 32]); + let tx_num1 = 100u64; + let tx_num2 = 200u64; + + // Get the RocksDB batch from the provider + let rocksdb = factory.rocksdb_provider(); + let batch = rocksdb.batch(); + + // Create provider and EitherWriter + let provider = factory.database_provider_rw().unwrap(); + let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap(); + + // Write transaction hash numbers (append_only=false since we're using RocksDB) + writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap(); + writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap(); + + // Extract the raw batch from the writer and register it with the provider + let raw_batch = writer.into_raw_rocksdb_batch(); + if let Some(batch) = raw_batch { + provider.set_pending_rocksdb_batch(batch); + } + + // Data should NOT be visible yet (batch not committed) + let rocksdb = factory.rocksdb_provider(); + assert_eq!( + rocksdb.get::(hash1).unwrap(), + None, + "Data should not be visible before provider.commit()" + ); + + // Commit the provider - this should commit both MDBX and RocksDB + provider.commit().unwrap(); + + // Now data should be visible in RocksDB + let rocksdb = factory.rocksdb_provider(); + assert_eq!( + rocksdb.get::(hash1).unwrap(), + Some(tx_num1), + "Data should be visible after provider.commit()" + ); + assert_eq!( + rocksdb.get::(hash2).unwrap(), + Some(tx_num2), + "Data should be visible after provider.commit()" + ); + } } diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 9a6286cc8b..44ae667f50 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -181,6 +181,11 @@ impl RocksDBProviderFactory for BlockchainProvider { fn rocksdb_provider(&self) -> RocksDBProvider { self.database.rocksdb_provider() } + + #[cfg(all(unix, feature = "rocksdb"))] + fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction) { + unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::set_pending_rocksdb_batch instead") + } } impl HeaderProvider for BlockchainProvider { diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 0270f24644..64323d65e9 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -153,6 +153,11 @@ impl RocksDBProviderFactory for ProviderFactory { fn rocksdb_provider(&self) -> RocksDBProvider { self.rocksdb_provider.clone() } + + #[cfg(all(unix, feature = "rocksdb"))] + fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction) { + unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead") + } } impl>> ProviderFactory { diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index a5ae621d27..56bb31f8e6 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -151,7 +151,6 @@ impl From> /// A provider struct that fetches data from the database. /// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`] -#[derive(Debug)] pub struct DatabaseProvider { /// Database transaction. tx: TX, @@ -167,10 +166,29 @@ pub struct DatabaseProvider { storage_settings: Arc>, /// `RocksDB` provider rocksdb_provider: RocksDBProvider, + /// Pending `RocksDB` batches to be committed at provider commit time. + #[cfg(all(unix, feature = "rocksdb"))] + pending_rocksdb_batches: parking_lot::Mutex>>, /// Minimum distance from tip required for pruning minimum_pruning_distance: u64, } +impl Debug for DatabaseProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut s = f.debug_struct("DatabaseProvider"); + s.field("tx", &self.tx) + .field("chain_spec", &self.chain_spec) + .field("static_file_provider", &self.static_file_provider) + .field("prune_modes", &self.prune_modes) + .field("storage", &self.storage) + .field("storage_settings", &self.storage_settings) + .field("rocksdb_provider", &self.rocksdb_provider); + #[cfg(all(unix, feature = "rocksdb"))] + s.field("pending_rocksdb_batches", &""); + s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish() + } +} + impl DatabaseProvider { /// Returns reference to prune modes. pub const fn prune_modes_ref(&self) -> &PruneModes { @@ -259,6 +277,11 @@ impl RocksDBProviderFactory for DatabaseProvider { fn rocksdb_provider(&self) -> RocksDBProvider { self.rocksdb_provider.clone() } + + #[cfg(all(unix, feature = "rocksdb"))] + fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction) { + self.pending_rocksdb_batches.lock().push(batch); + } } impl> ChainSpecProvider @@ -290,6 +313,8 @@ impl DatabaseProvider { storage, storage_settings, rocksdb_provider, + #[cfg(all(unix, feature = "rocksdb"))] + pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()), minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE, } } @@ -545,6 +570,8 @@ impl DatabaseProvider { storage, storage_settings, rocksdb_provider, + #[cfg(all(unix, feature = "rocksdb"))] + pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()), minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE, } } @@ -3178,7 +3205,7 @@ impl DBProvider for DatabaseProvider self.prune_modes_ref() } - /// Commit database transaction and static files. + /// Commit database transaction, static files, and pending `RocksDB` batches. fn commit(self) -> ProviderResult { // For unwinding it makes more sense to commit the database first, since if // it is interrupted before the static files commit, we can just @@ -3186,9 +3213,27 @@ impl DBProvider for DatabaseProvider // checkpoints on the next start-up. if self.static_file_provider.has_unwind_queued() { self.tx.commit()?; + + #[cfg(all(unix, feature = "rocksdb"))] + { + let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock()); + for batch in batches { + self.rocksdb_provider.commit_batch(batch)?; + } + } + self.static_file_provider.commit()?; } else { self.static_file_provider.commit()?; + + #[cfg(all(unix, feature = "rocksdb"))] + { + let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock()); + for batch in batches { + self.rocksdb_provider.commit_batch(batch)?; + } + } + self.tx.commit()?; } diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index 1d6f4b230a..5039e86d3f 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -450,6 +450,19 @@ impl RocksDBProvider { batch_handle.commit() }) } + + /// Commits a raw `WriteBatchWithTransaction` to `RocksDB`. + /// + /// This is used when the batch was extracted via [`RocksDBBatch::into_inner`] + /// and needs to be committed at a later point (e.g., at provider commit time). + pub fn commit_batch(&self, batch: WriteBatchWithTransaction) -> ProviderResult<()> { + self.0.db.write_opt(batch, &WriteOptions::default()).map_err(|e| { + ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo { + message: e.to_string().into(), + code: -1, + })) + }) + } } /// Handle for building a batch of operations atomically. @@ -529,6 +542,13 @@ impl<'a> RocksDBBatch<'a> { pub const fn provider(&self) -> &RocksDBProvider { self.provider } + + /// Consumes the batch and returns the underlying `WriteBatchWithTransaction`. + /// + /// This is used to defer commits to the provider level. + pub fn into_inner(self) -> WriteBatchWithTransaction { + self.inner + } } /// `RocksDB` transaction wrapper providing MDBX-like semantics. diff --git a/crates/storage/provider/src/test_utils/mod.rs b/crates/storage/provider/src/test_utils/mod.rs index 6b1e7c4b84..3002f31abd 100644 --- a/crates/storage/provider/src/test_utils/mod.rs +++ b/crates/storage/provider/src/test_utils/mod.rs @@ -1,5 +1,5 @@ use crate::{ - providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider}, + providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBBuilder, StaticFileProvider}, HashingWriter, ProviderFactory, TrieWriter, }; use alloy_primitives::B256; @@ -62,7 +62,10 @@ pub fn create_test_provider_factory_with_node_types( db, chain_spec, StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"), - RocksDBProvider::new(&rocksdb_dir).expect("failed to create test RocksDB provider"), + RocksDBBuilder::new(&rocksdb_dir) + .with_default_tables() + .build() + .expect("failed to create test RocksDB provider"), ) .expect("failed to create test provider factory") } diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 61609c3758..64eff68b03 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -29,4 +29,9 @@ impl RocksDBProviderFactory for NoopProvider< fn rocksdb_provider(&self) -> RocksDBProvider { RocksDBProvider::builder(PathBuf::default()).build().unwrap() } + + #[cfg(all(unix, feature = "rocksdb"))] + fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction) { + // No-op for NoopProvider + } } diff --git a/crates/storage/provider/src/traits/rocksdb_provider.rs b/crates/storage/provider/src/traits/rocksdb_provider.rs index c1ffcd4358..9d2186677d 100644 --- a/crates/storage/provider/src/traits/rocksdb_provider.rs +++ b/crates/storage/provider/src/traits/rocksdb_provider.rs @@ -6,4 +6,11 @@ use crate::providers::RocksDBProvider; pub trait RocksDBProviderFactory { /// Returns the `RocksDB` provider. fn rocksdb_provider(&self) -> RocksDBProvider; + + /// Adds a pending `RocksDB` batch to be committed when this provider is committed. + /// + /// This allows deferring `RocksDB` commits to happen at the same time as MDBX and static file + /// commits, ensuring atomicity across all storage backends. + #[cfg(all(unix, feature = "rocksdb"))] + fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction); }