From 7da36e042125397af89b9b477f9292dc676e69f8 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Tue, 4 Jul 2023 22:11:11 +0300 Subject: [PATCH] feat(txpool): all transactions event listener (#3565) --- crates/transaction-pool/src/lib.rs | 10 +- crates/transaction-pool/src/noop.rs | 14 +- crates/transaction-pool/src/pool/events.rs | 39 +++++ crates/transaction-pool/src/pool/listener.rs | 162 ++++++++++--------- crates/transaction-pool/src/pool/mod.rs | 17 +- crates/transaction-pool/src/traits.rs | 6 +- 6 files changed, 163 insertions(+), 85 deletions(-) diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 797d448fac..917c2aecad 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -105,7 +105,7 @@ pub use crate::{ }, error::PoolResult, ordering::{GasCostOrdering, TransactionOrdering}, - pool::TransactionEvents, + pool::{AllTransactionsEvents, PoolTransactionEvent, TransactionEvent, TransactionEvents}, traits::{ AllPoolTransactions, BestTransactions, BlockInfo, CanonicalStateUpdate, ChangedAccount, NewTransactionEvent, PoolSize, PoolTransaction, PooledTransaction, PropagateKind, @@ -288,12 +288,16 @@ where self.pool.add_transaction_event_listener(tx_hash) } + fn all_transactions_event_listener(&self) -> AllTransactionsEvents { + self.pool.add_all_transactions_event_listener() + } + fn pending_transactions_listener(&self) -> Receiver { self.pool.add_pending_listener() } - fn transactions_listener(&self) -> Receiver> { - self.pool.add_transaction_listener() + fn new_transactions_listener(&self) -> Receiver> { + self.pool.add_new_transaction_listener() } fn pooled_transaction_hashes(&self) -> Vec { diff --git a/crates/transaction-pool/src/noop.rs b/crates/transaction-pool/src/noop.rs index e3b7a3e81a..50be4a2bcf 100644 --- a/crates/transaction-pool/src/noop.rs +++ b/crates/transaction-pool/src/noop.rs @@ -4,10 +4,10 @@ //! to be generic over it. use crate::{ - error::PoolError, AllPoolTransactions, BestTransactions, BlockInfo, NewTransactionEvent, - PoolResult, PoolSize, PoolTransaction, PooledTransaction, PropagatedTransactions, - TransactionEvents, TransactionOrigin, TransactionPool, TransactionValidationOutcome, - TransactionValidator, ValidPoolTransaction, + error::PoolError, AllPoolTransactions, AllTransactionsEvents, BestTransactions, BlockInfo, + NewTransactionEvent, PoolResult, PoolSize, PoolTransaction, PooledTransaction, + PropagatedTransactions, TransactionEvents, TransactionOrigin, TransactionPool, + TransactionValidationOutcome, TransactionValidator, ValidPoolTransaction, }; use reth_primitives::{Address, TxHash}; use std::{marker::PhantomData, sync::Arc}; @@ -73,11 +73,15 @@ impl TransactionPool for NoopTransactionPool { None } + fn all_transactions_event_listener(&self) -> AllTransactionsEvents { + AllTransactionsEvents { events: mpsc::channel(1).1 } + } + fn pending_transactions_listener(&self) -> Receiver { mpsc::channel(1).1 } - fn transactions_listener(&self) -> Receiver> { + fn new_transactions_listener(&self) -> Receiver> { mpsc::channel(1).1 } diff --git a/crates/transaction-pool/src/pool/events.rs b/crates/transaction-pool/src/pool/events.rs index 8026bdfefc..62da13392e 100644 --- a/crates/transaction-pool/src/pool/events.rs +++ b/crates/transaction-pool/src/pool/events.rs @@ -5,6 +5,32 @@ use std::sync::Arc; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +/// Wrapper around a transaction hash and the event that happened to it. +#[derive(Debug)] +pub struct PoolTransactionEvent(TxHash, TransactionEvent); + +impl PoolTransactionEvent { + /// Create a new transaction event. + pub fn new(hash: TxHash, event: TransactionEvent) -> Self { + Self(hash, event) + } + + /// The hash of the transaction this event is about. + pub fn hash(&self) -> TxHash { + self.0 + } + + /// The event that happened to the transaction. + pub fn event(&self) -> &TransactionEvent { + &self.1 + } + + /// Split the event into its components. + pub fn split(self) -> (TxHash, TransactionEvent) { + (self.0, self.1) + } +} + /// Various events that describe status changes of a transaction. #[derive(Debug, Clone, Eq, PartialEq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] @@ -26,3 +52,16 @@ pub enum TransactionEvent { /// Transaction was propagated to peers. Propagated(Arc>), } + +impl TransactionEvent { + /// Returns `true` if the event is final and no more events are expected for this transaction + /// hash. + pub fn is_final(&self) -> bool { + matches!( + self, + TransactionEvent::Replaced(_) | + TransactionEvent::Mined(_) | + TransactionEvent::Discarded + ) + } +} diff --git a/crates/transaction-pool/src/pool/listener.rs b/crates/transaction-pool/src/pool/listener.rs index b2197e4328..f3ff535453 100644 --- a/crates/transaction-pool/src/pool/listener.rs +++ b/crates/transaction-pool/src/pool/listener.rs @@ -1,20 +1,30 @@ //! Listeners for the transaction-pool -use crate::{pool::events::TransactionEvent, traits::PropagateKind}; +use crate::{ + pool::events::{PoolTransactionEvent, TransactionEvent}, + traits::PropagateKind, +}; use futures_util::Stream; use reth_primitives::{TxHash, H256}; use std::{ collections::{hash_map::Entry, HashMap}, + pin::Pin, sync::Arc, + task::{Context, Poll}, +}; +use tokio::sync::mpsc::{ + error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender, }; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -/// A Stream that receives [TransactionEvent] for the transaction with the given hash. +/// The size of the event channel used to propagate transaction events. +const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024; + +/// A Stream that receives [TransactionEvent] only for the transaction with the given hash. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct TransactionEvents { hash: TxHash, - events: UnboundedReceiver, + events: UnboundedReceiver, } impl TransactionEvents { @@ -25,7 +35,7 @@ impl TransactionEvents { } impl Stream for TransactionEvents { - type Item = TransactionEvent; + type Item = PoolTransactionEvent; fn poll_next( self: std::pin::Pin<&mut Self>, @@ -35,133 +45,141 @@ impl Stream for TransactionEvents { } } -type EventBroadcast = UnboundedSender; +/// A Stream that receives [PoolTransactionEvent] for _all_ transaction. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct AllTransactionsEvents { + pub(crate) events: Receiver, +} + +impl Stream for AllTransactionsEvents { + type Item = PoolTransactionEvent; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().events.poll_recv(cx) + } +} /// A type that broadcasts [`TransactionEvent`] to installed listeners. /// /// This is essentially a multi-producer, multi-consumer channel where each event is broadcast to /// all active receivers. -#[derive(Debug, Default)] +#[derive(Default, Debug)] pub(crate) struct PoolEventBroadcast { - /// All listeners for certain transaction events. - broadcasters: HashMap, + /// All listeners for all transaction events. + all_events_broadcaster: AllPoolEventsBroadcaster, + /// All listeners for events for a certain transaction hash. + broadcasters_by_hash: HashMap, } impl PoolEventBroadcast { /// Calls the broadcast callback with the `PoolEventBroadcaster` that belongs to the hash. - fn broadcast_with(&mut self, hash: &TxHash, callback: F) - where - F: FnOnce(&mut PoolEventBroadcaster), - { - let is_done = if let Some(sink) = self.broadcasters.get_mut(hash) { - callback(sink); - sink.is_done() - } else { - false - }; + fn broadcast_event(&mut self, hash: &TxHash, event: TransactionEvent) { + // Broadcast to all listeners for the transaction hash. + if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) { + sink.get_mut().broadcast(*hash, event.clone()); - if is_done { - self.broadcasters.remove(hash); + if sink.get().is_empty() || event.is_final() { + sink.remove(); + } } + + // Broadcast to all listeners for all transactions. + self.all_events_broadcaster.broadcast(*hash, event); } /// Create a new subscription for the given transaction hash. pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - match self.broadcasters.entry(tx_hash) { + match self.broadcasters_by_hash.entry(tx_hash) { Entry::Occupied(mut entry) => { entry.get_mut().senders.push(tx); } Entry::Vacant(entry) => { - entry.insert(PoolEventBroadcaster { is_done: false, senders: vec![tx] }); + entry.insert(PoolEventBroadcaster { senders: vec![tx] }); } }; TransactionEvents { hash: tx_hash, events: rx } } + /// Create a new subscription for all transactions. + pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents { + let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE); + self.all_events_broadcaster.senders.push(tx); + AllTransactionsEvents { events: rx } + } + /// Notify listeners about a transaction that was added to the pending queue. pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<&TxHash>) { - self.broadcast_with(tx, |notifier| notifier.pending()); + self.broadcast_event(tx, TransactionEvent::Pending); if let Some(replaced) = replaced { // notify listeners that this transaction was replaced - self.broadcast_with(replaced, |notifier| notifier.replaced(*tx)); + self.broadcast_event(replaced, TransactionEvent::Replaced(*tx)); } } /// Notify listeners about a transaction that was added to the queued pool. pub(crate) fn queued(&mut self, tx: &TxHash) { - self.broadcast_with(tx, |notifier| notifier.queued()); + self.broadcast_event(tx, TransactionEvent::Queued); } /// Notify listeners about a transaction that was propagated. pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec) { - self.broadcast_with(tx, |notifier| notifier.propagated(peers)); + self.broadcast_event(tx, TransactionEvent::Propagated(Arc::new(peers))); } /// Notify listeners about a transaction that was discarded. pub(crate) fn discarded(&mut self, tx: &TxHash) { - self.broadcast_with(tx, |notifier| notifier.discarded()); + self.broadcast_event(tx, TransactionEvent::Discarded); } /// Notify listeners that the transaction was mined pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: H256) { - self.broadcast_with(tx, |notifier| notifier.mined(block_hash)); + self.broadcast_event(tx, TransactionEvent::Mined(block_hash)); + } +} + +/// All Sender half(s) of the event channels for all transactions. +/// +/// This mimics [tokio::sync::broadcast] but uses separate channels. +#[derive(Default, Debug)] +struct AllPoolEventsBroadcaster { + /// Corresponding sender half(s) for event listener channel + senders: Vec>, +} + +impl AllPoolEventsBroadcaster { + // Broadcast an event to all listeners. Dropped listeners are silently evicted. + fn broadcast(&mut self, tx_hash: TxHash, event: TransactionEvent) { + self.senders.retain(|sender| { + match sender.try_send(PoolTransactionEvent::new(tx_hash, event.clone())) { + Ok(_) | Err(TrySendError::Full(_)) => true, + Err(TrySendError::Closed(_)) => false, + } + }) } } /// All Sender half(s) of the event channels for a specific transaction. /// -/// This mimics [tokio::sync::broadcast] but uses separate channels. -#[derive(Debug)] +/// This mimics [tokio::sync::broadcast] but uses separate channels and is unbounded. +#[derive(Default, Debug)] struct PoolEventBroadcaster { - /// Tracks whether the transaction this notifier can stop because the transaction was - /// completed, or removed. - is_done: bool, /// Corresponding sender half(s) for event listener channel - senders: Vec, + senders: Vec>, } impl PoolEventBroadcaster { - fn broadcast(&mut self, event: TransactionEvent) { - self.senders.retain(|sender| sender.send(event.clone()).is_ok()) + /// Returns `true` if there are no more listeners remaining. + fn is_empty(&self) -> bool { + self.senders.is_empty() } - fn is_done(&self) -> bool { - self.senders.is_empty() || self.is_done - } - - /// Transaction was moved to the pending queue. - fn pending(&mut self) { - self.broadcast(TransactionEvent::Pending) - } - - /// Transaction was moved to the queued pool - fn queued(&mut self) { - self.broadcast(TransactionEvent::Queued) - } - - /// Transaction was replaced with the given transaction - fn replaced(&mut self, hash: TxHash) { - self.broadcast(TransactionEvent::Replaced(hash)); - self.is_done = true; - } - - /// Transaction was mined. - fn mined(&mut self, block_hash: H256) { - self.broadcast(TransactionEvent::Mined(block_hash)); - self.is_done = true; - } - - /// Transaction was propagated. - fn propagated(&mut self, peers: Vec) { - self.broadcast(TransactionEvent::Propagated(Arc::new(peers))); - } - - /// Transaction was replaced with the given transaction - fn discarded(&mut self) { - self.broadcast(TransactionEvent::Discarded); - self.is_done = true; + // Broadcast an event to all listeners. Dropped listeners are silently evicted. + fn broadcast(&mut self, tx_hash: TxHash, event: TransactionEvent) { + self.senders + .retain(|sender| sender.send(PoolTransactionEvent::new(tx_hash, event.clone())).is_ok()) } } diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 30cd3daffc..0d6b0983fa 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -81,7 +81,6 @@ use crate::{ CanonicalStateUpdate, ChangedAccount, PoolConfig, TransactionOrdering, TransactionValidator, }; use best::BestTransactions; -pub use events::TransactionEvent; use parking_lot::{Mutex, RwLock}; use reth_primitives::{Address, TxHash, H256}; use std::{ @@ -93,16 +92,19 @@ use std::{ use tokio::sync::mpsc; use tracing::debug; -mod best; mod events; +pub use events::{PoolTransactionEvent, TransactionEvent}; + mod listener; +pub use listener::{AllTransactionsEvents, TransactionEvents}; + +mod best; mod parked; pub(crate) mod pending; pub(crate) mod size; pub(crate) mod state; pub mod txpool; mod update; -pub use listener::TransactionEvents; /// Transaction pool internals. pub struct PoolInner { @@ -197,7 +199,9 @@ where } /// Adds a new transaction listener to the pool that gets notified about every new transaction - pub fn add_transaction_listener(&self) -> mpsc::Receiver> { + pub fn add_new_transaction_listener( + &self, + ) -> mpsc::Receiver> { const TX_LISTENER_BUFFER_SIZE: usize = 1024; let (tx, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE); self.transaction_listener.lock().push(tx); @@ -218,6 +222,11 @@ where } } + /// Adds a listener for all transaction events. + pub(crate) fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents { + self.event_listener.write().subscribe_all() + } + /// Returns hashes of _all_ transactions in the pool. pub(crate) fn pooled_transactions_hashes(&self) -> Vec { let pool = self.pool.read(); diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 256afe9964..6e5ebe197d 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -2,6 +2,7 @@ use crate::{ error::PoolResult, pool::{state::SubPool, TransactionEvents}, validate::ValidPoolTransaction, + AllTransactionsEvents, }; use reth_primitives::{ Address, FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, Transaction, @@ -94,13 +95,16 @@ pub trait TransactionPool: Send + Sync + Clone { /// Returns `None` if the transaction is not in the pool. fn transaction_event_listener(&self, tx_hash: TxHash) -> Option; + /// Returns a new transaction change event stream for _all_ transactions in the pool. + fn all_transactions_event_listener(&self) -> AllTransactionsEvents; + /// Returns a new Stream that yields transactions hashes for new ready transactions. /// /// Consumer: RPC fn pending_transactions_listener(&self) -> Receiver; /// Returns a new stream that yields new valid transactions added to the pool. - fn transactions_listener(&self) -> Receiver>; + fn new_transactions_listener(&self) -> Receiver>; /// Returns the _hashes_ of all transactions in the pool. ///