From b3cbfa4ced71202e087ea0a3fdc2dc14e316b74c Mon Sep 17 00:00:00 2001 From: dustinjake <154801775+dustinjake@users.noreply.github.com> Date: Tue, 30 Sep 2025 10:08:39 +0200 Subject: [PATCH] feat(flashblocks): additional pending flashblock data (#18776) --- Cargo.lock | 1 + crates/optimism/flashblocks/Cargo.toml | 1 + crates/optimism/flashblocks/src/lib.rs | 7 ++-- crates/optimism/flashblocks/src/payload.rs | 27 ++++++++++++++++ crates/optimism/flashblocks/src/sequence.rs | 5 +++ crates/optimism/flashblocks/src/service.rs | 32 ++++++++++-------- crates/optimism/flashblocks/src/worker.rs | 36 ++++++++++++--------- crates/optimism/rpc/src/eth/mod.rs | 2 +- 8 files changed, 79 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6d6fceddd1..b998865663 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9344,6 +9344,7 @@ dependencies = [ "alloy-rpc-types-engine", "alloy-serde", "brotli", + "derive_more", "eyre", "futures-util", "metrics", diff --git a/crates/optimism/flashblocks/Cargo.toml b/crates/optimism/flashblocks/Cargo.toml index 8babbe710e..d889498d54 100644 --- a/crates/optimism/flashblocks/Cargo.toml +++ b/crates/optimism/flashblocks/Cargo.toml @@ -51,6 +51,7 @@ metrics.workspace = true eyre.workspace = true ringbuffer.workspace = true +derive_more.workspace = true [dev-dependencies] test-case.workspace = true diff --git a/crates/optimism/flashblocks/src/lib.rs b/crates/optimism/flashblocks/src/lib.rs index 1d13adad89..bf8417788b 100644 --- a/crates/optimism/flashblocks/src/lib.rs +++ b/crates/optimism/flashblocks/src/lib.rs @@ -3,23 +3,24 @@ pub use payload::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashBlock, Metadata, }; -use reth_rpc_eth_types::PendingBlock; pub use service::FlashBlockService; pub use ws::{WsConnect, WsFlashBlockStream}; mod consensus; pub use consensus::FlashBlockConsensusClient; mod payload; +pub use payload::PendingFlashBlock; mod sequence; pub use sequence::FlashBlockCompleteSequence; + mod service; mod worker; mod ws; -/// Receiver of the most recent [`PendingBlock`] built out of [`FlashBlock`]s. +/// Receiver of the most recent [`PendingFlashBlock`] built out of [`FlashBlock`]s. /// /// [`FlashBlock`]: crate::FlashBlock -pub type PendingBlockRx = tokio::sync::watch::Receiver>>; +pub type PendingBlockRx = tokio::sync::watch::Receiver>>; /// Receiver of the sequences of [`FlashBlock`]s built. /// diff --git a/crates/optimism/flashblocks/src/payload.rs b/crates/optimism/flashblocks/src/payload.rs index dee2458178..a71f996420 100644 --- a/crates/optimism/flashblocks/src/payload.rs +++ b/crates/optimism/flashblocks/src/payload.rs @@ -1,8 +1,11 @@ use alloy_eips::eip4895::Withdrawal; use alloy_primitives::{Address, Bloom, Bytes, B256, U256}; use alloy_rpc_types_engine::PayloadId; +use derive_more::Deref; +use reth_node_api::NodePrimitives; use reth_optimism_evm::OpNextBlockEnvAttributes; use reth_optimism_primitives::OpReceipt; +use reth_rpc_eth_types::PendingBlock; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; @@ -119,3 +122,27 @@ impl From for OpNextBlockEnvAttributes { } } } + +/// The pending block built with all received Flashblocks alongside the metadata for the last added +/// Flashblock. +#[derive(Debug, Clone, Deref)] +pub struct PendingFlashBlock { + /// The complete pending block built out of all received Flashblocks. + #[deref] + pub pending: PendingBlock, + /// A sequential index that identifies the last Flashblock added to this block. + pub last_flashblock_index: u64, + /// The last Flashblock block hash, + pub last_flashblock_hash: B256, +} + +impl PendingFlashBlock { + /// Create new pending flashblock. + pub const fn new( + pending: PendingBlock, + last_flashblock_index: u64, + last_flashblock_hash: B256, + ) -> Self { + Self { pending, last_flashblock_index, last_flashblock_hash } + } +} diff --git a/crates/optimism/flashblocks/src/sequence.rs b/crates/optimism/flashblocks/src/sequence.rs index 0976909442..c77d1e1c6f 100644 --- a/crates/optimism/flashblocks/src/sequence.rs +++ b/crates/optimism/flashblocks/src/sequence.rs @@ -121,6 +121,11 @@ where pub(crate) fn count(&self) -> usize { self.inner.len() } + + /// Returns the reference to the last flashblock. + pub(crate) fn last_flashblock(&self) -> Option<&FlashBlock> { + self.inner.last_key_value().map(|(_, b)| &b.block) + } } /// A complete sequence of flashblocks, often corresponding to a full block. diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index a9620083ed..e419adf33a 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -1,7 +1,7 @@ use crate::{ sequence::FlashBlockPendingSequence, worker::{BuildArgs, FlashBlockBuilder}, - ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx, + ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx, PendingFlashBlock, }; use alloy_eips::eip2718::WithEncoded; use alloy_primitives::B256; @@ -14,7 +14,6 @@ use reth_primitives_traits::{ AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, Recovered, }; use reth_revm::cached::CachedReads; -use reth_rpc_eth_types::PendingBlock; use reth_storage_api::{BlockReaderIdExt, StateProviderFactory}; use reth_tasks::TaskExecutor; use std::{ @@ -25,7 +24,7 @@ use std::{ use tokio::{pin, sync::oneshot}; use tracing::{debug, trace, warn}; -/// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of +/// The `FlashBlockService` maintains an in-memory [`PendingFlashBlock`] built out of a sequence of /// [`FlashBlock`]s. #[derive(Debug)] pub struct FlashBlockService< @@ -35,7 +34,7 @@ pub struct FlashBlockService< Provider, > { rx: S, - current: Option>, + current: Option>, blocks: FlashBlockPendingSequence, rebuild: bool, builder: FlashBlockBuilder, @@ -43,9 +42,9 @@ pub struct FlashBlockService< spawner: TaskExecutor, job: Option>, /// Cached state reads for the current block. - /// Current `PendingBlock` is built out of a sequence of `FlashBlocks`, and executed again when - /// fb received on top of the same block. Avoid redundant I/O across multiple executions - /// within the same block. + /// Current `PendingFlashBlock` is built out of a sequence of `FlashBlocks`, and executed again + /// when fb received on top of the same block. Avoid redundant I/O across multiple + /// executions within the same block. cached_state: Option<(B256, CachedReads)>, metrics: FlashBlockServiceMetrics, } @@ -92,7 +91,7 @@ where /// Drives the services and sends new blocks to the receiver /// /// Note: this should be spawned - pub async fn run(mut self, tx: tokio::sync::watch::Sender>>) { + pub async fn run(mut self, tx: tokio::sync::watch::Sender>>) { while let Some(block) = self.next().await { if let Ok(block) = block.inspect_err(|e| tracing::error!("{e}")) { let _ = tx.send(block).inspect_err(|e| tracing::error!("{e}")); @@ -128,18 +127,25 @@ where latest.hash() != base.parent_hash { trace!(flashblock_parent=?base.parent_hash, flashblock_number=base.block_number, local_latest=?latest.num_hash(), "Skipping non consecutive build attempt"); - return None; + return None } + let Some(last_flashblock) = self.blocks.last_flashblock() else { + trace!(flashblock_number = ?self.blocks.block_number(), count = %self.blocks.count(), "Missing last flashblock"); + return None + }; + Some(BuildArgs { base, transactions: self.blocks.ready_transactions().collect::>(), cached_state: self.cached_state.take(), + last_flashblock_index: last_flashblock.index, + last_flashblock_hash: last_flashblock.diff.block_hash, }) } - /// Takes out `current` [`PendingBlock`] if `state` is not preceding it. - fn on_new_tip(&mut self, state: CanonStateNotification) -> Option> { + /// Takes out `current` [`PendingFlashBlock`] if `state` is not preceding it. + fn on_new_tip(&mut self, state: CanonStateNotification) -> Option> { let tip = state.tip_checked()?; let tip_hash = tip.hash(); let current = self.current.take_if(|current| current.parent_hash() != tip_hash); @@ -180,7 +186,7 @@ where + Clone + 'static, { - type Item = eyre::Result>>; + type Item = eyre::Result>>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -286,7 +292,7 @@ where } type BuildJob = - (Instant, oneshot::Receiver, CachedReads)>>>); + (Instant, oneshot::Receiver, CachedReads)>>>); #[derive(Metrics)] #[metrics(scope = "flashblock_service")] diff --git a/crates/optimism/flashblocks/src/worker.rs b/crates/optimism/flashblocks/src/worker.rs index c2bf04495e..8f5217c275 100644 --- a/crates/optimism/flashblocks/src/worker.rs +++ b/crates/optimism/flashblocks/src/worker.rs @@ -1,4 +1,4 @@ -use crate::ExecutionPayloadBaseV1; +use crate::{ExecutionPayloadBaseV1, PendingFlashBlock}; use alloy_eips::{eip2718::WithEncoded, BlockNumberOrTag}; use alloy_primitives::B256; use reth_chain_state::{CanonStateSubscriptions, ExecutedBlock}; @@ -41,6 +41,8 @@ pub(crate) struct BuildArgs { pub base: ExecutionPayloadBaseV1, pub transactions: I, pub cached_state: Option<(B256, CachedReads)>, + pub last_flashblock_index: u64, + pub last_flashblock_hash: B256, } impl FlashBlockBuilder @@ -56,14 +58,14 @@ where Receipt = ReceiptTy, > + Unpin, { - /// Returns the [`PendingBlock`] made purely out of transactions and [`ExecutionPayloadBaseV1`] - /// in `args`. + /// Returns the [`PendingFlashBlock`] made purely out of transactions and + /// [`ExecutionPayloadBaseV1`] in `args`. /// /// Returns `None` if the flashblock doesn't attach to the latest header. pub(crate) fn execute>>>( &self, mut args: BuildArgs, - ) -> eyre::Result, CachedReads)>> { + ) -> eyre::Result, CachedReads)>> { trace!("Attempting new pending block from flashblocks"); let latest = self @@ -110,17 +112,21 @@ where vec![execution_result.requests], ); - Ok(Some(( - PendingBlock::with_executed_block( - Instant::now() + Duration::from_secs(1), - ExecutedBlock { - recovered_block: block.into(), - execution_output: Arc::new(execution_outcome), - hashed_state: Arc::new(hashed_state), - }, - ), - request_cache, - ))) + let pending_block = PendingBlock::with_executed_block( + Instant::now() + Duration::from_secs(1), + ExecutedBlock { + recovered_block: block.into(), + execution_output: Arc::new(execution_outcome), + hashed_state: Arc::new(hashed_state), + }, + ); + let pending_flashblock = PendingFlashBlock::new( + pending_block, + args.last_flashblock_index, + args.last_flashblock_hash, + ); + + Ok(Some((pending_flashblock, request_cache))) } } diff --git a/crates/optimism/rpc/src/eth/mod.rs b/crates/optimism/rpc/src/eth/mod.rs index 31d4f27e35..fdd06d224b 100644 --- a/crates/optimism/rpc/src/eth/mod.rs +++ b/crates/optimism/rpc/src/eth/mod.rs @@ -139,7 +139,7 @@ impl OpEthApi { parent.hash() == pending_block.block().parent_hash() && now <= pending_block.expires_at { - return Ok(Some(pending_block.clone())); + return Ok(Some(pending_block.pending.clone())); } Ok(None)