From 920796387da172a8c16000e37fd604df2ae48428 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Tue, 18 Mar 2025 05:20:43 -0400 Subject: [PATCH] chore: remove layer of blocking task calls (#15102) --- .../src/tree/payload_processor/multiproof.rs | 2 - crates/trie/parallel/src/proof.rs | 70 +++---------------- crates/trie/parallel/src/proof_task.rs | 10 ++- 3 files changed, 20 insertions(+), 62 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index b974d8208f..25082b0ab1 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -344,7 +344,6 @@ where state_root_message_sender, }: MultiproofInput, ) { - let executor = self.executor.clone(); let to_storage_proof_task = self.storage_proof_task_handle.sender(); self.executor.spawn_blocking(move || { @@ -365,7 +364,6 @@ where config.nodes_sorted, config.state_sorted, config.prefix_sets, - executor.handle().clone(), to_storage_proof_task.clone(), ) .with_branch_node_masks(true) diff --git a/crates/trie/parallel/src/proof.rs b/crates/trie/parallel/src/proof.rs index 9e53d608b6..552a182503 100644 --- a/crates/trie/parallel/src/proof.rs +++ b/crates/trie/parallel/src/proof.rs @@ -30,12 +30,8 @@ use reth_trie::{ }; use reth_trie_common::proof::ProofRetainer; use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; -use std::{ - sync::{mpsc::Sender, Arc}, - time::Instant, -}; -use tokio::runtime::Handle; -use tracing::{debug, trace}; +use std::sync::{mpsc::Sender, Arc}; +use tracing::debug; /// Parallel proof calculator. /// @@ -56,8 +52,6 @@ pub struct ParallelProof { pub prefix_sets: Arc, /// Flag indicating whether to include branch node masks in the proof. collect_branch_node_masks: bool, - /// Handle to spawn blocking tasks to fetch data. - executor: Handle, /// Sender to the storage proof task. storage_proof_task: Sender>>, #[cfg(feature = "metrics")] @@ -71,7 +65,6 @@ impl ParallelProof { nodes_sorted: Arc, state_sorted: Arc, prefix_sets: Arc, - executor: Handle, storage_proof_task: Sender>>, ) -> Self { Self { @@ -80,7 +73,6 @@ impl ParallelProof { state_sorted, prefix_sets, collect_branch_node_masks: false, - executor, storage_proof_task, #[cfg(feature = "metrics")] metrics: ParallelTrieMetrics::new_with_labels(&[("type", "proof")]), @@ -146,57 +138,18 @@ where { let target_slots = targets.get(&hashed_address).cloned().unwrap_or_default(); - let (tx, rx) = std::sync::mpsc::sync_channel(1); - let proof_task_sender = self.storage_proof_task.clone(); - - // spawn the task as blocking and send the the result through the channel - self.executor.spawn_blocking(move || { - debug!( - target: "trie::parallel_proof", - ?hashed_address, - "Starting proof calculation" - ); - - let proof_start = Instant::now(); - let prefix_set_len = prefix_set.len(); - let target_slots_len = target_slots.len(); - - let input = StorageProofInput::new( - hashed_address, - prefix_set, - target_slots, - self.collect_branch_node_masks, - ); - let (sender, receiver) = std::sync::mpsc::channel(); - let _ = proof_task_sender.send(ProofTaskMessage::StorageProof((input, sender))); - let result = receiver.recv().unwrap(); - - trace!( - target: "trie::parallel_proof", - ?hashed_address, - prefix_set = ?prefix_set_len, - target_slots = ?target_slots_len, - proof_time = ?proof_start.elapsed(), - "Completed proof calculation" - ); - - // We can have the receiver dropped before we send, because we still calculate - // storage proofs for deleted accounts, but do not actually walk over them in - // `account_node_iter` below. - if let Err(e) = tx.send(result) { - debug!( - target: "trie::parallel_proof", - ?hashed_address, - error = ?e, - task_time = ?proof_start.elapsed(), - "Failed to send proof result" - ); - } - }); + let input = StorageProofInput::new( + hashed_address, + prefix_set, + target_slots, + self.collect_branch_node_masks, + ); + let (sender, receiver) = std::sync::mpsc::channel(); + let _ = self.storage_proof_task.send(ProofTaskMessage::StorageProof((input, sender))); // store the receiver for that result with the hashed address so we can await this in // place when we iterate over the trie - storage_proofs.insert(hashed_address, rx); + storage_proofs.insert(hashed_address, receiver); } let provider_ro = self.view.provider_ro()?; @@ -415,7 +368,6 @@ mod tests { Default::default(), Default::default(), Default::default(), - rt.handle().clone(), proof_task_sender, ) .multiproof(targets.clone()) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 047f5c993e..e91452b045 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -238,7 +238,15 @@ where ); // send the result back - let _ = result_sender.send(result); + if let Err(e) = result_sender.send(result) { + debug!( + target: "trie::parallel_proof", + hashed_address=?input.hashed_address, + error = ?e, + task_time = ?proof_start.elapsed(), + "Failed to send proof result" + ); + } // send the tx back let _ = tx_sender.send(ProofTaskMessage::Transaction(self));