feat(rpc): propagate TransactionOrigin through send_transaction and batcher (#21969)

Co-authored-by: klkvr <klkvr@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Georgios Konstantopoulos
2026-02-09 12:34:23 -08:00
committed by GitHub
parent f2061991c5
commit 32466fe223
12 changed files with 77 additions and 52 deletions

View File

@@ -82,13 +82,15 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
) -> impl Future<Output = Result<B256, Self::Error>> + Send {
async move {
let recovered = recover_raw_transaction::<PoolPooledTx<Self::Pool>>(&tx)?;
self.send_transaction(WithEncoded::new(tx, recovered)).await
self.send_transaction(TransactionOrigin::External, WithEncoded::new(tx, recovered))
.await
}
}
/// Submits the transaction to the pool.
/// Submits the transaction to the pool with the given [`TransactionOrigin`].
fn send_transaction(
&self,
origin: TransactionOrigin,
tx: WithEncoded<Recovered<PoolPooledTx<Self::Pool>>>,
) -> impl Future<Output = Result<B256, Self::Error>> + Send;

View File

@@ -556,10 +556,11 @@ where
#[inline]
pub async fn add_pool_transaction(
&self,
origin: reth_transaction_pool::TransactionOrigin,
transaction: <N::Pool as TransactionPool>::Transaction,
) -> Result<AddedTransactionOutcome, EthApiError> {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let request = reth_transaction_pool::BatchTxRequest::new(transaction, response_tx);
let request = reth_transaction_pool::BatchTxRequest::new(origin, transaction, response_tx);
self.tx_batch_sender()
.send(request)

View File

@@ -38,6 +38,7 @@ where
async fn send_transaction(
&self,
origin: reth_transaction_pool::TransactionOrigin,
tx: WithEncoded<Recovered<PoolPooledTx<Self::Pool>>>,
) -> Result<B256, Self::Error> {
let (tx, recovered) = tx.split();
@@ -106,7 +107,7 @@ where
}).map_err(EthApiError::other)?;
// Retain tx in local tx pool after forwarding, for local RPC usage.
let _ = self.inner.add_pool_transaction(pool_transaction).await;
let _ = self.inner.add_pool_transaction(origin, pool_transaction).await;
return Ok(hash);
}
@@ -114,9 +115,8 @@ where
// broadcast raw transaction to subscribers if there is any.
self.broadcast_raw_transaction(tx);
// submit the transaction to the pool with a `Local` origin
let AddedTransactionOutcome { hash, .. } =
self.inner.add_pool_transaction(pool_transaction).await?;
self.inner.add_pool_transaction(origin, pool_transaction).await?;
Ok(hash)
}

View File

@@ -94,7 +94,8 @@ fn txpool_batch_insertion(c: &mut Criterion) {
let mut response_futures = Vec::with_capacity(tx_count);
for tx in txs {
let (response_tx, response_rx) = oneshot::channel();
let request = BatchTxRequest::new(tx, response_tx);
let request =
BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx);
batch_requests.push(request);
response_futures.push(response_rx);
}

View File

