mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
6 Commits
wt-pr2b
...
feat/rocks
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
80e2e18611 | ||
|
|
d699891b0b | ||
|
|
4b1c341ced | ||
|
|
865f8f8951 | ||
|
|
492fc20fd1 | ||
|
|
ad9886abb8 |
@@ -162,7 +162,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
|
||||
let access_rights =
|
||||
if command.dry_run { AccessRights::RO } else { AccessRights::RW };
|
||||
db_exec!(self.env, tool, N, access_rights, {
|
||||
command.execute(&tool, ctx.task_executor.clone(), &data_dir)?;
|
||||
command.execute(&tool, ctx.task_executor, &data_dir)?;
|
||||
});
|
||||
}
|
||||
Subcommands::StaticFileHeader(command) => {
|
||||
|
||||
@@ -15,7 +15,8 @@ use reth_db_common::{
|
||||
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
|
||||
use reth_node_core::args::StageEnum;
|
||||
use reth_provider::{
|
||||
DBProvider, DatabaseProviderFactory, StaticFileProviderFactory, StaticFileWriter,
|
||||
DBProvider, DatabaseProviderFactory, RocksDBProviderFactory, StaticFileProviderFactory,
|
||||
StaticFileWriter, StorageSettingsCache,
|
||||
};
|
||||
use reth_prune::PruneSegment;
|
||||
use reth_stages::StageId;
|
||||
@@ -168,8 +169,20 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
)?;
|
||||
}
|
||||
StageEnum::AccountHistory | StageEnum::StorageHistory => {
|
||||
tx.clear::<tables::AccountsHistory>()?;
|
||||
tx.clear::<tables::StoragesHistory>()?;
|
||||
let settings = provider_rw.cached_storage_settings();
|
||||
let rocksdb = tool.provider_factory.rocksdb_provider();
|
||||
|
||||
if settings.account_history_in_rocksdb {
|
||||
rocksdb.clear::<tables::AccountsHistory>()?;
|
||||
} else {
|
||||
tx.clear::<tables::AccountsHistory>()?;
|
||||
}
|
||||
|
||||
if settings.storages_history_in_rocksdb {
|
||||
rocksdb.clear::<tables::StoragesHistory>()?;
|
||||
} else {
|
||||
tx.clear::<tables::StoragesHistory>()?;
|
||||
}
|
||||
|
||||
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
|
||||
reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
|
||||
@@ -177,7 +190,14 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
|
||||
}
|
||||
StageEnum::TxLookup => {
|
||||
tx.clear::<tables::TransactionHashNumbers>()?;
|
||||
if provider_rw.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
|
||||
tool.provider_factory
|
||||
.rocksdb_provider()
|
||||
.clear::<tables::TransactionHashNumbers>()?;
|
||||
} else {
|
||||
tx.clear::<tables::TransactionHashNumbers>()?;
|
||||
}
|
||||
|
||||
reset_prune_checkpoint(tx, PruneSegment::TransactionLookup)?;
|
||||
|
||||
reset_stage_checkpoint(tx, StageId::TransactionLookup)?;
|
||||
|
||||
@@ -96,12 +96,17 @@ where
|
||||
let tx_range_end = *tx_range.end();
|
||||
|
||||
// Retrieve transactions in the range and calculate their hashes in parallel
|
||||
let hashes = provider
|
||||
let mut hashes = provider
|
||||
.transactions_by_tx_range(tx_range.clone())?
|
||||
.into_par_iter()
|
||||
.map(|transaction| transaction.trie_hash())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Sort hashes to enable efficient cursor traversal through the TransactionHashNumbers
|
||||
// table, which is keyed by hash. Without sorting, each seek would be O(log n) random
|
||||
// access; with sorting, the cursor advances sequentially through the B+tree.
|
||||
hashes.sort_unstable();
|
||||
|
||||
// Number of transactions retrieved from the database should match the tx range count
|
||||
let tx_count = tx_range.count();
|
||||
if hashes.len() != tx_count {
|
||||
|
||||
@@ -412,8 +412,16 @@ impl Transaction<RW> {
|
||||
/// Returns a buffer which can be used to write a value into the item at the
|
||||
/// given key and with the given length. The buffer must be completely
|
||||
/// filled by the caller.
|
||||
///
|
||||
/// This should not be used on dupsort tables.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that the returned buffer is not used after the transaction is
|
||||
/// committed or aborted, or if another value is inserted. To be clear: the second call to
|
||||
/// this function is not permitted while the returned slice is reachable.
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
pub fn reserve(
|
||||
pub unsafe fn reserve(
|
||||
&self,
|
||||
dbi: ffi::MDBX_dbi,
|
||||
key: impl AsRef<[u8]>,
|
||||
|
||||
@@ -105,8 +105,11 @@ fn test_reserve() {
|
||||
let txn = env.begin_rw_txn().unwrap();
|
||||
let dbi = txn.open_db(None).unwrap().dbi();
|
||||
{
|
||||
let mut writer = txn.reserve(dbi, b"key1", 4, WriteFlags::empty()).unwrap();
|
||||
writer.write_all(b"val1").unwrap();
|
||||
unsafe {
|
||||
// SAFETY: the returned slice is used before the transaction is committed or aborted.
|
||||
let mut writer = txn.reserve(dbi, b"key1", 4, WriteFlags::empty()).unwrap();
|
||||
writer.write_all(b"val1").unwrap();
|
||||
}
|
||||
}
|
||||
txn.commit().unwrap();
|
||||
|
||||
|
||||
@@ -1351,8 +1351,7 @@ mod rocksdb_tests {
|
||||
|
||||
// Run queries against both backends using EitherReader
|
||||
let mdbx_ro = factory.database_provider_ro().unwrap();
|
||||
// Use `with_assume_history_complete()` since both backends have identical data
|
||||
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
|
||||
let rocks_tx = rocks_provider.tx();
|
||||
|
||||
for (i, query) in queries.iter().enumerate() {
|
||||
// MDBX query via EitherReader
|
||||
@@ -1444,8 +1443,7 @@ mod rocksdb_tests {
|
||||
|
||||
// Run queries against both backends using EitherReader
|
||||
let mdbx_ro = factory.database_provider_ro().unwrap();
|
||||
// Use `with_assume_history_complete()` since both backends have identical data
|
||||
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
|
||||
let rocks_tx = rocks_provider.tx();
|
||||
|
||||
for (i, query) in queries.iter().enumerate() {
|
||||
// MDBX query via EitherReader
|
||||
|
||||
@@ -44,7 +44,9 @@ use std::{
|
||||
use tracing::trace;
|
||||
|
||||
mod provider;
|
||||
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode};
|
||||
pub use provider::{
|
||||
CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
|
||||
};
|
||||
|
||||
use super::ProviderNodeTypes;
|
||||
use reth_trie::KeccakKeyHasher;
|
||||
@@ -230,6 +232,21 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
|
||||
)))
|
||||
}
|
||||
|
||||
/// Returns a provider configured for unwind operations (MDBX-first commit order).
|
||||
#[track_caller]
|
||||
pub fn unwind_provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
|
||||
Ok(DatabaseProviderRW(DatabaseProvider::new_unwind_rw(
|
||||
self.db.tx_mut()?,
|
||||
self.chain_spec.clone(),
|
||||
self.static_file_provider.clone(),
|
||||
self.prune_modes.clone(),
|
||||
self.storage.clone(),
|
||||
self.storage_settings.clone(),
|
||||
self.rocksdb_provider.clone(),
|
||||
self.changeset_cache.clone(),
|
||||
)))
|
||||
}
|
||||
|
||||
/// State provider for latest block
|
||||
#[track_caller]
|
||||
pub fn latest(&self) -> ProviderResult<StateProviderBox> {
|
||||
|
||||
@@ -84,6 +84,17 @@ use std::{
|
||||
};
|
||||
use tracing::{debug, instrument, trace};
|
||||
|
||||
/// Determines the commit order for database operations.
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
||||
pub enum CommitOrder {
|
||||
/// Normal commit order: static files first, then `RocksDB`, then MDBX.
|
||||
#[default]
|
||||
Normal,
|
||||
/// Unwind commit order: MDBX first, then `RocksDB`, then static files.
|
||||
/// Used for unwind operations to allow recovery by truncating static files on restart.
|
||||
Unwind,
|
||||
}
|
||||
|
||||
/// A [`DatabaseProvider`] that holds a read-only database transaction.
|
||||
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
|
||||
|
||||
@@ -186,6 +197,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
|
||||
/// Pending `RocksDB` batches to be committed at provider commit time.
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
|
||||
pending_rocksdb_batches: PendingRocksDBBatches,
|
||||
/// Commit order for this provider.
|
||||
commit_order: CommitOrder,
|
||||
/// Minimum distance from tip required for pruning
|
||||
minimum_pruning_distance: u64,
|
||||
/// Database provider metrics
|
||||
@@ -315,6 +328,35 @@ impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpe
|
||||
}
|
||||
|
||||
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
/// Internal constructor with commit order.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn new_rw_inner(
|
||||
tx: TX,
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
prune_modes: PruneModes,
|
||||
storage: Arc<N::Storage>,
|
||||
storage_settings: Arc<RwLock<StorageSettings>>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
changeset_cache: ChangesetCache,
|
||||
commit_order: CommitOrder,
|
||||
) -> Self {
|
||||
Self {
|
||||
tx,
|
||||
chain_spec,
|
||||
static_file_provider,
|
||||
prune_modes,
|
||||
storage,
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
commit_order,
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a provider with an inner read-write transaction.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_rw(
|
||||
@@ -327,7 +369,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
changeset_cache: ChangesetCache,
|
||||
) -> Self {
|
||||
Self {
|
||||
Self::new_rw_inner(
|
||||
tx,
|
||||
chain_spec,
|
||||
static_file_provider,
|
||||
@@ -336,10 +378,35 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
CommitOrder::Normal,
|
||||
)
|
||||
}
|
||||
|
||||
/// Creates a provider with an inner read-write transaction for unwind operations.
|
||||
///
|
||||
/// Uses MDBX-first commit order for `RocksDB` batches.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_unwind_rw(
|
||||
tx: TX,
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
prune_modes: PruneModes,
|
||||
storage: Arc<N::Storage>,
|
||||
storage_settings: Arc<RwLock<StorageSettings>>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
changeset_cache: ChangesetCache,
|
||||
) -> Self {
|
||||
Self::new_rw_inner(
|
||||
tx,
|
||||
chain_spec,
|
||||
static_file_provider,
|
||||
prune_modes,
|
||||
storage,
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
CommitOrder::Unwind,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -876,6 +943,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
commit_order: CommitOrder::Normal,
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
@@ -3483,7 +3551,8 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
// it is interrupted before the static files commit, we can just
|
||||
// truncate the static files according to the
|
||||
// checkpoints on the next start-up.
|
||||
if self.static_file_provider.has_unwind_queued() {
|
||||
if self.static_file_provider.has_unwind_queued() || self.commit_order == CommitOrder::Unwind
|
||||
{
|
||||
self.tx.commit()?;
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
|
||||
@@ -594,7 +594,7 @@ impl RocksDBProvider {
|
||||
let write_options = WriteOptions::default();
|
||||
let txn_options = OptimisticTransactionOptions::default();
|
||||
let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
|
||||
RocksTx { inner, provider: self, assume_history_complete: false }
|
||||
RocksTx { inner, provider: self }
|
||||
}
|
||||
|
||||
/// Creates a new batch for atomic writes.
|
||||
@@ -712,6 +712,9 @@ impl RocksDBProvider {
|
||||
/// Uses `delete_range_cf` from empty key to a max key (256 bytes of 0xFF).
|
||||
/// This end key must exceed the maximum encoded key size for any table.
|
||||
/// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the provider is in read-only mode.
|
||||
pub fn clear<T: Table>(&self) -> ProviderResult<()> {
|
||||
let cf = self.get_cf_handle::<T>()?;
|
||||
|
||||
@@ -1525,10 +1528,6 @@ impl<'a> RocksDBBatch<'a> {
|
||||
pub struct RocksTx<'db> {
|
||||
inner: Transaction<'db, OptimisticTransactionDB>,
|
||||
provider: &'db RocksDBProvider,
|
||||
/// When true, assume `RocksDB` has complete history (like `MDBX`) and return `NotYetWritten`
|
||||
/// when querying before the first history entry. When false (default), return
|
||||
/// `MaybeInPlainState` for hybrid storage safety.
|
||||
assume_history_complete: bool,
|
||||
}
|
||||
|
||||
impl fmt::Debug for RocksTx<'_> {
|
||||
@@ -1538,16 +1537,6 @@ impl fmt::Debug for RocksTx<'_> {
|
||||
}
|
||||
|
||||
impl<'db> RocksTx<'db> {
|
||||
/// Sets the `assume_history_complete` flag to true.
|
||||
///
|
||||
/// When enabled, history queries will return `NotYetWritten` (like `MDBX`) instead of
|
||||
/// `MaybeInPlainState` when querying before the first history entry. Use this in tests
|
||||
/// where `RocksDB` and `MDBX` have identical data.
|
||||
pub const fn with_assume_history_complete(mut self) -> Self {
|
||||
self.assume_history_complete = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Gets a value from the specified table. Sees uncommitted writes in this transaction.
|
||||
pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
|
||||
let encoded_key = key.encode();
|
||||
@@ -1714,19 +1703,10 @@ impl<'db> RocksTx<'db> {
|
||||
where
|
||||
T: Table<Value = BlockNumberList>,
|
||||
{
|
||||
// Determines whether to soften NotYetWritten -> MaybeInPlainState.
|
||||
//
|
||||
// We soften when:
|
||||
// 1. `assume_history_complete` is false (hybrid storage - RocksDB may not have full
|
||||
// history)
|
||||
// 2. OR history may be pruned (`lowest_available_block_number.is_some()`)
|
||||
//
|
||||
// We only return NotYetWritten when we're certain history is complete AND not pruned.
|
||||
let should_soften_not_yet_written =
|
||||
!self.assume_history_complete || lowest_available_block_number.is_some();
|
||||
|
||||
// History may be pruned if a lowest available block is set.
|
||||
let is_maybe_pruned = lowest_available_block_number.is_some();
|
||||
let fallback = || {
|
||||
Ok(if should_soften_not_yet_written {
|
||||
Ok(if is_maybe_pruned {
|
||||
HistoryInfo::MaybeInPlainState
|
||||
} else {
|
||||
HistoryInfo::NotYetWritten
|
||||
@@ -1780,19 +1760,11 @@ impl<'db> RocksTx<'db> {
|
||||
false
|
||||
};
|
||||
|
||||
// Apply the same softening logic to `from_lookup` result.
|
||||
let result = HistoryInfo::from_lookup(
|
||||
Ok(HistoryInfo::from_lookup(
|
||||
found_block,
|
||||
is_before_first_write,
|
||||
lowest_available_block_number,
|
||||
);
|
||||
|
||||
Ok(match result {
|
||||
HistoryInfo::NotYetWritten if should_soften_not_yet_written => {
|
||||
HistoryInfo::MaybeInPlainState
|
||||
}
|
||||
other => other,
|
||||
})
|
||||
))
|
||||
}
|
||||
|
||||
/// Returns an error if the raw iterator is in an invalid state due to an I/O error.
|
||||
|
||||
@@ -81,6 +81,13 @@ impl RocksDBProvider {
|
||||
pub const fn table_stats(&self) -> Vec<RocksDBTableStats> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
/// Clears all entries from the specified table (stub implementation).
|
||||
///
|
||||
/// This is a no-op since there is no `RocksDB` when the feature is disabled.
|
||||
pub const fn clear<T>(&self) -> ProviderResult<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl DatabaseMetrics for RocksDBProvider {
|
||||
|
||||
Reference in New Issue
Block a user