Compare commits

...

9 Commits

Author SHA1 Message Date
yongkangc
c67534d872 test(engine): cover stale sparse trie checkout drop 2026-03-16 05:14:05 +00:00
yongkangc
cb7ba92e3d fix(engine): tighten shared cache visibility 2026-03-16 05:11:35 +00:00
yongkangc
78ba601e47 fix(node): preserve sparse trie cache kind 2026-03-16 04:55:28 +00:00
yongkangc
ed90ebe0da feat(engine): add sparse trie cache sdk
Expose a public sparse trie cache and checkout lease through EngineSharedCaches, then route the engine payload processor through that API so the SDK surface is exercised in-tree.

Add a small integration smoke test and preserve the existing checkout/store sequencing guard around state-root publication.
2026-03-16 04:55:28 +00:00
yongkangc
f7b45a8585 refactor(node): drop unused builder context resources 2026-03-16 04:53:57 +00:00
yongkangc
64e5a6fdab ci: apply nightly rustfmt
Format the remaining files that fail the repository nightly rustfmt job so the PR can clear the fmt and lint-success checks.
2026-03-16 04:53:57 +00:00
yongkangc
9c826df006 fix(node): make validator shared cache wiring explicit
Keep builder-side shared cache export on BuilderContext, but route validator construction through an explicit cache-aware builder API. This removes the AddOnsContext API break, restores a const BuilderContext::new via lazy ContextResources allocation, and preserves the existing default validator behavior for callers that do not pass shared caches.
2026-03-16 04:53:57 +00:00
yongkangc
1d1398bd6a refactor(engine): route shared caches through context resources 2026-03-16 04:53:57 +00:00
yongkangc
d17405f563 feat(engine): export shared cache handles 2026-03-16 04:53:56 +00:00
9 changed files with 565 additions and 183 deletions

View File

