diff --git a/crates/transaction-pool/src/pool/best.rs b/crates/transaction-pool/src/pool/best.rs index 663274e76c..8b7468c410 100644 --- a/crates/transaction-pool/src/pool/best.rs +++ b/crates/transaction-pool/src/pool/best.rs @@ -2,11 +2,13 @@ use crate::{ identifier::TransactionId, pool::pending::PendingTransaction, PoolTransaction, TransactionOrdering, ValidPoolTransaction, }; +use core::fmt; use reth_primitives::B256 as TxHash; use std::{ collections::{BTreeMap, BTreeSet, HashSet}, sync::Arc, }; + use tokio::sync::broadcast::{error::TryRecvError, Receiver}; use tracing::debug; @@ -190,6 +192,70 @@ impl Iterator for BestTransactions { } } +/// A[`BestTransactions`](crate::traits::BestTransactions) implementation that filters the +/// transactions of iter with predicate. +/// +/// Filter out transactions are marked as invalid: +/// [BestTransactions::mark_invalid](crate::traits::BestTransactions::mark_invalid). +pub struct BestTransactionFilter { + pub(crate) best: I, + pub(crate) predicate: P, +} + +impl BestTransactionFilter { + /// Create a new [`BestTransactionFilter`] with the given predicate. + pub(crate) fn new(best: I, predicate: P) -> Self { + Self { best, predicate } + } +} + +impl Iterator for BestTransactionFilter +where + I: crate::traits::BestTransactions, + P: FnMut(&::Item) -> bool, +{ + type Item = ::Item; + + fn next(&mut self) -> Option { + loop { + let best = self.best.next()?; + if (self.predicate)(&best) { + return Some(best); + } else { + self.best.mark_invalid(&best); + } + } + } +} + +impl crate::traits::BestTransactions for BestTransactionFilter +where + I: crate::traits::BestTransactions, + P: FnMut(&::Item) -> bool + Send, +{ + fn mark_invalid(&mut self, tx: &Self::Item) { + crate::traits::BestTransactions::mark_invalid(&mut self.best, tx) + } + + fn no_updates(&mut self) { + self.best.no_updates() + } + + fn skip_blobs(&mut self) { + self.set_skip_blobs(true) + } + + fn set_skip_blobs(&mut self, skip_blobs: bool) { + self.best.set_skip_blobs(skip_blobs) + } +} + +impl fmt::Debug for BestTransactionFilter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BestTransactionFilter").field("best", &self.best).finish() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 0bd0cb33b2..894f0a9b3f 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -96,9 +96,6 @@ use std::{ use tokio::sync::mpsc; use tracing::{debug, trace, warn}; mod events; -pub use events::{FullTransactionEvent, TransactionEvent}; - -mod listener; use crate::{ blobstore::BlobStore, metrics::BlobStoreMetrics, @@ -107,13 +104,16 @@ use crate::{ validate::ValidTransaction, }; use alloy_rlp::Encodable; +pub use best::BestTransactionFilter; +pub use blob::{blob_tx_priority, fee_delta}; +pub use events::{FullTransactionEvent, TransactionEvent}; pub use listener::{AllTransactionsEvents, TransactionEvents}; pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool}; pub use pending::PendingPool; mod best; mod blob; -pub use blob::{blob_tx_priority, fee_delta}; +mod listener; mod parked; pub(crate) mod pending; pub(crate) mod size; diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index b83b792568..2e7518eefe 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -1,7 +1,7 @@ use crate::{ blobstore::BlobStoreError, error::PoolResult, - pool::{state::SubPool, TransactionEvents}, + pool::{state::SubPool, BestTransactionFilter, TransactionEvents}, validate::ValidPoolTransaction, AllTransactionsEvents, }; @@ -655,6 +655,20 @@ pub trait BestTransactions: Iterator + Send { /// /// If set to true, no blob transactions will be returned. fn set_skip_blobs(&mut self, skip_blobs: bool); + + /// Creates an iterator which uses a closure to determine if a transaction should be yielded. + /// + /// Given an element the closure must return true or false. The returned iterator will yield + /// only the elements for which the closure returns true. + /// + /// Descendant transactions will be skipped. + fn filter

(self, predicate: P) -> BestTransactionFilter + where + P: FnMut(&Self::Item) -> bool, + Self: Sized, + { + BestTransactionFilter::new(self, predicate) + } } /// A no-op implementation that yields no transactions.