fix: propagate promoted transactions (#4236)

This commit is contained in:
Matthias Seitz
2023-08-17 01:42:11 +02:00
committed by GitHub
parent 40f9576c3a
commit d427ade178

View File

@@ -322,8 +322,8 @@ where
let hash = *added.hash(); let hash = *added.hash();
// Notify about new pending transactions // Notify about new pending transactions
if added.is_pending() { if let Some(pending) = added.as_pending() {
self.on_new_pending_transaction(&added); self.on_new_pending_transaction(pending);
} }
// Notify tx event listeners // Notify tx event listeners
@@ -394,8 +394,7 @@ where
} }
/// Notify all listeners about a new pending transaction. /// Notify all listeners about a new pending transaction.
fn on_new_pending_transaction(&self, pending: &AddedTransaction<T::Transaction>) { fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
let tx_hash = *pending.hash();
let propagate_allowed = pending.is_propagate_allowed(); let propagate_allowed = pending.is_propagate_allowed();
let mut transaction_listeners = self.pending_transaction_listener.lock(); let mut transaction_listeners = self.pending_transaction_listener.lock();
@@ -406,25 +405,29 @@ where
return !listener.sender.is_closed() return !listener.sender.is_closed()
} }
match listener.sender.try_send(tx_hash) { // broadcast all pending transactions to the listener
Ok(()) => true, for tx_hash in pending.pending_transactions() {
Err(err) => { match listener.sender.try_send(tx_hash) {
if matches!(err, mpsc::error::TrySendError::Full(_)) { Ok(()) => {}
debug!( Err(err) => {
target: "txpool", return if matches!(err, mpsc::error::TrySendError::Full(_)) {
"[{:?}] failed to send pending tx; channel full", debug!(
tx_hash, target: "txpool",
); "[{:?}] failed to send pending tx; channel full",
true tx_hash,
} else { );
false true
} else {
false
}
} }
} }
} }
true
}); });
} }
/// Notify all listeners about a new pending transaction. /// Notify all listeners about a newly inserted pending transaction.
fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) { fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
let mut transaction_listeners = self.transaction_listener.lock(); let mut transaction_listeners = self.transaction_listener.lock();
@@ -455,7 +458,7 @@ where
discarded.iter().for_each(|tx| listener.discarded(tx)); discarded.iter().for_each(|tx| listener.discarded(tx));
} }
/// Fire events for the newly added transaction. /// Fire events for the newly added transaction if there are any.
fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) { fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
let mut listener = self.event_listener.write(); let mut listener = self.event_listener.write();
@@ -601,12 +604,24 @@ pub struct AddedPendingTransaction<T: PoolTransaction> {
transaction: Arc<ValidPoolTransaction<T>>, transaction: Arc<ValidPoolTransaction<T>>,
/// Replaced transaction. /// Replaced transaction.
replaced: Option<Arc<ValidPoolTransaction<T>>>, replaced: Option<Arc<ValidPoolTransaction<T>>>,
/// transactions promoted to the ready queue /// transactions promoted to the pending queue
promoted: Vec<TxHash>, promoted: Vec<H256>,
/// transaction that failed and became discarded /// transaction that failed and became discarded
discarded: Vec<TxHash>, discarded: Vec<TxHash>,
} }
impl<T: PoolTransaction> AddedPendingTransaction<T> {
/// Returns all transactions that were promoted to the pending pool
pub(crate) fn pending_transactions(&self) -> impl Iterator<Item = H256> + '_ {
std::iter::once(self.transaction.hash()).chain(self.promoted.iter()).copied()
}
/// Returns if the transaction should be propagated.
pub(crate) fn is_propagate_allowed(&self) -> bool {
self.transaction.propagate
}
}
/// Represents a transaction that was added into the pool and its state /// Represents a transaction that was added into the pool and its state
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum AddedTransaction<T: PoolTransaction> { pub enum AddedTransaction<T: PoolTransaction> {
@@ -625,9 +640,12 @@ pub enum AddedTransaction<T: PoolTransaction> {
} }
impl<T: PoolTransaction> AddedTransaction<T> { impl<T: PoolTransaction> AddedTransaction<T> {
/// Returns whether the transaction is pending /// Returns whether the transaction has been added to the pending pool.
pub(crate) fn is_pending(&self) -> bool { pub(crate) fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
matches!(self, AddedTransaction::Pending(_)) match self {
AddedTransaction::Pending(tx) => Some(tx),
_ => None,
}
} }
/// Returns the hash of the transaction /// Returns the hash of the transaction
@@ -637,13 +655,6 @@ impl<T: PoolTransaction> AddedTransaction<T> {
AddedTransaction::Parked { transaction, .. } => transaction.hash(), AddedTransaction::Parked { transaction, .. } => transaction.hash(),
} }
} }
/// Returns if the transaction should be propagated.
pub(crate) fn is_propagate_allowed(&self) -> bool {
match self {
AddedTransaction::Pending(transaction) => transaction.transaction.propagate,
AddedTransaction::Parked { transaction, .. } => transaction.propagate,
}
}
/// Converts this type into the event type for listeners /// Converts this type into the event type for listeners
pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent<T> { pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent<T> {