refactor(engine): remove storage and account proof worker configurations

- Eliminated constants and functions related to storage and account proof worker counts to simplify configuration.
- Updated `TreeConfig` and related functions to remove references to proof worker counts, streamlining the codebase.
- Adjusted proof task handling to utilize half of the queue capacity for storage proofs, enhancing resource management.
This commit is contained in:
Yong Kang
2025-10-07 05:27:09 +00:00
parent 072a4fd2ca
commit 3f0bb47a0e
5 changed files with 2 additions and 113 deletions

View File

@@ -17,21 +17,6 @@ pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 10;
/// This will be deducted from the thread count of main reth global threadpool.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 1;
/// Upper limit for storage proof workers to prevent excessive memory usage.
const MAX_STORAGE_PROOF_WORKERS: usize = 12;
/// Upper limit for account proof workers to prevent excessive memory usage.
const MAX_ACCOUNT_PROOF_WORKERS: usize = 4;
/// Lower limit for storage proof workers to maintain pipeline progress.
const MIN_STORAGE_PROOF_WORKERS: usize = 2;
/// Lower limit for account proof workers to keep the pool active.
const MIN_ACCOUNT_PROOF_WORKERS: usize = 1;
/// Default `(storage, account)` worker counts used when CPU detection fails.
const FALLBACK_PROOF_WORKERS: (usize, usize) = (6, 2);
/// Default maximum concurrency for prewarm task.
pub const DEFAULT_PREWARM_MAX_CONCURRENCY: usize = 16;
@@ -40,61 +25,6 @@ const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
const DEFAULT_CROSS_BLOCK_CACHE_SIZE: u64 = 4 * 1024 * 1024 * 1024;
/// Calculates optimal `(storage, account)` proof worker counts based on available CPU cores.
///
/// Each worker runs in a dedicated blocking thread via `tokio::spawn_blocking`. The allocation
/// strategy:
/// 1. Query available logical CPUs via `available_parallelism`
/// 2. Reserve `reserved_cpu_cores` for other tasks (e.g., networking, RPC)
/// 3. Allocate remaining cores with a 3:1 ratio favoring storage proofs (which are more intensive)
/// 4. Clamp results to `[MIN, MAX]` bounds to handle both low-core and high-core systems
///
/// Returns `(6, 2)` if CPU detection fails.
pub fn default_proof_workers(reserved_cpu_cores: usize) -> (usize, usize) {
#[cfg(feature = "std")]
{
if let Ok(parallelism) = std::thread::available_parallelism() {
return split_proof_worker_budget(parallelism.get(), reserved_cpu_cores);
}
}
FALLBACK_PROOF_WORKERS
}
#[cfg(feature = "std")]
/// Splits the usable CPU budget into storage/account threads while applying the fixed 3:1 ratio
/// and clamping to the configured min/max bounds.
fn split_proof_worker_budget(parallelism: usize, reserved_cpu_cores: usize) -> (usize, usize) {
const STORAGE_WEIGHT: usize = 3;
const ACCOUNT_WEIGHT: usize = 1;
const TOTAL_WEIGHT: usize = STORAGE_WEIGHT + ACCOUNT_WEIGHT;
let usable = parallelism.saturating_sub(reserved_cpu_cores);
let budget = usable.max(MIN_STORAGE_PROOF_WORKERS + MIN_ACCOUNT_PROOF_WORKERS);
let mut storage = (budget * STORAGE_WEIGHT) / TOTAL_WEIGHT;
let mut account = budget.saturating_sub(storage);
storage = storage.clamp(MIN_STORAGE_PROOF_WORKERS, MAX_STORAGE_PROOF_WORKERS);
account = account.clamp(MIN_ACCOUNT_PROOF_WORKERS, MAX_ACCOUNT_PROOF_WORKERS);
(storage, account)
}
/// Default number of storage proof workers, derived from available parallelism.
///
/// Uses [`default_proof_workers`] with [`DEFAULT_RESERVED_CPU_CORES`].
pub fn default_storage_proof_workers() -> usize {
default_proof_workers(DEFAULT_RESERVED_CPU_CORES).0
}
/// Default number of account proof workers, derived from available parallelism.
///
/// Uses [`default_proof_workers`] with [`DEFAULT_RESERVED_CPU_CORES`].
pub fn default_account_proof_workers() -> usize {
default_proof_workers(DEFAULT_RESERVED_CPU_CORES).1
}
/// Determines if the host has enough parallelism to run the payload processor.
///
/// It requires at least 5 parallel threads:
@@ -151,10 +81,6 @@ pub struct TreeConfig {
has_enough_parallelism: bool,
/// Maximum number of concurrent proof tasks
max_proof_task_concurrency: u64,
/// Number of workers dedicated to storage proof execution
storage_proof_workers: usize,
/// Number of workers dedicated to account proof execution
account_proof_workers: usize,
/// Whether multiproof task should chunk proof targets.
multiproof_chunking_enabled: bool,
/// Multiproof task chunk size for proof targets.
@@ -201,8 +127,6 @@ impl Default for TreeConfig {
cross_block_cache_size: DEFAULT_CROSS_BLOCK_CACHE_SIZE,
has_enough_parallelism: has_enough_parallelism(),
max_proof_task_concurrency: DEFAULT_MAX_PROOF_TASK_CONCURRENCY,
storage_proof_workers: default_storage_proof_workers(),
account_proof_workers: default_account_proof_workers(),
multiproof_chunking_enabled: true,
multiproof_chunk_size: DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE,
reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
@@ -232,8 +156,6 @@ impl TreeConfig {
cross_block_cache_size: u64,
has_enough_parallelism: bool,
max_proof_task_concurrency: u64,
storage_proof_workers: usize,
account_proof_workers: usize,
multiproof_chunking_enabled: bool,
multiproof_chunk_size: usize,
reserved_cpu_cores: usize,
@@ -257,8 +179,6 @@ impl TreeConfig {
cross_block_cache_size,
has_enough_parallelism,
max_proof_task_concurrency,
storage_proof_workers,
account_proof_workers,
multiproof_chunking_enabled,
multiproof_chunk_size,
reserved_cpu_cores,
@@ -300,16 +220,6 @@ impl TreeConfig {
self.max_proof_task_concurrency
}
/// Return the number of storage proof workers.
pub const fn storage_proof_workers(&self) -> usize {
self.storage_proof_workers
}
/// Return the number of account proof workers.
pub const fn account_proof_workers(&self) -> usize {
self.account_proof_workers
}
/// Return whether the multiproof task chunking is enabled.
pub const fn multiproof_chunking_enabled(&self) -> bool {
self.multiproof_chunking_enabled
@@ -488,18 +398,6 @@ impl TreeConfig {
self
}
/// Setter for number of storage proof workers.
pub const fn with_storage_proof_workers(mut self, storage_proof_workers: usize) -> Self {
self.storage_proof_workers = storage_proof_workers;
self
}
/// Setter for number of account proof workers.
pub const fn with_account_proof_workers(mut self, account_proof_workers: usize) -> Self {
self.account_proof_workers = account_proof_workers;
self
}
/// Setter for whether multiproof task should chunk proof targets.
pub const fn with_multiproof_chunking_enabled(
mut self,
@@ -518,9 +416,6 @@ impl TreeConfig {
/// Setter for the number of reserved CPU cores for any non-reth processes
pub fn with_reserved_cpu_cores(mut self, reserved_cpu_cores: usize) -> Self {
self.reserved_cpu_cores = reserved_cpu_cores;
let (storage_workers, account_workers) = default_proof_workers(reserved_cpu_cores);
self.storage_proof_workers = storage_workers;
self.account_proof_workers = account_workers;
self
}

View File

@@ -196,13 +196,11 @@ where
state_root_config.prefix_sets.clone(),
);
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
let storage_worker_count = config.storage_proof_workers();
let proof_task_handle = new_proof_task_handle(
self.executor.handle().clone(),
state_root_config.consistent_view.clone(),
task_ctx,
max_proof_task_concurrency,
storage_worker_count,
)?;
// We set it to half of the proof task concurrency, because often for each multiproof we

View File

@@ -1236,7 +1236,6 @@ mod tests {
config.consistent_view.clone(),
task_ctx,
1, // max_concurrency for test
1, // storage worker count for test
)
.expect("Failed to create proof task handle for multiproof test");
let channel = channel();

View File

@@ -461,7 +461,6 @@ mod tests {
consistent_view.clone(),
task_ctx,
1, // max_concurrency for test
1, // storage worker count for test
)
.expect("Failed to create proof task");

View File

@@ -55,13 +55,12 @@ pub fn new_proof_task_handle<Factory>(
view: ConsistentDbView<Factory>,
task_ctx: ProofTaskCtx,
max_concurrency: usize,
storage_worker_count: usize,
) -> ProviderResult<ProofTaskManagerHandle<<Factory::Provider as DBProvider>::Tx>>
where
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + Send + Sync + 'static,
{
let queue_capacity = max_concurrency.max(1);
let worker_count = storage_worker_count.max(1).min(queue_capacity);
let worker_count = queue_capacity / 2; // Right now we are only using half of the queue capacity for storage proofs. TODO: Update this when we have account proof workers.
let (task_sender, task_receiver) = bounded(queue_capacity);
@@ -618,8 +617,7 @@ mod tests {
runtime.handle().clone(),
consistent_view,
task_ctx,
4, // queue capacity
2, // storage worker count
2, // queue capacity (and worker count)
)
.expect("failed to create proof task handle");