feat: avoid mutex locking (#20678)

This commit is contained in:
Karl Yu
2025-12-30 17:28:40 +08:00
committed by GitHub
parent 5053322711
commit 16f75bb0c3

View File

@@ -96,7 +96,15 @@ use reth_execution_types::ChangedAccount;
use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
use reth_primitives_traits::Recovered;
use rustc_hash::FxHashMap;
use std::{collections::HashSet, fmt, sync::Arc, time::Instant};
use std::{
collections::HashSet,
fmt,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Instant,
};
use tokio::sync::mpsc;
use tracing::{debug, trace, warn};
mod events;
@@ -144,6 +152,8 @@ where
config: PoolConfig,
/// Manages listeners for transaction state change events.
event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
/// Tracks whether any event listeners have ever been installed.
has_event_listeners: AtomicBool,
/// Listeners for new _full_ pending transactions.
pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
/// Listeners for new transactions added to the pool.
@@ -168,6 +178,7 @@ where
identifiers: Default::default(),
validator,
event_listener: Default::default(),
has_event_listeners: AtomicBool::new(false),
pool: RwLock::new(TxPool::new(ordering, config.clone())),
pending_transaction_listener: Default::default(),
transaction_listener: Default::default(),
@@ -279,14 +290,53 @@ where
/// If the pool contains the transaction, this adds a new listener that gets notified about
/// transaction events.
pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
self.get_pool_data()
.contains(&tx_hash)
.then(|| self.event_listener.write().subscribe(tx_hash))
if !self.get_pool_data().contains(&tx_hash) {
return None
}
let mut listener = self.event_listener.write();
let events = listener.subscribe(tx_hash);
self.mark_event_listener_installed();
Some(events)
}
/// Adds a listener for all transaction events.
pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
self.event_listener.write().subscribe_all()
let mut listener = self.event_listener.write();
let events = listener.subscribe_all();
self.mark_event_listener_installed();
events
}
#[inline]
fn has_event_listeners(&self) -> bool {
self.has_event_listeners.load(Ordering::Relaxed)
}
#[inline]
fn mark_event_listener_installed(&self) {
self.has_event_listeners.store(true, Ordering::Relaxed);
}
#[inline]
fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
if listener.is_empty() {
self.has_event_listeners.store(false, Ordering::Relaxed);
}
}
#[inline]
fn with_event_listener<F>(&self, emit: F)
where
F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
{
if !self.has_event_listeners() {
return
}
let mut listener = self.event_listener.write();
if !listener.is_empty() {
emit(&mut listener);
}
self.update_event_listener_state(&listener);
}
/// Returns a read lock to the pool's data.
@@ -554,13 +604,11 @@ where
(Ok(AddedTransactionOutcome { hash, state }), Some(meta))
}
TransactionValidationOutcome::Invalid(tx, err) => {
let mut listener = self.event_listener.write();
listener.invalid(tx.hash());
self.with_event_listener(|listener| listener.invalid(tx.hash()));
(Err(PoolError::new(*tx.hash(), err)), None)
}
TransactionValidationOutcome::Error(tx_hash, err) => {
let mut listener = self.event_listener.write();
listener.discarded(&tx_hash);
self.with_event_listener(|listener| listener.discarded(&tx_hash));
(Err(PoolError::other(tx_hash, err)), None)
}
}
@@ -574,7 +622,9 @@ where
) -> PoolResult<TransactionEvents> {
let listener = {
let mut listener = self.event_listener.write();
listener.subscribe(tx.tx_hash())
let events = listener.subscribe(tx.tx_hash());
self.mark_event_listener_installed();
events
};
let mut results = self.add_transactions(origin, std::iter::once(tx));
results.pop().expect("result length is the same as the input")?;
@@ -625,7 +675,7 @@ where
if !discarded.is_empty() {
// Delete any blobs associated with discarded blob transactions
self.delete_discarded_blobs(discarded.iter());
self.event_listener.write().discarded_many(&discarded);
self.with_event_listener(|listener| listener.discarded_many(&discarded));
let discarded_hashes =
discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
@@ -803,9 +853,7 @@ where
let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
// broadcast specific transaction events
let mut listener = self.event_listener.write();
if !listener.is_empty() {
self.with_event_listener(|listener| {
for tx in &mined {
listener.mined(tx, block_hash);
}
@@ -815,7 +863,7 @@ where
for tx in &discarded {
listener.discarded(tx.hash());
}
}
})
}
/// Notifies all listeners about the transaction movements.
@@ -876,17 +924,14 @@ where
}
}
{
let mut listener = self.event_listener.write();
if !listener.is_empty() {
for tx in &promoted {
listener.pending(tx.hash(), None);
}
for tx in &discarded {
listener.discarded(tx.hash());
}
self.with_event_listener(|listener| {
for tx in &promoted {
listener.pending(tx.hash(), None);
}
}
for tx in &discarded {
listener.discarded(tx.hash());
}
});
if !discarded.is_empty() {
// This deletes outdated blob txs from the blob store, based on the account's nonce.
@@ -904,13 +949,7 @@ where
/// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
/// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
let mut listener = self.event_listener.write();
if listener.is_empty() {
// nothing to notify
return
}
match tx {
self.with_event_listener(|listener| match tx {
AddedTransaction::Pending(tx) => {
let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
@@ -928,7 +967,7 @@ where
listener.replaced(replaced.clone(), *transaction.hash());
}
}
}
});
}
/// Returns an iterator that yields transactions that are ready to be included in the block.
@@ -991,7 +1030,7 @@ where
}
let removed = self.pool.write().remove_transactions(hashes);
self.event_listener.write().discarded_many(&removed);
self.with_event_listener(|listener| listener.discarded_many(&removed));
removed
}
@@ -1007,11 +1046,11 @@ where
}
let removed = self.pool.write().remove_transactions_and_descendants(hashes);
let mut listener = self.event_listener.write();
for tx in &removed {
listener.discarded(tx.hash());
}
self.with_event_listener(|listener| {
for tx in &removed {
listener.discarded(tx.hash());
}
});
removed
}
@@ -1024,7 +1063,7 @@ where
let sender_id = self.get_sender_id(sender);
let removed = self.pool.write().remove_transactions_by_sender(sender_id);
self.event_listener.write().discarded_many(&removed);
self.with_event_listener(|listener| listener.discarded_many(&removed));
removed
}
@@ -1160,11 +1199,9 @@ where
if txs.0.is_empty() {
return
}
let mut listener = self.event_listener.write();
if !listener.is_empty() {
self.with_event_listener(|listener| {
txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
}
});
}
/// Number of transactions in the entire pool