diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 19c9ee52b2..4d7e450424 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -485,6 +485,9 @@ where /// Add a single validated transaction into the pool. /// + /// Returns the outcome and optionally metadata to be processed after the pool lock is + /// released. + /// /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s) /// come in through that function, either as a batch or `std::iter::once`. fn add_transaction( @@ -492,7 +495,7 @@ where pool: &mut RwLockWriteGuard<'_, TxPool>, origin: TransactionOrigin, tx: TransactionValidationOutcome, - ) -> PoolResult { + ) -> (PoolResult, Option>) { match tx { TransactionValidationOutcome::Valid { balance, @@ -506,7 +509,7 @@ where let transaction_id = TransactionId::new(sender_id, transaction.nonce()); // split the valid transaction and the blob sidecar if it has any - let (transaction, maybe_sidecar) = match transaction { + let (transaction, blob_sidecar) = match transaction { ValidTransaction::Valid(tx) => (tx, None), ValidTransaction::ValidWithSidecar { transaction, sidecar } => { debug_assert!( @@ -526,50 +529,26 @@ where authority_ids: authorities.map(|auths| self.get_sender_ids(auths)), }; - let added = pool.add_transaction(tx, balance, state_nonce, bytecode_hash)?; + let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) { + Ok(added) => added, + Err(err) => return (Err(err), None), + }; let hash = *added.hash(); let state = added.transaction_state(); - // transaction was successfully inserted into the pool - if let Some(sidecar) = maybe_sidecar { - // notify blob sidecar listeners - self.on_new_blob_sidecar(&hash, &sidecar); - // store the sidecar in the blob store - self.insert_blob(hash, sidecar); - } + let meta = AddedTransactionMeta { added, blob_sidecar }; - if let Some(replaced) = added.replaced_blob_transaction() { - debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced); - // delete the replaced transaction from the blob store - self.delete_blob(replaced); - } - - // Notify about new pending transactions - if let Some(pending) = added.as_pending() { - self.on_new_pending_transaction(pending); - } - - // Notify tx event listeners - self.notify_event_listeners(&added); - - if let Some(discarded) = added.discarded_transactions() { - self.delete_discarded_blobs(discarded.iter()); - } - - // Notify listeners for _all_ transactions - self.on_new_transaction(added.into_new_transaction_event()); - - Ok(AddedTransactionOutcome { hash, state }) + (Ok(AddedTransactionOutcome { hash, state }), Some(meta)) } TransactionValidationOutcome::Invalid(tx, err) => { let mut listener = self.event_listener.write(); listener.invalid(tx.hash()); - Err(PoolError::new(*tx.hash(), err)) + (Err(PoolError::new(*tx.hash(), err)), None) } TransactionValidationOutcome::Error(tx_hash, err) => { let mut listener = self.event_listener.write(); listener.discarded(&tx_hash); - Err(PoolError::other(tx_hash, err)) + (Err(PoolError::other(tx_hash, err)), None) } } } @@ -590,33 +569,46 @@ where } /// Adds all transactions in the iterator to the pool, returning a list of results. - /// - /// Note: A large batch may lock the pool for a long time that blocks important operations - /// like updating the pool on canonical state changes. The caller should consider having - /// a max batch size to balance transaction insertions with other updates. pub fn add_transactions( &self, origin: TransactionOrigin, transactions: impl IntoIterator>, ) -> Vec> { - // Process all transactions in one write lock, maintaining individual origins - let (mut added, discarded) = { + // Collect results and metadata while holding the pool write lock + let (mut results, added_metas, discarded) = { let mut pool = self.pool.write(); - let added = transactions + let mut added_metas = Vec::new(); + + let results = transactions .into_iter() - .map(|tx| self.add_transaction(&mut pool, origin, tx)) + .map(|tx| { + let (result, meta) = self.add_transaction(&mut pool, origin, tx); + + // Only collect metadata for successful insertions + if result.is_ok() && + let Some(meta) = meta + { + added_metas.push(meta); + } + + result + }) .collect::>(); // Enforce the pool size limits if at least one transaction was added successfully - let discarded = if added.iter().any(Result::is_ok) { + let discarded = if results.iter().any(Result::is_ok) { pool.discard_worst() } else { Default::default() }; - (added, discarded) + (results, added_metas, discarded) }; + for meta in added_metas { + self.on_added_transaction(meta); + } + if !discarded.is_empty() { // Delete any blobs associated with discarded blob transactions self.delete_discarded_blobs(discarded.iter()); @@ -627,7 +619,7 @@ where // A newly added transaction may be immediately discarded, so we need to // adjust the result here - for res in &mut added { + for res in &mut results { if let Ok(AddedTransactionOutcome { hash, .. }) = res && discarded_hashes.contains(hash) { @@ -636,7 +628,42 @@ where } }; - added + results + } + + /// Process a transaction that was added to the pool. + /// + /// Performs blob storage operations and sends all notifications. This should be called + /// after the pool write lock has been released to avoid blocking pool operations. + fn on_added_transaction(&self, meta: AddedTransactionMeta) { + // Handle blob sidecar storage and notifications for EIP-4844 transactions + if let Some(sidecar) = meta.blob_sidecar { + let hash = *meta.added.hash(); + self.on_new_blob_sidecar(&hash, &sidecar); + self.insert_blob(hash, sidecar); + } + + // Delete replaced blob sidecar if any + if let Some(replaced) = meta.added.replaced_blob_transaction() { + debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced); + self.delete_blob(replaced); + } + + // Delete discarded blob sidecars if any, this doesnt do any IO. + if let Some(discarded) = meta.added.discarded_transactions() { + self.delete_discarded_blobs(discarded.iter()); + } + + // Notify pending transaction listeners + if let Some(pending) = meta.added.as_pending() { + self.on_new_pending_transaction(pending); + } + + // Notify event listeners + self.notify_event_listeners(&meta.added); + + // Notify new transaction listeners + self.on_new_transaction(meta.added.into_new_transaction_event()); } /// Notify all listeners about a new pending transaction. @@ -1195,6 +1222,18 @@ impl fmt::Debug for PoolInner { } } +/// Metadata for a transaction that was added to the pool. +/// +/// This holds all the data needed to complete post-insertion operations (notifications, +/// blob storage). +#[derive(Debug)] +struct AddedTransactionMeta { + /// The transaction that was added to the pool + added: AddedTransaction, + /// Optional blob sidecar for EIP-4844 transactions + blob_sidecar: Option, +} + /// Tracks an added transaction and all graph changes caused by adding it. #[derive(Debug, Clone)] pub struct AddedPendingTransaction {