perf: improve ethsendrawsync for op with flashblock (#19462)

Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
This commit is contained in:
Matthias Seitz
2025-11-04 21:48:55 +01:00
committed by GitHub
parent c3a60fa75a
commit 3ae73e63e5
7 changed files with 105 additions and 96 deletions

1
Cargo.lock generated
View File

@@ -9621,7 +9621,6 @@ dependencies = [
"reth-primitives-traits",
"reth-rpc",
"reth-rpc-api",
"reth-rpc-convert",
"reth-rpc-engine-api",
"reth-rpc-eth-api",
"reth-rpc-eth-types",

View File

@@ -28,7 +28,6 @@ reth-node-builder.workspace = true
reth-chainspec.workspace = true
reth-chain-state.workspace = true
reth-rpc-engine-api.workspace = true
reth-rpc-convert.workspace = true
# op-reth
reth-optimism-evm.workspace = true

View File

@@ -94,6 +94,11 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
Self { inner }
}
/// Build a [`OpEthApi`] using [`OpEthApiBuilder`].
pub const fn builder() -> OpEthApiBuilder<Rpc> {
OpEthApiBuilder::new()
}
/// Returns a reference to the [`EthApiNodeBackend`].
pub fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
self.inner.eth_api()
@@ -132,11 +137,6 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
block.filter(|b| b.block().parent_hash() == parent_hash).map(|b| b.pending.clone())
}
/// Build a [`OpEthApi`] using [`OpEthApiBuilder`].
pub const fn builder() -> OpEthApiBuilder<Rpc> {
OpEthApiBuilder::new()
}
/// Awaits a fresh flashblock if one is being built, otherwise returns current.
async fn flashblock(
&self,

View File

@@ -1,20 +1,15 @@
//! Loads and formats OP transaction RPC response.
use crate::{OpEthApi, OpEthApiError, SequencerClient};
use alloy_consensus::TxReceipt as _;
use alloy_primitives::{Bytes, B256};
use alloy_rpc_types_eth::TransactionInfo;
use futures::StreamExt;
use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction};
use reth_chain_state::CanonStateSubscriptions;
use reth_optimism_primitives::DepositReceipt;
use reth_primitives_traits::{BlockBody, SignedTransaction, SignerRecoverable};
use reth_rpc_convert::transaction::ConvertReceiptInput;
use reth_primitives_traits::{BlockBody, SignedTransaction};
use reth_rpc_eth_api::{
helpers::{
receipt::calculate_gas_used_and_next_log_index, spec::SignersForRpc, EthTransactions,
LoadReceipt, LoadTransaction,
},
helpers::{spec::SignersForRpc, EthTransactions, LoadReceipt, LoadTransaction},
try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
RpcReceipt, TxInfoMapper,
};
@@ -88,21 +83,35 @@ where
fn send_raw_transaction_sync(
&self,
tx: Bytes,
) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send
where
Self: LoadReceipt + 'static,
{
) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send {
let this = self.clone();
let timeout_duration = self.send_raw_transaction_sync_timeout();
async move {
let mut canonical_stream = this.provider().canonical_state_stream();
let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
let flashblock_rx = this.pending_block_rx();
let mut flashblock_stream = flashblock_rx.map(WatchStream::new);
let mut flashblock_stream = this.pending_block_rx().map(WatchStream::new);
tokio::time::timeout(timeout_duration, async {
loop {
tokio::select! {
biased;
// check if the tx was preconfirmed in a new flashblock
flashblock = async {
if let Some(stream) = &mut flashblock_stream {
stream.next().await
} else {
futures::future::pending().await
}
} => {
if let Some(flashblock) = flashblock.flatten() {
// if flashblocks are supported, attempt to find id from the pending block
if let Some(receipt) = flashblock
.find_and_convert_transaction_receipt(hash, this.tx_resp_builder())
{
return receipt;
}
}
}
// Listen for regular canonical block updates for inclusion
canonical_notification = canonical_stream.next() => {
if let Some(notification) = canonical_notification {
@@ -118,23 +127,6 @@ where
break;
}
}
// check if the tx was preconfirmed in a new flashblock
_flashblock_update = async {
if let Some(ref mut stream) = flashblock_stream {
stream.next().await
} else {
futures::future::pending().await
}
} => {
// Check flashblocks for faster confirmation (Optimism-specific)
if let Ok(Some(pending_block)) = this.pending_flashblock().await {
let block_and_receipts = pending_block.into_block_and_receipts();
if block_and_receipts.block.body().contains_transaction(&hash)
&& let Some(receipt) = this.transaction_receipt(hash).await? {
return Ok(receipt);
}
}
}
}
}
Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
@@ -168,42 +160,11 @@ where
if tx_receipt.is_none() {
// if flashblocks are supported, attempt to find id from the pending block
if let Ok(Some(pending_block)) = this.pending_flashblock().await {
let block_and_receipts = pending_block.into_block_and_receipts();
if let Some((tx, receipt)) =
block_and_receipts.find_transaction_and_receipt_by_hash(hash)
{
// Build tx receipt from pending block and receipts directly inline.
// This avoids canonical cache lookup that would be done by the
// `build_transaction_receipt` which would result in a block not found
// issue. See: https://github.com/paradigmxyz/reth/issues/18529
let meta = tx.meta();
let all_receipts = &block_and_receipts.receipts;
let (gas_used, next_log_index) =
calculate_gas_used_and_next_log_index(meta.index, all_receipts);
return Ok(Some(
this.tx_resp_builder()
.convert_receipts_with_block(
vec![ConvertReceiptInput {
tx: tx
.tx()
.clone()
.try_into_recovered_unchecked()
.map_err(Self::Error::from_eth_err)?
.as_recovered_ref(),
gas_used: receipt.cumulative_gas_used() - gas_used,
receipt: receipt.clone(),
next_log_index,
meta,
}],
block_and_receipts.sealed_block(),
)?
.pop()
.unwrap(),
))
}
if let Ok(Some(pending_block)) = this.pending_flashblock().await &&
let Some(Ok(receipt)) = pending_block
.find_and_convert_transaction_receipt(hash, this.tx_resp_builder())
{
return Ok(Some(receipt));
}
}
let Some((tx, meta, receipt)) = tx_receipt else { return Ok(None) };

View File

@@ -6,27 +6,11 @@ use alloy_consensus::{transaction::TransactionMeta, TxReceipt};
use futures::Future;
use reth_primitives_traits::SignerRecoverable;
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert};
use reth_rpc_eth_types::{error::FromEthApiError, EthApiError};
use reth_rpc_eth_types::{
error::FromEthApiError, utils::calculate_gas_used_and_next_log_index, EthApiError,
};
use reth_storage_api::{ProviderReceipt, ProviderTx};
/// Calculates the gas used and next log index for a transaction at the given index
pub fn calculate_gas_used_and_next_log_index(
tx_index: u64,
all_receipts: &[impl TxReceipt],
) -> (u64, usize) {
let mut gas_used = 0;
let mut next_log_index = 0;
if tx_index > 0 {
for receipt in all_receipts.iter().take(tx_index as usize) {
gas_used = receipt.cumulative_gas_used();
next_log_index += receipt.logs().len();
}
}
(gas_used, next_log_index)
}
/// Assembles transaction receipt data w.r.t to network.
///
/// Behaviour shared by several `eth_` RPC methods, not exclusive to `eth_` receipts RPC methods.

View File

@@ -4,17 +4,18 @@
use std::{sync::Arc, time::Instant};
use crate::block::BlockAndReceipts;
use alloy_consensus::BlockHeader;
use crate::{block::BlockAndReceipts, utils::calculate_gas_used_and_next_log_index};
use alloy_consensus::{BlockHeader, TxReceipt};
use alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_primitives::{BlockHash, B256};
use alloy_primitives::{BlockHash, TxHash, B256};
use derive_more::Constructor;
use reth_chain_state::{BlockState, ExecutedBlock};
use reth_ethereum_primitives::Receipt;
use reth_evm::{ConfigureEvm, EvmEnvFor};
use reth_primitives_traits::{
Block, BlockTy, NodePrimitives, ReceiptTy, RecoveredBlock, SealedHeader,
Block, BlockTy, IndexedTx, NodePrimitives, ReceiptTy, RecoveredBlock, SealedHeader,
};
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert, RpcTypes};
/// Configured [`reth_evm::EvmEnv`] for a pending block.
#[derive(Debug, Clone, Constructor)]
@@ -129,6 +130,52 @@ impl<N: NodePrimitives> PendingBlock<N> {
pub fn parent_hash(&self) -> BlockHash {
self.executed_block.recovered_block().parent_hash()
}
/// Finds a transaction by hash and returns it along with its corresponding receipt.
///
/// Returns `None` if the transaction is not found in this block.
pub fn find_transaction_and_receipt_by_hash(
&self,
tx_hash: TxHash,
) -> Option<(IndexedTx<'_, N::Block>, &N::Receipt)> {
let indexed_tx = self.executed_block.recovered_block().find_indexed(tx_hash)?;
let receipt = self.receipts.get(indexed_tx.index())?;
Some((indexed_tx, receipt))
}
/// Returns the rpc transaction receipt for the given transaction hash if it exists.
///
/// This uses the given converter to turn [`Self::find_transaction_and_receipt_by_hash`] into
/// the rpc format.
pub fn find_and_convert_transaction_receipt<C>(
&self,
tx_hash: TxHash,
converter: &C,
) -> Option<Result<<C::Network as RpcTypes>::Receipt, C::Error>>
where
C: RpcConvert<Primitives = N>,
{
let (tx, receipt) = self.find_transaction_and_receipt_by_hash(tx_hash)?;
let meta = tx.meta();
let all_receipts = &self.receipts;
let (gas_used, next_log_index) =
calculate_gas_used_and_next_log_index(meta.index, all_receipts);
converter
.convert_receipts_with_block(
vec![ConvertReceiptInput {
tx: tx.recovered_tx(),
gas_used: receipt.cumulative_gas_used() - gas_used,
receipt: receipt.clone(),
next_log_index,
meta,
}],
self.executed_block.sealed_block(),
)
.map(|mut receipts| receipts.pop())
.transpose()
}
}
impl<N: NodePrimitives> From<PendingBlock<N>> for BlockState<N> {

View File

@@ -1,9 +1,28 @@
//! Commonly used code snippets
use super::{EthApiError, EthResult};
use alloy_consensus::TxReceipt;
use reth_primitives_traits::{Recovered, SignedTransaction};
use std::future::Future;
/// Calculates the gas used and next log index for a transaction at the given index
pub fn calculate_gas_used_and_next_log_index(
tx_index: u64,
all_receipts: &[impl TxReceipt],
) -> (u64, usize) {
let mut gas_used = 0;
let mut next_log_index = 0;
if tx_index > 0 {
for receipt in all_receipts.iter().take(tx_index as usize) {
gas_used = receipt.cumulative_gas_used();
next_log_index += receipt.logs().len();
}
}
(gas_used, next_log_index)
}
/// Recovers a [`SignedTransaction`] from an enveloped encoded byte stream.
///
/// This is a helper function that returns the appropriate RPC-specific error if the input data is