mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
2 Commits
main
...
feat/rocks
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
80e2e18611 | ||
|
|
d699891b0b |
@@ -44,7 +44,9 @@ use std::{
|
||||
use tracing::trace;
|
||||
|
||||
mod provider;
|
||||
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode};
|
||||
pub use provider::{
|
||||
CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
|
||||
};
|
||||
|
||||
use super::ProviderNodeTypes;
|
||||
use reth_trie::KeccakKeyHasher;
|
||||
@@ -230,6 +232,21 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
|
||||
)))
|
||||
}
|
||||
|
||||
/// Returns a provider configured for unwind operations (MDBX-first commit order).
|
||||
#[track_caller]
|
||||
pub fn unwind_provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
|
||||
Ok(DatabaseProviderRW(DatabaseProvider::new_unwind_rw(
|
||||
self.db.tx_mut()?,
|
||||
self.chain_spec.clone(),
|
||||
self.static_file_provider.clone(),
|
||||
self.prune_modes.clone(),
|
||||
self.storage.clone(),
|
||||
self.storage_settings.clone(),
|
||||
self.rocksdb_provider.clone(),
|
||||
self.changeset_cache.clone(),
|
||||
)))
|
||||
}
|
||||
|
||||
/// State provider for latest block
|
||||
#[track_caller]
|
||||
pub fn latest(&self) -> ProviderResult<StateProviderBox> {
|
||||
|
||||
@@ -84,6 +84,17 @@ use std::{
|
||||
};
|
||||
use tracing::{debug, instrument, trace};
|
||||
|
||||
/// Determines the commit order for database operations.
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
||||
pub enum CommitOrder {
|
||||
/// Normal commit order: static files first, then `RocksDB`, then MDBX.
|
||||
#[default]
|
||||
Normal,
|
||||
/// Unwind commit order: MDBX first, then `RocksDB`, then static files.
|
||||
/// Used for unwind operations to allow recovery by truncating static files on restart.
|
||||
Unwind,
|
||||
}
|
||||
|
||||
/// A [`DatabaseProvider`] that holds a read-only database transaction.
|
||||
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
|
||||
|
||||
@@ -186,6 +197,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
|
||||
/// Pending `RocksDB` batches to be committed at provider commit time.
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
|
||||
pending_rocksdb_batches: PendingRocksDBBatches,
|
||||
/// Commit order for this provider.
|
||||
commit_order: CommitOrder,
|
||||
/// Minimum distance from tip required for pruning
|
||||
minimum_pruning_distance: u64,
|
||||
/// Database provider metrics
|
||||
@@ -315,6 +328,35 @@ impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpe
|
||||
}
|
||||
|
||||
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
/// Internal constructor with commit order.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn new_rw_inner(
|
||||
tx: TX,
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
prune_modes: PruneModes,
|
||||
storage: Arc<N::Storage>,
|
||||
storage_settings: Arc<RwLock<StorageSettings>>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
changeset_cache: ChangesetCache,
|
||||
commit_order: CommitOrder,
|
||||
) -> Self {
|
||||
Self {
|
||||
tx,
|
||||
chain_spec,
|
||||
static_file_provider,
|
||||
prune_modes,
|
||||
storage,
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
commit_order,
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a provider with an inner read-write transaction.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_rw(
|
||||
@@ -327,7 +369,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
changeset_cache: ChangesetCache,
|
||||
) -> Self {
|
||||
Self {
|
||||
Self::new_rw_inner(
|
||||
tx,
|
||||
chain_spec,
|
||||
static_file_provider,
|
||||
@@ -336,10 +378,35 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
CommitOrder::Normal,
|
||||
)
|
||||
}
|
||||
|
||||
/// Creates a provider with an inner read-write transaction for unwind operations.
|
||||
///
|
||||
/// Uses MDBX-first commit order for `RocksDB` batches.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_unwind_rw(
|
||||
tx: TX,
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
prune_modes: PruneModes,
|
||||
storage: Arc<N::Storage>,
|
||||
storage_settings: Arc<RwLock<StorageSettings>>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
changeset_cache: ChangesetCache,
|
||||
) -> Self {
|
||||
Self::new_rw_inner(
|
||||
tx,
|
||||
chain_spec,
|
||||
static_file_provider,
|
||||
prune_modes,
|
||||
storage,
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
CommitOrder::Unwind,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -876,6 +943,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
commit_order: CommitOrder::Normal,
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
@@ -3483,7 +3551,8 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
// it is interrupted before the static files commit, we can just
|
||||
// truncate the static files according to the
|
||||
// checkpoints on the next start-up.
|
||||
if self.static_file_provider.has_unwind_queued() {
|
||||
if self.static_file_provider.has_unwind_queued() || self.commit_order == CommitOrder::Unwind
|
||||
{
|
||||
self.tx.commit()?;
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
|
||||
@@ -712,6 +712,9 @@ impl RocksDBProvider {
|
||||
/// Uses `delete_range_cf` from empty key to a max key (256 bytes of 0xFF).
|
||||
/// This end key must exceed the maximum encoded key size for any table.
|
||||
/// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the provider is in read-only mode.
|
||||
pub fn clear<T: Table>(&self) -> ProviderResult<()> {
|
||||
let cf = self.get_cf_handle::<T>()?;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user