mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
fix(txpool): respect propagate setting in the full tx stream (#4362)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
committed by
GitHub
parent
b7541943b8
commit
03afe376b8
@@ -181,8 +181,8 @@ pub use crate::{
|
||||
traits::{
|
||||
AllPoolTransactions, BestTransactions, BlockInfo, CanonicalStateUpdate, ChangedAccount,
|
||||
EthBlobTransactionSidecar, EthPoolTransaction, EthPooledTransaction,
|
||||
GetPooledTransactionLimit, NewTransactionEvent, PendingTransactionListenerKind, PoolSize,
|
||||
PoolTransaction, PropagateKind, PropagatedTransactions, TransactionOrigin, TransactionPool,
|
||||
GetPooledTransactionLimit, NewTransactionEvent, PoolSize, PoolTransaction, PropagateKind,
|
||||
PropagatedTransactions, TransactionListenerKind, TransactionOrigin, TransactionPool,
|
||||
TransactionPoolExt,
|
||||
},
|
||||
validate::{
|
||||
@@ -374,15 +374,15 @@ where
|
||||
self.pool.add_all_transactions_event_listener()
|
||||
}
|
||||
|
||||
fn pending_transactions_listener_for(
|
||||
&self,
|
||||
kind: PendingTransactionListenerKind,
|
||||
) -> Receiver<TxHash> {
|
||||
fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash> {
|
||||
self.pool.add_pending_listener(kind)
|
||||
}
|
||||
|
||||
fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
|
||||
self.pool.add_new_transaction_listener()
|
||||
fn new_transactions_listener_for(
|
||||
&self,
|
||||
kind: TransactionListenerKind,
|
||||
) -> Receiver<NewTransactionEvent<Self::Transaction>> {
|
||||
self.pool.add_new_transaction_listener(kind)
|
||||
}
|
||||
|
||||
fn pooled_transaction_hashes(&self) -> Vec<TxHash> {
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
use crate::{
|
||||
blobstore::BlobStoreError,
|
||||
error::PoolError,
|
||||
traits::{GetPooledTransactionLimit, PendingTransactionListenerKind},
|
||||
traits::{GetPooledTransactionLimit, TransactionListenerKind},
|
||||
validate::ValidTransaction,
|
||||
AllPoolTransactions, AllTransactionsEvents, BestTransactions, BlockInfo, EthPooledTransaction,
|
||||
NewTransactionEvent, PoolResult, PoolSize, PoolTransaction, PropagatedTransactions,
|
||||
@@ -83,7 +83,7 @@ impl TransactionPool for NoopTransactionPool {
|
||||
|
||||
fn pending_transactions_listener_for(
|
||||
&self,
|
||||
_kind: PendingTransactionListenerKind,
|
||||
_kind: TransactionListenerKind,
|
||||
) -> Receiver<TxHash> {
|
||||
mpsc::channel(1).1
|
||||
}
|
||||
@@ -92,6 +92,13 @@ impl TransactionPool for NoopTransactionPool {
|
||||
mpsc::channel(1).1
|
||||
}
|
||||
|
||||
fn new_transactions_listener_for(
|
||||
&self,
|
||||
_kind: TransactionListenerKind,
|
||||
) -> Receiver<NewTransactionEvent<Self::Transaction>> {
|
||||
mpsc::channel(1).1
|
||||
}
|
||||
|
||||
fn pooled_transaction_hashes(&self) -> Vec<TxHash> {
|
||||
vec![]
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Transaction Pool internals.
|
||||
//!
|
||||
//! Incoming transactions validated are before they enter the pool first. The validation outcome can
|
||||
//! Incoming transactions are validated before they enter the pool first. The validation outcome can
|
||||
//! have 3 states:
|
||||
//!
|
||||
//! 1. Transaction can _never_ be valid
|
||||
@@ -103,7 +103,7 @@ use crate::{
|
||||
blobstore::BlobStore,
|
||||
metrics::BlobStoreMetrics,
|
||||
pool::txpool::UpdateOutcome,
|
||||
traits::{GetPooledTransactionLimit, PendingTransactionListenerKind},
|
||||
traits::{GetPooledTransactionLimit, TransactionListenerKind},
|
||||
validate::ValidTransaction,
|
||||
};
|
||||
pub use listener::{AllTransactionsEvents, TransactionEvents};
|
||||
@@ -137,7 +137,7 @@ where
|
||||
/// Listeners for new pending transactions.
|
||||
pending_transaction_listener: Mutex<Vec<PendingTransactionListener>>,
|
||||
/// Listeners for new transactions added to the pool.
|
||||
transaction_listener: Mutex<Vec<mpsc::Sender<NewTransactionEvent<T::Transaction>>>>,
|
||||
transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
|
||||
/// Metrics for the blob store
|
||||
blob_store_metrics: BlobStoreMetrics,
|
||||
}
|
||||
@@ -222,10 +222,7 @@ where
|
||||
|
||||
/// Adds a new transaction listener to the pool that gets notified about every new _pending_
|
||||
/// transaction inserted into the pool
|
||||
pub fn add_pending_listener(
|
||||
&self,
|
||||
kind: PendingTransactionListenerKind,
|
||||
) -> mpsc::Receiver<TxHash> {
|
||||
pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
|
||||
const TX_LISTENER_BUFFER_SIZE: usize = 2048;
|
||||
let (sender, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
|
||||
let listener = PendingTransactionListener { sender, kind };
|
||||
@@ -236,10 +233,12 @@ where
|
||||
/// Adds a new transaction listener to the pool that gets notified about every new transaction.
|
||||
pub fn add_new_transaction_listener(
|
||||
&self,
|
||||
kind: TransactionListenerKind,
|
||||
) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
|
||||
const TX_LISTENER_BUFFER_SIZE: usize = 1024;
|
||||
let (tx, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
|
||||
self.transaction_listener.lock().push(tx);
|
||||
let (sender, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
|
||||
let listener = TransactionListener { sender, kind };
|
||||
self.transaction_listener.lock().push(listener);
|
||||
rx
|
||||
}
|
||||
|
||||
@@ -517,18 +516,25 @@ where
|
||||
/// Notify all listeners about a newly inserted pending transaction.
|
||||
fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
|
||||
let mut transaction_listeners = self.transaction_listener.lock();
|
||||
transaction_listeners.retain_mut(|listener| {
|
||||
if listener.kind.is_propagate_only() && !event.transaction.propagate {
|
||||
// only emit this hash to listeners that are only allowed to receive propagate only
|
||||
// transactions, such as network
|
||||
return !listener.sender.is_closed()
|
||||
}
|
||||
|
||||
transaction_listeners.retain_mut(|listener| match listener.try_send(event.clone()) {
|
||||
Ok(()) => true,
|
||||
Err(err) => {
|
||||
if matches!(err, mpsc::error::TrySendError::Full(_)) {
|
||||
debug!(
|
||||
target: "txpool",
|
||||
"skipping transaction on full transaction listener",
|
||||
);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
match listener.sender.try_send(event.clone()) {
|
||||
Ok(()) => true,
|
||||
Err(err) => {
|
||||
if matches!(err, mpsc::error::TrySendError::Full(_)) {
|
||||
debug!(
|
||||
target: "txpool",
|
||||
"skipping transaction on full transaction listener",
|
||||
);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -742,7 +748,15 @@ impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
|
||||
struct PendingTransactionListener {
|
||||
sender: mpsc::Sender<TxHash>,
|
||||
/// Whether to include transactions that should not be propagated over the network.
|
||||
kind: PendingTransactionListenerKind,
|
||||
kind: TransactionListenerKind,
|
||||
}
|
||||
|
||||
/// An active listener for new pending transactions.
|
||||
#[derive(Debug)]
|
||||
struct TransactionListener<T: PoolTransaction> {
|
||||
sender: mpsc::Sender<NewTransactionEvent<T>>,
|
||||
/// Whether to include transactions that should not be propagated over the network.
|
||||
kind: TransactionListenerKind,
|
||||
}
|
||||
|
||||
/// Tracks an added transaction and all graph changes caused by adding it.
|
||||
@@ -754,19 +768,19 @@ pub struct AddedPendingTransaction<T: PoolTransaction> {
|
||||
replaced: Option<Arc<ValidPoolTransaction<T>>>,
|
||||
/// transactions promoted to the pending queue
|
||||
promoted: Vec<Arc<ValidPoolTransaction<T>>>,
|
||||
/// transaction that failed and became discarded
|
||||
/// transactions that failed and became discarded
|
||||
discarded: Vec<Arc<ValidPoolTransaction<T>>>,
|
||||
}
|
||||
|
||||
impl<T: PoolTransaction> AddedPendingTransaction<T> {
|
||||
/// Returns all transactions that were promoted to the pending pool and adhere to the given
|
||||
/// [PendingTransactionListenerKind].
|
||||
/// [TransactionListenerKind].
|
||||
///
|
||||
/// If the kind is [PendingTransactionListenerKind::PropagateOnly], then only transactions that
|
||||
/// If the kind is [TransactionListenerKind::PropagateOnly], then only transactions that
|
||||
/// are allowed to be propagated are returned.
|
||||
pub(crate) fn pending_transactions(
|
||||
&self,
|
||||
kind: PendingTransactionListenerKind,
|
||||
kind: TransactionListenerKind,
|
||||
) -> impl Iterator<Item = H256> + '_ {
|
||||
let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
|
||||
PendingTransactionIter { kind, iter }
|
||||
@@ -779,7 +793,7 @@ impl<T: PoolTransaction> AddedPendingTransaction<T> {
|
||||
}
|
||||
|
||||
pub(crate) struct PendingTransactionIter<Iter> {
|
||||
kind: PendingTransactionListenerKind,
|
||||
kind: TransactionListenerKind,
|
||||
iter: Iter,
|
||||
}
|
||||
|
||||
@@ -876,13 +890,13 @@ pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
|
||||
|
||||
impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
|
||||
/// Returns all transactions that were promoted to the pending pool and adhere to the given
|
||||
/// [PendingTransactionListenerKind].
|
||||
/// [TransactionListenerKind].
|
||||
///
|
||||
/// If the kind is [PendingTransactionListenerKind::PropagateOnly], then only transactions that
|
||||
/// If the kind is [TransactionListenerKind::PropagateOnly], then only transactions that
|
||||
/// are allowed to be propagated are returned.
|
||||
pub(crate) fn pending_transactions(
|
||||
&self,
|
||||
kind: PendingTransactionListenerKind,
|
||||
kind: TransactionListenerKind,
|
||||
) -> impl Iterator<Item = H256> + '_ {
|
||||
let iter = self.promoted.iter();
|
||||
PendingTransactionIter { kind, iter }
|
||||
|
||||
@@ -118,18 +118,24 @@ pub trait TransactionPool: Send + Sync + Clone {
|
||||
///
|
||||
/// Consumer: RPC/P2P
|
||||
fn pending_transactions_listener(&self) -> Receiver<TxHash> {
|
||||
self.pending_transactions_listener_for(PendingTransactionListenerKind::PropagateOnly)
|
||||
self.pending_transactions_listener_for(TransactionListenerKind::PropagateOnly)
|
||||
}
|
||||
|
||||
/// Returns a new Stream that yields transactions hashes for new __pending__ transactions
|
||||
/// inserted into the pool depending on the given [PendingTransactionListenerKind] argument.
|
||||
fn pending_transactions_listener_for(
|
||||
&self,
|
||||
kind: PendingTransactionListenerKind,
|
||||
) -> Receiver<TxHash>;
|
||||
/// inserted into the pool depending on the given [TransactionListenerKind] argument.
|
||||
fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash>;
|
||||
|
||||
/// Returns a new stream that yields new valid transactions added to the pool.
|
||||
fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>>;
|
||||
fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
|
||||
self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly)
|
||||
}
|
||||
|
||||
/// Returns a new stream that yields new valid transactions added to the pool
|
||||
/// depending on the given [TransactionListenerKind] argument.
|
||||
fn new_transactions_listener_for(
|
||||
&self,
|
||||
kind: TransactionListenerKind,
|
||||
) -> Receiver<NewTransactionEvent<Self::Transaction>>;
|
||||
|
||||
/// Returns a new Stream that yields new transactions added to the pending sub-pool.
|
||||
///
|
||||
@@ -138,7 +144,10 @@ pub trait TransactionPool: Send + Sync + Clone {
|
||||
fn new_pending_pool_transactions_listener(
|
||||
&self,
|
||||
) -> NewSubpoolTransactionStream<Self::Transaction> {
|
||||
NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::Pending)
|
||||
NewSubpoolTransactionStream::new(
|
||||
self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly),
|
||||
SubPool::Pending,
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns a new Stream that yields new transactions added to the basefee sub-pool.
|
||||
@@ -326,12 +335,11 @@ pub trait TransactionPoolExt: TransactionPool {
|
||||
fn delete_blobs(&self, txs: Vec<H256>);
|
||||
}
|
||||
|
||||
/// Determines what kind of new pending transactions should be emitted by a stream of pending
|
||||
/// transactions.
|
||||
/// Determines what kind of new transactions should be emitted by a stream of transactions.
|
||||
///
|
||||
/// This gives control whether to include transactions that are allowed to be propagated.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub enum PendingTransactionListenerKind {
|
||||
pub enum TransactionListenerKind {
|
||||
/// Any new pending transactions
|
||||
All,
|
||||
/// Only transactions that are allowed to be propagated.
|
||||
@@ -340,7 +348,7 @@ pub enum PendingTransactionListenerKind {
|
||||
PropagateOnly,
|
||||
}
|
||||
|
||||
impl PendingTransactionListenerKind {
|
||||
impl TransactionListenerKind {
|
||||
/// Returns true if we're only interested in transactions that are allowed to be propagated.
|
||||
#[inline]
|
||||
pub fn is_propagate_only(&self) -> bool {
|
||||
|
||||
@@ -2,7 +2,7 @@ use assert_matches::assert_matches;
|
||||
use reth_transaction_pool::{
|
||||
noop::MockTransactionValidator,
|
||||
test_utils::{testing_pool, testing_pool_with_validator, MockTransactionFactory},
|
||||
FullTransactionEvent, PendingTransactionListenerKind, TransactionEvent, TransactionOrigin,
|
||||
FullTransactionEvent, TransactionEvent, TransactionListenerKind, TransactionOrigin,
|
||||
TransactionPool,
|
||||
};
|
||||
use std::{future::poll_fn, task::Poll};
|
||||
@@ -48,8 +48,7 @@ async fn txpool_listener_propagate_only() {
|
||||
let transaction = mock_tx_factory.create_eip1559();
|
||||
let expected = *transaction.hash();
|
||||
let mut listener_network = txpool.pending_transactions_listener();
|
||||
let mut listener_all =
|
||||
txpool.pending_transactions_listener_for(PendingTransactionListenerKind::All);
|
||||
let mut listener_all = txpool.pending_transactions_listener_for(TransactionListenerKind::All);
|
||||
let result =
|
||||
txpool.add_transaction(TransactionOrigin::Local, transaction.transaction.clone()).await;
|
||||
assert!(result.is_ok());
|
||||
@@ -64,3 +63,27 @@ async fn txpool_listener_propagate_only() {
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn txpool_listener_new_propagate_only() {
|
||||
let txpool = testing_pool_with_validator(MockTransactionValidator::no_propagate_local());
|
||||
let mut mock_tx_factory = MockTransactionFactory::default();
|
||||
let transaction = mock_tx_factory.create_eip1559();
|
||||
let expected = *transaction.hash();
|
||||
let mut listener_network = txpool.new_transactions_listener();
|
||||
let mut listener_all = txpool.new_transactions_listener_for(TransactionListenerKind::All);
|
||||
let result =
|
||||
txpool.add_transaction(TransactionOrigin::Local, transaction.transaction.clone()).await;
|
||||
assert!(result.is_ok());
|
||||
|
||||
let inserted = listener_all.recv().await.unwrap();
|
||||
let actual = *inserted.transaction.hash();
|
||||
assert_eq!(actual, expected);
|
||||
|
||||
poll_fn(|cx| {
|
||||
// no propagation
|
||||
assert!(listener_network.poll_recv(cx).is_pending());
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user