chore: remove layer of blocking task calls (#15102)

This commit is contained in:
Dan Cline
2025-03-18 05:20:43 -04:00
committed by GitHub
parent 5cfa9c7df9
commit 920796387d
3 changed files with 20 additions and 62 deletions

View File

@@ -344,7 +344,6 @@ where
state_root_message_sender,
}: MultiproofInput<Factory>,
) {
let executor = self.executor.clone();
let to_storage_proof_task = self.storage_proof_task_handle.sender();
self.executor.spawn_blocking(move || {
@@ -365,7 +364,6 @@ where
config.nodes_sorted,
config.state_sorted,
config.prefix_sets,
executor.handle().clone(),
to_storage_proof_task.clone(),
)
.with_branch_node_masks(true)

View File

@@ -30,12 +30,8 @@ use reth_trie::{
};
use reth_trie_common::proof::ProofRetainer;
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use std::{
sync::{mpsc::Sender, Arc},
time::Instant,
};
use tokio::runtime::Handle;
use tracing::{debug, trace};
use std::sync::{mpsc::Sender, Arc};
use tracing::debug;
/// Parallel proof calculator.
///
@@ -56,8 +52,6 @@ pub struct ParallelProof<Factory: DatabaseProviderFactory> {
pub prefix_sets: Arc<TriePrefixSetsMut>,
/// Flag indicating whether to include branch node masks in the proof.
collect_branch_node_masks: bool,
/// Handle to spawn blocking tasks to fetch data.
executor: Handle,
/// Sender to the storage proof task.
storage_proof_task: Sender<ProofTaskMessage<FactoryTx<Factory>>>,
#[cfg(feature = "metrics")]
@@ -71,7 +65,6 @@ impl<Factory: DatabaseProviderFactory> ParallelProof<Factory> {
nodes_sorted: Arc<TrieUpdatesSorted>,
state_sorted: Arc<HashedPostStateSorted>,
prefix_sets: Arc<TriePrefixSetsMut>,
executor: Handle,
storage_proof_task: Sender<ProofTaskMessage<FactoryTx<Factory>>>,
) -> Self {
Self {
@@ -80,7 +73,6 @@ impl<Factory: DatabaseProviderFactory> ParallelProof<Factory> {
state_sorted,
prefix_sets,
collect_branch_node_masks: false,
executor,
storage_proof_task,
#[cfg(feature = "metrics")]
metrics: ParallelTrieMetrics::new_with_labels(&[("type", "proof")]),
@@ -146,57 +138,18 @@ where
{
let target_slots = targets.get(&hashed_address).cloned().unwrap_or_default();
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let proof_task_sender = self.storage_proof_task.clone();
// spawn the task as blocking and send the the result through the channel
self.executor.spawn_blocking(move || {
debug!(
target: "trie::parallel_proof",
?hashed_address,
"Starting proof calculation"
);
let proof_start = Instant::now();
let prefix_set_len = prefix_set.len();
let target_slots_len = target_slots.len();
let input = StorageProofInput::new(
hashed_address,
prefix_set,
target_slots,
self.collect_branch_node_masks,
);
let (sender, receiver) = std::sync::mpsc::channel();
let _ = proof_task_sender.send(ProofTaskMessage::StorageProof((input, sender)));
let result = receiver.recv().unwrap();
trace!(
target: "trie::parallel_proof",
?hashed_address,
prefix_set = ?prefix_set_len,
target_slots = ?target_slots_len,
proof_time = ?proof_start.elapsed(),
"Completed proof calculation"
);
// We can have the receiver dropped before we send, because we still calculate
// storage proofs for deleted accounts, but do not actually walk over them in
// `account_node_iter` below.
if let Err(e) = tx.send(result) {
debug!(
target: "trie::parallel_proof",
?hashed_address,
error = ?e,
task_time = ?proof_start.elapsed(),
"Failed to send proof result"
);
}
});
let input = StorageProofInput::new(
hashed_address,
prefix_set,
target_slots,
self.collect_branch_node_masks,
);
let (sender, receiver) = std::sync::mpsc::channel();
let _ = self.storage_proof_task.send(ProofTaskMessage::StorageProof((input, sender)));
// store the receiver for that result with the hashed address so we can await this in
// place when we iterate over the trie
storage_proofs.insert(hashed_address, rx);
storage_proofs.insert(hashed_address, receiver);
}
let provider_ro = self.view.provider_ro()?;
@@ -415,7 +368,6 @@ mod tests {
Default::default(),
Default::default(),
Default::default(),
rt.handle().clone(),
proof_task_sender,
)
.multiproof(targets.clone())

View File

@@ -238,7 +238,15 @@ where
);
// send the result back
let _ = result_sender.send(result);
if let Err(e) = result_sender.send(result) {
debug!(
target: "trie::parallel_proof",
hashed_address=?input.hashed_address,
error = ?e,
task_time = ?proof_start.elapsed(),
"Failed to send proof result"
);
}
// send the tx back
let _ = tx_sender.send(ProofTaskMessage::Transaction(self));