refactor(optimism): Extract pending block building responsibility out of FlashBlockService (#18247)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Roman Hodulák
2025-09-08 11:15:59 +02:00
committed by GitHub
parent a14f345c27
commit 4f930c25c4
6 changed files with 288 additions and 152 deletions

1
Cargo.lock generated
View File

@@ -9304,6 +9304,7 @@ dependencies = [
"reth-revm",
"reth-rpc-eth-types",
"reth-storage-api",
"reth-tasks",
"serde",
"serde_json",
"test-case",

View File

@@ -22,6 +22,7 @@ reth-revm.workspace = true
reth-rpc-eth-types.workspace = true
reth-errors.workspace = true
reth-storage-api.workspace = true
reth-tasks.workspace = true
# alloy
alloy-eips = { workspace = true, features = ["serde"] }

View File

@@ -10,6 +10,7 @@ pub use ws::{WsConnect, WsFlashBlockStream};
mod payload;
mod sequence;
mod service;
mod worker;
mod ws;
/// Receiver of the most recent [`PendingBlock`] built out of [`FlashBlock`]s.

View File

@@ -1,27 +1,26 @@
use crate::{sequence::FlashBlockSequence, ExecutionPayloadBaseV1, FlashBlock};
use alloy_eips::BlockNumberOrTag;
use crate::{
sequence::FlashBlockSequence,
worker::{BuildArgs, FlashBlockBuilder},
ExecutionPayloadBaseV1, FlashBlock,
};
use alloy_eips::eip2718::WithEncoded;
use alloy_primitives::B256;
use futures_util::{FutureExt, Stream, StreamExt};
use reth_chain_state::{
CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions, ExecutedBlock,
use reth_chain_state::{CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions};
use reth_evm::ConfigureEvm;
use reth_primitives_traits::{
AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, Recovered,
};
use reth_errors::RethError;
use reth_evm::{
execute::{BlockBuilder, BlockBuilderOutcome},
ConfigureEvm,
};
use reth_execution_types::ExecutionOutcome;
use reth_primitives_traits::{AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy};
use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State};
use reth_rpc_eth_types::{EthApiError, PendingBlock};
use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory};
use reth_revm::cached::CachedReads;
use reth_rpc_eth_types::PendingBlock;
use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
use reth_tasks::TaskExecutor;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
task::{ready, Context, Poll},
time::Instant,
};
use tokio::pin;
use tokio::{pin, sync::oneshot};
use tracing::{debug, trace, warn};
/// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of
@@ -37,9 +36,10 @@ pub struct FlashBlockService<
current: Option<PendingBlock<N>>,
blocks: FlashBlockSequence<N::SignedTx>,
rebuild: bool,
evm_config: EvmConfig,
provider: Provider,
builder: FlashBlockBuilder<EvmConfig, Provider>,
canon_receiver: CanonStateNotifications<N>,
spawner: TaskExecutor,
job: Option<BuildJob<N>>,
/// Cached state reads for the current block.
/// Current `PendingBlock` is built out of a sequence of `FlashBlocks`, and executed again when
/// fb received on top of the same block. Avoid redundant I/O across multiple executions
@@ -50,8 +50,10 @@ pub struct FlashBlockService<
impl<N, S, EvmConfig, Provider> FlashBlockService<N, S, EvmConfig, Provider>
where
N: NodePrimitives,
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin,
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin + 'static,
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>
+ Clone
+ 'static,
Provider: StateProviderFactory
+ CanonStateSubscriptions<Primitives = N>
+ BlockReaderIdExt<
@@ -59,19 +61,22 @@ where
Block = BlockTy<N>,
Transaction = N::SignedTx,
Receipt = ReceiptTy<N>,
> + Unpin,
> + Unpin
+ Clone
+ 'static,
{
/// Constructs a new `FlashBlockService` that receives [`FlashBlock`]s from `rx` stream.
pub fn new(rx: S, evm_config: EvmConfig, provider: Provider) -> Self {
pub fn new(rx: S, evm_config: EvmConfig, provider: Provider, spawner: TaskExecutor) -> Self {
Self {
rx,
current: None,
blocks: FlashBlockSequence::new(),
evm_config,
canon_receiver: provider.subscribe_to_canonical_state(),
provider,
cached_state: None,
builder: FlashBlockBuilder::new(evm_config, provider),
rebuild: false,
spawner,
job: None,
cached_state: None,
}
}
@@ -88,86 +93,35 @@ where
warn!("Flashblock service has stopped");
}
/// Returns the cached reads at the given head hash.
/// Returns the [`BuildArgs`] made purely out of [`FlashBlock`]s that were received earlier.
///
/// Returns a new cache instance if this is new `head` hash.
fn cached_reads(&mut self, head: B256) -> CachedReads {
if let Some((tracked, cache)) = self.cached_state.take() {
if tracked == head {
return cache
/// Returns `None` if the flashblock have no `base` or the base is not a child block of latest.
fn build_args(
&mut self,
) -> Option<BuildArgs<impl IntoIterator<Item = WithEncoded<Recovered<N::SignedTx>>>>> {
let Some(base) = self.blocks.payload_base() else {
trace!(
flashblock_number = ?self.blocks.block_number(),
count = %self.blocks.count(),
"Missing flashblock payload base"
);
return None
};
// attempt an initial consecutive check
if let Some(latest) = self.builder.provider().latest_header().ok().flatten() {
if latest.hash() != base.parent_hash {
trace!(flashblock_parent=?base.parent_hash, flashblock_number=base.block_number, local_latest=?latest.num_hash(), "Skipping non consecutive build attempt");
return None;
}
}
// instantiate a new cache instance
CachedReads::default()
}
/// Updates the cached reads at the given head hash
fn update_cached_reads(&mut self, head: B256, cached_reads: CachedReads) {
self.cached_state = Some((head, cached_reads));
}
/// Returns the [`ExecutedBlock`] made purely out of [`FlashBlock`]s that were received earlier.
///
/// Returns None if the flashblock doesn't attach to the latest header.
fn execute(&mut self) -> eyre::Result<Option<PendingBlock<N>>> {
trace!("Attempting new flashblock");
let latest = self
.provider
.latest_header()?
.ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
let latest_hash = latest.hash();
let Some(attrs) = self.blocks.payload_base() else {
trace!(flashblock_number = ?self.blocks.block_number(), count = %self.blocks.count(), "Missing flashblock payload base");
return Ok(None)
};
if attrs.parent_hash != latest_hash {
trace!(flashblock_parent = ?attrs.parent_hash, local_latest=?latest.num_hash(),"Skipping non consecutive flashblock");
// doesn't attach to the latest block
return Ok(None)
}
let state_provider = self.provider.history_by_block_hash(latest.hash())?;
let mut request_cache = self.cached_reads(latest_hash);
let cached_db = request_cache.as_db_mut(StateProviderDatabase::new(&state_provider));
let mut state = State::builder().with_database(cached_db).with_bundle_update().build();
let mut builder = self
.evm_config
.builder_for_next_block(&mut state, &latest, attrs.into())
.map_err(RethError::other)?;
builder.apply_pre_execution_changes()?;
for tx in self.blocks.ready_transactions() {
let _gas_used = builder.execute_transaction(tx)?;
}
let BlockBuilderOutcome { execution_result, block, hashed_state, .. } =
builder.finish(NoopProvider::default())?;
let execution_outcome = ExecutionOutcome::new(
state.take_bundle(),
vec![execution_result.receipts],
block.number(),
vec![execution_result.requests],
);
// update cached reads
self.update_cached_reads(latest_hash, request_cache);
Ok(Some(PendingBlock::with_executed_block(
Instant::now() + Duration::from_secs(1),
ExecutedBlock {
recovered_block: block.into(),
execution_output: Arc::new(execution_outcome),
hashed_state: Arc::new(hashed_state),
},
)))
Some(BuildArgs {
base,
transactions: self.blocks.ready_transactions().collect::<Vec<_>>(),
cached_state: self.cached_state.take(),
})
}
/// Takes out `current` [`PendingBlock`] if `state` is not preceding it.
@@ -180,8 +134,10 @@ where
impl<N, S, EvmConfig, Provider> Stream for FlashBlockService<N, S, EvmConfig, Provider>
where
N: NodePrimitives,
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin,
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin + 'static,
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>
+ Clone
+ 'static,
Provider: StateProviderFactory
+ CanonStateSubscriptions<Primitives = N>
+ BlockReaderIdExt<
@@ -189,63 +145,108 @@ where
Block = BlockTy<N>,
Transaction = N::SignedTx,
Receipt = ReceiptTy<N>,
> + Unpin,
> + Unpin
+ Clone
+ 'static,
{
type Item = eyre::Result<Option<PendingBlock<N>>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// consume new flashblocks while they're ready
while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) {
match result {
Ok(flashblock) => match this.blocks.insert(flashblock) {
Ok(_) => this.rebuild = true,
Err(err) => debug!(%err, "Failed to prepare flashblock"),
},
Err(err) => return Poll::Ready(Some(Err(err))),
loop {
// drive pending build job to completion
let result = match this.job.as_mut() {
Some((now, rx)) => {
let result = ready!(rx.poll_unpin(cx));
result.ok().map(|res| (*now, res))
}
None => None,
};
// reset job
this.job.take();
if let Some((now, result)) = result {
match result {
Ok(Some((new_pending, cached_reads))) => {
// built a new pending block
this.current = Some(new_pending.clone());
// cache reads
this.cached_state = Some((new_pending.parent_hash(), cached_reads));
this.rebuild = false;
trace!(
parent_hash = %new_pending.block().parent_hash(),
block_number = new_pending.block().number(),
flash_blocks = this.blocks.count(),
elapsed = ?now.elapsed(),
"Built new block with flashblocks"
);
return Poll::Ready(Some(Ok(Some(new_pending))));
}
Ok(None) => {
// nothing to do because tracked flashblock doesn't attach to latest
}
Err(err) => {
// we can ignore this error
debug!(%err, "failed to execute flashblock");
}
}
}
}
if let Poll::Ready(Ok(state)) = {
let fut = this.canon_receiver.recv();
pin!(fut);
fut.poll_unpin(cx)
} {
if let Some(current) = this.on_new_tip(state) {
trace!(
parent_hash = %current.block().parent_hash(),
block_number = current.block().number(),
"Clearing current flashblock on new canonical block"
);
return Poll::Ready(Some(Ok(None)))
// consume new flashblocks while they're ready
while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) {
match result {
Ok(flashblock) => match this.blocks.insert(flashblock) {
Ok(_) => this.rebuild = true,
Err(err) => debug!(%err, "Failed to prepare flashblock"),
},
Err(err) => return Poll::Ready(Some(Err(err))),
}
}
// update on new head block
if let Poll::Ready(Ok(state)) = {
let fut = this.canon_receiver.recv();
pin!(fut);
fut.poll_unpin(cx)
} {
if let Some(current) = this.on_new_tip(state) {
trace!(
parent_hash = %current.block().parent_hash(),
block_number = current.block().number(),
"Clearing current flashblock on new canonical block"
);
return Poll::Ready(Some(Ok(None)))
}
}
if !this.rebuild && this.current.is_some() {
return Poll::Pending
}
// try to build a block on top of latest
if let Some(args) = this.build_args() {
let now = Instant::now();
let (tx, rx) = oneshot::channel();
let builder = this.builder.clone();
this.spawner.spawn_blocking(async move {
let _ = tx.send(builder.execute(args));
});
this.job.replace((now, rx));
// continue and poll the spawned job
continue
}
}
if !this.rebuild && this.current.is_some() {
return Poll::Pending
}
let now = Instant::now();
// try to build a block on top of latest
match this.execute() {
Ok(Some(new_pending)) => {
// built a new pending block
this.current = Some(new_pending.clone());
this.rebuild = false;
trace!(parent_hash=%new_pending.block().parent_hash(), block_number=new_pending.block().number(), flash_blocks=this.blocks.count(), elapsed=?now.elapsed(), "Built new block with flashblocks");
return Poll::Ready(Some(Ok(Some(new_pending))));
}
Ok(None) => {
// nothing to do because tracked flashblock doesn't attach to latest
}
Err(err) => {
// we can ignore this error
debug!(%err, "failed to execute flashblock");
}
}
Poll::Pending
}
}
type BuildJob<N> =
(Instant, oneshot::Receiver<eyre::Result<Option<(PendingBlock<N>, CachedReads)>>>);

View File

@@ -0,0 +1,131 @@
use crate::ExecutionPayloadBaseV1;
use alloy_eips::{eip2718::WithEncoded, BlockNumberOrTag};
use alloy_primitives::B256;
use reth_chain_state::{CanonStateSubscriptions, ExecutedBlock};
use reth_errors::RethError;
use reth_evm::{
execute::{BlockBuilder, BlockBuilderOutcome},
ConfigureEvm,
};
use reth_execution_types::ExecutionOutcome;
use reth_primitives_traits::{
AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, Recovered,
};
use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State};
use reth_rpc_eth_types::{EthApiError, PendingBlock};
use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use tracing::trace;
/// The `FlashBlockBuilder` builds [`PendingBlock`] out of a sequence of transactions.
#[derive(Debug)]
pub(crate) struct FlashBlockBuilder<EvmConfig, Provider> {
evm_config: EvmConfig,
provider: Provider,
}
impl<EvmConfig, Provider> FlashBlockBuilder<EvmConfig, Provider> {
pub(crate) const fn new(evm_config: EvmConfig, provider: Provider) -> Self {
Self { evm_config, provider }
}
pub(crate) const fn provider(&self) -> &Provider {
&self.provider
}
}
pub(crate) struct BuildArgs<I> {
pub base: ExecutionPayloadBaseV1,
pub transactions: I,
pub cached_state: Option<(B256, CachedReads)>,
}
impl<N, EvmConfig, Provider> FlashBlockBuilder<EvmConfig, Provider>
where
N: NodePrimitives,
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
Provider: StateProviderFactory
+ CanonStateSubscriptions<Primitives = N>
+ BlockReaderIdExt<
Header = HeaderTy<N>,
Block = BlockTy<N>,
Transaction = N::SignedTx,
Receipt = ReceiptTy<N>,
> + Unpin,
{
/// Returns the [`PendingBlock`] made purely out of transactions and [`ExecutionPayloadBaseV1`]
/// in `args`.
///
/// Returns `None` if the flashblock doesn't attach to the latest header.
pub(crate) fn execute<I: IntoIterator<Item = WithEncoded<Recovered<N::SignedTx>>>>(
&self,
mut args: BuildArgs<I>,
) -> eyre::Result<Option<(PendingBlock<N>, CachedReads)>> {
trace!("Attempting new pending block from flashblocks");
let latest = self
.provider
.latest_header()?
.ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
let latest_hash = latest.hash();
if args.base.parent_hash != latest_hash {
trace!(flashblock_parent = ?args.base.parent_hash, local_latest=?latest.num_hash(),"Skipping non consecutive flashblock");
// doesn't attach to the latest block
return Ok(None)
}
let state_provider = self.provider.history_by_block_hash(latest.hash())?;
let mut request_cache = args
.cached_state
.take()
.filter(|(hash, _)| hash == &latest_hash)
.map(|(_, state)| state)
.unwrap_or_default();
let cached_db = request_cache.as_db_mut(StateProviderDatabase::new(&state_provider));
let mut state = State::builder().with_database(cached_db).with_bundle_update().build();
let mut builder = self
.evm_config
.builder_for_next_block(&mut state, &latest, args.base.into())
.map_err(RethError::other)?;
builder.apply_pre_execution_changes()?;
for tx in args.transactions {
let _gas_used = builder.execute_transaction(tx)?;
}
let BlockBuilderOutcome { execution_result, block, hashed_state, .. } =
builder.finish(NoopProvider::default())?;
let execution_outcome = ExecutionOutcome::new(
state.take_bundle(),
vec![execution_result.receipts],
block.number(),
vec![execution_result.requests],
);
Ok(Some((
PendingBlock::with_executed_block(
Instant::now() + Duration::from_secs(1),
ExecutedBlock {
recovered_block: block.into(),
execution_output: Arc::new(execution_outcome),
hashed_state: Arc::new(hashed_state),
},
),
request_cache,
)))
}
}
impl<EvmConfig: Clone, Provider: Clone> Clone for FlashBlockBuilder<EvmConfig, Provider> {
fn clone(&self) -> Self {
Self { evm_config: self.evm_config.clone(), provider: self.provider.clone() }
}
}

View File

@@ -467,8 +467,9 @@ where
stream,
ctx.components.evm_config().clone(),
ctx.components.provider().clone(),
ctx.components.task_executor().clone(),
);
ctx.components.task_executor().spawn_blocking(Box::pin(service.run(tx)));
ctx.components.task_executor().spawn(Box::pin(service.run(tx)));
Some(rx)
} else {
None