mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-17 02:12:02 -05:00
fix(tree): save caches only after prewarm tasks finish (#16011)
This commit is contained in:
@@ -31,6 +31,7 @@ use reth_trie_parallel::{
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{
|
||||
atomic::AtomicBool,
|
||||
mpsc,
|
||||
mpsc::{channel, Sender},
|
||||
Arc,
|
||||
@@ -251,6 +252,7 @@ where
|
||||
cache_metrics: cache_metrics.clone(),
|
||||
provider: provider_builder,
|
||||
metrics: PrewarmMetrics::default(),
|
||||
terminate_execution: Arc::new(AtomicBool::new(false)),
|
||||
};
|
||||
|
||||
let prewarm_task = PrewarmCacheTask::new(
|
||||
|
||||
@@ -20,7 +20,11 @@ use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmStat
|
||||
use reth_trie::MultiProofTargets;
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::mpsc::{channel, Receiver, Sender},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
Arc,
|
||||
},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::{debug, trace};
|
||||
@@ -46,6 +50,8 @@ pub(super) struct PrewarmCacheTask<N: NodePrimitives, P, Evm> {
|
||||
actions_rx: Receiver<PrewarmTaskEvent>,
|
||||
/// Sender the transactions use to send their result back
|
||||
actions_tx: Sender<PrewarmTaskEvent>,
|
||||
/// Total prewarming tasks spawned
|
||||
prewarm_outcomes_left: usize,
|
||||
}
|
||||
|
||||
impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
|
||||
@@ -72,6 +78,7 @@ where
|
||||
to_multi_proof,
|
||||
actions_rx,
|
||||
actions_tx,
|
||||
prewarm_outcomes_left: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,6 +96,7 @@ where
|
||||
let ctx = self.ctx.clone();
|
||||
let pending_chunk = chunk.collect::<Vec<_>>();
|
||||
|
||||
self.prewarm_outcomes_left += pending_chunk.len();
|
||||
self.executor.spawn_blocking(move || {
|
||||
ctx.transact_batch(&pending_chunk, sender);
|
||||
});
|
||||
@@ -103,7 +111,7 @@ where
|
||||
}
|
||||
|
||||
/// Save the state to the shared cache for the given block.
|
||||
fn save_cache(&self, state: BundleState) {
|
||||
fn save_cache(self, state: BundleState) {
|
||||
let start = Instant::now();
|
||||
let cache = SavedCache::new(
|
||||
self.ctx.header.hash(),
|
||||
@@ -123,6 +131,15 @@ where
|
||||
self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
|
||||
}
|
||||
|
||||
/// Removes the `actions_tx` currently stored in the struct, replacing it with a new one that
|
||||
/// does not point to any active receiver.
|
||||
///
|
||||
/// This is used to drop the `actions_tx` after all tasks have been spawned, and should not be
|
||||
/// used in any context other than the `run` method.
|
||||
fn drop_actions_tx(&mut self) {
|
||||
self.actions_tx = channel().0;
|
||||
}
|
||||
|
||||
/// Executes the task.
|
||||
///
|
||||
/// This will execute the transactions until all transactions have been processed or the task
|
||||
@@ -134,26 +151,45 @@ where
|
||||
// spawn execution tasks.
|
||||
self.spawn_all();
|
||||
|
||||
// drop the actions sender after we've spawned all execution tasks. This is so that the
|
||||
// following loop can terminate even if one of the prewarm tasks ends in an error (i.e.,
|
||||
// does not return an Outcome) or panics.
|
||||
self.drop_actions_tx();
|
||||
|
||||
let mut final_block_output = None;
|
||||
while let Ok(event) = self.actions_rx.recv() {
|
||||
match event {
|
||||
PrewarmTaskEvent::TerminateTransactionExecution => {
|
||||
// stop tx processing
|
||||
self.pending.clear();
|
||||
self.ctx.terminate_execution.store(true, Ordering::Relaxed);
|
||||
}
|
||||
PrewarmTaskEvent::Outcome { proof_targets } => {
|
||||
// completed a transaction, frees up one slot
|
||||
// completed executing a set of transactions
|
||||
self.send_multi_proof_targets(proof_targets);
|
||||
|
||||
// decrement the number of tasks left
|
||||
self.prewarm_outcomes_left -= 1;
|
||||
|
||||
if self.prewarm_outcomes_left == 0 && final_block_output.is_some() {
|
||||
// all tasks are done, and we have the block output, we can exit
|
||||
break
|
||||
}
|
||||
}
|
||||
PrewarmTaskEvent::Terminate { block_output } => {
|
||||
// terminate the task
|
||||
if let Some(state) = block_output {
|
||||
self.save_cache(state);
|
||||
}
|
||||
final_block_output = Some(block_output);
|
||||
|
||||
break
|
||||
if self.prewarm_outcomes_left == 0 {
|
||||
// all tasks are done, we can exit, which will save caches and exit
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// save caches and finish
|
||||
if let Some(Some(state)) = final_block_output {
|
||||
self.save_cache(state);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -167,6 +203,8 @@ pub(super) struct PrewarmContext<N: NodePrimitives, P, Evm> {
|
||||
/// Provider to obtain the state
|
||||
pub(super) provider: StateProviderBuilder<N, P>,
|
||||
pub(super) metrics: PrewarmMetrics,
|
||||
/// An atomic bool that tells prewarm tasks to not start any more execution.
|
||||
pub(super) terminate_execution: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl<N, P, Evm> PrewarmContext<N, P, Evm>
|
||||
@@ -175,9 +213,20 @@ where
|
||||
P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
|
||||
Evm: ConfigureEvm<Primitives = N> + 'static,
|
||||
{
|
||||
/// Splits this context into an evm, an evm config, and metrics.
|
||||
fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, Evm, PrewarmMetrics)> {
|
||||
let Self { header, evm_config, cache: caches, cache_metrics, provider, metrics } = self;
|
||||
/// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating
|
||||
/// execution.
|
||||
fn evm_for_ctx(
|
||||
self,
|
||||
) -> Option<(EvmFor<Evm, impl Database>, Evm, PrewarmMetrics, Arc<AtomicBool>)> {
|
||||
let Self {
|
||||
header,
|
||||
evm_config,
|
||||
cache: caches,
|
||||
cache_metrics,
|
||||
provider,
|
||||
metrics,
|
||||
terminate_execution,
|
||||
} = self;
|
||||
|
||||
let state_provider = match provider.build() {
|
||||
Ok(provider) => provider,
|
||||
@@ -206,7 +255,7 @@ where
|
||||
// create a new executor and disable nonce checks in the env
|
||||
let evm = evm_config.evm_with_env(state_provider, evm_env);
|
||||
|
||||
Some((evm, evm_config, metrics))
|
||||
Some((evm, evm_config, metrics, terminate_execution))
|
||||
}
|
||||
|
||||
/// Transacts the vec of transactions and returns the state outcome.
|
||||
@@ -217,9 +266,18 @@ where
|
||||
/// Note: Since here are no ordering guarantees this won't the state the txs produce when
|
||||
/// executed sequentially.
|
||||
fn transact_batch(self, txs: &[Recovered<N::SignedTx>], sender: Sender<PrewarmTaskEvent>) {
|
||||
let Some((mut evm, evm_config, metrics)) = self.evm_for_ctx() else { return };
|
||||
let Some((mut evm, evm_config, metrics, terminate_execution)) = self.evm_for_ctx() else {
|
||||
return
|
||||
};
|
||||
|
||||
for tx in txs {
|
||||
// If the task was cancelled, stop execution, send an empty result to notify the task,
|
||||
// and exit.
|
||||
if terminate_execution.load(Ordering::Relaxed) {
|
||||
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
|
||||
return
|
||||
}
|
||||
|
||||
// create the tx env
|
||||
let tx_env = evm_config.tx_env(tx);
|
||||
let start = Instant::now();
|
||||
|
||||
Reference in New Issue
Block a user