mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 09:38:24 -05:00
feat(txpool): all transactions event listener (#3565)
This commit is contained in:
@@ -105,7 +105,7 @@ pub use crate::{
|
||||
},
|
||||
error::PoolResult,
|
||||
ordering::{GasCostOrdering, TransactionOrdering},
|
||||
pool::TransactionEvents,
|
||||
pool::{AllTransactionsEvents, PoolTransactionEvent, TransactionEvent, TransactionEvents},
|
||||
traits::{
|
||||
AllPoolTransactions, BestTransactions, BlockInfo, CanonicalStateUpdate, ChangedAccount,
|
||||
NewTransactionEvent, PoolSize, PoolTransaction, PooledTransaction, PropagateKind,
|
||||
@@ -288,12 +288,16 @@ where
|
||||
self.pool.add_transaction_event_listener(tx_hash)
|
||||
}
|
||||
|
||||
fn all_transactions_event_listener(&self) -> AllTransactionsEvents {
|
||||
self.pool.add_all_transactions_event_listener()
|
||||
}
|
||||
|
||||
fn pending_transactions_listener(&self) -> Receiver<TxHash> {
|
||||
self.pool.add_pending_listener()
|
||||
}
|
||||
|
||||
fn transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
|
||||
self.pool.add_transaction_listener()
|
||||
fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
|
||||
self.pool.add_new_transaction_listener()
|
||||
}
|
||||
|
||||
fn pooled_transaction_hashes(&self) -> Vec<TxHash> {
|
||||
|
||||
@@ -4,10 +4,10 @@
|
||||
//! to be generic over it.
|
||||
|
||||
use crate::{
|
||||
error::PoolError, AllPoolTransactions, BestTransactions, BlockInfo, NewTransactionEvent,
|
||||
PoolResult, PoolSize, PoolTransaction, PooledTransaction, PropagatedTransactions,
|
||||
TransactionEvents, TransactionOrigin, TransactionPool, TransactionValidationOutcome,
|
||||
TransactionValidator, ValidPoolTransaction,
|
||||
error::PoolError, AllPoolTransactions, AllTransactionsEvents, BestTransactions, BlockInfo,
|
||||
NewTransactionEvent, PoolResult, PoolSize, PoolTransaction, PooledTransaction,
|
||||
PropagatedTransactions, TransactionEvents, TransactionOrigin, TransactionPool,
|
||||
TransactionValidationOutcome, TransactionValidator, ValidPoolTransaction,
|
||||
};
|
||||
use reth_primitives::{Address, TxHash};
|
||||
use std::{marker::PhantomData, sync::Arc};
|
||||
@@ -73,11 +73,15 @@ impl TransactionPool for NoopTransactionPool {
|
||||
None
|
||||
}
|
||||
|
||||
fn all_transactions_event_listener(&self) -> AllTransactionsEvents {
|
||||
AllTransactionsEvents { events: mpsc::channel(1).1 }
|
||||
}
|
||||
|
||||
fn pending_transactions_listener(&self) -> Receiver<TxHash> {
|
||||
mpsc::channel(1).1
|
||||
}
|
||||
|
||||
fn transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
|
||||
fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
|
||||
mpsc::channel(1).1
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,32 @@ use std::sync::Arc;
|
||||
#[cfg(feature = "serde")]
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Wrapper around a transaction hash and the event that happened to it.
|
||||
#[derive(Debug)]
|
||||
pub struct PoolTransactionEvent(TxHash, TransactionEvent);
|
||||
|
||||
impl PoolTransactionEvent {
|
||||
/// Create a new transaction event.
|
||||
pub fn new(hash: TxHash, event: TransactionEvent) -> Self {
|
||||
Self(hash, event)
|
||||
}
|
||||
|
||||
/// The hash of the transaction this event is about.
|
||||
pub fn hash(&self) -> TxHash {
|
||||
self.0
|
||||
}
|
||||
|
||||
/// The event that happened to the transaction.
|
||||
pub fn event(&self) -> &TransactionEvent {
|
||||
&self.1
|
||||
}
|
||||
|
||||
/// Split the event into its components.
|
||||
pub fn split(self) -> (TxHash, TransactionEvent) {
|
||||
(self.0, self.1)
|
||||
}
|
||||
}
|
||||
|
||||
/// Various events that describe status changes of a transaction.
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
@@ -26,3 +52,16 @@ pub enum TransactionEvent {
|
||||
/// Transaction was propagated to peers.
|
||||
Propagated(Arc<Vec<PropagateKind>>),
|
||||
}
|
||||
|
||||
impl TransactionEvent {
|
||||
/// Returns `true` if the event is final and no more events are expected for this transaction
|
||||
/// hash.
|
||||
pub fn is_final(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
TransactionEvent::Replaced(_) |
|
||||
TransactionEvent::Mined(_) |
|
||||
TransactionEvent::Discarded
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,20 +1,30 @@
|
||||
//! Listeners for the transaction-pool
|
||||
|
||||
use crate::{pool::events::TransactionEvent, traits::PropagateKind};
|
||||
use crate::{
|
||||
pool::events::{PoolTransactionEvent, TransactionEvent},
|
||||
traits::PropagateKind,
|
||||
};
|
||||
use futures_util::Stream;
|
||||
use reth_primitives::{TxHash, H256};
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync::mpsc::{
|
||||
error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
|
||||
};
|
||||
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
||||
|
||||
/// A Stream that receives [TransactionEvent] for the transaction with the given hash.
|
||||
/// The size of the event channel used to propagate transaction events.
|
||||
const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024;
|
||||
|
||||
/// A Stream that receives [TransactionEvent] only for the transaction with the given hash.
|
||||
#[derive(Debug)]
|
||||
#[must_use = "streams do nothing unless polled"]
|
||||
pub struct TransactionEvents {
|
||||
hash: TxHash,
|
||||
events: UnboundedReceiver<TransactionEvent>,
|
||||
events: UnboundedReceiver<PoolTransactionEvent>,
|
||||
}
|
||||
|
||||
impl TransactionEvents {
|
||||
@@ -25,7 +35,7 @@ impl TransactionEvents {
|
||||
}
|
||||
|
||||
impl Stream for TransactionEvents {
|
||||
type Item = TransactionEvent;
|
||||
type Item = PoolTransactionEvent;
|
||||
|
||||
fn poll_next(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
@@ -35,133 +45,141 @@ impl Stream for TransactionEvents {
|
||||
}
|
||||
}
|
||||
|
||||
type EventBroadcast = UnboundedSender<TransactionEvent>;
|
||||
/// A Stream that receives [PoolTransactionEvent] for _all_ transaction.
|
||||
#[derive(Debug)]
|
||||
#[must_use = "streams do nothing unless polled"]
|
||||
pub struct AllTransactionsEvents {
|
||||
pub(crate) events: Receiver<PoolTransactionEvent>,
|
||||
}
|
||||
|
||||
impl Stream for AllTransactionsEvents {
|
||||
type Item = PoolTransactionEvent;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.get_mut().events.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// A type that broadcasts [`TransactionEvent`] to installed listeners.
|
||||
///
|
||||
/// This is essentially a multi-producer, multi-consumer channel where each event is broadcast to
|
||||
/// all active receivers.
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Default, Debug)]
|
||||
pub(crate) struct PoolEventBroadcast {
|
||||
/// All listeners for certain transaction events.
|
||||
broadcasters: HashMap<TxHash, PoolEventBroadcaster>,
|
||||
/// All listeners for all transaction events.
|
||||
all_events_broadcaster: AllPoolEventsBroadcaster,
|
||||
/// All listeners for events for a certain transaction hash.
|
||||
broadcasters_by_hash: HashMap<TxHash, PoolEventBroadcaster>,
|
||||
}
|
||||
|
||||
impl PoolEventBroadcast {
|
||||
/// Calls the broadcast callback with the `PoolEventBroadcaster` that belongs to the hash.
|
||||
fn broadcast_with<F>(&mut self, hash: &TxHash, callback: F)
|
||||
where
|
||||
F: FnOnce(&mut PoolEventBroadcaster),
|
||||
{
|
||||
let is_done = if let Some(sink) = self.broadcasters.get_mut(hash) {
|
||||
callback(sink);
|
||||
sink.is_done()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
fn broadcast_event(&mut self, hash: &TxHash, event: TransactionEvent) {
|
||||
// Broadcast to all listeners for the transaction hash.
|
||||
if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) {
|
||||
sink.get_mut().broadcast(*hash, event.clone());
|
||||
|
||||
if is_done {
|
||||
self.broadcasters.remove(hash);
|
||||
if sink.get().is_empty() || event.is_final() {
|
||||
sink.remove();
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast to all listeners for all transactions.
|
||||
self.all_events_broadcaster.broadcast(*hash, event);
|
||||
}
|
||||
|
||||
/// Create a new subscription for the given transaction hash.
|
||||
pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents {
|
||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
match self.broadcasters.entry(tx_hash) {
|
||||
match self.broadcasters_by_hash.entry(tx_hash) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().senders.push(tx);
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(PoolEventBroadcaster { is_done: false, senders: vec![tx] });
|
||||
entry.insert(PoolEventBroadcaster { senders: vec![tx] });
|
||||
}
|
||||
};
|
||||
TransactionEvents { hash: tx_hash, events: rx }
|
||||
}
|
||||
|
||||
/// Create a new subscription for all transactions.
|
||||
pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE);
|
||||
self.all_events_broadcaster.senders.push(tx);
|
||||
AllTransactionsEvents { events: rx }
|
||||
}
|
||||
|
||||
/// Notify listeners about a transaction that was added to the pending queue.
|
||||
pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<&TxHash>) {
|
||||
self.broadcast_with(tx, |notifier| notifier.pending());
|
||||
self.broadcast_event(tx, TransactionEvent::Pending);
|
||||
|
||||
if let Some(replaced) = replaced {
|
||||
// notify listeners that this transaction was replaced
|
||||
self.broadcast_with(replaced, |notifier| notifier.replaced(*tx));
|
||||
self.broadcast_event(replaced, TransactionEvent::Replaced(*tx));
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify listeners about a transaction that was added to the queued pool.
|
||||
pub(crate) fn queued(&mut self, tx: &TxHash) {
|
||||
self.broadcast_with(tx, |notifier| notifier.queued());
|
||||
self.broadcast_event(tx, TransactionEvent::Queued);
|
||||
}
|
||||
|
||||
/// Notify listeners about a transaction that was propagated.
|
||||
pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
|
||||
self.broadcast_with(tx, |notifier| notifier.propagated(peers));
|
||||
self.broadcast_event(tx, TransactionEvent::Propagated(Arc::new(peers)));
|
||||
}
|
||||
|
||||
/// Notify listeners about a transaction that was discarded.
|
||||
pub(crate) fn discarded(&mut self, tx: &TxHash) {
|
||||
self.broadcast_with(tx, |notifier| notifier.discarded());
|
||||
self.broadcast_event(tx, TransactionEvent::Discarded);
|
||||
}
|
||||
|
||||
/// Notify listeners that the transaction was mined
|
||||
pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: H256) {
|
||||
self.broadcast_with(tx, |notifier| notifier.mined(block_hash));
|
||||
self.broadcast_event(tx, TransactionEvent::Mined(block_hash));
|
||||
}
|
||||
}
|
||||
|
||||
/// All Sender half(s) of the event channels for all transactions.
|
||||
///
|
||||
/// This mimics [tokio::sync::broadcast] but uses separate channels.
|
||||
#[derive(Default, Debug)]
|
||||
struct AllPoolEventsBroadcaster {
|
||||
/// Corresponding sender half(s) for event listener channel
|
||||
senders: Vec<Sender<PoolTransactionEvent>>,
|
||||
}
|
||||
|
||||
impl AllPoolEventsBroadcaster {
|
||||
// Broadcast an event to all listeners. Dropped listeners are silently evicted.
|
||||
fn broadcast(&mut self, tx_hash: TxHash, event: TransactionEvent) {
|
||||
self.senders.retain(|sender| {
|
||||
match sender.try_send(PoolTransactionEvent::new(tx_hash, event.clone())) {
|
||||
Ok(_) | Err(TrySendError::Full(_)) => true,
|
||||
Err(TrySendError::Closed(_)) => false,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// All Sender half(s) of the event channels for a specific transaction.
|
||||
///
|
||||
/// This mimics [tokio::sync::broadcast] but uses separate channels.
|
||||
#[derive(Debug)]
|
||||
/// This mimics [tokio::sync::broadcast] but uses separate channels and is unbounded.
|
||||
#[derive(Default, Debug)]
|
||||
struct PoolEventBroadcaster {
|
||||
/// Tracks whether the transaction this notifier can stop because the transaction was
|
||||
/// completed, or removed.
|
||||
is_done: bool,
|
||||
/// Corresponding sender half(s) for event listener channel
|
||||
senders: Vec<EventBroadcast>,
|
||||
senders: Vec<UnboundedSender<PoolTransactionEvent>>,
|
||||
}
|
||||
|
||||
impl PoolEventBroadcaster {
|
||||
fn broadcast(&mut self, event: TransactionEvent) {
|
||||
self.senders.retain(|sender| sender.send(event.clone()).is_ok())
|
||||
/// Returns `true` if there are no more listeners remaining.
|
||||
fn is_empty(&self) -> bool {
|
||||
self.senders.is_empty()
|
||||
}
|
||||
|
||||
fn is_done(&self) -> bool {
|
||||
self.senders.is_empty() || self.is_done
|
||||
}
|
||||
|
||||
/// Transaction was moved to the pending queue.
|
||||
fn pending(&mut self) {
|
||||
self.broadcast(TransactionEvent::Pending)
|
||||
}
|
||||
|
||||
/// Transaction was moved to the queued pool
|
||||
fn queued(&mut self) {
|
||||
self.broadcast(TransactionEvent::Queued)
|
||||
}
|
||||
|
||||
/// Transaction was replaced with the given transaction
|
||||
fn replaced(&mut self, hash: TxHash) {
|
||||
self.broadcast(TransactionEvent::Replaced(hash));
|
||||
self.is_done = true;
|
||||
}
|
||||
|
||||
/// Transaction was mined.
|
||||
fn mined(&mut self, block_hash: H256) {
|
||||
self.broadcast(TransactionEvent::Mined(block_hash));
|
||||
self.is_done = true;
|
||||
}
|
||||
|
||||
/// Transaction was propagated.
|
||||
fn propagated(&mut self, peers: Vec<PropagateKind>) {
|
||||
self.broadcast(TransactionEvent::Propagated(Arc::new(peers)));
|
||||
}
|
||||
|
||||
/// Transaction was replaced with the given transaction
|
||||
fn discarded(&mut self) {
|
||||
self.broadcast(TransactionEvent::Discarded);
|
||||
self.is_done = true;
|
||||
// Broadcast an event to all listeners. Dropped listeners are silently evicted.
|
||||
fn broadcast(&mut self, tx_hash: TxHash, event: TransactionEvent) {
|
||||
self.senders
|
||||
.retain(|sender| sender.send(PoolTransactionEvent::new(tx_hash, event.clone())).is_ok())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,7 +81,6 @@ use crate::{
|
||||
CanonicalStateUpdate, ChangedAccount, PoolConfig, TransactionOrdering, TransactionValidator,
|
||||
};
|
||||
use best::BestTransactions;
|
||||
pub use events::TransactionEvent;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use reth_primitives::{Address, TxHash, H256};
|
||||
use std::{
|
||||
@@ -93,16 +92,19 @@ use std::{
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::debug;
|
||||
|
||||
mod best;
|
||||
mod events;
|
||||
pub use events::{PoolTransactionEvent, TransactionEvent};
|
||||
|
||||
mod listener;
|
||||
pub use listener::{AllTransactionsEvents, TransactionEvents};
|
||||
|
||||
mod best;
|
||||
mod parked;
|
||||
pub(crate) mod pending;
|
||||
pub(crate) mod size;
|
||||
pub(crate) mod state;
|
||||
pub mod txpool;
|
||||
mod update;
|
||||
pub use listener::TransactionEvents;
|
||||
|
||||
/// Transaction pool internals.
|
||||
pub struct PoolInner<V: TransactionValidator, T: TransactionOrdering> {
|
||||
@@ -197,7 +199,9 @@ where
|
||||
}
|
||||
|
||||
/// Adds a new transaction listener to the pool that gets notified about every new transaction
|
||||
pub fn add_transaction_listener(&self) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
|
||||
pub fn add_new_transaction_listener(
|
||||
&self,
|
||||
) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
|
||||
const TX_LISTENER_BUFFER_SIZE: usize = 1024;
|
||||
let (tx, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
|
||||
self.transaction_listener.lock().push(tx);
|
||||
@@ -218,6 +222,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds a listener for all transaction events.
|
||||
pub(crate) fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents {
|
||||
self.event_listener.write().subscribe_all()
|
||||
}
|
||||
|
||||
/// Returns hashes of _all_ transactions in the pool.
|
||||
pub(crate) fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
|
||||
let pool = self.pool.read();
|
||||
|
||||
@@ -2,6 +2,7 @@ use crate::{
|
||||
error::PoolResult,
|
||||
pool::{state::SubPool, TransactionEvents},
|
||||
validate::ValidPoolTransaction,
|
||||
AllTransactionsEvents,
|
||||
};
|
||||
use reth_primitives::{
|
||||
Address, FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, Transaction,
|
||||
@@ -94,13 +95,16 @@ pub trait TransactionPool: Send + Sync + Clone {
|
||||
/// Returns `None` if the transaction is not in the pool.
|
||||
fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents>;
|
||||
|
||||
/// Returns a new transaction change event stream for _all_ transactions in the pool.
|
||||
fn all_transactions_event_listener(&self) -> AllTransactionsEvents;
|
||||
|
||||
/// Returns a new Stream that yields transactions hashes for new ready transactions.
|
||||
///
|
||||
/// Consumer: RPC
|
||||
fn pending_transactions_listener(&self) -> Receiver<TxHash>;
|
||||
|
||||
/// Returns a new stream that yields new valid transactions added to the pool.
|
||||
fn transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>>;
|
||||
fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>>;
|
||||
|
||||
/// Returns the _hashes_ of all transactions in the pool.
|
||||
///
|
||||
|
||||
Reference in New Issue
Block a user