Compare commits

...

2 Commits

Author SHA1 Message Date
joshieDo
80e2e18611 Merge remote-tracking branch 'origin/main' into feat/rocksdb-unwind-commit 2026-01-22 12:57:45 +00:00
joshieDo
d699891b0b feat(provider): add rocksdb deferred batch commits with unwind support
Adds support for deferred RocksDB batch operations that commit at provider
commit time with proper unwind commit ordering.

Changes:
- Add delete_range<T>() method to RocksDBBatch for clearing entire tables
- Add rocksdb_unwind_queued flag to DatabaseProvider to signal unwind commit order
- Add set_rocksdb_unwind_queued() setter method
- Update commit() to check rocksdb_unwind_queued in addition to static file unwind

When rocksdb_unwind_queued is set, the commit order becomes:
MDBX first -> RocksDB batches -> static files

This allows RocksDB batch operations (like table clears) to be deferred and
committed atomically with the rest of the provider state.
2026-01-22 12:47:02 +00:00
3 changed files with 96 additions and 7 deletions

View File

@@ -44,7 +44,9 @@ use std::{
use tracing::trace;
mod provider;
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode};
pub use provider::{
CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
};
use super::ProviderNodeTypes;
use reth_trie::KeccakKeyHasher;
@@ -230,6 +232,21 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
)))
}
/// Returns a provider configured for unwind operations (MDBX-first commit order).
#[track_caller]
pub fn unwind_provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
Ok(DatabaseProviderRW(DatabaseProvider::new_unwind_rw(
self.db.tx_mut()?,
self.chain_spec.clone(),
self.static_file_provider.clone(),
self.prune_modes.clone(),
self.storage.clone(),
self.storage_settings.clone(),
self.rocksdb_provider.clone(),
self.changeset_cache.clone(),
)))
}
/// State provider for latest block
#[track_caller]
pub fn latest(&self) -> ProviderResult<StateProviderBox> {

View File

@@ -84,6 +84,17 @@ use std::{
};
use tracing::{debug, instrument, trace};
/// Determines the commit order for database operations.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
/// Normal commit order: static files first, then `RocksDB`, then MDBX.
#[default]
Normal,
/// Unwind commit order: MDBX first, then `RocksDB`, then static files.
/// Used for unwind operations to allow recovery by truncating static files on restart.
Unwind,
}
/// A [`DatabaseProvider`] that holds a read-only database transaction.
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
@@ -186,6 +197,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Commit order for this provider.
commit_order: CommitOrder,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
/// Database provider metrics
@@ -315,6 +328,35 @@ impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpe
}
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
/// Internal constructor with commit order.
#[allow(clippy::too_many_arguments)]
fn new_rw_inner(
tx: TX,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider<N::Primitives>,
prune_modes: PruneModes,
storage: Arc<N::Storage>,
storage_settings: Arc<RwLock<StorageSettings>>,
rocksdb_provider: RocksDBProvider,
changeset_cache: ChangesetCache,
commit_order: CommitOrder,
) -> Self {
Self {
tx,
chain_spec,
static_file_provider,
prune_modes,
storage,
storage_settings,
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
commit_order,
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
}
/// Creates a provider with an inner read-write transaction.
#[allow(clippy::too_many_arguments)]
pub fn new_rw(
@@ -327,7 +369,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
rocksdb_provider: RocksDBProvider,
changeset_cache: ChangesetCache,
) -> Self {
Self {
Self::new_rw_inner(
tx,
chain_spec,
static_file_provider,
@@ -336,10 +378,35 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage_settings,
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
CommitOrder::Normal,
)
}
/// Creates a provider with an inner read-write transaction for unwind operations.
///
/// Uses MDBX-first commit order for `RocksDB` batches.
#[allow(clippy::too_many_arguments)]
pub fn new_unwind_rw(
tx: TX,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider<N::Primitives>,
prune_modes: PruneModes,
storage: Arc<N::Storage>,
storage_settings: Arc<RwLock<StorageSettings>>,
rocksdb_provider: RocksDBProvider,
changeset_cache: ChangesetCache,
) -> Self {
Self::new_rw_inner(
tx,
chain_spec,
static_file_provider,
prune_modes,
storage,
storage_settings,
rocksdb_provider,
changeset_cache,
CommitOrder::Unwind,
)
}
}
@@ -876,6 +943,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
commit_order: CommitOrder::Normal,
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -3483,7 +3551,8 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
// it is interrupted before the static files commit, we can just
// truncate the static files according to the
// checkpoints on the next start-up.
if self.static_file_provider.has_unwind_queued() {
if self.static_file_provider.has_unwind_queued() || self.commit_order == CommitOrder::Unwind
{
self.tx.commit()?;
#[cfg(all(unix, feature = "rocksdb"))]

View File

@@ -712,6 +712,9 @@ impl RocksDBProvider {
/// Uses `delete_range_cf` from empty key to a max key (256 bytes of 0xFF).
/// This end key must exceed the maximum encoded key size for any table.
/// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
///
/// # Panics
/// Panics if the provider is in read-only mode.
pub fn clear<T: Table>(&self) -> ProviderResult<()> {
let cf = self.get_cf_handle::<T>()?;