chore(txpool): enriched pool transaction event (#3653)

This commit is contained in:
Roman Krasiuk
2023-07-07 14:02:20 +03:00
committed by GitHub
parent 5cc55dbc7b
commit 0d76dd762a
6 changed files with 120 additions and 70 deletions

View File

@@ -156,7 +156,7 @@ pub use crate::{
},
error::PoolResult,
ordering::{GasCostOrdering, TransactionOrdering},
pool::{AllTransactionsEvents, PoolTransactionEvent, TransactionEvent, TransactionEvents},
pool::{AllTransactionsEvents, FullTransactionEvent, TransactionEvent, TransactionEvents},
traits::{
AllPoolTransactions, BestTransactions, BlockInfo, CanonicalStateUpdate, ChangedAccount,
NewTransactionEvent, PoolSize, PoolTransaction, PooledTransaction, PropagateKind,
@@ -354,7 +354,7 @@ where
self.pool.add_transaction_event_listener(tx_hash)
}
fn all_transactions_event_listener(&self) -> AllTransactionsEvents {
fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction> {
self.pool.add_all_transactions_event_listener()
}

View File

@@ -73,7 +73,7 @@ impl TransactionPool for NoopTransactionPool {
None
}
fn all_transactions_event_listener(&self) -> AllTransactionsEvents {
fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction> {
AllTransactionsEvents { events: mpsc::channel(1).1 }
}

View File

@@ -1,33 +1,49 @@
use crate::traits::PropagateKind;
use crate::{traits::PropagateKind, PoolTransaction, ValidPoolTransaction};
use reth_primitives::{TxHash, H256};
use std::sync::Arc;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
/// Wrapper around a transaction hash and the event that happened to it.
/// An event that happened to a transaction and contains its full body where possible.
#[derive(Debug)]
pub struct PoolTransactionEvent(TxHash, TransactionEvent);
pub enum FullTransactionEvent<T: PoolTransaction> {
/// Transaction has been added to the pending pool.
Pending(TxHash),
/// Transaction has been added to the queued pool.
Queued(TxHash),
/// Transaction has been included in the block belonging to this hash.
Mined {
/// The hash of the mined transaction.
tx_hash: TxHash,
/// The hash of the mined block that contains the transaction.
block_hash: H256,
},
/// Transaction has been replaced by the transaction belonging to the hash.
///
/// E.g. same (sender + nonce) pair
Replaced {
/// The transaction that was replaced.
transaction: Arc<ValidPoolTransaction<T>>,
/// The transaction that replaced the event subject.
replaced_by: TxHash,
},
/// Transaction was dropped due to configured limits.
Discarded(TxHash),
/// Transaction became invalid indefinitely.
Invalid(TxHash),
/// Transaction was propagated to peers.
Propagated(Arc<Vec<PropagateKind>>),
}
impl PoolTransactionEvent {
/// Create a new transaction event.
pub fn new(hash: TxHash, event: TransactionEvent) -> Self {
Self(hash, event)
}
/// The hash of the transaction this event is about.
pub fn hash(&self) -> TxHash {
self.0
}
/// The event that happened to the transaction.
pub fn event(&self) -> &TransactionEvent {
&self.1
}
/// Split the event into its components.
pub fn split(self) -> (TxHash, TransactionEvent) {
(self.0, self.1)
impl<T: PoolTransaction> Clone for FullTransactionEvent<T> {
fn clone(&self) -> Self {
match self {
Self::Replaced { transaction, replaced_by } => {
Self::Replaced { transaction: Arc::clone(transaction), replaced_by: *replaced_by }
}
other => other.clone(),
}
}
}

View File

@@ -1,8 +1,9 @@
//! Listeners for the transaction-pool
use crate::{
pool::events::{PoolTransactionEvent, TransactionEvent},
pool::events::{FullTransactionEvent, TransactionEvent},
traits::PropagateKind,
PoolTransaction, ValidPoolTransaction,
};
use futures_util::Stream;
use reth_primitives::{TxHash, H256};
@@ -24,7 +25,7 @@ const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024;
#[must_use = "streams do nothing unless polled"]
pub struct TransactionEvents {
hash: TxHash,
events: UnboundedReceiver<PoolTransactionEvent>,
events: UnboundedReceiver<TransactionEvent>,
}
impl TransactionEvents {
@@ -35,7 +36,7 @@ impl TransactionEvents {
}
impl Stream for TransactionEvents {
type Item = PoolTransactionEvent;
type Item = TransactionEvent;
fn poll_next(
self: std::pin::Pin<&mut Self>,
@@ -45,15 +46,15 @@ impl Stream for TransactionEvents {
}
}
/// A Stream that receives [PoolTransactionEvent] for _all_ transaction.
/// A Stream that receives [FullTransactionEvent] for _all_ transaction.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct AllTransactionsEvents {
pub(crate) events: Receiver<PoolTransactionEvent>,
pub struct AllTransactionsEvents<T: PoolTransaction> {
pub(crate) events: Receiver<FullTransactionEvent<T>>,
}
impl Stream for AllTransactionsEvents {
type Item = PoolTransactionEvent;
impl<T: PoolTransaction> Stream for AllTransactionsEvents<T> {
type Item = FullTransactionEvent<T>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.get_mut().events.poll_recv(cx)
@@ -64,20 +65,34 @@ impl Stream for AllTransactionsEvents {
///
/// This is essentially a multi-producer, multi-consumer channel where each event is broadcast to
/// all active receivers.
#[derive(Default, Debug)]
pub(crate) struct PoolEventBroadcast {
#[derive(Debug)]
pub(crate) struct PoolEventBroadcast<T: PoolTransaction> {
/// All listeners for all transaction events.
all_events_broadcaster: AllPoolEventsBroadcaster,
all_events_broadcaster: AllPoolEventsBroadcaster<T>,
/// All listeners for events for a certain transaction hash.
broadcasters_by_hash: HashMap<TxHash, PoolEventBroadcaster>,
}
impl PoolEventBroadcast {
impl<T: PoolTransaction> Default for PoolEventBroadcast<T> {
fn default() -> Self {
Self {
all_events_broadcaster: AllPoolEventsBroadcaster::default(),
broadcasters_by_hash: HashMap::default(),
}
}
}
impl<T: PoolTransaction> PoolEventBroadcast<T> {
/// Calls the broadcast callback with the `PoolEventBroadcaster` that belongs to the hash.
fn broadcast_event(&mut self, hash: &TxHash, event: TransactionEvent) {
fn broadcast_event(
&mut self,
hash: &TxHash,
event: TransactionEvent,
pool_event: FullTransactionEvent<T>,
) {
// Broadcast to all listeners for the transaction hash.
if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) {
sink.get_mut().broadcast(*hash, event.clone());
sink.get_mut().broadcast(event.clone());
if sink.get().is_empty() || event.is_final() {
sink.remove();
@@ -85,7 +100,7 @@ impl PoolEventBroadcast {
}
// Broadcast to all listeners for all transactions.
self.all_events_broadcaster.broadcast(*hash, event);
self.all_events_broadcaster.broadcast(pool_event);
}
/// Create a new subscription for the given transaction hash.
@@ -104,65 +119,83 @@ impl PoolEventBroadcast {
}
/// Create a new subscription for all transactions.
pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents {
pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents<T> {
let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE);
self.all_events_broadcaster.senders.push(tx);
AllTransactionsEvents { events: rx }
}
/// Notify listeners about a transaction that was added to the pending queue.
pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<&TxHash>) {
self.broadcast_event(tx, TransactionEvent::Pending);
pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<Arc<ValidPoolTransaction<T>>>) {
self.broadcast_event(tx, TransactionEvent::Pending, FullTransactionEvent::Pending(*tx));
if let Some(replaced) = replaced {
// notify listeners that this transaction was replaced
self.replaced(replaced, tx);
self.replaced(replaced, *tx);
}
}
/// Notify listeners about a transaction that was replaced.
pub(crate) fn replaced(&mut self, tx: &TxHash, replaced_by: &TxHash) {
self.broadcast_event(tx, TransactionEvent::Replaced(*replaced_by));
pub(crate) fn replaced(&mut self, tx: Arc<ValidPoolTransaction<T>>, replaced_by: TxHash) {
let transaction = Arc::clone(&tx);
self.broadcast_event(
tx.hash(),
TransactionEvent::Replaced(replaced_by),
FullTransactionEvent::Replaced { transaction, replaced_by },
);
}
/// Notify listeners about a transaction that was added to the queued pool.
pub(crate) fn queued(&mut self, tx: &TxHash) {
self.broadcast_event(tx, TransactionEvent::Queued);
self.broadcast_event(tx, TransactionEvent::Queued, FullTransactionEvent::Queued(*tx));
}
/// Notify listeners about a transaction that was propagated.
pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
self.broadcast_event(tx, TransactionEvent::Propagated(Arc::new(peers)));
let peers = Arc::new(peers);
self.broadcast_event(
tx,
TransactionEvent::Propagated(Arc::clone(&peers)),
FullTransactionEvent::Propagated(peers),
);
}
/// Notify listeners about a transaction that was discarded.
pub(crate) fn discarded(&mut self, tx: &TxHash) {
self.broadcast_event(tx, TransactionEvent::Discarded);
self.broadcast_event(tx, TransactionEvent::Discarded, FullTransactionEvent::Discarded(*tx));
}
/// Notify listeners that the transaction was mined
pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: H256) {
self.broadcast_event(tx, TransactionEvent::Mined(block_hash));
self.broadcast_event(
tx,
TransactionEvent::Mined(block_hash),
FullTransactionEvent::Mined { tx_hash: *tx, block_hash },
);
}
}
/// All Sender half(s) of the event channels for all transactions.
///
/// This mimics [tokio::sync::broadcast] but uses separate channels.
#[derive(Default, Debug)]
struct AllPoolEventsBroadcaster {
#[derive(Debug)]
struct AllPoolEventsBroadcaster<T: PoolTransaction> {
/// Corresponding sender half(s) for event listener channel
senders: Vec<Sender<PoolTransactionEvent>>,
senders: Vec<Sender<FullTransactionEvent<T>>>,
}
impl AllPoolEventsBroadcaster {
impl<T: PoolTransaction> Default for AllPoolEventsBroadcaster<T> {
fn default() -> Self {
Self { senders: Vec::new() }
}
}
impl<T: PoolTransaction> AllPoolEventsBroadcaster<T> {
// Broadcast an event to all listeners. Dropped listeners are silently evicted.
fn broadcast(&mut self, tx_hash: TxHash, event: TransactionEvent) {
self.senders.retain(|sender| {
match sender.try_send(PoolTransactionEvent::new(tx_hash, event.clone())) {
Ok(_) | Err(TrySendError::Full(_)) => true,
Err(TrySendError::Closed(_)) => false,
}
fn broadcast(&mut self, event: FullTransactionEvent<T>) {
self.senders.retain(|sender| match sender.try_send(event.clone()) {
Ok(_) | Err(TrySendError::Full(_)) => true,
Err(TrySendError::Closed(_)) => false,
})
}
}
@@ -173,7 +206,7 @@ impl AllPoolEventsBroadcaster {
#[derive(Default, Debug)]
struct PoolEventBroadcaster {
/// Corresponding sender half(s) for event listener channel
senders: Vec<UnboundedSender<PoolTransactionEvent>>,
senders: Vec<UnboundedSender<TransactionEvent>>,
}
impl PoolEventBroadcaster {
@@ -183,8 +216,7 @@ impl PoolEventBroadcaster {
}
// Broadcast an event to all listeners. Dropped listeners are silently evicted.
fn broadcast(&mut self, tx_hash: TxHash, event: TransactionEvent) {
self.senders
.retain(|sender| sender.send(PoolTransactionEvent::new(tx_hash, event.clone())).is_ok())
fn broadcast(&mut self, event: TransactionEvent) {
self.senders.retain(|sender| sender.send(event.clone()).is_ok())
}
}

View File

@@ -93,7 +93,7 @@ use tokio::sync::mpsc;
use tracing::debug;
mod events;
pub use events::{PoolTransactionEvent, TransactionEvent};
pub use events::{FullTransactionEvent, TransactionEvent};
mod listener;
pub use listener::{AllTransactionsEvents, TransactionEvents};
@@ -117,7 +117,7 @@ pub struct PoolInner<V: TransactionValidator, T: TransactionOrdering> {
/// Pool settings.
config: PoolConfig,
/// Manages listeners for transaction state change events.
event_listener: RwLock<PoolEventBroadcast>,
event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
/// Listeners for new ready transactions.
pending_transaction_listener: Mutex<Vec<mpsc::Sender<TxHash>>>,
/// Listeners for new transactions added to the pool.
@@ -223,7 +223,9 @@ where
}
/// Adds a listener for all transaction events.
pub(crate) fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents {
pub(crate) fn add_all_transactions_event_listener(
&self,
) -> AllTransactionsEvents<T::Transaction> {
self.event_listener.write().subscribe_all()
}
@@ -417,14 +419,14 @@ where
AddedTransaction::Pending(tx) => {
let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
listener.pending(transaction.hash(), replaced.as_ref().map(|tx| tx.hash()));
listener.pending(transaction.hash(), replaced.clone());
promoted.iter().for_each(|tx| listener.pending(tx, None));
discarded.iter().for_each(|tx| listener.discarded(tx));
}
AddedTransaction::Parked { transaction, replaced, .. } => {
listener.queued(transaction.hash());
if let Some(replaced) = replaced {
listener.replaced(replaced.hash(), transaction.hash());
listener.replaced(replaced.clone(), *transaction.hash());
}
}
}

View File

@@ -96,7 +96,7 @@ pub trait TransactionPool: Send + Sync + Clone {
fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents>;
/// Returns a new transaction change event stream for _all_ transactions in the pool.
fn all_transactions_event_listener(&self) -> AllTransactionsEvents;
fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction>;
/// Returns a new Stream that yields transactions hashes for new ready transactions.
///