feat(trie): in-memory trie changesets (#20997)

This commit is contained in:
Brian Picciano
2026-01-16 02:06:31 +01:00
committed by GitHub
parent e25411c32b
commit a74cb9cbc3
59 changed files with 1768 additions and 985 deletions

9
Cargo.lock generated
View File

@@ -8411,6 +8411,7 @@ dependencies = [
name = "reth-engine-service"
version = "1.10.0"
dependencies = [
"alloy-eips",
"futures",
"pin-project",
"reth-chainspec",
@@ -8432,6 +8433,7 @@ dependencies = [
"reth-prune",
"reth-stages-api",
"reth-tasks",
"reth-trie-db",
"tokio",
"tokio-stream",
]
@@ -8495,6 +8497,7 @@ dependencies = [
"reth-testing-utils",
"reth-tracing",
"reth-trie",
"reth-trie-db",
"reth-trie-parallel",
"reth-trie-sparse",
"reth-trie-sparse-parallel",
@@ -9387,6 +9390,7 @@ dependencies = [
"reth-tokio-util",
"reth-tracing",
"reth-transaction-pool",
"reth-trie-db",
"secp256k1 0.30.0",
"serde_json",
"tempfile",
@@ -9876,6 +9880,7 @@ dependencies = [
"reth-tracing",
"reth-transaction-pool",
"reth-trie-common",
"reth-trie-db",
"revm",
"serde",
"serde_json",
@@ -11194,14 +11199,18 @@ dependencies = [
"alloy-consensus",
"alloy-primitives",
"alloy-rlp",
"metrics",
"parking_lot",
"proptest",
"proptest-arbitrary-interop",
"reth-chainspec",
"reth-db",
"reth-db-api",
"reth-execution-errors",
"reth-metrics",
"reth-primitives-traits",
"reth-provider",
"reth-stages-types",
"reth-storage-api",
"reth-storage-errors",
"reth-trie",

View File

@@ -15,7 +15,7 @@ use reth_db_common::{
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
use reth_node_core::args::StageEnum;
use reth_provider::{
DBProvider, DatabaseProviderFactory, StaticFileProviderFactory, StaticFileWriter, TrieWriter,
DBProvider, DatabaseProviderFactory, StaticFileProviderFactory, StaticFileWriter,
};
use reth_prune::PruneSegment;
use reth_stages::StageId;
@@ -167,10 +167,6 @@ impl<C: ChainSpecParser> Command<C> {
None,
)?;
}
StageEnum::MerkleChangeSets => {
provider_rw.clear_trie_changesets()?;
reset_stage_checkpoint(tx, StageId::MerkleChangeSets)?;
}
StageEnum::AccountHistory | StageEnum::StorageHistory => {
tx.clear::<tables::AccountsHistory>()?;
tx.clear::<tables::StoragesHistory>()?;

View File

@@ -550,7 +550,6 @@ impl PruneConfig {
/// - `Option<PruneMode>` fields: set from `other` only if `self` is `None`.
/// - `block_interval`: set from `other` only if `self.block_interval ==
/// DEFAULT_BLOCK_INTERVAL`.
/// - `merkle_changesets`: always set from `other`.
/// - `receipts_log_filter`: set from `other` only if `self` is empty and `other` is non-empty.
pub fn merge(&mut self, other: Self) {
let Self {
@@ -563,7 +562,6 @@ impl PruneConfig {
account_history,
storage_history,
bodies_history,
merkle_changesets,
receipts_log_filter,
},
} = other;
@@ -580,8 +578,6 @@ impl PruneConfig {
self.segments.account_history = self.segments.account_history.or(account_history);
self.segments.storage_history = self.segments.storage_history.or(storage_history);
self.segments.bodies_history = self.segments.bodies_history.or(bodies_history);
// Merkle changesets is not optional; always take the value from `other`
self.segments.merkle_changesets = merkle_changesets;
if self.segments.receipts_log_filter.0.is_empty() && !receipts_log_filter.0.is_empty() {
self.segments.receipts_log_filter = receipts_log_filter;
@@ -1091,7 +1087,6 @@ receipts = { distance = 16384 }
account_history: None,
storage_history: Some(PruneMode::Before(5000)),
bodies_history: None,
merkle_changesets: PruneMode::Before(0),
receipts_log_filter: ReceiptsLogPruneConfig(BTreeMap::from([(
Address::random(),
PruneMode::Full,
@@ -1108,7 +1103,6 @@ receipts = { distance = 16384 }
account_history: Some(PruneMode::Distance(2000)),
storage_history: Some(PruneMode::Distance(3000)),
bodies_history: None,
merkle_changesets: PruneMode::Distance(10000),
receipts_log_filter: ReceiptsLogPruneConfig(BTreeMap::from([
(Address::random(), PruneMode::Distance(1000)),
(Address::random(), PruneMode::Before(2000)),
@@ -1127,7 +1121,6 @@ receipts = { distance = 16384 }
assert_eq!(config1.segments.receipts, Some(PruneMode::Distance(1000)));
assert_eq!(config1.segments.account_history, Some(PruneMode::Distance(2000)));
assert_eq!(config1.segments.storage_history, Some(PruneMode::Before(5000)));
assert_eq!(config1.segments.merkle_changesets, PruneMode::Distance(10000));
assert_eq!(config1.segments.receipts_log_filter, original_filter);
}

View File

@@ -25,6 +25,7 @@ reth-tasks.workspace = true
reth-node-types.workspace = true
reth-chainspec.workspace = true
reth-engine-primitives.workspace = true
reth-trie-db.workspace = true
# async
futures.workspace = true
@@ -40,6 +41,8 @@ reth-evm-ethereum.workspace = true
reth-exex-types.workspace = true
reth-primitives-traits.workspace = true
reth-node-ethereum.workspace = true
reth-trie-db.workspace = true
alloy-eips.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true

View File

@@ -26,6 +26,7 @@ use reth_provider::{
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
use reth_tasks::TaskSpawner;
use reth_trie_db::ChangesetCache;
use std::{
pin::Pin,
sync::Arc,
@@ -84,6 +85,7 @@ where
tree_config: TreeConfig,
sync_metrics_tx: MetricEventsSender,
evm_config: C,
changeset_cache: ChangesetCache,
) -> Self
where
V: EngineValidator<N::Payload>,
@@ -109,6 +111,7 @@ where
tree_config,
engine_kind,
evm_config,
changeset_cache,
);
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
@@ -156,6 +159,7 @@ mod tests {
};
use reth_prune::Pruner;
use reth_tasks::TokioTaskExecutor;
use reth_trie_db::ChangesetCache;
use std::sync::Arc;
use tokio::sync::{mpsc::unbounded_channel, watch};
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -188,6 +192,8 @@ mod tests {
let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, rx);
let evm_config = EthEvmConfig::new(chain_spec.clone());
let changeset_cache = ChangesetCache::new();
let engine_validator = BasicEngineValidator::new(
blockchain_db.clone(),
consensus.clone(),
@@ -195,6 +201,7 @@ mod tests {
engine_payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
);
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
@@ -214,6 +221,7 @@ mod tests {
TreeConfig::default(),
sync_metrics_tx,
evm_config,
changeset_cache,
);
}
}

View File

@@ -34,6 +34,7 @@ reth-trie-parallel.workspace = true
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }
reth-trie-sparse-parallel = { workspace = true, features = ["std"] }
reth-trie.workspace = true
reth-trie-db.workspace = true
# alloy
alloy-evm.workspace = true
@@ -133,6 +134,7 @@ test-utils = [
"reth-static-file",
"reth-tracing",
"reth-trie/test-utils",
"reth-trie-db/test-utils",
"reth-trie-sparse/test-utils",
"reth-prune-types?/test-utils",
"reth-trie-parallel/test-utils",

View File

@@ -239,7 +239,10 @@ fn bench_state_root(c: &mut Criterion) {
std::convert::identity,
),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider),
OverlayStateProviderFactory::new(
provider,
reth_trie_db::ChangesetCache::new(),
),
&TreeConfig::default(),
None,
);

View File

@@ -159,6 +159,7 @@ where
self.metrics.save_blocks_block_count.record(block_count as f64);
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block)
}
}

View File

@@ -30,11 +30,13 @@ use reth_payload_primitives::{
};
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
use reth_provider::{
BlockReader, DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StateProviderBox,
StateProviderFactory, StateReader, TransactionVariant, TrieReader,
BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory, HashedPostStateProvider,
ProviderError, StageCheckpointReader, StateProviderBox, StateProviderFactory, StateReader,
TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use reth_trie_db::ChangesetCache;
use revm::state::EvmState;
use state::TreeState;
use std::{fmt::Debug, ops, sync::Arc, time::Instant};
@@ -271,6 +273,8 @@ where
engine_kind: EngineApiKind,
/// The EVM configuration.
evm_config: C,
/// Changeset cache for in-memory trie changesets
changeset_cache: ChangesetCache,
}
impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
@@ -295,6 +299,7 @@ where
.field("metrics", &self.metrics)
.field("engine_kind", &self.engine_kind)
.field("evm_config", &self.evm_config)
.field("changeset_cache", &self.changeset_cache)
.finish()
}
}
@@ -307,11 +312,12 @@ where
+ StateProviderFactory
+ StateReader<Receipt = N::Receipt>
+ HashedPostStateProvider
+ TrieReader
+ Clone
+ 'static,
<P as DatabaseProviderFactory>::Provider:
BlockReader<Block = N::Block, Header = N::BlockHeader>,
<P as DatabaseProviderFactory>::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
+ StageCheckpointReader
+ ChangeSetReader
+ BlockNumReader,
C: ConfigureEvm<Primitives = N> + 'static,
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
V: EngineValidator<T>,
@@ -331,6 +337,7 @@ where
config: TreeConfig,
engine_kind: EngineApiKind,
evm_config: C,
changeset_cache: ChangesetCache,
) -> Self {
let (incoming_tx, incoming) = crossbeam_channel::unbounded();
@@ -351,6 +358,7 @@ where
incoming_tx,
engine_kind,
evm_config,
changeset_cache,
}
}
@@ -370,6 +378,7 @@ where
config: TreeConfig,
kind: EngineApiKind,
evm_config: C,
changeset_cache: ChangesetCache,
) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
{
let best_block_number = provider.best_block_number().unwrap_or(0);
@@ -401,6 +410,7 @@ where
config,
kind,
evm_config,
changeset_cache,
);
let incoming = task.incoming_tx.clone();
std::thread::Builder::new().name("Engine Task".to_string()).spawn(|| task.run()).unwrap();
@@ -1365,6 +1375,21 @@ where
debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
// Evict trie changesets for blocks below the finalized block, but keep at least 64 blocks
if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
let min_threshold = last_persisted_block_number.saturating_sub(64);
let eviction_threshold = finalized.number.min(min_threshold);
debug!(
target: "engine::tree",
last_persisted = last_persisted_block_number,
finalized_number = finalized.number,
eviction_threshold,
"Evicting changesets below threshold"
);
self.changeset_cache.evict(eviction_threshold);
}
self.on_new_persisted_block()?;
Ok(())
}
@@ -1818,6 +1843,7 @@ where
/// or the database. If the required historical data (such as trie change sets) has been
/// pruned for a given block, this operation will return an error. On archive nodes, it
/// can retrieve any block.
#[instrument(level = "debug", target = "engine::tree", skip(self))]
fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
// check memory first
@@ -1835,7 +1861,18 @@ where
.get_state(block.header().number())?
.ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
let hashed_state = self.provider.hashed_post_state(execution_output.state());
let trie_updates = self.provider.get_block_trie_updates(block.number())?;
debug!(
target: "engine::tree",
number = ?block.number(),
"computing block trie updates",
);
let db_provider = self.provider.database_provider_ro()?;
let trie_updates = reth_trie_db::compute_block_trie_updates(
&self.changeset_cache,
&db_provider,
block.number(),
)?;
let sorted_hashed_state = Arc::new(hashed_state.into_sorted());
let sorted_trie_updates = Arc::new(trie_updates);

View File

@@ -886,6 +886,7 @@ mod tests {
use reth_revm::db::BundleState;
use reth_testing_utils::generators;
use reth_trie::{test_utils::state_root, HashedPostState};
use reth_trie_db::ChangesetCache;
use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
use std::sync::Arc;
@@ -1141,7 +1142,7 @@ mod tests {
std::convert::identity,
),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
&TreeConfig::default(),
None, // No BAL for test
);

View File

@@ -1318,9 +1318,10 @@ mod tests {
use reth_provider::{
providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory, LatestStateProvider,
PruneCheckpointReader, StageCheckpointReader, StateProviderBox, TrieReader,
PruneCheckpointReader, StageCheckpointReader, StateProviderBox,
};
use reth_trie::MultiProof;
use reth_trie_db::ChangesetCache;
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
use revm_primitives::{B256, U256};
use std::sync::{Arc, OnceLock};
@@ -1341,7 +1342,6 @@ mod tests {
where
F: DatabaseProviderFactory<
Provider: BlockReader
+ TrieReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
@@ -1351,7 +1351,8 @@ mod tests {
+ 'static,
{
let rt_handle = get_test_runtime_handle();
let overlay_factory = OverlayStateProviderFactory::new(factory);
let changeset_cache = ChangesetCache::new();
let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(overlay_factory);
let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1, false);
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
@@ -1363,7 +1364,7 @@ mod tests {
fn create_cached_provider<F>(factory: F) -> CachedStateProvider<StateProviderBox>
where
F: DatabaseProviderFactory<
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
Provider: BlockReader + StageCheckpointReader + PruneCheckpointReader,
> + Clone
+ Send
+ 'static,

View File

@@ -43,13 +43,14 @@ use reth_provider::{
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, ExecutionOutcome,
HashedPostStateProvider, ProviderError, PruneCheckpointReader, StageCheckpointReader,
StateProvider, StateProviderFactory, StateReader, TrieReader,
StateProvider, StateProviderFactory, StateReader,
};
use reth_revm::db::State;
use reth_trie::{
updates::{TrieUpdates, TrieUpdatesSorted},
HashedPostState, HashedPostStateSorted, StateRoot, TrieInputSorted,
};
use reth_trie_db::ChangesetCache;
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::Address;
use std::{
@@ -138,6 +139,8 @@ where
metrics: EngineApiMetrics,
/// Validator for the payload.
validator: V,
/// Changeset cache for in-memory trie changesets
changeset_cache: ChangesetCache,
}
impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
@@ -145,7 +148,6 @@ where
N: NodePrimitives,
P: DatabaseProviderFactory<
Provider: BlockReader
+ TrieReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
@@ -169,6 +171,7 @@ where
validator: V,
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
changeset_cache: ChangesetCache,
) -> Self {
let precompile_cache_map = PrecompileCacheMap::default();
let payload_processor = PayloadProcessor::new(
@@ -188,6 +191,7 @@ where
invalid_block_hook,
metrics: EngineApiMetrics::default(),
validator,
changeset_cache,
}
}
@@ -427,13 +431,33 @@ where
.map_err(Box::<dyn std::error::Error + Send + Sync>::from))
.map(Arc::new);
// Compute trie input from ancestors once, before spawning payload processor.
// This will be extended with the current block's hashed state after execution.
let trie_input_start = Instant::now();
let (trie_input, block_hash_for_overlay) =
ensure_ok!(self.compute_trie_input(parent_hash, ctx.state()));
self.metrics
.block_validation
.trie_input_duration
.record(trie_input_start.elapsed().as_secs_f64());
// Create overlay factory for payload processor (StateRootTask path needs it for
// multiproofs)
let overlay_factory = {
let TrieInputSorted { nodes, state, .. } = &trie_input;
OverlayStateProviderFactory::new(self.provider.clone(), self.changeset_cache.clone())
.with_block_hash(Some(block_hash_for_overlay))
.with_trie_overlay(Some(Arc::clone(nodes)))
.with_hashed_state_overlay(Some(Arc::clone(state)))
};
// Spawn the appropriate processor based on strategy
let mut handle = ensure_ok!(self.spawn_payload_processor(
env.clone(),
txs,
provider_builder,
parent_hash,
ctx.state(),
overlay_factory.clone(),
strategy,
block_access_list,
));
@@ -494,11 +518,7 @@ where
}
StateRootStrategy::Parallel => {
debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm");
match self.compute_state_root_parallel(
block.parent_hash(),
&hashed_state,
ctx.state(),
) {
match self.compute_state_root_parallel(overlay_factory.clone(), &hashed_state) {
Ok(result) => {
let elapsed = root_time.elapsed();
info!(
@@ -534,7 +554,7 @@ where
}
let (root, updates) = ensure_ok_post_block!(
self.compute_state_root_serial(block.parent_hash(), &hashed_state, ctx.state()),
self.compute_state_root_serial(overlay_factory.clone(), &hashed_state),
block
);
(root, updates, root_time.elapsed())
@@ -571,7 +591,14 @@ where
// Terminate prewarming task with the shared execution outcome
handle.terminate_caching(Some(Arc::clone(&execution_outcome)));
Ok(self.spawn_deferred_trie_task(block, execution_outcome, &ctx, hashed_state, trie_output))
Ok(self.spawn_deferred_trie_task(
block,
execution_outcome,
&ctx,
hashed_state,
trie_output,
overlay_factory,
))
}
/// Return sealed block header from database or in-memory state by hash.
@@ -670,6 +697,10 @@ where
/// Compute state root for the given hashed post state in parallel.
///
/// Uses an overlay factory which provides the state of the parent block, along with the
/// [`HashedPostState`] containing the changes of this block, to compute the state root and
/// trie updates for this block.
///
/// # Returns
///
/// Returns `Ok(_)` if computed successfully.
@@ -677,58 +708,39 @@ where
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
fn compute_state_root_parallel(
&self,
parent_hash: B256,
overlay_factory: OverlayStateProviderFactory<P>,
hashed_state: &HashedPostState,
state: &EngineApiTreeState<N>,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let (mut input, block_hash) = self.compute_trie_input(parent_hash, state)?;
// Extend state overlay with current block's sorted state.
input.prefix_sets.extend(hashed_state.construct_prefix_sets());
let sorted_hashed_state = hashed_state.clone_into_sorted();
Arc::make_mut(&mut input.state).extend_ref(&sorted_hashed_state);
let TrieInputSorted { nodes, state, prefix_sets: prefix_sets_mut } = input;
let factory = OverlayStateProviderFactory::new(self.provider.clone())
.with_block_hash(Some(block_hash))
.with_trie_overlay(Some(nodes))
.with_hashed_state_overlay(Some(state));
// The `hashed_state` argument is already taken into account as part of the overlay, but we
// The `hashed_state` argument will be taken into account as part of the overlay, but we
// need to use the prefix sets which were generated from it to indicate to the
// ParallelStateRoot which parts of the trie need to be recomputed.
let prefix_sets = prefix_sets_mut.freeze();
ParallelStateRoot::new(factory, prefix_sets).incremental_root_with_updates()
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let overlay_factory =
overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
ParallelStateRoot::new(overlay_factory, prefix_sets).incremental_root_with_updates()
}
/// Compute state root for the given hashed post state in serial.
///
/// Uses an overlay factory which provides the state of the parent block, along with the
/// [`HashedPostState`] containing the changes of this block, to compute the state root and
/// trie updates for this block.
fn compute_state_root_serial(
&self,
parent_hash: B256,
overlay_factory: OverlayStateProviderFactory<P>,
hashed_state: &HashedPostState,
state: &EngineApiTreeState<N>,
) -> ProviderResult<(B256, TrieUpdates)> {
let (mut input, block_hash) = self.compute_trie_input(parent_hash, state)?;
// The `hashed_state` argument will be taken into account as part of the overlay, but we
// need to use the prefix sets which were generated from it to indicate to the
// StateRoot which parts of the trie need to be recomputed.
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let overlay_factory =
overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
// Extend state overlay with current block's sorted state.
input.prefix_sets.extend(hashed_state.construct_prefix_sets());
let sorted_hashed_state = hashed_state.clone_into_sorted();
Arc::make_mut(&mut input.state).extend_ref(&sorted_hashed_state);
let TrieInputSorted { nodes, state, .. } = input;
let prefix_sets = hashed_state.construct_prefix_sets();
let factory = OverlayStateProviderFactory::new(self.provider.clone())
.with_block_hash(Some(block_hash))
.with_trie_overlay(Some(nodes))
.with_hashed_state_overlay(Some(state));
let provider = factory.database_provider_ro()?;
let provider = overlay_factory.database_provider_ro()?;
Ok(StateRoot::new(&provider, &provider)
.with_prefix_sets(prefix_sets.freeze())
.with_prefix_sets(prefix_sets)
.root_with_updates()?)
}
@@ -811,6 +823,11 @@ where
///
/// The method handles strategy fallbacks if the preferred approach fails, ensuring
/// block execution always completes with a valid state root.
///
/// # Arguments
///
/// * `overlay_factory` - Pre-computed overlay factory for multiproof generation
/// (`StateRootTask`)
#[allow(clippy::too_many_arguments)]
#[instrument(
level = "debug",
@@ -823,8 +840,7 @@ where
env: ExecutionEnv<Evm>,
txs: T,
provider_builder: StateProviderBuilder<N, P>,
parent_hash: B256,
state: &EngineApiTreeState<N>,
overlay_factory: OverlayStateProviderFactory<P>,
strategy: StateRootStrategy,
block_access_list: Option<Arc<BlockAccessList>>,
) -> Result<
@@ -837,32 +853,14 @@ where
> {
match strategy {
StateRootStrategy::StateRootTask => {
// Compute trie input
let trie_input_start = Instant::now();
let (trie_input, block_hash) = self.compute_trie_input(parent_hash, state)?;
// Create OverlayStateProviderFactory with sorted trie data for multiproofs
let TrieInputSorted { nodes, state, .. } = trie_input;
let multiproof_provider_factory =
OverlayStateProviderFactory::new(self.provider.clone())
.with_block_hash(Some(block_hash))
.with_trie_overlay(Some(nodes))
.with_hashed_state_overlay(Some(state));
// Record trie input duration including OverlayStateProviderFactory setup
self.metrics
.block_validation
.trie_input_duration
.record(trie_input_start.elapsed().as_secs_f64());
let spawn_start = Instant::now();
// Use the pre-computed overlay factory for multiproofs
let handle = self.payload_processor.spawn(
env,
txs,
provider_builder,
multiproof_provider_factory,
overlay_factory,
&self.config,
block_access_list,
);
@@ -1103,6 +1101,7 @@ where
ctx: &TreeCtx<'_, N>,
hashed_state: HashedPostState,
trie_output: TrieUpdates,
overlay_factory: OverlayStateProviderFactory<P>,
) -> ExecutedBlock<N> {
// Capture parent hash and ancestor overlays for deferred trie input construction.
let (anchor_hash, overlay_blocks) = ctx
@@ -1126,9 +1125,21 @@ where
let deferred_handle_task = deferred_trie_data.clone();
let block_validation_metrics = self.metrics.block_validation.clone();
// Capture block info and cache handle for changeset computation
let block_hash = block.hash();
let block_number = block.number();
let changeset_cache = self.changeset_cache.clone();
// Spawn background task to compute trie data. Calling `wait_cloned` will compute from
// the stored inputs and cache the result, so subsequent calls return immediately.
let compute_trie_input_task = move || {
let _span = debug_span!(
target: "engine::tree::payload_validator",
"compute_trie_input_task",
block_number
)
.entered();
let result = panic::catch_unwind(AssertUnwindSafe(|| {
let compute_start = Instant::now();
let computed = deferred_handle_task.wait_cloned();
@@ -1151,6 +1162,40 @@ where
.anchored_overlay_hashed_state_size
.record(anchored.trie_input.state.total_len() as f64);
}
// Compute and cache changesets using the computed trie_updates
let changeset_start = Instant::now();
// Get a provider from the overlay factory for trie cursor access
let changeset_result =
overlay_factory.database_provider_ro().and_then(|provider| {
reth_trie::changesets::compute_trie_changesets(
&provider,
&computed.trie_updates,
)
.map_err(ProviderError::Database)
});
match changeset_result {
Ok(changesets) => {
debug!(
target: "engine::tree::changeset",
?block_number,
elapsed = ?changeset_start.elapsed(),
"Computed and caching changesets"
);
changeset_cache.insert(block_hash, block_number, Arc::new(changesets));
}
Err(e) => {
warn!(
target: "engine::tree::changeset",
?block_number,
?e,
"Failed to compute changesets in deferred trie task"
);
}
}
}));
if result.is_err() {
@@ -1247,7 +1292,6 @@ impl<N, Types, P, Evm, V> EngineValidator<Types> for BasicEngineValidator<P, Evm
where
P: DatabaseProviderFactory<
Provider: BlockReader
+ TrieReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader

View File

@@ -7,6 +7,7 @@ use crate::{
PersistTarget, TreeConfig,
},
};
use reth_trie_db::ChangesetCache;
use alloy_eips::eip1898::BlockWithParent;
use alloy_primitives::{
@@ -192,6 +193,7 @@ impl TestHarness {
let payload_builder = PayloadBuilderHandle::new(to_payload_service);
let evm_config = MockEvmConfig::default();
let changeset_cache = ChangesetCache::new();
let engine_validator = BasicEngineValidator::new(
provider.clone(),
consensus.clone(),
@@ -199,6 +201,7 @@ impl TestHarness {
payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
);
let tree = EngineApiTreeHandler::new(
@@ -215,6 +218,7 @@ impl TestHarness {
TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true),
EngineApiKind::Ethereum,
evm_config,
changeset_cache,
);
let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
@@ -388,6 +392,7 @@ impl ValidatorTestHarness {
let provider = harness.provider.clone();
let payload_validator = MockEngineValidator;
let evm_config = MockEvmConfig::default();
let changeset_cache = ChangesetCache::new();
let validator = BasicEngineValidator::new(
provider,
@@ -396,6 +401,7 @@ impl ValidatorTestHarness {
payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache,
);
Self { harness, validator, metrics: TestMetrics::default() }

View File

@@ -54,6 +54,7 @@ reth-tasks.workspace = true
reth-tokio-util.workspace = true
reth-tracing.workspace = true
reth-transaction-pool.workspace = true
reth-trie-db = { workspace = true, features = ["metrics"] }
reth-basic-payload-builder.workspace = true
reth-node-ethstats.workspace = true
@@ -115,6 +116,7 @@ test-utils = [
"reth-db-api/test-utils",
"reth-provider/test-utils",
"reth-transaction-pool/test-utils",
"reth-trie-db/test-utils",
"reth-evm-ethereum/test-utils",
"reth-node-ethereum/test-utils",
"reth-primitives-traits/test-utils",

View File

@@ -84,6 +84,7 @@ use reth_tracing::{
tracing::{debug, error, info, warn},
};
use reth_transaction_pool::TransactionPool;
use reth_trie_db::ChangesetCache;
use std::{sync::Arc, thread::available_parallelism, time::Duration};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
@@ -470,7 +471,10 @@ where
/// Returns the [`ProviderFactory`] for the attached storage after executing a consistent check
/// between the database and static files. **It may execute a pipeline unwind if it fails this
/// check.**
pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
pub async fn create_provider_factory<N, Evm>(
&self,
changeset_cache: ChangesetCache,
) -> eyre::Result<ProviderFactory<N>>
where
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
@@ -500,7 +504,8 @@ where
static_file_provider,
rocksdb_provider,
)?
.with_prune_modes(self.prune_modes());
.with_prune_modes(self.prune_modes())
.with_changeset_cache(changeset_cache);
// Keep MDBX, static files, and RocksDB aligned. If any check fails, unwind to the
// earliest consistent block.
@@ -593,12 +598,13 @@ where
/// Creates a new [`ProviderFactory`] and attaches it to the launch context.
pub async fn with_provider_factory<N, Evm>(
self,
changeset_cache: ChangesetCache,
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
where
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
let factory = self.create_provider_factory::<N, Evm>().await?;
let factory = self.create_provider_factory::<N, Evm>(changeset_cache).await?;
let ctx = LaunchContextWith {
inner: self.inner,
attachment: self.attachment.map_right(|_| factory),

View File

@@ -37,6 +37,7 @@ use reth_provider::{
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventSender;
use reth_tracing::tracing::{debug, error, info};
use reth_trie_db::ChangesetCache;
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::sync::{mpsc::unbounded_channel, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -87,6 +88,9 @@ impl EngineNodeLauncher {
} = target;
let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
// Create changeset cache that will be shared across the engine
let changeset_cache = ChangesetCache::new();
// setup the launch context
let ctx = ctx
.with_configured_globals(engine_tree_config.reserved_cpu_cores())
@@ -98,8 +102,8 @@ impl EngineNodeLauncher {
.attach(database.clone())
// ensure certain settings take effect
.with_adjusted_configs()
// Create the provider factory
.with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>().await?
// Create the provider factory with changeset cache
.with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone()).await?
.inspect(|ctx| {
info!(target: "reth::cli", "Database opened");
match ctx.provider_factory().storage_settings() {
@@ -204,7 +208,7 @@ impl EngineNodeLauncher {
// Build the engine validator with all required components
let engine_validator = validator_builder
.clone()
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone())
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
.await?;
// Create the consensus engine stream with optional reorg
@@ -214,7 +218,13 @@ impl EngineNodeLauncher {
.maybe_reorg(
ctx.blockchain_db().clone(),
ctx.components().evm_config().clone(),
|| validator_builder.build_tree_validator(&add_ons_ctx, engine_tree_config.clone()),
|| async {
// Create a separate cache for reorg validator (not shared with main engine)
let reorg_cache = ChangesetCache::new();
validator_builder
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
.await
},
node_config.debug.reorg_frequency,
node_config.debug.reorg_depth,
)
@@ -239,6 +249,7 @@ impl EngineNodeLauncher {
engine_tree_config,
ctx.sync_metrics_tx(),
ctx.components().evm_config().clone(),
changeset_cache,
);
info!(target: "reth::cli", "Consensus engine initialized");

View File

@@ -3,6 +3,7 @@
pub use jsonrpsee::server::middleware::rpc::{RpcService, RpcServiceBuilder};
pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator};
pub use reth_rpc_builder::{middleware::RethRpcMiddleware, Identity, Stack};
pub use reth_trie_db::ChangesetCache;
use crate::{
invalid_block_hook::InvalidBlockHookExt, ConfigureEngineEvm, ConsensusEngineEvent,
@@ -1288,6 +1289,7 @@ pub trait EngineValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone
self,
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send;
}
@@ -1335,10 +1337,12 @@ where
self,
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
) -> eyre::Result<Self::EngineValidator> {
let validator = self.payload_validator_builder.build(ctx).await?;
let data_dir = ctx.config.datadir.clone().resolve_datadir(ctx.config.chain.chain());
let invalid_block_hook = ctx.create_invalid_block_hook(&data_dir).await?;
Ok(BasicEngineValidator::new(
ctx.node.provider().clone(),
std::sync::Arc::new(ctx.node.consensus().clone()),
@@ -1346,6 +1350,7 @@ where
validator,
tree_config,
invalid_block_hook,
changeset_cache,
))
}
}

View File

@@ -5,10 +5,7 @@ use alloy_primitives::{Address, BlockNumber};
use clap::{builder::RangedU64ValueParser, Args};
use reth_chainspec::EthereumHardforks;
use reth_config::config::PruneConfig;
use reth_prune_types::{
PruneMode, PruneModes, ReceiptsLogPruneConfig, MERKLE_CHANGESETS_RETENTION_BLOCKS,
MINIMUM_PRUNING_DISTANCE,
};
use reth_prune_types::{PruneMode, PruneModes, ReceiptsLogPruneConfig, MINIMUM_PRUNING_DISTANCE};
use std::{collections::BTreeMap, ops::Not};
/// Parameters for pruning and full node
@@ -143,7 +140,6 @@ impl PruningArgs {
.ethereum_fork_activation(EthereumHardfork::Paris)
.block_number()
.map(PruneMode::Before),
merkle_changesets: PruneMode::Distance(MERKLE_CHANGESETS_RETENTION_BLOCKS),
receipts_log_filter: Default::default(),
},
}
@@ -160,7 +156,6 @@ impl PruningArgs {
account_history: Some(PruneMode::Distance(10064)),
storage_history: Some(PruneMode::Distance(10064)),
bodies_history: Some(PruneMode::Distance(10064)),
merkle_changesets: PruneMode::Distance(MERKLE_CHANGESETS_RETENTION_BLOCKS),
receipts_log_filter: Default::default(),
},
}

View File

@@ -38,11 +38,6 @@ pub enum StageEnum {
///
/// Handles Merkle tree-related computations and data processing.
Merkle,
/// The merkle changesets stage within the pipeline.
///
/// Handles Merkle trie changesets for storage and accounts.
#[value(name = "merkle-changesets")]
MerkleChangeSets,
/// The transaction lookup stage within the pipeline.
///
/// Deals with the retrieval and processing of transactions.

View File

@@ -81,6 +81,7 @@ reth-revm = { workspace = true, features = ["std"] }
reth-rpc.workspace = true
reth-rpc-eth-types.workspace = true
reth-stages-types.workspace = true
reth-trie-db.workspace = true
alloy-network.workspace = true
alloy-op-hardforks.workspace = true
@@ -125,6 +126,7 @@ test-utils = [
"reth-optimism-primitives/arbitrary",
"reth-primitives-traits/test-utils",
"reth-trie-common/test-utils",
"reth-trie-db/test-utils",
"reth-stages-types/test-utils",
]
reth-codec = ["reth-optimism-primitives/reth-codec"]

View File

@@ -24,6 +24,7 @@
//! use reth_rpc::TraceApi;
//! use reth_rpc_eth_types::{EthConfig, EthStateCache};
//! use reth_tasks::{pool::BlockingTaskGuard, TaskManager};
//! use reth_trie_db::ChangesetCache;
//! use std::sync::Arc;
//!
//! #[tokio::main]
@@ -37,7 +38,7 @@
//! .with_loaded_toml_config(sepolia)
//! .unwrap()
//! .attach(Arc::new(db))
//! .with_provider_factory::<_, OpEvmConfig>()
//! .with_provider_factory::<_, OpEvmConfig>(ChangesetCache::new())
//! .await
//! .unwrap()
//! .with_genesis()

View File

@@ -127,6 +127,7 @@ pub(crate) trait DbTxPruneExt: DbTxMut + DbTx {
/// Prune a DUPSORT table for the specified key range.
///
/// Returns number of rows pruned.
#[expect(unused)]
fn prune_dupsort_table_with_range<T: DupSort>(
&self,
keys: impl RangeBounds<T::Key> + Clone + Debug,

View File

@@ -17,8 +17,8 @@ pub use set::SegmentSet;
use std::{fmt::Debug, ops::RangeInclusive};
use tracing::error;
pub use user::{
AccountHistory, Bodies, MerkleChangeSets, Receipts as UserReceipts, ReceiptsByLogs,
SenderRecovery, StorageHistory, TransactionLookup,
AccountHistory, Bodies, Receipts as UserReceipts, ReceiptsByLogs, SenderRecovery,
StorageHistory, TransactionLookup,
};
/// Prunes data from static files for a given segment.

View File

@@ -1,6 +1,6 @@
use crate::segments::{
user::ReceiptsByLogs, AccountHistory, Bodies, MerkleChangeSets, Segment, SenderRecovery,
StorageHistory, TransactionLookup, UserReceipts,
user::ReceiptsByLogs, AccountHistory, Bodies, Segment, SenderRecovery, StorageHistory,
TransactionLookup, UserReceipts,
};
use alloy_eips::eip2718::Encodable2718;
use reth_db_api::{table::Value, transaction::DbTxMut};
@@ -67,15 +67,12 @@ where
account_history,
storage_history,
bodies_history,
merkle_changesets,
receipts_log_filter,
} = prune_modes;
Self::default()
// Bodies - run first since file deletion is fast
.segment_opt(bodies_history.map(Bodies::new))
// Merkle changesets
.segment(MerkleChangeSets::new(merkle_changesets))
// Account history
.segment_opt(account_history.map(AccountHistory::new))
// Storage history

View File

@@ -1,7 +1,6 @@
mod account_history;
mod bodies;
mod history;
mod merkle_change_sets;
mod receipts;
mod receipts_by_logs;
mod sender_recovery;
@@ -10,7 +9,6 @@ mod transaction_lookup;
pub use account_history::AccountHistory;
pub use bodies::Bodies;
pub use merkle_change_sets::MerkleChangeSets;
pub use receipts::Receipts;
pub use receipts_by_logs::ReceiptsByLogs;
pub use sender_recovery::SenderRecovery;

View File

@@ -30,10 +30,7 @@ pub use pruner::{
SegmentOutputCheckpoint,
};
pub use segment::{PrunePurpose, PruneSegment, PruneSegmentError};
pub use target::{
PruneModes, UnwindTargetPrunedError, MERKLE_CHANGESETS_RETENTION_BLOCKS,
MINIMUM_PRUNING_DISTANCE,
};
pub use target::{PruneModes, UnwindTargetPrunedError, MINIMUM_PRUNING_DISTANCE};
/// Configuration for pruning receipts not associated with logs emitted by the specified contracts.
#[derive(Debug, Clone, PartialEq, Eq, Default)]

View File

@@ -1,6 +1,6 @@
#![allow(deprecated)] // necessary to allow defining deprecated `PruneSegment` variants
use crate::{MERKLE_CHANGESETS_RETENTION_BLOCKS, MINIMUM_PRUNING_DISTANCE};
use crate::MINIMUM_PRUNING_DISTANCE;
use derive_more::Display;
use strum::{EnumIter, IntoEnumIterator};
use thiserror::Error;
@@ -36,6 +36,8 @@ pub enum PruneSegment {
#[strum(disabled)]
/// Prune segment responsible for the `Transactions` table.
Transactions,
#[deprecated = "Variant indexes cannot be changed"]
#[strum(disabled)]
/// Prune segment responsible for all rows in `AccountsTrieChangeSets` and
/// `StoragesTrieChangeSets` table.
MerkleChangeSets,
@@ -67,10 +69,9 @@ impl PruneSegment {
Self::ContractLogs | Self::AccountHistory | Self::StorageHistory => {
MINIMUM_PRUNING_DISTANCE
}
Self::MerkleChangeSets => MERKLE_CHANGESETS_RETENTION_BLOCKS,
#[expect(deprecated)]
#[expect(clippy::match_same_arms)]
Self::Headers | Self::Transactions => 0,
Self::Headers | Self::Transactions | Self::MerkleChangeSets => 0,
}
}
@@ -127,6 +128,7 @@ mod tests {
{
assert!(!segments.contains(&PruneSegment::Headers));
assert!(!segments.contains(&PruneSegment::Transactions));
assert!(!segments.contains(&PruneSegment::MerkleChangeSets));
}
}
}

View File

@@ -36,17 +36,8 @@ pub enum HistoryType {
StorageHistory,
}
/// Default number of blocks to retain for merkle changesets.
/// This is used by both the `MerkleChangeSets` stage and the pruner segment.
pub const MERKLE_CHANGESETS_RETENTION_BLOCKS: u64 = 128;
/// Default pruning mode for merkle changesets
const fn default_merkle_changesets_mode() -> PruneMode {
PruneMode::Distance(MERKLE_CHANGESETS_RETENTION_BLOCKS)
}
/// Pruning configuration for every segment of the data that can be pruned.
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Default)]
#[cfg_attr(any(test, feature = "serde"), derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "serde"), serde(default))]
pub struct PruneModes {
@@ -81,10 +72,6 @@ pub struct PruneModes {
/// Bodies History pruning configuration.
#[cfg_attr(any(test, feature = "serde"), serde(skip_serializing_if = "Option::is_none",))]
pub bodies_history: Option<PruneMode>,
/// Merkle Changesets pruning configuration for `AccountsTrieChangeSets` and
/// `StoragesTrieChangeSets`.
#[cfg_attr(any(test, feature = "serde"), serde(default = "default_merkle_changesets_mode"))]
pub merkle_changesets: PruneMode,
/// Receipts pruning configuration by retaining only those receipts that contain logs emitted
/// by the specified addresses, discarding others. This setting is overridden by `receipts`.
///
@@ -97,21 +84,6 @@ pub struct PruneModes {
pub receipts_log_filter: ReceiptsLogPruneConfig,
}
impl Default for PruneModes {
fn default() -> Self {
Self {
sender_recovery: None,
transaction_lookup: None,
receipts: None,
account_history: None,
storage_history: None,
bodies_history: None,
merkle_changesets: default_merkle_changesets_mode(),
receipts_log_filter: ReceiptsLogPruneConfig::default(),
}
}
}
impl PruneModes {
/// Sets pruning to all targets.
pub fn all() -> Self {
@@ -122,7 +94,6 @@ impl PruneModes {
account_history: Some(PruneMode::Full),
storage_history: Some(PruneMode::Full),
bodies_history: Some(PruneMode::Full),
merkle_changesets: PruneMode::Full,
receipts_log_filter: Default::default(),
}
}
@@ -135,16 +106,7 @@ impl PruneModes {
/// Migrates deprecated prune mode values to their new defaults.
///
/// Returns `true` if any migration was performed.
///
/// Currently migrates:
/// - `merkle_changesets`: `Distance(n)` where `n < 128` or `n == 10064` -> `Distance(128)`
pub const fn migrate(&mut self) -> bool {
if let PruneMode::Distance(d) = self.merkle_changesets &&
(d < MERKLE_CHANGESETS_RETENTION_BLOCKS || d == MINIMUM_PRUNING_DISTANCE)
{
self.merkle_changesets = PruneMode::Distance(MERKLE_CHANGESETS_RETENTION_BLOCKS);
return true;
}
false
}

View File

@@ -39,9 +39,9 @@
use crate::{
stages::{
AccountHashingStage, BodyStage, EraImportSource, EraStage, ExecutionStage, FinishStage,
HeaderStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleChangeSets,
MerkleStage, PruneSenderRecoveryStage, PruneStage, SenderRecoveryStage,
StorageHashingStage, TransactionLookupStage,
HeaderStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage,
PruneSenderRecoveryStage, PruneStage, SenderRecoveryStage, StorageHashingStage,
TransactionLookupStage,
},
StageSet, StageSetBuilder,
};
@@ -76,7 +76,6 @@ use tokio::sync::watch;
/// - [`AccountHashingStage`]
/// - [`StorageHashingStage`]
/// - [`MerkleStage`] (execute)
/// - [`MerkleChangeSets`]
/// - [`TransactionLookupStage`]
/// - [`IndexStorageHistoryStage`]
/// - [`IndexAccountHistoryStage`]
@@ -401,7 +400,6 @@ where
/// - [`AccountHashingStage`]
/// - [`StorageHashingStage`]
/// - [`MerkleStage`] (execute)
/// - [`MerkleChangeSets`]
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct HashingStages {
@@ -414,7 +412,6 @@ where
MerkleStage: Stage<Provider>,
AccountHashingStage: Stage<Provider>,
StorageHashingStage: Stage<Provider>,
MerkleChangeSets: Stage<Provider>,
{
fn builder(self) -> StageSetBuilder<Provider> {
StageSetBuilder::default()
@@ -431,7 +428,6 @@ where
self.stages_config.merkle.rebuild_threshold,
self.stages_config.merkle.incremental_threshold,
))
.add_stage(MerkleChangeSets::new())
}
}

View File

@@ -16,8 +16,6 @@ mod index_account_history;
mod index_storage_history;
/// Stage for computing state root.
mod merkle;
/// Stage for computing merkle changesets.
mod merkle_changesets;
mod prune;
/// The sender recovery stage.
mod sender_recovery;
@@ -34,7 +32,6 @@ pub use headers::*;
pub use index_account_history::*;
pub use index_storage_history::*;
pub use merkle::*;
pub use merkle_changesets::*;
pub use prune::*;
pub use sender_recovery::*;
pub use tx_lookup::*;

View File

@@ -290,6 +290,9 @@ pub struct IndexHistoryCheckpoint {
}
/// Saves the progress of `MerkleChangeSets` stage.
///
/// Note: This type is only kept for backward compatibility with the Compact codec.
/// The `MerkleChangeSets` stage has been removed.
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "test-utils"), derive(arbitrary::Arbitrary))]
#[cfg_attr(any(test, feature = "reth-codec"), derive(reth_codecs::Compact))]
@@ -401,9 +404,6 @@ impl StageCheckpoint {
StageId::IndexStorageHistory | StageId::IndexAccountHistory => {
StageUnitCheckpoint::IndexHistory(IndexHistoryCheckpoint::default())
}
StageId::MerkleChangeSets => {
StageUnitCheckpoint::MerkleChangeSets(MerkleChangeSetsCheckpoint::default())
}
_ => return self,
});
_ = self.stage_checkpoint.map(|mut checkpoint| checkpoint.set_block_range(from, to));
@@ -456,6 +456,9 @@ pub enum StageUnitCheckpoint {
/// Saves the progress of Index History stage.
IndexHistory(IndexHistoryCheckpoint),
/// Saves the progress of `MerkleChangeSets` stage.
///
/// Note: This variant is only kept for backward compatibility with the Compact codec.
/// The `MerkleChangeSets` stage has been removed.
MerkleChangeSets(MerkleChangeSetsCheckpoint),
}
@@ -467,8 +470,7 @@ impl StageUnitCheckpoint {
Self::Account(AccountHashingCheckpoint { block_range, .. }) |
Self::Storage(StorageHashingCheckpoint { block_range, .. }) |
Self::Execution(ExecutionCheckpoint { block_range, .. }) |
Self::IndexHistory(IndexHistoryCheckpoint { block_range, .. }) |
Self::MerkleChangeSets(MerkleChangeSetsCheckpoint { block_range, .. }) => {
Self::IndexHistory(IndexHistoryCheckpoint { block_range, .. }) => {
let old_range = *block_range;
*block_range = CheckpointBlockRange { from, to };
@@ -492,7 +494,7 @@ macro_rules! stage_unit_checkpoints {
impl StageCheckpoint {
$(
#[doc = $fn_get_doc]
pub const fn $fn_get_name(&self) -> Option<$checkpoint_ty> {
pub const fn $fn_get_name(&self) -> Option<$checkpoint_ty> {
match self.stage_checkpoint {
Some(StageUnitCheckpoint::$enum_variant(checkpoint)) => Some(checkpoint),
_ => None,
@@ -500,7 +502,7 @@ pub const fn $fn_get_name(&self) -> Option<$checkpoint_ty> {
}
#[doc = $fn_build_doc]
pub const fn $fn_build_name(
pub const fn $fn_build_name(
mut self,
checkpoint: $checkpoint_ty,
) -> Self {
@@ -566,15 +568,6 @@ stage_unit_checkpoints!(
index_history_stage_checkpoint,
/// Sets the stage checkpoint to index history.
with_index_history_stage_checkpoint
),
(
6,
MerkleChangeSets,
MerkleChangeSetsCheckpoint,
/// Returns the merkle changesets stage checkpoint, if any.
merkle_changesets_stage_checkpoint,
/// Sets the stage checkpoint to merkle changesets.
with_merkle_changesets_stage_checkpoint
)
);

View File

@@ -25,7 +25,6 @@ pub enum StageId {
TransactionLookup,
IndexStorageHistory,
IndexAccountHistory,
MerkleChangeSets,
Prune,
Finish,
/// Other custom stage with a provided string identifier.
@@ -40,7 +39,7 @@ static ENCODED_STAGE_IDS: OnceLock<HashMap<StageId, Vec<u8>>> = OnceLock::new();
impl StageId {
/// All supported Stages
pub const ALL: [Self; 16] = [
pub const ALL: [Self; 15] = [
Self::Era,
Self::Headers,
Self::Bodies,
@@ -54,7 +53,6 @@ impl StageId {
Self::TransactionLookup,
Self::IndexStorageHistory,
Self::IndexAccountHistory,
Self::MerkleChangeSets,
Self::Prune,
Self::Finish,
];
@@ -90,7 +88,6 @@ impl StageId {
Self::TransactionLookup => "TransactionLookup",
Self::IndexAccountHistory => "IndexAccountHistory",
Self::IndexStorageHistory => "IndexStorageHistory",
Self::MerkleChangeSets => "MerkleChangeSets",
Self::Prune => "Prune",
Self::Finish => "Finish",
Self::Other(s) => s,

View File

@@ -18,8 +18,8 @@ pub use id::StageId;
mod checkpoints;
pub use checkpoints::{
AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint,
HeadersCheckpoint, IndexHistoryCheckpoint, MerkleChangeSetsCheckpoint, MerkleCheckpoint,
StageCheckpoint, StageUnitCheckpoint, StorageHashingCheckpoint, StorageRootMerkleCheckpoint,
HeadersCheckpoint, IndexHistoryCheckpoint, MerkleCheckpoint, StageCheckpoint,
StageUnitCheckpoint, StorageHashingCheckpoint, StorageRootMerkleCheckpoint,
};
mod execution;

View File

@@ -5,7 +5,7 @@ mod inputs;
/// Fuzzer generates a random instance of the object and proceeds to encode and decode it. It then
/// makes sure that it matches the original object.
///
/// Some types like [`IntegerList`] might have some restrictions on how they're fuzzed. For example,
/// Some types like `IntegerList` might have some restrictions on how they're fuzzed. For example,
/// the list is assumed to be sorted before creating the object.
macro_rules! impl_fuzzer_with_input {
($(($name:tt, $input_type:tt, $encode:tt, $encode_method:tt, $decode:tt, $decode_method:tt)),+) => {

View File

@@ -2,6 +2,3 @@
mod state_reverts;
pub use state_reverts::StorageRevertsIter;
mod trie;
pub use trie::*;

View File

@@ -1,147 +0,0 @@
use itertools::{merge_join_by, EitherOrBoth};
use reth_db_api::DatabaseError;
use reth_trie::{trie_cursor::TrieCursor, BranchNodeCompact, Nibbles};
use std::cmp::{Ord, Ordering};
/// Combines a sorted iterator of trie node paths and a storage trie cursor into a new
/// iterator which produces the current values of all given paths in the same order.
#[derive(Debug)]
pub struct StorageTrieCurrentValuesIter<'cursor, P, C> {
/// Sorted iterator of node paths which we want the values of.
paths: P,
/// Storage trie cursor.
cursor: &'cursor mut C,
/// Current value at the cursor, allows us to treat the cursor as a peekable iterator.
cursor_current: Option<(Nibbles, BranchNodeCompact)>,
}
impl<'cursor, P, C> StorageTrieCurrentValuesIter<'cursor, P, C>
where
P: Iterator<Item = Nibbles>,
C: TrieCursor,
{
/// Instantiate a [`StorageTrieCurrentValuesIter`] from a sorted paths iterator and a cursor.
pub fn new(paths: P, cursor: &'cursor mut C) -> Result<Self, DatabaseError> {
let mut new_self = Self { paths, cursor, cursor_current: None };
new_self.seek_cursor(Nibbles::default())?;
Ok(new_self)
}
fn seek_cursor(&mut self, path: Nibbles) -> Result<(), DatabaseError> {
self.cursor_current = self.cursor.seek(path)?;
Ok(())
}
}
impl<'cursor, P, C> Iterator for StorageTrieCurrentValuesIter<'cursor, P, C>
where
P: Iterator<Item = Nibbles>,
C: TrieCursor,
{
type Item = Result<(Nibbles, Option<BranchNodeCompact>), DatabaseError>;
fn next(&mut self) -> Option<Self::Item> {
let Some(curr_path) = self.paths.next() else {
// If there are no more paths then there is no further possible output.
return None
};
// If the path is ahead of the cursor then seek the cursor forward to catch up. The cursor
// will seek either to `curr_path` or beyond it.
if self.cursor_current.as_ref().is_some_and(|(cursor_path, _)| curr_path > *cursor_path) &&
let Err(err) = self.seek_cursor(curr_path)
{
return Some(Err(err))
}
// If there is a path but the cursor is empty then that path has no node.
if self.cursor_current.is_none() {
return Some(Ok((curr_path, None)))
}
let (cursor_path, cursor_node) =
self.cursor_current.as_mut().expect("already checked for None");
// There is both a path and a cursor value, compare their paths.
match curr_path.cmp(cursor_path) {
Ordering::Less => {
// If the path is behind the cursor then there is no value for that
// path, produce None.
Some(Ok((curr_path, None)))
}
Ordering::Equal => {
// If the target path and cursor's path match then there is a value for that path,
// return the value. We don't seek the cursor here, that will be handled on the
// next call to `next` after checking that `paths` isn't None.
let cursor_node = core::mem::take(cursor_node);
Some(Ok((*cursor_path, Some(cursor_node))))
}
Ordering::Greater => {
panic!("cursor was seeked to {curr_path:?}, but produced a node at a lower path {cursor_path:?}")
}
}
}
}
/// Returns an iterator which produces the values to be inserted into the `StoragesTrieChangeSets`
/// table for an account whose storage was wiped during a block. It is expected that this is called
/// prior to inserting the block's trie updates.
///
/// ## Arguments
///
/// - `curr_values_of_changed` is an iterator over the current values of all trie nodes modified by
/// the block, ordered by path.
/// - `all_nodes` is an iterator over all existing trie nodes for the account, ordered by path.
///
/// ## Returns
///
/// An iterator of trie node paths and a `Some(node)` (indicating the node was wiped) or a `None`
/// (indicating the node was modified in the block but didn't previously exist). The iterator's
/// results will be ordered by path.
pub fn storage_trie_wiped_changeset_iter(
curr_values_of_changed: impl Iterator<
Item = Result<(Nibbles, Option<BranchNodeCompact>), DatabaseError>,
>,
all_nodes: impl Iterator<Item = Result<(Nibbles, BranchNodeCompact), DatabaseError>>,
) -> Result<
impl Iterator<Item = Result<(Nibbles, Option<BranchNodeCompact>), DatabaseError>>,
DatabaseError,
> {
let all_nodes = all_nodes.map(|e| e.map(|(nibbles, node)| (nibbles, Some(node))));
let merged = merge_join_by(curr_values_of_changed, all_nodes, |a, b| match (a, b) {
(Err(_), _) => Ordering::Less,
(_, Err(_)) => Ordering::Greater,
(Ok(a), Ok(b)) => a.0.cmp(&b.0),
});
Ok(merged.map(|either_or| match either_or {
EitherOrBoth::Left(changed) => {
// A path of a changed node (given in `paths`) which was not found in the database (or
// there's an error). The current value of this path must be None, otherwise it would
// have also been returned by the `all_nodes` iter.
debug_assert!(
changed.as_ref().is_err() || changed.as_ref().is_ok_and(|(_, node)| node.is_none()),
"changed node is Some but wasn't returned by `all_nodes` iterator: {changed:?}",
);
changed
}
EitherOrBoth::Right(wiped) => {
// A node was found in the db (indicating it was wiped) but was not given in `paths`.
// Return it as-is.
wiped
}
EitherOrBoth::Both(changed, _wiped) => {
// A path of a changed node (given in `paths`) was found with a previous value in the
// database. The changed node must have a value which is equal to the one found by the
// `all_nodes` iterator. If the changed node had no previous value (None) it wouldn't
// be returned by `all_nodes` and so would be in the Left branch.
//
// Due to the ordering closure passed to `merge_join_by` it's not possible for either
// value to be an error here.
debug_assert!(changed.is_ok(), "unreachable error condition: {changed:?}");
debug_assert_eq!(*changed.as_ref().unwrap(), _wiped.unwrap());
changed
}
}))
}

View File

@@ -9,7 +9,7 @@ use crate::{
HashedPostStateProvider, HeaderProvider, ProviderError, ProviderFactory, PruneCheckpointReader,
ReceiptProvider, ReceiptProviderIdExt, RocksDBProviderFactory, StageCheckpointReader,
StateProviderBox, StateProviderFactory, StateReader, StaticFileProviderFactory,
TransactionVariant, TransactionsProvider, TrieReader,
TransactionVariant, TransactionsProvider,
};
use alloy_consensus::transaction::TransactionMeta;
use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag};
@@ -29,7 +29,7 @@ use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{BlockBodyIndicesProvider, NodePrimitivesProvider, StorageChangeSetReader};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{updates::TrieUpdatesSorted, HashedPostState, KeccakKeyHasher};
use reth_trie::{HashedPostState, KeccakKeyHasher};
use revm_database::BundleState;
use std::{
ops::{RangeBounds, RangeInclusive},
@@ -768,19 +768,6 @@ impl<N: ProviderNodeTypes> StateReader for BlockchainProvider<N> {
}
}
impl<N: ProviderNodeTypes> TrieReader for BlockchainProvider<N> {
fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
self.consistent_provider()?.trie_reverts(from)
}
fn get_block_trie_updates(
&self,
block_number: BlockNumber,
) -> ProviderResult<TrieUpdatesSorted> {
self.consistent_provider()?.get_block_trie_updates(block_number)
}
}
#[cfg(test)]
mod tests {
use crate::{

View File

@@ -5,7 +5,7 @@ use crate::{
BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
TransactionsProvider, TrieReader,
TransactionsProvider,
};
use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
use alloy_eips::{
@@ -30,7 +30,6 @@ use reth_storage_api::{
StateProviderBox, StorageChangeSetReader, TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::updates::TrieUpdatesSorted;
use revm_database::states::PlainStorageRevert;
use std::{
ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
@@ -1559,19 +1558,6 @@ impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
}
}
impl<N: ProviderNodeTypes> TrieReader for ConsistentProvider<N> {
fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
self.storage_provider.trie_reverts(from)
}
fn get_block_trie_updates(
&self,
block_number: BlockNumber,
) -> ProviderResult<TrieUpdatesSorted> {
self.storage_provider.get_block_trie_updates(block_number)
}
}
#[cfg(test)]
mod tests {
use crate::{

View File

@@ -82,8 +82,6 @@ pub(crate) struct DatabaseProviderMetrics {
save_blocks_write_state: Histogram,
/// Duration of `write_hashed_state` in `save_blocks`
save_blocks_write_hashed_state: Histogram,
/// Duration of `write_trie_changesets` in `save_blocks`
save_blocks_write_trie_changesets: Histogram,
/// Duration of `write_trie_updates` in `save_blocks`
save_blocks_write_trie_updates: Histogram,
/// Duration of `update_history_indices` in `save_blocks`
@@ -110,7 +108,6 @@ pub(crate) struct SaveBlocksTimings {
pub insert_block: Duration,
pub write_state: Duration,
pub write_hashed_state: Duration,
pub write_trie_changesets: Duration,
pub write_trie_updates: Duration,
pub update_history_indices: Duration,
pub update_pipeline_stages: Duration,
@@ -153,7 +150,6 @@ impl DatabaseProviderMetrics {
self.save_blocks_insert_block.record(timings.insert_block);
self.save_blocks_write_state.record(timings.write_state);
self.save_blocks_write_hashed_state.record(timings.write_hashed_state);
self.save_blocks_write_trie_changesets.record(timings.write_trie_changesets);
self.save_blocks_write_trie_updates.record(timings.write_trie_updates);
self.save_blocks_update_history_indices.record(timings.update_history_indices);
self.save_blocks_update_pipeline_stages.record(timings.update_pipeline_stages);

View File

@@ -33,6 +33,7 @@ use reth_storage_api::{
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::HashedPostState;
use reth_trie_db::ChangesetCache;
use revm_database::BundleState;
use std::{
ops::{RangeBounds, RangeInclusive},
@@ -74,6 +75,8 @@ pub struct ProviderFactory<N: NodeTypesWithDB> {
storage_settings: Arc<RwLock<StorageSettings>>,
/// `RocksDB` provider
rocksdb_provider: RocksDBProvider,
/// Changeset cache for trie unwinding
changeset_cache: ChangesetCache,
}
impl<N: NodeTypesForProvider> ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>> {
@@ -104,6 +107,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
Default::default(),
Arc::new(RwLock::new(legacy_settings)),
rocksdb_provider.clone(),
ChangesetCache::new(),
)
.storage_settings()?
.unwrap_or(legacy_settings);
@@ -116,6 +120,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
storage: Default::default(),
storage_settings: Arc::new(RwLock::new(storage_settings)),
rocksdb_provider,
changeset_cache: ChangesetCache::new(),
})
}
}
@@ -127,6 +132,12 @@ impl<N: NodeTypesWithDB> ProviderFactory<N> {
self
}
/// Sets the changeset cache for an existing [`ProviderFactory`].
pub fn with_changeset_cache(mut self, changeset_cache: ChangesetCache) -> Self {
self.changeset_cache = changeset_cache;
self
}
/// Returns reference to the underlying database.
pub const fn db_ref(&self) -> &N::DB {
&self.db
@@ -197,6 +208,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
self.storage.clone(),
self.storage_settings.clone(),
self.rocksdb_provider.clone(),
self.changeset_cache.clone(),
))
}
@@ -214,6 +226,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
self.storage.clone(),
self.storage_settings.clone(),
self.rocksdb_provider.clone(),
self.changeset_cache.clone(),
)))
}
@@ -623,6 +636,7 @@ where
storage,
storage_settings,
rocksdb_provider,
changeset_cache,
} = self;
f.debug_struct("ProviderFactory")
.field("db", &db)
@@ -632,6 +646,7 @@ where
.field("storage", &storage)
.field("storage_settings", &*storage_settings.read())
.field("rocksdb_provider", &rocksdb_provider)
.field("changeset_cache", &changeset_cache)
.finish()
}
}
@@ -646,6 +661,7 @@ impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
storage: self.storage.clone(),
storage_settings: self.storage_settings.clone(),
rocksdb_provider: self.rocksdb_provider.clone(),
changeset_cache: self.changeset_cache.clone(),
}
}
}

View File

@@ -1,7 +1,5 @@
use crate::{
changesets_utils::{
storage_trie_wiped_changeset_iter, StorageRevertsIter, StorageTrieCurrentValuesIter,
},
changesets_utils::StorageRevertsIter,
providers::{
database::{chain::ChainStorage, metrics},
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
@@ -20,7 +18,7 @@ use crate::{
PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
RocksDBProviderFactory, RocksTxRefArg, StageCheckpointReader, StateProviderBox, StateWriter,
StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
TransactionsProvider, TransactionsProviderExt, TrieReader, TrieWriter,
TransactionsProvider, TransactionsProviderExt, TrieWriter,
};
use alloy_consensus::{
transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
@@ -29,7 +27,7 @@ use alloy_consensus::{
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{
keccak256,
map::{hash_map, B256Map, HashMap, HashSet},
map::{hash_map, HashMap, HashSet},
Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
};
use itertools::Itertools;
@@ -66,16 +64,12 @@ use reth_storage_api::{
};
use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
use reth_trie::{
trie_cursor::{
InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory,
TrieCursorIter,
},
changesets::storage_trie_wiped_changeset_iter,
trie_cursor::{InMemoryTrieCursor, TrieCursor, TrieCursorIter, TrieStorageCursor},
updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
HashedPostStateSorted, StoredNibbles, StoredNibblesSubKey, TrieChangeSetsEntry,
};
use reth_trie_db::{
DatabaseAccountTrieCursor, DatabaseStorageTrieCursor, DatabaseTrieCursorFactory,
};
use reth_trie_db::{ChangesetCache, DatabaseAccountTrieCursor, DatabaseStorageTrieCursor};
use revm_database::states::{
PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
};
@@ -187,6 +181,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
storage_settings: Arc<RwLock<StorageSettings>>,
/// `RocksDB` provider
rocksdb_provider: RocksDBProvider,
/// Changeset cache for trie unwinding
changeset_cache: ChangesetCache,
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
@@ -206,6 +202,7 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
.field("storage", &self.storage)
.field("storage_settings", &self.storage_settings)
.field("rocksdb_provider", &self.rocksdb_provider)
.field("changeset_cache", &self.changeset_cache)
.field("pending_rocksdb_batches", &"<pending batches>")
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
.finish()
@@ -319,6 +316,7 @@ impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpe
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
/// Creates a provider with an inner read-write transaction.
#[allow(clippy::too_many_arguments)]
pub fn new_rw(
tx: TX,
chain_spec: Arc<N::ChainSpec>,
@@ -327,6 +325,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage: Arc<N::Storage>,
storage_settings: Arc<RwLock<StorageSettings>>,
rocksdb_provider: RocksDBProvider,
changeset_cache: ChangesetCache,
) -> Self {
Self {
tx,
@@ -336,6 +335,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
@@ -529,7 +529,6 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
if save_mode.with_state() {
let execution_output = block.execution_outcome();
let block_number = recovered_block.number();
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
@@ -552,10 +551,6 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
self.write_hashed_state(&trie_data.hashed_state)?;
timings.write_hashed_state += start.elapsed();
let start = Instant::now();
self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?;
timings.write_trie_changesets += start.elapsed();
let start = Instant::now();
self.write_trie_updates_sorted(&trie_data.trie_updates)?;
timings.write_trie_updates += start.elapsed();
@@ -692,7 +687,17 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
self.unwind_storage_history_indices(changed_storages.iter().copied())?;
// Unwind accounts/storages trie tables using the revert.
let trie_revert = self.trie_reverts(from)?;
// Get the database tip block number
let db_tip_block = self
.get_stage_checkpoint(reth_stages_types::StageId::Finish)?
.as_ref()
.map(|chk| chk.block_number)
.ok_or_else(|| ProviderError::InsufficientChangesets {
requested: from,
available: 0..=0,
})?;
let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
self.write_trie_updates_sorted(&trie_revert)?;
// Clear trie changesets which have been unwound.
@@ -834,6 +839,7 @@ where
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
/// Creates a provider with an inner read-only transaction.
#[allow(clippy::too_many_arguments)]
pub fn new(
tx: TX,
chain_spec: Arc<N::ChainSpec>,
@@ -842,6 +848,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
storage: Arc<N::Storage>,
storage_settings: Arc<RwLock<StorageSettings>>,
rocksdb_provider: RocksDBProvider,
changeset_cache: ChangesetCache,
) -> Self {
Self {
tx,
@@ -851,6 +858,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
@@ -2187,7 +2195,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
}
// Write account changes
tracing::debug!(target: "sync::stages::merkle_changesets", ?first_block, "Writing account changes");
tracing::trace!(?first_block, "Writing account changes");
for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
let changeset = account_block_reverts
@@ -2680,127 +2688,6 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider
}
}
impl<TX: DbTx + 'static, N: NodeTypes> TrieReader for DatabaseProvider<TX, N> {
fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
let tx = self.tx_ref();
// Read account trie changes directly into a Vec - data is already sorted by nibbles
// within each block, and we want the oldest (first) version of each node sorted by path.
let mut account_nodes = Vec::new();
let mut seen_account_keys = HashSet::new();
let mut accounts_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
for entry in accounts_cursor.walk_range(from..)? {
let (_, TrieChangeSetsEntry { nibbles, node }) = entry?;
// Only keep the first (oldest) version of each node
if seen_account_keys.insert(nibbles.0) {
account_nodes.push((nibbles.0, node));
}
}
account_nodes.sort_by_key(|(path, _)| *path);
// Read storage trie changes - data is sorted by (block, hashed_address, nibbles)
// Keep track of seen (address, nibbles) pairs to only keep the oldest version per address,
// sorted by path.
let mut storage_tries = B256Map::<Vec<_>>::default();
let mut seen_storage_keys = HashSet::new();
let mut storages_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
// Create storage range starting from `from` block
let storage_range_start = BlockNumberHashedAddress((from, B256::ZERO));
for entry in storages_cursor.walk_range(storage_range_start..)? {
let (
BlockNumberHashedAddress((_, hashed_address)),
TrieChangeSetsEntry { nibbles, node },
) = entry?;
// Only keep the first (oldest) version of each node for this address
if seen_storage_keys.insert((hashed_address, nibbles.0)) {
storage_tries.entry(hashed_address).or_default().push((nibbles.0, node));
}
}
// Convert to StorageTrieUpdatesSorted
let storage_tries = storage_tries
.into_iter()
.map(|(address, mut nodes)| {
nodes.sort_by_key(|(path, _)| *path);
(address, StorageTrieUpdatesSorted { storage_nodes: nodes, is_deleted: false })
})
.collect();
Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
}
fn get_block_trie_updates(
&self,
block_number: BlockNumber,
) -> ProviderResult<TrieUpdatesSorted> {
let tx = self.tx_ref();
// Step 1: Get the trie reverts for the state after the target block
let reverts = self.trie_reverts(block_number + 1)?;
// Step 2: Create an InMemoryTrieCursorFactory with the reverts
// This gives us the trie state as it was after the target block was processed
let db_cursor_factory = DatabaseTrieCursorFactory::new(tx);
let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);
// Step 3: Collect all account trie nodes that changed in the target block
let mut account_nodes = Vec::new();
// Walk through all account trie changes for this block
let mut accounts_trie_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
let mut account_cursor = cursor_factory.account_trie_cursor()?;
for entry in accounts_trie_cursor.walk_dup(Some(block_number), None)? {
let (_, TrieChangeSetsEntry { nibbles, .. }) = entry?;
// Look up the current value of this trie node using the overlay cursor
let node_value = account_cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
account_nodes.push((nibbles.0, node_value));
}
// Step 4: Collect all storage trie nodes that changed in the target block
let mut storage_tries = B256Map::default();
let mut storages_trie_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
let storage_range_start = BlockNumberHashedAddress((block_number, B256::ZERO));
let storage_range_end = BlockNumberHashedAddress((block_number + 1, B256::ZERO));
let mut current_hashed_address = None;
let mut storage_cursor = None;
for entry in storages_trie_cursor.walk_range(storage_range_start..storage_range_end)? {
let (
BlockNumberHashedAddress((_, hashed_address)),
TrieChangeSetsEntry { nibbles, .. },
) = entry?;
// Check if we need to create a new storage cursor for a different account
if current_hashed_address != Some(hashed_address) {
storage_cursor = Some(cursor_factory.storage_trie_cursor(hashed_address)?);
current_hashed_address = Some(hashed_address);
}
// Look up the current value of this storage trie node
let cursor =
storage_cursor.as_mut().expect("storage_cursor was just initialized above");
let node_value = cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
storage_tries
.entry(hashed_address)
.or_insert_with(|| StorageTrieUpdatesSorted {
storage_nodes: Vec::new(),
is_deleted: false,
})
.storage_nodes
.push((nibbles.0, node_value));
}
Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
}
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
/// Writes storage trie updates from the given storage trie map with already sorted updates.
///
@@ -2843,22 +2730,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseP
let mut changeset_cursor =
self.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
let curr_values_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
// We hold two cursors to the same table because we use them simultaneously when an
// account's storage is wiped. We keep them outside the for-loop so they can be re-used
// between accounts.
let changed_curr_values_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
let wiped_nodes_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
// DatabaseStorageTrieCursor requires ownership of the cursor. The easiest way to deal with
// this is to create this outer variable with an initial dummy account, and overwrite it on
// every loop for every real account.
let mut changed_curr_values_cursor = DatabaseStorageTrieCursor::new(
changed_curr_values_cursor,
B256::default(), // Will be set per iteration
);
let mut wiped_nodes_cursor = DatabaseStorageTrieCursor::new(
wiped_nodes_cursor,
// Wrap the cursor in DatabaseStorageTrieCursor
let mut db_storage_cursor = DatabaseStorageTrieCursor::new(
curr_values_cursor,
B256::default(), // Will be set per iteration
);
@@ -2868,43 +2744,22 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseP
for (hashed_address, storage_trie_updates) in storage_tries {
let changeset_key = BlockNumberHashedAddress((block_number, *hashed_address));
// Update the hashed address for the cursors
changed_curr_values_cursor =
DatabaseStorageTrieCursor::new(changed_curr_values_cursor.cursor, *hashed_address);
// Update the hashed address for the cursor
db_storage_cursor.set_hashed_address(*hashed_address);
// Get the overlay updates, or use empty updates
let overlay = updates_overlay.unwrap_or(&empty_updates);
// Wrap the cursor in InMemoryTrieCursor with the overlay
let mut in_memory_changed_cursor = InMemoryTrieCursor::new_storage(
&mut changed_curr_values_cursor,
overlay,
*hashed_address,
);
let mut in_memory_storage_cursor =
InMemoryTrieCursor::new_storage(&mut db_storage_cursor, overlay, *hashed_address);
// Create an iterator which produces the current values of all updated paths, or None if
// they are currently unset.
let curr_values_of_changed = StorageTrieCurrentValuesIter::new(
storage_trie_updates.storage_nodes.iter().map(|e| e.0),
&mut in_memory_changed_cursor,
)?;
let changed_paths = storage_trie_updates.storage_nodes.iter().map(|e| e.0);
if storage_trie_updates.is_deleted() {
// Create an iterator that starts from the beginning of the storage trie for this
// account
wiped_nodes_cursor =
DatabaseStorageTrieCursor::new(wiped_nodes_cursor.cursor, *hashed_address);
let all_nodes = TrieCursorIter::new(&mut in_memory_storage_cursor);
// Wrap the wiped nodes cursor in InMemoryTrieCursor with the overlay
let mut in_memory_wiped_cursor = InMemoryTrieCursor::new_storage(
&mut wiped_nodes_cursor,
overlay,
*hashed_address,
);
let all_nodes = TrieCursorIter::new(&mut in_memory_wiped_cursor);
for wiped in storage_trie_wiped_changeset_iter(curr_values_of_changed, all_nodes)? {
for wiped in storage_trie_wiped_changeset_iter(changed_paths, all_nodes)? {
let (path, node) = wiped?;
num_written += 1;
changeset_cursor.append_dup(
@@ -2913,8 +2768,8 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseP
)?;
}
} else {
for curr_value in curr_values_of_changed {
let (path, node) = curr_value?;
for path in changed_paths {
let node = in_memory_storage_cursor.seek_exact(path)?.map(|(_, node)| node);
num_written += 1;
changeset_cursor.append_dup(
changeset_key,
@@ -3672,6 +3527,7 @@ mod tests {
test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
BlockWriter,
};
use alloy_primitives::map::B256Map;
use reth_ethereum_primitives::Receipt;
use reth_testing_utils::generators::{self, random_block, BlockParams};
use reth_trie::Nibbles;
@@ -4913,279 +4769,6 @@ mod tests {
provider_rw.commit().unwrap();
}
/// Exercises `TrieReader::get_block_trie_updates`: reconstructs the trie updates applied by
/// `target_block` from the changeset tables, treating the changesets of later blocks as an
/// overlay to revert the final database state back to just after `target_block`.
#[test]
fn test_get_block_trie_updates() {
    use reth_db_api::models::BlockNumberHashedAddress;
    use reth_trie::{BranchNodeCompact, StorageTrieEntry};
    let factory = create_test_provider_factory();
    let provider_rw = factory.provider_rw().unwrap();
    // Changes are recorded for `target_block`; `next_block` carries later changes that must
    // be reverted when reconstructing the post-`target_block` trie state.
    let target_block = 2u64;
    let next_block = 3u64;
    // Create test nibbles and nodes for accounts
    let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
    let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
    let account_nibbles3 = Nibbles::from_nibbles([0x9, 0xa, 0xb, 0xc]);
    // Nodes only differ by state mask; the masks double as identifiers in the assertions below.
    let node1 = BranchNodeCompact::new(
        0b1111_1111_0000_0000,
        0b0000_0000_0000_0000,
        0b0000_0000_0000_0000,
        vec![],
        None,
    );
    let node2 = BranchNodeCompact::new(
        0b0000_0000_1111_1111,
        0b0000_0000_0000_0000,
        0b0000_0000_0000_0000,
        vec![],
        None,
    );
    let node3 = BranchNodeCompact::new(
        0b1010_1010_1010_1010,
        0b0000_0000_0000_0000,
        0b0000_0000_0000_0000,
        vec![],
        None,
    );
    // Pre-populate AccountsTrie with nodes that will be the final state
    {
        let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
        cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
        cursor.insert(StoredNibbles(account_nibbles2), &node2).unwrap();
        // account_nibbles3 will be deleted (not in final state)
    }
    // Insert trie changesets for target_block
    {
        let mut cursor =
            provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
        // nibbles1 was updated in target_block (old value stored)
        cursor
            .append_dup(
                target_block,
                TrieChangeSetsEntry {
                    nibbles: StoredNibblesSubKey(account_nibbles1),
                    node: Some(BranchNodeCompact::new(
                        0b1111_0000_0000_0000, // old value
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    )),
                },
            )
            .unwrap();
        // nibbles2 was created in target_block (no old value)
        cursor
            .append_dup(
                target_block,
                TrieChangeSetsEntry {
                    nibbles: StoredNibblesSubKey(account_nibbles2),
                    node: None,
                },
            )
            .unwrap();
    }
    // Insert trie changesets for next_block (to test overlay)
    {
        let mut cursor =
            provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
        // nibbles3 was deleted in next_block (old value stored)
        cursor
            .append_dup(
                next_block,
                TrieChangeSetsEntry {
                    nibbles: StoredNibblesSubKey(account_nibbles3),
                    node: Some(node3),
                },
            )
            .unwrap();
    }
    // Storage trie updates
    let storage_address1 = B256::from([1u8; 32]);
    let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
    let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
    let storage_node1 = BranchNodeCompact::new(
        0b1111_1111_1111_0000,
        0b0000_0000_0000_0000,
        0b0000_0000_0000_0000,
        vec![],
        None,
    );
    let storage_node2 = BranchNodeCompact::new(
        0b0101_0101_0101_0101,
        0b0000_0000_0000_0000,
        0b0000_0000_0000_0000,
        vec![],
        None,
    );
    // Pre-populate StoragesTrie with final state
    {
        let mut cursor =
            provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
        cursor
            .upsert(
                storage_address1,
                &StorageTrieEntry {
                    nibbles: StoredNibblesSubKey(storage_nibbles1),
                    node: storage_node1.clone(),
                },
            )
            .unwrap();
        // storage_nibbles2 was deleted in next_block, so it's not in final state
    }
    // Insert storage trie changesets for target_block
    {
        let mut cursor =
            provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
        let key = BlockNumberHashedAddress((target_block, storage_address1));
        // storage_nibbles1 was updated
        cursor
            .append_dup(
                key,
                TrieChangeSetsEntry {
                    nibbles: StoredNibblesSubKey(storage_nibbles1),
                    node: Some(BranchNodeCompact::new(
                        0b0000_0000_1111_1111, // old value
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    )),
                },
            )
            .unwrap();
        // storage_nibbles2 was created
        cursor
            .append_dup(
                key,
                TrieChangeSetsEntry {
                    nibbles: StoredNibblesSubKey(storage_nibbles2),
                    node: None,
                },
            )
            .unwrap();
    }
    // Insert storage trie changesets for next_block (to test overlay)
    {
        let mut cursor =
            provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
        let key = BlockNumberHashedAddress((next_block, storage_address1));
        // storage_nibbles2 was deleted in next_block
        cursor
            .append_dup(
                key,
                TrieChangeSetsEntry {
                    nibbles: StoredNibblesSubKey(storage_nibbles2),
                    node: Some(BranchNodeCompact::new(
                        0b0101_0101_0101_0101, // value that was deleted
                        0b0000_0000_0000_0000,
                        0b0000_0000_0000_0000,
                        vec![],
                        None,
                    )),
                },
            )
            .unwrap();
    }
    provider_rw.commit().unwrap();
    // Now test get_block_trie_updates
    let provider = factory.provider().unwrap();
    let result = provider.get_block_trie_updates(target_block).unwrap();
    // Verify account trie updates
    assert_eq!(result.account_nodes_ref().len(), 2, "Should have 2 account trie updates");
    // Check nibbles1 - should have the current value (node1)
    let nibbles1_update = result
        .account_nodes_ref()
        .iter()
        .find(|(n, _)| n == &account_nibbles1)
        .expect("Should find nibbles1");
    assert!(nibbles1_update.1.is_some(), "nibbles1 should have a value");
    assert_eq!(
        nibbles1_update.1.as_ref().unwrap().state_mask,
        node1.state_mask,
        "nibbles1 should have current value"
    );
    // Check nibbles2 - should have the current value (node2)
    let nibbles2_update = result
        .account_nodes_ref()
        .iter()
        .find(|(n, _)| n == &account_nibbles2)
        .expect("Should find nibbles2");
    assert!(nibbles2_update.1.is_some(), "nibbles2 should have a value");
    assert_eq!(
        nibbles2_update.1.as_ref().unwrap().state_mask,
        node2.state_mask,
        "nibbles2 should have current value"
    );
    // nibbles3 should NOT be in the result (it was changed in next_block, not target_block)
    assert!(
        !result.account_nodes_ref().iter().any(|(n, _)| n == &account_nibbles3),
        "nibbles3 should not be in target_block updates"
    );
    // Verify storage trie updates
    assert_eq!(result.storage_tries_ref().len(), 1, "Should have 1 storage trie");
    let storage_updates = result
        .storage_tries_ref()
        .get(&storage_address1)
        .expect("Should have storage updates for address1");
    assert_eq!(storage_updates.storage_nodes.len(), 2, "Should have 2 storage node updates");
    // Check storage_nibbles1 - should have current value
    let storage1_update = storage_updates
        .storage_nodes
        .iter()
        .find(|(n, _)| n == &storage_nibbles1)
        .expect("Should find storage_nibbles1");
    assert!(storage1_update.1.is_some(), "storage_nibbles1 should have a value");
    assert_eq!(
        storage1_update.1.as_ref().unwrap().state_mask,
        storage_node1.state_mask,
        "storage_nibbles1 should have current value"
    );
    // Check storage_nibbles2 - was created in target_block, will be deleted in next_block
    // So it should have a value (the value that will be deleted)
    let storage2_update = storage_updates
        .storage_nodes
        .iter()
        .find(|(n, _)| n == &storage_nibbles2)
        .expect("Should find storage_nibbles2");
    assert!(
        storage2_update.1.is_some(),
        "storage_nibbles2 should have a value (the node that will be deleted in next block)"
    );
    assert_eq!(
        storage2_update.1.as_ref().unwrap().state_mask,
        storage_node2.state_mask,
        "storage_nibbles2 should have the value that was created and will be deleted"
    );
}
#[test]
fn test_prunable_receipts_logic() {
let insert_blocks =

View File

@@ -8,7 +8,7 @@ use reth_prune_types::PruneSegment;
use reth_stages_types::StageId;
use reth_storage_api::{
BlockNumReader, ChangeSetReader, DBProvider, DatabaseProviderFactory,
DatabaseProviderROFactory, PruneCheckpointReader, StageCheckpointReader, TrieReader,
DatabaseProviderROFactory, PruneCheckpointReader, StageCheckpointReader,
};
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
@@ -17,7 +17,7 @@ use reth_trie::{
HashedPostStateSorted, KeccakKeyHasher,
};
use reth_trie_db::{
DatabaseHashedCursorFactory, DatabaseHashedPostState, DatabaseTrieCursorFactory,
ChangesetCache, DatabaseHashedCursorFactory, DatabaseHashedPostState, DatabaseTrieCursorFactory,
};
use std::{
collections::{hash_map::Entry, HashMap},
@@ -67,6 +67,8 @@ pub struct OverlayStateProviderFactory<F> {
trie_overlay: Option<Arc<TrieUpdatesSorted>>,
/// Optional hashed state overlay
hashed_state_overlay: Option<Arc<HashedPostStateSorted>>,
/// Changeset cache handle for retrieving trie changesets
changeset_cache: ChangesetCache,
/// Metrics for tracking provider operations
metrics: OverlayStateProviderMetrics,
/// A cache which maps `db_tip -> Overlay`. If the db tip changes during usage of the factory
@@ -76,12 +78,13 @@ pub struct OverlayStateProviderFactory<F> {
impl<F> OverlayStateProviderFactory<F> {
/// Create a new overlay state provider factory
pub fn new(factory: F) -> Self {
pub fn new(factory: F, changeset_cache: ChangesetCache) -> Self {
Self {
factory,
block_hash: None,
trie_overlay: None,
hashed_state_overlay: None,
changeset_cache,
metrics: OverlayStateProviderMetrics::default(),
overlay_cache: Default::default(),
}
@@ -112,13 +115,22 @@ impl<F> OverlayStateProviderFactory<F> {
self.hashed_state_overlay = hashed_state_overlay;
self
}
/// Extends the existing hashed state overlay with the given [`HashedPostStateSorted`].
///
/// If no overlay is set yet, the given state becomes the overlay.
pub fn with_extended_hashed_state_overlay(mut self, other: HashedPostStateSorted) -> Self {
    match self.hashed_state_overlay.as_mut() {
        // Copy-on-write: `Arc::make_mut` only deep-copies if the overlay is shared.
        Some(existing) => Arc::make_mut(existing).extend_ref(&other),
        None => self.hashed_state_overlay = Some(Arc::new(other)),
    }
    self
}
}
impl<F> OverlayStateProviderFactory<F>
where
F: DatabaseProviderFactory,
F::Provider: TrieReader
+ StageCheckpointReader
F::Provider: StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
+ DBProvider
@@ -144,7 +156,7 @@ where
/// the DB are currently synced to.
fn get_db_tip_block_number(&self, provider: &F::Provider) -> ProviderResult<BlockNumber> {
provider
.get_stage_checkpoint(StageId::MerkleChangeSets)?
.get_stage_checkpoint(StageId::Finish)?
.as_ref()
.map(|chk| chk.block_number)
.ok_or_else(|| ProviderError::InsufficientChangesets { requested: 0, available: 0..=0 })
@@ -153,7 +165,6 @@ where
/// Returns whether or not it is required to collect reverts, and validates that there are
/// sufficient changesets to revert to the requested block number if so.
///
/// Returns an error if the `MerkleChangeSets` checkpoint doesn't cover the requested block.
/// Takes into account both the stage checkpoint and the prune checkpoint to determine the
/// available data range.
fn reverts_required(
@@ -168,18 +179,10 @@ where
return Ok(false)
}
// Get the MerkleChangeSets prune checkpoints, which will be used to determine the lower
// bound.
let prune_checkpoint = provider.get_prune_checkpoint(PruneSegment::MerkleChangeSets)?;
// Extract the lower bound from prune checkpoint if available.
//
// If not available we assume pruning has never ran and so there is no lower bound. This
// should not generally happen, since MerkleChangeSets always have pruning enabled, but when
// starting a new node from scratch (e.g. in a test case or benchmark) it can surface.
//
// Check account history prune checkpoint to determine the lower bound of available data.
// The prune checkpoint's block_number is the highest pruned block, so data is available
// starting from the next block
// starting from the next block.
let prune_checkpoint = provider.get_prune_checkpoint(PruneSegment::AccountHistory)?;
let lower_bound = prune_checkpoint
.and_then(|chk| chk.block_number)
.map(|block_number| block_number + 1)
@@ -223,16 +226,32 @@ where
self.get_requested_block_number(provider)? &&
self.reverts_required(provider, db_tip_block, from_block)?
{
// Collect trie reverts
debug!(
target: "providers::state::overlay",
block_hash = ?self.block_hash,
from_block,
db_tip_block,
range_start = from_block + 1,
range_end = db_tip_block,
"Collecting trie reverts for overlay state provider"
);
// Collect trie reverts using changeset cache
let mut trie_reverts = {
let _guard =
debug_span!(target: "providers::state::overlay", "Retrieving trie reverts")
.entered();
let start = Instant::now();
let res = provider.trie_reverts(from_block + 1)?;
// Use changeset cache to retrieve and accumulate reverts to restore state after
// from_block
let accumulated_reverts = self
.changeset_cache
.get_or_compute_range(provider, (from_block + 1)..=db_tip_block)?;
retrieve_trie_reverts_duration = start.elapsed();
res
accumulated_reverts
};
// Collect state reverts
@@ -361,11 +380,7 @@ where
impl<F> DatabaseProviderROFactory for OverlayStateProviderFactory<F>
where
F: DatabaseProviderFactory,
F::Provider: TrieReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ BlockNumReader
+ ChangeSetReader,
F::Provider: StageCheckpointReader + PruneCheckpointReader + BlockNumReader + ChangeSetReader,
{
type Provider = OverlayStateProvider<F::Provider>;

View File

@@ -34,13 +34,12 @@ use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::{
BlockBodyIndicesProvider, BytecodeReader, DBProvider, DatabaseProviderFactory,
HashedPostStateProvider, NodePrimitivesProvider, StageCheckpointReader, StateProofProvider,
StorageRootProvider, TrieReader,
StorageRootProvider,
};
use reth_storage_errors::provider::{ConsistentViewError, ProviderError, ProviderResult};
use reth_trie::{
updates::{TrieUpdates, TrieUpdatesSorted},
AccountProof, HashedPostState, HashedStorage, MultiProof, MultiProofTargets, StorageMultiProof,
StorageProof, TrieInput,
updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
};
use std::{
collections::BTreeMap,
@@ -1001,19 +1000,6 @@ impl<T: NodePrimitives, ChainSpec: Send + Sync> StateReader for MockEthProvider<
}
}
impl<T: NodePrimitives, ChainSpec: Send + Sync> TrieReader for MockEthProvider<T, ChainSpec> {
    // The mock tracks no changeset history, so reverts are always empty.
    fn trie_reverts(&self, _from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
        Ok(Default::default())
    }

    // Likewise, no per-block trie updates are recorded; always returns empty updates.
    fn get_block_trie_updates(
        &self,
        _block_number: BlockNumber,
    ) -> ProviderResult<TrieUpdatesSorted> {
        Ok(Default::default())
    }
}
impl<T: NodePrimitives, ChainSpec: Send + Sync> CanonStateSubscriptions
for MockEthProvider<T, ChainSpec>
{

View File

@@ -4,7 +4,7 @@ use crate::{
AccountReader, BlockReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader,
DatabaseProviderFactory, HashedPostStateProvider, PruneCheckpointReader,
RocksDBProviderFactory, StageCheckpointReader, StateProviderFactory, StateReader,
StaticFileProviderFactory, TrieReader,
StaticFileProviderFactory,
};
use reth_chain_state::{
CanonStateSubscriptions, ForkChoiceSubscriptions, PersistedBlockSubscriptions,
@@ -17,11 +17,7 @@ use std::fmt::Debug;
pub trait FullProvider<N: NodeTypesWithDB>:
DatabaseProviderFactory<
DB = N::DB,
Provider: BlockReader
+ TrieReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader,
Provider: BlockReader + StageCheckpointReader + PruneCheckpointReader + ChangeSetReader,
> + NodePrimitivesProvider<Primitives = N::Primitives>
+ StaticFileProviderFactory<Primitives = N::Primitives>
+ RocksDBProviderFactory
@@ -50,11 +46,7 @@ pub trait FullProvider<N: NodeTypesWithDB>:
impl<T, N: NodeTypesWithDB> FullProvider<N> for T where
T: DatabaseProviderFactory<
DB = N::DB,
Provider: BlockReader
+ TrieReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader,
Provider: BlockReader + StageCheckpointReader + PruneCheckpointReader + ChangeSetReader,
> + NodePrimitivesProvider<Primitives = N::Primitives>
+ StaticFileProviderFactory<Primitives = N::Primitives>
+ RocksDBProviderFactory

View File

@@ -6,7 +6,7 @@ use crate::{
HashedPostStateProvider, HeaderProvider, NodePrimitivesProvider, PruneCheckpointReader,
ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader, StateProofProvider,
StateProvider, StateProviderBox, StateProviderFactory, StateReader, StateRootProvider,
StorageRootProvider, TransactionVariant, TransactionsProvider, TrieReader,
StorageRootProvider, TransactionVariant, TransactionsProvider,
};
#[cfg(feature = "db-api")]
@@ -35,9 +35,8 @@ use reth_prune_types::{PruneCheckpoint, PruneSegment};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use reth_trie_common::{
updates::{TrieUpdates, TrieUpdatesSorted},
AccountProof, HashedPostState, HashedStorage, MultiProof, MultiProofTargets, StorageMultiProof,
StorageProof, TrieInput,
updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
};
/// Supports various api interfaces for testing purposes.
@@ -646,19 +645,6 @@ impl<ChainSpec: Send + Sync, N: NodePrimitives> DBProvider for NoopProvider<Chai
}
}
impl<C: Send + Sync, N: NodePrimitives> TrieReader for NoopProvider<C, N> {
    // The no-op provider stores nothing, so there is never anything to revert.
    fn trie_reverts(&self, _from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
        Ok(Default::default())
    }

    // No blocks are ever applied, so per-block trie updates are always empty.
    fn get_block_trie_updates(
        &self,
        _block_number: BlockNumber,
    ) -> ProviderResult<TrieUpdatesSorted> {
        Ok(Default::default())
    }
}
#[cfg(feature = "db-api")]
impl<ChainSpec: Send + Sync, N: NodePrimitives> DatabaseProviderFactory
for NoopProvider<ChainSpec, N>

View File

@@ -89,20 +89,6 @@ pub trait StateProofProvider {
fn witness(&self, input: TrieInput, target: HashedPostState) -> ProviderResult<Vec<Bytes>>;
}
/// Trie Reader
///
/// Read-only access to historical trie state reconstructed from the trie changeset tables.
#[auto_impl::auto_impl(&, Box)]
pub trait TrieReader: Send {
    /// Returns the [`TrieUpdatesSorted`] for reverting the trie database to its state prior to the
    /// given block and onwards having been processed.
    fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted>;
    /// Returns the trie updates that were applied by the specified block.
    ///
    /// Each changed path maps to its value as of just after the block was processed, or `None`
    /// if the block removed the node.
    fn get_block_trie_updates(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<TrieUpdatesSorted>;
}
/// Trie Writer
#[auto_impl::auto_impl(&, Box)]
pub trait TrieWriter: Send {

View File

@@ -17,12 +17,21 @@ reth-primitives-traits.workspace = true
reth-execution-errors.workspace = true
reth-db-api.workspace = true
reth-trie.workspace = true
reth-trie-common.workspace = true
reth-storage-api = { workspace = true, features = ["db-api"] }
reth-storage-errors.workspace = true
reth-stages-types.workspace = true
reth-metrics = { workspace = true, optional = true }
# alloy
alloy-primitives.workspace = true
# misc
parking_lot.workspace = true
# metrics
metrics = { workspace = true, optional = true }
# tracing
tracing.workspace = true
@@ -50,11 +59,13 @@ serde_json.workspace = true
similar-asserts.workspace = true
[features]
metrics = ["reth-trie/metrics"]
metrics = ["reth-trie/metrics", "dep:reth-metrics", "dep:metrics"]
serde = [
"similar-asserts/serde",
"alloy-consensus/serde",
"alloy-primitives/serde",
"parking_lot/serde",
"reth-stages-types/serde",
"reth-storage-api/serde",
"reth-trie/serde",
"reth-trie-common/serde",
@@ -69,5 +80,6 @@ test-utils = [
"reth-db/test-utils",
"reth-db-api/test-utils",
"reth-provider/test-utils",
"reth-stages-types/test-utils",
"reth-trie/test-utils",
]

View File

@@ -0,0 +1,841 @@
//! Trie changeset computation and caching utilities.
//!
//! This module provides functionality to compute trie changesets for a given block,
//! which represent the old trie node values before the block was processed.
//!
//! It also provides an efficient in-memory cache for these changesets, which is essential for:
//! - **Reorg support**: Quickly access changesets to revert blocks during chain reorganizations
//! - **Memory efficiency**: Automatic eviction ensures bounded memory usage
use crate::{DatabaseHashedPostState, DatabaseStateRoot, DatabaseTrieCursorFactory};
use alloy_primitives::{map::B256Map, BlockNumber, B256};
use parking_lot::RwLock;
use reth_storage_api::{BlockNumReader, ChangeSetReader, DBProvider, StageCheckpointReader};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use reth_trie::{
changesets::compute_trie_changesets,
trie_cursor::{InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory},
HashedPostStateSorted, KeccakKeyHasher, StateRoot, TrieInputSorted,
};
use reth_trie_common::updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted};
use std::{
collections::{BTreeMap, HashMap},
ops::RangeInclusive,
sync::Arc,
time::Instant,
};
use tracing::debug;
#[cfg(feature = "metrics")]
use reth_metrics::{
metrics::{Counter, Gauge},
Metrics,
};
/// Computes trie changesets for a block.
///
/// # Algorithm
///
/// For block N:
/// 1. Query cumulative `HashedPostState` revert for block N-1 (from db tip to after N-1)
/// 2. Use that to calculate cumulative `TrieUpdates` revert for block N-1
/// 3. Query per-block `HashedPostState` revert for block N
/// 4. Create prefix sets from the per-block revert (step 3)
/// 5. Create overlay with cumulative trie updates and cumulative state revert for N-1
/// 6. Calculate trie updates for block N using the overlay and per-block `HashedPostState`.
/// 7. Compute changesets using the N-1 overlay and the newly calculated trie updates for N
///
/// # Arguments
///
/// * `provider` - Database provider with changeset access
/// * `block_number` - Block number to compute changesets for
///
/// # Returns
///
/// Changesets (old trie node values) for the specified block
///
/// # Errors
///
/// Returns error if:
/// - Block number exceeds database tip (based on Finish stage checkpoint)
/// - Database access fails
/// - State root computation fails
pub fn compute_block_trie_changesets<Provider>(
    provider: &Provider,
    block_number: BlockNumber,
) -> Result<TrieUpdatesSorted, ProviderError>
where
    Provider: DBProvider + StageCheckpointReader + ChangeSetReader + BlockNumReader,
{
    debug!(
        target: "trie::changeset_cache",
        block_number,
        "Computing block trie changesets from database state"
    );
    // Steps 1 & 3: Collect/calculate state reverts.
    // The changes made by this specific block only.
    let individual_state_revert = HashedPostStateSorted::from_reverts::<KeccakKeyHasher>(
        provider,
        block_number..=block_number,
    )?;
    // Reverts all changes from db tip back to just after `block_number` was processed.
    let cumulative_state_revert =
        HashedPostStateSorted::from_reverts::<KeccakKeyHasher>(provider, (block_number + 1)..)?;
    // Reverts all changes from db tip back to just after `block_number - 1` was processed.
    let mut cumulative_state_revert_prev = cumulative_state_revert.clone();
    cumulative_state_revert_prev.extend_ref(&individual_state_revert);
    // Step 2: Calculate cumulative trie updates revert for block N-1.
    // This gives us the trie state as it was after block N-1 was processed.
    let prefix_sets_prev = cumulative_state_revert_prev.construct_prefix_sets();
    let input_prev = TrieInputSorted::new(
        Arc::default(),
        Arc::new(cumulative_state_revert_prev),
        prefix_sets_prev,
    );
    let cumulative_trie_updates_prev =
        StateRoot::overlay_root_from_nodes_with_updates(provider.tx_ref(), input_prev)
            .map_err(ProviderError::other)?
            .1
            .into_sorted();
    // Wrap in an `Arc` once so the N-1 overlay can be shared with `TrieInputSorted` below
    // and then reused by the changeset cursor factory without a deep clone of the updates.
    let cumulative_trie_updates_prev = Arc::new(cumulative_trie_updates_prev);
    // Step 4: Create prefix sets from the individual revert (only paths changed by this block)
    let prefix_sets = individual_state_revert.construct_prefix_sets();
    // Steps 5 & 6: Calculate trie updates for block N, using the cumulative trie updates for
    // block N-1 as the node overlay and the cumulative state revert for block N.
    let input = TrieInputSorted::new(
        Arc::clone(&cumulative_trie_updates_prev),
        Arc::new(cumulative_state_revert),
        prefix_sets,
    );
    let trie_updates = StateRoot::overlay_root_from_nodes_with_updates(provider.tx_ref(), input)
        .map_err(ProviderError::other)?
        .1
        .into_sorted();
    // Step 7: Compute changesets using cumulative trie updates for block N-1 as overlay.
    // The overlay cursor factory sees the trie state from just after block N-1.
    let db_cursor_factory = DatabaseTrieCursorFactory::new(provider.tx_ref());
    let overlay_factory =
        InMemoryTrieCursorFactory::new(db_cursor_factory, &cumulative_trie_updates_prev);
    let changesets =
        compute_trie_changesets(&overlay_factory, &trie_updates).map_err(ProviderError::other)?;
    debug!(
        target: "trie::changeset_cache",
        block_number,
        num_account_nodes = changesets.account_nodes_ref().len(),
        num_storage_tries = changesets.storage_tries_ref().len(),
        "Computed block trie changesets successfully"
    );
    Ok(changesets)
}
/// Computes block trie updates using the changeset cache.
///
/// # Algorithm
///
/// For block N:
/// 1. Look up the hash of block N (the cache key)
/// 2. Get the trie changesets for block N from the cache (computing them if missing)
/// 3. Get cumulative trie reverts from block N+1 to db tip using the cache
/// 4. Create an overlay cursor factory with these reverts (representing trie state after block N)
/// 5. For each changed account path, look up its current value through the overlay cursor
/// 6. For each changed storage path, look up its current value through the overlay cursor
/// 7. Return the collected trie updates
///
/// # Arguments
///
/// * `cache` - Handle to the changeset cache for retrieving trie reverts
/// * `provider` - Database provider for accessing changesets and block data
/// * `block_number` - Block number to compute trie updates for
///
/// # Returns
///
/// Trie updates representing the state of trie nodes after the block was processed
///
/// # Errors
///
/// Returns error if:
/// - Block number exceeds database tip
/// - Database access fails
/// - Cache retrieval fails
pub fn compute_block_trie_updates<Provider>(
    cache: &ChangesetCache,
    provider: &Provider,
    block_number: BlockNumber,
) -> ProviderResult<TrieUpdatesSorted>
where
    Provider: DBProvider + StageCheckpointReader + ChangeSetReader + BlockNumReader,
{
    let tx = provider.tx_ref();
    // The `Finish` stage checkpoint marks the database tip; if it is absent there are no
    // persisted changesets at all, hence the empty `available` range in the error.
    let db_tip_block = provider
        .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
        .as_ref()
        .map(|chk| chk.block_number)
        .ok_or_else(|| ProviderError::InsufficientChangesets {
            requested: block_number,
            available: 0..=0,
        })?;
    // Step 1: Get the block hash for the target block (used as the cache key below)
    let block_hash = provider.block_hash(block_number)?.ok_or_else(|| {
        ProviderError::other(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("block hash not found for block number {}", block_number),
        ))
    })?;
    // Step 2: Get the trie changesets for the target block from cache
    let changesets = cache.get_or_compute(block_hash, block_number, provider)?;
    // Step 3: Get the trie reverts for the state after the target block using the cache
    let reverts = cache.get_or_compute_range(provider, (block_number + 1)..=db_tip_block)?;
    // Step 4: Create an InMemoryTrieCursorFactory with the reverts.
    // This gives us the trie state as it was after the target block was processed.
    let db_cursor_factory = DatabaseTrieCursorFactory::new(tx);
    let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);
    // Step 5: Collect all account trie nodes that changed in the target block.
    // The changesets hold the *old* (pre-block) values; here each changed path is resolved
    // to its *current* value as seen through the overlay, i.e. the post-block value.
    let mut account_nodes = Vec::new();
    let mut account_cursor = cursor_factory.account_trie_cursor()?;
    // Iterate over the account nodes from the changesets
    for (nibbles, _old_node) in changesets.account_nodes_ref() {
        // `None` means the node does not exist at this point in history.
        let node_value = account_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
        account_nodes.push((*nibbles, node_value));
    }
    // Step 6: Collect all storage trie nodes that changed in the target block
    let mut storage_tries = B256Map::default();
    // Iterate over the storage tries from the changesets
    for (hashed_address, storage_changeset) in changesets.storage_tries_ref() {
        let mut storage_cursor = cursor_factory.storage_trie_cursor(*hashed_address)?;
        let mut storage_nodes = Vec::new();
        // Resolve the post-block value of each changed storage path, as above.
        for (nibbles, _old_node) in storage_changeset.storage_nodes_ref() {
            let node_value = storage_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
            storage_nodes.push((*nibbles, node_value));
        }
        storage_tries.insert(
            *hashed_address,
            StorageTrieUpdatesSorted { storage_nodes, is_deleted: storage_changeset.is_deleted },
        );
    }
    Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
}
/// Thread-safe changeset cache.
///
/// This type wraps a shared, mutable reference to the cache inner.
/// The `RwLock` enables concurrent reads while ensuring exclusive access for writes.
///
/// Cloning is cheap: clones share the same underlying cache state via the `Arc`.
#[derive(Debug, Clone)]
pub struct ChangesetCache {
    /// Shared inner state guarded by a `parking_lot` `RwLock`.
    inner: Arc<RwLock<ChangesetCacheInner>>,
}
impl Default for ChangesetCache {
fn default() -> Self {
Self::new()
}
}
impl ChangesetCache {
    /// Creates a new cache.
    ///
    /// The cache has no capacity limit and relies on explicit eviction
    /// via the `evict()` method to manage memory usage.
    pub fn new() -> Self {
        Self { inner: Arc::new(RwLock::new(ChangesetCacheInner::new())) }
    }

    /// Retrieves changesets for a block by hash.
    ///
    /// Returns `None` if the block is not in the cache (either evicted or never computed).
    /// Updates hit/miss metrics accordingly.
    pub fn get(&self, block_hash: &B256) -> Option<Arc<TrieUpdatesSorted>> {
        self.inner.read().get(block_hash)
    }

    /// Inserts changesets for a block into the cache.
    ///
    /// This method does not perform any eviction. Eviction must be explicitly
    /// triggered by calling `evict()`.
    ///
    /// # Arguments
    ///
    /// * `block_hash` - Hash of the block
    /// * `block_number` - Block number for tracking and eviction
    /// * `changesets` - Trie changesets to cache
    pub fn insert(&self, block_hash: B256, block_number: u64, changesets: Arc<TrieUpdatesSorted>) {
        self.inner.write().insert(block_hash, block_number, changesets)
    }

    /// Evicts changesets for blocks below the given block number.
    ///
    /// This should be called after blocks are persisted to the database to free
    /// memory for changesets that are no longer needed in the cache.
    ///
    /// # Arguments
    ///
    /// * `up_to_block` - Evict blocks with number < this value. Blocks with number >= this value
    ///   are retained.
    pub fn evict(&self, up_to_block: BlockNumber) {
        self.inner.write().evict(up_to_block)
    }

    /// Gets changesets from cache, or computes them on-the-fly if missing.
    ///
    /// This is the primary API for retrieving changesets. On cache miss,
    /// it computes changesets from the database state and populates the cache.
    ///
    /// Note: concurrent callers that miss on the same block may each compute the
    /// changesets; the duplicate insert is harmless since the computed value is
    /// deterministic for a given block.
    ///
    /// # Arguments
    ///
    /// * `block_hash` - Hash of the block to get changesets for
    /// * `block_number` - Block number (for cache insertion and logging)
    /// * `provider` - Database provider for DB access
    ///
    /// # Returns
    ///
    /// Changesets for the block, either from cache or computed on-the-fly
    pub fn get_or_compute<P>(
        &self,
        block_hash: B256,
        block_number: u64,
        provider: &P,
    ) -> ProviderResult<Arc<TrieUpdatesSorted>>
    where
        P: DBProvider + StageCheckpointReader + ChangeSetReader + BlockNumReader,
    {
        // Fast path: cache hit (takes the read lock only, via `get`).
        if let Some(changesets) = self.get(&block_hash) {
            debug!(
                target: "trie::changeset_cache",
                ?block_hash,
                block_number,
                "Changeset cache HIT"
            );
            return Ok(changesets);
        }
        // Cache miss - compute from database
        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            "Changeset cache MISS, computing from database"
        );
        let start = Instant::now();
        // `compute_block_trie_changesets` already returns `ProviderError`, so propagate it
        // directly; re-wrapping via `ProviderError::other` would erase the concrete variant.
        let changesets = Arc::new(compute_block_trie_changesets(provider, block_number)?);
        let elapsed = start.elapsed();
        debug!(
            target: "trie::changeset_cache",
            ?elapsed,
            block_number,
            ?block_hash,
            "Changeset computed from database and inserting into cache"
        );
        // Store in cache (with write lock)
        self.inner.write().insert(block_hash, block_number, Arc::clone(&changesets));
        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            "Changeset successfully cached"
        );
        Ok(changesets)
    }

    /// Gets or computes accumulated trie reverts for a range of blocks.
    ///
    /// This method retrieves and accumulates all trie changesets (reverts) for the specified
    /// block range (inclusive). The changesets are accumulated in reverse order (newest to oldest)
    /// so that older values take precedence when there are conflicts.
    ///
    /// # Arguments
    ///
    /// * `provider` - Database provider for DB access and block lookups
    /// * `range` - Block range to accumulate reverts for (inclusive)
    ///
    /// # Returns
    ///
    /// Accumulated trie reverts for all blocks in the specified range
    ///
    /// # Errors
    ///
    /// Returns error if:
    /// - Any block in the range is beyond the database tip
    /// - Database access fails
    /// - Block hash lookup fails
    /// - Changeset computation fails
    pub fn get_or_compute_range<P>(
        &self,
        provider: &P,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<TrieUpdatesSorted>
    where
        P: DBProvider + StageCheckpointReader + ChangeSetReader + BlockNumReader,
    {
        // Get the database tip block number from the `Finish` stage checkpoint
        let db_tip_block = provider
            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
            .as_ref()
            .map(|chk| chk.block_number)
            .ok_or_else(|| ProviderError::InsufficientChangesets {
                requested: *range.start(),
                available: 0..=0,
            })?;
        let start_block = *range.start();
        let end_block = *range.end();
        // If range end is beyond the tip, return an error
        if end_block > db_tip_block {
            return Err(ProviderError::InsufficientChangesets {
                requested: end_block,
                available: 0..=db_tip_block,
            });
        }
        let timer = Instant::now();
        debug!(
            target: "trie::changeset_cache",
            start_block,
            end_block,
            db_tip_block,
            "Starting get_or_compute_range"
        );
        // Use changeset cache to retrieve and accumulate reverts block by block.
        // Iterate in reverse order (newest to oldest) so that older changesets
        // take precedence when there are conflicting updates.
        let mut accumulated_reverts = TrieUpdatesSorted::default();
        for block_number in range.rev() {
            // Get the block hash for this block number
            let block_hash = provider.block_hash(block_number)?.ok_or_else(|| {
                ProviderError::other(std::io::Error::new(
                    std::io::ErrorKind::NotFound,
                    format!("block hash not found for block number {}", block_number),
                ))
            })?;
            debug!(
                target: "trie::changeset_cache",
                block_number,
                ?block_hash,
                "Looked up block hash for block number in range"
            );
            // Get changesets from cache (or compute on-the-fly)
            let changesets = self.get_or_compute(block_hash, block_number, provider)?;
            // Overlay this block's changesets on top of accumulated reverts.
            // Since we iterate newest to oldest, older values are added last
            // and overwrite any conflicting newer values (oldest changeset values take
            // precedence).
            accumulated_reverts.extend_ref(&changesets);
        }
        let elapsed = timer.elapsed();
        let num_account_nodes = accumulated_reverts.account_nodes_ref().len();
        let num_storage_tries = accumulated_reverts.storage_tries_ref().len();
        debug!(
            target: "trie::changeset_cache",
            ?elapsed,
            start_block,
            end_block,
            num_blocks = end_block.saturating_sub(start_block).saturating_add(1),
            num_account_nodes,
            num_storage_tries,
            "Finished accumulating trie reverts for block range"
        );
        Ok(accumulated_reverts)
    }
}
/// In-memory cache for trie changesets with explicit eviction policy.
///
/// Holds changesets for blocks that have been validated but not yet persisted.
/// Keyed by block hash for fast lookup during reorgs. Eviction is controlled
/// explicitly by the engine API tree handler when persistence completes.
///
/// ## Eviction Policy
///
/// Unlike traditional caches with automatic eviction, this cache requires explicit
/// eviction calls. The engine API tree handler calls `evict(block_number)` after
/// blocks are persisted to the database, ensuring changesets remain available
/// until their corresponding blocks are safely on disk.
///
/// ## Metrics
///
/// The cache maintains several metrics for observability:
/// - `hits`: Number of successful cache lookups
/// - `misses`: Number of failed cache lookups
/// - `evictions`: Number of blocks evicted
/// - `size`: Current number of cached blocks
#[derive(Debug)]
struct ChangesetCacheInner {
    /// Cache entries: block hash -> (block number, changesets).
    /// Kept in sync with `block_numbers` by `insert` and `evict`.
    entries: HashMap<B256, (u64, Arc<TrieUpdatesSorted>)>,
    /// Block number to hashes mapping for eviction; a number can map to multiple
    /// hashes when side-chain blocks share the same height.
    block_numbers: BTreeMap<u64, Vec<B256>>,
    /// Metrics for monitoring cache behavior
    #[cfg(feature = "metrics")]
    metrics: ChangesetCacheMetrics,
}
#[cfg(feature = "metrics")]
/// Metrics for the changeset cache.
///
/// Registered under the `trie.changeset_cache` scope. These metrics provide
/// visibility into cache performance and help identify potential issues like
/// high miss rates.
#[derive(Metrics, Clone)]
#[metrics(scope = "trie.changeset_cache")]
struct ChangesetCacheMetrics {
    /// Cache hit counter
    hits: Counter,
    /// Cache miss counter
    misses: Counter,
    /// Eviction counter (counts individual evicted entries)
    evictions: Counter,
    /// Current cache size (number of entries)
    size: Gauge,
}
impl Default for ChangesetCacheInner {
    /// Equivalent to [`ChangesetCacheInner::new`]: an empty cache with no capacity limit.
    fn default() -> Self {
        Self::new()
    }
}
impl ChangesetCacheInner {
    /// Creates a new empty changeset cache.
    ///
    /// The cache has no capacity limit and relies on explicit eviction
    /// via the `evict()` method to manage memory usage.
    fn new() -> Self {
        Self {
            entries: HashMap::new(),
            block_numbers: BTreeMap::new(),
            #[cfg(feature = "metrics")]
            metrics: Default::default(),
        }
    }

    /// Looks up the changesets for a block hash, recording a hit or miss metric.
    fn get(&self, block_hash: &B256) -> Option<Arc<TrieUpdatesSorted>> {
        match self.entries.get(block_hash) {
            Some((_, changesets)) => {
                #[cfg(feature = "metrics")]
                self.metrics.hits.increment(1);
                Some(Arc::clone(changesets))
            }
            None => {
                #[cfg(feature = "metrics")]
                self.metrics.misses.increment(1);
                None
            }
        }
    }

    /// Inserts an entry, keeping `entries` and `block_numbers` in sync.
    fn insert(&mut self, block_hash: B256, block_number: u64, changesets: Arc<TrieUpdatesSorted>) {
        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            cache_size_before = self.entries.len(),
            "Inserting changeset into cache"
        );
        // Insert the entry
        self.entries.insert(block_hash, (block_number, changesets));
        // Add block hash to block_numbers mapping
        self.block_numbers.entry(block_number).or_default().push(block_hash);
        // Update size metric
        #[cfg(feature = "metrics")]
        self.metrics.size.set(self.entries.len() as f64);
        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            cache_size_after = self.entries.len(),
            "Changeset inserted into cache"
        );
    }

    /// Removes all entries whose block number is strictly below `up_to_block`.
    fn evict(&mut self, up_to_block: BlockNumber) {
        debug!(
            target: "trie::changeset_cache",
            up_to_block,
            cache_size_before = self.entries.len(),
            "Starting cache eviction"
        );
        // Find all block numbers that should be evicted (< up_to_block)
        let blocks_to_evict: Vec<u64> =
            self.block_numbers.range(..up_to_block).map(|(num, _)| *num).collect();
        // Number of individual entries removed. Used by the final debug log regardless of
        // the `metrics` feature, so a single unconditional declaration suffices (typed `u64`
        // to match `Counter::increment` without a cast).
        let mut evicted_count: u64 = 0;
        for block_number in &blocks_to_evict {
            if let Some(hashes) = self.block_numbers.remove(block_number) {
                debug!(
                    target: "trie::changeset_cache",
                    block_number,
                    num_hashes = hashes.len(),
                    "Evicting block from cache"
                );
                for hash in hashes {
                    if self.entries.remove(&hash).is_some() {
                        evicted_count += 1;
                    }
                }
            }
        }
        debug!(
            target: "trie::changeset_cache",
            up_to_block,
            evicted_count,
            cache_size_after = self.entries.len(),
            "Finished cache eviction"
        );
        // Update metrics if we evicted anything
        #[cfg(feature = "metrics")]
        if evicted_count > 0 {
            self.metrics.evictions.increment(evicted_count);
            self.metrics.size.set(self.entries.len() as f64);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::map::B256Map;

    /// Builds an empty `TrieUpdatesSorted` payload for exercising the cache.
    fn create_test_changesets() -> Arc<TrieUpdatesSorted> {
        Arc::new(TrieUpdatesSorted::new(Vec::new(), B256Map::default()))
    }

    #[test]
    fn test_insert_and_retrieve_single_entry() {
        let mut cache = ChangesetCacheInner::new();
        let block_hash = B256::random();

        cache.insert(block_hash, 100, create_test_changesets());

        // The entry must be retrievable and counted.
        assert!(cache.get(&block_hash).is_some());
        assert_eq!(cache.entries.len(), 1);
    }

    #[test]
    fn test_insert_multiple_entries() {
        let mut cache = ChangesetCacheInner::new();

        // Populate ten consecutive blocks starting at 100.
        let hashes: Vec<B256> = (0..10u64)
            .map(|offset| {
                let hash = B256::random();
                cache.insert(hash, 100 + offset, create_test_changesets());
                hash
            })
            .collect();

        // Every inserted block must still be resolvable.
        assert_eq!(cache.entries.len(), 10);
        assert!(hashes.iter().all(|hash| cache.get(hash).is_some()));
    }

    #[test]
    fn test_eviction_when_explicitly_called() {
        let mut cache = ChangesetCacheInner::new();

        // Blocks 0..=14.
        let hashes: Vec<(u64, B256)> = (0..15u64)
            .map(|number| {
                let hash = B256::random();
                cache.insert(hash, number, create_test_changesets());
                (number, hash)
            })
            .collect();

        // Nothing is evicted automatically.
        assert_eq!(cache.entries.len(), 15);

        // Drop everything below block 4.
        cache.evict(4);
        assert_eq!(cache.entries.len(), 11); // blocks 4..=14 remain

        for (number, hash) in &hashes {
            if *number < 4 {
                assert!(cache.get(hash).is_none(), "Block {} should be evicted", number);
            } else {
                assert!(cache.get(hash).is_some(), "Block {} should be present", number);
            }
        }
    }

    #[test]
    fn test_eviction_with_persistence_watermark() {
        let mut cache = ChangesetCacheInner::new();

        // Blocks 100..=165.
        let hashes: std::collections::HashMap<u64, B256> = (100..=165u64)
            .map(|number| {
                let hash = B256::random();
                cache.insert(hash, number, create_test_changesets());
                (number, hash)
            })
            .collect();

        // Nothing is evicted automatically.
        assert_eq!(cache.entries.len(), 66);

        // Persisted up to block 164 with a 64-block retention window: threshold 100.
        cache.evict(100);
        assert_eq!(cache.entries.len(), 66);

        // Persisted up to block 165: threshold 101 drops block 100 only.
        cache.evict(101);
        assert_eq!(cache.entries.len(), 65);
        assert!(cache.get(&hashes[&100]).is_none());
        assert!(cache.get(&hashes[&101]).is_some());
    }

    #[test]
    fn test_out_of_order_inserts_with_explicit_eviction() {
        let mut cache = ChangesetCacheInner::new();

        // Insert heights out of order.
        let heights = [10u64, 5, 15, 3];
        let hashes: Vec<B256> = heights
            .iter()
            .map(|&number| {
                let hash = B256::random();
                cache.insert(hash, number, create_test_changesets());
                hash
            })
            .collect();

        // All four present before any explicit eviction.
        assert_eq!(cache.entries.len(), 4);

        // Drop everything below height 5.
        cache.evict(5);
        assert!(cache.get(&hashes[3]).is_none(), "Block 3 should be evicted");
        assert!(cache.get(&hashes[1]).is_some(), "Block 5 should be present");
        assert!(cache.get(&hashes[0]).is_some(), "Block 10 should be present");
        assert!(cache.get(&hashes[2]).is_some(), "Block 15 should be present");
    }

    #[test]
    fn test_multiple_blocks_same_number() {
        let mut cache = ChangesetCacheInner::new();

        // Two competing blocks at the same height (side chains).
        let first = B256::random();
        let second = B256::random();
        cache.insert(first, 100, create_test_changesets());
        cache.insert(second, 100, create_test_changesets());

        // Both must be independently retrievable.
        assert!(cache.get(&first).is_some());
        assert!(cache.get(&second).is_some());
        assert_eq!(cache.entries.len(), 2);
    }

    #[test]
    fn test_eviction_removes_all_side_chains() {
        let mut cache = ChangesetCacheInner::new();

        // Three side-chain blocks at height 10 plus one block at height 20.
        let side_chain: Vec<B256> = (0..3)
            .map(|_| {
                let hash = B256::random();
                cache.insert(hash, 10, create_test_changesets());
                hash
            })
            .collect();
        let tip = B256::random();
        cache.insert(tip, 20, create_test_changesets());
        assert_eq!(cache.entries.len(), 4);

        // Evicting below 15 must clear every block at height 10.
        cache.evict(15);
        assert_eq!(cache.entries.len(), 1);
        assert!(side_chain.iter().all(|hash| cache.get(hash).is_none()));
        assert!(cache.get(&tip).is_some());
    }
}

View File

@@ -2,6 +2,8 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
mod changesets;
pub use changesets::*;
mod hashed_cursor;
mod prefix_set;
mod proof;

View File

@@ -38,7 +38,8 @@ pub fn calculate_state_root(c: &mut Criterion) {
provider_rw.commit().unwrap();
}
let factory = OverlayStateProviderFactory::new(provider_factory.clone());
let changeset_cache = reth_trie_db::ChangesetCache::new();
let factory = OverlayStateProviderFactory::new(provider_factory.clone(), changeset_cache);
// state root
group.bench_function(BenchmarkId::new("sync root", size), |b| {

View File

@@ -322,7 +322,9 @@ mod tests {
let rt = Runtime::new().unwrap();
let factory = reth_provider::providers::OverlayStateProviderFactory::new(factory);
let changeset_cache = reth_trie_db::ChangesetCache::new();
let factory =
reth_provider::providers::OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(factory);
let proof_worker_handle =
ProofWorkerHandle::new(rt.handle().clone(), task_ctx, 1, 1, false);

View File

@@ -1726,8 +1726,11 @@ mod tests {
runtime.block_on(async {
let handle = tokio::runtime::Handle::current();
let provider_factory = create_test_provider_factory();
let factory =
reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
let changeset_cache = reth_trie_db::ChangesetCache::new();
let factory = reth_provider::providers::OverlayStateProviderFactory::new(
provider_factory,
changeset_cache,
);
let ctx = test_ctx(factory);
let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3, false);

View File

@@ -298,8 +298,11 @@ mod tests {
#[tokio::test]
async fn random_parallel_root() {
let factory = create_test_provider_factory();
let mut overlay_factory =
reth_provider::providers::OverlayStateProviderFactory::new(factory.clone());
let changeset_cache = reth_trie_db::ChangesetCache::new();
let mut overlay_factory = reth_provider::providers::OverlayStateProviderFactory::new(
factory.clone(),
changeset_cache,
);
let mut rng = rand::rng();
let mut state = (0..100)

View File

@@ -0,0 +1,476 @@
//! Trie changeset computation.
//!
//! This module provides functionality to compute trie changesets from trie updates.
//! Changesets represent the old values of trie nodes before a block was applied,
//! enabling reorgs by reverting blocks to their previous state.
//!
//! ## Overview
//!
//! When a block is executed, the trie is updated with new node values. To support
//! chain reorganizations, we need to preserve the old values that existed before
//! the block was applied. These old values are called "changesets".
//!
//! ## Usage
//!
//! The primary function is `compute_trie_changesets`, which takes:
//! - A `TrieCursorFactory` for reading current trie state
//! - `TrieUpdatesSorted` containing the new node values
//!
//! And returns `TrieUpdatesSorted` containing the old node values.
use crate::trie_cursor::TrieCursorIter;
use alloy_primitives::{map::B256Map, B256};
use itertools::{merge_join_by, EitherOrBoth};
use reth_storage_errors::db::DatabaseError;
use reth_trie_common::{
updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
BranchNodeCompact, Nibbles,
};
use std::cmp::Ordering;
use crate::trie_cursor::{TrieCursor, TrieCursorFactory, TrieStorageCursor};
/// Result type for changeset operations, failing with a [`DatabaseError`].
pub type ChangesetResult<T> = Result<T, DatabaseError>;
/// Computes trie changesets by looking up current node values from the trie.
///
/// Takes the new trie updates and queries the trie for the old values of
/// changed nodes. Returns changesets representing the state before the block
/// was applied, suitable for reorg operations.
///
/// # Arguments
///
/// * `factory` - Trie cursor factory for reading current trie state
/// * `trie_updates` - New trie node values produced by state root computation
///
/// # Returns
///
/// `TrieUpdatesSorted` containing old node values (before this block)
pub fn compute_trie_changesets<Factory>(
    factory: &Factory,
    trie_updates: &TrieUpdatesSorted,
) -> ChangesetResult<TrieUpdatesSorted>
where
    Factory: TrieCursorFactory,
{
    // Old values of every changed account trie node.
    let account_nodes = compute_account_changesets(factory, trie_updates)?;

    // Old values of every changed storage trie node, grouped per account.
    // A single storage cursor is created up front and re-pointed for each
    // account instead of allocating a fresh cursor per storage trie.
    let mut storage_tries = B256Map::default();
    let mut storage_cursor = factory.storage_trie_cursor(B256::default())?;
    for (hashed_address, storage_updates) in trie_updates.storage_tries_ref() {
        storage_cursor.set_hashed_address(*hashed_address);
        let wiped = storage_updates.is_deleted();
        let storage_nodes = if wiped {
            // The whole storage trie is going away: capture every existing node.
            compute_wiped_storage_changesets(&mut storage_cursor, storage_updates)?
        } else {
            compute_storage_changesets(&mut storage_cursor, storage_updates)?
        };
        // Accounts whose storage produced no changeset entries are omitted.
        if !storage_nodes.is_empty() {
            storage_tries.insert(
                *hashed_address,
                StorageTrieUpdatesSorted { is_deleted: wiped, storage_nodes },
            );
        }
    }

    Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
}
/// Computes account trie changesets.
///
/// For every account trie path touched by `trie_updates`, records the value the
/// node currently holds (i.e. the pre-block value), or `None` if the node does
/// not currently exist. Output order matches the input order, which is already
/// sorted by path.
fn compute_account_changesets<Factory>(
    factory: &Factory,
    trie_updates: &TrieUpdatesSorted,
) -> ChangesetResult<Vec<(Nibbles, Option<BranchNodeCompact>)>>
where
    Factory: TrieCursorFactory,
{
    let changed_nodes = trie_updates.account_nodes_ref();
    let mut cursor = factory.account_trie_cursor()?;
    let mut changesets = Vec::with_capacity(changed_nodes.len());
    // Record the current (pre-block) value of every changed path;
    // sorted input yields sorted output.
    for (path, _) in changed_nodes {
        let previous = cursor.seek_exact(*path)?.map(|entry| entry.1);
        changesets.push((*path, previous));
    }
    Ok(changesets)
}
/// Computes storage trie changesets for a single account.
///
/// Looks up the current value for each changed storage node path and returns
/// a vector of (path, `old_node`) pairs. The result is already sorted since
/// `storage_updates.storage_nodes` is sorted.
///
/// # Arguments
///
/// * `cursor` - Reusable storage trie cursor; the caller must already have pointed it at this
///   account's storage trie via `set_hashed_address`.
/// * `storage_updates` - Storage trie updates for this account
fn compute_storage_changesets(
    cursor: &mut impl TrieStorageCursor,
    storage_updates: &StorageTrieUpdatesSorted,
) -> ChangesetResult<Vec<(Nibbles, Option<BranchNodeCompact>)>> {
    let mut storage_changesets = Vec::with_capacity(storage_updates.storage_nodes.len());
    // For each changed storage node, look up its current value
    // The input is already sorted, so the output will be sorted
    for (path, _new_node) in &storage_updates.storage_nodes {
        let old_node = cursor.seek_exact(*path)?.map(|(_path, node)| node);
        storage_changesets.push((*path, old_node));
    }
    Ok(storage_changesets)
}
/// Handles wiped storage trie changeset computation.
///
/// When an account's storage is completely wiped (e.g., account is destroyed),
/// we need to capture not just the changed nodes, but ALL existing nodes in
/// the storage trie, since they all will be deleted.
///
/// This uses an iterator-based approach to avoid allocating an intermediate Vec.
/// It merges two sorted iterators:
/// - The paths changed by the block (`storage_updates.storage_nodes`)
/// - All existing nodes in the storage trie (via `TrieCursorIter`)
///
/// # Arguments
///
/// * `cursor` - Storage trie cursor; the caller must already have pointed it at this account's
///   storage trie via `set_hashed_address`.
/// * `storage_updates` - Storage trie updates for this account
fn compute_wiped_storage_changesets(
    cursor: &mut impl TrieStorageCursor,
    storage_updates: &StorageTrieUpdatesSorted,
) -> ChangesetResult<Vec<(Nibbles, Option<BranchNodeCompact>)>> {
    // Iterator over every node currently stored in this account's storage trie.
    let all_nodes = TrieCursorIter::new(cursor);
    // Merge the changed paths with the existing nodes and collect; the fallible
    // `collect` short-circuits on the first database error.
    storage_trie_wiped_changeset_iter(
        storage_updates.storage_nodes.iter().map(|e| e.0),
        all_nodes,
    )?
    .collect()
}
/// Returns an iterator which produces the changeset values for an account whose storage was wiped
/// during a block.
///
/// ## Arguments
///
/// - `changed_paths` is an iterator over the paths of all trie nodes modified by the block,
///   ordered by path.
/// - `all_nodes` is an iterator over all existing trie nodes for the account, ordered by path.
///
/// ## Returns
///
/// An iterator of trie node paths and a `Some(node)` (indicating the node was wiped) or a `None`
/// (indicating the node was modified in the block but didn't previously exist). The iterator's
/// results will be ordered by path.
pub fn storage_trie_wiped_changeset_iter(
    changed_paths: impl Iterator<Item = Nibbles>,
    all_nodes: impl Iterator<Item = Result<(Nibbles, BranchNodeCompact), DatabaseError>>,
) -> Result<
    impl Iterator<Item = Result<(Nibbles, Option<BranchNodeCompact>), DatabaseError>>,
    DatabaseError,
> {
    // Wrap existing nodes in `Some` so both merge sides have the same value shape.
    let all_nodes = all_nodes.map(|e| e.map(|(nibbles, node)| (nibbles, Some(node))));
    // Errors from `all_nodes` compare as `Greater`, so they surface as `Right` items
    // below and are propagated to the consumer unchanged.
    let merged = merge_join_by(changed_paths, all_nodes, |a, b| match (a, b) {
        (_, Err(_)) => Ordering::Greater,
        (a, Ok(b)) => a.cmp(&b.0),
    });
    Ok(merged.map(|either_or| match either_or {
        EitherOrBoth::Left(changed) => {
            // A path of a changed node which was not found in the database. The current value of
            // this path must be None, otherwise it would have also been returned by the `all_nodes`
            // iter.
            Ok((changed, None))
        }
        EitherOrBoth::Right(wiped) => {
            // A node was found in the db (indicating it was wiped) but was not a changed node.
            // Return it as-is.
            wiped
        }
        EitherOrBoth::Both(_changed, wiped) => {
            // A path of a changed node was found with a previous value in the database. If the
            // changed node had no previous value (None) it wouldn't be returned by `all_nodes` and
            // so would be in the Left branch.
            //
            // Due to the ordering closure passed to `merge_join_by` it's not possible for `wiped`
            // to be an error here.
            debug_assert!(wiped.is_ok(), "unreachable error condition: {wiped:?}");
            wiped
        }
    }))
}
#[cfg(test)]
mod tests {
    //! Tests for trie changeset computation, covering empty updates, account-level
    //! changesets, storage changesets, and the wiped-storage merge path.
    use super::*;
    use crate::trie_cursor::mock::MockTrieCursorFactory;
    use alloy_primitives::map::B256Map;
    use reth_trie_common::updates::StorageTrieUpdatesSorted;
    use std::collections::BTreeMap;

    /// Empty updates against an empty trie must produce empty changesets.
    #[test]
    fn test_empty_updates() {
        // Create an empty mock factory
        // Note: We need to include B256::default() in storage_tries because
        // compute_trie_changesets creates cursors for it upfront
        let mut storage_tries = B256Map::default();
        storage_tries.insert(B256::default(), BTreeMap::new());
        let factory = MockTrieCursorFactory::new(BTreeMap::new(), storage_tries);
        // Create empty updates
        let updates = TrieUpdatesSorted::new(vec![], B256Map::default());
        // Compute changesets
        let changesets = compute_trie_changesets(&factory, &updates).unwrap();
        // Should produce empty changesets
        assert!(changesets.account_nodes_ref().is_empty());
        assert!(changesets.storage_tries_ref().is_empty());
    }

    /// Account-trie changesets record the previous value of each changed path:
    /// the old node for pre-existing paths, `None` for newly created paths.
    #[test]
    fn test_account_changesets() {
        // Create some initial account trie state
        let path1 = Nibbles::from_nibbles([0x1, 0x2, 0x3]);
        let path2 = Nibbles::from_nibbles([0x4, 0x5, 0x6]);
        // tree_mask and hash_mask must be subsets of state_mask
        let node1 = BranchNodeCompact::new(0b1111, 0b1010, 0, vec![], None);
        let node2 = BranchNodeCompact::new(0b1111, 0b1100, 0, vec![], None);
        let mut account_nodes = BTreeMap::new();
        account_nodes.insert(path1, node1.clone());
        account_nodes.insert(path2, node2);
        // Need to include B256::default() for cursor creation
        let mut storage_tries = B256Map::default();
        storage_tries.insert(B256::default(), BTreeMap::new());
        let factory = MockTrieCursorFactory::new(account_nodes, storage_tries);
        // Create updates that modify path1 and add a new path3
        let path3 = Nibbles::from_nibbles([0x7, 0x8, 0x9]);
        let new_node1 = BranchNodeCompact::new(0b1111, 0b0001, 0, vec![], None);
        let new_node3 = BranchNodeCompact::new(0b1111, 0b0000, 0, vec![], None);
        let updates = TrieUpdatesSorted::new(
            vec![(path1, Some(new_node1)), (path3, Some(new_node3))],
            B256Map::default(),
        );
        // Compute changesets
        let changesets = compute_trie_changesets(&factory, &updates).unwrap();
        // Check account changesets
        assert_eq!(changesets.account_nodes_ref().len(), 2);
        // path1 should have the old node1 value
        assert_eq!(changesets.account_nodes_ref()[0].0, path1);
        assert_eq!(changesets.account_nodes_ref()[0].1, Some(node1));
        // path3 should have None (it didn't exist before)
        assert_eq!(changesets.account_nodes_ref()[1].0, path3);
        assert_eq!(changesets.account_nodes_ref()[1].1, None);
    }

    /// Storage-trie changesets for a non-wiped account behave like account
    /// changesets: old values for existing paths, `None` for new ones.
    #[test]
    fn test_storage_changesets() {
        let hashed_address = B256::from([1u8; 32]);
        // Create some initial storage trie state
        let path1 = Nibbles::from_nibbles([0x1, 0x2]);
        let path2 = Nibbles::from_nibbles([0x3, 0x4]);
        let node1 = BranchNodeCompact::new(0b1111, 0b0011, 0, vec![], None);
        let node2 = BranchNodeCompact::new(0b1111, 0b0101, 0, vec![], None);
        let mut storage_nodes = BTreeMap::new();
        storage_nodes.insert(path1, node1.clone());
        storage_nodes.insert(path2, node2);
        let mut storage_tries = B256Map::default();
        storage_tries.insert(B256::default(), BTreeMap::new()); // For cursor creation
        storage_tries.insert(hashed_address, storage_nodes);
        let factory = MockTrieCursorFactory::new(BTreeMap::new(), storage_tries);
        // Create updates that modify path1 and add a new path3
        let path3 = Nibbles::from_nibbles([0x5, 0x6]);
        let new_node1 = BranchNodeCompact::new(0b1111, 0b1000, 0, vec![], None);
        let new_node3 = BranchNodeCompact::new(0b1111, 0b0000, 0, vec![], None);
        let mut storage_updates = B256Map::default();
        storage_updates.insert(
            hashed_address,
            StorageTrieUpdatesSorted {
                is_deleted: false,
                storage_nodes: vec![(path1, Some(new_node1)), (path3, Some(new_node3))],
            },
        );
        let updates = TrieUpdatesSorted::new(vec![], storage_updates);
        // Compute changesets
        let changesets = compute_trie_changesets(&factory, &updates).unwrap();
        // Check storage changesets
        assert_eq!(changesets.storage_tries_ref().len(), 1);
        let storage_changesets = changesets.storage_tries_ref().get(&hashed_address).unwrap();
        assert!(!storage_changesets.is_deleted);
        assert_eq!(storage_changesets.storage_nodes.len(), 2);
        // path1 should have the old node1 value
        assert_eq!(storage_changesets.storage_nodes[0].0, path1);
        assert_eq!(storage_changesets.storage_nodes[0].1, Some(node1));
        // path3 should have None (it didn't exist before)
        assert_eq!(storage_changesets.storage_nodes[1].0, path3);
        assert_eq!(storage_changesets.storage_nodes[1].1, None);
    }

    /// Wiping a storage trie must record EVERY existing node in the changeset,
    /// not just the ones touched by the block's updates.
    #[test]
    fn test_wiped_storage() {
        let hashed_address = B256::from([2u8; 32]);
        // Create initial storage trie with multiple nodes
        let path1 = Nibbles::from_nibbles([0x1, 0x2]);
        let path2 = Nibbles::from_nibbles([0x3, 0x4]);
        let path3 = Nibbles::from_nibbles([0x5, 0x6]);
        let node1 = BranchNodeCompact::new(0b1111, 0b0011, 0, vec![], None);
        let node2 = BranchNodeCompact::new(0b1111, 0b0101, 0, vec![], None);
        let node3 = BranchNodeCompact::new(0b1111, 0b1001, 0, vec![], None);
        let mut storage_nodes = BTreeMap::new();
        storage_nodes.insert(path1, node1.clone());
        storage_nodes.insert(path2, node2.clone());
        storage_nodes.insert(path3, node3.clone());
        let mut storage_tries = B256Map::default();
        storage_tries.insert(B256::default(), BTreeMap::new()); // For cursor creation
        storage_tries.insert(hashed_address, storage_nodes);
        let factory = MockTrieCursorFactory::new(BTreeMap::new(), storage_tries);
        // Create updates that modify path1 and mark storage as wiped
        let new_node1 = BranchNodeCompact::new(0b1111, 0b1000, 0, vec![], None);
        let mut storage_updates = B256Map::default();
        storage_updates.insert(
            hashed_address,
            StorageTrieUpdatesSorted {
                is_deleted: true,
                storage_nodes: vec![(path1, Some(new_node1))],
            },
        );
        let updates = TrieUpdatesSorted::new(vec![], storage_updates);
        // Compute changesets
        let changesets = compute_trie_changesets(&factory, &updates).unwrap();
        // Check storage changesets
        assert_eq!(changesets.storage_tries_ref().len(), 1);
        let storage_changesets = changesets.storage_tries_ref().get(&hashed_address).unwrap();
        assert!(storage_changesets.is_deleted);
        // Should include all three nodes (changed path1 + wiped path2 and path3)
        assert_eq!(storage_changesets.storage_nodes.len(), 3);
        // All paths should be present in sorted order
        assert_eq!(storage_changesets.storage_nodes[0].0, path1);
        assert_eq!(storage_changesets.storage_nodes[1].0, path2);
        assert_eq!(storage_changesets.storage_nodes[2].0, path3);
        // All should have their old values
        assert_eq!(storage_changesets.storage_nodes[0].1, Some(node1));
        assert_eq!(storage_changesets.storage_nodes[1].1, Some(node2));
        assert_eq!(storage_changesets.storage_nodes[2].1, Some(node3));
    }

    /// A wiped trie whose updates introduce a brand-new path must interleave
    /// that path (with previous value `None`) among the wiped existing nodes,
    /// keeping the output sorted by path.
    #[test]
    fn test_wiped_storage_with_new_path() {
        let hashed_address = B256::from([3u8; 32]);
        // Create initial storage trie with two nodes
        let path1 = Nibbles::from_nibbles([0x1, 0x2]);
        let path3 = Nibbles::from_nibbles([0x5, 0x6]);
        let node1 = BranchNodeCompact::new(0b1111, 0b0011, 0, vec![], None);
        let node3 = BranchNodeCompact::new(0b1111, 0b1001, 0, vec![], None);
        let mut storage_nodes = BTreeMap::new();
        storage_nodes.insert(path1, node1.clone());
        storage_nodes.insert(path3, node3.clone());
        let mut storage_tries = B256Map::default();
        storage_tries.insert(B256::default(), BTreeMap::new()); // For cursor creation
        storage_tries.insert(hashed_address, storage_nodes);
        let factory = MockTrieCursorFactory::new(BTreeMap::new(), storage_tries);
        // Create updates that add a new path2 that didn't exist before
        let path2 = Nibbles::from_nibbles([0x3, 0x4]);
        let new_node2 = BranchNodeCompact::new(0b1111, 0b0101, 0, vec![], None);
        let mut storage_updates = B256Map::default();
        storage_updates.insert(
            hashed_address,
            StorageTrieUpdatesSorted {
                is_deleted: true,
                storage_nodes: vec![(path2, Some(new_node2))],
            },
        );
        let updates = TrieUpdatesSorted::new(vec![], storage_updates);
        // Compute changesets
        let changesets = compute_trie_changesets(&factory, &updates).unwrap();
        // Check storage changesets
        let storage_changesets = changesets.storage_tries_ref().get(&hashed_address).unwrap();
        assert!(storage_changesets.is_deleted);
        // Should include all three paths: existing path1, new path2, existing path3
        assert_eq!(storage_changesets.storage_nodes.len(), 3);
        // Check sorted order
        assert_eq!(storage_changesets.storage_nodes[0].0, path1);
        assert_eq!(storage_changesets.storage_nodes[1].0, path2);
        assert_eq!(storage_changesets.storage_nodes[2].0, path3);
        // path1 and path3 have old values, path2 has None (didn't exist)
        assert_eq!(storage_changesets.storage_nodes[0].1, Some(node1));
        assert_eq!(storage_changesets.storage_nodes[1].1, None);
        assert_eq!(storage_changesets.storage_nodes[2].1, Some(node3));
    }
}

View File

@@ -38,6 +38,9 @@ pub mod proof_v2;
/// Trie witness generation.
pub mod witness;
/// Trie changeset computation.
pub mod changesets;
/// The implementation of the Merkle Patricia Trie.
mod trie;
pub use trie::{StateRoot, StorageRoot, TrieType};

View File

@@ -131,18 +131,17 @@ Static Files:
<STAGE>
Possible values:
- headers: The headers stage within the pipeline
- bodies: The bodies stage within the pipeline
- senders: The senders stage within the pipeline
- execution: The execution stage within the pipeline
- account-hashing: The account hashing stage within the pipeline
- storage-hashing: The storage hashing stage within the pipeline
- hashing: The account and storage hashing stages within the pipeline
- merkle: The merkle stage within the pipeline
- merkle-changesets: The merkle changesets stage within the pipeline
- tx-lookup: The transaction lookup stage within the pipeline
- account-history: The account history stage within the pipeline
- storage-history: The storage history stage within the pipeline
- headers: The headers stage within the pipeline
- bodies: The bodies stage within the pipeline
- senders: The senders stage within the pipeline
- execution: The execution stage within the pipeline
- account-hashing: The account hashing stage within the pipeline
- storage-hashing: The storage hashing stage within the pipeline
- hashing: The account and storage hashing stages within the pipeline
- merkle: The merkle stage within the pipeline
- tx-lookup: The transaction lookup stage within the pipeline
- account-history: The account history stage within the pipeline
- storage-history: The storage history stage within the pipeline
Logging:
--log.stdout.format <FORMAT>

View File

@@ -162,18 +162,17 @@ Static Files:
The name of the stage to run
Possible values:
- headers: The headers stage within the pipeline
- bodies: The bodies stage within the pipeline
- senders: The senders stage within the pipeline
- execution: The execution stage within the pipeline
- account-hashing: The account hashing stage within the pipeline
- storage-hashing: The storage hashing stage within the pipeline
- hashing: The account and storage hashing stages within the pipeline
- merkle: The merkle stage within the pipeline
- merkle-changesets: The merkle changesets stage within the pipeline
- tx-lookup: The transaction lookup stage within the pipeline
- account-history: The account history stage within the pipeline
- storage-history: The storage history stage within the pipeline
- headers: The headers stage within the pipeline
- bodies: The bodies stage within the pipeline
- senders: The senders stage within the pipeline
- execution: The execution stage within the pipeline
- account-hashing: The account hashing stage within the pipeline
- storage-hashing: The storage hashing stage within the pipeline
- hashing: The account and storage hashing stages within the pipeline
- merkle: The merkle stage within the pipeline
- tx-lookup: The transaction lookup stage within the pipeline
- account-history: The account history stage within the pipeline
- storage-history: The storage history stage within the pipeline
Networking:
-d, --disable-discovery

View File

@@ -131,18 +131,17 @@ Static Files:
<STAGE>
Possible values:
- headers: The headers stage within the pipeline
- bodies: The bodies stage within the pipeline
- senders: The senders stage within the pipeline
- execution: The execution stage within the pipeline
- account-hashing: The account hashing stage within the pipeline
- storage-hashing: The storage hashing stage within the pipeline
- hashing: The account and storage hashing stages within the pipeline
- merkle: The merkle stage within the pipeline
- merkle-changesets: The merkle changesets stage within the pipeline
- tx-lookup: The transaction lookup stage within the pipeline
- account-history: The account history stage within the pipeline
- storage-history: The storage history stage within the pipeline
- headers: The headers stage within the pipeline
- bodies: The bodies stage within the pipeline
- senders: The senders stage within the pipeline
- execution: The execution stage within the pipeline
- account-hashing: The account hashing stage within the pipeline
- storage-hashing: The storage hashing stage within the pipeline
- hashing: The account and storage hashing stages within the pipeline
- merkle: The merkle stage within the pipeline
- tx-lookup: The transaction lookup stage within the pipeline
- account-history: The account history stage within the pipeline
- storage-history: The storage history stage within the pipeline
Logging:
--log.stdout.format <FORMAT>

View File

@@ -162,18 +162,17 @@ Static Files:
The name of the stage to run
Possible values:
- headers: The headers stage within the pipeline
- bodies: The bodies stage within the pipeline
- senders: The senders stage within the pipeline
- execution: The execution stage within the pipeline
- account-hashing: The account hashing stage within the pipeline
- storage-hashing: The storage hashing stage within the pipeline
- hashing: The account and storage hashing stages within the pipeline
- merkle: The merkle stage within the pipeline
- merkle-changesets: The merkle changesets stage within the pipeline
- tx-lookup: The transaction lookup stage within the pipeline
- account-history: The account history stage within the pipeline
- storage-history: The storage history stage within the pipeline
- headers: The headers stage within the pipeline
- bodies: The bodies stage within the pipeline
- senders: The senders stage within the pipeline
- execution: The execution stage within the pipeline
- account-hashing: The account hashing stage within the pipeline
- storage-hashing: The storage hashing stage within the pipeline
- hashing: The account and storage hashing stages within the pipeline
- merkle: The merkle stage within the pipeline
- tx-lookup: The transaction lookup stage within the pipeline
- account-history: The account history stage within the pipeline
- storage-history: The storage history stage within the pipeline
Networking:
-d, --disable-discovery