mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
perf(engine): send proofs directly from prewarming to multiproof task (#21901)
This commit is contained in:
@@ -138,6 +138,7 @@ where
|
||||
&self,
|
||||
pending: mpsc::Receiver<Tx>,
|
||||
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
) where
|
||||
Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
|
||||
{
|
||||
@@ -162,7 +163,7 @@ where
|
||||
};
|
||||
|
||||
// Spawn workers
|
||||
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());
|
||||
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof, done_tx.clone());
|
||||
|
||||
// Distribute transactions to workers
|
||||
let mut tx_index = 0usize;
|
||||
@@ -196,24 +197,6 @@ where
|
||||
});
|
||||
}
|
||||
|
||||
/// Returns true if prewarming was terminated and no more transactions should be prewarmed.
|
||||
fn is_execution_terminated(&self) -> bool {
|
||||
self.ctx.terminate_execution.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// If configured and the tx returned proof targets, emit the targets the transaction produced
|
||||
fn send_multi_proof_targets(&self, targets: Option<VersionedMultiProofTargets>) {
|
||||
if self.is_execution_terminated() {
|
||||
// if execution is already terminated then we dont need to send more proof fetch
|
||||
// messages
|
||||
return
|
||||
}
|
||||
|
||||
if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
|
||||
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
|
||||
}
|
||||
}
|
||||
|
||||
/// This method calls `ExecutionCache::update_with_guard` which requires exclusive access.
|
||||
/// It should only be called after ensuring that:
|
||||
/// 1. All prewarming tasks have completed execution
|
||||
@@ -375,7 +358,7 @@ where
|
||||
// Spawn execution tasks based on mode
|
||||
match mode {
|
||||
PrewarmMode::Transactions(pending) => {
|
||||
self.spawn_all(pending, actions_tx);
|
||||
self.spawn_all(pending, actions_tx, self.to_multi_proof.clone());
|
||||
}
|
||||
PrewarmMode::BlockAccessList(bal) => {
|
||||
self.run_bal_prewarm(bal, actions_tx);
|
||||
@@ -391,10 +374,6 @@ where
|
||||
debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
|
||||
self.ctx.terminate_execution.store(true, Ordering::Relaxed);
|
||||
}
|
||||
PrewarmTaskEvent::Outcome { proof_targets } => {
|
||||
// completed executing a set of transactions
|
||||
self.send_multi_proof_targets(proof_targets);
|
||||
}
|
||||
PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
|
||||
trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
|
||||
final_execution_outcome =
|
||||
@@ -531,7 +510,8 @@ where
|
||||
}
|
||||
|
||||
/// Accepts a [`CrossbeamReceiver`] of transactions and a handle to prewarm task. Executes
|
||||
/// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction.
|
||||
/// transactions and streams [`MultiProofMessage::PrefetchProofs`] messages for each
|
||||
/// transaction.
|
||||
///
|
||||
/// This function processes transactions sequentially from the receiver and emits outcome events
|
||||
/// via the provided sender. Execution errors are logged and tracked but do not stop the batch
|
||||
@@ -543,7 +523,7 @@ where
|
||||
fn transact_batch<Tx>(
|
||||
self,
|
||||
txs: CrossbeamReceiver<IndexedTransaction<Tx>>,
|
||||
sender: Sender<PrewarmTaskEvent<N::Receipt>>,
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
done_tx: Sender<()>,
|
||||
) where
|
||||
Tx: ExecutableTxFor<Evm>,
|
||||
@@ -571,10 +551,8 @@ where
|
||||
// create the tx env
|
||||
let start = Instant::now();
|
||||
|
||||
// If the task was cancelled, stop execution, send an empty result to notify the task,
|
||||
// and exit.
|
||||
// If the task was cancelled, stop execution, and exit.
|
||||
if terminate_execution.load(Ordering::Relaxed) {
|
||||
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
|
||||
break
|
||||
}
|
||||
|
||||
@@ -603,10 +581,8 @@ where
|
||||
|
||||
drop(enter);
|
||||
|
||||
// If the task was cancelled, stop execution, send an empty result to notify the task,
|
||||
// and exit.
|
||||
// If the task was cancelled, stop execution, and exit.
|
||||
if terminate_execution.load(Ordering::Relaxed) {
|
||||
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
|
||||
break
|
||||
}
|
||||
|
||||
@@ -619,7 +595,9 @@ where
|
||||
let (targets, storage_targets) =
|
||||
multiproof_targets_from_state(res.state, v2_proofs_enabled);
|
||||
metrics.prefetch_storage_targets.record(storage_targets as f64);
|
||||
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
|
||||
if let Some(to_multi_proof) = &to_multi_proof {
|
||||
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
|
||||
}
|
||||
drop(_enter);
|
||||
}
|
||||
|
||||
@@ -637,7 +615,7 @@ where
|
||||
self,
|
||||
workers_needed: usize,
|
||||
task_executor: &WorkloadExecutor,
|
||||
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
done_tx: Sender<()>,
|
||||
) -> CrossbeamSender<IndexedTransaction<Tx>>
|
||||
where
|
||||
@@ -652,13 +630,13 @@ where
|
||||
let _enter = span.entered();
|
||||
for idx in 0..workers_needed {
|
||||
let ctx = self.clone();
|
||||
let actions_tx = actions_tx.clone();
|
||||
let to_multi_proof = to_multi_proof.clone();
|
||||
let done_tx = done_tx.clone();
|
||||
let rx = tx_receiver.clone();
|
||||
let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
|
||||
executor.spawn_blocking(move || {
|
||||
let _enter = span.entered();
|
||||
ctx.transact_batch(rx, actions_tx, done_tx);
|
||||
ctx.transact_batch(rx, to_multi_proof, done_tx);
|
||||
});
|
||||
}
|
||||
});
|
||||
@@ -869,11 +847,6 @@ pub enum PrewarmTaskEvent<R> {
|
||||
/// updated cache but only save it once we know the block is valid.
|
||||
valid_block_rx: mpsc::Receiver<()>,
|
||||
},
|
||||
/// The outcome of a pre-warm task
|
||||
Outcome {
|
||||
/// The prepared proof targets based on the evm state outcome
|
||||
proof_targets: Option<VersionedMultiProofTargets>,
|
||||
},
|
||||
/// Finished executing all transactions
|
||||
FinishedTxExecution {
|
||||
/// Number of transactions executed
|
||||
|
||||
Reference in New Issue
Block a user