diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 3b2bce7f74..27c63ed8ac 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -96,7 +96,15 @@ use reth_execution_types::ChangedAccount; use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718}; use reth_primitives_traits::Recovered; use rustc_hash::FxHashMap; -use std::{collections::HashSet, fmt, sync::Arc, time::Instant}; +use std::{ + collections::HashSet, + fmt, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Instant, +}; use tokio::sync::mpsc; use tracing::{debug, trace, warn}; mod events; @@ -144,6 +152,8 @@ where config: PoolConfig, /// Manages listeners for transaction state change events. event_listener: RwLock>, + /// Tracks whether any event listeners have ever been installed. + has_event_listeners: AtomicBool, /// Listeners for new _full_ pending transactions. pending_transaction_listener: RwLock>, /// Listeners for new transactions added to the pool. @@ -168,6 +178,7 @@ where identifiers: Default::default(), validator, event_listener: Default::default(), + has_event_listeners: AtomicBool::new(false), pool: RwLock::new(TxPool::new(ordering, config.clone())), pending_transaction_listener: Default::default(), transaction_listener: Default::default(), @@ -279,14 +290,53 @@ where /// If the pool contains the transaction, this adds a new listener that gets notified about /// transaction events. pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option { - self.get_pool_data() - .contains(&tx_hash) - .then(|| self.event_listener.write().subscribe(tx_hash)) + if !self.get_pool_data().contains(&tx_hash) { + return None + } + let mut listener = self.event_listener.write(); + let events = listener.subscribe(tx_hash); + self.mark_event_listener_installed(); + Some(events) } /// Adds a listener for all transaction events. pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents { - self.event_listener.write().subscribe_all() + let mut listener = self.event_listener.write(); + let events = listener.subscribe_all(); + self.mark_event_listener_installed(); + events + } + + #[inline] + fn has_event_listeners(&self) -> bool { + self.has_event_listeners.load(Ordering::Relaxed) + } + + #[inline] + fn mark_event_listener_installed(&self) { + self.has_event_listeners.store(true, Ordering::Relaxed); + } + + #[inline] + fn update_event_listener_state(&self, listener: &PoolEventBroadcast) { + if listener.is_empty() { + self.has_event_listeners.store(false, Ordering::Relaxed); + } + } + + #[inline] + fn with_event_listener(&self, emit: F) + where + F: FnOnce(&mut PoolEventBroadcast), + { + if !self.has_event_listeners() { + return + } + let mut listener = self.event_listener.write(); + if !listener.is_empty() { + emit(&mut listener); + } + self.update_event_listener_state(&listener); } /// Returns a read lock to the pool's data. @@ -554,13 +604,11 @@ where (Ok(AddedTransactionOutcome { hash, state }), Some(meta)) } TransactionValidationOutcome::Invalid(tx, err) => { - let mut listener = self.event_listener.write(); - listener.invalid(tx.hash()); + self.with_event_listener(|listener| listener.invalid(tx.hash())); (Err(PoolError::new(*tx.hash(), err)), None) } TransactionValidationOutcome::Error(tx_hash, err) => { - let mut listener = self.event_listener.write(); - listener.discarded(&tx_hash); + self.with_event_listener(|listener| listener.discarded(&tx_hash)); (Err(PoolError::other(tx_hash, err)), None) } } @@ -574,7 +622,9 @@ where ) -> PoolResult { let listener = { let mut listener = self.event_listener.write(); - listener.subscribe(tx.tx_hash()) + let events = listener.subscribe(tx.tx_hash()); + self.mark_event_listener_installed(); + events }; let mut results = self.add_transactions(origin, std::iter::once(tx)); results.pop().expect("result length is the same as the input")?; @@ -625,7 +675,7 @@ where if !discarded.is_empty() { // Delete any blobs associated with discarded blob transactions self.delete_discarded_blobs(discarded.iter()); - self.event_listener.write().discarded_many(&discarded); + self.with_event_listener(|listener| listener.discarded_many(&discarded)); let discarded_hashes = discarded.into_iter().map(|tx| *tx.hash()).collect::>(); @@ -803,9 +853,7 @@ where let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome; // broadcast specific transaction events - let mut listener = self.event_listener.write(); - - if !listener.is_empty() { + self.with_event_listener(|listener| { for tx in &mined { listener.mined(tx, block_hash); } @@ -815,7 +863,7 @@ where for tx in &discarded { listener.discarded(tx.hash()); } - } + }) } /// Notifies all listeners about the transaction movements. @@ -876,17 +924,14 @@ where } } - { - let mut listener = self.event_listener.write(); - if !listener.is_empty() { - for tx in &promoted { - listener.pending(tx.hash(), None); - } - for tx in &discarded { - listener.discarded(tx.hash()); - } + self.with_event_listener(|listener| { + for tx in &promoted { + listener.pending(tx.hash(), None); } - } + for tx in &discarded { + listener.discarded(tx.hash()); + } + }); if !discarded.is_empty() { // This deletes outdated blob txs from the blob store, based on the account's nonce. @@ -904,13 +949,7 @@ where /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool). pub fn notify_event_listeners(&self, tx: &AddedTransaction) { - let mut listener = self.event_listener.write(); - if listener.is_empty() { - // nothing to notify - return - } - - match tx { + self.with_event_listener(|listener| match tx { AddedTransaction::Pending(tx) => { let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx; @@ -928,7 +967,7 @@ where listener.replaced(replaced.clone(), *transaction.hash()); } } - } + }); } /// Returns an iterator that yields transactions that are ready to be included in the block. @@ -991,7 +1030,7 @@ where } let removed = self.pool.write().remove_transactions(hashes); - self.event_listener.write().discarded_many(&removed); + self.with_event_listener(|listener| listener.discarded_many(&removed)); removed } @@ -1007,11 +1046,11 @@ where } let removed = self.pool.write().remove_transactions_and_descendants(hashes); - let mut listener = self.event_listener.write(); - - for tx in &removed { - listener.discarded(tx.hash()); - } + self.with_event_listener(|listener| { + for tx in &removed { + listener.discarded(tx.hash()); + } + }); removed } @@ -1024,7 +1063,7 @@ where let sender_id = self.get_sender_id(sender); let removed = self.pool.write().remove_transactions_by_sender(sender_id); - self.event_listener.write().discarded_many(&removed); + self.with_event_listener(|listener| listener.discarded_many(&removed)); removed } @@ -1160,11 +1199,9 @@ where if txs.0.is_empty() { return } - let mut listener = self.event_listener.write(); - - if !listener.is_empty() { + self.with_event_listener(|listener| { txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers)); - } + }); } /// Number of transactions in the entire pool