mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6be57ce9e1 | ||
|
|
b997bee71b | ||
|
|
e3c7413747 | ||
|
|
0bf2335c8c | ||
|
|
1f7a5acae1 |
@@ -1351,7 +1351,8 @@ mod rocksdb_tests {
|
||||
|
||||
// Run queries against both backends using EitherReader
|
||||
let mdbx_ro = factory.database_provider_ro().unwrap();
|
||||
let rocks_tx = rocks_provider.tx();
|
||||
// Use `with_assume_history_complete()` since both backends have identical data
|
||||
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
|
||||
|
||||
for (i, query) in queries.iter().enumerate() {
|
||||
// MDBX query via EitherReader
|
||||
@@ -1443,7 +1444,8 @@ mod rocksdb_tests {
|
||||
|
||||
// Run queries against both backends using EitherReader
|
||||
let mdbx_ro = factory.database_provider_ro().unwrap();
|
||||
let rocks_tx = rocks_provider.tx();
|
||||
// Use `with_assume_history_complete()` since both backends have identical data
|
||||
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
|
||||
|
||||
for (i, query) in queries.iter().enumerate() {
|
||||
// MDBX query via EitherReader
|
||||
|
||||
@@ -2,7 +2,7 @@ use crate::{
|
||||
changesets_utils::StorageRevertsIter,
|
||||
providers::{
|
||||
database::{chain::ChainStorage, metrics},
|
||||
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
|
||||
rocksdb::{PendingHistory, PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
|
||||
static_file::{StaticFileWriteCtx, StaticFileWriter},
|
||||
NodeTypesForProvider, StaticFileProvider,
|
||||
},
|
||||
@@ -186,6 +186,10 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
|
||||
/// Pending `RocksDB` batches to be committed at provider commit time.
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
|
||||
pending_rocksdb_batches: PendingRocksDBBatches,
|
||||
/// Pending history writes accumulated across `save_blocks()` calls.
|
||||
/// Materialized into `RocksDB` shards at commit time after MDBX commits.
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
|
||||
pending_rocksdb_history: PendingHistory,
|
||||
/// Minimum distance from tip required for pruning
|
||||
minimum_pruning_distance: u64,
|
||||
/// Database provider metrics
|
||||
@@ -204,6 +208,7 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
|
||||
.field("rocksdb_provider", &self.rocksdb_provider)
|
||||
.field("changeset_cache", &self.changeset_cache)
|
||||
.field("pending_rocksdb_batches", &"<pending batches>")
|
||||
.field("pending_rocksdb_history", &"<pending history>")
|
||||
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
|
||||
.finish()
|
||||
}
|
||||
@@ -337,6 +342,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
pending_rocksdb_history: Default::default(),
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
@@ -410,6 +416,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
prune_tx_lookup: self.prune_modes.transaction_lookup,
|
||||
storage_settings: self.cached_storage_settings(),
|
||||
pending_batches: self.pending_rocksdb_batches.clone(),
|
||||
pending_history: self.pending_rocksdb_history.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -876,6 +883,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
pending_rocksdb_history: Default::default(),
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
@@ -3478,6 +3486,19 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
}
|
||||
|
||||
/// Commit database transaction, static files, and pending `RocksDB` batches.
|
||||
///
|
||||
/// # Commit Order (Critical for Correctness)
|
||||
///
|
||||
/// `RocksDB` history indices must commit AFTER MDBX changesets to prevent a race
|
||||
/// condition where history indices point to non-existent changesets:
|
||||
///
|
||||
/// 1. Static files finalize (headers, transactions, receipts)
|
||||
/// 2. MDBX commits (changesets become visible)
|
||||
/// 3. `RocksDB` batches commit (non-history data like tx hashes)
|
||||
/// 4. `RocksDB` history commits (account/storage history indices)
|
||||
///
|
||||
/// This ordering ensures that when a reader follows a `RocksDB` history index
|
||||
/// to a changeset, the changeset exists in MDBX.
|
||||
fn commit(self) -> ProviderResult<()> {
|
||||
// For unwinding it makes more sense to commit the database first, since if
|
||||
// it is interrupted before the static files commit, we can just
|
||||
@@ -3488,10 +3509,15 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
// Commit non-history batches
|
||||
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
|
||||
for batch in batches {
|
||||
self.rocksdb_provider.commit_batch(batch)?;
|
||||
}
|
||||
|
||||
// Commit pending history (after MDBX, so changesets exist)
|
||||
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
|
||||
self.rocksdb_provider.commit_pending_history(history)?;
|
||||
}
|
||||
|
||||
self.static_file_provider.commit()?;
|
||||
@@ -3503,20 +3529,30 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
self.static_file_provider.finalize()?;
|
||||
timings.sf = start.elapsed();
|
||||
|
||||
// CRITICAL: MDBX must commit BEFORE RocksDB history indices.
|
||||
// This prevents a race where history says "look at changeset N" but
|
||||
// the changeset doesn't exist yet.
|
||||
let start = Instant::now();
|
||||
self.tx.commit()?;
|
||||
timings.mdbx = start.elapsed();
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
let start = Instant::now();
|
||||
|
||||
// Commit non-history batches (tx hashes, etc.)
|
||||
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
|
||||
for batch in batches {
|
||||
self.rocksdb_provider.commit_batch(batch)?;
|
||||
}
|
||||
|
||||
// Commit pending history (after MDBX, so changesets exist)
|
||||
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
|
||||
self.rocksdb_provider.commit_pending_history(history)?;
|
||||
|
||||
timings.rocksdb = start.elapsed();
|
||||
}
|
||||
|
||||
let start = Instant::now();
|
||||
self.tx.commit()?;
|
||||
timings.mdbx = start.elapsed();
|
||||
|
||||
self.metrics.record_commit(&timings);
|
||||
}
|
||||
|
||||
|
||||
@@ -262,7 +262,7 @@ impl RocksDBProvider {
|
||||
let (key, value) = result?;
|
||||
let highest = key.sharded_key.highest_block_number;
|
||||
if highest == u64::MAX {
|
||||
if let Some(max_in_shard) = value.max() &&
|
||||
if let Some(max_in_shard) = value.iter().max() &&
|
||||
max_in_shard > max_sentinel_block
|
||||
{
|
||||
max_sentinel_block = max_in_shard;
|
||||
@@ -416,7 +416,7 @@ impl RocksDBProvider {
|
||||
let highest = key.highest_block_number;
|
||||
if highest == u64::MAX {
|
||||
// Sentinel shard: check the actual max block number in the shard
|
||||
if let Some(max_in_shard) = value.max() &&
|
||||
if let Some(max_in_shard) = value.iter().max() &&
|
||||
max_in_shard > max_sentinel_block
|
||||
{
|
||||
max_sentinel_block = max_in_shard;
|
||||
@@ -1064,7 +1064,7 @@ mod tests {
|
||||
// This simulates a scenario where history tracking started but no shards were completed
|
||||
let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
|
||||
let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
|
||||
// Use a checkpoint that matches the sentinel shard contents (max block = 30)
|
||||
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
|
||||
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
|
||||
rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
|
||||
rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
|
||||
@@ -1078,8 +1078,8 @@ mod tests {
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// Set a checkpoint that matches the sentinel shard data (max block = 30)
|
||||
// This is a normal scenario where sentinel shards exist and are in sync with checkpoint
|
||||
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
|
||||
// where indexing is complete through the checkpoint block
|
||||
{
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
provider
|
||||
@@ -1108,7 +1108,7 @@ mod tests {
|
||||
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
|
||||
let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
|
||||
let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
|
||||
// Use a checkpoint that matches the sentinel shard contents (max block = 30)
|
||||
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
|
||||
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
|
||||
rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
|
||||
rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
|
||||
@@ -1122,8 +1122,8 @@ mod tests {
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// Set a checkpoint that matches the sentinel shard data (max block = 30)
|
||||
// This is a normal scenario where sentinel shards exist and are in sync with checkpoint
|
||||
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
|
||||
// where indexing is complete through the checkpoint block
|
||||
{
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
provider
|
||||
@@ -1466,101 +1466,4 @@ mod tests {
|
||||
"Should require unwind to block 50 to rebuild AccountsHistory"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_storages_history_sentinel_shard_rewrite() {
|
||||
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
|
||||
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let rocksdb = RocksDBBuilder::new(temp_dir.path())
|
||||
.with_table::<tables::StoragesHistory>()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let address = Address::random();
|
||||
let storage_key = B256::random();
|
||||
|
||||
// Create a sentinel shard with blocks both above and below checkpoint 100
|
||||
// Contains [10, 50, 150] - after pruning at checkpoint 100, should become [10, 50]
|
||||
let sentinel_key = StorageShardedKey::new(address, storage_key, u64::MAX);
|
||||
let block_list = BlockNumberList::new_pre_sorted([10, 50, 150]);
|
||||
rocksdb.put::<tables::StoragesHistory>(sentinel_key.clone(), &block_list).unwrap();
|
||||
|
||||
// Create a test provider factory for MDBX
|
||||
let factory = create_test_provider_factory();
|
||||
factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// Set checkpoint to block 100
|
||||
{
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
provider
|
||||
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
|
||||
.unwrap();
|
||||
provider.commit().unwrap();
|
||||
}
|
||||
|
||||
let provider = factory.database_provider_ro().unwrap();
|
||||
|
||||
// Run consistency check - should prune the sentinel shard
|
||||
let result = rocksdb.check_consistency(&provider).unwrap();
|
||||
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
|
||||
|
||||
// Verify the sentinel shard was rewritten (not deleted) with only [10, 50]
|
||||
let rewritten = rocksdb.get::<tables::StoragesHistory>(sentinel_key).unwrap();
|
||||
assert!(rewritten.is_some(), "Sentinel shard should be rewritten, not deleted");
|
||||
|
||||
let rewritten_list = rewritten.unwrap();
|
||||
let blocks: Vec<u64> = rewritten_list.iter().collect();
|
||||
assert_eq!(blocks, vec![10, 50], "Sentinel shard should contain only blocks <= 100");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_accounts_history_sentinel_shard_rewrite() {
|
||||
use reth_db_api::models::ShardedKey;
|
||||
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let rocksdb = RocksDBBuilder::new(temp_dir.path())
|
||||
.with_table::<tables::AccountsHistory>()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let address = Address::random();
|
||||
|
||||
// Create a sentinel shard with blocks both above and below checkpoint 100
|
||||
// Contains [10, 50, 150] - after pruning at checkpoint 100, should become [10, 50]
|
||||
let sentinel_key = ShardedKey::new(address, u64::MAX);
|
||||
let block_list = BlockNumberList::new_pre_sorted([10, 50, 150]);
|
||||
rocksdb.put::<tables::AccountsHistory>(sentinel_key.clone(), &block_list).unwrap();
|
||||
|
||||
// Create a test provider factory for MDBX
|
||||
let factory = create_test_provider_factory();
|
||||
factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// Set checkpoint to block 100
|
||||
{
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
provider
|
||||
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
|
||||
.unwrap();
|
||||
provider.commit().unwrap();
|
||||
}
|
||||
|
||||
let provider = factory.database_provider_ro().unwrap();
|
||||
|
||||
// Run consistency check - should prune the sentinel shard
|
||||
let result = rocksdb.check_consistency(&provider).unwrap();
|
||||
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
|
||||
|
||||
// Verify the sentinel shard was rewritten (not deleted) with only [10, 50]
|
||||
let rewritten = rocksdb.get::<tables::AccountsHistory>(sentinel_key).unwrap();
|
||||
assert!(rewritten.is_some(), "Sentinel shard should be rewritten, not deleted");
|
||||
|
||||
let rewritten_list = rewritten.unwrap();
|
||||
let blocks: Vec<u64> = rewritten_list.iter().collect();
|
||||
assert_eq!(blocks, vec![10, 50], "Sentinel shard should contain only blocks <= 100");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,10 @@ mod invariants;
|
||||
mod metrics;
|
||||
mod provider;
|
||||
|
||||
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
|
||||
#[allow(unused_imports)]
|
||||
pub(crate) use provider::{
|
||||
PendingHistory, PendingHistoryWrites, PendingRocksDBBatches, RocksDBWriteCtx,
|
||||
};
|
||||
pub use provider::{
|
||||
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBTableStats, RocksTx,
|
||||
};
|
||||
|
||||
@@ -15,7 +15,7 @@ use reth_db_api::{
|
||||
table::{Compress, Decode, Decompress, Encode, Table},
|
||||
tables, BlockNumberList, DatabaseError,
|
||||
};
|
||||
use reth_primitives_traits::BlockBody as _;
|
||||
use reth_primitives_traits::{AlloyBlockHeader as _, BlockBody as _};
|
||||
use reth_prune_types::PruneMode;
|
||||
use reth_storage_errors::{
|
||||
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
|
||||
@@ -57,6 +57,24 @@ pub struct RocksDBTableStats {
|
||||
pub pending_compaction_bytes: u64,
|
||||
}
|
||||
|
||||
/// Pending history writes that are accumulated across multiple `save_blocks()` calls
|
||||
/// and materialized into `RocksDB` shards at commit time.
|
||||
///
|
||||
/// This exists because [`RocksDBBatch::append_account_history_shard`] and
|
||||
/// [`RocksDBBatch::append_storage_history_shard`] read from committed state, not pending writes.
|
||||
/// By accumulating all history deltas in memory and materializing once at commit time, we ensure
|
||||
/// each key is written exactly once with all its indices.
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct PendingHistoryWrites {
|
||||
/// Account history: address -> list of block numbers where the account was modified.
|
||||
pub accounts: BTreeMap<Address, Vec<u64>>,
|
||||
/// Storage history: (address, slot) -> list of block numbers where the storage was modified.
|
||||
pub storages: BTreeMap<(Address, B256), Vec<u64>>,
|
||||
}
|
||||
|
||||
/// Pending history writes type alias.
|
||||
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
|
||||
|
||||
/// Context for `RocksDB` block writes.
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct RocksDBWriteCtx {
|
||||
@@ -68,6 +86,9 @@ pub(crate) struct RocksDBWriteCtx {
|
||||
pub storage_settings: StorageSettings,
|
||||
/// Pending batches to push to after writing.
|
||||
pub pending_batches: PendingRocksDBBatches,
|
||||
/// Pending history writes accumulated across `save_blocks()` calls.
|
||||
/// Materialized into `RocksDB` shards at commit time.
|
||||
pub pending_history: PendingHistory,
|
||||
}
|
||||
|
||||
impl fmt::Debug for RocksDBWriteCtx {
|
||||
@@ -77,6 +98,7 @@ impl fmt::Debug for RocksDBWriteCtx {
|
||||
.field("prune_tx_lookup", &self.prune_tx_lookup)
|
||||
.field("storage_settings", &self.storage_settings)
|
||||
.field("pending_batches", &"<pending batches>")
|
||||
.field("pending_history", &"<pending history>")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -594,7 +616,7 @@ impl RocksDBProvider {
|
||||
let write_options = WriteOptions::default();
|
||||
let txn_options = OptimisticTransactionOptions::default();
|
||||
let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
|
||||
RocksTx { inner, provider: self }
|
||||
RocksTx { inner, provider: self, assume_history_complete: false }
|
||||
}
|
||||
|
||||
/// Creates a new batch for atomic writes.
|
||||
@@ -1057,56 +1079,109 @@ impl RocksDBProvider {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes account history indices for the given blocks.
|
||||
/// Accumulates account history indices for the given blocks.
|
||||
///
|
||||
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
|
||||
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
|
||||
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
|
||||
///
|
||||
/// Only accounts with info changes (nonce, balance, `code_hash`) or lifecycle changes
|
||||
/// (created/destroyed) are included, matching MDBX `AccountChangeSets` behavior.
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
|
||||
fn write_account_history<N: reth_node_types::NodePrimitives>(
|
||||
&self,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
ctx: &RocksDBWriteCtx,
|
||||
) -> ProviderResult<()> {
|
||||
let mut batch = self.batch();
|
||||
let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
|
||||
for (block_idx, block) in blocks.iter().enumerate() {
|
||||
let block_number = ctx.first_block_number + block_idx as u64;
|
||||
let mut local: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
|
||||
for block in blocks {
|
||||
let block_number = block.recovered_block().number();
|
||||
let bundle = &block.execution_outcome().state;
|
||||
for &address in bundle.state().keys() {
|
||||
account_history.entry(address).or_default().push(block_number);
|
||||
for (&address, account) in bundle.state() {
|
||||
let info_changed = account.is_info_changed();
|
||||
let was_destroyed = account.was_destroyed();
|
||||
|
||||
if info_changed || was_destroyed {
|
||||
local.entry(address).or_default().push(block_number);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write account history using proper shard append logic
|
||||
for (address, indices) in account_history {
|
||||
batch.append_account_history_shard(address, indices)?;
|
||||
let mut pending = ctx.pending_history.lock();
|
||||
for (address, mut indices) in local {
|
||||
pending.accounts.entry(address).or_default().append(&mut indices);
|
||||
}
|
||||
ctx.pending_batches.lock().push(batch.into_inner());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes storage history indices for the given blocks.
|
||||
/// Accumulates storage history indices for the given blocks.
|
||||
///
|
||||
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
|
||||
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
|
||||
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
|
||||
///
|
||||
/// Only storage slots with actual value changes are included, matching MDBX behavior.
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
|
||||
fn write_storage_history<N: reth_node_types::NodePrimitives>(
|
||||
&self,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
ctx: &RocksDBWriteCtx,
|
||||
) -> ProviderResult<()> {
|
||||
let mut batch = self.batch();
|
||||
let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
|
||||
for (block_idx, block) in blocks.iter().enumerate() {
|
||||
let block_number = ctx.first_block_number + block_idx as u64;
|
||||
let mut local: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
|
||||
for block in blocks {
|
||||
let block_number = block.recovered_block().number();
|
||||
let bundle = &block.execution_outcome().state;
|
||||
for (&address, account) in bundle.state() {
|
||||
for &slot in account.storage.keys() {
|
||||
let key = B256::new(slot.to_be_bytes());
|
||||
storage_history.entry((address, key)).or_default().push(block_number);
|
||||
for (&slot, storage_slot) in &account.storage {
|
||||
if storage_slot.is_changed() {
|
||||
let key = B256::new(slot.to_be_bytes());
|
||||
local.entry((address, key)).or_default().push(block_number);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write storage history using proper shard append logic
|
||||
for ((address, slot), indices) in storage_history {
|
||||
let mut pending = ctx.pending_history.lock();
|
||||
for ((address, slot), mut indices) in local {
|
||||
pending.storages.entry((address, slot)).or_default().append(&mut indices);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Materializes pending history writes into `RocksDB` shards and commits them.
|
||||
///
|
||||
/// This is called at provider commit time, AFTER MDBX has committed. Each key is
|
||||
/// written exactly once, ensuring [`RocksDBBatch::append_account_history_shard`] and
|
||||
/// [`RocksDBBatch::append_storage_history_shard`] see committed state.
|
||||
pub(crate) fn commit_pending_history(
|
||||
&self,
|
||||
history: PendingHistoryWrites,
|
||||
) -> ProviderResult<()> {
|
||||
if history.accounts.is_empty() && history.storages.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut batch = self.batch();
|
||||
|
||||
// Write account history shards
|
||||
for (address, mut indices) in history.accounts {
|
||||
// Sort and dedup in case of duplicate block numbers from multiple save_blocks calls
|
||||
indices.sort_unstable();
|
||||
indices.dedup();
|
||||
batch.append_account_history_shard(address, indices)?;
|
||||
}
|
||||
|
||||
// Write storage history shards
|
||||
for ((address, slot), mut indices) in history.storages {
|
||||
indices.sort_unstable();
|
||||
indices.dedup();
|
||||
batch.append_storage_history_shard(address, slot, indices)?;
|
||||
}
|
||||
ctx.pending_batches.lock().push(batch.into_inner());
|
||||
|
||||
if !batch.is_empty() {
|
||||
batch.commit()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1521,10 +1596,14 @@ impl<'a> RocksDBBatch<'a> {
|
||||
/// - Iteration over uncommitted data
|
||||
///
|
||||
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
|
||||
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
|
||||
/// `DbTx`/`DbTxMut` traits directly; use `RocksDB`-specific methods instead.
|
||||
pub struct RocksTx<'db> {
|
||||
inner: Transaction<'db, OptimisticTransactionDB>,
|
||||
provider: &'db RocksDBProvider,
|
||||
/// When true, assume `RocksDB` has complete history (like MDBX) and return `NotYetWritten`
|
||||
/// when querying before the first history entry. When false (default), return
|
||||
/// `MaybeInPlainState` for hybrid storage safety.
|
||||
assume_history_complete: bool,
|
||||
}
|
||||
|
||||
impl fmt::Debug for RocksTx<'_> {
|
||||
@@ -1534,6 +1613,20 @@ impl fmt::Debug for RocksTx<'_> {
|
||||
}
|
||||
|
||||
impl<'db> RocksTx<'db> {
|
||||
/// Sets the `assume_history_complete` flag to true.
|
||||
///
|
||||
/// When enabled, history queries will return `NotYetWritten` (like MDBX) instead of
|
||||
/// `MaybeInPlainState` when querying before the first history entry.
|
||||
///
|
||||
/// WARNING: Only use in tests where `RocksDB` has complete history (identical to MDBX).
|
||||
/// In production, `RocksDB` may have partial history, so this flag would cause incorrect
|
||||
/// behavior (treating accounts as non-existent when they exist in MDBX).
|
||||
#[cfg(test)]
|
||||
pub const fn with_assume_history_complete(mut self) -> Self {
|
||||
self.assume_history_complete = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Gets a value from the specified table. Sees uncommitted writes in this transaction.
|
||||
pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
|
||||
let encoded_key = key.encode();
|
||||
@@ -1700,13 +1793,19 @@ impl<'db> RocksTx<'db> {
|
||||
where
|
||||
T: Table<Value = BlockNumberList>,
|
||||
{
|
||||
// History may be pruned if a lowest available block is set.
|
||||
let is_maybe_pruned = lowest_available_block_number.is_some();
|
||||
// When RocksDB can't find history for an address, we must fall back to plain state.
|
||||
//
|
||||
// This is critical for hybrid storage: RocksDB only contains history indices for blocks
|
||||
// written AFTER it was enabled. Accounts modified before that exist only in MDBX.
|
||||
// Returning `NotYetWritten` would incorrectly treat them as non-existent (nonce 0).
|
||||
//
|
||||
// When `assume_history_complete` is true (for tests with identical data), return
|
||||
// `NotYetWritten` to match MDBX semantics.
|
||||
let fallback = || {
|
||||
Ok(if is_maybe_pruned {
|
||||
HistoryInfo::MaybeInPlainState
|
||||
} else {
|
||||
Ok(if self.assume_history_complete {
|
||||
HistoryInfo::NotYetWritten
|
||||
} else {
|
||||
HistoryInfo::MaybeInPlainState
|
||||
})
|
||||
};
|
||||
|
||||
@@ -1757,11 +1856,27 @@ impl<'db> RocksTx<'db> {
|
||||
false
|
||||
};
|
||||
|
||||
Ok(HistoryInfo::from_lookup(
|
||||
// For RocksDB, we must handle `is_before_first_write` specially.
|
||||
//
|
||||
// When RocksDB says the target block is before the first recorded write:
|
||||
// - MDBX would return `NotYetWritten` (account doesn't exist) because MDBX has full history
|
||||
// - RocksDB should return `MaybeInPlainState` because the account may exist in MDBX from
|
||||
// before RocksDB was enabled
|
||||
//
|
||||
// We use `from_lookup` but then override `NotYetWritten` -> `MaybeInPlainState`
|
||||
// unless `assume_history_complete` is true (for tests with identical data).
|
||||
let result = HistoryInfo::from_lookup(
|
||||
found_block,
|
||||
is_before_first_write,
|
||||
lowest_available_block_number,
|
||||
))
|
||||
);
|
||||
|
||||
Ok(match result {
|
||||
HistoryInfo::NotYetWritten if !self.assume_history_complete => {
|
||||
HistoryInfo::MaybeInPlainState
|
||||
}
|
||||
other => other,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns an error if the raw iterator is in an invalid state due to an I/O error.
|
||||
@@ -2257,7 +2372,7 @@ mod tests {
|
||||
|
||||
/// Tests the edge case where block < `lowest_available_block_number`.
|
||||
/// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
|
||||
/// so we keep this RocksDB-specific test to verify the low-level behavior.
|
||||
/// so we keep this `RocksDB`-specific test to verify the low-level behavior.
|
||||
#[test]
|
||||
fn test_account_history_info_pruned_before_first_entry() {
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
|
||||
@@ -32,6 +32,13 @@ pub struct RocksDBTableStats {
|
||||
pub pending_compaction_bytes: u64,
|
||||
}
|
||||
|
||||
/// Pending history writes (stub - empty struct).
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct PendingHistoryWrites;
|
||||
|
||||
/// Pending history writes type alias (stub).
|
||||
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
|
||||
|
||||
/// Context for `RocksDB` block writes (stub).
|
||||
#[derive(Debug, Clone)]
|
||||
#[allow(dead_code)]
|
||||
@@ -44,6 +51,8 @@ pub(crate) struct RocksDBWriteCtx {
|
||||
pub storage_settings: StorageSettings,
|
||||
/// Pending batches (stub - unused).
|
||||
pub pending_batches: PendingRocksDBBatches,
|
||||
/// Pending history writes (stub - unused).
|
||||
pub pending_history: PendingHistory,
|
||||
}
|
||||
|
||||
/// A stub `RocksDB` provider.
|
||||
|
||||
Reference in New Issue
Block a user