From d427ade178e13a2dd9770734b77f6941459537d8 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 17 Aug 2023 01:42:11 +0200 Subject: [PATCH] fix: propagate promoted transactions (#4236) --- crates/transaction-pool/src/pool/mod.rs | 71 ++++++++++++++----------- 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 8279633690..7bb1e9f30b 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -322,8 +322,8 @@ where let hash = *added.hash(); // Notify about new pending transactions - if added.is_pending() { - self.on_new_pending_transaction(&added); + if let Some(pending) = added.as_pending() { + self.on_new_pending_transaction(pending); } // Notify tx event listeners @@ -394,8 +394,7 @@ where } /// Notify all listeners about a new pending transaction. - fn on_new_pending_transaction(&self, pending: &AddedTransaction) { - let tx_hash = *pending.hash(); + fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction) { let propagate_allowed = pending.is_propagate_allowed(); let mut transaction_listeners = self.pending_transaction_listener.lock(); @@ -406,25 +405,29 @@ where return !listener.sender.is_closed() } - match listener.sender.try_send(tx_hash) { - Ok(()) => true, - Err(err) => { - if matches!(err, mpsc::error::TrySendError::Full(_)) { - debug!( - target: "txpool", - "[{:?}] failed to send pending tx; channel full", - tx_hash, - ); - true - } else { - false + // broadcast all pending transactions to the listener + for tx_hash in pending.pending_transactions() { + match listener.sender.try_send(tx_hash) { + Ok(()) => {} + Err(err) => { + return if matches!(err, mpsc::error::TrySendError::Full(_)) { + debug!( + target: "txpool", + "[{:?}] failed to send pending tx; channel full", + tx_hash, + ); + true + } else { + false + } } } } + true }); } - /// Notify all listeners about a new pending transaction. + /// Notify all listeners about a newly inserted pending transaction. fn on_new_transaction(&self, event: NewTransactionEvent) { let mut transaction_listeners = self.transaction_listener.lock(); @@ -455,7 +458,7 @@ where discarded.iter().for_each(|tx| listener.discarded(tx)); } - /// Fire events for the newly added transaction. + /// Fire events for the newly added transaction if there are any. fn notify_event_listeners(&self, tx: &AddedTransaction) { let mut listener = self.event_listener.write(); @@ -601,12 +604,24 @@ pub struct AddedPendingTransaction { transaction: Arc>, /// Replaced transaction. replaced: Option>>, - /// transactions promoted to the ready queue - promoted: Vec, + /// transactions promoted to the pending queue + promoted: Vec, /// transaction that failed and became discarded discarded: Vec, } +impl AddedPendingTransaction { + /// Returns all transactions that were promoted to the pending pool + pub(crate) fn pending_transactions(&self) -> impl Iterator + '_ { + std::iter::once(self.transaction.hash()).chain(self.promoted.iter()).copied() + } + + /// Returns if the transaction should be propagated. + pub(crate) fn is_propagate_allowed(&self) -> bool { + self.transaction.propagate + } +} + /// Represents a transaction that was added into the pool and its state #[derive(Debug, Clone)] pub enum AddedTransaction { @@ -625,9 +640,12 @@ pub enum AddedTransaction { } impl AddedTransaction { - /// Returns whether the transaction is pending - pub(crate) fn is_pending(&self) -> bool { - matches!(self, AddedTransaction::Pending(_)) + /// Returns whether the transaction has been added to the pending pool. + pub(crate) fn as_pending(&self) -> Option<&AddedPendingTransaction> { + match self { + AddedTransaction::Pending(tx) => Some(tx), + _ => None, + } } /// Returns the hash of the transaction @@ -637,13 +655,6 @@ impl AddedTransaction { AddedTransaction::Parked { transaction, .. } => transaction.hash(), } } - /// Returns if the transaction should be propagated. - pub(crate) fn is_propagate_allowed(&self) -> bool { - match self { - AddedTransaction::Pending(transaction) => transaction.transaction.propagate, - AddedTransaction::Parked { transaction, .. } => transaction.propagate, - } - } /// Converts this type into the event type for listeners pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent {