mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-09 07:17:56 -05:00
chore: specialize send_raw_transaction_sync for op-reth with flashblocks support (#18586)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -9479,6 +9479,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"derive_more",
|
||||
"eyre",
|
||||
"futures",
|
||||
"jsonrpsee",
|
||||
"jsonrpsee-core",
|
||||
"jsonrpsee-types",
|
||||
@@ -9518,6 +9519,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"thiserror 2.0.16",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tower",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
@@ -60,6 +60,8 @@ op-revm.workspace = true
|
||||
|
||||
# async
|
||||
tokio.workspace = true
|
||||
futures.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
reqwest = { workspace = true, features = ["rustls-tls-native-roots"] }
|
||||
async-trait.workspace = true
|
||||
tower.workspace = true
|
||||
|
||||
@@ -4,9 +4,11 @@ use crate::{OpEthApi, OpEthApiError, SequencerClient};
|
||||
use alloy_consensus::TxReceipt as _;
|
||||
use alloy_primitives::{Bytes, B256};
|
||||
use alloy_rpc_types_eth::TransactionInfo;
|
||||
use futures::StreamExt;
|
||||
use op_alloy_consensus::{transaction::OpTransactionInfo, OpTransaction};
|
||||
use reth_chain_state::CanonStateSubscriptions;
|
||||
use reth_optimism_primitives::DepositReceipt;
|
||||
use reth_primitives_traits::{SignedTransaction, SignerRecoverable};
|
||||
use reth_primitives_traits::{BlockBody, SignedTransaction, SignerRecoverable};
|
||||
use reth_rpc_convert::transaction::ConvertReceiptInput;
|
||||
use reth_rpc_eth_api::{
|
||||
helpers::{
|
||||
@@ -16,7 +18,7 @@ use reth_rpc_eth_api::{
|
||||
try_into_op_tx_info, EthApiTypes as _, FromEthApiError, FromEvmError, RpcConvert, RpcNodeCore,
|
||||
RpcReceipt, TxInfoMapper,
|
||||
};
|
||||
use reth_rpc_eth_types::utils::recover_raw_transaction;
|
||||
use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError};
|
||||
use reth_storage_api::{errors::ProviderError, ReceiptProvider};
|
||||
use reth_transaction_pool::{
|
||||
AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
|
||||
@@ -26,6 +28,7 @@ use std::{
|
||||
future::Future,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio_stream::wrappers::WatchStream;
|
||||
|
||||
impl<N, Rpc> EthTransactions for OpEthApi<N, Rpc>
|
||||
where
|
||||
@@ -78,6 +81,79 @@ where
|
||||
Ok(hash)
|
||||
}
|
||||
|
||||
/// Decodes and recovers the transaction and submits it to the pool.
|
||||
///
|
||||
/// And awaits the receipt, checking both canonical blocks and flashblocks for faster
|
||||
/// confirmation.
|
||||
fn send_raw_transaction_sync(
|
||||
&self,
|
||||
tx: Bytes,
|
||||
) -> impl Future<Output = Result<RpcReceipt<Self::NetworkTypes>, Self::Error>> + Send
|
||||
where
|
||||
Self: LoadReceipt + 'static,
|
||||
{
|
||||
let this = self.clone();
|
||||
let timeout_duration = self.send_raw_transaction_sync_timeout();
|
||||
async move {
|
||||
let hash = EthTransactions::send_raw_transaction(&this, tx).await?;
|
||||
let mut canonical_stream = this.provider().canonical_state_stream();
|
||||
let flashblock_rx = this.pending_block_rx();
|
||||
let mut flashblock_stream = flashblock_rx.map(WatchStream::new);
|
||||
|
||||
tokio::time::timeout(timeout_duration, async {
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Listen for regular canonical block updates for inclusion
|
||||
canonical_notification = canonical_stream.next() => {
|
||||
if let Some(notification) = canonical_notification {
|
||||
let chain = notification.committed();
|
||||
for block in chain.blocks_iter() {
|
||||
if block.body().contains_transaction(&hash) {
|
||||
if let Some(receipt) = this.transaction_receipt(hash).await? {
|
||||
return Ok(receipt);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Canonical stream ended
|
||||
break;
|
||||
}
|
||||
}
|
||||
// check if the tx was preconfirmed in a new flashblock
|
||||
_flashblock_update = async {
|
||||
if let Some(ref mut stream) = flashblock_stream {
|
||||
stream.next().await
|
||||
} else {
|
||||
futures::future::pending().await
|
||||
}
|
||||
} => {
|
||||
// Check flashblocks for faster confirmation (Optimism-specific)
|
||||
if let Ok(Some(pending_block)) = this.pending_flashblock() {
|
||||
let block_and_receipts = pending_block.into_block_and_receipts();
|
||||
if block_and_receipts.block.body().contains_transaction(&hash) {
|
||||
if let Some(receipt) = this.transaction_receipt(hash).await? {
|
||||
return Ok(receipt);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
|
||||
hash,
|
||||
duration: timeout_duration,
|
||||
}))
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|_elapsed| {
|
||||
Err(Self::Error::from_eth_err(EthApiError::TransactionConfirmationTimeout {
|
||||
hash,
|
||||
duration: timeout_duration,
|
||||
}))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the transaction receipt for the given hash.
|
||||
///
|
||||
/// With flashblocks, we should also lookup the pending block for the transaction
|
||||
|
||||
Reference in New Issue
Block a user