diff --git a/Cargo.lock b/Cargo.lock index 771d1d8a58..f1dcff8785 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3613,6 +3613,7 @@ dependencies = [ "reth-network-peers", "reth-node-builder", "reth-op", + "reth-optimism-flashblocks", "reth-optimism-forks", "reth-payload-builder", "reth-rpc-api", diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index a66d7b222e..a9c9640e3c 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -973,7 +973,12 @@ where ); let eth_config = config.rpc.eth_config().max_batch_size(config.txpool.max_batch_size()); - let ctx = EthApiCtx { components: &node, config: eth_config, cache }; + let ctx = EthApiCtx { + components: &node, + config: eth_config, + cache, + engine_handle: beacon_engine_handle.clone(), + }; let eth_api = eth_api_builder.build_eth_api(ctx).await?; let auth_config = config.rpc.auth_server_config(jwt_secret)?; @@ -1137,6 +1142,8 @@ pub struct EthApiCtx<'a, N: FullNodeTypes> { pub config: EthConfig, /// Cache for eth state pub cache: EthStateCache>, + /// Handle to the beacon consensus engine + pub engine_handle: ConsensusEngineHandle<::Payload>, } impl<'a, N: FullNodeComponents>> diff --git a/crates/optimism/flashblocks/src/consensus.rs b/crates/optimism/flashblocks/src/consensus.rs index 8df2e01a07..65926a4d91 100644 --- a/crates/optimism/flashblocks/src/consensus.rs +++ b/crates/optimism/flashblocks/src/consensus.rs @@ -1,86 +1,271 @@ -use crate::FlashBlockCompleteSequenceRx; +use crate::{FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx}; use alloy_primitives::B256; +use alloy_rpc_types_engine::PayloadStatusEnum; +use op_alloy_rpc_types_engine::OpExecutionData; use reth_engine_primitives::ConsensusEngineHandle; use reth_optimism_payload_builder::OpPayloadTypes; -use reth_payload_primitives::EngineApiMessageVersion; +use reth_payload_primitives::{EngineApiMessageVersion, ExecutionPayload, PayloadTypes}; use ringbuffer::{AllocRingBuffer, RingBuffer}; -use tracing::warn; +use tracing::*; + +/// Cache entry for block information: (block hash, block number, timestamp). +type BlockCacheEntry = (B256, u64, u64); /// Consensus client that sends FCUs and new payloads using blocks from a [`FlashBlockService`] /// /// [`FlashBlockService`]: crate::FlashBlockService #[derive(Debug)] -pub struct FlashBlockConsensusClient { +pub struct FlashBlockConsensusClient

