perf: use mutex in for_each_ordered (#22252)

DaniPopes authored on 2026-02-17 02:19:56 +01:00, committed by GitHub
parent 5b8808e5fd
commit 4ecb0d5680
4 changed files with 185 additions and 100 deletions

Cargo.lock

@@ -10354,6 +10354,7 @@ dependencies = [
  "crossbeam-utils",
  "futures-util",
  "metrics",
+ "parking_lot",
  "pin-project",
  "quanta",
  "rayon",


@@ -445,6 +445,8 @@ where
                     "using sequential sig recovery for small block"
                 );
                 self.executor.spawn_blocking(move || {
+                    let _enter =
+                        debug_span!(target: "engine::tree::payload_processor", "tx iterator").entered();
                     let (transactions, convert) = transactions.into_parts();
                     for (idx, tx) in transactions.into_iter().enumerate() {
                         let tx = convert.convert(tx);
@@ -461,7 +463,9 @@
             } else {
                 // Parallel path — recover signatures in parallel on rayon, stream results
                 // to execution in order via `for_each_ordered`.
-                rayon::spawn(move || {
+                self.executor.spawn_blocking(move || {
+                    let _enter =
+                        debug_span!(target: "engine::tree::payload_processor", "tx iterator").entered();
                     let (transactions, convert) = transactions.into_parts();
                     transactions
                         .into_par_iter()
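
Why `spawn_blocking` instead of `rayon::spawn` for the driver: the `for_each_ordered` consumer now parks on a condvar between items, and parking a rayon worker would shrink the very pool that produces those items. A minimal sketch of the shape (not reth code; `ForEachOrdered` assumed in scope, `deliver` is a hypothetical stand-in for handing recovered transactions to execution):

use rayon::prelude::*;

// Run this on a dedicated blocking thread (e.g. tokio's spawn_blocking pool).
fn drive(inputs: Vec<u64>, mut deliver: impl FnMut(u64)) {
    // The map runs on rayon workers in arbitrary order; the closure below
    // runs here, in index order, and may block on a condvar between items.
    inputs
        .into_par_iter()
        .map(|x| x.wrapping_mul(3)) // stand-in for per-tx signature recovery
        .for_each_ordered(|y| deliver(y));
}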


@@ -29,11 +29,12 @@ thiserror.workspace = true
 # feature `rayon`
 rayon = { workspace = true, optional = true }
 crossbeam-utils = { workspace = true, optional = true }
+parking_lot = { workspace = true, optional = true }
 pin-project = { workspace = true, optional = true }
 
 [dev-dependencies]
 tokio = { workspace = true, features = ["sync", "rt", "rt-multi-thread", "time", "macros"] }
 
 [features]
-rayon = ["dep:rayon", "dep:crossbeam-utils", "pin-project"]
+rayon = ["dep:rayon", "dep:crossbeam-utils", "dep:parking_lot", "pin-project"]
 test-utils = []


@@ -1,10 +1,7 @@
 use crossbeam_utils::CachePadded;
+use parking_lot::{Condvar, Mutex};
 use rayon::iter::{IndexedParallelIterator, ParallelIterator};
-use std::{
-    cell::UnsafeCell,
-    mem::MaybeUninit,
-    sync::atomic::{AtomicBool, Ordering},
-};
+use std::sync::atomic::{AtomicBool, Ordering};
 
 /// Extension trait for [`IndexedParallelIterator`]
 /// that streams results to a sequential consumer in index order.
@@ -17,6 +14,13 @@ pub trait ForEachOrdered: IndexedParallelIterator {
     /// as it (and all preceding items) are ready.
     ///
     /// `f` does **not** need to be [`Send`] — it runs exclusively on the calling thread.
+    ///
+    /// # Blocking
+    ///
+    /// The calling thread blocks (via [`Condvar`]) while waiting for the next item to become
+    /// ready. It does **not** participate in rayon's work-stealing while blocked. Callers
+    /// should invoke this from a dedicated blocking thread (e.g. via
+    /// [`tokio::task::spawn_blocking`]) rather than from within the rayon thread pool.
     fn for_each_ordered<F>(self, f: F)
     where
         Self::Item: Send,
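
To illustrate the contract (a hedged sketch, with `ForEachOrdered` imported from this crate and everything else std/rayon): items are computed in parallel in arbitrary order but delivered strictly in index order, and the callback may hold `!Send` state because it never leaves the calling thread.

use rayon::prelude::*;
use std::{cell::RefCell, rc::Rc};

fn demo() {
    // Rc<RefCell<…>> is !Send; that's fine because the callback runs only
    // on the calling (blocking) thread.
    let seen = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&seen);
    (0..1_000u32)
        .into_par_iter()
        .map(|i| i * i) // computed in parallel, unordered
        .for_each_ordered(move |sq| sink.borrow_mut().push(sq)); // delivered 0, 1, 4, 9, …
    // Squares arrive in index order, hence strictly increasing.
    assert!(seen.borrow().windows(2).all(|w| w[0] < w[1]));
}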
@@ -33,16 +37,17 @@ impl<I: IndexedParallelIterator> ForEachOrdered for I {
     }
 }
 
-/// A cache-line-padded slot with an atomic ready flag.
+/// A slot holding an optional value and a condvar for notification.
 struct Slot<T> {
-    value: UnsafeCell<MaybeUninit<T>>,
-    ready: AtomicBool,
+    value: Mutex<Option<T>>,
+    notify: Condvar,
 }
 
-// SAFETY: Each slot is written by exactly one producer and read by exactly one consumer.
-// The AtomicBool synchronizes access (Release on write, Acquire on read).
-unsafe impl<T: Send> Send for Slot<T> {}
-unsafe impl<T: Send> Sync for Slot<T> {}
+impl<T> Slot<T> {
+    const fn new() -> Self {
+        Self { value: Mutex::new(None), notify: Condvar::new() }
+    }
+}
 
 struct Shared<T> {
     slots: Box<[CachePadded<Slot<T>>]>,
@@ -51,60 +56,40 @@ struct Shared<T> {
 impl<T> Shared<T> {
     fn new(n: usize) -> Self {
-        Self {
-            // SAFETY: Zero is a valid bit pattern for Slot.
-            // Needs to be zero for `ready` to be false.
-            slots: unsafe {
-                Box::<[_]>::assume_init(Box::<[CachePadded<Slot<T>>]>::new_zeroed_slice(n))
-            },
-            panicked: AtomicBool::new(false),
-        }
+        let slots =
+            (0..n).map(|_| CachePadded::new(Slot::new())).collect::<Vec<_>>().into_boxed_slice();
+        Self { slots, panicked: AtomicBool::new(false) }
     }
 
-    /// # Safety
-    /// Index `i` must be in bounds and must only be written once.
+    /// Writes a value into slot `i`. Must only be called once per index.
     #[inline]
-    unsafe fn write(&self, i: usize, val: T) {
-        let slot = unsafe { self.slots.get_unchecked(i) };
-        unsafe { (*slot.value.get()).write(val) };
-        slot.ready.store(true, Ordering::Release);
+    fn write(&self, i: usize, val: T) {
+        let slot = &self.slots[i];
+        *slot.value.lock() = Some(val);
+        slot.notify.notify_one();
     }
 
-    /// # Safety
-    /// Index `i` must be in bounds. Must only be called after `ready` is observed `true`.
-    #[inline]
-    unsafe fn take(&self, i: usize) -> T {
-        let slot = unsafe { self.slots.get_unchecked(i) };
-        let v = unsafe { (*slot.value.get()).assume_init_read() };
-        // Clear ready so Drop doesn't double-free this slot.
-        slot.ready.store(false, Ordering::Relaxed);
-        v
-    }
-
-    #[inline]
-    fn is_ready(&self, i: usize) -> bool {
-        // SAFETY: caller ensures `i < n`.
-        unsafe { self.slots.get_unchecked(i) }.ready.load(Ordering::Acquire)
-    }
-}
-
-impl<T> Drop for Shared<T> {
-    fn drop(&mut self) {
-        for slot in &mut *self.slots {
-            if *slot.ready.get_mut() {
-                unsafe { (*slot.value.get()).assume_init_drop() };
+    /// Blocks until slot `i` is ready and takes the value.
+    /// Returns `None` if the producer panicked.
+    fn take(&self, i: usize) -> Option<T> {
+        let slot = &self.slots[i];
+        let mut guard = slot.value.lock();
+        loop {
+            if let Some(val) = guard.take() {
+                return Some(val);
             }
+            if self.panicked.load(Ordering::Acquire) {
+                return None;
+            }
+            slot.notify.wait(&mut guard);
         }
     }
 }
 
 /// Executes a parallel iterator and delivers results to a sequential callback in index order.
 ///
-/// This works by pre-allocating one cache-line-padded slot per item. Each slot holds an
-/// `UnsafeCell<MaybeUninit<T>>` and an `AtomicBool` ready flag. A rayon task computes all
-/// items in parallel, writing each result into its slot and setting the flag (`Release`).
-/// The calling thread walks slots 0, 1, 2, … in order, spinning on the flag (`Acquire`),
-/// then reading the value and passing it to `f`.
+/// Each slot has its own [`Condvar`], so the consumer blocks precisely on the slot it needs
+/// with zero spurious wakeups.
 fn ordered_impl<I, F>(iter: I, mut f: F)
 where
     I: IndexedParallelIterator,
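
The new `write`/`take` pair is the classic mutex-plus-condvar handshake. A standalone sketch with a single slot (assuming parking_lot semantics, where `Condvar::wait` atomically unlocks the mutex while parking and re-locks before returning):

use parking_lot::{Condvar, Mutex};
use std::sync::Arc;

struct Slot<T> {
    value: Mutex<Option<T>>,
    notify: Condvar,
}

fn handshake() {
    let slot = Arc::new(Slot { value: Mutex::new(None), notify: Condvar::new() });

    // Producer: publish the value under the lock, then signal.
    let s = Arc::clone(&slot);
    let producer = std::thread::spawn(move || {
        *s.value.lock() = Some(42);
        s.notify.notify_one();
    });

    // Consumer: re-check after every wakeup. A value stored before we first
    // lock is seen immediately; one stored while we sleep triggers the notify,
    // because `wait` only releases the lock as it parks.
    let mut guard = slot.value.lock();
    let v = loop {
        if let Some(v) = guard.take() {
            break v;
        }
        slot.notify.wait(&mut guard);
    };
    assert_eq!(v, 42);
    producer.join().unwrap();
}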
@@ -125,49 +110,26 @@ where
         s.spawn(|_| {
             let res = catch_unwind(AssertUnwindSafe(|| {
                 iter.enumerate().for_each(|(i, item)| {
-                    // SAFETY: `enumerate()` on an IndexedParallelIterator yields each
-                    // index exactly once.
-                    unsafe { shared.write(i, item) };
+                    shared.write(i, item);
                 });
             }));
             if let Err(payload) = res {
                 shared.panicked.store(true, Ordering::Release);
+                // Wake all slots so the consumer doesn't hang. Lock each slot's mutex
+                // first to serialize with the consumer's panicked check → wait sequence.
+                for slot in &*shared.slots {
+                    let _guard = slot.value.lock();
+                    slot.notify.notify_one();
+                }
                 std::panic::resume_unwind(payload);
             }
         });
 
         // Consumer: sequential, ordered, on the calling thread.
-        // Exponential backoff: 1, 2, 4, …, 64 pause instructions, then OS yields.
-        const SPIN_SHIFT_LIMIT: u32 = 6;
         for i in 0..n {
-            let mut backoff = 0u32;
-            'wait: loop {
-                if shared.is_ready(i) {
-                    break 'wait;
-                }
-                if shared.panicked.load(Ordering::Relaxed) {
-                    return;
-                }
-                // Yield to rayon's work-stealing so the producer can make progress,
-                // especially important when the thread pool is small.
-                if rayon::yield_now() == Some(rayon::Yield::Executed) {
-                    continue 'wait;
-                }
-                if backoff < SPIN_SHIFT_LIMIT {
-                    for _ in 0..(1u32 << backoff) {
-                        std::hint::spin_loop();
-                    }
-                    backoff += 1;
-                } else {
-                    // Producer is genuinely slow; fall back to OS-level yield.
-                    std::thread::yield_now();
-                }
-            }
-            // SAFETY: `i < n` and we just observed the ready flag with Acquire ordering.
-            let value = unsafe { shared.take(i) };
+            let Some(value) = shared.take(i) else {
+                return;
+            };
             f(value);
         }
     });
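
The lock-before-notify in the panic path matters: storing the flag and notifying without taking the slot mutex admits a lost wakeup, since the consumer can pass its `panicked` check and park in the gap. A sketch of the hazard and the fix (a simplification, not the crate's code):

use parking_lot::{Condvar, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};

struct Slot<T> {
    value: Mutex<Option<T>>,
    notify: Condvar,
}

// Broken cleanup (hypothetical): notify WITHOUT holding the slot mutex.
//
//   consumer (holds slot mutex)         producer (panicking)
//   guard.take()      -> None
//   panicked.load()   -> false
//                                       panicked.store(true)
//                                       notify_one()   // nobody waiting yet
//   notify.wait(&mut guard)             // sleeps forever
//
// Taking the mutex first closes the gap: the consumer holds it from its
// checks until `wait` parks it (releasing the lock atomically), so the
// producer's lock acquisition cannot land inside the check → wait window.
fn wake_consumer<T>(slots: &[Slot<T>], panicked: &AtomicBool) {
    panicked.store(true, Ordering::Release);
    for slot in slots {
        let _guard = slot.value.lock(); // serialize with the consumer
        slot.notify.notify_one();
    }
}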
@@ -208,8 +170,6 @@ mod tests {
     #[test]
     fn slow_early_items_still_delivered_in_order() {
-        // Item 0 is deliberately delayed; all other items complete quickly.
-        // The consumer must still deliver items in order 0, 1, 2, … regardless.
         let barrier = Barrier::new(2);
         let n = 64usize;
         let input: Vec<usize> = (0..n).collect();
@@ -218,11 +178,7 @@ mod tests {
         input
             .par_iter()
             .map(|&i| {
-                if i == 0 {
-                    // Wait until at least one other item has been produced.
-                    barrier.wait();
-                } else if i == n - 1 {
-                    // Signal that other items are ready.
+                if i == 0 || i == n - 1 {
                     barrier.wait();
                 }
                 i
@@ -258,15 +214,12 @@ mod tests {
         });
         assert!(result.is_err());
-        // All produced Tracked values must have been dropped (either consumed or cleaned up).
-        // We can't assert an exact count since the panic may cut production short.
         let drops = DROP_COUNT.load(Ordering::Relaxed);
         assert!(drops > 0, "some items should have been dropped");
     }
 
     #[test]
     fn no_double_drop() {
-        // Verify that consumed items are dropped exactly once (not double-freed by Drop).
         static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
 
         struct Counted(#[allow(dead_code)] u64);
@@ -286,7 +239,6 @@
     #[test]
     fn callback_is_not_send() {
-        // Verify that the callback does not need to be Send.
         use std::rc::Rc;
         let counter = Rc::new(std::cell::Cell::new(0u64));
         let input: Vec<u64> = (0..100).collect();
@@ -295,4 +247,131 @@
         });
         assert_eq!(counter.get(), (0..100u64).sum::<u64>());
     }
+
+    #[test]
+    fn panic_does_not_deadlock_consumer() {
+        // Regression test: producer panics while consumer is waiting on a condvar.
+        // Without the lock-before-notify fix, the consumer could miss the wakeup
+        // and deadlock.
+        for _ in 0..100 {
+            let result = std::panic::catch_unwind(|| {
+                let input: Vec<usize> = (0..256).collect();
+                input
+                    .par_iter()
+                    .map(|&i| {
+                        if i == 128 {
+                            // Yield to increase the chance of the consumer being
+                            // between the panicked check and condvar wait.
+                            std::thread::yield_now();
+                            panic!("intentional");
+                        }
+                        i
+                    })
+                    .for_each_ordered(|_| {});
+            });
+            assert!(result.is_err());
+        }
+    }
+
+    #[test]
+    fn early_panic_at_item_zero() {
+        let result = std::panic::catch_unwind(|| {
+            let input: Vec<u64> = (0..10).collect();
+            input
+                .par_iter()
+                .map(|&i| {
+                    assert!(i != 0, "boom at zero");
+                    i
+                })
+                .for_each_ordered(|_| {});
+        });
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn late_panic_at_last_item() {
+        let n = 100usize;
+        let result = std::panic::catch_unwind(|| {
+            let input: Vec<usize> = (0..n).collect();
+            input
+                .par_iter()
+                .map(|&i| {
+                    assert!(i != n - 1, "boom at last");
+                    i
+                })
+                .for_each_ordered(|_| {});
+        });
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn large_items() {
+        let n = 500usize;
+        let input: Vec<usize> = (0..n).collect();
+        let mut output = Vec::with_capacity(n);
+        input
+            .par_iter()
+            .map(|&i| {
+                // Return a heap-allocated value to stress drop semantics.
+                vec![i; 64]
+            })
+            .for_each_ordered(|v| output.push(v[0]));
+        assert_eq!(output, input);
+    }
+
+    #[test]
+    fn consumer_slower_than_producer() {
+        // Producer is fast, consumer is slow. All items should still arrive in order.
+        let n = 64usize;
+        let input: Vec<usize> = (0..n).collect();
+        let mut output = Vec::with_capacity(n);
+        input.par_iter().map(|&i| i).for_each_ordered(|x| {
+            if x % 8 == 0 {
+                std::thread::yield_now();
+            }
+            output.push(x);
+        });
+        assert_eq!(output, input);
+    }
+
+    #[test]
+    fn concurrent_panic_and_drop_no_leak() {
+        // Ensure items produced before a panic are all dropped.
+        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
+        static PRODUCED: AtomicUsize = AtomicUsize::new(0);
+
+        struct Tracked;
+        impl Drop for Tracked {
+            fn drop(&mut self) {
+                DROP_COUNT.fetch_add(1, Ordering::Relaxed);
+            }
+        }
+
+        DROP_COUNT.store(0, Ordering::Relaxed);
+        PRODUCED.store(0, Ordering::Relaxed);
+        let barrier = Barrier::new(2);
+        let result = std::panic::catch_unwind(|| {
+            let input: Vec<usize> = (0..64).collect();
+            input
+                .par_iter()
+                .map(|&i| {
+                    if i == 32 {
+                        barrier.wait();
+                        panic!("intentional");
+                    }
+                    if i == 0 {
+                        barrier.wait();
+                    }
+                    PRODUCED.fetch_add(1, Ordering::Relaxed);
+                    Tracked
+                })
+                .for_each_ordered(|_| {});
+        });
+        assert!(result.is_err());
+
+        let produced = PRODUCED.load(Ordering::Relaxed);
+        let dropped = DROP_COUNT.load(Ordering::Relaxed);
+        assert_eq!(dropped, produced, "all produced items must be dropped");
+    }
 }