From 59cff107bc2f39e95b8dac3ac98ee2705f66abe3 Mon Sep 17 00:00:00 2001 From: Julio <30329843+julio4@users.noreply.github.com> Date: Thu, 18 Sep 2025 17:13:22 +0900 Subject: [PATCH] feat(op-reth): initial setup FlashBlockConsensusClient engine sidecar (#18443) --- Cargo.lock | 3 + crates/optimism/flashblocks/Cargo.toml | 5 ++ crates/optimism/flashblocks/src/consensus.rs | 81 ++++++++++++++++++++ crates/optimism/flashblocks/src/lib.rs | 2 + crates/optimism/flashblocks/src/sequence.rs | 21 +++-- crates/optimism/flashblocks/src/service.rs | 9 +-- 6 files changed, 110 insertions(+), 11 deletions(-) create mode 100644 crates/optimism/flashblocks/src/consensus.rs diff --git a/Cargo.lock b/Cargo.lock index a85a351777..499e14f04d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9304,13 +9304,16 @@ dependencies = [ "reth-errors", "reth-evm", "reth-execution-types", + "reth-node-api", "reth-optimism-evm", + "reth-optimism-payload-builder", "reth-optimism-primitives", "reth-primitives-traits", "reth-revm", "reth-rpc-eth-types", "reth-storage-api", "reth-tasks", + "ringbuffer", "serde", "serde_json", "test-case", diff --git a/crates/optimism/flashblocks/Cargo.toml b/crates/optimism/flashblocks/Cargo.toml index e83815bfd8..2344911ffc 100644 --- a/crates/optimism/flashblocks/Cargo.toml +++ b/crates/optimism/flashblocks/Cargo.toml @@ -19,9 +19,11 @@ reth-primitives-traits = { workspace = true, features = ["serde"] } reth-execution-types = { workspace = true, features = ["serde"] } reth-evm.workspace = true reth-revm.workspace = true +reth-optimism-payload-builder.workspace = true reth-rpc-eth-types.workspace = true reth-errors.workspace = true reth-storage-api.workspace = true +reth-node-api.workspace = true reth-tasks.workspace = true # alloy @@ -29,6 +31,7 @@ alloy-eips = { workspace = true, features = ["serde"] } alloy-serde.workspace = true alloy-primitives = { workspace = true, features = ["serde"] } alloy-rpc-types-engine = { workspace = true, features = ["serde"] } +alloy-consensus.workspace = true # io tokio.workspace = true @@ -45,6 +48,8 @@ tracing.workspace = true # errors eyre.workspace = true +ringbuffer.workspace = true + [dev-dependencies] test-case.workspace = true alloy-consensus.workspace = true diff --git a/crates/optimism/flashblocks/src/consensus.rs b/crates/optimism/flashblocks/src/consensus.rs new file mode 100644 index 0000000000..90071141b9 --- /dev/null +++ b/crates/optimism/flashblocks/src/consensus.rs @@ -0,0 +1,81 @@ +use crate::FlashBlockCompleteSequenceRx; +use alloy_primitives::B256; +use reth_node_api::{ConsensusEngineHandle, EngineApiMessageVersion}; +use reth_optimism_payload_builder::OpPayloadTypes; +use ringbuffer::{AllocRingBuffer, RingBuffer}; +use tracing::warn; + +/// Consensus client that sends FCUs and new payloads using blocks from a [`FlashBlockService`] +/// +/// [`FlashBlockService`]: crate::FlashBlockService +#[derive(Debug)] +pub struct FlashBlockConsensusClient { + /// Handle to execution client. + engine_handle: ConsensusEngineHandle, + sequence_receiver: FlashBlockCompleteSequenceRx, +} + +impl FlashBlockConsensusClient { + /// Create a new `FlashBlockConsensusClient` with the given Op engine and sequence receiver. + pub const fn new( + engine_handle: ConsensusEngineHandle, + sequence_receiver: FlashBlockCompleteSequenceRx, + ) -> eyre::Result { + Ok(Self { engine_handle, sequence_receiver }) + } + + /// Get previous block hash using previous block hash buffer. If it isn't available (buffer + /// started more recently than `offset`), return default zero hash + fn get_previous_block_hash( + &self, + previous_block_hashes: &AllocRingBuffer, + offset: usize, + ) -> B256 { + *previous_block_hashes + .len() + .checked_sub(offset) + .and_then(|index| previous_block_hashes.get(index)) + .unwrap_or_default() + } + + /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent + /// blocks. + pub async fn run(mut self) { + let mut previous_block_hashes = AllocRingBuffer::new(64); + + loop { + match self.sequence_receiver.recv().await { + Ok(sequence) => { + let block_hash = sequence.payload_base().parent_hash; + previous_block_hashes.push(block_hash); + + // Load previous block hashes. We're using (head - 32) and (head - 64) as the + // safe and finalized block hashes. + let safe_block_hash = self.get_previous_block_hash(&previous_block_hashes, 32); + let finalized_block_hash = + self.get_previous_block_hash(&previous_block_hashes, 64); + + let state = alloy_rpc_types_engine::ForkchoiceState { + head_block_hash: block_hash, + safe_block_hash, + finalized_block_hash, + }; + + // Send FCU + let _ = self + .engine_handle + .fork_choice_updated(state, None, EngineApiMessageVersion::V3) + .await; + } + Err(err) => { + warn!( + target: "consensus::flashblock-client", + %err, + "error while fetching flashblock completed sequence" + ); + break; + } + } + } + } +} diff --git a/crates/optimism/flashblocks/src/lib.rs b/crates/optimism/flashblocks/src/lib.rs index f189afa8f6..1d13adad89 100644 --- a/crates/optimism/flashblocks/src/lib.rs +++ b/crates/optimism/flashblocks/src/lib.rs @@ -7,6 +7,8 @@ use reth_rpc_eth_types::PendingBlock; pub use service::FlashBlockService; pub use ws::{WsConnect, WsFlashBlockStream}; +mod consensus; +pub use consensus::FlashBlockConsensusClient; mod payload; mod sequence; pub use sequence::FlashBlockCompleteSequence; diff --git a/crates/optimism/flashblocks/src/sequence.rs b/crates/optimism/flashblocks/src/sequence.rs index 72abfdca16..0976909442 100644 --- a/crates/optimism/flashblocks/src/sequence.rs +++ b/crates/optimism/flashblocks/src/sequence.rs @@ -1,9 +1,9 @@ -use crate::{ExecutionPayloadBaseV1, FlashBlock}; +use crate::{ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx}; use alloy_eips::eip2718::WithEncoded; use core::mem; use eyre::{bail, OptionExt}; use reth_primitives_traits::{Recovered, SignedTransaction}; -use std::collections::BTreeMap; +use std::{collections::BTreeMap, ops::Deref}; use tokio::sync::broadcast; use tracing::{debug, trace, warn}; @@ -34,9 +34,7 @@ where } /// Gets a subscriber to the flashblock sequences produced. - pub(crate) fn subscribe_block_sequence( - &self, - ) -> broadcast::Receiver { + pub(crate) fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx { self.block_broadcaster.subscribe() } @@ -168,6 +166,19 @@ impl FlashBlockCompleteSequence { pub const fn count(&self) -> usize { self.0.len() } + + /// Returns the last flashblock in the sequence. + pub fn last(&self) -> &FlashBlock { + self.0.last().unwrap() + } +} + +impl Deref for FlashBlockCompleteSequence { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } } impl TryFrom> for FlashBlockCompleteSequence { diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index 9b93baad0d..831aac550f 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -1,7 +1,7 @@ use crate::{ sequence::FlashBlockPendingSequence, worker::{BuildArgs, FlashBlockBuilder}, - ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequence, + ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx, }; use alloy_eips::eip2718::WithEncoded; use alloy_primitives::B256; @@ -20,10 +20,7 @@ use std::{ task::{ready, Context, Poll}, time::Instant, }; -use tokio::{ - pin, - sync::{broadcast, oneshot}, -}; +use tokio::{pin, sync::oneshot}; use tracing::{debug, trace, warn}; /// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of @@ -84,7 +81,7 @@ where } /// Returns a subscriber to the flashblock sequence. - pub fn subscribe_block_sequence(&self) -> broadcast::Receiver { + pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx { self.blocks.subscribe_block_sequence() }