Compare commits

...

22 Commits

Author SHA1 Message Date
klkvr
dcefa431a1 wip 2026-04-23 21:41:24 +04:00
Brian Picciano
63f3b1dc97 test(stages): drop reader before execution unwind
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db9e5-51dc-763e-b1ec-4e09fbc9e8e8
Co-authored-by: Amp <amp@ampcode.com>
2026-04-23 12:50:04 +00:00
Brian Picciano
17d58a73d5 fix(provider): wait for readers before unwind static-file commit
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db9e5-51dc-763e-b1ec-4e09fbc9e8e8
Co-authored-by: Amp <amp@ampcode.com>
2026-04-23 12:26:28 +00:00
Brian Picciano
2bb7e7ff15 fix(engine): address payload validator CI failures
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db5d3-fb9b-7309-b8e9-dfc2ca6290a7
Co-authored-by: Amp <amp@ampcode.com>
2026-04-22 15:39:22 +00:00
Brian Picciano
ecf87ffac1 Merge remote-tracking branch 'origin/main' into mediocregopher/historical-state-provider-changeset-cache 2026-04-22 14:01:05 +00:00
Brian Picciano
ef63a7a09d fix(provider): avoid extra clones in historical proofs
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db4f1-b5cb-75ee-b759-0c6badfe0ff6
Co-authored-by: Amp <amp@ampcode.com>
2026-04-22 13:09:38 +00:00
Brian Picciano
7fd39e6ac2 refactor(provider): rename historical changeset cache field
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db4f1-b5cb-75ee-b759-0c6badfe0ff6
Co-authored-by: Amp <amp@ampcode.com>
2026-04-22 13:06:59 +00:00
Brian Picciano
4b9af81f3f refactor(provider): restore overlay comments
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db4f1-b5cb-75ee-b759-0c6badfe0ff6
Co-authored-by: Amp <amp@ampcode.com>
2026-04-22 13:04:29 +00:00
Brian Picciano
93cb59ef7a refactor(engine): inline serial provider builder reuse
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db4f1-b5cb-75ee-b759-0c6badfe0ff6
Co-authored-by: Amp <amp@ampcode.com>
2026-04-22 11:38:08 +00:00
Brian Picciano
5357dc9fea fix(engine): use state provider for serial state root fallback
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db4f1-b5cb-75ee-b759-0c6badfe0ff6
Co-authored-by: Amp <amp@ampcode.com>
2026-04-22 11:31:54 +00:00
Brian Picciano
5561eb0c17 fix(provider): avoid db-tip lookup for unanchored overlays
Co-Authored-By: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
2026-04-21 16:22:34 +00:00
Brian Picciano
79fc5c5d7c fix(engine): remove redundant overlay-builder clones
Co-Authored-By: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
2026-04-21 16:11:16 +00:00
Brian Picciano
93b0568729 refactor(provider): use build_overlay in historical trie paths
Co-Authored-By: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
2026-04-21 15:56:01 +00:00
Brian Picciano
56112da7a7 refactor(provider): warn on old historical overlay reverts
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db0ad-77b5-76ba-8b0b-6683a56f63c5
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 15:46:51 +00:00
Brian Picciano
2fde20fe2a refactor(provider): preserve historical overlay prefix sets
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db0ad-77b5-76ba-8b0b-6683a56f63c5
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 15:44:44 +00:00
Brian Picciano
73f8609233 refactor(provider): add historical overlay helper
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db0ad-77b5-76ba-8b0b-6683a56f63c5
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 15:41:07 +00:00
Brian Picciano
52eec466d7 refactor(provider): keep overlay caching in factory
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db074-fac6-76d2-8950-3e84b3c895a4
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 15:27:43 +00:00
Brian Picciano
f652994b35 refactor(provider): extract overlay builder
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db074-fac6-76d2-8950-3e84b3c895a4
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 15:19:18 +00:00
Brian Picciano
5bd1a9932f fix(provider): restore minimal historical cache plumbing
Restore the historical provider behavior to match origin/main while keeping the ChangesetCache constructor plumbing and shared reorg-validator cache wiring.

Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db05b-f81c-754e-9229-81b57d6d32f5
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 14:22:06 +00:00
Brian Picciano
58d2a3e815 refactor(engine): simplify state provider builder cache wiring
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db037-4b3b-77c1-9832-42d24c58560e
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 14:00:29 +00:00
Brian Picciano
31c0f96feb test(node): tighten ethereum e2e helper provider bounds
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db037-4b3b-77c1-9832-42d24c58560e
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 13:39:01 +00:00
Brian Picciano
c11348fb8c fix(provider): require changeset cache for historical state providers
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db016-ea05-74e4-a085-27f4173a1f9f
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 13:13:46 +00:00
11 changed files with 458 additions and 326 deletions

View File

@@ -974,7 +974,7 @@ mod tests {
use reth_evm_ethereum::EthEvmConfig;
use reth_primitives_traits::{Account, Recovered, StorageEntry};
use reth_provider::{
providers::{BlockchainProvider, OverlayStateProviderFactory},
providers::{BlockchainProvider, OverlayBuilder, OverlayStateProviderFactory},
test_utils::create_test_provider_factory_with_chain_spec,
ChainSpecProvider, HashingWriter,
};
@@ -1249,7 +1249,10 @@ mod tests {
std::convert::identity,
),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
OverlayStateProviderFactory::new(
provider_factory,
OverlayBuilder::new(ChangesetCache::new()),
),
&TreeConfig::default(),
None, // No BAL for test
);

View File

