feat(optimism): Launch FlashBlockService when websocket URL is provided in OpEthApi (#18077)

This commit is contained in:
Roman Hodulák
2025-08-27 11:51:33 +02:00
committed by GitHub
parent b7b70a46a5
commit 97f4b00fc0
5 changed files with 74 additions and 9 deletions

1
Cargo.lock generated
View File

@@ -9499,6 +9499,7 @@ dependencies = [
"reth-node-builder",
"reth-optimism-chainspec",
"reth-optimism-evm",
"reth-optimism-flashblocks",
"reth-optimism-forks",
"reth-optimism-payload-builder",
"reth-optimism-primitives",

View File

@@ -30,6 +30,7 @@ reth-rpc-engine-api.workspace = true
# op-reth
reth-optimism-evm.workspace = true
reth-optimism-flashblocks.workspace = true
reth-optimism-payload-builder.workspace = true
reth-optimism-txpool.workspace = true
# TODO remove node-builder import

View File

@@ -16,9 +16,11 @@ use alloy_primitives::U256;
use eyre::WrapErr;
use op_alloy_network::Optimism;
pub use receipt::{OpReceiptBuilder, OpReceiptFieldsBuilder};
use reqwest::Url;
use reth_evm::ConfigureEvm;
use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy};
use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
use reth_optimism_flashblocks::{launch_wss_flashblocks_service, FlashBlockRx};
use reth_rpc::eth::{core::EthApiInner, DevSigner};
use reth_rpc_eth_api::{
helpers::{
@@ -28,7 +30,9 @@ use reth_rpc_eth_api::{
EthApiTypes, FromEvmError, FullEthApiServer, RpcConvert, RpcConverter, RpcNodeCore,
RpcNodeCoreExt, RpcTypes, SignableTxRequest,
};
use reth_rpc_eth_types::{EthStateCache, FeeHistoryCache, GasPriceOracle};
use reth_rpc_eth_types::{
pending_block::PendingBlockAndReceipts, EthStateCache, FeeHistoryCache, GasPriceOracle,
};
use reth_storage_api::{ProviderHeader, ProviderTx};
use reth_tasks::{
pool::{BlockingTaskGuard, BlockingTaskPool},
@@ -66,9 +70,14 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
eth_api: EthApiNodeBackend<N, Rpc>,
sequencer_client: Option<SequencerClient>,
min_suggested_priority_fee: U256,
flashblocks_rx: Option<FlashBlockRx<N::Primitives>>,
) -> Self {
let inner =
Arc::new(OpEthApiInner { eth_api, sequencer_client, min_suggested_priority_fee });
let inner = Arc::new(OpEthApiInner {
eth_api,
sequencer_client,
min_suggested_priority_fee,
flashblocks_rx,
});
Self { inner }
}
@@ -85,6 +94,13 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
pub const fn builder() -> OpEthApiBuilder<Rpc> {
OpEthApiBuilder::new()
}
/// Returns a [`PendingBlockAndReceipts`] that is built out of flashblocks.
///
/// If flashblocks receiver is not set, then it always returns `None`.
pub fn pending_flashblock(&self) -> Option<PendingBlockAndReceipts<N::Primitives>> {
Some(self.inner.flashblocks_rx.as_ref()?.borrow().as_ref()?.to_block_and_receipts())
}
}
impl<N, Rpc> EthApiTypes for OpEthApi<N, Rpc>
@@ -271,6 +287,10 @@ pub struct OpEthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
///
/// See also <https://github.com/ethereum-optimism/op-geth/blob/d4e0fe9bb0c2075a9bff269fb975464dd8498f75/eth/gasprice/optimism-gasprice.go#L38-L38>
min_suggested_priority_fee: U256,
/// Flashblocks receiver.
///
/// If set, then it provides current pending block based on received Flashblocks.
flashblocks_rx: Option<FlashBlockRx<N::Primitives>>,
}
impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApiInner<N, Rpc> {
@@ -310,6 +330,10 @@ pub struct OpEthApiBuilder<NetworkT = Optimism> {
sequencer_headers: Vec<String>,
/// Minimum suggested priority fee (tip)
min_suggested_priority_fee: u64,
/// A URL pointing to a secure websocket connection (wss) that streams out [flashblocks].
///
/// [flashblocks]: reth_optimism_flashblocks
flashblocks_url: Option<Url>,
/// Marker for network types.
_nt: PhantomData<NetworkT>,
}
@@ -320,6 +344,7 @@ impl<NetworkT> Default for OpEthApiBuilder<NetworkT> {
sequencer_url: None,
sequencer_headers: Vec::new(),
min_suggested_priority_fee: 1_000_000,
flashblocks_url: None,
_nt: PhantomData,
}
}
@@ -332,6 +357,7 @@ impl<NetworkT> OpEthApiBuilder<NetworkT> {
sequencer_url: None,
sequencer_headers: Vec::new(),
min_suggested_priority_fee: 1_000_000,
flashblocks_url: None,
_nt: PhantomData,
}
}
@@ -348,16 +374,24 @@ impl<NetworkT> OpEthApiBuilder<NetworkT> {
self
}
/// With minimum suggested priority fee (tip)
/// With minimum suggested priority fee (tip).
pub const fn with_min_suggested_priority_fee(mut self, min: u64) -> Self {
self.min_suggested_priority_fee = min;
self
}
/// With a subscription to flashblocks secure websocket connection.
pub fn with_flashblocks(mut self, flashblocks_url: Option<Url>) -> Self {
self.flashblocks_url = flashblocks_url;
self
}
}
impl<N, NetworkT> EthApiBuilder<N> for OpEthApiBuilder<NetworkT>
where
N: FullNodeComponents<Evm: ConfigureEvm<NextBlockEnvCtx: BuildPendingEnv<HeaderTy<N::Types>>>>,
N: FullNodeComponents<
Evm: ConfigureEvm<NextBlockEnvCtx: BuildPendingEnv<HeaderTy<N::Types>> + Unpin>,
>,
NetworkT: RpcTypes,
OpRpcConvert<N, NetworkT>: RpcConvert<Network = NetworkT>,
OpEthApi<N, OpRpcConvert<N, NetworkT>>:
@@ -366,13 +400,17 @@ where
type EthApi = OpEthApi<N, OpRpcConvert<N, NetworkT>>;
async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result<Self::EthApi> {
let Self { sequencer_url, sequencer_headers, min_suggested_priority_fee, .. } = self;
let Self {
sequencer_url,
sequencer_headers,
min_suggested_priority_fee,
flashblocks_url,
..
} = self;
let rpc_converter =
RpcConverter::new(OpReceiptConverter::new(ctx.components.provider().clone()))
.with_mapper(OpTxInfoMapper::new(ctx.components.provider().clone()));
let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner();
let sequencer_client = if let Some(url) = sequencer_url {
Some(
SequencerClient::new_with_headers(&url, sequencer_headers)
@@ -383,6 +421,21 @@ where
None
};
Ok(OpEthApi::new(eth_api, sequencer_client, U256::from(min_suggested_priority_fee)))
let flashblocks_rx = flashblocks_url.map(|ws_url| {
launch_wss_flashblocks_service(
ws_url,
ctx.components.evm_config().clone(),
ctx.components.provider().clone(),
)
});
let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner();
Ok(OpEthApi::new(
eth_api,
sequencer_client,
U256::from(min_suggested_priority_fee),
flashblocks_rx,
))
}
}

View File

@@ -45,6 +45,10 @@ where
)>,
Self::Error,
> {
if let Some(block) = self.pending_flashblock() {
return Ok(Some(block));
}
// See: <https://github.com/ethereum-optimism/op-geth/blob/f2e69450c6eec9c35d56af91389a1c47737206ca/miner/worker.go#L367-L375>
let latest = self
.provider()

View File

@@ -120,6 +120,12 @@ impl<N: NodePrimitives> PendingBlock<N> {
pub fn into_block_and_receipts(self) -> PendingBlockAndReceipts<N> {
(self.executed_block.recovered_block, self.receipts)
}
/// Returns a pair of [`RecoveredBlock`] and a vector of [`NodePrimitives::Receipt`]s by
/// cloning from borrowed self.
pub fn to_block_and_receipts(&self) -> PendingBlockAndReceipts<N> {
(self.executed_block.recovered_block.clone(), self.receipts.clone())
}
}
impl<N: NodePrimitives> From<PendingBlock<N>> for BlockState<N> {