From 8ac2b84c016ca52052c9d3999ee4e89c1245b3df Mon Sep 17 00:00:00 2001 From: rakita Date: Wed, 22 Mar 2023 17:28:31 +0100 Subject: [PATCH] perf: Optimize and simplify sender recovery (#1900) --- crates/stages/src/stages/mod.rs | 2 - crates/stages/src/stages/sender_recovery.rs | 59 ++++---- crates/stages/src/stages/stream.rs | 144 -------------------- 3 files changed, 28 insertions(+), 177 deletions(-) delete mode 100644 crates/stages/src/stages/stream.rs diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index 98a40c7f34..b44e49309e 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -18,8 +18,6 @@ mod index_storage_history; mod merkle; /// The sender recovery stage. mod sender_recovery; -/// Helper types for working with streams. -mod stream; /// The total difficulty stage mod total_difficulty; /// The transaction lookup stage diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index dc0e39aa97..8695980c8c 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -1,8 +1,7 @@ use crate::{ - exec_or_return, stages::stream::SequentialPairStream, ExecAction, ExecInput, ExecOutput, Stage, - StageError, StageId, UnwindInput, UnwindOutput, + exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, + UnwindOutput, }; -use futures_util::StreamExt; use itertools::Itertools; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, @@ -10,12 +9,11 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; -use reth_primitives::TxNumber; +use reth_primitives::{TransactionSigned, TxNumber, H160}; use reth_provider::Transaction; use std::fmt::Debug; use thiserror::Error; use tokio::sync::mpsc; -use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; /// The [`StageId`] of the sender recovery stage. @@ -81,8 +79,8 @@ impl Stage for SenderRecoveryStage { // Iterate over transactions in chunks info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders"); - // An _unordered_ channel to receive results from a rayon job - let (tx, rx) = mpsc::unbounded_channel(); + // channels used to return result of sender recovery. + let mut channels = Vec::new(); // Spawn recovery jobs onto the default rayon threadpool and send the result through the // channel. @@ -93,38 +91,37 @@ impl Stage for SenderRecoveryStage { for chunk in &tx_walker.chunks(self.commit_threshold as usize / rayon::current_num_threads()) { - let tx = tx.clone(); + // An _unordered_ channel to receive results from a rayon job + let (tx, rx) = mpsc::unbounded_channel(); + channels.push(rx); // Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send) - let mut chunk: Vec<_> = chunk.collect(); + let chunk: Vec<_> = chunk.collect(); + + // closure that would recover signer. Used as utility to wrap result + let recover = |entry: Result<(TxNumber, TransactionSigned), reth_db::Error>| -> Result<(u64, H160), Box> { + let (tx_id, transaction) = entry.map_err(|e| Box::new(e.into()))?; + let sender = transaction.recover_signer().ok_or(StageError::from( + SenderRecoveryStageError::SenderRecovery { tx: tx_id }, + ))?; + + Ok((tx_id, sender)) + }; // Spawn the sender recovery task onto the global rayon pool // This task will send the results through the channel after it recovered the senders. rayon::spawn(move || { - chunk - .drain(..) - .map(|entry| { - let (tx_id, transaction) = entry?; - let sender = transaction.recover_signer().ok_or(StageError::from( - SenderRecoveryStageError::SenderRecovery { tx: tx_id }, - ))?; - - Ok((tx_id, sender)) - }) - .for_each(|result: Result<_, StageError>| { - let _ = tx.send(result); - }); + for entry in chunk { + let _ = tx.send(recover(entry)); + } }); } - drop(tx); - // We need sorted results, so we wrap the _unordered_ receiver stream into a sequential - // stream, which yields the results by ascending transaction ID. - let mut recovered_senders = - SequentialPairStream::new(start_tx_index, UnboundedReceiverStream::new(rx)); - - while let Some(recovered) = recovered_senders.next().await { - let (id, sender) = recovered?; - senders_cursor.append(id, sender)?; + // Iterate over channels and append the sender in the order that they are received. + for mut channel in channels { + while let Some(recovered) = channel.recv().await { + let (tx_id, sender) = recovered.map_err(|boxed| *boxed)?; + senders_cursor.append(tx_id, sender)?; + } } info!(target: "sync::stages::sender_recovery", stage_progress = end_block, done, "Sync iteration finished"); diff --git a/crates/stages/src/stages/stream.rs b/crates/stages/src/stages/stream.rs deleted file mode 100644 index 5e8f41e3c9..0000000000 --- a/crates/stages/src/stages/stream.rs +++ /dev/null @@ -1,144 +0,0 @@ -use futures_util::stream::Stream; -use num_traits::One; -use std::{ - cmp::Ordering, - collections::{binary_heap::PeekMut, BinaryHeap}, - ops::Add, - pin::Pin, - task::{Context, Poll}, -}; - -/// A Stream type that emits key-value pairs in sequential order of the key. -#[pin_project::pin_project] -#[must_use = "stream does nothing unless polled"] -pub(crate) struct SequentialPairStream { - /// The next item we expect from the stream - next: Key, - /// buffered entries - pending: BinaryHeap>, - #[pin] - stream: St, - done: bool, -} - -// === impl SequentialPairStream === - -impl SequentialPairStream { - /// Returns a new [SequentialPairStream] that emits the items of the given stream in order - /// starting at the given start point. - pub(crate) fn new(start: Key, stream: St) -> Self { - Self { next: start, pending: Default::default(), stream, done: false } - } -} - -/// implements Stream for any underlying Stream that returns a result -impl Stream for SequentialPairStream -where - Key: Ord + Copy + Add + One, - St: Stream>, -{ - type Item = Result<(Key, Value), Err>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - 'outer: loop { - // try to drain buffered items - while let Some(maybe_next) = this.pending.peek_mut() { - match (maybe_next.0).cmp(&*this.next) { - Ordering::Less => { - PeekMut::pop(maybe_next); - continue - } - Ordering::Equal => { - let next = PeekMut::pop(maybe_next); - *this.next = *this.next + Key::one(); - return Poll::Ready(Some(Ok(next.into()))) - } - Ordering::Greater => { - if *this.done { - let next = PeekMut::pop(maybe_next); - return Poll::Ready(Some(Ok(next.into()))) - } - break - } - } - } - - if *this.done { - return Poll::Ready(None) - } - - loop { - match this.stream.as_mut().poll_next(cx) { - Poll::Pending => break, - Poll::Ready(item) => match item { - Some(Ok((k, v))) => { - if k == *this.next { - *this.next = *this.next + Key::one(); - return Poll::Ready(Some(Ok((k, v)))) - } - this.pending.push(OrderedItem(k, v)); - } - Some(err @ Err(_)) => return Poll::Ready(Some(err)), - None => { - *this.done = true; - continue 'outer - } - }, - } - } - - return Poll::Pending - } - } -} - -/// The item a [SequentialPairStream] emits -struct OrderedItem(Key, Value); - -impl From> for (Key, Value) { - fn from(value: OrderedItem) -> Self { - (value.0, value.1) - } -} - -impl Eq for OrderedItem {} - -impl PartialEq for OrderedItem { - fn eq(&self, other: &Self) -> bool { - self.0 == other.0 - } -} - -impl PartialOrd for OrderedItem { - fn partial_cmp(&self, other: &Self) -> Option { - Some(other.0.cmp(&self.0)) - } -} - -impl Ord for OrderedItem { - fn cmp(&self, other: &Self) -> Ordering { - // binary heap is max-heap - other.0.cmp(&self.0) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures_util::{stream, TryStreamExt}; - use rand::{seq::SliceRandom, thread_rng}; - - #[tokio::test] - async fn test_ordered_stream() { - let values: Vec<_> = (0..10).map(|i| (i, i)).collect(); - - let mut input = values.clone(); - input.shuffle(&mut thread_rng()); - let stream = stream::iter(input.into_iter().map(Ok::<_, ()>)); - let ordered = SequentialPairStream::new(values[0].0, stream); - let received = ordered.try_collect::>().await.unwrap(); - assert_eq!(received, values); - } -}