From 84875e90df940d6d7e7737e6e47d94eefaff44df Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 30 Jun 2023 10:52:04 +0200 Subject: [PATCH] feat(txpool): add transaction_event_listener function (#3493) --- crates/transaction-pool/src/lib.rs | 4 ++++ crates/transaction-pool/src/pool/listener.rs | 2 +- crates/transaction-pool/src/pool/mod.rs | 14 ++++++++++++++ crates/transaction-pool/src/traits.rs | 5 +++++ 4 files changed, 24 insertions(+), 1 deletion(-) diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index ccafbe9778..65e2e0e958 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -280,6 +280,10 @@ where Ok(transactions) } + fn transaction_event_listener(&self, tx_hash: TxHash) -> Option { + self.pool.add_transaction_event_listener(tx_hash) + } + fn pending_transactions_listener(&self) -> Receiver { self.pool.add_pending_listener() } diff --git a/crates/transaction-pool/src/pool/listener.rs b/crates/transaction-pool/src/pool/listener.rs index 39950f7a4c..b2197e4328 100644 --- a/crates/transaction-pool/src/pool/listener.rs +++ b/crates/transaction-pool/src/pool/listener.rs @@ -9,7 +9,7 @@ use std::{ }; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -/// A Stream that receives [TransactionEvent] for the transactions +/// A Stream that receives [TransactionEvent] for the transaction with the given hash. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct TransactionEvents { diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 8832ed644f..30cd3daffc 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -204,6 +204,20 @@ where rx } + /// If the pool contains the transaction, this adds a new listener that gets notified about + /// transaction events. + pub(crate) fn add_transaction_event_listener( + &self, + tx_hash: TxHash, + ) -> Option { + let pool = self.pool.read(); + if pool.contains(&tx_hash) { + Some(self.event_listener.write().subscribe(tx_hash)) + } else { + None + } + } + /// Returns hashes of _all_ transactions in the pool. pub(crate) fn pooled_transactions_hashes(&self) -> Vec { let pool = self.pool.read(); diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 05c027a804..121889bb18 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -89,6 +89,11 @@ pub trait TransactionPool: Send + Sync + Clone { transactions: Vec, ) -> PoolResult>>; + /// Returns a new transaction change event stream for the given transaction. + /// + /// Returns `None` if the transaction is not in the pool. + fn transaction_event_listener(&self, tx_hash: TxHash) -> Option; + /// Returns a new Stream that yields transactions hashes for new ready transactions. /// /// Consumer: RPC