mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-09 07:17:56 -05:00
fix(cache): Ensure execution cache remains locked until updated (#18564)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
This commit is contained in:
@@ -15,7 +15,7 @@ use reth_trie::{
|
||||
MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
|
||||
};
|
||||
use revm_primitives::map::DefaultHashBuilder;
|
||||
use std::time::Duration;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tracing::trace;
|
||||
|
||||
pub(crate) type Cache<K, V> =
|
||||
@@ -520,16 +520,16 @@ pub(crate) struct SavedCache {
|
||||
|
||||
/// Metrics for the cached state provider
|
||||
metrics: CachedStateMetrics,
|
||||
|
||||
/// A guard to track in-flight usage of this cache.
|
||||
/// The cache is considered available if the strong count is 1.
|
||||
usage_guard: Arc<()>,
|
||||
}
|
||||
|
||||
impl SavedCache {
|
||||
/// Creates a new instance with the internals
|
||||
pub(super) const fn new(
|
||||
hash: B256,
|
||||
caches: ExecutionCache,
|
||||
metrics: CachedStateMetrics,
|
||||
) -> Self {
|
||||
Self { hash, caches, metrics }
|
||||
pub(super) fn new(hash: B256, caches: ExecutionCache, metrics: CachedStateMetrics) -> Self {
|
||||
Self { hash, caches, metrics, usage_guard: Arc::new(()) }
|
||||
}
|
||||
|
||||
/// Returns the hash for this cache
|
||||
@@ -542,11 +542,21 @@ impl SavedCache {
|
||||
(self.caches, self.metrics)
|
||||
}
|
||||
|
||||
/// Returns true if the cache is available for use (no other tasks are currently using it).
|
||||
pub(crate) fn is_available(&self) -> bool {
|
||||
Arc::strong_count(&self.usage_guard) == 1
|
||||
}
|
||||
|
||||
/// Returns the [`ExecutionCache`] belonging to the tracked hash.
|
||||
pub(crate) const fn cache(&self) -> &ExecutionCache {
|
||||
&self.caches
|
||||
}
|
||||
|
||||
/// Returns the metrics associated with this cache.
|
||||
pub(crate) const fn metrics(&self) -> &CachedStateMetrics {
|
||||
&self.metrics
|
||||
}
|
||||
|
||||
/// Updates the metrics for the [`ExecutionCache`].
|
||||
pub(crate) fn update_metrics(&self) {
|
||||
self.metrics.storage_cache_size.set(self.caches.total_storage_slots() as f64);
|
||||
@@ -555,6 +565,13 @@ impl SavedCache {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
impl SavedCache {
    /// Test-only helper that hands out an extra handle to the usage guard.
    ///
    /// While the returned handle is alive the guard's strong count is above one, so
    /// `is_available` reports `false`.
    fn clone_guard_for_test(&self) -> Arc<()> {
        Arc::clone(&self.usage_guard)
    }
}
|
||||
|
||||
/// Cache for an individual account's storage slots.
|
||||
///
|
||||
/// This represents the second level of the hierarchical storage cache.
|
||||
@@ -796,4 +813,45 @@ mod tests {
|
||||
let slot_status = caches.get_storage(&address, &storage_key);
|
||||
assert_eq!(slot_status, SlotStatus::Empty);
|
||||
}
|
||||
|
||||
// Tests for SavedCache locking mechanism
|
||||
#[test]
|
||||
fn test_saved_cache_is_available() {
|
||||
let execution_cache = ExecutionCacheBuilder::default().build_caches(1000);
|
||||
let cache = SavedCache::new(B256::ZERO, execution_cache, CachedStateMetrics::zeroed());
|
||||
|
||||
// Initially, the cache should be available (only one reference)
|
||||
assert!(cache.is_available(), "Cache should be available initially");
|
||||
|
||||
// Clone the usage guard (simulating it being handed out)
|
||||
let _guard = cache.clone_guard_for_test();
|
||||
|
||||
// Now the cache should not be available (two references)
|
||||
assert!(!cache.is_available(), "Cache should not be available with active guard");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_saved_cache_multiple_references() {
|
||||
let execution_cache = ExecutionCacheBuilder::default().build_caches(1000);
|
||||
let cache =
|
||||
SavedCache::new(B256::from([2u8; 32]), execution_cache, CachedStateMetrics::zeroed());
|
||||
|
||||
// Create multiple references to the usage guard
|
||||
let guard1 = cache.clone_guard_for_test();
|
||||
let guard2 = cache.clone_guard_for_test();
|
||||
let guard3 = guard1.clone();
|
||||
|
||||
// Cache should not be available with multiple guards
|
||||
assert!(!cache.is_available());
|
||||
|
||||
// Drop guards one by one
|
||||
drop(guard1);
|
||||
assert!(!cache.is_available()); // Still not available
|
||||
|
||||
drop(guard2);
|
||||
assert!(!cache.is_available()); // Still not available
|
||||
|
||||
drop(guard3);
|
||||
assert!(cache.is_available()); // Now available
|
||||
}
|
||||
}
|
||||
|
||||
@@ -314,13 +314,14 @@ where
|
||||
transactions = mpsc::channel().1;
|
||||
}
|
||||
|
||||
let (cache, cache_metrics) = self.cache_for(env.parent_hash).split();
|
||||
let saved_cache = self.cache_for(env.parent_hash);
|
||||
let cache = saved_cache.cache().clone();
|
||||
let cache_metrics = saved_cache.metrics().clone();
|
||||
// configure prewarming
|
||||
let prewarm_ctx = PrewarmContext {
|
||||
env,
|
||||
evm_config: self.evm_config.clone(),
|
||||
cache: cache.clone(),
|
||||
cache_metrics: cache_metrics.clone(),
|
||||
saved_cache,
|
||||
provider: provider_builder,
|
||||
metrics: PrewarmMetrics::default(),
|
||||
terminate_execution: Arc::new(AtomicBool::new(false)),
|
||||
@@ -357,15 +358,14 @@ where
|
||||
/// instance.
|
||||
#[instrument(target = "engine::caching", skip(self))]
|
||||
fn cache_for(&self, parent_hash: B256) -> SavedCache {
|
||||
self.execution_cache
|
||||
.get_cache_for(parent_hash)
|
||||
.inspect(|_| debug!("reusing execution cache"))
|
||||
.unwrap_or_else(|| {
|
||||
debug!("creating new execution cache on cache miss");
|
||||
let cache =
|
||||
ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
|
||||
SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
|
||||
})
|
||||
if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
|
||||
debug!("reusing execution cache");
|
||||
cache
|
||||
} else {
|
||||
debug!("creating new execution cache on cache miss");
|
||||
let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
|
||||
SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns the [`SparseTrieTask`] for this payload processor.
|
||||
@@ -465,6 +465,7 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
|
||||
self.prewarm_handle.cache.clone()
|
||||
}
|
||||
|
||||
/// Returns a clone of the cache metrics used by prewarming
|
||||
pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
|
||||
self.prewarm_handle.cache_metrics.clone()
|
||||
}
|
||||
@@ -561,12 +562,17 @@ struct ExecutionCache {
|
||||
}
|
||||
|
||||
impl ExecutionCache {
|
||||
/// Returns the cache if the currently store cache is for the given `parent_hash`
|
||||
/// Returns the cache for `parent_hash` if it's available for use.
|
||||
///
|
||||
/// A cache is considered available when:
|
||||
/// - It exists and matches the requested parent hash
|
||||
/// - No other tasks are currently using it (checked via Arc reference count)
|
||||
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
|
||||
let cache = self.inner.read();
|
||||
cache
|
||||
.as_ref()
|
||||
.and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone()))
|
||||
.filter(|c| c.executed_block_hash() == parent_hash && c.is_available())
|
||||
.cloned()
|
||||
}
|
||||
|
||||
/// Clears the tracked cache
|
||||
@@ -623,7 +629,9 @@ where
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::ExecutionCache;
|
||||
use crate::tree::{
|
||||
cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache},
|
||||
payload_processor::{
|
||||
evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
|
||||
},
|
||||
@@ -649,6 +657,77 @@ mod tests {
|
||||
use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
|
||||
use std::sync::Arc;
|
||||
|
||||
fn make_saved_cache(hash: B256) -> SavedCache {
|
||||
let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000);
|
||||
SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn execution_cache_allows_single_checkout() {
|
||||
let execution_cache = ExecutionCache::default();
|
||||
let hash = B256::from([1u8; 32]);
|
||||
|
||||
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
|
||||
|
||||
let first = execution_cache.get_cache_for(hash);
|
||||
assert!(first.is_some(), "expected initial checkout to succeed");
|
||||
|
||||
let second = execution_cache.get_cache_for(hash);
|
||||
assert!(second.is_none(), "second checkout should be blocked while guard is active");
|
||||
|
||||
drop(first);
|
||||
|
||||
let third = execution_cache.get_cache_for(hash);
|
||||
assert!(third.is_some(), "third checkout should succeed after guard is dropped");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn execution_cache_checkout_releases_on_drop() {
|
||||
let execution_cache = ExecutionCache::default();
|
||||
let hash = B256::from([2u8; 32]);
|
||||
|
||||
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
|
||||
|
||||
{
|
||||
let guard = execution_cache.get_cache_for(hash);
|
||||
assert!(guard.is_some(), "expected checkout to succeed");
|
||||
// Guard dropped at end of scope
|
||||
}
|
||||
|
||||
let retry = execution_cache.get_cache_for(hash);
|
||||
assert!(retry.is_some(), "checkout should succeed after guard drop");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn execution_cache_mismatch_parent_returns_none() {
|
||||
let execution_cache = ExecutionCache::default();
|
||||
let hash = B256::from([3u8; 32]);
|
||||
|
||||
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
|
||||
|
||||
let miss = execution_cache.get_cache_for(B256::from([4u8; 32]));
|
||||
assert!(miss.is_none(), "checkout should fail for different parent hash");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn execution_cache_update_after_release_succeeds() {
|
||||
let execution_cache = ExecutionCache::default();
|
||||
let initial = B256::from([5u8; 32]);
|
||||
|
||||
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));
|
||||
|
||||
let guard =
|
||||
execution_cache.get_cache_for(initial).expect("expected initial checkout to succeed");
|
||||
|
||||
drop(guard);
|
||||
|
||||
let updated = B256::from([6u8; 32]);
|
||||
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(updated)));
|
||||
|
||||
let new_checkout = execution_cache.get_cache_for(updated);
|
||||
assert!(new_checkout.is_some(), "new checkout should succeed after release and update");
|
||||
}
|
||||
|
||||
fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
|
||||
let mut rng = generators::rng();
|
||||
let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
|
||||
|
||||
@@ -12,9 +12,7 @@
|
||||
//! 3. When actual block execution happens, it benefits from the warmed cache
|
||||
|
||||
use crate::tree::{
|
||||
cached_state::{
|
||||
CachedStateMetrics, CachedStateProvider, ExecutionCache as StateExecutionCache, SavedCache,
|
||||
},
|
||||
cached_state::{CachedStateProvider, SavedCache},
|
||||
payload_processor::{
|
||||
executor::WorkloadExecutor, multiproof::MultiProofMessage,
|
||||
ExecutionCache as PayloadExecutionCache,
|
||||
@@ -147,37 +145,43 @@ where
|
||||
/// It should only be called after ensuring that:
|
||||
/// 1. All prewarming tasks have completed execution
|
||||
/// 2. No other concurrent operations are accessing the cache
|
||||
/// 3. The prewarming phase has finished (typically signaled by `FinishedTxExecution`)
|
||||
///
|
||||
/// This method is called from `run()` only after all execution tasks are complete,
|
||||
/// Saves the warmed caches back into the shared slot after prewarming completes.
|
||||
///
|
||||
/// This consumes the `SavedCache` held by the task, which releases its usage guard and allows
|
||||
/// the new, warmed cache to be inserted.
|
||||
///
|
||||
/// This method is called from `run()` only after all execution tasks are complete.
|
||||
fn save_cache(self, state: BundleState) {
|
||||
let start = Instant::now();
|
||||
|
||||
// Precompute outside the lock
|
||||
let hash = self.ctx.env.hash;
|
||||
let caches = self.ctx.cache.clone();
|
||||
let metrics = self.ctx.cache_metrics.clone();
|
||||
let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
|
||||
self;
|
||||
let hash = env.hash;
|
||||
|
||||
// Perform all cache operations atomically under the lock
|
||||
self.execution_cache.update_with_guard(|cached| {
|
||||
let cache = SavedCache::new(hash, caches, metrics);
|
||||
execution_cache.update_with_guard(|cached| {
|
||||
|
||||
// consumes the `SavedCache` held by the prewarming task, which releases its usage guard
|
||||
let (caches, cache_metrics) = saved_cache.split();
|
||||
let new_cache = SavedCache::new(hash, caches, cache_metrics);
|
||||
|
||||
// Insert state into cache while holding the lock
|
||||
if cache.cache().insert_state(&state).is_err() {
|
||||
if new_cache.cache().insert_state(&state).is_err() {
|
||||
// Clear the cache on error to prevent having a polluted cache
|
||||
*cached = None;
|
||||
debug!(target: "engine::caching", "cleared execution cache on update error");
|
||||
return;
|
||||
}
|
||||
|
||||
cache.update_metrics();
|
||||
debug!(target: "engine::caching", parent_hash=?cache.executed_block_hash(), "Updated execution cache");
|
||||
new_cache.update_metrics();
|
||||
debug!(target: "engine::caching", parent_hash=?new_cache.executed_block_hash(), "Updated execution cache");
|
||||
|
||||
// Replace the shared cache with the new one; the previous cache (if any) is dropped.
|
||||
*cached = Some(cache);
|
||||
*cached = Some(new_cache);
|
||||
});
|
||||
|
||||
self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
|
||||
metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
|
||||
}
|
||||
|
||||
/// Executes the task.
|
||||
@@ -246,8 +250,7 @@ where
|
||||
{
|
||||
pub(super) env: ExecutionEnv<Evm>,
|
||||
pub(super) evm_config: Evm,
|
||||
pub(super) cache: StateExecutionCache,
|
||||
pub(super) cache_metrics: CachedStateMetrics,
|
||||
pub(super) saved_cache: SavedCache,
|
||||
/// Provider to obtain the state
|
||||
pub(super) provider: StateProviderBuilder<N, P>,
|
||||
pub(super) metrics: PrewarmMetrics,
|
||||
@@ -269,8 +272,7 @@ where
|
||||
let Self {
|
||||
env,
|
||||
evm_config,
|
||||
cache: caches,
|
||||
cache_metrics,
|
||||
saved_cache,
|
||||
provider,
|
||||
metrics,
|
||||
terminate_execution,
|
||||
@@ -291,6 +293,8 @@ where
|
||||
};
|
||||
|
||||
// Use the caches to create a new provider with caching
|
||||
let caches = saved_cache.cache().clone();
|
||||
let cache_metrics = saved_cache.metrics().clone();
|
||||
let state_provider =
|
||||
CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user