From 36107c60abcae06724e57ed1f6b64ba92ad11b10 Mon Sep 17 00:00:00 2001 From: YK Date: Mon, 22 Sep 2025 17:38:02 +0800 Subject: [PATCH] fix(cache): Ensure execution cache remains locked until updated (#18564) Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com> --- crates/engine/tree/src/tree/cached_state.rs | 72 ++++++++++-- .../tree/src/tree/payload_processor/mod.rs | 107 +++++++++++++++--- .../src/tree/payload_processor/prewarm.rs | 44 +++---- 3 files changed, 182 insertions(+), 41 deletions(-) diff --git a/crates/engine/tree/src/tree/cached_state.rs b/crates/engine/tree/src/tree/cached_state.rs index a3d271b9f1..2fbeed1509 100644 --- a/crates/engine/tree/src/tree/cached_state.rs +++ b/crates/engine/tree/src/tree/cached_state.rs @@ -15,7 +15,7 @@ use reth_trie::{ MultiProofTargets, StorageMultiProof, StorageProof, TrieInput, }; use revm_primitives::map::DefaultHashBuilder; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use tracing::trace; pub(crate) type Cache = @@ -520,16 +520,16 @@ pub(crate) struct SavedCache { /// Metrics for the cached state provider metrics: CachedStateMetrics, + + /// A guard to track in-flight usage of this cache. + /// The cache is considered available if the strong count is 1. + usage_guard: Arc<()>, } impl SavedCache { /// Creates a new instance with the internals - pub(super) const fn new( - hash: B256, - caches: ExecutionCache, - metrics: CachedStateMetrics, - ) -> Self { - Self { hash, caches, metrics } + pub(super) fn new(hash: B256, caches: ExecutionCache, metrics: CachedStateMetrics) -> Self { + Self { hash, caches, metrics, usage_guard: Arc::new(()) } } /// Returns the hash for this cache @@ -542,11 +542,21 @@ impl SavedCache { (self.caches, self.metrics) } + /// Returns true if the cache is available for use (no other tasks are currently using it). + pub(crate) fn is_available(&self) -> bool { + Arc::strong_count(&self.usage_guard) == 1 + } + /// Returns the [`ExecutionCache`] belonging to the tracked hash. pub(crate) const fn cache(&self) -> &ExecutionCache { &self.caches } + /// Returns the metrics associated with this cache. + pub(crate) const fn metrics(&self) -> &CachedStateMetrics { + &self.metrics + } + /// Updates the metrics for the [`ExecutionCache`]. pub(crate) fn update_metrics(&self) { self.metrics.storage_cache_size.set(self.caches.total_storage_slots() as f64); @@ -555,6 +565,13 @@ impl SavedCache { } } +#[cfg(test)] +impl SavedCache { + fn clone_guard_for_test(&self) -> Arc<()> { + self.usage_guard.clone() + } +} + /// Cache for an individual account's storage slots. /// /// This represents the second level of the hierarchical storage cache. @@ -796,4 +813,45 @@ mod tests { let slot_status = caches.get_storage(&address, &storage_key); assert_eq!(slot_status, SlotStatus::Empty); } + + // Tests for SavedCache locking mechanism + #[test] + fn test_saved_cache_is_available() { + let execution_cache = ExecutionCacheBuilder::default().build_caches(1000); + let cache = SavedCache::new(B256::ZERO, execution_cache, CachedStateMetrics::zeroed()); + + // Initially, the cache should be available (only one reference) + assert!(cache.is_available(), "Cache should be available initially"); + + // Clone the usage guard (simulating it being handed out) + let _guard = cache.clone_guard_for_test(); + + // Now the cache should not be available (two references) + assert!(!cache.is_available(), "Cache should not be available with active guard"); + } + + #[test] + fn test_saved_cache_multiple_references() { + let execution_cache = ExecutionCacheBuilder::default().build_caches(1000); + let cache = + SavedCache::new(B256::from([2u8; 32]), execution_cache, CachedStateMetrics::zeroed()); + + // Create multiple references to the usage guard + let guard1 = cache.clone_guard_for_test(); + let guard2 = cache.clone_guard_for_test(); + let guard3 = guard1.clone(); + + // Cache should not be available with multiple guards + assert!(!cache.is_available()); + + // Drop guards one by one + drop(guard1); + assert!(!cache.is_available()); // Still not available + + drop(guard2); + assert!(!cache.is_available()); // Still not available + + drop(guard3); + assert!(cache.is_available()); // Now available + } } diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 2b5573f4ea..f1175ed57a 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -314,13 +314,14 @@ where transactions = mpsc::channel().1; } - let (cache, cache_metrics) = self.cache_for(env.parent_hash).split(); + let saved_cache = self.cache_for(env.parent_hash); + let cache = saved_cache.cache().clone(); + let cache_metrics = saved_cache.metrics().clone(); // configure prewarming let prewarm_ctx = PrewarmContext { env, evm_config: self.evm_config.clone(), - cache: cache.clone(), - cache_metrics: cache_metrics.clone(), + saved_cache, provider: provider_builder, metrics: PrewarmMetrics::default(), terminate_execution: Arc::new(AtomicBool::new(false)), @@ -357,15 +358,14 @@ where /// instance. #[instrument(target = "engine::caching", skip(self))] fn cache_for(&self, parent_hash: B256) -> SavedCache { - self.execution_cache - .get_cache_for(parent_hash) - .inspect(|_| debug!("reusing execution cache")) - .unwrap_or_else(|| { - debug!("creating new execution cache on cache miss"); - let cache = - ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size); - SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed()) - }) + if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) { + debug!("reusing execution cache"); + cache + } else { + debug!("creating new execution cache on cache miss"); + let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size); + SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed()) + } } /// Spawns the [`SparseTrieTask`] for this payload processor. @@ -465,6 +465,7 @@ impl PayloadHandle { self.prewarm_handle.cache.clone() } + /// Returns a clone of the cache metrics used by prewarming pub(super) fn cache_metrics(&self) -> CachedStateMetrics { self.prewarm_handle.cache_metrics.clone() } @@ -561,12 +562,17 @@ struct ExecutionCache { } impl ExecutionCache { - /// Returns the cache if the currently store cache is for the given `parent_hash` + /// Returns the cache for `parent_hash` if it's available for use. + /// + /// A cache is considered available when: + /// - It exists and matches the requested parent hash + /// - No other tasks are currently using it (checked via Arc reference count) pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option { let cache = self.inner.read(); cache .as_ref() - .and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone())) + .filter(|c| c.executed_block_hash() == parent_hash && c.is_available()) + .cloned() } /// Clears the tracked cache @@ -623,7 +629,9 @@ where #[cfg(test)] mod tests { + use super::ExecutionCache; use crate::tree::{ + cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache}, payload_processor::{ evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor, }, @@ -649,6 +657,77 @@ mod tests { use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot}; use std::sync::Arc; + fn make_saved_cache(hash: B256) -> SavedCache { + let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000); + SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed()) + } + + #[test] + fn execution_cache_allows_single_checkout() { + let execution_cache = ExecutionCache::default(); + let hash = B256::from([1u8; 32]); + + execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash))); + + let first = execution_cache.get_cache_for(hash); + assert!(first.is_some(), "expected initial checkout to succeed"); + + let second = execution_cache.get_cache_for(hash); + assert!(second.is_none(), "second checkout should be blocked while guard is active"); + + drop(first); + + let third = execution_cache.get_cache_for(hash); + assert!(third.is_some(), "third checkout should succeed after guard is dropped"); + } + + #[test] + fn execution_cache_checkout_releases_on_drop() { + let execution_cache = ExecutionCache::default(); + let hash = B256::from([2u8; 32]); + + execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash))); + + { + let guard = execution_cache.get_cache_for(hash); + assert!(guard.is_some(), "expected checkout to succeed"); + // Guard dropped at end of scope + } + + let retry = execution_cache.get_cache_for(hash); + assert!(retry.is_some(), "checkout should succeed after guard drop"); + } + + #[test] + fn execution_cache_mismatch_parent_returns_none() { + let execution_cache = ExecutionCache::default(); + let hash = B256::from([3u8; 32]); + + execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash))); + + let miss = execution_cache.get_cache_for(B256::from([4u8; 32])); + assert!(miss.is_none(), "checkout should fail for different parent hash"); + } + + #[test] + fn execution_cache_update_after_release_succeeds() { + let execution_cache = ExecutionCache::default(); + let initial = B256::from([5u8; 32]); + + execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial))); + + let guard = + execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed"); + + drop(guard); + + let updated = B256::from([6u8; 32]); + execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated))); + + let new_checkout = execution_cache.get_cache_for(updated); + assert!(new_checkout.is_some(), "new checkout should succeed after release and update"); + } + fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec { let mut rng = generators::rng(); let all_addresses: Vec
= (0..num_accounts).map(|_| rng.random()).collect(); diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 406876f844..8917d13f06 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -12,9 +12,7 @@ //! 3. When actual block execution happens, it benefits from the warmed cache use crate::tree::{ - cached_state::{ - CachedStateMetrics, CachedStateProvider, ExecutionCache as StateExecutionCache, SavedCache, - }, + cached_state::{CachedStateProvider, SavedCache}, payload_processor::{ executor::WorkloadExecutor, multiproof::MultiProofMessage, ExecutionCache as PayloadExecutionCache, @@ -147,37 +145,43 @@ where /// It should only be called after ensuring that: /// 1. All prewarming tasks have completed execution /// 2. No other concurrent operations are accessing the cache - /// 3. The prewarming phase has finished (typically signaled by `FinishedTxExecution`) /// - /// This method is called from `run()` only after all execution tasks are complete, + /// Saves the warmed caches back into the shared slot after prewarming completes. + /// + /// This consumes the `SavedCache` held by the task, which releases its usage guard and allows + /// the new, warmed cache to be inserted. + /// + /// This method is called from `run()` only after all execution tasks are complete. fn save_cache(self, state: BundleState) { let start = Instant::now(); - // Precompute outside the lock - let hash = self.ctx.env.hash; - let caches = self.ctx.cache.clone(); - let metrics = self.ctx.cache_metrics.clone(); + let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } = + self; + let hash = env.hash; // Perform all cache operations atomically under the lock - self.execution_cache.update_with_guard(|cached| { - let cache = SavedCache::new(hash, caches, metrics); + execution_cache.update_with_guard(|cached| { + + // consumes the `SavedCache` held by the prewarming task, which releases its usage guard + let (caches, cache_metrics) = saved_cache.split(); + let new_cache = SavedCache::new(hash, caches, cache_metrics); // Insert state into cache while holding the lock - if cache.cache().insert_state(&state).is_err() { + if new_cache.cache().insert_state(&state).is_err() { // Clear the cache on error to prevent having a polluted cache *cached = None; debug!(target: "engine::caching", "cleared execution cache on update error"); return; } - cache.update_metrics(); - debug!(target: "engine::caching", parent_hash=?cache.executed_block_hash(), "Updated execution cache"); + new_cache.update_metrics(); + debug!(target: "engine::caching", parent_hash=?new_cache.executed_block_hash(), "Updated execution cache"); // Replace the shared cache with the new one; the previous cache (if any) is dropped. - *cached = Some(cache); + *cached = Some(new_cache); }); - self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64()); + metrics.cache_saving_duration.set(start.elapsed().as_secs_f64()); } /// Executes the task. @@ -246,8 +250,7 @@ where { pub(super) env: ExecutionEnv, pub(super) evm_config: Evm, - pub(super) cache: StateExecutionCache, - pub(super) cache_metrics: CachedStateMetrics, + pub(super) saved_cache: SavedCache, /// Provider to obtain the state pub(super) provider: StateProviderBuilder, pub(super) metrics: PrewarmMetrics, @@ -269,8 +272,7 @@ where let Self { env, evm_config, - cache: caches, - cache_metrics, + saved_cache, provider, metrics, terminate_execution, @@ -291,6 +293,8 @@ where }; // Use the caches to create a new provider with caching + let caches = saved_cache.cache().clone(); + let cache_metrics = saved_cache.metrics().clone(); let state_provider = CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);