diff --git a/crates/storage/db-api/src/models/metadata.rs b/crates/storage/db-api/src/models/metadata.rs index 6fa9ea6443..6586c8b7f4 100644 --- a/crates/storage/db-api/src/models/metadata.rs +++ b/crates/storage/db-api/src/models/metadata.rs @@ -101,4 +101,11 @@ impl StorageSettings { self.account_changesets_in_static_files = value; self } + + /// Returns `true` if any tables are configured to be stored in `RocksDB`. + pub const fn any_in_rocksdb(&self) -> bool { + self.transaction_hash_numbers_in_rocksdb || + self.account_history_in_rocksdb || + self.storages_history_in_rocksdb + } } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 692bc7737c..dec302f8f0 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -4,7 +4,7 @@ use crate::{ }, providers::{ database::{chain::ChainStorage, metrics}, - rocksdb::RocksDBProvider, + rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx}, static_file::{StaticFileWriteCtx, StaticFileWriter}, NodeTypesForProvider, StaticFileProvider, }, @@ -188,8 +188,8 @@ pub struct DatabaseProvider { /// `RocksDB` provider rocksdb_provider: RocksDBProvider, /// Pending `RocksDB` batches to be committed at provider commit time. - #[cfg(all(unix, feature = "rocksdb"))] - pending_rocksdb_batches: parking_lot::Mutex>>, + #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))] + pending_rocksdb_batches: PendingRocksDBBatches, /// Minimum distance from tip required for pruning minimum_pruning_distance: u64, /// Database provider metrics @@ -205,10 +205,10 @@ impl Debug for DatabaseProvider { .field("prune_modes", &self.prune_modes) .field("storage", &self.storage) .field("storage_settings", &self.storage_settings) - .field("rocksdb_provider", &self.rocksdb_provider); - #[cfg(all(unix, feature = "rocksdb"))] - s.field("pending_rocksdb_batches", &""); - s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish() + .field("rocksdb_provider", &self.rocksdb_provider) + .field("pending_rocksdb_batches", &"") + .field("minimum_pruning_distance", &self.minimum_pruning_distance) + .finish() } } @@ -336,8 +336,7 @@ impl DatabaseProvider { storage, storage_settings, rocksdb_provider, - #[cfg(all(unix, feature = "rocksdb"))] - pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()), + pending_rocksdb_batches: Default::default(), minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE, metrics: metrics::DatabaseProviderMetrics::default(), } @@ -403,6 +402,17 @@ impl DatabaseProvider RocksDBWriteCtx { + RocksDBWriteCtx { + first_block_number: first_block, + prune_tx_lookup: self.prune_modes.transaction_lookup, + storage_settings: self.cached_storage_settings(), + pending_batches: self.pending_rocksdb_batches.clone(), + } + } + /// Writes executed blocks and state to storage. /// /// This method parallelizes static file (SF) writes with MDBX writes. @@ -452,6 +462,10 @@ impl DatabaseProvider DatabaseProvider(start.elapsed()) }); + // RocksDB writes + #[cfg(all(unix, feature = "rocksdb"))] + let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| { + s.spawn(|| { + let start = Instant::now(); + rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?; + Ok::<_, ProviderError>(start.elapsed()) + }) + }); + // MDBX writes let mdbx_start = Instant::now(); @@ -557,6 +581,12 @@ impl DatabaseProvider DatabaseProvider { storage, storage_settings, rocksdb_provider, - #[cfg(all(unix, feature = "rocksdb"))] - pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()), + pending_rocksdb_batches: Default::default(), minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE, metrics: metrics::DatabaseProviderMetrics::default(), } @@ -3167,14 +3196,13 @@ impl HistoryWriter for DatabaseProvi #[instrument(level = "debug", target = "providers::db", skip_all)] fn update_history_indices(&self, range: RangeInclusive) -> ProviderResult<()> { - // account history stage - { + let storage_settings = self.cached_storage_settings(); + if !storage_settings.account_history_in_rocksdb { let indices = self.changed_accounts_and_blocks_with_range(range.clone())?; self.insert_account_history_index(indices)?; } - // storage history stage - { + if !storage_settings.storages_history_in_rocksdb { let indices = self.changed_storages_and_blocks_with_range(range)?; self.insert_storage_history_index(indices)?; } diff --git a/crates/storage/provider/src/providers/rocksdb/mod.rs b/crates/storage/provider/src/providers/rocksdb/mod.rs index 5c6cf11f32..f9b4ff8304 100644 --- a/crates/storage/provider/src/providers/rocksdb/mod.rs +++ b/crates/storage/provider/src/providers/rocksdb/mod.rs @@ -4,4 +4,5 @@ mod invariants; mod metrics; mod provider; +pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx}; pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx}; diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index 670ab0ccba..88f09a9d35 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -1,11 +1,16 @@ use super::metrics::{RocksDBMetrics, RocksDBOperation}; use crate::providers::{needs_prev_shard_check, HistoryInfo}; -use alloy_primitives::{Address, BlockNumber, B256}; +use alloy_consensus::transaction::TxHashRef; +use alloy_primitives::{Address, BlockNumber, TxNumber, B256}; +use parking_lot::Mutex; +use reth_chain_state::ExecutedBlock; use reth_db_api::{ - models::{storage_sharded_key::StorageShardedKey, ShardedKey}, + models::{storage_sharded_key::StorageShardedKey, ShardedKey, StorageSettings}, table::{Compress, Decode, Decompress, Encode, Table}, tables, BlockNumberList, DatabaseError, }; +use reth_primitives_traits::BlockBody as _; +use reth_prune_types::PruneMode; use reth_storage_errors::{ db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel}, provider::{ProviderError, ProviderResult}, @@ -16,11 +21,41 @@ use rocksdb::{ OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions, }; use std::{ + collections::BTreeMap, fmt, path::{Path, PathBuf}, sync::Arc, + thread, time::Instant, }; +use tracing::instrument; + +/// Pending `RocksDB` batches type alias. +pub(crate) type PendingRocksDBBatches = Arc>>>; + +/// Context for `RocksDB` block writes. +#[derive(Clone)] +pub(crate) struct RocksDBWriteCtx { + /// The first block number being written. + pub first_block_number: BlockNumber, + /// The prune mode for transaction lookup, if any. + pub prune_tx_lookup: Option, + /// Storage settings determining what goes to `RocksDB`. + pub storage_settings: StorageSettings, + /// Pending batches to push to after writing. + pub pending_batches: PendingRocksDBBatches, +} + +impl fmt::Debug for RocksDBWriteCtx { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RocksDBWriteCtx") + .field("first_block_number", &self.first_block_number) + .field("prune_tx_lookup", &self.prune_tx_lookup) + .field("storage_settings", &self.storage_settings) + .field("pending_batches", &"") + .finish() + } +} /// Default cache size for `RocksDB` block cache (128 MB). const DEFAULT_CACHE_SIZE: usize = 128 << 20; @@ -474,6 +509,125 @@ impl RocksDBProvider { })) }) } + + /// Writes all `RocksDB` data for multiple blocks in parallel. + /// + /// This handles transaction hash numbers, account history, and storage history based on + /// the provided storage settings. Each operation runs in parallel with its own batch, + /// pushing to `ctx.pending_batches` for later commit. + #[instrument(level = "debug", target = "providers::db", skip_all)] + pub(crate) fn write_blocks_data( + &self, + blocks: &[ExecutedBlock], + tx_nums: &[TxNumber], + ctx: RocksDBWriteCtx, + ) -> ProviderResult<()> { + if !ctx.storage_settings.any_in_rocksdb() { + return Ok(()); + } + + thread::scope(|s| { + let handles: Vec<_> = [ + (ctx.storage_settings.transaction_hash_numbers_in_rocksdb && + ctx.prune_tx_lookup.is_none_or(|m| !m.is_full())) + .then(|| s.spawn(|| self.write_tx_hash_numbers(blocks, tx_nums, &ctx))), + ctx.storage_settings + .account_history_in_rocksdb + .then(|| s.spawn(|| self.write_account_history(blocks, &ctx))), + ctx.storage_settings + .storages_history_in_rocksdb + .then(|| s.spawn(|| self.write_storage_history(blocks, &ctx))), + ] + .into_iter() + .enumerate() + .filter_map(|(i, h)| h.map(|h| (i, h))) + .collect(); + + for (i, handle) in handles { + handle.join().map_err(|_| { + ProviderError::Database(DatabaseError::Other(format!( + "rocksdb write thread {i} panicked" + ))) + })??; + } + + Ok(()) + }) + } + + /// Writes transaction hash to number mappings for the given blocks. + #[instrument(level = "debug", target = "providers::db", skip_all)] + fn write_tx_hash_numbers( + &self, + blocks: &[ExecutedBlock], + tx_nums: &[TxNumber], + ctx: &RocksDBWriteCtx, + ) -> ProviderResult<()> { + let mut batch = self.batch(); + for (block, &first_tx_num) in blocks.iter().zip(tx_nums) { + let body = block.recovered_block().body(); + let mut tx_num = first_tx_num; + for transaction in body.transactions_iter() { + batch.put::(*transaction.tx_hash(), &tx_num)?; + tx_num += 1; + } + } + ctx.pending_batches.lock().push(batch.into_inner()); + Ok(()) + } + + /// Writes account history indices for the given blocks. + #[instrument(level = "debug", target = "providers::db", skip_all)] + fn write_account_history( + &self, + blocks: &[ExecutedBlock], + ctx: &RocksDBWriteCtx, + ) -> ProviderResult<()> { + let mut batch = self.batch(); + let mut account_history: BTreeMap> = BTreeMap::new(); + for (block_idx, block) in blocks.iter().enumerate() { + let block_number = ctx.first_block_number + block_idx as u64; + let bundle = &block.execution_outcome().bundle; + for &address in bundle.state().keys() { + account_history.entry(address).or_default().push(block_number); + } + } + for (address, blocks) in account_history { + let key = ShardedKey::new(address, u64::MAX); + let value = BlockNumberList::new_pre_sorted(blocks); + batch.put::(key, &value)?; + } + ctx.pending_batches.lock().push(batch.into_inner()); + Ok(()) + } + + /// Writes storage history indices for the given blocks. + #[instrument(level = "debug", target = "providers::db", skip_all)] + fn write_storage_history( + &self, + blocks: &[ExecutedBlock], + ctx: &RocksDBWriteCtx, + ) -> ProviderResult<()> { + let mut batch = self.batch(); + let mut storage_history: BTreeMap<(Address, B256), Vec> = BTreeMap::new(); + for (block_idx, block) in blocks.iter().enumerate() { + let block_number = ctx.first_block_number + block_idx as u64; + let bundle = &block.execution_outcome().bundle; + for (&address, account) in bundle.state() { + for &slot in account.storage.keys() { + let key = B256::new(slot.to_be_bytes()); + storage_history.entry((address, key)).or_default().push(block_number); + } + } + } + for ((address, slot), blocks) in storage_history { + let key = StorageShardedKey::new(address, slot, u64::MAX); + let value = BlockNumberList::new_pre_sorted(blocks); + batch.put::(key, &value)?; + } + ctx.pending_batches.lock().push(batch.into_inner()); + Ok(()) + } } /// Handle for building a batch of operations atomically. diff --git a/crates/storage/provider/src/providers/rocksdb_stub.rs b/crates/storage/provider/src/providers/rocksdb_stub.rs index 5fac73eca7..0160ef8702 100644 --- a/crates/storage/provider/src/providers/rocksdb_stub.rs +++ b/crates/storage/provider/src/providers/rocksdb_stub.rs @@ -2,28 +2,42 @@ //! //! This module provides placeholder types that allow the code to compile when `RocksDB` is not //! available (either on non-Unix platforms or when the `rocksdb` feature is not enabled). -//! Operations will produce errors if actually attempted. +//! All method calls are cfg-guarded in the calling code, so only type definitions are needed here. -use reth_db_api::table::{Encode, Table}; -use reth_storage_errors::{ - db::LogLevel, - provider::{ProviderError::UnsupportedProvider, ProviderResult}, -}; -use std::path::Path; +use alloy_primitives::BlockNumber; +use parking_lot::Mutex; +use reth_db_api::models::StorageSettings; +use reth_prune_types::PruneMode; +use reth_storage_errors::{db::LogLevel, provider::ProviderResult}; +use std::{path::Path, sync::Arc}; + +/// Pending `RocksDB` batches type alias (stub - uses unit type). +pub(crate) type PendingRocksDBBatches = Arc>>; + +/// Context for `RocksDB` block writes (stub). +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub(crate) struct RocksDBWriteCtx { + /// The first block number being written. + pub first_block_number: BlockNumber, + /// The prune mode for transaction lookup, if any. + pub prune_tx_lookup: Option, + /// Storage settings determining what goes to `RocksDB`. + pub storage_settings: StorageSettings, + /// Pending batches (stub - unused). + pub pending_batches: PendingRocksDBBatches, +} /// A stub `RocksDB` provider. /// /// This type exists to allow code to compile when `RocksDB` is not available (either on non-Unix -/// platforms or when the `rocksdb` feature is not enabled). When using this stub, the -/// `transaction_hash_numbers_in_rocksdb` flag should be set to `false` to ensure all operations -/// route to MDBX instead. +/// platforms or when the `rocksdb` feature is not enabled). All method calls on `RocksDBProvider` +/// are cfg-guarded in the calling code, so this stub only provides type definitions. #[derive(Debug, Clone)] pub struct RocksDBProvider; impl RocksDBProvider { /// Creates a new stub `RocksDB` provider. - /// - /// On non-Unix platforms, this returns an error indicating `RocksDB` is not supported. pub fn new(_path: impl AsRef) -> ProviderResult { Ok(Self) } @@ -33,130 +47,22 @@ impl RocksDBProvider { RocksDBBuilder::new(path) } - /// Get a value from `RocksDB` (stub implementation). - pub fn get(&self, _key: T::Key) -> ProviderResult> { - Err(UnsupportedProvider) - } - - /// Get a value from `RocksDB` using pre-encoded key (stub implementation). - pub const fn get_encoded( - &self, - _key: &::Encoded, - ) -> ProviderResult> { - Err(UnsupportedProvider) - } - - /// Put a value into `RocksDB` (stub implementation). - pub fn put(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> { - Err(UnsupportedProvider) - } - - /// Put a value into `RocksDB` using pre-encoded key (stub implementation). - pub const fn put_encoded( - &self, - _key: &::Encoded, - _value: &T::Value, - ) -> ProviderResult<()> { - Err(UnsupportedProvider) - } - - /// Delete a value from `RocksDB` (stub implementation). - pub fn delete(&self, _key: T::Key) -> ProviderResult<()> { - Err(UnsupportedProvider) - } - - /// Write a batch of operations (stub implementation). - pub fn write_batch(&self, _f: F) -> ProviderResult<()> - where - F: FnOnce(&mut RocksDBBatch) -> ProviderResult<()>, - { - Err(UnsupportedProvider) - } - - /// Creates a new transaction (stub implementation). - pub const fn tx(&self) -> RocksTx { - RocksTx - } - - /// Creates a new batch for atomic writes (stub implementation). - pub const fn batch(&self) -> RocksDBBatch { - RocksDBBatch - } - - /// Gets the first key-value pair from a table (stub implementation). - pub const fn first(&self) -> ProviderResult> { - Ok(None) - } - - /// Gets the last key-value pair from a table (stub implementation). - pub const fn last(&self) -> ProviderResult> { - Ok(None) - } - - /// Creates an iterator for the specified table (stub implementation). - /// - /// Returns an empty iterator. This is consistent with `first()` and `last()` returning - /// `Ok(None)` - the stub behaves as if the database is empty rather than unavailable. - pub const fn iter(&self) -> ProviderResult> { - Ok(RocksDBIter { _marker: std::marker::PhantomData }) - } - /// Check consistency of `RocksDB` tables (stub implementation). /// /// Returns `None` since there is no `RocksDB` data to check when the feature is disabled. pub const fn check_consistency( &self, _provider: &Provider, - ) -> ProviderResult> { + ) -> ProviderResult> { Ok(None) } } -/// A stub batch writer for `RocksDB` on non-Unix platforms. +/// A stub batch writer for `RocksDB`. #[derive(Debug)] pub struct RocksDBBatch; -impl RocksDBBatch { - /// Puts a value into the batch (stub implementation). - pub fn put(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> { - Err(UnsupportedProvider) - } - - /// Puts a value into the batch using pre-encoded key (stub implementation). - pub const fn put_encoded( - &self, - _key: &::Encoded, - _value: &T::Value, - ) -> ProviderResult<()> { - Err(UnsupportedProvider) - } - - /// Deletes a value from the batch (stub implementation). - pub fn delete(&self, _key: T::Key) -> ProviderResult<()> { - Err(UnsupportedProvider) - } - - /// Commits the batch (stub implementation). - pub const fn commit(self) -> ProviderResult<()> { - Err(UnsupportedProvider) - } -} - -/// A stub iterator for `RocksDB` (non-transactional). -#[derive(Debug)] -pub struct RocksDBIter<'a, T> { - _marker: std::marker::PhantomData<(&'a (), T)>, -} - -impl Iterator for RocksDBIter<'_, T> { - type Item = ProviderResult<(T::Key, T::Value)>; - - fn next(&mut self) -> Option { - None - } -} - -/// A stub builder for `RocksDB` on non-Unix platforms. +/// A stub builder for `RocksDB`. #[derive(Debug)] pub struct RocksDBBuilder; @@ -167,7 +73,7 @@ impl RocksDBBuilder { } /// Adds a column family for a specific table type (stub implementation). - pub const fn with_table(self) -> Self { + pub const fn with_table(self) -> Self { self } @@ -205,71 +111,3 @@ impl RocksDBBuilder { /// A stub transaction for `RocksDB`. #[derive(Debug)] pub struct RocksTx; - -impl RocksTx { - /// Gets a value from the specified table (stub implementation). - pub fn get(&self, _key: T::Key) -> ProviderResult> { - Err(UnsupportedProvider) - } - - /// Gets a value using pre-encoded key (stub implementation). - pub const fn get_encoded( - &self, - _key: &::Encoded, - ) -> ProviderResult> { - Err(UnsupportedProvider) - } - - /// Puts a value into the specified table (stub implementation). - pub fn put(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> { - Err(UnsupportedProvider) - } - - /// Puts a value using pre-encoded key (stub implementation). - pub const fn put_encoded( - &self, - _key: &::Encoded, - _value: &T::Value, - ) -> ProviderResult<()> { - Err(UnsupportedProvider) - } - - /// Deletes a value from the specified table (stub implementation). - pub fn delete(&self, _key: T::Key) -> ProviderResult<()> { - Err(UnsupportedProvider) - } - - /// Creates an iterator for the specified table (stub implementation). - pub const fn iter(&self) -> ProviderResult> { - Err(UnsupportedProvider) - } - - /// Creates an iterator starting from the given key (stub implementation). - pub fn iter_from(&self, _key: T::Key) -> ProviderResult> { - Err(UnsupportedProvider) - } - - /// Commits the transaction (stub implementation). - pub const fn commit(self) -> ProviderResult<()> { - Err(UnsupportedProvider) - } - - /// Rolls back the transaction (stub implementation). - pub const fn rollback(self) -> ProviderResult<()> { - Err(UnsupportedProvider) - } -} - -/// A stub iterator for `RocksDB` transactions. -#[derive(Debug)] -pub struct RocksTxIter<'a, T> { - _marker: std::marker::PhantomData<(&'a (), T)>, -} - -impl Iterator for RocksTxIter<'_, T> { - type Item = ProviderResult<(T::Key, T::Value)>; - - fn next(&mut self) -> Option { - None - } -}