@@ -894,7 +894,8 @@ mod tests {
use super::*;
use alloy_primitives::{keccak256, Address, B256, U256};
use reth_provider::{
providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
providers::{OverlayBuilder, OverlayStateProviderFactory},
test_utils::create_test_provider_factory,
};
use reth_trie_db::ChangesetCache;
use reth_trie_parallel::proof_task::ProofTaskCtx;
@@ -983,8 +984,10 @@ mod tests {
fn run_returns_parent_root_without_revealing_blind_trie_when_no_state_updates() {
let runtime = reth_tasks::Runtime::test();
let provider_factory = create_test_provider_factory();
let overlay_factory =
OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new());
let overlay_factory = OverlayStateProviderFactory::new(
provider_factory,
OverlayBuilder::new(ChangesetCache::new()),
);
let proof_worker_handle =
ProofWorkerHandle::new(&runtime, ProofTaskCtx::new(overlay_factory), false);

View File

@@ -77,10 +77,11 @@ use reth_primitives_traits::{
RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable,
};
use reth_provider::{
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, HashedPostStateProvider,
ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
StateProviderFactory, StateReader, StorageChangeSetReader, StorageSettingsCache,
providers::{OverlayBuilder, OverlayStateProviderFactory},
BlockExecutionOutput, BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory,
DatabaseProviderROFactory, HashedPostStateProvider, ProviderError, PruneCheckpointReader,
StageCheckpointReader, StateProvider, StateProviderBox, StateProviderFactory, StateReader,
StorageChangeSetReader, StorageSettingsCache,
};
use reth_revm::db::{states::bundle_state::BundleRetention, BundleAccount, State};
use reth_trie::{trie_cursor::TrieCursorFactory, updates::TrieUpdates, HashedPostState, StateRoot};
@@ -523,16 +524,18 @@ where
// Create overlay factory for payload processor (StateRootTask path needs it for
// multiproofs)
let provider_factory = self.provider.clone();
let overlay_builder = OverlayBuilder::new(self.changeset_cache.clone())
.with_block_hash(Some(anchor_hash))
.with_lazy_overlay(lazy_overlay);
let overlay_factory =
OverlayStateProviderFactory::new(self.provider.clone(), self.changeset_cache.clone())
.with_block_hash(Some(anchor_hash))
.with_lazy_overlay(lazy_overlay);
OverlayStateProviderFactory::new(provider_factory.clone(), overlay_builder.clone());
// Spawn the appropriate processor based on strategy
let mut handle = ensure_ok!(self.spawn_payload_processor(
env.clone(),
txs,
provider_builder,
provider_builder.clone(),
overlay_factory.clone(),
strategy,
block_access_list,
@@ -664,7 +667,7 @@ where
let task_result = ensure_ok_post_block!(
self.await_state_root_with_timeout(
&mut handle,
overlay_factory.clone(),
provider_builder.clone(),
&hashed_state,
),
block
@@ -685,22 +688,6 @@ where
trie_debug_recorders = debug_recorders;
}
// Compare trie updates with serial computation if configured
if self.config.always_compare_trie_updates() {
let _has_diff = self.compare_trie_updates_with_serial(
overlay_factory.clone(),
&hashed_state,
trie_updates.as_ref().clone(),
);
#[cfg(feature = "trie-debug")]
if _has_diff {
Self::write_trie_debug_recorders(
block.header().number(),
&trie_debug_recorders,
);
}
}
// we double check the state root here for good measure
if state_root == block.header().state_root() {
maybe_state_root = Some((state_root, trie_updates, elapsed))
@@ -727,7 +714,11 @@ where
}
StateRootStrategy::Parallel => {
debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm");
match self.compute_state_root_parallel(overlay_factory.clone(), &hashed_state) {
match self.compute_state_root_parallel(
provider_factory.clone(),
overlay_builder.clone(),
&hashed_state,
) {
Ok(result) => {
let elapsed = root_time.elapsed();
info!(
@@ -763,10 +754,20 @@ where
}
let (root, updates) = ensure_ok_post_block!(
Self::compute_state_root_serial(overlay_factory.clone(), &hashed_state),
Self::compute_state_root_serial_with_provider(
provider_builder.clone(),
&hashed_state
),
block
);
self.compare_trie_updates_with_serial(
provider_factory.clone(),
overlay_builder.clone(),
&hashed_state,
updates.clone(),
);
if state_root_task_failed {
self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
}
@@ -1087,7 +1088,8 @@ where
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
fn compute_state_root_parallel(
&self,
overlay_factory: OverlayStateProviderFactory<P>,
provider_factory: P,
overlay_builder: OverlayBuilder,
hashed_state: &LazyHashedPostState,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let hashed_state = hashed_state.get();
@@ -1095,34 +1097,32 @@ where
// need to use the prefix sets which were generated from it to indicate to the
// ParallelStateRoot which parts of the trie need to be recomputed.
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let overlay_factory =
overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
let overlay_factory = OverlayStateProviderFactory::new(
provider_factory,
overlay_builder.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted()),
);
ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
.incremental_root_with_updates()
}
/// Compute state root for the given hashed post state in serial.
///
/// Uses an overlay factory which provides the state of the parent block, along with the
/// [`HashedPostState`] containing the changes of this block, to compute the state root and
/// trie updates for this block.
/// Uses the same provider construction path as main execution and computes the state root and
/// trie updates for this block directly via
/// [`reth_provider::StateRootProvider::state_root_with_updates`].
fn compute_state_root_serial(
overlay_factory: OverlayStateProviderFactory<P>,
state_provider: StateProviderBox,
hashed_state: &LazyHashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
let hashed_state = hashed_state.get();
// The `hashed_state` argument will be taken into account as part of the overlay, but we
// need to use the prefix sets which were generated from it to indicate to the
// StateRoot which parts of the trie need to be recomputed.
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let overlay_factory =
overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
state_provider.state_root_with_updates(hashed_state.get().clone())
}
let provider = overlay_factory.database_provider_ro()?;
Ok(StateRoot::new(&provider, &provider)
.with_prefix_sets(prefix_sets)
.root_with_updates()?)
fn compute_state_root_serial_with_provider(
provider_builder: StateProviderBuilder<N, P>,
hashed_state: &LazyHashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
let provider = provider_builder.build()?;
provider.state_root_with_updates(hashed_state.get().clone())
}
/// Awaits the state root from the background task, with an optional timeout fallback.
@@ -1147,7 +1147,7 @@ where
fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
&self,
handle: &mut PayloadHandle<Tx, Err, R>,
overlay_factory: OverlayStateProviderFactory<P>,
state_provider_builder: StateProviderBuilder<N, P>,
hashed_state: &LazyHashedPostState,
) -> ProviderResult<Result<StateRootComputeOutcome, ParallelStateRootError>> {
let Some(timeout) = self.config.state_root_task_timeout() else {
@@ -1172,10 +1172,11 @@ where
let (seq_tx, seq_rx) =
std::sync::mpsc::channel::<ProviderResult<(B256, TrieUpdates)>>();
let seq_overlay = overlay_factory;
let seq_hashed_state = hashed_state.clone();
self.payload_processor.executor().spawn_blocking_named("serial-root", move || {
let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state);
let result = state_provider_builder.build().and_then(|provider| {
Self::compute_state_root_serial(provider, &seq_hashed_state)
});
let _ = seq_tx.send(result);
});
@@ -1239,54 +1240,48 @@ where
/// updates.
fn compare_trie_updates_with_serial(
&self,
overlay_factory: OverlayStateProviderFactory<P>,
provider_factory: P,
overlay_builder: OverlayBuilder,
hashed_state: &LazyHashedPostState,
task_trie_updates: TrieUpdates,
) -> bool {
debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
match Self::compute_state_root_serial(overlay_factory.clone(), hashed_state) {
Ok((serial_root, serial_trie_updates)) => {
debug!(
target: "engine::tree::payload_validator",
?serial_root,
"Serial state root computation finished for comparison"
);
let hashed_state = hashed_state.get();
// The `hashed_state` argument will be taken into account as part of the overlay, but we
// need to use the prefix sets which were generated from it to indicate to the
// StateRoot which parts of the trie need to be recomputed.
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let overlay_builder =
overlay_builder.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
let overlay_factory = OverlayStateProviderFactory::new(provider_factory, overlay_builder);
// Get a database provider to use as trie cursor factory
match overlay_factory.database_provider_ro() {
Ok(provider) => {
match super::trie_updates::compare_trie_updates(
&provider,
task_trie_updates,
serial_trie_updates,
) {
Ok(has_diff) => return has_diff,
Err(err) => {
warn!(
target: "engine::tree::payload_validator",
%err,
"Error comparing trie updates"
);
return true;
}
}
}
Err(err) => {
warn!(
target: "engine::tree::payload_validator",
%err,
"Failed to get database provider for trie update comparison"
);
}
}
}
let provider = overlay_factory.database_provider_ro().unwrap();
let (serial_root, serial_trie_updates) = StateRoot::new(&provider, &provider)
.with_prefix_sets(prefix_sets)
.root_with_updates()
.unwrap();
debug!(
target: "engine::tree::payload_validator",
?serial_root,
"Serial state root computation finished for comparison"
);
match super::trie_updates::compare_trie_updates(
&provider,
task_trie_updates,
serial_trie_updates,
) {
Ok(has_diff) => return has_diff,
Err(err) => {
warn!(
target: "engine::tree::payload_validator",
%err,
"Failed to compute serial state root for comparison"
"Error comparing trie updates"
);
return true;
}
}
false
@@ -2026,10 +2021,12 @@ where
state: &EngineApiTreeState<N>,
) -> Option<StateRootHandle> {
let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, state);
let overlay_factory =
OverlayStateProviderFactory::new(self.provider.clone(), self.changeset_cache.clone())
let overlay_factory = OverlayStateProviderFactory::new(
self.provider.clone(),
OverlayBuilder::new(self.changeset_cache.clone())
.with_block_hash(Some(anchor_hash))
.with_lazy_overlay(lazy_overlay);
.with_lazy_overlay(lazy_overlay),
);
Some(self.payload_processor.spawn_state_root(
overlay_factory,

View File

@@ -205,10 +205,12 @@ impl EngineNodeLauncher {
ctx.blockchain_db().clone(),
ctx.components().evm_config().clone(),
|| async {
// Create a separate cache for reorg validator (not shared with main engine)
let reorg_cache = ChangesetCache::new();
validator_builder
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
.build_tree_validator(
&add_ons_ctx,
engine_tree_config.clone(),
changeset_cache.clone(),
)
.await
},
node_config.debug.reorg_frequency,

View File

@@ -1020,41 +1020,46 @@ mod tests {
done: true
} if processed == total && total == block.gas_used);
let provider = factory.provider().unwrap();
{
let provider = factory.provider().unwrap();
// check post state
let account1 = address!("0x1000000000000000000000000000000000000000");
let account1_info =
Account { balance: U256::ZERO, nonce: 0x00, bytecode_hash: Some(code_hash) };
let account2 = address!("0x2adc25665018aa1fe0e6bc666dac8fc2697ff9ba");
let account2_info = Account {
balance: U256::from(0x1bc16d674ece94bau128),
nonce: 0x00,
bytecode_hash: None,
};
let account3 = address!("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b");
let account3_info = Account {
balance: U256::from(0x3635c9adc5de996b46u128),
nonce: 0x01,
bytecode_hash: None,
};
// check post state
let account1 = address!("0x1000000000000000000000000000000000000000");
let account1_info =
Account { balance: U256::ZERO, nonce: 0x00, bytecode_hash: Some(code_hash) };
let account2 = address!("0x2adc25665018aa1fe0e6bc666dac8fc2697ff9ba");
let account2_info = Account {
balance: U256::from(0x1bc16d674ece94bau128),
nonce: 0x00,
bytecode_hash: None,
};
let account3 = address!("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b");
let account3_info = Account {
balance: U256::from(0x3635c9adc5de996b46u128),
nonce: 0x01,
bytecode_hash: None,
};
// assert accounts
assert!(
matches!(provider.basic_account(&account1), Ok(Some(acc)) if acc == account1_info)
);
assert!(
matches!(provider.basic_account(&account2), Ok(Some(acc)) if acc == account2_info)
);
assert!(
matches!(provider.basic_account(&account3), Ok(Some(acc)) if acc == account3_info)
);
// assert storage
// Get on dupsort would return only first value. This is good enough for this test.
assert!(matches!(
provider.tx_ref().get::<tables::PlainStorageState>(account1),
Ok(Some(entry)) if entry.key == B256::with_last_byte(1) && entry.value == U256::from(2)
));
// assert accounts
assert!(matches!(
provider.basic_account(&account1),
Ok(Some(acc)) if acc == account1_info
));
assert!(matches!(
provider.basic_account(&account2),
Ok(Some(acc)) if acc == account2_info
));
assert!(matches!(
provider.basic_account(&account3),
Ok(Some(acc)) if acc == account3_info
));
// assert storage
// Get on dupsort would return only first value. This is good enough for this test.
assert!(matches!(
provider.tx_ref().get::<tables::PlainStorageState>(account1),
Ok(Some(entry)) if entry.key == B256::with_last_byte(1) && entry.value == U256::from(2)
));
}
let mut provider = factory.database_provider_rw().unwrap();
let mut stage = stage();

View File

@@ -264,8 +264,8 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
/// This keeps MDBX as the first durable step so an interrupted unwind can be recovered by
/// truncating static files from checkpoints on the next startup.
///
/// For `storage_v2`, this waits after the MDBX commit so readers holding older MDBX-visible
/// views cannot overlap the `RocksDB` unwind.
/// This waits after the MDBX commit so readers holding older MDBX-visible views cannot overlap
/// later cross-store unwind steps.
///
/// Historical `storage_v2` reads ignore `RocksDB` history entries above their MDBX-visible tip,
/// so no additional post-`RocksDB` wait is needed before static-file commit.
@@ -274,11 +274,11 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
let reader_txn_tracker = self.reader_txn_tracker.clone();
self.tx.commit()?;
if storage_v2 {
if let Some(reader_txn_tracker) = reader_txn_tracker.as_ref() {
reader_txn_tracker.wait_for_pre_commit_readers();
}
if let Some(reader_txn_tracker) = reader_txn_tracker.as_ref() {
reader_txn_tracker.wait_for_pre_commit_readers();
}
if storage_v2 {
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
@@ -316,8 +316,8 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
let storage_history_prune_checkpoint =
self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
let mut state_provider =
HistoricalStateProviderRef::new(self, block_number, self.changeset_cache.clone());
// If we pruned account or storage history, we can't return state on every historical block.
// Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
if let Some(prune_checkpoint_block_number) =
@@ -933,8 +933,9 @@ impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for Databa
self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
let storage_history_prune_checkpoint =
self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
let changeset_cache = self.changeset_cache.clone();
let mut state_provider = HistoricalStateProvider::new(self, block_number);
let mut state_provider = HistoricalStateProvider::new(self, block_number, changeset_cache);
// If we pruned account or storage history, we can't return state on every historical block.
// Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
@@ -3960,7 +3961,6 @@ mod tests {
#[test]
fn unwind_commit_waits_for_pre_commit_readers() {
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(StorageSettings::v2());
let reader = factory.provider().unwrap();
let provider_rw = factory.unwind_provider_rw().unwrap();
@@ -4970,7 +4970,9 @@ mod tests {
assert_eq!(account_cs[0].address, address);
let historical_value =
HistoricalStateProviderRef::new(&*provider_rw, 0).storage(address, slot_key).unwrap();
HistoricalStateProviderRef::new(&*provider_rw, 0, ChangesetCache::new())
.storage(address, slot_key)
.unwrap();
assert_eq!(historical_value, None);
}

View File

@@ -20,7 +20,7 @@ pub use state::{
HistoricalStateProviderRef, HistoryInfo, LowestAvailableBlocks,
},
latest::{LatestStateProvider, LatestStateProviderRef},
overlay::{OverlayStateProvider, OverlayStateProviderFactory},
overlay::{OverlayBuilder, OverlayStateProvider, OverlayStateProviderFactory},
};
mod consistent_view;

View File

@@ -1,3 +1,4 @@
use super::overlay::{Overlay, OverlayBuilder, OverlaySource};
use crate::{
AccountReader, BlockHashReader, ChangeSetReader, EitherReader, HashedPostStateProvider,
ProviderError, RocksDBProviderFactory, StateProvider, StateRootProvider,
@@ -13,8 +14,9 @@ use reth_db_api::{
};
use reth_primitives_traits::{Account, Bytecode};
use reth_storage_api::{
BlockNumReader, BytecodeReader, DBProvider, NodePrimitivesProvider, StateProofProvider,
StorageChangeSetReader, StorageRootProvider, StorageSettingsCache,
BlockNumReader, BytecodeReader, DBProvider, NodePrimitivesProvider, PruneCheckpointReader,
StageCheckpointReader, StateProofProvider, StorageChangeSetReader, StorageRootProvider,
StorageSettingsCache,
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{
@@ -23,16 +25,15 @@ use reth_trie::{
trie_cursor::InMemoryTrieCursorFactory,
updates::TrieUpdates,
witness::TrieWitness,
AccountProof, ExecutionWitnessMode, HashedPostState, HashedPostStateSorted, HashedStorage,
KeccakKeyHasher, MultiProof, MultiProofTargets, StateRoot, StorageMultiProof, StorageRoot,
TrieInput, TrieInputSorted,
AccountProof, ExecutionWitnessMode, HashedPostState, HashedStorage, KeccakKeyHasher,
MultiProof, MultiProofTargets, StateRoot, StorageMultiProof, StorageRoot, TrieInput,
TrieInputSorted,
};
use reth_trie_db::{
hashed_storage_from_reverts_with_provider, DatabaseProof, DatabaseStateRoot,
DatabaseStorageProof, DatabaseStorageRoot,
ChangesetCache, DatabaseProof, DatabaseStateRoot, DatabaseStorageProof, DatabaseStorageRoot,
};
use std::fmt::Debug;
use std::{fmt::Debug, sync::Arc};
type DbStateRoot<'a, TX, A> = StateRoot<
reth_trie_db::DatabaseTrieCursorFactory<&'a TX, A>,
@@ -123,6 +124,8 @@ impl HistoryInfo {
pub struct HistoricalStateProviderRef<'b, Provider> {
/// Database provider
provider: &'b Provider,
/// Changeset cache handle for retrieving trie changesets.
changeset_cache: ChangesetCache,
/// Block number is main index for the history state of accounts and storages.
block_number: BlockNumber,
/// Lowest blocks at which different parts of the state are available.
@@ -133,8 +136,17 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block
HistoricalStateProviderRef<'b, Provider>
{
/// Create new `StateProvider` for historical block number
pub fn new(provider: &'b Provider, block_number: BlockNumber) -> Self {
Self { provider, block_number, lowest_available_blocks: Default::default() }
pub fn new(
provider: &'b Provider,
block_number: BlockNumber,
changeset_cache: ChangesetCache,
) -> Self {
Self {
provider,
changeset_cache,
block_number,
lowest_available_blocks: Default::default(),
}
}
/// Create new `StateProvider` for historical block number and lowest block numbers at which
@@ -143,8 +155,9 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block
provider: &'b Provider,
block_number: BlockNumber,
lowest_available_blocks: LowestAvailableBlocks,
changeset_cache: ChangesetCache,
) -> Self {
Self { provider, block_number, lowest_available_blocks }
Self { provider, changeset_cache, block_number, lowest_available_blocks }
}
/// Lookup an account in the `AccountsHistory` table using `EitherReader`.
@@ -253,17 +266,11 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block
Ok(tip.saturating_sub(self.block_number) > limit)
}
/// Retrieve revert hashed state for this history provider.
fn revert_state(&self) -> ProviderResult<HashedPostStateSorted>
fn build_overlay(&self, input: TrieInputSorted) -> ProviderResult<TrieInputSorted>
where
Provider: StorageSettingsCache,
Provider:
BlockHashReader + PruneCheckpointReader + StageCheckpointReader + StorageSettingsCache,
{
if !self.lowest_available_blocks.is_account_history_available(self.block_number) ||
!self.lowest_available_blocks.is_storage_history_available(self.block_number)
{
return Err(ProviderError::StateAtBlockPruned(self.block_number))
}
if self.check_distance_against_limit(EPOCH_SLOTS)? {
tracing::warn!(
target: "providers::historical_sp",
@@ -272,27 +279,22 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block
);
}
reth_trie_db::from_reverts_auto(self.provider, self.block_number..)
}
// Historical providers expose state at the start of `self.block_number`, so the overlay
// builder needs the previous canonical block hash to preserve those semantics.
let target_block = self.block_number.saturating_sub(1);
let block_hash = self
.provider
.block_hash(target_block)?
.ok_or_else(|| ProviderError::HeaderNotFound(target_block.into()))?;
/// Retrieve revert hashed storage for this history provider and target address.
fn revert_storage(&self, address: Address) -> ProviderResult<HashedStorage>
where
Provider: StorageSettingsCache,
{
if !self.lowest_available_blocks.is_storage_history_available(self.block_number) {
return Err(ProviderError::StateAtBlockPruned(self.block_number))
}
let TrieInputSorted { nodes, state, prefix_sets } = input;
let overlay_builder = OverlayBuilder::new(self.changeset_cache.clone())
.with_block_hash(Some(block_hash))
.with_overlay_source(Some(OverlaySource::Immediate { trie: nodes, state }));
let Overlay { trie_updates, hashed_post_state } =
overlay_builder.build_overlay(self.provider)?;
if self.check_distance_against_limit(EPOCH_SLOTS * 10)? {
tracing::warn!(
target: "providers::historical_sp",
target = self.block_number,
"Attempt to calculate storage root for an old block might result in OOM"
);
}
hashed_storage_from_reverts_with_provider(self.provider, address, self.block_number)
Ok(TrieInputSorted::new(trie_updates, hashed_post_state, prefix_sets))
}
/// Set the lowest block number at which the account history is available.
@@ -378,26 +380,25 @@ impl<
+ ChangeSetReader
+ StorageChangeSetReader
+ BlockNumReader
+ BlockHashReader
+ PruneCheckpointReader
+ StageCheckpointReader
+ StorageSettingsCache,
> StateRootProvider for HistoricalStateProviderRef<'_, Provider>
{
fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut revert_state = self.revert_state()?;
let hashed_state_sorted = hashed_state.into_sorted();
revert_state.extend_ref_and_sort(&hashed_state_sorted);
Ok(<DbStateRoot<'_, _, A>>::overlay_root(self.tx(), &revert_state)?)
let input = self.build_overlay(TrieInputSorted::from_unsorted(
TrieInput::from_state(hashed_state),
))?;
Ok(<DbStateRoot<'_, _, A>>::overlay_root_from_nodes(self.tx(), input)?)
})
}
fn state_root_from_nodes(&self, input: TrieInput) -> ProviderResult<B256> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut input = input;
input.prepend(self.revert_state()?.into());
Ok(<DbStateRoot<'_, _, A>>::overlay_root_from_nodes(
self.tx(),
TrieInputSorted::from_unsorted(input),
)?)
let input = self.build_overlay(TrieInputSorted::from_unsorted(input))?;
Ok(<DbStateRoot<'_, _, A>>::overlay_root_from_nodes(self.tx(), input)?)
})
}
@@ -406,10 +407,10 @@ impl<
hashed_state: HashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut revert_state = self.revert_state()?;
let hashed_state_sorted = hashed_state.into_sorted();
revert_state.extend_ref_and_sort(&hashed_state_sorted);
Ok(<DbStateRoot<'_, _, A>>::overlay_root_with_updates(self.tx(), &revert_state)?)
let input = self.build_overlay(TrieInputSorted::from_unsorted(
TrieInput::from_state(hashed_state),
))?;
Ok(<DbStateRoot<'_, _, A>>::overlay_root_from_nodes_with_updates(self.tx(), input)?)
})
}
@@ -418,12 +419,8 @@ impl<
input: TrieInput,
) -> ProviderResult<(B256, TrieUpdates)> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut input = input;
input.prepend(self.revert_state()?.into());
Ok(<DbStateRoot<'_, _, A>>::overlay_root_from_nodes_with_updates(
self.tx(),
TrieInputSorted::from_unsorted(input),
)?)
let input = self.build_overlay(TrieInputSorted::from_unsorted(input))?;
Ok(<DbStateRoot<'_, _, A>>::overlay_root_from_nodes_with_updates(self.tx(), input)?)
})
}
}
@@ -433,6 +430,9 @@ impl<
+ ChangeSetReader
+ StorageChangeSetReader
+ BlockNumReader
+ BlockHashReader
+ PruneCheckpointReader
+ StageCheckpointReader
+ StorageSettingsCache,
> StorageRootProvider for HistoricalStateProviderRef<'_, Provider>
{
@@ -442,9 +442,20 @@ impl<
hashed_storage: HashedStorage,
) -> ProviderResult<B256> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut revert_storage = self.revert_storage(address)?;
revert_storage.extend(&hashed_storage);
<DbStorageRoot<'_, _, A>>::overlay_root(self.tx(), address, revert_storage)
let input = self.build_overlay(TrieInputSorted::from_unsorted(
TrieInput::from_state(HashedPostState::from_hashed_storage(
alloy_primitives::keccak256(address),
hashed_storage,
)),
))?;
let hashed_storage = input
.state
.account_storages()
.get(&alloy_primitives::keccak256(address))
.cloned()
.unwrap_or_default()
.into();
<DbStorageRoot<'_, _, A>>::overlay_root(self.tx(), address, hashed_storage)
.map_err(|err| ProviderError::Database(err.into()))
})
}
@@ -456,13 +467,24 @@ impl<
hashed_storage: HashedStorage,
) -> ProviderResult<reth_trie::StorageProof> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut revert_storage = self.revert_storage(address)?;
revert_storage.extend(&hashed_storage);
let input = self.build_overlay(TrieInputSorted::from_unsorted(
TrieInput::from_state(HashedPostState::from_hashed_storage(
alloy_primitives::keccak256(address),
hashed_storage,
)),
))?;
let hashed_storage = input
.state
.account_storages()
.get(&alloy_primitives::keccak256(address))
.cloned()
.unwrap_or_default()
.into();
<DbStorageProof<'_, _, A>>::overlay_storage_proof(
self.tx(),
address,
slot,
revert_storage,
hashed_storage,
)
.map_err(ProviderError::from)
})
@@ -475,13 +497,24 @@ impl<
hashed_storage: HashedStorage,
) -> ProviderResult<StorageMultiProof> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut revert_storage = self.revert_storage(address)?;
revert_storage.extend(&hashed_storage);
let input = self.build_overlay(TrieInputSorted::from_unsorted(
TrieInput::from_state(HashedPostState::from_hashed_storage(
alloy_primitives::keccak256(address),
hashed_storage,
)),
))?;
let hashed_storage = input
.state
.account_storages()
.get(&alloy_primitives::keccak256(address))
.cloned()
.unwrap_or_default()
.into();
<DbStorageProof<'_, _, A>>::overlay_storage_multiproof(
self.tx(),
address,
slots,
revert_storage,
hashed_storage,
)
.map_err(ProviderError::from)
})
@@ -493,6 +526,9 @@ impl<
+ ChangeSetReader
+ StorageChangeSetReader
+ BlockNumReader
+ BlockHashReader
+ PruneCheckpointReader
+ StageCheckpointReader
+ StorageSettingsCache,
> StateProofProvider for HistoricalStateProviderRef<'_, Provider>
{
@@ -504,8 +540,13 @@ impl<
slots: &[B256],
) -> ProviderResult<AccountProof> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut input = input;
input.prepend(self.revert_state()?.into());
let TrieInputSorted { nodes, state, prefix_sets } =
self.build_overlay(TrieInputSorted::from_unsorted(input))?;
let input = TrieInput::new(
Arc::unwrap_or_clone(nodes).into(),
Arc::unwrap_or_clone(state).into(),
prefix_sets,
);
let proof = <DbProof<'_, _, A> as DatabaseProof>::from_tx(self.tx());
proof.overlay_account_proof(input, address, slots).map_err(ProviderError::from)
})
@@ -517,8 +558,13 @@ impl<
targets: MultiProofTargets,
) -> ProviderResult<MultiProof> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut input = input;
input.prepend(self.revert_state()?.into());
let TrieInputSorted { nodes, state, prefix_sets } =
self.build_overlay(TrieInputSorted::from_unsorted(input))?;
let input = TrieInput::new(
Arc::unwrap_or_clone(nodes).into(),
Arc::unwrap_or_clone(state).into(),
prefix_sets,
);
let proof = <DbProof<'_, _, A> as DatabaseProof>::from_tx(self.tx());
proof.overlay_multiproof(input, targets).map_err(ProviderError::from)
})
@@ -531,21 +577,19 @@ impl<
mode: ExecutionWitnessMode,
) -> ProviderResult<Vec<Bytes>> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut input = input;
input.prepend(self.revert_state()?.into());
let nodes_sorted = input.nodes.into_sorted();
let state_sorted = input.state.into_sorted();
let TrieInputSorted { nodes, state, prefix_sets } =
self.build_overlay(TrieInputSorted::from_unsorted(input))?;
let witness = TrieWitness::new(
InMemoryTrieCursorFactory::new(
reth_trie_db::DatabaseTrieCursorFactory::<_, A>::new(self.tx()),
&nodes_sorted,
nodes.as_ref(),
),
HashedPostStateCursorFactory::new(
reth_trie_db::DatabaseHashedCursorFactory::new(self.tx()),
&state_sorted,
state.as_ref(),
),
)
.with_prefix_sets_mut(input.prefix_sets)
.with_prefix_sets_mut(prefix_sets)
.with_execution_witness_mode(mode);
let witness =
if mode.is_canonical() { witness } else { witness.always_include_root_node() };
@@ -572,6 +616,8 @@ impl<
+ BlockHashReader
+ ChangeSetReader
+ StorageChangeSetReader
+ PruneCheckpointReader
+ StageCheckpointReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
@@ -602,6 +648,8 @@ impl<Provider: DBProvider + BlockNumReader> BytecodeReader
pub struct HistoricalStateProvider<Provider> {
/// Database provider.
provider: Provider,
/// Changeset cache handle for retrieving trie changesets.
changeset_cache: ChangesetCache,
/// State at the block number is the main indexer of the state.
block_number: BlockNumber,
/// Lowest blocks at which different parts of the state are available.
@@ -612,8 +660,17 @@ impl<Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + BlockNumR
HistoricalStateProvider<Provider>
{
/// Create new `StateProvider` for historical block number
pub fn new(provider: Provider, block_number: BlockNumber) -> Self {
Self { provider, block_number, lowest_available_blocks: Default::default() }
pub fn new(
provider: Provider,
block_number: BlockNumber,
changeset_cache: ChangesetCache,
) -> Self {
Self {
provider,
changeset_cache,
block_number,
lowest_available_blocks: Default::default(),
}
}
/// Set the lowest block number at which the account history is available.
@@ -636,17 +693,18 @@ impl<Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + BlockNumR
/// Returns a new provider that takes the `TX` as reference
#[inline(always)]
const fn as_ref(&self) -> HistoricalStateProviderRef<'_, Provider> {
fn as_ref(&self) -> HistoricalStateProviderRef<'_, Provider> {
HistoricalStateProviderRef::new_with_lowest_available_blocks(
&self.provider,
self.block_number,
self.lowest_available_blocks,
self.changeset_cache.clone(),
)
}
}
// Delegates all provider impls to [HistoricalStateProviderRef]
reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader + StorageChangeSetReader + StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider]);
reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader + StorageChangeSetReader + PruneCheckpointReader + StageCheckpointReader + StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider]);
/// Lowest blocks at which different parts of the state are available.
/// They may be [Some] if pruning is enabled.
@@ -779,9 +837,11 @@ mod tests {
use reth_primitives_traits::{Account, StorageEntry};
use reth_storage_api::{
BlockHashReader, BlockNumReader, ChangeSetReader, DBProvider, DatabaseProviderFactory,
NodePrimitivesProvider, StorageChangeSetReader, StorageSettingsCache,
NodePrimitivesProvider, PruneCheckpointReader, StageCheckpointReader,
StorageChangeSetReader, StorageSettingsCache,
};
use reth_storage_errors::provider::ProviderError;
use reth_trie_db::ChangesetCache;
const ADDRESS: Address = address!("0x0000000000000000000000000000000000000001");
const HIGHER_ADDRESS: Address = address!("0x0000000000000000000000000000000000000005");
@@ -796,6 +856,8 @@ mod tests {
+ BlockHashReader
+ ChangeSetReader
+ StorageChangeSetReader
+ PruneCheckpointReader
+ StageCheckpointReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
@@ -870,48 +932,49 @@ mod tests {
// run
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 1, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 2).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 2, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at3
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 3).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 3, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at3
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 4).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 4, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at7
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 7).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 7, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at7
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 9).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 9, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at10
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 10).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 10, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at10
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 11).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 11, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at15
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 16).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 16, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_plain
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1).basic_account(&HIGHER_ADDRESS),
HistoricalStateProviderRef::new(&db, 1, ChangesetCache::new())
.basic_account(&HIGHER_ADDRESS),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1000).basic_account(&HIGHER_ADDRESS),
HistoricalStateProviderRef::new(&db, 1000, ChangesetCache::new()).basic_account(&HIGHER_ADDRESS),
Ok(Some(acc)) if acc == higher_acc_plain
));
}
@@ -970,43 +1033,46 @@ mod tests {
// run
assert!(matches!(
HistoricalStateProviderRef::new(&db, 0).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 0, ChangesetCache::new())
.storage(ADDRESS, STORAGE),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 3).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 3, ChangesetCache::new())
.storage(ADDRESS, STORAGE),
Ok(Some(U256::ZERO))
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 4).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 4, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at7.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 7).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 7, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at7.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 9).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 9, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at10.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 10).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 10, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at10.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 11).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 11, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at15.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 16).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 16, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_plain.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1).storage(HIGHER_ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 1, ChangesetCache::new())
.storage(HIGHER_ADDRESS, STORAGE),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1000).storage(HIGHER_ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 1000, ChangesetCache::new()).storage(HIGHER_ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == higher_entry_plain.value
));
}
@@ -1025,6 +1091,7 @@ mod tests {
account_history_block_number: Some(3),
storage_history_block_number: Some(3),
},
ChangesetCache::new(),
);
assert!(matches!(
provider.account_history_lookup(ADDRESS),
@@ -1044,6 +1111,7 @@ mod tests {
account_history_block_number: Some(2),
storage_history_block_number: Some(2),
},
ChangesetCache::new(),
);
assert!(matches!(
provider.account_history_lookup(ADDRESS),
@@ -1063,6 +1131,7 @@ mod tests {
account_history_block_number: Some(1),
storage_history_block_number: Some(1),
},
ChangesetCache::new(),
);
assert!(matches!(
provider.account_history_lookup(ADDRESS),
@@ -1143,43 +1212,46 @@ mod tests {
let db = factory.provider().unwrap();
assert!(matches!(
HistoricalStateProviderRef::new(&db, 0).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 0, ChangesetCache::new())
.storage(ADDRESS, STORAGE),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 3).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 3, ChangesetCache::new())
.storage(ADDRESS, STORAGE),
Ok(Some(U256::ZERO))
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 4).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 4, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at7.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 7).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 7, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at7.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 9).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 9, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at10.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 10).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 10, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at10.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 11).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 11, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at15.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 16).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 16, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_plain.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1).storage(HIGHER_ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 1, ChangesetCache::new())
.storage(HIGHER_ADDRESS, STORAGE),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1000).storage(HIGHER_ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 1000, ChangesetCache::new()).storage(HIGHER_ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == higher_entry_plain.value
));
}
@@ -1283,43 +1355,46 @@ mod tests {
let db = factory.provider().unwrap();
assert!(matches!(
HistoricalStateProviderRef::new(&db, 0).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 0, ChangesetCache::new())
.storage(ADDRESS, STORAGE),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 3).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 3, ChangesetCache::new())
.storage(ADDRESS, STORAGE),
Ok(Some(U256::ZERO))
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 4).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 4, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(7)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 7).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 7, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(7)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 9).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 9, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(10)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 10).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 10, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(10)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 11).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 11, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(15)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 16).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 16, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(100)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1).storage(HIGHER_ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 1, ChangesetCache::new())
.storage(HIGHER_ADDRESS, STORAGE),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1000).storage(HIGHER_ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 1000, ChangesetCache::new()).storage(HIGHER_ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(1000)
));
}

