refactor: simplify --dev setup (#16662)

This commit is contained in:
Arsenii Kulikov
2025-06-05 00:44:50 +04:00
committed by GitHub
parent 6d5b0ef74e
commit cf80ef4d86
7 changed files with 58 additions and 264 deletions

View File

@@ -11,19 +11,12 @@ exclude.workspace = true
[dependencies]
# reth
reth-chainspec.workspace = true
reth-consensus.workspace = true
reth-engine-primitives.workspace = true
reth-engine-service.workspace = true
reth-engine-tree.workspace = true
reth-node-types.workspace = true
reth-evm.workspace = true
reth-ethereum-engine-primitives.workspace = true
reth-payload-builder.workspace = true
reth-payload-primitives.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true
reth-transaction-pool.workspace = true
reth-stages-api.workspace = true
# alloy
alloy-consensus.workspace = true
@@ -50,5 +43,4 @@ op = [
"dep:op-alloy-rpc-types-engine",
"dep:reth-optimism-chainspec",
"reth-payload-primitives/op",
"reth-evm/op",
]

View File

@@ -10,8 +10,6 @@
pub mod miner;
pub mod payload;
pub mod service;
pub use miner::MiningMode;
pub use miner::{LocalMiner, MiningMode};
pub use payload::LocalPayloadAttributesBuilder;
pub use service::LocalEngineService;

View File

@@ -5,7 +5,7 @@ use alloy_primitives::{TxHash, B256};
use alloy_rpc_types_engine::ForkchoiceState;
use eyre::OptionExt;
use futures_util::{stream::Fuse, StreamExt};
use reth_engine_primitives::BeaconEngineMessage;
use reth_engine_primitives::BeaconConsensusEngineHandle;
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
BuiltPayload, EngineApiMessageVersion, PayloadAttributesBuilder, PayloadKind, PayloadTypes,
@@ -18,10 +18,7 @@ use std::{
task::{Context, Poll},
time::{Duration, UNIX_EPOCH},
};
use tokio::{
sync::{mpsc::UnboundedSender, oneshot},
time::Interval,
};
use tokio::time::Interval;
use tokio_stream::wrappers::ReceiverStream;
use tracing::error;
@@ -78,7 +75,7 @@ pub struct LocalMiner<T: PayloadTypes, B> {
/// The payload attribute builder for the engine
payload_attributes_builder: B,
/// Sender for events to engine.
to_engine: UnboundedSender<BeaconEngineMessage<T>>,
to_engine: BeaconConsensusEngineHandle<T>,
/// The mining mode for the engine
mode: MiningMode,
/// The payload builder for the engine
@@ -95,31 +92,28 @@ where
B: PayloadAttributesBuilder<<T as PayloadTypes>::PayloadAttributes>,
{
/// Spawns a new [`LocalMiner`] with the given parameters.
pub fn spawn_new(
pub fn new(
provider: impl BlockReader,
payload_attributes_builder: B,
to_engine: UnboundedSender<BeaconEngineMessage<T>>,
to_engine: BeaconConsensusEngineHandle<T>,
mode: MiningMode,
payload_builder: PayloadBuilderHandle<T>,
) {
) -> Self {
let latest_header =
provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
let miner = Self {
Self {
payload_attributes_builder,
to_engine,
mode,
payload_builder,
last_timestamp: latest_header.timestamp(),
last_block_hashes: vec![latest_header.hash()],
};
// Spawn the miner
tokio::spawn(miner.run());
}
}
/// Runs the [`LocalMiner`] in a loop, polling the miner and building payloads.
async fn run(mut self) {
pub async fn run(mut self) {
let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
loop {
tokio::select! {
@@ -156,16 +150,12 @@ where
/// Sends a FCU to the engine.
async fn update_forkchoice_state(&self) -> eyre::Result<()> {
let (tx, rx) = oneshot::channel();
self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state: self.forkchoice_state(),
payload_attrs: None,
tx,
version: EngineApiMessageVersion::default(),
})?;
let res = self
.to_engine
.fork_choice_updated(self.forkchoice_state(), None, EngineApiMessageVersion::default())
.await?;
let res = rx.await??;
if !res.forkchoice_status().is_valid() {
if !res.is_valid() {
eyre::bail!("Invalid fork choice update")
}
@@ -183,16 +173,16 @@ where
.as_secs(),
);
let (tx, rx) = oneshot::channel();
self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state: self.forkchoice_state(),
payload_attrs: Some(self.payload_attributes_builder.build(timestamp)),
tx,
version: EngineApiMessageVersion::default(),
})?;
let res = self
.to_engine
.fork_choice_updated(
self.forkchoice_state(),
Some(self.payload_attributes_builder.build(timestamp)),
EngineApiMessageVersion::default(),
)
.await?;
let res = rx.await??.await?;
if !res.payload_status.is_valid() {
if !res.is_valid() {
eyre::bail!("Invalid payload status")
}
@@ -206,11 +196,8 @@ where
let block = payload.block();
let (tx, rx) = oneshot::channel();
let payload = T::block_to_payload(payload.block().clone());
self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx })?;
let res = rx.await??;
let res = self.to_engine.new_payload(payload).await?;
if !res.is_valid() {
eyre::bail!("Invalid payload")

View File

@@ -1,5 +1,5 @@
//! The implementation of the [`PayloadAttributesBuilder`] for the
//! [`LocalEngineService`](super::service::LocalEngineService).
//! [`LocalMiner`](super::LocalMiner).
use alloy_primitives::{Address, B256};
use reth_chainspec::EthereumHardforks;

View File

@@ -1,163 +0,0 @@
//! Provides a local dev service engine that can be used to run a dev chain.
//!
//! [`LocalEngineService`] polls the payload builder based on a mining mode
//! which can be set to `Instant` or `Interval`. The `Instant` mode will
//! constantly poll the payload builder and initiate block building
//! with a single transaction. The `Interval` mode will initiate block
//! building at a fixed interval.
use core::fmt;
use std::{
fmt::{Debug, Formatter},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use crate::miner::{LocalMiner, MiningMode};
use futures_util::{Stream, StreamExt};
use reth_chainspec::EthChainSpec;
use reth_consensus::{ConsensusError, FullConsensus};
use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconEngineMessage, EngineValidator};
use reth_engine_service::service::EngineMessageStream;
use reth_engine_tree::{
chain::{ChainEvent, HandlerEvent},
engine::{
EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineRequestHandler, FromEngine,
RequestHandlerEvent,
},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
};
use reth_evm::ConfigureEvm;
use reth_node_types::BlockTy;
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{PayloadAttributesBuilder, PayloadTypes};
use reth_provider::{
providers::{BlockchainProvider, ProviderNodeTypes},
ChainSpecProvider, ProviderFactory,
};
use reth_prune::PrunerWithFactory;
use reth_stages_api::MetricEventsSender;
use tokio::sync::mpsc::UnboundedSender;
use tracing::error;
/// Provides a local dev service engine that can be used to drive the
/// chain forward.
///
/// This service both produces and consumes [`BeaconEngineMessage`]s. This is done to allow
/// modifications of the stream
pub struct LocalEngineService<N>
where
N: ProviderNodeTypes,
{
/// Processes requests.
///
/// This type is responsible for processing incoming requests.
handler: EngineApiRequestHandler<EngineApiRequest<N::Payload, N::Primitives>, N::Primitives>,
/// Receiver for incoming requests (from the engine API endpoint) that need to be processed.
incoming_requests: EngineMessageStream<N::Payload>,
}
impl<N> LocalEngineService<N>
where
N: ProviderNodeTypes,
{
/// Constructor for [`LocalEngineService`].
#[expect(clippy::too_many_arguments)]
pub fn new<B, V, C>(
consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
provider: ProviderFactory<N>,
blockchain_db: BlockchainProvider<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
payload_builder: PayloadBuilderHandle<N::Payload>,
payload_validator: V,
tree_config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook<N::Primitives>>,
sync_metrics_tx: MetricEventsSender,
to_engine: UnboundedSender<BeaconEngineMessage<N::Payload>>,
from_engine: EngineMessageStream<N::Payload>,
mode: MiningMode,
payload_attributes_builder: B,
evm_config: C,
) -> Self
where
B: PayloadAttributesBuilder<<N::Payload as PayloadTypes>::PayloadAttributes>,
V: EngineValidator<N::Payload, Block = BlockTy<N>>,
C: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
let chain_spec = provider.chain_spec();
let engine_kind =
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
let persistence_handle =
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
let (to_tree_tx, from_tree) = EngineApiTreeHandler::<N::Primitives, _, _, _, _>::spawn_new(
blockchain_db.clone(),
consensus,
payload_validator,
persistence_handle,
payload_builder.clone(),
canonical_in_memory_state,
tree_config,
invalid_block_hook,
engine_kind,
evm_config,
);
let handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
LocalMiner::spawn_new(
blockchain_db,
payload_attributes_builder,
to_engine,
mode,
payload_builder,
);
Self { handler, incoming_requests: from_engine }
}
}
impl<N> Stream for LocalEngineService<N>
where
N: ProviderNodeTypes,
{
type Item = ChainEvent<BeaconConsensusEngineEvent<N::Primitives>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Poll::Ready(ev) = this.handler.poll(cx) {
return match ev {
RequestHandlerEvent::HandlerEvent(ev) => match ev {
HandlerEvent::BackfillAction(_) => {
error!(target: "engine::local", "received backfill request in local engine");
Poll::Ready(Some(ChainEvent::FatalError))
}
HandlerEvent::Event(ev) => Poll::Ready(Some(ChainEvent::Handler(ev))),
HandlerEvent::FatalError => Poll::Ready(Some(ChainEvent::FatalError)),
},
RequestHandlerEvent::Download(_) => {
error!(target: "engine::local", "received download request in local engine");
Poll::Ready(Some(ChainEvent::FatalError))
}
}
}
// forward incoming requests to the handler
while let Poll::Ready(Some(req)) = this.incoming_requests.poll_next_unpin(cx) {
this.handler.on_event(FromEngine::Request(req.into()));
}
Poll::Pending
}
}
impl<N: ProviderNodeTypes> Debug for LocalEngineService<N> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("LocalEngineService").finish_non_exhaustive()
}
}

View File

@@ -4,7 +4,7 @@ use alloy_consensus::BlockHeader;
use futures::{future::Either, stream, stream_select, StreamExt};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_db_api::{database_metrics::DatabaseMetrics, Database};
use reth_engine_local::{LocalEngineService, LocalPayloadAttributesBuilder};
use reth_engine_local::{LocalMiner, LocalPayloadAttributesBuilder};
use reth_engine_service::service::{ChainEvent, EngineService};
use reth_engine_tree::{
engine::{EngineApiRequest, EngineRequestHandler},
@@ -212,46 +212,37 @@ where
// during this run.
.maybe_store_messages(node_config.debug.engine_api_store.clone());
let mut engine_service = if ctx.is_dev() {
let eth_service = LocalEngineService::new(
consensus.clone(),
ctx.provider_factory().clone(),
ctx.blockchain_db().clone(),
pruner,
ctx.components().payload_builder_handle().clone(),
engine_payload_validator,
engine_tree_config,
ctx.invalid_block_hook()?,
ctx.sync_metrics_tx(),
consensus_engine_tx.clone(),
Box::pin(consensus_engine_stream),
ctx.dev_mining_mode(ctx.components().pool()),
LocalPayloadAttributesBuilder::new(ctx.chain_spec()),
ctx.components().evm_config().clone(),
);
let mut engine_service = EngineService::new(
consensus.clone(),
ctx.chain_spec(),
network_client.clone(),
Box::pin(consensus_engine_stream),
pipeline,
Box::new(ctx.task_executor().clone()),
ctx.provider_factory().clone(),
ctx.blockchain_db().clone(),
pruner,
ctx.components().payload_builder_handle().clone(),
engine_payload_validator,
engine_tree_config,
ctx.invalid_block_hook()?,
ctx.sync_metrics_tx(),
ctx.components().evm_config().clone(),
);
Either::Left(eth_service)
} else {
let eth_service = EngineService::new(
consensus.clone(),
ctx.chain_spec(),
network_client.clone(),
Box::pin(consensus_engine_stream),
pipeline,
Box::new(ctx.task_executor().clone()),
ctx.provider_factory().clone(),
ctx.blockchain_db().clone(),
pruner,
ctx.components().payload_builder_handle().clone(),
engine_payload_validator,
engine_tree_config,
ctx.invalid_block_hook()?,
ctx.sync_metrics_tx(),
ctx.components().evm_config().clone(),
if ctx.is_dev() {
ctx.task_executor().spawn_critical(
"local engine",
LocalMiner::new(
ctx.blockchain_db().clone(),
LocalPayloadAttributesBuilder::new(ctx.chain_spec()),
beacon_engine_handle.clone(),
ctx.dev_mining_mode(ctx.components().pool()),
ctx.components().payload_builder_handle().clone(),
)
.run(),
);
Either::Right(eth_service)
};
}
info!(target: "reth::cli", "Consensus engine initialized");
@@ -301,9 +292,7 @@ where
ctx.task_executor().spawn_critical("consensus engine", async move {
if let Some(initial_target) = initial_target {
debug!(target: "reth::cli", %initial_target, "start backfill sync");
if let Either::Right(eth_service) = &mut engine_service {
eth_service.orchestrator_mut().start_backfill_sync(initial_target);
}
engine_service.orchestrator_mut().start_backfill_sync(initial_target);
}
let mut res = Ok(());
@@ -314,9 +303,7 @@ where
payload = built_payloads.select_next_some() => {
if let Some(executed_block) = payload.executed_block() {
debug!(target: "reth::cli", block=?executed_block.recovered_block().num_hash(), "inserting built payload");
if let Either::Right(eth_service) = &mut engine_service {
eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
}
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
}
}
event = engine_service.next() => {