perf: add dedicated prewarming rayon pool (#22108)

DaniPopes, 2026-02-16 04:05:36 +01:00 (committed by GitHub)
parent 0142769191
commit 0dd47af250
7 changed files with 126 additions and 187 deletions
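In essence: prewarm execution moves off ad-hoc `spawn_blocking` workers capped by `prewarm_max_concurrency` and onto a dedicated, separately sized rayon thread pool, so prewarming no longer competes with the global pool. A minimal standalone sketch of that pattern using rayon's public API (the sizing and names are illustrative, not the reth wiring):

use rayon::prelude::*;

fn main() -> Result<(), rayon::ThreadPoolBuildError> {
    // A dedicated pool, sized independently of rayon's global pool.
    let prewarming_pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .thread_name(|i| format!("prewarm-{i:02}"))
        .build()?;

    // `install` runs parallel iterators on the dedicated pool, not the global one.
    let sum: u32 = prewarming_pool.install(|| (0..1024u32).into_par_iter().sum());
    assert_eq!(sum, 523_776);

    // Fire-and-forget work can also be pushed onto the pool.
    prewarming_pool.spawn(|| println!("prewarm worker running"));
    Ok(())
}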

View File

@@ -42,18 +42,6 @@ pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2: usize = DEFAULT_MULTIPROOF_TASK
/// This will be deducted from the thread count of the main reth global threadpool.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 1;
/// Returns the default maximum concurrency for the prewarm task based on available parallelism.
fn default_prewarm_max_concurrency() -> usize {
#[cfg(feature = "std")]
{
std::thread::available_parallelism().map_or(16, |n| n.get())
}
#[cfg(not(feature = "std"))]
{
16
}
}
/// Default depth for sparse trie pruning.
///
/// Nodes at this depth and below are converted to hash stubs to reduce memory.
@@ -161,8 +149,6 @@ pub struct TreeConfig {
/// where immediate payload regeneration is desired despite the head not changing or moving to
/// an ancestor.
always_process_payload_attributes_on_canonical_head: bool,
/// Maximum concurrency for the prewarm task.
prewarm_max_concurrency: usize,
/// Whether to unwind canonical header to ancestor during forkchoice updates.
allow_unwind_canonical_header: bool,
/// Number of storage proof worker threads.
@@ -209,7 +195,6 @@ impl Default for TreeConfig {
precompile_cache_disabled: false,
state_root_fallback: false,
always_process_payload_attributes_on_canonical_head: false,
prewarm_max_concurrency: default_prewarm_max_concurrency(),
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
account_worker_count: default_account_worker_count(),
@@ -246,7 +231,6 @@ impl TreeConfig {
precompile_cache_disabled: bool,
state_root_fallback: bool,
always_process_payload_attributes_on_canonical_head: bool,
prewarm_max_concurrency: usize,
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
account_worker_count: usize,
@@ -275,7 +259,6 @@ impl TreeConfig {
precompile_cache_disabled,
state_root_fallback,
always_process_payload_attributes_on_canonical_head,
prewarm_max_concurrency,
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
@@ -533,17 +516,6 @@ impl TreeConfig {
self.has_enough_parallelism && !self.legacy_state_root
}
/// Setter for prewarm max concurrency.
pub const fn with_prewarm_max_concurrency(mut self, prewarm_max_concurrency: usize) -> Self {
self.prewarm_max_concurrency = prewarm_max_concurrency;
self
}
/// Return the prewarm max concurrency.
pub const fn prewarm_max_concurrency(&self) -> usize {
self.prewarm_max_concurrency
}
/// Return the number of storage proof worker threads.
pub const fn storage_worker_count(&self) -> usize {
self.storage_worker_count

View File

@@ -132,8 +132,6 @@ where
/// re-use allocated memory. Stored with the block hash it was computed for to enable trie
/// preservation across sequential payload validations.
sparse_state_trie: SharedPreservedSparseTrie,
/// Maximum concurrency for the prewarm task.
prewarm_max_concurrency: usize,
/// Sparse trie prune depth.
sparse_trie_prune_depth: usize,
/// Maximum storage tries to retain after pruning.
@@ -172,7 +170,6 @@ where
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: SharedPreservedSparseTrie::default(),
prewarm_max_concurrency: config.prewarm_max_concurrency(),
sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
@@ -467,7 +464,6 @@ where
self.execution_cache.clone(),
prewarm_ctx,
to_multi_proof,
self.prewarm_max_concurrency,
);
{

View File

@@ -14,7 +14,7 @@
use crate::tree::{
cached_state::{CachedStateProvider, SavedCache},
payload_processor::{
bal::{self, total_slots, BALSlotIter},
bal,
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
PayloadExecutionCache,
},
@@ -25,9 +25,10 @@ use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eips::eip4895::Withdrawal;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
use alloy_primitives::{keccak256, map::B256Set, StorageKey, B256};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use metrics::{Counter, Gauge, Histogram};
use rayon::prelude::*;
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
@@ -39,7 +40,6 @@ use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_tasks::Runtime;
use reth_trie::MultiProofTargets;
use std::{
ops::Range,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, channel, Receiver, Sender, SyncSender},
@@ -86,8 +86,6 @@ where
execution_cache: PayloadExecutionCache,
/// Context provided to execution tasks
ctx: PrewarmContext<N, P, Evm>,
/// How many transactions should be executed in parallel
max_concurrency: usize,
/// Sender to emit evm state outcome messages, if any.
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
/// Receiver for events produced by tx execution
@@ -108,13 +106,12 @@ where
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
max_concurrency: usize,
) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
let (actions_tx, actions_rx) = channel();
trace!(
target: "engine::tree::payload_processor::prewarm",
max_concurrency,
prewarming_threads = executor.prewarming_pool().current_num_threads(),
transaction_count = ctx.env.transaction_count,
"Initialized prewarm task"
);
@@ -124,7 +121,6 @@ where
executor,
execution_cache,
ctx,
max_concurrency,
to_multi_proof,
actions_rx,
parent_span: Span::current(),
@@ -148,20 +144,18 @@ where
     {
         let executor = self.executor.clone();
         let ctx = self.ctx.clone();
-        let max_concurrency = self.max_concurrency;
         let span = Span::current();
         self.executor.spawn_blocking(move || {
             let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
-            // When transaction_count is 0, it means the count is unknown. In this case, spawn
-            // max workers to handle potentially many transactions in parallel rather
-            // than bottlenecking on a single worker.
-            let transaction_count = ctx.env.transaction_count;
-            let workers_needed = if transaction_count == 0 {
-                max_concurrency
-            } else {
-                transaction_count.min(max_concurrency)
-            };
+            let pool_threads = executor.prewarming_pool().current_num_threads();
+            // Don't spawn more workers than transactions. When transaction_count is 0
+            // (unknown), use all pool threads.
+            let workers_needed = if ctx.env.transaction_count > 0 {
+                ctx.env.transaction_count.min(pool_threads)
+            } else {
+                pool_threads
+            };
             let (done_tx, done_rx) = mpsc::sync_channel(workers_needed);
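The sizing rule above, pulled out as a standalone function for a quick sanity check (assumed numbers):

fn workers_needed(transaction_count: usize, pool_threads: usize) -> usize {
    if transaction_count > 0 {
        // Never spawn more workers than there are transactions.
        transaction_count.min(pool_threads)
    } else {
        // Count unknown: use every pool thread.
        pool_threads
    }
}

fn main() {
    assert_eq!(workers_needed(3, 8), 3); // few txs: avoid idle workers
    assert_eq!(workers_needed(500, 8), 8); // many txs: capped by the pool
    assert_eq!(workers_needed(0, 8), 8); // unknown count: full pool
}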
@@ -274,10 +268,8 @@ where
}
}
/// Runs BAL-based prewarming by spawning workers to prefetch storage slots.
///
/// Divides the total slots across `max_concurrency` workers, each responsible for
/// prefetching a range of slots from the BAL.
/// Runs BAL-based prewarming by using the prewarming pool's parallel iterator to prefetch
/// accounts and storage slots.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn run_bal_prewarm(
&self,
@@ -296,59 +288,35 @@ where
             return;
         }
-        let total_slots = total_slots(&bal);
-        trace!(
-            target: "engine::tree::payload_processor::prewarm",
-            total_slots,
-            max_concurrency = self.max_concurrency,
-            "Starting BAL prewarm"
-        );
-        if total_slots == 0 {
+        if bal.is_empty() {
             self.send_bal_hashed_state(&bal);
             let _ =
                 actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
             return;
         }
-        // Calculate number of workers needed (at most max_concurrency)
-        let workers_needed = total_slots.min(self.max_concurrency);
-        let (done_tx, done_rx) = mpsc::sync_channel(workers_needed);
-        // Calculate slots per worker
-        let slots_per_worker = total_slots / workers_needed;
-        let remainder = total_slots % workers_needed;
-        // Spawn workers with their assigned ranges
-        for i in 0..workers_needed {
-            let start = i * slots_per_worker + i.min(remainder);
-            let extra = if i < remainder { 1 } else { 0 };
-            let end = start + slots_per_worker + extra;
-            self.ctx.spawn_bal_worker(
-                i,
-                &self.executor,
-                Arc::clone(&bal),
-                start..end,
-                done_tx.clone(),
-            );
-        }
-        // Drop our handle to done_tx so we can detect completion
-        drop(done_tx);
-        // Wait for all workers to complete
-        let mut completed_workers = 0;
-        while done_rx.recv().is_ok() {
-            completed_workers += 1;
-        }
+        trace!(
+            target: "engine::tree::payload_processor::prewarm",
+            accounts = bal.len(),
+            "Starting BAL prewarm"
+        );
+        let ctx = self.ctx.clone();
+        self.executor.prewarming_pool().install(|| {
+            bal.par_iter().for_each_init(
+                || (ctx.clone(), None::<CachedStateProvider<reth_provider::StateProviderBox>>),
+                |(ctx, provider), account| {
+                    if ctx.terminate_execution.load(Ordering::Relaxed) {
+                        return;
+                    }
+                    ctx.prefetch_bal_account(provider, account);
+                },
+            );
+        });
         trace!(
             target: "engine::tree::payload_processor::prewarm",
-            completed_workers,
-            "All BAL prewarm workers completed"
+            "All BAL prewarm accounts completed"
         );
// Convert BAL to HashedPostState and send to multiproof task
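The `for_each_init` call above is the key pattern: rayon invokes the init closure once per worker split, and the `Option` slot lets each thread build its expensive state provider lazily on the first account it sees, then reuse it for later accounts. A self-contained sketch with a toy resource standing in for the provider (names here are illustrative, not reth APIs):

use rayon::prelude::*;

fn main() {
    let items: Vec<u64> = (0..100).collect();
    items.par_iter().for_each_init(
        // Called once per rayon worker split: an empty per-thread slot.
        || None::<Vec<u64>>,
        |slot, item| {
            // Build the expensive resource only when this thread first needs it,
            // then reuse it for every later item the thread processes.
            let resource = slot.get_or_insert_with(|| vec![0u64; 16]);
            let idx = (*item as usize) % resource.len();
            resource[idx] += 1;
        },
    );
}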
@@ -668,115 +636,65 @@ where
         let (tx_sender, tx_receiver) = crossbeam_channel::unbounded();
         // Spawn workers that all pull from the shared receiver
-        let executor = task_executor.clone();
         let span = Span::current();
-        task_executor.spawn_blocking(move || {
-            let _enter = span.entered();
-            for idx in 0..workers_needed {
-                let ctx = self.clone();
-                let to_multi_proof = to_multi_proof.clone();
-                let done_tx = done_tx.clone();
-                let rx = tx_receiver.clone();
-                let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
-                executor.spawn_blocking(move || {
-                    let _enter = span.entered();
-                    ctx.transact_batch(rx, to_multi_proof, done_tx);
-                });
-            }
-        });
+        for idx in 0..workers_needed {
+            let ctx = self.clone();
+            let to_multi_proof = to_multi_proof.clone();
+            let done_tx = done_tx.clone();
+            let rx = tx_receiver.clone();
+            let span = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: &span, "prewarm worker", idx);
+            task_executor.prewarming_pool().spawn(move || {
+                let _enter = span.entered();
+                ctx.transact_batch(rx, to_multi_proof, done_tx);
+            });
+        }
         tx_sender
     }
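The loop above replaces a nested `spawn_blocking` scheme: `workers_needed` rayon workers all pull from one shared crossbeam channel and exit once every sender is dropped. A runnable sketch of that fan-out/shutdown shape with a toy job type (not the reth types):

fn main() -> Result<(), rayon::ThreadPoolBuildError> {
    let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build()?;
    let (tx, rx) = crossbeam_channel::unbounded::<u32>();
    let (done_tx, done_rx) = std::sync::mpsc::sync_channel(4);
    for _ in 0..4 {
        let rx = rx.clone();
        let done_tx = done_tx.clone();
        pool.spawn(move || {
            // `recv` errors once all senders are gone: natural shutdown signal.
            while let Ok(_job) = rx.recv() {
                // stand-in for `transact_batch`
            }
            let _ = done_tx.send(());
        });
    }
    drop(done_tx); // keep only the workers' handles
    for job in 0..16 {
        let _ = tx.send(job);
    }
    drop(tx); // close the channel so workers drain and exit
    while done_rx.recv().is_ok() {} // wait for all workers to finish
    Ok(())
}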
-    /// Spawns a worker task for BAL slot prefetching.
+    /// Prefetches a single account and all its storage slots from the BAL into the cache.
     ///
-    /// The worker iterates over the specified range of slots in the BAL and ensures
-    /// each slot is loaded into the cache by accessing it through the state provider.
-    fn spawn_bal_worker(
+    /// The `provider` is lazily initialized on first call and reused across accounts on the same
+    /// thread.
+    fn prefetch_bal_account(
         &self,
-        idx: usize,
-        executor: &Runtime,
-        bal: Arc<BlockAccessList>,
-        range: Range<usize>,
-        done_tx: SyncSender<()>,
+        provider: &mut Option<CachedStateProvider<reth_provider::StateProviderBox>>,
+        account: &alloy_eip7928::AccountChanges,
     ) {
-        let ctx = self.clone();
-        let span = debug_span!(
-            target: "engine::tree::payload_processor::prewarm",
-            "bal prewarm worker",
-            idx,
-            range_start = range.start,
-            range_end = range.end
-        );
-        executor.spawn_blocking(move || {
-            let _enter = span.entered();
-            ctx.prefetch_bal_slots(bal, range, done_tx);
-        });
-    }
-    /// Prefetches storage slots from a BAL range into the cache.
-    ///
-    /// This iterates through the specified range of slots and accesses them via the state
-    /// provider to populate the cache.
-    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
-    fn prefetch_bal_slots(
-        self,
-        bal: Arc<BlockAccessList>,
-        range: Range<usize>,
-        done_tx: SyncSender<()>,
-    ) {
-        let Self { saved_cache, provider, metrics, .. } = self;
-        // Build state provider
-        let state_provider = match provider.build() {
-            Ok(provider) => provider,
-            Err(err) => {
-                trace!(
-                    target: "engine::tree::payload_processor::prewarm",
-                    %err,
-                    "Failed to build state provider in BAL prewarm thread"
-                );
-                let _ = done_tx.send(());
-                return;
-            }
-        };
-        // Wrap with cache (guaranteed to be Some since run_bal_prewarm checks)
-        let saved_cache = saved_cache.expect("BAL prewarm should only run with cache");
-        let caches = saved_cache.cache().clone();
-        let cache_metrics = saved_cache.metrics().clone();
-        let state_provider = CachedStateProvider::new(state_provider, caches, cache_metrics);
+        let state_provider = match provider {
+            Some(p) => p,
+            slot @ None => {
+                let built = match self.provider.build() {
+                    Ok(p) => p,
+                    Err(err) => {
+                        trace!(
+                            target: "engine::tree::payload_processor::prewarm",
+                            %err,
+                            "Failed to build state provider in BAL prewarm thread"
+                        );
+                        return;
+                    }
+                };
+                let saved_cache =
+                    self.saved_cache.as_ref().expect("BAL prewarm should only run with cache");
+                let caches = saved_cache.cache().clone();
+                let cache_metrics = saved_cache.metrics().clone();
+                slot.insert(CachedStateProvider::new(built, caches, cache_metrics))
+            }
+        };
         let start = Instant::now();
-        // Track last seen address to avoid fetching the same account multiple times.
-        let mut last_address = None;
-        // Iterate through the assigned range of slots
-        for (address, slot) in BALSlotIter::new(&bal, range.clone()) {
-            // Fetch the account if this is a different address than the last one
-            if last_address != Some(address) {
-                let _ = state_provider.basic_account(&address);
-                last_address = Some(address);
-            }
-            // Access the slot to populate the cache
-            let _ = state_provider.storage(address, slot);
-        }
-        let elapsed = start.elapsed();
-        trace!(
-            target: "engine::tree::payload_processor::prewarm",
-            ?range,
-            elapsed_ms = elapsed.as_millis(),
-            "BAL prewarm worker completed"
-        );
-        // Signal completion
-        let _ = done_tx.send(());
-        metrics.bal_slot_iteration_duration.record(elapsed.as_secs_f64());
+        let _ = state_provider.basic_account(&account.address);
+        for slot in &account.storage_changes {
+            let _ = state_provider.storage(account.address, StorageKey::from(slot.slot));
+        }
+        for &slot in &account.storage_reads {
+            let _ = state_provider.storage(account.address, StorageKey::from(slot));
+        }
+        self.metrics.bal_slot_iteration_duration.record(start.elapsed().as_secs_f64());
     }
}
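The `slot @ None` arm above is the borrow-checker-friendly get-or-init idiom: binding the `&mut Option` in the `None` arm lets `Option::insert` store a freshly built value and return a `&mut T` alongside the `Some` arm's borrow. A minimal sketch of just that trick (toy type, not the reth provider):

struct Provider(u32);

fn get_or_build(slot: &mut Option<Provider>) -> &mut Provider {
    match slot {
        // An existing provider is reused as-is.
        Some(p) => p,
        // `slot @ None` keeps the `&mut Option` binding alive so the freshly
        // built value can be stored and handed back as `&mut Provider`.
        slot @ None => slot.insert(Provider(42)),
    }
}

fn main() {
    let mut slot = None;
    assert_eq!(get_or_build(&mut slot).0, 42); // built on first use
    get_or_build(&mut slot).0 = 7;
    assert_eq!(get_or_build(&mut slot).0, 7); // reused, not rebuilt
}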

View File

@@ -158,6 +158,7 @@ where
reserved_cpu_cores: command.engine.reserved_cpu_cores,
proof_storage_worker_threads: command.engine.storage_worker_count,
proof_account_worker_threads: command.engine.account_worker_count,
prewarming_threads: command.engine.prewarming_threads,
..Default::default()
};
let runner = CliRunner::try_with_runtime_config(

View File

@@ -38,6 +38,7 @@ pub struct DefaultEngineValues {
allow_unwind_canonical_header: bool,
storage_worker_count: Option<usize>,
account_worker_count: Option<usize>,
prewarming_threads: Option<usize>,
disable_proof_v2: bool,
cache_metrics_disabled: bool,
disable_trie_cache: bool,
@@ -169,6 +170,12 @@ impl DefaultEngineValues {
self
}
/// Set the default prewarming thread count
pub const fn with_prewarming_threads(mut self, v: Option<usize>) -> Self {
self.prewarming_threads = v;
self
}
/// Set whether to disable proof V2 by default
pub const fn with_disable_proof_v2(mut self, v: bool) -> Self {
self.disable_proof_v2 = v;
@@ -233,6 +240,7 @@ impl Default for DefaultEngineValues {
allow_unwind_canonical_header: false,
storage_worker_count: None,
account_worker_count: None,
prewarming_threads: None,
disable_proof_v2: false,
cache_metrics_disabled: false,
disable_trie_cache: false,
@@ -360,6 +368,11 @@ pub struct EngineArgs {
#[arg(long = "engine.account-worker-count", default_value = Resettable::from(DefaultEngineValues::get_global().account_worker_count.map(|v| v.to_string().into())))]
pub account_worker_count: Option<usize>,
/// Configure the number of prewarming threads.
/// If not specified, defaults to available parallelism.
#[arg(long = "engine.prewarming-threads", default_value = Resettable::from(DefaultEngineValues::get_global().prewarming_threads.map(|v| v.to_string().into())))]
pub prewarming_threads: Option<usize>,
/// Disable V2 storage proofs for state root calculations
#[arg(long = "engine.disable-proof-v2", default_value_t = DefaultEngineValues::get_global().disable_proof_v2)]
pub disable_proof_v2: bool,
@@ -424,6 +437,7 @@ impl Default for EngineArgs {
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
prewarming_threads,
disable_proof_v2,
cache_metrics_disabled,
disable_trie_cache,
@@ -455,6 +469,7 @@ impl Default for EngineArgs {
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
prewarming_threads,
disable_proof_v2,
cache_metrics_disabled,
disable_trie_cache,
@@ -546,6 +561,7 @@ mod tests {
allow_unwind_canonical_header: true,
storage_worker_count: Some(16),
account_worker_count: Some(8),
prewarming_threads: Some(4),
disable_proof_v2: false,
cache_metrics_disabled: true,
disable_trie_cache: true,
@@ -582,6 +598,8 @@ mod tests {
"16",
"--engine.account-worker-count",
"8",
"--engine.prewarming-threads",
"4",
"--engine.disable-cache-metrics",
"--engine.disable-trie-cache",
"--engine.sparse-trie-prune-depth",

View File

@@ -110,6 +110,9 @@ pub struct RayonConfig {
/// Number of threads for the proof account worker pool (trie account proof workers).
/// If `None`, derived from available parallelism.
pub proof_account_worker_threads: Option<usize>,
/// Number of threads for the prewarming pool (execution prewarming workers).
/// If `None`, derived from available parallelism.
pub prewarming_threads: Option<usize>,
}
#[cfg(feature = "rayon")]
@@ -123,6 +126,7 @@ impl Default for RayonConfig {
max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS,
proof_storage_worker_threads: None,
proof_account_worker_threads: None,
prewarming_threads: None,
}
}
}
@@ -171,6 +175,12 @@ impl RayonConfig {
self
}
/// Set the number of threads for the prewarming pool.
pub const fn with_prewarming_threads(mut self, prewarming_threads: usize) -> Self {
self.prewarming_threads = Some(prewarming_threads);
self
}
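For the `Option`-valued knobs above, the builder resolves an explicit override first and derives the count from available parallelism otherwise. A standalone sketch of that resolution (the `16` fallback mirrors the removed `default_prewarm_max_concurrency`; the actual builder's fallback constant may differ):

fn prewarming_thread_count(override_threads: Option<usize>) -> usize {
    override_threads.unwrap_or_else(|| {
        // Fall back to available parallelism, or 16 if it cannot be queried.
        std::thread::available_parallelism().map_or(16, |n| n.get())
    })
}

fn main() {
    assert_eq!(prewarming_thread_count(Some(8)), 8); // explicit override wins
    println!("derived default: {}", prewarming_thread_count(None));
}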
/// Compute the default number of threads based on available parallelism.
fn default_thread_count(&self) -> usize {
self.cpu_threads.unwrap_or_else(|| {
@@ -260,6 +270,9 @@ struct RuntimeInner {
/// Proof account worker pool (trie account proof computation).
#[cfg(feature = "rayon")]
proof_account_worker_pool: rayon::ThreadPool,
/// Prewarming pool (execution prewarming workers).
#[cfg(feature = "rayon")]
prewarming_pool: rayon::ThreadPool,
/// Handle to the spawned [`TaskManager`] background task.
/// The task monitors critical tasks for panics and fires the shutdown signal.
/// Can be taken via [`Runtime::take_task_manager_handle`] to poll for panic errors.
@@ -355,6 +368,12 @@ impl Runtime {
pub fn proof_account_worker_pool(&self) -> &rayon::ThreadPool {
&self.0.proof_account_worker_pool
}
/// Get the prewarming pool.
#[cfg(feature = "rayon")]
pub fn prewarming_pool(&self) -> &rayon::ThreadPool {
&self.0.prewarming_pool
}
}
// ── Test helpers ──────────────────────────────────────────────────────
@@ -394,6 +413,7 @@ impl Runtime {
max_blocking_tasks: 16,
proof_storage_worker_threads: Some(2),
proof_account_worker_threads: Some(2),
prewarming_threads: Some(2),
},
}
}
@@ -824,6 +844,7 @@ impl RuntimeBuilder {
blocking_guard,
proof_storage_worker_pool,
proof_account_worker_pool,
prewarming_pool,
) = {
let default_threads = config.rayon.default_thread_count();
let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads);
@@ -862,12 +883,19 @@ impl RuntimeBuilder {
.thread_name(|i| format!("proof-acct-{i:02}"))
.build()?;
let prewarming_threads = config.rayon.prewarming_threads.unwrap_or(default_threads);
let prewarming_pool = rayon::ThreadPoolBuilder::new()
.num_threads(prewarming_threads)
.thread_name(|i| format!("prewarm-{i:02}"))
.build()?;
debug!(
default_threads,
rpc_threads,
storage_threads,
proof_storage_worker_threads,
proof_account_worker_threads,
prewarming_threads,
max_blocking_tasks = config.rayon.max_blocking_tasks,
"Initialized rayon thread pools"
);
@@ -879,6 +907,7 @@ impl RuntimeBuilder {
blocking_guard,
proof_storage_worker_pool,
proof_account_worker_pool,
prewarming_pool,
)
};
@@ -909,6 +938,8 @@ impl RuntimeBuilder {
proof_storage_worker_pool,
#[cfg(feature = "rayon")]
proof_account_worker_pool,
#[cfg(feature = "rayon")]
prewarming_pool,
task_manager_handle: Mutex::new(Some(task_manager_handle)),
};

View File

@@ -981,6 +981,9 @@ Engine:
--engine.account-worker-count <ACCOUNT_WORKER_COUNT>
Configure the number of account proof workers in the Tokio blocking pool. If not specified, defaults to the same count as storage workers
--engine.prewarming-threads <PREWARMING_THREADS>
Configure the number of prewarming threads. If not specified, defaults to available parallelism
--engine.disable-proof-v2
Disable V2 storage proofs for state root calculations