feat(pool): return state of an added tx (#17442)

This commit is contained in:
Hai | RISE
2025-07-22 21:51:03 +07:00
committed by GitHub
parent c2098faea3
commit 868c421c5d
15 changed files with 88 additions and 49 deletions

View File

@@ -61,8 +61,8 @@ use reth_primitives_traits::SignedTransaction;
use reth_tokio_util::EventStream;
use reth_transaction_pool::{
error::{PoolError, PoolResult},
GetPooledTransactionLimit, PoolTransaction, PropagateKind, PropagatedTransactions,
TransactionPool, ValidPoolTransaction,
AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
PropagatedTransactions, TransactionPool, ValidPoolTransaction,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
@@ -81,7 +81,8 @@ use tracing::{debug, trace};
/// The future for importing transactions into the pool.
///
/// Resolves with the result of each transaction import.
pub type PoolImportFuture = Pin<Box<dyn Future<Output = Vec<PoolResult<TxHash>>> + Send + 'static>>;
pub type PoolImportFuture =
Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
/// Api to interact with [`TransactionsManager`] task.
///
@@ -561,10 +562,10 @@ impl<Pool: TransactionPool, N: NetworkPrimitives, PBundle: TransactionPolicies>
TransactionsManager<Pool, N, PBundle>
{
/// Processes a batch import results.
fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<TxHash>>) {
fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
for res in batch_results {
match res {
Ok(hash) => {
Ok(AddedTransactionOutcome { hash, .. }) => {
self.on_good_import(hash);
}
Err(err) => {

View File

@@ -10,7 +10,9 @@ use reth_network::{
};
use reth_network_api::{events::PeerEvent, PeerKind, PeersInfo};
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction, TransactionPool};
use reth_transaction_pool::{
test_utils::TransactionGenerator, AddedTransactionOutcome, PoolTransaction, TransactionPool,
};
use std::sync::Arc;
use tokio::join;
@@ -42,7 +44,8 @@ async fn test_tx_gossip() {
provider.add_account(sender, ExtendedAccount::new(0, U256::from(100_000_000)));
// insert pending tx in peer0's pool
let hash = peer0_pool.add_external_transaction(tx).await.unwrap();
let AddedTransactionOutcome { hash, .. } =
peer0_pool.add_external_transaction(tx).await.unwrap();
let inserted = peer0_tx_listener.recv().await.unwrap();
assert_eq!(inserted, hash);
@@ -81,10 +84,10 @@ async fn test_tx_propagation_policy_trusted_only() {
provider.add_account(sender, ExtendedAccount::new(0, U256::from(100_000_000)));
// insert the tx in peer0's pool
let hash_0 = peer_0_handle.pool().unwrap().add_external_transaction(tx).await.unwrap();
let outcome_0 = peer_0_handle.pool().unwrap().add_external_transaction(tx).await.unwrap();
let inserted = peer0_tx_listener.recv().await.unwrap();
assert_eq!(inserted, hash_0);
assert_eq!(inserted, outcome_0.hash);
// ensure tx is not gossiped to peer1
peer1_tx_listener.try_recv().expect_err("Empty");
@@ -108,16 +111,16 @@ async fn test_tx_propagation_policy_trusted_only() {
provider.add_account(sender, ExtendedAccount::new(0, U256::from(100_000_000)));
// insert pending tx in peer0's pool
let hash_1 = peer_0_handle.pool().unwrap().add_external_transaction(tx).await.unwrap();
let outcome_1 = peer_0_handle.pool().unwrap().add_external_transaction(tx).await.unwrap();
let inserted = peer0_tx_listener.recv().await.unwrap();
assert_eq!(inserted, hash_1);
assert_eq!(inserted, outcome_1.hash);
// ensure peer1 now receives the pending txs from peer0
let mut buff = Vec::with_capacity(2);
buff.push(peer1_tx_listener.recv().await.unwrap());
buff.push(peer1_tx_listener.recv().await.unwrap());
assert!(buff.contains(&hash_1));
assert!(buff.contains(&outcome_1.hash));
}
#[tokio::test(flavor = "multi_thread")]

View File

@@ -10,7 +10,9 @@ use reth_optimism_txpool::conditional::MaybeConditionalTransaction;
use reth_rpc_eth_api::L2EthApiExtServer;
use reth_rpc_eth_types::utils::recover_raw_transaction;
use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool};
use reth_transaction_pool::{
AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
};
use std::sync::Arc;
use tokio::sync::Semaphore;
@@ -157,7 +159,7 @@ where
} else {
// otherwise, add to pool with the appended conditional
tx.set_conditional(condition);
let hash =
let AddedTransactionOutcome { hash, .. } =
self.pool().add_transaction(TransactionOrigin::Private, tx).await.map_err(|e| {
OpEthApiError::Eth(reth_rpc_eth_types::EthApiError::PoolError(e.into()))
})?;

View File

@@ -12,7 +12,9 @@ use reth_rpc_eth_api::{
};
use reth_rpc_eth_types::utils::recover_raw_transaction;
use reth_storage_api::{errors::ProviderError, ReceiptProvider};
use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool};
use reth_transaction_pool::{
AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
};
use std::fmt::{Debug, Formatter};
impl<N, Rpc> EthTransactions for OpEthApi<N, Rpc>
@@ -55,7 +57,7 @@ where
}
// submit the transaction to the pool with a `Local` origin
let hash = self
let AddedTransactionOutcome { hash, .. } = self
.pool()
.add_transaction(TransactionOrigin::Local, pool_transaction)
.await

View File

@@ -29,7 +29,9 @@ use reth_storage_api::{
BlockNumReader, BlockReaderIdExt, ProviderBlock, ProviderReceipt, ProviderTx, ReceiptProvider,
TransactionsProvider,
};
use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool};
use reth_transaction_pool::{
AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
};
use std::sync::Arc;
/// Transaction related functions for the [`EthApiServer`](crate::EthApiServer) trait in
@@ -417,7 +419,7 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
.map_err(|_| EthApiError::TransactionConversionError)?;
// submit the transaction to the pool with a `Local` origin
let hash = self
let AddedTransactionOutcome { hash, .. } = self
.pool()
.add_transaction(TransactionOrigin::Local, pool_transaction)
.await

View File

@@ -8,7 +8,9 @@ use reth_rpc_eth_api::{
FromEvmError, RpcNodeCore,
};
use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError};
use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool};
use reth_transaction_pool::{
AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
};
impl<N, Rpc> EthTransactions for EthApi<N, Rpc>
where
@@ -33,7 +35,8 @@ where
let pool_transaction = <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);
// submit the transaction to the pool with a `Local` origin
let hash = self.pool().add_transaction(TransactionOrigin::Local, pool_transaction).await?;
let AddedTransactionOutcome { hash, .. } =
self.pool().add_transaction(TransactionOrigin::Local, pool_transaction).await?;
Ok(hash)
}

