refactor(multiproof): streamline dispatch logic and add multiproof conversion

- Updated the `dispatch` method to accept `MultiproofInput` directly, simplifying the handling of proof targets.
- Removed the separate handling for `PendingMultiproofTask`, consolidating the dispatch logic.
- Introduced `into_multiproof` method in `ProofResult` for converting proof results into `DecodedMultiProof`, enhancing clarity and usability.
This commit is contained in:
Yong Kang
2025-10-27 05:11:21 +00:00
parent f3913f47c9
commit dddc9e1237
2 changed files with 41 additions and 98 deletions

View File

@@ -72,87 +72,18 @@ impl MultiproofManager {
}
/// Dispatches a new multiproof calculation to worker pools.
fn dispatch(&mut self, input: PendingMultiproofTask) {
fn dispatch(&mut self, input: MultiproofInput) {
// If there are no proof targets, we can just send an empty multiproof back immediately
if input.proof_targets_is_empty() {
if input.proof_targets.is_empty() {
debug!(
sequence_number = input.proof_sequence_number(),
sequence_number = input.proof_sequence_number,
"No proof targets, sending empty multiproof back immediately"
);
input.send_empty_proof();
return
}
match input {
PendingMultiproofTask::Storage(storage_input) => {
self.dispatch_storage_proof(storage_input);
}
PendingMultiproofTask::Regular(multiproof_input) => {
self.dispatch_multiproof(multiproof_input);
}
}
}
/// Dispatches a single storage proof calculation to worker pool.
fn dispatch_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput) {
let StorageMultiproofInput {
hashed_state_update,
hashed_address,
proof_targets,
proof_sequence_number,
multi_added_removed_keys,
state_root_message_sender: _,
} = storage_multiproof_input;
let storage_targets = proof_targets.len();
trace!(
target: "engine::tree::payload_processor::multiproof",
proof_sequence_number,
?proof_targets,
storage_targets,
"Dispatching storage proof to workers"
);
let start = Instant::now();
// Create prefix set from targets
let prefix_set = reth_trie::prefix_set::PrefixSetMut::from(
proof_targets.iter().map(reth_trie::Nibbles::unpack),
);
let prefix_set = prefix_set.freeze();
// Build computation input (data only)
let input = reth_trie_parallel::proof_task::StorageProofInput::new(
hashed_address,
prefix_set,
proof_targets,
true, // with_branch_node_masks
Some(multi_added_removed_keys),
);
// Dispatch to storage worker
if let Err(e) = self.proof_worker_handle.dispatch_storage_proof(
input,
reth_trie_parallel::proof_task::ProofResultContext::new(
self.proof_result_tx.clone(),
proof_sequence_number,
hashed_state_update,
start,
),
) {
error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch storage proof");
return;
}
self.inflight += 1;
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
self.metrics
.pending_storage_multiproofs_histogram
.record(self.proof_worker_handle.pending_storage_tasks() as f64);
self.metrics
.pending_account_multiproofs_histogram
.record(self.proof_worker_handle.pending_account_tasks() as f64);
self.dispatch_multiproof(input);
}
/// Signals that a multiproof calculation has finished.
@@ -425,18 +356,15 @@ impl MultiProofTask {
self.multiproof_manager.proof_worker_handle.has_available_storage_workers();
let mut dispatch = |proof_targets| {
self.multiproof_manager.dispatch(
MultiproofInput {
config: self.config.clone(),
source: None,
hashed_state_update: Default::default(),
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
}
.into(),
);
self.multiproof_manager.dispatch(MultiproofInput {
config: self.config.clone(),
source: None,
hashed_state_update: Default::default(),
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
});
chunks += 1;
};
@@ -573,18 +501,15 @@ impl MultiProofTask {
);
spawned_proof_targets.extend_ref(&proof_targets);
self.multiproof_manager.dispatch(
MultiproofInput {
config: self.config.clone(),
source: Some(source),
hashed_state_update,
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
}
.into(),
);
self.multiproof_manager.dispatch(MultiproofInput {
config: self.config.clone(),
source: Some(source),
hashed_state_update,
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
});
chunks += 1;
};
@@ -800,7 +725,7 @@ impl MultiProofTask {
// Convert ProofResultMessage to SparseTrieUpdate
match proof_result.result {
Ok((multiproof, _stats)) => {
Ok(proof_result_data) => {
debug!(
target: "engine::tree::payload_processor::multiproof",
sequence = proof_result.sequence_number,
@@ -808,6 +733,9 @@ impl MultiProofTask {
"Processing calculated proof from worker"
);
// Convert ProofResult to DecodedMultiProof
let multiproof = proof_result_data.into_multiproof();
let update = SparseTrieUpdate {
state: proof_result.state,
multiproof,

View File

@@ -564,6 +564,21 @@ pub enum ProofResult {
},
}
impl ProofResult {
/// Convert this proof result into a `DecodedMultiProof`.
///
/// For account multiproofs, returns the multiproof directly (discarding stats).
/// For storage proofs, wraps the storage proof into a minimal multiproof.
pub fn into_multiproof(self) -> DecodedMultiProof {
match self {
ProofResult::AccountMultiproof(multiproof, _stats) => multiproof,
ProofResult::StorageProof { hashed_address, proof } => {
DecodedMultiProof::from_storage_proof(hashed_address, proof)
}
}
}
}
/// Channel used by worker threads to deliver `ProofResultMessage` items back to
/// `MultiProofTask`.
///