mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
1 Commits
feat/rocks
...
wt-pr2a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
082daa7fc1 |
@@ -162,7 +162,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
|
||||
let access_rights =
|
||||
if command.dry_run { AccessRights::RO } else { AccessRights::RW };
|
||||
db_exec!(self.env, tool, N, access_rights, {
|
||||
command.execute(&tool, ctx.task_executor, &data_dir)?;
|
||||
command.execute(&tool, ctx.task_executor.clone(), &data_dir)?;
|
||||
});
|
||||
}
|
||||
Subcommands::StaticFileHeader(command) => {
|
||||
|
||||
@@ -15,8 +15,7 @@ use reth_db_common::{
|
||||
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
|
||||
use reth_node_core::args::StageEnum;
|
||||
use reth_provider::{
|
||||
DBProvider, DatabaseProviderFactory, RocksDBProviderFactory, StaticFileProviderFactory,
|
||||
StaticFileWriter, StorageSettingsCache,
|
||||
DBProvider, DatabaseProviderFactory, StaticFileProviderFactory, StaticFileWriter,
|
||||
};
|
||||
use reth_prune::PruneSegment;
|
||||
use reth_stages::StageId;
|
||||
@@ -169,20 +168,8 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
)?;
|
||||
}
|
||||
StageEnum::AccountHistory | StageEnum::StorageHistory => {
|
||||
let settings = provider_rw.cached_storage_settings();
|
||||
let rocksdb = tool.provider_factory.rocksdb_provider();
|
||||
|
||||
if settings.account_history_in_rocksdb {
|
||||
rocksdb.clear::<tables::AccountsHistory>()?;
|
||||
} else {
|
||||
tx.clear::<tables::AccountsHistory>()?;
|
||||
}
|
||||
|
||||
if settings.storages_history_in_rocksdb {
|
||||
rocksdb.clear::<tables::StoragesHistory>()?;
|
||||
} else {
|
||||
tx.clear::<tables::StoragesHistory>()?;
|
||||
}
|
||||
tx.clear::<tables::AccountsHistory>()?;
|
||||
tx.clear::<tables::StoragesHistory>()?;
|
||||
|
||||
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
|
||||
reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
|
||||
@@ -190,14 +177,7 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
|
||||
}
|
||||
StageEnum::TxLookup => {
|
||||
if provider_rw.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
|
||||
tool.provider_factory
|
||||
.rocksdb_provider()
|
||||
.clear::<tables::TransactionHashNumbers>()?;
|
||||
} else {
|
||||
tx.clear::<tables::TransactionHashNumbers>()?;
|
||||
}
|
||||
|
||||
tx.clear::<tables::TransactionHashNumbers>()?;
|
||||
reset_prune_checkpoint(tx, PruneSegment::TransactionLookup)?;
|
||||
|
||||
reset_stage_checkpoint(tx, StageId::TransactionLookup)?;
|
||||
|
||||
@@ -96,17 +96,12 @@ where
|
||||
let tx_range_end = *tx_range.end();
|
||||
|
||||
// Retrieve transactions in the range and calculate their hashes in parallel
|
||||
let mut hashes = provider
|
||||
let hashes = provider
|
||||
.transactions_by_tx_range(tx_range.clone())?
|
||||
.into_par_iter()
|
||||
.map(|transaction| transaction.trie_hash())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Sort hashes to enable efficient cursor traversal through the TransactionHashNumbers
|
||||
// table, which is keyed by hash. Without sorting, each seek would be O(log n) random
|
||||
// access; with sorting, the cursor advances sequentially through the B+tree.
|
||||
hashes.sort_unstable();
|
||||
|
||||
// Number of transactions retrieved from the database should match the tx range count
|
||||
let tx_count = tx_range.count();
|
||||
if hashes.len() != tx_count {
|
||||
|
||||
@@ -412,16 +412,8 @@ impl Transaction<RW> {
|
||||
/// Returns a buffer which can be used to write a value into the item at the
|
||||
/// given key and with the given length. The buffer must be completely
|
||||
/// filled by the caller.
|
||||
///
|
||||
/// This should not be used on dupsort tables.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that the returned buffer is not used after the transaction is
|
||||
/// committed or aborted, or if another value is inserted. To be clear: the second call to
|
||||
/// this function is not permitted while the returned slice is reachable.
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
pub unsafe fn reserve(
|
||||
pub fn reserve(
|
||||
&self,
|
||||
dbi: ffi::MDBX_dbi,
|
||||
key: impl AsRef<[u8]>,
|
||||
|
||||
@@ -105,11 +105,8 @@ fn test_reserve() {
|
||||
let txn = env.begin_rw_txn().unwrap();
|
||||
let dbi = txn.open_db(None).unwrap().dbi();
|
||||
{
|
||||
unsafe {
|
||||
// SAFETY: the returned slice is used before the transaction is committed or aborted.
|
||||
let mut writer = txn.reserve(dbi, b"key1", 4, WriteFlags::empty()).unwrap();
|
||||
writer.write_all(b"val1").unwrap();
|
||||
}
|
||||
let mut writer = txn.reserve(dbi, b"key1", 4, WriteFlags::empty()).unwrap();
|
||||
writer.write_all(b"val1").unwrap();
|
||||
}
|
||||
txn.commit().unwrap();
|
||||
|
||||
|
||||
@@ -44,9 +44,7 @@ use std::{
|
||||
use tracing::trace;
|
||||
|
||||
mod provider;
|
||||
pub use provider::{
|
||||
CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
|
||||
};
|
||||
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode};
|
||||
|
||||
use super::ProviderNodeTypes;
|
||||
use reth_trie::KeccakKeyHasher;
|
||||
@@ -232,21 +230,6 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
|
||||
)))
|
||||
}
|
||||
|
||||
/// Returns a provider configured for unwind operations (MDBX-first commit order).
|
||||
#[track_caller]
|
||||
pub fn unwind_provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
|
||||
Ok(DatabaseProviderRW(DatabaseProvider::new_unwind_rw(
|
||||
self.db.tx_mut()?,
|
||||
self.chain_spec.clone(),
|
||||
self.static_file_provider.clone(),
|
||||
self.prune_modes.clone(),
|
||||
self.storage.clone(),
|
||||
self.storage_settings.clone(),
|
||||
self.rocksdb_provider.clone(),
|
||||
self.changeset_cache.clone(),
|
||||
)))
|
||||
}
|
||||
|
||||
/// State provider for latest block
|
||||
#[track_caller]
|
||||
pub fn latest(&self) -> ProviderResult<StateProviderBox> {
|
||||
|
||||
@@ -84,17 +84,6 @@ use std::{
|
||||
};
|
||||
use tracing::{debug, instrument, trace};
|
||||
|
||||
/// Determines the commit order for database operations.
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
||||
pub enum CommitOrder {
|
||||
/// Normal commit order: static files first, then `RocksDB`, then MDBX.
|
||||
#[default]
|
||||
Normal,
|
||||
/// Unwind commit order: MDBX first, then `RocksDB`, then static files.
|
||||
/// Used for unwind operations to allow recovery by truncating static files on restart.
|
||||
Unwind,
|
||||
}
|
||||
|
||||
/// A [`DatabaseProvider`] that holds a read-only database transaction.
|
||||
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
|
||||
|
||||
@@ -197,8 +186,6 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
|
||||
/// Pending `RocksDB` batches to be committed at provider commit time.
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
|
||||
pending_rocksdb_batches: PendingRocksDBBatches,
|
||||
/// Commit order for this provider.
|
||||
commit_order: CommitOrder,
|
||||
/// Minimum distance from tip required for pruning
|
||||
minimum_pruning_distance: u64,
|
||||
/// Database provider metrics
|
||||
@@ -328,35 +315,6 @@ impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpe
|
||||
}
|
||||
|
||||
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
/// Internal constructor with commit order.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn new_rw_inner(
|
||||
tx: TX,
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
prune_modes: PruneModes,
|
||||
storage: Arc<N::Storage>,
|
||||
storage_settings: Arc<RwLock<StorageSettings>>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
changeset_cache: ChangesetCache,
|
||||
commit_order: CommitOrder,
|
||||
) -> Self {
|
||||
Self {
|
||||
tx,
|
||||
chain_spec,
|
||||
static_file_provider,
|
||||
prune_modes,
|
||||
storage,
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
commit_order,
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a provider with an inner read-write transaction.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_rw(
|
||||
@@ -369,7 +327,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
changeset_cache: ChangesetCache,
|
||||
) -> Self {
|
||||
Self::new_rw_inner(
|
||||
Self {
|
||||
tx,
|
||||
chain_spec,
|
||||
static_file_provider,
|
||||
@@ -378,35 +336,10 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
CommitOrder::Normal,
|
||||
)
|
||||
}
|
||||
|
||||
/// Creates a provider with an inner read-write transaction for unwind operations.
|
||||
///
|
||||
/// Uses MDBX-first commit order for `RocksDB` batches.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_unwind_rw(
|
||||
tx: TX,
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
prune_modes: PruneModes,
|
||||
storage: Arc<N::Storage>,
|
||||
storage_settings: Arc<RwLock<StorageSettings>>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
changeset_cache: ChangesetCache,
|
||||
) -> Self {
|
||||
Self::new_rw_inner(
|
||||
tx,
|
||||
chain_spec,
|
||||
static_file_provider,
|
||||
prune_modes,
|
||||
storage,
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
CommitOrder::Unwind,
|
||||
)
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -943,7 +876,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
commit_order: CommitOrder::Normal,
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
@@ -3551,8 +3483,7 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
// it is interrupted before the static files commit, we can just
|
||||
// truncate the static files according to the
|
||||
// checkpoints on the next start-up.
|
||||
if self.static_file_provider.has_unwind_queued() || self.commit_order == CommitOrder::Unwind
|
||||
{
|
||||
if self.static_file_provider.has_unwind_queued() {
|
||||
self.tx.commit()?;
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
|
||||
@@ -712,9 +712,6 @@ impl RocksDBProvider {
|
||||
/// Uses `delete_range_cf` from empty key to a max key (256 bytes of 0xFF).
|
||||
/// This end key must exceed the maximum encoded key size for any table.
|
||||
/// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the provider is in read-only mode.
|
||||
pub fn clear<T: Table>(&self) -> ProviderResult<()> {
|
||||
let cf = self.get_cf_handle::<T>()?;
|
||||
|
||||
@@ -1072,8 +1069,13 @@ impl RocksDBProvider {
|
||||
for (block_idx, block) in blocks.iter().enumerate() {
|
||||
let block_number = ctx.first_block_number + block_idx as u64;
|
||||
let bundle = &block.execution_outcome().state;
|
||||
for &address in bundle.state().keys() {
|
||||
account_history.entry(address).or_default().push(block_number);
|
||||
// Only record accounts where account-info changed OR account was destroyed.
|
||||
// Skip accounts that only had storage changes - this matches MDBX semantics
|
||||
// where AccountsHistory tracks account-level changes, not storage-only touches.
|
||||
for (&address, account) in bundle.state() {
|
||||
if account.is_info_changed() || account.was_destroyed() {
|
||||
account_history.entry(address).or_default().push(block_number);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1098,9 +1100,14 @@ impl RocksDBProvider {
|
||||
let block_number = ctx.first_block_number + block_idx as u64;
|
||||
let bundle = &block.execution_outcome().state;
|
||||
for (&address, account) in bundle.state() {
|
||||
for &slot in account.storage.keys() {
|
||||
let key = B256::new(slot.to_be_bytes());
|
||||
storage_history.entry((address, key)).or_default().push(block_number);
|
||||
// Only record storage slots that actually changed value.
|
||||
// This matches MDBX semantics where StoragesHistory only tracks
|
||||
// slots with value changes, not just touched slots.
|
||||
for (&slot, storage_slot) in &account.storage {
|
||||
if storage_slot.is_changed() {
|
||||
let key = B256::new(slot.to_be_bytes());
|
||||
storage_history.entry((address, key)).or_default().push(block_number);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,13 +81,6 @@ impl RocksDBProvider {
|
||||
pub const fn table_stats(&self) -> Vec<RocksDBTableStats> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
/// Clears all entries from the specified table (stub implementation).
|
||||
///
|
||||
/// This is a no-op since there is no `RocksDB` when the feature is disabled.
|
||||
pub const fn clear<T>(&self) -> ProviderResult<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl DatabaseMetrics for RocksDBProvider {
|
||||
|
||||
Reference in New Issue
Block a user