+where + P: PayloadTypes, +{ /// Handle to execution client. - engine_handle: ConsensusEngineHandle, + engine_handle: ConsensusEngineHandle

, sequence_receiver: FlashBlockCompleteSequenceRx, + /// Caches previous block info for lookup: (block hash, block number, timestamp). + block_hash_buffer: AllocRingBuffer, } -impl FlashBlockConsensusClient { +impl

FlashBlockConsensusClient

+where + P: PayloadTypes, + P::ExecutionData: for<'a> TryFrom<&'a FlashBlockCompleteSequence, Error: std::fmt::Display>, +{ /// Create a new `FlashBlockConsensusClient` with the given Op engine and sequence receiver. - pub const fn new( - engine_handle: ConsensusEngineHandle, + pub fn new( + engine_handle: ConsensusEngineHandle

, sequence_receiver: FlashBlockCompleteSequenceRx, ) -> eyre::Result { - Ok(Self { engine_handle, sequence_receiver }) + // Buffer size of 768 blocks (64 * 12) supports 1s block time chains like Unichain. + // Oversized for 2s block time chains like Base, but acceptable given minimal memory usage. + let block_hash_buffer = AllocRingBuffer::new(768); + Ok(Self { engine_handle, sequence_receiver, block_hash_buffer }) } - /// Get previous block hash using previous block hash buffer. If it isn't available (buffer - /// started more recently than `offset`), return default zero hash - fn get_previous_block_hash( + /// Return the safe and finalized block hash for FCU calls. + /// + /// Safe blocks are considered 32 L1 blocks (approximately 384s at 12s/block) behind the head, + /// and finalized blocks are 64 L1 blocks (approximately 768s) behind the head. This + /// approximation, while not precisely matching the OP stack's derivation, provides + /// sufficient proximity and enables op-reth to sync the chain independently of an op-node. + /// The offset is dynamically adjusted based on the actual block time detected from the + /// buffer. + fn get_safe_and_finalized_block_hash(&self) -> (B256, B256) { + let cached_blocks_count = self.block_hash_buffer.len(); + + // Not enough blocks to determine safe/finalized yet + if cached_blocks_count < 2 { + return (B256::ZERO, B256::ZERO); + } + + // Calculate average block time using block numbers to handle missing blocks correctly. + // By dividing timestamp difference by block number difference, we get accurate block + // time even when blocks are missing from the buffer. + let (_, latest_block_number, latest_timestamp) = + self.block_hash_buffer.get(cached_blocks_count - 1).unwrap(); + let (_, previous_block_number, previous_timestamp) = + self.block_hash_buffer.get(cached_blocks_count - 2).unwrap(); + let timestamp_delta = latest_timestamp.saturating_sub(*previous_timestamp); + let block_number_delta = latest_block_number.saturating_sub(*previous_block_number).max(1); + let block_time_secs = timestamp_delta / block_number_delta; + + // L1 reference: 32 blocks * 12s = 384s for safe, 64 blocks * 12s = 768s for finalized + const SAFE_TIME_SECS: u64 = 384; + const FINALIZED_TIME_SECS: u64 = 768; + + // Calculate how many L2 blocks correspond to these L1 time periods + let safe_block_offset = + (SAFE_TIME_SECS / block_time_secs).min(cached_blocks_count as u64) as usize; + let finalized_block_offset = + (FINALIZED_TIME_SECS / block_time_secs).min(cached_blocks_count as u64) as usize; + + // Get safe hash: offset from end of buffer + let safe_hash = self + .block_hash_buffer + .get(cached_blocks_count.saturating_sub(safe_block_offset)) + .map(|&(hash, _, _)| hash) + .unwrap(); + + // Get finalized hash: offset from end of buffer + let finalized_hash = self + .block_hash_buffer + .get(cached_blocks_count.saturating_sub(finalized_block_offset)) + .map(|&(hash, _, _)| hash) + .unwrap(); + + (safe_hash, finalized_hash) + } + + /// Receive the next flashblock sequence and cache its block information. + /// + /// Returns `None` if receiving fails (error is already logged). + async fn receive_and_cache_sequence(&mut self) -> Option { + match self.sequence_receiver.recv().await { + Ok(sequence) => { + self.block_hash_buffer.push(( + sequence.payload_base().parent_hash, + sequence.block_number(), + sequence.payload_base().timestamp, + )); + Some(sequence) + } + Err(err) => { + error!( + target: "flashblocks", + %err, + "error while fetching flashblock completed sequence", + ); + None + } + } + } + + /// Convert a flashblock sequence to an execution payload. + /// + /// Returns `None` if conversion fails (error is already logged). + fn convert_sequence_to_payload( &self, - previous_block_hashes: &AllocRingBuffer, - offset: usize, - ) -> B256 { - *previous_block_hashes - .len() - .checked_sub(offset) - .and_then(|index| previous_block_hashes.get(index)) - .unwrap_or_default() + sequence: &FlashBlockCompleteSequence, + ) -> Option { + match P::ExecutionData::try_from(sequence) { + Ok(payload) => Some(payload), + Err(err) => { + error!( + target: "flashblocks", + %err, + "error while converting to payload from completed sequence", + ); + None + } + } + } + + /// Submit a new payload to the engine. + /// + /// Returns `Ok(block_hash)` if the payload was accepted, `Err(())` otherwise (errors are + /// logged). + async fn submit_new_payload( + &self, + payload: P::ExecutionData, + sequence: &FlashBlockCompleteSequence, + ) -> Result { + let block_number = payload.block_number(); + let block_hash = payload.block_hash(); + + match self.engine_handle.new_payload(payload).await { + Ok(result) => { + debug!( + target: "flashblocks", + flashblock_count = sequence.count(), + block_number, + %block_hash, + ?result, + "Submitted engine_newPayload", + ); + + if let PayloadStatusEnum::Invalid { validation_error } = result.status { + debug!( + target: "flashblocks", + block_number, + %block_hash, + %validation_error, + "Payload validation error", + ); + return Err(()); + } + + Ok(block_hash) + } + Err(err) => { + error!( + target: "flashblocks", + %err, + block_number, + "Failed to submit new payload", + ); + Err(()) + } + } + } + + /// Submit a forkchoice update to the engine. + async fn submit_forkchoice_update( + &self, + head_block_hash: B256, + sequence: &FlashBlockCompleteSequence, + ) { + let block_number = sequence.block_number(); + let (safe_hash, finalized_hash) = self.get_safe_and_finalized_block_hash(); + let fcu_state = alloy_rpc_types_engine::ForkchoiceState { + head_block_hash, + safe_block_hash: safe_hash, + finalized_block_hash: finalized_hash, + }; + + match self + .engine_handle + .fork_choice_updated(fcu_state, None, EngineApiMessageVersion::V5) + .await + { + Ok(result) => { + debug!( + target: "flashblocks", + flashblock_count = sequence.count(), + block_number, + %head_block_hash, + %safe_hash, + %finalized_hash, + ?result, + "Submitted engine_forkChoiceUpdated", + ) + } + Err(err) => { + error!( + target: "flashblocks", + %err, + block_number, + %head_block_hash, + %safe_hash, + %finalized_hash, + "Failed to submit fork choice update", + ); + } + } } /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent /// blocks. pub async fn run(mut self) { - let mut previous_block_hashes = AllocRingBuffer::new(64); - loop { - match self.sequence_receiver.recv().await { - Ok(sequence) => { - let block_hash = sequence.payload_base().parent_hash; - previous_block_hashes.push(block_hash); + let Some(sequence) = self.receive_and_cache_sequence().await else { + continue; + }; - if sequence.state_root().is_none() { - warn!(target: "flashblocks", "Missing state root for the complete sequence") - } + let Some(payload) = self.convert_sequence_to_payload(&sequence) else { + continue; + }; - // Load previous block hashes. We're using (head - 32) and (head - 64) as the - // safe and finalized block hashes. - let safe_block_hash = self.get_previous_block_hash(&previous_block_hashes, 32); - let finalized_block_hash = - self.get_previous_block_hash(&previous_block_hashes, 64); + let Ok(block_hash) = self.submit_new_payload(payload, &sequence).await else { + continue; + }; - let state = alloy_rpc_types_engine::ForkchoiceState { - head_block_hash: block_hash, - safe_block_hash, - finalized_block_hash, - }; - - // Send FCU - let _ = self - .engine_handle - .fork_choice_updated(state, None, EngineApiMessageVersion::V3) - .await; - } - Err(err) => { - warn!( - target: "flashblocks", - %err, - "error while fetching flashblock completed sequence" - ); - break; - } - } + self.submit_forkchoice_update(block_hash, &sequence).await; } } } + +impl From<&FlashBlockCompleteSequence> for OpExecutionData { + fn from(sequence: &FlashBlockCompleteSequence) -> Self { + let mut data = Self::from_flashblocks_unchecked(sequence); + // Replace payload's state_root with the calculated one. For flashblocks, there was an + // option to disable state root calculation for blocks, and in that case, the payload's + // state_root will be zero, and we'll need to locally calculate state_root before + // proceeding to call engine_newPayload. + if let Some(execution_outcome) = sequence.execution_outcome() { + let payload = data.payload.as_v1_mut(); + payload.state_root = execution_outcome.state_root; + payload.block_hash = execution_outcome.block_hash; + } + data + } +} diff --git a/crates/optimism/flashblocks/src/sequence.rs b/crates/optimism/flashblocks/src/sequence.rs index 0bcde2ace1..409a13f516 100644 --- a/crates/optimism/flashblocks/src/sequence.rs +++ b/crates/optimism/flashblocks/src/sequence.rs @@ -13,18 +13,24 @@ use tracing::{debug, trace, warn}; /// The size of the broadcast channel for completed flashblock sequences. const FLASHBLOCK_SEQUENCE_CHANNEL_SIZE: usize = 128; +/// Outcome from executing a flashblock sequence. +#[derive(Debug, Clone, Copy)] +pub struct SequenceExecutionOutcome { + /// The block hash of the executed pending block + pub block_hash: B256, + /// Properly computed state root + pub state_root: B256, +} + /// An ordered B-tree keeping the track of a sequence of [`FlashBlock`]s by their indices. #[derive(Debug)] pub struct FlashBlockPendingSequence { /// tracks the individual flashblocks in order - /// - /// With a blocktime of 2s and flashblock tick-rate of 200ms plus one extra flashblock per new - /// pending block, we expect 11 flashblocks per slot. inner: BTreeMap>, /// Broadcasts flashblocks to subscribers. block_broadcaster: broadcast::Sender, - /// Optional properly computed state root for the current sequence. - state_root: Option, + /// Optional execution outcome from building the current sequence. + execution_outcome: Option, } impl FlashBlockPendingSequence @@ -36,7 +42,7 @@ where // Note: if the channel is full, send will not block but rather overwrite the oldest // messages. Order is preserved. let (tx, _) = broadcast::channel(FLASHBLOCK_SEQUENCE_CHANNEL_SIZE); - Self { inner: BTreeMap::new(), block_broadcaster: tx, state_root: None } + Self { inner: BTreeMap::new(), block_broadcaster: tx, execution_outcome: None } } /// Returns the sender half of the [`FlashBlockCompleteSequence`] channel. @@ -53,13 +59,18 @@ where // Clears the state and broadcasts the blocks produced to subscribers. fn clear_and_broadcast_blocks(&mut self) { + if self.inner.is_empty() { + return; + } + let flashblocks = mem::take(&mut self.inner); + let execution_outcome = mem::take(&mut self.execution_outcome); // If there are any subscribers, send the flashblocks to them. if self.block_broadcaster.receiver_count() > 0 { let flashblocks = match FlashBlockCompleteSequence::new( flashblocks.into_iter().map(|block| block.1.into()).collect(), - self.state_root, + execution_outcome, ) { Ok(flashblocks) => flashblocks, Err(err) => { @@ -106,9 +117,12 @@ where Ok(()) } - /// Set state root - pub const fn set_state_root(&mut self, state_root: Option) { - self.state_root = state_root; + /// Set execution outcome from building the flashblock sequence + pub const fn set_execution_outcome( + &mut self, + execution_outcome: Option, + ) { + self.execution_outcome = execution_outcome; } /// Iterator over sequence of executable transactions. @@ -171,12 +185,12 @@ where /// /// Ensures invariants of a complete flashblocks sequence. /// If this entire sequence of flashblocks was executed on top of latest block, this also includes -/// the computed state root. +/// the execution outcome with block hash and state root. #[derive(Debug, Clone)] pub struct FlashBlockCompleteSequence { inner: Vec, - /// Optional state root for the current sequence - state_root: Option, + /// Optional execution outcome from building the flashblock sequence + execution_outcome: Option, } impl FlashBlockCompleteSequence { @@ -185,7 +199,10 @@ impl FlashBlockCompleteSequence { /// * vector is not empty /// * first flashblock have the base payload /// * sequence of flashblocks is sound (successive index from 0, same payload id, ...) - pub fn new(blocks: Vec, state_root: Option) -> eyre::Result { + pub fn new( + blocks: Vec, + execution_outcome: Option, + ) -> eyre::Result { let first_block = blocks.first().ok_or_eyre("No flashblocks in sequence")?; // Ensure that first flashblock have base @@ -200,7 +217,7 @@ impl FlashBlockCompleteSequence { bail!("Flashblock inconsistencies detected in sequence"); } - Ok(Self { inner: blocks, state_root }) + Ok(Self { inner: blocks, execution_outcome }) } /// Returns the block number @@ -223,9 +240,9 @@ impl FlashBlockCompleteSequence { self.inner.last().unwrap() } - /// Returns the state root for the current sequence - pub const fn state_root(&self) -> Option { - self.state_root + /// Returns the execution outcome of the sequence. + pub const fn execution_outcome(&self) -> Option { + self.execution_outcome } /// Returns all transactions from all flashblocks in the sequence @@ -247,7 +264,7 @@ impl TryFrom> for FlashBlockCompleteSequence { fn try_from(sequence: FlashBlockPendingSequence) -> Result { Self::new( sequence.inner.into_values().map(|block| block.block().clone()).collect::>(), - sequence.state_root, + sequence.execution_outcome, ) } } diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index 40046e1596..82a62c9e01 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -1,5 +1,5 @@ use crate::{ - sequence::FlashBlockPendingSequence, + sequence::{FlashBlockPendingSequence, SequenceExecutionOutcome}, worker::{BuildArgs, FlashBlockBuilder}, FlashBlock, FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx, InProgressFlashBlockRx, PendingFlashBlock, @@ -30,7 +30,8 @@ use tokio::{ }; use tracing::{debug, trace, warn}; -pub(crate) const FB_STATE_ROOT_FROM_INDEX: usize = 9; +/// 200 ms flashblock time. +pub(crate) const FLASHBLOCK_BLOCK_TIME: u64 = 200; /// The `FlashBlockService` maintains an in-memory [`PendingFlashBlock`] built out of a sequence of /// [`FlashBlock`]s. @@ -60,7 +61,7 @@ pub struct FlashBlockService< in_progress_tx: watch::Sender>, /// `FlashBlock` service's metrics metrics: FlashBlockServiceMetrics, - /// Enable state root calculation from flashblock with index [`FB_STATE_ROOT_FROM_INDEX`] + /// Enable state root calculation compute_state_root: bool, } @@ -177,10 +178,13 @@ where return None }; + let Some(latest) = self.builder.provider().latest_header().ok().flatten() else { + trace!(target: "flashblocks", "No latest header found"); + return None + }; + // attempt an initial consecutive check - if let Some(latest) = self.builder.provider().latest_header().ok().flatten() && - latest.hash() != base.parent_hash - { + if latest.hash() != base.parent_hash { trace!(target: "flashblocks", flashblock_parent=?base.parent_hash, flashblock_number=base.block_number, local_latest=?latest.num_hash(), "Skipping non consecutive build attempt"); return None } @@ -190,9 +194,39 @@ where return None }; - // Check if state root must be computed - let compute_state_root = - self.compute_state_root && self.blocks.index() >= Some(FB_STATE_ROOT_FROM_INDEX as u64); + // Auto-detect when to compute state root: only if the builder didn't provide it (sent + // B256::ZERO) and we're near the expected final flashblock index. + // + // Background: Each block period receives multiple flashblocks at regular intervals. + // The sequencer sends an initial "base" flashblock at index 0 when a new block starts, + // then subsequent flashblocks are produced every FLASHBLOCK_BLOCK_TIME intervals (200ms). + // + // Examples with different block times: + // - Base (2s blocks): expect 2000ms / 200ms = 10 intervals → Flashblocks: index 0 (base) + // + indices 1-10 = potentially 11 total + // + // - Unichain (1s blocks): expect 1000ms / 200ms = 5 intervals → Flashblocks: index 0 (base) + // + indices 1-5 = potentially 6 total + // + // Why compute at N-1 instead of N: + // 1. Timing variance in flashblock producing time may mean only N flashblocks were produced + // instead of N+1 (missing the final one). Computing at N-1 ensures we get the state root + // for most common cases. + // + // 2. The +1 case (index 0 base + N intervals): If all N+1 flashblocks do arrive, we'll + // still calculate state root for flashblock N, which sacrifices a little performance but + // still ensures correctness for common cases. + // + // Note: Pathological cases may result in fewer flashblocks than expected (e.g., builder + // downtime, flashblock execution exceeding timing budget). When this occurs, we won't + // compute the state root, causing FlashblockConsensusClient to lack precomputed state for + // engine_newPayload. This is safe: we still have op-node as backstop to maintain + // chain progression. + let block_time_ms = (base.timestamp - latest.timestamp()) * 1000; + let expected_final_flashblock = block_time_ms / FLASHBLOCK_BLOCK_TIME; + let compute_state_root = self.compute_state_root && + last_flashblock.diff.state_root.is_zero() && + self.blocks.index() >= Some(expected_final_flashblock.saturating_sub(1)); Some(BuildArgs { base, @@ -268,8 +302,15 @@ where if let Some((now, result)) = result { match result { Ok(Some((new_pending, cached_reads))) => { - // update state root of the current sequence - this.blocks.set_state_root(new_pending.computed_state_root()); + // update execution outcome of the current sequence + let execution_outcome = + new_pending.computed_state_root().map(|state_root| { + SequenceExecutionOutcome { + block_hash: new_pending.block().hash(), + state_root, + } + }); + this.blocks.set_execution_outcome(execution_outcome); // built a new pending block this.current = Some(new_pending.clone()); diff --git a/crates/optimism/flashblocks/src/worker.rs b/crates/optimism/flashblocks/src/worker.rs index cf33d5c8ce..16ae67a09a 100644 --- a/crates/optimism/flashblocks/src/worker.rs +++ b/crates/optimism/flashblocks/src/worker.rs @@ -107,6 +107,7 @@ where // if the real state root should be computed let BlockBuilderOutcome { execution_result, block, hashed_state, .. } = if args.compute_state_root { + trace!(target: "flashblocks", "Computing block state root"); builder.finish(&state_provider)? } else { builder.finish(NoopProvider::default())? diff --git a/crates/optimism/node/src/args.rs b/crates/optimism/node/src/args.rs index 4e9bb2ce7c..9d27c99320 100644 --- a/crates/optimism/node/src/args.rs +++ b/crates/optimism/node/src/args.rs @@ -74,6 +74,14 @@ pub struct RollupArgs { /// block tag will use the pending state based on flashblocks. #[arg(long)] pub flashblocks_url: Option, + + /// Enable flashblock consensus client to drive the chain forward + /// + /// When enabled, the flashblock consensus client will process flashblock sequences and submit + /// them to the engine API to advance the chain. + /// Requires `flashblocks_url` to be set. + #[arg(long, default_value_t = false, requires = "flashblocks_url")] + pub flashblock_consensus: bool, } impl Default for RollupArgs { @@ -90,6 +98,7 @@ impl Default for RollupArgs { historical_rpc: None, min_suggested_priority_fee: 1_000_000, flashblocks_url: None, + flashblock_consensus: false, } } } diff --git a/crates/optimism/node/src/node.rs b/crates/optimism/node/src/node.rs index 65055eb671..c177051677 100644 --- a/crates/optimism/node/src/node.rs +++ b/crates/optimism/node/src/node.rs @@ -194,6 +194,7 @@ impl OpNode { .with_min_suggested_priority_fee(self.args.min_suggested_priority_fee) .with_historical_rpc(self.args.historical_rpc.clone()) .with_flashblocks(self.args.flashblocks_url.clone()) + .with_flashblock_consensus(self.args.flashblock_consensus) } /// Instantiates the [`ProviderFactoryBuilder`] for an opstack node. @@ -695,6 +696,8 @@ pub struct OpAddOnsBuilder { tokio_runtime: Option, /// A URL pointing to a secure websocket service that streams out flashblocks. flashblocks_url: Option, + /// Enable flashblock consensus client to drive chain forward. + flashblock_consensus: bool, } impl Default for OpAddOnsBuilder { @@ -711,6 +714,7 @@ impl Default for OpAddOnsBuilder { rpc_middleware: Identity::new(), tokio_runtime: None, flashblocks_url: None, + flashblock_consensus: false, } } } @@ -779,6 +783,7 @@ impl OpAddOnsBuilder { tokio_runtime, _nt, flashblocks_url, + flashblock_consensus, .. } = self; OpAddOnsBuilder { @@ -793,6 +798,7 @@ impl OpAddOnsBuilder { rpc_middleware, tokio_runtime, flashblocks_url, + flashblock_consensus, } } @@ -801,6 +807,12 @@ impl OpAddOnsBuilder { self.flashblocks_url = flashblocks_url; self } + + /// With a flashblock consensus client to drive chain forward. + pub const fn with_flashblock_consensus(mut self, flashblock_consensus: bool) -> Self { + self.flashblock_consensus = flashblock_consensus; + self + } } impl OpAddOnsBuilder { @@ -826,6 +838,7 @@ impl OpAddOnsBuilder { rpc_middleware, tokio_runtime, flashblocks_url, + flashblock_consensus, .. } = self; @@ -835,7 +848,8 @@ impl OpAddOnsBuilder { .with_sequencer(sequencer_url.clone()) .with_sequencer_headers(sequencer_headers.clone()) .with_min_suggested_priority_fee(min_suggested_priority_fee) - .with_flashblocks(flashblocks_url), + .with_flashblocks(flashblocks_url) + .with_flashblock_consensus(flashblock_consensus), PVB::default(), EB::default(), EVB::default(), diff --git a/crates/optimism/node/src/rpc.rs b/crates/optimism/node/src/rpc.rs index db811a7f92..b87800a54e 100644 --- a/crates/optimism/node/src/rpc.rs +++ b/crates/optimism/node/src/rpc.rs @@ -13,7 +13,7 @@ //! components::ComponentsBuilder, //! hooks::OnComponentInitializedHook, //! rpc::{EthApiBuilder, EthApiCtx}, -//! LaunchContext, NodeConfig, RethFullAdapter, +//! ConsensusEngineHandle, LaunchContext, NodeConfig, RethFullAdapter, //! }; //! use reth_optimism_chainspec::OP_SEPOLIA; //! use reth_optimism_evm::OpEvmConfig; @@ -67,7 +67,14 @@ //! config.cache, //! node.task_executor().clone(), //! ); -//! let ctx = EthApiCtx { components: node.node_adapter(), config, cache }; +//! // Create a dummy beacon engine handle for offline mode +//! let (tx, _) = tokio::sync::mpsc::unbounded_channel(); +//! let ctx = EthApiCtx { +//! components: node.node_adapter(), +//! config, +//! cache, +//! engine_handle: ConsensusEngineHandle::new(tx), +//! }; //! let eth_api = OpEthApiBuilder::::default().build_eth_api(ctx).await.unwrap(); //! //! // build `trace` namespace API diff --git a/crates/optimism/rpc/src/eth/mod.rs b/crates/optimism/rpc/src/eth/mod.rs index fa2da4f44e..f9b6195133 100644 --- a/crates/optimism/rpc/src/eth/mod.rs +++ b/crates/optimism/rpc/src/eth/mod.rs @@ -24,8 +24,9 @@ use reth_evm::ConfigureEvm; use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes}; use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx}; use reth_optimism_flashblocks::{ - FlashBlockBuildInfo, FlashBlockCompleteSequenceRx, FlashBlockRx, FlashBlockService, - FlashblocksListeners, PendingBlockRx, PendingFlashBlock, WsFlashBlockStream, + FlashBlockBuildInfo, FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx, + FlashBlockConsensusClient, FlashBlockRx, FlashBlockService, FlashblocksListeners, + PendingBlockRx, PendingFlashBlock, WsFlashBlockStream, }; use reth_rpc::eth::core::EthApiInner; use reth_rpc_eth_api::{ @@ -399,6 +400,12 @@ pub struct OpEthApiBuilder { /// /// [flashblocks]: reth_optimism_flashblocks flashblocks_url: Option, + /// Enable flashblock consensus client to drive the chain forward. + /// + /// When enabled, flashblock sequences are submitted to the engine API via + /// `newPayload` and `forkchoiceUpdated` calls, advancing the canonical chain state. + /// Requires `flashblocks_url` to be set. + flashblock_consensus: bool, /// Marker for network types. _nt: PhantomData, } @@ -410,6 +417,7 @@ impl Default for OpEthApiBuilder { sequencer_headers: Vec::new(), min_suggested_priority_fee: 1_000_000, flashblocks_url: None, + flashblock_consensus: false, _nt: PhantomData, } } @@ -423,6 +431,7 @@ impl OpEthApiBuilder { sequencer_headers: Vec::new(), min_suggested_priority_fee: 1_000_000, flashblocks_url: None, + flashblock_consensus: false, _nt: PhantomData, } } @@ -450,6 +459,12 @@ impl OpEthApiBuilder { self.flashblocks_url = flashblocks_url; self } + + /// With flashblock consensus client enabled to drive chain forward + pub const fn with_flashblock_consensus(mut self, flashblock_consensus: bool) -> Self { + self.flashblock_consensus = flashblock_consensus; + self + } } impl EthApiBuilder for OpEthApiBuilder @@ -460,7 +475,15 @@ where + From + Unpin, >, - Types: NodeTypes, + Types: NodeTypes< + ChainSpec: Hardforks + EthereumHardforks, + Payload: reth_node_api::PayloadTypes< + ExecutionData: for<'a> TryFrom< + &'a FlashBlockCompleteSequence, + Error: std::fmt::Display, + >, + >, + >, >, NetworkT: RpcTypes, OpRpcConvert: RpcConvert, @@ -475,6 +498,7 @@ where sequencer_headers, min_suggested_priority_fee, flashblocks_url, + flashblock_consensus, .. } = self; let rpc_converter = @@ -501,14 +525,23 @@ where ctx.components.evm_config().clone(), ctx.components.provider().clone(), ctx.components.task_executor().clone(), - ); + ) + .compute_state_root(flashblock_consensus); // enable state root calculation if flashblock_consensus if enabled. let flashblocks_sequence = service.block_sequence_broadcaster().clone(); let received_flashblocks = service.flashblocks_broadcaster().clone(); let in_progress_rx = service.subscribe_in_progress(); - ctx.components.task_executor().spawn(Box::pin(service.run(tx))); + if flashblock_consensus { + info!(target: "reth::cli", "Launching FlashBlockConsensusClient"); + let flashblock_client = FlashBlockConsensusClient::new( + ctx.engine_handle.clone(), + flashblocks_sequence.subscribe(), + )?; + ctx.components.task_executor().spawn(Box::pin(flashblock_client.run())); + } + Some(FlashblocksListeners::new( pending_rx, flashblocks_sequence, diff --git a/examples/custom-node/Cargo.toml b/examples/custom-node/Cargo.toml index 7fa0020e01..8eb3dbd143 100644 --- a/examples/custom-node/Cargo.toml +++ b/examples/custom-node/Cargo.toml @@ -11,6 +11,7 @@ reth-codecs.workspace = true reth-network-peers.workspace = true reth-node-builder.workspace = true reth-optimism-forks.workspace = true +reth-optimism-flashblocks.workspace = true reth-db-api.workspace = true reth-op = { workspace = true, features = ["node", "pool", "rpc"] } reth-payload-builder.workspace = true diff --git a/examples/custom-node/src/engine.rs b/examples/custom-node/src/engine.rs index d7eabdc19f..02a8ca99b2 100644 --- a/examples/custom-node/src/engine.rs +++ b/examples/custom-node/src/engine.rs @@ -67,6 +67,15 @@ impl ExecutionPayload for CustomExecutionData { } } +impl From<&reth_optimism_flashblocks::FlashBlockCompleteSequence> for CustomExecutionData { + fn from(sequence: &reth_optimism_flashblocks::FlashBlockCompleteSequence) -> Self { + let inner = OpExecutionData::from(sequence); + // Derive extension from sequence data - using gas_used from last flashblock as an example + let extension = sequence.last().diff.gas_used; + Self { inner, extension } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CustomPayloadAttributes { #[serde(flatten)]