View File

@@ -281,8 +281,9 @@ pub use crate::{
error::PoolResult,
ordering::{CoinbaseTipOrdering, Priority, TransactionOrdering},
pool::{
blob_tx_priority, fee_delta, state::SubPool, AllTransactionsEvents, FullTransactionEvent,
NewTransactionEvent, TransactionEvent, TransactionEvents, TransactionListenerKind,
blob_tx_priority, fee_delta, state::SubPool, AddedTransactionOutcome,
AllTransactionsEvents, FullTransactionEvent, NewTransactionEvent, TransactionEvent,
TransactionEvents, TransactionListenerKind,
},
traits::*,
validate::{
@@ -486,7 +487,7 @@ where
&self,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> PoolResult<TxHash> {
) -> PoolResult<AddedTransactionOutcome> {
let (_, tx) = self.validate(origin, transaction).await;
let mut results = self.pool.add_transactions(origin, std::iter::once(tx));
results.pop().expect("result length is the same as the input")
@@ -496,7 +497,7 @@ where
&self,
origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
) -> Vec<PoolResult<TxHash>> {
) -> Vec<PoolResult<AddedTransactionOutcome>> {
if transactions.is_empty() {
return Vec::new()
}

View File

@@ -9,9 +9,9 @@ use crate::{
pool::TransactionListenerKind,
traits::{BestTransactionsAttributes, GetPooledTransactionLimit, NewBlobSidecar},
validate::ValidTransaction,
AllPoolTransactions, AllTransactionsEvents, BestTransactions, BlockInfo, EthPoolTransaction,
EthPooledTransaction, NewTransactionEvent, PoolResult, PoolSize, PoolTransaction,
PropagatedTransactions, TransactionEvents, TransactionOrigin, TransactionPool,
AddedTransactionOutcome, AllPoolTransactions, AllTransactionsEvents, BestTransactions,
BlockInfo, EthPoolTransaction, EthPooledTransaction, NewTransactionEvent, PoolResult, PoolSize,
PoolTransaction, PropagatedTransactions, TransactionEvents, TransactionOrigin, TransactionPool,
TransactionValidationOutcome, TransactionValidator, ValidPoolTransaction,
};
use alloy_eips::{
@@ -79,7 +79,7 @@ impl<T: EthPoolTransaction> TransactionPool for NoopTransactionPool<T> {
&self,
_origin: TransactionOrigin,
transaction: Self::Transaction,
) -> PoolResult<TxHash> {
) -> PoolResult<AddedTransactionOutcome> {
let hash = *transaction.hash();
Err(PoolError::other(hash, Box::new(NoopInsertError::new(transaction))))
}
@@ -88,7 +88,7 @@ impl<T: EthPoolTransaction> TransactionPool for NoopTransactionPool<T> {
&self,
_origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
) -> Vec<PoolResult<TxHash>> {
) -> Vec<PoolResult<AddedTransactionOutcome>> {
transactions
.into_iter()
.map(|transaction| {

View File

@@ -458,7 +458,7 @@ where
pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
origin: TransactionOrigin,
tx: TransactionValidationOutcome<T::Transaction>,
) -> PoolResult<TxHash> {
) -> PoolResult<AddedTransactionOutcome> {
match tx {
TransactionValidationOutcome::Valid {
balance,
@@ -494,6 +494,10 @@ where
let added = pool.add_transaction(tx, balance, state_nonce, bytecode_hash)?;
let hash = *added.hash();
let state = match added.subpool() {
SubPool::Pending => AddedTransactionState::Pending,
_ => AddedTransactionState::Queued,
};
// transaction was successfully inserted into the pool
if let Some(sidecar) = maybe_sidecar {
@@ -524,7 +528,7 @@ where
// Notify listeners for _all_ transactions
self.on_new_transaction(added.into_new_transaction_event());
Ok(hash)
Ok(AddedTransactionOutcome { hash, state })
}
TransactionValidationOutcome::Invalid(tx, err) => {
let mut listener = self.event_listener.write();
@@ -563,7 +567,7 @@ where
&self,
origin: TransactionOrigin,
transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
) -> Vec<PoolResult<TxHash>> {
) -> Vec<PoolResult<AddedTransactionOutcome>> {
// Add the transactions and enforce the pool size limits in one write lock
let (mut added, discarded) = {
let mut pool = self.pool.write();
@@ -599,7 +603,7 @@ where
// A newly added transaction may be immediately discarded, so we need to
// adjust the result here
for res in &mut added {
if let Ok(hash) = res {
if let Ok(AddedTransactionOutcome { hash, .. }) = res {
if discarded_hashes.contains(hash) {
*res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
}
@@ -1167,7 +1171,6 @@ impl<T: PoolTransaction> AddedTransaction<T> {
}
/// Returns the subpool this transaction was added to
#[cfg(test)]
pub(crate) const fn subpool(&self) -> SubPool {
match self {
Self::Pending(_) => SubPool::Pending,
@@ -1185,6 +1188,24 @@ impl<T: PoolTransaction> AddedTransaction<T> {
}
}
/// The state of a transaction when is was added to the pool
#[derive(Debug)]
pub enum AddedTransactionState {
/// Ready for execution
Pending,
/// Not ready for execution due to a nonce gap or insufficient balance
Queued, // TODO: Break it down to missing nonce, insufficient balance, etc.
}
/// The outcome of a successful transaction addition
#[derive(Debug)]
pub struct AddedTransactionOutcome {
/// The hash of the transaction
pub hash: TxHash,
/// The state of the transaction
pub state: AddedTransactionState,
}
/// Contains all state changes after a [`CanonicalStateUpdate`] was processed
#[derive(Debug)]
pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {

View File

@@ -58,7 +58,7 @@ use crate::{
TransactionListenerKind,
},
validate::ValidPoolTransaction,
AllTransactionsEvents,
AddedTransactionOutcome, AllTransactionsEvents,
};
use alloy_consensus::{error::ValueError, BlockHeader, Signed, Typed2718};
use alloy_eips::{
@@ -130,7 +130,7 @@ pub trait TransactionPool: Clone + Debug + Send + Sync {
fn add_external_transaction(
&self,
transaction: Self::Transaction,
) -> impl Future<Output = PoolResult<TxHash>> + Send {
) -> impl Future<Output = PoolResult<AddedTransactionOutcome>> + Send {
self.add_transaction(TransactionOrigin::External, transaction)
}
@@ -140,7 +140,7 @@ pub trait TransactionPool: Clone + Debug + Send + Sync {
fn add_external_transactions(
&self,
transactions: Vec<Self::Transaction>,
) -> impl Future<Output = Vec<PoolResult<TxHash>>> + Send {
) -> impl Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send {
self.add_transactions(TransactionOrigin::External, transactions)
}
@@ -163,7 +163,7 @@ pub trait TransactionPool: Clone + Debug + Send + Sync {
&self,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> impl Future<Output = PoolResult<TxHash>> + Send;
) -> impl Future<Output = PoolResult<AddedTransactionOutcome>> + Send;
/// Adds the given _unvalidated_ transaction into the pool.
///
@@ -174,14 +174,14 @@ pub trait TransactionPool: Clone + Debug + Send + Sync {
&self,
origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
) -> impl Future<Output = Vec<PoolResult<TxHash>>> + Send;
) -> impl Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send;
/// Submit a consensus transaction directly to the pool
fn add_consensus_transaction(
&self,
tx: Recovered<<Self::Transaction as PoolTransaction>::Consensus>,
origin: TransactionOrigin,
) -> impl Future<Output = PoolResult<TxHash>> + Send {
) -> impl Future<Output = PoolResult<AddedTransactionOutcome>> + Send {
async move {
let tx_hash = *tx.tx_hash();

View File

@@ -3,7 +3,7 @@
use reth_transaction_pool::{
error::PoolErrorKind,
test_utils::{MockTransaction, MockTransactionFactory, TestPoolBuilder},
PoolTransaction, TransactionOrigin, TransactionPool,
AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
};
#[tokio::test(flavor = "multi_thread")]
@@ -12,7 +12,7 @@ async fn blobs_exclusive() {
let mut mock_tx_factory = MockTransactionFactory::default();
let blob_tx = mock_tx_factory.create_eip4844();
let hash = txpool
let AddedTransactionOutcome { hash, .. } = txpool
.add_transaction(TransactionOrigin::External, blob_tx.transaction.clone())
.await
.unwrap();

View File

@@ -9,7 +9,8 @@ use reth_transaction_pool::{
test_utils::{
MockFeeRange, MockTransactionDistribution, MockTransactionRatio, TestPool, TestPoolBuilder,
},
BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionPool, TransactionPoolExt,
AddedTransactionOutcome, BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin,
TransactionPool, TransactionPoolExt,
};
#[tokio::test(flavor = "multi_thread")]
@@ -97,7 +98,7 @@ async fn only_blobs_eviction() {
let results = pool.add_transactions(TransactionOrigin::External, set).await;
for (i, result) in results.iter().enumerate() {
match result {
Ok(hash) => {
Ok(AddedTransactionOutcome { hash, .. }) => {
println!("✅ Inserted tx into pool with hash: {hash}");
}
Err(e) => {

View File

@@ -113,7 +113,7 @@ async fn txpool_listener_all() {
let added_result =
txpool.add_transaction(TransactionOrigin::External, transaction.transaction.clone()).await;
assert_matches!(added_result, Ok(hash) if hash == *transaction.transaction.get_hash());
assert_matches!(added_result, Ok(outcome) if outcome.hash == *transaction.transaction.get_hash());
assert_matches!(
all_tx_events.next().await,

View File

@@ -12,7 +12,7 @@ async fn txpool_new_pending_txs() {
let added_result =
txpool.add_transaction(TransactionOrigin::External, transaction.transaction.clone()).await;
assert_matches!(added_result, Ok(hash) if hash == *transaction.transaction.get_hash());
assert_matches!(added_result, Ok(outcome) if outcome.hash == *transaction.transaction.get_hash());
let mut best_txns = txpool.best_transactions();
assert_matches!(best_txns.next(), Some(tx) if tx.transaction.get_hash() == transaction.transaction.get_hash());
@@ -20,6 +20,6 @@ async fn txpool_new_pending_txs() {
let transaction = mock_tx_factory.create_eip1559();
let added_result =
txpool.add_transaction(TransactionOrigin::External, transaction.transaction.clone()).await;
assert_matches!(added_result, Ok(hash) if hash == *transaction.transaction.get_hash());
assert_matches!(added_result, Ok(outcome) if outcome.hash == *transaction.transaction.get_hash());
assert_matches!(best_txns.next(), Some(tx) if tx.transaction.get_hash() == transaction.transaction.get_hash());
}

View File

@@ -7,7 +7,10 @@ use alloy_primitives::{Address, TxHash, U256};
use futures_util::StreamExt;
use reth_ethereum::{
node::api::{FullNodeComponents, NodeTypes},
pool::{PoolTransaction, TransactionEvent, TransactionOrigin, TransactionPool},
pool::{
AddedTransactionOutcome, PoolTransaction, TransactionEvent, TransactionOrigin,
TransactionPool,
},
primitives::SignerRecoverable,
rpc::eth::primitives::TransactionRequest,
EthPrimitives, TransactionSigned,
@@ -93,7 +96,7 @@ pub async fn submit_eth_transfer<FC>(
gas_limit: u64,
max_priority_fee_per_gas: u128,
max_fee_per_gas: u128,
) -> eyre::Result<TxHash>
) -> eyre::Result<AddedTransactionOutcome>
where
FC: FullNodeComponents<Types: NodeTypes<Primitives = EthPrimitives>>,
{