feat: add rocksdb to save_blocks (#21003)

Co-authored-by: Sergei Shulepov <s.pepyakin@gmail.com>
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
Co-authored-by: yongkangc <chiayongkang@hotmail.com>
Author: joshieDo
Date: 2026-01-15 18:33:19 +00:00
Committed by: GitHub
Parent: b1f107b171
Commit: e9b079ad62
5 changed files with 237 additions and 209 deletions

File 1 of 5

@@ -101,4 +101,11 @@ impl StorageSettings {
self.account_changesets_in_static_files = value;
self
}
/// Returns `true` if any tables are configured to be stored in `RocksDB`.
pub const fn any_in_rocksdb(&self) -> bool {
self.transaction_hash_numbers_in_rocksdb ||
self.account_history_in_rocksdb ||
self.storages_history_in_rocksdb
}
}
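
To make the new gate concrete, here is a minimal, runnable sketch of the helper and a call site. The trimmed StorageSettings stand-in carries only the three flags named in this hunk (the real type has more fields), and the call site is illustrative rather than reth code.

// Stand-in with only the flags from the hunk above.
#[derive(Default, Clone, Copy)]
struct StorageSettings {
    transaction_hash_numbers_in_rocksdb: bool,
    account_history_in_rocksdb: bool,
    storages_history_in_rocksdb: bool,
}

impl StorageSettings {
    /// Returns `true` if any tables are configured to be stored in `RocksDB`.
    const fn any_in_rocksdb(&self) -> bool {
        self.transaction_hash_numbers_in_rocksdb ||
            self.account_history_in_rocksdb ||
            self.storages_history_in_rocksdb
    }
}

fn main() {
    let settings = StorageSettings { account_history_in_rocksdb: true, ..Default::default() };
    // Callers can skip spawning a RocksDB writer thread when nothing routes there.
    assert!(settings.any_in_rocksdb());
}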

File 2 of 5

@@ -4,7 +4,7 @@ use crate::{
},
providers::{
database::{chain::ChainStorage, metrics},
- rocksdb::RocksDBProvider,
+ rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
static_file::{StaticFileWriteCtx, StaticFileWriter},
NodeTypesForProvider, StaticFileProvider,
},
@@ -188,8 +188,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// `RocksDB` provider
rocksdb_provider: RocksDBProvider,
/// Pending `RocksDB` batches to be committed at provider commit time.
- #[cfg(all(unix, feature = "rocksdb"))]
- pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
+ #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
+ pending_rocksdb_batches: PendingRocksDBBatches,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
/// Database provider metrics
@@ -205,10 +205,10 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
.field("prune_modes", &self.prune_modes)
.field("storage", &self.storage)
.field("storage_settings", &self.storage_settings)
.field("rocksdb_provider", &self.rocksdb_provider);
#[cfg(all(unix, feature = "rocksdb"))]
s.field("pending_rocksdb_batches", &"<pending batches>");
s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
.field("rocksdb_provider", &self.rocksdb_provider)
.field("pending_rocksdb_batches", &"<pending batches>")
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
.finish()
}
}
@@ -336,8 +336,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
- #[cfg(all(unix, feature = "rocksdb"))]
- pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
+ pending_rocksdb_batches: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -403,6 +402,17 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
})
}
/// Creates the context for `RocksDB` writes.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
RocksDBWriteCtx {
first_block_number: first_block,
prune_tx_lookup: self.prune_modes.transaction_lookup,
storage_settings: self.cached_storage_settings(),
pending_batches: self.pending_rocksdb_batches.clone(),
}
}
/// Writes executed blocks and state to storage.
///
/// This method parallelizes static file (SF) writes with MDBX writes.
@@ -452,6 +462,10 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
// avoid capturing &self.tx in scope below.
let sf_provider = &self.static_file_provider;
let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_provider = self.rocksdb_provider.clone();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
thread::scope(|s| {
// SF writes
@@ -461,6 +475,16 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
Ok::<_, ProviderError>(start.elapsed())
});
// RocksDB writes
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| {
s.spawn(|| {
let start = Instant::now();
rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?;
Ok::<_, ProviderError>(start.elapsed())
})
});
// MDBX writes
let mdbx_start = Instant::now();
@@ -557,6 +581,12 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
.join()
.map_err(|_| StaticFileWriterError::ThreadPanic("static file"))??;
// Wait for RocksDB thread
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(handle) = rocksdb_handle {
timings.rocksdb = handle.join().expect("RocksDB thread panicked")?;
}
timings.total = total_start.elapsed();
self.metrics.record_save_blocks(&timings);
@@ -821,8 +851,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
- #[cfg(all(unix, feature = "rocksdb"))]
- pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
+ pending_rocksdb_batches: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -3167,14 +3196,13 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
// account history stage
- {
+ let storage_settings = self.cached_storage_settings();
+ if !storage_settings.account_history_in_rocksdb {
let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
self.insert_account_history_index(indices)?;
}
// storage history stage
- {
+ if !storage_settings.storages_history_in_rocksdb {
let indices = self.changed_storages_and_blocks_with_range(range)?;
self.insert_storage_history_index(indices)?;
}
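
In short: when a history table is routed to RocksDB, this MDBX indexing path is skipped entirely; the equivalent indices are instead produced by the RocksDB provider's write_account_history and write_storage_history, added later in this diff.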

File 3 of 5

@@ -4,4 +4,5 @@ mod invariants;
mod metrics;
mod provider;
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};

File 4 of 5

@@ -1,11 +1,16 @@
use super::metrics::{RocksDBMetrics, RocksDBOperation};
use crate::providers::{needs_prev_shard_check, HistoryInfo};
- use alloy_primitives::{Address, BlockNumber, B256};
+ use alloy_consensus::transaction::TxHashRef;
+ use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
+ use parking_lot::Mutex;
+ use reth_chain_state::ExecutedBlock;
use reth_db_api::{
- models::{storage_sharded_key::StorageShardedKey, ShardedKey},
+ models::{storage_sharded_key::StorageShardedKey, ShardedKey, StorageSettings},
table::{Compress, Decode, Decompress, Encode, Table},
tables, BlockNumberList, DatabaseError,
};
+ use reth_primitives_traits::BlockBody as _;
+ use reth_prune_types::PruneMode;
use reth_storage_errors::{
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
provider::{ProviderError, ProviderResult},
@@ -16,11 +21,41 @@ use rocksdb::{
OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions,
};
use std::{
collections::BTreeMap,
fmt,
path::{Path, PathBuf},
sync::Arc,
thread,
time::Instant,
};
use tracing::instrument;
/// Pending `RocksDB` batches type alias.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;
/// Context for `RocksDB` block writes.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
/// The first block number being written.
pub first_block_number: BlockNumber,
/// The prune mode for transaction lookup, if any.
pub prune_tx_lookup: Option<PruneMode>,
/// Storage settings determining what goes to `RocksDB`.
pub storage_settings: StorageSettings,
/// Pending batches to push to after writing.
pub pending_batches: PendingRocksDBBatches,
}
impl fmt::Debug for RocksDBWriteCtx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RocksDBWriteCtx")
.field("first_block_number", &self.first_block_number)
.field("prune_tx_lookup", &self.prune_tx_lookup)
.field("storage_settings", &self.storage_settings)
.field("pending_batches", &"<pending batches>")
.finish()
}
}
/// Default cache size for `RocksDB` block cache (128 MB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;
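
The staging flow implied by PendingRocksDBBatches and the earlier field doc ("committed at provider commit time") looks roughly like this minimal std-only sketch. FakeBatch, stage_batch, and commit_pending are illustrative stand-ins (the diff uses parking_lot::Mutex and rocksdb::WriteBatchWithTransaction<true>), and the actual commit path is not shown in this diff.

use std::sync::{Arc, Mutex};

// Illustrative stand-in for a RocksDB write batch.
struct FakeBatch(Vec<(Vec<u8>, Vec<u8>)>);

type PendingBatches = Arc<Mutex<Vec<FakeBatch>>>;

fn stage_batch(pending: &PendingBatches) {
    let batch = FakeBatch(vec![(b"tx_hash".to_vec(), b"tx_num".to_vec())]);
    // Writers stage finished batches instead of committing them; the lock is
    // held only for the push, so contention stays cheap.
    pending.lock().unwrap().push(batch);
}

fn commit_pending(pending: &PendingBatches) {
    // At provider commit time every staged batch is applied together, keeping
    // RocksDB writes aligned with the MDBX transaction boundary.
    for batch in pending.lock().unwrap().drain(..) {
        let _ = batch; // db.write(batch) in a real provider
    }
}

fn main() {
    let pending: PendingBatches = Arc::new(Mutex::new(Vec::new()));
    stage_batch(&pending);
    commit_pending(&pending);
}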
@@ -474,6 +509,125 @@ impl RocksDBProvider {
}))
})
}
/// Writes all `RocksDB` data for multiple blocks in parallel.
///
/// This handles transaction hash numbers, account history, and storage history based on
/// the provided storage settings. Each operation runs in parallel with its own batch,
/// pushing to `ctx.pending_batches` for later commit.
#[instrument(level = "debug", target = "providers::db", skip_all)]
pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
tx_nums: &[TxNumber],
ctx: RocksDBWriteCtx,
) -> ProviderResult<()> {
if !ctx.storage_settings.any_in_rocksdb() {
return Ok(());
}
thread::scope(|s| {
let handles: Vec<_> = [
(ctx.storage_settings.transaction_hash_numbers_in_rocksdb &&
ctx.prune_tx_lookup.is_none_or(|m| !m.is_full()))
.then(|| s.spawn(|| self.write_tx_hash_numbers(blocks, tx_nums, &ctx))),
ctx.storage_settings
.account_history_in_rocksdb
.then(|| s.spawn(|| self.write_account_history(blocks, &ctx))),
ctx.storage_settings
.storages_history_in_rocksdb
.then(|| s.spawn(|| self.write_storage_history(blocks, &ctx))),
]
.into_iter()
.enumerate()
.filter_map(|(i, h)| h.map(|h| (i, h)))
.collect();
for (i, handle) in handles {
handle.join().map_err(|_| {
ProviderError::Database(DatabaseError::Other(format!(
"rocksdb write thread {i} panicked"
)))
})??;
}
Ok(())
})
}
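
Note that .enumerate() runs over the fixed three-element array before .filter_map(), so the index in the panic message stably identifies the writer (0 = transaction hashes, 1 = account history, 2 = storage history) even when only some of the threads are spawned.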
/// Writes transaction hash to number mappings for the given blocks.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
tx_nums: &[TxNumber],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
let body = block.recovered_block().body();
let mut tx_num = first_tx_num;
for transaction in body.transactions_iter() {
batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
tx_num += 1;
}
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
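
A sketch of the numbering scheme in write_tx_hash_numbers: each block carries the TxNumber of its first transaction, and every hash in the block maps to consecutive numbers from that offset. String hashes are stand-ins for real transaction hashes.

fn tx_hash_numbers<'a>(blocks: &[Vec<&'a str>], tx_nums: &[u64]) -> Vec<(&'a str, u64)> {
    let mut out = Vec::new();
    for (txs, &first_tx_num) in blocks.iter().zip(tx_nums) {
        let mut tx_num = first_tx_num;
        for &hash in txs {
            out.push((hash, tx_num));
            tx_num += 1; // consecutive numbers within the block
        }
    }
    out
}

fn main() {
    let map = tx_hash_numbers(&[vec!["0xa1", "0xa2"], vec!["0xb1"]], &[10, 12]);
    assert_eq!(map, vec![("0xa1", 10), ("0xa2", 11), ("0xb1", 12)]);
}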
/// Writes account history indices for the given blocks.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_account_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().bundle;
for &address in bundle.state().keys() {
account_history.entry(address).or_default().push(block_number);
}
}
for (address, blocks) in account_history {
let key = ShardedKey::new(address, u64::MAX);
let value = BlockNumberList::new_pre_sorted(blocks);
batch.put::<tables::AccountsHistory>(key, &value)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
/// Writes storage history indices for the given blocks.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_storage_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().bundle;
for (&address, account) in bundle.state() {
for &slot in account.storage.keys() {
let key = B256::new(slot.to_be_bytes());
storage_history.entry((address, key)).or_default().push(block_number);
}
}
}
for ((address, slot), blocks) in storage_history {
let key = StorageShardedKey::new(address, slot, u64::MAX);
let value = BlockNumberList::new_pre_sorted(blocks);
batch.put::<tables::StoragesHistory>(key, &value)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
}
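
Both history writers rely on the same accumulation invariant, sketched below with string keys standing in for Address and (Address, B256): blocks are visited in ascending order, so every per-key list comes out already sorted and BlockNumberList::new_pre_sorted can skip a sort.

use std::collections::BTreeMap;

fn accumulate(first_block: u64, touched_per_block: &[Vec<&str>]) -> BTreeMap<String, Vec<u64>> {
    let mut history: BTreeMap<String, Vec<u64>> = BTreeMap::new();
    for (idx, touched) in touched_per_block.iter().enumerate() {
        let block_number = first_block + idx as u64;
        for &key in touched {
            // Ascending block order keeps each per-key Vec sorted.
            history.entry(key.to_string()).or_default().push(block_number);
        }
    }
    history
}

fn main() {
    let history = accumulate(100, &[vec!["0xaa", "0xbb"], vec!["0xaa"]]);
    assert_eq!(history["0xaa"], vec![100, 101]); // pre-sorted, ascending
}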
/// Handle for building a batch of operations atomically.

File 5 of 5

@@ -2,28 +2,42 @@
//!
//! This module provides placeholder types that allow the code to compile when `RocksDB` is not
//! available (either on non-Unix platforms or when the `rocksdb` feature is not enabled).
- //! Operations will produce errors if actually attempted.
+ //! All method calls are cfg-guarded in the calling code, so only type definitions are needed here.
- use reth_db_api::table::{Encode, Table};
- use reth_storage_errors::{
- db::LogLevel,
- provider::{ProviderError::UnsupportedProvider, ProviderResult},
- };
- use std::path::Path;
+ use alloy_primitives::BlockNumber;
+ use parking_lot::Mutex;
+ use reth_db_api::models::StorageSettings;
+ use reth_prune_types::PruneMode;
+ use reth_storage_errors::{db::LogLevel, provider::ProviderResult};
+ use std::{path::Path, sync::Arc};
/// Pending `RocksDB` batches type alias (stub - uses unit type).
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<()>>>;
/// Context for `RocksDB` block writes (stub).
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct RocksDBWriteCtx {
/// The first block number being written.
pub first_block_number: BlockNumber,
/// The prune mode for transaction lookup, if any.
pub prune_tx_lookup: Option<PruneMode>,
/// Storage settings determining what goes to `RocksDB`.
pub storage_settings: StorageSettings,
/// Pending batches (stub - unused).
pub pending_batches: PendingRocksDBBatches,
}
/// A stub `RocksDB` provider.
///
/// This type exists to allow code to compile when `RocksDB` is not available (either on non-Unix
- /// platforms or when the `rocksdb` feature is not enabled). When using this stub, the
- /// `transaction_hash_numbers_in_rocksdb` flag should be set to `false` to ensure all operations
- /// route to MDBX instead.
+ /// platforms or when the `rocksdb` feature is not enabled). All method calls on `RocksDBProvider`
+ /// are cfg-guarded in the calling code, so this stub only provides type definitions.
#[derive(Debug, Clone)]
pub struct RocksDBProvider;
impl RocksDBProvider {
/// Creates a new stub `RocksDB` provider.
- ///
- /// On non-Unix platforms, this returns an error indicating `RocksDB` is not supported.
pub fn new(_path: impl AsRef<Path>) -> ProviderResult<Self> {
Ok(Self)
}
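
A compile-level sketch of the cfg-guard arrangement the new docs describe, with illustrative names (Provider, save, and write_blocks_data as a stand-in call): the stub type only has to exist so that fields and signatures compile, while the code that actually calls into RocksDB is compiled out where the real provider is unavailable.

// Stand-in for the stub above; same field type on every platform.
#[derive(Debug, Clone)]
struct RocksDBProvider;

struct Provider {
    rocksdb_provider: RocksDBProvider,
}

impl Provider {
    fn save(&self) {
        #[cfg(all(unix, feature = "rocksdb"))]
        {
            // Only compiled when the real provider is in scope, so the stub
            // never needs a write_blocks_data method at all.
            self.rocksdb_provider.write_blocks_data();
        }
        // MDBX / static-file writes continue unconditionally.
    }
}

fn main() {
    Provider { rocksdb_provider: RocksDBProvider }.save();
}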
@@ -33,130 +47,22 @@ impl RocksDBProvider {
RocksDBBuilder::new(path)
}
/// Get a value from `RocksDB` (stub implementation).
pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Get a value from `RocksDB` using pre-encoded key (stub implementation).
pub const fn get_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Put a value into `RocksDB` (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Put a value into `RocksDB` using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Delete a value from `RocksDB` (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Write a batch of operations (stub implementation).
pub fn write_batch<F>(&self, _f: F) -> ProviderResult<()>
where
F: FnOnce(&mut RocksDBBatch) -> ProviderResult<()>,
{
Err(UnsupportedProvider)
}
/// Creates a new transaction (stub implementation).
pub const fn tx(&self) -> RocksTx {
RocksTx
}
/// Creates a new batch for atomic writes (stub implementation).
pub const fn batch(&self) -> RocksDBBatch {
RocksDBBatch
}
/// Gets the first key-value pair from a table (stub implementation).
pub const fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
Ok(None)
}
/// Gets the last key-value pair from a table (stub implementation).
pub const fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
Ok(None)
}
/// Creates an iterator for the specified table (stub implementation).
///
/// Returns an empty iterator. This is consistent with `first()` and `last()` returning
/// `Ok(None)` - the stub behaves as if the database is empty rather than unavailable.
pub const fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
Ok(RocksDBIter { _marker: std::marker::PhantomData })
}
/// Check consistency of `RocksDB` tables (stub implementation).
///
/// Returns `None` since there is no `RocksDB` data to check when the feature is disabled.
pub const fn check_consistency<Provider>(
&self,
_provider: &Provider,
- ) -> ProviderResult<Option<alloy_primitives::BlockNumber>> {
+ ) -> ProviderResult<Option<BlockNumber>> {
Ok(None)
}
}
- /// A stub batch writer for `RocksDB` on non-Unix platforms.
+ /// A stub batch writer for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBBatch;
impl RocksDBBatch {
/// Puts a value into the batch (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Puts a value into the batch using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Deletes a value from the batch (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Commits the batch (stub implementation).
pub const fn commit(self) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
}
/// A stub iterator for `RocksDB` (non-transactional).
#[derive(Debug)]
pub struct RocksDBIter<'a, T> {
_marker: std::marker::PhantomData<(&'a (), T)>,
}
impl<T: Table> Iterator for RocksDBIter<'_, T> {
type Item = ProviderResult<(T::Key, T::Value)>;
fn next(&mut self) -> Option<Self::Item> {
None
}
}
- /// A stub builder for `RocksDB` on non-Unix platforms.
+ /// A stub builder for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBBuilder;
@@ -167,7 +73,7 @@ impl RocksDBBuilder {
}
/// Adds a column family for a specific table type (stub implementation).
- pub const fn with_table<T: Table>(self) -> Self {
+ pub const fn with_table<T>(self) -> Self {
self
}
@@ -205,71 +111,3 @@ impl RocksDBBuilder {
/// A stub transaction for `RocksDB`.
#[derive(Debug)]
pub struct RocksTx;
- impl RocksTx {
- /// Gets a value from the specified table (stub implementation).
- pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
- Err(UnsupportedProvider)
- }
- /// Gets a value using pre-encoded key (stub implementation).
- pub const fn get_encoded<T: Table>(
- &self,
- _key: &<T::Key as Encode>::Encoded,
- ) -> ProviderResult<Option<T::Value>> {
- Err(UnsupportedProvider)
- }
- /// Puts a value into the specified table (stub implementation).
- pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
- Err(UnsupportedProvider)
- }
- /// Puts a value using pre-encoded key (stub implementation).
- pub const fn put_encoded<T: Table>(
- &self,
- _key: &<T::Key as Encode>::Encoded,
- _value: &T::Value,
- ) -> ProviderResult<()> {
- Err(UnsupportedProvider)
- }
- /// Deletes a value from the specified table (stub implementation).
- pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
- Err(UnsupportedProvider)
- }
- /// Creates an iterator for the specified table (stub implementation).
- pub const fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
- Err(UnsupportedProvider)
- }
- /// Creates an iterator starting from the given key (stub implementation).
- pub fn iter_from<T: Table>(&self, _key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
- Err(UnsupportedProvider)
- }
- /// Commits the transaction (stub implementation).
- pub const fn commit(self) -> ProviderResult<()> {
- Err(UnsupportedProvider)
- }
- /// Rolls back the transaction (stub implementation).
- pub const fn rollback(self) -> ProviderResult<()> {
- Err(UnsupportedProvider)
- }
- }
- /// A stub iterator for `RocksDB` transactions.
- #[derive(Debug)]
- pub struct RocksTxIter<'a, T> {
- _marker: std::marker::PhantomData<(&'a (), T)>,
- }
- impl<T: Table> Iterator for RocksTxIter<'_, T> {
- type Item = ProviderResult<(T::Key, T::Value)>;
- fn next(&mut self) -> Option<Self::Item> {
- None
- }
- }