perf: use RwLock for transaction pool listeners (#20398)

Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
This commit is contained in:
Matthias Seitz
2025-12-15 22:47:59 +01:00
committed by GitHub
parent b5e7a694d2
commit 179da26305

View File

@@ -145,9 +145,9 @@ where
/// Manages listeners for transaction state change events.
event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
/// Listeners for new _full_ pending transactions.
pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
/// Listeners for new transactions added to the pool.
transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
/// Listener for new blob transaction sidecars added to the pool.
blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
/// Metrics for the blob store
@@ -243,7 +243,12 @@ where
pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
let listener = PendingTransactionHashListener { sender, kind };
self.pending_transaction_listener.lock().push(listener);
let mut listeners = self.pending_transaction_listener.write();
// Clean up dead listeners before adding new one
listeners.retain(|l| !l.sender.is_closed());
listeners.push(listener);
rx
}
@@ -254,7 +259,12 @@ where
) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
let listener = TransactionListener { sender, kind };
self.transaction_listener.lock().push(listener);
let mut listeners = self.transaction_listener.write();
// Clean up dead listeners before adding new one
listeners.retain(|l| !l.sender.is_closed());
listeners.push(listener);
rx
}
/// Adds a new blob sidecar listener to the pool that gets notified about every new
@@ -638,11 +648,23 @@ where
/// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
/// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
let mut transaction_listeners = self.pending_transaction_listener.lock();
transaction_listeners.retain_mut(|listener| {
// broadcast all pending transactions to the listener
listener.send_all(pending.pending_transactions(listener.kind))
});
let mut needs_cleanup = false;
{
let listeners = self.pending_transaction_listener.read();
for listener in listeners.iter() {
if !listener.send_all(pending.pending_transactions(listener.kind)) {
needs_cleanup = true;
}
}
}
// Clean up dead listeners if we detected any closed channels
if needs_cleanup {
self.pending_transaction_listener
.write()
.retain(|listener| !listener.sender.is_closed());
}
}
/// Notify all listeners about a newly inserted pending transaction.
@@ -654,16 +676,29 @@ where
/// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
/// [`TransactionPool::new_transactions_listener_for`](crate::TransactionPool).
pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
let mut transaction_listeners = self.transaction_listener.lock();
transaction_listeners.retain_mut(|listener| {
if listener.kind.is_propagate_only() && !event.transaction.propagate {
// only emit this hash to listeners that are only allowed to receive propagate only
// transactions, such as network
return !listener.sender.is_closed()
}
let mut needs_cleanup = false;
listener.send(event.clone())
});
{
let listeners = self.transaction_listener.read();
for listener in listeners.iter() {
if listener.kind.is_propagate_only() && !event.transaction.propagate {
if listener.sender.is_closed() {
needs_cleanup = true;
}
// Skip non-propagate transactions for propagate-only listeners
continue
}
if !listener.send(event.clone()) {
needs_cleanup = true;
}
}
}
// Clean up dead listeners if we detected any closed channels
if needs_cleanup {
self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
}
}
/// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
@@ -697,16 +732,33 @@ where
fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
// notify about promoted pending transactions
// emit hashes
self.pending_transaction_listener
.lock()
.retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
// notify about promoted pending transactions - emit hashes
let mut needs_pending_cleanup = false;
{
let listeners = self.pending_transaction_listener.read();
for listener in listeners.iter() {
if !listener.send_all(outcome.pending_transactions(listener.kind)) {
needs_pending_cleanup = true;
}
}
}
if needs_pending_cleanup {
self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
}
// emit full transactions
self.transaction_listener.lock().retain_mut(|listener| {
listener.send_all(outcome.full_pending_transactions(listener.kind))
});
let mut needs_tx_cleanup = false;
{
let listeners = self.transaction_listener.read();
for listener in listeners.iter() {
if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
needs_tx_cleanup = true;
}
}
}
if needs_tx_cleanup {
self.transaction_listener.write().retain(|l| !l.sender.is_closed());
}
let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
@@ -742,28 +794,46 @@ where
) {
// Notify about promoted pending transactions (similar to notify_on_new_state)
if !promoted.is_empty() {
self.pending_transaction_listener.lock().retain_mut(|listener| {
let promoted_hashes = promoted.iter().filter_map(|tx| {
if listener.kind.is_propagate_only() && !tx.propagate {
None
} else {
Some(*tx.hash())
let mut needs_pending_cleanup = false;
{
let listeners = self.pending_transaction_listener.read();
for listener in listeners.iter() {
let promoted_hashes = promoted.iter().filter_map(|tx| {
if listener.kind.is_propagate_only() && !tx.propagate {
None
} else {
Some(*tx.hash())
}
});
if !listener.send_all(promoted_hashes) {
needs_pending_cleanup = true;
}
});
listener.send_all(promoted_hashes)
});
}
}
if needs_pending_cleanup {
self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
}
// in this case we should also emit promoted transactions in full
self.transaction_listener.lock().retain_mut(|listener| {
let promoted_txs = promoted.iter().filter_map(|tx| {
if listener.kind.is_propagate_only() && !tx.propagate {
None
} else {
Some(NewTransactionEvent::pending(tx.clone()))
let mut needs_tx_cleanup = false;
{
let listeners = self.transaction_listener.read();
for listener in listeners.iter() {
let promoted_txs = promoted.iter().filter_map(|tx| {
if listener.kind.is_propagate_only() && !tx.propagate {
None
} else {
Some(NewTransactionEvent::pending(tx.clone()))
}
});
if !listener.send_all(promoted_txs) {
needs_tx_cleanup = true;
}
});
listener.send_all(promoted_txs)
});
}
}
if needs_tx_cleanup {
self.transaction_listener.write().retain(|l| !l.sender.is_closed());
}
}
{