fix(provider): add CommitOrder for RocksDB/MDBX unwind atomicity (#21311)

This commit is contained in:
joshieDo
2026-01-22 14:54:47 +00:00
committed by GitHub
parent ad5b533ad1
commit 12cf3d685b
4 changed files with 103 additions and 8 deletions

View File

@@ -15,8 +15,8 @@ use reth_db_common::{
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
use reth_node_core::args::StageEnum;
use reth_provider::{
DBProvider, DatabaseProviderFactory, RocksDBProviderFactory, StaticFileProviderFactory,
StaticFileWriter, StorageSettingsCache,
DBProvider, RocksDBProviderFactory, StaticFileProviderFactory, StaticFileWriter,
StorageSettingsCache,
};
use reth_prune::PruneSegment;
use reth_stages::StageId;
@@ -95,7 +95,7 @@ impl<C: ChainSpecParser> Command<C> {
}
}
let provider_rw = tool.provider_factory.database_provider_rw()?;
let provider_rw = tool.provider_factory.unwind_provider_rw()?;
let tx = provider_rw.tx_ref();
match self.stage {

View File

@@ -316,7 +316,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
let _locked_sf_producer = self.static_file_producer.lock();
let mut provider_rw =
self.provider_factory.database_provider_rw()?.disable_long_read_transaction_safety();
self.provider_factory.unwind_provider_rw()?.disable_long_read_transaction_safety();
for stage in unwind_pipeline {
let stage_id = stage.id();
@@ -396,7 +396,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
stage.post_unwind_commit()?;
provider_rw = self.provider_factory.database_provider_rw()?;
provider_rw = self.provider_factory.unwind_provider_rw()?;
}
Err(err) => {
self.event_sender.notify(PipelineEvent::Error { stage_id });

View File

@@ -44,7 +44,9 @@ use std::{
use tracing::trace;
mod provider;
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode};
pub use provider::{
CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
};
use super::ProviderNodeTypes;
use reth_trie::KeccakKeyHasher;
@@ -230,6 +232,25 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
)))
}
/// Opens a fresh `DbTxMut` and wraps it in a provider set up for unwind operations.
///
/// The provider is built through [`DatabaseProvider::new_unwind_rw`], so it uses the unwind
/// commit order: MDBX first, then `RocksDB`, then static files. This allows recovery by
/// truncating static files on restart if the unwind is interrupted.
///
/// # Errors
///
/// Propagates the error if the mutable database transaction cannot be opened.
#[track_caller]
pub fn unwind_provider_rw(
    &self,
) -> ProviderResult<DatabaseProvider<<N::DB as Database>::TXMut, N>> {
    // Open the transaction up front so a failure returns before any handle is cloned.
    let tx = self.db.tx_mut()?;
    let provider = DatabaseProvider::new_unwind_rw(
        tx,
        self.chain_spec.clone(),
        self.static_file_provider.clone(),
        self.prune_modes.clone(),
        self.storage.clone(),
        self.storage_settings.clone(),
        self.rocksdb_provider.clone(),
        self.changeset_cache.clone(),
    );
    Ok(provider)
}
/// State provider for latest block
#[track_caller]
pub fn latest(&self) -> ProviderResult<StateProviderBox> {

View File

@@ -84,6 +84,24 @@ use std::{
};
use tracing::{debug, instrument, trace};
/// Order in which a provider commits its storage backends.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
    /// Default ordering: static files are committed first, followed by `RocksDB`, then MDBX.
    #[default]
    Normal,
    /// Unwind ordering: MDBX is committed first, followed by `RocksDB`, then static files.
    ///
    /// Used for unwind operations to allow recovery by truncating static files on restart.
    Unwind,
}

impl CommitOrder {
    /// Reports whether this value selects the unwind commit order.
    pub const fn is_unwind(&self) -> bool {
        // Exhaustive match: adding a variant forces this method to be revisited.
        match self {
            Self::Unwind => true,
            Self::Normal => false,
        }
    }
}
/// A [`DatabaseProvider`] that holds a read-only database transaction.
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
@@ -186,6 +204,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Commit order for database operations.
commit_order: CommitOrder,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
/// Database provider metrics
@@ -204,6 +224,7 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
.field("rocksdb_provider", &self.rocksdb_provider)
.field("changeset_cache", &self.changeset_cache)
.field("pending_rocksdb_batches", &"<pending batches>")
.field("commit_order", &self.commit_order)
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
.finish()
}
@@ -317,7 +338,7 @@ impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpe
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
/// Creates a provider with an inner read-write transaction.
#[allow(clippy::too_many_arguments)]
pub fn new_rw(
fn new_rw_inner(
tx: TX,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider<N::Primitives>,
@@ -326,6 +347,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage_settings: Arc<RwLock<StorageSettings>>,
rocksdb_provider: RocksDBProvider,
changeset_cache: ChangesetCache,
commit_order: CommitOrder,
) -> Self {
Self {
tx,
@@ -337,10 +359,61 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
commit_order,
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
}
/// Builds a read-write provider around `tx` that commits in [`CommitOrder::Normal`] order.
///
/// All arguments are forwarded unchanged to the shared inner constructor; only the commit
/// order is fixed here.
#[allow(clippy::too_many_arguments)]
pub fn new_rw(
    tx: TX,
    chain_spec: Arc<N::ChainSpec>,
    static_file_provider: StaticFileProvider<N::Primitives>,
    prune_modes: PruneModes,
    storage: Arc<N::Storage>,
    storage_settings: Arc<RwLock<StorageSettings>>,
    rocksdb_provider: RocksDBProvider,
    changeset_cache: ChangesetCache,
) -> Self {
    let commit_order = CommitOrder::Normal;
    Self::new_rw_inner(
        tx,
        chain_spec,
        static_file_provider,
        prune_modes,
        storage,
        storage_settings,
        rocksdb_provider,
        changeset_cache,
        commit_order,
    )
}
/// Builds a read-write provider around `tx` that commits in [`CommitOrder::Unwind`] order.
///
/// All arguments are forwarded unchanged to the shared inner constructor; only the commit
/// order is fixed here.
#[allow(clippy::too_many_arguments)]
pub fn new_unwind_rw(
    tx: TX,
    chain_spec: Arc<N::ChainSpec>,
    static_file_provider: StaticFileProvider<N::Primitives>,
    prune_modes: PruneModes,
    storage: Arc<N::Storage>,
    storage_settings: Arc<RwLock<StorageSettings>>,
    rocksdb_provider: RocksDBProvider,
    changeset_cache: ChangesetCache,
) -> Self {
    let commit_order = CommitOrder::Unwind;
    Self::new_rw_inner(
        tx,
        chain_spec,
        static_file_provider,
        prune_modes,
        storage,
        storage_settings,
        rocksdb_provider,
        changeset_cache,
        commit_order,
    )
}
}
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
@@ -876,6 +949,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
commit_order: CommitOrder::Normal,
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -3483,7 +3557,7 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
// it is interrupted before the static files commit, we can just
// truncate the static files according to the
// checkpoints on the next start-up.
if self.static_file_provider.has_unwind_queued() {
if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
self.tx.commit()?;
#[cfg(all(unix, feature = "rocksdb"))]