diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 46a744cfb8..8689dcc377 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -156,7 +156,7 @@ pub use crate::{ }, error::PoolResult, ordering::{GasCostOrdering, TransactionOrdering}, - pool::{AllTransactionsEvents, PoolTransactionEvent, TransactionEvent, TransactionEvents}, + pool::{AllTransactionsEvents, FullTransactionEvent, TransactionEvent, TransactionEvents}, traits::{ AllPoolTransactions, BestTransactions, BlockInfo, CanonicalStateUpdate, ChangedAccount, NewTransactionEvent, PoolSize, PoolTransaction, PooledTransaction, PropagateKind, @@ -354,7 +354,7 @@ where self.pool.add_transaction_event_listener(tx_hash) } - fn all_transactions_event_listener(&self) -> AllTransactionsEvents { + fn all_transactions_event_listener(&self) -> AllTransactionsEvents { self.pool.add_all_transactions_event_listener() } diff --git a/crates/transaction-pool/src/noop.rs b/crates/transaction-pool/src/noop.rs index 50be4a2bcf..2fec5391c1 100644 --- a/crates/transaction-pool/src/noop.rs +++ b/crates/transaction-pool/src/noop.rs @@ -73,7 +73,7 @@ impl TransactionPool for NoopTransactionPool { None } - fn all_transactions_event_listener(&self) -> AllTransactionsEvents { + fn all_transactions_event_listener(&self) -> AllTransactionsEvents { AllTransactionsEvents { events: mpsc::channel(1).1 } } diff --git a/crates/transaction-pool/src/pool/events.rs b/crates/transaction-pool/src/pool/events.rs index 62da13392e..03856aea27 100644 --- a/crates/transaction-pool/src/pool/events.rs +++ b/crates/transaction-pool/src/pool/events.rs @@ -1,33 +1,49 @@ -use crate::traits::PropagateKind; +use crate::{traits::PropagateKind, PoolTransaction, ValidPoolTransaction}; use reth_primitives::{TxHash, H256}; use std::sync::Arc; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -/// Wrapper around a transaction hash and the event that happened to it. +/// An event that happened to a transaction and contains its full body where possible. #[derive(Debug)] -pub struct PoolTransactionEvent(TxHash, TransactionEvent); +pub enum FullTransactionEvent { + /// Transaction has been added to the pending pool. + Pending(TxHash), + /// Transaction has been added to the queued pool. + Queued(TxHash), + /// Transaction has been included in the block belonging to this hash. + Mined { + /// The hash of the mined transaction. + tx_hash: TxHash, + /// The hash of the mined block that contains the transaction. + block_hash: H256, + }, + /// Transaction has been replaced by the transaction belonging to the hash. + /// + /// E.g. same (sender + nonce) pair + Replaced { + /// The transaction that was replaced. + transaction: Arc>, + /// The transaction that replaced the event subject. + replaced_by: TxHash, + }, + /// Transaction was dropped due to configured limits. + Discarded(TxHash), + /// Transaction became invalid indefinitely. + Invalid(TxHash), + /// Transaction was propagated to peers. + Propagated(Arc>), +} -impl PoolTransactionEvent { - /// Create a new transaction event. - pub fn new(hash: TxHash, event: TransactionEvent) -> Self { - Self(hash, event) - } - - /// The hash of the transaction this event is about. - pub fn hash(&self) -> TxHash { - self.0 - } - - /// The event that happened to the transaction. - pub fn event(&self) -> &TransactionEvent { - &self.1 - } - - /// Split the event into its components. - pub fn split(self) -> (TxHash, TransactionEvent) { - (self.0, self.1) +impl Clone for FullTransactionEvent { + fn clone(&self) -> Self { + match self { + Self::Replaced { transaction, replaced_by } => { + Self::Replaced { transaction: Arc::clone(transaction), replaced_by: *replaced_by } + } + other => other.clone(), + } } } diff --git a/crates/transaction-pool/src/pool/listener.rs b/crates/transaction-pool/src/pool/listener.rs index 994c1fac5c..c64fa71f5e 100644 --- a/crates/transaction-pool/src/pool/listener.rs +++ b/crates/transaction-pool/src/pool/listener.rs @@ -1,8 +1,9 @@ //! Listeners for the transaction-pool use crate::{ - pool::events::{PoolTransactionEvent, TransactionEvent}, + pool::events::{FullTransactionEvent, TransactionEvent}, traits::PropagateKind, + PoolTransaction, ValidPoolTransaction, }; use futures_util::Stream; use reth_primitives::{TxHash, H256}; @@ -24,7 +25,7 @@ const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024; #[must_use = "streams do nothing unless polled"] pub struct TransactionEvents { hash: TxHash, - events: UnboundedReceiver, + events: UnboundedReceiver, } impl TransactionEvents { @@ -35,7 +36,7 @@ impl TransactionEvents { } impl Stream for TransactionEvents { - type Item = PoolTransactionEvent; + type Item = TransactionEvent; fn poll_next( self: std::pin::Pin<&mut Self>, @@ -45,15 +46,15 @@ impl Stream for TransactionEvents { } } -/// A Stream that receives [PoolTransactionEvent] for _all_ transaction. +/// A Stream that receives [FullTransactionEvent] for _all_ transaction. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] -pub struct AllTransactionsEvents { - pub(crate) events: Receiver, +pub struct AllTransactionsEvents { + pub(crate) events: Receiver>, } -impl Stream for AllTransactionsEvents { - type Item = PoolTransactionEvent; +impl Stream for AllTransactionsEvents { + type Item = FullTransactionEvent; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.get_mut().events.poll_recv(cx) @@ -64,20 +65,34 @@ impl Stream for AllTransactionsEvents { /// /// This is essentially a multi-producer, multi-consumer channel where each event is broadcast to /// all active receivers. -#[derive(Default, Debug)] -pub(crate) struct PoolEventBroadcast { +#[derive(Debug)] +pub(crate) struct PoolEventBroadcast { /// All listeners for all transaction events. - all_events_broadcaster: AllPoolEventsBroadcaster, + all_events_broadcaster: AllPoolEventsBroadcaster, /// All listeners for events for a certain transaction hash. broadcasters_by_hash: HashMap, } -impl PoolEventBroadcast { +impl Default for PoolEventBroadcast { + fn default() -> Self { + Self { + all_events_broadcaster: AllPoolEventsBroadcaster::default(), + broadcasters_by_hash: HashMap::default(), + } + } +} + +impl PoolEventBroadcast { /// Calls the broadcast callback with the `PoolEventBroadcaster` that belongs to the hash. - fn broadcast_event(&mut self, hash: &TxHash, event: TransactionEvent) { + fn broadcast_event( + &mut self, + hash: &TxHash, + event: TransactionEvent, + pool_event: FullTransactionEvent, + ) { // Broadcast to all listeners for the transaction hash. if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) { - sink.get_mut().broadcast(*hash, event.clone()); + sink.get_mut().broadcast(event.clone()); if sink.get().is_empty() || event.is_final() { sink.remove(); @@ -85,7 +100,7 @@ impl PoolEventBroadcast { } // Broadcast to all listeners for all transactions. - self.all_events_broadcaster.broadcast(*hash, event); + self.all_events_broadcaster.broadcast(pool_event); } /// Create a new subscription for the given transaction hash. @@ -104,65 +119,83 @@ impl PoolEventBroadcast { } /// Create a new subscription for all transactions. - pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents { + pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents { let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE); self.all_events_broadcaster.senders.push(tx); AllTransactionsEvents { events: rx } } /// Notify listeners about a transaction that was added to the pending queue. - pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<&TxHash>) { - self.broadcast_event(tx, TransactionEvent::Pending); + pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option>>) { + self.broadcast_event(tx, TransactionEvent::Pending, FullTransactionEvent::Pending(*tx)); if let Some(replaced) = replaced { // notify listeners that this transaction was replaced - self.replaced(replaced, tx); + self.replaced(replaced, *tx); } } /// Notify listeners about a transaction that was replaced. - pub(crate) fn replaced(&mut self, tx: &TxHash, replaced_by: &TxHash) { - self.broadcast_event(tx, TransactionEvent::Replaced(*replaced_by)); + pub(crate) fn replaced(&mut self, tx: Arc>, replaced_by: TxHash) { + let transaction = Arc::clone(&tx); + self.broadcast_event( + tx.hash(), + TransactionEvent::Replaced(replaced_by), + FullTransactionEvent::Replaced { transaction, replaced_by }, + ); } /// Notify listeners about a transaction that was added to the queued pool. pub(crate) fn queued(&mut self, tx: &TxHash) { - self.broadcast_event(tx, TransactionEvent::Queued); + self.broadcast_event(tx, TransactionEvent::Queued, FullTransactionEvent::Queued(*tx)); } /// Notify listeners about a transaction that was propagated. pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec) { - self.broadcast_event(tx, TransactionEvent::Propagated(Arc::new(peers))); + let peers = Arc::new(peers); + self.broadcast_event( + tx, + TransactionEvent::Propagated(Arc::clone(&peers)), + FullTransactionEvent::Propagated(peers), + ); } /// Notify listeners about a transaction that was discarded. pub(crate) fn discarded(&mut self, tx: &TxHash) { - self.broadcast_event(tx, TransactionEvent::Discarded); + self.broadcast_event(tx, TransactionEvent::Discarded, FullTransactionEvent::Discarded(*tx)); } /// Notify listeners that the transaction was mined pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: H256) { - self.broadcast_event(tx, TransactionEvent::Mined(block_hash)); + self.broadcast_event( + tx, + TransactionEvent::Mined(block_hash), + FullTransactionEvent::Mined { tx_hash: *tx, block_hash }, + ); } } /// All Sender half(s) of the event channels for all transactions. /// /// This mimics [tokio::sync::broadcast] but uses separate channels. -#[derive(Default, Debug)] -struct AllPoolEventsBroadcaster { +#[derive(Debug)] +struct AllPoolEventsBroadcaster { /// Corresponding sender half(s) for event listener channel - senders: Vec>, + senders: Vec>>, } -impl AllPoolEventsBroadcaster { +impl Default for AllPoolEventsBroadcaster { + fn default() -> Self { + Self { senders: Vec::new() } + } +} + +impl AllPoolEventsBroadcaster { // Broadcast an event to all listeners. Dropped listeners are silently evicted. - fn broadcast(&mut self, tx_hash: TxHash, event: TransactionEvent) { - self.senders.retain(|sender| { - match sender.try_send(PoolTransactionEvent::new(tx_hash, event.clone())) { - Ok(_) | Err(TrySendError::Full(_)) => true, - Err(TrySendError::Closed(_)) => false, - } + fn broadcast(&mut self, event: FullTransactionEvent) { + self.senders.retain(|sender| match sender.try_send(event.clone()) { + Ok(_) | Err(TrySendError::Full(_)) => true, + Err(TrySendError::Closed(_)) => false, }) } } @@ -173,7 +206,7 @@ impl AllPoolEventsBroadcaster { #[derive(Default, Debug)] struct PoolEventBroadcaster { /// Corresponding sender half(s) for event listener channel - senders: Vec>, + senders: Vec>, } impl PoolEventBroadcaster { @@ -183,8 +216,7 @@ impl PoolEventBroadcaster { } // Broadcast an event to all listeners. Dropped listeners are silently evicted. - fn broadcast(&mut self, tx_hash: TxHash, event: TransactionEvent) { - self.senders - .retain(|sender| sender.send(PoolTransactionEvent::new(tx_hash, event.clone())).is_ok()) + fn broadcast(&mut self, event: TransactionEvent) { + self.senders.retain(|sender| sender.send(event.clone()).is_ok()) } } diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 23590d7416..6b16c26470 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -93,7 +93,7 @@ use tokio::sync::mpsc; use tracing::debug; mod events; -pub use events::{PoolTransactionEvent, TransactionEvent}; +pub use events::{FullTransactionEvent, TransactionEvent}; mod listener; pub use listener::{AllTransactionsEvents, TransactionEvents}; @@ -117,7 +117,7 @@ pub struct PoolInner { /// Pool settings. config: PoolConfig, /// Manages listeners for transaction state change events. - event_listener: RwLock, + event_listener: RwLock>, /// Listeners for new ready transactions. pending_transaction_listener: Mutex>>, /// Listeners for new transactions added to the pool. @@ -223,7 +223,9 @@ where } /// Adds a listener for all transaction events. - pub(crate) fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents { + pub(crate) fn add_all_transactions_event_listener( + &self, + ) -> AllTransactionsEvents { self.event_listener.write().subscribe_all() } @@ -417,14 +419,14 @@ where AddedTransaction::Pending(tx) => { let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx; - listener.pending(transaction.hash(), replaced.as_ref().map(|tx| tx.hash())); + listener.pending(transaction.hash(), replaced.clone()); promoted.iter().for_each(|tx| listener.pending(tx, None)); discarded.iter().for_each(|tx| listener.discarded(tx)); } AddedTransaction::Parked { transaction, replaced, .. } => { listener.queued(transaction.hash()); if let Some(replaced) = replaced { - listener.replaced(replaced.hash(), transaction.hash()); + listener.replaced(replaced.clone(), *transaction.hash()); } } } diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 6e5ebe197d..afbfb47a34 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -96,7 +96,7 @@ pub trait TransactionPool: Send + Sync + Clone { fn transaction_event_listener(&self, tx_hash: TxHash) -> Option; /// Returns a new transaction change event stream for _all_ transactions in the pool. - fn all_transactions_event_listener(&self) -> AllTransactionsEvents; + fn all_transactions_event_listener(&self) -> AllTransactionsEvents; /// Returns a new Stream that yields transactions hashes for new ready transactions. ///