mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
9 Commits
dan/static
...
yk/shared-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c67534d872 | ||
|
|
cb7ba92e3d | ||
|
|
78ba601e47 | ||
|
|
ed90ebe0da | ||
|
|
f7b45a8585 | ||
|
|
64e5a6fdab | ||
|
|
9c826df006 | ||
|
|
1d1398bd6a | ||
|
|
d17405f563 |
@@ -38,10 +38,7 @@ use reth_trie_parallel::{
|
||||
proof_task::{ProofTaskCtx, ProofWorkerHandle},
|
||||
root::ParallelStateRootError,
|
||||
};
|
||||
use reth_trie_sparse::{
|
||||
ArenaParallelSparseTrie, ConfigurableSparseTrie, ParallelSparseTrie, ParallelismThresholds,
|
||||
RevealableSparseTrie, SparseStateTrie,
|
||||
};
|
||||
use reth_trie_sparse::ParallelismThresholds;
|
||||
use std::{
|
||||
ops::Not,
|
||||
sync::{
|
||||
@@ -60,7 +57,10 @@ pub mod prewarm;
|
||||
pub mod receipt_root_task;
|
||||
pub mod sparse_trie;
|
||||
|
||||
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
|
||||
pub use preserved_sparse_trie::{
|
||||
PayloadSparseTrieCache, PayloadSparseTrieKind, PayloadSparseTrieStoreOutcome,
|
||||
SparseTrieCheckout,
|
||||
};
|
||||
|
||||
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
|
||||
///
|
||||
@@ -104,6 +104,52 @@ type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
|
||||
<N as NodePrimitives>::Receipt,
|
||||
>;
|
||||
|
||||
/// Shared cache handles that can be exported to engine consumers and downstream payload builders.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EngineSharedCaches<Evm: ConfigureEvm> {
|
||||
execution_cache: PayloadExecutionCache,
|
||||
sparse_trie_cache: PayloadSparseTrieCache,
|
||||
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
}
|
||||
|
||||
impl<Evm> Default for EngineSharedCaches<Evm>
|
||||
where
|
||||
Evm: ConfigureEvm,
|
||||
{
|
||||
fn default() -> Self {
|
||||
Self::with_sparse_trie_kind(PayloadSparseTrieKind::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Evm> EngineSharedCaches<Evm>
|
||||
where
|
||||
Evm: ConfigureEvm,
|
||||
{
|
||||
/// Creates shared caches backed by the requested sparse trie implementation.
|
||||
pub fn with_sparse_trie_kind(sparse_trie_kind: PayloadSparseTrieKind) -> Self {
|
||||
Self {
|
||||
execution_cache: Default::default(),
|
||||
sparse_trie_cache: PayloadSparseTrieCache::new(sparse_trie_kind),
|
||||
precompile_cache_map: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the shared execution cache handle for engine-internal use.
|
||||
pub(crate) fn execution_cache(&self) -> PayloadExecutionCache {
|
||||
self.execution_cache.clone()
|
||||
}
|
||||
|
||||
/// Returns the shared sparse trie cache handle.
|
||||
pub fn sparse_trie_cache(&self) -> PayloadSparseTrieCache {
|
||||
self.sparse_trie_cache.clone()
|
||||
}
|
||||
|
||||
/// Returns the shared precompile cache map.
|
||||
pub fn precompile_cache_map(&self) -> PrecompileCacheMap<SpecFor<Evm>> {
|
||||
self.precompile_cache_map.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Entrypoint for executing the payload.
|
||||
#[derive(Debug)]
|
||||
pub struct PayloadProcessor<Evm>
|
||||
@@ -112,8 +158,8 @@ where
|
||||
{
|
||||
/// The executor used by to spawn tasks.
|
||||
executor: Runtime,
|
||||
/// The most recent cache used for execution.
|
||||
execution_cache: PayloadExecutionCache,
|
||||
/// Shared caches reused across payload processing.
|
||||
shared_caches: EngineSharedCaches<Evm>,
|
||||
/// Metrics for trie operations
|
||||
trie_metrics: MultiProofTaskMetrics,
|
||||
/// Cross-block cache size in bytes.
|
||||
@@ -126,20 +172,12 @@ where
|
||||
evm_config: Evm,
|
||||
/// Whether precompile cache should be disabled.
|
||||
precompile_cache_disabled: bool,
|
||||
/// Precompile cache map.
|
||||
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
/// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
|
||||
/// re-use allocated memory. Stored with the block hash it was computed for to enable trie
|
||||
/// preservation across sequential payload validations.
|
||||
sparse_state_trie: SharedPreservedSparseTrie,
|
||||
/// LFU hot-slot capacity: max storage slots retained across prune cycles.
|
||||
sparse_trie_max_hot_slots: usize,
|
||||
/// LFU hot-account capacity: max account addresses retained across prune cycles.
|
||||
sparse_trie_max_hot_accounts: usize,
|
||||
/// Whether sparse trie cache pruning is fully disabled.
|
||||
disable_sparse_trie_cache_pruning: bool,
|
||||
/// Whether to use the arena-based sparse trie implementation.
|
||||
enable_arena_sparse_trie: bool,
|
||||
/// Whether to disable cache metrics recording.
|
||||
disable_cache_metrics: bool,
|
||||
}
|
||||
@@ -159,23 +197,20 @@ where
|
||||
executor: Runtime,
|
||||
evm_config: Evm,
|
||||
config: &TreeConfig,
|
||||
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
shared_caches: EngineSharedCaches<Evm>,
|
||||
) -> Self {
|
||||
Self {
|
||||
executor,
|
||||
execution_cache: Default::default(),
|
||||
shared_caches,
|
||||
trie_metrics: Default::default(),
|
||||
cross_block_cache_size: config.cross_block_cache_size(),
|
||||
disable_transaction_prewarming: config.disable_prewarming(),
|
||||
evm_config,
|
||||
disable_state_cache: config.disable_state_cache(),
|
||||
precompile_cache_disabled: config.precompile_cache_disabled(),
|
||||
precompile_cache_map,
|
||||
sparse_state_trie: SharedPreservedSparseTrie::default(),
|
||||
sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
|
||||
sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
|
||||
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
|
||||
enable_arena_sparse_trie: config.enable_arena_sparse_trie(),
|
||||
disable_cache_metrics: config.disable_cache_metrics(),
|
||||
}
|
||||
}
|
||||
@@ -189,8 +224,8 @@ where
|
||||
debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
|
||||
|
||||
// Wait for both caches in parallel using std threads
|
||||
let execution_cache = self.execution_cache.clone();
|
||||
let sparse_trie = self.sparse_state_trie.clone();
|
||||
let execution_cache = self.shared_caches.execution_cache();
|
||||
let sparse_trie = self.shared_caches.sparse_trie_cache();
|
||||
|
||||
// Use channels and spawn_blocking instead of std::thread::spawn
|
||||
let (execution_tx, execution_rx) = std::sync::mpsc::channel();
|
||||
@@ -504,12 +539,12 @@ where
|
||||
terminate_execution: Arc::new(AtomicBool::new(false)),
|
||||
executed_tx_index: Arc::clone(&executed_tx_index),
|
||||
precompile_cache_disabled: self.precompile_cache_disabled,
|
||||
precompile_cache_map: self.precompile_cache_map.clone(),
|
||||
precompile_cache_map: self.shared_caches.precompile_cache_map(),
|
||||
};
|
||||
|
||||
let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
|
||||
self.executor.clone(),
|
||||
self.execution_cache.clone(),
|
||||
self.shared_caches.execution_cache(),
|
||||
prewarm_ctx,
|
||||
to_multi_proof,
|
||||
);
|
||||
@@ -537,7 +572,7 @@ where
|
||||
/// instance.
|
||||
#[instrument(level = "debug", target = "engine::caching", skip(self))]
|
||||
fn cache_for(&self, parent_hash: B256) -> SavedCache {
|
||||
if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
|
||||
if let Some(cache) = self.shared_caches.execution_cache().get_cache_for(parent_hash) {
|
||||
debug!("reusing execution cache");
|
||||
cache
|
||||
} else {
|
||||
@@ -562,12 +597,11 @@ where
|
||||
parent_state_root: B256,
|
||||
chunk_size: usize,
|
||||
) {
|
||||
let preserved_sparse_trie = self.sparse_state_trie.clone();
|
||||
let sparse_trie_cache = self.shared_caches.sparse_trie_cache();
|
||||
let trie_metrics = self.trie_metrics.clone();
|
||||
let max_hot_slots = self.sparse_trie_max_hot_slots;
|
||||
let max_hot_accounts = self.sparse_trie_max_hot_accounts;
|
||||
let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
|
||||
let enable_arena_sparse_trie = self.enable_arena_sparse_trie;
|
||||
let executor = self.executor.clone();
|
||||
|
||||
let parent_span = Span::current();
|
||||
@@ -577,49 +611,19 @@ where
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
|
||||
.entered();
|
||||
|
||||
// Reuse a stored SparseStateTrie if available, applying continuation logic.
|
||||
// If this payload's parent state root matches the preserved trie's anchor,
|
||||
// we can reuse the pruned trie structure. Otherwise, we clear the trie but
|
||||
// keep allocations.
|
||||
let start = Instant::now();
|
||||
let preserved = preserved_sparse_trie.take();
|
||||
let mut checkout = sparse_trie_cache.take_or_create_for(parent_state_root);
|
||||
trie_metrics
|
||||
.sparse_trie_cache_wait_duration_histogram
|
||||
.record(start.elapsed().as_secs_f64());
|
||||
checkout.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);
|
||||
|
||||
let mut sparse_state_trie = preserved
|
||||
.map(|preserved| preserved.into_trie_for(parent_state_root))
|
||||
.unwrap_or_else(|| {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
"Creating new sparse trie - no preserved trie available"
|
||||
);
|
||||
let default_trie = if enable_arena_sparse_trie {
|
||||
RevealableSparseTrie::blind_from(
|
||||
ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
|
||||
)
|
||||
} else {
|
||||
RevealableSparseTrie::blind_from(
|
||||
ConfigurableSparseTrie::HashMap(
|
||||
ParallelSparseTrie::default().with_parallelism_thresholds(
|
||||
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS,
|
||||
),
|
||||
),
|
||||
)
|
||||
};
|
||||
SparseStateTrie::default()
|
||||
.with_accounts_trie(default_trie.clone())
|
||||
.with_default_storage_trie(default_trie)
|
||||
.with_updates(true)
|
||||
});
|
||||
sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);
|
||||
|
||||
let mut task = SparseTrieCacheTask::new_with_trie(
|
||||
let mut task = SparseTrieCacheTask::new_with_checkout(
|
||||
&executor,
|
||||
from_multi_proof,
|
||||
proof_worker_handle,
|
||||
trie_metrics.clone(),
|
||||
sparse_state_trie,
|
||||
checkout,
|
||||
chunk_size,
|
||||
);
|
||||
|
||||
@@ -630,7 +634,7 @@ where
|
||||
// causing take() to return None and forcing it to create a new empty trie
|
||||
// instead of reusing the preserved one. Holding the guard ensures the next
|
||||
// block's take() blocks until we've stored the trie for reuse.
|
||||
let mut guard = preserved_sparse_trie.lock();
|
||||
let mut guard = sparse_trie_cache.lock();
|
||||
|
||||
let task_result = result.as_ref().ok().cloned();
|
||||
// Send state root computation result - next block may start but will block on take()
|
||||
@@ -645,7 +649,7 @@ where
|
||||
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
|
||||
);
|
||||
guard.store(PreservedSparseTrie::cleared(trie));
|
||||
trie.store_prepared_cleared_with_guard(&mut guard);
|
||||
drop(guard);
|
||||
executor.spawn_drop(deferred);
|
||||
return;
|
||||
@@ -674,7 +678,7 @@ where
|
||||
trie_metrics
|
||||
.sparse_trie_retained_storage_tries
|
||||
.set(trie.retained_storage_tries_count() as f64);
|
||||
guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
|
||||
trie.store_anchored_with_guard(&mut guard, result.state_root);
|
||||
deferred
|
||||
} else {
|
||||
debug!(
|
||||
@@ -685,7 +689,7 @@ where
|
||||
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
|
||||
);
|
||||
guard.store(PreservedSparseTrie::cleared(trie));
|
||||
trie.store_prepared_cleared_with_guard(&mut guard);
|
||||
deferred
|
||||
};
|
||||
drop(guard);
|
||||
@@ -706,7 +710,7 @@ where
|
||||
bundle_state: &BundleState,
|
||||
) {
|
||||
let disable_cache_metrics = self.disable_cache_metrics;
|
||||
self.execution_cache.update_with_guard(|cached| {
|
||||
self.shared_caches.execution_cache().update_with_guard(|cached| {
|
||||
if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
|
||||
debug!(
|
||||
target: "engine::caching",
|
||||
@@ -1010,7 +1014,7 @@ impl<R> Drop for CacheTaskHandle<R> {
|
||||
/// - Prepares data for state root proof computation
|
||||
/// - Runs concurrently but must not interfere with cache saves
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct PayloadExecutionCache {
|
||||
pub(crate) struct PayloadExecutionCache {
|
||||
/// Guarded cloneable cache identified by a block hash.
|
||||
inner: Arc<RwLock<Option<SavedCache>>>,
|
||||
/// Metrics for cache operations.
|
||||
@@ -1018,11 +1022,11 @@ pub struct PayloadExecutionCache {
|
||||
}
|
||||
|
||||
impl PayloadExecutionCache {
|
||||
/// Returns the cache for `parent_hash` if it's available for use.
|
||||
/// Returns the cache backing store for `parent_hash` if it's available for reuse.
|
||||
///
|
||||
/// A cache is considered available when:
|
||||
/// - It exists and matches the requested parent hash
|
||||
/// - No other tasks are currently using it (checked via Arc reference count)
|
||||
/// If the tracked cache is available but keyed to a different parent hash, the cache is
|
||||
/// cleared and returned so callers can reuse the underlying allocations without carrying over
|
||||
/// stale state.
|
||||
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
|
||||
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
|
||||
let start = Instant::now();
|
||||
@@ -1061,7 +1065,7 @@ impl PayloadExecutionCache {
|
||||
// and picking up polluted data if the fork block fails.
|
||||
c.clear_with_hash(parent_hash);
|
||||
}
|
||||
return Some(c.clone())
|
||||
return Some(c.clone());
|
||||
} else if hash_matches {
|
||||
self.metrics.execution_cache_in_use.increment(1);
|
||||
}
|
||||
@@ -1078,7 +1082,7 @@ impl PayloadExecutionCache {
|
||||
/// This is useful for synchronization before starting payload processing.
|
||||
///
|
||||
/// Returns the time spent waiting for the lock.
|
||||
pub fn wait_for_availability(&self) -> Duration {
|
||||
pub(crate) fn wait_for_availability(&self) -> Duration {
|
||||
let start = Instant::now();
|
||||
// Acquire write lock to wait for any current holders to finish
|
||||
let _guard = self.inner.write();
|
||||
@@ -1106,7 +1110,7 @@ impl PayloadExecutionCache {
|
||||
///
|
||||
/// Violating this requirement can result in cache corruption, incorrect state data,
|
||||
/// and potential consensus failures.
|
||||
pub fn update_with_guard<F>(&self, update_fn: F)
|
||||
pub(crate) fn update_with_guard<F>(&self, update_fn: F)
|
||||
where
|
||||
F: FnOnce(&mut Option<SavedCache>),
|
||||
{
|
||||
@@ -1175,8 +1179,9 @@ mod tests {
|
||||
use super::PayloadExecutionCache;
|
||||
use crate::tree::{
|
||||
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
|
||||
payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
|
||||
precompile_cache::PrecompileCacheMap,
|
||||
payload_processor::{
|
||||
evm_state_to_hashed_post_state, EngineSharedCaches, ExecutionEnv, PayloadProcessor,
|
||||
},
|
||||
StateProviderBuilder, TreeConfig,
|
||||
};
|
||||
use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
|
||||
@@ -1288,7 +1293,7 @@ mod tests {
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(Arc::new(ChainSpec::default())),
|
||||
&TreeConfig::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
EngineSharedCaches::default(),
|
||||
);
|
||||
|
||||
let parent_hash = B256::from([1u8; 32]);
|
||||
@@ -1300,13 +1305,17 @@ mod tests {
|
||||
let bundle_state = BundleState::default();
|
||||
|
||||
// Cache should be empty initially
|
||||
assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
|
||||
assert!(payload_processor
|
||||
.shared_caches
|
||||
.execution_cache()
|
||||
.get_cache_for(block_hash)
|
||||
.is_none());
|
||||
|
||||
// Update cache with inserted block
|
||||
payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
|
||||
|
||||
// Cache should now exist for the block hash
|
||||
let cached = payload_processor.execution_cache.get_cache_for(block_hash);
|
||||
let cached = payload_processor.shared_caches.execution_cache().get_cache_for(block_hash);
|
||||
assert!(cached.is_some());
|
||||
assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
|
||||
}
|
||||
@@ -1317,13 +1326,14 @@ mod tests {
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(Arc::new(ChainSpec::default())),
|
||||
&TreeConfig::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
EngineSharedCaches::default(),
|
||||
);
|
||||
|
||||
// Setup: populate cache with block 1
|
||||
let block1_hash = B256::from([1u8; 32]);
|
||||
payload_processor
|
||||
.execution_cache
|
||||
.shared_caches
|
||||
.execution_cache()
|
||||
.update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));
|
||||
|
||||
// Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
|
||||
@@ -1338,11 +1348,11 @@ mod tests {
|
||||
payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
|
||||
|
||||
// Cache should still be for block 1 (unchanged)
|
||||
let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
|
||||
let cached = payload_processor.shared_caches.execution_cache().get_cache_for(block1_hash);
|
||||
assert!(cached.is_some(), "Original cache should be preserved");
|
||||
|
||||
// Cache for block 3 should not exist
|
||||
let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
|
||||
let cached3 = payload_processor.shared_caches.execution_cache().get_cache_for(block3_hash);
|
||||
assert!(cached3.is_none(), "New block cache should not be created on mismatch");
|
||||
}
|
||||
|
||||
@@ -1452,7 +1462,7 @@ mod tests {
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(factory.chain_spec()),
|
||||
&TreeConfig::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
EngineSharedCaches::default(),
|
||||
);
|
||||
|
||||
let provider_factory = BlockchainProvider::new(factory).unwrap();
|
||||
|
||||
@@ -1,44 +1,128 @@
|
||||
//! Preserved sparse trie for reuse across payload validations.
|
||||
|
||||
use super::{
|
||||
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS, SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
use parking_lot::Mutex;
|
||||
use reth_trie_sparse::{ConfigurableSparseTrie, SparseStateTrie};
|
||||
use std::{sync::Arc, time::Instant};
|
||||
use reth_trie_sparse::{
|
||||
ArenaParallelSparseTrie, ConfigurableSparseTrie, ParallelSparseTrie, RevealableSparseTrie,
|
||||
SparseStateTrie,
|
||||
};
|
||||
use std::{
|
||||
ops::{Deref, DerefMut},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::debug;
|
||||
|
||||
/// Type alias for the sparse trie type used in preservation.
|
||||
pub(super) type SparseTrie = SparseStateTrie<ConfigurableSparseTrie, ConfigurableSparseTrie>;
|
||||
type SparseTrie = SparseStateTrie<ConfigurableSparseTrie, ConfigurableSparseTrie>;
|
||||
|
||||
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
|
||||
/// Sparse trie implementation used by [`PayloadSparseTrieCache`].
|
||||
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum PayloadSparseTrieKind {
|
||||
/// Back sparse trie storage with hash maps.
|
||||
#[default]
|
||||
HashMap,
|
||||
/// Back sparse trie storage with arena allocations.
|
||||
Arena,
|
||||
}
|
||||
|
||||
impl From<bool> for PayloadSparseTrieKind {
|
||||
fn from(enable_arena_sparse_trie: bool) -> Self {
|
||||
if enable_arena_sparse_trie {
|
||||
Self::Arena
|
||||
} else {
|
||||
Self::HashMap
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct PayloadSparseTrieState {
|
||||
latest_checkout_id: u64,
|
||||
preserved: Option<PreservedSparseTrie>,
|
||||
}
|
||||
|
||||
/// Outcome of storing a checked-out sparse trie back into the shared cache.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum PayloadSparseTrieStoreOutcome {
|
||||
/// The checkout was the most recent lease and the trie was stored.
|
||||
Stored,
|
||||
/// A newer checkout had already been issued, so this stale lease was ignored.
|
||||
IgnoredStaleCheckout,
|
||||
}
|
||||
|
||||
/// Shared sparse trie cache that can be reused across payload validations.
|
||||
///
|
||||
/// This is stored in [`PayloadProcessor`](super::PayloadProcessor) and cloned to pass to
|
||||
/// [`SparseTrieCacheTask`](super::sparse_trie::SparseTrieCacheTask) for trie reuse.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub(super) struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
|
||||
/// This is the public sparse-trie SDK surface exposed through
|
||||
/// [`EngineSharedCaches`](super::EngineSharedCaches). Callers take or create a trie, use it for
|
||||
/// payload work, then store it back either anchored to the resulting state root or cleared for
|
||||
/// allocation reuse.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PayloadSparseTrieCache {
|
||||
kind: PayloadSparseTrieKind,
|
||||
state: Arc<Mutex<PayloadSparseTrieState>>,
|
||||
}
|
||||
|
||||
impl SharedPreservedSparseTrie {
|
||||
/// Takes the preserved trie if present, leaving `None` in its place.
|
||||
pub(super) fn take(&self) -> Option<PreservedSparseTrie> {
|
||||
self.0.lock().take()
|
||||
impl Default for PayloadSparseTrieCache {
|
||||
fn default() -> Self {
|
||||
Self::new(PayloadSparseTrieKind::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl PayloadSparseTrieCache {
|
||||
/// Creates a sparse trie cache backed by the requested trie implementation.
|
||||
pub fn new(kind: PayloadSparseTrieKind) -> Self {
|
||||
Self { kind, state: Arc::new(Mutex::new(PayloadSparseTrieState::default())) }
|
||||
}
|
||||
|
||||
/// Acquires a guard that blocks `take()` until dropped.
|
||||
/// Use this before sending the state root result to ensure the next block
|
||||
/// waits for the trie to be stored.
|
||||
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
|
||||
PreservedTrieGuard(self.0.lock())
|
||||
/// Returns the sparse trie implementation used when the cache needs to create a new trie.
|
||||
pub const fn kind(&self) -> PayloadSparseTrieKind {
|
||||
self.kind
|
||||
}
|
||||
|
||||
/// Takes a preserved trie for `parent_state_root` or creates a new trie if the cache is empty.
|
||||
pub fn take_or_create_for(&self, parent_state_root: B256) -> SparseTrieCheckout {
|
||||
let start = Instant::now();
|
||||
let mut state = self.state.lock();
|
||||
state.latest_checkout_id += 1;
|
||||
let checkout_id = state.latest_checkout_id;
|
||||
let trie = state
|
||||
.preserved
|
||||
.take()
|
||||
.map(|preserved| preserved.into_trie_for(parent_state_root))
|
||||
.unwrap_or_else(|| {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
%parent_state_root,
|
||||
kind = ?self.kind,
|
||||
"Creating new sparse trie - no preserved trie available"
|
||||
);
|
||||
new_sparse_trie(self.kind)
|
||||
});
|
||||
drop(state);
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
if elapsed.as_millis() > 5 {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
blocked_for=?elapsed,
|
||||
"Waited for preserved sparse trie checkout"
|
||||
);
|
||||
}
|
||||
|
||||
SparseTrieCheckout { trie: Some(trie), cache: self.clone(), checkout_id }
|
||||
}
|
||||
|
||||
/// Waits until the sparse trie lock becomes available.
|
||||
///
|
||||
/// This acquires and immediately releases the lock, ensuring that any
|
||||
/// ongoing operations complete before returning. Useful for synchronization
|
||||
/// before starting payload processing.
|
||||
///
|
||||
/// Returns the time spent waiting for the lock.
|
||||
pub(super) fn wait_for_availability(&self) -> std::time::Duration {
|
||||
pub fn wait_for_availability(&self) -> Duration {
|
||||
let start = Instant::now();
|
||||
let _guard = self.0.lock();
|
||||
let _guard = self.state.lock();
|
||||
let elapsed = start.elapsed();
|
||||
if elapsed.as_millis() > 5 {
|
||||
debug!(
|
||||
@@ -49,27 +133,142 @@ impl SharedPreservedSparseTrie {
|
||||
}
|
||||
elapsed
|
||||
}
|
||||
|
||||
/// Acquires a guard that blocks cache mutation until dropped.
|
||||
///
|
||||
/// Engine-internal code uses this before making the state-root result visible so the next
|
||||
/// payload cannot observe an empty cache between send and store.
|
||||
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
|
||||
PreservedTrieGuard { state: self.state.lock() }
|
||||
}
|
||||
}
|
||||
|
||||
/// A checked-out sparse trie lease.
|
||||
///
|
||||
/// This dereferences to [`SparseStateTrie`] so callers can reuse the trie directly. If the lease is
|
||||
/// dropped without being stored back, a cleared trie is returned to the shared cache unless a newer
|
||||
/// checkout has already superseded it.
|
||||
#[derive(Debug)]
|
||||
pub struct SparseTrieCheckout {
|
||||
trie: Option<SparseTrie>,
|
||||
cache: PayloadSparseTrieCache,
|
||||
checkout_id: u64,
|
||||
}
|
||||
|
||||
impl SparseTrieCheckout {
|
||||
/// Stores the trie back into the shared cache anchored to the given state root.
|
||||
pub fn store_anchored(self, state_root: B256) -> PayloadSparseTrieStoreOutcome {
|
||||
let cache = self.cache.clone();
|
||||
let mut guard = cache.lock();
|
||||
self.store_anchored_with_guard(&mut guard, state_root)
|
||||
}
|
||||
|
||||
/// Stores the trie back into the shared cache in a cleared state.
|
||||
pub fn store_cleared(mut self) -> PayloadSparseTrieStoreOutcome {
|
||||
let cache = self.cache.clone();
|
||||
let mut trie = self.take_trie();
|
||||
prepare_cleared_trie(&mut trie);
|
||||
let deferred = trie.take_deferred_drops();
|
||||
let mut guard = cache.lock();
|
||||
let outcome = guard.store(self.checkout_id, PreservedSparseTrie::cleared(trie));
|
||||
drop(guard);
|
||||
drop(deferred);
|
||||
outcome
|
||||
}
|
||||
|
||||
/// Stores the trie back into the shared cache anchored to the given state root while the
|
||||
/// caller is already holding the preservation lock.
|
||||
pub(super) fn store_anchored_with_guard(
|
||||
mut self,
|
||||
guard: &mut PreservedTrieGuard<'_>,
|
||||
state_root: B256,
|
||||
) -> PayloadSparseTrieStoreOutcome {
|
||||
guard.store(self.checkout_id, PreservedSparseTrie::anchored(self.take_trie(), state_root))
|
||||
}
|
||||
|
||||
/// Stores an already-cleared trie back into the shared cache while the caller is already
|
||||
/// holding the preservation lock.
|
||||
pub(super) fn store_prepared_cleared_with_guard(
|
||||
mut self,
|
||||
guard: &mut PreservedTrieGuard<'_>,
|
||||
) -> PayloadSparseTrieStoreOutcome {
|
||||
guard.store(self.checkout_id, PreservedSparseTrie::cleared(self.take_trie()))
|
||||
}
|
||||
|
||||
fn take_trie(&mut self) -> SparseTrie {
|
||||
self.trie.take().expect("sparse trie checkout must hold a trie until it is stored")
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for SparseTrieCheckout {
|
||||
type Target = SparseTrie;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.trie.as_ref().expect("sparse trie checkout must hold a trie until it is stored")
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for SparseTrieCheckout {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.trie.as_mut().expect("sparse trie checkout must hold a trie until it is stored")
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SparseTrieCheckout {
|
||||
fn drop(&mut self) {
|
||||
let Some(mut trie) = self.trie.take() else { return };
|
||||
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
checkout_id = self.checkout_id,
|
||||
"Sparse trie checkout dropped before store, returning cleared trie to cache"
|
||||
);
|
||||
|
||||
prepare_cleared_trie(&mut trie);
|
||||
let deferred = trie.take_deferred_drops();
|
||||
let mut guard = self.cache.lock();
|
||||
let _ = guard.store(self.checkout_id, PreservedSparseTrie::cleared(trie));
|
||||
drop(guard);
|
||||
drop(deferred);
|
||||
}
|
||||
}
|
||||
|
||||
/// Guard that holds the lock on the preserved trie.
|
||||
/// While held, `take()` will block. Call `store()` to save the trie before dropping.
|
||||
pub(super) struct PreservedTrieGuard<'a>(parking_lot::MutexGuard<'a, Option<PreservedSparseTrie>>);
|
||||
/// While held, take-or-create calls will block. Call `store()` to save the trie before dropping.
|
||||
pub(super) struct PreservedTrieGuard<'a> {
|
||||
state: parking_lot::MutexGuard<'a, PayloadSparseTrieState>,
|
||||
}
|
||||
|
||||
impl PreservedTrieGuard<'_> {
|
||||
/// Stores a preserved trie for later reuse.
|
||||
pub(super) fn store(&mut self, trie: PreservedSparseTrie) {
|
||||
self.0.replace(trie);
|
||||
/// Stores a preserved trie for later reuse if the checkout is still current.
|
||||
fn store(
|
||||
&mut self,
|
||||
checkout_id: u64,
|
||||
trie: PreservedSparseTrie,
|
||||
) -> PayloadSparseTrieStoreOutcome {
|
||||
if checkout_id != self.state.latest_checkout_id {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
checkout_id,
|
||||
latest_checkout_id = self.state.latest_checkout_id,
|
||||
"Ignoring stale sparse trie checkout"
|
||||
);
|
||||
return PayloadSparseTrieStoreOutcome::IgnoredStaleCheckout;
|
||||
}
|
||||
|
||||
self.state.preserved.replace(trie);
|
||||
PayloadSparseTrieStoreOutcome::Stored
|
||||
}
|
||||
}
|
||||
|
||||
/// A preserved sparse trie that can be reused across payload validations.
|
||||
///
|
||||
/// The trie exists in one of two states:
|
||||
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state root
|
||||
/// matches the anchor.
|
||||
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state
|
||||
/// root matches the anchor.
|
||||
/// - **Cleared**: Trie data has been cleared but allocations are preserved for reuse.
|
||||
#[derive(Debug)]
|
||||
pub(super) enum PreservedSparseTrie {
|
||||
enum PreservedSparseTrie {
|
||||
/// Trie with a computed state root that can be reused for continuation payloads.
|
||||
Anchored {
|
||||
/// The sparse state trie (pruned after root computation).
|
||||
@@ -87,24 +286,17 @@ pub(super) enum PreservedSparseTrie {
|
||||
|
||||
impl PreservedSparseTrie {
|
||||
/// Creates a new anchored preserved trie.
|
||||
///
|
||||
/// The `state_root` is the computed state root from the trie, which becomes the
|
||||
/// anchor for determining if subsequent payloads can reuse this trie.
|
||||
pub(super) const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
|
||||
const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
|
||||
Self::Anchored { trie, state_root }
|
||||
}
|
||||
|
||||
/// Creates a cleared preserved trie (allocations preserved, data cleared).
|
||||
pub(super) const fn cleared(trie: SparseTrie) -> Self {
|
||||
const fn cleared(trie: SparseTrie) -> Self {
|
||||
Self::Cleared { trie }
|
||||
}
|
||||
|
||||
/// Consumes self and returns the trie for reuse.
|
||||
///
|
||||
/// If the preserved trie is anchored and the parent state root matches, the pruned
|
||||
/// trie structure is reused directly. Otherwise, the trie is cleared but allocations
|
||||
/// are preserved to reduce memory overhead.
|
||||
pub(super) fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
|
||||
fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
|
||||
match self {
|
||||
Self::Anchored { trie, state_root } if state_root == parent_state_root => {
|
||||
debug!(
|
||||
@@ -135,3 +327,111 @@ impl PreservedSparseTrie {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn new_sparse_trie(kind: PayloadSparseTrieKind) -> SparseTrie {
|
||||
let default_trie = match kind {
|
||||
PayloadSparseTrieKind::HashMap => {
|
||||
RevealableSparseTrie::blind_from(ConfigurableSparseTrie::HashMap(
|
||||
ParallelSparseTrie::default()
|
||||
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
|
||||
))
|
||||
}
|
||||
PayloadSparseTrieKind::Arena => RevealableSparseTrie::blind_from(
|
||||
ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
|
||||
),
|
||||
};
|
||||
|
||||
SparseStateTrie::default()
|
||||
.with_accounts_trie(default_trie.clone())
|
||||
.with_default_storage_trie(default_trie)
|
||||
.with_updates(true)
|
||||
}
|
||||
|
||||
fn prepare_cleared_trie(trie: &mut SparseTrie) {
|
||||
trie.clear();
|
||||
trie.shrink_to(SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY, SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn take_or_create_reuses_matching_anchor() {
|
||||
let cache = PayloadSparseTrieCache::default();
|
||||
let state_root = B256::with_last_byte(1);
|
||||
|
||||
assert_eq!(
|
||||
cache.take_or_create_for(state_root).store_anchored(state_root),
|
||||
PayloadSparseTrieStoreOutcome::Stored
|
||||
);
|
||||
|
||||
match cache.state.lock().preserved.as_ref() {
|
||||
Some(PreservedSparseTrie::Anchored { state_root: anchored, .. }) => {
|
||||
assert_eq!(*anchored, state_root);
|
||||
}
|
||||
other => panic!("expected anchored trie, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drop_restores_cleared_trie() {
|
||||
let cache = PayloadSparseTrieCache::default();
|
||||
let state_root = B256::with_last_byte(2);
|
||||
|
||||
let mut checkout = cache.take_or_create_for(state_root);
|
||||
checkout.set_updates(true);
|
||||
drop(checkout);
|
||||
|
||||
match cache.state.lock().preserved.as_ref() {
|
||||
Some(PreservedSparseTrie::Cleared { .. }) => {}
|
||||
other => panic!("expected cleared trie, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stale_checkout_does_not_overwrite_newer_store() {
|
||||
let cache = PayloadSparseTrieCache::default();
|
||||
let parent_state_root = B256::with_last_byte(3);
|
||||
let anchored_state_root = B256::with_last_byte(4);
|
||||
|
||||
let stale = cache.take_or_create_for(parent_state_root);
|
||||
let fresh = cache.take_or_create_for(parent_state_root);
|
||||
|
||||
assert_eq!(
|
||||
fresh.store_anchored(anchored_state_root),
|
||||
PayloadSparseTrieStoreOutcome::Stored
|
||||
);
|
||||
assert_eq!(stale.store_cleared(), PayloadSparseTrieStoreOutcome::IgnoredStaleCheckout);
|
||||
|
||||
match cache.state.lock().preserved.as_ref() {
|
||||
Some(PreservedSparseTrie::Anchored { state_root, .. }) => {
|
||||
assert_eq!(*state_root, anchored_state_root);
|
||||
}
|
||||
other => panic!("expected anchored trie to survive stale checkout, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stale_checkout_drop_does_not_overwrite_newer_store() {
|
||||
let cache = PayloadSparseTrieCache::default();
|
||||
let parent_state_root = B256::with_last_byte(5);
|
||||
let anchored_state_root = B256::with_last_byte(6);
|
||||
|
||||
let stale = cache.take_or_create_for(parent_state_root);
|
||||
let fresh = cache.take_or_create_for(parent_state_root);
|
||||
|
||||
assert_eq!(
|
||||
fresh.store_anchored(anchored_state_root),
|
||||
PayloadSparseTrieStoreOutcome::Stored
|
||||
);
|
||||
drop(stale);
|
||||
|
||||
match cache.state.lock().preserved.as_ref() {
|
||||
Some(PreservedSparseTrie::Anchored { state_root, .. }) => {
|
||||
assert_eq!(*state_root, anchored_state_root);
|
||||
}
|
||||
other => panic!("expected anchored trie to survive stale checkout drop, got {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ where
|
||||
Evm: ConfigureEvm<Primitives = N> + 'static,
|
||||
{
|
||||
/// Initializes the task with the given transactions pending execution
|
||||
pub fn new(
|
||||
pub(crate) fn new(
|
||||
executor: Runtime,
|
||||
execution_cache: PayloadExecutionCache,
|
||||
ctx: PrewarmContext<N, P, Evm>,
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::tree::{
|
||||
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
|
||||
DEFAULT_MAX_TARGETS_FOR_CHUNKING,
|
||||
},
|
||||
payload_processor::multiproof::MultiProofTaskMetrics,
|
||||
payload_processor::{multiproof::MultiProofTaskMetrics, SparseTrieCheckout},
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rlp::{Decodable, Encodable};
|
||||
@@ -30,7 +30,7 @@ use reth_trie_parallel::{
|
||||
use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
|
||||
use reth_trie_sparse::{
|
||||
errors::SparseTrieResult, ConfigurableSparseTrie, DeferredDrops, LeafUpdate,
|
||||
RevealableSparseTrie, SparseStateTrie, SparseTrie,
|
||||
RevealableSparseTrie,
|
||||
};
|
||||
use revm_primitives::{hash_map::Entry, B256Map};
|
||||
use tracing::{debug, debug_span, error, instrument, trace_span};
|
||||
@@ -39,7 +39,7 @@ use tracing::{debug, debug_span, error, instrument, trace_span};
|
||||
const MAX_PENDING_UPDATES: usize = 100;
|
||||
|
||||
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
|
||||
pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = ConfigurableSparseTrie> {
|
||||
pub(super) struct SparseTrieCacheTask {
|
||||
/// Sender for proof results.
|
||||
proof_result_tx: CrossbeamSender<ProofResultMessage>,
|
||||
/// Receiver for proof results directly from workers.
|
||||
@@ -47,7 +47,7 @@ pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = Configurab
|
||||
/// Receives updates from execution and prewarming.
|
||||
updates: CrossbeamReceiver<SparseTrieTaskMessage>,
|
||||
/// `SparseStateTrie` used for computing the state root.
|
||||
trie: SparseStateTrie<A, S>,
|
||||
trie: SparseTrieCheckout,
|
||||
/// Handle to the proof worker pools (storage and account).
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
|
||||
@@ -110,18 +110,14 @@ pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = Configurab
|
||||
metrics: MultiProofTaskMetrics,
|
||||
}
|
||||
|
||||
impl<A, S> SparseTrieCacheTask<A, S>
|
||||
where
|
||||
A: SparseTrie + Default,
|
||||
S: SparseTrie + Default + Clone,
|
||||
{
|
||||
impl SparseTrieCacheTask {
|
||||
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
|
||||
pub(super) fn new_with_trie(
|
||||
pub(super) fn new_with_checkout(
|
||||
executor: &Runtime,
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
trie: SparseStateTrie<A, S>,
|
||||
trie: SparseTrieCheckout,
|
||||
chunk_size: usize,
|
||||
) -> Self {
|
||||
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
|
||||
@@ -205,7 +201,7 @@ where
|
||||
max_values_capacity: usize,
|
||||
disable_pruning: bool,
|
||||
updates: &TrieUpdates,
|
||||
) -> (SparseStateTrie<A, S>, DeferredDrops) {
|
||||
) -> (SparseTrieCheckout, DeferredDrops) {
|
||||
let Self { mut trie, .. } = self;
|
||||
trie.commit_updates(updates);
|
||||
if !disable_pruning {
|
||||
@@ -224,7 +220,7 @@ where
|
||||
self,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
) -> (SparseStateTrie<A, S>, DeferredDrops) {
|
||||
) -> (SparseTrieCheckout, DeferredDrops) {
|
||||
let Self { mut trie, .. } = self;
|
||||
trie.clear();
|
||||
trie.shrink_to(max_nodes_capacity, max_values_capacity);
|
||||
@@ -306,9 +302,9 @@ where
|
||||
self.promote_pending_account_updates()?;
|
||||
self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
|
||||
|
||||
if self.finished_state_updates &&
|
||||
self.account_updates.is_empty() &&
|
||||
self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
|
||||
if self.finished_state_updates
|
||||
&& self.account_updates.is_empty()
|
||||
&& self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
|
||||
{
|
||||
break;
|
||||
}
|
||||
@@ -600,7 +596,6 @@ where
|
||||
|
||||
Ok(updates_len_after < updates_len_before)
|
||||
}
|
||||
|
||||
/// Computes storage roots for accounts whose storage updates are fully drained.
|
||||
///
|
||||
/// For each storage trie T that:
|
||||
@@ -621,16 +616,16 @@ where
|
||||
.filter_map(|(address, updates)| updates.is_empty().then_some(*address))
|
||||
.collect();
|
||||
|
||||
struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
|
||||
struct SendStorageTriePtr(*mut RevealableSparseTrie<ConfigurableSparseTrie>);
|
||||
// SAFETY: this wrapper only forwards the pointer across rayon; deref invariants are
|
||||
// documented at the use site below.
|
||||
unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}
|
||||
unsafe impl Send for SendStorageTriePtr {}
|
||||
|
||||
let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
|
||||
let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr)> =
|
||||
Vec::with_capacity(addresses_to_compute_roots.len());
|
||||
for address in addresses_to_compute_roots {
|
||||
if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
|
||||
!trie.is_root_cached()
|
||||
if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address)
|
||||
&& !trie.is_root_cached()
|
||||
{
|
||||
tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
|
||||
}
|
||||
@@ -729,7 +724,7 @@ where
|
||||
// We need to keep iterating if any updates are being drained because that might
|
||||
// indicate that more pending account updates can be promoted.
|
||||
if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
|
||||
break
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -850,7 +845,6 @@ pub struct StateRootComputeOutcome {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_primitives::{keccak256, Address, B256, U256};
|
||||
use reth_trie_sparse::ArenaParallelSparseTrie;
|
||||
|
||||
#[test]
|
||||
fn test_run_hashing_task_hashed_state_update_forwards() {
|
||||
@@ -873,10 +867,7 @@ mod tests {
|
||||
let expected_state = hashed_state.clone();
|
||||
|
||||
let handle = std::thread::spawn(move || {
|
||||
SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
|
||||
updates_rx,
|
||||
hashed_state_tx,
|
||||
);
|
||||
SparseTrieCacheTask::run_hashing_task(updates_rx, hashed_state_tx);
|
||||
});
|
||||
|
||||
updates_tx.send(MultiProofMessage::HashedStateUpdate(hashed_state)).unwrap();
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::tree::{
|
||||
cached_state::{CacheStats, CachedStateProvider},
|
||||
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
|
||||
instrumented_state::{InstrumentedStateProvider, StateProviderStats},
|
||||
payload_processor::PayloadProcessor,
|
||||
payload_processor::{EngineSharedCaches, PayloadProcessor},
|
||||
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
|
||||
sparse_trie::StateRootComputeOutcome,
|
||||
CacheWaitDurations, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle,
|
||||
@@ -190,16 +190,13 @@ where
|
||||
validator: V,
|
||||
config: TreeConfig,
|
||||
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
|
||||
shared_caches: EngineSharedCaches<Evm>,
|
||||
changeset_cache: ChangesetCache,
|
||||
runtime: reth_tasks::Runtime,
|
||||
) -> Self {
|
||||
let precompile_cache_map = PrecompileCacheMap::default();
|
||||
let payload_processor = PayloadProcessor::new(
|
||||
runtime.clone(),
|
||||
evm_config.clone(),
|
||||
&config,
|
||||
precompile_cache_map.clone(),
|
||||
);
|
||||
let precompile_cache_map = shared_caches.precompile_cache_map();
|
||||
let payload_processor =
|
||||
PayloadProcessor::new(runtime.clone(), evm_config.clone(), &config, shared_caches);
|
||||
Self {
|
||||
provider,
|
||||
consensus,
|
||||
@@ -313,7 +310,7 @@ where
|
||||
// Validate block consensus rules which includes header validation
|
||||
if let Err(consensus_err) = self.validate_block_inner(&block, None) {
|
||||
// Header validation error takes precedence over execution error
|
||||
return Err(InsertBlockError::new(block, consensus_err.into()).into())
|
||||
return Err(InsertBlockError::new(block, consensus_err.into()).into());
|
||||
}
|
||||
|
||||
// Also validate against the parent
|
||||
@@ -321,7 +318,7 @@ where
|
||||
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
|
||||
{
|
||||
// Parent validation error takes precedence over execution error
|
||||
return Err(InsertBlockError::new(block, consensus_err.into()).into())
|
||||
return Err(InsertBlockError::new(block, consensus_err.into()).into());
|
||||
}
|
||||
|
||||
// No header validation errors, return the original execution error
|
||||
@@ -396,7 +393,7 @@ where
|
||||
Ok(val) => val,
|
||||
Err(e) => {
|
||||
let block = convert_to_block(input)?;
|
||||
return Err(InsertBlockError::new(block, e.into()).into())
|
||||
return Err(InsertBlockError::new(block, e.into()).into());
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -429,7 +426,7 @@ where
|
||||
convert_to_block(input)?,
|
||||
ProviderError::HeaderNotFound(parent_hash.into()).into(),
|
||||
)
|
||||
.into())
|
||||
.into());
|
||||
};
|
||||
let mut state_provider = ensure_ok!(provider_builder.build());
|
||||
drop(_enter);
|
||||
@@ -442,7 +439,7 @@ where
|
||||
convert_to_block(input)?,
|
||||
ProviderError::HeaderNotFound(parent_hash.into()).into(),
|
||||
)
|
||||
.into())
|
||||
.into());
|
||||
};
|
||||
|
||||
let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm_env")
|
||||
@@ -762,7 +759,7 @@ where
|
||||
)
|
||||
.into(),
|
||||
)
|
||||
.into())
|
||||
.into());
|
||||
}
|
||||
|
||||
let timing_stats = state_provider_stats.map(|stats| {
|
||||
@@ -824,14 +821,14 @@ where
|
||||
) -> Result<(), ConsensusError> {
|
||||
if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
|
||||
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
|
||||
return Err(e)
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
if let Err(e) =
|
||||
self.consensus.validate_block_pre_execution_with_tx_root(block, transaction_root)
|
||||
{
|
||||
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
|
||||
return Err(e)
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -1323,7 +1320,7 @@ where
|
||||
trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
|
||||
// validate block consensus rules
|
||||
if let Err(e) = self.validate_block_inner(block, transaction_root) {
|
||||
return Err(e.into())
|
||||
return Err(e.into());
|
||||
}
|
||||
|
||||
// now validate against the parent
|
||||
@@ -1332,7 +1329,7 @@ where
|
||||
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
|
||||
{
|
||||
warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
|
||||
return Err(e.into())
|
||||
return Err(e.into());
|
||||
}
|
||||
drop(_enter);
|
||||
|
||||
@@ -1345,7 +1342,7 @@ where
|
||||
{
|
||||
// call post-block hook
|
||||
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
|
||||
return Err(err.into())
|
||||
return Err(err.into());
|
||||
}
|
||||
drop(_enter);
|
||||
|
||||
@@ -1361,7 +1358,7 @@ where
|
||||
{
|
||||
// call post-block hook
|
||||
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
|
||||
return Err(err.into())
|
||||
return Err(err.into());
|
||||
}
|
||||
|
||||
// record post-execution validation duration
|
||||
@@ -1469,7 +1466,7 @@ where
|
||||
self.provider.clone(),
|
||||
historical,
|
||||
Some(blocks),
|
||||
)))
|
||||
)));
|
||||
}
|
||||
|
||||
// Check if the block is persisted
|
||||
@@ -1477,7 +1474,7 @@ where
|
||||
debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
|
||||
// For persisted blocks, we create a builder that will fetch state directly from the
|
||||
// database
|
||||
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
|
||||
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)));
|
||||
}
|
||||
|
||||
debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
|
||||
@@ -1509,7 +1506,7 @@ where
|
||||
) {
|
||||
if state.invalid_headers.get(&block.hash()).is_some() {
|
||||
// we already marked this block as invalid
|
||||
return
|
||||
return;
|
||||
}
|
||||
self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
|
||||
}
|
||||
|
||||
@@ -203,6 +203,7 @@ impl TestHarness {
|
||||
payload_validator,
|
||||
TreeConfig::default(),
|
||||
Box::new(NoopInvalidBlockHook::default()),
|
||||
EngineSharedCaches::default(),
|
||||
changeset_cache.clone(),
|
||||
reth_tasks::Runtime::test(),
|
||||
);
|
||||
@@ -407,6 +408,7 @@ impl ValidatorTestHarness {
|
||||
payload_validator,
|
||||
TreeConfig::default(),
|
||||
Box::new(NoopInvalidBlockHook::default()),
|
||||
EngineSharedCaches::default(),
|
||||
changeset_cache,
|
||||
reth_tasks::Runtime::test(),
|
||||
);
|
||||
|
||||
27
crates/engine/tree/tests/shared_caches_sdk.rs
Normal file
27
crates/engine/tree/tests/shared_caches_sdk.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
//! SDK smoke tests for `EngineSharedCaches`.
|
||||
|
||||
use alloy_primitives::B256;
|
||||
use reth_engine_tree::tree::{
|
||||
EngineSharedCaches, PayloadSparseTrieKind, PayloadSparseTrieStoreOutcome,
|
||||
};
|
||||
use reth_evm_ethereum::EthEvmConfig;
|
||||
|
||||
#[test]
|
||||
fn engine_shared_caches_exposes_public_sparse_trie_sdk() {
|
||||
let caches =
|
||||
EngineSharedCaches::<EthEvmConfig>::with_sparse_trie_kind(PayloadSparseTrieKind::Arena);
|
||||
|
||||
let _precompile_cache_map = caches.precompile_cache_map();
|
||||
|
||||
let sparse_trie_cache = caches.sparse_trie_cache();
|
||||
assert_eq!(sparse_trie_cache.kind(), PayloadSparseTrieKind::Arena);
|
||||
let state_root = B256::with_last_byte(1);
|
||||
|
||||
assert_eq!(
|
||||
sparse_trie_cache.take_or_create_for(state_root).store_anchored(state_root),
|
||||
PayloadSparseTrieStoreOutcome::Stored
|
||||
);
|
||||
|
||||
let checkout = sparse_trie_cache.take_or_create_for(state_root);
|
||||
assert!(checkout.memory_size() > 0 || checkout.retained_storage_tries_count() == 0);
|
||||
}
|
||||
@@ -15,7 +15,7 @@ use reth_engine_tree::{
|
||||
chain::{ChainEvent, FromOrchestrator},
|
||||
engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
|
||||
launch::build_engine_orchestrator,
|
||||
tree::TreeConfig,
|
||||
tree::{EngineSharedCaches, PayloadSparseTrieKind, TreeConfig},
|
||||
};
|
||||
use reth_engine_util::EngineMessageStreamExt;
|
||||
use reth_exex::ExExManagerHandle;
|
||||
@@ -90,6 +90,10 @@ impl EngineNodeLauncher {
|
||||
|
||||
// Create changeset cache that will be shared across the engine
|
||||
let changeset_cache = ChangesetCache::new();
|
||||
let main_shared_caches =
|
||||
EngineSharedCaches::<<CB::Components as NodeComponents<T>>::Evm>::with_sparse_trie_kind(
|
||||
PayloadSparseTrieKind::from(engine_tree_config.enable_arena_sparse_trie()),
|
||||
);
|
||||
|
||||
// setup the launch context
|
||||
let ctx = ctx
|
||||
@@ -123,7 +127,8 @@ impl EngineNodeLauncher {
|
||||
.with_blockchain_db::<T, _>(move |provider_factory| {
|
||||
Ok(BlockchainProvider::new(provider_factory)?)
|
||||
})?
|
||||
.with_components(components_builder, on_component_initialized).await?;
|
||||
.with_components(components_builder, on_component_initialized)
|
||||
.await?;
|
||||
|
||||
// spawn exexs if any
|
||||
let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
|
||||
@@ -194,7 +199,12 @@ impl EngineNodeLauncher {
|
||||
// Build the engine validator with all required components
|
||||
let engine_validator = validator_builder
|
||||
.clone()
|
||||
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
|
||||
.build_tree_validator_with_caches(
|
||||
&add_ons_ctx,
|
||||
engine_tree_config.clone(),
|
||||
changeset_cache.clone(),
|
||||
main_shared_caches.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Create the consensus engine stream with optional reorg
|
||||
@@ -207,8 +217,18 @@ impl EngineNodeLauncher {
|
||||
|| async {
|
||||
// Create a separate cache for reorg validator (not shared with main engine)
|
||||
let reorg_cache = ChangesetCache::new();
|
||||
let reorg_shared_caches = EngineSharedCaches::<
|
||||
<CB::Components as NodeComponents<T>>::Evm,
|
||||
>::with_sparse_trie_kind(
|
||||
PayloadSparseTrieKind::from(engine_tree_config.enable_arena_sparse_trie()),
|
||||
);
|
||||
validator_builder
|
||||
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
|
||||
.build_tree_validator_with_caches(
|
||||
&add_ons_ctx,
|
||||
engine_tree_config.clone(),
|
||||
reorg_cache,
|
||||
reorg_shared_caches,
|
||||
)
|
||||
.await
|
||||
},
|
||||
node_config.debug.reorg_frequency,
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
//! Builder support for rpc components.
|
||||
|
||||
pub use jsonrpsee::server::middleware::rpc::{RpcService, RpcServiceBuilder};
|
||||
use reth_engine_tree::tree::WaitForCaches;
|
||||
pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator};
|
||||
use reth_engine_tree::tree::{EngineSharedCaches, PayloadSparseTrieKind, WaitForCaches};
|
||||
pub use reth_rpc_builder::{middleware::RethRpcMiddleware, Identity, Stack};
|
||||
pub use reth_trie_db::ChangesetCache;
|
||||
|
||||
@@ -981,7 +981,8 @@ where
|
||||
let Self { eth_api_builder, engine_api_builder, hooks, .. } = self;
|
||||
|
||||
let engine_api = engine_api_builder.build_engine_api(&ctx).await?;
|
||||
let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events } = ctx;
|
||||
let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events, .. } =
|
||||
ctx;
|
||||
|
||||
info!(target: "reth::cli", "Engine API handler initialized");
|
||||
|
||||
@@ -1294,6 +1295,25 @@ pub trait EngineValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone
|
||||
tree_config: TreeConfig,
|
||||
changeset_cache: ChangesetCache,
|
||||
) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send;
|
||||
|
||||
/// Builds the tree validator using the shared cache handles exported by the launcher.
|
||||
///
|
||||
/// The default implementation preserves the legacy behavior and ignores the provided caches.
|
||||
fn build_tree_validator_with_caches(
|
||||
self,
|
||||
ctx: &AddOnsContext<'_, Node>,
|
||||
tree_config: TreeConfig,
|
||||
changeset_cache: ChangesetCache,
|
||||
shared_caches: EngineSharedCaches<Node::Evm>,
|
||||
) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
async move {
|
||||
let _ = shared_caches;
|
||||
self.build_tree_validator(ctx, tree_config, changeset_cache).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Basic implementation of [`EngineValidatorBuilder`].
|
||||
@@ -1341,6 +1361,20 @@ where
|
||||
ctx: &AddOnsContext<'_, Node>,
|
||||
tree_config: TreeConfig,
|
||||
changeset_cache: ChangesetCache,
|
||||
) -> eyre::Result<Self::EngineValidator> {
|
||||
let shared_caches = EngineSharedCaches::with_sparse_trie_kind(PayloadSparseTrieKind::from(
|
||||
tree_config.enable_arena_sparse_trie(),
|
||||
));
|
||||
self.build_tree_validator_with_caches(ctx, tree_config, changeset_cache, shared_caches)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn build_tree_validator_with_caches(
|
||||
self,
|
||||
ctx: &AddOnsContext<'_, Node>,
|
||||
tree_config: TreeConfig,
|
||||
changeset_cache: ChangesetCache,
|
||||
shared_caches: EngineSharedCaches<Node::Evm>,
|
||||
) -> eyre::Result<Self::EngineValidator> {
|
||||
let validator = self.payload_validator_builder.build(ctx).await?;
|
||||
let data_dir = ctx.config.datadir.clone().resolve_datadir(ctx.config.chain.chain());
|
||||
@@ -1353,6 +1387,7 @@ where
|
||||
validator,
|
||||
tree_config,
|
||||
invalid_block_hook,
|
||||
shared_caches,
|
||||
changeset_cache,
|
||||
ctx.node.task_executor().clone(),
|
||||
))
|
||||
|
||||
Reference in New Issue
Block a user