Compare commits

...

2 Commits

Author SHA1 Message Date
Sergei Shulepov
26521f1100 dani's suggestion 2026-04-01 16:11:40 +08:00
Sergei Shulepov
8f7677de78 worker availability sheet 2026-04-01 16:11:06 +08:00
5 changed files with 88 additions and 55 deletions

1
Cargo.lock generated
View File

@@ -10559,6 +10559,7 @@ dependencies = [
"alloy-primitives",
"alloy-rlp",
"crossbeam-channel",
"crossbeam-utils",
"derive_more",
"itertools 0.14.0",
"metrics",

View File

@@ -61,17 +61,14 @@ pub(crate) fn dispatch_with_chunking<T, I>(
chunking_len: usize,
chunk_size: usize,
max_targets_for_chunking: usize,
available_account_workers: usize,
available_storage_workers: usize,
has_spare_workers: impl FnOnce() -> bool,
chunker: impl FnOnce(T, usize) -> I,
mut dispatch: impl FnMut(T),
) -> usize
where
I: IntoIterator<Item = T>,
{
let should_chunk = chunking_len > max_targets_for_chunking ||
available_account_workers > 1 ||
available_storage_workers > 1;
let should_chunk = chunking_len > max_targets_for_chunking || has_spare_workers();
if should_chunk && chunking_len > chunk_size {
let mut num_chunks = 0usize;

View File

@@ -777,8 +777,10 @@ where
chunking_length,
self.chunk_size,
self.max_targets_for_chunking,
self.proof_worker_handle.available_account_workers(),
self.proof_worker_handle.available_storage_workers(),
|| {
self.proof_worker_handle.has_multiple_idle_account_workers() ||
self.proof_worker_handle.has_multiple_idle_storage_workers()
},
MultiProofTargetsV2::chunks,
|proof_targets| {
if let Err(e) =

View File

@@ -39,6 +39,7 @@ derive_more.workspace = true
rayon.workspace = true
itertools.workspace = true
crossbeam-channel.workspace = true
crossbeam-utils.workspace = true
# `metrics` feature
reth-metrics = { workspace = true, optional = true }

View File

@@ -55,7 +55,7 @@ use std::{
cell::RefCell,
rc::Rc,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc::{channel, Receiver, Sender},
Arc,
},
@@ -86,6 +86,52 @@ type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::StorageCursor<'a>>,
>;
/// Idle/busy bookkeeping for a pool of workers.
///
/// Every worker gets its own boolean slot, and each slot is padded out to a full cacheline so
/// that a worker flipping its slot does not disturb the line holding a neighbour's slot.
#[derive(Debug)]
struct AvailabilitySheet {
    /// Slot `i` belongs to worker `i`: `true` means idle, `false` means busy. A slot is meant to
    /// be written only by the worker it belongs to and merely read by the dispatching side.
    flags: Vec<crossbeam_utils::CachePadded<AtomicBool>>,
}

impl AvailabilitySheet {
    /// Builds a sheet with one slot per worker for `count` workers.
    ///
    /// Every slot starts out as busy; a worker has to announce itself as idle explicitly.
    fn new(count: usize) -> Self {
        let mut flags = Vec::with_capacity(count);
        flags.resize_with(count, || crossbeam_utils::CachePadded::new(AtomicBool::new(false)));
        Self { flags }
    }

    /// Reports whether at least two workers are idle right now.
    ///
    /// The answer is only a snapshot: the slots are read one after another with relaxed
    /// ordering, so a slot that was counted as idle may already have flipped to busy by the time
    /// this returns. The scan stops as soon as the second idle slot is seen.
    fn has_multiple_idle(&self) -> bool {
        self.flags
            .iter()
            .filter(|slot| slot.load(Ordering::Relaxed))
            // `nth(1)` yields the second idle slot, if there is one, and looks no further.
            .nth(1)
            .is_some()
    }

    /// Records that worker `worker_id` is idle.
    ///
    /// # Panics
    ///
    /// Panics if `worker_id` is not smaller than the `count` the sheet was created with.
    fn mark_idle(&self, worker_id: usize) {
        self.store_state(worker_id, true);
    }

    /// Records that worker `worker_id` is busy.
    ///
    /// # Panics
    ///
    /// Panics if `worker_id` is not smaller than the `count` the sheet was created with.
    fn mark_busy(&self, worker_id: usize) {
        self.store_state(worker_id, false);
    }

    /// Writes `idle` into the slot of worker `worker_id` with relaxed ordering.
    fn store_state(&self, worker_id: usize, idle: bool) {
        self.flags[worker_id].store(idle, Ordering::Relaxed);
    }
}
/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
@@ -97,12 +143,12 @@ pub struct ProofWorkerHandle {
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
/// Direct sender to account worker pool
account_work_tx: CrossbeamSender<AccountWorkerJob>,
/// Counter tracking available storage workers. Workers decrement when starting work,
/// increment when finishing. Used to determine whether to chunk multiproofs.
storage_available_workers: Arc<AtomicUsize>,
/// Counter tracking available account workers. Workers decrement when starting work,
/// increment when finishing. Used to determine whether to chunk multiproofs.
account_available_workers: Arc<AtomicUsize>,
/// Per-worker availability flags for storage workers. Used to determine whether to chunk
/// multiproofs.
storage_availability: Arc<AvailabilitySheet>,
/// Per-worker availability flags for account workers. Used to determine whether to chunk
/// multiproofs.
account_availability: Arc<AvailabilitySheet>,
/// Total number of storage workers spawned
storage_worker_count: usize,
/// Total number of account workers spawned
@@ -140,9 +186,6 @@ impl ProofWorkerHandle {
let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
let storage_available_workers = Arc::<AtomicUsize>::default();
let account_available_workers = Arc::<AtomicUsize>::default();
let cached_storage_roots = Arc::<DashMap<_, _>>::default();
let divisor = if halve_workers { 2 } else { 1 };
@@ -151,6 +194,9 @@ impl ProofWorkerHandle {
let account_worker_count =
runtime.proof_account_worker_pool().current_num_threads() / divisor;
let storage_availability = Arc::new(AvailabilitySheet::new(storage_worker_count));
let account_availability = Arc::new(AvailabilitySheet::new(account_worker_count));
debug!(
target: "trie::proof_task",
storage_worker_count,
@@ -163,7 +209,7 @@ impl ProofWorkerHandle {
// tokio's blocking pool.
let storage_rt = runtime.clone();
let storage_task_ctx = task_ctx.clone();
let storage_avail = storage_available_workers.clone();
let storage_avail = storage_availability.clone();
let storage_roots = cached_storage_roots.clone();
let storage_parent_span = tracing::Span::current();
runtime.spawn_blocking_named("storage-workers", move || {
@@ -202,7 +248,7 @@ impl ProofWorkerHandle {
let account_rt = runtime.clone();
let account_tx = storage_work_tx.clone();
let account_avail = account_available_workers.clone();
let account_avail = account_availability.clone();
let account_parent_span = tracing::Span::current();
runtime.spawn_blocking_named("account-workers", move || {
let worker_id = AtomicUsize::new(0);
@@ -242,21 +288,21 @@ impl ProofWorkerHandle {
Self {
storage_work_tx,
account_work_tx,
storage_available_workers,
account_available_workers,
storage_availability,
account_availability,
storage_worker_count,
account_worker_count,
}
}
/// Returns how many storage workers are currently available/idle.
pub fn available_storage_workers(&self) -> usize {
self.storage_available_workers.load(Ordering::Relaxed)
/// Returns `true` if more than one storage worker is currently idle.
pub fn has_multiple_idle_storage_workers(&self) -> bool {
self.storage_availability.has_multiple_idle()
}
/// Returns how many account workers are currently available/idle.
pub fn available_account_workers(&self) -> usize {
self.account_available_workers.load(Ordering::Relaxed)
/// Returns `true` if more than one account worker is currently idle.
pub fn has_multiple_idle_account_workers(&self) -> bool {
self.account_availability.has_multiple_idle()
}
/// Returns the number of pending storage tasks in the queue.
@@ -279,20 +325,6 @@ impl ProofWorkerHandle {
self.account_worker_count
}
/// Returns the number of storage workers currently processing tasks.
///
/// This is calculated as total workers minus available workers.
pub fn active_storage_workers(&self) -> usize {
self.storage_worker_count.saturating_sub(self.available_storage_workers())
}
/// Returns the number of account workers currently processing tasks.
///
/// This is calculated as total workers minus available workers.
pub fn active_account_workers(&self) -> usize {
self.account_worker_count.saturating_sub(self.available_account_workers())
}
/// Dispatch a storage proof computation to storage worker pool
///
/// The result will be sent via the `proof_result_sender` channel.
@@ -635,8 +667,8 @@ struct StorageProofWorker<Factory> {
work_rx: CrossbeamReceiver<StorageWorkerJob>,
/// Unique identifier for this worker (used for tracing)
worker_id: usize,
/// Counter tracking worker availability
available_workers: Arc<AtomicUsize>,
/// Per-worker availability flags
availability: Arc<AvailabilitySheet>,
/// Cached storage roots
cached_storage_roots: Arc<DashMap<B256, B256>>,
/// Metrics collector for this worker
@@ -656,7 +688,7 @@ where
task_ctx: ProofTaskCtx<Factory>,
work_rx: CrossbeamReceiver<StorageWorkerJob>,
worker_id: usize,
available_workers: Arc<AtomicUsize>,
availability: Arc<AvailabilitySheet>,
cached_storage_roots: Arc<DashMap<B256, B256>>,
#[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
#[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
@@ -665,7 +697,7 @@ where
task_ctx,
work_rx,
worker_id,
available_workers,
availability,
cached_storage_roots,
#[cfg(feature = "metrics")]
metrics,
@@ -719,7 +751,7 @@ where
);
// Initially mark this worker as available.
self.available_workers.fetch_add(1, Ordering::Relaxed);
self.availability.mark_idle(self.worker_id);
let mut total_idle_time = Duration::ZERO;
let mut idle_start = Instant::now();
@@ -728,7 +760,7 @@ where
total_idle_time += idle_start.elapsed();
// Mark worker as busy.
self.available_workers.fetch_sub(1, Ordering::Relaxed);
self.availability.mark_busy(self.worker_id);
#[cfg(feature = "trie-debug")]
if let Some(max_jitter) = self.task_ctx.proof_jitter {
@@ -767,7 +799,7 @@ where
}
// Mark worker as available again.
self.available_workers.fetch_add(1, Ordering::Relaxed);
self.availability.mark_idle(self.worker_id);
idle_start = Instant::now();
}
@@ -911,8 +943,8 @@ struct AccountProofWorker<Factory> {
worker_id: usize,
/// Channel for dispatching storage proof work (for pre-dispatched target proofs)
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
/// Counter tracking worker availability
available_workers: Arc<AtomicUsize>,
/// Per-worker availability flags
availability: Arc<AvailabilitySheet>,
/// Cached storage roots
cached_storage_roots: Arc<DashMap<B256, B256>>,
/// Metrics collector for this worker
@@ -934,7 +966,7 @@ where
work_rx: CrossbeamReceiver<AccountWorkerJob>,
worker_id: usize,
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
available_workers: Arc<AtomicUsize>,
availability: Arc<AvailabilitySheet>,
cached_storage_roots: Arc<DashMap<B256, B256>>,
#[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
#[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
@@ -944,7 +976,7 @@ where
work_rx,
worker_id,
storage_work_tx,
available_workers,
availability,
cached_storage_roots,
#[cfg(feature = "metrics")]
metrics,
@@ -1030,7 +1062,7 @@ where
)));
// Count this worker as available only after successful initialization.
self.available_workers.fetch_add(1, Ordering::Relaxed);
self.availability.mark_idle(self.worker_id);
let mut total_idle_time = Duration::ZERO;
let mut idle_start = Instant::now();
@@ -1040,7 +1072,7 @@ where
total_idle_time += idle_start.elapsed();
// Mark worker as busy.
self.available_workers.fetch_sub(1, Ordering::Relaxed);
self.availability.mark_busy(self.worker_id);
#[cfg(feature = "trie-debug")]
if let Some(max_jitter) = self.task_ctx.proof_jitter {
@@ -1079,7 +1111,7 @@ where
}
// Mark worker as available again.
self.available_workers.fetch_add(1, Ordering::Relaxed);
self.availability.mark_idle(self.worker_id);
idle_start = Instant::now();
}