chore(cli): display latest available block number in status logs (#3316)

This commit is contained in:
Roman Krasiuk
2023-06-22 15:02:36 +03:00
committed by GitHub
parent 1d3bab64ae
commit 5cd2148789
4 changed files with 39 additions and 26 deletions

View File

@@ -7,6 +7,7 @@ use clap::Parser;
use eyre::Context;
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconConsensus;
use reth_provider::{ProviderFactory, StageCheckpointReader};
use crate::args::utils::genesis_value_parser;
use reth_config::Config;
@@ -16,7 +17,7 @@ use reth_downloaders::{
headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient,
};
use reth_interfaces::consensus::Consensus;
use reth_primitives::{ChainSpec, H256};
use reth_primitives::{stage::StageId, ChainSpec, H256};
use reth_staged_sync::utils::init::{init_db, init_genesis};
use reth_stages::{
prelude::*,
@@ -105,13 +106,18 @@ impl ImportCommand {
info!(target: "reth::cli", "Chain file imported");
let (mut pipeline, events) =
self.build_import_pipeline(config, db, &consensus, file_client).await?;
self.build_import_pipeline(config, Arc::clone(&db), &consensus, file_client).await?;
// override the tip
pipeline.set_tip(tip);
debug!(target: "reth::cli", ?tip, "Tip manually set");
tokio::spawn(handle_events(None, events));
let factory = ProviderFactory::new(&db, self.chain.clone());
let provider = factory.provider().map_err(PipelineError::Interface)?;
let latest_block_number =
provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number);
tokio::spawn(handle_events(None, latest_block_number, events));
// Run pipeline
info!(target: "reth::cli", "Starting sync pipeline");

View File

@@ -234,26 +234,27 @@ impl Command {
&ctx.task_executor,
)?;
let factory = ProviderFactory::new(&db, self.chain.clone());
let provider = factory.provider().map_err(PipelineError::Interface)?;
let latest_block_number =
provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number);
if latest_block_number.unwrap_or_default() >= self.to {
info!(target: "reth::cli", latest = latest_block_number, "Nothing to run");
return Ok(())
}
let pipeline_events = pipeline.events();
let events = stream_select(
network.event_listener().map(Into::into),
pipeline_events.map(Into::into),
);
ctx.task_executor
.spawn_critical("events task", events::handle_events(Some(network.clone()), events));
let factory = ProviderFactory::new(&db, self.chain.clone());
let provider = factory.provider().map_err(PipelineError::Interface)?;
let latest_block_number =
provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number;
if latest_block_number >= self.to {
info!(target: "reth::cli", latest = latest_block_number, "Nothing to run");
return Ok(())
}
let mut current_max_block = latest_block_number;
ctx.task_executor.spawn_critical(
"events task",
events::handle_events(Some(network.clone()), latest_block_number, events),
);
let mut current_max_block = latest_block_number.unwrap_or_default();
while current_max_block < self.to {
let next_block = current_max_block + 1;
let target_block = self.to.min(current_max_block + self.interval);

View File

@@ -38,13 +38,13 @@ struct NodeState {
}
impl NodeState {
fn new(network: Option<NetworkHandle>) -> Self {
fn new(network: Option<NetworkHandle>, latest_block_number: Option<BlockNumber>) -> Self {
Self {
network,
current_stage: None,
eta: Eta::default(),
current_checkpoint: StageCheckpoint::new(0),
latest_canonical_engine_block: None,
latest_canonical_engine_block: latest_block_number,
}
}
@@ -190,11 +190,14 @@ impl From<ConsensusLayerHealthEvent> for NodeEvent {
/// Displays relevant information to the user from components of the node, and periodically
/// displays the high-level status of the node.
pub async fn handle_events<E>(network: Option<NetworkHandle>, events: E)
where
pub async fn handle_events<E>(
network: Option<NetworkHandle>,
latest_block_number: Option<BlockNumber>,
events: E,
) where
E: Stream<Item = NodeEvent> + Unpin,
{
let state = NodeState::new(network);
let state = NodeState::new(network, latest_block_number);
let mut info_interval = tokio::time::interval(INFO_MESSAGE_INTERVAL);
info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

View File

@@ -236,10 +236,12 @@ impl Command {
debug!(target: "reth::cli", ?network_secret_path, "Loading p2p key file");
let secret_key = get_secret_key(&network_secret_path)?;
let default_peers_path = data_dir.known_peers_path();
let head = self.lookup_head(Arc::clone(&db)).expect("the head block is missing");
let network_config = self.load_network_config(
&config,
Arc::clone(&db),
ctx.task_executor.clone(),
head,
secret_key,
default_peers_path.clone(),
);
@@ -360,8 +362,10 @@ impl Command {
Either::Right(stream::empty())
}
);
ctx.task_executor
.spawn_critical("events task", events::handle_events(Some(network.clone()), events));
ctx.task_executor.spawn_critical(
"events task",
events::handle_events(Some(network.clone()), Some(head.number), events),
);
let engine_api = EngineApi::new(
blockchain_db.clone(),
@@ -600,11 +604,10 @@ impl Command {
config: &Config,
db: Arc<Env<WriteMap>>,
executor: TaskExecutor,
head: Head,
secret_key: SecretKey,
default_peers_path: PathBuf,
) -> NetworkConfig<ProviderFactory<Arc<Env<WriteMap>>>> {
let head = self.lookup_head(Arc::clone(&db)).expect("the head block is missing");
self.network
.network_config(config, self.chain.clone(), secret_key, default_peers_path)
.with_task_executor(Box::new(executor))