From 8a802dab600cd9a2dc0470eeeb22d1a0ccc30d8a Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Mon, 12 Aug 2024 22:06:39 -0700 Subject: [PATCH] chore(engine): enable engine debug streams in new implementation (#10282) --- crates/engine/service/Cargo.toml | 2 +- crates/engine/service/src/service.rs | 11 +++++++---- crates/engine/util/src/reorg.rs | 13 +++++++------ crates/node/builder/src/launch/engine.rs | 18 +++++++++++++++++- 4 files changed, 32 insertions(+), 12 deletions(-) diff --git a/crates/engine/service/Cargo.toml b/crates/engine/service/Cargo.toml index 9aecd780a4..c5ace879db 100644 --- a/crates/engine/service/Cargo.toml +++ b/crates/engine/service/Cargo.toml @@ -30,7 +30,6 @@ reth-tasks.workspace = true # async futures.workspace = true pin-project.workspace = true -tokio-stream.workspace = true # misc thiserror.workspace = true @@ -46,3 +45,4 @@ reth-primitives.workspace = true reth-prune-types.workspace = true tokio = { workspace = true, features = ["sync"] } +tokio-stream.workspace = true diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index da93425197..6783ed01d3 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -30,13 +30,15 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio_stream::wrappers::UnboundedReceiverStream; + +/// Alias for consensus engine stream. +type EngineMessageStream = Pin> + Send + Sync>>; /// Alias for chain orchestrator. type EngineServiceType = ChainOrchestrator< EngineHandler< EngineApiRequestHandler>, - UnboundedReceiverStream>, + EngineMessageStream, BasicBlockDownloader, >, PipelineSync, @@ -70,7 +72,7 @@ where executor_factory: E, chain_spec: Arc, client: Client, - incoming_requests: UnboundedReceiverStream>, + incoming_requests: EngineMessageStream, pipeline: Pipeline, pipeline_task_spawner: Box, provider: ProviderFactory, @@ -149,6 +151,7 @@ mod tests { use reth_tasks::TokioTaskExecutor; use std::sync::Arc; use tokio::sync::{mpsc::unbounded_channel, watch}; + use tokio_stream::wrappers::UnboundedReceiverStream; #[test] fn eth_chain_orchestrator_build() { @@ -185,7 +188,7 @@ mod tests { executor_factory, chain_spec, client, - incoming_requests, + Box::pin(incoming_requests), pipeline, pipeline_task_spawner, provider_factory, diff --git a/crates/engine/util/src/reorg.rs b/crates/engine/util/src/reorg.rs index 65117e1723..b9a76cb1c1 100644 --- a/crates/engine/util/src/reorg.rs +++ b/crates/engine/util/src/reorg.rs @@ -1,6 +1,6 @@ //! Stream wrapper that simulates reorgs. -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt, TryFutureExt}; +use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt}; use itertools::Either; use reth_beacon_consensus::{BeaconEngineMessage, BeaconOnNewPayloadError, OnForkChoiceUpdated}; use reth_engine_primitives::EngineTypes; @@ -26,6 +26,7 @@ use reth_rpc_types_compat::engine::payload::block_to_payload; use revm_primitives::{BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg}; use std::{ collections::VecDeque, + future::Future, pin::Pin, task::{ready, Context, Poll}, }; @@ -43,6 +44,8 @@ type EngineReorgResponse = Result< oneshot::error::RecvError, >; +type ReorgResponseFut = Pin + Send + Sync>>; + /// Engine API stream wrapper that simulates reorgs with specified frequency. #[derive(Debug)] #[pin_project::pin_project] @@ -66,7 +69,7 @@ pub struct EngineReorg { /// Last forkchoice state. last_forkchoice_state: Option, /// Pending engine responses to reorg messages. - reorg_responses: FuturesUnordered>, + reorg_responses: FuturesUnordered, } impl EngineReorg { @@ -181,10 +184,8 @@ where let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel(); let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel(); this.reorg_responses.extend([ - Box::pin(reorg_payload_rx.map_ok(Either::Left)) - as BoxFuture<'static, EngineReorgResponse>, - Box::pin(reorg_fcu_rx.map_ok(Either::Right)) - as BoxFuture<'static, EngineReorgResponse>, + Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut, + Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut, ]); *this.state = EngineReorgState::Reorg { diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 83cd599b73..5512fd3285 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -20,6 +20,7 @@ use reth_engine_tree::{ engine::{EngineApiRequest, EngineRequestHandler}, tree::TreeConfig, }; +use reth_engine_util::EngineMessageStreamExt; use reth_exex::ExExManagerHandle; use reth_network::{NetworkSyncUpdater, SyncState}; use reth_network_api::{BlockDownloaderProvider, NetworkEventListenerProvider}; @@ -133,6 +134,21 @@ where let network_client = ctx.components().network().fetch_client().await?; let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); + let node_config = ctx.node_config(); + let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx) + .maybe_skip_fcu(node_config.debug.skip_fcu) + .maybe_skip_new_payload(node_config.debug.skip_new_payload) + .maybe_reorg( + ctx.blockchain_db().clone(), + ctx.components().evm_config().clone(), + reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()), + node_config.debug.reorg_frequency, + ) + // Store messages _after_ skipping so that `replay-engine` command + // would replay only the messages that were observed by the engine + // during this run. + .maybe_store_messages(node_config.debug.engine_api_store.clone()); + let max_block = ctx.max_block(network_client.clone()).await?; let mut hooks = EngineHooks::new(); @@ -179,7 +195,7 @@ where ctx.components().block_executor().clone(), ctx.chain_spec(), network_client.clone(), - UnboundedReceiverStream::new(consensus_engine_rx), + Box::pin(consensus_engine_stream), pipeline, Box::new(ctx.task_executor().clone()), ctx.provider_factory().clone(),