mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
perf: Remove unnecessary single-target storage proofs (#22539)
Co-authored-by: Amp <amp@ampcode.com> Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
This commit is contained in:
@@ -677,7 +677,6 @@ where
|
||||
targets: proof_targets,
|
||||
proof_result_sender: ProofResultContext::new(
|
||||
self.proof_result_tx.clone(),
|
||||
0,
|
||||
HashedPostState::default(),
|
||||
Instant::now(),
|
||||
),
|
||||
|
||||
@@ -16,8 +16,7 @@
|
||||
//! The job carries a `ProofResultContext` so the worker knows how to send the result back.
|
||||
//! 2. A worker receives the job, runs the proof, and sends a `ProofResultMessage` through the
|
||||
//! provided `ProofResultSender`.
|
||||
//! 3. The multiproof task receives the message, uses `sequence_number` to keep proofs in order, and
|
||||
//! proceeds with its state-root logic.
|
||||
//! 3. The multiproof task receives the message and proceeds with its state-root logic.
|
||||
//!
|
||||
//! Each job gets its own direct channel so results go straight back to the multiproof task. That
|
||||
//! keeps ordering decisions in one place and lets workers run independently.
|
||||
@@ -334,15 +333,10 @@ impl ProofWorkerHandle {
|
||||
ProviderError::other(std::io::Error::other("account workers unavailable"));
|
||||
|
||||
if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
|
||||
let ProofResultContext {
|
||||
sender: result_tx,
|
||||
sequence_number: seq,
|
||||
state,
|
||||
start_time: start,
|
||||
} = input.into_proof_result_sender();
|
||||
let ProofResultContext { sender: result_tx, state, start_time: start } =
|
||||
input.into_proof_result_sender();
|
||||
|
||||
let _ = result_tx.send(ProofResultMessage {
|
||||
sequence_number: seq,
|
||||
result: Err(ParallelStateRootError::Provider(error.clone())),
|
||||
elapsed: start.elapsed(),
|
||||
state,
|
||||
@@ -535,8 +529,6 @@ pub type ProofResultSender = CrossbeamSender<ProofResultMessage>;
|
||||
/// This type enables workers to send proof results directly to the `MultiProofTask` event loop.
|
||||
#[derive(Debug)]
|
||||
pub struct ProofResultMessage {
|
||||
/// Sequence number for ordering proofs
|
||||
pub sequence_number: u64,
|
||||
/// The proof calculation result
|
||||
pub result: Result<DecodedMultiProofV2, ParallelStateRootError>,
|
||||
/// Time taken for the entire proof calculation (from dispatch to completion)
|
||||
@@ -553,8 +545,6 @@ pub struct ProofResultMessage {
|
||||
pub struct ProofResultContext {
|
||||
/// Channel sender for result delivery
|
||||
pub sender: ProofResultSender,
|
||||
/// Sequence number for proof ordering
|
||||
pub sequence_number: u64,
|
||||
/// Original state update that triggered this proof
|
||||
pub state: HashedPostState,
|
||||
/// Calculation start time for measuring elapsed duration
|
||||
@@ -565,11 +555,10 @@ impl ProofResultContext {
|
||||
/// Creates a new proof result context.
|
||||
pub const fn new(
|
||||
sender: ProofResultSender,
|
||||
sequence_number: u64,
|
||||
state: HashedPostState,
|
||||
start_time: Instant,
|
||||
) -> Self {
|
||||
Self { sender, sequence_number, state, start_time }
|
||||
Self { sender, state, start_time }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1108,27 +1097,15 @@ where
|
||||
Err(e) => (Err(e), ValueEncoderStats::default()),
|
||||
};
|
||||
|
||||
let ProofResultContext {
|
||||
sender: result_tx,
|
||||
sequence_number: seq,
|
||||
state,
|
||||
start_time: start,
|
||||
} = proof_result_sender;
|
||||
let ProofResultContext { sender: result_tx, state, start_time: start } =
|
||||
proof_result_sender;
|
||||
|
||||
let proof_elapsed = proof_start.elapsed();
|
||||
let total_elapsed = start.elapsed();
|
||||
*account_proofs_processed += 1;
|
||||
|
||||
// Send result to MultiProofTask
|
||||
if result_tx
|
||||
.send(ProofResultMessage {
|
||||
sequence_number: seq,
|
||||
result,
|
||||
elapsed: total_elapsed,
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
if result_tx.send(ProofResultMessage { result, elapsed: total_elapsed, state }).is_err() {
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id=self.worker_id,
|
||||
@@ -1219,7 +1196,7 @@ where
|
||||
/// Propagates errors up if queuing fails. Receivers must be consumed by the caller.
|
||||
fn dispatch_v2_storage_proofs(
|
||||
storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
|
||||
account_targets: &Vec<proof_v2::Target>,
|
||||
account_targets: &[proof_v2::Target],
|
||||
mut storage_targets: B256Map<Vec<proof_v2::Target>>,
|
||||
) -> Result<B256Map<CrossbeamReceiver<StorageProofResultMessage>>, ParallelStateRootError> {
|
||||
let mut storage_proof_receivers =
|
||||
@@ -1261,28 +1238,6 @@ fn dispatch_v2_storage_proofs(
|
||||
storage_proof_receivers.insert(hashed_address, result_rx);
|
||||
}
|
||||
|
||||
// If there are any targeted accounts which did not have storage targets then we generate a
|
||||
// single proof target for them so that we get their root.
|
||||
for target in account_targets {
|
||||
let hashed_address = target.key();
|
||||
if storage_proof_receivers.contains_key(&hashed_address) {
|
||||
continue
|
||||
}
|
||||
|
||||
let (result_tx, result_rx) = crossbeam_channel::unbounded();
|
||||
let input = StorageProofInput::new(hashed_address, vec![proof_v2::Target::new(B256::ZERO)]);
|
||||
|
||||
storage_work_tx
|
||||
.send(StorageWorkerJob::StorageProof { input, proof_result_sender: result_tx })
|
||||
.map_err(|_| {
|
||||
ParallelStateRootError::Other(format!(
|
||||
"Failed to queue storage proof for {hashed_address:?}: storage worker pool unavailable",
|
||||
))
|
||||
})?;
|
||||
|
||||
storage_proof_receivers.insert(hashed_address, result_rx);
|
||||
}
|
||||
|
||||
Ok(storage_proof_receivers)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user