feat: optimize send_raw_transaction_sync receipts fetching (#20689)

This commit is contained in:
Karl Yu
2026-01-05 19:22:04 +08:00
committed by GitHub
parent 66f3453b3c
commit 480029a678
6 changed files with 142 additions and 50 deletions

View File

@@ -7,8 +7,8 @@ use alloy_eips::{eip1898::ForkBlock, eip2718::Encodable2718, BlockNumHash};
use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash};
use core::{fmt, ops::RangeInclusive};
use reth_primitives_traits::{
transaction::signed::SignedTransaction, Block, BlockBody, NodePrimitives, RecoveredBlock,
SealedHeader,
transaction::signed::SignedTransaction, Block, BlockBody, IndexedTx, NodePrimitives,
RecoveredBlock, SealedHeader,
};
use reth_trie_common::updates::TrieUpdates;
use revm::database::BundleState;
@@ -41,6 +41,13 @@ pub struct Chain<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
trie_updates: Option<TrieUpdates>,
}
type ChainTxReceiptMeta<'a, N> = (
&'a RecoveredBlock<<N as NodePrimitives>::Block>,
IndexedTx<'a, <N as NodePrimitives>::Block>,
&'a <N as NodePrimitives>::Receipt,
&'a [<N as NodePrimitives>::Receipt],
);
impl<N: NodePrimitives> Default for Chain<N> {
fn default() -> Self {
Self {
@@ -185,6 +192,24 @@ impl<N: NodePrimitives> Chain<N> {
self.blocks_iter().zip(self.block_receipts_iter())
}
/// Finds a transaction by hash and returns it along with its corresponding receipt data.
///
/// Returns `None` if the transaction is not found in this chain.
pub fn find_transaction_and_receipt_by_hash(
&self,
tx_hash: TxHash,
) -> Option<ChainTxReceiptMeta<'_, N>> {
for (block, receipts) in self.blocks_and_receipts() {
let Some(indexed_tx) = block.find_indexed(tx_hash) else {
continue;
};
let receipt = receipts.get(indexed_tx.index())?;
return Some((block, indexed_tx, receipt, receipts.as_slice()));
}
None
}
/// Get the block at which this chain forked.
pub fn fork_block(&self) -> ForkBlock {
let first = self.first();

View File

@@ -3,7 +3,7 @@
use eyre::WrapErr;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use metrics_util::layers::{PrefixLayer, Stack};
use std::sync::{atomic::AtomicBool, LazyLock};
use std::sync::{atomic::AtomicBool, OnceLock};
/// Installs the Prometheus recorder as the global recorder.
///
@@ -12,13 +12,25 @@ use std::sync::{atomic::AtomicBool, LazyLock};
/// Caution: This only configures the global recorder and does not spawn the exporter.
/// Callers must run [`PrometheusRecorder::spawn_upkeep`] manually.
pub fn install_prometheus_recorder() -> &'static PrometheusRecorder {
&PROMETHEUS_RECORDER_HANDLE
PROMETHEUS_RECORDER_HANDLE.get_or_init(|| PrometheusRecorder::install().unwrap())
}
/// The default Prometheus recorder handle. We use a global static to ensure that it is only
/// installed once.
static PROMETHEUS_RECORDER_HANDLE: LazyLock<PrometheusRecorder> =
LazyLock::new(|| PrometheusRecorder::install().unwrap());
static PROMETHEUS_RECORDER_HANDLE: OnceLock<PrometheusRecorder> = OnceLock::new();
/// Installs the Prometheus recorder with a custom builder.
///
/// Returns an error if a recorder has already been installed.
pub fn try_install_prometheus_recorder_with_builder(
builder: PrometheusBuilder,
) -> eyre::Result<&'static PrometheusRecorder> {
let recorder = PrometheusRecorder::install_with_builder(builder)?;
PROMETHEUS_RECORDER_HANDLE
.set(recorder)
.map_err(|_| eyre::eyre!("Prometheus recorder already installed"))?;
Ok(PROMETHEUS_RECORDER_HANDLE.get().expect("recorder is set"))
}
/// A handle to the Prometheus recorder.
///
@@ -75,7 +87,11 @@ impl PrometheusRecorder {
/// Caution: This only configures the global recorder and does not spawn the exporter.
/// Callers must run [`Self::spawn_upkeep`] manually.
pub fn install() -> eyre::Result<Self> {
let recorder = PrometheusBuilder::new().build_recorder();
Self::install_with_builder(PrometheusBuilder::new())
}
fn install_with_builder(builder: PrometheusBuilder) -> eyre::Result<Self> {
let recorder = builder.build_recorder();
let handle = recorder.handle();
// Build metrics stack
@@ -98,14 +114,13 @@ mod tests {
// `metrics-exporter-prometheus` dependency version.
#[test]
fn process_metrics() {
// initialize the lazy handle
let _ = &*PROMETHEUS_RECORDER_HANDLE;
let recorder = install_prometheus_recorder();
let process = metrics_process::Collector::default();
process.describe();
process.collect();
let metrics = PROMETHEUS_RECORDER_HANDLE.handle.render();
let metrics = recorder.handle().render();
assert!(metrics.contains("process_cpu_seconds_total"), "{metrics:?}");
}
}

View File

@@ -7,15 +7,13 @@ use futures::StreamExt;
use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction};
use reth_chain_state::CanonStateSubscriptions;
use reth_optimism_primitives::DepositReceipt;
use reth_primitives_traits::{
BlockBody, Recovered, SignedTransaction, SignerRecoverable, WithEncoded,
};
use reth_primitives_traits::{Recovered, SignedTransaction, SignerRecoverable, WithEncoded};
use reth_rpc_eth_api::{
helpers::{spec::SignersForRpc, EthTransactions, LoadReceipt, LoadTransaction, SpawnBlocking},
try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
RpcReceipt, TxInfoMapper,
};
use reth_rpc_eth_types::{EthApiError, TransactionSource};
use reth_rpc_eth_types::{block::convert_transaction_receipt, EthApiError, TransactionSource};
use reth_storage_api::{errors::ProviderError, ProviderTx, ReceiptProvider, TransactionsProvider};
use reth_transaction_pool::{
AddedTransactionOutcome, PoolPooledTx, PoolTransaction, TransactionOrigin, TransactionPool,
@@ -118,11 +116,18 @@ where
canonical_notification = canonical_stream.next() => {
if let Some(notification) = canonical_notification {
let chain = notification.committed();
for block in chain.blocks_iter() {
if block.body().contains_transaction(&hash)
&& let Some(receipt) = this.transaction_receipt(hash).await? {
return Ok(receipt);
}
if let Some((block, tx, receipt, all_receipts)) =
chain.find_transaction_and_receipt_by_hash(hash) &&
let Some(receipt) = convert_transaction_receipt(
block,
all_receipts,
tx,
receipt,
this.converter(),
)
.transpose()?
{
return Ok(receipt);
}
} else {
// Canonical stream ended

View File

@@ -18,10 +18,12 @@ use alloy_primitives::{Address, Bytes, TxHash, B256, U256};
use alloy_rpc_types_eth::{BlockNumberOrTag, TransactionInfo};
use futures::{Future, StreamExt};
use reth_chain_state::CanonStateSubscriptions;
use reth_node_api::BlockBody;
use reth_primitives_traits::{Recovered, RecoveredBlock, SignedTransaction, TxTy, WithEncoded};
use reth_primitives_traits::{
BlockBody, Recovered, RecoveredBlock, SignedTransaction, TxTy, WithEncoded,
};
use reth_rpc_convert::{transaction::RpcConvert, RpcTxReq, TransactionConversionError};
use reth_rpc_eth_types::{
block::convert_transaction_receipt,
utils::{binary_search, recover_raw_transaction},
EthApiError::{self, TransactionConfirmationTimeout},
FillTransaction, SignError, TransactionSource,
@@ -108,12 +110,19 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
tokio::time::timeout(timeout_duration, async {
while let Some(notification) = stream.next().await {
let chain = notification.committed();
for block in chain.blocks_iter() {
if block.body().contains_transaction(&hash) &&
let Some(receipt) = this.transaction_receipt(hash).await?
{
return Ok(receipt);
}
if let Some((block, tx, receipt, all_receipts)) =
chain.find_transaction_and_receipt_by_hash(hash) &&
let Some(receipt) = convert_transaction_receipt(
block,
all_receipts,
tx,
receipt,
this.converter(),
)
.transpose()
.map_err(Self::Error::from)?
{
return Ok(receipt);
}
}
Err(Self::Error::from_eth_err(TransactionConfirmationTimeout {

View File

@@ -2,10 +2,14 @@
use std::sync::Arc;
use alloy_consensus::TxReceipt;
use alloy_primitives::TxHash;
use reth_primitives_traits::{
BlockTy, IndexedTx, NodePrimitives, ReceiptTy, RecoveredBlock, SealedBlock,
};
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert, RpcTypes};
use crate::utils::calculate_gas_used_and_next_log_index;
/// A pair of an [`Arc`] wrapped [`RecoveredBlock`] and its corresponding receipts.
///
@@ -44,4 +48,57 @@ impl<N: NodePrimitives> BlockAndReceipts<N> {
pub fn sealed_block(&self) -> &SealedBlock<BlockTy<N>> {
self.block.sealed_block()
}
/// Returns the rpc transaction receipt for the given transaction hash if it exists.
///
/// This uses the given converter to turn [`Self::find_transaction_and_receipt_by_hash`] into
/// the rpc format.
pub fn find_and_convert_transaction_receipt<C>(
&self,
tx_hash: TxHash,
converter: &C,
) -> Option<Result<<C::Network as RpcTypes>::Receipt, C::Error>>
where
C: RpcConvert<Primitives = N>,
{
let (tx, receipt) = self.find_transaction_and_receipt_by_hash(tx_hash)?;
convert_transaction_receipt(
self.block.as_ref(),
self.receipts.as_ref(),
tx,
receipt,
converter,
)
}
}
/// Converts a transaction and its receipt into the rpc receipt format using the given converter.
pub fn convert_transaction_receipt<N, C>(
block: &RecoveredBlock<BlockTy<N>>,
all_receipts: &[ReceiptTy<N>],
tx: IndexedTx<'_, BlockTy<N>>,
receipt: &ReceiptTy<N>,
converter: &C,
) -> Option<Result<<C::Network as RpcTypes>::Receipt, C::Error>>
where
N: NodePrimitives,
C: RpcConvert<Primitives = N>,
{
let meta = tx.meta();
let (gas_used, next_log_index) =
calculate_gas_used_and_next_log_index(meta.index, all_receipts);
converter
.convert_receipts_with_block(
vec![ConvertReceiptInput {
tx: tx.recovered_tx(),
gas_used: receipt.cumulative_gas_used() - gas_used,
receipt: receipt.clone(),
next_log_index,
meta,
}],
block.sealed_block(),
)
.map(|mut receipts| receipts.pop())
.transpose()
}

View File

@@ -4,8 +4,8 @@
use std::{sync::Arc, time::Instant};
use crate::{block::BlockAndReceipts, utils::calculate_gas_used_and_next_log_index};
use alloy_consensus::{BlockHeader, TxReceipt};
use crate::block::BlockAndReceipts;
use alloy_consensus::BlockHeader;
use alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_primitives::{BlockHash, TxHash, B256};
use derive_more::Constructor;
@@ -15,7 +15,7 @@ use reth_evm::{ConfigureEvm, EvmEnvFor};
use reth_primitives_traits::{
Block, BlockTy, IndexedTx, NodePrimitives, ReceiptTy, RecoveredBlock, SealedHeader,
};
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert, RpcTypes};
use reth_rpc_convert::{RpcConvert, RpcTypes};
/// Configured [`reth_evm::EvmEnv`] for a pending block.
#[derive(Debug, Clone, Constructor)]
@@ -155,26 +155,7 @@ impl<N: NodePrimitives> PendingBlock<N> {
where
C: RpcConvert<Primitives = N>,
{
let (tx, receipt) = self.find_transaction_and_receipt_by_hash(tx_hash)?;
let meta = tx.meta();
let all_receipts = &self.receipts;
let (gas_used, next_log_index) =
calculate_gas_used_and_next_log_index(meta.index, all_receipts);
converter
.convert_receipts_with_block(
vec![ConvertReceiptInput {
tx: tx.recovered_tx(),
gas_used: receipt.cumulative_gas_used() - gas_used,
receipt: receipt.clone(),
next_log_index,
meta,
}],
self.executed_block.sealed_block(),
)
.map(|mut receipts| receipts.pop())
.transpose()
self.to_block_and_receipts().find_and_convert_transaction_receipt(tx_hash, converter)
}
}