Compare commits

...

1 Commits

Author SHA1 Message Date
yongkangc
91c4c80da7 perf(engine): release cache lock before waiting for state root validation
Move valid_block_rx.recv() out from under the execution cache write lock
in save_cache(). The original code held the lock across the blocking recv,
which stalled concurrent readers (get_cache_for) for ~100ms+ during state
root validation.

The usage guard (Arc<()> strong_count) still prevents concurrent access
to the shared ExecutionCache while the lock is released: is_available()
returns false as long as the SavedCache is alive, so get_cache_for()
returns None. split() is called inside update_with_guard to release the
guard and publish the new cache atomically under the write lock.
2026-02-20 07:21:52 +00:00
2 changed files with 37 additions and 43 deletions

View File

@@ -1019,16 +1019,10 @@ impl PayloadExecutionCache {
/// Updates the cache with a closure that has exclusive access to the guard.
/// This ensures that all cache operations happen atomically.
///
/// ## CRITICAL SAFETY REQUIREMENT
///
/// **Before calling this method, you MUST ensure there are no other active cache users.**
/// This includes:
/// - No running [`PrewarmCacheTask`] instances that could write to the cache
/// - No concurrent transactions that might access the cached state
/// - All prewarming operations must be completed or cancelled
///
/// Violating this requirement can result in cache corruption, incorrect state data,
/// and potential consensus failures.
/// Callers must not mutate the *underlying* [`ExecutionCache`] data (e.g. via
/// `SavedCache::clear`) while other tasks may hold clones of the same
/// `SavedCache`. Swapping the slot value (`*cached = Some(..)` / `*cached = None`)
/// is always safe because existing clones retain their own `Arc` references.
pub fn update_with_guard<F>(&self, update_fn: F)
where
F: FnOnce(&mut Option<SavedCache>),

View File

@@ -198,15 +198,15 @@ where
});
}
/// This method calls `ExecutionCache::update_with_guard` which requires exclusive access.
/// It should only be called after ensuring that:
/// 1. All prewarming tasks have completed execution
/// 2. No other concurrent operations are accessing the cache
///
/// Saves the warmed caches back into the shared slot after prewarming completes.
///
/// This consumes the `SavedCache` held by the task, which releases its usage guard and allows
/// the new, warmed cache to be inserted.
/// Waits for block validation without any lock held, then only on success inserts
/// state and publishes under a brief write lock. This avoids the ~100ms+ lock hold
/// that previously blocked concurrent readers during `valid_block_rx.recv()`.
///
/// The ordering is critical: `insert_state()` mutates the shared fixed-caches
/// in-place while the usage guard is still held (keeping `is_available() == false`),
/// then `split()` releases the guard and publishes the new cache atomically.
///
/// This method is called from `run()` only after all execution tasks are complete.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
@@ -223,36 +223,36 @@ where
if let Some(saved_cache) = saved_cache {
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
// Perform all cache operations atomically under the lock
execution_cache.update_with_guard(|cached| {
// consumes the `SavedCache` held by the prewarming task, which releases its usage
// guard
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
// Insert state into cache while holding the lock
// Access the BundleState through the shared ExecutionOutcome
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
// Clear the cache on error to prevent having a polluted cache
// Wait for state root validation WITHOUT holding the cache lock.
// This is the key optimization: the original code held the lock across this
// blocking recv(), which blocked the next block's prewarming from accessing
// the cache for ~100ms+.
if valid_block_rx.recv().is_err() {
debug!(target: "engine::caching", parent_hash=?hash, "skipped cache publish on invalid block");
return;
}
// Block is valid — mutate caches while the usage guard is still held
// (keeping is_available() == false) so no concurrent reader can observe
// the cache mid-mutation via get_cache_for().
if saved_cache.cache().insert_state(&execution_outcome.state).is_err() {
execution_cache.update_with_guard(|cached| {
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return;
}
});
debug!(target: "engine::caching", "cleared execution cache on update error");
} else {
saved_cache.update_metrics();
new_cache.update_metrics();
if valid_block_rx.recv().is_ok() {
// Replace the shared cache with the new one; the previous cache (if any) is
// dropped.
// Now consume the SavedCache (releasing the usage guard) and publish
// the new cache under a brief lock.
execution_cache.update_with_guard(|cached| {
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
*cached = Some(new_cache);
} else {
// Block was invalid; caches were already mutated by insert_state above,
// so we must clear to prevent using polluted state
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on invalid block");
}
});
});
}
let elapsed = start.elapsed();
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");