diff --git a/Cargo.lock b/Cargo.lock
index 7ec0ac0a12..ea1b8aba65 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10354,6 +10354,7 @@ dependencies = [
  "crossbeam-utils",
  "futures-util",
  "metrics",
+ "parking_lot",
  "pin-project",
  "quanta",
  "rayon",
diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs
index ce32f90f65..8ee3c6d700 100644
--- a/crates/engine/tree/src/tree/payload_processor/mod.rs
+++ b/crates/engine/tree/src/tree/payload_processor/mod.rs
@@ -445,6 +445,8 @@ where
                 "using sequential sig recovery for small block"
             );
             self.executor.spawn_blocking(move || {
+                let _enter =
+                    debug_span!(target: "engine::tree::payload_processor", "tx iterator").entered();
                 let (transactions, convert) = transactions.into_parts();
                 for (idx, tx) in transactions.into_iter().enumerate() {
                     let tx = convert.convert(tx);
@@ -461,7 +463,9 @@
         } else {
             // Parallel path — recover signatures in parallel on rayon, stream results
             // to execution in order via `for_each_ordered`.
-            rayon::spawn(move || {
+            self.executor.spawn_blocking(move || {
+                let _enter =
+                    debug_span!(target: "engine::tree::payload_processor", "tx iterator").entered();
                 let (transactions, convert) = transactions.into_parts();
                 transactions
                     .into_par_iter()
diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml
index 31a1be9091..90a9f19fa2 100644
--- a/crates/tasks/Cargo.toml
+++ b/crates/tasks/Cargo.toml
@@ -29,11 +29,12 @@ thiserror.workspace = true
 # feature `rayon`
 rayon = { workspace = true, optional = true }
 crossbeam-utils = { workspace = true, optional = true }
+parking_lot = { workspace = true, optional = true }
 pin-project = { workspace = true, optional = true }
 
 [dev-dependencies]
 tokio = { workspace = true, features = ["sync", "rt", "rt-multi-thread", "time", "macros"] }
 
 [features]
-rayon = ["dep:rayon", "dep:crossbeam-utils", "pin-project"]
+rayon = ["dep:rayon", "dep:crossbeam-utils", "dep:parking_lot", "pin-project"]
 test-utils = []
diff --git a/crates/tasks/src/for_each_ordered.rs b/crates/tasks/src/for_each_ordered.rs
index b018a838ee..53907e3e4a 100644
--- a/crates/tasks/src/for_each_ordered.rs
+++ b/crates/tasks/src/for_each_ordered.rs
@@ -1,10 +1,7 @@
 use crossbeam_utils::CachePadded;
+use parking_lot::{Condvar, Mutex};
 use rayon::iter::{IndexedParallelIterator, ParallelIterator};
-use std::{
-    cell::UnsafeCell,
-    mem::MaybeUninit,
-    sync::atomic::{AtomicBool, Ordering},
-};
+use std::sync::atomic::{AtomicBool, Ordering};
 
 /// Extension trait for [`IndexedParallelIterator`]
 /// that streams results to a sequential consumer in index order.
@@ -17,6 +14,13 @@ pub trait ForEachOrdered: IndexedParallelIterator {
     /// as it (and all preceding items) are ready.
     ///
     /// `f` does **not** need to be [`Send`] — it runs exclusively on the calling thread.
+    ///
+    /// # Blocking
+    ///
+    /// The calling thread blocks (via [`Condvar`]) while waiting for the next item to become
+    /// ready. It does **not** participate in rayon's work-stealing while blocked. Callers
+    /// should invoke this from a dedicated blocking thread (e.g. via
+    /// [`tokio::task::spawn_blocking`]) rather than from within the rayon thread pool.
     fn for_each_ordered<F>(self, f: F)
     where
         Self::Item: Send,
@@ -33,16 +37,17 @@ impl<I: IndexedParallelIterator> ForEachOrdered for I {
     }
 }
 
-/// A cache-line-padded slot with an atomic ready flag.
+/// A slot holding an optional value and a condvar for notification.
 struct Slot<T> {
-    value: UnsafeCell<MaybeUninit<T>>,
-    ready: AtomicBool,
+    value: Mutex<Option<T>>,
+    notify: Condvar,
 }
 
-// SAFETY: Each slot is written by exactly one producer and read by exactly one consumer.
-// The AtomicBool synchronizes access (Release on write, Acquire on read).
-unsafe impl<T: Send> Send for Slot<T> {}
-unsafe impl<T: Send> Sync for Slot<T> {}
+impl<T> Slot<T> {
+    const fn new() -> Self {
+        Self { value: Mutex::new(None), notify: Condvar::new() }
+    }
+}
 
 struct Shared<T> {
     slots: Box<[CachePadded<Slot<T>>]>,
@@ -51,60 +56,40 @@
 
 impl<T> Shared<T> {
     fn new(n: usize) -> Self {
-        Self {
-            // SAFETY: Zero is a valid bit pattern for Slot<T>.
-            // Needs to be zero for `ready` to be false.
-            slots: unsafe {
-                Box::<[_]>::assume_init(Box::<[CachePadded<Slot<T>>]>::new_zeroed_slice(n))
-            },
-            panicked: AtomicBool::new(false),
-        }
+        let slots =
+            (0..n).map(|_| CachePadded::new(Slot::new())).collect::<Vec<_>>().into_boxed_slice();
+        Self { slots, panicked: AtomicBool::new(false) }
     }
 
-    /// # Safety
-    /// Index `i` must be in bounds and must only be written once.
+    /// Writes a value into slot `i`. Must only be called once per index.
     #[inline]
-    unsafe fn write(&self, i: usize, val: T) {
-        let slot = unsafe { self.slots.get_unchecked(i) };
-        unsafe { (*slot.value.get()).write(val) };
-        slot.ready.store(true, Ordering::Release);
+    fn write(&self, i: usize, val: T) {
+        let slot = &self.slots[i];
+        *slot.value.lock() = Some(val);
+        slot.notify.notify_one();
     }
 
-    /// # Safety
-    /// Index `i` must be in bounds. Must only be called after `ready` is observed `true`.
-    #[inline]
-    unsafe fn take(&self, i: usize) -> T {
-        let slot = unsafe { self.slots.get_unchecked(i) };
-        let v = unsafe { (*slot.value.get()).assume_init_read() };
-        // Clear ready so Drop doesn't double-free this slot.
-        slot.ready.store(false, Ordering::Relaxed);
-        v
-    }
-
-    #[inline]
-    fn is_ready(&self, i: usize) -> bool {
-        // SAFETY: caller ensures `i < n`.
-        unsafe { self.slots.get_unchecked(i) }.ready.load(Ordering::Acquire)
-    }
-}
-
-impl<T> Drop for Shared<T> {
-    fn drop(&mut self) {
-        for slot in &mut *self.slots {
-            if *slot.ready.get_mut() {
-                unsafe { (*slot.value.get()).assume_init_drop() };
+    /// Blocks until slot `i` is ready and takes the value.
+    /// Returns `None` if the producer panicked.
+    fn take(&self, i: usize) -> Option<T> {
+        let slot = &self.slots[i];
+        let mut guard = slot.value.lock();
+        loop {
+            if let Some(val) = guard.take() {
+                return Some(val);
             }
+            if self.panicked.load(Ordering::Acquire) {
+                return None;
+            }
+            slot.notify.wait(&mut guard);
         }
     }
 }
 
 /// Executes a parallel iterator and delivers results to a sequential callback in index order.
 ///
-/// This works by pre-allocating one cache-line-padded slot per item. Each slot holds an
-/// `UnsafeCell<MaybeUninit<T>>` and an `AtomicBool` ready flag. A rayon task computes all
-/// items in parallel, writing each result into its slot and setting the flag (`Release`).
-/// The calling thread walks slots 0, 1, 2, … in order, spinning on the flag (`Acquire`),
-/// then reading the value and passing it to `f`.
+/// Each slot has its own [`Condvar`], so the consumer blocks precisely on the slot it needs
+/// with zero spurious wakeups.
 fn ordered_impl<I, F>(iter: I, mut f: F)
 where
     I: IndexedParallelIterator,
@@ -125,49 +110,26 @@ where
         s.spawn(|_| {
             let res = catch_unwind(AssertUnwindSafe(|| {
                 iter.enumerate().for_each(|(i, item)| {
-                    // SAFETY: `enumerate()` on an IndexedParallelIterator yields each
-                    // index exactly once.
-                    unsafe { shared.write(i, item) };
+                    shared.write(i, item);
                 });
             }));
             if let Err(payload) = res {
                 shared.panicked.store(true, Ordering::Release);
+                // Wake all slots so the consumer doesn't hang. Lock each slot's mutex
+                // first to serialize with the consumer's panicked check → wait sequence.
+                for slot in &*shared.slots {
+                    let _guard = slot.value.lock();
+                    slot.notify.notify_one();
+                }
                 std::panic::resume_unwind(payload);
             }
         });
 
         // Consumer: sequential, ordered, on the calling thread.
-        // Exponential backoff: 1, 2, 4, …, 64 pause instructions, then OS yields.
-        const SPIN_SHIFT_LIMIT: u32 = 6;
         for i in 0..n {
-            let mut backoff = 0u32;
-            'wait: loop {
-                if shared.is_ready(i) {
-                    break 'wait;
-                }
-
-                if shared.panicked.load(Ordering::Relaxed) {
-                    return;
-                }
-
-                // Yield to rayon's work-stealing so the producer can make progress,
-                // especially important when the thread pool is small.
-                if rayon::yield_now() == Some(rayon::Yield::Executed) {
-                    continue 'wait;
-                }
-
-                if backoff < SPIN_SHIFT_LIMIT {
-                    for _ in 0..(1u32 << backoff) {
-                        std::hint::spin_loop();
-                    }
-                    backoff += 1;
-                } else {
-                    // Producer is genuinely slow; fall back to OS-level yield.
-                    std::thread::yield_now();
-                }
-            }
-            // SAFETY: `i < n` and we just observed the ready flag with Acquire ordering.
-            let value = unsafe { shared.take(i) };
+            let Some(value) = shared.take(i) else {
+                return;
+            };
             f(value);
         }
     });
@@ -208,8 +170,6 @@ mod tests {
 
     #[test]
     fn slow_early_items_still_delivered_in_order() {
-        // Item 0 is deliberately delayed; all other items complete quickly.
-        // The consumer must still deliver items in order 0, 1, 2, … regardless.
         let barrier = Barrier::new(2);
         let n = 64usize;
         let input: Vec<usize> = (0..n).collect();
@@ -218,11 +178,7 @@
         input
             .par_iter()
             .map(|&i| {
-                if i == 0 {
-                    // Wait until at least one other item has been produced.
-                    barrier.wait();
-                } else if i == n - 1 {
-                    // Signal that other items are ready.
+                if i == 0 || i == n - 1 {
                     barrier.wait();
                 }
                 i
@@ -258,15 +214,12 @@
         });
         assert!(result.is_err());
 
-        // All produced Tracked values must have been dropped (either consumed or cleaned up).
-        // We can't assert an exact count since the panic may cut production short.
         let drops = DROP_COUNT.load(Ordering::Relaxed);
         assert!(drops > 0, "some items should have been dropped");
     }
 
     #[test]
     fn no_double_drop() {
-        // Verify that consumed items are dropped exactly once (not double-freed by Drop).
        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
 
        struct Counted(#[allow(dead_code)] u64);
@@ -286,7 +239,6 @@
 
     #[test]
     fn callback_is_not_send() {
-        // Verify that the callback does not need to be Send.
         use std::rc::Rc;
         let counter = Rc::new(std::cell::Cell::new(0u64));
         let input: Vec<u64> = (0..100).collect();
@@ -295,4 +247,131 @@
         });
         assert_eq!(counter.get(), (0..100u64).sum::<u64>());
     }
+
+    #[test]
+    fn panic_does_not_deadlock_consumer() {
+        // Regression test: producer panics while consumer is waiting on a condvar.
+        // Without the lock-before-notify fix, the consumer could miss the wakeup
+        // and deadlock.
+        for _ in 0..100 {
+            let result = std::panic::catch_unwind(|| {
+                let input: Vec<usize> = (0..256).collect();
+                input
+                    .par_iter()
+                    .map(|&i| {
+                        if i == 128 {
+                            // Yield to increase the chance of the consumer being
+                            // between the panicked check and condvar wait.
+                            std::thread::yield_now();
+                            panic!("intentional");
+                        }
+                        i
+                    })
+                    .for_each_ordered(|_| {});
+            });
+            assert!(result.is_err());
+        }
+    }
+
+    #[test]
+    fn early_panic_at_item_zero() {
+        let result = std::panic::catch_unwind(|| {
+            let input: Vec<usize> = (0..10).collect();
+            input
+                .par_iter()
+                .map(|&i| {
+                    assert!(i != 0, "boom at zero");
+                    i
+                })
+                .for_each_ordered(|_| {});
+        });
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn late_panic_at_last_item() {
+        let n = 100usize;
+        let result = std::panic::catch_unwind(|| {
+            let input: Vec<usize> = (0..n).collect();
+            input
+                .par_iter()
+                .map(|&i| {
+                    assert!(i != n - 1, "boom at last");
+                    i
+                })
+                .for_each_ordered(|_| {});
+        });
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn large_items() {
+        let n = 500usize;
+        let input: Vec<usize> = (0..n).collect();
+        let mut output = Vec::with_capacity(n);
+        input
+            .par_iter()
+            .map(|&i| {
+                // Return a heap-allocated value to stress drop semantics.
+                vec![i; 64]
+            })
+            .for_each_ordered(|v| output.push(v[0]));
+        assert_eq!(output, input);
+    }
+
+    #[test]
+    fn consumer_slower_than_producer() {
+        // Producer is fast, consumer is slow. All items should still arrive in order.
+        let n = 64usize;
+        let input: Vec<usize> = (0..n).collect();
+        let mut output = Vec::with_capacity(n);
+        input.par_iter().map(|&i| i).for_each_ordered(|x| {
+            if x % 8 == 0 {
+                std::thread::yield_now();
+            }
+            output.push(x);
+        });
+        assert_eq!(output, input);
+    }
+
+    #[test]
+    fn concurrent_panic_and_drop_no_leak() {
+        // Ensure items produced before a panic are all dropped.
+        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
+        static PRODUCED: AtomicUsize = AtomicUsize::new(0);
+
+        struct Tracked;
+        impl Drop for Tracked {
+            fn drop(&mut self) {
+                DROP_COUNT.fetch_add(1, Ordering::Relaxed);
+            }
+        }
+
+        DROP_COUNT.store(0, Ordering::Relaxed);
+        PRODUCED.store(0, Ordering::Relaxed);
+
+        let barrier = Barrier::new(2);
+        let result = std::panic::catch_unwind(|| {
+            let input: Vec<usize> = (0..64).collect();
+            input
+                .par_iter()
+                .map(|&i| {
+                    if i == 32 {
+                        barrier.wait();
+                        panic!("intentional");
+                    }
+                    if i == 0 {
+                        barrier.wait();
+                    }
+                    PRODUCED.fetch_add(1, Ordering::Relaxed);
+                    Tracked
+                })
+                .for_each_ordered(|_| {});
+        });
+
+        assert!(result.is_err());
+        let produced = PRODUCED.load(Ordering::Relaxed);
+        let dropped = DROP_COUNT.load(Ordering::Relaxed);
+        assert_eq!(dropped, produced, "all produced items must be dropped");
+    }
 }
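
Note on the call-site change in `payload_processor/mod.rs`: the consumer side of `for_each_ordered` now parks on a condvar instead of spinning and yielding back to rayon, so it must not occupy a rayon pool thread; that is why `rayon::spawn` becomes `self.executor.spawn_blocking`. A minimal usage sketch of the new API follows, assuming the trait is reachable as `reth_tasks::for_each_ordered::ForEachOrdered` with the `rayon` feature enabled (the exact re-export path is not shown in this diff):

```rust
// Usage sketch (not part of the patch): drive `for_each_ordered` from a
// dedicated blocking thread, as the new `# Blocking` doc section recommends.
// The import path for the trait is an assumption.
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use reth_tasks::for_each_ordered::ForEachOrdered;

#[tokio::main]
async fn main() {
    let out = tokio::task::spawn_blocking(|| {
        let input: Vec<u64> = (0..1024).collect();
        let mut out = Vec::with_capacity(input.len());
        input
            .into_par_iter()
            .map(|i| i * i) // stand-in for per-item work such as signature recovery
            .for_each_ordered(|v| out.push(v)); // consumer runs here, in index order
        out
    })
    .await
    .expect("blocking task panicked");
    assert_eq!(out[3], 9);
}
```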
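The subtle part of the patch is the producer's panic path, which locks each slot's mutex before notifying. Setting `panicked` and calling `notify_one` without the lock could race with a consumer that has already checked `panicked` under the lock and is about to wait, losing the wakeup forever. A standalone sketch of that lock-before-notify pattern, illustrative only and using `parking_lot` as the patch does:

```rust
// Sketch of the lost-wakeup hazard the panic path guards against.
use parking_lot::{Condvar, Mutex};
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

struct Gate {
    value: Mutex<Option<u64>>,
    notify: Condvar,
    cancelled: AtomicBool,
}

impl Gate {
    // Consumer side, mirroring `Shared::take`: the `cancelled` check happens
    // while the mutex is held, and `wait` releases the lock atomically, so a
    // canceller that takes the same lock before notifying cannot slip between
    // the check and the wait.
    fn take(&self) -> Option<u64> {
        let mut guard = self.value.lock();
        loop {
            if let Some(v) = guard.take() {
                return Some(v);
            }
            if self.cancelled.load(Ordering::Acquire) {
                return None;
            }
            self.notify.wait(&mut guard);
        }
    }

    // Cancel side, mirroring the producer's panic path: taking the lock first
    // serializes with the consumer's check-then-wait sequence.
    fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
        let _guard = self.value.lock();
        self.notify.notify_one();
    }
}

fn main() {
    let gate = Arc::new(Gate {
        value: Mutex::new(None),
        notify: Condvar::new(),
        cancelled: AtomicBool::new(false),
    });
    let canceller = Arc::clone(&gate);
    let t = std::thread::spawn(move || canceller.cancel());
    assert_eq!(gate.take(), None); // wakes up instead of hanging forever
    t.join().unwrap();
}
```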
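The tracing additions in both `mod.rs` hunks use the `entered()` guard form, which keeps the span active until the guard drops at the end of the blocking closure. A minimal sketch of the same pattern; the function and its payload type are hypothetical, and only the target string comes from the patch:

```rust
// Span-guard sketch: `_enter` holds the span open for the whole closure body.
use tracing::debug_span;

fn recover_all(transactions: Vec<Vec<u8>>) {
    let _enter =
        debug_span!(target: "engine::tree::payload_processor", "tx iterator").entered();
    for tx in transactions {
        // per-transaction work is recorded under the span
        let _ = tx.len();
    }
}
```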