mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-10 07:48:19 -05:00
perf: bias towards proof results (#19426)
This commit is contained in:
@@ -1029,7 +1029,64 @@ impl MultiProofTask {
|
||||
loop {
|
||||
trace!(target: "engine::tree::payload_processor::multiproof", "entering main channel receiving loop");
|
||||
|
||||
crossbeam_channel::select! {
|
||||
crossbeam_channel::select_biased! {
|
||||
recv(self.proof_result_rx) -> proof_msg => {
|
||||
match proof_msg {
|
||||
Ok(proof_result) => {
|
||||
proofs_processed += 1;
|
||||
|
||||
self.metrics
|
||||
.proof_calculation_duration_histogram
|
||||
.record(proof_result.elapsed);
|
||||
|
||||
self.multiproof_manager.on_calculation_complete();
|
||||
|
||||
// Convert ProofResultMessage to SparseTrieUpdate
|
||||
match proof_result.result {
|
||||
Ok(proof_result_data) => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::multiproof",
|
||||
sequence = proof_result.sequence_number,
|
||||
total_proofs = proofs_processed,
|
||||
"Processing calculated proof from worker"
|
||||
);
|
||||
|
||||
let update = SparseTrieUpdate {
|
||||
state: proof_result.state,
|
||||
multiproof: proof_result_data.into_multiproof(),
|
||||
};
|
||||
|
||||
if let Some(combined_update) =
|
||||
self.on_proof(proof_result.sequence_number, update)
|
||||
{
|
||||
let _ = self.to_sparse_trie.send(combined_update);
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
error!(target: "engine::tree::payload_processor::multiproof", ?error, "proof calculation error from worker");
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if self.is_done(
|
||||
proofs_processed,
|
||||
state_update_proofs_requested,
|
||||
prefetch_proofs_requested,
|
||||
updates_finished,
|
||||
) {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::multiproof",
|
||||
"State updates finished and all proofs processed, ending calculation"
|
||||
);
|
||||
break
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
error!(target: "engine::tree::payload_processor::multiproof", "Proof result channel closed unexpectedly");
|
||||
return
|
||||
}
|
||||
}
|
||||
},
|
||||
recv(self.rx) -> message => {
|
||||
match message {
|
||||
Ok(msg) => match msg {
|
||||
@@ -1129,63 +1186,6 @@ impl MultiProofTask {
|
||||
return
|
||||
}
|
||||
}
|
||||
},
|
||||
recv(self.proof_result_rx) -> proof_msg => {
|
||||
match proof_msg {
|
||||
Ok(proof_result) => {
|
||||
proofs_processed += 1;
|
||||
|
||||
self.metrics
|
||||
.proof_calculation_duration_histogram
|
||||
.record(proof_result.elapsed);
|
||||
|
||||
self.multiproof_manager.on_calculation_complete();
|
||||
|
||||
// Convert ProofResultMessage to SparseTrieUpdate
|
||||
match proof_result.result {
|
||||
Ok(proof_result_data) => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::multiproof",
|
||||
sequence = proof_result.sequence_number,
|
||||
total_proofs = proofs_processed,
|
||||
"Processing calculated proof from worker"
|
||||
);
|
||||
|
||||
let update = SparseTrieUpdate {
|
||||
state: proof_result.state,
|
||||
multiproof: proof_result_data.into_multiproof(),
|
||||
};
|
||||
|
||||
if let Some(combined_update) =
|
||||
self.on_proof(proof_result.sequence_number, update)
|
||||
{
|
||||
let _ = self.to_sparse_trie.send(combined_update);
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
error!(target: "engine::tree::payload_processor::multiproof", ?error, "proof calculation error from worker");
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if self.is_done(
|
||||
proofs_processed,
|
||||
state_update_proofs_requested,
|
||||
prefetch_proofs_requested,
|
||||
updates_finished,
|
||||
) {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::multiproof",
|
||||
"State updates finished and all proofs processed, ending calculation"
|
||||
);
|
||||
break
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
error!(target: "engine::tree::payload_processor::multiproof", "Proof result channel closed unexpectedly");
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user