@@ -15,10 +15,10 @@ use std::{
use tokio::sync::{mpsc, oneshot};
/// A single batch transaction request
/// All transactions processed through the batcher are considered local
/// transactions (`TransactionOrigin::Local`) when inserted into the pool.
#[derive(Debug)]
pub struct BatchTxRequest<T: PoolTransaction> {
/// Origin of the transaction (e.g. Local, External)
origin: TransactionOrigin,
/// Tx to be inserted in to the pool
pool_tx: T,
/// Channel to send result back to caller
@@ -31,10 +31,11 @@ where
{
/// Create a new batch transaction request
pub const fn new(
origin: TransactionOrigin,
pool_tx: T,
response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
) -> Self {
Self { pool_tx, response_tx }
Self { origin, pool_tx, response_tx }
}
}
@@ -66,24 +67,24 @@ where
}
async fn process_request(pool: &Pool, req: BatchTxRequest<Pool::Transaction>) {
let BatchTxRequest { pool_tx, response_tx } = req;
let pool_result = pool.add_transaction(TransactionOrigin::Local, pool_tx).await;
let BatchTxRequest { origin, pool_tx, response_tx } = req;
let pool_result = pool.add_transaction(origin, pool_tx).await;
let _ = response_tx.send(pool_result);
}
/// Process a batch of transaction requests, grouped by origin
async fn process_batch(pool: &Pool, mut batch: Vec<BatchTxRequest<Pool::Transaction>>) {
/// Process a batch of transaction requests with per-transaction origins
async fn process_batch(pool: &Pool, batch: Vec<BatchTxRequest<Pool::Transaction>>) {
if batch.len() == 1 {
Self::process_request(pool, batch.remove(0)).await;
Self::process_request(pool, batch.into_iter().next().expect("batch is not empty"))
.await;
return
}
let (pool_transactions, response_tx): (Vec<_>, Vec<_>) =
batch.into_iter().map(|req| (req.pool_tx, req.response_tx)).unzip();
let (transactions, response_txs): (Vec<_>, Vec<_>) =
batch.into_iter().map(|req| ((req.origin, req.pool_tx), req.response_tx)).unzip();
let pool_results = pool.add_transactions(TransactionOrigin::Local, pool_transactions).await;
for (response_tx, pool_result) in response_tx.into_iter().zip(pool_results) {
let pool_results = pool.add_transactions_with_origins(transactions).await;
for (response_tx, pool_result) in response_txs.into_iter().zip(pool_results) {
let _ = response_tx.send(pool_result);
}
}
@@ -138,7 +139,7 @@ mod tests {
let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
batch_requests.push(BatchTxRequest::new(tx, response_tx));
batch_requests.push(BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx));
responses.push(response_rx);
}
@@ -167,7 +168,9 @@ mod tests {
let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
request_tx.send(BatchTxRequest::new(tx, response_tx)).expect("Could not send batch tx");
request_tx
.send(BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx))
.expect("Could not send batch tx");
responses.push(response_rx);
}
@@ -197,7 +200,7 @@ mod tests {
for i in 0..10 {
let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let request = BatchTxRequest::new(tx, response_tx);
let request = BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx);
request_tx.send(request).expect("Could not send batch tx");
results.push(response_rx);
}
@@ -225,7 +228,7 @@ mod tests {
for i in 0..max_batch_size {
let tx = MockTransaction::legacy().with_nonce(i as u64).with_gas_price(100);
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let request = BatchTxRequest::new(tx, response_tx);
let request = BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx);
let request_tx_clone = request_tx.clone();
let tx_fut = async move {

View File

@@ -389,17 +389,6 @@ where
self.pool.validator().validate_transaction(origin, transaction).await
}
/// Returns future that validates all transactions in the given iterator.
///
/// This returns the validated transactions in the iterator's order.
async fn validate_all(
&self,
origin: TransactionOrigin,
transactions: impl IntoIterator<Item = V::Transaction> + Send,
) -> Vec<TransactionValidationOutcome<V::Transaction>> {
self.pool.validator().validate_transactions_with_origin(origin, transactions).await
}
/// Number of transactions in the entire pool
pub fn len(&self) -> usize {
self.pool.len()
@@ -513,17 +502,17 @@ where
results.pop().expect("result length is the same as the input")
}
async fn add_transactions(
async fn add_transactions_with_origins(
&self,
origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
let transactions: Vec<_> = transactions.into_iter().collect();
if transactions.is_empty() {
return Vec::new()
}
let validated = self.validate_all(origin, transactions).await;
self.pool.add_transactions(origin, validated.into_iter())
let origins: Vec<_> = transactions.iter().map(|(origin, _)| *origin).collect();
let validated = self.pool.validator().validate_transactions(transactions).await;
self.pool.add_transactions_with_origins(origins.into_iter().zip(validated))
}
fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {

View File

@@ -84,14 +84,13 @@ impl<T: EthPoolTransaction> TransactionPool for NoopTransactionPool<T> {
Err(PoolError::other(hash, Box::new(NoopInsertError::new(transaction))))
}
async fn add_transactions(
async fn add_transactions_with_origins(
&self,
_origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
transactions
.into_iter()
.map(|transaction| {
.map(|(_, transaction)| {
let hash = *transaction.hash();
Err(PoolError::other(hash, Box::new(NoopInsertError::new(transaction))))
})

View File

@@ -630,10 +630,24 @@ where
}
/// Adds all transactions in the iterator to the pool, returning a list of results.
///
/// Convenience method that assigns the same origin to all transactions. Delegates to
/// [`Self::add_transactions_with_origins`].
pub fn add_transactions(
&self,
origin: TransactionOrigin,
transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
}
/// Adds all transactions in the iterator to the pool, each with its own
/// [`TransactionOrigin`], returning a list of results.
pub fn add_transactions_with_origins(
&self,
transactions: impl IntoIterator<
Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
>,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
// Collect results and metadata while holding the pool write lock
let (mut results, added_metas, discarded) = {
@@ -642,7 +656,7 @@ where
let results = transactions
.into_iter()
.map(|tx| {
.map(|(origin, tx)| {
let (result, meta) = self.add_transaction(&mut pool, origin, tx);
// Only collect metadata for successful insertions

View File

@@ -175,6 +175,20 @@ pub trait TransactionPool: Clone + Debug + Send + Sync {
&self,
origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
) -> impl Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send {
self.add_transactions_with_origins(transactions.into_iter().map(move |tx| (origin, tx)))
}
/// Adds the given _unvalidated_ transactions into the pool.
///
/// Each transaction is paired with its own [`TransactionOrigin`].
///
/// Returns a list of results.
///
/// Consumer: RPC
fn add_transactions_with_origins(
&self,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> impl Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send;
/// Submit a consensus transaction directly to the pool

View File

@@ -743,7 +743,7 @@ where
/// Validates all given transactions.
fn validate_batch(
&self,
transactions: Vec<(TransactionOrigin, Tx)>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Tx)>,
) -> Vec<TransactionValidationOutcome<Tx>> {
let mut provider = None;
transactions
@@ -853,7 +853,7 @@ where
async fn validate_transactions(
&self,
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
self.validate_batch(transactions)
}

View File

@@ -212,7 +212,7 @@ pub trait TransactionValidator: Debug + Send + Sync {
/// See also [`Self::validate_transaction`].
fn validate_transactions(
&self,
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> impl Future<Output = Vec<TransactionValidationOutcome<Self::Transaction>>> + Send {
futures_util::future::join_all(
transactions.into_iter().map(|(origin, tx)| self.validate_transaction(origin, tx)),
@@ -260,7 +260,7 @@ where
async fn validate_transactions(
&self,
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
match self {
Self::Left(v) => v.validate_transactions(transactions).await,

View File

@@ -254,8 +254,9 @@ where
async fn validate_transactions(
&self,
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
let transactions: Vec<_> = transactions.into_iter().collect();
let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
let (tx, rx) = oneshot::channel();
{
@@ -300,7 +301,8 @@ where
origin: TransactionOrigin,
transactions: impl IntoIterator<Item = Self::Transaction> + Send,
) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
self.validate_transactions(transactions.into_iter().map(|tx| (origin, tx)).collect()).await
let transactions: Vec<_> = transactions.into_iter().map(|tx| (origin, tx)).collect();
self.validate_transactions(transactions).await
}
fn on_new_head_block(&self, new_tip_block: &SealedBlock<Self::Block>) {