This commit is contained in:
Matthias Seitz
2025-11-19 20:13:58 +01:00
parent e3562e981c
commit 338c9272eb

View File

@@ -70,66 +70,93 @@ enum PreparedTx<'a, Tx> {
Prepared(&'a Tx)
}
/// Drives an underlying underlying iterator and collects the transactions.
///
/// Transactions can be yielded out of order by the iterator.
pub struct CollectExecutableTxIterator<Evm, Iter, Tx, Err>
where Evm: ConfigureEvm,
Iter: Iterator<Item = (usize, Result<WithTxEnv<TxEnvFor<Evm>, Tx>, Err>)>
{
/// The iterator that yields prepared tx in any order
iter: Iter,
/// Prepared transaction that wait for an ancestor to become available from the `iter`.
buffered: BTreeMap<usize, WithTxEnv<TxEnvFor<Evm>, Tx>>,
current_idx: usize,
}
// #[cfg(feature = "rayon")]
mod parallel {
use super::*;
impl<Evm, Iter, Tx, Err> ExecutableTxIterator3<Evm> for CollectExecutableTxIterator<Evm, Iter, Tx, Err>
where
Evm: ConfigureEvm,
Iter: Iterator<Item = (usize, Result<WithTxEnv<TxEnvFor<Evm>, Tx>, Err>)>,
Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static,
Err: core::error::Error + Send + Sync + 'static,
{
type Tx = Tx;
type Error = Err;
/// Drives an underlying underlying iterator and collects the transactions.
///
/// Transactions can be yielded out of order by the iterator.
pub struct CollectExecutableTxIterator<Evm, Iter, Tx, Err>
where Evm: ConfigureEvm,
Iter: Iterator<Item = (usize, Result<WithTxEnv<TxEnvFor<Evm>, Tx>, Err>)>
{
/// The iterator that yields prepared tx in any order
iter: Iter,
/// Prepared transaction that wait for an ancestor to become available from the `iter`.
buffered: BTreeMap<usize, WithTxEnv<TxEnvFor<Evm>, Tx>>,
current_idx: usize,
}
fn next(&mut self) -> Option<Result<PreparedTx<WithTxEnv<TxEnvFor<Evm>, Self::Tx>>, Self::Error>> {
// First check if the next transaction in sequence is already buffered
if let Some(tx) = self.buffered.remove(&self.current_idx) {
self.current_idx += 1;
return Some(Ok(PreparedTx::Next(tx)));
}
impl<Evm, Iter, Tx, Err> ExecutableTxIterator3<Evm> for CollectExecutableTxIterator<Evm, Iter, Tx, Err>
where
Evm: ConfigureEvm,
Iter: Iterator<Item = (usize, Result<WithTxEnv<TxEnvFor<Evm>, Tx>, Err>)>,
Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static,
Err: core::error::Error + Send + Sync + 'static,
{
type Tx = Tx;
type Error = Err;
// Pull next item from iterator
let (idx, result) = self.iter.next()?;
fn next<'a>(&'a mut self) -> Option<Result<PreparedTx<'a, WithTxEnv<TxEnvFor<Evm>, Self::Tx>>, Self::Error>> {
// First check if the next transaction in sequence is already buffered
if let Some(tx) = self.buffered.remove(&self.current_idx) {
self.current_idx += 1;
return Some(Ok(PreparedTx::Next(tx)));
}
// Pull next item from iterator
let (idx, result) = self.iter.next()?;
match result {
Err(err) => Some(Err(err)),
Ok(tx) => {
if idx == self.current_idx {
// This is the next transaction in sequence
self.current_idx += 1;
Some(Ok(PreparedTx::Next(tx)))
} else {
// This transaction is out of order, buffer it and return as Prepared
self.buffered.insert(idx, tx);
// Return a reference to the buffered transaction
// SAFETY: we just inserted at this index
let tx_ref = self.buffered.get(&idx).unwrap();
Some(Ok(PreparedTx::Prepared(tx_ref)))
}
match result {
Err(err) => Some(Err(err)),
Ok(tx) => {
if idx == self.current_idx {
// This is the next transaction in sequence
self.current_idx += 1;
Some(Ok(PreparedTx::Next(tx)))
} else {
// This transaction is out of order, buffer it and return as Prepared
self.buffered.insert(idx, tx);
// Return a reference to the buffered transaction
// SAFETY: we just inserted at this index
let tx_ref = self.buffered.get(&idx).unwrap();
Some(Ok(PreparedTx::Prepared(tx_ref)))
}
}
}
}
fn total_tx_count(&self) -> usize {
todo!()
}
}
// #[cfg(feature = "rayon")]
mod parallel {
use rayon::prelude::IntoParallelIterator;
use crate::{ConfigureEvm, TxEnvFor};
use crate::execute::{ExecutableTxFor, WithTxEnv};
pub struct RayonTxIter<Evm, Iter, Tx, Err>
where Evm: ConfigureEvm,
Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static,
{
from_tx: std::sync::mpsc::Receiver< (usize, Result<WithTxEnv<TxEnvFor<Evm>, Tx>, Err>)>
}
impl<Evm,Iter, Tx, Err> RayonTxIter<Evm, Iter, Tx, Err> {
fn spawn<I>(iter: I) -> Self
where I: IntoParallelIterator<Item = (usize, Result<WithTxEnv<TxEnvFor<Evm>, Tx>, Err>)>
{
let (from_tx, rx) = std::sync::mpsc::channel();
rayon::spawn(move || {
})
fn total_tx_count(&self) -> usize {
todo!()
}
}
}