diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 7c93b56169..f24b134f4a 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -83,6 +83,7 @@ pub use crate::{ config::PoolConfig, ordering::{CostOrdering, TransactionOrdering}, + pool::TransactionEvents, traits::{ AllPoolTransactions, BestTransactions, BlockInfo, CanonicalStateUpdate, ChangedAccount, PoolTransaction, PooledTransaction, PropagateKind, PropagatedTransactions, @@ -94,7 +95,7 @@ pub use crate::{ }, }; use crate::{ - error::{PoolError, PoolResult}, + error::PoolResult, pool::PoolInner, traits::{NewTransactionEvent, PoolSize}, }; @@ -249,24 +250,22 @@ where self.pool.on_canonical_state_change(update); } + async fn add_transaction_and_subscribe( + &self, + origin: TransactionOrigin, + transaction: Self::Transaction, + ) -> PoolResult { + let (_, tx) = self.validate(origin, transaction).await; + self.pool.add_transaction_and_subscribe(origin, tx) + } + async fn add_transaction( &self, origin: TransactionOrigin, transaction: Self::Transaction, ) -> PoolResult { let (_, tx) = self.validate(origin, transaction).await; - - match tx { - TransactionValidationOutcome::Valid { .. } => { - self.pool.add_transactions(origin, std::iter::once(tx)).pop().expect("exists; qed") - } - TransactionValidationOutcome::Invalid(transaction, error) => { - Err(PoolError::InvalidTransaction(*transaction.hash(), error)) - } - TransactionValidationOutcome::Error(transaction, error) => { - Err(PoolError::Other(*transaction.hash(), error)) - } - } + self.pool.add_transactions(origin, std::iter::once(tx)).pop().expect("exists; qed") } async fn add_transactions( diff --git a/crates/transaction-pool/src/pool/listener.rs b/crates/transaction-pool/src/pool/listener.rs index 7ee45ef834..39950f7a4c 100644 --- a/crates/transaction-pool/src/pool/listener.rs +++ b/crates/transaction-pool/src/pool/listener.rs @@ -1,9 +1,39 @@ //! Listeners for the transaction-pool use crate::{pool::events::TransactionEvent, traits::PropagateKind}; +use futures_util::Stream; use reth_primitives::{TxHash, H256}; -use std::{collections::HashMap, sync::Arc}; -use tokio::sync::mpsc::UnboundedSender; +use std::{ + collections::{hash_map::Entry, HashMap}, + sync::Arc, +}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; + +/// A Stream that receives [TransactionEvent] for the transactions +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct TransactionEvents { + hash: TxHash, + events: UnboundedReceiver, +} + +impl TransactionEvents { + /// The hash for this transaction + pub fn hash(&self) -> TxHash { + self.hash + } +} + +impl Stream for TransactionEvents { + type Item = TransactionEvent; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_mut().events.poll_recv(cx) + } +} type EventBroadcast = UnboundedSender; @@ -35,6 +65,21 @@ impl PoolEventBroadcast { } } + /// Create a new subscription for the given transaction hash. + pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + match self.broadcasters.entry(tx_hash) { + Entry::Occupied(mut entry) => { + entry.get_mut().senders.push(tx); + } + Entry::Vacant(entry) => { + entry.insert(PoolEventBroadcaster { is_done: false, senders: vec![tx] }); + } + }; + TransactionEvents { hash: tx_hash, events: rx } + } + /// Notify listeners about a transaction that was added to the pending queue. pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<&TxHash>) { self.broadcast_with(tx, |notifier| notifier.pending()); diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index c79af8d795..10ae43b6c7 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -102,6 +102,7 @@ pub(crate) mod size; pub(crate) mod state; pub mod txpool; mod update; +pub use listener::TransactionEvents; /// Transaction pool internals. pub struct PoolInner { @@ -292,6 +293,19 @@ where } } + pub(crate) fn add_transaction_and_subscribe( + &self, + origin: TransactionOrigin, + tx: TransactionValidationOutcome, + ) -> PoolResult { + let listener = { + let mut listener = self.event_listener.write(); + listener.subscribe(tx.tx_hash()) + }; + self.add_transactions(origin, std::iter::once(tx)).pop().expect("exists; qed")?; + Ok(listener) + } + /// Adds all transactions in the iterator to the pool, returning a list of results. pub fn add_transactions( &self, diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index a7b725e407..aa5891d52b 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -1,4 +1,8 @@ -use crate::{error::PoolResult, pool::state::SubPool, validate::ValidPoolTransaction}; +use crate::{ + error::PoolResult, + pool::{state::SubPool, TransactionEvents}, + validate::ValidPoolTransaction, +}; use reth_primitives::{ Address, FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, Transaction, TransactionKind, TransactionSignedEcRecovered, TxHash, EIP1559_TX_TYPE_ID, H256, U256, @@ -60,6 +64,18 @@ pub trait TransactionPool: Send + Sync + Clone { self.add_transactions(TransactionOrigin::External, transactions).await } + /// Adds an _unvalidated_ transaction into the pool and subscribe to state changes. + /// + /// This is the same as [TransactionPool::add_transaction] but returns an event stream for the + /// given transaction. + /// + /// Consumer: Custom + async fn add_transaction_and_subscribe( + &self, + origin: TransactionOrigin, + transaction: Self::Transaction, + ) -> PoolResult; + /// Adds an _unvalidated_ transaction into the pool. /// /// Consumer: RPC diff --git a/crates/transaction-pool/src/validate.rs b/crates/transaction-pool/src/validate.rs index cf079ae24c..f8705b324e 100644 --- a/crates/transaction-pool/src/validate.rs +++ b/crates/transaction-pool/src/validate.rs @@ -33,6 +33,22 @@ pub enum TransactionValidationOutcome { Error(T, Box), } +impl TransactionValidationOutcome { + /// Returns the transaction that was validated. + pub fn transaction(&self) -> &T { + match self { + Self::Valid { transaction, .. } => transaction, + Self::Invalid(transaction, ..) => transaction, + Self::Error(transaction, ..) => transaction, + } + } + + /// Returns the hash of the transactions + pub fn tx_hash(&self) -> TxHash { + *self.transaction().hash() + } +} + /// Provides support for validating transaction at any given state of the chain #[async_trait::async_trait] pub trait TransactionValidator: Send + Sync {