Compare commits

..

6 Commits

Author SHA1 Message Date
joshieDo
80e2e18611 Merge remote-tracking branch 'origin/main' into feat/rocksdb-unwind-commit 2026-01-22 12:57:45 +00:00
joshieDo
d699891b0b feat(provider): add rocksdb deferred batch commits with unwind support
Adds support for deferred RocksDB batch operations that commit at provider
commit time with proper unwind commit ordering.

Changes:
- Add delete_range<T>() method to RocksDBBatch for clearing entire tables
- Add rocksdb_unwind_queued flag to DatabaseProvider to signal unwind commit order
- Add set_rocksdb_unwind_queued() setter method
- Update commit() to check rocksdb_unwind_queued in addition to static file unwind

When rocksdb_unwind_queued is set, the commit order becomes:
MDBX first -> RocksDB batches -> static files

This allows RocksDB batch operations (like table clears) to be deferred and
committed atomically with the rest of the provider state.
2026-01-22 12:47:02 +00:00
andrewshab
4b1c341ced fix: remove redundant clone (#21300) 2026-01-22 12:43:19 +00:00
Georgios Konstantopoulos
865f8f8951 perf(prune): sort tx hashes for efficient TransactionLookup pruning (#21297) 2026-01-22 12:10:07 +00:00
joshieDo
492fc20fd1 fix(cli): clear rocksdb tables in drop-stage command (#21299)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-22 12:09:36 +00:00
Sergei Shulepov
ad9886abb8 fix(mdbx): mark reserve as unsafe (#21263) 2026-01-22 12:03:12 +00:00
10 changed files with 156 additions and 57 deletions

View File

@@ -162,7 +162,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
let access_rights =
if command.dry_run { AccessRights::RO } else { AccessRights::RW };
db_exec!(self.env, tool, N, access_rights, {
command.execute(&tool, ctx.task_executor.clone(), &data_dir)?;
command.execute(&tool, ctx.task_executor, &data_dir)?;
});
}
Subcommands::StaticFileHeader(command) => {

View File

@@ -15,7 +15,8 @@ use reth_db_common::{
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
use reth_node_core::args::StageEnum;
use reth_provider::{
DBProvider, DatabaseProviderFactory, StaticFileProviderFactory, StaticFileWriter,
DBProvider, DatabaseProviderFactory, RocksDBProviderFactory, StaticFileProviderFactory,
StaticFileWriter, StorageSettingsCache,
};
use reth_prune::PruneSegment;
use reth_stages::StageId;
@@ -168,8 +169,20 @@ impl<C: ChainSpecParser> Command<C> {
)?;
}
StageEnum::AccountHistory | StageEnum::StorageHistory => {
tx.clear::<tables::AccountsHistory>()?;
tx.clear::<tables::StoragesHistory>()?;
let settings = provider_rw.cached_storage_settings();
let rocksdb = tool.provider_factory.rocksdb_provider();
if settings.account_history_in_rocksdb {
rocksdb.clear::<tables::AccountsHistory>()?;
} else {
tx.clear::<tables::AccountsHistory>()?;
}
if settings.storages_history_in_rocksdb {
rocksdb.clear::<tables::StoragesHistory>()?;
} else {
tx.clear::<tables::StoragesHistory>()?;
}
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
@@ -177,7 +190,14 @@ impl<C: ChainSpecParser> Command<C> {
insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
}
StageEnum::TxLookup => {
tx.clear::<tables::TransactionHashNumbers>()?;
if provider_rw.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
tool.provider_factory
.rocksdb_provider()
.clear::<tables::TransactionHashNumbers>()?;
} else {
tx.clear::<tables::TransactionHashNumbers>()?;
}
reset_prune_checkpoint(tx, PruneSegment::TransactionLookup)?;
reset_stage_checkpoint(tx, StageId::TransactionLookup)?;

View File

@@ -96,12 +96,17 @@ where
let tx_range_end = *tx_range.end();
// Retrieve transactions in the range and calculate their hashes in parallel
let hashes = provider
let mut hashes = provider
.transactions_by_tx_range(tx_range.clone())?
.into_par_iter()
.map(|transaction| transaction.trie_hash())
.collect::<Vec<_>>();
// Sort hashes to enable efficient cursor traversal through the TransactionHashNumbers
// table, which is keyed by hash. Without sorting, each seek would be O(log n) random
// access; with sorting, the cursor advances sequentially through the B+tree.
hashes.sort_unstable();
// Number of transactions retrieved from the database should match the tx range count
let tx_count = tx_range.count();
if hashes.len() != tx_count {

View File

@@ -412,8 +412,16 @@ impl Transaction<RW> {
/// Returns a buffer which can be used to write a value into the item at the
/// given key and with the given length. The buffer must be completely
/// filled by the caller.
///
/// This should not be used on dupsort tables.
///
/// # Safety
///
/// The caller must ensure that the returned buffer is not used after the transaction is
/// committed or aborted, or if another value is inserted. To be clear: the second call to
/// this function is not permitted while the returned slice is reachable.
#[allow(clippy::mut_from_ref)]
pub fn reserve(
pub unsafe fn reserve(
&self,
dbi: ffi::MDBX_dbi,
key: impl AsRef<[u8]>,

View File

@@ -105,8 +105,11 @@ fn test_reserve() {
let txn = env.begin_rw_txn().unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
{
let mut writer = txn.reserve(dbi, b"key1", 4, WriteFlags::empty()).unwrap();
writer.write_all(b"val1").unwrap();
unsafe {
// SAFETY: the returned slice is used before the transaction is committed or aborted.
let mut writer = txn.reserve(dbi, b"key1", 4, WriteFlags::empty()).unwrap();
writer.write_all(b"val1").unwrap();
}
}
txn.commit().unwrap();

View File

@@ -1351,8 +1351,7 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
// Use `with_assume_history_complete()` since both backends have identical data
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
let rocks_tx = rocks_provider.tx();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader
@@ -1444,8 +1443,7 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
// Use `with_assume_history_complete()` since both backends have identical data
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
let rocks_tx = rocks_provider.tx();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader

View File

@@ -44,7 +44,9 @@ use std::{
use tracing::trace;
mod provider;
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode};
pub use provider::{
CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
};
use super::ProviderNodeTypes;
use reth_trie::KeccakKeyHasher;
@@ -230,6 +232,21 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
)))
}
/// Returns a provider configured for unwind operations (MDBX-first commit order).
#[track_caller]
pub fn unwind_provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
Ok(DatabaseProviderRW(DatabaseProvider::new_unwind_rw(
self.db.tx_mut()?,
self.chain_spec.clone(),
self.static_file_provider.clone(),
self.prune_modes.clone(),
self.storage.clone(),
self.storage_settings.clone(),
self.rocksdb_provider.clone(),
self.changeset_cache.clone(),
)))
}
/// State provider for latest block
#[track_caller]
pub fn latest(&self) -> ProviderResult<StateProviderBox> {

View File

@@ -84,6 +84,17 @@ use std::{
};
use tracing::{debug, instrument, trace};
/// Determines the commit order for database operations.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
/// Normal commit order: static files first, then `RocksDB`, then MDBX.
#[default]
Normal,
/// Unwind commit order: MDBX first, then `RocksDB`, then static files.
/// Used for unwind operations to allow recovery by truncating static files on restart.
Unwind,
}
/// A [`DatabaseProvider`] that holds a read-only database transaction.
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
@@ -186,6 +197,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Commit order for this provider.
commit_order: CommitOrder,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
/// Database provider metrics
@@ -315,6 +328,35 @@ impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpe
}
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
/// Internal constructor with commit order.
#[allow(clippy::too_many_arguments)]
fn new_rw_inner(
tx: TX,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider<N::Primitives>,
prune_modes: PruneModes,
storage: Arc<N::Storage>,
storage_settings: Arc<RwLock<StorageSettings>>,
rocksdb_provider: RocksDBProvider,
changeset_cache: ChangesetCache,
commit_order: CommitOrder,
) -> Self {
Self {
tx,
chain_spec,
static_file_provider,
prune_modes,
storage,
storage_settings,
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
commit_order,
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
}
/// Creates a provider with an inner read-write transaction.
#[allow(clippy::too_many_arguments)]
pub fn new_rw(
@@ -327,7 +369,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
rocksdb_provider: RocksDBProvider,
changeset_cache: ChangesetCache,
) -> Self {
Self {
Self::new_rw_inner(
tx,
chain_spec,
static_file_provider,
@@ -336,10 +378,35 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage_settings,
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
CommitOrder::Normal,
)
}
/// Creates a provider with an inner read-write transaction for unwind operations.
///
/// Uses MDBX-first commit order for `RocksDB` batches.
#[allow(clippy::too_many_arguments)]
pub fn new_unwind_rw(
tx: TX,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider<N::Primitives>,
prune_modes: PruneModes,
storage: Arc<N::Storage>,
storage_settings: Arc<RwLock<StorageSettings>>,
rocksdb_provider: RocksDBProvider,
changeset_cache: ChangesetCache,
) -> Self {
Self::new_rw_inner(
tx,
chain_spec,
static_file_provider,
prune_modes,
storage,
storage_settings,
rocksdb_provider,
changeset_cache,
CommitOrder::Unwind,
)
}
}
@@ -876,6 +943,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
commit_order: CommitOrder::Normal,
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -3483,7 +3551,8 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
// it is interrupted before the static files commit, we can just
// truncate the static files according to the
// checkpoints on the next start-up.
if self.static_file_provider.has_unwind_queued() {
if self.static_file_provider.has_unwind_queued() || self.commit_order == CommitOrder::Unwind
{
self.tx.commit()?;
#[cfg(all(unix, feature = "rocksdb"))]

View File

@@ -594,7 +594,7 @@ impl RocksDBProvider {
let write_options = WriteOptions::default();
let txn_options = OptimisticTransactionOptions::default();
let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
RocksTx { inner, provider: self, assume_history_complete: false }
RocksTx { inner, provider: self }
}
/// Creates a new batch for atomic writes.
@@ -712,6 +712,9 @@ impl RocksDBProvider {
/// Uses `delete_range_cf` from empty key to a max key (256 bytes of 0xFF).
/// This end key must exceed the maximum encoded key size for any table.
/// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
///
/// # Panics
/// Panics if the provider is in read-only mode.
pub fn clear<T: Table>(&self) -> ProviderResult<()> {
let cf = self.get_cf_handle::<T>()?;
@@ -1525,10 +1528,6 @@ impl<'a> RocksDBBatch<'a> {
pub struct RocksTx<'db> {
inner: Transaction<'db, OptimisticTransactionDB>,
provider: &'db RocksDBProvider,
/// When true, assume `RocksDB` has complete history (like `MDBX`) and return `NotYetWritten`
/// when querying before the first history entry. When false (default), return
/// `MaybeInPlainState` for hybrid storage safety.
assume_history_complete: bool,
}
impl fmt::Debug for RocksTx<'_> {
@@ -1538,16 +1537,6 @@ impl fmt::Debug for RocksTx<'_> {
}
impl<'db> RocksTx<'db> {
/// Sets the `assume_history_complete` flag to true.
///
/// When enabled, history queries will return `NotYetWritten` (like `MDBX`) instead of
/// `MaybeInPlainState` when querying before the first history entry. Use this in tests
/// where `RocksDB` and `MDBX` have identical data.
pub const fn with_assume_history_complete(mut self) -> Self {
self.assume_history_complete = true;
self
}
/// Gets a value from the specified table. Sees uncommitted writes in this transaction.
pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
let encoded_key = key.encode();
@@ -1714,19 +1703,10 @@ impl<'db> RocksTx<'db> {
where
T: Table<Value = BlockNumberList>,
{
// Determines whether to soften NotYetWritten -> MaybeInPlainState.
//
// We soften when:
// 1. `assume_history_complete` is false (hybrid storage - RocksDB may not have full
// history)
// 2. OR history may be pruned (`lowest_available_block_number.is_some()`)
//
// We only return NotYetWritten when we're certain history is complete AND not pruned.
let should_soften_not_yet_written =
!self.assume_history_complete || lowest_available_block_number.is_some();
// History may be pruned if a lowest available block is set.
let is_maybe_pruned = lowest_available_block_number.is_some();
let fallback = || {
Ok(if should_soften_not_yet_written {
Ok(if is_maybe_pruned {
HistoryInfo::MaybeInPlainState
} else {
HistoryInfo::NotYetWritten
@@ -1780,19 +1760,11 @@ impl<'db> RocksTx<'db> {
false
};
// Apply the same softening logic to `from_lookup` result.
let result = HistoryInfo::from_lookup(
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
);
Ok(match result {
HistoryInfo::NotYetWritten if should_soften_not_yet_written => {
HistoryInfo::MaybeInPlainState
}
other => other,
})
))
}
/// Returns an error if the raw iterator is in an invalid state due to an I/O error.

View File

@@ -81,6 +81,13 @@ impl RocksDBProvider {
pub const fn table_stats(&self) -> Vec<RocksDBTableStats> {
Vec::new()
}
/// Clears all entries from the specified table (stub implementation).
///
/// This is a no-op since there is no `RocksDB` when the feature is disabled.
pub const fn clear<T>(&self) -> ProviderResult<()> {
Ok(())
}
}
impl DatabaseMetrics for RocksDBProvider {