From 5cd2148789f93a8561e8aecfdd4ea320c2344187 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Thu, 22 Jun 2023 15:02:36 +0300 Subject: [PATCH] chore(cli): display latest available block number in status logs (#3316) --- bin/reth/src/chain/import.rs | 12 +++++++++--- bin/reth/src/debug_cmd/execution.rs | 29 +++++++++++++++-------------- bin/reth/src/node/events.rs | 13 ++++++++----- bin/reth/src/node/mod.rs | 11 +++++++---- 4 files changed, 39 insertions(+), 26 deletions(-) diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index ca4904fccb..6a1a79e693 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -7,6 +7,7 @@ use clap::Parser; use eyre::Context; use futures::{Stream, StreamExt}; use reth_beacon_consensus::BeaconConsensus; +use reth_provider::{ProviderFactory, StageCheckpointReader}; use crate::args::utils::genesis_value_parser; use reth_config::Config; @@ -16,7 +17,7 @@ use reth_downloaders::{ headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient, }; use reth_interfaces::consensus::Consensus; -use reth_primitives::{ChainSpec, H256}; +use reth_primitives::{stage::StageId, ChainSpec, H256}; use reth_staged_sync::utils::init::{init_db, init_genesis}; use reth_stages::{ prelude::*, @@ -105,13 +106,18 @@ impl ImportCommand { info!(target: "reth::cli", "Chain file imported"); let (mut pipeline, events) = - self.build_import_pipeline(config, db, &consensus, file_client).await?; + self.build_import_pipeline(config, Arc::clone(&db), &consensus, file_client).await?; // override the tip pipeline.set_tip(tip); debug!(target: "reth::cli", ?tip, "Tip manually set"); - tokio::spawn(handle_events(None, events)); + let factory = ProviderFactory::new(&db, self.chain.clone()); + let provider = factory.provider().map_err(PipelineError::Interface)?; + + let latest_block_number = + provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number); + tokio::spawn(handle_events(None, latest_block_number, events)); // Run pipeline info!(target: "reth::cli", "Starting sync pipeline"); diff --git a/bin/reth/src/debug_cmd/execution.rs b/bin/reth/src/debug_cmd/execution.rs index 0865f5abbb..0533dd3daf 100644 --- a/bin/reth/src/debug_cmd/execution.rs +++ b/bin/reth/src/debug_cmd/execution.rs @@ -234,26 +234,27 @@ impl Command { &ctx.task_executor, )?; + let factory = ProviderFactory::new(&db, self.chain.clone()); + let provider = factory.provider().map_err(PipelineError::Interface)?; + + let latest_block_number = + provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number); + if latest_block_number.unwrap_or_default() >= self.to { + info!(target: "reth::cli", latest = latest_block_number, "Nothing to run"); + return Ok(()) + } + let pipeline_events = pipeline.events(); let events = stream_select( network.event_listener().map(Into::into), pipeline_events.map(Into::into), ); - ctx.task_executor - .spawn_critical("events task", events::handle_events(Some(network.clone()), events)); - - let factory = ProviderFactory::new(&db, self.chain.clone()); - let provider = factory.provider().map_err(PipelineError::Interface)?; - - let latest_block_number = - provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number; - if latest_block_number >= self.to { - info!(target: "reth::cli", latest = latest_block_number, "Nothing to run"); - return Ok(()) - } - - let mut current_max_block = latest_block_number; + ctx.task_executor.spawn_critical( + "events task", + events::handle_events(Some(network.clone()), latest_block_number, events), + ); + let mut current_max_block = latest_block_number.unwrap_or_default(); while current_max_block < self.to { let next_block = current_max_block + 1; let target_block = self.to.min(current_max_block + self.interval); diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index 3187132b7e..4f0c673e09 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -38,13 +38,13 @@ struct NodeState { } impl NodeState { - fn new(network: Option) -> Self { + fn new(network: Option, latest_block_number: Option) -> Self { Self { network, current_stage: None, eta: Eta::default(), current_checkpoint: StageCheckpoint::new(0), - latest_canonical_engine_block: None, + latest_canonical_engine_block: latest_block_number, } } @@ -190,11 +190,14 @@ impl From for NodeEvent { /// Displays relevant information to the user from components of the node, and periodically /// displays the high-level status of the node. -pub async fn handle_events(network: Option, events: E) -where +pub async fn handle_events( + network: Option, + latest_block_number: Option, + events: E, +) where E: Stream + Unpin, { - let state = NodeState::new(network); + let state = NodeState::new(network, latest_block_number); let mut info_interval = tokio::time::interval(INFO_MESSAGE_INTERVAL); info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 3bd2609b29..9aa3e8eab0 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -236,10 +236,12 @@ impl Command { debug!(target: "reth::cli", ?network_secret_path, "Loading p2p key file"); let secret_key = get_secret_key(&network_secret_path)?; let default_peers_path = data_dir.known_peers_path(); + let head = self.lookup_head(Arc::clone(&db)).expect("the head block is missing"); let network_config = self.load_network_config( &config, Arc::clone(&db), ctx.task_executor.clone(), + head, secret_key, default_peers_path.clone(), ); @@ -360,8 +362,10 @@ impl Command { Either::Right(stream::empty()) } ); - ctx.task_executor - .spawn_critical("events task", events::handle_events(Some(network.clone()), events)); + ctx.task_executor.spawn_critical( + "events task", + events::handle_events(Some(network.clone()), Some(head.number), events), + ); let engine_api = EngineApi::new( blockchain_db.clone(), @@ -600,11 +604,10 @@ impl Command { config: &Config, db: Arc>, executor: TaskExecutor, + head: Head, secret_key: SecretKey, default_peers_path: PathBuf, ) -> NetworkConfig>>> { - let head = self.lookup_head(Arc::clone(&db)).expect("the head block is missing"); - self.network .network_config(config, self.chain.clone(), secret_key, default_peers_path) .with_task_executor(Box::new(executor))