diff --git a/Cargo.lock b/Cargo.lock index 4fddc42997..2e851bc647 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8322,36 +8322,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "reth-engine-service" -version = "1.11.0" -dependencies = [ - "alloy-eips", - "futures", - "pin-project", - "reth-chainspec", - "reth-consensus", - "reth-engine-primitives", - "reth-engine-tree", - "reth-ethereum-consensus", - "reth-ethereum-engine-primitives", - "reth-evm", - "reth-evm-ethereum", - "reth-exex-types", - "reth-network-p2p", - "reth-node-ethereum", - "reth-node-types", - "reth-payload-builder", - "reth-primitives-traits", - "reth-provider", - "reth-prune", - "reth-stages-api", - "reth-tasks", - "reth-trie-db", - "tokio", - "tokio-stream", -] - [[package]] name = "reth-engine-tree" version = "1.11.0" @@ -9270,7 +9240,6 @@ dependencies = [ "reth-downloaders", "reth-engine-local", "reth-engine-primitives", - "reth-engine-service", "reth-engine-tree", "reth-engine-util", "reth-ethereum-engine-primitives", diff --git a/Cargo.toml b/Cargo.toml index 750ef1756b..a735e713f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ members = [ "crates/engine/invalid-block-hooks/", "crates/engine/local", "crates/engine/primitives/", - "crates/engine/service", "crates/engine/tree/", "crates/engine/util/", "crates/era", @@ -349,7 +348,6 @@ reth-ecies = { path = "crates/net/ecies" } reth-engine-local = { path = "crates/engine/local" } reth-engine-primitives = { path = "crates/engine/primitives", default-features = false } reth-engine-tree = { path = "crates/engine/tree" } -reth-engine-service = { path = "crates/engine/service" } reth-engine-util = { path = "crates/engine/util" } reth-era = { path = "crates/era" } reth-era-downloader = { path = "crates/era-downloader" } diff --git a/crates/engine/service/Cargo.toml b/crates/engine/service/Cargo.toml deleted file mode 100644 index 8c866e865e..0000000000 --- a/crates/engine/service/Cargo.toml +++ /dev/null @@ -1,47 +0,0 @@ -[package] -name = "reth-engine-service" -version.workspace = true -edition.workspace = true -rust-version.workspace = true -license.workspace = true -homepage.workspace = true -repository.workspace = true - -[lints] -workspace = true - -[dependencies] -# reth -reth-consensus.workspace = true -reth-engine-tree.workspace = true -reth-evm.workspace = true -reth-network-p2p.workspace = true -reth-payload-builder.workspace = true -reth-provider.workspace = true -reth-prune.workspace = true -reth-stages-api.workspace = true -reth-tasks.workspace = true -reth-node-types.workspace = true -reth-chainspec.workspace = true -reth-engine-primitives.workspace = true -reth-trie-db.workspace = true - -# async -futures.workspace = true -pin-project.workspace = true - -# misc - -[dev-dependencies] -reth-engine-tree = { workspace = true, features = ["test-utils"] } -reth-ethereum-consensus.workspace = true -reth-ethereum-engine-primitives.workspace = true -reth-evm-ethereum.workspace = true -reth-exex-types.workspace = true -reth-primitives-traits.workspace = true -reth-node-ethereum.workspace = true -reth-trie-db.workspace = true - -alloy-eips.workspace = true -tokio = { workspace = true, features = ["sync"] } -tokio-stream.workspace = true diff --git a/crates/engine/service/src/lib.rs b/crates/engine/service/src/lib.rs deleted file mode 100644 index cd61b0354e..0000000000 --- a/crates/engine/service/src/lib.rs +++ /dev/null @@ -1,12 +0,0 @@ -//! Engine service implementation. - -#![doc( - html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", - html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", - issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" -)] -#![cfg_attr(docsrs, feature(doc_cfg))] -#![cfg_attr(not(test), warn(unused_crate_dependencies))] - -/// Engine Service -pub mod service; diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs deleted file mode 100644 index 06f88479d3..0000000000 --- a/crates/engine/service/src/service.rs +++ /dev/null @@ -1,229 +0,0 @@ -use futures::{Stream, StreamExt}; -use pin_project::pin_project; -use reth_chainspec::EthChainSpec; -use reth_consensus::FullConsensus; -use reth_engine_primitives::{BeaconEngineMessage, ConsensusEngineEvent}; -use reth_engine_tree::{ - backfill::PipelineSync, - download::BasicBlockDownloader, - engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler}, - persistence::PersistenceHandle, - tree::{EngineApiTreeHandler, EngineValidator, TreeConfig}, -}; -pub use reth_engine_tree::{ - chain::{ChainEvent, ChainOrchestrator}, - engine::EngineApiEvent, -}; -use reth_evm::ConfigureEvm; -use reth_network_p2p::BlockClient; -use reth_node_types::{BlockTy, NodeTypes}; -use reth_payload_builder::PayloadBuilderHandle; -use reth_provider::{ - providers::{BlockchainProvider, ProviderNodeTypes}, - ProviderFactory, StorageSettingsCache, -}; -use reth_prune::PrunerWithFactory; -use reth_stages_api::{MetricEventsSender, Pipeline}; -use reth_tasks::TaskSpawner; -use reth_trie_db::ChangesetCache; -use std::{ - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -/// Alias for consensus engine stream. -pub type EngineMessageStream = Pin> + Send + Sync>>; - -/// Alias for chain orchestrator. -type EngineServiceType = ChainOrchestrator< - EngineHandler< - EngineApiRequestHandler< - EngineApiRequest<::Payload, ::Primitives>, - ::Primitives, - >, - EngineMessageStream<::Payload>, - BasicBlockDownloader>, - >, - PipelineSync, ->; - -/// The type that drives the chain forward and communicates progress. -#[pin_project] -#[expect(missing_debug_implementations)] -// TODO(mattsse): remove hidden once fixed : -// otherwise rustdoc fails to resolve the alias -#[doc(hidden)] -pub struct EngineService -where - N: ProviderNodeTypes, - Client: BlockClient> + 'static, -{ - orchestrator: EngineServiceType, -} - -impl EngineService -where - N: ProviderNodeTypes, - Client: BlockClient> + 'static, -{ - /// Constructor for `EngineService`. - #[expect(clippy::too_many_arguments)] - pub fn new( - consensus: Arc>, - chain_spec: Arc, - client: Client, - incoming_requests: EngineMessageStream, - pipeline: Pipeline, - pipeline_task_spawner: Box, - provider: ProviderFactory, - blockchain_db: BlockchainProvider, - pruner: PrunerWithFactory>, - payload_builder: PayloadBuilderHandle, - payload_validator: V, - tree_config: TreeConfig, - sync_metrics_tx: MetricEventsSender, - evm_config: C, - changeset_cache: ChangesetCache, - ) -> Self - where - V: EngineValidator, - C: ConfigureEvm + 'static, - { - let engine_kind = - if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum }; - - let downloader = BasicBlockDownloader::new(client, consensus.clone()); - let use_hashed_state = provider.cached_storage_settings().use_hashed_state(); - - let persistence_handle = - PersistenceHandle::::spawn_service(provider, pruner, sync_metrics_tx); - - let canonical_in_memory_state = blockchain_db.canonical_in_memory_state(); - - let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new( - blockchain_db, - consensus, - payload_validator, - persistence_handle, - payload_builder, - canonical_in_memory_state, - tree_config, - engine_kind, - evm_config, - changeset_cache, - use_hashed_state, - ); - - let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree); - let handler = EngineHandler::new(engine_handler, downloader, incoming_requests); - - let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner); - - Self { orchestrator: ChainOrchestrator::new(handler, backfill_sync) } - } - - /// Returns a mutable reference to the orchestrator. - pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType { - &mut self.orchestrator - } -} - -impl Stream for EngineService -where - N: ProviderNodeTypes, - Client: BlockClient> + 'static, -{ - type Item = ChainEvent>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut orchestrator = self.project().orchestrator; - StreamExt::poll_next_unpin(&mut orchestrator, cx) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use reth_chainspec::{ChainSpecBuilder, MAINNET}; - use reth_engine_primitives::{BeaconEngineMessage, NoopInvalidBlockHook}; - use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::BasicEngineValidator}; - use reth_ethereum_consensus::EthBeaconConsensus; - use reth_ethereum_engine_primitives::EthEngineTypes; - use reth_evm_ethereum::EthEvmConfig; - use reth_exex_types::FinishedExExHeight; - use reth_network_p2p::test_utils::TestFullBlockClient; - use reth_node_ethereum::EthereumEngineValidator; - use reth_primitives_traits::SealedHeader; - use reth_provider::{ - providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec, - }; - use reth_prune::Pruner; - use reth_tasks::TokioTaskExecutor; - use reth_trie_db::ChangesetCache; - use std::sync::Arc; - use tokio::sync::{mpsc::unbounded_channel, watch}; - use tokio_stream::wrappers::UnboundedReceiverStream; - - #[test] - fn eth_chain_orchestrator_build() { - let chain_spec = Arc::new( - ChainSpecBuilder::default() - .chain(MAINNET.chain) - .genesis(MAINNET.genesis.clone()) - .paris_activated() - .build(), - ); - let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone())); - - let client = TestFullBlockClient::default(); - - let (_tx, rx) = unbounded_channel::>(); - let incoming_requests = UnboundedReceiverStream::new(rx); - - let pipeline = TestPipelineBuilder::new().build(chain_spec.clone()); - let pipeline_task_spawner = Box::::default(); - let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); - - let blockchain_db = - BlockchainProvider::with_latest(provider_factory.clone(), SealedHeader::default()) - .unwrap(); - let engine_payload_validator = EthereumEngineValidator::new(chain_spec.clone()); - let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs); - let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, rx); - let evm_config = EthEvmConfig::new(chain_spec.clone()); - - let changeset_cache = ChangesetCache::new(); - - let engine_validator = BasicEngineValidator::new( - blockchain_db.clone(), - consensus.clone(), - evm_config.clone(), - engine_payload_validator, - TreeConfig::default(), - Box::new(NoopInvalidBlockHook::default()), - changeset_cache.clone(), - reth_tasks::Runtime::test(), - ); - - let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel(); - let (tx, _rx) = unbounded_channel(); - let _eth_service = EngineService::new( - consensus, - chain_spec, - client, - Box::pin(incoming_requests), - pipeline, - pipeline_task_spawner, - provider_factory, - blockchain_db, - pruner, - PayloadBuilderHandle::new(tx), - engine_validator, - TreeConfig::default(), - sync_metrics_tx, - evm_config, - changeset_cache, - ); - } -} diff --git a/crates/engine/tree/src/launch.rs b/crates/engine/tree/src/launch.rs new file mode 100644 index 0000000000..3bfc458d9a --- /dev/null +++ b/crates/engine/tree/src/launch.rs @@ -0,0 +1,110 @@ +//! Engine orchestrator launch helper. +//! +//! Provides [`build_engine_orchestrator`](crate::launch::build_engine_orchestrator) which wires +//! together all engine components and returns a +//! [`ChainOrchestrator`](crate::chain::ChainOrchestrator) ready to be polled as a `Stream`. + +use crate::{ + backfill::PipelineSync, + chain::ChainOrchestrator, + download::BasicBlockDownloader, + engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler}, + persistence::PersistenceHandle, + tree::{EngineApiTreeHandler, EngineValidator, TreeConfig}, +}; +use futures::Stream; +use reth_consensus::FullConsensus; +use reth_engine_primitives::BeaconEngineMessage; +use reth_evm::ConfigureEvm; +use reth_network_p2p::BlockClient; +use reth_payload_builder::PayloadBuilderHandle; +use reth_primitives_traits::NodePrimitives; +use reth_provider::{ + providers::{BlockchainProvider, ProviderNodeTypes}, + ProviderFactory, StorageSettingsCache, +}; +use reth_prune::PrunerWithFactory; +use reth_stages_api::{MetricEventsSender, Pipeline}; +use reth_tasks::TaskSpawner; +use reth_trie_db::ChangesetCache; +use std::sync::Arc; + +/// Builds the engine [`ChainOrchestrator`] that drives the chain forward. +/// +/// This spawns and wires together the following components: +/// +/// - **[`BasicBlockDownloader`]** — downloads blocks on demand from the network during live sync. +/// - **[`PersistenceHandle`]** — spawns the persistence service on a background thread for writing +/// blocks and performing pruning outside the critical consensus path. +/// - **[`EngineApiTreeHandler`]** — spawns the tree handler that processes engine API requests +/// (`newPayload`, `forkchoiceUpdated`) and maintains the in-memory chain state. +/// - **[`EngineApiRequestHandler`]** + **[`EngineHandler`]** — glue that routes incoming CL +/// messages to the tree handler and manages download requests. +/// - **[`PipelineSync`]** — wraps the staged sync [`Pipeline`] for backfill sync when the node +/// needs to catch up over large block ranges. +/// +/// The returned orchestrator implements [`Stream`] and yields +/// [`ChainEvent`]s. +/// +/// [`ChainEvent`]: crate::chain::ChainEvent +#[expect(clippy::too_many_arguments, clippy::type_complexity)] +pub fn build_engine_orchestrator( + engine_kind: EngineApiKind, + consensus: Arc>, + client: Client, + incoming_requests: S, + pipeline: Pipeline, + pipeline_task_spawner: Box, + provider: ProviderFactory, + blockchain_db: BlockchainProvider, + pruner: PrunerWithFactory>, + payload_builder: PayloadBuilderHandle, + payload_validator: V, + tree_config: TreeConfig, + sync_metrics_tx: MetricEventsSender, + evm_config: C, + changeset_cache: ChangesetCache, +) -> ChainOrchestrator< + EngineHandler< + EngineApiRequestHandler, N::Primitives>, + S, + BasicBlockDownloader::Block>, + >, + PipelineSync, +> +where + N: ProviderNodeTypes, + Client: BlockClient::Block> + 'static, + S: Stream> + Send + Sync + Unpin + 'static, + V: EngineValidator, + C: ConfigureEvm + 'static, +{ + let downloader = BasicBlockDownloader::new(client, consensus.clone()); + let use_hashed_state = provider.cached_storage_settings().use_hashed_state(); + + let persistence_handle = + PersistenceHandle::::spawn_service(provider, pruner, sync_metrics_tx); + + let canonical_in_memory_state = blockchain_db.canonical_in_memory_state(); + + let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new( + blockchain_db, + consensus, + payload_validator, + persistence_handle, + payload_builder, + canonical_in_memory_state, + tree_config, + engine_kind, + evm_config, + changeset_cache, + use_hashed_state, + ); + + let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree); + let handler = EngineHandler::new(engine_handler, downloader, incoming_requests); + + let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner); + + ChainOrchestrator::new(handler, backfill_sync) +} diff --git a/crates/engine/tree/src/lib.rs b/crates/engine/tree/src/lib.rs index 43f29b8e0b..6ecc418773 100644 --- a/crates/engine/tree/src/lib.rs +++ b/crates/engine/tree/src/lib.rs @@ -100,6 +100,8 @@ pub mod chain; pub mod download; /// Engine Api chain handler support. pub mod engine; +/// Engine orchestrator launch helper. +pub mod launch; /// Metrics support. pub mod metrics; /// The background writer service, coordinating write operations on static files and the database. diff --git a/crates/node/builder/Cargo.toml b/crates/node/builder/Cargo.toml index 3171713a93..a7fbe745c3 100644 --- a/crates/node/builder/Cargo.toml +++ b/crates/node/builder/Cargo.toml @@ -24,7 +24,6 @@ reth-db-common.workspace = true reth-downloaders.workspace = true reth-engine-local.workspace = true reth-engine-primitives.workspace = true -reth-engine-service.workspace = true reth-engine-tree.workspace = true reth-engine-util.workspace = true reth-evm.workspace = true diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 31a21d1668..c93ffbb4b9 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -11,10 +11,10 @@ use crate::{ use alloy_consensus::BlockHeader; use futures::{stream_select, FutureExt, StreamExt}; use reth_chainspec::{EthChainSpec, EthereumHardforks}; -use reth_engine_service::service::{ChainEvent, EngineService}; use reth_engine_tree::{ - chain::FromOrchestrator, - engine::{EngineApiRequest, EngineRequestHandler}, + chain::{ChainEvent, FromOrchestrator}, + engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler}, + launch::build_engine_orchestrator, tree::TreeConfig, }; use reth_engine_util::EngineMessageStreamExt; @@ -219,9 +219,15 @@ impl EngineNodeLauncher { // during this run. .maybe_store_messages(node_config.debug.engine_api_store.clone()); - let mut engine_service = EngineService::new( + let engine_kind = if ctx.chain_spec().is_optimism() { + EngineApiKind::OpStack + } else { + EngineApiKind::Ethereum + }; + + let mut orchestrator = build_engine_orchestrator( + engine_kind, consensus.clone(), - ctx.chain_spec(), network_client.clone(), Box::pin(consensus_engine_stream), pipeline, @@ -290,7 +296,7 @@ impl EngineNodeLauncher { if let Some(initial_target) = initial_target { debug!(target: "reth::cli", %initial_target, "start backfill sync"); // network_handle's sync state is already initialized at Syncing - engine_service.orchestrator_mut().start_backfill_sync(initial_target); + orchestrator.start_backfill_sync(initial_target); } else if startup_sync_state_idle { network_handle.update_sync_state(SyncState::Idle); } @@ -303,7 +309,7 @@ impl EngineNodeLauncher { // the CL loop { tokio::select! { - event = engine_service.next() => { + event = orchestrator.next() => { let Some(event) = event else { break }; debug!(target: "reth::cli", "Event: {event}"); match event { @@ -353,13 +359,13 @@ impl EngineNodeLauncher { payload = built_payloads.select_next_some() => { if let Some(executed_block) = payload.executed_block() { debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload"); - engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into()); + orchestrator.handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into()); } } shutdown_req = &mut shutdown_rx => { if let Ok(req) = shutdown_req { debug!(target: "reth::cli", "received engine shutdown request"); - engine_service.orchestrator_mut().handler_mut().handler_mut().on_event( + orchestrator.handler_mut().handler_mut().on_event( FromOrchestrator::Terminate { tx: req.done_tx }.into() ); }