diff --git a/bin/reth/src/lib.rs b/bin/reth/src/lib.rs index 79faa46225..d22105b050 100644 --- a/bin/reth/src/lib.rs +++ b/bin/reth/src/lib.rs @@ -20,3 +20,4 @@ pub mod runner; pub mod stage; pub mod test_eth_chain; pub mod test_vectors; +pub mod utils; diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index d9a03ad3d2..10a83e7c4f 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -6,6 +6,7 @@ use crate::{ dirs::{ConfigPath, DbPath, PlatformPath}, prometheus_exporter, runner::CliContext, + utils::get_single_header, }; use clap::{crate_version, Parser}; use eyre::Context; @@ -31,10 +32,10 @@ use reth_interfaces::{ sync::SyncStateUpdater, }; use reth_network::{ - error::NetworkError, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager, + error::NetworkError, FetchClient, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager, }; use reth_network_api::NetworkInfo; -use reth_primitives::{BlockNumber, ChainSpec, Head, H256}; +use reth_primitives::{BlockHashOrNumber, BlockNumber, ChainSpec, Head, H256}; use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase}; use reth_rpc_engine_api::{EngineApi, EngineApiHandle}; use reth_staged_sync::{ @@ -205,9 +206,18 @@ impl Command { task_executor: &TaskExecutor, ) -> eyre::Result<(Pipeline, impl SyncStateUpdater>, impl Stream)> { - // building network downloaders using the fetch client - let fetch_client = Arc::new(network.fetch_client().await?); + let fetch_client = network.fetch_client().await?; + let max_block = if let Some(block) = self.max_block { + Some(block) + } else if let Some(tip) = self.tip { + Some(self.lookup_or_fetch_tip(db.clone(), fetch_client.clone(), tip).await?) + } else { + None + }; + // TODO: remove Arc requirement from downloader builders. + // building network downloaders using the fetch client + let fetch_client = Arc::new(fetch_client); let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers) .build(fetch_client.clone(), consensus.clone()) .into_task_with(task_executor); @@ -217,7 +227,14 @@ impl Command { .into_task_with(task_executor); let mut pipeline = self - .build_pipeline(config, header_downloader, body_downloader, network.clone(), consensus) + .build_pipeline( + config, + header_downloader, + body_downloader, + network.clone(), + consensus, + max_block, + ) .await?; let events = stream_select( @@ -316,7 +333,7 @@ impl Command { Ok(handle) } - fn fetch_head(&self, db: Arc>) -> Result { + fn lookup_head(&self, db: Arc>) -> Result { db.view(|tx| { let head = FINISH.get_progress(tx)?.unwrap_or_default(); let header = tx @@ -339,13 +356,42 @@ impl Command { .map_err(Into::into) } + /// Attempt to look up the block number for the tip hash in the database. + /// If it doesn't exist, download the header and return the block number. + /// + /// NOTE: The download is attempted with infinite retries. + async fn lookup_or_fetch_tip( + &self, + db: Arc>, + fetch_client: FetchClient, + tip: H256, + ) -> Result { + if let Some(number) = db.view(|tx| tx.get::(tip))?? { + debug!(target: "reth::cli", ?tip, number, "Successfully looked up tip in the database"); + return Ok(number) + } + + debug!(target: "reth::cli", ?tip, "Fetching tip header from the network."); + loop { + match get_single_header(fetch_client.clone(), BlockHashOrNumber::Hash(tip)).await { + Ok(tip_header) => { + debug!(target: "reth::cli", ?tip, number = tip_header.number, "Successfully fetched tip"); + return Ok(tip_header.number) + } + Err(error) => { + error!(target: "reth::cli", %error, "Failed to fetch the tip. Retrying..."); + } + } + } + } + fn load_network_config( &self, config: &Config, db: Arc>, executor: TaskExecutor, ) -> NetworkConfig>>> { - let head = self.fetch_head(Arc::clone(&db)).expect("the head block is missing"); + let head = self.lookup_head(Arc::clone(&db)).expect("the head block is missing"); self.network .network_config(config, self.chain.clone()) @@ -361,6 +407,7 @@ impl Command { body_downloader: B, updater: U, consensus: &Arc, + max_block: Option, ) -> eyre::Result, U>> where H: HeaderDownloader + 'static, @@ -371,7 +418,7 @@ impl Command { let mut builder = Pipeline::builder(); - if let Some(max_block) = self.max_block { + if let Some(max_block) = max_block { debug!(target: "reth::cli", max_block, "Configuring builder to use max block"); builder = builder.with_max_block(max_block) } diff --git a/bin/reth/src/p2p/mod.rs b/bin/reth/src/p2p/mod.rs index f340f851b9..d8b27777ba 100644 --- a/bin/reth/src/p2p/mod.rs +++ b/bin/reth/src/p2p/mod.rs @@ -2,17 +2,14 @@ use crate::{ args::DiscoveryArgs, dirs::{ConfigPath, PlatformPath}, + utils::get_single_header, }; use backon::{ConstantBuilder, Retryable}; use clap::{Parser, Subcommand}; use reth_db::mdbx::{Env, EnvKind, WriteMap}; use reth_discv4::NatResolver; -use reth_interfaces::p2p::{ - bodies::client::BodiesClient, - headers::client::{HeadersClient, HeadersRequest}, -}; -use reth_network::FetchClient; -use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord, SealedHeader}; +use reth_interfaces::p2p::bodies::client::BodiesClient; +use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord}; use reth_provider::ShareableDatabase; use reth_staged_sync::{ utils::{chainspec::chain_spec_value_parser, hash_or_num_value_parser}, @@ -117,7 +114,7 @@ impl Command { match self.command { Subcommands::Header { id } => { - let header = (move || self.get_single_header(fetch_client.clone(), id)) + let header = (move || get_single_header(fetch_client.clone(), id)) .retry(&backoff) .notify(|err, _| println!("Error requesting header: {err}. Retrying...")) .await?; @@ -130,10 +127,7 @@ impl Command { println!("Block number provided. Downloading header first..."); let client = fetch_client.clone(); let header = (move || { - self.get_single_header( - client.clone(), - BlockHashOrNumber::Number(number), - ) + get_single_header(client.clone(), BlockHashOrNumber::Number(number)) }) .retry(&backoff) .notify(|err, _| println!("Error requesting header: {err}. Retrying...")) @@ -162,43 +156,4 @@ impl Command { Ok(()) } - - /// Get a single header from network - pub async fn get_single_header( - &self, - client: FetchClient, - id: BlockHashOrNumber, - ) -> eyre::Result { - let request = HeadersRequest { - direction: reth_primitives::HeadersDirection::Rising, - limit: 1, - start: id, - }; - - let (_, response) = client.get_headers(request).await?.split(); - - if response.len() != 1 { - eyre::bail!( - "Invalid number of headers received. Expected: 1. Received: {}", - response.len() - ) - } - - let header = response.into_iter().next().unwrap().seal_slow(); - - let valid = match id { - BlockHashOrNumber::Hash(hash) => header.hash() == hash, - BlockHashOrNumber::Number(number) => header.number == number, - }; - - if !valid { - eyre::bail!( - "Received invalid header. Received: {:?}. Expected: {:?}", - header.num_hash(), - id - ); - } - - Ok(header) - } } diff --git a/bin/reth/src/utils.rs b/bin/reth/src/utils.rs new file mode 100644 index 0000000000..e163afa71c --- /dev/null +++ b/bin/reth/src/utils.rs @@ -0,0 +1,43 @@ +//! Common CLI utility functions. + +use reth_interfaces::p2p::{ + download::DownloadClient, + headers::client::{HeadersClient, HeadersRequest}, + priority::Priority, +}; +use reth_network::FetchClient; +use reth_primitives::{BlockHashOrNumber, HeadersDirection, SealedHeader}; + +/// Get a single header from network +pub async fn get_single_header( + client: FetchClient, + id: BlockHashOrNumber, +) -> eyre::Result { + let request = HeadersRequest { direction: HeadersDirection::Rising, limit: 1, start: id }; + + let (peer_id, response) = + client.get_headers_with_priority(request, Priority::High).await?.split(); + + if response.len() != 1 { + client.report_bad_message(peer_id); + eyre::bail!("Invalid number of headers received. Expected: 1. Received: {}", response.len()) + } + + let header = response.into_iter().next().unwrap().seal_slow(); + + let valid = match id { + BlockHashOrNumber::Hash(hash) => header.hash() == hash, + BlockHashOrNumber::Number(number) => header.number == number, + }; + + if !valid { + client.report_bad_message(peer_id); + eyre::bail!( + "Received invalid header. Received: {:?}. Expected: {:?}", + header.num_hash(), + id + ); + } + + Ok(header) +} diff --git a/crates/stages/src/pipeline/ctrl.rs b/crates/stages/src/pipeline/ctrl.rs index 5fd1259cfe..2223527f46 100644 --- a/crates/stages/src/pipeline/ctrl.rs +++ b/crates/stages/src/pipeline/ctrl.rs @@ -15,11 +15,14 @@ pub(crate) enum ControlFlow { /// The progress of the last stage progress: BlockNumber, }, - NoProgress, + NoProgress { + /// The current stage progress. + stage_progress: Option, + }, } impl ControlFlow { pub(crate) fn should_continue(&self) -> bool { - matches!(self, ControlFlow::Continue { .. } | ControlFlow::NoProgress) + matches!(self, ControlFlow::Continue { .. } | ControlFlow::NoProgress { .. }) } } diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 797c9b3363..88712737cd 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -145,6 +145,13 @@ impl Pipeline { .zip(self.max_block) .map_or(false, |(progress, target)| progress >= target) { + trace!( + target: "sync::pipeline", + ?next_action, + minimum_progress = ?self.progress.minimum_progress, + max_block = ?self.max_block, + "Terminating pipeline." + ); return Ok(()) } } @@ -168,18 +175,18 @@ impl Pipeline { updater.update_sync_state(state); } - trace!( - target: "sync::pipeline", - stage = %stage_id, - "Executing stage" - ); + trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage"); let next = self .execute_stage_to_completion(db, previous_stage, stage_index) .instrument(info_span!("execute", stage = %stage_id)) .await?; match next { - ControlFlow::NoProgress => {} // noop + ControlFlow::NoProgress { stage_progress } => { + if let Some(progress) = stage_progress { + self.progress.update(progress); + } + } ControlFlow::Continue { progress } => self.progress.update(progress), ControlFlow::Unwind { target, bad_block } => { // reset the sync state @@ -277,7 +284,7 @@ impl Pipeline { self.listeners.notify(PipelineEvent::Skipped { stage_id }); // We reached the maximum block, so we skip the stage - return Ok(ControlFlow::NoProgress) + return Ok(ControlFlow::NoProgress { stage_progress: prev_progress }) } self.listeners @@ -308,7 +315,7 @@ impl Pipeline { return Ok(if made_progress { ControlFlow::Continue { progress: stage_progress } } else { - ControlFlow::NoProgress + ControlFlow::NoProgress { stage_progress: Some(stage_progress) } }) } } @@ -405,7 +412,7 @@ impl PipelineProgress { fn next_ctrl(&self) -> ControlFlow { match self.progress { Some(progress) => ControlFlow::Continue { progress }, - None => ControlFlow::NoProgress, + None => ControlFlow::NoProgress { stage_progress: None }, } } } @@ -458,7 +465,7 @@ mod tests { fn progress_ctrl_flow() { let mut progress = PipelineProgress::default(); - assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress); + assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { stage_progress: None }); progress.update(1); assert_eq!(progress.next_ctrl(), ControlFlow::Continue { progress: 1 });