|
|
|
|
@@ -55,7 +55,7 @@ use std::{
|
|
|
|
|
cell::RefCell,
|
|
|
|
|
rc::Rc,
|
|
|
|
|
sync::{
|
|
|
|
|
atomic::{AtomicUsize, Ordering},
|
|
|
|
|
atomic::{AtomicBool, AtomicUsize, Ordering},
|
|
|
|
|
mpsc::{channel, Receiver, Sender},
|
|
|
|
|
Arc,
|
|
|
|
|
},
|
|
|
|
|
@@ -86,6 +86,52 @@ type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
|
|
|
|
|
InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::StorageCursor<'a>>,
|
|
|
|
|
>;
|
|
|
|
|
|
|
|
|
|
/// Tracks worker availability counts.
|
|
|
|
|
///
|
|
|
|
|
/// It uses cacheline-aligned flags to avoid core-to-core chatter.
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
struct AvailabilitySheet {
|
|
|
|
|
/// One flag per worker, each on its own cacheline. Workers store `true` when idle,
|
|
|
|
|
/// `false` when busy. Only the owning worker writes; the dispatcher only reads.
|
|
|
|
|
flags: Vec<crossbeam_utils::CachePadded<AtomicBool>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl AvailabilitySheet {
|
|
|
|
|
/// Creates a new sheet with `count` workers, all initially marked as busy.
|
|
|
|
|
fn new(count: usize) -> Self {
|
|
|
|
|
let flags =
|
|
|
|
|
(0..count).map(|_| crossbeam_utils::CachePadded::new(AtomicBool::new(false))).collect();
|
|
|
|
|
Self { flags }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns `true` if more than one worker is currently idle.
|
|
|
|
|
///
|
|
|
|
|
/// Note, that this is somewhat racy since a flag that was just saying `idle` and we counted it
|
|
|
|
|
/// as such might turn into `busy` right away.
|
|
|
|
|
fn has_multiple_idle(&self) -> bool {
|
|
|
|
|
let mut idle = 0u32;
|
|
|
|
|
for flag in &self.flags {
|
|
|
|
|
if flag.load(Ordering::Relaxed) {
|
|
|
|
|
idle += 1;
|
|
|
|
|
if idle > 1 {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Marks the given worker as idle.
|
|
|
|
|
fn mark_idle(&self, worker_id: usize) {
|
|
|
|
|
self.flags[worker_id].store(true, Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Marks the given worker as busy.
|
|
|
|
|
fn mark_busy(&self, worker_id: usize) {
|
|
|
|
|
self.flags[worker_id].store(false, Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// A handle that provides type-safe access to proof worker pools.
|
|
|
|
|
///
|
|
|
|
|
/// The handle stores direct senders to both storage and account worker pools,
|
|
|
|
|
@@ -97,12 +143,12 @@ pub struct ProofWorkerHandle {
|
|
|
|
|
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
|
|
|
|
|
/// Direct sender to account worker pool
|
|
|
|
|
account_work_tx: CrossbeamSender<AccountWorkerJob>,
|
|
|
|
|
/// Counter tracking available storage workers. Workers decrement when starting work,
|
|
|
|
|
/// increment when finishing. Used to determine whether to chunk multiproofs.
|
|
|
|
|
storage_available_workers: Arc<AtomicUsize>,
|
|
|
|
|
/// Counter tracking available account workers. Workers decrement when starting work,
|
|
|
|
|
/// increment when finishing. Used to determine whether to chunk multiproofs.
|
|
|
|
|
account_available_workers: Arc<AtomicUsize>,
|
|
|
|
|
/// Per-worker availability flags for storage workers. Used to determine whether to chunk
|
|
|
|
|
/// multiproofs.
|
|
|
|
|
storage_availability: Arc<AvailabilitySheet>,
|
|
|
|
|
/// Per-worker availability flags for account workers. Used to determine whether to chunk
|
|
|
|
|
/// multiproofs.
|
|
|
|
|
account_availability: Arc<AvailabilitySheet>,
|
|
|
|
|
/// Total number of storage workers spawned
|
|
|
|
|
storage_worker_count: usize,
|
|
|
|
|
/// Total number of account workers spawned
|
|
|
|
|
@@ -140,9 +186,6 @@ impl ProofWorkerHandle {
|
|
|
|
|
let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
|
|
|
|
|
let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
|
|
|
|
|
|
|
|
|
|
let storage_available_workers = Arc::<AtomicUsize>::default();
|
|
|
|
|
let account_available_workers = Arc::<AtomicUsize>::default();
|
|
|
|
|
|
|
|
|
|
let cached_storage_roots = Arc::<DashMap<_, _>>::default();
|
|
|
|
|
|
|
|
|
|
let divisor = if halve_workers { 2 } else { 1 };
|
|
|
|
|
@@ -151,6 +194,9 @@ impl ProofWorkerHandle {
|
|
|
|
|
let account_worker_count =
|
|
|
|
|
runtime.proof_account_worker_pool().current_num_threads() / divisor;
|
|
|
|
|
|
|
|
|
|
let storage_availability = Arc::new(AvailabilitySheet::new(storage_worker_count));
|
|
|
|
|
let account_availability = Arc::new(AvailabilitySheet::new(account_worker_count));
|
|
|
|
|
|
|
|
|
|
debug!(
|
|
|
|
|
target: "trie::proof_task",
|
|
|
|
|
storage_worker_count,
|
|
|
|
|
@@ -163,7 +209,7 @@ impl ProofWorkerHandle {
|
|
|
|
|
// tokio's blocking pool.
|
|
|
|
|
let storage_rt = runtime.clone();
|
|
|
|
|
let storage_task_ctx = task_ctx.clone();
|
|
|
|
|
let storage_avail = storage_available_workers.clone();
|
|
|
|
|
let storage_avail = storage_availability.clone();
|
|
|
|
|
let storage_roots = cached_storage_roots.clone();
|
|
|
|
|
let storage_parent_span = tracing::Span::current();
|
|
|
|
|
runtime.spawn_blocking_named("storage-workers", move || {
|
|
|
|
|
@@ -202,7 +248,7 @@ impl ProofWorkerHandle {
|
|
|
|
|
|
|
|
|
|
let account_rt = runtime.clone();
|
|
|
|
|
let account_tx = storage_work_tx.clone();
|
|
|
|
|
let account_avail = account_available_workers.clone();
|
|
|
|
|
let account_avail = account_availability.clone();
|
|
|
|
|
let account_parent_span = tracing::Span::current();
|
|
|
|
|
runtime.spawn_blocking_named("account-workers", move || {
|
|
|
|
|
let worker_id = AtomicUsize::new(0);
|
|
|
|
|
@@ -242,21 +288,21 @@ impl ProofWorkerHandle {
|
|
|
|
|
Self {
|
|
|
|
|
storage_work_tx,
|
|
|
|
|
account_work_tx,
|
|
|
|
|
storage_available_workers,
|
|
|
|
|
account_available_workers,
|
|
|
|
|
storage_availability,
|
|
|
|
|
account_availability,
|
|
|
|
|
storage_worker_count,
|
|
|
|
|
account_worker_count,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns how many storage workers are currently available/idle.
|
|
|
|
|
pub fn available_storage_workers(&self) -> usize {
|
|
|
|
|
self.storage_available_workers.load(Ordering::Relaxed)
|
|
|
|
|
/// Returns `true` if more than one storage worker is currently idle.
|
|
|
|
|
pub fn has_multiple_idle_storage_workers(&self) -> bool {
|
|
|
|
|
self.storage_availability.has_multiple_idle()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns how many account workers are currently available/idle.
|
|
|
|
|
pub fn available_account_workers(&self) -> usize {
|
|
|
|
|
self.account_available_workers.load(Ordering::Relaxed)
|
|
|
|
|
/// Returns `true` if more than one account worker is currently idle.
|
|
|
|
|
pub fn has_multiple_idle_account_workers(&self) -> bool {
|
|
|
|
|
self.account_availability.has_multiple_idle()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns the number of pending storage tasks in the queue.
|
|
|
|
|
@@ -279,20 +325,6 @@ impl ProofWorkerHandle {
|
|
|
|
|
self.account_worker_count
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns the number of storage workers currently processing tasks.
|
|
|
|
|
///
|
|
|
|
|
/// This is calculated as total workers minus available workers.
|
|
|
|
|
pub fn active_storage_workers(&self) -> usize {
|
|
|
|
|
self.storage_worker_count.saturating_sub(self.available_storage_workers())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns the number of account workers currently processing tasks.
|
|
|
|
|
///
|
|
|
|
|
/// This is calculated as total workers minus available workers.
|
|
|
|
|
pub fn active_account_workers(&self) -> usize {
|
|
|
|
|
self.account_worker_count.saturating_sub(self.available_account_workers())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Dispatch a storage proof computation to storage worker pool
|
|
|
|
|
///
|
|
|
|
|
/// The result will be sent via the `proof_result_sender` channel.
|
|
|
|
|
@@ -635,8 +667,8 @@ struct StorageProofWorker<Factory> {
|
|
|
|
|
work_rx: CrossbeamReceiver<StorageWorkerJob>,
|
|
|
|
|
/// Unique identifier for this worker (used for tracing)
|
|
|
|
|
worker_id: usize,
|
|
|
|
|
/// Counter tracking worker availability
|
|
|
|
|
available_workers: Arc<AtomicUsize>,
|
|
|
|
|
/// Per-worker availability flags
|
|
|
|
|
availability: Arc<AvailabilitySheet>,
|
|
|
|
|
/// Cached storage roots
|
|
|
|
|
cached_storage_roots: Arc<DashMap<B256, B256>>,
|
|
|
|
|
/// Metrics collector for this worker
|
|
|
|
|
@@ -656,7 +688,7 @@ where
|
|
|
|
|
task_ctx: ProofTaskCtx<Factory>,
|
|
|
|
|
work_rx: CrossbeamReceiver<StorageWorkerJob>,
|
|
|
|
|
worker_id: usize,
|
|
|
|
|
available_workers: Arc<AtomicUsize>,
|
|
|
|
|
availability: Arc<AvailabilitySheet>,
|
|
|
|
|
cached_storage_roots: Arc<DashMap<B256, B256>>,
|
|
|
|
|
#[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
|
|
|
|
|
#[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
|
|
|
|
|
@@ -665,7 +697,7 @@ where
|
|
|
|
|
task_ctx,
|
|
|
|
|
work_rx,
|
|
|
|
|
worker_id,
|
|
|
|
|
available_workers,
|
|
|
|
|
availability,
|
|
|
|
|
cached_storage_roots,
|
|
|
|
|
#[cfg(feature = "metrics")]
|
|
|
|
|
metrics,
|
|
|
|
|
@@ -719,7 +751,7 @@ where
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Initially mark this worker as available.
|
|
|
|
|
self.available_workers.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
self.availability.mark_idle(self.worker_id);
|
|
|
|
|
|
|
|
|
|
let mut total_idle_time = Duration::ZERO;
|
|
|
|
|
let mut idle_start = Instant::now();
|
|
|
|
|
@@ -728,7 +760,7 @@ where
|
|
|
|
|
total_idle_time += idle_start.elapsed();
|
|
|
|
|
|
|
|
|
|
// Mark worker as busy.
|
|
|
|
|
self.available_workers.fetch_sub(1, Ordering::Relaxed);
|
|
|
|
|
self.availability.mark_busy(self.worker_id);
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "trie-debug")]
|
|
|
|
|
if let Some(max_jitter) = self.task_ctx.proof_jitter {
|
|
|
|
|
@@ -767,7 +799,7 @@ where
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Mark worker as available again.
|
|
|
|
|
self.available_workers.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
self.availability.mark_idle(self.worker_id);
|
|
|
|
|
|
|
|
|
|
idle_start = Instant::now();
|
|
|
|
|
}
|
|
|
|
|
@@ -911,8 +943,8 @@ struct AccountProofWorker<Factory> {
|
|
|
|
|
worker_id: usize,
|
|
|
|
|
/// Channel for dispatching storage proof work (for pre-dispatched target proofs)
|
|
|
|
|
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
|
|
|
|
|
/// Counter tracking worker availability
|
|
|
|
|
available_workers: Arc<AtomicUsize>,
|
|
|
|
|
/// Per-worker availability flags
|
|
|
|
|
availability: Arc<AvailabilitySheet>,
|
|
|
|
|
/// Cached storage roots
|
|
|
|
|
cached_storage_roots: Arc<DashMap<B256, B256>>,
|
|
|
|
|
/// Metrics collector for this worker
|
|
|
|
|
@@ -934,7 +966,7 @@ where
|
|
|
|
|
work_rx: CrossbeamReceiver<AccountWorkerJob>,
|
|
|
|
|
worker_id: usize,
|
|
|
|
|
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
|
|
|
|
|
available_workers: Arc<AtomicUsize>,
|
|
|
|
|
availability: Arc<AvailabilitySheet>,
|
|
|
|
|
cached_storage_roots: Arc<DashMap<B256, B256>>,
|
|
|
|
|
#[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
|
|
|
|
|
#[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
|
|
|
|
|
@@ -944,7 +976,7 @@ where
|
|
|
|
|
work_rx,
|
|
|
|
|
worker_id,
|
|
|
|
|
storage_work_tx,
|
|
|
|
|
available_workers,
|
|
|
|
|
availability,
|
|
|
|
|
cached_storage_roots,
|
|
|
|
|
#[cfg(feature = "metrics")]
|
|
|
|
|
metrics,
|
|
|
|
|
@@ -1030,7 +1062,7 @@ where
|
|
|
|
|
)));
|
|
|
|
|
|
|
|
|
|
// Count this worker as available only after successful initialization.
|
|
|
|
|
self.available_workers.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
self.availability.mark_idle(self.worker_id);
|
|
|
|
|
|
|
|
|
|
let mut total_idle_time = Duration::ZERO;
|
|
|
|
|
let mut idle_start = Instant::now();
|
|
|
|
|
@@ -1040,7 +1072,7 @@ where
|
|
|
|
|
total_idle_time += idle_start.elapsed();
|
|
|
|
|
|
|
|
|
|
// Mark worker as busy.
|
|
|
|
|
self.available_workers.fetch_sub(1, Ordering::Relaxed);
|
|
|
|
|
self.availability.mark_busy(self.worker_id);
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "trie-debug")]
|
|
|
|
|
if let Some(max_jitter) = self.task_ctx.proof_jitter {
|
|
|
|
|
@@ -1079,7 +1111,7 @@ where
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Mark worker as available again.
|
|
|
|
|
self.available_workers.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
self.availability.mark_idle(self.worker_id);
|
|
|
|
|
|
|
|
|
|
idle_start = Instant::now();
|
|
|
|
|
}
|
|
|
|
|
|