From 33ac869a85a4ecb10be0dee526e901b04fa9477e Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Sun, 15 Feb 2026 18:06:10 -0800 Subject: [PATCH] perf(engine): replace channel+BTreeMap reorder with lock-free for_each_ordered (#22144) Co-authored-by: Amp Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com> --- Cargo.lock | 1 + Cargo.toml | 1 + crates/engine/tree/Cargo.toml | 2 +- .../tree/src/tree/payload_processor/mod.rs | 61 ++-- crates/tasks/Cargo.toml | 3 +- crates/tasks/src/for_each_ordered.rs | 298 ++++++++++++++++++ crates/tasks/src/lib.rs | 6 + 7 files changed, 329 insertions(+), 43 deletions(-) create mode 100644 crates/tasks/src/for_each_ordered.rs diff --git a/Cargo.lock b/Cargo.lock index 83767fc787..4a95e046c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10350,6 +10350,7 @@ name = "reth-tasks" version = "1.11.0" dependencies = [ "auto_impl", + "crossbeam-utils", "dyn-clone", "futures-util", "metrics", diff --git a/Cargo.toml b/Cargo.toml index 77d1e3ebea..0b4060d563 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -663,6 +663,7 @@ cipher = "0.4.3" comfy-table = "7.0" concat-kdf = "0.1.0" crossbeam-channel = "0.5.13" +crossbeam-utils = "0.8" crossterm = "0.29.0" csv = "1.3.0" ctrlc = "3.4" diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 2edd073544..2445c60ed5 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -29,7 +29,7 @@ reth-provider.workspace = true reth-prune.workspace = true reth-revm = { workspace = true, features = ["optional-balance-check"] } reth-stages-api.workspace = true -reth-tasks.workspace = true +reth-tasks = { workspace = true, features = ["rayon"] } reth-trie-parallel.workspace = true reth-trie-sparse = { workspace = true, features = ["std", "metrics"] } reth-trie.workspace = true diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index e40c926ae2..b7f4b3d091 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -33,7 +33,7 @@ use reth_provider::{ StateProviderFactory, StateReader, }; use reth_revm::{db::BundleState, state::EvmState}; -use reth_tasks::Runtime; +use reth_tasks::{ForEachOrdered, Runtime}; use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory}; use reth_trie_parallel::{ proof_task::{ProofTaskCtx, ProofWorkerHandle}, @@ -43,7 +43,6 @@ use reth_trie_sparse::{ ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie, }; use std::{ - collections::BTreeMap, ops::Not, sync::{ atomic::AtomicBool, @@ -382,7 +381,9 @@ where /// Spawns a task advancing transaction env iterator and streaming updates through a channel. /// /// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses - /// sequential iteration to avoid rayon overhead. + /// sequential iteration to avoid rayon overhead. For larger blocks, uses rayon parallel + /// iteration with [`ForEachOrdered`] to recover signatures in parallel while streaming + /// results to execution in the original transaction order. 
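+    ///
+    /// The parallel path has the following shape; a minimal sketch, where `txs`,
+    /// `recover`, and `execute` are illustrative stand-ins for the actual iterator,
+    /// conversion, and execution steps, not functions that exist here:
+    ///
+    /// ```ignore
+    /// txs.into_par_iter()
+    ///     .map(recover)                        // runs on rayon, completes out of order
+    ///     .for_each_ordered(|tx| execute(tx)); // runs sequentially, in index order
+    /// ```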
     #[expect(clippy::type_complexity)]
     fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
         &self,
@@ -392,7 +393,6 @@ where
         mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
         mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
     ) {
-        let (ooo_tx, ooo_rx) = mpsc::sync_channel(transaction_count);
         let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
         let (execute_tx, execute_rx) = mpsc::sync_channel(transaction_count);
 
@@ -408,7 +408,7 @@
             );
             self.executor.spawn_blocking(move || {
                 let (transactions, convert) = transactions.into_parts();
-                for (idx, tx) in transactions.into_iter().enumerate() {
+                for tx in transactions {
                     let tx = convert.convert(tx);
                     let tx = tx.map(|tx| {
                         let (tx_env, tx) = tx.into_parts();
@@ -417,53 +417,32 @@ where
                     if let Ok(tx) = &tx {
                         let _ = prewarm_tx.send(tx.clone());
                     }
-                    let _ = ooo_tx.send((idx, tx));
+                    let _ = execute_tx.send(tx);
                 }
             });
         } else {
-            // Parallel path — spawn on rayon for parallel signature recovery.
+            // Parallel path — recover signatures in parallel on rayon, stream results
+            // to execution in order via `for_each_ordered`.
             rayon::spawn(move || {
                 let (transactions, convert) = transactions.into_parts();
-                transactions.into_par_iter().enumerate().for_each_with(
-                    ooo_tx,
-                    |ooo_tx, (idx, tx)| {
+                transactions
+                    .into_par_iter()
+                    .map(|tx| {
                         let tx = convert.convert(tx);
-                        let tx = tx.map(|tx| {
+                        tx.map(|tx| {
                             let (tx_env, tx) = tx.into_parts();
-                            WithTxEnv { tx_env, tx: Arc::new(tx) }
-                        });
-                        // Only send Ok(_) variants to prewarming task.
-                        if let Ok(tx) = &tx {
+                            let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
+                            // Send to prewarming out of order — order doesn't matter there.
                             let _ = prewarm_tx.send(tx.clone());
-                        }
-                        let _ = ooo_tx.send((idx, tx));
-                    },
-                );
+                            tx
+                        })
+                    })
+                    .for_each_ordered(|tx| {
+                        let _ = execute_tx.send(tx);
+                    });
             });
         }
 
-        // Spawn a task that processes out-of-order transactions from the task above and sends them
-        // to the execution task in order.
-        self.executor.spawn_blocking(move || {
-            let mut next_for_execution = 0;
-            let mut queue = BTreeMap::new();
-            while let Ok((idx, tx)) = ooo_rx.recv() {
-                if next_for_execution == idx {
-                    let _ = execute_tx.send(tx);
-                    next_for_execution += 1;
-
-                    while let Some(entry) = queue.first_entry() &&
-                        *entry.key() == next_for_execution
-                    {
-                        let _ = execute_tx.send(entry.remove());
-                        next_for_execution += 1;
-                    }
-                } else {
-                    queue.insert(idx, tx);
-                }
-            }
-        });
-
         (prewarm_rx, execute_rx)
     }
 
diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml
index 8150651c41..78d1e4d7ad 100644
--- a/crates/tasks/Cargo.toml
+++ b/crates/tasks/Cargo.toml
@@ -29,11 +29,12 @@ dyn-clone.workspace = true
 
 # feature `rayon`
 rayon = { workspace = true, optional = true }
+crossbeam-utils = { workspace = true, optional = true }
 pin-project = { workspace = true, optional = true }
 
 [dev-dependencies]
 tokio = { workspace = true, features = ["sync", "rt", "rt-multi-thread", "time", "macros"] }
 
 [features]
-rayon = ["dep:rayon", "pin-project"]
+rayon = ["dep:rayon", "dep:crossbeam-utils", "pin-project"]
 test-utils = []
diff --git a/crates/tasks/src/for_each_ordered.rs b/crates/tasks/src/for_each_ordered.rs
new file mode 100644
index 0000000000..b018a838ee
--- /dev/null
+++ b/crates/tasks/src/for_each_ordered.rs
@@ -0,0 +1,298 @@
+use crossbeam_utils::CachePadded;
+use rayon::iter::{IndexedParallelIterator, ParallelIterator};
+use std::{
+    cell::UnsafeCell,
+    mem::MaybeUninit,
+    sync::atomic::{AtomicBool, Ordering},
+};
+
+/// Extension trait for [`IndexedParallelIterator`]
+/// that streams results to a sequential consumer in index order.
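+///
+/// # Example
+///
+/// A minimal usage sketch; items are computed in parallel but delivered strictly in
+/// index order:
+///
+/// ```ignore
+/// use rayon::prelude::*;
+/// use reth_tasks::ForEachOrdered;
+///
+/// let mut out = Vec::new();
+/// (0..4u64).into_par_iter().map(|x| x * 10).for_each_ordered(|x| out.push(x));
+/// assert_eq!(out, vec![0, 10, 20, 30]);
+/// ```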
+pub trait ForEachOrdered: IndexedParallelIterator {
+    /// Executes the parallel iterator, calling `f` on each result **sequentially in index
+    /// order**.
+    ///
+    /// Items are computed in parallel, but `f` is invoked as `f(item_0)`, `f(item_1)`, …,
+    /// `f(item_{n-1})` on the calling thread. The calling thread receives each item as soon
+    /// as it (and all preceding items) are ready.
+    ///
+    /// `f` does **not** need to be [`Send`] — it runs exclusively on the calling thread.
+    fn for_each_ordered<F>(self, f: F)
+    where
+        Self::Item: Send,
+        F: FnMut(Self::Item);
+}
+
+impl<I: IndexedParallelIterator> ForEachOrdered for I {
+    fn for_each_ordered<F>(self, f: F)
+    where
+        Self::Item: Send,
+        F: FnMut(Self::Item),
+    {
+        ordered_impl(self, f);
+    }
+}
+
+/// A cache-line-padded slot with an atomic ready flag.
+struct Slot<T> {
+    value: UnsafeCell<MaybeUninit<T>>,
+    ready: AtomicBool,
+}
+
+// SAFETY: Each slot is written by exactly one producer and read by exactly one consumer.
+// The AtomicBool synchronizes access (Release on write, Acquire on read).
+unsafe impl<T: Send> Send for Slot<T> {}
+unsafe impl<T: Send> Sync for Slot<T> {}
+
+struct Shared<T> {
+    slots: Box<[CachePadded<Slot<T>>]>,
+    panicked: AtomicBool,
+}
+
+impl<T> Shared<T> {
+    fn new(n: usize) -> Self {
+        Self {
+            // SAFETY: Zero is a valid bit pattern for Slot<T>.
+            // Needs to be zero for `ready` to be false.
+            slots: unsafe {
+                Box::<[_]>::assume_init(Box::<[CachePadded<Slot<T>>]>::new_zeroed_slice(n))
+            },
+            panicked: AtomicBool::new(false),
+        }
+    }
+
+    /// # Safety
+    /// Index `i` must be in bounds and must only be written once.
+    #[inline]
+    unsafe fn write(&self, i: usize, val: T) {
+        let slot = unsafe { self.slots.get_unchecked(i) };
+        unsafe { (*slot.value.get()).write(val) };
+        slot.ready.store(true, Ordering::Release);
+    }
+
+    /// # Safety
+    /// Index `i` must be in bounds. Must only be called after `ready` is observed `true`.
+    #[inline]
+    unsafe fn take(&self, i: usize) -> T {
+        let slot = unsafe { self.slots.get_unchecked(i) };
+        let v = unsafe { (*slot.value.get()).assume_init_read() };
+        // Clear ready so Drop doesn't double-free this slot.
+        slot.ready.store(false, Ordering::Relaxed);
+        v
+    }
+
+    #[inline]
+    fn is_ready(&self, i: usize) -> bool {
+        // SAFETY: caller ensures `i < n`.
+        unsafe { self.slots.get_unchecked(i) }.ready.load(Ordering::Acquire)
+    }
+}
+
+impl<T> Drop for Shared<T> {
+    fn drop(&mut self) {
+        for slot in &mut *self.slots {
+            if *slot.ready.get_mut() {
+                unsafe { (*slot.value.get()).assume_init_drop() };
+            }
+        }
+    }
+}
+
+/// Executes a parallel iterator and delivers results to a sequential callback in index order.
+///
+/// This works by pre-allocating one cache-line-padded slot per item. Each slot holds an
+/// `UnsafeCell<MaybeUninit<T>>` and an `AtomicBool` ready flag. A rayon task computes all
+/// items in parallel, writing each result into its slot and setting the flag (`Release`).
+/// The calling thread walks slots 0, 1, 2, … in order, spinning on the flag (`Acquire`),
+/// then reading the value and passing it to `f`.
+fn ordered_impl<I, F>(iter: I, mut f: F)
+where
+    I: IndexedParallelIterator,
+    I::Item: Send,
+    F: FnMut(I::Item),
+{
+    use std::panic::{catch_unwind, AssertUnwindSafe};
+
+    let n = iter.len();
+    if n == 0 {
+        return;
+    }
+
+    let shared = Shared::<I::Item>::new(n);
+
+    rayon::in_place_scope(|s| {
+        // Producer: compute items in parallel and write them into their slots.
+        s.spawn(|_| {
+            let res = catch_unwind(AssertUnwindSafe(|| {
+                iter.enumerate().for_each(|(i, item)| {
+                    // SAFETY: `enumerate()` on an IndexedParallelIterator yields each
+                    // index exactly once.
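+                    // Each index is also in bounds: `enumerate()` yields `0..n`,
+                    // and the slots slice was allocated with length `n == iter.len()`.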
+                    unsafe { shared.write(i, item) };
+                });
+            }));
+            if let Err(payload) = res {
+                shared.panicked.store(true, Ordering::Release);
+                std::panic::resume_unwind(payload);
+            }
+        });
+
+        // Consumer: sequential, ordered, on the calling thread.
+        // Exponential backoff: 1, 2, 4, …, 64 pause instructions, then OS yields.
+        const SPIN_SHIFT_LIMIT: u32 = 6;
+        for i in 0..n {
+            let mut backoff = 0u32;
+            'wait: loop {
+                if shared.is_ready(i) {
+                    break 'wait;
+                }
+
+                if shared.panicked.load(Ordering::Relaxed) {
+                    return;
+                }
+
+                // Yield to rayon's work-stealing so the producer can make progress,
+                // especially important when the thread pool is small.
+                if rayon::yield_now() == Some(rayon::Yield::Executed) {
+                    continue 'wait;
+                }
+
+                if backoff < SPIN_SHIFT_LIMIT {
+                    for _ in 0..(1u32 << backoff) {
+                        std::hint::spin_loop();
+                    }
+                    backoff += 1;
+                } else {
+                    // Producer is genuinely slow; fall back to OS-level yield.
+                    std::thread::yield_now();
+                }
+            }
+            // SAFETY: `i < n` and we just observed the ready flag with Acquire ordering.
+            let value = unsafe { shared.take(i) };
+            f(value);
+        }
+    });
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rayon::prelude::*;
+    use std::sync::{
+        atomic::{AtomicUsize, Ordering},
+        Barrier,
+    };
+
+    #[test]
+    fn preserves_order() {
+        let input: Vec<u64> = (0..1000).collect();
+        let mut output = Vec::with_capacity(input.len());
+        input.par_iter().map(|x| x * 2).for_each_ordered(|x| output.push(x));
+        let expected: Vec<u64> = (0..1000).map(|x| x * 2).collect();
+        assert_eq!(output, expected);
+    }
+
+    #[test]
+    fn empty_iterator() {
+        let input: Vec<u64> = vec![];
+        let mut output = Vec::new();
+        input.par_iter().map(|x| *x).for_each_ordered(|x| output.push(x));
+        assert!(output.is_empty());
+    }
+
+    #[test]
+    fn single_element() {
+        let mut output = Vec::new();
+        vec![42u64].par_iter().map(|x| *x).for_each_ordered(|x| output.push(x));
+        assert_eq!(output, vec![42]);
+    }
+
+    #[test]
+    fn slow_early_items_still_delivered_in_order() {
+        // Item 0 is deliberately delayed; all other items complete quickly.
+        // The consumer must still deliver items in order 0, 1, 2, … regardless.
+        let barrier = Barrier::new(2);
+        let n = 64usize;
+        let input: Vec<usize> = (0..n).collect();
+        let mut output = Vec::with_capacity(n);
+
+        input
+            .par_iter()
+            .map(|&i| {
+                if i == 0 {
+                    // Wait until at least one other item has been produced.
+                    barrier.wait();
+                } else if i == n - 1 {
+                    // Signal that other items are ready.
+                    barrier.wait();
+                }
+                i
+            })
+            .for_each_ordered(|x| output.push(x));
+
+        assert_eq!(output, input);
+    }
+
+    #[test]
+    fn drops_unconsumed_slots_on_panic() {
+        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
+
+        #[derive(Clone)]
+        struct Tracked(#[allow(dead_code)] u64);
+        impl Drop for Tracked {
+            fn drop(&mut self) {
+                DROP_COUNT.fetch_add(1, Ordering::Relaxed);
+            }
+        }
+
+        DROP_COUNT.store(0, Ordering::Relaxed);
+
+        let input: Vec<u64> = (0..100).collect();
+        let result = std::panic::catch_unwind(|| {
+            input
+                .par_iter()
+                .map(|&i| {
+                    assert!(i != 50, "intentional");
+                    Tracked(i)
+                })
+                .for_each_ordered(|_item| {});
+        });
+
+        assert!(result.is_err());
+        // All produced Tracked values must have been dropped (either consumed or cleaned up).
+        // We can't assert an exact count since the panic may cut production short.
+        let drops = DROP_COUNT.load(Ordering::Relaxed);
+        assert!(drops > 0, "some items should have been dropped");
+    }
+
+    #[test]
+    fn no_double_drop() {
+        // Verify that consumed items are dropped exactly once (not double-freed by Drop).
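+        // `take` clears the ready flag after moving a value out, so `Shared::drop`
+        // skips slots the consumer already emptied.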
+        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
+
+        struct Counted(#[allow(dead_code)] u64);
+        impl Drop for Counted {
+            fn drop(&mut self) {
+                DROP_COUNT.fetch_add(1, Ordering::Relaxed);
+            }
+        }
+
+        DROP_COUNT.store(0, Ordering::Relaxed);
+        let n = 200u64;
+        let input: Vec<u64> = (0..n).collect();
+        input.par_iter().map(|&i| Counted(i)).for_each_ordered(|_item| {});
+
+        assert_eq!(DROP_COUNT.load(Ordering::Relaxed), n as usize);
+    }
+
+    #[test]
+    fn callback_is_not_send() {
+        // Verify that the callback does not need to be Send.
+        use std::rc::Rc;
+        let counter = Rc::new(std::cell::Cell::new(0u64));
+        let input: Vec<u64> = (0..100).collect();
+        input.par_iter().map(|&x| x).for_each_ordered(|x| {
+            counter.set(counter.get() + x);
+        });
+        assert_eq!(counter.get(), (0..100u64).sum::<u64>());
+    }
+}
diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs
index 091d66ae1b..1a0e38c953 100644
--- a/crates/tasks/src/lib.rs
+++ b/crates/tasks/src/lib.rs
@@ -40,6 +40,12 @@ pub mod shutdown;
 #[cfg(feature = "rayon")]
 pub mod pool;
 
+/// Lock-free ordered parallel iterator extension trait.
+#[cfg(feature = "rayon")]
+pub mod for_each_ordered;
+#[cfg(feature = "rayon")]
+pub use for_each_ordered::ForEachOrdered;
+
 #[cfg(feature = "rayon")]
 pub use runtime::RayonConfig;
 pub use runtime::{Runtime, RuntimeBuildError, RuntimeBuilder, RuntimeConfig, TokioConfig};