mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
refactor(engine): remove reth-engine-service crate (#22187)
Co-authored-by: mattsse <mattsse@users.noreply.github.com> Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
committed by
GitHub
parent
3ab7cb98aa
commit
1f3fd5da2e
31
Cargo.lock
generated
31
Cargo.lock
generated
@@ -8322,36 +8322,6 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reth-engine-service"
|
||||
version = "1.11.0"
|
||||
dependencies = [
|
||||
"alloy-eips",
|
||||
"futures",
|
||||
"pin-project",
|
||||
"reth-chainspec",
|
||||
"reth-consensus",
|
||||
"reth-engine-primitives",
|
||||
"reth-engine-tree",
|
||||
"reth-ethereum-consensus",
|
||||
"reth-ethereum-engine-primitives",
|
||||
"reth-evm",
|
||||
"reth-evm-ethereum",
|
||||
"reth-exex-types",
|
||||
"reth-network-p2p",
|
||||
"reth-node-ethereum",
|
||||
"reth-node-types",
|
||||
"reth-payload-builder",
|
||||
"reth-primitives-traits",
|
||||
"reth-provider",
|
||||
"reth-prune",
|
||||
"reth-stages-api",
|
||||
"reth-tasks",
|
||||
"reth-trie-db",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reth-engine-tree"
|
||||
version = "1.11.0"
|
||||
@@ -9270,7 +9240,6 @@ dependencies = [
|
||||
"reth-downloaders",
|
||||
"reth-engine-local",
|
||||
"reth-engine-primitives",
|
||||
"reth-engine-service",
|
||||
"reth-engine-tree",
|
||||
"reth-engine-util",
|
||||
"reth-ethereum-engine-primitives",
|
||||
|
||||
@@ -27,7 +27,6 @@ members = [
|
||||
"crates/engine/invalid-block-hooks/",
|
||||
"crates/engine/local",
|
||||
"crates/engine/primitives/",
|
||||
"crates/engine/service",
|
||||
"crates/engine/tree/",
|
||||
"crates/engine/util/",
|
||||
"crates/era",
|
||||
@@ -349,7 +348,6 @@ reth-ecies = { path = "crates/net/ecies" }
|
||||
reth-engine-local = { path = "crates/engine/local" }
|
||||
reth-engine-primitives = { path = "crates/engine/primitives", default-features = false }
|
||||
reth-engine-tree = { path = "crates/engine/tree" }
|
||||
reth-engine-service = { path = "crates/engine/service" }
|
||||
reth-engine-util = { path = "crates/engine/util" }
|
||||
reth-era = { path = "crates/era" }
|
||||
reth-era-downloader = { path = "crates/era-downloader" }
|
||||
|
||||
@@ -1,47 +0,0 @@
|
||||
[package]
|
||||
name = "reth-engine-service"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
rust-version.workspace = true
|
||||
license.workspace = true
|
||||
homepage.workspace = true
|
||||
repository.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
# reth
|
||||
reth-consensus.workspace = true
|
||||
reth-engine-tree.workspace = true
|
||||
reth-evm.workspace = true
|
||||
reth-network-p2p.workspace = true
|
||||
reth-payload-builder.workspace = true
|
||||
reth-provider.workspace = true
|
||||
reth-prune.workspace = true
|
||||
reth-stages-api.workspace = true
|
||||
reth-tasks.workspace = true
|
||||
reth-node-types.workspace = true
|
||||
reth-chainspec.workspace = true
|
||||
reth-engine-primitives.workspace = true
|
||||
reth-trie-db.workspace = true
|
||||
|
||||
# async
|
||||
futures.workspace = true
|
||||
pin-project.workspace = true
|
||||
|
||||
# misc
|
||||
|
||||
[dev-dependencies]
|
||||
reth-engine-tree = { workspace = true, features = ["test-utils"] }
|
||||
reth-ethereum-consensus.workspace = true
|
||||
reth-ethereum-engine-primitives.workspace = true
|
||||
reth-evm-ethereum.workspace = true
|
||||
reth-exex-types.workspace = true
|
||||
reth-primitives-traits.workspace = true
|
||||
reth-node-ethereum.workspace = true
|
||||
reth-trie-db.workspace = true
|
||||
|
||||
alloy-eips.workspace = true
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
tokio-stream.workspace = true
|
||||
@@ -1,12 +0,0 @@
|
||||
//! Engine service implementation.
|
||||
|
||||
#![doc(
|
||||
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
|
||||
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
|
||||
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
|
||||
)]
|
||||
#![cfg_attr(docsrs, feature(doc_cfg))]
|
||||
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
|
||||
|
||||
/// Engine Service
|
||||
pub mod service;
|
||||
@@ -1,229 +0,0 @@
|
||||
use futures::{Stream, StreamExt};
|
||||
use pin_project::pin_project;
|
||||
use reth_chainspec::EthChainSpec;
|
||||
use reth_consensus::FullConsensus;
|
||||
use reth_engine_primitives::{BeaconEngineMessage, ConsensusEngineEvent};
|
||||
use reth_engine_tree::{
|
||||
backfill::PipelineSync,
|
||||
download::BasicBlockDownloader,
|
||||
engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
|
||||
persistence::PersistenceHandle,
|
||||
tree::{EngineApiTreeHandler, EngineValidator, TreeConfig},
|
||||
};
|
||||
pub use reth_engine_tree::{
|
||||
chain::{ChainEvent, ChainOrchestrator},
|
||||
engine::EngineApiEvent,
|
||||
};
|
||||
use reth_evm::ConfigureEvm;
|
||||
use reth_network_p2p::BlockClient;
|
||||
use reth_node_types::{BlockTy, NodeTypes};
|
||||
use reth_payload_builder::PayloadBuilderHandle;
|
||||
use reth_provider::{
|
||||
providers::{BlockchainProvider, ProviderNodeTypes},
|
||||
ProviderFactory, StorageSettingsCache,
|
||||
};
|
||||
use reth_prune::PrunerWithFactory;
|
||||
use reth_stages_api::{MetricEventsSender, Pipeline};
|
||||
use reth_tasks::TaskSpawner;
|
||||
use reth_trie_db::ChangesetCache;
|
||||
use std::{
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
/// Alias for consensus engine stream.
|
||||
pub type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;
|
||||
|
||||
/// Alias for chain orchestrator.
|
||||
type EngineServiceType<N, Client> = ChainOrchestrator<
|
||||
EngineHandler<
|
||||
EngineApiRequestHandler<
|
||||
EngineApiRequest<<N as NodeTypes>::Payload, <N as NodeTypes>::Primitives>,
|
||||
<N as NodeTypes>::Primitives,
|
||||
>,
|
||||
EngineMessageStream<<N as NodeTypes>::Payload>,
|
||||
BasicBlockDownloader<Client, BlockTy<N>>,
|
||||
>,
|
||||
PipelineSync<N>,
|
||||
>;
|
||||
|
||||
/// The type that drives the chain forward and communicates progress.
|
||||
#[pin_project]
|
||||
#[expect(missing_debug_implementations)]
|
||||
// TODO(mattsse): remove hidden once fixed : <https://github.com/rust-lang/rust/issues/135363>
|
||||
// otherwise rustdoc fails to resolve the alias
|
||||
#[doc(hidden)]
|
||||
pub struct EngineService<N, Client>
|
||||
where
|
||||
N: ProviderNodeTypes,
|
||||
Client: BlockClient<Block = BlockTy<N>> + 'static,
|
||||
{
|
||||
orchestrator: EngineServiceType<N, Client>,
|
||||
}
|
||||
|
||||
impl<N, Client> EngineService<N, Client>
|
||||
where
|
||||
N: ProviderNodeTypes,
|
||||
Client: BlockClient<Block = BlockTy<N>> + 'static,
|
||||
{
|
||||
/// Constructor for `EngineService`.
|
||||
#[expect(clippy::too_many_arguments)]
|
||||
pub fn new<V, C>(
|
||||
consensus: Arc<dyn FullConsensus<N::Primitives>>,
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
client: Client,
|
||||
incoming_requests: EngineMessageStream<N::Payload>,
|
||||
pipeline: Pipeline<N>,
|
||||
pipeline_task_spawner: Box<dyn TaskSpawner>,
|
||||
provider: ProviderFactory<N>,
|
||||
blockchain_db: BlockchainProvider<N>,
|
||||
pruner: PrunerWithFactory<ProviderFactory<N>>,
|
||||
payload_builder: PayloadBuilderHandle<N::Payload>,
|
||||
payload_validator: V,
|
||||
tree_config: TreeConfig,
|
||||
sync_metrics_tx: MetricEventsSender,
|
||||
evm_config: C,
|
||||
changeset_cache: ChangesetCache,
|
||||
) -> Self
|
||||
where
|
||||
V: EngineValidator<N::Payload>,
|
||||
C: ConfigureEvm<Primitives = N::Primitives> + 'static,
|
||||
{
|
||||
let engine_kind =
|
||||
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
|
||||
|
||||
let downloader = BasicBlockDownloader::new(client, consensus.clone());
|
||||
let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
|
||||
|
||||
let persistence_handle =
|
||||
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
|
||||
|
||||
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
|
||||
|
||||
let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
|
||||
blockchain_db,
|
||||
consensus,
|
||||
payload_validator,
|
||||
persistence_handle,
|
||||
payload_builder,
|
||||
canonical_in_memory_state,
|
||||
tree_config,
|
||||
engine_kind,
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
use_hashed_state,
|
||||
);
|
||||
|
||||
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
|
||||
let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
|
||||
|
||||
let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
|
||||
|
||||
Self { orchestrator: ChainOrchestrator::new(handler, backfill_sync) }
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the orchestrator.
|
||||
pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType<N, Client> {
|
||||
&mut self.orchestrator
|
||||
}
|
||||
}
|
||||
|
||||
impl<N, Client> Stream for EngineService<N, Client>
|
||||
where
|
||||
N: ProviderNodeTypes,
|
||||
Client: BlockClient<Block = BlockTy<N>> + 'static,
|
||||
{
|
||||
type Item = ChainEvent<ConsensusEngineEvent<N::Primitives>>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut orchestrator = self.project().orchestrator;
|
||||
StreamExt::poll_next_unpin(&mut orchestrator, cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use reth_chainspec::{ChainSpecBuilder, MAINNET};
|
||||
use reth_engine_primitives::{BeaconEngineMessage, NoopInvalidBlockHook};
|
||||
use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::BasicEngineValidator};
|
||||
use reth_ethereum_consensus::EthBeaconConsensus;
|
||||
use reth_ethereum_engine_primitives::EthEngineTypes;
|
||||
use reth_evm_ethereum::EthEvmConfig;
|
||||
use reth_exex_types::FinishedExExHeight;
|
||||
use reth_network_p2p::test_utils::TestFullBlockClient;
|
||||
use reth_node_ethereum::EthereumEngineValidator;
|
||||
use reth_primitives_traits::SealedHeader;
|
||||
use reth_provider::{
|
||||
providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
|
||||
};
|
||||
use reth_prune::Pruner;
|
||||
use reth_tasks::TokioTaskExecutor;
|
||||
use reth_trie_db::ChangesetCache;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc::unbounded_channel, watch};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
#[test]
|
||||
fn eth_chain_orchestrator_build() {
|
||||
let chain_spec = Arc::new(
|
||||
ChainSpecBuilder::default()
|
||||
.chain(MAINNET.chain)
|
||||
.genesis(MAINNET.genesis.clone())
|
||||
.paris_activated()
|
||||
.build(),
|
||||
);
|
||||
let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
|
||||
|
||||
let client = TestFullBlockClient::default();
|
||||
|
||||
let (_tx, rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
|
||||
let incoming_requests = UnboundedReceiverStream::new(rx);
|
||||
|
||||
let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
|
||||
let pipeline_task_spawner = Box::<TokioTaskExecutor>::default();
|
||||
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
|
||||
|
||||
let blockchain_db =
|
||||
BlockchainProvider::with_latest(provider_factory.clone(), SealedHeader::default())
|
||||
.unwrap();
|
||||
let engine_payload_validator = EthereumEngineValidator::new(chain_spec.clone());
|
||||
let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs);
|
||||
let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, rx);
|
||||
let evm_config = EthEvmConfig::new(chain_spec.clone());
|
||||
|
||||
let changeset_cache = ChangesetCache::new();
|
||||
|
||||
let engine_validator = BasicEngineValidator::new(
|
||||
blockchain_db.clone(),
|
||||
consensus.clone(),
|
||||
evm_config.clone(),
|
||||
engine_payload_validator,
|
||||
TreeConfig::default(),
|
||||
Box::new(NoopInvalidBlockHook::default()),
|
||||
changeset_cache.clone(),
|
||||
reth_tasks::Runtime::test(),
|
||||
);
|
||||
|
||||
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
|
||||
let (tx, _rx) = unbounded_channel();
|
||||
let _eth_service = EngineService::new(
|
||||
consensus,
|
||||
chain_spec,
|
||||
client,
|
||||
Box::pin(incoming_requests),
|
||||
pipeline,
|
||||
pipeline_task_spawner,
|
||||
provider_factory,
|
||||
blockchain_db,
|
||||
pruner,
|
||||
PayloadBuilderHandle::new(tx),
|
||||
engine_validator,
|
||||
TreeConfig::default(),
|
||||
sync_metrics_tx,
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
);
|
||||
}
|
||||
}
|
||||
110
crates/engine/tree/src/launch.rs
Normal file
110
crates/engine/tree/src/launch.rs
Normal file
@@ -0,0 +1,110 @@
|
||||
//! Engine orchestrator launch helper.
|
||||
//!
|
||||
//! Provides [`build_engine_orchestrator`](crate::launch::build_engine_orchestrator) which wires
|
||||
//! together all engine components and returns a
|
||||
//! [`ChainOrchestrator`](crate::chain::ChainOrchestrator) ready to be polled as a `Stream`.
|
||||
|
||||
use crate::{
|
||||
backfill::PipelineSync,
|
||||
chain::ChainOrchestrator,
|
||||
download::BasicBlockDownloader,
|
||||
engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
|
||||
persistence::PersistenceHandle,
|
||||
tree::{EngineApiTreeHandler, EngineValidator, TreeConfig},
|
||||
};
|
||||
use futures::Stream;
|
||||
use reth_consensus::FullConsensus;
|
||||
use reth_engine_primitives::BeaconEngineMessage;
|
||||
use reth_evm::ConfigureEvm;
|
||||
use reth_network_p2p::BlockClient;
|
||||
use reth_payload_builder::PayloadBuilderHandle;
|
||||
use reth_primitives_traits::NodePrimitives;
|
||||
use reth_provider::{
|
||||
providers::{BlockchainProvider, ProviderNodeTypes},
|
||||
ProviderFactory, StorageSettingsCache,
|
||||
};
|
||||
use reth_prune::PrunerWithFactory;
|
||||
use reth_stages_api::{MetricEventsSender, Pipeline};
|
||||
use reth_tasks::TaskSpawner;
|
||||
use reth_trie_db::ChangesetCache;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Builds the engine [`ChainOrchestrator`] that drives the chain forward.
|
||||
///
|
||||
/// This spawns and wires together the following components:
|
||||
///
|
||||
/// - **[`BasicBlockDownloader`]** — downloads blocks on demand from the network during live sync.
|
||||
/// - **[`PersistenceHandle`]** — spawns the persistence service on a background thread for writing
|
||||
/// blocks and performing pruning outside the critical consensus path.
|
||||
/// - **[`EngineApiTreeHandler`]** — spawns the tree handler that processes engine API requests
|
||||
/// (`newPayload`, `forkchoiceUpdated`) and maintains the in-memory chain state.
|
||||
/// - **[`EngineApiRequestHandler`]** + **[`EngineHandler`]** — glue that routes incoming CL
|
||||
/// messages to the tree handler and manages download requests.
|
||||
/// - **[`PipelineSync`]** — wraps the staged sync [`Pipeline`] for backfill sync when the node
|
||||
/// needs to catch up over large block ranges.
|
||||
///
|
||||
/// The returned orchestrator implements [`Stream`] and yields
|
||||
/// [`ChainEvent`]s.
|
||||
///
|
||||
/// [`ChainEvent`]: crate::chain::ChainEvent
|
||||
#[expect(clippy::too_many_arguments, clippy::type_complexity)]
|
||||
pub fn build_engine_orchestrator<N, Client, S, V, C>(
|
||||
engine_kind: EngineApiKind,
|
||||
consensus: Arc<dyn FullConsensus<N::Primitives>>,
|
||||
client: Client,
|
||||
incoming_requests: S,
|
||||
pipeline: Pipeline<N>,
|
||||
pipeline_task_spawner: Box<dyn TaskSpawner>,
|
||||
provider: ProviderFactory<N>,
|
||||
blockchain_db: BlockchainProvider<N>,
|
||||
pruner: PrunerWithFactory<ProviderFactory<N>>,
|
||||
payload_builder: PayloadBuilderHandle<N::Payload>,
|
||||
payload_validator: V,
|
||||
tree_config: TreeConfig,
|
||||
sync_metrics_tx: MetricEventsSender,
|
||||
evm_config: C,
|
||||
changeset_cache: ChangesetCache,
|
||||
) -> ChainOrchestrator<
|
||||
EngineHandler<
|
||||
EngineApiRequestHandler<EngineApiRequest<N::Payload, N::Primitives>, N::Primitives>,
|
||||
S,
|
||||
BasicBlockDownloader<Client, <N::Primitives as NodePrimitives>::Block>,
|
||||
>,
|
||||
PipelineSync<N>,
|
||||
>
|
||||
where
|
||||
N: ProviderNodeTypes,
|
||||
Client: BlockClient<Block = <N::Primitives as NodePrimitives>::Block> + 'static,
|
||||
S: Stream<Item = BeaconEngineMessage<N::Payload>> + Send + Sync + Unpin + 'static,
|
||||
V: EngineValidator<N::Payload>,
|
||||
C: ConfigureEvm<Primitives = N::Primitives> + 'static,
|
||||
{
|
||||
let downloader = BasicBlockDownloader::new(client, consensus.clone());
|
||||
let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
|
||||
|
||||
let persistence_handle =
|
||||
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
|
||||
|
||||
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
|
||||
|
||||
let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
|
||||
blockchain_db,
|
||||
consensus,
|
||||
payload_validator,
|
||||
persistence_handle,
|
||||
payload_builder,
|
||||
canonical_in_memory_state,
|
||||
tree_config,
|
||||
engine_kind,
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
use_hashed_state,
|
||||
);
|
||||
|
||||
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
|
||||
let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
|
||||
|
||||
let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
|
||||
|
||||
ChainOrchestrator::new(handler, backfill_sync)
|
||||
}
|
||||
@@ -100,6 +100,8 @@ pub mod chain;
|
||||
pub mod download;
|
||||
/// Engine Api chain handler support.
|
||||
pub mod engine;
|
||||
/// Engine orchestrator launch helper.
|
||||
pub mod launch;
|
||||
/// Metrics support.
|
||||
pub mod metrics;
|
||||
/// The background writer service, coordinating write operations on static files and the database.
|
||||
|
||||
@@ -24,7 +24,6 @@ reth-db-common.workspace = true
|
||||
reth-downloaders.workspace = true
|
||||
reth-engine-local.workspace = true
|
||||
reth-engine-primitives.workspace = true
|
||||
reth-engine-service.workspace = true
|
||||
reth-engine-tree.workspace = true
|
||||
reth-engine-util.workspace = true
|
||||
reth-evm.workspace = true
|
||||
|
||||
@@ -11,10 +11,10 @@ use crate::{
|
||||
use alloy_consensus::BlockHeader;
|
||||
use futures::{stream_select, FutureExt, StreamExt};
|
||||
use reth_chainspec::{EthChainSpec, EthereumHardforks};
|
||||
use reth_engine_service::service::{ChainEvent, EngineService};
|
||||
use reth_engine_tree::{
|
||||
chain::FromOrchestrator,
|
||||
engine::{EngineApiRequest, EngineRequestHandler},
|
||||
chain::{ChainEvent, FromOrchestrator},
|
||||
engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
|
||||
launch::build_engine_orchestrator,
|
||||
tree::TreeConfig,
|
||||
};
|
||||
use reth_engine_util::EngineMessageStreamExt;
|
||||
@@ -219,9 +219,15 @@ impl EngineNodeLauncher {
|
||||
// during this run.
|
||||
.maybe_store_messages(node_config.debug.engine_api_store.clone());
|
||||
|
||||
let mut engine_service = EngineService::new(
|
||||
let engine_kind = if ctx.chain_spec().is_optimism() {
|
||||
EngineApiKind::OpStack
|
||||
} else {
|
||||
EngineApiKind::Ethereum
|
||||
};
|
||||
|
||||
let mut orchestrator = build_engine_orchestrator(
|
||||
engine_kind,
|
||||
consensus.clone(),
|
||||
ctx.chain_spec(),
|
||||
network_client.clone(),
|
||||
Box::pin(consensus_engine_stream),
|
||||
pipeline,
|
||||
@@ -290,7 +296,7 @@ impl EngineNodeLauncher {
|
||||
if let Some(initial_target) = initial_target {
|
||||
debug!(target: "reth::cli", %initial_target, "start backfill sync");
|
||||
// network_handle's sync state is already initialized at Syncing
|
||||
engine_service.orchestrator_mut().start_backfill_sync(initial_target);
|
||||
orchestrator.start_backfill_sync(initial_target);
|
||||
} else if startup_sync_state_idle {
|
||||
network_handle.update_sync_state(SyncState::Idle);
|
||||
}
|
||||
@@ -303,7 +309,7 @@ impl EngineNodeLauncher {
|
||||
// the CL
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = engine_service.next() => {
|
||||
event = orchestrator.next() => {
|
||||
let Some(event) = event else { break };
|
||||
debug!(target: "reth::cli", "Event: {event}");
|
||||
match event {
|
||||
@@ -353,13 +359,13 @@ impl EngineNodeLauncher {
|
||||
payload = built_payloads.select_next_some() => {
|
||||
if let Some(executed_block) = payload.executed_block() {
|
||||
debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
|
||||
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
|
||||
orchestrator.handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
|
||||
}
|
||||
}
|
||||
shutdown_req = &mut shutdown_rx => {
|
||||
if let Ok(req) = shutdown_req {
|
||||
debug!(target: "reth::cli", "received engine shutdown request");
|
||||
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(
|
||||
orchestrator.handler_mut().handler_mut().on_event(
|
||||
FromOrchestrator::Terminate { tx: req.done_tx }.into()
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user