perf: Optimize and simplify sender recovery (#1900)

This commit is contained in:
rakita
2023-03-22 17:28:31 +01:00
committed by GitHub
parent 6afdd331dc
commit 8ac2b84c01
3 changed files with 28 additions and 177 deletions

View File

@@ -18,8 +18,6 @@ mod index_storage_history;
mod merkle;
/// The sender recovery stage.
mod sender_recovery;
/// Helper types for working with streams.
mod stream;
/// The total difficulty stage
mod total_difficulty;
/// The transaction lookup stage

View File

@@ -1,8 +1,7 @@
use crate::{
exec_or_return, stages::stream::SequentialPairStream, ExecAction, ExecInput, ExecOutput, Stage,
StageError, StageId, UnwindInput, UnwindOutput,
exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
UnwindOutput,
};
use futures_util::StreamExt;
use itertools::Itertools;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
@@ -10,12 +9,11 @@ use reth_db::{
tables,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::TxNumber;
use reth_primitives::{TransactionSigned, TxNumber, H160};
use reth_provider::Transaction;
use std::fmt::Debug;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
/// The [`StageId`] of the sender recovery stage.
@@ -81,8 +79,8 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// Iterate over transactions in chunks
info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders");
// An _unordered_ channel to receive results from a rayon job
let (tx, rx) = mpsc::unbounded_channel();
// channels used to return result of sender recovery.
let mut channels = Vec::new();
// Spawn recovery jobs onto the default rayon threadpool and send the result through the
// channel.
@@ -93,38 +91,37 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
for chunk in
&tx_walker.chunks(self.commit_threshold as usize / rayon::current_num_threads())
{
let tx = tx.clone();
// An _unordered_ channel to receive results from a rayon job
let (tx, rx) = mpsc::unbounded_channel();
channels.push(rx);
// Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send)
let mut chunk: Vec<_> = chunk.collect();
let chunk: Vec<_> = chunk.collect();
// closure that would recover signer. Used as utility to wrap result
let recover = |entry: Result<(TxNumber, TransactionSigned), reth_db::Error>| -> Result<(u64, H160), Box<StageError>> {
let (tx_id, transaction) = entry.map_err(|e| Box::new(e.into()))?;
let sender = transaction.recover_signer().ok_or(StageError::from(
SenderRecoveryStageError::SenderRecovery { tx: tx_id },
))?;
Ok((tx_id, sender))
};
// Spawn the sender recovery task onto the global rayon pool
// This task will send the results through the channel after it recovered the senders.
rayon::spawn(move || {
chunk
.drain(..)
.map(|entry| {
let (tx_id, transaction) = entry?;
let sender = transaction.recover_signer().ok_or(StageError::from(
SenderRecoveryStageError::SenderRecovery { tx: tx_id },
))?;
Ok((tx_id, sender))
})
.for_each(|result: Result<_, StageError>| {
let _ = tx.send(result);
});
for entry in chunk {
let _ = tx.send(recover(entry));
}
});
}
drop(tx);
// We need sorted results, so we wrap the _unordered_ receiver stream into a sequential
// stream, which yields the results by ascending transaction ID.
let mut recovered_senders =
SequentialPairStream::new(start_tx_index, UnboundedReceiverStream::new(rx));
while let Some(recovered) = recovered_senders.next().await {
let (id, sender) = recovered?;
senders_cursor.append(id, sender)?;
// Iterate over channels and append the sender in the order that they are received.
for mut channel in channels {
while let Some(recovered) = channel.recv().await {
let (tx_id, sender) = recovered.map_err(|boxed| *boxed)?;
senders_cursor.append(tx_id, sender)?;
}
}
info!(target: "sync::stages::sender_recovery", stage_progress = end_block, done, "Sync iteration finished");

View File

@@ -1,144 +0,0 @@
use futures_util::stream::Stream;
use num_traits::One;
use std::{
cmp::Ordering,
collections::{binary_heap::PeekMut, BinaryHeap},
ops::Add,
pin::Pin,
task::{Context, Poll},
};
/// A Stream type that emits key-value pairs in sequential order of the key.
///
/// Items pulled from the wrapped stream whose key is not the one currently expected are
/// buffered in `pending` until they can be emitted.
#[pin_project::pin_project]
#[must_use = "stream does nothing unless polled"]
pub(crate) struct SequentialPairStream<Key: Ord, Value, St> {
/// The next item we expect from the stream
next: Key,
/// buffered entries
pending: BinaryHeap<OrderedItem<Key, Value>>,
/// The wrapped stream that key-value pairs are pulled from.
#[pin]
stream: St,
/// Set once the wrapped stream has yielded `None`.
done: bool,
}
// === impl SequentialPairStream ===
impl<Key: Ord, Value, St> SequentialPairStream<Key, Value, St> {
/// Returns a new [SequentialPairStream] that emits the items of the given stream in order
/// starting at the given start point.
pub(crate) fn new(start: Key, stream: St) -> Self {
Self { next: start, pending: Default::default(), stream, done: false }
}
}
/// Implements [`Stream`] for any underlying stream that yields `Result<(Key, Value), Err>`.
///
/// Each call to `poll_next` works in two phases that repeat until something can be returned:
///
/// 1. Serve from the buffer: entries with a key below the expected one are discarded, the
///    entry matching the expected key is emitted (advancing the expected key by one), and once
///    the underlying stream has finished the remaining entries are flushed in ascending order.
/// 2. Pull from the underlying stream: the expected key is emitted right away, any other key is
///    buffered, errors are forwarded immediately, and end-of-stream sends us back to phase 1.
impl<Key, Value, St, Err> Stream for SequentialPairStream<Key, Value, St>
where
    Key: Ord + Copy + Add<Output = Key> + One,
    St: Stream<Item = Result<(Key, Value), Err>>,
{
    type Item = Result<(Key, Value), Err>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        loop {
            // Phase 1: look at the smallest buffered key and decide what to do with it.
            while let Some(head) = this.pending.peek_mut() {
                let order = head.0.cmp(&*this.next);

                if order == Ordering::Less {
                    // Key is already behind the expected one: discard it.
                    PeekMut::pop(head);
                    continue
                }

                if order == Ordering::Equal {
                    let entry = PeekMut::pop(head);
                    *this.next = *this.next + Key::one();
                    return Poll::Ready(Some(Ok(entry.into())))
                }

                // The smallest buffered key is ahead of the expected one. Nothing more will
                // arrive once the underlying stream is done, so flush it; otherwise wait.
                if *this.done {
                    let entry = PeekMut::pop(head);
                    return Poll::Ready(Some(Ok(entry.into())))
                }
                break
            }

            // Buffer can not make progress and there is nothing left to pull.
            if *this.done {
                return Poll::Ready(None)
            }

            // Phase 2: pull from the underlying stream until it is pending or exhausted.
            loop {
                let polled = match this.stream.as_mut().poll_next(cx) {
                    Poll::Ready(polled) => polled,
                    Poll::Pending => return Poll::Pending,
                };

                match polled {
                    Some(Ok((key, value))) => {
                        if key == *this.next {
                            *this.next = *this.next + Key::one();
                            return Poll::Ready(Some(Ok((key, value))))
                        }
                        this.pending.push(OrderedItem(key, value));
                    }
                    Some(Err(err)) => return Poll::Ready(Some(Err(err))),
                    None => {
                        // Remember the exhaustion and go back to draining the buffer.
                        *this.done = true;
                        break
                    }
                }
            }
        }
    }
}
/// The item a [SequentialPairStream] emits
///
/// Equality and ordering look at the key (field `0`) only; the value is ignored. The ordering
/// is deliberately reversed so that a [`BinaryHeap`], which is a max-heap, yields the entry
/// with the smallest key first.
struct OrderedItem<Key, Value>(Key, Value);

impl<Key, Value> From<OrderedItem<Key, Value>> for (Key, Value) {
    /// Unwraps the item into its plain `(key, value)` pair.
    fn from(value: OrderedItem<Key, Value>) -> Self {
        (value.0, value.1)
    }
}

impl<Key: Ord, Value> Eq for OrderedItem<Key, Value> {}

impl<Key: Ord, Value> PartialEq for OrderedItem<Key, Value> {
    /// Two items are equal when their keys are equal.
    fn eq(&self, other: &Self) -> bool {
        self.0 == other.0
    }
}

impl<Key: Ord, Value> PartialOrd for OrderedItem<Key, Value> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to `Ord` so the reversed comparison is defined in exactly one place.
        Some(self.cmp(other))
    }
}

impl<Key: Ord, Value> Ord for OrderedItem<Key, Value> {
    fn cmp(&self, other: &Self) -> Ordering {
        // binary heap is max-heap, so compare in reverse to pop the smallest key first
        other.0.cmp(&self.0)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::{stream, TryStreamExt};
    use rand::{seq::SliceRandom, thread_rng};

    /// Feeds the pairs `(0, 0)..(9, 9)` in random order through a [SequentialPairStream] and
    /// expects them to come out sorted by key.
    #[tokio::test]
    async fn test_ordered_stream() {
        let expected = (0..10).map(|n| (n, n)).collect::<Vec<_>>();

        let mut shuffled = expected.clone();
        shuffled.shuffle(&mut thread_rng());

        let first_key = expected[0].0;
        let source = stream::iter(shuffled.into_iter().map(Ok::<_, ()>));
        let received: Vec<_> =
            SequentialPairStream::new(first_key, source).try_collect().await.unwrap();

        assert_eq!(received, expected);
    }
}