mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-09 07:17:56 -05:00
perf: defer transaction pool notifications until after lock release (#20405)
This commit is contained in:
@@ -485,6 +485,9 @@ where
|
||||
|
||||
/// Add a single validated transaction into the pool.
|
||||
///
|
||||
/// Returns the outcome and optionally metadata to be processed after the pool lock is
|
||||
/// released.
|
||||
///
|
||||
/// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
|
||||
/// come in through that function, either as a batch or `std::iter::once`.
|
||||
fn add_transaction(
|
||||
@@ -492,7 +495,7 @@ where
|
||||
pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
|
||||
origin: TransactionOrigin,
|
||||
tx: TransactionValidationOutcome<T::Transaction>,
|
||||
) -> PoolResult<AddedTransactionOutcome> {
|
||||
) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
|
||||
match tx {
|
||||
TransactionValidationOutcome::Valid {
|
||||
balance,
|
||||
@@ -506,7 +509,7 @@ where
|
||||
let transaction_id = TransactionId::new(sender_id, transaction.nonce());
|
||||
|
||||
// split the valid transaction and the blob sidecar if it has any
|
||||
let (transaction, maybe_sidecar) = match transaction {
|
||||
let (transaction, blob_sidecar) = match transaction {
|
||||
ValidTransaction::Valid(tx) => (tx, None),
|
||||
ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
|
||||
debug_assert!(
|
||||
@@ -526,50 +529,26 @@ where
|
||||
authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
|
||||
};
|
||||
|
||||
let added = pool.add_transaction(tx, balance, state_nonce, bytecode_hash)?;
|
||||
let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
|
||||
Ok(added) => added,
|
||||
Err(err) => return (Err(err), None),
|
||||
};
|
||||
let hash = *added.hash();
|
||||
let state = added.transaction_state();
|
||||
|
||||
// transaction was successfully inserted into the pool
|
||||
if let Some(sidecar) = maybe_sidecar {
|
||||
// notify blob sidecar listeners
|
||||
self.on_new_blob_sidecar(&hash, &sidecar);
|
||||
// store the sidecar in the blob store
|
||||
self.insert_blob(hash, sidecar);
|
||||
}
|
||||
let meta = AddedTransactionMeta { added, blob_sidecar };
|
||||
|
||||
if let Some(replaced) = added.replaced_blob_transaction() {
|
||||
debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
|
||||
// delete the replaced transaction from the blob store
|
||||
self.delete_blob(replaced);
|
||||
}
|
||||
|
||||
// Notify about new pending transactions
|
||||
if let Some(pending) = added.as_pending() {
|
||||
self.on_new_pending_transaction(pending);
|
||||
}
|
||||
|
||||
// Notify tx event listeners
|
||||
self.notify_event_listeners(&added);
|
||||
|
||||
if let Some(discarded) = added.discarded_transactions() {
|
||||
self.delete_discarded_blobs(discarded.iter());
|
||||
}
|
||||
|
||||
// Notify listeners for _all_ transactions
|
||||
self.on_new_transaction(added.into_new_transaction_event());
|
||||
|
||||
Ok(AddedTransactionOutcome { hash, state })
|
||||
(Ok(AddedTransactionOutcome { hash, state }), Some(meta))
|
||||
}
|
||||
TransactionValidationOutcome::Invalid(tx, err) => {
|
||||
let mut listener = self.event_listener.write();
|
||||
listener.invalid(tx.hash());
|
||||
Err(PoolError::new(*tx.hash(), err))
|
||||
(Err(PoolError::new(*tx.hash(), err)), None)
|
||||
}
|
||||
TransactionValidationOutcome::Error(tx_hash, err) => {
|
||||
let mut listener = self.event_listener.write();
|
||||
listener.discarded(&tx_hash);
|
||||
Err(PoolError::other(tx_hash, err))
|
||||
(Err(PoolError::other(tx_hash, err)), None)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -590,33 +569,46 @@ where
|
||||
}
|
||||
|
||||
/// Adds all transactions in the iterator to the pool, returning a list of results.
|
||||
///
|
||||
/// Note: A large batch may lock the pool for a long time that blocks important operations
|
||||
/// like updating the pool on canonical state changes. The caller should consider having
|
||||
/// a max batch size to balance transaction insertions with other updates.
|
||||
pub fn add_transactions(
|
||||
&self,
|
||||
origin: TransactionOrigin,
|
||||
transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
|
||||
) -> Vec<PoolResult<AddedTransactionOutcome>> {
|
||||
// Process all transactions in one write lock, maintaining individual origins
|
||||
let (mut added, discarded) = {
|
||||
// Collect results and metadata while holding the pool write lock
|
||||
let (mut results, added_metas, discarded) = {
|
||||
let mut pool = self.pool.write();
|
||||
let added = transactions
|
||||
let mut added_metas = Vec::new();
|
||||
|
||||
let results = transactions
|
||||
.into_iter()
|
||||
.map(|tx| self.add_transaction(&mut pool, origin, tx))
|
||||
.map(|tx| {
|
||||
let (result, meta) = self.add_transaction(&mut pool, origin, tx);
|
||||
|
||||
// Only collect metadata for successful insertions
|
||||
if result.is_ok() &&
|
||||
let Some(meta) = meta
|
||||
{
|
||||
added_metas.push(meta);
|
||||
}
|
||||
|
||||
result
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Enforce the pool size limits if at least one transaction was added successfully
|
||||
let discarded = if added.iter().any(Result::is_ok) {
|
||||
let discarded = if results.iter().any(Result::is_ok) {
|
||||
pool.discard_worst()
|
||||
} else {
|
||||
Default::default()
|
||||
};
|
||||
|
||||
(added, discarded)
|
||||
(results, added_metas, discarded)
|
||||
};
|
||||
|
||||
for meta in added_metas {
|
||||
self.on_added_transaction(meta);
|
||||
}
|
||||
|
||||
if !discarded.is_empty() {
|
||||
// Delete any blobs associated with discarded blob transactions
|
||||
self.delete_discarded_blobs(discarded.iter());
|
||||
@@ -627,7 +619,7 @@ where
|
||||
|
||||
// A newly added transaction may be immediately discarded, so we need to
|
||||
// adjust the result here
|
||||
for res in &mut added {
|
||||
for res in &mut results {
|
||||
if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
|
||||
discarded_hashes.contains(hash)
|
||||
{
|
||||
@@ -636,7 +628,42 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
added
|
||||
results
|
||||
}
|
||||
|
||||
/// Process a transaction that was added to the pool.
|
||||
///
|
||||
/// Performs blob storage operations and sends all notifications. This should be called
|
||||
/// after the pool write lock has been released to avoid blocking pool operations.
|
||||
fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
|
||||
// Handle blob sidecar storage and notifications for EIP-4844 transactions
|
||||
if let Some(sidecar) = meta.blob_sidecar {
|
||||
let hash = *meta.added.hash();
|
||||
self.on_new_blob_sidecar(&hash, &sidecar);
|
||||
self.insert_blob(hash, sidecar);
|
||||
}
|
||||
|
||||
// Delete replaced blob sidecar if any
|
||||
if let Some(replaced) = meta.added.replaced_blob_transaction() {
|
||||
debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
|
||||
self.delete_blob(replaced);
|
||||
}
|
||||
|
||||
// Delete discarded blob sidecars if any, this doesnt do any IO.
|
||||
if let Some(discarded) = meta.added.discarded_transactions() {
|
||||
self.delete_discarded_blobs(discarded.iter());
|
||||
}
|
||||
|
||||
// Notify pending transaction listeners
|
||||
if let Some(pending) = meta.added.as_pending() {
|
||||
self.on_new_pending_transaction(pending);
|
||||
}
|
||||
|
||||
// Notify event listeners
|
||||
self.notify_event_listeners(&meta.added);
|
||||
|
||||
// Notify new transaction listeners
|
||||
self.on_new_transaction(meta.added.into_new_transaction_event());
|
||||
}
|
||||
|
||||
/// Notify all listeners about a new pending transaction.
|
||||
@@ -1195,6 +1222,18 @@ impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Metadata for a transaction that was added to the pool.
|
||||
///
|
||||
/// This holds all the data needed to complete post-insertion operations (notifications,
|
||||
/// blob storage).
|
||||
#[derive(Debug)]
|
||||
struct AddedTransactionMeta<T: PoolTransaction> {
|
||||
/// The transaction that was added to the pool
|
||||
added: AddedTransaction<T>,
|
||||
/// Optional blob sidecar for EIP-4844 transactions
|
||||
blob_sidecar: Option<BlobTransactionSidecarVariant>,
|
||||
}
|
||||
|
||||
/// Tracks an added transaction and all graph changes caused by adding it.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AddedPendingTransaction<T: PoolTransaction> {
|
||||
|
||||
Reference in New Issue
Block a user