diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index fd3297c965..865a41816c 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -175,10 +175,10 @@ impl ProofSequencer { /// This should trigger once the block has been executed (after) the last state update has been /// sent. This triggers the exit condition of the multi proof task. #[derive(Deref, Debug)] -pub(super) struct StateHookSender(Sender); +pub(super) struct StateHookSender(CrossbeamSender); impl StateHookSender { - pub(crate) const fn new(inner: Sender) -> Self { + pub(crate) const fn new(inner: CrossbeamSender) -> Self { Self(inner) } } @@ -278,7 +278,7 @@ struct StorageMultiproofInput { hashed_address: B256, proof_targets: B256Set, proof_sequence_number: u64, - state_root_message_sender: Sender, + state_root_message_sender: CrossbeamSender, multi_added_removed_keys: Arc, } @@ -300,7 +300,7 @@ struct MultiproofInput { hashed_state_update: HashedPostState, proof_targets: MultiProofTargets, proof_sequence_number: u64, - state_root_message_sender: Sender, + state_root_message_sender: CrossbeamSender, multi_added_removed_keys: Option>, } @@ -335,6 +335,8 @@ pub struct MultiproofManager { /// a big account change into different chunks, which may repeatedly /// revisit missed leaves. missed_leaves_storage_roots: Arc>, + /// Sender for proof results (passed directly to workers) + proof_result_tx: CrossbeamSender, /// Metrics metrics: MultiProofTaskMetrics, } @@ -345,6 +347,8 @@ impl MultiproofManager { executor: WorkloadExecutor, metrics: MultiProofTaskMetrics, proof_worker_handle: ProofWorkerHandle, + max_concurrent: usize, + proof_result_tx: CrossbeamSender, ) -> Self { Self { inflight: 0, @@ -352,11 +356,17 @@ impl MultiproofManager { metrics, proof_worker_handle, missed_leaves_storage_roots: Default::default(), + proof_result_tx, } } - /// Spawns a new multiproof calculation. - fn spawn(&mut self, input: PendingMultiproofTask) { + const fn is_full(&self) -> bool { + self.inflight >= self.max_concurrent + } + + /// Spawns a new multiproof calculation or enqueues it for later if + /// `max_concurrent` are already inflight. + fn spawn_or_queue(&mut self, input: PendingMultiproofTask) { // If there are no proof targets, we can just send an empty multiproof back immediately if input.proof_targets_is_empty() { debug!( @@ -392,8 +402,8 @@ impl MultiproofManager { hashed_address, proof_targets, proof_sequence_number, - state_root_message_sender, multi_added_removed_keys, + .. } = storage_multiproof_input; let storage_proof_worker_handle = self.proof_worker_handle.clone();