diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index e010afe7d2..f31b04fe8f 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -14,7 +14,7 @@ use fdlimit::raise_fd_limit; use futures::{pin_mut, stream::select as stream_select, StreamExt}; use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus}; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; -use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage}; +use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine}; use reth_blockchain_tree::{ config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, }; @@ -30,7 +30,7 @@ use reth_downloaders::{ headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; use reth_interfaces::{ - consensus::{Consensus, ForkchoiceState}, + consensus::Consensus, p2p::{ bodies::{client::BodiesClient, downloader::BodyDownloader}, either::EitherDownloader, @@ -244,44 +244,6 @@ impl Command { let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); - // Forward genesis as forkchoice state to the consensus engine. - // This will allow the downloader to start - if self.debug.continuous { - info!(target: "reth::cli", "Continuous sync mode enabled"); - let (tip_tx, _tip_rx) = oneshot::channel(); - let state = ForkchoiceState { - head_block_hash: genesis_hash, - finalized_block_hash: genesis_hash, - safe_block_hash: genesis_hash, - }; - consensus_engine_tx.send(BeaconEngineMessage::ForkchoiceUpdated { - state, - payload_attrs: None, - tx: tip_tx, - })?; - } - - // Forward the `debug.tip` as forkchoice state to the consensus engine. - // This will initiate the sync up to the provided tip. - let _tip_rx = match self.debug.tip { - Some(tip) => { - let (tip_tx, tip_rx) = oneshot::channel(); - let state = ForkchoiceState { - head_block_hash: tip, - finalized_block_hash: tip, - safe_block_hash: tip, - }; - consensus_engine_tx.send(BeaconEngineMessage::ForkchoiceUpdated { - state, - payload_attrs: None, - tx: tip_tx, - })?; - debug!(target: "reth::cli", %tip, "Tip manually set"); - Some(tip_rx) - } - None => None, - }; - // configure the payload builder let payload_generator = BasicPayloadJobGenerator::new( blockchain_db.clone(), @@ -339,6 +301,20 @@ impl Command { let pipeline_events = pipeline.events(); + let initial_target = if let Some(tip) = self.debug.tip { + // Set the provided tip as the initial pipeline target. + debug!(target: "reth::cli", %tip, "Tip manually set"); + Some(tip) + } else if self.debug.continuous { + // Set genesis as the initial pipeline target. + // This will allow the downloader to start + debug!(target: "reth::cli", "Continuous sync mode enabled"); + Some(genesis_hash) + } else { + None + }; + + // Configure the consensus engine let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( Arc::clone(&db), client, @@ -349,6 +325,7 @@ impl Command { self.debug.max_block, self.debug.continuous, payload_builder.clone(), + initial_target, consensus_engine_tx, consensus_engine_rx, ); diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index f8ad240a5e..f5fc9fca86 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -191,6 +191,7 @@ where max_block: Option, run_pipeline_continuously: bool, payload_builder: PayloadBuilderHandle, + target: Option, ) -> (Self, BeaconConsensusEngineHandle) { let (to_engine, rx) = mpsc::unbounded_channel(); Self::with_channel( @@ -203,6 +204,7 @@ where max_block, run_pipeline_continuously, payload_builder, + target, to_engine, rx, ) @@ -221,6 +223,7 @@ where max_block: Option, run_pipeline_continuously: bool, payload_builder: PayloadBuilderHandle, + target: Option, to_engine: UnboundedSender, rx: UnboundedReceiver, ) -> (Self, BeaconConsensusEngineHandle) { @@ -232,7 +235,7 @@ where run_pipeline_continuously, max_block, ); - let this = Self { + let mut this = Self { db, sync, blockchain, @@ -246,6 +249,10 @@ where metrics: Metrics::default(), }; + if let Some(target) = target { + this.sync.set_pipeline_sync_target(target); + } + (this, handle) } @@ -765,7 +772,6 @@ where fn on_sync_event( &mut self, ev: EngineSyncEvent, - current_state: &ForkchoiceState, ) -> Option> { match ev { EngineSyncEvent::FetchedFullBlock(block) => { @@ -795,16 +801,30 @@ where EngineSyncEvent::PipelineFinished { result, reached_max_block } => { match result { Ok(ctrl) => { - if ctrl.is_unwind() { - self.sync.set_pipeline_sync_target(current_state.head_block_hash); - } else if reached_max_block { + if reached_max_block { // Terminate the sync early if it's reached the maximum user // configured block. return Some(Ok(())) } + let current_state = match self.forkchoice_state { + Some(state) => state, + None => { + // This is only possible if the node was run with `debug.tip` + // argument and without CL. + warn!(target: "consensus::engine", "No forkchoice state available"); + return None + } + }; + + if ctrl.is_unwind() { + // Attempt to sync to the head block after unwind. + self.sync.set_pipeline_sync_target(current_state.head_block_hash); + return None + } + // Update the state and hashes of the blockchain tree if possible. - match self.restore_tree_if_possible(*current_state) { + match self.restore_tree_if_possible(current_state) { Ok(_) => self.sync_state_updater.update_sync_state(SyncState::Idle), Err(error) => { error!(target: "consensus::engine", ?error, "Error restoring blockchain tree"); @@ -859,15 +879,9 @@ where } } - // Lookup the forkchoice state. We can't launch the pipeline without the tip. - let forkchoice_state = match &this.forkchoice_state { - Some(state) => *state, - None => return Poll::Pending, - }; - // poll sync controller while let Poll::Ready(sync_event) = this.sync.poll(cx) { - if let Some(res) = this.on_sync_event(sync_event, &forkchoice_state) { + if let Some(res) = this.on_sync_event(sync_event) { return Poll::Ready(res) } } @@ -1040,6 +1054,7 @@ mod tests { None, false, payload_builder, + None, ); (engine, TestEnv::new(db, tip_rx, handle))