feat(flashblock): improve state root calculation condition (#19667)

This commit is contained in:
Francis Li
2025-11-16 02:22:59 -08:00
committed by GitHub
parent a3cebced10
commit 5866a82516
12 changed files with 419 additions and 94 deletions

1
Cargo.lock generated
View File

@@ -3613,6 +3613,7 @@ dependencies = [
"reth-network-peers",
"reth-node-builder",
"reth-op",
"reth-optimism-flashblocks",
"reth-optimism-forks",
"reth-payload-builder",
"reth-rpc-api",

View File

@@ -973,7 +973,12 @@ where
);
let eth_config = config.rpc.eth_config().max_batch_size(config.txpool.max_batch_size());
let ctx = EthApiCtx { components: &node, config: eth_config, cache };
let ctx = EthApiCtx {
components: &node,
config: eth_config,
cache,
engine_handle: beacon_engine_handle.clone(),
};
let eth_api = eth_api_builder.build_eth_api(ctx).await?;
let auth_config = config.rpc.auth_server_config(jwt_secret)?;
@@ -1137,6 +1142,8 @@ pub struct EthApiCtx<'a, N: FullNodeTypes> {
pub config: EthConfig,
/// Cache for eth state
pub cache: EthStateCache<PrimitivesTy<N::Types>>,
/// Handle to the beacon consensus engine
pub engine_handle: ConsensusEngineHandle<<N::Types as NodeTypes>::Payload>,
}
impl<'a, N: FullNodeComponents<Types: NodeTypes<ChainSpec: Hardforks + EthereumHardforks>>>

View File

