From 25ac1370fdaaa0ce3ff4d803d44937d790084244 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Sun, 19 Mar 2023 15:59:37 +0200 Subject: [PATCH] refactor(sync): header sync mode (#1831) --- Cargo.lock | 1 - bin/reth/src/chain/import.rs | 28 ++-- bin/reth/src/node/mod.rs | 94 +++++------ bin/reth/src/stage/mod.rs | 2 +- crates/consensus/beacon/Cargo.toml | 3 - .../consensus/beacon/src/beacon_consensus.rs | 28 +--- crates/consensus/beacon/src/builder.rs | 22 --- crates/consensus/beacon/src/lib.rs | 3 - crates/interfaces/src/consensus.rs | 4 - crates/interfaces/src/test_utils/headers.rs | 23 +-- crates/stages/src/lib.rs | 8 +- crates/stages/src/pipeline/builder.rs | 9 +- crates/stages/src/pipeline/mod.rs | 10 +- crates/stages/src/sets.rs | 25 ++- crates/stages/src/stages/headers.rs | 147 +++++++----------- 15 files changed, 155 insertions(+), 252 deletions(-) delete mode 100644 crates/consensus/beacon/src/builder.rs diff --git a/Cargo.lock b/Cargo.lock index bbd333abd0..4ad04cc2b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4583,7 +4583,6 @@ dependencies = [ "reth-consensus-common", "reth-interfaces", "reth-primitives", - "tokio", ] [[package]] diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index 2a6ad2490d..4cd314e227 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -12,11 +12,9 @@ use reth_downloaders::{ headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient, }; use reth_interfaces::{ - consensus::{Consensus, ForkchoiceState}, - p2p::headers::client::NoopStatusUpdater, - sync::SyncStateUpdater, + consensus::Consensus, p2p::headers::client::NoopStatusUpdater, sync::SyncStateUpdater, }; -use reth_primitives::ChainSpec; +use reth_primitives::{ChainSpec, H256}; use reth_staged_sync::{ utils::{ chainspec::genesis_value_parser, @@ -26,9 +24,10 @@ use reth_staged_sync::{ }; use reth_stages::{ prelude::*, - stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage}, + stages::{ExecutionStage, HeaderSyncMode, SenderRecoveryStage, TotalDifficultyStage}, }; use std::sync::Arc; +use tokio::sync::watch; use tracing::{debug, info}; /// Syncs RLP encoded blocks from a file. @@ -89,6 +88,9 @@ impl ImportCommand { init_genesis(db.clone(), self.chain.clone())?; + let consensus = Arc::new(BeaconConsensus::new(self.chain.clone())); + info!(target: "reth::cli", "Consensus engine initialized"); + // create a new FileClient info!(target: "reth::cli", "Importing chain file"); let file_client = Arc::new(FileClient::new(&self.path).await?); @@ -97,18 +99,13 @@ impl ImportCommand { let tip = file_client.tip().expect("file client has no tip"); info!(target: "reth::cli", "Chain file imported"); - let (consensus, notifier) = BeaconConsensus::builder().build(self.chain.clone()); - debug!(target: "reth::cli", %tip, "Tip manually set"); - notifier.send(ForkchoiceState { - head_block_hash: tip, - safe_block_hash: tip, - finalized_block_hash: tip, - })?; - info!(target: "reth::cli", "Consensus engine initialized"); - let (mut pipeline, events) = self.build_import_pipeline(config, db.clone(), &consensus, file_client).await?; + // override the tip + pipeline.set_tip(tip); + debug!(target: "reth::cli", %tip, "Tip manually set"); + tokio::spawn(handle_events(None, events)); // Run pipeline @@ -140,12 +137,15 @@ impl ImportCommand { .build(file_client.clone(), consensus.clone(), db) .into_task(); + let (tip_tx, tip_rx) = watch::channel(H256::zero()); let factory = reth_executor::Factory::new(self.chain.clone()); let mut pipeline = Pipeline::builder() + .with_tip_sender(tip_tx) .with_sync_state_updater(file_client) .add_stages( DefaultStages::new( + HeaderSyncMode::Tip(tip_rx), consensus.clone(), header_downloader, body_downloader, diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 225676bd7f..4a3e426a96 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -50,7 +50,7 @@ use reth_staged_sync::{ }; use reth_stages::{ prelude::*, - stages::{ExecutionStage, HeaderStage, SenderRecoveryStage, TotalDifficultyStage, FINISH}, + stages::{ExecutionStage, HeaderSyncMode, SenderRecoveryStage, TotalDifficultyStage, FINISH}, }; use reth_tasks::TaskExecutor; use std::{ @@ -154,7 +154,7 @@ impl Command { init_genesis(db.clone(), self.chain.clone())?; - let (consensus, forkchoice_state_tx) = self.init_consensus()?; + let consensus = Arc::new(BeaconConsensus::new(self.chain.clone())) as Arc; info!(target: "reth::cli", "Consensus engine initialized"); self.init_trusted_nodes(&mut config); @@ -183,8 +183,9 @@ impl Command { info!(target: "reth::cli", "Continuous sync mode enabled"); } - let engine_api_handle = - self.init_engine_api(Arc::clone(&db), forkchoice_state_tx, &ctx.task_executor); + // TODO: This will be fixed with the sync controller (https://github.com/paradigmxyz/reth/pull/1662) + let (tx, _rx) = watch::channel(ForkchoiceState::default()); + let engine_api_handle = self.init_engine_api(Arc::clone(&db), tx, &ctx.task_executor); info!(target: "reth::cli", "Engine API handler initialized"); let _auth_server = self @@ -209,6 +210,16 @@ impl Command { ) .await?; + if let Some(tip) = self.tip { + pipeline.set_tip(tip); + debug!(target: "reth::cli", %tip, "Tip manually set"); + } else { + let warn_msg = "No tip specified. \ + reth cannot communicate with consensus clients, \ + so a tip must manually be provided for the online stages with --debug.tip ."; + warn!(target: "reth::cli", warn_msg); + } + ctx.task_executor.spawn(events::handle_events(Some(network.clone()), events)); // Run pipeline @@ -304,26 +315,6 @@ impl Command { } } - fn init_consensus(&self) -> eyre::Result<(Arc, watch::Sender)> { - let (consensus, notifier) = BeaconConsensus::builder().build(self.chain.clone()); - - if let Some(tip) = self.tip { - debug!(target: "reth::cli", %tip, "Tip manually set"); - notifier.send(ForkchoiceState { - head_block_hash: tip, - safe_block_hash: tip, - finalized_block_hash: tip, - })?; - } else { - let warn_msg = "No tip specified. \ - reth cannot communicate with consensus clients, \ - so a tip must manually be provided for the online stages with --debug.tip ."; - warn!(target: "reth::cli", warn_msg); - } - - Ok((consensus, notifier)) - } - fn init_engine_api( &self, db: Arc>, @@ -486,44 +477,31 @@ impl Command { builder = builder.with_max_block(max_block) } + let (tip_tx, tip_rx) = watch::channel(H256::zero()); let factory = reth_executor::Factory::new(self.chain.clone()); - let default_stages = if continuous { - let continuous_headers = - HeaderStage::new(header_downloader, consensus.clone()).continuous(); - let online_builder = OnlineStages::builder_with_headers( - continuous_headers, - consensus.clone(), - body_downloader, - ); - DefaultStages::::add_offline_stages( - online_builder, - updater.clone(), - factory.clone(), - ) - } else { - DefaultStages::new( - consensus.clone(), - header_downloader, - body_downloader, - updater.clone(), - factory.clone(), - ) - .builder() - }; - + let header_mode = + if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; let pipeline = builder - .with_sync_state_updater(updater) + .with_sync_state_updater(updater.clone()) + .with_tip_sender(tip_tx) .add_stages( - default_stages - .set( - TotalDifficultyStage::new(consensus.clone()) - .with_commit_threshold(stage_conf.total_difficulty.commit_threshold), - ) - .set(SenderRecoveryStage { - commit_threshold: stage_conf.sender_recovery.commit_threshold, - }) - .set(ExecutionStage::new(factory, stage_conf.execution.commit_threshold)), + DefaultStages::new( + header_mode, + consensus.clone(), + header_downloader, + body_downloader, + updater, + factory.clone(), + ) + .set( + TotalDifficultyStage::new(consensus.clone()) + .with_commit_threshold(stage_conf.total_difficulty.commit_threshold), + ) + .set(SenderRecoveryStage { + commit_threshold: stage_conf.sender_recovery.commit_threshold, + }) + .set(ExecutionStage::new(factory, stage_conf.execution.commit_threshold)), ) .build(); diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index ca0c4fb366..b57bb7f454 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -123,7 +123,7 @@ impl Command { match self.stage { StageEnum::Bodies => { - let (consensus, _) = BeaconConsensus::builder().build(self.chain.clone()); + let consensus = Arc::new(BeaconConsensus::new(self.chain.clone())); let mut config = config; config.peers.connect_trusted_nodes_only = self.network.trusted_only; diff --git a/crates/consensus/beacon/Cargo.toml b/crates/consensus/beacon/Cargo.toml index 90a304372e..1ff30c99f1 100644 --- a/crates/consensus/beacon/Cargo.toml +++ b/crates/consensus/beacon/Cargo.toml @@ -12,8 +12,5 @@ reth-consensus-common = { path = "../common" } reth-primitives = { path = "../../primitives" } reth-interfaces = { path = "../../interfaces" } -# async -tokio = { version = "1", features = ["sync"] } - [dev-dependencies] reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } diff --git a/crates/consensus/beacon/src/beacon_consensus.rs b/crates/consensus/beacon/src/beacon_consensus.rs index 5f7d226023..2018d32025 100644 --- a/crates/consensus/beacon/src/beacon_consensus.rs +++ b/crates/consensus/beacon/src/beacon_consensus.rs @@ -1,44 +1,26 @@ //! Consensus for ethereum network use reth_consensus_common::validation; -use reth_interfaces::consensus::{Consensus, ConsensusError, ForkchoiceState}; +use reth_interfaces::consensus::{Consensus, ConsensusError}; use reth_primitives::{ChainSpec, Hardfork, SealedBlock, SealedHeader, EMPTY_OMMER_ROOT, U256}; use std::sync::Arc; -use tokio::sync::watch; - -use super::BeaconConsensusBuilder; /// Ethereum beacon consensus /// -/// This consensus engine does basic checks as outlined in the execution specs, -/// but otherwise defers consensus on what the current chain is to a consensus client. +/// This consensus engine does basic checks as outlined in the execution specs. #[derive(Debug)] pub struct BeaconConsensus { - /// Watcher over the forkchoice state - forkchoice_state_rx: watch::Receiver, /// Configuration chain_spec: Arc, } impl BeaconConsensus { /// Create a new instance of [BeaconConsensus] - pub fn new( - chain_spec: Arc, - forkchoice_state_rx: watch::Receiver, - ) -> Self { - Self { chain_spec, forkchoice_state_rx } - } - - /// Create new [BeaconConsensusBuilder]. - pub fn builder() -> BeaconConsensusBuilder { - BeaconConsensusBuilder::default() + pub fn new(chain_spec: Arc) -> Self { + Self { chain_spec } } } impl Consensus for BeaconConsensus { - fn fork_choice_state(&self) -> watch::Receiver { - self.forkchoice_state_rx.clone() - } - fn pre_validate_header( &self, header: &SealedHeader, @@ -101,7 +83,7 @@ mod test { #[test] fn test_has_block_reward_before_paris() { let chain_spec = Arc::new(ChainSpecBuilder::mainnet().build()); - let (consensus, _) = BeaconConsensus::builder().build(chain_spec); + let consensus = BeaconConsensus::new(chain_spec); assert!(consensus.has_block_reward(U256::ZERO, U256::ZERO)); } } diff --git a/crates/consensus/beacon/src/builder.rs b/crates/consensus/beacon/src/builder.rs deleted file mode 100644 index 3636e8f95a..0000000000 --- a/crates/consensus/beacon/src/builder.rs +++ /dev/null @@ -1,22 +0,0 @@ -use super::BeaconConsensus; -use reth_interfaces::consensus::ForkchoiceState; -use reth_primitives::ChainSpec; -use std::sync::Arc; -use tokio::sync::watch; - -/// TODO: -#[derive(Debug, Default)] -pub struct BeaconConsensusBuilder; - -impl BeaconConsensusBuilder { - /// Create new instance of [BeaconConsensus] and forkchoice notifier. Internally, creates a - /// [watch::channel] for updating the forkchoice state. - pub fn build( - self, - chain_spec: Arc, - ) -> (Arc, watch::Sender) { - let (forkchoice_state_tx, forkchoice_state_rx) = watch::channel(ForkchoiceState::default()); - let inner = Arc::new(BeaconConsensus::new(chain_spec, forkchoice_state_rx)); - (inner, forkchoice_state_tx) - } -} diff --git a/crates/consensus/beacon/src/lib.rs b/crates/consensus/beacon/src/lib.rs index 5ecb1a1e05..04d5af3287 100644 --- a/crates/consensus/beacon/src/lib.rs +++ b/crates/consensus/beacon/src/lib.rs @@ -7,7 +7,4 @@ //! Beacon consensus implementation. mod beacon_consensus; -mod builder; - pub use beacon_consensus::BeaconConsensus; -pub use builder::BeaconConsensusBuilder; diff --git a/crates/interfaces/src/consensus.rs b/crates/interfaces/src/consensus.rs index 6886da85e4..5f232b67f3 100644 --- a/crates/interfaces/src/consensus.rs +++ b/crates/interfaces/src/consensus.rs @@ -3,7 +3,6 @@ use reth_primitives::{ BlockHash, BlockNumber, InvalidTransactionError, SealedBlock, SealedHeader, H256, U256, }; use std::fmt::Debug; -use tokio::sync::watch::Receiver; /// Re-export fork choice state pub use reth_rpc_types::engine::ForkchoiceState; @@ -12,9 +11,6 @@ pub use reth_rpc_types::engine::ForkchoiceState; #[async_trait] #[auto_impl::auto_impl(&, Arc)] pub trait Consensus: Debug + Send + Sync { - /// Get a receiver for the fork choice state - fn fork_choice_state(&self) -> Receiver; - /// Validate if the header is correct and follows consensus specification. /// /// This is called before properties that are not in the header itself (like total difficulty) diff --git a/crates/interfaces/src/test_utils/headers.rs b/crates/interfaces/src/test_utils/headers.rs index d4f7fce563..ccb8bcd59b 100644 --- a/crates/interfaces/src/test_utils/headers.rs +++ b/crates/interfaces/src/test_utils/headers.rs @@ -259,22 +259,13 @@ impl HeadersClient for TestHeadersClient { /// Consensus engine implementation for testing #[derive(Debug)] pub struct TestConsensus { - /// Watcher over the forkchoice state - channel: (watch::Sender, watch::Receiver), /// Flag whether the header validation should purposefully fail fail_validation: AtomicBool, } impl Default for TestConsensus { fn default() -> Self { - Self { - channel: watch::channel(ForkchoiceState { - head_block_hash: H256::zero(), - finalized_block_hash: H256::zero(), - safe_block_hash: H256::zero(), - }), - fail_validation: AtomicBool::new(false), - } + Self { fail_validation: AtomicBool::new(false) } } } @@ -288,14 +279,6 @@ impl TestConsensus { pub fn set_fail_validation(&self, val: bool) { self.fail_validation.store(val, Ordering::SeqCst) } - - /// Update the forkchoice state. - pub fn notify_fork_choice_state( - &self, - state: ForkchoiceState, - ) -> Result<(), SendError> { - self.channel.0.send(state) - } } /// Status updater for testing. @@ -323,10 +306,6 @@ impl StatusUpdater for TestStatusUpdater { #[async_trait::async_trait] impl Consensus for TestConsensus { - fn fork_choice_state(&self) -> watch::Receiver { - self.channel.1.clone() - } - fn pre_validate_header( &self, header: &SealedHeader, diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 562d61665a..dbc43f486a 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -28,9 +28,11 @@ //! # use reth_interfaces::sync::NoopSyncStateUpdate; //! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient, TestStatusUpdater}; //! # use reth_executor::Factory; -//! # use reth_primitives::{PeerId,MAINNET}; +//! # use reth_primitives::{PeerId, MAINNET, H256}; //! # use reth_stages::Pipeline; //! # use reth_stages::sets::DefaultStages; +//! # use reth_stages::stages::HeaderSyncMode; +//! # use tokio::sync::watch; //! # let consensus: Arc = Arc::new(TestConsensus::default()); //! # let headers_downloader = ReverseHeadersDownloaderBuilder::default().build( //! # Arc::new(TestHeadersClient::default()), @@ -41,13 +43,15 @@ //! # consensus.clone(), //! # create_test_rw_db() //! # ); +//! # let (tip_tx, tip_rx) = watch::channel(H256::default()); //! # let factory = Factory::new(Arc::new(MAINNET.clone())); //! # let (status_updater, _) = TestStatusUpdater::new(); //! // Create a pipeline that can fully sync //! # let pipeline: Pipeline, NoopSyncStateUpdate> = //! Pipeline::builder() +//! .with_tip_sender(tip_tx) //! .add_stages( -//! DefaultStages::new(consensus, headers_downloader, bodies_downloader, status_updater, factory) +//! DefaultStages::new(HeaderSyncMode::Tip(tip_rx), consensus, headers_downloader, bodies_downloader, status_updater, factory) //! ) //! .build(); //! ``` diff --git a/crates/stages/src/pipeline/builder.rs b/crates/stages/src/pipeline/builder.rs index 4882dbdf0f..37853708cd 100644 --- a/crates/stages/src/pipeline/builder.rs +++ b/crates/stages/src/pipeline/builder.rs @@ -1,7 +1,8 @@ use crate::{Pipeline, Stage, StageSet}; use reth_db::database::Database; use reth_interfaces::sync::{NoopSyncStateUpdate, SyncStateUpdater}; -use reth_primitives::BlockNumber; +use reth_primitives::{BlockNumber, H256}; +use tokio::sync::watch; /// Builds a [`Pipeline`]. #[derive(Debug)] @@ -56,6 +57,12 @@ where self } + /// Set the tip sender. + pub fn with_tip_sender(mut self, tip_tx: watch::Sender) -> Self { + self.pipeline.tip_tx = Some(tip_tx); + self + } + /// Set a [SyncStateUpdater]. pub fn with_sync_state_updater(mut self, updater: U) -> Self { self.pipeline.sync_state_updater = Some(updater); diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index b8db114629..65521a0b98 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -3,7 +3,7 @@ use metrics::Gauge; use reth_db::database::Database; use reth_interfaces::sync::{SyncState, SyncStateUpdater}; use reth_metrics_derive::Metrics; -use reth_primitives::BlockNumber; +use reth_primitives::{BlockNumber, H256}; use reth_provider::Transaction; use std::{ collections::HashMap, @@ -11,6 +11,7 @@ use std::{ ops::Deref, sync::Arc, }; +use tokio::sync::watch; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; @@ -80,6 +81,7 @@ pub struct Pipeline { listeners: PipelineEventListeners, sync_state_updater: Option, progress: PipelineProgress, + tip_tx: Option>, metrics: Metrics, } @@ -91,6 +93,7 @@ impl Default for Pipeline { listeners: PipelineEventListeners::default(), sync_state_updater: None, progress: PipelineProgress::default(), + tip_tx: None, metrics: Metrics::default(), } } @@ -111,6 +114,11 @@ impl Pipeline { PipelineBuilder::default() } + /// Set tip for reverse sync. + pub fn set_tip(&self, tip: H256) { + self.tip_tx.as_ref().expect("tip sender is set").send(tip).expect("tip channel closed"); + } + /// Listen for events on the pipeline. pub fn events(&mut self) -> UnboundedReceiverStream { self.listeners.new_listener() diff --git a/crates/stages/src/sets.rs b/crates/stages/src/sets.rs index 501cd0fdde..248c34dd5d 100644 --- a/crates/stages/src/sets.rs +++ b/crates/stages/src/sets.rs @@ -40,7 +40,7 @@ //! ``` use crate::{ stages::{ - AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage, + AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage, HeaderSyncMode, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, TotalDifficultyStage, TransactionLookupStage, }, @@ -77,6 +77,7 @@ pub struct DefaultStages { impl DefaultStages { /// Create a new set of default stages with default values. pub fn new( + header_mode: HeaderSyncMode, consensus: Arc, header_downloader: H, body_downloader: B, @@ -87,7 +88,7 @@ impl DefaultStages { EF: ExecutorFactory, { Self { - online: OnlineStages::new(consensus, header_downloader, body_downloader), + online: OnlineStages::new(header_mode, consensus, header_downloader, body_downloader), executor_factory, status_updater, } @@ -130,6 +131,8 @@ where /// themselves offline. #[derive(Debug)] pub struct OnlineStages { + /// The sync mode for the headers stage. + header_mode: HeaderSyncMode, /// The consensus engine used to validate incoming data. consensus: Arc, /// The block header downloader @@ -140,8 +143,13 @@ pub struct OnlineStages { impl OnlineStages { /// Create a new set of online stages with default values. - pub fn new(consensus: Arc, header_downloader: H, body_downloader: B) -> Self { - Self { consensus, header_downloader, body_downloader } + pub fn new( + header_mode: HeaderSyncMode, + consensus: Arc, + header_downloader: H, + body_downloader: B, + ) -> Self { + Self { header_mode, consensus, header_downloader, body_downloader } } } @@ -153,8 +161,8 @@ where /// Create a new builder using the given headers stage. pub fn builder_with_headers( headers: HeaderStage, - consensus: Arc, body_downloader: B, + consensus: Arc, ) -> StageSetBuilder { StageSetBuilder::default() .add_stage(headers) @@ -165,11 +173,12 @@ where /// Create a new builder using the given bodies stage. pub fn builder_with_bodies( bodies: BodyStage, - consensus: Arc, + mode: HeaderSyncMode, header_downloader: H, + consensus: Arc, ) -> StageSetBuilder { StageSetBuilder::default() - .add_stage(HeaderStage::new(header_downloader, consensus.clone())) + .add_stage(HeaderStage::new(header_downloader, mode)) .add_stage(TotalDifficultyStage::new(consensus.clone())) .add_stage(bodies) } @@ -183,7 +192,7 @@ where { fn builder(self) -> StageSetBuilder { StageSetBuilder::default() - .add_stage(HeaderStage::new(self.header_downloader, self.consensus.clone())) + .add_stage(HeaderStage::new(self.header_downloader, self.header_mode)) .add_stage(TotalDifficultyStage::new(self.consensus.clone())) .add_stage(BodyStage { downloader: self.body_downloader, consensus: self.consensus }) } diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index b33ae869a8..a628069f85 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -7,18 +7,28 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, }; use reth_interfaces::{ - consensus::{Consensus, ForkchoiceState}, p2p::headers::downloader::{HeaderDownloader, SyncTarget}, provider::ProviderError, }; -use reth_primitives::{BlockHashOrNumber, BlockNumber, SealedHeader}; +use reth_primitives::{BlockHashOrNumber, BlockNumber, SealedHeader, H256}; use reth_provider::Transaction; -use std::sync::Arc; +use tokio::sync::watch; use tracing::*; /// The [`StageId`] of the headers downloader stage. pub const HEADERS: StageId = StageId("Headers"); +/// The header sync mode. +#[derive(Debug)] +pub enum HeaderSyncMode { + /// A sync mode in which the stage continuously requests the downloader for + /// next blocks. + Continuous, + /// A sync mode in which the stage polls the receiver for the next tip + /// to download from. + Tip(watch::Receiver), +} + /// The headers stage. /// /// The headers stage downloads all block headers from the highest block in the local database to @@ -36,10 +46,8 @@ pub const HEADERS: StageId = StageId("Headers"); pub struct HeaderStage { /// Strategy for downloading the headers downloader: D, - /// Consensus client implementation - consensus: Arc, - /// Whether or not the stage should download continuously, or wait for the fork choice state - continuous: bool, + /// The sync mode for the stage. + mode: HeaderSyncMode, } // === impl HeaderStage === @@ -49,8 +57,8 @@ where D: HeaderDownloader, { /// Create a new header stage - pub fn new(downloader: D, consensus: Arc) -> Self { - Self { downloader, consensus, continuous: false } + pub fn new(downloader: D, mode: HeaderSyncMode) -> Self { + Self { downloader, mode } } fn is_stage_done( @@ -66,17 +74,11 @@ where Ok(header_cursor.next()?.map(|(next_num, _)| head_num + 1 == next_num).unwrap_or_default()) } - /// Set the stage to download continuously - pub fn continuous(mut self) -> Self { - self.continuous = true; - self - } - /// Get the head and tip of the range we need to sync /// /// See also [SyncTarget] async fn get_sync_gap( - &self, + &mut self, tx: &Transaction<'_, DB>, stage_progress: u64, ) -> Result { @@ -112,28 +114,27 @@ where // reverse from there. Else, it should use whatever the forkchoice state reports. let target = match next_header { Some(header) if stage_progress + 1 != header.number => SyncTarget::Gap(header), - None => { - if self.continuous { - tracing::trace!(target: "sync::stages::headers", ?head_num, "No next header found, using continuous sync strategy"); - SyncTarget::TipNum(head_num + 1) - } else { - SyncTarget::Tip(self.next_fork_choice_state().await.head_block_hash) - } - } + None => self.next_sync_target(head_num).await, _ => return Err(StageError::StageProgress(stage_progress)), }; Ok(SyncGap { local_head, target }) } - /// Awaits the next [ForkchoiceState] message from [Consensus] with a non-zero block hash - async fn next_fork_choice_state(&self) -> ForkchoiceState { - let mut state_rcv = self.consensus.fork_choice_state(); - loop { - let _ = state_rcv.changed().await; - let forkchoice = state_rcv.borrow(); - if !forkchoice.head_block_hash.is_zero() { - return forkchoice.clone() + async fn next_sync_target(&mut self, head: BlockNumber) -> SyncTarget { + match self.mode { + HeaderSyncMode::Tip(ref mut rx) => { + loop { + let _ = rx.changed().await; // TODO: remove this await? + let tip = rx.borrow(); + if !tip.is_zero() { + return SyncTarget::Tip(*tip) + } + } + } + HeaderSyncMode::Continuous => { + tracing::trace!(target: "sync::stages::headers", head, "No next header found, using continuous sync strategy"); + SyncTarget::TipNum(head + 1) } } } @@ -285,35 +286,20 @@ mod tests { use test_runner::HeadersTestRunner; mod test_runner { - use crate::{ - stages::headers::HeaderStage, - test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, - UnwindStageTestRunner, - }, - ExecInput, ExecOutput, UnwindInput, - }; - use reth_db::{ - tables, - transaction::{DbTx, DbTxMut}, - }; + use super::*; + use crate::test_utils::{TestRunnerError, TestTransaction}; use reth_downloaders::headers::reverse_headers::{ ReverseHeadersDownloader, ReverseHeadersDownloaderBuilder, }; - use reth_interfaces::{ - consensus::ForkchoiceState, - p2p::headers::downloader::HeaderDownloader, - test_utils::{ - generators::{random_header, random_header_range}, - TestConsensus, TestHeaderDownloader, TestHeadersClient, - }, + use reth_interfaces::test_utils::{ + generators::random_header_range, TestConsensus, TestHeaderDownloader, TestHeadersClient, }; - use reth_primitives::{BlockNumber, SealedHeader, U256}; + use reth_primitives::U256; use std::sync::Arc; pub(crate) struct HeadersTestRunner { - pub(crate) consensus: Arc, pub(crate) client: Arc, + channel: (watch::Sender, watch::Receiver), downloader_factory: Box D + Send + Sync + 'static>, tx: TestTransaction, } @@ -321,12 +307,16 @@ mod tests { impl Default for HeadersTestRunner { fn default() -> Self { let client = Arc::new(TestHeadersClient::default()); - let consensus = Arc::new(TestConsensus::default()); Self { client: client.clone(), - consensus: consensus.clone(), + channel: watch::channel(H256::zero()), downloader_factory: Box::new(move || { - TestHeaderDownloader::new(client.clone(), consensus.clone(), 1000, 1000) + TestHeaderDownloader::new( + client.clone(), + Arc::new(TestConsensus::default()), + 1000, + 1000, + ) }), tx: TestTransaction::default(), } @@ -342,9 +332,8 @@ mod tests { fn stage(&self) -> Self::S { HeaderStage { - consensus: self.consensus.clone(), + mode: HeaderSyncMode::Tip(self.channel.1.clone()), downloader: (*self.downloader_factory)(), - continuous: false, } } } @@ -414,12 +403,7 @@ mod tests { self.tx.insert_headers(std::iter::once(&tip))?; tip.hash() }; - self.consensus - .notify_fork_choice_state(ForkchoiceState { - head_block_hash: tip, - ..Default::default() - }) - .expect("Setting tip failed"); + self.send_tip(tip); Ok(()) } } @@ -433,14 +417,13 @@ mod tests { impl HeadersTestRunner> { pub(crate) fn with_linear_downloader() -> Self { let client = Arc::new(TestHeadersClient::default()); - let consensus = Arc::new(TestConsensus::default()); Self { client: client.clone(), - consensus: consensus.clone(), + channel: watch::channel(H256::zero()), downloader_factory: Box::new(move || { ReverseHeadersDownloaderBuilder::default() .stream_batch_size(500) - .build(client.clone(), consensus.clone()) + .build(client.clone(), Arc::new(TestConsensus::default())) }), tx: TestTransaction::default(), } @@ -458,6 +441,10 @@ mod tests { self.tx.ensure_no_entry_above::(block, |key| key)?; Ok(()) } + + pub(crate) fn send_tip(&self, tip: H256) { + self.channel.0.send(tip).expect("failed to send tip"); + } } } @@ -479,13 +466,7 @@ mod tests { // skip `after_execution` hook for linear downloader let tip = headers.last().unwrap(); - runner - .consensus - .notify_fork_choice_state(ForkchoiceState { - head_block_hash: tip.hash(), - ..Default::default() - }) - .expect("Setting tip failed"); + runner.send_tip(tip.hash()); let result = rx.await.unwrap(); assert_matches!(result, Ok(ExecOutput { done: true, stage_progress }) if stage_progress == tip.number); @@ -497,16 +478,10 @@ mod tests { async fn head_and_tip_lookup() { let runner = HeadersTestRunner::default(); let tx = runner.tx().inner(); - let stage = runner.stage(); + let mut stage = runner.stage(); let consensus_tip = H256::random(); - runner - .consensus - .notify_fork_choice_state(ForkchoiceState { - head_block_hash: consensus_tip, - ..Default::default() - }) - .expect("Setting tip failed"); + runner.send_tip(consensus_tip); // Genesis let stage_progress = 0; @@ -570,13 +545,7 @@ mod tests { // skip `after_execution` hook for linear downloader let tip = headers.last().unwrap(); - runner - .consensus - .notify_fork_choice_state(ForkchoiceState { - head_block_hash: tip.hash(), - ..Default::default() - }) - .expect("Setting tip failed"); + runner.send_tip(tip.hash()); let result = rx.await.unwrap(); assert_matches!(result, Ok(ExecOutput { done: false, stage_progress: progress }) if progress == stage_progress);