From ca99ee2ec95cf30f3b56a700765d0cffd1ad5753 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 17 Aug 2023 17:19:39 +0200 Subject: [PATCH] fix: broadcast promoted transactions (#4248) --- crates/transaction-pool/src/pool/mod.rs | 99 +++++++++++++++++++--- crates/transaction-pool/src/pool/txpool.rs | 41 ++++++--- 2 files changed, 115 insertions(+), 25 deletions(-) diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 7bb1e9f30b..5056a39054 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -269,11 +269,15 @@ where last_seen_block_number: number, pending_basefee: pending_block_base_fee, }; + + // update the pool let outcome = self.pool.write().on_canonical_state_change( block_info, mined_transactions, changed_senders, ); + + // notify listeners about updates self.notify_on_new_state(outcome); } @@ -285,7 +289,8 @@ where let UpdateOutcome { promoted, discarded } = self.pool.write().update_accounts(changed_senders); let mut listener = self.event_listener.write(); - promoted.iter().for_each(|tx| listener.pending(tx, None)); + + promoted.iter().for_each(|tx| listener.pending(tx.hash(), None)); discarded.iter().for_each(|tx| listener.discarded(tx)); } @@ -406,7 +411,7 @@ where } // broadcast all pending transactions to the listener - for tx_hash in pending.pending_transactions() { + for tx_hash in pending.pending_transactions(listener.kind) { match listener.sender.try_send(tx_hash) { Ok(()) => {} Err(err) => { @@ -448,13 +453,39 @@ where } /// Notifies transaction listeners about changes after a block was processed. - fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome) { + fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome) { + // notify about promoted pending transactions + { + let mut transaction_listeners = self.pending_transaction_listener.lock(); + transaction_listeners.retain_mut(|listener| { + // broadcast all pending transactions to the listener + for tx_hash in outcome.pending_transactions(listener.kind) { + match listener.sender.try_send(tx_hash) { + Ok(()) => {} + Err(err) => { + return if matches!(err, mpsc::error::TrySendError::Full(_)) { + debug!( + target: "txpool", + "[{:?}] failed to send pending tx; channel full", + tx_hash, + ); + true + } else { + false + } + } + } + } + true + }); + } + let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome; let mut listener = self.event_listener.write(); mined.iter().for_each(|tx| listener.mined(tx, block_hash)); - promoted.iter().for_each(|tx| listener.pending(tx, None)); + promoted.iter().for_each(|tx| listener.pending(tx.hash(), None)); discarded.iter().for_each(|tx| listener.discarded(tx)); } @@ -467,7 +498,7 @@ where let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx; listener.pending(transaction.hash(), replaced.clone()); - promoted.iter().for_each(|tx| listener.pending(tx, None)); + promoted.iter().for_each(|tx| listener.pending(tx.hash(), None)); discarded.iter().for_each(|tx| listener.discarded(tx)); } AddedTransaction::Parked { transaction, replaced, .. } => { @@ -605,15 +636,23 @@ pub struct AddedPendingTransaction { /// Replaced transaction. replaced: Option>>, /// transactions promoted to the pending queue - promoted: Vec, + promoted: Vec>>, /// transaction that failed and became discarded discarded: Vec, } impl AddedPendingTransaction { - /// Returns all transactions that were promoted to the pending pool - pub(crate) fn pending_transactions(&self) -> impl Iterator + '_ { - std::iter::once(self.transaction.hash()).chain(self.promoted.iter()).copied() + /// Returns all transactions that were promoted to the pending pool and adhere to the given + /// [PendingTransactionListenerKind]. + /// + /// If the kind is [PendingTransactionListenerKind::PropagateOnly], then only transactions that + /// are allowed to be propagated are returned. + pub(crate) fn pending_transactions( + &self, + kind: PendingTransactionListenerKind, + ) -> impl Iterator + '_ { + let iter = std::iter::once(&self.transaction).chain(self.promoted.iter()); + PendingTransactionIter { kind, iter } } /// Returns if the transaction should be propagated. @@ -622,6 +661,29 @@ impl AddedPendingTransaction { } } +pub(crate) struct PendingTransactionIter { + kind: PendingTransactionListenerKind, + iter: Iter, +} + +impl<'a, Iter, T> Iterator for PendingTransactionIter +where + Iter: Iterator>>, + T: PoolTransaction + 'a, +{ + type Item = H256; + + fn next(&mut self) -> Option { + loop { + let next = self.iter.next()?; + if self.kind.is_propagate_only() && !next.propagate { + continue + } + return Some(*next.hash()) + } + } +} + /// Represents a transaction that was added into the pool and its state #[derive(Debug, Clone)] pub enum AddedTransaction { @@ -671,13 +733,28 @@ impl AddedTransaction { /// Contains all state changes after a [`CanonicalStateUpdate`] was processed #[derive(Debug)] -pub(crate) struct OnNewCanonicalStateOutcome { +pub(crate) struct OnNewCanonicalStateOutcome { /// Hash of the block. pub(crate) block_hash: H256, /// All mined transactions. pub(crate) mined: Vec, /// Transactions promoted to the ready queue. - pub(crate) promoted: Vec, + pub(crate) promoted: Vec>>, /// transaction that were discarded during the update pub(crate) discarded: Vec, } + +impl OnNewCanonicalStateOutcome { + /// Returns all transactions that were promoted to the pending pool and adhere to the given + /// [PendingTransactionListenerKind]. + /// + /// If the kind is [PendingTransactionListenerKind::PropagateOnly], then only transactions that + /// are allowed to be propagated are returned. + pub(crate) fn pending_transactions( + &self, + kind: PendingTransactionListenerKind, + ) -> impl Iterator + '_ { + let iter = self.promoted.iter(); + PendingTransactionIter { kind, iter } + } +} diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index 0aa7f84be0..8f8ba2de0f 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -266,7 +266,7 @@ impl TxPool { pub(crate) fn update_accounts( &mut self, changed_senders: HashMap, - ) -> UpdateOutcome { + ) -> UpdateOutcome { // track changed accounts self.sender_info.extend(changed_senders.clone()); // Apply the state changes to the total set of transactions which triggers sub-pool updates. @@ -287,7 +287,7 @@ impl TxPool { block_info: BlockInfo, mined_transactions: Vec, changed_senders: HashMap, - ) -> OnNewCanonicalStateOutcome { + ) -> OnNewCanonicalStateOutcome { // update block info let block_hash = block_info.last_seen_block_hash; self.all_transactions.set_block_info(block_info); @@ -409,7 +409,7 @@ impl TxPool { /// Maintenance task to apply a series of updates. /// /// This will move/discard the given transaction according to the `PoolUpdate` - fn process_updates(&mut self, updates: Vec) -> UpdateOutcome { + fn process_updates(&mut self, updates: Vec) -> UpdateOutcome { let mut outcome = UpdateOutcome::default(); for update in updates { let PoolUpdate { id, hash, current, destination } = update; @@ -422,9 +422,11 @@ impl TxPool { } Destination::Pool(move_to) => { debug_assert!(!move_to.eq(¤t), "destination must be different"); - self.move_transaction(current, move_to, &id); + let moved = self.move_transaction(current, move_to, &id); if matches!(move_to, SubPool::Pending) { - outcome.promoted.push(hash); + if let Some(tx) = moved { + outcome.promoted.push(tx); + } } } } @@ -436,10 +438,15 @@ impl TxPool { /// /// This will remove the given transaction from one sub-pool and insert it into the other /// sub-pool. - fn move_transaction(&mut self, from: SubPool, to: SubPool, id: &TransactionId) { - if let Some(tx) = self.remove_from_subpool(from, id) { - self.add_transaction_to_subpool(to, tx); - } + fn move_transaction( + &mut self, + from: SubPool, + to: SubPool, + id: &TransactionId, + ) -> Option>> { + let tx = self.remove_from_subpool(from, id)?; + self.add_transaction_to_subpool(to, tx.clone()); + Some(tx) } /// Removes and returns all matching transactions from the pool. @@ -1324,14 +1331,20 @@ impl PoolInternalTransaction { } /// Tracks the result after updating the pool -#[derive(Default, Debug)] -pub(crate) struct UpdateOutcome { - /// transactions promoted to the ready queue - pub(crate) promoted: Vec, - /// transaction that failed and became discarded +#[derive(Debug)] +pub(crate) struct UpdateOutcome { + /// transactions promoted to the pending pool + pub(crate) promoted: Vec>>, + /// transaction that failed and were discarded pub(crate) discarded: Vec, } +impl Default for UpdateOutcome { + fn default() -> Self { + Self { promoted: vec![], discarded: vec![] } + } +} + /// Represents the outcome of a prune pub struct PruneResult { /// A list of added transactions that a pruned marker satisfied