@@ -38,10 +38,7 @@ use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
root::ParallelStateRootError,
};
use reth_trie_sparse::{
ArenaParallelSparseTrie, ConfigurableSparseTrie, ParallelSparseTrie, ParallelismThresholds,
RevealableSparseTrie, SparseStateTrie,
};
use reth_trie_sparse::ParallelismThresholds;
use std::{
ops::Not,
sync::{
@@ -60,7 +57,10 @@ pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
pub use preserved_sparse_trie::{
PayloadSparseTrieCache, PayloadSparseTrieKind, PayloadSparseTrieStoreOutcome,
SparseTrieCheckout,
};
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
@@ -104,6 +104,52 @@ type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
<N as NodePrimitives>::Receipt,
>;
/// Shared cache handles that can be exported to engine consumers and downstream payload builders.
#[derive(Debug, Clone)]
pub struct EngineSharedCaches<Evm: ConfigureEvm> {
execution_cache: PayloadExecutionCache,
sparse_trie_cache: PayloadSparseTrieCache,
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
}
impl<Evm> Default for EngineSharedCaches<Evm>
where
Evm: ConfigureEvm,
{
fn default() -> Self {
Self::with_sparse_trie_kind(PayloadSparseTrieKind::default())
}
}
impl<Evm> EngineSharedCaches<Evm>
where
Evm: ConfigureEvm,
{
/// Creates shared caches backed by the requested sparse trie implementation.
pub fn with_sparse_trie_kind(sparse_trie_kind: PayloadSparseTrieKind) -> Self {
Self {
execution_cache: Default::default(),
sparse_trie_cache: PayloadSparseTrieCache::new(sparse_trie_kind),
precompile_cache_map: Default::default(),
}
}
/// Returns the shared execution cache handle for engine-internal use.
pub(crate) fn execution_cache(&self) -> PayloadExecutionCache {
self.execution_cache.clone()
}
/// Returns the shared sparse trie cache handle.
pub fn sparse_trie_cache(&self) -> PayloadSparseTrieCache {
self.sparse_trie_cache.clone()
}
/// Returns the shared precompile cache map.
pub fn precompile_cache_map(&self) -> PrecompileCacheMap<SpecFor<Evm>> {
self.precompile_cache_map.clone()
}
}
/// Entrypoint for executing the payload.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
@@ -112,8 +158,8 @@ where
{
/// The executor used by to spawn tasks.
executor: Runtime,
/// The most recent cache used for execution.
execution_cache: PayloadExecutionCache,
/// Shared caches reused across payload processing.
shared_caches: EngineSharedCaches<Evm>,
/// Metrics for trie operations
trie_metrics: MultiProofTaskMetrics,
/// Cross-block cache size in bytes.
@@ -126,20 +172,12 @@ where
evm_config: Evm,
/// Whether precompile cache should be disabled.
precompile_cache_disabled: bool,
/// Precompile cache map.
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
/// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
/// re-use allocated memory. Stored with the block hash it was computed for to enable trie
/// preservation across sequential payload validations.
sparse_state_trie: SharedPreservedSparseTrie,
/// LFU hot-slot capacity: max storage slots retained across prune cycles.
sparse_trie_max_hot_slots: usize,
/// LFU hot-account capacity: max account addresses retained across prune cycles.
sparse_trie_max_hot_accounts: usize,
/// Whether sparse trie cache pruning is fully disabled.
disable_sparse_trie_cache_pruning: bool,
/// Whether to use the arena-based sparse trie implementation.
enable_arena_sparse_trie: bool,
/// Whether to disable cache metrics recording.
disable_cache_metrics: bool,
}
@@ -159,23 +197,20 @@ where
executor: Runtime,
evm_config: Evm,
config: &TreeConfig,
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
shared_caches: EngineSharedCaches<Evm>,
) -> Self {
Self {
executor,
execution_cache: Default::default(),
shared_caches,
trie_metrics: Default::default(),
cross_block_cache_size: config.cross_block_cache_size(),
disable_transaction_prewarming: config.disable_prewarming(),
evm_config,
disable_state_cache: config.disable_state_cache(),
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: SharedPreservedSparseTrie::default(),
sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
enable_arena_sparse_trie: config.enable_arena_sparse_trie(),
disable_cache_metrics: config.disable_cache_metrics(),
}
}
@@ -189,8 +224,8 @@ where
debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
// Wait for both caches in parallel using std threads
let execution_cache = self.execution_cache.clone();
let sparse_trie = self.sparse_state_trie.clone();
let execution_cache = self.shared_caches.execution_cache();
let sparse_trie = self.shared_caches.sparse_trie_cache();
// Use channels and spawn_blocking instead of std::thread::spawn
let (execution_tx, execution_rx) = std::sync::mpsc::channel();
@@ -504,12 +539,12 @@ where
terminate_execution: Arc::new(AtomicBool::new(false)),
executed_tx_index: Arc::clone(&executed_tx_index),
precompile_cache_disabled: self.precompile_cache_disabled,
precompile_cache_map: self.precompile_cache_map.clone(),
precompile_cache_map: self.shared_caches.precompile_cache_map(),
};
let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
self.executor.clone(),
self.execution_cache.clone(),
self.shared_caches.execution_cache(),
prewarm_ctx,
to_multi_proof,
);
@@ -537,7 +572,7 @@ where
/// instance.
#[instrument(level = "debug", target = "engine::caching", skip(self))]
fn cache_for(&self, parent_hash: B256) -> SavedCache {
if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
if let Some(cache) = self.shared_caches.execution_cache().get_cache_for(parent_hash) {
debug!("reusing execution cache");
cache
} else {
@@ -562,12 +597,11 @@ where
parent_state_root: B256,
chunk_size: usize,
) {
let preserved_sparse_trie = self.sparse_state_trie.clone();
let sparse_trie_cache = self.shared_caches.sparse_trie_cache();
let trie_metrics = self.trie_metrics.clone();
let max_hot_slots = self.sparse_trie_max_hot_slots;
let max_hot_accounts = self.sparse_trie_max_hot_accounts;
let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
let enable_arena_sparse_trie = self.enable_arena_sparse_trie;
let executor = self.executor.clone();
let parent_span = Span::current();
@@ -577,49 +611,19 @@ where
let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
.entered();
// Reuse a stored SparseStateTrie if available, applying continuation logic.
// If this payload's parent state root matches the preserved trie's anchor,
// we can reuse the pruned trie structure. Otherwise, we clear the trie but
// keep allocations.
let start = Instant::now();
let preserved = preserved_sparse_trie.take();
let mut checkout = sparse_trie_cache.take_or_create_for(parent_state_root);
trie_metrics
.sparse_trie_cache_wait_duration_histogram
.record(start.elapsed().as_secs_f64());
checkout.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);
let mut sparse_state_trie = preserved
.map(|preserved| preserved.into_trie_for(parent_state_root))
.unwrap_or_else(|| {
debug!(
target: "engine::tree::payload_processor",
"Creating new sparse trie - no preserved trie available"
);
let default_trie = if enable_arena_sparse_trie {
RevealableSparseTrie::blind_from(
ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
)
} else {
RevealableSparseTrie::blind_from(
ConfigurableSparseTrie::HashMap(
ParallelSparseTrie::default().with_parallelism_thresholds(
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS,
),
),
)
};
SparseStateTrie::default()
.with_accounts_trie(default_trie.clone())
.with_default_storage_trie(default_trie)
.with_updates(true)
});
sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);
let mut task = SparseTrieCacheTask::new_with_trie(
let mut task = SparseTrieCacheTask::new_with_checkout(
&executor,
from_multi_proof,
proof_worker_handle,
trie_metrics.clone(),
sparse_state_trie,
checkout,
chunk_size,
);
@@ -630,7 +634,7 @@ where
// causing take() to return None and forcing it to create a new empty trie
// instead of reusing the preserved one. Holding the guard ensures the next
// block's take() blocks until we've stored the trie for reuse.
let mut guard = preserved_sparse_trie.lock();
let mut guard = sparse_trie_cache.lock();
let task_result = result.as_ref().ok().cloned();
// Send state root computation result - next block may start but will block on take()
@@ -645,7 +649,7 @@ where
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
guard.store(PreservedSparseTrie::cleared(trie));
trie.store_prepared_cleared_with_guard(&mut guard);
drop(guard);
executor.spawn_drop(deferred);
return;
@@ -674,7 +678,7 @@ where
trie_metrics
.sparse_trie_retained_storage_tries
.set(trie.retained_storage_tries_count() as f64);
guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
trie.store_anchored_with_guard(&mut guard, result.state_root);
deferred
} else {
debug!(
@@ -685,7 +689,7 @@ where
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
guard.store(PreservedSparseTrie::cleared(trie));
trie.store_prepared_cleared_with_guard(&mut guard);
deferred
};
drop(guard);
@@ -706,7 +710,7 @@ where
bundle_state: &BundleState,
) {
let disable_cache_metrics = self.disable_cache_metrics;
self.execution_cache.update_with_guard(|cached| {
self.shared_caches.execution_cache().update_with_guard(|cached| {
if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
debug!(
target: "engine::caching",
@@ -1010,7 +1014,7 @@ impl<R> Drop for CacheTaskHandle<R> {
/// - Prepares data for state root proof computation
/// - Runs concurrently but must not interfere with cache saves
#[derive(Clone, Debug, Default)]
pub struct PayloadExecutionCache {
pub(crate) struct PayloadExecutionCache {
/// Guarded cloneable cache identified by a block hash.
inner: Arc<RwLock<Option<SavedCache>>>,
/// Metrics for cache operations.
@@ -1018,11 +1022,11 @@ pub struct PayloadExecutionCache {
}
impl PayloadExecutionCache {
/// Returns the cache for `parent_hash` if it's available for use.
/// Returns the cache backing store for `parent_hash` if it's available for reuse.
///
/// A cache is considered available when:
/// - It exists and matches the requested parent hash
/// - No other tasks are currently using it (checked via Arc reference count)
/// If the tracked cache is available but keyed to a different parent hash, the cache is
/// cleared and returned so callers can reuse the underlying allocations without carrying over
/// stale state.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
let start = Instant::now();
@@ -1061,7 +1065,7 @@ impl PayloadExecutionCache {
// and picking up polluted data if the fork block fails.
c.clear_with_hash(parent_hash);
}
return Some(c.clone())
return Some(c.clone());
} else if hash_matches {
self.metrics.execution_cache_in_use.increment(1);
}
@@ -1078,7 +1082,7 @@ impl PayloadExecutionCache {
/// This is useful for synchronization before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub fn wait_for_availability(&self) -> Duration {
pub(crate) fn wait_for_availability(&self) -> Duration {
let start = Instant::now();
// Acquire write lock to wait for any current holders to finish
let _guard = self.inner.write();
@@ -1106,7 +1110,7 @@ impl PayloadExecutionCache {
///
/// Violating this requirement can result in cache corruption, incorrect state data,
/// and potential consensus failures.
pub fn update_with_guard<F>(&self, update_fn: F)
pub(crate) fn update_with_guard<F>(&self, update_fn: F)
where
F: FnOnce(&mut Option<SavedCache>),
{
@@ -1175,8 +1179,9 @@ mod tests {
use super::PayloadExecutionCache;
use crate::tree::{
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
precompile_cache::PrecompileCacheMap,
payload_processor::{
evm_state_to_hashed_post_state, EngineSharedCaches, ExecutionEnv, PayloadProcessor,
},
StateProviderBuilder, TreeConfig,
};
use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
@@ -1288,7 +1293,7 @@ mod tests {
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
PrecompileCacheMap::default(),
EngineSharedCaches::default(),
);
let parent_hash = B256::from([1u8; 32]);
@@ -1300,13 +1305,17 @@ mod tests {
let bundle_state = BundleState::default();
// Cache should be empty initially
assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
assert!(payload_processor
.shared_caches
.execution_cache()
.get_cache_for(block_hash)
.is_none());
// Update cache with inserted block
payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
// Cache should now exist for the block hash
let cached = payload_processor.execution_cache.get_cache_for(block_hash);
let cached = payload_processor.shared_caches.execution_cache().get_cache_for(block_hash);
assert!(cached.is_some());
assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
}
@@ -1317,13 +1326,14 @@ mod tests {
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
PrecompileCacheMap::default(),
EngineSharedCaches::default(),
);
// Setup: populate cache with block 1
let block1_hash = B256::from([1u8; 32]);
payload_processor
.execution_cache
.shared_caches
.execution_cache()
.update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));
// Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
@@ -1338,11 +1348,11 @@ mod tests {
payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
// Cache should still be for block 1 (unchanged)
let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
let cached = payload_processor.shared_caches.execution_cache().get_cache_for(block1_hash);
assert!(cached.is_some(), "Original cache should be preserved");
// Cache for block 3 should not exist
let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
let cached3 = payload_processor.shared_caches.execution_cache().get_cache_for(block3_hash);
assert!(cached3.is_none(), "New block cache should not be created on mismatch");
}
@@ -1452,7 +1462,7 @@ mod tests {
reth_tasks::Runtime::test(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
PrecompileCacheMap::default(),
EngineSharedCaches::default(),
);
let provider_factory = BlockchainProvider::new(factory).unwrap();

View File

@@ -1,44 +1,128 @@
//! Preserved sparse trie for reuse across payload validations.
use super::{
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS, SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
};
use alloy_primitives::B256;
use parking_lot::Mutex;
use reth_trie_sparse::{ConfigurableSparseTrie, SparseStateTrie};
use std::{sync::Arc, time::Instant};
use reth_trie_sparse::{
ArenaParallelSparseTrie, ConfigurableSparseTrie, ParallelSparseTrie, RevealableSparseTrie,
SparseStateTrie,
};
use std::{
ops::{Deref, DerefMut},
sync::Arc,
time::{Duration, Instant},
};
use tracing::debug;
/// Type alias for the sparse trie type used in preservation.
pub(super) type SparseTrie = SparseStateTrie<ConfigurableSparseTrie, ConfigurableSparseTrie>;
type SparseTrie = SparseStateTrie<ConfigurableSparseTrie, ConfigurableSparseTrie>;
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
/// Sparse trie implementation used by [`PayloadSparseTrieCache`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum PayloadSparseTrieKind {
/// Back sparse trie storage with hash maps.
#[default]
HashMap,
/// Back sparse trie storage with arena allocations.
Arena,
}
impl From<bool> for PayloadSparseTrieKind {
fn from(enable_arena_sparse_trie: bool) -> Self {
if enable_arena_sparse_trie {
Self::Arena
} else {
Self::HashMap
}
}
}
#[derive(Debug, Default)]
struct PayloadSparseTrieState {
latest_checkout_id: u64,
preserved: Option<PreservedSparseTrie>,
}
/// Outcome of storing a checked-out sparse trie back into the shared cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PayloadSparseTrieStoreOutcome {
/// The checkout was the most recent lease and the trie was stored.
Stored,
/// A newer checkout had already been issued, so this stale lease was ignored.
IgnoredStaleCheckout,
}
/// Shared sparse trie cache that can be reused across payload validations.
///
/// This is stored in [`PayloadProcessor`](super::PayloadProcessor) and cloned to pass to
/// [`SparseTrieCacheTask`](super::sparse_trie::SparseTrieCacheTask) for trie reuse.
#[derive(Debug, Default, Clone)]
pub(super) struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
/// This is the public sparse-trie SDK surface exposed through
/// [`EngineSharedCaches`](super::EngineSharedCaches). Callers take or create a trie, use it for
/// payload work, then store it back either anchored to the resulting state root or cleared for
/// allocation reuse.
#[derive(Debug, Clone)]
pub struct PayloadSparseTrieCache {
kind: PayloadSparseTrieKind,
state: Arc<Mutex<PayloadSparseTrieState>>,
}
impl SharedPreservedSparseTrie {
/// Takes the preserved trie if present, leaving `None` in its place.
pub(super) fn take(&self) -> Option<PreservedSparseTrie> {
self.0.lock().take()
impl Default for PayloadSparseTrieCache {
fn default() -> Self {
Self::new(PayloadSparseTrieKind::default())
}
}
impl PayloadSparseTrieCache {
/// Creates a sparse trie cache backed by the requested trie implementation.
pub fn new(kind: PayloadSparseTrieKind) -> Self {
Self { kind, state: Arc::new(Mutex::new(PayloadSparseTrieState::default())) }
}
/// Acquires a guard that blocks `take()` until dropped.
/// Use this before sending the state root result to ensure the next block
/// waits for the trie to be stored.
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
PreservedTrieGuard(self.0.lock())
/// Returns the sparse trie implementation used when the cache needs to create a new trie.
pub const fn kind(&self) -> PayloadSparseTrieKind {
self.kind
}
/// Takes a preserved trie for `parent_state_root` or creates a new trie if the cache is empty.
pub fn take_or_create_for(&self, parent_state_root: B256) -> SparseTrieCheckout {
let start = Instant::now();
let mut state = self.state.lock();
state.latest_checkout_id += 1;
let checkout_id = state.latest_checkout_id;
let trie = state
.preserved
.take()
.map(|preserved| preserved.into_trie_for(parent_state_root))
.unwrap_or_else(|| {
debug!(
target: "engine::tree::payload_processor",
%parent_state_root,
kind = ?self.kind,
"Creating new sparse trie - no preserved trie available"
);
new_sparse_trie(self.kind)
});
drop(state);
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
target: "engine::tree::payload_processor",
blocked_for=?elapsed,
"Waited for preserved sparse trie checkout"
);
}
SparseTrieCheckout { trie: Some(trie), cache: self.clone(), checkout_id }
}
/// Waits until the sparse trie lock becomes available.
///
/// This acquires and immediately releases the lock, ensuring that any
/// ongoing operations complete before returning. Useful for synchronization
/// before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub(super) fn wait_for_availability(&self) -> std::time::Duration {
pub fn wait_for_availability(&self) -> Duration {
let start = Instant::now();
let _guard = self.0.lock();
let _guard = self.state.lock();
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
@@ -49,27 +133,142 @@ impl SharedPreservedSparseTrie {
}
elapsed
}
/// Acquires a guard that blocks cache mutation until dropped.
///
/// Engine-internal code uses this before making the state-root result visible so the next
/// payload cannot observe an empty cache between send and store.
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
PreservedTrieGuard { state: self.state.lock() }
}
}
/// A checked-out sparse trie lease.
///
/// This dereferences to [`SparseStateTrie`] so callers can reuse the trie directly. If the lease is
/// dropped without being stored back, a cleared trie is returned to the shared cache unless a newer
/// checkout has already superseded it.
#[derive(Debug)]
pub struct SparseTrieCheckout {
trie: Option<SparseTrie>,
cache: PayloadSparseTrieCache,
checkout_id: u64,
}
impl SparseTrieCheckout {
/// Stores the trie back into the shared cache anchored to the given state root.
pub fn store_anchored(self, state_root: B256) -> PayloadSparseTrieStoreOutcome {
let cache = self.cache.clone();
let mut guard = cache.lock();
self.store_anchored_with_guard(&mut guard, state_root)
}
/// Stores the trie back into the shared cache in a cleared state.
pub fn store_cleared(mut self) -> PayloadSparseTrieStoreOutcome {
let cache = self.cache.clone();
let mut trie = self.take_trie();
prepare_cleared_trie(&mut trie);
let deferred = trie.take_deferred_drops();
let mut guard = cache.lock();
let outcome = guard.store(self.checkout_id, PreservedSparseTrie::cleared(trie));
drop(guard);
drop(deferred);
outcome
}
/// Stores the trie back into the shared cache anchored to the given state root while the
/// caller is already holding the preservation lock.
pub(super) fn store_anchored_with_guard(
mut self,
guard: &mut PreservedTrieGuard<'_>,
state_root: B256,
) -> PayloadSparseTrieStoreOutcome {
guard.store(self.checkout_id, PreservedSparseTrie::anchored(self.take_trie(), state_root))
}
/// Stores an already-cleared trie back into the shared cache while the caller is already
/// holding the preservation lock.
pub(super) fn store_prepared_cleared_with_guard(
mut self,
guard: &mut PreservedTrieGuard<'_>,
) -> PayloadSparseTrieStoreOutcome {
guard.store(self.checkout_id, PreservedSparseTrie::cleared(self.take_trie()))
}
fn take_trie(&mut self) -> SparseTrie {
self.trie.take().expect("sparse trie checkout must hold a trie until it is stored")
}
}
impl Deref for SparseTrieCheckout {
type Target = SparseTrie;
fn deref(&self) -> &Self::Target {
self.trie.as_ref().expect("sparse trie checkout must hold a trie until it is stored")
}
}
impl DerefMut for SparseTrieCheckout {
fn deref_mut(&mut self) -> &mut Self::Target {
self.trie.as_mut().expect("sparse trie checkout must hold a trie until it is stored")
}
}
impl Drop for SparseTrieCheckout {
fn drop(&mut self) {
let Some(mut trie) = self.trie.take() else { return };
debug!(
target: "engine::tree::payload_processor",
checkout_id = self.checkout_id,
"Sparse trie checkout dropped before store, returning cleared trie to cache"
);
prepare_cleared_trie(&mut trie);
let deferred = trie.take_deferred_drops();
let mut guard = self.cache.lock();
let _ = guard.store(self.checkout_id, PreservedSparseTrie::cleared(trie));
drop(guard);
drop(deferred);
}
}
/// Guard that holds the lock on the preserved trie.
/// While held, `take()` will block. Call `store()` to save the trie before dropping.
pub(super) struct PreservedTrieGuard<'a>(parking_lot::MutexGuard<'a, Option<PreservedSparseTrie>>);
/// While held, take-or-create calls will block. Call `store()` to save the trie before dropping.
pub(super) struct PreservedTrieGuard<'a> {
state: parking_lot::MutexGuard<'a, PayloadSparseTrieState>,
}
impl PreservedTrieGuard<'_> {
/// Stores a preserved trie for later reuse.
pub(super) fn store(&mut self, trie: PreservedSparseTrie) {
self.0.replace(trie);
/// Stores a preserved trie for later reuse if the checkout is still current.
fn store(
&mut self,
checkout_id: u64,
trie: PreservedSparseTrie,
) -> PayloadSparseTrieStoreOutcome {
if checkout_id != self.state.latest_checkout_id {
debug!(
target: "engine::tree::payload_processor",
checkout_id,
latest_checkout_id = self.state.latest_checkout_id,
"Ignoring stale sparse trie checkout"
);
return PayloadSparseTrieStoreOutcome::IgnoredStaleCheckout;
}
self.state.preserved.replace(trie);
PayloadSparseTrieStoreOutcome::Stored
}
}
/// A preserved sparse trie that can be reused across payload validations.
///
/// The trie exists in one of two states:
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state root
/// matches the anchor.
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state
/// root matches the anchor.
/// - **Cleared**: Trie data has been cleared but allocations are preserved for reuse.
#[derive(Debug)]
pub(super) enum PreservedSparseTrie {
enum PreservedSparseTrie {
/// Trie with a computed state root that can be reused for continuation payloads.
Anchored {
/// The sparse state trie (pruned after root computation).
@@ -87,24 +286,17 @@ pub(super) enum PreservedSparseTrie {
impl PreservedSparseTrie {
/// Creates a new anchored preserved trie.
///
/// The `state_root` is the computed state root from the trie, which becomes the
/// anchor for determining if subsequent payloads can reuse this trie.
pub(super) const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
Self::Anchored { trie, state_root }
}
/// Creates a cleared preserved trie (allocations preserved, data cleared).
pub(super) const fn cleared(trie: SparseTrie) -> Self {
const fn cleared(trie: SparseTrie) -> Self {
Self::Cleared { trie }
}
/// Consumes self and returns the trie for reuse.
///
/// If the preserved trie is anchored and the parent state root matches, the pruned
/// trie structure is reused directly. Otherwise, the trie is cleared but allocations
/// are preserved to reduce memory overhead.
pub(super) fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
match self {
Self::Anchored { trie, state_root } if state_root == parent_state_root => {
debug!(
@@ -135,3 +327,111 @@ impl PreservedSparseTrie {
}
}
}
fn new_sparse_trie(kind: PayloadSparseTrieKind) -> SparseTrie {
let default_trie = match kind {
PayloadSparseTrieKind::HashMap => {
RevealableSparseTrie::blind_from(ConfigurableSparseTrie::HashMap(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
))
}
PayloadSparseTrieKind::Arena => RevealableSparseTrie::blind_from(
ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
),
};
SparseStateTrie::default()
.with_accounts_trie(default_trie.clone())
.with_default_storage_trie(default_trie)
.with_updates(true)
}
fn prepare_cleared_trie(trie: &mut SparseTrie) {
trie.clear();
trie.shrink_to(SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY, SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY);
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn take_or_create_reuses_matching_anchor() {
let cache = PayloadSparseTrieCache::default();
let state_root = B256::with_last_byte(1);
assert_eq!(
cache.take_or_create_for(state_root).store_anchored(state_root),
PayloadSparseTrieStoreOutcome::Stored
);
match cache.state.lock().preserved.as_ref() {
Some(PreservedSparseTrie::Anchored { state_root: anchored, .. }) => {
assert_eq!(*anchored, state_root);
}
other => panic!("expected anchored trie, got {other:?}"),
}
}
#[test]
fn drop_restores_cleared_trie() {
let cache = PayloadSparseTrieCache::default();
let state_root = B256::with_last_byte(2);
let mut checkout = cache.take_or_create_for(state_root);
checkout.set_updates(true);
drop(checkout);
match cache.state.lock().preserved.as_ref() {
Some(PreservedSparseTrie::Cleared { .. }) => {}
other => panic!("expected cleared trie, got {other:?}"),
}
}
#[test]
fn stale_checkout_does_not_overwrite_newer_store() {
let cache = PayloadSparseTrieCache::default();
let parent_state_root = B256::with_last_byte(3);
let anchored_state_root = B256::with_last_byte(4);
let stale = cache.take_or_create_for(parent_state_root);
let fresh = cache.take_or_create_for(parent_state_root);
assert_eq!(
fresh.store_anchored(anchored_state_root),
PayloadSparseTrieStoreOutcome::Stored
);
assert_eq!(stale.store_cleared(), PayloadSparseTrieStoreOutcome::IgnoredStaleCheckout);
match cache.state.lock().preserved.as_ref() {
Some(PreservedSparseTrie::Anchored { state_root, .. }) => {
assert_eq!(*state_root, anchored_state_root);
}
other => panic!("expected anchored trie to survive stale checkout, got {other:?}"),
}
}
#[test]
fn stale_checkout_drop_does_not_overwrite_newer_store() {
let cache = PayloadSparseTrieCache::default();
let parent_state_root = B256::with_last_byte(5);
let anchored_state_root = B256::with_last_byte(6);
let stale = cache.take_or_create_for(parent_state_root);
let fresh = cache.take_or_create_for(parent_state_root);
assert_eq!(
fresh.store_anchored(anchored_state_root),
PayloadSparseTrieStoreOutcome::Stored
);
drop(stale);
match cache.state.lock().preserved.as_ref() {
Some(PreservedSparseTrie::Anchored { state_root, .. }) => {
assert_eq!(*state_root, anchored_state_root);
}
other => panic!("expected anchored trie to survive stale checkout drop, got {other:?}"),
}
}
}

View File

@@ -84,7 +84,7 @@ where
Evm: ConfigureEvm<Primitives = N> + 'static,
{
/// Initializes the task with the given transactions pending execution
pub fn new(
pub(crate) fn new(
executor: Runtime,
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,

View File

@@ -7,7 +7,7 @@ use crate::tree::{
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
DEFAULT_MAX_TARGETS_FOR_CHUNKING,
},
payload_processor::multiproof::MultiProofTaskMetrics,
payload_processor::{multiproof::MultiProofTaskMetrics, SparseTrieCheckout},
};
use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
@@ -30,7 +30,7 @@ use reth_trie_parallel::{
use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
use reth_trie_sparse::{
errors::SparseTrieResult, ConfigurableSparseTrie, DeferredDrops, LeafUpdate,
RevealableSparseTrie, SparseStateTrie, SparseTrie,
RevealableSparseTrie,
};
use revm_primitives::{hash_map::Entry, B256Map};
use tracing::{debug, debug_span, error, instrument, trace_span};
@@ -39,7 +39,7 @@ use tracing::{debug, debug_span, error, instrument, trace_span};
const MAX_PENDING_UPDATES: usize = 100;
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = ConfigurableSparseTrie> {
pub(super) struct SparseTrieCacheTask {
/// Sender for proof results.
proof_result_tx: CrossbeamSender<ProofResultMessage>,
/// Receiver for proof results directly from workers.
@@ -47,7 +47,7 @@ pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = Configurab
/// Receives updates from execution and prewarming.
updates: CrossbeamReceiver<SparseTrieTaskMessage>,
/// `SparseStateTrie` used for computing the state root.
trie: SparseStateTrie<A, S>,
trie: SparseTrieCheckout,
/// Handle to the proof worker pools (storage and account).
proof_worker_handle: ProofWorkerHandle,
@@ -110,18 +110,14 @@ pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = Configurab
metrics: MultiProofTaskMetrics,
}
impl<A, S> SparseTrieCacheTask<A, S>
where
A: SparseTrie + Default,
S: SparseTrie + Default + Clone,
{
impl SparseTrieCacheTask {
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
pub(super) fn new_with_trie(
pub(super) fn new_with_checkout(
executor: &Runtime,
updates: CrossbeamReceiver<MultiProofMessage>,
proof_worker_handle: ProofWorkerHandle,
metrics: MultiProofTaskMetrics,
trie: SparseStateTrie<A, S>,
trie: SparseTrieCheckout,
chunk_size: usize,
) -> Self {
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
@@ -205,7 +201,7 @@ where
max_values_capacity: usize,
disable_pruning: bool,
updates: &TrieUpdates,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
) -> (SparseTrieCheckout, DeferredDrops) {
let Self { mut trie, .. } = self;
trie.commit_updates(updates);
if !disable_pruning {
@@ -224,7 +220,7 @@ where
self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
) -> (SparseTrieCheckout, DeferredDrops) {
let Self { mut trie, .. } = self;
trie.clear();
trie.shrink_to(max_nodes_capacity, max_values_capacity);
@@ -306,9 +302,9 @@ where
self.promote_pending_account_updates()?;
self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
if self.finished_state_updates &&
self.account_updates.is_empty() &&
self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
if self.finished_state_updates
&& self.account_updates.is_empty()
&& self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
{
break;
}
@@ -600,7 +596,6 @@ where
Ok(updates_len_after < updates_len_before)
}
/// Computes storage roots for accounts whose storage updates are fully drained.
///
/// For each storage trie T that:
@@ -621,16 +616,16 @@ where
.filter_map(|(address, updates)| updates.is_empty().then_some(*address))
.collect();
struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
struct SendStorageTriePtr(*mut RevealableSparseTrie<ConfigurableSparseTrie>);
// SAFETY: this wrapper only forwards the pointer across rayon; deref invariants are
// documented at the use site below.
unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}
unsafe impl Send for SendStorageTriePtr {}
let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr)> =
Vec::with_capacity(addresses_to_compute_roots.len());
for address in addresses_to_compute_roots {
if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
!trie.is_root_cached()
if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address)
&& !trie.is_root_cached()
{
tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
}
@@ -729,7 +724,7 @@ where
// We need to keep iterating if any updates are being drained because that might
// indicate that more pending account updates can be promoted.
if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
break
break;
}
}
@@ -850,7 +845,6 @@ pub struct StateRootComputeOutcome {
mod tests {
use super::*;
use alloy_primitives::{keccak256, Address, B256, U256};
use reth_trie_sparse::ArenaParallelSparseTrie;
#[test]
fn test_run_hashing_task_hashed_state_update_forwards() {
@@ -873,10 +867,7 @@ mod tests {
let expected_state = hashed_state.clone();
let handle = std::thread::spawn(move || {
SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
updates_rx,
hashed_state_tx,
);
SparseTrieCacheTask::run_hashing_task(updates_rx, hashed_state_tx);
});
updates_tx.send(MultiProofMessage::HashedStateUpdate(hashed_state)).unwrap();

View File

@@ -4,7 +4,7 @@ use crate::tree::{
cached_state::{CacheStats, CachedStateProvider},
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
instrumented_state::{InstrumentedStateProvider, StateProviderStats},
payload_processor::PayloadProcessor,
payload_processor::{EngineSharedCaches, PayloadProcessor},
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
sparse_trie::StateRootComputeOutcome,
CacheWaitDurations, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle,
@@ -190,16 +190,13 @@ where
validator: V,
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
shared_caches: EngineSharedCaches<Evm>,
changeset_cache: ChangesetCache,
runtime: reth_tasks::Runtime,
) -> Self {
let precompile_cache_map = PrecompileCacheMap::default();
let payload_processor = PayloadProcessor::new(
runtime.clone(),
evm_config.clone(),
&config,
precompile_cache_map.clone(),
);
let precompile_cache_map = shared_caches.precompile_cache_map();
let payload_processor =
PayloadProcessor::new(runtime.clone(), evm_config.clone(), &config, shared_caches);
Self {
provider,
consensus,
@@ -313,7 +310,7 @@ where
// Validate block consensus rules which includes header validation
if let Err(consensus_err) = self.validate_block_inner(&block, None) {
// Header validation error takes precedence over execution error
return Err(InsertBlockError::new(block, consensus_err.into()).into())
return Err(InsertBlockError::new(block, consensus_err.into()).into());
}
// Also validate against the parent
@@ -321,7 +318,7 @@ where
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
{
// Parent validation error takes precedence over execution error
return Err(InsertBlockError::new(block, consensus_err.into()).into())
return Err(InsertBlockError::new(block, consensus_err.into()).into());
}
// No header validation errors, return the original execution error
@@ -396,7 +393,7 @@ where
Ok(val) => val,
Err(e) => {
let block = convert_to_block(input)?;
return Err(InsertBlockError::new(block, e.into()).into())
return Err(InsertBlockError::new(block, e.into()).into());
}
}
};
@@ -429,7 +426,7 @@ where
convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into())
.into());
};
let mut state_provider = ensure_ok!(provider_builder.build());
drop(_enter);
@@ -442,7 +439,7 @@ where
convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into())
.into());
};
let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm_env")
@@ -762,7 +759,7 @@ where
)
.into(),
)
.into())
.into());
}
let timing_stats = state_provider_stats.map(|stats| {
@@ -824,14 +821,14 @@ where
) -> Result<(), ConsensusError> {
if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
return Err(e)
return Err(e);
}
if let Err(e) =
self.consensus.validate_block_pre_execution_with_tx_root(block, transaction_root)
{
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
return Err(e)
return Err(e);
}
Ok(())
@@ -1323,7 +1320,7 @@ where
trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
// validate block consensus rules
if let Err(e) = self.validate_block_inner(block, transaction_root) {
return Err(e.into())
return Err(e.into());
}
// now validate against the parent
@@ -1332,7 +1329,7 @@ where
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
{
warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
return Err(e.into())
return Err(e.into());
}
drop(_enter);
@@ -1345,7 +1342,7 @@ where
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into())
return Err(err.into());
}
drop(_enter);
@@ -1361,7 +1358,7 @@ where
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into())
return Err(err.into());
}
// record post-execution validation duration
@@ -1469,7 +1466,7 @@ where
self.provider.clone(),
historical,
Some(blocks),
)))
)));
}
// Check if the block is persisted
@@ -1477,7 +1474,7 @@ where
debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
// For persisted blocks, we create a builder that will fetch state directly from the
// database
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)));
}
debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
@@ -1509,7 +1506,7 @@ where
) {
if state.invalid_headers.get(&block.hash()).is_some() {
// we already marked this block as invalid
return
return;
}
self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
}

View File

@@ -203,6 +203,7 @@ impl TestHarness {
payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
EngineSharedCaches::default(),
changeset_cache.clone(),
reth_tasks::Runtime::test(),
);
@@ -407,6 +408,7 @@ impl ValidatorTestHarness {
payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
EngineSharedCaches::default(),
changeset_cache,
reth_tasks::Runtime::test(),
);

View File

@@ -0,0 +1,27 @@
//! SDK smoke tests for `EngineSharedCaches`.
use alloy_primitives::B256;
use reth_engine_tree::tree::{
EngineSharedCaches, PayloadSparseTrieKind, PayloadSparseTrieStoreOutcome,
};
use reth_evm_ethereum::EthEvmConfig;
#[test]
fn engine_shared_caches_exposes_public_sparse_trie_sdk() {
let caches =
EngineSharedCaches::<EthEvmConfig>::with_sparse_trie_kind(PayloadSparseTrieKind::Arena);
let _precompile_cache_map = caches.precompile_cache_map();
let sparse_trie_cache = caches.sparse_trie_cache();
assert_eq!(sparse_trie_cache.kind(), PayloadSparseTrieKind::Arena);
let state_root = B256::with_last_byte(1);
assert_eq!(
sparse_trie_cache.take_or_create_for(state_root).store_anchored(state_root),
PayloadSparseTrieStoreOutcome::Stored
);
let checkout = sparse_trie_cache.take_or_create_for(state_root);
assert!(checkout.memory_size() > 0 || checkout.retained_storage_tries_count() == 0);
}

