wiring for crossbeam sender

This commit is contained in:
Yong Kang
2025-10-22 04:39:48 +00:00
parent 9bbf81dd0a
commit 3d4d478281

View File

@@ -175,10 +175,10 @@ impl ProofSequencer {
/// This should trigger once the block has been executed (after) the last state update has been
/// sent. This triggers the exit condition of the multi proof task.
#[derive(Deref, Debug)]
pub(super) struct StateHookSender(Sender<MultiProofMessage>);
pub(super) struct StateHookSender(CrossbeamSender<MultiProofMessage>);
impl StateHookSender {
pub(crate) const fn new(inner: Sender<MultiProofMessage>) -> Self {
pub(crate) const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
Self(inner)
}
}
@@ -278,7 +278,7 @@ struct StorageMultiproofInput {
hashed_address: B256,
proof_targets: B256Set,
proof_sequence_number: u64,
state_root_message_sender: Sender<MultiProofMessage>,
state_root_message_sender: CrossbeamSender<MultiProofMessage>,
multi_added_removed_keys: Arc<MultiAddedRemovedKeys>,
}
@@ -300,7 +300,7 @@ struct MultiproofInput {
hashed_state_update: HashedPostState,
proof_targets: MultiProofTargets,
proof_sequence_number: u64,
state_root_message_sender: Sender<MultiProofMessage>,
state_root_message_sender: CrossbeamSender<MultiProofMessage>,
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
}
@@ -335,6 +335,8 @@ pub struct MultiproofManager {
/// a big account change into different chunks, which may repeatedly
/// revisit missed leaves.
missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
/// Sender for proof results (passed directly to workers)
proof_result_tx: CrossbeamSender<ProofResultMessage>,
/// Metrics
metrics: MultiProofTaskMetrics,
}
@@ -345,6 +347,8 @@ impl MultiproofManager {
executor: WorkloadExecutor,
metrics: MultiProofTaskMetrics,
proof_worker_handle: ProofWorkerHandle,
max_concurrent: usize,
proof_result_tx: CrossbeamSender<ProofResultMessage>,
) -> Self {
Self {
inflight: 0,
@@ -352,11 +356,17 @@ impl MultiproofManager {
metrics,
proof_worker_handle,
missed_leaves_storage_roots: Default::default(),
proof_result_tx,
}
}
/// Spawns a new multiproof calculation.
fn spawn(&mut self, input: PendingMultiproofTask) {
const fn is_full(&self) -> bool {
self.inflight >= self.max_concurrent
}
/// Spawns a new multiproof calculation or enqueues it for later if
/// `max_concurrent` are already inflight.
fn spawn_or_queue(&mut self, input: PendingMultiproofTask) {
// If there are no proof targets, we can just send an empty multiproof back immediately
if input.proof_targets_is_empty() {
debug!(
@@ -392,8 +402,8 @@ impl MultiproofManager {
hashed_address,
proof_targets,
proof_sequence_number,
state_root_message_sender,
multi_added_removed_keys,
..
} = storage_multiproof_input;
let storage_proof_worker_handle = self.proof_worker_handle.clone();