Compare commits

..

1 Commits

Author SHA1 Message Date
yongkangc
082daa7fc1 fix(rocksdb): filter account/storage history to match MDBX semantics 2026-01-22 12:58:59 +00:00
9 changed files with 31 additions and 153 deletions

View File

@@ -162,7 +162,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
let access_rights =
if command.dry_run { AccessRights::RO } else { AccessRights::RW };
db_exec!(self.env, tool, N, access_rights, {
command.execute(&tool, ctx.task_executor, &data_dir)?;
command.execute(&tool, ctx.task_executor.clone(), &data_dir)?;
});
}
Subcommands::StaticFileHeader(command) => {

View File

@@ -15,8 +15,7 @@ use reth_db_common::{
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
use reth_node_core::args::StageEnum;
use reth_provider::{
DBProvider, DatabaseProviderFactory, RocksDBProviderFactory, StaticFileProviderFactory,
StaticFileWriter, StorageSettingsCache,
DBProvider, DatabaseProviderFactory, StaticFileProviderFactory, StaticFileWriter,
};
use reth_prune::PruneSegment;
use reth_stages::StageId;
@@ -169,20 +168,8 @@ impl<C: ChainSpecParser> Command<C> {
)?;
}
StageEnum::AccountHistory | StageEnum::StorageHistory => {
let settings = provider_rw.cached_storage_settings();
let rocksdb = tool.provider_factory.rocksdb_provider();
if settings.account_history_in_rocksdb {
rocksdb.clear::<tables::AccountsHistory>()?;
} else {
tx.clear::<tables::AccountsHistory>()?;
}
if settings.storages_history_in_rocksdb {
rocksdb.clear::<tables::StoragesHistory>()?;
} else {
tx.clear::<tables::StoragesHistory>()?;
}
tx.clear::<tables::AccountsHistory>()?;
tx.clear::<tables::StoragesHistory>()?;
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
@@ -190,14 +177,7 @@ impl<C: ChainSpecParser> Command<C> {
insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
}
StageEnum::TxLookup => {
if provider_rw.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
tool.provider_factory
.rocksdb_provider()
.clear::<tables::TransactionHashNumbers>()?;
} else {
tx.clear::<tables::TransactionHashNumbers>()?;
}
tx.clear::<tables::TransactionHashNumbers>()?;
reset_prune_checkpoint(tx, PruneSegment::TransactionLookup)?;
reset_stage_checkpoint(tx, StageId::TransactionLookup)?;

View File

@@ -96,17 +96,12 @@ where
let tx_range_end = *tx_range.end();
// Retrieve transactions in the range and calculate their hashes in parallel
let mut hashes = provider
let hashes = provider
.transactions_by_tx_range(tx_range.clone())?
.into_par_iter()
.map(|transaction| transaction.trie_hash())
.collect::<Vec<_>>();
// Sort hashes to enable efficient cursor traversal through the TransactionHashNumbers
// table, which is keyed by hash. Without sorting, each seek would be O(log n) random
// access; with sorting, the cursor advances sequentially through the B+tree.
hashes.sort_unstable();
// Number of transactions retrieved from the database should match the tx range count
let tx_count = tx_range.count();
if hashes.len() != tx_count {

View File

@@ -412,16 +412,8 @@ impl Transaction<RW> {
/// Returns a buffer which can be used to write a value into the item at the
/// given key and with the given length. The buffer must be completely
/// filled by the caller.
///
/// This should not be used on dupsort tables.
///
/// # Safety
///
/// The caller must ensure that the returned buffer is not used after the transaction is
/// committed or aborted, or if another value is inserted. To be clear: the second call to
/// this function is not permitted while the returned slice is reachable.
#[allow(clippy::mut_from_ref)]
pub unsafe fn reserve(
pub fn reserve(
&self,
dbi: ffi::MDBX_dbi,
key: impl AsRef<[u8]>,

View File

@@ -105,11 +105,8 @@ fn test_reserve() {
let txn = env.begin_rw_txn().unwrap();
let dbi = txn.open_db(None).unwrap().dbi();
{
unsafe {
// SAFETY: the returned slice is used before the transaction is committed or aborted.
let mut writer = txn.reserve(dbi, b"key1", 4, WriteFlags::empty()).unwrap();
writer.write_all(b"val1").unwrap();
}
let mut writer = txn.reserve(dbi, b"key1", 4, WriteFlags::empty()).unwrap();
writer.write_all(b"val1").unwrap();
}
txn.commit().unwrap();

View File

@@ -44,9 +44,7 @@ use std::{
use tracing::trace;
mod provider;
pub use provider::{
CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
};
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode};
use super::ProviderNodeTypes;
use reth_trie::KeccakKeyHasher;
@@ -232,21 +230,6 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
)))
}
/// Returns a provider configured for unwind operations (MDBX-first commit order).
#[track_caller]
pub fn unwind_provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
Ok(DatabaseProviderRW(DatabaseProvider::new_unwind_rw(
self.db.tx_mut()?,
self.chain_spec.clone(),
self.static_file_provider.clone(),
self.prune_modes.clone(),
self.storage.clone(),
self.storage_settings.clone(),
self.rocksdb_provider.clone(),
self.changeset_cache.clone(),
)))
}
/// State provider for latest block
#[track_caller]
pub fn latest(&self) -> ProviderResult<StateProviderBox> {

View File

@@ -84,17 +84,6 @@ use std::{
};
use tracing::{debug, instrument, trace};
/// Determines the commit order for database operations.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
/// Normal commit order: static files first, then `RocksDB`, then MDBX.
#[default]
Normal,
/// Unwind commit order: MDBX first, then `RocksDB`, then static files.
/// Used for unwind operations to allow recovery by truncating static files on restart.
Unwind,
}
/// A [`DatabaseProvider`] that holds a read-only database transaction.
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
@@ -197,8 +186,6 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Commit order for this provider.
commit_order: CommitOrder,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
/// Database provider metrics
@@ -328,35 +315,6 @@ impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpe
}
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
/// Internal constructor with commit order.
#[allow(clippy::too_many_arguments)]
fn new_rw_inner(
tx: TX,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider<N::Primitives>,
prune_modes: PruneModes,
storage: Arc<N::Storage>,
storage_settings: Arc<RwLock<StorageSettings>>,
rocksdb_provider: RocksDBProvider,
changeset_cache: ChangesetCache,
commit_order: CommitOrder,
) -> Self {
Self {
tx,
chain_spec,
static_file_provider,
prune_modes,
storage,
storage_settings,
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
commit_order,
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
}
/// Creates a provider with an inner read-write transaction.
#[allow(clippy::too_many_arguments)]
pub fn new_rw(
@@ -369,7 +327,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
rocksdb_provider: RocksDBProvider,
changeset_cache: ChangesetCache,
) -> Self {
Self::new_rw_inner(
Self {
tx,
chain_spec,
static_file_provider,
@@ -378,35 +336,10 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage_settings,
rocksdb_provider,
changeset_cache,
CommitOrder::Normal,
)
}
/// Creates a provider with an inner read-write transaction for unwind operations.
///
/// Uses MDBX-first commit order for `RocksDB` batches.
#[allow(clippy::too_many_arguments)]
pub fn new_unwind_rw(
tx: TX,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider<N::Primitives>,
prune_modes: PruneModes,
storage: Arc<N::Storage>,
storage_settings: Arc<RwLock<StorageSettings>>,
rocksdb_provider: RocksDBProvider,
changeset_cache: ChangesetCache,
) -> Self {
Self::new_rw_inner(
tx,
chain_spec,
static_file_provider,
prune_modes,
storage,
storage_settings,
rocksdb_provider,
changeset_cache,
CommitOrder::Unwind,
)
pending_rocksdb_batches: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
}
}
@@ -943,7 +876,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
commit_order: CommitOrder::Normal,
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -3551,8 +3483,7 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
// it is interrupted before the static files commit, we can just
// truncate the static files according to the
// checkpoints on the next start-up.
if self.static_file_provider.has_unwind_queued() || self.commit_order == CommitOrder::Unwind
{
if self.static_file_provider.has_unwind_queued() {
self.tx.commit()?;
#[cfg(all(unix, feature = "rocksdb"))]

View File

@@ -712,9 +712,6 @@ impl RocksDBProvider {
/// Uses `delete_range_cf` from empty key to a max key (256 bytes of 0xFF).
/// This end key must exceed the maximum encoded key size for any table.
/// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
///
/// # Panics
/// Panics if the provider is in read-only mode.
pub fn clear<T: Table>(&self) -> ProviderResult<()> {
let cf = self.get_cf_handle::<T>()?;
@@ -1072,8 +1069,13 @@ impl RocksDBProvider {
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().state;
for &address in bundle.state().keys() {
account_history.entry(address).or_default().push(block_number);
// Only record accounts where account-info changed OR account was destroyed.
// Skip accounts that only had storage changes - this matches MDBX semantics
// where AccountsHistory tracks account-level changes, not storage-only touches.
for (&address, account) in bundle.state() {
if account.is_info_changed() || account.was_destroyed() {
account_history.entry(address).or_default().push(block_number);
}
}
}
@@ -1098,9 +1100,14 @@ impl RocksDBProvider {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().state;
for (&address, account) in bundle.state() {
for &slot in account.storage.keys() {
let key = B256::new(slot.to_be_bytes());
storage_history.entry((address, key)).or_default().push(block_number);
// Only record storage slots that actually changed value.
// This matches MDBX semantics where StoragesHistory only tracks
// slots with value changes, not just touched slots.
for (&slot, storage_slot) in &account.storage {
if storage_slot.is_changed() {
let key = B256::new(slot.to_be_bytes());
storage_history.entry((address, key)).or_default().push(block_number);
}
}
}
}

View File

@@ -81,13 +81,6 @@ impl RocksDBProvider {
pub const fn table_stats(&self) -> Vec<RocksDBTableStats> {
Vec::new()
}
/// Clears all entries from the specified table (stub implementation).
///
/// This is a no-op since there is no `RocksDB` when the feature is disabled.
pub const fn clear<T>(&self) -> ProviderResult<()> {
Ok(())
}
}
impl DatabaseMetrics for RocksDBProvider {