View File

@@ -15,7 +15,7 @@ use reth_engine_tree::{
chain::{ChainEvent, FromOrchestrator},
engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
launch::build_engine_orchestrator,
tree::TreeConfig,
tree::{EngineSharedCaches, PayloadSparseTrieKind, TreeConfig},
};
use reth_engine_util::EngineMessageStreamExt;
use reth_exex::ExExManagerHandle;
@@ -90,6 +90,10 @@ impl EngineNodeLauncher {
// Create changeset cache that will be shared across the engine
let changeset_cache = ChangesetCache::new();
let main_shared_caches =
EngineSharedCaches::<<CB::Components as NodeComponents<T>>::Evm>::with_sparse_trie_kind(
PayloadSparseTrieKind::from(engine_tree_config.enable_arena_sparse_trie()),
);
// setup the launch context
let ctx = ctx
@@ -123,7 +127,8 @@ impl EngineNodeLauncher {
.with_blockchain_db::<T, _>(move |provider_factory| {
Ok(BlockchainProvider::new(provider_factory)?)
})?
.with_components(components_builder, on_component_initialized).await?;
.with_components(components_builder, on_component_initialized)
.await?;
// spawn exexs if any
let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
@@ -194,7 +199,12 @@ impl EngineNodeLauncher {
// Build the engine validator with all required components
let engine_validator = validator_builder
.clone()
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
.build_tree_validator_with_caches(
&add_ons_ctx,
engine_tree_config.clone(),
changeset_cache.clone(),
main_shared_caches.clone(),
)
.await?;
// Create the consensus engine stream with optional reorg
@@ -207,8 +217,18 @@ impl EngineNodeLauncher {
|| async {
// Create a separate cache for reorg validator (not shared with main engine)
let reorg_cache = ChangesetCache::new();
let reorg_shared_caches = EngineSharedCaches::<
<CB::Components as NodeComponents<T>>::Evm,
>::with_sparse_trie_kind(
PayloadSparseTrieKind::from(engine_tree_config.enable_arena_sparse_trie()),
);
validator_builder
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
.build_tree_validator_with_caches(
&add_ons_ctx,
engine_tree_config.clone(),
reorg_cache,
reorg_shared_caches,
)
.await
},
node_config.debug.reorg_frequency,

View File

@@ -1,8 +1,8 @@
//! Builder support for rpc components.
pub use jsonrpsee::server::middleware::rpc::{RpcService, RpcServiceBuilder};
use reth_engine_tree::tree::WaitForCaches;
pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator};
use reth_engine_tree::tree::{EngineSharedCaches, PayloadSparseTrieKind, WaitForCaches};
pub use reth_rpc_builder::{middleware::RethRpcMiddleware, Identity, Stack};
pub use reth_trie_db::ChangesetCache;
@@ -981,7 +981,8 @@ where
let Self { eth_api_builder, engine_api_builder, hooks, .. } = self;
let engine_api = engine_api_builder.build_engine_api(&ctx).await?;
let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events } = ctx;
let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events, .. } =
ctx;
info!(target: "reth::cli", "Engine API handler initialized");
@@ -1294,6 +1295,25 @@ pub trait EngineValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send;
/// Builds the tree validator using the shared cache handles exported by the launcher.
///
/// The default implementation preserves the legacy behavior and ignores the provided caches.
fn build_tree_validator_with_caches(
self,
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
shared_caches: EngineSharedCaches<Node::Evm>,
) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send
where
Self: Sized,
{
async move {
let _ = shared_caches;
self.build_tree_validator(ctx, tree_config, changeset_cache).await
}
}
}
/// Basic implementation of [`EngineValidatorBuilder`].
@@ -1341,6 +1361,20 @@ where
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
) -> eyre::Result<Self::EngineValidator> {
let shared_caches = EngineSharedCaches::with_sparse_trie_kind(PayloadSparseTrieKind::from(
tree_config.enable_arena_sparse_trie(),
));
self.build_tree_validator_with_caches(ctx, tree_config, changeset_cache, shared_caches)
.await
}
async fn build_tree_validator_with_caches(
self,
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
shared_caches: EngineSharedCaches<Node::Evm>,
) -> eyre::Result<Self::EngineValidator> {
let validator = self.payload_validator_builder.build(ctx).await?;
let data_dir = ctx.config.datadir.clone().resolve_datadir(ctx.config.chain.chain());
@@ -1353,6 +1387,7 @@ where
validator,
tree_config,
invalid_block_hook,
shared_caches,
changeset_cache,
ctx.node.task_executor().clone(),
))