Compare commits

...

2 Commits

Author SHA1 Message Date
yongkangc
182cbfc06d perf: reduce update_leaves key cloning
Amp-Thread-ID: https://ampcode.com/threads/T-019c6516-69ab-714b-bea2-a2b6b6e06709
2026-02-16 06:36:38 +00:00
yongkangc
2740793e42 perf(engine): reduce save_cache write lock hold time
Move insert_state, update_metrics, and valid_block_rx.recv() outside
the PayloadExecutionCache write lock. A short take() under the lock
first detaches the published cache so readers see None during the
update window, preventing observation of partially-mutated Arc-shared
ExecutionCache state.

Amp-Thread-ID: https://ampcode.com/threads/T-019c42d4-fff9-727c-8256-eb3684deaba4
2026-02-16 02:48:53 +00:00
2 changed files with 27 additions and 31 deletions

View File

@@ -236,36 +236,33 @@ where
if let Some(saved_cache) = saved_cache {
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
// Perform all cache operations atomically under the lock
// Detach the published cache so readers see None during the update.
// This is necessary because ExecutionCache is Arc-shared: mutating
// it via insert_state would be visible through the old SavedCache.
execution_cache.update_with_guard(|cached| {
// consumes the `SavedCache` held by the prewarming task, which releases its usage
// guard
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
cached.take();
});
// Insert state into cache while holding the lock
// Access the BundleState through the shared ExecutionOutcome
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
// Clear the cache on error to prevent having a polluted cache
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return;
}
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
debug!(target: "engine::caching", "cleared execution cache on update error");
} else {
new_cache.update_metrics();
if valid_block_rx.recv().is_ok() {
// Replace the shared cache with the new one; the previous cache (if any) is
// dropped.
*cached = Some(new_cache);
} else {
// Block was invalid; caches were already mutated by insert_state above,
// so we must clear to prevent using polluted state
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on invalid block");
}
});
let valid = valid_block_rx.recv().is_ok();
execution_cache.update_with_guard(|cached| {
if valid {
*cached = Some(new_cache);
} else {
debug!(target: "engine::caching", "cleared execution cache on invalid block");
}
});
}
let elapsed = start.elapsed();
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");

View File

@@ -1320,14 +1320,13 @@ impl SparseTrie for ParallelSparseTrie {
) -> SparseTrieResult<()> {
use crate::{provider::NoRevealProvider, LeafUpdate};
// Collect keys upfront since we mutate `updates` during iteration.
// On success, entries are removed; on blinded node failure, they're re-inserted.
let keys: Vec<B256> = updates.keys().copied().collect();
// Drain updates so we avoid cloning keys while mutating the map.
// On success, entries remain removed; on blinded node failure, they're re-inserted.
let drained = std::mem::take(updates);
updates.reserve(drained.len());
for key in keys {
for (key, update) in drained {
let full_path = Nibbles::unpack(key);
// Remove upfront - we'll re-insert if the operation fails due to blinded node.
let update = updates.remove(&key).unwrap();
match update {
LeafUpdate::Changed(value) => {