diff --git a/Cargo.lock b/Cargo.lock index 9c03829028..51f1e86bd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4493,6 +4493,7 @@ dependencies = [ "reth-provider", "reth-rpc-types", "reth-stages", + "reth-tasks", "reth-tracing", "thiserror", "tokio", diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 3777069306..1fe5a75c36 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -285,8 +285,13 @@ impl Command { ctx.task_executor.spawn(events::handle_events(Some(network.clone()), events)); - let beacon_consensus_engine = - self.build_consensus_engine(db.clone(), consensus, pipeline, consensus_engine_rx)?; + let beacon_consensus_engine = self.build_consensus_engine( + db.clone(), + &ctx.task_executor, + consensus, + pipeline, + consensus_engine_rx, + )?; // Run consensus engine let (rx, tx) = tokio::sync::oneshot::channel(); @@ -360,10 +365,11 @@ impl Command { fn build_consensus_engine( &self, db: Arc, + task_executor: &TaskExecutor, consensus: C, pipeline: Pipeline, message_rx: UnboundedReceiver, - ) -> eyre::Result> + ) -> eyre::Result> where DB: Database + Unpin + 'static, U: SyncStateUpdater + Unpin + 'static, @@ -374,7 +380,14 @@ impl Command { TreeExternals::new(db.clone(), consensus, executor_factory, self.chain.clone()); let blockchain_tree = BlockchainTree::new(tree_externals, BlockchainTreeConfig::default())?; - Ok(BeaconConsensusEngine::new(db, pipeline, blockchain_tree, message_rx, self.max_block)) + Ok(BeaconConsensusEngine::new( + db, + task_executor.clone(), + pipeline, + blockchain_tree, + message_rx, + self.max_block, + )) } fn load_config(&self) -> eyre::Result { diff --git a/crates/consensus/beacon/Cargo.toml b/crates/consensus/beacon/Cargo.toml index 82c9449ba6..1c977d6def 100644 --- a/crates/consensus/beacon/Cargo.toml +++ b/crates/consensus/beacon/Cargo.toml @@ -16,6 +16,7 @@ reth-db = { path = "../../storage/db" } reth-provider = { path = "../../storage/provider" } reth-executor = { path = "../../executor" } reth-rpc-types = { path = "../../rpc/rpc-types" } +reth-tasks = { path = "../../tasks" } # async tokio = { version = "1.21.2", features = ["sync"] } diff --git a/crates/consensus/beacon/src/engine/error.rs b/crates/consensus/beacon/src/engine/error.rs index 64da402e99..7da1103508 100644 --- a/crates/consensus/beacon/src/engine/error.rs +++ b/crates/consensus/beacon/src/engine/error.rs @@ -11,6 +11,9 @@ pub enum BeaconEngineError { /// Forkchoice zero hash head received. #[error("Received zero hash as forkchoice head")] ForkchoiceEmptyHead, + /// Pipeline channel closed. + #[error("Pipeline channel closed")] + PipelineChannelClosed, /// Encountered a payload error. #[error(transparent)] Payload(#[from] PayloadError), diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 2c08eafa25..ab2e9e57ce 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -13,12 +13,13 @@ use reth_rpc_types::engine::{ ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum, }; use reth_stages::Pipeline; +use reth_tasks::TaskSpawner; use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, }; -use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::{mpsc::UnboundedReceiver, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; @@ -48,15 +49,18 @@ pub use pipeline_state::PipelineState; /// /// If the future is polled more than once. Leads to undefined state. #[must_use = "Future does nothing unless polled"] -pub struct BeaconConsensusEngine +pub struct BeaconConsensusEngine where DB: Database, + TS: TaskSpawner, U: SyncStateUpdater, C: Consensus, EF: ExecutorFactory, { /// The database handle. db: Arc, + /// Task spawner for spawning the pipeline. + task_spawner: TS, /// The current state of the pipeline. /// Must always be [Some] unless the state is being reevaluated. /// The pipeline is used for historical sync by setting the current forkchoice head. @@ -75,9 +79,10 @@ where max_block: Option, } -impl BeaconConsensusEngine +impl BeaconConsensusEngine where DB: Database + Unpin + 'static, + TS: TaskSpawner, U: SyncStateUpdater + 'static, C: Consensus, EF: ExecutorFactory + 'static, @@ -88,6 +93,7 @@ where /// handle the messages received from the Consensus Layer. pub fn new( db: Arc, + task_spawner: TS, pipeline: Pipeline, blockchain_tree: BlockchainTree, message_rx: UnboundedReceiver, @@ -95,6 +101,7 @@ where ) -> Self { Self { db, + task_spawner, pipeline_state: Some(PipelineState::Idle(pipeline)), blockchain_tree, message_rx: UnboundedReceiverStream::new(message_rx), @@ -232,7 +239,16 @@ where PipelineTarget::Safe => forkchoice_state.safe_block_hash, }; trace!(target: "consensus::engine", ?tip, "Starting the pipeline"); - PipelineState::Running(pipeline.run_as_fut(self.db.clone(), tip)) + let (tx, rx) = oneshot::channel(); + let db = self.db.clone(); + self.task_spawner.spawn_critical( + "pipeline", + Box::pin(async move { + let result = pipeline.run_as_fut(db, tip).await; + let _ = tx.send(result); + }), + ); + PipelineState::Running(rx) } else { PipelineState::Idle(pipeline) } @@ -274,9 +290,10 @@ where /// local forkchoice state, it will launch the pipeline to sync to the head hash. /// While the pipeline is syncing, the consensus engine will keep processing messages from the /// receiver and forwarding them to the blockchain tree. -impl Future for BeaconConsensusEngine +impl Future for BeaconConsensusEngine where DB: Database + Unpin + 'static, + TS: TaskSpawner + Unpin, U: SyncStateUpdater + Unpin + 'static, C: Consensus + Unpin, EF: ExecutorFactory + Unpin + 'static, @@ -322,7 +339,7 @@ where let next_state = match this.pipeline_state.take().expect("pipeline state is set") { PipelineState::Running(mut fut) => { match fut.poll_unpin(cx) { - Poll::Ready((pipeline, result)) => { + Poll::Ready(Ok((pipeline, result))) => { if let Err(error) = result { return Poll::Ready(Err(error.into())) } @@ -356,6 +373,10 @@ where // Get next pipeline state. this.next_pipeline_state(pipeline, forkchoice_state) } + Poll::Ready(Err(error)) => { + error!(target: "consensus::engine", ?error, "Failed to receive pipeline result"); + return Poll::Ready(Err(BeaconEngineError::PipelineChannelClosed)) + } Poll::Pending => { this.pipeline_state = Some(PipelineState::Running(fut)); return Poll::Pending @@ -409,6 +430,7 @@ mod tests { use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET}; use reth_provider::Transaction; use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError}; + use reth_tasks::TokioTaskExecutor; use std::{collections::VecDeque, time::Duration}; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedSender}, @@ -418,6 +440,7 @@ mod tests { type TestBeaconConsensusEngine = BeaconConsensusEngine< Env, + TokioTaskExecutor, NoopSyncStateUpdate, TestConsensus, TestExecutorFactory, @@ -488,7 +511,14 @@ mod tests { let (sync_tx, sync_rx) = unbounded_channel(); ( - BeaconConsensusEngine::new(db.clone(), pipeline, tree, sync_rx, None), + BeaconConsensusEngine::new( + db.clone(), + TokioTaskExecutor::default(), + pipeline, + tree, + sync_rx, + None, + ), TestEnv::new(db, tip_rx, sync_tx), ) } diff --git a/crates/consensus/beacon/src/engine/pipeline_state.rs b/crates/consensus/beacon/src/engine/pipeline_state.rs index 42ffe1489c..4cab3d3533 100644 --- a/crates/consensus/beacon/src/engine/pipeline_state.rs +++ b/crates/consensus/beacon/src/engine/pipeline_state.rs @@ -1,6 +1,7 @@ use reth_db::database::Database; use reth_interfaces::sync::SyncStateUpdater; -use reth_stages::{Pipeline, PipelineFut}; +use reth_stages::{Pipeline, PipelineWithResult}; +use tokio::sync::oneshot; /// The possible pipeline states within the sync controller. /// @@ -15,7 +16,7 @@ pub enum PipelineState { /// Pipeline is idle. Idle(Pipeline), /// Pipeline is running. - Running(PipelineFut), + Running(oneshot::Receiver>), } impl PipelineState {