mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
perf: start saving cache sooner (#21130)
This commit is contained in:
@@ -668,10 +668,12 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
|
||||
/// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
|
||||
/// bundle state. Using `Arc<ExecutionOutcome>` allows sharing with the main execution
|
||||
/// path without cloning the expensive `BundleState`.
|
||||
///
|
||||
/// Returns a sender for the channel that should be notified on block validation success.
|
||||
pub(super) fn terminate_caching(
|
||||
&mut self,
|
||||
execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
|
||||
) {
|
||||
) -> Option<mpsc::Sender<()>> {
|
||||
self.prewarm_handle.terminate_caching(execution_outcome)
|
||||
}
|
||||
|
||||
@@ -709,13 +711,19 @@ impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
|
||||
///
|
||||
/// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
|
||||
/// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
|
||||
#[must_use = "sender must be used and notified on block validation success"]
|
||||
pub(super) fn terminate_caching(
|
||||
&mut self,
|
||||
execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
|
||||
) {
|
||||
) -> Option<mpsc::Sender<()>> {
|
||||
if let Some(tx) = self.to_prewarm_task.take() {
|
||||
let event = PrewarmTaskEvent::Terminate { execution_outcome };
|
||||
let (valid_block_tx, valid_block_rx) = mpsc::channel();
|
||||
let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
|
||||
let _ = tx.send(event);
|
||||
|
||||
Some(valid_block_tx)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -724,7 +732,10 @@ impl<R> Drop for CacheTaskHandle<R> {
|
||||
fn drop(&mut self) {
|
||||
// Ensure we always terminate on drop - send None without needing Send + Sync bounds
|
||||
if let Some(tx) = self.to_prewarm_task.take() {
|
||||
let _ = tx.send(PrewarmTaskEvent::Terminate { execution_outcome: None });
|
||||
let _ = tx.send(PrewarmTaskEvent::Terminate {
|
||||
execution_outcome: None,
|
||||
valid_block_rx: mpsc::channel().1,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -261,7 +261,11 @@ where
|
||||
///
|
||||
/// This method is called from `run()` only after all execution tasks are complete.
|
||||
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
|
||||
fn save_cache(self, execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>) {
|
||||
fn save_cache(
|
||||
self,
|
||||
execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
|
||||
valid_block_rx: mpsc::Receiver<()>,
|
||||
) {
|
||||
let start = Instant::now();
|
||||
|
||||
let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
|
||||
@@ -288,9 +292,11 @@ where
|
||||
|
||||
new_cache.update_metrics();
|
||||
|
||||
// Replace the shared cache with the new one; the previous cache (if any) is
|
||||
// dropped.
|
||||
*cached = Some(new_cache);
|
||||
if valid_block_rx.recv().is_ok() {
|
||||
// Replace the shared cache with the new one; the previous cache (if any) is
|
||||
// dropped.
|
||||
*cached = Some(new_cache);
|
||||
}
|
||||
});
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
@@ -421,9 +427,10 @@ where
|
||||
// completed executing a set of transactions
|
||||
self.send_multi_proof_targets(proof_targets);
|
||||
}
|
||||
PrewarmTaskEvent::Terminate { execution_outcome } => {
|
||||
PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
|
||||
trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
|
||||
final_execution_outcome = Some(execution_outcome);
|
||||
final_execution_outcome =
|
||||
Some(execution_outcome.map(|outcome| (outcome, valid_block_rx)));
|
||||
|
||||
if finished_execution {
|
||||
// all tasks are done, we can exit, which will save caches and exit
|
||||
@@ -448,8 +455,8 @@ where
|
||||
debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
|
||||
|
||||
// save caches and finish using the shared ExecutionOutcome
|
||||
if let Some(Some(execution_outcome)) = final_execution_outcome {
|
||||
self.save_cache(execution_outcome);
|
||||
if let Some(Some((execution_outcome, valid_block_rx))) = final_execution_outcome {
|
||||
self.save_cache(execution_outcome, valid_block_rx);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -813,6 +820,11 @@ pub(super) enum PrewarmTaskEvent<R> {
|
||||
/// The final execution outcome. Using `Arc` allows sharing with the main execution
|
||||
/// path without cloning the expensive `BundleState`.
|
||||
execution_outcome: Option<Arc<BlockExecutionOutput<R>>>,
|
||||
/// Receiver for the block validation result.
|
||||
///
|
||||
/// Cache saving is racing the state root validation. We optimistically construct the
|
||||
/// updated cache but only save it once we know the block is valid.
|
||||
valid_block_rx: mpsc::Receiver<()>,
|
||||
},
|
||||
/// The outcome of a pre-warm task
|
||||
Outcome {
|
||||
|
||||
@@ -462,6 +462,15 @@ where
|
||||
// After executing the block we can stop prewarming transactions
|
||||
handle.stop_prewarming_execution();
|
||||
|
||||
// Create ExecutionOutcome early so we can terminate caching before validation and state
|
||||
// root computation. Using Arc allows sharing with both the caching task and the deferred
|
||||
// trie task without cloning the expensive BundleState.
|
||||
let output = Arc::new(output);
|
||||
|
||||
// Terminate caching task early since execution is complete and caching is no longer
|
||||
// needed. This frees up resources while state root computation continues.
|
||||
let valid_block_tx = handle.terminate_caching(Some(output.clone()));
|
||||
|
||||
let block = self.convert_to_block(input)?.with_senders(senders);
|
||||
|
||||
let hashed_state = ensure_ok_post_block!(
|
||||
@@ -564,16 +573,13 @@ where
|
||||
.into())
|
||||
}
|
||||
|
||||
// Create ExecutionOutcome and wrap in Arc for sharing with both the caching task
|
||||
// and the deferred trie task. This avoids cloning the expensive BundleState.
|
||||
let execution_outcome = Arc::new(output);
|
||||
|
||||
// Terminate prewarming task with the shared execution outcome
|
||||
handle.terminate_caching(Some(Arc::clone(&execution_outcome)));
|
||||
if let Some(valid_block_tx) = valid_block_tx {
|
||||
let _ = valid_block_tx.send(());
|
||||
}
|
||||
|
||||
Ok(self.spawn_deferred_trie_task(
|
||||
block,
|
||||
execution_outcome,
|
||||
output,
|
||||
&ctx,
|
||||
hashed_state,
|
||||
trie_output,
|
||||
|
||||
Reference in New Issue
Block a user