@@ -1,86 +1,271 @@
use crate::FlashBlockCompleteSequenceRx;
use crate::{FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx};
use alloy_primitives::B256;
use alloy_rpc_types_engine::PayloadStatusEnum;
use op_alloy_rpc_types_engine::OpExecutionData;
use reth_engine_primitives::ConsensusEngineHandle;
use reth_optimism_payload_builder::OpPayloadTypes;
use reth_payload_primitives::EngineApiMessageVersion;
use reth_payload_primitives::{EngineApiMessageVersion, ExecutionPayload, PayloadTypes};
use ringbuffer::{AllocRingBuffer, RingBuffer};
use tracing::warn;
use tracing::*;
/// Cache entry for block information: (block hash, block number, timestamp).
type BlockCacheEntry = (B256, u64, u64);
/// Consensus client that sends FCUs and new payloads using blocks from a [`FlashBlockService`]
///
/// [`FlashBlockService`]: crate::FlashBlockService
#[derive(Debug)]
pub struct FlashBlockConsensusClient {
pub struct FlashBlockConsensusClient<P = OpPayloadTypes>
where
P: PayloadTypes,
{
/// Handle to execution client.
engine_handle: ConsensusEngineHandle<OpPayloadTypes>,
engine_handle: ConsensusEngineHandle<P>,
sequence_receiver: FlashBlockCompleteSequenceRx,
/// Caches previous block info for lookup: (block hash, block number, timestamp).
block_hash_buffer: AllocRingBuffer<BlockCacheEntry>,
}
impl FlashBlockConsensusClient {
impl<P> FlashBlockConsensusClient<P>
where
P: PayloadTypes,
P::ExecutionData: for<'a> TryFrom<&'a FlashBlockCompleteSequence, Error: std::fmt::Display>,
{
/// Create a new `FlashBlockConsensusClient` with the given Op engine and sequence receiver.
pub const fn new(
engine_handle: ConsensusEngineHandle<OpPayloadTypes>,
pub fn new(
engine_handle: ConsensusEngineHandle<P>,
sequence_receiver: FlashBlockCompleteSequenceRx,
) -> eyre::Result<Self> {
Ok(Self { engine_handle, sequence_receiver })
// Buffer size of 768 blocks (64 * 12) supports 1s block time chains like Unichain.
// Oversized for 2s block time chains like Base, but acceptable given minimal memory usage.
let block_hash_buffer = AllocRingBuffer::new(768);
Ok(Self { engine_handle, sequence_receiver, block_hash_buffer })
}
/// Get previous block hash using previous block hash buffer. If it isn't available (buffer
/// started more recently than `offset`), return default zero hash
fn get_previous_block_hash(
/// Return the safe and finalized block hash for FCU calls.
///
/// Safe blocks are considered 32 L1 blocks (approximately 384s at 12s/block) behind the head,
/// and finalized blocks are 64 L1 blocks (approximately 768s) behind the head. This
/// approximation, while not precisely matching the OP stack's derivation, provides
/// sufficient proximity and enables op-reth to sync the chain independently of an op-node.
/// The offset is dynamically adjusted based on the actual block time detected from the
/// buffer.
fn get_safe_and_finalized_block_hash(&self) -> (B256, B256) {
let cached_blocks_count = self.block_hash_buffer.len();
// Not enough blocks to determine safe/finalized yet
if cached_blocks_count < 2 {
return (B256::ZERO, B256::ZERO);
}
// Calculate average block time using block numbers to handle missing blocks correctly.
// By dividing timestamp difference by block number difference, we get accurate block
// time even when blocks are missing from the buffer.
let (_, latest_block_number, latest_timestamp) =
self.block_hash_buffer.get(cached_blocks_count - 1).unwrap();
let (_, previous_block_number, previous_timestamp) =
self.block_hash_buffer.get(cached_blocks_count - 2).unwrap();
let timestamp_delta = latest_timestamp.saturating_sub(*previous_timestamp);
let block_number_delta = latest_block_number.saturating_sub(*previous_block_number).max(1);
let block_time_secs = timestamp_delta / block_number_delta;
// L1 reference: 32 blocks * 12s = 384s for safe, 64 blocks * 12s = 768s for finalized
const SAFE_TIME_SECS: u64 = 384;
const FINALIZED_TIME_SECS: u64 = 768;
// Calculate how many L2 blocks correspond to these L1 time periods
let safe_block_offset =
(SAFE_TIME_SECS / block_time_secs).min(cached_blocks_count as u64) as usize;
let finalized_block_offset =
(FINALIZED_TIME_SECS / block_time_secs).min(cached_blocks_count as u64) as usize;
// Get safe hash: offset from end of buffer
let safe_hash = self
.block_hash_buffer
.get(cached_blocks_count.saturating_sub(safe_block_offset))
.map(|&(hash, _, _)| hash)
.unwrap();
// Get finalized hash: offset from end of buffer
let finalized_hash = self
.block_hash_buffer
.get(cached_blocks_count.saturating_sub(finalized_block_offset))
.map(|&(hash, _, _)| hash)
.unwrap();
(safe_hash, finalized_hash)
}
/// Receive the next flashblock sequence and cache its block information.
///
/// Returns `None` if receiving fails (error is already logged).
async fn receive_and_cache_sequence(&mut self) -> Option<FlashBlockCompleteSequence> {
match self.sequence_receiver.recv().await {
Ok(sequence) => {
self.block_hash_buffer.push((
sequence.payload_base().parent_hash,
sequence.block_number(),
sequence.payload_base().timestamp,
));
Some(sequence)
}
Err(err) => {
error!(
target: "flashblocks",
%err,
"error while fetching flashblock completed sequence",
);
None
}
}
}
/// Convert a flashblock sequence to an execution payload.
///
/// Returns `None` if conversion fails (error is already logged).
fn convert_sequence_to_payload(
&self,
previous_block_hashes: &AllocRingBuffer<B256>,
offset: usize,
) -> B256 {
*previous_block_hashes
.len()
.checked_sub(offset)
.and_then(|index| previous_block_hashes.get(index))
.unwrap_or_default()
sequence: &FlashBlockCompleteSequence,
) -> Option<P::ExecutionData> {
match P::ExecutionData::try_from(sequence) {
Ok(payload) => Some(payload),
Err(err) => {
error!(
target: "flashblocks",
%err,
"error while converting to payload from completed sequence",
);
None
}
}
}
/// Submit a new payload to the engine.
///
/// Returns `Ok(block_hash)` if the payload was accepted, `Err(())` otherwise (errors are
/// logged).
async fn submit_new_payload(
&self,
payload: P::ExecutionData,
sequence: &FlashBlockCompleteSequence,
) -> Result<B256, ()> {
let block_number = payload.block_number();
let block_hash = payload.block_hash();
match self.engine_handle.new_payload(payload).await {
Ok(result) => {
debug!(
target: "flashblocks",
flashblock_count = sequence.count(),
block_number,
%block_hash,
?result,
"Submitted engine_newPayload",
);
if let PayloadStatusEnum::Invalid { validation_error } = result.status {
debug!(
target: "flashblocks",
block_number,
%block_hash,
%validation_error,
"Payload validation error",
);
return Err(());
}
Ok(block_hash)
}
Err(err) => {
error!(
target: "flashblocks",
%err,
block_number,
"Failed to submit new payload",
);
Err(())
}
}
}
/// Submit a forkchoice update to the engine.
async fn submit_forkchoice_update(
&self,
head_block_hash: B256,
sequence: &FlashBlockCompleteSequence,
) {
let block_number = sequence.block_number();
let (safe_hash, finalized_hash) = self.get_safe_and_finalized_block_hash();
let fcu_state = alloy_rpc_types_engine::ForkchoiceState {
head_block_hash,
safe_block_hash: safe_hash,
finalized_block_hash: finalized_hash,
};
match self
.engine_handle
.fork_choice_updated(fcu_state, None, EngineApiMessageVersion::V5)
.await
{
Ok(result) => {
debug!(
target: "flashblocks",
flashblock_count = sequence.count(),
block_number,
%head_block_hash,
%safe_hash,
%finalized_hash,
?result,
"Submitted engine_forkChoiceUpdated",
)
}
Err(err) => {
error!(
target: "flashblocks",
%err,
block_number,
%head_block_hash,
%safe_hash,
%finalized_hash,
"Failed to submit fork choice update",
);
}
}
}
/// Spawn the client to start sending FCUs and new payloads by periodically fetching recent
/// blocks.
pub async fn run(mut self) {
let mut previous_block_hashes = AllocRingBuffer::new(64);
loop {
match self.sequence_receiver.recv().await {
Ok(sequence) => {
let block_hash = sequence.payload_base().parent_hash;
previous_block_hashes.push(block_hash);
let Some(sequence) = self.receive_and_cache_sequence().await else {
continue;
};
if sequence.state_root().is_none() {
warn!(target: "flashblocks", "Missing state root for the complete sequence")
}
let Some(payload) = self.convert_sequence_to_payload(&sequence) else {
continue;
};
// Load previous block hashes. We're using (head - 32) and (head - 64) as the
// safe and finalized block hashes.
let safe_block_hash = self.get_previous_block_hash(&previous_block_hashes, 32);
let finalized_block_hash =
self.get_previous_block_hash(&previous_block_hashes, 64);
let Ok(block_hash) = self.submit_new_payload(payload, &sequence).await else {
continue;
};
let state = alloy_rpc_types_engine::ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash,
finalized_block_hash,
};
// Send FCU
let _ = self
.engine_handle
.fork_choice_updated(state, None, EngineApiMessageVersion::V3)
.await;
}
Err(err) => {
warn!(
target: "flashblocks",
%err,
"error while fetching flashblock completed sequence"
);
break;
}
}
self.submit_forkchoice_update(block_hash, &sequence).await;
}
}
}
impl From<&FlashBlockCompleteSequence> for OpExecutionData {
fn from(sequence: &FlashBlockCompleteSequence) -> Self {
let mut data = Self::from_flashblocks_unchecked(sequence);
// Replace payload's state_root with the calculated one. For flashblocks, there was an
// option to disable state root calculation for blocks, and in that case, the payload's
// state_root will be zero, and we'll need to locally calculate state_root before
// proceeding to call engine_newPayload.
if let Some(execution_outcome) = sequence.execution_outcome() {
let payload = data.payload.as_v1_mut();
payload.state_root = execution_outcome.state_root;
payload.block_hash = execution_outcome.block_hash;
}
data
}
}

View File

@@ -13,18 +13,24 @@ use tracing::{debug, trace, warn};
/// The size of the broadcast channel for completed flashblock sequences.
const FLASHBLOCK_SEQUENCE_CHANNEL_SIZE: usize = 128;
/// Outcome from executing a flashblock sequence.
#[derive(Debug, Clone, Copy)]
pub struct SequenceExecutionOutcome {
/// The block hash of the executed pending block
pub block_hash: B256,
/// Properly computed state root
pub state_root: B256,
}
/// An ordered B-tree keeping the track of a sequence of [`FlashBlock`]s by their indices.
#[derive(Debug)]
pub struct FlashBlockPendingSequence<T> {
/// tracks the individual flashblocks in order
///
/// With a blocktime of 2s and flashblock tick-rate of 200ms plus one extra flashblock per new
/// pending block, we expect 11 flashblocks per slot.
inner: BTreeMap<u64, PreparedFlashBlock<T>>,
/// Broadcasts flashblocks to subscribers.
block_broadcaster: broadcast::Sender<FlashBlockCompleteSequence>,
/// Optional properly computed state root for the current sequence.
state_root: Option<B256>,
/// Optional execution outcome from building the current sequence.
execution_outcome: Option<SequenceExecutionOutcome>,
}
impl<T> FlashBlockPendingSequence<T>
@@ -36,7 +42,7 @@ where
// Note: if the channel is full, send will not block but rather overwrite the oldest
// messages. Order is preserved.
let (tx, _) = broadcast::channel(FLASHBLOCK_SEQUENCE_CHANNEL_SIZE);
Self { inner: BTreeMap::new(), block_broadcaster: tx, state_root: None }
Self { inner: BTreeMap::new(), block_broadcaster: tx, execution_outcome: None }
}
/// Returns the sender half of the [`FlashBlockCompleteSequence`] channel.
@@ -53,13 +59,18 @@ where
// Clears the state and broadcasts the blocks produced to subscribers.
fn clear_and_broadcast_blocks(&mut self) {
if self.inner.is_empty() {
return;
}
let flashblocks = mem::take(&mut self.inner);
let execution_outcome = mem::take(&mut self.execution_outcome);
// If there are any subscribers, send the flashblocks to them.
if self.block_broadcaster.receiver_count() > 0 {
let flashblocks = match FlashBlockCompleteSequence::new(
flashblocks.into_iter().map(|block| block.1.into()).collect(),
self.state_root,
execution_outcome,
) {
Ok(flashblocks) => flashblocks,
Err(err) => {
@@ -106,9 +117,12 @@ where
Ok(())
}
/// Set state root
pub const fn set_state_root(&mut self, state_root: Option<B256>) {
self.state_root = state_root;
/// Set execution outcome from building the flashblock sequence
pub const fn set_execution_outcome(
&mut self,
execution_outcome: Option<SequenceExecutionOutcome>,
) {
self.execution_outcome = execution_outcome;
}
/// Iterator over sequence of executable transactions.
@@ -171,12 +185,12 @@ where
///
/// Ensures invariants of a complete flashblocks sequence.
/// If this entire sequence of flashblocks was executed on top of latest block, this also includes
/// the computed state root.
/// the execution outcome with block hash and state root.
#[derive(Debug, Clone)]
pub struct FlashBlockCompleteSequence {
inner: Vec<FlashBlock>,
/// Optional state root for the current sequence
state_root: Option<B256>,
/// Optional execution outcome from building the flashblock sequence
execution_outcome: Option<SequenceExecutionOutcome>,
}
impl FlashBlockCompleteSequence {
@@ -185,7 +199,10 @@ impl FlashBlockCompleteSequence {
/// * vector is not empty
/// * first flashblock have the base payload
/// * sequence of flashblocks is sound (successive index from 0, same payload id, ...)
pub fn new(blocks: Vec<FlashBlock>, state_root: Option<B256>) -> eyre::Result<Self> {
pub fn new(
blocks: Vec<FlashBlock>,
execution_outcome: Option<SequenceExecutionOutcome>,
) -> eyre::Result<Self> {
let first_block = blocks.first().ok_or_eyre("No flashblocks in sequence")?;
// Ensure that first flashblock have base
@@ -200,7 +217,7 @@ impl FlashBlockCompleteSequence {
bail!("Flashblock inconsistencies detected in sequence");
}
Ok(Self { inner: blocks, state_root })
Ok(Self { inner: blocks, execution_outcome })
}
/// Returns the block number
@@ -223,9 +240,9 @@ impl FlashBlockCompleteSequence {
self.inner.last().unwrap()
}
/// Returns the state root for the current sequence
pub const fn state_root(&self) -> Option<B256> {
self.state_root
/// Returns the execution outcome of the sequence.
pub const fn execution_outcome(&self) -> Option<SequenceExecutionOutcome> {
self.execution_outcome
}
/// Returns all transactions from all flashblocks in the sequence
@@ -247,7 +264,7 @@ impl<T> TryFrom<FlashBlockPendingSequence<T>> for FlashBlockCompleteSequence {
fn try_from(sequence: FlashBlockPendingSequence<T>) -> Result<Self, Self::Error> {
Self::new(
sequence.inner.into_values().map(|block| block.block().clone()).collect::<Vec<_>>(),
sequence.state_root,
sequence.execution_outcome,
)
}
}

View File

@@ -1,5 +1,5 @@
use crate::{
sequence::FlashBlockPendingSequence,
sequence::{FlashBlockPendingSequence, SequenceExecutionOutcome},
worker::{BuildArgs, FlashBlockBuilder},
FlashBlock, FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx, InProgressFlashBlockRx,
PendingFlashBlock,
@@ -30,7 +30,8 @@ use tokio::{
};
use tracing::{debug, trace, warn};
pub(crate) const FB_STATE_ROOT_FROM_INDEX: usize = 9;
/// 200 ms flashblock time.
pub(crate) const FLASHBLOCK_BLOCK_TIME: u64 = 200;
/// The `FlashBlockService` maintains an in-memory [`PendingFlashBlock`] built out of a sequence of
/// [`FlashBlock`]s.
@@ -60,7 +61,7 @@ pub struct FlashBlockService<
in_progress_tx: watch::Sender<Option<FlashBlockBuildInfo>>,
/// `FlashBlock` service's metrics
metrics: FlashBlockServiceMetrics,
/// Enable state root calculation from flashblock with index [`FB_STATE_ROOT_FROM_INDEX`]
/// Enable state root calculation
compute_state_root: bool,
}
@@ -177,10 +178,13 @@ where
return None
};
let Some(latest) = self.builder.provider().latest_header().ok().flatten() else {
trace!(target: "flashblocks", "No latest header found");
return None
};
// attempt an initial consecutive check
if let Some(latest) = self.builder.provider().latest_header().ok().flatten() &&
latest.hash() != base.parent_hash
{
if latest.hash() != base.parent_hash {
trace!(target: "flashblocks", flashblock_parent=?base.parent_hash, flashblock_number=base.block_number, local_latest=?latest.num_hash(), "Skipping non consecutive build attempt");
return None
}
@@ -190,9 +194,39 @@ where
return None
};
// Check if state root must be computed
let compute_state_root =
self.compute_state_root && self.blocks.index() >= Some(FB_STATE_ROOT_FROM_INDEX as u64);
// Auto-detect when to compute state root: only if the builder didn't provide it (sent
// B256::ZERO) and we're near the expected final flashblock index.
//
// Background: Each block period receives multiple flashblocks at regular intervals.
// The sequencer sends an initial "base" flashblock at index 0 when a new block starts,
// then subsequent flashblocks are produced every FLASHBLOCK_BLOCK_TIME intervals (200ms).
//
// Examples with different block times:
// - Base (2s blocks): expect 2000ms / 200ms = 10 intervals → Flashblocks: index 0 (base)
// + indices 1-10 = potentially 11 total
//
// - Unichain (1s blocks): expect 1000ms / 200ms = 5 intervals → Flashblocks: index 0 (base)
// + indices 1-5 = potentially 6 total
//
// Why compute at N-1 instead of N:
// 1. Timing variance in flashblock producing time may mean only N flashblocks were produced
// instead of N+1 (missing the final one). Computing at N-1 ensures we get the state root
// for most common cases.
//
// 2. The +1 case (index 0 base + N intervals): If all N+1 flashblocks do arrive, we'll
// still calculate state root for flashblock N, which sacrifices a little performance but
// still ensures correctness for common cases.
//
// Note: Pathological cases may result in fewer flashblocks than expected (e.g., builder
// downtime, flashblock execution exceeding timing budget). When this occurs, we won't
// compute the state root, causing FlashblockConsensusClient to lack precomputed state for
// engine_newPayload. This is safe: we still have op-node as backstop to maintain
// chain progression.
let block_time_ms = (base.timestamp - latest.timestamp()) * 1000;
let expected_final_flashblock = block_time_ms / FLASHBLOCK_BLOCK_TIME;
let compute_state_root = self.compute_state_root &&
last_flashblock.diff.state_root.is_zero() &&
self.blocks.index() >= Some(expected_final_flashblock.saturating_sub(1));
Some(BuildArgs {
base,
@@ -268,8 +302,15 @@ where
if let Some((now, result)) = result {
match result {
Ok(Some((new_pending, cached_reads))) => {
// update state root of the current sequence
this.blocks.set_state_root(new_pending.computed_state_root());
// update execution outcome of the current sequence
let execution_outcome =
new_pending.computed_state_root().map(|state_root| {
SequenceExecutionOutcome {
block_hash: new_pending.block().hash(),
state_root,
}
});
this.blocks.set_execution_outcome(execution_outcome);
// built a new pending block
this.current = Some(new_pending.clone());

View File

@@ -107,6 +107,7 @@ where
// if the real state root should be computed
let BlockBuilderOutcome { execution_result, block, hashed_state, .. } =
if args.compute_state_root {
trace!(target: "flashblocks", "Computing block state root");
builder.finish(&state_provider)?
} else {
builder.finish(NoopProvider::default())?

View File

@@ -74,6 +74,14 @@ pub struct RollupArgs {
/// block tag will use the pending state based on flashblocks.
#[arg(long)]
pub flashblocks_url: Option<Url>,
/// Enable flashblock consensus client to drive the chain forward
///
/// When enabled, the flashblock consensus client will process flashblock sequences and submit
/// them to the engine API to advance the chain.
/// Requires `flashblocks_url` to be set.
#[arg(long, default_value_t = false, requires = "flashblocks_url")]
pub flashblock_consensus: bool,
}
impl Default for RollupArgs {
@@ -90,6 +98,7 @@ impl Default for RollupArgs {
historical_rpc: None,
min_suggested_priority_fee: 1_000_000,
flashblocks_url: None,
flashblock_consensus: false,
}
}
}

View File

@@ -194,6 +194,7 @@ impl OpNode {
.with_min_suggested_priority_fee(self.args.min_suggested_priority_fee)
.with_historical_rpc(self.args.historical_rpc.clone())
.with_flashblocks(self.args.flashblocks_url.clone())
.with_flashblock_consensus(self.args.flashblock_consensus)
}
/// Instantiates the [`ProviderFactoryBuilder`] for an opstack node.
@@ -695,6 +696,8 @@ pub struct OpAddOnsBuilder<NetworkT, RpcMiddleware = Identity> {
tokio_runtime: Option<tokio::runtime::Handle>,
/// A URL pointing to a secure websocket service that streams out flashblocks.
flashblocks_url: Option<Url>,
/// Enable flashblock consensus client to drive chain forward.
flashblock_consensus: bool,
}
impl<NetworkT> Default for OpAddOnsBuilder<NetworkT> {
@@ -711,6 +714,7 @@ impl<NetworkT> Default for OpAddOnsBuilder<NetworkT> {
rpc_middleware: Identity::new(),
tokio_runtime: None,
flashblocks_url: None,
flashblock_consensus: false,
}
}
}
@@ -779,6 +783,7 @@ impl<NetworkT, RpcMiddleware> OpAddOnsBuilder<NetworkT, RpcMiddleware> {
tokio_runtime,
_nt,
flashblocks_url,
flashblock_consensus,
..
} = self;
OpAddOnsBuilder {
@@ -793,6 +798,7 @@ impl<NetworkT, RpcMiddleware> OpAddOnsBuilder<NetworkT, RpcMiddleware> {
rpc_middleware,
tokio_runtime,
flashblocks_url,
flashblock_consensus,
}
}
@@ -801,6 +807,12 @@ impl<NetworkT, RpcMiddleware> OpAddOnsBuilder<NetworkT, RpcMiddleware> {
self.flashblocks_url = flashblocks_url;
self
}
/// With a flashblock consensus client to drive chain forward.
pub const fn with_flashblock_consensus(mut self, flashblock_consensus: bool) -> Self {
self.flashblock_consensus = flashblock_consensus;
self
}
}
impl<NetworkT, RpcMiddleware> OpAddOnsBuilder<NetworkT, RpcMiddleware> {
@@ -826,6 +838,7 @@ impl<NetworkT, RpcMiddleware> OpAddOnsBuilder<NetworkT, RpcMiddleware> {
rpc_middleware,
tokio_runtime,
flashblocks_url,
flashblock_consensus,
..
} = self;
@@ -835,7 +848,8 @@ impl<NetworkT, RpcMiddleware> OpAddOnsBuilder<NetworkT, RpcMiddleware> {
.with_sequencer(sequencer_url.clone())
.with_sequencer_headers(sequencer_headers.clone())
.with_min_suggested_priority_fee(min_suggested_priority_fee)
.with_flashblocks(flashblocks_url),
.with_flashblocks(flashblocks_url)
.with_flashblock_consensus(flashblock_consensus),
PVB::default(),
EB::default(),
EVB::default(),

View File

@@ -13,7 +13,7 @@
//! components::ComponentsBuilder,
//! hooks::OnComponentInitializedHook,
//! rpc::{EthApiBuilder, EthApiCtx},
//! LaunchContext, NodeConfig, RethFullAdapter,
//! ConsensusEngineHandle, LaunchContext, NodeConfig, RethFullAdapter,
//! };
//! use reth_optimism_chainspec::OP_SEPOLIA;
//! use reth_optimism_evm::OpEvmConfig;
@@ -67,7 +67,14 @@
//! config.cache,
//! node.task_executor().clone(),
//! );
//! let ctx = EthApiCtx { components: node.node_adapter(), config, cache };
//! // Create a dummy beacon engine handle for offline mode
//! let (tx, _) = tokio::sync::mpsc::unbounded_channel();
//! let ctx = EthApiCtx {
//! components: node.node_adapter(),
//! config,
//! cache,
//! engine_handle: ConsensusEngineHandle::new(tx),
//! };
//! let eth_api = OpEthApiBuilder::<Optimism>::default().build_eth_api(ctx).await.unwrap();
//!
//! // build `trace` namespace API

View File

@@ -24,8 +24,9 @@ use reth_evm::ConfigureEvm;
use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes};
use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
use reth_optimism_flashblocks::{
FlashBlockBuildInfo, FlashBlockCompleteSequenceRx, FlashBlockRx, FlashBlockService,
FlashblocksListeners, PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
FlashBlockBuildInfo, FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx,
FlashBlockConsensusClient, FlashBlockRx, FlashBlockService, FlashblocksListeners,
PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
};
use reth_rpc::eth::core::EthApiInner;
use reth_rpc_eth_api::{
@@ -399,6 +400,12 @@ pub struct OpEthApiBuilder<NetworkT = Optimism> {
///
/// [flashblocks]: reth_optimism_flashblocks
flashblocks_url: Option<Url>,
/// Enable flashblock consensus client to drive the chain forward.
///
/// When enabled, flashblock sequences are submitted to the engine API via
/// `newPayload` and `forkchoiceUpdated` calls, advancing the canonical chain state.
/// Requires `flashblocks_url` to be set.
flashblock_consensus: bool,
/// Marker for network types.
_nt: PhantomData<NetworkT>,
}
@@ -410,6 +417,7 @@ impl<NetworkT> Default for OpEthApiBuilder<NetworkT> {
sequencer_headers: Vec::new(),
min_suggested_priority_fee: 1_000_000,
flashblocks_url: None,
flashblock_consensus: false,
_nt: PhantomData,
}
}
@@ -423,6 +431,7 @@ impl<NetworkT> OpEthApiBuilder<NetworkT> {
sequencer_headers: Vec::new(),
min_suggested_priority_fee: 1_000_000,
flashblocks_url: None,
flashblock_consensus: false,
_nt: PhantomData,
}
}
@@ -450,6 +459,12 @@ impl<NetworkT> OpEthApiBuilder<NetworkT> {
self.flashblocks_url = flashblocks_url;
self
}
/// With flashblock consensus client enabled to drive chain forward
pub const fn with_flashblock_consensus(mut self, flashblock_consensus: bool) -> Self {
self.flashblock_consensus = flashblock_consensus;
self
}
}
impl<N, NetworkT> EthApiBuilder<N> for OpEthApiBuilder<NetworkT>
@@ -460,7 +475,15 @@ where
+ From<OpFlashblockPayloadBase>
+ Unpin,
>,
Types: NodeTypes<ChainSpec: Hardforks + EthereumHardforks>,
Types: NodeTypes<
ChainSpec: Hardforks + EthereumHardforks,
Payload: reth_node_api::PayloadTypes<
ExecutionData: for<'a> TryFrom<
&'a FlashBlockCompleteSequence,
Error: std::fmt::Display,
>,
>,
>,
>,
NetworkT: RpcTypes,
OpRpcConvert<N, NetworkT>: RpcConvert<Network = NetworkT>,
@@ -475,6 +498,7 @@ where
sequencer_headers,
min_suggested_priority_fee,
flashblocks_url,
flashblock_consensus,
..
} = self;
let rpc_converter =
@@ -501,14 +525,23 @@ where
ctx.components.evm_config().clone(),
ctx.components.provider().clone(),
ctx.components.task_executor().clone(),
);
)
.compute_state_root(flashblock_consensus); // enable state root calculation if flashblock_consensus if enabled.
let flashblocks_sequence = service.block_sequence_broadcaster().clone();
let received_flashblocks = service.flashblocks_broadcaster().clone();
let in_progress_rx = service.subscribe_in_progress();
ctx.components.task_executor().spawn(Box::pin(service.run(tx)));
if flashblock_consensus {
info!(target: "reth::cli", "Launching FlashBlockConsensusClient");
let flashblock_client = FlashBlockConsensusClient::new(
ctx.engine_handle.clone(),
flashblocks_sequence.subscribe(),
)?;
ctx.components.task_executor().spawn(Box::pin(flashblock_client.run()));
}
Some(FlashblocksListeners::new(
pending_rx,
flashblocks_sequence,

View File

@@ -11,6 +11,7 @@ reth-codecs.workspace = true
reth-network-peers.workspace = true
reth-node-builder.workspace = true
reth-optimism-forks.workspace = true
reth-optimism-flashblocks.workspace = true
reth-db-api.workspace = true
reth-op = { workspace = true, features = ["node", "pool", "rpc"] }
reth-payload-builder.workspace = true

View File

@@ -67,6 +67,15 @@ impl ExecutionPayload for CustomExecutionData {
}
}
impl From<&reth_optimism_flashblocks::FlashBlockCompleteSequence> for CustomExecutionData {
fn from(sequence: &reth_optimism_flashblocks::FlashBlockCompleteSequence) -> Self {
let inner = OpExecutionData::from(sequence);
// Derive extension from sequence data - using gas_used from last flashblock as an example
let extension = sequence.last().diff.gas_used;
Self { inner, extension }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomPayloadAttributes {
#[serde(flatten)]