View File

@@ -51,9 +51,9 @@ pub(crate) struct OverlayStateProviderMetrics {
/// Contains all fields required to initialize an [`OverlayStateProvider`].
#[derive(Debug, Clone)]
struct Overlay {
trie_updates: Arc<TrieUpdatesSorted>,
hashed_post_state: Arc<HashedPostStateSorted>,
pub(super) struct Overlay {
pub(super) trie_updates: Arc<TrieUpdatesSorted>,
pub(super) hashed_post_state: Arc<HashedPostStateSorted>,
}
/// Source of overlay data for [`OverlayStateProviderFactory`].
@@ -85,14 +85,12 @@ impl OverlaySource {
}
}
/// Factory for creating overlay state providers with optional reverts and overlays.
/// Builder for calculating trie and hashed-state overlays.
///
/// This factory allows building an `OverlayStateProvider` whose DB state has been reverted to a
/// particular block, and/or with additional overlay information added on top.
/// This stores the overlay configuration and the logic for resolving immediate/lazy overlays and
/// collecting reverts. It is intentionally independent from any provider factory or overlay cache.
#[derive(Debug, Clone)]
pub struct OverlayStateProviderFactory<F> {
/// The underlying database provider factory
factory: F,
pub struct OverlayBuilder {
/// Optional block hash for collecting reverts
block_hash: Option<B256>,
/// Optional overlay source (lazy or immediate).
@@ -101,21 +99,16 @@ pub struct OverlayStateProviderFactory<F> {
changeset_cache: ChangesetCache,
/// Metrics for tracking provider operations
metrics: OverlayStateProviderMetrics,
/// A cache which maps `db_tip -> Overlay`. If the db tip changes during usage of the factory
/// then a new entry will get added to this, but in most cases only one entry is present.
overlay_cache: Arc<DashMap<BlockNumber, Overlay>>,
}
impl<F> OverlayStateProviderFactory<F> {
/// Create a new overlay state provider factory
pub fn new(factory: F, changeset_cache: ChangesetCache) -> Self {
impl OverlayBuilder {
/// Create a new overlay builder.
pub fn new(changeset_cache: ChangesetCache) -> Self {
Self {
factory,
block_hash: None,
overlay_source: None,
changeset_cache,
metrics: OverlayStateProviderMetrics::default(),
overlay_cache: Default::default(),
}
}
@@ -131,8 +124,6 @@ impl<F> OverlayStateProviderFactory<F> {
/// This overlay will be applied on top of any reverts applied via `with_block_hash`.
pub fn with_overlay_source(mut self, source: Option<OverlaySource>) -> Self {
self.overlay_source = source;
// Clear the overlay cache since we've updated the source.
self.overlay_cache = Default::default();
self
}
@@ -141,8 +132,6 @@ impl<F> OverlayStateProviderFactory<F> {
/// Convenience method that wraps the lazy overlay in `OverlaySource::Lazy`.
pub fn with_lazy_overlay(mut self, lazy_overlay: Option<LazyOverlay>) -> Self {
self.overlay_source = lazy_overlay.map(OverlaySource::Lazy);
// Clear the overlay cache since we've updated the source.
self.overlay_cache = Default::default();
self
}
@@ -158,8 +147,6 @@ impl<F> OverlayStateProviderFactory<F> {
trie: Arc::new(TrieUpdatesSorted::default()),
state,
});
// Clear the overlay cache since we've updated the source.
self.overlay_cache = Default::default();
}
self
}
@@ -186,23 +173,9 @@ impl<F> OverlayStateProviderFactory<F> {
});
}
}
// Clear the overlay cache since we've updated the source.
self.overlay_cache = Default::default();
self
}
}
impl<F> OverlayStateProviderFactory<F>
where
F: DatabaseProviderFactory,
F::Provider: StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ DBProvider
+ BlockNumReader
+ StorageSettingsCache,
{
/// Resolves the effective overlay (trie updates, hashed state).
///
/// If an overlay source is set, it is resolved (blocking if lazy).
@@ -217,10 +190,13 @@ where
}
/// Returns the block number for [`Self`]'s `block_hash` field, if any.
fn get_requested_block_number(
fn get_requested_block_number<Provider>(
&self,
provider: &F::Provider,
) -> ProviderResult<Option<BlockNumber>> {
provider: &Provider,
) -> ProviderResult<Option<BlockNumber>>
where
Provider: BlockNumReader,
{
if let Some(block_hash) = self.block_hash {
Ok(Some(
provider
@@ -234,7 +210,10 @@ where
/// Returns the block which is at the tip of the DB, i.e. the block which the state tables of
/// the DB are currently synced to.
fn get_db_tip_block_number(&self, provider: &F::Provider) -> ProviderResult<BlockNumber> {
fn get_db_tip_block_number<Provider>(&self, provider: &Provider) -> ProviderResult<BlockNumber>
where
Provider: StageCheckpointReader,
{
provider
.get_stage_checkpoint(StageId::Finish)?
.as_ref()
@@ -247,12 +226,15 @@ where
///
/// Takes into account both the stage checkpoint and the prune checkpoint to determine the
/// available data range.
fn reverts_required(
fn reverts_required<Provider>(
&self,
provider: &F::Provider,
provider: &Provider,
db_tip_block: BlockNumber,
requested_block: BlockNumber,
) -> ProviderResult<bool> {
) -> ProviderResult<bool>
where
Provider: PruneCheckpointReader,
{
// If the requested block is the DB tip then there won't be any reverts necessary, and we
// can simply return Ok.
if db_tip_block == requested_block {
@@ -288,11 +270,20 @@ where
skip_all,
fields(%db_tip_block)
)]
fn calculate_overlay(
fn calculate_overlay<Provider>(
&self,
provider: &F::Provider,
provider: &Provider,
db_tip_block: BlockNumber,
) -> ProviderResult<Overlay> {
) -> ProviderResult<Overlay>
where
Provider: ChangeSetReader
+ StorageChangeSetReader
+ DBProvider
+ BlockNumReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ StorageSettingsCache,
{
//
// Set up variables we'll use for recording metrics. There's two different code-paths here,
// and we want to make sure both record metrics, so we do metrics recording after.
@@ -404,23 +395,74 @@ where
Ok(Overlay { trie_updates, hashed_post_state })
}
/// Fetches an [`Overlay`] from the cache based on the current db tip block. If there is no
/// cached value then this calculates the [`Overlay`] and populates the cache.
/// Builds the effective overlay for the given provider.
#[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
fn get_overlay(&self, provider: &F::Provider) -> ProviderResult<Overlay> {
// No anchor block — just resolve the in-memory overlay directly.
pub(super) fn build_overlay<Provider>(&self, provider: &Provider) -> ProviderResult<Overlay>
where
Provider: StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ DBProvider
+ BlockNumReader
+ StorageSettingsCache,
{
if self.block_hash.is_none() {
let (trie_updates, hashed_post_state) = self.resolve_overlays();
return Ok(Overlay { trie_updates, hashed_post_state })
}
let db_tip_block = self.get_db_tip_block_number(provider)?;
self.calculate_overlay(provider, db_tip_block)
}
}
/// Factory for creating overlay state providers with optional reverts and overlays.
///
/// This factory allows building an `OverlayStateProvider` whose DB state has been reverted to a
/// particular block, and/or with additional overlay information added on top.
#[derive(Debug, Clone)]
pub struct OverlayStateProviderFactory<F> {
/// The underlying database provider factory
factory: F,
/// Overlay builder containing the configuration and overlay calculation logic.
overlay_builder: OverlayBuilder,
/// A cache which maps `db_tip -> Overlay`. If the db tip changes during usage of the factory
/// then a new entry will get added to this, but in most cases only one entry is present.
overlay_cache: Arc<DashMap<BlockNumber, Overlay>>,
}
impl<F> OverlayStateProviderFactory<F> {
/// Create a new overlay state provider factory
pub fn new(factory: F, overlay_builder: OverlayBuilder) -> Self {
Self { factory, overlay_builder, overlay_cache: Default::default() }
}
/// Fetches an [`Overlay`] from the cache based on the current db tip block. If there is no
/// cached value then this calculates the [`Overlay`] and populates the cache.
#[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
fn get_overlay<Provider>(&self, provider: &Provider) -> ProviderResult<Overlay>
where
Provider: StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ DBProvider
+ BlockNumReader
+ StorageSettingsCache,
{
// No anchor block — just resolve the in-memory overlay directly.
if self.overlay_builder.block_hash.is_none() {
return self.overlay_builder.build_overlay(provider)
}
let db_tip_block = self.overlay_builder.get_db_tip_block_number(provider)?;
let overlay = match self.overlay_cache.entry(db_tip_block) {
dashmap::Entry::Occupied(entry) => entry.get().clone(),
dashmap::Entry::Vacant(entry) => {
self.metrics.overlay_cache_misses.increment(1);
let overlay = self.calculate_overlay(provider, db_tip_block)?;
self.overlay_builder.metrics.overlay_cache_misses.increment(1);
let overlay = self.overlay_builder.build_overlay(provider)?;
entry.insert(overlay.clone());
overlay
}
@@ -451,14 +493,14 @@ where
let provider = {
let start = Instant::now();
let res = self.factory.database_provider_ro()?;
self.metrics.create_provider_duration.record(start.elapsed());
self.overlay_builder.metrics.create_provider_duration.record(start.elapsed());
res
};
let Overlay { trie_updates, hashed_post_state } = self.get_overlay(&provider)?;
let is_v2 = provider.cached_storage_settings().is_v2();
self.metrics.database_provider_ro_duration.record(overall_start.elapsed());
self.overlay_builder.metrics.database_provider_ro_duration.record(overall_start.elapsed());
Ok(OverlayStateProvider::new(provider, trie_updates, hashed_post_state, is_v2))
}
}

View File

@@ -1164,7 +1164,7 @@ mod tests {
let changeset_cache = reth_trie_db::ChangesetCache::new();
let factory = reth_provider::providers::OverlayStateProviderFactory::new(
provider_factory,
changeset_cache,
reth_provider::providers::OverlayBuilder::new(changeset_cache),
);
let ctx = test_ctx(factory);

View File

@@ -283,9 +283,10 @@ mod tests {
async fn random_parallel_root() {
let factory = create_test_provider_factory();
let changeset_cache = reth_trie_db::ChangesetCache::new();
let overlay_builder = reth_provider::providers::OverlayBuilder::new(changeset_cache);
let mut overlay_factory = reth_provider::providers::OverlayStateProviderFactory::new(
factory.clone(),
changeset_cache,
overlay_builder.clone(),
);
let mut rng = rand::rng();
@@ -362,8 +363,10 @@ mod tests {
}
let prefix_sets = hashed_state.construct_prefix_sets();
overlay_factory =
overlay_factory.with_hashed_state_overlay(Some(Arc::new(hashed_state.into_sorted())));
overlay_factory = reth_provider::providers::OverlayStateProviderFactory::new(
factory,
overlay_builder.with_hashed_state_overlay(Some(Arc::new(hashed_state.into_sorted()))),
);
assert_eq!(
ParallelStateRoot::new(overlay_factory, prefix_sets.freeze(), runtime)