mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
perf(engine): reduce proof worker count for small blocks (#22074)
Co-authored-by: Amp <amp@ampcode.com> Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net> Co-authored-by: YK <chiayongkang@hotmail.com>
This commit is contained in:
committed by
GitHub
parent
d3c42fc718
commit
e4ec836a46
@@ -97,6 +97,7 @@ pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
|
||||
/// Blocks with fewer transactions than this skip prewarming, since the fixed overhead of spawning
|
||||
/// prewarm workers exceeds the execution time saved.
|
||||
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
|
||||
|
||||
/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
|
||||
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
|
||||
WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
|
||||
@@ -251,8 +252,9 @@ where
|
||||
// Extract V2 proofs flag early so we can pass it to prewarm
|
||||
let v2_proofs_enabled = !config.disable_proof_v2();
|
||||
|
||||
// Capture parent_state_root before env is moved into spawn_caching_with
|
||||
// Capture fields before env is moved into spawn_caching_with
|
||||
let parent_state_root = env.parent_state_root;
|
||||
let transaction_count = env.transaction_count;
|
||||
|
||||
// Handle BAL-based optimization if available
|
||||
let prewarm_handle = if let Some(bal) = bal {
|
||||
@@ -281,9 +283,11 @@ where
|
||||
)
|
||||
};
|
||||
|
||||
// Create and spawn the storage proof task
|
||||
// Create and spawn the storage proof task.
|
||||
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
|
||||
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, v2_proofs_enabled);
|
||||
let halve_workers = transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
|
||||
let proof_handle =
|
||||
ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers, v2_proofs_enabled);
|
||||
|
||||
if config.disable_trie_cache() {
|
||||
let multi_proof_task = MultiProofTask::new(
|
||||
@@ -363,6 +367,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Transaction count threshold below which proof workers are halved, since fewer transactions
|
||||
/// produce fewer state changes and most workers would be idle overhead.
|
||||
const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;
|
||||
|
||||
/// Transaction count threshold below which sequential signature recovery is used.
|
||||
///
|
||||
/// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
|
||||
|
||||
@@ -1582,7 +1582,7 @@ mod tests {
|
||||
let changeset_cache = ChangesetCache::new();
|
||||
let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
|
||||
let task_ctx = ProofTaskCtx::new(overlay_factory);
|
||||
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false);
|
||||
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false, false);
|
||||
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
|
||||
let (tx, rx) = crossbeam_channel::unbounded();
|
||||
|
||||
|
||||
@@ -325,7 +325,7 @@ mod tests {
|
||||
reth_provider::providers::OverlayStateProviderFactory::new(factory, changeset_cache);
|
||||
let task_ctx = ProofTaskCtx::new(factory);
|
||||
let runtime = reth_tasks::Runtime::test();
|
||||
let proof_worker_handle = ProofWorkerHandle::new(&runtime, task_ctx, false);
|
||||
let proof_worker_handle = ProofWorkerHandle::new(&runtime, task_ctx, false, false);
|
||||
|
||||
let parallel_result = ParallelProof::new(Default::default(), proof_worker_handle.clone())
|
||||
.decoded_multiproof(targets.clone())
|
||||
|
||||
@@ -134,10 +134,12 @@ impl ProofWorkerHandle {
|
||||
/// # Parameters
|
||||
/// - `runtime`: The centralized runtime used to spawn blocking worker tasks
|
||||
/// - `task_ctx`: Shared context with database view and prefix sets
|
||||
/// - `halve_workers`: Whether to halve the worker pool size (for small blocks)
|
||||
/// - `v2_proofs_enabled`: Whether to enable V2 storage proofs
|
||||
pub fn new<Factory>(
|
||||
runtime: &Runtime,
|
||||
task_ctx: ProofTaskCtx<Factory>,
|
||||
halve_workers: bool,
|
||||
v2_proofs_enabled: bool,
|
||||
) -> Self
|
||||
where
|
||||
@@ -154,13 +156,17 @@ impl ProofWorkerHandle {
|
||||
|
||||
let cached_storage_roots = Arc::<DashMap<_, _>>::default();
|
||||
|
||||
let storage_worker_count = runtime.proof_storage_worker_pool().current_num_threads();
|
||||
let account_worker_count = runtime.proof_account_worker_pool().current_num_threads();
|
||||
let divisor = if halve_workers { 2 } else { 1 };
|
||||
let storage_worker_count =
|
||||
runtime.proof_storage_worker_pool().current_num_threads() / divisor;
|
||||
let account_worker_count =
|
||||
runtime.proof_account_worker_pool().current_num_threads() / divisor;
|
||||
|
||||
debug!(
|
||||
target: "trie::proof_task",
|
||||
storage_worker_count,
|
||||
account_worker_count,
|
||||
halve_workers,
|
||||
?v2_proofs_enabled,
|
||||
"Spawning proof worker pools"
|
||||
);
|
||||
@@ -2012,7 +2018,7 @@ mod tests {
|
||||
let ctx = test_ctx(factory);
|
||||
|
||||
let runtime = reth_tasks::Runtime::test();
|
||||
let proof_handle = ProofWorkerHandle::new(&runtime, ctx, false);
|
||||
let proof_handle = ProofWorkerHandle::new(&runtime, ctx, false, false);
|
||||
|
||||
// Verify handle can be cloned
|
||||
let _cloned_handle = proof_handle.clone();
|
||||
|
||||
Reference in New Issue
Block a user