diff --git a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs index 0a07751fa6..caeb77c139 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs @@ -82,13 +82,15 @@ pub trait EthTransactions: LoadTransaction { ) -> impl Future> + Send { async move { let recovered = recover_raw_transaction::>(&tx)?; - self.send_transaction(WithEncoded::new(tx, recovered)).await + self.send_transaction(TransactionOrigin::External, WithEncoded::new(tx, recovered)) + .await } } - /// Submits the transaction to the pool. + /// Submits the transaction to the pool with the given [`TransactionOrigin`]. fn send_transaction( &self, + origin: TransactionOrigin, tx: WithEncoded>>, ) -> impl Future> + Send; diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index 28fc38e078..3e05c38814 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -556,10 +556,11 @@ where #[inline] pub async fn add_pool_transaction( &self, + origin: reth_transaction_pool::TransactionOrigin, transaction: ::Transaction, ) -> Result { let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - let request = reth_transaction_pool::BatchTxRequest::new(transaction, response_tx); + let request = reth_transaction_pool::BatchTxRequest::new(origin, transaction, response_tx); self.tx_batch_sender() .send(request) diff --git a/crates/rpc/rpc/src/eth/helpers/transaction.rs b/crates/rpc/rpc/src/eth/helpers/transaction.rs index 5aee138735..3391ba6ed2 100644 --- a/crates/rpc/rpc/src/eth/helpers/transaction.rs +++ b/crates/rpc/rpc/src/eth/helpers/transaction.rs @@ -38,6 +38,7 @@ where async fn send_transaction( &self, + origin: reth_transaction_pool::TransactionOrigin, tx: WithEncoded>>, ) -> Result { let (tx, recovered) = tx.split(); @@ -106,7 +107,7 @@ where }).map_err(EthApiError::other)?; // Retain tx in local tx pool after forwarding, for local RPC usage. - let _ = self.inner.add_pool_transaction(pool_transaction).await; + let _ = self.inner.add_pool_transaction(origin, pool_transaction).await; return Ok(hash); } @@ -114,9 +115,8 @@ where // broadcast raw transaction to subscribers if there is any. self.broadcast_raw_transaction(tx); - // submit the transaction to the pool with a `Local` origin let AddedTransactionOutcome { hash, .. } = - self.inner.add_pool_transaction(pool_transaction).await?; + self.inner.add_pool_transaction(origin, pool_transaction).await?; Ok(hash) } diff --git a/crates/transaction-pool/benches/insertion.rs b/crates/transaction-pool/benches/insertion.rs index dc90d47366..058b5ad9e2 100644 --- a/crates/transaction-pool/benches/insertion.rs +++ b/crates/transaction-pool/benches/insertion.rs @@ -94,7 +94,8 @@ fn txpool_batch_insertion(c: &mut Criterion) { let mut response_futures = Vec::with_capacity(tx_count); for tx in txs { let (response_tx, response_rx) = oneshot::channel(); - let request = BatchTxRequest::new(tx, response_tx); + let request = + BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx); batch_requests.push(request); response_futures.push(response_rx); } diff --git a/crates/transaction-pool/src/batcher.rs b/crates/transaction-pool/src/batcher.rs index 75280e68b3..d1f6c1e453 100644 --- a/crates/transaction-pool/src/batcher.rs +++ b/crates/transaction-pool/src/batcher.rs @@ -15,10 +15,10 @@ use std::{ use tokio::sync::{mpsc, oneshot}; /// A single batch transaction request -/// All transactions processed through the batcher are considered local -/// transactions (`TransactionOrigin::Local`) when inserted into the pool. #[derive(Debug)] pub struct BatchTxRequest { + /// Origin of the transaction (e.g. Local, External) + origin: TransactionOrigin, /// Tx to be inserted in to the pool pool_tx: T, /// Channel to send result back to caller @@ -31,10 +31,11 @@ where { /// Create a new batch transaction request pub const fn new( + origin: TransactionOrigin, pool_tx: T, response_tx: oneshot::Sender>, ) -> Self { - Self { pool_tx, response_tx } + Self { origin, pool_tx, response_tx } } } @@ -66,24 +67,24 @@ where } async fn process_request(pool: &Pool, req: BatchTxRequest) { - let BatchTxRequest { pool_tx, response_tx } = req; - let pool_result = pool.add_transaction(TransactionOrigin::Local, pool_tx).await; + let BatchTxRequest { origin, pool_tx, response_tx } = req; + let pool_result = pool.add_transaction(origin, pool_tx).await; let _ = response_tx.send(pool_result); } - /// Process a batch of transaction requests, grouped by origin - async fn process_batch(pool: &Pool, mut batch: Vec>) { + /// Process a batch of transaction requests with per-transaction origins + async fn process_batch(pool: &Pool, batch: Vec>) { if batch.len() == 1 { - Self::process_request(pool, batch.remove(0)).await; + Self::process_request(pool, batch.into_iter().next().expect("batch is not empty")) + .await; return } - let (pool_transactions, response_tx): (Vec<_>, Vec<_>) = - batch.into_iter().map(|req| (req.pool_tx, req.response_tx)).unzip(); + let (transactions, response_txs): (Vec<_>, Vec<_>) = + batch.into_iter().map(|req| ((req.origin, req.pool_tx), req.response_tx)).unzip(); - let pool_results = pool.add_transactions(TransactionOrigin::Local, pool_transactions).await; - - for (response_tx, pool_result) in response_tx.into_iter().zip(pool_results) { + let pool_results = pool.add_transactions_with_origins(transactions).await; + for (response_tx, pool_result) in response_txs.into_iter().zip(pool_results) { let _ = response_tx.send(pool_result); } } @@ -138,7 +139,7 @@ mod tests { let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100); let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - batch_requests.push(BatchTxRequest::new(tx, response_tx)); + batch_requests.push(BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx)); responses.push(response_rx); } @@ -167,7 +168,9 @@ mod tests { let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100); let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - request_tx.send(BatchTxRequest::new(tx, response_tx)).expect("Could not send batch tx"); + request_tx + .send(BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx)) + .expect("Could not send batch tx"); responses.push(response_rx); } @@ -197,7 +200,7 @@ mod tests { for i in 0..10 { let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100); let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - let request = BatchTxRequest::new(tx, response_tx); + let request = BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx); request_tx.send(request).expect("Could not send batch tx"); results.push(response_rx); } @@ -225,7 +228,7 @@ mod tests { for i in 0..max_batch_size { let tx = MockTransaction::legacy().with_nonce(i as u64).with_gas_price(100); let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - let request = BatchTxRequest::new(tx, response_tx); + let request = BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx); let request_tx_clone = request_tx.clone(); let tx_fut = async move { diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 38c64d2fe2..b47da0caf5 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -389,17 +389,6 @@ where self.pool.validator().validate_transaction(origin, transaction).await } - /// Returns future that validates all transactions in the given iterator. - /// - /// This returns the validated transactions in the iterator's order. - async fn validate_all( - &self, - origin: TransactionOrigin, - transactions: impl IntoIterator + Send, - ) -> Vec> { - self.pool.validator().validate_transactions_with_origin(origin, transactions).await - } - /// Number of transactions in the entire pool pub fn len(&self) -> usize { self.pool.len() @@ -513,17 +502,17 @@ where results.pop().expect("result length is the same as the input") } - async fn add_transactions( + async fn add_transactions_with_origins( &self, - origin: TransactionOrigin, - transactions: Vec, + transactions: impl IntoIterator + Send, ) -> Vec> { + let transactions: Vec<_> = transactions.into_iter().collect(); if transactions.is_empty() { return Vec::new() } - let validated = self.validate_all(origin, transactions).await; - - self.pool.add_transactions(origin, validated.into_iter()) + let origins: Vec<_> = transactions.iter().map(|(origin, _)| *origin).collect(); + let validated = self.pool.validator().validate_transactions(transactions).await; + self.pool.add_transactions_with_origins(origins.into_iter().zip(validated)) } fn transaction_event_listener(&self, tx_hash: TxHash) -> Option { diff --git a/crates/transaction-pool/src/noop.rs b/crates/transaction-pool/src/noop.rs index 2a64a42044..384e44f565 100644 --- a/crates/transaction-pool/src/noop.rs +++ b/crates/transaction-pool/src/noop.rs @@ -84,14 +84,13 @@ impl TransactionPool for NoopTransactionPool { Err(PoolError::other(hash, Box::new(NoopInsertError::new(transaction)))) } - async fn add_transactions( + async fn add_transactions_with_origins( &self, - _origin: TransactionOrigin, - transactions: Vec, + transactions: impl IntoIterator + Send, ) -> Vec> { transactions .into_iter() - .map(|transaction| { + .map(|(_, transaction)| { let hash = *transaction.hash(); Err(PoolError::other(hash, Box::new(NoopInsertError::new(transaction)))) }) diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 2bf275fbe8..e6114e7e36 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -630,10 +630,24 @@ where } /// Adds all transactions in the iterator to the pool, returning a list of results. + /// + /// Convenience method that assigns the same origin to all transactions. Delegates to + /// [`Self::add_transactions_with_origins`]. pub fn add_transactions( &self, origin: TransactionOrigin, transactions: impl IntoIterator>, + ) -> Vec> { + self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx))) + } + + /// Adds all transactions in the iterator to the pool, each with its own + /// [`TransactionOrigin`], returning a list of results. + pub fn add_transactions_with_origins( + &self, + transactions: impl IntoIterator< + Item = (TransactionOrigin, TransactionValidationOutcome), + >, ) -> Vec> { // Collect results and metadata while holding the pool write lock let (mut results, added_metas, discarded) = { @@ -642,7 +656,7 @@ where let results = transactions .into_iter() - .map(|tx| { + .map(|(origin, tx)| { let (result, meta) = self.add_transaction(&mut pool, origin, tx); // Only collect metadata for successful insertions diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index d28529a3ff..cae0418af2 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -175,6 +175,20 @@ pub trait TransactionPool: Clone + Debug + Send + Sync { &self, origin: TransactionOrigin, transactions: Vec, + ) -> impl Future>> + Send { + self.add_transactions_with_origins(transactions.into_iter().map(move |tx| (origin, tx))) + } + + /// Adds the given _unvalidated_ transactions into the pool. + /// + /// Each transaction is paired with its own [`TransactionOrigin`]. + /// + /// Returns a list of results. + /// + /// Consumer: RPC + fn add_transactions_with_origins( + &self, + transactions: impl IntoIterator + Send, ) -> impl Future>> + Send; /// Submit a consensus transaction directly to the pool diff --git a/crates/transaction-pool/src/validate/eth.rs b/crates/transaction-pool/src/validate/eth.rs index 8dd22adc07..69cd89c414 100644 --- a/crates/transaction-pool/src/validate/eth.rs +++ b/crates/transaction-pool/src/validate/eth.rs @@ -743,7 +743,7 @@ where /// Validates all given transactions. fn validate_batch( &self, - transactions: Vec<(TransactionOrigin, Tx)>, + transactions: impl IntoIterator, ) -> Vec> { let mut provider = None; transactions @@ -853,7 +853,7 @@ where async fn validate_transactions( &self, - transactions: Vec<(TransactionOrigin, Self::Transaction)>, + transactions: impl IntoIterator + Send, ) -> Vec> { self.validate_batch(transactions) } diff --git a/crates/transaction-pool/src/validate/mod.rs b/crates/transaction-pool/src/validate/mod.rs index 7344ac7609..1ae8dfacfa 100644 --- a/crates/transaction-pool/src/validate/mod.rs +++ b/crates/transaction-pool/src/validate/mod.rs @@ -212,7 +212,7 @@ pub trait TransactionValidator: Debug + Send + Sync { /// See also [`Self::validate_transaction`]. fn validate_transactions( &self, - transactions: Vec<(TransactionOrigin, Self::Transaction)>, + transactions: impl IntoIterator + Send, ) -> impl Future>> + Send { futures_util::future::join_all( transactions.into_iter().map(|(origin, tx)| self.validate_transaction(origin, tx)), @@ -260,7 +260,7 @@ where async fn validate_transactions( &self, - transactions: Vec<(TransactionOrigin, Self::Transaction)>, + transactions: impl IntoIterator + Send, ) -> Vec> { match self { Self::Left(v) => v.validate_transactions(transactions).await, diff --git a/crates/transaction-pool/src/validate/task.rs b/crates/transaction-pool/src/validate/task.rs index e25c47b248..eb07de96da 100644 --- a/crates/transaction-pool/src/validate/task.rs +++ b/crates/transaction-pool/src/validate/task.rs @@ -254,8 +254,9 @@ where async fn validate_transactions( &self, - transactions: Vec<(TransactionOrigin, Self::Transaction)>, + transactions: impl IntoIterator + Send, ) -> Vec> { + let transactions: Vec<_> = transactions.into_iter().collect(); let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect(); let (tx, rx) = oneshot::channel(); { @@ -300,7 +301,8 @@ where origin: TransactionOrigin, transactions: impl IntoIterator + Send, ) -> Vec> { - self.validate_transactions(transactions.into_iter().map(|tx| (origin, tx)).collect()).await + let transactions: Vec<_> = transactions.into_iter().map(|tx| (origin, tx)).collect(); + self.validate_transactions(transactions).await } fn on_new_head_block(&self, new_tip_block: &SealedBlock) {