feat: recovery backpressure

This commit is contained in:
Arsenii Kulikov
2025-12-09 17:04:44 +04:00
parent 245cca7ce2
commit ced1d4919d
12 changed files with 106 additions and 43 deletions

View File

@@ -62,9 +62,9 @@ use reth_network_types::ReputationChangeKind;
use reth_primitives_traits::SignedTransaction;
use reth_tokio_util::EventStream;
use reth_transaction_pool::{
error::{PoolError, PoolResult},
error::{InvalidPoolTransactionError, PoolError, PoolResult},
AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
PropagatedTransactions, TransactionPool, ValidPoolTransaction,
PropagatedTransactions, TransactionPool, TxRecoveryHandle, ValidPoolTransaction,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
@@ -1370,21 +1370,6 @@ where
// reallocations
let mut new_txs = Vec::with_capacity(transactions.len());
for tx in transactions {
// recover transaction
let tx = match tx.try_into_recovered() {
Ok(tx) => tx,
Err(badtx) => {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%badtx.tx_hash(),
client_version=%peer.client_version,
"failed ecrecovery for transaction"
);
has_bad_transactions = true;
continue
}
};
match self.transactions_by_peers.entry(*tx.tx_hash()) {
Entry::Occupied(mut entry) => {
// transaction was already inserted
@@ -1401,9 +1386,7 @@ where
has_bad_transactions = true;
} else {
// this is a new transaction that should be imported into the pool
let pool_transaction = Pool::Transaction::from_pooled(tx);
new_txs.push(pool_transaction);
new_txs.push(tx);
entry.insert(HashSet::from([peer_id]));
}
@@ -1430,14 +1413,50 @@ where
trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
let import = Box::pin(async move {
let added = new_txs.len();
let res = pool.add_external_transactions(new_txs).await;
let mut stream = new_txs
.into_iter()
.map(|tx| async move {
let tx_hash = *tx.tx_hash();
TxRecoveryHandle::new()
.try_into_recovered(tx)
.await
.map(Pool::Transaction::from_pooled)
.map_err(|_| {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%tx_hash,
"failed ecrecovery for transaction"
);
PoolError::new(
tx_hash,
InvalidPoolTransactionError::InvalidSignature,
)
})
})
.collect::<FuturesUnordered<_>>();
let mut recovered = Vec::new();
let mut results = Vec::new();
while let Some(res) = stream.next().await {
match res {
Ok(tx) => {
recovered.push(tx);
}
Err(err) => {
results.push(Err(err));
}
}
}
results.extend(pool.add_external_transactions(recovered).await);
// update metrics
metric_pending_pool_imports.decrement(added as f64);
// update self-monitoring info
tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
res
results
});
self.pool_imports.push(import);

View File

@@ -121,7 +121,7 @@ where
return Err(TxConditionalErr::ConditionalCostExceeded.into());
}
let recovered_tx = recover_raw_transaction(&bytes).map_err(|_| {
let recovered_tx = recover_raw_transaction(&bytes).await.map_err(|_| {
OpEthApiError::Eth(reth_rpc_eth_types::EthApiError::FailedToDecodeSignedTransaction)
})?;

View File

@@ -15,7 +15,7 @@ use reth_rpc_eth_api::{
try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
RpcReceipt, TxInfoMapper,
};
use reth_rpc_eth_types::{EthApiError, TransactionSource};
use reth_rpc_eth_types::{EthApiError, RpcInvalidTransactionError, TransactionSource};
use reth_storage_api::{errors::ProviderError, ProviderTx, ReceiptProvider, TransactionsProvider};
use reth_transaction_pool::{
AddedTransactionOutcome, PoolPooledTx, PoolTransaction, TransactionOrigin, TransactionPool,
@@ -196,7 +196,7 @@ where
{
let transaction = tx
.try_into_recovered_unchecked()
.map_err(|_| EthApiError::InvalidTransactionSignature)?;
.map_err(|_| RpcInvalidTransactionError::InvalidTransactionSignature)?;
return Ok(Some(TransactionSource::Block {
transaction,

View File

@@ -79,7 +79,7 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
tx: Bytes,
) -> impl Future<Output = Result<B256, Self::Error>> + Send {
async move {
let recovered = recover_raw_transaction::<PoolPooledTx<Self::Pool>>(&tx)?;
let recovered = recover_raw_transaction::<PoolPooledTx<Self::Pool>>(&tx).await?;
self.send_transaction(WithEncoded::new(tx, recovered)).await
}
}
@@ -622,9 +622,8 @@ pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
// Note: we assume this transaction is valid, because it's mined (or
// part of pending block) and already. We don't need to
// check for pre EIP-2 because this transaction could be pre-EIP-2.
let transaction = tx
.try_into_recovered_unchecked()
.map_err(|_| EthApiError::InvalidTransactionSignature)?;
let transaction =
tx.try_into_recovered_unchecked().map_err(Self::Error::from_eth_err)?;
return Ok(Some(TransactionSource::Block {
transaction,

View File

@@ -63,9 +63,6 @@ pub enum EthApiError {
/// When decoding a signed transaction fails
#[error("failed to decode signed transaction")]
FailedToDecodeSignedTransaction,
/// When the transaction signature is invalid
#[error("invalid transaction signature")]
InvalidTransactionSignature,
/// Errors related to the transaction pool
#[error(transparent)]
PoolError(#[from] RpcPoolError),
@@ -265,7 +262,6 @@ impl From<EthApiError> for jsonrpsee_types::error::ErrorObject<'static> {
fn from(error: EthApiError) -> Self {
match error {
EthApiError::FailedToDecodeSignedTransaction |
EthApiError::InvalidTransactionSignature |
EthApiError::EmptyRawTransactionData |
EthApiError::InvalidBlockRange |
EthApiError::ExceedsMaxProofWindow |
@@ -525,7 +521,7 @@ where
impl From<RecoveryError> for EthApiError {
fn from(_: RecoveryError) -> Self {
Self::InvalidTransactionSignature
Self::InvalidTransaction(RpcInvalidTransactionError::InvalidTransactionSignature)
}
}
@@ -694,6 +690,9 @@ pub enum RpcInvalidTransactionError {
/// Minimum required priority fee.
minimum_priority_fee: u128,
},
/// When the transaction signature is invalid
#[error("invalid transaction signature")]
InvalidTransactionSignature,
/// Any other error
#[error("{0}")]
Other(Box<dyn ToRpcError>),
@@ -1026,6 +1025,9 @@ impl From<PoolError> for RpcPoolError {
impl From<InvalidPoolTransactionError> for RpcPoolError {
fn from(err: InvalidPoolTransactionError) -> Self {
match err {
InvalidPoolTransactionError::InvalidSignature => {
Self::Invalid(RpcInvalidTransactionError::InvalidTransactionSignature)
}
InvalidPoolTransactionError::Consensus(err) => Self::Invalid(err.into()),
InvalidPoolTransactionError::ExceedsGasLimit(_, _) => Self::ExceedsGasLimit,
InvalidPoolTransactionError::MaxTxGasLimitExceeded(_, _) => Self::MaxTxGasLimitExceeded,

View File

@@ -1,8 +1,11 @@
//! Commonly used code snippets
use crate::RpcInvalidTransactionError;
use super::{EthApiError, EthResult};
use alloy_consensus::TxReceipt;
use reth_primitives_traits::{Recovered, SignedTransaction};
use reth_transaction_pool::TxRecoveryHandle;
use std::future::Future;
/// Calculates the gas used and next log index for a transaction at the given index
@@ -32,7 +35,7 @@ pub fn calculate_gas_used_and_next_log_index(
/// that the entire input buffer is consumed and no trailing bytes are allowed.
///
/// See [`alloy_eips::eip2718::Decodable2718::decode_2718_exact`]
pub fn recover_raw_transaction<T: SignedTransaction>(data: &[u8]) -> EthResult<Recovered<T>> {
pub async fn recover_raw_transaction<T: SignedTransaction>(data: &[u8]) -> EthResult<Recovered<T>> {
if data.is_empty() {
return Err(EthApiError::EmptyRawTransactionData)
}
@@ -40,8 +43,10 @@ pub fn recover_raw_transaction<T: SignedTransaction>(data: &[u8]) -> EthResult<R
let transaction =
T::decode_2718_exact(data).map_err(|_| EthApiError::FailedToDecodeSignedTransaction)?;
SignedTransaction::try_into_recovered(transaction)
.or(Err(EthApiError::InvalidTransactionSignature))
TxRecoveryHandle::new()
.try_into_recovered(transaction)
.await
.or(Err(RpcInvalidTransactionError::InvalidTransactionSignature.into()))
}
/// Performs a binary search within a given block range to find the desired block number.

View File

@@ -5,6 +5,7 @@ use alloy_eips::eip7840::BlobParams;
use alloy_evm::env::BlockEnvironment;
use alloy_primitives::{uint, Keccak256, U256};
use alloy_rpc_types_mev::{EthCallBundle, EthCallBundleResponse, EthCallBundleTransactionResult};
use futures::future::TryJoinAll;
use jsonrpsee::core::RpcResult;
use reth_chainspec::{ChainSpecProvider, EthChainSpec};
use reth_evm::{ConfigureEvm, Evm};
@@ -79,8 +80,9 @@ where
let transactions = txs
.into_iter()
.map(|tx| recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(&tx))
.collect::<Result<Vec<_>, _>>()?
.map(|tx| async move { recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(&tx).await })
.collect::<TryJoinAll<_>>()
.await?
.into_iter()
.collect::<Vec<_>>();

View File

@@ -87,7 +87,7 @@ where
/// `FlattenedBundleItem` with their associated metadata. This handles recursive bundle
/// processing up to `MAX_NESTED_BUNDLE_DEPTH` and `MAX_BUNDLE_BODY_SIZE`, preserving
/// inclusion, validity and privacy settings from parent bundles.
fn parse_and_flatten_bundle(
async fn parse_and_flatten_bundle(
&self,
request: &MevSendBundle,
) -> Result<Vec<FlattenedBundleItem<ProviderTx<Eth::Provider>>>, EthApiError> {
@@ -167,7 +167,7 @@ where
while idx < body.len() {
match &body[idx] {
BundleItem::Tx { tx, can_revert } => {
let tx = recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(tx)?;
let tx = recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(tx).await?;
let tx = tx.map(
<Eth::Pool as TransactionPool>::Transaction::pooled_into_consensus,
);
@@ -228,7 +228,7 @@ where
// Parse and validate bundle
// Also, flatten the bundle here so that its easier to process
let flattened_bundle = self.parse_and_flatten_bundle(&request)?;
let flattened_bundle = self.parse_and_flatten_bundle(&request).await?;
let block_id = parent_block.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest));
let (mut evm_env, current_block_id) = self.eth_api().evm_env_at(block_id).await?;

View File

@@ -118,7 +118,8 @@ where
trace_types: HashSet<TraceType>,
block_id: Option<BlockId>,
) -> Result<TraceResults, Eth::Error> {
let tx = recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(&tx)?
let tx = recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(&tx)
.await?
.map(<Eth::Pool as TransactionPool>::Transaction::pooled_into_consensus);
let (evm_env, at) = self.eth_api().evm_env_at(block_id.unwrap_or_default()).await?;

View File

@@ -278,6 +278,9 @@ pub enum InvalidPoolTransactionError {
/// Minimum required priority fee.
minimum_priority_fee: u128,
},
/// Thrown if the transaction signature is invalid.
#[error("invalid signature")]
InvalidSignature,
}
// === impl InvalidPoolTransactionError ===
@@ -393,6 +396,7 @@ impl InvalidPoolTransactionError {
Eip7702PoolTransactionError::AuthorityReserved => false,
},
Self::PriorityFeeBelowMinimum { .. } => false,
Self::InvalidSignature => true,
}
}

View File

@@ -321,6 +321,8 @@ pub mod blobstore;
mod config;
pub mod identifier;
mod ordering;
mod recovery;
pub use recovery::TxRecoveryHandle;
mod traits;
#[cfg(any(test, feature = "test-utils"))]

View File

@@ -0,0 +1,29 @@
use alloy_consensus::crypto::RecoveryError;
use reth_primitives_traits::{Recovered, SignerRecoverable};
use tokio::sync::Semaphore;
// We allow up to 1000 concurrent conversions to avoid excessive memory usage.
static SEMAPHORE: Semaphore = Semaphore::const_new(5);
/// A simple semaphore-based blob sidecar converter.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct TxRecoveryHandle;
impl TxRecoveryHandle {
/// Creates a new transaction recovery handle.
pub const fn new() -> Self {
Self
}
/// Recovers a [`SignerRecoverable`] transaction.
pub async fn try_into_recovered<T: SignerRecoverable + Send + 'static>(
&self,
tx: T,
) -> Result<Recovered<T>, RecoveryError> {
let _permit = SEMAPHORE.acquire().await.unwrap();
tokio::task::spawn_blocking(move || tx.try_into_recovered())
.await
.expect("transaction recovery panicked")
}
}